Преглед изворни кода

fix(api-topics): expose persistent session topics in APIs

Andrew Mayorov пре 1 година
родитељ
комит
985f1c4879

+ 45 - 0
apps/emqx/src/emqx_persistent_session_ds_router.erl

@@ -32,6 +32,9 @@
     foldl_routes/2
 ]).
 
+%% Topics API
+-export([stream/1]).
+
 -export([cleanup_routes/1]).
 -export([print_routes/1]).
 -export([topics/0]).
@@ -196,6 +199,15 @@ foldl_routes(FoldFun, AccIn) ->
 foldr_routes(FoldFun, AccIn) ->
     fold_routes(foldr, FoldFun, AccIn).
 
+%%--------------------------------------------------------------------
+%% Topic API
+%%--------------------------------------------------------------------
+
+-spec stream(_MTopic :: '_' | emqx_types:topic()) ->
+    emqx_utils_stream:stream(emqx_types:topic()).
+stream(MTopic) ->
+    emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)).
+
 %%--------------------------------------------------------------------
 %% Internal fns
 %%--------------------------------------------------------------------
@@ -225,6 +237,12 @@ get_dest_session_id({_, DSSessionId}) ->
 get_dest_session_id(DSSessionId) ->
     DSSessionId.
 
+export_route(#ps_route{topic = Topic, dest = Dest}) ->
+    #route{topic = Topic, dest = Dest}.
+
+export_routeidx(#ps_routeidx{entry = M}) ->
+    #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
+
 match_to_route(M) ->
     #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
 
@@ -242,3 +260,30 @@ list_route_tab_topics() ->
 
 mria_route_tab_delete(Route) ->
     mria:dirty_delete_object(?PS_ROUTER_TAB, Route).
+
+stream(Tab = ?PS_ROUTER_TAB, MTopic) ->
+    case MTopic == '_' orelse not emqx_topic:wildcard(MTopic) of
+        true ->
+            MatchSpec = #ps_route{topic = MTopic, _ = '_'},
+            mk_tab_stream(Tab, MatchSpec, fun export_route/1);
+        false ->
+            emqx_utils_stream:empty()
+    end;
+stream(Tab = ?PS_FILTERS_TAB, MTopic) ->
+    case MTopic == '_' orelse emqx_topic:wildcard(MTopic) of
+        true ->
+            MatchSpec = #ps_routeidx{entry = emqx_trie_search:make_pat(MTopic, '_'), _ = '_'},
+            mk_tab_stream(Tab, MatchSpec, fun export_routeidx/1);
+        false ->
+            emqx_utils_stream:empty()
+    end.
+
+mk_tab_stream(Tab, MatchSpec, Mapper) ->
+    %% NOTE: Currently relying on the fact that tables are backed by ETSes.
+    emqx_utils_stream:map(
+        Mapper,
+        emqx_utils_stream:ets(fun
+            (undefined) -> ets:match_object(Tab, MatchSpec, 1);
+            (Cont) -> ets:match_object(Cont)
+        end)
+    ).

+ 23 - 37
apps/emqx/src/emqx_router.erl

@@ -58,7 +58,7 @@
 ]).
 
 %% Topics API
--export([select/3]).
+-export([stream/1]).
 
 -export([print_routes/1]).
 
@@ -266,18 +266,15 @@ mria_batch_v1(Batch) ->
 batch_get_action(Op) ->
     element(1, Op).
 
--spec select(Spec, _Limit :: pos_integer(), Continuation) ->
-    {[emqx_types:route()], Continuation} | '$end_of_table'
-when
-    Spec :: {_TopicPat, _DestPat},
-    Continuation :: term() | '$end_of_table'.
-select(MatchSpec, Limit, Cont) ->
-    select(get_schema_vsn(), MatchSpec, Limit, Cont).
+-spec stream(_Spec :: {_TopicPat, _DestPat}) ->
+    emqx_utils_stream:stream(emqx_types:route()).
+stream(MatchSpec) ->
+    stream(get_schema_vsn(), MatchSpec).
 
-select(v2, MatchSpec, Limit, Cont) ->
-    select_v2(MatchSpec, Limit, Cont);
-select(v1, MatchSpec, Limit, Cont) ->
-    select_v1(MatchSpec, Limit, Cont).
+stream(v2, MatchSpec) ->
+    stream_v2(MatchSpec);
+stream(v1, MatchSpec) ->
+    stream_v1(MatchSpec).
 
 -spec topics() -> list(emqx_types:topic()).
 topics() ->
