Просмотр исходного кода

Merge branch 'release-57' into sync-r57-m-20240510

Thales Macedo Garitezi 1 год назад
Родитель
Сommit
6be4e6f631
26 измененных файлов с 426 добавлено и 207 удалено
  1. 24 0
      apps/emqx/src/config/emqx_config_zones.erl
  2. 7 1
      apps/emqx/src/emqx_listeners.erl
  3. 2 2
      apps/emqx/src/emqx_persistent_session_ds.erl
  4. 1 1
      apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl
  5. 1 0
      apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src
  6. 2 2
      apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl
  7. 24 17
      apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
  8. 54 0
      apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl
  9. 19 10
      apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl
  10. 4 4
      apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
  11. 1 1
      apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl
  12. 15 13
      apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl
  13. 0 42
      apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl
  14. 0 1
      apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src
  15. 3 2
      apps/emqx_management/src/emqx_mgmt_api_listeners.erl
  16. 24 2
      apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl
  17. 1 1
      apps/emqx_retainer/src/emqx_retainer.app.src
  18. 3 1
      apps/emqx_retainer/src/emqx_retainer.erl
  19. 104 59
      apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
  20. 9 2
      apps/emqx_retainer/src/emqx_retainer_mnesia.erl
  21. 108 34
      apps/emqx_retainer/test/emqx_retainer_SUITE.erl
  22. 11 11
      apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl
  23. 2 1
      apps/emqx_utils/src/emqx_variform_bif.erl
  24. 5 0
      changes/ce/fix-12993.en.md
  25. 1 0
      changes/ce/fix-12996.en.md
  26. 1 0
      changes/ee/fix-13001.en.md

+ 24 - 0
apps/emqx/src/config/emqx_config_zones.erl

@@ -20,6 +20,7 @@
 %% API
 -export([add_handler/0, remove_handler/0, pre_config_update/3]).
 -export([is_olp_enabled/0]).
+-export([assert_zone_exists/1]).
 
 -define(ZONES, [zones]).
 
@@ -44,3 +45,26 @@ is_olp_enabled() ->
         false,
         emqx_config:get([zones], #{})
     ).
+
+-spec assert_zone_exists(binary() | atom()) -> ok.
+assert_zone_exists(Name0) when is_binary(Name0) ->
+    %% an existing zone must have already an atom-name
+    Name =
+        try
+            binary_to_existing_atom(Name0)
+        catch
+            _:_ ->
+                throw({unknown_zone, Name0})
+        end,
+    assert_zone_exists(Name);
+assert_zone_exists(default) ->
+    %% there is always a 'default' zone
+    ok;
+assert_zone_exists(Name) when is_atom(Name) ->
+    try
+        _ = emqx_config:get([zones, Name]),
+        ok
+    catch
+        error:{config_not_found, _} ->
+            throw({unknown_zone, Name})
+    end.

+ 7 - 1
apps/emqx/src/emqx_listeners.erl

@@ -124,7 +124,7 @@ format_raw_listeners({Type0, Conf}) ->
                 Bind = parse_bind(LConf0),
                 MaxConn = maps:get(<<"max_connections">>, LConf0, default_max_conn()),
                 Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
-                LConf1 = maps:without([<<"authentication">>, <<"zone">>], LConf0),
+                LConf1 = maps:without([<<"authentication">>], LConf0),
                 LConf2 = maps:put(<<"running">>, Running, LConf1),
                 CurrConn =
                     case Running of
@@ -526,6 +526,7 @@ pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) ->
 pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) ->
     RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request),
     RawConf2 = ensure_override_limiter_conf(RawConf1, Request),
+    ok = assert_zone_exists(RawConf2),
     {ok, convert_certs(Type, Name, RawConf2)};
 pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) ->
     {ok, emqx_utils_maps:deep_merge(RawConf, Updated)};
@@ -896,6 +897,11 @@ convert_certs(Type, Name, Conf) ->
 filter_stacktrace({Reason, _Stacktrace}) -> Reason;
 filter_stacktrace(Reason) -> Reason.
 
