Просмотр исходного кода

refactor(emqx_mgmt): add call_client timeout and improve RPC error handling

Serge Tupchii 1 год назад
Родитель
Сommit
65be76aa06

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -39,6 +39,7 @@
 {emqx_management,2}.
 {emqx_management,3}.
 {emqx_management,4}.
+{emqx_management,5}.
 {emqx_metrics,1}.
 {emqx_mgmt_api_plugins,1}.
 {emqx_mgmt_api_plugins,2}.

+ 34 - 16
apps/emqx_management/src/emqx_mgmt.erl

@@ -18,6 +18,7 @@
 
 -include("emqx_mgmt.hrl").
 -include_lib("emqx/include/emqx_cm.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -elvis([{elvis_style, invalid_dynamic_call, disable}]).
 -elvis([{elvis_style, god_modules, disable}]).
@@ -117,6 +118,13 @@
 
 -elvis([{elvis_style, god_modules, disable}]).
 
+-define(maybe_log_node_errors(LogData, Errors),
+    case Errors of
+        [] -> ok;
+        _ -> ?SLOG(error, (LogData)#{node_errors => Errors})
+    end
+).
+
 %%--------------------------------------------------------------------
 %% Node Info
 %%--------------------------------------------------------------------
@@ -185,7 +193,7 @@ get_sys_memory() ->
     end.
 
 node_info(Nodes) ->
-    emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)).
+    emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)).
 
 stopped_node_info(Node) ->
     {Node, #{node => Node, node_status => 'stopped', role => core}}.
@@ -248,7 +256,7 @@ convert_broker_info({K, V}, M) ->
     M#{K => iolist_to_binary(V)}.
 
 broker_info(Nodes) ->
-    emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)).
+    emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)).
 
 %%--------------------------------------------------------------------
 %% Metrics and Stats
@@ -361,7 +369,7 @@ kickout_client(Node, ClientId) ->
 
 kickout_clients(ClientIds) when is_list(ClientIds) ->
     F = fun(Node) ->
-        emqx_management_proto_v4:kickout_clients(Node, ClientIds)
+        emqx_management_proto_v5:kickout_clients(Node, ClientIds)
     end,
     Results = lists:map(F, emqx:running_nodes()),
     case lists:filter(fun(Res) -> Res =/= ok end, Results) of
@@ -469,17 +477,26 @@ call_client(ClientId, Req) ->
     end.
 
 call_client_on_all_nodes(ClientId, Req) ->
-    Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()],
-    Expected = lists:filter(
+    Nodes = emqx:running_nodes(),
+    Results = call_client(Nodes, ClientId, Req),
+    {Expected, Errs} = lists:foldr(
         fun
-            ({error, _}) -> false;
-            (_) -> true
+            ({_N, {error, not_found}}, Acc) -> Acc;
+            ({_N, {error, _}} = Err, {OkAcc, ErrAcc}) -> {OkAcc, [Err | ErrAcc]};
+            ({_N, OkRes}, {OkAcc, ErrAcc}) -> {[OkRes | OkAcc], ErrAcc}
         end,
-        Results
+        {[], []},
+        lists:zip(Nodes, Results)
     ),
+    ?maybe_log_node_errors(#{msg => "call_client_failed", request => Req}, Errs),
     case Expected of
-        [] -> {error, not_found};
-        [Result | _] -> Result
+        [] ->
+            case Errs of
+                [] -> {error, not_found};
+                [{_Node, FirstErr} | _] -> FirstErr
+            end;
+        [Result | _] ->
+            Result
     end.
 
 %% @private
@@ -499,8 +516,8 @@ do_call_client(ClientId, Req) ->
     end.
 
 %% @private
-call_client(Node, ClientId, Req) ->
-    unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)).
+call_client(Nodes, ClientId, Req) ->
+    emqx_rpc:unwrap_erpc(emqx_management_proto_v5:call_client(Nodes, ClientId, Req)).
 
 %%--------------------------------------------------------------------
 %% Subscriptions
@@ -513,7 +530,7 @@ do_list_subscriptions() ->
     throw(not_implemented).
 
 list_subscriptions(Node) ->
