Explorar o código

Merge pull request #12404 from thalesmg/fix-action-metric-handler-r55-20240126

fix(resource_metrics): avoid detaching handler on crashes
JianBo He %!s(int64=2) %!d(string=hai) anos
pai
achega
d286b7e28c

+ 52 - 10
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_resource_metrics).
 
+-include_lib("emqx/include/logger.hrl").
+
 -export([
     events/0,
     install_telemetry_handler/1,
@@ -118,6 +120,52 @@ handle_telemetry_event(
     _Metadata = #{resource_id := ID},
     _HandlerConfig
 ) ->
+    try
+        handle_counter_telemetry_event(Event, ID, Val)
+    catch
+        Kind:Reason:Stacktrace ->
+            %% We catch errors to avoid detaching the telemetry handler function.
+            %% When restarting a resource while it's under load, there might be transient
+            %% failures while the metrics are not yet created.
+            ?SLOG(warning, #{
+                msg => "handle_resource_metrics_failed",
+                hint => "transient failures may occur when restarting a resource",
+                kind => Kind,
+                reason => Reason,
+                stacktrace => Stacktrace,
+                resource_id => ID,
+                event => Event
+            }),
+            ok
+    end;
+handle_telemetry_event(
+    [?TELEMETRY_PREFIX, Event],
+    _Measurements = #{gauge_set := Val},
+    _Metadata = #{resource_id := ID, worker_id := WorkerID},
+    _HandlerConfig
+) ->
+    try
+        handle_gauge_telemetry_event(Event, ID, WorkerID, Val)
+    catch
+        Kind:Reason:Stacktrace ->
+            %% We catch errors to avoid detaching the telemetry handler function.
+            %% When restarting a resource while it's under load, there might be transient
+            %% failures while the metrics are not yet created.
+            ?SLOG(warning, #{
+                msg => "handle_resource_metrics_failed",
+                hint => "transient failures may occur when restarting a resource",
+                kind => Kind,
+                reason => Reason,
+                stacktrace => Stacktrace,
+                resource_id => ID,
+                event => Event
+            }),
+            ok
+    end;
+handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
+    ok.
+
+handle_counter_telemetry_event(Event, ID, Val) ->
     case Event of
         dropped_other ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
@@ -154,13 +202,9 @@ handle_telemetry_event(
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val);
         _ ->
             ok
-    end;
-handle_telemetry_event(
-    [?TELEMETRY_PREFIX, Event],
-    _Measurements = #{gauge_set := Val},
-    _Metadata = #{resource_id := ID, worker_id := WorkerID},
-    _HandlerConfig
-) ->
+    end.
+
+handle_gauge_telemetry_event(Event, ID, WorkerID, Val) ->
     case Event of
         inflight ->
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
@@ -168,9 +212,7 @@ handle_telemetry_event(
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
         _ ->
             ok
-    end;
-handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
-    ok.
+    end.
 
 %% Gauges (value can go both up and down):
 %% --------------------------------------

+ 53 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -28,6 +28,7 @@
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
 -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
 -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
+-define(TELEMETRY_PREFIX, emqx, resource).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -3006,6 +3007,36 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
         end
     ).
 
+t_telemetry_handler_crash(_Config) ->
+    %% Check that a crash while handling a telemetry event, such as when a busy resource
+    %% is restarted and its metrics are not recreated while handling an increment, does
+    %% not lead to the handler being uninstalled.
+    ?check_trace(
+        begin
+            NonExistentId = <<"I-dont-exist">>,
+            WorkerId = 1,
+            HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]),
+            ?assertMatch([_ | _], HandlersBefore),
+            lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()),
+            emqx_common_test_helpers:with_mock(
+                emqx_metrics_worker,
+                set_gauge,
+                fun(_Name, _Id, _WorkerId, _Metric, _Val) ->
+                    error(random_crash)
+                end,
+                fun() ->
+                    lists:foreach(
+                        fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns()
+                    )
+                end
+            ),
+            ?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3235,3 +3266,25 @@ do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
                     })
             end
     end.
+
+counter_metric_inc_fns() ->
+    Mod = emqx_resource_metrics,
+    [
+        fun Mod:Fn/1
+     || {Fn, 1} <- Mod:module_info(functions),
+        case string:find(atom_to_list(Fn), "_inc", trailing) of
+            "_inc" -> true;
+            _ -> false
+        end
+    ].
+
+gauge_metric_set_fns() ->
+    Mod = emqx_resource_metrics,
+    [
+        fun Mod:Fn/3
+     || {Fn, 3} <- Mod:module_info(functions),
+        case string:find(atom_to_list(Fn), "_set", trailing) of
+            "_set" -> true;
+            _ -> false
+        end
+    ].

+ 1 - 0
changes/ce/fix-12404.en.md

@@ -0,0 +1 @@
+Fixed an issue where restarting a busy data integration could lead to data integration metrics to stop being collected.