+assert_zone_exists(#{<<"zone">> := Zone}) ->
+    emqx_config_zones:assert_zone_exists(Zone);
+assert_zone_exists(_) ->
+    ok.
+
 %% limiter config should override, not merge
 ensure_override_limiter_conf(Conf, #{<<"limiter">> := Limiter}) ->
     Conf#{<<"limiter">> => Limiter};

+ 2 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -622,7 +622,7 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -
             replay_streams(Session#{replay := Rest}, ClientInfo);
         {error, recoverable, Reason} ->
             RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
-            ?SLOG(warning, #{
+            ?SLOG(debug, #{
                 msg => "failed_to_fetch_replay_batch",
                 stream => StreamKey,
                 reason => Reason,
@@ -925,7 +925,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             Session#{s => S};
         {error, Class, Reason} ->
             %% TODO: Handle unrecoverable error.
-            ?SLOG(info, #{
+            ?SLOG(debug, #{
                 msg => "failed_to_fetch_batch",
                 stream => StreamKey,
                 reason => Reason,

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -220,7 +220,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream
                     },
                     emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
                 {error, recoverable, Reason} ->
-                    ?SLOG(warning, #{
+                    ?SLOG(debug, #{
                         msg => "failed_to_initialize_stream_iterator",
                         stream => Stream,
                         class => recoverable,

+ 1 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src

@@ -19,6 +19,7 @@
             emqx_bridge_s3_connector_info
         ]}
     ]},
+    {mod, {emqx_bridge_s3_app, []}},
     {modules, []},
     {links, []}
 ]}.

+ 2 - 2
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_connector_aggreg_app).
+-module(emqx_bridge_s3_app).
 
 -behaviour(application).
 -export([start/2, stop/1]).
@@ -15,7 +15,7 @@
 %%------------------------------------------------------------------------------
 
 start(_StartType, _StartArgs) ->
-    emqx_connector_aggreg_sup:start_link().
+    emqx_bridge_s3_sup:start_link().
 
 stop(_State) ->
     ok.

+ 24 - 17
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -87,6 +87,8 @@
     channels := #{channel_id() => channel_state()}
 }.
 
+-define(AGGREG_SUP, emqx_bridge_s3_sup).
+
 %%
 
 -spec callback_mode() -> callback_mode().
@@ -209,6 +211,7 @@ start_channel(State, #{
         bucket := Bucket
     }
 }) ->
+    AggregId = {Type, Name},
     AggregOpts = #{
         time_interval => TimeInterval,
         max_records => MaxRecords,
@@ -223,19 +226,21 @@ start_channel(State, #{
         client_config => maps:get(client_config, State),
         uploader_config => maps:with([min_part_size, max_part_size], Parameters)
     },
-    _ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
-    {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
-        id => {Type, Name},
-        start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
+    _ = ?AGGREG_SUP:delete_child(AggregId),
+    {ok, SupPid} = ?AGGREG_SUP:start_child(#{
+        id => AggregId,
+        start =>
+            {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
         type => supervisor,
         restart => permanent
     }),
     #{
         type => ?ACTION_AGGREGATED_UPLOAD,
         name => Name,
+        aggreg_id => AggregId,
         bucket => Bucket,
         supervisor => SupPid,
-        on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
+        on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
     }.
 
 upload_options(Parameters) ->
@@ -254,12 +259,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
     %% Since bucket name may be templated, we can't really provide any additional
     %% information regarding the channel health.
     ?status_connected;
-channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
+channel_status(
+    #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State
+) ->
     %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
     Timestamp = erlang:system_time(second),
-    ok = emqx_connector_aggregator:tick(Name, Timestamp),
+    ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
     ok = check_bucket_accessible(Bucket, State),
-    ok = check_aggreg_upload_errors(Name),
+    ok = check_aggreg_upload_errors(AggregId),
     ?status_connected.
 
 check_bucket_accessible(Bucket, #{client_config := Config}) ->
@@ -278,8 +285,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
             end
     end.
 
-check_aggreg_upload_errors(Name) ->
-    case emqx_connector_aggregator:take_error(Name) of
+check_aggreg_upload_errors(AggregId) ->
+    case emqx_connector_aggregator:take_error(AggregId) of
         [Error] ->
             %% TODO
             %% This approach means that, for example, 3 upload failures will cause
@@ -353,11 +360,11 @@ run_simple_upload(
             {error, map_error(Reason)}
     end.
 
-run_aggregated_upload(InstId, Records, #{name := Name}) ->
+run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
     Timestamp = erlang:system_time(second),
-    case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of
+    case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
         ok ->
-            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
+            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
             ok;
         {error, Reason} ->
             {error, {unrecoverable_error, Reason}}
@@ -406,8 +413,8 @@ init_transfer_state(BufferMap, Opts) ->
     Key = mk_object_key(BufferMap, Opts),
     emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
 
-mk_object_key(BufferMap, #{action := Name, key := Template}) ->
-    emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}).
+mk_object_key(BufferMap, #{action := AggregId, key := Template}) ->
+    emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}).
 
 process_append(Writes, Upload0) ->
     {ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
@@ -443,9 +450,9 @@ process_terminate(Upload) ->
 
 -spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
     {ok, integer() | string()} | {error, undefined}.
-lookup([<<"action">>], {Name, _Buffer}) ->
+lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) ->
     {ok, mk_fs_safe_string(Name)};
-lookup(Accessor, {_Name, Buffer = #{}}) ->
+lookup(Accessor, {_AggregId, Buffer = #{}}) ->
     lookup_buffer_var(Accessor, Buffer);
 lookup(_Accessor, _Context) ->
     {error, undefined}.

+ 54 - 0
apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl

@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_sup).
+
+%% API
+-export([
+    start_link/0,
+    start_child/1,
+    delete_child/1
+]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(ChildSpec) ->
+    supervisor:start_child(?MODULE, ChildSpec).
+
+delete_child(ChildId) ->
+    case supervisor:terminate_child(?MODULE, ChildId) of
+        ok ->
+            supervisor:delete_child(?MODULE, ChildId);
+        Error ->
+            Error
+    end.
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 1,
+        period => 1
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------

+ 19 - 10
apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl

@@ -158,6 +158,7 @@ t_on_get_status(Config) ->
 t_aggreg_upload(Config) ->
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     BridgeNameString = unicode:characters_to_list(BridgeName),
     NodeString = atom_to_list(node()),
     %% Create a bridge with the sample configuration.
@@ -170,7 +171,7 @@ t_aggreg_upload(Config) ->
     ]),
     ok = send_messages(BridgeName, MessageEvents),
     %% Wait until the delivery is completed.
-    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check the uploaded objects.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     ?assertMatch(
@@ -196,6 +197,7 @@ t_aggreg_upload(Config) ->
 t_aggreg_upload_rule(Config) ->
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
     %% Create a bridge with the sample configuration and a simple SQL rule.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) ->
         emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
         emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
     ]),
-    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check the uploaded objects.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) ->
     %% after a restart.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
     %% Send some sample messages that look like Rule SQL productions.
