Просмотр исходного кода

0.17.0 - Improve the design of Hook, PubSub and Router

Feng 10 лет назад
Родитель
Сommit
d9d7581013

+ 55 - 14
src/emqttd.erl

@@ -22,9 +22,13 @@
 
 -export([start/0, env/1, env/2, is_running/1]).
 
--export([create/2, publish/1, subscribe/1, subscribe/3,
+%% PubSub API
+-export([create/2, lookup/2, publish/1, subscribe/1, subscribe/3,
          unsubscribe/1, unsubscribe/3]).
 
+%% Hooks API
+-export([hook/4, hook/3, unhook/2, run_hooks/3]).
+
 -define(APP, ?MODULE).
 
 %%--------------------------------------------------------------------
@@ -32,19 +36,19 @@
 %%--------------------------------------------------------------------
 
 %% @doc Start emqttd application.
--spec start() -> ok | {error, any()}.
+-spec(start() -> ok | {error, any()}).
 start() -> application:start(?APP).
 
 %% @doc Group environment
--spec env(Group :: atom()) -> list().
+-spec(env(Group :: atom()) -> list()).
 env(Group) -> application:get_env(?APP, Group, []).
 
 %% @doc Get environment
--spec env(Group :: atom(), Name :: atom()) -> undefined | any().
+-spec(env(Group :: atom(), Name :: atom()) -> undefined | any()).
 env(Group, Name) -> proplists:get_value(Name, env(Group)).
 
 %% @doc Is running?
--spec is_running(node()) -> boolean().
+-spec(is_running(node()) -> boolean()).
 is_running(Node) ->
     case rpc:call(Node, erlang, whereis, [?APP]) of
         {badrpc, _}          -> false;
@@ -56,30 +60,67 @@ is_running(Node) ->
 %% PubSub APIs that wrap emqttd_server, emqttd_pubsub
 %%--------------------------------------------------------------------
 
-%% @doc Create a Topic
+%% @doc Lookup Topic or Subscription
+-spec(lookup(topic, binary()) -> [mqtt_topic()];
+            (subscription, binary()) -> [mqtt_subscription()]).
+lookup(topic, Topic) when is_binary(Topic) ->
+    emqttd_pubsub:lookup_topic(Topic);
+
+lookup(subscription, ClientId) when is_binary(ClientId) ->
+    emqttd_server:lookup_subscription(ClientId).
+
+%% @doc Create a Topic or Subscription
+-spec(create(topic | subscription, binary()) -> ok | {error, any()}).
 create(topic, Topic) when is_binary(Topic) ->
-    emqttd_pubsub:create_topic(Topic).
+    emqttd_pubsub:create_topic(Topic);
+
+create(subscription, {ClientId, Topic, Qos}) ->
+    Subscription = #mqtt_subscription{subid = ClientId, topic = Topic, qos = ?QOS_I(Qos)},
+    emqttd_backend:add_subscription(Subscription).
 
 %% @doc Publish MQTT Message
--spec publish(mqtt_message()) -> ok.
+-spec(publish(mqtt_message()) -> ok).
 publish(Msg) when is_record(Msg, mqtt_message) ->
-    emqttd_server:publish(Msg).
+    emqttd_server:publish(Msg), ok.
 
 %% @doc Subscribe
--spec subscribe(binary()) -> ok.
+-spec(subscribe(binary()) -> ok;
+               ({binary(), binary(), mqtt_qos()}) -> ok).
 subscribe(Topic) when is_binary(Topic) ->
-    emqttd_server:subscribe(Topic).
+    emqttd_server:subscribe(Topic);
+subscribe({ClientId, Topic, Qos}) ->
+    subscribe(ClientId, Topic, Qos).
 
--spec subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}.
+-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}).
 subscribe(ClientId, Topic, Qos) ->
     emqttd_server:subscribe(ClientId, Topic, Qos).
 
 %% @doc Unsubscribe
--spec unsubscribe(binary()) -> ok.
+-spec(unsubscribe(binary()) -> ok).
 unsubscribe(Topic) when is_binary(Topic) ->
     emqttd_server:unsubscribe(Topic).
 
--spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok.
+-spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok).
 unsubscribe(ClientId, Topic, Qos) ->
     emqttd_server:unsubscribe(ClientId, Topic, Qos).
 
+%%--------------------------------------------------------------------
+%% Hooks API
+%%--------------------------------------------------------------------
+
+-spec(hook(atom(), function(), list(any())) -> ok | {error, any()}).
+hook(Hook, Function, InitArgs) ->
+    emqttd_hook:add(Hook, Function, InitArgs).
+
+-spec(hook(atom(), function(), list(any()), integer()) -> ok | {error, any()}).
+hook(Hook, Function, InitArgs, Priority) ->
+    emqttd_hook:add(Hook, Function, InitArgs, Priority).
+
+-spec(unhook(atom(), function()) -> ok | {error, any()}).
+unhook(Hook, Function) ->
+    emqttd_hook:delete(Hook, Function).
+
+-spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}).
+run_hooks(Hook, Args, Acc) ->
+    emqttd_hook:run(Hook, Args, Acc).
+

+ 1 - 0
src/emqttd_app.erl

@@ -79,6 +79,7 @@ print_vsn() ->
 
 start_servers(Sup) ->
     Servers = [{"emqttd ctl", emqttd_ctl},
+               {"emqttd hook", emqttd_hook},
                {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
                {"emqttd stats", emqttd_stats},
                {"emqttd metrics", emqttd_metrics},

+ 0 - 65
src/emqttd_broker.erl

@@ -28,9 +28,6 @@
 %% Event API
 -export([subscribe/1, notify/2]).
 
-%% Hook API
--export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]).
-
 %% Broker API
 -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
 
