Feng Lee 10 лет назад
Родитель
Сommit
e09148b040
7 измененных файлов с 243 добавлено и 100 удалено
  1. 2 0
      src/emqttd_app.erl
  2. 22 0
      src/emqttd_auth_username.erl
  3. 19 2
      src/emqttd_broker.erl
  4. 98 96
      src/emqttd_ctl.erl
  5. 17 1
      src/emqttd_metrics.erl
  6. 34 0
      src/emqttd_plugins.erl
  7. 51 1
      src/emqttd_trace.erl

+ 2 - 0
src/emqttd_app.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_app).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -49,6 +50,7 @@
 start(_StartType, _StartArgs) ->
     print_banner(),
     emqttd_mnesia:start(),
+    emqttd_ctl:init(),
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
     emqttd:load_all_mods(),

+ 22 - 0
src/emqttd_auth_username.erl

@@ -29,6 +29,10 @@
 -author('feng@emqtt.io').
 
 -include("emqttd.hrl").
+-include("emqttd_cli.hrl").
+
+%% CLI callbacks
+-export([cli_useradd/1, cli_userdel/1]).
 
 -behaviour(emqttd_auth_mod).
 
@@ -42,6 +46,22 @@
 
 -record(?AUTH_USERNAME_TAB, {username, password}).
 
+%%%=============================================================================
+%%% CLI
+%%%=============================================================================
+
+cli_useradd([Username, Password]) ->
+    ?PRINT("~p~n", [add_user(list_to_binary(Username), list_to_binary(Password))]);
+
+cli_useradd(_) ->
+    ?PRINT_CMD("useradd <Username> <Password>", "#add user").
+
+cli_userdel([Username]) ->
+    ?PRINT("~p~n", [remove_user(list_to_binary(Username))]);
+
+cli_userdel(_) ->
+    ?PRINT_CMD("userdel <Username>", "#delete user").
+
 %%%=============================================================================
 %%% API 
 %%%=============================================================================
@@ -67,6 +87,8 @@ init(Opts) ->
 		{disc_copies, [node()]},
 		{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
 	mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies),
+    emqttd_ctl:register_cmd(useradd, {?MODULE, cli_useradd}, []),
+    emqttd_ctl:register_cmd(userdel, {?MODULE, cli_userdel}, []),
     {ok, Opts}.
 
 check(#mqtt_client{username = undefined}, _Password, _Opts) ->

+ 19 - 2
src/emqttd_broker.erl

@@ -28,7 +28,10 @@
 
 -author("Feng Lee <feng@emqtt.io>").
 
--include_lib("emqttd.hrl").
+-include("emqttd.hrl").
+-include("emqttd_cli.hrl").
+
+-export([cli/1]).
 
 %% API Function Exports
 -export([start_link/0]).
@@ -68,6 +71,16 @@
     sysdescr      % Broker description
 ]).
 
+%%%=============================================================================
+%%% CLI callback
+%%%=============================================================================
+cli([]) ->
+    Funs = [sysdescr, version, uptime, datetime],
+    [?PRINT("~-20s~s~n", [Fun, ?MODULE:Fun()]) || Fun <- Funs];
+
+cli(_) ->
+    ?PRINT_CMD("broker", "#query broker version, uptime and description").
+
 %%%=============================================================================
 %%% API
 %%%=============================================================================
@@ -223,6 +236,8 @@ init([]) ->
     % Create $SYS Topics
     emqttd_pubsub:create(<<"$SYS/brokers">>),
     [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
+    %% CLI
+    emqttd_ctl:register_cmd(broker, {?MODULE, cli}, []),
     % Tick
     {ok, #state{started_at = os:timestamp(),
                 heartbeat  = start_tick(1000, heartbeat),
@@ -279,7 +294,9 @@ handle_info(_Info, State) ->
 
 terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) ->
     stop_tick(Hb),
-    stop_tick(TRef).
+    stop_tick(TRef),
+    emqttd_ctl:unregister_cmd(broker),
+    ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

+ 98 - 96
src/emqttd_ctl.erl

@@ -24,32 +24,86 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_ctl).
 
 -author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 
--define(PRINT_MSG(Msg), 
-    io:format(Msg)).
+-include("emqttd_cli.hrl").
 
