Просмотр исходного кода

Improve emqx_hooks and credentials (#2309)

* Improve emqx_hooks and credentials

1. Modify the return modes of emqx hooks.

Change the return value of hook functions to:
- ok: stop the hook chain and return ok
- {error, Reason}: stop the hook chain and return error
- continue: continue the hook chain

And the return value of emqx_hooks:run/2 is changed to:
- ok
- {error, Reason}

And the return value of emqx_hooks:run/3:
- {ok, Acc}
- {error, Reason, Acc}

2. Treat password as a member of credentials.

Password should be wrapped in the `credentials` data-structure, as the
username/password pair together consists of an authentication method.
There can be some methods using some other credential data (e.g.
a JWT token), and these credential data should also be wrapped in the
the `credentials` data-structure.

An event `client.authenticate` is triggered when an user logs in:
```erlang
emqx_hooks:run('client.authenticate', [], Credentials)
```

A `default callback` that deny/allow any user (according to the
`allow_anonymous` config) should be appended to the end of the
callback chain.

The `credentails` is passed through all of the callbacks, and
can be changed over in this process.

* Refactor emqx hooks return mode

* Remove password from PState
Shawn 7 лет назад
Родитель
Сommit
02fe8560e2

+ 1 - 1
Makefile

@@ -31,7 +31,7 @@ EUNIT_OPTS = verbose
 ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
 
 CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
-			emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
+			emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
 			emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
 			emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \

+ 6 - 6
src/emqx.erl

@@ -29,7 +29,7 @@
 -export([topics/0, subscriptions/1, subscribers/1, subscribed/2]).
 
 %% Hooks API
--export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]).
+-export([hook/2, hook/3, hook/4, unhook/2, run_hook/2, run_fold_hook/3]).
 
 %% Shutdown and reboot
 -export([shutdown/0, shutdown/1, reboot/0]).
@@ -142,13 +142,13 @@ hook(HookPoint, Action, Filter, Priority) ->
 unhook(HookPoint, Action) ->
     emqx_hooks:del(HookPoint, Action).
 
--spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
-run_hooks(HookPoint, Args) ->
+-spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
+run_hook(HookPoint, Args) ->
     emqx_hooks:run(HookPoint, Args).
 
--spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}).
-run_hooks(HookPoint, Args, Acc) ->
-    emqx_hooks:run(HookPoint, Args, Acc).
+-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
+run_fold_hook(HookPoint, Args, Acc) ->
+    emqx_hooks:run_fold(HookPoint, Args, Acc).
 
 %%------------------------------------------------------------------------------
 %% Shutdown and reboot

+ 34 - 177
src/emqx_access_control.erl

@@ -14,198 +14,55 @@
 
 -module(emqx_access_control).
 
--behaviour(gen_server).
-
 -include("emqx.hrl").
 -include("logger.hrl").
 
--export([start_link/0]).
--export([authenticate/2]).
+-export([authenticate/1]).
 -export([check_acl/3, reload_acl/0]).
--export([register_mod/3, register_mod/4, unregister_mod/2]).
--export([lookup_mods/1]).
--export([stop/0]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-         code_change/3]).
-
--define(TAB, ?MODULE).
--define(SERVER, ?MODULE).
 
 %%------------------------------------------------------------------------------
-%% API
+%% APIs
 %%------------------------------------------------------------------------------
