Feng 10 лет назад
Родитель
Сommit
26655f1ee3

+ 90 - 0
src/emqttd_backend.erl

@@ -0,0 +1,90 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_backend).
+
+-include("emqttd.hrl").
+
+%% Mnesia Callbacks
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+%% API.
+-export([add_static_subscription/1, lookup_static_subscriptions/1,
+         del_static_subscriptions/1, del_static_subscription/2]).
+
+%%--------------------------------------------------------------------
+%% Mnesia callbacks
+%%--------------------------------------------------------------------
+
+mnesia(boot) ->
+    ok = emqttd_mnesia:create_table(static_subscription, [
+                {type, bag},
+                {disc_copies, [node()]},
+                {record_name, mqtt_subscription},
+                {attributes, record_info(fields, mqtt_subscription)},
+                {storage_properties, [{ets, [compressed]},
+                                      {dets, [{auto_save, 5000}]}]}]);
+
+mnesia(copy) ->
+    ok = emqttd_mnesia:copy_table(static_subscription).
+
+%%--------------------------------------------------------------------
+%% Static Subscriptions
+%%--------------------------------------------------------------------
+
+%% @doc Add a static subscription manually.
+-spec add_static_subscription(mqtt_subscription()) -> {atom, ok}.
+add_static_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) ->
+    Pattern = match_pattern(SubId, Topic),
+    mnesia:transaction(
+        fun() ->
+            case mnesia:match_object(static_subscription, Pattern, write) of
+                [] ->
+                    mnesia:write(static_subscription, Subscription, write);
+                [Subscription] ->
+                    mnesia:abort({error, existed});
+                [Subscription1] -> %% QoS is different
+                    mnesia:delete_object(static_subscription, Subscription1, write),
+                    mnesia:write(static_subscription, Subscription, write)
+            end
+        end).
+
+%% @doc Lookup static subscriptions.
+-spec lookup_static_subscriptions(binary()) -> list(mqtt_subscription()).
+lookup_static_subscriptions(ClientId) when is_binary(ClientId) ->
+    mnesia:dirty_read(static_subscription, ClientId).
+
+%% @doc Delete static subscriptions by ClientId manually.
+-spec del_static_subscriptions(binary()) -> ok.
+del_static_subscriptions(ClientId) when is_binary(ClientId) ->
+    mnesia:transaction(fun mnesia:delete/1, [{static_subscription, ClientId}]).
+
+%% @doc Delete a static subscription manually.
+-spec del_static_subscription(binary(), binary()) -> ok.
+del_static_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
+    mnesia:transaction(fun del_static_subscription_/1, [match_pattern(ClientId, Topic)]).
+
+del_static_subscription_(Pattern) ->
+    lists:foreach(fun(Subscription) ->
+                mnesia:delete_object(static_subscription, Subscription, write)
+        end, mnesia:match_object(static_subscription, Pattern, write)).
+
+match_pattern(SubId, Topic) ->
+    #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}.
+

+ 1 - 1
src/emqttd_http.erl

@@ -56,7 +56,7 @@ handle_request('POST', "/mqtt/publish", Req) ->
         case {validate(qos, Qos), validate(topic, Topic)} of
             {true, true} ->
                 Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
