Преглед на файлове

chore(resource metrics): add `ensure_metrics` to check for absent metrics

Thales Macedo Garitezi преди 1 година
родител
ревизия
5cc8e39d93
променени са 3 файла, в които са добавени 212 реда и са изтрити 43 реда
  1. 27 21
      apps/emqx_resource/src/emqx_resource.erl
  2. 82 22
      apps/emqx_utils/src/emqx_metrics_worker.erl
  3. 103 0
      apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl

+ 27 - 21
apps/emqx_resource/src/emqx_resource.erl

@@ -51,6 +51,7 @@
     reset_metrics_local/2,
     %% Create metrics for a resource ID
     create_metrics/1,
+    ensure_metrics/1,
     %% Delete metrics for a resource ID
     clear_metrics/1
 ]).
@@ -725,33 +726,38 @@ deallocate_resource(InstanceId, Key) ->
 
 -spec create_metrics(resource_id()) -> ok.
 create_metrics(ResId) ->
-    emqx_metrics_worker:create_metrics(
-        ?RES_METRICS,
-        ResId,
-        [
-            'matched',
-            'retried',
-            'retried.success',
-            'retried.failed',
-            'success',
-            'late_reply',
-            'failed',
-            'dropped',
-            'dropped.expired',
-            'dropped.queue_full',
-            'dropped.resource_not_found',
-            'dropped.resource_stopped',
-            'dropped.other',
-            'received'
-        ],
-        [matched]
-    ).
+    emqx_metrics_worker:create_metrics(?RES_METRICS, ResId, metrics(), rate_metrics()).
+
+-spec ensure_metrics(resource_id()) -> {ok, created | already_created}.
+ensure_metrics(ResId) ->
+    emqx_metrics_worker:ensure_metrics(?RES_METRICS, ResId, metrics(), rate_metrics()).
 
 -spec clear_metrics(resource_id()) -> ok.
 clear_metrics(ResId) ->
     emqx_metrics_worker:clear_metrics(?RES_METRICS, ResId).
 %% =================================================================================
 
+metrics() ->
+    [
+        'matched',
+        'retried',
+        'retried.success',
+        'retried.failed',
+        'success',
+        'late_reply',
+        'failed',
+        'dropped',
+        'dropped.expired',
+        'dropped.queue_full',
+        'dropped.resource_not_found',
+        'dropped.resource_stopped',
+        'dropped.other',
+        'received'
+    ].
+
+rate_metrics() ->
+    ['matched'].
+
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 

+ 82 - 22
apps/emqx_utils/src/emqx_metrics_worker.erl

@@ -45,6 +45,7 @@
     get_counters/2,
     create_metrics/3,
     create_metrics/4,
+    ensure_metrics/4,
     clear_metrics/2,
     reset_metrics/2,
     has_metrics/2
@@ -135,6 +136,13 @@
     slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}}
 }).
 
+%% calls/casts/infos
+-record(ensure_metrics, {
+    id :: metric_id(),
+    metrics :: [metric_spec() | metric_name()],
+    rate_metrics :: [metric_name()]
+}).
+
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -166,6 +174,16 @@ create_metrics(Name, Id, Metrics, RateMetrics) ->
     Metrics1 = desugar(Metrics),
     gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}).
 
+-spec ensure_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()], [atom()]) ->
+    {ok, created | already_created} | {error, term()}.
+ensure_metrics(Name, Id, Metrics0, RateMetrics) ->
+    Metrics = desugar(Metrics0),
+    gen_server:call(
+        Name,
+        #ensure_metrics{id = Id, metrics = Metrics, rate_metrics = RateMetrics},
+        infinity
+    ).
+
 -spec clear_metrics(handler_name(), metric_id()) -> ok.
 clear_metrics(Name, Id) ->
     gen_server:call(Name, {delete_metrics, Id}).
@@ -379,28 +397,12 @@ handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
             undefined -> make_rate(0, 0, 0);
             RatesPerId -> format_rates_of_id(RatesPerId)
         end, State};
