Просмотр исходного кода

feat: add `peername` to rule events that already have `peerhost`

Fixes https://emqx.atlassian.net/browse/EMQX-12342
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
7ca5205f3f

+ 4 - 1
apps/emqx/src/emqx_channel.erl

@@ -235,7 +235,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) ->
 -spec init(emqx_types:conninfo(), opts()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, PeerPort},
+        peername := {PeerHost, PeerPort} = PeerName,
         sockname := {_Host, SockPort}
     },
     #{
@@ -259,6 +259,9 @@ init(
             listener => ListenerId,
             protocol => Protocol,
             peerhost => PeerHost,
+            %% We copy peername to clientinfo because some event contexts only have access
+            %% to client info (e.g.: authn/authz).
+            peername => PeerName,
             peerport => PeerPort,
             sockport => SockPort,
             clientid => undefined,

+ 1 - 0
apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl

@@ -308,6 +308,7 @@ clientinfo(OldClientInfo) ->
             zone,
             protocol,
             peerhost,
+            peername,
             sockport,
             clientid,
             username,

+ 2 - 1
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -120,7 +120,7 @@ stats(#channel{session = Session}) ->
 -spec init(map(), map()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     #{ctx := Ctx} = Config
@@ -140,6 +140,7 @@ init(
             listener => ListenerId,
             protocol => 'coap',
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => emqx_guid:to_base62(emqx_guid:gen()),
             username => undefined,

+ 2 - 1
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -802,7 +802,7 @@ default_conninfo(ConnInfo) ->
     }.
 
 default_clientinfo(#{
-    peername := {PeerHost, _},
+    peername := {PeerHost, _} = PeerName,
     sockname := {_, SockPort},
     clientid := ClientId
 }) ->
@@ -810,6 +810,7 @@ default_clientinfo(#{
         zone => default,
         protocol => exproto,
         peerhost => PeerHost,
+        peername => PeerName,
         sockport => SockPort,
         clientid => ClientId,
         username => undefined,

+ 2 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -138,7 +138,7 @@ set_conn_state(ConnState, Channel) ->
 
 init(
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, _Port} = PeerName,
         sockname := {_Host, SockPort}
     },
     Options
@@ -160,6 +160,7 @@ init(
             listener => ListenerId,
             protocol => gbt32960,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 2 - 1
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -147,7 +147,7 @@ stats(#channel{inflight = Inflight, mqueue = Queue}) ->
 -spec init(emqx_types:conninfo(), map()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, _Port} = PeerName,
         sockname := {_Host, SockPort}
     },
     Options = #{
@@ -171,6 +171,7 @@ init(
             listener => ListenerId,
             protocol => jt808,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 1 - 1
apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_lwm2m, [
     {description, "LwM2M Gateway"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]},
     {env, []},

+ 2 - 1
apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl

@@ -119,7 +119,7 @@ stats(#channel{session = Session}) ->
 
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     #{ctx := Ctx} = Config
@@ -139,6 +139,7 @@ init(
             listener => ListenerId,
             protocol => lwm2m,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             username => undefined,
             clientid => undefined,

+ 2 - 1
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -130,7 +130,7 @@
 %% @doc Init protocol
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     Option
@@ -152,6 +152,7 @@ init(
             listener => ListenerId,
             protocol => 'mqtt-sn',
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 4 - 3
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -209,7 +209,7 @@ stats(#channel{mqueue = MQueue}) ->
 -spec init(emqx_types:conninfo(), map()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, _Port} = PeerName,
         sockname := {_Host, SockPort}
     },
     Options
@@ -230,6 +230,7 @@ init(
             listener => ListenerId,
             protocol => ocpp,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,
@@ -325,9 +326,9 @@ enrich_client(
 
 set_log_meta(#channel{
     clientinfo = #{clientid := ClientId},
-    conninfo = #{peername := Peername}
+    conninfo = #{peername := PeerName}
 }) ->
-    emqx_logger:set_metadata_peername(esockd:format(Peername)),
+    emqx_logger:set_metadata_peername(esockd:format(PeerName)),
     emqx_logger:set_metadata_clientid(ClientId).
 
 run_conn_hooks(_UserInfo, Channel = #channel{conninfo = ConnInfo}) ->

+ 2 - 1
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -117,7 +117,7 @@
 %% @doc Init protocol
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     Option
@@ -137,6 +137,7 @@ init(
             listener => ListenerId,
             protocol => stomp,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 24 - 5
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -329,6 +329,7 @@ eventmsg_publish(
             username => emqx_message:get_header(username, Message, undefined),
             payload => Payload,
             peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -446,7 +447,8 @@ eventmsg_check_authz_complete(
     _ClientInfo = #{
         clientid := ClientId,
         username := Username,
-        peerhost := PeerHost
+        peerhost := PeerHost,
+        peername := PeerName
     },
     PubSub,
     Topic,
@@ -458,6 +460,7 @@ eventmsg_check_authz_complete(
         #{
             clientid => ClientId,
             username => Username,
+            peername => ntoa(PeerName),
             peerhost => ntoa(PeerHost),
             topic => Topic,
             action => PubSub,
@@ -471,8 +474,7 @@ eventmsg_check_authn_complete(
     _ClientInfo = #{
         clientid := ClientId,
         username := Username,
-        peerhost := PeerHost,
-        peerport := PeerPort
+        peername := PeerName
     },
     Result
 ) ->
@@ -488,7 +490,7 @@ eventmsg_check_authn_complete(
         #{
             clientid => ClientId,
             username => Username,
-            peername => ntoa({PeerHost, PeerPort}),
+            peername => ntoa(PeerName),
             reason_code => force_to_bin(Reason),
             is_anonymous => IsAnonymous,
             is_superuser => IsSuperuser
@@ -501,7 +503,8 @@ eventmsg_sub_or_unsub(
     _ClientInfo = #{
         clientid := ClientId,
         username := Username,
-        peerhost := PeerHost
+        peerhost := PeerHost,
+        peername := PeerName
     },
     Topic,
     SubOpts = #{qos := QoS}
@@ -513,6 +516,7 @@ eventmsg_sub_or_unsub(
             clientid => ClientId,
             username => Username,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
             topic => Topic,
             qos => QoS
@@ -542,6 +546,7 @@ eventmsg_dropped(
             username => emqx_message:get_header(username, Message, undefined),
             payload => Payload,
             peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -606,6 +611,7 @@ eventmsg_validation_failed(
             username => emqx_message:get_header(username, Message, undefined),
             payload => Payload,
             peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -618,6 +624,7 @@ eventmsg_validation_failed(
 eventmsg_delivered(
     _ClientInfo = #{
         peerhost := PeerHost,
+        peername := PeerName,
         clientid := ReceiverCId,
         username := ReceiverUsername
     },
@@ -642,6 +649,7 @@ eventmsg_delivered(
             username => ReceiverUsername,
             payload => Payload,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -654,6 +662,7 @@ eventmsg_delivered(
 eventmsg_acked(
     _ClientInfo = #{
         peerhost := PeerHost,
+        peername := PeerName,
         clientid := ReceiverCId,
         username := ReceiverUsername
     },
@@ -678,6 +687,7 @@ eventmsg_acked(
             username => ReceiverUsername,
             payload => Payload,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -691,6 +701,7 @@ eventmsg_acked(
 eventmsg_delivery_dropped(
     _ClientInfo = #{
         peerhost := PeerHost,
+        peername := PeerName,
         clientid := ReceiverCId,
         username := ReceiverUsername
     },
@@ -717,6 +728,7 @@ eventmsg_delivery_dropped(
             username => ReceiverUsername,
             payload => Payload,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -1009,6 +1021,7 @@ columns_with_exam('message.publish') ->
         {<<"username">>, <<"u_emqx">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1031,6 +1044,7 @@ columns_with_exam('message.dropped') ->
         {<<"username">>, <<"u_emqx">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1048,6 +1062,7 @@ columns_with_exam('schema.validation_failed') ->
         {<<"username">>, <<"u_emqx">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1084,6 +1099,7 @@ columns_with_exam('delivery.dropped') ->
         {<<"username">>, <<"u_emqx_2">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1150,6 +1166,7 @@ columns_with_exam('client.check_authz_complete') ->
         {<<"clientid">>, <<"c_emqx">>},
         {<<"username">>, <<"u_emqx">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"action">>, <<"publish">>},
         {<<"authz_source">>, <<"cache">>},
@@ -1197,6 +1214,7 @@ columns_message_sub_unsub(EventName) ->
         {<<"clientid">>, <<"c_emqx">>},
         {<<"username">>, <<"u_emqx">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"timestamp">>, erlang:system_time(millisecond)},
@@ -1213,6 +1231,7 @@ columns_message_ack_delivered(EventName) ->
         {<<"username">>, <<"u_emqx_2">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},

+ 16 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -3920,6 +3920,7 @@ verify_event_fields('message.publish', Fields) ->
         username := Username,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -3934,6 +3935,7 @@ verify_event_fields('message.publish', Fields) ->
     ?assertEqual(<<"c_event">>, ClientId),
     ?assertEqual(<<"u_event">>, Username),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4008,6 +4010,7 @@ verify_event_fields(SubUnsub, Fields) when
         clientid := ClientId,
         username := Username,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         timestamp := Timestamp
@@ -4017,6 +4020,7 @@ verify_event_fields(SubUnsub, Fields) when
     ?assert(is_atom(reason)),
     ?assertEqual(<<"c_event2">>, ClientId),
     ?assertEqual(<<"u_event2">>, Username),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4043,6 +4047,7 @@ verify_event_fields('delivery.dropped', Fields) ->
         node := Node,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         pub_props := Properties,
         publish_received_at := EventAt,
         qos := QoS,
@@ -4062,6 +4067,7 @@ verify_event_fields('delivery.dropped', Fields) ->
     ?assertEqual(<<"c_event">>, FromClientId),
     ?assertEqual(<<"u_event">>, FromUsername),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4078,6 +4084,7 @@ verify_event_fields('message.dropped', Fields) ->
         username := Username,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -4093,6 +4100,7 @@ verify_event_fields('message.dropped', Fields) ->
     ?assertEqual(<<"c_event">>, ClientId),
     ?assertEqual(<<"u_event">>, Username),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4110,6 +4118,7 @@ verify_event_fields('message.delivered', Fields) ->
         from_username := FromUsername,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -4126,6 +4135,7 @@ verify_event_fields('message.delivered', Fields) ->
     ?assertEqual(<<"c_event">>, FromClientId),
     ?assertEqual(<<"u_event">>, FromUsername),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4143,6 +4153,7 @@ verify_event_fields('message.acked', Fields) ->
         from_username := FromUsername,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -4160,6 +4171,7 @@ verify_event_fields('message.acked', Fields) ->
     ?assertEqual(<<"c_event">>, FromClientId),
     ?assertEqual(<<"u_event">>, FromUsername),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4203,6 +4215,7 @@ verify_event_fields('client.check_authz_complete', Fields) ->
         clientid := ClientId,
         action := Action,
         result := Result,
+        peername := PeerName,
         topic := Topic,
         authz_source := AuthzSource,
         username := Username
@@ -4210,6 +4223,7 @@ verify_event_fields('client.check_authz_complete', Fields) ->
     ?assertEqual(<<"t1">>, Topic),
     ?assert(lists:member(Action, [subscribe, publish])),
     ?assert(lists:member(Result, [allow, deny])),
+    verify_peername(PeerName),
     ?assert(
         lists:member(AuthzSource, [
             cache,
@@ -4228,10 +4242,12 @@ verify_event_fields('client.check_authz_complete', Fields) ->
 verify_event_fields('client.check_authn_complete', Fields) ->
     #{
         clientid := ClientId,
+        peername := PeerName,
         username := Username,
         is_anonymous := IsAnonymous,
         is_superuser := IsSuperuser
     } = Fields,
+    verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
     ?assert(erlang:is_boolean(IsAnonymous)),

+ 1 - 0
changes/ce/feat-13506.en.md

@@ -0,0 +1 @@
+Added the `peername` field to all rule engine events that already contained the `peerhost` field.  `peername` is a string and has the `IP:PORT` format.