Преглед изворни кода

Merge pull request #12561 from SergeTupchiy/EMQX-11861-client-mqueue-inflight-API

feat: add client mqueue/inflight messages API
SergeTupchiy пре 1 година
родитељ
комит
1f38813cb9

+ 4 - 0
apps/emqx/src/emqx_channel.erl

@@ -1210,6 +1210,10 @@ handle_call(
     ChanInfo1 = info(NChannel),
     emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
     reply(ok, reset_timer(keepalive, NChannel));
+handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when
+    Type =:= mqueue_msgs; Type =:= inflight_msgs
+->
+    {reply, emqx_session:info(MsgsReq, Session), Channel};
 handle_call(Req, Channel) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     reply(ignored, Channel).

+ 46 - 1
apps/emqx/src/emqx_inflight.erl

@@ -36,7 +36,8 @@
     max_size/1,
     is_full/1,
     is_empty/1,
-    window/1
+    window/1,
+    query/2
 ]).
 
 -export_type([inflight/0]).
@@ -138,3 +139,47 @@ size(?INFLIGHT(Tree)) ->
 -spec max_size(inflight()) -> non_neg_integer().
 max_size(?INFLIGHT(MaxSize, _Tree)) ->
     MaxSize.
+
+-spec query(inflight(), #{continuation => Cont, limit := L}) ->
+    {[{key(), term()}], #{continuation := Cont, count := C}}
+when
+    Cont :: none | end_of_data | key(),
+    L :: non_neg_integer(),
+    C :: non_neg_integer().
+query(?INFLIGHT(Tree), #{limit := Limit} = Pager) ->
+    Count = gb_trees:size(Tree),
+    ContKey = maps:get(continuation, Pager, none),
+    {List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit),
+    {List, #{continuation => NextCont, count => Count}}.
+
+iterator_from(none, Tree) ->
+    gb_trees:iterator(Tree);
+iterator_from(ContKey, Tree) ->
+    It = gb_trees:iterator_from(ContKey, Tree),
+    case gb_trees:next(It) of
+        {ContKey, _Val, ItNext} -> ItNext;
+        _ -> It
+    end.
+
+sublist(_It, 0) ->
+    {[], none};
+sublist(It, Len) ->
+    {ListAcc, HasNext} = sublist(It, Len, []),
+    {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
+
+sublist(It, 0, Acc) ->
+    {Acc, gb_trees:next(It) =/= none};
+sublist(It, Len, Acc) ->
+    case gb_trees:next(It) of
+        none ->
+            {Acc, false};
+        {Key, Val, ItNext} ->
+            sublist(ItNext, Len - 1, [{Key, Val} | Acc])
+    end.
+
+next_cont(_Acc, false) ->
+    end_of_data;
+next_cont([{LastKey, _LastVal} | _Acc], _HasNext) ->
+    LastKey;
+next_cont([], _HasNext) ->
+    end_of_data.

+ 51 - 1
apps/emqx/src/emqx_mqueue.erl

@@ -68,7 +68,8 @@
     stats/1,
     dropped/1,
     to_list/1,
-    filter/2
+    filter/2,
+    query/2
 ]).
 
 -define(NO_PRIORITY_TABLE, disabled).
@@ -171,6 +172,55 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
             MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
     end.
 
+-spec query(mqueue(), #{continuation => ContMsgId, limit := L}) ->
+    {[message()], #{continuation := ContMsgId, count := C}}
+when
+    ContMsgId :: none | end_of_data | binary(),
+    C :: non_neg_integer(),
+    L :: non_neg_integer().
+query(MQ, #{limit := Limit} = Pager) ->
+    ContMsgId = maps:get(continuation, Pager, none),
+    {List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit),
+    {List, #{continuation => NextCont, count => len(MQ)}}.
+
+skip_until(MQ, none = _MsgId) ->
+    MQ;
+skip_until(MQ, MsgId) ->
+    do_skip_until(MQ, MsgId).
+
+do_skip_until(MQ, MsgId) ->
+    case out(MQ) of
+        {empty, MQ} ->
+            MQ;
+        {{value, #message{id = MsgId}}, Q1} ->
+            Q1;
+        {{value, _Msg}, Q1} ->
+            do_skip_until(Q1, MsgId)
+    end.
+
+sublist(_MQ, 0) ->
+    {[], none};
+sublist(MQ, Len) ->
+    {ListAcc, HasNext} = sublist(MQ, Len, []),
+    {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
+
+sublist(MQ, 0, Acc) ->
+    {Acc, element(1, out(MQ)) =/= empty};
+sublist(MQ, Len, Acc) ->
+    case out(MQ) of
+        {empty, _MQ} ->
+            {Acc, false};
+        {{value, Msg}, Q1} ->
+            sublist(Q1, Len - 1, [Msg | Acc])
+    end.
+
+next_cont(_Acc, false) ->
+    end_of_data;
+next_cont([#message{id = Id} | _Acc], _HasNext) ->
+    Id;
+next_cont([], _HasNext) ->
+    end_of_data.
+
 to_list(MQ, Acc) ->
     case out(MQ) of
         {empty, _MQ} ->

+ 1 - 1
apps/emqx/src/emqx_session.erl

@@ -527,7 +527,7 @@ info(Session) ->
 
 -spec info
     ([atom()], t()) -> [{atom(), _Value}];
-    (atom(), t()) -> _Value.
+    (atom() | {atom(), _Meta}, t()) -> _Value.
 info(Keys, Session) when is_list(Keys) ->
     [{Key, info(Key, Session)} || Key <- Keys];
 info(impl, Session) ->

+ 5 - 0
apps/emqx/src/emqx_session_mem.erl

@@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
     emqx_inflight:size(Inflight);
 info(inflight_max, #session{inflight = Inflight}) ->
     emqx_inflight:max_size(Inflight);
+info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
+    {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
+    {[I#inflight_data.message || {_, I} <- InflightList], Meta};
 info(retry_interval, #session{retry_interval = Interval}) ->
     Interval;
 info(mqueue, #session{mqueue = MQueue}) ->
@@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) ->
     emqx_mqueue:max_len(MQueue);
 info(mqueue_dropped, #session{mqueue = MQueue}) ->
     emqx_mqueue:dropped(MQueue);
+info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) ->
+    emqx_mqueue:query(MQueue, PagerParams);
 info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
     PacketId;
 info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->

+ 80 - 2
apps/emqx/test/emqx_inflight_SUITE.erl

@@ -116,5 +116,83 @@ t_window(_) ->
     ),
     ?assertEqual([a, b], emqx_inflight:window(Inflight)).
 
-% t_to_list(_) ->
-%     error('TODO').
+t_to_list(_) ->
+    Inflight = lists:foldl(
+        fun(Seq, InflightAcc) ->
+            emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc)
+        end,
+        emqx_inflight:new(100),
+        [1, 6, 2, 3, 10, 7, 9, 8, 4, 5]
+    ),
+    ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
+    ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).
+
+t_query(_) ->
+    EmptyInflight = emqx_inflight:new(500),
+    ?assertMatch(
+        {[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50})
+    ),
+    ?assertMatch(
+        {[], #{continuation := end_of_data}},
+        emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50})
+    ),
+    ?assertMatch(
+        {[], #{continuation := end_of_data}},
+        emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50})
+    ),
+
+    Inflight = lists:foldl(
+        fun(Seq, QAcc) ->
+            emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
+        end,
+        EmptyInflight,
+        lists:reverse(lists:seq(1, 114))
+    ),
+
+    LastCont = lists:foldl(
+        fun(PageSeq, Cont) ->
+            Limit = 10,
+            PagerParams = #{continuation => Cont, limit => Limit},
+            {Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams),
+            ?assertEqual(10, length(Page)),
+            ExpFirst = PageSeq * Limit - Limit + 1,
+            ExpLast = PageSeq * Limit,
+            ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
+            ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
+            ?assertMatch(
+                #{count := 114, continuation := IntCont} when is_integer(IntCont),
+                Meta
+            ),
+            NextCont
+        end,
+        none,
+        lists:seq(1, 11)
+    ),
+    {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{
+        continuation => LastCont, limit => 10
+    }),
+    ?assertEqual(4, length(LastPartialPage)),
+    ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
+    ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
+    ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
+
+    ?assertMatch(
+        {[], #{continuation := end_of_data}},
+        emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10})
+    ),
+
+    {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}),
+    ?assertEqual(114, length(LargePage)),
+    ?assertEqual({1, <<"1">>}, hd(LargePage)),
+    ?assertEqual({114, <<"114">>}, lists:last(LargePage)),
+    ?assertMatch(#{continuation := end_of_data}, LargeMeta),
+
+    {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}),
+    ?assertEqual(114, length(FullPage)),
+    ?assertEqual({1, <<"1">>}, hd(FullPage)),
+    ?assertEqual({114, <<"114">>}, lists:last(FullPage)),
+    ?assertMatch(#{continuation := end_of_data}, FullMeta),
+
+    {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}),
+    ?assertEqual([], EmptyPage),
+    ?assertMatch(#{continuation := none, count := 114}, EmptyMeta).

+ 68 - 0
apps/emqx/test/emqx_mqueue_SUITE.erl

@@ -282,6 +282,74 @@ t_dropped(_) ->
     {Msg, Q2} = ?Q:in(Msg, Q1),
     ?assertEqual(1, ?Q:dropped(Q2)).
 
+t_query(_) ->
+    EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}),
+    ?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})),
+    ?assertMatch(
+        {[], #{continuation := end_of_data}},
+        ?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50})
+    ),
+    ?assertMatch(
+        {[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50})
+    ),
+
+    Q = lists:foldl(
+        fun(Seq, QAcc) ->
+            Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)),
+            {_, QAcc1} = ?Q:in(Msg, QAcc),
+            QAcc1
+        end,
+        EmptyQ,
+        lists:seq(1, 114)
+    ),
+
+    LastCont = lists:foldl(
+        fun(PageSeq, Cont) ->
+            Limit = 10,
+            PagerParams = #{continuation => Cont, limit => Limit},
+            {Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams),
+            ?assertEqual(10, length(Page)),
+            ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
+            ExpLastPayload = integer_to_binary(PageSeq * Limit),
+            ?assertEqual(
+                ExpFirstPayload,
+                emqx_message:payload(lists:nth(1, Page)),
+                #{page_seq => PageSeq, page => Page, meta => Meta}
+            ),
+            ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
+            ?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta),
+            NextCont
+        end,
+        none,
+        lists:seq(1, 11)
+    ),
+    {LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}),
+    ?assertEqual(4, length(LastPartialPage)),
+    ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
+    ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
+    ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
+
+    ?assertMatch(
+        {[], #{continuation := end_of_data}},
+        ?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10})
+    ),
+
+    {LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}),
+    ?assertEqual(114, length(LargePage)),
+    ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
+    ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
+    ?assertMatch(#{continuation := end_of_data}, LargeMeta),
+
+    {FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}),
+    ?assertEqual(114, length(FullPage)),
+    ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
+    ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
+    ?assertMatch(#{continuation := end_of_data}, FullMeta),
+
+    {EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}),
+    ?assertEqual([], EmptyPage),
+    ?assertMatch(#{continuation := none, count := 114}, EmptyMeta).
+
 conservation_prop() ->
     ?FORALL(
         {Priorities, Messages},

+ 29 - 1
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -178,8 +178,36 @@ fields(hasnext) ->
     >>,
     Meta = #{desc => Desc, required => true},
     [{hasnext, hoconsc:mk(boolean(), Meta)}];
+fields('after') ->
+    Desc = <<
+        "The value of \"last\" field returned in the previous response. It can then be used"
+        " in subsequent requests to get the next chunk of results.<br/>"
+        "It is used instead of \"page\" parameter to traverse volatile data.<br/>"
+        "Can be omitted or set to \"none\" to get the first chunk of data.<br/>"
+        "\last\" = end_of_data\" is returned, if there is no more data.<br/>"
+        "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
+        " error response."
+    >>,
+    Meta = #{
+        in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">>
+    },
+    [{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
+fields(last) ->
+    Desc = <<
+        "An opaque token that can then be in subsequent requests to get "
+        " the next chunk of results: \"?after={last}\"<br/>"
+        "if there is no more data, \"last\" = end_of_data\" is returned.<br/>"
+        "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
+        " error response."
+    >>,
+    Meta = #{
+        desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">>
+    },
+    [{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
 fields(meta) ->
-    fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
+    fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
+fields(continuation_meta) ->
+    fields(last) ++ fields(count).
 
 -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
 schema_with_example(Type, Example) ->

+ 7 - 0
apps/emqx_management/src/emqx_mgmt.erl

@@ -52,6 +52,7 @@
     kickout_clients/1,
     list_authz_cache/1,
     list_client_subscriptions/1,
+    list_client_msgs/3,
     client_subscriptions/2,
     clean_authz_cache/1,
     clean_authz_cache/2,
@@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) ->
             end
     end.
 
+list_client_msgs(MsgsType, ClientId, PagerParams) when
+    MsgsType =:= inflight_msgs;
+    MsgsType =:= mqueue_msgs
+->
+    call_client(ClientId, {MsgsType, PagerParams}).
+
 client_subscriptions(Node, ClientId) ->
     {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
 

+ 50 - 0
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -22,6 +22,8 @@
 
 -define(LONG_QUERY_TIMEOUT, 50000).
 
+-define(CONT_BASE64_OPTS, #{mode => urlsafe, padding => false}).
+
 -export([
     paginate/3
 ]).
@@ -37,6 +39,8 @@
 
 -export([
     parse_pager_params/1,
+    parse_cont_pager_params/2,
+    encode_cont_pager_params/2,
     parse_qstring/2,
     init_query_result/0,
     init_query_state/5,
@@ -134,6 +138,33 @@ page(Params) ->
 limit(Params) when is_map(Params) ->
     maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()).
 
+continuation(Params, Encoding) ->
+    try
+        decode_continuation(maps:get(<<"after">>, Params, none), Encoding)
+    catch
+        _:_ ->
+            error
+    end.
+
+decode_continuation(none, _Encoding) ->
+    none;
+decode_continuation(end_of_data, _Encoding) ->
+    %% Clients should not send "after=end_of_data" back to the server
+    error;
+decode_continuation(Cont, none) ->
+    Cont;
+decode_continuation(Cont, base64) ->
+    base64:decode(Cont, ?CONT_BASE64_OPTS).
+
+encode_continuation(none, _Encoding) ->
+    none;
+encode_continuation(end_of_data, _Encoding) ->
+    end_of_data;
+encode_continuation(Cont, none) ->
+    emqx_utils_conv:bin(Cont);
+encode_continuation(Cont, base64) ->
+    base64:encode(emqx_utils_conv:bin(Cont), ?CONT_BASE64_OPTS).
+
 %%--------------------------------------------------------------------
 %% Node Query
 %%--------------------------------------------------------------------
@@ -632,6 +663,25 @@ parse_pager_params(Params) ->
             false
     end.
 
+-spec parse_cont_pager_params(map(), none | base64) ->
+    #{limit := pos_integer(), continuation := none | end_of_table | binary()} | false.
+parse_cont_pager_params(Params, Encoding) ->
+    Cont = continuation(Params, Encoding),
+    Limit = b2i(limit(Params)),
+    case Limit > 0 andalso Cont =/= error of
+        true ->
+            #{continuation => Cont, limit => Limit};
+        false ->
+            false
+    end.
+
+-spec encode_cont_pager_params(map(), none | base64) -> map().
+encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) ->
+    Meta1 = maps:remove(continuation, Meta),
+    Meta1#{last => encode_continuation(Cont, ContEncoding)};
+encode_cont_pager_params(Meta, _ContEncoding) ->
+    Meta.
+
 %%--------------------------------------------------------------------
 %% Types
 %%--------------------------------------------------------------------

+ 219 - 2
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -22,8 +22,8 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_cm.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
-
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
 
 -include("emqx_mgmt.hrl").
 
@@ -47,7 +47,9 @@
     unsubscribe/2,
     unsubscribe_batch/2,
     set_keepalive/2,
-    sessions_count/2
+    sessions_count/2,
+    inflight_msgs/2,
+    mqueue_msgs/2
 ]).
 
 -export([
@@ -101,6 +103,8 @@ paths() ->
         "/clients/:clientid/unsubscribe",
         "/clients/:clientid/unsubscribe/bulk",
         "/clients/:clientid/keepalive",
+        "/clients/:clientid/mqueue_messages",
+        "/clients/:clientid/inflight_messages",
         "/sessions_count"
     ].
 
@@ -391,6 +395,14 @@ schema("/clients/:clientid/keepalive") ->
             }
         }
     };
+schema("/clients/:clientid/mqueue_messages") ->
+    ContExample = <<"AAYS53qRa0n07AAABFIACg">>,
+    RespSchema = ?R_REF(mqueue_messages),
+    client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema);
+schema("/clients/:clientid/inflight_messages") ->
+    ContExample = <<"10">>,
+    RespSchema = ?R_REF(inflight_messages),
+    client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema);
 schema("/sessions_count") ->
     #{
         'operationId' => sessions_count,
