emqttd_cli.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. %%%-----------------------------------------------------------------------------
  2. %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
  3. %%%
  4. %%% Permission is hereby granted, free of charge, to any person obtaining a copy
  5. %%% of this software and associated documentation files (the "Software"), to deal
  6. %%% in the Software without restriction, including without limitation the rights
  7. %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. %%% copies of the Software, and to permit persons to whom the Software is
  9. %%% furnished to do so, subject to the following conditions:
  10. %%%
  11. %%% The above copyright notice and this permission notice shall be included in all
  12. %%% copies or substantial portions of the Software.
  13. %%%
  14. %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. %%% SOFTWARE.
  21. %%%-----------------------------------------------------------------------------
  22. %%% @doc
  23. %%% emqttd cli.
  24. %%%
  25. %%% @end
  26. %%%-----------------------------------------------------------------------------
  27. -module(emqttd_cli).
  28. -author("Feng Lee <feng@emqtt.io>").
  29. -include("emqttd.hrl").
  30. -include("emqttd_cli.hrl").
  31. -import(lists, [foreach/2]).
  32. -import(proplists, [get_value/2]).
  33. -export([load/0]).
  34. -export([status/1, broker/1, cluster/1, bridges/1,
  35. clients/1, sessions/1, plugins/1, listeners/1,
  36. vm/1, mnesia/1, trace/1]).
  37. -define(PROC_INFOKEYS, [status,
  38. memory,
  39. message_queue_len,
  40. total_heap_size,
  41. heap_size,
  42. stack_size,
  43. reductions]).
  44. load() ->
  45. Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
  46. [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
  47. is_cmd(Fun) ->
  48. not lists:member(Fun, [init, load, module_info]).
  49. %%%=============================================================================
  50. %%% Commands
  51. %%%=============================================================================
  52. %%------------------------------------------------------------------------------
  53. %% @doc Node status
  54. %% @end
  55. %%------------------------------------------------------------------------------
  56. status([]) ->
  57. {InternalStatus, _ProvidedStatus} = init:get_status(),
  58. ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
  59. case lists:keysearch(emqttd, 1, application:which_applications()) of
  60. false ->
  61. ?PRINT_MSG("emqttd is not running~n");
  62. {value, {emqttd, _Desc, Vsn}} ->
  63. ?PRINT("emqttd ~s is running~n", [Vsn])
  64. end;
  65. status(_) ->
  66. ?PRINT_CMD("status", "query broker status").
  67. %%------------------------------------------------------------------------------
  68. %% @doc Query broker
  69. %% @end
  70. %%------------------------------------------------------------------------------
  71. broker([]) ->
  72. Funs = [sysdescr, version, uptime, datetime],
  73. foreach(fun(Fun) ->
  74. ?PRINT("~-10s: ~s~n", [Fun, emqttd_broker:Fun()])
  75. end, Funs);
  76. broker(["stats"]) ->
  77. foreach(fun({Stat, Val}) ->
  78. ?PRINT("~-20s: ~w~n", [Stat, Val])
  79. end, emqttd_stats:getstats());
  80. broker(["metrics"]) ->
  81. foreach(fun({Metric, Val}) ->
  82. ?PRINT("~-24s: ~w~n", [Metric, Val])
  83. end, lists:sort(emqttd_metrics:all()));
  84. broker(["pubsub"]) ->
  85. Pubsubs = supervisor:which_children(emqttd_pubsub_sup),
  86. foreach(fun({{_, Id}, Pid, _, _}) ->
  87. ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
  88. ?PRINT("pubsub: ~w~n", [Id]),
  89. foreach(fun({Key, Val}) ->
  90. ?PRINT(" ~-18s: ~w~n", [Key, Val])
  91. end, ProcInfo)
  92. end, lists:reverse(Pubsubs));
  93. broker(_) ->
  94. ?USAGE([{"broker", "query broker version, uptime and description"},
  95. {"broker pubsub", "query process_info of pubsub"},
  96. {"broker stats", "query broker statistics of clients, topics, subscribers"},
  97. {"broker metrics", "query broker metrics"}]).
  98. %%------------------------------------------------------------------------------
  99. %% @doc Cluster with other node
  100. %% @end
  101. %%------------------------------------------------------------------------------
  102. cluster([]) ->
  103. Nodes = emqttd_broker:running_nodes(),
  104. ?PRINT("cluster nodes: ~p~n", [Nodes]);
  105. cluster(usage) ->
  106. ?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info ");
  107. cluster([SNode]) ->
  108. Node = emqttd_dist:parse_node(SNode),
  109. case lists:member(Node, emqttd_broker:running_nodes()) of
  110. true ->
  111. ?PRINT("~s is already clustered~n", [Node]);
  112. false ->
  113. cluster(Node, fun() ->
  114. emqttd_plugins:unload(),
  115. application:stop(emqttd),
  116. application:stop(esockd),
  117. application:stop(gproc),
  118. emqttd_mnesia:cluster(Node),
  119. application:start(gproc),
  120. application:start(esockd),
  121. application:start(emqttd)
  122. end)
  123. end;
  124. cluster(_) ->
  125. cluster(usage).
  126. cluster(Node, DoCluster) ->
  127. cluster(net_adm:ping(Node), Node, DoCluster).
  128. cluster(pong, Node, DoCluster) ->
  129. case emqttd:is_running(Node) of
  130. true ->
  131. DoCluster(),
  132. ?PRINT("cluster with ~s successfully.~n", [Node]);
  133. false ->
  134. ?PRINT("emqttd is not running on ~s~n", [Node])
  135. end;
  136. cluster(pang, Node, _DoCluster) ->
  137. ?PRINT("Cannot connect to ~s~n", [Node]).
  138. %%------------------------------------------------------------------------------
  139. %% @doc Query clients
  140. %% @end
  141. %%------------------------------------------------------------------------------
  142. clients(["list"]) ->
  143. emqttd_mnesia:dump(ets, mqtt_client, fun print/1);
  144. clients(["show", ClientId]) ->
  145. case emqttd_cm:lookup(list_to_binary(ClientId)) of
  146. undefined -> ?PRINT_MSG("Not Found.~n");
  147. Client -> print(Client)
  148. end;
  149. clients(["kick", ClientId]) ->
  150. case emqttd_cm:lookup(list_to_binary(ClientId)) of
  151. undefined ->
  152. ?PRINT_MSG("Not Found.~n");
  153. #mqtt_client{client_pid = Pid} ->
  154. emqttd_client:kick(Pid)
  155. end;
  156. clients(_) ->
  157. ?USAGE([{"clients list", "list all clients"},
  158. {"clients show <ClientId>", "show a client"},
  159. {"clients kick <ClientId>", "kick a client"}]).
  160. %%------------------------------------------------------------------------------
  161. %% @doc Sessions Command
  162. %% @end
  163. %%------------------------------------------------------------------------------
  164. sessions(["list"]) ->
  165. [sessions(["list", Type]) || Type <- ["persistent", "transient"]];
  166. sessions(["list", "persistent"]) ->
  167. emqttd_mnesia:dump(ets, mqtt_persistent_session, fun print/1);
  168. sessions(["list", "transient"]) ->
  169. emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1);
  170. sessions(["show", ClientId]) ->
  171. MP = {{list_to_binary(ClientId), '_'}, '_'},
  172. case {ets:match_object(mqtt_transient_session, MP),
  173. ets:match_object(mqtt_persistent_session, MP)} of
  174. {[], []} ->
  175. ?PRINT_MSG("Not Found.~n");
  176. {[SessInfo], _} ->
  177. print(SessInfo);
  178. {_, [SessInfo]} ->
  179. print(SessInfo)
  180. end;
  181. sessions(_) ->
  182. ?USAGE([{"sessions list", "list all sessions"},
  183. {"sessions list persistent", "list all persistent sessions"},
  184. {"sessions list transient", "list all transient sessions"},
  185. {"sessions show <ClientId>", "show a session"}]).
  186. plugins(["list"]) ->
  187. foreach(fun print/1, emqttd_plugins:list());
  188. plugins(["load", Name]) ->
  189. case emqttd_plugins:load(list_to_atom(Name)) of
  190. {ok, StartedApps} ->
  191. ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
  192. {error, Reason} ->
  193. ?PRINT("load plugin error: ~p~n", [Reason])
  194. end;
  195. plugins(["unload", Name]) ->
  196. case emqttd_plugins:unload(list_to_atom(Name)) of
  197. ok ->
  198. ?PRINT("Plugin ~s unloaded successfully.~n", [Name]);
  199. {error, Reason} ->
  200. ?PRINT("unload plugin error: ~p~n", [Reason])
  201. end;
  202. plugins(_) ->
  203. ?USAGE([{"plugins list", "show loaded plugins"},
  204. {"plugins load <Plugin>", "load plugin"},
  205. {"plugins unload <Plugin>", "unload plugin"}]).
  206. %%------------------------------------------------------------------------------
  207. %% @doc Bridges command
  208. %% @end
  209. %%------------------------------------------------------------------------------
  210. bridges(["list"]) ->
  211. foreach(fun({{Node, Topic}, _Pid}) ->
  212. ?PRINT("bridge: ~s ~s~n", [Node, Topic])
  213. end, emqttd_bridge_sup:bridges());
  214. bridges(["options"]) ->
  215. ?PRINT_MSG("Options:~n"),
  216. ?PRINT_MSG(" qos = 0 | 1 | 2~n"),
  217. ?PRINT_MSG(" prefix = string~n"),
  218. ?PRINT_MSG(" suffix = string~n"),
  219. ?PRINT_MSG(" queue = integer~n"),
  220. ?PRINT_MSG("Example:~n"),
  221. ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
  222. bridges(["start", SNode, Topic]) ->
  223. case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
  224. {ok, _} -> ?PRINT_MSG("bridge is started.~n");
  225. {error, Error} -> ?PRINT("error: ~p~n", [Error])
  226. end;
  227. bridges(["start", SNode, Topic, OptStr]) ->
  228. Opts = parse_opts(bridge, OptStr),
  229. case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of
  230. {ok, _} -> ?PRINT_MSG("bridge is started.~n");
  231. {error, Error} -> ?PRINT("error: ~p~n", [Error])
  232. end;
  233. bridges(["stop", SNode, Topic]) ->
  234. case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
  235. ok -> ?PRINT_MSG("bridge is stopped.~n");
  236. {error, Error} -> ?PRINT("error: ~p~n", [Error])
  237. end;
  238. bridges(_) ->
  239. ?USAGE([{"bridges list", "query bridges"},
  240. {"bridges options", "bridge options"},
  241. {"bridges start <Node> <Topic>", "start bridge"},
  242. {"bridges start <Node> <Topic> <Options>", "start bridge with options"},
  243. {"bridges stop <Node> <Topic>", "stop bridge"}]).
  244. parse_opts(Cmd, OptStr) ->
  245. Tokens = string:tokens(OptStr, ","),
  246. [parse_opt(Cmd, list_to_atom(Opt), Val)
  247. || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
  248. parse_opt(bridge, qos, Qos) ->
  249. {qos, list_to_integer(Qos)};
  250. parse_opt(bridge, suffix, Suffix) ->
  251. {topic_suffix, list_to_binary(Suffix)};
  252. parse_opt(bridge, prefix, Prefix) ->
  253. {topic_prefix, list_to_binary(Prefix)};
  254. parse_opt(bridge, queue, Len) ->
  255. {max_queue_len, list_to_integer(Len)};
  256. parse_opt(_Cmd, Opt, _Val) ->
  257. ?PRINT("Bad Option: ~s~n", [Opt]).
  258. %%------------------------------------------------------------------------------
  259. %% @doc vm command
  260. %% @end
  261. %%------------------------------------------------------------------------------
  262. vm([]) ->
  263. vm(["all"]);
  264. vm(["all"]) ->
  265. [vm([Name]) || Name <- ["load", "memory", "process", "io"]];
  266. vm(["load"]) ->
  267. [?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()];
  268. vm(["memory"]) ->
  269. [?PRINT("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
  270. vm(["process"]) ->
  271. foreach(fun({Name, Key}) ->
  272. ?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
  273. end, [{limit, process_limit}, {count, process_count}]);
  274. vm(["io"]) ->
  275. IoInfo = erlang:system_info(check_io),
  276. foreach(fun(Key) ->
  277. ?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)])
  278. end, [max_fds, active_fds]);
  279. vm(_) ->
  280. ?USAGE([{"vm all", "query info of erlang vm"},
  281. {"vm load", "query load of erlang vm"},
  282. {"vm memory", "query memory of erlang vm"},
  283. {"vm process", "query process of erlang vm"},
  284. {"vm io", "queue io of erlang vm"}]).
  285. %%------------------------------------------------------------------------------
  286. %% @doc mnesia Command
  287. %% @end
  288. %%------------------------------------------------------------------------------
  289. mnesia([]) ->
  290. mnesia:system_info();
  291. mnesia(_) ->
  292. ?PRINT_CMD("mnesia", "mnesia system info").
  293. %%------------------------------------------------------------------------------
  294. %% @doc Trace Command
  295. %% @end
  296. %%------------------------------------------------------------------------------
  297. trace(["list"]) ->
  298. foreach(fun({{Who, Name}, LogFile}) ->
  299. ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
  300. end, emqttd_trace:all_traces());
  301. trace(["client", ClientId, "off"]) ->
  302. trace_off(client, ClientId);
  303. trace(["client", ClientId, LogFile]) ->
  304. trace_on(client, ClientId, LogFile);
  305. trace(["topic", Topic, "off"]) ->
  306. trace_off(topic, Topic);
  307. trace(["topic", Topic, LogFile]) ->
  308. trace_on(topic, Topic, LogFile);
  309. trace(_) ->
  310. ?USAGE([{"trace list", "query all traces"},
  311. {"trace client <ClientId> <LogFile>","trace client with ClientId"},
  312. {"trace client <ClientId> off", "stop to trace client"},
  313. {"trace topic <Topic> <LogFile>", "trace topic with Topic"},
  314. {"trace topic <Topic> off", "stop to trace Topic"}]).
  315. trace_on(Who, Name, LogFile) ->
  316. case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of
  317. ok ->
  318. ?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
  319. {error, Error} ->
  320. ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
  321. end.
  322. trace_off(Who, Name) ->
  323. case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of
  324. ok ->
  325. ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
  326. {error, Error} ->
  327. ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
  328. end.
  329. %%------------------------------------------------------------------------------
  330. %% @doc Listeners Command
  331. %% @end
  332. %%------------------------------------------------------------------------------
  333. listeners([]) ->
  334. foreach(fun({{Protocol, Port}, Pid}) ->
  335. Info = [{acceptors, esockd:get_acceptors(Pid)},
  336. {max_clients, esockd:get_max_clients(Pid)},
  337. {current_clients,esockd:get_current_clients(Pid)},
  338. {shutdown_count, esockd:get_shutdown_count(Pid)}],
  339. ?PRINT("listener on ~s:~w~n", [Protocol, Port]),
  340. foreach(fun({Key, Val}) ->
  341. ?PRINT(" ~-16s: ~w~n", [Key, Val])
  342. end, Info)
  343. end, esockd:listeners());
  344. listeners(_) ->
  345. ?PRINT_CMD("listeners", "query broker listeners").
  346. print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
  347. ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
  348. [Name, Ver, Descr, Active]);
  349. print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
  350. username = Username, peername = Peername,
  351. connected_at = ConnectedAt}) ->
  352. ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
  353. [ClientId, CleanSess, Username,
  354. emqttd_net:format(Peername),
  355. emqttd_util:now_to_secs(ConnectedAt)]);
  356. print({{ClientId, _ClientPid}, SessInfo}) ->
  357. InfoKeys = [clean_sess,
  358. max_inflight,
  359. inflight_queue,
  360. message_queue,
  361. message_dropped,
  362. awaiting_rel,
  363. awaiting_ack,
  364. awaiting_comp,
  365. created_at,
  366. subscriptions],
  367. ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
  368. "message_queue=~w, message_dropped=~w, "
  369. "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
  370. "created_at=~w, subscriptions=~s)~n",
  371. [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
  372. format(created_at, Val) ->
  373. emqttd_util:now_to_secs(Val);
  374. format(subscriptions, List) ->
  375. string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
  376. format(_, Val) ->
  377. Val.