emqx_session.erl 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc
  18. %% A stateful interaction between a Client and a Server. Some Sessions
  19. %% last only as long as the Network Connection, others can span multiple
  20. %% consecutive Network Connections between a Client and a Server.
  21. %%
  22. %% The Session State in the Server consists of:
  23. %%
  24. %% The existence of a Session, even if the rest of the Session State is empty.
  25. %%
  26. %% The Clients subscriptions, including any Subscription Identifiers.
  27. %%
  28. %% QoS 1 and QoS 2 messages which have been sent to the Client, but have not
  29. %% been completely acknowledged.
  30. %%
  31. %% QoS 1 and QoS 2 messages pending transmission to the Client and OPTIONALLY
  32. %% QoS 0 messages pending transmission to the Client.
  33. %%
  34. %% QoS 2 messages which have been received from the Client, but have not been
  35. %% completely acknowledged. The Will Message and the Will Delay Interval.
  36. %%
  37. %% If the Session is currently not connected, the time at which the Session
  38. %% will end and Session State will be discarded.
  39. %% @end
  40. %%--------------------------------------------------------------------
  41. %% MQTT Session
  42. -module(emqx_session).
  43. -include("emqx.hrl").
  44. -include("emqx_mqtt.hrl").
  45. -include("logger.hrl").
  46. -include("types.hrl").
  47. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  48. -ifdef(TEST).
  49. -compile(export_all).
  50. -compile(nowarn_export_all).
  51. -endif.
  52. -export([init/1]).
  53. -export([
  54. info/1,
  55. info/2,
  56. is_session/1,
  57. stats/1,
  58. obtain_next_pkt_id/1,
  59. get_mqueue/1
  60. ]).
  61. -export([
  62. subscribe/4,
  63. unsubscribe/4
  64. ]).
  65. -export([
  66. publish/4,
  67. puback/3,
  68. pubrec/3,
  69. pubrel/3,
  70. pubcomp/3
  71. ]).
  72. -export([
  73. deliver/3,
  74. enqueue/3,
  75. dequeue/2,
  76. filter_queue/2,
  77. ignore_local/4,
  78. retry/2,
  79. terminate/3
  80. ]).
  81. -export([
  82. takeover/1,
  83. resume/2,
  84. replay/2
  85. ]).
  86. -export([expire/3]).
  87. %% Export for CT
  88. -export([set_field/3]).
  89. -type sessionID() :: emqx_guid:guid().
  90. -export_type([
  91. session/0,
  92. sessionID/0
  93. ]).
%% Per-client MQTT session state; built by init/1 and threaded through every
%% function of this module.
  94. -record(session, {
  95. %% Client's id; the `id' field right after it is the session's own id
  96. clientid :: emqx_types:clientid(),
  97. id :: sessionID(),
  98. %% Is this session a persistent session i.e. was it started with Session-Expiry > 0
  99. is_persistent :: boolean(),
  100. %% Client's subscriptions (topic filter => subscription options).
  101. subscriptions :: map(),
  102. %% Max subscriptions allowed
  103. max_subscriptions :: non_neg_integer() | infinity,
  104. %% Upgrade QoS?
  105. upgrade_qos :: boolean(),
  106. %% Client <- Broker: QoS1/2 messages sent to the client that
  107. %% have not yet been completely acknowledged.
  108. inflight :: emqx_inflight:inflight(),
  109. %% QoS1/2 messages published while the client is disconnected,
  110. %% or QoS1/2 messages pending transmission to the Client.
  111. %%
  112. %% Optionally, QoS0 messages pending transmission to the Client.
  113. mqueue :: emqx_mqueue:mqueue(),
  114. %% Next packet id of the session
  115. next_pkt_id = 1 :: emqx_types:packet_id(),
  116. %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
  117. retry_interval :: timeout(),
  118. %% Client -> Broker: QoS2 messages received from the client, but
  119. %% have not been completely acknowledged (packet id => message timestamp)
  120. awaiting_rel :: map(),
  121. %% Maximum number of awaiting QoS2 messages allowed
  122. max_awaiting_rel :: non_neg_integer() | infinity,
  123. %% Awaiting PUBREL Timeout (Unit: millisecond)
  124. await_rel_timeout :: timeout(),
  125. %% Created at (erlang:system_time(millisecond) taken in init/1)
  126. created_at :: pos_integer()
  127. %% NOTE(review): no message-delivery latency field is defined in this record.
  128. }).
%% Delivery phase of an inflight entry: pubrec/3 moves an entry from
%% `wait_ack' to `wait_comp'; puback/3 removes `wait_ack' entries and
%% pubcomp/3 removes `wait_comp' entries.
  129. -type inflight_data_phase() :: wait_ack | wait_comp.
%% Value stored in the inflight window under a packet id. `timestamp' is the
%% value retry_delivery/5 compares against the retry interval; it is reset
%% to the current time whenever the entry is retried.
  130. -record(inflight_data, {
  131. phase :: inflight_data_phase(),
  132. message :: emqx_types:message(),
  133. timestamp :: non_neg_integer()
  134. }).
  135. -type session() :: #session{}.
