Parcourir la source

feat(emqx_metrics): Sliding window samples

ieQu1 il y a 3 ans
Parent
commit
5d9f9671e9

+ 227 - 44
apps/emqx/src/emqx_metrics_worker.erl

@@ -31,6 +31,7 @@
 -export([
     inc/3,
     inc/4,
+    observe/4,
     get/3,
     get_gauge/3,
     set_gauge/5,
@@ -38,6 +39,8 @@
     get_gauges/2,
     delete_gauges/2,
     get_rate/2,
+    get_slide/2,
+    get_slide/3,
     get_counters/2,
     create_metrics/3,
     create_metrics/4,
@@ -67,7 +70,16 @@
 -define(SAMPLING, 1).
 -endif.
 
--export_type([metrics/0, handler_name/0, metric_id/0]).
+-export_type([metrics/0, handler_name/0, metric_id/0, metric_spec/0]).
+
+%% Kind of metric that can be registered. A bare atom in a metric list is
+%% expanded to `{counter, Atom}' by desugar/1, so `counter' is the default.
+-type metric_type() ::
+    %% Simple counter
+    counter
+    %% Sliding window average
+    | slide.
+
+%% A metric name tagged with its kind.
+-type metric_spec() :: {metric_type(), atom()}.
 
 -type rate() :: #{
     current := float(),
@@ -77,6 +89,7 @@
 -type metrics() :: #{
     counters := #{metric_name() => integer()},
     gauges := #{metric_name() => integer()},
+    slides := #{metric_name() => number()},
     rate := #{metric_name() => rate()}
 }.
 -type handler_name() :: atom().
@@ -103,9 +116,22 @@
     last5m_smpl = [] :: list()
 }).
 
+%% One bucket of a slide's history, captured from the slide's two counter
+%% slots (see get_slide_datapoint/2).
+-record(slide_datapoint, {
+    %% Sum of the values observed while this bucket was being filled
+    sum :: non_neg_integer(),
+    %% Number of values observed while this bucket was being filled
+    samples :: non_neg_integer(),
+    %% Capture time as os:system_time(second); the placeholder buckets made
+    %% by create_slides/1 use 0
+    time :: non_neg_integer()
+}).
+
+%% History of one sliding-window metric, kept in the worker's #state{}.
+-record(slide, {
+    %% Total number of samples through the history
+    n_samples = 0 :: non_neg_integer(),
+    %% Buckets, newest first: update_slide/3 prepends the latest bucket and
+    %% drops the last (oldest) one, so the length stays constant
+    datapoints = [] :: [#slide_datapoint{}]
+}).
+
 -record(state, {
     metric_ids = sets:new(),
-    rates :: undefined | #{metric_id() => #rate{}}
+    rates :: #{metric_id() => #{metric_name() => #rate{}}} | undefined,
+    slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}}
 }).
 
 %%------------------------------------------------------------------------------
@@ -126,14 +152,18 @@ child_spec(ChldName, Name) ->
         modules => [emqx_metrics_worker]
     }.
 
--spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}.
+-spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()]) ->
+    ok | {error, term()}.
 create_metrics(Name, Id, Metrics) ->
-    create_metrics(Name, Id, Metrics, Metrics).
+    Metrics1 = desugar(Metrics),
+    Counters = filter_counters(Metrics1),
+    create_metrics(Name, Id, Metrics1, Counters).
 
--spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) ->
+-spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()], [atom()]) ->
     ok | {error, term()}.
 create_metrics(Name, Id, Metrics, RateMetrics) ->
-    gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}).
+    Metrics1 = desugar(Metrics),
+    gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}).
 
 -spec clear_metrics(handler_name(), metric_id()) -> ok.
 clear_metrics(Name, Id) ->
@@ -156,7 +186,7 @@ get(Name, Id, Metric) ->
         not_found ->
             0;
         Ref when is_atom(Metric) ->
-            counters:get(Ref, idx_metric(Name, Id, Metric));
+            counters:get(Ref, idx_metric(Name, Id, counter, Metric));
         Ref when is_integer(Metric) ->
             counters:get(Ref, Metric)
     end.
