Forráskód Böngészése

refactor(emqx_mgmt): avoid call_client RPC to all runing nodes if global cm_registry is enabled

Serge Tupchii 2 éve
szülő
commit
8be02327b2

+ 8 - 0
apps/emqx_management/src/emqx_mgmt.erl

@@ -461,6 +461,14 @@ set_keepalive(_ClientId, _Interval) ->
 
 %% @private
 call_client(ClientId, Req) ->
+    case emqx_cm_registry:is_enabled() of
+        true ->
+            do_call_client(ClientId, Req);
+        false ->
+            call_client_on_all_nodes(ClientId, Req)
+    end.
+
+call_client_on_all_nodes(ClientId, Req) ->
     Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()],
     Expected = lists:filter(
         fun

+ 67 - 5
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -28,14 +28,19 @@
 all() ->
     [
         {group, persistence_disabled},
-        {group, persistence_enabled}
+        {group, persistence_enabled},
+        {group, cm_registry_enabled},
+        {group, cm_registry_disabled}
     ].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
+    CMRegistryTCs = [t_call_client_cluster],
+    TCs = emqx_common_test_helpers:all(?MODULE) -- CMRegistryTCs,
     [
         {persistence_disabled, [], TCs},
-        {persistence_enabled, [], [t_persist_list_subs]}
+        {persistence_enabled, [], [t_persist_list_subs]},
+        {cm_registry_enabled, CMRegistryTCs},
+        {cm_registry_disabled, CMRegistryTCs}
     ].
 
 init_per_group(persistence_disabled, Config) ->
@@ -66,10 +71,17 @@ init_per_group(persistence_enabled, Config) ->
     [
         {apps, Apps}
         | Config
-    ].
+    ];
+init_per_group(cm_registry_enabled, Config) ->
+    [{emqx_config, "broker.enable_session_registry = true"} | Config];
+init_per_group(cm_registry_disabled, Config) ->
+    [{emqx_config, "broker.enable_session_registry = false"} | Config].
 
 end_per_group(_Grp, Config) ->
-    emqx_cth_suite:stop(?config(apps, Config)).
+    case ?config(apps, Config) of
+        undefined -> ok;
+        Apps -> emqx_cth_suite:stop(Apps)
+    end.
 
 init_per_suite(Config) ->
     Config.
@@ -447,6 +459,38 @@ t_persist_list_subs(_) ->
     %% clients:
     VerifySubs().
 
+t_call_client_cluster(Config) ->
+    [Node1, Node2] = ?config(cluster, Config),
+    {ok, Node1Client, Node1ClientId} = connect_client(Node1),
+    {ok, Node2Client, Node2ClientId} = connect_client(Node2),
+    ?assertMatch(
+        {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId))
+    ),
+    ?assertMatch(
+        {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId))
+    ),
+    ?assertMatch(
+        {[], #{}}, rpc:call(Node1, emqx_mgmt, list_client_msgs, client_msgs_args(Node2ClientId))
+    ),
+    ?assertMatch(
+        {[], #{}}, rpc:call(Node2, emqx_mgmt, list_client_msgs, client_msgs_args(Node1ClientId))
+    ),
+    _ = emqtt:stop(Node1Client),
+    _ = emqtt:stop(Node2Client).
+
+t_call_client_cluster(init, Config) ->
+    Apps = [{emqx, ?config(emqx_config, Config)}, emqx_management],
+    Cluster = emqx_cth_cluster:start(
+        [
+            {list_to_atom(atom_to_list(?MODULE) ++ "1"), #{role => core, apps => Apps}},
+            {list_to_atom(atom_to_list(?MODULE) ++ "2"), #{role => core, apps => Apps}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    [{cluster, Cluster} | Config];
+t_call_client_cluster('end', Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)).
+
 %%% helpers
 ident(Arg) ->
     Arg.
@@ -462,3 +506,21 @@ setup_clients(Config) ->
 disconnect_clients(Config) ->
     Clients = ?config(clients, Config),
     lists:foreach(fun emqtt:disconnect/1, Clients).
+
+get_mqtt_port(Node) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.
+
+connect_client(Node) ->
+    Port = get_mqtt_port(Node),
+    ClientId = <<(atom_to_binary(Node))/binary, "_client">>,
+    {ok, Client} = emqtt:start_link([
+        {port, Port},
+        {proto_ver, v5},
+        {clientid, ClientId}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, Client, ClientId}.
+
+client_msgs_args(ClientId) ->
+    [mqueue_msgs, ClientId, #{limit => 10, continuation => none}].