emqx_persistent_session_ds.erl 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_ds).
  17. -behaviour(emqx_session).
  18. -include("emqx.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -include_lib("snabbkaffe/include/trace.hrl").
  21. -include_lib("stdlib/include/ms_transform.hrl").
  22. -include("emqx_mqtt.hrl").
  23. -include("emqx_session.hrl").
  24. -include("emqx_persistent_session_ds/session_internals.hrl").
  25. -ifdef(TEST).
  26. -include_lib("proper/include/proper.hrl").
  27. -include_lib("eunit/include/eunit.hrl").
  28. -endif.
  %% Session API
  -export([
      create/4,
      open/4,
      destroy/1,
      kick_offline_session/1
  ]).

  %% Info and stats
  -export([
      info/2,
      stats/1
  ]).

  %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
  -export([
      subscribe/3,
      unsubscribe/2,
      get_subscription/2
  ]).

  %% Client -> Broker: PUBLISH and its acknowledgement packets
  -export([
      publish/3,
      puback/3,
      pubrec/2,
      pubrel/2,
      pubcomp/3
  ]).

  %% Delivery, replay, timeouts, generic messages and shutdown
  -export([
      deliver/3,
      replay/3,
      handle_timeout/3,
      handle_info/2,
      disconnect/2,
      terminate/2
  ]).

  %% Will message handling
  -export([
      clear_will_message/1,
      publish_will_message_now/2
  ]).

  %% Management APIs:
  -export([
      list_client_subscriptions/1,
      get_client_subscription/2
  ]).

  %% session table operations
  -export([create_tables/0, sync/1]).

  %% internal export used by session GC process
  -export([destroy_session/1]).

  %% Remove me later (satisfy checks for an unused BPAPI)
  -export([
      do_open_iterator/3,
      do_ensure_iterator_closed/1,
      do_ensure_all_iterators_closed/1
  ]).

  %% `print_session/1' is used by the management API.
  -export([print_session/1, seqno_diff/4]).

  %% Exported only in test builds.
  -ifdef(TEST).
  -export([
      session_open/4,
      list_all_sessions/0
  ]).
  -endif.

  %% Types made available to other modules.
  -export_type([
      id/0,
      seqno/0,
      timestamp/0,
      topic_filter/0,
      share_topic_filter/0,
      subscription_id/0,
      subscription/0,
      session/0,
      stream_state/0
  ]).
  %% Sequence number: a non-negative integer.
  -type seqno() :: non_neg_integer().

  %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
  %% an atom, in theory (?).
  -type id() :: binary().

  %% Topic filter of a shared subscription (a `#share{}' record).
  -type share_topic_filter() :: #share{}.

  %% Either a regular topic or a shared-subscription topic filter.
  -type topic_filter() :: emqx_types:topic() | share_topic_filter().

  %% Subscription and subscription states:
  %%
  %% Persistent sessions cannot simply update or delete subscriptions,
  %% since subscription parameters must be exactly the same during
  %% replay.
  %%
  %% To solve this problem, we store subscriptions in a twofold manner:
  %%
  %% - `subscription' is an object that holds up-to-date information
  %% about the client's subscription and a reference to the latest
  %% subscription state id
  %%
  %% - `subscription_state' is an immutable object that holds
  %% information about the subscription parameters at a certain point
  %% in time
  %%
  %% New subscription states are created whenever the client subscribes
  %% to a topic, or updates an existing subscription.
  %%
  %% Stream replay states contain references to the subscription states.
  %%
  %% Outdated subscription states are discarded when they are not
  %% referenced by either subscription or stream replay state objects.
  -type subscription_id() :: integer().

  %% This type is a result of merging
  %% `emqx_persistent_session_ds_subs:subscription()' with its current
  %% state.
  -type subscription() :: #{
      id := subscription_id(),
      start_time := emqx_ds:time(),
      current_state := emqx_persistent_session_ds_subs:subscription_state_id(),
      subopts := map()
  }.

  %% Shared subscription state; treated as an opaque term by this module's types.
  -type shared_sub_state() :: term().
  %% Timer names. Each one is handled by a dedicated clause of
  %% `handle_timeout/3' and may be used as a key of the session map.
  -define(TIMER_PULL, timer_pull).
  -define(TIMER_GET_STREAMS, timer_get_streams).
  -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
  -define(TIMER_RETRY_REPLAY, timer_retry_replay).

  -type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT | ?TIMER_RETRY_REPLAY.

  %% Delay used when arming `?TIMER_RETRY_REPLAY' after a recoverable
  %% replay failure (logged there as `retry_in_ms').
  %% TODO: Needs configuration?
  -define(TIMEOUT_RETRY_REPLAY, 1000).
  %% In-memory representation of a durable session.
  -type session() :: #{
      %% Client ID
      id := id(),
      %% Configuration:
      props := map(),
      %% Persistent state:
      s := emqx_persistent_session_ds_state:t(),
      %% Shared subscription state:
      shared_sub_s := shared_sub_state(),
      %% Buffer:
      inflight := emqx_persistent_session_ds_inflight:t(),
      %% In-progress replay:
      %% List of stream replay states to be added to the inflight buffer.
      replay => [{_StreamKey, stream_state()}, ...],
      %% Timers:
      timer() => reference()
  }.

  %% A replay is considered ongoing for as long as the session map
  %% carries the optional `replay' key. Usable in guards.
  -define(IS_REPLAY_ONGOING(SESS), is_map_key(replay, SESS)).

  %% Synchronisation request processed by `handle_timeout/3': the session
  %% state is committed and `ref' is then sent to `from'.
  -record(req_sync, {
      from :: pid(),
      ref :: reference()
  }).

  %% Stream replay state (an `#srs{}' record).
  -type stream_state() :: #srs{}.

  %% Local shorthands for types declared in other modules.
  -type message() :: emqx_types:message().
  -type timestamp() :: emqx_utils_calendar:epoch_millisecond().
  -type millisecond() :: non_neg_integer().
  -type clientinfo() :: emqx_types:clientinfo().
  -type conninfo() :: emqx_session:conninfo().
  -type replies() :: emqx_session:replies().
  %% Keys reported by `stats/1'. Every key listed here must be understood
  %% by `info/2', because `stats/1' passes the whole list to it.
  -define(STATS_KEYS, [
      durable,
      subscriptions_cnt,
      subscriptions_max,
      inflight_cnt,
      inflight_max,
      mqueue_len,
      mqueue_dropped,
      seqno_q1_comm,
      seqno_q1_dup,
      seqno_q1_next,
      seqno_q2_comm,
      seqno_q2_dup,
      seqno_q2_rec,
      seqno_q2_next,
      n_streams,
      awaiting_rel_cnt,
      awaiting_rel_max
  ]).
  193. %%
  %% Builds a brand new session for the client with `session_ensure_new/5'
  %% and hands it to `ensure_timers/1' before returning it.
  -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
      session().
  create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
      NewSession = session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf),
      ensure_timers(NewSession).
  %% Tries to open an existing session for the client.
  %%
  %% Returns `false' when `session_open/4' finds nothing. Otherwise the
  %% opened session gets `Conf' stored under `props', goes through
  %% `do_expire/2' and `ensure_timers/1', and is returned as
  %% `{true, Session, []}'.
  -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
      {_IsPresent :: true, session(), []} | false.
  open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
      %% NOTE
      %% The fact that we need to concern about discarding all live channels here
      %% is essentially a consequence of the in-memory session design, where we
      %% have disconnected channels holding onto session state. Ideally, we should
      %% somehow isolate those idling not-yet-expired sessions into a separate process
      %% space, and move this call back into `emqx_cm` where it belongs.
      ok = emqx_cm:takeover_kick(ClientID),
      case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
          false ->
              false;
          Opened = #{} ->
              Configured = Opened#{props => Conf},
              Expired = do_expire(ClientInfo, Configured),
              {true, ensure_timers(Expired), []}
      end.
  %% Destroys the session identified either by a session map (`id' key)
  %% or by a clientinfo map (`clientid' key). The `id' key is tried first.
  -spec destroy(session() | clientinfo()) -> ok.
  destroy(#{id := SessionId}) -> destroy_session(SessionId);
  destroy(#{clientid := ClientId}) -> destroy_session(ClientId).
  %% Drops the session with the given id, passing `destroy' as the reason
  %% to `session_drop/2'. Also exported for the session GC process.
  destroy_session(SessionId) ->
      Reason = destroy,
      session_drop(SessionId, Reason).
  %% Drops the client's session with reason `kicked'; a no-op returning
  %% `ok' when message persistence is disabled.
  -spec kick_offline_session(emqx_types:clientid()) -> ok.
  kick_offline_session(ClientId) ->
      PersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
      case PersistenceEnabled of
          false -> ok;
          true -> session_drop(ClientId, kicked)
      end.
  231. %%--------------------------------------------------------------------
  232. %% Info, Stats
  233. %%--------------------------------------------------------------------
  %% Returns a piece of information about the session.
  %%
  %% When the first argument is a list of keys, the result is a list of
  %% `{Key, Value}' pairs in the same order. Otherwise the value for the
  %% single key is returned. Unknown keys are not handled and fail with
  %% `function_clause'.
  info(Keys, Session) when is_list(Keys) ->
      [{K, info(K, Session)} || K <- Keys];
  %% Identity and constant flags:
  info(id, #{id := Id}) ->
      Id;
  info(clientid, #{id := Id}) ->
      Id;
  info(durable, _) ->
      true;
  info(created_at, #{s := State}) ->
      emqx_persistent_session_ds_state:get_created_at(State);
  info(is_persistent, #{}) ->
      true;
  %% Subscriptions (regular ones merged with shared ones):
  info(subscriptions, #{s := State, shared_sub_s := SharedState}) ->
      Regular = emqx_persistent_session_ds_subs:to_map(State),
      Shared = emqx_persistent_session_ds_shared_subs:to_map(State, SharedState),
      maps:merge(Regular, Shared);
  info(subscriptions_cnt, #{s := State}) ->
      emqx_persistent_session_ds_state:n_subscriptions(State);
  info(subscriptions_max, #{props := Props}) ->
      maps:get(max_subscriptions, Props);
  info(upgrade_qos, #{props := Props}) ->
      maps:get(upgrade_qos, Props);
  %% Inflight buffer:
  info(inflight, #{inflight := Buffer}) ->
      Buffer;
  info(inflight_cnt, #{inflight := Buffer}) ->
      emqx_persistent_session_ds_inflight:n_inflight(Buffer);
  info(inflight_max, #{inflight := Buffer}) ->
      emqx_persistent_session_ds_inflight:receive_maximum(Buffer);
  info(retry_interval, #{props := Props}) ->
      maps:get(retry_interval, Props);
  info(mqueue_len, #{inflight := Buffer}) ->
      emqx_persistent_session_ds_inflight:n_buffered(all, Buffer);
  info(mqueue_dropped, _Session) ->
      0;
  %% NOTE: `next_pkt_id' is currently not reported. Former clause:
  %% info(next_pkt_id, #{s := S}) ->
  %% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S),
  %% PacketId;
  %% Incoming QoS 2 packets awaiting PUBREL:
  info(awaiting_rel, #{s := State}) ->
      emqx_persistent_session_ds_state:fold_awaiting_rel(fun maps:put/3, #{}, State);
  info(awaiting_rel_max, #{props := Props}) ->
      maps:get(max_awaiting_rel, Props);
  info(awaiting_rel_cnt, #{s := State}) ->
      emqx_persistent_session_ds_state:n_awaiting_rel(State);
  info(await_rel_timeout, #{props := Props}) ->
      maps:get(await_rel_timeout, Props);
  %% Sequence numbers:
  info(seqno_q1_comm, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), State);
  info(seqno_q1_dup, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), State);
  info(seqno_q1_next, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), State);
  info(seqno_q2_comm, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), State);
  info(seqno_q2_dup, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), State);
  info(seqno_q2_rec, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?rec, State);
  info(seqno_q2_next, #{s := State}) ->
      emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), State);
  info(n_streams, #{s := State}) ->
      emqx_persistent_session_ds_state:n_streams(State);
  %% Paged listing of queued / inflight messages is not available:
  info({mqueue_msgs, _PagerParams}, _Session) ->
      {error, not_implemented};
  info({inflight_msgs, _PagerParams}, _Session) ->
      {error, not_implemented}.
  %% Returns one `{Key, Value}' pair for each key in `?STATS_KEYS',
  %% with the values obtained from `info/2'.
  -spec stats(session()) -> emqx_types:stats().
  stats(Session) ->
      [{Key, info(Key, Session)} || Key <- ?STATS_KEYS].
  %% Used by management API.
  %%
  %% For a live session the in-memory session map is returned with its
  %% `s' field formatted and `'_alive'' set to `{true, Pid}'. For a
  %% session that is not live, whatever
  %% `emqx_persistent_session_ds_state:print_session/1' yields is wrapped
  %% as `#{s => S, '_alive' => false}'. `undefined' is returned when
  %% there is nothing to print.
  -spec print_session(emqx_types:clientid()) -> map() | undefined.
  print_session(ClientId) ->
      case try_get_live_session(ClientId) of
          not_persistent ->
              undefined;
          not_found ->
              case emqx_persistent_session_ds_state:print_session(ClientId) of
                  undefined -> undefined;
                  Stored -> #{s => Stored, '_alive' => false}
              end;
          {Pid, SessionState} ->
              Annotated = SessionState#{'_alive' => {true, Pid}},
              maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, Annotated)
      end.
  321. %%--------------------------------------------------------------------
  322. %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
  323. %%--------------------------------------------------------------------
  %% Suppress warnings about clauses handling unimplemented results
  %% of `emqx_persistent_session_ds_shared_subs:on_subscribe/3'.
  -dialyzer({nowarn_function, subscribe/3}).

  %% Registers a subscription. `#share{}' topic filters are handled by
  %% `emqx_persistent_session_ds_shared_subs', everything else by
  %% `emqx_persistent_session_ds_subs'. On success the updated persistent
  %% state is committed and stored back into the session; an
  %% `{error, _}' result is returned untouched.
  -spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
      {ok, session()} | {error, emqx_types:reason_code()}.
  subscribe(#share{} = TopicFilter, SubOpts, Session) ->
      Outcome = emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session),
      case Outcome of
          {ok, Pending} ->
              Committed = emqx_persistent_session_ds_state:commit(Pending),
              {ok, Session#{s => Committed}};
          {error, _} = Failure ->
              Failure
      end;
  subscribe(TopicFilter, SubOpts, Session) ->
      Outcome = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session),
      case Outcome of
          {ok, Pending} ->
              Committed = emqx_persistent_session_ds_state:commit(Pending),
              {ok, Session#{s => Committed}};
          {error, _} = Failure ->
              Failure
      end.
  %% Suppress warnings about clauses handling unimplemented results
  %% of `emqx_persistent_session_ds_shared_subs:on_unsubscribe/3'.
  -dialyzer({nowarn_function, unsubscribe/2}).

  %% Removes a subscription. After the subscription module has processed
  %% the request, the stream scheduler is notified with the subscription
  %% id, the state is committed, and the subscription options of the
  %% removed subscription are returned together with the new session.
  %% An `{error, _}' result is returned untouched.
  -spec unsubscribe(topic_filter(), session()) ->
      {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
  unsubscribe(#share{} = TopicFilter, Session = #{id := SessionId, s := S0}) ->
      case emqx_persistent_session_ds_shared_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
          {error, _} = Failure ->
              Failure;
          {ok, Removed, #{id := SubId, subopts := SubOpts}} ->
              Rescheduled = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
                  SubId, Removed
              ),
              Committed = emqx_persistent_session_ds_state:commit(Rescheduled),
              {ok, Session#{s => Committed}, SubOpts}
      end;
  unsubscribe(TopicFilter, Session = #{id := SessionId, s := S0}) ->
      case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
          {error, _} = Failure ->
              Failure;
          {ok, Removed, #{id := SubId, subopts := SubOpts}} ->
              Rescheduled = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
                  SubId, Removed
              ),
              Committed = emqx_persistent_session_ds_state:commit(Rescheduled),
              {ok, Session#{s => Committed}, SubOpts}
      end.
  %% Returns the subscription options stored for the topic filter, or
  %% `undefined' when there is no such subscription. Shared
  %% subscriptions always yield `undefined'.
  -spec get_subscription(topic_filter(), session()) ->
      emqx_types:subopts() | undefined.
  get_subscription(#share{}, _Session) ->
      %% TODO: shared subscriptions are not supported yet:
      undefined;
  get_subscription(TopicFilter, #{s := State}) ->
      case emqx_persistent_session_ds_subs:lookup(TopicFilter, State) of
          undefined -> undefined;
          #{subopts := Opts} -> Opts
      end.
  394. %%--------------------------------------------------------------------
  395. %% Client -> Broker: PUBLISH
  396. %%--------------------------------------------------------------------
  %% Handles a PUBLISH coming from the client.
  %%
  %% QoS 2: refused with `?RC_RECEIVE_MAXIMUM_EXCEEDED' when the
  %% awaiting-rel set is full, and with `?RC_PACKET_IDENTIFIER_IN_USE'
  %% when the packet id is already awaiting PUBREL. Otherwise the message
  %% is published and the packet id is recorded together with the
  %% message timestamp.
  %%
  %% Any other QoS: the message is published and the session is returned
  %% unchanged.
  -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) ->
      {ok, emqx_types:publish_result(), session()}
      | {error, emqx_types:reason_code()}.
  publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}, Session = #{s := S0}) ->
      case is_awaiting_full(Session) of
          true ->
              {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED};
          false ->
              case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of
                  undefined ->
                      Routed = emqx_broker:publish(Msg),
                      S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0),
                      {ok, Routed, Session#{s => S}};
                  _KnownTs ->
                      {error, ?RC_PACKET_IDENTIFIER_IN_USE}
              end
      end;
  publish(_PacketId, Msg, Session) ->
      {ok, emqx_broker:publish(Msg), Session}.
  %% True when the number of packets awaiting PUBREL has reached the
  %% `max_awaiting_rel' limit from the session properties.
  is_awaiting_full(#{s := State, props := Props}) ->
      Pending = emqx_persistent_session_ds_state:n_awaiting_rel(State),
      %% Without a configured limit the default is the atom `infinity'.
      %% Numbers sort before atoms in the Erlang term order, so the
      %% comparison below is then always false.
      Limit = maps:get(max_awaiting_rel, Props, infinity),
      Pending >= Limit.
  %% Expires stale awaiting-rel entries through `do_expire/2'. If any
  %% entries remain afterwards, the `await_rel_timeout' property is
  %% included in the result as the third tuple element.
  -spec expire(emqx_types:clientinfo(), session()) ->
      {ok, [], timeout(), session()} | {ok, [], session()}.
  expire(ClientInfo, Session0 = #{props := Props}) ->
      Session = do_expire(ClientInfo, Session0),
      #{s := State} = Session,
      case emqx_persistent_session_ds_state:n_awaiting_rel(State) of
          0 ->
              {ok, [], Session};
          _StillPending ->
              {ok, [], maps:get(await_rel_timeout, Props), Session}
      end.
  %% Removes every awaiting-rel entry that is older than the
  %% `await_rel_timeout' property and reports the number of removed
  %% entries as an `expired_rel' session event.
  do_expire(ClientInfo, Session = #{s := S0, props := Props}) ->
      Now = erlang:system_time(millisecond),
      MaxAge = maps:get(await_rel_timeout, Props),
      %% Collect the packet ids whose age exceeds the timeout:
      CollectExpired = fun(PacketId, Ts, Acc) ->
          Age = Now - Ts,
          if
              Age > MaxAge -> [PacketId | Acc];
              true -> Acc
          end
      end,
      Expired = emqx_persistent_session_ds_state:fold_awaiting_rel(CollectExpired, [], S0),
      %% Side effect: the event is emitted even when nothing has expired.
      _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, length(Expired)}),
      %% Forget the expired packet ids:
      S = lists:foldl(fun emqx_persistent_session_ds_state:del_awaiting_rel/2, S0, Expired),
      Session#{s => S}.
  462. %%--------------------------------------------------------------------
  463. %% Client -> Broker: PUBACK
  464. %%--------------------------------------------------------------------
  %% Handles a PUBACK from the client.
  %%
  %% On success the acknowledged message is returned with no replies,
  %% and the session is passed through `pull_now/1'. An `{error, _}'
  %% from `update_seqno/3' is returned untouched.
  %%
  %% Only `{error, _}' is accepted as the failure shape, as in
  %% `pubrec/2' and `pubcomp/3', so that a term outside the declared
  %% return type cannot leak to the caller.
  -spec puback(clientinfo(), emqx_types:packet_id(), session()) ->
      {ok, emqx_types:message(), replies(), session()}
      | {error, emqx_types:reason_code()}.
  puback(_ClientInfo, PacketId, Session0) ->
      case update_seqno(puback, PacketId, Session0) of
          {ok, Msg, Session} ->
              {ok, Msg, [], pull_now(Session)};
          Error = {error, _} ->
              Error
      end.
  475. %%--------------------------------------------------------------------
  476. %% Client -> Broker: PUBREC
  477. %%--------------------------------------------------------------------
  %% Handles a PUBREC from the client. The result of `update_seqno/3'
  %% is returned as is, provided it has one of the two expected shapes.
  -spec pubrec(emqx_types:packet_id(), session()) ->
      {ok, emqx_types:message(), session()}
      | {error, emqx_types:reason_code()}.
  pubrec(PacketId, Session0) ->
      Outcome = update_seqno(pubrec, PacketId, Session0),
      case Outcome of
          {ok, _Msg, _Session} -> Outcome;
          {error, _} -> Outcome
      end.
  488. %%--------------------------------------------------------------------
  489. %% Client -> Broker: PUBREL
  490. %%--------------------------------------------------------------------
  %% Handles a PUBREL from the client: the packet id is removed from the
  %% awaiting-rel set, or `?RC_PACKET_IDENTIFIER_NOT_FOUND' is returned
  %% when the id is not in that set.
  -spec pubrel(emqx_types:packet_id(), session()) ->
      {ok, session()} | {error, emqx_types:reason_code()}.
  pubrel(PacketId, Session = #{s := S0}) ->
      Known = emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0),
      case Known of
          undefined ->
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          _ReceivedAt ->
              Released = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0),
              {ok, Session#{s => Released}}
      end.
  501. %%--------------------------------------------------------------------
  502. %% Client -> Broker: PUBCOMP
  503. %%--------------------------------------------------------------------
  %% Handles a PUBCOMP from the client. On success the completed message
  %% is returned with no replies, and the session is passed through
  %% `pull_now/1'. An `{error, _}' result is returned untouched.
  -spec pubcomp(clientinfo(), emqx_types:packet_id(), session()) ->
      {ok, emqx_types:message(), replies(), session()}
      | {error, emqx_types:reason_code()}.
  pubcomp(_ClientInfo, PacketId, Session0) ->
      case update_seqno(pubcomp, PacketId, Session0) of
          {error, _} = Failure ->
              Failure;
          {ok, Completed, Session} ->
              {ok, Completed, [], pull_now(Session)}
      end.
  514. %%--------------------------------------------------------------------
  515. %% Delivers
  516. %%--------------------------------------------------------------------
  %% Enqueues the given deliveries as transient messages, in list order,
  %% and passes the session through `pull_now/1'. No replies are
  %% produced here.
  -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
      {ok, replies(), session()}.
  deliver(ClientInfo, Delivers, Session0) ->
      %% Durable sessions still have to handle some transient messages.
      %% For example, retainer sends messages to the session directly.
      Enqueue = fun(Deliver, SessionAcc) ->
          enqueue_transient(ClientInfo, Deliver, SessionAcc)
      end,
      Session = lists:foldl(Enqueue, Session0, Delivers),
      {ok, [], pull_now(Session)}.
  526. %%--------------------------------------------------------------------
  527. %% Timeouts
  528. %%--------------------------------------------------------------------
  %% Dispatches timer events and internal requests.
  %%
  %% - `?TIMER_PULL': unless a replay is ongoing, fetches new messages
  %%   and drains the buffer. The timer is re-armed with a zero delay
  %%   when something was published, or with `idle_poll_interval'
  %%   otherwise.
  %% - `?TIMER_RETRY_REPLAY': resumes the replay of the remaining streams.
  %% - `?TIMER_GET_STREAMS': garbage-collects subscriptions, renews the
  %%   streams of regular and shared subscriptions and re-arms the timer
  %%   with `renew_streams_interval'.
  %% - `?TIMER_BUMP_LAST_ALIVE_AT': bumps and commits the last-alive
  %%   timestamp and re-arms the timer.
  %% - `#req_sync{}': commits the state and sends the reference to the
  %%   requester.
  %% - `expire_awaiting_rel': delegates to `expire/2'.
  %% - Anything else is logged as a warning and otherwise ignored.
  -spec handle_timeout(clientinfo(), _Timeout, session()) ->
      {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
  handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
      {Publishes, Session1} =
          case ?IS_REPLAY_ONGOING(Session0) of
              true ->
                  {[], Session0};
              false ->
                  drain_buffer(fetch_new_messages(Session0, ClientInfo))
          end,
      NextPullIn =
          case Publishes of
              [] -> get_config(ClientInfo, [idle_poll_interval]);
              [_ | _] -> 0
          end,
      {ok, Publishes, emqx_session:ensure_timer(?TIMER_PULL, NextPullIn, Session1)};
  handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session) ->
      {ok, [], replay_streams(Session, ClientInfo)};
  handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
      Collected = emqx_persistent_session_ds_subs:gc(S0),
      Renewed = emqx_persistent_session_ds_stream_scheduler:renew_streams(Collected),
      {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(Renewed, SharedSubS0),
      RenewEvery = get_config(ClientInfo, [renew_streams_interval]),
      Session1 = Session0#{s => S, shared_sub_s => SharedSubS},
      {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, RenewEvery, Session1)};
  handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
      Bumped = bump_last_alive(S0),
      S = emqx_persistent_session_ds_state:commit(Bumped),
      Session = emqx_session:ensure_timer(
          ?TIMER_BUMP_LAST_ALIVE_AT, bump_interval(), Session0#{s => S}
      ),
      {ok, [], Session};
  handle_timeout(_ClientInfo, #req_sync{from = Requester, ref = Ref}, Session = #{s := S0}) ->
      S = emqx_persistent_session_ds_state:commit(S0),
      %% Tell the requester that the commit has been made:
      Requester ! Ref,
      {ok, [], Session#{s => S}};
  handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
      expire(ClientInfo, Session);
  handle_timeout(_ClientInfo, Unknown, Session) ->
      ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Unknown}),
      {ok, [], Session}.
  579. %%--------------------------------------------------------------------
  580. %% Generic messages
  581. %%--------------------------------------------------------------------
  %% Handles generic messages addressed to the session. Only messages
  %% wrapped in `?shared_sub_message' are understood: they are forwarded
  %% to `emqx_persistent_session_ds_shared_subs:on_info/3' and the
  %% resulting states are stored back into the session.
  -spec handle_info(term(), session()) -> session().
  handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}) ->
      {S, SharedSubS} =
          emqx_persistent_session_ds_shared_subs:on_info(S0, SharedSubS0, Msg),
      Session#{
          s => S,
          shared_sub_s => SharedSubS
      }.
  586. %%--------------------------------------------------------------------
  587. %% Shared subscription outgoing messages
  588. %%--------------------------------------------------------------------
  %% Builds an options map made of the session id and the two send
  %% functions defined in this module (`send_message/2' and
  %% `send_message_after/3').
  shared_sub_opts(SessionId) ->
      SendFuns = #{
          send => fun send_message/2,
          send_after => fun send_message_after/3
      },
      #{session_id => SessionId, send_funs => SendFuns}.
  %% Sends `Msg' to `Dest' and returns `Msg'.
  %%
  %% A message addressed to the calling process itself is wrapped in
  %% `?session_message(?shared_sub_message(...))', the shape matched by
  %% `handle_info/2'. Messages to any other destination are sent as is.
  send_message(Dest, Msg) ->
      Self = self(),
      case Dest of
          Self ->
              erlang:send(Dest, ?session_message(?shared_sub_message(Msg))),
              %% Return the unwrapped message, as the other branch does:
              Msg;
          _Other ->
              erlang:send(Dest, Msg)
      end.
  %% Schedules `Msg' to be sent to `Dest' after `Time' and returns the
  %% result of `erlang:send_after/3'.
  %%
  %% A message addressed to the calling process itself is wrapped in
  %% `?session_message(?shared_sub_message(...))'. Messages to any other
  %% destination are scheduled as is.
  send_message_after(Time, Dest, Msg) ->
      Self = self(),
      case Dest of
          Self ->
              erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg)));
          _Other ->
              erlang:send_after(Time, Dest, Msg)
      end.
  %% Stores an estimated last-alive timestamp (current time plus one bump
  %% interval) in the persistent state.
  bump_last_alive(S0) ->
      %% Note: we take a pessimistic approach here and assume that the client will be alive
      %% until the next bump timeout. With this, we avoid garbage collecting this session
      %% too early in case the session/connection/node crashes earlier without having time
      %% to commit the time.
      Now = now_ms(),
      Interval = bump_interval(),
      emqx_persistent_session_ds_state:set_last_alive_at(Now + Interval, S0).
  %% Starts a replay: the streams returned by the stream scheduler are
  %% stored under the `replay' key of the session and then processed by
  %% `replay_streams/2'. No replies are produced here.
  -spec replay(clientinfo(), [], session()) ->
      {ok, replies(), session()}.
  replay(ClientInfo, [], Session0 = #{s := S0}) ->
      Pending = emqx_persistent_session_ds_stream_scheduler:find_replay_streams(S0),
      Session1 = Session0#{replay => Pending},
      {ok, [], replay_streams(Session1, ClientInfo)}.
  %% Replays the streams listed under the `replay' key, one per step.
  %%
  %% - Nothing left: the `replay' key is removed and the session is
  %%   passed through `pull_now/1'.
  %% - Batch replayed: the stream is dropped from the list and the
  %%   remaining ones are processed.
  %% - Recoverable error: the list is left untouched and
  %%   `?TIMER_RETRY_REPLAY' is armed, so that the same stream is retried.
  %% - Unrecoverable error: the batch is skipped through `skip_batch/5'
  %%   and the remaining streams are processed.
  replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
      %% Note: we filled the buffer with the historical messages, and
      %% from now on we'll rely on the normal inflight/flow control
      %% mechanisms to replay them:
      pull_now(maps:remove(replay, Session0));
  replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Remaining]}, ClientInfo) ->
      case replay_batch(Srs0, Session0, ClientInfo) of
          {error, recoverable, Reason} ->
              RetryIn = ?TIMEOUT_RETRY_REPLAY,
              ?SLOG(debug, #{
                  msg => "failed_to_fetch_replay_batch",
                  stream => StreamKey,
                  reason => Reason,
                  class => recoverable,
                  retry_in_ms => RetryIn
              }),
              emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryIn, Session0);
          {error, unrecoverable, Reason} ->
              Skipped = skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason),
              replay_streams(Skipped#{replay := Remaining}, ClientInfo);
          Replayed = #{} ->
              replay_streams(Replayed#{replay := Remaining}, ClientInfo)
      end.
  %% Re-enqueues the batch described by a stream replay state.
  %%
  %% Returns the updated session, or the `{error, _, _}' tuple produced
  %% by `enqueue_batch/5'. A replay state that comes back different from
  %% the one passed in is reported through a trace point and is not
  %% treated as a failure.
  -spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_).
  replay_batch(Srs0, Session0, ClientInfo) ->
      #srs{batch_size = BatchSize} = Srs0,
      case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of
          {error, _, _} = Failure ->
              Failure;
          {ok, Srs, Session} ->
              %% Assert:
              case Srs =:= Srs0 of
                  true ->
                      ok;
                  false ->
                      ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
                          expected => Srs0,
                          got => Srs
                      })
              end,
              Session
      end.
  %% Handle `{error, unrecoverable, _}' returned by `enqueue_batch'.
  %% Most likely it means that the generation containing the messages
  %% has been removed.
  %%
  %% One `expired' session event is emitted (with a placeholder message)
  %% for every QoS1 and QoS2 sequence number of the batch; the stream is
  %% then stored as having reached `end_of_stream'.
  -spec skip_batch(_StreamKey, stream_state(), session(), clientinfo(), _Reason) -> session().
  skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
      ?SLOG(info, #{
          msg => "session_ds_replay_unrecoverable_error",
          reason => Reason,
          srs => SRS0
      }),
      #srs{
          first_seqno_qos1 = First1,
          last_seqno_qos1 = Last1,
          first_seqno_qos2 = First2,
          last_seqno_qos2 = Last2
      } = SRS0,
      Expire = fun Loop(QoS, SeqNo, Last) ->
          case SeqNo < Last of
              true ->
                  Placeholder = #message{
                      id = <<>>,
                      qos = QoS,
                      payload = <<>>,
                      topic = <<>>,
                      timestamp = 0
                  },
                  _ = emqx_session_events:handle_event(ClientInfo, {expired, Placeholder}),
                  Loop(QoS, inc_seqno(QoS, SeqNo), Last);
              false ->
                  ok
          end
      end,
      %% Treat messages as expired:
      Expire(?QOS_1, First1, Last1),
      Expire(?QOS_2, First2, Last2),
      %% That's it for the iterator. Mark SRS as reached the
      %% `end_of_stream', and let stream scheduler do the rest:
      Finished = SRS0#srs{it_end = end_of_stream, batch_size = 0},
      Session#{s := emqx_persistent_session_ds_state:put_stream(StreamKey, Finished, S0)}.
  696. %%--------------------------------------------------------------------
  %% Record the disconnect in the session state: store the offline
  %% info (when available), stamp `last_alive_at' with the current time,
  %% take over a numeric `expiry_interval' from the connection info if
  %% one is present, and commit the result.
  -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
  disconnect(#{id := SessionId, s := State0} = Session, ConnInfo) ->
      Offline = maybe_set_offline_info(State0, SessionId),
      Stamped = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), Offline),
      Final =
          case ConnInfo of
              #{expiry_interval := Interval} when is_number(Interval) ->
                  emqx_persistent_session_ds_state:set_expiry_interval(Interval, Stamped);
              _NoOverride ->
                  Stamped
          end,
      {shutdown, Session#{s => emqx_persistent_session_ds_state:commit(Final)}}.
  %% Session shutdown hook: arranges the delayed will-message check
  %% (if one is needed), commits the session state and emits a trace
  %% point. The termination reason is ignored.
  -spec terminate(Reason :: term(), session()) -> ok.
  terminate(_Reason, #{id := SessionId, s := State} = Session) ->
      maybe_set_will_message_timer(Session),
      _Committed = emqx_persistent_session_ds_state:commit(State),
      ?tp(debug, persistent_session_ds_terminate, #{id => SessionId}),
      ok.
  716. %%--------------------------------------------------------------------
  717. %% Management APIs (dashboard)
  718. %%--------------------------------------------------------------------
  %% List the subscriptions of a durable session together with the
  %% node the client is connected to (`undefined' when the printed
  %% session carries no live channel pid). Every returned subopts map
  %% gets `durable => true'. Returns `{error, not_found}' when
  %% persistence is disabled or the session cannot be printed.
  -spec list_client_subscriptions(emqx_types:clientid()) ->
      {node() | undefined, [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]}
      | {error, not_found}.
  list_client_subscriptions(ClientId) ->
      case emqx_persistent_message:is_persistence_enabled() of
          false ->
              {error, not_found};
          true ->
              %% TODO: this is not the most optimal implementation, since it
              %% should be possible to avoid reading extra data (streams, etc.)
              case print_session(ClientId) of
                  undefined ->
                      {error, not_found};
                  #{s := #{subscriptions := Subs, subscription_states := SStates}} = Sess ->
                      Node =
                          case Sess of
                              #{'_alive' := {true, Pid}} -> node(Pid);
                              _ -> undefined
                          end,
                      %% Resolve each subscription to the subopts of its
                      %% current subscription state:
                      Collect = fun(Topic, #{current_state := CS}, Acc) ->
                          #{subopts := SubOpts} = maps:get(CS, SStates),
                          [{Topic, SubOpts#{durable => true}} | Acc]
                      end,
                      {Node, maps:fold(Collect, [], Subs)}
              end
      end.
  %% Look up a single subscription of a client; delegates to
  %% `emqx_persistent_session_ds_subs:cold_get_subscription/2'.
  -spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
      subscription() | undefined.
  get_client_subscription(ClientId, TopicFilter) ->
      emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
  757. %%--------------------------------------------------------------------
  758. %% Session tables operations
  759. %%--------------------------------------------------------------------
  %% Create the session tables; delegates to the session state module.
  create_tables() ->
      emqx_persistent_session_ds_state:create_tables().
  %% @doc Force syncing of the transient state to persistent storage.
  %%
  %% Sends a `#req_sync{}' request to the client's channel process and
  %% blocks until the process either answers with the monitor reference
  %% or goes down. Returns `{error, noproc}' when no channel is found.
  sync(ClientId) ->
      case emqx_cm:lookup_channels(ClientId) of
          [] ->
              {error, noproc};
          [ChanPid] ->
              MRef = monitor(process, ChanPid),
              ChanPid ! {emqx_session, #req_sync{from = self(), ref = MRef}},
              receive
                  MRef ->
                      demonitor(MRef, [flush]),
                      ok;
                  {'DOWN', MRef, process, _Pid, Reason} ->
                      {error, Reason}
              end
      end.
  %% @doc Called when a client connects. Looks up the stored session and
  %% returns `false' when there is none, or when the stored one has
  %% outlived its expiry interval (in which case it is dropped).
  %%
  %% Note: session API doesn't handle session takeovers, it's the job of
  %% the broker.
  -spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
      session() | false.
  session_open(
      SessionId,
      ClientInfo,
      #{proto_name := ProtoName, proto_ver := ProtoVer} = NewConnInfo,
      MaybeWillMsg
  ) ->
      NowMS = now_ms(),
      case emqx_persistent_session_ds_state:open(SessionId) of
          undefined ->
              false;
          {ok, Stored} ->
              EI = emqx_persistent_session_ds_state:get_expiry_interval(Stored),
              LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(Stored),
              IsExpired = NowMS >= LastAliveAt + EI,
              case IsExpired of
                  true ->
                      session_drop(SessionId, expired),
                      false;
                  false ->
                      ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}),
                      %% New connection being established: refresh the
                      %% per-connection fields, applied in list order.
                      Updates = [
                          fun(S) ->
                              emqx_persistent_session_ds_state:set_expiry_interval(EI, S)
                          end,
                          fun(S) ->
                              emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S)
                          end,
                          fun(S) ->
                              emqx_persistent_session_ds_state:set_peername(
                                  maps:get(peername, NewConnInfo), S
                              )
                          end,
                          fun(S) ->
                              emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S)
                          end,
                          fun(S) ->
                              set_clientinfo(ClientInfo, S)
                          end,
                          fun(S) ->
                              emqx_persistent_session_ds_state:set_protocol(
                                  {ProtoName, ProtoVer}, S
                              )
                          end
                      ],
                      Refreshed = lists:foldl(fun(Update, S) -> Update(S) end, Stored, Updates),
                      {ok, WithShared, SharedSubS} = emqx_persistent_session_ds_shared_subs:open(
                          Refreshed, shared_sub_opts(SessionId)
                      ),
                      Committed = emqx_persistent_session_ds_state:commit(WithShared),
                      Inflight = emqx_persistent_session_ds_inflight:new(
                          receive_maximum(NewConnInfo)
                      ),
                      #{
                          id => SessionId,
                          s => Committed,
                          shared_sub_s => SharedSubS,
                          inflight => Inflight,
                          props => #{}
                      }
              end
      end.
  %% Create a brand-new session state for `Id': sets the expiry
  %% interval, liveness and creation timestamps, resets every sequence
  %% number track to 0, stores the will message, client info and
  %% protocol, and commits the state before building the session map.
  -spec session_ensure_new(
      id(),
      emqx_types:clientinfo(),
      emqx_types:conninfo(),
      emqx_maybe:t(message()),
      emqx_session:conf()
  ) ->
      session().
  session_ensure_new(
      Id, ClientInfo, #{proto_name := ProtoName, proto_ver := ProtoVer} = ConnInfo, MaybeWillMsg, Conf
  ) ->
      ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
      Now = now_ms(),
      Fresh = emqx_persistent_session_ds_state:create_new(Id),
      WithExpiry = emqx_persistent_session_ds_state:set_expiry_interval(
          expiry_interval(ConnInfo), Fresh
      ),
      Alive = bump_last_alive(WithExpiry),
      Created = emqx_persistent_session_ds_state:set_created_at(Now, Alive),
      %% All sequence number tracks start from zero:
      Tracks = [
          ?next(?QOS_1),
          ?dup(?QOS_1),
          ?committed(?QOS_1),
          ?next(?QOS_2),
          ?dup(?QOS_2),
          ?rec,
          ?committed(?QOS_2)
      ],
      Zeroed = lists:foldl(
          fun(Track, Acc) ->
              emqx_persistent_session_ds_state:put_seqno(Track, 0, Acc)
          end,
          Created,
          Tracks
      ),
      WithWill = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, Zeroed),
      WithClient = set_clientinfo(ClientInfo, WithWill),
      WithProto = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, WithClient),
      Committed = emqx_persistent_session_ds_state:commit(WithProto),
      SharedSubS = emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)),
      Inflight = emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo)),
      #{
          id => Id,
          props => Conf,
          s => Committed,
          shared_sub_s => SharedSubS,
          inflight => Inflight
      }.
  %% @doc Called when a client reconnects with `clean session=true' or
  %% during session GC. Dropping a session that does not exist is a
  %% no-op; otherwise the subscription cleanup hook runs first and the
  %% stored state is deleted afterwards.
  -spec session_drop(id(), _Reason) -> ok.
  session_drop(SessionId, Reason) ->
      case emqx_persistent_session_ds_state:open(SessionId) of
          undefined ->
              ok;
          {ok, State} ->
              ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}),
              ok = emqx_persistent_session_ds_subs:on_session_drop(SessionId, State),
              ok = emqx_persistent_session_ds_state:delete(SessionId)
      end.
  %% Current system time in milliseconds.
  now_ms() ->
      erlang:system_time(millisecond).
  %% Store the client info in the session state, minus the fields
  %% that are unnecessary there (`cn', `dn' and `auth_result').
  set_clientinfo(ClientInfo, S) ->
      emqx_persistent_session_ds_state:set_clientinfo(
          maps:without([cn, dn, auth_result], ClientInfo),
          S
      ).
  890. %%--------------------------------------------------------------------
  891. %% RPC targets (v1)
  892. %%--------------------------------------------------------------------
  %% RPC target. Stub: ignores its arguments and always reports
  %% `not_implemented'.
  -spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) ->
      {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}.
  do_open_iterator(_TopicFilter, _StartMS, _IteratorID) ->
      {error, not_implemented}.
  %% RPC target. Stub: does nothing and returns `ok'.
  -spec do_ensure_iterator_closed(emqx_ds:iterator_id()) -> ok.
  do_ensure_iterator_closed(_IteratorID) ->
      ok.
  %% RPC target. Stub: does nothing and returns `ok'.
  -spec do_ensure_all_iterators_closed(id()) -> ok.
  do_ensure_all_iterators_closed(_DSSessionID) ->
      ok.
  906. %%--------------------------------------------------------------------
  907. %% Normal replay:
  908. %%--------------------------------------------------------------------
  %% Fetch new batches from the streams picked by the stream scheduler,
  %% then let the shared subscriptions module update its state through
  %% `on_streams_replayed/2'.
  fetch_new_messages(#{s := State0} = Session0, ClientInfo) ->
      NewStreams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(State0),
      Fetched = fetch_new_messages(NewStreams, Session0, ClientInfo),
      #{s := State1, shared_sub_s := SharedSubS0} = Fetched,
      {State, SharedSubS} =
          emqx_persistent_session_ds_shared_subs:on_streams_replayed(State1, SharedSubS0),
      Fetched#{s => State, shared_sub_s => SharedSubS}.
  %% Fetch one batch per stream, in order, stopping as soon as the
  %% number of buffered messages reaches the configured `batch_size'.
  fetch_new_messages([], Session, _ClientInfo) ->
      Session;
  fetch_new_messages([Stream | Remaining], #{inflight := Inflight} = Session, ClientInfo) ->
      BatchSize = get_config(ClientInfo, [batch_size]),
      Buffered = emqx_persistent_session_ds_inflight:n_buffered(all, Inflight),
      case Buffered >= BatchSize of
          false ->
              Fetched = new_batch(Stream, BatchSize, Session, ClientInfo),
              fetch_new_messages(Remaining, Fetched, ClientInfo);
          true ->
              %% Buffer is full:
              Session
      end.
  %% Fetch a fresh batch from a stream. The stream state is first
  %% re-based on the session's current `next' sequence numbers; on
  %% success those counters are advanced to the last sequence numbers
  %% of the batch and the stream state is stored. A recoverable error
  %% leaves the session unchanged, an unrecoverable one skips the batch.
  new_batch({StreamKey, Srs0}, BatchSize, #{s := State0} = Session0, ClientInfo) ->
      NextQos1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), State0),
      NextQos2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), State0),
      Rebased = Srs0#srs{
          first_seqno_qos1 = NextQos1,
          last_seqno_qos1 = NextQos1,
          first_seqno_qos2 = NextQos2,
          last_seqno_qos2 = NextQos2,
          batch_size = 0
      },
      case enqueue_batch(false, BatchSize, Rebased, Session0, ClientInfo) of
          {error, unrecoverable, Reason} ->
              skip_batch(StreamKey, Rebased, Session0, ClientInfo, Reason);
          {error, recoverable, Reason} ->
              ?SLOG(debug, #{
                  msg => "failed_to_fetch_batch",
                  stream => StreamKey,
                  reason => Reason,
                  class => recoverable
              }),
              Session0;
          {ok, Srs, Session} ->
              Counters = [
                  {?next(?QOS_1), Srs#srs.last_seqno_qos1},
                  {?next(?QOS_2), Srs#srs.last_seqno_qos2}
              ],
              Advanced = lists:foldl(
                  fun({Track, SeqNo}, Acc) ->
                      emqx_persistent_session_ds_state:put_seqno(Track, SeqNo, Acc)
                  end,
                  State0,
                  Counters
              ),
              Session#{
                  s => emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, Advanced)
              }
      end.
  %% Read up to `BatchSize' messages from the stream and push them to
  %% the inflight buffer. During replay the read starts from `it_begin'
  %% of the stream state, otherwise it continues from `it_end'.
  %% Returns the updated stream state and session, or the error of
  %% `emqx_ds:next/3' as is.
  enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->
      #srs{
          it_begin = ItBegin0,
          it_end = ItEnd0,
          first_seqno_qos1 = FirstSeqnoQos1,
          first_seqno_qos2 = FirstSeqnoQos2,
          sub_state_id = SubStateId
      } = Srs0,
      StartIt =
          case IsReplay of
              false -> ItEnd0;
              true -> ItBegin0
          end,
      SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S),
      case emqx_ds:next(?PERSISTENT_MESSAGE_DB, StartIt, BatchSize) of
          {error, _, _} = Error ->
              Error;
          {ok, end_of_stream} ->
              %% No new messages; just update the end iterator. The
              %% inflight buffer is untouched, so the session is
              %% returned as is:
              {ok, Srs0#srs{it_begin = StartIt, it_end = end_of_stream, batch_size = 0}, Session};
          {ok, NextIt, Messages} ->
              {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
                  IsReplay,
                  Session,
                  SubState,
                  ClientInfo,
                  FirstSeqnoQos1,
                  FirstSeqnoQos2,
                  Messages,
                  Inflight0
              ),
              Srs = Srs0#srs{
                  it_begin = StartIt,
                  it_end = NextIt,
                  %% TODO: it should be possible to avoid calling
                  %% length here by diffing size of inflight before
                  %% and after inserting messages:
                  batch_size = length(Messages),
                  last_seqno_qos1 = LastSeqnoQos1,
                  last_seqno_qos2 = LastSeqnoQos2
              },
              {ok, Srs, Session#{inflight := Inflight}}
      end.
  1006. %% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
  1007. %% K.
  %% Push the messages of a batch to the inflight buffer, assigning
  %% sequence numbers to QoS1/QoS2 messages along the way. Returns the
  %% buffer together with the last QoS1 and QoS2 sequence numbers.
  %%
  %% Each message is compared with the committed/dup/rec counters of
  %% the session to decide whether it is dropped, retransmitted with
  %% the `dup' flag, replaced by a PUBREL, or sent as is.
  process_batch(
      _IsReplay, _Session, _SubState, _ClientInfo, SeqNoQos1, SeqNoQos2, [], Inflight
  ) ->
      {Inflight, SeqNoQos1, SeqNoQos2};
  process_batch(
      IsReplay,
      Session,
      SubState,
      ClientInfo,
      FirstSeqNoQos1,
      FirstSeqNoQos2,
      [KV | Messages],
      Inflight0
  ) ->
      #{s := S} = Session,
      #{upgrade_qos := UpgradeQoS, subopts := SubOpts} = SubState,
      {_DsMsgKey, Msg0} = KV,
      Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
      Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
      Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S),
      Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S),
      Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S),
      Enriched = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS),
      Enqueue = fun(#message{qos = Qos} = Msg, {Buf, Prev1, Prev2}) ->
          %% Step 1: advance the counter of the message's QoS track.
          {Seq1, Seq2} =
              case Qos of
                  ?QOS_0 -> {Prev1, Prev2};
                  ?QOS_1 -> {inc_seqno(?QOS_1, Prev1), Prev2};
                  ?QOS_2 -> {Prev1, inc_seqno(?QOS_2, Prev2)}
              end,
          %% Step 2: decide what (if anything) goes into the buffer.
          NewBuf =
              case Qos of
                  ?QOS_0 when IsReplay ->
                      %% We ignore QoS 0 messages during replay:
                      Buf;
                  ?QOS_0 ->
                      emqx_persistent_session_ds_inflight:push({undefined, Msg}, Buf);
                  ?QOS_1 when Seq1 =< Comm1 ->
                      %% QoS1 message has been acked by the client, ignore:
                      Buf;
                  ?QOS_1 when Seq1 =< Dup1 ->
                      %% QoS1 message has been sent but not
                      %% acked. Retransmit:
                      Resent1 = emqx_message:set_flag(dup, true, Msg),
                      emqx_persistent_session_ds_inflight:push({Seq1, Resent1}, Buf);
                  ?QOS_1 ->
                      emqx_persistent_session_ds_inflight:push({Seq1, Msg}, Buf);
                  ?QOS_2 when Seq2 =< Comm2 ->
                      %% QoS2 message has been PUBCOMP'ed by the client, ignore:
                      Buf;
                  ?QOS_2 when Seq2 =< Rec ->
                      %% QoS2 message has been PUBREC'ed by the client, resend PUBREL:
                      emqx_persistent_session_ds_inflight:push({pubrel, Seq2}, Buf);
                  ?QOS_2 when Seq2 =< Dup2 ->
                      %% QoS2 message has been sent, but we haven't received PUBREC.
                      %%
                      %% TODO: According to the MQTT standard 4.3.3:
                      %% DUP flag is never set for QoS2 messages? We
                      %% do so for mem sessions, though.
                      Resent2 = emqx_message:set_flag(dup, true, Msg),
                      emqx_persistent_session_ds_inflight:push({Seq2, Resent2}, Buf);
                  ?QOS_2 ->
                      emqx_persistent_session_ds_inflight:push({Seq2, Msg}, Buf)
              end,
          {NewBuf, Seq1, Seq2}
      end,
      {Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl(
          Enqueue,
          {Inflight0, FirstSeqNoQos1, FirstSeqNoQos2},
          Enriched
      ),
      process_batch(
          IsReplay, Session, SubState, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
      ).
  1088. %%--------------------------------------------------------------------
  1089. %% Transient messages
  1090. %%--------------------------------------------------------------------
  %% Push a transient message straight to the inflight buffer. QoS0
  %% messages carry no sequence number; for QoS1/QoS2 the `next'
  %% counter of the track is advanced and used as the sequence number.
  enqueue_transient(
      _ClientInfo, #message{qos = Qos} = Msg, #{inflight := Inflight0, s := S0} = Session
  ) ->
      %% TODO: Such messages won't be retransmitted, should the session
      %% reconnect before transient messages are acked.
      %%
      %% Proper solution could look like this: session publishes
      %% transient messages to a separate DS DB that serves as a queue,
      %% then subscribes to a special system topic that contains the
      %% queued messages. Since streams in this DB are exclusive to the
      %% session, messages from the queue can be dropped as soon as they
      %% are acked.
      {S, Inflight} =
          case Qos of
              ?QOS_0 ->
                  {S0, emqx_persistent_session_ds_inflight:push({undefined, Msg}, Inflight0)};
              QoS when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
                  Current = emqx_persistent_session_ds_state:get_seqno(?next(QoS), S0),
                  SeqNo = inc_seqno(QoS, Current),
                  S1 = emqx_persistent_session_ds_state:put_seqno(?next(QoS), SeqNo, S0),
                  {S1, emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0)}
          end,
      Session#{
          inflight => Inflight,
          s => S
      }.
  1118. %%--------------------------------------------------------------------
  1119. %% Buffer drain
  1120. %%--------------------------------------------------------------------
  %% Pop everything `do_drain_buffer/3' can take from the inflight
  %% buffer and return the resulting publishes with the updated session.
  drain_buffer(#{inflight := Buffer0, s := State0} = Session) ->
      {Publishes, Buffer, State} = do_drain_buffer(Buffer0, State0, []),
      {Publishes, Session#{inflight => Buffer, s := State}}.
  %% Pop items until `pop/1' returns `undefined', translating sequence
  %% numbers into packet IDs. For QoS1/QoS2 messages the `dup' counter
  %% of the track is set to the message's sequence number. Publishes
  %% are returned in pop order.
  do_drain_buffer(Buffer0, State0, Acc) ->
      case emqx_persistent_session_ds_inflight:pop(Buffer0) of
          undefined ->
              {lists:reverse(Acc), Buffer0, State0};
          {{pubrel, SeqNo}, Buffer} ->
              PubRel = {pubrel, seqno_to_packet_id(?QOS_2, SeqNo)},
              do_drain_buffer(Buffer, State0, [PubRel | Acc]);
          {{SeqNo, Msg}, Buffer} ->
              {State, Publish} =
                  case Msg#message.qos of
                      ?QOS_0 ->
                          {State0, {undefined, Msg}};
                      Qos ->
                          State1 = emqx_persistent_session_ds_state:put_seqno(
                              ?dup(Qos), SeqNo, State0
                          ),
                          {State1, {seqno_to_packet_id(Qos, SeqNo), Msg}}
                  end,
              do_drain_buffer(Buffer, State, [Publish | Acc])
      end.
  1141. %%--------------------------------------------------------------------------------
  %% TODO: find a more reliable way to perform actions that have side
  %% effects. Add `CBM:init' callback to the session behavior?
  %%
  %% Ensures the pull, get-streams and bump-last-alive timers, each
  %% with a 100 ms timeout, in that order.
  -spec ensure_timers(session()) -> session().
  ensure_timers(Session0) ->
      Timers = [?TIMER_PULL, ?TIMER_GET_STREAMS, ?TIMER_BUMP_LAST_ALIVE_AT],
      lists:foldl(
          fun(Timer, Session) ->
              emqx_session:ensure_timer(Timer, 100, Session)
          end,
          Session0,
          Timers
      ).
  %% Reset the pull timer with a zero timeout.
  -spec pull_now(session()) -> session().
  pull_now(Session) ->
      emqx_session:reset_timer(?TIMER_PULL, 0, Session).
  %% Receive maximum of the connection, 65535 when it is not set.
  -spec receive_maximum(conninfo()) -> pos_integer().
  receive_maximum(ConnInfo) ->
      %% Note: the default value should be always set by the channel
      %% with respect to the zone configuration, but the type spec
      %% indicates that it's optional.
      Default = 65_535,
      maps:get(receive_maximum, ConnInfo, Default).
  %% Session expiry interval of the connection, 0 when it is not set.
  -spec expiry_interval(conninfo()) -> millisecond().
  expiry_interval(ConnInfo) ->
      Default = 0,
      maps:get(expiry_interval, ConnInfo, Default).
  %% Value of the `durable_sessions.heartbeat_interval' setting.
  %%
  %% Note: we don't allow overriding `heartbeat_interval' per
  %% zone, since the GC process is responsible for all sessions
  %% regardless of the zone.
  bump_interval() ->
      emqx_config:get([durable_sessions, heartbeat_interval]).
  %% Read a `durable_sessions' setting from the configuration of the
  %% client's zone. `Key' is the path below `durable_sessions'.
  get_config(#{zone := Zone}, Key) ->
      Path = [durable_sessions | Key],
      emqx_config:get_zone_conf(Zone, Path).
  %% Best-effort lookup of the live session of a locally connected
  %% client. Returns `not_persistent' when the channel's session is
  %% implemented by another module, and `not_found' when there is not
  %% exactly one local channel or when inspecting the connection raises.
  -spec try_get_live_session(emqx_types:clientid()) ->
      {pid(), session()} | not_found | not_persistent.
  try_get_live_session(ClientId) ->
      case emqx_cm:lookup_channels(local, ClientId) of
          [ChanPid] ->
              try
                  #{channel := Channel} = emqx_connection:get_state(ChanPid),
                  case emqx_channel:info(impl, Channel) of
                      ?MODULE ->
                          {ChanPid, emqx_channel:info(session_state, Channel)};
                      _OtherImpl ->
                          not_persistent
                  end
              catch
                  _:_ ->
                      not_found
              end;
          _ ->
              not_found
      end.
  %% Store the offline info (channel info, stats, disconnect time and
  %% the local node) in the session state when the channel manager has
  %% exactly one record for the client; otherwise return the state as is.
  -spec maybe_set_offline_info(emqx_persistent_session_ds_state:t(), emqx_types:clientid()) ->
      emqx_persistent_session_ds_state:t().
  maybe_set_offline_info(S, Id) ->
      case emqx_cm:lookup_client({clientid, Id}) of
          [{_Key, ChannelInfo, Stats}] ->
              OfflineInfo = #{
                  chan_info => ChannelInfo,
                  stats => Stats,
                  disconnected_at => now_ms(),
                  last_connected_to => node()
              },
              emqx_persistent_session_ds_state:set_offline_info(OfflineInfo, S);
          _ ->
              S
      end.
  1205. %%--------------------------------------------------------------------
  1206. %% SeqNo tracking
  1207. %% --------------------------------------------------------------------
  %% Process a PUBACK/PUBREC/PUBCOMP: translate the packet ID into a
  %% sequence number, acknowledge it in the inflight buffer and, on
  %% success, store it in the counter that belongs to the track.
  %% A rejection by the inflight buffer is logged and reported as
  %% `?RC_PACKET_IDENTIFIER_NOT_FOUND'.
  -spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
      {ok, emqx_types:message(), session()} | {error, _}.
  update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) ->
      SeqNo = packet_id_to_seqno(PacketId, S),
      {SeqNoKey, Result} =
          case Track of
              puback ->
                  {?committed(?QOS_1), emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0)};
              pubrec ->
                  {?rec, emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0)};
              pubcomp ->
                  {?committed(?QOS_2), emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0)}
          end,
      case Result of
          {error, Expected} ->
              ?SLOG(warning, #{
                  msg => "out-of-order_commit",
                  track => Track,
                  packet_id => PacketId,
                  seqno => SeqNo,
                  expected => Expected
              }),
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          {ok, Inflight} ->
              %% TODO: we pass a bogus message into the hook:
              Msg = emqx_message:make(SessionId, <<>>, <<>>),
              Updated = Session#{
                  s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S),
                  inflight => Inflight
              },
              {ok, Msg, Updated}
      end.
  1241. %%--------------------------------------------------------------------
  1242. %% Functions for dealing with the sequence number and packet ID
  1243. %% generation
  1244. %% --------------------------------------------------------------------
  %% Number of low bits of a sequence number that end up in the packet ID:
  -define(EPOCH_BITS, 15).
  %% Mask selecting those 15 low bits:
  -define(PACKET_ID_MASK, 16#7FFF).
  %% Epoch size = `16#10000 div 2' since we generate different sets of
  %% packet IDs for QoS1 and QoS2:
  -define(EPOCH_SIZE, 16#8000).
  %% Reconstruct session counter by adding most significant bits from
  %% the current counter to the packet id. If the result lands past
  %% the current `next' counter, it belongs to the previous epoch.
  -spec packet_id_to_seqno(emqx_types:packet_id(), emqx_persistent_session_ds_state:t()) ->
      seqno().
  packet_id_to_seqno(PacketId, S) ->
      Track = ?next(packet_id_to_qos(PacketId)),
      NextSeqNo = emqx_persistent_session_ds_state:get_seqno(Track, S),
      EpochStart = (NextSeqNo bsr ?EPOCH_BITS) bsl ?EPOCH_BITS,
      case EpochStart + (PacketId band ?PACKET_ID_MASK) of
          SeqNo when SeqNo =< NextSeqNo ->
              SeqNo;
          SeqNo ->
              SeqNo - ?EPOCH_SIZE
      end.
  %% Return the sequence number that follows `SeqNo' on the given QoS
  %% track: `SeqNo + 1', or `SeqNo + 2' when `SeqNo + 1' would
  %% translate to packet ID 0.
  %%
  %% Note: the result is a sequence number, not a packet ID (use
  %% `seqno_to_packet_id/2' to convert), hence the `seqno()' return
  %% type.
  -spec inc_seqno(?QOS_1 | ?QOS_2, seqno()) -> seqno().
  inc_seqno(Qos, SeqNo) ->
      NextSeqno = SeqNo + 1,
      case seqno_to_packet_id(Qos, NextSeqno) of
          0 ->
              %% We skip sequence numbers that lead to PacketId = 0 to
              %% simplify math. Note: it leads to occasional gaps in the
              %% sequence numbers.
              NextSeqno + 1;
          _ ->
              NextSeqno
      end.
  %% Translate a sequence number into a packet ID: the low
  %% `?EPOCH_BITS' bits of the sequence number are kept as is.
  %% Note: we use the most significant bit to store the QoS (it is
  %% set for QoS2 and clear for QoS1).
  seqno_to_packet_id(?QOS_1, SeqNo) ->
      SeqNo band ?PACKET_ID_MASK;
  seqno_to_packet_id(?QOS_2, SeqNo) ->
      (SeqNo band ?PACKET_ID_MASK) bor ?EPOCH_SIZE.
  %% Recover the QoS from a packet ID: the bits above `?EPOCH_BITS',
  %% plus one.
  packet_id_to_qos(PacketId) ->
      (PacketId bsr ?EPOCH_BITS) + 1.
  %% Difference between the sequence numbers stored under tracks `A'
  %% and `B' of the session state; see `seqno_diff/3'.
  seqno_diff(Qos, A, B, S) ->
      SeqNoA = emqx_persistent_session_ds_state:get_seqno(A, S),
      SeqNoB = emqx_persistent_session_ds_state:get_seqno(B, S),
      seqno_diff(Qos, SeqNoA, SeqNoB).
  %% Dialyzer complains about the second clause, since it's currently
  %% unused, shut it up:
  -dialyzer({nowarn_function, seqno_diff/3}).
  seqno_diff(?QOS_1, A, B) ->
      %% For QoS1 messages we skip a seqno every time the epoch changes,
      %% we need to subtract that from the diff:
      SkippedA = A bsr ?EPOCH_BITS,
      SkippedB = B bsr ?EPOCH_BITS,
      (A - SkippedA) - (B - SkippedB);
  seqno_diff(?QOS_2, A, B) ->
      %% Plain difference for QoS2:
      A - B.
  1300. %%--------------------------------------------------------------------
  1301. %% Will message handling
  1302. %%--------------------------------------------------------------------
  %% Clear the will message stored in the session state.
  -spec clear_will_message(session()) -> session().
  clear_will_message(#{s := State} = Session) ->
      Session#{s := emqx_persistent_session_ds_state:clear_will_message(State)}.
  %% Publish the will message through the broker and clear it from
  %% the session state. The result of the publish is ignored.
  -spec publish_will_message_now(session(), message()) -> session().
  publish_will_message_now(Session = #{}, #message{} = WillMsg) ->
      _PublishResult = emqx_broker:publish(WillMsg),
      clear_will_message(Session).
  %% When the session has a will message with a positive delay
  %% interval, ask the GC worker to check the session once that delay
  %% (in seconds) has passed. Always returns `ok'.
  maybe_set_will_message_timer(#{id := SessionId, s := State}) ->
      case emqx_persistent_session_ds_state:get_will_message(State) of
          #message{} = WillMsg ->
              Delay = emqx_channel:will_delay_interval(WillMsg),
              case Delay > 0 of
                  true ->
                      _ = emqx_persistent_session_ds_gc_worker:check_session_after(
                          SessionId,
                          timer:seconds(Delay)
                      ),
                      ok;
                  false ->
                      ok
              end;
          _ ->
              ok
      end.
  1324. %%--------------------------------------------------------------------
  1325. %% Tests
  1326. %%--------------------------------------------------------------------
  1327. -ifdef(TEST).
  %% Warning: the below functions may return out-of-date results because
  %% the sessions commit data to mria asynchronously.

  %% Map of every known session ID to its printed session.
  list_all_sessions() ->
      Ids = emqx_persistent_session_ds_state:list_sessions(),
      maps:from_list(lists:map(fun(Id) -> {Id, print_session(Id)} end, Ids)).
  1337. %%%% Proper generators:
  %% Generate a sequence number that is smaller than the given
  %% `NextSeqNo' by at most `?EPOCH_SIZE - 1' (and never negative):
  seqno_gen(NextSeqNo) ->
      Lowest = max(0, NextSeqNo - (?EPOCH_SIZE - 1)),
      Highest = max(0, NextSeqNo - 1),
      range(Lowest, Highest).
  %% Generate a sequence number from a random epoch and a random
  %% offset in `0..?EPOCH_SIZE':
  next_seqno_gen() ->
      ?LET(
          {Epoch, Offset},
          {non_neg_integer(), range(0, ?EPOCH_SIZE)},
          (Epoch bsl ?EPOCH_BITS) + Offset
      ).
  1352. %%%% Property-based tests:
  %% Property: converting a sequence number to a packet ID and back
  %% (relative to `NextSeqNo') yields the original sequence number, and
  %% the packet ID fits into 16 bits.
  %% erlfmt-ignore
  packet_id_to_seqno_prop() ->
      ?FORALL(
          {Qos, NextSeqNo}, {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()},
          ?FORALL(
              ExpectedSeqNo, seqno_gen(NextSeqNo),
              begin
                  PacketId = seqno_to_packet_id(Qos, ExpectedSeqNo),
                  SeqNo = packet_id_to_seqno(PacketId, NextSeqNo),
                  RoundTrips = PacketId < 16#10000 andalso SeqNo =:= ExpectedSeqNo,
                  ?WHENFAIL(
                      begin
                          io:format(user, " *** PacketID = ~p~n", [PacketId]),
                          io:format(user, " *** SeqNo = ~p -> ~p~n", [ExpectedSeqNo, SeqNo]),
                          io:format(user, " *** NextSeqNo = ~p~n", [NextSeqNo])
                      end,
                      RoundTrips
                  )
              end)).
  %% Property: the packet ID of an incremented sequence number is
  %% never 0 and fits into 16 bits.
  inc_seqno_prop() ->
      ?FORALL(
          {Qos, SeqNo},
          {oneof([?QOS_1, ?QOS_2]), next_seqno_gen()},
          begin
              NewSeqNo = inc_seqno(Qos, SeqNo),
              PacketId = seqno_to_packet_id(Qos, NewSeqNo),
              IsValid = PacketId > 0 andalso PacketId < 16#10000,
              ?WHENFAIL(
                  begin
                      io:format(user, " *** QoS = ~p~n", [Qos]),
                      io:format(user, " *** SeqNo = ~p -> ~p~n", [SeqNo, NewSeqNo]),
                      io:format(user, " *** PacketId = ~p~n", [PacketId])
                  end,
                  IsValid
              )
          end
      ).
  %% Property: after incrementing a sequence number `N' times,
  %% `seqno_diff/3' between the result and the start equals `N'.
  %% Starting points that map to packet ID 0 are excluded.
  seqno_diff_prop() ->
      ?FORALL(
          {Qos, SeqNo, N},
          {oneof([?QOS_1, ?QOS_2]), next_seqno_gen(), range(0, 100)},
          ?IMPLIES(
              seqno_to_packet_id(Qos, SeqNo) > 0,
              begin
                  Step = fun(A) -> inc_seqno(Qos, A) end,
                  NewSeqNo = apply_n_times(N, Step, SeqNo),
                  Diff = seqno_diff(Qos, NewSeqNo, SeqNo),
                  ?WHENFAIL(
                      begin
                          io:format(user, " *** QoS = ~p~n", [Qos]),
                          io:format(user, " *** SeqNo = ~p -> ~p~n", [SeqNo, NewSeqNo]),
                          io:format(user, " *** N : ~p == ~p~n", [N, Diff])
                      end,
                      N =:= Diff
                  )
              end
          )
      ).
  %% EUnit generator running the sequence number properties (1000
  %% cases each, 30 s timeout). `get_seqno/2' of the session state
  %% module is mocked to return its second argument, so the properties
  %% can pass a bare sequence number where a session state is expected.
  seqno_proper_test_() ->
      Props = [packet_id_to_seqno_prop(), inc_seqno_prop(), seqno_diff_prop()],
      Opts = [{numtests, 1000}, {to_file, user}],
      Setup = fun() ->
          meck:new(emqx_persistent_session_ds_state, [no_history]),
          ok = meck:expect(emqx_persistent_session_ds_state, get_seqno, fun(_Track, Seqno) ->
              Seqno
          end)
      end,
      Cleanup = fun(_) ->
          meck:unload(emqx_persistent_session_ds_state)
      end,
      Tests = [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props],
      {timeout, 30, {setup, Setup, Cleanup, Tests}}.
  %% Apply `Fun' to the value `Count' times in a row, feeding each
  %% result into the next call.
  apply_n_times(0, _Fun, Value) ->
      Value;
  apply_n_times(Count, Fun, Value) when Count > 0 ->
      Next = Fun(Value),
      apply_n_times(Count - 1, Fun, Next).
  1427. -endif.