|
|
@@ -157,7 +157,7 @@ t_store_and_clean(_) ->
|
|
|
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
-t_retain_handling(_) ->
|
|
|
+t_retain_handling(Config) ->
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
|
|
|
@@ -173,11 +173,12 @@ t_retain_handling(_) ->
|
|
|
?assertEqual(0, length(receive_messages(1))),
|
|
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
|
|
|
|
|
|
- emqtt:publish(
|
|
|
+ publish(
|
|
|
C1,
|
|
|
<<"retained">>,
|
|
|
<<"this is a retained message">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
+ [{qos, 0}, {retain, true}],
|
|
|
+ Config
|
|
|
),
|
|
|
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
|
|
@@ -205,7 +206,7 @@ t_retain_handling(_) ->
|
|
|
emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
-t_wildcard_subscription(_) ->
|
|
|
+t_wildcard_subscription(Config) ->
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
emqtt:publish(
|
|
|
@@ -226,17 +227,19 @@ t_wildcard_subscription(_) ->
|
|
|
<<"this is a retained message 2">>,
|
|
|
[{qos, 0}, {retain, true}]
|
|
|
),
|
|
|
- emqtt:publish(
|
|
|
+ publish(
|
|
|
C1,
|
|
|
<<"/x/y/z">>,
|
|
|
<<"this is a retained message 3">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
+ [{qos, 0}, {retain, true}],
|
|
|
+ Config
|
|
|
),
|
|
|
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"/+/y/#">>, 0),
|
|
|
- ?assertEqual(4, length(receive_messages(4))),
|
|
|
+ Msgs = receive_messages(4),
|
|
|
+ ?assertEqual(4, length(Msgs), #{msgs => Msgs}),
|
|
|
|
|
|
emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
@@ -244,7 +247,7 @@ t_wildcard_subscription(_) ->
|
|
|
emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
-t_message_expiry(_) ->
|
|
|
+t_message_expiry(Config) ->
|
|
|
ConfMod = fun(Conf) ->
|
|
|
Conf#{<<"delivery_rate">> := <<"infinity">>}
|
|
|
end,
|
|
|
@@ -279,11 +282,12 @@ t_message_expiry(_) ->
|
|
|
<<"don't expire">>,
|
|
|
[{qos, 0}, {retain, true}]
|
|
|
),
|
|
|
- emqtt:publish(
|
|
|
+ publish(
|
|
|
C1,
|
|
|
<<"$SYS/retained/4">>,
|
|
|
<<"don't expire">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
+ [{qos, 0}, {retain, true}],
|
|
|
+ Config
|
|
|
),
|
|
|
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
|
|
@@ -307,14 +311,14 @@ t_message_expiry(_) ->
|
|
|
end,
|
|
|
with_conf(ConfMod, Case).
|
|
|
|
|
|
-t_message_expiry_2(_) ->
|
|
|
+t_message_expiry_2(Config) ->
|
|
|
ConfMod = fun(Conf) ->
|
|
|
Conf#{<<"msg_expiry_interval">> := <<"2s">>}
|
|
|
end,
|
|
|
Case = fun() ->
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
- emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
|
|
|
+ publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}], Config),
|
|
|
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
|
|
?assertEqual(1, length(receive_messages(1))),
|
|
|
@@ -348,7 +352,7 @@ t_table_full(_) ->
|
|
|
end,
|
|
|
with_conf(ConfMod, Case).
|
|
|
|
|
|
-t_clean(_) ->
|
|
|
+t_clean(Config) ->
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
emqtt:publish(
|
|
|
@@ -363,11 +367,12 @@ t_clean(_) ->
|
|
|
<<"this is a retained message 1">>,
|
|
|
[{qos, 0}, {retain, true}]
|
|
|
),
|
|
|
- emqtt:publish(
|
|
|
+ publish(
|
|
|
C1,
|
|
|
<<"retained/test/0">>,
|
|
|
<<"this is a retained message 2">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
+ [{qos, 0}, {retain, true}],
|
|
|
+ Config
|
|
|
),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
|
|
?assertEqual(3, length(receive_messages(3))),
|
|
|
@@ -871,3 +876,36 @@ make_limiter_json(Rate) ->
|
|
|
<<"initial">> => 0,
|
|
|
<<"burst">> => <<"0">>
|
|
|
}.
|
|
|
+
|
|
|
+publish(Client, Topic, Payload, Opts, TCConfig) ->
|
|
|
+ PublishOpts = publish_opts(TCConfig),
|
|
|
+ do_publish(Client, Topic, Payload, Opts, PublishOpts).
|
|
|
+
|
|
|
+publish_opts(TCConfig) ->
|
|
|
+ Timeout = proplists:get_value(publish_wait_timeout, TCConfig, undefined),
|
|
|
+ Predicate =
|
|
|
+ case proplists:get_value(publish_wait_predicate, TCConfig, undefined) of
|
|
|
+ undefined -> undefined;
|
|
|
+ {NEvents, Pred} -> {predicate, {NEvents, Pred, Timeout}};
|
|
|
+ Pred -> {predicate, {1, Pred, Timeout}}
|
|
|
+ end,
|
|
|
+ Sleep =
|
|
|
+ case proplists:get_value(sleep_after_publish, TCConfig, undefined) of
|
|
|
+ undefined -> undefined;
|
|
|
+ Time -> {sleep, Time}
|
|
|
+ end,
|
|
|
+ emqx_maybe:define(Predicate, Sleep).
|
|
|
+
|
|
|
+do_publish(Client, Topic, Payload, Opts, undefined) ->
|
|
|
+ emqtt:publish(Client, Topic, Payload, Opts);
|
|
|
+do_publish(Client, Topic, Payload, Opts, {predicate, {NEvents, Predicate, Timeout}}) ->
|
|
|
+ %% Do not delete this clause: it's used by other retainer implementation tests
|
|
|
+ {ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, Timeout),
|
|
|
+ Res = emqtt:publish(Client, Topic, Payload, Opts),
|
|
|
+ {ok, _} = snabbkaffe:receive_events(SRef0),
|
|
|
+ Res;
|
|
|
+do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
|
|
|
+ %% Do not delete this clause: it's used by other retainer implementation tests
|
|
|
+ Res = emqtt:publish(Client, Topic, Payload, Opts),
|
|
|
+ ct:sleep(Time),
|
|
|
+ Res.
|