Преглед на файлове

Merge pull request #11568 from thalesmg/republish-props-m-20230905

feat(republish): allow templating mqtt properties
Thales Macedo Garitezi преди 2 години
родител
ревизия
e089fda260

+ 1 - 1
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl

@@ -640,7 +640,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
         #{
             <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
             <<"enable">> => true,
-            <<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}],
+            <<"actions">> => [#{<<"function">> => <<"emqx_bridge_mqtt_SUITE:inspect">>}],
             <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
         }
     ),

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -2,7 +2,7 @@
 {application, emqx_dashboard, [
     {description, "EMQX Web Dashboard"},
     % strict semver, bump manually!
-    {vsn, "5.0.26"},
+    {vsn, "5.0.27"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},

+ 100 - 3
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -20,6 +20,7 @@
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqtt/include/emqtt.hrl").
 
 %% APIs
 -export([parse_action/1]).
@@ -60,16 +61,23 @@ pre_process_action_args(
         qos := QoS,
         retain := Retain,
         payload := Payload,
-        user_properties := UserProperties
+        mqtt_properties := MQTTPropertiesTemplate0,
+        user_properties := UserPropertiesTemplate
     } = Args
 ) ->
+    MQTTPropertiesTemplate =
+        maps:map(
+            fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end,
+            MQTTPropertiesTemplate0
+        ),
     Args#{
         preprocessed_tmpl => #{
             topic => emqx_placeholder:preproc_tmpl(Topic),
             qos => preproc_vars(QoS),
             retain => preproc_vars(Retain),
             payload => emqx_placeholder:preproc_tmpl(Payload),
-            user_properties => preproc_user_properties(UserProperties)
+            mqtt_properties => MQTTPropertiesTemplate,
+            user_properties => preproc_user_properties(UserPropertiesTemplate)
         }
     };
 pre_process_action_args(_, Args) ->
@@ -106,6 +114,7 @@ republish(
             retain := RetainTks,
             topic := TopicTks,
             payload := PayloadTks,
+            mqtt_properties := MQTTPropertiesTemplate,
             user_properties := UserPropertiesTks
         }
     }
@@ -118,7 +127,9 @@ republish(
     %% events such as message.acked and message.dropped
     Flags0 = maps:get(flags, Env, #{}),
     Flags = Flags0#{retain => Retain},
-    PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
+    PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env),
+    MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
+    PubProps = maps:merge(PubProps0, MQTTProps),
     ?TRACE(
         "RULE",
         "republish_message",
@@ -232,3 +243,89 @@ format_pub_props(UserPropertiesTks, Selected, Env) ->
                 replace_simple_var(UserPropertiesTks, Selected, #{})
         end,
     #{'User-Property' => UserProperties}.
+
+format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) ->
+    #{metadata := #{rule_id := RuleId}} = Env,
+    MQTTProperties0 =
+        maps:fold(
+            fun(K, Template, Acc) ->
+                try
+                    V = emqx_placeholder:proc_tmpl(Template, Selected),
+                    Acc#{K => V}
+                catch
+                    Kind:Error ->
+                        ?SLOG(
+                            debug,
+                            #{
+                                msg => "bad_mqtt_property_value_ignored",
+                                rule_id => RuleId,
+                                exception => Kind,
+                                reason => Error,
+                                property => K,
+                                selected => Selected
+                            }
+                        ),
+                        Acc
+                end
+            end,
+            #{},
+            MQTTPropertiesTemplate
+        ),
+    coerce_properties_values(MQTTProperties0, Env).
+
+ensure_int(B) when is_binary(B) ->
+    try
+        binary_to_integer(B)
+    catch
+        error:badarg ->
+            throw(bad_integer)
+    end;
+ensure_int(I) when is_integer(I) ->
+    I.
+
+coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) ->
+    maps:fold(
+        fun(K, V0, Acc) ->
+            try
+                V = encode_mqtt_property(K, V0),
+                Acc#{K => V}
+            catch
+                throw:bad_integer ->
+                    ?SLOG(
+                        debug,
+                        #{
+                            msg => "bad_mqtt_property_value_ignored",
+                            rule_id => RuleId,
+                            reason => bad_integer,
+                            property => K,
+                            value => V0
+                        }
+                    ),
+                    Acc;
+                Kind:Reason:Stacktrace ->
+                    ?SLOG(
+                        debug,
+                        #{
+                            msg => "bad_mqtt_property_value_ignored",
+                            rule_id => RuleId,
+                            exception => Kind,
+                            reason => Reason,
+                            property => K,
+                            value => V0,
+                            stacktrace => Stacktrace
+                        }
+                    ),
+                    Acc
+            end
+        end,
+        #{},
+        MQTTProperties
+    ).
+
+%% Note: currently we do not support `Topic-Alias', which would need to be encoded as an
+%% int.
+encode_mqtt_property('Payload-Format-Indicator', V) -> ensure_int(V);
+encode_mqtt_property('Message-Expiry-Interval', V) -> ensure_int(V);
+encode_mqtt_property('Subscription-Identifier', V) -> ensure_int(V);
+%% note: `emqx_placeholder:proc_tmpl/2' currently always return a binary.
+encode_mqtt_property(_Prop, V) when is_binary(V) -> V.