-
-%% @doc Start access control server.
--spec(start_link() -> {ok, pid()} | {error, term()}).
-start_link() ->
-    start_with(fun register_default_acl/0).
-
-start_with(Fun) ->
-    case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of
-        {ok, Pid} ->
-            Fun(), {ok, Pid};
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-register_default_acl() ->
-    case emqx_config:get_env(acl_file) of
-        undefined -> ok;
-        File -> register_mod(acl, emqx_acl_internal, [File])
-    end.
-
--spec(authenticate(emqx_types:credentials(), emqx_types:password())
-      -> ok | {ok, map()} | {continue, map()} | {error, term()}).
-authenticate(Credentials, Password) ->
-    authenticate(Credentials, Password, lookup_mods(auth)).
-
-authenticate(Credentials, _Password, []) ->
-    Zone = maps:get(zone, Credentials, undefined),
-    case emqx_zone:get_env(Zone, allow_anonymous, false) of
-        true  -> ok;
-        false -> {error, auth_modules_not_found}
-    end;
-
-authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) ->
-    try Mod:check(Credentials, Password, State) of
-        ok -> ok;
-        {ok, IsSuper} when is_boolean(IsSuper) ->
-            {ok, #{is_superuser => IsSuper}};
-        {ok, Result} when is_map(Result) ->
-            {ok, Result};
-        {continue, Result} when is_map(Result) ->
-            {continue, Result};
-        ignore ->
-            authenticate(Credentials, Password, Mods);
-        {error, Reason} ->
-            {error, Reason}
-    catch
-        error:Reason:StackTrace ->
-            ?LOG(error, "Authenticate failed. StackTrace: ~p", [StackTrace]),
-            {error, Reason}
+-spec(authenticate(emqx_types:credentials())
+      -> {ok, emqx_types:credentials()} | {error, term()}).
+authenticate(Credentials) ->
+    case emqx_hooks:run_fold('client.authenticate', [], Credentials#{result => init_result(Credentials)}) of
+        #{result := success} = NewCredentials ->
+            {ok, NewCredentials};
+        NewCredentials ->
+            {error, maps:get(result, NewCredentials, unknown_error)}
     end.
 
 %% @doc Check ACL
 -spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny).
-check_acl(Credentials, PubSub, Topic) when PubSub =:= publish; PubSub =:= subscribe ->
-    check_acl(Credentials, PubSub, Topic, lookup_mods(acl), emqx_acl_cache:is_enabled()).
-
-check_acl(Credentials, PubSub, Topic, AclMods, false) ->
-    do_check_acl(Credentials, PubSub, Topic, AclMods);
-check_acl(Credentials, PubSub, Topic, AclMods, true) ->
-    case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
-        not_found ->
-            AclResult = do_check_acl(Credentials, PubSub, Topic, AclMods),
-            emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
-            AclResult;
-        AclResult ->
-            AclResult
+check_acl(Credentials, PubSub, Topic) ->
+    case emqx_acl_cache:is_enabled() of
+        false ->
+            do_check_acl(Credentials, PubSub, Topic);
+        true ->
+            case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
+                not_found ->
+                    AclResult = do_check_acl(Credentials, PubSub, Topic),
+                    emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
+                    AclResult;
+                AclResult ->
+                    AclResult
+            end
     end.
 
-do_check_acl(#{zone := Zone}, _PubSub, _Topic, []) ->
-    emqx_zone:get_env(Zone, acl_nomatch, deny);
-do_check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
-    case Mod:check_acl({Credentials, PubSub, Topic}, State) of
-        allow  -> allow;
-        deny   -> deny;
-        ignore -> do_check_acl(Credentials, PubSub, Topic, AclMods)
+do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) ->
+    case emqx_hooks:run_fold('client.check_acl', [Credentials, PubSub, Topic],
+                             emqx_zone:get_env(Zone, acl_nomatch, deny)) of
+        allow -> allow;
+        _ -> deny
     end.
 
 -spec(reload_acl() -> list(ok | {error, term()})).
 reload_acl() ->
-    [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
-
-%% @doc Register an Auth/ACL module.
--spec(register_mod(auth | acl, module(), list()) -> ok | {error, term()}).
-register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl ->
-    register_mod(Type, Mod, Opts, 0).
-
--spec(register_mod(auth | acl, module(), list(), non_neg_integer())
-      -> ok | {error, term()}).
-register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
-    gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
-
-%% @doc Unregister an Auth/ACL module.
--spec(unregister_mod(auth | acl, module()) -> ok | {error, not_found | term()}).
-unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
-    gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
+    emqx_mod_acl_internal:reload_acl().
 
-%% @doc Lookup all Auth/ACL modules.
--spec(lookup_mods(auth | acl) -> list()).
-lookup_mods(Type) ->
-    case ets:lookup(?TAB, tab_key(Type)) of
-        [] -> [];
-        [{_, Mods}] -> Mods
-    end.
-
-tab_key(auth) -> auth_modules;
-tab_key(acl)  -> acl_modules.
-
-stop() ->
-    gen_server:stop(?SERVER, normal, infinity).
-
-%%-----------------------------------------------------------------------------
-%% gen_server callbacks
-%%-----------------------------------------------------------------------------
-
-init([]) ->
-    ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
-    {ok, #{}}.
-
-handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
-    Mods = lookup_mods(Type),
-    reply(case lists:keymember(Mod, 1, Mods) of
-              true  -> {error, already_exists};
-              false ->
-                    try Mod:init(Opts) of
-                        {ok, ModState} ->
-                            NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
-                                                        Seq1 >= Seq2
-                                                end, [{Mod, ModState, Seq} | Mods]),
-                            ets:insert(?TAB, {tab_key(Type), NewMods}),
-                            ok
-                    catch
-                        _:Error ->
-                            emqx_logger:error("[AccessControl] Failed to init ~s: ~p", [Mod, Error]),
-                            {error, Error}
-                    end
-          end, State);
-
-handle_call({unregister_mod, Type, Mod}, _From, State) ->
-    Mods = lookup_mods(Type),
-    reply(case lists:keyfind(Mod, 1, Mods) of
-              false ->
-                  {error, not_found};
-              {Mod, _ModState, _Seq} ->
-                  ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok
-          end, State);
-
-handle_call(stop, _From, State) ->
-    {stop, normal, ok, State};
-
-handle_call(Req, _From, State) ->
-    emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]),
-    {reply, ignored, State}.
-
-handle_cast(Msg, State) ->
-    emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]),
-    {noreply, State}.
-
-handle_info(Info, State) ->
-    emqx_logger:error("[AccessControl] unexpected info: ~p", [Info]),
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-reply(Reply, State) ->
-    {reply, Reply, State}.
+init_result(#{zone := Zone}) ->
+    case emqx_zone:get_env(Zone, allow_anonymous, false) of
+        true -> success;
+        false -> not_authorized
+    end.

+ 4 - 4
src/emqx_access_rule.erl

@@ -16,6 +16,8 @@
 
 -include("emqx.hrl").
 
+-type(acl_result() :: allow | deny).
+
 -type(who() :: all | binary() |
                {client, binary()} |
                {user, binary()} |
@@ -23,10 +25,8 @@
 
 -type(access() :: subscribe | publish | pubsub).
 
--type(rule() :: {allow, all} |
-                {allow, who(), access(), list(emqx_topic:topic())} |
-                {deny, all} |
-                {deny, who(), access(), list(emqx_topic:topic())}).
+-type(rule() :: {acl_result(), all} |
+                {acl_result(), who(), access(), list(emqx_topic:topic())}).
 
 -export_type([rule/0]).
 

+ 8 - 8
src/emqx_broker.erl

@@ -167,13 +167,14 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
 -spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
 publish(Msg) when is_record(Msg, message) ->
     _ = emqx_tracer:trace(publish, Msg),
-    case emqx_hooks:run('message.publish', [], Msg) of
-        {ok, Msg1 = #message{topic = Topic}} ->
+    Headers = Msg#message.headers,
+    case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of
+        #message{headers = #{allow_publish := false}} ->
+            ?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]),
+            [];
+        #message{topic = Topic} = Msg1 ->
             Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
-            Delivery#delivery.results;
-        {stop, _} ->
-            ?WARN("Stop publishing: ~s", [emqx_message:format(Msg)]),
-            []
+            Delivery#delivery.results
     end.
 
 %% Called internally
@@ -443,5 +444,4 @@ code_change(_OldVsn, State, _Extra) ->
 
 %%------------------------------------------------------------------------------
 %% Internal functions
-%%------------------------------------------------------------------------------
-
+%%------------------------------------------------------------------------------

+ 28 - 23
src/emqx_hooks.erl

@@ -22,7 +22,7 @@
 -export([start_link/0, stop/0]).
 
 %% Hooks API
--export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]).
+-export([add/2, add/3, add/4, del/2, run/2, run_fold/3, lookup/1]).
 
 %% gen_server Function Exports
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -90,39 +90,44 @@ del(HookPoint, Action) ->
     gen_server:cast(?SERVER, {del, HookPoint, Action}).
 
 %% @doc Run hooks.
