|
|
@@ -118,9 +118,11 @@ list_nodes() ->
|
|
|
Running = mria_mnesia:cluster_nodes(running),
|
|
|
Stopped = mria_mnesia:cluster_nodes(stopped),
|
|
|
DownNodes = lists:map(fun stopped_node_info/1, Stopped),
|
|
|
- [{Node, node_info(Node)} || Node <- Running] ++ DownNodes.
|
|
|
+ [{Node, Info} || #{node := Node} = Info <- node_info(Running)] ++ DownNodes.
|
|
|
|
|
|
-lookup_node(Node) -> node_info(Node).
|
|
|
+lookup_node(Node) ->
|
|
|
+ [Info] = node_info([Node]),
|
|
|
+ Info.
|
|
|
|
|
|
node_info() ->
|
|
|
{UsedRatio, Total} = get_sys_memory(),
|
|
|
@@ -152,8 +154,8 @@ get_sys_memory() ->
|
|
|
{0, 0}
|
|
|
end.
|
|
|
|
|
|
-node_info(Node) ->
|
|
|
- wrap_rpc(emqx_management_proto_v2:node_info(Node)).
|
|
|
+node_info(Nodes) ->
|
|
|
+ emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)).
|
|
|
|
|
|
stopped_node_info(Node) ->
|
|
|
#{name => Node, node_status => 'stopped'}.
|
|
|
@@ -163,17 +165,19 @@ stopped_node_info(Node) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
list_brokers() ->
|
|
|
- [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()].
|
|
|
+ Running = mria_mnesia:running_nodes(),
|
|
|
+ [{Node, Broker} || #{node := Node} = Broker <- broker_info(Running)].
|
|
|
|
|
|
lookup_broker(Node) ->
|
|
|
- broker_info(Node).
|
|
|
+ [Broker] = broker_info([Node]),
|
|
|
+ Broker.
|
|
|
|
|
|
broker_info() ->
|
|
|
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
|
|
|
Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}.
|
|
|
|
|
|
-broker_info(Node) ->
|
|
|
- wrap_rpc(emqx_management_proto_v2:broker_info(Node)).
|
|
|
+broker_info(Nodes) ->
|
|
|
+ emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Metrics and Stats
|
|
|
@@ -183,7 +187,7 @@ get_metrics() ->
|
|
|
nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
|
|
|
|
|
|
get_metrics(Node) ->
|
|
|
- wrap_rpc(emqx_proto_v1:get_metrics(Node)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
|
|
|
|
|
|
get_stats() ->
|
|
|
GlobalStatsKeys =
|
|
|
@@ -211,7 +215,7 @@ delete_keys(List, [Key | Keys]) ->
|
|
|
delete_keys(proplists:delete(Key, List), Keys).
|
|
|
|
|
|
get_stats(Node) ->
|
|
|
- wrap_rpc(emqx_proto_v1:get_stats(Node)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:get_stats(Node)).
|
|
|
|
|
|
nodes_info_count(PropList) ->
|
|
|
NodeCount =
|
|
|
@@ -241,7 +245,7 @@ lookup_client({username, Username}, FormatFun) ->
|
|
|
]).
|
|
|
|
|
|
lookup_client(Node, Key, {M, F}) ->
|
|
|
- case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
|
|
|
+ case unwrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
|
|
|
{error, Err} ->
|
|
|
{error, Err};
|
|
|
L ->
|
|
|
@@ -264,7 +268,7 @@ kickout_client({ClientID, FormatFun}) ->
|
|
|
end.
|
|
|
|
|
|
kickout_client(Node, ClientId) ->
|
|
|
- wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
|
|
|
+ unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
|
|
|
|
|
|
list_authz_cache(ClientId) ->
|
|
|
call_client(ClientId, list_authz_cache).
|
|
|
@@ -284,14 +288,14 @@ list_client_subscriptions(ClientId) ->
|
|
|
end.
|
|
|
|
|
|
client_subscriptions(Node, ClientId) ->
|
|
|
- {Node, wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
|
|
|
+ {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
|
|
|
|
|
|
clean_authz_cache(ClientId) ->
|
|
|
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
|
|
|
check_results(Results).
|
|
|
|
|
|
clean_authz_cache(Node, ClientId) ->
|
|
|
- wrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).
|
|
|
|
|
|
clean_authz_cache_all() ->
|
|
|
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
|
|
|
@@ -308,10 +312,10 @@ wrap_results(Results) ->
|
|
|
end.
|
|
|
|
|
|
clean_authz_cache_all(Node) ->
|
|
|
- wrap_rpc(emqx_proto_v1:clean_authz_cache(Node)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node)).
|
|
|
|
|
|
clean_pem_cache_all(Node) ->
|
|
|
- wrap_rpc(emqx_proto_v1:clean_pem_cache(Node)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:clean_pem_cache(Node)).
|
|
|
|
|
|
set_ratelimit_policy(ClientId, Policy) ->
|
|
|
call_client(ClientId, {ratelimit, Policy}).
|
|
|
@@ -357,7 +361,7 @@ do_call_client(ClientId, Req) ->
|
|
|
|
|
|
%% @private
|
|
|
call_client(Node, ClientId, Req) ->
|
|
|
- wrap_rpc(emqx_management_proto_v2:call_client(Node, ClientId, Req)).
|
|
|
+ unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Subscriptions
|
|
|
@@ -376,7 +380,7 @@ do_list_subscriptions() ->
|
|
|
end.
|
|
|
|
|
|
list_subscriptions(Node) ->
|
|
|
- wrap_rpc(emqx_management_proto_v2:list_subscriptions(Node)).
|
|
|
+ unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)).
|
|
|
|
|
|
list_subscriptions_via_topic(Topic, FormatFun) ->
|
|
|
lists:append([
|
|
|
@@ -385,7 +389,7 @@ list_subscriptions_via_topic(Topic, FormatFun) ->
|
|
|
]).
|
|
|
|
|
|
list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
|
|
|
- case wrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of
|
|
|
+ case unwrap_rpc(emqx_broker_proto_v1:list_subscriptions_via_topic(Node, Topic)) of
|
|
|
{error, Reason} -> {error, Reason};
|
|
|
Result -> M:F(Result)
|
|
|
end.
|
|
|
@@ -394,7 +398,7 @@ lookup_subscriptions(ClientId) ->
|
|
|
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
|
|
|
|
|
|
lookup_subscriptions(Node, ClientId) ->
|
|
|
- wrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)).
|
|
|
+ unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% PubSub
|
|
|
@@ -404,7 +408,7 @@ subscribe(ClientId, TopicTables) ->
|
|
|
subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
|
|
|
|
|
|
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
|
|
- case wrap_rpc(emqx_management_proto_v2:subscribe(Node, ClientId, TopicTables)) of
|
|
|
+ case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of
|
|
|
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
|
|
{subscribe, Res} -> {subscribe, Res, Node}
|
|
|
end;
|
|
|
@@ -431,7 +435,7 @@ unsubscribe(ClientId, Topic) ->
|
|
|
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
|
|
{unsubscribe, _} | {error, channel_not_found}.
|
|
|
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
|
|
- case wrap_rpc(emqx_management_proto_v2:unsubscribe(Node, ClientId, Topic)) of
|
|
|
+ case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of
|
|
|
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
|
|
Re -> Re
|
|
|
end;
|
|
|
@@ -454,7 +458,7 @@ unsubscribe_batch(ClientId, Topics) ->
|
|
|
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
|
|
|
{unsubscribe_batch, _} | {error, channel_not_found}.
|
|
|
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
|
|
|
- case wrap_rpc(emqx_management_proto_v2:unsubscribe_batch(Node, ClientId, Topics)) of
|
|
|
+ case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of
|
|
|
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
|
|
|
Re -> Re
|
|
|
end;
|
|
|
@@ -477,16 +481,16 @@ get_alarms(Type) ->
|
|
|
[{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
|
|
|
|
|
|
get_alarms(Node, Type) ->
|
|
|
- add_duration_field(wrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
|
|
|
+ add_duration_field(unwrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
|
|
|
|
|
|
deactivate(Node, Name) ->
|
|
|
- wrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).
|
|
|
|
|
|
delete_all_deactivated_alarms() ->
|
|
|
[delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
|
|
|
|
|
|
delete_all_deactivated_alarms(Node) ->
|
|
|
- wrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).
|
|
|
+ unwrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).
|
|
|
|
|
|
add_duration_field(Alarms) ->
|
|
|
Now = erlang:system_time(microsecond),
|
|
|
@@ -523,10 +527,9 @@ delete_banned(Who) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal Functions.
|
|
|
%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-wrap_rpc({badrpc, Reason}) ->
|
|
|
+unwrap_rpc({badrpc, Reason}) ->
|
|
|
{error, Reason};
|
|
|
-wrap_rpc(Res) ->
|
|
|
+unwrap_rpc(Res) ->
|
|
|
Res.
|
|
|
|
|
|
otp_rel() ->
|
|
|
@@ -546,7 +549,7 @@ check_row_limit([Tab | Tables], Limit) ->
|
|
|
check_results(Results) ->
|
|
|
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
|
|
true -> ok;
|
|
|
- false -> wrap_rpc(lists:last(Results))
|
|
|
+ false -> unwrap_rpc(lists:last(Results))
|
|
|
end.
|
|
|
|
|
|
max_row_limit() ->
|