Просмотр исходного кода

chore(rule_engine): sync the code from rule-engine/dev/v4.3.0

Shawn 5 лет назад
Родитель
Сommit
573a4b2df8

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl

@@ -45,7 +45,7 @@ init([]) ->
                 shutdown => 5000,
                 type => worker,
                 modules => [emqx_rule_metrics]},
-    {ok, {{one_for_all, 10, 100}, [Registry, Metrics]}}.
+    {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
 
 start_locker() ->
     Locker = #{id => emqx_rule_locker,

+ 7 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -360,6 +360,12 @@ printable_maps(Headers) ->
         fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
                 AccIn#{K => ntoa(V0)};
             ('User-Property', V0, AccIn) when is_list(V0) ->
-                AccIn#{'User-Property' => maps:from_list(V0)};
+                AccIn#{
+                    'User-Property' => maps:from_list(V0),
+                    'User-Property-Pairs' => [#{
+                        key => Key,
+                        value => Value
+                     } || {Key, Value} <- V0]
+                };
             (K, V0, AccIn) -> AccIn#{K => V0}
         end, #{}, Headers).

+ 13 - 9
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -234,20 +234,26 @@ take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = A
             = emqx_rule_registry:get_action_instance_params(Id),
         emqx_rule_metrics:inc_actions_taken(Id),
         apply_action_func(Selected, Envs, Apply, ActName)
+    of
+        {badact, Reason} ->
+            handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason);
+        Result -> Result
     catch
         error:{badfun, _Func}:_ST ->
             %?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
             %              "Func: ~p~nST:~0p", [Id, Func, ST]),
             trans_action_on(Id, fun() ->
-                    emqx_rule_engine:refresh_actions([ActInst])
+                emqx_rule_engine:refresh_actions([ActInst])
             end, 5000),
             emqx_rule_metrics:inc_actions_retry(Id),
             take_action(ActInst, Selected, Envs, OnFailed, RetryN-1);
         Error:Reason:Stack ->
+            emqx_rule_metrics:inc_actions_exception(Id),
             handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack})
     end;
 
 take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) ->
+    emqx_rule_metrics:inc_actions_error(Id),
     handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}).
 
 apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
@@ -284,12 +290,10 @@ wait_action_on(Id, RetryN) ->
     end.
 
 handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
-    emqx_rule_metrics:inc_actions_exception(Id),
     ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
     take_actions(Fallbacks, Selected, Envs, continue),
     failed;
 handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
-    emqx_rule_metrics:inc_actions_exception(Id),
     ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
     take_actions(Fallbacks, Selected, Envs, continue),
     error({take_action_failed, {Id, Reason}}).
@@ -409,11 +413,13 @@ add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
 %%------------------------------------------------------------------------------
 %% Internal Functions
 %%------------------------------------------------------------------------------
-may_decode_payload(Payload) ->
+may_decode_payload(Payload) when is_binary(Payload) ->
     case get_cached_payload() of
-        undefined -> ensure_decoded(Payload);
+        undefined -> safe_decode_and_cache(Payload);
         DecodedP -> DecodedP
-    end.
+    end;
+may_decode_payload(Payload) ->
+    Payload.
 
 get_cached_payload() ->
     erlang:get(rule_payload).
@@ -422,9 +428,7 @@ cache_payload(DecodedP) ->
     erlang:put(rule_payload, DecodedP),
     DecodedP.
 
-ensure_decoded(Json) when is_map(Json); is_list(Json) ->
-    Json;
-ensure_decoded(MaybeJson) ->
+safe_decode_and_cache(MaybeJson) ->
     try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
     catch _:_ -> #{}
     end.

+ 94 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -37,6 +37,7 @@ all() ->
     , {group, runtime}
     , {group, events}
     , {group, multi_actions}
+    , {group, bugs}
     ].
 
 suite() ->
@@ -123,11 +124,15 @@ groups() ->
      {events, [],
       [t_events
       ]},
+     {bugs, [],
+      [t_sqlparse_payload_as
+      ]},
      {multi_actions, [],
       [t_sqlselect_multi_actoins_1,
        t_sqlselect_multi_actoins_1_1,
        t_sqlselect_multi_actoins_2,
        t_sqlselect_multi_actoins_3,
+       t_sqlselect_multi_actoins_3_1,
        t_sqlselect_multi_actoins_4
       ]}
     ].
@@ -200,6 +205,7 @@ init_per_testcase(Test, Config)
             ;Test =:= t_sqlselect_multi_actoins_1_1
             ;Test =:= t_sqlselect_multi_actoins_2
             ;Test =:= t_sqlselect_multi_actoins_3
+            ;Test =:= t_sqlselect_multi_actoins_3_1
             ;Test =:= t_sqlselect_multi_actoins_4
         ->
     ok = emqx_rule_engine:load_providers(),
@@ -209,6 +215,12 @@ init_per_testcase(Test, Config)
                     types=[], params_spec = #{},
                     title = #{en => <<"Crash Action">>},
                     description = #{en => <<"This action will always fail!">>}}),
