Pārlūkot izejas kodu

feat: migrate old replayq directories to new structure

Fixes https://emqx.atlassian.net/browse/EMQX-13248
Thales Macedo Garitezi 1 gadu atpakaļ
vecāks
revīzija
25aa0ae3fc

+ 49 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -32,6 +32,10 @@
     handle_telemetry_event/4
 ]).
 
+-ifdef(TEST).
+-export([replayq_dir/2]).
+-endif.
+
 -include_lib("emqx/include/logger.hrl").
 
 %% Allocatable resources
@@ -159,9 +163,11 @@ create_producers_for_bridge_v2(
     #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
     IsDryRun = emqx_resource:is_dry_run(ActionResId),
     ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
-    WolffProducerConfig = producers_config(
-        BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
-    ),
+    WolffProducerConfig =
+        #{replayq_dir := ReplayqDir} = producers_config(
+            BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
+        ),
+    maybe_migrate_old_replayq_dir(ReplayqDir, ActionResId, TopicType, MKafkaTopic),
     case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
         {ok, Producers} ->
             case TopicType of
@@ -818,6 +824,46 @@ replayq_dir(BridgeType, BridgeName) ->
     ]),
     filename:join([emqx:data_dir(), "kafka", DirName]).
 
+%% Renames a disk-buffer directory written with the old wolff layout to the layout used
+%% by wolff >= 2.0.0, to avoid losing data that is still queued on disk.
+%%
+%% new (wolff >= 2.0.0):
+%% Dir = filename:join([BaseDir, PathSegment, integer_to_list(Partition)]),
+%% old:
+%% Dir = filename:join([BaseDir, Topic, integer_to_list(Partition)]),
+%%
+%% Only `fixed' topics are handled; every other input is a no-op.  This is a best-effort
+%% step and always returns `ok': a failed rename is reported through a warning event
+%% instead of crashing, so that a leftover or conflicting directory cannot prevent the
+%% producer from starting.
+maybe_migrate_old_replayq_dir(false, _ActionResId, _TopicType, _Topic) ->
+    %% `replayq_dir' is `false': there is no directory to migrate.
+    ok;
+maybe_migrate_old_replayq_dir(ReplayqDir, ActionResId, fixed = _TopicType, Topic) ->
+    OldWolffDir = filename:join([ReplayqDir, Topic]),
+    maybe
+        true ?= filelib:is_dir(OldWolffDir),
+        {ok, Files} ?= file:list_dir_all(OldWolffDir),
+        %% Each partition number has a sub-directory.
+        PartitionDirs = lists:filtermap(
+            fun(File) ->
+                PartitionDir = filename:join([OldWolffDir, File]),
+                IsDir = filelib:is_dir(PartitionDir),
+                %% Keep only directories whose whole name parses as an integer; the
+                %% remainder may be a string or a binary depending on how the file
+                %% name was returned.
+                case IsDir andalso string:to_integer(File) of
+                    {_Int, Rest} when Rest =:= ""; Rest =:= <<>> ->
+                        {true, PartitionDir};
+                    _ ->
+                        false
+                end
+            end,
+            Files
+        ),
+        %% Without at least one partition sub-directory this is not an old wolff dir.
+        [_ | _] ?= PartitionDirs,
+        NewWolffDir = filename:join([ReplayqDir, <<ActionResId/binary, $_, Topic/binary>>]),
+        ?tp(info, "migrating_old_wolff_dirs", #{
+            action_id => ActionResId,
+            from => OldWolffDir,
+            to => NewWolffDir
+        }),
+        %% The rename can fail, e.g. when the destination already exists and is not
+        %% empty.  Report it and carry on rather than aborting producer creation.
+        case file:rename(OldWolffDir, NewWolffDir) of
+            ok ->
+                ok;
+            {error, Reason} ->
+                ?tp(warning, "failed_to_migrate_old_wolff_dirs", #{
+                    action_id => ActionResId,
+                    from => OldWolffDir,
+                    to => NewWolffDir,
+                    reason => Reason
+                }),
+                ok
+        end
+    else
+        _ -> ok
+    end;
+maybe_migrate_old_replayq_dir(_ReplayqDir, _ActionResId, _TopicType, _Topic) ->
+    ok.
+
 %% To avoid losing queued data on disk, we must use the same directory as the old v1
 %% bridges, if any.  Among the Kafka-based bridges that exist since v1, only Kafka changed
 %% its type name.  Other bridges are either unchanged, or v2-only, and should use their v2

+ 75 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -1278,3 +1278,78 @@ t_disallow_disk_mode_for_dynamic_topic(Config) ->
         )
     ),
     ok.
+
+%% wolff < 2.0.0 computed the replayq file path differently from current versions; the
+%% layout changed when dynamic topics were introduced.  This verifies that, when the
+%% producer starts and an old-layout directory is detected, the directory is migrated:
+%% the old partition directories disappear, they show up under the new location, and
+%% the migration trace event is emitted.
+t_migrate_old_replayq_dir(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ActionName = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    %% Force disk buffering so that a replayq directory is in use.
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"buffer">> => #{
+                        <<"mode">> => <<"disk">>
+                    }
+                }
+            }
+        )
+    ),
+    #{<<"parameters">> := #{<<"topic">> := Topic}} = ActionConfig,
+    ReplayqDir = emqx_bridge_kafka_impl_producer:replayq_dir(Type, ActionName),
+    OldWolffDir = filename:join([ReplayqDir, Topic]),
+    %% simulate partition sub-directories
+    NumPartitions = 3,
+    OldDirs = lists:map(
+        fun(N) ->
+            filename:join([OldWolffDir, integer_to_binary(N)])
+        end,
+        lists:seq(1, NumPartitions)
+    ),
+    lists:foreach(
+        fun(D) ->
+            ok = filelib:ensure_path(D)
+        end,
+        OldDirs
+    ),
+    ConnectorParams = [
+        {connector_config, ConnectorConfig},
+        {connector_name, ConnectorName},
+        {connector_type, Type}
+    ],
+    ActionParams = [
+        {action_config, ActionConfig},
+        {action_name, ActionName},
+        {action_type, Type}
+    ],
+    {ok, {{_, 201, _}, _, #{}}} =
+        emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+    ?check_trace(
+        begin
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            %% Old directories have been moved
+            lists:foreach(
+                fun(D) ->
+                    ?assertNot(filelib:is_dir(D))
+                end,
+                OldDirs
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([#{from := OldWolffDir}], ?of_kind("migrating_old_wolff_dirs", Trace)),
+            %% The partition directories must have been moved, not just removed: each
+            %% one is expected under the destination reported by the migration event.
+            [#{to := NewWolffDir}] = ?of_kind("migrating_old_wolff_dirs", Trace),
+            lists:foreach(
+                fun(D) ->
+                    MovedDir = filename:join([NewWolffDir, filename:basename(D)]),
+                    ?assert(filelib:is_dir(MovedDir))
+                end,
+                OldDirs
+            ),
+            ok
+        end
+    ),
+    ok.

+ 2 - 0
changes/ee/fix-14015.en.md

@@ -1 +1,3 @@
 Previously, if a Kafka/Confluent/Azure Event Hub Producer action with disk buffering had queued messages and was restarted, the queued messages were not sent until a new message arrived.  For actions that have a fixed topic (i.e., the topic does not contain any placeholders), this was fixed.
+
+Prior to EMQX 5.7.2, when using a Kafka/Confluent/Azure Event Hub Producer action with disk buffering, its files were stored in a different directory structure.  Now, when starting such an action, if an old disk buffer directory is detected, it'll be renamed to the newer structure to avoid losing data.