瀏覽代碼

fix(resource): check status when (re)starting a resource

Fixes https://emqx.atlassian.net/browse/EMQX-10290
Thales Macedo Garitezi 2 年之前
父節點
當前提交
13746c2cdf

+ 1 - 2
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
 start(ResId, Opts) ->
     case safe_call(ResId, start, ?T_OPERATION) of
         ok ->
-            _ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
-            ok;
+            wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000));
         {error, _Reason} = Error ->
             Error
     end.

+ 3 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
 on_get_status(_InstId, #{health_check_error := true}) ->
     ?tp(connector_demo_health_check_error, #{}),
     disconnected;
+on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
+    ?tp(connector_demo_health_check_error, #{}),
+    {disconnected, State, Message};
 on_get_status(_InstId, #{pid := Pid}) ->
     timer:sleep(300),
     case is_process_alive(Pid) of

+ 26 - 2
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -40,6 +40,7 @@ groups() ->
 init_per_testcase(_, Config) ->
     ct:timetrap({seconds, 30}),
     emqx_connector_demo:set_callback_mode(always_sync),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_, _Config) ->
@@ -1145,10 +1146,33 @@ t_auto_retry(_) ->
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, create_error => true},
-        #{auto_retry_interval => 100}
+        #{health_check_interval => 100}
     ),
     ?assertEqual(ok, Res).
 
+%% Tests resources that have an asynchronous start: they are created
+%% without problems, but an issue is found later, when the health
+%% check is called.
+t_start_throw_error(_Config) ->
+    Message = "something went wrong",
+    ?assertMatch(
+        {{ok, _}, {ok, _}},
+        ?wait_async_action(
+            emqx_resource:create_local(
+                ?ID,
+                ?DEFAULT_RESOURCE_GROUP,
+                ?TEST_RESOURCE,
+                #{name => test_resource, health_check_error => {msg, Message}},
+                #{health_check_interval => 100}
+            ),
+            #{?snk_kind := connector_demo_health_check_error},
+            1_000
+        )
+    ),
+    %% Now, if we try to "reconnect" (restart) it, we should get the error
+    ?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})),
+    ok.
+
 t_health_check_disconnected(_) ->
     ?check_trace(
         begin
@@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) ->
                 ?DEFAULT_RESOURCE_GROUP,
                 ?TEST_RESOURCE,
                 #{name => test_resource, create_error => true},
-                #{auto_retry_interval => 100}
+                #{health_check_interval => 100}
             ),
             ?assertEqual(
                 {ok, disconnected},

+ 1 - 0
changes/ce/fix-11094.en.md

@@ -0,0 +1 @@
+Fixed an issue where connection errors in the Kafka Producer bridge were not reported when reconnecting the bridge.