|
|
@@ -282,6 +282,34 @@ t_publish_as_persistent(_Config) ->
|
|
|
emqtt:stop(Pub)
|
|
|
end.
|
|
|
|
|
|
+t_publish_empty_topic_levels(_Config) ->
|
|
|
+ Sub = connect(<<?MODULE_STRING "1">>, true, 30),
|
|
|
+ Pub = connect(<<?MODULE_STRING "2">>, true, 30),
|
|
|
+ try
|
|
|
+ {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t//+//#">>, qos1),
|
|
|
+ Messages = [
|
|
|
+ {<<"t//1">>, <<"1">>},
|
|
|
+ {<<"t//1/">>, <<"2">>},
|
|
|
+ {<<"t//2//">>, <<"3">>},
|
|
|
+ {<<"t//2//foo">>, <<"4">>},
|
|
|
+ {<<"t//2/foo">>, <<"5">>},
|
|
|
+ {<<"t/3/bar">>, <<"6">>}
|
|
|
+ ],
|
|
|
+ [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
|
|
|
+ Received = receive_messages(length(Messages), 1_500),
|
|
|
+ ?assertMatch(
|
|
|
+ [
|
|
|
+ #{topic := <<"t//1/">>, payload := <<"2">>},
|
|
|
+ #{topic := <<"t//2//">>, payload := <<"3">>},
|
|
|
+ #{topic := <<"t//2//foo">>, payload := <<"4">>}
|
|
|
+ ],
|
|
|
+ lists:sort(emqx_utils_maps:key_comparer(payload), Received)
|
|
|
+ )
|
|
|
+ after
|
|
|
+ emqtt:stop(Sub),
|
|
|
+ emqtt:stop(Pub)
|
|
|
+ end.
|
|
|
+
|
|
|
%%
|
|
|
|
|
|
connect(ClientId, CleanStart, EI) ->
|
|
|
@@ -322,15 +350,18 @@ consume(It) ->
|
|
|
end.
|
|
|
|
|
|
receive_messages(Count) ->
|
|
|
- lists:reverse(receive_messages(Count, [])).
|
|
|
+ receive_messages(Count, 5_000).
|
|
|
+
|
|
|
+receive_messages(Count, Timeout) ->
|
|
|
+ lists:reverse(receive_messages(Count, [], Timeout)).
|
|
|
|
|
|
-receive_messages(0, Msgs) ->
|
|
|
+receive_messages(0, Msgs, _Timeout) ->
|
|
|
Msgs;
|
|
|
-receive_messages(Count, Msgs) ->
|
|
|
+receive_messages(Count, Msgs, Timeout) ->
|
|
|
receive
|
|
|
{publish, Msg} ->
|
|
|
- receive_messages(Count - 1, [Msg | Msgs])
|
|
|
- after 5_000 ->
|
|
|
+ receive_messages(Count - 1, [Msg | Msgs], Timeout)
|
|
|
+ after Timeout ->
|
|
|
Msgs
|
|
|
end.
|
|
|
|