Selaa lähdekoodia

chore: distinguish between hook prohibition and no routes when publishing

Fixes https://emqx.atlassian.net/browse/EMQX-12917
Thales Macedo Garitezi 1 vuosi sitten
vanhempi
commit
c71e7f4e6e

+ 27 - 6
apps/emqx/src/emqx_broker.erl

@@ -40,7 +40,9 @@
 
 
 -export([
 -export([
     publish/1,
     publish/1,
-    safe_publish/1
+    publish/2,
+    safe_publish/1,
+    safe_publish/2
 ]).
 ]).
 
 
 -export([dispatch/2]).
 -export([dispatch/2]).
@@ -92,6 +94,12 @@
     end
     end
 ).
 ).
 
 
+-type publish_opts() :: #{
+    %% Whether to return a disinguishing value `{blocked, #message{}}' when a hook from
+    %% `'message.publish''` returns `allow_publish => false'.  Defaults to `false'.
+    hook_prohibition_as_error => boolean()
+}.
+
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 start_link(Pool, Id) ->
 start_link(Pool, Id) ->
     ok = create_tabs(),
     ok = create_tabs(),
@@ -228,7 +236,11 @@ do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec publish(emqx_types:message()) -> emqx_types:publish_result().
 -spec publish(emqx_types:message()) -> emqx_types:publish_result().
-publish(Msg) when is_record(Msg, message) ->
+publish(#message{} = Msg) ->
+    publish(#message{} = Msg, _Opts = #{}).
+
+-spec publish(emqx_types:message(), publish_opts()) -> emqx_types:publish_result().
+publish(#message{} = Msg, Opts) ->
     _ = emqx_trace:publish(Msg),
     _ = emqx_trace:publish(Msg),
     emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
     emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
     case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
     case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
@@ -238,12 +250,17 @@ publish(Msg) when is_record(Msg, message) ->
                 topic => Topic
                 topic => Topic
             }),
             }),
             disconnect;
             disconnect;
-        #message{headers = #{allow_publish := false}, topic = Topic} ->
+        #message{headers = #{allow_publish := false}, topic = Topic} = Message ->
             ?TRACE("MQTT", "msg_publish_not_allowed", #{
             ?TRACE("MQTT", "msg_publish_not_allowed", #{
                 message => emqx_message:to_log_map(Msg),
                 message => emqx_message:to_log_map(Msg),
                 topic => Topic
                 topic => Topic
             }),
             }),
-            [];
+            case maps:get(hook_prohibition_as_error, Opts, false) of
+                true ->
+                    {blocked, Message};
+                false ->
+                    []
+            end;
         Msg1 = #message{} ->
         Msg1 = #message{} ->
             do_publish(Msg1);
             do_publish(Msg1);
         Msgs when is_list(Msgs) ->
         Msgs when is_list(Msgs) ->
@@ -277,9 +294,13 @@ persist_publish(Msg) ->
 
 
 %% Called internally
 %% Called internally
 -spec safe_publish(emqx_types:message()) -> emqx_types:publish_result().
 -spec safe_publish(emqx_types:message()) -> emqx_types:publish_result().
-safe_publish(Msg) when is_record(Msg, message) ->
+safe_publish(Msg) ->
+    safe_publish(Msg, _Opts = #{}).
+
+-spec safe_publish(emqx_types:message(), publish_opts()) -> emqx_types:publish_result().
+safe_publish(#message{} = Msg, Opts) ->
     try
     try
-        publish(Msg)
+        publish(Msg, Opts)
     catch
     catch
         Error:Reason:Stk ->
         Error:Reason:Stk ->
             ?SLOG(
             ?SLOG(

+ 4 - 1
apps/emqx/src/emqx_types.erl

@@ -270,7 +270,10 @@
         | {emqx_external_broker:dest(), topic(), deliver_result()}
         | {emqx_external_broker:dest(), topic(), deliver_result()}
         | persisted
         | persisted
     ]
     ]
-    | disconnect.
+    %% If schema validation failure action is set to `disconnect'.
+    | disconnect
+    %% If caller specifies `hook_prohibition_as_error => true'.
+    | {blocked, message()}.
 -type route() :: #route{}.
 -type route() :: #route{}.
 -type route_entry() :: {topic(), node()} | {topic, group()}.
 -type route_entry() :: {topic(), node()} | {topic, group()}.
 -type command() :: #command{}.
 -type command() :: #command{}.

+ 3 - 6
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -246,16 +246,13 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
         payload = Payload,
         payload = Payload,
         timestamp = erlang:system_time(millisecond)
         timestamp = erlang:system_time(millisecond)
     },
     },
-    case emqx_broker:safe_publish(Msg) of
-        [_ | _] ->
+    case emqx_broker:safe_publish(Msg, #{hook_prohibition_as_error => true}) of
+        Routes when is_list(Routes) ->
             emqx_metrics:inc_msg(Msg),
             emqx_metrics:inc_msg(Msg),
             ok;
             ok;
         disconnect ->
         disconnect ->
             error;
             error;
-        [] ->
-            %% Have to check previous logs to distinguish between schema validation
-            %% failure, no subscribers, blocked by authz, or anything else in the
-            %% `message.publish' hook evaluation.
+        {blocked, _Msg} ->
             error
             error
     end.
     end.
 
 

+ 0 - 2
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -140,8 +140,6 @@ console_print_action() ->
     #{<<"function">> => <<"console">>}.
     #{<<"function">> => <<"console">>}.
 
 
 basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
 basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
-    %% Subscribe to republish action target topic so there's at least one subscriber.
-    _ = emqx:subscribe(?REPUBLISH_TOPIC),
     %% Create Rule
     %% Create Rule
     RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
     RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
     SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,
     SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,