--define(PRINT(Format, Args), 
-    io:format(Format, Args)).
+-export([init/0,
+         register_cmd/3,
+         unregister_cmd/1,
+         run/1]).
 
 -export([status/1,
          vm/1,
-         broker/1,
          stats/1,
          metrics/1,
          cluster/1,
          clients/1,
          sessions/1,
          listeners/1,
-         bridges/1,
-         plugins/1,
-         trace/1,
-         useradd/1,
-         userdel/1]).
+         bridges/1]).
+
+-define(CMD_TAB, mqttd_ctl_cmd).
+
+%%%=============================================================================
+%%% API
+%%%=============================================================================
+
+%%------------------------------------------------------------------------------
+%% @doc Init cmd table.
+%% @end
+%%------------------------------------------------------------------------------
+init() ->
+    ets:new(?CMD_TAB, [ordered_set, named_table, public]),
+    register_cmd(status,  {?MODULE, status}, []),
+    register_cmd(vm,      {?MODULE, vm}, []),
+    register_cmd(cluster, {?MODULE, cluster}, []).
+
+%%------------------------------------------------------------------------------
+%% @doc Register a command
+%% @end
+%%------------------------------------------------------------------------------
+-spec register_cmd(atom(), {module(), atom()}, list()) -> true.
+register_cmd(Cmd, MF, Opts) ->
+    ets:insert(?CMD_TAB, {Cmd, MF, Opts}).
+
+%%------------------------------------------------------------------------------
+%% @doc Unregister a command
+%% @end
+%%------------------------------------------------------------------------------
+-spec unregister_cmd(atom()) -> true.
+unregister_cmd(Cmd) ->
+    ets:delete(?CMD_TAB, Cmd).
+
+%%------------------------------------------------------------------------------
+%% @doc Run a command
+%% @end
+%%------------------------------------------------------------------------------
+
+run([]) -> usage();
+
+run([CmdS|Args]) ->
+    case ets:lookup(?CMD_TAB, list_to_atom(CmdS)) of
+        [{_, {Mod, Fun}, _}] -> Mod:Fun(Args);
+        [] -> usage() 
+    end.
+    
+%%------------------------------------------------------------------------------
+%% @doc Usage
+%% @end
+%%------------------------------------------------------------------------------
+usage() ->
+    ?PRINT("Usage: ~s~n", [?MODULE]),
+    [Mod:Cmd(["help"]) || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)].
+
+%%%=============================================================================
+%%% Commands
+%%%=============================================================================
 
 %%------------------------------------------------------------------------------
 %% @doc Query node status
@@ -63,7 +117,36 @@ status([]) ->
         ?PRINT_MSG("emqttd is not running~n");
     {value,_Version} ->
         ?PRINT_MSG("emqttd is running~n")
