فهرست منبع

Merge pull request #11985 from sstrigler/EMQX-11378-the-status-code-returned-by-the-interface-for-deleting-the-connector-is-not-appropriate

fix(emqx_connector): don't crash in API on delete with active channels
Stefan Strigler 2 سال پیش
والد
کامیت
5e313c7a3e
2فایلهای تغییر یافته به همراه130 افزوده شده و 15 حذف شده
  1. 1 1
      apps/emqx_connector/src/emqx_connector_api.erl
  2. 129 14
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl

+ 1 - 1
apps/emqx_connector/src/emqx_connector_api.erl

@@ -372,7 +372,7 @@ schema("/connectors_probe") ->
                 case emqx_connector:remove(ConnectorType, ConnectorName) of
                     ok ->
                         ?NO_CONTENT;
-                    {error, {active_channels, Channels}} ->
+                    {error, {post_config_update, _HandlerMod, {active_channels, Channels}}} ->
                         ?BAD_REQUEST(
                             {<<"Cannot delete connector while there are active channels defined for this connector">>,
                                 Channels}

+ 129 - 14
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -25,7 +25,7 @@
 -include_lib("snabbkaffe/include/test_macros.hrl").
 
 -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
--define(CONNECTOR(NAME, TYPE), #{
+-define(RESOURCE(NAME, TYPE), #{
     %<<"ssl">> => #{<<"enable">> => false},
     <<"type">> => TYPE,
     <<"name">> => NAME
@@ -52,12 +52,57 @@
 -define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)).
 -define(KAFKA_CONNECTOR(Name, BootstrapHosts),
     maps:merge(
-        ?CONNECTOR(Name, ?CONNECTOR_TYPE),
+        ?RESOURCE(Name, ?CONNECTOR_TYPE),
         ?KAFKA_CONNECTOR_BASE(BootstrapHosts)
     )
 ).
 -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
 
+-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
+-define(BRIDGE_TYPE_STR, "kafka_producer").
+-define(BRIDGE_TYPE, <<?BRIDGE_TYPE_STR>>).
+-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{
+    <<"enable">> => true,
+    <<"connector">> => Connector,
+    <<"kafka">> => #{
+        <<"buffer">> => #{
+            <<"memory_overload_protection">> => true,
+            <<"mode">> => <<"hybrid">>,
+            <<"per_partition_limit">> => <<"2GB">>,
+            <<"segment_bytes">> => <<"100MB">>
+        },
+        <<"compression">> => <<"no_compression">>,
+        <<"kafka_ext_headers">> => [
+            #{
+                <<"kafka_ext_header_key">> => <<"clientid">>,
+                <<"kafka_ext_header_value">> => <<"${clientid}">>
+            },
+            #{
+                <<"kafka_ext_header_key">> => <<"topic">>,
+                <<"kafka_ext_header_value">> => <<"${topic}">>
+            }
+        ],
+        <<"kafka_header_value_encode_mode">> => <<"none">>,
+        <<"kafka_headers">> => <<"${pub_props}">>,
+        <<"max_batch_bytes">> => <<"896KB">>,
+        <<"max_inflight">> => 10,
+        <<"message">> => #{
+            <<"key">> => <<"${.clientid}">>,
+            <<"timestamp">> => <<"${.timestamp}">>,
+            <<"value">> => <<"${.}">>
+        },
+        <<"partition_count_refresh_interval">> => <<"60s">>,
+        <<"partition_strategy">> => <<"random">>,
+        <<"required_acks">> => <<"all_isr">>,
+        <<"topic">> => <<"kafka-topic">>
+    },
+    <<"local_topic">> => <<"mqtt/local/topic">>,
+    <<"resource_opts">> => #{
+        <<"health_check_interval">> => <<"32s">>
+    }
+}).
+-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)).
+
 %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
 %% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{
 %%     <<"server">> => SERVER,
@@ -105,7 +150,8 @@
     emqx,
     emqx_auth,
     emqx_management,
-    {emqx_connector, "connectors {}"}
+    {emqx_connector, "connectors {}"},
+    {emqx_bridge, "actions {}"}
 ]).
 
 -define(APPSPEC_DASHBOARD,
@@ -128,7 +174,8 @@ all() ->
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     SingleOnlyTests = [
-        t_connectors_probe
+        t_connectors_probe,
+        t_fail_delete_with_action
     ],
     ClusterLaterJoinOnlyTCs = [
         % t_cluster_later_join_metrics
@@ -187,29 +234,38 @@ end_per_group(_, Config) ->
     emqx_cth_suite:stop(?config(group_apps, Config)),
     ok.
 
-init_per_testcase(_TestCase, Config) ->
+init_per_testcase(TestCase, Config) ->
     case ?config(cluster_nodes, Config) of
         undefined ->
-            init_mocks();
+            init_mocks(TestCase);
         Nodes ->
-            [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
+            [erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes]
     end,
     Config.
 
-end_per_testcase(_TestCase, Config) ->
+end_per_testcase(TestCase, Config) ->
+    Node = ?config(node, Config),
+    ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]),
     case ?config(cluster_nodes, Config) of
         undefined ->
             meck:unload();
         Nodes ->
-            [erpc:call(Node, meck, unload, []) || Node <- Nodes]
+            [erpc:call(N, meck, unload, []) || N <- Nodes]
     end,
-    Node = ?config(node, Config),
     ok = emqx_common_test_helpers:call_janitor(),
-    ok = erpc:call(Node, fun clear_resources/0),
     ok.
 
 -define(CONNECTOR_IMPL, dummy_connector_impl).
-init_mocks() ->
+init_mocks(t_fail_delete_with_action) ->
+    init_mocks(common),
+    meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
+    meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
+    meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
+    ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
+        emqx_bridge_v2:get_channels_for_connector(ResId)
+    end),
+    ok;
+init_mocks(_TestCase) ->
     meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
     meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
     meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
