瀏覽代碼

Merge pull request #13327 from thalesmg/fix-kprodu-delete-wolff-r57-20240624

fix(kafka and derivatives): add `alias` config to avoid clashes with same topic
Thales Macedo Garitezi 1 年之前
父節點
當前提交
9215b3710f

+ 38 - 1
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -288,6 +288,14 @@ request(Method, Path, Params) ->
             Error
     end.
 
+simplify_result(Res) ->
+    case Res of
+        {error, {{_, Status, _}, _, Body}} ->
+            {Status, Body};
+        {ok, {{_, Status, _}, _, Body}} ->
+            {Status, Body}
+    end.
+
 list_bridges_api() ->
     Params = [],
     Path = emqx_mgmt_api_test_util:api_path(["actions"]),
@@ -321,7 +329,7 @@ get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
     Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
     ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
     Res = request(get, Path, Params),
-    ct:pal("get bridge ~p result: ~p", [{BridgeKind, BridgeType, BridgeName}, Res]),
+    ct:pal("get bridge ~p result:\n  ~p", [{BridgeKind, BridgeType, BridgeName}, Res]),
     Res.
 
 create_bridge_api(Config) ->
@@ -349,6 +357,26 @@ create_kind_api(Config, Overrides) ->
     ct:pal("bridge create (~s, http) result:\n  ~p", [Kind, Res]),
     Res.
 
+enable_kind_api(Kind, ConnectorType, ConnectorName) ->
+    do_enable_disable_kind_api(Kind, ConnectorType, ConnectorName, enable).
+
+disable_kind_api(Kind, ConnectorType, ConnectorName) ->
+    do_enable_disable_kind_api(Kind, ConnectorType, ConnectorName, disable).
+
+do_enable_disable_kind_api(Kind, Type, Name, Op) ->
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    RootBin = api_path_root(Kind),
+    {OpPath, OpStr} =
+        case Op of
+            enable -> {"true", "enable"};
+            disable -> {"false", "disable"}
+        end,
+    Path = emqx_mgmt_api_test_util:api_path([RootBin, BridgeId, "enable", OpPath]),
+    ct:pal(OpStr ++ " ~s ~s (http)", [Kind, BridgeId]),
+    Res = request(put, Path, []),
+    ct:pal(OpStr ++ " ~s ~s (http) result:\n  ~p", [Kind, BridgeId, Res]),
+    simplify_result(Res).
+
 create_connector_api(Config) ->
     create_connector_api(Config, _Overrides = #{}).
 
@@ -453,6 +481,15 @@ update_bridge_api(Config, Overrides) ->
     ct:pal("update bridge (~s, http) result:\n  ~p", [Kind, Res]),
     Res.
 
+delete_kind_api(Kind, Type, Name) ->
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    PathRoot = api_path_root(Kind),
+    Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId]),
+    ct:pal("deleting bridge (~s, http)", [Kind]),
+    Res = request(delete, Path, _Params = []),
+    ct:pal("delete bridge (~s, http) result:\n  ~p", [Kind, Res]),
+    simplify_result(Res).
+
 op_bridge_api(Op, BridgeType, BridgeName) ->
     op_bridge_api(_Kind = action, Op, BridgeType, BridgeName).
 

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 31 - 14
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl

@@ -40,6 +40,8 @@ init_per_suite(Config) ->
                     emqx,
                     emqx_management,
                     emqx_resource,
+                    %% Just for test helpers
+                    brod,
                     emqx_bridge_azure_event_hub,
                     emqx_bridge,
                     emqx_rule_engine,
@@ -93,6 +95,9 @@ common_init_per_testcase(TestCase, Config) ->
             {connector_type, ?CONNECTOR_TYPE},
             {connector_name, Name},
             {connector_config, ConnectorConfig},
+            {action_type, ?BRIDGE_TYPE},
+            {action_name, Name},
+            {action_config, BridgeConfig},
             {bridge_type, ?BRIDGE_TYPE},
             {bridge_name, Name},
             {bridge_config, BridgeConfig}
@@ -100,18 +105,13 @@ common_init_per_testcase(TestCase, Config) ->
         ].
 
 end_per_testcase(_Testcase, Config) ->
