emqx_session.erl 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc
  18. %% A stateful interaction between a Client and a Server. Some Sessions
  19. %% last only as long as the Network Connection, others can span multiple
  20. %% consecutive Network Connections between a Client and a Server.
  21. %%
  22. %% The Session State in the Server consists of:
  23. %%
  24. %% The existence of a Session, even if the rest of the Session State is empty.
  25. %%
  26. %% The Client's subscriptions, including any Subscription Identifiers.
  27. %%
  28. %% QoS 1 and QoS 2 messages which have been sent to the Client, but have not
  29. %% been completely acknowledged.
  30. %%
  31. %% QoS 1 and QoS 2 messages pending transmission to the Client and OPTIONALLY
  32. %% QoS 0 messages pending transmission to the Client.
  33. %%
  34. %% QoS 2 messages which have been received from the Client, but have not been
  35. %% completely acknowledged. The Will Message and the Will Delay Interval.
  36. %%
  37. %% If the Session is currently not connected, the time at which the Session
  38. %% will end and Session State will be discarded.
  39. %% @end
  40. %%--------------------------------------------------------------------
  41. %% MQTT Session
  42. -module(emqx_session).
  43. -include("emqx.hrl").
  44. -include("emqx_mqtt.hrl").
  45. -include("logger.hrl").
  46. -include("types.hrl").
  47. -logger_header("[Session]").
  48. -ifdef(TEST).
  49. -compile(export_all).
  50. -compile(nowarn_export_all).
  51. -endif.
  52. -export([init/2]).
  53. -export([ info/1
  54. , info/2
  55. , stats/1
  56. ]).
  57. -export([ subscribe/4
  58. , unsubscribe/3
  59. ]).
  60. -export([ publish/3
  61. , puback/2
  62. , pubrec/2
  63. , pubrel/2
  64. , pubcomp/2
  65. ]).
  66. -export([ deliver/2
  67. , enqueue/2
  68. , retry/1
  69. ]).
  70. -export([ takeover/1
  71. , resume/2
  72. , replay/1
  73. ]).
  74. -export([expire/2]).
  75. %% Export for CT
  76. -export([set_field/3]).
  77. -export_type([session/0]).
  78. -import(emqx_zone, [get_env/3]).
  79. -record(session, {
  80. %% Client's subscriptions.
  81. subscriptions :: map(),
  82. %% Max subscriptions allowed
  83. max_subscriptions :: non_neg_integer(),
  84. %% Upgrade QoS?
  85. upgrade_qos :: boolean(),
  86. %% Client <- Broker: QoS1/2 messages sent to the client but
  87. %% not yet acknowledged.
  88. inflight :: emqx_inflight:inflight(),
  89. %% QoS1/2 messages published while the client is disconnected,
  90. %% or QoS1/2 messages pending transmission to the Client.
  91. %%
  92. %% Optionally, QoS0 messages pending transmission to the Client.
  93. mqueue :: emqx_mqueue:mqueue(),
  94. %% Next packet id of the session
  95. next_pkt_id = 1 :: emqx_types:packet_id(),
  96. %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
  97. retry_interval :: timeout(),
  98. %% Client -> Broker: QoS2 messages received from the client, but
  99. %% have not been completely acknowledged
  100. awaiting_rel :: map(),
  101. %% Maximum number of awaiting QoS2 messages allowed
  102. max_awaiting_rel :: non_neg_integer(),
  103. %% Awaiting PUBREL Timeout (Unit: millisecond)
  104. await_rel_timeout :: timeout(),
  105. %% Created at
  106. created_at :: pos_integer()
  107. }).
  108. -opaque(session() :: #session{}).
  109. -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
  110. -type(pubrel() :: {pubrel, emqx_types:packet_id()}).
  111. -type(replies() :: list(publish() | pubrel())).
  112. -define(INFO_KEYS, [subscriptions,
  113. upgrade_qos,
  114. retry_interval,
  115. await_rel_timeout,
  116. created_at
  117. ]).
  118. -define(STATS_KEYS, [subscriptions_cnt,
  119. subscriptions_max,
  120. inflight_cnt,
  121. inflight_max,
  122. mqueue_len,
  123. mqueue_max,
  124. mqueue_dropped,
  125. next_pkt_id,
  126. awaiting_rel_cnt,
  127. awaiting_rel_max
  128. ]).
  129. -define(DEFAULT_BATCH_N, 1000).
  130. %%--------------------------------------------------------------------
  131. %% Init a Session
  132. %%--------------------------------------------------------------------
  %% @doc Build a fresh session for a client.
  %% Limits and timers are read from the client's zone; the inflight window is
  %% sized by the connection's receive maximum. The retry interval and the
  %% await-rel timeout are configured in seconds and stored in milliseconds.
  -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
  init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
      RetrySecs = emqx_zone:get_env(Zone, retry_interval, 0),
      AwaitRelSecs = emqx_zone:get_env(Zone, await_rel_timeout, 300),
      #session{subscriptions     = #{},
               max_subscriptions = emqx_zone:get_env(Zone, max_subscriptions, 0),
               upgrade_qos       = emqx_zone:get_env(Zone, upgrade_qos, false),
               inflight          = emqx_inflight:new(MaxInflight),
               mqueue            = init_mqueue(Zone),
               next_pkt_id       = 1,
               retry_interval    = timer:seconds(RetrySecs),
               awaiting_rel      = #{},
               max_awaiting_rel  = emqx_zone:get_env(Zone, max_awaiting_rel, 100),
               await_rel_timeout = timer:seconds(AwaitRelSecs),
               created_at        = erlang:system_time(second)
              }.
  %% @private Create the session's message queue from the zone's mqueue
  %% settings (length limit, QoS0 storage, topic priorities).
  init_mqueue(Zone) ->
      Opts = #{max_len          => emqx_zone:get_env(Zone, max_mqueue_len, 1000),
               store_qos0       => emqx_zone:get_env(Zone, mqueue_store_qos0, true),
               priorities       => emqx_zone:get_env(Zone, mqueue_priorities, none),
               default_priority => emqx_zone:get_env(Zone, mqueue_default_priority, lowest)
              },
      emqx_mqueue:init(Opts).
  154. %%--------------------------------------------------------------------
  155. %% Info, Stats
  156. %%--------------------------------------------------------------------
  %% @doc Report the session's descriptive attributes (the ?INFO_KEYS set)
  %% as a map.
  -spec(info(session()) -> emqx_types:infos()).
  info(Session) ->
      maps:from_list([{Key, info(Key, Session)} || Key <- ?INFO_KEYS]).

  %% @doc Look up a single attribute by key, or build a {Key, Value} list when
  %% given a list of keys. Timeouts are stored in milliseconds and reported
  %% here in whole seconds.
  info(Keys, Session) when is_list(Keys) ->
      [{Key, info(Key, Session)} || Key <- Keys];
  info(subscriptions, #session{} = S) ->
      S#session.subscriptions;
  info(subscriptions_cnt, #session{} = S) ->
      maps:size(S#session.subscriptions);
  info(subscriptions_max, #session{} = S) ->
      S#session.max_subscriptions;
  info(upgrade_qos, #session{} = S) ->
      S#session.upgrade_qos;
  info(inflight, #session{} = S) ->
      S#session.inflight;
  info(inflight_cnt, #session{} = S) ->
      emqx_inflight:size(S#session.inflight);
  info(inflight_max, #session{} = S) ->
      emqx_inflight:max_size(S#session.inflight);
  info(retry_interval, #session{} = S) ->
      S#session.retry_interval div 1000;
  info(mqueue, #session{} = S) ->
      S#session.mqueue;
  info(mqueue_len, #session{} = S) ->
      emqx_mqueue:len(S#session.mqueue);
  info(mqueue_max, #session{} = S) ->
      emqx_mqueue:max_len(S#session.mqueue);
  info(mqueue_dropped, #session{} = S) ->
      emqx_mqueue:dropped(S#session.mqueue);
  info(next_pkt_id, #session{} = S) ->
      S#session.next_pkt_id;
  info(awaiting_rel, #session{} = S) ->
      S#session.awaiting_rel;
  info(awaiting_rel_cnt, #session{} = S) ->
      maps:size(S#session.awaiting_rel);
  info(awaiting_rel_max, #session{} = S) ->
      S#session.max_awaiting_rel;
  info(await_rel_timeout, #session{} = S) ->
      S#session.await_rel_timeout div 1000;
  info(created_at, #session{} = S) ->
      S#session.created_at.
  %% @doc Report the session's counters (the ?STATS_KEYS set) as a
  %% {Key, Value} list.
  -spec(stats(session()) -> emqx_types:stats()).
  stats(Session) ->
      [{Key, info(Key, Session)} || Key <- ?STATS_KEYS].
  202. %%--------------------------------------------------------------------
  203. %% Client -> Broker: SUBSCRIBE
  204. %%--------------------------------------------------------------------
  %% @doc Subscribe the client to a topic filter.
  %% A filter that is not yet subscribed is rejected with ?RC_QUOTA_EXCEEDED
  %% when the subscription limit has been reached; re-subscribing to an
  %% existing filter is always allowed and replaces its options. On success
  %% the broker is updated and the 'session.subscribed' hook is run with
  %% is_new added to the options.
  -spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(),
                  emqx_types:subopts(), session())
        -> {ok, session()} | {error, emqx_types:reason_code()}).
  subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
            Session = #session{subscriptions = Subs}) ->
      IsNew = not maps:is_key(TopicFilter, Subs),
      case IsNew andalso is_subscriptions_full(Session) of
          true ->
              {error, ?RC_QUOTA_EXCEEDED};
          false ->
              ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
              HookArgs = [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}],
              ok = emqx_hooks:run('session.subscribed', HookArgs),
              {ok, Session#session{subscriptions = Subs#{TopicFilter => SubOpts}}}
      end.
  -compile({inline, [is_subscriptions_full/1]}).
  %% @private True when the subscription count has reached the limit.
  %% A limit of 0 means "unlimited", so the session is never full.
  is_subscriptions_full(#session{subscriptions = Subs,
                                 max_subscriptions = MaxLimit}) ->
      MaxLimit =/= 0 andalso maps:size(Subs) >= MaxLimit.
  225. %%--------------------------------------------------------------------
  226. %% Client -> Broker: UNSUBSCRIBE
  227. %%--------------------------------------------------------------------
  %% @doc Remove a subscription from the session.
  %% Returns ?RC_NO_SUBSCRIPTION_EXISTED when the filter is not subscribed;
  %% otherwise unsubscribes at the broker, runs the 'session.unsubscribed'
  %% hook with the stored options and drops the filter from the session.
  -spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), session())
        -> {ok, session()} | {error, emqx_types:reason_code()}).
  unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) ->
      case maps:take(TopicFilter, Subs) of
          error ->
              {error, ?RC_NO_SUBSCRIPTION_EXISTED};
          {SubOpts, Remaining} ->
              ok = emqx_broker:unsubscribe(TopicFilter),
              ok = emqx_hooks:run('session.unsubscribed',
                                  [ClientInfo, TopicFilter, SubOpts]),
              {ok, Session#session{subscriptions = Remaining}}
      end.
  239. %%--------------------------------------------------------------------
  240. %% Client -> Broker: PUBLISH
  241. %%--------------------------------------------------------------------
  %% @doc Publish a message received from the client.
  %% QoS2 messages are tracked in awaiting_rel (packet id -> message
  %% timestamp) until the PUBREL arrives. They are refused with
  %% ?RC_RECEIVE_MAXIMUM_EXCEEDED when the awaiting-rel table is full, and
  %% with ?RC_PACKET_IDENTIFIER_IN_USE when the packet id is already tracked.
  %% The capacity check takes precedence over the packet id check.
  -spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
        -> {ok, emqx_types:publish_result(), session()}
         | {error, emqx_types:reason_code()}).
  publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
          Session = #session{awaiting_rel = AwaitingRel}) ->
      case {is_awaiting_full(Session), maps:is_key(PacketId, AwaitingRel)} of
          {true, _} ->
              {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED};
          {false, true} ->
              {error, ?RC_PACKET_IDENTIFIER_IN_USE};
          {false, false} ->
              Results = emqx_broker:publish(Msg),
              Tracked = AwaitingRel#{PacketId => Ts},
              {ok, Results, Session#session{awaiting_rel = Tracked}}
      end;
  %% QoS0/1 messages need no session bookkeeping: publish them directly.
  publish(_PacketId, Msg, Session) ->
      {ok, emqx_broker:publish(Msg), Session}.
  -compile({inline, [is_awaiting_full/1]}).
  %% @private True when the number of QoS2 messages awaiting PUBREL has
  %% reached the limit. A limit of 0 means "unlimited".
  is_awaiting_full(#session{awaiting_rel = AwaitingRel,
                            max_awaiting_rel = MaxLimit}) ->
      MaxLimit =/= 0 andalso maps:size(AwaitingRel) >= MaxLimit.
  268. %%--------------------------------------------------------------------
  269. %% Client -> Broker: PUBACK
  270. %%--------------------------------------------------------------------
  %% @doc Handle a PUBACK from the client.
  %% The packet id must refer to an inflight message: it is removed from the
  %% inflight window and the queue is drained into the freed capacity. An
  %% inflight entry that is not a message (i.e. a pending PUBREL) yields
  %% ?RC_PACKET_IDENTIFIER_IN_USE; an unknown id yields
  %% ?RC_PACKET_IDENTIFIER_NOT_FOUND.
  -spec(puback(emqx_types:packet_id(), session())
        -> {ok, emqx_types:message(), session()}
         | {ok, emqx_types:message(), replies(), session()}
         | {error, emqx_types:reason_code()}).
  puback(PacketId, Session = #session{inflight = Inflight}) ->
      case emqx_inflight:lookup(PacketId, Inflight) of
          none ->
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          {value, {#message{} = Msg, _Ts}} ->
              Remaining = emqx_inflight:delete(PacketId, Inflight),
              return_with(Msg, dequeue(Session#session{inflight = Remaining}));
          {value, {_Pubrel, _Ts}} ->
              {error, ?RC_PACKET_IDENTIFIER_IN_USE}
      end.
  -compile({inline, [return_with/2]}).
  %% @private Splice the acknowledged message into a dequeue result:
  %% {ok, Session} becomes {ok, Msg, Session} and {ok, Publishes, Session}
  %% becomes {ok, Msg, Publishes, Session}.
  return_with(Msg, Result)
    when element(1, Result) =:= ok,
         (tuple_size(Result) =:= 2 orelse tuple_size(Result) =:= 3) ->
      erlang:insert_element(2, Result, Msg).
  290. %%--------------------------------------------------------------------
  291. %% Client -> Broker: PUBREC
  292. %%--------------------------------------------------------------------
  %% @doc Handle a PUBREC from the client.
  %% The inflight message is replaced by a freshly timestamped pubrel marker
  %% and the original message is returned. A packet id that is already in
  %% the pubrel state yields ?RC_PACKET_IDENTIFIER_IN_USE; an unknown id
  %% yields ?RC_PACKET_IDENTIFIER_NOT_FOUND.
  -spec(pubrec(emqx_types:packet_id(), session())
        -> {ok, emqx_types:message(), session()}
         | {error, emqx_types:reason_code()}).
  pubrec(PacketId, Session = #session{inflight = Inflight}) ->
      case emqx_inflight:lookup(PacketId, Inflight) of
          none ->
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          {value, {pubrel, _Ts}} ->
              {error, ?RC_PACKET_IDENTIFIER_IN_USE};
          {value, {#message{} = Msg, _Ts}} ->
              Updated = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
              {ok, Msg, Session#session{inflight = Updated}}
      end.
  306. %%--------------------------------------------------------------------
  307. %% Client -> Broker: PUBREL
  308. %%--------------------------------------------------------------------
  %% @doc Handle a PUBREL from the client: stop tracking the QoS2 message
  %% that was awaiting release, or report ?RC_PACKET_IDENTIFIER_NOT_FOUND
  %% when the packet id is not being tracked.
  -spec(pubrel(emqx_types:packet_id(), session())
        -> {ok, session()} | {error, emqx_types:reason_code()}).
  pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:is_key(PacketId, AwaitingRel) of
          true ->
              Remaining = maps:remove(PacketId, AwaitingRel),
              {ok, Session#session{awaiting_rel = Remaining}};
          false ->
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
      end.
  318. %%--------------------------------------------------------------------
  319. %% Client -> Broker: PUBCOMP
  320. %%--------------------------------------------------------------------
  %% @doc Handle a PUBCOMP from the client.
  %% The packet id must be in the pubrel state: its entry is removed from the
  %% inflight window and the queue is drained into the freed capacity. Any
  %% other inflight entry yields ?RC_PACKET_IDENTIFIER_IN_USE; an unknown id
  %% yields ?RC_PACKET_IDENTIFIER_NOT_FOUND.
  -spec(pubcomp(emqx_types:packet_id(), session())
        -> {ok, session()} | {ok, replies(), session()}
         | {error, emqx_types:reason_code()}).
  pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
      case emqx_inflight:lookup(PacketId, Inflight) of
          none ->
              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
          {value, {pubrel, _Ts}} ->
              Remaining = emqx_inflight:delete(PacketId, Inflight),
              dequeue(Session#session{inflight = Remaining});
          {value, _Other} ->
              {error, ?RC_PACKET_IDENTIFIER_IN_USE}
      end.
  334. %%--------------------------------------------------------------------
  335. %% Dequeue Msgs
  336. %%--------------------------------------------------------------------
  %% @private Drain queued messages into the free part of the inflight window.
  %% Returns {ok, Session} when the queue is empty, otherwise whatever
  %% deliver/3 returns for the dequeued batch.
  dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
      case emqx_mqueue:is_empty(Q) of
          true -> {ok, Session};
          false ->
              {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
              deliver(Msgs, [], Session#session{mqueue = Q1})
      end.

  %% @private Take messages off the queue until the budget Cnt is used up or
  %% the queue is empty. Only QoS1/2 messages consume budget (see acc_cnt/2);
  %% expired messages are dropped and counted without consuming any.
  %%
  %% The stop condition is `Cnt =< 0' rather than an exact match on 0:
  %% batch_n/1 computes the budget as a difference, and a negative budget
  %% must stop the loop instead of draining the whole queue.
  dequeue(Cnt, Msgs, Q) when Cnt =< 0 ->
      {lists:reverse(Msgs), Q};
  dequeue(Cnt, Msgs, Q) ->
      case emqx_mqueue:out(Q) of
          {empty, _Q} ->
              {lists:reverse(Msgs), Q};
          {{value, Msg}, Q1} ->
              case emqx_message:is_expired(Msg) of
                  true ->
                      ok = inc_expired_cnt(delivery),
                      dequeue(Cnt, Msgs, Q1);
                  false ->
                      dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
              end
      end.
  -compile({inline, [acc_cnt/2]}).
  %% @private Update the dequeue budget after taking Msg: QoS0 messages are
  %% free, every other message costs one unit.
  acc_cnt(#message{qos = QoS}, Cnt) when QoS =:= ?QOS_0 ->
      Cnt;
  acc_cnt(_Msg, Cnt) ->
      Cnt - 1.
  359. %%--------------------------------------------------------------------
  360. %% Broker -> Client: Deliver
  361. %%--------------------------------------------------------------------
  %% @doc Deliver broker messages to the client.
  %% Each {deliver, Topic, Msg} is first enriched with the options of the
  %% matching subscription. A single delivery is handed straight to
  %% deliver_msg/2, so it may return {ok, Session} when nothing is to be sent;
  %% a batch always returns {ok, Publishes, Session}.
  -spec(deliver(list(emqx_types:deliver()), session())
        -> {ok, session()} | {ok, replies(), session()}).
  deliver([Deliver], Session) ->
      deliver_msg((enrich_fun(Session))(Deliver), Session);
  deliver(Delivers, Session) ->
      Enriched = lists:map(enrich_fun(Session), Delivers),
      deliver(Enriched, [], Session).

  %% @private Deliver already-enriched messages one by one, threading the
  %% session through and collecting the publishes in delivery order.
  deliver([], Acc, Session) ->
      {ok, lists:reverse(Acc), Session};
  deliver([Msg|Rest], Acc, Session) ->
      case deliver_msg(Msg, Session) of
          {ok, [Publish], Session1} ->
              deliver(Rest, [Publish|Acc], Session1);
          {ok, Session1} ->
              deliver(Rest, Acc, Session1)
      end.
  %% @private Deliver one enriched message.
  %% QoS0 messages are published without a packet id and leave the session
  %% untouched. QoS1/2 messages take the next packet id and are recorded as
  %% inflight. When the inflight window is full the message is either
  %% nacked (maybe_nack/1 returned true) or put on the queue, and nothing
  %% is published.
  deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
      {ok, [{undefined, maybe_ack(Msg)}], Session};
  deliver_msg(Msg = #message{qos = QoS},
              Session = #session{next_pkt_id = PacketId, inflight = Inflight})
    when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
      case emqx_inflight:is_full(Inflight) of
          false ->
              Publish = {PacketId, maybe_ack(Msg)},
              Awaiting = await(PacketId, Msg, Session),
              {ok, [Publish], next_pkt_id(Awaiting)};
          true ->
              case maybe_nack(Msg) of
                  true  -> {ok, Session};
                  false -> {ok, enqueue(Msg, Session)}
              end
      end.
  %% @doc Put deliveries, or one already-enriched message, on the session's
  %% queue. Deliveries are enriched with their subscription options first.
  %% A message reported as dropped by the queue is logged and counted.
  -spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
                session()) -> session()).
  enqueue([Deliver], Session) ->
      enqueue((enrich_fun(Session))(Deliver), Session);
  enqueue(Delivers, Session) when is_list(Delivers) ->
      Enriched = lists:map(enrich_fun(Session), Delivers),
      lists:foldl(fun enqueue/2, Session, Enriched);
  enqueue(#message{} = Msg, Session = #session{mqueue = Q}) ->
      case emqx_mqueue:in(Msg, Q) of
          {undefined, NewQ} ->
              Session#session{mqueue = NewQ};
          {Dropped, NewQ} ->
              log_dropped(Dropped, Session),
              Session#session{mqueue = NewQ}
      end.
  %% @private Count and log a message dropped by the queue. A QoS0 message
  %% dropped while the queue does not store QoS0 is counted separately from
  %% drops caused by a full queue.
  log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
      IsUnstoredQos0 = (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)),
      case IsUnstoredQos0 of
          false ->
              ok = emqx_metrics:inc('delivery.dropped.queue_full'),
              ?LOG(warning, "Dropped msg due to mqueue is full: ~s",
                   [emqx_message:format(Msg)]);
          true ->
              ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
              ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)])
      end.
  %% @private Build a fun that turns a {deliver, Topic, Msg} tuple into a
  %% message enriched with the options of the session's subscription to Topic.
  enrich_fun(Session = #session{subscriptions = Subs}) ->
      fun({deliver, Topic, Msg}) ->
              SubOpts = get_subopts(Topic, Subs),
              enrich_subopts(SubOpts, Msg, Session)
      end.
  %% @private Pass the message through emqx_shared_sub:maybe_ack/1 when
  %% emqx_shared_sub reports that an ack is required; otherwise return it
  %% unchanged.
  maybe_ack(Msg) ->
      AckRequired = emqx_shared_sub:is_ack_required(Msg),
      case AckRequired of
          false -> Msg;
          true  -> emqx_shared_sub:maybe_ack(Msg)
      end.
  %% @private True only when the message requires an ack and
  %% emqx_shared_sub:maybe_nack_dropped/1 returned ok for it.
  maybe_nack(Msg) ->
      case emqx_shared_sub:is_ack_required(Msg) of
          true  -> emqx_shared_sub:maybe_nack_dropped(Msg) == ok;
          false -> false
      end.
  %% @private Turn the stored options of the subscription to Topic into an
  %% ordered option list: nl, qos, rap and, when present, subid. Returns []
  %% when the session has no subscription for Topic.
  get_subopts(Topic, SubMap) ->
      case maps:find(Topic, SubMap) of
          error ->
              [];
          {ok, #{nl := Nl, qos := QoS, rap := Rap} = SubOpts} ->
              Base = [{nl, Nl}, {qos, QoS}, {rap, Rap}],
              case SubOpts of
                  #{subid := SubId} -> Base ++ [{subid, SubId}];
                  _ -> Base
              end
      end.
  %% @private Apply subscription options to a message, left to right.
  enrich_subopts(Opts, Msg, Session) ->
      lists:foldl(fun(Opt, Acc) -> enrich_subopt(Opt, Acc, Session) end, Msg, Opts).

  %% @private Apply a single subscription option:
  %%   nl    - 1 sets the nl flag on the message, 0 leaves it alone;
  %%   qos   - the larger of subscription and publish QoS when upgrade_qos is
  %%           on, the smaller otherwise;
  %%   rap   - a message whose headers mark it as retained always gets the
  %%           retain flag; otherwise rap 0 clears the flag and rap 1 keeps
  %%           the message as is;
  %%   subid - stored in the 'Subscription-Identifier' header.
  enrich_subopt({nl, 1}, Msg, _Session) ->
      emqx_message:set_flag(nl, Msg);
  enrich_subopt({nl, 0}, Msg, _Session) ->
      Msg;
  enrich_subopt({qos, SubQoS}, Msg = #message{qos = PubQoS},
                #session{upgrade_qos = true}) ->
      Msg#message{qos = max(SubQoS, PubQoS)};
  enrich_subopt({qos, SubQoS}, Msg = #message{qos = PubQoS},
                #session{upgrade_qos = false}) ->
      Msg#message{qos = min(SubQoS, PubQoS)};
  enrich_subopt({rap, _}, Msg = #message{headers = #{retained := true}}, _Session) ->
      emqx_message:set_flag(retain, true, Msg);
  enrich_subopt({rap, 0}, Msg, _Session) ->
      emqx_message:set_flag(retain, false, Msg);
  enrich_subopt({rap, 1}, Msg, _Session) ->
      Msg;
  enrich_subopt({subid, SubId}, Msg, _Session) ->
      emqx_message:set_header('Subscription-Identifier', SubId, Msg).
  458. %%--------------------------------------------------------------------
  459. %% Awaiting ACK for QoS1/QoS2 Messages
  460. %%--------------------------------------------------------------------
  %% @private Record a sent QoS1/2 message as inflight under its packet id,
  %% stamped with the current time.
  await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
      Session#session{inflight = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight)}.
  464. %%--------------------------------------------------------------------
  465. %% Retry Delivery
  466. %%--------------------------------------------------------------------
  %% @doc Redeliver inflight entries whose retry interval has elapsed.
  %% Returns {ok, Session} when nothing is inflight; otherwise the replies to
  %% send together with the delay until the next retry.
  -spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
  retry(Session = #session{inflight = Inflight}) ->
      case emqx_inflight:is_empty(Inflight) of
          false ->
              %% Oldest entries first, so the walk can stop at the first
              %% entry that is not yet due.
              Pending = emqx_inflight:to_list(sort_fun(), Inflight),
              Now = erlang:system_time(millisecond),
              retry_delivery(Pending, [], Now, Session);
          true ->
              {ok, Session}
      end.
  %% @private Walk the inflight entries and retry each one that is at least
  %% one retry interval old. The walk stops at the first entry that is not
  %% yet due; the returned timeout is then the time left until that entry is
  %% due, otherwise a full interval.
  retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
      {ok, lists:reverse(Acc), Interval, Session};
  retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now,
                 Session = #session{retry_interval = Interval, inflight = Inflight}) ->
      Age = age(Now, Ts),
      if
          Age >= Interval ->
              {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
              retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
          true ->
              %% A negative age is treated as 0, so the delay never exceeds
              %% one full interval.
              {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
      end.
  %% @private Retry a single inflight entry.
  %% A pending PUBREL is re-sent and re-stamped. An expired message is counted
  %% and removed from the inflight window; any other message is flagged as a
  %% duplicate, re-stamped and added to the replies.
  retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
      Restamped = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
      {[{pubrel, PacketId}|Acc], Restamped};
  retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight) ->
      case emqx_message:is_expired(Msg) of
          false ->
              DupMsg = emqx_message:set_flag(dup, true, Msg),
              Restamped = emqx_inflight:update(PacketId, {DupMsg, Now}, Inflight),
              {[{PacketId, DupMsg}|Acc], Restamped};
          true ->
              ok = inc_expired_cnt(delivery),
              {Acc, emqx_inflight:delete(PacketId, Inflight)}
      end.
  498. %%--------------------------------------------------------------------
  499. %% Expire Awaiting Rel
  500. %%--------------------------------------------------------------------
  %% @doc Expire QoS2 messages that have waited too long for their PUBREL.
  %% Returns {ok, Session} when nothing is awaiting release.
  -spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}).
  expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
      Now = fun() -> erlang:system_time(millisecond) end,
      case maps:size(AwaitingRel) =:= 0 of
          true  -> {ok, Session};
          false -> expire_awaiting_rel(Now(), Session)
      end.
  %% @private Drop every awaiting-rel entry that is at least await_rel_timeout
  %% old and count the dropped entries as expired messages. When entries
  %% remain, the timeout is returned as well so the caller can schedule the
  %% next check.
  expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
                                              await_rel_timeout = Timeout}) ->
      Remaining = maps:filter(fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
                              AwaitingRel),
      case maps:size(AwaitingRel) - maps:size(Remaining) of
          ExpiredCnt when ExpiredCnt > 0 ->
              inc_expired_cnt(message, ExpiredCnt);
          _ ->
              ok
      end,
      NSession = Session#session{awaiting_rel = Remaining},
      case maps:size(Remaining) of
          0 -> {ok, NSession};
          _ -> {ok, Timeout, NSession}
      end.
  518. %%--------------------------------------------------------------------
  519. %% Takeover, Resume and Replay
  520. %%--------------------------------------------------------------------
  %% @doc Unsubscribe every topic filter of the session at the broker.
  %% The session's own subscription table is left untouched.
  -spec(takeover(session()) -> ok).
  takeover(#session{subscriptions = Subs}) ->
      TopicFilters = maps:keys(Subs),
      lists:foreach(fun(TopicFilter) ->
                            emqx_broker:unsubscribe(TopicFilter)
                    end, TopicFilters).
  %% @doc Re-establish every subscription of the session at the broker on
  %% behalf of ClientId, using the stored subscription options.
  -spec(resume(emqx_types:clientid(), session()) -> ok).
  resume(ClientId, #session{subscriptions = Subs}) ->
      Resubscribe = fun({TopicFilter, SubOpts}) ->
                            ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
                    end,
      lists:foreach(Resubscribe, maps:to_list(Subs)).
  %% @doc Replay the session: re-emit everything that is still inflight, then
  %% drain the queue into whatever inflight capacity remains.
  %%
  %% Inflight entries are replayed in ascending timestamp order, the same
  %% ordering retry/1 uses, so that they go out in the order they were
  %% originally sent. Packet ids wrap from 16#FFFF back to 1 (see
  %% next_pkt_id/1), so an ordering by packet id would not match send order
  %% after a wrap-around.
  %% NOTE(review): emqx_inflight:to_list/1 presumably returns entries in key
  %% order - confirm against emqx_inflight.
  %%
  %% Messages are flagged as duplicates; an entry in the pubrel state is
  %% replayed as {pubrel, PacketId}.
  -spec(replay(session()) -> {ok, replies(), session()}).
  replay(Session = #session{inflight = Inflight}) ->
      Pubs = replay(Inflight),
      case dequeue(Session) of
          {ok, NSession} ->
              {ok, Pubs, NSession};
          {ok, More, NSession} ->
              {ok, lists:append(Pubs, More), NSession}
      end;
  %% Any argument that is not a session record is taken to be the inflight
  %% store itself and is converted into the list of replies.
  replay(Inflight) ->
      lists:map(fun({PacketId, {pubrel, _Ts}}) ->
                        {pubrel, PacketId};
                   ({PacketId, {Msg, _Ts}}) ->
                        {PacketId, emqx_message:set_flag(dup, true, Msg)}
                end, emqx_inflight:to_list(sort_fun(), Inflight)).
  543. %%--------------------------------------------------------------------
  544. %% Inc message/delivery expired counter
  545. %%--------------------------------------------------------------------
  -compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
  %% @private Count a single expired item of the given kind.
  inc_expired_cnt(Kind) ->
      inc_expired_cnt(Kind, 1).

  %% @private Add Count to both the general "dropped" metric and the
  %% "dropped because expired" metric of the given kind (message | delivery).
  inc_expired_cnt(message, Count) ->
      ok = emqx_metrics:inc('messages.dropped', Count),
      emqx_metrics:inc('messages.dropped.expired', Count);
  inc_expired_cnt(delivery, Count) ->
      ok = emqx_metrics:inc('delivery.dropped', Count),
      emqx_metrics:inc('delivery.dropped.expired', Count).
  554. %%--------------------------------------------------------------------
  555. %% Next Packet Id
  556. %%--------------------------------------------------------------------
  -compile({inline, [next_pkt_id/1]}).
  %% @private Advance the session's packet id, wrapping from 16#FFFF back
  %% to 1 (0 is never produced).
  next_pkt_id(Session = #session{next_pkt_id = Current}) ->
      Next = case Current of
                 16#FFFF -> 1;
                 _ -> Current + 1
             end,
      Session#session{next_pkt_id = Next}.
  562. %%--------------------------------------------------------------------
  563. %% Helper functions
  564. %%--------------------------------------------------------------------
  -compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}).
  %% @private Ordering fun for {Key, {Value, Timestamp}} entries: an entry
  %% sorts before another when its timestamp is not greater.
  sort_fun() ->
      fun({_KeyA, {_ValA, TsA}}, {_KeyB, {_ValB, TsB}}) ->
              TsB >= TsA
      end.
  %% @private Number of QoS1/2 messages that may be dequeued in one batch:
  %% the free room in the inflight window, or ?DEFAULT_BATCH_N when the
  %% window's maximum size is 0.
  batch_n(Inflight) ->
      MaxSize = emqx_inflight:max_size(Inflight),
      if
          MaxSize =:= 0 -> ?DEFAULT_BATCH_N;
          true -> MaxSize - emqx_inflight:size(Inflight)
      end.
  %% @private Pair a term with the current system time in milliseconds.
  with_ts(Msg) ->
      Now = erlang:system_time(millisecond),
      {Msg, Now}.
  %% @private Elapsed time between a timestamp and "now"; negative when the
  %% timestamp lies after "now".
  age(Current, Since) ->
      Current - Since.
  576. %%--------------------------------------------------------------------
  577. %% For CT tests
  578. %%--------------------------------------------------------------------
  %% @doc Overwrite one field of a session record by field name (for tests).
  %% The field's position comes from the record definition; it is shifted by
  %% one because the first tuple element is the record tag.
  set_field(Name, Value, Session) ->
      Fields = record_info(fields, session),
      Index = emqx_misc:index_of(Name, Fields),
      setelement(Index + 1, Session, Value).