Explorar o código

fix issue #449 - Improve the CLI of cluster

Feng %!s(int64=10) %!d(string=hai) anos
pai
achega
767c4ccc6e
Modificáronse 1 ficheiros con 49 adicións e 46 borrados
  1. 49 46
      src/emqttd_cli.erl

+ 49 - 46
src/emqttd_cli.erl

@@ -14,8 +14,6 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc emqttd cli
-%% @author Feng Lee <feng@emqtt.io>
 -module(emqttd_cli).
 
 -include("emqttd.hrl").
@@ -106,56 +104,47 @@ broker(_) ->
             {"broker metrics", "query broker metrics"}]).
 
 %%--------------------------------------------------------------------
-%% @doc Cluster with other node
-cluster([]) ->
-    Nodes = emqttd_broker:running_nodes(),
-    ?PRINT("cluster nodes: ~p~n", [Nodes]);
-
-cluster(usage) ->
-    ?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info");
-
-cluster([SNode]) ->
-    Node = emqttd_dist:parse_node(SNode),
-    case lists:member(Node, emqttd_broker:running_nodes()) of
-        true ->
-            ?PRINT("~s is already clustered~n", [Node]);
-        false ->
-            cluster(Node, fun() ->
-                emqttd_plugins:unload(),
-                stop_apps(),
-                emqttd_mnesia:cluster(Node),
-                start_apps() 
-           end)
+%% @doc Cluster with other nodes
+cluster(["join", SNode]) ->
+    case emqttd_cluster:join(emqttd_node:parse_name(SNode)) of
+        ok ->
+            ?PRINT_MSG("Join the cluster successfully.~n"),
+            cluster(["status"]);
+        {error, Error} ->
+            ?PRINT("Failed to join the cluster: ~p~n", [Error])
     end;
 
-cluster(_) ->
-    cluster(usage).
-
-cluster(Node, DoCluster) ->
-    cluster(net_adm:ping(Node), Node, DoCluster).
-
-cluster(pong, Node, DoCluster) ->
-    case emqttd:is_running(Node) of
-        true ->
-            DoCluster(),
-            ?PRINT("cluster with ~s successfully.~n", [Node]);
-        false ->
-            ?PRINT("emqttd is not running on ~s~n", [Node])
+cluster(["leave"]) ->
+    case emqttd_cluster:leave() of
+        ok ->
+            ?PRINT_MSG("Leave the cluster successfully.~n"),
+            cluster(["status"]);
+        {error, Error} ->
+            ?PRINT("Failed to leave the cluster: ~p~n", [Error])
     end;
 
-cluster(pang, Node, _DoCluster) ->
-    ?PRINT("Cannot connect to ~s~n", [Node]).
+cluster(["remove", SNode]) ->
+    case emqttd_cluster:remove(emqttd_node:parse_name(SNode)) of
+        ok ->
+            ?PRINT_MSG("Remove the node from cluster successfully.~n"),
+            cluster(["status"]);
+        {error, Error} ->
+            ?PRINT("Failed to remove the node from cluster: ~p~n", [Error])
+    end;
 
-stop_apps() ->
-    [application:stop(App) || App <- [emqttd, esockd, gproc]].
+cluster(["status"]) ->
+    ?PRINT("Cluster status: ~p~n", [emqttd_cluster:status()]);
 
-start_apps() ->
-    [application:start(App) || App <- [gproc, esockd, emqttd]].
+cluster(_) ->
+    ?USAGE([{"cluster join <Node>",  "Join the cluster"},
+            {"cluster leave",        "Leave the cluster"},
+            {"cluster remove <Node>","Remove the node from cluster"},
+            {"cluster status",       "Cluster status"}]).
 
 %%--------------------------------------------------------------------
 %% @doc Query clients
 clients(["list"]) ->
-    emqttd_mnesia:dump(ets, mqtt_client, fun print/1);
+    dump(ets, mqtt_client, fun print/1);
 
 clients(["show", ClientId]) ->
     if_client(ClientId, fun print/1);
@@ -180,10 +169,10 @@ sessions(["list"]) ->
     [sessions(["list", Type]) || Type <- ["persistent", "transient"]];
 
 sessions(["list", "persistent"]) ->
-    emqttd_mnesia:dump(ets, mqtt_persistent_session, fun print/1);
+    dump(ets, mqtt_persistent_session, fun print/1);
 
 sessions(["list", "transient"]) ->
-    emqttd_mnesia:dump(ets, mqtt_transient_session,  fun print/1);
+    dump(ets, mqtt_transient_session,  fun print/1);
 
 sessions(["show", ClientId]) ->
     MP = {{bin(ClientId), '_'}, '_'},
@@ -463,7 +452,7 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
     ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
             [ClientId, CleanSess, Username,
              emqttd_net:format(Peername),
-             emqttd_util:now_to_secs(ConnectedAt)]);
+             emqttd_time:now_to_secs(ConnectedAt)]);
 
 print(#mqtt_topic{topic = Topic, node = Node}) ->
     ?PRINT("~s on ~s~n", [Topic, Node]);
@@ -493,7 +482,7 @@ print(subscription, ClientId, Subscriptions) ->
     ?PRINT("~s: ~p~n", [ClientId, TopicTable]).
 
 format(created_at, Val) ->
-    emqttd_util:now_to_secs(Val);
+    emqttd_time:now_to_secs(Val);
 
 format(subscriptions, List) ->
     string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
@@ -503,3 +492,17 @@ format(_, Val) ->
 
 bin(S) -> iolist_to_binary(S).
 
+%%TODO: ...
+dump(ets, Table, Fun) ->
+    dump(ets, Table, ets:first(Table), Fun).
+
+dump(ets, _Table, '$end_of_table', _Fun) ->
+    ok;
+
+dump(ets, Table, Key, Fun) ->
+    case ets:lookup(Table, Key) of
+        [Record] -> Fun(Record);
+        [] -> ignore
+    end,
+    dump(ets, Table, ets:next(Table, Key), Fun).
+