| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_persistent_session_SUITE).
- -include_lib("stdlib/include/assert.hrl").
- -include_lib("common_test/include/ct.hrl").
- -include_lib("snabbkaffe/include/snabbkaffe.hrl").
- -include_lib("emqx/include/emqx_mqtt.hrl").
- -compile(export_all).
- -compile(nowarn_export_all).
- %%--------------------------------------------------------------------
- %% SUITE boilerplate
- %%--------------------------------------------------------------------
- all() ->
- [
- % NOTE
- % Tests are disabled while existing session persistence impl is being
- % phased out.
- {group, persistent_store_disabled},
- {group, persistent_store_ds}
- ].
- %% A persistent session can be resumed in two ways:
- %% 1. The old connection process is still alive, and the session is taken
- %% over by the new connection.
- %% 2. The old session process has died (e.g., because of node down).
- %% The new process resumes the session from the stored state, and finds
- %% any subscribed messages from the persistent message store.
- %%
- %% We want to test both ways, both with the db backend enabled and disabled.
- %%
- %% In addition, we test both tcp and quic connections.
- groups() ->
- TCs = emqx_common_test_helpers:all(?MODULE),
- TCsNonGeneric = [t_choose_impl],
- [
- {persistent_store_disabled, [{group, no_kill_connection_process}]},
- {persistent_store_ds, [{group, no_kill_connection_process}]},
- {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
- {tcp, [], TCs},
- {quic, [], TCs -- TCsNonGeneric},
- {ws, [], TCs -- TCsNonGeneric}
- ].
- init_per_group(persistent_store_disabled, Config) ->
- [
- {emqx_config, "persistent_session_store { enabled = false }"},
- {persistent_store, false}
- | Config
- ];
- init_per_group(persistent_store_ds, Config) ->
- [
- {emqx_config, "persistent_session_store { ds = true }"},
- {persistent_store, ds}
- | Config
- ];
- init_per_group(Group, Config) when Group == tcp ->
- Apps = emqx_cth_suite:start(
- [{emqx, ?config(emqx_config, Config)}],
- #{work_dir => emqx_cth_suite:work_dir(Config)}
- ),
- [
- {port, get_listener_port(tcp, default)},
- {conn_fun, connect},
- {group_apps, Apps}
- | Config
- ];
- init_per_group(Group, Config) when Group == ws ->
- Apps = emqx_cth_suite:start(
- [{emqx, ?config(emqx_config, Config)}],
- #{work_dir => emqx_cth_suite:work_dir(Config)}
- ),
- [
- {ssl, false},
- {host, "localhost"},
- {enable_websocket, true},
- {port, get_listener_port(ws, default)},
- {conn_fun, ws_connect},
- {group_apps, Apps}
- | Config
- ];
- init_per_group(Group, Config) when Group == quic ->
- Apps = emqx_cth_suite:start(
- [
- {emqx,
- ?config(emqx_config, Config) ++
- "\n listeners.quic.test {"
- "\n enable = true"
- "\n ssl_options.verify = verify_peer"
- "\n }"}
- ],
- #{work_dir => emqx_cth_suite:work_dir(Config)}
- ),
- [
- {port, get_listener_port(quic, test)},
- {conn_fun, quic_connect},
- {ssl_opts, emqx_common_test_helpers:client_ssl_twoway()},
- {ssl, true},
- {group_apps, Apps}
- | Config
- ];
- init_per_group(no_kill_connection_process, Config) ->
- [{kill_connection_process, false} | Config];
- init_per_group(kill_connection_process, Config) ->
- [{kill_connection_process, true} | Config].
- get_listener_port(Type, Name) ->
- case emqx_config:get([listeners, Type, Name, bind]) of
- {_, Port} -> Port;
- Port -> Port
- end.
- end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
- ok = emqx_cth_suite:stop(?config(group_apps, Config));
- end_per_group(_, _Config) ->
- ok.
- init_per_testcase(TestCase, Config) ->
- Config1 = preconfig_per_testcase(TestCase, Config),
- case erlang:function_exported(?MODULE, TestCase, 2) of
- true -> ?MODULE:TestCase(init, Config1);
- _ -> Config1
- end.
- end_per_testcase(TestCase, Config) ->
- case erlang:function_exported(?MODULE, TestCase, 2) of
- true -> ?MODULE:TestCase('end', Config);
- false -> ok
- end,
- Config.
- preconfig_per_testcase(TestCase, Config) ->
- {BaseName, Config1} =
- case ?config(tc_group_properties, Config) of
- [] ->
- %% We are running a single testcase
- {
- atom_to_binary(TestCase),
- init_per_group(tcp, init_per_group(kill_connection_process, Config))
- };
- [_ | _] = Props ->
- Path = lists:reverse(?config(tc_group_path, Config) ++ Props),
- Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)],
- Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]),
- {iolist_to_binary(Pre1), Config}
- end,
- [
- {topic, iolist_to_binary([BaseName, "/foo"])},
- {stopic, iolist_to_binary([BaseName, "/+"])},
- {stopic_alt, iolist_to_binary([BaseName, "/foo"])},
- {client_id, BaseName}
- | Config1
- ].
- %%--------------------------------------------------------------------
- %% Helpers
- %%--------------------------------------------------------------------
- client_info(Key, Client) ->
- maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
- receive_messages(Count) ->
- receive_messages(Count, []).
- receive_messages(0, Msgs) ->
- Msgs;
- receive_messages(Count, Msgs) ->
- receive
- {publish, Msg} ->
- receive_messages(Count - 1, [Msg | Msgs]);
- _Other ->
- receive_messages(Count, Msgs)
- after 5000 ->
- Msgs
- end.
- maybe_kill_connection_process(ClientId, Config) ->
- case ?config(kill_connection_process, Config) of
- true ->
- case emqx_cm:lookup_channels(ClientId) of
- [] ->
- ok;
- [ConnectionPid] ->
- ?assert(is_pid(ConnectionPid)),
- Ref = monitor(process, ConnectionPid),
- ConnectionPid ! die_if_test,
- receive
- {'DOWN', Ref, process, ConnectionPid, normal} -> ok
- after 3000 -> error(process_did_not_die)
- end,
- wait_for_cm_unregister(ClientId)
- end;
- false ->
- ok
- end.
- wait_for_cm_unregister(ClientId) ->
- wait_for_cm_unregister(ClientId, 100).
- wait_for_cm_unregister(_ClientId, 0) ->
- error(cm_did_not_unregister);
- wait_for_cm_unregister(ClientId, N) ->
- case emqx_cm:lookup_channels(ClientId) of
- [] ->
- ok;
- [_] ->
- timer:sleep(100),
- wait_for_cm_unregister(ClientId, N - 1)
- end.
- publish(Topic, Payloads) ->
- publish(Topic, Payloads, false).
- publish(Topic, Payloads, WaitForUnregister) ->
- Fun = fun(Client, Payload) ->
- {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
- end,
- do_publish(Payloads, Fun, WaitForUnregister).
- do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) ->
- %% Publish from another process to avoid connection confusion.
- {Pid, Ref} =
- spawn_monitor(
- fun() ->
- %% For convenience, always publish using tcp.
- %% The publish path is not what we are testing.
- ClientID = <<"ps_SUITE_publisher">>,
- {ok, Client} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientID},
- {port, 1883}
- ]),
- {ok, _} = emqtt:connect(Client),
- lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
- ok = emqtt:disconnect(Client),
- %% Snabbkaffe sometimes fails unless all processes are gone.
- case WaitForUnregister of
- false ->
- ok;
- true ->
- case emqx_cm:lookup_channels(ClientID) of
- [] ->
- ok;
- [ConnectionPid] ->
- ?assert(is_pid(ConnectionPid)),
- Ref1 = monitor(process, ConnectionPid),
- receive
- {'DOWN', Ref1, process, ConnectionPid, _} -> ok
- after 3000 -> error(process_did_not_die)
- end,
- wait_for_cm_unregister(ClientID)
- end
- end
- end
- ),
- receive
- {'DOWN', Ref, process, Pid, normal} -> ok;
- {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
- end;
- do_publish(Payload, PublishFun, WaitForUnregister) ->
- do_publish([Payload], PublishFun, WaitForUnregister).
- %%--------------------------------------------------------------------
- %% Test Cases
- %%--------------------------------------------------------------------
- t_choose_impl(Config) ->
- ClientId = ?config(client_id, Config),
- ConnFun = ?config(conn_fun, Config),
- {ok, Client} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client),
- [ChanPid] = emqx_cm:lookup_channels(ClientId),
- ?assertEqual(
- case ?config(persistent_store, Config) of
- false -> emqx_session_mem;
- ds -> emqx_persistent_session_ds
- end,
- emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
- ).
- t_connect_discards_existing_client(Config) ->
- ClientId = ?config(client_id, Config),
- ConnFun = ?config(conn_fun, Config),
- ClientOpts = [
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ],
- {ok, Client1} = emqtt:start_link(ClientOpts),
- true = unlink(Client1),
- MRef = erlang:monitor(process, Client1),
- {ok, _} = emqtt:ConnFun(Client1),
- {ok, Client2} = emqtt:start_link(ClientOpts),
- {ok, _} = emqtt:ConnFun(Client2),
- receive
- {'DOWN', MRef, process, Client1, Reason} ->
- ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
- ok = emqtt:stop(Client2),
- ok
- after 1000 ->
- error({client_still_connected, Client1})
- end.
- %% [MQTT-3.1.2-23]
- t_connect_session_expiry_interval(init, Config) -> skip_ds_tc(Config);
- t_connect_session_expiry_interval('end', _Config) -> ok.
- t_connect_session_expiry_interval(Config) ->
- ConnFun = ?config(conn_fun, Config),
- Topic = ?config(topic, Config),
- STopic = ?config(stopic, Config),
- Payload = <<"test message">>,
- ClientId = ?config(client_id, Config),
- {ok, Client1} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
- ok = emqtt:disconnect(Client1),
- maybe_kill_connection_process(ClientId, Config),
- publish(Topic, Payload),
- {ok, Client2} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, false}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- [Msg | _] = receive_messages(1),
- ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
- ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
- ?assertEqual({ok, 2}, maps:find(qos, Msg)),
- ok = emqtt:disconnect(Client2).
- t_without_client_id(Config) ->
- %% Emqtt client dies
- process_flag(trap_exit, true),
- ConnFun = ?config(conn_fun, Config),
- {ok, Client0} = emqtt:start_link([
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, false}
- | Config
- ]),
- {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0),
- ok.
- t_assigned_clientid_persistent_session(Config) ->
- ConnFun = ?config(conn_fun, Config),
- {ok, Client1} = emqtt:start_link([
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- AssignedClientId = client_info(clientid, Client1),
- ok = emqtt:disconnect(Client1),
- maybe_kill_connection_process(AssignedClientId, Config),
- {ok, Client2} = emqtt:start_link([
- {clientid, AssignedClientId},
- {proto_ver, v5},
- {clean_start, false}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- ?assertEqual(1, client_info(session_present, Client2)),
- ok = emqtt:disconnect(Client2).
- t_cancel_on_disconnect(Config) ->
- %% Open a persistent session, but cancel the persistence when
- %% shutting down the connection.
- ConnFun = ?config(conn_fun, Config),
- ClientId = ?config(client_id, Config),
- {ok, Client1} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
- wait_for_cm_unregister(ClientId),
- {ok, Client2} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {clean_start, false},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- ?assertEqual(0, client_info(session_present, Client2)),
- ok = emqtt:disconnect(Client2).
- t_persist_on_disconnect(Config) ->
- %% Open a non-persistent session, but add the persistence when
- %% shutting down the connection. This is a protocol error, and
- %% should not convert the session into a persistent session.
- ConnFun = ?config(conn_fun, Config),
- ClientId = ?config(client_id, Config),
- {ok, Client1} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 0}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- %% Strangely enough, the disconnect is reported as successful by emqtt.
- ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}),
- wait_for_cm_unregister(ClientId),
- {ok, Client2} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {clean_start, false},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- %% The session should not be known, since it wasn't persisted because of the
- %% changed expiry interval in the disconnect call.
- ?assertEqual(0, client_info(session_present, Client2)),
- ok = emqtt:disconnect(Client2).
- t_process_dies_session_expires(init, Config) -> skip_ds_tc(Config);
- t_process_dies_session_expires('end', _Config) -> ok.
- t_process_dies_session_expires(Config) ->
- %% Emulate an error in the connect process,
- %% or that the node of the process goes down.
- %% A persistent session should eventually expire.
- ConnFun = ?config(conn_fun, Config),
- ClientId = ?config(client_id, Config),
- Topic = ?config(topic, Config),
- STopic = ?config(stopic, Config),
- Payload = <<"test">>,
- {ok, Client1} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 1}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
- ok = emqtt:disconnect(Client1),
- maybe_kill_connection_process(ClientId, Config),
- ok = publish(Topic, [Payload]),
- timer:sleep(1100),
- {ok, Client2} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, false}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- ?assertEqual(0, client_info(session_present, Client2)),
- %% We should not receive the pending message
- ?assertEqual([], receive_messages(1)),
- emqtt:disconnect(Client2).
- t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config);
- t_publish_while_client_is_gone('end', _Config) -> ok.
- t_publish_while_client_is_gone(Config) ->
- %% A persistent session should receive messages in its
- %% subscription even if the process owning the session dies.
- ConnFun = ?config(conn_fun, Config),
- Topic = ?config(topic, Config),
- STopic = ?config(stopic, Config),
- Payload1 = <<"hello1">>,
- Payload2 = <<"hello2">>,
- ClientId = ?config(client_id, Config),
- {ok, Client1} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
- ok = emqtt:disconnect(Client1),
- maybe_kill_connection_process(ClientId, Config),
- ok = publish(Topic, [Payload1, Payload2]),
- {ok, Client2} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, false}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- Msgs = receive_messages(2),
- ?assertMatch([_, _], Msgs),
- [Msg2, Msg1] = Msgs,
- ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
- ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
- ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
- ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
- ok = emqtt:disconnect(Client2).
- t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config);
- t_clean_start_drops_subscriptions('end', _Config) -> ok.
- t_clean_start_drops_subscriptions(Config) ->
- %% 1. A persistent session is started and disconnected.
- %% 2. While disconnected, a message is published and persisted.
- %% 3. When connecting again, the clean start flag is set, the subscription is renewed,
- %% then we disconnect again.
- %% 4. Finally, a new connection is made with clean start set to false.
- %% The original message should not be delivered.
- ConnFun = ?config(conn_fun, Config),
- Topic = ?config(topic, Config),
- STopic = ?config(stopic, Config),
- Payload1 = <<"hello1">>,
- Payload2 = <<"hello2">>,
- Payload3 = <<"hello3">>,
- ClientId = ?config(client_id, Config),
- %% 1.
- {ok, Client1} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
- ok = emqtt:disconnect(Client1),
- maybe_kill_connection_process(ClientId, Config),
- %% 2.
- ok = publish(Topic, Payload1),
- %% 3.
- {ok, Client2} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, true}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- ?assertEqual(0, client_info(session_present, Client2)),
- {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
- ok = publish(Topic, Payload2),
- [Msg1] = receive_messages(1),
- ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
- ok = emqtt:disconnect(Client2),
- maybe_kill_connection_process(ClientId, Config),
- %% 4.
- {ok, Client3} = emqtt:start_link([
- {proto_ver, v5},
- {clientid, ClientId},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, false}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client3),
- ok = publish(Topic, Payload3),
- [Msg2] = receive_messages(1),
- ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
- ok = emqtt:disconnect(Client3).
- t_unsubscribe(Config) ->
- ConnFun = ?config(conn_fun, Config),
- STopic = ?config(stopic, Config),
- ClientId = ?config(client_id, Config),
- {ok, Client} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client),
- {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2),
- ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
- {ok, _, _} = emqtt:unsubscribe(Client, STopic),
- ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
- ok = emqtt:disconnect(Client).
- t_multiple_subscription_matches(init, Config) -> skip_ds_tc(Config);
- t_multiple_subscription_matches('end', _Config) -> ok.
- t_multiple_subscription_matches(Config) ->
- ConnFun = ?config(conn_fun, Config),
- Topic = ?config(topic, Config),
- STopic1 = ?config(stopic, Config),
- STopic2 = ?config(stopic_alt, Config),
- Payload = <<"test message">>,
- ClientId = ?config(client_id, Config),
- {ok, Client1} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client1),
- {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2),
- {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2),
- ok = emqtt:disconnect(Client1),
- maybe_kill_connection_process(ClientId, Config),
- publish(Topic, Payload),
- {ok, Client2} = emqtt:start_link([
- {clientid, ClientId},
- {proto_ver, v5},
- {properties, #{'Session-Expiry-Interval' => 30}},
- {clean_start, false}
- | Config
- ]),
- {ok, _} = emqtt:ConnFun(Client2),
- %% We will receive the same message twice because it matches two subscriptions.
- [Msg1, Msg2] = receive_messages(2),
- ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)),
- ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)),
- ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)),
- ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)),
- ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
- ?assertEqual({ok, 2}, maps:find(qos, Msg2)),
- ok = emqtt:disconnect(Client2).
- skip_ds_tc(Config) ->
- case ?config(persistent_store, Config) of
- ds ->
- {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
- _ ->
- Config
- end.
|