| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_mgmt_cli).
- -include_lib("emqx/include/emqx.hrl").
- -include_lib("emqx/include/emqx_cm.hrl").
- -include_lib("emqx/include/emqx_router.hrl").
- -include_lib("emqx/include/emqx_mqtt.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include("emqx_mgmt.hrl").
- -define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])).
- -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
- -export([load/0]).
- -export([
- status/1,
- broker/1,
- cluster/1,
- clients/1,
- topics/1,
- subscriptions/1,
- plugins/1,
- listeners/1,
- vm/1,
- mnesia/1,
- trace/1,
- traces/1,
- log/1,
- authz/1,
- pem_cache/1,
- olp/1,
- data/1
- ]).
- -define(PROC_INFOKEYS, [
- status,
- memory,
- message_queue_len,
- total_heap_size,
- heap_size,
- stack_size,
- reductions
- ]).
- -define(MAX_LIMIT, 10000).
- -define(APP, emqx).
- -spec load() -> ok.
- load() ->
- Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
- lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
- is_cmd(Fun) ->
- not lists:member(Fun, [init, load, module_info]).
- %%--------------------------------------------------------------------
- %% @doc Node status
- status([]) ->
- {InternalStatus, _ProvidedStatus} = init:get_status(),
- emqx_ctl:print("Node ~p ~ts is ~p~n", [node(), emqx_app:get_release(), InternalStatus]);
- status(_) ->
- emqx_ctl:usage("status", "Show broker status").
- %%--------------------------------------------------------------------
- %% @doc Query broker
- broker([]) ->
- Funs = [sysdescr, version, datetime],
- [emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs],
- emqx_ctl:print("~-10s: ~ts~n", [
- uptime, emqx_datetime:human_readable_duration_string(emqx_sys:uptime())
- ]);
- broker(["stats"]) ->
- [
- emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
- || {Stat, Val} <- lists:sort(emqx_stats:getstats())
- ];
- broker(["metrics"]) ->
- [
- emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
- || {Metric, Val} <- lists:sort(emqx_metrics:all())
- ];
- broker(_) ->
- emqx_ctl:usage([
- {"broker", "Show broker version, uptime and description"},
- {"broker stats", "Show broker statistics of clients, topics, subscribers"},
- {"broker metrics", "Show broker metrics"}
- ]).
- %%-----------------------------------------------------------------------------
- %% @doc Cluster with other nodes
- cluster(["join", SNode]) ->
- case ekka:join(ekka_node:parse_name(SNode)) of
- ok ->
- emqx_ctl:print("Join the cluster successfully.~n"),
- cluster(["status"]);
- ignore ->
- emqx_ctl:print("Ignore.~n");
- {error, Error} ->
- emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
- end;
- cluster(["leave"]) ->
- case ekka:leave() of
- ok ->
- emqx_ctl:print("Leave the cluster successfully.~n"),
- cluster(["status"]);
- {error, Error} ->
- emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
- end;
- cluster(["force-leave", SNode]) ->
- case ekka:force_leave(ekka_node:parse_name(SNode)) of
- ok ->
- emqx_ctl:print("Remove the node from cluster successfully.~n"),
- cluster(["status"]);
- ignore ->
- emqx_ctl:print("Ignore.~n");
- {error, Error} ->
- emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
- end;
- cluster(["status"]) ->
- emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
- cluster(["status", "--json"]) ->
- Info = sort_map_list_fields(ekka_cluster:info()),
- emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
- cluster(_) ->
- emqx_ctl:usage([
- {"cluster join <Node>", "Join the cluster"},
- {"cluster leave", "Leave the cluster"},
- {"cluster force-leave <Node>", "Force the node leave from cluster"},
- {"cluster status [--json]", "Cluster status"}
- ]).
- %% sort lists for deterministic output
- sort_map_list_fields(Map) when is_map(Map) ->
- lists:foldl(
- fun(Field, Acc) ->
- sort_map_list_field(Field, Acc)
- end,
- Map,
- maps:keys(Map)
- );
- sort_map_list_fields(NotMap) ->
- NotMap.
- sort_map_list_field(Field, Map) ->
- case maps:get(Field, Map) of
- [_ | _] = L -> Map#{Field := lists:sort(L)};
- _ -> Map
- end.
- %%--------------------------------------------------------------------
- %% @doc Query clients
- clients(["list"]) ->
- dump(?CHAN_TAB, client);
- clients(["show", ClientId]) ->
- if_client(ClientId, fun print/1);
- clients(["kick", ClientId]) ->
- ok = emqx_cm:kick_session(bin(ClientId)),
- emqx_ctl:print("ok~n");
- clients(_) ->
- emqx_ctl:usage([
- {"clients list", "List all clients"},
- {"clients show <ClientId>", "Show a client"},
- {"clients kick <ClientId>", "Kick out a client"}
- ]).
- if_client(ClientId, Fun) ->
- case ets:lookup(?CHAN_TAB, (bin(ClientId))) of
- [] -> emqx_ctl:print("Not Found.~n");
- [Channel] -> Fun({client, Channel})
- end.
- %%--------------------------------------------------------------------
- %% @doc Topics Command
- topics(["list"]) ->
- dump(?ROUTE_TAB, emqx_topic);
- topics(["show", Topic]) ->
- Routes = ets:lookup(?ROUTE_TAB, bin(Topic)),
- [print({emqx_topic, Route}) || Route <- Routes];
- topics(_) ->
- emqx_ctl:usage([
- {"topics list", "List all topics"},
- {"topics show <Topic>", "Show a topic"}
- ]).
- subscriptions(["list"]) ->
- lists:foreach(
- fun(Suboption) ->
- print({?SUBOPTION, Suboption})
- end,
- ets:tab2list(?SUBOPTION)
- );
- subscriptions(["show", ClientId]) ->
- case ets:lookup(emqx_subid, bin(ClientId)) of
- [] ->
- emqx_ctl:print("Not Found.~n");
- [{_, Pid}] ->
- case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of
- [] -> emqx_ctl:print("Not Found.~n");
- Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption]
- end
- end;
- subscriptions(["add", ClientId, Topic, QoS]) ->
- if_valid_qos(QoS, fun(IntQos) ->
- case ets:lookup(?CHAN_TAB, bin(ClientId)) of
- [] ->
- emqx_ctl:print("Error: Channel not found!");
- [{_, Pid}] ->
- {Topic1, Options} = emqx_topic:parse(bin(Topic)),
- Pid ! {subscribe, [{Topic1, Options#{qos => IntQos}}]},
- emqx_ctl:print("ok~n")
- end
- end);
- subscriptions(["del", ClientId, Topic]) ->
- case ets:lookup(?CHAN_TAB, bin(ClientId)) of
- [] ->
- emqx_ctl:print("Error: Channel not found!");
- [{_, Pid}] ->
- Pid ! {unsubscribe, [emqx_topic:parse(bin(Topic))]},
- emqx_ctl:print("ok~n")
- end;
- subscriptions(_) ->
- emqx_ctl:usage(
- [
- {"subscriptions list", "List all subscriptions"},
- {"subscriptions show <ClientId>", "Show subscriptions of a client"},
- {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
- {"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}
- ]
- ).
- if_valid_qos(QoS, Fun) ->
- try list_to_integer(QoS) of
- Int when ?IS_QOS(Int) -> Fun(Int);
- _ -> emqx_ctl:print("QoS should be 0, 1, 2~n")
- catch
- _:_ ->
- emqx_ctl:print("QoS should be 0, 1, 2~n")
- end.
- plugins(["list"]) ->
- emqx_plugins_cli:list(fun emqx_ctl:print/2);
- plugins(["describe", NameVsn]) ->
- emqx_plugins_cli:describe(NameVsn, fun emqx_ctl:print/2);
- plugins(["install", NameVsn]) ->
- emqx_plugins_cli:ensure_installed(NameVsn, fun emqx_ctl:print/2);
- plugins(["uninstall", NameVsn]) ->
- emqx_plugins_cli:ensure_uninstalled(NameVsn, fun emqx_ctl:print/2);
- plugins(["start", NameVsn]) ->
- emqx_plugins_cli:ensure_started(NameVsn, fun emqx_ctl:print/2);
- plugins(["stop", NameVsn]) ->
- emqx_plugins_cli:ensure_stopped(NameVsn, fun emqx_ctl:print/2);
- plugins(["restart", NameVsn]) ->
- emqx_plugins_cli:restart(NameVsn, fun emqx_ctl:print/2);
- plugins(["disable", NameVsn]) ->
- emqx_plugins_cli:ensure_disabled(NameVsn, fun emqx_ctl:print/2);
- plugins(["enable", NameVsn]) ->
- emqx_plugins_cli:ensure_enabled(NameVsn, no_move, fun emqx_ctl:print/2);
- plugins(["enable", NameVsn, "front"]) ->
- emqx_plugins_cli:ensure_enabled(NameVsn, front, fun emqx_ctl:print/2);
- plugins(["enable", NameVsn, "rear"]) ->
- emqx_plugins_cli:ensure_enabled(NameVsn, rear, fun emqx_ctl:print/2);
- plugins(["enable", NameVsn, "before", Other]) ->
- emqx_plugins_cli:ensure_enabled(NameVsn, {before, Other}, fun emqx_ctl:print/2);
- plugins(_) ->
- emqx_ctl:usage(
- [
- {"plugins <command> [Name-Vsn]", "e.g. 'start emqx_plugin_template-5.0-rc.1'"},
- {"plugins list", "List all installed plugins"},
- {"plugins describe Name-Vsn", "Describe an installed plugins"},
- {"plugins install Name-Vsn",
- "Install a plugin package placed\n"
- "in plugin'sinstall_dir"},
- {"plugins uninstall Name-Vsn",
- "Uninstall a plugin. NOTE: it deletes\n"
- "all files in install_dir/Name-Vsn"},
- {"plugins start Name-Vsn", "Start a plugin"},
- {"plugins stop Name-Vsn", "Stop a plugin"},
- {"plugins restart Name-Vsn", "Stop then start a plugin"},
- {"plugins disable Name-Vsn", "Disable auto-boot"},
- {"plugins enable Name-Vsn [Position]",
- "Enable auto-boot at Position in the boot list, where Position could be\n"
- "'front', 'rear', or 'before Other-Vsn' to specify a relative position.\n"
- "The Position parameter can be used to adjust the boot order.\n"
- "If no Position is given, an already configured plugin\n"
- "will stay at is old position; a newly plugin is appended to the rear\n"
- "e.g. plugins disable foo-0.1.0 front\n"
- " plugins enable bar-0.2.0 before foo-0.1.0"}
- ]
- ).
- %%--------------------------------------------------------------------
- %% @doc vm command
- vm([]) ->
- vm(["all"]);
- vm(["all"]) ->
- [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
- vm(["load"]) ->
- [emqx_ctl:print("cpu/~-20s: ~w~n", [L, V]) || {L, V} <- emqx_vm:loads()];
- vm(["memory"]) ->
- [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
- vm(["process"]) ->
- [
- emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
- || {Name, Key} <- [{limit, process_limit}, {count, process_count}]
- ];
- vm(["io"]) ->
- IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
- [
- emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
- || Key <- [max_fds, active_fds]
- ];
- vm(["ports"]) ->
- [
- emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
- || {Name, Key} <- [{count, port_count}, {limit, port_limit}]
- ];
- vm(_) ->
- emqx_ctl:usage([
- {"vm all", "Show info of Erlang VM"},
- {"vm load", "Show load of Erlang VM"},
- {"vm memory", "Show memory of Erlang VM"},
- {"vm process", "Show process of Erlang VM"},
- {"vm io", "Show IO of Erlang VM"},
- {"vm ports", "Show Ports of Erlang VM"}
- ]).
- %%--------------------------------------------------------------------
- %% @doc mnesia Command
- mnesia([]) ->
- mnesia:system_info();
- mnesia(_) ->
- emqx_ctl:usage([{"mnesia", "Mnesia system info"}]).
- %%--------------------------------------------------------------------
- %% @doc Logger Command
- log(["set-level", Level]) ->
- case emqx_utils:safe_to_existing_atom(Level) of
- {ok, Level1} ->
- case emqx_logger:set_log_level(Level1) of
- ok -> emqx_ctl:print("~ts~n", [Level]);
- Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error])
- end;
- _ ->
- emqx_ctl:print("[error] invalid level: ~p~n", [Level])
- end;
- log(["primary-level"]) ->
- Level = emqx_logger:get_primary_log_level(),
- emqx_ctl:print("~ts~n", [Level]);
- log(["primary-level", Level]) ->
- case emqx_utils:safe_to_existing_atom(Level) of
- {ok, Level1} ->
- _ = emqx_logger:set_primary_log_level(Level1),
- ok;
- _ ->
- emqx_ctl:print("[error] invalid level: ~p~n", [Level])
- end,
- emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
- log(["handlers", "list"]) ->
- _ = [
- emqx_ctl:print(
- "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
- [Id, Level, Dst, Status]
- )
- || #{
- id := Id,
- level := Level,
- dst := Dst,
- status := Status
- } <- emqx_logger:get_log_handlers()
- ],
- ok;
- log(["handlers", "start", HandlerId]) ->
- case emqx_utils:safe_to_existing_atom(HandlerId) of
- {ok, HandlerId1} ->
- case emqx_logger:start_log_handler(HandlerId1) of
- ok ->
- emqx_ctl:print("log handler ~ts started~n", [HandlerId]);
- {error, Reason} ->
- emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [
- HandlerId, Reason
- ])
- end;
- _ ->
- emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
- end;
- log(["handlers", "stop", HandlerId]) ->
- case emqx_utils:safe_to_existing_atom(HandlerId) of
- {ok, HandlerId1} ->
- case emqx_logger:stop_log_handler(HandlerId1) of
- ok ->
- emqx_ctl:print("log handler ~ts stopped~n", [HandlerId1]);
- {error, Reason} ->
- emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [
- HandlerId1, Reason
- ])
- end;
- _ ->
- emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
- end;
- log(["handlers", "set-level", HandlerId, Level]) ->
- case emqx_utils:safe_to_existing_atom(HandlerId) of
- {ok, HandlerId1} ->
- case emqx_utils:safe_to_existing_atom(Level) of
- {ok, Level1} ->
- case emqx_logger:set_log_handler_level(HandlerId1, Level1) of
- ok ->
- #{level := NewLevel} = emqx_logger:get_log_handler(HandlerId1),
- emqx_ctl:print("~ts~n", [NewLevel]);
- {error, Error} ->
- emqx_ctl:print("[error] ~p~n", [Error])
- end;
- _ ->
- emqx_ctl:print("[error] invalid level:~p~n", [Level])
- end;
- _ ->
- emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
- end;
- log(_) ->
- emqx_ctl:usage(
- [
- {"log set-level <Level>", "Set the overall log level"},
- {"log primary-level", "Show the primary log level now"},
- {"log primary-level <Level>", "Set the primary log level"},
- {"log handlers list", "Show log handlers"},
- {"log handlers start <HandlerId>", "Start a log handler"},
- {"log handlers stop <HandlerId>", "Stop a log handler"},
- {"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}
- ]
- ).
- %%--------------------------------------------------------------------
- %% @doc Trace Command
- trace(["list"]) ->
- lists:foreach(
- fun(Trace) ->
- #{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
- emqx_ctl:print("Trace(~s=~s, level=~s, destination=~0p)~n", [Type, Filter, Level, Dst])
- end,
- emqx_trace_handler:running()
- );
- trace(["stop", Operation, Filter0]) ->
- case trace_type(Operation, Filter0) of
- {ok, Type, Filter} -> trace_off(Type, Filter);
- error -> trace([])
- end;
- trace(["start", Operation, ClientId, LogFile]) ->
- trace(["start", Operation, ClientId, LogFile, "all"]);
- trace(["start", Operation, Filter0, LogFile, Level]) ->
- case trace_type(Operation, Filter0) of
- {ok, Type, Filter} ->
- trace_on(
- name(Filter0),
- Type,
- Filter,
- list_to_existing_atom(Level),
- LogFile
- );
- error ->
- trace([])
- end;
- trace(_) ->
- emqx_ctl:usage([
- {"trace list", "List all traces started on local node"},
- {"trace start client <ClientId> <File> [<Level>]", "Traces for a client on local node"},
- {"trace stop client <ClientId>", "Stop tracing for a client on local node"},
- {"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic on local node"},
- {"trace stop topic <Topic> ", "Stop tracing for a topic on local node"},
- {"trace start ip_address <IP> <File> [<Level>] ",
- "Traces for a client ip on local node"},
- {"trace stop ip_addresss <IP> ", "Stop tracing for a client ip on local node"}
- ]).
- trace_on(Name, Type, Filter, Level, LogFile) ->
- case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of
- ok ->
- emqx_trace:check(),
- emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]);
- {error, Error} ->
- emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error])
- end.
- trace_off(Type, Filter) ->
- ?TRACE("CLI", "trace_stopping", #{Type => Filter}),
- case emqx_trace_handler:uninstall(Type, name(Filter)) of
- ok ->
- emqx_trace:check(),
- emqx_ctl:print("stop tracing ~s ~s successfully~n", [Type, Filter]);
- {error, Error} ->
- emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Type, Filter, Error])
- end.
- %%--------------------------------------------------------------------
- %% @doc Trace Cluster Command
- -define(DEFAULT_TRACE_DURATION, "1800").
- traces(["list"]) ->
- {200, List} = emqx_mgmt_api_trace:trace(get, []),
- case List of
- [] ->
- emqx_ctl:print("Cluster Trace is empty~n", []);
- _ ->
- lists:foreach(
- fun(Trace) ->
- #{
- type := Type,
- name := Name,
- status := Status,
- log_size := LogSize
- } = Trace,
- emqx_ctl:print(
- "Trace(~s: ~s=~s, ~s, LogSize:~0p)~n",
- [Name, Type, maps:get(Type, Trace), Status, LogSize]
- )
- end,
- List
- )
- end,
- length(List);
- traces(["stop", Name]) ->
- trace_cluster_off(Name);
- traces(["delete", Name]) ->
- trace_cluster_del(Name);
- traces(["start", Name, Operation, Filter]) ->
- traces(["start", Name, Operation, Filter, ?DEFAULT_TRACE_DURATION]);
- traces(["start", Name, Operation, Filter0, DurationS]) ->
- case trace_type(Operation, Filter0) of
- {ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
- error -> traces([])
- end;
- traces(_) ->
- emqx_ctl:usage([
- {"traces list", "List all cluster traces started"},
- {"traces start <Name> client <ClientId> [<Duration>]", "Traces for a client in cluster"},
- {"traces start <Name> topic <Topic> [<Duration>]", "Traces for a topic in cluster"},
- {"traces start <Name> ip_address <IPAddr> [<Duration>]",
- "Traces for a client IP in cluster\n"
- "Trace will start immediately on all nodes, including the core and replicant,\n"
- "and will end after <Duration> seconds. The default value for <Duration> is "
- ?DEFAULT_TRACE_DURATION
- " seconds."},
- {"traces stop <Name>", "Stop trace in cluster"},
- {"traces delete <Name>", "Delete trace in cluster"}
- ]).
- trace_cluster_on(Name, Type, Filter, DurationS0) ->
- Now = emqx_trace:now_second(),
- DurationS = list_to_integer(DurationS0),
- Trace = #{
- name => bin(Name),
- type => Type,
- Type => bin(Filter),
- start_at => Now,
- end_at => Now + DurationS
- },
- case emqx_trace:create(Trace) of
- {ok, _} ->
- emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
- {error, Error} ->
- emqx_ctl:print(
- "[error] cluster_trace ~s ~s=~s ~p~n",
- [Name, Type, Filter, Error]
- )
- end.
- trace_cluster_del(Name) ->
- case emqx_trace:delete(bin(Name)) of
- ok -> emqx_ctl:print("Del cluster_trace ~s successfully~n", [Name]);
- {error, Error} -> emqx_ctl:print("[error] Del cluster_trace ~s: ~p~n", [Name, Error])
- end.
- trace_cluster_off(Name) ->
- case emqx_trace:update(bin(Name), false) of
- ok -> emqx_ctl:print("Stop cluster_trace ~s successfully~n", [Name]);
- {error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
- end.
- trace_type("client", ClientId) -> {ok, clientid, bin(ClientId)};
- trace_type("topic", Topic) -> {ok, topic, bin(Topic)};
- trace_type("ip_address", IP) -> {ok, ip_address, IP};
- trace_type(_, _) -> error.
- %%--------------------------------------------------------------------
- %% @doc Listeners Command
- listeners([]) ->
- lists:foreach(
- fun({ID, Conf}) ->
- Bind = maps:get(bind, Conf),
- Acceptors = maps:get(acceptors, Conf),
- ProxyProtocol = maps:get(proxy_protocol, Conf, undefined),
- Running = maps:get(running, Conf),
- case Running of
- true ->
- CurrentConns =
- case emqx_listeners:current_conns(ID, Bind) of
- {error, _} -> [];
- CC -> [{current_conn, CC}]
- end,
- MaxConn =
- case emqx_listeners:max_conns(ID, Bind) of
- {error, _} -> [];
- MC -> [{max_conns, MC}]
- end,
- ShutdownCount =
- case emqx_listeners:shutdown_count(ID, Bind) of
- {error, _} -> [];
- SC -> [{shutdown_count, SC}]
- end;
- false ->
- CurrentConns = [],
- MaxConn = [],
- ShutdownCount = []
- end,
- Info =
- [
- {listen_on, {string, emqx_listeners:format_bind(Bind)}},
- {acceptors, Acceptors},
- {proxy_protocol, ProxyProtocol},
- {running, Running}
- ] ++ CurrentConns ++ MaxConn ++ ShutdownCount,
- emqx_ctl:print("~ts~n", [ID]),
- lists:foreach(fun indent_print/1, Info)
- end,
- emqx_listeners:list()
- );
- listeners(["stop", ListenerId]) ->
- case emqx_utils:safe_to_existing_atom(ListenerId) of
- {ok, ListenerId1} ->
- case emqx_listeners:stop_listener(ListenerId1) of
- ok ->
- emqx_ctl:print("Stop ~ts listener successfully.~n", [ListenerId]);
- {error, Error} ->
- emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error])
- end;
- _ ->
- emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
- end;
- listeners(["start", ListenerId]) ->
- case emqx_utils:safe_to_existing_atom(ListenerId) of
- {ok, ListenerId1} ->
- case emqx_listeners:start_listener(ListenerId1) of
- ok ->
- emqx_ctl:print("Started ~ts listener successfully.~n", [ListenerId]);
- {error, Error} ->
- emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error])
- end;
- _ ->
- emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
- end;
- listeners(["restart", ListenerId]) ->
- case emqx_utils:safe_to_existing_atom(ListenerId) of
- {ok, ListenerId1} ->
- case emqx_listeners:restart_listener(ListenerId1) of
- ok ->
- emqx_ctl:print("Restarted ~ts listener successfully.~n", [ListenerId]);
- {error, Error} ->
- emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error])
- end;
- _ ->
- emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
- end;
- listeners(_) ->
- emqx_ctl:usage([
- {"listeners", "List listeners"},
- {"listeners stop <Identifier>", "Stop a listener"},
- {"listeners start <Identifier>", "Start a listener"},
- {"listeners restart <Identifier>", "Restart a listener"}
- ]).
- %%--------------------------------------------------------------------
- %% @doc authz Command
- authz(["cache-clean", "node", Node]) ->
- Msg = io_lib:format("Authorization cache drain started on node ~ts", [Node]),
- with_log(fun() -> for_node(fun emqx_mgmt:clean_authz_cache_all/1, Node) end, Msg);
- authz(["cache-clean", "all"]) ->
- Msg = "Authorization cache drain started on all nodes",
- with_log(fun emqx_mgmt:clean_authz_cache_all/0, Msg);
- authz(["cache-clean", ClientId]) ->
- Msg = io_lib:format("Drain ~ts authz cache", [ClientId]),
- with_log(fun() -> emqx_mgmt:clean_authz_cache(ClientId) end, Msg);
- authz(_) ->
- emqx_ctl:usage(
- [
- {"authz cache-clean all", "Clears authorization cache on all nodes"},
- {"authz cache-clean node <Node>", "Clears authorization cache on given node"},
- {"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
- ]
- ).
- pem_cache(["clean", "all"]) ->
- with_log(fun emqx_mgmt:clean_pem_cache_all/0, "PEM cache clean");
- pem_cache(["clean", "node", Node]) ->
- Msg = io_lib:format("~ts PEM cache clean", [Node]),
- with_log(fun() -> for_node(fun emqx_mgmt:clean_pem_cache_all/1, Node) end, Msg);
- pem_cache(_) ->
- emqx_ctl:usage([
- {"pem_cache clean all", "Clears x509 certificate cache on all nodes"},
- {"pem_cache clean node <Node>", "Clears x509 certificate cache on given node"}
- ]).
- %%--------------------------------------------------------------------
- %% @doc OLP (Overload Protection related)
- olp(["status"]) ->
- S =
- case emqx_olp:is_overloaded() of
- true -> "overloaded";
- false -> "not overloaded"
- end,
- emqx_ctl:print("~p is ~s ~n", [node(), S]);
- olp(["disable"]) ->
- Res = emqx_olp:disable(),
- emqx_ctl:print("Disable overload protetion ~p : ~p ~n", [node(), Res]);
- olp(["enable"]) ->
- Res = emqx_olp:enable(),
- emqx_ctl:print("Enable overload protection ~p : ~p ~n", [node(), Res]);
- olp(_) ->
- emqx_ctl:usage([
- {"olp status", "Return OLP status if system is overloaded"},
- {"olp enable", "Enable overload protection"},
- {"olp disable", "Disable overload protection"}
- ]).
- %%--------------------------------------------------------------------
- %% @doc data Command
- data(["export"]) ->
- case emqx_mgmt_data_backup:export(?DATA_BACKUP_OPTS) of
- {ok, #{filename := Filename}} ->
- emqx_ctl:print("Data has been successfully exported to ~s.~n", [Filename]);
- {error, Reason} ->
- Reason1 = emqx_mgmt_data_backup:format_error(Reason),
- emqx_ctl:print("[error] Data export failed, reason: ~p.~n", [Reason1])
- end;
- data(["import", Filename]) ->
- case emqx_mgmt_data_backup:import(Filename, ?DATA_BACKUP_OPTS) of
- {ok, #{db_errors := DbErrs, config_errors := ConfErrs}} when
- map_size(DbErrs) =:= 0, map_size(ConfErrs) =:= 0
- ->
- emqx_ctl:print("Data has been imported successfully.~n");
- {ok, _} ->
- emqx_ctl:print(
- "Data has been imported, but some errors occurred, see the the log above.~n"
- );
- {error, Reason} ->
- Reason1 = emqx_mgmt_data_backup:format_error(Reason),
- emqx_ctl:print("[error] Data import failed, reason: ~p.~n", [Reason1])
- end;
- data(_) ->
- emqx_ctl:usage([
- {"data import <File>", "Import data from the specified tar archive file"},
- {"data export", "Export data"}
- ]).
- %%--------------------------------------------------------------------
- %% Dump ETS
- %%--------------------------------------------------------------------
- dump(Table, Tag) ->
- dump(Table, Tag, ets:first(Table), []).
- dump(_Table, _, '$end_of_table', Result) ->
- lists:reverse(Result);
- dump(Table, Tag, Key, Result) ->
- PrintValue = [print({Tag, Record}) || Record <- ets:lookup(Table, Key)],
- dump(Table, Tag, ets:next(Table, Key), [PrintValue | Result]).
- print({_, []}) ->
- ok;
- print({client, {ClientId, ChanPid}}) ->
- Attrs =
- case emqx_cm:get_chan_info(ClientId, ChanPid) of
- undefined -> #{};
- Attrs0 -> Attrs0
- end,
- Stats =
- case emqx_cm:get_chan_stats(ClientId, ChanPid) of
- undefined -> #{};
- Stats0 -> maps:from_list(Stats0)
- end,
- ClientInfo = maps:get(clientinfo, Attrs, #{}),
- ConnInfo = maps:get(conninfo, Attrs, #{}),
- Session = maps:get(session, Attrs, #{}),
- Connected =
- case maps:get(conn_state, Attrs) of
- connected -> true;
- _ -> false
- end,
- Info = lists:foldl(
- fun(Items, Acc) ->
- maps:merge(Items, Acc)
- end,
- #{connected => Connected},
- [
- maps:with(
- [
- subscriptions_cnt,
- inflight_cnt,
- awaiting_rel_cnt,
- mqueue_len,
- mqueue_dropped,
- send_msg
- ],
- Stats
- ),
- maps:with([clientid, username], ClientInfo),
- maps:with(
- [
- peername,
- clean_start,
- keepalive,
- expiry_interval,
- connected_at,
- disconnected_at
- ],
- ConnInfo
- ),
- maps:with([created_at], Session)
- ]
- ),
- InfoKeys =
- [
- clientid,
- username,
- peername,
- clean_start,
- keepalive,
- expiry_interval,
- subscriptions_cnt,
- inflight_cnt,
- awaiting_rel_cnt,
- send_msg,
- mqueue_len,
- mqueue_dropped,
- connected,
- created_at,
- connected_at
- ] ++
- case maps:is_key(disconnected_at, Info) of
- true -> [disconnected_at];
- false -> []
- end,
- Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
- emqx_ctl:print(
- "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
- "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
- "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
- "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" ++
- case maps:is_key(disconnected_at, Info1) of
- true -> ", disconnected_at=~w)~n";
- false -> ")~n"
- end,
- [format(K, maps:get(K, Info1)) || K <- InfoKeys]
- );
- print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) ->
- emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
- print({emqx_topic, #route{topic = Topic, dest = Node}}) ->
- emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
- print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
- SubId = maps:get(subid, Options),
- QoS = maps:get(qos, Options, 0),
- NL = maps:get(nl, Options, 0),
- RH = maps:get(rh, Options, 0),
- RAP = maps:get(rap, Options, 0),
- emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]).
- format(_, undefined) ->
- undefined;
- format(peername, {IPAddr, Port}) ->
- IPStr = emqx_mgmt_util:ntoa(IPAddr),
- io_lib:format("~ts:~p", [IPStr, Port]);
- format(_, Val) ->
- Val.
- bin(S) -> iolist_to_binary(S).
- indent_print({Key, {string, Val}}) ->
- emqx_ctl:print(" ~-16s: ~ts~n", [Key, Val]);
- indent_print({Key, Val}) ->
- emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
- name(Filter) ->
- iolist_to_binary(["CLI-", Filter]).
- for_node(Fun, Node) ->
- try list_to_existing_atom(Node) of
- NodeAtom ->
- Fun(NodeAtom)
- catch
- error:badarg ->
- {error, unknown_node}
- end.
- with_log(Fun, Msg) ->
- case Fun() of
- ok ->
- emqx_ctl:print("~s OK~n", [Msg]);
- {error, Reason} ->
- emqx_ctl:print("~s FAILED~n~p~n", [Msg, Reason])
- end.
|