Просмотр исходного кода

Merge pull request #14226 from thalesmg/20241114-r58-resource-missing-metrics

fix(resource metrics): throttle resource metrics handling error message, ensure metrics when restarting or resetting metrics
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
39a0e3e654

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -251,5 +251,11 @@
     "listen": "0.0.0.0:8362",
     "upstream": "datalayers_tls:8362",
     "enabled": true
+  },
+  {
+    "name": "mongo_single_tcp",
+    "listen": "0.0.0.0:27017",
+    "upstream": "mongo:27017",
+    "enabled": true
   }
 ]

+ 1 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -1004,7 +1004,7 @@ clear_screen() ->
     end.
 
 with_mock(Mod, FnName, MockedFn, Fun) ->
-    ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]),
+    ok = meck:new(Mod, [no_link, no_history, passthrough]),
     ok = meck:expect(Mod, FnName, MockedFn),
     try
         Fun()

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.2.6"},
+    {vsn, "0.2.7"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 11 - 2
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -524,7 +524,6 @@ uninstall_bridge_v2(
     BridgeV2Id = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName),
     CreationOpts = emqx_resource:fetch_creation_opts(Config),
     ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
-    ok = emqx_resource:clear_metrics(BridgeV2Id),
     case referenced_connectors_exist(BridgeV2Type, ConnectorName, BridgeName) of
         {error, _} ->
             ok;
@@ -533,7 +532,14 @@ uninstall_bridge_v2(
             ConnectorId = emqx_connector_resource:resource_id(
                 connector_type(BridgeV2Type), ConnectorName
             ),
-            emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id)
+            Res = emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id),
+            case Res of
+                ok ->
+                    ok = emqx_resource:clear_metrics(BridgeV2Id);
+                _ ->
+                    ok
+            end,
+            Res
     end.
 
 combine_connector_and_bridge_v2_config(
@@ -1182,6 +1188,9 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf
         {error, timeout} ->
             ErrorContext = #{
                 error => uninstall_timeout,
+                bridge_kind => ConfRootKey,
+                type => BridgeType,
+                name => BridgeName,
                 reason => <<
                     "Timed out trying to remove action or source.  Please try again and,"
                     " if the error persists, try disabling the connector before retrying."

+ 10 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -486,6 +486,16 @@ get_action_api(Config) ->
     ct:pal("get action (http) result:\n  ~p", [Res]),
     Res.
 
+get_action_metrics_api(Config) ->
+    ActionName = ?config(action_name, Config),
+    ActionType = ?config(action_type, Config),
+    ActionId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
+    Path = emqx_mgmt_api_test_util:api_path(["actions", ActionId, "metrics"]),
+    ct:pal("getting action (http)"),
+    Res = request(get, Path, []),
+    ct:pal("get action (http) result:\n  ~p", [Res]),
+    simplify_result(Res).
+
 update_bridge_api(Config) ->
     update_bridge_api(Config, _Overrides = #{}).
 

+ 1 - 18
apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl

@@ -1003,25 +1003,8 @@ t_authentication_error_on_send_message(Config0) ->
         end,
     Config = lists:keyreplace(greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}),
 
-    % Fake initialization to simulate credential update after bridge was created.
-    emqx_common_test_helpers:with_mock(
-        greptimedb,
-        check_auth,
-        fun(_) ->
-            ok
-        end,
-        fun() ->
-            {ok, _} = create_bridge(Config),
-            ResourceId = resource_id(Config0),
-            ?retry(
-                _Sleep = 1_000,
-                _Attempts = 10,
-                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
-            )
-        end
-    ),
+    {ok, _} = create_bridge(Config),
 
-    % Now back to wrong credentials
     ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
     Payload = #{
         int_key => -123,

+ 1 - 0
apps/emqx_bridge_mongodb/docker-ct

@@ -1,2 +1,3 @@
+toxiproxy
 mongo
 mongo_rs_sharded

+ 1 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mongodb, [
     {description, "EMQX Enterprise MongoDB Bridge"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 2
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -61,8 +61,13 @@ on_get_channel_status(InstanceId, _ChannelId, State) ->
 on_get_channels(InstanceId) ->
     emqx_bridge_v2:get_channels_for_connector(InstanceId).
 
-on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
-    emqx_mongodb:on_get_status(InstanceId, ConnectorState).
+on_get_status(InstanceId, ConnectorState = #{connector_state := DriverState0}) ->
+    case emqx_mongodb:on_get_status(InstanceId, DriverState0) of
+        {Status, DriverState, Reason} ->
+            {Status, ConnectorState#{connector_state := DriverState}, Reason};
+        Status when is_atom(Status) ->
+            Status
+    end.
 
 on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) ->
     #{

+ 148 - 13
apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl

@@ -19,9 +19,10 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(BRIDGE_TYPE, mongodb).
--define(BRIDGE_TYPE_BIN, <<"mongodb">>).
+-define(ACTION_TYPE, mongodb).
+-define(ACTION_TYPE_BIN, <<"mongodb">>).
 -define(CONNECTOR_TYPE, mongodb).
 -define(CONNECTOR_TYPE_BIN, <<"mongodb">>).
 
@@ -36,8 +37,11 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
+    MongoHost = os:getenv("MONGO_SINGLE_HOST", "toxiproxy"),
     MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
+    ProxyHost = "toxiproxy",
+    ProxyPort = 8474,
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
         true ->
             Apps = emqx_cth_suite:start(
@@ -49,14 +53,15 @@ init_per_suite(Config) ->
                     emqx_bridge_mongodb,
                     emqx_rule_engine,
                     emqx_management,
-                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                    emqx_mgmt_api_test_util:emqx_dashboard()
                 ],
                 #{work_dir => emqx_cth_suite:work_dir(Config)}
             ),
-            {ok, Api} = emqx_common_test_http:create_default_app(),
             [
+                {proxy_name, "mongo_single_tcp"},
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
                 {apps, Apps},
-                {api, Api},
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort}
                 | Config
@@ -80,6 +85,9 @@ init_per_testcase(TestCase, Config) ->
 
 common_init_per_testcase(TestCase, Config) ->
     ct:timetrap(timer:seconds(60)),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     emqx_config:delete_override_conf_files(),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
@@ -97,15 +105,16 @@ common_init_per_testcase(TestCase, Config) ->
         | Config
     ],
     ConnectorConfig = connector_config(Name, NConfig),
-    BridgeConfig = bridge_config(Name, Name),
+    BridgeConfig = action_config(Name, Name),
     ok = snabbkaffe:start_trace(),
     [
+        {bridge_kind, action},
         {connector_type, ?CONNECTOR_TYPE},
         {connector_name, Name},
         {connector_config, ConnectorConfig},
-        {bridge_type, ?BRIDGE_TYPE},
-        {bridge_name, Name},
-        {bridge_config, BridgeConfig}
+        {action_type, ?ACTION_TYPE},
+        {action_name, Name},
+        {action_config, BridgeConfig}
         | NConfig
     ].
 
@@ -164,7 +173,7 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
     ct:pal("parsed config: ~p", [Config]),
     InnerConfigMap.
 
-bridge_config(Name, ConnectorId) ->
+action_config(Name, ConnectorId) ->
     InnerConfigMap0 =
         #{
             <<"enable">> => true,
@@ -197,7 +206,7 @@ serde_roundtrip(InnerConfigMap0) ->
     InnerConfigMap.
 
 parse_and_check_bridge_config(InnerConfigMap, Name) ->
-    emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
+    emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, InnerConfigMap).
 
 shared_secret_path() ->
     os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
@@ -221,6 +230,39 @@ make_message() ->
         timestamp => Time
     }.
 
