|
|
@@ -124,14 +124,19 @@ load_rules() ->
|
|
|
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
|
|
|
create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
|
|
|
case get_rule(RuleId) of
|
|
|
- not_found -> do_create_rule(Params);
|
|
|
- {ok, _} -> {error, {already_exists, RuleId}}
|
|
|
+ not_found -> parse_and_insert(Params, now_ms());
|
|
|
+ {ok, _} -> {error, already_exists}
|
|
|
end.
|
|
|
|
|
|
-spec update_rule(map()) -> {ok, rule()} | {error, term()}.
|
|
|
update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
|
|
|
- ok = delete_rule(RuleId),
|
|
|
- do_create_rule(Params).
|
|
|
+ case get_rule(RuleId) of
|
|
|
+ not_found ->
|
|
|
+ {error, not_found};
|
|
|
+ {ok, #{created_at := CreatedAt}} ->
|
|
|
+ ok = delete_rule(RuleId),
|
|
|
+ parse_and_insert(Params, CreatedAt)
|
|
|
+ end.
|
|
|
|
|
|
-spec(delete_rule(RuleId :: rule_id()) -> ok).
|
|
|
delete_rule(RuleId) when is_binary(RuleId) ->
|
|
|
@@ -232,13 +237,14 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%% Internal Functions
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
|
|
|
+parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, CreatedAt) ->
|
|
|
case emqx_rule_sqlparser:parse(Sql) of
|
|
|
{ok, Select} ->
|
|
|
Rule = #{
|
|
|
id => RuleId,
|
|
|
name => maps:get(name, Params, <<"">>),
|
|
|
- created_at => erlang:system_time(millisecond),
|
|
|
+ created_at => CreatedAt,
|
|
|
+ updated_at => now_ms(),
|
|
|
enable => maps:get(enable, Params, true),
|
|
|
sql => Sql,
|
|
|
outputs => parse_outputs(Outputs),
|
|
|
@@ -287,5 +293,8 @@ get_all_records(Tab) ->
|
|
|
maps_foreach(Fun, Map) ->
|
|
|
lists:foreach(Fun, maps:to_list(Map)).
|
|
|
|
|
|
+now_ms() ->
|
|
|
+ erlang:system_time(millisecond).
|
|
|
+
|
|
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
|
|
bin(B) when is_binary(B) -> B.
|