-handle_call(
-    {create_metrics, Id, Metrics, RateMetrics},
-    _From,
-    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
-) ->
-    case RateMetrics -- filter_counters(Metrics) of
-        [] ->
-            RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
-            Rate1 =
-                case Rates of
-                    undefined -> #{Id => RatePerId};
-                    _ -> Rates#{Id => RatePerId}
-                end,
-            Slides1 = Slides#{Id => create_slides(Metrics)},
-            {reply, create_counters(get_self_name(), Id, Metrics), State#state{
-                metric_ids = sets:add_element(Id, MIDs),
-                rates = Rate1,
-                slides = Slides1
-            }};
-        _ ->
-            {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
-    end;
+handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State0) ->
+    {Result, State} = handle_create_metrics(State0, Id, Metrics, RateMetrics),
+    {reply, Result, State};
+handle_call(#ensure_metrics{id = Id, metrics = Metrics, rate_metrics = RateMetrics}, _From, State0) ->
+    {Result, State} = handle_ensure_metrics(State0, Id, Metrics, RateMetrics),
+    {reply, Result, State};
 handle_call(
     {delete_metrics, Id},
     _From,
@@ -702,3 +704,61 @@ create_slides(Metrics) ->
 get_self_name() ->
     {registered_name, Name} = process_info(self(), registered_name),
     Name.
+
+is_superset_of(RateMetrics, Metrics) ->
+    case RateMetrics -- filter_counters(Metrics) of
+        [] ->
+            true;
+        [_ | _] ->
+            false
+    end.
+
+handle_create_metrics(State0, Id, Metrics, RateMetrics) ->
+    #state{metric_ids = MIDs0, rates = Rates0, slides = Slides0} = State0,
+    case is_superset_of(RateMetrics, Metrics) of
+        true ->
+            RatesPerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
+            Rates =
+                case Rates0 of
+                    undefined -> #{Id => RatesPerId};
+                    _ -> Rates0#{Id => RatesPerId}
+                end,
+            Slides = Slides0#{Id => create_slides(Metrics)},
+            MIDs = sets:add_element(Id, MIDs0),
+            State = State0#state{
+                metric_ids = MIDs,
+                rates = Rates,
+                slides = Slides
+            },
+            Result = create_counters(get_self_name(), Id, Metrics),
+            {Result, State};
+        false ->
+            {{error, not_super_set_of, {RateMetrics, Metrics}}, State0}
+    end.
+
+handle_ensure_metrics(State0, Id, Metrics, RateMetrics) ->
+    #state{metric_ids = MIDs, rates = Rates, slides = Slides} = State0,
+    Name = get_self_name(),
+    CounterKeys = filter_counters(Metrics),
+    CurrentCounters = get_counters(Name, Id),
+    HasAllCounters = [] =:= (CounterKeys -- maps:keys(CurrentCounters)),
+    HasMetricId = sets:is_element(Id, MIDs),
+    CurrentRates =
+        %% todo: no need to have `undefined' here?
+        case Rates of
+            #{Id := Rs} -> maps:keys(Rs);
+            _ -> []
+        end,
+    HasRates = [] =:= (RateMetrics -- CurrentRates),
+    SlideKeys = [K || {slide, K} <- Metrics],
+    CurrentSlides = maps:keys(maps:get(Id, Slides, #{})),
+    HasSlides = [] =:= (SlideKeys -- CurrentSlides),
+    case HasMetricId andalso HasAllCounters andalso HasRates andalso HasSlides of
+        true ->
+            {{ok, already_created}, State0};
+        false ->
+            case handle_create_metrics(State0, Id, Metrics, RateMetrics) of
+                {ok, State} -> {{ok, created}, State};
+                {Result, State} -> {Result, State}
+            end
+    end.

+ 103 - 0
apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl

@@ -429,3 +429,106 @@ t_shift_gauge(_Config) ->
     ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
 
     ok.
+
+%% Tests that check the behavior of `ensure_metrics'.
+t_ensure_metrics(_Config) ->
+    Id1 = <<"id1">>,
+    Metrics1 = [c1, {counter, c2}, c3, {slide, s1}, {slide, s2}],
+    RateMetrics1 = [c2, c3],
+    %% Behaves as `create_metrics' if absent
+    ?assertEqual(
+        {ok, created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics1, RateMetrics1)
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+    %% Does nothing if everything is in place
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics1, RateMetrics1)
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+
+    %% Does nothing if asked to ensure a subset of existing metrics
+    Metrics2 = [c1],
+    RateMetrics2 = [c3],
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics2, RateMetrics2)
+    ),
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, [], [])
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+
+    %% If we have an initially smaller set of metrics, `ensure_metrics' will behave as
+    %% `create_metrics' if one is missing.
+    Id2 = <<"id2">>,
+    lists:foreach(
+        fun(
+            #{remove_from_metrics := RemoveFromMetrics, remove_from_rates := RemoveFromRates} = Ctx
+        ) ->
+            ok = emqx_metrics_worker:clear_metrics(?NAME, Id2),
+            Metrics3 = Metrics1 -- RemoveFromMetrics,
+            RateMetrics3 = RateMetrics1 -- RemoveFromRates,
+            ok = emqx_metrics_worker:create_metrics(?NAME, Id2, Metrics3, RateMetrics3),
+            ?assertEqual(
+                {ok, created},
+                emqx_metrics_worker:ensure_metrics(?NAME, Id2, Metrics1, RateMetrics1),
+                Ctx
+            ),
+            ?assertMatch(
+                #{
+                    counters := #{c1 := _, c2 := _, c3 := _},
+                    rate := #{c2 := _, c3 := _},
+                    gauges := #{},
+                    slides := #{s1 := _, s2 := _}
+                },
+                emqx_metrics_worker:get_metrics(?NAME, Id2)
+            ),
+            ok
+        end,
+        [
+            #{
+                remove_from_metrics => [c1],
+                remove_from_rates => []
+            },
+            #{
+                remove_from_metrics => [{counter, c2}],
+                remove_from_rates => [c2]
+            },
+            #{
+                remove_from_metrics => [{slide, s2}],
+                remove_from_rates => []
+            },
+            #{
+                remove_from_metrics => [],
+                remove_from_rates => RateMetrics1
+            }
+        ]
+    ),
+    ok.