Sfoglia il codice sorgente

Merge branch '0.13' of github.com:emqtt/emqttd into 0.13

Feng Lee 10 anni fa
parent
commit
f0ffdfb095

+ 5 - 0
include/emqttd_protocol.hrl

@@ -209,6 +209,11 @@
     #mqtt_packet{header   = #mqtt_packet_header{type = ?CONNACK},
                  variable = #mqtt_packet_connack{return_code = ReturnCode}}).
 
+-define(CONNACK_PACKET(ReturnCode, SessPresent),
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?CONNACK},
+                 variable = #mqtt_packet_connack{ack_flags = SessPresent,
+                                                 return_code = ReturnCode}}).
+
 -define(PUBLISH_PACKET(Qos, PacketId),
     #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBLISH,
                                                 qos = Qos},

+ 2 - 2
rebar.config

@@ -29,8 +29,8 @@
 {deps, [
 	{gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}},
 	{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
-	{esockd, "3.*", {git, "git://github.com/emqtt/esockd.git", {branch, "3.0"}}},
-	{mochiweb, ".*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "4.0"}}}
+	{esockd, "3.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
+	{mochiweb, "4.*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "master"}}}
 ]}.
 
 {recursive_cmds, [ct, eunit, clean]}.

+ 17 - 17
rel/files/acl.config

@@ -1,21 +1,21 @@
 %%%-----------------------------------------------------------------------------
-%% 
-%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL)
-%% 
-%% -type who() :: all | binary() |
-%%               {ipaddr, esockd_access:cidr()} |
-%%               {client, binary()} |
-%%               {user, binary()}.
-%%
-%% -type access() :: subscribe | publish | pubsub.
-%%
-%% -type topic() :: binary().
-%%
-%% -type rule() :: {allow, all} |
-%%                {allow, who(), access(), list(topic())} |
-%%                {deny, all} |
-%%                {deny, who(), access(), list(topic())}.
-%%
+%%%
+%%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL)
+%%%
+%%% -type who() :: all | binary() |
+%%%                {ipaddr, esockd_access:cidr()} |
+%%%                {client, binary()} |
+%%%                {user, binary()}.
+%%%
+%%% -type access() :: subscribe | publish | pubsub.
+%%%
+%%% -type topic() :: binary().
+%%%
+%%% -type rule() :: {allow, all} |
+%%%                 {allow, who(), access(), list(topic())} |
+%%%                 {deny, all} |
+%%%                 {deny, who(), access(), list(topic())}.
+%%%
 %%%-----------------------------------------------------------------------------
 
 {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.

+ 1 - 1
rel/files/emqttd.config.production

@@ -246,7 +246,7 @@
         {long_gc, false},
 
         %% Long Schedule(ms)
-        {long_schedule, 50},
+        {long_schedule, 100},
 
         %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
         %% 8 * 1024 * 1024

+ 1 - 10
src/emqttd.erl

@@ -91,8 +91,7 @@ open_listener({https, Port, Options}) ->
 	mochiweb:start_http(Port, Options, MFArgs).
 
 open_listener(Protocol, Port, Options) ->
-    Rl = rate_limiter(emqttd_opts:g(rate_limit, Options)),
-    MFArgs = {emqttd_client, start_link, [[{rate_limiter, Rl} | env(mqtt)]]},
+    MFArgs = {emqttd_client, start_link, [env(mqtt)]},
     esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
 
 merge_sockopts(Options) ->
@@ -100,14 +99,6 @@ merge_sockopts(Options) ->
                                  proplists:get_value(sockopts, Options, [])),
     emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
 
-%% TODO: will refactor in 0.14.0 release.
-rate_limiter(undefined) ->
-    undefined;
-rate_limiter(Config) ->
-    Bps = fun(S) -> list_to_integer(string:strip(S)) * 1024 end,
-    [Burst, Rate] = [Bps(S) || S <- string:tokens(Config, ",")],
-    esockd_rate_limiter:new(Burst, Rate).
-
 %%------------------------------------------------------------------------------
 %% @doc Close Listeners
 %% @end

+ 21 - 15
src/emqttd_access_control.erl

@@ -24,7 +24,6 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
-
 -module(emqttd_access_control).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -36,14 +35,13 @@
 -define(SERVER, ?MODULE).
 
 %% API Function Exports
--export([start_link/0,
-         start_link/1,
+-export([start_link/0, start_link/1,
          auth/2,       % authentication
          check_acl/3,  % acl check
          reload_acl/0, % reload acl
-         register_mod/3,
-         unregister_mod/2,
          lookup_mods/1,
+         register_mod/3, register_mod/4,
+         unregister_mod/2,
          stop/0]).
 
 %% gen_server callbacks
@@ -77,7 +75,7 @@ auth(Client, Password) when is_record(Client, mqtt_client) ->
     auth(Client, Password, lookup_mods(auth)).
 auth(_Client, _Password, []) ->
     {error, "No auth module to check!"};
-auth(Client, Password, [{Mod, State} | Mods]) ->
+auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
     case Mod:check(Client, Password, State) of
         ok -> ok;
         {error, Reason} -> {error, Reason};
@@ -100,7 +98,7 @@ check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) ->
 check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
     lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
     allow;
-check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
+check_acl(Client, PubSub, Topic, [{M, State, _Seq}|AclMods]) ->
     case M:check_acl({Client, PubSub, Topic}, State) of
         allow  -> allow;
         deny   -> deny;
@@ -113,7 +111,7 @@ check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
 %%------------------------------------------------------------------------------
 -spec reload_acl() -> list() | {error, any()}.
 reload_acl() ->
-    [M:reload_acl(State) || {M, State} <- lookup_mods(acl)].
+    [M:reload_acl(State) || {M, State, _Seq} <- lookup_mods(acl)].
 
 %%------------------------------------------------------------------------------
 %% @doc Register authentication or ACL module
@@ -121,7 +119,11 @@ reload_acl() ->
 %%------------------------------------------------------------------------------
 -spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}.
 register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
-    gen_server:call(?SERVER, {register_mod, Type, Mod, Opts}).
+    register_mod(Type, Mod, Opts, 0).
+
+-spec register_mod(auth | acl, atom(), list(), pos_integer()) -> ok | {error, any()}.
+register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
+    gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
 
 %%------------------------------------------------------------------------------
 %% @doc Unregister authentication or ACL module
@@ -172,22 +174,26 @@ init_mods(acl, AclMods) ->
 init_mod(Fun, Name, Opts) ->
     Module = Fun(Name),
     {ok, State} = Module:init(Opts),
-    {Module, State}.
+    {Module, State, 0}.
 
-handle_call({register_mod, Type, Mod, Opts}, _From, State) ->
+handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
     Mods = lookup_mods(Type),
     Reply =
     case lists:keyfind(Mod, 1, Mods) of
-        false -> 
+        false ->
             case catch Mod:init(Opts) of
-                {ok, ModState} -> 
-                    ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), [{Mod, ModState}|Mods]}),
+                {ok, ModState} ->
+                    NewMods =
+                    lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
+                                   Seq1 >= Seq2
+                               end, [{Mod, ModState, Seq} | Mods]),
+                    ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}),
                     ok;
                 {'EXIT', Error} ->
                     lager:error("Access Control: register ~s error - ~p", [Mod, Error]),
                     {error, Error}
             end;
