Feng 10 лет назад
Родитель
Сommit
855152f653

+ 2 - 2
src/emqttd_client.erl

@@ -36,7 +36,7 @@
 %% API Function Exports
 -export([start_link/2, session/1, info/1, kick/1]).
 
-%% SUB/UNSUB Asynchronously, called by plugins.
+%% SUB/UNSUB Asynchronously. Called by plugins.
 -export([subscribe/2, unsubscribe/2]).
 
 %% gen_server Function Exports
@@ -243,7 +243,7 @@ with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
     Fun(emqttd_protocol:session(ProtoState)),
     hibernate(State).
 
-%% receive and parse tcp data
+%% Receive and parse tcp data
 received(<<>>, State) ->
     hibernate(State);
 

+ 1 - 1
src/emqttd_cm_sup.erl

@@ -48,7 +48,7 @@ init([]) ->
 
     %% CM Pool Sup
     MFA = {?CM, start_link, [emqttd_stats:statsfun('clients/count', 'clients/max')]},
-    PoolSup = emqttd_pool_sup:spec(pool_sup, [?CM, hash, erlang:system_info(schedulers), MFA]),
+    PoolSup = emqttd_pool_sup:spec([?CM, hash, erlang:system_info(schedulers), MFA]),
 
     {ok, {{one_for_all, 10, 3600}, [PoolSup]}}.
 

+ 2 - 2
src/emqttd_http.erl

@@ -41,7 +41,7 @@ handle_request('GET', "/status", Req) ->
     AppStatus =
     case lists:keysearch(emqttd, 1, application:which_applications()) of
         false         -> not_running;
-        {value, _Ver} -> running
+        {value, _Val} -> running
     end,
     Status = io_lib:format("Node ~s is ~s~nemqttd is ~s",
                             [node(), InternalStatus, AppStatus]),
@@ -78,7 +78,7 @@ handle_request('POST', "/mqtt/publish", Req) ->
 %% MQTT Over WebSocket
 %%------------------------------------------------------------------------------
 handle_request('GET', "/mqtt", Req) ->
-    lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
+    lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
     Upgrade = Req:get_header_value("Upgrade"),
     Proto   = Req:get_header_value("Sec-WebSocket-Protocol"),
     case {is_websocket(Upgrade), Proto} of

+ 1 - 1
src/emqttd_mnesia.erl

@@ -35,7 +35,7 @@
 
 start() ->
     case init_schema() of
-        ok -> 
+        ok ->
             ok;
         {error, {_Node, {already_exists, _Node}}} ->
             ok;

+ 3 - 5
src/emqttd_opts.erl

@@ -35,14 +35,12 @@ merge(Defaults, Options) ->
     lists:foldl(
         fun({Opt, Val}, Acc) ->
                 case lists:keymember(Opt, 1, Acc) of
-                    true ->
-                        lists:keyreplace(Opt, 1, Acc, {Opt, Val});
-                    false ->
-                        [{Opt, Val}|Acc]
+                    true  -> lists:keyreplace(Opt, 1, Acc, {Opt, Val});
+                    false -> [{Opt, Val}|Acc]
                 end;
             (Opt, Acc) ->
                 case lists:member(Opt, Acc) of
-                    true -> Acc;
+                    true  -> Acc;
                     false -> [Opt | Acc]
                 end
         end, Defaults, Options).

+ 8 - 4
src/emqttd_pool_sup.erl

@@ -28,14 +28,18 @@
 -behaviour(supervisor).
 
 %% API
--export([spec/2, start_link/3, start_link/4]).
+-export([spec/1, spec/2, start_link/3, start_link/4]).
 
 %% Supervisor callbacks
 -export([init/1]).
 
+-spec spec(list()) -> supervisor:child_spec().
+spec(Args) ->
+    spec(pool_sup, Args).
+
 -spec spec(any(), list()) -> supervisor:child_spec().
-spec(Id, Args) ->
-    {Id, {?MODULE, start_link, Args},
+spec(ChildId, Args) ->
+    {ChildId, {?MODULE, start_link, Args},
         transient, infinity, supervisor, [?MODULE]}.
 
 -spec start_link(atom(), atom(), mfa()) -> {ok, pid()} | {error, any()}.
@@ -47,7 +51,7 @@ start_link(Pool, Type, MFA) ->
 start_link(Pool, Type, Size, MFA) ->
     supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]).
 