--spec(run(atom(), list(Arg :: any())) -> ok | stop).
+-spec(run(atom(), list(Arg::term())) -> ok).
 run(HookPoint, Args) ->
-    run_(lookup(HookPoint), Args).
+    do_run(lookup(HookPoint), Args).
 
 %% @doc Run hooks with Accumulator.
--spec(run(atom(), list(Arg::any()), Acc::any()) -> {ok, Acc::any()} | {stop, Acc::any()}).
-run(HookPoint, Args, Acc) ->
-    run_(lookup(HookPoint), Args, Acc).
+-spec(run_fold(atom(), list(Arg::term()), Acc::term()) -> Acc::term()).
+run_fold(HookPoint, Args, Acc) ->
+    do_run_fold(lookup(HookPoint), Args, Acc).
 
-%% @private
-run_([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
+
+do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
     case filter_passed(Filter, Args) andalso execute(Action, Args) of
-        false -> run_(Callbacks, Args);
-        ok    -> run_(Callbacks, Args);
-        stop  -> stop;
-        _Any  -> run_(Callbacks, Args)
+        %% stop the hook chain and return
+        stop -> ok;
+        %% continue the hook chain, in following cases:
+        %%   - the filter validation failed with 'false'
+        %%   - the callback returns any term other than 'stop'
+        _ -> do_run(Callbacks, Args)
     end;
-run_([], _Args) ->
+do_run([], _Args) ->
     ok.
 
-%% @private
-run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
+do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
     Args1 = Args ++ [Acc],
     case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
-        false          -> run_(Callbacks, Args, Acc);
-        ok             -> run_(Callbacks, Args, Acc);
-        {ok, NewAcc}   -> run_(Callbacks, Args, NewAcc);
-        stop           -> {stop, Acc};
-        {stop, NewAcc} -> {stop, NewAcc};
-        _Any           -> run_(Callbacks, Args, Acc)
+        %% stop the hook chain
+        stop -> Acc;
+        %% stop the hook chain with NewAcc
+        {stop, NewAcc}   -> NewAcc;
+        %% continue the hook chain with NewAcc
+        {ok, NewAcc}   -> do_run_fold(Callbacks, Args, NewAcc);
+        %% continue the hook chain, in following cases:
+        %%   - the filter validation failed with 'false'
+        %%   - the callback returns any term other than 'stop' or {'stop', NewAcc}
+        _ -> do_run_fold(Callbacks, Args, Acc)
     end;
-run_([], _Args, Acc) ->
-    {ok, Acc}.
+do_run_fold([], _Args, Acc) ->
+    Acc.
 
 -spec(filter_passed(filter(), Args::term()) -> true | false).
 filter_passed(undefined, _Args) -> true;

+ 38 - 42
src/emqx_acl_internal.erl

@@ -12,57 +12,56 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 
--module(emqx_acl_internal).
+-module(emqx_mod_acl_internal).
 
--behaviour(emqx_acl_mod).
+-behaviour(emqx_gen_mod).
 
 -include("emqx.hrl").
 -include("logger.hrl").
 
+-export([load/1, unload/1]).
+
 -export([all_rules/0]).
 
-%% ACL mod callbacks
--export([init/1, check_acl/2, reload_acl/1, description/0]).
+-export([check_acl/5, reload_acl/0]).
 
 -define(ACL_RULE_TAB, emqx_acl_rule).
 
--type(state() :: #{acl_file := string()}).
+-define(FUNC(M, F, A), {M, F, A}).
+
+-type(acl_rules() :: #{publish => [emqx_access_rule:rule()],
+                       subscribe => [emqx_access_rule:rule()]}).
 
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
 
+load(_Env) ->
+    Rules = load_rules_from_file(acl_file()),
+    emqx_hooks:add('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules]),  -1).
+
+unload(_Env) ->
+    Rules = load_rules_from_file(acl_file()),
+    emqx_hooks:del('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules])).
+
 %% @doc Read all rules
 -spec(all_rules() -> list(emqx_access_rule:rule())).
 all_rules() ->
-    case ets:lookup(?ACL_RULE_TAB, all_rules) of
-        [] -> [];
-        [{_, Rules}] -> Rules
-    end.
+    load_rules_from_file(acl_file()).
 
 %%------------------------------------------------------------------------------
 %% ACL callbacks
 %%------------------------------------------------------------------------------
 
--spec(init([File :: string()]) -> {ok, #{}}).
-init([File]) ->
-    _ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
-    ok = load_rules_from_file(File),
-    {ok, #{acl_file => File}}.
-
 load_rules_from_file(AclFile) ->
     case file:consult(AclFile) of
         {ok, Terms} ->
             Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
-            lists:foreach(fun(PubSub) ->
-                ets:insert(?ACL_RULE_TAB, {PubSub,
-                    lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
-                end, [publish, subscribe]),
-            ets:insert(?ACL_RULE_TAB, {all_rules, Terms}),
-            ok;
+            #{publish => lists:filter(fun(Rule) -> filter(publish, Rule) end, Rules),
+              subscribe => lists:filter(fun(Rule) -> filter(subscribe, Rule) end, Rules)};
         {error, Reason} ->
-            emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
-            {error, Reason}
+            ?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
+            []
     end.
 
 filter(_PubSub, {allow, all}) ->
@@ -79,20 +78,18 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
     false.
 
 %% @doc Check ACL
--spec(check_acl({emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic()}, #{})
-      -> allow | deny | ignore).
-check_acl({Credentials, PubSub, Topic}, _State) ->
-    case match(Credentials, Topic, lookup(PubSub)) of
-        {matched, allow} -> allow;
-        {matched, deny}  -> deny;
-        nomatch          -> ignore
+-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(),
+                emqx_access_rule:acl_result(), acl_rules())
+      -> {ok, allow} | {ok, deny} | ok).
+check_acl(Credentials, PubSub, Topic, _AclResult, Rules) ->
+    case match(Credentials, Topic, lookup(PubSub, Rules)) of
+        {matched, allow} -> {ok, allow};
+        {matched, deny}  -> {ok, deny};
+        nomatch          -> ok
     end.
 
-lookup(PubSub) ->
-    case ets:lookup(?ACL_RULE_TAB, PubSub) of
-        [] -> [];
-        [{PubSub, Rules}] -> Rules
-    end.
+lookup(PubSub, Rules) ->
+    maps:get(PubSub, Rules, []).
 
 match(_Credentials, _Topic, []) ->
     nomatch;
@@ -104,11 +101,11 @@ match(Credentials, Topic, [Rule|Rules]) ->
             {matched, AllowDeny}
     end.
 
--spec(reload_acl(state()) -> ok | {error, term()}).
-reload_acl(#{acl_file := AclFile}) ->
-    try load_rules_from_file(AclFile) of
+-spec(reload_acl() -> ok | {error, term()}).
+reload_acl() ->
+    try load_rules_from_file(acl_file()) of
         ok ->
-            emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
+            emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]),
             ok;
         {error, Error} ->
             {error, Error}
@@ -118,6 +115,5 @@ reload_acl(#{acl_file := AclFile}) ->
             {error, Reason}
     end.
 
--spec(description() -> string()).
-description() ->
-    "Internal ACL with etc/acl.conf".
+acl_file() ->
+    emqx_config:get_env(acl_file).

+ 2 - 0
src/emqx_modules.erl

@@ -18,6 +18,7 @@
 
 -spec(load() -> ok).
 load() ->
+    ok = emqx_mod_acl_internal:load([]),
     lists:foreach(
       fun({Mod, Env}) ->
         ok = Mod:load(Env),
@@ -26,6 +27,7 @@ load() ->
 
 -spec(unload() -> ok).
 unload() ->
+    ok = emqx_mod_acl_internal:unload([]),
     lists:foreach(
       fun({Mod, Env}) ->
           Mod:unload(Env) end,

+ 52 - 78
src/emqx_protocol.erl

@@ -57,7 +57,6 @@
           will_msg,
           keepalive,
           mountpoint,
-          is_super,
           is_bridge,
           enable_ban,
           enable_acl,
@@ -68,7 +67,8 @@
           connected_at,
           ignore_loop,
           topic_alias_maximum,
-          conn_mod
+          conn_mod,
+          credentials
         }).
 
 -opaque(state() :: #pstate{}).
@@ -97,7 +97,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
             is_assigned         = false,
             conn_pid            = self(),
             username            = init_username(Peercert, Options),
-            is_super            = false,
             clean_start         = false,
             topic_aliases       = #{},
             packet_size         = emqx_zone:get_env(Zone, max_packet_size),
@@ -111,7 +110,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
             connected           = false,
             ignore_loop         = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
             topic_alias_maximum = #{to_client => 0, from_client => 0},
-            conn_mod            = maps:get(conn_mod, SocketOpts, undefined)}.
+            conn_mod            = maps:get(conn_mod, SocketOpts, undefined),
+            credentials         = #{}}.
 
 init_username(Peercert, Options) ->
     case proplists:get_value(peer_cert_as_username, Options) of
@@ -153,10 +153,10 @@ attrs(#pstate{zone         = Zone,
               proto_name   = ProtoName,
               keepalive    = Keepalive,
               mountpoint   = Mountpoint,
-              is_super     = IsSuper,
               is_bridge    = IsBridge,
               connected_at = ConnectedAt,
-              conn_mod     = ConnMod}) ->
+              conn_mod     = ConnMod,
+              credentials  = Credentials}) ->
     [{zone, Zone},
      {client_id, ClientId},
      {username, Username},
@@ -167,10 +167,11 @@ attrs(#pstate{zone         = Zone,
      {clean_start, CleanStart},
      {keepalive, Keepalive},
      {mountpoint, Mountpoint},
-     {is_super, IsSuper},
      {is_bridge, IsBridge},
      {connected_at, ConnectedAt},
-     {conn_mod, ConnMod}].
+     {conn_mod, ConnMod},
+     {credentials, Credentials}
+     ].
 
 attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
     get_property('Receive-Maximum', ConnProps, 65535);
@@ -200,6 +201,8 @@ caps(#pstate{zone = Zone}) ->
 client_id(#pstate{client_id = ClientId}) ->
     ClientId.
 
+credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 ->
+    Credentials;
 credentials(#pstate{zone       = Zone,
                     client_id  = ClientId,
                     username   = Username,
@@ -362,8 +365,7 @@ process(?CONNECT_PACKET(
 
     %% TODO: Mountpoint...
     %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
-
-    PState1 = set_username(Username,
+    PState0 = set_username(Username,
                            PState#pstate{client_id    = NewClientId,
                                          proto_ver    = ProtoVer,
                                          proto_name   = ProtoName,
@@ -372,20 +374,21 @@ process(?CONNECT_PACKET(
                                          conn_props   = ConnProps,
                                          is_bridge    = IsBridge,
                                          connected_at = os:timestamp()}),
-
+    Credentials = credentials(PState0),
+    PState1 = PState0#pstate{credentials = Credentials},
     connack(
       case check_connect(ConnPkt, PState1) of
-          {ok, PState2} ->
-              case authenticate(credentials(PState2), Password) of
-                  {ok, IsSuper} ->
-                      %% Maybe assign a clientId
-                      PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
+          ok ->
+              case emqx_access_control:authenticate(Credentials#{password => Password}) of
+                  {ok, Credentials0} ->
+                      PState3 = maybe_assign_client_id(PState1),
                       emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
                       %% Open session
                       SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
                       case try_open_session(SessAttrs, PState3) of
                           {ok, SPid, SP} ->
-                              PState4 = PState3#pstate{session = SPid, connected = true},
+                              PState4 = PState3#pstate{session = SPid, connected = true,
+                                                       credentials = maps:remove(password, Credentials0)},
                               ok = emqx_cm:register_connection(client_id(PState4)),
                               true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
                               %% Start keepalive
@@ -394,11 +397,11 @@ process(?CONNECT_PACKET(
                               {?RC_SUCCESS, SP, PState4};
                           {error, Error} ->
                               ?LOG(error, "Failed to open session: ~p", [Error]),
-                              {?RC_UNSPECIFIED_ERROR, PState1}
+                              {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}}
                       end;
                   {error, Reason} ->
-                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]),
-                      {?RC_NOT_AUTHORIZED, PState1}
+                      ?LOG(error, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
+                      {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}}
               end;
           {error, ReasonCode} ->
               {ReasonCode, PState1}
@@ -406,8 +409,8 @@ process(?CONNECT_PACKET(
 
 process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
-        {ok, PState1} ->
-            do_publish(Packet, PState1);
+        ok ->
+            do_publish(Packet, PState);
         {error, ReasonCode} ->
             ?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode)]),
@@ -416,8 +419,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
 
 process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
-        {ok, PState1} ->
-            do_publish(Packet, PState1);
+        ok ->
+            do_publish(Packet, PState);
         {error, ReasonCode} ->
             ?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
                 [Topic, emqx_reason_codes:text(ReasonCode)]),
@@ -430,8 +433,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
 
 process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
-        {ok, PState1} ->
-            do_publish(Packet, PState1);
+        ok ->
+            do_publish(Packet, PState);
         {error, ReasonCode} ->
             ?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode)]),
@@ -480,16 +483,10 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
     case check_subscribe(
            parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
         {ok, TopicFilters} ->
-            case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
-                {ok, TopicFilters1} ->
-                    ok = emqx_session:subscribe(SPid, PacketId, Properties,
-                                                emqx_mountpoint:mount(Mountpoint, TopicFilters1)),
-                    {ok, PState};
-                {stop, _} ->
-                    ReasonCodes = lists:duplicate(length(TopicFilters),
-                                                  ?RC_IMPLEMENTATION_SPECIFIC_ERROR),
-                    deliver({suback, PacketId, ReasonCodes}, PState)
-            end;
+            TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters),
+            ok = emqx_session:subscribe(SPid, PacketId, Properties,
+                                        emqx_mountpoint:mount(Mountpoint, TopicFilters0)),
+            {ok, PState};
         {error, TopicFilters} ->
             {SubTopics, ReasonCodes} =
                 lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
@@ -509,17 +506,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
 
 process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
         PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
-    case emqx_hooks:run('client.unsubscribe', [credentials(PState)],
-                        parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of
-        {ok, TopicFilters} ->
-            ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
-                                          emqx_mountpoint:mount(MountPoint, TopicFilters)),
-            {ok, PState};
-        {stop, _Acc} ->
-            ReasonCodes = lists:duplicate(length(RawTopicFilters),
-                                          ?RC_IMPLEMENTATION_SPECIFIC_ERROR),
-            deliver({unsuback, PacketId, ReasonCodes}, PState)
-    end;
+    TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)],
+                                       parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
+    ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
+                                  emqx_mountpoint:mount(MountPoint, TopicFilters)),
+    {ok, PState};
 
 process(?PACKET(?PINGREQ), PState) ->
     send(?PACKET(?PINGRESP), PState);
@@ -547,11 +538,11 @@ process(?DISCONNECT_PACKET(_), PState) ->
 %%------------------------------------------------------------------------------
 
 connack({?RC_SUCCESS, SP, PState}) ->
-    emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
+    ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
     deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
 
 connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
-    emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
+    ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
     [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
     _ = deliver({connack, ReasonCode1}, PState),
     {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
@@ -660,8 +651,8 @@ deliver({connack, ReasonCode, SP}, PState) ->
     send(?CONNACK_PACKET(ReasonCode, SP), PState);
 
 deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
-    _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
-    Msg1 = emqx_message:update_expiry(Msg),
+    Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
+    Msg1 = emqx_message:update_expiry(Msg0),
     Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
     send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState);
 
@@ -744,17 +735,6 @@ try_open_session(SessAttrs, PState = #pstate{zone = Zone,
         Other -> Other
     end.
 
-authenticate(Credentials, Password) ->
-    case emqx_access_control:authenticate(Credentials, Password) of
-        ok -> {ok, false};
-        {ok, IsSuper} when is_boolean(IsSuper) ->
-            {ok, IsSuper};
-        {ok, Result} when is_map(Result) ->
-            {ok, maps:get(is_superuser, Result, false)};
-        {error, Error} ->
-            {error, Error}
-    end.
-
 set_property(Name, Value, ?NO_PROPS) ->
     #{Name => Value};
 set_property(Name, Value, Props) ->
@@ -855,25 +835,21 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
                #pstate{zone = Zone}) ->
     emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
 
-check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
-    when IsSuper orelse (not EnableAcl) ->
+check_pub_acl(_Packet, #pstate{credentials = #{is_super := IsSuper}, enable_acl = EnableAcl})
+        when IsSuper orelse (not EnableAcl) ->
     ok;
-
 check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
     case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
         allow -> ok;
-        deny  ->
-            {error, ?RC_NOT_AUTHORIZED}
+        deny -> {error, ?RC_NOT_AUTHORIZED}
     end.
 
-run_check_steps([], _Packet, PState) ->
-    {ok, PState};
+run_check_steps([], _Packet, _PState) ->
+    ok;
 run_check_steps([Check|Steps], Packet, PState) ->
     case Check(Packet, PState) of
         ok ->
             run_check_steps(Steps, Packet, PState);
-        {ok, PState1} ->
-            run_check_steps(Steps, Packet, PState1);
         Error = {error, _RC} ->
             Error
     end.
@@ -886,15 +862,13 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
             {error, TopicFilter1}
     end.
 
-check_sub_acl(TopicFilters, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
-    when IsSuper orelse (not EnableAcl) ->
+check_sub_acl(TopicFilters, #pstate{credentials = #{is_super := IsSuper}, enable_acl = EnableAcl})
+        when IsSuper orelse (not EnableAcl) ->
     {ok, TopicFilters};
-
 check_sub_acl(TopicFilters, PState) ->
-    Credentials = credentials(PState),
     lists:foldr(
       fun({Topic, SubOpts}, {Ok, Acc}) ->
-              case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
+              case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
                   allow -> {Ok, [{Topic, SubOpts}|Acc]};
                   deny  ->
                       {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
@@ -928,7 +902,7 @@ terminate(discard, _PState) ->
     ok;
 terminate(Reason, PState) ->
     ?LOG(info, "Shutdown for ~p", [Reason]),
-    emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
+    ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
 
 start_keepalive(0, _PState) ->
     ignore;
@@ -999,4 +973,4 @@ reason_codes_compat(_PktType, ReasonCodes, ?MQTT_PROTO_V5) ->
 reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
     undefined;
 reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
-    [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
+    [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].

+ 6 - 4
src/emqx_psk.erl

@@ -25,10 +25,12 @@
 -type psk_user_state() :: term().
 
 -spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error.
-lookup(psk, ClientPSKID, UserState) ->
-    try emqx_hooks:run('tls_handshake.psk_lookup', [ClientPSKID], UserState) of
-        {ok, SharedSecret} -> {ok, SharedSecret};
-        {stop, SharedSecret} -> {ok, SharedSecret}
+lookup(psk, ClientPSKID, _UserState) ->
+    try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of
+        SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret};
+        Error ->
+            ?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
+            error
     catch
         Except:Error:Stacktrace ->
           ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),

+ 10 - 1
src/emqx_reason_codes.erl

@@ -17,7 +17,7 @@
 
 -include("emqx_mqtt.hrl").
 
--export([name/2, text/1]).
+-export([name/2, text/1, connack_error/1]).
 -export([compat/2]).
 
 name(I, Ver) when Ver >= ?MQTT_PROTO_V5 ->
@@ -143,3 +143,12 @@ compat(suback, Code) when Code =< ?QOS_2 -> Code;
 compat(suback, Code) when Code >= 16#80  -> 16#80;
 
 compat(unsuback, _Code) -> undefined.
+
+connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
+connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
+connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED;
+connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE;
+connack_error(server_busy) -> ?RC_SERVER_BUSY;
+connack_error(banned) -> ?RC_BANNED;
+connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
+connack_error(_) -> ?RC_NOT_AUTHORIZED.

+ 29 - 23
src/emqx_session.erl

@@ -369,7 +369,7 @@ init([Parent, #{zone            := Zone,
     ok = emqx_sm:register_session(ClientId, self()),
     true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
     true = emqx_sm:set_session_stats(ClientId, stats(State)),
-    emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
+    ok = emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     ok = emqx_misc:init_proc_mng_policy(Zone),
     ok = proc_lib:init_ack(Parent, {ok, self()}),
     gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
@@ -466,22 +466,13 @@ handle_call(Req, _From, State) ->
 handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
             State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
     {ReasonCodes, Subscriptions1} =
-        lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
-                            {[QoS|RcAcc], case maps:find(Topic, SubMap) of
-                                              {ok, SubOpts} ->
-                                                  emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
-                                                  SubMap;
-                                              {ok, _SubOpts} ->
-                                                  emqx_broker:set_subopts(Topic, SubOpts),
-                                                  %% Why???
-                                                  emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
-                                                  maps:put(Topic, SubOpts, SubMap);
-                                              error ->
-                                                  emqx_broker:subscribe(Topic, ClientId, SubOpts),
-                                                  emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
-                                                  maps:put(Topic, SubOpts, SubMap)
-                                          end}
-                    end, {[], Subscriptions}, TopicFilters),
+        lists:foldr(
+            fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when
+                      RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 ->
+                    {[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)};
+                ({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
+                    {[RC|RcAcc], SubMap}
+            end, {[], Subscriptions}, TopicFilters),
     suback(FromPid, PacketId, ReasonCodes),
     noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
 
@@ -493,7 +484,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
                             case maps:find(Topic, SubMap) of
                                 {ok, SubOpts} ->
                                     ok = emqx_broker:unsubscribe(Topic),
-                                    emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
+                                    ok = emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
                                     {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
                                 error ->
                                     {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
@@ -568,7 +559,7 @@ handle_cast({resume, #{conn_pid        := ConnPid,
     %% Clean Session: true -> false???
     CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
 
-    emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
+    ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
 
     %% Replay delivery and Dequeue pending messages
     noreply(ensure_stats_timer(dequeue(retry_delivery(true, State1))));
@@ -668,7 +659,7 @@ terminate(Reason, #state{will_msg = WillMsg,
                          old_conn_pid = OldConnPid}) ->
     send_willmsg(WillMsg),
     [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
-    emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
+    ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -941,7 +932,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
     if
         Dropped =/= undefined ->
             SessProps = #{client_id => ClientId, username => Username},
-            emqx_hooks:run('message.dropped', [SessProps, Msg]);
+            ok = emqx_hooks:run('message.dropped', [SessProps, Msg]);
         true -> ok
     end,
     State#state{mqueue = NewQ}.
@@ -980,7 +971,7 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) ->
 acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight  = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {publish, {_, Msg}, _Ts}} ->
-            emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
+            ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
             State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
         none ->
             ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
@@ -990,7 +981,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username
 acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {publish, {_, Msg}, _Ts}} ->
-            emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
+            ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
             State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
         {value, {pubrel, PacketId, _Ts}} ->
             ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
@@ -1118,3 +1109,18 @@ noreply(State) ->
 shutdown(Reason, State) ->
     {stop, {shutdown, Reason}, State}.
 
+do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
+    case maps:find(Topic, SubMap) of
+        {ok, SubOpts} ->
+            ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
+            SubMap;
+        {ok, _SubOpts} ->
+            emqx_broker:set_subopts(Topic, SubOpts),
+            %% Why???
+            ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
+            maps:put(Topic, SubOpts, SubMap);
+        error ->
+            emqx_broker:subscribe(Topic, ClientId, SubOpts),
+            ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
+            maps:put(Topic, SubOpts, SubMap)
+    end.

+ 0 - 3
src/emqx_sup.erl

@@ -62,8 +62,6 @@ init([]) ->
     %% Broker Sup
     BrokerSup = supervisor_spec(emqx_broker_sup),
     BridgeSup = supervisor_spec(emqx_bridge_sup),
-    %% AccessControl
-    AccessControl = worker_spec(emqx_access_control),
     %% Session Manager
     SMSup = supervisor_spec(emqx_sm_sup),
     %% Connection Manager
@@ -75,7 +73,6 @@ init([]) ->
            RouterSup,
            BrokerSup,
            BridgeSup,
-           AccessControl,
            SMSup,
            CMSup,
            SysSup]}}.

+ 2 - 0
src/emqx_topic.erl

@@ -217,6 +217,8 @@ parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
                 _ -> error({invalid_topic, Topic})
             end
     end;
+parse(Topic, Options = #{qos := QoS}) ->
+    {Topic, Options#{rc => QoS}};
 parse(Topic, Options) ->
     {Topic, Options}.
 

+ 13 - 0
test/emqx_alarm_handler_SUITE.erl

@@ -39,6 +39,16 @@ end_per_suite(_Config) ->
 local_path(RelativePath) ->
     filename:join([get_base_dir(), RelativePath]).
 
+deps_path(App, RelativePath) ->
+    %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
+    %% but priv dir is
+    Path0 = code:priv_dir(App),
+    Path = case file:read_link(Path0) of
+               {ok, Resolved} -> Resolved;
+               {error, _} -> Path0
+           end,
+    filename:join([Path, "..", RelativePath]).
+
 get_base_dir() ->
     {file, Here} = code:is_loaded(?MODULE),
     filename:dirname(filename:dirname(Here)).
@@ -56,6 +66,9 @@ read_schema_configs(App, {SchemaFile, ConfigFile}) ->
     Vals = proplists:get_value(App, NewConfig, []),
     [application:set_env(App, Par, Value) || {Par, Value} <- Vals].
 
+set_special_configs(emqx) ->
+    application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
+
 set_special_configs(_App) ->
     ok.
 

+ 1 - 1
test/emqx_broker_SUITE.erl

@@ -72,7 +72,7 @@ publish(_) ->
     ok = emqx:subscribe(<<"test/+">>),
     timer:sleep(10),
     emqx:publish(Msg),
-    ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
+    ?assert(receive {dispatch, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 5 -> false end).
 
 dispatch_with_no_sub(_) ->
     Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),

+ 21 - 23
test/emqx_hooks_SUITE.erl

@@ -21,7 +21,7 @@
 -include_lib("common_test/include/ct.hrl").
 
 all() ->
-    [add_delete_hook, run_hooks].
+    [add_delete_hook, run_hook].
 
 add_delete_hook(_) ->
     {ok, _} = emqx_hooks:start_link(),
@@ -54,57 +54,55 @@ add_delete_hook(_) ->
     ?assertEqual([], emqx_hooks:lookup(emqx_hook)),
     ok = emqx_hooks:stop().
 
-run_hooks(_) ->
+run_hook(_) ->
     {ok, _} = emqx_hooks:start_link(),
     ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
     ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}),
     ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
     ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
-    {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
-    {ok, []} = emqx:run_hooks(unknown_hook, [], []),
+    [r5,r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []),
+    [] = emqx:run_fold_hook(unknown_hook, [], []),
+
+    ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
+    ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
+    [r9] = emqx:run_fold_hook(foldl_hook2, [arg], []),
 
     ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
     {error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
     ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
     ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
-    stop = emqx:run_hooks(foreach_hook, [arg]),
-
-    ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
-    ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
-    {stop, []} = emqx:run_hooks(foldl_hook2, [arg], []),
+    ok = emqx:run_hook(foreach_hook, [arg]),
 
-    %% foreach hook always returns 'ok' or 'stop'
     ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
-    ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg])), %% filter passed
-    ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg1])), %% filter failed
+    ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), %% filter passed
+    ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), %% filter failed
 
-    %% foldl hook always returns {'ok', Acc} or {'stop', Acc}
     ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}),
     ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}),
