فهرست منبع

Merge pull request #11038 from thalesmg/fix-pulsar-get-status-r51

fix(pulsar_producer): do not return `disconnected` when checking status (r5.1)
zhongwencool 2 سال پیش
والد
کامیت
f57ff802e4

+ 2 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -84,6 +84,8 @@
 %% Toxiproxy API
 -export([
     with_failure/5,
+    enable_failure/4,
+    heal_failure/4,
     reset_proxy/2
 ]).
 

+ 8 - 5
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -145,7 +145,10 @@ on_stop(InstanceId, _State) ->
             ok
     end.
 
--spec on_get_status(resource_id(), state()) -> connected | disconnected.
+%% Note: since Pulsar client has its own replayq that is not managed by
+%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,
+%% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost.
+-spec on_get_status(resource_id(), state()) -> connected | connecting.
 on_get_status(_InstanceId, State = #{}) ->
     #{
         pulsar_client_id := ClientId,
@@ -157,15 +160,15 @@ on_get_status(_InstanceId, State = #{}) ->
                 true ->
                     get_producer_status(Producers);
                 false ->
-                    disconnected
+                    connecting
             catch
                 error:timeout ->
-                    disconnected;
+                    connecting;
                 exit:{noproc, _} ->
-                    disconnected
+                    connecting
             end;
         {error, _} ->
-            disconnected
+            connecting
     end;
 on_get_status(_InstanceId, _State) ->
     %% If a health check happens just after a concurrent request to

+ 89 - 8
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -45,6 +45,7 @@ only_once_tests() ->
         t_send_when_timeout,
         t_failure_to_start_producer,
         t_producer_process_crash,
+        t_resilience,
         t_resource_manager_crash_after_producers_started,
         t_resource_manager_crash_before_producers_started
     ].
@@ -733,13 +734,6 @@ t_start_stop(Config) ->
             ),
 
             %% Check that the bridge probe API doesn't leak atoms.
-            redbug:start(
-                [
-                    "emqx_resource_manager:health_check_interval -> return",
-                    "emqx_resource_manager:with_health_check -> return"
-                ],
-                [{msgs, 100}, {time, 30_000}]
-            ),
             ProbeRes0 = probe_bridge_api(
                 Config,
                 #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
@@ -795,7 +789,11 @@ t_on_get_status(Config) ->
     ),
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         ct:sleep(500),
-        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+        ?retry(
+            _Sleep = 1_000,
+            _Attempts = 20,
+            ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
+        )
     end),
     %% Check that it recovers itself.
     ?retry(
@@ -1154,3 +1152,86 @@ do_t_cluster(Config) ->
         []
     ),
     ok.
+
+t_resilience(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            {ok, _} = create_bridge(Config),
+            {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            ?retry(
+                _Sleep0 = 1_000,
+                _Attempts0 = 20,
+                ?assertEqual(
+                    {ok, connected},
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+
+            {ok, C} = emqtt:start_link(),
+            {ok, _} = emqtt:connect(C),
+            ProduceInterval = 100,
+            TestPid = self(),
+            StartSequentialProducer =
+                fun Go(SeqNo0) ->
+                    receive
+                        stop -> TestPid ! {done, SeqNo0}
+                    after 0 ->
+                        SeqNo = SeqNo0 + 1,
+                        emqtt:publish(C, ?RULE_TOPIC_BIN, integer_to_binary(SeqNo)),
+                        SeqNo rem 10 =:= 0 andalso (TestPid ! {sent, SeqNo}),
+                        timer:sleep(ProduceInterval),
+                        Go(SeqNo)
+                    end
+                end,
+            SequentialProducer = spawn_link(fun() -> StartSequentialProducer(0) end),
+            ct:sleep(2 * ProduceInterval),
+            {ok, _} = emqx_common_test_helpers:enable_failure(
+                down, ProxyName, ProxyHost, ProxyPort
+            ),
+            ?retry(
+                _Sleep1 = 1_000,
+                _Attempts1 = 20,
+                ?assertNotEqual(
+                    {ok, connected},
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            %% Note: we don't check for timeouts here because:
+            %%   a) If we do trigger auto reconnect, that means that the producers were
+            %%   killed and the `receive_consumed' below will fail.
+            %%   b) If there's a timeout, that's the correct path; we just need to give the
+            %%   resource manager a chance to do so.
+            ?block_until(#{?snk_kind := resource_auto_reconnect}, 5_000),
+            {ok, _} = emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
+            ?retry(
+                _Sleep2 = 1_000,
+                _Attempts2 = 20,
+                ?assertEqual(
+                    {ok, connected},
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            SequentialProducer ! stop,
+            NumProduced =
+                receive
+                    {done, SeqNo} -> SeqNo
+                after 1_000 -> ct:fail("producer didn't stop!")
+                end,
+            Consumed = lists:flatmap(
+                fun(_) -> receive_consumed(5_000) end, lists:seq(1, NumProduced)
+            ),
+            ?assertEqual(NumProduced, length(Consumed)),
+            ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)),
+            ?assertEqual(
+                ExpectedPayloads, lists:map(fun(#{<<"payload">> := P}) -> P end, Consumed)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ee/fix-11038.en.md

@@ -0,0 +1 @@
+Fixed a health check issue for Pulsar Producer that could lead to loss of messages when the connection to Pulsar's brokers were down.