@@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) ->
         {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
     ]),
     ok = send_messages(BridgeName, MessageEvents),
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
     %% Restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
     {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
     %% Send some more messages.
     ok = send_messages(BridgeName, MessageEvents),
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check there's still only one upload.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
@@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
     %% and does so while preserving uncompromised data.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     BatchSize = ?CONF_MAX_RECORDS div 2,
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) ->
     %% Ensure that they span multiple batch queries.
     ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
     {ok, _} = ?block_until(
-        #{?snk_kind := connector_aggreg_records_written, action := BridgeName},
+        #{?snk_kind := connector_aggreg_records_written, action := AggregId},
         infinity,
         0
     ),
     %% Find out the buffer file.
     {ok, #{filename := Filename}} = ?block_until(
-        #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName}
+        #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
     ),
     %% Stop the bridge, corrupt the buffer file, and restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
@@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
     ],
     ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check that upload contains part of the first batch and all of the second batch.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) ->
     %% a restart.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
     %% Send few large messages that will require multipart upload.
@@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) ->
     %% Restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check that delivery contains all the messages.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) ->
     %% and windowing work correctly under high rate, high concurrency conditions.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     NSenders = 4,
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) ->
     %% Wait for the last delivery to complete.
     ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
     ?block_until(
-        #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0
+        #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0
     ),
     %% There should be at least 2 time windows of aggregated records.
     Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
@@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) ->
     #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
     {ok, CSV} = erl_csv:decode(Content),
     CSV.
+
+aggreg_id(BridgeName) ->
+    {?BRIDGE_TYPE, BridgeName}.

+ 4 - 4
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl

@@ -213,9 +213,9 @@ on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) ->
     ),
     status_result(Health).
 
-status_result(true) -> connected;
-status_result(false) -> connecting;
-status_result({error, _}) -> connecting.
+status_result(true) -> ?status_connected;
+status_result(false) -> ?status_disconnected;
+status_result({error, _}) -> ?status_disconnected.
 
 on_add_channel(
     _InstanceId,
@@ -251,7 +251,7 @@ on_get_channels(InstanceId) ->
 on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
     case maps:is_key(ChannelId, Channels) of
         true ->
-            connected;
+            ?status_connected;
         _ ->
             {error, not_exists}
     end.

+ 1 - 1
apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl

@@ -347,7 +347,7 @@ t_get_status(Config) ->
         _Sleep = 500,
         _Attempts = 10,
         ?assertMatch(
-            #{status := connecting},
+            #{status := disconnected},
             emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME)
         )
     ).

+ 15 - 13
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl

@@ -25,7 +25,7 @@
 ]).
 
 -record(delivery, {
-    name :: _Name,
+    id :: id(),
     callback_module :: module(),
     container :: emqx_connector_aggreg_csv:container(),
     reader :: emqx_connector_aggreg_buffer:reader(),
@@ -33,6 +33,8 @@
     empty :: boolean()
 }).
 
+-type id() :: term().
+
 -type state() :: #delivery{}.
 
 -type init_opts() :: #{
@@ -59,22 +61,22 @@
 
 %%
 
-start_link(Name, Buffer, Opts) ->
-    proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
+start_link(Id, Buffer, Opts) ->
+    proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]).
 
 %%
 
--spec init(pid(), _Name, buffer(), init_opts()) -> no_return().
-init(Parent, Name, Buffer, Opts) ->
-    ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
+-spec init(pid(), id(), buffer(), init_opts()) -> no_return().
+init(Parent, Id, Buffer, Opts) ->
+    ?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}),
     Reader = open_buffer(Buffer),