@@ -100,40 +97,6 @@ datetime() ->
         io_lib:format(
             "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
 
-%% @doc Hook
--spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
-hook(Hook, Name, MFA) ->
-    gen_server:call(?SERVER, {hook, Hook, Name, MFA}).
-
-%% @doc Unhook
--spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
-unhook(Hook, Name) ->
-    gen_server:call(?SERVER, {unhook, Hook, Name}).
-
-%% @doc Foreach hooks
--spec foreach_hooks(Hook :: atom(), Args :: list()) -> any().
-foreach_hooks(Hook, Args) ->
-    case ets:lookup(?BROKER_TAB, {hook, Hook}) of
-        [{_, Hooks}] ->
-            lists:foreach(fun({_Name, {M, F, A}}) ->
-                    apply(M, F, Args++A)
-                end, Hooks);
-        [] ->
-            ok
-    end.
-
-%% @doc Foldl hooks
--spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any().
-foldl_hooks(Hook, Args, Acc0) ->
-    case ets:lookup(?BROKER_TAB, {hook, Hook}) of
-        [{_, Hooks}] -> 
-            lists:foldl(fun({_Name, {M, F, A}}, Acc) -> 
-                    apply(M, F, lists:append([Args, [Acc], A]))
-                end, Acc0, Hooks);
-        [] -> 
-            Acc0
-    end.
-
 %% @doc Start a tick timer
 start_tick(Msg) ->
     start_tick(timer:seconds(env(sys_interval)), Msg).
@@ -167,31 +130,6 @@ init([]) ->
 handle_call(uptime, _From, State) ->
     {reply, uptime(State), State};
 
-handle_call({hook, Hook, Name, MFArgs}, _From, State) ->
-    Key = {hook, Hook}, Reply =
-    case ets:lookup(?BROKER_TAB, Key) of
-        [{Key, Hooks}] ->
-            case lists:keyfind(Name, 1, Hooks) of
-                {Name, _MFArgs} ->
-                    {error, existed};
-                false ->
-                    insert_hooks(Key, Hooks ++ [{Name, MFArgs}])
-            end;
-        [] ->
-            insert_hooks(Key, [{Name, MFArgs}])
-    end,
-    {reply, Reply, State};
-
-handle_call({unhook, Hook, Name}, _From, State) ->
-    Key = {hook, Hook}, Reply =
-    case ets:lookup(?BROKER_TAB, Key) of
-        [{Key, Hooks}] ->
-            insert_hooks(Key, lists:keydelete(Name, 1, Hooks));
-        [] ->
-            {error, not_found}
-    end,
-    {reply, Reply, State};
-
 handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 
@@ -224,9 +162,6 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-insert_hooks(Key, Hooks) ->
-    ets:insert(?BROKER_TAB, {Key, Hooks}), ok.
-
 create_topic(Topic) ->
     emqttd:create(topic, emqttd_topic:systop(Topic)).
 

+ 54 - 56
src/emqttd_cli.erl

@@ -40,7 +40,7 @@
                         stack_size,
                         reductions]).
 
--define(MAX_LINES, 20000).
+-define(MAX_LINES, 10000).
 
 -define(APP, emqttd).
 
@@ -148,7 +148,7 @@ users(Args) -> emqttd_auth_username:cli(Args).
 %%--------------------------------------------------------------------
 %% @doc Query clients
 clients(["list"]) ->
-    dump(ets, mqtt_client, fun print/1);
+    dump(mqtt_client);
 
 clients(["show", ClientId]) ->
     if_client(ClientId, fun print/1);
@@ -173,10 +173,10 @@ sessions(["list"]) ->
     [sessions(["list", Type]) || Type <- ["persistent", "transient"]];
 
 sessions(["list", "persistent"]) ->
-    dump(ets, mqtt_persistent_session, fun print/1);
+    dump(mqtt_persistent_session);
 
 sessions(["list", "transient"]) ->
-    dump(ets, mqtt_transient_session,  fun print/1);
+    dump(mqtt_transient_session);
 
 sessions(["show", ClientId]) ->
     MP = {{bin(ClientId), '_'}, '_'},
@@ -199,11 +199,10 @@ sessions(_) ->
 %%--------------------------------------------------------------------
 %% @doc Routes Command
 routes(["list"]) ->
-    Print = fun(Topic, Records) -> print(route, Topic, Records) end,
-    if_could_print(route, Print);
+    if_could_print(route, fun print/1);
 
 routes(["show", Topic]) ->
