Jelajahi Sumber

Merge pull request #13925 from keynslug/feat/EMQX-12588/list-queues

feat(dssubs): add facilities to iterate through the store
Andrew Mayorov 1 tahun lalu
induk
melakukan
e979376b72

+ 61 - 26
apps/emqx/src/emqx_ds_schema.erl

@@ -18,9 +18,12 @@
 -module(emqx_ds_schema).
 
 %% API:
--export([schema/0, storage_schema/1, translate_builtin_raft/1, translate_builtin_local/1]).
+-export([schema/0, db_schema/1, db_schema/2]).
 -export([db_config/1]).
 
+%% Internal exports:
+-export([translate_builtin_raft/1, translate_builtin_local/1]).
+
 %% Behavior callbacks:
 -export([fields/1, desc/1, namespace/0]).
 
@@ -39,10 +42,10 @@
 
 -if(?EMQX_RELEASE_EDITION == ee).
 -define(DEFAULT_BACKEND, builtin_raft).
--define(BUILTIN_BACKENDS, [ref(builtin_raft), ref(builtin_local)]).
+-define(BUILTIN_BACKENDS, [builtin_raft, builtin_local]).
 -else.
 -define(DEFAULT_BACKEND, builtin_local).
--define(BUILTIN_BACKENDS, [ref(builtin_local)]).
+-define(BUILTIN_BACKENDS, [builtin_local]).
 -endif.
 
 %%================================================================================
@@ -59,32 +62,34 @@ translate_builtin_raft(
         backend := builtin_raft,
         n_shards := NShards,
         n_sites := NSites,
-        replication_factor := ReplFactor,
-        layout := Layout
+        replication_factor := ReplFactor
     }
 ) ->
+    %% NOTE: Undefined if `basic` schema is in use.
+    Layout = maps:get(layout, Backend, undefined),
     #{
         backend => builtin_raft,
         n_shards => NShards,
         n_sites => NSites,
         replication_factor => ReplFactor,
         replication_options => maps:get(replication_options, Backend, #{}),
-        storage => translate_layout(Layout)
+        storage => emqx_maybe:apply(fun translate_layout/1, Layout)
     }.
 
 translate_builtin_local(
-    #{
+    Backend = #{
         backend := builtin_local,
-        n_shards := NShards,
-        layout := Layout,
-        poll_workers_per_shard := NPollers,
-        poll_batch_size := BatchSize
+        n_shards := NShards
     }
 ) ->
+    %% NOTE: Undefined if `basic` schema is in use.
+    Layout = maps:get(layout, Backend, undefined),
+    NPollers = maps:get(poll_workers_per_shard, Backend, undefined),
+    BatchSize = maps:get(poll_batch_size, Backend, undefined),
     #{
         backend => builtin_local,
         n_shards => NShards,
-        storage => translate_layout(Layout),
+        storage => emqx_maybe:apply(fun translate_layout/1, Layout),
         poll_workers_per_shard => NPollers,
         poll_batch_size => BatchSize
     }.
@@ -99,24 +104,35 @@ namespace() ->
 schema() ->
     [
         {messages,
-            storage_schema(#{
+            db_schema(#{
                 importance => ?IMPORTANCE_MEDIUM,
                 desc => ?DESC(messages)
             })}
     ] ++ emqx_schema_hooks:injection_point('durable_storage', []).
 
-storage_schema(ExtraOptions) ->
+db_schema(ExtraOptions) ->
+    db_schema(complete, ExtraOptions).
+
+db_schema(Flavor, ExtraOptions) ->
     Options = #{
         default => #{<<"backend">> => ?DEFAULT_BACKEND}
     },
+    BuiltinBackends = [backend_ref(Backend, Flavor) || Backend <- ?BUILTIN_BACKENDS],
+    CustomBackends = emqx_schema_hooks:injection_point('durable_storage.backends', []),
     sc(
-        hoconsc:union(
-            ?BUILTIN_BACKENDS ++ emqx_schema_hooks:injection_point('durable_storage.backends', [])
-        ),
+        hoconsc:union(BuiltinBackends ++ CustomBackends),
         maps:merge(Options, ExtraOptions)
     ).
 
-fields(builtin_local) ->
+-dialyzer({nowarn_function, backend_ref/2}).
+backend_ref(Backend, complete) ->
+    ref(Backend);
+backend_ref(builtin_local, basic) ->
+    ref(builtin_local_basic);
+backend_ref(builtin_raft, basic) ->
+    ref(builtin_raft_basic).
+
+backend_fields(builtin_local, Flavor) ->
    %% Schema for the builtin_local backend:
     [
         {backend,
@@ -138,9 +154,9 @@ fields(builtin_local) ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-        | common_builtin_fields()
+        | common_builtin_fields(Flavor)
     ];
-fields(builtin_raft) ->
+backend_fields(builtin_raft, Flavor) ->
     %% Schema for the builtin_raft backend:
     [
         {backend,
@@ -189,8 +205,17 @@ fields(builtin_raft) ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-        | common_builtin_fields()
-    ];
+        | common_builtin_fields(Flavor)
+    ].
+
+fields(builtin_local) ->
+    backend_fields(builtin_local, complete);
+fields(builtin_raft) ->
+    backend_fields(builtin_raft, complete);
+fields(builtin_local_basic) ->
+    backend_fields(builtin_local, basic);
+fields(builtin_raft_basic) ->
+    backend_fields(builtin_raft, basic);
 fields(builtin_write_buffer) ->
     [
         {max_items,
@@ -301,7 +326,7 @@ fields(layout_builtin_reference) ->
             )}
     ].
 
-common_builtin_fields() ->
+common_builtin_fields(basic) ->
     [
         {data_dir,
             sc(
@@ -329,7 +354,10 @@ common_builtin_fields() ->
                     importance => ?IMPORTANCE_HIDDEN,
                     desc => ?DESC(builtin_write_buffer)
                 }
-            )},
+            )}
+    ];
+common_builtin_fields(layout) ->
+    [
         {layout,
             sc(
                 hoconsc:union(builtin_layouts()),
@@ -341,7 +369,10 @@ common_builtin_fields() ->
                             <<"type">> => wildcard_optimized_v2
                         }
                 }
-            )},
+            )}
+    ];
+common_builtin_fields(polling) ->
+    [
         {poll_workers_per_shard,
             sc(
                 pos_integer(),
@@ -358,7 +389,11 @@ common_builtin_fields() ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-    ].
+    ];
+common_builtin_fields(complete) ->
+    common_builtin_fields(basic) ++
+        common_builtin_fields(layout) ++
+        common_builtin_fields(polling).
 
 desc(builtin_raft) ->
     ?DESC(builtin_raft);

+ 6 - 59
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -54,11 +54,6 @@
     cold_get_subscription/2
 ]).
 
--export([
-    format_lease_events/1,
-    format_stream_progresses/1
-]).
-
 -define(schedule_subscribe, schedule_subscribe).
 -define(schedule_unsubscribe, schedule_unsubscribe).
 
