Kaynağa Gözat

Implement the channel architecture

Feng Lee 6 yıl önce
ebeveyn
işleme
7774b85f81

+ 10 - 10
include/emqx.hrl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,6 +12,7 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
 -ifndef(EMQ_X_HRL).
 -define(EMQ_X_HRL, true).
@@ -19,10 +21,6 @@
 %% Banner
 %%--------------------------------------------------------------------
 
--define(COPYRIGHT, "Copyright (c) 2013-2019 EMQ Technologies Co., Ltd").
-
--define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0").
-
 -define(PROTOCOL_VERSION, "MQTT/5.0").
 
 -define(ERTS_MINIMUM_REQUIRED, "10.0").
@@ -47,8 +45,6 @@
 %% Message and Delivery
 %%--------------------------------------------------------------------
 
--record(session, {sid, pid}).
-
 -record(subscription, {topic, subid, subopts}).
 
 %% See 'Application Message' in MQTT Version 5.0
@@ -72,9 +68,12 @@
         }).
 
 -record(delivery, {
-          sender  :: pid(),      %% Sender of the delivery
-          message :: #message{}, %% The message delivered
-          results :: list()      %% Dispatches of the message
+          %% Sender of the delivery
+          sender  :: pid(),
+          %% The message delivered
+          message :: #message{},
+          %% Dispatches of the message
+          results :: list()
         }).
 
 %%--------------------------------------------------------------------
@@ -151,6 +150,7 @@
 %%--------------------------------------------------------------------
 %% Banned
 %%--------------------------------------------------------------------
+
 -type(banned_who() ::  {client_id,  binary()}
                      | {username,   binary()}
                      | {ip_address, inet:ip_address()}).

+ 25 - 21
src/emqx_banned.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,6 +12,7 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
 -module(emqx_banned).
 
@@ -42,14 +44,14 @@
         , code_change/3
         ]).
 
--define(TAB, ?MODULE).
+-define(BANNED_TAB, ?MODULE).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Mnesia bootstrap
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 mnesia(boot) ->
-    ok = ekka_mnesia:create_table(?TAB, [
+    ok = ekka_mnesia:create_table(?BANNED_TAB, [
                 {type, set},
                 {disc_copies, [node()]},
                 {record_name, banned},
@@ -57,7 +59,7 @@ mnesia(boot) ->
                 {storage_properties, [{ets, [{read_concurrency, true}]}]}]);
 
 mnesia(copy) ->
-    ok = ekka_mnesia:copy_table(?TAB).
+    ok = ekka_mnesia:copy_table(?BANNED_TAB).
 
 %% @doc Start the banned server.
 -spec(start_link() -> startlink_ret()).
@@ -66,41 +68,42 @@ start_link() ->
 
 -spec(check(emqx_types:credentials()) -> boolean()).
 check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) ->
-    ets:member(?TAB, {client_id, ClientId})
-        orelse ets:member(?TAB, {username, Username})
-            orelse ets:member(?TAB, {ipaddr, IPAddr}).
+    ets:member(?BANNED_TAB, {client_id, ClientId})
+        orelse ets:member(?BANNED_TAB, {username, Username})
+            orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}).
 
 -spec(add(emqx_types:banned()) -> ok).
 add(Banned) when is_record(Banned, banned) ->
-    mnesia:dirty_write(?TAB, Banned).
+    mnesia:dirty_write(?BANNED_TAB, Banned).
 
 -spec(delete({client_id, emqx_types:client_id()}
              | {username, emqx_types:username()}
              | {peername, emqx_types:peername()}) -> ok).
 delete(Key) ->
-    mnesia:dirty_delete(?TAB, Key).
+    mnesia:dirty_delete(?BANNED_TAB, Key).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% gen_server callbacks
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 init([]) ->
     {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "[Banned] unexpected call: ~p", [Req]),
+    ?LOG(error, "[Banned] Unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "[Banned] unexpected msg: ~p", [Msg]),
+    ?LOG(error, "[Banned] Unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
-    mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]),
+    mnesia:async_dirty(fun expire_banned_items/1,
+                       [erlang:system_time(second)]),
     {noreply, ensure_expiry_timer(State), hibernate};
 
 handle_info(Info, State) ->
-    ?LOG(error, "[Banned] unexpected info: ~p", [Info]),
+    ?LOG(error, "[Banned] Unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, #{expiry_timer := TRef}) ->
@@ -109,9 +112,9 @@ terminate(_Reason, #{expiry_timer := TRef}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Internal functions
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 -ifdef(TEST).
 ensure_expiry_timer(State) ->
@@ -124,6 +127,7 @@ ensure_expiry_timer(State) ->
 expire_banned_items(Now) ->
     mnesia:foldl(
       fun(B = #banned{until = Until}, _Acc) when Until < Now ->
-              mnesia:delete_object(?TAB, B, sticky_write);
+              mnesia:delete_object(?BANNED_TAB, B, sticky_write);
          (_, _Acc) -> ok
-      end, ok, ?TAB).
+      end, ok, ?BANNED_TAB).
+

+ 14 - 1
src/emqx_channel.erl

@@ -30,7 +30,10 @@
         , stats/1
         ]).
 
