Browse Source

Merge pull request #13962 from emqx/sync-release-58-20241009-062048

Sync release-58
Ivan Dyachkov 1 year ago
parent
commit
5cef737796
40 changed files with 774 additions and 301 deletions
  1. 61 26
      apps/emqx/src/emqx_ds_schema.erl
  2. 6 59
      apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl
  3. 1 2
      apps/emqx/src/emqx_types.erl
  4. 2 2
      apps/emqx_auth_cinfo/README.md
  5. 7 1
      apps/emqx_auth_cinfo/src/emqx_authn_cinfo.erl
  6. 37 2
      apps/emqx_auth_cinfo/test/emqx_authn_cinfo_SUITE.erl
  7. 28 3
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl
  8. 1 1
      apps/emqx_bridge_pulsar/mix.exs
  9. 1 1
      apps/emqx_bridge_pulsar/rebar.config
  10. 2 0
      apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl
  11. 1 1
      apps/emqx_connector/mix.exs
  12. 2 2
      apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
  13. 88 29
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl
  14. 2 6
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl
  15. 2 3
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl
  16. 0 4
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl
  17. 43 5
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl
  18. 7 4
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl
  19. 147 70
      apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl
  20. 76 0
      apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl
  21. 118 29
      apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl
  22. 25 4
      apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
  23. 22 10
      apps/emqx_management/src/emqx_mgmt_api_banned.erl
  24. 8 8
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  25. 3 0
      apps/emqx_management/src/emqx_mgmt_api_ds.erl
  26. 5 2
      apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl
  27. 7 3
      apps/emqx_management/src/emqx_mgmt_cli.erl
  28. 4 0
      apps/emqx_management/test/emqx_mgmt_api_banned_SUITE.erl
  29. 7 13
      apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
  30. 9 2
      apps/emqx_management/test/emqx_mgmt_api_test_util.erl
  31. 1 1
      apps/emqx_redis/mix.exs
  32. 1 1
      apps/emqx_redis/rebar.config
  33. 16 1
      apps/emqx_utils/src/emqx_variform_bif.erl
  34. 21 0
      apps/emqx_utils/test/emqx_variform_bif_tests.erl
  35. 2 0
      changes/ce/fix-13956.en.md
  36. 1 1
      changes/ee/feat-13810.en.md
  37. 3 0
      changes/ee/fix-13959.en.md
  38. 1 1
      rel/config/ee-examples/cinfo-authn.conf
  39. 2 0
      rel/i18n/emqx_authn_cinfo_schema.hocon
  40. 4 4
      rel/i18n/emqx_mgmt_api_banned.hocon

+ 61 - 26
apps/emqx/src/emqx_ds_schema.erl

@@ -18,9 +18,12 @@
 -module(emqx_ds_schema).
 
 %% API:
--export([schema/0, storage_schema/1, translate_builtin_raft/1, translate_builtin_local/1]).
+-export([schema/0, db_schema/1, db_schema/2]).
 -export([db_config/1]).
 
+%% Internal exports:
+-export([translate_builtin_raft/1, translate_builtin_local/1]).
+
 %% Behavior callbacks:
 -export([fields/1, desc/1, namespace/0]).
 
@@ -39,10 +42,10 @@
 
 -if(?EMQX_RELEASE_EDITION == ee).
 -define(DEFAULT_BACKEND, builtin_raft).
--define(BUILTIN_BACKENDS, [ref(builtin_raft), ref(builtin_local)]).
+-define(BUILTIN_BACKENDS, [builtin_raft, builtin_local]).
 -else.
 -define(DEFAULT_BACKEND, builtin_local).
--define(BUILTIN_BACKENDS, [ref(builtin_local)]).
+-define(BUILTIN_BACKENDS, [builtin_local]).
 -endif.
 
 %%================================================================================
@@ -59,32 +62,34 @@ translate_builtin_raft(
         backend := builtin_raft,
         n_shards := NShards,
         n_sites := NSites,
-        replication_factor := ReplFactor,
-        layout := Layout
+        replication_factor := ReplFactor
     }
 ) ->
+    %% NOTE: Undefined if `basic` schema is in use.
+    Layout = maps:get(layout, Backend, undefined),
     #{
         backend => builtin_raft,
         n_shards => NShards,
         n_sites => NSites,
         replication_factor => ReplFactor,
         replication_options => maps:get(replication_options, Backend, #{}),
-        storage => translate_layout(Layout)
+        storage => emqx_maybe:apply(fun translate_layout/1, Layout)
     }.
 
 translate_builtin_local(
-    #{
+    Backend = #{
         backend := builtin_local,
-        n_shards := NShards,
-        layout := Layout,
-        poll_workers_per_shard := NPollers,
-        poll_batch_size := BatchSize
+        n_shards := NShards
     }
 ) ->
+    %% NOTE: Undefined if `basic` schema is in use.
+    Layout = maps:get(layout, Backend, undefined),
+    NPollers = maps:get(poll_workers_per_shard, Backend, undefined),
+    BatchSize = maps:get(poll_batch_size, Backend, undefined),
     #{
         backend => builtin_local,
         n_shards => NShards,
-        storage => translate_layout(Layout),
+        storage => emqx_maybe:apply(fun translate_layout/1, Layout),
         poll_workers_per_shard => NPollers,
         poll_batch_size => BatchSize
     }.
@@ -99,24 +104,35 @@ namespace() ->
 schema() ->
     [
         {messages,
-            storage_schema(#{
+            db_schema(#{
                 importance => ?IMPORTANCE_MEDIUM,
                 desc => ?DESC(messages)
             })}
     ] ++ emqx_schema_hooks:injection_point('durable_storage', []).
 
-storage_schema(ExtraOptions) ->
+db_schema(ExtraOptions) ->
+    db_schema(complete, ExtraOptions).
+
+db_schema(Flavor, ExtraOptions) ->
     Options = #{
         default => #{<<"backend">> => ?DEFAULT_BACKEND}
     },
+    BuiltinBackends = [backend_ref(Backend, Flavor) || Backend <- ?BUILTIN_BACKENDS],
+    CustomBackends = emqx_schema_hooks:injection_point('durable_storage.backends', []),
     sc(
-        hoconsc:union(
-            ?BUILTIN_BACKENDS ++ emqx_schema_hooks:injection_point('durable_storage.backends', [])
-        ),
+        hoconsc:union(BuiltinBackends ++ CustomBackends),
         maps:merge(Options, ExtraOptions)
     ).
 
-fields(builtin_local) ->
+-dialyzer({nowarn_function, backend_ref/2}).
+backend_ref(Backend, complete) ->
+    ref(Backend);
+backend_ref(builtin_local, basic) ->
+    ref(builtin_local_basic);
+backend_ref(builtin_raft, basic) ->
+    ref(builtin_raft_basic).
+
+backend_fields(builtin_local, Flavor) ->
    %% Schema for the builtin_local backend:
     [
         {backend,
@@ -138,9 +154,9 @@ fields(builtin_local) ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-        | common_builtin_fields()
+        | common_builtin_fields(Flavor)
     ];
-fields(builtin_raft) ->
+backend_fields(builtin_raft, Flavor) ->
     %% Schema for the builtin_raft backend:
     [
         {backend,
@@ -189,8 +205,17 @@ fields(builtin_raft) ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-        | common_builtin_fields()
-    ];
+        | common_builtin_fields(Flavor)
+    ].
+
+fields(builtin_local) ->
+    backend_fields(builtin_local, complete);
+fields(builtin_raft) ->
+    backend_fields(builtin_raft, complete);
+fields(builtin_local_basic) ->
+    backend_fields(builtin_local, basic);
+fields(builtin_raft_basic) ->
+    backend_fields(builtin_raft, basic);
 fields(builtin_write_buffer) ->
     [
         {max_items,
@@ -301,7 +326,7 @@ fields(layout_builtin_reference) ->
             )}
     ].
 
-common_builtin_fields() ->
+common_builtin_fields(basic) ->
     [
         {data_dir,
             sc(
@@ -329,7 +354,10 @@ common_builtin_fields() ->
                     importance => ?IMPORTANCE_HIDDEN,
                     desc => ?DESC(builtin_write_buffer)
                 }
-            )},
+            )}
+    ];
+common_builtin_fields(layout) ->
+    [
         {layout,
             sc(
                 hoconsc:union(builtin_layouts()),
@@ -341,7 +369,10 @@ common_builtin_fields() ->
                             <<"type">> => wildcard_optimized_v2
                         }
                 }
-            )},
+            )}
+    ];
+common_builtin_fields(polling) ->
+    [
         {poll_workers_per_shard,
             sc(
                 pos_integer(),
@@ -358,7 +389,11 @@ common_builtin_fields() ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-    ].
+    ];
+common_builtin_fields(complete) ->
+    common_builtin_fields(basic) ++
+        common_builtin_fields(layout) ++
+        common_builtin_fields(polling).
 
 desc(builtin_raft) ->
     ?DESC(builtin_raft);

+ 6 - 59
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -54,11 +54,6 @@
     cold_get_subscription/2
 ]).
 
--export([
-    format_lease_events/1,
-    format_stream_progresses/1
-]).
-
 -define(schedule_subscribe, schedule_subscribe).
 -define(schedule_unsubscribe, schedule_unsubscribe).
 
