Преглед изворни кода

fix(connector_api): avoid calling resource process to get channels

Fixes https://emqx.atlassian.net/browse/EMQX-11586
Thales Macedo Garitezi пре 2 година
родитељ
комит
a7fe5da8d6

+ 53 - 4
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 
 -import(emqx_mgmt_api_test_util, [uri/1]).
+-import(emqx_common_test_helpers, [on_exit/1]).
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
@@ -830,9 +831,9 @@ t_list_disabled_channels(Config) ->
         )
     ),
     ActionName = ?BRIDGE_NAME,
-    ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := true},
+    ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := false},
     ?assertMatch(
-        {ok, 201, #{<<"enable">> := true}},
+        {ok, 201, #{<<"enable">> := false}},
         request_json(
             post,
             uri(["actions"]),
@@ -841,14 +842,35 @@ t_list_disabled_channels(Config) ->
         )
     ),
     ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
+    ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
     ?assertMatch(
-        {ok, 200, #{<<"actions">> := [ActionName]}},
+        {ok, 200, #{
+            <<"status">> := <<"disconnected">>,
+            <<"status_reason">> := <<"Not installed">>,
+            <<"error">> := <<"Not installed">>
+        }},
         request_json(
             get,
-            uri(["connectors", ConnectorID]),
+            uri(["actions", ActionID]),
             Config
         )
     ),
+    %% This should be fast even if the connector resource process is unresponsive.
+    ConnectorResID = emqx_connector_resource:resource_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
+    suspend_connector_resource(ConnectorResID, Config),
+    try
+        ?assertMatch(
+            {ok, 200, #{<<"actions">> := [ActionName]}},
+            request_json(
+                get,
+                uri(["connectors", ConnectorID]),
+                Config
+            )
+        ),
+        ok
+    after
+        resume_connector_resource(ConnectorResID, Config)
+    end,
     ok.
 
 t_raw_config_response_defaults(Config) ->
@@ -987,3 +1009,30 @@ json(B) when is_binary(B) ->
             ct:pal("Failed to decode json: ~p~n~p", [Reason, B]),
             Error
     end.
+
+suspend_connector_resource(ConnectorResID, Config) ->
+    Node = ?config(node, Config),
+    Pid = erpc:call(Node, fun() ->
+        [Pid] = [
+            Pid
+         || {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup),
+            ID =:= ConnectorResID
+        ],
+        sys:suspend(Pid),
+        Pid
+    end),
+    on_exit(fun() -> erpc:call(Node, fun() -> catch sys:resume(Pid) end) end),
+    ok.
+
+resume_connector_resource(ConnectorResID, Config) ->
+    Node = ?config(node, Config),
+    erpc:call(Node, fun() ->
+        [Pid] = [
+            Pid
+         || {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup),
+            ID =:= ConnectorResID
+        ],
+        sys:resume(Pid),
+        ok
+    end),
+    ok.

+ 6 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -453,7 +453,12 @@ channel_health_check(ResId, ChannelId) ->
 
 -spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}.
 get_channels(ResId) ->
-    emqx_resource_manager:get_channels(ResId).
+    case emqx_resource_manager:lookup_cached(ResId) of
+        {error, not_found} ->
+            {error, not_found};
+        {ok, _Group, _ResourceData = #{mod := Mod}} ->
+            {ok, emqx_resource:call_get_channels(ResId, Mod)}
+    end.
 
 set_resource_status_connecting(ResId) ->
     emqx_resource_manager:set_resource_status_connecting(ResId).