Просмотр исходного кода

fix: remove the 'headers' field from the rule events

Shawn 3 лет назад
Родитель
Сommit
6685a3c5a8

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -118,7 +118,7 @@ unload_hook() ->
 on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
     case maps:get(sys, Flags, false) of
         false ->
-            Msg = emqx_rule_events:eventmsg_publish(Message),
+            {Msg, _} = emqx_rule_events:eventmsg_publish(Message),
             send_to_matched_egress_bridges(Topic, Msg);
         true ->
             ok

+ 2 - 1
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -63,7 +63,8 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
     exp_msg().
 to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
     Retain0 = maps:get(retain, Flags0, false),
-    MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)),
+    {Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
+    MapMsg = maps:put(retain, Retain0, Columns),
     to_remote_msg(MapMsg, Vars);
 to_remote_msg(MapMsg, #{
     remote_topic := TopicToken,

+ 61 - 53
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -107,36 +107,41 @@ unload(Topic) ->
 %%--------------------------------------------------------------------
 %% Callbacks
 %%--------------------------------------------------------------------
-on_message_publish(Message = #message{topic = Topic}, _Env) ->
+on_message_publish(Message = #message{topic = Topic}, _Conf) ->
     case ignore_sys_message(Message) of
         true ->
             ok;
         false ->
             case emqx_rule_engine:get_rules_for_topic(Topic) of
-                [] -> ok;
-                Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
+                [] ->
+                    ok;
+                Rules ->
+                    %% ENVs are the fields that can't be refereced by the SQL, but can be used
+                    %% from actions. e.g. The 'headers' field in the internal record `#message{}`.
+                    {Columns, Envs} = eventmsg_publish(Message),
+                    emqx_rule_runtime:apply_rules(Rules, Columns, Envs)
             end
     end,
     {ok, Message}.
 
-on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) ->
-    apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
+on_bridge_message_received(Message, Conf = #{event_topic := BridgeTopic}) ->
+    apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message, #{}) end, Conf).
 
-on_client_connected(ClientInfo, ConnInfo, Env) ->
+on_client_connected(ClientInfo, ConnInfo, Conf) ->
     apply_event(
         'client.connected',
         fun() -> eventmsg_connected(ClientInfo, ConnInfo) end,
-        Env
+        Conf
     ).
 
-on_client_connack(ConnInfo, Reason, _, Env) ->
+on_client_connack(ConnInfo, Reason, _, Conf) ->
     apply_event(
         'client.connack',
         fun() -> eventmsg_connack(ConnInfo, Reason) end,
-        Env
+        Conf
     ).
 
-on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Env) ->
+on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Conf) ->
     apply_event(
         'client.check_authz_complete',
         fun() ->
@@ -148,35 +153,35 @@ on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, E
                 AuthzSource
             )
         end,
-        Env
+        Conf
     ).
 
-on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
+on_client_disconnected(ClientInfo, Reason, ConnInfo, Conf) ->
     apply_event(
         'client.disconnected',
         fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end,
-        Env
+        Conf
     ).
 
-on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
+on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) ->
     apply_event(
         'session.subscribed',
         fun() ->
             eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
         end,
-        Env
+        Conf
     ).
 
-on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
+on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) ->
     apply_event(
         'session.unsubscribed',
         fun() ->
             eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
         end,
-        Env
+        Conf
     ).
 
-on_message_dropped(Message, _, Reason, Env) ->
+on_message_dropped(Message, _, Reason, Conf) ->
     case ignore_sys_message(Message) of
         true ->
             ok;
@@ -184,12 +189,12 @@ on_message_dropped(Message, _, Reason, Env) ->
             apply_event(
                 'message.dropped',
                 fun() -> eventmsg_dropped(Message, Reason) end,
-                Env
+                Conf
             )
     end,
     {ok, Message}.
 
-on_message_delivered(ClientInfo, Message, Env) ->
+on_message_delivered(ClientInfo, Message, Conf) ->
     case ignore_sys_message(Message) of
         true ->
             ok;
@@ -197,12 +202,12 @@ on_message_delivered(ClientInfo, Message, Env) ->
             apply_event(
                 'message.delivered',
                 fun() -> eventmsg_delivered(ClientInfo, Message) end,
-                Env
+                Conf
             )
     end,
     {ok, Message}.
 