@@ -452,10 +449,8 @@ cleanup_routes_v1_fallback(Node) ->
         ]
     end).
 
-select_v1({MTopic, MDest}, Limit, undefined) ->
-    ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit);
-select_v1(_Spec, _Limit, Cont) ->
-    ets:select(Cont).
+stream_v1(Spec) ->
+    mk_route_stream(?ROUTE_TAB, Spec).
 
 list_topics_v1() ->
     list_route_tab_topics().
@@ -591,36 +586,27 @@ make_route_rec_pat(DestPattern) ->
         [{1, route}, {#route.dest, DestPattern}]
     ).
 
-select_v2(Spec, Limit, undefined) ->
-    Stream = mk_route_stream(Spec),
-    select_next(Limit, Stream);
-select_v2(_Spec, Limit, Stream) ->
-    select_next(Limit, Stream).
-
-select_next(N, Stream) ->
-    case emqx_utils_stream:consume(N, Stream) of
-        {Routes, SRest} ->
-            {Routes, SRest};
-        Routes ->
-            {Routes, '$end_of_table'}
-    end.
-
-mk_route_stream(Spec) ->
+stream_v2(Spec) ->
     emqx_utils_stream:chain(
-        mk_route_stream(route, Spec),
-        mk_route_stream(filter, Spec)
+        mk_route_stream(?ROUTE_TAB, Spec),
+        mk_route_stream(?ROUTE_TAB_FILTERS, Spec)
     ).
 
-mk_route_stream(route, Spec) ->
-    emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end);
-mk_route_stream(filter, {MTopic, MDest}) ->
+mk_route_stream(Tab = ?ROUTE_TAB, {MTopic, MDest}) ->
+    emqx_utils_stream:ets(fun
+        (undefined) ->
+            ets:match_object(Tab, #route{topic = MTopic, dest = MDest}, 1);
+        (Cont) ->
+            ets:match_object(Cont)
+    end);
+mk_route_stream(Tab = ?ROUTE_TAB_FILTERS, {MTopic, MDest}) ->
     emqx_utils_stream:map(
         fun routeidx_to_route/1,
         emqx_utils_stream:ets(
             fun
                 (undefined) ->
                     MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)},
-                    ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1);
+                    ets:match_object(Tab, MatchSpec, 1);
                 (Cont) ->
                     ets:match_object(Cont)
             end

+ 33 - 20
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -96,6 +96,11 @@ fields(topic) ->
             hoconsc:mk(binary(), #{
                 desc => <<"Node">>,
                 required => true
+            })},
+        {session,
+            hoconsc:mk(binary(), #{
+                desc => <<"Session ID">>,
+                required => false
             })}
     ].
 
@@ -113,8 +118,8 @@ do_list(Params) ->
     try
         Pager = parse_pager_params(Params),
         {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
-        QState = Pager#{continuation => undefined},
-        QResult = eval_topic_query(qs2ms(Query), QState),
+        Stream = mk_topic_stream(qs2ms(Query)),
+        QResult = eval_topic_query(Stream, Pager, emqx_mgmt_api:init_query_result()),
         {200, format_list_response(Pager, Query, QResult)}
     catch
         throw:{error, page_limit_invalid} ->
@@ -160,31 +165,37 @@ gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) ->
 gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) ->
     {MTopic, QNode}.
 
-eval_topic_query(MS, QState) ->
-    finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())).
+mk_topic_stream(Spec = {MTopic, _MDest = '_'}) ->
+    emqx_utils_stream:chain(emqx_router:stream(Spec), mk_persistent_topic_stream(MTopic));
+mk_topic_stream(Spec) ->
+    %% NOTE: Assuming that no persistent topic ever matches a query with `node` filter.
+    emqx_router:stream(Spec).
 
