|
|
@@ -26,59 +26,38 @@
|
|
|
|
|
|
all() -> emqx_ct:all(?MODULE).
|
|
|
|
|
|
+-define(BASE_CONF, <<"""
|
|
|
+emqx_retainer {
|
|
|
+ enable = true
|
|
|
+ msg_clear_interval = 0s
|
|
|
+ msg_expiry_interval = 0s
|
|
|
+ max_payload_size = 1MB
|
|
|
+ flow_control {
|
|
|
+ max_read_number = 0
|
|
|
+ msg_deliver_quota = 0
|
|
|
+ quota_release_interval = 0s
|
|
|
+ }
|
|
|
+ config {
|
|
|
+ type = built_in_database
|
|
|
+ storage_type = ram
|
|
|
+ max_retained_messages = 0
|
|
|
+ }
|
|
|
+ }""">>).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Setups
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- application:stop(emqx_retainer),
|
|
|
- emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1),
|
|
|
+ ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
|
|
|
+ emqx_ct_helpers:start_apps([emqx_retainer]),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(_Config) ->
|
|
|
emqx_ct_helpers:stop_apps([emqx_retainer]).
|
|
|
-
|
|
|
-init_per_testcase(TestCase, Config) ->
|
|
|
- emqx_retainer:clean(),
|
|
|
- DefaultCfg = new_emqx_retainer_conf(),
|
|
|
- NewCfg = case TestCase of
|
|
|
- t_message_expiry_2 ->
|
|
|
- DefaultCfg#{msg_expiry_interval := 2000};
|
|
|
- t_flow_control ->
|
|
|
- DefaultCfg#{flow_control := #{max_read_number => 1,
|
|
|
- msg_deliver_quota => 1,
|
|
|
- quota_release_interval => timer:seconds(1)}};
|
|
|
- _ ->
|
|
|
- DefaultCfg
|
|
|
- end,
|
|
|
- emqx_retainer:update_config(NewCfg),
|
|
|
- application:ensure_all_started(emqx_retainer),
|
|
|
- Config.
|
|
|
-
|
|
|
-set_special_configs(emqx_retainer) ->
|
|
|
- init_emqx_retainer_conf();
|
|
|
-set_special_configs(_) ->
|
|
|
- ok.
|
|
|
-
|
|
|
-init_emqx_retainer_conf() ->
|
|
|
- emqx_config:put([?APP], new_emqx_retainer_conf()).
|
|
|
-
|
|
|
-new_emqx_retainer_conf() ->
|
|
|
- #{enable => true,
|
|
|
- msg_expiry_interval => 0,
|
|
|
- msg_clear_interval => 0,
|
|
|
- config => #{type => built_in_database,
|
|
|
- max_retained_messages => 0,
|
|
|
- storage_type => ram},
|
|
|
- flow_control => #{max_read_number => 0,
|
|
|
- msg_deliver_quota => 0,
|
|
|
- quota_release_interval => 0},
|
|
|
- max_payload_size => 1024 * 1024}.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test Cases
|
|
|
%%--------------------------------------------------------------------
|
|
|
-
|
|
|
t_store_and_clean(_) ->
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
@@ -184,13 +163,14 @@ t_message_expiry(_) ->
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
t_message_expiry_2(_) ->
|
|
|
+ emqx_retainer:update_config(#{<<"msg_expiry_interval">> => <<"2s">>}),
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
|
|
|
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
|
|
?assertEqual(1, length(receive_messages(1))),
|
|
|
- timer:sleep(3000),
|
|
|
+ timer:sleep(4000),
|
|
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
|
|
?assertEqual(0, length(receive_messages(1))),
|
|
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
|
|
@@ -216,6 +196,9 @@ t_clean(_) ->
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
|
|
t_flow_control(_) ->
|
|
|
+ emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1,
|
|
|
+ <<"msg_deliver_quota">> => 1,
|
|
|
+ <<"quota_release_interval">> => <<"1s">>}}),
|
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
|
{ok, _} = emqtt:connect(C1),
|
|
|
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|