|
|
@@ -217,9 +217,13 @@ transitions(Node, DB) ->
|
|
|
|
|
|
%% Stream comparison
|
|
|
|
|
|
-message_eq(Msg1, {_Key, Msg2}) ->
|
|
|
- %% Timestamps can be modified by the replication layer, ignore them:
|
|
|
- Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
|
|
|
+message_eq(Fields, {_Key, Msg1 = #message{}}, Msg2) ->
|
|
|
+ message_eq(Fields, Msg1, Msg2);
|
|
|
+message_eq(Fields, Msg1, {_Key, Msg2 = #message{}}) ->
|
|
|
+ message_eq(Fields, Msg1, Msg2);
|
|
|
+message_eq(Fields, Msg1 = #message{}, Msg2 = #message{}) ->
|
|
|
+ maps:with(Fields, emqx_message:to_map(Msg1)) =:=
|
|
|
+ maps:with(Fields, emqx_message:to_map(Msg2)).
|
|
|
|
|
|
%% Consuming streams and iterators
|
|
|
|
|
|
@@ -242,18 +246,27 @@ verify_stream_effects(DB, TestCase, Nodes0, L) ->
|
|
|
-spec verify_stream_effects(atom(), binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
|
|
|
verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) ->
|
|
|
ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
|
|
|
- DiffOpts = #{context => 20, window => 1000, compare_fun => fun message_eq/2},
|
|
|
?defer_assert(
|
|
|
begin
|
|
|
snabbkaffe_diff:assert_lists_eq(
|
|
|
ExpectedStream,
|
|
|
ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node),
|
|
|
- DiffOpts
|
|
|
+ message_diff_options([id, qos, from, flags, headers, topic, payload, extra])
|
|
|
),
|
|
|
ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
|
|
|
end
|
|
|
).
|
|
|
|
|
|
+diff_messages(Fields, Expected, Got) ->
|
|
|
+ snabbkaffe_diff:assert_lists_eq(Expected, Got, message_diff_options(Fields)).
|
|
|
+
|
|
|
+message_diff_options(Fields) ->
|
|
|
+ #{
|
|
|
+ context => 20,
|
|
|
+ window => 1000,
|
|
|
+ compare_fun => fun(M1, M2) -> message_eq(Fields, M1, M2) end
|
|
|
+ }.
|
|
|
+
|
|
|
%% Create a stream from the topic (wildcards are NOT supported for a
|
|
|
%% good reason: order of messages is implementation-dependent!).
|
|
|
%%
|