Просмотр исходного кода

Merge pull request #13573 from lafirest/feat/cattrs

feat(ruleengine): expose client_attrs to rule-engine
lafirest 1 год назад
Родитель
Сommit
9a2f878017

+ 29 - 17
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -334,13 +334,14 @@ eventmsg_publish(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).
 
 eventmsg_connected(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         is_bridge := IsBridge,
@@ -375,13 +376,14 @@ eventmsg_connected(
             expiry_interval => ExpiryInterval div 1000,
             is_bridge => IsBridge,
             conn_props => printable_maps(ConnProps),
-            connected_at => ConnectedAt
+            connected_at => ConnectedAt,
+            client_attrs => maps:get(client_attrs, ClientInfo, #{})
         },
         #{}
     ).
 
 eventmsg_disconnected(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username
     },
@@ -405,7 +407,8 @@ eventmsg_disconnected(
             proto_name => ProtoName,
             proto_ver => ProtoVer,
             disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
-            disconnected_at => DisconnectedAt
+            disconnected_at => DisconnectedAt,
+            client_attrs => maps:get(client_attrs, ClientInfo, #{})
         },
         #{}
     ).
@@ -444,7 +447,7 @@ eventmsg_connack(
     ).
 
 eventmsg_check_authz_complete(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         peerhost := PeerHost,
@@ -465,13 +468,14 @@ eventmsg_check_authz_complete(
             topic => Topic,
             action => PubSub,
             authz_source => AuthzSource,
-            result => Result
+            result => Result,
+            client_attrs => maps:get(client_attrs, ClientInfo, #{})
         },
         #{}
     ).
 
 eventmsg_check_authn_complete(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         peername := PeerName
@@ -493,14 +497,15 @@ eventmsg_check_authn_complete(
             peername => ntoa(PeerName),
             reason_code => force_to_bin(Reason),
             is_anonymous => IsAnonymous,
-            is_superuser => IsSuperuser
+            is_superuser => IsSuperuser,
+            client_attrs => maps:get(client_attrs, ClientInfo, #{})
         },
         #{}
     ).
 
 eventmsg_sub_or_unsub(
     Event,
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         peerhost := PeerHost,
@@ -519,7 +524,8 @@ eventmsg_sub_or_unsub(
             peername => ntoa(PeerName),
             PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
             topic => Topic,
-            qos => QoS
+            qos => QoS,
+            client_attrs => maps:get(client_attrs, ClientInfo, #{})
         },
         #{}
     ).
@@ -551,7 +557,8 @@ eventmsg_dropped(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).
@@ -583,7 +590,8 @@ eventmsg_transformation_failed(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).
@@ -616,7 +624,8 @@ eventmsg_validation_failed(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).
@@ -654,7 +663,8 @@ eventmsg_delivered(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).
@@ -693,7 +703,8 @@ eventmsg_acked(
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
             puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).
@@ -733,7 +744,8 @@ eventmsg_delivery_dropped(
             qos => QoS,
             flags => Flags,
             pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-            publish_received_at => Timestamp
+            publish_received_at => Timestamp,
+            client_attrs => emqx_message:get_header(client_attrs, Message, #{})
         },
         #{headers => Headers}
     ).

+ 99 - 23
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -112,7 +112,8 @@ groups() ->
             t_sqlparse_undefined_variable,
             t_sqlparse_new_map,
             t_sqlparse_invalid_json,
-            t_sqlselect_as_put
+            t_sqlselect_as_put,
+            t_sqlselect_client_attr
         ]},
         {events, [], [
             t_events,
@@ -3891,6 +3892,57 @@ t_trace_rule_id(_Config) ->
     ?assertEqual([], emqx_trace_handler:running()),
     emqtt:disconnect(T).
 
+t_sqlselect_client_attr(_) ->
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    {ok, Compiled} = emqx_variform:compile("user_property.group"),
+    emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [
+        #{
+            expression => Compiled,
+            set_as_attr => <<"group">>
+        },
+        #{
+            expression => Compiled,
+            set_as_attr => <<"group2">>
+        }
+    ]),
+
+    SQL =
+        "SELECT client_attrs as payload FROM \"t/1\" ",
+    Repub = republish_action(<<"t/2">>),
+    {ok, _TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {properties, #{'User-Property' => [{<<"group">>, <<"g1">>}]}}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+
+    {ok, _, _} = emqtt:subscribe(Client, <<"t/2">>, 0),
+    ct:sleep(100),
+    emqtt:publish(Client, <<"t/1">>, <<"Hello">>),
+
+    receive
+        {publish, #{topic := Topic, payload := Payload}} ->
+            ?assertEqual(<<"t/2">>, Topic),
+            ?assertMatch(
+                #{<<"group">> := <<"g1">>, <<"group2">> := <<"g1">>},
+                emqx_utils_json:decode(Payload)
+            )
+    after 1000 ->
+        ct:fail(wait_for_t_2)
+    end,
+
+    emqtt:disconnect(Client),
+    emqx_rule_engine:delete_rule(?TMP_RULEID),
+    emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []).
+
 %%------------------------------------------------------------------------------
 %% Internal helpers
 %%------------------------------------------------------------------------------
@@ -3990,7 +4042,8 @@ verify_event_fields('message.publish', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4007,7 +4060,8 @@ verify_event_fields('message.publish', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.connected', Fields) ->
     #{
         clientid := ClientId,
@@ -4023,7 +4077,8 @@ verify_event_fields('client.connected', Fields) ->
         is_bridge := IsBridge,
         conn_props := Properties,
         timestamp := Timestamp,
-        connected_at := EventAt
+        connected_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4042,7 +4097,8 @@ verify_event_fields('client.connected', Fields) ->
     ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.disconnected', Fields) ->
     #{
         reason := Reason,
@@ -4052,7 +4108,8 @@ verify_event_fields('client.disconnected', Fields) ->
         sockname := SockName,
         disconn_props := Properties,
         timestamp := Timestamp,
-        disconnected_at := EventAt
+        disconnected_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4065,7 +4122,8 @@ verify_event_fields('client.disconnected', Fields) ->
     ?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields(SubUnsub, Fields) when
     SubUnsub == 'session.subscribed';
     SubUnsub == 'session.unsubscribed'
@@ -4077,7 +4135,8 @@ verify_event_fields(SubUnsub, Fields) when
         peername := PeerName,
         topic := Topic,
         qos := QoS,
-        timestamp := Timestamp
+        timestamp := Timestamp,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4097,7 +4156,8 @@ verify_event_fields(SubUnsub, Fields) when
         #{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
         maps:get(PropKey, Fields)
     ),
-    ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000);
+    ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('delivery.dropped', Fields) ->
     #{
         event := 'delivery.dropped',
@@ -4117,7 +4177,8 @@ verify_event_fields('delivery.dropped', Fields) ->
         qos := QoS,
         flags := Flags,
         timestamp := Timestamp,
-        topic := Topic
+        topic := Topic,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4139,7 +4200,8 @@ verify_event_fields('delivery.dropped', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('message.dropped', Fields) ->
     #{
         id := ID,
@@ -4154,7 +4216,8 @@ verify_event_fields('message.dropped', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4172,7 +4235,8 @@ verify_event_fields('message.dropped', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('message.delivered', Fields) ->
     #{
         id := ID,
@@ -4188,7 +4252,8 @@ verify_event_fields('message.delivered', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4207,7 +4272,8 @@ verify_event_fields('message.delivered', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('message.acked', Fields) ->
     #{
         id := ID,
@@ -4224,7 +4290,8 @@ verify_event_fields('message.acked', Fields) ->
         pub_props := PubProps,
         puback_props := PubAckProps,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4244,7 +4311,8 @@ verify_event_fields('message.acked', Fields) ->
     ?assert(is_map(PubAckProps)),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.connack', Fields) ->
     #{
         clientid := ClientId,
@@ -4282,7 +4350,8 @@ verify_event_fields('client.check_authz_complete', Fields) ->
         peername := PeerName,
         topic := Topic,
         authz_source := AuthzSource,
-        username := Username
+        username := Username,
+        client_attrs := ClientAttrs
     } = Fields,
     ?assertEqual(<<"t1">>, Topic),
     ?assert(lists:member(Action, [subscribe, publish])),
@@ -4302,20 +4371,23 @@ verify_event_fields('client.check_authz_complete', Fields) ->
         ])
     ),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
-    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>]));
+    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.check_authn_complete', Fields) ->
     #{
         clientid := ClientId,
         peername := PeerName,
         username := Username,
         is_anonymous := IsAnonymous,
-        is_superuser := IsSuperuser
+        is_superuser := IsSuperuser,
+        client_attrs := ClientAttrs
     } = Fields,
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
     ?assert(erlang:is_boolean(IsAnonymous)),
