emqx_session.erl 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc
  18. %% A stateful interaction between a Client and a Server. Some Sessions
  19. %% last only as long as the Network Connection, others can span multiple
  20. %% consecutive Network Connections between a Client and a Server.
  21. %%
  22. %% The Session State in the Server consists of:
  23. %%
  24. %% The existence of a Session, even if the rest of the Session State is empty.
  25. %%
  26. %% The Client's subscriptions, including any Subscription Identifiers.
  27. %%
  28. %% QoS 1 and QoS 2 messages which have been sent to the Client, but have not
  29. %% been completely acknowledged.
  30. %%
  31. %% QoS 1 and QoS 2 messages pending transmission to the Client and OPTIONALLY
  32. %% QoS 0 messages pending transmission to the Client.
  33. %%
  34. %% QoS 2 messages which have been received from the Client, but have not been
  35. %% completely acknowledged. The Will Message and the Will Delay Interval.
  36. %%
  37. %% If the Session is currently not connected, the time at which the Session
  38. %% will end and Session State will be discarded.
  39. %% @end
  40. %%--------------------------------------------------------------------
  41. %% MQTT Session
  42. -module(emqx_session).
  43. -include("emqx.hrl").
  44. -include("emqx_mqtt.hrl").
  45. -include("logger.hrl").
  46. -include("types.hrl").
  47. -logger_header("[Session]").
  48. -export([init/3]).
  49. -export([ info/1
  50. , attrs/1
  51. , stats/1
  52. ]).
  53. -export([ subscribe/4
  54. , unsubscribe/3
  55. ]).
  56. -export([ publish/3
  57. , puback/3
  58. , pubrec/3
  59. , pubrel/3
  60. , pubcomp/3
  61. ]).
  62. -export([deliver/2]).
  63. -export([timeout/3]).
  64. -export_type([session/0]).
  65. -import(emqx_zone,
  66. [ get_env/2
  67. , get_env/3
  68. ]).
  %% State of one MQTT session. The record is private to this module; other
  %% modules only see it through the opaque session() type.
  -record(session, {
            %% Clean Start flag.
            clean_start :: boolean(),

            %% Subscriptions of the client, keyed by topic filter.
            subscriptions :: map(),

            %% Maximum number of subscriptions (0 disables the limit).
            max_subscriptions :: non_neg_integer(),

            %% Whether the delivered QoS is upgraded to the subscription QoS.
            upgrade_qos :: boolean(),

            %% Client <- Broker:
            %% QoS 1 / QoS 2 messages sent to the client and not yet acked.
            inflight :: emqx_inflight:inflight(),

            %% Messages pending transmission to the client: QoS 1 and QoS 2
            %% messages and, optionally, QoS 0 messages.
            mqueue :: emqx_mqueue:mqueue(),

            %% Packet id to use for the next outgoing QoS 1 / QoS 2 message.
            next_pkt_id = 1 :: emqx_types:packet_id(),

            %% Interval for redelivering unacked QoS 1 / QoS 2 messages.
            retry_interval :: timeout(),

            %% Timer driving redelivery, if one is running.
            retry_timer :: maybe(reference()),

            %% Client -> Broker:
            %% QoS 2 packet ids received from the client, waiting for PUBREL.
            awaiting_rel :: map(),

            %% Maximum number of packets awaiting PUBREL (0 disables the limit).
            max_awaiting_rel :: non_neg_integer(),

            %% Timer driving expiry of awaiting_rel entries, if one is running.
            await_rel_timer :: maybe(reference()),

            %% How long a packet may wait for its PUBREL.
            await_rel_timeout :: timeout(),

            %% Session expiry interval.
            expiry_interval :: timeout(),

            %% Session expiry timer.
            expiry_timer :: maybe(reference()),

            %% Creation time of the session.
            created_at :: erlang:timestamp()
           }).
  %% The session record is opaque outside of this module.
  -opaque(session() :: #session{}).

  %% An outgoing PUBLISH produced by this module. QoS 0 messages carry no
  %% packet id (deliver/3 emits `undefined' for them), hence maybe/1.
  -type(publish() :: {publish, maybe(emqx_types:packet_id()), emqx_types:message()}).

  %% Number of messages dequeued in one batch when the inflight window
  %% reports a max size of 0 (see batch_n/1).
  -define(DEFAULT_BATCH_N, 1000).
  111. %%--------------------------------------------------------------------
  112. %% Init a session
  113. %%--------------------------------------------------------------------
  %% @doc Build the initial state of a session.
  %% Limits and timeouts are read from the zone of the client through the
  %% imported emqx_zone:get_env/2,3; the inflight window size and the expiry
  %% interval are taken from the options map.
  -spec(init(boolean(), emqx_types:client(), Options :: map()) -> session()).
  init(CleanStart, #{zone := Zone}, #{max_inflight := MaxInflight,
                                      expiry_interval := ExpiryInterval}) ->
      MaxSubscriptions = get_env(Zone, max_subscriptions, 0),
      UpgradeQoS = get_env(Zone, upgrade_qos, false),
      Inflight = emqx_inflight:new(MaxInflight),
      MQueue = init_mqueue(Zone),
      RetryInterval = get_env(Zone, retry_interval, 0),
      MaxAwaitingRel = get_env(Zone, max_awaiting_rel, 100),
      AwaitRelTimeout = get_env(Zone, await_rel_timeout),
      #session{clean_start       = CleanStart,
               max_subscriptions = MaxSubscriptions,
               subscriptions     = #{},
               upgrade_qos       = UpgradeQoS,
               inflight          = Inflight,
               mqueue            = MQueue,
               next_pkt_id       = 1,
               retry_interval    = RetryInterval,
               awaiting_rel      = #{},
               max_awaiting_rel  = MaxAwaitingRel,
               await_rel_timeout = AwaitRelTimeout,
               expiry_interval   = ExpiryInterval,
               created_at        = os:timestamp()
              }.
  %% Create the message queue of the session from the zone settings.
  init_mqueue(Zone) ->
      MaxLen = get_env(Zone, max_mqueue_len, 1000),
      StoreQoS0 = get_env(Zone, mqueue_store_qos0, true),
      Priorities = get_env(Zone, mqueue_priorities),
      DefaultPriority = get_env(Zone, mqueue_default_priority),
      emqx_mqueue:init(#{max_len => MaxLen,
                         store_qos0 => StoreQoS0,
                         priorities => Priorities,
                         default_priority => DefaultPriority
                        }).
  138. %%--------------------------------------------------------------------
  139. %% Infos of the session
  140. %%--------------------------------------------------------------------
  %% @doc Summarise the session as a map. Inflight, mqueue and awaiting_rel
  %% are reported as sizes rather than contents, and expiry_interval is
  %% divided by 1000.
  -spec(info(session()) -> emqx_types:infos()).
  info(Session = #session{}) ->
      Inflight = Session#session.inflight,
      MQueue = Session#session.mqueue,
      #{clean_start => Session#session.clean_start,
        subscriptions => Session#session.subscriptions,
        max_subscriptions => Session#session.max_subscriptions,
        upgrade_qos => Session#session.upgrade_qos,
        inflight => emqx_inflight:size(Inflight),
        max_inflight => emqx_inflight:max_size(Inflight),
        retry_interval => Session#session.retry_interval,
        mqueue_len => emqx_mqueue:len(MQueue),
        max_mqueue => emqx_mqueue:max_len(MQueue),
        mqueue_dropped => emqx_mqueue:dropped(MQueue),
        next_pkt_id => Session#session.next_pkt_id,
        awaiting_rel => maps:size(Session#session.awaiting_rel),
        max_awaiting_rel => Session#session.max_awaiting_rel,
        await_rel_timeout => Session#session.await_rel_timeout,
        expiry_interval => Session#session.expiry_interval div 1000,
        created_at => Session#session.created_at
       }.
  172. %%--------------------------------------------------------------------
  173. %% Attrs of the session
  174. %%--------------------------------------------------------------------
  %% @doc Attributes of the session: clean start flag, expiry interval
  %% (as stored, not rescaled) and creation time.
  -spec(attrs(session()) -> emqx_types:attrs()).
  attrs(Session = #session{}) ->
      #{clean_start => Session#session.clean_start,
        expiry_interval => Session#session.expiry_interval,
        created_at => Session#session.created_at
       }.
  183. %%--------------------------------------------------------------------
  184. %% Stats of the session
  185. %%--------------------------------------------------------------------
  %% @doc Get stats of the session as a list of {Name, Value} pairs:
  %% current sizes together with their configured limits.
  -spec(stats(session()) -> emqx_types:stats()).
  stats(Session = #session{}) ->
      Inflight = Session#session.inflight,
      MQueue = Session#session.mqueue,
      [{subscriptions, maps:size(Session#session.subscriptions)},
       {max_subscriptions, Session#session.max_subscriptions},
       {inflight, emqx_inflight:size(Inflight)},
       {max_inflight, emqx_inflight:max_size(Inflight)},
       {mqueue_len, emqx_mqueue:len(MQueue)},
       {max_mqueue, emqx_mqueue:max_len(MQueue)},
       {mqueue_dropped, emqx_mqueue:dropped(MQueue)},
       {awaiting_rel, maps:size(Session#session.awaiting_rel)},
       {max_awaiting_rel, Session#session.max_awaiting_rel}].
  203. %%--------------------------------------------------------------------
  204. %% Client -> Broker: SUBSCRIBE
  205. %%--------------------------------------------------------------------
  %% Subscribe the client to a topic filter. A filter that is not yet
  %% subscribed is rejected with ?RC_QUOTA_EXCEEDED once the subscription
  %% limit is reached; re-subscribing an existing filter is always allowed.
  -spec(subscribe(emqx_types:client(), emqx_types:topic(), emqx_types:subopts(),
                  session()) -> {ok, session()} | {error, emqx_types:reason_code()}).
  subscribe(Client, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) ->
      IsNewFilter = not maps:is_key(TopicFilter, Subs),
      case IsNewFilter andalso is_subscriptions_full(Session) of
          false ->
              do_subscribe(Client, TopicFilter, SubOpts, Session);
          true ->
              {error, ?RC_QUOTA_EXCEEDED}
      end.
  %% True when the number of subscriptions has reached max_subscriptions.
  %% A limit of 0 means "no limit" and never reports full.
  is_subscriptions_full(#session{max_subscriptions = MaxLimit,
                                 subscriptions = Subs}) ->
      MaxLimit =/= 0 andalso maps:size(Subs) >= MaxLimit.
  %% Register the subscription with the broker (or only refresh its options
  %% when the filter is already subscribed), run the 'session.subscribed'
  %% hook and store the options in the session.
  do_subscribe(Client = #{client_id := ClientId}, TopicFilter, SubOpts,
               Session = #session{subscriptions = Subs}) ->
      IsNew = not maps:is_key(TopicFilter, Subs),
      if
          IsNew ->
              ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts);
          true ->
              _ = emqx_broker:set_subopts(TopicFilter, SubOpts)
      end,
      HookArgs = [Client, TopicFilter, SubOpts#{new => IsNew}],
      ok = emqx_hooks:run('session.subscribed', HookArgs),
      {ok, Session#session{subscriptions = Subs#{TopicFilter => SubOpts}}}.
  232. %%--------------------------------------------------------------------
  233. %% Client -> Broker: UNSUBSCRIBE
  234. %%--------------------------------------------------------------------
  %% Remove a subscription. Returns ?RC_NO_SUBSCRIPTION_EXISTED when the
  %% session holds no subscription for the topic filter; otherwise the broker
  %% is told to unsubscribe and the 'session.unsubscribed' hook is run.
  -spec(unsubscribe(emqx_types:client(), emqx_types:topic(), session())
        -> {ok, session()} | {error, emqx_types:reason_code()}).
  unsubscribe(Client, TopicFilter, Session = #session{subscriptions = Subs}) ->
      case maps:take(TopicFilter, Subs) of
          error ->
              {error, ?RC_NO_SUBSCRIPTION_EXISTED};
          {SubOpts, RemainingSubs} ->
              ok = emqx_broker:unsubscribe(TopicFilter),
              ok = emqx_hooks:run('session.unsubscribed', [Client, TopicFilter, SubOpts]),
              {ok, Session#session{subscriptions = RemainingSubs}}
      end.
  246. %%--------------------------------------------------------------------
  247. %% Client -> Broker: PUBLISH
  248. %%--------------------------------------------------------------------
  %% Handle a PUBLISH received from the client.
  %% QoS 2 messages are tracked in awaiting_rel and are refused with
  %% ?RC_RECEIVE_MAXIMUM_EXCEEDED when that table is full; any other message
  %% is handed to the broker directly and the session is left untouched.
  -spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
        -> {ok, emqx_types:deliver_results()} |
           {ok, emqx_types:deliver_results(), session()} |
           {error, emqx_types:reason_code()}).
  publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
      case is_awaiting_full(Session) of
          true ->
              ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
              ok = emqx_metrics:inc('messages.qos2.dropped'),
              {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED};
          false ->
              do_publish(PacketId, Msg, Session)
      end;
  %% QoS 0 / QoS 1: publish right away.
  publish(_PacketId, Msg, _Session) ->
      DeliverResults = emqx_broker:publish(Msg),
      {ok, DeliverResults}.
  %% True when the number of packets awaiting PUBREL has reached
  %% max_awaiting_rel. A limit of 0 means "no limit".
  is_awaiting_full(#session{awaiting_rel = AwaitingRel,
                            max_awaiting_rel = MaxLimit}) ->
      MaxLimit =/= 0 andalso maps:size(AwaitingRel) >= MaxLimit.
  -compile({inline, [do_publish/3]}).
  %% Publish a QoS 2 message unless its packet id is already awaiting PUBREL.
  %% On success the packet id is recorded together with the message timestamp
  %% and the await_rel timer is started if it is not running.
  do_publish(PacketId, Msg = #message{timestamp = Ts},
             Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:is_key(PacketId, AwaitingRel) of
          true ->
              {error, ?RC_PACKET_IDENTIFIER_IN_USE};
          false ->
              DeliverResults = emqx_broker:publish(Msg),
              Session1 = Session#session{awaiting_rel = AwaitingRel#{PacketId => Ts}},
              {ok, DeliverResults, ensure_await_rel_timer(Session1)}
      end.
  282. %%--------------------------------------------------------------------
  283. %% Client -> Broker: PUBACK
  284. %%--------------------------------------------------------------------
  %% Handle a PUBACK from the client.
  %% When the packet id refers to an inflight message, the entry is removed
  %% and queued messages are dequeued into the freed window. A packet id whose
  %% inflight entry is already in the `pubrel' state is reported as in use,
  %% and an unknown packet id is reported as not found.
  %% The miss branch matches `none', the same value pubrec/3 matches on the
  %% result of emqx_inflight:lookup/2.
  -spec(puback(emqx_types:packet_id(), emqx_types:reason_code(), session())
        -> {ok, session()} | {ok, list(publish()), session()} |
           {error, emqx_types:reason_code()}).
  puback(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
      case emqx_inflight:lookup(PacketId, Inflight) of
          {value, {Msg, _Ts}} when is_record(Msg, message) ->
              Inflight1 = emqx_inflight:delete(PacketId, Inflight),
              dequeue(Session#session{inflight = Inflight1});
          {value, {pubrel, _Ts}} ->
              %% QoS 2 flow in progress for this id: a PUBACK is not valid here.
              ?LOG(warning, "The PUBACK PacketId ~w is inuse", [PacketId]),
              {error, ?RC_PACKET_IDENTIFIER_IN_USE};
          none ->
              ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
              ok = emqx_metrics:inc('packets.puback.missed'),
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
      end.
  298. %%--------------------------------------------------------------------
  299. %% Client -> Broker: PUBREC
  300. %%--------------------------------------------------------------------
  %% Handle a PUBREC from the client: the inflight message is replaced by a
  %% `pubrel' marker carrying the current time. A second PUBREC for the same
  %% id is reported as in use, an unknown id as not found.
  -spec(pubrec(emqx_types:packet_id(), emqx_types:reason_code(), session())
        -> {ok, session()} | {error, emqx_types:reason_code()}).
  pubrec(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
      case emqx_inflight:lookup(PacketId, Inflight) of
          none ->
              ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
              ok = emqx_metrics:inc('packets.pubrec.missed'),
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          {value, {pubrel, _Ts}} ->
              ?LOG(warning, "The PUBREC ~w is duplicated", [PacketId]),
              ok = emqx_metrics:inc('packets.pubrec.inuse'),
              {error, ?RC_PACKET_IDENTIFIER_IN_USE};
          {value, {#message{}, _Ts}} ->
              Marker = {pubrel, os:timestamp()},
              Inflight1 = emqx_inflight:update(PacketId, Marker, Inflight),
              {ok, Session#session{inflight = Inflight1}}
      end.
  317. %%--------------------------------------------------------------------
  318. %% Client -> Broker: PUBREL
  319. %%--------------------------------------------------------------------
  %% Handle a PUBREL from the client: drop the packet id from awaiting_rel,
  %% or report it as not found when it is not being awaited.
  -spec(pubrel(emqx_types:packet_id(), emqx_types:reason_code(), session())
        -> {ok, session()} | {error, emqx_types:reason_code()}).
  pubrel(PacketId, _ReasonCode, Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:is_key(PacketId, AwaitingRel) of
          true ->
              Remaining = maps:remove(PacketId, AwaitingRel),
              {ok, Session#session{awaiting_rel = Remaining}};
          false ->
              ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
              ok = emqx_metrics:inc('packets.pubrel.missed'),
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
      end.
  331. %%--------------------------------------------------------------------
  332. %% Client -> Broker: PUBCOMP
  333. %%--------------------------------------------------------------------
  %% Handle a PUBCOMP from the client: remove the packet id from the inflight
  %% window and dequeue pending messages, or report the id as not found.
  -spec(pubcomp(emqx_types:packet_id(), emqx_types:reason_code(), session())
        -> {ok, session()} | {ok, list(publish()), session()} |
           {error, emqx_types:reason_code()}).
  pubcomp(PacketId, _ReasonCode, Session = #session{inflight = Inflight}) ->
      case emqx_inflight:contain(PacketId, Inflight) of
          false ->
              ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
              ok = emqx_metrics:inc('packets.pubcomp.missed'),
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          true ->
              Remaining = emqx_inflight:delete(PacketId, Inflight),
              dequeue(Session#session{inflight = Remaining})
      end.
  347. %%--------------------------------------------------------------------
  348. %% Dequeue Msgs
  349. %%--------------------------------------------------------------------
  %% Move queued messages into the free part of the inflight window.
  %% Returns {ok, Session} when the queue is empty, otherwise the result of
  %% deliver/3 for the dequeued batch.
  dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
      case emqx_mqueue:is_empty(Q) of
          false ->
              {Reversed, Q1} = dequeue(batch_n(Inflight), [], Q),
              Batch = lists:reverse(Reversed),
              deliver(Batch, [], Session#session{mqueue = Q1});
          true ->
              {ok, Session}
      end.
  %% Take at most Cnt messages out of the queue, prepending each one to Msgs
  %% (the result is therefore in reverse dequeue order).
  dequeue(Cnt, Msgs, Q) when Cnt > 0 ->
      case emqx_mqueue:out(Q) of
          {{value, Msg}, Q1} ->
              dequeue(Cnt - 1, [Msg | Msgs], Q1);
          {empty, _Q} ->
              {Msgs, Q}
      end;
  dequeue(_Cnt, Msgs, Q) ->
      {Msgs, Q}.
  %% Number of messages to dequeue in one go: the free room of the inflight
  %% window, or ?DEFAULT_BATCH_N when the window reports a max size of 0.
  batch_n(Inflight) ->
      MaxSize = emqx_inflight:max_size(Inflight),
      if
          MaxSize =:= 0 -> ?DEFAULT_BATCH_N;
          true -> MaxSize - emqx_inflight:size(Inflight)
      end.
  370. %%--------------------------------------------------------------------
  371. %% Broker -> Client: Publish | Msg
  372. %%--------------------------------------------------------------------
  %% Turn a list of {deliver, Topic, Msg} tuples into outgoing publishes.
  %% Each message is first enriched with the options of the matching
  %% subscription; list elements of any other shape are skipped.
  deliver(Delivers, Session = #session{subscriptions = Subs})
    when is_list(Delivers) ->
      Enrich = fun({deliver, Topic, Msg}) ->
                       {true, enrich(get_subopts(Topic, Subs), Msg, Session)};
                  (_Other) ->
                       false
               end,
      deliver(lists:filtermap(Enrich, Delivers), [], Session).
  %% Walk the messages and accumulate the publishes to send.
  %% QoS 0 messages are emitted without a packet id. QoS 1 / QoS 2 messages
  %% are either put inflight under the next packet id or, when the inflight
  %% window is full, stored in the message queue.
  deliver([], Publishes, Session) ->
      {ok, lists:reverse(Publishes), Session};
  deliver([Msg = #message{qos = ?QOS_0} | Rest], Acc, Session) ->
      deliver(Rest, [{publish, undefined, Msg} | Acc], Session);
  deliver([Msg = #message{qos = QoS} | Rest], Acc,
          Session = #session{next_pkt_id = PacketId, inflight = Inflight})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
      case emqx_inflight:is_full(Inflight) of
          false ->
              Session1 = await(PacketId, Msg, Session),
              deliver(Rest, [{publish, PacketId, Msg} | Acc], next_pkt_id(Session1));
          true ->
              deliver(Rest, Acc, enqueue(Msg, Session))
      end.
  %% Store a message in the session's message queue and bump the
  %% `enqueue_stats' counter.
  %% emqx_mqueue:in/2 also returns a "dropped" element, which is currently
  %% ignored.
  %% TODO: run the 'message.dropped' hook for it, e.g.
  %%       emqx_hooks:run('message.dropped', [SessProps, Dropped]).
  enqueue(Msg, Session = #session{mqueue = Q}) ->
      emqx_pd:update_counter(enqueue_stats, 1),
      {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
      Session#session{mqueue = NewQ}.
  404. %%--------------------------------------------------------------------
  405. %% Awaiting ACK for QoS1/QoS2 Messages
  406. %%--------------------------------------------------------------------
  %% Put a message into the inflight window, stamped with the current time,
  %% and make sure the retry timer is running.
  await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
      Entry = {Msg, os:timestamp()},
      Session1 = Session#session{inflight = emqx_inflight:insert(PacketId, Entry, Inflight)},
      ensure_retry_timer(Session1).
  %% Look up the subscription options of a topic and return them as an
  %% ordered list: nl, qos, rap and, when present, subid. An unknown topic
  %% yields the empty list.
  get_subopts(Topic, SubMap) ->
      case maps:find(Topic, SubMap) of
          error ->
              [];
          {ok, SubOpts = #{nl := Nl, qos := QoS, rap := Rap}} ->
              Base = [{nl, Nl}, {qos, QoS}, {rap, Rap}],
              case SubOpts of
                  #{subid := SubId} -> Base ++ [{subid, SubId}];
                  _ -> Base
              end
      end.
  %% Apply the subscription options, one after another, to a message that is
  %% about to be delivered.
  enrich([], Msg, _Session) ->
      Msg;
  %% nl: the option is skipped here; it does not alter the message.
  enrich([{nl, _} | Opts], Msg, Session) ->
      enrich(Opts, Msg, Session);
  %% qos: with upgrade_qos the higher of subscription and publish QoS wins,
  %% otherwise the lower one.
  enrich([{qos, SubQoS} | Opts], Msg = #message{qos = PubQoS},
         Session = #session{upgrade_qos = UpgradeQoS}) when is_boolean(UpgradeQoS) ->
      QoS = case UpgradeQoS of
                true -> max(SubQoS, PubQoS);
                false -> min(SubQoS, PubQoS)
            end,
      enrich(Opts, Msg#message{qos = QoS}, Session);
  %% rap: a message whose headers carry `retained := true' always gets the
  %% retain flag set; otherwise rap = 0 clears the flag and any other value
  %% leaves the message as it is.
  enrich([{rap, _Rap} | Opts],
         Msg = #message{flags = Flags, headers = #{retained := true}},
         Session = #session{}) ->
      enrich(Opts, Msg#message{flags = Flags#{retain => true}}, Session);
  enrich([{rap, 0} | Opts], Msg = #message{flags = Flags}, Session) ->
      enrich(Opts, Msg#message{flags = Flags#{retain => false}}, Session);
  enrich([{rap, _} | Opts], Msg, Session) ->
      enrich(Opts, Msg, Session);
  %% subid: attach the subscription identifier as a message header.
  enrich([{subid, SubId} | Opts], Msg, Session) ->
      WithSubId = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
      enrich(Opts, WithSubId, Session).
  437. %%--------------------------------------------------------------------
  438. %% Handle timeout
  439. %%--------------------------------------------------------------------
  %% Dispatch a timer message. The timer reference must be the one stored in
  %% the session for that kind of timer; anything else is logged as an
  %% unexpected timeout and leaves the session unchanged.
  -spec(timeout(reference(), atom(), session())
        -> {ok, session()} | {ok, list(), session()}).
  timeout(TRef, Msg, Session) ->
      case {Msg, Session} of
          {retry_delivery, #session{retry_timer = TRef}} ->
              retry_delivery(Session#session{retry_timer = undefined});
          {check_awaiting_rel, #session{await_rel_timer = TRef}} ->
              expire_awaiting_rel(Session);
          _ ->
              ?LOG(error, "unexpected timeout - ~p: ~p", [TRef, Msg]),
              {ok, Session}
      end.
  449. %%--------------------------------------------------------------------
  450. %% Ensure retry timer
  451. %%--------------------------------------------------------------------
  %% Start the retry timer with the session's retry interval unless a timer
  %% is already recorded in the session.
  ensure_retry_timer(Session) ->
      case Session of
          #session{retry_interval = Interval, retry_timer = undefined} ->
              ensure_retry_timer(Interval, Session);
          _ ->
              Session
      end.
  %% Start the retry timer with an explicit interval unless a timer is
  %% already recorded in the session.
  ensure_retry_timer(Interval, Session) ->
      case Session of
          #session{retry_timer = undefined} ->
              TRef = emqx_misc:start_timer(Interval, retry_delivery),
              Session#session{retry_timer = TRef};
          _ ->
              Session
      end.
  462. %%--------------------------------------------------------------------
  463. %% Retry Delivery
  464. %%--------------------------------------------------------------------
  %% Redeliver inflight entries whose retry interval has elapsed. The entries
  %% are visited in timestamp order; an empty inflight window yields
  %% {ok, Session}.
  retry_delivery(Session = #session{inflight = Inflight}) ->
      case emqx_inflight:is_empty(Inflight) of
          false ->
              ByTimestamp = fun({_, {_, TsA}}, {_, {_, TsB}}) -> TsA < TsB end,
              Entries = lists:sort(ByTimestamp, emqx_inflight:to_list(Inflight)),
              retry_delivery(Entries, os:timestamp(), [], Session);
          true ->
              {ok, Session}
      end.
  %% Walk the sorted inflight entries. Entries at least retry_interval old are
  %% redelivered; the walk stops at the first younger entry and re-arms the
  %% timer for the time that entry still has to wait.
  retry_delivery([], _Now, Acc, Session) ->
      %% Everything was due: retry again after a full interval.
      {ok, lists:reverse(Acc), ensure_retry_timer(Session)};
  retry_delivery([{PacketId, {Val, Ts}} | More], Now, Acc,
                 Session = #session{retry_interval = Interval, inflight = Inflight}) ->
      %% now_diff/2 is in microseconds; the interval is compared in milliseconds.
      case timer:now_diff(Now, Ts) div 1000 of
          Age when Age >= Interval ->
              {Acc1, Inflight1} = retry_delivery(PacketId, Val, Now, Acc, Inflight),
              retry_delivery(More, Now, Acc1, Session#session{inflight = Inflight1});
          Age ->
              Remaining = Interval - max(0, Age),
              {ok, lists:reverse(Acc), ensure_retry_timer(Remaining, Session)}
      end.
  %% Redeliver a single inflight entry and return the updated accumulator and
  %% inflight window. A `pubrel' marker is re-sent as {pubrel, PacketId}; a
  %% message is re-sent as a publish unless it has expired, in which case it
  %% is dropped from the window. Re-sent entries get the timestamp Now.
  retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
      Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
      {[{pubrel, PacketId} | Acc], Inflight1};
  retry_delivery(PacketId, Msg = #message{}, Now, Acc, Inflight) ->
      case emqx_message:is_expired(Msg) of
          false ->
              {[{publish, PacketId, Msg} | Acc],
               emqx_inflight:update(PacketId, {Msg, Now}, Inflight)};
          true ->
              ok = emqx_metrics:inc('messages.expired'),
              {Acc, emqx_inflight:delete(PacketId, Inflight)}
      end.
  500. %%--------------------------------------------------------------------
  501. %% Ensure await_rel timer
  502. %%--------------------------------------------------------------------
  %% Start the await_rel timer with the session's await_rel_timeout unless a
  %% timer is already recorded in the session.
  ensure_await_rel_timer(Session) ->
      case Session of
          #session{await_rel_timeout = Timeout, await_rel_timer = undefined} ->
              ensure_await_rel_timer(Timeout, Session);
          _ ->
              Session
      end.
  %% Start the await_rel timer with an explicit timeout unless a timer is
  %% already recorded in the session.
  ensure_await_rel_timer(Timeout, Session) ->
      case Session of
          #session{await_rel_timer = undefined} ->
              TRef = emqx_misc:start_timer(Timeout, check_awaiting_rel),
              Session#session{await_rel_timer = TRef};
          _ ->
              Session
      end.
  513. %%--------------------------------------------------------------------
  514. %% Expire Awaiting Rel
  515. %%--------------------------------------------------------------------
  %% Drop QoS 2 packet ids that have waited for a PUBREL longer than
  %% await_rel_timeout. Called from timeout/3 once the await_rel timer fired.
  %% Always returns {ok, Session}.
  %%
  %% The reference of the fired timer is cleared up front; otherwise
  %% ensure_await_rel_timer/1,2 would see a timer as still pending and never
  %% start a new one.
  expire_awaiting_rel(Session = #session{awaiting_rel = AwaitingRel}) ->
      Session1 = Session#session{await_rel_timer = undefined},
      case maps:size(AwaitingRel) of
          0 ->
              {ok, Session1};
          _ ->
              %% Oldest first, so the walk can stop at the first live entry.
              Sorted = lists:keysort(2, maps:to_list(AwaitingRel)),
              expire_awaiting_rel(Sorted, os:timestamp(), Session1)
      end.

  %% Walk the entries sorted by timestamp. Expired entries are removed; at the
  %% first entry that is still alive the timer is re-armed for the time this
  %% entry has left.
  expire_awaiting_rel([], _Now, Session) ->
      {ok, Session#session{await_rel_timer = undefined}};
  expire_awaiting_rel([{PacketId, Ts} | More], Now,
                      Session = #session{awaiting_rel = AwaitingRel,
                                         await_rel_timeout = Timeout}) ->
      %% now_diff/2 is in microseconds; the timeout is compared in milliseconds.
      case (timer:now_diff(Now, Ts) div 1000) of
          Age when Age >= Timeout ->
              ok = emqx_metrics:inc('messages.qos2.expired'),
              %% ~w, not ~s: the packet id is an integer.
              ?LOG(warning, "Dropped qos2 packet ~w for await_rel_timeout", [PacketId]),
              Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
              expire_awaiting_rel(More, Now, Session1);
          Age ->
              Remaining = Timeout - max(0, Age),
              Session1 = Session#session{await_rel_timer = undefined},
              {ok, ensure_await_rel_timer(Remaining, Session1)}
      end.
  534. %%--------------------------------------------------------------------
  535. %% Next Packet Id
  536. %%--------------------------------------------------------------------
  %% Advance the packet id of the session, wrapping from 16#FFFF back to 1.
  next_pkt_id(Session = #session{next_pkt_id = Id}) ->
      NextId = case Id of
                   16#FFFF -> 1;
                   _ -> Id + 1
               end,
      Session#session{next_pkt_id = NextId}.