-    print(route, Topic, mnesia:dirty_read(route, bin(Topic)));
+    print(mnesia:dirty_read(route, bin(Topic)));
 
 routes(_) ->
     ?USAGE([{"routes list",         "List all routes"},
@@ -212,28 +211,25 @@ routes(_) ->
 %%--------------------------------------------------------------------
 %% @doc Topics Command
 topics(["list"]) ->
-    Print = fun(Topic, Records) -> print(topic, Topic, Records) end,
-    if_could_print(topic, Print);
+    if_could_print(topic, fun print/1);
 
 topics(["show", Topic]) ->
-    print(topic, Topic, ets:lookup(topic, bin(Topic)));
+    print(mnesia:dirty_read(topic, bin(Topic)));
 
 topics(_) ->
     ?USAGE([{"topics list",         "List all topics"},
             {"topics show <Topic>", "Show a topic"}]).
 
 subscriptions(["list"]) ->
-    Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
-    if_could_print(subscription, Print);
+    if_could_print(subscription, fun print/1);
 
 subscriptions(["list", "static"]) ->
-    Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
-    if_could_print(backend_subscription, Print);
+    if_could_print(backend_subscription, fun print/1);
 
 subscriptions(["show", ClientId]) ->
     case mnesia:dirty_read(subscription, bin(ClientId)) of
         []      -> ?PRINT_MSG("Not Found.~n");
-        Records -> print(subscription, ClientId, Records)
+        Records -> print(Records)
     end;
 
 subscriptions(["add", ClientId, Topic, QoS]) ->
@@ -242,11 +238,11 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
                                               topic = bin(Topic),
                                               qos   = IntQos},
             case emqttd_backend:add_subscription(Subscription) of
-                {atomic, ok} ->
+                ok ->
                     ?PRINT_MSG("ok~n");
-                {aborted, {error, existed}} ->
+                {error, already_existed} ->
                     ?PRINT_MSG("Error: already existed~n");
-                {aborted, Reason} ->
+                {error, Reason} ->
                     ?PRINT("Error: ~p~n", [Reason])
             end
           end,
@@ -274,7 +270,7 @@ if_could_print(Tab, Fun) ->
             ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]);
         _Size ->
             Keys = mnesia:dirty_all_keys(Tab),
-            foreach(fun(Key) -> Fun(Key, ets:lookup(Tab, Key)) end, Keys)
+            foreach(fun(Key) -> Fun(ets:lookup(Tab, Key)) end, Keys)
     end.
 
 if_valid_qos(QoS, Fun) ->
@@ -465,23 +461,53 @@ listeners([]) ->
 listeners(_) ->
     ?PRINT_CMD("listeners", "List listeners").
 
+%%--------------------------------------------------------------------
+%% Dump ETS
+%%--------------------------------------------------------------------
+
+dump(Table) ->
+    dump(Table, ets:first(Table)).
+
+dump(_Table, '$end_of_table') ->
+    ok;
+
+dump(Table, Key) ->
+    case ets:lookup(Table, Key) of
+        [Record] -> print(Record);
+        [] -> ok
+    end,
+    dump(Table, ets:next(Table, Key)).
+
+print([]) ->
+    ok;
+
+print(Routes = [#mqtt_route{topic = Topic} | _]) ->
+    Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes],
+    ?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]);
+
+print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
+    TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
+                  || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
+    ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);
+
+print(Topics = [#mqtt_topic{}|_]) ->
+    foreach(fun print/1, Topics);
+
 print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
     ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
-               [Name, Ver, Descr, Active]);
+           [Name, Ver, Descr, Active]);
 
-print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
-                   username = Username, peername = Peername,
-                   connected_at = ConnectedAt}) ->
+print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username,
+                   peername = Peername, connected_at = ConnectedAt}) ->
     ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n",
