Pārlūkot izejas kodu

fix(client mgmt api): make bulk subscribe work again in clusters

Fixes https://emqx.atlassian.net/browse/EMQX-12337
Thales Macedo Garitezi 1 gadu atpakaļ
vecāks
revīzija
7b7f44b9ac

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -2,7 +2,7 @@
 {application, emqx_management, [
     {description, "EMQX Management API and CLI"},
     % strict semver, bump manually!
-    {vsn, "5.2.2"},
+    {vsn, "5.2.3"},
     {modules, []},
     {registered, [emqx_management_sup]},
     {applications, [

+ 14 - 4
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -1224,10 +1224,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
     end.
 
 subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
-    %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2),
-    %% as the emqx_channel_info table will only be populated after the hook `client.connected`
-    %% has returned. So if one want to subscribe topics in this hook, it will fail.
-    case ets:lookup(?CHAN_TAB, ClientID) of
+    %% On the one hand, we first try to use `emqx_channel' instead of `emqx_channel_info'
+    %% (used by the `emqx_mgmt:lookup_client/2'), as the `emqx_channel_info' table will
+    %% only be populated after the hook `client.connected' has returned. So if one want to
+    %% subscribe topics in this hook, it will fail.
+    %% ... On the other hand, using only `emqx_channel' would render this API unusable if
+    %% called from a node that doesn't have hold the targeted client connection, so we
+    %% fall back to `emqx_mgmt:lookup_client/2', which consults the global registry.
+    Result1 = ets:lookup(?CHAN_TAB, ClientID),
+    Result =
+        case Result1 of
+            [] -> emqx_mgmt:lookup_client({clientid, ClientID}, _FormatFn = undefined);
+            _ -> Result1
+        end,
+    case Result of
         [] ->
             {404, ?CLIENTID_NOT_FOUND};
         _ ->

+ 94 - 3
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -29,14 +29,18 @@ all() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
         {group, persistent_sessions},
+        {group, non_persistent_cluster},
         {group, msgs_base64_encoding},
         {group, msgs_plain_encoding}
-        | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases())
+        | AllTCs --
+            (persistent_session_testcases() ++
+                non_persistent_cluster_testcases() ++ client_msgs_testcases())
     ].
 
 groups() ->
     [
         {persistent_sessions, persistent_session_testcases()},
+        {non_persistent_cluster, non_persistent_cluster_testcases()},
         {msgs_base64_encoding, client_msgs_testcases()},
         {msgs_plain_encoding, client_msgs_testcases()}
     ].
@@ -52,6 +56,10 @@ persistent_session_testcases() ->
         t_persistent_sessions_subscriptions1,
         t_list_clients_v2
     ].
+non_persistent_cluster_testcases() ->
+    [
+        t_bulk_subscribe
+    ].
 client_msgs_testcases() ->
     [
         t_inflight_messages,
@@ -96,6 +104,24 @@ init_per_group(persistent_sessions, Config) ->
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [{nodes, Nodes} | Config];
+init_per_group(non_persistent_cluster, Config) ->
+    AppSpecs = [
+        emqx,
+        emqx_conf,
+        emqx_management
+    ],
+    Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
+        "dashboard.listeners.http { enable = true, bind = 18084 }"
+    ),
+    Cluster = [
+        {mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}},
+        {mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}}
+    ],
+    Nodes = emqx_cth_cluster:start(
+        Cluster,
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{nodes, Nodes} | Config];
 init_per_group(msgs_base64_encoding, Config) ->
     [{payload_encoding, base64} | Config];
 init_per_group(msgs_plain_encoding, Config) ->
@@ -103,7 +129,10 @@ init_per_group(msgs_plain_encoding, Config) ->
 init_per_group(_Group, Config) ->
     Config.
 
-end_per_group(persistent_sessions, Config) ->
+end_per_group(Group, Config) when
+    Group =:= persistent_sessions;
+    Group =:= non_persistent_cluster
+->
     Nodes = ?config(nodes, Config),
     emqx_cth_cluster:stop(Nodes),
     ok;
@@ -1572,6 +1601,42 @@ t_subscribe_shared_topic_nl(_Config) ->
         PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1})
     ).
 
+%% Checks that we can use the bulk subscribe API on a different node than the one a client
+%% is connected to.
+t_bulk_subscribe(Config) ->
+    [N1, N2] = ?config(nodes, Config),
+    APIPort = 18084,
+    Port1 = get_mqtt_port(N1, tcp),
+    Port2 = get_mqtt_port(N2, tcp),
+    ?check_trace(
+        begin
+            ClientId1 = <<"bulk-sub1">>,
+            _C1 = connect_client(#{port => Port2, clientid => ClientId1, clean_start => true}),
+            ClientId2 = <<"bulk-sub2">>,
+            C2 = connect_client(#{port => Port1, clientid => ClientId2, clean_start => true}),
+            Topic = <<"testtopic">>,
+            BulkSub = [#{topic => Topic, qos => 1, nl => 1, rh => 1}],
+            ?assertMatch({200, [_]}, bulk_subscribe_request(APIPort, ClientId1, BulkSub)),
+            ?assertMatch(
+                {200, [_]},
+                get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true})
+            ),
+            {ok, _} = emqtt:publish(C2, Topic, <<"hi1">>, [{qos, 1}]),
+            ?assertReceive({publish, #{topic := Topic, payload := <<"hi1">>}}),
+            BulkUnsub = [#{topic => Topic}],
+            ?assertMatch({204, _}, bulk_unsubscribe_request(APIPort, ClientId1, BulkUnsub)),
+            ?assertMatch(
+                {200, []},
+                get_subscriptions_request(APIPort, ClientId1, #{simplify_result => true})
+            ),
+            {ok, _} = emqtt:publish(C2, Topic, <<"hi2">>, [{qos, 1}]),
+            ?assertNotReceive({publish, _}),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_list_clients_v2(Config) ->
     [N1, N2] = ?config(nodes, Config),
     APIPort = 18084,
@@ -1935,9 +2000,17 @@ maybe_json_decode(X) ->
     end.
 
 get_subscriptions_request(APIPort, ClientId) ->
+    get_subscriptions_request(APIPort, ClientId, _Opts = #{}).
+
+get_subscriptions_request(APIPort, ClientId, Opts) ->
+    Simplify = maps:get(simplify_result, Opts, false),
     Host = "http://127.0.0.1:" ++ integer_to_list(APIPort),
     Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscriptions"]),
-    request(get, Path, []).
+    Res = request(get, Path, []),
+    case Simplify of
+        true -> simplify_result(Res);
+        false -> Res
+    end.
 
 get_client_request(Port, ClientId) ->
     Host = "http://127.0.0.1:" ++ integer_to_list(Port),
@@ -1952,6 +2025,24 @@ list_request(Port, QueryParams) ->
     Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]),
     request(get, Path, [], QueryParams).
 
+bulk_subscribe_request(Port, ClientId, Body) ->
+    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "subscribe", "bulk"]),
+    simplify_result(request(post, Path, Body)).
+
+bulk_unsubscribe_request(Port, ClientId, Body) ->
+    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId, "unsubscribe", "bulk"]),
+    simplify_result(request(post, Path, Body)).
+
+simplify_result(Res) ->
+    case Res of
+        {error, {{_, Status, _}, _, Body}} ->
+            {Status, Body};
+        {ok, {{_, Status, _}, _, Body}} ->
+            {Status, Body}
+    end.
+
 list_v2_request(Port, QueryParams = #{}) ->
     Host = "http://127.0.0.1:" ++ integer_to_list(Port),
     Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]),

+ 1 - 0
changes/ce/fix-13344.en.md

@@ -0,0 +1 @@
+Fixed an issue that prevented the `POST /clients/:clientid/subscribe/bulk` API from working properly if the node receiving the API request did not hold the connection to the targeted clientid.