Просмотр исходного кода

feat: pass along client attributes down to message transformation context

Thales Macedo Garitezi 1 год назад
Родитель
Сommit
6e0ef893f4

+ 10 - 7
apps/emqx/src/emqx_channel.erl

@@ -685,20 +685,23 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
 
 packet_to_message(Packet, #channel{
     conninfo = #{proto_ver := ProtoVer},
-    clientinfo = #{
-        protocol := Protocol,
-        clientid := ClientId,
-        username := Username,
-        peerhost := PeerHost,
-        mountpoint := MountPoint
-    }
+    clientinfo =
+        #{
+            protocol := Protocol,
+            clientid := ClientId,
+            username := Username,
+            peerhost := PeerHost,
+            mountpoint := MountPoint
+        } = ClientInfo
 }) ->
+    ClientAttrs = maps:get(client_attrs, ClientInfo, #{}),
     emqx_mountpoint:mount(
         MountPoint,
         emqx_packet:to_message(
             Packet,
             ClientId,
             #{
+                client_attrs => ClientAttrs,
                 proto_ver => ProtoVer,
                 protocol => Protocol,
                 username => Username,

+ 2 - 0
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -45,6 +45,7 @@
 -type rendered_value() :: qos() | boolean() | binary().
 
 -type eval_context() :: #{
+    client_attrs := map(),
     payload := _,
     qos := _,
     retain := _,
@@ -309,6 +310,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
         end,
     #{
         dirty => Dirty,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
         payload => Payload,
         qos => Message#message.qos,
         retain => emqx_message:get_flag(retain, Message, false),

+ 39 - 1
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -250,7 +250,11 @@ connect(ClientId) ->
     connect(ClientId, _IsPersistent = false).
 
 connect(ClientId, IsPersistent) ->
-    Properties = emqx_utils_maps:put_if(#{}, 'Session-Expiry-Interval', 30, IsPersistent),
+    connect(ClientId, IsPersistent, _Opts = #{}).
+
+connect(ClientId, IsPersistent, Opts) ->
+    Properties0 = maps:get(properties, Opts, #{}),
+    Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent),
     {ok, Client} = emqtt:start_link([
         {clean_start, true},
         {clientid, ClientId},
@@ -1441,3 +1445,37 @@ t_json_encode_decode_smoke_test(_Config) ->
         []
     ),
     ok.
+
+%% Simple smoke test for client attributes support.
+t_client_attrs(_Config) ->
+    {ok, Compiled} = emqx_variform:compile(<<"user_property.tenant">>),
+    ok = emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [
+        #{
+            expression => Compiled,
+            set_as_attr => <<"tenant">>
+        }
+    ]),
+    on_exit(fun() -> ok = emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []) end),
+    ?check_trace(
+        begin
+            Name1 = <<"foo">>,
+            Operation1 = operation(topic, <<"concat([client_attrs.tenant, '/', topic])">>),
+            Transformation1 = transformation(Name1, [Operation1]),
+            {201, _} = insert(Transformation1),
+
+            Tenant = <<"mytenant">>,
+            C = connect(
+                <<"c1">>,
+                _IsPersistent = false,
+                #{properties => #{'User-Property' => [{<<"tenant">>, Tenant}]}}
+            ),
+            {ok, _, [_]} = emqtt:subscribe(C, emqx_topic:join([Tenant, <<"#">>])),
+
+            ok = publish(C, <<"t/1">>, #{x => 1, y => 2}),
+            ?assertReceive({publish, #{topic := <<"mytenant/t/1">>}}),
+
+            ok
+        end,
+        []
+    ),
+    ok.