-    ?assert(erlang:is_boolean(IsSuperuser));
+    ?assert(erlang:is_boolean(IsSuperuser)),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('schema.validation_failed', Fields) ->
     #{
         validation := ValidationName,
@@ -4327,12 +4399,14 @@ verify_event_fields('schema.validation_failed', Fields) ->
         topic := _Topic,
         flags := _Flags,
         pub_props := _PubProps,
-        publish_received_at := _PublishReceivedAt
+        publish_received_at := _PublishReceivedAt,
+        client_attrs := ClientAttrs
     } = Fields,
     ?assertEqual(<<"v1">>, ValidationName),
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ?assert(is_map(ClientAttrs)),
     ok;
 verify_event_fields('message.transformation_failed', Fields) ->
     #{
@@ -4345,12 +4419,14 @@ verify_event_fields('message.transformation_failed', Fields) ->
         topic := _Topic,
         flags := _Flags,
         pub_props := _PubProps,
-        publish_received_at := _PublishReceivedAt
+        publish_received_at := _PublishReceivedAt,
+        client_attrs := ClientAttrs
     } = Fields,
     ?assertEqual(<<"t1">>, TransformationName),
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ?assert(is_map(ClientAttrs)),
     ok.
 
 verify_peername(PeerName) ->

+ 1 - 0
changes/ce/feat-13573.en.md

@@ -0,0 +1 @@
+Expose `client_attrs` to rule engine and rule events.