-        _ -> 
+        _ ->
             {error, existed}
     end,
     {reply, Reply, State};

+ 20 - 4
src/emqttd_access_rule.erl

@@ -24,7 +24,6 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
-
 -module(emqttd_access_rule).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -49,17 +48,22 @@
 
 -export([compile/1, match/3]).
 
+-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
+
 %%------------------------------------------------------------------------------
 %% @doc Compile access rule
 %% @end
 %%------------------------------------------------------------------------------
-compile({A, all}) when (A =:= allow) orelse (A =:= deny) ->
+compile({A, all}) when ?ALLOW_DENY(A) ->
     {A, all};
 
-compile({A, Who, Access, TopicFilters}) when (A =:= allow) orelse (A =:= deny) ->
+compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A) andalso is_binary(Topic) ->
+    {A, compile(who, Who), Access, [compile(topic, Topic)]};
+
+compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) ->
     {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}.
 
-compile(who, all) -> 
+compile(who, all) ->
     all;
 compile(who, {ipaddr, CIDR}) ->
     {Start, End} = esockd_access:range(CIDR),
@@ -72,6 +76,10 @@ compile(who, {user, all}) ->
     {user, all};
 compile(who, {user, Username}) ->
     {user, bin(Username)};
+compile(who, {'and', Conds}) when is_list(Conds) ->
+    {'and', [compile(who, Cond) || Cond <- Conds]};
+compile(who, {'or', Conds}) when is_list(Conds) ->
+    {'or', [compile(who, Cond) || Cond <- Conds]};
 
 compile(topic, {eq, Topic}) ->
     {eq, emqttd_topic:words(bin(Topic))};
