Просмотр исходного кода

Merge pull request #13506 from thalesmg/20240722-m-peername-sys-events

feat: add `peername` to rule events that already have `peerhost`
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
9c0f1df8a3

+ 4 - 1
apps/emqx/src/emqx_channel.erl

@@ -235,7 +235,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) ->
 -spec init(emqx_types:conninfo(), opts()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, PeerPort},
+        peername := {PeerHost, PeerPort} = PeerName,
         sockname := {_Host, SockPort}
     },
     #{
@@ -259,6 +259,9 @@ init(
             listener => ListenerId,
             protocol => Protocol,
             peerhost => PeerHost,
+            %% We copy peername to clientinfo because some event contexts only have access
+            %% to client info (e.g.: authn/authz).
+            peername => PeerName,
             peerport => PeerPort,
             sockport => SockPort,
             clientid => undefined,

+ 1 - 0
apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl

@@ -308,6 +308,7 @@ clientinfo(OldClientInfo) ->
             zone,
             protocol,
             peerhost,
+            peername,
             sockport,
             clientid,
             username,

+ 2 - 1
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -120,7 +120,7 @@ stats(#channel{session = Session}) ->
 -spec init(map(), map()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     #{ctx := Ctx} = Config
@@ -140,6 +140,7 @@ init(
             listener => ListenerId,
             protocol => 'coap',
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => emqx_guid:to_base62(emqx_guid:gen()),
             username => undefined,

+ 2 - 1
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -802,7 +802,7 @@ default_conninfo(ConnInfo) ->
     }.
 
 default_clientinfo(#{
-    peername := {PeerHost, _},
+    peername := {PeerHost, _} = PeerName,
     sockname := {_, SockPort},
     clientid := ClientId
 }) ->
@@ -810,6 +810,7 @@ default_clientinfo(#{
         zone => default,
         protocol => exproto,
         peerhost => PeerHost,
+        peername => PeerName,
         sockport => SockPort,
         clientid => ClientId,
         username => undefined,

+ 2 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -138,7 +138,7 @@ set_conn_state(ConnState, Channel) ->
 
 init(
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, _Port} = PeerName,
         sockname := {_Host, SockPort}
     },
     Options
@@ -160,6 +160,7 @@ init(
             listener => ListenerId,
             protocol => gbt32960,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 2 - 1
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -147,7 +147,7 @@ stats(#channel{inflight = Inflight, mqueue = Queue}) ->
 -spec init(emqx_types:conninfo(), map()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, _Port} = PeerName,
         sockname := {_Host, SockPort}
     },
     Options = #{
@@ -171,6 +171,7 @@ init(
             listener => ListenerId,
             protocol => jt808,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 1 - 1
apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_lwm2m, [
     {description, "LwM2M Gateway"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]},
     {env, []},

+ 2 - 1
apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl

@@ -119,7 +119,7 @@ stats(#channel{session = Session}) ->
 
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     #{ctx := Ctx} = Config
@@ -139,6 +139,7 @@ init(
             listener => ListenerId,
             protocol => lwm2m,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             username => undefined,
             clientid => undefined,

+ 2 - 1
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -130,7 +130,7 @@
 %% @doc Init protocol
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     Option
@@ -152,6 +152,7 @@ init(
             listener => ListenerId,
             protocol => 'mqtt-sn',
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 4 - 3
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -209,7 +209,7 @@ stats(#channel{mqueue = MQueue}) ->
 -spec init(emqx_types:conninfo(), map()) -> channel().
 init(
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, _Port} = PeerName,
         sockname := {_Host, SockPort}
     },
     Options
@@ -230,6 +230,7 @@ init(
             listener => ListenerId,
             protocol => ocpp,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,
@@ -325,9 +326,9 @@ enrich_client(
 
 set_log_meta(#channel{
     clientinfo = #{clientid := ClientId},
-    conninfo = #{peername := Peername}
+    conninfo = #{peername := PeerName}
 }) ->
-    emqx_logger:set_metadata_peername(esockd:format(Peername)),
+    emqx_logger:set_metadata_peername(esockd:format(PeerName)),
     emqx_logger:set_metadata_clientid(ClientId).
 
 run_conn_hooks(_UserInfo, Channel = #channel{conninfo = ConnInfo}) ->

+ 2 - 1
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -117,7 +117,7 @@
 %% @doc Init protocol
 init(
     ConnInfo = #{
-        peername := {PeerHost, _},
+        peername := {PeerHost, _} = PeerName,
         sockname := {_, SockPort}
     },
     Option
@@ -137,6 +137,7 @@ init(
             listener => ListenerId,
             protocol => stomp,
             peerhost => PeerHost,
+            peername => PeerName,
             sockport => SockPort,
             clientid => undefined,
             username => undefined,

+ 24 - 5
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -329,6 +329,7 @@ eventmsg_publish(
             username => emqx_message:get_header(username, Message, undefined),
             payload => Payload,
             peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -446,7 +447,8 @@ eventmsg_check_authz_complete(
     _ClientInfo = #{
         clientid := ClientId,
         username := Username,
-        peerhost := PeerHost
+        peerhost := PeerHost,
+        peername := PeerName
     },
     PubSub,
     Topic,
@@ -458,6 +460,7 @@ eventmsg_check_authz_complete(
         #{
             clientid => ClientId,
             username => Username,
+            peername => ntoa(PeerName),
             peerhost => ntoa(PeerHost),
             topic => Topic,
             action => PubSub,
@@ -471,8 +474,7 @@ eventmsg_check_authn_complete(
     _ClientInfo = #{
         clientid := ClientId,
         username := Username,
-        peerhost := PeerHost,
-        peerport := PeerPort
+        peername := PeerName
     },
     Result
 ) ->
@@ -488,7 +490,7 @@ eventmsg_check_authn_complete(
         #{
             clientid => ClientId,
             username => Username,
-            peername => ntoa({PeerHost, PeerPort}),
+            peername => ntoa(PeerName),
             reason_code => force_to_bin(Reason),
             is_anonymous => IsAnonymous,
             is_superuser => IsSuperuser
@@ -501,7 +503,8 @@ eventmsg_sub_or_unsub(
     _ClientInfo = #{
         clientid := ClientId,
         username := Username,
-        peerhost := PeerHost
+        peerhost := PeerHost,
+        peername := PeerName
     },
     Topic,
     SubOpts = #{qos := QoS}
@@ -513,6 +516,7 @@ eventmsg_sub_or_unsub(
             clientid => ClientId,
             username => Username,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
             topic => Topic,
             qos => QoS
@@ -542,6 +546,7 @@ eventmsg_dropped(
             username => emqx_message:get_header(username, Message, undefined),
             payload => Payload,
             peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -606,6 +611,7 @@ eventmsg_validation_failed(
             username => emqx_message:get_header(username, Message, undefined),
             payload => Payload,
             peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
+            peername => ntoa(emqx_message:get_header(peername, Message, undefined)),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -618,6 +624,7 @@ eventmsg_validation_failed(
 eventmsg_delivered(
     _ClientInfo = #{
         peerhost := PeerHost,
+        peername := PeerName,
         clientid := ReceiverCId,
         username := ReceiverUsername
     },
@@ -642,6 +649,7 @@ eventmsg_delivered(
             username => ReceiverUsername,
             payload => Payload,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -654,6 +662,7 @@ eventmsg_delivered(
 eventmsg_acked(
     _ClientInfo = #{
         peerhost := PeerHost,
+        peername := PeerName,
         clientid := ReceiverCId,
         username := ReceiverUsername
     },
@@ -678,6 +687,7 @@ eventmsg_acked(
             username => ReceiverUsername,
             payload => Payload,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -691,6 +701,7 @@ eventmsg_acked(
 eventmsg_delivery_dropped(
     _ClientInfo = #{
         peerhost := PeerHost,
+        peername := PeerName,
         clientid := ReceiverCId,
         username := ReceiverUsername
     },
@@ -717,6 +728,7 @@ eventmsg_delivery_dropped(
             username => ReceiverUsername,
             payload => Payload,
             peerhost => ntoa(PeerHost),
+            peername => ntoa(PeerName),
             topic => Topic,
             qos => QoS,
             flags => Flags,
@@ -1009,6 +1021,7 @@ columns_with_exam('message.publish') ->
         {<<"username">>, <<"u_emqx">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1031,6 +1044,7 @@ columns_with_exam('message.dropped') ->
         {<<"username">>, <<"u_emqx">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1048,6 +1062,7 @@ columns_with_exam('schema.validation_failed') ->
         {<<"username">>, <<"u_emqx">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1084,6 +1099,7 @@ columns_with_exam('delivery.dropped') ->
         {<<"username">>, <<"u_emqx_2">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},
@@ -1150,6 +1166,7 @@ columns_with_exam('client.check_authz_complete') ->
         {<<"clientid">>, <<"c_emqx">>},
         {<<"username">>, <<"u_emqx">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"action">>, <<"publish">>},
         {<<"authz_source">>, <<"cache">>},
@@ -1197,6 +1214,7 @@ columns_message_sub_unsub(EventName) ->
         {<<"clientid">>, <<"c_emqx">>},
         {<<"username">>, <<"u_emqx">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"timestamp">>, erlang:system_time(millisecond)},
@@ -1213,6 +1231,7 @@ columns_message_ack_delivered(EventName) ->
         {<<"username">>, <<"u_emqx_2">>},
         {<<"payload">>, <<"{\"msg\": \"hello\"}">>},
         {<<"peerhost">>, <<"192.168.0.10">>},
+        {<<"peername">>, <<"192.168.0.10:32781">>},
         {<<"topic">>, <<"t/a">>},
         {<<"qos">>, 1},
         {<<"flags">>, #{}},

+ 120 - 4
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -160,13 +160,55 @@ groups() ->
 
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
-        [
+        lists:flatten([
             emqx,
             emqx_conf,
             emqx_rule_engine,
             emqx_auth,
-            emqx_bridge
-        ],
+            emqx_bridge,
+            [
+                {emqx_schema_validation, #{
+                    config => #{
+                        <<"schema_validation">> => #{
+                            <<"validations">> => [
+                                #{
+                                    <<"name">> => <<"v1">>,
+                                    <<"topics">> => [<<"sv/fail">>],
+                                    <<"strategy">> => <<"all_pass">>,
+                                    <<"failure_action">> => <<"drop">>,
+                                    <<"checks">> => [
+                                        #{
+                                            <<"type">> => <<"sql">>,
+                                            <<"sql">> => <<"select 1 where false">>
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    }
+                }}
+             || is_ee()
+            ],
+            [
+                {emqx_message_transformation, #{
+                    config => #{
+                        <<"message_transformation">> => #{
+                            <<"transformations">> => [
+                                #{
+                                    <<"name">> => <<"t1">>,
+                                    <<"topics">> => <<"mt/fail">>,
+                                    <<"failure_action">> => <<"drop">>,
+                                    <<"payload_decoder">> => #{<<"type">> => <<"json">>},
+                                    <<"payload_encoder">> => #{<<"type">> => <<"json">>},
+                                    <<"operations">> => []
+                                }
+                            ]
+                        }
+                    }
+                }}
+             || is_ee()
+            ]
+        ]),
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [{apps, Apps} | Config].
@@ -250,6 +292,8 @@ init_per_testcase(t_events, Config) ->
         "\"$events/message_delivered\", "
         "\"$events/message_dropped\", "
         "\"$events/delivery_dropped\", "
+        "\"$events/schema_validation_failed\", "
+        "\"$events/message_transformation_failed\", "
         "\"t1\"",
     {ok, Rule} = emqx_rule_engine:create_rule(
         #{
@@ -834,6 +878,13 @@ t_events(_Config) ->
     session_subscribed(Client2),
     ct:pal("====== verify t1"),
     message_publish(Client),
+    is_ee() andalso
+        begin
+            ct:pal("====== verify $events/schema_validation_failed"),
+            schema_validation_failed(Client),
+            ct:pal("====== verify $events/message_transformation_failed"),
+            message_transformation_failed(Client)
+        end,
     ct:pal("====== verify $events/delivery_dropped"),
     delivery_dropped(Client),
     ct:pal("====== verify $events/message_delivered"),
@@ -1151,6 +1202,16 @@ message_dropped(Client) ->
 message_acked(_Client) ->
     verify_event('message.acked'),
     ok.
+schema_validation_failed(Client) ->
+    {ok, _} = emqtt:publish(Client, <<"sv/fail">>, <<"">>, [{qos, 1}]),
+    ct:sleep(100),
+    verify_event('schema.validation_failed'),
+    ok.
+message_transformation_failed(Client) ->
+    {ok, _} = emqtt:publish(Client, <<"mt/fail">>, <<"will fail to { parse">>, [{qos, 1}]),
+    ct:sleep(100),
+    verify_event('message.transformation_failed'),
+    ok.
 
 t_match_atom_and_binary(_Config) ->
     SQL =
@@ -3834,6 +3895,9 @@ t_trace_rule_id(_Config) ->
 %% Internal helpers
 %%------------------------------------------------------------------------------
 
+is_ee() ->
+    emqx_release:edition() == ee.
+
 republish_action(Topic) ->
     republish_action(Topic, <<"${payload}">>).
 
@@ -3920,6 +3984,7 @@ verify_event_fields('message.publish', Fields) ->
         username := Username,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -3934,6 +3999,7 @@ verify_event_fields('message.publish', Fields) ->
     ?assertEqual(<<"c_event">>, ClientId),
     ?assertEqual(<<"u_event">>, Username),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4008,6 +4074,7 @@ verify_event_fields(SubUnsub, Fields) when
         clientid := ClientId,
         username := Username,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         timestamp := Timestamp
@@ -4017,6 +4084,7 @@ verify_event_fields(SubUnsub, Fields) when
     ?assert(is_atom(reason)),
     ?assertEqual(<<"c_event2">>, ClientId),
     ?assertEqual(<<"u_event2">>, Username),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4043,6 +4111,7 @@ verify_event_fields('delivery.dropped', Fields) ->
         node := Node,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         pub_props := Properties,
         publish_received_at := EventAt,
         qos := QoS,
@@ -4062,6 +4131,7 @@ verify_event_fields('delivery.dropped', Fields) ->
     ?assertEqual(<<"c_event">>, FromClientId),
     ?assertEqual(<<"u_event">>, FromUsername),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4078,6 +4148,7 @@ verify_event_fields('message.dropped', Fields) ->
         username := Username,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -4093,6 +4164,7 @@ verify_event_fields('message.dropped', Fields) ->
     ?assertEqual(<<"c_event">>, ClientId),
     ?assertEqual(<<"u_event">>, Username),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4110,6 +4182,7 @@ verify_event_fields('message.delivered', Fields) ->
         from_username := FromUsername,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -4126,6 +4199,7 @@ verify_event_fields('message.delivered', Fields) ->
     ?assertEqual(<<"c_event">>, FromClientId),
     ?assertEqual(<<"u_event">>, FromUsername),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4143,6 +4217,7 @@ verify_event_fields('message.acked', Fields) ->
         from_username := FromUsername,
         payload := Payload,
         peerhost := PeerHost,
+        peername := PeerName,
         topic := Topic,
         qos := QoS,
         flags := Flags,
@@ -4160,6 +4235,7 @@ verify_event_fields('message.acked', Fields) ->
     ?assertEqual(<<"c_event">>, FromClientId),
     ?assertEqual(<<"u_event">>, FromUsername),
     ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_peername(PeerName),
     verify_ipaddr(PeerHost),
     ?assertEqual(<<"t1">>, Topic),
     ?assertEqual(1, QoS),
@@ -4203,6 +4279,7 @@ verify_event_fields('client.check_authz_complete', Fields) ->
         clientid := ClientId,
         action := Action,
         result := Result,
+        peername := PeerName,
         topic := Topic,
         authz_source := AuthzSource,
         username := Username
@@ -4210,6 +4287,7 @@ verify_event_fields('client.check_authz_complete', Fields) ->
     ?assertEqual(<<"t1">>, Topic),
     ?assert(lists:member(Action, [subscribe, publish])),
     ?assert(lists:member(Result, [allow, deny])),
+    verify_peername(PeerName),
     ?assert(
         lists:member(AuthzSource, [
             cache,
@@ -4228,14 +4306,52 @@ verify_event_fields('client.check_authz_complete', Fields) ->
 verify_event_fields('client.check_authn_complete', Fields) ->
     #{
         clientid := ClientId,
+        peername := PeerName,
         username := Username,
         is_anonymous := IsAnonymous,
         is_superuser := IsSuperuser
     } = Fields,
+    verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
     ?assert(erlang:is_boolean(IsAnonymous)),
-    ?assert(erlang:is_boolean(IsSuperuser)).
+    ?assert(erlang:is_boolean(IsSuperuser));
+verify_event_fields('schema.validation_failed', Fields) ->
+    #{
+        validation := ValidationName,
+        clientid := ClientId,
+        username := Username,
+        payload := _Payload,
+        peername := PeerName,
+        qos := _QoS,
+        topic := _Topic,
+        flags := _Flags,
+        pub_props := _PubProps,
+        publish_received_at := _PublishReceivedAt
+    } = Fields,
+    ?assertEqual(<<"v1">>, ValidationName),
+    verify_peername(PeerName),
+    ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
+    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ok;
+verify_event_fields('message.transformation_failed', Fields) ->
+    #{
+        transformation := TransformationName,
+        clientid := ClientId,
+        username := Username,
+        payload := _Payload,
+        peername := PeerName,
+        qos := _QoS,
+        topic := _Topic,
+        flags := _Flags,
+        pub_props := _PubProps,
+        publish_received_at := _PublishReceivedAt
+    } = Fields,
+    ?assertEqual(<<"t1">>, TransformationName),
+    verify_peername(PeerName),
+    ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
+    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ok.
 
 verify_peername(PeerName) ->
     case string:split(PeerName, ":") of

+ 1 - 0
changes/ce/feat-13506.en.md

@@ -0,0 +1 @@
+Added the `peername` field to all rule engine events that already contained the `peerhost` field.  `peername` is a string and has the `IP:PORT` format.