Jelajahi Sumber

test: add scenario for node stopping midway during subscribe

Thales Macedo Garitezi 2 tahun lalu
induk
melakukan
e4e88ebf36

+ 15 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -93,13 +93,27 @@ add_subscription(TopicFilterBin, DSSessionID) ->
             {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
                 DSSessionID, TopicFilter
             ),
-            ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID),
+            Ctx = #{
+                iterator_id => IteratorID,
+                start_time => StartMS,
+                is_new => IsNew
+            },
+            ?tp(persistent_session_ds_iterator_added, Ctx),
+            ?tp_span(
+                persistent_session_ds_open_iterators,
+                Ctx,
+                ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID)
+            ),
             {ok, IteratorID, IsNew}
         end
     ).
 
 -spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok.
 open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) ->
+    ?tp(persistent_session_ds_will_open_iterators, #{
+        iterator_id => IteratorID,
+        start_time => StartMS
+    }),
     Nodes = emqx:running_nodes(),
     Results = emqx_persistent_session_ds_proto_v1:open_iterator(
         Nodes, TopicFilter, StartMS, IteratorID

+ 14 - 4
apps/emqx/test/emqx_cth_cluster.erl

@@ -17,7 +17,7 @@
 -module(emqx_cth_cluster).
 
 -export([start/2]).
--export([stop/1]).
+-export([stop/1, stop_node/1]).
 
 -export([share_load_module/2]).
 -export([node_name/1]).
@@ -80,7 +80,12 @@ when
         %% Working directory
         %% Everything a test produces should go here. Each node's stuff should go in its
         %% own directory.
-        work_dir := file:name()
+        work_dir := file:name(),
+        %% Usually, we want to ensure the node / test suite starts from a clean slate.
+        %% However, sometimes, we may want to test restarting a node.  For such
+        %% situations, we need to disable this check to allow resuming from an existing
+        %% state.
+        skip_clean_suite_state_check => boolean()
     }.
 start(Nodes, ClusterOpts) ->
     NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
@@ -124,12 +129,14 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) ->
     Node = node_name(Name),
     BasePort = base_port(N),
     WorkDir = maps:get(work_dir, ClusterOpts),
+    SkipCleanSuiteStateCheck = maps:get(skip_clean_suite_state_check, ClusterOpts, false),
     Defaults = #{
         name => Node,
         role => core,
         apps => [],
         base_port => BasePort,
         work_dir => filename:join([WorkDir, Node]),
+        skip_clean_suite_state_check => SkipCleanSuiteStateCheck,
         driver => ct_slave
     },
     maps:merge(Defaults, NodeOpts).
@@ -288,17 +295,20 @@ load_apps(Node, #{apps := Apps}) ->
     erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
 
 start_apps_clustering(Node, #{apps := Apps} = Spec) ->
-    SuiteOpts = maps:with([work_dir], Spec),
+    SuiteOpts = suite_opts(Spec),
     AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING],
     _Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]),
     ok.
 
 start_apps(Node, #{apps := Apps} = Spec) ->
-    SuiteOpts = maps:with([work_dir], Spec),
+    SuiteOpts = suite_opts(Spec),
     AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
     _Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]),
     ok.
 
+suite_opts(Spec) ->
+    maps:with([work_dir, skip_clean_suite_state_check], Spec).
+
 maybe_join_cluster(_Node, #{role := replicant}) ->
     ok;
 maybe_join_cluster(Node, Spec) ->

+ 2 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -358,6 +358,8 @@ stop_apps(Apps) ->
 
 %%
 
+verify_clean_suite_state(#{skip_clean_suite_state_check := true}) ->
+    ok;
 verify_clean_suite_state(#{work_dir := WorkDir}) ->
     {ok, []} = file:list_dir(WorkDir),
     none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none),

+ 8 - 0
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -183,6 +183,8 @@ t_session_subscription_iterators(Config) ->
                     ok
             end,
             ?assertMatch([_], IteratorIds),
+            ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
+            ?assertMatch({ok, [_]}, get_all_iterator_ids(Node2)),
             [IteratorId] = IteratorIds,
             ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end),
             ExpectedMessages = [Message2, Message3],
@@ -280,3 +282,9 @@ cluster() ->
 get_mqtt_port(Node, Type) ->
     {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
     Port.
+
+get_all_iterator_ids(Node) ->
+    Fn = fun(K, _V, Acc) -> [K | Acc] end,
+    erpc:call(Node, fun() ->
+        emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
+    end).

+ 24 - 8
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -18,7 +18,8 @@
     restore_iterator/2,
     discard_iterator/2,
     is_iterator_present/2,
-    discard_iterator_prefix/2
+    discard_iterator_prefix/2,
+    foldl_iterator_prefix/4
 ]).
 
 %% behaviour callbacks:
@@ -204,6 +205,16 @@ discard_iterator(Shard, ReplayID) ->
 discard_iterator_prefix(Shard, KeyPrefix) ->
     do_discard_iterator_prefix(Shard, KeyPrefix).
 
+-spec foldl_iterator_prefix(
+    emqx_ds:shard(),
+    binary(),
+    fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc),
+    Acc
+) -> {ok, Acc} | {error, _TODO} when
+    Acc :: term().
+foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
+    do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc).
+
 %%================================================================================
 %% behaviour callbacks
 %%================================================================================
@@ -414,26 +425,31 @@ restore_iterator_state(
     open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
 
 do_discard_iterator_prefix(Shard, KeyPrefix) ->
+    #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db),
+    Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end,
+    do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok).
+
+do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
     #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
     case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of
         {ok, It} ->
             NextAction = {seek, KeyPrefix},
