emqx_session.erl 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc
  18. %% A stateful interaction between a Client and a Server. Some Sessions
  19. %% last only as long as the Network Connection, others can span multiple
  20. %% consecutive Network Connections between a Client and a Server.
  21. %%
  22. %% The Session State in the Server consists of:
  23. %%
  24. %% The existence of a Session, even if the rest of the Session State is empty.
  25. %%
  26. %% The Clients subscriptions, including any Subscription Identifiers.
  27. %%
  28. %% QoS 1 and QoS 2 messages which have been sent to the Client, but have not
  29. %% been completely acknowledged.
  30. %%
  31. %% QoS 1 and QoS 2 messages pending transmission to the Client and OPTIONALLY
  32. %% QoS 0 messages pending transmission to the Client.
  33. %%
  34. %% QoS 2 messages which have been received from the Client, but have not been
  35. %% completely acknowledged.The Will Message and the Will Delay Interval
  36. %%
  37. %% If the Session is currently not connected, the time at which the Session
  38. %% will end and Session State will be discarded.
  39. %% @end
  40. %%--------------------------------------------------------------------
  41. %% MQTT Session
  42. -module(emqx_session).
  43. -include("emqx.hrl").
  44. -include("emqx_mqtt.hrl").
  45. -include("logger.hrl").
  46. -include("types.hrl").
  47. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  48. -ifdef(TEST).
  49. -compile(export_all).
  50. -compile(nowarn_export_all).
  51. -endif.
  52. -export([init/1]).
  53. -export([
  54. info/1,
  55. info/2,
  56. is_session/1,
  57. stats/1,
  58. obtain_next_pkt_id/1,
  59. get_mqueue/1
  60. ]).
  61. -export([
  62. subscribe/4,
  63. unsubscribe/4
  64. ]).
  65. -export([
  66. publish/4,
  67. puback/3,
  68. pubrec/3,
  69. pubrel/3,
  70. pubcomp/3
  71. ]).
  72. -export([
  73. deliver/3,
  74. enqueue/3,
  75. dequeue/2,
  76. ignore_local/4,
  77. retry/2,
  78. terminate/3
  79. ]).
  80. -export([
  81. takeover/1,
  82. resume/2,
  83. replay/2
  84. ]).
  85. -export([expire/3]).
  86. %% Export for CT
  87. -export([set_field/3]).
  88. -type sessionID() :: emqx_guid:guid().
  89. -export_type([
  90. session/0,
  91. sessionID/0
  92. ]).
  93. -record(session, {
  94. %% Client's id
  95. clientid :: emqx_types:clientid(),
  96. id :: sessionID(),
  97. %% Is this session a persistent session i.e. was it started with Session-Expiry > 0
  98. is_persistent :: boolean(),
  99. %% Client’s Subscriptions.
  100. subscriptions :: map(),
  101. %% Max subscriptions allowed
  102. max_subscriptions :: non_neg_integer() | infinity,
  103. %% Upgrade QoS?
  104. upgrade_qos :: boolean(),
  105. %% Client <- Broker: QoS1/2 messages sent to the client but
  106. %% have not been unacked.
  107. inflight :: emqx_inflight:inflight(),
  108. %% All QoS1/2 messages published to when client is disconnected,
  109. %% or QoS1/2 messages pending transmission to the Client.
  110. %%
  111. %% Optionally, QoS0 messages pending transmission to the Client.
  112. mqueue :: emqx_mqueue:mqueue(),
  113. %% Next packet id of the session
  114. next_pkt_id = 1 :: emqx_types:packet_id(),
  115. %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
  116. retry_interval :: timeout(),
  117. %% Client -> Broker: QoS2 messages received from the client, but
  118. %% have not been completely acknowledged
  119. awaiting_rel :: map(),
  120. %% Maximum number of awaiting QoS2 messages allowed
  121. max_awaiting_rel :: non_neg_integer() | infinity,
  122. %% Awaiting PUBREL Timeout (Unit: millisecond)
  123. await_rel_timeout :: timeout(),
  124. %% Created at
  125. created_at :: pos_integer()
  126. %% Message deliver latency stats
  127. }).
  128. -type inflight_data_phase() :: wait_ack | wait_comp.
  129. -record(inflight_data, {
  130. phase :: inflight_data_phase(),
  131. message :: emqx_types:message(),
  132. timestamp :: non_neg_integer()
  133. }).
  134. -type session() :: #session{}.
  135. -type publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}.
  136. -type pubrel() :: {pubrel, emqx_types:packet_id()}.
  137. -type replies() :: list(publish() | pubrel()).
  138. -define(INFO_KEYS, [
  139. id,
  140. is_persistent,
  141. subscriptions,
  142. upgrade_qos,
  143. retry_interval,
  144. await_rel_timeout,
  145. created_at
  146. ]).
  147. -define(STATS_KEYS, [
  148. subscriptions_cnt,
  149. subscriptions_max,
  150. inflight_cnt,
  151. inflight_max,
  152. mqueue_len,
  153. mqueue_max,
  154. mqueue_dropped,
  155. next_pkt_id,
  156. awaiting_rel_cnt,
  157. awaiting_rel_max
  158. ]).
  159. -define(DEFAULT_BATCH_N, 1000).
  160. -type options() :: #{
  161. max_subscriptions => non_neg_integer(),
  162. upgrade_qos => boolean(),
  163. retry_interval => timeout(),
  164. max_awaiting_rel => non_neg_integer() | infinity,
  165. await_rel_timeout => timeout(),
  166. max_inflight => integer(),
  167. mqueue => emqx_mqueue:options(),
  168. is_persistent => boolean(),
  169. clientid => emqx_types:clientid()
  170. }.
  171. %%--------------------------------------------------------------------
  172. %% Init a Session
  173. %%--------------------------------------------------------------------
  174. -spec init(options()) -> session().
  175. init(Opts) ->
  176. MaxInflight = maps:get(max_inflight, Opts, 1),
  177. QueueOpts = maps:merge(
  178. #{
  179. max_len => 1000,
  180. store_qos0 => true
  181. },
  182. maps:get(mqueue, Opts, #{})
  183. ),
  184. #session{
  185. id = emqx_guid:gen(),
  186. clientid = maps:get(clientid, Opts, <<>>),
  187. is_persistent = maps:get(is_persistent, Opts, false),
  188. max_subscriptions = maps:get(max_subscriptions, Opts, infinity),
  189. subscriptions = #{},
  190. upgrade_qos = maps:get(upgrade_qos, Opts, false),
  191. inflight = emqx_inflight:new(MaxInflight),
  192. mqueue = emqx_mqueue:init(QueueOpts),
  193. next_pkt_id = 1,
  194. retry_interval = maps:get(retry_interval, Opts, 30000),
  195. awaiting_rel = #{},
  196. max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
  197. await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
  198. created_at = erlang:system_time(millisecond)
  199. }.
  200. %%--------------------------------------------------------------------
  201. %% Info, Stats
  202. %%--------------------------------------------------------------------
  203. is_session(#session{}) -> true;
  204. is_session(_) -> false.
  205. %% @doc Get infos of the session.
  206. -spec info(session()) -> emqx_types:infos().
  207. info(Session) ->
  208. maps:from_list(info(?INFO_KEYS, Session)).
  209. info(Keys, Session) when is_list(Keys) ->
  210. [{Key, info(Key, Session)} || Key <- Keys];
  211. info(id, #session{id = Id}) ->
  212. Id;
  213. info(is_persistent, #session{is_persistent = Bool}) ->
  214. Bool;
  215. info(subscriptions, #session{subscriptions = Subs}) ->
  216. Subs;
  217. info(subscriptions_cnt, #session{subscriptions = Subs}) ->
  218. maps:size(Subs);
  219. info(subscriptions_max, #session{max_subscriptions = MaxSubs}) ->
  220. MaxSubs;
  221. info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) ->
  222. UpgradeQoS;
  223. info(inflight, #session{inflight = Inflight}) ->
  224. Inflight;
  225. info(inflight_cnt, #session{inflight = Inflight}) ->
  226. emqx_inflight:size(Inflight);
  227. info(inflight_max, #session{inflight = Inflight}) ->
  228. emqx_inflight:max_size(Inflight);
  229. info(retry_interval, #session{retry_interval = Interval}) ->
  230. Interval;
  231. info(mqueue, #session{mqueue = MQueue}) ->
  232. MQueue;
  233. info(mqueue_len, #session{mqueue = MQueue}) ->
  234. emqx_mqueue:len(MQueue);
  235. info(mqueue_max, #session{mqueue = MQueue}) ->
  236. emqx_mqueue:max_len(MQueue);
  237. info(mqueue_dropped, #session{mqueue = MQueue}) ->
  238. emqx_mqueue:dropped(MQueue);
  239. info(next_pkt_id, #session{next_pkt_id = PacketId}) ->
  240. PacketId;
  241. info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
  242. AwaitingRel;
  243. info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
  244. maps:size(AwaitingRel);
  245. info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
  246. Max;
  247. info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
  248. Timeout;
  249. info(created_at, #session{created_at = CreatedAt}) ->
  250. CreatedAt.
  251. %% @doc Get stats of the session.
  252. -spec stats(session()) -> emqx_types:stats().
  253. stats(Session) -> info(?STATS_KEYS, Session).
  254. %%--------------------------------------------------------------------
  255. %% Ignore local messages
  256. %%--------------------------------------------------------------------
  257. ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
  258. Subs = info(subscriptions, Session),
  259. lists:dropwhile(
  260. fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
  261. case maps:find(Topic, Subs) of
  262. {ok, #{nl := 1}} when Subscriber =:= Publisher ->
  263. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
  264. ok = emqx_metrics:inc('delivery.dropped'),
  265. ok = emqx_metrics:inc('delivery.dropped.no_local'),
  266. true;
  267. _ ->
  268. false
  269. end
  270. end,
  271. Delivers
  272. ).
  273. %%--------------------------------------------------------------------
  274. %% Client -> Broker: SUBSCRIBE
  275. %%--------------------------------------------------------------------
  276. -spec subscribe(
  277. emqx_types:clientinfo(),
  278. emqx_types:topic(),
  279. emqx_types:subopts(),
  280. session()
  281. ) ->
  282. {ok, session()} | {error, emqx_types:reason_code()}.
  283. subscribe(
  284. ClientInfo = #{clientid := ClientId},
  285. TopicFilter,
  286. SubOpts,
  287. Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs}
  288. ) ->
  289. IsNew = not maps:is_key(TopicFilter, Subs),
  290. case IsNew andalso is_subscriptions_full(Session) of
  291. false ->
  292. ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
  293. ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS),
  294. ok = emqx_hooks:run(
  295. 'session.subscribed',
  296. [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]
  297. ),
  298. {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
  299. true ->
  300. {error, ?RC_QUOTA_EXCEEDED}
  301. end.
  302. is_subscriptions_full(#session{max_subscriptions = infinity}) ->
  303. false;
  304. is_subscriptions_full(#session{
  305. subscriptions = Subs,
  306. max_subscriptions = MaxLimit
  307. }) ->
  308. maps:size(Subs) >= MaxLimit.
  309. %%--------------------------------------------------------------------
  310. %% Client -> Broker: UNSUBSCRIBE
  311. %%--------------------------------------------------------------------
  312. -spec unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) ->
  313. {ok, session()} | {error, emqx_types:reason_code()}.
  314. unsubscribe(
  315. ClientInfo,
  316. TopicFilter,
  317. UnSubOpts,
  318. Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS}
  319. ) ->
  320. case maps:find(TopicFilter, Subs) of
  321. {ok, SubOpts} ->
  322. ok = emqx_broker:unsubscribe(TopicFilter),
  323. ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
  324. ok = emqx_hooks:run(
  325. 'session.unsubscribed',
  326. [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]
  327. ),
  328. {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
  329. error ->
  330. {error, ?RC_NO_SUBSCRIPTION_EXISTED}
  331. end.
  332. %%--------------------------------------------------------------------
  333. %% Client -> Broker: PUBLISH
  334. %%--------------------------------------------------------------------
  335. -spec publish(emqx_types:clientinfo(), emqx_types:packet_id(), emqx_types:message(), session()) ->
  336. {ok, emqx_types:publish_result(), session()}
  337. | {error, emqx_types:reason_code()}.
  338. publish(
  339. _ClientInfo,
  340. PacketId,
  341. Msg = #message{qos = ?QOS_2, timestamp = Ts},
  342. Session = #session{awaiting_rel = AwaitingRel}
  343. ) ->
  344. case is_awaiting_full(Session) of
  345. false ->
  346. case maps:is_key(PacketId, AwaitingRel) of
  347. false ->
  348. Results = emqx_broker:publish(Msg),
  349. AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
  350. {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
  351. true ->
  352. drop_qos2_msg(PacketId, Msg, ?RC_PACKET_IDENTIFIER_IN_USE)
  353. end;
  354. true ->
  355. drop_qos2_msg(PacketId, Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED)
  356. end;
  357. %% Publish QoS0/1 directly
  358. publish(_ClientInfo, _PacketId, Msg, Session) ->
  359. {ok, emqx_broker:publish(Msg), Session}.
  360. drop_qos2_msg(PacketId, Msg, RC) ->
  361. ?SLOG(
  362. warning,
  363. #{
  364. msg => "dropped_qos2_packet",
  365. reason => emqx_reason_codes:name(RC),
  366. packet_id => PacketId
  367. },
  368. #{topic => Msg#message.topic}
  369. ),
  370. ok = emqx_metrics:inc('messages.dropped'),
  371. ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(RC)]),
  372. {error, RC}.
  373. is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
  374. false;
  375. is_awaiting_full(#session{
  376. awaiting_rel = AwaitingRel,
  377. max_awaiting_rel = MaxLimit
  378. }) ->
  379. maps:size(AwaitingRel) >= MaxLimit.
  380. %%--------------------------------------------------------------------
  381. %% Client -> Broker: PUBACK
  382. %%--------------------------------------------------------------------
  383. -spec puback(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  384. {ok, emqx_types:message(), session()}
  385. | {ok, emqx_types:message(), replies(), session()}
  386. | {error, emqx_types:reason_code()}.
  387. puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
  388. case emqx_inflight:lookup(PacketId, Inflight) of
  389. {value, #inflight_data{phase = wait_ack, message = Msg}} ->
  390. on_delivery_completed(Msg, Session),
  391. Inflight1 = emqx_inflight:delete(PacketId, Inflight),
  392. return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
  393. {value, _} ->
  394. {error, ?RC_PACKET_IDENTIFIER_IN_USE};
  395. none ->
  396. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  397. end.
  398. return_with(Msg, {ok, Session}) ->
  399. {ok, Msg, Session};
  400. return_with(Msg, {ok, Publishes, Session}) ->
  401. {ok, Msg, Publishes, Session}.
  402. %%--------------------------------------------------------------------
  403. %% Client -> Broker: PUBREC
  404. %%--------------------------------------------------------------------
  405. -spec pubrec(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  406. {ok, emqx_types:message(), session()}
  407. | {error, emqx_types:reason_code()}.
  408. pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
  409. case emqx_inflight:lookup(PacketId, Inflight) of
  410. {value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
  411. Update = Data#inflight_data{phase = wait_comp},
  412. Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
  413. {ok, Msg, Session#session{inflight = Inflight1}};
  414. {value, _} ->
  415. {error, ?RC_PACKET_IDENTIFIER_IN_USE};
  416. none ->
  417. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  418. end.
  419. %%--------------------------------------------------------------------
  420. %% Client -> Broker: PUBREL
  421. %%--------------------------------------------------------------------
  422. -spec pubrel(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  423. {ok, session()} | {error, emqx_types:reason_code()}.
  424. pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
  425. case maps:take(PacketId, AwaitingRel) of
  426. {_Ts, AwaitingRel1} ->
  427. {ok, Session#session{awaiting_rel = AwaitingRel1}};
  428. error ->
  429. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  430. end.
  431. %%--------------------------------------------------------------------
  432. %% Client -> Broker: PUBCOMP
  433. %%--------------------------------------------------------------------
  434. -spec pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session()) ->
  435. {ok, session()}
  436. | {ok, replies(), session()}
  437. | {error, emqx_types:reason_code()}.
  438. pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
  439. case emqx_inflight:lookup(PacketId, Inflight) of
  440. {value, #inflight_data{phase = wait_comp, message = Msg}} ->
  441. on_delivery_completed(Msg, Session),
  442. Inflight1 = emqx_inflight:delete(PacketId, Inflight),
  443. dequeue(ClientInfo, Session#session{inflight = Inflight1});
  444. {value, _Other} ->
  445. {error, ?RC_PACKET_IDENTIFIER_IN_USE};
  446. none ->
  447. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
  448. end.
  449. %%--------------------------------------------------------------------
  450. %% Dequeue Msgs
  451. %%--------------------------------------------------------------------
  452. dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
  453. case emqx_mqueue:is_empty(Q) of
  454. true ->
  455. {ok, Session};
  456. false ->
  457. {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
  458. do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
  459. end.
  460. dequeue(_ClientInfo, 0, Msgs, Q) ->
  461. {lists:reverse(Msgs), Q};
  462. dequeue(ClientInfo, Cnt, Msgs, Q) ->
  463. case emqx_mqueue:out(Q) of
  464. {empty, _Q} ->
  465. dequeue(ClientInfo, 0, Msgs, Q);
  466. {{value, Msg}, Q1} ->
  467. case emqx_message:is_expired(Msg) of
  468. true ->
  469. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
  470. ok = inc_delivery_expired_cnt(),
  471. dequeue(ClientInfo, Cnt, Msgs, Q1);
  472. false ->
  473. dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg | Msgs], Q1)
  474. end
  475. end.
  476. acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
  477. acc_cnt(_Msg, Cnt) -> Cnt - 1.
  478. %%--------------------------------------------------------------------
  479. %% Broker -> Client: Deliver
  480. %%--------------------------------------------------------------------
  481. -spec deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) ->
  482. {ok, session()} | {ok, replies(), session()}.
  483. %% Optimize
  484. deliver(ClientInfo, [Deliver], Session) ->
  485. Msg = enrich_deliver(Deliver, Session),
  486. deliver_msg(ClientInfo, Msg, Session);
  487. deliver(ClientInfo, Delivers, Session) ->
  488. Msgs = [enrich_deliver(D, Session) || D <- Delivers],
  489. do_deliver(ClientInfo, Msgs, [], Session).
  490. do_deliver(_ClientInfo, [], Publishes, Session) ->
  491. {ok, lists:reverse(Publishes), Session};
  492. do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
  493. case deliver_msg(ClientInfo, Msg, Session) of
  494. {ok, Session1} ->
  495. do_deliver(ClientInfo, More, Acc, Session1);
  496. {ok, [Publish], Session1} ->
  497. do_deliver(ClientInfo, More, [Publish | Acc], Session1)
  498. end.
  499. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
  500. %
  501. on_delivery_completed(Msg, Session),
  502. {ok, [{undefined, maybe_ack(Msg)}], Session};
  503. deliver_msg(
  504. ClientInfo,
  505. Msg = #message{qos = QoS},
  506. Session =
  507. #session{next_pkt_id = PacketId, inflight = Inflight}
  508. ) when
  509. QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2
  510. ->
  511. case emqx_inflight:is_full(Inflight) of
  512. true ->
  513. Session1 =
  514. case maybe_nack(Msg) of
  515. true -> Session;
  516. false -> enqueue(ClientInfo, Msg, Session)
  517. end,
  518. {ok, Session1};
  519. false ->
  520. %% Note that we publish message without shared ack header
  521. %% But add to inflight with ack headers
  522. %% This ack header is required for redispatch-on-terminate feature to work
  523. Publish = {PacketId, maybe_ack(Msg)},
  524. MarkedMsg = mark_begin_deliver(Msg),
  525. Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight),
  526. {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
  527. end.
  528. -spec enqueue(
  529. emqx_types:clientinfo(),
  530. list(emqx_types:deliver()) | emqx_types:message(),
  531. session()
  532. ) -> session().
  533. enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
  534. lists:foldl(
  535. fun(Deliver, Session0) ->
  536. Msg = enrich_deliver(Deliver, Session),
  537. enqueue(ClientInfo, Msg, Session0)
  538. end,
  539. Session,
  540. Delivers
  541. );
  542. enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
  543. {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
  544. (Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session),
  545. Session#session{mqueue = NewQ}.
  546. handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
  547. Payload = emqx_message:to_log_map(Msg),
  548. #{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q),
  549. case (QoS == ?QOS_0) andalso (not StoreQos0) of
  550. true ->
  551. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
  552. ok = emqx_metrics:inc('delivery.dropped'),
  553. ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
  554. ok = inc_pd('send_msg.dropped'),
  555. ?SLOG(
  556. warning,
  557. #{
  558. msg => "dropped_qos0_msg",
  559. queue => QueueInfo,
  560. payload => Payload
  561. },
  562. #{topic => Topic}
  563. );
  564. false ->
  565. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
  566. ok = emqx_metrics:inc('delivery.dropped'),
  567. ok = emqx_metrics:inc('delivery.dropped.queue_full'),
  568. ok = inc_pd('send_msg.dropped'),
  569. ok = inc_pd('send_msg.dropped.queue_full'),
  570. ?SLOG(
  571. warning,
  572. #{
  573. msg => "dropped_msg_due_to_mqueue_is_full",
  574. queue => QueueInfo,
  575. payload => Payload
  576. },
  577. #{topic => Topic}
  578. )
  579. end.
  580. enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
  581. enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
  582. maybe_ack(Msg) ->
  583. emqx_shared_sub:maybe_ack(Msg).
  584. maybe_nack(Msg) ->
  585. emqx_shared_sub:maybe_nack_dropped(Msg).
  586. get_subopts(Topic, SubMap) ->
  587. case maps:find(Topic, SubMap) of
  588. {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
  589. [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}];
  590. {ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
  591. [{nl, Nl}, {qos, QoS}, {rap, Rap}];
  592. error ->
  593. []
  594. end.
  595. enrich_subopts([], Msg, _Session) ->
  596. Msg;
  597. enrich_subopts([{nl, 1} | Opts], Msg, Session) ->
  598. enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session);
  599. enrich_subopts([{nl, 0} | Opts], Msg, Session) ->
  600. enrich_subopts(Opts, Msg, Session);
  601. enrich_subopts(
  602. [{qos, SubQoS} | Opts],
  603. Msg = #message{qos = PubQoS},
  604. Session = #session{upgrade_qos = true}
  605. ) ->
  606. enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
  607. enrich_subopts(
  608. [{qos, SubQoS} | Opts],
  609. Msg = #message{qos = PubQoS},
  610. Session = #session{upgrade_qos = false}
  611. ) ->
  612. enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
  613. enrich_subopts([{rap, 1} | Opts], Msg, Session) ->
  614. enrich_subopts(Opts, Msg, Session);
  615. enrich_subopts([{rap, 0} | Opts], Msg = #message{headers = #{retained := true}}, Session) ->
  616. enrich_subopts(Opts, Msg, Session);
  617. enrich_subopts([{rap, 0} | Opts], Msg, Session) ->
  618. enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
  619. enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
  620. Props = emqx_message:get_header(properties, Msg, #{}),
  621. Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
  622. enrich_subopts(Opts, Msg1, Session).
  623. %%--------------------------------------------------------------------
  624. %% Retry Delivery
  625. %%--------------------------------------------------------------------
  626. -spec retry(emqx_types:clientinfo(), session()) ->
  627. {ok, session()} | {ok, replies(), timeout(), session()}.
  628. retry(ClientInfo, Session = #session{inflight = Inflight}) ->
  629. case emqx_inflight:is_empty(Inflight) of
  630. true ->
  631. {ok, Session};
  632. false ->
  633. Now = erlang:system_time(millisecond),
  634. retry_delivery(
  635. emqx_inflight:to_list(fun sort_fun/2, Inflight),
  636. [],
  637. Now,
  638. Session,
  639. ClientInfo
  640. )
  641. end.
  642. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
  643. {ok, lists:reverse(Acc), Interval, Session};
  644. retry_delivery(
  645. [{PacketId, #inflight_data{timestamp = Ts} = Data} | More],
  646. Acc,
  647. Now,
  648. Session = #session{retry_interval = Interval, inflight = Inflight},
  649. ClientInfo
  650. ) ->
  651. case (Age = age(Now, Ts)) >= Interval of
  652. true ->
  653. {Acc1, Inflight1} = do_retry_delivery(PacketId, Data, Now, Acc, Inflight, ClientInfo),
  654. retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
  655. false ->
  656. {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
  657. end.
  658. do_retry_delivery(
  659. PacketId,
  660. #inflight_data{phase = wait_ack, message = Msg} = Data,
  661. Now,
  662. Acc,
  663. Inflight,
  664. ClientInfo
  665. ) ->
  666. case emqx_message:is_expired(Msg) of
  667. true ->
  668. ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
  669. ok = inc_delivery_expired_cnt(),
  670. {Acc, emqx_inflight:delete(PacketId, Inflight)};
  671. false ->
  672. Msg1 = emqx_message:set_flag(dup, true, Msg),
  673. Update = Data#inflight_data{message = Msg1, timestamp = Now},
  674. Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
  675. {[{PacketId, Msg1} | Acc], Inflight1}
  676. end;
  677. do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) ->
  678. Update = Data#inflight_data{timestamp = Now},
  679. Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
  680. {[{pubrel, PacketId} | Acc], Inflight1}.
  681. %%--------------------------------------------------------------------
  682. %% Expire Awaiting Rel
  683. %%--------------------------------------------------------------------
  684. -spec expire(emqx_types:clientinfo(), awaiting_rel, session()) ->
  685. {ok, session()} | {ok, timeout(), session()}.
  686. expire(_ClientInfo, awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
  687. case maps:size(AwaitingRel) of
  688. 0 -> {ok, Session};
  689. _ -> expire_awaiting_rel(erlang:system_time(millisecond), Session)
  690. end.
  691. expire_awaiting_rel(
  692. Now,
  693. Session = #session{
  694. awaiting_rel = AwaitingRel,
  695. await_rel_timeout = Timeout
  696. }
  697. ) ->
  698. NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
  699. AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
  700. ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
  701. (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt),
  702. NSession = Session#session{awaiting_rel = AwaitingRel1},
  703. case maps:size(AwaitingRel1) of
  704. 0 -> {ok, NSession};
  705. _ -> {ok, Timeout, NSession}
  706. end.
  707. %%--------------------------------------------------------------------
  708. %% Takeover, Resume and Replay
  709. %%--------------------------------------------------------------------
  710. -spec takeover(session()) -> ok.
  711. takeover(#session{subscriptions = Subs}) ->
  712. lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
  713. -spec resume(emqx_types:clientinfo(), session()) -> ok.
  714. resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) ->
  715. lists:foreach(
  716. fun({TopicFilter, SubOpts}) ->
  717. ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
  718. end,
  719. maps:to_list(Subs)
  720. ),
  721. ok = emqx_metrics:inc('session.resumed'),
  722. emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
  723. -spec replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}.
  724. replay(ClientInfo, Session = #session{inflight = Inflight}) ->
  725. Pubs = lists:map(
  726. fun
  727. ({PacketId, #inflight_data{phase = wait_comp}}) ->
  728. {pubrel, PacketId};
  729. ({PacketId, #inflight_data{message = Msg}}) ->
  730. {PacketId, emqx_message:set_flag(dup, true, Msg)}
  731. end,
  732. emqx_inflight:to_list(Inflight)
  733. ),
  734. case dequeue(ClientInfo, Session) of
  735. {ok, NSession} -> {ok, Pubs, NSession};
  736. {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession}
  737. end.
  738. -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
  739. terminate(ClientInfo, Reason, Session) ->
  740. run_terminate_hooks(ClientInfo, Reason, Session),
  741. maybe_redispatch_shared_messages(Reason, Session),
  742. ok.
  743. run_terminate_hooks(ClientInfo, discarded, Session) ->
  744. run_hook('session.discarded', [ClientInfo, info(Session)]);
  745. run_terminate_hooks(ClientInfo, takenover, Session) ->
  746. run_hook('session.takenover', [ClientInfo, info(Session)]);
  747. run_terminate_hooks(ClientInfo, Reason, Session) ->
  748. run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
  749. maybe_redispatch_shared_messages(takenover, _Session) ->
  750. ok;
  751. maybe_redispatch_shared_messages(kicked, _Session) ->
  752. ok;
  753. maybe_redispatch_shared_messages(_Reason, Session) ->
  754. redispatch_shared_messages(Session).
  755. redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
  756. AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight),
  757. F = fun
  758. ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->
  759. %% For QoS 2, here is what the spec says:
  760. %% If the Client's Session terminates before the Client reconnects,
  761. %% the Server MUST NOT send the Application Message to any other
  762. %% subscribed Client [MQTT-4.8.2-5].
  763. {true, Msg};
  764. ({_PacketId, #inflight_data{}}) ->
  765. false
  766. end,
  767. InflightList = lists:filtermap(F, AllInflights),
  768. emqx_shared_sub:redispatch(InflightList ++ emqx_mqueue:to_list(Q)).
  769. -compile({inline, [run_hook/2]}).
  770. run_hook(Name, Args) ->
  771. ok = emqx_metrics:inc(Name),
  772. emqx_hooks:run(Name, Args).
  773. %%--------------------------------------------------------------------
  774. %% Inc message/delivery expired counter
  775. %%--------------------------------------------------------------------
  776. inc_delivery_expired_cnt() ->
  777. inc_delivery_expired_cnt(1).
  778. inc_delivery_expired_cnt(N) ->
  779. ok = inc_pd('send_msg.dropped', N),
  780. ok = inc_pd('send_msg.dropped.expired', N),
  781. ok = emqx_metrics:inc('delivery.dropped', N),
  782. emqx_metrics:inc('delivery.dropped.expired', N).
  783. inc_await_pubrel_timeout(N) ->
  784. ok = inc_pd('recv_msg.dropped', N),
  785. ok = inc_pd('recv_msg.dropped.await_pubrel_timeout', N),
  786. ok = emqx_metrics:inc('messages.dropped', N),
  787. emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
  788. inc_pd(Key) ->
  789. inc_pd(Key, 1).
  790. inc_pd(Key, Inc) ->
  791. _ = emqx_pd:inc_counter(Key, Inc),
  792. ok.
  793. %%--------------------------------------------------------------------
  794. %% Next Packet Id
  795. %%--------------------------------------------------------------------
  796. obtain_next_pkt_id(Session) ->
  797. {Session#session.next_pkt_id, next_pkt_id(Session)}.
  798. next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
  799. Session#session{next_pkt_id = 1};
  800. next_pkt_id(Session = #session{next_pkt_id = Id}) ->
  801. Session#session{next_pkt_id = Id + 1}.
  802. %%--------------------------------------------------------------------
  803. %% Message Latency Stats
  804. %%--------------------------------------------------------------------
  805. on_delivery_completed(
  806. Msg,
  807. #session{created_at = CreateAt, clientid = ClientId}
  808. ) ->
  809. emqx:run_hook(
  810. 'delivery.completed',
  811. [
  812. Msg,
  813. #{session_birth_time => CreateAt, clientid => ClientId}
  814. ]
  815. ).
  816. mark_begin_deliver(Msg) ->
  817. emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg).
  818. %%--------------------------------------------------------------------
  819. %% Helper functions
  820. %%--------------------------------------------------------------------
  821. -compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
  822. sort_fun({_, A}, {_, B}) ->
  823. A#inflight_data.timestamp =< B#inflight_data.timestamp.
  824. batch_n(Inflight) ->
  825. case emqx_inflight:max_size(Inflight) of
  826. 0 -> ?DEFAULT_BATCH_N;
  827. Sz -> Sz - emqx_inflight:size(Inflight)
  828. end.
  829. with_ts(Msg) ->
  830. #inflight_data{
  831. phase = wait_ack,
  832. message = Msg,
  833. timestamp = erlang:system_time(millisecond)
  834. }.
  835. age(Now, Ts) -> Now - Ts.
  836. %%--------------------------------------------------------------------
  837. %% For CT tests
  838. %%--------------------------------------------------------------------
  839. set_field(Name, Value, Session) ->
  840. Pos = emqx_misc:index_of(Name, record_info(fields, session)),
  841. setelement(Pos + 1, Session, Value).
  842. get_mqueue(#session{mqueue = Q}) ->
  843. emqx_mqueue:to_list(Q).