Просмотр исходного кода

Merge pull request #14166 from lafirest/feat/rabbit_dynamic_key

feat(rabbitmq): supports dynamic exchange and routing key
lafirest 1 год назад
Родитель
Сommit
868ed7c8d3

+ 1 - 1
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_rabbitmq, [
     {description, "EMQX Enterprise RabbitMQ Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {mod, {emqx_bridge_rabbitmq_app, []}},
     {applications, [

+ 36 - 4
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -314,8 +314,8 @@ publish_messages(
     Conn,
     RabbitMQ,
     DeliveryMode,
-    Exchange,
-    RoutingKey,
+    Exchange0,
+    RoutingKey0,
     PayloadTmpl,
     Messages,
     WaitForPublishConfirmations,
@@ -328,14 +328,19 @@ publish_messages(
                 headers = [],
                 delivery_mode = DeliveryMode
             },
+
+            Exchange = render_template(Exchange0, Messages),
+            RoutingKey = render_template(RoutingKey0, Messages),
             Method = #'basic.publish'{
                 exchange = Exchange,
                 routing_key = RoutingKey
             },
+
             FormattedMsgs = [
                 format_data(PayloadTmpl, M)
              || {_, M} <- Messages
             ],
+
             emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
                 messages => FormattedMsgs,
                 properties => #{
@@ -390,6 +395,20 @@ format_data([], Msg) ->
 format_data(Tokens, Msg) ->
     emqx_placeholder:proc_tmpl(Tokens, Msg).
 
+%% Dynamic `exchange` and `routing_key` are restricted in batch mode,
+%% we assume these two values ​​are the same in a batch.
+render_template({fixed, Data}, _) ->
+    Data;
+render_template(Template, [Req | _]) ->
+    render_template(Template, Req);
+render_template({dynamic, Template}, {_, Message}) ->
+    try
+        erlang:iolist_to_binary(emqx_template:render_strict(Template, {emqx_jsonish, Message}))
+    catch
+        error:_Errors ->
+            erlang:throw(bad_template)
+    end.
+
 handle_result({error, ecpool_empty}) ->
     {error, {recoverable_error, ecpool_empty}};
 handle_result(Res) ->
@@ -444,16 +463,29 @@ init_secret() ->
 preproc_parameter(#{config_root := actions, parameters := Parameter}) ->
     #{
         payload_template := PayloadTemplate,
-        delivery_mode := InitialDeliveryMode
+        delivery_mode := InitialDeliveryMode,
+        exchange := Exchange,
+        routing_key := RoutingKey
     } = Parameter,
     Parameter#{
         delivery_mode => delivery_mode(InitialDeliveryMode),
         payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
-        config_root => actions
+        config_root => actions,
+        exchange := preproc_template(Exchange),
+        routing_key := preproc_template(RoutingKey)
     };
 preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) ->
     Parameter#{hookpoints => Hooks, config_root => sources}.
 
+preproc_template(Template0) ->
+    Template = emqx_template:parse(Template0),
+    case emqx_template:placeholders(Template) of
+        [] ->
+            {fixed, emqx_utils_conv:bin(Template0)};
+        [_ | _] ->
+            {dynamic, Template}
+    end.
+
 delivery_mode(non_persistent) -> 1;
 delivery_mode(persistent) -> 2.
 

+ 2 - 2
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl

@@ -75,7 +75,7 @@ fields(action_parameters) ->
             )},
         {exchange,
             hoconsc:mk(
-                typerefl:binary(),
+                emqx_schema:template(),
                 #{
                     required => true,
                     desc => ?DESC(?CONNECTOR_SCHEMA, "exchange")
@@ -83,7 +83,7 @@ fields(action_parameters) ->
             )},
         {routing_key,
             hoconsc:mk(
-                typerefl:binary(),
+                emqx_schema:template(),
                 #{
                     required => true,
                     desc => ?DESC(?CONNECTOR_SCHEMA, "routing_key")

+ 67 - 12
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -95,10 +95,10 @@ rabbitmq_source() ->
     },
     parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name).
 
-rabbitmq_action() ->
-    rabbitmq_action(rabbit_mq_exchange()).
+rabbitmq_action(TestCase) ->
+    rabbitmq_action(TestCase, rabbit_mq_exchange(TestCase)).
 
-rabbitmq_action(Exchange) ->
+rabbitmq_action(TestCase, Exchange) ->
     Name = atom_to_binary(?MODULE),
     Action = #{
         <<"actions">> => #{
@@ -109,7 +109,7 @@ rabbitmq_action(Exchange) ->
                     <<"parameters">> => #{
                         <<"exchange">> => Exchange,
                         <<"payload_template">> => <<"${.payload}">>,
-                        <<"routing_key">> => rabbit_mq_routing_key(),
+                        <<"routing_key">> => rabbit_mq_routing_key(TestCase),
                         <<"delivery_mode">> => <<"non_persistent">>,
                         <<"publish_confirmation_timeout">> => <<"30s">>,
                         <<"wait_for_publish_confirmations">> => true
@@ -138,11 +138,11 @@ create_source(Name) ->
 delete_source(Name) ->
     ok = emqx_bridge_v2:remove(sources, ?TYPE, Name).
 
-create_action(Name) ->
-    create_action(Name, rabbit_mq_exchange()).
+create_action(TestCase, Name) ->
+    create_action(TestCase, Name, rabbit_mq_exchange(TestCase)).
 
-create_action(Name, Exchange) ->
-    Action = rabbitmq_action(Exchange),
+create_action(TestCase, Name, Exchange) ->
+    Action = rabbitmq_action(TestCase, Exchange),
     {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action).
 
 delete_action(Name) ->
