Parcourir la source

Merge pull request #9401 from sstrigler/EMQX-7751-support-message-properties

EMQX 7751 support message properties
Stefan Strigler il y a 3 ans
Parent
commit
08163feed7

+ 45 - 0
apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf

@@ -124,4 +124,49 @@ MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消
             zh: "失败的详细原因。"
             zh: "失败的详细原因。"
         }
         }
     }
     }
+    message_properties {
+        desc {
+             en: "The Properties of the PUBLISH message."
+             zh: "PUBLISH 消息里的 Property 字段。"
+        }
+    }
+    msg_payload_format_indicator {
+        desc {
+             en: """0 (0x00) Byte Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator.
+
+1 (0x01) Byte Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload MUST be well-formed UTF-8 as defined by the Unicode specification and restated in RFC 3629.
+"""
+             zh: "载荷格式指示标识符,0 表示载荷是未指定格式的数据,相当于没有发送载荷格式指示;1 表示载荷是 UTF-8 编码的字符数据,载荷中的 UTF-8 数据必须是按照 Unicode 的规范和 RFC 3629 的标准要求进行编码的。"
+        }
+    }
+    msg_message_expiry_interval {
+        desc {
+             en: "Identifier of the Message Expiry Interval. If the Message Expiry Interval has passed and the Server has not managed to start onward delivery to a matching subscriber, then it MUST delete the copy of the message for that subscriber."
+             zh: "消息过期间隔标识符,以秒为单位。当消失已经过期时,如果服务端还没有开始向匹配的订阅者投递该消息,则服务端会删除该订阅者的消息副本。如果不设置,则消息永远不会过期"
+        }
+    }
+    msg_response_topic {
+        desc {
+             en: "Identifier of the Response Topic.The Response Topic MUST be a UTF-8 Encoded, It MUST NOT contain wildcard characters."
+             zh: "响应主题标识符, UTF-8 编码的字符串,用作响应消息的主题名。响应主题不能包含通配符,也不能包含多个主题,否则将造成协议错误。当存在响应主题时,消息将被视作请求报文。服务端在收到应用消息时必须将响应主题原封不动的发送给所有的订阅者。"
+        }
+    }
+    msg_correlation_data {
+        desc {
+             en: "Identifier of the Correlation Data. The Server MUST send the Correlation Data unaltered to all subscribers receiving the Application Message."
+             zh: "对比数据标识符,服务端在收到应用消息时必须原封不动的把对比数据发送给所有的订阅者。对比数据只对请求消息(Request Message)的发送端和响应消息(Response Message)的接收端有意义。"
+        }
+    }
+    msg_user_properties {
+        desc {
+             en: "The User-Property key-value pairs. Note: in case there are duplicated keys, only the last one will be used."
+             zh: "指定 MQTT 消息的 User Property 键值对。注意,如果出现重复的键,只有最后一个会保留。"
+        }
+    }
+    msg_content_type {
+        desc {
+             en: "The Content Type MUST be a UTF-8 Encoded String."
+             zh: "内容类型标识符,以 UTF-8 格式编码的字符串,用来描述应用消息的内容,服务端必须把收到的应用消息中的内容类型原封不动的发送给所有的订阅者。"
+        }
+    }
 }
 }

+ 67 - 1
apps/emqx_management/src/emqx_mgmt_api_publish.erl

@@ -114,6 +114,11 @@ fields(message) ->
                 required => true,
                 required => true,
                 example => <<"hello emqx api">>
                 example => <<"hello emqx api">>
             })},
             })},
+        {properties,
+            hoconsc:mk(hoconsc:ref(?MODULE, message_properties), #{
+                desc => ?DESC(message_properties),
+                required => false
+            })},
         {retain,
         {retain,
             hoconsc:mk(boolean(), #{
             hoconsc:mk(boolean(), #{
                 desc => ?DESC(retain),
                 desc => ?DESC(retain),
@@ -130,6 +135,43 @@ fields(publish_message) ->
                 default => plain
                 default => plain
             })}
             })}
     ] ++ fields(message);
     ] ++ fields(message);
