Feng Lee 10 лет назад
Родитель
Сommit
750fb939b1

+ 5 - 7
apps/emqttd/include/emqttd_systop.hrl

@@ -43,8 +43,6 @@
 -define(SYSTOP_CLIENTS, [
     'clients/count',         % clients connected current
     'clients/max'            % max clients connected
-    %'clients/connected',     
-    %'clients/disconnected',  
 ]).
 
 %%------------------------------------------------------------------------------
@@ -59,12 +57,12 @@
 %% $SYS Topics for Subscribers
 %%------------------------------------------------------------------------------
 -define(SYSTOP_PUBSUB, [
-    'queues/count',        % ...
-    'queues/max',        % ...
-    'topics/count',        % ...
+    'topics/count',      % ...
     'topics/max',        % ...
-    'subscribers/count',   % ...
-    'subscribers/max'      % ...
+    'subscribers/count', % ...
+    'subscribers/max',   % ...
+    'queues/count',      % ...
+    'queues/max'         % ...
 ]).
 
 %%------------------------------------------------------------------------------

+ 1 - 2
apps/emqttd/src/emqttd_app.erl

@@ -68,8 +68,7 @@ print_vsn() ->
     ?PRINT("~s ~s is running now~n", [Desc, Vsn]).
 
 start_servers(Sup) ->
-    Servers = [{"emqttd event", emqttd_event},
-               {"emqttd trace", emqttd_trace},
+    Servers = [{"emqttd trace", emqttd_trace},
                {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
                {"emqttd client manager", {supervisor, emqttd_cm_sup}},
                {"emqttd session manager", {supervisor, emqttd_sm_sup}},

+ 16 - 0
apps/emqttd/src/emqttd_broker.erl

@@ -39,6 +39,9 @@
 %% API Function Exports
 -export([start_link/0]).
 
+%% Running nodes
+-export([running_nodes/0]).
+
 %% Event API
 -export([subscribe/1, notify/2]).
 
@@ -71,6 +74,13 @@
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
+%%------------------------------------------------------------------------------
+%% @doc Get running nodes
+%% @end
+%%------------------------------------------------------------------------------
+running_nodes() ->
+    mnesia:system_info(running_db_nodes).
+
 %%------------------------------------------------------------------------------
 %% @doc Subscribe broker event
 %% @end
@@ -205,6 +215,7 @@ init([]) ->
     random:seed(now()),
     ets:new(?BROKER_TAB, [set, public, named_table]),
     % Create $SYS Topics
+    emqttd_pubsub:create(<<"$SYS/brokers">>),
     [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
     % Tick
     {ok, #state{started_at = os:timestamp(), tick_tref = start_tick(tick)}, hibernate}.
@@ -244,6 +255,7 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info(tick, State) ->
+    retain(brokers),
     retain(version, list_to_binary(version())),
     retain(sysdescr, list_to_binary(sysdescr())),
     publish(uptime, list_to_binary(uptime(State))),
@@ -266,6 +278,10 @@ code_change(_OldVsn, State, _Extra) ->
 create_topic(Topic) ->
     emqttd_pubsub:create(emqtt_topic:systop(Topic)).
 
+retain(brokers) ->
+    Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
+    publish(#mqtt_message{retain = true, topic = <<"$SYS/brokers">>, payload = Payload}).
+
 retain(Topic, Payload) when is_binary(Payload) ->
     publish(#mqtt_message{retain = true,
                           topic = emqtt_topic:systop(Topic),

+ 4 - 2
apps/emqttd/src/emqttd_client.erl

@@ -257,8 +257,10 @@ inc(_) ->
 notify(disconnected, _Reason, undefined) -> ingore;
 
 notify(disconnected, {shutdown, Reason}, ProtoState) ->
-    emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason});
+    %emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason});
+    ok;
 
 notify(disconnected, Reason, ProtoState) ->
-    emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}).
+    %emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}).
+    ok.
 

+ 1 - 0
apps/emqttd/src/emqttd_cluster.erl

@@ -34,6 +34,7 @@
 %% @doc Get running nodes
 %% @end
 %%------------------------------------------------------------------------------
+%%TODO: remove...
 running_nodes() ->
     mnesia:system_info(running_db_nodes).
 

+ 0 - 122
apps/emqttd/src/emqttd_event.erl

