|
|
@@ -20,9 +20,6 @@ all() ->
|
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- %% avoid inter-suite flakiness...
|
|
|
- application:stop(emqx),
|
|
|
- application:stop(emqx_durable_storage),
|
|
|
TCApps = emqx_cth_suite:start(
|
|
|
app_specs(),
|
|
|
#{work_dir => ?config(priv_dir, Config)}
|
|
|
@@ -36,8 +33,16 @@ end_per_suite(Config) ->
|
|
|
|
|
|
init_per_testcase(t_session_subscription_idempotency, Config) ->
|
|
|
Cluster = cluster(#{n => 1}),
|
|
|
- Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}),
|
|
|
- [{cluster, Cluster}, {nodes, Nodes} | Config];
|
|
|
+ ClusterOpts = #{work_dir => ?config(priv_dir, Config)},
|
|
|
+ NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
|
|
|
+ Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
|
|
|
+ [
|
|
|
+ {cluster, Cluster},
|
|
|
+ {node_specs, NodeSpecs},
|
|
|
+ {cluster_opts, ClusterOpts},
|
|
|
+ {nodes, Nodes}
|
|
|
+ | Config
|
|
|
+ ];
|
|
|
init_per_testcase(_TestCase, Config) ->
|
|
|
Config.
|
|
|
|
|
|
@@ -92,12 +97,28 @@ get_all_iterator_ids(Node) ->
|
|
|
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
|
|
|
end).
|
|
|
|
|
|
+wait_nodeup(Node) ->
|
|
|
+ ?retry(
|
|
|
+ _Sleep0 = 500,
|
|
|
+ _Attempts0 = 50,
|
|
|
+ pong = net_adm:ping(Node)
|
|
|
+ ).
|
|
|
+
|
|
|
+wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
|
|
|
+ #{override_env := Env} = proplists:get_value(gen_rpc, Apps),
|
|
|
+ Port = proplists:get_value(tcp_server_port, Env),
|
|
|
+ ?retry(
|
|
|
+ _Sleep0 = 500,
|
|
|
+ _Attempts0 = 50,
|
|
|
+ false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port)
|
|
|
+ ).
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Testcases
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
t_session_subscription_idempotency(Config) ->
|
|
|
- Cluster = ?config(cluster, Config),
|
|
|
+ [Node1Spec | _] = ?config(node_specs, Config),
|
|
|
[Node1] = ?config(nodes, Config),
|
|
|
Port = get_mqtt_port(Node1, tcp),
|
|
|
SubTopicFilter = <<"t/+">>,
|
|
|
@@ -119,13 +140,25 @@ t_session_subscription_idempotency(Config) ->
|
|
|
|
|
|
spawn_link(fun() ->
|
|
|
?tp(will_restart_node, #{}),
|
|
|
- ct:pal("stopping node ~p", [Node1]),
|
|
|
- ok = emqx_cth_cluster:stop_node(Node1),
|
|
|
- ct:pal("stopped node ~p; restarting...", [Node1]),
|
|
|
- [Node1] = emqx_cth_cluster:start(Cluster, #{
|
|
|
- work_dir => ?config(priv_dir, Config),
|
|
|
- skip_clean_suite_state_check => true
|
|
|
- }),
|
|
|
+ ct:pal("restarting node ~p", [Node1]),
|
|
|
+ true = monitor_node(Node1, true),
|
|
|
+ ok = erpc:call(Node1, init, restart, []),
|
|
|
+ receive
|
|
|
+ {nodedown, Node1} ->
|
|
|
+ ok
|
|
|
+ after 10_000 ->
|
|
|
+ ct:fail("node ~p didn't stop", [Node1])
|
|
|
+ end,
|
|
|
+ ct:pal("waiting for nodeup ~p", [Node1]),
|
|
|
+ wait_nodeup(Node1),
|
|
|
+ wait_gen_rpc_down(Node1Spec),
|
|
|
+ ct:pal("restarting apps on ~p", [Node1]),
|
|
|
+ Apps = maps:get(apps, Node1Spec),
|
|
|
+ ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
|
|
|
+ _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
|
|
|
+ %% have to re-inject this so that we may stop the node succesfully at the
|
|
|
+ %% end....
|
|
|
+ ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
|
|
|
ct:pal("node ~p restarted", [Node1]),
|
|
|
?tp(restarted_node, #{}),
|
|
|
ok
|