Browse Source

feat(ds): list disconnected persistent sessions in clients API

Fixes https://emqx.atlassian.net/browse/EMQX-11540

Note that not all information provided by disconnected in-memory sessions is available to
disconnected persistent sessions, nor does all of them make sense.
Thales Macedo Garitezi 2 years ago
parent
commit
3a4c7f60e2

+ 32 - 9
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -36,10 +36,20 @@
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
 -export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
 
--export([make_session_iterator/0, session_iterator_next/2]).
+-export([
+    make_session_iterator/0,
+    session_iterator_next/2,
+    session_count/0
+]).
 
 -export_type([
-    t/0, metadata/0, subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, session_iterator/0
+    t/0,
+    metadata/0,
+    subscriptions/0,
+    seqno_type/0,
+    stream_key/0,
+    rank_key/0,
+    session_iterator/0
 ]).
 
 -include("emqx_mqtt.hrl").
@@ -359,17 +369,18 @@ del_rank(Key, Rec) ->
 fold_ranks(Fun, Acc, Rec) ->
     gen_fold(ranks, Fun, Acc, Rec).
 
+-spec session_count() -> non_neg_integer().
+session_count() ->
+    %% N.B.: this is potentially costly.  Should not be called in hot paths.
+    %% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
+    do_session_count(make_session_iterator(), 0).
+
 -spec make_session_iterator() -> session_iterator().
 make_session_iterator() ->
-    case mnesia:dirty_first(?session_tab) of
-        '$end_of_table' ->
-            '$end_of_table';
-        Key ->
-            Key
-    end.
+    mnesia:dirty_first(?session_tab).
 
 -spec session_iterator_next(session_iterator(), pos_integer()) ->
-    {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator()}.
+    {[{emqx_persistent_session_ds:id(), metadata()}], session_iterator() | '$end_of_table'}.
 session_iterator_next(Cursor, 0) ->
     {[], Cursor};
 session_iterator_next('$end_of_table', _N) ->
@@ -564,6 +575,18 @@ ro_transaction(Fun) ->
 %%     {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
 %%     Res.
 
+%%
+
+do_session_count('$end_of_table', N) ->
+    N;
+do_session_count(Cursor, N) ->
+    case session_iterator_next(Cursor, 1) of
+        {[], _} ->
+            N;
+        {_, NextCursor} ->
+            do_session_count(NextCursor, N + 1)
+    end.
+
 -compile({inline, check_sequence/1}).
 
 -ifdef(CHECK_SEQNO).

+ 23 - 4
apps/emqx_management/src/emqx_mgmt.erl

@@ -66,6 +66,9 @@
     do_kickout_clients/1
 ]).
 
+%% Internal exports
+-export([lookup_running_client/2]).
+
 %% Internal functions
 -export([do_call_client/2]).
 
@@ -314,10 +317,16 @@ nodes_info_count(PropList) ->
 %%--------------------------------------------------------------------
 
 lookup_client({clientid, ClientId}, FormatFun) ->
-    lists:append([
-        lookup_client(Node, {clientid, ClientId}, FormatFun)
-     || Node <- emqx:running_nodes()
-    ]);
+    IsPersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
+    case lookup_running_client(ClientId, FormatFun) of
+        [] when IsPersistenceEnabled ->
+            case emqx_persistent_session_ds_state:print_session(ClientId) of
+                undefined -> [];
+                Session -> [maybe_format(FormatFun, {ClientId, Session})]
+            end;
+        Res ->
+            Res
+    end;
 lookup_client({username, Username}, FormatFun) ->
     lists:append([
         lookup_client(Node, {username, Username}, FormatFun)
@@ -633,6 +642,16 @@ create_banned(Banned) ->
 delete_banned(Who) ->
     emqx_banned:delete(Who).
 
+%%--------------------------------------------------------------------
+%% Internal exports
+%%--------------------------------------------------------------------
+
+lookup_running_client(ClientId, FormatFun) ->
+    lists:append([
+        lookup_client(Node, {clientid, ClientId}, FormatFun)
+     || Node <- emqx:running_nodes()
+    ]).
+
 %%--------------------------------------------------------------------
 %% Internal Functions.
 %%--------------------------------------------------------------------

+ 7 - 1
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -39,7 +39,13 @@
     parse_pager_params/1,
     parse_qstring/2,
     init_query_result/0,
-    accumulate_query_rows/4
+    init_query_state/5,
+    reset_query_state/1,
+    accumulate_query_rows/4,
+    finalize_query/2,
+    mark_complete/2,
+    format_query_result/3,
+    maybe_collect_total_from_tail_nodes/2
 ]).
 
 -ifdef(TEST).

