Procházet zdrojové kódy

feat: new monitor TODO: API

DDDHuang před 4 roky
rodič
revize
21b9943df9

+ 1 - 0
apps/emqx_dashboard/etc/emqx_dashboard.conf

@@ -6,6 +6,7 @@ dashboard {
     default_username = "admin"
     default_username = "admin"
     default_password = "public"
     default_password = "public"
     ## notice: sample_interval should be divisible by 60.
     ## notice: sample_interval should be divisible by 60.
+    ## as like 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s
     sample_interval = 10s
     sample_interval = 10s
     ## api jwt timeout. default is 30 minute
     ## api jwt timeout. default is 30 minute
     token_expired_time = 60m
     token_expired_time = 60m

+ 176 - 20
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -15,9 +15,14 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -module(emqx_dashboard_monitor).
 -module(emqx_dashboard_monitor).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
--export([start_link/0]).
+-boot_mnesia({mnesia, [boot]}).
+
+-export([ start_link/0]).
 
 
 -export([ init/1
 -export([ init/1
         , handle_call/3
         , handle_call/3
@@ -29,36 +34,88 @@
 
 
 -export([ mnesia/1]).
 -export([ mnesia/1]).
 
 
+-export([ samples/0
+        , samples/1
+        , aggregate_samplers/0
+        ]).
+
 -define(TAB, ?MODULE).
 -define(TAB, ?MODULE).
 
 
--record(state, {}).
--define(INIT_DATA,
-    #{
-        sent            => 0,
-        received        => 0,
-        sent_bytes      => 0,
-        received_bytes  => 0,
-        dropped         => 0,
-        subscriptions   => 0,
-        routes          => 0,
-        connection      => 0
+%% 10 seconds
+-define(DEFAULT_INTERVAL, 10).
+
+-ifdef(TEST).
+%% for test
+-define(CLEAN_EXPIRED_INTERVAL, 2 * 1000).
+-define(RETENTION_TIME, 3 * 1000).
+-define(DEFAULT_GET_DATA_TIME, 5* 1000).
+
+-else.
+
+%% 1 hour = 60 * 60 * 1000 milliseconds
+-define(CLEAN_EXPIRED_INTERVAL, 60 * 60 * 1000).
+%% 7 days = 7 * 24 * 60 * 60 * 1000 milliseconds
+-define(RETENTION_TIME, 7 * 24 * 60 * 60 * 1000).
+%% 1 day = 60 * 60 * 1000 milliseconds
+-define(DEFAULT_GET_DATA_TIME, 60 * 60 * 1000).
+
+-endif.
+
+-record(state, {
+    last
     }).
     }).
 
 
--record(monitor_data)
+-record(emqx_monit, {
+    time :: integer(),
+    data :: map()
+    }).
+
+
+-define(DELTA_LIST,
+    [ received
+    , received_bytes
+    , sent
+    , sent_bytes
+    , dropped
+    ]).
+
+-define(SAMPLER_LIST,
+    [ subscriptions
+    , routes
+    , connections
+    ] ++ ?DELTA_LIST).
 
 
 mnesia(boot) ->
 mnesia(boot) ->
     ok = mria:create_table(?TAB, [
     ok = mria:create_table(?TAB, [
         {type, set},
         {type, set},
         {local_content, true},
         {local_content, true},
         {storage, disc_copies},
         {storage, disc_copies},
-        {record_name, monitor_data},
-        {attributes, record_info(fields, monitor_data)}]).
+        {record_name, emqx_monit},
+        {attributes, record_info(fields, emqx_monit)}]).
+
+aggregate_samplers() ->
+    [#{node => Node, data => samples(Node)} || Node <- mria_mnesia:cluster_nodes(running)].
+
+samples() ->
+    All = [samples(Node) || Node <- mria_mnesia:cluster_nodes(running)],
+    lists:foldl(fun merge_cluster_samplers/2, #{}, All).
+
+samples(Node) when Node == node() ->
+    get_data(?DEFAULT_GET_DATA_TIME);
+samples(Node) ->
+    rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [Node]).
+
+%%%===================================================================
+%%% gen_server functions
+%%%===================================================================
 
 
 start_link() ->
 start_link() ->
-    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
 init([]) ->
 init([]) ->
-    {ok, #state{}}.
+    sample_timer(),
+    clean_timer(),
+    {ok, #state{last = undefined}}.
 
 
 handle_call(_Request, _From, State = #state{}) ->
 handle_call(_Request, _From, State = #state{}) ->
     {reply, ok, State}.
     {reply, ok, State}.
@@ -66,6 +123,17 @@ handle_call(_Request, _From, State = #state{}) ->
 handle_cast(_Request, State = #state{}) ->
 handle_cast(_Request, State = #state{}) ->
     {noreply, State}.
     {noreply, State}.
 
 
+handle_info({sample, Time}, State = #state{last = Last}) ->
+    Now = sample(Time),
+    {atomic, ok} = flush(Last, Now),
+    sample_timer(),
+    {noreply, State#state{last = Now}};
+
+handle_info(clean_expired, State) ->
+    clean(),
+    clean_timer(),
+    {noreply, State};
+
 handle_info(_Info, State = #state{}) ->
 handle_info(_Info, State = #state{}) ->
     {noreply, State}.
     {noreply, State}.
 
 
@@ -79,9 +147,97 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
 %%% Internal functions
 %%% Internal functions
 %%%===================================================================
 %%%===================================================================
 
 
+sample_timer() ->
+    {NextTime, Remaining} = next_interval(),
+    erlang:send_after(Remaining, self(), {sample, NextTime}).
+
+clean_timer() ->
+    erlang:send_after(?CLEAN_EXPIRED_INTERVAL, self(), clean_expired).
+
+%% Per interval seconds.
+%% As an example:
+%%  Interval = 10
+%%  The monitor will start working at full seconds, as like 00:00:00, 00:00:10, 00:00:20 ...
+%% Ensure that the monitor data of all nodes in the cluster are aligned in time
 next_interval() ->
 next_interval() ->
-    ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL),
+    Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_INTERVAL) * 1000,
+    Now = erlang:system_time(millisecond),
+    NextTime = ((Now div Interval) + 1) * Interval,
+    Remaining = NextTime - Now,
+    {NextTime, Remaining}.
+
+sample(Time) ->
+    Fun =
+        fun(Key, Res) ->
+            maps:put(Key, value(Key), Res)
+        end,
+    Data = lists:foldl(Fun, #{}, ?SAMPLER_LIST),
+    #emqx_monit{time = Time, data = Data}.
+
+flush(_Last = undefined, Now) ->
+    store(Now);
+flush(_Last = #emqx_monit{data = LastData}, Now = #emqx_monit{data = NowData}) ->
+    Store = Now#emqx_monit{data = delta(LastData, NowData)},
+    store(Store).
+
+delta(LastData, NowData) ->
+    Fun =
+        fun(Key, Data) ->
+            Value = maps:get(Key, NowData) - maps:get(Key, LastData),
+            Data#{Key => Value}
+        end,
+    lists:foldl(Fun, NowData, ?DELTA_LIST).
+
+store(MonitData) ->
+    {atomic, ok} =
+        mria:transaction(mria:local_content_shard(), fun mnesia:write/3, [?TAB, MonitData, write]).
+
+clean() ->
+    Now = erlang:system_time(millisecond),
+    ExpiredMS = [{{'_', '$1', '_'}, [{'>', {'-', Now, '$1'}, ?RETENTION_TIME}], ['$_']}],
+    Expired = ets:select(?TAB, ExpiredMS),
+    lists:foreach(fun(Data) ->
+        true = ets:delete_object(?TAB, Data)
+    end, Expired),
     ok.
     ok.
 
 
-monit() ->
-    ok.
+get_data(PastTime) ->
+    Now = erlang:system_time(millisecond),
+    ExpiredMS = [{{'_', '$1', '_'}, [{'<', {'-', Now, '$1'}, PastTime}], ['$_']}],
+    format(ets:select(?TAB, ExpiredMS)).
+
+format(List) when is_list(List) ->
+    Fun =
+        fun(Data, All) ->
+            maps:merge(format(Data), All)
+        end,
+    lists:foldl(Fun, #{}, List);
+format(#emqx_monit{time = Time, data = Data}) ->
+    #{Time => Data}.
+
+merge_cluster_samplers(Node, Cluster) ->
+    maps:fold(fun merge_cluster_samplers/3, Cluster, Node).
+
+merge_cluster_samplers(TS, NodeData, Cluster) ->
+    case maps:get(TS, Cluster, undefined) of
+        undefined ->
+            Cluster#{TS => NodeData};
+        ClusterData ->
+            Cluster#{TS => count_map(NodeData, ClusterData)}
+    end.
+
+count_map(M1, M2) ->
+    Fun =
+        fun(Key, Map) ->
+            Map#{key => maps:get(Key, M1) + maps:get(Key, M2)}
+        end,
+    lists:foldl(Fun, #{}, ?SAMPLER_LIST).
+
+value(connections) -> emqx_stats:getstat('connections.count');
+value(routes) -> emqx_stats:getstat('routes.count');
+value(subscriptions) -> emqx_stats:getstat('subscriptions.count');
+value(received) -> emqx_metrics:val('messages.received');
+value(received_bytes) -> emqx_metrics:val('bytes.received');
+value(sent) -> emqx_metrics:val('messages.sent');
+value(sent_bytes) -> emqx_metrics:val('bytes.sent');
+value(dropped) -> emqx_metrics:val('messages.dropped').

+ 1 - 0
apps/emqx_dashboard/src/emqx_dashboard_schema.erl

@@ -39,6 +39,7 @@ but use the same port.
 """   })}
 """   })}
     , {default_username, fun default_username/1}
     , {default_username, fun default_username/1}
     , {default_password, fun default_password/1}
     , {default_password, fun default_password/1}
+    %% TODO: enum 1s, 2s, 3s, 5s, 10s, 12s, 15s, 20s, 30s, 60s
     , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})}
     , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})}
     , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})}
     , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})}
     , {cors, fun cors/1}
     , {cors, fun cors/1}

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_sup.erl

@@ -29,4 +29,4 @@ start_link() ->
 
 
 init([]) ->
 init([]) ->
     {ok, {{one_for_all, 10, 100},
     {ok, {{one_for_all, 10, 100},
-        [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_collection)]}}.
+        [?CHILD(emqx_dashboard_token), ?CHILD(emqx_dashboard_monitor)]}}.