|
|
@@ -43,7 +43,6 @@
|
|
|
]).
|
|
|
|
|
|
-export([ init_resource/4
|
|
|
- , init_resource/5
|
|
|
, init_action/4
|
|
|
, clear_resource/3
|
|
|
, clear_rule/1
|
|
|
@@ -239,7 +238,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
|
|
|
ok = emqx_rule_registry:add_resource(Resource),
|
|
|
%% Note that we will return OK in case of resource creation failure,
|
|
|
%% A timer is started to re-start the resource later.
|
|
|
- catch cluster_call(init_resource, [M, F, ResId, Config, true]),
|
|
|
+ catch cluster_call(init_resource, [M, F, ResId, Config]),
|
|
|
{ok, Resource};
|
|
|
not_found ->
|
|
|
{error, {resource_type_not_found, Type}}
|
|
|
@@ -382,24 +381,23 @@ delete_resource(ResId) ->
|
|
|
|
|
|
-spec(refresh_resources() -> ok).
|
|
|
refresh_resources() ->
|
|
|
- lists:foreach(fun(#resource{id = ResId} = Res) ->
|
|
|
- try refresh_resource(Res)
|
|
|
+ lists:foreach(fun refresh_resource/1,
|
|
|
+ emqx_rule_registry:get_resources()).
|
|
|
+
|
|
|
+refresh_resource(Type) when is_atom(Type) ->
|
|
|
+ lists:foreach(fun refresh_resource/1,
|
|
|
+ emqx_rule_registry:get_resources_by_type(Type));
|
|
|
+
|
|
|
+refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
|
|
|
+ {ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
|
|
|
+ try cluster_call(init_resource, [M, F, ResId, Config])
|
|
|
catch Error:Reason:ST ->
|
|
|
logger:critical(
|
|
|
"Can not re-stablish resource ~p: ~0p. The resource is disconnected."
|
|
|
"Fix the issue and establish it manually.\n"
|
|
|
"Stacktrace: ~0p",
|
|
|
[ResId, {Error, Reason}, ST])
|
|
|
- end
|
|
|
- end, emqx_rule_registry:get_resources()).
|
|
|
-
|
|
|
-refresh_resource(Type) when is_atom(Type) ->
|
|
|
- lists:foreach(fun(Resource) ->
|
|
|
- refresh_resource(Resource)
|
|
|
- end, emqx_rule_registry:get_resources_by_type(Type));
|
|
|
-refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
|
|
|
- {ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
|
|
|
- cluster_call(init_resource, [M, F, ResId, Config]).
|
|
|
+ end.
|
|
|
|
|
|
-spec(refresh_rules() -> ok).
|
|
|
refresh_rules() ->
|
|
|
@@ -531,14 +529,11 @@ cluster_call(Func, Args) ->
|
|
|
end.
|
|
|
|
|
|
init_resource(Module, OnCreate, ResId, Config) ->
|
|
|
- init_resource(Module, OnCreate, ResId, Config, false).
|
|
|
-
|
|
|
-init_resource(Module, OnCreate, ResId, Config, Restart) ->
|
|
|
Params = ?RAISE(
|
|
|
- Module:OnCreate(ResId, Config),
|
|
|
- Restart andalso
|
|
|
- timer:apply_after(timer:seconds(60), ?MODULE, init_resource,
|
|
|
- [Module, OnCreate, ResId, Config, Restart]),
|
|
|
+ begin
|
|
|
+ emqx_rule_registry:find_resource(ResId) /= not_found
|
|
|
+ andalso Module:OnCreate(ResId, Config)
|
|
|
+ end,
|
|
|
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
|
|
|
ResParams = #resource_params{id = ResId,
|
|
|
params = Params,
|