+ 196 - 19
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -698,26 +698,13 @@ list_clients(QString) ->
         case maps:get(<<"node">>, QString, undefined) of
             undefined ->
                 Options = #{fast_total_counting => true},
-                emqx_mgmt_api:cluster_query(
-                    ?CHAN_INFO_TAB,
-                    QString,
-                    ?CLIENT_QSCHEMA,
-                    fun ?MODULE:qs2ms/2,
-                    fun ?MODULE:format_channel_info/2,
-                    Options
-                );
+                list_clients_cluster_query(QString, Options);
             Node0 ->
                 case emqx_utils:safe_to_existing_atom(Node0) of
                     {ok, Node1} ->
-                        QStringWithoutNode = maps:without([<<"node">>], QString),
-                        emqx_mgmt_api:node_query(
-                            Node1,
-                            ?CHAN_INFO_TAB,
-                            QStringWithoutNode,
-                            ?CLIENT_QSCHEMA,
-                            fun ?MODULE:qs2ms/2,
-                            fun ?MODULE:format_channel_info/2
-                        );
+                        QStringWithoutNode = maps:remove(<<"node">>, QString),
+                        Options = #{},
+                        list_clients_node_query(Node1, QStringWithoutNode, Options);
                     {error, _} ->
                         {error, Node0, {badrpc, <<"invalid node">>}}
                 end
@@ -851,6 +838,170 @@ do_unsubscribe(ClientID, Topic) ->
             Res
     end.
 