+fields(message_properties) ->
+    [
+        {'payload_format_indicator',
+            hoconsc:mk(typerefl:range(0, 1), #{
+                desc => ?DESC(msg_payload_format_indicator),
+                required => false,
+                example => 0
+            })},
+        {'message_expiry_interval',
+            hoconsc:mk(integer(), #{
+                desc => ?DESC(msg_message_expiry_interval),
+                required => false
+            })},
+        {'response_topic',
+            hoconsc:mk(binary(), #{
+                desc => ?DESC(msg_response_topic),
+                required => false,
+                example => <<"some_other_topic">>
+            })},
+        {'correlation_data',
+            hoconsc:mk(binary(), #{
+                desc => ?DESC(msg_correlation_data),
+                required => false
+            })},
+        {'user_properties',
+            hoconsc:mk(map(), #{
+                desc => ?DESC(msg_user_properties),
+                required => false,
+                example => #{<<"foo">> => <<"bar">>}
+            })},
+        {'content_type',
+            hoconsc:mk(binary(), #{
+                desc => ?DESC(msg_content_type),
+                required => false,
+                example => <<"text/plain">>
+            })}
+    ];
 fields(publish_ok) ->
 fields(publish_ok) ->
     [
     [
         {id,
         {id,
@@ -288,13 +330,23 @@ make_message(Map) ->
             QoS = maps:get(<<"qos">>, Map, 0),
             QoS = maps:get(<<"qos">>, Map, 0),
             Topic = maps:get(<<"topic">>, Map),
             Topic = maps:get(<<"topic">>, Map),
             Retain = maps:get(<<"retain">>, Map, false),
             Retain = maps:get(<<"retain">>, Map, false),
+            Headers =
+                case maps:get(<<"properties">>, Map, #{}) of
+                    Properties when
+                        is_map(Properties) andalso
+                            map_size(Properties) > 0
+                    ->
+                        #{properties => to_msg_properties(Properties)};
+                    _ ->
+                        #{}
+                end,
             try
             try
                 _ = emqx_topic:validate(name, Topic)
                 _ = emqx_topic:validate(name, Topic)
             catch
             catch
                 error:_Reason ->
                 error:_Reason ->
                     throw(invalid_topic_name)
                     throw(invalid_topic_name)
             end,
             end,
-            Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}),
+            Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, Headers),
             Size = emqx_message:estimate_size(Message),
             Size = emqx_message:estimate_size(Message),
             (Size > size_limit()) andalso throw(packet_too_large),
             (Size > size_limit()) andalso throw(packet_too_large),
             {ok, Message};
             {ok, Message};
@@ -302,6 +354,20 @@ make_message(Map) ->
             {error, R}
             {error, R}
     end.
     end.
 
 
+to_msg_properties(Properties) ->
+    maps:fold(
+        fun to_property/3,
+        #{},
+        Properties
+    ).
+
+to_property(<<"payload_format_indicator">>, V, M) -> M#{'Payload-Format-Indicator' => V};
+to_property(<<"message_expiry_interval">>, V, M) -> M#{'Message-Expiry-Interval' => V};
+to_property(<<"response_topic">>, V, M) -> M#{'Response-Topic' => V};
+to_property(<<"correlation_data">>, V, M) -> M#{'Correlation-Data' => V};
+to_property(<<"user_properties">>, V, M) -> M#{'User-Property' => maps:to_list(V)};
+to_property(<<"content_type">>, V, M) -> M#{'Content-Type' => V}.
+
 %% get the global packet size limit since HTTP API does not belong to any zone.
 %% get the global packet size limit since HTTP API does not belong to any zone.
 size_limit() ->
 size_limit() ->
     try
     try

+ 65 - 34
apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl

@@ -20,9 +20,7 @@
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
-
--define(CLIENTID, <<"api_clientid">>).
--define(USERNAME, <<"api_username">>).
+-include_lib("common_test/include/ct.hrl").
 
 
 -define(TOPIC1, <<"api_topic1">>).
 -define(TOPIC1, <<"api_topic1">>).
 -define(TOPIC2, <<"api_topic2">>).
 -define(TOPIC2, <<"api_topic2">>).
@@ -44,25 +42,56 @@ end_per_testcase(Case, Config) ->
     ?MODULE:Case({'end', Config}).
     ?MODULE:Case({'end', Config}).
 
 
 t_publish_api({init, Config}) ->
 t_publish_api({init, Config}) ->
-    Config;
-t_publish_api({'end', _Config}) ->
-    ok;
-t_publish_api(_) ->
-    {ok, Client} = emqtt:start_link(#{
-        username => <<"api_username">>, clientid => <<"api_clientid">>
-    }),
+    {ok, Client} = emqtt:start_link(
+        #{
+            username => <<"api_username">>,
+            clientid => <<"api_clientid">>,
+            proto_ver => v5
+        }
+    ),
     {ok, _} = emqtt:connect(Client),
     {ok, _} = emqtt:connect(Client),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