-            [ClientId, CleanSess, Username,
-             emqttd_net:format(Peername),
-             emqttd_time:now_to_secs(ConnectedAt)]);
+           [ClientId, CleanSess, Username, emqttd_net:format(Peername),
+            emqttd_time:now_to_secs(ConnectedAt)]);
 
 print(#mqtt_topic{topic = Topic, flags = Flags}) ->
-    ?PRINT("~s: ~p~n", [Topic, Flags]);
+    ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);
 
 print(#mqtt_route{topic = Topic, node = Node}) ->
-    ?PRINT("~s: ~s~n", [Topic, Node]);
+    ?PRINT("~s -> ~s~n", [Topic, Node]);
 
 print({{ClientId, _ClientPid}, SessInfo}) ->
     InfoKeys = [clean_sess, 
@@ -499,39 +525,11 @@ print({{ClientId, _ClientPid}, SessInfo}) ->
            "created_at=~w)~n",
             [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
 
-print(route, Topic, Routes) ->
-    Nodes = [Node || #mqtt_route{node = Node} <- Routes],
-    ?PRINT("~s: ~p~n", [Topic, Nodes]);
-
-print(topic, _Topic, Records) ->
-    [print(R) || R <- Records];
-
-print(subscription, ClientId, Subscriptions) ->
-    TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
-    ?PRINT("~s: ~p~n", [ClientId, TopicTable]).
-
 format(created_at, Val) ->
     emqttd_time:now_to_secs(Val);
 
-format(subscriptions, List) ->
-    string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ",");
-
 format(_, Val) ->
     Val.
 
 bin(S) -> iolist_to_binary(S).
 
-%%TODO: ...
-dump(ets, Table, Fun) ->
-    dump(ets, Table, ets:first(Table), Fun).
-
-dump(ets, _Table, '$end_of_table', _Fun) ->
-    ok;
-
-dump(ets, Table, Key, Fun) ->
-    case ets:lookup(Table, Key) of
-        [Record] -> Fun(Record);
-        [] -> ignore
-    end,
-    dump(ets, Table, ets:next(Table, Key), Fun).
-

+ 155 - 0
src/emqttd_hook.erl

@@ -0,0 +1,155 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_hook).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-behaviour(gen_server).
+
+%% Start
+-export([start_link/0]).
+
+%% Hooks API
+-export([add/3, add/4, delete/2, run/3, lookup/1]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {}).
+
+-record(callback, {function       :: function(),
+                   init_args = [] :: list(any()),
+                   priority  = 0  :: integer()}).
+
+-record(hook, {name :: atom(), callbacks = [] :: list(#callback{})}).
+
+-define(HOOK_TAB, mqtt_hook).
+
+%%--------------------------------------------------------------------
+%% Start API
+%%--------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%--------------------------------------------------------------------
+%% Hooks API
+%%--------------------------------------------------------------------
+
+-spec(add(atom(), function(), list(any())) -> ok).
+add(HookPoint, Function, InitArgs) ->
+    add(HookPoint, Function, InitArgs, 0).
+
+-spec(add(atom(), function(), list(any()), integer()) -> ok).
+add(HookPoint, Function, InitArgs, Priority) ->
+    gen_server:call(?MODULE, {add, HookPoint, Function, InitArgs, Priority}).
+
+-spec(delete(atom(), function()) -> ok).
+delete(HookPoint, Function) ->
+    gen_server:call(?MODULE, {delete, HookPoint, Function}).
+
+-spec(run(atom(), list(any()), any()) -> any()).
+run(HookPoint, Args, Acc) ->
+    run_(lookup(HookPoint), Args, Acc).
+
+%% @private
+run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) ->
+    case apply(Fun, lists:append([Args, [Acc], InitArgs])) of
+        ok             -> run_(Callbacks, Args, Acc);
+        {ok, NewAcc}   -> run_(Callbacks, Args, NewAcc);
+        stop           -> {stop, Acc};
+        {stop, NewAcc} -> {stop, NewAcc}
+    end;
+
+run_([], _Args, Acc) ->
+    {ok, Acc}.
+
+-spec(lookup(atom()) -> [#callback{}]).
+lookup(HookPoint) ->
+    case ets:lookup(?HOOK_TAB, HookPoint) of
+        [] -> [];
+        [#hook{callbacks = Callbacks}] -> Callbacks
+    end.
+
+%%--------------------------------------------------------------------
+%% gen_server Callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    ets:new(?HOOK_TAB, [set, protected, named_table, {keypos, #hook.name}]),
+    {ok, #state{}}.
+
+handle_call({add, HookPoint, Function, InitArgs, Priority}, _From, State) ->
+    Reply =
+    case ets:lookup(?HOOK_TAB, HookPoint) of
+        [#hook{callbacks = Callbacks}] ->
+            case lists:keyfind(Function, #callback.function, Callbacks) of
+                false ->
+                    Callback = #callback{function  = Function,
+                                         init_args = InitArgs,
+                                         priority  = Priority},
+                    insert_hook_(HookPoint, add_callback_(Callback, Callbacks));
+                _Callback ->
+                    {error, already_hooked}
+            end;
+        [] ->
+            Callback = #callback{function  = Function,
+                                 init_args = InitArgs,
+                                 priority  = Priority},
+            insert_hook_(HookPoint, [Callback])
+    end,
+    {reply, Reply, State};
+
+handle_call({delete, HookPoint, Function}, _From, State) ->
+    Reply =
+    case ets:lookup(?HOOK_TAB, HookPoint) of
+        [#hook{callbacks = Callbacks}] ->
+            insert_hook_(HookPoint, del_callback_(Function, Callbacks));
+        [] ->
+            {error, not_found}
+    end,
+    {reply, Reply, State};
+
+handle_call(_Req, _From, State) ->
+    {reply, ignore, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+insert_hook_(HookPoint, Callbacks) ->
+    ets:insert(?HOOK_TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok.
+
+add_callback_(Callback, Callbacks) ->
+    lists:keymerge(#callback.priority, Callbacks, [Callback]).
+
+del_callback_(Function, Callbacks) ->
+    lists:keydelete(Function, #callback.function, Callbacks).
+

+ 25 - 31
src/emqttd_mod_presence.erl

@@ -23,57 +23,51 @@
 
 -export([load/1, unload/1]).
 
--export([client_connected/3, client_disconnected/3]).
+-export([on_client_connected/3, on_client_disconnected/3]).
 
 load(Opts) ->
-    emqttd_broker:hook('client.connected', {?MODULE, client_connected},
-                        {?MODULE, client_connected, [Opts]}),
-    emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected},
-                        {?MODULE, client_disconnected, [Opts]}),
-    ok.
+    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Opts]),
+    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Opts]).
 
-client_connected(ConnAck, #mqtt_client{client_id  = ClientId,
-                                       username   = Username,
-                                       peername   = {IpAddress, _},
-                                       clean_sess = CleanSess,
-                                       proto_ver  = ProtoVer}, Opts) ->
-    Sess = case CleanSess of
-        true -> false;
-        false -> true
-    end,
+on_client_connected(ConnAck, Client = #mqtt_client{client_id  = ClientId,
+                                                   username   = Username,
+                                                   peername   = {IpAddr, _},
+                                                   clean_sess = CleanSess,
+                                                   proto_ver  = ProtoVer}, Opts) ->
     Json = mochijson2:encode([{clientid, ClientId},
                               {username, Username},
-                              {ipaddress, list_to_binary(emqttd_net:ntoa(IpAddress))},
-                              {session, Sess},
+                              {ipaddress, list_to_binary(emqttd_net:ntoa(IpAddr))},
+                              {session, sess(CleanSess)},
                               {protocol, ProtoVer},
                               {connack, ConnAck},
                               {ts, emqttd_time:now_to_secs()}]),
-    Msg = emqttd_message:make(presence,
-                              proplists:get_value(qos, Opts, 0),
-                              topic(connected, ClientId),
-                              iolist_to_binary(Json)),
-    emqttd:publish(Msg).
+    emqttd:publish(message(qos(Opts), topic(connected, ClientId), Json)),
+    {ok, Client}.
 
-client_disconnected(Reason, ClientId, Opts) ->
+on_client_disconnected(Reason, ClientId, Opts) ->
     Json = mochijson2:encode([{clientid, ClientId},
                               {reason, reason(Reason)},
                               {ts, emqttd_time:now_to_secs()}]),
-    Msg = emqttd_message:make(presence,
-                              proplists:get_value(qos, Opts, 0),
-                              topic(disconnected, ClientId),
-                              iolist_to_binary(Json)),
-    emqttd:publish(Msg).
+    emqttd:publish(message(qos(Opts), topic(disconnected, ClientId), Json)).
 
 unload(_Opts) ->
-    emqttd_broker:unhook('client.connected', {?MODULE, client_connected}),
-    emqttd_broker:unhook('client.disconnected', {?MODULE, client_disconnected}).
+    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
+    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3).
+
+sess(false) -> true;
+sess(true)  -> false.
+
+qos(Opts) -> proplists:get_value(qos, Opts, 0).
+
+message(Qos, Topic, Json) ->
+    emqttd_message:make(presence, Qos, Topic, iolist_to_binary(Json)).
 
 topic(connected, ClientId) ->
     emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
 topic(disconnected, ClientId) ->
     emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])).
 
-reason(Reason) when is_atom(Reason)    -> Reason;
+reason(Reason) when is_atom(Reason) -> Reason;
 reason({Error, _}) when is_atom(Error) -> Error;
 reason(_) -> internal_error.
 

+ 19 - 23
src/emqttd_mod_rewrite.erl

@@ -23,7 +23,7 @@
 
 -export([load/1, reload/1, unload/1]).
 
--export([rewrite/3, rewrite/4]).
+-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]).
 
 %%--------------------------------------------------------------------
 %% API
@@ -33,23 +33,19 @@ load(Opts) ->
     File = proplists:get_value(file, Opts),
     {ok, Terms} = file:consult(File),
     Sections = compile(Terms),
-    emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, 
-                        {?MODULE, rewrite, [subscribe, Sections]}),
-    emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe},
-                        {?MODULE, rewrite, [unsubscribe, Sections]}),
-    emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish},
-                        {?MODULE, rewrite, [publish, Sections]}),
-    ok.
-
-rewrite(_ClientId, TopicTable, subscribe, Sections) ->
-    lager:info("rewrite subscribe: ~p", [TopicTable]),
-    [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
-
-rewrite(_ClientId, Topics, unsubscribe, Sections) ->
-    lager:info("rewrite unsubscribe: ~p", [Topics]),
-    [match_topic(Topic, Sections) || Topic <- Topics].
-
-rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) ->
+    emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]),
+    emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]),
+    emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]).
+
+rewrite_subscribe(_ClientId, TopicTable, Sections) ->
+    lager:info("Rewrite subscribe: ~p", [TopicTable]),
+    {ok, [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]}.
+
+rewrite_unsubscribe(_ClientId, Topics, Sections) ->
+    lager:info("Rewrite unsubscribe: ~p", [Topics]),
+    {ok, [match_topic(Topic, Sections) || Topic <- Topics]}.
+
+rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
     %%TODO: this will not work if the client is always online.
     RewriteTopic =
     case get({rewrite, Topic}) of
