emqx_channel.erl 78 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% MQTT Channel
  17. -module(emqx_channel).
  18. -include("emqx.hrl").
  19. -include("emqx_channel.hrl").
  20. -include("emqx_mqtt.hrl").
  21. -include("emqx_access_control.hrl").
  22. -include("logger.hrl").
  23. -include("types.hrl").
  24. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  25. -ifdef(TEST).
  26. -compile(export_all).
  27. -compile(nowarn_export_all).
  28. -endif.
  29. -export([
  30. info/1,
  31. info/2,
  32. get_mqtt_conf/2,
  33. get_mqtt_conf/3,
  34. set_conn_state/2,
  35. stats/1,
  36. caps/1
  37. ]).
  38. -export([
  39. init/2,
  40. handle_in/2,
  41. handle_deliver/2,
  42. handle_out/3,
  43. handle_timeout/3,
  44. handle_call/2,
  45. handle_info/2,
  46. terminate/2
  47. ]).
  48. %% Export for emqx_sn
  49. -export([
  50. do_deliver/2,
  51. ensure_keepalive/2,
  52. clear_keepalive/1
  53. ]).
  54. %% Export for emqx_channel implementations
  55. -export([
  56. maybe_nack/1
  57. ]).
  58. %% Exports for CT
  59. -export([set_field/3]).
  60. -import(
  61. emqx_utils,
  62. [
  63. run_fold/3,
  64. pipeline/3,
  65. maybe_apply/2
  66. ]
  67. ).
  68. -export_type([channel/0, opts/0, conn_state/0]).
  69. -record(channel, {
  70. %% MQTT ConnInfo
  71. conninfo :: emqx_types:conninfo(),
  72. %% MQTT ClientInfo
  73. clientinfo :: emqx_types:clientinfo(),
  74. %% MQTT Session
  75. session :: maybe(emqx_session:session()),
  76. %% Keepalive
  77. keepalive :: maybe(emqx_keepalive:keepalive()),
  78. %% MQTT Will Msg
  79. will_msg :: maybe(emqx_types:message()),
  80. %% MQTT Topic Aliases
  81. topic_aliases :: emqx_types:topic_aliases(),
  82. %% MQTT Topic Alias Maximum
  83. alias_maximum :: maybe(map()),
  84. %% Authentication Data Cache
  85. auth_cache :: maybe(map()),
  86. %% Quota checkers
  87. quota :: emqx_limiter_container:limiter(),
  88. %% Timers
  89. timers :: #{atom() => disabled | maybe(reference())},
  90. %% Conn State
  91. conn_state :: conn_state(),
  92. %% Takeover
  93. takeover :: boolean(),
  94. %% Resume
  95. resuming :: boolean(),
  96. %% Pending delivers when takeovering
  97. pendings :: list()
  98. }).
  99. -type channel() :: #channel{}.
  100. -type opts() :: #{
  101. zone := atom(),
  102. listener := {Type :: atom(), Name :: atom()},
  103. atom() => term()
  104. }.
  105. -type conn_state() :: idle | connecting | connected | reauthenticating | disconnected.
  106. -type reply() ::
  107. {outgoing, emqx_types:packet()}
  108. | {outgoing, [emqx_types:packet()]}
  109. | {event, conn_state() | updated}
  110. | {close, Reason :: atom()}.
  111. -type replies() :: emqx_types:packet() | reply() | [reply()].
  112. -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
  113. -define(TIMER_TABLE, #{
  114. alive_timer => keepalive,
  115. retry_timer => retry_delivery,
  116. await_timer => expire_awaiting_rel,
  117. expire_timer => expire_session,
  118. will_timer => will_message,
  119. quota_timer => expire_quota_limit
  120. }).
  121. -define(LIMITER_ROUTING, message_routing).
  122. -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
  123. %%--------------------------------------------------------------------
  124. %% Info, Attrs and Caps
  125. %%--------------------------------------------------------------------
  126. %% @doc Get infos of the channel.
  127. -spec info(channel()) -> emqx_types:infos().
  128. info(Channel) ->
  129. maps:from_list(info(?INFO_KEYS, Channel)).
  130. -spec info(list(atom()) | atom() | tuple(), channel()) -> term().
  131. info(Keys, Channel) when is_list(Keys) ->
  132. [{Key, info(Key, Channel)} || Key <- Keys];
  133. info(conninfo, #channel{conninfo = ConnInfo}) ->
  134. ConnInfo;
  135. info(socktype, #channel{conninfo = ConnInfo}) ->
  136. maps:get(socktype, ConnInfo, undefined);
  137. info(peername, #channel{conninfo = ConnInfo}) ->
  138. maps:get(peername, ConnInfo, undefined);
  139. info(sockname, #channel{conninfo = ConnInfo}) ->
  140. maps:get(sockname, ConnInfo, undefined);
  141. info(proto_name, #channel{conninfo = ConnInfo}) ->
  142. maps:get(proto_name, ConnInfo, undefined);
  143. info(proto_ver, #channel{conninfo = ConnInfo}) ->
  144. maps:get(proto_ver, ConnInfo, undefined);
  145. info(connected_at, #channel{conninfo = ConnInfo}) ->
  146. maps:get(connected_at, ConnInfo, undefined);
  147. info(clientinfo, #channel{clientinfo = ClientInfo}) ->
  148. ClientInfo;
  149. info(zone, #channel{clientinfo = ClientInfo}) ->
  150. maps:get(zone, ClientInfo);
  151. info(listener, #channel{clientinfo = ClientInfo}) ->
  152. maps:get(listener, ClientInfo);
  153. info(clientid, #channel{clientinfo = ClientInfo}) ->
  154. maps:get(clientid, ClientInfo, undefined);
  155. info(username, #channel{clientinfo = ClientInfo}) ->
  156. maps:get(username, ClientInfo, undefined);
  157. info(session, #channel{session = Session}) ->
  158. maybe_apply(fun emqx_session:info/1, Session);
  159. info({session, Info}, #channel{session = Session}) ->
  160. maybe_apply(fun(S) -> emqx_session:info(Info, S) end, Session);
  161. info(conn_state, #channel{conn_state = ConnState}) ->
  162. ConnState;
  163. info(keepalive, #channel{keepalive = Keepalive}) ->
  164. maybe_apply(fun emqx_keepalive:info/1, Keepalive);
  165. info(will_msg, #channel{will_msg = undefined}) ->
  166. undefined;
  167. info(will_msg, #channel{will_msg = WillMsg}) ->
  168. emqx_message:to_map(WillMsg);
  169. info(topic_aliases, #channel{topic_aliases = Aliases}) ->
  170. Aliases;
  171. info(alias_maximum, #channel{alias_maximum = Limits}) ->
  172. Limits;
  173. info(timers, #channel{timers = Timers}) ->
  174. Timers.
  175. set_conn_state(ConnState, Channel) ->
  176. Channel#channel{conn_state = ConnState}.
  177. -spec stats(channel()) -> emqx_types:stats().
  178. stats(#channel{session = undefined}) ->
  179. emqx_pd:get_counters(?CHANNEL_METRICS);
  180. stats(#channel{session = Session}) ->
  181. lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
  182. -spec caps(channel()) -> emqx_types:caps().
  183. caps(#channel{clientinfo = #{zone := Zone}}) ->
  184. emqx_mqtt_caps:get_caps(Zone).
  185. %%--------------------------------------------------------------------
  186. %% Init the channel
  187. %%--------------------------------------------------------------------
  188. -spec init(emqx_types:conninfo(), opts()) -> channel().
  189. init(
  190. ConnInfo = #{
  191. peername := {PeerHost, _Port},
  192. sockname := {_Host, SockPort}
  193. },
  194. #{
  195. zone := Zone,
  196. limiter := LimiterCfg,
  197. listener := {Type, Listener}
  198. } = Opts
  199. ) ->
  200. Peercert = maps:get(peercert, ConnInfo, undefined),
  201. Protocol = maps:get(protocol, ConnInfo, mqtt),
  202. MountPoint =
  203. case emqx_config:get_listener_conf(Type, Listener, [mountpoint]) of
  204. <<>> -> undefined;
  205. MP -> MP
  206. end,
  207. ListenerId = emqx_listeners:listener_id(Type, Listener),
  208. ClientInfo = set_peercert_infos(
  209. Peercert,
  210. #{
  211. zone => Zone,
  212. listener => ListenerId,
  213. protocol => Protocol,
  214. peerhost => PeerHost,
  215. sockport => SockPort,
  216. clientid => undefined,
  217. username => undefined,
  218. mountpoint => MountPoint,
  219. is_bridge => false,
  220. is_superuser => false,
  221. enable_authn => maps:get(enable_authn, Opts, true)
  222. },
  223. Zone
  224. ),
  225. {NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo),
  226. #channel{
  227. conninfo = NConnInfo,
  228. clientinfo = NClientInfo,
  229. topic_aliases = #{
  230. inbound => #{},
  231. outbound => #{}
  232. },
  233. auth_cache = #{},
  234. quota = emqx_limiter_container:get_limiter_by_types(
  235. ListenerId, [?LIMITER_ROUTING], LimiterCfg
  236. ),
  237. timers = #{},
  238. conn_state = idle,
  239. takeover = false,
  240. resuming = false,
  241. pendings = []
  242. }.
  243. set_peercert_infos(NoSSL, ClientInfo, _) when
  244. NoSSL =:= nossl;
  245. NoSSL =:= undefined
  246. ->
  247. ClientInfo#{username => undefined};
  248. set_peercert_infos(Peercert, ClientInfo, Zone) ->
  249. {DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)},
  250. PeercetAs = fun(Key) ->
  251. case get_mqtt_conf(Zone, Key) of
  252. cn -> CN;
  253. dn -> DN;
  254. crt -> Peercert;
  255. pem when is_binary(Peercert) -> base64:encode(Peercert);
  256. md5 when is_binary(Peercert) -> emqx_passwd:hash_data(md5, Peercert);
  257. _ -> undefined
  258. end
  259. end,
  260. Username = PeercetAs(peer_cert_as_username),
  261. ClientId = PeercetAs(peer_cert_as_clientid),
  262. ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}.
  263. take_ws_cookie(ClientInfo, ConnInfo) ->
  264. case maps:take(ws_cookie, ConnInfo) of
  265. {WsCookie, NConnInfo} ->
  266. {ClientInfo#{ws_cookie => WsCookie}, NConnInfo};
  267. _ ->
  268. {ClientInfo, ConnInfo}
  269. end.
  270. %%--------------------------------------------------------------------
  271. %% Handle incoming packet
  272. %%--------------------------------------------------------------------
  273. -spec handle_in(emqx_types:packet(), channel()) ->
  274. {ok, channel()}
  275. | {ok, replies(), channel()}
  276. | {shutdown, Reason :: term(), channel()}
  277. | {shutdown, Reason :: term(), replies(), channel()}.
  278. handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
  279. ConnState =:= connected orelse ConnState =:= reauthenticating
  280. ->
  281. handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
  282. handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
  283. handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
  284. handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
  285. case
  286. pipeline(
  287. [
  288. fun overload_protection/2,
  289. fun enrich_conninfo/2,
  290. fun run_conn_hooks/2,
  291. fun check_connect/2,
  292. fun enrich_client/2,
  293. fun set_log_meta/2,
  294. fun check_banned/2,
  295. fun count_flapping_event/2
  296. ],
  297. ConnPkt,
  298. Channel#channel{conn_state = connecting}
  299. )
  300. of
  301. {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
  302. ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
  303. NChannel1 = NChannel#channel{
  304. alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
  305. },
  306. case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
  307. {ok, Properties, NChannel2} ->
  308. %% only store will_msg after successful authn
  309. %% fix for: https://github.com/emqx/emqx/issues/8886
  310. NChannel3 = NChannel2#channel{will_msg = emqx_packet:will_msg(NConnPkt)},
  311. process_connect(Properties, NChannel3);
  312. {continue, Properties, NChannel2} ->
  313. handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
  314. {error, ReasonCode} ->
  315. handle_out(connack, ReasonCode, NChannel1)
  316. end;
  317. {error, ReasonCode, NChannel} ->
  318. handle_out(connack, ReasonCode, NChannel)
  319. end;
  320. handle_in(
  321. Packet = ?AUTH_PACKET(ReasonCode, _Properties),
  322. Channel = #channel{conn_state = ConnState}
  323. ) ->
  324. try
  325. case {ReasonCode, ConnState} of
  326. {?RC_CONTINUE_AUTHENTICATION, connecting} -> ok;
  327. {?RC_CONTINUE_AUTHENTICATION, reauthenticating} -> ok;
  328. {?RC_RE_AUTHENTICATE, connected} -> ok;
  329. _ -> error(protocol_error)
  330. end,
  331. case authenticate(Packet, Channel) of
  332. {ok, NProperties, NChannel} ->
  333. case ConnState of
  334. connecting ->
  335. process_connect(NProperties, NChannel);
  336. _ ->
  337. handle_out(
  338. auth,
  339. {?RC_SUCCESS, NProperties},
  340. NChannel#channel{conn_state = connected}
  341. )
  342. end;
  343. {continue, NProperties, NChannel} ->
  344. handle_out(
  345. auth,
  346. {?RC_CONTINUE_AUTHENTICATION, NProperties},
  347. NChannel#channel{conn_state = reauthenticating}
  348. );
  349. {error, NReasonCode} ->
  350. case ConnState of
  351. connecting ->
  352. handle_out(connack, NReasonCode, Channel);
  353. _ ->
  354. handle_out(disconnect, NReasonCode, Channel)
  355. end
  356. end
  357. catch
  358. _Class:_Reason ->
  359. case ConnState of
  360. connecting ->
  361. handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
  362. _ ->
  363. handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel)
  364. end
  365. end;
  366. handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
  367. ConnState =/= connected andalso ConnState =/= reauthenticating
  368. ->
  369. handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
  370. handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
  371. case emqx_packet:check(Packet) of
  372. ok -> process_publish(Packet, Channel);
  373. {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel)
  374. end;
  375. handle_in(
  376. ?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
  377. Channel =
  378. #channel{clientinfo = ClientInfo, session = Session}
  379. ) ->
  380. case emqx_session:puback(ClientInfo, PacketId, Session) of
  381. {ok, Msg, NSession} ->
  382. ok = after_message_acked(ClientInfo, Msg, Properties),
  383. {ok, Channel#channel{session = NSession}};
  384. {ok, Msg, Publishes, NSession} ->
  385. ok = after_message_acked(ClientInfo, Msg, Properties),
  386. handle_out(publish, Publishes, Channel#channel{session = NSession});
  387. {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
  388. ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
  389. ok = emqx_metrics:inc('packets.puback.inuse'),
  390. {ok, Channel};
  391. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
  392. ?SLOG(warning, #{msg => "puback_packetId_not_found", packetId => PacketId}),
  393. ok = emqx_metrics:inc('packets.puback.missed'),
  394. {ok, Channel}
  395. end;
  396. handle_in(
  397. ?PUBREC_PACKET(PacketId, _ReasonCode, Properties),
  398. Channel =
  399. #channel{clientinfo = ClientInfo, session = Session}
  400. ) ->
  401. case emqx_session:pubrec(ClientInfo, PacketId, Session) of
  402. {ok, Msg, NSession} ->
  403. ok = after_message_acked(ClientInfo, Msg, Properties),
  404. NChannel = Channel#channel{session = NSession},
  405. handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
  406. {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
  407. ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
  408. ok = emqx_metrics:inc('packets.pubrec.inuse'),
  409. handle_out(pubrel, {PacketId, RC}, Channel);
  410. {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
  411. ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
  412. ok = emqx_metrics:inc('packets.pubrec.missed'),
  413. handle_out(pubrel, {PacketId, RC}, Channel)
  414. end;
  415. handle_in(
  416. ?PUBREL_PACKET(PacketId, _ReasonCode),
  417. Channel = #channel{
  418. clientinfo = ClientInfo,
  419. session = Session
  420. }
  421. ) ->
  422. case emqx_session:pubrel(ClientInfo, PacketId, Session) of
  423. {ok, NSession} ->
  424. NChannel = Channel#channel{session = NSession},
  425. handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
  426. {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
  427. ?SLOG(warning, #{msg => "pubrel_packetId_not_found", packetId => PacketId}),
  428. ok = emqx_metrics:inc('packets.pubrel.missed'),
  429. handle_out(pubcomp, {PacketId, RC}, Channel)
  430. end;
  431. handle_in(
  432. ?PUBCOMP_PACKET(PacketId, _ReasonCode),
  433. Channel = #channel{
  434. clientinfo = ClientInfo, session = Session
  435. }
  436. ) ->
  437. case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
  438. {ok, NSession} ->
  439. {ok, Channel#channel{session = NSession}};
  440. {ok, Publishes, NSession} ->
  441. handle_out(publish, Publishes, Channel#channel{session = NSession});
  442. {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
  443. ok = emqx_metrics:inc('packets.pubcomp.inuse'),
  444. {ok, Channel};
  445. {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
  446. ?SLOG(warning, #{msg => "pubcomp_packetId_not_found", packetId => PacketId}),
  447. ok = emqx_metrics:inc('packets.pubcomp.missed'),
  448. {ok, Channel}
  449. end;
  450. handle_in(
  451. SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
  452. Channel = #channel{clientinfo = ClientInfo}
  453. ) ->
  454. case emqx_packet:check(SubPkt) of
  455. ok ->
  456. TopicFilters0 = parse_topic_filters(TopicFilters),
  457. TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
  458. TupleTopicFilters0 = check_sub_authzs(SubPkt, TopicFilters1, Channel),
  459. HasAuthzDeny = lists:any(
  460. fun({_TopicFilter, ReasonCode}) ->
  461. ReasonCode =:= ?RC_NOT_AUTHORIZED
  462. end,
  463. TupleTopicFilters0
  464. ),
  465. DenyAction = emqx:get_config([authorization, deny_action], ignore),
  466. case DenyAction =:= disconnect andalso HasAuthzDeny of
  467. true ->
  468. handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
  469. false ->
  470. TopicFilters2 = [
  471. TopicFilter
  472. || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0
  473. ],
  474. TopicFilters3 = run_hooks(
  475. 'client.subscribe',
  476. [ClientInfo, Properties],
  477. TopicFilters2
  478. ),
  479. {TupleTopicFilters1, NChannel} = process_subscribe(
  480. TopicFilters3,
  481. Properties,
  482. Channel
  483. ),
  484. TupleTopicFilters2 =
  485. lists:foldl(
  486. fun
  487. ({{Topic, Opts = #{deny_subscription := true}}, _QoS}, Acc) ->
  488. Key = {Topic, maps:without([deny_subscription], Opts)},
  489. lists:keyreplace(Key, 1, Acc, {Key, ?RC_UNSPECIFIED_ERROR});
  490. (Tuple = {Key, _Value}, Acc) ->
  491. lists:keyreplace(Key, 1, Acc, Tuple)
  492. end,
  493. TupleTopicFilters0,
  494. TupleTopicFilters1
  495. ),
  496. ReasonCodes2 = [
  497. ReasonCode
  498. || {_TopicFilter, ReasonCode} <- TupleTopicFilters2
  499. ],
  500. handle_out(suback, {PacketId, ReasonCodes2}, NChannel)
  501. end;
  502. {error, ReasonCode} ->
  503. handle_out(disconnect, ReasonCode, Channel)
  504. end;
  505. handle_in(
  506. Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
  507. Channel = #channel{clientinfo = ClientInfo}
  508. ) ->
  509. case emqx_packet:check(Packet) of
  510. ok ->
  511. TopicFilters1 = run_hooks(
  512. 'client.unsubscribe',
  513. [ClientInfo, Properties],
  514. parse_topic_filters(TopicFilters)
  515. ),
  516. {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
  517. handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
  518. {error, ReasonCode} ->
  519. handle_out(disconnect, ReasonCode, Channel)
  520. end;
  521. handle_in(?PACKET(?PINGREQ), Channel) ->
  522. {ok, ?PACKET(?PINGRESP), Channel};
  523. handle_in(
  524. ?DISCONNECT_PACKET(ReasonCode, Properties),
  525. Channel = #channel{conninfo = ConnInfo}
  526. ) ->
  527. NConnInfo = ConnInfo#{disconn_props => Properties},
  528. NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = NConnInfo}),
  529. process_disconnect(ReasonCode, Properties, NChannel);
  530. handle_in(?AUTH_PACKET(), Channel) ->
  531. handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
  532. handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
  533. shutdown(Reason, Channel);
  534. handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connecting}) ->
  535. shutdown(frame_too_large, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel);
  536. handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
  537. shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
  538. handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = ConnState}) when
  539. ConnState =:= connected orelse ConnState =:= reauthenticating
  540. ->
  541. handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
  542. handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
  543. ConnState =:= connected orelse ConnState =:= reauthenticating
  544. ->
  545. handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
  546. handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
  547. ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
  548. {ok, Channel};
  549. handle_in(Packet, Channel) ->
  550. ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
  551. handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
  552. %%--------------------------------------------------------------------
  553. %% Process Connect
  554. %%--------------------------------------------------------------------
  555. process_connect(
  556. AckProps,
  557. Channel = #channel{
  558. conninfo = ConnInfo,
  559. clientinfo = ClientInfo
  560. }
  561. ) ->
  562. #{clean_start := CleanStart} = ConnInfo,
  563. case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
  564. {ok, #{session := Session, present := false}} ->
  565. NChannel = Channel#channel{session = Session},
  566. handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
  567. {ok, #{session := Session, present := true, pendings := Pendings}} ->
  568. Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())),
  569. NChannel = Channel#channel{
  570. session = Session,
  571. resuming = true,
  572. pendings = Pendings1
  573. },
  574. handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, ensure_connected(NChannel));
  575. {error, client_id_unavailable} ->
  576. handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
  577. {error, Reason} ->
  578. ?SLOG(error, #{msg => "failed_to_open_session", reason => Reason}),
  579. handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
  580. end.
  581. %%--------------------------------------------------------------------
  582. %% Process Publish
  583. %%--------------------------------------------------------------------
  584. process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
  585. case
  586. pipeline(
  587. [
  588. fun check_quota_exceeded/2,
  589. fun process_alias/2,
  590. fun check_pub_alias/2,
  591. fun check_pub_authz/2,
  592. fun check_pub_caps/2
  593. ],
  594. Packet,
  595. Channel
  596. )
  597. of
  598. {ok, NPacket, NChannel} ->
  599. Msg = packet_to_message(NPacket, NChannel),
  600. do_publish(PacketId, Msg, NChannel);
  601. {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
  602. ?SLOG(
  603. warning,
  604. #{
  605. msg => "cannot_publish_to_topic",
  606. reason => emqx_reason_codes:name(Rc)
  607. },
  608. #{topic => Topic}
  609. ),
  610. case emqx:get_config([authorization, deny_action], ignore) of
  611. ignore ->
  612. case QoS of
  613. ?QOS_0 -> {ok, NChannel};
  614. ?QOS_1 -> handle_out(puback, {PacketId, Rc}, NChannel);
  615. ?QOS_2 -> handle_out(pubrec, {PacketId, Rc}, NChannel)
  616. end;
  617. disconnect ->
  618. handle_out(disconnect, Rc, NChannel)
  619. end;
  620. {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
  621. ?SLOG(
  622. warning,
  623. #{
  624. msg => "cannot_publish_to_topic",
  625. reason => emqx_reason_codes:name(Rc)
  626. },
  627. #{topic => Topic}
  628. ),
  629. case QoS of
  630. ?QOS_0 ->
  631. ok = emqx_metrics:inc('packets.publish.dropped'),
  632. {ok, NChannel};
  633. ?QOS_1 ->
  634. handle_out(puback, {PacketId, Rc}, NChannel);
  635. ?QOS_2 ->
  636. handle_out(pubrec, {PacketId, Rc}, NChannel)
  637. end;
  638. {error, Rc, NChannel} ->
  639. ?SLOG(
  640. warning,
  641. #{
  642. msg => "cannot_publish_to_topic",
  643. topic => Topic,
  644. reason => emqx_reason_codes:name(Rc)
  645. },
  646. #{topic => Topic}
  647. ),
  648. handle_out(disconnect, Rc, NChannel)
  649. end.
  650. packet_to_message(Packet, #channel{
  651. conninfo = #{proto_ver := ProtoVer},
  652. clientinfo = #{
  653. protocol := Protocol,
  654. clientid := ClientId,
  655. username := Username,
  656. peerhost := PeerHost,
  657. mountpoint := MountPoint
  658. }
  659. }) ->
  660. emqx_mountpoint:mount(
  661. MountPoint,
  662. emqx_packet:to_message(
  663. Packet,
  664. ClientId,
  665. #{
  666. proto_ver => ProtoVer,
  667. protocol => Protocol,
  668. username => Username,
  669. peerhost => PeerHost
  670. }
  671. )
  672. ).
  673. do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
  674. Result = emqx_broker:publish(Msg),
  675. NChannel = ensure_quota(Result, Channel),
  676. {ok, NChannel};
  677. do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
  678. PubRes = emqx_broker:publish(Msg),
  679. RC = puback_reason_code(PacketId, Msg, PubRes),
  680. case RC of
  681. undefined ->
  682. {ok, Channel};
  683. _Value ->
  684. do_finish_publish(PacketId, PubRes, RC, Channel)
  685. end;
  686. do_publish(
  687. PacketId,
  688. Msg = #message{qos = ?QOS_2},
  689. Channel = #channel{clientinfo = ClientInfo, session = Session}
  690. ) ->
  691. case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
  692. {ok, PubRes, NSession} ->
  693. RC = pubrec_reason_code(PubRes),
  694. NChannel0 = Channel#channel{session = NSession},
  695. NChannel1 = ensure_timer(await_timer, NChannel0),
  696. NChannel2 = ensure_quota(PubRes, NChannel1),
  697. handle_out(pubrec, {PacketId, RC}, NChannel2);
  698. {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
  699. ok = emqx_metrics:inc('packets.publish.inuse'),
  700. handle_out(pubrec, {PacketId, RC}, Channel);
  701. {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
  702. ok = emqx_metrics:inc('packets.publish.dropped'),
  703. handle_out(disconnect, RC, Channel)
  704. end.
  705. do_finish_publish(PacketId, PubRes, RC, Channel) ->
  706. NChannel = ensure_quota(PubRes, Channel),
  707. handle_out(puback, {PacketId, RC}, NChannel).
  708. ensure_quota(_, Channel = #channel{quota = infinity}) ->
  709. Channel;
  710. ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
  711. Cnt = lists:foldl(
  712. fun
  713. ({_, _, ok}, N) -> N + 1;
  714. ({_, _, {ok, I}}, N) -> N + I;
  715. (_, N) -> N
  716. end,
  717. 1,
  718. PubRes
  719. ),
  720. case emqx_limiter_container:check(Cnt, ?LIMITER_ROUTING, Limiter) of
  721. {ok, NLimiter} ->
  722. Channel#channel{quota = NLimiter};
  723. {_, Intv, NLimiter} ->
  724. ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter})
  725. end.
  726. -compile({inline, [pubrec_reason_code/1]}).
  727. pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
  728. pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
  729. puback_reason_code(PacketId, Msg, [] = PubRes) ->
  730. emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
  731. puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
  732. emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS).
  733. -compile({inline, [after_message_acked/3]}).
  734. after_message_acked(ClientInfo, Msg, PubAckProps) ->
  735. ok = emqx_metrics:inc('messages.acked'),
  736. emqx_hooks:run('message.acked', [
  737. ClientInfo,
  738. emqx_message:set_header(puback_props, PubAckProps, Msg)
  739. ]).
  740. %%--------------------------------------------------------------------
  741. %% Process Subscribe
  742. %%--------------------------------------------------------------------
  743. -compile({inline, [process_subscribe/3]}).
  744. process_subscribe(TopicFilters, SubProps, Channel) ->
  745. process_subscribe(TopicFilters, SubProps, Channel, []).
  746. process_subscribe([], _SubProps, Channel, Acc) ->
  747. {lists:reverse(Acc), Channel};
  748. process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) ->
  749. case check_sub_caps(TopicFilter, SubOpts, Channel) of
  750. ok ->
  751. {ReasonCode, NChannel} = do_subscribe(
  752. TopicFilter,
  753. SubOpts#{sub_props => SubProps},
  754. Channel
  755. ),
  756. process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]);
  757. {error, ReasonCode} ->
  758. ?SLOG(
  759. warning,
  760. #{
  761. msg => "cannot_subscribe_topic_filter",
  762. reason => emqx_reason_codes:name(ReasonCode)
  763. },
  764. #{topic => TopicFilter}
  765. ),
  766. process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
  767. end.
  768. do_subscribe(
  769. TopicFilter,
  770. SubOpts = #{qos := QoS},
  771. Channel =
  772. #channel{
  773. clientinfo = ClientInfo = #{mountpoint := MountPoint},
  774. session = Session
  775. }
  776. ) ->
  777. NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
  778. NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
  779. case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
  780. {ok, NSession} ->
  781. {QoS, Channel#channel{session = NSession}};
  782. {error, RC} ->
  783. ?SLOG(
  784. warning,
  785. #{
  786. msg => "cannot_subscribe_topic_filter",
  787. reason => emqx_reason_codes:text(RC)
  788. },
  789. #{topic => NTopicFilter}
  790. ),
  791. {RC, Channel}
  792. end.
  793. %%--------------------------------------------------------------------
  794. %% Process Unsubscribe
  795. %%--------------------------------------------------------------------
  796. -compile({inline, [process_unsubscribe/3]}).
  797. process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
  798. process_unsubscribe(TopicFilters, UnSubProps, Channel, []).
  799. process_unsubscribe([], _UnSubProps, Channel, Acc) ->
  800. {lists:reverse(Acc), Channel};
  801. process_unsubscribe([{TopicFilter, SubOpts} | More], UnSubProps, Channel, Acc) ->
  802. {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel),
  803. process_unsubscribe(More, UnSubProps, NChannel, [RC | Acc]).
  804. do_unsubscribe(
  805. TopicFilter,
  806. SubOpts,
  807. Channel =
  808. #channel{
  809. clientinfo = ClientInfo = #{mountpoint := MountPoint},
  810. session = Session
  811. }
  812. ) ->
  813. TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
  814. case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
  815. {ok, NSession} ->
  816. {?RC_SUCCESS, Channel#channel{session = NSession}};
  817. {error, RC} ->
  818. {RC, Channel}
  819. end.
  820. %%--------------------------------------------------------------------
  821. %% Process Disconnect
  822. %%--------------------------------------------------------------------
  823. %% MQTT-v5.0: 3.14.4 DISCONNECT Actions
  824. maybe_clean_will_msg(?RC_SUCCESS, Channel) ->
  825. Channel#channel{will_msg = undefined};
  826. maybe_clean_will_msg(_ReasonCode, Channel) ->
  827. Channel.
  828. %% MQTT-v5.0: 3.14.2.2.2 Session Expiry Interval
  829. process_disconnect(
  830. _ReasonCode,
  831. #{'Session-Expiry-Interval' := Interval},
  832. Channel = #channel{conninfo = #{expiry_interval := 0}}
  833. ) when
  834. Interval > 0
  835. ->
  836. handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
  837. process_disconnect(ReasonCode, Properties, Channel) ->
  838. NChannel = maybe_update_expiry_interval(Properties, Channel),
  839. {ok, {close, disconnect_reason(ReasonCode)}, NChannel}.
  840. maybe_update_expiry_interval(
  841. #{'Session-Expiry-Interval' := Interval},
  842. Channel = #channel{conninfo = ConnInfo}
  843. ) ->
  844. EI = timer:seconds(Interval),
  845. OldEI = maps:get(expiry_interval, ConnInfo, 0),
  846. case OldEI =:= EI of
  847. true ->
  848. Channel;
  849. false ->
  850. NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}},
  851. %% Check if the client turns off persistence (turning it on is disallowed)
  852. case EI =:= 0 andalso OldEI > 0 of
  853. true ->
  854. NSession = emqx_session:unpersist(NChannel#channel.session),
  855. NChannel#channel{session = NSession};
  856. false ->
  857. NChannel
  858. end
  859. end;
  860. maybe_update_expiry_interval(_Properties, Channel) ->
  861. Channel.
  862. %%--------------------------------------------------------------------
  863. %% Handle Delivers from broker to client
  864. %%--------------------------------------------------------------------
  865. -spec handle_deliver(list(emqx_types:deliver()), channel()) ->
  866. {ok, channel()} | {ok, replies(), channel()}.
  867. handle_deliver(
  868. Delivers,
  869. Channel = #channel{
  870. takeover = true,
  871. pendings = Pendings
  872. }
  873. ) ->
  874. %% NOTE: Order is important here. While the takeover is in
  875. %% progress, the session cannot enqueue messages, since it already
  876. %% passed on the queue to the new connection in the session state.
  877. NPendings = lists:append(Pendings, maybe_nack(Delivers)),
  878. {ok, Channel#channel{pendings = NPendings}};
  879. handle_deliver(
  880. Delivers,
  881. Channel = #channel{
  882. conn_state = disconnected,
  883. takeover = false,
  884. session = Session,
  885. clientinfo = ClientInfo
  886. }
  887. ) ->
  888. Delivers1 = maybe_nack(Delivers),
  889. NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session),
  890. NChannel = Channel#channel{session = NSession},
  891. {ok, NChannel};
  892. handle_deliver(
  893. Delivers,
  894. Channel = #channel{
  895. session = Session,
  896. takeover = false,
  897. clientinfo = ClientInfo
  898. }
  899. ) ->
  900. case emqx_session:deliver(ClientInfo, Delivers, Session) of
  901. {ok, Publishes, NSession} ->
  902. NChannel = Channel#channel{session = NSession},
  903. handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
  904. {ok, NSession} ->
  905. {ok, Channel#channel{session = NSession}}
  906. end.
  907. %% Nack delivers from shared subscription
  908. maybe_nack(Delivers) ->
  909. lists:filter(fun not_nacked/1, Delivers).
  910. not_nacked({deliver, _Topic, Msg}) ->
  911. case emqx_shared_sub:is_ack_required(Msg) of
  912. true ->
  913. ok = emqx_shared_sub:nack_no_connection(Msg),
  914. false;
  915. false ->
  916. true
  917. end.
  918. %%--------------------------------------------------------------------
  919. %% Handle outgoing packet
  920. %%--------------------------------------------------------------------
  921. -spec handle_out(atom(), term(), channel()) ->
  922. {ok, channel()}
  923. | {ok, replies(), channel()}
  924. | {shutdown, Reason :: term(), channel()}
  925. | {shutdown, Reason :: term(), replies(), channel()}.
  926. handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
  927. AckProps = run_fold(
  928. [
  929. fun enrich_connack_caps/2,
  930. fun enrich_server_keepalive/2,
  931. fun enrich_response_information/2,
  932. fun enrich_assigned_clientid/2
  933. ],
  934. Props,
  935. Channel
  936. ),
  937. NAckProps = run_hooks(
  938. 'client.connack',
  939. [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)],
  940. AckProps
  941. ),
  942. return_connack(
  943. ?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
  944. ensure_keepalive(NAckProps, Channel)
  945. );
  946. handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) ->
  947. Reason = emqx_reason_codes:name(ReasonCode),
  948. AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
  949. AckPacket = ?CONNACK_PACKET(
  950. case maps:get(proto_ver, ConnInfo) of
  951. ?MQTT_PROTO_V5 -> ReasonCode;
  952. _ -> emqx_reason_codes:compat(connack, ReasonCode)
  953. end,
  954. sp(false),
  955. AckProps
  956. ),
  957. shutdown(Reason, AckPacket, Channel);
  958. %% Optimize?
  959. handle_out(publish, [], Channel) ->
  960. {ok, Channel};
  961. handle_out(publish, Publishes, Channel) ->
  962. {Packets, NChannel} = do_deliver(Publishes, Channel),
  963. {ok, {outgoing, Packets}, NChannel};
  964. handle_out(puback, {PacketId, ReasonCode}, Channel) ->
  965. {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
  966. handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
  967. {ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel};
  968. handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
  969. {ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel};
  970. handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
  971. {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
  972. handle_out(suback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
  973. return_sub_unsub_ack(?SUBACK_PACKET(PacketId, ReasonCodes), Channel);
  974. handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
  975. ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
  976. return_sub_unsub_ack(?SUBACK_PACKET(PacketId, ReasonCodes1), Channel);
  977. handle_out(unsuback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
  978. return_sub_unsub_ack(?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel);
  979. handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
  980. return_sub_unsub_ack(?UNSUBACK_PACKET(PacketId), Channel);
  981. handle_out(disconnect, ReasonCode, Channel) when is_integer(ReasonCode) ->
  982. ReasonName = disconnect_reason(ReasonCode),
  983. handle_out(disconnect, {ReasonCode, ReasonName}, Channel);
  984. handle_out(disconnect, {ReasonCode, ReasonName}, Channel) ->
  985. handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel);
  986. handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) ->
  987. Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
  988. {ok, [{outgoing, Packet}, {close, ReasonName}], Channel};
  989. handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
  990. {ok, {close, ReasonName}, Channel};
  991. handle_out(auth, {ReasonCode, Properties}, Channel) ->
  992. {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
  993. handle_out(Type, Data, Channel) ->
  994. ?SLOG(error, #{msg => "unexpected_outgoing", type => Type, data => Data}),
  995. {ok, Channel}.
  996. %%--------------------------------------------------------------------
  997. %% Return ConnAck
  998. %%--------------------------------------------------------------------
  999. return_connack(AckPacket, Channel) ->
  1000. Replies = [{event, connected}, {connack, AckPacket}],
  1001. case maybe_resume_session(Channel) of
  1002. ignore ->
  1003. {ok, Replies, Channel};
  1004. {ok, Publishes, NSession} ->
  1005. NChannel1 = Channel#channel{
  1006. resuming = false,
  1007. pendings = [],
  1008. session = NSession
  1009. },
  1010. {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
  1011. Outgoing = [{outgoing, Packets} || length(Packets) > 0],
  1012. {ok, Replies ++ Outgoing, NChannel2}
  1013. end.
  1014. %%--------------------------------------------------------------------
  1015. %% Deliver publish: broker -> client
  1016. %%--------------------------------------------------------------------
  1017. %% return list(emqx_types:packet())
  1018. do_deliver({pubrel, PacketId}, Channel) ->
  1019. {[?PUBREL_PACKET(PacketId, ?RC_SUCCESS)], Channel};
  1020. do_deliver(
  1021. {PacketId, Msg},
  1022. Channel = #channel{
  1023. clientinfo =
  1024. ClientInfo =
  1025. #{mountpoint := MountPoint}
  1026. }
  1027. ) ->
  1028. ok = emqx_metrics:inc('messages.delivered'),
  1029. Msg1 = emqx_hooks:run_fold(
  1030. 'message.delivered',
  1031. [ClientInfo],
  1032. emqx_message:update_expiry(Msg)
  1033. ),
  1034. Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
  1035. Packet = emqx_message:to_packet(PacketId, Msg2),
  1036. {NPacket, NChannel} = packing_alias(Packet, Channel),
  1037. {[NPacket], NChannel};
  1038. do_deliver([Publish], Channel) ->
  1039. do_deliver(Publish, Channel);
  1040. do_deliver(Publishes, Channel) when is_list(Publishes) ->
  1041. {Packets, NChannel} =
  1042. lists:foldl(
  1043. fun(Publish, {Acc, Chann}) ->
  1044. {Packets, NChann} = do_deliver(Publish, Chann),
  1045. {Packets ++ Acc, NChann}
  1046. end,
  1047. {[], Channel},
  1048. Publishes
  1049. ),
  1050. {lists:reverse(Packets), NChannel}.
  1051. %%--------------------------------------------------------------------
  1052. %% Handle out suback
  1053. %%--------------------------------------------------------------------
  1054. return_sub_unsub_ack(Packet, Channel) ->
  1055. {ok, [{outgoing, Packet}, {event, updated}], Channel}.
  1056. %%--------------------------------------------------------------------
  1057. %% Handle call
  1058. %%--------------------------------------------------------------------
  1059. -spec handle_call(Req :: term(), channel()) ->
  1060. {reply, Reply :: term(), channel()}
  1061. | {shutdown, Reason :: term(), Reply :: term(), channel()}
  1062. | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}.
  1063. handle_call(
  1064. kick,
  1065. Channel = #channel{
  1066. conn_state = ConnState,
  1067. will_msg = WillMsg,
  1068. clientinfo = ClientInfo,
  1069. conninfo = #{proto_ver := ProtoVer}
  1070. }
  1071. ) ->
  1072. (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
  1073. Channel1 =
  1074. case ConnState of
  1075. connected -> ensure_disconnected(kicked, Channel);
  1076. _ -> Channel
  1077. end,
  1078. case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
  1079. true ->
  1080. shutdown(
  1081. kicked,
  1082. ok,
  1083. ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
  1084. Channel1
  1085. );
  1086. _ ->
  1087. shutdown(kicked, ok, Channel1)
  1088. end;
  1089. handle_call(discard, Channel) ->
  1090. disconnect_and_shutdown(discarded, ok, Channel);
  1091. %% Session Takeover
  1092. handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
  1093. reply(Session, Channel#channel{takeover = true});
  1094. handle_call(
  1095. {takeover, 'end'},
  1096. Channel = #channel{
  1097. session = Session,
  1098. pendings = Pendings,
  1099. conninfo = #{clientid := ClientId}
  1100. }
  1101. ) ->
  1102. ok = emqx_session:takeover(Session),
  1103. %% TODO: Should not drain deliver here (side effect)
  1104. Delivers = emqx_utils:drain_deliver(),
  1105. AllPendings = lists:append(Delivers, Pendings),
  1106. ?tp(
  1107. debug,
  1108. emqx_channel_takeover_end,
  1109. #{clientid => ClientId}
  1110. ),
  1111. disconnect_and_shutdown(takenover, AllPendings, Channel);
  1112. handle_call(list_authz_cache, Channel) ->
  1113. {reply, emqx_authz_cache:list_authz_cache(), Channel};
  1114. handle_call(
  1115. {keepalive, Interval},
  1116. Channel = #channel{
  1117. keepalive = KeepAlive,
  1118. conninfo = ConnInfo
  1119. }
  1120. ) ->
  1121. ClientId = info(clientid, Channel),
  1122. NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive),
  1123. NConnInfo = maps:put(keepalive, Interval, ConnInfo),
  1124. NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
  1125. SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
  1126. ChanInfo1 = info(NChannel),
  1127. emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
  1128. reply(ok, reset_timer(alive_timer, NChannel));
  1129. handle_call(Req, Channel) ->
  1130. ?SLOG(error, #{msg => "unexpected_call", call => Req}),
  1131. reply(ignored, Channel).
  1132. %%--------------------------------------------------------------------
  1133. %% Handle Info
  1134. %%--------------------------------------------------------------------
  1135. -spec handle_info(Info :: term(), channel()) ->
  1136. ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
  1137. handle_info({subscribe, TopicFilters}, Channel) ->
  1138. {_, NChannel} = lists:foldl(
  1139. fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
  1140. do_subscribe(TopicFilter, SubOpts, ChannelAcc)
  1141. end,
  1142. {[], Channel},
  1143. parse_topic_filters(TopicFilters)
  1144. ),
  1145. {ok, NChannel};
  1146. handle_info({unsubscribe, TopicFilters}, Channel) ->
  1147. {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
  1148. {ok, NChannel};
  1149. handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
  1150. shutdown(Reason, Channel);
  1151. handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
  1152. shutdown(Reason, Channel);
  1153. handle_info(
  1154. {sock_closed, Reason},
  1155. Channel =
  1156. #channel{
  1157. conn_state = ConnState
  1158. }
  1159. ) when
  1160. ConnState =:= connected orelse ConnState =:= reauthenticating
  1161. ->
  1162. Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
  1163. case maybe_shutdown(Reason, Channel1) of
  1164. {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
  1165. Shutdown -> Shutdown
  1166. end;
  1167. handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
  1168. ?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}),
  1169. {ok, Channel};
  1170. handle_info(clean_authz_cache, Channel) ->
  1171. ok = emqx_authz_cache:empty_authz_cache(),
  1172. {ok, Channel};
  1173. handle_info(die_if_test = Info, Channel) ->
  1174. die_if_test_compiled(),
  1175. ?SLOG(error, #{msg => "unexpected_info", info => Info}),
  1176. {ok, Channel};
  1177. handle_info({disconnect, ReasonCode, ReasonName, Props}, Channel) ->
  1178. handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel);
  1179. handle_info({puback, PacketId, PubRes, RC}, Channel) ->
  1180. do_finish_publish(PacketId, PubRes, RC, Channel);
  1181. handle_info(Info, Channel) ->
  1182. ?SLOG(error, #{msg => "unexpected_info", info => Info}),
  1183. {ok, Channel}.
  1184. -ifdef(TEST).
  1185. -spec die_if_test_compiled() -> no_return().
  1186. die_if_test_compiled() ->
  1187. exit(normal).
  1188. -else.
  1189. die_if_test_compiled() ->
  1190. ok.
  1191. -endif.
  1192. %%--------------------------------------------------------------------
  1193. %% Handle timeout
  1194. %%--------------------------------------------------------------------
  1195. -spec handle_timeout(reference(), Msg :: term(), channel()) ->
  1196. {ok, channel()}
  1197. | {ok, replies(), channel()}
  1198. | {shutdown, Reason :: term(), channel()}.
  1199. handle_timeout(
  1200. _TRef,
  1201. {keepalive, _StatVal},
  1202. Channel = #channel{keepalive = undefined}
  1203. ) ->
  1204. {ok, Channel};
  1205. handle_timeout(
  1206. _TRef,
  1207. {keepalive, _StatVal},
  1208. Channel = #channel{conn_state = disconnected}
  1209. ) ->
  1210. {ok, Channel};
  1211. handle_timeout(
  1212. _TRef,
  1213. {keepalive, StatVal},
  1214. Channel = #channel{keepalive = Keepalive}
  1215. ) ->
  1216. case emqx_keepalive:check(StatVal, Keepalive) of
  1217. {ok, NKeepalive} ->
  1218. NChannel = Channel#channel{keepalive = NKeepalive},
  1219. {ok, reset_timer(alive_timer, NChannel)};
  1220. {error, timeout} ->
  1221. handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
  1222. end;
  1223. handle_timeout(
  1224. _TRef,
  1225. retry_delivery,
  1226. Channel = #channel{conn_state = disconnected}
  1227. ) ->
  1228. {ok, Channel};
  1229. handle_timeout(
  1230. _TRef,
  1231. retry_delivery,
  1232. Channel = #channel{session = Session, clientinfo = ClientInfo}
  1233. ) ->
  1234. case emqx_session:retry(ClientInfo, Session) of
  1235. {ok, NSession} ->
  1236. NChannel = Channel#channel{session = NSession},
  1237. {ok, clean_timer(retry_timer, NChannel)};
  1238. {ok, Publishes, Timeout, NSession} ->
  1239. NChannel = Channel#channel{session = NSession},
  1240. handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
  1241. end;
  1242. handle_timeout(
  1243. _TRef,
  1244. expire_awaiting_rel,
  1245. Channel = #channel{conn_state = disconnected}
  1246. ) ->
  1247. {ok, Channel};
  1248. handle_timeout(
  1249. _TRef,
  1250. expire_awaiting_rel,
  1251. Channel = #channel{session = Session, clientinfo = ClientInfo}
  1252. ) ->
  1253. case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
  1254. {ok, NSession} ->
  1255. NChannel = Channel#channel{session = NSession},
  1256. {ok, clean_timer(await_timer, NChannel)};
  1257. {ok, Timeout, NSession} ->
  1258. NChannel = Channel#channel{session = NSession},
  1259. {ok, reset_timer(await_timer, Timeout, NChannel)}
  1260. end;
  1261. handle_timeout(_TRef, expire_session, Channel) ->
  1262. shutdown(expired, Channel);
  1263. handle_timeout(
  1264. _TRef, will_message, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}
  1265. ) ->
  1266. (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
  1267. {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
  1268. handle_timeout(
  1269. _TRef,
  1270. expire_quota_limit,
  1271. #channel{quota = Quota} = Channel
  1272. ) ->
  1273. case emqx_limiter_container:retry(?LIMITER_ROUTING, Quota) of
  1274. {_, Intv, Quota2} ->
  1275. Channel2 = ensure_timer(quota_timer, Intv, Channel#channel{quota = Quota2}),
  1276. {ok, Channel2};
  1277. {_, Quota2} ->
  1278. {ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})}
  1279. end;
  1280. handle_timeout(_TRef, Msg, Channel) ->
  1281. ?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}),
  1282. {ok, Channel}.
  1283. %%--------------------------------------------------------------------
  1284. %% Ensure timers
  1285. %%--------------------------------------------------------------------
  1286. ensure_timer([Name], Channel) ->
  1287. ensure_timer(Name, Channel);
  1288. ensure_timer([Name | Rest], Channel) ->
  1289. ensure_timer(Rest, ensure_timer(Name, Channel));
  1290. ensure_timer(Name, Channel = #channel{timers = Timers}) ->
  1291. TRef = maps:get(Name, Timers, undefined),
  1292. Time = interval(Name, Channel),
  1293. case TRef == undefined andalso Time > 0 of
  1294. true -> ensure_timer(Name, Time, Channel);
  1295. %% Timer disabled or exists
  1296. false -> Channel
  1297. end.
  1298. ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
  1299. Msg = maps:get(Name, ?TIMER_TABLE),
  1300. TRef = emqx_utils:start_timer(Time, Msg),
  1301. Channel#channel{timers = Timers#{Name => TRef}}.
  1302. reset_timer(Name, Channel) ->
  1303. ensure_timer(Name, clean_timer(Name, Channel)).
  1304. reset_timer(Name, Time, Channel) ->
  1305. ensure_timer(Name, Time, clean_timer(Name, Channel)).
  1306. clean_timer(Name, Channel = #channel{timers = Timers}) ->
  1307. Channel#channel{timers = maps:remove(Name, Timers)}.
  1308. interval(alive_timer, #channel{keepalive = KeepAlive}) ->
  1309. emqx_keepalive:info(interval, KeepAlive);
  1310. interval(retry_timer, #channel{session = Session}) ->
  1311. emqx_session:info(retry_interval, Session);
  1312. interval(await_timer, #channel{session = Session}) ->
  1313. emqx_session:info(await_rel_timeout, Session);
  1314. interval(expire_timer, #channel{conninfo = ConnInfo}) ->
  1315. maps:get(expiry_interval, ConnInfo);
  1316. interval(will_timer, #channel{will_msg = WillMsg}) ->
  1317. timer:seconds(will_delay_interval(WillMsg)).
  1318. %%--------------------------------------------------------------------
  1319. %% Terminate
  1320. %%--------------------------------------------------------------------
  1321. -spec terminate(any(), channel()) -> ok.
  1322. terminate(_, #channel{conn_state = idle} = _Channel) ->
  1323. ok;
  1324. terminate(normal, Channel) ->
  1325. run_terminate_hook(normal, Channel);
  1326. terminate({shutdown, kicked}, Channel) ->
  1327. run_terminate_hook(kicked, Channel);
  1328. terminate({shutdown, Reason}, Channel) when
  1329. Reason =:= discarded;
  1330. Reason =:= takenover
  1331. ->
  1332. run_terminate_hook(Reason, Channel);
  1333. terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
  1334. %% since will_msg is set to undefined as soon as it is published,
  1335. %% if will_msg still exists when the session is terminated, it
  1336. %% must be published immediately.
  1337. WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
  1338. run_terminate_hook(Reason, Channel).
  1339. run_terminate_hook(_Reason, #channel{session = undefined}) ->
  1340. ok;
  1341. run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
  1342. emqx_session:terminate(ClientInfo, Reason, Session).
  1343. %%--------------------------------------------------------------------
  1344. %% Internal functions
  1345. %%--------------------------------------------------------------------
  1346. overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
  1347. emqx_olp:backoff(Zone),
  1348. ok.
  1349. %%--------------------------------------------------------------------
  1350. %% Enrich MQTT Connect Info
  1351. enrich_conninfo(
  1352. ConnPkt = #mqtt_packet_connect{
  1353. proto_name = ProtoName,
  1354. proto_ver = ProtoVer,
  1355. clean_start = CleanStart,
  1356. keepalive = Keepalive,
  1357. properties = ConnProps,
  1358. clientid = ClientId,
  1359. username = Username
  1360. },
  1361. Channel = #channel{
  1362. conninfo = ConnInfo,
  1363. clientinfo = #{zone := Zone}
  1364. }
  1365. ) ->
  1366. ExpiryInterval = expiry_interval(Zone, ConnPkt),
  1367. NConnInfo = ConnInfo#{
  1368. proto_name => ProtoName,
  1369. proto_ver => ProtoVer,
  1370. clean_start => CleanStart,
  1371. keepalive => Keepalive,
  1372. clientid => ClientId,
  1373. username => Username,
  1374. conn_props => ConnProps,
  1375. expiry_interval => ExpiryInterval,
  1376. receive_maximum => receive_maximum(Zone, ConnProps)
  1377. },
  1378. {ok, Channel#channel{conninfo = NConnInfo}}.
  1379. %% If the Session Expiry Interval is absent the value 0 is used.
  1380. expiry_interval(_, #mqtt_packet_connect{
  1381. proto_ver = ?MQTT_PROTO_V5,
  1382. properties = ConnProps
  1383. }) ->
  1384. timer:seconds(emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0));
  1385. expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
  1386. get_mqtt_conf(Zone, session_expiry_interval);
  1387. expiry_interval(_, #mqtt_packet_connect{clean_start = true}) ->
  1388. 0.
  1389. receive_maximum(Zone, ConnProps) ->
  1390. MaxInflightConfig =
  1391. case get_mqtt_conf(Zone, max_inflight) of
  1392. 0 -> ?RECEIVE_MAXIMUM_LIMIT;
  1393. N -> N
  1394. end,
  1395. %% Received might be zero which should be a protocol error
  1396. %% we do not validate MQTT properties here
  1397. %% it is to be caught later
  1398. Received = emqx_mqtt_props:get('Receive-Maximum', ConnProps, MaxInflightConfig),
  1399. erlang:min(Received, MaxInflightConfig).
  1400. %%--------------------------------------------------------------------
  1401. %% Run Connect Hooks
  1402. run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
  1403. ConnProps = emqx_packet:info(properties, ConnPkt),
  1404. case run_hooks('client.connect', [ConnInfo], ConnProps) of
  1405. Error = {error, _Reason} -> Error;
  1406. NConnProps -> {ok, emqx_packet:set_props(NConnProps, ConnPkt), Channel}
  1407. end.
  1408. %%--------------------------------------------------------------------
  1409. %% Check Connect Packet
  1410. check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone}}) ->
  1411. emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
  1412. %%--------------------------------------------------------------------
  1413. %% Enrich Client Info
  1414. enrich_client(ConnPkt, Channel = #channel{clientinfo = ClientInfo}) ->
  1415. Pipe = pipeline(
  1416. [
  1417. fun set_username/2,
  1418. fun set_bridge_mode/2,
  1419. fun maybe_username_as_clientid/2,
  1420. fun maybe_assign_clientid/2,
  1421. fun fix_mountpoint/2
  1422. ],
  1423. ConnPkt,
  1424. ClientInfo
  1425. ),
  1426. case Pipe of
  1427. {ok, NConnPkt, NClientInfo} ->
  1428. {ok, NConnPkt, Channel#channel{clientinfo = NClientInfo}};
  1429. {error, ReasonCode, NClientInfo} ->
  1430. {error, ReasonCode, Channel#channel{clientinfo = NClientInfo}}
  1431. end.
  1432. set_username(
  1433. #mqtt_packet_connect{username = Username},
  1434. ClientInfo = #{username := undefined}
  1435. ) ->
  1436. {ok, ClientInfo#{username => Username}};
  1437. set_username(_ConnPkt, ClientInfo) ->
  1438. {ok, ClientInfo}.
  1439. set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, ClientInfo) ->
  1440. {ok, ClientInfo#{is_bridge => true}};
  1441. set_bridge_mode(_ConnPkt, _ClientInfo) ->
  1442. ok.
  1443. maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) ->
  1444. {ok, ClientInfo};
  1445. maybe_username_as_clientid(
  1446. _ConnPkt,
  1447. ClientInfo = #{
  1448. zone := Zone,
  1449. username := Username
  1450. }
  1451. ) ->
  1452. case get_mqtt_conf(Zone, use_username_as_clientid) of
  1453. true when Username =/= <<>> -> {ok, ClientInfo#{clientid => Username}};
  1454. true -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID, ClientInfo};
  1455. false -> ok
  1456. end.
  1457. maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId}) when
  1458. ClientId /= undefined
  1459. ->
  1460. {ok, ClientInfo};
  1461. maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
  1462. %% Generate a rand clientId
  1463. {ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};
  1464. maybe_assign_clientid(#mqtt_packet_connect{clientid = ClientId}, ClientInfo) ->
  1465. {ok, ClientInfo#{clientid => ClientId}}.
  1466. fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) ->
  1467. ok;
  1468. fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) ->
  1469. MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
  1470. {ok, ClientInfo#{mountpoint := MountPoint1}}.
  1471. %%--------------------------------------------------------------------
  1472. %% Set log metadata
  1473. set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
  1474. emqx_logger:set_metadata_clientid(ClientId).
  1475. %%--------------------------------------------------------------------
  1476. %% Check banned
  1477. check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
  1478. case emqx_banned:check(ClientInfo) of
  1479. true -> {error, ?RC_BANNED};
  1480. false -> ok
  1481. end.
  1482. %%--------------------------------------------------------------------
  1483. %% Flapping
  1484. count_flapping_event(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
  1485. _ = emqx_flapping:detect(ClientInfo),
  1486. ok.
  1487. %%--------------------------------------------------------------------
  1488. %% Authenticate
  1489. authenticate(
  1490. ?CONNECT_PACKET(
  1491. #mqtt_packet_connect{
  1492. proto_ver = ?MQTT_PROTO_V5,
  1493. properties = #{'Authentication-Method' := AuthMethod} = Properties
  1494. }
  1495. ),
  1496. #channel{
  1497. clientinfo = ClientInfo,
  1498. auth_cache = AuthCache
  1499. } = Channel
  1500. ) ->
  1501. AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
  1502. do_authenticate(
  1503. ClientInfo#{
  1504. auth_method => AuthMethod,
  1505. auth_data => AuthData,
  1506. auth_cache => AuthCache
  1507. },
  1508. Channel
  1509. );
  1510. authenticate(
  1511. ?CONNECT_PACKET(#mqtt_packet_connect{password = Password}),
  1512. #channel{clientinfo = ClientInfo} = Channel
  1513. ) ->
  1514. do_authenticate(ClientInfo#{password => Password}, Channel);
  1515. authenticate(
  1516. ?AUTH_PACKET(_, #{'Authentication-Method' := AuthMethod} = Properties),
  1517. #channel{
  1518. clientinfo = ClientInfo,
  1519. conninfo = #{conn_props := ConnProps},
  1520. auth_cache = AuthCache
  1521. } = Channel
  1522. ) ->
  1523. case emqx_mqtt_props:get('Authentication-Method', ConnProps, undefined) of
  1524. AuthMethod ->
  1525. AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
  1526. do_authenticate(
  1527. ClientInfo#{
  1528. auth_method => AuthMethod,
  1529. auth_data => AuthData,
  1530. auth_cache => AuthCache
  1531. },
  1532. Channel
  1533. );
  1534. _ ->
  1535. {error, ?RC_BAD_AUTHENTICATION_METHOD}
  1536. end.
  1537. do_authenticate(
  1538. #{auth_method := AuthMethod} = Credential,
  1539. #channel{clientinfo = ClientInfo} = Channel
  1540. ) ->
  1541. Properties = #{'Authentication-Method' => AuthMethod},
  1542. case emqx_access_control:authenticate(Credential) of
  1543. {ok, AuthResult} ->
  1544. {ok, Properties, Channel#channel{
  1545. clientinfo = merge_auth_result(ClientInfo, AuthResult),
  1546. auth_cache = #{}
  1547. }};
  1548. {ok, AuthResult, AuthData} ->
  1549. {ok, Properties#{'Authentication-Data' => AuthData}, Channel#channel{
  1550. clientinfo = merge_auth_result(ClientInfo, AuthResult),
  1551. auth_cache = #{}
  1552. }};
  1553. {continue, AuthCache} ->
  1554. {continue, Properties, Channel#channel{auth_cache = AuthCache}};
  1555. {continue, AuthData, AuthCache} ->
  1556. {continue, Properties#{'Authentication-Data' => AuthData}, Channel#channel{
  1557. auth_cache = AuthCache
  1558. }};
  1559. {error, Reason} ->
  1560. {error, emqx_reason_codes:connack_error(Reason)}
  1561. end;
  1562. do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) ->
  1563. case emqx_access_control:authenticate(Credential) of
  1564. {ok, AuthResult} ->
  1565. {ok, #{}, Channel#channel{clientinfo = merge_auth_result(ClientInfo, AuthResult)}};
  1566. {error, Reason} ->
  1567. {error, emqx_reason_codes:connack_error(Reason)}
  1568. end.
  1569. merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
  1570. IsSuperuser = maps:get(is_superuser, AuthResult, false),
  1571. maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).
  1572. %%--------------------------------------------------------------------
  1573. %% Process Topic Alias
  1574. process_alias(
  1575. Packet = #mqtt_packet{
  1576. variable =
  1577. #mqtt_packet_publish{
  1578. topic_name = <<>>,
  1579. properties = #{'Topic-Alias' := AliasId}
  1580. } = Publish
  1581. },
  1582. Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}
  1583. ) ->
  1584. case find_alias(inbound, AliasId, TopicAliases) of
  1585. {ok, Topic} ->
  1586. NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
  1587. {ok, Packet#mqtt_packet{variable = NPublish}, Channel};
  1588. error ->
  1589. {error, ?RC_PROTOCOL_ERROR}
  1590. end;
  1591. process_alias(
  1592. #mqtt_packet{
  1593. variable = #mqtt_packet_publish{
  1594. topic_name = Topic,
  1595. properties = #{'Topic-Alias' := AliasId}
  1596. }
  1597. },
  1598. Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}
  1599. ) ->
  1600. NTopicAliases = save_alias(inbound, AliasId, Topic, TopicAliases),
  1601. {ok, Channel#channel{topic_aliases = NTopicAliases}};
  1602. process_alias(_Packet, Channel) ->
  1603. {ok, Channel}.
  1604. %%--------------------------------------------------------------------
  1605. %% Packing Topic Alias
  1606. packing_alias(
  1607. Packet = #mqtt_packet{
  1608. variable =
  1609. #mqtt_packet_publish{
  1610. topic_name = Topic,
  1611. properties = Prop
  1612. } = Publish
  1613. },
  1614. Channel =
  1615. ?IS_MQTT_V5 = #channel{
  1616. topic_aliases = TopicAliases,
  1617. alias_maximum = Limits
  1618. }
  1619. ) ->
  1620. case find_alias(outbound, Topic, TopicAliases) of
  1621. {ok, AliasId} ->
  1622. NPublish = Publish#mqtt_packet_publish{
  1623. topic_name = <<>>,
  1624. properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
  1625. },
  1626. {Packet#mqtt_packet{variable = NPublish}, Channel};
  1627. error ->
  1628. #{outbound := Aliases} = TopicAliases,
  1629. AliasId = maps:size(Aliases) + 1,
  1630. case
  1631. (Limits =:= undefined) orelse
  1632. (AliasId =< maps:get(outbound, Limits, 0))
  1633. of
  1634. true ->
  1635. NTopicAliases = save_alias(outbound, AliasId, Topic, TopicAliases),
  1636. NChannel = Channel#channel{topic_aliases = NTopicAliases},
  1637. NPublish = Publish#mqtt_packet_publish{
  1638. topic_name = Topic,
  1639. properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
  1640. },
  1641. {Packet#mqtt_packet{variable = NPublish}, NChannel};
  1642. false ->
  1643. {Packet, Channel}
  1644. end
  1645. end;
  1646. packing_alias(Packet, Channel) ->
  1647. {Packet, Channel}.
  1648. %%--------------------------------------------------------------------
  1649. %% Check quota state
  1650. check_quota_exceeded(_, #channel{timers = Timers}) ->
  1651. case maps:get(quota_timer, Timers, undefined) of
  1652. undefined -> ok;
  1653. _ -> {error, ?RC_QUOTA_EXCEEDED}
  1654. end.
  1655. %%--------------------------------------------------------------------
  1656. %% Check Pub Alias
  1657. check_pub_alias(
  1658. #mqtt_packet{
  1659. variable = #mqtt_packet_publish{
  1660. properties = #{'Topic-Alias' := AliasId}
  1661. }
  1662. },
  1663. #channel{alias_maximum = Limits}
  1664. ) ->
  1665. case
  1666. (Limits =:= undefined) orelse
  1667. (AliasId =< maps:get(inbound, Limits, ?MAX_TOPIC_AlIAS))
  1668. of
  1669. true -> ok;
  1670. false -> {error, ?RC_TOPIC_ALIAS_INVALID}
  1671. end;
  1672. check_pub_alias(_Packet, _Channel) ->
  1673. ok.
  1674. %%--------------------------------------------------------------------
  1675. %% Authorization action
  1676. authz_action(#mqtt_packet{
  1677. header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{}
  1678. }) ->
  1679. ?AUTHZ_PUBLISH(QoS, Retain);
  1680. authz_action(#mqtt_packet{
  1681. header = #mqtt_packet_header{qos = QoS}, variable = #mqtt_packet_subscribe{}
  1682. }) ->
  1683. ?AUTHZ_SUBSCRIBE(QoS);
  1684. %% Will message
  1685. authz_action(#message{qos = QoS, flags = #{retain := Retain}}) ->
  1686. ?AUTHZ_PUBLISH(QoS, Retain);
  1687. authz_action(#message{qos = QoS}) ->
  1688. ?AUTHZ_PUBLISH(QoS).
  1689. %%--------------------------------------------------------------------
  1690. %% Check Pub Authorization
  1691. check_pub_authz(
  1692. #mqtt_packet{
  1693. variable = #mqtt_packet_publish{topic_name = Topic}
  1694. } = Packet,
  1695. #channel{clientinfo = ClientInfo}
  1696. ) ->
  1697. Action = authz_action(Packet),
  1698. case emqx_access_control:authorize(ClientInfo, Action, Topic) of
  1699. allow -> ok;
  1700. deny -> {error, ?RC_NOT_AUTHORIZED}
  1701. end.
  1702. %%--------------------------------------------------------------------
  1703. %% Check Pub Caps
  1704. check_pub_caps(
  1705. #mqtt_packet{
  1706. header = #mqtt_packet_header{
  1707. qos = QoS,
  1708. retain = Retain
  1709. },
  1710. variable = #mqtt_packet_publish{topic_name = Topic}
  1711. },
  1712. #channel{clientinfo = #{zone := Zone}}
  1713. ) ->
  1714. emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
  1715. %%--------------------------------------------------------------------
  1716. %% Check Sub Authorization
  1717. check_sub_authzs(Packet, TopicFilters, Channel) ->
  1718. Action = authz_action(Packet),
  1719. check_sub_authzs(Action, TopicFilters, Channel, []).
  1720. check_sub_authzs(
  1721. Action,
  1722. [TopicFilter = {Topic, _} | More],
  1723. Channel = #channel{clientinfo = ClientInfo},
  1724. Acc
  1725. ) ->
  1726. case emqx_access_control:authorize(ClientInfo, Action, Topic) of
  1727. allow ->
  1728. check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
  1729. deny ->
  1730. check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
  1731. end;
  1732. check_sub_authzs(_Action, [], _Channel, Acc) ->
  1733. lists:reverse(Acc).
  1734. %%--------------------------------------------------------------------
  1735. %% Check Sub Caps
  1736. check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
  1737. emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
  1738. %%--------------------------------------------------------------------
  1739. %% Enrich SubId
  1740. enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
  1741. [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
  1742. enrich_subopts_subid(_Properties, TopicFilters) ->
  1743. TopicFilters.
  1744. %%--------------------------------------------------------------------
  1745. %% Enrich SubOpts
  1746. enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
  1747. SubOpts;
  1748. enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
  1749. NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)),
  1750. SubOpts#{rap => flag(IsBridge), nl => NL}.
  1751. %%--------------------------------------------------------------------
  1752. %% Enrich ConnAck Caps
  1753. enrich_connack_caps(
  1754. AckProps,
  1755. ?IS_MQTT_V5 = #channel{
  1756. clientinfo = #{
  1757. zone := Zone
  1758. }
  1759. }
  1760. ) ->
  1761. #{
  1762. max_packet_size := MaxPktSize,
  1763. max_qos_allowed := MaxQoS,
  1764. retain_available := Retain,
  1765. max_topic_alias := MaxAlias,
  1766. shared_subscription := Shared,
  1767. wildcard_subscription := Wildcard
  1768. } = emqx_mqtt_caps:get_caps(Zone),
  1769. NAckProps = AckProps#{
  1770. 'Retain-Available' => flag(Retain),
  1771. 'Maximum-Packet-Size' => MaxPktSize,
  1772. 'Topic-Alias-Maximum' => MaxAlias,
  1773. 'Wildcard-Subscription-Available' => flag(Wildcard),
  1774. 'Subscription-Identifier-Available' => 1,
  1775. 'Shared-Subscription-Available' => flag(Shared)
  1776. },
  1777. %% MQTT 5.0 - 3.2.2.3.4:
  1778. %% It is a Protocol Error to include Maximum QoS more than once,
  1779. %% or to have a value other than 0 or 1. If the Maximum QoS is absent,
  1780. %% the Client uses a Maximum QoS of 2.
  1781. case MaxQoS =:= 2 of
  1782. true -> NAckProps;
  1783. _ -> NAckProps#{'Maximum-QoS' => MaxQoS}
  1784. end;
  1785. enrich_connack_caps(AckProps, _Channel) ->
  1786. AckProps.
  1787. %%--------------------------------------------------------------------
  1788. %% Enrich server keepalive
  1789. enrich_server_keepalive(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) ->
  1790. case get_mqtt_conf(Zone, server_keepalive) of
  1791. disabled -> AckProps;
  1792. Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
  1793. end;
  1794. enrich_server_keepalive(AckProps, _Channel) ->
  1795. AckProps.
  1796. %%--------------------------------------------------------------------
  1797. %% Enrich response information
  1798. enrich_response_information(AckProps, #channel{
  1799. conninfo = #{conn_props := ConnProps},
  1800. clientinfo = #{zone := Zone}
  1801. }) ->
  1802. case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of
  1803. 0 ->
  1804. AckProps;
  1805. 1 ->
  1806. AckProps#{
  1807. 'Response-Information' =>
  1808. case get_mqtt_conf(Zone, response_information, "") of
  1809. "" -> undefined;
  1810. RspInfo -> RspInfo
  1811. end
  1812. }
  1813. end.
  1814. %%--------------------------------------------------------------------
  1815. %% Enrich Assigned ClientId
  1816. enrich_assigned_clientid(AckProps, #channel{
  1817. conninfo = ConnInfo,
  1818. clientinfo = #{clientid := ClientId}
  1819. }) ->
  1820. case maps:get(clientid, ConnInfo) of
  1821. %% Original ClientId is null.
  1822. <<>> ->
  1823. AckProps#{'Assigned-Client-Identifier' => ClientId};
  1824. _Origin ->
  1825. AckProps
  1826. end.
  1827. %%--------------------------------------------------------------------
  1828. %% Ensure connected
  1829. ensure_connected(
  1830. Channel = #channel{
  1831. conninfo = ConnInfo,
  1832. clientinfo = ClientInfo
  1833. }
  1834. ) ->
  1835. NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
  1836. ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
  1837. Channel#channel{
  1838. conninfo = trim_conninfo(NConnInfo),
  1839. conn_state = connected
  1840. }.
  1841. trim_conninfo(ConnInfo) ->
  1842. maps:without(
  1843. [
  1844. %% NOTE
  1845. %% We remove the peercert because it duplicates what's stored in the socket,
  1846. %% otherwise it wastes about 1KB per connection.
  1847. peercert
  1848. ],
  1849. ConnInfo
  1850. ).
  1851. %%--------------------------------------------------------------------
  1852. %% Init Alias Maximum
  1853. init_alias_maximum(
  1854. #mqtt_packet_connect{
  1855. proto_ver = ?MQTT_PROTO_V5,
  1856. properties = Properties
  1857. },
  1858. #{zone := Zone} = _ClientInfo
  1859. ) ->
  1860. #{
  1861. outbound => emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0),
  1862. inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone))
  1863. };
  1864. init_alias_maximum(_ConnPkt, _ClientInfo) ->
  1865. undefined.
  1866. %%--------------------------------------------------------------------
  1867. %% Ensure Keepalive
  1868. %% MQTT 5
  1869. ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel = #channel{conninfo = ConnInfo}) ->
  1870. ensure_keepalive_timer(Interval, Channel#channel{conninfo = ConnInfo#{keepalive => Interval}});
  1871. %% MQTT 3,4
  1872. ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
  1873. ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
  1874. ensure_keepalive_timer(0, Channel) ->
  1875. Channel;
  1876. ensure_keepalive_timer(disabled, Channel) ->
  1877. Channel;
  1878. ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
  1879. Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
  1880. RecvCnt = emqx_pd:get_counter(recv_pkt),
  1881. Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
  1882. ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
  1883. clear_keepalive(Channel = #channel{timers = Timers}) ->
  1884. case maps:get(alive_timer, Timers, undefined) of
  1885. undefined ->
  1886. Channel;
  1887. TRef ->
  1888. emqx_utils:cancel_timer(TRef),
  1889. Channel#channel{timers = maps:without([alive_timer], Timers)}
  1890. end.
  1891. %%--------------------------------------------------------------------
  1892. %% Maybe Resume Session
  1893. maybe_resume_session(#channel{resuming = false}) ->
  1894. ignore;
  1895. maybe_resume_session(#channel{
  1896. session = Session,
  1897. resuming = true,
  1898. pendings = Pendings,
  1899. clientinfo = ClientInfo
  1900. }) ->
  1901. {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
  1902. case emqx_session:deliver(ClientInfo, Pendings, Session1) of
  1903. {ok, Session2} ->
  1904. {ok, Publishes, Session2};
  1905. {ok, More, Session2} ->
  1906. {ok, lists:append(Publishes, More), Session2}
  1907. end.
  1908. %%--------------------------------------------------------------------
  1909. %% Maybe Shutdown the Channel
  1910. maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
  1911. case maps:get(expiry_interval, ConnInfo) of
  1912. ?EXPIRE_INTERVAL_INFINITE ->
  1913. {ok, Channel};
  1914. I when I > 0 ->
  1915. {ok, ensure_timer(expire_timer, I, Channel)};
  1916. _ ->
  1917. shutdown(Reason, Channel)
  1918. end.
  1919. %%--------------------------------------------------------------------
  1920. %% Parse Topic Filters
  1921. -compile({inline, [parse_topic_filters/1]}).
  1922. parse_topic_filters(TopicFilters) ->
  1923. lists:map(fun emqx_topic:parse/1, TopicFilters).
  1924. %%--------------------------------------------------------------------
  1925. %% Maybe & Ensure disconnected
  1926. ensure_disconnected(
  1927. Reason,
  1928. Channel = #channel{
  1929. conninfo = ConnInfo,
  1930. clientinfo = ClientInfo
  1931. }
  1932. ) ->
  1933. NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
  1934. ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
  1935. ChanPid = self(),
  1936. emqx_cm:mark_channel_disconnected(ChanPid),
  1937. Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
  1938. %%--------------------------------------------------------------------
  1939. %% Maybe Publish will msg
  1940. maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
  1941. Channel;
  1942. maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
  1943. case will_delay_interval(WillMsg) of
  1944. 0 ->
  1945. ok = publish_will_msg(ClientInfo, WillMsg),
  1946. Channel#channel{will_msg = undefined};
  1947. I ->
  1948. ensure_timer(will_timer, timer:seconds(I), Channel)
  1949. end.
  1950. will_delay_interval(WillMsg) ->
  1951. maps:get(
  1952. 'Will-Delay-Interval',
  1953. emqx_message:get_header(properties, WillMsg, #{}),
  1954. 0
  1955. ).
  1956. publish_will_msg(
  1957. ClientInfo = #{mountpoint := MountPoint},
  1958. Msg = #message{topic = Topic}
  1959. ) ->
  1960. Action = authz_action(Msg),
  1961. PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
  1962. ClientBanned = emqx_banned:check(ClientInfo),
  1963. case PublishingDisallowed orelse ClientBanned of
  1964. true ->
  1965. ?tp(
  1966. warning,
  1967. last_will_testament_publish_denied,
  1968. #{
  1969. topic => Topic,
  1970. client_banned => ClientBanned,
  1971. publishing_disallowed => PublishingDisallowed
  1972. }
  1973. ),
  1974. ok;
  1975. false ->
  1976. NMsg = emqx_mountpoint:mount(MountPoint, Msg),
  1977. NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)},
  1978. _ = emqx_broker:publish(NMsg2),
  1979. ok
  1980. end.
  1981. %%--------------------------------------------------------------------
  1982. %% Disconnect Reason
  1983. disconnect_reason(?RC_SUCCESS) -> normal;
  1984. disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode).
  1985. reason_code(takenover) -> ?RC_SESSION_TAKEN_OVER;
  1986. reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER.
  1987. %%--------------------------------------------------------------------
  1988. %% Helper functions
  1989. %%--------------------------------------------------------------------
  1990. -compile({inline, [run_hooks/2, run_hooks/3]}).
  1991. run_hooks(Name, Args) ->
  1992. ok = emqx_metrics:inc(Name),
  1993. emqx_hooks:run(Name, Args).
  1994. run_hooks(Name, Args, Acc) ->
  1995. ok = emqx_metrics:inc(Name),
  1996. emqx_hooks:run_fold(Name, Args, Acc).
  1997. -compile({inline, [find_alias/3, save_alias/4]}).
  1998. find_alias(_, _, undefined) -> error;
  1999. find_alias(inbound, AliasId, _TopicAliases = #{inbound := Aliases}) -> maps:find(AliasId, Aliases);
  2000. find_alias(outbound, Topic, _TopicAliases = #{outbound := Aliases}) -> maps:find(Topic, Aliases).
  2001. save_alias(_, _, _, undefined) ->
  2002. false;
  2003. save_alias(inbound, AliasId, Topic, TopicAliases = #{inbound := Aliases}) ->
  2004. NAliases = maps:put(AliasId, Topic, Aliases),
  2005. TopicAliases#{inbound => NAliases};
  2006. save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
  2007. NAliases = maps:put(Topic, AliasId, Aliases),
  2008. TopicAliases#{outbound => NAliases}.
  2009. -compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
  2010. reply(Reply, Channel) ->
  2011. {reply, Reply, Channel}.
  2012. shutdown(success, Channel) ->
  2013. shutdown(normal, Channel);
  2014. shutdown(Reason, Channel) ->
  2015. {shutdown, Reason, Channel}.
  2016. shutdown(success, Reply, Channel) ->
  2017. shutdown(normal, Reply, Channel);
  2018. shutdown(Reason, Reply, Channel) ->
  2019. {shutdown, Reason, Reply, Channel}.
  2020. shutdown(success, Reply, Packet, Channel) ->
  2021. shutdown(normal, Reply, Packet, Channel);
  2022. shutdown(Reason, Reply, Packet, Channel) ->
  2023. {shutdown, Reason, Reply, Packet, Channel}.
  2024. %% mqtt v5 connected sessions
  2025. disconnect_and_shutdown(
  2026. Reason,
  2027. Reply,
  2028. Channel =
  2029. ?IS_MQTT_V5 =
  2030. #channel{conn_state = ConnState}
  2031. ) when
  2032. ConnState =:= connected orelse ConnState =:= reauthenticating
  2033. ->
  2034. NChannel = ensure_disconnected(Reason, Channel),
  2035. shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
  2036. %% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
  2037. disconnect_and_shutdown(Reason, Reply, Channel) ->
  2038. NChannel = ensure_disconnected(Reason, Channel),
  2039. shutdown(Reason, Reply, NChannel).
  2040. sp(true) -> 1;
  2041. sp(false) -> 0.
  2042. flag(true) -> 1;
  2043. flag(false) -> 0.
  2044. get_mqtt_conf(Zone, Key) ->
  2045. emqx_config:get_zone_conf(Zone, [mqtt, Key]).
  2046. get_mqtt_conf(Zone, Key, Default) ->
  2047. emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).
  2048. %%--------------------------------------------------------------------
  2049. %% For CT tests
  2050. %%--------------------------------------------------------------------
  2051. set_field(Name, Value, Channel) ->
  2052. Pos = emqx_utils:index_of(Name, record_info(fields, channel)),
  2053. setelement(Pos + 1, Channel, Value).