|
|
@@ -245,7 +245,7 @@ receive_messages(Count, Msgs) ->
|
|
|
receive_messages(Count-1, [Msg|Msgs]);
|
|
|
_Other ->
|
|
|
receive_messages(Count, Msgs)
|
|
|
- after 1000 ->
|
|
|
+ after 5000 ->
|
|
|
Msgs
|
|
|
end.
|
|
|
|
|
|
@@ -576,7 +576,7 @@ t_publish_while_client_is_gone(Config) ->
|
|
|
| Config]),
|
|
|
{ok, _} = emqtt:ConnFun(Client2),
|
|
|
Msgs = receive_messages(2),
|
|
|
- ?assertEqual(length(Msgs), 2),
|
|
|
+ ?assertMatch([_, _], Msgs),
|
|
|
[Msg2, Msg1] = Msgs,
|
|
|
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
|
|
|
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
|
|
@@ -768,7 +768,7 @@ t_lost_messages_because_of_gc(Config) ->
|
|
|
|
|
|
check_snabbkaffe_vanilla(Trace) ->
|
|
|
ResumeTrace = [T || #{?snk_kind := K} = T <- Trace,
|
|
|
- re:run(atom_to_list(K), "^ps_") /= nomatch],
|
|
|
+ re:run(to_list(K), "^ps_") /= nomatch],
|
|
|
?assertMatch([_|_], ResumeTrace),
|
|
|
[_Sid] = lists:usort(?projection(sid, ResumeTrace)),
|
|
|
%% Check internal flow of the emqx_cm resuming
|
|
|
@@ -811,6 +811,10 @@ check_snabbkaffe_vanilla(Trace) ->
|
|
|
[Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)),
|
|
|
?assertMatch([_], Markers).
|
|
|
|
|
|
+to_list(L) when is_list(L) -> L;
|
|
|
+to_list(A) when is_atom(A) -> atom_to_list(A);
|
|
|
+to_list(B) when is_binary(B) -> binary_to_list(B).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Snabbkaffe tests
|
|
|
%%--------------------------------------------------------------------
|