-                emqttd_pubsub:publish(Msg#mqtt_message{retain  = Retain}),
+                emqttd:publish(Msg#mqtt_message{retain  = Retain}),
                 Req:ok({"text/plain", <<"ok">>});
            {false, _} ->
                 Req:respond({400, [], <<"Bad QoS">>});

+ 265 - 0
src/emqttd_pubsub.erl

@@ -0,0 +1,265 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_pubsub).
+
+-behaviour(gen_server2).
+
+-include("emqttd.hrl").
+
+-include("emqttd_protocol.hrl").
+
+-include("emqttd_internal.hrl").
+
+%% Mnesia Callbacks
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+%% API Exports
+-export([start_link/3, create_topic/1, lookup_topic/1]).
+
+-export([subscribe/2, unsubscribe/2, publish/2, dispatch/2,
+         async_subscribe/2, async_unsubscribe/2]).
+
+%% gen_server.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {pool, id, env}).
+
+%%--------------------------------------------------------------------
+%% Mnesia callbacks
+%%--------------------------------------------------------------------
+
+mnesia(boot) ->
+    ok = emqttd_mnesia:create_table(topic, [
+                {ram_copies, [node()]},
+                {record_name, mqtt_topic},
+                {attributes, record_info(fields, mqtt_topic)}]);
+
+mnesia(copy) ->
+    ok = emqttd_mnesia:copy_table(topic).
+
+%%--------------------------------------------------------------------
+%% Start PubSub
+%%--------------------------------------------------------------------
+
+%% @doc Start one pubsub
+-spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
+    Pool :: atom(),
+    Id   :: pos_integer(),
+    Env  :: list(tuple()).
+start_link(Pool, Id, Env) ->
+    gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
+
+%% @doc Create a Topic.
+-spec create_topic(emqttd_topic:topic()) -> ok | {error, any()}.
+create_topic(Topic) when is_binary(Topic) ->
+    case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of
+        {atomic, ok}     -> ok;
+        {aborted, Error} -> {error, Error}
+    end.
+
+%% @doc Lookup a Topic.
+-spec lookup_topic(emqttd_topic:topic()) -> list(mqtt_topic()).
+lookup_topic(Topic) when is_binary(Topic) ->
+    mnesia:dirty_read(topic, Topic).
+
+%%--------------------------------------------------------------------
+%% PubSub API
+%%--------------------------------------------------------------------
+
+%% @doc Subscribe a Topic
+-spec subscribe(binary(), pid()) -> ok.
+subscribe(Topic, SubPid) when is_binary(Topic) ->
+    call(pick(Topic), {subscribe, Topic, SubPid}).
+
+%% @doc Asynchronous Subscribe
+-spec async_subscribe(binary(), pid()) -> ok.
+async_subscribe(Topic, SubPid) when is_binary(Topic) ->
+    cast(pick(Topic), {subscribe, Topic, SubPid}).
+
+%% @doc Publish message to Topic.
+-spec publish(binary(), any()) -> ok.
+publish(Topic, Msg) ->
+    lists:foreach(
+        fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
+            ?MODULE:dispatch(To, Msg);
+           (#mqtt_route{topic = To, node = Node}) ->
+            rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
+        end, emqttd_router:lookup(Topic)).
+
+%% @doc Dispatch Message to Subscribers
+-spec dispatch(binary(), mqtt_message()) -> ok.
+dispatch(Queue = <<"$Q/", _Q>>, Msg) ->
+    case subscribers(Queue) of
+        [] ->
+            dropped(Queue);
+        [SubPid] ->
+            SubPid ! {dispatch, Queue, Msg};
+        SubPids ->
+            Idx = crypto:rand_uniform(1, length(SubPids) + 1),
+            SubPid = lists:nth(Idx, SubPids),
+            SubPid ! {dispatch, Queue, Msg}
+    end;
+
+dispatch(Topic, Msg) ->
+    case subscribers(Topic) of
+        [] ->
+            dropped(Topic);
+        [SubPid] ->
+            SubPid ! {dispatch, Topic, Msg};
+        SubPids ->
+            lists:foreach(fun(SubPid) ->
+                SubPid ! {dispatch, Topic, Msg}
+            end, SubPids)
+    end.
+
+%% @private
+%% @doc Find all subscribers
+subscribers(Topic) ->
+    case ets:member(subscriber, Topic) of
+        true -> %% faster then lookup?
+            try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end;
+        false ->
+            []
+    end.
+
+%% @private
+%% @doc Ingore $SYS Messages.
+dropped(<<"$SYS/", _/binary>>) ->
+    ok;
+dropped(_Topic) ->
+    emqttd_metrics:inc('messages/dropped').
+
+%% @doc Unsubscribe
+-spec unsubscribe(binary(), pid()) -> ok.
+unsubscribe(Topic, SubPid) when is_binary(Topic) ->
+    call(pick(Topic), {unsubscribe, Topic, SubPid}).
+
+%% @doc Asynchronous Unsubscribe
+-spec async_unsubscribe(binary(), pid()) -> ok.
+async_unsubscribe(Topic, SubPid)  when is_binary(Topic) ->
+    cast(pick(Topic), {unsubscribe, Topic, SubPid}).
+
+call(PubSub, Req) when is_pid(PubSub) ->
+    gen_server2:call(PubSub, Req, infinity).
+
+cast(PubSub, Msg) when is_pid(PubSub) ->
+    gen_server2:cast(PubSub, Msg).
+
+pick(Topic) -> gproc_pool:pick_worker(pubsub, Topic).
+
+%%--------------------------------------------------------------------
+%% gen_server Callbacks
+%%--------------------------------------------------------------------
+
+init([Pool, Id, Env]) ->
+    ?GPROC_POOL(join, Pool, Id),
+    {ok, #state{pool = Pool, id = Id, env = Env}}.
+
+handle_call({subscribe, Topic, SubPid}, _From, State) ->
+    add_subscriber_(Topic, SubPid),
+	{reply, ok, setstats(State)};
+
+handle_call({unsubscribe, Topic, SubPid}, _From, State) ->
+    del_subscriber_(Topic, SubPid),
+	{reply, ok, setstats(State)};
+
+handle_call(Req, _From, State) ->
+    ?UNEXPECTED_REQ(Req, State).
+
+handle_cast({subscribe, Topic, SubPid}, State) ->
+    add_subscriber_(Topic, SubPid),
+	{noreply, setstats(State)};
+
+handle_cast({unsubscribe, Topic, SubPid}, State) ->
+    del_subscriber_(Topic, SubPid),
+	{noreply, setstats(State)};
+
+handle_cast(Msg, State) ->
+    ?UNEXPECTED_MSG(Msg, State).
+
+handle_info(Info, State) ->
+    ?UNEXPECTED_INFO(Info, State).
+
+terminate(_Reason, #state{pool = Pool, id = Id}) ->
+    ?GPROC_POOL(leave, Pool, Id).
+
+code_change(_OldVsn, State, _Extra) ->
+	{ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+
+add_subscriber_(Topic, SubPid) ->
+    case ets:member(subscriber, Topic) of
+        false ->
+            mnesia:transaction(fun add_topic_/1, [Topic]),
+            emqttd_router:add_route(Topic, node()),
+            setstats(topic);
+        true ->
+            ok
+    end,
+    ets:insert(subscriber, {Topic, SubPid}).
+
+del_subscriber_(Topic, SubPid) ->
+    ets:delete_object(subscriber, {Topic, SubPid}),
+    case ets:lookup(subscriber, Topic) of
+        [] ->
+            emqttd_router:del_route(Topic, node()),
+            mnesia:transaction(fun del_topic_/1, [Topic]),
+            setstats(topic);
+        [_|_] ->
+            ok
+    end.
+
+add_topic_(Topic) ->
+    add_topic_(Topic, []).
+
+add_topic_(Topic, Flags) ->
+    Record = #mqtt_topic{topic = Topic, flags = Flags},
+    case mnesia:wread({topic, Topic}) of
+        []  -> mnesia:write(topic, Record, write);
+        [_] -> ok
+    end.
+
+del_topic_(Topic) ->
+    case emqttd_router:has_route(Topic) of
+        true  -> ok;
+        false -> do_del_topic_(Topic)
+    end.
+
+do_del_topic_(Topic) ->
+    case mnesia:wread({topic, Topic}) of
+        [#mqtt_topic{flags = []}] ->
+            mnesia:delete(topic, Topic, write);
+        _ ->
+            ok
+    end.
+
+setstats(State) when is_record(State, state) ->
+    setstats(subscriber), State;
+
+setstats(topic) ->
+    emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size));
+
+setstats(subscriber) ->
+    emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)).
+

+ 0 - 414
src/emqttd_pubsub_old.erl

@@ -1,414 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_pubsub).
-
--behaviour(gen_server2).
-
--include("emqttd.hrl").
-
--include("emqttd_protocol.hrl").
-
--include("emqttd_internal.hrl").
-
-%% Mnesia Callbacks
--export([mnesia/1]).
-
--boot_mnesia({mnesia, [boot]}).
--copy_mnesia({mnesia, [copy]}).
-
-%% API Exports
--export([start_link/4]).
-
--export([create/2, lookup/2, subscribe/1, subscribe/2,
-         publish/1, unsubscribe/1, unsubscribe/2, delete/2]).
-
-%% Local node
--export([match/1]).
-
-%% gen_server Function Exports
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
--record(state, {pool, id, statsfun}).
-
--define(ROUTER, emqttd_router).
-
-%%--------------------------------------------------------------------
-%% Mnesia callbacks
-%%--------------------------------------------------------------------
-mnesia(boot) ->
-    ok = create_table(topic, ram_copies),
-    if_subscription(fun(RamOrDisc) ->
-                      ok = create_table(subscription, RamOrDisc)
-                    end);
-
-mnesia(copy) ->
-    ok = emqttd_mnesia:copy_table(topic),
-    %% Only one disc_copy???
-    if_subscription(fun(_RamOrDisc) ->
-                      ok = emqttd_mnesia:copy_table(subscription)
-                    end).
-
-%% Topic Table
-create_table(topic, RamOrDisc) ->
-    emqttd_mnesia:create_table(topic, [
-            {type, bag},
-            {RamOrDisc, [node()]},
-            {record_name, mqtt_topic},
-            {attributes, record_info(fields, mqtt_topic)}]);
-
-%% Subscription Table
-create_table(subscription, RamOrDisc) ->
-    emqttd_mnesia:create_table(subscription, [
-            {type, bag},
-            {RamOrDisc, [node()]},
-            {record_name, mqtt_subscription},
-            {attributes, record_info(fields, mqtt_subscription)},
-            {storage_properties, [{ets, [compressed]},
-                                  {dets, [{auto_save, 5000}]}]}]).
-
-if_subscription(Fun) ->
-    case env(subscription) of
-        disc      -> Fun(disc_copies);
-        ram       -> Fun(ram_copies);
-        false     -> ok;
-        undefined -> ok
-    end.
-
-env(Key) ->
-    case get({pubsub, Key}) of
-        undefined ->
-            cache_env(Key);
-        Val ->
-            Val
-    end.
-
-cache_env(Key) ->
-    Val = proplists:get_value(Key, emqttd_broker:env(pubsub)),
-    put({pubsub, Key}, Val),
-    Val.
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
-%% @doc Start one pubsub server
--spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when
-    Pool     :: atom(),
-    Id       :: pos_integer(),
-    StatsFun :: fun((atom()) -> any()),
-    Opts     :: list(tuple()).
-start_link(Pool, Id, StatsFun, Opts) ->
-    gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)},
-                           ?MODULE, [Pool, Id, StatsFun, Opts], []).
-
-%% @doc Create Topic or Subscription.
--spec create(topic, emqttd_topic:topic()) -> ok | {error, any()};
-            (subscription, {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}.
-create(topic, Topic) when is_binary(Topic) ->
-    Record = #mqtt_topic{topic = Topic, node = node()},
-    case mnesia:transaction(fun add_topic/1, [Record]) of
-        {atomic, ok}     -> ok;
-        {aborted, Error} -> {error, Error}
-    end;
-
-create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) ->
-    case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, ?QOS_I(Qos)}]) of
-        {atomic, ok}     -> ok;
-        {aborted, Error} -> {error, Error}
-    end.
-
-%% @doc Lookup Topic or Subscription.
--spec lookup(topic, emqttd_topic:topic()) -> list(mqtt_topic());
-            (subscription, binary())      -> list(mqtt_subscription()).
-lookup(topic, Topic) when is_binary(Topic) ->
-    mnesia:dirty_read(topic, Topic);
-
-lookup(subscription, SubId) when is_binary(SubId) ->
-    mnesia:dirty_read(subscription, SubId).
-
-%% @doc Delete Topic or Subscription.
--spec delete(topic, emqttd_topic:topic()) -> ok | {error, any()};
-            (subscription, binary() | {binary(), emqttd_topic:topic()}) -> ok.
-delete(topic, _Topic) ->
-    {error, unsupported};
-
-delete(subscription, SubId) when is_binary(SubId) ->
-    mnesia:dirty_delete({subscription, SubId});
-
-delete(subscription, {SubId, Topic}) when is_binary(SubId) andalso is_binary(Topic) ->
-    mnesia:async_dirty(fun remove_subscriptions/2, [SubId, [Topic]]).
-
-%% @doc Subscribe Topics
--spec subscribe({Topic, Qos} | list({Topic, Qos})) ->
-    {ok, Qos | list(Qos)} | {error, any()} when
-    Topic   :: binary(),
-    Qos     :: mqtt_qos() | mqtt_qos_name().
-subscribe({Topic, Qos}) ->
-    subscribe([{Topic, Qos}]);
-subscribe(TopicTable) when is_list(TopicTable) ->
-    call({subscribe, {undefined, self()}, fixqos(TopicTable)}).
-
--spec subscribe(ClientId, {Topic, Qos} | list({Topic, Qos})) ->
-    {ok, Qos | list(Qos)} | {error, any()} when
-    ClientId :: binary(),
-    Topic    :: binary(),
-    Qos      :: mqtt_qos() | mqtt_qos_name().
-subscribe(ClientId, {Topic, Qos}) when is_binary(ClientId) ->
-    subscribe(ClientId, [{Topic, Qos}]);
-subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTable) ->
-    call({subscribe, {ClientId, self()}, fixqos(TopicTable)}).
-
-fixqos(TopicTable) ->
-    [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable].
-
-%% @doc Unsubscribe Topic or Topics
--spec unsubscribe(emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok.
-unsubscribe(Topic) when is_binary(Topic) ->
-    unsubscribe([Topic]);
-unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
-    cast({unsubscribe, {undefined, self()}, Topics}).
-
--spec unsubscribe(binary(), emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok.
-unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
-    unsubscribe(ClientId, [Topic]);
-unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) ->
-    cast({unsubscribe, {ClientId, self()}, Topics}).
-
-call(Request) ->
-    gen_server2:call(pick(self()), Request, infinity).
-
-cast(Msg) ->
-    gen_server2:cast(pick(self()), Msg).
-
-pick(Self) -> gproc_pool:pick_worker(pubsub, Self).
-
-%% @doc Publish to cluster nodes
--spec publish(Msg :: mqtt_message()) -> ok.
-publish(Msg = #mqtt_message{from = From}) ->
-    trace(publish, From, Msg),
-    Msg1 = #mqtt_message{topic = To}
-               = emqttd_broker:foldl_hooks('message.publish', [], Msg),
-
-    %% Retain message first. Don't create retained topic.
-    case emqttd_retainer:retain(Msg1) of
-        ok ->
-            %% TODO: why unset 'retain' flag?
-            publish(To, emqttd_message:unset_flag(Msg1));
-        ignore ->
-            publish(To, Msg1)
-     end.
-
-publish(To, Msg) ->
-    lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) ->
-                    case Node =:= node() of
-                        true  -> ?ROUTER:route(Topic, Msg);
-                        false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg])
-                    end
-                  end, match(To)).
-
-%% @doc Match Topic Name with Topic Filters
--spec match(emqttd_topic:topic()) -> [mqtt_topic()].
-match(To) ->
-    Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
-    %% ets:lookup for topic table will be replicated to all nodes.
-    lists:append([ets:lookup(topic, Topic) || Topic <- [To | Matched]]).
-
-%%--------------------------------------------------------------------
-%% gen_server callbacks
-%%--------------------------------------------------------------------
-
-init([Pool, Id, StatsFun, _Opts]) ->
-    ?GPROC_POOL(join, Pool, Id),
-    {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}.
-
-handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
-            State = #state{statsfun = StatsFun}) ->
-
-    %% Monitor SubPid first
-    try_monitor(SubPid),
-
-    %% Topics
-    Topics = [Topic || {Topic, _Qos} <- TopicTable],
-
-    NewTopics = Topics -- reverse_routes(SubPid),
-
-    %% Add routes
-    ?ROUTER:add_routes(NewTopics, SubPid),
-
-    insert_reverse_routes(SubPid, NewTopics),
-
-    StatsFun(reverse_route),
-
-    %% Insert topic records to mnesia
-    Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics],
-
-    case mnesia:transaction(fun add_topics/1, [Records]) of
-        {atomic, _} ->
-            StatsFun(topic),
-            if_subscription(
-                fun(_) ->
-                    %% Add subscriptions
-                    Args = [fun add_subscriptions/2, [SubId, TopicTable]],
-                    emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
-                    StatsFun(subscription)
-                end),
-            %% Grant all qos...
-            {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
-        {aborted, Error} ->
-            {reply, {error, Error}, State}
-    end;
-
-handle_call(Req, _From, State) ->
-   ?UNEXPECTED_REQ(Req, State).
-
-handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) ->
-
-    %% Delete routes first
-    ?ROUTER:delete_routes(Topics, SubPid),
-
-    delete_reverse_routes(SubPid, Topics),
-
-    StatsFun(reverse_route),
-
-    %% Remove subscriptions
-    if_subscription(
-        fun(_) ->
-            Args = [fun remove_subscriptions/2, [SubId, Topics]],
-            emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
-            StatsFun(subscription)
-        end),
-
-    {noreply, State};
-
-handle_cast(Msg, State) ->
-    ?UNEXPECTED_MSG(Msg, State).
-
-handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{statsfun = StatsFun}) ->
-
-    Topics = reverse_routes(DownPid),
-
-    ?ROUTER:delete_routes(Topics, DownPid),
-
-    delete_reverse_routes(DownPid),
-
-    StatsFun(reverse_route),
-
-    {noreply, State, hibernate};
-
-handle_info(Info, State) ->
-    ?UNEXPECTED_INFO(Info, State).
-
-terminate(_Reason, #state{pool = Pool, id = Id}) ->
-    ?GPROC_POOL(leave, Pool, Id).
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-add_topics(Records) ->
-    lists:foreach(fun add_topic/1, Records).
-
-add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
-    case mnesia:wread({topic, Topic}) of
-        [] ->
-            case emqttd_topic:wildcard(Topic) of
-                true  -> emqttd_trie:insert(Topic);
-                false -> ok
-            end,
-            mnesia:write(topic, TopicR, write);
-        Records ->
-            case lists:member(TopicR, Records) of
-                true  -> ok;
-                false -> mnesia:write(topic, TopicR, write)
-            end
-    end.
-
-add_subscriptions(undefined, _TopicTable) ->
-    ok;
-add_subscriptions(SubId, TopicTable) ->
-    lists:foreach(fun({Topic, Qos}) ->
-                    add_subscription(SubId, {Topic, Qos})
-                  end,TopicTable).
-
-add_subscription(SubId, {Topic, Qos}) ->
-    Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos},
-    Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
-    Records = mnesia:match_object(subscription, Pattern, write),
-    case lists:member(Subscription, Records) of
-        true ->
-            ok;
-        false ->
-            [delete_subscription(Record) || Record <- Records],
-            insert_subscription(Subscription)
-    end.
-
-insert_subscription(Record) ->
-    mnesia:write(subscription, Record, write).
-
-remove_subscriptions(undefined, _Topics) ->
-    ok;
-remove_subscriptions(SubId, Topics) ->
-    lists:foreach(fun(Topic) ->
-         Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
-         Records = mnesia:match_object(subscription, Pattern, write),
-         lists:foreach(fun delete_subscription/1, Records)
-     end, Topics).
-
-delete_subscription(Record) ->
-    mnesia:delete_object(subscription, Record, write).
-
-reverse_routes(SubPid) ->
-    case ets:member(reverse_route, SubPid) of
-        true  ->
-            try ets:lookup_element(reverse_route, SubPid, 2) catch error:badarg -> [] end;
-        false ->
-            []
-    end.
-
-insert_reverse_routes(SubPid, Topics) ->
-    ets:insert(reverse_route, [{SubPid, Topic} || Topic <- Topics]).
-
-delete_reverse_routes(SubPid, Topics) ->
-    lists:foreach(fun(Topic) ->
-                ets:delete_object(reverse_route, {SubPid, Topic})
-        end, Topics).
-
-delete_reverse_routes(SubPid) ->
-    ets:delete(reverse_route, SubPid).
-
-try_monitor(SubPid) ->
-    case ets:member(reverse_route, SubPid) of
-        true  -> ignore;
-        false -> erlang:monitor(process, SubPid)
-    end.
-
-%%--------------------------------------------------------------------
-%% Trace Functions
-%%--------------------------------------------------------------------
-
-trace(publish, From, _Msg) when is_atom(From) ->
-    %% Dont' trace '$SYS' publish
-    ignore;
-
-trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
-    lager:info([{client, From}, {topic, Topic}],
-               "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
-

+ 23 - 44
src/emqttd_pubsub_sup.erl

@@ -21,8 +21,6 @@
 
 -include("emqttd.hrl").
 
--define(HELPER, emqttd_pubsub_helper).
-
 -define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
 
 %% API
@@ -38,33 +36,37 @@ pubsub_pool() ->
     hd([Pid|| {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
 
 init([Env]) ->
-    %% Create tabs
-    create_tab(route), create_tab(reverse_route),
-
-    %% PubSub Helper
-    Helper = {helper, {?HELPER, start_link, [fun setstats/1]},
-                permanent, infinity, worker, [?HELPER]},
 
-    %% Router Pool Sup
-    RouterMFA = {emqttd_router, start_link, [fun setstats/1, Env]},
+    %% Create ETS Tabs
+    create_tab(subscriber), create_tab(subscribed),
 
-    %% Pool_size / 2
-    RouterSup = emqttd_pool_sup:spec(router_pool, [router, hash, router_pool(Env), RouterMFA]),
+    %% Router
+    Router = {router, {emqttd_router, start_link, []},
+                permanent, 5000, worker, [emqttd_router]},
 
     %% PubSub Pool Sup
-    PubSubMFA = {emqttd_pubsub, start_link, [fun setstats/1, Env]},
-    PubSubSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]),
+    PubSubMFA = {emqttd_pubsub, start_link, [Env]},
+    PubSubPoolSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]),
+
+    %% Server Pool Sup
+    ServerMFA = {emqttd_server, start_link, [Env]},
+    ServerPoolSup = emqttd_pool_sup:spec(server_pool, [server, hash, pool_size(Env), ServerMFA]),
 