%% A message to send to the client; deliver_msg/3 uses `undefined' as the
%% packet id for QoS0 messages.
  136. -type publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}.
%% A PUBREL to (re)send for the given packet id.
  137. -type pubrel() :: {pubrel, emqx_types:packet_id()}.
  138. -type replies() :: list(publish() | pubrel()).
%% Keys reported by info/1.
  139. -define(INFO_KEYS, [
  140. id,
  141. is_persistent,
  142. subscriptions,
  143. upgrade_qos,
  144. retry_interval,
  145. await_rel_timeout,
  146. created_at
  147. ]).
%% Keys reported by stats/1; each one is resolved through info/2.
  148. -define(STATS_KEYS, [
  149. subscriptions_cnt,
  150. subscriptions_max,
  151. inflight_cnt,
  152. inflight_max,
  153. mqueue_len,
  154. mqueue_max,
  155. mqueue_dropped,
  156. next_pkt_id,
  157. awaiting_rel_cnt,
  158. awaiting_rel_max
  159. ]).
%% NOTE(review): presumably the default dequeue batch size; its use is not
%% visible in this part of the file.
  160. -define(DEFAULT_BATCH_N, 1000).
%% Options accepted by init/1. Although every key is declared optional
%% (`=>'), init/1 reads all of them with maps:get/2 except `mqueue' and
%% `clientid', so the other keys must be present or init/1 fails with badkey.
  161. -type options() :: #{
  162. max_subscriptions => non_neg_integer(),
  163. upgrade_qos => boolean(),
  164. retry_interval => timeout(),
  165. max_awaiting_rel => non_neg_integer() | infinity,
  166. await_rel_timeout => timeout(),
  167. max_inflight => integer(),
  168. mqueue => emqx_mqueue:options(),
  169. is_persistent => boolean(),
  170. clientid => emqx_types:clientid()
  171. }.
  172. %%--------------------------------------------------------------------
  173. %% Init a Session
  174. %%--------------------------------------------------------------------
  175. -spec init(options()) -> session().
  176. init(Opts) ->
  177. MaxInflight = maps:get(max_inflight, Opts),
  178. QueueOpts = maps:merge(
  179. #{
  180. max_len => 1000,
  181. store_qos0 => true
  182. },
  183. maps:get(mqueue, Opts, #{})
  184. ),
  185. #session{
  186. id = emqx_guid:gen(),
  187. clientid = maps:get(clientid, Opts, <<>>),
  188. is_persistent = maps:get(is_persistent, Opts),
  189. max_subscriptions = maps:get(max_subscriptions, Opts),
  190. subscriptions = #{},
  191. upgrade_qos = maps:get(upgrade_qos, Opts),
  192. inflight = emqx_inflight:new(MaxInflight),
  193. mqueue = emqx_mqueue:init(QueueOpts),
  194. next_pkt_id = 1,
  195. retry_interval = maps:get(retry_interval, Opts),
  196. awaiting_rel = #{},
  197. max_awaiting_rel = maps:get(max_awaiting_rel, Opts),
  198. await_rel_timeout = maps:get(await_rel_timeout, Opts),
  199. created_at = erlang:system_time(millisecond)
  200. }.
  201. %%--------------------------------------------------------------------
  202. %% Info, Stats
  203. %%--------------------------------------------------------------------
  204. is_session(#session{}) -> true;
  205. is_session(_) -> false.
  206. %% @doc Get infos of the session.
  207. -spec info(session()) -> emqx_types:infos().
  208. info(Session) ->
  209. maps:from_list(info(?INFO_KEYS, Session)).
  210. info(Keys, Session) when is_list(Keys) ->
  211. [{Key, info(Key, Session)} || Key <- Keys];
  212. info(id, #session{id = Id}) ->
  213. Id;
  214. info(is_persistent, #session{is_persistent = Bool}) ->
  215. Bool;
  216. info(subscriptions, #session{subscriptions = Subs}) ->
  217. Subs;
  218. info(subscriptions_cnt, #session{subscriptions = Subs}) ->
  219. maps:size(Subs);
  220. info(subscriptions_max, #session{max_subscriptions = MaxSubs}) ->
  221. MaxSubs;
  222. info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) ->
  223. UpgradeQoS;
  224. info(inflight, #session{inflight = Inflight}) ->
  225. Inflight;
  226. info(inflight_cnt, #session{inflight = Inflight}) ->
  227. emqx_inflight:size(Inflight);
  228. info(inflight_max, #session{inflight = Inflight}) ->
  229. emqx_inflight:max_size(Inflight);
  230. info(retry_interval, #session{retry_interval = Interval}) ->
  231. Interval;
  232. info(mqueue, #session{mqueue = MQueue}) ->
  233. MQueue;
  234. info(mqueue_len, #session{mqueue = MQueue}) ->
  235. emqx_mqueue:len(MQueue);
  236. info(mqueue_max, #session{mqueue = MQueue}) ->
  237. emqx_mqueue:max_len(MQueue);
  238. info(mqueue_dropped, #session{mqueue = MQueue}) ->
  239. emqx_mqueue:dropped(MQueue);
  240. info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
  241. PacketId;
  242. info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
  243. AwaitingRel;
  244. info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
  245. maps:size(AwaitingRel);
  246. info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
  247. Max;
  248. info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
  249. Timeout;
  250. info(created_at, #session{created_at = CreatedAt}) ->
  251. CreatedAt.
  252. %% @doc Get stats of the session.
  253. -spec stats(session()) -> emqx_types:stats().
  254. stats(Session) -> info(?STATS_KEYS, Session).
  255. %%--------------------------------------------------------------------
  256. %% Ignore local messages
  257. %%--------------------------------------------------------------------
  258. ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
  259. Subs = info(subscriptions, Session),
  260. lists:dropwhile(
  261. fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
  262. case maps:find(Topic, Subs) of
  263. {ok, #{nl := 1}} when Subscriber =:= Publisher ->
  264. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
  265. ok = emqx_metrics:inc('delivery.dropped'),
  266. ok = emqx_metrics:inc('delivery.dropped.no_local'),
  267. true;
  268. _ ->
  269. false
  270. end
  271. end,
  272. Delivers
  273. ).
  274. %%--------------------------------------------------------------------
  275. %% Client -> Broker: SUBSCRIBE
  276. %%--------------------------------------------------------------------
  277. -spec subscribe(
  278. emqx_types:clientinfo(),
  279. emqx_types:topic(),
  280. emqx_types:subopts(),
  281. session()
  282. ) ->
  283. {ok, session()} | {error, emqx_types:reason_code()}.
  284. subscribe(
  285. ClientInfo = #{clientid := ClientId},
  286. TopicFilter,
  287. SubOpts,
  288. Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs}
  289. ) ->
  290. IsNew = not maps:is_key(TopicFilter, Subs),
  291. case IsNew andalso is_subscriptions_full(Session) of
  292. false ->
  293. ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
  294. ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS),
  295. ok = emqx_hooks:run(
  296. 'session.subscribed',
  297. [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]
  298. ),
  299. {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
  300. true ->
  301. {error, ?RC_QUOTA_EXCEEDED}
  302. end.
  303. is_subscriptions_full(#session{max_subscriptions = infinity}) ->
  304. false;
  305. is_subscriptions_full(#session{
  306. subscriptions = Subs,
  307. max_subscriptions = MaxLimit
  308. }) ->
  309. maps:size(Subs) >= MaxLimit.
  310. %%--------------------------------------------------------------------
  311. %% Client -> Broker: UNSUBSCRIBE
  312. %%--------------------------------------------------------------------
  313. -spec unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) ->
  314. {ok, session()} | {error, emqx_types:reason_code()}.
  315. unsubscribe(
  316. ClientInfo,
  317. TopicFilter,
  318. UnSubOpts,
  319. Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS}
  320. ) ->
  321. case maps:find(TopicFilter, Subs) of
  322. {ok, SubOpts} ->
  323. ok = emqx_broker:unsubscribe(TopicFilter),
  324. ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
  325. ok = emqx_hooks:run(
  326. 'session.unsubscribed',
  327. [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]
  328. ),
  329. {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
  330. error ->
  331. {error, ?RC_NO_SUBSCRIPTION_EXISTED}
  332. end.
  333. %%--------------------------------------------------------------------
  334. %% Client -> Broker: PUBLISH
  335. %%--------------------------------------------------------------------
  336. -spec publish(emqx_types:clientinfo(), emqx_types:packet_id(), emqx_types:message(), session()) ->
  337. {ok, emqx_types:publish_result(), session()}
  338. | {error, emqx_types:reason_code()}.
  339. publish(
  340. _ClientInfo,
  341. PacketId,
  342. Msg = #message{qos = ?QOS_2, timestamp = Ts},
  343. Session = #session{awaiting_rel = AwaitingRel}
  344. ) ->
  345. case is_awaiting_full(Session) of
  346. false ->
  347. case maps:is_key(PacketId, AwaitingRel) of
  348. false ->
  349. Results = emqx_broker:publish(Msg),
  350. AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
  351. {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
  352. true ->
  353. drop_qos2_msg(PacketId, Msg, ?RC_PACKET_IDENTIFIER_IN_USE)
  354. end;
  355. true ->
  356. drop_qos2_msg(PacketId, Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED)
  357. end;
  358. %% Publish QoS0/1 directly
  359. publish(_ClientInfo, _PacketId, Msg, Session) ->
  360. {ok, emqx_broker:publish(Msg), Session}.
  361. drop_qos2_msg(PacketId, Msg, RC) ->
  362. ?SLOG(
  363. warning,
  364. #{
  365. msg => "dropped_qos2_packet",
  366. reason => emqx_reason_codes:name(RC),
  367. packet_id => PacketId
  368. },
  369. #{topic => Msg#message.topic}
  370. ),
  371. ok = emqx_metrics:inc('messages.dropped'),
  372. ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(RC)]),
  373. {error, RC}.
  374. is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
  375. false;
  376. is_awaiting_full(#session{
  377. awaiting_rel = AwaitingRel,
  378. max_awaiting_rel = MaxLimit
  379. }) ->
  380. maps:size(AwaitingRel) >= MaxLimit.
  381. %%--------------------------------------------------------------------
  382. %% Client -> Broker: PUBACK
  383. %%--------------------------------------------------------------------
  384. -spec puback(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  385. {ok, emqx_types:message(), session()}
  386. | {ok, emqx_types:message(), replies(), session()}
  387. | {error, emqx_types:reason_code()}.
  388. puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
  389. case emqx_inflight:lookup(PacketId, Inflight) of
  390. {value, #inflight_data{phase = wait_ack, message = Msg}} ->
  391. on_delivery_completed(Msg, Session),
  392. Inflight1 = emqx_inflight:delete(PacketId, Inflight),
  393. return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
  394. {value, _} ->
  395. {error, ?RC_PACKET_IDENTIFIER_IN_USE};
  396. none ->
  397. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  398. end.
  399. return_with(Msg, {ok, Session}) ->
  400. {ok, Msg, Session};
  401. return_with(Msg, {ok, Publishes, Session}) ->
  402. {ok, Msg, Publishes, Session}.
  403. %%--------------------------------------------------------------------
  404. %% Client -> Broker: PUBREC
  405. %%--------------------------------------------------------------------
  406. -spec pubrec(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  407. {ok, emqx_types:message(), session()}
  408. | {error, emqx_types:reason_code()}.
  409. pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
  410. case emqx_inflight:lookup(PacketId, Inflight) of
  411. {value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
  412. Update = Data#inflight_data{phase = wait_comp},
  413. Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
  414. {ok, Msg, Session#session{inflight = Inflight1}};
  415. {value, _} ->
  416. {error, ?RC_PACKET_IDENTIFIER_IN_USE};
  417. none ->
  418. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  419. end.
  420. %%--------------------------------------------------------------------
  421. %% Client -> Broker: PUBREL
  422. %%--------------------------------------------------------------------
  423. -spec pubrel(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  424. {ok, session()} | {error, emqx_types:reason_code()}.
  425. pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
  426. case maps:take(PacketId, AwaitingRel) of
  427. {_Ts, AwaitingRel1} ->
  428. {ok, Session#session{awaiting_rel = AwaitingRel1}};
  429. error ->
  430. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  431. end.
  432. %%--------------------------------------------------------------------
  433. %% Client -> Broker: PUBCOMP
  434. %%--------------------------------------------------------------------
  435. -spec pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  436. {ok, session()}
  437. | {ok, replies(), session()}
  438. | {error, emqx_types:reason_code()}.
  439. pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
  440. case emqx_inflight:lookup(PacketId, Inflight) of
  441. {value, #inflight_data{phase = wait_comp, message = Msg}} ->
  442. on_delivery_completed(Msg, Session),
  443. Inflight1 = emqx_inflight:delete(PacketId, Inflight),
  444. dequeue(ClientInfo, Session#session{inflight = Inflight1});
  445. {value, _Other} ->
  446. {error, ?RC_PACKET_IDENTIFIER_IN_USE};
  447. none ->
  448. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  449. end.
  450. %%--------------------------------------------------------------------
  451. %% Dequeue Msgs
  452. %%--------------------------------------------------------------------
  453. dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
  454. case emqx_mqueue:is_empty(Q) of
  455. true ->
  456. {ok, Session};
  457. false ->
  458. {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
  459. do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
  460. end.
  461. dequeue(_ClientInfo, 0, Msgs, Q) ->
  462. {lists:reverse(Msgs), Q};
  463. dequeue(ClientInfo, Cnt, Msgs, Q) ->
  464. case emqx_mqueue:out(Q) of
  465. {empty, _Q} ->
  466. dequeue(ClientInfo, 0, Msgs, Q);
  467. {{value, Msg}, Q1} ->
  468. case emqx_message:is_expired(Msg) of
  469. true ->
  470. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
  471. ok = inc_delivery_expired_cnt(),
  472. dequeue(ClientInfo, Cnt, Msgs, Q1);
  473. false ->
  474. dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1)
  475. end
  476. end.
  477. filter_queue(Pred, #session{mqueue = Q} = Session) ->
  478. Session#session{mqueue = emqx_mqueue:filter(Pred, Q)}.
  479. acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
  480. acc_cnt(_Msg, Cnt) -> Cnt - 1.
  481. %%--------------------------------------------------------------------
  482. %% Broker -> Client: Deliver
  483. %%--------------------------------------------------------------------
  484. -spec deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) ->
  485. {ok, session()} | {ok, replies(), session()}.
  486. %% Optimize
  487. deliver(ClientInfo, [Deliver], Session) ->
  488. Msg = enrich_deliver(Deliver, Session),
  489. deliver_msg(ClientInfo, Msg, Session);
  490. deliver(ClientInfo, Delivers, Session) ->
  491. Msgs = [enrich_deliver(D, Session) || D <- Delivers],
  492. do_deliver(ClientInfo, Msgs, [], Session).
  493. do_deliver(_ClientInfo, [], Publishes, Session) ->
  494. {ok, lists:reverse(Publishes), Session};
  495. do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
  496. case deliver_msg(ClientInfo, Msg, Session) of
  497. {ok, Session1} ->
  498. do_deliver(ClientInfo, More, Acc, Session1);
  499. {ok, [Publish], Session1} ->
  500. do_deliver(ClientInfo, More, [Publish | Acc], Session1)
  501. end.
  502. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
  503. %
  504. on_delivery_completed(Msg, Session),
  505. {ok, [{undefined, maybe_ack(Msg)}], Session};
  506. deliver_msg(
  507. ClientInfo,
  508. Msg = #message{qos = QoS},
  509. Session =
  510. #session{next_pkt_id = PacketId, inflight = Inflight}
  511. ) when
  512. QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2
  513. ->
  514. case emqx_inflight:is_full(Inflight) of
  515. true ->
  516. Session1 =
  517. case maybe_nack(Msg) of
  518. true -> Session;
  519. false -> enqueue(ClientInfo, Msg, Session)
  520. end,
  521. {ok, Session1};
  522. false ->
  523. %% Note that we publish message without shared ack header
  524. %% But add to inflight with ack headers
  525. %% This ack header is required for redispatch-on-terminate feature to work
  526. Publish = {PacketId, maybe_ack(Msg)},
  527. MarkedMsg = mark_begin_deliver(Msg),
  528. Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight),
  529. {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
  530. end.
  531. -spec enqueue(
  532. emqx_types:clientinfo(),
  533. list(emqx_types:deliver()) | emqx_types:message(),
  534. session()
  535. ) -> session().
  536. enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
  537. lists:foldl(
  538. fun(Deliver, Session0) ->
  539. Msg = enrich_deliver(Deliver, Session),
  540. enqueue(ClientInfo, Msg, Session0)
  541. end,
  542. Session,
  543. Delivers
  544. );
  545. enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
  546. {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
  547. (Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session),
  548. Session#session{mqueue = NewQ}.
  549. handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
  550. Payload = emqx_message:to_log_map(Msg),
  551. #{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q),
  552. case (QoS == ?QOS_0) andalso (not StoreQos0) of
  553. true ->
  554. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
  555. ok = emqx_metrics:inc('delivery.dropped'),
  556. ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
  557. ok = inc_pd('send_msg.dropped'),
  558. ?SLOG(
  559. warning,
  560. #{
  561. msg => "dropped_qos0_msg",
  562. queue => QueueInfo,
  563. payload => Payload
  564. },
  565. #{topic => Topic}
  566. );
  567. false ->
  568. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
  569. ok = emqx_metrics:inc('delivery.dropped'),
  570. ok = emqx_metrics:inc('delivery.dropped.queue_full'),
  571. ok = inc_pd('send_msg.dropped'),
  572. ok = inc_pd('send_msg.dropped.queue_full'),
  573. ?SLOG(
  574. warning,
  575. #{
  576. msg => "dropped_msg_due_to_mqueue_is_full",
  577. queue => QueueInfo,
  578. payload => Payload
  579. },
  580. #{topic => Topic}
  581. )
  582. end.
  583. enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
  584. enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
  585. maybe_ack(Msg) ->
  586. emqx_shared_sub:maybe_ack(Msg).
  587. maybe_nack(Msg) ->
  588. emqx_shared_sub:maybe_nack_dropped(Msg).
  589. get_subopts(Topic, SubMap) ->
  590. case maps:find(Topic, SubMap) of
  591. {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
  592. [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}];
  593. {ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
  594. [{nl, Nl}, {qos, QoS}, {rap, Rap}];
  595. error ->
  596. []
  597. end.
  598. enrich_subopts([], Msg, _Session) ->
  599. Msg;
  600. enrich_subopts([{nl, 1} | Opts], Msg, Session) ->
  601. enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session);
  602. enrich_subopts([{nl, 0} | Opts], Msg, Session) ->
  603. enrich_subopts(Opts, Msg, Session);
  604. enrich_subopts(
  605. [{qos, SubQoS} | Opts],
  606. Msg = #message{qos = PubQoS},
  607. Session = #session{upgrade_qos = true}
  608. ) ->
  609. enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
  610. enrich_subopts(
  611. [{qos, SubQoS} | Opts],
  612. Msg = #message{qos = PubQoS},
  613. Session = #session{upgrade_qos = false}
  614. ) ->
  615. enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
  616. enrich_subopts([{rap, 1} | Opts], Msg, Session) ->
  617. enrich_subopts(Opts, Msg, Session);
  618. enrich_subopts([{rap, 0} | Opts], Msg = #message{headers = #{retained := true}}, Session) ->
  619. enrich_subopts(Opts, Msg, Session);
  620. enrich_subopts([{rap, 0} | Opts], Msg, Session) ->
  621. enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
  622. enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
  623. Props = emqx_message:get_header(properties, Msg, #{}),
  624. Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
  625. enrich_subopts(Opts, Msg1, Session).
  626. %%--------------------------------------------------------------------
  627. %% Retry Delivery
  628. %%--------------------------------------------------------------------
  629. -spec retry(emqx_types:clientinfo(), session()) ->
  630. {ok, session()} | {ok, replies(), timeout(), session()}.
  631. retry(ClientInfo, Session = #session{inflight = Inflight}) ->
  632. case emqx_inflight:is_empty(Inflight) of
  633. true ->
  634. {ok, Session};
  635. false ->
  636. Now = erlang:system_time(millisecond),
  637. retry_delivery(
  638. emqx_inflight:to_list(fun sort_fun/2, Inflight),
  639. [],
  640. Now,
  641. Session,
  642. ClientInfo
  643. )
  644. end.
  645. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
  646. {ok, lists:reverse(Acc), Interval, Session};
  647. retry_delivery(
  648. [{PacketId, #inflight_data{timestamp = Ts} = Data} | More],
  649. Acc,
  650. Now,
  651. Session = #session{retry_interval = Interval, inflight = Inflight},
  652. ClientInfo
  653. ) ->
  654. case (Age = age(Now, Ts)) >= Interval of
  655. true ->
  656. {Acc1, Inflight1} = do_retry_delivery(PacketId, Data, Now, Acc, Inflight, ClientInfo),
  657. retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
  658. false ->
  659. {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
  660. end.
  661. do_retry_delivery(
  662. PacketId,
  663. #inflight_data{phase = wait_ack, message = Msg} = Data,
  664. Now,
  665. Acc,
  666. Inflight,
  667. ClientInfo
  668. ) ->
  669. case emqx_message:is_expired(Msg) of
  670. true ->
  671. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
  672. ok = inc_delivery_expired_cnt(),
  673. {Acc, emqx_inflight:delete(PacketId, Inflight)};
  674. false ->
  675. Msg1 = emqx_message:set_flag(dup, true, Msg),
  676. Update = Data#inflight_data{message = Msg1, timestamp = Now},
  677. Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
  678. {[{PacketId, Msg1} | Acc], Inflight1}
  679. end;
  680. do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) ->
  681. Update = Data#inflight_data{timestamp = Now},
  682. Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
  683. {[{pubrel, PacketId} | Acc], Inflight1}.
  684. %%--------------------------------------------------------------------
  685. %% Expire Awaiting Rel
  686. %%--------------------------------------------------------------------
  687. -spec expire(emqx_types:clientinfo(), awaiting_rel, session()) ->
  688. {ok, session()} | {ok, timeout(), session()}.
  689. expire(_ClientInfo, awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
  690. case maps:size(AwaitingRel) of
  691. 0 -> {ok, Session};
  692. _ -> expire_awaiting_rel(erlang:system_time(millisecond), Session)
  693. end.
  694. expire_awaiting_rel(
  695. Now,
  696. Session = #session{
  697. awaiting_rel = AwaitingRel,
  698. await_rel_timeout = Timeout
  699. }
  700. ) ->
  701. NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
  702. AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
  703. ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
  704. (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt),
  705. NSession = Session#session{awaiting_rel = AwaitingRel1},
  706. case maps:size(AwaitingRel1) of
  707. 0 -> {ok, NSession};
  708. _ -> {ok, Timeout, NSession}
  709. end.
  710. %%--------------------------------------------------------------------
  711. %% Takeover, Resume and Replay
  712. %%--------------------------------------------------------------------
  713. -spec takeover(session()) -> ok.
  714. takeover(#session{subscriptions = Subs}) ->
  715. lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
  %% Re-subscribe every stored subscription under the client's id, bump the
  %% 'session.resumed' metric and run the 'session.resumed' hook.
  %% Crashes with badmatch if any subscribe call returns something other
  %% than `ok'.
  -spec resume(emqx_types:clientinfo(), session()) -> ok.
  resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) ->
      Resubscribe = fun({TopicFilter, SubOpts}) ->
          ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
      end,
      ok = lists:foreach(Resubscribe, maps:to_list(Subs)),
      ok = emqx_metrics:inc('session.resumed'),
      HookArgs = [ClientInfo, info(Session)],
      emqx_hooks:run('session.resumed', HookArgs).
  %% Build the replies for everything currently inflight, then append
  %% whatever `dequeue/2' yields from the session.
  %% Inflight entries in phase `wait_comp' become `{pubrel, PacketId}';
  %% all other entries are re-sent with the `dup' flag set.
  -spec replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}.
  replay(ClientInfo, Session = #session{inflight = Inflight}) ->
      ToReply = fun
          ({PacketId, #inflight_data{phase = wait_comp}}) ->
              {pubrel, PacketId};
          ({PacketId, #inflight_data{message = Msg}}) ->
              {PacketId, emqx_message:set_flag(dup, true, Msg)}
      end,
      Resent = [ToReply(Entry) || Entry <- emqx_inflight:to_list(Inflight)],
      case dequeue(ClientInfo, Session) of
          {ok, Session1} ->
              {ok, Resent, Session1};
          {ok, Dequeued, Session1} ->
              {ok, Resent ++ Dequeued, Session1}
      end.
  %% Session shutdown: run the hook matching `Reason', then redispatch
  %% shared-subscription messages where the reason allows it.
  %% The results of both steps are ignored; the function always returns `ok'.
  -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
  terminate(ClientInfo, Reason, Session) ->
      _ = run_terminate_hooks(ClientInfo, Reason, Session),
      _ = maybe_redispatch_shared_messages(Reason, Session),
      ok.
  %% Select the hook point from the termination reason:
  %%   discarded -> 'session.discarded'
  %%   takenover -> 'session.takenover'
  %%   otherwise -> 'session.terminated' (the reason is passed to the hook)
  run_terminate_hooks(ClientInfo, Reason, Session) ->
      SessInfo = info(Session),
      case Reason of
          discarded ->
              run_hook('session.discarded', [ClientInfo, SessInfo]);
          takenover ->
              run_hook('session.takenover', [ClientInfo, SessInfo]);
          _ ->
              run_hook('session.terminated', [ClientInfo, Reason, SessInfo])
      end.
  %% Skip redispatching when the session ended with reason `takenover' or
  %% `kicked'; every other reason triggers a redispatch.
  maybe_redispatch_shared_messages(Reason, _Session) when
      Reason =:= takenover; Reason =:= kicked
  ->
      ok;
  maybe_redispatch_shared_messages(_Reason, Session) ->
      redispatch_shared_messages(Session).
  %% Hand the session's undelivered messages to `emqx_shared_sub:redispatch/1':
  %% first the QoS 1 inflight messages, ordered by `sort_fun/2', then the
  %% whole message queue.
  %%
  %% Inflight entries that are not QoS 1 are left out. For QoS 2, here is
  %% what the spec says:
  %% If the Client's Session terminates before the Client reconnects,
  %% the Server MUST NOT send the Application Message to any other
  %% subscribed Client [MQTT-4.8.2-5].
  redispatch_shared_messages(#session{inflight = Inflight, mqueue = Queue}) ->
      Sorted = emqx_inflight:to_list(fun sort_fun/2, Inflight),
      PickQos1 = fun
          ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->
              [Msg];
          ({_PacketId, #inflight_data{}}) ->
              []
      end,
      Qos1Inflight = lists:flatmap(PickQos1, Sorted),
      Queued = emqx_mqueue:to_list(Queue),
      emqx_shared_sub:redispatch(Qos1Inflight ++ Queued).
  -compile({inline, [run_hook/2]}).
  %% Increment the metric named after the hook point, then run the hook
  %% with the given arguments and return its result.
  run_hook(HookPoint, HookArgs) ->
      ok = emqx_metrics:inc(HookPoint),
      emqx_hooks:run(HookPoint, HookArgs).
  776. %%--------------------------------------------------------------------
  777. %% Inc message/delivery expired counter
  778. %%--------------------------------------------------------------------
  %% Count a single expired delivery.
  inc_delivery_expired_cnt() ->
      inc_delivery_expired_cnt(1).

  %% Add `Count' to the process-dictionary counters and then to the global
  %% metrics that track deliveries dropped because they expired.
  inc_delivery_expired_cnt(Count) ->
      lists:foreach(
          fun(Key) -> ok = inc_pd(Key, Count) end,
          ['send_msg.dropped', 'send_msg.dropped.expired']
      ),
      ok = emqx_metrics:inc('delivery.dropped', Count),
      emqx_metrics:inc('delivery.dropped.expired', Count).
  %% Add `Count' to the process-dictionary counters and then to the global
  %% metrics that track messages dropped by the await-pubrel timeout.
  inc_await_pubrel_timeout(Count) ->
      lists:foreach(
          fun(Key) -> ok = inc_pd(Key, Count) end,
          ['recv_msg.dropped', 'recv_msg.dropped.await_pubrel_timeout']
      ),
      ok = emqx_metrics:inc('messages.dropped', Count),
      emqx_metrics:inc('messages.dropped.await_pubrel_timeout', Count).
  %% Increment the `emqx_pd' counter `Key' by one; always returns `ok'.
  inc_pd(Key) ->
      _ = emqx_pd:inc_counter(Key, 1),
      ok.

  %% Increment the `emqx_pd' counter `Key' by `Step'. The value returned by
  %% `emqx_pd:inc_counter/2' is discarded; always returns `ok'.
  inc_pd(Key, Step) ->
      _ = emqx_pd:inc_counter(Key, Step),
      ok.
  796. %%--------------------------------------------------------------------
  797. %% Next Packet Id
  798. %%--------------------------------------------------------------------
  %% Return the packet id currently stored in the session together with a
  %% session whose `next_pkt_id' has been advanced.
  obtain_next_pkt_id(Session) ->
      Current = Session#session.next_pkt_id,
      Advanced = next_pkt_id(Session),
      {Current, Advanced}.
  %% Advance `next_pkt_id' by one, wrapping from ?MAX_PACKET_ID back to 1.
  next_pkt_id(Session = #session{next_pkt_id = Current}) ->
      Next =
          case Current of
              ?MAX_PACKET_ID -> 1;
              _ -> Current + 1
          end,
      Session#session{next_pkt_id = Next}.
  805. %%--------------------------------------------------------------------
  806. %% Message Latency Stats
  807. %%--------------------------------------------------------------------
  %% Run the 'delivery.completed' hook with the message and a map carrying
  %% the session's creation time and client id.
  on_delivery_completed(Msg, #session{created_at = CreatedAt, clientid = ClientId}) ->
      SessionMeta = #{session_birth_time => CreatedAt, clientid => ClientId},
      emqx:run_hook('delivery.completed', [Msg, SessionMeta]).
  %% Stamp the message with a `deliver_begin_at' header holding the current
  %% system time in milliseconds.
  mark_begin_deliver(Msg) ->
      StartedAt = erlang:system_time(millisecond),
      emqx_message:set_header(deliver_begin_at, StartedAt, Msg).
  821. %%--------------------------------------------------------------------
  822. %% Helper functions
  823. %%--------------------------------------------------------------------
  -compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
  %% Ordering predicate for `{Key, #inflight_data{}}' pairs: true when the
  %% first entry's timestamp is not later than the second's.
  sort_fun({_, Left}, {_, Right}) ->
      LeftTs = Left#inflight_data.timestamp,
      RightTs = Right#inflight_data.timestamp,
      LeftTs =< RightTs.
  %% Batch size derived from the inflight window: ?DEFAULT_BATCH_N when the
  %% window's max size is 0, otherwise the max size minus the current size.
  batch_n(Inflight) ->
      MaxSize = emqx_inflight:max_size(Inflight),
      case MaxSize =:= 0 of
          true -> ?DEFAULT_BATCH_N;
          false -> MaxSize - emqx_inflight:size(Inflight)
      end.
  %% Wrap a message in an `#inflight_data{}' record in phase `wait_ack',
  %% timestamped with the current system time in milliseconds.
  with_ts(Msg) ->
      Now = erlang:system_time(millisecond),
      #inflight_data{phase = wait_ack, message = Msg, timestamp = Now}.
  %% Difference between the current time and an earlier timestamp, in
  %% whatever unit both arguments share.
  age(Current, Since) ->
      Current - Since.
  839. %%--------------------------------------------------------------------
  840. %% For CT tests
  841. %%--------------------------------------------------------------------
  %% Overwrite the session record field called `Name' with `Value'.
  %% The field's position is looked up in the record's field list.
  set_field(Name, Value, Session) ->
      FieldNames = record_info(fields, session),
      FieldIndex = emqx_misc:index_of(Name, FieldNames),
      %% The +1 skips the record tag stored in element 1 of the tuple.
      setelement(FieldIndex + 1, Session, Value).
  %% Return the contents of the session's message queue as a list.
  get_mqueue(#session{mqueue = Queue}) ->
      emqx_mqueue:to_list(Queue).