-on_message_acked(ClientInfo, Message, Env) ->
+on_message_acked(ClientInfo, Message, Conf) ->
     case ignore_sys_message(Message) of
         true ->
             ok;
@@ -210,12 +215,12 @@ on_message_acked(ClientInfo, Message, Env) ->
             apply_event(
                 'message.acked',
                 fun() -> eventmsg_acked(ClientInfo, Message) end,
-                Env
+                Conf
             )
     end,
     {ok, Message}.
 
-on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
+on_delivery_dropped(ClientInfo, Message, Reason, Conf) ->
     case ignore_sys_message(Message) of
         true ->
             ok;
@@ -223,7 +228,7 @@ on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
             apply_event(
                 'delivery.dropped',
                 fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end,
-                Env
+                Conf
             )
     end,
     {ok, Message}.
@@ -256,10 +261,9 @@ eventmsg_publish(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            %% the column 'headers' will be removed in the next major release
-            headers => printable_maps(Headers),
             publish_received_at => Timestamp
-        }
+        },
+        #{headers => Headers}
     ).
 
 eventmsg_connected(
@@ -299,7 +303,8 @@ eventmsg_connected(
             is_bridge => IsBridge,
             conn_props => printable_maps(ConnProps),
             connected_at => ConnectedAt
-        }
+        },
+        #{}
     ).
 
 eventmsg_disconnected(
@@ -328,7 +333,8 @@ eventmsg_disconnected(
             proto_ver => ProtoVer,
             disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
             disconnected_at => DisconnectedAt
-        }
+        },
+        #{}
     ).
 
 eventmsg_connack(
@@ -360,7 +366,8 @@ eventmsg_connack(
             keepalive => Keepalive,
             expiry_interval => ExpiryInterval,
             conn_props => printable_maps(ConnProps)
-        }
+        },
+        #{}
     ).
 
 eventmsg_check_authz_complete(
@@ -384,7 +391,8 @@ eventmsg_check_authz_complete(
             action => PubSub,
             authz_source => AuthzSource,
             result => Result
-        }
+        },
+        #{}
     ).
 
 eventmsg_sub_or_unsub(
@@ -407,7 +415,8 @@ eventmsg_sub_or_unsub(
             PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
             topic => Topic,
             qos => QoS
-        }
+        },
+        #{}
     ).
 
 eventmsg_dropped(
@@ -435,11 +444,10 @@ eventmsg_dropped(
             topic => Topic,
             qos => QoS,
             flags => Flags,
-            %% the column 'headers' will be removed in the next major release
-            headers => printable_maps(Headers),
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
             publish_received_at => Timestamp
-        }
+        },
+        #{headers => Headers}
     ).
 
 eventmsg_delivered(
@@ -472,11 +480,10 @@ eventmsg_delivered(
             topic => Topic,
             qos => QoS,
             flags => Flags,
-            %% the column 'headers' will be removed in the next major release
-            headers => printable_maps(Headers),
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
             publish_received_at => Timestamp
-        }
+        },
+        #{headers => Headers}
     ).
 
 eventmsg_acked(
@@ -509,12 +516,11 @@ eventmsg_acked(
             topic => Topic,
             qos => QoS,
             flags => Flags,
-            %% the column 'headers' will be removed in the next major release
-            headers => printable_maps(Headers),
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
             puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
             publish_received_at => Timestamp
-        }
+        },
+        #{headers => Headers}
     ).
 
 eventmsg_delivery_dropped(
@@ -549,34 +555,37 @@ eventmsg_delivery_dropped(
             topic => Topic,
             qos => QoS,
             flags => Flags,
-            %% the column 'headers' will be removed in the next major release
-            headers => printable_maps(Headers),
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
             publish_received_at => Timestamp
-        }
+        },
+        #{headers => Headers}
     ).
 
 sub_unsub_prop_key('session.subscribed') -> sub_props;
 sub_unsub_prop_key('session.unsubscribed') -> unsub_props.
 
-with_basic_columns(EventName, Data) when is_map(Data) ->
-    Data#{
-        event => EventName,
-        timestamp => erlang:system_time(millisecond),
-        node => node()
+with_basic_columns(EventName, Columns, Envs) when is_map(Columns) ->
+    {
+        Columns#{
+            event => EventName,
+            timestamp => erlang:system_time(millisecond),
+            node => node()
+        },
+        Envs
     }.
 
 %%--------------------------------------------------------------------
 %% rules applying
 %%--------------------------------------------------------------------
