Explorar o código

fix(kafka producer): kick off recovery from disk for fixed topics

Fixes https://emqx.atlassian.net/browse/EMQX-13242
Thales Macedo Garitezi hai 1 ano
pai
achega
8b342d577e

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.5.0"},
+    {vsn, "0.5.1"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 7 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -164,6 +164,13 @@ create_producers_for_bridge_v2(
     ),
     case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
         {ok, Producers} ->
+            case TopicType of
+                fixed ->
+                    ok = wolff:add_topic(Producers, MKafkaTopic),
+                    ok;
+                _ ->
+                    ok
+            end,
             ok = emqx_resource:allocate_resource(
                 ConnResId, {?kafka_producers, ActionResId}, Producers
             ),

+ 150 - 17
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -28,6 +28,7 @@
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 -define(TYPE, kafka_producer).
+%% Expands to the two comma-separated atoms `emqx, resource', so it is only meaningful
+%% inside a list, e.g. `[?TELEMETRY_PREFIX, inflight]'.
+-define(TELEMETRY_PREFIX, emqx, resource).
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -186,6 +187,10 @@ check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
     {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
     ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
 
+%% Returns the list of messages fetched from `Partition' of `KafkaTopic', starting at
+%% `Offset'. Crashes with `badmatch' unless `brod:fetch/4' returns `{ok, {_, Msgs}}'.
+fetch_since(Hosts, KafkaTopic, Partition, Offset) ->
+    {ok, {_HwOffset, Fetched}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
+    Fetched.
+
 ensure_kafka_topic(KafkaTopic) ->
     TopicConfigs = [
         #{
@@ -251,6 +256,11 @@ bridge_v2_config(ConnectorName, KafkaTopic) ->
         }
     }.
 
+%% Builds a connector config whose `bootstrap_hosts' is overridden with the toxiproxy
+%% endpoint derived from the CT `Config'.
+connector_config_toxiproxy(Config) ->
+    connector_config(#{<<"bootstrap_hosts">> => toxiproxy_bootstrap_hosts(Config)}).
+
 connector_config() ->
     connector_config(_Overrides = #{}).
 
@@ -289,6 +299,13 @@ kafka_hosts_string() ->
     KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
     KafkaHost ++ ":" ++ KafkaPort.
 
+%% Returns `<<"Host:Port">>' built from the `kafka_host' and `kafka_port' entries of the
+%% CT `Config'.
+toxiproxy_bootstrap_hosts(Config) ->
+    %% The nested match asserts that the configured host name starts with "toxiproxy".
+    Host = "toxiproxy" ++ _ = ?config(kafka_host, Config),
+    Port = ?config(kafka_port, Config),
+    iolist_to_binary([Host, $:, integer_to_binary(Port)]).
+
 create_connector(Name, Config) ->
     Res = emqx_connector:create(?TYPE, Name, Config),
     on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
@@ -330,6 +347,27 @@ assert_status_api(Line, Type, Name, Status) ->
 get_rule_metrics(RuleId) ->
     emqx_metrics_worker:get_metrics(rule_metrics, RuleId).
 
+%% Attaches a telemetry handler, registered under `HandlerId', for every event listed by
+%% `emqx_resource_metrics:events/0'. Each event is forwarded to the calling process as
+%% `{telemetry, #{name, measurements, metadata}}'. Detaching is registered with `on_exit/1'.
+tap_telemetry(HandlerId) ->
+    Subscriber = self(),
+    Forward = fun(EventName, Measurements, Metadata, _Config) ->
+        Event = #{
+            name => EventName,
+            measurements => Measurements,
+            metadata => Metadata
+        },
+        Subscriber ! {telemetry, Event},
+        ok
+    end,
+    telemetry:attach_many(HandlerId, emqx_resource_metrics:events(), Forward, unused_config),
+    on_exit(fun() -> telemetry:detach(HandlerId) end),
+    ok.
+
+%% Taps telemetry using the name of the enclosing function as the handler id.
+-define(tapTelemetry(), tap_telemetry(?FUNCTION_NAME)).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1028,23 +1066,7 @@ t_dynamic_topics(Config) ->
             ),
             ?assertStatusAPI(Type, ActionName, <<"connected">>),
 
