Forráskód Böngészése

feat: dashboard monitor granularity adapter

DDDHuang 4 éve
szülő
commit
c21bc9d329

+ 12 - 2
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -55,8 +55,18 @@
     , dropped
     ]).
 
--define(SAMPLER_LIST,
+-define(GAUGE_SAMPLER_LIST,
     [ subscriptions
     , routes
     , connections
-    ] ++ ?DELTA_SAMPLER_LIST).
+    ]).
+
+-define(SAMPLER_LIST, ?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST).
+
+-define(DELTA_SAMPLER_RATE_MAP, #{
+    received        => received_rate,
+    received_bytes  => received_bytes_rate,
+    sent            => sent_rate,
+    sent_bytes      => sent_bytes_rate,
+    dropped         => dropped_rate
+    }).

+ 107 - 27
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -35,12 +35,14 @@
 -export([ mnesia/1]).
 
 -export([ samplers/0
-        , samplers/1
         , samplers/2
+        , current_rate/0
+        , current_rate/1
+        , granularity_adapter/1
         ]).
 
 %% for rpc
--export([ do_sample/1]).
+-export([ do_sample/2]).
 
 -define(TAB, ?MODULE).
 
@@ -66,30 +68,53 @@ mnesia(boot) ->
         {record_name, emqx_monit},
         {attributes, record_info(fields, emqx_monit)}]).
 
-samplers() ->
-    samplers(all).
+%% -------------------------------------------------------------------------------------------------
+%% API
 
-samplers(NodeOrCluster) ->
-    format(do_sample(NodeOrCluster)).
+%% @doc Samples of all running cluster nodes with no lower time bound,
+%% passed through format/1.
+samplers() ->
+    ClusterSamples = do_sample(all, infinity),
+    format(ClusterSamples).
 
-samplers(NodeOrCluster, 0) ->
-    samplers(NodeOrCluster);
 samplers(NodeOrCluster, Latest) ->
-    case samplers(NodeOrCluster) of
+    Now = erlang:system_time(millisecond),
+    MatchTime = Now - (Latest * 1000),
+    case format(do_sample(NodeOrCluster, MatchTime)) of
         {badrpc, Reason} ->
             {badrpc, Reason};
         List when is_list(List) ->