@@ -228,14 +228,14 @@ t_source_probe(_Config) ->
 
 t_action_probe(_Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    Action = rabbitmq_action(),
+    Action = rabbitmq_action(?FUNCTION_NAME),
     {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
     ?assertMatch({{_, 204, _}, _, _}, Res0),
     ok.
 
 t_action(Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name),
+    create_action(?FUNCTION_NAME, Name),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
     ?assert(lists:any(Any, Actions), Actions),
@@ -276,7 +276,7 @@ t_action(Config) ->
 
 t_action_not_exist_exchange(_Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name, <<"not_exist_exchange">>),
+    create_action(?FUNCTION_NAME, Name, <<"not_exist_exchange">>),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
     ?assert(lists:any(Any, Actions), Actions),
@@ -336,7 +336,7 @@ t_action_not_exist_exchange(_Config) ->
     ).
 
 t_replace_action_source(Config) ->
-    Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action()}},
+    Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action(?FUNCTION_NAME)}},
     Source = #{<<"rabbitmq">> => #{<<"my_source">> => rabbitmq_source()}},
     ConnectorName = atom_to_binary(?MODULE),
     Connector = #{<<"rabbitmq">> => #{ConnectorName => rabbitmq_connector(get_rabbitmq(Config))}},
@@ -389,6 +389,47 @@ t_replace_action_source(Config) ->
     ),
     ok.
 
+t_action_dynamic(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    create_action(?FUNCTION_NAME, Name),
+    Actions = emqx_bridge_v2:list(actions),
+    Any = fun(#{name := BName}) -> BName =:= Name end,
+    ?assert(lists:any(Any, Actions), Actions),
+    Topic = <<"rabbitdynaction">>,
+    {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
+        #{
+            sql => <<"select * from \"", Topic/binary, "\"">>,
+            id => atom_to_binary(?FUNCTION_NAME),
+            actions => [<<"rabbitmq:", Name/binary>>],
+            description => <<"bridge_v2 send msg to rabbitmq action">>
+        }
+    ),
+    on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
+    {ok, C1} = emqtt:start_link([{clean_start, true}]),
+    {ok, _} = emqtt:connect(C1),
+    Payload = payload(?FUNCTION_NAME),
+    PayloadBin = emqx_utils_json:encode(Payload),
+    {ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]),
+    Msg = receive_message_from_rabbitmq(Config),
+    ?assertMatch(Payload, Msg),
+    ok = emqtt:disconnect(C1),
+    InstanceId = instance_id(actions, Name),
+    #{counters := Counters} = emqx_resource:get_metrics(InstanceId),
+    ok = delete_action(Name),
+    ActionsAfterDelete = emqx_bridge_v2:list(actions),
+    ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
+    ?assertMatch(
+        #{
+            dropped := 0,
+            success := 0,
+            matched := 1,
+            failed := 0,
+            received := 0
+        },
+        Counters
+    ),
+    ok.
+
 waiting_for_disconnected_alarms(InstanceId) ->
     waiting_for_disconnected_alarms(InstanceId, 0).
 
@@ -452,6 +493,10 @@ receive_messages(Count, Msgs) ->
 payload() ->
     #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}.
 
+payload(t_action_dynamic) ->
+    Payload = payload(),
+    Payload#{<<"e">> => rabbit_mq_exchange(), <<"r">> => rabbit_mq_routing_key()}.
+
 send_test_message_to_rabbitmq(Config) ->
     #{channel := Channel} = get_channel_connection(Config),
     MessageProperties = #'P_basic'{
@@ -481,3 +526,13 @@ instance_id(Type, Name) ->
             actions -> <<"action:">>
         end,
     <<TypeBin/binary, BridgeId/binary, ":", ConnectorId/binary>>.
+
+rabbit_mq_exchange(t_action_dynamic) ->
+    <<"${payload.e}">>;
+rabbit_mq_exchange(_) ->
+    rabbit_mq_exchange().
+
+rabbit_mq_routing_key(t_action_dynamic) ->
+    <<"${payload.r}">>;
+rabbit_mq_routing_key(_) ->
+    rabbit_mq_routing_key().

+ 4 - 0
changes/ee/feat-14166.en.md

@@ -0,0 +1,4 @@
+The `exchange` and `routing_key` in RabbitMQ producer can be configured as template values.
+For example, to extract the routing key from the payload, we could set "routing_key" to "${payload.akey}".
+
+Note, the templated `exchange` and `routing_key` are restricted in batch mode: We always assume that the value of them is the same for every message in a batch.

+ 2 - 2
rel/i18n/emqx_bridge_rabbitmq_connector_schema.hocon

@@ -56,7 +56,7 @@ auto_reconnect.label:
 """Auto Reconnect"""
 
 exchange.desc:
-"""The name of the RabbitMQ exchange where the messages will be sent."""
+"""The name of the RabbitMQ exchange where the messages will be sent. Supports templates (e.g.: `e-${payload.e}`)."""
 
 exchange.label:
 """Exchange"""
@@ -68,7 +68,7 @@ exchange_type.label:
 """Exchange Type"""
 
 routing_key.desc:
-"""The routing key used to route messages to the correct queue in the RabbitMQ exchange."""
+"""The routing key used to route messages to the correct queue in the RabbitMQ exchange. Supports templates (e.g.: `k-${payload.r}`)."""
 
 routing_key.label:
 """Routing Key"""

+ 0 - 1
rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon

@@ -10,7 +10,6 @@ subscriber_source.desc:
 subscriber_source.label:
 """Source"""
 
-
 action_parameters.desc:
 """The action config defines how this bridge send messages to the remote RabbitMQ broker"""
 action_parameters.label: