Просмотр исходного кода

Merge pull request #11172 from lafirest/fix/dup_payload

fix(ruleengine): fix duplicate `payload` problems
lafirest 2 лет назад
Родитель
Сommit
bf5167a8bf

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -1202,5 +1202,5 @@ convert_timestamp(MillisecondsTimestamp) ->
     MicroSecs = MicroTimestamp rem 1000_000,
     {MegaSecs, Secs, MicroSecs}.
 
-uuid_str(UUID, DisplyOpt) ->
-    uuid:uuid_to_string(UUID, DisplyOpt).
+uuid_str(UUID, DisplayOpt) ->
+    uuid:uuid_to_string(UUID, DisplayOpt).

+ 24 - 27
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -36,7 +36,7 @@
     ]
 ).
 
--compile({no_auto_import, [alias/1]}).
+-compile({no_auto_import, [alias/2]}).
 
 -type columns() :: map().
 -type alias() :: atom().
@@ -204,7 +204,7 @@ select_and_transform([{as, Field, Alias} | More], Columns, Action) ->
     );
 select_and_transform([Field | More], Columns, Action) ->
     Val = eval(Field, Columns),
-    Key = alias(Field),
+    Key = alias(Field, Columns),
     select_and_transform(
         More,
         nested_put(Key, Val, Columns),
@@ -228,11 +228,11 @@ select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) ->
     );
 select_and_collect([Field], Columns, {Action, _}) ->
     Val = eval(Field, Columns),
-    Key = alias(Field),
+    Key = alias(Field, Columns),
     {nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
 select_and_collect([Field | More], Columns, {Action, LastKV}) ->
     Val = eval(Field, Columns),
-    Key = alias(Field),
+    Key = alias(Field, Columns),
     select_and_collect(
         More,
         nested_put(Key, Val, Columns),
@@ -401,38 +401,36 @@ eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) ->
 eval({'fun', {_, Name}, Args}, Columns) ->
     apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
 
-handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Columns) ->
-    Columns#{payload => may_decode_payload(Payload)};
-handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns) ->
-    Columns#{<<"payload">> => may_decode_payload(Payload)};
-handle_alias(_, Columns) ->
-    Columns.
-
-alias({var, Var}) ->
+alias({var, Var}, _Columns) ->
     {var, Var};
-alias({const, Val}) when is_binary(Val) ->
+alias({const, Val}, _Columns) when is_binary(Val) ->
     {var, Val};
-alias({list, L}) ->
+alias({list, L}, _Columns) ->
     {var, ?ephemeral_alias(list, length(L))};
-alias({range, R}) ->
+alias({range, R}, _Columns) ->
     {var, ?ephemeral_alias(range, R)};
-alias({get_range, _, {var, Key}}) ->
+alias({get_range, _, {var, Key}}, _Columns) ->
     {var, Key};
-alias({get_range, _, {path, Path}}) ->
-    {path, Path};
-alias({path, Path}) ->
-    {path, Path};
-alias({const, Val}) ->
+alias({get_range, _, {path, _Path} = Path}, Columns) ->
+    handle_path_alias(Path, Columns);
+alias({path, _Path} = Path, Columns) ->
+    handle_path_alias(Path, Columns);
+alias({const, Val}, _Columns) ->
     {var, ?ephemeral_alias(const, Val)};
-alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) ->
+alias({Op, _L, _R}, _Columns) when ?is_arith(Op); ?is_comp(Op) ->
     {var, ?ephemeral_alias(op, Op)};
-alias({'case', On, _, _}) ->
+alias({'case', On, _, _}, _Columns) ->
     {var, ?ephemeral_alias('case', On)};
-alias({'fun', Name, _}) ->
+alias({'fun', Name, _}, _Columns) ->
     {var, ?ephemeral_alias('fun', Name)};
-alias(_) ->
+alias(_, _Columns) ->
     ?ephemeral_alias(unknown, unknown).
 
+handle_path_alias({path, [{key, <<"payload">>} | Rest]}, #{payload := _Payload} = _Columns) ->
+    {path, [{key, payload} | Rest]};
+handle_path_alias(Path, _Columns) ->
+    Path.
+
 eval_case_clauses([], ElseClauses, Columns) ->
     case ElseClauses of
         {} -> undefined;
@@ -515,8 +513,7 @@ safe_decode_and_cache(MaybeJson) ->
 ensure_list(List) when is_list(List) -> List;
 ensure_list(_NotList) -> [].
 
-nested_put(Alias, Val, Columns0) ->
-    Columns = handle_alias(Alias, Columns0),
+nested_put(Alias, Val, Columns) ->
     emqx_rule_maps:nested_put(Alias, Val, Columns).
 
 inc_action_metrics(RuleId, Result) ->

+ 71 - 4
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -1580,6 +1580,73 @@ t_sqlparse_foreach_1(_Config) ->
                 }
             }
         ),