@@ -239,7 +234,7 @@ schedule_subscribe(
             ?tp(debug, shared_subs_schedule_subscribe_override, #{
                 share_topic_filter => ShareTopicFilter,
                 new_type => {?schedule_subscribe, SubOpts},
-                old_action => format_schedule_action(ScheduledAction)
+                old_action => ScheduledAction
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1};
         _ ->
@@ -291,7 +286,7 @@ schedule_unsubscribe(
             ?tp(debug, shared_subs_schedule_unsubscribe_override, #{
                 share_topic_filter => ShareTopicFilter,
                 new_type => ?schedule_unsubscribe,
-                old_action => format_schedule_action(ScheduledAction0)
+                old_action => ScheduledAction0
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1};
         _ ->
@@ -305,7 +300,7 @@ schedule_unsubscribe(
             },
             ?tp(debug, shared_subs_schedule_unsubscribe_new, #{
                 share_topic_filter => ShareTopicFilter,
-                stream_keys => format_stream_keys(StreamKeys)
+                stream_keys => StreamKeys
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1}
     end.
@@ -324,7 +319,7 @@ renew_streams(S0, SchedS0, #{agent := Agent0} = SharedS0) ->
         emqx_persistent_session_ds_shared_subs_agent:renew_streams(Agent0),
     StreamLeaseEvents =/= [] andalso
         ?tp(debug, shared_subs_new_stream_lease_events, #{
-            stream_lease_events => format_lease_events(StreamLeaseEvents)
+            stream_lease_events => StreamLeaseEvents
         }),
     {S, SchedS} = lists:foldl(
         fun
@@ -491,7 +486,7 @@ run_scheduled_action(
         [] ->
             ?tp(debug, shared_subs_schedule_action_complete, #{
                 share_topic_filter => ShareTopicFilter,
-                progresses => format_stream_progresses(Progresses1),
+                progresses => Progresses1,
                 type => Type
             }),
             %% Regular progress won't see unsubscribed streams, so we need to
@@ -515,7 +510,7 @@ run_scheduled_action(
             Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
             ?tp(debug, shared_subs_schedule_action_continue, #{
                 share_topic_filter => ShareTopicFilter,
-                new_action => format_schedule_action(Action1)
+                new_action => Action1
             }),
             {continue, Action1}
     end.
@@ -725,51 +720,3 @@ is_stream_fully_acked(_, _, #srs{
     true;
 is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
-
-%%--------------------------------------------------------------------
-%% Formatters
-%%--------------------------------------------------------------------
-
-format_schedule_action(#{
-    type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
-}) ->
-    #{
-        type => Type,
-        progresses => format_stream_progresses(Progresses),
-        stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
-    }.
-
-format_stream_progresses(Streams) ->
-    lists:map(
-        fun format_stream_progress/1,
-        Streams
-    ).
-
-format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
-    Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
-
-format_progress(#{iterator := Iterator} = Progress) ->
-    Progress#{iterator => format_opaque(Iterator)}.
-
-format_stream_key(beginning) -> beginning;
-format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
-
-format_stream_keys(StreamKeys) ->
-    lists:map(
-        fun format_stream_key/1,
-        StreamKeys
-    ).
-
-format_lease_events(Events) ->
-    lists:map(
-        fun format_lease_event/1,
-        Events
-    ).
-
-format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
-    Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
-format_lease_event(#{stream := Stream} = Event) ->
-    Event#{stream => format_opaque(Stream)}.
-
-format_opaque(Opaque) ->
-    erlang:phash2(Opaque).

+ 1 - 2
apps/emqx/src/emqx_types.erl

@@ -144,8 +144,7 @@
 
 -type subid() :: binary() | atom().
 
-%% '_' for match spec
--type group() :: binary() | '_'.
+-type group() :: binary().
 -type topic() :: binary().
 -type word() :: '' | '+' | '#' | binary().
 -type words() :: list(word()).

+ 2 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -202,8 +202,8 @@ fields(limit) ->
     [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}];
 fields(cursor) ->
     Desc = <<"Opaque value representing the current iteration state.">>,
-    Meta = #{default => none, in => query, desc => Desc},
-    [{cursor, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
+    Meta = #{required => false, in => query, desc => Desc},
+    [{cursor, hoconsc:mk(binary(), Meta)}];
 fields(cursor_response) ->
     Desc = <<"Opaque value representing the current iteration state.">>,
     Meta = #{desc => Desc, required => false},

+ 88 - 29
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -10,6 +10,11 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-define(DESC_BAD_REQUEST, <<"Erroneous request">>).
+-define(RESP_BAD_REQUEST(MSG),
+    {400, #{code => <<"BAD_REQUEST">>, message => MSG}}
+).
+
 -define(DESC_NOT_FOUND, <<"Queue not found">>).
 -define(RESP_NOT_FOUND,
     {404, #{code => <<"NOT_FOUND">>, message => ?DESC_NOT_FOUND}}
@@ -71,11 +76,13 @@ schema("/durable_queues") ->
             tags => ?TAGS,
             summary => <<"List declared durable queues">>,
             description => ?DESC("durable_queues_get"),
+            parameters => [
+                hoconsc:ref(emqx_dashboard_swagger, cursor),
+                hoconsc:ref(emqx_dashboard_swagger, limit)
+            ],
             responses => #{
-                200 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queues_get(),
-                    durable_queues_get_example()
-                )
+                200 => resp_list_durable_queues(),
+                400 => error_codes(['BAD_REQUEST'], ?DESC_BAD_REQUEST)
             }
         },
         post => #{
@@ -84,10 +91,7 @@ schema("/durable_queues") ->
             description => ?DESC("durable_queues_post"),
             'requestBody' => durable_queue_post(),
             responses => #{
-                201 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queue_get(),
-                    durable_queue_get_example()
-                ),
+                201 => resp_create_durable_queue(),
                 409 => error_codes(['CONFLICT'], ?DESC_CREATE_CONFICT)
             }
         }
@@ -121,8 +125,23 @@ schema("/durable_queues/:id") ->
         }
     }.
 
