emqttd_session.erl 27 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc Session for persistent MQTT client.
  17. %%
  18. %% Session State in the broker consists of:
  19. %%
  20. %% 1. The Client’s subscriptions.
  21. %%
  22. %% 2. inflight qos1/2 messages sent to the client but unacked, QoS 1 and QoS 2
  23. %% messages which have been sent to the Client, but have not been completely
  24. %% acknowledged.
  25. %%
  26. %% 3. inflight qos2 messages received from client and waiting for pubrel. QoS 2
  27. %% messages which have been received from the Client, but have not been
  28. %% completely acknowledged.
  29. %%
  30. %% 4. All QoS 1 and QoS 2 messages published while the client is disconnected,
  31. %% i.e. QoS 1 and QoS 2 messages pending transmission to the Client.
  32. %%
  33. %% 5. Optionally, QoS 0 messages pending transmission to the Client.
  34. %%
  35. %% State of Message: newcome, inflight, pending
  36. %%
  37. %% @end
  38. -module(emqttd_session).
  39. -include("emqttd.hrl").
  40. -include("emqttd_protocol.hrl").
  41. -include("emqttd_internal.hrl").
  42. -behaviour(gen_server2).
  43. -import(proplists, [get_value/2, get_value/3]).
  44. %% Session API
  45. -export([start_link/3, resume/3, info/1, destroy/2]).
  46. %% PubSub APIs
  47. -export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2,
  48. subscribe/2, subscribe/3, unsubscribe/2]).
  49. %% gen_server Function Exports
  50. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  51. terminate/2, code_change/3]).
  52. %% gen_server2 Message Priorities
  53. -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
  %% State of one session process.
  %% Field order is unchanged; only the types of the two client pid fields
  %% were widened, because both legitimately hold 'undefined'.
  -record(session, {
      %% Clean Session flag
      clean_sess = true,

      %% ClientId: identifier of the session
      client_id :: binary(),

      %% Pid of the client bound to the session; set to 'undefined' when a
      %% persistent client exits
      client_pid :: undefined | pid(),

      %% Pid of the previous client that has been kicked out; stays
      %% 'undefined' until the session is resumed
      old_client_pid :: undefined | pid(),

      %% Last packet id of the session
      packet_id = 1,

      %% Client's subscriptions (Topic -> QoS)
      subscriptions :: dict:dict(),

      %% Client <- Broker: QoS 1/2 messages sent to the client but not yet
      %% completely acknowledged, as a list of {PacketId, Message}
      inflight_queue :: list(),

      %% Maximum length of inflight_queue; 0 disables the limit
      max_inflight = 0,

      %% Messages pending transmission to the client (queued while the
      %% client is disconnected or the inflight window is full)
      message_queue :: emqttd_mqueue:mqueue(),

      %% Client -> Broker: QoS 2 messages received from the client and
      %% waiting for PUBREL (PacketId -> {Message, TimerRef})
      awaiting_rel :: map(),

      %% Awaiting PUBREL timeout, in seconds
      await_rel_timeout = 8,

      %% Max number of packets awaiting PUBREL; 0 disables the limit
      max_awaiting_rel = 100,

      %% Retry timers for unacknowledged messages (PacketId -> TimerRef)
      awaiting_ack :: map(),

      %% Retry interval for redelivering QoS 1/2 messages, in seconds
      retry_interval = 20,

      %% Timers of messages awaiting PUBCOMP (PacketId -> TimerRef)
      awaiting_comp :: map(),

      %% Session expires after 48 hours (value in seconds)
      expired_after = 172800,

      %% Timer that fires 'expired' once the client is gone
      expired_timer,

      %% Interval of the periodic 'collect_info' message, in seconds;
      %% 0 disables collection
      collect_interval,

      collect_timer,

      %% Creation time (os:timestamp())
      timestamp}).
  %% Timeout (in milliseconds) of the synchronous publish call.
  -define(PUBSUB_TIMEOUT, 60000).

  %% Log through lager at the given Level, tagging the entry with the
  %% session's client id and prefixing the message with "Session(ClientId): ".
  -define(LOG(Level, Format, Args, State),
          lager:Level([{client, State#session.client_id}],
                      "Session(~s): " ++ Format,
                      [State#session.client_id | Args])).
  %% @doc Start a session process linked to the caller.
  -spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}.
  start_link(CleanSess, ClientId, ClientPid) ->
      InitArgs = [CleanSess, ClientId, ClientPid],
      gen_server2:start_link(?MODULE, InitArgs, []).
  %% @doc Ask an existing session to bind itself to a new client process.
  -spec resume(pid(), mqtt_client_id(), pid()) -> ok.
  resume(SessPid, ClientId, ClientPid) ->
      Request = {resume, ClientId, ClientPid},
      gen_server2:cast(SessPid, Request).
  %% @doc Fetch the session info property list (see sess_info/1) with a
  %% synchronous call to the session process.
  -spec info(pid()) -> [{atom(), term()}].
  info(SessPid) ->
      gen_server2:call(SessPid, info).
  %% @doc Ask the session to shut itself down. The request is only honoured
  %% when ClientId equals the session's own client id.
  -spec destroy(pid(), mqtt_client_id()) -> ok.
  destroy(SessPid, ClientId) ->
      Request = {destroy, ClientId},
      gen_server2:cast(SessPid, Request).
  118. %%--------------------------------------------------------------------
  119. %% PubSub
  120. %%--------------------------------------------------------------------
  %% @doc Subscribe to topics without caring about the granted QoS list:
  %% the acknowledgement callback simply discards it.
  -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
  subscribe(SessPid, TopicTable) ->
      IgnoreAck = fun(_) -> ok end,
      gen_server2:cast(SessPid, {subscribe, TopicTable, IgnoreAck}).
  %% @doc Subscribe to topics; once the session has processed the request
  %% the calling process receives {suback, PacketId, GrantedQos}.
  -spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok.
  subscribe(SessPid, PacketId, TopicTable) ->
      %% Capture the caller here: the fun runs inside the session process.
      Caller = self(),
      SendSubAck = fun(GrantedQos) -> Caller ! {suback, PacketId, GrantedQos} end,
      gen_server2:cast(SessPid, {subscribe, TopicTable, SendSubAck}).
  %% @doc Publish a message on behalf of the client.
  %% QoS 0 and QoS 1 messages bypass the session and are published directly
  %% (for QoS 1 the client side acknowledges on its own); QoS 2 messages go
  %% through the session, which holds them until PUBREL arrives.
  -spec publish(pid(), mqtt_message()) -> ok | {error, any()}.
  publish(_SessPid, Msg = #mqtt_message{qos = Qos})
    when Qos =:= ?QOS_0 orelse Qos =:= ?QOS_1 ->
      emqttd:publish(Msg);
  publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
      Request = {publish, Msg},
      gen_server2:call(SessPid, Request, ?PUBSUB_TIMEOUT).
  %% @doc Forward a PUBACK received from the client to the session.
  -spec puback(pid(), mqtt_packet_id()) -> ok.
  puback(SessPid, PktId) ->
      Ack = {puback, PktId},
      gen_server2:cast(SessPid, Ack).
  %% @doc Forward a PUBREC received from the client to the session.
  -spec pubrec(pid(), mqtt_packet_id()) -> ok.
  pubrec(SessPid, PktId) ->
      Ack = {pubrec, PktId},
      gen_server2:cast(SessPid, Ack).
  %% @doc Forward a PUBREL received from the client to the session.
  -spec pubrel(pid(), mqtt_packet_id()) -> ok.
  pubrel(SessPid, PktId) ->
      Ack = {pubrel, PktId},
      gen_server2:cast(SessPid, Ack).
  %% @doc Forward a PUBCOMP received from the client to the session.
  -spec pubcomp(pid(), mqtt_packet_id()) -> ok.
  pubcomp(SessPid, PktId) ->
      Ack = {pubcomp, PktId},
      gen_server2:cast(SessPid, Ack).
  %% @doc Ask the session to drop its subscriptions to the given topics.
  -spec unsubscribe(pid(), [binary()]) -> ok.
  unsubscribe(SessPid, Topics) ->
      Request = {unsubscribe, Topics},
      gen_server2:cast(SessPid, Request).
  160. %%--------------------------------------------------------------------
  161. %% gen_server callbacks
  162. %%--------------------------------------------------------------------
  %% @doc gen_server2 init callback.
  %% Traps exits and links to the client so that the client's death arrives
  %% as an 'EXIT' message, builds the initial state from the `mqtt' queue and
  %% session environment, registers the session with emqttd_sm and starts the
  %% periodic statistics collector.
  %%
  %% Every tunable is read with an explicit default equal to the #session{}
  %% record default, so a missing configuration key no longer stores
  %% `undefined' (which crashed either right here on `undefined * 3600' or
  %% later when a timer was armed).
  init([CleanSess, ClientId, ClientPid]) ->
      process_flag(trap_exit, true),
      true = link(ClientPid),
      QEnv = emqttd:env(mqtt, queue),
      SessEnv = emqttd:env(mqtt, session),
      Session = #session{
          clean_sess        = CleanSess,
          client_id         = ClientId,
          client_pid        = ClientPid,
          subscriptions     = dict:new(),
          inflight_queue    = [],
          max_inflight      = get_value(max_inflight, SessEnv, 0),
          message_queue     = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
          awaiting_rel      = #{},
          awaiting_ack      = #{},
          awaiting_comp     = #{},
          retry_interval    = get_value(unack_retry_interval, SessEnv, 20),
          await_rel_timeout = get_value(await_rel_timeout, SessEnv, 8),
          max_awaiting_rel  = get_value(max_awaiting_rel, SessEnv, 100),
          %% Configured in hours, kept in seconds (default: 48 hours).
          expired_after     = get_value(expired_after, SessEnv, 48) * 3600,
          collect_interval  = get_value(collect_interval, SessEnv, 0),
          timestamp         = os:timestamp()},
      emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
      %% start statistics
      {ok, start_collector(Session), hibernate}.
  %% @doc gen_server2 call priorities: `info' requests jump the queue,
  %% every other call has the default priority 0.
  prioritise_call(info, _From, _Len, _State) -> 10;
  prioritise_call(_Msg, _From, _Len, _State) -> 0.
  %% @doc gen_server2 cast priorities (higher is handled first).
  %% Session control beats acknowledgements, which beat (un)subscriptions;
  %% anything else has the default priority 0.
  %%
  %% The unsubscribe request is the 2-tuple {unsubscribe, Topics} sent by
  %% unsubscribe/2; it used to be matched as a 3-tuple and therefore always
  %% fell through to priority 0.
  prioritise_cast({destroy, _}, _Len, _State)        -> 10;
  prioritise_cast({resume, _, _}, _Len, _State)      -> 9;
  prioritise_cast({pubrel, _PktId}, _Len, _State)    -> 8;
  prioritise_cast({pubcomp, _PktId}, _Len, _State)   -> 8;
  prioritise_cast({pubrec, _PktId}, _Len, _State)    -> 8;
  prioritise_cast({puback, _PktId}, _Len, _State)    -> 7;
  prioritise_cast({unsubscribe, _}, _Len, _State)    -> 6;
  prioritise_cast({subscribe, _, _}, _Len, _State)   -> 5;
  prioritise_cast(_Msg, _Len, _State)                -> 0.
  %% @doc gen_server2 info priorities (higher is handled first): exits and
  %% session expiry first, then timeouts, statistics collection and finally
  %% message dispatch; anything else has the default priority 0.
  prioritise_info({'EXIT', _, _}, _Len, _State)   -> 10;
  prioritise_info(expired, _Len, _State)          -> 10;
  prioritise_info({timeout, _, _}, _Len, _State)  -> 5;
  prioritise_info(collect_info, _Len, _State)     -> 2;
  prioritise_info({dispatch, _, _}, _Len, _State) -> 1;
  prioritise_info(_Msg, _Len, _State)             -> 0.
  %% @doc Synchronous requests.
  %%
  %% `info' replies with the session info property list.
  %%
  %% `{publish, Msg}' (QoS 2 only) parks the message in awaiting_rel until the
  %% client sends PUBREL, guarded by a timer of await_rel_timeout seconds.
  %% Replies `ok', or `{error, dropped}' when the awaiting_rel limit is hit.
  %%
  %% Any other request is delegated to the ?UNEXPECTED_REQ macro.
  handle_call(info, _From, State) ->
      {reply, sess_info(State), State, hibernate};

  handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
              Session = #session{awaiting_rel      = AwaitingRel,
                                 await_rel_timeout = Timeout}) ->
      case check_awaiting_rel(Session) of
          true ->
              %% A retransmitted PUBLISH reuses its packet id. Cancel the timer
              %% of the entry being replaced, otherwise it would later fire and
              %% drop the refreshed entry before its own timeout.
              case maps:find(PktId, AwaitingRel) of
                  {ok, {_OldMsg, OldTRef}} -> cancel_timer(OldTRef);
                  error                    -> ok
              end,
              TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
              AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
              {reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
          false ->
              ?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session),
              {reply, {error, dropped}, Session, hibernate}
      end;

  handle_call(Req, _From, State) ->
      ?UNEXPECTED_REQ(Req, State).
  %% @doc Asynchronous requests: subscribe / unsubscribe, destroy, resume and
  %% the four acknowledgement packets (PUBACK, PUBREC, PUBREL, PUBCOMP).
  %% Anything else is delegated to the ?UNEXPECTED_MSG macro.

  %% SUBSCRIBE: run the 'client.subscribe' hook, apply every {Topic, Qos}
  %% pair, acknowledge with the requested QoS list, then run the after-hook.
  handle_cast({subscribe, RawTopicTable, AckFun},
              Session = #session{client_id = ClientId, subscriptions = Subs}) ->
      case emqttd:run_hooks('client.subscribe', [ClientId], RawTopicTable) of
          {ok, TopicTable} ->
              ?LOG(info, "Subscribe ~p", [TopicTable], Session),
              AddTopic = fun({Topic, Qos}, Acc) ->
                                 subscribe_topic(Topic, Qos, Acc, Session)
                         end,
              Subs1 = lists:foldl(AddTopic, Subs, TopicTable),
              AckFun([Qos || {_, Qos} <- TopicTable]),
              emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable),
              hibernate(Session#session{subscriptions = Subs1});
          {stop, TopicTable} ->
              ?LOG(error, "Cannot subscribe: ~p", [TopicTable], Session),
              hibernate(Session)
      end;

  %% UNSUBSCRIBE: run the 'client.unsubscribe' hook and drop every topic the
  %% session is actually subscribed to; unknown topics are skipped.
  handle_cast({unsubscribe, RawTopics},
              Session = #session{client_id = ClientId, subscriptions = Subs}) ->
      case emqttd:run_hooks('client.unsubscribe', [ClientId], RawTopics) of
          {ok, Topics} ->
              ?LOG(info, "unsubscribe ~p", [Topics], Session),
              DropTopic =
                  fun(Topic, Acc) ->
                          case dict:find(Topic, Acc) of
                              {ok, Qos} ->
                                  emqttd:unsubscribe(ClientId, Topic, Qos),
                                  dict:erase(Topic, Acc);
                              error ->
                                  Acc
                          end
                  end,
              Subs1 = lists:foldl(DropTopic, Subs, Topics),
              hibernate(Session#session{subscriptions = Subs1});
          {stop, Topics} ->
              ?LOG(info, "Cannot unsubscribe: ~p", [Topics], Session),
              hibernate(Session)
      end;

  %% DESTROY: only honoured when the client id matches this session.
  handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
      ?LOG(warning, "destroyed", [], Session),
      shutdown(destroy, Session);

  %% RESUME: bind the session to a new client process.
  handle_cast({resume, ClientId, ClientPid},
              Session = #session{client_id      = ClientId,
                                 client_pid     = OldClientPid,
                                 clean_sess     = CleanSess,
                                 inflight_queue = InflightQ,
                                 awaiting_ack   = AwaitingAck,
                                 awaiting_comp  = AwaitingComp,
                                 expired_timer  = ETimer}) ->
      ?LOG(info, "resumed by ~p", [ClientPid], Session),

      %% Cancel expired timer
      cancel_timer(ETimer),

      case kick(ClientId, OldClientPid, ClientPid) of
          ok     -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], Session);
          ignore -> ok
      end,

      true = link(ClientPid),

      %% Redeliver PUBREL
      lists:foreach(fun(PktId) ->
                            ClientPid ! {redeliver, {?PUBREL, PktId}}
                    end, maps:keys(AwaitingComp)),

      %% Clear awaiting_ack timers, then awaiting_comp timers
      lists:foreach(fun cancel_timer/1, maps:values(AwaitingAck)),
      lists:foreach(fun cancel_timer/1, maps:values(AwaitingComp)),

      Session1 = Session#session{client_pid     = ClientPid,
                                 old_client_pid = OldClientPid,
                                 clean_sess     = false,
                                 awaiting_ack   = #{},
                                 awaiting_comp  = #{},
                                 expired_timer  = undefined},

      %% CleanSess: true -> false? Then re-register as a persistent session.
      if
          CleanSess =:= true ->
              ?LOG(warning, "CleanSess changed to false.", [], Session),
              emqttd_sm:unregister_session(CleanSess, ClientId),
              emqttd_sm:register_session(false, ClientId, sess_info(Session1));
          CleanSess =:= false ->
              ok
      end,

      %% Redeliver inflight messages, oldest first. The queue is built by
      %% prepending, so a right fold walks it from its oldest entry.
      Session2 = lists:foldr(fun({_Id, Msg}, Sess) -> redeliver(Msg, Sess) end,
                             Session1, InflightQ),

      %% Dequeue pending messages
      hibernate(dequeue(Session2));

  %% PUBACK
  handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) ->
      case maps:find(PktId, AwaitingAck) of
          {ok, RetryTimer} ->
              cancel_timer(RetryTimer),
              Session1 = acked(PktId, Session),
              hibernate(dequeue(Session1));
          error ->
              ?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% PUBREC: the message is acknowledged, now wait for PUBCOMP.
  handle_cast({pubrec, PktId}, Session = #session{awaiting_ack      = AwaitingAck,
                                                  awaiting_comp     = AwaitingComp,
                                                  await_rel_timeout = Timeout}) ->
      case maps:find(PktId, AwaitingAck) of
          {ok, RetryTimer} ->
              cancel_timer(RetryTimer),
              CompTimer = timer(Timeout, {timeout, awaiting_comp, PktId}),
              Session1 = Session#session{
                             awaiting_comp = maps:put(PktId, CompTimer, AwaitingComp)},
              hibernate(dequeue(acked(PktId, Session1)));
          error ->
              ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% PUBREL: release the parked QoS 2 message and publish it.
  handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:find(PktId, AwaitingRel) of
          {ok, {Msg, RelTimer}} ->
              cancel_timer(RelTimer),
              emqttd:publish(Msg),
              AwaitingRel1 = maps:remove(PktId, AwaitingRel),
              hibernate(Session#session{awaiting_rel = AwaitingRel1});
          error ->
              ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% PUBCOMP
  handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) ->
      case maps:find(PktId, AwaitingComp) of
          {ok, CompTimer} ->
              cancel_timer(CompTimer),
              AwaitingComp1 = maps:remove(PktId, AwaitingComp),
              hibernate(Session#session{awaiting_comp = AwaitingComp1});
          error ->
              ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session),
              hibernate(Session)
      end;

  handle_cast(Msg, State) ->
      ?UNEXPECTED_MSG(Msg, State).

  %% Apply one {Topic, Qos} subscription to the subscription dict.
  %% Returns the (possibly updated) dict.
  subscribe_topic(Topic, Qos, SubDict, Session = #session{client_id = ClientId}) ->
      case dict:find(Topic, SubDict) of
          {ok, Qos} ->
              %% Same topic, same QoS: nothing to do.
              ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
              SubDict;
          {ok, OldQos} ->
              %% Same topic, different QoS: update the existing subscription.
              emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos),
              ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
              dict:store(Topic, Qos, SubDict);
          error ->
              emqttd:subscribe(ClientId, Topic, Qos),
              %% <MQTT V3.1.1>: 3.8.4
              %% Where the Topic Filter is not identical to any existing
              %% Subscription's filter, a new Subscription is created and all
              %% matching retained messages are sent.
              emqttd_retainer:dispatch(Topic, self()),
              dict:store(Topic, Qos, SubDict)
      end.
  %% @doc Plain messages: dispatched MQTT messages, the three kinds of
  %% timeouts, statistics collection, client exits and session expiry.
  %% Anything else is delegated to the ?UNEXPECTED_INFO macro.

  %% Dispatch Message: cap the QoS to the subscription's, then deliver/queue.
  handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subs})
    when is_record(Msg, mqtt_message) ->
      Tuned = tune_qos(Topic, Msg, Subs),
      dispatch(Tuned, Session);

  %% Retry timer fired while no client is connected: just forget the timer.
  handle_info({timeout, awaiting_ack, PktId},
              Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) ->
      hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});

  %% Retry timer fired with a client connected: redeliver the inflight message.
  handle_info({timeout, awaiting_ack, PktId},
              Session = #session{inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) ->
      case {maps:is_key(PktId, AwaitingAck), lists:keyfind(PktId, 1, InflightQ)} of
          {true, {_, Msg}} ->
              hibernate(redeliver(Msg, Session));
          {true, false} ->
              ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session),
              hibernate(dequeue(Session));
          {false, _} ->
              ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% No PUBREL arrived in time: drop the parked QoS 2 message.
  handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
      case maps:find(PktId, AwaitingRel) of
          {ok, {_Msg, _TRef}} ->
              ?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session),
              AwaitingRel1 = maps:remove(PktId, AwaitingRel),
              hibernate(Session#session{awaiting_rel = AwaitingRel1});
          error ->
              ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% No PUBCOMP arrived in time: stop waiting for it.
  handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) ->
      case maps:find(PktId, AwaitingComp) of
          {ok, _TRef} ->
              ?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session),
              AwaitingComp1 = maps:remove(PktId, AwaitingComp),
              hibernate(Session#session{awaiting_comp = AwaitingComp1});
          error ->
              ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session),
              hibernate(Session)
      end;

  %% Periodic statistics: refresh the registered info and re-arm the timer.
  handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
      emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
      hibernate(start_collector(Session));

  %% The bound client of a clean session exited: the session stops with it.
  handle_info({'EXIT', ClientPid, _Reason},
              Session = #session{clean_sess = true, client_pid = ClientPid}) ->
      {stop, normal, Session};

  %% The bound client of a persistent session exited: keep the session,
  %% unbind the client and start the expiry countdown.
  handle_info({'EXIT', ClientPid, Reason},
              Session = #session{clean_sess    = false,
                                 client_pid    = ClientPid,
                                 expired_after = Expires}) ->
      ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session),
      ExpiryTimer = timer(Expires, expired),
      hibernate(Session#session{client_pid = undefined, expired_timer = ExpiryTimer});

  %% Exit of the client that was kicked out earlier: ignore.
  handle_info({'EXIT', Pid, _Reason}, Session = #session{old_client_pid = Pid}) ->
      hibernate(Session);

  %% Exit of a process that is neither the current nor the previous client.
  handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) ->
      ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
           [ClientPid, Pid, Reason], Session),
      hibernate(Session);

  handle_info(expired, Session) ->
      ?LOG(info, "expired, shutdown now.", [], Session),
      shutdown(expired, Session);

  handle_info(Info, Session) ->
      ?UNEXPECTED_INFO(Info, Session).
  %% @doc gen_server2 terminate callback: remove the session from the
  %% session manager's registry.
  terminate(_Reason, Session) ->
      #session{clean_sess = CleanSess, client_id = ClientId} = Session,
      emqttd_sm:unregister_session(CleanSess, ClientId).
  %% @doc gen_server2 code_change callback: the state is kept unchanged
  %% across code upgrades.
  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.
  445. %%--------------------------------------------------------------------
  446. %% Kick old client out
  447. %%--------------------------------------------------------------------
  %% Kick the previously bound client out when a different client takes over.
  %% Returns `ignore' when there is no old client or when old and new client
  %% are the same process; otherwise unlinks the old client, tells it to shut
  %% down because of the conflict and returns `ok'.
  kick(_ClientId, undefined, _NewPid) ->
      ignore;
  kick(_ClientId, SamePid, SamePid) ->
      ignore;
  kick(ClientId, OldPid, NewPid) ->
      unlink(OldPid),
      OldPid ! {shutdown, conflict, {ClientId, NewPid}},
      %% Clean noproc: flush an 'EXIT' from the old client that may already
      %% sit in the mailbox, without waiting for one.
      receive
          {'EXIT', OldPid, _Reason} -> ok
      after 0 ->
          ok
      end.
  457. %%--------------------------------------------------------------------
  458. %% Dispatch Messages
  459. %%--------------------------------------------------------------------
  %% Route one message towards the client and return the gen_server2 reply.
  %%
  %%  * no client connected  -> the message is put on the message queue;
  %%  * QoS 0                -> sent straight to the client;
  %%  * QoS 1 / QoS 2        -> delivered when the inflight window has room,
  %%                            queued otherwise.
  %%
  %% The QoS macros are spelled ?QOS_N here like in the rest of this module.

  %% Queue message if client disconnected
  dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->
      hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});

  %% Deliver qos0 message directly to client
  dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
      ClientPid ! {deliver, Msg},
      hibernate(Session);

  dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
      case check_inflight(Session) of
          true ->
              %% A retry timer was just armed, so this path does not hibernate.
              noreply(deliver(Msg, Session));
          false ->
              hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
      end.
  %% Cap the QoS of a message to the QoS the topic was subscribed with.
  %% The message is returned untouched when its QoS does not exceed the
  %% subscription's or when the topic has no subscription entry.
  tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
      case dict:find(Topic, Subscriptions) of
          {ok, SubQos} when PubQos > SubQos ->
              Msg#mqtt_message{qos = SubQos};
          {ok, _NotLower} ->
              Msg;
          error ->
              Msg
      end.
  484. %%--------------------------------------------------------------------
  485. %% Check inflight and awaiting_rel
  486. %%--------------------------------------------------------------------
  %% True when another message may be put in flight.
  %% A max_inflight of 0 means the window is unlimited.
  check_inflight(#session{max_inflight = 0}) ->
      true;
  check_inflight(#session{max_inflight = MaxInflight, inflight_queue = InflightQ}) ->
      length(InflightQ) < MaxInflight.
  %% True when another QoS 2 message may be parked awaiting PUBREL.
  %% A max_awaiting_rel of 0 means there is no limit.
  check_awaiting_rel(#session{max_awaiting_rel = 0}) ->
      true;
  check_awaiting_rel(#session{awaiting_rel = AwaitingRel, max_awaiting_rel = Limit}) ->
      Limit > maps:size(AwaitingRel).
  496. %%--------------------------------------------------------------------
  497. %% Dequeue and Deliver
  498. %%--------------------------------------------------------------------
  %% Move queued messages to the client for as long as the inflight window
  %% has room. Does nothing while no client is connected.
  dequeue(Session = #session{client_pid = undefined}) ->
      Session;
  dequeue(Session) ->
      HasRoom = check_inflight(Session),
      if
          HasRoom -> dequeue2(Session);
          true    -> Session
      end.
  %% Take one message off the message queue, deliver it and go back to
  %% dequeue/1 for more; stops when the queue is empty.
  dequeue2(Session = #session{message_queue = Queue}) ->
      case emqttd_mqueue:out(Queue) of
          {{value, Msg}, Rest} ->
              Delivered = deliver(Msg, Session#session{message_queue = Rest}),
              dequeue(Delivered);
          {empty, _Unchanged} ->
              Session
      end.
  %% Send a message to the client process.
  %% QoS 0 messages are fire-and-forget. QoS 1/2 messages are stamped with
  %% the session's current packet id, put at the head of the inflight queue
  %% and guarded by a retry timer; the packet id is then advanced.
  deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
      ClientPid ! {deliver, Msg},
      Session;
  deliver(Msg = #mqtt_message{qos = QoS},
          Session = #session{packet_id      = PktId,
                             client_pid     = ClientPid,
                             inflight_queue = InflightQ})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
      Stamped = Msg#mqtt_message{pktid = PktId, dup = false},
      ClientPid ! {deliver, Stamped},
      Inflight1 = [{PktId, Stamped} | InflightQ],
      Session1 = next_packet_id(Session#session{inflight_queue = Inflight1}),
      await(Stamped, Session1).
  %% Send a message again. QoS 0 is delivered like a fresh message; QoS 1/2
  %% messages are resent with the dup flag set and get a new retry timer.
  redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
      deliver(Msg, Session);
  redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
      Duplicate = Msg#mqtt_message{dup = true},
      ClientPid ! {deliver, Duplicate},
      await(Msg, Session).
  530. %%--------------------------------------------------------------------
  531. %% Awaiting ack for qos1, qos2 message
  532. %%------------------------------------------------------------------------------
  %% Arm the retry timer for a QoS 1/2 message and remember it in
  %% awaiting_ack under the message's packet id.
  await(#mqtt_message{pktid = PktId},
        Session = #session{awaiting_ack = AwaitingAck, retry_interval = RetrySecs}) ->
      RetryTimer = timer(RetrySecs, {timeout, awaiting_ack, PktId}),
      Session#session{awaiting_ack = maps:put(PktId, RetryTimer, AwaitingAck)}.
  %% Mark a packet id as acknowledged: run the 'message.acked' hook for the
  %% inflight message (or log when it is unknown), then remove the packet id
  %% from both awaiting_ack and the inflight queue.
  acked(PktId, Session = #session{client_id      = ClientId,
                                  inflight_queue = InflightQ,
                                  awaiting_ack   = AwaitingAck}) ->
      case lists:keyfind(PktId, 1, InflightQ) of
          false ->
              ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session);
          {_, Msg} ->
              emqttd:run_hooks('message.acked', [ClientId], Msg)
      end,
      AwaitingAck1 = maps:remove(PktId, AwaitingAck),
      InflightQ1 = lists:keydelete(PktId, 1, InflightQ),
      Session#session{awaiting_ack = AwaitingAck1, inflight_queue = InflightQ1}.
  %% Advance the session's packet id, wrapping from 16#ffff back to 1.
  next_packet_id(Session = #session{packet_id = Id}) ->
      NextId = case Id of
                   16#ffff -> 1;
                   _       -> Id + 1
               end,
      Session#session{packet_id = NextId}.
  %% Send TimeoutMsg to the calling process after TimeoutSec seconds.
  %% Returns the timer reference.
  timer(TimeoutSec, TimeoutMsg) ->
      Millis = timer:seconds(TimeoutSec),
      erlang:send_after(Millis, self(), TimeoutMsg).
  %% Cancel a timer created with timer/2.
  %% `undefined' (no timer set) is accepted and returned as is. For a real
  %% reference the result of erlang:cancel_timer/1 is returned: the remaining
  %% time, or `false' when the timer is no longer active. A term that is not
  %% a timer reference also yields `false'.
  %%
  %% Only the `badarg' of an invalid reference is absorbed; the former
  %% old-style `catch' hid every class of exception.
  cancel_timer(undefined) ->
      undefined;
  cancel_timer(Ref) ->
      try
          erlang:cancel_timer(Ref)
      catch
          error:badarg -> false
      end.
  %% Build a gen_server2 `noreply' result that keeps the process awake.
  noreply(Session) ->
      {noreply, Session}.
  %% Build a gen_server2 `noreply' result that puts the process into
  %% hibernation.
  hibernate(Session) ->
      {noreply, Session, hibernate}.
  %% Build a gen_server2 `stop' result with a {shutdown, Reason} exit reason.
  shutdown(Reason, Session) ->
      ExitReason = {shutdown, Reason},
      {stop, ExitReason, Session}.
  %% Arm the timer of the next `collect_info' message and remember it in the
  %% session. A collect_interval of 0 disables collection.
  start_collector(Session = #session{collect_interval = 0}) ->
      Session;
  start_collector(Session = #session{collect_interval = IntervalSecs}) ->
      Delay = timer:seconds(IntervalSecs),
      CollectTimer = erlang:send_after(Delay, self(), collect_info),
      Session#session{collect_timer = CollectTimer}.
  %% Summarise the session as a property list: flags, limits, the sizes of
  %% the inflight queue and the awaiting maps, the length and drop counter
  %% reported by the message queue, and the creation time.
  sess_info(#session{clean_sess     = CleanSess,
                     inflight_queue = InflightQueue,
                     max_inflight   = MaxInflight,
                     message_queue  = MessageQueue,
                     awaiting_rel   = AwaitingRel,
                     awaiting_ack   = AwaitingAck,
                     awaiting_comp  = AwaitingComp,
                     timestamp      = CreatedAt}) ->
      QueueStats = emqttd_mqueue:stats(MessageQueue),
      QueueLen = get_value(len, QueueStats),
      Dropped = get_value(dropped, QueueStats),
      [{clean_sess,      CleanSess},
       {max_inflight,    MaxInflight},
       {inflight_queue,  length(InflightQueue)},
       {message_queue,   QueueLen},
       {message_dropped, Dropped},
       {awaiting_rel,    maps:size(AwaitingRel)},
       {awaiting_ack,    maps:size(AwaitingAck)},
       {awaiting_comp,   maps:size(AwaitingComp)},
       {created_at,      CreatedAt}].