-    unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)).
+    unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)).
 
 list_subscriptions_via_topic(Topic, FormatFun) ->
     lists:append([
@@ -535,7 +552,7 @@ subscribe(ClientId, TopicTables) ->
     subscribe(emqx:running_nodes(), ClientId, TopicTables).
 
 subscribe([Node | Nodes], ClientId, TopicTables) ->
-    case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of
+    case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
         {error, _} -> subscribe(Nodes, ClientId, TopicTables);
         {subscribe, Res} -> {subscribe, Res, Node}
     end;
@@ -562,7 +579,7 @@ unsubscribe(ClientId, Topic) ->
 -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
     {unsubscribe, _} | {error, channel_not_found}.
 unsubscribe([Node | Nodes], ClientId, Topic) ->
-    case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of
+    case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of
         {error, _} -> unsubscribe(Nodes, ClientId, Topic);
         Re -> Re
     end;
@@ -585,7 +602,7 @@ unsubscribe_batch(ClientId, Topics) ->
 -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
     {unsubscribe_batch, _} | {error, channel_not_found}.
 unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
-    case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of
+    case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of
         {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
         Re -> Re
     end;
@@ -664,6 +681,7 @@ lookup_running_client(ClientId, FormatFun) ->
 %%--------------------------------------------------------------------
 %% Internal Functions.
 %%--------------------------------------------------------------------
+
 unwrap_rpc({badrpc, Reason}) ->
     {error, Reason};
 unwrap_rpc(Res) ->

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -407,7 +407,7 @@ get_configs_v1(QueryStr) ->
     Node = maps:get(<<"node">>, QueryStr, node()),
     case
         lists:member(Node, emqx:running_nodes()) andalso
-            emqx_management_proto_v4:get_full_config(Node)
+            emqx_management_proto_v5:get_full_config(Node)
     of
         false ->
             Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -516,7 +516,7 @@ list_listeners() ->
     lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
 
 list_listeners(Node) ->
-    wrap_rpc(emqx_management_proto_v4:list_listeners(Node)).
+    wrap_rpc(emqx_management_proto_v5:list_listeners(Node)).
 
 listener_status_by_id(NodeL) ->
     Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),

+ 86 - 0
apps/emqx_management/src/proto/emqx_management_proto_v5.erl

@@ -0,0 +1,86 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_management_proto_v5).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    node_info/1,
+    broker_info/1,
+    list_subscriptions/1,
+
+    list_listeners/1,
+    subscribe/3,
+    unsubscribe/3,
+    unsubscribe_batch/3,
+
+    call_client/3,
+
+    get_full_config/1,
+
+    kickout_clients/2
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.6.0".
+
+-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
+    {unsubscribe, _} | {error, _} | {badrpc, _}.
+unsubscribe_batch(Node, ClientId, Topics) ->
+    rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).
+
+-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()).
+node_info(Nodes) ->
+    erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000).
+
+-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()).
+broker_info(Nodes) ->
+    erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000).
+
+-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
+list_subscriptions(Node) ->
+    rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
+
+-spec list_listeners(node()) -> map() | {badrpc, _}.
+list_listeners(Node) ->
+    rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
+
+-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
+    {subscribe, _} | {error, atom()} | {badrpc, _}.
+subscribe(Node, ClientId, TopicTables) ->
+    rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
+
+-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
+    {unsubscribe, _} | {error, _} | {badrpc, _}.
+unsubscribe(Node, ClientId, Topic) ->
+    rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
+
+-spec call_client([node()], emqx_types:clientid(), term()) -> emqx_rpc:erpc_multicall(term()).
+call_client(Nodes, ClientId, Req) ->
+    erpc:multicall(Nodes, emqx_mgmt, do_call_client, [ClientId, Req], 30000).
+
+-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
+get_full_config(Node) ->
+    rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).
+
+-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}.
+kickout_clients(Node, ClientIds) ->
+    rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]).

+ 61 - 13
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -461,8 +461,7 @@ t_persist_list_subs(_) ->
 
 t_call_client_cluster(Config) ->
     [Node1, Node2] = ?config(cluster, Config),