-'/durable_queues'(get, _Params) ->
-    {200, queue_list()};
+'/durable_queues'(get, #{query_string := QString}) ->
+    Cursor = maps:get(<<"cursor">>, QString, undefined),
+    Limit = maps:get(<<"limit">>, QString),
+    try queue_list(Cursor, Limit) of
+        {Queues, CursorNext} ->
+            Data = [encode_props(ID, Props) || {ID, Props} <- Queues],
+            case CursorNext of
+                undefined ->
+                    Meta = #{hasnext => false};
+                _Cursor ->
+                    Meta = #{hasnext => true, cursor => CursorNext}
+            end,
+            {200, #{data => Data, meta => Meta}}
+    catch
+        throw:Error ->
+            ?RESP_BAD_REQUEST(emqx_utils:readable_error_msg(Error))
+    end;
 '/durable_queues'(post, #{body := Params}) ->
     case queue_declare(Params) of
         {ok, Queue} ->
@@ -135,7 +154,7 @@ schema("/durable_queues/:id") ->
 
 '/durable_queues/:id'(get, Params) ->
     case queue_get(Params) of
-        Queue when Queue =/= false ->
+        {ok, Queue} ->
             {200, encode_queue(Queue)};
         false ->
             ?RESP_NOT_FOUND
@@ -152,9 +171,8 @@ schema("/durable_queues/:id") ->
             ?RESP_INTERNAL_ERROR(emqx_utils:readable_error_msg(Reason))
     end.
 
-queue_list() ->
-    %% TODO
-    [].
+queue_list(Cursor, Limit) ->
+    emqx_ds_shared_sub_queue:list(Cursor, Limit).
 
 queue_get(#{bindings := #{id := ID}}) ->
     emqx_ds_shared_sub_queue:lookup(ID).
@@ -175,6 +193,9 @@ encode_queue(Queue) ->
         emqx_ds_shared_sub_queue:properties(Queue)
     ).
 
+encode_props(ID, Props) ->
+    maps:merge(#{id => ID}, Props).
+
 %%--------------------------------------------------------------------
 %% Schemas
 %%--------------------------------------------------------------------
@@ -190,26 +211,50 @@ param_queue_id() ->
         })
     }.
 
+resp_list_durable_queues() ->
+    emqx_dashboard_swagger:schema_with_example(
+        ref(resp_list_durable_queues),
+        durable_queues_list_example()
+    ).
+
+resp_create_durable_queue() ->
+    emqx_dashboard_swagger:schema_with_example(
+        durable_queue_post(),
+        durable_queue_get_example()
+    ).
+
 validate_queue_id(Id) ->
     case emqx_topic:words(Id) of
         [Segment] when is_binary(Segment) -> true;
         _ -> {error, <<"Invalid queue id">>}
     end.
 
-durable_queues_get() ->
-    hoconsc:array(ref(durable_queue_get)).
-
 durable_queue_get() ->
-    ref(durable_queue_get).
+    ref(durable_queue).
 
 durable_queue_post() ->
     map().
 
 roots() -> [].
 
-fields(durable_queue_get) ->
+fields(durable_queue) ->
     [
-        {id, mk(binary(), #{})}
+        {id,
+            mk(binary(), #{
+                desc => <<"Identifier assigned at creation time">>
+            })},
+        {created_at,
+            mk(emqx_utils_calendar:epoch_millisecond(), #{
+                desc => <<"Queue creation time">>
+            })},
+        {group, mk(binary(), #{})},
+        {topic, mk(binary(), #{})},
+        {start_time, mk(emqx_utils_calendar:epoch_millisecond(), #{})}
+    ];
+fields(resp_list_durable_queues) ->
+    [
+        {data, hoconsc:mk(hoconsc:array(durable_queue_get()), #{})},
+        {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta_with_cursor), #{})}
     ].
 
 %%--------------------------------------------------------------------
@@ -218,15 +263,29 @@ fields(durable_queue_get) ->
 
 durable_queue_get_example() ->
     #{
-        id => <<"queue1">>
+        id => <<"mygrp:1234EF">>,
+        created_at => <<"2024-01-01T12:34:56.789+02:00">>,
+        group => <<"mygrp">>,
+        topic => <<"t/devices/#">>,
+        start_time => <<"2024-01-01T00:00:00.000+02:00">>
     }.
 
-durable_queues_get_example() ->
-    [
-        #{
-            id => <<"queue1">>
-        },
-        #{
-            id => <<"queue2">>
+durable_queues_list_example() ->
+    #{
+        data => [
+            durable_queue_get_example(),
+            #{
+                id => <<"mygrp:567890AABBCC">>,
+                created_at => <<"2024-02-02T22:33:44.000+02:00">>,
+                group => <<"mygrp">>,
+                topic => <<"t/devices/#">>,
+                start_time => <<"1970-01-01T02:00:00+02:00">>
+            }
+        ],
+        %% TODO: Probably needs to be defined in `emqx_dashboard_swagger`.
+        meta => #{
+            <<"count">> => 2,
+            <<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>,
+            <<"hasnext">> => true
         }
-    ].
+    }.

+ 2 - 6
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl

@@ -260,9 +260,7 @@ handle_leader_update_streams(
         id => Id,
         version_old => VersionOld,
         version_new => VersionNew,
-        stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses(
-            StreamProgresses
-        )
+        stream_progresses => StreamProgresses
     }),
     {AddEvents, Streams1} = lists:foldl(
         fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
@@ -299,9 +297,7 @@ handle_leader_update_streams(
     StreamLeaseEvents = AddEvents ++ RevokeEvents,
     ?tp(debug, shared_sub_group_sm_leader_update_streams, #{
         id => Id,
-        stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
-            StreamLeaseEvents
-        )
+        stream_lease_events => StreamLeaseEvents
     }),
     transition(
         GSM,

+ 2 - 3
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -93,7 +93,6 @@
 
 become(ShareTopicFilter, Claim) ->
     Data0 = init_data(ShareTopicFilter),
-    Data0 =/= false orelse exit(shared_subscription_not_declared),
     Data1 = attach_claim(Claim, Data0),
     gen_statem:enter_loop(?MODULE, [], ?leader_active, Data1, init_claim_renewal(Data1)).
 
@@ -110,7 +109,7 @@ init(_Args) ->
 init_data(#share{topic = Topic} = ShareTopicFilter) ->
     StoreID = emqx_ds_shared_sub_store:mk_id(ShareTopicFilter),
     case emqx_ds_shared_sub_store:open(StoreID) of
-        Store when Store =/= false ->
+        {ok, Store} ->
             ?tp(debug, dssub_store_open, #{topic => ShareTopicFilter, store => Store}),
             #{
                 group_id => ShareTopicFilter,
@@ -122,7 +121,7 @@ init_data(#share{topic = Topic} = ShareTopicFilter) ->
         false ->
             %% NOTE: No leader store -> no subscription
             ?tp(warning, dssub_store_notfound, #{topic => ShareTopicFilter}),
-            false
+            exit(shared_subscription_not_declared)
     end.
 
 attach_claim(Claim, Data) ->

+ 0 - 4
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl

@@ -40,15 +40,11 @@ format_leader_msg(Msg) ->
 
 format_agent_msg_value(agent_msg_type, Type) ->
     agent_msg_type(Type);
-format_agent_msg_value(agent_msg_stream_states, StreamStates) ->
-    emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates);
 format_agent_msg_value(_, Value) ->
     Value.
 
 format_leader_msg_value(leader_msg_type, Type) ->
     leader_msg_type(Type);
-format_leader_msg_value(leader_msg_streams, Streams) ->
-    emqx_persistent_session_ds_shared_subs:format_lease_events(Streams);
 format_leader_msg_value(_, Value) ->
     Value.
 

+ 43 - 5
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl

@@ -10,7 +10,8 @@
     exists/2,
     declare/4,
     destroy/1,
-    destroy/2
+    destroy/2,
+    list/2
 ]).
 
 -export([
@@ -60,9 +61,7 @@ destroy(Group, Topic) ->
 destroy(ID) ->
     %% TODO: There's an obvious lack of transactionality.
     case lookup(ID) of
-        false ->
-            not_found;
-        Queue ->
+        {ok, Queue} ->
             #{topic := Topic} = properties(Queue),
             case emqx_ds_shared_sub_store:destroy(Queue) of
                 ok ->
@@ -70,9 +69,19 @@ destroy(ID) ->
                     ok;
                 Error ->
                     Error
-            end
+            end;
+        false ->
+            not_found
     end.
 
+list(undefined, Limit) ->
+    list(select_properties(), Limit);
+list(Cursor, Limit) when is_binary(Cursor) ->
+    list(select_properties(Cursor), Limit);
+list(Select, Limit) ->
+    {Records, SelectNext} = emqx_ds_shared_sub_store:select_next(Select, Limit),
+    {Records, preserve_cursor(SelectNext)}.
+
 ensure_route(Topic, QueueID) ->
     _ = emqx_persistent_session_ds_router:do_add_route(Topic, QueueID),
     _ = emqx_external_broker:add_persistent_route(Topic, QueueID),
@@ -89,6 +98,35 @@ ensure_delete_route(Topic, QueueID) ->
 
 %%
 
+select_properties() ->
+    emqx_ds_shared_sub_store:select(properties).
+
+select_properties(Cursor) ->
+    try
+        emqx_ds_shared_sub_store:select(properties, decode_cursor(Cursor))
+    catch
+        error:_ ->
+            throw("Invalid cursor")
+    end.
+
+preserve_cursor(end_of_iterator) ->
+    undefined;
+preserve_cursor(Select) ->
+    case emqx_ds_shared_sub_store:select_next(Select, 1) of
+        {[], end_of_iterator} ->
+            undefined;
+        {[_], _} ->
+            encode_cursor(emqx_ds_shared_sub_store:select_preserve(Select))
+    end.
+
+encode_cursor(Cursor) ->
+    emqx_base62:encode(term_to_binary(Cursor)).
+
+decode_cursor(Cursor) ->
+    binary_to_term(emqx_base62:decode(Cursor), [safe]).
+
+%%
+
 id(Queue) ->
     emqx_ds_shared_sub_store:id(Queue).
 

+ 7 - 4
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl

@@ -50,10 +50,13 @@ injected_fields() ->
     #{
         'durable_storage' => [
             {queues,
-                emqx_ds_schema:storage_schema(#{
-                    importance => ?IMPORTANCE_HIDDEN,
-                    desc => ?DESC(durable_queues_storage)
-                })}
+                emqx_ds_schema:db_schema(
+                    basic,
+                    #{
+                        importance => ?IMPORTANCE_HIDDEN,
+                        desc => ?DESC(durable_queues_storage)
+                    }
+                )}
         ]
     }.
 

+ 147 - 70
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl

@@ -49,6 +49,13 @@
     delete/3
 ]).
 
+-export([
+    select/1,
+    select/2,
+    select_next/2,
+    select_preserve/1
+]).
+
 %% Messing with IDs
 -export([
     mk_id/1,
@@ -110,25 +117,20 @@ tune_db_config(Config0 = #{backend := Backend}) ->
     },
     case Backend of
         B when B == builtin_raft; B == builtin_local ->
-            tune_db_storage_layout(Config);
+            Storage =
+                {emqx_ds_storage_bitfield_lts, #{
+                    %% Should be enough, topic structure is pretty simple.
+                    topic_index_bytes => 4,
+                    bits_per_wildcard_level => 64,
+                    %% Enables single-epoch storage.
+                    epoch_bits => 64,
+                    lts_threshold_spec => {simple, {inf, 0, inf, 0}}
+                }},
+            Config#{storage => Storage};
         _ ->
             Config
     end.
 
-tune_db_storage_layout(Config = #{storage := {Layout, Opts0}}) when
-    Layout == emqx_ds_storage_skipstream_lts;
-    Layout == emqx_ds_storage_bitfield_lts
-->
-    Opts = Opts0#{
-        %% Since these layouts impose somewhat strict requirements on message
-        %% timestamp uniqueness, we need to additionally ensure that LTS always
-        %% keeps different groups under separate indices.
-        lts_threshold_spec => {simple, {inf, inf, inf, 0}}
-    },
-    Config#{storage := {Layout, Opts}};
-tune_db_storage_layout(Config = #{storage := _}) ->
-    Config.
-
 %%
 
 -spec mk_id(emqx_types:share()) -> id().
@@ -286,12 +288,7 @@ mk_leader_topic(ID) ->
     %% General.
     id := id(),
     %% Spaces and variables: most up-to-date in-memory state.
-    properties := #{
-        %% TODO: Efficient encoding.
-        topic => emqx_types:topic(),
-        start_time => emqx_message:timestamp(),
-        created_at => emqx_message:timestamp()
-    },
+    properties := properties(),
     stream := #{emqx_ds:stream() => stream_state()},
     rank_progress => _RankProgress,
     %% Internal _sequence numbers_ that tracks every change.
@@ -304,6 +301,14 @@ mk_leader_topic(ID) ->
     stage := #{space_key() | var_name() => _Value}
 }.
 
+-type properties() :: #{
+    %% TODO: Efficient encoding.
+    group => emqx_types:group(),
+    topic => emqx_types:topic(),
+    start_time => emqx_message:timestamp(),
+    created_at => emqx_message:timestamp()
+}.
+
 -type stream_state() :: #{
     progress => emqx_persistent_session_ds_shared_subs:progress(),
     rank => emqx_ds:stream_rank()