@@ -235,7 +291,15 @@ init_mocks() ->
     ),
     [?CONNECTOR_IMPL, emqx_connector_ee_schema].
 
-clear_resources() ->
+clear_resources(t_fail_delete_with_action) ->
+    lists:foreach(
+        fun(#{type := Type, name := Name}) ->
+            ok = emqx_bridge_v2:remove(Type, Name)
+        end,
+        emqx_bridge_v2:list()
+    ),
+    clear_resources(common);
+clear_resources(_) ->
     lists:foreach(
         fun(#{type := Type, name := Name}) ->
             ok = emqx_connector:remove(Type, Name)
@@ -646,7 +710,7 @@ t_connectors_probe(Config) ->
         request_json(
             post,
             uri(["connectors_probe"]),
-            ?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>),
+            ?RESOURCE(<<"broken_connector">>, <<"unknown_type">>),
             Config
         )
     ),
@@ -674,6 +738,57 @@ t_create_with_bad_name(Config) ->
     ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
     ok.
 
+t_fail_delete_with_action(Config) ->
+    Name = ?CONNECTOR_NAME,
+    ?assertMatch(
+        {ok, 201, #{
+            <<"type">> := ?CONNECTOR_TYPE,
+            <<"name">> := Name,
+            <<"enable">> := true,
+            <<"status">> := <<"connected">>,
+            <<"node_status">> := [_ | _]
+        }},
+        request_json(
+            post,
+            uri(["connectors"]),
+            ?KAFKA_CONNECTOR(Name),
+            Config
+        )
+    ),
+    ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
+    BridgeName = ?BRIDGE_NAME,
+    ?assertMatch(
+        {ok, 201, #{
+            <<"type">> := ?BRIDGE_TYPE,
+            <<"name">> := BridgeName,
+            <<"enable">> := true,
+            <<"status">> := <<"connected">>,
+            <<"node_status">> := [_ | _],
+            <<"connector">> := Name,
+            <<"kafka">> := #{},
+            <<"local_topic">> := _,
+            <<"resource_opts">> := _
+        }},
+        request_json(
+            post,
+            uri(["actions"]),
+            ?KAFKA_BRIDGE(?BRIDGE_NAME),
+            Config
+        )
+    ),
+
+    %% delete the connector
+    ?assertMatch(
+        {ok, 400, #{
+            <<"code">> := <<"BAD_REQUEST">>,
+            <<"message">> :=
+                <<"{<<\"Cannot delete connector while there are active channels",
+                    " defined for this connector\">>,", _/binary>>
+        }},
+        request_json(delete, uri(["connectors", ConnectorID]), Config)
+    ),
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],