|
|
@@ -286,18 +286,38 @@ t_wildcard_subscription(Config) ->
|
|
|
Config
|
|
|
),
|
|
|
|
|
|
+ snabbkaffe:start_trace(),
|
|
|
+ SnabbkaffeSubFun = fun(NEvents) ->
|
|
|
+ snabbkaffe:subscribe(
|
|
|
+ ?match_event(#{?snk_kind := ignore_retained_message_due_to_topic_not_match}),
|
|
|
+ NEvents,
|
|
|
+ _Timeout = 10000,
|
|
|
+ 0
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ SnabbkaffeReceiveAndAssert = fun(SubRef, NEvents) ->
|
|
|
+ {ok, Trace} = snabbkaffe:receive_events(SubRef),
|
|
|
+ ?assertEqual(
|
|
|
+ NEvents, length(?of_kind(ignore_retained_message_due_to_topic_not_match, Trace))
|
|
|
+ )
|
|
|
+ end,
|
|
|
+
|
|
|
%%%%%%%%%%
|
|
|
%% C1 subscribes to `#'
|
|
|
- %% The order in which messages are received is not always the same as the order in which they are published.
|
|
|
- %% The received order follows the order in which the indexes match.
|
|
|
- %% i.e.
|
|
|
- %% The first level of the topic `/t/3` is empty.
|
|
|
- %% So it will be the first message that be matched and be sent.
|
|
|
+ {ok, SubRef1} = SnabbkaffeSubFun(2),
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"#">>, 0),
|
|
|
+ %% Matched 5 msgs but only receive 3 msgs, 2 ignored
|
|
|
+ %% (`$test/t/0` and `$test/test/1` with `$` prefix in topic are ignored)
|
|
|
+ SnabbkaffeReceiveAndAssert(SubRef1, 2),
|
|
|
Msgs1 = receive_messages(3),
|
|
|
?assertMatch(
|
|
|
+ %% The order in which messages are received is not always the same as the order in which they are published.
|
|
|
+ %% The received order follows the order in which the indexes match.
|
|
|
+ %% i.e.
|
|
|
+ %% The first level of the topic `/t/3` is empty.
|
|
|
+ %% So it will be the first message that be matched and be sent.
|
|
|
[
|
|
|
#{topic := <<"/t/3">>},
|
|
|
#{topic := <<"t/1">>},
|
|
|
@@ -313,6 +333,7 @@ t_wildcard_subscription(Config) ->
|
|
|
{ok, C2} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C2),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C2, <<"$test/#">>, 0),
|
|
|
+ %% Matched 2 msgs and receive them all, no ignored
|
|
|
Msgs2 = receive_messages(2),
|
|
|
?assertMatch(
|
|
|
[
|
|
|
@@ -329,6 +350,7 @@ t_wildcard_subscription(Config) ->
|
|
|
{ok, C3} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C3),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C3, <<"+/+">>, 0),
|
|
|
+ %% Matched 2 msgs and receive them all, no ignored
|
|
|
Msgs3 = receive_messages(2),
|
|
|
?assertMatch(
|
|
|
[
|
|
|
@@ -342,9 +364,13 @@ t_wildcard_subscription(Config) ->
|
|
|
|
|
|
%%%%%%%%%%
|
|
|
%% C4 subscribes to `+/t/#'
|
|
|
+ {ok, SubRef4} = SnabbkaffeSubFun(1),
|
|
|
{ok, C4} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C4),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C4, <<"+/t/#">>, 0),
|
|
|
+ %% Matched 2 msgs but only receive 1 msgs, 1 ignored
|
|
|
+ %% (`$test/t/0` with `$` prefix in topic are ignored)
|
|
|
+ SnabbkaffeReceiveAndAssert(SubRef4, 1),
|
|
|
Msgs4 = receive_messages(1),
|
|
|
?assertMatch(
|
|
|
[
|
|
|
@@ -355,6 +381,8 @@ t_wildcard_subscription(Config) ->
|
|
|
),
|
|
|
ok = emqtt:disconnect(C4),
|
|
|
|
|
|
+ snabbkaffe:stop(),
|
|
|
+
|
|
|
ok.
|
|
|
|
|
|
t_message_expiry(Config) ->
|
|
|
@@ -798,7 +826,7 @@ t_deliver_when_banned(_) ->
|
|
|
snabbkaffe:start_trace(),
|
|
|
{ok, SubRef} =
|
|
|
snabbkaffe:subscribe(
|
|
|
- ?match_event(#{?snk_kind := ignore_retained_message_deliver}),
|
|
|
+ ?match_event(#{?snk_kind := ignore_retained_message_due_to_banned}),
|
|
|
_NEvents = 3,
|
|
|
_Timeout = 10000,
|
|
|
0
|
|
|
@@ -807,7 +835,7 @@ t_deliver_when_banned(_) ->
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
|
|
|
|
|
|
{ok, Trace} = snabbkaffe:receive_events(SubRef),
|
|
|
- ?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
|
|
|
+ ?assertEqual(3, length(?of_kind(ignore_retained_message_due_to_banned, Trace))),
|
|
|
snabbkaffe:stop(),
|
|
|
emqx_banned:delete(Who),
|
|
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
|
|
|
@@ -923,7 +951,7 @@ receive_messages(Count, Msgs) ->
|
|
|
ct:log("Msg: ~p ~n", [Msg]),
|
|
|
receive_messages(Count - 1, [Msg | Msgs]);
|
|
|
Other ->
|
|
|
- ct:log("Other Msg: ~p~n", [Other]),
|
|
|
+ ct:print("Other Msg: ~p~n", [Other]),
|
|
|
receive_messages(Count, Msgs)
|
|
|
after 2000 ->
|
|
|
Msgs
|