Przeglądaj źródła

Improve the Hooks's design

Feng Lee 7 lat temu
rodzic
commit
2a75105580

+ 42 - 35
src/emqx.erl

@@ -29,16 +29,16 @@
 -export([get_subopts/2, set_subopts/3]).
 
 %% Hooks API
--export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]).
+-export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]).
 
 %% Shutdown and reboot
 -export([shutdown/0, shutdown/1, reboot/0]).
 
 -define(APP, ?MODULE).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Bootstrap, is_running...
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 %% @doc Start emqx application
 -spec(start() -> {ok, list(atom())} | {error, term()}).
@@ -62,9 +62,9 @@ is_running(Node) ->
         Pid when is_pid(Pid) -> true
     end.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% PubSub API
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec(subscribe(emqx_topic:topic() | string()) -> ok).
 subscribe(Topic) ->
@@ -97,9 +97,9 @@ unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
 unsubscribe(Topic, SubPid) when is_pid(SubPid) ->
     emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% PubSub management API
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber())
       -> emqx_types:subopts()).
@@ -128,36 +128,43 @@ subscribed(Topic, SubPid) when is_pid(SubPid) ->
 subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
     emqx_broker:subscribed(iolist_to_binary(Topic), SubId).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Hooks API
-%%--------------------------------------------------------------------
-
--spec(hook(atom(), function() | {emqx_hooks:hooktag(), function()}, list(any()))
-      -> ok | {error, term()}).
-hook(Hook, TagFunction, InitArgs) ->
-    emqx_hooks:add(Hook, TagFunction, InitArgs).
-
--spec(hook(atom(), function() | {emqx_hooks:hooktag(), function()}, list(any()), integer())
-      -> ok | {error, term()}).
-hook(Hook, TagFunction, InitArgs, Priority) ->
-    emqx_hooks:add(Hook, TagFunction, InitArgs, Priority).
-
--spec(unhook(atom(), function() | {emqx_hooks:hooktag(), function()})
-      -> ok | {error, term()}).
-unhook(Hook, TagFunction) ->
-    emqx_hooks:delete(Hook, TagFunction).
-
--spec(run_hooks(atom(), list(any())) -> ok | stop).
-run_hooks(Hook, Args) ->
-    emqx_hooks:run(Hook, Args).
-
--spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}).
-run_hooks(Hook, Args, Acc) ->
-    emqx_hooks:run(Hook, Args, Acc).
-
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+
+-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}).
+hook(HookPoint, Action) ->
+    emqx_hooks:add(HookPoint, Action).
+
+-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer())
+      -> ok | {error, already_exists}).
+hook(HookPoint, Action, Priority) when is_integer(Priority) ->
+    emqx_hooks:add(HookPoint, Action, Priority);
+hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
+    emqx_hooks:add(HookPoint, Action, Filter);
+hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
+    emqx_hooks:add(HookPoint, Action, InitArgs).
+
+-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer())
+      -> ok | {error, already_exists}).
+hook(HookPoint, Action, Filter, Priority) ->
+    emqx_hooks:add(HookPoint, Action, Filter, Priority).
+
+-spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok).
+unhook(HookPoint, Action) ->
+    emqx_hooks:del(HookPoint, Action).
+
+-spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
+run_hooks(HookPoint, Args) ->
+    emqx_hooks:run(HookPoint, Args).
+
+-spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}).
+run_hooks(HookPoint, Args, Acc) ->
+    emqx_hooks:run(HookPoint, Args, Acc).
+
+%%------------------------------------------------------------------------------
 %% Shutdown and reboot
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 shutdown() ->
     shutdown(normal).

+ 26 - 12
src/emqx_hooks.erl

@@ -134,16 +134,16 @@ lookup(HookPoint) ->
         [] -> []
     end.
 
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% gen_server callbacks
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 init([]) ->
     _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
     {ok, #{}}.
 
 handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
-    Reply = case lists:keyfind(Action, 2, Callbacks = lookup(HookPoint)) of
+    Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of
                 true ->
                     {error, already_exists};
                 false ->
@@ -151,18 +151,18 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat
             end,
     {reply, Reply, State};
 
-handle_call({del, HookPoint, Action}, _From, State) ->
-    case lists:keydelete(Action, 2, lookup(HookPoint)) of
+handle_call(Req, _From, State) ->
+    emqx_logger:error("[Hooks] unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast({del, HookPoint, Action}, State) ->
+    case del_callback(Action, lookup(HookPoint)) of
         [] ->
             ets:delete(?TAB, HookPoint);
         Callbacks ->
             insert_hook(HookPoint, Callbacks)
     end,
-    {reply, ok, State};
-
-handle_call(Req, _From, State) ->
-    emqx_logger:error("[Hooks] unexpected call: ~p", [Req]),
-    {reply, ignored, State}.
+    {noreply, State};
 
 handle_cast(Msg, State) ->
     emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]),
@@ -178,9 +178,9 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 insert_hook(HookPoint, Callbacks) ->
     ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok.