@@ -314,7 +319,7 @@ init(ID) ->
     %% NOTE: Empty store is implicitly dirty because rootset needs to be persisted.
     mk_store(ID).
 
--spec open(id()) -> t() | false.
+-spec open(id()) -> {ok, t()} | false | emqx_ds:error(_).
 open(ID) ->
     case open_rootset(ID) of
         Rootset = #{} ->
@@ -354,7 +359,9 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
     TopicFilter = mk_store_wildcard(ID),
     StreamIts1 = ds_refresh_streams(TopicFilter, _StartTime = 0, StreamIts0),
     {StreamIts, Store} = ds_streams_fold(
-        fun(Message, StoreAcc) -> open_message(Message, StoreAcc) end,
+        fun(Message, StoreAcc) ->
+            lists:foldl(fun slurp_record/2, StoreAcc, open_message(Message))
+        end,
         Acc,
         StreamIts1
     ),
@@ -364,7 +371,7 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
         %% concerning, because this suggests there were concurrent writes that slipped
         %% past the leadership claim guards, yet we can still make progress.
         SeqNum when SeqNum >= map_get(seqnum, Rootset) ->
-            reset_dirty(maps:merge(Store, Rootset));
+            {ok, reset_dirty(maps:merge(Store, Rootset))};
         _Mismatch when Retries > 0 ->
             ok = timer:sleep(RetryTimeout),
             slurp_store(Rootset, StreamIts, Retries - 1, RetryTimeout, Store);