@@ -621,6 +633,26 @@ fields(subscribe) ->
 fields(unsubscribe) ->
     [
         {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})}
+    ];
+fields(mqueue_messages) ->
+    [
+        {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})},
+        {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
+    ];
+fields(inflight_messages) ->
+    [
+        {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(inflight_msgs_list)})},
+        {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
+    ];
+fields(message) ->
+    [
+        {msgid, hoconsc:mk(binary(), #{desc => ?DESC(msg_id)})},
+        {topic, hoconsc:mk(binary(), #{desc => ?DESC(msg_topic)})},
+        {qos, hoconsc:mk(emqx_schema:qos(), #{desc => ?DESC(msg_qos)})},
+        {publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})},
+        {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
+        {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
+        {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
     ].
 
 %%%==============================================================================================
@@ -693,6 +725,15 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) ->
             end
     end.
 
+mqueue_msgs(get, #{bindings := #{clientid := ClientID}, query_string := QString}) ->
+    list_client_msgs(mqueue_msgs, ClientID, QString).
+
+inflight_msgs(get, #{
+    bindings := #{clientid := ClientID},
+    query_string := QString
+}) ->
+    list_client_msgs(inflight_msgs, ClientID, QString).
+
 %%%==============================================================================================
 %% api apply
 
@@ -825,6 +866,62 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
 %%--------------------------------------------------------------------
 %% internal function
 
+client_msgs_schema(OpId, Desc, ContExample, RespSchema) ->
+    #{
+        'operationId' => OpId,
+        get => #{
+            description => Desc,
+            tags => ?TAGS,
+            parameters => client_msgs_params(),
+            responses => #{
+                200 =>
+                    emqx_dashboard_swagger:schema_with_example(RespSchema, #{
+                        <<"data">> => [message_example()],
+                        <<"meta">> => #{
+                            <<"count">> => 100,
+                            <<"last">> => ContExample
+                        }
+                    }),
+                400 =>
+                    emqx_dashboard_swagger:error_codes(
+                        ['INVALID_PARAMETER'], <<"Invalid parameters">>
+                    ),
+                404 => emqx_dashboard_swagger:error_codes(
+                    ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
+                )
+            }
+        }
+    }.
+
+client_msgs_params() ->
+    [
+        {clientid, hoconsc:mk(binary(), #{in => path})},
+        {payload,
+            hoconsc:mk(hoconsc:enum([none, base64, plain]), #{
+                in => query,
+                default => base64,
+                desc => <<
+                    "Client's inflight/mqueue messages payload encoding."
+                    " If set to `none`, no payload is returned in the response."
+                >>
+            })},
+        {max_payload_bytes,
+            hoconsc:mk(emqx_schema:bytesize(), #{
+                in => query,
+                default => <<"1MB">>,
+                desc => <<
+                    "Client's inflight/mqueue messages payload limit."
+                    " The total payload size of all messages in the response will not exceed this value."
+                    " Messages beyond the limit will be silently omitted in the response."
+                    " The only exception to this rule is when the first message payload"
+                    " is already larger than the limit."
+                    " In this case, the first message will be returned in the response."
+                >>
+            })},
+        hoconsc:ref(emqx_dashboard_swagger, 'after'),
+        hoconsc:ref(emqx_dashboard_swagger, limit)
+    ].
+
 do_subscribe(ClientID, Topic0, Options) ->
     try emqx_topic:parse(Topic0, Options) of
         {Topic, Opts} ->
@@ -1037,6 +1134,42 @@ remove_live_sessions(Rows) ->
         Rows
     ).
 