-            do_discard_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction);
+            do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc);
         Error ->
             Error
     end.
 
-do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction) ->
+do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) ->
     case rocksdb:iterator_move(It, NextAction) of
-        {ok, K = <<KeyPrefix:(size(KeyPrefix))/binary, _/binary>>, _V} ->
-            ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS),
-            do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, next);
+        {ok, K = <<KeyPrefix:(size(KeyPrefix))/binary, _/binary>>, V} ->
+            NewAcc = Fn(K, V, Acc),
+            do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc);
         {ok, _K, _V} ->
             ok = rocksdb:iterator_close(It),
-            ok;
+            {ok, Acc};
         {error, invalid_iterator} ->
             ok = rocksdb:iterator_close(It),
-            ok;
+            {ok, Acc};
         Error ->
             ok = rocksdb:iterator_close(It),
             Error

+ 178 - 0
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -0,0 +1,178 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ds_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(DS_SHARD, <<"local">>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    %% avoid inter-suite flakiness...
+    application:stop(emqx),
+    application:stop(emqx_durable_storage),
+    TCApps = emqx_cth_suite:start(
+        app_specs(),
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{tc_apps, TCApps} | Config].
+
+end_per_suite(Config) ->
+    TCApps = ?config(tc_apps, Config),
+    emqx_cth_suite:stop(TCApps),
+    ok.
+
+init_per_testcase(t_session_subscription_idempotency, Config) ->
+    Cluster = cluster(#{n => 1}),
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}),
+    [{cluster, Cluster}, {nodes, Nodes} | Config];
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(t_session_subscription_idempotency, Config) ->
+    Nodes = ?config(nodes, Config),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok;
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+cluster(#{n := N}) ->
+    Node1 = ds_SUITE1,
+    Spec = #{
+        role => core,
+        join_to => emqx_cth_cluster:node_name(Node1),
+        listeners => true,
+        apps => app_specs()
+    },
+    [
+        {Node1, Spec}
+        | lists:map(
+            fun(M) ->
+                Name = binary_to_atom(<<"ds_SUITE", (integer_to_binary(M))/binary>>),
+                {Name, Spec}
+            end,
+            lists:seq(2, N)
+        )
+    ].
+
+app_specs() ->
+    [
+        emqx_durable_storage,
+        {emqx, #{
+            before_start => fun() ->
+                emqx_app:set_config_loader(?MODULE)
+            end,
+            config => #{persistent_session_store => #{ds => true}},
+            override_env => [{boot_modules, [broker, listeners]}]
+        }}
+    ].
+
+get_mqtt_port(Node, Type) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
+    Port.
+
+get_all_iterator_ids(Node) ->
+    Fn = fun(K, _V, Acc) -> [K | Acc] end,
+    erpc:call(Node, fun() ->
+        emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
+    end).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_session_subscription_idempotency(Config) ->
+    Cluster = ?config(cluster, Config),
+    [Node1] = ?config(nodes, Config),
+    Port = get_mqtt_port(Node1, tcp),
+    SubTopicFilter = <<"t/+">>,
+    ClientId = <<"myclientid">>,
+    ?check_trace(
+        begin
+            ?force_ordering(
+                #{?snk_kind := persistent_session_ds_iterator_added},
+                _NEvents0 = 1,
+                #{?snk_kind := will_restart_node},
+                _Guard0 = true
+            ),
+            ?force_ordering(
+                #{?snk_kind := restarted_node},
+                _NEvents1 = 1,
+                #{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start},
+                _Guard1 = true
+            ),
+
+            spawn_link(fun() ->
+                ?tp(will_restart_node, #{}),
+                ct:pal("stopping node ~p", [Node1]),
+                ok = emqx_cth_cluster:stop_node(Node1),
+                ct:pal("stopped node ~p; restarting...", [Node1]),
+                [Node1] = emqx_cth_cluster:start(Cluster, #{
+                    work_dir => ?config(priv_dir, Config),
+                    skip_clean_suite_state_check => true
+                }),
+                ct:pal("node ~p restarted", [Node1]),
+                ?tp(restarted_node, #{}),
+                ok
+            end),
+
+            ct:pal("starting 1"),
+            {ok, Client0} = emqtt:start_link([
+                {port, Port},
+                {clientid, ClientId},
+                {proto_ver, v5}
+            ]),
+            {ok, _} = emqtt:connect(Client0),
+            ct:pal("subscribing 1"),
+            process_flag(trap_exit, true),
+            catch emqtt:subscribe(Client0, SubTopicFilter, qos2),
+            receive
+                {'EXIT', {shutdown, _}} ->
+                    ok
+            after 0 -> ok
+            end,
+            process_flag(trap_exit, false),
+
+            {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
+            ct:pal("starting 2"),
+            {ok, Client1} = emqtt:start_link([
+                {port, Port},
+                {clientid, ClientId},
+                {proto_ver, v5}
+            ]),
+            {ok, _} = emqtt:connect(Client1),
+            ct:pal("subscribing 2"),
+            {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
+
+            ok = emqtt:stop(Client1),
+
+            ok
+        end,
+        fun(Trace) ->
+            ct:pal("trace:\n  ~p", [Trace]),
+            %% Exactly one iterator should have been opened.
+            ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
+            ?assertMatch(
+                {_IsNew = false, ClientId, _},
+                erpc:call(Node1, emqx_ds, session_open, [ClientId])
+            ),
+            ok
+        end
+    ),
+    ok.