Просмотр исходного кода

Merge pull request #3808 from zmstone/improve-speed-average

refactor(metrics): Use modified moving average for topic metrics speed
Zaiming Shi 5 лет назад
Родитель
Сommit
380c874fb5

+ 43 - 26
apps/emqx/src/emqx_mod_topic_metrics.erl

@@ -52,6 +52,9 @@
         , all_registered_topics/0
         ]).
 
+%% stats.
+-export([ rates/2 ]).
+
 %% gen_server callbacks
 -export([ init/1
         , handle_call/3
@@ -78,13 +81,15 @@
         ]).
 
 -define(TICKING_INTERVAL, 1).
+-define(SPEED_AVERAGE_WINDOW_SIZE, 5).
+-define(SPEED_MEDIUM_WINDOW_SIZE, 60).
+-define(SPEED_LONG_WINDOW_SIZE, 300).
 
 -record(speed, {
             last = 0 :: number(),
-            tick = 1 :: number(),
             last_v = 0 :: number(),
-            acc = 0 :: number(),
-            samples = [] :: list()
+            last_medium = 0 :: number(),
+            last_long = 0 :: number()
         }).
 
 -record(state, {
@@ -180,7 +185,15 @@ val(Topic, Metric) ->
     end.
 
 rate(Topic, Metric) ->
-    gen_server:call(?MODULE, {get_rate, Topic, Metric}).
+    case rates(Topic, Metric) of
+        #{short := Last} ->
+            Last;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+rates(Topic, Metric) ->
+    gen_server:call(?MODULE, {get_rates, Topic, Metric}).
 
 metrics(Topic) ->
     case ets:lookup(?TAB, Topic) of
@@ -253,7 +266,7 @@ handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) ->
             {reply, ok, State#state{speeds = NSpeeds}}
     end;
 
-handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
+handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
     case is_registered(Topic) of
         false ->
             {reply, {error, topic_not_found}, State};
@@ -261,8 +274,8 @@ handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) -
             case maps:get({Topic, Metric}, Speeds, undefined) of
                 undefined ->
                     {reply, {error, invalid_metric}, State};
-                #speed{last = Last} ->
-                    {reply, Last, State}
+                #speed{last = Short, last_medium = Medium, last_long = Long}  ->
+                    {reply, #{ short => Short, medium => Medium, long => Long }, State}
             end
     end.
 
@@ -358,25 +371,29 @@ counters_size() ->
 number_of_registered_topics() ->
     proplists:get_value(size, ets:info(?TAB)).
 
-calculate_speed(CurVal, #speed{last_v = LastVal, tick = Tick, acc = Acc, samples = Samples}) ->
+calculate_speed(CurVal, #speed{last = Last,
+    last_v = LastVal,
+    last_medium = LastMedium,
+    last_long = LastLong
+}) ->
     %% calculate the current speed based on the last value of the counter
     CurSpeed = (CurVal - LastVal) / ?TICKING_INTERVAL,
+    #speed{
+        last_v = CurVal,
+        last = short_mma(Last, CurSpeed),
+        last_medium = medium_mma(LastMedium, CurSpeed),
+        last_long = long_mma(LastLong, CurSpeed)
+    }.
 
-    %% calculate the average speed in last 5 seconds
-    case Tick < 5 of
-        true ->
-            Acc1 = Acc + CurSpeed,
-            #speed{last = Acc1 / Tick,
-                   last_v = CurVal,
-                   acc = Acc1,
-                   samples = Samples ++ [CurSpeed],
-                   tick = Tick + 1};
-        false ->
-            [FirstSpeed | Speeds] = Samples,
-            Acc1 =  Acc + CurSpeed - FirstSpeed,
-            #speed{last = Acc1 / Tick,
-                   last_v = CurVal,
-                   acc = Acc1,
-                   samples = Speeds ++ [CurSpeed],
-                   tick = Tick}
-    end.
+%% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average
+mma(WindowSize, LastSpeed, CurSpeed) ->
+    (LastSpeed * (WindowSize - 1) + CurSpeed) / WindowSize.
+
+short_mma(LastSpeed, CurSpeed) ->
+    mma(?SPEED_AVERAGE_WINDOW_SIZE, LastSpeed, CurSpeed).
+
+medium_mma(LastSpeed, CurSpeed) ->
+    mma(?SPEED_MEDIUM_WINDOW_SIZE, LastSpeed, CurSpeed).
+
+long_mma(LastSpeed, CurSpeed) ->
+    mma(?SPEED_LONG_WINDOW_SIZE, LastSpeed, CurSpeed).

+ 4 - 1
apps/emqx/test/emqx_mod_topic_metrics_SUITE.erl

@@ -36,11 +36,13 @@ t_nonexistent_topic_metrics(_) ->
     ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
     ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
     ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')),
+    ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in')),
     emqx_mod_topic_metrics:register(<<"a/b/c">>),
     ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
     ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')),
     ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
     ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
+    ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')),
     emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
     emqx_mod_topic_metrics:unload([]).
 
@@ -57,6 +59,7 @@ t_topic_metrics(_) ->
     ?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
     ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
     ?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
+    ?assert(emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}),
     emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
     emqx_mod_topic_metrics:unload([]).
 
@@ -89,4 +92,4 @@ t_hook(_) ->
     ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
     ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
     emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
-    emqx_mod_topic_metrics:unload([]).
+    emqx_mod_topic_metrics:unload([]).