+list_client_msgs(MsgType, ClientID, QString) ->
+    case parse_cont_pager_params(QString, MsgType) of
+        false ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}};
+        PagerParams = #{} ->
+            case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of
+                {error, not_found} ->
+                    {404, ?CLIENTID_NOT_FOUND};
+                {Msgs, Meta = #{}} when is_list(Msgs) ->
+                    format_msgs_resp(MsgType, Msgs, Meta, QString)
+            end
+    end.
+
+parse_cont_pager_params(QString, MsgType) ->
+    case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
+        false ->
+            false;
+        PagerParams ->
+            maybe_cast_cont(MsgType, PagerParams)
+    end.
+
+maybe_cast_cont(inflight_msgs, #{continuation := Cont} = PagerParams) when is_binary(Cont) ->
+    try
+        PagerParams#{continuation => emqx_utils_conv:int(Cont)}
+    catch
+        _:_ ->
+            false
+    end;
+maybe_cast_cont(_, PagerParams) ->
+    PagerParams.
+
+%% integer packet id
+cont_encoding(inflight_msgs) -> none;
+%% binary message id
+cont_encoding(mqueue_msgs) -> base64.
+
 %%--------------------------------------------------------------------
 %% QueryString to Match Spec
 
@@ -1197,6 +1330,79 @@ format_persistent_session_info(ClientId, PSInfo0) ->
     ),
     result_format_undefined_to_null(PSInfo).
 
+format_msgs_resp(MsgType, Msgs, Meta, QString) ->
+    #{
+        <<"payload">> := PayloadFmt,
+        <<"max_payload_bytes">> := MaxBytes
+    } = QString,
+    Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)),
+    Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)},
+    %% Make sure minirest won't set another content-type for self-encoded JSON response body
+    Headers = #{<<"content-type">> => <<"application/json">>},
+    case emqx_utils_json:safe_encode(Resp) of
+        {ok, RespBin} ->
+            {200, Headers, RespBin};
+        _Error when PayloadFmt =:= plain ->
+            ?BAD_REQUEST(
+                <<"INVALID_PARAMETER">>,
+                <<"Some message payloads are not JSON serializable">>
+            );
+        %% Unexpected internal error
+        Error ->
+            ?INTERNAL_ERROR(Error)
+    end.
+
+format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
+    %% Always include at least one message payload, even if it exceeds the limit
+    {FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt),
+    {Msgs1, _} =
+        catch lists:foldl(
+            fun(Msg, {MsgsAcc, SizeAcc} = Acc) ->
+                {Msg1, PayloadSize} = format_msg(Msg, PayloadFmt),
+                case SizeAcc + PayloadSize of
+                    SizeAcc1 when SizeAcc1 =< MaxBytes ->
+                        {[Msg1 | MsgsAcc], SizeAcc1};
+                    _ ->
+                        throw(Acc)
+                end
+            end,
+            {[FirstMsg1], PayloadSize0},
+            Msgs
+        ),
+    lists:reverse(Msgs1);
+format_msgs([], _PayloadFmt, _MaxBytes) ->
+    [].
+
+format_msg(
+    #message{
+        id = ID,
+        qos = Qos,
+        topic = Topic,
+        from = From,
+        timestamp = Timestamp,
+        headers = Headers,
+        payload = Payload
+    },
+    PayloadFmt
+) ->
+    Msg = #{
+        msgid => emqx_guid:to_hexstr(ID),
+        qos => Qos,
+        topic => Topic,
+        publish_at => Timestamp,
+        from_clientid => emqx_utils_conv:bin(From),
+        from_username => maps:get(username, Headers, <<>>)
+    },
+    format_payload(PayloadFmt, Msg, Payload).
+
+format_payload(none, Msg, _Payload) ->
+    {Msg, 0};
+format_payload(base64, Msg, Payload) ->
+    Payload1 = base64:encode(Payload),
+    {Msg#{payload => Payload1}, erlang:byte_size(Payload1)};
+format_payload(plain, Msg, Payload) ->
+    {Msg#{payload => Payload}, erlang:iolist_size(Payload)}.
+
 %% format func helpers
 take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
     maps:merge(Current, Value);
@@ -1298,6 +1504,17 @@ client_example() ->
         <<"recv_msg.qos0">> => 0
     }.
 
+message_example() ->
+    #{
+        <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>,
+        <<"topic">> => <<"t/test">>,
+        <<"qos">> => 0,
+        <<"publish_at">> => 1709055346487,
+        <<"from_clientid">> => <<"mqttx_59ac0a87">>,
+        <<"from_username">> => <<"test-user">>,
+        <<"payload">> => <<"eyJmb28iOiAiYmFyIn0=">>
+    }.
+
 sessions_count(get, #{query_string := QString}) ->
     Since = maps:get(<<"since">>, QString, 0),
     Count = emqx_cm_registry_keeper:count(Since),

+ 234 - 3
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -23,16 +23,23 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 all() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {group, persistent_sessions}
-        | AllTCs -- persistent_session_testcases()
+        {group, persistent_sessions},
+        {group, msgs_base64_encoding},
+        {group, msgs_plain_encoding}
+        | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases())
     ].
 
 groups() ->