-    ?assertEqual({ok, 3}, emqx:run_hooks(foldl_filter2_hook, [arg], 1)),
-    ?assertEqual({ok, 2}, emqx:run_hooks(foldl_filter2_hook, [arg1], 1)),
+    ?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
+    ?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)),
 
     ok = emqx_hooks:stop().
 
 hook_fun1(arg) -> ok;
-hook_fun1(_) -> stop.
+hook_fun1(_) -> error.
 
 hook_fun2(arg) -> ok;
-hook_fun2(_) -> stop.
+hook_fun2(_) -> error.
 
 hook_fun2(_, Acc) -> {ok, Acc + 1}.
 hook_fun2_1(_, Acc) -> {ok, Acc + 1}.
 
 hook_fun3(arg1, arg2, _Acc, init) -> ok.
-hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r2 | Acc]}.
-hook_fun5(arg1, arg2, Acc, init)  -> {stop, [r3 | Acc]}.
+hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r4 | Acc]}.
+hook_fun5(arg1, arg2, Acc, init)  -> {ok, [r5 | Acc]}.
 
 hook_fun6(arg, initArg) -> ok.
-hook_fun7(arg, initArg) -> any.
-hook_fun8(arg, initArg) -> stop.
+hook_fun7(arg, initArg) -> ok.
+hook_fun8(arg, initArg) -> ok.
 