+
+    Sql5 =
+        "foreach payload.sensors "
+        "from \"t/#\" ",
+    {ok, [
+        #{payload := #{<<"sensors">> := _}},
+        #{payload := #{<<"sensors">> := _}}
+    ]} =
+        emqx_rule_sqltester:test(
+            #{
+                sql => Sql5,
+                context => #{
+                    payload => <<"{\"sensors\": [1, 2]}">>,
+                    topic => <<"t/a">>
+                }
+            }
+        ),
+
+    try
+        meck:new(emqx_rule_runtime, [non_strict, passthrough]),
+        meck:expect(
+            emqx_rule_runtime,
+            apply_rule,
+            fun(Rule, #{payload := Payload} = Columns, Env) ->
+                Columns2 = maps:put(<<"payload">>, Payload, maps:without([payload], Columns)),
+                meck:passthrough([Rule, Columns2, Env])
+            end
+        ),
+
+        Sql6 =
+            "foreach payload.sensors "
+            "from \"t/#\" ",
+        {ok, [
+            #{<<"payload">> := #{<<"sensors">> := _}},
+            #{<<"payload">> := #{<<"sensors">> := _}}
+        ]} =
+            emqx_rule_sqltester:test(
+                #{
+                    sql => Sql6,
+                    context => #{
+                        <<"payload">> => <<"{\"sensors\": [1, 2]}">>,
+                        topic => <<"t/a">>
+                    }
+                }
+            ),
+
+        Sql7 =
+            "foreach payload.sensors "
+            "from \"t/#\" ",
+        ?assertNotMatch(
+            {ok, [
+                #{<<"payload">> := _, payload := _},
+                #{<<"payload">> := _, payload := _}
+            ]},
+            emqx_rule_sqltester:test(
+                #{
+                    sql => Sql7,
+                    context => #{
+                        <<"payload">> => <<"{\"sensors\": [1, 2]}">>,
+                        topic => <<"t/a">>
+                    }
+                }
+            )
+        )
+    after
+        meck:unload(emqx_rule_runtime)
+    end,
     ?assert(is_binary(TRuleId)).
 
 t_sqlparse_foreach_2(_Config) ->
@@ -2168,7 +2235,7 @@ t_sqlparse_array_index_1(_Config) ->
         "  payload.x[2] "
         "from \"t/#\" ",
     ?assertMatch(
-        {ok, #{<<"payload">> := #{<<"x">> := [3]}}},
+        {ok, #{payload := #{<<"x">> := [3]}}},
         emqx_rule_sqltester:test(
             #{
                 sql => Sql2,
@@ -2185,7 +2252,7 @@ t_sqlparse_array_index_1(_Config) ->
         "  payload.x[2].y "
         "from \"t/#\" ",
     ?assertMatch(
-        {ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 3}]}}},
+        {ok, #{payload := #{<<"x">> := [#{<<"y">> := 3}]}}},
         emqx_rule_sqltester:test(
             #{
                 sql => Sql3,
@@ -2373,7 +2440,7 @@ t_sqlparse_array_index_4(_Config) ->
         "0 as payload.x[2].y "
         "from \"t/#\" ",
     ?assertMatch(
-        {ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}},
+        {ok, #{payload := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}},
         emqx_rule_sqltester:test(
             #{
                 sql => Sql1,
@@ -2548,7 +2615,7 @@ t_sqlparse_array_range_2(_Config) ->
         "  payload.a[1..4] "
         "from \"t/#\" ",
     ?assertMatch(
-        {ok, #{<<"payload">> := #{<<"a">> := [0, 1, 2, 3]}}},
+        {ok, #{payload := #{<<"a">> := [0, 1, 2, 3]}}},
         emqx_rule_sqltester:test(
             #{
                 sql => Sql02,

+ 11 - 0
changes/ce/fix-11172.en.md

@@ -0,0 +1,11 @@
+Fix the `payload` will be duplicated in the below situations:
+- Use a `foreach` sentence without  the `as` sub-expression and select all fields(use the `*` or omitted the `do` sub-expression)
+
+  For example:
+
+  `FOREACH payload.sensors FROM "t/#"`
+- Select the `payload` field and all fields
+
+  For example:
+
+  `SELECT payload.sensors, * FROM "t/#"`