@@ -372,6 +379,9 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
             {error, unrecoverable, {leader_store_inconsistent, Store, Rootset}}
     end.
 
+slurp_record({_ID, Record, ChangeSeqNum}, Store = #{seqnum := SeqNum}) ->
+    open_record(Record, Store#{seqnum := max(ChangeSeqNum, SeqNum)}).
+
 -spec get(space_name(), _ID, t()) -> _Value.
 get(SpaceName, ID, Store) ->
     Space = maps:get(SpaceName, Store),
@@ -614,6 +624,21 @@ mk_store_root_matcher(#{id := ID, committed := Committed}) ->
         timestamp = 0
     }.
 
+mk_read_root_batch(ID) ->
+    %% NOTE
+    %% Construct batch that essentially does nothing but reads rootset in a consistent
+    %% manner.
+    Matcher = #message_matcher{
+        from = ID,
+        topic = mk_store_root_topic(ID),
+        payload = '_',
+        timestamp = 0
+    },
+    #dsbatch{
+        preconditions = [{unless_exists, Matcher}],
+        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
+    }.
+
 mk_store_operation(ID, SK, ?STORE_TOMBSTONE, SeqMap) ->
     {delete, #message_matcher{
         from = ID,
@@ -644,37 +669,27 @@ open_root_message(#message{payload = Payload, timestamp = 0}) ->
     #{} = binary_to_term(Payload).
 
 open_message(
-    Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum, headers = Headers}, Store
+    Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum, headers = Headers}
 ) ->
-    Entry =
-        try
-            ChangeSeqNum = maps:get(?STORE_HEADER_CHANGESEQNUM, Headers),
-            case emqx_topic:tokens(Topic) of
-                [_Prefix, _ID, SpaceTok, _SeqTok] ->
-                    SpaceName = token_to_space(SpaceTok),
-                    ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
-                    %% TODO: Records.
-                    Record = {SpaceName, ID, Value, SeqNum};
-                [_Prefix, _ID, VarTok] ->
-                    VarName = token_to_varname(VarTok),
-                    Value = binary_to_term(Payload),
-                    Record = {VarName, Value}
-            end,
-            {ChangeSeqNum, Record}
-        catch
-            error:_ ->
-                ?tp(warning, "dssub_leader_store_unrecognized_message", #{
-                    store => id(Store),
-                    message => Msg
-                }),
-                unrecognized
+    try
+        case emqx_topic:tokens(Topic) of
+            [_Prefix, SpaceID, SpaceTok, _SeqTok] ->
+                SpaceName = token_to_space(SpaceTok),
+                ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
+                %% TODO: Records.
+                Record = {SpaceName, ID, Value, SeqNum};
+            [_Prefix, SpaceID, VarTok] ->
+                VarName = token_to_varname(VarTok),
+                Value = binary_to_term(Payload),
+                Record = {VarName, Value}
         end,
