Prechádzať zdrojové kódy

Merge pull request #14429 from thalesmg/20241217-r58-fix-rule-metrics-oos

fix(rule metrics): fix handling of `actions.failed.out_of_service` counters
Thales Macedo Garitezi 1 rok pred
rodič
commit
ae6cd53e24

+ 77 - 2
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -119,7 +119,7 @@ start_apps() ->
     ].
 
 setup_mocks() ->
-    MeckOpts = [passthrough, no_link, no_history, non_strict],
+    MeckOpts = [passthrough, no_link, no_history],
 
     catch meck:new(emqx_connector_schema, MeckOpts),
     meck:expect(emqx_connector_schema, fields, 1, con_schema()),
@@ -368,7 +368,13 @@ t_send_message(_) ->
 
 t_send_message_through_rule(_) ->
     BridgeName = my_test_bridge,
-    {ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()),
+    BridgeType = bridge_type(),
+    BridgeConfig0 =
+        emqx_utils_maps:deep_merge(
+            bridge_config(),
+            #{<<"resource_opts">> => #{<<"query_mode">> => <<"async">>}}
+        ),
+    {ok, _} = emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig0),
     %% Create a rule to send message to the bridge
     {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
         #{
@@ -398,6 +404,72 @@ t_send_message_through_rule(_) ->
     after 10000 ->
         ct:fail("Failed to receive message")
     end,
+    ?assertMatch(
+        #{
+            counters :=
+                #{
+                    'matched' := 1,
+                    'failed' := 0,
+                    'passed' := 1,
+                    'actions.success' := 1,
+                    'actions.failed' := 0,
+                    'actions.failed.unknown' := 0,
+                    'actions.failed.out_of_service' := 0
+                }
+        },
+        get_rule_metrics(RuleId)
+    ),
+    %% Now, turn off the connector.  Should increase the `actions.failed.out_of_service' metric.
+    {ok, _} = emqx_connector:create(con_type(), con_name(), (con_config())#{<<"enable">> := false}),
+    emqx:publish(Msg),
+    ?retry(
+        100,
+        10,
+        ?assertMatch(
+            #{
+                counters :=
+                    #{
+                        'matched' := 2,
+                        'failed' := 0,
+                        'passed' := 2,
+                        'actions.success' := 1,
+                        'actions.failed' := 1,
+                        'actions.failed.unknown' := 0,
+                        'actions.failed.out_of_service' := 1,
+                        'actions.discarded' := 0
+                    }
+            },
+            get_rule_metrics(RuleId)
+        )
+    ),
+    %% Sync query
+    BridgeConfig1 =
+        emqx_utils_maps:deep_merge(
+            bridge_config(),
+            #{<<"resource_opts">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    {ok, _} = emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig1),
+    emqx:publish(Msg),
+    ?retry(
+        100,
+        10,
+        ?assertMatch(
+            #{
+                counters :=
+                    #{
+                        'matched' := 3,
+                        'failed' := 0,
+                        'passed' := 3,
+                        'actions.success' := 1,
+                        'actions.failed' := 2,
+                        'actions.failed.unknown' := 0,
+                        'actions.failed.out_of_service' := 2,
+                        'actions.discarded' := 0
+                    }
+            },
+            get_rule_metrics(RuleId)
+        )
+    ),
     unregister(registered_process_name()),
     ok = emqx_bridge_v2:remove(bridge_type(), BridgeName),
     ok.
@@ -1179,3 +1251,6 @@ wait_until(_, _) ->
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
+
+get_rule_metrics(RuleId) ->
+    emqx_metrics_worker:get_metrics(rule_metrics, RuleId).

+ 91 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -1448,3 +1448,94 @@ t_inexistent_topic_after_created(Config) ->
         []
     ),
     ok.
+
+%% When the connector is disabled but the action is enabled, each message should bump
+%% only `actions.failed' and `actions.failed.out_of_service' (not `actions.failed.unknown').
+t_metrics_out_of_service(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig0 = proplists:get_value(
+        connector_config, Config, connector_config_toxiproxy(Config)
+    ),
+    ConnectorConfig = ConnectorConfig0#{<<"enable">> := false},
+    ActionName = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    Topic = atom_to_binary(?FUNCTION_NAME),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"async">>}}
+        )
+    ),
+    ConnectorParams = [
+        {connector_config, ConnectorConfig},
+        {connector_name, ConnectorName},
+        {connector_type, Type}
+    ],
+    ActionParams = [
+        {action_config, ActionConfig},
+        {action_name, ActionName},
+        {action_type, Type}
+    ],
+    {201, #{<<"enable">> := false}} =
+        simplify_result(emqx_bridge_v2_testlib:create_connector_api(ConnectorParams)),
+    {201, _} = simplify_result(emqx_bridge_v2_testlib:create_action_api(ActionParams)),
+    RuleTopic = <<"t/k/oos">>,
+    {ok, #{<<"id">> := RuleId}} =
+        emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [
+            {bridge_name, ActionName}
+        ]),
+    %% Async query
+    emqx:publish(emqx_message:make(RuleTopic, <<"a">>)),
+    ?retry(
+        100,
+        10,
+        ?assertMatch(
+            #{
+                counters :=
+                    #{
+                        'matched' := 1,
+                        'failed' := 0,
+                        'passed' := 1,
+                        'actions.success' := 0,
+                        'actions.failed' := 1,
+                        'actions.failed.out_of_service' := 1,
+                        'actions.failed.unknown' := 0,
+                        'actions.discarded' := 0
+                    }
+            },
+            get_rule_metrics(RuleId)
+        )
+    ),
+    %% Sync query
+    {200, _} = simplify_result(
+        emqx_bridge_v2_testlib:update_bridge_api(
+            ActionParams,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        )
+    ),
+    emqx:publish(emqx_message:make(RuleTopic, <<"a">>)),
+    ?retry(
+        100,
+        10,
+        ?assertMatch(
+            #{
+                counters :=
+                    #{
+                        'matched' := 2,
+                        'failed' := 0,
+                        'passed' := 2,
+                        'actions.success' := 0,
+                        'actions.failed' := 2,
+                        'actions.failed.out_of_service' := 2,
+                        'actions.failed.unknown' := 0,
+                        'actions.discarded' := 0
+                    }
+            },
+            get_rule_metrics(RuleId)
+        )
+    ),
+    ok.

+ 8 - 4
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1227,19 +1227,23 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
         %% This seems to be the only place where the `rm_status_stopped' status matters,
         %% to distinguish from the `disconnected' status.
         {ok, #rt{st_err = #{status := ?rm_status_stopped}}} ->
-            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+            Result = ?RESOURCE_ERROR(stopped, "resource stopped or disabled"),
+            maybe_reply_to(Result, QueryOpts);
         {ok, #rt{st_err = #{status := ?status_connecting, error := unhealthy_target}}} ->
-            {error, {unrecoverable_error, unhealthy_target}};
+            Result = {error, {unrecoverable_error, unhealthy_target}},
+            maybe_reply_to(Result, QueryOpts);
         {ok, #rt{st_err = #{status := Status}, cb = Resource, channel_status = ChanSt}} ->
             IsAlwaysSend = is_always_send(QueryOpts, Resource),
             case Status =:= ?status_connected orelse IsAlwaysSend of
                 true ->
                     call_query2(QM, Id, Index, Ref, Query, QueryOpts, Resource, ChanSt);
                 false ->
-                    ?RESOURCE_ERROR(not_connected, "resource not connected")
+                    Result = ?RESOURCE_ERROR(not_connected, "resource not connected"),
+                    maybe_reply_to(Result, QueryOpts)
             end;
         {error, not_found} ->
-            ?RESOURCE_ERROR(not_found, "resource not found")
+            Result = ?RESOURCE_ERROR(not_found, "resource not found"),
+            maybe_reply_to(Result, QueryOpts)
     end.
 
 call_query2(QM, Id, Index, Ref, Query, QueryOpts, Resource, ChanSt) ->

+ 7 - 10
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -404,12 +404,6 @@ handle_action(RuleId, ActId, Selected, Envs) ->
     try
         do_handle_action(RuleId, ActId, Selected, Envs)
     catch
-        throw:out_of_service ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
-            ok = emqx_metrics_worker:inc(
-                rule_metrics, RuleId, 'actions.failed.out_of_service'
-            ),
-            trace_action(ActId, "out_of_service", #{}, warning);
         throw:{discard, Reason} ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.discarded'),
             trace_action(ActId, "discarded", #{cause => Reason}, debug);
@@ -449,8 +443,6 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selec
     of
         {error, Reason} when Reason == bridge_not_found; Reason == bridge_disabled ->
             throw({discard, Reason});
-        ?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) ->
-            throw(out_of_service);
         Result ->
             Result
     end;
@@ -473,8 +465,6 @@ do_handle_action(
     of
         {error, Reason} when Reason == bridge_not_found; Reason == bridge_disabled ->
             throw({discard, Reason});
-        ?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) ->
-            throw(out_of_service);
         Result ->
             Result
     end;
@@ -776,6 +766,13 @@ do_inc_action_metrics(
     ),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
+do_inc_action_metrics(
+    #{rule_id := RuleId, action_id := ActId},
+    ?RESOURCE_ERROR_M(R, _)
+) when ?IS_RES_DOWN(R) ->
+    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
+    ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'),
+    trace_action(ActId, "out_of_service", #{}, warning);
 do_inc_action_metrics(
     #{rule_id := RuleId, action_id := ActId} = TraceContext,
     {error, {recoverable_error, _}} = Reason

+ 1 - 0
changes/ce/fix-14429.en.md

@@ -0,0 +1 @@
+Fixed handling of rule action metrics when an underlying connector is disabled.  Before the fix, the `actions.failed` counter would be bumped twice for each message the action received (once categorized under `unknown` and once under `out_of_service`).  Now, it is bumped only once, categorized under `out_of_service`.