-hook_fun9(arg, _Acc)  -> any.
-hook_fun10(arg, _Acc)  -> stop.
+hook_fun9(arg, Acc)  -> {stop, [r9 | Acc]}.
+hook_fun10(arg, Acc)  -> {stop, [r10 | Acc]}.
 
 hook_filter1(arg) -> true;
 hook_filter1(_) -> false.

+ 4 - 0
test/emqx_protocol_SUITE.erl

@@ -546,6 +546,8 @@ acl_deny_do_disconnect(publish, QoS, Topic) ->
     {ok, _} = emqx_client:connect(Client),
     emqx_client:publish(Client, Topic, <<"test">>, QoS),
     receive
+        {disconnected, shutdown, tcp_closed} ->
+            ct:pal(info, "[OK] after publish, client got disconnected: tcp_closed", []);
         {'EXIT', Client, {shutdown,tcp_closed}} ->
             ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"),
             false = is_process_alive(Client);
@@ -560,6 +562,8 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) ->
     {ok, _} = emqx_client:connect(Client),
     {ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS),
     receive
+        {disconnected, shutdown, tcp_closed} ->
+            ct:pal(info, "[OK] after subscribe, client got disconnected: tcp_closed", []);
         {'EXIT', Client, {shutdown,tcp_closed}} ->
             ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"),
             false = is_process_alive(Client);