+ 45 - 7
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -63,7 +63,7 @@ fields("rules") ->
             )},
         {"actions",
             ?HOCON(
-                ?ARRAY(?UNION(actions())),
+                ?ARRAY(hoconsc:union(actions())),
                 #{
                     desc => ?DESC("rules_actions"),
                     default => [],
@@ -161,6 +161,14 @@ fields("republish_args") ->
                     example => <<"${payload}">>
                 }
             )},
+        {mqtt_properties,
+            ?HOCON(
+                ?R_REF("republish_mqtt_properties"),
+                #{
+                    desc => ?DESC("republish_args_mqtt_properties"),
+                    default => #{}
+                }
+            )},
         {user_properties,
             ?HOCON(
                 binary(),
@@ -170,6 +178,17 @@ fields("republish_args") ->
                     example => <<"${pub_props.'User-Property'}">>
                 }
             )}
+    ];
+fields("republish_mqtt_properties") ->
+    [
+        {'Payload-Format-Indicator',
+            ?HOCON(binary(), #{required => false, desc => ?DESC('Payload-Format-Indicator')})},
+        {'Message-Expiry-Interval',
+            ?HOCON(binary(), #{required => false, desc => ?DESC('Message-Expiry-Interval')})},
+        {'Content-Type', ?HOCON(binary(), #{required => false, desc => ?DESC('Content-Type')})},
+        {'Response-Topic', ?HOCON(binary(), #{required => false, desc => ?DESC('Response-Topic')})},
+        {'Correlation-Data',
+            ?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})}
     ].
 
 desc("rule_engine") ->
@@ -200,12 +219,31 @@ rule_name() ->
         )}.
 
 actions() ->
-    [
-        binary(),
-        ?R_REF("builtin_action_republish"),
-        ?R_REF("builtin_action_console"),
-        ?R_REF("user_provided_function")
-    ].
+    fun
+        (all_union_members) ->
+            [
+                binary(),
+                ?R_REF("builtin_action_republish"),
+                ?R_REF("builtin_action_console"),
+                ?R_REF("user_provided_function")
+            ];
+        ({value, V}) ->
+            case V of
+                #{<<"function">> := <<"console">>} ->
+                    [?R_REF("builtin_action_console")];
+                #{<<"function">> := <<"republish">>} ->
+                    [?R_REF("builtin_action_republish")];
+                #{<<"function">> := <<_/binary>>} ->
+                    [?R_REF("user_provided_function")];
+                <<_/binary>> ->
+                    [binary()];
+                _ ->
+                    throw(#{
+                        field_name => actions,
+                        reason => <<"unknown action type">>
+                    })
+            end
+    end.
 
 qos() ->
     ?UNION([emqx_schema:qos(), binary()]).

+ 265 - 2
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -23,6 +23,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
@@ -74,6 +75,7 @@ groups() ->
             t_sqlselect_inject_props,
             t_sqlselect_01,
             t_sqlselect_02,
+            t_sqlselect_03,
             t_sqlselect_1,
             t_sqlselect_2,
             t_sqlselect_3,
@@ -580,13 +582,16 @@ t_get_rule_ids_by_action(_) ->
 t_ensure_action_removed(_) ->
     Id = <<"t_ensure_action_removed">>,
     GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
-    emqx:update_config(
+    {ok, _} = emqx:update_config(
         [rule_engine, rules, Id],
         #{
             <<"actions">> => [
                 #{<<"function">> => GetSelectedData},
                 #{<<"function">> => <<"console">>},
-                #{<<"function">> => <<"republish">>},
+                #{
+                    <<"function">> => <<"republish">>,
+                    <<"args">> => #{<<"topic">> => <<"some/topic">>}
+                },
                 <<"mysql:foo">>,
                 <<"mqtt:bar">>
             ],
@@ -1467,6 +1472,260 @@ t_sqlselect_02(_Config) ->
     emqtt:stop(Client),
     delete_rule(TopicRule1).
 
+t_sqlselect_03(_Config) ->
+    init_events_counters(),
+    SQL = "SELECT * FROM \"t/r\" ",
+    Repub = republish_action(
+        <<"t/republish">>,
+        <<"${.}">>,
+        <<"${pub_props.'User-Property'}">>,
+        #{
+            <<"Payload-Format-Indicator">> => <<"${.payload.pfi}">>,
+            <<"Message-Expiry-Interval">> => <<"${.payload.mei}">>,
+            <<"Content-Type">> => <<"${.payload.ct}">>,
+            <<"Response-Topic">> => <<"${.payload.rt}">>,
+            <<"Correlation-Data">> => <<"${.payload.cd}">>
+        }
+    ),
+    RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}),
+    ct:pal("republish action raw:\n  ~p", [RepubRaw]),
+    RuleRaw = #{
+        <<"sql">> => SQL,
+        <<"actions">> => [RepubRaw]
+    },
+    {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw, #{}),
+    on_exit(fun() -> emqx_rule_engine:delete_rule(?TMP_RULEID) end),
+    %% to check what republish is actually producing without loss of information
+    SQL1 = "select * from \"t/republish\" ",
+    RuleId0 = ?TMP_RULEID,
+    RuleId1 = <<RuleId0/binary, "2">>,
+    {ok, _} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL1,
+            id => RuleId1,
+            actions => [
+                #{
+                    function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
+                    args => #{}
+                }
+            ]
+        }
+    ),
+    on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId1) end),
+
+    UserProps = maps:to_list(#{<<"mykey">> => <<"myval">>}),
+    Payload =
+        emqx_utils_json:encode(
+            #{
+                pfi => 1,
+                mei => 2,
+                ct => <<"3">>,
+                rt => <<"4">>,
+                cd => <<"5">>
+            }
+        ),
+    {ok, Client} = emqtt:start_link([
+        {username, <<"emqx">>},
+        {proto_ver, v5},
+        {properties, #{'Topic-Alias-Maximum' => 100}}
+    ]),
+    on_exit(fun() -> emqtt:stop(Client) end),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, <<"t/republish">>, 0),
+    PubProps = #{'User-Property' => UserProps},
+    ExpectedMQTTProps0 = #{
+        'Payload-Format-Indicator' => 1,
+        'Message-Expiry-Interval' => 2,
+        'Content-Type' => <<"3">>,
+        'Response-Topic' => <<"4">>,
+        'Correlation-Data' => <<"5">>,
+        %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
+        %% so the channel controls those aliases on its own, starting from 1.
+        'Topic-Alias' => 1,
+        'User-Property' => UserProps
+    },
+    emqtt:publish(Client, <<"t/r">>, PubProps, Payload, [{qos, 0}]),
+    receive
+        {publish, #{topic := <<"t/republish">>, properties := Props1}} ->
+            ?assertEqual(ExpectedMQTTProps0, Props1),
+            ok
+    after 2000 ->
+        ct:pal("mailbox:\n  ~p", [?drainMailbox()]),
+        ct:fail("message not republished (l. ~b)", [?LINE])
+    end,
+    ExpectedMQTTProps1 = #{
+        'Payload-Format-Indicator' => 1,
+        'Message-Expiry-Interval' => 2,
+        'Content-Type' => <<"3">>,
+        'Response-Topic' => <<"4">>,
+        'Correlation-Data' => <<"5">>,
+        'User-Property' => maps:from_list(UserProps),
+        'User-Property-Pairs' => [
+            #{key => K, value => V}
+         || {K, V} <- UserProps
+        ]
+    },
+    ?assertMatch(
+        [
+            {'message.publish', #{
+                topic := <<"t/republish">>,
+                pub_props := ExpectedMQTTProps1
+            }}
+        ],
+        ets:lookup(events_record_tab, 'message.publish'),
+        #{expected_props => ExpectedMQTTProps1}
+    ),
+
+    ct:pal("testing payload that is not a json object"),
+    emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]),
+    ExpectedMQTTProps2 = #{
+        'Content-Type' => <<"undefined">>,
+        'Correlation-Data' => <<"undefined">>,
+        'Response-Topic' => <<"undefined">>,
+        %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
+        %% so the channel controls those aliases on its own, starting from 1.
+        'Topic-Alias' => 1,
+        'User-Property' => UserProps
+    },
+    receive
+        {publish, #{topic := T1, properties := Props2}} ->
+            ?assertEqual(ExpectedMQTTProps2, Props2),
+            %% empty this time, due to topic alias set before
+            ?assertEqual(<<>>, T1),
+            ok
+    after 2000 ->
+        ct:pal("mailbox:\n  ~p", [?drainMailbox()]),
+        ct:fail("message not republished (l. ~b)", [?LINE])
+    end,
+
+    ct:pal("testing payload with some uncoercible keys"),
+    ets:delete_all_objects(events_record_tab),
+    Payload1 =
+        emqx_utils_json:encode(#{
+            pfi => <<"bad_value1">>,
+            mei => <<"bad_value2">>,
+            ct => <<"some_value3">>,
+            rt => <<"some_value4">>,
+            cd => <<"some_value5">>
+        }),
+    emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]),
+    ExpectedMQTTProps3 = #{
+        %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
+        %% so the channel controls those aliases on its own, starting from 1.
+        'Topic-Alias' => 1,
+        'Content-Type' => <<"some_value3">>,
+        'Response-Topic' => <<"some_value4">>,
+        'Correlation-Data' => <<"some_value5">>,
+        'User-Property' => UserProps
+    },
+    receive
+        {publish, #{topic := T2, properties := Props3}} ->
+            ?assertEqual(ExpectedMQTTProps3, Props3),
+            %% empty this time, due to topic alias set before
+            ?assertEqual(<<>>, T2),
+            ok
+    after 2000 ->
+        ct:pal("mailbox:\n  ~p", [?drainMailbox()]),
+        ct:fail("message not republished (l. ~b)", [?LINE])
+    end,
+    ExpectedMQTTProps4 = #{
+        'Content-Type' => <<"some_value3">>,
+        'Response-Topic' => <<"some_value4">>,
+        'Correlation-Data' => <<"some_value5">>,
+        'User-Property' => maps:from_list(UserProps),
+        'User-Property-Pairs' => [
+            #{key => K, value => V}
+         || {K, V} <- UserProps
+        ]
+    },
+    ?assertMatch(
+        [
+            {'message.publish', #{
+                topic := <<"t/republish">>,
+                pub_props := ExpectedMQTTProps4
+            }}
+        ],
+        ets:lookup(events_record_tab, 'message.publish'),
+        #{expected_props => ExpectedMQTTProps4}
+    ),
+
+    ct:pal("testing a payload with a more complex placeholder"),
+    Repub1 = republish_action(
+        <<"t/republish">>,
+        <<"${.}">>,
+        <<"${pub_props.'User-Property'}">>,
+        #{
+            %% Note: `Payload-Format-Indicator' is capped at 225.
+            <<"Payload-Format-Indicator">> => <<"1${.payload.pfi}3">>,
+            <<"Message-Expiry-Interval">> => <<"9${.payload.mei}6">>
+        }
+    ),
+    RepubRaw1 = emqx_utils_maps:binary_key_map(Repub1#{function => <<"republish">>}),
+    ct:pal("republish action raw:\n  ~p", [RepubRaw1]),
+    RuleRaw1 = #{
+        <<"sql">> => SQL,
+        <<"actions">> => [RepubRaw1]
+    },
+    {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw1, #{}),
+
+    Payload2 =
+        emqx_utils_json:encode(#{
+            pfi => <<"2">>,
+            mei => <<"87">>
+        }),
+    emqtt:publish(Client, <<"t/r">>, PubProps, Payload2, [{qos, 0}]),
+    ExpectedMQTTProps5 = #{
+        %% Note: PFI should be 0 or 1 according to spec, but we don't validate this when
+        %% serializing nor parsing...
+        'Payload-Format-Indicator' => 123,
+        'Message-Expiry-Interval' => 9876,
+        %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
+        %% so the channel controls those aliases on its own, starting from 1.
+        'Topic-Alias' => 1,
+        'User-Property' => UserProps
+    },
+    receive
+        {publish, #{topic := T3, properties := Props4}} ->
+            ?assertEqual(ExpectedMQTTProps5, Props4),
+            %% empty this time, due to topic alias set before
+            ?assertEqual(<<>>, T3),
+            ok
+    after 2000 ->
+        ct:pal("mailbox:\n  ~p", [?drainMailbox()]),
+        ct:fail("message not republished (l. ~b)", [?LINE])
+    end,
+
+    ct:pal("testing payload-format-indicator cap"),
+    Payload3 =
+        emqx_utils_json:encode(#{
+            pfi => <<"999999">>,
+            mei => <<"87">>
+        }),
+    emqtt:publish(Client, <<"t/r">>, PubProps, Payload3, [{qos, 0}]),
+    ExpectedMQTTProps6 = #{
+        %% Note: PFI should be 0 or 1 according to spec, but we don't validate this when
+        %% serializing nor parsing...
+        %% Note: PFI is capped at 16#FF
+        'Payload-Format-Indicator' => 16#FF band 19999993,
+        'Message-Expiry-Interval' => 9876,
+        %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
+        %% so the channel controls those aliases on its own, starting from 1.
+        'Topic-Alias' => 1,
+        'User-Property' => UserProps
+    },
+    receive
+        {publish, #{topic := T4, properties := Props5}} ->
+            ?assertEqual(ExpectedMQTTProps6, Props5),
+            %% empty this time, due to topic alias set before
+            ?assertEqual(<<>>, T4),
+            ok
+    after 2000 ->
+        ct:pal("mailbox:\n  ~p", [?drainMailbox()]),
+        ct:fail("message not republished (l. ~b)", [?LINE])
+    end,
+
+    ok.
+
 t_sqlselect_1(_Config) ->
     SQL =
         "SELECT json_decode(payload) as p, payload "
@@ -3271,6 +3530,9 @@ republish_action(Topic, Payload) ->
     republish_action(Topic, Payload, <<"${user_properties}">>).
 
 republish_action(Topic, Payload, UserProperties) ->
+    republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}).
+
+republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
     #{
         function => republish,
         args => #{
@@ -3278,6 +3540,7 @@ republish_action(Topic, Payload, UserProperties) ->
             topic => Topic,
             qos => 0,
             retain => false,
+            mqtt_properties => MQTTProperties,
             user_properties => UserProperties
         }
     }.

+ 132 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl

@@ -0,0 +1,132 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_schema_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%===========================================================================
+%% Data Section
+%%===========================================================================
+
+%% erlfmt-ignore
+republish_hocon0() ->
+"""
+rule_engine.rules.my_rule {
+  description = \"some desc\"
+  metadata = {created_at = 1693918992079}
+  sql = \"select * from \\\"t/topic\\\" \"
+  actions = [
+    {function = console}
+    { function = republish
+      args = {
+        payload = \"${.}\"
+        qos = 0
+        retain = false
+        topic = \"t/repu\"
+        mqtt_properties {
+          \"Payload-Format-Indicator\" = \"${.payload.pfi}\"
+          \"Message-Expiry-Interval\" = \"${.payload.mei}\"
+          \"Content-Type\" = \"${.payload.ct}\"
+          \"Response-Topic\" = \"${.payload.rt}\"
+          \"Correlation-Data\" = \"${.payload.cd}\"
+        }
+        user_properties = \"${pub_props.'User-Property'}\"
+      }
+    },
+    \"bridges:kafka:kprodu\",
+    { function = custom_fn
+      args = {
+        actually = not_republish
+      }
+    }
+  ]
+}
+""".
+
+%%===========================================================================
+%% Helper functions
+%%===========================================================================
+
+parse(Hocon) ->
+    {ok, Conf} = hocon:binary(Hocon),
+    Conf.
+
+check(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_rule_engine_schema, Conf).
+
+-define(validation_error(Reason, Value),
+    {emqx_rule_engine_schema, [
+        #{
+            kind := validation_error,
+            reason := Reason,
+            value := Value
+        }
+    ]}
+).
+
+-define(ok_config(Cfg), #{
+    <<"rule_engine">> :=
+        #{
+            <<"rules">> :=
+                #{
+                    <<"my_rule">> :=
+                        Cfg
+                }
+        }
+}).
+
+%%===========================================================================
+%% Test cases
+%%===========================================================================
+
+republish_test_() ->
+    BaseConf = parse(republish_hocon0()),
+    [
+        {"base config",
+            ?_assertMatch(
+                ?ok_config(
+                    #{
+                        <<"actions">> := [
+                            #{<<"function">> := console},
+                            #{
+                                <<"function">> := republish,
+                                <<"args">> :=
+                                    #{
+                                        <<"mqtt_properties">> :=
+                                            #{
+                                                <<"Payload-Format-Indicator">> := <<_/binary>>,
+                                                <<"Message-Expiry-Interval">> := <<_/binary>>,
+                                                <<"Content-Type">> := <<_/binary>>,
+                                                <<"Response-Topic">> := <<_/binary>>,
+                                                <<"Correlation-Data">> := <<_/binary>>
+                                            }
+                                    }
+                            },
+                            <<"bridges:kafka:kprodu">>,
+                            #{
+                                <<"function">> := <<"custom_fn">>,
+                                <<"args">> :=
+                                    #{
+                                        <<"actually">> := <<"not_republish">>
+                                    }
+                            }
+                        ]
+                    }
+                ),
+                check(BaseConf)
+            )}
+    ].

+ 16 - 7
apps/emqx_utils/src/emqx_placeholder.erl

@@ -277,20 +277,29 @@ lookup_var([Prop | Rest], Data0) ->
     end.
 
 lookup(Prop, Data) when is_binary(Prop) ->
-    case maps:get(Prop, Data, undefined) of
-        undefined ->
-            try
-                {ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)}
+    case do_one_lookup(Prop, Data) of
+        {error, undefined} ->
+            try binary_to_existing_atom(Prop, utf8) of
+                AtomKey ->
+                    do_one_lookup(AtomKey, Data)
             catch
-                error:{badkey, _} ->
-                    {error, undefined};
                 error:badarg ->
                     {error, undefined}
             end;
-        Value ->
+        {ok, Value} ->
             {ok, Value}
     end.
 
+do_one_lookup(Key, Data) ->
+    try
+        {ok, maps:get(Key, Data)}
+    catch
+        error:{badkey, _} ->
+            {error, undefined};
+        error:{badmap, _} ->
+            {error, undefined}
+    end.
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------

+ 1 - 1
apps/emqx_utils/src/emqx_utils.app.src

@@ -2,7 +2,7 @@
 {application, emqx_utils, [
     {description, "Miscellaneous utilities for EMQX apps"},
     % strict semver, bump manually!
-    {vsn, "5.0.7"},
+    {vsn, "5.0.8"},
     {modules, [
         emqx_utils,
         emqx_utils_api,

+ 22 - 0
apps/emqx_utils/test/emqx_placeholder_SUITE.erl

@@ -256,3 +256,25 @@ t_proc_tmpl_arbitrary_var_name_double_quote(_) ->
         <<"a:1,a:1-1,b:1,b:2,c:1.0,d:oo,d1:hi}">>,
         emqx_placeholder:proc_tmpl(Tks, Selected)
     ).
+
+t_proc_tmpl_badmap(_Config) ->
+    ThisTks = emqx_placeholder:preproc_tmpl(<<"${.}">>),
+    Tks = emqx_placeholder:preproc_tmpl(<<"${.a.b.c}">>),
+    BadMap = <<"not-a-map">>,
+    ?assertEqual(
+        <<"not-a-map">>,
+        emqx_placeholder:proc_tmpl(ThisTks, BadMap)
+    ),
+    ?assertEqual(
+        <<"undefined">>,
+        emqx_placeholder:proc_tmpl(Tks, #{<<"a">> => #{<<"b">> => BadMap}})
+    ),
+    ?assertEqual(
+        <<"undefined">>,
+        emqx_placeholder:proc_tmpl(Tks, #{<<"a">> => BadMap})
+    ),
+    ?assertEqual(
+        <<"undefined">>,
+        emqx_placeholder:proc_tmpl(Tks, BadMap)
+    ),
+    ok.

+ 1 - 0
changes/ce/feat-11568.en.md

@@ -0,0 +1 @@
+Added support for defining templates for MQTT publish properties in Republish rule action.

+ 1 - 0
changes/ce/fix-11568.en.md

@@ -0,0 +1 @@
+Fixed an issue where an ill-defined builtin rule action config could be interpreted as a custom user function.

+ 10 - 0
rel/i18n/emqx_rule_engine_schema.hocon

@@ -103,6 +103,16 @@ You may also call <code>map_put</code> function like
 to inject user properties.
 NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not."""
 
+republish_args_user_properties.label:
+"""User Properties"""
+
+republish_args_mqtt_properties.desc:
+"""From which variable should the MQTT Publish Properties of the message be taken.
+Placeholders like <code>${.payload.content_type}</code> may be used."""
+
+republish_args_mqtt_properties.label:
+"""MQTT Properties"""
+
 republish_function.desc:
 """Republish the message as a new MQTT message"""