Przeglądaj źródła

Rewrite the hooks module

Feng Lee 7 lat temu
rodzic
commit
4635921458
1 zmienionych plików z 113 dodań i 100 usunięć
  1. 113 100
      src/emqx_hooks.erl

+ 113 - 100
src/emqx_hooks.erl

@@ -16,142 +16,160 @@
 
 -behaviour(gen_server).
 
--export([start_link/0]).
+-export([start_link/0, stop/0]).
 
 %% Hooks API
--export([add/3, add/4, delete/2, run/2, run/3, lookup/1]).
+-export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]).
 
 %% gen_server Function Exports
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
 
--record(state, {}).
+-type(hookpoint() :: atom()).
+-type(action() :: function() | mfa()).
+-type(filter() :: function() | mfa()).
 
--type(hooktag() :: atom() | string() | binary()).
+-record(callback, {action   :: action(),
+                   filter   :: filter(),
+                   priority :: integer()}).
 
--export_type([hooktag/0]).
+-record(hook, {name :: hookpoint(), callbacks :: list(#callback{})}).
 
--record(callback, {tag            :: hooktag(),
-                   function       :: function(),
-                   init_args = [] :: list(any()),
-                   priority  = 0  :: integer()}).
-
--record(hook, {name :: atom(), callbacks = [] :: list(#callback{})}).
+-export_type([hookpoint/0, action/0, filter/0]).
 
 -define(TAB, ?MODULE).
+-define(SERVER, ?MODULE).
 
+-spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-%%--------------------------------------------------------------------
-%% Hooks API
-%%--------------------------------------------------------------------
-
--spec(add(atom(), function() | {hooktag(), function()}, list(any())) -> ok).
-add(HookPoint, Function, InitArgs) when is_function(Function) ->
-    add(HookPoint, {undefined, Function}, InitArgs, 0);
-
-add(HookPoint, {Tag, Function}, InitArgs) when is_function(Function) ->
-    add(HookPoint, {Tag, Function}, InitArgs, 0).
-
--spec(add(atom(), function() | {hooktag(), function()}, list(any()), integer()) -> ok).
-add(HookPoint, Function, InitArgs, Priority) when is_function(Function) ->
-    add(HookPoint, {undefined, Function}, InitArgs, Priority);
-add(HookPoint, {Tag, Function}, InitArgs, Priority) when is_function(Function) ->
-    gen_server:call(?MODULE, {add, HookPoint, {Tag, Function}, InitArgs, Priority}).
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{hibernate_after, 60000}]).
 
--spec(delete(atom(), function() | {hooktag(), function()}) -> ok).
-delete(HookPoint, Function) when is_function(Function) ->
-    delete(HookPoint, {undefined, Function});
-delete(HookPoint, {Tag, Function}) when is_function(Function) ->
-    gen_server:call(?MODULE, {delete, HookPoint, {Tag, Function}}).
+-spec(stop() -> ok).
+stop() ->
+    gen_server:stop(?SERVER, normal, infinity).
 
-%% @doc Run hooks without Acc.
+%%------------------------------------------------------------------------------
+%% Hooks API
+%%------------------------------------------------------------------------------
+
+%% @doc Register a callback
+-spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)).
+add(HookPoint, Callback) when is_record(Callback, callback) ->
+    gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
+add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
+    add(HookPoint, #callback{action = Action, priority = 0}).
+
+-spec(add(hookpoint(), action(), filter() | integer() | list())
+      -> emqx_types:ok_or_error(already_exists)).
+add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) ->
+    add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0});
+add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
+    add(HookPoint, #callback{action = Action, filter = Filter, priority = 0});
+add(HookPoint, Action, Priority) when is_integer(Priority) ->
+    add(HookPoint, #callback{action = Action, priority = Priority}).
+
+-spec(add(hookpoint(), action(), filter(), integer())
+      -> emqx_types:ok_or_error(already_exists)).
+add(HookPoint, Action, Filter, Priority) ->
+    add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
+
+%% @doc Unregister a callback.
+-spec(del(hookpoint(), action()) -> ok).
+del(HookPoint, Action) ->
+    gen_server:cast(?SERVER, {del, HookPoint, Action}).
+
+%% @doc Run hooks.
 -spec(run(atom(), list(Arg :: any())) -> ok | stop).
 run(HookPoint, Args) ->
     run_(lookup(HookPoint), Args).
 
+%% @doc Run hooks with Accumulator.
 -spec(run(atom(), list(Arg :: any()), any()) -> any()).
 run(HookPoint, Args, Acc) ->
     run_(lookup(HookPoint), Args, Acc).
 
 %% @private
-run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args) ->
-    case apply(Fun, lists:append([Args, InitArgs])) of
+run_([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
+    case filtered(Filter, Args) orelse execute(Action, Args) of
+        true -> run_(Callbacks, Args);
         ok   -> run_(Callbacks, Args);
         stop -> stop;
         _Any -> run_(Callbacks, Args)
     end;
-
 run_([], _Args) ->
     ok.
 
 %% @private
-run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) ->
-    case apply(Fun, lists:append([Args, [Acc], InitArgs])) of
+run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
+    Args1 = Args ++ [Acc],
+    case filtered(Filter, Args1) orelse execute(Action, Args1) of
+        true           -> run_(Callbacks, Args, Acc);
         ok             -> run_(Callbacks, Args, Acc);
         {ok, NewAcc}   -> run_(Callbacks, Args, NewAcc);
         stop           -> {stop, Acc};
         {stop, NewAcc} -> {stop, NewAcc};
         _Any           -> run_(Callbacks, Args, Acc)
     end;
-
 run_([], _Args, Acc) ->
     {ok, Acc}.
 
--spec(lookup(atom()) -> [#callback{}]).
+filtered(undefined, _Args) ->
+    false;
+filtered(Filter, Args) ->
+    execute(Filter, Args).
+
+execute(Action, Args) when is_function(Action) ->
+    erlang:apply(Action, Args);
+execute({Fun, InitArgs}, Args) when is_function(Fun) ->
+    erlang:apply(Fun, Args ++ InitArgs);
+execute({M, F, A}, Args) ->
+    erlang:apply(M, F, Args ++ A).
+
+%% @doc Lookup callbacks.
+-spec(lookup(hookpoint()) -> [#callback{}]).
 lookup(HookPoint) ->
     case ets:lookup(?TAB, HookPoint) of
-        [#hook{callbacks = Callbacks}] -> Callbacks;
+        [#hook{callbacks = Callbacks}] ->
+            Callbacks;
         [] -> []
     end.
 
-%%--------------------------------------------------------------------
+%%-----------------------------------------------------------------------------
 %% gen_server callbacks
-%%--------------------------------------------------------------------
+%%-----------------------------------------------------------------------------
 
 init([]) ->
-    _ = emqx_tables:new(?TAB, [set, protected, {keypos, #hook.name},
-                               {read_concurrency, true}]),
-    {ok, #state{}}.
-
-handle_call({add, HookPoint, {Tag, Function}, InitArgs, Priority}, _From, State) ->
-    Callback = #callback{tag = Tag, function = Function,
-                         init_args = InitArgs, priority = Priority},
-    {reply,
-     case ets:lookup(?TAB, HookPoint) of
-         [#hook{callbacks = Callbacks}] ->
-             case contain_(Tag, Function, Callbacks) of
-                 false ->
-                     insert_hook_(HookPoint, add_callback_(Callback, Callbacks));
-                 true  ->
-                     {error, already_hooked}
-             end;
-         [] ->
-             insert_hook_(HookPoint, [Callback])
-     end, State};
-
-handle_call({delete, HookPoint, {Tag, Function}}, _From, State) ->
-    {reply,
-     case ets:lookup(?TAB, HookPoint) of
-         [#hook{callbacks = Callbacks}] ->
-             case contain_(Tag, Function, Callbacks) of
-                 true  ->
-                     insert_hook_(HookPoint, del_callback_(Tag, Function, Callbacks));
-                 false ->
-                     {error, not_found}
-             end;
-         [] ->
-             {error, not_found}
-     end, State};
+    _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
+    {ok, #{}}.
+
+handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
+    Reply = case lists:keyfind(Action, 2, Callbacks = lookup(HookPoint)) of
+                true ->
+                    {error, already_exists};
+                false ->
+                    insert_hook(HookPoint, add_callback(Callback, Callbacks))
+            end,
+    {reply, Reply, State};
+
+handle_call({del, HookPoint, Action}, _From, State) ->
+    case lists:keydelete(Action, 2, lookup(HookPoint)) of
+        [] ->
+            ets:delete(?TAB, HookPoint);
+        Callbacks ->
+            insert_hook(HookPoint, Callbacks)
+    end,
+    {reply, ok, State};
 
 handle_call(Req, _From, State) ->
-    {reply, {error, {unexpected_request, Req}}, State}.
+    emqx_logger:error("[Hooks] unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
 
-handle_cast(_Msg, State) ->
+handle_cast(Msg, State) ->
+    emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
-handle_info(_Info, State) ->
+handle_info(Info, State) ->
+    emqx_logger:error("[Hooks] unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
@@ -160,26 +178,21 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%--------------------------------------------------------------------
+%%-----------------------------------------------------------------------------
 %% Internal functions
-%%--------------------------------------------------------------------
+%%-----------------------------------------------------------------------------
 
-insert_hook_(HookPoint, Callbacks) ->
+insert_hook(HookPoint, Callbacks) ->
     ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok.
 
-add_callback_(Callback, Callbacks) ->
-    lists:keymerge(#callback.priority, Callbacks, [Callback]).
-
-del_callback_(Tag, Function, Callbacks) ->
-    lists:filter(
-      fun(#callback{tag = Tag1, function = Func1}) ->
-        not ((Tag =:= Tag1) andalso (Function =:= Func1))
-      end, Callbacks).
+add_callback(C, Callbacks) ->
+    add_callback(C, Callbacks, []).
 
-contain_(_Tag, _Function, []) ->
-    false;
-contain_(Tag, Function, [#callback{tag = Tag, function = Function}|_Callbacks]) ->
-    true;
-contain_(Tag, Function, [_Callback | Callbacks]) ->
-    contain_(Tag, Function, Callbacks).
+add_callback(C, [], Acc) ->
+    lists:reverse([C|Acc]);
+add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More], Acc)
+    when P1 =< P2 ->
+    add_callback(C1, More, [C2|Acc]);
+add_callback(C1, More, Acc) ->
+    lists:append(lists:reverse(Acc), [C1 | More]).