+    [{client, Client} | Config];
+t_publish_api({'end', Config}) ->
+    Client = ?config(client, Config),
+    emqtt:stop(Client),
+    ok;
+t_publish_api(_) ->
     Payload = <<"hello">>,
     Payload = <<"hello">>,
     Path = emqx_mgmt_api_test_util:api_path(["publish"]),
     Path = emqx_mgmt_api_test_util:api_path(["publish"]),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
-    Body = #{topic => ?TOPIC1, payload => Payload},
+    UserProperties = #{<<"foo">> => <<"bar">>},
+    Properties =
+        #{
+            <<"payload_format_indicator">> => 0,
+            <<"message_expiry_interval">> => 1000,
+            <<"response_topic">> => ?TOPIC2,
+            <<"correlation_data">> => <<"some_correlation_id">>,
+            <<"user_properties">> => UserProperties,
+            <<"content_type">> => <<"application/json">>
+        },
+    Body = #{topic => ?TOPIC1, payload => Payload, properties => Properties},
     {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
     {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
     ResponseMap = decode_json(Response),
     ResponseMap = decode_json(Response),
     ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
     ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
-    ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
-    emqtt:stop(Client).
+    {ok, Message} = receive_assert(?TOPIC1, 0, Payload),
+    RecvProperties = maps:get(properties, Message),
+    UserPropertiesList = maps:to_list(UserProperties),
+    #{
+        'Payload-Format-Indicator' := 0,
+        'Message-Expiry-Interval' := RecvMessageExpiry,
+        'Correlation-Data' := <<"some_correlation_id">>,
+        'User-Property' := UserPropertiesList,
+        'Content-Type' := <<"application/json">>
+    } = RecvProperties,
+    ?assert(RecvMessageExpiry =< 1000),
+    %% note: without props this time
+    Body2 = #{topic => ?TOPIC2, payload => Payload},
+    {ok, Response2} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body2),
+    ResponseMap2 = decode_json(Response2),
+    ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap2))),
+    ?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))).
 
 
 t_publish_no_subscriber({init, Config}) ->
 t_publish_no_subscriber({init, Config}) ->
     Config;
     Config;
@@ -163,16 +192,18 @@ t_publish_bad_topic_bulk(_Config) ->
     ).
     ).
 
 
 t_publish_bulk_api({init, Config}) ->
 t_publish_bulk_api({init, Config}) ->
-    Config;
-t_publish_bulk_api({'end', _Config}) ->
-    ok;
-t_publish_bulk_api(_) ->
     {ok, Client} = emqtt:start_link(#{
     {ok, Client} = emqtt:start_link(#{
         username => <<"api_username">>, clientid => <<"api_clientid">>
         username => <<"api_username">>, clientid => <<"api_clientid">>
     }),
     }),
     {ok, _} = emqtt:connect(Client),
     {ok, _} = emqtt:connect(Client),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
+    [{client, Client} | Config];
+t_publish_bulk_api({'end', Config}) ->
+    Client = ?config(client, Config),
+    emqtt:stop(Client),
+    ok;
+t_publish_bulk_api(_) ->
     Payload = <<"hello">>,
     Payload = <<"hello">>,
     Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
     Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
