Parcourir la source

feat: add user_properties arg for republish action

Zaiming (Stone) Shi il y a 3 ans
Parent
commit
f3df2c80d8

+ 25 - 1
apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf

@@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
 of the rule, then the string "undefined" is used.
 """
                          zh: """
-要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
+要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
 默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
 """
                         }
@@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used.
                            zh: "消息负载"
                           }
                   }
+    republish_args_user_properties {
+        desc {
+            en: """
+From which variable should the MQTT message's User-Property pairs be taken from.
+The value must be a map.
+You may configure it to <code>${pub_props.'User-Property'}</code> or
+use <code>SELECT *,pub_props.'User-Property' as user_properties</code>
+to forward the original user properties to the republished message.
+You may also call <code>map_put</code> function like
+<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
+to inject user properties.
+NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.
+"""
+            zh: """
+指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。
+可以设置成 <code>${pub_props.'User-Property'}</code> 或者
+使用 <code>SELECT *,pub_props.'User-Property' as user_properties</code> 来把源 MQTT 消息
+的 User-Property 列表用于填充。
+也可以使用 <code>map_put</code> 函数来添加新的 User-Property,
+<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
+注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。
+"""
+        }
+    }
 
     rule_engine_ignore_sys_message {
                    desc {

+ 48 - 40
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -37,6 +37,8 @@
 
 -callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
 
+-define(ORIGINAL_USER_PROPERTIES, original).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -57,7 +59,8 @@ pre_process_action_args(
         topic := Topic,
         qos := QoS,
         retain := Retain,
-        payload := Payload
+        payload := Payload,
+        user_properties := UserProperties
     } = Args
 ) ->
     Args#{
@@ -65,7 +68,8 @@ pre_process_action_args(
             topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
             qos => preproc_vars(QoS),
             retain => preproc_vars(Retain),
-            payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
+            payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
+            user_properties => preproc_user_properties(UserProperties)
         }
     };
 pre_process_action_args(_, Args) ->
@@ -93,16 +97,16 @@ republish(
     _Args
 ) ->
     ?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
-%% republish a PUBLISH message
 republish(
     Selected,
-    #{flags := Flags, metadata := #{rule_id := RuleId}},
+    #{metadata := #{rule_id := RuleId}} = Env,
     #{
         preprocessed_tmpl := #{
             qos := QoSTks,
             retain := RetainTks,
             topic := TopicTks,
-            payload := PayloadTks
+            payload := PayloadTks,
+            user_properties := UserPropertiesTks
         }
     }
 ) ->
@@ -110,29 +114,22 @@ republish(
     Payload = format_msg(PayloadTks, Selected),
     QoS = replace_simple_var(QoSTks, Selected, 0),
     Retain = replace_simple_var(RetainTks, Selected, false),
-    PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})),
-    ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
-    safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps);
-%% in case this is a "$events/" event
-republish(
-    Selected,
-    #{metadata := #{rule_id := RuleId}},
-    #{
-        preprocessed_tmpl := #{
-            qos := QoSTks,
-            retain := RetainTks,
-            topic := TopicTks,
-            payload := PayloadTks
+    %% 'flags' is set for message re-publishes or message related
+    %% events such as message.acked and message.dropped
+    Flags0 = maps:get(flags, Env, #{}),
+    Flags = Flags0#{retain => Retain},
+    PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
+    ?TRACE(
+        "RULE",
+        "republish_message",
+        #{
+            flags => Flags,
+            topic => Topic,
+            payload => Payload,
+            pub_props => PubProps
         }
-    }
-) ->
-    Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
-    Payload = format_msg(PayloadTks, Selected),
-    QoS = replace_simple_var(QoSTks, Selected, 0),
-    Retain = replace_simple_var(RetainTks, Selected, false),
-    PubProps = maps:get(pub_props, Selected, #{}),
-    ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
-    safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps).
+    ),
+    safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
 
 %%--------------------------------------------------------------------
 %% internal functions
@@ -192,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) ->
 preproc_vars(Data) ->
     Data.
 
+preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
+    %% keep the original
+    %% avoid processing this special variable because
+    %% we do not want to force users to select the value
+    %% the value will be taken from Env.pub_props directly
+    ?ORIGINAL_USER_PROPERTIES;
+preproc_user_properties(<<"${", _/binary>> = V) ->
+    %% use a variable
+    emqx_plugin_libs_rule:preproc_tmpl(V);
+preproc_user_properties(_) ->
+    %% invalid, discard
+    undefined.
+
 replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
     [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
     case Var of
@@ -207,16 +217,14 @@ format_msg([], Selected) ->
 format_msg(Tokens, Selected) ->
     emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
 
-format_pub_props(Props) ->
-    maps:fold(fun format_pub_prop/3, #{}, Props).
-
-format_pub_prop(K, V, Acc) when is_atom(K) ->
-    Acc#{K => V};
-format_pub_prop(K, V, Acc) when is_binary(K) ->
-    try
-        K1 = erlang:binary_to_existing_atom(K),
-        format_pub_prop(K1, V, Acc)
-    catch
-        _:_ ->
-            Acc#{K => V}
-    end.
+format_pub_props(UserPropertiesTks, Selected, Env) ->
+    UserProperties =
+        case UserPropertiesTks of
+            ?ORIGINAL_USER_PROPERTIES ->
+                maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
+            undefined ->
+                #{};
+            _ ->
+                replace_simple_var(UserPropertiesTks, Selected, #{})
+        end,
+    #{'User-Property' => UserProperties}.

+ 9 - 0
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -173,6 +173,15 @@ fields("republish_args") ->
                     default => <<"${payload}">>,
                     example => <<"${payload}">>
                 }
+            )},
+        {user_properties,
+            ?HOCON(
+                binary(),
+                #{
+                    desc => ?DESC("republish_args_user_properties"),
+                    default => <<"${user_properties}">>,
+                    example => <<"${pub_props.'User-Property'}">>
+                }
             )}
     ].
 

+ 66 - 21
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -65,7 +65,8 @@ groups() ->
             t_sqlselect_1,
             t_sqlselect_2,
             t_sqlselect_3,
-            t_sqlselect_message_publish_event,
+            t_sqlselect_message_publish_event_keep_original_props_1,
+            t_sqlselect_message_publish_event_keep_original_props_2,
             t_sqlparse_event_1,
             t_sqlparse_event_2,
             t_sqlparse_event_3,
@@ -941,8 +942,7 @@ t_sqlselect_001(_Config) ->
 t_sqlselect_inject_props(_Config) ->
     SQL =
         "SELECT json_decode(payload) as p, payload, "
-        "map_put('discard', 'discard', pub_props) as pub_props, "
-        "map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' "
+        "map_put('inject_key', 'inject_val', user_properties) as user_properties "
         "FROM \"t3/#\", \"t1\" "
         "WHERE p.x = 1",
     Repub = republish_action(<<"t2">>),
@@ -958,13 +958,12 @@ t_sqlselect_inject_props(_Config) ->
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
     emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
-    ct:sleep(100),
     receive
         {publish, #{topic := T, payload := Payload, properties := Props2}} ->
             ?assertEqual(Props, Props2),
             ?assertEqual(<<"t2">>, T),
             ?assertEqual(<<"{\"x\":1}">>, Payload)
-    after 1000 ->
+    after 2000 ->
         ct:fail(wait_for_t2)
     end,
     emqtt:stop(Client),
@@ -972,10 +971,10 @@ t_sqlselect_inject_props(_Config) ->
 
 t_sqlselect_01(_Config) ->
     SQL =
-        "SELECT json_decode(payload) as p, payload, pub_props "
+        "SELECT json_decode(payload) as p, payload "
         "FROM \"t3/#\", \"t1\" "
         "WHERE p.x = 1",
-    Repub = republish_action(<<"t2">>),
+    Repub = republish_action(<<"t2">>, <<"${payload}">>, <<"${pub_props.'User-Property'}">>),
     {ok, TopicRule1} = emqx_rule_engine:create_rule(
         #{
             sql => SQL,
@@ -988,12 +987,11 @@ t_sqlselect_01(_Config) ->
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
     emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
-    ct:sleep(100),
     receive
         {publish, #{topic := T, payload := Payload}} ->
             ?assertEqual(<<"t2">>, T),
             ?assertEqual(<<"{\"x\":1}">>, Payload)
-    after 1000 ->
+    after 2000 ->
         ct:fail(wait_for_t2)
     end,
 
@@ -1001,7 +999,7 @@ t_sqlselect_01(_Config) ->
     receive
         {publish, #{topic := <<"t2">>, payload := _}} ->
             ct:fail(unexpected_t2)
-    after 1000 ->
+    after 2000 ->
         ok
     end,
 
@@ -1011,8 +1009,8 @@ t_sqlselect_01(_Config) ->
             ?assertEqual(Props, Props2),
             ?assertEqual(<<"t2">>, T3),
             ?assertEqual(<<"{\"x\":1}">>, Payload3)
-    after 1000 ->
-        ct:fail(wait_for_t2)
+    after 2000 ->
+        ct:fail(wait_for_t3)
     end,
 
     emqtt:stop(Client),
@@ -1080,13 +1078,12 @@ t_sqlselect_1(_Config) ->
     {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
-    ct:sleep(200),
     emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
     receive
         {publish, #{topic := T, payload := Payload}} ->
             ?assertEqual(<<"t2">>, T),
             ?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
-    after 1000 ->
+    after 2000 ->
         ct:fail(wait_for_t2)
     end,
 
@@ -1149,14 +1146,13 @@ t_sqlselect_3(_Config) ->
     {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
-    ct:sleep(200),
     {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
     {ok, _} = emqtt:connect(Client1),
     receive
         {publish, #{topic := T, payload := Payload}} ->
             ?assertEqual(<<"t2">>, T),
             ?assertEqual(<<"clientid=c_emqx1">>, Payload)
-    after 1000 ->
+    after 2000 ->
         ct:fail(wait_for_t2)
     end,
 
@@ -1171,11 +1167,51 @@ t_sqlselect_3(_Config) ->
     emqtt:stop(Client),
     delete_rule(TopicRule).
 
-t_sqlselect_message_publish_event(_Config) ->
+t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
+    %% republish the client.connected msg
+    Topic = <<"foo/bar/1">>,
+    SQL = <<
+        "SELECT clientid "
+        "FROM \"$events/message_dropped\" "
+    >>,
+
+    %"WHERE topic = \"", Topic/binary, "\"">>,
+    Repub = republish_action(
+        <<"t2">>,
+        <<"clientid=${clientid}">>,
+        <<"${pub_props.'User-Property'}">>
+    ),
+    {ok, TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+    {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(Client1),
+    {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
+    {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(Client2),
+    Props = user_properties(#{<<"mykey">> => <<"111111">>}),
+    emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
+    receive
+        {publish, #{topic := T, payload := Payload, properties := Props1}} ->
+            ?assertEqual(Props1, Props),
+            ?assertEqual(<<"t2">>, T),
+            ?assertEqual(<<"clientid=pub-02">>, Payload)
+    after 2000 ->
+        ct:fail(wait_for_t2)
+    end,
+    emqtt:stop(Client2),
+    emqtt:stop(Client1),
+    delete_rule(TopicRule).
+
+t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
     %% republish the client.connected msg
     Topic = <<"foo/bar/1">>,
     SQL = <<
-        "SELECT clientid, pub_props "
+        "SELECT clientid, pub_props.'User-Property' as user_properties "
         "FROM \"$events/message_dropped\" "
     >>,
 
@@ -1191,7 +1227,6 @@ t_sqlselect_message_publish_event(_Config) ->
     {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(Client1),
     {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
-    ct:sleep(200),
     {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(Client2),
     Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
@@ -1201,7 +1236,7 @@ t_sqlselect_message_publish_event(_Config) ->
             ?assertEqual(Props1, Props),
             ?assertEqual(<<"t2">>, T),
             ?assertEqual(<<"clientid=pub-02">>, Payload)
-    after 1000 ->
+    after 2000 ->
         ct:fail(wait_for_t2)
     end,
     emqtt:stop(Client2),
@@ -2553,10 +2588,20 @@ t_get_basic_usage_info_1(_Config) ->
 
 republish_action(Topic) ->
     republish_action(Topic, <<"${payload}">>).
+
 republish_action(Topic, Payload) ->
+    republish_action(Topic, Payload, <<"${user_properties}">>).
+
+republish_action(Topic, Payload, UserProperties) ->
     #{
         function => republish,
-        args => #{payload => Payload, topic => Topic, qos => 0, retain => false}
+        args => #{
+            payload => Payload,
+            topic => Topic,
+            qos => 0,
+            retain => false,
+            user_properties => UserProperties
+        }
     }.
 
 make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->