Explorar o código

Merge pull request #10345 from thalesmg/fix-kafka-async-return-action-rv50

fix(rule_action): fix metrics for bridges returning `async_return`
Thales Macedo Garitezi %!s(int64=2) %!d(string=hai) anos
pai
achega
67ff058d46

+ 4 - 2
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -508,8 +508,6 @@ nested_put(Alias, Val, Columns0) ->
     emqx_rule_maps:nested_put(Alias, Val, Columns).
 
 -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
-inc_action_metrics(ok, RuleId) ->
-    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
 inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
 inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
@@ -525,6 +523,10 @@ inc_action_metrics(R, RuleId) ->
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
     end.
 
+is_ok_result(ok) ->
+    true;
+is_ok_result({async_return, R}) ->
+    is_ok_result(R);
 is_ok_result(R) when is_tuple(R) ->
     ok == erlang:element(1, R);
 is_ok_result(_) ->

+ 4 - 1
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -309,7 +309,7 @@ kafka_bridge_rest_api_helper(Config) ->
     AtomsAfter = erlang:system_info(atom_count),
     ?assertEqual(AtomsBefore, AtomsAfter),
     %% Create a rule that uses the bridge
-    {ok, 201, _Rule} = http_post(
+    {ok, 201, Rule} = http_post(
         ["rules"],
         #{
             <<"name">> => <<"kafka_bridge_rest_api_helper_rule">>,
@@ -318,6 +318,7 @@ kafka_bridge_rest_api_helper(Config) ->
             <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">>
         }
     ),
+    #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
     %% counters should be empty before
     ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)),
@@ -346,6 +347,8 @@ kafka_bridge_rest_api_helper(Config) ->
     %% Check crucial counters and gauges
     ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)),
     ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)),
+    ?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
+    ?assertEqual(0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')),
     ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),