| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_persistent_session_ds_SUITE).
- -compile(export_all).
- -compile(nowarn_export_all).
- -include_lib("stdlib/include/assert.hrl").
- -include_lib("common_test/include/ct.hrl").
- -include_lib("snabbkaffe/include/snabbkaffe.hrl").
- -include_lib("emqx/include/asserts.hrl").
- -include_lib("emqx/include/emqx_mqtt.hrl").
- -import(emqx_common_test_helpers, [on_exit/1]).
- %%------------------------------------------------------------------------------
- %% CT boilerplate
- %%------------------------------------------------------------------------------
- suite() ->
- [{timetrap, {seconds, 60}}].
- all() ->
- emqx_common_test_helpers:all(?MODULE).
- init_per_suite(Config) ->
- TCApps = emqx_cth_suite:start(
- app_specs(),
- #{work_dir => emqx_cth_suite:work_dir(Config)}
- ),
- [{tc_apps, TCApps} | Config].
- end_per_suite(Config) ->
- TCApps = ?config(tc_apps, Config),
- emqx_cth_suite:stop(TCApps),
- ok.
- init_per_testcase(TestCase, Config) when
- TestCase =:= t_session_subscription_idempotency;
- TestCase =:= t_session_unsubscription_idempotency
- ->
- Cluster = cluster(#{n => 1}),
- ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
- NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
- Nodes = emqx_cth_cluster:start(NodeSpecs),
- [
- {cluster, Cluster},
- {node_specs, NodeSpecs},
- {cluster_opts, ClusterOpts},
- {nodes, Nodes}
- | Config
- ];
- init_per_testcase(t_session_gc = TestCase, Config) ->
- Opts = #{
- n => 3,
- roles => [core, core, core],
- extra_emqx_conf =>
- "\n session_persistence {"
- "\n last_alive_update_interval = 500ms "
- "\n session_gc_interval = 1s "
- "\n session_gc_batch_size = 2 "
- "\n }"
- },
- Cluster = cluster(Opts),
- ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
- NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
- Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
- [
- {cluster, Cluster},
- {node_specs, NodeSpecs},
- {cluster_opts, ClusterOpts},
- {nodes, Nodes},
- {gc_interval, timer:seconds(2)}
- | Config
- ];
- init_per_testcase(_TestCase, Config) ->
- Config.
- end_per_testcase(TestCase, Config) when
- TestCase =:= t_session_subscription_idempotency;
- TestCase =:= t_session_unsubscription_idempotency;
- TestCase =:= t_session_gc
- ->
- Nodes = ?config(nodes, Config),
- emqx_common_test_helpers:call_janitor(60_000),
- ok = emqx_cth_cluster:stop(Nodes),
- ok;
- end_per_testcase(_TestCase, _Config) ->
- emqx_common_test_helpers:call_janitor(60_000),
- ok.
- %%------------------------------------------------------------------------------
- %% Helper functions
- %%------------------------------------------------------------------------------
- cluster(#{n := N} = Opts) ->
- MkRole = fun(M) ->
- case maps:get(roles, Opts, undefined) of
- undefined ->
- core;
- Roles ->
- lists:nth(M, Roles)
- end
- end,
- MkSpec = fun(M) -> #{role => MkRole(M), apps => app_specs(Opts)} end,
- lists:map(
- fun(M) ->
- Name = list_to_atom("ds_SUITE" ++ integer_to_list(M)),
- {Name, MkSpec(M)}
- end,
- lists:seq(1, N)
- ).
- app_specs() ->
- app_specs(_Opts = #{}).
- app_specs(Opts) ->
- DefaultEMQXConf = "session_persistence {enable = true, renew_streams_interval = 1s}",
- ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
- [
- {emqx, DefaultEMQXConf ++ ExtraEMQXConf}
- ].
- get_mqtt_port(Node, Type) ->
- {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
- Port.
- wait_nodeup(Node) ->
- ?retry(
- _Sleep0 = 500,
- _Attempts0 = 50,
- pong = net_adm:ping(Node)
- ).
- start_client(Opts0 = #{}) ->
- Defaults = #{
- port => 1883,
- proto_ver => v5,
- properties => #{'Session-Expiry-Interval' => 300}
- },
- Opts = emqx_utils_maps:deep_merge(Defaults, Opts0),
- ?tp(notice, "starting client", Opts),
- {ok, Client} = emqtt:start_link(maps:to_list(Opts)),
- unlink(Client),
- on_exit(fun() -> catch emqtt:stop(Client) end),
- Client.
- start_connect_client(Opts = #{}) ->
- Client = start_client(Opts),
- ?assertMatch({ok, _}, emqtt:connect(Client)),
- Client.
- mk_clientid(Prefix, ID) ->
- iolist_to_binary(io_lib:format("~p/~p", [Prefix, ID])).
- restart_node(Node, NodeSpec) ->
- ?tp(will_restart_node, #{}),
- emqx_cth_cluster:restart(Node, NodeSpec),
- wait_nodeup(Node),
- ?tp(restarted_node, #{}),
- ok.
- is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) ->
- EI > 0.
- list_all_sessions(Node) ->
- erpc:call(Node, emqx_persistent_session_ds_state, list_sessions, []).
- list_all_subscriptions(Node) ->
- Sessions = list_all_sessions(Node),
- lists:flatmap(
- fun(ClientId) ->
- #{s := #{subscriptions := Subs}} = erpc:call(
- Node, emqx_persistent_session_ds, print_session, [ClientId]
- ),
- maps:to_list(Subs)
- end,
- Sessions
- ).
- list_all_pubranges(Node) ->
- erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
- session_open(Node, ClientId) ->
- ClientInfo = #{},
- ConnInfo = #{peername => {undefined, undefined}, proto_name => <<"MQTT">>, proto_ver => 5},
- WillMsg = undefined,
- erpc:call(
- Node,
- emqx_persistent_session_ds,
- session_open,
- [ClientId, ClientInfo, ConnInfo, WillMsg]
- ).
- force_last_alive_at(ClientId, Time) ->
- {ok, S0} = emqx_persistent_session_ds_state:open(ClientId),
- S = emqx_persistent_session_ds_state:set_last_alive_at(Time, S0),
- _ = emqx_persistent_session_ds_state:commit(S),
- ok.
- %%------------------------------------------------------------------------------
- %% Testcases
- %%------------------------------------------------------------------------------
- t_session_subscription_idempotency(Config) ->
- [Node1Spec | _] = ?config(node_specs, Config),
- [Node1] = ?config(nodes, Config),
- Port = get_mqtt_port(Node1, tcp),
- SubTopicFilter = <<"t/+">>,
- ClientId = <<"myclientid">>,
- ?check_trace(
- #{timetrap => 30_000},
- begin
- ?force_ordering(
- #{?snk_kind := persistent_session_ds_subscription_added},
- _NEvents0 = 1,
- #{?snk_kind := will_restart_node},
- _Guard0 = true
- ),
- ?force_ordering(
- #{?snk_kind := restarted_node},
- _NEvents1 = 1,
- #{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start},
- _Guard1 = true
- ),
- spawn_link(fun() -> restart_node(Node1, Node1Spec) end),
- ?tp(notice, "starting 1", #{}),
- Client0 = start_client(#{port => Port, clientid => ClientId}),
- {ok, _} = emqtt:connect(Client0),
- ?tp(notice, "subscribing 1", #{}),
- process_flag(trap_exit, true),
- catch emqtt:subscribe(Client0, SubTopicFilter, qos2),
- receive
- {'EXIT', {shutdown, _}} ->
- ok
- after 100 -> ok
- end,
- process_flag(trap_exit, false),
- {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
- ?tp(notice, "starting 2", #{}),
- Client1 = start_client(#{port => Port, clientid => ClientId}),
- {ok, _} = emqtt:connect(Client1),
- ?tp(notice, "subscribing 2", #{}),
- {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
- ok = emqtt:stop(Client1),
- ok
- end,
- fun(Trace) ->
- Session = session_open(Node1, ClientId),
- ?assertMatch(
- #{SubTopicFilter := #{}},
- emqx_session:info(subscriptions, Session)
- )
- end
- ),
- ok.
- %% Check that we close the iterators before deleting the iterator id entry.
- t_session_unsubscription_idempotency(Config) ->
- [Node1Spec | _] = ?config(node_specs, Config),
- [Node1] = ?config(nodes, Config),
- Port = get_mqtt_port(Node1, tcp),
- SubTopicFilter = <<"t/+">>,
- ClientId = <<"myclientid">>,
- ?check_trace(
- #{timetrap => 30_000},
- begin
- ?force_ordering(
- #{
- ?snk_kind := persistent_session_ds_subscription_delete
- },
- _NEvents0 = 1,
- #{?snk_kind := will_restart_node},
- _Guard0 = true
- ),
- ?force_ordering(
- #{?snk_kind := restarted_node},
- _NEvents1 = 1,
- #{?snk_kind := persistent_session_ds_subscription_route_delete, ?snk_span := start},
- _Guard1 = true
- ),
- spawn_link(fun() -> restart_node(Node1, Node1Spec) end),
- ?tp(notice, "starting 1", #{}),
- Client0 = start_client(#{port => Port, clientid => ClientId}),
- {ok, _} = emqtt:connect(Client0),
- ?tp(notice, "subscribing 1", #{}),
- {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
- ?tp(notice, "unsubscribing 1", #{}),
- process_flag(trap_exit, true),
- catch emqtt:unsubscribe(Client0, SubTopicFilter),
- receive
- {'EXIT', {shutdown, _}} ->
- ok
- after 100 -> ok
- end,
- process_flag(trap_exit, false),
- {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
- ?tp(notice, "starting 2", #{}),
- Client1 = start_client(#{port => Port, clientid => ClientId}),
- {ok, _} = emqtt:connect(Client1),
- ?tp(notice, "subscribing 2", #{}),
- {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
- ?tp(notice, "unsubscribing 2", #{}),
- {{ok, _, [?RC_SUCCESS]}, {ok, _}} =
- ?wait_async_action(
- emqtt:unsubscribe(Client1, SubTopicFilter),
- #{
- ?snk_kind := persistent_session_ds_subscription_route_delete,
- ?snk_span := {complete, _}
- },
- 15_000
- ),
- ok = emqtt:stop(Client1),
- ok
- end,
- fun(Trace) ->
- Session = session_open(Node1, ClientId),
- ?assertEqual(
- #{},
- emqx_session:info(subscriptions, Session)
- ),
- ok
- end
- ),
- ok.
- t_session_discard_persistent_to_non_persistent(_Config) ->
- ClientId = atom_to_binary(?FUNCTION_NAME),
- Params = #{
- client_id => ClientId,
- reconnect_opts =>
- #{
- clean_start => true,
- %% we set it to zero so that a new session is not created.
- properties => #{'Session-Expiry-Interval' => 0},
- proto_ver => v5
- }
- },
- do_t_session_discard(Params).
- t_session_discard_persistent_to_persistent(_Config) ->
- ClientId = atom_to_binary(?FUNCTION_NAME),
- Params = #{
- client_id => ClientId,
- reconnect_opts =>
- #{
- clean_start => true,
- properties => #{'Session-Expiry-Interval' => 30},
- proto_ver => v5
- }
- },
- do_t_session_discard(Params).
- do_t_session_discard(Params) ->
- #{
- client_id := ClientId,
- reconnect_opts := ReconnectOpts0
- } = Params,
- ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
- SubTopicFilter = <<"t/+">>,
- ?check_trace(
- #{timetrap => 30_000},
- begin
- ?tp(notice, "starting", #{}),
- Client0 = start_client(#{
- clientid => ClientId,
- clean_start => false,
- properties => #{'Session-Expiry-Interval' => 30},
- proto_ver => v5
- }),
- {ok, _} = emqtt:connect(Client0),
- ?tp(notice, "subscribing", #{}),
- {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
- %% Store some matching messages so that streams and iterators are created.
- ok = emqtt:publish(Client0, <<"t/1">>, <<"1">>),
- ok = emqtt:publish(Client0, <<"t/2">>, <<"2">>),
- ?retry(
- _Sleep0 = 100,
- _Attempts0 = 50,
- #{} = emqx_persistent_session_ds_state:print_session(ClientId)
- ),
- ok = emqtt:stop(Client0),
- ?tp(notice, "disconnected", #{}),
- ?tp(notice, "reconnecting", #{}),
- %% we still have the session:
- ?assertMatch(#{}, emqx_persistent_session_ds_state:print_session(ClientId)),
- Client1 = start_client(ReconnectOpts),
- {ok, _} = emqtt:connect(Client1),
- ?assertEqual([], emqtt:subscriptions(Client1)),
- case is_persistent_connect_opts(ReconnectOpts) of
- true ->
- ?assertMatch(#{}, emqx_persistent_session_ds_state:print_session(ClientId));
- false ->
- ?assertEqual(
- undefined, emqx_persistent_session_ds_state:print_session(ClientId)
- )
- end,
- ?assertEqual([], emqx_persistent_session_ds_router:topics()),
- ok = emqtt:stop(Client1),
- ?tp(notice, "disconnected", #{}),
- ok
- end,
- []
- ),
- ok.
- t_session_expiration1(Config) ->
- %% This testcase verifies that the properties passed in the
- %% CONNECT packet are respected by the GC process:
- ClientId = atom_to_binary(?FUNCTION_NAME),
- Opts = #{
- clientid => ClientId,
- sequence => [
- {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}},
- {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 1}}, #{}},
- {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}}
- ]
- },
- do_t_session_expiration(Config, Opts).
- t_session_expiration2(Config) ->
- %% This testcase updates the expiry interval for the session in
- %% the _DISCONNECT_ packet. This setting should be respected by GC
- %% process:
- ClientId = atom_to_binary(?FUNCTION_NAME),
- Opts = #{
- clientid => ClientId,
- sequence => [
- {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}},
- {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{
- 'Session-Expiry-Interval' => 1
- }},
- {#{clean_start => false, properties => #{'Session-Expiry-Interval' => 30}}, #{}}
- ]
- },
- do_t_session_expiration(Config, Opts).
- do_t_session_expiration(_Config, Opts) ->
- %% Sequence is a list of pairs of properties passed through the
- %% CONNECT and for the DISCONNECT for each session:
- #{
- clientid := ClientId,
- sequence := [
- {FirstConn, FirstDisconn},
- {SecondConn, SecondDisconn},
- {ThirdConn, ThirdDisconn}
- ]
- } = Opts,
- CommonParams = #{proto_ver => v5, clientid => ClientId},
- ?check_trace(
- #{timetrap => 30_000},
- begin
- Topic = <<"some/topic">>,
- Params0 = maps:merge(CommonParams, FirstConn),
- Client0 = start_client(Params0),
- {ok, _} = emqtt:connect(Client0),
- {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2),
- #{s := #{subscriptions := Subs0}} = emqx_persistent_session_ds:print_session(ClientId),
- ?assertEqual(1, map_size(Subs0), #{subs => Subs0}),
- Info0 = maps:from_list(emqtt:info(Client0)),
- ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}),
- emqtt:disconnect(Client0, ?RC_NORMAL_DISCONNECTION, FirstDisconn),
- Params1 = maps:merge(CommonParams, SecondConn),
- Client1 = start_client(Params1),
- {ok, _} = emqtt:connect(Client1),
- Info1 = maps:from_list(emqtt:info(Client1)),
- ?assertEqual(1, maps:get(session_present, Info1), #{info => Info1}),
- Subs1 = emqtt:subscriptions(Client1),
- ?assertEqual([], Subs1),
- emqtt:disconnect(Client1, ?RC_NORMAL_DISCONNECTION, SecondDisconn),
- ct:sleep(2_500),
- Params2 = maps:merge(CommonParams, ThirdConn),
- Client2 = start_client(Params2),
- {ok, _} = emqtt:connect(Client2),
- Info2 = maps:from_list(emqtt:info(Client2)),
- ?assertEqual(0, maps:get(session_present, Info2), #{info => Info2}),
- Subs2 = emqtt:subscriptions(Client2),
- ?assertEqual([], Subs2),
- emqtt:publish(Client2, Topic, <<"payload">>),
- ?assertNotReceive({publish, #{topic := Topic}}),
- %% ensure subscriptions are absent from table.
- #{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId),
- ?assertEqual([], maps:to_list(Subs3)),
- emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn),
- ok
- end,
- []
- ),
- ok.
- t_session_gc(Config) ->
- [Node1, _Node2, _Node3] = Nodes = ?config(nodes, Config),
- [
- Port1,
- Port2,
- Port3
- ] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes),
- ct:pal("Ports: ~p", [[Port1, Port2, Port3]]),
- CommonParams = #{
- clean_start => false,
- proto_ver => v5
- },
- StartClient = fun(ClientId, Port, ExpiryInterval) ->
- Params = maps:merge(CommonParams, #{
- clientid => ClientId,
- port => Port,
- properties => #{'Session-Expiry-Interval' => ExpiryInterval}
- }),
- Client = start_client(Params),
- {ok, _} = emqtt:connect(Client),
- Client
- end,
- ?check_trace(
- #{timetrap => 30_000},
- begin
- ClientId1 = <<"session_gc1">>,
- Client1 = StartClient(ClientId1, Port1, 30),
- ClientId2 = <<"session_gc2">>,
- Client2 = StartClient(ClientId2, Port2, 1),
- ClientId3 = <<"session_gc3">>,
- Client3 = StartClient(ClientId3, Port3, 1),
- lists:foreach(
- fun(Client) ->
- Topic = <<"some/topic">>,
- Payload = <<"hi">>,
- {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client, Topic, ?QOS_1),
- {ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1),
- ok
- end,
- [Client1, Client2, Client3]
- ),
- %% Clients are still alive; no session is garbage collected.
- ?tp(notice, "waiting for gc", #{}),
- ?assertMatch(
- {ok, _},
- ?block_until(
- #{
- ?snk_kind := ds_session_gc,
- ?snk_span := {complete, _},
- ?snk_meta := #{node := N}
- } when N =/= node()
- )
- ),
- ?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
- ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
- ?tp(notice, "gc ran", #{}),
- %% Now we disconnect 2 of them; only those should be GC'ed.
- ?tp(notice, "disconnecting client1", #{}),
- ?assertMatch(
- {ok, {ok, _}},
- ?wait_async_action(
- emqtt:stop(Client2),
- #{?snk_kind := terminate}
- )
- ),
- ?tp(notice, "disconnected client1", #{}),
- ?assertMatch(
- {ok, {ok, _}},
- ?wait_async_action(
- emqtt:stop(Client3),
- #{?snk_kind := terminate}
- )
- ),
- ?tp(notice, "disconnected client2", #{}),
- ?assertMatch(
- {ok, _},
- ?block_until(
- #{
- ?snk_kind := ds_session_gc_cleaned,
- session_id := ClientId2
- }
- )
- ),
- ?assertMatch(
- {ok, _},
- ?block_until(
- #{
- ?snk_kind := ds_session_gc_cleaned,
- session_id := ClientId3
- }
- )
- ),
- ?retry(50, 3, [ClientId1] = list_all_sessions(Node1)),
- ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
- ok
- end,
- []
- ),
- ok.
- t_session_replay_retry(_Config) ->
- %% Verify that the session recovers smoothly from transient errors during
- %% replay.
- ok = emqx_ds_test_helpers:mock_rpc(),
- NClients = 10,
- ClientSubOpts = #{
- clientid => mk_clientid(?FUNCTION_NAME, sub),
- auto_ack => never
- },
- ClientSub = start_connect_client(ClientSubOpts),
- ?assertMatch(
- {ok, _, [?RC_GRANTED_QOS_1]},
- emqtt:subscribe(ClientSub, <<"t/#">>, ?QOS_1)
- ),
- ClientsPub = [
- start_connect_client(#{
- clientid => mk_clientid(?FUNCTION_NAME, I),
- properties => #{'Session-Expiry-Interval' => 0}
- })
- || I <- lists:seq(1, NClients)
- ],
- lists:foreach(
- fun(Client) ->
- Index = integer_to_binary(rand:uniform(NClients)),
- Topic = <<"t/", Index/binary>>,
- ?assertMatch({ok, #{}}, emqtt:publish(Client, Topic, Index, 1))
- end,
- ClientsPub
- ),
- Pubs0 = emqx_common_test_helpers:wait_publishes(NClients, 5_000),
- NPubs = length(Pubs0),
- ?assertEqual(NClients, NPubs, ?drainMailbox(1_500)),
- ok = emqtt:stop(ClientSub),
- %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
- ok = emqx_ds_test_helpers:mock_rpc_result(
- fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
- case erlang:phash2(Shard) rem 2 of
- 0 -> unavailable;
- 1 -> passthrough
- end
- end
- ),
- _ClientSub = start_connect_client(ClientSubOpts#{clean_start => false}),
- Pubs1 = emqx_common_test_helpers:wait_publishes(NPubs, 5_000),
- ?assert(length(Pubs1) < length(Pubs0), Pubs1),
- %% "Recover" the shards.
- emqx_ds_test_helpers:unmock_rpc(),
- Pubs2 = emqx_common_test_helpers:wait_publishes(NPubs - length(Pubs1), 5_000),
- ?assertEqual(
- [maps:with([topic, payload, qos], P) || P <- Pubs0],
- [maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2]
- ).
- %% Check that we send will messages when performing GC without relying on timers set by
- %% the channel process.
- t_session_gc_will_message(_Config) ->
- ?check_trace(
- #{timetrap => 10_000},
- begin
- WillTopic = <<"will/t">>,
- ok = emqx:subscribe(WillTopic, #{qos => 2}),
- ClientId = <<"will_msg_client">>,
- Client = start_client(#{
- clientid => ClientId,
- will_topic => WillTopic,
- will_payload => <<"will payload">>,
- will_qos => 0,
- will_props => #{'Will-Delay-Interval' => 300}
- }),
- {ok, _} = emqtt:connect(Client),
- %% Use reason code =/= `?RC_SUCCESS' to allow will message
- {ok, {ok, _}} =
- ?wait_async_action(
- emqtt:disconnect(Client, ?RC_UNSPECIFIED_ERROR),
- #{?snk_kind := emqx_cm_clean_down}
- ),
- ?assertNotReceive({deliver, WillTopic, _}),
- %% Set fake `last_alive_at' to trigger immediate will message.
- force_last_alive_at(ClientId, _Time = 0),
- {ok, {ok, _}} =
- ?wait_async_action(
- emqx_persistent_session_ds_gc_worker:check_session(ClientId),
- #{?snk_kind := session_gc_published_will_msg}
- ),
- ?assertReceive({deliver, WillTopic, _}),
- ok
- end,
- []
- ),
- ok.
|