Просмотр исходного кода

Merge pull request #13090 from thalesmg/fix-reconnect-action-disabled-connector-r57-20240522

fix(bridge v2 api): don't attempt to start disabled connector when starting action/source
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
3f23548cc5

+ 40 - 3
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -987,14 +987,45 @@ call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType,
             ?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
             ?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
     end.
     end.
 
 
-is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) ->
-    try emqx_bridge_v2:lookup(ConfRootKey, BridgeType, binary_to_existing_atom(BridgeName)) of
+is_enabled_bridge(ConfRootKey, ActionOrSourceType, BridgeName) ->
+    try
+        emqx_bridge_v2:lookup(ConfRootKey, ActionOrSourceType, binary_to_existing_atom(BridgeName))
+    of
         {ok, #{raw_config := ConfMap}} ->
         {ok, #{raw_config := ConfMap}} ->
-            maps:get(<<"enable">>, ConfMap, true);
+            maps:get(<<"enable">>, ConfMap, true) andalso
+                is_connector_enabled(
+                    ActionOrSourceType,
+                    maps:get(<<"connector">>, ConfMap)
+                );
         {error, not_found} ->
         {error, not_found} ->
             throw(not_found)
             throw(not_found)
     catch
     catch
         error:badarg ->
         error:badarg ->
+            %% catch non-existing atom,
+            %% none-existing atom means it is not available in config PT storage.
+            throw(not_found);
+        error:{badkey, _} ->
+            %% `connector' field not present.  Should never happen if action/source schema
+            %% is properly defined.
+            throw(not_found)
+    end.
+
+is_connector_enabled(ActionOrSourceType, ConnectorName0) ->
+    try
+        ConnectorType = emqx_bridge_v2:connector_type(ActionOrSourceType),
+        ConnectorName = to_existing_atom(ConnectorName0),
+        case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
+            undefined ->
+                throw(not_found);
+            Config = #{} ->
+                maps:get(enable, Config, true)
+        end
+    catch
+        throw:badarg ->
+            %% catch non-existing atom,
+            %% none-existing atom means it is not available in config PT storage.
+            throw(not_found);
+        throw:bad_atom ->
             %% catch non-existing atom,
             %% catch non-existing atom,
             %% none-existing atom means it is not available in config PT storage.
             %% none-existing atom means it is not available in config PT storage.
             throw(not_found)
             throw(not_found)
@@ -1407,3 +1438,9 @@ map_to_json(M0) ->
             M2 = maps:without([value, <<"value">>], M1),
             M2 = maps:without([value, <<"value">>], M1),
             emqx_utils_json:encode(M2)
             emqx_utils_json:encode(M2)
     end.
     end.
+
+to_existing_atom(X) ->
+    case emqx_utils:safe_to_existing_atom(X, utf8) of
+        {ok, A} -> A;
+        {error, _} -> throw(bad_atom)
+    end.

+ 56 - 5
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -109,6 +109,7 @@
 
 
 -define(SOURCE_TYPE_STR, "mqtt").
 -define(SOURCE_TYPE_STR, "mqtt").
 -define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
 -define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
+-define(SOURCE_CONNECTOR_TYPE, ?SOURCE_TYPE).
 
 
 -define(APPSPECS, [
 -define(APPSPECS, [
     emqx_conf,
     emqx_conf,
@@ -166,9 +167,19 @@ init_per_group(single = Group, Config) ->
     Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
     Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
     init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]);
     init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]);
 init_per_group(actions, Config) ->
 init_per_group(actions, Config) ->
-    [{bridge_kind, action} | Config];
+    [
+        {bridge_kind, action},
+        {connector_type, ?ACTION_CONNECTOR_TYPE},
+        {connector_name, ?ACTION_CONNECTOR_NAME}
+        | Config
+    ];
 init_per_group(sources, Config) ->
 init_per_group(sources, Config) ->
-    [{bridge_kind, source} | Config];
+    [
+        {bridge_kind, source},
+        {connector_type, ?SOURCE_CONNECTOR_TYPE},
+        {connector_name, ?SOURCE_CONNECTOR_NAME}
+        | Config
+    ];
 init_per_group(_Group, Config) ->
 init_per_group(_Group, Config) ->
     Config.
     Config.
 
 
@@ -202,14 +213,45 @@ end_per_group(single, Config) ->
 end_per_group(_Group, _Config) ->
 end_per_group(_Group, _Config) ->
     ok.
     ok.
 
 
-init_per_testcase(t_action_types, Config) ->
+init_per_testcase(TestCase, Config) when
+    TestCase =:= t_start_action_or_source_with_disabled_connector;
+    TestCase =:= t_action_types
+->
     case ?config(cluster_nodes, Config) of
     case ?config(cluster_nodes, Config) of
         undefined ->
         undefined ->
             init_mocks();
             init_mocks();
         Nodes ->
         Nodes ->
             [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
             [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
     end,
     end,
-    Config;
+    #{
+        connector_config := ConnectorConfig,
+        bridge_type := BridgeType,
+        bridge_name := BridgeName,
+        bridge_config := BridgeConfig
+    } =
+        case ?config(bridge_kind, Config) of
+            action ->
+                #{
+                    connector_config => ?ACTIONS_CONNECTOR,
+                    bridge_type => {action_type, ?ACTION_TYPE},
+                    bridge_name => {action_name, ?ACTION_CONNECTOR_NAME},
+                    bridge_config => {action_config, ?KAFKA_BRIDGE(?ACTION_CONNECTOR_NAME)}
+                };
+            source ->
+                #{
+                    connector_config => source_connector_create_config(#{}),
+                    bridge_type => {source_type, ?SOURCE_TYPE},
+                    bridge_name => {source_name, ?SOURCE_CONNECTOR_NAME},
+                    bridge_config => {source_config, source_config_base()}
+                }
+        end,
+    [
+        {connector_config, ConnectorConfig},
+        BridgeType,
+        BridgeName,
+        BridgeConfig
+        | Config
+    ];
 init_per_testcase(_TestCase, Config) ->
 init_per_testcase(_TestCase, Config) ->
     case ?config(cluster_nodes, Config) of
     case ?config(cluster_nodes, Config) of
         undefined ->
         undefined ->
@@ -434,7 +476,7 @@ source_connector_create_config(Overrides0) ->
         source_connector_config_base(),
         source_connector_config_base(),
         #{
         #{
             <<"enable">> => true,
             <<"enable">> => true,
-            <<"type">> => ?SOURCE_TYPE,
+            <<"type">> => ?SOURCE_CONNECTOR_TYPE,
             <<"name">> => ?SOURCE_CONNECTOR_NAME
             <<"name">> => ?SOURCE_CONNECTOR_NAME
         }
         }
     ),
     ),
@@ -1547,3 +1589,12 @@ t_older_version_nodes_in_cluster(Config) ->
     ),
     ),
 
 
     ok.
     ok.
+
+t_start_action_or_source_with_disabled_connector(matrix) ->
+    [
+        [single, actions],
+        [single, sources]
+    ];
+t_start_action_or_source_with_disabled_connector(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
+    ok.

+ 43 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -383,6 +383,25 @@ start_connector_api(ConnectorName, ConnectorType) ->
     ct:pal("connector update (http) result:\n  ~p", [Res]),
     ct:pal("connector update (http) result:\n  ~p", [Res]),
     Res.
     Res.
 
 
+enable_connector_api(ConnectorType, ConnectorName) ->
+    do_enable_disable_connector_api(ConnectorType, ConnectorName, enable).
+
+disable_connector_api(ConnectorType, ConnectorName) ->
+    do_enable_disable_connector_api(ConnectorType, ConnectorName, disable).
+
+do_enable_disable_connector_api(ConnectorType, ConnectorName, Op) ->
+    ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
+    {OpPath, OpStr} =
+        case Op of
+            enable -> {"true", "enable"};
+            disable -> {"false", "disable"}
+        end,
+    Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "enable", OpPath]),
+    ct:pal(OpStr ++ " connector ~s (http)", [ConnectorId]),
+    Res = request(put, Path, []),
+    ct:pal(OpStr ++ " connector ~s (http) result:\n  ~p", [ConnectorId, Res]),
+    Res.
+
 get_connector_api(ConnectorType, ConnectorName) ->
 get_connector_api(ConnectorType, ConnectorName) ->
     ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
     ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
     Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
     Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
@@ -956,3 +975,27 @@ t_on_get_status(Config, Opts) ->
             )
             )
     end,
     end,
     ok.
     ok.
+
+%% Verifies that attempting to start an action while its connnector is disabled does not
+%% start the connector.
+t_start_action_or_source_with_disabled_connector(Config) ->
+    #{
+        kind := Kind,
+        type := Type,
+        name := Name,
+        connector_type := ConnectorType,
+        connector_name := ConnectorName
+    } = get_common_values(Config),
+    ?check_trace(
+        begin
+            {ok, _} = create_bridge_api(Config),
+            {ok, {{_, 204, _}, _, _}} = disable_connector_api(ConnectorType, ConnectorName),
+            ?assertMatch(
+                {error, {{_, 400, _}, _, _}},
+                op_bridge_api(Kind, "start", Type, Name)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 4 - 0
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -228,3 +228,7 @@ t_sync_query(Config) ->
         postgres_bridge_connector_on_query_return
         postgres_bridge_connector_on_query_return
     ),
     ),
     ok.
     ok.
+
+t_start_action_or_source_with_disabled_connector(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
+    ok.

+ 1 - 0
changes/ce/fix-13090.en.md

@@ -0,0 +1 @@
+Attempting to start an action or source whose connector is disabled will no longer attempt to start the connector itself.