|
|
@@ -79,10 +79,12 @@ broker([]) ->
|
|
|
emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]);
|
|
|
|
|
|
broker(["stats"]) ->
|
|
|
- [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) || {Stat, Val} <- lists:sort(emqx_stats:getstats())];
|
|
|
+ [emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
|
|
|
+ || {Stat, Val} <- lists:sort(emqx_stats:getstats())];
|
|
|
|
|
|
broker(["metrics"]) ->
|
|
|
- [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqx_metrics:all())];
|
|
|
+ [emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
|
|
|
+ || {Metric, Val} <- lists:sort(emqx_metrics:all())];
|
|
|
|
|
|
broker(_) ->
|
|
|
emqx_ctl:usage([{"broker", "Show broker version, uptime and description"},
|
|
|
@@ -207,10 +209,11 @@ subscriptions(["del", ClientId, Topic]) ->
|
|
|
end;
|
|
|
|
|
|
subscriptions(_) ->
|
|
|
- emqx_ctl:usage([{"subscriptions list", "List all subscriptions"},
|
|
|
- {"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
|
|
- {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
|
|
- {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
|
|
|
+ emqx_ctl:usage(
|
|
|
+ [{"subscriptions list", "List all subscriptions"},
|
|
|
+ {"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
|
|
+ {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
|
|
+ {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
|
|
|
|
|
|
if_valid_qos(QoS, Fun) ->
|
|
|
try list_to_integer(QoS) of
|
|
|
@@ -279,14 +282,17 @@ vm(["memory"]) ->
|
|
|
[emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
|
|
|
|
|
|
vm(["process"]) ->
|
|
|
- [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{limit, process_limit}, {count, process_count}]];
|
|
|
+ [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
|
|
|
+ || {Name, Key} <- [{limit, process_limit}, {count, process_count}]];
|
|
|
|
|
|
vm(["io"]) ->
|
|
|
IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
|
|
|
- [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) || Key <- [max_fds, active_fds]];
|
|
|
+ [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
|
|
|
+ || Key <- [max_fds, active_fds]];
|
|
|
|
|
|
vm(["ports"]) ->
|
|
|
- [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{count, port_count}, {limit, port_limit}]];
|
|
|
+ [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
|
|
|
+ || {Name, Key} <- [{count, port_count}, {limit, port_limit}]];
|
|
|
|
|
|
vm(_) ->
|
|
|
emqx_ctl:usage([{"vm all", "Show info of Erlang VM"},
|
|
|
@@ -323,8 +329,14 @@ log(["primary-level", Level]) ->
|
|
|
emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
|
|
|
|
|
|
log(["handlers", "list"]) ->
|
|
|
- _ = [emqx_ctl:print("LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n", [Id, Level, Dst, Status])
|
|
|
- || #{id := Id, level := Level, dst := Dst, status := Status} <- emqx_logger:get_log_handlers()],
|
|
|
+ _ = [emqx_ctl:print(
|
|
|
+ "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
|
|
|
+ [Id, Level, Dst, Status]
|
|
|
+ )
|
|
|
+ || #{id := Id,
|
|
|
+ level := Level,
|
|
|
+ dst := Dst,
|
|
|
+ status := Status} <- emqx_logger:get_log_handlers()],
|
|
|
ok;
|
|
|
|
|
|
log(["handlers", "start", HandlerId]) ->
|
|
|
@@ -351,21 +363,25 @@ log(["handlers", "set-level", HandlerId, Level]) ->
|
|
|
end;
|
|
|
|
|
|
log(_) ->
|
|
|
- emqx_ctl:usage([{"log set-level <Level>", "Set the overall log level"},
|
|
|
- {"log primary-level", "Show the primary log level now"},
|
|
|
- {"log primary-level <Level>","Set the primary log level"},
|
|
|
- {"log handlers list", "Show log handlers"},
|
|
|
- {"log handlers start <HandlerId>", "Start a log handler"},
|
|
|
- {"log handlers stop <HandlerId>", "Stop a log handler"},
|
|
|
- {"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}]).
|
|
|
+ emqx_ctl:usage(
|
|
|
+ [{"log set-level <Level>", "Set the overall log level"},
|
|
|
+ {"log primary-level", "Show the primary log level now"},
|
|
|
+ {"log primary-level <Level>","Set the primary log level"},
|
|
|
+ {"log handlers list", "Show log handlers"},
|
|
|
+ {"log handlers start <HandlerId>", "Start a log handler"},
|
|
|
+ {"log handlers stop <HandlerId>", "Stop a log handler"},
|
|
|
+ {"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% @doc Trace Command
|
|
|
|
|
|
trace(["list"]) ->
|
|
|
lists:foreach(fun({{Who, Name}, {Level, LogFile}}) ->
|
|
|
- emqx_ctl:print("Trace(~ts=~ts, level=~ts, destination=~p)~n", [Who, Name, Level, LogFile])
|
|
|
- end, emqx_tracer:lookup_traces());
|
|
|
+ emqx_ctl:print(
|
|
|
+ "Trace(~ts=~ts, level=~ts, destination=~p)~n",
|
|
|
+ [Who, Name, Level, LogFile]
|
|
|
+ )
|
|
|
+ end, emqx_tracer:lookup_traces());
|
|
|
|
|
|
trace(["stop", "client", ClientId]) ->
|
|
|
trace_off(clientid, ClientId);
|
|
|
@@ -489,10 +505,11 @@ authz(["cache-clean", ClientId]) ->
|
|
|
emqx_mgmt:clean_authz_cache(ClientId);
|
|
|
|
|
|
authz(_) ->
|
|
|
- emqx_ctl:usage([{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
|
|
- {"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
|
|
- {"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
|
|
- ]).
|
|
|
+ emqx_ctl:usage(
|
|
|
+ [{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
|
|
+ {"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
|
|
+ {"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
|
|
+ ]).
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -560,23 +577,24 @@ print({client, {ClientId, ChanPid}}) ->
|
|
|
maps:with([peername, clean_start, keepalive, expiry_interval,
|
|
|
connected_at, disconnected_at], ConnInfo),
|
|
|
maps:with([created_at], Session)]),
|
|
|
- InfoKeys = [clientid, username, peername,
|
|
|
- clean_start, keepalive, expiry_interval,
|
|
|
- subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
|
|
|
+ InfoKeys = [clientid, username, peername, clean_start, keepalive,
|
|
|
+ expiry_interval, subscriptions_cnt, inflight_cnt,
|
|
|
+ awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
|
|
|
connected, created_at, connected_at] ++
|
|
|
case maps:is_key(disconnected_at, Info) of
|
|
|
true -> [disconnected_at];
|
|
|
false -> []
|
|
|
end,
|
|
|
Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
|
|
|
- emqx_ctl:print("Client(~ts, username=~ts, peername=~ts, "
|
|
|
- "clean_start=~ts, keepalive=~w, session_expiry_interval=~w, "
|
|
|
- "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
|
|
|
- "connected=~ts, created_at=~w, connected_at=~w" ++
|
|
|
- case maps:is_key(disconnected_at, Info1) of
|
|
|
- true -> ", disconnected_at=~w)~n";
|
|
|
- false -> ")~n"
|
|
|
- end,
|
|
|
+ emqx_ctl:print(
|
|
|
+ "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
|
|
|
+ "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
|
|
|
+ "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
|
|
|
+ "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w"
|
|
|
+ ++ case maps:is_key(disconnected_at, Info1) of
|
|
|
+ true -> ", disconnected_at=~w)~n";
|
|
|
+ false -> ")~n"
|
|
|
+ end,
|
|
|
[format(K, maps:get(K, Info1)) || K <- InfoKeys]);
|
|
|
|
|
|
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
|