Просмотр исходного кода

add broker, bridges, listeners commands. fix issue #63

Ery Lee 11 лет назад
Родитель
Сommit
31c8ea3b73
2 измененных файлов с 47 добавлено и 10 удалено
  1. 6 0
      apps/emqttd/src/emqttd_bridge_sup.erl
  2. 41 10
      apps/emqttd/src/emqttd_ctl.erl

+ 6 - 0
apps/emqttd/src/emqttd_bridge_sup.erl

@@ -31,6 +31,7 @@
 -behavior(supervisor).
 
 -export([start_link/0,
+         bridges/0,
          start_bridge/2, start_bridge/3,
          stop_bridge/2]).
 
@@ -49,6 +50,11 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+-spec bridges() -> [{tuple(), pid()}].
+bridges() ->
+    [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} 
+                                <- supervisor:which_children(?MODULE)].
+
 %%------------------------------------------------------------------------------
 %% @doc
 %% Start a bridge.

+ 41 - 10
apps/emqttd/src/emqttd_ctl.erl

@@ -36,16 +36,11 @@
 -define(PRINT(Format, Args), 
     io:format(Format, Args)).
 
--export([status/1,
-		cluster/1,
-		useradd/1,
-		userdel/1]).
-
-%TODO: add comment
-% bridge
-% metrics
-% broker
-% sockets
+-export([status/1, cluster/1,
+         listeners/1,
+         useradd/1, userdel/1,
+         broker/1,
+         bridges/1, start_bridge/1, stop_bridge/1]).
 
 status([]) ->
     {InternalStatus, _ProvidedStatus} = init:get_status(),
@@ -84,6 +79,41 @@ useradd([Username, Password]) ->
 userdel([Username]) ->
 	?PRINT("~p", [emqttd_auth:delete(list_to_binary(Username))]).
 
+broker([]) ->
+    Funs = [sysdescr, version, uptime, datetime],
+    [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs];
+
+broker(["stats"]) ->
+    [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()];
+    
+broker(["metrics"]) ->
+    [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()].
+    
+listeners([]) ->
+    lists:foreach(fun({{Protocol, Port}, Pid}) ->
+                ?PRINT("listener ~s:~p~n", [Protocol, Port]), 
+                ?PRINT("  acceptor_pool: ~p~n", [esockd:get_acceptor_pool(Pid)]),
+                ?PRINT("  max_clients: ~p~n", [esockd:get_max_clients(Pid)]),
+                ?PRINT("  current_clients: ~p~n", [esockd:get_current_clients(Pid)])
+        end, esockd:listeners()).
+
+bridges([]) ->
+    lists:foreach(fun({{Node, Topic}, _Pid}) -> 
+                ?PRINT("bridge: ~s ~s~n", [Node, Topic]) 
+        end, emqttd_bridge_sup:bridges()).
+
+start_bridge([SNode, Topic]) ->
+    case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
+        {ok, _} -> ?PRINT_MSG("bridge is started.~n"); 
+        {error, Error} -> ?PRINT("error: ~p~n", [Error])
+    end.
+
+stop_bridge([SNode, Topic]) ->
+    case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
+        ok -> ?PRINT_MSG("bridge is stopped.~n");
+        {error, Error} -> ?PRINT("error: ~p~n", [Error])
+    end.
+
 node_name(SNode) ->
     SNode1 =
     case string:tokens(SNode, "@") of
@@ -101,3 +131,4 @@ node_name(SNode) ->
          end
     end,
     list_to_atom(SNode1).
+