-sup_name(Pool) ->
+sup_name(Pool) when is_atom(Pool) ->
     list_to_atom(atom_to_list(Pool) ++ "_pool_sup").
 
 init([Pool, Type, Size, {M, F, Args}]) ->

+ 5 - 4
src/emqttd_pooler.erl

@@ -63,16 +63,17 @@ name(Id) ->
 %% @end
 %%------------------------------------------------------------------------------
 submit(Fun) ->
-    Worker = gproc_pool:pick_worker(pooler),
-    gen_server:call(Worker, {submit, Fun}, infinity).
+    gen_server:call(worker(), {submit, Fun}, infinity).
 
 %%------------------------------------------------------------------------------
 %% @doc Submit work to pooler asynchronously
 %% @end
 %%------------------------------------------------------------------------------
 async_submit(Fun) ->
-    Worker = gproc_pool:pick_worker(pooler),
-    gen_server:cast(Worker, {async_submit, Fun}).
+    gen_server:cast(worker(), {async_submit, Fun}).
+
+worker() ->
+    gproc_pool:pick_worker(pooler).
 
 %%%=============================================================================
 %%% gen_server callbacks

+ 53 - 25
src/emqttd_pubsub.erl

@@ -40,9 +40,9 @@
 -copy_mnesia({mnesia, [copy]}).
 
 %% API Exports 
--export([start_link/3]).
+-export([start_link/4]).
 
--export([create/1, subscribe/1, subscribe/2,
+-export([create/2, subscribe/1, subscribe/2,
          unsubscribe/1, unsubscribe/2, publish/1]).
 
 %% Local node
@@ -56,7 +56,7 @@
 -compile(export_all).
 -endif.
 
--record(state, {pool, id}).
+-record(state, {pool, id, statsfun}).
 
 -define(ROUTER, emqttd_router).
 
@@ -123,26 +123,33 @@ cache_env(Key) ->
 %% @doc Start one pubsub server
 %% @end
 %%------------------------------------------------------------------------------
--spec start_link(Pool, Id, Opts) -> {ok, pid()} | ignore | {error, any()} when
-    Pool :: atom(),
-    Id   :: pos_integer(),
-    Opts :: list(tuple()).
-start_link(Pool, Id, Opts) ->
-    gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, Opts], []).
+-spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when
+    Pool     :: atom(),
+    Id       :: pos_integer(),
+    StatsFun :: fun(),
+    Opts     :: list(tuple()).
+start_link(Pool, Id, StatsFun, Opts) ->
+    gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []).
 
 name(Id) ->
     list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)).
 
 %%------------------------------------------------------------------------------
-%% @doc Create Topic.
+%% @doc Create Topic or Subscription.
 %% @end
 %%------------------------------------------------------------------------------
--spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
-create(Topic) when is_binary(Topic) ->
+-spec create(topic | subscription, binary()) -> ok | {error, any()}.
+create(topic, Topic) when is_binary(Topic) ->
     Record = #mqtt_topic{topic = Topic, node = node()},
     case mnesia:transaction(fun add_topic/1, [Record]) of
         {atomic, ok}     -> ok;
         {aborted, Error} -> {error, Error}
+    end;
+
+create(subscription, {SubId, Topic, Qos}) ->
+    case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of
+        {atomic, ok}     -> ok;
+        {aborted, Error} -> {error, Error}
     end.
 
 %%------------------------------------------------------------------------------
@@ -233,12 +240,13 @@ match(Topic) when is_binary(Topic) ->
 %%% gen_server callbacks
 %%%=============================================================================
 
-init([Pool, Id, Opts]) ->
+init([Pool, Id, StatsFun, Opts]) ->
     ?ROUTER:init(Opts),
     ?GPROC_POOL(join, Pool, Id),
