Browse Source

broker, metrics

Ery Lee 11 years ago
parent
commit
818d4741a6

+ 2 - 2
apps/emqtt/include/emqtt_topic.hrl

@@ -25,8 +25,8 @@
 %%------------------------------------------------------------------------------
 -record(topic, {
     name    :: binary(),
-    type    :: static | dynamic | bridge
-    node    :: node(),
+    type    :: static | dynamic | bridge,
+    node    :: node()
 }).
 
 -type topic() :: #topic{}.

+ 3 - 0
apps/emqtt/src/emqtt_app.erl

@@ -61,6 +61,7 @@ print_vsn() ->
 start_servers(Sup) ->
     {ok, SessOpts} = application:get_env(session),
     {ok, RetainOpts} = application:get_env(retain),
+    {ok, BrokerOpts} = application:get_env(broker),
 	lists:foreach(
         fun({Name, F}) when is_function(F) ->
 			?PRINT("~s is starting...", [Name]),
@@ -83,6 +84,8 @@ start_servers(Sup) ->
          {"emqtt auth", emqtt_auth},
 		 {"emqtt pubsub", emqtt_pubsub},
 		 {"emqtt router", emqtt_router},
+		 {"emqtt broker", emqtt_broker, BrokerOpts},
+		 {"emqtt metrics", emqtt_metrics},
 		 {"emqtt monitor", emqtt_monitor}
 		]).
 

+ 8 - 7
apps/emqtt/src/emqtt_broker.erl

@@ -58,12 +58,10 @@ start_link(Options) ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
 
 version() ->
-    {ok, Version} = application:get_key(emqtt, vsn),
-    Version.
+    {ok, Version} = application:get_key(emqtt, vsn), Version.
 
 description() ->
-    {ok, Descr} = application:get_key(emqtt, description),
-    Descr.
+    {ok, Descr} = application:get_key(emqtt, description), Descr.
 
 uptime() ->
     gen_server:call(?SERVER, uptime).
@@ -78,9 +76,8 @@ init([Options]) ->
     ets:new(?MODULE, [set, public, name_table, {write_concurrency, true}]),
     {ok, #state{started_at = os:timestamp(), sys_interval = SysInterval}}.
 
-handle_call(uptime, _From, State = #state{started_at = Ts}) ->
-    Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
-    {reply, uptime(seconds, Secs), State};
+handle_call(uptime, _From, State) ->
+    {reply, uptime(State), State};
 
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
@@ -103,6 +100,10 @@ code_change(_OldVsn, State, _Extra) ->
 systop(Topic) ->
     <<"$SYS/broker/", Topic/binary>>.
 
+uptime(#state{started_at = Ts}) ->
+    Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
+    uptime(seconds, Secs).
+
 uptime(seconds, Secs) when Secs < 60 ->
     [integer_to_list(Secs), " seconds"];
 uptime(seconds, Secs) ->

+ 1 - 3
apps/emqtt/src/emqtt_metrics.erl

@@ -80,7 +80,7 @@ dec(Metric, Val) ->
 
 init(_Args) ->
     % Bytes sent and received
-    [ok = emqtt_pubsub:create(<<"$SYS/broker/", Topic/binary>>) || Topic <- ?SYSTOP_METRICS],
+    [ok = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS],
     % $SYS/broker/version
     %## Uptime
     % $SYS/broker/uptime
@@ -111,5 +111,3 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-
-

+ 5 - 0
apps/emqtt/src/emqtt_pubsub.erl

@@ -39,6 +39,7 @@
 -export([start_link/0]).
 
 -export([topics/0,
+        create/1,
 		subscribe/2,
 		unsubscribe/2,
 		publish/1,
@@ -94,6 +95,10 @@ start_link() ->
 topics() ->
 	mnesia:dirty_all_keys(topic).
 
+%TODO
+create(Topic) -> 
+    ok.
+
 %%
 %% @doc Subscribe Topic or Topics
 %%

+ 3 - 0
rel/files/app.config

@@ -49,6 +49,9 @@
     {retain, [
         {store_limit, 100000}
     ]},
+    {broker, [
+        {sys_interval, 60}
+    ]},
     {listen, [
         {mqtt, 1883, [
             {backlog,   512},