+    ok = emqx_rule_registry:add_action(
+            #action{name = 'failure_action', app = ?APP,
+                    module = ?MODULE, on_create = failure_action,
+                    types=[], params_spec = #{},
+                    title = #{en => <<"Crash Action">>},
+                    description = #{en => <<"This action will always fail!">>}}),
     ok = emqx_rule_registry:add_action(
             #action{name = 'plus_by_one', app = ?APP,
                     module = ?MODULE, on_create = plus_by_one_action,
@@ -1288,6 +1300,44 @@ t_sqlselect_multi_actoins_3(Config) ->
 
     emqx_rule_registry:remove_rule(Rule).
 
+t_sqlselect_multi_actoins_3_1(Config) ->
+    %% We create 2 actions in the same rule (on_action_failed = continue):
+    %% The first will fail (with a 'badact' return) and we need to make sure the
+    %% fallback actions can be executed, and the next actoins
+    %% will be run without influence
+    {ok, Rule} = emqx_rule_engine:create_rule(
+                    #{rawsql => ?config(connsql, Config),
+                      on_action_failed => continue,
+                      actions => [
+                          #{name => 'failure_action', args => #{}, fallbacks =>[
+                              #{name => 'plus_by_one', args => #{}, fallbacks =>[]},
+                              #{name => 'plus_by_one', args => #{}, fallbacks =>[]}
+                          ]},
+                          #{name => 'republish',
+                            args => #{<<"target_topic">> => <<"t2">>,
+                                      <<"target_qos">> => -1,
+                                      <<"payload_tmpl">> => <<"clientid=${clientid}">>
+                                    },
+                            fallbacks => []}
+                      ]
+                     }),
+
+    (?config(conn_event, Config))(),
+    timer:sleep(100),
+
+    %% verfiy the fallback actions has been run
+    ?assertEqual(2, ets:lookup_element(plus_by_one_action, num, 2)),
+
+    %% verfiy the next actions can be run
+    receive {publish, #{topic := T, payload := Payload}} ->
+        ?assertEqual(<<"t2">>, T),
+        ?assertEqual(<<"clientid=c_emqx1">>, Payload)
+    after 1000 ->
+        ct:fail(wait_for_t2)
+    end,
+
+    emqx_rule_registry:remove_rule(Rule).
+
 t_sqlselect_multi_actoins_4(Config) ->
     %% We create 2 actions in the same rule (on_action_failed = continue):
     %% The first will fail and we need to make sure the
@@ -1920,6 +1970,44 @@ t_sqlparse_new_map(_Config) ->
                    <<"c">> := [#{}]
                    }, Res00).
 
+t_sqlparse_payload_as(_Config) ->
+    %% https://github.com/emqx/emqx/issues/3866
+    Sql00 = "SELECT "
+            " payload, map_get('engineWorkTime', payload.params, -1) as payload.params.engineWorkTime, "
+            " map_get('hydOilTem', payload.params, -1) as payload.params.hydOilTem "
+            "FROM \"t/#\" ",
+    Payload1 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42, \"hydOilTem\": 30 } }">>,
+    {ok, Res01} = emqx_rule_sqltester:test(
+                    #{<<"rawsql">> => Sql00,
+                      <<"ctx">> => #{<<"payload">> => Payload1,
+                                     <<"topic">> => <<"t/a">>}}),
+    ?assertMatch(#{
+        <<"payload">> := #{
+            <<"params">> := #{
+                <<"convertTemp">> := 20,
+                <<"engineSpeed">> := 42,
+                <<"engineWorkTime">> := -1,
+                <<"hydOilTem">> := 30
+            }
+        }
+    }, Res01),
+
+    Payload2 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42 } }">>,
+    {ok, Res02} = emqx_rule_sqltester:test(
+                    #{<<"rawsql">> => Sql00,
+                      <<"ctx">> => #{<<"payload">> => Payload2,
+                                     <<"topic">> => <<"t/a">>}}),
+    ?assertMatch(#{
+        <<"payload">> := #{
+            <<"params">> := #{
+                <<"convertTemp">> := 20,
+                <<"engineSpeed">> := 42,
+                <<"engineWorkTime">> := -1,
+                <<"hydOilTem">> := -1
+            }
+        }
+    }, Res02).
+
 %%------------------------------------------------------------------------------
 %% Internal helpers
 %%------------------------------------------------------------------------------
@@ -2006,6 +2094,12 @@ mfa_action(Id, _Params) ->
 mfa_action_do(_Data, _Envs, K) ->
     persistent_term:put(K, 1).
 
+failure_action(_Id, _Params) ->
+    fun(Data, _Envs) ->
+        ct:pal("applying crash action, Data: ~p", [Data]),
+        {badact, intentional_failure}
+    end.
+
 crash_action(_Id, _Params) ->
     fun(Data, _Envs) ->
         ct:pal("applying crash action, Data: ~p", [Data]),