@@ -171,21 +201,37 @@ get_counters(Name, Id) ->
         fun(_Metric, Index) ->
             get(Name, Id, Index)
         end,
-        get_indexes(Name, Id)
+        get_indexes(Name, counter, Id)
     ).
 
+%% Ask the worker `Name' for a summary of every slide registered under `Id'.
+%% The reply maps each slide name to the summary built by do_get_slide/3;
+%% an unknown `Id' yields an empty map.
+-spec get_slide(handler_name(), metric_id()) -> map().
+get_slide(Name, Id) ->
+    Request = {get_slide, Id},
+    gen_server:call(Name, Request).
+
+%% Get the averages for a specified sliding window period.
+%%
+%% It will only account for the samples recorded in the past `Window' seconds.
+%%
+%% The worker replies with one entry per slide registered under `Id', so the
+%% result is a map from metric name to its average (0 when the window holds
+%% no samples), not a single number. An unknown `Id' yields an empty map.
+-spec get_slide(handler_name(), metric_id(), non_neg_integer()) ->
+    #{metric_name() => number()}.
+get_slide(Name, Id, Window) ->
+    gen_server:call(Name, {get_slide, Id, Window}).
+
 -spec reset_counters(handler_name(), metric_id()) -> ok.
 reset_counters(Name, Id) ->
-    Indexes = maps:values(get_indexes(Name, Id)),
-    Ref = get_ref(Name, Id),
-    lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, Indexes).
+    case get_ref(Name, Id) of
+        not_found ->
+            ok;
+        Ref ->
+            #{size := Size} = counters:info(Ref),
+            lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, lists:seq(1, Size))
+    end.
 
 -spec get_metrics(handler_name(), metric_id()) -> metrics().
 get_metrics(Name, Id) ->
     #{
         rate => get_rate(Name, Id),
         counters => get_counters(Name, Id),
-        gauges => get_gauges(Name, Id)
+        gauges => get_gauges(Name, Id),
+        slides => get_slide(Name, Id)
     }.
 
 -spec inc(handler_name(), metric_id(), atom()) -> ok.
@@ -194,7 +240,37 @@ inc(Name, Id, Metric) ->
 
 -spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok.
 inc(Name, Id, Metric, Val) ->
