Просмотр исходного кода

Merge pull request #13017 from thalesmg/fix-ds-subs-pages-r57-20240510

fix(subs mgmt api): attempt to return mixed clients ds/non-ds in the same page
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
7023e6ad96

+ 41 - 6
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -310,7 +310,12 @@ consume_n_matching(Map, Pred, N, S) ->
 consume_n_matching(_Map, _Pred, _N, [], Acc) ->
     {lists:reverse(Acc), []};
 consume_n_matching(_Map, _Pred, 0, S, Acc) ->
-    {lists:reverse(Acc), S};
+    case emqx_utils_stream:next(S) of
+        [] ->
+            {lists:reverse(Acc), []};
+        _ ->
+            {lists:reverse(Acc), S}
+    end;
 consume_n_matching(Map, Pred, N, S0, Acc) ->
     case emqx_utils_stream:next(S0) of
         [] ->
@@ -396,11 +401,16 @@ merge_queries(QString0, Q1, Q2) ->
             Q2Page = ceil(C1 / Limit),
             case Page =< Q2Page of
                 true ->
-                    #{data := Data, meta := #{hasnext := HN}} = Q1(QString0),
-                    #{
-                        data => Data,
-                        meta => Meta#{hasnext => HN orelse C2 > 0}
-                    };
+                    #{data := Data1, meta := #{hasnext := HN1}} = Q1(QString0),
+                    maybe_fetch_from_second_query(#{
+                        rows1 => Data1,
+                        limit => Limit,
+                        hasnext1 => HN1,
+                        meta => Meta,
+                        count2 => C2,
+                        query2 => Q2,
+                        query_string => QString0
+                    });
                 false ->
                     QString = QString0#{<<"page">> => Page - Q2Page},
                     #{data := Data, meta := #{hasnext := HN}} = Q2(QString),
@@ -421,6 +431,31 @@ merge_queries(QString0, Q1, Q2) ->
             }
     end.
 
+maybe_fetch_from_second_query(Params) ->
+    #{
+        rows1 := Data1,
+        limit := Limit,
+        hasnext1 := HN1,
+        meta := Meta,
+        count2 := C2,
+        query2 := Q2,
+        query_string := QString0
+    } = Params,
+    NumRows1 = length(Data1),
+    {Data, HN} =
+        case (NumRows1 >= Limit) orelse HN1 of
+            true ->
+                {Data1, HN1 orelse C2 > 0};
+            false ->
+                #{data := Data2, meta := #{hasnext := HN2}} =
+                    Q2(QString0#{<<"limit">> := Limit - NumRows1}),
+                {Data1 ++ Data2, HN2}
+        end,
+    #{
+        data => Data,
+        meta => Meta#{hasnext => HN}
+    }.
+
 resp_count(Query, QFun) ->
     #{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}),
     maps:get(count, Meta, undefined).

+ 0 - 1
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -24,7 +24,6 @@
 -include_lib("proper/include/proper.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/asserts.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
 
 all() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),

+ 94 - 2
apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl

@@ -20,6 +20,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 -define(CLIENTID, <<"api_clientid">>).
 -define(USERNAME, <<"api_username">>).
@@ -42,11 +43,18 @@ all() ->
     ].
 
 groups() ->
-    CommonTCs = emqx_common_test_helpers:all(?MODULE),
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    CommonTCs = AllTCs -- persistent_only_tcs(),
     [
         {mem, CommonTCs},
         %% Shared subscriptions are currently not supported:
-        {persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]}
+        {persistent,
+            (CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()}
+    ].
+
+persistent_only_tcs() ->
+    [
+        t_mixed_persistent_sessions
     ].
 
 init_per_suite(Config) ->
@@ -158,6 +166,51 @@ t_subscription_api(Config) ->
     SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
     ?assertEqual(length(SubscriptionsList2), 1).
 
+%% Checks a few edge cases where persistent and non-persistent client subscriptions exist.
+t_mixed_persistent_sessions(Config) ->
+    ClientConfig = ?config(client_config, Config),
+    PersistentClient = ?config(client, Config),
+    {ok, MemClient} = emqtt:start_link(ClientConfig#{clientid => <<"mem">>, properties => #{}}),
+    {ok, _} = emqtt:connect(MemClient),
+
+    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(PersistentClient, <<"t/1">>, 1),
+    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(MemClient, <<"t/1">>, 1),
+
+    %% First page with sufficient limit should have both mem and DS clients.
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, #{
+                <<"data">> := [_, _],
+                <<"meta">> :=
+                    #{
+                        <<"hasnext">> := false,
+                        <<"count">> := 2
+                    }
+            }}},
+        get_subs(#{page => "1"})
+    ),
+
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, #{
+                <<"data">> := [_],
+                <<"meta">> := #{<<"hasnext">> := true}
+            }}},
+        get_subs(#{page => "1", limit => "1"})
+    ),
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, #{
+                <<"data">> := [_],
+                <<"meta">> := #{<<"hasnext">> := false}
+            }}},
+        get_subs(#{page => "2", limit => "1"})
+    ),
+
+    emqtt:disconnect(MemClient),
+
+    ok.
+
 t_subscription_fuzzy_search(Config) ->
     Client = proplists:get_value(client, Config),
     Durable = atom_to_list(?config(durable, Config)),
@@ -272,3 +325,42 @@ request_json(Method, Query, Headers) when is_list(Query) ->
 
 path() ->
     emqx_mgmt_api_test_util:api_path(["subscriptions"]).
+
+get_subs() ->
+    get_subs(_QueryParams = #{}).
+
+get_subs(QueryParams = #{}) ->
+    QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))),
+    request(get, path(), [], QS).
+
+request(Method, Path, Params) ->
+    request(Method, Path, Params, _QueryParams = "").
+
+request(Method, Path, Params, QueryParams) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of
+        {ok, {Status, Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {ok, {Status, Headers, Body}};
+        {error, {Status, Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {error, {Status, Headers, Body}};
+        Error ->
+            Error
+    end.
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.