+create_bridge_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_bridge_api(Config, Overrides)
+    ).
+
+create_action_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_kind_api(Config, Overrides)
+    ).
+
+get_connector_api(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_connector_api(
+            ?CONNECTOR_TYPE,
+            ConnectorName
+        )
+    ).
+
+get_action_api(Config) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_action_api(
+            Config
+        )
+    ).
+
+delete_action_api(Config) ->
+    emqx_bridge_v2_testlib:delete_kind_api(
+        action,
+        ?ACTION_TYPE,
+        ?config(action_name, Config)
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -234,7 +276,7 @@ t_create_via_http(Config) ->
     ok.
 
 t_on_get_status(Config) ->
-    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    emqx_bridge_v2_testlib:t_on_get_status(Config),
     ok.
 
 t_sync_query(Config) ->
@@ -245,3 +287,96 @@ t_sync_query(Config) ->
         mongo_bridge_connector_on_query_return
     ),
     ok.
+
+%% Checks that we don't mangle the connector state if the underlying connector
+%% implementation returns a tuple with new state.
+%% See also: https://emqx.atlassian.net/browse/EMQX-13496.
+t_timeout_during_connector_health_check(Config0) ->
+    ProxyName = ?config(proxy_name, Config0),
+    ProxyHost = ?config(proxy_host, Config0),
+    ProxyPort = ?config(proxy_port, Config0),
+    ConnectorName = ?config(connector_name, Config0),
+    Overrides = #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"700ms">>}},
+    Config = emqx_bridge_v2_testlib:proplist_update(
+        Config0,
+        connector_config,
+        fun(Cfg) ->
+            emqx_utils_maps:deep_merge(Cfg, Overrides)
+        end
+    ),
+    ?check_trace(
+        begin
+            {201, _} = create_bridge_api(Config, Overrides),
+
+            %% emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+            %%   ?retry(500, 10, ?assertEqual(<<"disconnected">>, GetConnectorStatus())),
+            %%   ok
+            %% end),
+            %% Wait until it's disconnected
+            emqx_common_test_helpers:with_mock(
+                emqx_resource_pool,
+                health_check_workers,
+                fun(_Pool, _Fun, _Timeout, _Opts) -> {error, timeout} end,
+                fun() ->
+                    ?retry(
+                        1_000,
+                        10,
+                        ?assertMatch(
+                            {200, #{<<"status">> := <<"disconnected">>}},
+                            get_connector_api(Config)
+                        )
+                    ),
+                    ?retry(
+                        1_000,
+                        10,
+                        ?assertMatch(
+                            {200, #{<<"status">> := <<"disconnected">>}},
+                            get_action_api(Config)
+                        )
+                    ),
+                    ok
+                end
+            ),
+
+            %% Wait for recovery
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_connector_api(Config)
+                )
+            ),
+
+            RuleTopic = <<"t/timeout">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                ?ACTION_TYPE, RuleTopic, Config
+            ),
+
+            %% Action should be fine, including its metrics.
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_action_api(Config)
+                )
+            ),
+            emqx:publish(emqx_message:make(RuleTopic, <<"hey">>)),
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"metrics">> := #{<<"matched">> := 1}}},
+                    emqx_bridge_v2_testlib:get_action_metrics_api(Config)
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind("health_check_exception", Trace)),
+            ?assertEqual([], ?of_kind("remove_channel_failed", Trace)),
+            ok
+        end
+    ),
+    ok.