-    {ok, #state{pool = Pool, id = Id}}.
+    {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}.
 
-handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) ->
+handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
+            State = #state{statsfun = StatsFun}) ->
     %% Add routes first
     ?ROUTER:add_routes(TopicTable, SubPid),
 
@@ -247,11 +255,13 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) ->
 
     case mnesia:transaction(fun add_topics/1, [Topics]) of
         {atomic, _} ->
+            StatsFun(topic),
             if_subscription(
                 fun(_) ->
                     %% Add subscriptions
                     Args = [fun add_subscriptions/2, [SubId, TopicTable]],
-                    emqttd_pooler:async_submit({mnesia, async_dirty, Args})
+                    emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
+                    StatsFun(subscription)
                 end),
             {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
         {aborted, Error} ->
@@ -262,14 +272,16 @@ handle_call(Req, _From, State) ->
     lager:error("Bad Request: ~p", [Req]),
 	{reply, {error, badreq}, State}.
 
-handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State) ->
+handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) ->
     %% Delete routes first
     ?ROUTER:delete_routes(Topics, SubPid),
+
     %% Remove subscriptions
     if_subscription(
         fun(_) ->
             Args = [fun remove_subscriptions/2, [SubId, Topics]],
-            emqttd_pooler:async_submit({mnesia, async_dirty, Args})
+            emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
+            StatsFun(subscription)
         end),
     {noreply, State};
 
@@ -311,7 +323,7 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
             mnesia:write(topic, TopicR, write);
         Records ->
             case lists:member(TopicR, Records) of
-                true -> ok;
+                true  -> ok;
                 false -> mnesia:write(topic, TopicR, write)
             end
     end.
@@ -320,20 +332,36 @@ add_subscriptions(undefined, _TopicTable) ->
     ok;
 add_subscriptions(SubId, TopicTable) ->
     lists:foreach(fun({Topic, Qos}) ->
-            %%TODO: this is not right...
-            Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos},
-            mnesia:write(subscription, Subscription, write)
-        end,TopicTable).
+                    add_subscription(SubId, {Topic, Qos})
+                  end,TopicTable).
+
+add_subscription(SubId, {Topic, Qos}) ->
+    Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos},
+    Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
+    Records = mnesia:match_object(subscription, Pattern, write),
+    case lists:member(Subscription, Records) of
+        true  ->
+            ok;
+        false ->
+            [delete_subscription(Record) || Record <- Records],
+            insert_subscription(Subscription)
+    end.
+
+insert_subscription(Record) ->
+    mnesia:write(subscription, Record, write).
 
 remove_subscriptions(undefined, _Topics) ->
     ok;
 remove_subscriptions(SubId, Topics) ->
     lists:foreach(fun(Topic) ->
          Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
-         [mnesia:delete_object(subscription, Subscription, write)
-            || Subscription <- mnesia:match_object(subscription, Pattern, write)]
+         Records = mnesia:match_object(subscription, Pattern, write),
+         [delete_subscription(Record) || Record <- Records]
      end, Topics).
 
+delete_subscription(Record) ->
+    mnesia:delete_object(subscription, Record, write).
+
 %%%=============================================================================
 %%% Trace Functions
 %%%=============================================================================

+ 17 - 20
src/emqttd_pubsub_helper.erl

@@ -30,7 +30,7 @@
 -include("emqttd.hrl").
 
 %% API Function Exports
--export([start_link/1, aging/1, setstats/1]).
+-export([start_link/2, aging/1]).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -42,7 +42,7 @@
 
 -record(aging, {topics, time, tref}).
 