--export([kick/1]).
+-export([ kick/1
+        , discard/1
+        , takeover/1
+        ]).
 
 -export([session/1]).
 
@@ -135,6 +138,12 @@ stats(#state{transport = Transport,
 kick(CPid) ->
     call(CPid, kick).
 
+discard(CPid) ->
+    call(CPid, discard).
+
+takeover(CPid) ->
+    call(CPid, takeover).
+
 session(CPid) ->
     call(CPid, session).
 
@@ -284,6 +293,10 @@ handle({call, From}, kick, State) ->
     ok = gen_statem:reply(From, ok),
     shutdown(kicked, State);
 
+handle({call, From}, discard, State) ->
+    ok = gen_statem:reply(From, ok),
+    shutdown(discard, State);
+
 handle({call, From}, session, State = #state{proto_state = ProtoState}) ->
     reply(From, emqx_protocol:session(ProtoState), State);
 

+ 323 - 115
src/emqx_cm.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,7 +12,9 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
+%% Channel Manager
 -module(emqx_cm).
 
 -behaviour(gen_server).
@@ -22,25 +25,39 @@
 
 -export([start_link/0]).
 
--export([ register_connection/1
-        , register_connection/2
-        , unregister_connection/1
-        , unregister_connection/2
+-export([ register_channel/1
+        , unregister_channel/1
+        , unregister_channel/2
         ]).
 
 -export([ get_conn_attrs/1
         , get_conn_attrs/2
         , set_conn_attrs/2
-        , set_conn_attrs/3
         ]).
 
 -export([ get_conn_stats/1
         , get_conn_stats/2
         , set_conn_stats/2
-        , set_conn_stats/3
         ]).
 
--export([lookup_conn_pid/1]).
+-export([ open_session/1
+        , discard_session/1
+        , resume_session/1
+        ]).
+
+-export([ get_session_attrs/1
+        , get_session_attrs/2
+        , set_session_attrs/2
+        ]).
+
+-export([ get_session_stats/1
+        , get_session_stats/2
+        , set_session_stats/2
+        ]).
+
+-export([ lookup_channels/1
+        , lookup_channels/2
+        ]).
 
 %% gen_server callbacks
 -export([ init/1
@@ -51,159 +68,350 @@
         , code_change/3
         ]).
 
-%% internal export
+%% Internal export
 -export([stats_fun/0]).
 
--define(CM, ?MODULE).
+-type(chan_pid() :: pid()).
+
+-opaque(attrs() :: #{atom() => term()}).
 
-%% ETS tables for connection management.
--define(CONN_TAB, emqx_conn).
--define(CONN_ATTRS_TAB, emqx_conn_attrs).
--define(CONN_STATS_TAB, emqx_conn_stats).
+-opaque(stats() :: #{atom() => integer()}).
 
+-export_type([attrs/0, stats/0]).
+
+%% Tables for channel management.
+-define(CHAN_TAB, emqx_channel).
+
+-define(CONN_TAB, emqx_connection).
+
+-define(SESSION_TAB, emqx_session).
+
+-define(SESSION_P_TAB, emqx_session_p).
+
+%% Chan stats
+-define(CHAN_STATS,
+        [{?CHAN_TAB, 'channels.count', 'channels.max'},
+         {?CONN_TAB, 'connections.count', 'connections.max'},
+         {?SESSION_TAB, 'sessions.count', 'sessions.max'},
+         {?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'}
+        ]).
+
+%% Batch drain
 -define(BATCH_SIZE, 100000).
 
-%% @doc Start the connection manager.
+%% Server name
+-define(CM, ?MODULE).
+
+%% @doc Start the channel manager.
 -spec(start_link() -> startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?CM}, ?MODULE, [], []).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% API
-%%------------------------------------------------------------------------------
-
-%% @doc Register a connection.
--spec(register_connection(emqx_types:client_id()) -> ok).
-register_connection(ClientId) when is_binary(ClientId) ->
-    register_connection(ClientId, self()).
-
--spec(register_connection(emqx_types:client_id(), pid()) -> ok).
-register_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) ->
-    true = ets:insert(?CONN_TAB, {ClientId, ConnPid}),
-    notify({registered, ClientId, ConnPid}).
-
-%% @doc Unregister a connection.
--spec(unregister_connection(emqx_types:client_id()) -> ok).
-unregister_connection(ClientId) when is_binary(ClientId) ->
-    unregister_connection(ClientId, self()).
-
--spec(unregister_connection(emqx_types:client_id(), pid()) -> ok).
-unregister_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) ->
-    true = do_unregister_connection({ClientId, ConnPid}),
-    notify({unregistered, ConnPid}).
-
-do_unregister_connection(Conn) ->
-    true = ets:delete(?CONN_STATS_TAB, Conn),
-    true = ets:delete(?CONN_ATTRS_TAB, Conn),
-    true = ets:delete_object(?CONN_TAB, Conn).
-
-%% @doc Get conn attrs
--spec(get_conn_attrs(emqx_types:client_id()) -> list()).
-get_conn_attrs(ClientId) when is_binary(ClientId) ->
-    ConnPid = lookup_conn_pid(ClientId),
-    get_conn_attrs(ClientId, ConnPid).
-
--spec(get_conn_attrs(emqx_types:client_id(), pid()) -> list()).
-get_conn_attrs(ClientId, ConnPid) when is_binary(ClientId) ->
-    emqx_tables:lookup_value(?CONN_ATTRS_TAB, {ClientId, ConnPid}, []).
-
-%% @doc Set conn attrs
--spec(set_conn_attrs(emqx_types:client_id(), list()) -> true).
-set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) ->
-    set_conn_attrs(ClientId, self(), Attrs).
-
--spec(set_conn_attrs(emqx_types:client_id(), pid(), list()) -> true).
-set_conn_attrs(ClientId, ConnPid, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
-    Conn = {ClientId, ConnPid},
-    ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
-
-%% @doc Get conn stats
--spec(get_conn_stats(emqx_types:client_id()) -> list(emqx_stats:stats())).
-get_conn_stats(ClientId) when is_binary(ClientId) ->
-    ConnPid = lookup_conn_pid(ClientId),
-    get_conn_stats(ClientId, ConnPid).
-
--spec(get_conn_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())).
-get_conn_stats(ClientId, ConnPid) when is_binary(ClientId) ->
-    Conn = {ClientId, ConnPid},
-    emqx_tables:lookup_value(?CONN_STATS_TAB, Conn, []).
+%%--------------------------------------------------------------------
+
+%% @doc Register a channel.
+-spec(register_channel(emqx_types:client_id()) -> ok).
+register_channel(ClientId) when is_binary(ClientId) ->
+    register_channel(ClientId, self()).
+
+-spec(register_channel(emqx_types:client_id(), chan_pid()) -> ok).
+register_channel(ClientId, ChanPid) ->
+    Chan = {ClientId, ChanPid},
+    true = ets:insert(?CHAN_TAB, Chan),
+    ok = emqx_cm_registry:register_channel(Chan),
+    cast({registered, Chan}).
+
+%% @doc Unregister a channel.
+-spec(unregister_channel(emqx_types:client_id()) -> ok).
+unregister_channel(ClientId) when is_binary(ClientId) ->
+    unregister_channel(ClientId, self()).
+
+-spec(unregister_channel(emqx_types:client_id(), chan_pid()) -> ok).
+unregister_channel(ClientId, ChanPid) ->
+    Chan = {ClientId, ChanPid},
+    true = do_unregister_channel(Chan),
+    cast({unregistered, Chan}).
+
+%% @private
+do_unregister_channel(Chan) ->
+    ok = emqx_cm_registry:unregister_channel(Chan),
+    true = ets:delete_object(?SESSION_P_TAB, Chan),
+    true = ets:delete(?SESSION_TAB, Chan),
+    true = ets:delete(?CONN_TAB, Chan),
+    ets:delete_object(?CHAN_TAB, Chan).
+
+%% @doc Get conn attrs.
+-spec(get_conn_attrs(emqx_types:client_id()) -> maybe(attrs())).
+get_conn_attrs(ClientId) ->
+    with_channel(ClientId, fun(ChanPid) ->
+                                   get_conn_attrs(ClientId, ChanPid)
+                           end).
+
+-spec(get_conn_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())).
+get_conn_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
+    Chan = {ClientId, ChanPid},
+    try ets:lookup_element(?CONN_TAB, Chan, 2) of
+        Attrs -> Attrs
+    catch
+        error:badarg -> undefined
+    end;
+get_conn_attrs(ClientId, ChanPid) ->
+    rpc_call(node(ChanPid), get_conn_attrs, [ClientId, ChanPid]).
+
+%% @doc Set conn attrs.
+-spec(set_conn_attrs(emqx_types:client_id(), attrs()) -> ok).
+set_conn_attrs(ClientId, Attrs) when is_map(Attrs) ->
+    Chan = {ClientId, self()},
+    case ets:update_element(?CONN_TAB, Chan, {2, Attrs}) of
+        true  -> ok;
+        false -> true = ets:insert(?CONN_TAB, {Chan, Attrs, #{}}),
+                 ok
+    end.
+
+%% @doc Get conn stats.
+-spec(get_conn_stats(emqx_types:client_id()) -> maybe(stats())).
+get_conn_stats(ClientId) ->
+    with_channel(ClientId, fun(ChanPid) ->
+                                   get_conn_stats(ClientId, ChanPid)
+                           end).
+
+-spec(get_conn_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())).
+get_conn_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
+    Chan = {ClientId, ChanPid},
+    try ets:lookup_element(?CONN_TAB, Chan, 3) of
+        Stats -> Stats
+    catch
+        error:badarg -> undefined
+    end;
+get_conn_stats(ClientId, ChanPid) ->
+    rpc_call(node(ChanPid), get_conn_stats, [ClientId, ChanPid]).
 
 %% @doc Set conn stats.
--spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> true).
+-spec(set_conn_stats(emqx_types:client_id(), stats()) -> ok).
 set_conn_stats(ClientId, Stats) when is_binary(ClientId) ->
     set_conn_stats(ClientId, self(), Stats).
 
--spec(set_conn_stats(emqx_types:client_id(), pid(), list(emqx_stats:stats())) -> true).
-set_conn_stats(ClientId, ConnPid, Stats) when is_binary(ClientId), is_pid(ConnPid) ->
-    Conn = {ClientId, ConnPid},
-    ets:insert(?CONN_STATS_TAB, {Conn, Stats}).
+-spec(set_conn_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok).
+set_conn_stats(ClientId, ChanPid, Stats) ->
+    Chan = {ClientId, ChanPid},
+    _ = ets:update_element(?CONN_TAB, Chan, {3, Stats}),
+    ok.
+
+%% @doc Open a session.
+-spec(open_session(map()) -> {ok, emqx_session:session()}
+                           | {error, Reason :: term()}).
+open_session(Attrs = #{clean_start := true,
+                       client_id := ClientId}) ->
+    CleanStart = fun(_) ->
+                     ok = discard_session(ClientId),
+                     {ok, emqx_session:new(Attrs)}
+                 end,
+    emqx_cm_locker:trans(ClientId, CleanStart);
+
+open_session(Attrs = #{clean_start := false,
+                       client_id := ClientId}) ->
+    ResumeStart = fun(_) ->
+                      case resume_session(ClientId) of
+                          {ok, Session} ->
+                              {ok, Session, true};
+                          {error, not_found} ->
+                              {ok, emqx_session:new(Attrs)}
+                      end
+                  end,
+    emqx_cm_locker:trans(ClientId, ResumeStart).
+
+%% @doc Try to resume a session.
+-spec(resume_session(emqx_types:client_id())
+      -> {ok, emqx_session:session()} | {error, Reason :: term()}).
+resume_session(ClientId) ->
+    case lookup_channels(ClientId) of
+        [] -> {error, not_found};
+        [ChanPid] ->
+            emqx_channel:resume(ChanPid);
+        ChanPids ->
+            [ChanPid|StalePids] = lists:reverse(ChanPids),
+            ?LOG(error, "[SM] More than one channel found: ~p", [ChanPids]),
+            lists:foreach(fun(StalePid) ->
+                              catch emqx_channel:discard(StalePid)
+                          end, StalePids),
+            emqx_channel:resume(ChanPid)
+    end.
 