-            HandlerId = ?FUNCTION_NAME,
-            TestPid = self(),
-            telemetry:attach_many(
-                HandlerId,
-                emqx_resource_metrics:events(),
-                fun(EventName, Measurements, Metadata, _Config) ->
-                    Data = #{
-                        name => EventName,
-                        measurements => Measurements,
-                        metadata => Metadata
-                    },
-                    TestPid ! {telemetry, Data},
-                    ok
-                end,
-                unused_config
-            ),
-            on_exit(fun() -> telemetry:detach(HandlerId) end),
+            ?tapTelemetry(),
 
             {ok, C} = emqtt:start_link(#{}),
             {ok, _} = emqtt:connect(C),
@@ -1094,3 +1116,114 @@ t_dynamic_topics(Config) ->
         []
     ),
     ok.
+
+%% Checks that messages buffered in disk mode by a fixed-topic producer while Kafka is
+%% unreachable are sent once the action is re-enabled and Kafka is back online.
+t_fixed_topic_recovers_in_disk_mode(Config) ->
+    ProxyName = ?config(proxy_name, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ActionName = <<"fixed_topic_disk_recover">>,
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"query_mode">> => <<"async">>,
+                    <<"buffer">> => #{
+                        <<"mode">> => <<"disk">>
+                    }
+                }
+            }
+        )
+    ),
+    Topic = emqx_utils_maps:deep_get([<<"parameters">>, <<"topic">>], ActionConfig),
+    Hosts = kpro:parse_endpoints(
+        binary_to_list(maps:get(<<"bootstrap_hosts">>, ConnectorConfig))
+    ),
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            RuleTopic = <<"fixed/disk/recover">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                Type,
+                RuleTopic,
+                [
+                    {bridge_name, ActionName}
+                ]
+            ),
+            %% Cut the connection to Kafka and enqueue some messages while it is down
+            ActionId = emqx_bridge_v2:id(Type, ActionName),
+            SentMessages =
+                emqx_common_test_helpers:with_failure(
+                    down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                        ct:sleep(100), % NOTE(review): presumably lets the failure take effect first - confirm
+                        SentMessages = [send_message(ActionName) || _ <- lists:seq(1, 5)], % local to this fun; returned below
+                        ?assertEqual(5, emqx_resource_metrics:matched_get(ActionId)),
+                        ?retry(
+                            _Sleep = 200,
+                            _Attempts = 20,
+                            ?assertEqual(5, emqx_resource_metrics:queuing_get(ActionId))
+                        ),
+                        ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
+                        %% Disable the action inside the failure window; presumably Kafka is reachable again once `with_failure' returns
+                        ?assertMatch(
+                            {204, _},
+                            emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName)
+                        ),
+                        SentMessages
+                    end
+                ),
+            %% Re-enable the action; the messages queued on disk should now be sent
+            ?tapTelemetry(),
+            ?assertMatch(
+                {204, _},
+                emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName)
+            ),
+            ?assertReceive(
+                {telemetry, #{
+                    name := [?TELEMETRY_PREFIX, inflight],
+                    measurements := #{gauge_set := 0}
+                }}
+            ),
+            %% Success metrics are not bumped because wolff does not store callbacks on
+            %% disk.
+            ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
+            ?retry(
+                _Sleep1 = 200,
+                _Attempts1 = 20,
+                ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId))
+            ),
+            [#{offset := Offset} | _] = SentMessages,
+            Partition = 0, % NOTE(review): assumes every message lands in partition 0 - confirm
+            Messages = fetch_since(Hosts, Topic, Partition, Offset),
+            ?assertMatch([_, _, _, _, _], Messages), % exactly the 5 messages sent above
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
apps/emqx_resource/src/emqx_resource.app.src

@@ -10,6 +10,7 @@
         gproc,
         jsx,
         ecpool,
+        replayq,
         emqx,
         telemetry
     ]},

+ 1 - 0
changes/ee/fix-14015.en.md

@@ -0,0 +1 @@
+Previously, if a Kafka/Confluent/Azure Event Hub Producer action with disk buffering had queued messages and was restarted, the queued messages were not sent until a new message arrived. This is now fixed for actions that have a fixed topic (i.e., the topic does not contain any placeholders).