Просмотр исходного кода

Merge pull request #12732 from SergeTupchiy/EMQX-11861-client-mqueue-inflight-API-updates

EMQX-11861 client mqueue inflight api updates
SergeTupchiy 1 год назад
Родитель
Сommit
27c1f1a4e4

+ 1 - 46
apps/emqx/src/emqx_inflight.erl

@@ -36,8 +36,7 @@
     max_size/1,
     is_full/1,
     is_empty/1,
-    window/1,
-    query/2
+    window/1
 ]).
 
 -export_type([inflight/0]).
@@ -139,47 +138,3 @@ size(?INFLIGHT(Tree)) ->
 -spec max_size(inflight()) -> non_neg_integer().
 max_size(?INFLIGHT(MaxSize, _Tree)) ->
     MaxSize.
-
--spec query(inflight(), #{continuation => Cont, limit := L}) ->
-    {[{key(), term()}], #{continuation := Cont, count := C}}
-when
-    Cont :: none | end_of_data | key(),
-    L :: non_neg_integer(),
-    C :: non_neg_integer().
-query(?INFLIGHT(Tree), #{limit := Limit} = Pager) ->
-    Count = gb_trees:size(Tree),
-    ContKey = maps:get(continuation, Pager, none),
-    {List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit),
-    {List, #{continuation => NextCont, count => Count}}.
-
-iterator_from(none, Tree) ->
-    gb_trees:iterator(Tree);
-iterator_from(ContKey, Tree) ->
-    It = gb_trees:iterator_from(ContKey, Tree),
-    case gb_trees:next(It) of
-        {ContKey, _Val, ItNext} -> ItNext;
-        _ -> It
-    end.
-
-sublist(_It, 0) ->
-    {[], none};
-sublist(It, Len) ->
-    {ListAcc, HasNext} = sublist(It, Len, []),
-    {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
-
-sublist(It, 0, Acc) ->
-    {Acc, gb_trees:next(It) =/= none};
-sublist(It, Len, Acc) ->
-    case gb_trees:next(It) of
-        none ->
-            {Acc, false};
-        {Key, Val, ItNext} ->
-            sublist(ItNext, Len - 1, [{Key, Val} | Acc])
-    end.
-
-next_cont(_Acc, false) ->
-    end_of_data;
-next_cont([{LastKey, _LastVal} | _Acc], _HasNext) ->
-    LastKey;
-next_cont([], _HasNext) ->
-    end_of_data.

+ 101 - 47
apps/emqx/src/emqx_mqueue.erl

@@ -98,6 +98,7 @@
 -define(HIGHEST_PRIORITY, infinity).
 -define(MAX_LEN_INFINITY, 0).
 -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
+-define(INSERT_TS, mqueue_insert_ts).
 
 -record(shift_opts, {
     multiplier :: non_neg_integer(),
@@ -172,54 +173,82 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
             MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
     end.
 
--spec query(mqueue(), #{continuation => ContMsgId, limit := L}) ->
-    {[message()], #{continuation := ContMsgId, count := C}}
+-spec query(mqueue(), #{position => Pos, limit := Limit}) ->
+    {[message()], #{position := Pos, start := Pos}}
 when
-    ContMsgId :: none | end_of_data | binary(),
-    C :: non_neg_integer(),
-    L :: non_neg_integer().
-query(MQ, #{limit := Limit} = Pager) ->
-    ContMsgId = maps:get(continuation, Pager, none),
-    {List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit),
-    {List, #{continuation => NextCont, count => len(MQ)}}.
-
-skip_until(MQ, none = _MsgId) ->
-    MQ;
-skip_until(MQ, MsgId) ->
-    do_skip_until(MQ, MsgId).
+    Pos :: none | {integer(), priority()},
+    Limit :: non_neg_integer().
+query(MQ, #{limit := Limit} = PagerParams) ->
+    Pos = maps:get(position, PagerParams, none),
+    PQsList = ?PQUEUE:to_queues_list(MQ#mqueue.q),
+    {Msgs, NxtPos} = sublist(skip_until(PQsList, Pos), Limit, [], Pos),
+    {Msgs, #{position => NxtPos, start => first_msg_pos(PQsList)}}.
+
+first_msg_pos([]) ->
+    none;
+first_msg_pos([{Prio, PQ} | T]) ->
+    case ?PQUEUE:out(PQ) of
+        {empty, _PQ} ->
+            first_msg_pos(T);
+        {{value, Msg}, _Q} ->
+            {insert_ts(Msg), Prio}
+    end.
 
-do_skip_until(MQ, MsgId) ->
-    case out(MQ) of
-        {empty, MQ} ->
-            MQ;
-        {{value, #message{id = MsgId}}, Q1} ->
-            Q1;
-        {{value, _Msg}, Q1} ->
-            do_skip_until(Q1, MsgId)
+skip_until(PQsList, none = _Pos) ->
+    PQsList;
+skip_until(PQsList, {MsgPos, PrioPos}) ->
+    case skip_until_prio(PQsList, PrioPos) of
+        [{Prio, PQ} | T] ->
+            PQ1 = skip_until_msg(PQ, MsgPos),
+            [{Prio, PQ1} | T];
+        [] ->
+            []
     end.
 
-sublist(_MQ, 0) ->
-    {[], none};
-sublist(MQ, Len) ->
-    {ListAcc, HasNext} = sublist(MQ, Len, []),
-    {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
+skip_until_prio(PQsList, PrioPos) ->
+    lists:dropwhile(fun({Prio, _PQ}) -> Prio > PrioPos end, PQsList).
+
+skip_until_msg(PQ, MsgPos) ->
+    case ?PQUEUE:out(PQ) of
+        {empty, PQ1} ->
+            PQ1;
+        {{value, Msg}, PQ1} ->
+            case insert_ts(Msg) > MsgPos of
+                true -> PQ;
+                false -> skip_until_msg(PQ1, MsgPos)
+            end
+    end.
 
-sublist(MQ, 0, Acc) ->
-    {Acc, element(1, out(MQ)) =/= empty};
-sublist(MQ, Len, Acc) ->
-    case out(MQ) of
-        {empty, _MQ} ->
-            {Acc, false};
-        {{value, Msg}, Q1} ->
-            sublist(Q1, Len - 1, [Msg | Acc])
+sublist(PQs, Len, Acc, LastPosPrio) when PQs =:= []; Len =:= 0 ->
+    {Acc, LastPosPrio};
+sublist([{Prio, PQ} | T], Len, Acc, LastPosPrio) ->
+    {SingleQAcc, SingleQSize} = sublist_single_pq(Prio, PQ, Len, [], 0),
+    Acc1 = Acc ++ lists:reverse(SingleQAcc),
+    NxtPosPrio =
+        case SingleQAcc of
+            [H | _] -> {insert_ts(H), Prio};
+            [] -> LastPosPrio
+        end,
+    case SingleQSize =:= Len of
+        true ->
+            {Acc1, NxtPosPrio};
+        false ->
+            sublist(T, Len - SingleQSize, Acc1, NxtPosPrio)
     end.
 
-next_cont(_Acc, false) ->
-    end_of_data;
-next_cont([#message{id = Id} | _Acc], _HasNext) ->
-    Id;
-next_cont([], _HasNext) ->
-    end_of_data.
+sublist_single_pq(_Prio, _PQ, 0, Acc, AccSize) ->
+    {Acc, AccSize};
+sublist_single_pq(Prio, PQ, Len, Acc, AccSize) ->
+    case ?PQUEUE:out(0, PQ) of
+        {empty, _PQ} ->
+            {Acc, AccSize};
+        {{value, Msg}, PQ1} ->
+            Msg1 = with_prio(Msg, Prio),
+            sublist_single_pq(Prio, PQ1, Len - 1, [Msg1 | Acc], AccSize + 1)
+    end.
+
+with_prio(#message{extra = Extra} = Msg, Prio) ->
+    Msg#message{extra = Extra#{mqueue_priority => Prio}}.
 
 to_list(MQ, Acc) ->
     case out(MQ) of
@@ -256,14 +285,15 @@ in(
 ) ->
     Priority = get_priority(Topic, PTab, Dp),
     PLen = ?PQUEUE:plen(Priority, Q),
+    Msg1 = with_ts(Msg),
     case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
         true ->
             %% reached max length, drop the oldest message
             {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
-            Q2 = ?PQUEUE:in(Msg, Priority, Q1),
-            {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
+            Q2 = ?PQUEUE:in(Msg1, Priority, Q1),
+            {without_ts(DroppedMsg), MQ#mqueue{q = Q2, dropped = Dropped + 1}};
         false ->
-            {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
+            {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg1, Priority, Q)}}
     end.
 
 -spec out(mqueue()) -> {empty | {value, message()}, mqueue()}.
@@ -280,7 +310,7 @@ out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts
         last_prio = Prio,
         p_credit = get_credits(Prio, ShiftOpts)
     },
-    {{value, Val}, MQ1};
+    {{value, without_ts(Val)}, MQ1};
 out(MQ = #mqueue{q = Q, p_credit = 0}) ->
     MQ1 = MQ#mqueue{
         q = ?PQUEUE:shift(Q),
@@ -288,8 +318,12 @@ out(MQ = #mqueue{q = Q, p_credit = 0}) ->
     },
     out(MQ1);
 out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) ->
-    {R, Q1} = ?PQUEUE:out(Q),
-    {R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}.
+    {R, Q2} =
+        case ?PQUEUE:out(Q) of
+            {{value, Val}, Q1} -> {{value, without_ts(Val)}, Q1};
+            Other -> Other
+        end,
+    {R, MQ#mqueue{q = Q2, len = Len - 1, p_credit = Cnt - 1}}.
 
 get_opt(Key, Opts, Default) ->
     case maps:get(Key, Opts, Default) of
@@ -359,3 +393,23 @@ p_table(PTab = #{}) ->
     );
 p_table(PTab) ->
     PTab.
+
+%% This is used to sort/traverse messages in query/2
+with_ts(#message{extra = Extra} = Msg) ->
+    TsNano = erlang:system_time(nanosecond),
+    Extra1 =
+        case is_map(Extra) of
+            true -> Extra;
+            %% extra field has not being used before EMQX 5.4.0
+            %% and defaulted to an empty list,
+            %% if it's not a map it's safe to overwrite it
+            false -> #{}
+        end,
+    Msg#message{extra = Extra1#{?INSERT_TS => TsNano}}.
+
+without_ts(#message{extra = Extra} = Msg) ->
+    Msg#message{extra = maps:remove(?INSERT_TS, Extra)};
+without_ts(Msg) ->
+    Msg.
+
+insert_ts(#message{extra = #{?INSERT_TS := Ts}}) -> Ts.

+ 13 - 0
apps/emqx/src/emqx_pqueue.erl

@@ -46,6 +46,7 @@
     len/1,
     plen/2,
     to_list/1,
+    to_queues_list/1,
     from_list/1,
     in/2,
     in/3,
@@ -121,6 +122,18 @@ to_list({pqueue, Queues}) ->
         {0, V} <- to_list(Q)
     ].
 
+-spec to_queues_list(pqueue()) -> [{priority(), squeue()}].
+to_queues_list({queue, _In, _Out, _Len} = Squeue) ->
+    [{0, Squeue}];
+to_queues_list({pqueue, Queues}) ->
+    lists:sort(
+        fun
+            ({infinity = _P1, _}, {_P2, _}) -> true;
+            ({P1, _}, {P2, _}) -> P1 >= P2
+        end,
+        [{maybe_negate_priority(P), Q} || {P, Q} <- Queues]
+    ).
+
 -spec from_list([{priority(), any()}]) -> pqueue().
 from_list(L) ->
     lists:foldl(fun({P, E}, Q) -> in(E, P, Q) end, new(), L).

+ 74 - 12
apps/emqx/src/emqx_session_mem.erl

@@ -146,6 +146,8 @@
 
 -define(DEFAULT_BATCH_N, 1000).
 
+-define(INFLIGHT_INSERT_TS, inflight_insert_ts).
+
 %%--------------------------------------------------------------------
 %% Init a Session
 %%--------------------------------------------------------------------
@@ -269,8 +271,7 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
 info(inflight_max, #session{inflight = Inflight}) ->
     emqx_inflight:max_size(Inflight);
 info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
-    {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
-    {[I#inflight_data.message || {_, I} <- InflightList], Meta};
+    inflight_query(Inflight, PagerParams);
 info(retry_interval, #session{retry_interval = Interval}) ->
     Interval;
 info(mqueue, #session{mqueue = MQueue}) ->
@@ -396,7 +397,7 @@ puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
             Session1 = Session#session{inflight = Inflight1},
             {ok, Replies, Session2} = dequeue(ClientInfo, Session1),
-            {ok, Msg, Replies, Session2};
+            {ok, without_inflight_insert_ts(Msg), Replies, Session2};
         {value, _} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -415,7 +416,7 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) ->
         {value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
             Update = Data#inflight_data{phase = wait_comp},
             Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
-            {ok, Msg, Session#session{inflight = Inflight1}};
+            {ok, without_inflight_insert_ts(Msg), Session#session{inflight = Inflight1}};
         {value, _} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -451,7 +452,7 @@ pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
             Session1 = Session#session{inflight = Inflight1},
             {ok, Replies, Session2} = dequeue(ClientInfo, Session1),
-            {ok, Msg, Replies, Session2};
+            {ok, without_inflight_insert_ts(Msg), Replies, Session2};
         {value, _Other} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -636,7 +637,7 @@ do_retry_delivery(
             _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
             {Acc, emqx_inflight:delete(PacketId, Inflight)};
         false ->
-            Msg1 = emqx_message:set_flag(dup, true, Msg),
+            Msg1 = without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg)),
             Update = Data#inflight_data{message = Msg1, timestamp = Now},
             Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
             {[{PacketId, Msg1} | Acc], Inflight1}
@@ -728,7 +729,7 @@ replay(ClientInfo, Session) ->
             ({PacketId, #inflight_data{phase = wait_comp}}) ->
                 {pubrel, PacketId};
             ({PacketId, #inflight_data{message = Msg}}) ->
-                {PacketId, emqx_message:set_flag(dup, true, Msg)}
+                {PacketId, without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg))}
         end,
         emqx_inflight:to_list(Session#session.inflight)
     ),
@@ -775,7 +776,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
             %% If the Client's Session terminates before the Client reconnects,
             %% the Server MUST NOT send the Application Message to any other
             %% subscribed Client [MQTT-4.8.2-5].
-            {true, Msg};
+            {true, without_inflight_insert_ts(Msg)};
         ({_PacketId, #inflight_data{}}) ->
             false
     end,
@@ -798,22 +799,83 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
 %% Helper functions
 %%--------------------------------------------------------------------
 
--compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
+-compile(
+    {inline, [
+        sort_fun/2, batch_n/1, inflight_insert_ts/1, without_inflight_insert_ts/1, with_ts/1, age/2
+    ]}
+).
 
 sort_fun({_, A}, {_, B}) ->
     A#inflight_data.timestamp =< B#inflight_data.timestamp.
 
+query_sort_fun({_, #inflight_data{message = A}}, {_, #inflight_data{message = B}}) ->
+    inflight_insert_ts(A) =< inflight_insert_ts(B).
+
+-spec inflight_query(emqx_inflight:inflight(), #{
+    position => integer() | none, limit := pos_integer()
+}) ->
+    {[emqx_types:message()], #{position := integer() | none, start := integer() | none}}.
+inflight_query(Inflight, #{limit := Limit} = PagerParams) ->
+    InflightL = emqx_inflight:to_list(fun query_sort_fun/2, Inflight),
+    StartPos =
+        case InflightL of
+            [{_, #inflight_data{message = FirstM}} | _] -> inflight_insert_ts(FirstM);
+            [] -> none
+        end,
+    Position = maps:get(position, PagerParams, none),
+    InflightMsgs = sublist_from_pos(InflightL, Position, Limit),
+    NextPos =
+        case InflightMsgs of
+            [_ | _] = L ->
+                inflight_insert_ts(lists:last(L));
+            [] ->
+                Position
+        end,
+    {InflightMsgs, #{start => StartPos, position => NextPos}}.
+
+sublist_from_pos(InflightList, none = _Position, Limit) ->
+    inflight_msgs_sublist(InflightList, Limit);
+sublist_from_pos(InflightList, Position, Limit) ->
+    Inflight = lists:dropwhile(
+        fun({_, #inflight_data{message = M}}) ->
+            inflight_insert_ts(M) =< Position
+        end,
+        InflightList
+    ),
+    inflight_msgs_sublist(Inflight, Limit).
+
+%% Small optimization to get sublist and drop keys in one traversal
+inflight_msgs_sublist([{_Key, #inflight_data{message = Msg}} | T], Limit) when Limit > 0 ->
+    [Msg | inflight_msgs_sublist(T, Limit - 1)];
+inflight_msgs_sublist(_, _) ->
+    [].
+
+inflight_insert_ts(#message{extra = #{?INFLIGHT_INSERT_TS := Ts}}) -> Ts.
+
+without_inflight_insert_ts(#message{extra = Extra} = Msg) ->
+    Msg#message{extra = maps:remove(?INFLIGHT_INSERT_TS, Extra)}.
+
 batch_n(Inflight) ->
     case emqx_inflight:max_size(Inflight) of
         0 -> ?DEFAULT_BATCH_N;
         Sz -> Sz - emqx_inflight:size(Inflight)
     end.
 
-with_ts(Msg) ->
+with_ts(#message{extra = Extra} = Msg) ->
+    InsertTsNano = erlang:system_time(nanosecond),
+    %% This is used to sort/traverse messages in inflight_query/2
+    Extra1 =
+        case is_map(Extra) of
+            true -> Extra;
+            %% extra field has not being used before EMQX 5.4.0 and defaulted to an empty list,
+            %% if it's not a map it's safe to overwrite it
+            false -> #{}
+        end,
+    Msg1 = Msg#message{extra = Extra1#{?INFLIGHT_INSERT_TS => InsertTsNano}},
     #inflight_data{
         phase = wait_ack,
-        message = Msg,
-        timestamp = erlang:system_time(millisecond)
+        message = Msg1,
+        timestamp = erlang:convert_time_unit(InsertTsNano, nanosecond, millisecond)
     }.
 
 age(Now, Ts) -> Now - Ts.

+ 0 - 70
apps/emqx/test/emqx_inflight_SUITE.erl

@@ -126,73 +126,3 @@ t_to_list(_) ->
     ),
     ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
     ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).
-
-t_query(_) ->
-    EmptyInflight = emqx_inflight:new(500),
-    ?assertMatch(
-        {[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50})
-    ),
-    ?assertMatch(
-        {[], #{continuation := end_of_data}},
-        emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50})
-    ),
-    ?assertMatch(
-        {[], #{continuation := end_of_data}},
-        emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50})
-    ),
-
-    Inflight = lists:foldl(
-        fun(Seq, QAcc) ->
-            emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
-        end,
-        EmptyInflight,
-        lists:reverse(lists:seq(1, 114))
-    ),
-
-    LastCont = lists:foldl(
-        fun(PageSeq, Cont) ->
-            Limit = 10,
-            PagerParams = #{continuation => Cont, limit => Limit},
-            {Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams),
-            ?assertEqual(10, length(Page)),
-            ExpFirst = PageSeq * Limit - Limit + 1,
-            ExpLast = PageSeq * Limit,
-            ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
-            ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
-            ?assertMatch(
-                #{count := 114, continuation := IntCont} when is_integer(IntCont),
-                Meta
-            ),
-            NextCont
-        end,
-        none,
-        lists:seq(1, 11)
-    ),
-    {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{
-        continuation => LastCont, limit => 10
-    }),
-    ?assertEqual(4, length(LastPartialPage)),
-    ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
-    ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
-    ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
-
-    ?assertMatch(
-        {[], #{continuation := end_of_data}},
-        emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10})
-    ),
-
-    {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}),
-    ?assertEqual(114, length(LargePage)),
-    ?assertEqual({1, <<"1">>}, hd(LargePage)),
-    ?assertEqual({114, <<"114">>}, lists:last(LargePage)),
-    ?assertMatch(#{continuation := end_of_data}, LargeMeta),
-
-    {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}),
-    ?assertEqual(114, length(FullPage)),
-    ?assertEqual({1, <<"1">>}, hd(FullPage)),
-    ?assertEqual({114, <<"114">>}, lists:last(FullPage)),
-    ?assertMatch(#{continuation := end_of_data}, FullMeta),
-
-    {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}),
-    ?assertEqual([], EmptyPage),
-    ?assertMatch(#{continuation := none, count := 114}, EmptyMeta).

+ 138 - 36
apps/emqx/test/emqx_mqueue_SUITE.erl

@@ -284,13 +284,15 @@ t_dropped(_) ->
 
 t_query(_) ->
     EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}),
-    ?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})),
-    ?assertMatch(
-        {[], #{continuation := end_of_data}},
-        ?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50})
+    ?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})),
+    RandPos = {erlang:system_time(nanosecond), 0},
+    ?assertEqual(
+        {[], #{position => RandPos, start => none}},
+        ?Q:query(EmptyQ, #{position => RandPos, limit => 50})
     ),
-    ?assertMatch(
-        {[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50})
+    ?assertEqual(
+        {[], #{position => none, start => none}},
+        ?Q:query(EmptyQ, #{continuation => none, limit => 50})
     ),
 
     Q = lists:foldl(
@@ -303,52 +305,146 @@ t_query(_) ->
         lists:seq(1, 114)
     ),
 
-    LastCont = lists:foldl(
-        fun(PageSeq, Cont) ->
+    {LastPos, LastStart} = lists:foldl(
+        fun(PageSeq, {Pos, PrevStart}) ->
             Limit = 10,
-            PagerParams = #{continuation => Cont, limit => Limit},
-            {Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams),
+            PagerParams = #{position => Pos, limit => Limit},
+            {Page, #{position := NextPos, start := Start}} = ?Q:query(Q, PagerParams),
             ?assertEqual(10, length(Page)),
             ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
             ExpLastPayload = integer_to_binary(PageSeq * Limit),
-            ?assertEqual(
-                ExpFirstPayload,
-                emqx_message:payload(lists:nth(1, Page)),
-                #{page_seq => PageSeq, page => Page, meta => Meta}
-            ),
-            ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
-            ?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta),
-            NextCont
+            FirstMsg = lists:nth(1, Page),
+            LastMsg = lists:nth(10, Page),
+            ?assertEqual(ExpFirstPayload, emqx_message:payload(FirstMsg)),
+            ?assertEqual(ExpLastPayload, emqx_message:payload(LastMsg)),
+            %% start value must not change as Mqueue is not modified during traversal
+            NextStart =
+                case PageSeq of
+                    1 ->
+                        ?assertEqual({mqueue_ts(FirstMsg), 0}, Start),
+                        Start;
+                    _ ->
+                        ?assertEqual(PrevStart, Start),
+                        PrevStart
+                end,
+            {NextPos, NextStart}
         end,
-        none,
+        {none, none},
         lists:seq(1, 11)
     ),
-    {LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}),
+
+    {LastPartialPage, #{position := FinalPos} = LastMeta} = ?Q:query(Q, #{
+        position => LastPos, limit => 10
+    }),
+    LastMsg = lists:nth(4, LastPartialPage),
     ?assertEqual(4, length(LastPartialPage)),
     ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
-    ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
-    ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
-
-    ?assertMatch(
-        {[], #{continuation := end_of_data}},
-        ?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10})
+    ?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
+    ?assertEqual(#{position => {mqueue_ts(LastMsg), 0}, start => LastStart}, LastMeta),
+    ?assertEqual(
+        {[], #{start => LastStart, position => FinalPos}},
+        ?Q:query(Q, #{position => FinalPos, limit => 10})
     ),
 
-    {LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}),
+    {LargePage, LargeMeta} = ?Q:query(Q, #{position => none, limit => 1000}),
     ?assertEqual(114, length(LargePage)),
     ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
     ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
-    ?assertMatch(#{continuation := end_of_data}, LargeMeta),
+    ?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
+
+    {FullPage, FullMeta} = ?Q:query(Q, #{position => none, limit => 114}),
+    ?assertEqual(LargePage, FullPage),
+    ?assertEqual(LargeMeta, FullMeta),
+
+    {_, Q1} = emqx_mqueue:out(Q),
+    {PageAfterRemove, #{start := StartAfterRemove}} = ?Q:query(Q1, #{position => none, limit => 10}),
+    ?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
+    ?assertEqual(StartAfterRemove, {mqueue_ts(hd(PageAfterRemove)), 0}).
+
+t_query_with_priorities(_) ->
+    Priorities = #{<<"t/infinity">> => infinity, <<"t/10">> => 10, <<"t/5">> => 5},
+    EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true, priorities => Priorities}),
+
+    ?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})),
+    RandPos = {erlang:system_time(nanosecond), 0},
+    ?assertEqual(
+        {[], #{position => RandPos, start => none}},
+        ?Q:query(EmptyQ, #{position => RandPos, limit => 50})
+    ),
+    ?assertEqual(
+        {[], #{position => none, start => none}},
+        ?Q:query(EmptyQ, #{continuation => none, limit => 50})
+    ),
 
-    {FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}),
-    ?assertEqual(114, length(FullPage)),
-    ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
-    ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
-    ?assertMatch(#{continuation := end_of_data}, FullMeta),
+    {Q, ExpMsgsAcc} = lists:foldl(
+        fun(Topic, {QAcc, MsgsAcc}) ->
+            {TopicQ, TopicMsgs} =
+                lists:foldl(
+                    fun(Seq, {TopicQAcc, TopicMsgsAcc}) ->
+                        Payload = <<Topic/binary, "_", (integer_to_binary(Seq))/binary>>,
+                        Msg = emqx_message:make(Topic, Payload),
+                        {_, TopicQAcc1} = ?Q:in(Msg, TopicQAcc),
+                        {TopicQAcc1, [Msg | TopicMsgsAcc]}
+                    end,
+                    {QAcc, []},
+                    lists:seq(1, 10)
+                ),
+            {TopicQ, [lists:reverse(TopicMsgs) | MsgsAcc]}
+        end,
+        {EmptyQ, []},
+        [<<"t/test">>, <<"t/5">>, <<"t/infinity">>, <<"t/10">>]
+    ),
 
-    {EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}),
-    ?assertEqual([], EmptyPage),
-    ?assertMatch(#{continuation := none, count := 114}, EmptyMeta).
+    %% Manual resorting from the highest to the lowest priority
+    [ExpMsgsPrio0, ExpMsgsPrio5, ExpMsgsPrioInf, ExpMsgsPrio10] = lists:reverse(ExpMsgsAcc),
+    ExpMsgs = ExpMsgsPrioInf ++ ExpMsgsPrio10 ++ ExpMsgsPrio5 ++ ExpMsgsPrio0,
+    {AllMsgs, #{start := StartPos, position := Pos}} = ?Q:query(Q, #{position => none, limit => 40}),
+    ?assertEqual(40, length(AllMsgs)),
+    ?assertEqual(ExpMsgs, with_empty_extra(AllMsgs)),
+    FirstMsg = hd(AllMsgs),
+    LastMsg = lists:last(AllMsgs),
+    ?assertEqual(<<"t/infinity_1">>, emqx_message:payload(FirstMsg)),
+    ?assertEqual(StartPos, {mqueue_ts(FirstMsg), infinity}),
+    ?assertEqual(<<"t/test_10">>, emqx_message:payload(LastMsg)),
+    ?assertMatch({_, 0}, Pos),
+    ?assertEqual(Pos, {mqueue_ts(LastMsg), mqueue_prio(LastMsg)}),
+
+    Pos5 = {mqueue_ts(lists:nth(5, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))},
+    LastInfPos = {mqueue_ts(lists:nth(10, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))},
+
+    {MsgsPrioInfTo10, #{start := StartPos, position := PosPrio10Msg5}} = ?Q:query(Q, #{
+        position => Pos5, limit => 10
+    }),
+    ?assertEqual(10, length(MsgsPrioInfTo10)),
+    ?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo10))),
+    ?assertEqual(<<"t/10_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo10))),
+    ?assertEqual(PosPrio10Msg5, {
+        mqueue_ts(lists:last(MsgsPrioInfTo10)), mqueue_prio(lists:last(MsgsPrioInfTo10))
+    }),
+
+    {MsgsPrioInfTo5, #{start := StartPos, position := PosPrio5Msg5}} = ?Q:query(Q, #{
+        position => Pos5, limit => 20
+    }),
+    ?assertEqual(20, length(MsgsPrioInfTo5)),
+    ?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo5))),
+    ?assertEqual(<<"t/5_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo5))),
+    ?assertEqual(PosPrio5Msg5, {
+        mqueue_ts(lists:last(MsgsPrioInfTo5)), mqueue_prio(lists:last(MsgsPrioInfTo5))
+    }),
+
+    {MsgsPrio10, #{start := StartPos, position := PosPrio10}} = ?Q:query(Q, #{
+        position => LastInfPos, limit => 10
+    }),
+    ?assertEqual(ExpMsgsPrio10, with_empty_extra(MsgsPrio10)),
+    ?assertEqual(10, length(MsgsPrio10)),
+    ?assertEqual(<<"t/10_1">>, emqx_message:payload(hd(MsgsPrio10))),
+    ?assertEqual(<<"t/10_10">>, emqx_message:payload(lists:last(MsgsPrio10))),
+    ?assertEqual(PosPrio10, {mqueue_ts(lists:last(MsgsPrio10)), mqueue_prio(lists:last(MsgsPrio10))}),
+
+    {MsgsPrio10To5, #{start := StartPos, position := _}} = ?Q:query(Q, #{
+        position => LastInfPos, limit => 20
+    }),
+    ?assertEqual(ExpMsgsPrio10 ++ ExpMsgsPrio5, with_empty_extra(MsgsPrio10To5)).
 
 conservation_prop() ->
     ?FORALL(
@@ -413,3 +509,9 @@ drain(Q) ->
         {{value, #message{topic = T, payload = P}}, Q1} ->
             [{T, P} | drain(Q1)]
     end.
+
+mqueue_ts(#message{extra = #{mqueue_insert_ts := Ts}}) -> Ts.
+mqueue_prio(#message{extra = #{mqueue_priority := Prio}}) -> Prio.
+
+with_empty_extra(Msgs) ->
+    [M#message{extra = #{}} || M <- Msgs].

+ 83 - 2
apps/emqx/test/emqx_session_mem_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
@@ -115,6 +116,80 @@ t_session_stats(_) ->
         maps:from_list(Stats)
     ).
 
+t_session_inflight_query(_) ->
+    EmptyInflight = emqx_inflight:new(500),
+    Session = session(#{inflight => EmptyInflight}),
+    EmptyQueryResMeta = {[], #{position => none, start => none}},
+    ?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
+    ?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
+    RandPos = erlang:system_time(nanosecond),
+    ?assertEqual({[], #{position => RandPos, start => none}}, inflight_query(Session, RandPos, 10)),
+    Inflight = lists:foldl(
+        fun(Seq, Acc) ->
+            Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, integer_to_binary(Seq)),
+            emqx_inflight:insert(Seq, emqx_session_mem:with_ts(Msg), Acc)
+        end,
+        EmptyInflight,
+        lists:seq(1, 114)
+    ),
+
+    Session1 = session(#{inflight => Inflight}),
+
+    {LastPos, LastStart} = lists:foldl(
+        fun(PageSeq, {Pos, PrevStart}) ->
+            Limit = 10,
+            {Page, #{position := NextPos, start := Start}} = inflight_query(Session1, Pos, Limit),
+            ?assertEqual(10, length(Page)),
+            ExpFirst = PageSeq * Limit - Limit + 1,
+            ExpLast = PageSeq * Limit,
+            FirstMsg = lists:nth(1, Page),
+            LastMsg = lists:nth(10, Page),
+            ?assertEqual(integer_to_binary(ExpFirst), emqx_message:payload(FirstMsg)),
+            ?assertEqual(integer_to_binary(ExpLast), emqx_message:payload(LastMsg)),
+            %% start value must not change as Inflight is not modified during traversal
+            NextStart =
+                case PageSeq of
+                    1 ->
+                        ?assertEqual(inflight_ts(FirstMsg), Start),
+                        Start;
+                    _ ->
+                        ?assertEqual(PrevStart, Start),
+                        PrevStart
+                end,
+            ?assertEqual(inflight_ts(LastMsg), NextPos),
+            {NextPos, NextStart}
+        end,
+        {none, none},
+        lists:seq(1, 11)
+    ),
+    {LastPartialPage, #{position := FinalPos} = LastMeta} = inflight_query(
+        Session1, LastPos, 10
+    ),
+    LastMsg = lists:nth(4, LastPartialPage),
+    ?assertEqual(4, length(LastPartialPage)),
+    ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
+    ?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
+    ?assertEqual(#{position => inflight_ts(LastMsg), start => LastStart}, LastMeta),
+    ?assertEqual(
+        {[], #{start => LastStart, position => FinalPos}},
+        inflight_query(Session1, FinalPos, 10)
+    ),
+
+    {LargePage, LargeMeta} = inflight_query(Session1, none, 1000),
+    ?assertEqual(114, length(LargePage)),
+    ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
+    ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
+    ?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
+
+    {FullPage, FullMeta} = inflight_query(Session1, none, 114),
+    ?assertEqual(LargePage, FullPage),
+    ?assertEqual(LargeMeta, FullMeta),
+
+    Session2 = session(#{inflight => emqx_inflight:delete(1, Inflight)}),
+    {PageAfterRemove, #{start := StartAfterRemove}} = inflight_query(Session2, none, 10),
+    ?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
+    ?assertEqual(StartAfterRemove, inflight_ts(hd(PageAfterRemove))).
+
 %%--------------------------------------------------------------------
 %% Test cases for sub/unsub
 %%--------------------------------------------------------------------
@@ -274,9 +349,10 @@ t_pubrel_error_packetid_not_found(_) ->
     {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrel(1, session()).
 
 t_pubcomp(_) ->
-    Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
+    Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
+    Inflight = emqx_inflight:insert(1, with_ts(wait_comp, Msg), emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
-    {ok, undefined, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
+    {ok, Msg, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session1)).
 
 t_pubcomp_error_packetid_in_use(_) ->
@@ -598,3 +674,8 @@ set_duplicate_pub({Id, Msg}) ->
 
 get_packet_id({Id, _}) ->
     Id.
+
+inflight_query(Session, Pos, Limit) ->
+    emqx_session_mem:info({inflight_msgs, #{position => Pos, limit => Limit}}, Session).
+
+inflight_ts(#message{extra = #{inflight_insert_ts := Ts}}) -> Ts.

+ 12 - 21
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -178,36 +178,27 @@ fields(hasnext) ->
     >>,
     Meta = #{desc => Desc, required => true},
     [{hasnext, hoconsc:mk(boolean(), Meta)}];
-fields('after') ->
+fields(position) ->
     Desc = <<
-        "The value of \"last\" field returned in the previous response. It can then be used"
-        " in subsequent requests to get the next chunk of results.<br/>"
-        "It is used instead of \"page\" parameter to traverse volatile data.<br/>"
-        "Can be omitted or set to \"none\" to get the first chunk of data.<br/>"
-        "\last\" = end_of_data\" is returned, if there is no more data.<br/>"
-        "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
-        " error response."
+        "An opaque token that can then be in subsequent requests to get "
+        " the next chunk of results: \"?position={prev_response.meta.position}\"<br/>"
+        "It is used instead of \"page\" parameter to traverse highly volatile data.<br/>"
+        "Can be omitted or set to \"none\" to get the first chunk of data."
     >>,
     Meta = #{
-        in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">>
+        in => query, desc => Desc, required => false, example => <<"none">>
     },
-    [{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
-fields(last) ->
-    Desc = <<
-        "An opaque token that can then be in subsequent requests to get "
-        " the next chunk of results: \"?after={last}\"<br/>"
-        "if there is no more data, \"last\" = end_of_data\" is returned.<br/>"
-        "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
-        " error response."
-    >>,
+    [{position, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
+fields(start) ->
+    Desc = <<"The position of the current first element of the data collection.">>,
     Meta = #{
-        desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">>
+        desc => Desc, required => true, example => <<"none">>
     },
-    [{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
+    [{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
 fields(meta) ->
     fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
 fields(continuation_meta) ->
-    fields(last) ++ fields(count).
+    fields(start) ++ fields(position).
 
 -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
 schema_with_example(Type, Example) ->

+ 0 - 3
apps/emqx_management/include/emqx_mgmt.hrl

@@ -15,6 +15,3 @@
 %%--------------------------------------------------------------------
 
 -define(DEFAULT_ROW_LIMIT, 100).
-
--define(URL_PARAM_INTEGER, url_param_integer).
--define(URL_PARAM_BINARY, url_param_binary).

+ 11 - 33
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -39,7 +39,6 @@
 -export([
     parse_pager_params/1,
     parse_cont_pager_params/2,
-    encode_cont_pager_params/2,
     parse_qstring/2,
     init_query_result/0,
     init_query_state/5,
@@ -138,32 +137,18 @@ page(Params) ->
 limit(Params) when is_map(Params) ->
     maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()).
 
-continuation(Params, Encoding) ->
+position(Params, Decoder) ->
     try
-        decode_continuation(maps:get(<<"after">>, Params, none), Encoding)
+        decode_position(maps:get(<<"position">>, Params, none), Decoder)
     catch
         _:_ ->
             error
     end.
 
-decode_continuation(none, _Encoding) ->
+decode_position(none, _Decoder) ->
     none;
-decode_continuation(end_of_data, _Encoding) ->
-    %% Clients should not send "after=end_of_data" back to the server
-    error;
-decode_continuation(Cont, ?URL_PARAM_INTEGER) ->
-    binary_to_integer(Cont);
-decode_continuation(Cont, ?URL_PARAM_BINARY) ->
-    emqx_utils:hexstr_to_bin(Cont).
-
-encode_continuation(none, _Encoding) ->
-    none;
-encode_continuation(end_of_data, _Encoding) ->
-    end_of_data;
-encode_continuation(Cont, ?URL_PARAM_INTEGER) ->
-    integer_to_binary(Cont);
-encode_continuation(Cont, ?URL_PARAM_BINARY) ->
-    emqx_utils:bin_to_hexstr(Cont, lower).
+decode_position(Pos, Decoder) ->
+    Decoder(Pos).
 
 %%--------------------------------------------------------------------
 %% Node Query
@@ -670,25 +655,18 @@ parse_pager_params(Params) ->
             false
     end.
 
--spec parse_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) ->
-    #{limit := pos_integer(), continuation := none | end_of_table | binary()} | false.
-parse_cont_pager_params(Params, Encoding) ->
-    Cont = continuation(Params, Encoding),
+-spec parse_cont_pager_params(map(), fun((binary()) -> term())) ->
+    #{limit := pos_integer(), position := none | term()} | false.
+parse_cont_pager_params(Params, PositionDecoder) ->
+    Pos = position(Params, PositionDecoder),
     Limit = b2i(limit(Params)),
-    case Limit > 0 andalso Cont =/= error of
+    case Limit > 0 andalso Pos =/= error of
         true ->
-            #{continuation => Cont, limit => Limit};
+            #{position => Pos, limit => Limit};
         false ->
             false
     end.
 
--spec encode_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) -> map().
-encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) ->
-    Meta1 = maps:remove(continuation, Meta),
-    Meta1#{last => encode_continuation(Cont, ContEncoding)};
-encode_cont_pager_params(Meta, _ContEncoding) ->
-    Meta.
-
 %%--------------------------------------------------------------------
 %% Types
 %%--------------------------------------------------------------------

+ 76 - 28
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -413,11 +413,11 @@ schema("/clients/:clientid/keepalive") ->
         }
     };
 schema("/clients/:clientid/mqueue_messages") ->
-    ContExample = <<"AAYS53qRa0n07AAABFIACg">>,
+    ContExample = <<"1710785444656449826_10">>,
     RespSchema = ?R_REF(mqueue_messages),
     client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema);
 schema("/clients/:clientid/inflight_messages") ->
-    ContExample = <<"10">>,
+    ContExample = <<"1710785444656449826">>,
     RespSchema = ?R_REF(inflight_messages),
     client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema);
 schema("/sessions_count") ->
@@ -656,7 +656,7 @@ fields(unsubscribe) ->
     ];
 fields(mqueue_messages) ->
     [
-        {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})},
+        {data, hoconsc:mk(hoconsc:array(?REF(mqueue_message)), #{desc => ?DESC(mqueue_msgs_list)})},
         {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
     ];
 fields(inflight_messages) ->
@@ -672,8 +672,18 @@ fields(message) ->
         {publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})},
         {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
         {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
-        {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
+        {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})},
+        {inserted_at, hoconsc:mk(binary(), #{desc => ?DESC(msg_inserted_at)})}
     ];
+fields(mqueue_message) ->
+    fields(message) ++
+        [
+            {mqueue_priority,
+                hoconsc:mk(
+                    hoconsc:union([integer(), infinity]),
+                    #{desc => ?DESC(msg_mqueue_priority)}
+                )}
+        ];
 fields(requested_client_fields) ->
     %% NOTE: some Client fields actually returned in response are missing in schema:
     %%  enable_authn, is_persistent, listener, peerport
@@ -920,7 +930,7 @@ client_msgs_schema(OpId, Desc, ContExample, RespSchema) ->
             responses => #{
                 200 =>
                     emqx_dashboard_swagger:schema_with_example(RespSchema, #{
-                        <<"data">> => [message_example()],
+                        <<"data">> => [message_example(OpId)],
                         <<"meta">> => #{
                             <<"count">> => 100,
                             <<"last">> => ContExample
@@ -963,7 +973,7 @@ client_msgs_params() ->
                 >>,
                 validator => fun max_bytes_validator/1
             })},
-        hoconsc:ref(emqx_dashboard_swagger, 'after'),
+        hoconsc:ref(emqx_dashboard_swagger, position),
         hoconsc:ref(emqx_dashboard_swagger, limit)
     ].
 
@@ -1200,9 +1210,9 @@ is_live_session(ClientId) ->
     [] =/= emqx_cm_registry:lookup_channels(ClientId).
 
 list_client_msgs(MsgType, ClientID, QString) ->
-    case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
+    case emqx_mgmt_api:parse_cont_pager_params(QString, pos_decoder(MsgType)) of
         false ->
-            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}};
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"position_limit_invalid">>}};
         PagerParams = #{} ->
             case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of
                 {error, not_found} ->
@@ -1212,10 +1222,34 @@ list_client_msgs(MsgType, ClientID, QString) ->
             end
     end.
 
-%% integer packet id
-cont_encoding(inflight_msgs) -> ?URL_PARAM_INTEGER;
-%% binary message id
-cont_encoding(mqueue_msgs) -> ?URL_PARAM_BINARY.
+pos_decoder(mqueue_msgs) -> fun decode_mqueue_pos/1;
+pos_decoder(inflight_msgs) -> fun decode_msg_pos/1.
+
+encode_msgs_meta(_MsgType, #{start := StartPos, position := Pos}) ->
+    #{start => encode_pos(StartPos), position => encode_pos(Pos)}.
+
+encode_pos(none) ->
+    none;
+encode_pos({MsgPos, PrioPos}) ->
+    MsgPosBin = integer_to_binary(MsgPos),
+    PrioPosBin =
+        case PrioPos of
+            infinity -> <<"infinity">>;
+            _ -> integer_to_binary(PrioPos)
+        end,
+    <<MsgPosBin/binary, "_", PrioPosBin/binary>>;
+encode_pos(Pos) when is_integer(Pos) ->
+    integer_to_binary(Pos).
+
+-spec decode_mqueue_pos(binary()) -> {integer(), infinity | integer()}.
+decode_mqueue_pos(Pos) ->
+    [MsgPos, PrioPos] = binary:split(Pos, <<"_">>),
+    {decode_msg_pos(MsgPos), decode_priority_pos(PrioPos)}.
+
+decode_msg_pos(Pos) -> binary_to_integer(Pos).
+
+decode_priority_pos(<<"infinity">>) -> infinity;
+decode_priority_pos(Pos) -> binary_to_integer(Pos).
 
 max_bytes_validator(MaxBytes) when is_integer(MaxBytes), MaxBytes > 0 ->
     ok;
@@ -1415,8 +1449,8 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) ->
         <<"payload">> := PayloadFmt,
         <<"max_payload_bytes">> := MaxBytes
     } = QString,
-    Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)),
-    Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)},
+    Meta1 = encode_msgs_meta(MsgType, Meta),
+    Resp = #{meta => Meta1, data => format_msgs(MsgType, Msgs, PayloadFmt, MaxBytes)},
     %% Make sure minirest won't set another content-type for self-encoded JSON response body
     Headers = #{<<"content-type">> => <<"application/json">>},
     case emqx_utils_json:safe_encode(Resp) of
@@ -1432,13 +1466,13 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) ->
             ?INTERNAL_ERROR(Error)
     end.
 
-format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
+format_msgs(MsgType, [FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
     %% Always include at least one message payload, even if it exceeds the limit
-    {FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt),
+    {FirstMsg1, PayloadSize0} = format_msg(MsgType, FirstMsg, PayloadFmt),
     {Msgs1, _} =
         catch lists:foldl(
             fun(Msg, {MsgsAcc, SizeAcc} = Acc) ->
-                {Msg1, PayloadSize} = format_msg(Msg, PayloadFmt),
+                {Msg1, PayloadSize} = format_msg(MsgType, Msg, PayloadFmt),
                 case SizeAcc + PayloadSize of
                     SizeAcc1 when SizeAcc1 =< MaxBytes ->
                         {[Msg1 | MsgsAcc], SizeAcc1};
@@ -1450,10 +1484,11 @@ format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
             Msgs
         ),
     lists:reverse(Msgs1);
-format_msgs([], _PayloadFmt, _MaxBytes) ->
+format_msgs(_MsgType, [], _PayloadFmt, _MaxBytes) ->
     [].
 
 format_msg(
+    MsgType,
     #message{
         id = ID,
         qos = Qos,
@@ -1462,10 +1497,10 @@ format_msg(
         timestamp = Timestamp,
         headers = Headers,
         payload = Payload
-    },
+    } = Msg,
     PayloadFmt
 ) ->
-    Msg = #{
+    MsgMap = #{
         msgid => emqx_guid:to_hexstr(ID),
         qos => Qos,
         topic => Topic,
@@ -1473,15 +1508,23 @@ format_msg(
         from_clientid => emqx_utils_conv:bin(From),
         from_username => maps:get(username, Headers, <<>>)
     },
-    format_payload(PayloadFmt, Msg, Payload).
-
-format_payload(none, Msg, _Payload) ->
-    {Msg, 0};
-format_payload(base64, Msg, Payload) ->
+    MsgMap1 = format_by_msg_type(MsgType, Msg, MsgMap),
+    format_payload(PayloadFmt, MsgMap1, Payload).
+
+format_by_msg_type(mqueue_msgs, Msg, MsgMap) ->
+    #message{extra = #{mqueue_priority := Prio, mqueue_insert_ts := Ts}} = Msg,
+    MsgMap#{mqueue_priority => Prio, inserted_at => integer_to_binary(Ts)};
+format_by_msg_type(inflight_msgs, Msg, MsgMap) ->
+    #message{extra = #{inflight_insert_ts := Ts}} = Msg,
+    MsgMap#{inserted_at => integer_to_binary(Ts)}.
+
+format_payload(none, MsgMap, _Payload) ->
+    {MsgMap, 0};
+format_payload(base64, MsgMap, Payload) ->
     Payload1 = base64:encode(Payload),
-    {Msg#{payload => Payload1}, erlang:byte_size(Payload1)};
-format_payload(plain, Msg, Payload) ->
-    {Msg#{payload => Payload}, erlang:iolist_size(Payload)}.
+    {MsgMap#{payload => Payload1}, erlang:byte_size(Payload1)};
+format_payload(plain, MsgMap, Payload) ->
+    {MsgMap#{payload => Payload}, erlang:iolist_size(Payload)}.
 
 %% format func helpers
 take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
@@ -1584,6 +1627,11 @@ client_example() ->
         <<"recv_msg.qos0">> => 0
     }.
 
+message_example(inflight_msgs) ->
+    message_example();
+message_example(mqueue_msgs) ->
+    (message_example())#{<<"mqueue_priority">> => 0}.
+
 message_example() ->
     #{
         <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>,

+ 55 - 40
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -1070,18 +1070,19 @@ t_mqueue_messages(Config) ->
     Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
     ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
+    IsMqueue = true,
+    test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue),
 
     ?assertMatch(
         {error, {_, 400, _}},
         emqx_mgmt_api_test_util:request_api(
-            get, Path, "limit=10&after=not-base64%23%21", AuthHeader
+            get, Path, "limit=10&position=not-valid", AuthHeader
         )
     ),
     ?assertMatch(
         {error, {_, 400, _}},
         emqx_mgmt_api_test_util:request_api(
-            get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
+            get, Path, "limit=-5&position=not-valid", AuthHeader
         )
     ).
 
@@ -1093,18 +1094,21 @@ t_inflight_messages(Config) ->
     Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
     InflightLimit = emqx:get_config([mqtt, max_inflight]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
+    IsMqueue = false,
+    test_messages(
+        Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue
+    ),
 
     ?assertMatch(
         {error, {_, 400, _}},
         emqx_mgmt_api_test_util:request_api(
-            get, Path, "limit=10&after=not-int", AuthHeader
+            get, Path, "limit=10&position=not-int", AuthHeader
         )
     ),
     ?assertMatch(
         {error, {_, 400, _}},
         emqx_mgmt_api_test_util:request_api(
-            get, Path, "limit=-5&after=invalid-int", AuthHeader
+            get, Path, "limit=-5&position=invalid-int", AuthHeader
         )
     ),
     emqtt:stop(Client).
@@ -1142,19 +1146,16 @@ publish_msgs(Topic, Count) ->
         lists:seq(1, Count)
     ).
 
-test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
+test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
     Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
     {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
     #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
+    #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
 
-    ?assertMatch(
-        #{
-            <<"last">> := <<"end_of_data">>,
-            <<"count">> := Count
-        },
-        Meta
-    ),
+    ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
+    ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
     ?assertEqual(length(Msgs), Count),
+
     lists:foreach(
         fun({Seq, #{<<"payload">> := P} = M}) ->
             ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
@@ -1165,10 +1166,12 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
                     <<"qos">> := _,
                     <<"publish_at">> := _,
                     <<"from_clientid">> := _,
-                    <<"from_username">> := _
+                    <<"from_username">> := _,
+                    <<"inserted_at">> := _
                 },
                 M
-            )
+            ),
+            IsMqueue andalso ?assertMatch(#{<<"mqueue_priority">> := _}, M)
         end,
         lists:zip(lists:seq(1, Count), Msgs)
     ),
@@ -1183,62 +1186,69 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
         get, Path, QsPayloadLimit, AuthHeader
     ),
     #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
-    ct:pal("~p", [FirstMsgOnly]),
     ?assertEqual(1, length(FirstMsgOnly)),
     ?assertEqual(
         <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
     ),
 
     Limit = 19,
-    LastCont = lists:foldl(
-        fun(PageSeq, Cont) ->
-            Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
-            {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
+    LastPos = lists:foldl(
+        fun(PageSeq, ThisPos) ->
+            Qs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, ThisPos, Limit]),
+            {ok, MsgsRespPage} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
             #{
-                <<"meta">> := #{<<"last">> := NextCont} = MetaP,
-                <<"data">> := MsgsP
-            } = emqx_utils_json:decode(MsgsRespP),
-            ?assertMatch(#{<<"count">> := Count}, MetaP),
-            ?assertNotEqual(<<"end_of_data">>, NextCont),
-            ?assertEqual(length(MsgsP), Limit),
+                <<"meta">> := #{<<"position">> := NextPos, <<"start">> := ThisStart},
+                <<"data">> := MsgsPage
+            } = emqx_utils_json:decode(MsgsRespPage),
+
+            ?assertEqual(NextPos, msg_pos(lists:last(MsgsPage), IsMqueue)),
+            %% Start position is the same in every response and points to the first msg
+            ?assertEqual(StartPos, ThisStart),
+
+            ?assertEqual(length(MsgsPage), Limit),
             ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
             ExpLastPayload = integer_to_binary(PageSeq * Limit),
             ?assertEqual(
-                ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
+                ExpFirstPayload,
+                decode_payload(maps:get(<<"payload">>, hd(MsgsPage)), PayloadEncoding)
             ),
             ?assertEqual(
                 ExpLastPayload,
-                decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
+                decode_payload(maps:get(<<"payload">>, lists:last(MsgsPage)), PayloadEncoding)
             ),
-            NextCont
+            NextPos
         end,
         none,
         lists:seq(1, Count div 19)
     ),
     LastPartialPage = Count div 19 + 1,
-    LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
+    LastQs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, LastPos, Limit]),
     {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
-    #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
+    #{<<"meta">> := #{<<"position">> := LastPartialPos}, <<"data">> := MsgsLastPage} = emqx_utils_json:decode(
         MsgsRespLastP
     ),
-    ?assertEqual(<<"end_of_data">>, EmptyCont),
-    ?assertMatch(#{<<"count">> := Count}, MetaLastP),
+    %% The same as the position of all messages returned in one request
+    ?assertEqual(Pos, LastPartialPos),
 
     ?assertEqual(
         integer_to_binary(LastPartialPage * Limit - Limit + 1),
-        decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
+        decode_payload(maps:get(<<"payload">>, hd(MsgsLastPage)), PayloadEncoding)
     ),
     ?assertEqual(
         integer_to_binary(Count),
-        decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
+        decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastPage)), PayloadEncoding)
     ),
 
-    ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
-        PayloadEncoding, EmptyCont, Limit
+    ExceedQs = io_lib:format("payload=~s&position=~s&limit=~p", [
+        PayloadEncoding, LastPartialPos, Limit
     ]),
+    {ok, MsgsEmptyResp} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader),
     ?assertMatch(
-        {error, {_, 400, _}},
-        emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
+        #{
+            <<"data">> := [],
+            <<"meta">> := #{<<"position">> := LastPartialPos, <<"start">> := StartPos}
+        },
+        emqx_utils_json:decode(MsgsEmptyResp)
     ),
 
     %% Invalid common page params
@@ -1269,6 +1279,11 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
         emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader)
     ).
 
+msg_pos(#{<<"inserted_at">> := TsBin, <<"mqueue_priority">> := Prio} = _Msg, true = _IsMqueue) ->
+    <<TsBin/binary, "_", (emqx_utils_conv:bin(Prio))/binary>>;
+msg_pos(#{<<"inserted_at">> := TsBin} = _Msg, _IsMqueue) ->
+    TsBin.
+
 decode_payload(Payload, base64) ->
     base64:decode(Payload);
 decode_payload(Payload, _) ->

+ 3 - 0
apps/emqx_utils/include/emqx_message.hrl

@@ -37,6 +37,9 @@
     %% Timestamp (Unit: millisecond)
     timestamp :: integer(),
     %% Miscellaneous extensions, currently used for OpenTelemetry context propagation
+    %% and storing mqueue/inflight insertion timestamps.
+    %% It was not used prior to 5.4.0 and defaulted to an empty list.
+    %% Must be a map now.
     extra = #{} :: term()
 }).
 

+ 9 - 10
changes/ce/feat-12561.en.md

@@ -1,21 +1,20 @@
-Implement HTTP APIs to get the list of client's inflight and mqueue messages.
+Implement HTTP APIs to get the list of client's in-flight and mqueue messages.
 
 To get the first chunk of data:
  - GET /clients/{clientid}/mqueue_messages?limit=100
  - GET /clients/{clientid}/inflight_messages?limit=100
 
 Alternatively:
- - GET /clients/{clientid}/mqueue_messages?limit=100&after=none
- - GET /clients/{clientid}/inflight_messages?limit=100&after=none
+ - GET /clients/{clientid}/mqueue_messages?limit=100&position=none
+ - GET /clients/{clientid}/inflight_messages?limit=100&position=none
 
 To get the next chunk of data:
- - GET /clients/{clientid}/mqueue_messages?limit=100&after={last}
- - GET /clients/{clientid}/inflight_messages?limit=100&after={last}
+ - GET /clients/{clientid}/mqueue_messages?limit=100&position={position}
+ - GET /clients/{clientid}/inflight_messages?limit=100&position={position}
 
- Where {last} is a value (opaque string token) of "meta.last" field from the previous response.
+Where {position} is a value (opaque string token) of "meta.position" field from the previous response.
 
- If there is no more data, "last" = "end_of_data" is returned.
- If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received.
+Mqueue messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority.
+By default, all messages in Mqueue have the same priority of 0.
 
-Mqueue messages are ordered according to the queue (FIFO) order.
-Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order.
+In-flight messages are ordered by time at which they were inserted to the in-flight storage (from older to newer messages).

+ 20 - 8
rel/i18n/emqx_mgmt_api_clients.hocon

@@ -41,20 +41,22 @@ get_client_mqueue_msgs.label:
 """Get client mqueue messages"""
 
 get_client_inflight_msgs.desc:
-"""Get client inflight messages"""
+"""Get client in-flight messages"""
 get_client_inflight_msgs.label:
-"""Get client inflight messages"""
+"""Get client in-flight messages"""
 
 mqueue_msgs_list.desc:
-"""Client's mqueue messages list. The queue (FIFO) ordering is preserved."""
+"""Client's mqueue messages list.
+Messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority.
+By default, all messages in Mqueue have the same priority of 0."""
 mqueue_msgs_list.label:
 """Client's mqueue messages"""
 
 inflight_msgs_list.desc:
-"""Client's inflight messages list.
-Ordered by MQTT Packet Id, which may not represent the chronological messages order."""
+"""Client's in-flight messages list.
+Messages are sorted by time at which they were inserted to the In-flight storage (from older to newer messages)."""
 inflight_msgs_list.label:
-"""Client's inflight messages"""
+"""Client's in-flight messages"""
 
 msg_id.desc:
 """Message ID."""
@@ -74,7 +76,7 @@ msg_topic.label:
 msg_publish_at.desc:
 """Message publish time, a millisecond precision Unix epoch timestamp."""
 msg_publish_at.label:
-"""Message Publish Time."""
+"""Message Publish Time"""
 
 msg_from_clientid.desc:
 """Message publisher's client ID."""
@@ -84,7 +86,17 @@ msg_from_clientid.desc:
 msg_from_username.desc:
 """Message publisher's username."""
 msg_from_username.label:
-"""Message Publisher's Username """
+"""Message Publisher's Username"""
+
+msg_inserted_at.desc:
+"""A nanosecond precision Unix epoch timestamp at which a message was inserted to In-flight / Mqueue."""
+msg_inserted_at.label:
+"""Message Insertion Time"""
+
+msg_mqueue_priority.desc:
+"""Message Mqueue Priority."""
+msg_mqueue_priority.label:
+"""Message Mqueue Priority"""
 
 subscribe.desc:
 """Subscribe"""