Browse Source

feat(sessds): Expose subscriptions in the REST API

ieQu1 2 years ago
parent
commit
98d1094d73

+ 3 - 1
apps/emqx/src/emqx_channel.erl

@@ -193,7 +193,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) ->
 info(timers, #channel{timers = Timers}) ->
     Timers;
 info(session_state, #channel{session = Session}) ->
-    Session.
+    Session;
+info(impl, #channel{session = Session}) ->
+    emqx_session:info(impl, Session).
 
 set_conn_state(ConnState, Channel) ->
     Channel#channel{conn_state = ConnState}.

+ 81 - 10
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -66,6 +66,11 @@
     terminate/2
 ]).
 
+%% Managment APIs:
+-export([
+    list_client_subscriptions/1
+]).
+
 %% session table operations
 -export([create_tables/0, sync/1]).
 
@@ -243,18 +248,25 @@ info(await_rel_timeout, #{props := Conf}) ->
 stats(Session) ->
     info(?STATS_KEYS, Session).
 
-%% Debug/troubleshooting
+%% Used by management API
 -spec print_session(emqx_types:clientid()) -> map() | undefined.
 print_session(ClientId) ->
-    case emqx_cm:lookup_channels(ClientId) of
-        [Pid] ->
-            #{channel := ChanState} = emqx_connection:get_state(Pid),
-            SessionState = emqx_channel:info(session_state, ChanState),
-            maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, SessionState#{
-                '_alive' => {true, Pid}
-            });
-        [] ->
-            emqx_persistent_session_ds_state:print_session(ClientId)
+    case try_get_live_session(ClientId) of
+        {Pid, SessionState} ->
+            maps:update_with(
+                s, fun emqx_persistent_session_ds_state:format/1, SessionState#{
+                    '_alive' => {true, Pid}
+                }
+            );
+        not_found ->
+            case emqx_persistent_session_ds_state:print_session(ClientId) of
+                undefined ->
+                    undefined;
+                S ->
+                    #{s => S, '_alive' => false}
+            end;
+        not_persistent ->
+            undefined
     end.
 
 %%--------------------------------------------------------------------
