Просмотр исходного кода

feat(sessds): use session storage to serve Subscription API

Andrew Mayorov 1 год назад
Родитель
Сommit
f68cf4364c

+ 57 - 40
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -78,7 +78,9 @@
     n_awaiting_rel/1
 ]).
 
+%% Iterating storage:
 -export([make_session_iterator/0, session_iterator_next/2]).
+-export([make_subscription_iterator/0, subscription_iterator_next/2]).
 
 -export_type([
     t/0,
@@ -120,10 +122,10 @@
 -endif.
 
 -ifdef(STORE_STATE_IN_DS).
--opaque session_iterator() :: #{its := [emqx_ds:iterator()]} | '$end_of_table'.
+-opaque session_iterator() :: #{its := [emqx_ds:iterator()]}.
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
--opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
+-opaque session_iterator() :: emqx_persistent_session_ds:id().
 %% END ifdef(STORE_STATE_IN_DS).
 -endif.
 
@@ -918,62 +920,57 @@ n_awaiting_rel(Rec) ->
 %%
 
 -spec make_session_iterator() -> session_iterator().
+-spec session_iterator_next(session_iterator(), pos_integer()) ->
+    {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator() | '$end_of_table'}.
+
 -ifdef(STORE_STATE_IN_DS).
+
 make_session_iterator() ->
     %% NB. This hides the existence of streams.  Users will need to start iteration
     %% again to see new streams, if any.
     %% NB. This is not assumed to be stored permanently anywhere.
-    Time = ?TS,
-    TopicFilter = [
-        <<"session">>,
-        '+',
-        ?metadata_domain_bin,
-        ?metadata_domain_bin
-    ],
-    Iterators = lists:map(
+    TopicFilter = [?session_topic_ns, '+', ?metadata_domain_bin, ?metadata_domain_bin],
+    #{its => make_iterators(TopicFilter, ?TS)}.
+
+make_subscription_iterator() ->
+    %% NB. Same as above.
+    TopicFilter = [?session_topic_ns, '+', ?subscription_domain_bin, '+'],
+    #{its => make_iterators(TopicFilter, ?TS)}.
+
+make_iterators(TopicFilter, Time) ->
+    lists:map(
         fun({_Rank, Stream}) ->
             {ok, Iterator} = emqx_ds:make_iterator(?DB, Stream, TopicFilter, Time),
             Iterator
         end,
         emqx_ds:get_streams(?DB, TopicFilter, Time)
-    ),
-    #{its => Iterators}.
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
-make_session_iterator() ->
-    mnesia:dirty_first(?session_tab).
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
+    ).
 
--spec session_iterator_next(session_iterator(), pos_integer()) ->
-    {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator() | '$end_of_table'}.
--ifdef(STORE_STATE_IN_DS).
 session_iterator_next(Cursor, N) ->
-    session_iterator_next(Cursor, N, []).
+    domain_iterator_next(Cursor, N, []).
+
+subscription_iterator_next(Cursor, N) ->
+    domain_iterator_next(Cursor, N, []).
 
 %% Note: ordering is not respected here.
-session_iterator_next(#{its := [It | Rest]} = Cursor, 0, Acc) ->
+domain_iterator_next(#{its := [It | Rest]} = Cursor, 0, Acc) ->
     %% Peek the next item to detect end of table.
     case emqx_ds:next(?DB, It, 1) of
         {ok, end_of_stream} ->
-            session_iterator_next(Cursor#{its := Rest}, 0, Acc);
+            domain_iterator_next(Cursor#{its := Rest}, 0, Acc);
         {ok, _NewIt, []} ->
-            session_iterator_next(Cursor#{its := Rest}, 0, Acc);
+            domain_iterator_next(Cursor#{its := Rest}, 0, Acc);
         {ok, _NewIt, _Batch} ->
             {Acc, Cursor}
     end;
-session_iterator_next(_Cursor, 0, Acc) ->
+domain_iterator_next(#{its := []}, _N, Acc) ->
     {Acc, '$end_of_table'};
-session_iterator_next('$end_of_table', _N, Acc) ->
-    {Acc, '$end_of_table'};
-session_iterator_next(#{its := []}, _N, Acc) ->
-    {Acc, '$end_of_table'};
-session_iterator_next(#{its := [It | Rest]} = Cursor0, N, Acc) ->
+domain_iterator_next(#{its := [It | Rest]} = Cursor0, N, Acc) ->
     case emqx_ds:next(?DB, It, N) of
         {ok, end_of_stream} ->
-            session_iterator_next(Cursor0#{its := Rest}, N, Acc);
+            domain_iterator_next(Cursor0#{its := Rest}, N, Acc);
         {ok, _NewIt, []} ->
-            session_iterator_next(Cursor0#{its := Rest}, N, Acc);
+            domain_iterator_next(Cursor0#{its := Rest}, N, Acc);
         {ok, NewIt, Batch} ->
             NumBatch = length(Batch),
             SessionIds = lists:map(
@@ -984,7 +981,7 @@ session_iterator_next(#{its := [It | Rest]} = Cursor0, N, Acc) ->
                 end,
                 Batch
             ),
-            session_iterator_next(
+            domain_iterator_next(
                 Cursor0#{its := [NewIt | Rest]}, N - NumBatch, SessionIds ++ Acc
             )
     end.
@@ -993,19 +990,39 @@ unwrap_value(#{domain := ?metadata_domain, val := #{metadata := Metadata}}) ->
     Metadata;
 unwrap_value(#{val := Val}) ->
     Val.
+
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
-session_iterator_next(Cursor, 0) ->
+
+make_session_iterator() ->
+    mnesia:dirty_first(?session_tab).
+
+session_iterator_next(Cursor, N) ->
+    mnesia_iterator_next(#{values => true}, ?session_tab, Cursor, N).
+
+make_subscription_iterator() ->
+    mnesia:dirty_first(?subscription_tab).
+
+subscription_iterator_next(Cursor, N) ->
+    mnesia_iterator_next(#{values => false}, ?subscription_tab, Cursor, N).
+
+mnesia_iterator_next(_Opts, _Tab, Cursor, 0) ->
     {[], Cursor};
-session_iterator_next('$end_of_table', _N) ->
+mnesia_iterator_next(_Opts, _Tab, '$end_of_table', _N) ->
     {[], '$end_of_table'};
-session_iterator_next(Cursor0, N) ->
+mnesia_iterator_next(Opts = #{values := true}, Tab, Cursor0, N) ->
     ThisVal = [
         {Cursor0, Metadata}
-     || #kv{v = Metadata} <- mnesia:dirty_read(?session_tab, Cursor0)
+     || #kv{v = Metadata} <- mnesia:dirty_read(Tab, Cursor0)
     ],
-    {NextVals, Cursor} = session_iterator_next(mnesia:dirty_next(?session_tab, Cursor0), N - 1),
-    {ThisVal ++ NextVals, Cursor}.
+    Cursor1 = mnesia:dirty_next(Tab, Cursor0),
+    {NextVals, Cursor} = mnesia_iterator_next(Opts, Tab, Cursor1, N - 1),
+    {ThisVal ++ NextVals, Cursor};
+mnesia_iterator_next(Opts = #{values := false}, Tab, Cursor0, N) ->
+    Cursor1 = mnesia:dirty_next(Tab, Cursor0),
+    {NextVals, Cursor} = mnesia_iterator_next(Opts, Tab, Cursor1, N - 1),
+    {[Cursor0 | NextVals], Cursor}.
+
 %% END ifdef(STORE_STATE_IN_DS).
 -endif.
 

+ 95 - 121
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -238,159 +238,133 @@ do_subscriptions_query_mem(QString) ->
     end.
 
 do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = QString) ->
-    Count = emqx_persistent_session_ds_router:stats(n_routes),
-    %% TODO: filtering by client ID can be implemented more efficiently:
-    FilterTopic = maps:get(<<"topic">>, QString, '_'),
-    Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic),
-
+    PageSize = 100,
+    Stream0 = emqx_utils_stream:ets(
+        %% FIXME
+        fun
+            (undefined) ->
+                {[], emqx_persistent_session_ds_state:make_subscription_iterator()};
+            (It) ->
+                emqx_persistent_session_ds_state:subscription_iterator_next(It, PageSize)
+        end
+    ),
+    SubMap = fun enrich_dssub/1,
     SubPred = fun(Sub) ->
-        compare_optional(<<"topic">>, QString, '_real_topic', Sub) andalso
-            compare_optional(<<"clientid">>, QString, clientid, Sub) andalso
-            compare_optional(<<"qos">>, QString, qos, Sub) andalso
-            compare_optional(<<"share_group">>, QString, '_group', Sub) andalso
-            compare_match_topic_optional(<<"match_topic">>, QString, '_real_topic', Sub)
+        Sub =/= undefined andalso
+            compare_optional(<<"topic">>, QString, fun dssub_topic/1, Sub) andalso
+            compare_optional(<<"clientid">>, QString, fun dssub_session_id/1, Sub) andalso
+            compare_optional(<<"qos">>, QString, fun dssub_qos/1, Sub) andalso
+            compare_optional(<<"share_group">>, QString, fun dssub_group/1, Sub) andalso
+            compare_match_topic_optional(<<"match_topic">>, QString, fun dssub_topic/1, Sub)
     end,
     NDropped = (Page - 1) * Limit,
-    {_, Stream} = consume_n_matching(
-        fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0
-    ),
-    {Subscriptions0, Stream1} = consume_n_matching(
-        fun persistent_route_to_subscription/1, SubPred, Limit, Stream
-    ),
-    HasNext = Stream1 =/= [],
-    Subscriptions1 = lists:map(
-        fun remove_temp_match_fields/1, Subscriptions0
-    ),
-    Meta =
-        case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of
-            true ->
-                %% Fuzzy searches shouldn't return count:
-                #{
-                    limit => Limit,
-                    page => Page,
-                    hasnext => HasNext
-                };
-            false ->
-                #{
-                    count => Count,
-                    limit => Limit,
-                    page => Page,
-                    hasnext => HasNext
-                }
-        end,
-
+    Stream1 = emqx_utils_stream:filter(SubPred, emqx_utils_stream:map(SubMap, Stream0)),
+    Stream2 = emqx_utils_stream:drop(NDropped, Stream1),
+    {DSSubs, Stream} = consume_n(Limit, Stream2),
+    Subscriptions = [dssub_to_subscription(S) || S <- DSSubs],
+    %% NOTE
+    %% We have `emqx_persistent_session_ds_state:total_subscriptions_count/0` but it's
+    %% too expensive for now, because it essentially a full-scan. There is also
+    %% `emqx_persistent_session_bookkeeper:get_subscription_count/0` but it lags behind
+    %% on the other hand, and that breaks few assumptions. Thus, API clients have to do
+    %% w/o `count` here, even when there's no filtering.
+    Meta = #{
+        limit => Limit,
+        page => Page,
+        hasnext => Stream =/= []
+    },
     #{
         meta => Meta,
-        data => Subscriptions1
+        data => Subscriptions
     }.
 
-compare_optional(QField, Query, SField, Subscription) ->
+dssub_session_id({SessionID, _Topic, _Sub}) ->
+    SessionID.
+
+dssub_topic({_SessionID, #share{topic = Topic}, _Sub}) ->
+    Topic;
+dssub_topic({_SessionID, Topic, _Sub}) ->
+    Topic.
+
+dssub_group({_SessionID, #share{group = Group}, _Sub}) ->
+    Group;
+dssub_group({_SessionID, _Topic, _Sub}) ->
+    undefined.
+
+dssub_subopts({_SessionID, _Topic, Sub}) ->
+    maps:get(subopts, Sub, #{}).
+
+dssub_qos(DSSub) ->
+    maps:get(qos, dssub_subopts(DSSub), undefined).
+
+dssub_to_subscription(DSSub = {SessionID, Topic, _}) ->
+    Sub = #{
+        topic => emqx_topic:maybe_format_share(Topic),
+        clientid => SessionID,
+        node => all,
+        durable => true
+    },
+    case dssub_subopts(DSSub) of
+        #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} ->
+            Sub#{
+                qos => Qos,
+                nl => Nl,
+                rh => Rh,
+                rap => Rap
+            };
+        undefined ->
+            Sub
+    end.
+
+enrich_dssub({SessionId, Topic}) ->
+    %% TODO: Suboptimal, especially with DS-backed session storage.
+    case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of
+        Subscription = #{} ->
+            {SessionId, Topic, Subscription};
+        undefined ->
+            undefined
+    end.
+
+compare_optional(QField, Query, AccessF, DSSub) ->
     case Query of
         #{QField := Expected} ->
-            maps:get(SField, Subscription) =:= Expected;
+            AccessF(DSSub) =:= Expected;
         _ ->
             true
     end.
 
-compare_match_topic_optional(QField, Query, SField, Subscription) ->
+compare_match_topic_optional(QField, Query, AccessF, DSSub) ->
     case Query of
         #{QField := TopicFilter} ->
-            Topic = maps:get(SField, Subscription),
+            Topic = AccessF(DSSub),
             emqx_topic:match(Topic, TopicFilter);
         _ ->
             true
     end.
 
-%% @doc Drop elements from the stream until encountered N elements
-%% matching the predicate function.
--spec consume_n_matching(
-    fun((T) -> Q),
-    fun((Q) -> boolean()),
-    non_neg_integer(),
-    emqx_utils_stream:stream(T)
-) -> {[Q], emqx_utils_stream:stream(T) | empty}.
-consume_n_matching(Map, Pred, N, S) ->
-    consume_n_matching(Map, Pred, N, S, []).
-
-consume_n_matching(_Map, _Pred, _N, [], Acc) ->
+%% @doc Consume the stream until encountered N elements.
+-spec consume_n(non_neg_integer(), emqx_utils_stream:stream(T)) ->
+    {[T], emqx_utils_stream:stream(T) | []}.
+consume_n(N, S) ->
+    consume_n(N, S, []).
+
+consume_n(_N, [], Acc) ->
     {lists:reverse(Acc), []};
-consume_n_matching(_Map, _Pred, 0, S, Acc) ->
+consume_n(0, S, Acc) ->
     case emqx_utils_stream:next(S) of
         [] ->
             {lists:reverse(Acc), []};
         _ ->
             {lists:reverse(Acc), S}
     end;
-consume_n_matching(Map, Pred, N, S0, Acc) ->
+consume_n(N, S0, Acc) ->
     case emqx_utils_stream:next(S0) of
         [] ->
-            consume_n_matching(Map, Pred, N, [], Acc);
+            consume_n(N, [], Acc);
         [Elem | S] ->
-            Mapped = Map(Elem),
-            case Pred(Mapped) of
-                true -> consume_n_matching(Map, Pred, N - 1, S, [Mapped | Acc]);
-                false -> consume_n_matching(Map, Pred, N, S, Acc)
-            end
+            consume_n(N - 1, S, [Elem | Acc])
     end.
 
-persistent_route_to_subscription(#route{dest = Dest} = Route) ->
-    Sub =
-        case get_client_subscription(Route) of
-            #{subopts := SubOpts} ->
-                #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
-                #{
-                    topic => format_topic(Route),
-                    clientid => session_id(Dest),
-                    node => all,
-
-                    qos => Qos,
-                    nl => Nl,
-                    rh => Rh,
-                    rap => Rap,
-                    durable => true
-                };
-            undefined ->
-                #{
-                    topic => format_topic(Route),
-                    clientid => session_id(Dest),
-                    node => all,
-                    durable => true
-                }
-        end,
-    add_temp_match_fields(Route, Sub).
-
-get_client_subscription(#route{
-    topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}
-}) ->
-    emqx_persistent_session_ds:get_client_subscription(SessionId, #share{
-        topic = Topic, group = Group
-    });
-get_client_subscription(#route{topic = Topic, dest = SessionId}) ->
-    emqx_persistent_session_ds:get_client_subscription(SessionId, Topic).
-
-session_id(#share_dest{session_id = SessionId}) -> SessionId;
-session_id(SessionId) -> SessionId.
-
-add_temp_match_fields(Route, Sub) ->
-    add_temp_match_fields(['_real_topic', '_group'], Route, Sub).
-
-add_temp_match_fields([], _Route, Sub) ->
-    Sub;
-add_temp_match_fields(['_real_topic' | Rest], #route{topic = Topic} = Route, Sub) ->
-    add_temp_match_fields(Rest, Route, Sub#{'_real_topic' => Topic});
-add_temp_match_fields(['_group' | Rest], #route{dest = #share_dest{group = Group}} = Route, Sub) ->
-    add_temp_match_fields(Rest, Route, Sub#{'_group' => Group});
-add_temp_match_fields(['_group' | Rest], Route, Sub) ->
-    add_temp_match_fields(Rest, Route, Sub#{'_group' => undefined}).
-
-remove_temp_match_fields(Sub) ->
-    maps:without(['_real_topic', '_group'], Sub).
-
-format_topic(#route{topic = Topic, dest = #share_dest{group = Group}}) ->
-    <<"$share/", Group/binary, "/", Topic/binary>>;
-format_topic(#route{topic = Topic}) ->
-    Topic.
-
 %% @private This function merges paginated results from two sources.
 %%
 %% Note: this implementation is far from ideal: `count' for the

+ 1 - 5
apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl

@@ -126,7 +126,6 @@ t_subscription_api(Config) ->
     Meta = maps:get(<<"meta">>, Data),
     ?assertEqual(1, maps:get(<<"page">>, Meta)),
     ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
-    ?assertEqual(2, maps:get(<<"count">>, Meta), Data),
     Subscriptions = maps:get(<<"data">>, Data),
     ?assertEqual(length(Subscriptions), 2),
     Sort =
@@ -185,10 +184,7 @@ t_mixed_persistent_sessions(Config) ->
             {{_, 200, _}, _, #{
                 <<"data">> := [_, _],
                 <<"meta">> :=
-                    #{
-                        <<"hasnext">> := false,
-                        <<"count">> := 2
-                    }
+                    #{<<"hasnext">> := false}
             }}},
         get_subs(#{page => "1"})
     ),