Просмотр исходного кода

Merge pull request #5393 from DDDHuang/dashboard_metrics

feat: add dashboard metrics; api username params
DDDHuang 4 лет назад
Родитель
Сommit
dc2f6303d4

+ 2 - 0
apps/emqx_dashboard/etc/emqx_dashboard.conf

@@ -5,6 +5,8 @@
 emqx_dashboard:{
     default_username: "admin"
     default_password: "public"
+    ## notice: sample_interval should be divisible by 60.
+    sample_interval: 10s
     listeners: [
         {
             num_acceptors: 4

+ 5 - 0
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -21,3 +21,8 @@
 -define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))).
 
 -define(DASHBOARD_SHARD, emqx_dashboard_shard).
+
+-record(mqtt_collect, {
+    timestamp :: integer(),
+    collect
+    }).

+ 13 - 0
apps/emqx_dashboard/src/emqx_dashboard_api.erl

@@ -105,6 +105,7 @@ user_api() ->
     Metadata = #{
         delete => #{
             description => <<"Delete dashboard users">>,
+            parameters => [path_param_username()],
             responses => #{
                 <<"200">> => response_schema(<<"Delete User successfully">>),
                 <<"400">> => bad_request()
@@ -112,6 +113,7 @@ user_api() ->
         },
         put => #{
             description => <<"Update dashboard users">>,
+            parameters => [path_param_username()],
             'requestBody' => request_body_schema(#{
                 type => object,
                 properties => #{
@@ -127,6 +129,7 @@ user_api() ->
         },
         post => #{
             description => <<"Create dashboard users">>,
+            parameters => [path_param_username()],
             'requestBody' => request_body_schema(create_user),
             responses => #{
                 <<"200">> => response_schema(<<"Create Users successfully">>),
@@ -140,6 +143,7 @@ change_pwd_api() ->
     Metadata = #{
         put => #{
             description => <<"Update dashboard users password">>,
+            parameters => [path_param_username()],
             'requestBody' => request_body_schema(#{
                 type => object,
                 properties => #{
@@ -159,6 +163,15 @@ change_pwd_api() ->
     },
     {"/change_pwd/:username", Metadata, change_pwd}.
 
+path_param_username() ->
+    #{
+        name => username,
+        in => path,
+        required => true,
+        schema => #{type => string},
+        example => <<"admin">>
+    }.
+
 -define(EMPTY(V), (V == undefined orelse V == <<>>)).
 
 auth(post, Request) ->

+ 173 - 0
apps/emqx_dashboard/src/emqx_dashboard_collection.erl

@@ -0,0 +1,173 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_dashboard_collection).
+
+-behaviour(gen_server).
+
+-include("emqx_dashboard.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-export([ start_link/0
+        ]).
+
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-export([get_collect/0]).
+
+-export([get_local_time/0]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+%% Mnesia bootstrap
+-export([mnesia/1]).
+
+-define(APP, emqx_dashboard).
+
+-define(DEFAULT_INTERVAL, 10). %% seconds
+
+-define(COLLECT, {[],[],[]}).
+
+-define(CLEAR_INTERVAL, 86400000).
+
+-define(EXPIRE_INTERVAL, 86400000 * 7).
+
+mnesia(boot) ->
+    ok = ekka_mnesia:create_table(emqx_collect, [
+        {type, set},
+        {local_content, true},
+        {disc_only_copies, [node()]},
+        {record_name, mqtt_collect},
+        {attributes, record_info(fields, mqtt_collect)}]);
+mnesia(copy) ->
+    mnesia:add_table_copy(emqx_collect, node(), disc_only_copies).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
+
+init([]) ->
+    timer(timer:seconds(interval()), collect),
+    timer(get_today_remaining_seconds(), clear_expire_data),
+    ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
+    State = #{
+        count => count(),
+        expire_interval => ExpireInterval,
+        collect => ?COLLECT,
+        temp_collect => {0, 0, 0, 0},
+        last_collects => {0, 0, 0}
+    },
+    {ok, State}.
+
+interval() ->
+    emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL).
+
+count() ->
+    60 div interval().
+
+handle_call(get_collect, _From, State = #{temp_collect := {Received, Sent, _, _}}) ->
+    {reply, {Received, Sent, collect(subscriptions), collect(connections)}, State, hibernate};
+handle_call(_Req, _From, State) ->
+    {reply, ok, State}.
+handle_cast(_Req, State) ->
+    {noreply, State}.
+
+handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) ->
+    NewLastCollect = flush(collect_all(Collect), LastCollect),
+    TempCollect1 = temp_collect(TempCollect),
+    timer(timer:seconds(interval()), collect),
+    {noreply, State#{count => count(),
+                     collect => ?COLLECT,
+                     temp_collect => TempCollect1,
+                     last_collects => NewLastCollect}};
+
+handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) ->
+    TempCollect1 = temp_collect(TempCollect),
+    timer(timer:seconds(interval()), collect),
+    {noreply, State#{count => Count - 1,
+                     collect => collect_all(Collect),
+                     temp_collect => TempCollect1}, hibernate};
+
+handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) ->
+    timer(?CLEAR_INTERVAL, clear_expire_data),
+    T1 = get_local_time(),
+    Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end),
+    Collects = dets:select(emqx_collect, Spec),
+    lists:foreach(fun(Collect) ->
+        dets:delete_object(emqx_collect, Collect)
+    end, Collects),
+    {noreply, State, hibernate};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+temp_collect({_, _, Received, Sent}) ->
+    Received1 = collect(received),
+    Sent1 = collect(sent),
+    {(Received1 - Received) div interval(),
+     (Sent1 - Sent) div interval(),
+     Received1,
+     Sent1}.
+
+collect_all({Connection, Route, Subscription}) ->
+    {[collect(connections)| Connection],
+     [collect(routes)| Route],
+     [collect(subscriptions)| Subscription]}.
+
+collect(connections) ->
+    emqx_stats:getstat('connections.count');
+collect(routes) ->
+    emqx_stats:getstat('routes.count');
+collect(subscriptions) ->
+    emqx_stats:getstat('subscriptions.count');
+collect(received) ->
+    emqx_metrics:val('messages.received');
+collect(sent) ->
+    emqx_metrics:val('messages.sent');
+collect(dropped) ->
+    emqx_metrics:val('messages.dropped').
+
+flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
+    Received = collect(received),
+    Sent = collect(sent),
+    Dropped = collect(dropped),
+    Collect = {avg(Connection),
+               avg(Route),
+               avg(Subscription),
+               diff(Received, Received0),
+               diff(Sent, Sent0),
+               diff(Dropped, Dropped0)},
+    Ts = get_local_time(),
+    _ = mnesia:dirty_write(emqx_collect, #mqtt_collect{timestamp = Ts, collect = Collect}),
+    {Received, Sent, Dropped}.
+
+avg(Items) ->
+    lists:sum(Items) div count().
+
+diff(Item0, Item1) ->
+    Item0 - Item1.
+
+timer(Secs, Msg) ->
+    erlang:send_after(Secs, self(), Msg).
+
+get_today_remaining_seconds() ->
+    ?CLEAR_INTERVAL - (get_local_time() rem ?CLEAR_INTERVAL).
+
+get_local_time() ->
+    (calendar:datetime_to_gregorian_seconds(calendar:local_time()) -
+        calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000.

+ 205 - 0
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -0,0 +1,205 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_dashboard_monitor_api).
+
+-include("emqx_dashboard.hrl").
+
+-behaviour(minirest_api).
+
+-export([ sampling/1
+        , sampling/2
+        , get_collect/1
+        ]).
+
+-export([api_spec/0]).
+
+-export([counters/2, current_counters/2]).
+
+-define(COUNTERS, [ connection
+                  , route
+                  , subscriptions
+                  , received
+                  , sent
+                  , dropped]).
+
+api_spec() ->
+    {[monitor_api(), monitor_current_api()], [counters_schema()]}.
+
+monitor_api() ->
+    Metadata = #{
+        get => #{
+            description => <<"List monitor data">>,
+            parameters => [
+                #{
+                    name => node,
+                    in => query,
+                    required => false,
+                    schema => #{type => string},
+                    example => node()
+                },
+                #{
+                    name => counter,
+                    in => query,
+                    required => false,
+                    schema => #{type => string, enum => ?COUNTERS}
+                }
+            ],
+            responses => #{
+                <<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}},
+    {"/monitor", Metadata, counters}.
+monitor_current_api() ->
+    Metadata = #{
+        get => #{
+            description => <<"Current monitor data">>,
+            responses => #{
+                <<"200">> => emqx_mgmt_util:response_schema(<<"Current monitor data">>,
+                    current_counters_schema())}}},
+    {"/monitor/current", Metadata, current_counters}.
+
+current_counters_schema() ->
+    #{
+        type => object,
+        properties => #{
+            nodes => #{
+                type => integer,
+                description => <<"Nodes count">>},
+            connection => #{type => integer},
+            sent => #{type => integer},
+            received => #{type => integer},
+            subscription => #{type => integer}}
+    }.
+
+counters_schema() ->
+    Node = #{
+        node => #{
+            type => string,
+            example => node()
+        }
+    },
+    Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS),
+    #{
+        counters => #{
+            type => object,
+            properties => Properties}
+    }.
+
+counters_schema(Name) ->
+    #{Name => #{
+        type => array,
+        items => #{
+            type => object,
+            properties => #{
+                timestamp => #{
+                    type => integer},
+                count => #{
+                    type => integer}}}}}.
+%%%==============================================================================================
+%% parameters trans
+counters(get, Request) ->
+    case cowboy_req:parse_qs(Request) of
+        [] ->
+            Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
+            {200, Response};
+        Params ->
+            lookup(Params)
+    end.
+
+current_counters(get, _) ->
+    Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()],
+    Nodes = length(ekka_mnesia:running_nodes()),
+    {Received, Sent, Sub, Conn} = format_current_metrics(Data),
+    Response = #{
+        nodes           => Nodes,
+        received        => Received,
+        sent            => Sent,
+        subscription    => Sub,
+        connection      => Conn
+    },
+    {200, Response}.
+
+    %%%==============================================================================================
+%% api apply
+
+lookup(Params) ->
+    Fun =
+        fun({K,V}, M) ->
+            maps:put(binary_to_atom(K, utf8), binary_to_atom(V, utf8), M)
+        end,
+    lookup_(lists:foldl(Fun, #{}, Params)).
+
+lookup_(#{node := Node, counter := Counter}) ->
+    {200, sampling(Node, Counter)};
+lookup_(#{node := Node}) ->
+    {200, sampling(Node)};
+lookup_(#{counter := Counter}) ->
+    Data = [sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()],
+    {200, Data}.
+
+format_current_metrics(Collects) ->
+    format_current_metrics(Collects, {0,0,0,0}).
+format_current_metrics([], Acc) ->
+    Acc;
+format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) ->
+    format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
+
+get_collect(Node) when Node =:= node() ->
+    emqx_dashboard_collection:get_collect();
+get_collect(Node) ->
+    case rpc:call(Node, emqx_dashboard_collection, get_collect, []) of
+        {badrpc, _Reason} -> #{};
+        Res -> Res
+    end.
+
+sampling(Node) when Node =:= node() ->
+    Time = emqx_dashboard_collection:get_local_time() - 7200000,
+    All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
+    maps:put(node, Node, format(lists:sort(All)));
+sampling(Node) ->
+    rpc:call(Node, ?MODULE, sampling, [Node]).
+
+sampling(Node, Counter) when Node =:= node() ->
+    Time = emqx_dashboard_collection:get_local_time() - 7200000,
+    All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
+    maps:put(node, Node, format_single(lists:sort(All), Counter));
+sampling(Node, Counter) ->
+    rpc:call(Node, ?MODULE, sampling, [Node, Counter]).
+
+format(Collects) ->
+    format(Collects, {[],[],[],[],[],[]}).
+format([], {Connection, Route, Subscription, Received, Sent, Dropped}) ->
+    #{
+        connection      => add_key(Connection),
+        route           => add_key(Route),
+        subscriptions   => add_key(Subscription),
+        received        => add_key(Received),
+        sent            => add_key(Sent),
+        dropped         => add_key(Dropped)
+    };
+
+format([#mqtt_collect{timestamp = Ts, collect = {C, R, S, Re, S1, D}} | Collects],
+       {Connection, Route, Subscription, Received, Sent, Dropped}) ->
+    format(Collects, {[[Ts, C]  | Connection],
+                      [[Ts, R]  | Route],
+                      [[Ts, S]  | Subscription],
+                      [[Ts, Re] | Received],
+                      [[Ts, S1] | Sent],
+                      [[Ts, D]  | Dropped]}).
+add_key(Collects) ->
+    lists:reverse([#{timestamp => Ts, count => C} || [Ts, C] <- Collects]).
+
+format_single(Collects, Counter) ->
+    #{Counter => format_single(Collects, counter_index(Counter), [])}.
+format_single([], _Index, Acc) ->
+    lists:reverse(Acc);
+format_single([#mqtt_collect{timestamp = Ts, collect = Collect} | Collects], Index, Acc) ->
+    format_single(Collects, Index,
+        [#{timestamp => Ts, count => erlang:element(Index, Collect)} | Acc]).
+
+counter_index(connection)    -> 1;
+counter_index(route)         -> 2;
+counter_index(subscriptions) -> 3;
+counter_index(received)      -> 4;
+counter_index(sent)          -> 5;
+counter_index(dropped)       -> 6.

+ 1 - 0
apps/emqx_dashboard/src/emqx_dashboard_schema.erl

@@ -27,6 +27,7 @@ fields("emqx_dashboard") ->
                                                hoconsc:ref(?MODULE, "https")]))}
     , {default_username, fun default_username/1}
     , {default_password, fun default_password/1}
+    , {sample_interval, emqx_schema:t(emqx_schema:duration_s(), undefined, "10s")}
     ];
 
 fields("http") ->

+ 2 - 2
apps/emqx_dashboard/src/emqx_dashboard_sup.erl

@@ -28,5 +28,5 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    {ok, { {one_for_all, 10, 100}, [?CHILD(emqx_dashboard_admin)] } }.
-
+    {ok, {{one_for_all, 10, 100},
+        [?CHILD(emqx_dashboard_admin), ?CHILD(emqx_dashboard_collection)]}}.

+ 1 - 1
rebar.config

@@ -17,7 +17,7 @@
 
 %% Check for the mnesia calls forbidden by Ekka:
 {xref_queries,
- [ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", []}
+ [ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", [{{emqx_dashboard_collection,flush,2},{mnesia,dirty_write,2}}]}
  , {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []}
  , {"E || \"mnesia\":\"transaction\"/\".*\" : Fun", []}
  , {"E || \"mnesia\":\"async_dirty\"/\".*\" : Fun", []}