@@ -59,11 +55,11 @@ rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) ->
         DestTopic ->
             DestTopic
         end,
-    Message#mqtt_message{topic = RewriteTopic}.
+    {ok, Message#mqtt_message{topic = RewriteTopic}}.
 
 reload(File) ->
     %%TODO: The unload api is not right...
-    case emqttd:is_mod_enabled(rewrite) of
+    case emqttd_app:is_mod_enabled(rewrite) of
         true -> 
             unload(state),
             load([{file, File}]);
@@ -72,9 +68,9 @@ reload(File) ->
     end.
             
 unload(_) ->
-    emqttd_broker:unhook('client.subscribe',  {?MODULE, rewrite_subscribe}),
-    emqttd_broker:unhook('client.unsubscribe',{?MODULE, rewrite_unsubscribe}),
-    emqttd_broker:unhook('message.publish',   {?MODULE, rewrite_publish}).
+    emqttd:unhook('client.subscribe',  fun ?MODULE:rewrite_subscribe/3),
+    emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3),
+    emqttd:unhook('message.publish',   fun ?MODULE:rewrite_publish/2).
 
 %%--------------------------------------------------------------------
 %% Internal functions

+ 12 - 11
src/emqttd_mod_subscription.erl

@@ -23,26 +23,27 @@
 
 -include("emqttd_protocol.hrl").
 
--export([load/1, client_connected/3, unload/1]).
+-export([load/1, on_client_connected/3, unload/1]).
 
 -record(state, {topics, backend = false}).
 
 load(Opts) ->
     Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)],
     State = #state{topics = Topics, backend = lists:member(backend, Opts)},
-    emqttd_broker:hook('client.connected', {?MODULE, client_connected},
-                       {?MODULE, client_connected, [State]}),
-    ok.
+    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [State]).
+
+on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id  = ClientId,
+                                                           client_pid = ClientPid,
+                                                           username   = Username},
+                    #state{topics = Topics, backend = Backend}) ->
 
-client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id  = ClientId,
-                                               client_pid = ClientPid,
-                                               username   = Username},
-                 #state{topics = Topics, backend = Backend}) ->
     Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end,
     TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- with_backend(Backend, ClientId, Topics)],