-    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}),
     _ = erlang:process_flag(trap_exit, true),
     ok = proc_lib:init_ack({ok, self()}),
     loop(Delivery, Parent, []).
 
 init_delivery(
-    Name,
+    Id,
     Reader,
     Buffer,
     Opts = #{
@@ -84,7 +86,7 @@ init_delivery(
 ) ->
     BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
     #delivery{
-        name = Name,
+        id = Id,
         callback_module = Mod,
         container = mk_container(ContainerOpts),
         reader = Reader,
@@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0})
             error({transfer_failed, Reason})
     end.
 
-process_complete(#delivery{name = Name, empty = true}) ->
-    ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
+process_complete(#delivery{id = Id, empty = true}) ->
+    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}),
     exit({shutdown, {skipped, empty}});
 process_complete(#delivery{
-    name = Name, callback_module = Mod, container = Container, transfer = Transfer0
+    id = Id, callback_module = Mod, container = Container, transfer = Transfer0
 }) ->
     Trailer = emqx_connector_aggreg_csv:close(Container),
     Transfer = Mod:process_append(Trailer, Transfer0),
     {ok, Completed} = Mod:process_complete(Transfer),
-    ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
+    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
     ok.
 
 %%

+ 0 - 42
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl

@@ -1,42 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_connector_aggreg_sup).
-
--export([
-    start_link/0,
-    start_child/1,
-    delete_child/1
-]).
-
--behaviour(supervisor).
--export([init/1]).
-
--define(SUPREF, ?MODULE).
-
-%%
-
-start_link() ->
-    supervisor:start_link({local, ?SUPREF}, ?MODULE, root).
-
-start_child(ChildSpec) ->
-    supervisor:start_child(?SUPREF, ChildSpec).
-
-delete_child(ChildId) ->
-    case supervisor:terminate_child(?SUPREF, ChildId) of
-        ok ->
-            supervisor:delete_child(?SUPREF, ChildId);
-        Error ->
-            Error
-    end.
-
-%%
-
-init(root) ->
-    SupFlags = #{
-        strategy => one_for_one,
-        intensity => 1,
-        period => 1
-    },
-    {ok, {SupFlags, []}}.

+ 0 - 1
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src

@@ -7,7 +7,6 @@
         stdlib
     ]},
     {env, []},
-    {mod, {emqx_connector_aggreg_app, []}},
     {modules, []},
     {links, []}
 ]}.

+ 3 - 2
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -810,6 +810,7 @@ listener_id_status_example() ->
 
 tcp_schema_example() ->
     #{
+        type => tcp,
         acceptors => 16,
         access_rules => ["allow all"],
         bind => <<"0.0.0.0:1884">>,
@@ -820,6 +821,7 @@ tcp_schema_example() ->
         proxy_protocol => false,
         proxy_protocol_timeout => <<"3s">>,
         running => true,
+        zone => default,
         tcp_options => #{
             active_n => 100,
             backlog => 1024,
@@ -829,8 +831,7 @@ tcp_schema_example() ->
             reuseaddr => true,
             send_timeout => <<"15s">>,
             send_timeout_close => true
-        },
-        type => tcp
+        }
     }.
 
 create_listener(From, Body) ->

+ 24 - 2
apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl

@@ -36,9 +36,12 @@ groups() ->
     MaxConnTests = [
         t_max_connection_default
     ],
+    ZoneTests = [
+        t_update_listener_zone
+    ],
     [
         {with_defaults_in_file, AllTests -- MaxConnTests},
-        {without_defaults_in_file, AllTests -- MaxConnTests},
+        {without_defaults_in_file, AllTests -- (MaxConnTests ++ ZoneTests)},
         {max_connections, MaxConnTests}
     ].
 
@@ -403,6 +406,21 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port
     ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(delete, NewPath, [], [])),
     ok.
 
+t_update_listener_zone({init, Config}) ->
+    %% fake a zone
+    Config;
+t_update_listener_zone({'end', _Config}) ->
+    ok;
+t_update_listener_zone(_Config) ->
+    ListenerId = <<"tcp:default">>,
+    Path = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
+    Conf = request(get, Path, [], []),
+    %% update
+    AddConf1 = Conf#{<<"zone">> => <<"unknownzone">>},
+    AddConf2 = Conf#{<<"zone">> => <<"zone1">>},
+    ?assertMatch({error, {_, 400, _}}, request(put, Path, [], AddConf1)),
+    ?assertMatch(#{<<"zone">> := <<"zone1">>}, request(put, Path, [], AddConf2)).
+
 t_delete_nonexistent_listener(Config) when is_list(Config) ->
     NonExist = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:nonexistent"]),
     ?assertMatch(
@@ -518,5 +536,9 @@ cert_file(Name) ->
 default_listeners_hocon_text() ->
     Sc = #{roots => emqx_schema:listeners()},
     Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}),