-apply_event(EventName, GenEventMsg, _Env) ->
+apply_event(EventName, GenEventMsg, _Conf) ->
     EventTopic = event_topic(EventName),
     case emqx_rule_engine:get_rules_for_topic(EventTopic) of
         [] ->
             ok;
         Rules ->
             %% delay the generating of eventmsg after we have found some rules to apply
-            emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
+            {Columns, Envs} = GenEventMsg(),
+            emqx_rule_runtime:apply_rules(Rules, Columns, Envs)
     end.
 
 %%--------------------------------------------------------------------
@@ -777,7 +786,6 @@ columns_with_exam('message.publish') ->
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
-        {<<"headers">>, undefined},
         {<<"publish_received_at">>, erlang:system_time(millisecond)},
         columns_example_props(pub_props),
         {<<"timestamp">>, erlang:system_time(millisecond)},

+ 112 - 109
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -21,8 +21,8 @@
 -include_lib("emqx/include/logger.hrl").
 
 -export([
-    apply_rule/2,
-    apply_rules/2,
+    apply_rule/3,
+    apply_rules/3,
     clear_rule_payload/0
 ]).
 
@@ -37,7 +37,7 @@
 
 -compile({no_auto_import, [alias/1]}).
 
--type input() :: map().
+-type columns() :: map().
 -type alias() :: atom().
 -type collection() :: {alias(), [term()]}.
 
@@ -50,24 +50,24 @@
 %%------------------------------------------------------------------------------
 %% Apply rules
 %%------------------------------------------------------------------------------
--spec apply_rules(list(rule()), input()) -> ok.
-apply_rules([], _Input) ->
+-spec apply_rules(list(rule()), columns(), envs()) -> ok.
+apply_rules([], _Columns, _Envs) ->
     ok;