@@ -199,9 +230,8 @@ t_publish_bulk_api(_) ->
         end,
         end,
         ResponseList
         ResponseList
     ),
     ),
-    ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
-    ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
-    emqtt:stop(Client).
+    ?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))),
+    ?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))).
 
 
 t_publish_no_subscriber_bulk({init, Config}) ->
 t_publish_no_subscriber_bulk({init, Config}) ->
     Config;
     Config;
@@ -232,8 +262,8 @@ t_publish_no_subscriber_bulk(_) ->
         ],
         ],
         ResponseList
         ResponseList
     ),
     ),
-    ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
-    ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
+    ?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))),
+    ?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))),
     emqtt:stop(Client).
     emqtt:stop(Client).
 
 
 t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
 t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
@@ -267,17 +297,19 @@ t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) -
 t_publish_bulk_dispatch_failure({init, Config}) ->
 t_publish_bulk_dispatch_failure({init, Config}) ->
     meck:new(emqx, [no_link, passthrough, no_history]),
     meck:new(emqx, [no_link, passthrough, no_history]),
     meck:expect(emqx, is_running, fun() -> false end),
     meck:expect(emqx, is_running, fun() -> false end),
-    Config;
-t_publish_bulk_dispatch_failure({'end', _Config}) ->
-    meck:unload(emqx),
-    ok;
-t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
     {ok, Client} = emqtt:start_link(#{
     {ok, Client} = emqtt:start_link(#{
         username => <<"api_username">>, clientid => <<"api_clientid">>
         username => <<"api_username">>, clientid => <<"api_clientid">>
     }),
     }),
     {ok, _} = emqtt:connect(Client),
     {ok, _} = emqtt:connect(Client),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
     {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
+    [{client, Client} | Config];
+t_publish_bulk_dispatch_failure({'end', Config}) ->
+    meck:unload(emqx),
+    Client = ?config(client, Config),
+    emqtt:stop(Client),
+    ok;
+t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
     Payload = <<"hello">>,
     Payload = <<"hello">>,
     Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
     Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
@@ -303,8 +335,7 @@ t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
             #{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
             #{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
         ],
         ],
         decode_json(ResponseBody)
         decode_json(ResponseBody)
-    ),
-    emqtt:stop(Client).
+    ).
 
 
 receive_assert(Topic, Qos, Payload) ->
 receive_assert(Topic, Qos, Payload) ->
     receive
     receive
@@ -312,12 +343,12 @@ receive_assert(Topic, Qos, Payload) ->
             ReceiveTopic = maps:get(topic, Message),
             ReceiveTopic = maps:get(topic, Message),
             ReceiveQos = maps:get(qos, Message),
             ReceiveQos = maps:get(qos, Message),
             ReceivePayload = maps:get(payload, Message),
             ReceivePayload = maps:get(payload, Message),
-            ?assertEqual(ReceiveTopic, Topic),
-            ?assertEqual(ReceiveQos, Qos),
-            ?assertEqual(ReceivePayload, Payload),
-            ok
+            ?assertEqual(Topic, ReceiveTopic),
+            ?assertEqual(Qos, ReceiveQos),
+            ?assertEqual(Payload, ReceivePayload),
+            {ok, Message}
     after 5000 ->
     after 5000 ->
-        timeout
+        {error, timeout}
     end.
     end.
 
 
 decode_json(In) ->
 decode_json(In) ->

+ 2 - 0
changes/v5.0.11-en.md

@@ -23,6 +23,8 @@
 
 
 - Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398).
 - Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398).
 
 
+- Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401).
+
 ## Bug fixes
 ## Bug fixes
 
 
 - Fix `ssl.existingName` option of  helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
 - Fix `ssl.existingName` option of  helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).

+ 2 - 0
changes/v5.0.11-zh.md

@@ -21,6 +21,8 @@
 
 
 - 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。
 - 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。
 
 
+- 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。
+
 ## 修复
 ## 修复
 
 
 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。