@@ -1,122 +0,0 @@
-%%%-----------------------------------------------------------------------------
-%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
-%%%
-%%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%%% of this software and associated documentation files (the "Software"), to deal
-%%% in the Software without restriction, including without limitation the rights
-%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%%% copies of the Software, and to permit persons to whom the Software is
-%%% furnished to do so, subject to the following conditions:
-%%%
-%%% The above copyright notice and this permission notice shall be included in all
-%%% copies or substantial portions of the Software.
-%%%
-%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%%% SOFTWARE.
-%%%-----------------------------------------------------------------------------
-%%% @doc
-%%% emqttd event manager.
-%%%
-%%% @end
-%%%-----------------------------------------------------------------------------
--module(emqttd_event).
-
--author("Feng Lee <feng@emqtt.io>").
-
--include_lib("emqtt/include/emqtt.hrl").
-
-%% API Function Exports
--export([start_link/0, add_handler/2, notify/1]).
-
-%% gen_event Function Exports
--export([init/1, handle_event/2, handle_call/2, handle_info/2,
-         terminate/2, code_change/3]).
-
--record(state, {systop}).
-
-%%------------------------------------------------------------------------------
-%% @doc Start event manager
-%% @end
-%%------------------------------------------------------------------------------
--spec start_link() -> {ok, pid()} | {error, any()}.
-start_link() ->
-    case gen_event:start_link({local, ?MODULE}) of
-        {ok, Pid} -> 
-            add_handler(?MODULE, []),
-            {ok, Pid};
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-add_handler(Handler, Args) ->
-    gen_event:add_handler(?MODULE, Handler, Args).
-
-notify(Event) ->
-    gen_event:notify(?MODULE, Event).
-
-%%%=============================================================================
-%%% gen_event callbacks
-%%%=============================================================================
-
-init([]) ->
-    SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node(), "/"])),
-    {ok, #state{systop = SysTop}}.
-
-handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
-    Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
-    Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
-    emqttd_pubsub:publish(event, Msg),
-    {ok, State};
-
-%%TODO: Protect from undefined clientId...
-handle_event({disconnected, undefined, Reason}, State = #state{systop = SysTop}) ->
-    {ok, State};
-
-handle_event({disconnected, ClientId, Reason}, State = #state{systop = SysTop}) ->
-    Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
-    Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
-    emqttd_pubsub:publish(event, Msg),
-    {ok, State};
-
-handle_event({subscribed, ClientId, TopicTable}, State) ->
-    lager:error("TODO: subscribed ~s, ~p", [ClientId, TopicTable]),
-    {ok, State};
-
-handle_event({unsubscribed, ClientId, Topics}, State) ->
-    lager:error("TODO: unsubscribed ~s, ~p", [ClientId, Topics]),
-    {ok, State};
-
-handle_event(_Event, State) ->
-    {ok, State}.
-
-handle_call(_Request, State) ->
-    Reply = ok,
-    {ok, Reply, State}.
-
-handle_info(_Info, State) ->
-    {ok, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%%=============================================================================
-%%% Internal functions
-%%%=============================================================================
-
-payload(connected, Params) ->
-    From = proplists:get_value(from, Params),
-    Proto = proplists:get_value(protocol, Params),
-    Sess = proplists:get_value(session, Params),
-    iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess]));
-
-payload(disconnected, Reason) ->
-    list_to_binary(io_lib:format("reason: ~p", [Reason])).
-

+ 4 - 4
apps/emqttd/src/emqttd_metrics.erl

@@ -163,7 +163,7 @@ init([]) ->
     % Init metrics
     [create_metric(Metric) ||  Metric <- Metrics],
     % $SYS Topics for metrics
-    [ok = create_topic(Topic) || {_, Topic} <- Metrics],
+    [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics],
     % Tick to publish metrics
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
 
@@ -192,7 +192,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%%=============================================================================
 
 publish(Metric, Val) ->
-    emqttd_pubsub:publish(metrics, #mqtt_message{topic = emqtt_topic:systop(Metric),
+    emqttd_pubsub:publish(metrics, #mqtt_message{topic   = metric_topic(Metric),
                                                  payload = emqttd_util:integer_to_binary(Val)}).
 
 create_metric({gauge, Name}) ->
@@ -202,7 +202,7 @@ create_metric({counter, Name}) ->
     Schedulers = lists:seq(1, erlang:system_info(schedulers)),
     [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
 
-create_topic(Topic) ->
-    emqttd_pubsub:create(emqtt_topic:systop(Topic)).
+metric_topic(Metric) ->
+    emqtt_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))).
 
 

+ 53 - 0
apps/emqttd/src/emqttd_mod_presence.erl

@@ -0,0 +1,53 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd presence management module.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_mod_presence).
+
+-include_lib("emqtt/include/emqtt.hrl").
+
+-export([load/1, unload/1]).
+
+-export([client_connected/2, client_disconnected/2]).
+
+load(Opts) ->
+    emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
+    emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
+    {ok, Opts}.
+
+client_connected({Client, ClientId}, _Opts) ->
+    Topic = emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])),
+    Payload = iolist_to_binary(mochijson2:encode([{ts, emqttd_util:timestamp()}])),
+    emqttd_pubsub:publish(presence, #mqtt_message{topic = Topic, payload = Payload}).
+
+client_disconnected({ClientId, Reason}, _Opts) ->
+    Topic = emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])),
+    Payload = iolist_to_binary(mochijson2:encode([{reason, Reason}, {ts, emqttd_util:timestamp()}])),
+    emqttd_pubsub:publish(presence, #mqtt_message{topic = Topic, payload = Payload}).
+
+unload(_Opts) ->
+    emqttd_broker:unhook(client_connected, {?MODULE, client_connected}),
+    emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}).
+

+ 2 - 2
apps/emqttd/src/emqttd_protocol.erl

@@ -422,7 +422,7 @@ notify(connected, ReturnCode, #proto_state{peername   = Peername,
     Params = [{from, emqttd_net:format(Peername)},
               {protocol, ProtoVer},
               {session, Sess},
-              {connack, ReturnCode}],
-    emqttd_event:notify({connected, ClientId, Params}).
+              {connack, ReturnCode}].
+    %emqttd_event:notify({connected, ClientId, Params}).
 
 

+ 5 - 2
apps/emqttd/src/emqttd_stats.erl

@@ -126,7 +126,7 @@ init([]) ->
     Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
     [ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
     % Create $SYS Topics
-    [ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics],
+    [ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics],
     % Tick to publish stats
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
 
@@ -154,6 +154,9 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%=============================================================================
 publish(Stat, Val) ->
-    emqttd_pubsub:publish(stats, #mqtt_message{topic   = emqtt_topic:systop(Stat),
+    emqttd_pubsub:publish(stats, #mqtt_message{topic   = stats_topic(Stat),
                                                payload = emqttd_util:integer_to_binary(Val)}).
 
+stats_topic(Stat) ->
+    emqtt_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
+