Просмотр исходного кода

Merge pull request #14015 from thalesmg/20241017-m-kafka-recover-disk-static

fix(kafka producer): kick off recovery from disk for fixed topics, migrate old replayq directories
Thales Macedo Garitezi 1 год назад
Родитель
Commit
f71886b9f2

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_event_hub, [
     {description, "EMQX Enterprise Azure Event Hub Bridge"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -419,7 +419,7 @@ bridge_v2_overrides() ->
         parameters =>
             mk(ref(producer_kafka_opts), #{
                 required => true,
-                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+                validator => fun emqx_bridge_kafka:producer_parameters_validator/1
             }),
         ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
         type => mk(

+ 13 - 0
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl

@@ -410,3 +410,16 @@ t_dynamic_topics(Config) ->
             ]
         ),
     ok.
+
+t_disallow_disk_mode_for_dynamic_topic(Config) ->
+    ActionConfig = ?config(action_config, Config),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?BRIDGE_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
+    ok.

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_confluent, [
     {description, "EMQX Enterprise Confluent Connector and Action"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl

@@ -342,7 +342,7 @@ bridge_v2_overrides() ->
         parameters =>
             mk(ref(producer_kafka_opts), #{
                 required => true,
-                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+                validator => fun emqx_bridge_kafka:producer_parameters_validator/1
             }),
         ssl => mk(ref(ssl_client_opts), #{
             default => #{

+ 13 - 0
apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl

@@ -419,3 +419,16 @@ t_dynamic_topics(Config) ->
             ]
         ),
     ok.
+
+t_disallow_disk_mode_for_dynamic_topic(Config) ->
+    ActionConfig = ?config(action_config, Config),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?ACTION_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
+    ok.

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.5.0"},
+    {vsn, "0.5.1"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 27 - 10
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -3,19 +3,14 @@
 %%--------------------------------------------------------------------
 -module(emqx_bridge_kafka).
 
+-feature(maybe_expr, enable).
+
 -behaviour(emqx_connector_examples).
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
-%% allow atoms like scram_sha_256 and scram_sha_512
-%% i.e. the _256 part does not start with a-z
--elvis([
-    {elvis_style, atom_naming_convention, #{
-        regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$",
-        enclosed_atoms => ".*"
-    }}
-]).
+-elvis([{elvis_style, atom_naming_convention, disable}]).
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
 -export([
@@ -40,7 +35,9 @@
 -export([
     kafka_connector_config_fields/0,
     kafka_producer_converter/2,
-    producer_strategy_key_validator/1
+    producer_strategy_key_validator/1,
+    producer_buffer_mode_validator/1,
+    producer_parameters_validator/1
 ]).
 
 -define(CONNECTOR_TYPE, kafka_producer).
@@ -721,7 +718,7 @@ parameters_field(ActionOrBridgeV1) ->
             required => true,
             aliases => [Alias],
             desc => ?DESC(producer_kafka_opts),
-            validator => fun producer_strategy_key_validator/1
+            validator => fun producer_parameters_validator/1
         })}.
 
 %% -------------------------------------------------------------------------------------------------
@@ -781,6 +778,26 @@ consumer_topic_mapping_validator(TopicMapping0 = [_ | _]) ->
             {error, "Kafka topics must not be repeated in a bridge"}
     end.
 
