|
|
@@ -256,61 +256,67 @@ t_wildcard_subscription(_) ->
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
t_message_expiry(_) ->
|
|
|
- {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
- {ok, _} = emqtt:connect(C1),
|
|
|
+ ConfMod = fun(Conf) ->
|
|
|
+ Conf#{<<"delivery_rate">> := <<"infinity">>}
|
|
|
+ end,
|
|
|
+ Case = fun() ->
|
|
|
+ {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
+ {ok, _} = emqtt:connect(C1),
|
|
|
|
|
|
- emqtt:publish(
|
|
|
- C1,
|
|
|
- <<"retained/0">>,
|
|
|
- #{'Message-Expiry-Interval' => 0},
|
|
|
- <<"don't expire">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
- ),
|
|
|
- emqtt:publish(
|
|
|
- C1,
|
|
|
- <<"retained/1">>,
|
|
|
- #{'Message-Expiry-Interval' => 2},
|
|
|
- <<"expire">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
- ),
|
|
|
- emqtt:publish(
|
|
|
- C1,
|
|
|
- <<"retained/2">>,
|
|
|
- #{'Message-Expiry-Interval' => 5},
|
|
|
- <<"don't expire">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
- ),
|
|
|
- emqtt:publish(
|
|
|
- C1,
|
|
|
- <<"retained/3">>,
|
|
|
- <<"don't expire">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
- ),
|
|
|
- emqtt:publish(
|
|
|
- C1,
|
|
|
- <<"$SYS/retained/4">>,
|
|
|
- <<"don't expire">>,
|
|
|
- [{qos, 0}, {retain, true}]
|
|
|
- ),
|
|
|
+ emqtt:publish(
|
|
|
+ C1,
|
|
|
+ <<"retained/0">>,
|
|
|
+ #{'Message-Expiry-Interval' => 0},
|
|
|
+ <<"don't expire">>,
|
|
|
+ [{qos, 0}, {retain, true}]
|
|
|
+ ),
|
|
|
+ emqtt:publish(
|
|
|
+ C1,
|
|
|
+ <<"retained/1">>,
|
|
|
+ #{'Message-Expiry-Interval' => 2},
|
|
|
+ <<"expire">>,
|
|
|
+ [{qos, 0}, {retain, true}]
|
|
|
+ ),
|
|
|
+ emqtt:publish(
|
|
|
+ C1,
|
|
|
+ <<"retained/2">>,
|
|
|
+ #{'Message-Expiry-Interval' => 5},
|
|
|
+ <<"don't expire">>,
|
|
|
+ [{qos, 0}, {retain, true}]
|
|
|
+ ),
|
|
|
+ emqtt:publish(
|
|
|
+ C1,
|
|
|
+ <<"retained/3">>,
|
|
|
+ <<"don't expire">>,
|
|
|
+ [{qos, 0}, {retain, true}]
|
|
|
+ ),
|
|
|
+ emqtt:publish(
|
|
|
+ C1,
|
|
|
+ <<"$SYS/retained/4">>,
|
|
|
+ <<"don't expire">>,
|
|
|
+ [{qos, 0}, {retain, true}]
|
|
|
+ ),
|
|
|
|
|
|
- {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
|
|
- {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
|
|
|
- ?assertEqual(5, length(receive_messages(5))),
|
|
|
- {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
|
|
|
- {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>),
|
|
|
+ {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
|
|
+ {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
|
|
|
+ ?assertEqual(5, length(receive_messages(5))),
|
|
|
+ {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
|
|
|
+ {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>),
|
|
|
|
|
|
- timer:sleep(3000),
|
|
|
- {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
|
|
- {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
|
|
|
- ?assertEqual(4, length(receive_messages(5))),
|
|
|
+ timer:sleep(3000),
|
|
|
+ {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
|
|
+ {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
|
|
|
+ ?assertEqual(4, length(receive_messages(5))),
|
|
|
|
|
|
- emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
- emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
- emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
- emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
- emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
+ emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
+ emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
+ emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
+ emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
+ emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]),
|
|
|
|
|
|
- ok = emqtt:disconnect(C1).
|
|
|
+ ok = emqtt:disconnect(C1)
|
|
|
+ end,
|
|
|
+ with_conf(ConfMod, Case).
|
|
|
|
|
|
t_message_expiry_2(_) ->
|
|
|
ConfMod = fun(Conf) ->
|
|
|
@@ -410,6 +416,7 @@ t_flow_control(_) ->
|
|
|
JsonCfg = make_limiter_json(<<"1/1s">>),
|
|
|
emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
|
|
|
emqx_retainer:update_config(#{
|
|
|
+ <<"delivery_rate">> => <<"1/1s">>,
|
|
|
<<"flow_control">> =>
|
|
|
#{
|
|
|
<<"batch_read_number">> => 1,
|