|
|
@@ -39,6 +39,7 @@
|
|
|
, get_resource_status/1
|
|
|
, get_resource_params/1
|
|
|
, delete_resource/1
|
|
|
+ , ensure_resource_deleted/1
|
|
|
]).
|
|
|
|
|
|
-export([ init_resource/4
|
|
|
@@ -154,7 +155,7 @@ module_attributes(Module) ->
|
|
|
%% APIs for rules and resources
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
--dialyzer([{nowarn_function, create_rule/1}]).
|
|
|
+-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]).
|
|
|
-spec(create_rule(#{}) -> {ok, rule()} | no_return()).
|
|
|
create_rule(Params = #{rawsql := Sql, actions := Actions}) ->
|
|
|
case emqx_rule_sqlparser:parse_select(Sql) of
|
|
|
@@ -179,7 +180,7 @@ create_rule(Params = #{rawsql := Sql, actions := Actions}) ->
|
|
|
Error -> error(Error)
|
|
|
end.
|
|
|
|
|
|
--spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | no_return()).
|
|
|
+-spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}).
|
|
|
update_rule(Params = #{id := RuleId}) ->
|
|
|
case emqx_rule_registry:get_rule(RuleId) of
|
|
|
{ok, Rule0} ->
|
|
|
@@ -206,7 +207,7 @@ delete_rule(RuleId) ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
--spec(create_resource(#{}) -> {ok, resource()} | {error, Reason :: term()}).
|
|
|
+-spec(create_resource(#{type := _, config := _, _ => _}) -> {ok, resource()} | {error, Reason :: term()}).
|
|
|
create_resource(#{type := Type, config := Config} = Params) ->
|
|
|
case emqx_rule_registry:find_resource_type(Type) of
|
|
|
{ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} ->
|
|
|
@@ -215,7 +216,9 @@ create_resource(#{type := Type, config := Config} = Params) ->
|
|
|
Resource = #resource{id = ResId,
|
|
|
type = Type,
|
|
|
config = Config,
|
|
|
- description = iolist_to_binary(maps:get(description, Params, ""))},
|
|
|
+ description = iolist_to_binary(maps:get(description, Params, "")),
|
|
|
+ created_at = erlang:system_time(millisecond)
|
|
|
+ },
|
|
|
ok = emqx_rule_registry:add_resource(Resource),
|
|
|
%% Note that we will return OK in case of resource creation failure,
|
|
|
%% users can always re-start the resource later.
|
|
|
@@ -231,14 +234,14 @@ start_resource(ResId) ->
|
|
|
{ok, #resource{type = ResType, config = Config}} ->
|
|
|
{ok, #resource_type{on_create = {Mod, Create}}}
|
|
|
= emqx_rule_registry:find_resource_type(ResType),
|
|
|
- init_resource(Mod, Create, ResId, Config),
|
|
|
+ _ = init_resource(Mod, Create, ResId, Config),
|
|
|
refresh_actions_of_a_resource(ResId),
|
|
|
ok;
|
|
|
not_found ->
|
|
|
{error, {resource_not_found, ResId}}
|
|
|
end.
|
|
|
|
|
|
--spec(test_resource(#{}) -> ok | {error, Reason :: term()}).
|
|
|
+-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
|
|
|
test_resource(#{type := Type, config := Config}) ->
|
|
|
case emqx_rule_registry:find_resource_type(Type) of
|
|
|
{ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} ->
|
|
|
@@ -284,6 +287,12 @@ delete_resource(ResId) ->
|
|
|
{error, {resource_not_found, ResId}}
|
|
|
end.
|
|
|
|
|
|
+%% @doc Ensure resource deleted. `resource_not_found` error is discarded.
|
|
|
+-spec(ensure_resource_deleted(resource_id()) -> ok).
|
|
|
+ensure_resource_deleted(ResId) ->
|
|
|
+ _ = delete_resource(ResId),
|
|
|
+ ok.
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Re-establish resources
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -530,12 +539,12 @@ fetch_resource_status(Module, OnStatus, ResId) ->
|
|
|
end.
|
|
|
|
|
|
refresh_actions_of_a_resource(ResId) ->
|
|
|
- [refresh_actions(Actions,
|
|
|
- fun (#action_instance{args = #{<<"$resource">> := ResId0}})
|
|
|
+ R = fun (#action_instance{args = #{<<"$resource">> := ResId0}})
|
|
|
when ResId0 =:= ResId -> true;
|
|
|
(_) -> false
|
|
|
- end)
|
|
|
- || #rule{actions = Actions} <- emqx_rule_registry:get_rules()].
|
|
|
+ end,
|
|
|
+ F = fun(#rule{actions = Actions}) -> refresh_actions(Actions, R) end,
|
|
|
+ lists:foreach(F, emqx_rule_registry:get_rules()).
|
|
|
|
|
|
refresh_actions(Actions) ->
|
|
|
refresh_actions(Actions, fun(_) -> true end).
|