Просмотр исходного кода

feat(rabbitmq): expose some meta info into the rule engine for the source action

includes the `queue`, `exchange`, `routing_key`
firest 1 год назад
Родитель
Сommit
d38b3a1300

+ 12 - 4
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl

@@ -35,13 +35,13 @@ handle_cast(_Request, State) ->
     {noreply, State}.
 
 handle_info(
-    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{
+    {#'basic.deliver'{delivery_tag = Tag} = BasicDeliver, #amqp_msg{
         payload = Payload,
         props = PBasic
     }},
     {Channel, InstanceId, Params} = State
 ) ->
-    Message = to_map(PBasic, Payload),
+    Message = to_map(BasicDeliver, PBasic, Params, Payload),
     #{hookpoints := Hooks, no_ack := NoAck} = Params,
     lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [Message]) end, Hooks),
     (NoAck =:= false) andalso
@@ -53,7 +53,9 @@ handle_info(#'basic.cancel_ok'{}, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
 
-to_map(PBasic, Payload) ->
+to_map(BasicDeliver, PBasic, Params, Payload) ->
+    #'basic.deliver'{exchange = Exchange, routing_key = RoutingKey} = BasicDeliver,
+
     #'P_basic'{
         content_type = ContentType,
         content_encoding = ContentEncoding,
@@ -70,6 +72,9 @@ to_map(PBasic, Payload) ->
         app_id = AppId,
         cluster_id = ClusterId
     } = PBasic,
+
+    #{queue := Queue} = Params,
+
     Message = #{
         <<"payload">> => make_payload(Payload),
         <<"content_type">> => ContentType,
@@ -85,7 +90,10 @@ to_map(PBasic, Payload) ->
         <<"type">> => Type,
         <<"user_id">> => UserId,
         <<"app_id">> => AppId,
-        <<"cluster_id">> => ClusterId
+        <<"cluster_id">> => ClusterId,
+        <<"exchange">> => Exchange,
+        <<"routing_key">> => RoutingKey,
+        <<"queue">> => Queue
     },
     maps:filtermap(fun(_K, V) -> V =/= undefined andalso V =/= <<"undefined">> end, Message).
 

+ 20 - 4
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -163,7 +163,13 @@ t_source(Config) ->
     Topic = <<"tesldkafd">>,
     {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
         #{
-            sql => <<"select * from \"$bridges/rabbitmq:", Name/binary, "\"">>,
+            sql =>
+                <<
+                    "select *, queue as payload.queue, exchange as payload.exchange,"
+                    "routing_key as payload.routing_key from \"$bridges/rabbitmq:",
+                    Name/binary,
+                    "\""
+                >>,
             id => atom_to_binary(?FUNCTION_NAME),
             actions => [
                 #{
@@ -187,7 +193,8 @@ t_source(Config) ->
     {ok, _} = emqtt:connect(C1),
     {ok, #{}, [0]} = emqtt:subscribe(C1, Topic, [{qos, 0}, {rh, 0}]),
     send_test_message_to_rabbitmq(Config),
-    PayloadBin = emqx_utils_json:encode(payload()),
+
+    Received = receive_messages(1),
     ?assertMatch(
         [
             #{
@@ -195,12 +202,21 @@ t_source(Config) ->
                 properties := undefined,
                 topic := Topic,
                 qos := 0,
-                payload := PayloadBin,
+                payload := _,
                 retain := false
             }
         ],
-        receive_messages(1)
+        Received
     ),
+    [#{payload := ReceivedPayload}] = Received,
+    Meta = #{
+        <<"exchange">> => rabbit_mq_exchange(),
+        <<"routing_key">> => rabbit_mq_routing_key(),
+        <<"queue">> => rabbit_mq_queue()
+    },
+    ExpectedPayload = maps:merge(payload(), Meta),
+    ?assertMatch(ExpectedPayload, emqx_utils_json:decode(ReceivedPayload)),
+
     ok = emqtt:disconnect(C1),
     InstanceId = instance_id(sources, Name),
     #{counters := Counters} = emqx_resource:get_metrics(InstanceId),

+ 5 - 0
changes/ee/feat-14176.en.md

@@ -0,0 +1,5 @@
+Some metadata was exposed to the rule engine for RabbitMQ source actions, including `queue`, `exchange` and the `routing_key`.
+
+Here is an example:
+`select *, queue as payload.queue, exchange as payload.exchange, routing_key as payload.routing_key from "$bridges/rabbitmq:test"`
+