+producer_parameters_validator(Conf) ->
+    maybe
+        ok ?= producer_strategy_key_validator(Conf),
+        ok ?= producer_buffer_mode_validator(Conf)
+    end.
+
+producer_buffer_mode_validator(#{buffer := _} = Conf) ->
+    producer_buffer_mode_validator(emqx_utils_maps:binary_key_map(Conf));
+producer_buffer_mode_validator(#{<<"buffer">> := #{<<"mode">> := disk}, <<"topic">> := Topic}) ->
+    Template = emqx_template:parse(Topic),
+    case emqx_template:placeholders(Template) of
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, <<"disk-mode buffering is disallowed when using dynamic topics">>}
+    end;
+producer_buffer_mode_validator(_) ->
+    %% `buffer' field is not required
+    ok.
+
 producer_strategy_key_validator(
     #{
         partition_strategy := _,

+ 80 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -32,6 +32,10 @@
     handle_telemetry_event/4
 ]).
 
+-ifdef(TEST).
+-export([replayq_dir/2]).
+-endif.
+
 -include_lib("emqx/include/logger.hrl").
 
 %% Allocatable resources
@@ -159,11 +163,30 @@ create_producers_for_bridge_v2(
     #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
     IsDryRun = emqx_resource:is_dry_run(ActionResId),
     ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
-    WolffProducerConfig = producers_config(
-        BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
-    ),
+    WolffProducerConfig =
+        #{replayq_dir := ReplayqDir} = producers_config(
+            BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
+        ),
+    maybe_migrate_old_replayq_dir(ReplayqDir, ActionResId, TopicType, MKafkaTopic),
     case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
         {ok, Producers} ->
+            case add_fixed_topic(TopicType, MKafkaTopic, Producers) of
+                ok ->
+                    ok;
+                {error, Reason} ->
+                    ?SLOG(error, #{
+                        msg => "kafka_producer_failed_to_add_fixed_topic",
+                        instance_id => ConnResId,
+                        kafka_client_id => ClientId,
+                        kafka_topic => MKafkaTopic,
+                        reason => Reason
+                    }),
+                    wolff:stop_and_delete_supervised_producers(Producers),
+                    throw(
+                        "Failed to start producers. Please check the logs for errors and check"
+                        " the configuration parameters."
+                    )
+            end,
             ok = emqx_resource:allocate_resource(
                 ConnResId, {?kafka_producers, ActionResId}, Producers
             ),
@@ -811,6 +834,60 @@ replayq_dir(BridgeType, BridgeName) ->
     ]),
     filename:join([emqx:data_dir(), "kafka", DirName]).
 
+%% new (wolff >= 2.0.0):
+%% Dir = filename:join([BaseDir, PathSegment, integer_to_list(Partition)]),
+%% old:
+%% Dir = filename:join([BaseDir, Topic, integer_to_list(Partition)]),
+maybe_migrate_old_replayq_dir(false, _ActionResId, _TopicType, _Topic) ->
+    ok;
+maybe_migrate_old_replayq_dir(ReplayqDir, ActionResId, fixed = _TopicType, Topic) ->
+    OldWolffDir = filename:join([ReplayqDir, Topic]),
+    maybe
+        true ?= is_old_replayq_dir(OldWolffDir),
+        NewWolffDir = filename:join([ReplayqDir, <<ActionResId/binary, $_, Topic/binary>>]),
+        ?tp(info, "migrating_old_wolff_dirs", #{
+            action_id => ActionResId,
+            from => OldWolffDir,
+            to => NewWolffDir
+        }),
+        ok = file:rename(OldWolffDir, NewWolffDir),
+        ok
+    else
+        _ -> ok
+    end;
+maybe_migrate_old_replayq_dir(_ReplayqDir, _ActionResId, _TopicType, _Topic) ->
+    ok.
+
+%% new (wolff >= 2.0.0):
+is_old_replayq_dir(OldWolffDir) ->
+    maybe
+        true ?= filelib:is_dir(OldWolffDir),
+        {ok, Files} ?= file:list_dir_all(OldWolffDir),
+        %% Each partition number has a sub-directory.
+        PartitionDirs = lists:filtermap(
+            fun(File) ->
+                PartitionDir = filename:join([OldWolffDir, File]),
+                IsDir = filelib:is_dir(PartitionDir),
+                case IsDir andalso string:to_integer(File) of
+                    {_Int, Rest} when Rest =:= ""; Rest =:= <<>> ->
+                        {true, PartitionDir};
+                    _ ->
+                        false
+                end
+            end,
+            Files
+        ),
+        [_ | _] ?= PartitionDirs,
+        true
+    else
+        _ -> false
+    end.
+
+add_fixed_topic(fixed, Topic, Producers) ->
+    wolff:add_topic(Producers, Topic);
+add_fixed_topic(_TopicType, _TopicTemplate, _Producers) ->
+    ok.
+
 %% To avoid losing queued data on disk, we must use the same directory as the old v1
 %% bridges, if any.  Among the Kafka-based bridges that exist since v1, only Kafka changed
 %% its type name.  Other bridges are either unchanged, or v2-only, and should use their v2

+ 276 - 17
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -28,6 +28,7 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(TYPE, kafka_producer).
+-define(TELEMETRY_PREFIX, emqx, resource).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -186,6 +187,10 @@ check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
     {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
     ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
 
+fetch_since(Hosts, KafkaTopic, Partition, Offset) ->
+    {ok, {_, Msgs}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
+    Msgs.
+
 ensure_kafka_topic(KafkaTopic) ->
     TopicConfigs = [
         #{
@@ -251,6 +256,11 @@ bridge_v2_config(ConnectorName, KafkaTopic) ->
         }
     }.
 
+connector_config_toxiproxy(Config) ->
+    BootstrapHosts = toxiproxy_bootstrap_hosts(Config),
+    Overrides = #{<<"bootstrap_hosts">> => BootstrapHosts},
+    connector_config(Overrides).
+
 connector_config() ->
     connector_config(_Overrides = #{}).
 
@@ -289,6 +299,13 @@ kafka_hosts_string() ->
     KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
     KafkaHost ++ ":" ++ KafkaPort.
 
+toxiproxy_bootstrap_hosts(Config) ->
+    Host = ?config(kafka_host, Config),
+    %% assert
+    "toxiproxy" ++ _ = Host,
+    Port = ?config(kafka_port, Config),
+    iolist_to_binary([Host, ":", integer_to_binary(Port)]).
+
 create_connector(Name, Config) ->
     Res = emqx_connector:create(?TYPE, Name, Config),
     on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
@@ -330,6 +347,27 @@ assert_status_api(Line, Type, Name, Status) ->
 get_rule_metrics(RuleId) ->
     emqx_metrics_worker:get_metrics(rule_metrics, RuleId).
 
+tap_telemetry(HandlerId) ->
+    TestPid = self(),
+    telemetry:attach_many(
+        HandlerId,
+        emqx_resource_metrics:events(),
+        fun(EventName, Measurements, Metadata, _Config) ->
+            Data = #{
+                name => EventName,
+                measurements => Measurements,
+                metadata => Metadata
+            },
+            TestPid ! {telemetry, Data},
+            ok
+        end,
+        unused_config
+    ),
+    on_exit(fun() -> telemetry:detach(HandlerId) end),
+    ok.
+
+-define(tapTelemetry(), tap_telemetry(?FUNCTION_NAME)).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1028,23 +1066,7 @@ t_dynamic_topics(Config) ->
             ),
             ?assertStatusAPI(Type, ActionName, <<"connected">>),
 
-            HandlerId = ?FUNCTION_NAME,
-            TestPid = self(),
-            telemetry:attach_many(
-                HandlerId,
-                emqx_resource_metrics:events(),
-                fun(EventName, Measurements, Metadata, _Config) ->
-                    Data = #{
-                        name => EventName,
-                        measurements => Measurements,
-                        metadata => Metadata
-                    },
-                    TestPid ! {telemetry, Data},
-                    ok
-                end,
-                unused_config
-            ),
-            on_exit(fun() -> telemetry:detach(HandlerId) end),
+            ?tapTelemetry(),
 
             {ok, C} = emqtt:start_link(#{}),
             {ok, _} = emqtt:connect(C),
@@ -1094,3 +1116,240 @@ t_dynamic_topics(Config) ->
         []
     ),
     ok.
+
+%% Checks that messages accumulated in disk mode for a fixed topic producer are kicked off
+%% when the action is later restarted and kafka is online.
+t_fixed_topic_recovers_in_disk_mode(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ActionName = <<"fixed_topic_disk_recover">>,
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"query_mode">> => <<"async">>,
+                    <<"buffer">> => #{
+                        <<"mode">> => <<"disk">>
+                    }
+                }
+            }
+        )
+    ),
+    Topic = emqx_utils_maps:deep_get([<<"parameters">>, <<"topic">>], ActionConfig),
+    Hosts = kpro:parse_endpoints(
+        binary_to_list(maps:get(<<"bootstrap_hosts">>, ConnectorConfig))
+    ),
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            RuleTopic = <<"fixed/disk/recover">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                Type,
+                RuleTopic,
+                [
+                    {bridge_name, ActionName}
+                ]
+            ),
+            %% Cut connection to kafka and enqueue some messages
+            ActionId = emqx_bridge_v2:id(Type, ActionName),
+            SentMessages =
+                emqx_common_test_helpers:with_failure(
+                    down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                        ct:sleep(100),
+                        SentMessages = [send_message(ActionName) || _ <- lists:seq(1, 5)],
+                        ?assertEqual(5, emqx_resource_metrics:matched_get(ActionId)),
+                        ?retry(
+                            _Sleep = 200,
+                            _Attempts = 20,
+                            ?assertEqual(5, emqx_resource_metrics:queuing_get(ActionId))
+                        ),
+                        ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
+                        %% Turn off action, restore kafka connection
+                        ?assertMatch(
+                            {204, _},
+                            emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName)
+                        ),
+                        SentMessages
+                    end
+                ),
+            %% Restart action; should've shot enqueued messages
+            ?tapTelemetry(),
+            ?assertMatch(
+                {204, _},
+                emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName)
+            ),
+            ?assertReceive(
+                {telemetry, #{
+                    name := [?TELEMETRY_PREFIX, inflight],
+                    measurements := #{gauge_set := 0}
+                }}
+            ),
+            %% Success metrics are not bumped because wolff does not store callbacks in
+            %% disk.
+            ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
+            ?retry(
+                _Sleep1 = 200,
+                _Attempts1 = 20,
+                ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId))
+            ),
+            [#{offset := Offset} | _] = SentMessages,
+            Partition = 0,
+            Messages = fetch_since(Hosts, Topic, Partition, Offset),
+            ?assertMatch([_, _, _, _, _], Messages),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+%% Verifies that we disallow disk mode when the kafka topic is dynamic.
+t_disallow_disk_mode_for_dynamic_topic(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ActionName = <<"dynamic_topic_disk">>,
+    ActionConfig = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ?assertMatch(
+        #{},
+        emqx_bridge_v2_testlib:parse_and_check(
+            action,
+            Type,
+            ActionName,
+            emqx_utils_maps:deep_merge(
+                ActionConfig,
+                #{
+                    <<"parameters">> => #{
+                        <<"topic">> => <<"dynamic-${.payload.n}">>,
+                        <<"buffer">> => #{
+                            <<"mode">> => <<"hybrid">>
+                        }
+                    }
+                }
+            )
+        )
+    ),
+    ?assertThrow(
+        {_SchemaMod, [
+            #{
+                reason := <<"disk-mode buffering is disallowed when using dynamic topics">>,
+                kind := validation_error
+            }
+        ]},
+        emqx_bridge_v2_testlib:parse_and_check(
+            action,
+            Type,
+            ActionName,
+            emqx_utils_maps:deep_merge(
+                ActionConfig,
+                #{
+                    <<"parameters">> => #{
+                        <<"topic">> => <<"dynamic-${.payload.n}">>,
+                        <<"buffer">> => #{
+                            <<"mode">> => <<"disk">>
+                        }
+                    }
+                }
+            )
+        )
+    ),
+    ok.
+
+%% In wolff < 2.0.0, replayq filepath was computed differently than current versions,
+%% after dynamic topics were introduced.  This verifies that we migrate older directories
+%% if we detect them when starting the producer.
+t_migrate_old_replayq_dir(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ActionName = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"buffer">> => #{
+                        <<"mode">> => <<"disk">>
+                    }
+                }
+            }
+        )
+    ),
+    #{<<"parameters">> := #{<<"topic">> := Topic}} = ActionConfig,
+    ReplayqDir = emqx_bridge_kafka_impl_producer:replayq_dir(Type, ActionName),
+    OldWolffDir = filename:join([ReplayqDir, Topic]),
+    %% simulate partition sub-directories
+    NumPartitions = 3,
+    OldDirs = lists:map(
+        fun(N) ->
+            filename:join([OldWolffDir, integer_to_binary(N)])
+        end,
+        lists:seq(1, NumPartitions)
+    ),
+    lists:foreach(
+        fun(D) ->
+            ok = filelib:ensure_path(D)
+        end,
+        OldDirs
+    ),
+    ConnectorParams = [
+        {connector_config, ConnectorConfig},
+        {connector_name, ConnectorName},
+        {connector_type, Type}
+    ],
+    ActionParams = [
+        {action_config, ActionConfig},
+        {action_name, ActionName},
+        {action_type, Type}
+    ],
+    {ok, {{_, 201, _}, _, #{}}} =
+        emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+    ?check_trace(
+        begin
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            %% Old directories have been moved
+            lists:foreach(
+                fun(D) ->
+                    ?assertNot(filelib:is_dir(D))
+                end,
+                OldDirs
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([#{from := OldWolffDir}], ?of_kind("migrating_old_wolff_dirs", Trace)),
+            ok
+        end
+    ),
+    ok.

+ 1 - 0
apps/emqx_resource/src/emqx_resource.app.src

@@ -10,6 +10,7 @@
         gproc,
         jsx,
         ecpool,
+        replayq,
         emqx,
         telemetry
     ]},

+ 1 - 0
changes/ee/breaking-14015.en.md

@@ -0,0 +1 @@
+Kafka/Confluent/Azure Event Hub Producers with a dynamic topic (i.e., a topic that contains placeholders) no longer support disk buffering.  Only memory and hybrid modes are now supported.

+ 3 - 0
changes/ee/fix-14015.en.md

@@ -0,0 +1,3 @@
+Previously, if a Kafka/Confluent/Azure Event Hub Producer action with disk buffering had queued messages and was restarted, the queued messages were not sent until a new message arrived.  For actions that have a fixed topic (i.e., the topic does not contain any placeholders), this was fixed.
+
+Prior to EMQX 5.7.2, when using a Kafka/Confluent/Azure Event Hub Producer action with disk buffering, its files were stored in a different directory structure.  Now, when starting such an action, if an old disk buffer directory is detected, it'll be renamed to the newer structure to avoid losing data.

+ 1 - 1
mix.exs

@@ -205,7 +205,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:cowboy), do: {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}
   def common_dep(:jsone), do: {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true}
   def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.10", override: true}
-  def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true}
+  def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.9", override: true}
   def common_dep(:jsx), do: {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}
   # in conflict by emqtt and hocon
   def common_dep(:getopt), do: {:getopt, "1.0.2", override: true}

+ 1 - 1
rebar.config

@@ -88,7 +88,7 @@
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.4"}}},
     {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.10"}}},
-    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}},
+    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.9"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}},
     {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.1"}}},