emqx_session_mem_SUITE.erl 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_session_mem_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx.hrl").
  20. -include_lib("emqx/include/emqx_mqtt.hrl").
  21. -include_lib("eunit/include/eunit.hrl").
  22. -include_lib("common_test/include/ct.hrl").
  23. all() -> emqx_common_test_helpers:all(?MODULE).
%% Delivery phase of an inflight entry as used by the tests in this suite.
-type inflight_data_phase() :: wait_ack | wait_comp.

%% Suite-local declaration of the inflight entry record. Tests build these
%% with with_ts/2,3 and match them against values read back from the session
%% (see t_pubrec).
%% NOTE(review): presumably must stay in sync with the record used inside
%% emqx_session_mem -- confirm when that module changes.
-record(inflight_data, {
    %% wait_ack | wait_comp
    phase :: inflight_data_phase(),
    %% the message held in the inflight window
    message :: emqx_types:message(),
    %% with_ts/2 fills this with erlang:system_time(millisecond)
    timestamp :: non_neg_integer()
}).
  30. %%--------------------------------------------------------------------
  31. %% CT callbacks
  32. %%--------------------------------------------------------------------
  33. init_per_suite(Config) ->
  34. ok = meck:new(
  35. [emqx_broker, emqx_hooks, emqx_session],
  36. [passthrough, no_history, no_link]
  37. ),
  38. ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
  39. Apps = emqx_cth_suite:start(
  40. [
  41. {emqx, #{
  42. override_env => [{boot_modules, [broker]}]
  43. }}
  44. ],
  45. #{work_dir => emqx_cth_suite:work_dir(Config)}
  46. ),
  47. [{suite_apps, Apps} | Config].
  48. end_per_suite(Config) ->
  49. ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
  50. meck:unload([emqx_broker, emqx_hooks]).
  51. %%--------------------------------------------------------------------
  52. %% Test cases for session init
  53. %%--------------------------------------------------------------------
  54. t_session_init(_) ->
  55. ClientInfo = #{zone => default, clientid => <<"fake-test">>},
  56. ConnInfo = #{receive_maximum => 64, expiry_interval => 0},
  57. Session = emqx_session_mem:create(
  58. ClientInfo,
  59. ConnInfo,
  60. _WillMsg = undefined,
  61. emqx_session:get_session_conf(ClientInfo)
  62. ),
  63. ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)),
  64. ?assertEqual(0, emqx_session_mem:info(subscriptions_cnt, Session)),
  65. ?assertEqual(infinity, emqx_session_mem:info(subscriptions_max, Session)),
  66. ?assertEqual(false, emqx_session_mem:info(upgrade_qos, Session)),
  67. ?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session)),
  68. ?assertEqual(64, emqx_session_mem:info(inflight_max, Session)),
  69. ?assertEqual(1, emqx_session_mem:info(next_pkt_id, Session)),
  70. ?assertEqual(30000, emqx_session_mem:info(retry_interval, Session)),
  71. ?assertEqual(0, emqx_mqueue:len(emqx_session_mem:info(mqueue, Session))),
  72. ?assertEqual(0, emqx_session_mem:info(awaiting_rel_cnt, Session)),
  73. ?assertEqual(100, emqx_session_mem:info(awaiting_rel_max, Session)),
  74. ?assertEqual(300000, emqx_session_mem:info(await_rel_timeout, Session)),
  75. ?assert(is_integer(emqx_session_mem:info(created_at, Session))).
  76. %%--------------------------------------------------------------------
  77. %% Test cases for session info/stats
  78. %%--------------------------------------------------------------------
  79. t_session_info(_) ->
  80. Keys = [subscriptions, upgrade_qos, retry_interval, await_rel_timeout],
  81. ?assertMatch(
  82. #{
  83. subscriptions := #{},
  84. upgrade_qos := false,
  85. retry_interval := 30000,
  86. await_rel_timeout := 300000
  87. },
  88. maps:from_list(emqx_session_mem:info(Keys, session()))
  89. ).
  90. t_session_stats(_) ->
  91. Stats = emqx_session_mem:stats(session()),
  92. ?assertMatch(
  93. #{
  94. subscriptions_max := infinity,
  95. inflight_max := 0,
  96. mqueue_len := 0,
  97. mqueue_max := 1000,
  98. mqueue_dropped := 0,
  99. next_pkt_id := 1,
  100. awaiting_rel_cnt := 0,
  101. awaiting_rel_max := 100
  102. },
  103. maps:from_list(Stats)
  104. ).
