Просмотр исходного кода

fix(rule_metrics): notify rule metrics of late replies and expired requests

Fixes https://emqx.atlassian.net/browse/EMQX-10600
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
eb41b77de4

+ 67 - 4
apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl

@@ -40,13 +40,13 @@ groups() ->
 
 init_per_suite(_Config) ->
     emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
-    ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
+    ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]),
     ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
     {ok, _} = application:ensure_all_started(emqx_connector),
     [].
 
 end_per_suite(_Config) ->
-    ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
+    ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]),
     ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
     _ = application:stop(emqx_connector),
     _ = application:stop(emqx_bridge),
@@ -77,13 +77,19 @@ init_per_testcase(t_too_many_requests, Config) ->
     ),
     ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
     [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
+init_per_testcase(t_rule_action_expired, Config) ->
+    [
+        {bridge_name, ?BRIDGE_NAME}
+        | Config
+    ];
 init_per_testcase(_TestCase, Config) ->
     Server = start_http_server(#{response_delay_ms => 0}),
     [{http_server, Server} | Config].
 
 end_per_testcase(TestCase, _Config) when
     TestCase =:= t_path_not_found;
-    TestCase =:= t_too_many_requests
+    TestCase =:= t_too_many_requests;
+    TestCase =:= t_rule_action_expired
 ->
     ok = emqx_bridge_http_connector_test_server:stop(),
     persistent_term:erase({?MODULE, times_called}),
@@ -202,6 +208,7 @@ parse_http_request_assertive(ReqStr0) ->
 bridge_async_config(#{port := Port} = Config) ->
     Type = maps:get(type, Config, ?BRIDGE_TYPE),
     Name = maps:get(name, Config, ?BRIDGE_NAME),
+    Host = maps:get(host, Config, "localhost"),
     Path = maps:get(path, Config, ""),
     PoolSize = maps:get(pool_size, Config, 1),
     QueryMode = maps:get(query_mode, Config, "async"),
@@ -218,7 +225,7 @@ bridge_async_config(#{port := Port} = Config) ->
         end,
     ConfigString = io_lib:format(
         "bridges.~s.~s {\n"
-        "  url = \"http://localhost:~p~s\"\n"
+        "  url = \"http://~s:~p~s\"\n"
         "  connect_timeout = \"~p\"\n"
         "  enable = true\n"
         %% local_topic
@@ -248,6 +255,7 @@ bridge_async_config(#{port := Port} = Config) ->
         [
             Type,
             Name,
+            Host,
             Port,
             Path,
             ConnectTimeout,
@@ -540,6 +548,61 @@ t_too_many_requests(Config) ->
     ),
     ok.
 
+t_rule_action_expired(Config) ->
+    ?check_trace(
+        begin
+            RuleTopic = <<"t/webhook/rule">>,
+            BridgeConfig = bridge_async_config(#{
+                type => ?BRIDGE_TYPE,
+                name => ?BRIDGE_NAME,
+                host => "non.existent.host",
+                port => 9999,
+                path => <<"/some/path">>,
+                resume_interval => "100ms",
+                connect_timeout => "1s",
+                request_timeout => "100ms",
+                resource_request_ttl => "100ms"
+            }),
+            {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
+            {ok, #{<<"id">> := RuleId}} =
+                emqx_bridge_testlib:create_rule_and_action_http(?BRIDGE_TYPE, RuleTopic, Config),
+            Msg = emqx_message:make(RuleTopic, <<"timeout">>),
+            emqx:publish(Msg),
+            ?retry(
+                _Interval = 500,
+                _NAttempts = 20,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            matched := 1,
+                            failed := 0,
+                            dropped := 1
+                        }
+                    },
+                    emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
+                )
+            ),
+            ?retry(
+                _Interval = 500,
+                _NAttempts = 20,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            matched := 1,
+                            'actions.failed' := 1,
+                            'actions.failed.unknown' := 1,
+                            'actions.total' := 1
+                        }
+                    },
+                    emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% helpers
 do_t_async_retries(TestContext, Error, Fn) ->
     #{error_attempts := ErrorAttempts} = TestContext,

+ 5 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -24,7 +24,11 @@
 -type callback_mode() :: always_sync | async_if_possible.
 -type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
 -type result() :: term().
--type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
+-type reply_fun() ::
+    {fun((result(), Args :: term()) -> any()), Args :: term()}
+    | {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()}
+    | undefined.
+-type reply_context() :: #{reply_dropped => boolean()}.
 -type query_opts() :: #{
     %% The key used for picking a resource worker
     pick_key => term(),

+ 36 - 2
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -366,6 +366,7 @@ resume_from_blocked(Data) ->
                     true -> #{dropped_expired => length(Batch)};
                     false -> #{}
                 end,
+            batch_reply_dropped(Batch, {error, request_expired}),
             NData = aggregate_counters(Data, Counters),
             ?tp(buffer_worker_retry_expired, #{expired => Batch}),
             resume_from_blocked(NData);
@@ -378,6 +379,7 @@ resume_from_blocked(Data) ->
         {batch, Ref, NotExpired, Expired} ->
             NumExpired = length(Expired),
             ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
+            batch_reply_dropped(Expired, {error, request_expired}),
             NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
             ?tp(buffer_worker_retry_expired, #{expired => Expired}),
             %% We retry msgs in inflight window sync, as if we send them
@@ -484,6 +486,9 @@ do_reply_caller({F, Args}, {async_return, Result}) ->
     %% decision has to be made by the caller
     do_reply_caller({F, Args}, Result);
 do_reply_caller({F, Args}, Result) when is_function(F) ->
+    _ = erlang:apply(F, Args ++ [Result]),
+    ok;
+do_reply_caller({F, Args, _Context}, Result) when is_function(F) ->
     _ = erlang:apply(F, Args ++ [Result]),
     ok.
 
@@ -537,11 +542,13 @@ flush(Data0) ->
                 {[], _AllExpired} ->
                     ok = replayq:ack(Q1, QAckRef),
                     NumExpired = length(Batch),
+                    batch_reply_dropped(Batch, {error, request_expired}),
                     Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
                     ?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
                     flush(Data3);
                 {NotExpired, Expired} ->
                     NumExpired = length(Expired),
+                    batch_reply_dropped(Expired, {error, request_expired}),
                     Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
                     IsBatch = (BatchSize > 1),
                     %% We *must* use the new queue, because we currently can't
@@ -809,6 +816,28 @@ reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts)
     end,
     {ShouldAck, PostFn, DeltaCounters}.
 
+%% This is basically used only by rule actions.  To avoid rule action metrics from
+%% becoming inconsistent when we drop messages, we need a way to signal rule engine that
+%% this action has reached a conclusion.
+-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok.
+reply_dropped(_ReplyTo = {Fn, Args, #{reply_dropped := true}}, Result) when
+    is_function(Fn), is_list(Args)
+->
+    %% We want to avoid bumping metrics inside the buffer worker, since it's costly.
+    spawn(fun() -> erlang:apply(Fn, Args ++ [Result]) end),
+    ok;
+reply_dropped(_ReplyTo, _Result) ->
+    ok.
+
+-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
+batch_reply_dropped(Batch, Result) ->
+    lists:foreach(
+        fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
+            reply_dropped(ReplyTo, Result)
+        end,
+        Batch
+    ).
+
 %% This is only called by `simple_{,a}sync_query', so we can bump the
 %% counters here.
 handle_query_result(Id, Result, HasBeenSent) ->
@@ -1164,7 +1193,7 @@ handle_async_reply1(
         inflight_tid := InflightTID,
         resource_id := Id,
         buffer_worker := BufferWorkerPid,
-        min_query := ?QUERY(_, _, _, ExpireAt) = _Query
+        min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
     } = ReplyContext,
     Result
 ) ->
@@ -1178,7 +1207,11 @@ handle_async_reply1(
             IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
             %% evalutate metrics call here since we're not inside
             %% buffer worker
-            IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
+            IsAcked andalso
+                begin
+                    emqx_resource_metrics:late_reply_inc(Id),
+                    reply_dropped(ReplyTo, {error, late_reply})
+                end,
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
         false ->
@@ -1292,6 +1325,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     %% evalutate metrics call here since we're not inside buffer
     %% worker
     emqx_resource_metrics:late_reply_inc(Id, NumExpired),
+    batch_reply_dropped(RealExpired, {error, late_reply}),
     case RealNotExpired of
         [] ->
             %% all expired, no need to update back the inflight batch

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -350,7 +350,7 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env
         "bridge_action",
         #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
     ),
-    ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]},
+    ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
     case
         emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
     of

+ 1 - 0
changes/ce/fix-11306.en.md

@@ -0,0 +1 @@
+Fixed rule action metrics inconsistency where dropped requests were not accounted for.