-    Config = #{<<"listeners">> => Listeners},
+    Zones = #{<<"zone1">> => #{<<"mqtt">> => #{<<"max_inflight">> => 2}}},
+    Config = #{
+        <<"listeners">> => Listeners,
+        <<"zones">> => Zones
+    },
     hocon_pp:do(Config, #{}).

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.22"},
+    {vsn, "5.0.23"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},

+ 3 - 1
apps/emqx_retainer/src/emqx_retainer.erl

@@ -47,6 +47,7 @@
     retained_count/0,
     backend_module/0,
     backend_module/1,
+    backend_state/1,
     enabled/0
 ]).
 
@@ -103,6 +104,7 @@
 -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
     {ok, has_next(), list(message())}.
 -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
+-callback delete_cursor(backend_state(), cursor()) -> ok.
 -callback clear_expired(backend_state()) -> ok.
 -callback clean(backend_state()) -> ok.
 -callback size(backend_state()) -> non_neg_integer().
@@ -339,7 +341,7 @@ count(Context) ->
 clear_expired(Context) ->
     Mod = backend_module(Context),
     BackendState = backend_state(Context),
-    Mod:clear_expired(BackendState).
+    ok = Mod:clear_expired(BackendState).
 
 -spec store_retained(context(), message()) -> ok.
 store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->

+ 104 - 59
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -46,15 +46,26 @@
 -type limiter() :: emqx_htb_limiter:limiter().
 -type context() :: emqx_retainer:context().
 -type topic() :: emqx_types:topic().
--type cursor() :: emqx_retainer:cursor().
 
 -define(POOL, ?MODULE).
 
+%% For tests
+-export([
+    dispatch/3
+]).
+
+%% This module is `emqx_retainer` companion
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
 %%%===================================================================
 %%% API
 %%%===================================================================
+
 dispatch(Context, Topic) ->
-    cast({?FUNCTION_NAME, Context, self(), Topic}).
+    dispatch(Context, Topic, self()).
+
+dispatch(Context, Topic, Pid) ->
+    cast({dispatch, Context, Pid, Topic}).
 
 %% reset the client's limiter after updated the limiter's config
 refresh_limiter() ->
@@ -156,7 +167,7 @@ handle_call(Req, _From, State) ->
     | {noreply, NewState :: term(), hibernate}
     | {stop, Reason :: term(), NewState :: term()}.
 handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
-    {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
+    {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter),
     {noreply, State#{limiter := Limiter2}};
 handle_cast({refresh_limiter, Conf}, State) ->
     BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
@@ -234,86 +245,120 @@ format_status(_Opt, Status) ->
 cast(Msg) ->
     gen_server:cast(worker(), Msg).
 
--spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
-dispatch(Context, Pid, Topic, Cursor, Limiter) ->
+-spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}.
+dispatch(Context, Pid, Topic, Limiter) ->
     Mod = emqx_retainer:backend_module(Context),
-    case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
-        false ->
-            {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
-            deliver(Result, Context, Pid, Topic, undefined, Limiter);
+    State = emqx_retainer:backend_state(Context),
+    case emqx_topic:wildcard(Topic) of
         true ->
-            {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
-            deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
+            {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined),
+            dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter);
+        false ->
+            {ok, Messages} = Mod:read_message(State, Topic),
+            dispatch_at_once(Messages, Pid, Topic, Limiter)
+    end.
+
+dispatch_at_once(Messages, Pid, Topic, Limiter0) ->
+    case deliver(Messages, Pid, Topic, Limiter0) of
+        {ok, Limiter1} ->
+            {ok, Limiter1};
+        {drop, Limiter1} ->
+            {ok, Limiter1};
+        no_receiver ->
+            ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
+            {ok, Limiter0}
     end.
 
--spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
-    {ok, limiter()}.
-deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
+dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) ->
+    ok = delete_cursor(Context, Cursor),
     {ok, Limiter};
-deliver([], Context, Pid, Topic, Cursor, Limiter) ->
-    dispatch(Context, Pid, Topic, Cursor, Limiter);
-deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
+dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) ->
+    case deliver(Messages0, Pid, Topic, Limiter0) of
+        {ok, Limiter1} ->
+            {ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0),
+            dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1);
+        {drop, Limiter1} ->
+            ok = delete_cursor(Context, Cursor0),
+            {ok, Limiter1};
+        no_receiver ->
+            ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
+            ok = delete_cursor(Context, Cursor0),
+            {ok, Limiter0}
+    end.
+
+match_next(_Context, _Topic, undefined) ->
+    {ok, [], undefined};
+match_next(Context, Topic, Cursor) ->
+    Mod = emqx_retainer:backend_module(Context),
+    State = emqx_retainer:backend_state(Context),
+    Mod:match_messages(State, Topic, Cursor).
+
+delete_cursor(_Context, undefined) ->
+    ok;
+delete_cursor(Context, Cursor) ->
+    Mod = emqx_retainer:backend_module(Context),
+    State = emqx_retainer:backend_state(Context),
+    Mod:delete_cursor(State, Cursor).
+
+-spec deliver([emqx_types:message()], pid(), topic(), limiter()) ->
+    {ok, limiter()} | {drop, limiter()} | no_receiver.
+deliver(Messages, Pid, Topic, Limiter) ->
     case erlang:is_process_alive(Pid) of
         false ->
