|
@@ -114,15 +114,7 @@ end_per_group(_Group, _Config) ->
|
|
|
|
|
|
|
|
t_takeover(Config) ->
|
|
t_takeover(Config) ->
|
|
|
process_flag(trap_exit, true),
|
|
process_flag(trap_exit, true),
|
|
|
- Vsn = atom_to_list(?config(mqtt_vsn, Config)),
|
|
|
|
|
- Persist =
|
|
|
|
|
- case ?config(persistence_enabled, Config) of
|
|
|
|
|
- true ->
|
|
|
|
|
- "persistent-";
|
|
|
|
|
- false ->
|
|
|
|
|
- "not-persistent-"
|
|
|
|
|
- end,
|
|
|
|
|
- ClientId = iolist_to_binary("t_takeover-" ++ Persist ++ Vsn),
|
|
|
|
|
|
|
+ ClientId = make_client_id(?FUNCTION_NAME, Config),
|
|
|
ClientOpts = [
|
|
ClientOpts = [
|
|
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
|
|
{clean_start, false}
|
|
{clean_start, false}
|
|
@@ -153,7 +145,7 @@ t_takeover(Config) ->
|
|
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
|
|
{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
|
|
{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
|
|
|
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
|
|
[{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
|
|
|
- {fun stop_client/1, []}
|
|
|
|
|
|
|
+ {fun stop_the_last_client/1, []}
|
|
|
]),
|
|
]),
|
|
|
|
|
|
|
|
Sleep =
|
|
Sleep =
|
|
@@ -184,7 +176,7 @@ t_takeover(Config) ->
|
|
|
|
|
|
|
|
t_takeover_willmsg(Config) ->
|
|
t_takeover_willmsg(Config) ->
|
|
|
process_flag(trap_exit, true),
|
|
process_flag(trap_exit, true),
|
|
|
- ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
|
|
|
+ ClientId = make_client_id(?FUNCTION_NAME, Config),
|
|
|
WillTopic = <<ClientId/binary, <<"_willtopic">>/binary>>,
|
|
WillTopic = <<ClientId/binary, <<"_willtopic">>/binary>>,
|
|
|
Middle = ?CNT div 2,
|
|
Middle = ?CNT div 2,
|
|
|
Client1Msgs = messages(ClientId, 0, Middle),
|
|
Client1Msgs = messages(ClientId, 0, Middle),
|
|
@@ -208,25 +200,27 @@ t_takeover_willmsg(Config) ->
|
|
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
|
|
%% WHEN client reconnect with clean_start = false
|
|
%% WHEN client reconnect with clean_start = false
|
|
|
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
|
{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
|
|
|
- [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs]
|
|
|
|
|
|
|
+ [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
|
|
|
|
|
+ {fun stop_the_last_client/1, []}
|
|
|
]),
|
|
]),
|
|
|
|
|
+ Sleep =
|
|
|
|
|
+ case ?config(persistence_enabled, Config) of
|
|
|
|
|
+ true -> 2_000;
|
|
|
|
|
+ false -> ?SLEEP
|
|
|
|
|
+ end,
|
|
|
|
|
|
|
|
FCtx = lists:foldl(
|
|
FCtx = lists:foldl(
|
|
|
fun({Fun, Args}, Ctx) ->
|
|
fun({Fun, Args}, Ctx) ->
|
|
|
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
|
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
|
|
apply(Fun, [Ctx | Args])
|
|
apply(Fun, [Ctx | Args])
|
|
|
end,
|
|
end,
|
|
|
- #{persistence_enabled => ?config(persistence_enabled, Config)},
|
|
|
|
|
|
|
+ #{persistence_enabled => ?config(persistence_enabled, Config), sleep => Sleep},
|
|
|
Commands
|
|
Commands
|
|
|
),
|
|
),
|
|
|
|
|
|
|
|
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
|
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
|
|
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
|
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
|
|
- Sleep =
|
|
|
|
|
- case ?config(persistence_enabled, Config) of
|
|
|
|
|
- true -> 2_000;
|
|
|
|
|
- false -> ?SLEEP
|
|
|
|
|
- end,
|
|
|
|
|
|
|
+ ?assertReceive({'EXIT', CPid2, normal}),
|
|
|
Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
|
|
Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
|
|
|
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
|
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
|
|
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
|
|
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
|
|
@@ -235,9 +229,6 @@ t_takeover_willmsg(Config) ->
|
|
|
%% THEN will message should be received
|
|
%% THEN will message should be received
|
|
|
?assert(IsWill),
|
|
?assert(IsWill),
|
|
|
emqtt:stop(CPidSub),
|
|
emqtt:stop(CPidSub),
|
|
|
- emqtt:stop(CPid2),
|
|
|
|
|
- ?assertReceive({'EXIT', CPid2, normal}),
|
|
|
|
|
- ?assert(not is_process_alive(CPid1)),
|
|
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
t_takeover_willmsg_clean_session(Config) ->
|
|
t_takeover_willmsg_clean_session(Config) ->
|
|
@@ -992,7 +983,7 @@ publish_msg(Ctx, Msg) ->
|
|
|
[_ | _] -> Ctx
|
|
[_ | _] -> Ctx
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-stop_client(Ctx = #{client := [CPid | _], sleep := Sleep}) ->
|
|
|
|
|
|
|
+stop_the_last_client(Ctx = #{client := [CPid | _], sleep := Sleep}) ->
|
|
|
ok = timer:sleep(Sleep),
|
|
ok = timer:sleep(Sleep),
|
|
|
ok = emqtt:stop(CPid),
|
|
ok = emqtt:stop(CPid),
|
|
|
Ctx.
|
|
Ctx.
|
|
@@ -1082,3 +1073,14 @@ assert_client_exit(Pid, v5, kicked) ->
|
|
|
?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}});
|
|
?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}});
|
|
|
assert_client_exit(Pid, _, killed) ->
|
|
assert_client_exit(Pid, _, killed) ->
|
|
|
?assertReceive({'EXIT', Pid, killed}).
|
|
?assertReceive({'EXIT', Pid, killed}).
|
|
|
|
|
+
|
|
|
|
|
+make_client_id(Case, Config) ->
|
|
|
|
|
+ Vsn = atom_to_list(?config(mqtt_vsn, Config)),
|
|
|
|
|
+ Persist =
|
|
|
|
|
+ case ?config(persistence_enabled, Config) of
|
|
|
|
|
+ true ->
|
|
|
|
|
+ "-persistent-";
|
|
|
|
|
+ false ->
|
|
|
|
|
+ "-not-persistent-"
|
|
|
|
|
+ end,
|
|
|
|
|
+ iolist_to_binary([atom_to_binary(Case), Persist ++ Vsn]).
|