ソースを参照

Merge pull request #7314 from terry-xiaoyu/demo0315

Fix some issues in rule engine found in the demo session
Xinyu Liu 3 年 前
コミット
ef8ad9b1f0

+ 1 - 0
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -50,6 +50,7 @@
         , enable := boolean()
         , enable := boolean()
         , description => binary()
         , description => binary()
         , created_at := integer() %% epoch in millisecond precision
         , created_at := integer() %% epoch in millisecond precision
+        , updated_at := integer() %% epoch in millisecond precision
         , from := list(topic())
         , from := list(topic())
         , is_foreach := boolean()
         , is_foreach := boolean()
         , fields := list()
         , fields := list()

+ 15 - 6
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -124,14 +124,19 @@ load_rules() ->
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
 create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
 create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
     case get_rule(RuleId) of
     case get_rule(RuleId) of
-        not_found -> do_create_rule(Params);
-        {ok, _} -> {error, {already_exists, RuleId}}
+        not_found -> parse_and_insert(Params, now_ms());
+        {ok, _} -> {error, already_exists}
     end.
     end.
 
 
 -spec update_rule(map()) -> {ok, rule()} | {error, term()}.
 -spec update_rule(map()) -> {ok, rule()} | {error, term()}.
 update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
 update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
-    ok = delete_rule(RuleId),
-    do_create_rule(Params).
+    case get_rule(RuleId) of
+        not_found ->
+            {error, not_found};
+        {ok, #{created_at := CreatedAt}} ->
+            ok = delete_rule(RuleId),
+            parse_and_insert(Params, CreatedAt)
+    end.
 
 
 -spec(delete_rule(RuleId :: rule_id()) -> ok).
 -spec(delete_rule(RuleId :: rule_id()) -> ok).
 delete_rule(RuleId) when is_binary(RuleId) ->
 delete_rule(RuleId) when is_binary(RuleId) ->
@@ -232,13 +237,14 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Functions
 %% Internal Functions
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
+parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, CreatedAt) ->
     case emqx_rule_sqlparser:parse(Sql) of
     case emqx_rule_sqlparser:parse(Sql) of
         {ok, Select} ->
         {ok, Select} ->
             Rule = #{
             Rule = #{
                 id => RuleId,
                 id => RuleId,
                 name => maps:get(name, Params, <<"">>),
                 name => maps:get(name, Params, <<"">>),
-                created_at => erlang:system_time(millisecond),
+                created_at => CreatedAt,
+                updated_at => now_ms(),
                 enable => maps:get(enable, Params, true),
                 enable => maps:get(enable, Params, true),
                 sql => Sql,
                 sql => Sql,
                 outputs => parse_outputs(Outputs),
                 outputs => parse_outputs(Outputs),
@@ -287,5 +293,8 @@ get_all_records(Tab) ->
 maps_foreach(Fun, Map) ->
 maps_foreach(Fun, Map) ->
     lists:foreach(Fun, maps:to_list(Map)).
     lists:foreach(Fun, maps:to_list(Map)).
 
 
+now_ms() ->
+    erlang:system_time(millisecond).
+
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B.
 bin(B) when is_binary(B) -> B.

+ 3 - 3
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -207,7 +207,7 @@ param_path_id() ->
                 {ok, _Rule} ->
                 {ok, _Rule} ->
                     {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
                     {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
                 not_found ->
                 not_found ->
-                    case emqx_conf:update(ConfPath, Params, #{}) of
+                    case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
                         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
                         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
                             [Rule] = get_one_rule(AllRules, Id),
                             [Rule] = get_one_rule(AllRules, Id),
                             {201, format_rule_resp(Rule)};
                             {201, format_rule_resp(Rule)};
@@ -238,7 +238,7 @@ param_path_id() ->
 '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
 '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
     Params = filter_out_request_body(Params0),
     Params = filter_out_request_body(Params0),
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
-    case emqx_conf:update(ConfPath, Params, #{}) of
+    case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
             [Rule] = get_one_rule(AllRules, Id),
             [Rule] = get_one_rule(AllRules, Id),
             {200, format_rule_resp(Rule)};
             {200, format_rule_resp(Rule)};
@@ -250,7 +250,7 @@ param_path_id() ->
 
 
 '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
 '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
-    case emqx_conf:remove(ConfPath, #{}) of
+    case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
         {ok, _} -> {204};
         {ok, _} -> {204};
         {error, Reason} ->
         {error, Reason} ->
             ?SLOG(error, #{msg => "delete_rule_failed",
             ?SLOG(error, #{msg => "delete_rule_failed",