+ 1 - 0
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -90,6 +90,7 @@
     dropped_msg_due_to_mqueue_is_full,
     external_broker_crashed,
     failed_to_retain_message,
+    handle_resource_metrics_failed,
     socket_receive_paused_by_rate_limit,
     unrecoverable_resource_error
 ]).

+ 27 - 21
apps/emqx_resource/src/emqx_resource.erl

@@ -51,6 +51,7 @@
     reset_metrics_local/2,
     %% Create metrics for a resource ID
     create_metrics/1,
+    ensure_metrics/1,
     %% Delete metrics for a resource ID
     clear_metrics/1
 ]).
@@ -725,33 +726,38 @@ deallocate_resource(InstanceId, Key) ->
 
 -spec create_metrics(resource_id()) -> ok.
 create_metrics(ResId) ->
-    emqx_metrics_worker:create_metrics(
-        ?RES_METRICS,
-        ResId,
-        [
-            'matched',
-            'retried',
-            'retried.success',
-            'retried.failed',
-            'success',
-            'late_reply',
-            'failed',
-            'dropped',
-            'dropped.expired',
-            'dropped.queue_full',
-            'dropped.resource_not_found',
-            'dropped.resource_stopped',
-            'dropped.other',
-            'received'
-        ],
-        [matched]
-    ).
+    emqx_metrics_worker:create_metrics(?RES_METRICS, ResId, metrics(), rate_metrics()).
+
+-spec ensure_metrics(resource_id()) -> {ok, created | already_created}.
+ensure_metrics(ResId) ->
+    emqx_metrics_worker:ensure_metrics(?RES_METRICS, ResId, metrics(), rate_metrics()).
 
 -spec clear_metrics(resource_id()) -> ok.
 clear_metrics(ResId) ->
     emqx_metrics_worker:clear_metrics(?RES_METRICS, ResId).
 %% =================================================================================
 