-            {ok, Limiter};
+            no_receiver;
         _ ->
-            DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
-            case DeliverNum of
+            BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
+            case BatchSize of
                 0 ->
-                    do_deliver(Result, Pid, Topic),
+                    deliver_to_client(Messages, Pid, Topic),
                     {ok, Limiter};
                 _ ->
-                    case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of
-                        {ok, Limiter2} ->
-                            deliver([], Context, Pid, Topic, Cursor, Limiter2);
-                        {drop, Limiter2} ->
-                            {ok, Limiter2}
-                    end
+                    deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
             end
     end.
 
-do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
+deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) ->
     {ok, Limiter};
-do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
-    {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
-    case emqx_htb_limiter:consume(Num, Limiter) of
-        {ok, Limiter2} ->
-            do_deliver(ToDelivers, Pid, Topic),
-            do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
-        {drop, _} = Drop ->
+deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
+    {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs),
+    case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of
+        {ok, Limiter1} ->
+            ok = deliver_to_client(Batch, Pid, Topic),
+            deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1);
+        {drop, _Limiter1} = Drop ->
             ?SLOG(debug, #{
                 msg => "retained_message_dropped",
                 reason => "reached_ratelimit",
-                dropped_count => length(ToDelivers)
+                dropped_count => BatchActualSize
             }),
             Drop
     end.
 
-do_deliver([Msg | T], Pid, Topic) ->
-    case emqx_banned:look_up({clientid, Msg#message.from}) of
-        [] ->
-            Pid ! {deliver, Topic, Msg},
-            ok;
-        _ ->
-            ?tp(
-                notice,
-                ignore_retained_message_deliver,
-                #{
-                    reason => "client is banned",
-                    clientid => Msg#message.from
-                }
-            )
-    end,
-    do_deliver(T, Pid, Topic);
-do_deliver([], _, _) ->
+deliver_to_client([Msg | T], Pid, Topic) ->
+    _ =
+        case emqx_banned:look_up({clientid, Msg#message.from}) of
+            [] ->
+                Pid ! {deliver, Topic, Msg};
+            _ ->
+                ?tp(
+                    notice,
+                    ignore_retained_message_deliver,
+                    #{
+                        reason => "client is banned",
+                        clientid => Msg#message.from
+                    }
+                )
+        end,
+    deliver_to_client(T, Pid, Topic);
+deliver_to_client([], _, _) ->
     ok.
 
-safe_split(N, List) ->
-    safe_split(N, List, 0, []).
+take(N, List) ->
+    take(N, List, 0, []).
 
-safe_split(0, List, Count, Acc) ->
+take(0, List, Count, Acc) ->
     {Count, lists:reverse(Acc), List};
-safe_split(_N, [], Count, Acc) ->
+take(_N, [], Count, Acc) ->
     {Count, lists:reverse(Acc), []};
-safe_split(N, [H | T], Count, Acc) ->
-    safe_split(N - 1, T, Count + 1, [H | Acc]).
+take(N, [H | T], Count, Acc) ->
+    take(N - 1, T, Count + 1, [H | Acc]).

+ 9 - 2
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -35,6 +35,7 @@
     read_message/2,
     page_read/4,
     match_messages/3,
+    delete_cursor/2,
     clear_expired/1,
     clean/1,
     size/1
@@ -205,7 +206,7 @@ delete_message(_State, Topic) ->
 read_message(_State, Topic) ->
     {ok, read_messages(Topic)}.
 
-match_messages(_State, Topic, undefined) ->
+match_messages(State, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
     QH = msg_table(search_table(Tokens, Now)),
@@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) ->
             {ok, qlc:eval(QH), undefined};
         BatchNum when is_integer(BatchNum) ->
             Cursor = qlc:cursor(QH),
-            match_messages(undefined, Topic, {Cursor, BatchNum})
+            match_messages(State, Topic, {Cursor, BatchNum})
     end;
 match_messages(_State, _Topic, {Cursor, BatchNum}) ->
     case qlc_next_answers(Cursor, BatchNum) of
@@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) ->
             {ok, Rows, {Cursor, BatchNum}}
     end.
 
+delete_cursor(_State, {Cursor, _}) ->
+    qlc:delete_cursor(Cursor);
+delete_cursor(_State, undefined) ->
+    ok.
+
 page_read(_State, Topic, Page, Limit) ->
     Now = erlang:system_time(millisecond),
     QH =
@@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when
 
             %% Fill index records in batches.
             QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
+
             ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
 
             %% Enable read indices and unlock reindexing.

+ 108 - 34
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include("emqx_retainer.hrl").
 
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -96,14 +97,19 @@ end_per_group(_Group, Config) ->
     emqx_retainer_mnesia:populate_index_meta(),
     Config.
 
-init_per_testcase(t_get_basic_usage_info, Config) ->
+init_per_testcase(_TestCase, Config) ->
     mnesia:clear_table(?TAB_INDEX),
     mnesia:clear_table(?TAB_MESSAGE),
     emqx_retainer_mnesia:populate_index_meta(),
-    Config;
-init_per_testcase(_TestCase, Config) ->
     Config.
 
+end_per_testcase(t_flow_control, _Config) ->
+    restore_delivery();
+end_per_testcase(t_cursor_cleanup, _Config) ->
+    restore_delivery();
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
 app_spec() ->
     {emqx_retainer, ?BASE_CONF}.
 
@@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) ->
     ok = emqtt:disconnect(C1).
 
 t_flow_control(_) ->
