Просмотр исходного кода

feat(message transformation dry run): add more input fields

Fixes https://emqx.atlassian.net/browse/EMQX-12932
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
bf908b5fae

+ 23 - 6
apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl

@@ -39,7 +39,8 @@
 -define(METRIC_NAME, message_transformation).
 
 -type user_property() :: #{binary() => binary()}.
--reflect_type([user_property/0]).
+-type publish_properties() :: #{binary() => binary() | integer()}.
+-reflect_type([user_property/0, publish_properties/0]).
 
 %%-------------------------------------------------------------------------------------------------
 %% `minirest' and `minirest_trails' API
@@ -305,7 +306,14 @@ fields(dryrun_input_message) ->
     %% See `emqx_message_transformation:eval_context()'.
     [
         {client_attrs, mk(map(), #{default => #{}})},
+        {clientid, mk(binary(), #{default => <<"test-clientid">>})},
         {payload, mk(binary(), #{required => true})},
+        {peername, mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:19872">>})},
+        {pub_props,
+            mk(
+                typerefl:alias("map()", publish_properties()),
+                #{default => #{}}
+            )},
         {qos, mk(range(0, 2), #{default => 0})},
         {retain, mk(boolean(), #{default => false})},
         {topic, mk(binary(), #{required => true})},
@@ -313,7 +321,8 @@ fields(dryrun_input_message) ->
             mk(
                 typerefl:alias("map(binary(), binary())", user_property()),
                 #{default => #{}}
-            )}
+            )},
+        {username, mk(binary(), #{required => false})}
     ];
 fields(get_metrics) ->
     [
@@ -728,26 +737,34 @@ dryrun_input_message_in(Params) ->
         ),
     #{
         client_attrs := ClientAttrs,
+        clientid := ClientId,
         payload := Payload,
+        peername := Peername,
+        pub_props := PublishProperties,
         qos := QoS,
         retain := Retain,
         topic := Topic,
         user_property := UserProperty0
     } = Message0,
+    Username = maps:get(username, Message0, undefined),
     UserProperty = maps:to_list(UserProperty0),
     Message1 = #{
         id => emqx_guid:gen(),
         timestamp => emqx_message:timestamp_now(),
         extra => #{},
-        from => <<"test-clientid">>,
-
-        flags => #{retain => Retain},
+        from => ClientId,
+        flags => #{dup => false, retain => Retain},
         qos => QoS,
         topic => Topic,
         payload => Payload,
         headers => #{
             client_attrs => ClientAttrs,
-            properties => #{'User-Property' => UserProperty}
+            peername => Peername,
+            properties => maps:merge(
+                PublishProperties,
+                #{'User-Property' => UserProperty}
+            ),
+            username => Username
         }
     },
     Message = emqx_message:from_map(Message1),

+ 45 - 4
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -1875,12 +1875,26 @@ t_dryrun_transformation(_Config) ->
                 operation(retain, <<"payload.r">>),
                 operation(<<"user_property.a">>, <<"payload.u.a">>),
                 operation(<<"user_property.copy">>, <<"user_property.original">>),
-                operation(<<"payload">>, <<"payload.p.hello">>)
+                operation(<<"payload.user">>, <<"username">>),
+                operation(<<"payload.flags">>, <<"flags">>),
+                operation(<<"payload.pprops">>, <<"pub_props">>),
+                operation(<<"payload.expiry">>, <<"pub_props.Message-Expiry-Interval">>),
+                operation(<<"payload.peername">>, <<"peername">>),
+                operation(<<"payload.node">>, <<"node">>),
+                operation(<<"payload.id">>, <<"id">>),
+                operation(<<"payload.clientid">>, <<"clientid">>),
+                operation(<<"payload.now">>, <<"timestamp">>),
+                operation(<<"payload.recv_at">>, <<"publish_received_at">>),
+                operation(<<"payload.hi">>, <<"payload.p.hello">>)
             ],
             Transformation1 = transformation(Name1, Operations),
 
             %% Good input
+            ClientId = <<"myclientid">>,
+            Username = <<"myusername">>,
+            Peername = <<"10.0.50.1:63221">>,
             Message1 = dryrun_input_message(#{
+                clientid => ClientId,
                 payload => #{
                     p => #{<<"hello">> => <<"world">>},
                     q => 1,
@@ -1888,11 +1902,15 @@ t_dryrun_transformation(_Config) ->
                     t => <<"t">>,
                     u => #{a => <<"b">>}
                 },
-                user_property => #{<<"original">> => <<"user_prop">>}
+                peername => Peername,
+                pub_props => #{<<"Message-Expiry-Interval">> => 30},
+                user_property => #{<<"original">> => <<"user_prop">>},
+                username => Username
             }),
+            Res1 = dryrun_transformation(Transformation1, Message1),
             ?assertMatch(
                 {200, #{
-                    <<"payload">> := <<"\"world\"">>,
+                    <<"payload">> := _,
                     <<"qos">> := 1,
                     <<"retain">> := true,
                     <<"topic">> := <<"t/u/v/t">>,
@@ -1902,7 +1920,30 @@ t_dryrun_transformation(_Config) ->
                         <<"copy">> := <<"user_prop">>
                     }
                 }},
-                dryrun_transformation(Transformation1, Message1)
+                Res1
+            ),
+            {200, #{<<"payload">> := EncPayloadRes1}} = Res1,
+            NodeBin = atom_to_binary(node()),
+            ?assertMatch(
+                #{
+                    <<"hi">> := <<"world">>,
+                    <<"now">> := _,
+                    <<"recv_at">> := _,
+                    <<"clientid">> := ClientId,
+                    <<"pprops">> := #{
+                        <<"Message-Expiry-Interval">> := 30,
+                        <<"User-Property">> := #{
+                            <<"original">> := <<"user_prop">>
+                        }
+                    },
+                    <<"expiry">> := 30,
+                    <<"peername">> := Peername,
+                    <<"node">> := NodeBin,
+                    <<"id">> := <<_/binary>>,
+                    <<"flags">> := #{<<"dup">> := false, <<"retain">> := true},
+                    <<"user">> := Username
+                },
+                emqx_utils_json:decode(EncPayloadRes1, [return_maps])
             ),
 
             %% Bad input: fails to decode