-    {ok, {{one_for_all, 10, 60}, [Helper, RouterSup, PubSubSup]}}.
+    {ok, {{one_for_all, 5, 60}, [Router, PubSubPoolSup, ServerPoolSup]}}.
 
-create_tab(route) ->
-    %% Route Table: Topic -> Pid1, Pid2, ..., PidN
+pool_size(Env) ->
+    Schedulers = erlang:system_info(schedulers),
+    proplists:get_value(pool_size, Env, Schedulers).
+
+create_tab(subscriber) ->
+    %% subscriber: Topic -> Pid1, Pid2, ..., PidN
     %% duplicate_bag: o(1) insert
-    ensure_tab(route, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
+    ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
 
-create_tab(reverse_route) ->
-    %% Reverse Route Table: Pid -> Topic1, Topic2, ..., TopicN
-    ensure_tab(reverse_route, [public, named_table, bag | ?CONCURRENCY_OPTS]).
+create_tab(subscribed) ->
+    %% subscribed: Pid -> Topic1, Topic2, ..., TopicN
+    %% bag: o(n) insert
+    ensure_tab(subscribed, [public, named_table, bag | ?CONCURRENCY_OPTS]).
 
 ensure_tab(Tab, Opts) ->
     case ets:info(Tab, name) of
@@ -72,26 +74,3 @@ ensure_tab(Tab, Opts) ->
         _ -> ok
     end.
 
-router_pool(Env) ->
-    case pool_size(Env) div 2 of
-        0 -> 1;
-        I -> I
-    end.
-
-pool_size(Env) ->
-    Schedulers = erlang:system_info(schedulers),
-    proplists:get_value(pool_size, Env, Schedulers).
-
-setstats(route) ->
-    emqttd_stats:setstat('routes/count', ets:info(route, size));
-
-setstats(reverse_route) ->
-    emqttd_stats:setstat('routes/reverse', ets:info(reverse_route, size));
-
-setstats(topic) ->
-    emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size));
-
-setstats(subscription) ->
-    emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
-                          mnesia:table_info(subscription, size)).
-