-    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
-    LimiterCfg = make_limiter_cfg(Rate),
-    JsonCfg = make_limiter_json(<<"1/1s">>),
-    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
-    emqx_retainer:update_config(#{
-        <<"delivery_rate">> => <<"1/1s">>,
-        <<"flow_control">> =>
-            #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1,
-                <<"batch_deliver_limiter">> => JsonCfg
-            }
-    }),
+    setup_slow_delivery(),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(
@@ -442,23 +436,60 @@ t_flow_control(_) ->
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
     ?assertEqual(3, length(receive_messages(3))),
     End = erlang:system_time(millisecond),
+
     Diff = End - Begin,
 
     ?assert(
-        Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
+        Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9),
         lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
     ),
 
     ok = emqtt:disconnect(C1),
+    ok.
+
+t_cursor_cleanup(_) ->
+    setup_slow_delivery(),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    lists:foreach(
+        fun(I) ->
+            emqtt:publish(
+                C1,
+                <<"retained/", (integer_to_binary(I))/binary>>,
+                <<"this is a retained message">>,
+                [{qos, 0}, {retain, true}]
+            )
+        end,
+        lists:seq(1, 5)
+    ),
+    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
+
+    snabbkaffe:start_trace(),
+
+    ?assertWaitEvent(
+        emqtt:disconnect(C1),
+        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>},
+        2000
+    ),
+
+    ?assertEqual(0, qlc_process_count()),
+
+    {Pid, Ref} = spawn_monitor(fun() -> ok end),
+    receive
+        {'DOWN', Ref, _, _, _} -> ok
+    after 1000 -> ct:fail("should receive 'DOWN' message")
+    end,
+
+    ?assertWaitEvent(
+        emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid),
+        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>},
+        2000
+    ),
+
+    ?assertEqual(0, qlc_process_count()),
+
+    snabbkaffe:stop(),
 
-    emqx_limiter_server:del_bucket(emqx_retainer, internal),
-    emqx_retainer:update_config(#{
-        <<"flow_control">> =>
-            #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1
-            }
-    }),
     ok.
 
 t_clear_expired(_) ->
@@ -849,15 +880,21 @@ with_conf(ConfMod, Case) ->
     end.
 
 make_limiter_cfg(Rate) ->
-    Client = #{
-        rate => Rate,
-        initial => 0,
-        burst => 0,
-        low_watermark => 1,
-        divisible => false,
-        max_retry_time => timer:seconds(5),
-        failure_strategy => force
-    },
+    make_limiter_cfg(Rate, #{}).
+
+make_limiter_cfg(Rate, ClientOpts) ->
+    Client = maps:merge(
+        #{
+            rate => Rate,
+            initial => 0,
+            burst => 0,
+            low_watermark => 1,
+            divisible => false,
+            max_retry_time => timer:seconds(5),
+            failure_strategy => force
+        },
+        ClientOpts
+    ),
     #{client => Client, rate => Rate, initial => 0, burst => 0}.
 
 make_limiter_json(Rate) ->
@@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
     Res = emqtt:publish(Client, Topic, Payload, Opts),
     ct:sleep(Time),
     Res.
+
+setup_slow_delivery() ->
+    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
+    LimiterCfg = make_limiter_cfg(Rate),
+    JsonCfg = make_limiter_json(<<"1/1s">>),
+    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
+    emqx_retainer:update_config(#{
+        <<"delivery_rate">> => <<"1/1s">>,
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1,
+                <<"batch_deliver_limiter">> => JsonCfg
+            }
+    }).
+
+restore_delivery() ->
+    emqx_limiter_server:del_bucket(emqx_retainer, internal),
+    emqx_retainer:update_config(#{
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1
+            }
+    }).
+
+qlc_processes() ->
+    lists:filter(
+        fun(Pid) ->
+            {current_function, {qlc, wait_for_request, 3}} =:=
+                erlang:process_info(Pid, current_function)
+        end,
+        erlang:processes()
+    ).
+
+qlc_process_count() ->
+    length(qlc_processes()).