@@ -196,3 +196,17 @@ add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More]
 add_callback(C1, More, Acc) ->
     lists:append(lists:reverse(Acc), [C1 | More]).
 
+del_callback(Action, Callbacks) ->
+    del_callback(Action, Callbacks, []).
+
+del_callback(_Action, [], Acc) ->
+    lists:reverse(Acc);
+del_callback(Action, [#callback{action = Action} | Callbacks], Acc) ->
+    del_callback(Action, Callbacks, Acc);
+del_callback(Action = {M, F}, [#callback{action = {M, F, _A}} | Callbacks], Acc) ->
+    del_callback(Action, Callbacks, Acc);
+del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) ->
+    del_callback(Func, Callbacks, Acc);
+del_callback(Action, [Callback | Callbacks], Acc) ->
+    del_callback(Action, Callbacks, [Callback | Acc]).
+

+ 12 - 12
src/emqx_mod_presence.erl

@@ -19,24 +19,24 @@
 -include("emqx.hrl").
 
 -export([load/1, unload/1]).
+
 -export([on_client_connected/4, on_client_disconnected/3]).
 
+-define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]).
+
 load(Env) ->
     emqx_hooks:add('client.connected',    fun ?MODULE:on_client_connected/4, [Env]),
     emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
 
 on_client_connected(#{client_id := ClientId,
                       username  := Username,
-                      peername  := {IpAddr, _}}, ConnAck, ConnInfo, Env) ->
+                      peername  := {IpAddr, _}}, ConnAck, ConnAttrs, Env) ->
+    Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs),
     case emqx_json:safe_encode([{clientid, ClientId},
                                 {username, Username},
                                 {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))},
-                                {clean_start, proplists:get_value(clean_start, ConnInfo)},
-                                {proto_ver, proplists:get_value(proto_ver, ConnInfo)},
-                                {proto_name, proplists:get_value(proto_name, ConnInfo)},
-                                {keepalive, proplists:get_value(keepalive, ConnInfo)},
                                 {connack, ConnAck},
-                                {ts, os:system_time(second)}]) of
+                                {ts, os:system_time(second)} | Attrs]) of
         {ok, Payload} ->
             emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
         {error, Reason} ->
@@ -55,20 +55,20 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E
     end.
 
 unload(_Env) ->
-    emqx_hooks:delete('client.connected',    fun ?MODULE:on_client_connected/4),
-    emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3).
+    emqx_hooks:del('client.connected',    fun ?MODULE:on_client_connected/4),
+    emqx_hooks:del('client.disconnected', fun ?MODULE:on_client_disconnected/3).
 
 message(QoS, Topic, Payload) ->
-    Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)),
-    emqx_message:set_flag(sys, Msg).
+    emqx_message:set_flag(
+      sys, emqx_message:make(
+             ?MODULE, QoS, Topic, iolist_to_binary(Payload))).
 
 topic(connected, ClientId) ->
     emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"]));
 topic(disconnected, ClientId) ->
     emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])).
 
-qos(Env) ->
-    proplists:get_value(qos, Env, 0).
+qos(Env) -> proplists:get_value(qos, Env, 0).
 
 reason(Reason) when is_atom(Reason) -> Reason;
 reason({Error, _}) when is_atom(Error) -> Error;

+ 14 - 10
src/emqx_mod_rewrite.erl

@@ -21,11 +21,15 @@
 
 -export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]).
 
-load(Rules0) ->
-    Rules = compile(Rules0),
-    emqx_hooks:add('client.subscribe',  fun ?MODULE:rewrite_subscribe/3, [Rules]),
-    emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]),
-    emqx_hooks:add('message.publish',   fun ?MODULE:rewrite_publish/2, [Rules]).
+%%------------------------------------------------------------------------------
+%% Load/Unload
+%%------------------------------------------------------------------------------
+
+load(RawRules) ->
+    Rules = compile(RawRules),
+    emqx_hooks:add('client.subscribe',   fun ?MODULE:rewrite_subscribe/3, [Rules]),
+    emqx_hooks:add('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Rules]),
+    emqx_hooks:add('message.publish',    fun ?MODULE:rewrite_publish/2, [Rules]).
 
 rewrite_subscribe(_Credentials, TopicTable, Rules) ->
     {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
@@ -37,13 +41,13 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
     {ok, Message#message{topic = match_rule(Topic, Rules)}}.
 
 unload(_) ->
-    emqx_hooks:delete('client.subscribe',  fun ?MODULE:rewrite_subscribe/3),
-    emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3),
-    emqx_hooks:delete('message.publish',   fun ?MODULE:rewrite_publish/2).
+    emqx_hooks:del('client.subscribe',   fun ?MODULE:rewrite_subscribe/3),
+    emqx_hooks:del('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3),
+    emqx_hooks:del('message.publish',    fun ?MODULE:rewrite_publish/2).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 match_rule(Topic, []) ->
     Topic;

+ 1 - 57
test/emqx_broker_SUITE.erl

@@ -20,7 +20,6 @@
 -define(APP, emqx).
 
 -include_lib("eunit/include/eunit.hrl").
-
 -include_lib("common_test/include/ct.hrl").
 
 -include("emqx.hrl").
@@ -32,7 +31,6 @@ all() ->
      {group, broker},
      {group, metrics},
      {group, stats},
-     {group, hook},
      {group, alarms}].
 
 groups() ->
@@ -43,10 +41,8 @@ groups() ->
                            t_shared_subscribe,
                            'pubsub#', 'pubsub+']},
      {session, [sequence], [start_session]},