+metrics() ->
+    [
+        'matched',
+        'retried',
+        'retried.success',
+        'retried.failed',
+        'success',
+        'late_reply',
+        'failed',
+        'dropped',
+        'dropped.expired',
+        'dropped.queue_full',
+        'dropped.resource_not_found',
+        'dropped.resource_stopped',
+        'dropped.other',
+        'received'
+    ].
+
+rate_metrics() ->
+    ['matched'].
+
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 

+ 13 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -399,6 +399,7 @@ get_metrics(ResId) ->
 %% @doc Reset the metrics for the specified resource
 -spec reset_metrics(resource_id()) -> ok.
 reset_metrics(ResId) ->
+    ok = ensure_metrics(ResId),
     emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId).
 
 %% @doc Returns the data for all resources
@@ -788,6 +789,7 @@ handle_remove_event(From, ClearMetrics, Data) ->
 start_resource(Data, From) ->
     %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
     #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data,
+    ok = ensure_metrics(ResId),
     case emqx_resource:call_start(ResId, Mod, Config) of
         {ok, ResourceState} ->
             UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
@@ -952,6 +954,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
                 added_channels = NewAddedChannelsMap
             };
         {error, Reason} ->
+            ?tp("remove_channel_failed", #{resource_id => ResId, reason => Reason}),
             ?SLOG(
                 log_level(IsDryRun),
                 #{
@@ -1080,6 +1083,7 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
             {keep_state, update_state(UpdatedData), [{reply, From, ok}]};
         {error, Reason} = Error ->
             IsDryRun = emqx_resource:is_dry_run(Id),
+            ?tp("remove_channel_failed", #{resource_id => Id, reason => Reason}),
             ?SLOG(
                 log_level(IsDryRun),
                 #{
@@ -1679,6 +1683,7 @@ parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
 parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
     {Status, NewState, {error, Error}};
 parse_health_check_result({error, Error}, Data) ->
+    ?tp("health_check_exception", #{resource_id => Data#data.id, reason => Error}),
     ?SLOG(
         error,
         #{
@@ -1894,3 +1899,11 @@ abort_channel_health_check(Pid) ->
         {'EXIT', Pid, _} ->
             ok
     end.
+
+%% For still unknown reasons (e.g.: `emqx_metrics_worker' process might die?), metrics
+%% might be lost for a running resource, and future attempts to bump them result in
+%% errors.  As mitigation, we ensure such metrics are created here so that restarting
+%% the resource or resetting its metrics can recreate them.
+ensure_metrics(ResId) ->
+    {ok, _} = emqx_resource:ensure_metrics(ResId),
+    ok.

+ 6 - 4
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -130,10 +130,11 @@ handle_telemetry_event(
             %% We catch errors to avoid detaching the telemetry handler function.
             %% When restarting a resource while it's under load, there might be transient
             %% failures while the metrics are not yet created.
-            ?SLOG(
+            ?SLOG_THROTTLE(
                 warning,
+                ID,
                 #{
-                    msg => "handle_resource_metrics_failed",
+                    msg => handle_resource_metrics_failed,
                     hint => "transient failures may occur when restarting a resource",
                     kind => Kind,
                     reason => Reason,
@@ -158,10 +159,11 @@ handle_telemetry_event(
             %% We catch errors to avoid detaching the telemetry handler function.
             %% When restarting a resource while it's under load, there might be transient
             %% failures while the metrics are not yet created.
-            ?SLOG(
+            ?SLOG_THROTTLE(
                 warning,
+                ID,
                 #{
-                    msg => "handle_resource_metrics_failed",
+                    msg => handle_resource_metrics_failed,
                     hint => "transient failures may occur when restarting a resource",
                     kind => Kind,
                     reason => Reason,

+ 82 - 22
apps/emqx_utils/src/emqx_metrics_worker.erl

@@ -45,6 +45,7 @@
     get_counters/2,
     create_metrics/3,
     create_metrics/4,
+    ensure_metrics/4,
     clear_metrics/2,
     reset_metrics/2,
     has_metrics/2
@@ -135,6 +136,13 @@
     slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}}
 }).
 
+%% calls/casts/infos
+-record(ensure_metrics, {
+    id :: metric_id(),
+    metrics :: [metric_spec() | metric_name()],
+    rate_metrics :: [metric_name()]
+}).
+
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -166,6 +174,16 @@ create_metrics(Name, Id, Metrics, RateMetrics) ->
     Metrics1 = desugar(Metrics),
     gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}).
 
+-spec ensure_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()], [atom()]) ->
+    {ok, created | already_created} | {error, term()}.
+ensure_metrics(Name, Id, Metrics0, RateMetrics) ->
+    Metrics = desugar(Metrics0),
+    gen_server:call(
+        Name,
+        #ensure_metrics{id = Id, metrics = Metrics, rate_metrics = RateMetrics},
+        infinity
+    ).
+
 -spec clear_metrics(handler_name(), metric_id()) -> ok.
 clear_metrics(Name, Id) ->
     gen_server:call(Name, {delete_metrics, Id}).
@@ -379,28 +397,12 @@ handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
             undefined -> make_rate(0, 0, 0);
             RatesPerId -> format_rates_of_id(RatesPerId)
         end, State};
-handle_call(
-    {create_metrics, Id, Metrics, RateMetrics},
-    _From,
-    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
-) ->
-    case RateMetrics -- filter_counters(Metrics) of
-        [] ->
-            RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
-            Rate1 =
-                case Rates of
-                    undefined -> #{Id => RatePerId};
-                    _ -> Rates#{Id => RatePerId}
-                end,
-            Slides1 = Slides#{Id => create_slides(Metrics)},
-            {reply, create_counters(get_self_name(), Id, Metrics), State#state{
-                metric_ids = sets:add_element(Id, MIDs),
-                rates = Rate1,
-                slides = Slides1
-            }};
-        _ ->
-            {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
-    end;
+handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State0) ->
+    {Result, State} = handle_create_metrics(State0, Id, Metrics, RateMetrics),
+    {reply, Result, State};
+handle_call(#ensure_metrics{id = Id, metrics = Metrics, rate_metrics = RateMetrics}, _From, State0) ->
+    {Result, State} = handle_ensure_metrics(State0, Id, Metrics, RateMetrics),
+    {reply, Result, State};
 handle_call(
     {delete_metrics, Id},
     _From,
@@ -702,3 +704,61 @@ create_slides(Metrics) ->
 get_self_name() ->
     {registered_name, Name} = process_info(self(), registered_name),
     Name.
+
+is_superset_of(RateMetrics, Metrics) ->
+    case RateMetrics -- filter_counters(Metrics) of
+        [] ->
+            true;
+        [_ | _] ->
+            false
+    end.
+
+handle_create_metrics(State0, Id, Metrics, RateMetrics) ->
+    #state{metric_ids = MIDs0, rates = Rates0, slides = Slides0} = State0,
+    case is_superset_of(RateMetrics, Metrics) of
+        true ->
+            RatesPerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
+            Rates =
+                case Rates0 of
+                    undefined -> #{Id => RatesPerId};
+                    _ -> Rates0#{Id => RatesPerId}
+                end,
+            Slides = Slides0#{Id => create_slides(Metrics)},
+            MIDs = sets:add_element(Id, MIDs0),
+            State = State0#state{
+                metric_ids = MIDs,
+                rates = Rates,
+                slides = Slides
+            },
+            Result = create_counters(get_self_name(), Id, Metrics),
+            {Result, State};
+        false ->
+            {{error, not_super_set_of, {RateMetrics, Metrics}}, State0}
+    end.
+
+handle_ensure_metrics(State0, Id, Metrics, RateMetrics) ->
+    #state{metric_ids = MIDs, rates = Rates, slides = Slides} = State0,
+    Name = get_self_name(),
+    CounterKeys = filter_counters(Metrics),
+    CurrentCounters = get_counters(Name, Id),
+    HasAllCounters = [] =:= (CounterKeys -- maps:keys(CurrentCounters)),
+    HasMetricId = sets:is_element(Id, MIDs),
+    CurrentRates =
+        %% todo: no need to have `undefined' here?
+        case Rates of
+            #{Id := Rs} -> maps:keys(Rs);
+            _ -> []
+        end,
+    HasRates = [] =:= (RateMetrics -- CurrentRates),
+    SlideKeys = [K || {slide, K} <- Metrics],
+    CurrentSlides = maps:keys(maps:get(Id, Slides, #{})),
+    HasSlides = [] =:= (SlideKeys -- CurrentSlides),
+    case HasMetricId andalso HasAllCounters andalso HasRates andalso HasSlides of
+        true ->
+            {{ok, already_created}, State0};
+        false ->
+            case handle_create_metrics(State0, Id, Metrics, RateMetrics) of
+                {ok, State} -> {{ok, created}, State};
+                {Result, State} -> {Result, State}
+            end
+    end.

+ 103 - 0
apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl

@@ -429,3 +429,106 @@ t_shift_gauge(_Config) ->
     ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
 
     ok.
+
+%% Tests that check the behavior of `ensure_metrics'.
+t_ensure_metrics(_Config) ->
+    Id1 = <<"id1">>,
+    Metrics1 = [c1, {counter, c2}, c3, {slide, s1}, {slide, s2}],
+    RateMetrics1 = [c2, c3],
+    %% Behaves as `create_metrics' if absent
+    ?assertEqual(
+        {ok, created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics1, RateMetrics1)
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+    %% Does nothing if everything is in place
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics1, RateMetrics1)
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+
+    %% Does nothing if asked to ensure a subset of existing metrics
+    Metrics2 = [c1],
+    RateMetrics2 = [c3],
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics2, RateMetrics2)
+    ),
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, [], [])
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+
+    %% If we have an initially smaller set of metrics, `ensure_metrics' will behave as
+    %% `create_metrics' if one is missing.
+    Id2 = <<"id2">>,
+    lists:foreach(
+        fun(
+            #{remove_from_metrics := RemoveFromMetrics, remove_from_rates := RemoveFromRates} = Ctx
+        ) ->
+            ok = emqx_metrics_worker:clear_metrics(?NAME, Id2),
+            Metrics3 = Metrics1 -- RemoveFromMetrics,
+            RateMetrics3 = RateMetrics1 -- RemoveFromRates,
+            ok = emqx_metrics_worker:create_metrics(?NAME, Id2, Metrics3, RateMetrics3),
+            ?assertEqual(
+                {ok, created},
+                emqx_metrics_worker:ensure_metrics(?NAME, Id2, Metrics1, RateMetrics1),
+                Ctx
+            ),
+            ?assertMatch(
+                #{
+                    counters := #{c1 := _, c2 := _, c3 := _},
+                    rate := #{c2 := _, c3 := _},
+                    gauges := #{},
+                    slides := #{s1 := _, s2 := _}
+                },
+                emqx_metrics_worker:get_metrics(?NAME, Id2)
+            ),
+            ok
+        end,
+        [
+            #{
+                remove_from_metrics => [c1],
+                remove_from_rates => []
+            },
+            #{
+                remove_from_metrics => [{counter, c2}],
+                remove_from_rates => [c2]
+            },
+            #{
+                remove_from_metrics => [{slide, s2}],
+                remove_from_rates => []
+            },
+            #{
+                remove_from_metrics => [],
+                remove_from_rates => RateMetrics1
+            }
+        ]
+    ),
+    ok.

+ 11 - 0
changes/ce/fix-14226.en.md

@@ -0,0 +1,11 @@
+Previously, under high stress, the node could lose track of a resource's (action/source) metrics and not be able to recover until the node is reboot.  This is now mitigated by attempting to recreate such metrics when either restarting the resource or resetting its metrics.
+
+Also, warning logs about failures to bump said metrics would flood the logs for "hot-path" metrics such as `matched`.  Now, such logs are throttled to avoid bloating log files.
+
+An example of such throttled log:
+
+```
+2024-11-14T13:56:44.134289+00:00 [warning] tag: RESOURCE, clientid: clientid, msg: handle_resource_metrics_failed, peername: 172.100.239.1:33896, reason: {badkey,matched}, stacktrace: [{erlang,map_get,[matched,#{}],[{error_info,#{module => erl_erts_errors}}]},{emqx_metrics_worker,idx_metric,4,[{file,"src/emqx_metrics_worker.erl"},{line,560}]},...
+
+2024-11-14T13:57:12.490503+00:00 [warning] msg: log_events_throttled_during_last_period, period: 1 minutes, 0 seconds, dropped: #{handle_resource_metrics_failed => 2294}
+```