-apply_rules([#{enable := false} | More], Input) ->
-    apply_rules(More, Input);
-apply_rules([Rule | More], Input) ->
-    apply_rule_discard_result(Rule, Input),
-    apply_rules(More, Input).
-
-apply_rule_discard_result(Rule, Input) ->
-    _ = apply_rule(Rule, Input),
+apply_rules([#{enable := false} | More], Columns, Envs) ->
+    apply_rules(More, Columns, Envs);
+apply_rules([Rule | More], Columns, Envs) ->
+    apply_rule_discard_result(Rule, Columns, Envs),
+    apply_rules(More, Columns, Envs).
+
+apply_rule_discard_result(Rule, Columns, Envs) ->
+    _ = apply_rule(Rule, Columns, Envs),
     ok.
 
-apply_rule(Rule = #{id := RuleID}, Input) ->
+apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
     ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
     clear_rule_payload(),
     try
-        do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
+        do_apply_rule(Rule, add_metadata(Columns, #{rule_id => RuleID}), Envs)
     catch
         %% ignore the errors if select or match failed
         _:Reason = {select_and_transform_error, Error} ->
@@ -124,13 +124,14 @@ do_apply_rule(
         conditions := Conditions,
         actions := Actions
     },
-    Input
+    Columns,
+    Envs
 ) ->
     {Selected, Collection} = ?RAISE(
-        select_and_collect(Fields, Input),
+        select_and_collect(Fields, Columns),
         {select_and_collect_error, {_EXCLASS_, _EXCPTION_, _ST_}}
     ),
-    ColumnsAndSelected = maps:merge(Input, Selected),
+    ColumnsAndSelected = maps:merge(Columns, Selected),
     case
         ?RAISE(
             match_conditions(Conditions, ColumnsAndSelected),
@@ -138,14 +139,15 @@ do_apply_rule(
         )
     of
         true ->
-            Collection2 = filter_collection(Input, InCase, DoEach, Collection),
+            Collection2 = filter_collection(Columns, InCase, DoEach, Collection),
             case Collection2 of
                 [] ->
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
                 _ ->
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
             end,
-            {ok, [handle_action_list(RuleId, Actions, Coll, Input) || Coll <- Collection2]};
+            NewEnvs = maps:merge(Columns, Envs),
+            {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
         false ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
@@ -158,21 +160,22 @@ do_apply_rule(
         conditions := Conditions,
         actions := Actions
     },
-    Input
+    Columns,
+    Envs
 ) ->
     Selected = ?RAISE(
-        select_and_transform(Fields, Input),
+        select_and_transform(Fields, Columns),
         {select_and_transform_error, {_EXCLASS_, _EXCPTION_, _ST_}}
     ),
     case
         ?RAISE(
-            match_conditions(Conditions, maps:merge(Input, Selected)),
+            match_conditions(Conditions, maps:merge(Columns, Selected)),
             {match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}}
         )
     of
         true ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
-            {ok, handle_action_list(RuleId, Actions, Selected, Input)};
+            {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
         false ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
@@ -182,73 +185,73 @@ clear_rule_payload() ->
     erlang:erase(rule_payload).
 
 %% SELECT Clause
-select_and_transform(Fields, Input) ->
-    select_and_transform(Fields, Input, #{}).
+select_and_transform(Fields, Columns) ->
+    select_and_transform(Fields, Columns, #{}).
 
-select_and_transform([], _Input, Action) ->
+select_and_transform([], _Columns, Action) ->
     Action;
-select_and_transform(['*' | More], Input, Action) ->
-    select_and_transform(More, Input, maps:merge(Action, Input));
-select_and_transform([{as, Field, Alias} | More], Input, Action) ->
-    Val = eval(Field, Input),
+select_and_transform(['*' | More], Columns, Action) ->
+    select_and_transform(More, Columns, maps:merge(Action, Columns));
+select_and_transform([{as, Field, Alias} | More], Columns, Action) ->
+    Val = eval(Field, Columns),
     select_and_transform(
         More,
-        nested_put(Alias, Val, Input),
+        nested_put(Alias, Val, Columns),
         nested_put(Alias, Val, Action)
     );
-select_and_transform([Field | More], Input, Action) ->
-    Val = eval(Field, Input),
+select_and_transform([Field | More], Columns, Action) ->
+    Val = eval(Field, Columns),
     Key = alias(Field),
     select_and_transform(
         More,
-        nested_put(Key, Val, Input),
+        nested_put(Key, Val, Columns),
         nested_put(Key, Val, Action)
     ).
 
 %% FOREACH Clause
--spec select_and_collect(list(), input()) -> {input(), collection()}.
-select_and_collect(Fields, Input) ->
-    select_and_collect(Fields, Input, {#{}, {'item', []}}).
+-spec select_and_collect(list(), columns()) -> {columns(), collection()}.
+select_and_collect(Fields, Columns) ->
+    select_and_collect(Fields, Columns, {#{}, {'item', []}}).
 
-select_and_collect([{as, Field, {_, A} = Alias}], Input, {Action, _}) ->
-    Val = eval(Field, Input),
+select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) ->
+    Val = eval(Field, Columns),
     {nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
-select_and_collect([{as, Field, Alias} | More], Input, {Action, LastKV}) ->
-    Val = eval(Field, Input),
+select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) ->
+    Val = eval(Field, Columns),
     select_and_collect(
         More,
-        nested_put(Alias, Val, Input),
+        nested_put(Alias, Val, Columns),
         {nested_put(Alias, Val, Action), LastKV}
     );
-select_and_collect([Field], Input, {Action, _}) ->
-    Val = eval(Field, Input),
+select_and_collect([Field], Columns, {Action, _}) ->
+    Val = eval(Field, Columns),
     Key = alias(Field),
     {nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
-select_and_collect([Field | More], Input, {Action, LastKV}) ->
-    Val = eval(Field, Input),
+select_and_collect([Field | More], Columns, {Action, LastKV}) ->
+    Val = eval(Field, Columns),
     Key = alias(Field),
     select_and_collect(
         More,
-        nested_put(Key, Val, Input),
+        nested_put(Key, Val, Columns),
         {nested_put(Key, Val, Action), LastKV}
     ).
 
 %% Filter each item got from FOREACH
-filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
+filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) ->
     lists:filtermap(
         fun(Item) ->
-            InputAndItem = maps:merge(Input, #{CollKey => Item}),
+            ColumnsAndItem = maps:merge(Columns, #{CollKey => Item}),
             case
                 ?RAISE(
-                    match_conditions(InCase, InputAndItem),
+                    match_conditions(InCase, ColumnsAndItem),
                     {match_incase_error, {_EXCLASS_, _EXCPTION_, _ST_}}
                 )
             of
-                true when DoEach == [] -> {true, InputAndItem};
+                true when DoEach == [] -> {true, ColumnsAndItem};
                 true ->
                     {true,
                         ?RAISE(
-                            select_and_transform(DoEach, InputAndItem),
+                            select_and_transform(DoEach, ColumnsAndItem),
                             {doeach_error, {_EXCLASS_, _EXCPTION_, _ST_}}
                         )};
                 false ->
@@ -356,41 +359,41 @@ eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
     nested_get({path, Path}, may_decode_payload(Payload));
 eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
     nested_get({path, Path}, may_decode_payload(Payload));
-eval({path, _} = Path, Input) ->
-    nested_get(Path, Input);
-eval({range, {Begin, End}}, _Input) ->
+eval({path, _} = Path, Columns) ->
+    nested_get(Path, Columns);
+eval({range, {Begin, End}}, _Columns) ->
     range_gen(Begin, End);
-eval({get_range, {Begin, End}, Data}, Input) ->
-    range_get(Begin, End, eval(Data, Input));
-eval({var, _} = Var, Input) ->
-    nested_get(Var, Input);
-eval({const, Val}, _Input) ->
+eval({get_range, {Begin, End}, Data}, Columns) ->
+    range_get(Begin, End, eval(Data, Columns));
+eval({var, _} = Var, Columns) ->
+    nested_get(Var, Columns);
+eval({const, Val}, _Columns) ->
     Val;
 %% unary add
-eval({'+', L}, Input) ->
-    eval(L, Input);
+eval({'+', L}, Columns) ->
+    eval(L, Columns);
 %% unary subtract
-eval({'-', L}, Input) ->
-    -(eval(L, Input));
-eval({Op, L, R}, Input) when ?is_arith(Op) ->
-    apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
-eval({Op, L, R}, Input) when ?is_comp(Op) ->
-    compare(Op, eval(L, Input), eval(R, Input));
-eval({list, List}, Input) ->
-    [eval(L, Input) || L <- List];
-eval({'case', <<>>, CaseClauses, ElseClauses}, Input) ->
-    eval_case_clauses(CaseClauses, ElseClauses, Input);
-eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
-    eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
-eval({'fun', {_, Name}, Args}, Input) ->
-    apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).
-
-handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) ->
-    Input#{payload => may_decode_payload(Payload)};
-handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) ->
-    Input#{<<"payload">> => may_decode_payload(Payload)};
-handle_alias(_, Input) ->
-    Input.
+eval({'-', L}, Columns) ->
+    -(eval(L, Columns));
+eval({Op, L, R}, Columns) when ?is_arith(Op) ->
+    apply_func(Op, [eval(L, Columns), eval(R, Columns)], Columns);
+eval({Op, L, R}, Columns) when ?is_comp(Op) ->
+    compare(Op, eval(L, Columns), eval(R, Columns));
+eval({list, List}, Columns) ->
+    [eval(L, Columns) || L <- List];
+eval({'case', <<>>, CaseClauses, ElseClauses}, Columns) ->
+    eval_case_clauses(CaseClauses, ElseClauses, Columns);
+eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) ->
+    eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns);
+eval({'fun', {_, Name}, Args}, Columns) ->
+    apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
+
+handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Columns) ->
+    Columns#{payload => may_decode_payload(Payload)};
+handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns) ->
+    Columns#{<<"payload">> => may_decode_payload(Payload)};
+handle_alias(_, Columns) ->
+    Columns.
 
 alias({var, Var}) ->
     {var, Var};
@@ -417,55 +420,55 @@ alias({'fun', Name, _}) ->
 alias(_) ->
     ?ephemeral_alias(unknown, unknown).
 
-eval_case_clauses([], ElseClauses, Input) ->
+eval_case_clauses([], ElseClauses, Columns) ->
     case ElseClauses of
         {} -> undefined;
-        _ -> eval(ElseClauses, Input)
+        _ -> eval(ElseClauses, Columns)
     end;
-eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
-    case match_conditions(Cond, Input) of
+eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Columns) ->
+    case match_conditions(Cond, Columns) of
         true ->
-            eval(Clause, Input);
+            eval(Clause, Columns);
         _ ->
-            eval_case_clauses(CaseClauses, ElseClauses, Input)
+            eval_case_clauses(CaseClauses, ElseClauses, Columns)
     end.
 
-eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
+eval_switch_clauses(_CaseOn, [], ElseClauses, Columns) ->
     case ElseClauses of
         {} -> undefined;
-        _ -> eval(ElseClauses, Input)
+        _ -> eval(ElseClauses, Columns)
     end;
-eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
-    ConResult = eval(Cond, Input),
-    case eval(CaseOn, Input) of
+eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Columns) ->
+    ConResult = eval(Cond, Columns),
+    case eval(CaseOn, Columns) of
         ConResult ->
-            eval(Clause, Input);
+            eval(Clause, Columns);
         _ ->
-            eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
+            eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns)
     end.
 
-apply_func(Name, Args, Input) when is_atom(Name) ->
-    do_apply_func(Name, Args, Input);
-apply_func(Name, Args, Input) when is_binary(Name) ->
+apply_func(Name, Args, Columns) when is_atom(Name) ->
+    do_apply_func(Name, Args, Columns);
+apply_func(Name, Args, Columns) when is_binary(Name) ->
     FunName =
         try
             binary_to_existing_atom(Name, utf8)
         catch
             error:badarg -> error({sql_function_not_supported, Name})
         end,
-    do_apply_func(FunName, Args, Input).
+    do_apply_func(FunName, Args, Columns).
 
-do_apply_func(Name, Args, Input) ->
+do_apply_func(Name, Args, Columns) ->
     case erlang:apply(emqx_rule_funcs, Name, Args) of
         Func when is_function(Func) ->
-            erlang:apply(Func, [Input]);
+            erlang:apply(Func, [Columns]);
         Result ->
             Result
     end.
 
-add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
-    NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata),
-    Input#{metadata => NewMetadata}.
+add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) ->
+    NewMetadata = maps:merge(maps:get(metadata, Columns, #{}), Metadata),
+    Columns#{metadata => NewMetadata}.
 
 %%------------------------------------------------------------------------------
 %% Internal Functions
@@ -495,6 +498,6 @@ safe_decode_and_cache(MaybeJson) ->
 ensure_list(List) when is_list(List) -> List;
 ensure_list(_NotList) -> [].
 
-nested_put(Alias, Val, Input0) ->
-    Input = handle_alias(Alias, Input0),
-    emqx_rule_maps:nested_put(Alias, Val, Input).
+nested_put(Alias, Val, Columns0) ->
+    Columns = handle_alias(Alias, Columns0),
+    emqx_rule_maps:nested_put(Alias, Val, Columns).

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -61,7 +61,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
         created_at => erlang:system_time(millisecond)
     },
     FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
-    try emqx_rule_runtime:apply_rule(Rule, FullContext) of
+    try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of
         {ok, Data} -> {ok, flatten(Data)};
         {error, Reason} -> {error, Reason}
     after

+ 1 - 3
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -243,7 +243,7 @@ t_add_get_remove_rule(_Config) ->
     ok.
 
 t_add_get_remove_rules(_Config) ->
-    delete_rules_by_ids(emqx_rule_engine:get_rules()),
+    delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]),
     ok = insert_rules(
         [
             make_simple_rule(<<"rule-debug-1">>),
@@ -2386,7 +2386,6 @@ verify_event_fields('message.publish', Fields) ->
         topic := Topic,
         qos := QoS,
         flags := Flags,
-        headers := Headers,
         pub_props := Properties,
         timestamp := Timestamp,
         publish_received_at := EventAt
@@ -2402,7 +2401,6 @@ verify_event_fields('message.publish', Fields) ->
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
     ?assert(is_map(Flags)),
-    ?assert(is_map(Headers)),
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),

+ 5 - 2
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -23,8 +23,6 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--import(emqx_rule_events, [eventmsg_publish/1]).
-
 -define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
 %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
 
@@ -36,6 +34,11 @@ init_per_suite(Config) ->
 
 end_per_suite(_Config) ->
     ok.
+
+eventmsg_publish(Msg) ->
+    {Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
+    Columns.
+
 %%------------------------------------------------------------------------------
 %% Test cases for IoT Funcs
 %%------------------------------------------------------------------------------