Просмотр исходного кода

refactor(exhook): move all manager code into mngr module

JianBo He 4 лет назад
Родитель
Сommit
f5acf5fd0b

+ 25 - 64
apps/emqx_exhook/src/emqx_exhook.erl

@@ -20,10 +20,8 @@
 -include_lib("emqx/include/logger.hrl").
 
 
-%% Mgmt APIs
--export([ enable/2
+-export([ enable/1
         , disable/1
-        , disable_all/0
         , list/0
         ]).
 
@@ -35,64 +33,54 @@
 %% Mgmt APIs
 %%--------------------------------------------------------------------
 
-%% XXX: Only return the running servers
--spec list() -> [emqx_exhook_server:server()].
-list() ->
-    [server(Name) || Name <- running()].
-
--spec enable(binary(), map()) -> ok | {error, term()}.
-enable(Name, Options) ->
-    case lists:member(Name, running()) of
-        true ->
-            {error, already_started};
-        _ ->
-            case emqx_exhook_server:load(Name, Options) of
-                {ok, ServiceState} ->
-                    save(Name, ServiceState);
-                {error, Reason} ->
-                    ?LOG(error, "Load server ~p failed: ~p", [Name, Reason]),
-                    {error, Reason}
-            end
-    end.
+-spec enable(atom()|string()) -> ok | {error, term()}.
+enable(Name) ->
+    with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end).
 
 -spec disable(binary()) -> ok | {error, term()}.
 disable(Name) ->
-    case server(Name) of
-        undefined -> {error, not_running};
-        Service ->
-            ok = emqx_exhook_server:unload(Service),
-            unsave(Name)
-    end.
+    with_mngr(fun(Pid) -> emqx_exhook_mngr:disable(Pid, Name) end).
+
+-spec list() -> [atom() | string()].
+list() ->
+    with_mngr(fun(Pid) -> emqx_exhook_mngr:list(Pid) end).
 
--spec disable_all() -> ok.
-disable_all() ->
-    lists:foreach(fun disable/1, running()).
+with_mngr(Fun) ->
+    case lists:keyfind(emqx_exhook_mngr, 1,
+                       supervisor:which_children(emqx_exhook_sup)) of
+        {_, Pid, _, _} ->
+            Fun(Pid);
+        _ ->
+            {error, no_manager_svr}
+    end.
 
-%%----------------------------------------------------------
+%%--------------------------------------------------------------------
 %% Dispatch APIs
-%%----------------------------------------------------------
+%%--------------------------------------------------------------------
 
 -spec cast(atom(), map()) -> ok.
 cast(Hookpoint, Req) ->
-    cast(Hookpoint, Req, running()).
+    cast(Hookpoint, Req, emqx_exhook_mngr:running()).
 
 cast(_, _, []) ->
     ok;
 cast(Hookpoint, Req, [ServiceName|More]) ->
     %% XXX: Need a real asynchronous running
-    _ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)),
+    _ = emqx_exhook_server:call(Hookpoint, Req,
+                                emqx_exhook_mngr:server(ServiceName)),
     cast(Hookpoint, Req, More).
 
 -spec call_fold(atom(), term(), function())
   -> {ok, term()}
    | {stop, term()}.
 call_fold(Hookpoint, Req, AccFun) ->
-    call_fold(Hookpoint, Req, AccFun, running()).
+    call_fold(Hookpoint, Req, AccFun, emqx_exhook_mngr:running()).
 
 call_fold(_, Req, _, []) ->
     {ok, Req};
 call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
-    case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of
+    case emqx_exhook_server:call(Hookpoint, Req,
+                                 emqx_exhook_mngr:server(ServiceName)) of
         {ok, Resp} ->
             case AccFun(Req, Resp) of
                 {stop, NReq} -> {stop, NReq};
@@ -102,30 +90,3 @@ call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
         _ ->
             call_fold(Hookpoint, Req, AccFun, More)
     end.
-
-%%----------------------------------------------------------
-%% Storage
-
-save(Name, ServiceState) ->
-    Saved = persistent_term:get(?APP, []),
-    persistent_term:put(?APP, lists:reverse([Name | Saved])),
-    persistent_term:put({?APP, Name}, ServiceState).
-
-unsave(Name) ->
-    case persistent_term:get(?APP, []) of
-        [] ->
-            persistent_term:erase(?APP);
-        Saved ->
-            persistent_term:put(?APP, lists:delete(Name, Saved))
-    end,
-    persistent_term:erase({?APP, Name}),
-    ok.
-
-running() ->
-    persistent_term:get(?APP, []).
-
-server(Name) ->
-    case catch persistent_term:get({?APP, Name}) of
-        {'EXIT', {badarg,_}} -> undefined;
-        Service -> Service
-    end.

+ 15 - 13
apps/emqx_exhook/src/emqx_exhook_cli.erl

@@ -22,25 +22,18 @@
 
 cli(["server", "list"]) ->
     if_enabled(fun() ->
-        Services = emqx_exhook:list(),
-        [emqx_ctl:print("HookServer(~s)~n",
-                        [emqx_exhook_server:format(Service)]) || Service <- Services]
+        ServerNames = emqx_exhook:list(),
+        [emqx_ctl:print("Server(~s)~n", [format(Name)]) || Name <- ServerNames]
     end);
 
-cli(["server", "enable", Name0]) ->
+cli(["server", "enable", Name]) ->
     if_enabled(fun() ->
-        Name = iolist_to_binary(Name0),
-        case find_server_options(Name) of
-            undefined ->
-                emqx_ctl:print("not_found~n");
-            Opts ->
-                print(emqx_exhook:enable(Name, Opts))
-        end
+        print(emqx_exhook:enable(list_to_existing_atom(Name)))
     end);
 
 cli(["server", "disable", Name]) ->
     if_enabled(fun() ->
-        print(emqx_exhook:disable(iolist_to_binary(Name)))
+        print(emqx_exhook:disable(list_to_existing_atom(Name)))
     end);
 
 cli(["server", "stats"]) ->
@@ -73,7 +66,8 @@ find_server_options(Name) ->
 
 if_enabled(Fun) ->
     case lists:keymember(?APP, 1, application:which_applications()) of
-        true -> Fun();
+        true ->
+            Fun();
         _ -> hint()
     end.
 
@@ -87,3 +81,11 @@ stats() ->
             _ -> Acc
         end
     end, [], emqx_metrics:all())).
+
+format(Name) ->
+    case emqx_exhook_mngr:server(Name) of
+        undefined ->
+            io_lib:format("name=~s, hooks=#{}, active=false", [Name]);
+        Server ->
+            emqx_exhook_server:format(Server)
+    end.

+ 165 - 40
apps/emqx_exhook/src/emqx_exhook_mngr.erl

@@ -23,7 +23,18 @@
 -include_lib("emqx/include/logger.hrl").
 
 %% APIs
--export([start_link/2]).
+-export([start_link/3]).
+
+%% Mgmt API
+-export([ enable/2
+        , disable/2
+        , list/1
+        ]).
+
+%% Helper funcs
+-export([ running/0
+        , server/1
+        ]).
 
 %% gen_server callbacks
 -export([ init/1
@@ -36,13 +47,15 @@
 
 -record(state, {
           %% Running servers
-          running :: map(),
+          running :: map(),         %% XXX: server order?
           %% Wait to reload servers
           waiting :: map(),
           %% Marked stopped servers
           stopped :: map(),
           %% Auto reconnect timer interval
           auto_reconnect :: false | non_neg_integer(),
+          %% Request options
+          request_options :: grpc_client:options(),
           %% Timer references
           trefs :: map()
          }).
@@ -54,24 +67,40 @@
                           | {port, inet:port_number()}
                           ].
 
+-define(DEFAULT_TIMEOUT, 60000).
+
 -define(CNTER, emqx_exhook_counter).
 
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
--spec start_link(servers(), false | non_neg_integer())
+-spec start_link(servers(), false | non_neg_integer(), grpc_client:options())
     ->ignore
      | {ok, pid()}
      | {error, any()}.
-start_link(Servers, AutoReconnect) ->
-    gen_server:start_link(?MODULE, [Servers, AutoReconnect], []).
+start_link(Servers, AutoReconnect, ReqOpts) ->
+    gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
+
+-spec enable(pid(), atom()|string()) -> ok | {error, term()}.
+enable(Pid, Name) ->
+    call(Pid, {load, Name}).
+
+-spec disable(pid(), atom()|string()) -> ok | {error, term()}.
+disable(Pid, Name) ->
+    call(Pid, {unload, Name}).
+
+list(Pid) ->
+    call(Pid, list).
+
+call(Pid, Req) ->
+    gen_server:call(Pid, Req, ?DEFAULT_TIMEOUT).
 
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
-init([Servers, AutoReconnect]) ->
+init([Servers, AutoReconnect, ReqOpts]) ->
     %% XXX: Due to the ExHook Module in the enterprise,
     %% this process may start multiple times and they will share this table
     try
@@ -82,29 +111,53 @@ init([Servers, AutoReconnect]) ->
     end,
 
     %% Load the hook servers
-    {Waiting, Running} = load_all_servers(Servers),
+    {Waiting, Running} = load_all_servers(Servers, ReqOpts),
     {ok, ensure_reload_timer(
            #state{waiting = Waiting,
                   running = Running,
                   stopped = #{},
+                  request_options = ReqOpts,
                   auto_reconnect = AutoReconnect,
                   trefs = #{}
                  }
           )}.
 
 %% @private
-load_all_servers(Servers) ->
-    load_all_servers(Servers, #{}, #{}).
-load_all_servers([], Waiting, Running) ->
+load_all_servers(Servers, ReqOpts) ->
+    load_all_servers(Servers, ReqOpts, #{}, #{}).
+load_all_servers([], _Request, Waiting, Running) ->
     {Waiting, Running};
-load_all_servers([{Name, Options}|More], Waiting, Running) ->
-    {NWaiting, NRunning} = case emqx_exhook:enable(Name, Options) of
-        ok ->
-            {Waiting, Running#{Name => Options}};
-        {error, _} ->
-            {Waiting#{Name => Options}, Running}
-    end,
-    load_all_servers(More, NWaiting, NRunning).
+load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) ->
+    {NWaiting, NRunning} =
+        case emqx_exhook_server:load(Name, Options, ReqOpts) of
+            {ok, ServerState} ->
+                save(Name, ServerState),
+                {Waiting, Running#{Name => Options}};
+            {error, _} ->
+                {Waiting#{Name => Options}, Running}
+        end,
+    load_all_servers(More, ReqOpts, NWaiting, NRunning).
+
+handle_call({load, Name}, _From, State) ->
+    {Result, NState} = do_load_server(Name, State),
+    {reply, Result, NState};
+
+handle_call({unload, Name}, _From, State) ->
+    case do_unload_server(Name, State) of
+        {error, Reason} ->
+            {reply, {error, Reason}, State};
+        {ok, NState} ->
+            {reply, ok, NState}
+    end;
+
+handle_call(list, _From, State = #state{
+                                    running = Running,
+                                    waiting = Waiting,
+                                    stopped = Stopped}) ->
+    ServerNames = maps:keys(Running)
+                    ++ maps:keys(Waiting)
+                    ++ maps:keys(Stopped),
+    {reply, ServerNames, State};
 
 handle_call(_Request, _From, State) ->
     Reply = ok,
@@ -113,33 +166,27 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({timeout, _Ref, {reload, Name}},
-            State0 = #state{waiting = Waiting,
-                            running = Running,
-                            trefs = TRefs}) ->
-    State = State0#state{trefs = maps:remove(Name, TRefs)},
-    case maps:get(Name, Waiting, undefined) of
-        undefined ->
-            {noreply, State};
-        Options ->
-            case emqx_exhook:enable(Name, Options) of
-                ok ->
-                    ?LOG(warning, "Reconnect to exhook callback server "
-                                  "\"~s\" successfully!", [Name]),
-                    {noreply, State#state{
-                                running = maps:put(Name, Options, Running),
-                                waiting = maps:remove(Name, Waiting)}
-                    };
-                {error, _} ->
-                    {noreply, ensure_reload_timer(State)}
-            end
+handle_info({timeout, _Ref, {reload, Name}}, State) ->
+    {Result, NState} = do_load_server(Name, State),
+    case Result of
+        ok ->
+            {noreply, NState};
+        {error, not_found} ->
+            {noreply, NState};
+        {error, Reason} ->
+            ?LOG(warning, "Failed to reload exhook callback server \"~s\", "
+                          "Reason: ~0p", [Name, Reason]),
+            {noreply, ensure_reload_timer(NState)}
     end;
 
 handle_info(_Info, State) ->
     {noreply, State}.
 
-terminate(_Reason, _State) ->
-    _ = emqx_exhook:disable_all(),
+terminate(_Reason, State = #state{stopped = Stopped}) ->
+    _ = maps:fold(fun(Name, _, AccIn) ->
+            {ok, NAccIn} = do_unload_server(Name, AccIn),
+            NAccIn
+        end, State, Stopped),
     _ = unload_exhooks(),
     ok.
 
@@ -154,6 +201,49 @@ unload_exhooks() ->
     [emqx:unhook(Name, {M, F}) ||
      {Name, {M, F, _A}} <- ?ENABLED_HOOKS].
 
+do_load_server(Name, State0 = #state{
+                                 waiting = Waiting,
+                                 running = Running,
+                                 stopped = Stopped,
+                                 request_options = ReqOpts}) ->
+    State = clean_reload_timer(Name, State0),
+    case maps:get(Name, Running, undefined) of
+        undefined ->
+            case maps:get(Name, Stopped,
+                          maps:get(Name, Waiting, undefined)) of
+                undefined ->
+                    {{error, not_found}, State};
+                Options ->
+                    case emqx_exhook_server:load(Name, Options, ReqOpts) of
+                        {ok, ServerState} ->
+                            save(Name, ServerState),
+                            ?LOG(info, "Load exhook callback server "
+                                          "\"~s\" successfully!", [Name]),
+                            {ok, State#state{
+                                   running = maps:put(Name, Options, Running),
+                                   waiting = maps:remove(Name, Waiting),
+                                   stopped = maps:remove(Name, Stopped)
+                                  }
+                            };
+                        {error, Reason} ->
+                            {{error, Reason}, State}
+                    end
+            end;
+        _ ->
+            {{error, already_started}, State}
+    end.
+
+do_unload_server(Name, State = #state{running = Running, stopped = Stopped}) ->
+    case maps:take(Name, Running) of
+        error -> {error, not_running};
+        {Options, NRunning} ->
+            ok = emqx_exhook_server:unload(server(Name)),
+            ok = unsave(Name),
+            {ok, State#state{running = NRunning,
+                             stopped = maps:put(Name, Options, Stopped)
+                            }}
+    end.
+
 ensure_reload_timer(State = #state{auto_reconnect = false}) ->
     State;
 ensure_reload_timer(State = #state{waiting = Waiting,
@@ -169,3 +259,38 @@ ensure_reload_timer(State = #state{waiting = Waiting,
         end
     end, TRefs, Waiting),
     State#state{trefs = NRefs}.
+
+clean_reload_timer(Name, State = #state{trefs = TRefs}) ->
+    case maps:take(Name, TRefs) of
+        error -> State;
+        {TRef, NTRefs} ->
+            _ = erlang:cancel_timer(TRef),
+            State#state{trefs = NTRefs}
+    end.
+
+%%--------------------------------------------------------------------
+%% Server state persistent
+
+save(Name, ServerState) ->
+    Saved = persistent_term:get(?APP, []),
+    persistent_term:put(?APP, lists:reverse([Name | Saved])),
+    persistent_term:put({?APP, Name}, ServerState).
+
+unsave(Name) ->
+    case persistent_term:get(?APP, []) of
+        [] ->
+            persistent_term:erase(?APP);
+        Saved ->
+            persistent_term:put(?APP, lists:delete(Name, Saved))
+    end,
+    persistent_term:erase({?APP, Name}),
+    ok.
+
+running() ->
+    persistent_term:get(?APP, []).
+
+server(Name) ->
+    case catch persistent_term:get({?APP, Name}) of
+        {'EXIT', {badarg,_}} -> undefined;
+        Service -> Service
+    end.

+ 20 - 19
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -24,7 +24,7 @@
 -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
 
 %% Load/Unload
--export([ load/2
+-export([ load/3
         , unload/1
         ]).
 
@@ -39,8 +39,8 @@
 -record(server, {
           %% Server name (equal to grpc client channel name)
           name :: server_name(),
-          %% The server started options
-          options :: options(),
+          %% The function options
+          options :: map(),
           %% gRPC channel pid
           channel :: pid(),
           %% Registered hook names and options
@@ -84,8 +84,8 @@
 %% Load/Unload APIs
 %%--------------------------------------------------------------------
 
--spec load(binary(), options()) -> {ok, server()} | {error, term()} .
-load(Name0, Opts0) ->
+-spec load(atom(), options(), map()) -> {ok, server()} | {error, term()} .
+load(Name0, Opts0, ReqOpts) ->
     Name = to_list(Name0),
     {SvrAddr, ClientOpts} = channel_opts(Opts0),
     case emqx_exhook_sup:start_grpc_client_channel(
@@ -93,7 +93,7 @@ load(Name0, Opts0) ->
            SvrAddr,
            ClientOpts) of
         {ok, _ChannPoolPid} ->
-            case do_init(Name) of
+            case do_init(Name, ReqOpts) of
                 {ok, HookSpecs} ->
                     %% Reigster metrics
                     Prefix = lists:flatten(
@@ -102,7 +102,7 @@ load(Name0, Opts0) ->
                     %% Ensure hooks
                     ensure_hooks(HookSpecs),
                     {ok, #server{name = Name,
-                                 options = Opts0,
+                                 options = ReqOpts,
                                  channel = _ChannPoolPid,
                                  hookspec = HookSpecs,
                                  prefix = Prefix }};
@@ -149,22 +149,22 @@ filter(Ls) ->
     [ E || E <- Ls, E /= undefined].
 
 -spec unload(server()) -> ok.
-unload(#server{name = Name, hookspec = HookSpecs}) ->
-    _ = do_deinit(Name),
+unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
+    _ = do_deinit(Name, ReqOpts),
     _ = may_unload_hooks(HookSpecs),
     _ = emqx_exhook_sup:stop_grpc_client_channel(Name),
     ok.
 
-do_deinit(Name) ->
-    _ = do_call(Name, 'on_provider_unloaded', #{}),
+do_deinit(Name, ReqOpts) ->
+    _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts),
     ok.
 
-do_init(ChannName) ->
+do_init(ChannName, ReqOpts) ->
     %% BrokerInfo defined at: exhook.protos
     BrokerInfo = maps:with([version, sysdescr, uptime, datetime],
                         maps:from_list(emqx_sys:info())),
     Req = #{broker => BrokerInfo},
-    case do_call(ChannName, 'on_provider_loaded', Req) of
+    case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of
         {ok, InitialResp} ->
             try
                 {ok, resovle_hookspec(maps:get(hooks, InitialResp, []))}
@@ -230,7 +230,7 @@ may_unload_hooks(HookSpecs) ->
     end, maps:keys(HookSpecs)).
 
 format(#server{name = Name, hookspec = Hooks}) ->
-    io_lib:format("name=~p, hooks=~0p", [Name, Hooks]).
+    io_lib:format("name=~s, hooks=~0p, active=true", [Name, Hooks]).
 
 %%--------------------------------------------------------------------
 %% APIs
@@ -243,7 +243,8 @@ name(#server{name = Name}) ->
   -> ignore
    | {ok, Resp :: term()}
    | {error, term()}.
-call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix}) ->
+call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
+                             hookspec = Hooks, prefix = Prefix}) ->
     GrpcFunc = hk2func(Hookpoint),
     case maps:get(Hookpoint, Hooks, undefined) of
         undefined -> ignore;
@@ -258,7 +259,7 @@ call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix
                 false -> ignore;
                 _ ->
                     inc_metrics(Prefix, Hookpoint),
-                    do_call(ChannName, GrpcFunc, Req)
+                    do_call(ChannName, GrpcFunc, Req, ReqOpts)
             end
     end.
 
@@ -276,9 +277,9 @@ match_topic_filter(_, []) ->
 match_topic_filter(TopicName, TopicFilter) ->
     lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
 
--spec do_call(string(), atom(), map()) -> {ok, map()} | {error, term()}.
-do_call(ChannName, Fun, Req) ->
-    Options = #{channel => ChannName},
+-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
+do_call(ChannName, Fun, Req, ReqOpts) ->
+    Options = ReqOpts#{channel => ChannName},
     ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
     case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
         {ok, Resp, _Metadata} ->

+ 5 - 1
apps/emqx_exhook/src/emqx_exhook_sup.erl

@@ -41,7 +41,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Mngr = ?CHILD(emqx_exhook_mngr, worker, [servers(), auto_reconnect()]),
+    Mngr = ?CHILD(emqx_exhook_mngr, worker,
+                  [servers(), auto_reconnect(), request_options()]),
     {ok, {{one_for_one, 10, 100}, [Mngr]}}.
 
 servers() ->
@@ -50,6 +51,9 @@ servers() ->
 auto_reconnect() ->
     application:get_env(emqx_exhook, auto_reconnect, 60000).
 
+request_options() ->
+    #{timeout => application:get_env(emqx_exhook, request_timeout, 5000)}.
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------