Sfoglia il codice sorgente

Merge pull request #9579 from thalesmg/bugfix-kafka-producer-stop-client-ee50

fix(kafka_producer): cleanup client after failing to start producers
Thales Macedo Garitezi 3 anni fa
parent
commit
a78ecc4bb6

+ 13 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -86,6 +86,19 @@ on_start(InstId, Config) ->
                 kafka_topic => KafkaTopic,
                 reason => Reason2
             }),
+            %% Need to stop the already running client; otherwise, the
+            %% next `on_start' call will try to ensure the client
+            %% exists and it will be already present and using the old
+            %% config.  This is specially bad if the original crash
+            %% was due to misconfiguration and we are trying to fix
+            %% it...
+            _ = with_log_at_error(
+                fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
+                #{
+                    msg => "failed_to_delete_kafka_client",
+                    client_id => ClientId
+                }
+            ),
             throw(failed_to_start_kafka_producer)
     end.
 

+ 61 - 0
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -337,6 +337,67 @@ kafka_bridge_rest_api_helper(Config) ->
     false = MyKafkaBridgeExists(),
     ok.
 
+%%------------------------------------------------------------------------------
+%% Other tests
+%%------------------------------------------------------------------------------
+
+%% Need to stop the already running client; otherwise, the
+%% next `on_start' call will try to ensure the client
+%% exists and it will.  This is specially bad if the
+%% original crash was due to misconfiguration and we are
+%% trying to fix it...
+t_failed_creation_then_fix(_Config) ->
+    HostsString = kafka_hosts_string_sasl(),
+    ValidAuthSettings = valid_sasl_plain_settings(),
+    WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"},
+    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+    ResourceId = emqx_bridge_resource:resource_id("kafka", Name),
+    BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
+    KafkaTopic = "test-topic-one-partition",
+    WrongConf = config(#{
+        "authentication" => WrongAuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => ResourceId,
+        "ssl" => #{}
+    }),
+    ValidConf = config(#{
+        "authentication" => ValidAuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => ResourceId,
+        "ssl" => #{}
+    }),
+    %% creates, but fails to start producers
+    %% FIXME: change to kafka_producer after config refactoring
+    ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), WrongConf, #{})),
+    ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConf)),
+    %% before throwing, it should cleanup the client process.
+    ?assertEqual([], supervisor:which_children(wolff_client_sup)),
+    %% FIXME: change to kafka_producer after config refactoring
+    %% must succeed with correct config
+    ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), ValidConf, #{})),
+    {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConf),
+    %% To make sure we get unique value
+    timer:sleep(1),
+    Time = erlang:monotonic_time(),
+    BinTime = integer_to_binary(Time),
+    Msg = #{
+        clientid => BinTime,
+        payload => <<"payload">>,
+        timestamp => Time
+    },
+    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+    ct:pal("base offset before testing ~p", [Offset]),
+    ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
+    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
+    %% TODO: refactor those into init/end per testcase
+    ok = ?PRODUCER:on_stop(ResourceId, State),
+    ok = emqx_bridge_resource:remove(BridgeId),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helper functions
 %%------------------------------------------------------------------------------