|
|
@@ -120,6 +120,7 @@
|
|
|
|
|
|
-define(APP, emqx_management).
|
|
|
|
|
|
+-elvis([{elvis_style, god_modules, disable}]).
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Node Info
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -142,7 +143,9 @@ node_info(Node) when Node =:= node() ->
|
|
|
memory_used => proplists:get_value(used, Memory),
|
|
|
process_available => erlang:system_info(process_limit),
|
|
|
process_used => erlang:system_info(process_count),
|
|
|
- max_fds => proplists:get_value(max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))),
|
|
|
+ max_fds =>
|
|
|
+ proplists:get_value( max_fds
|
|
|
+ , lists:usort(lists:flatten(erlang:system_info(check_io)))),
|
|
|
connections => ets:info(emqx_channel, size),
|
|
|
node_status => 'Running',
|
|
|
uptime => iolist_to_binary(proplists:get_value(uptime, BrokerInfo)),
|
|
|
@@ -196,10 +199,12 @@ get_stats(Node) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
lookup_client({clientid, ClientId}, FormatFun) ->
|
|
|
- lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]);
|
|
|
+ lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun)
|
|
|
+ || Node <- ekka_mnesia:running_nodes()]);
|
|
|
|
|
|
lookup_client({username, Username}, FormatFun) ->
|
|
|
- lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
|
|
|
+ lists:append([lookup_client(Node, {username, Username}, FormatFun)
|
|
|
+ || Node <- ekka_mnesia:running_nodes()]).
|
|
|
|
|
|
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
|
|
|
lists:append(lists:map(
|
|
|
@@ -222,10 +227,7 @@ lookup_client(Node, {username, Username}, FormatFun) ->
|
|
|
|
|
|
kickout_client(ClientId) ->
|
|
|
Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
|
|
- case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
|
|
- true -> ok;
|
|
|
- false -> lists:last(Results)
|
|
|
- end.
|
|
|
+ check_every_ok(Results).
|
|
|
|
|
|
kickout_client(Node, ClientId) when Node =:= node() ->
|
|
|
emqx_cm:kick_session(ClientId);
|
|
|
@@ -238,10 +240,7 @@ list_acl_cache(ClientId) ->
|
|
|
|
|
|
clean_acl_cache(ClientId) ->
|
|
|
Results = [clean_acl_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
|
|
- case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
|
|
- true -> ok;
|
|
|
- false -> lists:last(Results)
|
|
|
- end.
|
|
|
+ check_every_ok(Results).
|
|
|
|
|
|
clean_acl_cache(Node, ClientId) when Node =:= node() ->
|
|
|
case emqx_cm:lookup_channels(ClientId) of
|
|
|
@@ -281,7 +280,7 @@ call_client(ClientId, Req) ->
|
|
|
end, Results),
|
|
|
case Expected of
|
|
|
[] -> {error, not_found};
|
|
|
- [Result|_] -> Result
|
|
|
+ [Result | _] -> Result
|
|
|
end.
|
|
|
|
|
|
%% @private
|
|
|
@@ -292,7 +291,7 @@ call_client(Node, ClientId, Req) when Node =:= node() ->
|
|
|
Pid = lists:last(Pids),
|
|
|
case emqx_cm:get_chan_info(ClientId, Pid) of
|
|
|
#{conninfo := #{conn_mod := ConnMod}} ->
|
|
|
- ConnMod:call(Pid, Req);
|
|
|
+ erlang:apply(ConnMod, call, [Pid, Req]);
|
|
|
undefined -> {error, not_found}
|
|
|
end
|
|
|
end;
|
|
|
@@ -313,11 +312,12 @@ list_subscriptions(Node) ->
|
|
|
rpc_call(Node, list_subscriptions, [Node]).
|
|
|
|
|
|
list_subscriptions_via_topic(Topic, FormatFun) ->
|
|
|
- lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
|
|
|
+ lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|
|
|
+ || Node <- ekka_mnesia:running_nodes()]).
|
|
|
|
|
|
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
|
|
|
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
|
|
|
- M:F(ets:select(emqx_suboption, MatchSpec));
|
|
|
+ erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
|
|
|
|
|
|
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
|
|
|
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
|
|
|
@@ -436,7 +436,8 @@ list_listeners(Node) when Node =:= node() ->
|
|
|
Http = lists:map(fun({Protocol, Opts}) ->
|
|
|
#{protocol => Protocol,
|
|
|
listen_on => proplists:get_value(port, Opts),
|
|
|
- acceptors => maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0),
|
|
|
+ acceptors => maps:get( num_acceptors
|
|
|
+ , proplists:get_value(transport_options, Opts, #{}), 0),
|
|
|
max_conns => proplists:get_value(max_connections, Opts),
|
|
|
current_conns => proplists:get_value(all_connections, Opts),
|
|
|
shutdown_count => []}
|
|
|
@@ -483,9 +484,12 @@ add_duration_field(Alarms) ->
|
|
|
|
|
|
add_duration_field([], _Now, Acc) ->
|
|
|
Acc;
|
|
|
-add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt}| Rest], Now, Acc) ->
|
|
|
+add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) ->
|
|
|
add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]);
|
|
|
-add_duration_field([Alarm = #{activated := false, activate_at := ActivateAt, deactivate_at := DeactivateAt}| Rest], Now, Acc) ->
|
|
|
+add_duration_field([Alarm = #{ activated := false
|
|
|
+ , activate_at := ActivateAt
|
|
|
+ , deactivate_at := DeactivateAt}
|
|
|
+ | Rest], Now, Acc) ->
|
|
|
add_duration_field(Rest, Now, [Alarm#{duration => DeactivateAt - ActivateAt} | Acc]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -560,12 +564,18 @@ check_row_limit(Tables) ->
|
|
|
|
|
|
check_row_limit([], _Limit) ->
|
|
|
ok;
|
|
|
-check_row_limit([Tab|Tables], Limit) ->
|
|
|
+check_row_limit([Tab | Tables], Limit) ->
|
|
|
case table_size(Tab) > Limit of
|
|
|
true -> false;
|
|
|
false -> check_row_limit(Tables, Limit)
|
|
|
end.
|
|
|
|
|
|
+check_every_ok(Results) ->
|
|
|
+ case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
|
|
+ true -> ok;
|
|
|
+ false -> lists:last(Results)
|
|
|
+ end.
|
|
|
+
|
|
|
max_row_limit() ->
|
|
|
application:get_env(?APP, max_row_limit, ?MAX_ROW_LIMIT).
|
|
|
|