emqttd_session.erl 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728
  1. %%%-----------------------------------------------------------------------------
  2. %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
  3. %%%
  4. %%% Permission is hereby granted, free of charge, to any person obtaining a copy
  5. %%% of this software and associated documentation files (the "Software"), to deal
  6. %%% in the Software without restriction, including without limitation the rights
  7. %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. %%% copies of the Software, and to permit persons to whom the Software is
  9. %%% furnished to do so, subject to the following conditions:
  10. %%%
  11. %%% The above copyright notice and this permission notice shall be included in all
  12. %%% copies or substantial portions of the Software.
  13. %%%
  14. %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. %%% SOFTWARE.
  21. %%%-----------------------------------------------------------------------------
  22. %%% @doc
  23. %%%
  24. %%% Session for persistent MQTT client.
  25. %%%
  26. %%% Session State in the broker consists of:
  27. %%%
  28. %%% 1. The Client’s subscriptions.
  29. %%%
  30. %%% 2. inflight qos1/2 messages sent to the client but unacked, QoS 1 and QoS 2
  31. %%% messages which have been sent to the Client, but have not been completely
  32. %%% acknowledged.
  33. %%%
  34. %%% 3. inflight qos2 messages received from client and waiting for pubrel. QoS 2
  35. %%% messages which have been received from the Client, but have not been
  36. %%% completely acknowledged.
  37. %%%
  38. %%% 4. all qos1, qos2 messages published to when client is disconnected.
  39. %%% QoS 1 and QoS 2 messages pending transmission to the Client.
  40. %%%
  41. %%% 5. Optionally, QoS 0 messages pending transmission to the Client.
  42. %%%
  43. %%% State of Message: newcome, inflight, pending
  44. %%%
  45. %%% @end
  46. %%%-----------------------------------------------------------------------------
  47. -module(emqttd_session).
  48. -author("Feng Lee <feng@emqtt.io>").
  49. -include("emqttd.hrl").
  50. -include("emqttd_protocol.hrl").
  51. -behaviour(gen_server2).
  52. %% Session API
  53. -export([start_link/3, resume/3, info/1, destroy/2]).
  54. %% PubSub APIs
  55. -export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2,
  56. subscribe/2, subscribe/3, unsubscribe/2]).
  57. %% gen_server Function Exports
  58. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  59. terminate/2, code_change/3]).
  60. %% gen_server2 Message Priorities
  61. -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
  %% State of one session process. All timeouts and intervals below are in
  %% seconds (timer/2 converts them with timer:seconds/1).
  -record(session, {

      %% Clean Session Flag
      clean_sess = true,

      %% ClientId: Identifier of Session
      client_id :: binary(),

      %% Client Pid bound to the session; reset to 'undefined' when the
      %% client of a persistent session exits.
      client_pid :: undefined | pid(),

      %% Packet id used for the next QoS 1/2 message delivered to the client
      packet_id = 1,

      %% Client's subscriptions, a list of {Topic, Qos}.
      subscriptions :: list(),

      %% Inflight qos1, qos2 messages sent to the client but unacked,
      %% QoS 1 and QoS 2 messages which have been sent to the Client,
      %% but have not been completely acknowledged.
      %% Client <- Broker
      inflight_queue :: list(),

      %% Maximum length of inflight_queue; 0 disables the limit
      max_inflight = 0,

      %% All qos1, qos2 messages published to when client is disconnected.
      %% QoS 1 and QoS 2 messages pending transmission to the Client.
      %%
      %% Optionally, QoS 0 messages pending transmission to the Client.
      message_queue :: emqttd_mqueue:mqueue(),

      %% Inflight qos2 messages received from client and waiting for pubrel.
      %% QoS 2 messages which have been received from the Client,
      %% but have not been completely acknowledged.
      %% Client -> Broker
      %% Maps PacketId => {Message, TimerRef}.
      awaiting_rel :: map(),

      %% Awaiting PUBREL timeout
      await_rel_timeout = 8,

      %% Max Packets that Awaiting PUBREL; 0 disables the limit
      max_awaiting_rel = 100,

      %% Retry timers of messages awaiting PUBACK/PUBREC: PacketId => TimerRef.
      awaiting_ack :: map(),

      %% Retry interval for redelivering QoS1/2 messages
      retry_interval = 20,

      %% Timers of QoS 2 messages awaiting PUBCOMP: PacketId => TimerRef.
      awaiting_comp :: map(),

      %% session expired after 48 hours
      expired_after = 172800,

      %% Timer armed when the client of a persistent session exits
      expired_timer,

      %% Interval of the periodic session-info registration; 0 disables it
      collect_interval,

      collect_timer,

      %% Creation time of the session (os:timestamp/0)
      timestamp}).
  %% Timeout passed to gen_server2:call/3 by publish/2 for QoS 2 messages
  %% (presumably milliseconds, as for gen_server calls - confirm against gen_server2).
  -define(PUBSUB_TIMEOUT, 60000).

  %% Log through lager at Level. The entry is tagged with the client id of the
  %% session State and the formatted message is prefixed with "Session(ClientId): ".
  -define(LOG(Level, Format, Args, State),
              lager:Level([{client, State#session.client_id}],
                          "Session(~s): " ++ Format, [State#session.client_id | Args])).
  %%------------------------------------------------------------------------------
  %% @doc Start a session process linked to the caller. The clean-session flag,
  %% the client id and the client pid are handed to init/1.
  %% @end
  %%------------------------------------------------------------------------------
  -spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}.
  start_link(CleanSess, ClientId, ClientPid) ->
      InitArgs = [CleanSess, ClientId, ClientPid],
      gen_server2:start_link(?MODULE, InitArgs, []).
  %%------------------------------------------------------------------------------
  %% @doc Ask an existing session to bind itself to a new client process.
  %% The request is asynchronous.
  %% @end
  %%------------------------------------------------------------------------------
  -spec resume(pid(), mqtt_client_id(), pid()) -> ok.
  resume(SessPid, ClientId, ClientPid) ->
      Request = {resume, ClientId, ClientPid},
      gen_server2:cast(SessPid, Request).
  %%------------------------------------------------------------------------------
  %% @doc Session Info. Synchronously fetch the property list built by
  %% sess_info/1 from the session process.
  %% @end
  %%------------------------------------------------------------------------------
  -spec info(pid()) -> [{atom(), term()}].
  info(SessPid) ->
      gen_server2:call(SessPid, info).
  %%------------------------------------------------------------------------------
  %% @doc Ask the session to shut itself down. The request is asynchronous and
  %% is only honoured when ClientId matches the client id of the session.
  %% @end
  %%------------------------------------------------------------------------------
  -spec destroy(pid(), mqtt_client_id()) -> ok.
  destroy(SessPid, ClientId) ->
      Request = {destroy, ClientId},
      gen_server2:cast(SessPid, Request).
  %%------------------------------------------------------------------------------
  %% @doc Subscribe Topics without waiting for, or being notified of, the
  %% granted QoS values.
  %% @end
  %%------------------------------------------------------------------------------
  -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
  subscribe(SessPid, TopicTable) ->
      IgnoreAck = fun(_) -> ok end,
      gen_server2:cast(SessPid, {subscribe, TopicTable, IgnoreAck}).
  %% Subscribe Topics on behalf of the calling process. Once the session has
  %% processed the request it sends {suback, PacketId, GrantedQos} back to
  %% the caller.
  -spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok.
  subscribe(SessPid, PacketId, TopicTable) ->
      Caller = self(),
      Notify = fun(GrantedQos) -> Caller ! {suback, PacketId, GrantedQos} end,
      gen_server2:cast(SessPid, {subscribe, TopicTable, Notify}).
  %%------------------------------------------------------------------------------
  %% @doc Publish message.
  %%
  %% QoS 0 and QoS 1 messages bypass the session process and are handed
  %% straight to emqttd_pubsub. QoS 2 messages go through the session, which
  %% holds them until the matching PUBREL arrives. The session answers
  %% {error, dropped} when too many QoS 2 messages are already awaiting PUBREL.
  %% @end
  %%------------------------------------------------------------------------------
  -spec publish(pid(), mqtt_message()) -> ok | {error, dropped}.
  publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
      %% publish qos0 directly
      emqttd_pubsub:publish(Msg);

  publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
      %% publish qos1 directly, and client will puback automatically
      emqttd_pubsub:publish(Msg);

  publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
      %% publish qos2 by session
      gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT).
  %%------------------------------------------------------------------------------
  %% @doc Tell the session that the client acknowledged a QoS 1 message
  %% (PUBACK). The request is asynchronous.
  %% @end
  %%------------------------------------------------------------------------------
  -spec puback(pid(), mqtt_packet_id()) -> ok.
  puback(SessPid, PktId) ->
      Ack = {puback, PktId},
      gen_server2:cast(SessPid, Ack).
  %% Tell the session that the client sent PUBREC for a QoS 2 message.
  -spec pubrec(pid(), mqtt_packet_id()) -> ok.
  pubrec(SessPid, PktId) ->
      Ack = {pubrec, PktId},
      gen_server2:cast(SessPid, Ack).
  %% Tell the session that the client sent PUBREL for a QoS 2 message it published.
  -spec pubrel(pid(), mqtt_packet_id()) -> ok.
  pubrel(SessPid, PktId) ->
      Ack = {pubrel, PktId},
      gen_server2:cast(SessPid, Ack).
  %% Tell the session that the client sent PUBCOMP for a QoS 2 message.
  -spec pubcomp(pid(), mqtt_packet_id()) -> ok.
  pubcomp(SessPid, PktId) ->
      Ack = {pubcomp, PktId},
      gen_server2:cast(SessPid, Ack).
  %%------------------------------------------------------------------------------
  %% @doc Unsubscribe Topics. The request is asynchronous.
  %% @end
  %%------------------------------------------------------------------------------
  -spec unsubscribe(pid(), [binary()]) -> ok.
  unsubscribe(SessPid, Topics) ->
      Request = {unsubscribe, Topics},
      gen_server2:cast(SessPid, Request).
  %%%=============================================================================
  %%% gen_server callbacks
  %%%=============================================================================

  %% Build the initial session state from the mqtt 'queue' and 'session'
  %% environments, link to the client process, register the session with
  %% emqttd_sm and start the periodic info collector.
  init([CleanSess, ClientId, ClientPid]) ->
      %% Trap exits so that the death of the client arrives as an 'EXIT' message.
      process_flag(trap_exit, true),
      true = link(ClientPid),
      QueueOpts = emqttd:env(mqtt, queue),
      SessOpts  = emqttd:env(mqtt, session),
      Session0 = #session{
              clean_sess        = CleanSess,
              client_id         = ClientId,
              client_pid        = ClientPid,
              subscriptions     = [],
              inflight_queue    = [],
              max_inflight      = emqttd_opts:g(max_inflight, SessOpts, 0),
              message_queue     = emqttd_mqueue:new(ClientId, QueueOpts, emqttd_alarm:alarm_fun()),
              awaiting_rel      = #{},
              awaiting_ack      = #{},
              awaiting_comp     = #{},
              retry_interval    = emqttd_opts:g(unack_retry_interval, SessOpts),
              await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessOpts),
              max_awaiting_rel  = emqttd_opts:g(max_awaiting_rel, SessOpts),
              %% the configured value is multiplied by 3600 before being stored
              expired_after     = emqttd_opts:g(expired_after, SessOpts) * 3600,
              collect_interval  = emqttd_opts:g(collect_interval, SessOpts, 0),
              timestamp         = os:timestamp()},
      emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session0)),
      %% start statistics
      {ok, start_collector(Session0), hibernate}.
  %% gen_server2 priority of synchronous calls: 'info' requests are served
  %% ahead of every other call.
  prioritise_call(info, _From, _Len, _State) -> 10;
  prioritise_call(_Msg, _From, _Len, _State) -> 0.
  %% gen_server2 priority of casts; a higher number is handled first.
  %% The unsubscribe cast sent by unsubscribe/2 is the 2-tuple
  %% {unsubscribe, Topics}, so it is matched as a 2-tuple here. A 3-tuple
  %% pattern never matches and would leave unsubscribe at the default priority.
  prioritise_cast({destroy, _}, _Len, _State)      -> 10;
  prioritise_cast({resume, _, _}, _Len, _State)    -> 9;
  prioritise_cast({pubrel, _PktId}, _Len, _State)  -> 8;
  prioritise_cast({pubcomp, _PktId}, _Len, _State) -> 8;
  prioritise_cast({pubrec, _PktId}, _Len, _State)  -> 8;
  prioritise_cast({puback, _PktId}, _Len, _State)  -> 7;
  prioritise_cast({unsubscribe, _}, _Len, _State)  -> 6;
  prioritise_cast({subscribe, _, _}, _Len, _State) -> 5;
  prioritise_cast(_Msg, _Len, _State)              -> 0.
  %% gen_server2 priority of plain messages; a higher number is handled first.
  %% Exit signals and session expiry come first, message dispatch last.
  prioritise_info({'EXIT', _, _}, _Len, _State)  -> 10;
  prioritise_info(expired, _Len, _State)         -> 10;
  prioritise_info({timeout, _, _}, _Len, _State) -> 5;
  prioritise_info(collect_info, _Len, _State)    -> 2;
  prioritise_info({dispatch, _}, _Len, _State)   -> 1;
  prioritise_info(_Msg, _Len, _State)            -> 0.
  %% Synchronous requests.
  %%
  %% info            -> reply with the property list built by sess_info/1.
  %% {publish, Msg}  -> park a QoS 2 message until its PUBREL arrives, or reply
  %%                    {error, dropped} when max_awaiting_rel is reached.
  %% anything else   -> logged and answered with {error, unsupported_req}.
  handle_call(info, _From, State) ->
      {reply, sess_info(State), State, hibernate};

  handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
              _From, Session = #session{awaiting_rel      = AwaitingRel,
                                        await_rel_timeout = Timeout}) ->
      case check_awaiting_rel(Session) of
          true ->
              %% A PUBLISH that reuses a packet id still awaiting PUBREL replaces
              %% the stored entry. Cancel the timer of the old entry first,
              %% otherwise it would fire later and drop the new entry early.
              case maps:find(PktId, AwaitingRel) of
                  {ok, {_OldMsg, OldTRef}} -> cancel_timer(OldTRef);
                  error                    -> ok
              end,
              TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
              AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
              {reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
          false ->
              ?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session),
              {reply, {error, dropped}, Session, hibernate}
      end;

  handle_call(Req, _From, State) ->
      ?LOG(critical, "Unexpected Request: ~p", [Req], State),
      {reply, {error, unsupported_req}, State, hibernate}.
  %% Asynchronous requests issued through the session API.

  %% SUBSCRIBE: run the 'client.subscribe' hooks, subscribe through
  %% emqttd_pubsub, acknowledge via AckFun and merge the topics into the
  %% subscription list of the session.
  handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
                                                                    subscriptions = Subscriptions}) ->
      TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
      case TopicTable -- Subscriptions of
          [] ->
              %% Every {Topic, Qos} is already subscribed: acknowledge with the
              %% requested QoS values and keep the state unchanged.
              AckFun([Qos || {_, Qos} <- TopicTable]),
              hibernate(Session);
          _ ->
              %% subscribe first and don't care if the subscriptions have been existed
              {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
              AckFun(GrantedQos),
              emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
              ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session),
              Merge =
                  fun({Topic, Qos}, Acc) ->
                      case lists:keyfind(Topic, 1, Acc) of
                          false ->
                              %%TODO: the design is ugly, rewrite later...:(
                              %% <MQTT V3.1.1>: 3.8.4
                              %% Where the Topic Filter is not identical to any existing Subscription's filter,
                              %% a new Subscription is created and all matching retained messages are sent.
                              emqttd_retained:dispatch(Topic, self()),
                              [{Topic, Qos} | Acc];
                          {Topic, Qos} ->
                              ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session),
                              Acc;
                          {Topic, OldQos} ->
                              ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session),
                              lists:keyreplace(Topic, 1, Acc, {Topic, Qos})
                      end
                  end,
              Merged = lists:foldl(Merge, Subscriptions, TopicTable),
              hibernate(Session#session{subscriptions = Merged})
      end;

  %% UNSUBSCRIBE: run the 'client.unsubscribe' hooks, unsubscribe through
  %% emqttd_pubsub and forget the topics locally.
  handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
                                                         subscriptions = Subscriptions}) ->
      Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
      %% unsubscribe from topic tree
      ok = emqttd_pubsub:unsubscribe(Topics),
      ?LOG(info, "unsubscribe ~p", [Topics], Session),
      %% keydelete/3 returns the list unchanged for a topic that is not subscribed
      Remaining = lists:foldl(fun(Topic, Acc) -> lists:keydelete(Topic, 1, Acc) end,
                              Subscriptions, Topics),
      hibernate(Session#session{subscriptions = Remaining});

  %% DESTROY: only honoured when the client id matches the session.
  handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
      ?LOG(warning, "destroyed", [], Session),
      shutdown(destroy, Session);

  %% RESUME: bind the session to a new client process, kick the previous one
  %% if there is any, then replay unfinished deliveries and queued messages.
  handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id      = ClientId,
                                                                client_pid     = OldClientPid,
                                                                clean_sess     = CleanSess,
                                                                inflight_queue = InflightQ,
                                                                awaiting_ack   = AwaitingAck,
                                                                awaiting_comp  = AwaitingComp,
                                                                expired_timer  = ETimer}) ->
      ?LOG(info, "resumed by ~p", [ClientPid], Session),

      %% Cancel expired timer
      cancel_timer(ETimer),

      case kick(ClientId, OldClientPid, ClientPid) of
          ignore -> ok;
          ok     -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], Session)
      end,

      true = link(ClientPid),

      %% Redeliver PUBREL
      lists:foreach(fun(PktId) -> ClientPid ! {redeliver, {?PUBREL, PktId}} end,
                    maps:keys(AwaitingComp)),

      %% Clear awaiting_ack timers
      lists:foreach(fun(TRef) -> cancel_timer(TRef) end, maps:values(AwaitingAck)),

      %% Clear awaiting_comp timers
      lists:foreach(fun(TRef) -> cancel_timer(TRef) end, maps:values(AwaitingComp)),

      Session1 = Session#session{client_pid    = ClientPid,
                                 clean_sess    = false,
                                 awaiting_ack  = #{},
                                 awaiting_comp = #{},
                                 expired_timer = undefined},

      %% CleanSess: true -> false?
      if
          CleanSess =:= true ->
              ?LOG(warning, "CleanSess changed to false.", [], Session),
              emqttd_sm:unregister_session(CleanSess, ClientId),
              emqttd_sm:register_session(false, ClientId, sess_info(Session1));
          CleanSess =:= false ->
              ok
      end,

      %% Redeliver inflight messages, oldest first: new entries are consed onto
      %% the head of the inflight queue, so fold from the right.
      Session2 = lists:foldr(fun({_Id, Msg}, Sess) -> redeliver(Msg, Sess) end,
                             Session1, InflightQ),

      %% Dequeue pending messages
      hibernate(dequeue(Session2));

  %% PUBACK
  handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) ->
      case maps:find(PktId, AwaitingAck) of
          error ->
              ?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session),
              hibernate(Session);
          {ok, TRef} ->
              cancel_timer(TRef),
              Acked = acked(PktId, Session),
              hibernate(dequeue(Acked))
      end;

  %% PUBREC
  handle_cast({pubrec, PktId}, Session = #session{awaiting_ack      = AwaitingAck,
                                                  awaiting_comp     = AwaitingComp,
                                                  await_rel_timeout = Timeout}) ->
      case maps:find(PktId, AwaitingAck) of
          error ->
              ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session),
              hibernate(Session);
          {ok, AckTRef} ->
              cancel_timer(AckTRef),
              %% From now on the message waits for PUBCOMP
              CompTRef = timer(Timeout, {timeout, awaiting_comp, PktId}),
              Waiting = Session#session{awaiting_comp = maps:put(PktId, CompTRef, AwaitingComp)},
              hibernate(dequeue(acked(PktId, Waiting)))
      end;

  %% PUBREL
  handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:find(PktId, AwaitingRel) of
          error ->
              ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),
              hibernate(Session);
          {ok, {Msg, TRef}} ->
              cancel_timer(TRef),
              %% The QoS 2 message is released: publish it now
              emqttd_pubsub:publish(Msg),
              hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)})
      end;

  %% PUBCOMP
  handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) ->
      case maps:find(PktId, AwaitingComp) of
          error ->
              ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session),
              hibernate(Session);
          {ok, TRef} ->
              cancel_timer(TRef),
              hibernate(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)})
      end;

  handle_cast(Msg, State) ->
      ?LOG(critical, "Unexpected Msg: ~p", [Msg], State),
      hibernate(State).
  %% Plain messages: dispatched MQTT messages, retry/expiry timers, the
  %% periodic info collector and exit signals of linked processes.

  %% Queue messages when client is offline
  handle_info({dispatch, Msg}, Session = #session{client_pid    = undefined,
                                                  message_queue = Q})
      when is_record(Msg, mqtt_message) ->
      hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});

  %% Dispatch qos0 message directly to client
  handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
              Session = #session{client_pid = ClientPid}) ->
      ClientPid ! {deliver, Msg},
      hibernate(Session);

  %% Deliver qos1/2 messages while the inflight window has room, queue otherwise
  handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
              Session = #session{message_queue = MsgQ})
      when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
      case check_inflight(Session) of
          true ->
              noreply(deliver(Msg, Session));
          false ->
              hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
      end;

  %% Retry timer fired while the client is offline
  handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid   = undefined,
                                                                 awaiting_ack = AwaitingAck}) ->
      %% just remove awaiting
      hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});

  %% Retry timer fired while the client is online: redeliver the message
  handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ,
                                                                 awaiting_ack   = AwaitingAck}) ->
      case maps:find(PktId, AwaitingAck) of
          {ok, _TRef} ->
              case lists:keyfind(PktId, 1, InflightQ) of
                  {_, Msg} ->
                      hibernate(redeliver(Msg, Session));
                  false ->
                      ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session),
                      %% The timer has fired and there is no message left to
                      %% retry: drop the stale entry so it does not linger in
                      %% awaiting_ack.
                      Session1 = Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)},
                      hibernate(dequeue(Session1))
              end;
          error ->
              ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% PUBREL did not arrive in time: drop the parked QoS 2 message
  handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:find(PktId, AwaitingRel) of
          {ok, {_Msg, _TRef}} ->
              ?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session),
              hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
          error ->
              ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% PUBCOMP did not arrive in time: stop waiting for it
  handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) ->
      case maps:find(PktId, Awaiting) of
          {ok, _TRef} ->
              ?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session),
              hibernate(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
          error ->
              ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% Periodic statistics: re-register the session info and re-arm the collector
  handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
      emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
      hibernate(start_collector(Session));

  %% The client of a clean session exited: the session ends with it
  handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
                                                               client_pid = ClientPid}) ->
      {stop, normal, Session};

  %% The client of a persistent session exited: go offline and arm the expiry timer
  handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess    = false,
                                                              client_pid    = ClientPid,
                                                              expired_after = Expires}) ->
      ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session),
      TRef = timer(Expires, expired),
      hibernate(Session#session{client_pid = undefined, expired_timer = TRef});

  %% Exit signal of a process that is not the current client
  handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) ->
      ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
           [ClientPid, Pid, Reason], Session),
      hibernate(Session);

  handle_info(expired, Session) ->
      ?LOG(info, "expired, shutdown now.", [], Session),
      shutdown(expired, Session);

  handle_info(Info, Session) ->
      ?LOG(critical, "Unexpected info: ~p", [Info], Session),
      hibernate(Session).
  %% Remove the session from the session manager when the process stops.
  terminate(_Reason, #session{client_id = ClientId, clean_sess = CleanSess}) ->
      emqttd_sm:unregister_session(CleanSess, ClientId).
  %% Code upgrade hook: the state is carried over unchanged.
  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.
  %%%=============================================================================
  %%% Internal functions
  %%%=============================================================================

  %%------------------------------------------------------------------------------
  %% Kick old client out.
  %% Returns 'ignore' when there is no old client or when it is the very same
  %% process. Otherwise unlinks the old client, tells it to shut down because
  %% of a client id conflict, and returns 'ok'.
  %%------------------------------------------------------------------------------
  kick(_ClientId, OldPid, Pid) when OldPid =:= undefined; OldPid =:= Pid ->
      ignore;
  kick(ClientId, OldPid, Pid) ->
      unlink(OldPid),
      Notice = {shutdown, conflict, {ClientId, Pid}},
      OldPid ! Notice,
      ok.
  %%------------------------------------------------------------------------------
  %% Check inflight and awaiting_rel
  %%------------------------------------------------------------------------------

  %% true when another QoS 1/2 message may be put in flight. A max_inflight of
  %% 0 means the window is unlimited.
  check_inflight(#session{max_inflight = Max, inflight_queue = Q}) ->
      Max =:= 0 orelse length(Q) < Max.
  %% true when another QoS 2 message may be parked until its PUBREL arrives.
  %% A max_awaiting_rel of 0 means there is no limit.
  check_awaiting_rel(#session{max_awaiting_rel = MaxLen, awaiting_rel = AwaitingRel}) ->
      MaxLen =:= 0 orelse maps:size(AwaitingRel) < MaxLen.
  %%------------------------------------------------------------------------------
  %% Dequeue and Deliver
  %%------------------------------------------------------------------------------

  %% Move queued messages to the client for as long as the inflight window
  %% has room. Nothing happens while the client is disconnected.
  dequeue(Session = #session{client_pid = undefined}) ->
      Session;
  dequeue(Session) ->
      HasRoom = check_inflight(Session),
      if
          HasRoom -> dequeue2(Session);
          true    -> Session
      end.
  %% Take one message off the queue, deliver it, and go back to dequeue/1 to
  %% try the next one; stops when the queue is empty.
  dequeue2(Session = #session{message_queue = Q}) ->
      case emqttd_mqueue:out(Q) of
          {{value, Msg}, Rest} ->
              Delivered = deliver(Msg, Session#session{message_queue = Rest}),
              %% dequeue more
              dequeue(Delivered);
          {empty, _Q} ->
              Session
      end.
  %% Send a message to the client process.
  %% QoS 0 messages are sent as they are. QoS 1/2 messages are stamped with
  %% the session's current packet id, added to the inflight queue and given
  %% a retry timer.
  deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
      ClientPid ! {deliver, Msg},
      Session;
  deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id      = PktId,
                                                              client_pid     = ClientPid,
                                                              inflight_queue = InflightQ})
      when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
      Outgoing = Msg#mqtt_message{pktid = PktId, dup = false},
      ClientPid ! {deliver, Outgoing},
      Tracked = Session#session{inflight_queue = [{PktId, Outgoing} | InflightQ]},
      await(Outgoing, next_packet_id(Tracked)).
  %% Send a message again. QoS 0 messages are simply delivered. QoS 1/2
  %% messages are resent with the DUP flag set and get a fresh retry timer.
  redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
      deliver(Msg, Session);
  redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid})
      when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
      Duplicate = Msg#mqtt_message{dup = true},
      ClientPid ! {deliver, Duplicate},
      await(Msg, Session).
  %%------------------------------------------------------------------------------
  %% Awaiting ack for qos1, qos2 message
  %%------------------------------------------------------------------------------

  %% Arm the retry timer of a message and record it under the packet id.
  await(#mqtt_message{pktid = PktId}, Session = #session{retry_interval = Timeout,
                                                         awaiting_ack   = Awaiting}) ->
      RetryTimer = timer(Timeout, {timeout, awaiting_ack, PktId}),
      Session#session{awaiting_ack = maps:put(PktId, RetryTimer, Awaiting)}.
  %% The client acknowledged PktId: run the 'message.acked' hooks for the
  %% inflight message (or log an error when there is none), then drop the
  %% packet id from both the inflight queue and the awaiting_ack map.
  acked(PktId, Session = #session{client_id      = ClientId,
                                  inflight_queue = InflightQ,
                                  awaiting_ack   = Awaiting}) ->
      case lists:keyfind(PktId, 1, InflightQ) of
          false ->
              ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session);
          {_, Msg} ->
              emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg])
      end,
      Inflight1 = lists:keydelete(PktId, 1, InflightQ),
      Awaiting1 = maps:remove(PktId, Awaiting),
      Session#session{awaiting_ack = Awaiting1, inflight_queue = Inflight1}.
  %% Advance the packet id, wrapping from 16#ffff back to 1.
  next_packet_id(Session = #session{packet_id = Id}) ->
      Next = case Id of
                 16#ffff -> 1;
                 _       -> Id + 1
             end,
      Session#session{packet_id = Next}.
  %% Send TimeoutMsg to this process after TimeoutSec seconds; returns the
  %% timer reference.
  timer(TimeoutSec, TimeoutMsg) ->
      Millis = timer:seconds(TimeoutSec),
      erlang:send_after(Millis, self(), TimeoutMsg).
  %% Cancel a timer created by timer/2.
  %% 'undefined' (no timer armed) is a no-op. A term that is not a valid timer
  %% reference yields 'false', the value erlang:cancel_timer/1 itself returns
  %% for a timer that no longer exists. Only that one error is handled; any
  %% other failure is not swallowed.
  cancel_timer(undefined) ->
      undefined;
  cancel_timer(Ref) ->
      try
          erlang:cancel_timer(Ref)
      catch
          error:badarg -> false
      end.
  %% Callback return: keep running with the given session state.
  noreply(Session) ->
      {noreply, Session}.

  %% Callback return: keep running and hibernate until the next message.
  hibernate(Session) ->
      {noreply, Session, hibernate}.

  %% Callback return: stop the session with exit reason {shutdown, Reason}.
  shutdown(Reason, Session) ->
      {stop, {shutdown, Reason}, Session}.
  %% Arm the timer of the periodic 'collect_info' message. A collect_interval
  %% of 0 disables collection and leaves the session unchanged.
  start_collector(Session = #session{collect_interval = 0}) ->
      Session;
  start_collector(Session = #session{collect_interval = Interval}) ->
      Session#session{collect_timer = timer(Interval, collect_info)}.
  %% Summarise the session as a property list: flags and limits as stored,
  %% queues and awaiting maps reduced to their sizes, queue statistics taken
  %% from emqttd_mqueue:stats/1.
  sess_info(Session = #session{message_queue = MessageQueue}) ->
      Stats = emqttd_mqueue:stats(MessageQueue),
      [{clean_sess,      Session#session.clean_sess},
       {subscriptions,   Session#session.subscriptions},
       {max_inflight,    Session#session.max_inflight},
       {inflight_queue,  length(Session#session.inflight_queue)},
       {message_queue,   proplists:get_value(len, Stats)},
       {message_dropped, proplists:get_value(dropped, Stats)},
       {awaiting_rel,    maps:size(Session#session.awaiting_rel)},
       {awaiting_ack,    maps:size(Session#session.awaiting_ack)},
       {awaiting_comp,   maps:size(Session#session.awaiting_comp)},
       {created_at,      Session#session.timestamp}].