%% Paginated querying of inflight messages via
%% info({inflight_msgs, #{position, limit}}, Session) (see inflight_query/3).
%% Covers: an empty inflight window, walking 114 messages in pages of 10
%% (11 full pages plus a partial page of 4), querying past the end, fetching
%% everything in one page, and the `start` marker after the first entry is
%% deleted.
t_session_inflight_query(_) ->
    EmptyInflight = emqx_inflight:new(500),
    Session = session(#{inflight => EmptyInflight}),
    %% With nothing inflight the result is an empty page and the position is
    %% echoed back unchanged; `start` stays `none`.
    EmptyQueryResMeta = {[], #{position => none, start => none}},
    ?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
    ?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
    RandPos = erlang:system_time(nanosecond),
    ?assertEqual({[], #{position => RandPos, start => none}}, inflight_query(Session, RandPos, 10)),
    %% Fill the inflight window with 114 QoS 2 messages whose payload is the
    %% sequence number, so page contents can be checked by payload.
    Inflight = lists:foldl(
        fun(Seq, Acc) ->
            Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, integer_to_binary(Seq)),
            emqx_inflight:insert(Seq, emqx_session_mem:with_ts(Msg), Acc)
        end,
        EmptyInflight,
        lists:seq(1, 114)
    ),
    Session1 = session(#{inflight => Inflight}),
    %% Walk the first 11 full pages, feeding each returned position into the
    %% next query. The accumulator is {PositionForNextQuery, StartSeenSoFar}.
    {LastPos, LastStart} = lists:foldl(
        fun(PageSeq, {Pos, PrevStart}) ->
            Limit = 10,
            {Page, #{position := NextPos, start := Start}} = inflight_query(Session1, Pos, Limit),
            ?assertEqual(10, length(Page)),
            ExpFirst = PageSeq * Limit - Limit + 1,
            ExpLast = PageSeq * Limit,
            FirstMsg = lists:nth(1, Page),
            LastMsg = lists:nth(10, Page),
            ?assertEqual(integer_to_binary(ExpFirst), emqx_message:payload(FirstMsg)),
            ?assertEqual(integer_to_binary(ExpLast), emqx_message:payload(LastMsg)),
            %% start value must not change as Inflight is not modified during traversal
            NextStart =
                case PageSeq of
                    1 ->
                        %% On the first page `start` is the insert timestamp
                        %% of the very first message.
                        ?assertEqual(inflight_ts(FirstMsg), Start),
                        Start;
                    _ ->
                        ?assertEqual(PrevStart, Start),
                        PrevStart
                end,
            %% The returned position is the insert timestamp of the last
            %% message on the page.
            ?assertEqual(inflight_ts(LastMsg), NextPos),
            {NextPos, NextStart}
        end,
        {none, none},
        lists:seq(1, 11)
    ),
    %% Twelfth page: only messages 111..114 remain.
    {LastPartialPage, #{position := FinalPos} = LastMeta} = inflight_query(
        Session1, LastPos, 10
    ),
    LastMsg = lists:nth(4, LastPartialPage),
    ?assertEqual(4, length(LastPartialPage)),
    ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
    ?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
    ?assertEqual(#{position => inflight_ts(LastMsg), start => LastStart}, LastMeta),
    %% Querying from the final position yields an empty page with the same meta.
    ?assertEqual(
        {[], #{start => LastStart, position => FinalPos}},
        inflight_query(Session1, FinalPos, 10)
    ),
    %% A limit larger than, or exactly equal to, the number of messages
    %% returns everything in one page with identical meta.
    {LargePage, LargeMeta} = inflight_query(Session1, none, 1000),
    ?assertEqual(114, length(LargePage)),
    ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
    ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
    ?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
    {FullPage, FullMeta} = inflight_query(Session1, none, 114),
    ?assertEqual(LargePage, FullPage),
    ?assertEqual(LargeMeta, FullMeta),
    %% After deleting the first entry, the page begins at message 2 and
    %% `start` is that message's insert timestamp.
    Session2 = session(#{inflight => emqx_inflight:delete(1, Inflight)}),
    {PageAfterRemove, #{start := StartAfterRemove}} = inflight_query(Session2, none, 10),
    ?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
    ?assertEqual(StartAfterRemove, inflight_ts(hd(PageAfterRemove))).
  173. %%--------------------------------------------------------------------
  174. %% Test cases for sub/unsub
  175. %%--------------------------------------------------------------------
  176. t_subscribe(_) ->
  177. ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
  178. {ok, Session} = emqx_session_mem:subscribe(<<"#">>, subopts(), session()),
  179. ?assertEqual(1, emqx_session_mem:info(subscriptions_cnt, Session)).
  180. t_is_subscriptions_full_false(_) ->
  181. Session = session(#{max_subscriptions => infinity}),
  182. ?assertNot(emqx_session_mem:is_subscriptions_full(Session)).
  183. t_is_subscriptions_full_true(_) ->
  184. ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
  185. Session = session(#{max_subscriptions => 1}),
  186. ?assertNot(emqx_session_mem:is_subscriptions_full(Session)),
  187. {ok, Session1} = emqx_session_mem:subscribe(
  188. <<"t1">>, subopts(), Session
  189. ),
  190. ?assert(emqx_session_mem:is_subscriptions_full(Session1)),
  191. {error, ?RC_QUOTA_EXCEEDED} = emqx_session_mem:subscribe(
  192. <<"t2">>, subopts(), Session1
  193. ).
  194. t_unsubscribe(_) ->
  195. ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
  196. SubOpts = subopts(),
  197. Session = session(#{subscriptions => #{<<"#">> => SubOpts}}),
  198. {ok, Session1, SubOpts} = emqx_session_mem:unsubscribe(<<"#">>, Session),
  199. {error, ?RC_NO_SUBSCRIPTION_EXISTED} = emqx_session_mem:unsubscribe(<<"#">>, Session1).
  200. t_publish_qos0(_) ->
  201. ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
  202. Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>),
  203. {ok, [], [], Session} = emqx_session_mem:publish(1, Msg, Session = session()),
  204. {ok, [], [], Session} = emqx_session_mem:publish(undefined, Msg, Session).
  205. t_publish_qos1(_) ->
  206. ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
  207. Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>),
  208. {ok, [], [], Session} = emqx_session_mem:publish(1, Msg, Session = session()),
  209. {ok, [], [], Session} = emqx_session_mem:publish(2, Msg, Session).
  210. t_publish_qos2(_) ->
  211. ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
  212. Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
  213. {ok, [], Session} = emqx_session_mem:publish(1, Msg, session()),
  214. ?assertEqual(1, emqx_session_mem:info(awaiting_rel_cnt, Session)),
  215. {ok, Session1} = emqx_session_mem:pubrel(1, Session),
  216. ?assertEqual(0, emqx_session_mem:info(awaiting_rel_cnt, Session1)),
  217. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrel(1, Session1).
  218. t_publish_qos2_with_error_return(_) ->
  219. ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
  220. ok = meck:expect(emqx_hooks, run, fun
  221. ('message.dropped', [Msg, _By, ReasonName]) ->
  222. self() ! {'message.dropped', ReasonName, Msg},
  223. ok;
  224. (_Hook, _Arg) ->
  225. ok
  226. end),
  227. PacketId1 = 1,
  228. Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{PacketId1 => ts(millisecond)}}),
  229. Msg1 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload1">>),
  230. {error, RC1 = ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(
  231. clientinfo(), PacketId1, Msg1, Session
  232. ),
  233. receive
  234. {'message.dropped', Reason1, RecMsg1} ->
  235. ?assertEqual(Reason1, emqx_reason_codes:name(RC1)),
  236. ?assertEqual(RecMsg1, Msg1)
  237. after 1000 ->
  238. ct:fail(?FUNCTION_NAME)
  239. end,
  240. Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>),
  241. {ok, [], Session1} = emqx_session:publish(
  242. clientinfo(), _PacketId2 = 2, Msg2, Session
  243. ),
  244. ?assertEqual(2, emqx_session_mem:info(awaiting_rel_cnt, Session1)),
  245. {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(
  246. clientinfo(), _PacketId3 = 3, Msg2, Session1
  247. ),
  248. receive
  249. {'message.dropped', Reason2, RecMsg2} ->
  250. ?assertEqual(Reason2, emqx_reason_codes:name(RC2)),
  251. ?assertEqual(RecMsg2, Msg2)
  252. after 1000 ->
  253. ct:fail(?FUNCTION_NAME)
  254. end,
  255. ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end).
  256. t_is_awaiting_full_false(_) ->
  257. Session = session(#{max_awaiting_rel => infinity}),
  258. ?assertNot(emqx_session_mem:is_awaiting_full(Session)).
  259. t_is_awaiting_full_true(_) ->
  260. Session = session(#{
  261. max_awaiting_rel => 1,
  262. awaiting_rel => #{1 => ts(millisecond)}
  263. }),
  264. ?assert(emqx_session_mem:is_awaiting_full(Session)).
  265. t_puback(_) ->
  266. Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
  267. Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg), emqx_inflight:new()),
  268. Session = session(#{inflight => Inflight, mqueue => mqueue()}),
  269. {ok, Msg, [], Session1} = emqx_session_mem:puback(clientinfo(), 1, Session),
  270. ?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session1)).
  271. t_puback_with_dequeue(_) ->
  272. Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>),
  273. Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg1), emqx_inflight:new()),
  274. Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
  275. {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
  276. Session = session(#{inflight => Inflight, mqueue => Q}),
  277. {ok, Msg1, [{_, Msg3}], Session1} = emqx_session_mem:puback(clientinfo(), 1, Session),
  278. ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)),
  279. ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session1)),
  280. ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
  281. t_puback_error_packet_id_in_use(_) ->
  282. Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
  283. {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
  284. emqx_session_mem:puback(clientinfo(), 1, session(#{inflight => Inflight})).
  285. t_puback_error_packet_id_not_found(_) ->
  286. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:puback(clientinfo(), 1, session()).
  287. t_pubrec(_) ->
  288. Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
  289. Inflight = emqx_inflight:insert(2, with_ts(wait_ack, Msg), emqx_inflight:new()),
  290. Session = session(#{inflight => Inflight}),
  291. {ok, Msg, Session1} = emqx_session_mem:pubrec(2, Session),
  292. ?assertMatch(
  293. [#inflight_data{phase = wait_comp}],
  294. emqx_inflight:values(emqx_session_mem:info(inflight, Session1))
  295. ).
  296. t_pubrec_packet_id_in_use_error(_) ->
  297. Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
  298. Session = session(#{inflight => Inflight}),
  299. {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session_mem:pubrec(1, Session).
  300. t_pubrec_packet_id_not_found_error(_) ->
  301. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrec(1, session()).
  302. t_pubrel(_) ->
  303. Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}),
  304. {ok, Session1} = emqx_session_mem:pubrel(1, Session),
  305. ?assertEqual(#{}, emqx_session_mem:info(awaiting_rel, Session1)).
  306. t_pubrel_error_packetid_not_found(_) ->
  307. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrel(1, session()).
  308. t_pubcomp(_) ->
  309. Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
  310. Inflight = emqx_inflight:insert(1, with_ts(wait_comp, Msg), emqx_inflight:new()),
  311. Session = session(#{inflight => Inflight}),
  312. {ok, Msg, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
  313. ?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session1)).
  314. t_pubcomp_error_packetid_in_use(_) ->
  315. Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
  316. Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
  317. Session = session(#{inflight => Inflight}),
  318. {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session_mem:pubcomp(clientinfo(), 1, Session).
  319. t_pubcomp_error_packetid_not_found(_) ->
  320. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubcomp(clientinfo(), 1, session()).
  321. %%--------------------------------------------------------------------
  322. %% Test cases for deliver/retry
  323. %%--------------------------------------------------------------------
  324. t_dequeue(_) ->
  325. Q = mqueue(#{store_qos0 => true}),
  326. {ok, [], Session} = emqx_session_mem:dequeue(clientinfo(), session(#{mqueue => Q})),
  327. Msgs = [
  328. emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
  329. emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
  330. emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
  331. ],
  332. Session1 = emqx_session_mem:enqueue(clientinfo(), Msgs, Session),
  333. {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
  334. emqx_session_mem:dequeue(clientinfo(), Session1),
  335. ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session2)),
  336. ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)),
  337. ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
  338. ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
  339. ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)).
  340. t_deliver_qos0(_) ->
  341. ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
  342. {ok, Session} = emqx_session_mem:subscribe(<<"t0">>, subopts(), session()),
  343. {ok, Session1} = emqx_session_mem:subscribe(<<"t1">>, subopts(), Session),
  344. Deliveries = enrich([delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], Session1),
  345. {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
  346. emqx_session_mem:deliver(clientinfo(), Deliveries, Session1),
  347. ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
  348. ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
  349. t_deliver_qos1(_) ->
  350. ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
  351. {ok, Session} = emqx_session_mem:subscribe(
  352. <<"t1">>, subopts(#{qos => ?QOS_1}), session()
  353. ),
  354. Delivers = enrich([delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], Session),
  355. {ok, [{1, Msg1}, {2, Msg2}], Session1} =
  356. emqx_session_mem:deliver(clientinfo(), Delivers, Session),
  357. ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session1)),
  358. ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
  359. ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
  360. {ok, Msg1T, [], Session2} = emqx_session_mem:puback(clientinfo(), 1, Session1),
  361. ?assertEqual(Msg1, remove_deliver_flag(Msg1T)),
  362. ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session2)),
  363. {ok, Msg2T, [], Session3} = emqx_session_mem:puback(clientinfo(), 2, Session2),
  364. ?assertEqual(Msg2, remove_deliver_flag(Msg2T)),
  365. ?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session3)).
  366. t_deliver_qos2(_) ->
  367. ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
  368. Session = session(),
  369. Delivers = enrich([delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], Session),
  370. {ok, [{1, Msg1}, {2, Msg2}], Session1} =
  371. emqx_session_mem:deliver(clientinfo(), Delivers, Session),
  372. ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session1)),
  373. ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
  374. ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
  375. t_deliver_one_msg(_) ->
  376. Session = session(),
  377. {ok, [{1, Msg}], Session1} = emqx_session_mem:deliver(
  378. clientinfo(),
  379. enrich(delivery(?QOS_1, <<"t1">>), Session),
  380. Session
  381. ),
  382. ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)),
  383. ?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
  384. t_deliver_when_inflight_is_full(_) ->
  385. Session = session(#{inflight => emqx_inflight:new(1)}),
  386. Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session),
  387. {ok, Publishes, Session1} =
  388. emqx_session_mem:deliver(clientinfo(), Delivers, Session),
  389. ?assertEqual(1, length(Publishes)),
  390. ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)),
  391. ?assertEqual(1, emqx_session_mem:info(mqueue_len, Session1)),
  392. {ok, Msg1, [{2, Msg2}], Session2} =
  393. emqx_session_mem:puback(clientinfo(), 1, Session1),
  394. ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session2)),
  395. ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session2)),
  396. ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
  397. ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)).
  398. t_enqueue(_) ->
  399. Session = session(#{mqueue => mqueue(#{max_len => 3, store_qos0 => true})}),
  400. Session1 = emqx_session_mem:enqueue(
  401. clientinfo(),
  402. emqx_session:enrich_delivers(
  403. clientinfo(),
  404. [
  405. delivery(?QOS_0, <<"t0">>),
  406. delivery(?QOS_1, <<"t1">>),
  407. delivery(?QOS_2, <<"t2">>)
  408. ],
  409. Session
  410. ),
  411. Session
  412. ),
  413. ?assertEqual(3, emqx_session_mem:info(mqueue_len, Session1)),
  414. Session2 = emqx_session_mem:enqueue(
  415. clientinfo(),
  416. emqx_session:enrich_delivers(clientinfo(), [delivery(?QOS_1, <<"drop">>)], Session1),
  417. Session1
  418. ),
  419. ?assertEqual(3, emqx_session_mem:info(mqueue_len, Session2)).
  420. t_enqueue_qos0(_) ->
  421. Session = session(#{mqueue => mqueue(#{store_qos0 => false})}),
  422. Session1 = emqx_session_mem:enqueue(
  423. clientinfo(),
  424. emqx_session:enrich_delivers(
  425. clientinfo(),
  426. [
  427. delivery(?QOS_0, <<"t0">>),
  428. delivery(?QOS_1, <<"t1">>),
  429. delivery(?QOS_2, <<"t2">>)
  430. ],
  431. Session
  432. ),
  433. Session
  434. ),
  435. ?assertEqual(2, emqx_session_mem:info(mqueue_len, Session1)).
  436. t_retry(_) ->
  437. RetryIntervalMs = 1000,
  438. Session = session(#{retry_interval => RetryIntervalMs}),
  439. Delivers = enrich(
  440. [
  441. delivery(?QOS_1, <<"t1">>, <<"expiressoon">>, _Expiry = 1),
  442. delivery(?QOS_2, <<"t2">>),
  443. delivery(?QOS_0, <<"t3">>),
  444. delivery(?QOS_1, <<"t4">>)
  445. ],
  446. Session
  447. ),
  448. {ok, Pubs, Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session),
  449. [_Pub1, Pub2, _Pub3, Pub4] = Pubs,
  450. {ok, _Msg, Session2} = emqx_session_mem:pubrec(get_packet_id(Pub2), Session1),
  451. ElapseMs = 1500,
  452. ok = timer:sleep(ElapseMs),
  453. {ok, PubsRetry, RetryIntervalMs, Session3} = emqx_session_mem:handle_timeout(
  454. clientinfo(), retry_delivery, Session2
  455. ),
  456. ?assertEqual(
  457. [
  458. % Pub1 is expired
  459. {pubrel, get_packet_id(Pub2)},
  460. % Pub3 is QoS0
  461. set_duplicate_pub(Pub4)
  462. ],
  463. remove_deliver_flag(PubsRetry)
  464. ),
  465. ?assertEqual(
  466. 2,
  467. emqx_session_mem:info(inflight_cnt, Session3)
  468. ).
  469. %%--------------------------------------------------------------------
  470. %% Test cases for takeover/resume
  471. %%--------------------------------------------------------------------
  472. t_takeover(_) ->
  473. ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
  474. Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
  475. ok = emqx_session_mem:takeover(Session).
  476. t_resume(_) ->
  477. ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
  478. Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
  479. _ = emqx_session_mem:resume(#{clientid => <<"clientid">>}, Session).
%% Replay after takeover: already-inflight publishes are resent with the dup
%% flag, followed by the queued message and the pending deliveries.
%%
%% The expected result lists Msg5 once although D5 is both in Pendings and in
%% this process's mailbox, and it includes Msg6 although D6 is only in the
%% mailbox.
%% NOTE(review): this implies replay/3 drains deliveries from the caller's
%% mailbox and de-duplicates them against Pendings -- confirm against
%% emqx_session_mem:replay/3.
t_replay(_) ->
    Session = session(),
    %% Two messages delivered up front occupy packet ids 1 and 2.
    Messages = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session),
    {ok, Pubs, Session1} = emqx_session_mem:deliver(clientinfo(), Messages, Session),
    %% One more message waits in the queue.
    Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
    Session2 = emqx_session_mem:enqueue(clientinfo(), [Msg], Session1),
    %% Inflight publishes are expected back with the dup flag set.
    Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
    Pendings =
        [Msg4, Msg5] = enrich(
            [_D4 = delivery(?QOS_1, <<"t4">>), D5 = delivery(?QOS_2, <<"t5">>)],
            Session1
        ),
    %% Put D5 (also pending) and D6 (not pending) into this process's mailbox.
    _ = self() ! D5,
    _ = self() ! D6 = delivery(?QOS_1, <<"t6">>),
    [Msg6] = enrich([D6], Session1),
    {ok, ReplayPubs, Session3} = emqx_session_mem:replay(clientinfo(), Pendings, Session2),
    ?assertEqual(
        Pubs1 ++ [{3, Msg}, {4, Msg4}, {5, Msg5}, {6, Msg6}],
        remove_deliver_flag(ReplayPubs)
    ),
    ?assertEqual(6, emqx_session_mem:info(inflight_cnt, Session3)).
  501. t_expire_awaiting_rel(_) ->
  502. Now = ts(millisecond),
  503. AwaitRelTimeout = 10000,
  504. Session = session(#{await_rel_timeout => AwaitRelTimeout}),
  505. Ts1 = Now - 1000,
  506. Ts2 = Now - 20000,
  507. {ok, [], Session1} = emqx_session_mem:expire(clientinfo(), Session),
  508. Session2 = emqx_session_mem:set_field(awaiting_rel, #{1 => Ts1, 2 => Ts2}, Session1),
  509. {ok, [], Timeout, Session3} = emqx_session_mem:expire(clientinfo(), Session2),
  510. ?assertEqual(#{1 => Ts1}, emqx_session_mem:info(awaiting_rel, Session3)),
  511. ?assert(Timeout =< AwaitRelTimeout).
  512. t_expire_awaiting_rel_all(_) ->
  513. Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}),
  514. {ok, [], Session1} = emqx_session_mem:expire(clientinfo(), Session),
  515. ?assertEqual(#{}, emqx_session_mem:info(awaiting_rel, Session1)).
  516. %%--------------------------------------------------------------------
  517. %% CT for utility functions
  518. %%--------------------------------------------------------------------
  519. t_next_pakt_id(_) ->
  520. Session = session(#{next_pkt_id => 16#FFFF}),
  521. Session1 = emqx_session_mem:next_pkt_id(Session),
  522. ?assertEqual(1, emqx_session_mem:info(next_pkt_id, Session1)),
  523. Session2 = emqx_session_mem:next_pkt_id(Session1),
  524. ?assertEqual(2, emqx_session_mem:info(next_pkt_id, Session2)).
  525. t_obtain_next_pkt_id(_) ->
  526. Session = session(#{next_pkt_id => 16#FFFF}),
  527. {16#FFFF, Session1} = emqx_session_mem:obtain_next_pkt_id(Session),
  528. ?assertEqual(1, emqx_session_mem:info(next_pkt_id, Session1)),
  529. {1, Session2} = emqx_session_mem:obtain_next_pkt_id(Session1),
  530. ?assertEqual(2, emqx_session_mem:info(next_pkt_id, Session2)).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
  533. mqueue() -> mqueue(#{}).
  534. mqueue(Opts) ->
  535. emqx_mqueue:init(maps:merge(#{max_len => 0, store_qos0 => false}, Opts)).
  536. session() -> session(#{}).
  537. session(InitFields) when is_map(InitFields) ->
  538. ClientInfo = #{zone => default, clientid => <<"fake-test">>},
  539. ConnInfo = #{receive_maximum => 0, expiry_interval => 0},
  540. Session = emqx_session_mem:create(
  541. ClientInfo,
  542. ConnInfo,
  543. _WillMsg = undefined,
  544. emqx_session:get_session_conf(ClientInfo)
  545. ),
  546. maps:fold(
  547. fun(Field, Value, SessionAcc) ->
  548. emqx_session_mem:set_field(Field, Value, SessionAcc)
  549. end,
  550. Session,
  551. InitFields
  552. ).
  553. clientinfo() -> clientinfo(#{}).
  554. clientinfo(Init) ->
  555. maps:merge(
  556. #{
  557. zone => ?MODULE,
  558. clientid => <<"clientid">>,
  559. username => <<"username">>
  560. },
  561. Init
  562. ).
  563. subopts() -> subopts(#{}).
  564. subopts(Init) ->
  565. maps:merge(?DEFAULT_SUBOPTS, Init).
  566. delivery(QoS, Topic) ->
  567. Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
  568. {deliver, Topic, emqx_message:make(test, QoS, Topic, Payload)}.
  569. delivery(QoS, Topic, Payload, ExpiryInterval) ->
  570. Headers = #{properties => #{'Message-Expiry-Interval' => ExpiryInterval}},
  571. {deliver, Topic, emqx_message:make(test, QoS, Topic, Payload, #{}, Headers)}.
  572. enrich(Delivers, Session) when is_list(Delivers) ->
  573. emqx_session:enrich_delivers(clientinfo(), Delivers, Session);
  574. enrich(Delivery, Session) when is_tuple(Delivery) ->
  575. enrich([Delivery], Session).
  576. ts(second) ->
  577. erlang:system_time(second);
  578. ts(millisecond) ->
  579. erlang:system_time(millisecond).
  580. with_ts(Phase, Msg) ->
  581. with_ts(Phase, Msg, erlang:system_time(millisecond)).
  582. with_ts(Phase, Msg, Ts) ->
  583. #inflight_data{
  584. phase = Phase,
  585. message = Msg,
  586. timestamp = Ts
  587. }.
  588. remove_deliver_flag({pubrel, Id}) ->
  589. {pubrel, Id};
  590. remove_deliver_flag({Id, Data}) ->
  591. {Id, remove_deliver_flag(Data)};
  592. remove_deliver_flag(List) when is_list(List) ->
  593. lists:map(fun remove_deliver_flag/1, List);
  594. remove_deliver_flag(Msg) ->
  595. emqx_message:remove_header(deliver_begin_at, Msg).
  596. set_duplicate_pub({Id, Msg}) ->
  597. {Id, emqx_message:set_flag(dup, Msg)}.
  598. get_packet_id({Id, _}) ->
  599. Id.
  600. inflight_query(Session, Pos, Limit) ->
  601. emqx_session_mem:info({inflight_msgs, #{position => Pos, limit => Limit}}, Session).
  602. inflight_ts(#message{extra = #{inflight_insert_ts := Ts}}) -> Ts.