Просмотр исходного кода

Merge pull request #13604 from lafirest/fix/cattrs

fix(ruleengine): adjusted the scope of `client_attrs` in the rule engine
lafirest 1 год назад
Родитель
Сommit
3057d68eee

+ 6 - 12
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -557,8 +557,7 @@ eventmsg_dropped(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp,
-            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
+            publish_received_at => Timestamp
         },
         #{headers => Headers}
     ).
@@ -590,8 +589,7 @@ eventmsg_transformation_failed(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp,
-            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
+            publish_received_at => Timestamp
         },
         #{headers => Headers}
     ).
@@ -624,8 +622,7 @@ eventmsg_validation_failed(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp,
-            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
+            publish_received_at => Timestamp
         },
         #{headers => Headers}
     ).
@@ -663,8 +660,7 @@ eventmsg_delivered(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp,
-            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
+            publish_received_at => Timestamp
         },
         #{headers => Headers}
     ).
@@ -703,8 +699,7 @@ eventmsg_acked(
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
             puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
-            publish_received_at => Timestamp,
-            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
+            publish_received_at => Timestamp
         },
         #{headers => Headers}
     ).
@@ -744,8 +739,7 @@ eventmsg_delivery_dropped(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp,
-            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
+            publish_received_at => Timestamp
         },
         #{headers => Headers}
     ).

+ 10 - 22
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -4177,8 +4177,7 @@ verify_event_fields('delivery.dropped', Fields) ->
         qos := QoS,
         flags := Flags,
         timestamp := Timestamp,
-        topic := Topic,
-        client_attrs := ClientAttrs
+        topic := Topic
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4200,8 +4199,7 @@ verify_event_fields('delivery.dropped', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp),
-    ?assert(is_map(ClientAttrs));
+    ?assert(EventAt =< Timestamp);
 verify_event_fields('message.dropped', Fields) ->
     #{
         id := ID,
@@ -4216,8 +4214,7 @@ verify_event_fields('message.dropped', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt,
-        client_attrs := ClientAttrs
+        publish_received_at := EventAt
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4235,8 +4232,7 @@ verify_event_fields('message.dropped', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp),
-    ?assert(is_map(ClientAttrs));
+    ?assert(EventAt =< Timestamp);
 verify_event_fields('message.delivered', Fields) ->
     #{
         id := ID,
@@ -4252,8 +4248,7 @@ verify_event_fields('message.delivered', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt,
-        client_attrs := ClientAttrs
+        publish_received_at := EventAt
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4272,8 +4267,7 @@ verify_event_fields('message.delivered', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp),
-    ?assert(is_map(ClientAttrs));
+    ?assert(EventAt =< Timestamp);
 verify_event_fields('message.acked', Fields) ->
     #{
         id := ID,
@@ -4290,8 +4284,7 @@ verify_event_fields('message.acked', Fields) ->
         pub_props := PubProps,
         puback_props := PubAckProps,
         timestamp := Timestamp,
-        publish_received_at := EventAt,
-        client_attrs := ClientAttrs
+        publish_received_at := EventAt
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4311,8 +4304,7 @@ verify_event_fields('message.acked', Fields) ->
     ?assert(is_map(PubAckProps)),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp),
-    ?assert(is_map(ClientAttrs));
+    ?assert(EventAt =< Timestamp);
 verify_event_fields('client.connack', Fields) ->
     #{
         clientid := ClientId,
@@ -4399,14 +4391,12 @@ verify_event_fields('schema.validation_failed', Fields) ->
         topic := _Topic,
         flags := _Flags,
         pub_props := _PubProps,
-        publish_received_at := _PublishReceivedAt,
-        client_attrs := ClientAttrs
+        publish_received_at := _PublishReceivedAt
     } = Fields,
     ?assertEqual(<<"v1">>, ValidationName),
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
-    ?assert(is_map(ClientAttrs)),
     ok;
 verify_event_fields('message.transformation_failed', Fields) ->
     #{
@@ -4419,14 +4409,12 @@ verify_event_fields('message.transformation_failed', Fields) ->
         topic := _Topic,
         flags := _Flags,
         pub_props := _PubProps,
-        publish_received_at := _PublishReceivedAt,
-        client_attrs := ClientAttrs
+        publish_received_at := _PublishReceivedAt
     } = Fields,
     ?assertEqual(<<"t1">>, TransformationName),
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
-    ?assert(is_map(ClientAttrs)),
     ok.
 
 verify_peername(PeerName) ->

+ 3 - 1
changes/ce/feat-13573.en.md

@@ -1 +1,3 @@
-Expose `client_attrs` to rule engine and rule events.
+Add `client_attrs` to SQL context for client connectivity events and message `publish` event.
+Now one can access client attributes in rule SQL like `SELECT client_attrs.attr1 AS attribute1` and use `${attribute1}` in data integration actions.
+