-    emqttd_client:subscribe(ClientPid, TopicTable);
+    emqttd_client:subscribe(ClientPid, TopicTable),
+    {ok, Client};
 
-client_connected(_ConnAck, _Client, _State) -> ok.
+on_client_connected(_ConnAck, _Client, _State) ->
+    ok.
 
 with_backend(false, _ClientId, TopicTable) ->
     TopicTable;
@@ -51,7 +52,7 @@ with_backend(true, ClientId, TopicTable) ->
     emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_backend:lookup_subscriptions(ClientId)], TopicTable).
 
 unload(_Opts) ->
-    emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).
+    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3).
 
 rep(<<"$c">>, ClientId, Topic) ->
     emqttd_topic:feed_var(<<"$c">>, ClientId, Topic);

+ 5 - 3
src/emqttd_protocol.erl

@@ -165,7 +165,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
             {ReturnCode, false, State1}
     end,
     %% Run hooks
-    emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
+    emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)),
     %% Send connack
     send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
 
@@ -247,7 +247,9 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
     end.
 
 -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
-send(Msg, State) when is_record(Msg, mqtt_message) ->
+send(Msg, State = #proto_state{client_id = ClientId})
+        when is_record(Msg, mqtt_message) ->
+    emqttd:run_hooks('message.delivered', [ClientId], Msg),
     send(emqttd_message:to_packet(Msg), State);
 
 send(Packet, State = #proto_state{sendfun = SendFun})
@@ -281,7 +283,7 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) ->
 shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
     ?LOG(info, "Shutdown for ~p", [Error], State),
     send_willmsg(ClientId, WillMsg),
-    emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]),
+    emqttd:run_hooks('client.disconnected', [Error], ClientId),
     %% let it down
     %% emqttd_cm:unregister(ClientId).
     ok.

+ 12 - 12
src/emqttd_pubsub.erl

@@ -60,15 +60,15 @@ mnesia(copy) ->
 %%--------------------------------------------------------------------
 
 %% @doc Start one pubsub
--spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
-    Pool :: atom(),
-    Id   :: pos_integer(),
-    Env  :: list(tuple()).
+-spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
+      Pool :: atom(),
+      Id   :: pos_integer(),
+      Env  :: list(tuple())).
 start_link(Pool, Id, Env) ->
     gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
 
 %% @doc Create a Topic.
--spec create_topic(binary()) -> ok | {error, any()}.
+-spec(create_topic(binary()) -> ok | {error, any()}).
 create_topic(Topic) when is_binary(Topic) ->
     case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of
         {atomic, ok}     -> ok;
@@ -76,7 +76,7 @@ create_topic(Topic) when is_binary(Topic) ->
     end.
 
 %% @doc Lookup a Topic.
--spec lookup_topic(binary()) -> list(mqtt_topic()).
+-spec(lookup_topic(binary()) -> list(mqtt_topic())).
 lookup_topic(Topic) when is_binary(Topic) ->
     mnesia:dirty_read(topic, Topic).
 
@@ -85,17 +85,17 @@ lookup_topic(Topic) when is_binary(Topic) ->
 %%--------------------------------------------------------------------
 
 %% @doc Subscribe a Topic
--spec subscribe(binary(), pid()) -> ok.
+-spec(subscribe(binary(), pid()) -> ok).
 subscribe(Topic, SubPid) when is_binary(Topic) ->
     call(pick(Topic), {subscribe, Topic, SubPid}).
 
 %% @doc Asynchronous Subscribe
--spec async_subscribe(binary(), pid()) -> ok.
+-spec(async_subscribe(binary(), pid()) -> ok).
 async_subscribe(Topic, SubPid) when is_binary(Topic) ->
     cast(pick(Topic), {subscribe, Topic, SubPid}).
 
 %% @doc Publish message to Topic.
--spec publish(binary(), any()) -> ok.
+-spec(publish(binary(), any()) -> any()).
 publish(Topic, Msg) ->
     lists:foreach(
         fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
@@ -105,7 +105,7 @@ publish(Topic, Msg) ->
         end, emqttd_router:lookup(Topic)).
 
 %% @doc Dispatch Message to Subscribers
--spec dispatch(binary(), mqtt_message()) -> ok.
+-spec(dispatch(binary(), mqtt_message()) -> ok).
 dispatch(Queue = <<"$queue/", _T>>, Msg) ->
     case subscribers(Queue) of
         [] ->
@@ -148,12 +148,12 @@ dropped(_Topic) ->
     emqttd_metrics:inc('messages/dropped').
 
 %% @doc Unsubscribe
--spec unsubscribe(binary(), pid()) -> ok.
+-spec(unsubscribe(binary(), pid()) -> ok).
 unsubscribe(Topic, SubPid) when is_binary(Topic) ->
     call(pick(Topic), {unsubscribe, Topic, SubPid}).
 
 %% @doc Asynchronous Unsubscribe
--spec async_unsubscribe(binary(), pid()) -> ok.
+-spec(async_unsubscribe(binary(), pid()) -> ok).
 async_unsubscribe(Topic, SubPid)  when is_binary(Topic) ->
     cast(pick(Topic), {unsubscribe, Topic, SubPid}).
 

+ 8 - 8
src/emqttd_router.erl

@@ -63,31 +63,31 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 %% @doc Lookup Routes.
--spec lookup(Topic:: binary()) -> [mqtt_route()].
+-spec(lookup(Topic:: binary()) -> [mqtt_route()]).
 lookup(Topic) when is_binary(Topic) ->
     Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
     %% Optimize: route table will be replicated to all nodes.
     lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]).
 
 %% @doc Print Routes.