+ 225 - 0
src/emqttd_router.erl

@@ -0,0 +1,225 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_router).
+
+-behaviour(gen_server).
+
+-include("emqttd.hrl").
+
+%% Mnesia Bootstrap
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+-export([start_link/0, stop/0]).
+
+-export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1,
+         del_route/1, del_route/2, del_routes/1, has_route/1]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {}).
+
+%%--------------------------------------------------------------------
+%% Mnesia Bootstrap
+%%--------------------------------------------------------------------
+
+mnesia(boot) ->
+    ok = emqttd_mnesia:create_table(route, [
+                {type, bag},
+                {ram_copies, [node()]},
+                {record_name, mqtt_route},
+                {attributes, record_info(fields, mqtt_route)}]);
+
+mnesia(copy) ->
+    ok = emqttd_mnesia:copy_table(route, ram_copies).
+
+%%--------------------------------------------------------------------
+%% Start the Router
+%%--------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% @doc Lookup Routes.
+-spec lookup(Topic:: binary()) -> [mqtt_route()].
+lookup(Topic) when is_binary(Topic) ->
+    Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
+    %% Optimize: route table will be replicated to all nodes.
+    lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]).
+
+%% @doc Print Routes.
+-spec print(Topic :: binary()) -> [ok].
+print(Topic) ->
+    [io:format("~s -> ~s~n", [To, Node]) ||
+        #mqtt_route{topic = To, node = Node} <- lookup(Topic)].
+
+%% @doc Add Route
+-spec add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}.
+add_route(Topic) when is_binary(Topic) ->
+    add_route(#mqtt_route{topic = Topic, node = node()});
+add_route(Route) when is_record(Route, mqtt_route) ->
+    add_routes([Route]).
+
+-spec add_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}.
+add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
+    add_route(#mqtt_route{topic = Topic, node = Node}).
+
+%% @doc Add Routes
+-spec add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}.
+add_routes(Routes) ->
+    Add = fun() -> [add_route_(Route) || Route <- Routes] end,
+    case mnesia:transaction(Add) of
+        {atomic, _}      -> update_stats_(), ok;
+        {aborted, Error} -> {error, Error}
+    end.
+
+%% @private
+add_route_(Route = #mqtt_route{topic = Topic}) ->
+    case mnesia:wread({route, Topic}) of
+        [] ->
+            case emqttd_topic:wildcard(Topic) of
+                true  -> emqttd_trie:insert(Topic);
+                false -> ok
+            end,
+            mnesia:write(route, Route, write);
+        Records ->
+            case lists:member(Route, Records) of
+                true  -> ok;
+                false -> mnesia:write(route, Route, write)
+            end
+    end.
+
+%% @doc Delete Route
+-spec del_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}.
+del_route(Topic) when is_binary(Topic) ->
+    del_route(#mqtt_route{topic = Topic, node = node()});
+del_route(Route) when is_record(Route, mqtt_route) ->
+    del_routes([Route]).
+
+-spec del_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}.
+del_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
+    del_route(#mqtt_route{topic = Topic, node = Node}).
+
+%% @doc Delete Routes
+-spec del_routes([mqtt_route()]) -> ok | {error, any()}.
+del_routes(Routes) ->
+    Del = fun() -> [del_route_(Route) || Route <- Routes] end,
+    case mnesia:transaction(Del) of
+        {atomic, _}      -> update_stats_(), ok;
+        {aborted, Error} -> {error, Error}
+    end.
+
+del_route_(Route = #mqtt_route{topic = Topic}) ->
+    case mnesia:wread({route, Topic}) of
+        [] ->
+            ok;
+        [Route] ->
+            %% Remove route and trie
+            mnesia:delete_object(route, Route, write),
+            case emqttd_topic:wildcard(Topic) of
+                true  -> emqttd_trie:delete(Topic);
+                false -> ok
+            end;
+        _More ->
+            %% Remove route only
+            mnesia:delete_object(route, Route, write)
+    end.
+
+%% @doc Has Route?
+-spec has_route(binary()) -> boolean().
+has_route(Topic) ->
+    Routes = case mnesia:is_transaction() of
+                 true  -> mnesia:read(route, Topic);
+                 false -> mnesia:dirty_read(route, Topic)
+             end,
+    length(Routes) > 0.
+
+stop() -> gen_server:call(?MODULE, stop).
+
+%%--------------------------------------------------------------------
+%% gen_server Callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    mnesia:subscribe(system),
+    {ok, #state{}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State};
+
+handle_call(_Req, _From, State) ->
+    {reply, ignore, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({mnesia_system_event, {mnesia_up, Node}}, State) ->
+    lager:error("Mnesia up: ~p~n", [Node]),
+    {noreply, State};
+
+handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
+    lager:error("Mnesia down: ~p~n", [Node]),
+    clean_routes_(Node),
+    update_stats_(),
+    {noreply, State};
+
+handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
+    %% 1. Backup and restart
+    %% 2. Set master nodes
+    lager:critical("Mnesia inconsistent_database event: ~p, ~p~n", [Context, Node]),
+    {noreply, State};
+
+handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) ->
+    lager:critical("Mnesia overload: ~p~n", [Details]),
+    {noreply, State};
+
+handle_info({mnesia_system_event, _Event}, State) ->
+    {noreply, State};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    mnesia:unsubscribe(system).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+
+%% Clean Routes on Node
+clean_routes_(Node) ->
+    Pattern = #mqtt_route{_ = '_', node = Node},
+    Clean = fun() ->
+                [mnesia:delete_object(route, R, write) ||
+                    R <- mnesia:match_object(route, Pattern, write)]
+            end,
+    mnesia:transaction(Clean).
+
+update_stats_() ->
+    emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(route, size)).
+

+ 254 - 0
src/emqttd_server.erl

@@ -0,0 +1,254 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_server).
+
+-behaviour(gen_server2).
+
+-include("emqttd.hrl").
+
+-include("emqttd_protocol.hrl").
+
+-include("emqttd_internal.hrl").
+
+%% Mnesia Callbacks
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+%% API Exports
+-export([start_link/3]).
+
+%% PubSub API
+-export([subscribe/1, subscribe/3, publish/1, unsubscribe/1, unsubscribe/3,
+         update_subscription/4]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {pool, id, env, monitors}).
+
+%%--------------------------------------------------------------------
+%% Mnesia callbacks
+%%--------------------------------------------------------------------
+
+mnesia(boot) ->
+    ok = emqttd_mnesia:create_table(subscription, [
+                {type, bag},
+                {ram_copies, [node()]},
+                {local_content, true}, %% subscription table is local
+                {record_name, mqtt_subscription},
+                {attributes, record_info(fields, mqtt_subscription)}]);
+
+mnesia(copy) ->
+    ok = emqttd_mnesia:copy_table(subscription).
+
+%%--------------------------------------------------------------------
+%% Start server
+%%--------------------------------------------------------------------
+
+%% @doc Start a Server
+-spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
+    Pool :: atom(),
+    Id   :: pos_integer(),
+    Env  :: list(tuple()).
+start_link(Pool, Id, Env) ->
+    gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
+
+%%--------------------------------------------------------------------
+%% PubSub API
+%%--------------------------------------------------------------------
+
+%% @doc Subscribe a Topic
+-spec subscribe(binary()) -> ok.
+subscribe(Topic) when is_binary(Topic) ->
+    From = self(), call(server(From), {subscribe, From, Topic}).
+
+%% @doc Subscribe from a MQTT session.
+-spec subscribe(binary(), binary(), mqtt_qos()) -> ok.
+subscribe(ClientId, Topic, Qos) ->
+    From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
+
+%% @doc Update a subscription.
+-spec update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok.
+update_subscription(ClientId, Topic, OldQos, NewQos) ->
+    call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}).
+
+%% @doc Publish a Message
+-spec publish(Msg :: mqtt_message()) -> ok.
+publish(Msg = #mqtt_message{from = From}) ->
+    trace(publish, From, Msg),
+    Msg1 = #mqtt_message{topic = Topic}
+               = emqttd_broker:foldl_hooks('message.publish', [], Msg),
+    %% Retain message first. Don't create retained topic.
+    Msg2 = case emqttd_retainer:retain(Msg1) of
+               ok     -> emqttd_message:unset_flag(Msg1);
+               ignore -> Msg1
+           end,
+    emqttd_pubsub:publish(Topic, Msg2).
+
+%% @doc Unsubscribe a Topic
+-spec unsubscribe(binary()) -> ok.
+unsubscribe(Topic) when is_binary(Topic) ->
+    From = self(), call(server(From), {unsubscribe, From, Topic}).
+
+%% @doc Unsubscribe a Topic from a MQTT session
+-spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok.
+unsubscribe(ClientId, Topic, Qos) ->
+    From = self(), call(server(From), {unsubscribe, From, ClientId, Topic, Qos}).
+
+call(Server, Req) ->
+    gen_server2:call(Server, Req, infinity).
+
+server(From) ->
+    gproc_pool:pick_worker(server, From).
+
+%%--------------------------------------------------------------------
+%% gen_server Callbacks
+%%--------------------------------------------------------------------
+
+init([Pool, Id, Env]) ->
+    ?GPROC_POOL(join, Pool, Id),
+    {ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}.
+
+handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
+    add_subscription_(ClientId, Topic, Qos),
+    set_subscription_stats(),
+    do_subscribe_(SubPid, Topic),
+    ok(monitor_subscriber_(ClientId, SubPid, State));
+
+handle_call({subscribe, SubPid, Topic}, _From, State) ->
+    do_subscribe_(SubPid, Topic),
+    ok(monitor_subscriber_(undefined, SubPid, State));
+
+handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) ->
+    OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos},
+    NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos},
+    mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]),
+    set_subscription_stats(), ok(State);
+
+handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, From, State) ->
+    del_subscription_(ClientId, Topic, Qos),
+    set_subscription_stats(),
+    handle_call({unsubscribe, SubPid, Topic}, From, State);
+
+handle_call({unsubscribe, SubPid, Topic}, _From, State) ->
+    emqttd_pubsub:unsubscribe(Topic, SubPid),
+    ets:delete_object(subscribed, {SubPid, Topic}),
+    ok(State);
+
+handle_call(Req, _From, State) ->
+    ?UNEXPECTED_REQ(Req, State).
+
+handle_cast(Msg, State) ->
+    ?UNEXPECTED_MSG(Msg, State).
+
+handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors = Monitors}) ->
+    %% unsubscribe
+    lists:foreach(fun({_, Topic}) ->
+                emqttd_pubsub:async_unsubscribe(Topic, DownPid)
+        end, ets:lookup(subscribed, DownPid)),
+    ets:delete(subscribed, DownPid),
+
+    %% clean subscriptions
+    case dict:find(DownPid, Monitors) of
+        {ok, {undefined, _}} -> ok;
+        {ok, {ClientId,  _}} -> mnesia:dirty_delete(subscription, ClientId);
+        error                -> ok
+    end,
+    {noreply, State#state{monitors = dict:erase(DownPid, Monitors)}};
+
+handle_info(Info, State) ->
+    ?UNEXPECTED_INFO(Info, State).
+
+terminate(_Reason, #state{pool = Pool, id = Id}) ->
+    ?GPROC_POOL(leave, Pool, Id).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+
+%% @private
+%% @doc Add a subscription.
+-spec add_subscription_(binary(), binary(), mqtt_qos()) -> ok.
+add_subscription_(ClientId, Topic, Qos) ->
+    add_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}).
+
+-spec add_subscription_(mqtt_subscription()) -> ok.
+add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
+    mnesia:dirty_write(subscription, Subscription).
+
+update_subscription_(OldSub, NewSub) ->
+    mnesia:delete_object(subscription, OldSub, write),
+    mnesia:write(subscription, NewSub, write).
+
+%% @private
+%% @doc Delete a subscription
+-spec del_subscription_(binary(), binary(), mqtt_qos()) -> ok.
+del_subscription_(ClientId, Topic, Qos) ->
+    del_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}).
+
+del_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
+    mnesia:dirty_delete_object(subscription, Subscription).
+
+%% @private
+%% @doc Call pubsub to subscribe
+do_subscribe_(SubPid, Topic) ->
+    case ets:match(subscribed, {SubPid, Topic}) of
+        [] ->
+            emqttd_pubsub:subscribe(Topic, SubPid),
+            ets:insert(subscribed, {SubPid, Topic});
+        [_] ->
+            false
+    end.
+
+monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) ->
+    case dict:find(SubPid, Monitors) of
+        {ok, _} ->
+            State;
+        error ->
+            MRef = erlang:monitor(process, SubPid),
+            State#state{monitors = dict:store(SubPid, {ClientId, MRef}, Monitors)}
+    end.
+
+%%--------------------------------------------------------------------
+%% Trace Functions
+%%--------------------------------------------------------------------
+
+trace(publish, From, _Msg) when is_atom(From) ->
+    %% Dont' trace '$SYS' publish
+    ignore;
+
+trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
+    lager:info([{client, From}, {topic, Topic}],
+               "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
+
+%%--------------------------------------------------------------------
+%% Subscription Statistics
+%%--------------------------------------------------------------------
+
+set_subscription_stats() ->
+    emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', mnesia:table_info(subscription, size)).
+
+%%--------------------------------------------------------------------
+
+ok(State) -> {reply, ok, State}.
+

+ 1 - 3
src/lager_emqtt_backend.erl

@@ -77,9 +77,7 @@ publish_log(Message, State = #state{formatter = Formatter,
                                     format_config = FormatConfig}) ->
     Severity = lager_msg:severity(Message),
     Payload = Formatter:format(Message, FormatConfig),
-    emqttd_pubsub:publish(
-      emqttd_message:make(
-        log, topic(Severity), iolist_to_binary(Payload))),
+    emqttd:publish(emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload))),
     {ok, State}.
 
 topic(Severity) ->