Browse Source

fix(rule_engine): add resource reconnect check (#3974)

* fix(rule_engine): add resource reconnect check
* fix(rule_engine): add reason for RAISE macro
bignullnull 5 years atrás
parent
commit
37974f7376

+ 20 - 6
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -140,15 +140,29 @@
 -define(is_logical(Op), (Op =:= 'and' orelse Op =:= 'or')).
 
 -define(RAISE(_EXP_, _ERROR_),
-        begin
-          fun() ->
-            try (_EXP_) catch _:_REASON_:_ST_ -> throw(_ERROR_) end
-          end()
-        end).
+        fun() ->
+            try (_EXP_)
+            catch _EXCLASS_:_REASON_:_ST_ ->
+                throw(_ERROR_)
+            end
+        end()).
+
+-define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_),
+        fun() ->
+            try (_EXP_)
+            catch _EXCLASS_:_EXCPTION_:_ST_ ->
+                _REASON_ = {_EXCLASS_, _EXCPTION_},
+                _EXP_ON_FAIL_,
+                throw(_ERROR_)
+            end
+        end()).
 
 -define(THROW(_EXP_, _ERROR_),
         begin
-            try (_EXP_) catch _:_ -> throw(_ERROR_) end
+            try (_EXP_)
+            catch _:_ ->
+                throw(_ERROR_)
+            end
         end).
 
 %% Tables

+ 18 - 2
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -490,8 +490,13 @@ cluster_call(Func, Args) ->
 
 init_resource(Module, OnCreate, ResId, Config) ->
     Params = ?RAISE(Module:OnCreate(ResId, Config),
-                    {{init_resource_failure, node()}, {{Module, OnCreate}, {_REASON_,_ST_}}}),
-    emqx_rule_registry:add_resource_params(#resource_params{id = ResId, params = Params, status = #{is_alive => true}}).
+                    start_reinitial_loop(ResId),
+                    {{init_resource_failure, node()},
+                     {{Module, OnCreate}, {_REASON_, _ST_}}}),
+    ResParams = #resource_params{id = ResId,
+                                 params = Params,
+                                 status = #{is_alive => true}},
+    emqx_rule_registry:add_resource_params(ResParams).
 
 init_action(Module, OnCreate, ActionInstId, Params) ->
     ok = emqx_rule_metrics:create_metrics(ActionInstId),
@@ -614,3 +619,14 @@ find_type(ResId) ->
 
 alarm_name_of_resource_down(Type, ResId) ->
     list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])).
+
+start_reinitial_loop(Id) ->
+    spawn(fun() ->
+        timer:sleep(60000),
+        case emqx_rule_registry:find_resource(Id) of
+           {ok, _}->
+                ?LOG(warning, "try to re-initialize resource: ~p", [Id]),
+                start_resource(Id);
+           not_found -> ok
+        end
+    end).