|
|
@@ -20,6 +20,8 @@
|
|
|
|
|
|
-include("emqttd.hrl").
|
|
|
|
|
|
+-include_lib("eunit/include/eunit.hrl").
|
|
|
+
|
|
|
all() ->
|
|
|
[{group, pubsub},
|
|
|
{group, router},
|
|
|
@@ -38,7 +40,8 @@ groups() ->
|
|
|
create_subscription,
|
|
|
subscribe_unsubscribe,
|
|
|
publish, pubsub,
|
|
|
- 'pubsub#', 'pubsub+']},
|
|
|
+ 'pubsub#', 'pubsub+',
|
|
|
+ pubsub_queue]},
|
|
|
{router, [sequence],
|
|
|
[router_add_del,
|
|
|
router_print,
|
|
|
@@ -99,7 +102,7 @@ create_subscription(_) ->
|
|
|
[#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
|
|
|
= emqttd_backend:lookup_subscriptions(<<"clientId">>),
|
|
|
ok = emqttd_backend:del_subscriptions(<<"clientId">>),
|
|
|
- [] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
|
|
|
+ ?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)).
|
|
|
|
|
|
subscribe_unsubscribe(_) ->
|
|
|
ok = emqttd:subscribe(<<"topic/subunsub">>),
|
|
|
@@ -114,7 +117,7 @@ publish(_) ->
|
|
|
ok = emqttd:subscribe(<<"test/+">>),
|
|
|
timer:sleep(10),
|
|
|
emqttd:publish(Msg),
|
|
|
- true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end.
|
|
|
+ ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
|
|
|
|
|
|
pubsub(_) ->
|
|
|
Self = self(),
|
|
|
@@ -124,7 +127,7 @@ pubsub(_) ->
|
|
|
[{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self),
|
|
|
[{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>),
|
|
|
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
|
- true = receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end,
|
|
|
+ ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
|
|
|
spawn(fun() ->
|
|
|
emqttd:subscribe(<<"a/b/c">>),
|
|
|
emqttd:subscribe(<<"c/d/e">>),
|
|
|
@@ -138,16 +141,42 @@ pubsub(_) ->
|
|
|
emqttd:subscribe(<<"a/#">>),
|
|
|
timer:sleep(10),
|
|
|
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
|
- true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end,
|
|
|
+ ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
|
|
|
emqttd:unsubscribe(<<"a/#">>).
|
|
|
|
|
|
'pubsub+'(_) ->
|
|
|
emqttd:subscribe(<<"a/+/+">>),
|
|
|
timer:sleep(10),
|
|
|
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
|
|
|
- true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end,
|
|
|
+ ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
|
|
|
emqttd:unsubscribe(<<"a/+/+">>).
|
|
|
|
|
|
+pubsub_queue(_) ->
|
|
|
+ Self = self(), Q = <<"$queue/abc">>,
|
|
|
+ SubFun = fun() ->
|
|
|
+ emqttd:subscribe(Q),
|
|
|
+ {ok, Msgs} = loop_recv(Q, 10),
|
|
|
+ Self ! {recv, self(), Msgs}
|
|
|
+ end,
|
|
|
+ Sub1 = spawn(SubFun), Sub2 = spawn(SubFun),
|
|
|
+ timer:sleep(5),
|
|
|
+ emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)),
|
|
|
+ emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)),
|
|
|
+ emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)),
|
|
|
+ ?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end),
|
|
|
+ ?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end).
|
|
|
+
|
|
|
+loop_recv(Topic, Timeout) ->
|
|
|
+ loop_recv(Topic, Timeout, []).
|
|
|
+
|
|
|
+loop_recv(Topic, Timeout, Acc) ->
|
|
|
+ receive
|
|
|
+ {dispatch, Topic, Msg} ->
|
|
|
+ loop_recv(Topic, Timeout, [Msg|Acc])
|
|
|
+ after
|
|
|
+ Timeout -> {ok, Acc}
|
|
|
+ end.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Router Test
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -293,7 +322,7 @@ dispatch_retained_messages(_) ->
|
|
|
payload = <<"payload">>},
|
|
|
emqttd_retainer:retain(Msg),
|
|
|
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
|
|
|
- true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
|
|
|
+ ?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end),
|
|
|
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
|
|
|
[] = emqttd_backend:read_messages(<<"a/b/c">>).
|
|
|
|