emqttd_cli.erl 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_cli).
  17. -include("emqttd.hrl").
  18. -include("emqttd_cli.hrl").
  19. -include("emqttd_protocol.hrl").
  20. -import(lists, [foreach/2]).
  21. -import(proplists, [get_value/2]).
  22. -export([load/0]).
  23. -export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
  24. routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
  25. listeners/1, vm/1, mnesia/1, trace/1]).
  %% Items requested from erlang:process_info/2 by `broker pubsub`.
  -define(PROC_INFOKEYS, [status, memory, message_queue_len, total_heap_size,
                          heap_size, stack_size, reductions]).

  %% Table size at or above which if_could_print/2 refuses to list a table.
  -define(MAX_LIMIT, 10000).

  %% Application name looked up by status/1.
  -define(APP, emqttd).
  %% @doc Register every exported function accepted by is_cmd/1 as a
  %% command with emqttd_ctl. Returns the list of register_cmd/3 results.
  load() ->
      Exports = ?MODULE:module_info(exports),
      Cmds = [Name || {Name, _Arity} <- Exports, is_cmd(Name)],
      lists:map(fun(Cmd) ->
                    emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, [])
                end, Cmds).
  %% @doc true unless the exported name is one of the non-command
  %% functions (init, load, module_info).
  is_cmd(init)        -> false;
  is_cmd(load)        -> false;
  is_cmd(module_info) -> false;
  is_cmd(_Other)      -> true.
  40. %%--------------------------------------------------------------------
  41. %% Commands
  42. %%--------------------------------------------------------------------
  %%--------------------------------------------------------------------
  %% @doc Node status
  %%
  %% With no arguments, prints the internal status reported by
  %% init:get_status/0 and whether the emqttd application appears in
  %% application:which_applications/0. Any other arguments print the usage.
  status([]) ->
      {InternalStatus, _ProvidedStatus} = init:get_status(),
      ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
      Running = application:which_applications(),
      case lists:keyfind(?APP, 1, Running) of
          {?APP, _Desc, Vsn} ->
              ?PRINT("emqttd ~s is running~n", [Vsn]);
          false ->
              ?PRINT_MSG("emqttd is not running~n")
      end;
  status(_) ->
      ?PRINT_CMD("status", "Show broker status").
  %%--------------------------------------------------------------------
  %% @doc Query broker
  %%
  %% The argument list selects the report:
  %%   []          - sysdescr, version, uptime and datetime of emqttd_broker
  %%   ["stats"]   - each {Stat, Val} returned by emqttd_stats:getstats/0
  %%   ["metrics"] - each {Metric, Val} of emqttd_metrics:all/0, sorted
  %%   ["pubsub"]  - process_info of every child of the pubsub pool
  %% Anything else prints the usage.
  broker([]) ->
      lists:foreach(fun(Fun) ->
                        ?PRINT("~-10s: ~s~n", [Fun, emqttd_broker:Fun()])
                    end, [sysdescr, version, uptime, datetime]);
  broker(["stats"]) ->
      lists:foreach(fun({Stat, Val}) ->
                        ?PRINT("~-20s: ~w~n", [Stat, Val])
                    end, emqttd_stats:getstats());
  broker(["metrics"]) ->
      lists:foreach(fun({Metric, Val}) ->
                        ?PRINT("~-24s: ~w~n", [Metric, Val])
                    end, lists:sort(emqttd_metrics:all()));
  broker(["pubsub"]) ->
      Pubsubs = supervisor:which_children(emqttd_pubsub_sup:pubsub_pool()),
      lists:foreach(fun print_pubsub/1, lists:reverse(Pubsubs));
  broker(_) ->
      ?USAGE([{"broker", "Show broker version, uptime and description"},
              {"broker pubsub", "Show process_info of pubsub"},
              {"broker stats", "Show broker statistics of clients, topics, subscribers"},
              {"broker metrics", "Show broker metrics"}]).

  %% Print one child of the pubsub pool supervisor.
  %% supervisor:which_children/1 may report `undefined` or `restarting`
  %% instead of a pid, and erlang:process_info/2 returns `undefined` once
  %% the process has exited; in both cases only the id line is printed
  %% instead of crashing the command.
  print_pubsub({{_, Id}, Pid, _, _}) when is_pid(Pid) ->
      ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
      ?PRINT("pubsub: ~w~n", [Id]),
      case ProcInfo of
          undefined ->
              ok;
          _ ->
              lists:foreach(fun({Key, Val}) ->
                                ?PRINT(" ~-18s: ~w~n", [Key, Val])
                            end, ProcInfo)
      end;
  print_pubsub({{_, Id}, _NotRunning, _, _}) ->
      ?PRINT("pubsub: ~w~n", [Id]).
  %%--------------------------------------------------------------------
  %% @doc Cluster with other nodes
  %%
  %% join/leave/remove delegate to emqttd_cluster and, on success, print
  %% the cluster status afterwards. Unknown arguments print the usage.
  cluster(["join", NodeStr]) ->
      Node = emqttd_node:parse_name(NodeStr),
      case emqttd_cluster:join(Node) of
          {error, Reason} ->
              ?PRINT("Failed to join the cluster: ~p~n", [Reason]);
          ok ->
              ?PRINT_MSG("Join the cluster successfully.~n"),
              cluster(["status"])
      end;
  cluster(["leave"]) ->
      case emqttd_cluster:leave() of
          {error, Reason} ->
              ?PRINT("Failed to leave the cluster: ~p~n", [Reason]);
          ok ->
              ?PRINT_MSG("Leave the cluster successfully.~n"),
              cluster(["status"])
      end;
  cluster(["remove", NodeStr]) ->
      Node = emqttd_node:parse_name(NodeStr),
      case emqttd_cluster:remove(Node) of
          {error, Reason} ->
              ?PRINT("Failed to remove the node from cluster: ~p~n", [Reason]);
          ok ->
              ?PRINT_MSG("Remove the node from cluster successfully.~n"),
              cluster(["status"])
      end;
  cluster(["status"]) ->
      Status = emqttd_cluster:status(),
      ?PRINT("Cluster status: ~p~n", [Status]);
  cluster(_) ->
      ?USAGE([{"cluster join <Node>", "Join the cluster"},
              {"cluster leave", "Leave the cluster"},
              {"cluster remove <Node>","Remove the node from cluster"},
              {"cluster status", "Cluster status"}]).
  %%--------------------------------------------------------------------
  %% @doc Users usage
  %%
  %% Forwards the argument list unchanged to emq_auth_username:cli/1.
  users(Args) ->
      emq_auth_username:cli(Args).
  %%--------------------------------------------------------------------
  %% @doc Query clients
  %%
  %% "list" dumps the mqtt_client table; "show" and "kick" look the client
  %% up through if_client/2 and print it or kick its process respectively.
  clients(["list"]) ->
      dump(mqtt_client);
  clients(["show", ClientId]) ->
      if_client(ClientId, fun print/1);
  clients(["kick", ClientId]) ->
      Kick = fun(#mqtt_client{client_pid = Pid}) ->
                 emqttd_client:kick(Pid)
             end,
      if_client(ClientId, Kick);
  clients(_) ->
      ?USAGE([{"clients list", "List all clients"},
              {"clients show <ClientId>", "Show a client"},
              {"clients kick <ClientId>", "Kick out a client"}]).
  %% Look the client up via emqttd_cm:lookup/1 (client id converted to a
  %% binary) and apply Fun to it, or report that it was not found when the
  %% lookup returns `undefined`.
  if_client(ClientId, Fun) ->
      case emqttd_cm:lookup(bin(ClientId)) of
          Client when Client =/= undefined ->
              Fun(Client);
          undefined ->
              ?PRINT_MSG("Not Found.~n")
      end.
  %%--------------------------------------------------------------------
  %% @doc Sessions Command
  %%
  %% Entries of mqtt_local_session are printed by print/1 as
  %% {ClientId, ClientPid, CleanSess, SessInfo}, so the clean_sess flag is
  %% the THIRD element. The persistent/transient filters therefore match on
  %% position 3; matching on position 2 (the pid) can never succeed.
  sessions(["list"]) ->
      dump(mqtt_local_session);

  %% performance issue? (match_object scans the whole table)
  sessions(["list", "persistent"]) ->
      Persistent = ets:match_object(mqtt_local_session, {'_', '_', false, '_'}),
      lists:foreach(fun print/1, Persistent);

  %% performance issue? (match_object scans the whole table)
  sessions(["list", "transient"]) ->
      Transient = ets:match_object(mqtt_local_session, {'_', '_', true, '_'}),
      lists:foreach(fun print/1, Transient);

  sessions(["show", ClientId]) ->
      case ets:lookup(mqtt_local_session, bin(ClientId)) of
          []         -> ?PRINT_MSG("Not Found.~n");
          [SessInfo] -> print(SessInfo)
      end;

  sessions(_) ->
      ?USAGE([{"sessions list", "List all sessions"},
              {"sessions list persistent", "List all persistent sessions"},
              {"sessions list transient", "List all transient sessions"},
              {"sessions show <ClientId>", "Show a session"}]).
  %%--------------------------------------------------------------------
  %% @doc Routes Command
  %%
  %% "list" prints every route through if_could_print/2 (which refuses
  %% very large tables); "show" prints the routes read for one topic.
  routes(["list"]) ->
      if_could_print(mqtt_route, fun print/1);
  routes(["show", Topic]) ->
      Routes = mnesia:dirty_read(mqtt_route, bin(Topic)),
      print(Routes);
  routes(_) ->
      ?USAGE([{"routes list", "List all routes"},
              {"routes show <Topic>", "Show a route"}]).
  %%--------------------------------------------------------------------
  %% @doc Topics Command
  %%
  %% "list" prints each topic returned by emqttd:topics/0 on its own line;
  %% "show" prints the mqtt_route entries read for the given topic.
  topics(["list"]) ->
      PrintTopic = fun(Topic) -> ?PRINT("~s~n", [Topic]) end,
      lists:foreach(PrintTopic, emqttd:topics());
  topics(["show", Topic]) ->
      Routes = mnesia:dirty_read(mqtt_route, bin(Topic)),
      print(Routes);
  topics(_) ->
      ?USAGE([{"topics list", "List all topics"},
              {"topics show <Topic>", "Show a topic"}]).
  %% @doc Subscriptions Command
  %%
  %% NOTE: "list" currently iterates over an empty list (the call to
  %% emqttd:subscriptions/0 is disabled), so it prints nothing. The
  %% add/del clauses are disabled as well, although the usage text still
  %% mentions them; such invocations fall through to the usage clause.
  subscriptions(["list"]) ->
      PrintSub = fun(Subscription) -> print(subscription, Subscription) end,
      lists:foreach(PrintSub, []); %%emqttd:subscriptions());

  subscriptions(["show", ClientId]) ->
      case ets:lookup(mqtt_subscription, bin(ClientId)) of
          [] ->
              ?PRINT_MSG("Not Found.~n");
          Records ->
              [print(subscription, Subscription) || Subscription <- Records]
      end;

  %%
  %% subscriptions(["add", ClientId, Topic, QoS]) ->
  %%     Add = fun(IntQos) ->
  %%               Subscription = #mqtt_subscription{subid = bin(ClientId),
  %%                                                 topic = bin(Topic),
  %%                                                 qos   = IntQos},
  %%               case emqttd_backend:add_subscription(Subscription) of
  %%                   ok ->
  %%                       ?PRINT_MSG("ok~n");
  %%                   {error, already_existed} ->
  %%                       ?PRINT_MSG("Error: already existed~n");
  %%                   {error, Reason} ->
  %%                       ?PRINT("Error: ~p~n", [Reason])
  %%               end
  %%           end,
  %%     if_valid_qos(QoS, Add);
  %%
  %% subscriptions(["del", ClientId]) ->
  %%     Ok = emqttd_backend:del_subscriptions(bin(ClientId)),
  %%     ?PRINT("~p~n", [Ok]);
  %%
  %% subscriptions(["del", ClientId, Topic]) ->
  %%     Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)),
  %%     ?PRINT("~p~n", [Ok]);
  %%

  subscriptions(_) ->
      ?USAGE([{"subscriptions list", "List all subscriptions"},
              {"subscriptions show <ClientId>", "Show subscriptions of a client"},
              {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
              {"subscriptions del <ClientId>", "Delete static subscriptions manually"},
              {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
  %% Apply Fun to the objects stored under every key of Tab, unless the
  %% table holds ?MAX_LIMIT records or more, in which case only a
  %% "too many" message is printed.
  if_could_print(Tab, Fun) ->
      Size = mnesia:table_info(Tab, size),
      if
          Size >= ?MAX_LIMIT ->
              ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]);
          true ->
              PrintKey = fun(Key) -> Fun(ets:lookup(Tab, Key)) end,
              lists:foreach(PrintKey, mnesia:dirty_all_keys(Tab))
      end.
  %% Parse QoS (a string) as an integer and call Fun with it when it
  %% satisfies ?IS_QOS; otherwise print a hint. Only the conversion is
  %% protected by the try: exceptions raised by Fun itself propagate.
  if_valid_qos(QoS, Fun) ->
      try list_to_integer(QoS) of
          Level when ?IS_QOS(Level) ->
              Fun(Level);
          _Invalid ->
              ?PRINT_MSG("QoS should be 0, 1, 2~n")
      catch
          _:_ ->
              ?PRINT_MSG("QoS should be 0, 1, 2~n")
      end.
  %% @doc Plugins Command
  %%
  %% "list" prints every plugin returned by emqttd_plugins:list/0;
  %% "load"/"unload" convert the given name to an atom and delegate to
  %% emqttd_plugins, reporting the outcome.
  plugins(["list"]) ->
      lists:foreach(fun print/1, emqttd_plugins:list());
  plugins(["load", Name]) ->
      Plugin = list_to_atom(Name),
      case emqttd_plugins:load(Plugin) of
          {error, Reason} ->
              ?PRINT("load plugin error: ~p~n", [Reason]);
          {ok, StartedApps} ->
              ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name])
      end;
  plugins(["unload", Name]) ->
      Plugin = list_to_atom(Name),
      case emqttd_plugins:unload(Plugin) of
          {error, Reason} ->
              ?PRINT("unload plugin error: ~p~n", [Reason]);
          ok ->
              ?PRINT("Plugin ~s unloaded successfully.~n", [Name])
      end;
  plugins(_) ->
      ?USAGE([{"plugins list", "Show loaded plugins"},
              {"plugins load <Plugin>", "Load plugin"},
              {"plugins unload <Plugin>", "Unload plugin"}]).
  %%--------------------------------------------------------------------
  %% @doc Bridges command
  %%
  %% "list" prints each {Node, Topic, Pid} of emqttd_bridge_sup_sup:bridges/0,
  %% "options" prints the option syntax accepted by "start", and
  %% "start"/"stop" delegate to emqttd_bridge_sup_sup with the node name
  %% converted to an atom and the topic to a binary.
  bridges(["list"]) ->
      PrintBridge = fun({Node, Topic, _Pid}) ->
                        ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node])
                    end,
      lists:foreach(PrintBridge, emqttd_bridge_sup_sup:bridges());
  bridges(["options"]) ->
      ?PRINT_MSG("Options:~n"),
      ?PRINT_MSG(" qos = 0 | 1 | 2~n"),
      ?PRINT_MSG(" prefix = string~n"),
      ?PRINT_MSG(" suffix = string~n"),
      ?PRINT_MSG(" queue = integer~n"),
      ?PRINT_MSG("Example:~n"),
      ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
  bridges(["start", SNode, Topic]) ->
      Node = list_to_atom(SNode),
      case emqttd_bridge_sup_sup:start_bridge(Node, list_to_binary(Topic)) of
          {error, Reason} -> ?PRINT("error: ~p~n", [Reason]);
          {ok, _}         -> ?PRINT_MSG("bridge is started.~n")
      end;
  bridges(["start", SNode, Topic, OptStr]) ->
      %% Options are parsed before the node/topic conversion, as before.
      Opts = parse_opts(bridge, OptStr),
      Node = list_to_atom(SNode),
      case emqttd_bridge_sup_sup:start_bridge(Node, list_to_binary(Topic), Opts) of
          {error, Reason} -> ?PRINT("error: ~p~n", [Reason]);
          {ok, _}         -> ?PRINT_MSG("bridge is started.~n")
      end;
  bridges(["stop", SNode, Topic]) ->
      Node = list_to_atom(SNode),
      case emqttd_bridge_sup_sup:stop_bridge(Node, list_to_binary(Topic)) of
          {error, Reason} -> ?PRINT("error: ~p~n", [Reason]);
          ok              -> ?PRINT_MSG("bridge is stopped.~n")
      end;
  bridges(_) ->
      ?USAGE([{"bridges list", "List bridges"},
              {"bridges options", "Bridge options"},
              {"bridges start <Node> <Topic>", "Start a bridge"},
              {"bridges start <Node> <Topic> <Options>", "Start a bridge with options"},
              {"bridges stop <Node> <Topic>", "Stop a bridge"}]).
  %% Parse a comma separated "key=value" option string into a list of
  %% option tuples for Cmd.
  %%
  %% Tokens that do not split into exactly one key and one value are
  %% skipped. An unknown key is reported by parse_opt/3 and is NOT added
  %% to the result: parse_opt/3 returns the value of ?PRINT for it, and
  %% only tuples are kept, so that value no longer leaks into the option
  %% list passed on to the caller.
  parse_opts(Cmd, OptStr) ->
      Pairs = [string:tokens(Token, "=") || Token <- string:tokens(OptStr, ",")],
      [Parsed || [Opt, Val] <- Pairs,
                 Parsed <- [parse_opt(Cmd, list_to_atom(Opt), Val)],
                 is_tuple(Parsed)].

  %% Translate one option of Cmd into its internal {Key, Value} form.
  %% Integer-valued options crash with badarg on a non-numeric value.
  %% The last clause reports an unrecognised option and returns whatever
  %% ?PRINT returns (not a tuple).
  parse_opt(bridge, qos, Qos) ->
      {qos, list_to_integer(Qos)};
  parse_opt(bridge, suffix, Suffix) ->
      {topic_suffix, bin(Suffix)};
  parse_opt(bridge, prefix, Prefix) ->
      {topic_prefix, bin(Prefix)};
  parse_opt(bridge, queue, Len) ->
      {max_queue_len, list_to_integer(Len)};
  parse_opt(_Cmd, Opt, _Val) ->
      ?PRINT("Bad Option: ~s~n", [Opt]).
  %%--------------------------------------------------------------------
  %% @doc vm command
  %%
  %% With no arguments (or "all") every section is printed in the order
  %% load, memory, process, io, ports; a single section name prints only
  %% that section.
  vm([]) ->
      vm(["all"]);
  vm(["all"]) ->
      Sections = ["load", "memory", "process", "io", "ports"],
      lists:map(fun(Section) -> vm([Section]) end, Sections);
  vm(["load"]) ->
      [?PRINT("cpu/~-20s: ~s~n", [Label, Value])
       || {Label, Value} <- emqttd_vm:loads()];
  vm(["memory"]) ->
      [?PRINT("memory/~-17s: ~w~n", [Category, Bytes])
       || {Category, Bytes} <- erlang:memory()];
  vm(["process"]) ->
      Items = [{limit, process_limit}, {count, process_count}],
      lists:foreach(fun({Name, Key}) ->
                        ?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
                    end, Items);
  vm(["io"]) ->
      IoInfo = erlang:system_info(check_io),
      lists:foreach(fun(Key) ->
                        ?PRINT("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
                    end, [max_fds, active_fds]);
  vm(["ports"]) ->
      Items = [{count, port_count}, {limit, port_limit}],
      lists:foreach(fun({Name, Key}) ->
                        ?PRINT("ports/~-16s: ~w~n", [Name, erlang:system_info(Key)])
                    end, Items);
  vm(_) ->
      ?USAGE([{"vm all", "Show info of Erlang VM"},
              {"vm load", "Show load of Erlang VM"},
              {"vm memory", "Show memory of Erlang VM"},
              {"vm process", "Show process of Erlang VM"},
              {"vm io", "Show IO of Erlang VM"},
              {"vm ports", "Show Ports of Erlang VM"}]).
  %%--------------------------------------------------------------------
  %% @doc mnesia Command
  %%
  %% Without arguments calls mnesia:system_info/0; otherwise prints usage.
  mnesia(Args) ->
      case Args of
          [] -> mnesia:system_info();
          _  -> ?PRINT_CMD("mnesia", "Mnesia system info")
      end.
  %%--------------------------------------------------------------------
  %% @doc Trace Command
  %%
  %% "list" prints every trace known to emqttd_trace. For "client" and
  %% "topic" the third argument is either the literal "off" (stop tracing)
  %% or the log file to trace into; the "off" clauses come first so that
  %% "off" is never taken as a file name.
  trace(["list"]) ->
      PrintTrace = fun({{Who, Name}, LogFile}) ->
                       ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
                   end,
      lists:foreach(PrintTrace, emqttd_trace:all_traces());
  trace(["client", ClientId, "off"]) ->
      trace_off(client, ClientId);
  trace(["client", ClientId, LogFile]) ->
      trace_on(client, ClientId, LogFile);
  trace(["topic", Topic, "off"]) ->
      trace_off(topic, Topic);
  trace(["topic", Topic, LogFile]) ->
      trace_on(topic, Topic, LogFile);
  trace(_) ->
      ?USAGE([{"trace list", "List all traces"},
              {"trace client <ClientId> <LogFile>","Trace a client"},
              {"trace client <ClientId> off", "Stop tracing a client"},
              {"trace topic <Topic> <LogFile>", "Trace a topic"},
              {"trace topic <Topic> off", "Stop tracing a Topic"}]).
  %% Start tracing {Who, Name} into LogFile via emqttd_trace:start_trace/2
  %% (Name is converted to a binary) and report the outcome.
  trace_on(Who, Name, LogFile) ->
      Target = {Who, iolist_to_binary(Name)},
      case emqttd_trace:start_trace(Target, LogFile) of
          {error, Reason} ->
              ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Reason]);
          ok ->
              ?PRINT("trace ~s ~s successfully.~n", [Who, Name])
      end.
  %% Stop tracing {Who, Name} via emqttd_trace:stop_trace/1 (Name is
  %% converted to a binary) and report the outcome.
  trace_off(Who, Name) ->
      Target = {Who, iolist_to_binary(Name)},
      case emqttd_trace:stop_trace(Target) of
          {error, Reason} ->
              ?PRINT("stop tracing ~s ~s error: ~p.~n", [Who, Name, Reason]);
          ok ->
              ?PRINT("stop tracing ~s ~s successfully.~n", [Who, Name])
      end.
  %%--------------------------------------------------------------------
  %% @doc Listeners Command
  %%
  %% For every listener reported by esockd:listeners/0, prints its
  %% protocol and address followed by acceptor and client counters.
  listeners([]) ->
      lists:foreach(fun print_listener/1, esockd:listeners());
  listeners(_) ->
      ?PRINT_CMD("listeners", "List listeners").

  %% Print one {{Protocol, ListenOn}, Pid} listener entry. The counters are
  %% collected before anything is printed.
  print_listener({{Protocol, ListenOn}, Pid}) ->
      Info = [{acceptors,      esockd:get_acceptors(Pid)},
              {max_clients,    esockd:get_max_clients(Pid)},
              {current_clients,esockd:get_current_clients(Pid)},
              {shutdown_count, esockd:get_shutdown_count(Pid)}],
      ?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
      lists:foreach(fun({Key, Val}) ->
                        ?PRINT(" ~-16s: ~w~n", [Key, Val])
                    end, Info).
  %%--------------------------------------------------------------------
  %% Dump ETS
  %%--------------------------------------------------------------------

  %% Print every object of the ETS table Table with print/1.
  %%
  %% The table is fixed with ets:safe_fixtable/2 for the duration of the
  %% traversal: without it, ets:next/2 may fail with badarg when the
  %% current key is deleted by another process between two steps. The fix
  %% is released in `after` even if printing raises.
  dump(Table) ->
      ets:safe_fixtable(Table, true),
      try
          dump(Table, ets:first(Table))
      after
          ets:safe_fixtable(Table, false)
      end.

  %% Walk the table key by key. Every object stored under the key is
  %% printed, so a key that has vanished (no objects) or a bag-type key
  %% holding several objects are both handled.
  dump(_Table, '$end_of_table') ->
      ok;
  dump(Table, Key) ->
      lists:foreach(fun print/1, ets:lookup(Table, Key)),
      dump(Table, ets:next(Table, Key)).
  %% Print one term in a human readable form. Supported shapes:
  %%   []                                     - nothing is printed
  %%   non-empty list starting with a route   - topic and its nodes
  %%   #mqtt_plugin{}, #mqtt_client{}, #mqtt_route{}
  %%   {ClientId, ClientPid, CleanSess, SessInfo} - a session entry
  print([]) ->
      ok;

  print([#mqtt_route{topic = Topic} | _] = Routes) ->
      NodeNames = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes],
      ?PRINT("~s -> ~s~n", [Topic, string:join(NodeNames, ",")]);

  %% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
  %%     TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
  %%                   || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
  %%     ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);

  %% print(Topics = [#mqtt_topic{}|_]) ->
  %%     foreach(fun print/1, Topics);

  print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
      ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
             [Name, Ver, Descr, Active]);

  print(#mqtt_client{client_id    = ClientId,
                     clean_sess   = CleanSess,
                     username     = Username,
                     peername     = Peername,
                     connected_at = ConnectedAt}) ->
      ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
             [ClientId, CleanSess, Username, emqttd_net:format(Peername),
              emqttd_time:now_to_secs(ConnectedAt)]);

  %% print(#mqtt_topic{topic = Topic, flags = Flags}) ->
  %%     ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);

  print(#mqtt_route{topic = Topic, node = Node}) ->
      ?PRINT("~s -> ~s~n", [Topic, Node]);

  print({ClientId, _ClientPid, CleanSess, SessInfo}) ->
      %% The order of the keys must follow the placeholders of the format
      %% string below.
      InfoKeys = [max_inflight, inflight_queue, message_queue, message_dropped,
                  awaiting_rel, awaiting_ack, awaiting_comp, created_at],
      Values = lists:map(fun(Key) ->
                             format(Key, proplists:get_value(Key, SessInfo))
                         end, InfoKeys),
      ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight_queue=~w, "
             "message_queue=~w, message_dropped=~w, "
             "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
             "created_at=~w)~n",
             [ClientId, CleanSess | Values]).
  %% Print one {Subscriber, Topic, Options} subscription. A pid subscriber
  %% is printed with ~p, anything else with ~s.
  print(subscription, {Subscriber, Topic, SubOpts}) when is_pid(Subscriber) ->
      ?PRINT("~p -> ~s: ~p~n", [Subscriber, Topic, SubOpts]);
  print(subscription, {Subscriber, Topic, SubOpts}) ->
      ?PRINT("~s -> ~s: ~p~n", [Subscriber, Topic, SubOpts]).
  %% Convert a session info value for display: `created_at` goes through
  %% emqttd_time:now_to_secs/1, every other value is returned unchanged.
  format(created_at, Timestamp) ->
      emqttd_time:now_to_secs(Timestamp);
  format(_Key, Value) ->
      Value.
  %% Convert a string (or any iolist) to a binary.
  bin(S) ->
      erlang:iolist_to_binary(S).