-    [{persistent_sessions, persistent_session_testcases()}].
+    [
+        {persistent_sessions, persistent_session_testcases()},
+        {msgs_base64_encoding, client_msgs_testcases()},
+        {msgs_plain_encoding, client_msgs_testcases()}
+    ].
 
 persistent_session_testcases() ->
     [
@@ -42,12 +49,19 @@ persistent_session_testcases() ->
         t_persistent_sessions4,
         t_persistent_sessions5
     ].
+client_msgs_testcases() ->
+    [
+        t_inflight_messages,
+        t_mqueue_messages
+    ].
 
 init_per_suite(Config) ->
+    ok = snabbkaffe:start_trace(),
     emqx_mgmt_api_test_util:init_suite(),
     Config.
 
 end_per_suite(_) ->
+    ok = snabbkaffe:stop(),
     emqx_mgmt_api_test_util:end_suite().
 
 init_per_group(persistent_sessions, Config) ->
@@ -67,6 +81,10 @@ init_per_group(persistent_sessions, Config) ->
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [{nodes, Nodes} | Config];
+init_per_group(msgs_base64_encoding, Config) ->
+    [{payload_encoding, base64} | Config];
+init_per_group(msgs_plain_encoding, Config) ->
+    [{payload_encoding, plain} | Config];
 init_per_group(_Group, Config) ->
     Config.
 