--record(state, {aging :: #aging{}}).
+-record(state, {aging :: #aging{}, statsfun}).
 
 -define(SERVER, ?MODULE).
 
@@ -53,12 +53,12 @@
 %%%=============================================================================
 
 %%------------------------------------------------------------------------------
-%% @doc Start pubsub helper. 
+%% @doc Start pubsub helper.
 %% @end
 %%------------------------------------------------------------------------------
--spec start_link(list(tuple())) -> {ok, pid()} | ignore | {error, any()}. 
-start_link(Opts) ->
-    gen_server2:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
+-spec start_link(fun(), list(tuple())) -> {ok, pid()} | ignore | {error, any()}.
+start_link(StatsFun, Opts) ->
+    gen_server2:start_link({local, ?SERVER}, ?MODULE, [StatsFun, Opts], []).
 
 %%------------------------------------------------------------------------------
 %% @doc Aging topics
@@ -68,19 +68,11 @@ start_link(Opts) ->
 aging(Topics) ->
     gen_server2:cast(?SERVER, {aging, Topics}).
 
-setstats(topic) ->
-    emqttd_stats:setstats('topics/count', 'topics/max',
-                          mnesia:table_info(topic, size));
-setstats(subscription) ->
-    emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
-                          mnesia:table_info(subscription, size)).
-
 %%%=============================================================================
 %%% gen_server callbacks
 %%%=============================================================================
 
-init([Opts]) ->
-
+init([StatsFun, Opts]) ->
     mnesia:subscribe(system),
 
     AgingSecs = proplists:get_value(route_aging, Opts, 5),
@@ -90,13 +82,15 @@ init([Opts]) ->
 
     {ok, #state{aging = #aging{topics = dict:new(),
                                time   = AgingSecs,
-                               tref   = AgingTref}}}.
+                               tref   = AgingTref},
+               statsfun = StatsFun}}.
 
 start_tick(Secs) ->
     timer:send_interval(timer:seconds(Secs), {clean, aged}).
 
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
+handle_call(Req, _From, State) ->
+    lager:error("Unexpected Request: ~p", [Req]),
+    {reply, {error, unsupported_request}, State}.
 
 handle_cast({aging, Topics}, State = #state{aging = Aging}) ->
     #aging{topics = Dict} = Aging,
@@ -123,7 +117,7 @@ handle_info({clean, aged}, State = #state{aging = Aging}) ->
 
     NewAging = Aging#aging{topics = dict:from_list(Dict1)},
 
-    {noreply, State#state{aging = NewAging}, hibernate};
+    noreply(State#state{aging = NewAging});
 
 handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
     Pattern = #mqtt_topic{_ = '_', node = Node},
@@ -132,7 +126,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
                 R <- mnesia:match_object(topic, Pattern, write)]
         end,
     mnesia:async_dirty(F),
-    {noreply, State};
+    noreply(State);
 
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -147,6 +141,9 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal Functions
 %%%=============================================================================
 
+noreply(State = #state{statsfun = StatsFun}) ->
+    StatsFun(topic), {noreply, State, hibernate}.
+
 try_clean(ByTime, List) ->
     try_clean(ByTime, List, []).
 

+ 10 - 4
src/emqttd_pubsub_sup.erl

@@ -42,16 +42,22 @@ start_link() ->
 
 init([Opts]) ->
     %% PubSub Helper
-    Helper = {helper, {?HELPER, start_link, [Opts]},
+    Helper = {helper, {?HELPER, start_link, [fun stats/1, Opts]},
                 permanent, infinity, worker, [?HELPER]},
 
     %% PubSub Pool Sup
-    MFA = {emqttd_pubsub, start_link, [Opts]},
-    PoolSup = emqttd_pool_sup:spec(pool_sup, [
-                pubsub, hash, pool_size(Opts), MFA]),
+    MFA = {emqttd_pubsub, start_link, [fun stats/1, Opts]},
+    PoolSup = emqttd_pool_sup:spec([pubsub, hash, pool_size(Opts), MFA]),
     {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}.
 
 pool_size(Opts) ->
     Schedulers = erlang:system_info(schedulers),
     proplists:get_value(pool_size, Opts, Schedulers).
 
+stats(topic) ->
+    emqttd_stats:setstats('topics/count', 'topics/max',
+                          mnesia:table_info(topic, size));
+stats(subscription) ->
+    emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
+                          mnesia:table_info(subscription, size)).
+

+ 1 - 2
src/emqttd_sm_sup.erl

@@ -56,8 +56,7 @@ init([]) ->
 
     %% SM Pool Sup
     MFA = {?SM, start_link, []},
-    PoolSup = emqttd_pool_sup:spec(pool_sup, [
-                ?SM, hash, erlang:system_info(schedulers), MFA]),
+    PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]),
 
     {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.