emqx_mgmt_cli.erl 33 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_mgmt_cli).
  17. -include_lib("emqx/include/emqx.hrl").
  18. -include_lib("emqx/include/emqx_cm.hrl").
  19. -include_lib("emqx/include/emqx_router.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -include_lib("emqx/include/logger.hrl").
  22. -include("emqx_mgmt.hrl").
  23. -define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])).
  24. -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
  25. -export([load/0]).
  26. -export([
  27. status/1,
  28. broker/1,
  29. cluster/1,
  30. clients/1,
  31. topics/1,
  32. subscriptions/1,
  33. plugins/1,
  34. listeners/1,
  35. vm/1,
  36. mnesia/1,
  37. trace/1,
  38. traces/1,
  39. log/1,
  40. authz/1,
  41. pem_cache/1,
  42. olp/1,
  43. data/1
  44. ]).
  45. -define(PROC_INFOKEYS, [
  46. status,
  47. memory,
  48. message_queue_len,
  49. total_heap_size,
  50. heap_size,
  51. stack_size,
  52. reductions
  53. ]).
  54. -define(MAX_LIMIT, 10000).
  55. -define(APP, emqx).
  56. -spec load() -> ok.
  57. load() ->
  58. Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
  59. lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
  60. is_cmd(Fun) ->
  61. not lists:member(Fun, [init, load, module_info]).
  62. %%--------------------------------------------------------------------
  63. %% @doc Node status
  64. status([]) ->
  65. {InternalStatus, _ProvidedStatus} = init:get_status(),
  66. emqx_ctl:print("Node ~p ~ts is ~p~n", [node(), emqx_app:get_release(), InternalStatus]);
  67. status(_) ->
  68. emqx_ctl:usage("status", "Show broker status").
  69. %%--------------------------------------------------------------------
  70. %% @doc Query broker
  71. broker([]) ->
  72. Funs = [sysdescr, version, datetime],
  73. [emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs],
  74. emqx_ctl:print("~-10s: ~ts~n", [
  75. uptime, emqx_datetime:human_readable_duration_string(emqx_sys:uptime())
  76. ]);
  77. broker(["stats"]) ->
  78. [
  79. emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
  80. || {Stat, Val} <- lists:sort(emqx_stats:getstats())
  81. ];
  82. broker(["metrics"]) ->
  83. [
  84. emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
  85. || {Metric, Val} <- lists:sort(emqx_metrics:all())
  86. ];
  87. broker(_) ->
  88. emqx_ctl:usage([
  89. {"broker", "Show broker version, uptime and description"},
  90. {"broker stats", "Show broker statistics of clients, topics, subscribers"},
  91. {"broker metrics", "Show broker metrics"}
  92. ]).
  93. %%-----------------------------------------------------------------------------
  94. %% @doc Cluster with other nodes
  95. cluster(["join", SNode]) ->
  96. case ekka:join(ekka_node:parse_name(SNode)) of
  97. ok ->
  98. emqx_ctl:print("Join the cluster successfully.~n"),
  99. cluster(["status"]);
  100. ignore ->
  101. emqx_ctl:print("Ignore.~n");
  102. {error, Error} ->
  103. emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
  104. end;
  105. cluster(["leave"]) ->
  106. case ekka:leave() of
  107. ok ->
  108. emqx_ctl:print("Leave the cluster successfully.~n"),
  109. cluster(["status"]);
  110. {error, Error} ->
  111. emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
  112. end;
  113. cluster(["force-leave", SNode]) ->
  114. case ekka:force_leave(ekka_node:parse_name(SNode)) of
  115. ok ->
  116. emqx_ctl:print("Remove the node from cluster successfully.~n"),
  117. cluster(["status"]);
  118. ignore ->
  119. emqx_ctl:print("Ignore.~n");
  120. {error, Error} ->
  121. emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
  122. end;
  123. cluster(["status"]) ->
  124. emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
  125. cluster(["status", "--json"]) ->
  126. Info = sort_map_list_fields(ekka_cluster:info()),
  127. emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
  128. cluster(_) ->
  129. emqx_ctl:usage([
  130. {"cluster join <Node>", "Join the cluster"},
  131. {"cluster leave", "Leave the cluster"},
  132. {"cluster force-leave <Node>", "Force the node leave from cluster"},
  133. {"cluster status [--json]", "Cluster status"}
  134. ]).
  135. %% sort lists for deterministic output
  136. sort_map_list_fields(Map) when is_map(Map) ->
  137. lists:foldl(
  138. fun(Field, Acc) ->
  139. sort_map_list_field(Field, Acc)
  140. end,
  141. Map,
  142. maps:keys(Map)
  143. );
  144. sort_map_list_fields(NotMap) ->
  145. NotMap.
  146. sort_map_list_field(Field, Map) ->
  147. case maps:get(Field, Map) of
  148. [_ | _] = L -> Map#{Field := lists:sort(L)};
  149. _ -> Map
  150. end.
  151. %%--------------------------------------------------------------------
  152. %% @doc Query clients
  153. clients(["list"]) ->
  154. dump(?CHAN_TAB, client);
  155. clients(["show", ClientId]) ->
  156. if_client(ClientId, fun print/1);
  157. clients(["kick", ClientId]) ->
  158. ok = emqx_cm:kick_session(bin(ClientId)),
  159. emqx_ctl:print("ok~n");
  160. clients(_) ->
  161. emqx_ctl:usage([
  162. {"clients list", "List all clients"},
  163. {"clients show <ClientId>", "Show a client"},
  164. {"clients kick <ClientId>", "Kick out a client"}
  165. ]).
  166. if_client(ClientId, Fun) ->
  167. case ets:lookup(?CHAN_TAB, (bin(ClientId))) of
  168. [] -> emqx_ctl:print("Not Found.~n");
  169. [Channel] -> Fun({client, Channel})
  170. end.
  171. %%--------------------------------------------------------------------
  172. %% @doc Topics Command
  173. topics(["list"]) ->
  174. dump(?ROUTE_TAB, emqx_topic);
  175. topics(["show", Topic]) ->
  176. Routes = ets:lookup(?ROUTE_TAB, bin(Topic)),
  177. [print({emqx_topic, Route}) || Route <- Routes];
  178. topics(_) ->
  179. emqx_ctl:usage([
  180. {"topics list", "List all topics"},
  181. {"topics show <Topic>", "Show a topic"}
  182. ]).
  183. subscriptions(["list"]) ->
  184. lists:foreach(
  185. fun(Suboption) ->
  186. print({?SUBOPTION, Suboption})
  187. end,
  188. ets:tab2list(?SUBOPTION)
  189. );
  190. subscriptions(["show", ClientId]) ->
  191. case ets:lookup(emqx_subid, bin(ClientId)) of
  192. [] ->
  193. emqx_ctl:print("Not Found.~n");
  194. [{_, Pid}] ->
  195. case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of
  196. [] -> emqx_ctl:print("Not Found.~n");
  197. Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption]
  198. end
  199. end;
  200. subscriptions(["add", ClientId, Topic, QoS]) ->
  201. if_valid_qos(QoS, fun(IntQos) ->
  202. case ets:lookup(?CHAN_TAB, bin(ClientId)) of
  203. [] ->
  204. emqx_ctl:print("Error: Channel not found!");
  205. [{_, Pid}] ->
  206. {Topic1, Options} = emqx_topic:parse(bin(Topic)),
  207. Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
  208. emqx_ctl:print("ok~n")
  209. end
  210. end);
  211. subscriptions(["del", ClientId, Topic]) ->
  212. case ets:lookup(?CHAN_TAB, bin(ClientId)) of
  213. [] ->
  214. emqx_ctl:print("Error: Channel not found!");
  215. [{_, Pid}] ->
  216. Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]},
  217. emqx_ctl:print("ok~n")
  218. end;
  219. subscriptions(_) ->
  220. emqx_ctl:usage(
  221. [
  222. {"subscriptions list", "List all subscriptions"},
  223. {"subscriptions show <ClientId>", "Show subscriptions of a client"},
  224. {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
  225. {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}
  226. ]
  227. ).
  228. if_valid_qos(QoS, Fun) ->
  229. try list_to_integer(QoS) of
  230. Int when ?IS_QOS(Int) -> Fun(Int);
  231. _ -> emqx_ctl:print("QoS should be 0, 1, 2~n")
  232. catch
  233. _:_ ->
  234. emqx_ctl:print("QoS should be 0, 1, 2~n")
  235. end.
  236. plugins(["list"]) ->
  237. emqx_plugins_cli:list(fun emqx_ctl:print/2);
  238. plugins(["describe", NameVsn]) ->
  239. emqx_plugins_cli:describe(NameVsn, fun emqx_ctl:print/2);
  240. plugins(["install", NameVsn]) ->
  241. emqx_plugins_cli:ensure_installed(NameVsn, fun emqx_ctl:print/2);
  242. plugins(["uninstall", NameVsn]) ->
  243. emqx_plugins_cli:ensure_uninstalled(NameVsn, fun emqx_ctl:print/2);
  244. plugins(["start", NameVsn]) ->
  245. emqx_plugins_cli:ensure_started(NameVsn, fun emqx_ctl:print/2);
  246. plugins(["stop", NameVsn]) ->
  247. emqx_plugins_cli:ensure_stopped(NameVsn, fun emqx_ctl:print/2);
  248. plugins(["restart", NameVsn]) ->
  249. emqx_plugins_cli:restart(NameVsn, fun emqx_ctl:print/2);
  250. plugins(["disable", NameVsn]) ->
  251. emqx_plugins_cli:ensure_disabled(NameVsn, fun emqx_ctl:print/2);
  252. plugins(["enable", NameVsn]) ->
  253. emqx_plugins_cli:ensure_enabled(NameVsn, no_move, fun emqx_ctl:print/2);
  254. plugins(["enable", NameVsn, "front"]) ->
  255. emqx_plugins_cli:ensure_enabled(NameVsn, front, fun emqx_ctl:print/2);
  256. plugins(["enable", NameVsn, "rear"]) ->
  257. emqx_plugins_cli:ensure_enabled(NameVsn, rear, fun emqx_ctl:print/2);
  258. plugins(["enable", NameVsn, "before", Other]) ->
  259. emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2);
  260. plugins(_) ->
  261. emqx_ctl:usage(
  262. [
  263. {"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
  264. {"plugins list", "List all installed plugins"},
  265. {"plugins describe Name-Vsn", "Describe an installed plugins"},
  266. {"plugins install Name-Vsn",
  267. "Install a plugin package placed\n"
  268. "in plugin'sinstall_dir"},
  269. {"plugins uninstall Name-Vsn",
  270. "Uninstall a plugin. NOTE: it deletes\n"
  271. "all files in install_dir/Name-Vsn"},
  272. {"plugins start Name-Vsn", "Start a plugin"},
  273. {"plugins stop Name-Vsn", "Stop a plugin"},
  274. {"plugins restart Name-Vsn", "Stop then start a plugin"},
  275. {"plugins disable Name-Vsn", "Disable auto-boot"},
  276. {"plugins enable Name-Vsn [Position]",
  277. "Enable auto-boot at Position in the boot list, where Position could be\n"
  278. "'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n"
  279. "The Position parameter can be used to adjust the boot order.\n"
  280. "If no Position is given, an already configured plugin\n"
  281. "will stay at is old position; a newly plugin is appended to the rear\n"
  282. "e.g. plugins disable foo-0.1.0 front\n"
  283. " plugins enable bar-0.2.0 before foo-0.1.0"}
  284. ]
  285. ).
  286. %%--------------------------------------------------------------------
  287. %% @doc vm command
  288. vm([]) ->
  289. vm(["all"]);
  290. vm(["all"]) ->
  291. [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
  292. vm(["load"]) ->
  293. [emqx_ctl:print("cpu/~-20s: ~w~n", [L, V]) || {L, V} <- emqx_vm:loads()];
  294. vm(["memory"]) ->
  295. [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
  296. vm(["process"]) ->
  297. [
  298. emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
  299. || {Name, Key} <- [{limit, process_limit}, {count, process_count}]
  300. ];
  301. vm(["io"]) ->
  302. IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
  303. [
  304. emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
  305. || Key <- [max_fds, active_fds]
  306. ];
  307. vm(["ports"]) ->
  308. [
  309. emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
  310. || {Name, Key} <- [{count, port_count}, {limit, port_limit}]
  311. ];
  312. vm(_) ->
  313. emqx_ctl:usage([
  314. {"vm all", "Show info of Erlang VM"},
  315. {"vm load", "Show load of Erlang VM"},
  316. {"vm memory", "Show memory of Erlang VM"},
  317. {"vm process", "Show process of Erlang VM"},
  318. {"vm io", "Show IO of Erlang VM"},
  319. {"vm ports", "Show Ports of Erlang VM"}
  320. ]).
  321. %%--------------------------------------------------------------------
  322. %% @doc mnesia Command
  323. mnesia([]) ->
  324. mnesia:system_info();
  325. mnesia(_) ->
  326. emqx_ctl:usage([{"mnesia", "Mnesia system info"}]).
  327. %%--------------------------------------------------------------------
  328. %% @doc Logger Command
  329. log(["set-level", Level]) ->
  330. case emqx_utils:safe_to_existing_atom(Level) of
  331. {ok, Level1} ->
  332. case emqx_logger:set_log_level(Level1) of
  333. ok -> emqx_ctl:print("~ts~n", [Level]);
  334. Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error])
  335. end;
  336. _ ->
  337. emqx_ctl:print("[error] invalid level: ~p~n", [Level])
  338. end;
  339. log(["primary-level"]) ->
  340. Level = emqx_logger:get_primary_log_level(),
  341. emqx_ctl:print("~ts~n", [Level]);
  342. log(["primary-level", Level]) ->
  343. case emqx_utils:safe_to_existing_atom(Level) of
  344. {ok, Level1} ->
  345. _ = emqx_logger:set_primary_log_level(Level1),
  346. ok;
  347. _ ->
  348. emqx_ctl:print("[error] invalid level: ~p~n", [Level])
  349. end,
  350. emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
  351. log(["handlers", "list"]) ->
  352. _ = [
  353. emqx_ctl:print(
  354. "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
  355. [Id, Level, Dst, Status]
  356. )
  357. || #{
  358. id := Id,
  359. level := Level,
  360. dst := Dst,
  361. status := Status
  362. } <- emqx_logger:get_log_handlers()
  363. ],
  364. ok;
  365. log(["handlers", "start", HandlerId]) ->
  366. case emqx_utils:safe_to_existing_atom(HandlerId) of
  367. {ok, HandlerId1} ->
  368. case emqx_logger:start_log_handler(HandlerId1) of
  369. ok ->
  370. emqx_ctl:print("log handler ~ts started~n", [HandlerId]);
  371. {error, Reason} ->
  372. emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [
  373. HandlerId, Reason
  374. ])
  375. end;
  376. _ ->
  377. emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
  378. end;
  379. log(["handlers", "stop", HandlerId]) ->
  380. case emqx_utils:safe_to_existing_atom(HandlerId) of
  381. {ok, HandlerId1} ->
  382. case emqx_logger:stop_log_handler(HandlerId1) of
  383. ok ->
  384. emqx_ctl:print("log handler ~ts stopped~n", [HandlerId1]);
  385. {error, Reason} ->
  386. emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [
  387. HandlerId1, Reason
  388. ])
  389. end;
  390. _ ->
  391. emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
  392. end;
  393. log(["handlers", "set-level", HandlerId, Level]) ->
  394. case emqx_utils:safe_to_existing_atom(HandlerId) of
  395. {ok, HandlerId1} ->
  396. case emqx_utils:safe_to_existing_atom(Level) of
  397. {ok, Level1} ->
  398. case emqx_logger:set_log_handler_level(HandlerId1, Level1) of
  399. ok ->
  400. #{level := NewLevel} = emqx_logger:get_log_handler(HandlerId1),
  401. emqx_ctl:print("~ts~n", [NewLevel]);
  402. {error, Error} ->
  403. emqx_ctl:print("[error] ~p~n", [Error])
  404. end;
  405. _ ->
  406. emqx_ctl:print("[error] invalid level:~p~n", [Level])
  407. end;
  408. _ ->
  409. emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
  410. end;
  411. log(_) ->
  412. emqx_ctl:usage(
  413. [
  414. {"log set-level <Level>", "Set the overall log level"},
  415. {"log primary-level", "Show the primary log level now"},
  416. {"log primary-level <Level>", "Set the primary log level"},
  417. {"log handlers list", "Show log handlers"},
  418. {"log handlers start <HandlerId>", "Start a log handler"},
  419. {"log handlers stop <HandlerId>", "Stop a log handler"},
  420. {"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}
  421. ]
  422. ).
  423. %%--------------------------------------------------------------------
  424. %% @doc Trace Command
  425. trace(["list"]) ->
  426. lists:foreach(
  427. fun(Trace) ->
  428. #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
  429. emqx_ctl:print("Trace(~s=~s, level=~s, destination=~0p)~n", [Type, Filter, Level, Dst])
  430. end,
  431. emqx_trace_handler:running()
  432. );
  433. trace(["stop", Operation, Filter0]) ->
  434. case trace_type(Operation, Filter0) of
  435. {ok, Type, Filter} -> trace_off(Type, Filter);
  436. error -> trace([])
  437. end;
  438. trace(["start", Operation, ClientId, LogFile]) ->
  439. trace(["start", Operation, ClientId, LogFile, "all"]);
  440. trace(["start", Operation, Filter0, LogFile, Level]) ->
  441. case trace_type(Operation, Filter0) of
  442. {ok, Type, Filter} ->
  443. trace_on(
  444. name(Filter0),
  445. Type,
  446. Filter,
  447. list_to_existing_atom(Level),
  448. LogFile
  449. );
  450. error ->
  451. trace([])
  452. end;
  453. trace(_) ->
  454. emqx_ctl:usage([
  455. {"trace list", "List all traces started on local node"},
  456. {"trace start client <ClientId> <File> [<Level>]", "Traces for a client on local node"},
  457. {"trace stop client <ClientId>", "Stop tracing for a client on local node"},
  458. {"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic on local node"},
  459. {"trace stop topic <Topic> ", "Stop tracing for a topic on local node"},
  460. {"trace start ip_address <IP> <File> [<Level>] ",
  461. "Traces for a client ip on local node"},
  462. {"trace stop ip_addresss <IP> ", "Stop tracing for a client ip on local node"}
  463. ]).
  464. trace_on(Name, Type, Filter, Level, LogFile) ->
  465. case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of
  466. ok ->
  467. emqx_trace:check(),
  468. emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]);
  469. {error, Error} ->
  470. emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error])
  471. end.
  472. trace_off(Type, Filter) ->
  473. ?TRACE("CLI", "trace_stopping", #{Type => Filter}),
  474. case emqx_trace_handler:uninstall(Type, name(Filter)) of
  475. ok ->
  476. emqx_trace:check(),
  477. emqx_ctl:print("stop tracing ~s ~s successfully~n", [Type, Filter]);
  478. {error, Error} ->
  479. emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Type, Filter, Error])
  480. end.
  481. %%--------------------------------------------------------------------
  482. %% @doc Trace Cluster Command
  483. -define(DEFAULT_TRACE_DURATION, "1800").
  484. traces(["list"]) ->
  485. {200, List} = emqx_mgmt_api_trace:trace(get, []),
  486. case List of
  487. [] ->
  488. emqx_ctl:print("Cluster Trace is empty~n", []);
  489. _ ->
  490. lists:foreach(
  491. fun(Trace) ->
  492. #{
  493. type := Type,
  494. name := Name,
  495. status := Status,
  496. log_size := LogSize
  497. } = Trace,
  498. emqx_ctl:print(
  499. "Trace(~s: ~s=~s, ~s, LogSize:~0p)~n",
  500. [Name, Type, maps:get(Type, Trace), Status, LogSize]
  501. )
  502. end,
  503. List
  504. )
  505. end,
  506. length(List);
  507. traces(["stop", Name]) ->
  508. trace_cluster_off(Name);
  509. traces(["delete", Name]) ->
  510. trace_cluster_del(Name);
  511. traces(["start", Name, Operation, Filter]) ->
  512. traces(["start", Name, Operation, Filter, ?DEFAULT_TRACE_DURATION]);
  513. traces(["start", Name, Operation, Filter0, DurationS]) ->
  514. case trace_type(Operation, Filter0) of
  515. {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
  516. error -> traces([])
  517. end;
  518. traces(_) ->
  519. emqx_ctl:usage([
  520. {"traces list", "List all cluster traces started"},
  521. {"traces start <Name> client <ClientId> [<Duration>]", "Traces for a client in cluster"},
  522. {"traces start <Name> topic <Topic> [<Duration>]", "Traces for a topic in cluster"},
  523. {"traces start <Name> ip_address <IPAddr> [<Duration>]",
  524. "Traces for a client IP in cluster\n"
  525. "Trace will start immediately on all nodes, including the core and replicant,\n"
  526. "and will end after <Duration> seconds. The default value for <Duration> is "
  527. ?DEFAULT_TRACE_DURATION
  528. " seconds."},
  529. {"traces stop <Name>", "Stop trace in cluster"},
  530. {"traces delete <Name>", "Delete trace in cluster"}
  531. ]).
  532. trace_cluster_on(Name, Type, Filter, DurationS0) ->
  533. Now = emqx_trace:now_second(),
  534. DurationS = list_to_integer(DurationS0),
  535. Trace = #{
  536. name => bin(Name),
  537. type => Type,
  538. Type => bin(Filter),
  539. start_at => Now,
  540. end_at => Now + DurationS
  541. },
  542. case emqx_trace:create(Trace) of
  543. {ok, _} ->
  544. emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
  545. {error, Error} ->
  546. emqx_ctl:print(
  547. "[error] cluster_trace ~s ~s=~s ~p~n",
  548. [Name, Type, Filter, Error]
  549. )
  550. end.
  551. trace_cluster_del(Name) ->
  552. case emqx_trace:delete(bin(Name)) of
  553. ok -> emqx_ctl:print("Del cluster_trace ~s successfully~n", [Name]);
  554. {error, Error} -> emqx_ctl:print("[error] Del cluster_trace ~s: ~p~n", [Name, Error])
  555. end.
  556. trace_cluster_off(Name) ->
  557. case emqx_trace:update(bin(Name), false) of
  558. ok -> emqx_ctl:print("Stop cluster_trace ~s successfully~n", [Name]);
  559. {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
  560. end.
  561. trace_type("client", ClientId) -> {ok, clientid, bin(ClientId)};
  562. trace_type("topic", Topic) -> {ok, topic, bin(Topic)};
  563. trace_type("ip_address", IP) -> {ok, ip_address, IP};
  564. trace_type(_, _) -> error.
  565. %%--------------------------------------------------------------------
  566. %% @doc Listeners Command
  567. listeners([]) ->
  568. lists:foreach(
  569. fun({ID, Conf}) ->
  570. Bind = maps:get(bind, Conf),
  571. Acceptors = maps:get(acceptors, Conf),
  572. ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
  573. Running = maps:get(running, Conf),
  574. case Running of
  575. true ->
  576. CurrentConns =
  577. case emqx_listeners:current_conns(ID, Bind) of
  578. {error, _} -> [];
  579. CC -> [{current_conn, CC}]
  580. end,
  581. MaxConn =
  582. case emqx_listeners:max_conns(ID, Bind) of
  583. {error, _} -> [];
  584. MC -> [{max_conns, MC}]
  585. end,
  586. ShutdownCount =
  587. case emqx_listeners:shutdown_count(ID, Bind) of
  588. {error, _} -> [];
  589. SC -> [{shutdown_count, SC}]
  590. end;
  591. false ->
  592. CurrentConns = [],
  593. MaxConn = [],
  594. ShutdownCount = []
  595. end,
  596. Info =
  597. [
  598. {listen_on, {string, emqx_listeners:format_bind(Bind)}},
  599. {acceptors, Acceptors},
  600. {proxy_protocol, ProxyProtocol},
  601. {running, Running}
  602. ] ++ CurrentConns ++ MaxConn ++ ShutdownCount,
  603. emqx_ctl:print("~ts~n", [ID]),
  604. lists:foreach(fun indent_print/1, Info)
  605. end,
  606. emqx_listeners:list()
  607. );
  608. listeners(["stop", ListenerId]) ->
  609. case emqx_utils:safe_to_existing_atom(ListenerId) of
  610. {ok, ListenerId1} ->
  611. case emqx_listeners:stop_listener(ListenerId1) of
  612. ok ->
  613. emqx_ctl:print("Stop ~ts listener successfully.~n", [ListenerId]);
  614. {error, Error} ->
  615. emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error])
  616. end;
  617. _ ->
  618. emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
  619. end;
  620. listeners(["start", ListenerId]) ->
  621. case emqx_utils:safe_to_existing_atom(ListenerId) of
  622. {ok, ListenerId1} ->
  623. case emqx_listeners:start_listener(ListenerId1) of
  624. ok ->
  625. emqx_ctl:print("Started ~ts listener successfully.~n", [ListenerId]);
  626. {error, Error} ->
  627. emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error])
  628. end;
  629. _ ->
  630. emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
  631. end;
  632. listeners(["restart", ListenerId]) ->
  633. case emqx_utils:safe_to_existing_atom(ListenerId) of
  634. {ok, ListenerId1} ->
  635. case emqx_listeners:restart_listener(ListenerId1) of
  636. ok ->
  637. emqx_ctl:print("Restarted ~ts listener successfully.~n", [ListenerId]);
  638. {error, Error} ->
  639. emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error])
  640. end;
  641. _ ->
  642. emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
  643. end;
  644. listeners(_) ->
  645. emqx_ctl:usage([
  646. {"listeners", "List listeners"},
  647. {"listeners stop <Identifier>", "Stop a listener"},
  648. {"listeners start <Identifier>", "Start a listener"},
  649. {"listeners restart <Identifier>", "Restart a listener"}
  650. ]).
  651. %%--------------------------------------------------------------------
  652. %% @doc authz Command
  653. authz(["cache-clean", "node", Node]) ->
  654. Msg = io_lib:format("Authorization cache drain started on node ~ts", [Node]),
  655. with_log(fun() -> for_node(fun emqx_mgmt:clean_authz_cache_all/1, Node) end, Msg);
  656. authz(["cache-clean", "all"]) ->
  657. Msg = "Authorization cache drain started on all nodes",
  658. with_log(fun emqx_mgmt:clean_authz_cache_all/0, Msg);
  659. authz(["cache-clean", ClientId]) ->
  660. Msg = io_lib:format("Drain ~ts authz cache", [ClientId]),
  661. with_log(fun() -> emqx_mgmt:clean_authz_cache(ClientId) end, Msg);
  662. authz(_) ->
  663. emqx_ctl:usage(
  664. [
  665. {"authz cache-clean all", "Clears authorization cache on all nodes"},
  666. {"authz cache-clean node <Node>", "Clears authorization cache on given node"},
  667. {"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
  668. ]
  669. ).
  670. pem_cache(["clean", "all"]) ->
  671. with_log(fun emqx_mgmt:clean_pem_cache_all/0, "PEM cache clean");
  672. pem_cache(["clean", "node", Node]) ->
  673. Msg = io_lib:format("~ts PEM cache clean", [Node]),
  674. with_log(fun() -> for_node(fun emqx_mgmt:clean_pem_cache_all/1, Node) end, Msg);
  675. pem_cache(_) ->
  676. emqx_ctl:usage([
  677. {"pem_cache clean all", "Clears x509 certificate cache on all nodes"},
  678. {"pem_cache clean node <Node>", "Clears x509 certificate cache on given node"}
  679. ]).
  680. %%--------------------------------------------------------------------
  681. %% @doc OLP (Overload Protection related)
  682. olp(["status"]) ->
  683. S =
  684. case emqx_olp:is_overloaded() of
  685. true -> "overloaded";
  686. false -> "not overloaded"
  687. end,
  688. emqx_ctl:print("~p is ~s ~n", [node(), S]);
  689. olp(["disable"]) ->
  690. Res = emqx_olp:disable(),
  691. emqx_ctl:print("Disable overload protetion ~p : ~p ~n", [node(), Res]);
  692. olp(["enable"]) ->
  693. Res = emqx_olp:enable(),
  694. emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]);
  695. olp(_) ->
  696. emqx_ctl:usage([
  697. {"olp status", "Return OLP status if system is overloaded"},
  698. {"olp enable", "Enable overload protection"},
  699. {"olp disable", "Disable overload protection"}
  700. ]).
  701. %%--------------------------------------------------------------------
  702. %% @doc data Command
  703. data(["export"]) ->
  704. case emqx_mgmt_data_backup:export(?DATA_BACKUP_OPTS) of
  705. {ok, #{filename := Filename}} ->
  706. emqx_ctl:print("Data has been successfully exported to ~s.~n", [Filename]);
  707. {error, Reason} ->
  708. Reason1 = emqx_mgmt_data_backup:format_error(Reason),
  709. emqx_ctl:print("[error] Data export failed, reason: ~p.~n", [Reason1])
  710. end;
  711. data(["import", Filename]) ->
  712. case emqx_mgmt_data_backup:import(Filename, ?DATA_BACKUP_OPTS) of
  713. {ok, #{db_errors := DbErrs, config_errors := ConfErrs}} when
  714. map_size(DbErrs) =:= 0, map_size(ConfErrs) =:= 0
  715. ->
  716. emqx_ctl:print("Data has been imported successfully.~n");
  717. {ok, _} ->
  718. emqx_ctl:print(
  719. "Data has been imported, but some errors occurred, see the the log above.~n"
  720. );
  721. {error, Reason} ->
  722. Reason1 = emqx_mgmt_data_backup:format_error(Reason),
  723. emqx_ctl:print("[error] Data import failed, reason: ~p.~n", [Reason1])
  724. end;
  725. data(_) ->
  726. emqx_ctl:usage([
  727. {"data import <File>", "Import data from the specified tar archive file"},
  728. {"data export", "Export data"}
  729. ]).
  730. %%--------------------------------------------------------------------
  731. %% Dump ETS
  732. %%--------------------------------------------------------------------
  733. dump(Table, Tag) ->
  734. dump(Table, Tag, ets:first(Table), []).
  735. dump(_Table, _, '$end_of_table', Result) ->
  736. lists:reverse(Result);
  737. dump(Table, Tag, Key, Result) ->
  738. PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
  739. dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
  740. print({_, []}) ->
  741. ok;
  742. print({client, {ClientId, ChanPid}}) ->
  743. Attrs =
  744. case emqx_cm:get_chan_info(ClientId, ChanPid) of
  745. undefined -> #{};
  746. Attrs0 -> Attrs0
  747. end,
  748. Stats =
  749. case emqx_cm:get_chan_stats(ClientId, ChanPid) of
  750. undefined -> #{};
  751. Stats0 -> maps:from_list(Stats0)
  752. end,
  753. ClientInfo = maps:get(clientinfo, Attrs, #{}),
  754. ConnInfo = maps:get(conninfo, Attrs, #{}),
  755. Session = maps:get(session, Attrs, #{}),
  756. Connected =
  757. case maps:get(conn_state, Attrs) of
  758. connected -> true;
  759. _ -> false
  760. end,
  761. Info = lists:foldl(
  762. fun(Items, Acc) ->
  763. maps:merge(Items, Acc)
  764. end,
  765. #{connected => Connected},
  766. [
  767. maps:with(
  768. [
  769. subscriptions_cnt,
  770. inflight_cnt,
  771. awaiting_rel_cnt,
  772. mqueue_len,
  773. mqueue_dropped,
  774. send_msg
  775. ],
  776. Stats
  777. ),
  778. maps:with([clientid, username], ClientInfo),
  779. maps:with(
  780. [
  781. peername,
  782. clean_start,
  783. keepalive,
  784. expiry_interval,
  785. connected_at,
  786. disconnected_at
  787. ],
  788. ConnInfo
  789. ),
  790. maps:with([created_at], Session)
  791. ]
  792. ),
  793. InfoKeys =
  794. [
  795. clientid,
  796. username,
  797. peername,
  798. clean_start,
  799. keepalive,
  800. expiry_interval,
  801. subscriptions_cnt,
  802. inflight_cnt,
  803. awaiting_rel_cnt,
  804. send_msg,
  805. mqueue_len,
  806. mqueue_dropped,
  807. connected,
  808. created_at,
  809. connected_at
  810. ] ++
  811. case maps:is_key(disconnected_at, Info) of
  812. true -> [disconnected_at];
  813. false -> []
  814. end,
  815. Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
  816. emqx_ctl:print(
  817. "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
  818. "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
  819. "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
  820. "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" ++
  821. case maps:is_key(disconnected_at, Info1) of
  822. true -> ", disconnected_at=~w)~n";
  823. false -> ")~n"
  824. end,
  825. [format(K, maps:get(K, Info1)) || K <- InfoKeys]
  826. );
  827. print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) ->
  828. emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
  829. print({emqx_topic, #route{topic = Topic, dest = Node}}) ->
  830. emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
  831. print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
  832. SubId = maps:get(subid, Options),
  833. QoS = maps:get(qos, Options, 0),
  834. NL = maps:get(nl, Options, 0),
  835. RH = maps:get(rh, Options, 0),
  836. RAP = maps:get(rap, Options, 0),
  837. emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]).
  838. format(_, undefined) ->
  839. undefined;
  840. format(peername, {IPAddr, Port}) ->
  841. IPStr = emqx_mgmt_util:ntoa(IPAddr),
  842. io_lib:format("~ts:~p", [IPStr, Port]);
  843. format(_, Val) ->
  844. Val.
  845. bin(S) -> iolist_to_binary(S).
  846. indent_print({Key, {string, Val}}) ->
  847. emqx_ctl:print(" ~-16s: ~ts~n", [Key, Val]);
  848. indent_print({Key, Val}) ->
  849. emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
  850. name(Filter) ->
  851. iolist_to_binary(["CLI-", Filter]).
  852. for_node(Fun, Node) ->
  853. try list_to_existing_atom(Node) of
  854. NodeAtom ->
  855. Fun(NodeAtom)
  856. catch
  857. error:badarg ->
  858. {error, unknown_node}
  859. end.
  860. with_log(Fun, Msg) ->
  861. case Fun() of
  862. ok ->
  863. emqx_ctl:print("~s OK~n", [Msg]);
  864. {error, Reason} ->
  865. emqx_ctl:print("~s FAILED~n~p~n", [Msg, Reason])
  866. end.