--spec print(Topic :: binary()) -> [ok].
+-spec(print(Topic :: binary()) -> [ok]).
 print(Topic) ->
     [io:format("~s -> ~s~n", [To, Node]) ||
         #mqtt_route{topic = To, node = Node} <- lookup(Topic)].
 
 %% @doc Add Route
--spec add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}.
+-spec(add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}).
 add_route(Topic) when is_binary(Topic) ->
     add_route(#mqtt_route{topic = Topic, node = node()});
 add_route(Route) when is_record(Route, mqtt_route) ->
     add_routes([Route]).
 
--spec add_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}.
+-spec(add_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}).
 add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
     add_route(#mqtt_route{topic = Topic, node = Node}).
 
 %% @doc Add Routes
--spec add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}.
+-spec(add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}).
 add_routes(Routes) ->
     Add = fun() -> [add_route_(Route) || Route <- Routes] end,
     case mnesia:transaction(Add) of
@@ -112,18 +112,18 @@ add_route_(Route = #mqtt_route{topic = Topic}) ->
     end.
 
 %% @doc Delete Route
--spec del_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}.
+-spec(del_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}).
 del_route(Topic) when is_binary(Topic) ->
     del_route(#mqtt_route{topic = Topic, node = node()});
 del_route(Route) when is_record(Route, mqtt_route) ->
     del_routes([Route]).
 
--spec del_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}.
+-spec(del_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}).
 del_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
     del_route(#mqtt_route{topic = Topic, node = Node}).
 
 %% @doc Delete Routes
--spec del_routes([mqtt_route()]) -> ok | {error, any()}.
+-spec(del_routes([mqtt_route()]) -> ok | {error, any()}).
 del_routes(Routes) ->
     Del = fun() -> [del_route_(Route) || Route <- Routes] end,
     case mnesia:transaction(Del) of

+ 32 - 23
src/emqttd_server.erl

@@ -35,7 +35,7 @@
 
 %% PubSub API
 -export([subscribe/1, subscribe/3, publish/1, unsubscribe/1, unsubscribe/3,
-         update_subscription/4]).
+         lookup_subscription/1, update_subscription/4]).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -63,10 +63,10 @@ mnesia(copy) ->
 %%--------------------------------------------------------------------
 
 %% @doc Start a Server
--spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
-    Pool :: atom(),
-    Id   :: pos_integer(),
-    Env  :: list(tuple()).
+-spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
+      Pool :: atom(),
+      Id   :: pos_integer(),
+      Env  :: list(tuple())).
 start_link(Pool, Id, Env) ->
     gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
 
@@ -75,40 +75,48 @@ start_link(Pool, Id, Env) ->
 %%--------------------------------------------------------------------
 
 %% @doc Subscribe a Topic
--spec subscribe(binary()) -> ok.
+-spec(subscribe(binary()) -> ok).
 subscribe(Topic) when is_binary(Topic) ->
     From = self(), call(server(From), {subscribe, From, Topic}).
 
 %% @doc Subscribe from a MQTT session.
--spec subscribe(binary(), binary(), mqtt_qos()) -> ok.
+-spec(subscribe(binary(), binary(), mqtt_qos()) -> ok).
 subscribe(ClientId, Topic, Qos) ->
     From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
 
+%% @doc Lookup subscriptions.
+-spec(lookup_subscription(binary()) -> [#mqtt_subscription{}]).
+lookup_subscription(ClientId) ->
+    mnesia:dirty_read(subscription, ClientId).
+
 %% @doc Update a subscription.
--spec update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok.
+-spec(update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok).
 update_subscription(ClientId, Topic, OldQos, NewQos) ->
     call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}).
 
 %% @doc Publish a Message
--spec publish(Msg :: mqtt_message()) -> ok.
+-spec(publish(Msg :: mqtt_message()) -> any()).
 publish(Msg = #mqtt_message{from = From}) ->
     trace(publish, From, Msg),
-    Msg1 = #mqtt_message{topic = Topic}
-               = emqttd_broker:foldl_hooks('message.publish', [], Msg),
-    %% Retain message first. Don't create retained topic.
-    Msg2 = case emqttd_retainer:retain(Msg1) of
-               ok     -> emqttd_message:unset_flag(Msg1);
-               ignore -> Msg1
-           end,
-    emqttd_pubsub:publish(Topic, Msg2).
+    case emqttd:run_hooks('message.publish', [], Msg) of
+        {ok, Msg1 = #mqtt_message{topic = Topic}} ->
+            %% Retain message first. Don't create retained topic.
+            Msg2 = case emqttd_retainer:retain(Msg1) of
+                       ok     -> emqttd_message:unset_flag(Msg1);
+                       ignore -> Msg1
+                   end,
+            emqttd_pubsub:publish(Topic, Msg2);
+        {stop, Msg1} ->
+            lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)])
+    end.
 
 %% @doc Unsubscribe a Topic
--spec unsubscribe(binary()) -> ok.
+-spec(unsubscribe(binary()) -> ok).
 unsubscribe(Topic) when is_binary(Topic) ->
     From = self(), call(server(From), {unsubscribe, From, Topic}).
 
 %% @doc Unsubscribe a Topic from a MQTT session
--spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok.
+-spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok).
 unsubscribe(ClientId, Topic, Qos) ->
     From = self(), call(server(From), {unsubscribe, From, ClientId, Topic, Qos}).
 
@@ -188,11 +196,11 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% @private
 %% @doc Add a subscription.
