Преглед изворни кода

fix(buffer worker, kafka): send reply when async call fails immediately

Fixes https://emqx.atlassian.net/browse/EMQX-12585
Thales Macedo Garitezi пре 1 година
родитељ
комит
ed5e6599d9

+ 2 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -329,7 +329,7 @@ on_query(
                 action_id => MessageTag,
                 query_mode => sync
             }),
-            {error, invalid_partition_count};
+            {error, {unrecoverable_error, invalid_partition_count}};
         throw:{bad_kafka_header, _} = Error ->
             ?tp(
                 emqx_bridge_kafka_impl_producer_sync_query_failed,
@@ -395,7 +395,7 @@ on_query_async(
                 action_id => MessageTag,
                 query_mode => async
             }),
-            {error, invalid_partition_count};
+            {error, {unrecoverable_error, invalid_partition_count}};
         throw:{bad_kafka_header, _} = Error ->
             ?tp(
                 emqx_bridge_kafka_impl_producer_async_query_failed,

+ 130 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -300,6 +300,9 @@ assert_status_api(Line, Type, Name, Status) ->
     ).
 -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
 
+get_rule_metrics(RuleId) ->
+    emqx_metrics_worker:get_metrics(rule_metrics, RuleId).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -678,6 +681,133 @@ t_ancient_v1_config_migration_without_local_topic(Config) ->
     ),
     ok.
 
+%% Checks that, when the Kafka producer raises an `invalid_partition_count' error, the
+%% rule's `actions.failed' metric is bumped, for both the sync and the async query paths.
+t_invalid_partition_count_metrics(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionName = <<"a">>,
+            ActionParams = [
+                {action_config, ActionConfig1},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            RuleTopic = <<"t/a">>,
+            {ok, #{<<"id">> := RuleId}} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [
+                    {bridge_name, ActionName}
+                ]),
+
+            {ok, C} = emqtt:start_link([]),
+            {ok, _} = emqtt:connect(C),
+
+            %%--------------------------------------------
+            ?tp(notice, "sync", #{}),
+            %%--------------------------------------------
+            %% Artificially force sync query to be used; otherwise, it's only used when the
+            %% resource is blocked and retrying.
+            ok = meck:new(emqx_bridge_kafka_impl_producer, [passthrough, no_history]),
+            on_exit(fun() -> catch meck:unload() end),
+            ok = meck:expect(emqx_bridge_kafka_impl_producer, query_mode, 1, simple_sync),
+
+            %% Simulate `invalid_partition_count'
+            emqx_common_test_helpers:with_mock(
+                wolff,
+                send_sync,
+                fun(_Producers, _Msgs, _Timeout) ->
+                    error({invalid_partition_count, 0, partitioner})
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            emqtt:publish(C, RuleTopic, <<"hi">>, 2),
+                            #{
+                                ?snk_kind := "kafka_producer_invalid_partition_count",
+                                query_mode := sync
+                            }
+                        ),
+                    ?assertMatch(
+                        #{
+                            counters := #{
+                                'actions.total' := 1,
+                                'actions.failed' := 1
+                            }
+                        },
+                        get_rule_metrics(RuleId)
+                    ),
+                    ok
+                end
+            ),
+
+            %%--------------------------------------------
+            %% Same thing, but async call
+            ?tp(notice, "async", #{}),
+            %%--------------------------------------------
+            ok = meck:expect(
+                emqx_bridge_kafka_impl_producer,
+                query_mode,
+                fun(Conf) -> meck:passthrough([Conf]) end
+            ),
+            ok = emqx_bridge_v2:remove(actions, Type, ActionName),
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(
+                    ActionParams,
+                    #{<<"parameters">> => #{<<"query_mode">> => <<"async">>}}
+                ),
+
+            %% Simulate `invalid_partition_count'
+            emqx_common_test_helpers:with_mock(
+                wolff,
+                send,
+                fun(_Producers, _Msgs, _Timeout) ->
+                    error({invalid_partition_count, 0, partitioner})
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            emqtt:publish(C, RuleTopic, <<"hi">>, 2),
+                            #{?snk_kind := "rule_engine_applied_all_rules"}
+                        ),
+                    ?assertMatch(
+                        #{
+                            counters := #{
+                                'actions.total' := 2,
+                                'actions.failed' := 2
+                            }
+                        },
+                        get_rule_metrics(RuleId)
+                    ),
+                    ok
+                end
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{query_mode := sync}, #{query_mode := async} | _],
+                ?of_kind("kafka_producer_invalid_partition_count", Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %% Tests that deleting/disabling an action that shares the same Kafka topic with other
 %% actions does not disturb the latter.
 t_multiple_actions_sharing_topic(Config) ->

+ 28 - 8
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1401,16 +1401,26 @@ apply_query_fun(
                 query_opts => QueryOpts,
                 min_query => minimize(Query)
             },
+            IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
             IsRetriable = false,
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
             case pre_query_channel_check(Request, Channels, QueryOpts) of
                 ok ->
-                    Result = Mod:on_query_async(
-                        extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
-                    ),
-                    {async_return, Result};
+                    case
+                        Mod:on_query_async(
+                            extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
+                        )
+                    of
+                        {error, _} = Error when IsSimpleQuery ->
+                            %% If this callback returns an error right away, we assume it will
+                            %% never call the reply function nor retry, so we reply here.
+                            maybe_reply_to(Error, QueryOpts),
+                            Error;
+                        Result ->
+                            {async_return, Result}
+                    end;
                 Error ->
                     maybe_reply_to(Error, QueryOpts)
             end
@@ -1480,16 +1490,26 @@ apply_query_fun(
             Requests = lists:map(
                 fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch
             ),
+            IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
             IsRetriable = false,
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
             case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
                 ok ->
-                    Result = Mod:on_batch_query_async(
-                        extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
-                    ),
-                    {async_return, Result};
+                    case
+                        Mod:on_batch_query_async(
+                            extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
+                        )
+                    of
+                        {error, _} = Error when IsSimpleQuery ->
+                            %% If this callback returns an error right away, we assume it will
+                            %% never call the reply function nor retry, so we reply here.
+                            maybe_reply_to(Error, QueryOpts),
+                            Error;
+                        Result ->
+                            {async_return, Result}
+                    end;
                 Error ->
                     maybe_reply_to(Error, QueryOpts)
             end

+ 2 - 0
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -20,6 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("emqx_resource/include/emqx_resource_errors.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
     apply_rule/3,
@@ -58,6 +59,7 @@
 %%------------------------------------------------------------------------------
 -spec apply_rules(list(rule()), columns(), envs()) -> ok.
 apply_rules([], _Columns, _Envs) ->
+    ?tp("rule_engine_applied_all_rules", #{}),
     ok;
 apply_rules([#{enable := false} | More], Columns, Envs) ->
     apply_rules(More, Columns, Envs);

+ 2 - 4
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -216,10 +216,8 @@ init_per_group(metrics_fail_simple, Config) ->
         (_) -> simple_async
     end),
     meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
-    meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) ->
-        Result = {error, {unrecoverable_error, mecked_failure}},
-        erlang:apply(ReplyFun, Args ++ [Result]),
-        Result
+    meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {_ReplyFun, _Args}, _) ->
+        {error, {unrecoverable_error, mecked_failure}}
     end),
     [{mecked, [?BRIDGE_IMPL]} | Config];
 init_per_group(_Groupname, Config) ->