-    end.
+    end;
+status(_) ->
+     ?PRINT_CMD("status", "#query broker status").
+
+vm([]) ->
+    vm(["all"]);
+
+vm(["all"]) ->
+    [begin vm([Name]), ?PRINT_MSG("~n") end || Name <- ["load", "memory", "process", "io"]];
+
+vm(["load"]) ->
+    [?PRINT("cpu/~-20s~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()];
+
+vm(["memory"]) ->
+    [?PRINT("memory/~-17s~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
+
+vm(["process"]) ->
+    lists:foreach(fun({Name, Key}) ->
+        ?PRINT("process/~-16s~w~n", [Name, erlang:system_info(Key)])
+    end, [{limit, process_limit}, {count, process_count}]);
+
+vm(["io"]) ->
+    ?PRINT("io/~-21s~w~n", [max_fds, proplists:get_value(max_fds, erlang:system_info(check_io))]);
+
+vm(_) ->
+    ?PRINT_CMD("vm all",     "#query info of erlang vm"),
+    ?PRINT_CMD("vm load",    "#query load of erlang vm"),
+    ?PRINT_CMD("vm memory",  "#query memory of erlang vm"),
+    ?PRINT_CMD("vm process", "#query process of erlang vm"),
+    ?PRINT_CMD("vm io",      "#queue io of erlang vm").
 
 %%------------------------------------------------------------------------------
 %% @doc Cluster with other node
@@ -93,45 +176,11 @@ cluster([SNode]) ->
         end;
     pang ->
         ?PRINT("failed to connect to ~p~n", [Node])
-    end.
-
-%%------------------------------------------------------------------------------
-%% @doc Add user
-%% @end
-%%------------------------------------------------------------------------------
-useradd([Username, Password]) ->
-    ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]).
-
-%%------------------------------------------------------------------------------
-%% @doc Delete user
-%% @end
-%%------------------------------------------------------------------------------
-userdel([Username]) ->
-    ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]).
-
-vm([]) ->
-    [vm([Name]) || Name <- ["load", "memory", "process", "io"]];
-
-vm(["load"]) ->
-    ?PRINT_MSG("Load: ~n"),
-    [?PRINT("  ~s:~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()];
-
-vm(["memory"]) ->
-    ?PRINT_MSG("Memory: ~n"),
-    [?PRINT("  ~s:~p~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
-
-vm(["process"]) ->
-    ?PRINT_MSG("Process: ~n"),
-    ?PRINT("  process_limit:~p~n", [erlang:system_info(process_limit)]),
-    ?PRINT("  process_count:~p~n", [erlang:system_info(process_count)]);
+    end;
 
-vm(["io"]) ->
-    ?PRINT_MSG("IO: ~n"),
-    ?PRINT("  max_fds:~p~n", [proplists:get_value(max_fds, erlang:system_info(check_io))]).
+cluster(_) ->
+    ?PRINT_CMD("cluster [<Node>]", "#cluster with node, query cluster info ").
 
-broker([]) ->
-    Funs = [sysdescr, version, uptime, datetime],
-    [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs].
 
 stats([]) ->
     [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()].
@@ -216,50 +265,6 @@ bridges(["stop", SNode, Topic]) ->
         {error, Error} -> ?PRINT("error: ~p~n", [Error])
     end.
 
-plugins(["list"]) ->
-    lists:foreach(fun(Plugin) -> print(plugin, Plugin) end, emqttd_plugins:list());
-
-plugins(["load", Name]) ->
-    case emqttd_plugins:load(list_to_atom(Name)) of
-        {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
-        {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason])
-    end;
-
-plugins(["unload", Name]) ->
-    case emqttd_plugins:unload(list_to_atom(Name)) of
-        ok -> ?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
-        {error, Reason} -> ?PRINT("unload plugin error: ~p~n", [Reason])
-    end.
-
-trace(["list"]) ->
-    lists:foreach(fun({{Who, Name}, LogFile}) -> 
-            ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
-        end, emqttd_trace:all_traces());
-
-trace(["client", ClientId, "off"]) ->
-    stop_trace(client, ClientId);
-trace(["client", ClientId, LogFile]) ->
-    start_trace(client, ClientId, LogFile);
-trace(["topic", Topic, "off"]) ->
-    stop_trace(topic, Topic);
-trace(["topic", Topic, LogFile]) ->
-    start_trace(topic, Topic, LogFile).
-
-start_trace(Who, Name, LogFile) ->
-    case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of
-        ok -> 
-            ?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
-        {error, Error} ->
-            ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
-    end.
-stop_trace(Who, Name) ->
-    case emqttd_trace:stop_trace({Who, bin(Name)}) of
-        ok -> 
-            ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
-        {error, Error} ->
-            ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
-    end.
-
 
 node_name(SNode) ->
     SNode1 =
@@ -333,11 +338,8 @@ print(session, {{ClientId, _ClientPid}, SessInfo}) ->
            "message_queue=~w, message_dropped=~w, "
            "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
            "created_at=~w, subscriptions=~s)~n",
-            [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]);
+            [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
 
-print(plugin, #mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
-    ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
-               [Name, Ver, Descr, Active]).
 
 format(created_at, Val) ->
     emqttd_util:now_to_secs(Val);

+ 17 - 1
src/emqttd_metrics.erl

@@ -24,18 +24,21 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_metrics).
 
 -author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 
+-include("emqttd_cli.hrl").
+
 -behaviour(gen_server).
 
 -define(SERVER, ?MODULE).
 
 %% API Function Exports
--export([start_link/0]).
+-export([start_link/0, cli/1]).
 
 -export([all/0, value/1,
          inc/1, inc/2, inc/3,
@@ -93,6 +96,17 @@
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
+%%------------------------------------------------------------------------------
+%% @doc CLI command callback
+%% @end
+%%------------------------------------------------------------------------------
+
+cli([]) ->
+    [?PRINT("~-32s ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqttd_metrics:all())];
+
+cli(_) ->
+    ?PRINT_CMD("metrics", "#query broker metrics").
+
 %%------------------------------------------------------------------------------
 %% @doc Get all metrics
 %% @end
@@ -193,6 +207,8 @@ init([]) ->
     [create_metric(Metric) ||  Metric <- Metrics],
     % $SYS Topics for metrics
     [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics],
+    %% Register CLI commands
+    emqttd_ctl:register_cmd(metrics, {?MODULE, cli}, []),
     % Tick to publish metrics
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
 

+ 34 - 0
src/emqttd_plugins.erl

@@ -30,6 +30,9 @@
 -author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
+-include("emqttd_cli.hrl").
+
+-export([cli/1]).
 
 -export([load/0, unload/0]).
 
@@ -37,6 +40,36 @@
 
 -export([list/0]).
 
+%%------------------------------------------------------------------------------
+%% CLI callback
+%%------------------------------------------------------------------------------
+
+cli(["list"]) ->
+    lists:foreach(fun(Plugin) -> print(Plugin) end, list());
+
+cli(["load", Name]) ->
+    case load(list_to_atom(Name)) of
+        {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
+        {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason])
+    end;
+
+cli(["unload", Name]) ->
+    case emqttd_plugins:unload(list_to_atom(Name)) of
+        ok ->
+            ?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
+        {error, Reason} ->
+            ?PRINT("unload plugin error: ~p~n", [Reason])
+    end;
+
+cli(_) ->
+    ?PRINT_CMD("plugins list",            "#query loaded plugins"),
+    ?PRINT_CMD("plugins load <Plugin>",   "#load plugin"),
+    ?PRINT_CMD("plugins unload <Plugin>", "#unload plugin").
+
+print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
+    ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
+               [Name, Ver, Descr, Active]).
+
 %%------------------------------------------------------------------------------
 %% @doc Load all plugins when the broker started.
 %% @end
@@ -44,6 +77,7 @@
 
 -spec load() -> list() | {error, any()}.
 load() ->
+    emqttd_ctl:register_cmd(plugins, {?MODULE, cli}, []),
     case env(loaded_file) of
         {ok, File} ->
             with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end);

+ 51 - 1
src/emqttd_trace.erl

@@ -24,17 +24,23 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_trace).
 
 -author("Feng Lee <feng@emqtt.io>").
 
--behaviour(gen_server).
+-include("emqttd_cli.hrl").
+
+%% CLI
+-export([cli/1]).
 
 %% API Function Exports
 -export([start_link/0]).
 
 -export([start_trace/2, stop_trace/1, all_traces/0]).
 
+-behaviour(gen_server).
+
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
@@ -45,6 +51,48 @@
 
 -define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
 
+
+%%%=============================================================================
+%%% CLI
+%%%=============================================================================
+
+cli(["list"]) ->
+    lists:foreach(fun({{Who, Name}, LogFile}) -> 
+            ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
+        end, all_traces());
+
+cli(["client", ClientId, "off"]) ->
+    cli(trace_off, client, ClientId);
+cli(["client", ClientId, LogFile]) ->
+    cli(trace_on, client, ClientId, LogFile);
+cli(["topic", Topic, "off"]) ->
+    cli(trace_off, topic, Topic);
+cli(["topic", Topic, LogFile]) ->
+    cli(trace_on, topic, Topic, LogFile);
+
+cli(_) ->
+    ?PRINT_CMD("trace list",                       "#query all traces"),
+    ?PRINT_CMD("trace client <ClientId> <LogFile>","#trace client with ClientId"),
+    ?PRINT_CMD("trace client <ClientId> off",      "#stop to trace client"),
+    ?PRINT_CMD("trace topic <Topic> <LogFile>",    "#trace topic with Topic"),
+    ?PRINT_CMD("trace topic <Topic> off",          "#stop to trace Topic").
+
+cli(trace_on, Who, Name, LogFile) ->
+    case start_trace({Who, list_to_binary(Name)}, LogFile) of
+        ok ->
+            ?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
+        {error, Error} ->
+            ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
+    end.
+
+cli(trace_off, Who, Name) ->
+    case stop_trace({Who, list_to_binary(Name)}) of
+        ok -> 
+            ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
+        {error, Error} ->
+            ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
+    end.
+
 %%%=============================================================================
 %%% API
 %%%=============================================================================
@@ -85,6 +133,7 @@ all_traces() ->
     gen_server:call(?MODULE, all_traces).
 
 init([]) ->
+    emqttd_ctl:register_cmd(trace, {?MODULE, cli}, []),
     {ok, #state{level = info, trace_map = #{}}}.
 
 handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) ->
@@ -122,6 +171,7 @@ handle_info(_Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, _State) ->
+    emqttd_ctl:unregister_cmd(trace),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->