-    case proplists:get_bool(skip_does_not_apply, Config) of
-        true ->
-            ok;
-        false ->
-            ProxyHost = ?config(proxy_host, Config),
-            ProxyPort = ?config(proxy_port, Config),
-            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-            emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
-            emqx_common_test_helpers:call_janitor(60_000),
-            ok = snabbkaffe:stop(),
-            ok
-    end.
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    emqx_common_test_helpers:call_janitor(60_000),
+    ok = snabbkaffe:stop(),
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Helper fns
@@ -172,7 +172,7 @@ bridge_config(Name, ConnectorId, KafkaTopic) ->
         #{
             <<"enable">> => true,
             <<"connector">> => ConnectorId,
-            <<"kafka">> =>
+            <<"parameters">> =>
                 #{
                     <<"buffer">> =>
                         #{
@@ -322,7 +322,7 @@ t_same_name_azure_kafka_bridges(Config) ->
     ),
 
     %% then create a Kafka bridge with same name and delete it after creation
-    ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
+    ConfigKafka0 = lists:keyreplace(action_type, 1, Config, {action_type, ?KAFKA_BRIDGE_TYPE}),
     ConfigKafka = lists:keyreplace(
         connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE}
     ),
@@ -374,3 +374,20 @@ t_http_api_get(Config) ->
         emqx_bridge_testlib:list_bridges_api()
     ),
     ok.
+
+t_multiple_actions_sharing_topic(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
+        [
+            {type, ?BRIDGE_TYPE_BIN},
+            {connector_name, ?config(connector_name, Config)},
+            {connector_config, ?config(connector_config, Config)},
+            {action_config, ActionConfig}
+        ]
+    ),
+    ok.

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 23 - 1
apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl

@@ -40,6 +40,8 @@ init_per_suite(Config) ->
                     emqx,
                     emqx_management,
                     emqx_resource,
+                    %% Just for test helpers
+                    brod,
                     emqx_bridge_confluent,
                     emqx_bridge,
                     emqx_rule_engine,
@@ -93,6 +95,9 @@ common_init_per_testcase(TestCase, Config) ->
             {connector_type, ?CONNECTOR_TYPE},
             {connector_name, Name},
             {connector_config, ConnectorConfig},
+            {action_type, ?ACTION_TYPE},
+            {action_name, Name},
+            {action_config, BridgeConfig},
             {bridge_type, ?ACTION_TYPE},
             {bridge_name, Name},
             {bridge_config, BridgeConfig}
@@ -306,7 +311,7 @@ t_same_name_confluent_kafka_bridges(Config) ->
     ),
 
     %% then create a Kafka bridge with same name and delete it after creation
-    ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
+    ConfigKafka0 = lists:keyreplace(action_type, 1, Config, {action_type, ?KAFKA_BRIDGE_TYPE}),
     ConfigKafka = lists:keyreplace(
         connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE}
     ),
@@ -378,3 +383,20 @@ t_list_v1_bridges(Config) ->
         []
     ),
     ok.
+
+t_multiple_actions_sharing_topic(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
+        [
+            {type, ?ACTION_TYPE_BIN},
+            {connector_name, ?config(connector_name, Config)},
+            {connector_config, ?config(connector_config, Config)},
+            {action_config, ActionConfig}
+        ]
+    ),
+    ok.

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.3.2"},
+    {vsn, "0.3.3"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 13 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -324,6 +324,12 @@ on_query(
         }),
         do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
     catch
+        error:{invalid_partition_count, Count, _Partitioner} ->
+            ?tp("kafka_producer_invalid_partition_count", #{
+                action_id => MessageTag,
+                query_mode => sync
+            }),
+            {error, {unrecoverable_error, {invalid_partition_count, Count}}};
         throw:{bad_kafka_header, _} = Error ->
             ?tp(
                 emqx_bridge_kafka_impl_producer_sync_query_failed,
@@ -384,8 +390,12 @@ on_query_async(
         }),
         do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
     catch
-        error:{invalid_partition_count, _Count, _Partitioner} ->
-            {error, invalid_partition_count};
+        error:{invalid_partition_count, Count, _Partitioner} ->
+            ?tp("kafka_producer_invalid_partition_count", #{
+                action_id => MessageTag,
+                query_mode => async
+            }),
+            {error, {unrecoverable_error, {invalid_partition_count, Count}}};
         throw:{bad_kafka_header, _} = Error ->
             ?tp(
                 emqx_bridge_kafka_impl_producer_async_query_failed,
@@ -690,6 +700,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
+        alias => BridgeV2Id,
         telemetry_meta_data => #{bridge_id => BridgeV2Id},
         max_partitions => MaxPartitions
     }.

+ 221 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -142,6 +142,9 @@ check_send_message_with_bridge(BridgeName) ->
     check_kafka_message_payload(Offset, Payload).
 
 send_message(ActionName) ->