--spec add_subscription_(binary(), binary(), mqtt_qos()) -> ok.
+-spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok).
 add_subscription_(ClientId, Topic, Qos) ->
     add_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}).
 
--spec add_subscription_(mqtt_subscription()) -> ok.
+-spec(add_subscription_(mqtt_subscription()) -> ok).
 add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
     mnesia:dirty_write(subscription, Subscription).
 
@@ -202,7 +210,7 @@ update_subscription_(OldSub, NewSub) ->
 
 %% @private
 %% @doc Delete a subscription
--spec del_subscription_(binary(), binary(), mqtt_qos()) -> ok.
+-spec(del_subscription_(binary(), binary(), mqtt_qos()) -> ok).
 del_subscription_(ClientId, Topic, Qos) ->
     del_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}).
 
@@ -246,7 +254,8 @@ trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
 %%--------------------------------------------------------------------
 
 set_subscription_stats() ->
-    emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', mnesia:table_info(subscription, size)).
+    emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
+                          mnesia:table_info(subscription, size)).
 
 %%--------------------------------------------------------------------
 

+ 51 - 40
src/emqttd_session.erl

@@ -288,49 +288,60 @@ handle_call(Req, _From, State) ->
 handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
                                                                  subscriptions = Subscriptions}) ->
 
-    TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
-    ?LOG(info, "Subscribe ~p", [TopicTable], Session),
-    Subscriptions1 = lists:foldl(
-        fun({Topic, Qos}, SubDict) ->
-            case dict:find(Topic, SubDict) of
-                {ok, Qos} ->
-                    ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
-                    SubDict;
-                {ok, OldQos} ->
-                    emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos),
-                    ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
-                    dict:store(Topic, Qos, SubDict);
-                error ->
-                    emqttd:subscribe(ClientId, Topic, Qos),
-                    %%TODO: the design is ugly...
-                    %% <MQTT V3.1.1>: 3.8.4
-                    %% Where the Topic Filter is not identical to any existing Subscription’s filter,
-                    %% a new Subscription is created and all matching retained messages are sent.
-                    emqttd_retainer:dispatch(Topic, self()),
-
-                    dict:store(Topic, Qos, SubDict)
-            end
-        end, Subscriptions, TopicTable),
-    AckFun([Qos || {_, Qos} <- TopicTable]),
-    emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
-    hibernate(Session#session{subscriptions = Subscriptions1});
+    case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of
+        {ok, TopicTable} ->
+            ?LOG(info, "Subscribe ~p", [TopicTable], Session),
+            Subscriptions1 = lists:foldl(
+                fun({Topic, Qos}, SubDict) ->
+                    case dict:find(Topic, SubDict) of
+                        {ok, Qos} ->
+                            ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
+                            SubDict;
+                        {ok, OldQos} ->
+                            emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos),
+                            ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
+                            dict:store(Topic, Qos, SubDict);
+                        error ->
+                            emqttd:subscribe(ClientId, Topic, Qos),
+                            %%TODO: the design is ugly...
+                            %% <MQTT V3.1.1>: 3.8.4
+                            %% Where the Topic Filter is not identical to any existing Subscription’s filter,
+                            %% a new Subscription is created and all matching retained messages are sent.
+                            emqttd_retainer:dispatch(Topic, self()),
+
+                            dict:store(Topic, Qos, SubDict)
+                    end
+                end, Subscriptions, TopicTable),
+            AckFun([Qos || {_, Qos} <- TopicTable]),
+            emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable),
+            hibernate(Session#session{subscriptions = Subscriptions1});
+        {stop, TopicTable} ->
+            ?LOG(error, "Cannot subscribe: ~p", [TopicTable], Session),
+            hibernate(Session)
+    end;
+
 
 handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
                                                        subscriptions = Subscriptions}) ->
 
-    Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
-    ?LOG(info, "unsubscribe ~p", [Topics], Session),
-    Subscriptions1 = lists:foldl(
-        fun(Topic, SubDict) ->
-            case dict:find(Topic, SubDict) of
-                {ok, Qos} ->
-                    emqttd:unsubscribe(ClientId, Topic, Qos),
-                    dict:erase(Topic, SubDict);
-                error ->
-                    SubDict
-            end
-        end, Subscriptions, Topics),
-    hibernate(Session#session{subscriptions = Subscriptions1});
+    case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of
+        {ok, Topics} ->
+            ?LOG(info, "unsubscribe ~p", [Topics], Session),
+            Subscriptions1 = lists:foldl(
+                fun(Topic, SubDict) ->
+                    case dict:find(Topic, SubDict) of
+                        {ok, Qos} ->
+                            emqttd:unsubscribe(ClientId, Topic, Qos),
+                            dict:erase(Topic, SubDict);
+                        error ->
+                            SubDict
+                    end
+                end, Subscriptions, Topics),
+            hibernate(Session#session{subscriptions = Subscriptions1});
+        {stop, Topics} ->
+            ?LOG(info, "Cannot unsubscribe: ~p", [Topics], Session),
+            hibernate(Session)
+    end;
 
 handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
     ?LOG(warning, "destroyed", [], Session),
@@ -644,7 +655,7 @@ acked(PktId, Session = #session{client_id      = ClientId,
                                 awaiting_ack   = Awaiting}) ->
     case lists:keyfind(PktId, 1, InflightQ) of
         {_, Msg} ->
-            emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]);
+            emqttd:run_hooks('message.acked', [ClientId], Msg);
         false ->
             ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session)
     end,