-    counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
+    counters:add(get_ref(Name, Id), idx_metric(Name, Id, counter, Metric), Val).
+
+%% Record one sample for a slide.
+%%
+%% "Slide" is short for a "sliding window average" metric.
+%%
+%% It makes it possible to monitor the average of observed values over
+%% time, and is mainly used for performance analysis. For example, it can
+%% be used to report the run time of operations.
+%%
+%% Consider an example:
+%%
+%% ```
+%% emqx_metrics_worker:create_metrics(Name, Id, [{slide, a}]),
+%% emqx_metrics_worker:observe(Name, Id, a, 10),
+%% emqx_metrics_worker:observe(Name, Id, a, 30),
+%% #{a := 20} = emqx_metrics_worker:get_slide(Name, Id, _Window = 1).
+%% '''
+%%
+%% After recording 2 samples, this metric becomes 20 (the average of 10 and 30).
+%%
+%% But after 1 second it becomes 0 again, unless new samples are recorded.
+%%
+%% The sample is written straight into the counters array by the calling
+%% process. A slide occupies two adjacent slots: the running sum and,
+%% right after it, the number of samples.
+-spec observe(handler_name(), metric_id(), atom(), integer()) -> ok.
+observe(Name, Id, Metric, Val) ->
+    PerId = maps:get(Id, get_pterm(Name)),
+    #{ref := CRef, slide := SlideIndexes} = PerId,
+    SumIx = maps:get(Metric, SlideIndexes),
+    SamplesIx = SumIx + 1,
+    counters:add(CRef, SumIx, Val),
+    counters:add(CRef, SamplesIx, 1).
 
 -spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
 set_gauge(Name, Id, WorkerId, Metric, Val) ->
@@ -300,9 +376,9 @@ handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
 handle_call(
     {create_metrics, Id, Metrics, RateMetrics},
     _From,
-    State = #state{metric_ids = MIDs, rates = Rates}
+    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
 ) ->
-    case RateMetrics -- Metrics of
+    case RateMetrics -- filter_counters(Metrics) of
         [] ->
             RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
             Rate1 =
@@ -310,9 +386,11 @@ handle_call(
                     undefined -> #{Id => RatePerId};
                     _ -> Rates#{Id => RatePerId}
                 end,
+            Slides1 = Slides#{Id => create_slides(Metrics)},
             {reply, create_counters(get_self_name(), Id, Metrics), State#state{
                 metric_ids = sets:add_element(Id, MIDs),
-                rates = Rate1
+                rates = Rate1,
+                slides = Slides1
             }};
         _ ->
             {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
@@ -320,7 +398,7 @@ handle_call(
 handle_call(
     {delete_metrics, Id},
     _From,
-    State = #state{metric_ids = MIDs, rates = Rates}
+    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
 ) ->
     Name = get_self_name(),
     delete_counters(Name, Id),
@@ -331,29 +409,43 @@ handle_call(
             case Rates of
                 undefined -> undefined;
                 _ -> maps:remove(Id, Rates)
-            end
+            end,
+        slides = maps:remove(Id, Slides)
     }};
 handle_call(
     {reset_metrics, Id},
     _From,
-    State = #state{rates = Rates}
+    State = #state{rates = Rates, slides = Slides}
 ) ->
-    Name = get_self_name(),
-    delete_gauges(Name, Id),
-    {reply, reset_counters(Name, Id), State#state{
+    delete_gauges(get_self_name(), Id),
+    NewRates =
+        case Rates of
+            undefined ->
+                undefined;
+            _ ->
+                ResetRate =
+                    maps:map(
+                        fun(_Key, _Value) -> #rate{} end,
+                        maps:get(Id, Rates, #{})
+                    ),
+                maps:put(Id, ResetRate, Rates)
+        end,
+    SlideSpecs = [{slide, I} || I <- maps:keys(maps:get(Id, Slides, #{}))],
+    NewSlides = Slides#{Id => create_slides(SlideSpecs)},
+    {reply, reset_counters(get_self_name(), Id), State#state{
         rates =
-            case Rates of
-                undefined ->
-                    undefined;
-                _ ->
-                    ResetRate =
-                        maps:map(
-                            fun(_Key, _Value) -> #rate{} end,
-                            maps:get(Id, Rates, #{})
-                        ),
-                    maps:put(Id, ResetRate, Rates)
-            end
+            NewRates,
+        slides = NewSlides
     }};
+handle_call({get_slide, Id}, _From, State = #state{slides = Slides}) ->
+    SlidesForID = maps:get(Id, Slides, #{}),
+    {reply, maps:map(fun(Metric, Slide) -> do_get_slide(Id, Metric, Slide) end, SlidesForID),
+        State};
+handle_call({get_slide, Id, Window}, _From, State = #state{slides = Slides}) ->
+    SlidesForID = maps:get(Id, Slides, #{}),
+    {reply,
+        maps:map(fun(Metric, Slide) -> do_get_slide(Window, Id, Metric, Slide) end, SlidesForID),
+        State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
@@ -363,7 +455,7 @@ handle_cast(_Msg, State) ->
 handle_info(ticking, State = #state{rates = undefined}) ->
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
     {noreply, State};
-handle_info(ticking, State = #state{rates = Rates0}) ->
+handle_info(ticking, State = #state{rates = Rates0, slides = Slides0}) ->
     Rates =
         maps:map(
             fun(Id, RatesPerID) ->
@@ -376,8 +468,20 @@ handle_info(ticking, State = #state{rates = Rates0}) ->
             end,
             Rates0
         ),
+    Slides =
+        maps:map(
+            fun(Id, SlidesPerID) ->
+                maps:map(
+                    fun(Metric, Slide) ->
+                        update_slide(Id, Metric, Slide)
+                    end,
+                    SlidesPerID
+                )
+            end,
+            Slides0
+        ),
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
-    {noreply, State#state{rates = Rates}};
+    {noreply, State#state{rates = Rates, slides = Slides}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -408,17 +512,18 @@ create_counters(_Name, _Id, []) ->
     error({create_counter_error, must_provide_a_list_of_metrics});
 create_counters(Name, Id, Metrics) ->
     %% backup the old counters
-    OlderCounters = maps:with(Metrics, get_counters(Name, Id)),
+    OlderCounters = maps:with(filter_counters(Metrics), get_counters(Name, Id)),
     %% create the new counter
-    Size = length(Metrics),
-    Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))),
+    {Size, Indexes} = create_metric_indexes(Metrics),
     Counters = get_pterm(Name),
     CntrRef = counters:new(Size, [write_concurrency]),
     persistent_term:put(
         ?CntrRef(Name),
-        Counters#{Id => #{ref => CntrRef, indexes => Indexes}}
+        Counters#{Id => Indexes#{ref => CntrRef}}
     ),
-    %% restore the old counters
+    %% Restore the old counters. Slides are not restored, since they
+    %% are periodically zeroed anyway. We do lose some samples in the
+    %% current interval, but that's acceptable for now.
     lists:foreach(
         fun({Metric, N}) ->
             inc(Name, Id, Metric, N)
@@ -426,6 +531,16 @@ create_counters(Name, Id, Metrics) ->
         maps:to_list(OlderCounters)
     ).
 
+%% Assign counter-array slots to a list of metric specs.
+%%
+%% Returns `{Size, Indexes}' where `Size' is the number of slots needed and
+%% `Indexes' maps each metric name to its first slot, grouped by metric type.
+%% A counter takes one slot; a slide takes two (sum, then sample count).
+create_metric_indexes(Metrics) ->
+    create_metric_indexes(Metrics, 1, [], []).
+
+create_metric_indexes([], NextIndex, Counters, Slides) ->
+    %% NextIndex is the first unused slot, so the slots in use are
+    %% 1..NextIndex - 1. Returning NextIndex itself would allocate one
+    %% slot that nothing ever refers to.
+    {NextIndex - 1, #{counter => maps:from_list(Counters), slide => maps:from_list(Slides)}};
+create_metric_indexes([{counter, Id} | Rest], Index, Counters, Slides) ->
+    create_metric_indexes(Rest, Index + 1, [{Id, Index} | Counters], Slides);
+create_metric_indexes([{slide, Id} | Rest], Index, Counters, Slides) ->
+    create_metric_indexes(Rest, Index + 2, Counters, [{Id, Index} | Slides]).
+
 delete_counters(Name, Id) ->
     persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))).
 
@@ -435,12 +550,12 @@ get_ref(Name, Id) ->
         error -> not_found
     end.
 
-idx_metric(Name, Id, Metric) ->
-    maps:get(Metric, get_indexes(Name, Id)).
+idx_metric(Name, Id, Type, Metric) ->
+    maps:get(Metric, get_indexes(Name, Type, Id)).
 
-get_indexes(Name, Id) ->
+get_indexes(Name, Type, Id) ->
     case maps:find(Id, get_pterm(Name)) of
-        {ok, #{indexes := Indexes}} -> Indexes;
+        {ok, #{Type := Indexes}} -> Indexes;
         error -> #{}
     end.
 
@@ -488,6 +603,53 @@ calculate_rate(CurrVal, #rate{
         tick = Tick + 1
     }.
 
+%% Summarise one slide: the total sample count, the average over the last
+%% 2 seconds (`current') and the average over the last ?SECS_5M seconds.
+do_get_slide(Id, Metric, Slide = #slide{n_samples = TotalSamples}) ->
+    Current = do_get_slide(2, Id, Metric, Slide),
+    Last5m = do_get_slide(?SECS_5M, Id, Metric, Slide),
+    #{n_samples => TotalSamples, current => Current, last5m => Last5m}.
+
+%% Integer average of the samples recorded in the past `Window' seconds.
+%% The bucket still accumulating in the counters is put in front of the
+%% stored history so that fresh samples count too. Returns 0 when the
+%% window holds no samples.
+do_get_slide(Window, Id, Metric, #slide{datapoints = History}) ->
+    Live = get_slide_datapoint(Id, Metric),
+    Cutoff = os:system_time(second) - Window,
+    case get_slide_window(Cutoff, [Live | History], 0, 0) of
+        {Count, Total} when Count > 0 -> Total div Count;
+        {_, _} -> 0
+    end.
+
+%% Add up sample counts and sums of the leading datapoints that are not
+%% older than `StartTime', on top of the initial accumulators `N' and `S'.
+%% Scanning stops at the first datapoint older than `StartTime'; later
+%% entries are never looked at.
+get_slide_window(StartTime, Datapoints, N, S) ->
+    InWindow = lists:takewhile(
+        fun(#slide_datapoint{time = T}) -> T >= StartTime end,
+        Datapoints
+    ),
+    lists:foldl(
+        fun(#slide_datapoint{samples = DN, sum = DS}, {AccN, AccS}) ->
+            {AccN + DN, AccS + DS}
+        end,
+        {N, S},
+        InWindow
+    ).
+
+%% Read the current contents of a slide's two counter slots (sum, then
+%% sample count) and stamp them with the current time in seconds.
+%% Uses get_self_name/0, so it is meant to run inside the worker process.
+get_slide_datapoint(Id, Metric) ->
+    Self = get_self_name(),
+    CRef = get_ref(Self, Id),
+    SumIx = idx_metric(Self, Id, slide, Metric),
+    #slide_datapoint{
+        sum = counters:get(CRef, SumIx),
+        samples = counters:get(CRef, SumIx + 1),
+        time = os:system_time(second)
+    }.
+
+%% Move what the slide's counters have accumulated into a new history
+%% bucket: the bucket is prepended, the last (oldest) one is dropped, and
+%% the lifetime sample total is increased accordingly.
+update_slide(Id, Metric, Slide0 = #slide{n_samples = NSamples, datapoints = DPs}) ->
+    Datapoint = get_slide_datapoint(Id, Metric),
+    #slide_datapoint{sum = Sum, samples = Samples} = Datapoint,
+    Name = get_self_name(),
+    CRef = get_ref(Name, Id),
+    Index = idx_metric(Name, Id, slide, Metric),
+    %% observe/4 adds to these slots directly from the caller's process, so
+    %% a sample can arrive between the read above and this point. Take away
+    %% exactly what was read instead of writing 0, so that such a sample
+    %% stays in the counters and is picked up by the next bucket.
+    counters:sub(CRef, Index, Sum),
+    counters:sub(CRef, Index + 1, Samples),
+    Slide0#slide{
+        datapoints = [Datapoint | lists:droplast(DPs)],
+        n_samples = Samples + NSamples
+    }.
+
 format_rates_of_id(RatesPerId) ->
     maps:map(
         fun(_Metric, Rates) ->
@@ -510,6 +672,27 @@ precision(Float, N) ->
     Base = math:pow(10, N),
     round(Float * Base) / Base.
 
+%% Normalise a metric list so that every entry is a `{Type, Name}' pair.
+desugar(Metrics) ->
+    lists:map(fun desugar_spec/1, Metrics).
+
+%% A bare atom is shorthand for a counter; a pair is already a spec.
+desugar_spec(Name) when is_atom(Name) ->
+    {counter, Name};
+desugar_spec({_, _} = Spec) ->
+    Spec.
+
+%% Names of the `counter' entries in a list of metric specs, in order.
+%% Entries of any other shape are skipped.
+filter_counters(Metrics) ->
+    lists:filtermap(
+        fun
+            ({counter, Name}) -> {true, Name};
+            (_Other) -> false
+        end,
+        Metrics
+    ).
+
+%% Build the initial state for every `slide' entry in a list of metric
+%% specs: a history of ?SECS_5M div ?SAMPLING placeholder buckets with
+%% time 0 and no samples. Entries of any other type are ignored.
+create_slides(Metrics) ->
+    HistoryLen = ?SECS_5M div ?SAMPLING,
+    Placeholder = #slide_datapoint{sum = 0, samples = 0, time = 0},
+    Blank = #slide{datapoints = lists:duplicate(HistoryLen, Placeholder)},
+    maps:from_list([{Name, Blank} || {slide, Name} <- Metrics]).
+
 get_self_name() ->
     {registered_name, Name} = process_info(self(), registered_name),
     Name.

+ 24 - 5
apps/emqx/test/emqx_metrics_worker_SUITE.erl

@@ -46,7 +46,7 @@ end_per_testcase(_, _Config) ->
     ok.
 
 t_get_metrics(_) ->
-    Metrics = [a, b, c],
+    Metrics = [a, b, c, {slide, d}],
     Id = <<"testid">>,
     ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
     %% all the metrics are set to zero at start
@@ -73,6 +73,8 @@ t_get_metrics(_) ->
     ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
     ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
     ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
+    ok = emqx_metrics_worker:observe(?NAME, Id, d, 10),
+    ok = emqx_metrics_worker:observe(?NAME, Id, d, 30),
     ct:sleep(1500),
     ?LET(
         #{
@@ -89,6 +91,9 @@ t_get_metrics(_) ->
                 a := 1,
                 b := 1,
                 c := 2
+            } = Counters,
+            slides := #{
+                d := #{n_samples := 2, last5m := 20, current := _}
             }
         },
         emqx_metrics_worker:get_metrics(?NAME, Id),
@@ -100,7 +105,8 @@ t_get_metrics(_) ->
             ?assert(MaxB > 0),
             ?assert(MaxC > 0),
             ?assert(Inflight == 12),
-            ?assert(Queuing == 9)
+            ?assert(Queuing == 9),
+            ?assertNot(maps:is_key(d, Counters))
         }
     ),
     ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
@@ -117,6 +123,7 @@ t_clear_metrics(_Config) ->
                 c := #{current := 0.0, max := 0.0, last5m := 0.0}
             },
             gauges := #{},
+            slides := #{},
             counters := #{
                 a := 0,
                 b := 0,
@@ -138,14 +145,15 @@ t_clear_metrics(_Config) ->
         #{
             counters => #{},
             gauges => #{},
-            rate => #{current => 0.0, last5m => 0.0, max => 0.0}
+            rate => #{current => 0.0, last5m => 0.0, max => 0.0},
+            slides => #{}
         },
         emqx_metrics_worker:get_metrics(?NAME, Id)
     ),
     ok.
 
 t_reset_metrics(_) ->
-    Metrics = [a, b, c],
+    Metrics = [a, b, c, {slide, d}],
     Id = <<"testid">>,
     ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics),
     %% all the metrics are set to zero at start
@@ -161,6 +169,9 @@ t_reset_metrics(_) ->
                 a := 0,
                 b := 0,
                 c := 0
+            },
+            slides := #{
+                d := #{n_samples := 0, last5m := 0, current := 0}
             }
         },
         emqx_metrics_worker:get_metrics(?NAME, Id)
@@ -172,7 +183,12 @@ t_reset_metrics(_) ->
     ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5),
     ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7),
     ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9),
+    ok = emqx_metrics_worker:observe(?NAME, Id, d, 100),
+    ok = emqx_metrics_worker:observe(?NAME, Id, d, 200),
     ct:sleep(1500),
+    ?assertMatch(
+        #{d := #{n_samples := 2}}, emqx_metrics_worker:get_slide(?NAME, <<"testid">>)
+    ),
     ok = emqx_metrics_worker:reset_metrics(?NAME, Id),
     ?LET(
         #{
@@ -186,6 +202,9 @@ t_reset_metrics(_) ->
                 a := 0,
                 b := 0,
                 c := 0
+            },
+            slides := #{
+                d := #{n_samples := 0, last5m := 0, current := 0}
             }
         },
         emqx_metrics_worker:get_metrics(?NAME, Id),
@@ -202,7 +221,7 @@ t_reset_metrics(_) ->
     ok = emqx_metrics_worker:clear_metrics(?NAME, Id).
 
 t_get_metrics_2(_) ->
-    Metrics = [a, b, c],
+    Metrics = [a, b, c, {slide, d}],
     Id = <<"testid">>,
     ok = emqx_metrics_worker:create_metrics(
         ?NAME,

+ 1 - 0
changes/v5.0.14/feat-9671.en.md

@@ -0,0 +1 @@
+Implement sliding window average metrics.

+ 1 - 0
changes/v5.0.14/feat-9671.zh.md

@@ -0,0 +1 @@
+实现滑动窗口平均值指标。