-eval_topic_query(MS, QState, QResult) ->
-    case eval_topic_query_page(MS, QState) of
-        {Rows, '$end_of_table'} ->
-            {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
-            NQResult#{complete => true};
-        {Rows, NCont} ->
+mk_persistent_topic_stream(Spec) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            emqx_persistent_session_ds_router:stream(Spec);
+        false ->
+            emqx_utils_stream:empty()
+    end.
+
+eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
+    case emqx_utils_stream:consume(Limit, Stream) of
+        {Rows, NStream} ->
             case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
                 {more, NQResult} ->
-                    eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
+                    eval_topic_query(NStream, QState, NQResult);
                 {enough, NQResult} ->
-                    NQResult#{complete => false}
+                    finalize_query(false, NQResult)
             end;
-        '$end_of_table' ->
-            QResult#{complete => true}
+        Rows when is_list(Rows) ->
+            {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
+            finalize_query(true, NQResult)
     end.
 
-eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) ->
-    emqx_router:select(MS, Limit, Cont).
-
-finalize_query(QResult = #{overflow := Overflow, complete := Complete}) ->
+finalize_query(Complete, QResult = #{overflow := Overflow}) ->
     HasNext = Overflow orelse not Complete,
-    QResult#{hasnext => HasNext}.
+    QResult#{complete => Complete, hasnext => HasNext}.
 
 format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
     #{
@@ -205,7 +216,9 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
 format(#route{topic = Topic, dest = {Group, Node}}) ->
     #{topic => ?SHARE(Group, Topic), node => Node};
 format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
-    #{topic => Topic, node => Node}.
+    #{topic => Topic, node => Node};
+format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
+    #{topic => Topic, session => SessionId}.
 
 topic_param(In) ->
     {

+ 74 - 5
apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl

@@ -27,7 +27,7 @@ all() ->
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
         [
-            emqx,
+            {emqx, "session_persistence.enable = true"},
             emqx_management,
             emqx_mgmt_api_test_util:emqx_dashboard()
         ],
@@ -204,13 +204,82 @@ t_shared_topics_invalid(_Config) ->
         emqx_utils_json:decode(Body, [return_maps])
     ).
 
+t_persistent_topics(_Config) ->
+    PersistentOpts = #{
+        proto_ver => v5,
+        properties => #{'Session-Expiry-Interval' => 300}
+    },
+    Client1 = client(t_persistent_topics_m1),
+    Client2 = client(t_persistent_topics_m2),
+    SessionId1 = <<"t_persistent_topics_p1">>,
+    SessionId2 = <<"t_persistent_topics_p2">>,
+    ClientPersistent1 = client(SessionId1, PersistentOpts),
+    ClientPersistent2 = client(SessionId2, PersistentOpts),
+    _ = [
+        ?assertMatch({ok, _, _}, emqtt:subscribe(Client, Topic))
+     || {Client, Topics} <- [
+            {Client1, [<<"t/client/mem">>, <<"t/+">>]},
+            {Client2, [<<"t/client/mem">>, <<"t/+">>]},
+            {ClientPersistent1, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]},
+            {ClientPersistent2, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]}
+        ],
+        Topic <- Topics
+    ],
+    Matched = request_json(get, ["topics"]),
+    ?assertMatch(
+        #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 8},
+        maps:get(<<"meta">>, Matched)
+    ),
+    %% Get back both topics for both persistent and in-memory subscriptions.
+    Expected = [
+        #{<<"topic">> => <<"t/+">>, <<"node">> => atom_to_binary(node())},
+        #{<<"topic">> => <<"t/+">>, <<"session">> => SessionId1},
+        #{<<"topic">> => <<"t/+">>, <<"session">> => SessionId2},
+        #{<<"topic">> => <<"t/client/mem">>, <<"node">> => atom_to_binary(node())},
+        #{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId1},
+        #{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId2},
+        #{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId1},
+        #{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId2}
+    ],
+    ?assertEqual(
+        lists:sort(Expected),
+        lists:sort(maps:get(<<"data">>, Matched))
+    ),
+    %% Are results the same when paginating?
+    #{<<"data">> := Page1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
+    #{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]),
+    #{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]),
+    ?assertEqual(
+        lists:sort(Expected),
+        lists:sort(Page1 ++ Page2 ++ Page3)
+    ),
+    %% Filtering by node makes no sense for persistent sessions.
+    ?assertMatch(
+        #{
+            <<"data">> := [
+                #{<<"topic">> := <<"t/client/mem">>, <<"node">> := _},
+                #{<<"topic">> := <<"t/+">>, <<"node">> := _}
+            ],
+            <<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 2}
+        },
+        request_json(get, ["topics"], [{"node", atom_to_list(node())}])
+    ).
+
 %% Utilities
 
 client(Name) ->
-    {ok, Client} = emqtt:start_link(#{
-        username => emqx_utils_conv:bin(Name),
-        clientid => emqx_utils_conv:bin(Name)
-    }),
+    client(Name, #{}).
+
+client(Name, Overrides) ->
+    {ok, Client} = emqtt:start_link(
+        maps:merge(
+            #{
+                username => emqx_utils_conv:bin(Name),
+                clientid => emqx_utils_conv:bin(Name)
+            },
+            Overrides
+        )
+    ),
     {ok, _} = emqtt:connect(Client),
     Client.