+list_clients_cluster_query(QString, Options) ->
+    case emqx_mgmt_api:parse_pager_params(QString) of
+        false ->
+            {error, page_limit_invalid};
+        Meta = #{} ->
+            try
+                {_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA),
+                Nodes = emqx:running_nodes(),
+                ResultAcc = emqx_mgmt_api:init_query_result(),
+                QueryState = emqx_mgmt_api:init_query_state(
+                    ?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
+                ),
+                Res = do_list_clients_cluster_query(Nodes, QueryState, ResultAcc),
+                emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
+            catch
+                throw:{bad_value_type, {Key, ExpectedType, AcutalValue}} ->
+                    {error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}}
+            end
+    end.
+
+%% adapted from `emqx_mgmt_api:do_cluster_query'
+do_list_clients_cluster_query(
+    [Node | Tail] = Nodes,
+    QueryState0,
+    ResultAcc
+) ->
+    case emqx_mgmt_api:do_query(Node, QueryState0) of
+        {error, Error} ->
+            {error, Node, Error};
+        {Rows, QueryState1 = #{complete := Complete0}} ->
+            case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of
+                {enough, NResultAcc} ->
+                    %% TODO: add persistent session count?
+                    %% TODO: this may return `{error, _, _}'...
+                    QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes(
+                        Tail, QueryState1
+                    ),
+                    QueryState = add_persistent_session_count(QueryState2),
+                    Complete = Complete0 andalso Tail =:= [] andalso no_persistent_sessions(),
+                    emqx_mgmt_api:finalize_query(
+                        NResultAcc, emqx_mgmt_api:mark_complete(QueryState, Complete)
+                    );
+                {more, NResultAcc} when not Complete0 ->
+                    do_list_clients_cluster_query(Nodes, QueryState1, NResultAcc);
+                {more, NResultAcc} when Tail =/= [] ->
+                    do_list_clients_cluster_query(
+                        Tail, emqx_mgmt_api:reset_query_state(QueryState1), NResultAcc
+                    );
+                {more, NResultAcc} ->
+                    QueryState = add_persistent_session_count(QueryState1),
+                    do_persistent_session_query(NResultAcc, QueryState)
+            end
+    end.
+
+list_clients_node_query(Node, QString, Options) ->
+    case emqx_mgmt_api:parse_pager_params(QString) of
+        false ->
+            {error, page_limit_invalid};
+        Meta = #{} ->
+            {_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA),
+            ResultAcc = emqx_mgmt_api:init_query_result(),
+            QueryState = emqx_mgmt_api:init_query_state(
+                ?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
+            ),
+            Res = do_list_clients_node_query(Node, QueryState, ResultAcc),
+            emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
+    end.
+
+add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            %% TODO: currently, counting persistent sessions can be not only costly (needs
+            %% to traverse the whole table), but also hard to deduplicate live connections
+            %% from it...  So this count will possibly overshoot the true count of
+            %% sessions.
+            SessionCount = emqx_persistent_session_ds_state:session_count(),
+            Totals = Totals0#{undefined => SessionCount},
+            QueryState0#{total := Totals};
+        false ->
+            QueryState0
+    end;
+add_persistent_session_count(QueryState) ->
+    QueryState.
+
+%% adapted from `emqx_mgmt_api:do_node_query'
+do_list_clients_node_query(
+    Node,
+    QueryState,
+    ResultAcc
+) ->
+    case emqx_mgmt_api:do_query(Node, QueryState) of
+        {error, Error} ->
+            {error, Node, Error};
+        {Rows, NQueryState = #{complete := Complete}} ->
+            case emqx_mgmt_api:accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
+                {enough, NResultAcc} ->
+                    FComplete = Complete andalso no_persistent_sessions(),
+                    emqx_mgmt_api:finalize_query(
+                        NResultAcc, emqx_mgmt_api:mark_complete(NQueryState, FComplete)
+                    );
+                {more, NResultAcc} when Complete ->
+                    do_persistent_session_query(NResultAcc, NQueryState);
+                {more, NResultAcc} ->
+                    do_list_clients_node_query(Node, NQueryState, NResultAcc)
+            end
+    end.
+
+init_persistent_session_iterator() ->
+    emqx_persistent_session_ds_state:make_session_iterator().
+
+no_persistent_sessions() ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            Cursor = init_persistent_session_iterator(),
+            case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
+                {[], _} ->
+                    true;
+                _ ->
+                    false
+            end;
+        false ->
+            true
+    end.
+
+do_persistent_session_query(ResultAcc, QueryState) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            do_persistent_session_query1(
+                ResultAcc,
+                QueryState,
+                init_persistent_session_iterator()
+            );
+        false ->
+            emqx_mgmt_api:finalize_query(ResultAcc, QueryState)
+    end.
+
+do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
+    %% Since persistent session data is accessible from all nodes, there's no need to go
+    %% through all the nodes.
+    #{limit := Limit} = QueryState,
+    {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
+    Rows = remove_live_sessions(Rows0),
+    case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
+        {enough, NResultAcc} ->
+            emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
+        {more, NResultAcc} when Iter =:= '$end_of_table' ->
+            emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
+        {more, NResultAcc} ->
+            do_persistent_session_query1(NResultAcc, QueryState, Iter)
+    end.
+
+remove_live_sessions(Rows) ->
+    lists:filtermap(
+        fun({ClientId, _Session}) ->
+            case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of
+                [] ->
+                    {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}};
+                [_ | _] ->
+                    false
+            end
+        end,
+        Rows
+    ).
+
 %%--------------------------------------------------------------------
 %% QueryString to Match Spec
 
@@ -925,7 +1076,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} |
 %% format funcs
 
 format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
-    format_channel_info(node(), ChannInfo).
+    %% channel info from ETS table (live and/or in-memory session)
+    format_channel_info(node(), ChannInfo);
+format_channel_info({ClientId, PSInfo}) ->
+    %% offline persistent session
+    format_persistent_session_info(ClientId, PSInfo).
 
 format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
     Node = maps:get(node, ClientInfo0, WhichNode),
@@ -983,7 +1138,29 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
             maps:without(RemoveList, ClientInfoMap),
             TimesKeys
         )
-    ).
+    );
+format_channel_info(undefined, {ClientId, PSInfo0 = #{}}) ->
+    format_persistent_session_info(ClientId, PSInfo0).
+
+format_persistent_session_info(ClientId, PSInfo0) ->
+    Metadata = maps:get(metadata, PSInfo0, #{}),
+    PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
+    CreatedAt = maps:get(created_at, PSInfo1),
+    PSInfo2 = convert_expiry_interval_unit(PSInfo1),
+    PSInfo3 = PSInfo2#{
+        clientid => ClientId,
+        connected => false,
+        connected_at => CreatedAt,
+        ip_address => undefined,
+        is_persistent => true,
+        port => undefined
+    },
+    PSInfo = lists:foldl(
+        fun result_format_time_fun/2,
+        PSInfo3,
+        [created_at, connected_at]
+    ),
+    result_format_undefined_to_null(PSInfo).
 
 %% format func helpers
 take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->

+ 437 - 1
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -18,10 +18,27 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {group, persistent_sessions}
+        | AllTCs -- persistent_session_testcases()
+    ].
+
+groups() ->
+    [{persistent_sessions, persistent_session_testcases()}].
+
+persistent_session_testcases() ->
+    [
+        t_persistent_sessions1,
+        t_persistent_sessions2,
+        t_persistent_sessions3,
+        t_persistent_sessions4,
+        t_persistent_sessions5
+    ].
 
 init_per_suite(Config) ->
     emqx_mgmt_api_test_util:init_suite(),
@@ -30,6 +47,33 @@ init_per_suite(Config) ->
 end_per_suite(_) ->
     emqx_mgmt_api_test_util:end_suite().
 
+init_per_group(persistent_sessions, Config) ->
+    AppSpecs = [
+        {emqx, "session_persistence.enable = true"},
+        emqx_management
+    ],
+    Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
+        "dashboard.listeners.http { enable = true, bind = 18084 }"
+    ),
+    Cluster = [
+        {emqx_mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}},
+        {emqx_mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}}
+    ],
+    Nodes = emqx_cth_cluster:start(
+        Cluster,
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{nodes, Nodes} | Config];
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(persistent_sessions, Config) ->
+    Nodes = ?config(nodes, Config),
+    emqx_cth_cluster:stop(Nodes),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
 t_clients(_) ->
     process_flag(trap_exit, true),
 
@@ -171,6 +215,290 @@ t_clients(_) ->
     AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path),
     ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1).
 
+t_persistent_sessions1(Config) ->
+    [N1, _N2] = ?config(nodes, Config),
+    APIPort = 18084,
+    Port1 = get_mqtt_port(N1, tcp),
+
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
+
+    ?check_trace(
+        begin
+            %% Scenario 1
+            %% 1) Client connects and is listed as connected.
+            ?tp(notice, "scenario 1", #{}),
+            O = #{api_port => APIPort},
+            ClientId = <<"c1">>,
+            C1 = connect_client(#{port => Port1, clientid => ClientId}),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
+            %% 2) Client disconnects and is listed as disconnected.
+            ok = emqtt:disconnect(C1),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
+            %% 3) Client reconnects and is listed as connected.
+            C2 = connect_client(#{port => Port1, clientid => ClientId}),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
+            %% 4) Client disconnects.
+            ok = emqtt:stop(C2),
+            %% 5) Session is GC'ed, client is removed from list.
+            ?tp(notice, "gc", #{}),
+            %% simulate GC
+            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+            ?retry(
+                100,
+                20,
+                ?assertMatch(
+                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
+                    list_request(APIPort)
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_persistent_sessions2(Config) ->
+    [N1, _N2] = ?config(nodes, Config),
+    APIPort = 18084,
+    Port1 = get_mqtt_port(N1, tcp),
+
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
+
+    ?check_trace(
+        begin
+            %% Scenario 2
+            %% 1) Client connects and is listed as connected.
+            ?tp(notice, "scenario 2", #{}),
+            O = #{api_port => APIPort},
+            ClientId = <<"c2">>,
+            C1 = connect_client(#{port => Port1, clientid => ClientId}),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
+            unlink(C1),
+            %% 2) Client connects to the same node and takes over, listed only once.
+            C2 = connect_client(#{port => Port1, clientid => ClientId}),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
+            ok = emqtt:stop(C2),
+            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+            ?retry(
+                100,
+                20,
+                ?assertMatch(
+                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
+                    list_request(APIPort)
+                )
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_persistent_sessions3(Config) ->
+    [N1, N2] = ?config(nodes, Config),
+    APIPort = 18084,
+    Port1 = get_mqtt_port(N1, tcp),
+    Port2 = get_mqtt_port(N2, tcp),
+
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
+
+    ?check_trace(
+        begin
+            %% Scenario 3
+            %% 1) Client connects and is listed as connected.
+            ?tp(notice, "scenario 3", #{}),
+            O = #{api_port => APIPort},
+            ClientId = <<"c3">>,
+            C1 = connect_client(#{port => Port1, clientid => ClientId}),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
+            unlink(C1),
+            %% 2) Client connects to *another node* and takes over, listed only once.
+            C2 = connect_client(#{port => Port2, clientid => ClientId}),
+            assert_single_client(O#{node => N2, clientid => ClientId, status => connected}),
+            %% Doesn't show up in the other node while alive
+            ?retry(
+                100,
+                20,
+                ?assertMatch(
+                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
+                    list_request(APIPort, "node=" ++ atom_to_list(N1))
+                )
+            ),
+            ok = emqtt:stop(C2),
+            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_persistent_sessions4(Config) ->
+    [N1, N2] = ?config(nodes, Config),
+    APIPort = 18084,
+    Port1 = get_mqtt_port(N1, tcp),
+    Port2 = get_mqtt_port(N2, tcp),
+
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
+
+    ?check_trace(
+        begin
+            %% Scenario 4
+            %% 1) Client connects and is listed as connected.
+            ?tp(notice, "scenario 4", #{}),
+            O = #{api_port => APIPort},
+            ClientId = <<"c4">>,
+            C1 = connect_client(#{port => Port1, clientid => ClientId}),
+            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
+            %% 2) Client disconnects and is listed as disconnected.
+            ok = emqtt:stop(C1),
+            %% While disconnected, shows up in both nodes.
+            assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
+            assert_single_client(O#{node => N2, clientid => ClientId, status => disconnected}),
+            %% 3) Client reconnects to *another node* and is listed as connected once.
+            C2 = connect_client(#{port => Port2, clientid => ClientId}),
+            assert_single_client(O#{node => N2, clientid => ClientId, status => connected}),
+            %% Doesn't show up in the other node while alive
+            ?retry(
+                100,
+                20,
+                ?assertMatch(
+                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
+                    list_request(APIPort, "node=" ++ atom_to_list(N1))
+                )
+            ),
+            ok = emqtt:stop(C2),
+            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_persistent_sessions5(Config) ->
+    [N1, N2] = ?config(nodes, Config),
+    APIPort = 18084,
+    Port1 = get_mqtt_port(N1, tcp),
+    Port2 = get_mqtt_port(N2, tcp),
+
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
+
+    ?check_trace(
+        begin
+            %% Pagination with mixed clients
+            ClientId1 = <<"c5">>,
+            ClientId2 = <<"c6">>,
+            ClientId3 = <<"c7">>,
+            ClientId4 = <<"c8">>,
+            %% persistent
+            C1 = connect_client(#{port => Port1, clientid => ClientId1}),
+            C2 = connect_client(#{port => Port2, clientid => ClientId2}),
+            %% in-memory
+            C3 = connect_client(#{
+                port => Port1, clientid => ClientId3, expiry => 0, clean_start => true
+            }),
+            C4 = connect_client(#{
+                port => Port2, clientid => ClientId4, expiry => 0, clean_start => true
+            }),
+
+            P1 = list_request(APIPort, "limit=3&page=1"),
+            P2 = list_request(APIPort, "limit=3&page=2"),
+            ?assertMatch(
+                {ok,
+                    {{_, 200, _}, _, #{
+                        <<"data">> := [_, _, _],
+                        <<"meta">> := #{
+                            %% TODO: if/when we fix the persistent session count, this
+                            %% should be 4.
+                            <<"count">> := 6,
+                            <<"hasnext">> := true
+                        }
+                    }}},
+                P1
+            ),
+            ?assertMatch(
+                {ok,
+                    {{_, 200, _}, _, #{
+                        <<"data">> := [_],
+                        <<"meta">> := #{
+                            %% TODO: if/when we fix the persistent session count, this
+                            %% should be 4.
+                            <<"count">> := 6,
+                            <<"hasnext">> := false
+                        }
+                    }}},
+                P2
+            ),
+            {ok, {_, _, #{<<"data">> := R1}}} = P1,
+            {ok, {_, _, #{<<"data">> := R2}}} = P2,
+            ?assertEqual(
+                lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]),
+                lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R1 ++ R2))
+            ),
+            ?assertMatch(
+                {ok,
+                    {{_, 200, _}, _, #{
+                        <<"data">> := [_, _],
+                        <<"meta">> := #{
+                            %% TODO: if/when we fix the persistent session count, this
+                            %% should be 4.
+                            <<"count">> := 6,
+                            <<"hasnext">> := true
+                        }
+                    }}},
+                list_request(APIPort, "limit=2&page=1")
+            ),
+            %% Disconnect persistent sessions
+            lists:foreach(fun emqtt:stop/1, [C1, C2]),
+
+            P3 =
+                ?retry(200, 10, begin
+                    P3_ = list_request(APIPort, "limit=3&page=1"),
+                    ?assertMatch(
+                        {ok,
+                            {{_, 200, _}, _, #{
+                                <<"data">> := [_, _, _],
+                                <<"meta">> := #{
+                                    <<"count">> := 4,
+                                    <<"hasnext">> := true
+                                }
+                            }}},
+                        P3_
+                    ),
+                    P3_
+                end),
+            P4 =
+                ?retry(200, 10, begin
+                    P4_ = list_request(APIPort, "limit=3&page=2"),
+                    ?assertMatch(
+                        {ok,
+                            {{_, 200, _}, _, #{
+                                <<"data">> := [_],
+                                <<"meta">> := #{
+                                    <<"count">> := 4,
+                                    <<"hasnext">> := false
+                                }
+                            }}},
+                        P4_
+                    ),
+                    P4_
+                end),
+            {ok, {_, _, #{<<"data">> := R3}}} = P3,
+            {ok, {_, _, #{<<"data">> := R4}}} = P4,
+            ?assertEqual(
+                lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]),
+                lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R3 ++ R4))
+            ),
+
+            lists:foreach(fun emqtt:stop/1, [C3, C4]),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_clients_bad_value_type(_) ->
     %% get /clients
     AuthHeader = [emqx_common_test_http:default_auth_header()],
@@ -442,3 +770,111 @@ time_string_to_epoch(DateTime, Unit) when is_binary(DateTime) ->
                 binary_to_list(DateTime), [{unit, Unit}]
             )
     end.
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.
+
+request(Method, Path, Params) ->
+    request(Method, Path, Params, _QueryParams = "").
+
+request(Method, Path, Params, QueryParams) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of
+        {ok, {Status, Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {ok, {Status, Headers, Body}};
+        {error, {Status, Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {error, {Status, Headers, Body}};
+        Error ->
+            Error
+    end.
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.
+
+list_request(Port) ->
+    list_request(Port, _QueryParams = "").
+
+list_request(Port, QueryParams) ->
+    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]),
+    request(get, Path, [], QueryParams).
+
+lookup_request(ClientId) ->
+    lookup_request(ClientId, 18083).
+
+lookup_request(ClientId, Port) ->
+    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
+    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
+    request(get, Path, []).
+
+assert_single_client(Opts) ->
+    #{
+        api_port := APIPort,
+        clientid := ClientId,
+        node := Node,
+        status := Connected
+    } = Opts,
+    IsConnected =
+        case Connected of
+            connected -> true;
+            disconnected -> false
+        end,
+    ?retry(
+        100,
+        20,
+        ?assertMatch(
+            {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
+            list_request(APIPort)
+        )
+    ),
+    ?retry(
+        100,
+        20,
+        ?assertMatch(
+            {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
+            list_request(APIPort, "node=" ++ atom_to_list(Node)),
+            #{node => Node}
+        )
+    ),
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}},
+        lookup_request(ClientId, APIPort)
+    ),
+    ok.
+
+connect_client(Opts) ->
+    Defaults = #{
+        expiry => 30,
+        clean_start => false
+    },
+    #{
+        port := Port,
+        clientid := ClientId,
+        clean_start := CleanStart,
+        expiry := EI
+    } = maps:merge(Defaults, Opts),
+    {ok, C} = emqtt:start_link([
+        {port, Port},
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {clean_start, CleanStart},
+        {properties, #{'Session-Expiry-Interval' => EI}}
+    ]),
+    {ok, _} = emqtt:connect(C),
+    C.

+ 3 - 0
changes/ce/fix-12500.en.md

@@ -0,0 +1,3 @@
+Now disconnected persistent sessions are returned in the `GET /clients` and `GET /client/:clientid` HTTP APIs.
+
+Known issue: the total count returned by this API may overestimate the total number of clients.