-     {broker, [sequence], [hook_unhook]},
      {metrics, [sequence], [inc_dec_metric]},
      {stats, [sequence], [set_get_stat]},
-     {hook, [sequence], [add_delete_hook, run_hooks]},
      {alarms, [sequence], [set_alarms]}
     ].
 
@@ -165,8 +161,6 @@ start_session(_) ->
 %%--------------------------------------------------------------------
 %% Broker Group
 %%--------------------------------------------------------------------
-hook_unhook(_) ->
-    ok.
 
 %%--------------------------------------------------------------------
 %% Metric Group
@@ -178,61 +172,11 @@ inc_dec_metric(_) ->
 %%--------------------------------------------------------------------
 %% Stats Group
 %%--------------------------------------------------------------------
+
 set_get_stat(_) ->
     emqx_stats:setstat('retained/max', 99),
     99 = emqx_stats:getstat('retained/max').
 
-%%--------------------------------------------------------------------
-%% Hook Test
-%%--------------------------------------------------------------------
-
-add_delete_hook(_) ->
-    ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
-    ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
-    {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
-    Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0},
-                 {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}],
-    Callbacks = emqx_hooks:lookup(test_hook),
-    ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1),
-    ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]),
-    ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}),
-    {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}),
-    [] = emqx_hooks:lookup(test_hook),
-
-    ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9),
-    ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8),
-    Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8},
-                  {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}],
-    Callbacks2 = emqx_hooks:lookup(emqx_hook),
-    ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1),
-    ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}),
-    [] = emqx_hooks:lookup(emqx_hook).
-
-run_hooks(_) ->
-    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
-    ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]),
-    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
-    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
-    {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
-    {ok, []} = emqx:run_hooks(unknown_hook, [], []),
-
-    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
-    ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]),
-    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
-    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
-    stop = emqx:run_hooks(foreach_hook, [arg]).
-
-hook_fun1([]) -> ok.
-hook_fun2([]) -> {ok, []}.
-
-hook_fun3(arg1, arg2, _Acc, init) -> ok.
-hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r2 | Acc]}.
-hook_fun5(arg1, arg2, Acc, init)  -> {stop, [r3 | Acc]}.
-
-hook_fun6(arg, initArg) -> ok.
-hook_fun7(arg, initArg) -> any.
-hook_fun8(arg, initArg) -> stop.
-
 set_alarms(_) ->
     AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
     emqx_alarm_mgr:set_alarm(AlarmTest),

+ 77 - 0
test/emqx_hooks_SUITE.erl

@@ -0,0 +1,77 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_hooks_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    [add_delete_hook, run_hooks].
+
+add_delete_hook(_) ->
+    {ok, _} = emqx_hooks:start_link(),
+    ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
+    ok = emqx:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
+    ?assertEqual({error, already_exists},
+                 emqx:hook(test_hook, fun ?MODULE:hook_fun2/1, [])),
+    Callbacks = [{callback, {fun ?MODULE:hook_fun1/1, []}, undefined, 0},
+                 {callback, {fun ?MODULE:hook_fun2/1, []}, undefined, 0}],
+    ?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)),
+    ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1),
+    ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1),
+    timer:sleep(1000),
+    ?assertEqual([], emqx_hooks:lookup(test_hook)),
+
+    ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8),
+    ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9),
+    Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9},
+                  {callback, {?MODULE, hook_fun2, []}, undefined, 8}],
+    ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)),
+    ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}),
+    ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}),
+    timer:sleep(1000),
+    ?assertEqual([], emqx_hooks:lookup(emqx_hook)),
+    ok = emqx_hooks:stop().
+
+run_hooks(_) ->
+    {ok, _} = emqx_hooks:start_link(),
+    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
+    ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}),
+    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
+    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
+    {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
+    {ok, []} = emqx:run_hooks(unknown_hook, [], []),
+
+    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
+    {error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
+    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
+    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
+    stop = emqx:run_hooks(foreach_hook, [arg]),
+    ok = emqx_hooks:stop().
+
+hook_fun1([]) -> ok.
+hook_fun2([]) -> {ok, []}.
+
+hook_fun3(arg1, arg2, _Acc, init) -> ok.
+hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r2 | Acc]}.
+hook_fun5(arg1, arg2, Acc, init)  -> {stop, [r3 | Acc]}.
+
+hook_fun6(arg, initArg) -> ok.
+hook_fun7(arg, initArg) -> any.
+hook_fun8(arg, initArg) -> stop.
+