@@ -120,6 +128,14 @@ match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) ->
 match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) ->
     I = esockd_access:atoi(IP),
     I >= Start andalso I =< End;
+match_who(Client, {'and', Conds}) when is_list(Conds) ->
+    lists:foldl(fun(Who, Allow) ->
+                  match_who(Client, Who) andalso Allow
+                end, true, Conds);
+match_who(Client, {'or', Conds}) when is_list(Conds) ->
+    lists:foldl(fun(Who, Allow) ->
+                  match_who(Client, Who) orelse Allow
+                end, false, Conds);
 match_who(_Client, _Who) ->
     false.
 

+ 1 - 1
src/emqttd_app.erl

@@ -82,7 +82,7 @@ start_servers(Sup) ->
                {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
                {"emqttd broker", emqttd_broker},
                {"emqttd alarm", emqttd_alarm},
-               {"emqttd mode supervisor", emqttd_mod_sup},
+               {"emqttd mod supervisor", emqttd_mod_sup},
                {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
                {"emqttd access control", emqttd_access_control},
                {"emqttd system monitor", emqttd_sysmon, emqttd:env(sysmon)}],

+ 1 - 1
src/emqttd_auth_username.erl

@@ -105,7 +105,7 @@ init(Opts) ->
 	mnesia:create_table(?AUTH_USERNAME_TAB, [
 		{disc_copies, [node()]},
 		{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
-	mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies),
+	mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies),
     emqttd_ctl:register_cmd(users, {?MODULE, cli}, []),
     {ok, Opts}.
 

+ 2 - 2
src/emqttd_cli.erl

@@ -72,8 +72,8 @@ status([]) ->
     case lists:keysearch(emqttd, 1, application:which_applications()) of
     false ->
         ?PRINT_MSG("emqttd is not running~n");
-    {value,_Version} ->
-        ?PRINT_MSG("emqttd is running~n")
+    {value, {emqttd, _Desc, Vsn}} ->
+        ?PRINT("emqttd ~s is running~n", [Vsn])
     end;
 status(_) ->
      ?PRINT_CMD("status", "query broker status").

+ 8 - 8
src/emqttd_client.erl

@@ -124,7 +124,7 @@ handle_call(info, _From, State = #client_state{connection  = Connection,
     ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS),
     ProtoInfo  = emqttd_protocol:info(ProtoState),
     {ok, SockStats} = Connection:getstat(?SOCK_STATS),
-    {noreply, lists:append([ClientInfo, [{proto_info, ProtoInfo},
+    {reply, lists:append([ClientInfo, [{proto_info, ProtoInfo},
                                          {sock_stats, SockStats}]]), State};
 
 handle_call(kick, _From, State) ->
@@ -170,7 +170,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
 
 handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
     ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
-    shutdown(confict, State);
+    shutdown(conflict, State);
 
 handle_info(activate_sock, State) ->
     noreply(run_socket(State#client_state{conn_state = running}));
@@ -281,14 +281,14 @@ received(Bytes, State = #client_state{parser_fun  = ParserFun,
 
 rate_limit(_Size, State = #client_state{rate_limit = undefined}) ->
     run_socket(State);
-rate_limit(Size, State = #client_state{rate_limit = Limiter}) ->
-    case esockd_ratelimit:check(Limiter, Size) of
-        {0, Limiter1} ->
-            run_socket(State#client_state{conn_state = running, rate_limit = Limiter1});
-        {Pause, Limiter1} ->
+rate_limit(Size, State = #client_state{rate_limit = Rl}) ->
+    case Rl:check(Size) of
+        {0, Rl1} ->
+            run_socket(State#client_state{conn_state = running, rate_limit = Rl1});
+        {Pause, Rl1} ->
             ?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State),
             erlang:send_after(Pause, self(), activate_sock),
-            State#client_state{conn_state = blocked, rate_limit = Limiter1}    
+            State#client_state{conn_state = blocked, rate_limit = Rl1}
     end.
 
 run_socket(State = #client_state{conn_state = blocked}) ->

+ 44 - 15
src/emqttd_cm.erl

@@ -41,6 +41,9 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
+%% gen_server2 priorities
+-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
+
 -record(state, {id, statsfun}).
 
 -define(CM_POOL, ?MODULE).
@@ -101,8 +104,21 @@ init([Id, StatsFun]) ->
     gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
     {ok, #state{id = Id, statsfun = StatsFun}}.
 
+prioritise_call(_Req, _From, _Len, _State) ->
+    1.
+
+prioritise_cast(Msg, _Len, _State) ->
+    case Msg of
+        {register, _Client}           -> 2;
+        {unregister, _ClientId, _Pid} -> 3;
+        _                             -> 1
+    end.
+
+prioritise_info(_Msg, _Len, _State) ->
+    1.
+
 handle_call(Req, _From, State) ->
-    lager:error("unexpected request: ~p", [Req]),
+    lager:error("Unexpected request: ~p", [Req]),
     {reply, {error, unsupported_req}, State}.
 
 handle_cast({register, Client = #mqtt_client{client_id  = ClientId,
@@ -110,32 +126,45 @@ handle_cast({register, Client = #mqtt_client{client_id  = ClientId,
 	case ets:lookup(mqtt_client, ClientId) of
         [#mqtt_client{client_pid = Pid}] ->
             ignore;
-		[#mqtt_client{client_pid = OldPid}] ->
-            %% TODO: should cancel monitor
-            ?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client);
-		[] -> 
+        [#mqtt_client{client_pid = _OldPid, client_mon = MRef}] ->
+            %% demonitor
+            erlang:demonitor(MRef, [flush]);
+        [] ->
             ok
 	end,
-    ets:insert(mqtt_client, Client),
+    ets:insert(mqtt_client, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}),
     {noreply, setstats(State)};
 
 handle_cast({unregister, ClientId, Pid}, State) ->
 	case ets:lookup(mqtt_client, ClientId) of
-	[#mqtt_client{client_pid = Pid}] ->
-		ets:delete(mqtt_client, ClientId);
-	[_] ->
-		ignore;
-	[] ->
-        ?LOG(error, "Cannot find registered: ~p", [Pid], State)
-	end,
-	{noreply, setstats(State)};
+        [#mqtt_client{client_pid = Pid, client_mon = MRef}] ->
+            erlang:demonitor(MRef, [flush]),
+            ets:delete(mqtt_client, ClientId),
+            {noreply, setstats(State)};
+        [_] ->
+            {noreply, State};
+        [] ->
+            lager:warning("CM(~s): Cannot find pid ~p", [ClientId, Pid]),
+            {noreply, State}
+    end;
 
 handle_cast(Msg, State) ->
     lager:error("Unexpected Msg: ~p", [Msg]),
     {noreply, State}.
 
+handle_info({'DOWN', MRef, process, DownPid, Reason}, State) ->
+    MP = #mqtt_client{client_pid = DownPid, client_mon = MRef, _ = '_'},
+    case ets:match_object(mqtt_client, MP) of
+        [Client] ->
+            ?LOG(warning, "client ~p DOWN for ~p", [DownPid, Reason], Client),
+            ets:delete_object(mqtt_client, Client);
+        [] ->
+            ignore
+    end,
+    {noreply, setstats(State)};
+
 handle_info(Info, State) ->
-    lager:error("Unexpected Msg: ~p", [Info]),
+    lager:error("Unexpected Info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, #state{id = Id}) ->

+ 17 - 13
src/emqttd_protocol.erl

@@ -149,7 +149,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
 
     trace(recv, Packet, State1),
 
-    {ReturnCode1, State3} =
+    {ReturnCode1, SessPresent, State3} =
     case validate_connect(Var, State1) of
         ?CONNACK_ACCEPT ->
             case emqttd_access_control:auth(client(State1), Password) of
@@ -159,30 +159,30 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
 
                     %% Start session
                     case emqttd_sm:start_session(CleanSess, clientid(State2)) of
-                        {ok, Session} ->
+                        {ok, Session, SP} ->
                             %% Register the client
                             emqttd_cm:register(client(State2)),
                             %% Start keepalive
                             start_keepalive(KeepAlive),
                             %% ACCEPT
-                            {?CONNACK_ACCEPT, State2#proto_state{session = Session}};
+                            {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}};
                         {error, Error} ->
                             exit({shutdown, Error})
                     end;
                 {error, Reason}->
                     ?LOG(error, "Username '~s' login failed for ~s", [Username, Reason], State1),
-                    {?CONNACK_CREDENTIALS, State1}
+                    {?CONNACK_CREDENTIALS, false, State1}
             end;
         ReturnCode ->
-            {ReturnCode, State1}
+            {ReturnCode, false, State1}
     end,
     %% Run hooks
     emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
     %% Send connack
-    send(?CONNACK_PACKET(ReturnCode1), State3);
+    send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
 
 process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
-    case check_acl(publish, Topic, State) of
+    case check_acl(publish, Topic, client(State)) of
         allow ->
             publish(Packet, State);
         deny ->
@@ -210,7 +210,8 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?SUBACK_PACKET(PacketId, []), State);
 
 process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
-    AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
+    Client = client(State),
+    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
     case lists:member(deny, AllowDenies) of
         true ->
             ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
@@ -281,7 +282,7 @@ redeliver({?PUBREL, PacketId}, State) ->
 shutdown(_Error, #proto_state{client_id = undefined}) ->
     ignore;
 
-shutdown(confict, #proto_state{client_id = ClientId}) ->
+shutdown(conflict, #proto_state{client_id = ClientId}) ->
     emqttd_cm:unregister(ClientId);
 
 shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
@@ -391,16 +392,19 @@ validate_qos(_) ->
     false.
 
 %% PUBLISH ACL is cached in process dictionary.
-check_acl(publish, Topic, State) ->
+check_acl(publish, Topic, Client) ->
     case get({acl, publish, Topic}) of
         undefined ->
-            AllowDeny = emqttd_access_control:check_acl(client(State), publish, Topic),
+            AllowDeny = emqttd_access_control:check_acl(Client, publish, Topic),
             put({acl, publish, Topic}, AllowDeny),
             AllowDeny;
         AllowDeny ->
             AllowDeny
     end;
 
-check_acl(subscribe, Topic, State) ->
-    emqttd_access_control:check_acl(client(State), subscribe, Topic).
+check_acl(subscribe, Topic, Client) ->
+    emqttd_access_control:check_acl(Client, subscribe, Topic).
+
+sp(true)  -> 1;
+sp(false) -> 0.
 

+ 14 - 1
src/emqttd_session.erl

@@ -378,6 +378,7 @@ handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
 
 handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id      = ClientId,
                                                               client_pid     = OldClientPid,
+                                                              clean_sess     = CleanSess,
                                                               inflight_queue = InflightQ,
                                                               awaiting_ack   = AwaitingAck,
                                                               awaiting_comp  = AwaitingComp,
@@ -405,10 +406,21 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id      = C
     [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
 
     Session1 = Session#session{client_pid    = ClientPid,
+                               clean_sess    = false,
                                awaiting_ack  = #{},
                                awaiting_comp = #{},
                                expired_timer = undefined},
 
+    %% CleanSess: true -> false?
+    if
+        CleanSess =:= true  ->
+            ?LOG(warning, "CleanSess changed to false.", [], Session),
+            emqttd_sm:unregister_session(CleanSess, ClientId),
+            emqttd_sm:register_session(false, ClientId, sess_info(Session1));
+        CleanSess =:= false ->
+            ok
+    end,
+
     %% Redeliver inflight messages
     Session2 =
     lists:foldl(fun({_Id, Msg}, Sess) ->
@@ -585,7 +597,8 @@ kick(_ClientId, Pid, Pid) ->
     ignore;
 kick(ClientId, OldPid, Pid) ->
     unlink(OldPid),
-    OldPid ! {shutdown, conflict, {ClientId, Pid}}.
+    OldPid ! {shutdown, conflict, {ClientId, Pid}},
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Check inflight and awaiting_rel

+ 12 - 8
src/emqttd_sm.erl

@@ -24,7 +24,6 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
-
 -module(emqttd_sm).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -57,7 +56,7 @@
 
 -define(SM_POOL, ?MODULE).
 
--define(CALL_TIMEOUT, 60000).
+-define(TIMEOUT, 60000).
 
 -define(LOG(Level, Format, Args, Session),
             lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])).
@@ -103,7 +102,7 @@ pool() -> ?SM_POOL.
 %% @doc Start a session
 %% @end
 %%------------------------------------------------------------------------------
--spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}.
+-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}.
 start_session(CleanSess, ClientId) ->
     SM = gproc_pool:pick_worker(?SM_POOL, ClientId),
     call(SM, {start_session, {CleanSess, ClientId, self()}}).
@@ -144,7 +143,7 @@ sesstab(true)  -> mqtt_transient_session;
 sesstab(false) -> mqtt_persistent_session.
 
 call(SM, Req) ->
-    gen_server2:call(SM, Req, ?CALL_TIMEOUT). %%infinity).
+    gen_server2:call(SM, Req, ?TIMEOUT). %%infinity).
 
 %%%=============================================================================
 %%% gen_server callbacks
@@ -168,20 +167,20 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
     case lookup_session(ClientId) of
         undefined ->
             %% create session locally
-            {reply, create_session(false, ClientId, ClientPid), State};
+            reply(create_session(false, ClientId, ClientPid), false, State);
         Session ->
-            {reply, resume_session(Session, ClientPid), State}
+            reply(resume_session(Session, ClientPid), true, State)
     end;
 
 %% transient session
 handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
     case lookup_session(ClientId) of
         undefined ->
-            {reply, create_session(true, ClientId, ClientPid), State};
+            reply(create_session(true, ClientId, ClientPid), false, State);
         Session ->
             case destroy_session(Session) of
                 ok ->
-                    {reply, create_session(true, ClientId, ClientPid), State};
+                    reply(create_session(true, ClientId, ClientPid), false, State);
                 {error, Error} ->
                     {reply, {error, Error}, State}
             end
@@ -302,3 +301,8 @@ remove_session(Session) ->
         {aborted, Error} -> {error, Error}
     end.
 
+reply({ok, SessPid}, SP, State) ->
+    {reply, {ok, SessPid, SP}, State};
+reply({error, Error}, _SP, State) ->
+    {reply, {error, Error}, State}.
+

+ 1 - 1
src/emqttd_ws_client.erl

@@ -196,7 +196,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
 
 handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) ->
     ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req),
-    shutdown(confict, State);
+    shutdown(conflict, State);
 
 handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) ->
     ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req),

+ 12 - 9
test/emqttd_access_control_tests.erl

@@ -42,30 +42,33 @@ register_mod_test() ->
     with_acl(
         fun() ->
             emqttd_access_control:register_mod(acl, emqttd_acl_test_mod, []),
-            ?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}],
+            ?assertMatch([{emqttd_acl_test_mod, _, 0}, {emqttd_acl_internal, _, 0}],
                           emqttd_access_control:lookup_mods(acl)),
 	    emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
-	    ?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}],
-                          emqttd_access_control:lookup_mods(auth))
+	    emqttd_access_control:register_mod(auth, emqttd_auth_dashboard, [], 99),
+	    ?assertMatch([{emqttd_auth_dashboard, _, 99},
+                      {emqttd_auth_anonymous_test_mod, _, 0},
+                      {emqttd_auth_anonymous, _, 0}],
+                     emqttd_access_control:lookup_mods(auth))
         end).
 
 unregister_mod_test() ->
     with_acl(
         fun() ->
-            emqttd_access_control:register_mod(acl,emqttd_acl_test_mod, []),
-            ?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}],
+            emqttd_access_control:register_mod(acl, emqttd_acl_test_mod, []),
+            ?assertMatch([{emqttd_acl_test_mod, _, 0}, {emqttd_acl_internal, _, 0}],
                           emqttd_access_control:lookup_mods(acl)),
             emqttd_access_control:unregister_mod(acl, emqttd_acl_test_mod),
             timer:sleep(5),
-            ?assertMatch([{emqttd_acl_internal, _}], emqttd_access_control:lookup_mods(acl)),
+            ?assertMatch([{emqttd_acl_internal, _, 0}], emqttd_access_control:lookup_mods(acl)),
 	
 	    emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
-	    ?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}],
+	    ?assertMatch([{emqttd_auth_anonymous_test_mod, _, 0}, {emqttd_auth_anonymous, _, 0}],
                           emqttd_access_control:lookup_mods(auth)),
 		
 	    emqttd_access_control:unregister_mod(auth, emqttd_auth_anonymous_test_mod),
             timer:sleep(5),
-            ?assertMatch([{emqttd_auth_anonymous, _}], emqttd_access_control:lookup_mods(auth))
+            ?assertMatch([{emqttd_auth_anonymous, _, 0}], emqttd_access_control:lookup_mods(auth))
         end).
 
 check_acl_test() ->
@@ -83,7 +86,7 @@ check_acl_test() ->
 
 with_acl(Fun) ->
     process_flag(trap_exit, true),
-     AclOpts = [
+    AclOpts = [
         {auth, [
             %% Authentication with username, password
             %{username, []},

+ 17 - 4
test/emqttd_access_rule_tests.erl

@@ -35,6 +35,14 @@
 -include_lib("eunit/include/eunit.hrl").
 
 compile_test() ->
+
+    ?assertMatch({allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}},
+                                      {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
+                 compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]})),
+    ?assertMatch({allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}},
+                                 {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
+                 compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]})),
+
     ?assertMatch({allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]},
                  compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
     ?assertMatch({allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]},
@@ -69,10 +77,15 @@ match_test() ->
     ?assertMatch({matched, allow}, match(User, <<"clients/testClient">>,
                                                        compile({allow, all, pubsub, ["clients/$c"]}))),
     ?assertMatch({matched, allow}, match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>,
-                                                                  compile({allow, all, subscribe, ["users/$u/#"]}))),
-    ?assertMatch({matched, deny}, 
-                 match(User, <<"d/e/f">>,
-                                     compile({deny, all, subscribe, ["$SYS/#", "#"]}))).
+                                         compile({allow, all, subscribe, ["users/$u/#"]}))),
+    ?assertMatch({matched, deny}, match(User, <<"d/e/f">>,
+                                        compile({deny, all, subscribe, ["$SYS/#", "#"]}))),
+    Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
+    ?assertMatch(nomatch, match(User, <<"Topic">>, Rule)),
+    AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}),
+    ?assertMatch({matched, allow}, match(User, <<"Topic">>, AndRule)),
+    OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
+    ?assertMatch({matched, allow}, match(User, <<"Topic">>, OrRule)).
 
 -endif.
 

+ 14 - 0
test/emqttd_auth_dashboard.erl

@@ -0,0 +1,14 @@
+
+-module(emqttd_auth_dashboard).
+
+%% Auth callbacks
+-export([init/1, check/3, description/0]).
+
+init(Opts) ->
+    {ok, Opts}.
+
+check(_Client, _Password, _Opts) ->
+    allow.
+
+description() ->
+    "Test emqttd_auth_dashboard Mod".