-    {ok, Node1Client, Node1ClientId} = connect_client(Node1),
-    {ok, Node2Client, Node2ClientId} = connect_client(Node2),
+    [Node1ClientId, Node2ClientId] = ?config(client_ids, Config),
     ?assertMatch(
         {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId))
     ),
@@ -475,21 +474,67 @@ t_call_client_cluster(Config) ->
     ?assertMatch(
         {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId))
     ),
-    _ = emqtt:stop(Node1Client),
-    _ = emqtt:stop(Node2Client).
+
+    case proplists:get_value(name, ?config(tc_group_properties, Config)) of
+        cm_registry_disabled ->
+            %% Simulating crashes that must be handled by erpc multicall
+            ?assertMatch(
+                {error, _},
+                rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node2ClientId))
+            ),
+            ?assertMatch(
+                {error, _},
+                rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node1ClientId))
+            );
+        cm_registry_enabled ->
+            %% Direct call to remote pid is expected to crash
+            ?assertMatch(
+                {badrpc, {'EXIT', _}},
+                rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node1ClientId))
+            ),
+            ?assertMatch(
+                {badrpc, {'EXIT', _}},
+                rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_bad_args(Node2ClientId))
+            );
+        _ ->
+            ok
+    end,
+
+    NotFoundClientId = <<"no_such_client_id">>,
+    ?assertEqual(
+        {error, not_found},
+        rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(NotFoundClientId))
+    ),
+    ?assertEqual(
+        {error, not_found},
+        rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(NotFoundClientId))
+    ).
 
 t_call_client_cluster(init, Config) ->
     Apps = [{emqx, ?config(emqx_config, Config)}, emqx_management],
-    Cluster = emqx_cth_cluster:start(
-        [
-            {list_to_atom(atom_to_list(?MODULE) ++ "1"), #{role => core, apps => Apps}},
-            {list_to_atom(atom_to_list(?MODULE) ++ "2"), #{role => core, apps => Apps}}
-        ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
-    ),
-    [{cluster, Cluster} | Config];
+    [Node1, Node2] =
+        Cluster = emqx_cth_cluster:start(
+            [
+                {list_to_atom(atom_to_list(?MODULE) ++ "1"), #{role => core, apps => Apps}},
+                {list_to_atom(atom_to_list(?MODULE) ++ "2"), #{role => core, apps => Apps}}
+            ],
+            #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        ),
+    {ok, Node1Client, Node1ClientId} = connect_client(Node1),
+    {ok, Node2Client, Node2ClientId} = connect_client(Node2),
+    %% They may exit during the test due to simulated crashes
+    unlink(Node1Client),
+    unlink(Node2Client),
+    [
+        {cluster, Cluster},
+        {client_ids, [Node1ClientId, Node2ClientId]},
+        {client_pids, [Node1Client, Node2Client]}
+        | Config
+    ];
 t_call_client_cluster('end', Config) ->
-    emqx_cth_cluster:stop(?config(cluster, Config)).
+    emqx_cth_cluster:stop(?config(cluster, Config)),
+    [exit(ClientPid, kill) || ClientPid <- ?config(client_pids, Config)],
+    ok.
 
 %%% helpers
 ident(Arg) ->
@@ -524,3 +569,6 @@ connect_client(Node) ->
 
 client_msgs_args(ClientId) ->
     [mqueue_msgs, ClientId, #{limit => 10, continuation => none}].
+
+client_msgs_bad_args(ClientId) ->
+    [mqueue_msgs, ClientId, "bad_page_params"].

+ 2 - 2
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -287,12 +287,12 @@ t_configs_node({'init', Config}) ->
         (other_node, _) -> <<"log=2">>;
         (bad_node, _) -> {badrpc, bad}
     end,
-    meck:expect(emqx_management_proto_v4, get_full_config, F),
+    meck:expect(emqx_management_proto_v5, get_full_config, F),
     meck:expect(emqx_conf_proto_v3, get_hocon_config, F2),
     meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
     Config;
 t_configs_node({'end', _}) ->
-    meck:unload([emqx, emqx_management_proto_v4, emqx_conf_proto_v3, hocon_pp]);
+    meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]);
 t_configs_node(_) ->
     Node = atom_to_list(node()),