|
|
@@ -334,25 +334,62 @@ deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
|
|
|
Drop
|
|
|
end.
|
|
|
|
|
|
+-define(DELIVER_ALLOWED, true).
|
|
|
+-define(DELIVER_NOT_ALLOWED, false).
|
|
|
+
|
|
|
deliver_to_client([Msg | T], Pid, Topic) ->
|
|
|
+ Pipe = emqx_utils:pipeline(
|
|
|
+ [
|
|
|
+ fun check_clientid_banned/2,
|
|
|
+ fun 'check_prefixed_$_with_wildcard'/2
|
|
|
+ ],
|
|
|
+ {Msg, Topic},
|
|
|
+ ?DELIVER_NOT_ALLOWED
|
|
|
+ ),
|
|
|
_ =
|
|
|
- case emqx_banned:check_clientid(Msg#message.from) of
|
|
|
- false ->
|
|
|
+ case Pipe of
|
|
|
+ {ok, _, ?DELIVER_ALLOWED} ->
|
|
|
Pid ! {deliver, Topic, Msg};
|
|
|
- true ->
|
|
|
- ?tp(
|
|
|
- notice,
|
|
|
- ignore_retained_message_deliver,
|
|
|
- #{
|
|
|
- reason => "client is banned",
|
|
|
- clientid => Msg#message.from
|
|
|
- }
|
|
|
- )
|
|
|
+ {error, _, _} ->
|
|
|
+ ok
|
|
|
end,
|
|
|
deliver_to_client(T, Pid, Topic);
|
|
|
deliver_to_client([], _, _) ->
|
|
|
ok.
|
|
|
|
|
|
+check_clientid_banned({Msg, _Topic} = Input, _) ->
|
|
|
+ case emqx_banned:check_clientid(Msg#message.from) of
|
|
|
+ false ->
|
|
|
+ {ok, Input, ?DELIVER_ALLOWED};
|
|
|
+ true ->
|
|
|
+ ?tp(
|
|
|
+ notice,
|
|
|
+ ignore_retained_message_deliver,
|
|
|
+ #{
|
|
|
+ reason => client_is_banned,
|
|
|
+ clientid => Msg#message.from
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ {error, client_is_banned, ?DELIVER_NOT_ALLOWED}
|
|
|
+ end.
|
|
|
+
|
|
|
+'check_prefixed_$_with_wildcard'({Msg, Topic} = Input, _) ->
|
|
|
+ case emqx_topic:match(Msg#message.topic, Topic) of
|
|
|
+ false ->
|
|
|
+ ?tp(
|
|
|
+ notice,
|
|
|
+ ignore_retained_message_deliver,
|
|
|
+ #{
|
|
|
+ reason => topic_not_match,
|
|
|
+ msg_topic => Msg#message.topic,
|
|
|
+ subscribed_topic => Topic
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ {error, topic_not_match, ?DELIVER_NOT_ALLOWED};
|
|
|
+ true ->
|
|
|
+ {ok, Input, ?DELIVER_ALLOWED}
|
|
|
+ end.
|
|
|
+
|
|
|
take(N, List) ->
|
|
|
take(N, List, 0, []).
|
|
|
|