+    send_message(?TYPE, ActionName).
+
+send_message(Type, ActionName) ->
     %% ######################################
     %% Create Kafka message
     %% ######################################
@@ -157,8 +160,8 @@ send_message(ActionName) ->
     %% ######################################
     %% Send message
     %% ######################################
-    emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}),
-    #{offset => Offset, payload => Payload}.
+    Res = emqx_bridge_v2:send_message(Type, ActionName, Msg, #{}),
+    #{offset => Offset, payload => Payload, result => Res}.
 
 resolve_kafka_offset() ->
     KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
@@ -285,6 +288,21 @@ action_api_spec_props_for_get() ->
         emqx_bridge_v2_testlib:actions_api_spec_schemas(),
     Props.
 
+assert_status_api(Line, Type, Name, Status) ->
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, #{
+                <<"status">> := Status,
+                <<"node_status">> := [#{<<"status">> := Status}]
+            }}},
+        emqx_bridge_v2_testlib:get_bridge_api(Type, Name),
+        #{line => Line, name => Name, expected_status => Status}
+    ).
+-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
+
+get_rule_metrics(RuleId) ->
+    emqx_metrics_worker:get_metrics(rule_metrics, RuleId).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -662,3 +680,204 @@ t_ancient_v1_config_migration_without_local_topic(Config) ->
         erpc:call(Node, fun emqx_bridge_v2:list/0)
     ),
     ok.
+
+%% Checks that, if Kafka raises `invalid_partition_count' error, we bump the corresponding
+%% failure rule action metric.
+t_invalid_partition_count_metrics(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionName = <<"a">>,
+            ActionParams = [
+                {action_config, ActionConfig1},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            RuleTopic = <<"t/a">>,
+            {ok, #{<<"id">> := RuleId}} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [
+                    {bridge_name, ActionName}
+                ]),
+
+            {ok, C} = emqtt:start_link([]),
+            {ok, _} = emqtt:connect(C),
+
+            %%--------------------------------------------
+            ?tp(notice, "sync", #{}),
+            %%--------------------------------------------
+            %% Artificially force sync query to be used; otherwise, it's only used when the
+            %% resource is blocked and retrying.
+            ok = meck:new(emqx_bridge_kafka_impl_producer, [passthrough, no_history]),
+            on_exit(fun() -> catch meck:unload() end),
+            ok = meck:expect(emqx_bridge_kafka_impl_producer, query_mode, 1, simple_sync),
+
+            %% Simulate `invalid_partition_count'
+            emqx_common_test_helpers:with_mock(
+                wolff,
+                send_sync,
+                fun(_Producers, _Msgs, _Timeout) ->
+                    error({invalid_partition_count, 0, partitioner})
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            emqtt:publish(C, RuleTopic, <<"hi">>, 2),
+                            #{
+                                ?snk_kind := "kafka_producer_invalid_partition_count",
+                                query_mode := sync
+                            }
+                        ),
+                    ?assertMatch(
+                        #{
+                            counters := #{
+                                'actions.total' := 1,
+                                'actions.failed' := 1
+                            }
+                        },
+                        get_rule_metrics(RuleId)
+                    ),
+                    ok
+                end
+            ),
+
+            %%--------------------------------------------
+            %% Same thing, but async call
+            ?tp(notice, "async", #{}),
+            %%--------------------------------------------
+            ok = meck:expect(
+                emqx_bridge_kafka_impl_producer,
+                query_mode,
+                fun(Conf) -> meck:passthrough([Conf]) end
+            ),
+            ok = emqx_bridge_v2:remove(actions, Type, ActionName),
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(
+                    ActionParams,
+                    #{<<"parameters">> => #{<<"query_mode">> => <<"async">>}}
+                ),
+
+            %% Simulate `invalid_partition_count'
+            emqx_common_test_helpers:with_mock(
+                wolff,
+                send,
+                fun(_Producers, _Msgs, _Timeout) ->
+                    error({invalid_partition_count, 0, partitioner})
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            emqtt:publish(C, RuleTopic, <<"hi">>, 2),
+                            #{?snk_kind := "rule_engine_applied_all_rules"}
+                        ),
+                    ?assertMatch(
+                        #{
+                            counters := #{
+                                'actions.total' := 2,
+                                'actions.failed' := 2
+                            }
+                        },
+                        get_rule_metrics(RuleId)
+                    ),
+                    ok
+                end
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{query_mode := sync}, #{query_mode := async} | _],
+                ?of_kind("kafka_producer_invalid_partition_count", Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+%% Tests that deleting/disabling an action that share the same Kafka topic with other
+%% actions do not disturb the latter.
+t_multiple_actions_sharing_topic(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionConfig = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ?check_trace(
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionName1 = <<"a1">>,
+            ActionParams1 = [
+                {action_config, ActionConfig},
+                {action_name, ActionName1},
+                {action_type, Type}
+            ],
+            ActionName2 = <<"a2">>,
+            ActionParams2 = [
+                {action_config, ActionConfig},
+                {action_name, ActionName2},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams1),
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams2),
+            RuleTopic = <<"t/a2">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
+
+            ?assertStatusAPI(Type, ActionName1, <<"connected">>),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+
+            %% Disabling a1 shouldn't disturb a2.
+            ?assertMatch(
+                {204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1)
+            ),
+
+            ?assertStatusAPI(Type, ActionName1, <<"disconnected">>),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+
+            ?assertMatch(#{result := ok}, send_message(Type, ActionName2)),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+
+            ?assertMatch(
+                {204, _},
+                emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1)
+            ),
+            ?assertStatusAPI(Type, ActionName1, <<"connected">>),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+            ?assertMatch(#{result := ok}, send_message(Type, ActionName2)),
+
+            %% Deleting also shouldn't disrupt a2.
+            ?assertMatch(
+                {204, _},
+                emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1)
+            ),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+            ?assertMatch(#{result := ok}, send_message(Type, ActionName2)),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind("kafka_producer_invalid_partition_count", Trace)),
+            ok
+        end
+    ),
+    ok.

+ 171 - 80
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl

@@ -127,23 +127,18 @@ init_per_testcase(TestCase, Config) ->
     common_init_per_testcase(TestCase, Config).
 
 end_per_testcase(_Testcase, Config) ->
-    case proplists:get_bool(skip_does_not_apply, Config) of
-        true ->
-            ok;
-        false ->
-            ok = emqx_config:delete_override_conf_files(),
-            ProxyHost = ?config(proxy_host, Config),
-            ProxyPort = ?config(proxy_port, Config),
-            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-            emqx_bridge_v2_testlib:delete_all_bridges(),
-            stop_consumer(Config),
-            %% in CI, apparently this needs more time since the
-            %% machines struggle with all the containers running...
-            emqx_common_test_helpers:call_janitor(60_000),
-            ok = snabbkaffe:stop(),
-            flush_consumed(),
-            ok
-    end.
+    ok = emqx_config:delete_override_conf_files(),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    emqx_bridge_v2_testlib:delete_all_bridges(),
+    stop_consumer(Config),
+    %% in CI, apparently this needs more time since the
+    %% machines struggle with all the containers running...
+    emqx_common_test_helpers:call_janitor(60_000),
+    ok = snabbkaffe:stop(),
+    flush_consumed(),
+    ok.
 
 common_init_per_testcase(TestCase, Config0) ->
     ct:timetrap(timer:seconds(60)),
@@ -160,6 +155,10 @@ common_init_per_testcase(TestCase, Config0) ->
     ok = snabbkaffe:start_trace(),
     Config.
 
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
 create_connector(Name, Config) ->
     Connector = pulsar_connector(Config),
     {ok, _} = emqx_connector:create(?TYPE, Name, Connector).
@@ -174,69 +173,6 @@ create_action(Name, Config) ->
 delete_action(Name) ->
     ok = emqx_bridge_v2:remove(actions, ?TYPE, Name).
 
-%%------------------------------------------------------------------------------
-%% Testcases
-%%------------------------------------------------------------------------------
-
-t_action_probe(Config) ->
-    Name = atom_to_binary(?FUNCTION_NAME),
-    Action = pulsar_action(Config),
-    {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
-    ?assertMatch({{_, 204, _}, _, _}, Res0),
-    ok.
-
-t_action(Config) ->
-    Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name, Config),
-    Actions = emqx_bridge_v2:list(actions),
-    Any = fun(#{name := BName}) -> BName =:= Name end,
-    ?assert(lists:any(Any, Actions), Actions),
-    Topic = <<"lkadfdaction">>,
-    {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
-        #{
-            sql => <<"select * from \"", Topic/binary, "\"">>,
-            id => atom_to_binary(?FUNCTION_NAME),
-            actions => [<<"pulsar:", Name/binary>>],
-            description => <<"bridge_v2 send msg to pulsar action">>
-        }
-    ),
-    on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
-    MQTTClientID = <<"pulsar_mqtt_clientid">>,
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]),
-    {ok, _} = emqtt:connect(C1),
-    ReqPayload = payload(),
-    ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
-    {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]),
-    [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000),
-    ?assertEqual(MQTTClientID, ClientID),
-    ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)),
-    ok = emqtt:disconnect(C1),
-    InstanceId = instance_id(actions, Name),
-    ?retry(
-        100,
-        20,
-        ?assertMatch(
-            #{
-                counters := #{
-                    dropped := 0,
-                    success := 1,
-                    matched := 1,
-                    failed := 0,
-                    received := 0
-                }
-            },
-            emqx_resource:get_metrics(InstanceId)
-        )
-    ),
-    ok = delete_action(Name),
-    ActionsAfterDelete = emqx_bridge_v2:list(actions),
-    ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
-    ok.
-
-%%------------------------------------------------------------------------------
-%% Helper fns
-%%------------------------------------------------------------------------------
-
 pulsar_connector(Config) ->
     PulsarHost = ?config(pulsar_host, Config),
     PulsarPort = ?config(pulsar_port, Config),
@@ -455,3 +391,158 @@ maybe_skip_without_ci() ->
         _ ->
             {skip, no_pulsar}
     end.
+
+assert_status_api(Line, Type, Name, Status) ->
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, #{
+                <<"status">> := Status,
+                <<"node_status">> := [#{<<"status">> := Status}]
+            }}},
+        emqx_bridge_v2_testlib:get_bridge_api(Type, Name),
+        #{line => Line, name => Name, expected_status => Status}
+    ).
+-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_action_probe(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    Action = pulsar_action(Config),
+    {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
+    ?assertMatch({{_, 204, _}, _, _}, Res0),
+    ok.
+
+t_action(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    create_action(Name, Config),
+    Actions = emqx_bridge_v2:list(actions),
+    Any = fun(#{name := BName}) -> BName =:= Name end,
+    ?assert(lists:any(Any, Actions), Actions),
+    Topic = <<"lkadfdaction">>,
+    {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
+        #{
+            sql => <<"select * from \"", Topic/binary, "\"">>,
+            id => atom_to_binary(?FUNCTION_NAME),
+            actions => [<<"pulsar:", Name/binary>>],
+            description => <<"bridge_v2 send msg to pulsar action">>
+        }
+    ),
+    on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
+    MQTTClientID = <<"pulsar_mqtt_clientid">>,
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]),
+    {ok, _} = emqtt:connect(C1),
+    ReqPayload = payload(),
+    ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
+    {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]),
+    [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000),
+    ?assertEqual(MQTTClientID, ClientID),
+    ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)),
+    ok = emqtt:disconnect(C1),
+    InstanceId = instance_id(actions, Name),
+    ?retry(
+        100,
+        20,
+        ?assertMatch(
+            #{
+                counters := #{
+                    dropped := 0,
+                    success := 1,
+                    matched := 1,
+                    failed := 0,
+                    received := 0
+                }
+            },
+            emqx_resource:get_metrics(InstanceId)
+        )
+    ),
+    ok = delete_action(Name),
+    ActionsAfterDelete = emqx_bridge_v2:list(actions),
+    ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
+    ok.
+
+%% Tests that deleting/disabling an action that share the same Pulsar topic with other
+%% actions do not disturb the latter.
+t_multiple_actions_sharing_topic(Config) ->
+    Type = ?TYPE,
+    ConnectorName = <<"c">>,
+    ConnectorConfig = pulsar_connector(Config),
+    ActionConfig = pulsar_action(Config),
+    ?check_trace(
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionName1 = <<"a1">>,
+            ActionParams1 = [
+                {action_config, ActionConfig},
+                {action_name, ActionName1},
+                {action_type, Type}
+            ],
+            ActionName2 = <<"a2">>,
+            ActionParams2 = [
+                {action_config, ActionConfig},
+                {action_name, ActionName2},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams1),
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams2),
+
+            ?assertStatusAPI(Type, ActionName1, <<"connected">>),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+
+            RuleTopic = <<"t/a2">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [
+                {bridge_name, ActionName2}
+            ]),
+            {ok, C} = emqtt:start_link([]),
+            {ok, _} = emqtt:connect(C),
+            SendMessage = fun() ->
+                ReqPayload = payload(),
+                ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
+                {ok, _} = emqtt:publish(C, RuleTopic, #{}, ReqPayloadBin, [
+                    {qos, 1}, {retain, false}
+                ]),
+                ok
+            end,
+
+            %% Disabling a1 shouldn't disturb a2.
+            ?assertMatch(
+                {204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1)
+            ),
+
+            ?assertStatusAPI(Type, ActionName1, <<"disconnected">>),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+
+            ?assertMatch(ok, SendMessage()),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+
+            ?assertMatch(
+                {204, _},
+                emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1)
+            ),
+            ?assertStatusAPI(Type, ActionName1, <<"connected">>),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+            ?assertMatch(ok, SendMessage()),
+
+            %% Deleting also shouldn't disrupt a2.
+            ?assertMatch(
+                {204, _},
+                emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1)
+            ),
+            ?assertStatusAPI(Type, ActionName2, <<"connected">>),
+            ?assertMatch(ok, SendMessage()),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.31"},
+    {vsn, "0.1.32"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 28 - 8
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1401,16 +1401,26 @@ apply_query_fun(
                 query_opts => QueryOpts,
                 min_query => minimize(Query)
             },
+            IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
             IsRetriable = false,
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
             case pre_query_channel_check(Request, Channels, QueryOpts) of
                 ok ->
-                    Result = Mod:on_query_async(
-                        extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
-                    ),
-                    {async_return, Result};
+                    case
+                        Mod:on_query_async(
+                            extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
+                        )
+                    of
+                        {error, _} = Error when IsSimpleQuery ->
+                            %% If this callback returns error, we assume it won't reply
+                            %% anything else and won't retry.
+                            maybe_reply_to(Error, QueryOpts),
+                            Error;
+                        Result ->
+                            {async_return, Result}
+                    end;
                 Error ->
                     maybe_reply_to(Error, QueryOpts)
             end
@@ -1480,16 +1490,26 @@ apply_query_fun(
             Requests = lists:map(
                 fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch
             ),
+            IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
             IsRetriable = false,
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
             case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
                 ok ->
-                    Result = Mod:on_batch_query_async(
-                        extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
-                    ),
-                    {async_return, Result};
+                    case
+                        Mod:on_batch_query_async(
+                            extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
+                        )
+                    of
+                        {error, _} = Error when IsSimpleQuery ->
+                            %% If this callback returns error, we assume it won't reply
+                            %% anything else and won't retry.
+                            maybe_reply_to(Error, QueryOpts),
+                            Error;
+                        Result ->
+                            {async_return, Result}
+                    end;
                 Error ->
                     maybe_reply_to(Error, QueryOpts)
             end

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -2,7 +2,7 @@
 {application, emqx_rule_engine, [
     {description, "EMQX Rule Engine"},
     % strict semver, bump manually!
-    {vsn, "5.1.2"},
+    {vsn, "5.1.3"},
     {modules, []},
     {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
     {applications, [

+ 2 - 0
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -20,6 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("emqx_resource/include/emqx_resource_errors.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
     apply_rule/3,
@@ -58,6 +59,7 @@
 %%------------------------------------------------------------------------------
 -spec apply_rules(list(rule()), columns(), envs()) -> ok.
 apply_rules([], _Columns, _Envs) ->
+    ?tp("rule_engine_applied_all_rules", #{}),
     ok;
 apply_rules([#{enable := false} | More], Columns, Envs) ->
     apply_rules(More, Columns, Envs);

+ 2 - 4
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -216,10 +216,8 @@ init_per_group(metrics_fail_simple, Config) ->
         (_) -> simple_async
     end),
     meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
-    meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) ->
-        Result = {error, {unrecoverable_error, mecked_failure}},
-        erlang:apply(ReplyFun, Args ++ [Result]),
-        Result
+    meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {_ReplyFun, _Args}, _) ->
+        {error, {unrecoverable_error, mecked_failure}}
     end),
     [{mecked, [?BRIDGE_IMPL]} | Config];
 init_per_group(_Groupname, Config) ->

+ 3 - 0
changes/ee/breaking-13327.en.md

@@ -0,0 +1,3 @@
+The directory path scheme for on-disk Kafka/Confluent/Azure Event Hub buffers has changed.  It now uses the Action name instead of the topic name.
+
+Upgrading to this version will invalidate (not use) old buffer files, and will require manual cleanup of the old directories.

+ 1 - 0
changes/ee/fix-13327.en.md

@@ -0,0 +1 @@
+Fixed an issue with Kafka, Confluent and Azure Event Hub bridges where different actions targeting the same topic could break one another when being deleted or disabled.

+ 1 - 1
mix.exs

@@ -211,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.10.5"},
+      {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},