-    open_entry(Entry, Store).
-
-open_entry({ChangeSeqNum, Record}, Store = #{seqnum := SeqNum}) ->
-    open_record(Record, Store#{seqnum := max(ChangeSeqNum, SeqNum)});
-open_entry(unrecognized, Store) ->
-    Store.
+        ChangeSeqNum = maps:get(?STORE_HEADER_CHANGESEQNUM, Headers),
+        [{SpaceID, Record, ChangeSeqNum}]
+    catch
+        error:_ ->
+            ?tp(warning, "dssub_leader_store_unrecognized_message", #{message => Msg}),
+            []
+    end.
 
 open_record({SpaceName, ID, Value, SeqNum}, Store = #{seqmap := SeqMap}) ->
     Space0 = maps:get(SpaceName, Store),
@@ -692,6 +707,77 @@ mk_store_payload(?STORE_SK(_SpaceName, ID), Value) ->
 mk_store_payload(_VarName, Value) ->
     Value.
 
+%%
+
+-record(select, {tf, start, it, streams}).
+
+-type select() :: #select{}.
+
+-spec select(var_name()) -> select().
+select(VarName) ->
+    select_jump(select_new(VarName)).
+
+-spec select(var_name(), _Cursor) -> select().
+select(VarName, Cursor) ->
+    select_restore(Cursor, select_new(VarName)).
+
+select_new(VarName) ->
+    TopicFilter = mk_store_varname_wildcard(VarName),
+    StartTime = 0,
+    RankedStreams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
+    Streams = [S || {_Rank, S} <- lists:sort(RankedStreams)],
+    #select{tf = TopicFilter, start = StartTime, streams = Streams}.
+
+select_jump(It = #select{tf = TopicFilter, start = StartTime, streams = [Stream | Rest]}) ->
+    DSIt = ds_make_iterator(Stream, TopicFilter, StartTime),
+    It#select{it = DSIt, streams = Rest};
+select_jump(It = #select{streams = []}) ->
+    It#select{it = undefined}.
+
+-spec select_next(select(), _N) -> {[{id(), _Var}], select() | end_of_iterator}.
+select_next(ItSelect, N) ->
+    select_fold(
+        ItSelect,
+        N,
+        fun(Message, Acc) ->
+            [{ID, Var} || {ID, {_VarName, Var}, _} <- open_message(Message)] ++ Acc
+        end,
+        []
+    ).
+
+select_fold(#select{it = undefined}, _, _Fun, Acc) ->
+    {Acc, end_of_iterator};
+select_fold(It = #select{it = DSIt0}, N, Fun, Acc0) ->
+    case emqx_ds:next(?DS_DB, DSIt0, N) of
+        {ok, DSIt, Messages} ->
+            Acc = lists:foldl(fun({_Key, Msg}, Acc) -> Fun(Msg, Acc) end, Acc0, Messages),
+            case length(Messages) of
+                N ->
+                    {Acc, It#select{it = DSIt}};
+                NLess when NLess < N ->
+                    select_fold(select_jump(It), N - NLess, Fun, Acc)
+            end;
+        {ok, end_of_stream} ->
+            select_fold(select_jump(It), N, Fun, Acc0)
+    end.
+
+-spec select_preserve(select()) -> _Cursor.
+select_preserve(#select{it = It, streams = Streams}) ->
+    case Streams of
+        [StreamNext | _Rest] ->
+            %% Preserve only the subsequent stream.
+            [It | StreamNext];
+        [] ->
+            %% Iterating over last stream, preserve only iterator.
+            [It]
+    end.
+
+select_restore([It], Select) ->
+    Select#select{it = It, streams = []};
+select_restore([It | StreamNext], Select = #select{streams = Streams}) ->
+    StreamsRest = lists:dropwhile(fun(S) -> S =/= StreamNext end, Streams),
+    Select#select{it = It, streams = StreamsRest}.
+
 mk_store_root_topic(ID) ->
     emqx_topic:join([?STORE_TOPIC_PREFIX, ID]).
 
@@ -705,20 +791,8 @@ mk_store_topic(ID, VarName, _SeqMap) ->
 mk_store_wildcard(ID) ->
     [?STORE_TOPIC_PREFIX, ID, '+', '#'].
 
-mk_read_root_batch(ID) ->
-    %% NOTE
-    %% Construct batch that essentially does nothing but reads rootset in a consistent
-    %% manner.
-    Matcher = #message_matcher{
-        from = ID,
-        topic = mk_store_root_topic(ID),
-        payload = '_',
-        timestamp = 0
-    },
-    #dsbatch{
-        preconditions = [{unless_exists, Matcher}],
-        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
-    }.
+mk_store_varname_wildcard(VarName) ->
+    [?STORE_TOPIC_PREFIX, '+', varname_to_token(VarName)].
 
 ds_refresh_streams(TopicFilter, StartTime, StreamIts) ->
     Streams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
@@ -728,15 +802,18 @@ ds_refresh_streams(TopicFilter, StartTime, StreamIts) ->
                 #{Stream := _It} ->
                     Acc;
                 #{} ->
-                    %% TODO: Gracefully handle `emqx_ds:error(_)`?
-                    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
-                    Acc#{Stream => It}
+                    Acc#{Stream => ds_make_iterator(Stream, TopicFilter, StartTime)}
             end
         end,
         StreamIts,
         Streams
     ).
 
+ds_make_iterator(Stream, TopicFilter, StartTime) ->
+    %% TODO: Gracefully handle `emqx_ds:error(_)`?
+    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
+    It.
+
 ds_streams_fold(Fun, AccIn, StreamItsIn) ->
     maps:fold(
         fun(Stream, It0, {StreamIts, Acc0}) ->

+ 76 - 0
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -425,6 +425,82 @@ t_intensive_reassign(_Config) ->
     ok = emqtt:disconnect(ConnShared3),
     ok = emqtt:disconnect(ConnPub).
 
+t_multiple_groups(groups, _Groups) ->
+    [declare_explicit];
+t_multiple_groups('init', Config) ->
+    Now = emqx_message:timestamp_now(),
+    NQueues = 50,
+    Group = <<"multi">>,
+    Topics = [emqx_utils:format("t/mg/~p", [I]) || I <- lists:seq(1, NQueues)],
+    Queues = lists:map(
+        fun(Topic) ->
+            {ok, Queue} = emqx_ds_shared_sub_queue:declare(Group, wildcard(Topic), Now, 0),
+            Queue
+        end,
+        Topics
+    ),
+    [
+        {queue_group, Group},
+        {queue_topics, Topics},
+        {queues, Queues}
+        | Config
+    ];
+t_multiple_groups('end', Config) ->
+    Topics = proplists:get_value(queue_topics, Config),
+    lists:foreach(
+        fun(Topic) -> emqx_ds_shared_sub_queue:destroy(<<"multi">>, Topic) end,
+        Topics
+    ).
+
+t_multiple_groups(Config) ->
+    Topics = proplists:get_value(queue_topics, Config),
+    NSubs = 20,
+    NPubs = 1000,
+    NQueues = length(Topics),
+    ConnPub = emqtt_connect_pub(<<"t_multiple_groups:pub">>),
+    ConnSubs = lists:map(
+        fun(I) ->
+            ClientId = emqx_utils:format("t_multiple_groups:sub:~p", [I]),
+            ConnSub = emqtt_connect_sub(ClientId),
+            ok = lists:foreach(
+                fun(Ti) ->
+                    Topic = lists:nth(Ti, Topics),
+                    TopicSub = emqx_topic:join([<<"$share/multi">>, wildcard(Topic)]),
+                    {ok, _, [1]} = emqtt:subscribe(ConnSub, TopicSub, 1)
+                end,
+                lists:seq(I, NQueues, NSubs)
+            ),
+            ConnSub
+        end,
+        lists:seq(1, NSubs)
+    ),
+
+    Payloads = lists:map(
+        fun(Pi) ->
+            Qi = pick_queue(Pi, NQueues),
+            Payload = integer_to_binary(Pi),
+            TopicPub = emqx_topic:join([lists:nth(Qi, Topics), integer_to_binary(Pi)]),
+            {ok, _} = emqtt:publish(ConnPub, TopicPub, Payload, 1),
+            Payload
+        end,
+        lists:seq(1, NPubs)
+    ),
+
+    Pubs = drain_publishes(),
+    ?assertMatch(
+        {[_ | _], []},
+        lists:partition(fun(#{payload := P}) -> lists:member(P, Payloads) end, Pubs)
+    ),
+
+    lists:foreach(fun emqtt:disconnect/1, [ConnPub | ConnSubs]).
+
+pick_queue(I, NQueues) ->
+    %% NOTE: Allocate publishes to queues unevenly, but every queue is utilized.
+    round(math:sqrt(NQueues) * math:log2(I)) rem NQueues + 1.
+
+wildcard(Topic) ->
+    emqx_topic:join([Topic, '#']).
+
 t_unsubscribe('init', Config) ->
     declare_queue_if_needed(<<"gr9">>, <<"topic9/#">>, Config);
 t_unsubscribe('end', Config) ->

+ 118 - 29
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -10,14 +10,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--import(
-    emqx_mgmt_api_test_util,
-    [
-        request_api/2,
-        request/3,
-        uri/1
-    ]
-).
+-import(emqx_mgmt_api_test_util, [uri/1]).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -33,10 +26,12 @@ init_per_suite(Config) ->
                     },
                     <<"durable_storage">> => #{
                         <<"messages">> => #{
-                            <<"backend">> => <<"builtin_raft">>
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"n_shards">> => 4
                         },
                         <<"queues">> => #{
-                            <<"backend">> => <<"builtin_raft">>
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"n_shards">> => 4
                         }
                     }
                 }
@@ -67,6 +62,7 @@ init_per_testcase(_TC, Config) ->
 end_per_testcase(_TC, _Config) ->
     ok = snabbkaffe:stop(),
     ok = emqx_ds_shared_sub_registry:purge(),
+    ok = destroy_queues(),
     ok.
 %%--------------------------------------------------------------------
 %% Tests
@@ -74,7 +70,7 @@ end_per_testcase(_TC, _Config) ->
 
 t_basic_crud(_Config) ->
     ?assertMatch(
-        {ok, []},
+        {ok, #{<<"data">> := []}},
         api_get(["durable_queues"])
     ),
 
@@ -124,11 +120,11 @@ t_basic_crud(_Config) ->
         Resp2
     ),
 
-    %% TODO
-    %% ?assertMatch(
-    %%     {ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]},
-    %%     api_get(["durable_queues"])
-    %% ),
+    {ok, 201, #{<<"id">> := QueueID2}} = Resp2,
+    ?assertMatch(
+        {ok, #{<<"data">> := [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}]}},
+        api_get(["durable_queues"])
+    ),
 
     ?assertMatch(
         {ok, 200, <<"Queue deleted">>},
@@ -139,13 +135,90 @@ t_basic_crud(_Config) ->
         api(delete, ["durable_queues", QueueID1], #{})
     ),
 
-    %% TODO
-    %% ?assertMatch(
-    %%     {ok, [#{<<"id">> := <<"q1">>}]},
-    %%     api_get(["durable_queues"])
-    %% ).
+    ?assertMatch(
+        {ok, #{<<"data">> := [#{<<"id">> := QueueID2}]}},
+        api_get(["durable_queues"])
+    ).
 
-    ok.
+t_list_queues(_Config) ->
+    {ok, 201, #{<<"id">> := QID1}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq1">>,
+        <<"topic">> => <<"#">>,
+        <<"start_time">> => 42
+    }),
+    {ok, 201, #{<<"id">> := QID2}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq2">>,
+        <<"topic">> => <<"specific/topic">>,
+        <<"start_time">> => 0
+    }),
+    {ok, 201, #{<<"id">> := QID3}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq3">>,
+        <<"topic">> => <<"1/2/3/+">>,
+        <<"start_time">> => emqx_message:timestamp_now()
+    }),
+    {ok, 201, #{<<"id">> := QID4}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq4">>,
+        <<"topic">> => <<"4/5/6/#">>,
+        <<"start_time">> => emqx_message:timestamp_now()
+    }),
+
+    {ok, Resp} = api_get(["durable_queues"]),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}, #{}, #{}, #{}],
+            <<"meta">> := #{<<"hasnext">> := false}
+        },
+        Resp
+    ),
+
+    {ok, Resp1} = api_get(["durable_queues"], #{limit => <<"1">>}),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}],
+            <<"meta">> := #{<<"hasnext">> := true, <<"cursor">> := _}
+        },
+        Resp1
+    ),
+
+    {ok, Resp2} = api_get(["durable_queues"], #{
+        limit => <<"1">>,
+        cursor => emqx_utils_maps:deep_get([<<"meta">>, <<"cursor">>], Resp1)
+    }),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}],
+            <<"meta">> := #{<<"hasnext">> := true, <<"cursor">> := _}
+        },
+        Resp2
+    ),
+
+    {ok, Resp3} = api_get(["durable_queues"], #{
+        limit => <<"2">>,
+        cursor => emqx_utils_maps:deep_get([<<"meta">>, <<"cursor">>], Resp2)
+    }),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}, #{}],
+            <<"meta">> := #{<<"hasnext">> := false}
+        },
+        Resp3
+    ),
+
+    Data = maps:get(<<"data">>, Resp),
+    ?assertEqual(
+        [QID1, QID2, QID3, QID4],
+        lists:sort([ID || #{<<"id">> := ID} <- Data]),
+        Resp
+    ),
+
+    Data1 = maps:get(<<"data">>, Resp1),
+    Data2 = maps:get(<<"data">>, Resp2),
+    Data3 = maps:get(<<"data">>, Resp3),
+    ?assertEqual(
+        [QID1, QID2, QID3, QID4],
+        lists:sort([ID || D <- [Data1, Data2, Data3], #{<<"id">> := ID} <- D]),
+        [Resp1, Resp2, Resp3]
+    ).
 
 t_duplicate_queue(_Config) ->
     ?assertMatch(
@@ -171,20 +244,36 @@ t_duplicate_queue(_Config) ->
         })
     ).
 
+%%--------------------------------------------------------------------
+
+destroy_queues() ->
+    case api_get(["durable_queues"], #{limit => <<"100">>}) of
+        {ok, #{<<"data">> := Queues}} ->
+            lists:foreach(fun destroy_queue/1, Queues);
+        Error ->
+            Error
+    end.
+
+destroy_queue(#{<<"id">> := QueueID}) ->
+    {ok, 200, _Deleted} = api(delete, ["durable_queues", QueueID], #{}).
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
 
 api_get(Path) ->
-    case request_api(get, uri(Path)) of
-        {ok, ResponseBody} ->
-            {ok, jiffy:decode(list_to_binary(ResponseBody), [return_maps])};
-        {error, _} = Error ->
-            Error
-    end.
+    api_response(emqx_mgmt_api_test_util:request_api(get, uri(Path))).
+
+api_get(Path, Query) ->
+    api_response(emqx_mgmt_api_test_util:request_api(get, uri(Path), Query, [])).
+
+api_response({ok, ResponseBody}) ->
+    {ok, jiffy:decode(iolist_to_binary(ResponseBody), [return_maps])};
+api_response({error, _} = Error) ->
+    Error.
 
 api(Method, Path, Data) ->
-    case request(Method, uri(Path), Data) of
+    case emqx_mgmt_api_test_util:request(Method, uri(Path), Data) of
         {ok, Code, ResponseBody} ->
             Res =
                 case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of

+ 25 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -87,6 +87,9 @@
         bits_per_wildcard_level => pos_integer(),
         topic_index_bytes => pos_integer(),
         epoch_bits => non_neg_integer(),
+        %% Which epochs are considered readable, i.e. visible in `next/6`?
+        %% Default: `complete`, unless it's a single neverending epoch, it's always readable.
+        epoch_readable => complete | any,
         lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
@@ -97,6 +100,7 @@
         topic_index_bytes := pos_integer(),
         ts_bits := non_neg_integer(),
         ts_offset_bits := non_neg_integer(),
+        epoch_readable => complete | any,
         lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
@@ -109,6 +113,7 @@
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
+    epoch_readable :: complete | any,
     threshold_fun :: emqx_ds_lts:threshold_fun(),
     gvars :: ets:table()
 }).
@@ -204,8 +209,15 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
+    TSBits = 64,
     %% 20 bits -> 1048576 us -> ~1 sec
     TSOffsetBits = maps:get(epoch_bits, Options, 20),
+    case TSBits - TSOffsetBits of
+        %% NOTE: There's a single, always incomplete epoch, consider it readable.
+        0 -> EpochReadableDefault = any;
+        _ -> EpochReadableDefault = complete
+    end,
+    EpochReadable = maps:get(epoch_readable, Options, EpochReadableDefault),
     ThresholdSpec = maps:get(lts_threshold_spec, Options, ?DEFAULT_LTS_THRESHOLD),
     %% Create column families:
     DataCFName = data_cf(GenId),
@@ -224,8 +236,9 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
-        ts_bits => 64,
+        ts_bits => TSBits,
         ts_offset_bits => TSOffsetBits,
+        epoch_readable => EpochReadable,
         lts_threshold_spec => ThresholdSpec
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
@@ -267,6 +280,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_bits = TSBits,
+        epoch_readable = maps:get(epoch_readable, Schema, complete),
         threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
@@ -447,7 +461,7 @@ update_iterator(
 
 next(
     Shard,
-    Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
+    Schema = #s{ts_offset = TSOffset, ts_bits = TSBits, epoch_readable = EpochReadable},
     It = #{?storage_key := Stream},
     BatchSize,
     Now,
@@ -475,6 +489,9 @@ next(
                 %% iterator can jump to the next topic and then it
                 %% won't backtrack.
                 false;
+            _ when EpochReadable == any ->
+                %% Incomplete epochs are explicitly marked as readable:
+                false;
             _ ->
                 %% New batches are only added to the current
                 %% generation. We can ignore cutoff time for old
@@ -600,6 +617,9 @@ lookup_message(
             {error, unrecoverable, {rocksdb, Error}}
     end.
 
+handle_event(_ShardId, #s{epoch_readable = any}, _Time, tick) ->
+    %% No need for idle tracking.
+    [];
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     %% If the last message was published more than one epoch ago, and
     %% the shard remains idle, we need to advance safety cutoff
@@ -959,10 +979,11 @@ deserialize(Blob) ->
 
 %% erlfmt-ignore
 make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
+    TsEpochBits = TSBits - TSOffsetBits,
     Bitsources =
     %% Dimension Offset   Bitsize
-        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
-         {?DIM_TS,        TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
+        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE}] ++   %% Topic index
+        [{?DIM_TS,        TSOffsetBits, TsEpochBits} || TsEpochBits > 0] ++ %% Timestamp epoch
         [{?DIM_TS + I,    0,            BitsPerTopicLevel           }       %% Varying topic levels
                                                            || I <- lists:seq(1, N)] ++
         [{?DIM_TS,        0,            TSOffsetBits                }],     %% Timestamp offset

+ 8 - 8
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -956,19 +956,19 @@ list_clients(QString) ->
             {200, Response}
     end.
 
-list_clients_v2(get, #{query_string := QString0}) ->
+list_clients_v2(get, #{query_string := QString}) ->
     Nodes = emqx:running_nodes(),
-    case maps:get(<<"cursor">>, QString0, none) of
-        none ->
-            Cursor = initial_ets_cursor(Nodes),
-            do_list_clients_v2(Nodes, Cursor, QString0);
-        CursorBin when is_binary(CursorBin) ->
+    case QString of
+        #{<<"cursor">> := CursorBin} ->
             case parse_cursor(CursorBin, Nodes) of
                 {ok, Cursor} ->
-                    do_list_clients_v2(Nodes, Cursor, QString0);
+                    do_list_clients_v2(Nodes, Cursor, QString);
                 {error, bad_cursor} ->
                     ?BAD_REQUEST(<<"bad cursor">>)
-            end
+            end;
+        #{} ->
+            Cursor = initial_ets_cursor(Nodes),
+            do_list_clients_v2(Nodes, Cursor, QString)
     end.
 
 do_list_clients_v2(Nodes, Cursor, QString0) ->

+ 5 - 2
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -494,7 +494,9 @@ update_ms(clientid, X, {{Topic, Pid}, Opts}) ->
 update_ms(topic, X, {{Topic, Pid}, Opts}) when
     is_record(Topic, share)
 ->
-    {{#share{group = '_', topic = X}, Pid}, Opts};
+    %% NOTE: Equivalent to `#share{group = '_', topic = X}`, but dialyzer is happy.
+    Share = setelement(#share.group, #share{group = <<>>, topic = X}, '_'),
+    {{Share, Pid}, Opts};
 update_ms(topic, X, {{Topic, Pid}, Opts}) when
     is_binary(Topic) orelse Topic =:= '_'
 ->
@@ -502,7 +504,8 @@ update_ms(topic, X, {{Topic, Pid}, Opts}) when
 update_ms(share_group, X, {{Topic, Pid}, Opts}) when
     not is_record(Topic, share)
 ->
-    {{#share{group = X, topic = Topic}, Pid}, Opts};
+    Share = #share{group = X, topic = Topic},
+    {{Share, Pid}, Opts};
 update_ms(qos, X, {{Topic, Pid}, Opts}) ->
     {{Topic, Pid}, Opts#{qos => X}}.
 

+ 7 - 13
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -2198,28 +2198,22 @@ assert_contains_clientids(Results, ExpectedClientIds, Line) ->
     ).
 
 traverse_in_reverse_v2(QueryParams0, Results, Config) ->
-    Cursors0 =
-        lists:map(
-            fun(#{<<"meta">> := Meta}) ->
-                maps:get(<<"cursor">>, Meta, <<"wontbeused">>)
-            end,
-            Results
-        ),
-    Cursors1 = [<<"none">> | lists:droplast(Cursors0)],
+    Cursors = [C || #{<<"meta">> := #{<<"cursor">> := C}} <- lists:droplast(Results)],
+    CursorParams = [#{} | [#{cursor => C} || C <- Cursors]],
     DirectOrderClientIds = [
         ClientId
      || #{<<"data">> := Rows} <- Results,
         #{<<"clientid">> := ClientId} <- Rows
     ],
-    ReverseCursors = lists:reverse(Cursors1),
+    ReverseCursorParams = lists:reverse(CursorParams),
     do_traverse_in_reverse_v2(
-        QueryParams0, Config, ReverseCursors, DirectOrderClientIds, _Acc = []
+        QueryParams0, Config, ReverseCursorParams, DirectOrderClientIds, _Acc = []
     ).
 
-do_traverse_in_reverse_v2(_QueryParams0, _Config, _Cursors = [], DirectOrderClientIds, Acc) ->
+do_traverse_in_reverse_v2(_QueryParams0, _Config, [], DirectOrderClientIds, Acc) ->
     ?assertEqual(DirectOrderClientIds, Acc);
-do_traverse_in_reverse_v2(QueryParams0, Config, [Cursor | Rest], DirectOrderClientIds, Acc) ->
-    QueryParams = QueryParams0#{cursor => Cursor},
+do_traverse_in_reverse_v2(QueryParams0, Config, [CursorParam | Rest], DirectOrderClientIds, Acc) ->
+    QueryParams = maps:merge(QueryParams0, CursorParam),
     Res0 = list_v2_request(QueryParams, Config),
     ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := _}}}, Res0),
     {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,

+ 9 - 2
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -108,8 +108,8 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, [], Opts) when
 ->
     NewUrl =
         case QueryParams of
-            "" -> Url;
-            _ -> Url ++ "?" ++ QueryParams
+            [] -> Url;
+            _ -> Url ++ "?" ++ build_query_string(QueryParams)
         end,
     do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, Opts);
 request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
@@ -165,6 +165,13 @@ simplify_result(Res) ->
 auth_header_() ->
     emqx_common_test_http:default_auth_header().
 
+build_query_string(Query = #{}) ->
+    build_query_string(maps:to_list(Query));
+build_query_string(Query = [{_, _} | _]) ->
+    uri_string:compose_query([{emqx_utils_conv:bin(K), V} || {K, V} <- Query]);
+build_query_string(QueryString) ->
+    unicode:characters_to_list(QueryString).
+
 build_http_header(X) when is_list(X) ->
     X;
 build_http_header(X) ->