@@ -239,7 +234,7 @@ schedule_subscribe(
             ?tp(debug, shared_subs_schedule_subscribe_override, #{
                 share_topic_filter => ShareTopicFilter,
                 new_type => {?schedule_subscribe, SubOpts},
-                old_action => format_schedule_action(ScheduledAction)
+                old_action => ScheduledAction
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1};
         _ ->
@@ -291,7 +286,7 @@ schedule_unsubscribe(
             ?tp(debug, shared_subs_schedule_unsubscribe_override, #{
                 share_topic_filter => ShareTopicFilter,
                 new_type => ?schedule_unsubscribe,
-                old_action => format_schedule_action(ScheduledAction0)
+                old_action => ScheduledAction0
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1};
         _ ->
@@ -305,7 +300,7 @@ schedule_unsubscribe(
             },
             ?tp(debug, shared_subs_schedule_unsubscribe_new, #{
                 share_topic_filter => ShareTopicFilter,
-                stream_keys => format_stream_keys(StreamKeys)
+                stream_keys => StreamKeys
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1}
     end.
@@ -324,7 +319,7 @@ renew_streams(S0, SchedS0, #{agent := Agent0} = SharedS0) ->
         emqx_persistent_session_ds_shared_subs_agent:renew_streams(Agent0),
     StreamLeaseEvents =/= [] andalso
         ?tp(debug, shared_subs_new_stream_lease_events, #{
-            stream_lease_events => format_lease_events(StreamLeaseEvents)
+            stream_lease_events => StreamLeaseEvents
         }),
     {S, SchedS} = lists:foldl(
         fun
@@ -491,7 +486,7 @@ run_scheduled_action(
         [] ->
             ?tp(debug, shared_subs_schedule_action_complete, #{
                 share_topic_filter => ShareTopicFilter,
-                progresses => format_stream_progresses(Progresses1),
+                progresses => Progresses1,
                 type => Type
             }),
             %% Regular progress won't see unsubscribed streams, so we need to
@@ -515,7 +510,7 @@ run_scheduled_action(
             Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
             ?tp(debug, shared_subs_schedule_action_continue, #{
                 share_topic_filter => ShareTopicFilter,
-                new_action => format_schedule_action(Action1)
+                new_action => Action1
             }),
             {continue, Action1}
     end.
@@ -725,51 +720,3 @@ is_stream_fully_acked(_, _, #srs{
     true;
 is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
-
-%%--------------------------------------------------------------------
-%% Formatters
-%%--------------------------------------------------------------------
-
-format_schedule_action(#{
-    type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
-}) ->
-    #{
-        type => Type,
-        progresses => format_stream_progresses(Progresses),
-        stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
-    }.
-
-format_stream_progresses(Streams) ->
-    lists:map(
-        fun format_stream_progress/1,
-        Streams
-    ).
-
-format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
-    Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
-
-format_progress(#{iterator := Iterator} = Progress) ->
-    Progress#{iterator => format_opaque(Iterator)}.
-
-format_stream_key(beginning) -> beginning;
-format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
-
-format_stream_keys(StreamKeys) ->
-    lists:map(
-        fun format_stream_key/1,
-        StreamKeys
-    ).
-
-format_lease_events(Events) ->
-    lists:map(
-        fun format_lease_event/1,
-        Events
-    ).
-
-format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
-    Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
-format_lease_event(#{stream := Stream} = Event) ->
-    Event#{stream => format_opaque(Stream)}.
-
-format_opaque(Opaque) ->
-    erlang:phash2(Opaque).

+ 1 - 2
apps/emqx/src/emqx_types.erl

@@ -144,8 +144,7 @@
 
 -type subid() :: binary() | atom().
 
-%% '_' for match spec
--type group() :: binary() | '_'.
+-type group() :: binary().
 -type topic() :: binary().
 -type word() :: '' | '+' | '#' | binary().
 -type words() :: list(word()).

+ 2 - 2
apps/emqx_auth_cinfo/README.md

@@ -4,7 +4,7 @@ This application implements an extended authentication for EMQX Enterprise editi
 
 Client-info (of type `cinfo`) authentication is a lightweight authentication mechanism which checks client properties and attributes against user defined rules.
 The rules make use of the Variform expression to define match conditions, and the authentication result when match is found.
-For example, to quickly fencing off clients without a username, the match condition can be `str_eq(username, '')` associated with a attributes result `deny`.
+For example, to quickly fence off clients without a username, the match condition can be `is_empty_val(username)` associated with a result of `deny`.
 
 The new authenticator config look is like below.
 
@@ -21,7 +21,7 @@ authentication = [
       # deny clients with empty username and client ID starts with 'v1-'
       {
         # when is_match is an array, it yields 'true' if all individual checks yield 'true'
-        is_match = ["str_eq(username, '')", "str_eq(nth(1,tokens(clientid,'-')), 'v1')"]
+        is_match = ["is_empty_val(username)", "str_eq(nth(1,tokens(clientid,'-')), 'v1')"]
         result = deny
       }
       # if all checks are exhausted without an 'allow' or a 'deny' result, continue to the next authentication

+ 7 - 1
apps/emqx_auth_cinfo/src/emqx_authn_cinfo.erl

@@ -67,9 +67,15 @@ authenticate(#{auth_method := _}, _) ->
     %% enhanced authentication is not supported by this provider
     ignore;
 authenticate(Credential0, #{checks := Checks}) ->
-    Credential = add_credential_aliases(Credential0),
+    Credential1 = add_credential_aliases(Credential0),
+    Credential = peerhost_as_string(Credential1),
     check(Checks, Credential).
 
+peerhost_as_string(#{peerhost := Peerhost} = Credential) when is_tuple(Peerhost) ->
+    Credential#{peerhost => iolist_to_binary(inet:ntoa(Peerhost))};
+peerhost_as_string(Credential) ->
+    Credential.
+
 check([], _) ->
     ignore;
 check([Check | Rest], Credential) ->

+ 37 - 2
apps/emqx_auth_cinfo/test/emqx_authn_cinfo_SUITE.erl

@@ -42,7 +42,7 @@ t_username_equal_clientid(_) ->
     Checks =
         [
             #{
-                is_match => <<"str_eq(username, '')">>,
+                is_match => <<"is_empty_val(username)">>,
                 result => deny
             },
             #{
@@ -105,7 +105,7 @@ t_multiple_is_match_expressions(_) ->
             %% use AND to connect multiple is_match expressions
             %% this one means username is not empty, and clientid is 'super'
             is_match => [
-                <<"str_neq('', username)">>, <<"str_eq(clientid, 'super')">>
+                <<"not(is_empty_val(username))">>, <<"str_eq(clientid, 'super')">>
             ],
             result => allow
         }
@@ -153,6 +153,41 @@ t_cert_fields_as_alias(_) ->
         end
     ).
 
+t_peerhost_matches_username(_) ->
+    Checks = [
+        #{
+            is_match => [
+                <<"str_eq(peerhost, username)">>
+            ],
+            result => allow
+        },
+        #{
+            is_match => <<"true">>,
+            result => deny
+        }
+    ],
+    IPStr1 = "127.0.0.1",
+    IPStr2 = "::1",
+    {ok, IPTuple1} = inet:parse_address(IPStr1, inet),
+    {ok, IPTuple2} = inet:parse_address(IPStr2, inet6),
+    with_checks(
+        Checks,
+        fun(State) ->
+            ?assertMatch(
+                {ok, #{}},
+                emqx_authn_cinfo:authenticate(
+                    #{username => list_to_binary(IPStr1), peerhost => IPTuple1}, State
+                )
+            ),
+            ?assertMatch(
+                {ok, #{}},
+                emqx_authn_cinfo:authenticate(
+                    #{username => list_to_binary(IPStr2), peerhost => IPTuple2}, State
+                )
+            )
+        end
+    ).
+
 config(Checks) ->
     #{
         mechanism => cinfo,

+ 28 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -75,7 +75,9 @@
 
 -define(CONNECTOR_TYPE, iotdb).
 -define(IOTDB_PING_PATH, <<"ping">>).
--define(DEFAULT_THRIFT_TIMEOUT, timer:seconds(10)).
+
+%% Equivalent to timer:seconds(10).
+-define(DEFAULT_THRIFT_TIMEOUT, 10000).
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
@@ -336,9 +338,10 @@ on_start(
 
     DriverOpts = maps:merge(
         #{
-            connect_timeout => ?DEFAULT_THRIFT_TIMEOUT, recv_timeout => ?DEFAULT_THRIFT_TIMEOUT
+            connect_timeout => ?DEFAULT_THRIFT_TIMEOUT,
+            recv_timeout => ?DEFAULT_THRIFT_TIMEOUT
         },
-        maps:with([connect_timeout, recv_timeout], Config)
+        normalize_thrift_timeout(maps:with([connect_timeout, recv_timeout], Config))
     ),
 
     DriverOpts1 =
@@ -1103,3 +1106,25 @@ do_on_query(InstanceId, ChannelId, Data, #{driver := restapi} = State) ->
     emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
 do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
     ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).
+
+%% 1. The default timeout in Thrift is `infinity`, but it may cause the client to get stuck
+%% 2. The schema of `timeout` accepts a zero value, but the Thrift driver does not
+%% 3. If the timeout is too small, the driver may not work properly
+normalize_thrift_timeout(Timeouts) ->
+    maps:map(
+        fun
+            (_K, V) when V >= ?DEFAULT_THRIFT_TIMEOUT ->
+                V;
+            (K, V) ->
+                ?SLOG(warning, #{
+                    msg => "iotdb_thrift_timeout_normalized",
+                    reason => "The timeout is too small for the Thrift driver to work",
+                    timeout => K,
+                    from => V,
+                    to => ?DEFAULT_THRIFT_TIMEOUT,
+                    unit => millisecond
+                }),
+                ?DEFAULT_THRIFT_TIMEOUT
+        end,
+        Timeouts
+    ).

+ 1 - 1
apps/emqx_bridge_pulsar/mix.exs

@@ -25,7 +25,7 @@ defmodule EMQXBridgePulsar.MixProject do
     [
       UMP.common_dep(:crc32cer),
       UMP.common_dep(:snappyer),
-      {:pulsar, github: "emqx/pulsar-client-erl", tag: "0.8.4"},
+      {:pulsar, github: "emqx/pulsar-client-erl", tag: "0.8.5"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 1 - 1
apps/emqx_bridge_pulsar/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.4"}}},
+    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.5"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 2 - 0
apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl

@@ -15,6 +15,7 @@
 -include_lib("emqx/include/emqx_trace.hrl").
 -include("emqx_bridge_snowflake.hrl").
 -include_lib("emqx_connector_aggregator/include/emqx_connector_aggregator.hrl").
+-include_lib("emqx_connector_jwt/include/emqx_connector_jwt_tables.hrl").
 
 -elvis([{elvis_style, macro_module_names, disable}]).
 
@@ -798,6 +799,7 @@ destroy_action(ActionResId, ActionState) ->
             ok
     end,
     ok = ehttpc_sup:stop_pool(ActionResId),
+    ok = emqx_connector_jwt:delete_jwt(?JWT_TABLE, ActionResId),
     ok.
 
 run_aggregated_action(Batch, #{aggreg_id := AggregId}) ->

+ 1 - 1
apps/emqx_connector/mix.exs

@@ -36,7 +36,7 @@ defmodule EMQXConnector.MixProject do
       {:emqx_connector_jwt, in_umbrella: true},
       UMP.common_dep(:jose),
       UMP.common_dep(:ecpool),
-      {:eredis_cluster, github: "emqx/eredis_cluster", tag: "0.8.5"},
+      {:eredis_cluster, github: "emqx/eredis_cluster", tag: "0.8.6"},
       UMP.common_dep(:ehttpc),
       UMP.common_dep(:emqtt),
     ]

+ 2 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -202,8 +202,8 @@ fields(limit) ->
     [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}];
 fields(cursor) ->
     Desc = <<"Opaque value representing the current iteration state.">>,
-    Meta = #{default => none, in => query, desc => Desc},
-    [{cursor, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
+    Meta = #{required => false, in => query, desc => Desc},
+    [{cursor, hoconsc:mk(binary(), Meta)}];
 fields(cursor_response) ->
     Desc = <<"Opaque value representing the current iteration state.">>,
     Meta = #{desc => Desc, required => false},

+ 88 - 29
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -10,6 +10,11 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-define(DESC_BAD_REQUEST, <<"Erroneous request">>).
+-define(RESP_BAD_REQUEST(MSG),
+    {400, #{code => <<"BAD_REQUEST">>, message => MSG}}
+).
+
 -define(DESC_NOT_FOUND, <<"Queue not found">>).
 -define(RESP_NOT_FOUND,
     {404, #{code => <<"NOT_FOUND">>, message => ?DESC_NOT_FOUND}}
@@ -71,11 +76,13 @@ schema("/durable_queues") ->
             tags => ?TAGS,
             summary => <<"List declared durable queues">>,
             description => ?DESC("durable_queues_get"),
+            parameters => [
+                hoconsc:ref(emqx_dashboard_swagger, cursor),
+                hoconsc:ref(emqx_dashboard_swagger, limit)
+            ],
             responses => #{
-                200 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queues_get(),
-                    durable_queues_get_example()
-                )
+                200 => resp_list_durable_queues(),
+                400 => error_codes(['BAD_REQUEST'], ?DESC_BAD_REQUEST)
             }
         },
         post => #{
@@ -84,10 +91,7 @@ schema("/durable_queues") ->
             description => ?DESC("durable_queues_post"),
             'requestBody' => durable_queue_post(),
             responses => #{
-                201 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queue_get(),
-                    durable_queue_get_example()
-                ),
+                201 => resp_create_durable_queue(),
                 409 => error_codes(['CONFLICT'], ?DESC_CREATE_CONFICT)
             }
         }
@@ -121,8 +125,23 @@ schema("/durable_queues/:id") ->
         }
     }.
 
-'/durable_queues'(get, _Params) ->
-    {200, queue_list()};
+'/durable_queues'(get, #{query_string := QString}) ->
+    Cursor = maps:get(<<"cursor">>, QString, undefined),
+    Limit = maps:get(<<"limit">>, QString),
+    try queue_list(Cursor, Limit) of
+        {Queues, CursorNext} ->
+            Data = [encode_props(ID, Props) || {ID, Props} <- Queues],
+            case CursorNext of
+                undefined ->
+                    Meta = #{hasnext => false};
+                _Cursor ->
+                    Meta = #{hasnext => true, cursor => CursorNext}
+            end,
+            {200, #{data => Data, meta => Meta}}
+    catch
+        throw:Error ->
+            ?RESP_BAD_REQUEST(emqx_utils:readable_error_msg(Error))
+    end;
 '/durable_queues'(post, #{body := Params}) ->
     case queue_declare(Params) of
         {ok, Queue} ->
@@ -135,7 +154,7 @@ schema("/durable_queues/:id") ->
 
 '/durable_queues/:id'(get, Params) ->
     case queue_get(Params) of
-        Queue when Queue =/= false ->
+        {ok, Queue} ->
             {200, encode_queue(Queue)};
         false ->
             ?RESP_NOT_FOUND
@@ -152,9 +171,8 @@ schema("/durable_queues/:id") ->
             ?RESP_INTERNAL_ERROR(emqx_utils:readable_error_msg(Reason))
     end.
 
-queue_list() ->
-    %% TODO
-    [].
+queue_list(Cursor, Limit) ->
+    emqx_ds_shared_sub_queue:list(Cursor, Limit).
 
 queue_get(#{bindings := #{id := ID}}) ->
     emqx_ds_shared_sub_queue:lookup(ID).
@@ -175,6 +193,9 @@ encode_queue(Queue) ->
         emqx_ds_shared_sub_queue:properties(Queue)
     ).
 
+encode_props(ID, Props) ->
+    maps:merge(#{id => ID}, Props).
+
 %%--------------------------------------------------------------------
 %% Schemas
 %%--------------------------------------------------------------------
@@ -190,26 +211,50 @@ param_queue_id() ->
         })
     }.
 
+resp_list_durable_queues() ->
+    emqx_dashboard_swagger:schema_with_example(
+        ref(resp_list_durable_queues),
+        durable_queues_list_example()
+    ).
+
+resp_create_durable_queue() ->
+    emqx_dashboard_swagger:schema_with_example(
+        durable_queue_post(),
+        durable_queue_get_example()
+    ).
+
 validate_queue_id(Id) ->
     case emqx_topic:words(Id) of
         [Segment] when is_binary(Segment) -> true;
         _ -> {error, <<"Invalid queue id">>}
     end.
 
-durable_queues_get() ->
-    hoconsc:array(ref(durable_queue_get)).
-
 durable_queue_get() ->
-    ref(durable_queue_get).
+    ref(durable_queue).
 
 durable_queue_post() ->
     map().
 
 roots() -> [].
 
-fields(durable_queue_get) ->
+fields(durable_queue) ->
     [
-        {id, mk(binary(), #{})}
+        {id,
+            mk(binary(), #{
+                desc => <<"Identifier assigned at creation time">>
+            })},
+        {created_at,
+            mk(emqx_utils_calendar:epoch_millisecond(), #{
+                desc => <<"Queue creation time">>
+            })},
+        {group, mk(binary(), #{})},
+        {topic, mk(binary(), #{})},
+        {start_time, mk(emqx_utils_calendar:epoch_millisecond(), #{})}
+    ];
+fields(resp_list_durable_queues) ->
+    [
+        {data, hoconsc:mk(hoconsc:array(durable_queue_get()), #{})},
+        {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta_with_cursor), #{})}
     ].
 
 %%--------------------------------------------------------------------
@@ -218,15 +263,29 @@ fields(durable_queue_get) ->
 
 durable_queue_get_example() ->
     #{
-        id => <<"queue1">>
+        id => <<"mygrp:1234EF">>,
+        created_at => <<"2024-01-01T12:34:56.789+02:00">>,
+        group => <<"mygrp">>,
+        topic => <<"t/devices/#">>,
+        start_time => <<"2024-01-01T00:00:00.000+02:00">>
     }.
 
-durable_queues_get_example() ->
-    [
-        #{
-            id => <<"queue1">>
-        },
-        #{
-            id => <<"queue2">>
+durable_queues_list_example() ->
+    #{
+        data => [
+            durable_queue_get_example(),
+            #{
+                id => <<"mygrp:567890AABBCC">>,
+                created_at => <<"2024-02-02T22:33:44.000+02:00">>,
+                group => <<"mygrp">>,
+                topic => <<"t/devices/#">>,
+                start_time => <<"1970-01-01T02:00:00+02:00">>
+            }
+        ],
+        %% TODO: Probably needs to be defined in `emqx_dashboard_swagger`.
+        meta => #{
+            <<"count">> => 2,
+            <<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>,
+            <<"hasnext">> => true
         }
-    ].
+    }.

+ 2 - 6
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl

@@ -260,9 +260,7 @@ handle_leader_update_streams(
         id => Id,
         version_old => VersionOld,
         version_new => VersionNew,
-        stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses(
-            StreamProgresses
-        )
+        stream_progresses => StreamProgresses
     }),
     {AddEvents, Streams1} = lists:foldl(
         fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
@@ -299,9 +297,7 @@ handle_leader_update_streams(
     StreamLeaseEvents = AddEvents ++ RevokeEvents,
     ?tp(debug, shared_sub_group_sm_leader_update_streams, #{
         id => Id,
-        stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
-            StreamLeaseEvents
-        )
+        stream_lease_events => StreamLeaseEvents
     }),
     transition(
         GSM,

+ 2 - 3
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -93,7 +93,6 @@
 
 become(ShareTopicFilter, Claim) ->
     Data0 = init_data(ShareTopicFilter),
-    Data0 =/= false orelse exit(shared_subscription_not_declared),
     Data1 = attach_claim(Claim, Data0),
     gen_statem:enter_loop(?MODULE, [], ?leader_active, Data1, init_claim_renewal(Data1)).
 
@@ -110,7 +109,7 @@ init(_Args) ->
 init_data(#share{topic = Topic} = ShareTopicFilter) ->
     StoreID = emqx_ds_shared_sub_store:mk_id(ShareTopicFilter),
     case emqx_ds_shared_sub_store:open(StoreID) of
-        Store when Store =/= false ->
+        {ok, Store} ->
             ?tp(debug, dssub_store_open, #{topic => ShareTopicFilter, store => Store}),
             #{
                 group_id => ShareTopicFilter,
@@ -122,7 +121,7 @@ init_data(#share{topic = Topic} = ShareTopicFilter) ->
         false ->
             %% NOTE: No leader store -> no subscription
             ?tp(warning, dssub_store_notfound, #{topic => ShareTopicFilter}),
-            false
+            exit(shared_subscription_not_declared)
     end.
 
 attach_claim(Claim, Data) ->

+ 0 - 4
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl

@@ -40,15 +40,11 @@ format_leader_msg(Msg) ->
 
 format_agent_msg_value(agent_msg_type, Type) ->
     agent_msg_type(Type);
-format_agent_msg_value(agent_msg_stream_states, StreamStates) ->
-    emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates);
 format_agent_msg_value(_, Value) ->
     Value.
 
 format_leader_msg_value(leader_msg_type, Type) ->
     leader_msg_type(Type);
-format_leader_msg_value(leader_msg_streams, Streams) ->
-    emqx_persistent_session_ds_shared_subs:format_lease_events(Streams);
 format_leader_msg_value(_, Value) ->
     Value.
 

+ 43 - 5
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl

@@ -10,7 +10,8 @@
     exists/2,
     declare/4,
     destroy/1,
-    destroy/2
+    destroy/2,
+    list/2
 ]).
 
 -export([
@@ -60,9 +61,7 @@ destroy(Group, Topic) ->
 destroy(ID) ->
     %% TODO: There's an obvious lack of transactionality.
     case lookup(ID) of
-        false ->
-            not_found;
-        Queue ->
+        {ok, Queue} ->
             #{topic := Topic} = properties(Queue),
             case emqx_ds_shared_sub_store:destroy(Queue) of
                 ok ->
@@ -70,9 +69,19 @@ destroy(ID) ->
                     ok;
                 Error ->
                     Error
-            end
+            end;
+        false ->
+            not_found
     end.
 
+list(undefined, Limit) ->
+    list(select_properties(), Limit);
+list(Cursor, Limit) when is_binary(Cursor) ->
+    list(select_properties(Cursor), Limit);
+list(Select, Limit) ->
+    {Records, SelectNext} = emqx_ds_shared_sub_store:select_next(Select, Limit),
+    {Records, preserve_cursor(SelectNext)}.
+
 ensure_route(Topic, QueueID) ->
     _ = emqx_persistent_session_ds_router:do_add_route(Topic, QueueID),
     _ = emqx_external_broker:add_persistent_route(Topic, QueueID),
@@ -89,6 +98,35 @@ ensure_delete_route(Topic, QueueID) ->
 
 %%
 
+select_properties() ->
+    emqx_ds_shared_sub_store:select(properties).
+
+select_properties(Cursor) ->
+    try
+        emqx_ds_shared_sub_store:select(properties, decode_cursor(Cursor))
+    catch
+        error:_ ->
+            throw("Invalid cursor")
+    end.
+
+preserve_cursor(end_of_iterator) ->
+    undefined;
+preserve_cursor(Select) ->
+    case emqx_ds_shared_sub_store:select_next(Select, 1) of
+        {[], end_of_iterator} ->
+            undefined;
+        {[_], _} ->
+            encode_cursor(emqx_ds_shared_sub_store:select_preserve(Select))
+    end.
+
+encode_cursor(Cursor) ->
+    emqx_base62:encode(term_to_binary(Cursor)).
+
+decode_cursor(Cursor) ->
+    binary_to_term(emqx_base62:decode(Cursor), [safe]).
+
+%%
+
 id(Queue) ->
     emqx_ds_shared_sub_store:id(Queue).
 

+ 7 - 4
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl

@@ -50,10 +50,13 @@ injected_fields() ->
     #{
         'durable_storage' => [
             {queues,
-                emqx_ds_schema:storage_schema(#{
-                    importance => ?IMPORTANCE_HIDDEN,
-                    desc => ?DESC(durable_queues_storage)
-                })}
+                emqx_ds_schema:db_schema(
+                    basic,
+                    #{
+                        importance => ?IMPORTANCE_HIDDEN,
+                        desc => ?DESC(durable_queues_storage)
+                    }
+                )}
         ]
     }.
 

+ 147 - 70
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl

@@ -49,6 +49,13 @@
     delete/3
 ]).
 
+-export([
+    select/1,
+    select/2,
+    select_next/2,
+    select_preserve/1
+]).
+
 %% Messing with IDs
 -export([
     mk_id/1,
@@ -110,25 +117,20 @@ tune_db_config(Config0 = #{backend := Backend}) ->
     },
     case Backend of
         B when B == builtin_raft; B == builtin_local ->
-            tune_db_storage_layout(Config);
+            Storage =
+                {emqx_ds_storage_bitfield_lts, #{
+                    %% Should be enough, topic structure is pretty simple.
+                    topic_index_bytes => 4,
+                    bits_per_wildcard_level => 64,
+                    %% Enables single-epoch storage.
+                    epoch_bits => 64,
+                    lts_threshold_spec => {simple, {inf, 0, inf, 0}}
+                }},
+            Config#{storage => Storage};
         _ ->
             Config
     end.
 
-tune_db_storage_layout(Config = #{storage := {Layout, Opts0}}) when
-    Layout == emqx_ds_storage_skipstream_lts;
-    Layout == emqx_ds_storage_bitfield_lts
-->
-    Opts = Opts0#{
-        %% Since these layouts impose somewhat strict requirements on message
-        %% timestamp uniqueness, we need to additionally ensure that LTS always
-        %% keeps different groups under separate indices.
-        lts_threshold_spec => {simple, {inf, inf, inf, 0}}
-    },
-    Config#{storage := {Layout, Opts}};
-tune_db_storage_layout(Config = #{storage := _}) ->
-    Config.
-
 %%
 
 -spec mk_id(emqx_types:share()) -> id().
@@ -286,12 +288,7 @@ mk_leader_topic(ID) ->
     %% General.
     id := id(),
     %% Spaces and variables: most up-to-date in-memory state.
-    properties := #{
-        %% TODO: Efficient encoding.
-        topic => emqx_types:topic(),
-        start_time => emqx_message:timestamp(),
-        created_at => emqx_message:timestamp()
-    },
+    properties := properties(),
     stream := #{emqx_ds:stream() => stream_state()},
     rank_progress => _RankProgress,
     %% Internal _sequence numbers_ that tracks every change.
@@ -304,6 +301,14 @@ mk_leader_topic(ID) ->
     stage := #{space_key() | var_name() => _Value}
 }.
 
+-type properties() :: #{
+    %% TODO: Efficient encoding.
+    group => emqx_types:group(),
+    topic => emqx_types:topic(),
+    start_time => emqx_message:timestamp(),
+    created_at => emqx_message:timestamp()
+}.
+
 -type stream_state() :: #{
     progress => emqx_persistent_session_ds_shared_subs:progress(),
     rank => emqx_ds:stream_rank()
@@ -314,7 +319,7 @@ init(ID) ->
     %% NOTE: Empty store is implicitly dirty because rootset needs to be persisted.
     mk_store(ID).
 
--spec open(id()) -> t() | false.
+-spec open(id()) -> {ok, t()} | false | emqx_ds:error(_).
 open(ID) ->
     case open_rootset(ID) of
         Rootset = #{} ->
@@ -354,7 +359,9 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
     TopicFilter = mk_store_wildcard(ID),
     StreamIts1 = ds_refresh_streams(TopicFilter, _StartTime = 0, StreamIts0),
     {StreamIts, Store} = ds_streams_fold(
-        fun(Message, StoreAcc) -> open_message(Message, StoreAcc) end,
+        fun(Message, StoreAcc) ->
+            lists:foldl(fun slurp_record/2, StoreAcc, open_message(Message))
+        end,
         Acc,
         StreamIts1
     ),
@@ -364,7 +371,7 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
         %% concerning, because this suggests there were concurrent writes that slipped
         %% past the leadership claim guards, yet we can still make progress.
         SeqNum when SeqNum >= map_get(seqnum, Rootset) ->
-            reset_dirty(maps:merge(Store, Rootset));
+            {ok, reset_dirty(maps:merge(Store, Rootset))};
         _Mismatch when Retries > 0 ->
             ok = timer:sleep(RetryTimeout),
             slurp_store(Rootset, StreamIts, Retries - 1, RetryTimeout, Store);
@@ -372,6 +379,9 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
             {error, unrecoverable, {leader_store_inconsistent, Store, Rootset}}
     end.
 
+slurp_record({_ID, Record, ChangeSeqNum}, Store = #{seqnum := SeqNum}) ->
+    open_record(Record, Store#{seqnum := max(ChangeSeqNum, SeqNum)}).
+
 -spec get(space_name(), _ID, t()) -> _Value.
 get(SpaceName, ID, Store) ->
     Space = maps:get(SpaceName, Store),
@@ -614,6 +624,21 @@ mk_store_root_matcher(#{id := ID, committed := Committed}) ->
         timestamp = 0
     }.
 
+mk_read_root_batch(ID) ->
+    %% NOTE
+    %% Construct batch that essentially does nothing but reads rootset in a consistent
+    %% manner.
+    Matcher = #message_matcher{
+        from = ID,
+        topic = mk_store_root_topic(ID),
+        payload = '_',
+        timestamp = 0
+    },
+    #dsbatch{
+        preconditions = [{unless_exists, Matcher}],
+        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
+    }.
+
 mk_store_operation(ID, SK, ?STORE_TOMBSTONE, SeqMap) ->
     {delete, #message_matcher{
         from = ID,
@@ -644,37 +669,27 @@ open_root_message(#message{payload = Payload, timestamp = 0}) ->
     #{} = binary_to_term(Payload).
 
 open_message(
-    Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum, headers = Headers}, Store
+    Msg = #message{topic = Topic, payload = Payload, timestamp = SeqNum, headers = Headers}
 ) ->
-    Entry =
-        try
-            ChangeSeqNum = maps:get(?STORE_HEADER_CHANGESEQNUM, Headers),
-            case emqx_topic:tokens(Topic) of
-                [_Prefix, _ID, SpaceTok, _SeqTok] ->
-                    SpaceName = token_to_space(SpaceTok),
-                    ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
-                    %% TODO: Records.
-                    Record = {SpaceName, ID, Value, SeqNum};
-                [_Prefix, _ID, VarTok] ->
-                    VarName = token_to_varname(VarTok),
-                    Value = binary_to_term(Payload),
-                    Record = {VarName, Value}
-            end,
-            {ChangeSeqNum, Record}
-        catch
-            error:_ ->
-                ?tp(warning, "dssub_leader_store_unrecognized_message", #{
-                    store => id(Store),
-                    message => Msg
-                }),
-                unrecognized
+    try
+        case emqx_topic:tokens(Topic) of
+            [_Prefix, SpaceID, SpaceTok, _SeqTok] ->
+                SpaceName = token_to_space(SpaceTok),
+                ?STORE_PAYLOAD(ID, Value) = binary_to_term(Payload),
+                %% TODO: Records.
+                Record = {SpaceName, ID, Value, SeqNum};
+            [_Prefix, SpaceID, VarTok] ->
+                VarName = token_to_varname(VarTok),
+                Value = binary_to_term(Payload),
+                Record = {VarName, Value}
         end,
-    open_entry(Entry, Store).
-
-open_entry({ChangeSeqNum, Record}, Store = #{seqnum := SeqNum}) ->
-    open_record(Record, Store#{seqnum := max(ChangeSeqNum, SeqNum)});
-open_entry(unrecognized, Store) ->
-    Store.
+        ChangeSeqNum = maps:get(?STORE_HEADER_CHANGESEQNUM, Headers),
+        [{SpaceID, Record, ChangeSeqNum}]
+    catch
+        error:_ ->
+            ?tp(warning, "dssub_leader_store_unrecognized_message", #{message => Msg}),
+            []
+    end.
 
 open_record({SpaceName, ID, Value, SeqNum}, Store = #{seqmap := SeqMap}) ->
     Space0 = maps:get(SpaceName, Store),
@@ -692,6 +707,77 @@ mk_store_payload(?STORE_SK(_SpaceName, ID), Value) ->
 mk_store_payload(_VarName, Value) ->
     Value.
 
+%%
+
+-record(select, {tf, start, it, streams}).
+
+-type select() :: #select{}.
+
+-spec select(var_name()) -> select().
+select(VarName) ->
+    select_jump(select_new(VarName)).
+
+-spec select(var_name(), _Cursor) -> select().
+select(VarName, Cursor) ->
+    select_restore(Cursor, select_new(VarName)).
+
+select_new(VarName) ->
+    TopicFilter = mk_store_varname_wildcard(VarName),
+    StartTime = 0,
+    RankedStreams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
+    Streams = [S || {_Rank, S} <- lists:sort(RankedStreams)],
+    #select{tf = TopicFilter, start = StartTime, streams = Streams}.
+
+select_jump(It = #select{tf = TopicFilter, start = StartTime, streams = [Stream | Rest]}) ->
+    DSIt = ds_make_iterator(Stream, TopicFilter, StartTime),
+    It#select{it = DSIt, streams = Rest};
+select_jump(It = #select{streams = []}) ->
+    It#select{it = undefined}.
+
+-spec select_next(select(), _N) -> {[{id(), _Var}], select() | end_of_iterator}.
+select_next(ItSelect, N) ->
+    select_fold(
+        ItSelect,
+        N,
+        fun(Message, Acc) ->
+            [{ID, Var} || {ID, {_VarName, Var}, _} <- open_message(Message)] ++ Acc
+        end,
+        []
+    ).
+
+select_fold(#select{it = undefined}, _, _Fun, Acc) ->
+    {Acc, end_of_iterator};
+select_fold(It = #select{it = DSIt0}, N, Fun, Acc0) ->
+    case emqx_ds:next(?DS_DB, DSIt0, N) of
+        {ok, DSIt, Messages} ->
+            Acc = lists:foldl(fun({_Key, Msg}, Acc) -> Fun(Msg, Acc) end, Acc0, Messages),
+            case length(Messages) of
+                N ->
+                    {Acc, It#select{it = DSIt}};
+                NLess when NLess < N ->
+                    select_fold(select_jump(It), N - NLess, Fun, Acc)
+            end;
+        {ok, end_of_stream} ->
+            select_fold(select_jump(It), N, Fun, Acc0)
+    end.
+
+-spec select_preserve(select()) -> _Cursor.
+select_preserve(#select{it = It, streams = Streams}) ->
+    case Streams of
+        [StreamNext | _Rest] ->
+            %% Preserve only the subsequent stream.
+            [It | StreamNext];
+        [] ->
+            %% Iterating over last stream, preserve only iterator.
+            [It]
+    end.
+
+select_restore([It], Select) ->
+    Select#select{it = It, streams = []};
+select_restore([It | StreamNext], Select = #select{streams = Streams}) ->
+    StreamsRest = lists:dropwhile(fun(S) -> S =/= StreamNext end, Streams),
+    Select#select{it = It, streams = StreamsRest}.
+
 mk_store_root_topic(ID) ->
     emqx_topic:join([?STORE_TOPIC_PREFIX, ID]).
 
@@ -705,20 +791,8 @@ mk_store_topic(ID, VarName, _SeqMap) ->
 mk_store_wildcard(ID) ->
     [?STORE_TOPIC_PREFIX, ID, '+', '#'].
 
-mk_read_root_batch(ID) ->
-    %% NOTE
-    %% Construct batch that essentially does nothing but reads rootset in a consistent
-    %% manner.
-    Matcher = #message_matcher{
-        from = ID,
-        topic = mk_store_root_topic(ID),
-        payload = '_',
-        timestamp = 0
-    },
-    #dsbatch{
-        preconditions = [{unless_exists, Matcher}],
-        operations = [{delete, Matcher#message_matcher{payload = <<>>}}]
-    }.
+mk_store_varname_wildcard(VarName) ->
+    [?STORE_TOPIC_PREFIX, '+', varname_to_token(VarName)].
 
 ds_refresh_streams(TopicFilter, StartTime, StreamIts) ->
     Streams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
@@ -728,15 +802,18 @@ ds_refresh_streams(TopicFilter, StartTime, StreamIts) ->
                 #{Stream := _It} ->
                     Acc;
                 #{} ->
-                    %% TODO: Gracefully handle `emqx_ds:error(_)`?
-                    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
-                    Acc#{Stream => It}
+                    Acc#{Stream => ds_make_iterator(Stream, TopicFilter, StartTime)}
             end
         end,
         StreamIts,
         Streams
     ).
 
+ds_make_iterator(Stream, TopicFilter, StartTime) ->
+    %% TODO: Gracefully handle `emqx_ds:error(_)`?
+    {ok, It} = emqx_ds:make_iterator(?DS_DB, Stream, TopicFilter, StartTime),
+    It.
+
 ds_streams_fold(Fun, AccIn, StreamItsIn) ->
     maps:fold(
         fun(Stream, It0, {StreamIts, Acc0}) ->

+ 76 - 0
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -425,6 +425,82 @@ t_intensive_reassign(_Config) ->
     ok = emqtt:disconnect(ConnShared3),
     ok = emqtt:disconnect(ConnPub).
 
+t_multiple_groups(groups, _Groups) ->
+    [declare_explicit];
+t_multiple_groups('init', Config) ->
+    Now = emqx_message:timestamp_now(),
+    NQueues = 50,
+    Group = <<"multi">>,
+    Topics = [emqx_utils:format("t/mg/~p", [I]) || I <- lists:seq(1, NQueues)],
+    Queues = lists:map(
+        fun(Topic) ->
+            {ok, Queue} = emqx_ds_shared_sub_queue:declare(Group, wildcard(Topic), Now, 0),
+            Queue
+        end,
+        Topics
+    ),
+    [
+        {queue_group, Group},
+        {queue_topics, Topics},
+        {queues, Queues}
+        | Config
+    ];
+t_multiple_groups('end', Config) ->
+    Topics = proplists:get_value(queue_topics, Config),
+    lists:foreach(
+        fun(Topic) -> emqx_ds_shared_sub_queue:destroy(<<"multi">>, Topic) end,
+        Topics
+    ).
+
+t_multiple_groups(Config) ->
+    Topics = proplists:get_value(queue_topics, Config),
+    NSubs = 20,
+    NPubs = 1000,
+    NQueues = length(Topics),
+    ConnPub = emqtt_connect_pub(<<"t_multiple_groups:pub">>),
+    ConnSubs = lists:map(
+        fun(I) ->
+            ClientId = emqx_utils:format("t_multiple_groups:sub:~p", [I]),
+            ConnSub = emqtt_connect_sub(ClientId),
+            ok = lists:foreach(
+                fun(Ti) ->
+                    Topic = lists:nth(Ti, Topics),
+                    TopicSub = emqx_topic:join([<<"$share/multi">>, wildcard(Topic)]),
+                    {ok, _, [1]} = emqtt:subscribe(ConnSub, TopicSub, 1)
+                end,
+                lists:seq(I, NQueues, NSubs)
+            ),
+            ConnSub
+        end,
+        lists:seq(1, NSubs)
+    ),
+
+    Payloads = lists:map(
+        fun(Pi) ->
+            Qi = pick_queue(Pi, NQueues),
+            Payload = integer_to_binary(Pi),
+            TopicPub = emqx_topic:join([lists:nth(Qi, Topics), integer_to_binary(Pi)]),
+            {ok, _} = emqtt:publish(ConnPub, TopicPub, Payload, 1),
+            Payload
+        end,
+        lists:seq(1, NPubs)
+    ),
+
+    Pubs = drain_publishes(),
+    ?assertMatch(
+        {[_ | _], []},
+        lists:partition(fun(#{payload := P}) -> lists:member(P, Payloads) end, Pubs)
+    ),
+
+    lists:foreach(fun emqtt:disconnect/1, [ConnPub | ConnSubs]).
+
+pick_queue(I, NQueues) ->
+    %% NOTE: Allocate publishes to queues unevenly, but every queue is utilized.
+    round(math:sqrt(NQueues) * math:log2(I)) rem NQueues + 1.
+
+wildcard(Topic) ->
+    emqx_topic:join([Topic, '#']).
+
 t_unsubscribe('init', Config) ->
     declare_queue_if_needed(<<"gr9">>, <<"topic9/#">>, Config);
 t_unsubscribe('end', Config) ->

+ 118 - 29
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -10,14 +10,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--import(
-    emqx_mgmt_api_test_util,
-    [
-        request_api/2,
-        request/3,
-        uri/1
-    ]
-).
+-import(emqx_mgmt_api_test_util, [uri/1]).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -33,10 +26,12 @@ init_per_suite(Config) ->
                     },
                     <<"durable_storage">> => #{
                         <<"messages">> => #{
-                            <<"backend">> => <<"builtin_raft">>
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"n_shards">> => 4
                         },
                         <<"queues">> => #{
-                            <<"backend">> => <<"builtin_raft">>
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"n_shards">> => 4
                         }
                     }
                 }
@@ -67,6 +62,7 @@ init_per_testcase(_TC, Config) ->
 end_per_testcase(_TC, _Config) ->
     ok = snabbkaffe:stop(),
     ok = emqx_ds_shared_sub_registry:purge(),
+    ok = destroy_queues(),
     ok.
 %%--------------------------------------------------------------------
 %% Tests
@@ -74,7 +70,7 @@ end_per_testcase(_TC, _Config) ->
 
 t_basic_crud(_Config) ->
     ?assertMatch(
-        {ok, []},
+        {ok, #{<<"data">> := []}},
         api_get(["durable_queues"])
     ),
 
@@ -124,11 +120,11 @@ t_basic_crud(_Config) ->
         Resp2
     ),
 
-    %% TODO
-    %% ?assertMatch(
-    %%     {ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]},
-    %%     api_get(["durable_queues"])
-    %% ),
+    {ok, 201, #{<<"id">> := QueueID2}} = Resp2,
+    ?assertMatch(
+        {ok, #{<<"data">> := [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}]}},
+        api_get(["durable_queues"])
+    ),
 
     ?assertMatch(
         {ok, 200, <<"Queue deleted">>},
@@ -139,13 +135,90 @@ t_basic_crud(_Config) ->
         api(delete, ["durable_queues", QueueID1], #{})
     ),
 
-    %% TODO
-    %% ?assertMatch(
-    %%     {ok, [#{<<"id">> := <<"q1">>}]},
-    %%     api_get(["durable_queues"])
-    %% ).
+    ?assertMatch(
+        {ok, #{<<"data">> := [#{<<"id">> := QueueID2}]}},
+        api_get(["durable_queues"])
+    ).
 
-    ok.
+t_list_queues(_Config) ->
+    {ok, 201, #{<<"id">> := QID1}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq1">>,
+        <<"topic">> => <<"#">>,
+        <<"start_time">> => 42
+    }),
+    {ok, 201, #{<<"id">> := QID2}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq2">>,
+        <<"topic">> => <<"specific/topic">>,
+        <<"start_time">> => 0
+    }),
+    {ok, 201, #{<<"id">> := QID3}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq3">>,
+        <<"topic">> => <<"1/2/3/+">>,
+        <<"start_time">> => emqx_message:timestamp_now()
+    }),
+    {ok, 201, #{<<"id">> := QID4}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq4">>,
+        <<"topic">> => <<"4/5/6/#">>,
+        <<"start_time">> => emqx_message:timestamp_now()
+    }),
+
+    {ok, Resp} = api_get(["durable_queues"]),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}, #{}, #{}, #{}],
+            <<"meta">> := #{<<"hasnext">> := false}
+        },
+        Resp
+    ),
+
+    {ok, Resp1} = api_get(["durable_queues"], #{limit => <<"1">>}),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}],
+            <<"meta">> := #{<<"hasnext">> := true, <<"cursor">> := _}
+        },
+        Resp1
+    ),
+
+    {ok, Resp2} = api_get(["durable_queues"], #{
+        limit => <<"1">>,
+        cursor => emqx_utils_maps:deep_get([<<"meta">>, <<"cursor">>], Resp1)
+    }),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}],
+            <<"meta">> := #{<<"hasnext">> := true, <<"cursor">> := _}
+        },
+        Resp2
+    ),
+
+    {ok, Resp3} = api_get(["durable_queues"], #{
+        limit => <<"2">>,
+        cursor => emqx_utils_maps:deep_get([<<"meta">>, <<"cursor">>], Resp2)
+    }),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}, #{}],
+            <<"meta">> := #{<<"hasnext">> := false}
+        },
+        Resp3
+    ),
+
+    Data = maps:get(<<"data">>, Resp),
+    ?assertEqual(
+        [QID1, QID2, QID3, QID4],
+        lists:sort([ID || #{<<"id">> := ID} <- Data]),
+        Resp
+    ),
+
+    Data1 = maps:get(<<"data">>, Resp1),
+    Data2 = maps:get(<<"data">>, Resp2),
+    Data3 = maps:get(<<"data">>, Resp3),
+    ?assertEqual(
+        [QID1, QID2, QID3, QID4],
+        lists:sort([ID || D <- [Data1, Data2, Data3], #{<<"id">> := ID} <- D]),
+        [Resp1, Resp2, Resp3]
+    ).
 
 t_duplicate_queue(_Config) ->
     ?assertMatch(
@@ -171,20 +244,36 @@ t_duplicate_queue(_Config) ->
         })
     ).
 
+%%--------------------------------------------------------------------
+
+destroy_queues() ->
+    case api_get(["durable_queues"], #{limit => <<"100">>}) of
+        {ok, #{<<"data">> := Queues}} ->
+            lists:foreach(fun destroy_queue/1, Queues);
+        Error ->
+            Error
+    end.
+
+destroy_queue(#{<<"id">> := QueueID}) ->
+    {ok, 200, _Deleted} = api(delete, ["durable_queues", QueueID], #{}).
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
 
 api_get(Path) ->
-    case request_api(get, uri(Path)) of
-        {ok, ResponseBody} ->
-            {ok, jiffy:decode(list_to_binary(ResponseBody), [return_maps])};
-        {error, _} = Error ->
-            Error
-    end.
+    api_response(emqx_mgmt_api_test_util:request_api(get, uri(Path))).
+
+api_get(Path, Query) ->
+    api_response(emqx_mgmt_api_test_util:request_api(get, uri(Path), Query, [])).
+
+api_response({ok, ResponseBody}) ->
+    {ok, jiffy:decode(iolist_to_binary(ResponseBody), [return_maps])};
+api_response({error, _} = Error) ->
+    Error.
 
 api(Method, Path, Data) ->
-    case request(Method, uri(Path), Data) of
+    case emqx_mgmt_api_test_util:request(Method, uri(Path), Data) of
         {ok, Code, ResponseBody} ->
             Res =
                 case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of

+ 25 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -87,6 +87,9 @@
         bits_per_wildcard_level => pos_integer(),
         topic_index_bytes => pos_integer(),
         epoch_bits => non_neg_integer(),
+        %% Which epochs are considered readable, i.e. visible in `next/6`?
+        %% Default: `complete`; however, if there is a single never-ending epoch, it is always readable.
+        epoch_readable => complete | any,
         lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
@@ -97,6 +100,7 @@
         topic_index_bytes := pos_integer(),
         ts_bits := non_neg_integer(),
         ts_offset_bits := non_neg_integer(),
+        epoch_readable => complete | any,
         lts_threshold_spec => emqx_ds_lts:threshold_spec()
     }.
 
@@ -109,6 +113,7 @@
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_offset :: non_neg_integer(),
+    epoch_readable :: complete | any,
     threshold_fun :: emqx_ds_lts:threshold_fun(),
     gvars :: ets:table()
 }).
@@ -204,8 +209,15 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
+    TSBits = 64,
     %% 20 bits -> 1048576 us -> ~1 sec
     TSOffsetBits = maps:get(epoch_bits, Options, 20),
+    case TSBits - TSOffsetBits of
+        %% NOTE: There's a single, always incomplete epoch, consider it readable.
+        0 -> EpochReadableDefault = any;
+        _ -> EpochReadableDefault = complete
+    end,
+    EpochReadable = maps:get(epoch_readable, Options, EpochReadableDefault),
     ThresholdSpec = maps:get(lts_threshold_spec, Options, ?DEFAULT_LTS_THRESHOLD),
     %% Create column families:
     DataCFName = data_cf(GenId),
@@ -224,8 +236,9 @@ create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
         topic_index_bytes => TopicIndexBytes,
-        ts_bits => 64,
+        ts_bits => TSBits,
         ts_offset_bits => TSOffsetBits,
+        epoch_readable => EpochReadable,
         lts_threshold_spec => ThresholdSpec
     },
     {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
@@ -267,6 +280,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_bits = TSBits,
+        epoch_readable = maps:get(epoch_readable, Schema, complete),
         threshold_fun = emqx_ds_lts:threshold_fun(ThresholdSpec),
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
@@ -447,7 +461,7 @@ update_iterator(
 
 next(
     Shard,
-    Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
+    Schema = #s{ts_offset = TSOffset, ts_bits = TSBits, epoch_readable = EpochReadable},
     It = #{?storage_key := Stream},
     BatchSize,
     Now,
@@ -475,6 +489,9 @@ next(
                 %% iterator can jump to the next topic and then it
                 %% won't backtrack.
                 false;
+            _ when EpochReadable == any ->
+                %% Incomplete epochs are explicitly marked as readable:
+                false;
             _ ->
                 %% New batches are only added to the current
                 %% generation. We can ignore cutoff time for old
@@ -600,6 +617,9 @@ lookup_message(
             {error, unrecoverable, {rocksdb, Error}}
     end.
 
+handle_event(_ShardId, #s{epoch_readable = any}, _Time, tick) ->
+    %% No need for idle tracking.
+    [];
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     %% If the last message was published more than one epoch ago, and
     %% the shard remains idle, we need to advance safety cutoff
@@ -959,10 +979,11 @@ deserialize(Blob) ->
 
 %% erlfmt-ignore
 make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
+    TsEpochBits = TSBits - TSOffsetBits,
     Bitsources =
     %% Dimension Offset   Bitsize
-        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
-         {?DIM_TS,        TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
+        [{?DIM_TOPIC,     0,            TopicIndexBytes * ?BYTE_SIZE}] ++   %% Topic index
+        [{?DIM_TS,        TSOffsetBits, TsEpochBits} || TsEpochBits > 0] ++ %% Timestamp epoch
         [{?DIM_TS + I,    0,            BitsPerTopicLevel           }       %% Varying topic levels
                                                            || I <- lists:seq(1, N)] ++
         [{?DIM_TS,        0,            TSOffsetBits                }],     %% Timestamp offset

+ 22 - 10
apps/emqx_management/src/emqx_mgmt_api_banned.erl

@@ -282,16 +282,26 @@ list_banned([{As, '=:=', Who}], [], Params) ->
             data => lists:map(fun format/1, Result)
         }}
     end;
-list_banned([], [_Who], Params) ->
-    {200,
-        emqx_mgmt_api:node_query_with_tabs(
-            node(),
-            emqx_banned:tables(),
-            Params,
-            ?BANNED_QSCHEMA,
-            fun ?MODULE:qs2ms/2,
-            fun ?MODULE:format/1
-        )};
+list_banned([], [{_Type, like, Value}], Params) ->
+    case re:compile(Value) of
+        {ok, _} ->
+            {200,
+                emqx_mgmt_api:node_query_with_tabs(
+                    node(),
+                    emqx_banned:tables(),
+                    Params,
+                    ?BANNED_QSCHEMA,
+                    fun ?MODULE:qs2ms/2,
+                    fun ?MODULE:format/1
+                )};
+        {error, {Reason, Pos}} ->
+            {error, #{
+                message => <<"The filter is not a validation regex expression">>,
+                reason => emqx_utils_conv:bin(Reason),
+                position => Pos,
+                filter => Value
+            }}
+    end;
 list_banned(_QS, _FuzzyQS, _Params) ->
     {error, <<"too_many_filters">>}.
 
@@ -374,6 +384,8 @@ format(Banned) ->
 
 format_error(Error) when is_binary(Error) ->
     Error;
+format_error(Error) when is_map(Error) ->
+    emqx_utils_json:encode(Error);
 format_error(Reason) ->
     ErrorReason = io_lib:format("~p", [Reason]),
     erlang:iolist_to_binary(ErrorReason).

+ 8 - 8
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -956,19 +956,19 @@ list_clients(QString) ->
             {200, Response}
     end.
 
-list_clients_v2(get, #{query_string := QString0}) ->
+list_clients_v2(get, #{query_string := QString}) ->
     Nodes = emqx:running_nodes(),
-    case maps:get(<<"cursor">>, QString0, none) of
-        none ->
-            Cursor = initial_ets_cursor(Nodes),
-            do_list_clients_v2(Nodes, Cursor, QString0);
-        CursorBin when is_binary(CursorBin) ->
+    case QString of
+        #{<<"cursor">> := CursorBin} ->
             case parse_cursor(CursorBin, Nodes) of
                 {ok, Cursor} ->
-                    do_list_clients_v2(Nodes, Cursor, QString0);
+                    do_list_clients_v2(Nodes, Cursor, QString);
                 {error, bad_cursor} ->
                     ?BAD_REQUEST(<<"bad cursor">>)
-            end
+            end;
+        #{} ->
+            Cursor = initial_ets_cursor(Nodes),
+            do_list_clients_v2(Nodes, Cursor, QString)
     end.
 
 do_list_clients_v2(Nodes, Cursor, QString0) ->

+ 3 - 0
apps/emqx_management/src/emqx_mgmt_api_ds.erl

@@ -37,6 +37,9 @@
     forget/2
 ]).
 
+%% Internal exports:
+-export([is_enabled/0]).
+
 %% behavior callbacks:
 -export([
     namespace/0,

+ 5 - 2
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -494,7 +494,9 @@ update_ms(clientid, X, {{Topic, Pid}, Opts}) ->
 update_ms(topic, X, {{Topic, Pid}, Opts}) when
     is_record(Topic, share)
 ->
-    {{#share{group = '_', topic = X}, Pid}, Opts};
+    %% NOTE: Equivalent to `#share{group = '_', topic = X}`, but dialyzer is happy.
+    Share = setelement(#share.group, #share{group = <<>>, topic = X}, '_'),
+    {{Share, Pid}, Opts};
 update_ms(topic, X, {{Topic, Pid}, Opts}) when
     is_binary(Topic) orelse Topic =:= '_'
 ->
@@ -502,7 +504,8 @@ update_ms(topic, X, {{Topic, Pid}, Opts}) when
 update_ms(share_group, X, {{Topic, Pid}, Opts}) when
     not is_record(Topic, share)
 ->
-    {{#share{group = X, topic = Topic}, Pid}, Opts};
+    Share = #share{group = X, topic = Topic},
+    {{Share, Pid}, Opts};
 update_ms(qos, X, {{Topic, Pid}, Opts}) ->
     {{Topic, Pid}, Opts#{qos => X}}.
 

+ 7 - 3
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -898,15 +898,16 @@ data(_) ->
 %%--------------------------------------------------------------------
 %% @doc Durable storage command
 
+-if(?EMQX_RELEASE_EDITION == ee).
+
 ds(CMD) ->
-    case emqx_persistent_message:is_persistence_enabled() of
+    case emqx_mgmt_api_ds:is_enabled() of
         true ->
             do_ds(CMD);
         false ->
             emqx_ctl:usage([{"ds", "Durable storage is disabled"}])
     end.
 
--if(?EMQX_RELEASE_EDITION == ee).
 do_ds(["info"]) ->
     emqx_ds_replication_layer_meta:print_status();
 do_ds(["set_replicas", DBStr | SitesStr]) ->
@@ -966,9 +967,12 @@ do_ds(_) ->
         {"ds leave <storage> <site>", "Remove site from the replica set of the storage"},
         {"ds forget <site>", "Forcefully remove a site from the list of known sites"}
     ]).
+
 -else.
-do_ds(_CMD) ->
+
+ds(_CMD) ->
     emqx_ctl:usage([{"ds", "DS CLI is not available in this edition of EMQX"}]).
+
 -endif.
 
 %%--------------------------------------------------------------------

+ 4 - 0
apps/emqx_management/test/emqx_mgmt_api_banned_SUITE.erl

@@ -339,6 +339,10 @@ t_list_with_filters(_) ->
     test_for_list("like_peerhost_net=192.168", [<<"192.168.0.0/16">>]),
     test_for_list("like_peerhost_net=192.166", []),
 
+    %% with control characters
+    test_for_list("like_clientid=" ++ uri_string:quote("c\\d"), [<<"c1">>, <<"c2">>]),
+    ?assertMatch({error, _}, list_banned("like_clientid=???")),
+
     %% list all
     test_for_list([], [
         <<"c1">>,

+ 7 - 13
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -2200,28 +2200,22 @@ assert_contains_clientids(Results, ExpectedClientIds, Line) ->
     ).
 
 traverse_in_reverse_v2(QueryParams0, Results, Config) ->
-    Cursors0 =
-        lists:map(
-            fun(#{<<"meta">> := Meta}) ->
-                maps:get(<<"cursor">>, Meta, <<"wontbeused">>)
-            end,
-            Results
-        ),
-    Cursors1 = [<<"none">> | lists:droplast(Cursors0)],
+    Cursors = [C || #{<<"meta">> := #{<<"cursor">> := C}} <- lists:droplast(Results)],
+    CursorParams = [#{} | [#{cursor => C} || C <- Cursors]],
     DirectOrderClientIds = [
         ClientId
      || #{<<"data">> := Rows} <- Results,
         #{<<"clientid">> := ClientId} <- Rows
     ],
-    ReverseCursors = lists:reverse(Cursors1),
+    ReverseCursorParams = lists:reverse(CursorParams),
     do_traverse_in_reverse_v2(
-        QueryParams0, Config, ReverseCursors, DirectOrderClientIds, _Acc = []
+        QueryParams0, Config, ReverseCursorParams, DirectOrderClientIds, _Acc = []
     ).
 
-do_traverse_in_reverse_v2(_QueryParams0, _Config, _Cursors = [], DirectOrderClientIds, Acc) ->
+do_traverse_in_reverse_v2(_QueryParams0, _Config, [], DirectOrderClientIds, Acc) ->
     ?assertEqual(DirectOrderClientIds, Acc);
-do_traverse_in_reverse_v2(QueryParams0, Config, [Cursor | Rest], DirectOrderClientIds, Acc) ->
-    QueryParams = QueryParams0#{cursor => Cursor},
+do_traverse_in_reverse_v2(QueryParams0, Config, [CursorParam | Rest], DirectOrderClientIds, Acc) ->
+    QueryParams = maps:merge(QueryParams0, CursorParam),
     Res0 = list_v2_request(QueryParams, Config),
     ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := _}}}, Res0),
     {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,

+ 9 - 2
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -108,8 +108,8 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, [], Opts) when
 ->
     NewUrl =
         case QueryParams of
-            "" -> Url;
-            _ -> Url ++ "?" ++ QueryParams
+            [] -> Url;
+            _ -> Url ++ "?" ++ build_query_string(QueryParams)
         end,
     do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, Opts);
 request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
@@ -165,6 +165,13 @@ simplify_result(Res) ->
 auth_header_() ->
     emqx_common_test_http:default_auth_header().
 
+build_query_string(Query = #{}) ->
+    build_query_string(maps:to_list(Query));
+build_query_string(Query = [{_, _} | _]) ->
+    uri_string:compose_query([{emqx_utils_conv:bin(K), V} || {K, V} <- Query]);
+build_query_string(QueryString) ->
+    unicode:characters_to_list(QueryString).
+
 build_http_header(X) when is_list(X) ->
     X;
 build_http_header(X) ->

+ 1 - 1
apps/emqx_redis/mix.exs

@@ -25,7 +25,7 @@ defmodule EMQXRedis.MixProject do
     [
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
-      {:eredis_cluster, github: "emqx/eredis_cluster", tag: "0.8.5"}
+      {:eredis_cluster, github: "emqx/eredis_cluster", tag: "0.8.6"}
     ]
   end
 end

+ 1 - 1
apps/emqx_redis/rebar.config

@@ -3,7 +3,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
     %% NOTE: mind ecpool version when updating eredis_cluster version
-    {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.8.5"}}},
+    {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.8.6"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}}
 ]}.

+ 16 - 1
apps/emqx_utils/src/emqx_variform_bif.erl

@@ -53,7 +53,9 @@
     join_to_string/1,
     join_to_string/2,
     unescape/1,
-    any_to_str/1
+    any_to_str/1,
+    is_empty_val/1,
+    'not'/1
 ]).
 
 %% Array functions
@@ -594,6 +596,19 @@ num_gte(A, B) ->
     R = num_comp(A, B),
     R =:= gt orelse R =:= eq.
 
+%% @doc Return 'true' if the argument is `undefined`, `null`, an empty string, or an empty array.
+is_empty_val(undefined) -> true;
+is_empty_val(null) -> true;
+is_empty_val(<<>>) -> true;
+is_empty_val([]) -> true;
+is_empty_val(_) -> false.
+
+%% @doc The 'not' operation for boolean values and strings.
+'not'(true) -> false;
+'not'(false) -> true;
+'not'(<<"true">>) -> <<"false">>;
+'not'(<<"false">>) -> <<"true">>.
+
 %%------------------------------------------------------------------------------
 %% System
 %%------------------------------------------------------------------------------

+ 21 - 0
apps/emqx_utils/test/emqx_variform_bif_tests.erl

@@ -79,3 +79,24 @@ system_test() ->
     EnvNameBin = erlang:list_to_binary(EnvName),
     os:putenv("EMQXVAR_" ++ EnvName, EnvVal),
     ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)).
+
+empty_val_test_() ->
+    F = fun(X) -> emqx_variform_bif:is_empty_val(X) end,
+    [
+        ?_assert(F(undefined)),
+        ?_assert(F(null)),
+        ?_assert(F(<<>>)),
+        ?_assert(F([])),
+        ?_assertNot(F(true)),
+        ?_assertNot(F(false)),
+        ?_assertNot(F(<<"a">>))
+    ].
+
+bool_not_test_() ->
+    Not = fun(X) -> emqx_variform_bif:'not'(X) end,
+    [
+        ?_assertEqual(<<"false">>, Not(<<"true">>)),
+        ?_assertEqual(<<"true">>, Not(<<"false">>)),
+        ?_assertEqual(true, Not(false)),
+        ?_assertEqual(false, Not(true))
+    ].

+ 2 - 0
changes/ce/fix-13956.en.md

@@ -0,0 +1,2 @@
+Update `gen_rpc` library to version 3.4.1.
+The new version contains a fix that prevents escalation of client socket initialization errors to the node level at the server side.

+ 1 - 1
changes/ee/feat-13810.en.md

@@ -2,4 +2,4 @@ Add client-info authentication.
 
 Client-info (of type `cinfo`) authentication is a lightweight authentication mechanism which checks client properties and attributes against user defined rules.
 The rules make use of the Variform expression to define match conditions, and the authentication result when match is found.
-For example, to quickly fence off clients without a username, the match condition can be `str_eq(username, '')` associated with a check result `deny`.
+For example, to quickly fence off clients without a username, the match condition can be `is_empty_val(username)` associated with a check result `deny`.

+ 3 - 0
changes/ee/fix-13959.en.md

@@ -0,0 +1,3 @@
+Upgrade pulsar client from `0.8.4` to `0.8.5` (see [pulsar#62](https://github.com/emqx/pulsar-client-erl/pull/62)).
+
+Prior to this fix, if the producer attempted to contact the client process in certain race conditions, it could stop and not be restarted.  The only workaround would then be to manually restart the action.

+ 1 - 1
rel/config/ee-examples/cinfo-authn.conf

@@ -10,7 +10,7 @@ authentication = [
       # deny clients with empty username and client ID starts with 'v1-'
       {
         # when is_match is an array, it yields 'true' if all individual checks yield 'true'
-        is_match = ["str_eq(username, '')", "str_eq(nth(1,tokens(clientid,'-')), 'v1')"]
+        is_match = ["is_empty_val(username)", "str_eq(nth(1,tokens(clientid,'-')), 'v1')"]
         result = deny
       }
       # if all checks are exhausted without an 'allow' or a 'deny' result, continue to the next authentication

+ 2 - 0
rel/i18n/emqx_authn_cinfo_schema.hocon

@@ -28,11 +28,13 @@ emqx_authn_cinfo_schema {
       One Variform expression or an array of expressions to evaluate with a set of pre-bound variables derived from the client information.
       Supported variables:
       - `username`: the username of the client.
+      - `password`: the password of the client.
       - `clientid`: the client ID of the client.
       - `client_attrs.*`: the client attributes of the client.
       - `peerhost`: the IP address of the client.
       - `cert_subject`: the subject of the TLS certificate.
       - `cert_common_name`: the issuer of the TLS certificate.
+      - `zone`: the config zone associated with the listener from which the client is accepted.
      If the expression(s) all yield the string value `'true'`, then the associated `result` is returned from this authenticator.
      If any expression yields anything other than `'true'`, then the current check is skipped."""
   }

+ 4 - 4
rel/i18n/emqx_mgmt_api_banned.hocon

@@ -74,15 +74,15 @@ filter_peerhost.desc:
 """Query the banned objects with an exact IP address."""
 
 filter_like_clientid.desc:
-"""Fuzzy query banned objects with a client ID."""
+"""Fuzzy query banned objects with a regular expression for client ID."""
 
 filter_like_username.desc:
-"""Fuzzy query banned objects with an username."""
+"""Fuzzy query banned objects with a regular expression for username."""
 
 filter_like_peerhost.desc:
-"""Fuzzy query banned objects with an IP address."""
+"""Fuzzy query banned objects with a regular expression for IP address."""
 
 filter_like_peerhost_net.desc:
-"""Fuzzy query banned objects with a CIDR."""
+"""Fuzzy query banned objects with a regular expression for CIDR."""
 
 }