@@ -77,6 +95,21 @@ end_per_group(persistent_sessions, Config) ->
 end_per_group(_Group, _Config) ->
     ok.
 
+end_per_testcase(TC, _Config) when
+    TC =:= t_inflight_messages;
+    TC =:= t_mqueue_messages
+->
+    ClientId = atom_to_binary(TC),
+    lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
+    ok = emqx_common_test_helpers:wait_for(
+        ?FUNCTION_NAME,
+        ?LINE,
+        fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end,
+        5000
+    );
+end_per_testcase(_TC, _Config) ->
+    ok.
+
 t_clients(_) ->
     process_flag(trap_exit, true),
 
@@ -759,8 +792,206 @@ t_client_id_not_found(_Config) ->
     ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)),
     ?assertMatch(
         {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody])
+    ),
+    %% Mqueue messages
+    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))),
+    %% Inflight messages
+    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))).
+
+t_mqueue_messages(Config) ->
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    Topic = <<"t/test_mqueue_msgs">>,
+    Count = emqx_mgmt:default_row_limit(),
+    {ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
+    Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
+    ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
+
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(
+            get, Path, "limit=10&after=not-base64%23%21", AuthHeader
+        )
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(
+            get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
+        )
+    ).
+
+t_inflight_messages(Config) ->
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    Topic = <<"t/test_inflight_msgs">>,
+    PubCount = emqx_mgmt:default_row_limit(),
+    {ok, Client} = client_with_inflight(ClientId, Topic, PubCount),
+    Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
+    InflightLimit = emqx:get_config([mqtt, max_inflight]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
+
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(
+            get, Path, "limit=10&after=not-int", AuthHeader
+        )
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(
+            get, Path, "limit=-5&after=invalid-int", AuthHeader
+        )
+    ),
+    emqtt:stop(Client).
+
+client_with_mqueue(ClientId, Topic, Count) ->
+    {ok, Client} = emqtt:start_link([
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {clean_start, false},
+        {properties, #{'Session-Expiry-Interval' => 120}}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
+    ok = emqtt:disconnect(Client),
+    publish_msgs(Topic, Count),
+    {ok, Client}.
+
+client_with_inflight(ClientId, Topic, Count) ->
+    {ok, Client} = emqtt:start_link([
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {clean_start, true},
+        {auto_ack, never}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
+    publish_msgs(Topic, Count),
+    {ok, Client}.
+
+publish_msgs(Topic, Count) ->
+    lists:foreach(
+        fun(Seq) ->
+            emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq)))
+        end,
+        lists:seq(1, Count)
     ).
 
+test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
+    Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
+    {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
+    #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
+
+    ?assertMatch(
+        #{
+            <<"last">> := <<"end_of_data">>,
+            <<"count">> := Count
+        },
+        Meta
+    ),
+    ?assertEqual(length(Msgs), Count),
+    lists:foreach(
+        fun({Seq, #{<<"payload">> := P} = M}) ->
+            ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
+            ?assertMatch(
+                #{
+                    <<"msgid">> := _,
+                    <<"topic">> := Topic,
+                    <<"qos">> := _,
+                    <<"publish_at">> := _,
+                    <<"from_clientid">> := _,
+                    <<"from_username">> := _
+                },
+                M
+            )
+        end,
+        lists:zip(lists:seq(1, Count), Msgs)
+    ),
+
+    %% The first message payload is <<"1">>,
+    %% and when it is urlsafe base64 encoded (with no padding), it's <<"MQ">>,
+    %% so we cover both cases:
+    %% - when total payload size exceeds the limit,
+    %% - when the first message payload already exceeds the limit but is still returned in the response.
+    QsPayloadLimit = io_lib:format("payload=~s&max_payload_bytes=1", [PayloadEncoding]),
+    {ok, LimitedMsgsResp} = emqx_mgmt_api_test_util:request_api(
+        get, Path, QsPayloadLimit, AuthHeader
+    ),
+    #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
+    ct:pal("~p", [FirstMsgOnly]),
+    ?assertEqual(1, length(FirstMsgOnly)),
+    ?assertEqual(
+        <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
+    ),
+
+    Limit = 19,
+    LastCont = lists:foldl(
+        fun(PageSeq, Cont) ->
+            Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
+            {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
+            #{
+                <<"meta">> := #{<<"last">> := NextCont} = MetaP,
+                <<"data">> := MsgsP
+            } = emqx_utils_json:decode(MsgsRespP),
+            ?assertMatch(#{<<"count">> := Count}, MetaP),
+            ?assertNotEqual(<<"end_of_data">>, NextCont),
+            ?assertEqual(length(MsgsP), Limit),
+            ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
+            ExpLastPayload = integer_to_binary(PageSeq * Limit),
+            ?assertEqual(
+                ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
+            ),
+            ?assertEqual(
+                ExpLastPayload,
+                decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
+            ),
+            NextCont
+        end,
+        none,
+        lists:seq(1, Count div 19)
+    ),
+    LastPartialPage = Count div 19 + 1,
+    LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
+    {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
+    #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
+        MsgsRespLastP
+    ),
+    ?assertEqual(<<"end_of_data">>, EmptyCont),
+    ?assertMatch(#{<<"count">> := Count}, MetaLastP),
+
+    ?assertEqual(
+        integer_to_binary(LastPartialPage * Limit - Limit + 1),
+        decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
+    ),
+    ?assertEqual(
+        integer_to_binary(Count),
+        decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
+    ),
+
+    ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
+        PayloadEncoding, EmptyCont, Limit
+    ]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
+    ),
+
+    %% Invalid common page params
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(get, Path, "limit=0", AuthHeader)
+    ),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit", AuthHeader)
+    ).
+
+decode_payload(Payload, base64) ->
+    base64:decode(Payload);
+decode_payload(Payload, _) ->
+    Payload.
+
 t_subscribe_shared_topic(_Config) ->
     ClientId = <<"client_subscribe_shared">>,
 

+ 21 - 0
changes/ce/feat-12561.en.md

@@ -0,0 +1,21 @@
+Implement HTTP APIs to get the list of client's inflight and mqueue messages.
+
+To get the first chunk of data:
+ - GET /clients/{clientid}/mqueue_messages?limit=100
+ - GET /clients/{clientid}/inflight_messages?limit=100
+
+Alternatively:
+ - GET /clients/{clientid}/mqueue_messages?limit=100&after=none
+ - GET /clients/{clientid}/inflight_messages?limit=100&after=none
+
+To get the next chunk of data:
+ - GET /clients/{clientid}/mqueue_messages?limit=100&after={last}
+ - GET /clients/{clientid}/inflight_messages?limit=100&after={last}
+
+ Where {last} is a value (opaque string token) of "meta.last" field from the previous response.
+
+ If there is no more data, "last" = "end_of_data" is returned.
+ If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received.
+
+Mqueue messages are ordered according to the queue (FIFO) order.
+Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order.

+ 51 - 0
rel/i18n/emqx_mgmt_api_clients.hocon

@@ -35,6 +35,57 @@ get_client_subs.desc:
 get_client_subs.label:
 """Get client subscriptions"""
 
+get_client_mqueue_msgs.desc:
+"""Get client mqueue messages"""
+get_client_mqueue_msgs.label:
+"""Get client mqueue messages"""
+
+get_client_inflight_msgs.desc:
+"""Get client inflight messages"""
+get_client_inflight_msgs.label:
+"""Get client inflight messages"""
+
+mqueue_msgs_list.desc:
+"""Client's mqueue messages list. The queue (FIFO) ordering is preserved."""
+mqueue_msgs_list.label:
+"""Client's mqueue messages"""
+
+inflight_msgs_list.desc:
+"""Client's inflight messages list.
+Ordered by MQTT Packet Id, which may not represent the chronological messages order."""
+inflight_msgs_list.label:
+"""Client's inflight messages"""
+
+msg_id.desc:
+"""Message ID."""
+msg_id.label:
+"""Message ID"""
+
+msg_topic.desc:
+"""Message topic."""
+msg_topic.label:
+"""Message Topic"""
+
+msg_qos.desc:
+"""Message QoS."""
+msg_topic.label:
+"""Message Qos"""
+
+msg_publish_at.desc:
+"""Message publish time, a millisecond precision Unix epoch timestamp."""
+msg_publish_at.label:
+"""Message Publish Time."""
+
+msg_from_clientid.desc:
+"""Message publisher's client ID."""
+msg_from_clientid.desc:
+"""Message publisher's Client ID"""
+
+msg_from_username.desc:
+"""Message publisher's username."""
+msg_from_username.label:
+"""Message Publisher's Username """
+
 subscribe.desc:
 """Subscribe"""
 subscribe.label: