Преглед изворни кода

feat(resource): keep restarting disconnected resources after emqx bootup (#4125)

* feat(resource): keep restarting disconnected resources after emqx bootup

* feat(resource): improve the restart monitor

* fix(test): improve emqx_rule_monitor_SUITE

* fix(resource): refreshing a resource should only be applied on the local node

* fix(test): improve the test case for restart_resource

* fix(resource): rename some functions
Shawn пре 5 година
родитељ
комит
1e047e84c2

+ 7 - 16
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -65,6 +65,8 @@
              , action_instance_params/0
              , action_instance_params/0
              ]).
              ]).
 
 
+-define(T_RETRY, 60000).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Load resource/action providers from all available applications
 %% Load resource/action providers from all available applications
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -217,7 +219,7 @@ delete_rule(RuleId) ->
             catch
             catch
                 Error:Reason:ST ->
                 Error:Reason:ST ->
                     ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]),
                     ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]),
-                    refresh_actions(Actions, fun(_) -> true end)
+                    refresh_actions(Actions)
             end;
             end;
         not_found ->
         not_found ->
             ok
             ok
@@ -388,16 +390,8 @@ refresh_resource(Type) when is_atom(Type) ->
     lists:foreach(fun refresh_resource/1,
     lists:foreach(fun refresh_resource/1,
                   emqx_rule_registry:get_resources_by_type(Type));
                   emqx_rule_registry:get_resources_by_type(Type));
 
 
-refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
-    {ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
-        try cluster_call(init_resource, [M, F, ResId, Config])
-        catch Error:Reason:ST ->
-            logger:critical(
-                "Can not re-stablish resource ~p: ~0p. The resource is disconnected."
-                "Fix the issue and establish it manually.\n"
-                "Stacktrace: ~0p",
-                [ResId, {Error, Reason}, ST])
-        end.
+refresh_resource(#resource{id = ResId}) ->
+    emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY).
 
 
 -spec(refresh_rules() -> ok).
 -spec(refresh_rules() -> ok).
 refresh_rules() ->
 refresh_rules() ->
@@ -414,7 +408,7 @@ refresh_rules() ->
 
 
 refresh_rule(#rule{id = RuleId, actions = Actions}) ->
 refresh_rule(#rule{id = RuleId, actions = Actions}) ->
     ok = emqx_rule_metrics:create_rule_metrics(RuleId),
     ok = emqx_rule_metrics:create_rule_metrics(RuleId),
-    refresh_actions(Actions, fun(_) -> true end).
+    refresh_actions(Actions).
 
 
 -spec(refresh_resource_status() -> ok).
 -spec(refresh_resource_status() -> ok).
 refresh_resource_status() ->
 refresh_resource_status() ->
@@ -529,10 +523,7 @@ cluster_call(Func, Args) ->
    end.
    end.
 
 
 init_resource(Module, OnCreate, ResId, Config) ->
 init_resource(Module, OnCreate, ResId, Config) ->
-    Params = ?RAISE(
-        begin
-            Module:OnCreate(ResId, Config)
-        end,
+    Params = ?RAISE(Module:OnCreate(ResId, Config),
         {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
         {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
     ResParams = #resource_params{id = ResId,
     ResParams = #resource_params{id = ResId,
                                  params = Params,
                                  params = Params,

+ 8 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl

@@ -27,7 +27,7 @@
 -export([init/1]).
 -export([init/1]).
 
 
 start_link() ->
 start_link() ->
-	supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
 init([]) ->
 init([]) ->
     Opts = [public, named_table, set, {read_concurrency, true}],
     Opts = [public, named_table, set, {read_concurrency, true}],
@@ -45,7 +45,13 @@ init([]) ->
                 shutdown => 5000,
                 shutdown => 5000,
                 type => worker,
                 type => worker,
                 modules => [emqx_rule_metrics]},
                 modules => [emqx_rule_metrics]},
-    {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
+    Monitor = #{id => emqx_rule_monitor,
+                start => {emqx_rule_monitor, start_link, []},
+                restart => permanent,
+                shutdown => 5000,
+                type => worker,
+                modules => [emqx_rule_monitor]},
+    {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}.
 
 
 start_locker() ->
 start_locker() ->
     Locker = #{id => emqx_rule_locker,
     Locker = #{id => emqx_rule_locker,

+ 117 - 0
apps/emqx_rule_engine/src/emqx_rule_monitor.erl

@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_monitor).
+
+-behavior(gen_server).
+
+-include("rule_engine.hrl").
+-include_lib("emqx/include/logger.hrl").
+-logger_header("[Rule Monitor]").
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+-export([ start_link/0
+        , stop/0
+        , ensure_resource_retrier/2
+        , retry_loop/3
+        ]).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+stop() ->
+    gen_server:stop(?MODULE).
+
+init([]) ->
+    _ = erlang:process_flag(trap_exit, true),
+    {ok, #{retryers => #{}}}.
+
+ensure_resource_retrier(ResId, Interval) ->
+    gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
+
+handle_call(_Msg, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
+    Objects = maps:get(Tag, State, #{}),
+    NewState = case maps:find(Obj, Objects) of
+        error ->
+            update_object(Tag, Obj,
+                create_restart_handler(Tag, Obj, Interval), State);
+        {ok, _Pid} ->
+            State
+    end,
+    {noreply, NewState};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
+    case maps:take(Pid, Retryers) of
+        {{Tag, Obj}, Retryers2} ->
+            Objects = maps:get(Tag, State, #{}),
+            {noreply, State#{Tag => maps:remove(Obj, Objects),
+                             retryers => Retryers2}};
+        error ->
+            ?LOG(error, "got unexpected proc down: ~p ~p", [Pid, Reason]),
+            {noreply, State}
+    end;
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+update_object(Tag, Obj, Retryer, State) ->
+    Objects = maps:get(Tag, State, #{}),
+    Retryers = maps:get(retryers, State, #{}),
+    State#{
+        Tag => Objects#{Obj => Retryer},
+        retryers => Retryers#{Retryer => {Tag, Obj}}
+    }.
+
+create_restart_handler(Tag, Obj, Interval) ->
+    ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]),
+    %% spawn a dedicated process to handle the restarting asynchronously
+    spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
+
+retry_loop(resource, ResId, Interval) ->
+    case emqx_rule_registry:find_resource(ResId) of
+        {ok, #resource{type = Type, config = Config}} ->
+            try
+                {ok, #resource_type{on_create = {M, F}}} =
+                    emqx_rule_registry:find_resource_type(Type),
+                emqx_rule_engine:init_resource(M, F, ResId, Config)
+            catch
+                Err:Reason:ST ->
+                    ?LOG(warning, "init_resource failed: ~p, ~0p",
+                        [{Err, Reason}, ST]),
+                    timer:sleep(Interval),
+                    ?MODULE:retry_loop(resource, ResId, Interval)
+            end;
+        not_found ->
+            ok
+    end.

+ 107 - 0
apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl

@@ -0,0 +1,107 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_monitor_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx_rule_engine/include/rule_engine.hrl").
+-include_lib("emqx/include/emqx.hrl").
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    [ {group, resource}
+    ].
+
+suite() ->
+    [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
+
+groups() ->
+    [{resource, [sequence],
+      [ t_restart_resource
+      ]}
+    ].
+
+init_per_suite(Config) ->
+    ok = ekka_mnesia:start(),
+    ok = emqx_rule_registry:mnesia(boot),
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_testcase(t_restart_resource, Config) ->
+    Opts = [public, named_table, set, {read_concurrency, true}],
+    _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
+    ets:new(t_restart_resource, [named_table, public]),
+    ets:insert(t_restart_resource, {failed_count, 0}),
+    ets:insert(t_restart_resource, {succ_count, 0}),
+    Config;
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(t_restart_resource, Config) ->
+    ets:delete(t_restart_resource),
+    Config;
+end_per_testcase(_, Config) ->
+    Config.
+
+t_restart_resource(_) ->
+    {ok, _} = emqx_rule_monitor:start_link(),
+    ok = emqx_rule_registry:register_resource_types(
+            [#resource_type{
+                name = test_res_1,
+                provider = ?APP,
+                params_spec = #{},
+                on_create = {?MODULE, on_resource_create},
+                on_destroy = {?MODULE, on_resource_destroy},
+                on_status = {?MODULE, on_get_resource_status},
+                title = #{en => <<"Test Resource">>},
+                description = #{en => <<"Test Resource">>}}]),
+    ok = emqx_rule_engine:load_providers(),
+    {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
+            #{type => test_res_1,
+              config => #{},
+              description => <<"debug resource">>}),
+    [{_, 1}] = ets:lookup(t_restart_resource, failed_count),
+    [{_, 0}] = ets:lookup(t_restart_resource, succ_count),
+    ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]),
+    emqx_rule_monitor:ensure_resource_retrier(ResId, 100),
+    timer:sleep(1000),
+    [{_, 5}] = ets:lookup(t_restart_resource, failed_count),
+    [{_, 1}] = ets:lookup(t_restart_resource, succ_count),
+    #{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)),
+    ?assertEqual(0, map_size(Pids)),
+    ok = emqx_rule_engine:unload_providers(),
+    emqx_rule_registry:remove_resource(ResId),
+    emqx_rule_monitor:stop(),
+    ok.
+
+on_resource_create(Id, _) ->
+    case ets:lookup(t_restart_resource, failed_count) of
+        [{_, 5}] ->
+            ets:insert(t_restart_resource, {succ_count, 1}),
+            #{};
+        [{_, N}] ->
+            ets:insert(t_restart_resource, {failed_count, N+1}),
+            error({incorrect_params, Id})
+    end.
+on_resource_destroy(_Id, _) -> ok.
+on_get_resource_status(_Id, _) -> #{}.