瀏覽代碼

fix(kafka_producer): handle ancient v1 config when migrating to actions

Fixes https://emqx.atlassian.net/browse/EMQX-12064
Thales Macedo Garitezi 1 年之前
父節點
當前提交
bc5775a988

+ 11 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl

@@ -28,6 +28,17 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
     BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
     emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
 
+bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
+    %% Ancient v1 config, when `kafka' key was wrapped by `producer'
+    BridgeV1Conf1 = emqx_utils_maps:unindent(<<"producer">>, BridgeV1Conf0),
+    BridgeV1Conf =
+        case maps:take(<<"mqtt">>, BridgeV1Conf1) of
+            {#{<<"topic">> := Topic}, BridgeV1Conf2} when is_binary(Topic) ->
+                BridgeV1Conf2#{<<"local_topic">> => Topic};
+            _ ->
+                maps:remove(<<"mqtt">>, BridgeV1Conf1)
+        end,
+    bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName);
 bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
     Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
         BridgeV1Conf, ConnectorName, schema_module(), kafka_producer

+ 2 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

@@ -6,7 +6,8 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
--export([atoms/0]).
+-export([atoms/0, kafka_producer_old_hocon/1]).
+
 %% ensure atoms exist
 atoms() -> [myproducer, my_consumer].
 

+ 60 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -36,6 +36,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
     ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
     KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"),
@@ -79,9 +80,22 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(Apps),
     ok.
 
+init_per_testcase(t_ancient_v1_config_migration_with_local_topic = TestCase, Config) ->
+    Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => true}),
+    [{cluster, Cluster} | Config];
+init_per_testcase(t_ancient_v1_config_migration_without_local_topic = TestCase, Config) ->
+    Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => false}),
+    [{cluster, Cluster} | Config];
 init_per_testcase(_TestCase, Config) ->
     Config.
 
+end_per_testcase(TestCase, Config) when
+    TestCase =:= t_ancient_v1_config_migration_with_local_topic;
+    TestCase =:= t_ancient_v1_config_migration_without_local_topic
+->
+    Cluster = ?config(cluster, Config),
+    emqx_cth_cluster:stop(Cluster),
+    ok;
 end_per_testcase(_TestCase, Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
@@ -94,6 +108,32 @@ end_per_testcase(_TestCase, Config) ->
 %% Helper fns
 %%-------------------------------------------------------------------------------------
 
+basic_node_conf(WorkDir) ->
+    #{
+        <<"node">> => #{
+            <<"cookie">> => erlang:get_cookie(),
+            <<"data_dir">> => unicode:characters_to_binary(WorkDir)
+        }
+    }.
+
+setup_cluster_ancient_config(TestCase, Config, #{with_local_topic := WithLocalTopic}) ->
+    AncientIOList = emqx_bridge_kafka_tests:kafka_producer_old_hocon(WithLocalTopic),
+    {ok, AncientCfg0} = hocon:binary(AncientIOList),
+    WorkDir = emqx_cth_suite:work_dir(TestCase, Config),
+    BasicConf = basic_node_conf(WorkDir),
+    AncientCfg = emqx_utils_maps:deep_merge(BasicConf, AncientCfg0),
+    Apps = [
+        emqx,
+        emqx_conf,
+        emqx_connector,
+        emqx_bridge_kafka,
+        {emqx_bridge, #{schema_mod => emqx_enterprise_schema, config => AncientCfg}}
+    ],
+    emqx_cth_cluster:start(
+        [{kafka_producer_ancient_cfg1, #{apps => Apps}}],
+        #{work_dir => WorkDir}
+    ).
+
 check_send_message_with_bridge(BridgeName) ->
     #{offset := Offset, payload := Payload} = send_message(BridgeName),
     %% ######################################
@@ -578,3 +618,23 @@ t_create_connector_while_connection_is_down(Config) ->
         []
     ),
     ok.
+
+t_ancient_v1_config_migration_with_local_topic(Config) ->
+    %% Simply starting this test case successfully is enough, as the core of the test is
+    %% to be able to successfully start the node with the ancient config.
+    [Node] = ?config(cluster, Config),
+    ?assertMatch(
+        [#{type := <<"kafka_producer">>}],
+        erpc:call(Node, fun emqx_bridge_v2:list/0)
+    ),
+    ok.
+
+t_ancient_v1_config_migration_without_local_topic(Config) ->
+    %% Simply starting this test case successfully is enough, as the core of the test is
+    %% to be able to successfully start the node with the ancient config.
+    [Node] = ?config(cluster, Config),
+    ?assertMatch(
+        [#{type := <<"kafka_producer">>}],
+        erpc:call(Node, fun emqx_bridge_v2:list/0)
+    ),
+    ok.

+ 1 - 0
changes/ee/fix-12767.en.md

@@ -0,0 +1 @@
+Correctly migrate older Kafka Producer configurations (pre 5.0.2) to action and connector configurations.