-            case erlang:length(List) - Latest of
-                Start when Start > 0 ->
-                    lists:sublist(List, Start, Latest);
-                _ ->
-                    List
+            granularity_adapter(List)
+    end.
+
+%% @doc Reduce the granularity of a sample list.
+%% A list with more than 100 entries is handed to granularity_adapter/2,
+%% which merges neighbouring entries pairwise; anything else is returned as is.
+granularity_adapter(Samples) when erlang:length(Samples) > 100 ->
+    granularity_adapter(Samples, []);
+granularity_adapter(Samples) ->
+    Samples.
+
+%% @doc Current gauges and rates summed over every running cluster node.
+%%
+%% Returns `{ok, RateMap}' when all nodes answered, or the first
+%% `{badrpc, {Node, Reason}}' reported by current_rate/1. The `{ok, _}' wrapper
+%% matches what current_rate/1 returns for a single node, so callers of
+%% `current_rate(all)' can handle both cases with the same patterns.
+current_rate() ->
+    Fun =
+        fun(_Node, {badrpc, _} = Failure) ->
+            %% An earlier node already failed; keep that first error instead of
+            %% trying to merge a rate map into an error tuple.
+            Failure;
+           (Node, Cluster) ->
+            case current_rate(Node) of
+                {ok, CurrentRate} ->
+                    merge_cluster_rate(CurrentRate, Cluster);
+                {badrpc, _} = Failure ->
+                    %% current_rate/1 already tags the reason with the node name.
+                    Failure
             end
+        end,
+    case lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running)) of
+        {badrpc, _} = Error ->
+            Error;
+        Rate ->
+            {ok, Rate}
+    end.
+
+%% @doc Current rate for the whole cluster (`all'), for the local node (asked
+%% through the monitor gen_server) or for a remote node (asked over rpc with a
+%% 5000 ms timeout). A failed rpc is returned as `{badrpc, {Node, Reason}}'.
+current_rate(all) ->
+    current_rate();
+current_rate(Node) when Node == node() ->
+    do_call(current_rate);
+current_rate(Node) ->
+    case rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000) of
+        {ok, _Rate} = Reply ->
+            Reply;
+        {badrpc, Cause} ->
+            {badrpc, {Node, Cause}}
     end.
 
-%%%===================================================================
-%%% gen_server functions
-%%%===================================================================
+%% -------------------------------------------------------------------------------------------------
+%% gen_server functions
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -99,6 +124,11 @@ init([]) ->
     clean_timer(),
     {ok, #state{last = undefined}}.
 
+%% `current_rate': take a fresh sample stamped with the current system time and
+%% reply with the rate computed against the sample kept in the state. The state
+%% itself is returned unchanged. Any other request is answered with `ok'.
+handle_call(current_rate, _From, #state{last = Previous} = State) ->
+    Current = sample(erlang:system_time(millisecond)),
+    {reply, {ok, cal_rate(Current, Previous)}, State};
 handle_call(_Request, _From, State = #state{}) ->
     {reply, ok, State}.
 
@@ -125,14 +155,16 @@ terminate(_Reason, _State = #state{}) ->
 code_change(_OldVsn, State = #state{}, _Extra) ->
     {ok, State}.
 
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
+%% -------------------------------------------------------------------------------------------------
+%% Internal functions
 
-do_sample(all) ->
+%% Synchronous request to the monitor server registered locally under ?MODULE,
+%% giving up after 5000 ms.
+do_call(Request) ->
+    TimeoutMs = 5000,
+    gen_server:call(?MODULE, Request, TimeoutMs).
+
+do_sample(all, MatchTime) ->
     Fun =
         fun(Node, All) ->
-            case do_sample(Node) of
+            case do_sample(Node, MatchTime) of
                 {badrpc, Reason} ->
                     {badrpc, {Node, Reason}};
                 NodeSamplers ->
@@ -140,11 +172,16 @@ do_sample(all) ->
             end
         end,
     lists:foldl(Fun, #{}, mria_mnesia:cluster_nodes(running));
-do_sample(Node) when Node == node() ->
-    ExpiredMS = [{'$1',[],['$1']}],
-    internal_format(ets:select(?TAB, ExpiredMS));
-do_sample(Node) ->
-    rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node], 5000).
+do_sample(Node, MatchTime) when Node == node() ->
+    MS = match_spec(MatchTime),
+    internal_format(ets:select(?TAB, MS));
+do_sample(Node, MatchTime) ->
+    rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node, MatchTime], 5000).
+
+%% Match specification used by do_sample/2 for ets:select/2.
+%% `infinity' selects every object in the table. Any other value selects the
+%% 3-tuples whose second element is >= MatchTime (presumably the `time' field
+%% of #emqx_monit{} -- confirm against the record definition).
+match_spec(infinity) ->
+    SelectAll = {'$1', [], ['$1']},
+    [SelectAll];
+match_spec(MatchTime) ->
+    Head = {'_', '$1', '_'},
+    Guard = {'>=', '$1', MatchTime},
+    [{Head, [Guard], ['$_']}].
 
 merge_cluster_samplers(Node, Cluster) ->
     maps:fold(fun merge_cluster_samplers/3, Cluster, Node).
@@ -157,6 +194,14 @@ merge_cluster_samplers(TS, NodeData, Cluster) ->
             Cluster#{TS => count_map(NodeData, ClusterData)}
     end.
 
+%% Add one node's values into the cluster accumulator, key by key.
+%% A key not yet present in the accumulator counts as 0.
+merge_cluster_rate(Node, Cluster) ->
+    maps:fold(
+        fun(Name, NodeValue, Acc) ->
+            Acc#{Name => NodeValue + maps:get(Name, Acc, 0)}
+        end,
+        Cluster,
+        Node).
+
 format({badrpc, Reason}) ->
     {badrpc, Reason};
 format(Data) ->
@@ -167,6 +212,38 @@ format(Data) ->
 format(TimeStamp, Data, All) ->
     [Data#{time_stamp => TimeStamp} | All].
 
+%% Build the "current" map from the newest sample and the previous one:
+%% the gauges (?GAUGE_SAMPLER_LIST) are copied from the newest sample and every
+%% counter in ?DELTA_SAMPLER_LIST is turned into a per-second rate stored under
+%% the name given by ?DELTA_SAMPLER_RATE_MAP.
+%%
+%% The server state starts with `last = undefined'; in that case there is
+%% nothing to compare against, so all rates are reported as 0 instead of
+%% crashing the caller with a function_clause.
+cal_rate(#emqx_monit{data = NowData}, undefined) ->
+    Gauge = maps:with(?GAUGE_SAMPLER_LIST, NowData),
+    lists:foldl(fun(RateKey, Res) -> Res#{RateKey => 0} end,
+                Gauge,
+                maps:values(?DELTA_SAMPLER_RATE_MAP));
+cal_rate( #emqx_monit{data = NowData, time = NowTime}
+        , #emqx_monit{data = LastData, time = LastTime}) ->
+    TimeDelta = NowTime - LastTime,
+    Gauge = maps:with(?GAUGE_SAMPLER_LIST, NowData),
+    {_, _, _, Rate} =
+        lists:foldl(fun cal_rate_/2, {NowData, LastData, TimeDelta, Gauge}, ?DELTA_SAMPLER_LIST),
+    Rate.
+
+%% Fold step of cal_rate/2: rate of one counter, in units per second
+%% (timestamps are in milliseconds, hence the factor 1000).
+cal_rate_(Key, {Now, Last, TDelta, Res}) when TDelta > 0 ->
+    NewValue = maps:get(Key, Now),
+    LastValue = maps:get(Key, Last),
+    Rate = ((NewValue - LastValue) * 1000) div TDelta,
+    RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
+    {Now, Last, TDelta, Res#{RateKey => Rate}};
+cal_rate_(Key, {Now, Last, TDelta, Res}) ->
+    %% Both samples carry the same timestamp, or the wall clock stepped back:
+    %% no meaningful rate can be computed, and `div 0' would raise badarith.
+    RateKey = maps:get(Key, ?DELTA_SAMPLER_RATE_MAP),
+    {Now, Last, TDelta, Res#{RateKey => 0}}.
+
+%% Halve the number of samples by merging them pairwise, keeping input order.
+%% For each pair the keys in ?DELTA_SAMPLER_LIST are summed; every other key is
+%% taken from the second sample of the pair. A trailing unpaired sample is kept
+%% unchanged.
+granularity_adapter([First, Second | Rest], Acc) ->
+    SumDelta =
+        fun(Key, Merged) ->
+            Merged#{Key => maps:get(Key, First) + maps:get(Key, Second)}
+        end,
+    Pair = lists:foldl(SumDelta, Second, ?DELTA_SAMPLER_LIST),
+    granularity_adapter(Rest, [Pair | Acc]);
+granularity_adapter([Single], Acc) ->
+    lists:reverse([Single | Acc]);
+granularity_adapter([], Acc) ->
+    lists:reverse(Acc).
+
+%% -------------------------------------------------------------------------------------------------
+%% timer
+
 sample_timer() ->
     {NextTime, Remaining} = next_interval(),
     erlang:send_after(Remaining, self(), {sample, NextTime}).
@@ -186,6 +263,9 @@ next_interval() ->
     Remaining = NextTime - Now,
     {NextTime, Remaining}.
 
+%% -------------------------------------------------------------------------------------------------
+%% data
+
 sample(Time) ->
     Fun =
         fun(Key, Res) ->

+ 42 - 5
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -16,7 +16,9 @@
         , fields/1
         ]).
 
--export([ monitor/2]).
+-export([ monitor/2
+        , monitor_current/2
+        ]).
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
@@ -24,6 +26,7 @@ api_spec() ->
 paths() ->
     [ "/monitor"
     , "/monitor/nodes/:node"
+    , "/monitor/current"
     ].
 
 schema("/monitor") ->
@@ -55,19 +58,34 @@ schema("/monitor/nodes/:node") ->
                 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
             }
         }
+    };
+
+schema("/monitor/current") ->
+    #{
+        'operationId' => monitor_current,
+        get => #{
+            description => <<"Current monitor data. Gauge and rate">>,
+            responses => #{
+                200 => hoconsc:mk(hoconsc:ref(sampler_current), #{})
+            }
+        }
     }.
 
 fields(sampler) ->
     Samplers =
         [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})}
         || SamplerName <- ?SAMPLER_LIST],
-    [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers].
+    [{time_stamp, hoconsc:mk(integer(), #{desc => <<"Timestamp">>})} | Samplers];
+
+fields(sampler_current) ->
+    [{SamplerName, hoconsc:mk(integer(), #{desc => sampler_desc(SamplerName)})}
+    || SamplerName <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST].
 
 %% -------------------------------------------------------------------------------------------------
 %% API
 
 monitor(get, #{query_string := QS, bindings := Bindings}) ->
-    Latest = maps:get(<<"latest">>, QS, 0),
+    Latest = maps:get(<<"latest">>, QS, 1000),
     Node = binary_to_atom(maps:get(node, Bindings, <<"all">>)),
     case emqx_dashboard_monitor:samplers(Node, Latest) of
         {badrpc, {Node, Reason}} ->
@@ -77,6 +95,16 @@ monitor(get, #{query_string := QS, bindings := Bindings}) ->
             {200, Samplers}
     end.
 
+%% GET /monitor/current
+%% Reads the optional `node' query parameter (default `all') and replies with
+%% the current gauges and rates, or with 400 'BAD_RPC' when the node cannot be
+%% queried.
+monitor_current(get, #{query_string := QS}) ->
+    NodeBin = maps:get(<<"node">>, QS, <<"all">>),
+    %% The node name comes straight from the request. Atoms are never garbage
+    %% collected, so only accept names that already exist as atoms (`all' and
+    %% the names of known nodes do) rather than creating new ones.
+    try binary_to_existing_atom(NodeBin, utf8) of
+        NodeOrCluster ->
+            case emqx_dashboard_monitor:current_rate(NodeOrCluster) of
+                {ok, CurrentRate} ->
+                    {200, CurrentRate};
+                {badrpc, {Node, Reason}} ->
+                    Message = list_to_binary(
+                                io_lib:format("Bad node ~p, rpc failed ~p", [Node, Reason])),
+                    {400, 'BAD_RPC', Message}
+            end
+    catch
+        error:badarg ->
+            %% No such atom, hence no such node: same answer as a failed rpc.
+            Unknown = list_to_binary(
+                        io_lib:format("Bad node ~p, rpc failed ~p", [NodeBin, unknown_node])),
+            {400, 'BAD_RPC', Unknown}
+    end.
+
 %% -------------------------------------------------------------------------------------------------
 %% Internal
 
@@ -93,8 +121,17 @@ sampler_desc(routes) ->
     " Can only represent the approximate state">>;
 sampler_desc(connections) ->
     <<"Connections at the time of sampling."
-    " Can only represent the approximate state">>.
+    " Can only represent the approximate state">>;
+
+%% Descriptions of the rate samplers listed in ?DELTA_SAMPLER_RATE_MAP.
+%% Each label names the counter the rate is derived from.
+sampler_desc(received_rate)       -> sampler_desc_format("Received messages ", per);
+sampler_desc(received_bytes_rate) -> sampler_desc_format("Received bytes ", per);
+sampler_desc(sent_rate)           -> sampler_desc_format("Sent messages ", per);
+sampler_desc(sent_bytes_rate)     -> sampler_desc_format("Sent bytes ", per);
+sampler_desc(dropped_rate)        -> sampler_desc_format("Dropped messages ", per).
 
 sampler_desc_format(Format) ->
+    sampler_desc_format(Format, last).
+
+sampler_desc_format(Format, Type) ->
     Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
-    list_to_binary(io_lib:format(Format ++ "last ~p seconds", [Interval])).
+    list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).