Просмотр исходного кода

feat: do not drop MQTTv5 properties in rule/bridge

kraftwerk28 3 лет назад
Родитель
Сommit
00c57de4c3

+ 16 - 1
apps/emqx/src/emqx_misc.erl

@@ -54,7 +54,8 @@
     pmap/3,
     readable_error_msg/1,
     safe_to_existing_atom/1,
-    safe_to_existing_atom/2
+    safe_to_existing_atom/2,
+    pub_props_to_packet/1
 ]).
 
 -export([
@@ -568,3 +569,17 @@ ipv6_probe_test() ->
     end.
 
 -endif.
+
+pub_props_to_packet(Properties) ->
+    F = fun
+        ('User-Property', M) ->
+            case is_map(M) andalso map_size(M) > 0 of
+                true -> {true, maps:to_list(M)};
+                false -> false
+            end;
+        ('User-Property-Pairs', _) ->
+            false;
+        (_, _) ->
+            true
+    end,
+    maps:filtermap(F, Properties).

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "An OTP application"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 6 - 4
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl

@@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{
     Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
+    PubProps = maps:get(pub_props, MapMsg, #{}),
     #mqtt_msg{
         qos = QoS,
         retain = Retain,
         topic = topic(Mountpoint, Topic),
-        props = #{},
+        props = emqx_misc:pub_props_to_packet(PubProps),
         payload = Payload
     };
 to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
     Msg#message{topic = topic(Mountpoint, Topic)}.
 
 %% published from remote node over a MQTT connection
+to_broker_msg(Msg, Vars, undefined) ->
+    to_broker_msg(Msg, Vars, #{});
 to_broker_msg(
     #{dup := Dup} = MapMsg,
     #{
@@ -103,8 +106,9 @@ to_broker_msg(
     Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
+    PubProps = maps:get(pub_props, MapMsg, #{}),
     set_headers(
-        Props,
+        Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
         emqx_message:set_flags(
             #{dup => Dup, retain => Retain},
             emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
@@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) ->
 estimate_size(Term) ->
     erlang:external_size(Term).
 
-set_headers(undefined, Msg) ->
-    Msg;
 set_headers(Val, Msg) ->
     emqx_message:set_headers(Val, Msg).
 topic(undefined, Topic) -> Topic;

+ 23 - 4
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -110,8 +110,9 @@ republish(
     Payload = format_msg(PayloadTks, Selected),
     QoS = replace_simple_var(QoSTks, Selected, 0),
     Retain = replace_simple_var(RetainTks, Selected, false),
+    PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})),
     ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
-    safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
+    safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps);
 %% in case this is a "$events/" event
 republish(
     Selected,
@@ -129,8 +130,9 @@ republish(
     Payload = format_msg(PayloadTks, Selected),
     QoS = replace_simple_var(QoSTks, Selected, 0),
     Retain = replace_simple_var(RetainTks, Selected, false),
+    PubProps = maps:get(pub_props, Selected, #{}),
     ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
-    safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
+    safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps).
 
 %%--------------------------------------------------------------------
 %% internal functions
@@ -168,13 +170,16 @@ pre_process_args(Mod, Func, Args) ->
         false -> Args
     end.
 
-safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
+safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
     Msg = #message{
         id = emqx_guid:gen(),
         qos = QoS,
         from = RuleId,
         flags = Flags,
-        headers = #{republish_by => RuleId},
+        headers = #{
+            republish_by => RuleId,
+            properties => emqx_misc:pub_props_to_packet(PubProps)
+        },
         topic = Topic,
         payload = Payload,
         timestamp = erlang:system_time(millisecond)
@@ -201,3 +206,17 @@ format_msg([], Selected) ->
     emqx_json:encode(Selected);
 format_msg(Tokens, Selected) ->
     emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
+
+format_pub_props(Props) ->
+    maps:fold(fun format_pub_prop/3, #{}, Props).
+
+format_pub_prop(K, V, Acc) when is_atom(K) ->
+    Acc#{K => V};
+format_pub_prop(K, V, Acc) when is_binary(K) ->
+    try
+        K1 = erlang:binary_to_existing_atom(K),
+        format_pub_prop(K1, V, Acc)
+    catch
+        _:_ ->
+            Acc#{K => V}
+    end.

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -2,7 +2,7 @@
 {application, emqx_rule_engine, [
     {description, "EMQX Rule Engine"},
     % strict semver, bump manually!
-    {vsn, "5.0.3"},
+    {vsn, "5.0.4"},
     {modules, []},
     {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
     {applications, [kernel, stdlib, rulesql, getopt]},

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -1060,7 +1060,7 @@ printable_maps(Headers) ->
             (K, V, AccIn) ->
                 AccIn#{K => V}
         end,
-        #{},
+        #{'User-Property' => #{}},
         Headers
     ).
 

+ 82 - 6
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -59,11 +59,13 @@ groups() ->
             t_sqlselect_0,
             t_sqlselect_00,
             t_sqlselect_001,
+            t_sqlselect_inject_props,
             t_sqlselect_01,
             t_sqlselect_02,
             t_sqlselect_1,
             t_sqlselect_2,
             t_sqlselect_3,
+            t_sqlselect_message_publish_event,
             t_sqlparse_event_1,
             t_sqlparse_event_2,
             t_sqlparse_event_3,
@@ -936,9 +938,41 @@ t_sqlselect_001(_Config) ->
         )
     ).
 
+t_sqlselect_inject_props(_Config) ->
+    SQL =
+        "SELECT json_decode(payload) as p, payload, "
+        "map_put('discard', 'discard', pub_props) as pub_props, "
+        "map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' "
+        "FROM \"t3/#\", \"t1\" "
+        "WHERE p.x = 1",
+    Repub = republish_action(<<"t2">>),
+    {ok, TopicRule1} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+    Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}),
+    {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
+    emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
+    ct:sleep(100),
+    receive
+        {publish, #{topic := T, payload := Payload, properties := Props2}} ->
+            ?assertEqual(Props, Props2),
+            ?assertEqual(<<"t2">>, T),
+            ?assertEqual(<<"{\"x\":1}">>, Payload)
+    after 1000 ->
+        ct:fail(wait_for_t2)
+    end,
+    emqtt:stop(Client),
+    delete_rule(TopicRule1).
+
 t_sqlselect_01(_Config) ->
     SQL =
-        "SELECT json_decode(payload) as p, payload "
+        "SELECT json_decode(payload) as p, payload, pub_props "
         "FROM \"t3/#\", \"t1\" "
         "WHERE p.x = 1",
     Repub = republish_action(<<"t2">>),
@@ -949,10 +983,11 @@ t_sqlselect_01(_Config) ->
             actions => [Repub]
         }
     ),
-    {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
+    Props = user_properties(#{<<"mykey">> => <<"myval">>}),
+    {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
-    emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
+    emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
     ct:sleep(100),
     receive
         {publish, #{topic := T, payload := Payload}} ->
@@ -962,7 +997,7 @@ t_sqlselect_01(_Config) ->
         ct:fail(wait_for_t2)
     end,
 
-    emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
+    emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]),
     receive
         {publish, #{topic := <<"t2">>, payload := _}} ->
             ct:fail(unexpected_t2)
@@ -970,9 +1005,10 @@ t_sqlselect_01(_Config) ->
         ok
     end,
 
-    emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
+    emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
     receive
-        {publish, #{topic := T3, payload := Payload3}} ->
+        {publish, #{topic := T3, payload := Payload3, properties := Props2}} ->
+            ?assertEqual(Props, Props2),
             ?assertEqual(<<"t2">>, T3),
             ?assertEqual(<<"{\"x\":1}">>, Payload3)
     after 1000 ->
@@ -1135,6 +1171,43 @@ t_sqlselect_3(_Config) ->
     emqtt:stop(Client),
     delete_rule(TopicRule).
 
+t_sqlselect_message_publish_event(_Config) ->
+    %% republish the client.connected msg
+    Topic = <<"foo/bar/1">>,
+    SQL = <<
+        "SELECT clientid, pub_props "
+        "FROM \"$events/message_dropped\" "
+    >>,
+
+    %"WHERE topic = \"", Topic/binary, "\"">>,
+    Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
+    {ok, TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+    {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(Client1),
+    {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
+    ct:sleep(200),
+    {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(Client2),
+    Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
+    emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
+    receive
+        {publish, #{topic := T, payload := Payload, properties := Props1}} ->
+            ?assertEqual(Props1, Props),
+            ?assertEqual(<<"t2">>, T),
+            ?assertEqual(<<"clientid=pub-02">>, Payload)
+    after 1000 ->
+        ct:fail(wait_for_t2)
+    end,
+    emqtt:stop(Client2),
+    emqtt:stop(Client1),
+    delete_rule(TopicRule).
+
 t_sqlparse_event_1(_Config) ->
     Sql =
         "select topic as tp "
@@ -2869,6 +2942,9 @@ verify_ipaddr(IPAddrS) ->
 init_events_counters() ->
     ets:new(events_record_tab, [named_table, bag, public]).
 
+user_properties(PairsMap) ->
+    #{'User-Property' => maps:to_list(PairsMap)}.
+
 %%------------------------------------------------------------------------------
 %% Start Apps
 %%------------------------------------------------------------------------------

+ 2 - 0
changes/v5.0.11-en.md

@@ -21,6 +21,8 @@
 
 - Set the default value for the maximum level of a topic to 128 [#9406](https://github.com/emqx/emqx/pull/9406).
 
+- Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398).
+
 ## Bug fixes
 
 - Fix `ssl.existingName` option of  helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).

+ 2 - 0
changes/v5.0.11-zh.md

@@ -19,6 +19,8 @@
 
 - 将主题的最大层级限制的默认值设置为128 [#9406](https://github.com/emqx/emqx/pull/9406)。
 
+- 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。
+
 ## 修复
 
 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。