-%% @doc Lookup connection pid.
--spec(lookup_conn_pid(emqx_types:client_id()) -> maybe(pid())).
-lookup_conn_pid(ClientId) when is_binary(ClientId) ->
-    emqx_tables:lookup_value(?CONN_TAB, ClientId).
+%% @doc Discard all the sessions identified by the ClientId.
+-spec(discard_session(emqx_types:client_id()) -> ok).
+discard_session(ClientId) when is_binary(ClientId) ->
+    case lookup_channels(ClientId) of
+        [] -> ok;
+        ChanPids ->
+            lists:foreach(
+              fun(ChanPid) ->
+                      try emqx_channel:discard(ChanPid)
+                      catch
+                          _:Error:_Stk ->
+                              ?LOG(warning, "[SM] Failed to discard ~p: ~p", [ChanPid, Error])
+                      end
+              end, ChanPids)
+    end.
+
+%% @doc Get session attrs.
+-spec(get_session_attrs(emqx_types:client_id()) -> attrs()).
+get_session_attrs(ClientId) ->
+    with_channel(ClientId, fun(ChanPid) ->
+                                   get_session_attrs(ClientId, ChanPid)
+                           end).
+
+-spec(get_session_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())).
+get_session_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
+    Chan = {ClientId, ChanPid},
+    try ets:lookup_element(?SESSION_TAB, Chan, 2) of
+        Attrs -> Attrs
+    catch
+        error:badarg -> undefined
+    end;
+get_session_attrs(ClientId, ChanPid) ->
+    rpc_call(node(ChanPid), get_session_attrs, [ClientId, ChanPid]).
+
+%% @doc Set session attrs.
+-spec(set_session_attrs(emqx_types:client_id(), attrs()) -> ok).
+set_session_attrs(ClientId, Attrs) when is_binary(ClientId) ->
+    Chan = {ClientId, self()},
+    case ets:update_element(?SESSION_TAB, Chan, {2, Attrs}) of
+        true -> ok;
+        false ->
+            true = ets:insert(?SESSION_TAB, {Chan, Attrs, #{}}),
+            is_clean_start(Attrs) orelse ets:insert(?SESSION_P_TAB, Chan),
+            ok
+    end.
+
+%% @doc Is clean start?
+is_clean_start(#{clean_start := false}) -> false;
+is_clean_start(_Attrs) -> true.
+
+%% @doc Get session stats.
+-spec(get_session_stats(emqx_types:client_id()) -> stats()).
+get_session_stats(ClientId) ->
+    with_channel(ClientId, fun(ChanPid) ->
+                                   get_session_stats(ClientId, ChanPid)
+                           end).
+
+-spec(get_session_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())).
+get_session_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
+    Chan = {ClientId, ChanPid},
+    try ets:lookup_element(?SESSION_TAB, Chan, 3) of
+        Stats -> Stats
+    catch
+        error:badarg -> undefined
+    end;
+get_session_stats(ClientId, ChanPid) ->
+    rpc_call(node(ChanPid), get_session_stats, [ClientId, ChanPid]).
+
+%% @doc Set session stats.
+-spec(set_session_stats(emqx_types:client_id(), stats()) -> ok).
+set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
+    set_session_stats(ClientId, self(), Stats).
+
+-spec(set_session_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok).
+set_session_stats(ClientId, ChanPid, Stats) ->
+    Chan = {ClientId, ChanPid},
+    _ = ets:update_element(?SESSION_TAB, Chan, {3, Stats}),
+    ok.
+
+with_channel(ClientId, Fun) ->
+    case lookup_channels(ClientId) of
+        []    -> undefined;
+        [Pid] -> Fun(Pid);
+        Pids  -> Fun(lists:last(Pids))
+    end.
 
-notify(Msg) ->
-    gen_server:cast(?CM, {notify, Msg}).
+%% @doc Lookup channels.
+-spec(lookup_channels(emqx_types:client_id()) -> list(chan_pid())).
+lookup_channels(ClientId) ->
+    lookup_channels(global, ClientId).
 
-%%-----------------------------------------------------------------------------
+%% @doc Lookup local or global channels.
+-spec(lookup_channels(local | global, emqx_types:client_id()) -> list(chan_pid())).
+lookup_channels(global, ClientId) ->
+    case emqx_cm_registry:is_enabled() of
+        true ->
+            emqx_cm_registry:lookup_channels(ClientId);
+        false ->
+            lookup_channels(local, ClientId)
+    end;
+
+lookup_channels(local, ClientId) ->
+    [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
+
+%% @private
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> error(Reason);
+        Res -> Res
+    end.
+
+%% @private
+cast(Msg) -> gen_server:cast(?CM, Msg).
+
+%%--------------------------------------------------------------------
 %% gen_server callbacks
-%%-----------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 init([]) ->
-    TabOpts = [public, set, {write_concurrency, true}],
-    ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
-    ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
-    ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
-    ok = emqx_stats:update_interval(conn_stats, fun ?MODULE:stats_fun/0),
-    {ok, #{conn_pmon => emqx_pmon:new()}}.
+    TabOpts = [public, {write_concurrency, true}],
+    ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
+    ok = emqx_tables:new(?CONN_TAB, [set, compressed | TabOpts]),
+    ok = emqx_tables:new(?SESSION_TAB, [set, compressed | TabOpts]),
+    ok = emqx_tables:new(?SESSION_P_TAB, [bag | TabOpts]),
+    ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
+    {ok, #{chan_pmon => emqx_pmon:new()}}.
 
 handle_call(Req, _From, State) ->
     ?LOG(error, "[CM] Unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
-handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
-    {noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
+handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
+    PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon),
+    {noreply, State#{chan_pmon := PMon1}};
 
-handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
-    {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
+handle_cast({unregistered, {_ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
+    PMon1 = emqx_pmon:demonitor(ChanPid, PMon),
+    {noreply, State#{chan_pmon := PMon1}};
 
 handle_cast(Msg, State) ->
     ?LOG(error, "[CM] Unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
-    ConnPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
-    {Items, PMon1} = emqx_pmon:erase_all(ConnPids, PMon),
-    ok = emqx_pool:async_submit(
-           fun lists:foreach/2, [fun clean_down/1, Items]),
-    {noreply, State#{conn_pmon := PMon1}};
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
+    ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
+    {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
+    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
+    {noreply, State#{chan_pmon := PMon1}};
 
 handle_info(Info, State) ->
     ?LOG(error, "[CM] Unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
-    emqx_stats:cancel_update(conn_stats).
+    emqx_stats:cancel_update(chan_stats).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Internal functions
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
-clean_down({Pid, ClientId}) ->
-    Conn = {ClientId, Pid},
-    case ets:member(?CONN_TAB, ClientId)
-         orelse ets:member(?CONN_ATTRS_TAB, Conn) of
-        true ->
-            do_unregister_connection(Conn);
-        false -> false
-    end.
+clean_down({ChanPid, ClientId}) ->
+    Chan = {ClientId, ChanPid},
+    do_unregister_channel(Chan).
 
 stats_fun() ->
-    case ets:info(?CONN_TAB, size) of
+    lists:foreach(fun update_stats/1, ?CHAN_STATS).
+
+update_stats({Tab, Stat, MaxStat}) ->
+    case ets:info(Tab, size) of
         undefined -> ok;
-        Size -> emqx_stats:setstat('connections.count', 'connections.max', Size)
+        Size -> emqx_stats:setstat(Stat, MaxStat, Size)
     end.
+

+ 66 - 0
src/emqx_cm_locker.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cm_locker).
+
+-include("emqx.hrl").
+-include("types.hrl").
+
+-export([start_link/0]).
+
+-export([ trans/2
+        , trans/3
+        , lock/1
+        , lock/2
+        , unlock/1
+        ]).
+
+-spec(start_link() -> startlink_ret()).
+start_link() ->
+    ekka_locker:start_link(?MODULE).
+
+-spec(trans(emqx_types:client_id(), fun(([node()]) -> any())) -> any()).
+trans(ClientId, Fun) ->
+    trans(ClientId, Fun, undefined).
+
+-spec(trans(maybe(emqx_types:client_id()),
+            fun(([node()])-> any()), ekka_locker:piggyback()) -> any()).
+trans(undefined, Fun, _Piggyback) ->
+    Fun([]);
+trans(ClientId, Fun, Piggyback) ->
+    case lock(ClientId, Piggyback) of
+        {true, Nodes} ->
+            try Fun(Nodes) after unlock(ClientId) end;
+        {false, _Nodes} ->
+            {error, client_id_unavailable}
+    end.
+
+-spec(lock(emqx_types:client_id()) -> ekka_locker:lock_result()).
+lock(ClientId) ->
+    ekka_locker:acquire(?MODULE, ClientId, strategy()).
+
+-spec(lock(emqx_types:client_id(), ekka_locker:piggyback()) -> ekka_locker:lock_result()).
+lock(ClientId, Piggyback) ->
+    ekka_locker:acquire(?MODULE, ClientId, strategy(), Piggyback).
+
+-spec(unlock(emqx_types:client_id()) -> {boolean(), [node()]}).
+unlock(ClientId) ->
+    ekka_locker:release(?MODULE, ClientId, strategy()).
+
+-spec(strategy() -> local | one | quorum | all).
+strategy() ->
+    emqx_config:get_env(session_locking_strategy, quorum).
+

+ 55 - 40
src/emqx_cm_registry.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,8 +12,10 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
--module(emqx_sm_registry).
+%% Global Channel Registry
+-module(emqx_cm_registry).
 
 -behaviour(gen_server).
 
@@ -22,12 +25,14 @@
 
 -export([start_link/0]).
 
--export([ is_enabled/0
-        , register_session/1
-        , lookup_session/1
-        , unregister_session/1
+-export([is_enabled/0]).
+
+-export([ register_channel/1
+        , unregister_channel/1
         ]).
 
+-export([lookup_channels/1]).
+
 %% gen_server callbacks
 -export([ init/1
         , handle_call/3
@@ -38,57 +43,67 @@
         ]).
 
 -define(REGISTRY, ?MODULE).
--define(TAB, emqx_session_registry).
--define(LOCK, {?MODULE, cleanup_sessions}).
-
--record(global_session, {sid, pid}).
+-define(TAB, emqx_channel_registry).
+-define(LOCK, {?MODULE, cleanup_down}).
 
--type(session_pid() :: pid()).
-
-%%------------------------------------------------------------------------------
-%% APIs
-%%------------------------------------------------------------------------------
+-record(channel, {chid, pid}).
 
-%% @doc Start the global session manager.
+%% @doc Start the global channel registry.
 -spec(start_link() -> startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
 
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+%% @doc Is the global registry enabled?
 -spec(is_enabled() -> boolean()).
 is_enabled() ->
-    emqx_config:get_env(enable_session_registry, true).
+    emqx_config:get_env(enable_channel_registry, true).
 
--spec(lookup_session(emqx_types:client_id()) -> list(session_pid())).
-lookup_session(ClientId) ->
-    [SessPid || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)].
+%% @doc Register a global channel.
+-spec(register_channel(emqx_types:client_id()
+                    | {emqx_types:client_id(), pid()}) -> ok).
+register_channel(ClientId) when is_binary(ClientId) ->
+    register_channel({ClientId, self()});
 
--spec(register_session({emqx_types:client_id(), session_pid()}) -> ok).
-register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
+register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
-        true -> mnesia:dirty_write(?TAB, record(ClientId, SessPid));
+        true -> mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
         false -> ok
     end.
 
--spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok).
-unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
+%% @doc Unregister a global channel.
+-spec(unregister_channel(emqx_types:client_id()
+                      | {emqx_types:client_id(), pid()}) -> ok).
+unregister_channel(ClientId) when is_binary(ClientId) ->
+    unregister_channel({ClientId, self()});
+
+unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
-        true -> mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid));
+        true -> mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
         false -> ok
     end.
 
-record(ClientId, SessPid) ->
-    #global_session{sid = ClientId, pid = SessPid}.
+%% @doc Lookup the global channels.
+-spec(lookup_channels(emqx_types:client_id()) -> list(pid())).
+lookup_channels(ClientId) ->
+    [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)].
+
+record(ClientId, ChanPid) ->
+    #channel{chid = ClientId, pid = ChanPid}.
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% gen_server callbacks
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 init([]) ->
     ok = ekka_mnesia:create_table(?TAB, [
                 {type, bag},
                 {ram_copies, [node()]},
-                {record_name, global_session},
-                {attributes, record_info(fields, global_session)},
+                {record_name, channel},
+                {attributes, record_info(fields, channel)},
                 {storage_properties, [{ets, [{read_concurrency, true},
                                              {write_concurrency, true}]}]}]),
     ok = ekka_mnesia:copy_table(?TAB),
@@ -106,7 +121,7 @@ handle_cast(Msg, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
     global:trans({?LOCK, self()},
                  fun() ->
-                     mnesia:transaction(fun cleanup_sessions/1, [Node])
+                     mnesia:transaction(fun cleanup_channels/1, [Node])
                  end),
     {noreply, State};
 
@@ -123,14 +138,14 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Internal functions
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
-cleanup_sessions(Node) ->
-    Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
-    lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)).
+cleanup_channels(Node) ->
+    Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
+    lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)).
 
-delete_session(Session) ->
-    mnesia:delete_object(?TAB, Session, write).
+delete_channel(Chan) ->
+    mnesia:delete_object(?TAB, Chan, write).
 

+ 29 - 23
src/emqx_cm_sup.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,8 +12,9 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
--module(emqx_sm_sup).
+-module(emqx_cm_sup).
 
 -behaviour(supervisor).
 
@@ -24,41 +26,45 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    %% Session locker
+    Banned = #{id => banned,
+               start => {emqx_banned, start_link, []},
+               restart => permanent,
+               shutdown => 1000,
+               type => worker,
+               modules => [emqx_banned]},
+    Flapping = #{id => flapping,
+                 start => {emqx_flapping, start_link, []},
+                 restart => permanent,
+                 shutdown => 1000,
+                 type => worker,
+                 modules => [emqx_flapping]},
+    %% Channel locker
     Locker = #{id => locker,
-               start => {emqx_sm_locker, start_link, []},
+               start => {emqx_cm_locker, start_link, []},
                restart => permanent,
                shutdown => 5000,
                type => worker,
-               modules => [emqx_sm_locker]
+               modules => [emqx_cm_locker]
               },
-    %% Session registry
+    %% Channel registry
     Registry = #{id => registry,
-                 start => {emqx_sm_registry, start_link, []},
+                 start => {emqx_cm_registry, start_link, []},
                  restart => permanent,
                  shutdown => 5000,
                  type => worker,
-                 modules => [emqx_sm_registry]
+                 modules => [emqx_cm_registry]
                 },
-    %% Session Manager
+    %% Channel Manager
     Manager = #{id => manager,
-                start => {emqx_sm, start_link, []},
+                start => {emqx_cm, start_link, []},
                 restart => permanent,
                 shutdown => 5000,
                 type => worker,
-                modules => [emqx_sm]
+                modules => [emqx_cm]
                },
-    %% Session Sup
-    SessSpec = #{start => {emqx_session, start_link, []},
-                 shutdown => brutal_kill,
-                 clean_down => fun emqx_sm:clean_down/1
+    SupFlags = #{strategy => one_for_one,
+                 intensity => 100,
+                 period => 10
                 },
-    SessionSup = #{id => session_sup,
-                   start => {emqx_session_sup, start_link, [SessSpec ]},
-                   restart => transient,
-                   shutdown => infinity,
-                   type => supervisor,
-                   modules => [emqx_session_sup]
-                  },
-    {ok, {{rest_for_one, 10, 3600}, [Locker, Registry, Manager, SessionSup]}}.
+    {ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}.
 

+ 23 - 18
src/emqx_flapping.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,6 +12,9 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc This module is used to garbage clean the flapping records.
 
 -module(emqx_flapping).
 
@@ -19,31 +23,29 @@
 
 -behaviour(gen_statem).
 
--export([start_link/1]).
-
-%% This module is used to garbage clean the flapping records
+-export([start_link/0]).
 
 %% gen_statem callbacks
--export([ terminate/3
-        , code_change/4
-        , init/1
+-export([ init/1
         , initialized/3
         , callback_mode/0
+        , terminate/3
+        , code_change/4
         ]).
 
 -define(FLAPPING_TAB, ?MODULE).
 
 -export([check/3]).
 
--record(flapping,
-        { client_id   :: binary()
-        , check_count :: integer()
-        , timestamp   :: integer()
-        }).
+-record(flapping, {
+          client_id :: binary(),
+          check_count :: integer(),
+          timestamp :: integer()
+         }).
 
 -type(flapping_record() :: #flapping{}).
--type(flapping_state() :: flapping | ok).
 
+-type(flapping_state() :: flapping | ok).
 
 %% @doc This function is used to initialize flapping records
 %% the expiry time unit is minutes.
@@ -96,18 +98,20 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
 %%--------------------------------------------------------------------
 %% gen_statem callbacks
 %%--------------------------------------------------------------------
--spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()).
-start_link(TimerInterval) ->
-    gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
 
-init([TimerInterval]) ->
+-spec(start_link() -> startlink_ret()).
+start_link() ->
+    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+    Interval = emqx_config:get_env(flapping_clean_interval, 3600000),
     TabOpts = [ public
               , set
               , {keypos, 2}
               , {write_concurrency, true}
               , {read_concurrency, true}],
     ok = emqx_tables:new(?FLAPPING_TAB, TabOpts),
-    {ok, initialized, #{timer_interval => TimerInterval}}.
+    {ok, initialized, #{timer_interval => Interval}}.
 
 callback_mode() -> [state_functions, state_enter].
 
@@ -134,3 +138,4 @@ clean_expired_records() ->
     NowTime = emqx_time:now_secs(),
     MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}],
     ets:select_delete(?FLAPPING_TAB, MatchSpec).
+

+ 22 - 13
src/emqx_hooks.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,6 +12,7 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
 -module(emqx_hooks).
 
@@ -19,7 +21,9 @@
 -include("logger.hrl").
 -include("types.hrl").
 
--export([start_link/0, stop/0]).
+-export([ start_link/0
+        , stop/0
+        ]).
 
 %% Hooks API
 -export([ add/2
@@ -52,11 +56,16 @@
 -type(action() :: function() | mfa()).
 -type(filter() :: function() | mfa()).
 
--record(callback, {action   :: action(),
-                   filter   :: filter(),
-                   priority :: integer()}).
+-record(callback, {
+          action :: action(),
+          filter :: filter(),
+          priority :: integer()
+         }).
 
--record(hook, {name :: hookpoint(), callbacks :: list(#callback{})}).
+-record(hook, {
+          name :: hookpoint(),
+          callbacks :: list(#callback{})
+         }).
 
 -export_type([hookpoint/0, action/0, filter/0]).
 
@@ -65,15 +74,16 @@
 
 -spec(start_link() -> startlink_ret()).
 start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{hibernate_after, 1000}]).
+    gen_server:start_link({local, ?SERVER},
+                          ?MODULE, [], [{hibernate_after, 1000}]).
 
 -spec(stop() -> ok).
 stop() ->
     gen_server:stop(?SERVER, normal, infinity).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Hooks API
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 %% @doc Register a callback
 -spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)).
@@ -111,7 +121,6 @@ run(HookPoint, Args) ->
 run_fold(HookPoint, Args, Acc) ->
     do_run_fold(lookup(HookPoint), Args, Acc).
 
-
 do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
     case filter_passed(Filter, Args) andalso execute(Action, Args) of
         %% stop the hook chain and return
@@ -163,12 +172,12 @@ lookup(HookPoint) ->
         [] -> []
     end.
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% gen_server callbacks
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 init([]) ->
-    ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}, protected]),
+    ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
     {ok, #{}}.
 
 handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->

+ 14 - 4
src/emqx_keepalive.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,6 +12,7 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
 -module(emqx_keepalive).
 
@@ -20,15 +22,22 @@
         , cancel/1
         ]).
 
--record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
+-record(keepalive, {
+          statfun,
+          statval,
+          tsec,
+          tmsg,
+          tref,
+          repeat = 0
+         }).
 
 -opaque(keepalive() :: #keepalive{}).
 
 -export_type([keepalive/0]).
 
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 %% APIs
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 %% @doc Start a keepalive
 -spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}).
@@ -79,3 +88,4 @@ cancel(_) ->
 
 timer(Secs, Msg) ->
     erlang:send_after(timer:seconds(Secs), self(), Msg).
+

+ 16 - 12
src/emqx_logger_formatter.erl

@@ -34,18 +34,22 @@
 -define(IS_STRING(String),
         (is_list(String) orelse is_binary(String))).
 
-%%%-----------------------------------------------------------------
-%%% Types
--type config() :: #{chars_limit     => pos_integer() | unlimited,
-                    depth           => pos_integer() | unlimited,
-                    max_size        => pos_integer() | unlimited,
-                    report_cb       => logger:report_cb(),
-                    quit        => template()}.
--type template() :: [metakey() | {metakey(),template(),template()} | string()].
--type metakey() :: atom() | [atom()].
-
-%%%-----------------------------------------------------------------
-%%% API
+%%--------------------------------------------------------------------
+%% Types
+
+-type(config() :: #{chars_limit => pos_integer() | unlimited,
+                    depth       => pos_integer() | unlimited,
+                    max_size    => pos_integer() | unlimited,
+                    report_cb   => logger:report_cb(),
+                    quit        => template()}).
+
+-type(template() :: [metakey() | {metakey(),template(),template()} | string()]).
+
+-type(metakey() :: atom() | [atom()]).
+
+%%--------------------------------------------------------------------
+%% API
+
 -spec format(LogEvent,Config) -> unicode:chardata() when
       LogEvent :: logger:log_event(),
       Config :: config().

+ 17 - 10
src/emqx_modules.erl

@@ -1,4 +1,5 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -11,6 +12,7 @@
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
+%%--------------------------------------------------------------------
 
 -module(emqx_modules).
 
@@ -20,20 +22,25 @@
         , unload/0
         ]).
 
+%% @doc Load all the extended modules.
 -spec(load() -> ok).
 load() ->
     ok = emqx_mod_acl_internal:load([]),
-    lists:foreach(
-      fun({Mod, Env}) ->
-        ok = Mod:load(Env),
-        ?LOG(info, "[Modules] Load ~s module successfully.", [Mod])
-      end, emqx_config:get_env(modules, [])).
+    lists:foreach(fun load/1, modules()).
 
+load({Mod, Env}) ->
+    ok = Mod:load(Env),
+    ?LOG(info, "[Modules] Load ~s module successfully.", [Mod]).
+
+modules() ->
+    emqx_config:get_env(modules, []).
+
+%% @doc Unload all the extended modules.
 -spec(unload() -> ok).
 unload() ->
     ok = emqx_mod_acl_internal:unload([]),
-    lists:foreach(
-      fun({Mod, Env}) ->
-          Mod:unload(Env) end,
-      emqx_config:get_env(modules, [])).
+    lists:foreach(fun unload/1, modules()).
+
+unload({Mod, Env}) ->
+    Mod:unload(Env).