|
|
@@ -143,12 +143,15 @@ t_connection_with_authn_failed(_) ->
|
|
|
ok.
|
|
|
|
|
|
t_publish(_) ->
|
|
|
- Action = fun(Channel, Token) ->
|
|
|
- Topic = <<"/abc">>,
|
|
|
+ %% can publish to a normal topic
|
|
|
+ Topics = [
|
|
|
+ <<"abc">>,
|
|
|
+ %% can publish to a `/` leading topic
|
|
|
+ <<"/abc">>
|
|
|
+ ],
|
|
|
+ Action = fun(Topic, Channel, Token) ->
|
|
|
Payload = <<"123">>,
|
|
|
-
|
|
|
- TopicStr = binary_to_list(Topic),
|
|
|
- URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
|
|
|
+ URI = pubsub_uri(binary_to_list(Topic), Token),
|
|
|
|
|
|
%% Sub topic first
|
|
|
emqx:subscribe(Topic),
|
|
|
@@ -164,24 +167,28 @@ t_publish(_) ->
|
|
|
?assert(false)
|
|
|
end
|
|
|
end,
|
|
|
- with_connection(Action).
|
|
|
+ with_connection(Topics, Action).
|
|
|
|
|
|
t_subscribe(_) ->
|
|
|
- Topic = <<"/abc">>,
|
|
|
- Fun = fun(Channel, Token) ->
|
|
|
- TopicStr = binary_to_list(Topic),
|
|
|
+ %% can subscribe to a normal topic
|
|
|
+ Topics = [
|
|
|
+ <<"abc">>,
|
|
|
+ %% can subscribe to a `/` leading topic
|
|
|
+ <<"/abc">>
|
|
|
+ ],
|
|
|
+ Fun = fun(Topic, Channel, Token) ->
|
|
|
Payload = <<"123">>,
|
|
|
-
|
|
|
- URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
|
|
|
+ URI = pubsub_uri(binary_to_list(Topic), Token),
|
|
|
Req = make_req(get, Payload, [{observe, 0}]),
|
|
|
{ok, content, _} = do_request(Channel, URI, Req),
|
|
|
?LOGT("observer topic:~ts~n", [Topic]),
|
|
|
|
|
|
+ %% ensure subscribe succeed
|
|
|
timer:sleep(100),
|
|
|
[SubPid] = emqx:subscribers(Topic),
|
|
|
?assert(is_pid(SubPid)),
|
|
|
|
|
|
- %% Publish a message
|
|
|
+ %% publish a message
|
|
|
emqx:publish(emqx_message:make(Topic, Payload)),
|
|
|
{ok, content, Notify} = with_response(Channel),
|
|
|
?LOGT("observer get Notif=~p", [Notify]),
|
|
|
@@ -191,18 +198,27 @@ t_subscribe(_) ->
|
|
|
?assertEqual(Payload, PayloadRecv)
|
|
|
end,
|
|
|
|
|
|
- with_connection(Fun),
|
|
|
- timer:sleep(100),
|
|
|
+ with_connection(Topics, Fun),
|
|
|
|
|
|
- ?assertEqual([], emqx:subscribers(Topic)).
|
|
|
+ %% subscription removed if coap client disconnected
|
|
|
+ timer:sleep(100),
|
|
|
+ lists:foreach(
|
|
|
+ fun(Topic) ->
|
|
|
+ ?assertEqual([], emqx:subscribers(Topic))
|
|
|
+ end,
|
|
|
+ Topics
|
|
|
+ ).
|
|
|
|
|
|
t_un_subscribe(_) ->
|
|
|
- Topic = <<"/abc">>,
|
|
|
- Fun = fun(Channel, Token) ->
|
|
|
- TopicStr = binary_to_list(Topic),
|
|
|
+ %% can unsubscribe to a normal topic
|
|
|
+ Topics = [
|
|
|
+ <<"abc">>,
|
|
|
+ %% can unsubscribe to a `/` leading topic
|
|
|
+ <<"/abc">>
|
|
|
+ ],
|
|
|
+ Fun = fun(Topic, Channel, Token) ->
|
|
|
Payload = <<"123">>,
|
|
|
-
|
|
|
- URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
|
|
|
+ URI = pubsub_uri(binary_to_list(Topic), Token),
|
|
|
|
|
|
Req = make_req(get, Payload, [{observe, 0}]),
|
|
|
{ok, content, _} = do_request(Channel, URI, Req),
|
|
|
@@ -219,16 +235,15 @@ t_un_subscribe(_) ->
|
|
|
?assertEqual([], emqx:subscribers(Topic))
|
|
|
end,
|
|
|
|
|
|
- with_connection(Fun).
|
|
|
+ with_connection(Topics, Fun).
|
|
|
|
|
|
t_observe_wildcard(_) ->
|
|
|
Fun = fun(Channel, Token) ->
|
|
|
%% resolve_url can't process wildcard with #
|
|
|
- Topic = <<"/abc/+">>,
|
|
|
- TopicStr = binary_to_list(Topic),
|
|
|
+ Topic = <<"abc/+">>,
|
|
|
Payload = <<"123">>,
|
|
|
|
|
|
- URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
|
|
|
+ URI = pubsub_uri(binary_to_list(Topic), Token),
|
|
|
Req = make_req(get, Payload, [{observe, 0}]),
|
|
|
{ok, content, _} = do_request(Channel, URI, Req),
|
|
|
?LOGT("observer topic:~ts~n", [Topic]),
|
|
|
@@ -238,7 +253,7 @@ t_observe_wildcard(_) ->
|
|
|
?assert(is_pid(SubPid)),
|
|
|
|
|
|
%% Publish a message
|
|
|
- PubTopic = <<"/abc/def">>,
|
|
|
+ PubTopic = <<"abc/def">>,
|
|
|
emqx:publish(emqx_message:make(PubTopic, Payload)),
|
|
|
{ok, content, Notify} = with_response(Channel),
|
|
|
|
|
|
@@ -320,7 +335,7 @@ t_clients_get_subscription_api(_) ->
|
|
|
|
|
|
{200, [Subs]} = request(get, Path),
|
|
|
|
|
|
- ?assertEqual(<<"/coap/observe">>, maps:get(topic, Subs)),
|
|
|
+ ?assertEqual(<<"coap/observe">>, maps:get(topic, Subs)),
|
|
|
|
|
|
observe(Channel, Token, false),
|
|
|
|
|
|
@@ -386,6 +401,9 @@ observe(Channel, Token, false) ->
|
|
|
{ok, nocontent, _Data} = do_request(Channel, URI, Req),
|
|
|
ok.
|
|
|
|
|
|
+pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
|
|
|
+ ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.
|
|
|
+
|
|
|
make_req(Method) ->
|
|
|
make_req(Method, <<>>).
|
|
|
|
|
|
@@ -442,6 +460,16 @@ with_connection(Action) ->
|
|
|
end,
|
|
|
do(Fun).
|
|
|
|
|
|
+with_connection(Checks, Action) ->
|
|
|
+ Fun = fun(Channel) ->
|
|
|
+ Token = connection(Channel),
|
|
|
+ timer:sleep(100),
|
|
|
+ lists:foreach(fun(E) -> Action(E, Channel, Token) end, Checks),
|
|
|
+ disconnection(Channel, Token),
|
|
|
+ timer:sleep(100)
|
|
|
+ end,
|
|
|
+ do(Fun).
|
|
|
+
|
|
|
receive_deliver(Wait) ->
|
|
|
receive
|
|
|
{deliver, _, Msg} ->
|