+ 11 - 11
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -744,18 +744,18 @@ t_regex_replace(_) ->
     ?assertEqual(<<"a[cc]b[c]d">>, apply_func(regex_replace, [<<"accbcd">>, <<"c+">>, <<"[&]">>])).
 
 t_unescape(_) ->
-    ?assertEqual(<<"\n">>, emqx_rule_funcs:unescape(<<"\\n">>)),
-    ?assertEqual(<<"\t">>, emqx_rule_funcs:unescape(<<"\\t">>)),
-    ?assertEqual(<<"\r">>, emqx_rule_funcs:unescape(<<"\\r">>)),
-    ?assertEqual(<<"\b">>, emqx_rule_funcs:unescape(<<"\\b">>)),
-    ?assertEqual(<<"\f">>, emqx_rule_funcs:unescape(<<"\\f">>)),
-    ?assertEqual(<<"\v">>, emqx_rule_funcs:unescape(<<"\\v">>)),
-    ?assertEqual(<<"'">>, emqx_rule_funcs:unescape(<<"\\'">>)),
-    ?assertEqual(<<"\"">>, emqx_rule_funcs:unescape(<<"\\\"">>)),
-    ?assertEqual(<<"?">>, emqx_rule_funcs:unescape(<<"\\?">>)),
-    ?assertEqual(<<"\a">>, emqx_rule_funcs:unescape(<<"\\a">>)),
+    ?assertEqual(<<"\n">> = <<10>>, emqx_rule_funcs:unescape(<<"\\n">>)),
+    ?assertEqual(<<"\t">> = <<9>>, emqx_rule_funcs:unescape(<<"\\t">>)),
+    ?assertEqual(<<"\r">> = <<13>>, emqx_rule_funcs:unescape(<<"\\r">>)),
+    ?assertEqual(<<"\b">> = <<8>>, emqx_rule_funcs:unescape(<<"\\b">>)),
+    ?assertEqual(<<"\f">> = <<12>>, emqx_rule_funcs:unescape(<<"\\f">>)),
+    ?assertEqual(<<"\v">> = <<11>>, emqx_rule_funcs:unescape(<<"\\v">>)),
+    ?assertEqual(<<"'">> = <<39>>, emqx_rule_funcs:unescape(<<"\\'">>)),
+    ?assertEqual(<<"\"">> = <<34>>, emqx_rule_funcs:unescape(<<"\\\"">>)),
+    ?assertEqual(<<"?">> = <<63>>, emqx_rule_funcs:unescape(<<"\\?">>)),
+    ?assertEqual(<<7>>, emqx_rule_funcs:unescape(<<"\\a">>)),
     % Test escaping backslash itself
-    ?assertEqual(<<"\\">>, emqx_rule_funcs:unescape(<<"\\\\">>)),
+    ?assertEqual(<<"\\">> = <<92>>, emqx_rule_funcs:unescape(<<"\\\\">>)),
     % Test a string without any escape sequences
     ?assertEqual(<<"Hello, World!">>, emqx_rule_funcs:unescape(<<"Hello, World!">>)),
     % Test a string with escape sequences

+ 2 - 1
apps/emqx_utils/src/emqx_variform_bif.erl

@@ -289,7 +289,8 @@ unescape_string([$\\, $" | Rest], Acc) ->
 unescape_string([$\\, $? | Rest], Acc) ->
     unescape_string(Rest, [$\? | Acc]);
 unescape_string([$\\, $a | Rest], Acc) ->
-    unescape_string(Rest, [$\a | Acc]);
+    %% Terminal bell
+    unescape_string(Rest, [7 | Acc]);
 %% Start of HEX escape code
 unescape_string([$\\, $x | [$0 | _] = HexStringStart], Acc) ->
     unescape_handle_hex_string(HexStringStart, Acc);

+ 5 - 0
changes/ce/fix-12993.en.md

@@ -0,0 +1,5 @@
+Fix listener config update API when handling an unknown zone.
+
+Prior to this fix, when a listener config is updated with an unknown zone, for example `{"zone": "unknown"}`,
+the change would be accepted, causing all clients to crash when connect.
+After this fix, updating listener with an unknown zone name will get a "Bad request" response.

+ 1 - 0
changes/ce/fix-12996.en.md

@@ -0,0 +1 @@
+Fix process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak.

+ 1 - 0
changes/ee/fix-13001.en.md

@@ -0,0 +1 @@
+Fixed an issue where the syskeeper forwarder would never reconnect when the connection was lost.