|
|
@@ -74,13 +74,14 @@ end_per_testcase(_Config) ->
|
|
|
t_base_test(_Config) ->
|
|
|
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
|
|
|
Pid = self(),
|
|
|
- MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
|
|
|
+ Msg = ?FUNCTION_NAME,
|
|
|
+ MFA = {M, F, A} = {?MODULE, echo, [Pid, Msg]},
|
|
|
{ok, TnxId, ok} = multicall(M, F, A),
|
|
|
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
|
|
|
?assertEqual(MFA, maps:get(mfa, Query)),
|
|
|
?assertEqual(node(), maps:get(initiator, Query)),
|
|
|
?assert(maps:is_key(created_at, Query)),
|
|
|
- ?assertEqual(ok, receive_msg(3, test)),
|
|
|
+ ?assertEqual(ok, receive_msg(3, Msg)),
|
|
|
?assertEqual({ok, 2, ok}, multicall(M, F, A)),
|
|
|
{atomic, Status} = emqx_cluster_rpc:status(),
|
|
|
case length(Status) =:= 3 of
|
|
|
@@ -118,9 +119,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
|
|
|
emqx_cluster_rpc:reset(),
|
|
|
{atomic, []} = emqx_cluster_rpc:status(),
|
|
|
Pid = self(),
|
|
|
- {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
|
|
|
+ Msg = ?FUNCTION_NAME,
|
|
|
+ {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
|
|
|
{ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
|
|
|
- ?assertEqual(ok, receive_msg(3, test)),
|
|
|
+ ?assertEqual(ok, receive_msg(3, Msg)),
|
|
|
|
|
|
{M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
|
|
|
{ok, _, ok} = multicall(M, F, A, 1, 1000),
|
|
|
@@ -154,9 +156,10 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
|
|
|
t_commit_concurrency(_Config) ->
|
|
|
{atomic, []} = emqx_cluster_rpc:status(),
|
|
|
Pid = self(),
|
|
|
- {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, test]},
|
|
|
- {ok, _TnxId, ok} = multicall(BaseM, BaseF, BaseA),
|
|
|
- ?assertEqual(ok, receive_msg(3, test)),
|
|
|
+ Msg = ?FUNCTION_NAME,
|
|
|
+ {BaseM, BaseF, BaseA} = {?MODULE, echo, [Pid, Msg]},
|
|
|
+ ?assertEqual({ok, 1, ok}, multicall(BaseM, BaseF, BaseA)),
|
|
|
+ ?assertEqual(ok, receive_msg(3, Msg)),
|
|
|
|
|
|
%% call concurrently without stale tnx_id error
|
|
|
Workers = lists:seq(1, 256),
|
|
|
@@ -231,23 +234,24 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
|
|
|
{atomic, [_Status | L]} = emqx_cluster_rpc:status(),
|
|
|
?assertEqual([], L),
|
|
|
ets:insert(test, {other_mfa_result, ok}),
|
|
|
- {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000),
|
|
|
+ {ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
|
|
|
ct:sleep(1000),
|
|
|
{atomic, NewStatus} = emqx_cluster_rpc:status(),
|
|
|
?assertEqual(3, length(NewStatus)),
|
|
|
Pid = self(),
|
|
|
- MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]},
|
|
|
+ Msg = ?FUNCTION_NAME,
|
|
|
+ MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, Msg]},
|
|
|
{ok, TnxId, ok} = multicall(M1, F1, A1),
|
|
|
{atomic, Query} = emqx_cluster_rpc:query(TnxId),
|
|
|
?assertEqual(MFAEcho, maps:get(mfa, Query)),
|
|
|
?assertEqual(node(), maps:get(initiator, Query)),
|
|
|
?assert(maps:is_key(created_at, Query)),
|
|
|
- ?assertEqual(ok, receive_msg(3, test)),
|
|
|
+ ?assertEqual(ok, receive_msg(3, Msg)),
|
|
|
ok.
|
|
|
|
|
|
t_del_stale_mfa(_Config) ->
|
|
|
{atomic, []} = emqx_cluster_rpc:status(),
|
|
|
- MFA = {M, F, A} = {io, format, ["test"]},
|
|
|
+ MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]},
|
|
|
Keys = lists:seq(1, 50),
|
|
|
Keys2 = lists:seq(51, 150),
|
|
|
Ids =
|
|
|
@@ -288,7 +292,7 @@ t_del_stale_mfa(_Config) ->
|
|
|
|
|
|
t_skip_failed_commit(_Config) ->
|
|
|
{atomic, []} = emqx_cluster_rpc:status(),
|
|
|
- {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
|
|
|
+ {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
|
|
|
ct:sleep(180),
|
|
|
{atomic, List1} = emqx_cluster_rpc:status(),
|
|
|
Node = node(),
|
|
|
@@ -308,7 +312,7 @@ t_skip_failed_commit(_Config) ->
|
|
|
|
|
|
t_fast_forward_commit(_Config) ->
|
|
|
{atomic, []} = emqx_cluster_rpc:status(),
|
|
|
- {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000),
|
|
|
+ {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
|
|
|
ct:sleep(180),
|
|
|
{atomic, List1} = emqx_cluster_rpc:status(),
|
|
|
Node = node(),
|
|
|
@@ -356,7 +360,11 @@ tnx_ids(Status) ->
|
|
|
start() ->
|
|
|
{ok, _Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
|
|
|
{ok, _Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
|
|
|
+ ok = emqx_cluster_rpc:wait_for_cluster_rpc(),
|
|
|
ok = emqx_cluster_rpc:reset(),
|
|
|
+ %% Ensure all processes are idle status.
|
|
|
+ ok = gen_server:call(?NODE2, test),
|
|
|
+ ok = gen_server:call(?NODE3, test),
|
|
|
ok.
|
|
|
|
|
|
stop() ->
|
|
|
@@ -366,6 +374,7 @@ stop() ->
|
|
|
undefined ->
|
|
|
ok;
|
|
|
P ->
|
|
|
+ erlang:unregister(N),
|
|
|
erlang:unlink(P),
|
|
|
erlang:exit(P, kill)
|
|
|
end
|
|
|
@@ -379,8 +388,9 @@ receive_msg(Count, Msg) when Count > 0 ->
|
|
|
receive
|
|
|
Msg ->
|
|
|
receive_msg(Count - 1, Msg)
|
|
|
- after 1000 ->
|
|
|
- timeout
|
|
|
+ after 1300 ->
|
|
|
+ Msg = iolist_to_binary(io_lib:format("There's still ~w messages to be received", [Count])),
|
|
|
+ {Msg, flush_msg([])}
|
|
|
end.
|
|
|
|
|
|
echo(Pid, Msg) ->
|
|
|
@@ -425,3 +435,11 @@ multicall(M, F, A, N, T) ->
|
|
|
|
|
|
multicall(M, F, A) ->
|
|
|
multicall(M, F, A, all, timer:minutes(2)).
|
|
|
+
|
|
|
+flush_msg(Acc) ->
|
|
|
+ receive
|
|
|
+ Msg ->
|
|
|
+ flush_msg([Msg | Acc])
|
|
|
+ after 10 ->
|
|
|
+ Acc
|
|
|
+ end.
|