|
|
@@ -308,12 +308,13 @@ deliver(Messages, Pid, Topic, Limiter) ->
|
|
|
no_receiver;
|
|
|
_ ->
|
|
|
BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
|
|
|
+ NMessages = filter_delivery(Messages, Topic),
|
|
|
case BatchSize of
|
|
|
0 ->
|
|
|
- deliver_to_client(Messages, Pid, Topic),
|
|
|
+ deliver_to_client(NMessages, Pid, Topic),
|
|
|
{ok, Limiter};
|
|
|
_ ->
|
|
|
- deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
|
|
|
+ deliver_in_batches(NMessages, BatchSize, Pid, Topic, Limiter)
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
@@ -334,28 +335,37 @@ deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
|
|
|
Drop
|
|
|
end.
|
|
|
|
|
|
+deliver_to_client([Msg | Rest], Pid, Topic) ->
|
|
|
+ Pid ! {deliver, Topic, Msg},
|
|
|
+ deliver_to_client(Rest, Pid, Topic);
|
|
|
+deliver_to_client([], _, _) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
-define(DELIVER_ALLOWED, true).
|
|
|
-define(DELIVER_NOT_ALLOWED, false).
|
|
|
-
|
|
|
-deliver_to_client([Msg | T], Pid, Topic) ->
|
|
|
- Pipe = emqx_utils:pipeline(
|
|
|
- [
|
|
|
- fun check_clientid_banned/2,
|
|
|
- fun 'check_prefixed_$_with_wildcard'/2
|
|
|
- ],
|
|
|
- {Msg, Topic},
|
|
|
- ?DELIVER_NOT_ALLOWED
|
|
|
- ),
|
|
|
- _ =
|
|
|
- case Pipe of
|
|
|
- {ok, _, ?DELIVER_ALLOWED} ->
|
|
|
- Pid ! {deliver, Topic, Msg};
|
|
|
- {error, _, _} ->
|
|
|
- ok
|
|
|
+-define(msg_pub_from_is_banned, msg_pub_from_is_banned).
|
|
|
+-define(msg_topic_not_match, msg_topic_not_match).
|
|
|
+
|
|
|
+filter_delivery(Messages, Topic) ->
|
|
|
+ FilterFun =
|
|
|
+ fun(Msg) ->
|
|
|
+ Pipe = emqx_utils:pipeline(
|
|
|
+ [
|
|
|
+ fun check_clientid_banned/2,
|
|
|
+ fun 'check_prefixed_$_with_wildcard'/2
|
|
|
+ ],
|
|
|
+ {Msg, Topic},
|
|
|
+ ?DELIVER_NOT_ALLOWED
|
|
|
+ ),
|
|
|
+ _ =
|
|
|
+ case Pipe of
|
|
|
+ {ok, _, ?DELIVER_ALLOWED} ->
|
|
|
+ true;
|
|
|
+ {error, _, _} ->
|
|
|
+ false
|
|
|
+ end
|
|
|
end,
|
|
|
- deliver_to_client(T, Pid, Topic);
|
|
|
-deliver_to_client([], _, _) ->
|
|
|
- ok.
|
|
|
+ lists:filter(FilterFun, Messages).
|
|
|
|
|
|
check_clientid_banned({Msg, _Topic} = Input, _) ->
|
|
|
case emqx_banned:check_clientid(Msg#message.from) of
|
|
|
@@ -363,29 +373,29 @@ check_clientid_banned({Msg, _Topic} = Input, _) ->
|
|
|
{ok, Input, ?DELIVER_ALLOWED};
|
|
|
true ->
|
|
|
?tp(
|
|
|
- notice,
|
|
|
+ debug,
|
|
|
ignore_retained_message_deliver,
|
|
|
#{
|
|
|
- reason => client_is_banned,
|
|
|
+ reason => ?msg_pub_from_is_banned,
|
|
|
clientid => Msg#message.from
|
|
|
}
|
|
|
),
|
|
|
- {error, client_is_banned, ?DELIVER_NOT_ALLOWED}
|
|
|
+ {error, ?msg_pub_from_is_banned, ?DELIVER_NOT_ALLOWED}
|
|
|
end.
|
|
|
|
|
|
'check_prefixed_$_with_wildcard'({Msg, Topic} = Input, _) ->
|
|
|
case emqx_topic:match(Msg#message.topic, Topic) of
|
|
|
false ->
|
|
|
?tp(
|
|
|
- notice,
|
|
|
+ debug,
|
|
|
ignore_retained_message_deliver,
|
|
|
#{
|
|
|
- reason => topic_not_match,
|
|
|
+ reason => ?msg_topic_not_match,
|
|
|
msg_topic => Msg#message.topic,
|
|
|
subscribed_topic => Topic
|
|
|
}
|
|
|
),
|
|
|
- {error, topic_not_match, ?DELIVER_NOT_ALLOWED};
|
|
|
+ {error, ?msg_topic_not_match, ?DELIVER_NOT_ALLOWED};
|
|
|
true ->
|
|
|
{ok, Input, ?DELIVER_ALLOWED}
|
|
|
end.
|