Просмотр исходного кода

feat(sessds): List persistent subscriptions in the REST API

ieQu1 1 год назад
Родитель
Сommit
180130d684

+ 207 - 3
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -176,7 +176,8 @@ format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
         #{
             topic => emqx_topic:maybe_format_share(Topic),
             clientid => maps:get(subid, SubOpts, null),
-            node => WhichNode
+            node => WhichNode,
+            durable => false
         },
         maps:with([qos, nl, rap, rh], SubOpts)
     ).
@@ -196,7 +197,22 @@ check_match_topic(#{<<"match_topic">> := MatchTopic}) ->
 check_match_topic(_) ->
     ok.
 
-do_subscriptions_query(QString) ->
+do_subscriptions_query(QString0) ->
+    {IsDurable, QString} = maps:take(
+        <<"durable">>, maps:merge(#{<<"durable">> => undefined}, QString0)
+    ),
+    case emqx_persistent_message:is_persistence_enabled() andalso IsDurable of
+        false ->
+            do_subscriptions_query_mem(QString);
+        true ->
+            do_subscriptions_query_persistent(QString);
+        undefined ->
+            merge_queries(
+                QString, fun do_subscriptions_query_mem/1, fun do_subscriptions_query_persistent/1
+            )
+    end.
+
+do_subscriptions_query_mem(QString) ->
     Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2],
     case maps:get(<<"node">>, QString, undefined) of
         undefined ->
@@ -210,8 +226,196 @@ do_subscriptions_query(QString) ->
             end
     end.
 
+do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = QString) ->
+    Count = emqx_persistent_session_ds_router:stats(n_routes),
+    %% TODO: filtering by client ID can be implemented more efficiently:
+    FilterTopic = maps:get(<<"topic">>, QString, '_'),
+    Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic),
+    SubPred = fun(Sub) ->
+        compare_optional(<<"topic">>, QString, topic, Sub) andalso
+            compare_optional(<<"clientid">>, QString, clientid, Sub) andalso
+            compare_optional(<<"qos">>, QString, qos, Sub) andalso
+            compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub)
+    end,
+    NDropped = (Page - 1) * Limit,
+    {_, Stream} = consume_n_matching(
+        fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0
+    ),
+    {Subscriptions, Stream1} = consume_n_matching(
+        fun persistent_route_to_subscription/1, SubPred, Limit, Stream
+    ),
+    HasNext = Stream1 =/= [],
+    Meta =
+        case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of
+            true ->
+                %% Fuzzy searches shouldn't return count:
+                #{
+                    limit => Limit,
+                    page => Page,
+                    hasnext => HasNext
+                };
+            false ->
+                #{
+                    count => Count,
+                    limit => Limit,
+                    page => Page,
+                    hasnext => HasNext
+                }
+        end,
+
+    #{
+        meta => Meta,
+        data => Subscriptions
+    }.
+
+compare_optional(QField, Query, SField, Subscription) ->
+    case Query of
+        #{QField := Expected} ->
+            maps:get(SField, Subscription) =:= Expected;
+        _ ->
+            true
+    end.
+
+compare_match_topic_optional(QField, Query, SField, Subscription) ->
+    case Query of
+        #{QField := TopicFilter} ->
+            Topic = maps:get(SField, Subscription),
+            emqx_topic:match(Topic, TopicFilter);
+        _ ->
+            true
+    end.
+
+%% @doc Drop elements from the stream until encountered N elements
+%% matching the predicate function.
+-spec consume_n_matching(
+    fun((T) -> Q),
+    fun((Q) -> boolean()),
+    non_neg_integer(),
+    emqx_utils_stream:stream(T)
+) -> {[Q], emqx_utils_stream:stream(T) | empty}.
+consume_n_matching(Map, Pred, N, S) ->
+    consume_n_matching(Map, Pred, N, S, []).
+
+consume_n_matching(_Map, _Pred, _N, [], Acc) ->
+    {lists:reverse(Acc), []};
+consume_n_matching(_Map, _Pred, 0, S, Acc) ->
+    {lists:reverse(Acc), S};
+consume_n_matching(Map, Pred, N, S0, Acc) ->
+    case emqx_utils_stream:next(S0) of
+        [] ->
+            consume_n_matching(Map, Pred, N, [], Acc);
+        [Elem | S] ->
+            Mapped = Map(Elem),
+            case Pred(Mapped) of
+                true -> consume_n_matching(Map, Pred, N - 1, S, [Mapped | Acc]);
+                false -> consume_n_matching(Map, Pred, N, S, Acc)
+            end
+    end.
+
+persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) ->
+    case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of
+        #{subopts := SubOpts} ->
+            #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
+            #{
+                topic => Topic,
+                clientid => SessionId,
+                node => all,
+
+                qos => Qos,
+                nl => Nl,
+                rh => Rh,
+                rap => Rap,
+                durable => true
+            };
+        undefined ->
+            #{
+                topic => Topic,
+                clientid => SessionId,
+                node => all,
+                durable => true
+            }
+    end.
+
+%% @private This function merges paginated results from two sources.
+%%
+%% Note: this implementation is far from ideal: `count' for the
+%% queries may be missing, it may be larger than the actual number of
+%% elements. This may lead to empty pages that can confuse the user.
+%%
+%% Not much can be done to mitigate that, though: since the count may
+%% be incorrect, we cannot run simple math to determine when one
+%% stream begins and another ends: it requires actual iteration.
+%%
+%% Ideally, the dashboard must be split between durable and mem
+%% subscriptions, and this function should be removed for good.
+merge_queries(QString0, Q1, Q2) ->
+    #{<<"limit">> := Limit, <<"page">> := Page} = QString0,
+    C1 = resp_count(QString0, Q1),
+    C2 = resp_count(QString0, Q2),
+    Meta =
+        case is_number(C1) andalso is_number(C2) of
+            true ->
+                #{
+                    count => C1 + C2,
+                    limit => Limit,
+                    page => Page
+                };
+            false ->
+                #{
+                    limit => Limit,
+                    page => Page
+                }
+        end,
+    case {C1, C2} of
+        {_, 0} ->
+            %% The second query is empty. Just return the result of Q1 as usual:
+            Q1(QString0);
+        {0, _} ->
+            %% The first query is empty. Just return the result of Q2 as usual:
+            Q2(QString0);
+        _ when is_number(C1) ->
+            %% Both queries are potentially non-empty, but we at least
+            %% have the page number for the first query. We try to
+            %% stich the pages together and thus respect the limit
+            %% (except for the page where the results switch from Q1
+            %% to Q2).
+
+            %% Page where data from the second query is estimated to
+            %% begin:
+            Q2Page = ceil(C1 / Limit),
+            case Page =< Q2Page of
+                true ->
+                    #{data := Data, meta := #{hasnext := HN}} = Q1(QString0),
+                    #{
+                        data => Data,
+                        meta => Meta#{hasnext => HN orelse C2 > 0}
+                    };
+                false ->
+                    QString = QString0#{<<"page">> => Page - Q2Page},
+                    #{data := Data, meta := #{hasnext := HN}} = Q2(QString),
+                    #{data => Data, meta => Meta#{hasnext => HN}}
+            end;
+        _ ->
+            %% We don't know how many items is there in the first
+            %% query, and the second query is not empty (this includes
+            %% the case where `C2' is `undefined'). Best we can do is
+            %% to interleave the queries. This may produce less
+            %% results per page than `Limit'.
+            QString = QString0#{<<"limit">> => ceil(Limit / 2)},
+            #{data := D1, meta := #{hasnext := HN1}} = Q1(QString),
+            #{data := D2, meta := #{hasnext := HN2}} = Q2(QString),
+            #{
+                meta => Meta#{hasnext => HN1 or HN2},
+                data => D1 ++ D2
+            }
+    end.
+
+resp_count(Query, QFun) ->
+    #{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}),
+    maps:get(count, Meta, undefined).
+
 %%--------------------------------------------------------------------
-%% QueryString to MatchSpec
+%% QueryString to MatchSpec (mem sessions)
 %%--------------------------------------------------------------------
 
 -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().

+ 71 - 20
apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl

@@ -36,17 +36,72 @@
 -define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}).
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, mem},
+        {group, persistent}
+    ].
+
+groups() ->
+    CommonTCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {mem, CommonTCs},
+        %% Shared subscriptions are currently not supported:
+        {persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]}
+    ].
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite(),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx,
+                "session_persistence {\n"
+                "    enable = true\n"
+                "    renew_streams_interval = 10ms\n"
+                "}"},
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
+
+init_per_group(persistent, Config) ->
+    ClientConfig = #{
+        username => ?USERNAME,
+        clientid => ?CLIENTID,
+        proto_ver => v5,
+        clean_start => true,
+        properties => #{'Session-Expiry-Interval' => 300}
+    },
+    [{client_config, ClientConfig}, {durable, true} | Config];
+init_per_group(mem, Config) ->
+    ClientConfig = #{
+        username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5, clean_start => true
+    },
+    [{client_config, ClientConfig}, {durable, false} | Config].
+
+end_per_group(_, Config) ->
     Config.
 
-end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite().
+init_per_testcase(_TC, Config) ->
+    case ?config(client_config, Config) of
+        ClientConfig when is_map(ClientConfig) ->
+            {ok, Client} = emqtt:start_link(ClientConfig),
+            {ok, _} = emqtt:connect(Client),
+            [{client, Client} | Config];
+        _ ->
+            Config
+    end.
+
+end_per_testcase(_TC, Config) ->
+    Client = proplists:get_value(client, Config),
+    emqtt:disconnect(Client).
 
 t_subscription_api(Config) ->
     Client = proplists:get_value(client, Config),
+    Durable = atom_to_list(?config(durable, Config)),
     {ok, _, _} = emqtt:subscribe(
         Client, [
             {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]}
@@ -54,12 +109,13 @@ t_subscription_api(Config) ->
     ),
     {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2),
     Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]),
+    timer:sleep(100),
     {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
     Data = emqx_utils_json:decode(Response, [return_maps]),
     Meta = maps:get(<<"meta">>, Data),
     ?assertEqual(1, maps:get(<<"page">>, Meta)),
     ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
-    ?assertEqual(2, maps:get(<<"count">>, Meta)),
+    ?assertEqual(2, maps:get(<<"count">>, Meta), Data),
     Subscriptions = maps:get(<<"data">>, Data),
     ?assertEqual(length(Subscriptions), 2),
     Sort =
@@ -90,7 +146,8 @@ t_subscription_api(Config) ->
         {"node", atom_to_list(node())},
         {"qos", "0"},
         {"share_group", "test_group"},
-        {"match_topic", "t/#"}
+        {"match_topic", "t/#"},
+        {"durable", Durable}
     ],
     Headers = emqx_mgmt_api_test_util:auth_header_(),
 
@@ -103,6 +160,7 @@ t_subscription_api(Config) ->
 
 t_subscription_fuzzy_search(Config) ->
     Client = proplists:get_value(client, Config),
+    Durable = atom_to_list(?config(durable, Config)),
     Topics = [
         <<"t/foo">>,
         <<"t/foo/bar">>,
@@ -116,7 +174,8 @@ t_subscription_fuzzy_search(Config) ->
     MatchQs = [
         {"clientid", ?CLIENTID},
         {"node", atom_to_list(node())},
-        {"match_topic", "t/#"}
+        {"match_topic", "t/#"},
+        {"durable", Durable}
     ],
 
     MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers),
@@ -130,12 +189,13 @@ t_subscription_fuzzy_search(Config) ->
     LimitMatchQuery = [
         {"clientid", ?CLIENTID},
         {"match_topic", "+/+/+"},
-        {"limit", "3"}
+        {"limit", "3"},
+        {"durable", Durable}
     ],
 
     MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers),
     ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2),
-    ?assertEqual(3, length(maps:get(<<"data">>, MatchData2))),
+    ?assertEqual(3, length(maps:get(<<"data">>, MatchData2)), MatchData2),
 
     MatchData2P2 =
         #{<<"meta">> := MatchMeta2P2} =
@@ -176,8 +236,8 @@ t_list_with_shared_sub(_Config) ->
 
     ok.
 
-t_list_with_invalid_match_topic(_Config) ->
-    Client = proplists:get_value(client, _Config),
+t_list_with_invalid_match_topic(Config) ->
+    Client = proplists:get_value(client, Config),
     RealTopic = <<"t/+">>,
     Topic = <<"$share/g1/", RealTopic/binary>>,
 
@@ -212,12 +272,3 @@ request_json(Method, Query, Headers) when is_list(Query) ->
 
 path() ->
     emqx_mgmt_api_test_util:api_path(["subscriptions"]).
-
-init_per_testcase(_TC, Config) ->
-    {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}),
-    {ok, _} = emqtt:connect(Client),
-    [{client, Client} | Config].
-
-end_per_testcase(_TC, Config) ->
-    Client = proplists:get_value(client, Config),
-    emqtt:disconnect(Client).

+ 7 - 0
changes/ce/fix-12874.en.md

@@ -0,0 +1,7 @@
+- Ensure consistency of the durable message replay when the subscriptions are modified before session reconnects
+
+- Persistent sessions save inflight packet IDs for the received QoS2 messages
+
+- Make behavior of the persistent sessions consistent with the non-persistent sessions in regard to overlapping subscriptions
+
+- List persistent subscriptions in the REST API