emqttd_cli.erl 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_cli).
  17. -author("Feng Lee <feng@emqtt.io>").
  18. -include("emqttd.hrl").
  19. -include("emqttd_cli.hrl").
  20. -include("emqttd_protocol.hrl").
  21. -import(lists, [foreach/2]).
  22. -import(proplists, [get_value/2]).
  23. -export([load/0]).
  24. -export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
  25. routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
  26. listeners/1, vm/1, mnesia/1, trace/1, acl/1]).
  %% Items requested from erlang:process_info/2 for each pubsub process
  %% by the `broker pubsub' command.
  -define(PROC_INFOKEYS, [status, memory, message_queue_len, total_heap_size,
                          heap_size, stack_size, reductions]).

  %% Size limit; in the visible code it is only mentioned by the
  %% commented-out if_could_print/2.
  -define(MAX_LIMIT, 10000).

  %% Application name looked up by the `status' command.
  -define(APP, emqttd).
  %% Register every exported function name accepted by is_cmd/1 as a
  %% command with emqttd_ctl. Returns the list of register_cmd/3 results.
  load() ->
      Exported = ?MODULE:module_info(exports),
      CmdNames = [Name || {Name, _Arity} <- Exported, is_cmd(Name)],
      lists:map(fun(Name) ->
                        emqttd_ctl:register_cmd(Name, {?MODULE, Name}, [])
                end, CmdNames).
  %% True for any exported name that should be registered as a command;
  %% init, load and module_info are excluded.
  is_cmd(init)        -> false;
  is_cmd(load)        -> false;
  is_cmd(module_info) -> false;
  is_cmd(_Other)      -> true.
  41. %%--------------------------------------------------------------------
  42. %% Commands
  43. %%--------------------------------------------------------------------
  %%--------------------------------------------------------------------
  %% @doc Node status.
  %% With no arguments, prints the internal status reported by
  %% init:get_status/0 and whether the emqttd application appears in
  %% application:which_applications/0. Any other arguments print usage.
  status([]) ->
      {InternalStatus, _ProvidedStatus} = init:get_status(),
      ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
      Running = application:which_applications(),
      case lists:keyfind(?APP, 1, Running) of
          {?APP, _Desc, Vsn} ->
              ?PRINT("emqttd ~s is running~n", [Vsn]);
          false ->
              ?PRINT_MSG("emqttd is not running~n")
      end;
  status(_) ->
      ?PRINT_CMD("status", "Show broker status").
  %%--------------------------------------------------------------------
  %% @doc Query broker.
  %% No arguments: print sysdescr, version, uptime and datetime obtained
  %% from emqttd_broker. "stats" / "metrics": print the name/value pairs
  %% from emqttd_stats / emqttd_metrics (metrics are sorted). "pubsub":
  %% print ?PROC_INFOKEYS process info for each child of the pubsub pool.
  %% Anything else prints usage.
  broker([]) ->
      lists:foreach(
        fun(Item) ->
                ?PRINT("~-10s: ~s~n", [Item, emqttd_broker:Item()])
        end, [sysdescr, version, uptime, datetime]);
  broker(["stats"]) ->
      Stats = emqttd_stats:getstats(),
      lists:foreach(
        fun({Name, Count}) ->
                ?PRINT("~-20s: ~w~n", [Name, Count])
        end, Stats);
  broker(["metrics"]) ->
      Metrics = lists:sort(emqttd_metrics:all()),
      lists:foreach(
        fun({Name, Count}) ->
                ?PRINT("~-24s: ~w~n", [Name, Count])
        end, Metrics);
  broker(["pubsub"]) ->
      Children = supervisor:which_children(emqttd_pubsub_sup:pubsub_pool()),
      PrintItem = fun({Item, Value}) ->
                          ?PRINT(" ~-18s: ~w~n", [Item, Value])
                  end,
      lists:foreach(
        fun({{_, Id}, Pid, _, _}) ->
                %% Process info is fetched before the header is printed.
                Items = erlang:process_info(Pid, ?PROC_INFOKEYS),
                ?PRINT("pubsub: ~w~n", [Id]),
                lists:foreach(PrintItem, Items)
        end, lists:reverse(Children));
  broker(_) ->
      ?USAGE([{"broker", "Show broker version, uptime and description"},
              {"broker pubsub", "Show process_info of pubsub"},
              {"broker stats", "Show broker statistics of clients, topics, subscribers"},
              {"broker metrics", "Show broker metrics"}]).
  %%--------------------------------------------------------------------
  %% @doc Cluster with other nodes.
  %% "join <Node>", "leave" and "force-leave <Node>" call the matching
  %% ekka function; on success the cluster status is printed afterwards,
  %% on {error, Reason} the reason is printed. "status" prints
  %% ekka_cluster:status/0. Anything else prints usage.
  cluster(["join", SNode]) ->
      Node = ekka_node:parse_name(SNode),
      case ekka:join(Node) of
          {error, Reason} ->
              ?PRINT("Failed to join the cluster: ~p~n", [Reason]);
          ok ->
              ?PRINT_MSG("Join the cluster successfully.~n"),
              cluster(["status"])
      end;
  cluster(["leave"]) ->
      case ekka:leave() of
          {error, Reason} ->
              ?PRINT("Failed to leave the cluster: ~p~n", [Reason]);
          ok ->
              ?PRINT_MSG("Leave the cluster successfully.~n"),
              cluster(["status"])
      end;
  cluster(["force-leave", SNode]) ->
      Node = ekka_node:parse_name(SNode),
      case ekka:force_leave(Node) of
          {error, Reason} ->
              ?PRINT("Failed to remove the node from cluster: ~p~n", [Reason]);
          ok ->
              ?PRINT_MSG("Remove the node from cluster successfully.~n"),
              cluster(["status"])
      end;
  cluster(["status"]) ->
      Status = ekka_cluster:status(),
      ?PRINT("Cluster status: ~p~n", [Status]);
  cluster(_) ->
      ?USAGE([{"cluster join <Node>", "Join the cluster"},
              {"cluster leave", "Leave the cluster"},
              {"cluster force-leave <Node>", "Force the node leave from cluster"},
              {"cluster status", "Cluster status"}]).
  %%--------------------------------------------------------------------
  %% @doc Users command.
  %% Passes the argument list unchanged to emq_auth_username:cli/1.
  users(CliArgs) ->
      emq_auth_username:cli(CliArgs).
  %% ACL command: "reload" calls emqttd_access_control:reload_acl/0 and
  %% returns its result without printing; anything else prints usage.
  acl(["reload"]) ->
      emqttd_access_control:reload_acl();
  acl(_) ->
      ?USAGE([{"acl reload", "reload etc/acl.conf"}]).
  %%--------------------------------------------------------------------
  %% @doc Query clients.
  %% "list" dumps the mqtt_client table, "show <ClientId>" prints one
  %% client, "kick <ClientId>" calls emqttd_client:kick/1 on the client's
  %% pid. Anything else prints usage.
  clients(["list"]) ->
      dump(mqtt_client);
  clients(["show", ClientId]) ->
      if_client(ClientId, fun print/1);
  clients(["kick", ClientId]) ->
      Kick = fun(#mqtt_client{client_pid = Pid}) ->
                     emqttd_client:kick(Pid)
             end,
      if_client(ClientId, Kick);
  clients(_) ->
      ?USAGE([{"clients list", "List all clients"},
              {"clients show <ClientId>", "Show a client"},
              {"clients kick <ClientId>", "Kick out a client"}]).
  %% Look the client up through emqttd_cm:lookup/1 (id converted to a
  %% binary) and apply Fun to it; prints "Not Found." when the lookup
  %% returns undefined.
  if_client(ClientId, Fun) ->
      Found = emqttd_cm:lookup(bin(ClientId)),
      case Found of
          undefined ->
              ?PRINT_MSG("Not Found.~n");
          _ ->
              Fun(Found)
      end.
  %%--------------------------------------------------------------------
  %% @doc Sessions command.
  %% "list" dumps the mqtt_local_session table. "list persistent" and
  %% "list transient" select entries whose third tuple element is false
  %% or true respectively. "show <ClientId>" prints one session.
  %% Anything else prints usage.
  sessions(["list"]) ->
      dump(mqtt_local_session);
  %% performance issue? (match_object scans the table)
  sessions(["list", "persistent"]) ->
      Persistent = ets:match_object(mqtt_local_session, {'_', '_', false, '_'}),
      lists:foreach(fun print/1, Persistent);
  %% performance issue? (match_object scans the table)
  sessions(["list", "transient"]) ->
      Transient = ets:match_object(mqtt_local_session, {'_', '_', true, '_'}),
      lists:foreach(fun print/1, Transient);
  sessions(["show", ClientId]) ->
      Key = bin(ClientId),
      case ets:lookup(mqtt_local_session, Key) of
          [Session] ->
              print(Session);
          [] ->
              ?PRINT_MSG("Not Found.~n")
      end;
  sessions(_) ->
      ?USAGE([{"sessions list", "List all sessions"},
              {"sessions list persistent", "List all persistent sessions"},
              {"sessions list transient", "List all transient sessions"},
              {"sessions show <ClientId>", "Show a session"}]).
  %%--------------------------------------------------------------------
  %% @doc Routes command.
  %% "list" prints every entry returned by emqttd_router:dump/0;
  %% "show <Topic>" prints the mqtt_route records read for that topic.
  %% Anything else prints usage.
  routes(["list"]) ->
      lists:foreach(fun print/1, emqttd_router:dump());
  routes(["show", Topic]) ->
      Found = mnesia:dirty_read(mqtt_route, bin(Topic)),
      print(Found);
  routes(_) ->
      ?USAGE([{"routes list", "List all routes"},
              {"routes show <Topic>", "Show a route"}]).
  %%--------------------------------------------------------------------
  %% @doc Topics command.
  %% "list" prints each topic returned by emqttd:topics/0, one per line;
  %% "show <Topic>" prints the mqtt_route records read for that topic.
  %% Anything else prints usage.
  topics(["list"]) ->
      PrintTopic = fun(Name) -> ?PRINT("~s~n", [Name]) end,
      lists:foreach(PrintTopic, emqttd:topics());
  topics(["show", Topic]) ->
      Found = mnesia:dirty_read(mqtt_route, bin(Topic)),
      print(Found);
  topics(_) ->
      ?USAGE([{"topics list", "List all topics"},
              {"topics show <Topic>", "Show a topic"}]).
  %% Subscriptions command.
  %% "list" prints the whole mqtt_subscription table, "show <ClientId>"
  %% prints one client's entries, "add" subscribes a client to a topic
  %% after validating the QoS argument, "del" removes either all of a
  %% client's subscriptions (subscriber_down/1) or a single one
  %% (unsubscribe/2) and prints the returned value. Anything else prints
  %% usage.
  subscriptions(["list"]) ->
      All = ets:tab2list(mqtt_subscription),
      lists:foreach(fun(Entry) -> print(subscription, Entry) end, All);
  subscriptions(["show", ClientId]) ->
      case ets:lookup(mqtt_subscription, bin(ClientId)) of
          [] ->
              ?PRINT_MSG("Not Found.~n");
          Entries ->
              lists:map(fun(Entry) -> print(subscription, Entry) end, Entries)
      end;
  subscriptions(["add", ClientId, Topic, QoS]) ->
      if_valid_qos(
        QoS,
        fun(IntQos) ->
                case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
                    {error, Reason} ->
                        ?PRINT("Error: ~p~n", [Reason]);
                    ok ->
                        ?PRINT_MSG("ok~n")
                end
        end);
  subscriptions(["del", ClientId]) ->
      Result = emqttd:subscriber_down(bin(ClientId)),
      ?PRINT("~p~n", [Result]);
  subscriptions(["del", ClientId, Topic]) ->
      Result = emqttd:unsubscribe(bin(Topic), bin(ClientId)),
      ?PRINT("~p~n", [Result]);
  subscriptions(_) ->
      ?USAGE([{"subscriptions list", "List all subscriptions"},
              {"subscriptions show <ClientId>", "Show subscriptions of a client"},
              {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
              {"subscriptions del <ClientId>", "Delete static subscriptions manually"},
              {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
  211. % if_could_print(Tab, Fun) ->
  212. % case mnesia:table_info(Tab, size) of
  213. % Size when Size >= ?MAX_LIMIT ->
  214. % ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]);
  215. % _Size ->
  216. % Keys = mnesia:dirty_all_keys(Tab),
  217. % foreach(fun(Key) -> Fun(ets:lookup(Tab, Key)) end, Keys)
  218. % end.
  %% Parse the QoS command-line argument and apply Fun to it.
  %%
  %% QoS is the raw argument string. When it parses as an integer that
  %% satisfies ?IS_QOS, Fun is called with that integer and its result is
  %% returned; otherwise a hint is printed.
  %%
  %% Only the badarg raised by list_to_integer/1 is handled. Fun runs in
  %% the unprotected `of' section, so errors raised by Fun still propagate.
  if_valid_qos(QoS, Fun) ->
      try list_to_integer(QoS) of
          Int when ?IS_QOS(Int) ->
              Fun(Int);
          _OutOfRange ->
              ?PRINT_MSG("QoS should be 0, 1, 2~n")
      catch
          error:badarg ->
              ?PRINT_MSG("QoS should be 0, 1, 2~n")
      end.
  %% Plugins command.
  %% "list" prints every plugin returned by emqttd_plugins:list/0;
  %% "load <Plugin>" / "unload <Plugin>" call emqttd_plugins with the name
  %% converted to an atom and report the outcome. Anything else prints
  %% usage.
  plugins(["list"]) ->
      lists:foreach(fun print/1, emqttd_plugins:list());
  plugins(["load", Name]) ->
      Plugin = list_to_atom(Name),
      case emqttd_plugins:load(Plugin) of
          {error, Reason} ->
              ?PRINT("load plugin error: ~p~n", [Reason]);
          {ok, StartedApps} ->
              ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name])
      end;
  plugins(["unload", Name]) ->
      Plugin = list_to_atom(Name),
      case emqttd_plugins:unload(Plugin) of
          {error, Reason} ->
              ?PRINT("unload plugin error: ~p~n", [Reason]);
          ok ->
              ?PRINT("Plugin ~s unloaded successfully.~n", [Name])
      end;
  plugins(_) ->
      ?USAGE([{"plugins list", "Show loaded plugins"},
              {"plugins load <Plugin>", "Load plugin"},
              {"plugins unload <Plugin>", "Unload plugin"}]).
  %%--------------------------------------------------------------------
  %% @doc Bridges command.
  %% "list" prints each bridge known to emqttd_bridge_sup_sup, "options"
  %% prints the accepted option syntax, "start" (with or without an
  %% option string) and "stop" call emqttd_bridge_sup_sup with the node
  %% as an atom and the topic as a binary. Anything else prints usage.
  bridges(["list"]) ->
      Local = fun({Node, Topic, _Pid}) ->
                      ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node])
              end,
      lists:foreach(Local, emqttd_bridge_sup_sup:bridges());
  bridges(["options"]) ->
      ?PRINT_MSG("Options:~n"),
      ?PRINT_MSG(" prefix = string~n"),
      ?PRINT_MSG(" suffix = string~n"),
      ?PRINT_MSG(" queue = integer~n"),
      ?PRINT_MSG("Example:~n"),
      ?PRINT_MSG(" prefix=abc/,suffix=/yxz,queue=1000~n");
  bridges(["start", SNode, Topic]) ->
      Node = list_to_atom(SNode),
      case emqttd_bridge_sup_sup:start_bridge(Node, list_to_binary(Topic)) of
          {error, Error} ->
              ?PRINT("error: ~p~n", [Error]);
          {ok, _} ->
              ?PRINT_MSG("bridge is started.~n")
      end;
  bridges(["start", SNode, Topic, OptStr]) ->
      %% Options are parsed first, before the node/topic conversions.
      Opts = parse_opts(bridge, OptStr),
      Node = list_to_atom(SNode),
      case emqttd_bridge_sup_sup:start_bridge(Node, list_to_binary(Topic), Opts) of
          {error, Error} ->
              ?PRINT("error: ~p~n", [Error]);
          {ok, _} ->
              ?PRINT_MSG("bridge is started.~n")
      end;
  bridges(["stop", SNode, Topic]) ->
      Node = list_to_atom(SNode),
      case emqttd_bridge_sup_sup:stop_bridge(Node, list_to_binary(Topic)) of
          {error, Error} ->
              ?PRINT("error: ~p~n", [Error]);
          ok ->
              ?PRINT_MSG("bridge is stopped.~n")
      end;
  bridges(_) ->
      ?USAGE([{"bridges list", "List bridges"},
              {"bridges options", "Bridge options"},
              {"bridges start <Node> <Topic>", "Start a bridge"},
              {"bridges start <Node> <Topic> <Options>", "Start a bridge with options"},
              {"bridges stop <Node> <Topic>", "Stop a bridge"}]).
  %% Parse a comma-separated "key=value" option string for command Cmd
  %% into a list of {Key, Value} options.
  %%
  %% Segments that do not split into exactly one key and one value are
  %% skipped. Unknown keys are reported through parse_opt/3 and left out
  %% of the result, so the returned list only ever holds real options.
  parse_opts(Cmd, OptStr) ->
      Pairs = [string:tokens(Segment, "=") || Segment <- string:tokens(OptStr, ",")],
      Parsed = [parse_opt(Cmd, opt_key(Name), Val) || [Name, Val] <- Pairs],
      [Opt || {_Key, _Value} = Opt <- Parsed].

  %% Map an option name to an atom without creating new atoms from user
  %% input. Names with no existing atom are kept as strings, which then
  %% reach the "Bad Option" clause of parse_opt/3.
  opt_key(Name) ->
      try
          list_to_existing_atom(Name)
      catch
          error:badarg -> Name
      end.

  %% Translate one option of command Cmd into the {Key, Value} pair passed
  %% on to the command. Returns bad_option after printing a message when
  %% the option is not recognised.
  parse_opt(bridge, suffix, Suffix) ->
      {topic_suffix, bin(Suffix)};
  parse_opt(bridge, prefix, Prefix) ->
      {topic_prefix, bin(Prefix)};
  parse_opt(bridge, queue, Len) ->
      %% A non-numeric length raises badarg from list_to_integer/1.
      {max_queue_len, list_to_integer(Len)};
  parse_opt(_Cmd, Opt, _Val) ->
      ?PRINT("Bad Option: ~s~n", [Opt]),
      bad_option.
  %%--------------------------------------------------------------------
  %% @doc vm command.
  %% No arguments behaves like "all", which runs the load, memory,
  %% process, io and ports sub-commands in that order. Each sub-command
  %% prints one "category/name: value" line per item. Anything else
  %% prints usage.
  vm([]) ->
      vm(["all"]);
  vm(["all"]) ->
      Sections = ["load", "memory", "process", "io", "ports"],
      lists:map(fun(Section) -> vm([Section]) end, Sections);
  vm(["load"]) ->
      Loads = emqttd_vm:loads(),
      [?PRINT("cpu/~-20s: ~s~n", [Label, Value]) || {Label, Value} <- Loads];
  vm(["memory"]) ->
      Memory = erlang:memory(),
      [?PRINT("memory/~-17s: ~w~n", [Type, Bytes]) || {Type, Bytes} <- Memory];
  vm(["process"]) ->
      lists:foreach(
        fun({Label, Item}) ->
                ?PRINT("process/~-16s: ~w~n", [Label, erlang:system_info(Item)])
        end, [{limit, process_limit}, {count, process_count}]);
  vm(["io"]) ->
      CheckIo = erlang:system_info(check_io),
      lists:foreach(
        fun(Item) ->
                ?PRINT("io/~-21s: ~w~n", [Item, proplists:get_value(Item, CheckIo)])
        end, [max_fds, active_fds]);
  vm(["ports"]) ->
      lists:foreach(
        fun({Label, Item}) ->
                ?PRINT("ports/~-16s: ~w~n", [Label, erlang:system_info(Item)])
        end, [{count, port_count}, {limit, port_limit}]);
  vm(_) ->
      ?USAGE([{"vm all", "Show info of Erlang VM"},
              {"vm load", "Show load of Erlang VM"},
              {"vm memory", "Show memory of Erlang VM"},
              {"vm process", "Show process of Erlang VM"},
              {"vm io", "Show IO of Erlang VM"},
              {"vm ports", "Show Ports of Erlang VM"}]).
  %%--------------------------------------------------------------------
  %% @doc mnesia command.
  %% With no arguments, calls mnesia:system_info/0; any other argument
  %% list prints usage.
  mnesia(Args) ->
      case Args of
          [] ->
              mnesia:system_info();
          _ ->
              ?PRINT_CMD("mnesia", "Mnesia system info")
      end.
  %%--------------------------------------------------------------------
  %% @doc Trace command.
  %% "list" prints every trace returned by emqttd_trace:all_traces/0.
  %% "client <ClientId> <LogFile>" / "topic <Topic> <LogFile>" start a
  %% trace; the literal "off" in place of the log file stops it (the "off"
  %% clauses come first so that "off" is never taken as a file name).
  %% Anything else prints usage.
  trace(["list"]) ->
      Traces = emqttd_trace:all_traces(),
      lists:foreach(
        fun({{Who, Name}, LogFile}) ->
                ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
        end, Traces);
  trace(["client", ClientId, "off"]) ->
      trace_off(client, ClientId);
  trace(["client", ClientId, LogFile]) ->
      trace_on(client, ClientId, LogFile);
  trace(["topic", Topic, "off"]) ->
      trace_off(topic, Topic);
  trace(["topic", Topic, LogFile]) ->
      trace_on(topic, Topic, LogFile);
  trace(_) ->
      ?USAGE([{"trace list", "List all traces"},
              {"trace client <ClientId> <LogFile>", "Trace a client"},
              {"trace client <ClientId> off", "Stop tracing a client"},
              {"trace topic <Topic> <LogFile>", "Trace a topic"},
              {"trace topic <Topic> off", "Stop tracing a Topic"}]).
  %% Start tracing {Who, Name} (Name converted to a binary) into LogFile
  %% through emqttd_trace:start_trace/2 and print the outcome.
  trace_on(Who, Name, LogFile) ->
      Target = {Who, iolist_to_binary(Name)},
      case emqttd_trace:start_trace(Target, LogFile) of
          {error, Error} ->
              ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]);
          ok ->
              ?PRINT("trace ~s ~s successfully.~n", [Who, Name])
      end.
  %% Stop tracing {Who, Name} (Name converted to a binary) through
  %% emqttd_trace:stop_trace/1 and print the outcome.
  trace_off(Who, Name) ->
      Target = {Who, iolist_to_binary(Name)},
      case emqttd_trace:stop_trace(Target) of
          {error, Error} ->
              ?PRINT("stop tracing ~s ~s error: ~p.~n", [Who, Name, Error]);
          ok ->
              ?PRINT("stop tracing ~s ~s successfully.~n", [Who, Name])
      end.
  %%--------------------------------------------------------------------
  %% @doc Listeners command.
  %% No arguments: print acceptors, max/current clients and shutdown
  %% counts for every listener returned by esockd:listeners/0.
  %% "restart <Proto> <ListenOn>" / "stop <Proto> <ListenOn>": restart or
  %% stop a listener through emqttd_app. ListenOn is either "Port" or
  %% "IP:Port"; anything else is rejected with a message instead of
  %% crashing the command. Any other argument list prints usage.
  listeners([]) ->
      lists:foreach(
        fun({{Protocol, ListenOn}, Pid}) ->
                Info = [{acceptors, esockd:get_acceptors(Pid)},
                        {max_clients, esockd:get_max_clients(Pid)},
                        {current_clients, esockd:get_current_clients(Pid)},
                        {shutdown_count, esockd:get_shutdown_count(Pid)}],
                ?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
                lists:foreach(
                  fun({Key, Val}) ->
                          ?PRINT(" ~-16s: ~w~n", [Key, Val])
                  end, Info)
        end, esockd:listeners());
  listeners(["restart", Proto, ListenOn]) ->
      if_valid_listen_on(
        ListenOn,
        fun(ListenOn1) ->
                case emqttd_app:restart_listener({list_to_atom(Proto), ListenOn1, []}) of
                    {ok, _Pid} ->
                        ?PRINT("Restart ~s listener on ~s successfully.~n",
                               [Proto, ListenOn]);
                    {error, Error} ->
                        ?PRINT("Failed to restart ~s listener on ~s, error:~p~n",
                               [Proto, ListenOn, Error])
                end
        end);
  listeners(["stop", Proto, ListenOn]) ->
      if_valid_listen_on(
        ListenOn,
        fun(ListenOn1) ->
                case emqttd_app:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
                    ok ->
                        ?PRINT("Stop ~s listener on ~s successfully.~n",
                               [Proto, ListenOn]);
                    {error, Error} ->
                        ?PRINT("Failed to stop ~s listener on ~s, error:~p~n",
                               [Proto, ListenOn, Error])
                end
        end);
  listeners(_) ->
      ?USAGE([{"listeners", "List listeners"},
              {"listeners restart <Proto> <Port>", "Restart a listener"},
              {"listeners stop <Proto> <Port>", "Stop a listener"}]).

  %% Apply Fun to the parsed form of the ListenOn argument, or print an
  %% error when the argument is malformed.
  if_valid_listen_on(ListenOn, Fun) ->
      case parse_listen_on(ListenOn) of
          {ok, Parsed} ->
              Fun(Parsed);
          error ->
              ?PRINT("Invalid listener address: ~s~n", [ListenOn])
      end.

  %% Parse "Port" into an integer or "IP:Port" into {IP, Port} (IP kept as
  %% a string). Returns {ok, Parsed}, or error for any other shape or a
  %% non-numeric port.
  parse_listen_on(ListenOn) ->
      try string:tokens(ListenOn, ":") of
          [Port] ->
              {ok, list_to_integer(Port)};
          [IP, Port] ->
              {ok, {IP, list_to_integer(Port)}};
          _ ->
              error
      catch
          error:badarg ->
              error
      end.
  %%--------------------------------------------------------------------
  %% Dump ETS
  %%--------------------------------------------------------------------

  %% Print every object stored in the ETS table Table.
  %%
  %% The table is fixed with ets:safe_fixtable/2 for the duration of the
  %% walk. Without fixing, ets:next/2 can fail with badarg when the
  %% current key is deleted by another process between the two calls.
  %% The fixation is always released, even if printing raises.
  dump(Table) ->
      true = ets:safe_fixtable(Table, true),
      try
          dump(Table, ets:first(Table))
      after
          ets:safe_fixtable(Table, false)
      end.

  %% Walk the table from Key onwards, printing every object found under
  %% each key (none if the key was removed meanwhile, several for bag
  %% tables). Returns ok at the end of the table.
  dump(_Table, '$end_of_table') ->
      ok;
  dump(Table, Key) ->
      lists:foreach(fun print/1, ets:lookup(Table, Key)),
      dump(Table, ets:next(Table, Key)).
  %% Print one item in human-readable form. Accepted shapes: an empty
  %% list, a non-empty list of #mqtt_route{} records (one line with all
  %% nodes), a #mqtt_plugin{} or #mqtt_client{} record, {route, Routes} /
  %% {local_route, Routes} wrappers, a single #mqtt_route{}, a
  %% {Topic, Node} pair, or a 4-tuple session entry.
  %% Clause order matters: the {route, _} and {local_route, _} clauses
  %% must come before the generic {Topic, Node} clause.
  print([]) ->
      ok;
  print([#mqtt_route{topic = Topic} | _] = Routes) ->
      NodeNames = [atom_to_list(N) || #mqtt_route{node = N} <- Routes],
      ?PRINT("~s -> ~s~n", [Topic, string:join(NodeNames, ",")]);
  %% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
  %% TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
  %% || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
  %% ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);
  %% print(Topics = [#mqtt_topic{}|_]) ->
  %% foreach(fun print/1, Topics);
  print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
      ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
             [Name, Ver, Descr, Active]);
  print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username,
                     peername = Peername, connected_at = ConnectedAt}) ->
      Peer = emqttd_net:format(Peername),
      Since = emqttd_time:now_secs(ConnectedAt),
      ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
             [ClientId, CleanSess, Username, Peer, Since]);
  %% print(#mqtt_topic{topic = Topic, flags = Flags}) ->
  %% ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);
  print({route, Routes}) ->
      lists:foreach(fun print/1, Routes);
  print({local_route, Routes}) ->
      lists:foreach(fun print/1, Routes);
  print(#mqtt_route{topic = Topic, node = Node}) ->
      ?PRINT("~s -> ~s~n", [Topic, Node]);
  print({Topic, Node}) ->
      ?PRINT("~s -> ~s~n", [Topic, Node]);
  print({ClientId, _ClientPid, _Persistent, SessInfo}) ->
      %% Session info comes first so its values win over the stats on
      %% duplicate keys.
      Data = SessInfo ++ emqttd_stats:get_session_stats(ClientId),
      Values = [format(Key, proplists:get_value(Key, Data))
                || Key <- [clean_sess, subscriptions, max_inflight, inflight_len,
                           mqueue_len, mqueue_dropped, awaiting_rel_len,
                           deliver_msg, enqueue_msg, created_at]],
      ?PRINT("Session(~s, clean_sess=~s, subscriptions=~w, max_inflight=~w, inflight=~w, "
             "mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, "
             "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n",
             [ClientId | Values]).
  %% Print one subscription entry as "Subscriber -> Topic". The topic may
  %% be wrapped as {Share, Topic}, in which case only the inner topic is
  %% shown.
  print(subscription, {Sub, {_Share, Topic}}) ->
      print_subscription(Sub, Topic);
  print(subscription, {Sub, Topic}) ->
      print_subscription(Sub, Topic).

  %% Pids are printed with ~p, any other subscriber with ~s.
  print_subscription(Sub, Topic) when is_pid(Sub) ->
      ?PRINT("~p -> ~s~n", [Sub, Topic]);
  print_subscription(Sub, Topic) ->
      ?PRINT("~s -> ~s~n", [Sub, Topic]).
  %% Convert a session info value for display: created_at goes through
  %% emqttd_time:now_secs/1, every other key's value is returned as is.
  format(created_at, Timestamp) ->
      emqttd_time:now_secs(Timestamp);
  format(_Key, Value) ->
      Value.
  %% Convert a command-line string (or any iolist) to a binary.
  bin(Chars) ->
      erlang:iolist_to_binary(Chars).