|
|
@@ -22,6 +22,8 @@
|
|
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
+-include_lib("common_test/include/ct.hrl").
|
|
|
+
|
|
|
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
|
|
|
|
|
|
all() ->
|
|
|
@@ -44,6 +46,8 @@ groups() ->
|
|
|
{pubsub, [sequence],
|
|
|
[subscribe_unsubscribe,
|
|
|
publish, pubsub,
|
|
|
+ t_local_subscribe,
|
|
|
+ t_shared_subscribe,
|
|
|
'pubsub#', 'pubsub+']},
|
|
|
{router, [sequence],
|
|
|
[router_add_del,
|
|
|
@@ -148,6 +152,37 @@ pubsub(_) ->
|
|
|
timer:sleep(20),
|
|
|
emqttd:unsubscribe(<<"a/b/c">>).
|
|
|
|
|
|
+t_local_subscribe(_) ->
|
|
|
+ emqttd:subscribe("$local/topic0"),
|
|
|
+ emqttd:subscribe("$local/topic1", <<"x">>),
|
|
|
+ emqttd:subscribe("$local/topic2", <<"x">>, [{qos, 2}]),
|
|
|
+ timer:sleep(10),
|
|
|
+ ?assertEqual([self()], emqttd:subscribers("$local/topic0")),
|
|
|
+ ?assertEqual([<<"x">>], emqttd:subscribers("$local/topic1")),
|
|
|
+ ?assertEqual([{<<"$local/topic1">>,<<"x">>,[]},{<<"$local/topic2">>,<<"x">>,[{qos,2}]}], emqttd:subscriptions(<<"x">>)),
|
|
|
+
|
|
|
+ ?assertEqual(ok, emqttd:unsubscribe("$local/topic0")),
|
|
|
+ ?assertMatch({error, {subscription_not_found, _}}, emqttd:unsubscribe("$local/topic0")),
|
|
|
+ ?assertEqual(ok, emqttd:unsubscribe("$local/topic1", <<"x">>)),
|
|
|
+ ?assertEqual(ok, emqttd:unsubscribe("$local/topic2", <<"x">>)),
|
|
|
+ ?assertEqual([], emqttd:subscribers("topic1")),
|
|
|
+ ?assertEqual([], emqttd:subscriptions(<<"x">>)).
|
|
|
+
|
|
|
+t_shared_subscribe(_) ->
|
|
|
+ emqttd:subscribe("$local/$share/group1/topic1"),
|
|
|
+ emqttd:subscribe("$share/group2/topic2"),
|
|
|
+ emqttd:subscribe("$queue/topic3"),
|
|
|
+ timer:sleep(10),
|
|
|
+ ?assertEqual([self()], emqttd:subscribers(<<"$local/$share/group1/topic1">>)),
|
|
|
+ ?assertEqual([{<<"$local/$share/group1/topic1">>, self(), []},
|
|
|
+ {<<"$queue/topic3">>, self(), []},
|
|
|
+ {<<"$share/group2/topic2">>, self(), []}],
|
|
|
+ lists:sort(emqttd:subscriptions(self()))),
|
|
|
+ emqttd:unsubscribe("$local/$share/group1/topic1"),
|
|
|
+ emqttd:unsubscribe("$share/group2/topic2"),
|
|
|
+ emqttd:unsubscribe("$queue/topic3"),
|
|
|
+ ?assertEqual([], lists:sort(emqttd:subscriptions(self()))).
|
|
|
+
|
|
|
'pubsub#'(_) ->
|
|
|
emqttd:subscribe(<<"a/#">>),
|
|
|
timer:sleep(10),
|