Просмотр исходного кода

feat: make republish action possible run without dead-loop

the republish action triggers message.publish hook which
then triggers the republish action itself again. i.e. dead-loop
zmstone 1 год назад
Родитель
Сommit
801c5b19fe

+ 11 - 1
apps/emqx/src/emqx_broker.erl

@@ -97,7 +97,9 @@
 -type publish_opts() :: #{
     %% Whether to return a disinguishing value `{blocked, #message{}}' when a hook from
     %% `'message.publish''` returns `allow_publish => false'.  Defaults to `false'.
-    hook_prohibition_as_error => boolean()
+    hook_prohibition_as_error => boolean(),
+    %% do not call message.publish hook point if true
+    bypass_hook => boolean()
 }.
 
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
@@ -243,6 +245,14 @@ publish(#message{} = Msg) ->
 publish(#message{} = Msg, Opts) ->
     _ = emqx_trace:publish(Msg),
     emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
+    case maps:get(bypass_hook, Opts, false) of
+        true ->
+            do_publish(Msg);
+        false ->
+            eval_hook_and_publish(Msg, Opts)
+    end.
+
+eval_hook_and_publish(Msg, Opts) ->
     case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
         #message{headers = #{should_disconnect := true}, topic = Topic} ->
             ?TRACE("MQTT", "msg_publish_not_allowed_disconnect", #{

+ 2 - 1
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -173,7 +173,8 @@ t_source(Config) ->
                         payload => <<"${payload}">>,
                         qos => 0,
                         retain => false,
-                        user_properties => []
+                        user_properties => [],
+                        direct_dispatch => false
                     },
                     function => republish
                 }

+ 16 - 7
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -83,7 +83,8 @@ pre_process_action_args(
         retain := Retain,
         payload := Payload,
         mqtt_properties := MQTTProperties,
-        user_properties := UserProperties
+        user_properties := UserProperties,
+        direct_dispatch := DirectDispatch
     } = Args
 ) ->
     Args#{
@@ -93,7 +94,8 @@ pre_process_action_args(
             retain => parse_simple_var(Retain),
             payload => parse_payload(Payload),
             mqtt_properties => parse_mqtt_properties(MQTTProperties),
-            user_properties => parse_user_properties(UserProperties)
+            user_properties => parse_user_properties(UserProperties),
+            direct_dispatch => parse_simple_var(DirectDispatch)
         }
     };
 pre_process_action_args(_, Args) ->
@@ -153,7 +155,8 @@ republish(
             topic := TopicTemplate,
             payload := PayloadTemplate,
             mqtt_properties := MQTTPropertiesTemplate,
-            user_properties := UserPropertiesTemplate
+            user_properties := UserPropertiesTemplate,
+            direct_dispatch := DirectDispatchTemplate
         }
     }
 ) ->
@@ -164,6 +167,7 @@ republish(
     Payload = iolist_to_binary(PayloadString),
     QoS = render_simple_var(QoSTemplate, Selected, 0),
     Retain = render_simple_var(RetainTemplate, Selected, false),
+    DirectDispatch = render_simple_var(DirectDispatchTemplate, Selected, false),
     %% 'flags' is set for message re-publishes or message related
     %% events such as message.acked and message.dropped
     Flags0 = maps:get(flags, Env, #{}),
@@ -175,7 +179,8 @@ republish(
         flags => Flags,
         topic => Topic,
         payload => Payload,
-        pub_props => PubProps
+        pub_props => PubProps,
+        direct_dispatch => DirectDispatch
     },
     case logger:get_process_metadata() of
         #{action_id := ActionID} ->
@@ -190,7 +195,7 @@ republish(
         "republish_message",
         TraceInfo
     ),
-    safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
+    safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps, DirectDispatch).
 
 %%--------------------------------------------------------------------
 %% internal functions
@@ -232,7 +237,7 @@ pre_process_args(Mod, Func, Args) ->
         false -> Args
     end.
 
-safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
+safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps, DirectDispatch) ->
     Msg = #message{
         id = emqx_guid:gen(),
         qos = QoS,
@@ -246,7 +251,11 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
         payload = Payload,
         timestamp = erlang:system_time(millisecond)
     },
-    case emqx_broker:safe_publish(Msg, #{hook_prohibition_as_error => true}) of
+    case
+        emqx_broker:safe_publish(Msg, #{
+            bypass_hook => DirectDispatch, hook_prohibition_as_error => true
+        })
+    of
         Routes when is_list(Routes) ->
             emqx_metrics:inc_msg(Msg),
             ok;

+ 13 - 5
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -144,7 +144,7 @@ fields("republish_args") ->
     [
         {topic,
             ?HOCON(
-                binary(),
+                emqx_schema:template(),
                 #{
                     desc => ?DESC("republish_args_topic"),
                     required => true,
@@ -162,7 +162,7 @@ fields("republish_args") ->
             )},
         {retain,
             ?HOCON(
-                hoconsc:union([boolean(), binary()]),
+                hoconsc:union([boolean(), emqx_schema:template()]),
                 #{
                     desc => ?DESC("republish_args_retain"),
                     default => <<"${retain}">>,
@@ -171,7 +171,7 @@ fields("republish_args") ->
             )},
         {payload,
             ?HOCON(
-                binary(),
+                emqx_schema:template(),
                 #{
                     desc => ?DESC("republish_args_payload"),
                     default => <<"${payload}">>,
@@ -188,12 +188,20 @@ fields("republish_args") ->
             )},
         {user_properties,
             ?HOCON(
-                binary(),
+                emqx_schema:template(),
                 #{
                     desc => ?DESC("republish_args_user_properties"),
                     default => <<"${user_properties}">>,
                     example => <<"${pub_props.'User-Property'}">>
                 }
+            )},
+        {direct_dispatch,
+            ?HOCON(
+                hoconsc:union([boolean(), emqx_schema:template()]),
+                #{
+                    desc => ?DESC("republish_args_direct_dispatch"),
+                    default => false
+                }
             )}
     ];
 fields("republish_mqtt_properties") ->
@@ -263,7 +271,7 @@ actions() ->
     end.
 
 qos() ->
-    hoconsc:union([emqx_schema:qos(), binary()]).
+    hoconsc:union([emqx_schema:qos(), emqx_schema:template()]).
 
 rule_engine_settings() ->
     [

+ 42 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -81,6 +81,7 @@ groups() ->
             t_sqlselect_1,
             t_sqlselect_2,
             t_sqlselect_3,
+            t_direct_dispatch,
             t_sqlselect_message_publish_event_keep_original_props_1,
             t_sqlselect_message_publish_event_keep_original_props_2,
             t_sqlselect_missing_template_vars_render_as_undefined,
@@ -1996,6 +1997,42 @@ t_sqlselect_3(_Config) ->
     emqtt:stop(Client),
     delete_rule(TopicRule).
 
+%% select from t/1, republish to t/1, no dead-loop expected
+%% i.e. payload is mutated once and only once
+t_direct_dispatch(_Config) ->
+    SQL = "SELECT * FROM \"t/1\"",
+    Repub = republish_action(
+        <<"t/1">>,
+        <<"republished: ${payload}">>,
+        <<"${user_properties}">>,
+        #{},
+        true
+    ),
+    {ok, Rule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+    {ok, Pub} = emqtt:start_link([{clientid, <<"pubclient">>}]),
+    {ok, _} = emqtt:connect(Pub),
+    {ok, Sub} = emqtt:start_link([{clientid, <<"subclient">>}]),
+    {ok, _} = emqtt:connect(Sub),
+    {ok, _, _} = emqtt:subscribe(Sub, <<"t/1">>, 0),
+    Payload = base64:encode(crypto:strong_rand_bytes(12)),
+    emqtt:publish(Pub, <<"t/1">>, Payload, 0),
+    receive
+        {publish, #{topic := T, payload := Payload1}} ->
+            ?assertEqual(<<"t/1">>, T),
+            ?assertEqual(<<"republished: ", Payload/binary>>, Payload1)
+    after 2000 ->
+        ct:fail(wait_for_t2)
+    end,
+    emqtt:stop(Pub),
+    emqtt:stop(Sub),
+    delete_rule(Rule).
+
 t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
     %% republish the client.connected msg
     Topic = <<"foo/bar/1">>,
@@ -3960,6 +3997,9 @@ republish_action(Topic, Payload, UserProperties) ->
     republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}).
 
 republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
+    republish_action(Topic, Payload, UserProperties, MQTTProperties, false).
+
+republish_action(Topic, Payload, UserProperties, MQTTProperties, DirectDispatch) ->
     #{
         function => republish,
         args => #{
@@ -3968,7 +4008,8 @@ republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
             qos => 0,
             retain => false,
             mqtt_properties => MQTTProperties,
-            user_properties => UserProperties
+            user_properties => UserProperties,
+            direct_dispatch => DirectDispatch
         }
     }.
 

+ 2 - 1
apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl

@@ -1499,7 +1499,8 @@ t_republish_action_failure(_Config) ->
                                 <<"qos">> => 0,
                                 <<"retain">> => false,
                                 <<"topic">> => <<"t/republished">>,
-                                <<"user_properties">> => <<>>
+                                <<"user_properties">> => <<>>,
+                                <<"direct_dispatch">> => false
                             }
                     }
                 ]

+ 4 - 0
changes/ce/feat-13516.en.md

@@ -0,0 +1,4 @@
+Add a `direct_dispatch` argument for `republish` action.
+
+When `direct_dispatch` is set to `true` (or rendered as `true` from template) the message will be directly dispatched to subscribers.
+This can be used to avoid to triggering other rules or recursively trigger the self-rule.

+ 17 - 0
rel/i18n/emqx_rule_engine_schema.hocon

@@ -113,6 +113,23 @@ Placeholders like <code>${.payload.content_type}</code> may be used."""
 republish_args_mqtt_properties.label:
 """MQTT Properties"""
 
+republish_args_direct_dispatch.desc:
+"""Enable direct dispatch to subscribers without initiating a new message publish event.
+When set to `true`, this prevents the recursive processing of a message by the same action
+and is used when the output message does not require further processing.
+
+However, enabling this feature has several limitations:
+
+- The output message from this action is not retained.
+- It does not trigger other rules that operate based on the output topic of this action.
+- It does not activate rules that select from the `$events/message_publish`.
+- It does not trigger plugins that use the `'message.publish'` hook.
+- Topic metrics are not collected for the output message of this action.
+- Message schema validation is not applied (feature of EMQX Enterprise).
+- Message transformation processes are not applied (feature of EMQX Enterprise)."""
+
+republish_args_direct_dispatch.label: "Direct Dispatch"
+
 republish_function.desc:
 """Republish the message as a new MQTT message"""