Explorar o código

broker metrics

Ery Lee %!s(int64=11) %!d(string=hai) anos
pai
achega
82772e4e38
Modificáronse 3 ficheiros con 72 adicións e 32 borrados
  1. 21 8
      apps/emqtt/src/emqtt_broker.erl
  2. 48 24
      apps/emqtt/src/emqtt_metrics.erl
  3. 3 0
      rel/files/app.config

+ 21 - 8
apps/emqtt/src/emqtt_broker.erl

@@ -26,6 +26,8 @@
 %%%-----------------------------------------------------------------------------
 -module(emqtt_broker).
 
+-include("emqtt_packet.hrl").
+
 -include("emqtt_topic.hrl").
 
 -behaviour(gen_server).
@@ -47,8 +49,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {started_at, sys_interval}).
-
+-record(state, {started_at, sys_interval, tick_timer}).
 
 %% ------------------------------------------------------------------
 %% API Function Definitions
@@ -72,9 +73,10 @@ uptime() ->
 init([Options]) ->
     SysInterval = proplists:get_value(sys_interval, Options, 60),
     % Create $SYS Topics
-    [emqtt_pubsub:create(systop(Topic)) || Topic <- ?SYSTOP_BROKER],
-    ets:new(?MODULE, [set, public, name_table, {write_concurrency, true}]),
-    {ok, #state{started_at = os:timestamp(), sys_interval = SysInterval}}.
+    [emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER],
+    ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]),
+    State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
+    {ok, tick(State)}.
 
 handle_call(uptime, _From, State) ->
     {reply, uptime(State), State};
@@ -85,6 +87,12 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+handle_info(tick, State) ->
+    publish(true, <<"$SYS/broker/version">>, version()),
+    publish(false, <<"$SYS/broker/uptime">>, uptime(State)),
+    publish(true, <<"$SYS/broker/description">>, description()),
+    {noreply, tick(State)};
+
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -97,12 +105,15 @@ code_change(_OldVsn, State, _Extra) ->
 %% ------------------------------------------------------------------
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
-systop(Topic) ->
-    <<"$SYS/broker/", Topic/binary>>.
+publish(Retain, Topic, Payload) when is_list(Payload) ->
+    publish(Retain, Topic, list_to_binary(Payload));
+
+publish(Retain, Topic, Payload) when is_binary(Payload) ->
+    emqtt_router:route(#mqtt_message{retain = Retain, topic = Topic, payload = Payload}).
 
 uptime(#state{started_at = Ts}) ->
     Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
-    uptime(seconds, Secs).
+    lists:flatten(uptime(seconds, Secs)).
 
 uptime(seconds, Secs) when Secs < 60 ->
     [integer_to_list(Secs), " seconds"];
@@ -119,4 +130,6 @@ uptime(hours, H) ->
 uptime(days, D) ->
     [integer_to_list(D), " days,"].
 
+tick(State = #state{sys_interval = SysInterval}) ->
+    State#state{tick_timer = erlang:send_after(SysInterval * 1000, self(), tick)}.
 

+ 48 - 24
apps/emqtt/src/emqtt_metrics.erl

@@ -38,9 +38,11 @@
 %% API Function Exports
 %% ------------------------------------------------------------------
 
--export([start_link/0]).
+-export([start_link/1]).
 
--export([get_all/0, get_value/1, inc/1, dec/2]).
+-export([all/0, value/1,
+         inc/1, inc/2,
+         dec/1, dec/2]).
 
 %% ------------------------------------------------------------------
 %% gen_server Function Exports
@@ -48,46 +50,55 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {}).
+-record(state, {pub_interval, tick_timer}).
 
 %% ------------------------------------------------------------------
 %% API Function Definitions
 %% ------------------------------------------------------------------
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-get_all() -> 
-    gen_server:call(?SERVER, get_all).
-
-get_value(Metric) ->
-    gen_server:call(?SERVER, {get_value, Metric}).
+start_link(Options) ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
+
+all() ->
+    maps:to_list(
+        lists:foldl(
+            fun({{Metric, _N}, Val}, Map) ->
+                    case maps:find(Metric, Map) of
+                        {ok, Count} -> maps:put(Metric, Count+Val);
+                        error -> maps:put(Metric, 0)
+                    end
+            end, #{}, ets:tab2list(?TABLE))).
+
+value(Metric) ->
+    lists:sum(ets:select(?TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
 
 inc(Metric) ->
-    ok.
+    inc(Metric, 1).
 
 inc(Metric, Val) ->
-    ok.
+    ets:update_counter(?TABLE, key(Metric), {2, Val}).
 
 dec(Metric) ->
-    ok.
+    dec(Metric, 1).
 
 dec(Metric, Val) ->
-    ok.
+    %TODO: ok?
+    ets:update_counter(?TABLE, key(Metric), {2, -Val}).
+
+key(Metric) ->
+    {Metric, erlang:system_info(scheduler_id)}.
 
 %% ------------------------------------------------------------------
 %% gen_server Function Definitions
 %% ------------------------------------------------------------------
-
-init(_Args) ->
-    % Bytes sent and received
+init(Options) ->
+    % $SYS Topics for metrics
     [ok = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS],
-    % $SYS/broker/version
-    %## Uptime
-    % $SYS/broker/uptime
-    % $SYS/broker/clients/connected
-    % $SYS/broker/clients/disconnected
+    % Create metrics table
     ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]),
-    {ok, #state{}}.
+    % Init metrics
+    [new(Metric) || <<"$SYS/broker/", Metric/binary>> <- ?SYSTOP_METRICS],
+    PubInterval = proplists:get_value(pub_interval, Options, 60),
+    {ok, tick(#state{pub_interval = PubInterval})}.
 
 handle_call(get_metrics, _From, State) ->
     {reply, [], State};
@@ -98,6 +109,11 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+handle_info(tick, State) ->
+    %TODO:...
+    % publish metric message
+    {noreply, tick(State)};
+
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -111,3 +127,11 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
+new(Metric) ->
+    Key = list_to_tuple([list_to_atom(binary_to_list(Token)) 
+                         || Token <- binary:split(Metric, <<"/">>, [global])]),
+    [ets:insert(?TABLE, {{Key, N}, 0}) || N <- lists:seq(1, erlang:system_info(schedulers))].
+
+tick(State = #state{pub_interval = PubInterval}) ->
+    State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}.
+

+ 3 - 0
rel/files/app.config

@@ -52,6 +52,9 @@
     {broker, [
         {sys_interval, 60}
     ]},
+    {metrics, [
+        {pub_interval, 60}
+    ]},
     {listen, [
         {mqtt, 1883, [
             {backlog,   512},