@@ -529,6 +541,44 @@ terminate(_Reason, _Session = #{id := Id, s := S}) ->
     ?tp(debug, persistent_session_ds_terminate, #{id => Id}),
     ok.
 
+%%--------------------------------------------------------------------
+%% Management APIs (dashboard)
+%%--------------------------------------------------------------------
+
+-spec list_client_subscriptions(emqx_types:clientid()) ->
+    {node() | undefined, [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]}
+    | {error, not_found}.
+list_client_subscriptions(ClientId) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            %% TODO: this is not the most optimal implementation, since it
+            %% should be possible to avoid reading extra data (streams, etc.)
+            case print_session(ClientId) of
+                Sess = #{s := #{subscriptions := Subs}} ->
+                    Node =
+                        case Sess of
+                            #{'_alive' := {true, Pid}} ->
+                                node(Pid);
+                            _ ->
+                                undefined
+                        end,
+                    SubList =
+                        maps:fold(
+                            fun(Topic, #{props := SubProps}, Acc) ->
+                                Elem = {Topic, SubProps},
+                                [Elem | Acc]
+                            end,
+                            [],
+                            Subs
+                        ),
+                    {Node, SubList};
+                undefined ->
+                    {error, not_found}
+            end;
+        false ->
+            {error, not_found}
+    end.
+
 %%--------------------------------------------------------------------
 %% Session tables operations
 %%--------------------------------------------------------------------
@@ -899,6 +949,27 @@ expiry_interval(ConnInfo) ->
 bump_interval() ->
     emqx_config:get([session_persistence, last_alive_update_interval]).
 
+-spec try_get_live_session(emqx_types:clientid()) ->
+    {pid(), session()} | not_found | not_persistent.
+try_get_live_session(ClientId) ->
+    case emqx_cm:lookup_channels(local, ClientId) of
+        [Pid] ->
+            try
+                #{channel := ChanState} = emqx_connection:get_state(Pid),
+                case emqx_channel:info(impl, ChanState) of
+                    ?MODULE ->
+                        {Pid, emqx_channel:info(session_state, ChanState)};
+                    _ ->
+                        not_persistent
+                end
+            catch
+                _:_ ->
+                    not_found
+            end;
+        _ ->
+            not_found
+    end.
+
 %%--------------------------------------------------------------------
 %% SeqNo tracking
 %% --------------------------------------------------------------------

+ 3 - 1
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -181,7 +181,9 @@ format(#{
     ranks := Ranks
 }) ->
     Subs = emqx_topic_gbt:fold(
-        fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end,
+        fun(Key, Sub, Acc) ->
+            maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc)
+        end,
         #{},
         SubsGBT
     ),

+ 2 - 2
apps/emqx/test/emqx_cth_suite.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@
 %%
 %% Most of the time, you just need to:
 %% 1. Describe the appspecs for the applications you want to test.
-%% 2. Call `emqx_cth_sutie:start/2` to start the applications before the testrun
+%% 2. Call `emqx_cth_suite:start/2` to start the applications before the testrun
 %%    (e.g. in `init_per_suite/1` / `init_per_group/2`), providing the appspecs
 %%    and unique work dir for the testrun (e.g. `work_dir/1`). Save the result
 %%    in a context.

+ 0 - 8
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -1092,14 +1092,6 @@ get_msgs_essentials(Msgs) ->
 pick_respective_msgs(MsgRefs, Msgs) ->
     [M || M <- Msgs, Ref <- MsgRefs, maps:get(packet_id, M) =:= maps:get(packet_id, Ref)].
 
-skip_ds_tc(Config) ->
-    case ?config(persistence, Config) of
-        ds ->
-            {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
-        _ ->
-            Config
-    end.
-
 debug_info(ClientId) ->
     Info = emqx_persistent_session_ds:print_session(ClientId),
     ct:pal("*** State:~n~p", [Info]).

+ 10 - 1
apps/emqx_management/src/emqx_mgmt.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -380,6 +380,15 @@ list_authz_cache(ClientId) ->
     call_client(ClientId, list_authz_cache).
 
 list_client_subscriptions(ClientId) ->
+    case emqx_persistent_session_ds:list_client_subscriptions(ClientId) of
+        {error, not_found} ->
+            list_client_subscriptions_mem(ClientId);
+        Result ->
+            Result
+    end.
+
+%% List subscriptions of an in-memory session:
+list_client_subscriptions_mem(ClientId) ->
     case lookup_client({clientid, ClientId}, undefined) of
         [] ->
             {error, not_found};

+ 81 - 4
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -26,14 +26,56 @@
 -define(FORMATFUN, {?MODULE, ident}).
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, persistence_disabled},
+        {group, persistence_enabled}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {persistence_disabled, [], TCs},
+        {persistence_enabled, [], [t_persist_list_subs]}
+    ].
+
+init_per_group(persistence_disabled, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, "session_persistence { enable = false }"},
+            emqx_management
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [
+        {apps, Apps}
+        | Config
+    ];
+init_per_group(persistence_enabled, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx,
+                "session_persistence {\n"
+                "  enable = true\n"
+                "  last_alive_update_interval = 100ms\n"
+                "  renew_streams_interval = 100ms\n"
+                "}"},
+            emqx_management
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [
+        {apps, Apps}
+        | Config
+    ].
+
+end_per_group(_Grp, Config) ->
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
     Config.
 
 end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
+    ok.
 
 init_per_testcase(TestCase, Config) ->
     meck:expect(emqx, running_nodes, 0, [node()]),
@@ -370,6 +412,41 @@ t_banned(_) ->
         emqx_mgmt:delete_banned({clientid, <<"TestClient">>})
     ).
 
+%% This testcase verifies the behavior of various read-only functions
+%% used by REST API via `emqx_mgmt' module:
+t_persist_list_subs(_) ->
+    ClientId = <<"persistent_client">>,
+    Topics = lists:sort([<<"foo/bar">>, <<"/a/+//+/#">>, <<"foo">>]),
+    VerifySubs =
+        fun() ->
+            {Node, Ret} = emqx_mgmt:list_client_subscriptions(ClientId),
+            ?assert(Node =:= node() orelse Node =:= undefined, Node),
+            {TopicsL, SubProps} = lists:unzip(Ret),
+            ?assertEqual(Topics, lists:sort(TopicsL)),
+            [?assertMatch(#{rh := _, rap := _, nl := _, qos := _}, I) || I <- SubProps]
+        end,
+    %% 0. Verify that management functions work for missing clients:
+    ?assertMatch(
+        {error, not_found},
+        emqx_mgmt:list_client_subscriptions(ClientId)
+    ),
+    %% 1. Connect the client and subscribe to topics:
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 30}}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    [{ok, _, _} = emqtt:subscribe(Client, I, qos2) || I <- Topics],
+    %% 2. Verify that management functions work for the connected
+    %% clients:
+    VerifySubs(),
+    %% 3. Disconnect the client:
+    emqtt:disconnect(Client),
+    %% 4. Verify that management functions work for the offline
+    %% clients:
+    VerifySubs().
+
 %%% helpers
 ident(Arg) ->
     Arg.