Browse Source

test(dsrepl): relax crash-recover testcase to tolerate message loss

Which is quite an expected occasion for this kind of stress test.
Andrew Mayorov 1 year ago
parent
commit
30efa1f57e

+ 47 - 6
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -817,9 +817,10 @@ t_crash_restart_recover(Config) ->
     DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}),
 
     %% Prepare test event stream.
-    {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
-        ?FUNCTION_NAME, _NClients = 8, _NMsgs = 400
-    ),
+    NMsgs = 400,
+    NClients = 8,
+    {Stream0, TopicStreams} =
+        emqx_ds_test_helpers:interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
     Stream1 = emqx_utils_stream:interleave(
         [
             {300, Stream0},
@@ -849,19 +850,59 @@ t_crash_restart_recover(Config) ->
             %% Apply the test events, including simulated node crashes.
             NodeStream = emqx_utils_stream:const(N1),
             emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
-            timer:sleep(5000),
 
-            %% Verify that all the data is there.
-            emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
+            %% It's expected to lose few messages when leaders are abruptly killed.
+            MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}),
+            {ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity),
+            {timeout, Events} = snabbkaffe:receive_events(SubRef),
+            LostMessages = [M || #{batch := Messages} <- Events, M <- Messages],
+            ct:pal("Some messages were lost: ~p", [LostMessages]),
+            ?assert(length(LostMessages) < NMsgs div 20),
+
+            %% Verify that all the successfully persisted messages are there.
+            VerifyClient = fun({ClientId, ExpectedStream}) ->
+                Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
+                ClientNodes = nodes_of_clientid(ClientId, Nodes),
+                DSStream1 = ds_topic_stream(ClientId, Topic, hd(ClientNodes)),
+                %% Do nodes contain same messages for a client?
+                lists:foreach(
+                    fun(ClientNode) ->
+                        DSStream = ds_topic_stream(ClientId, Topic, ClientNode),
+                        ?defer_assert(emqx_ds_test_helpers:diff_messages(DSStream1, DSStream))
+                    end,
+                    tl(ClientNodes)
+                ),
+                %% Does any messages were lost unexpectedly?
+                {_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)),
+                ExpectedMessages = emqx_utils_stream:consume(ExpectedStream),
+                MissingMessages = ExpectedMessages -- DSMessages,
+                ?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages))
+            end,
+            lists:foreach(VerifyClient, TopicStreams)
         end,
         []
     ).
 
+nodes_of_clientid(ClientId, Nodes) ->
+    emqx_ds_test_helpers:nodes_of_clientid(?DB, ClientId, Nodes).
+
+ds_topic_stream(ClientId, ClientTopic, Node) ->
+    emqx_ds_test_helpers:ds_topic_stream(?DB, ClientId, ClientTopic, Node).
+
+is_message_lost(Message, MessagesLost) ->
+    lists:any(
+        fun(ML) ->
+            emqx_ds_test_helpers:message_eq([clientid, topic, payload], Message, ML)
+        end,
+        MessagesLost
+    ).
+
 kill_restart_node_async(Node, Spec, DBOpts) ->
     erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]).
 
 kill_restart_node(Node, Spec, DBOpts) ->
     ok = emqx_cth_peer:kill(Node),
+    ?tp(test_cluster_node_killed, #{node => Node}),
     _ = emqx_cth_cluster:restart(Spec),
     ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]).
 

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -314,7 +314,7 @@ do_flush(
             ?tp(
                 debug,
                 emqx_ds_buffer_flush_failed,
-                #{db => DB, shard => Shard, error => Err}
+                #{db => DB, shard => Shard, batch => Messages, error => Err}
             ),
             emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics),
             Reply =

+ 6 - 3
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -266,15 +266,18 @@ verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) ->
     ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
     ?defer_assert(
         begin
-            snabbkaffe_diff:assert_lists_eq(
+            diff_messages(
                 ExpectedStream,
-                ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node),
-                message_diff_options([id, qos, from, flags, headers, topic, payload, extra])
+                ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node)
             ),
             ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
         end
     ).
 
+diff_messages(Expected, Got) ->
+    Fields = [id, qos, from, flags, headers, topic, payload, extra],
+    diff_messages(Fields, Expected, Got).
+
 diff_messages(Fields, Expected, Got) ->
     snabbkaffe_diff:assert_lists_eq(Expected, Got, message_diff_options(Fields)).