emqx_channel.erl 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% MQTT Channel
  17. -module(emqx_channel).
  18. -include("emqx.hrl").
  19. -include("emqx_mqtt.hrl").
  20. -include("logger.hrl").
  21. -include("types.hrl").
  22. -logger_header("[Channel]").
  23. -export([ info/1
  24. , info/2
  25. , attrs/1
  26. , caps/1
  27. ]).
  28. %% for tests
  29. -export([set/3]).
  30. -export([takeover/2]).
  31. -export([ init/2
  32. , handle_in/2
  33. , handle_out/2
  34. , handle_out/3
  35. , handle_call/2
  36. , handle_cast/2
  37. , handle_info/2
  38. , timeout/3
  39. , terminate/2
  40. ]).
  41. -export([ensure_timer/2]).
  42. -export([gc/3]).
  43. -import(emqx_access_control,
  44. [ authenticate/1
  45. , check_acl/3
  46. ]).
  47. -import(emqx_misc, [start_timer/2]).
  48. -export_type([channel/0]).
  49. -record(channel, {
  50. client :: emqx_types:client(),
  51. session :: emqx_session:session(),
  52. proto_name :: binary(),
  53. proto_ver :: emqx_types:ver(),
  54. keepalive :: non_neg_integer(),
  55. will_msg :: emqx_types:message(),
  56. topic_aliases :: maybe(map()),
  57. alias_maximum :: maybe(map()),
  58. ack_props :: maybe(emqx_types:properties()),
  59. idle_timeout :: timeout(),
  60. retry_timer :: maybe(reference()),
  61. alive_timer :: maybe(reference()),
  62. stats_timer :: disabled | maybe(reference()),
  63. expiry_timer :: maybe(reference()),
  64. gc_state :: emqx_gc:gc_state(), %% GC State
  65. oom_policy :: emqx_oom:oom_policy(), %% OOM Policy
  66. connected :: boolean(),
  67. connected_at :: erlang:timestamp(),
  68. resuming :: boolean(),
  69. pendings :: list()
  70. }).
  71. -opaque(channel() :: #channel{}).
  72. -define(NO_PROPS, undefined).
  73. %%--------------------------------------------------------------------
  74. %% Info, Attrs and Caps
  75. %%--------------------------------------------------------------------
  76. -spec(info(channel()) -> emqx_types:infos()).
  77. info(#channel{client = Client,
  78. session = Session,
  79. proto_name = ProtoName,
  80. proto_ver = ProtoVer,
  81. keepalive = Keepalive,
  82. will_msg = WillMsg,
  83. topic_aliases = Aliases,
  84. stats_timer = StatsTimer,
  85. idle_timeout = IdleTimeout,
  86. gc_state = GCState,
  87. connected = Connected,
  88. connected_at = ConnectedAt}) ->
  89. #{client => Client,
  90. session => if Session == undefined ->
  91. undefined;
  92. true -> emqx_session:info(Session)
  93. end,
  94. proto_name => ProtoName,
  95. proto_ver => ProtoVer,
  96. keepalive => Keepalive,
  97. will_msg => WillMsg,
  98. topic_aliases => Aliases,
  99. enable_stats => case StatsTimer of
  100. disabled -> false;
  101. _Otherwise -> true
  102. end,
  103. idle_timeout => IdleTimeout,
  104. gc_state => emqx_gc:info(GCState),
  105. connected => Connected,
  106. connected_at => ConnectedAt,
  107. resuming => false,
  108. pendings => []
  109. }.
  110. -spec(info(atom(), channel()) -> term()).
  111. info(client, #channel{client = Client}) ->
  112. Client;
  113. info(zone, #channel{client = #{zone := Zone}}) ->
  114. Zone;
  115. info(client_id, #channel{client = #{client_id := ClientId}}) ->
  116. ClientId;
  117. info(session, #channel{session = Session}) ->
  118. Session;
  119. info(proto_name, #channel{proto_name = ProtoName}) ->
  120. ProtoName;
  121. info(proto_ver, #channel{proto_ver = ProtoVer}) ->
  122. ProtoVer;
  123. info(keepalive, #channel{keepalive = Keepalive}) ->
  124. Keepalive;
  125. info(will_msg, #channel{will_msg = WillMsg}) ->
  126. WillMsg;
  127. info(topic_aliases, #channel{topic_aliases = Aliases}) ->
  128. Aliases;
  129. info(enable_stats, #channel{stats_timer = disabled}) ->
  130. false;
  131. info(enable_stats, #channel{stats_timer = _TRef}) ->
  132. true;
  133. info(idle_timeout, #channel{idle_timeout = IdleTimeout}) ->
  134. IdleTimeout;
  135. info(gc_state, #channel{gc_state = GCState}) ->
  136. emqx_gc:info(GCState);
  137. info(connected, #channel{connected = Connected}) ->
  138. Connected;
  139. info(connected_at, #channel{connected_at = ConnectedAt}) ->
  140. ConnectedAt.
  141. -spec(attrs(channel()) -> emqx_types:attrs()).
  142. attrs(#channel{client = Client,
  143. session = Session,
  144. proto_name = ProtoName,
  145. proto_ver = ProtoVer,
  146. keepalive = Keepalive,
  147. connected = Connected,
  148. connected_at = ConnectedAt}) ->
  149. #{client => Client,
  150. session => if Session == undefined ->
  151. undefined;
  152. true -> emqx_session:attrs(Session)
  153. end,
  154. proto_name => ProtoName,
  155. proto_ver => ProtoVer,
  156. keepalive => Keepalive,
  157. connected => Connected,
  158. connected_at => ConnectedAt
  159. }.
  160. -spec(caps(channel()) -> emqx_types:caps()).
  161. caps(#channel{client = #{zone := Zone}}) ->
  162. emqx_mqtt_caps:get_caps(Zone).
  163. %%--------------------------------------------------------------------
  164. %% For unit tests
  165. %%--------------------------------------------------------------------
  166. set(client, Client, Channel) ->
  167. Channel#channel{client = Client};
  168. set(session, Session, Channel) ->
  169. Channel#channel{session = Session}.
  170. %%--------------------------------------------------------------------
  171. %% Takeover session
  172. %%--------------------------------------------------------------------
  173. takeover('begin', Channel = #channel{session = Session}) ->
  174. {ok, Session, Channel#channel{resuming = true}};
  175. takeover('end', Channel = #channel{session = Session,
  176. pendings = Pendings}) ->
  177. ok = emqx_session:takeover(Session),
  178. {ok, Pendings, Channel}.
  179. %%--------------------------------------------------------------------
  180. %% Init a channel
  181. %%--------------------------------------------------------------------
  182. -spec(init(emqx_types:conn(), proplists:proplist()) -> channel()).
  183. init(ConnInfo, Options) ->
  184. Zone = proplists:get_value(zone, Options),
  185. Peercert = maps:get(peercert, ConnInfo, undefined),
  186. Username = case peer_cert_as_username(Options) of
  187. cn -> esockd_peercert:common_name(Peercert);
  188. dn -> esockd_peercert:subject(Peercert);
  189. crt -> Peercert;
  190. _ -> undefined
  191. end,
  192. MountPoint = emqx_zone:get_env(Zone, mountpoint),
  193. Client = maps:merge(#{zone => Zone,
  194. username => Username,
  195. client_id => <<>>,
  196. mountpoint => MountPoint,
  197. is_bridge => false,
  198. is_superuser => false}, ConnInfo),
  199. IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
  200. EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
  201. StatsTimer = if EnableStats -> undefined;
  202. ?Otherwise -> disabled
  203. end,
  204. GcState = emqx_gc:init(emqx_zone:get_env(Zone, force_gc_policy, false)),
  205. OomPolicy = emqx_oom:init(emqx_zone:get_env(Zone, force_shutdown_policy)),
  206. #channel{client = Client,
  207. proto_name = <<"MQTT">>,
  208. proto_ver = ?MQTT_PROTO_V4,
  209. keepalive = 0,
  210. idle_timeout = IdleTimout,
  211. stats_timer = StatsTimer,
  212. gc_state = GcState,
  213. oom_policy = OomPolicy,
  214. connected = false
  215. }.
  216. peer_cert_as_username(Options) ->
  217. proplists:get_value(peer_cert_as_username, Options).
  218. %%--------------------------------------------------------------------
  219. %% Handle incoming packet
  220. %%--------------------------------------------------------------------
  221. -spec(handle_in(emqx_types:packet(), channel())
  222. -> {ok, channel()}
  223. | {ok, emqx_types:packet(), channel()}
  224. | {ok, list(emqx_types:packet()), channel()}
  225. | {stop, Error :: term(), channel()}
  226. | {stop, Error :: term(), emqx_types:packet(), channel()}).
  227. handle_in(?CONNECT_PACKET(_), Channel = #channel{connected = true}) ->
  228. handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
  229. handle_in(?CONNECT_PACKET(
  230. #mqtt_packet_connect{proto_name = ProtoName,
  231. proto_ver = ProtoVer,
  232. keepalive = Keepalive,
  233. client_id = ClientId
  234. } = ConnPkt), Channel) ->
  235. Channel1 = Channel#channel{proto_name = ProtoName,
  236. proto_ver = ProtoVer,
  237. keepalive = Keepalive
  238. },
  239. ok = emqx_logger:set_metadata_client_id(ClientId),
  240. case pipeline([fun validate_in/2,
  241. fun process_props/2,
  242. fun check_connect/2,
  243. fun enrich_client/2,
  244. fun auth_connect/2], ConnPkt, Channel1) of
  245. {ok, NConnPkt, NChannel = #channel{client = #{client_id := ClientId1}}} ->
  246. ok = emqx_logger:set_metadata_client_id(ClientId1),
  247. process_connect(NConnPkt, NChannel);
  248. {error, ReasonCode, NChannel} ->
  249. handle_out(connack, ReasonCode, NChannel)
  250. end;
  251. handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{proto_ver = Ver}) ->
  252. case pipeline([fun validate_in/2,
  253. fun process_alias/2,
  254. fun check_publish/2], Packet, Channel) of
  255. {ok, NPacket, NChannel} ->
  256. process_publish(NPacket, NChannel);
  257. {error, ReasonCode, NChannel} ->
  258. ?LOG(warning, "Cannot publish message to ~s due to ~s",
  259. [Topic, emqx_reason_codes:text(ReasonCode, Ver)]),
  260. case QoS of
  261. ?QOS_0 -> handle_out(puberr, ReasonCode, NChannel);
  262. ?QOS_1 -> handle_out(puback, {PacketId, ReasonCode}, NChannel);
  263. ?QOS_2 -> handle_out(pubrec, {PacketId, ReasonCode}, NChannel)
  264. end
  265. end;
  266. %%TODO: How to handle the ReasonCode?
  267. handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
  268. case emqx_session:puback(PacketId, Session) of
  269. {ok, Publishes, NSession} ->
  270. handle_out(publish, Publishes, Channel#channel{session = NSession});
  271. {ok, NSession} ->
  272. {ok, Channel#channel{session = NSession}};
  273. {error, _NotFound} ->
  274. %%TODO: How to handle NotFound, inc metrics?
  275. {ok, Channel}
  276. end;
  277. %%TODO: How to handle the ReasonCode?
  278. handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
  279. case emqx_session:pubrec(PacketId, Session) of
  280. {ok, NSession} ->
  281. handle_out(pubrel, {PacketId, ?RC_SUCCESS}, Channel#channel{session = NSession});
  282. {error, ReasonCode} ->
  283. handle_out(pubrel, {PacketId, ReasonCode}, Channel)
  284. end;
  285. %%TODO: How to handle the ReasonCode?
  286. handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
  287. case emqx_session:pubrel(PacketId, Session) of
  288. {ok, NSession} ->
  289. handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, Channel#channel{session = NSession});
  290. {error, ReasonCode} ->
  291. handle_out(pubcomp, {PacketId, ReasonCode}, Channel)
  292. end;
  293. handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
  294. case emqx_session:pubcomp(PacketId, Session) of
  295. {ok, Publishes, NSession} ->
  296. handle_out(publish, Publishes, Channel#channel{session = NSession});
  297. {ok, NSession} ->
  298. {ok, Channel#channel{session = NSession}};
  299. {error, _NotFound} ->
  300. %% TODO: how to handle NotFound?
  301. {ok, Channel}
  302. end;
  303. handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
  304. Channel = #channel{client = Client}) ->
  305. case validate_in(Packet, Channel) of
  306. ok ->
  307. TopicFilters1 = [emqx_topic:parse(TopicFilter, SubOpts)
  308. || {TopicFilter, SubOpts} <- TopicFilters],
  309. TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
  310. [Client, Properties],
  311. TopicFilters1),
  312. TopicFilters3 = enrich_subid(Properties, TopicFilters2),
  313. {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
  314. handle_out(suback, {PacketId, ReasonCodes}, NChannel);
  315. {error, ReasonCode} ->
  316. handle_out(disconnect, ReasonCode, Channel)
  317. end;
  318. handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
  319. Channel = #channel{client = Client}) ->
  320. case validate_in(Packet, Channel) of
  321. ok ->
  322. TopicFilters1 = lists:map(fun emqx_topic:parse/1, TopicFilters),
  323. TopicFilters2 = emqx_hooks:run_fold('client.unsubscribe',
  324. [Client, Properties],
  325. TopicFilters1),
  326. {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters2, Channel),
  327. handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
  328. {error, ReasonCode} ->
  329. handle_out(disconnect, ReasonCode, Channel)
  330. end;
  331. handle_in(?PACKET(?PINGREQ), Channel) ->
  332. {ok, ?PACKET(?PINGRESP), Channel};
  333. handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel) ->
  334. %% Clear will msg
  335. {stop, normal, Channel#channel{will_msg = undefined}};
  336. handle_in(?DISCONNECT_PACKET(RC), Channel = #channel{proto_ver = Ver}) ->
  337. {stop, {shutdown, emqx_reason_codes:name(RC, Ver)}, Channel};
  338. handle_in(?AUTH_PACKET(), Channel) ->
  339. %%TODO: implement later.
  340. {ok, Channel};
  341. handle_in(Packet, Channel) ->
  342. ?LOG(error, "Unexpected incoming: ~p", [Packet]),
  343. {stop, {shutdown, unexpected_incoming_packet}, Channel}.
  344. %%--------------------------------------------------------------------
  345. %% Process Connect
  346. %%--------------------------------------------------------------------
  347. process_connect(ConnPkt, Channel) ->
  348. case open_session(ConnPkt, Channel) of
  349. {ok, Session, SP} ->
  350. WillMsg = emqx_packet:will_msg(ConnPkt),
  351. NChannel = Channel#channel{session = Session,
  352. will_msg = WillMsg,
  353. connected = true,
  354. connected_at = os:timestamp()
  355. },
  356. handle_out(connack, {?RC_SUCCESS, sp(SP)}, NChannel);
  357. {error, Reason} ->
  358. %% TODO: Unknown error?
  359. ?LOG(error, "Failed to open session: ~p", [Reason]),
  360. handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
  361. end.
  362. %%--------------------------------------------------------------------
  363. %% Process Publish
  364. %%--------------------------------------------------------------------
  365. %% Process Publish
  366. process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
  367. Channel = #channel{client = Client}) ->
  368. Msg = emqx_packet:to_message(Client, Packet),
  369. %%TODO: Improve later.
  370. Msg1 = emqx_message:set_flag(dup, false, Msg),
  371. process_publish(PacketId, mount(Client, Msg1), Channel).
  372. process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
  373. _ = emqx_broker:publish(Msg),
  374. {ok, Channel};
  375. process_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
  376. Deliveries = emqx_broker:publish(Msg),
  377. ReasonCode = emqx_reason_codes:puback(Deliveries),
  378. handle_out(puback, {PacketId, ReasonCode}, Channel);
  379. process_publish(PacketId, Msg = #message{qos = ?QOS_2},
  380. Channel = #channel{session = Session}) ->
  381. case emqx_session:publish(PacketId, Msg, Session) of
  382. {ok, Deliveries, NSession} ->
  383. ReasonCode = emqx_reason_codes:puback(Deliveries),
  384. handle_out(pubrec, {PacketId, ReasonCode},
  385. Channel#channel{session = NSession});
  386. {error, ReasonCode} ->
  387. handle_out(pubrec, {PacketId, ReasonCode}, Channel)
  388. end.
  389. %%--------------------------------------------------------------------
  390. %% Process Subscribe
  391. %%--------------------------------------------------------------------
  392. process_subscribe(TopicFilters, Channel) ->
  393. process_subscribe(TopicFilters, [], Channel).
  394. process_subscribe([], Acc, Channel) ->
  395. {lists:reverse(Acc), Channel};
  396. process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
  397. {RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
  398. process_subscribe(More, [RC|Acc], NChannel).
  399. do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
  400. Channel = #channel{client = Client, session = Session}) ->
  401. case check_subscribe(TopicFilter, SubOpts, Channel) of
  402. ok -> TopicFilter1 = mount(Client, TopicFilter),
  403. SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
  404. case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
  405. {ok, NSession} ->
  406. {QoS, Channel#channel{session = NSession}};
  407. {error, RC} -> {RC, Channel}
  408. end;
  409. {error, RC} -> {RC, Channel}
  410. end.
  411. %%--------------------------------------------------------------------
  412. %% Process Unsubscribe
  413. %%--------------------------------------------------------------------
  414. process_unsubscribe(TopicFilters, Channel) ->
  415. process_unsubscribe(TopicFilters, [], Channel).
  416. process_unsubscribe([], Acc, Channel) ->
  417. {lists:reverse(Acc), Channel};
  418. process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
  419. {RC, Channel1} = do_unsubscribe(TopicFilter, SubOpts, Channel),
  420. process_unsubscribe(More, [RC|Acc], Channel1).
  421. do_unsubscribe(TopicFilter, _SubOpts, Channel = #channel{client = Client,
  422. session = Session}) ->
  423. case emqx_session:unsubscribe(Client, mount(Client, TopicFilter), Session) of
  424. {ok, NSession} ->
  425. {?RC_SUCCESS, Channel#channel{session = NSession}};
  426. {error, RC} -> {RC, Channel}
  427. end.
  428. %%--------------------------------------------------------------------
  429. %% Handle outgoing packet
  430. %%--------------------------------------------------------------------
  431. handle_out(Deliver = {deliver, _Topic, _Msg},
  432. Channel = #channel{resuming = true, pendings = Pendings}) ->
  433. Delivers = emqx_misc:drain_deliver([Deliver]),
  434. {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}};
  435. handle_out(Deliver = {deliver, _Topic, _Msg}, Channel = #channel{session = Session}) ->
  436. Delivers = emqx_misc:drain_deliver([Deliver]),
  437. case emqx_session:deliver(Delivers, Session) of
  438. {ok, Publishes, NSession} ->
  439. handle_out(publish, Publishes, Channel#channel{session = NSession});
  440. {ok, NSession} ->
  441. {ok, Channel#channel{session = NSession}}
  442. end;
  443. handle_out({publish, PacketId, Msg}, Channel = #channel{client = Client}) ->
  444. Msg1 = emqx_hooks:run_fold('message.deliver', [Client],
  445. emqx_message:update_expiry(Msg)),
  446. Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)),
  447. {ok, Packet, Channel}.
  448. handle_out(connack, {?RC_SUCCESS, SP},
  449. Channel = #channel{client = Client = #{zone := Zone},
  450. ack_props = AckProps,
  451. alias_maximum = AliasMaximum}) ->
  452. ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(Channel)]),
  453. #{max_packet_size := MaxPktSize,
  454. max_qos_allowed := MaxQoS,
  455. retain_available := Retain,
  456. max_topic_alias := MaxAlias,
  457. shared_subscription := Shared,
  458. wildcard_subscription := Wildcard
  459. } = caps(Channel),
  460. %% Response-Information is so far not set by broker.
  461. %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
  462. %% According to MQTT 5.0 spec:
  463. %% A common use of this is to pass a globally unique portion of the topic tree which
  464. %% is reserved for this Client for at least the lifetime of its Session.
  465. %% This often cannot just be a random name as both the requesting Client and the
  466. %% responding Client need to be authorized to use it.
  467. %% If we are to support it in the feature, the implementation should be flexible
  468. %% to allow prefixing the response topic based on different ACL config.
  469. %% e.g. prefix by username or client-id, so that unauthorized clients can not
  470. %% subscribe requests or responses that are not intended for them.
  471. AckProps1 = if AckProps == undefined -> #{}; true -> AckProps end,
  472. AckProps2 = AckProps1#{'Retain-Available' => flag(Retain),
  473. 'Maximum-Packet-Size' => MaxPktSize,
  474. 'Topic-Alias-Maximum' => MaxAlias,
  475. 'Wildcard-Subscription-Available' => flag(Wildcard),
  476. 'Subscription-Identifier-Available' => 1,
  477. %'Response-Information' =>
  478. 'Shared-Subscription-Available' => flag(Shared),
  479. 'Maximum-QoS' => MaxQoS
  480. },
  481. AckProps3 = case emqx_zone:get_env(Zone, server_keepalive) of
  482. undefined -> AckProps2;
  483. Keepalive -> AckProps2#{'Server-Keep-Alive' => Keepalive}
  484. end,
  485. AliasMaximum1 = set_property(inbound, MaxAlias, AliasMaximum),
  486. Channel1 = Channel#channel{alias_maximum = AliasMaximum1,
  487. ack_props = undefined
  488. },
  489. {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps3), Channel1};
  490. handle_out(connack, ReasonCode, Channel = #channel{client = Client,
  491. proto_ver = ProtoVer}) ->
  492. ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(Channel)]),
  493. ReasonCode1 = if
  494. ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode;
  495. true -> emqx_reason_codes:compat(connack, ReasonCode)
  496. end,
  497. Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
  498. {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
  499. handle_out(publish, Publishes, Channel) ->
  500. Packets = [element(2, handle_out(Publish, Channel)) || Publish <- Publishes],
  501. {ok, Packets, Channel};
  502. %% TODO: How to handle the puberr?
  503. handle_out(puberr, _ReasonCode, Channel) ->
  504. {ok, Channel};
  505. handle_out(puback, {PacketId, ReasonCode}, Channel) ->
  506. {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
  507. handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
  508. {ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel};
  509. handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
  510. {ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel};
  511. handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
  512. {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
  513. handle_out(suback, {PacketId, ReasonCodes},
  514. Channel = #channel{proto_ver = ?MQTT_PROTO_V5}) ->
  515. %% TODO: ACL Deny
  516. {ok, ?SUBACK_PACKET(PacketId, ReasonCodes), Channel};
  517. handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
  518. %% TODO: ACL Deny
  519. ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
  520. {ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel};
  521. handle_out(unsuback, {PacketId, ReasonCodes},
  522. Channel = #channel{proto_ver = ?MQTT_PROTO_V5}) ->
  523. {ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel};
  524. %% Ignore reason codes if not MQTT5
  525. handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
  526. {ok, ?UNSUBACK_PACKET(PacketId), Channel};
  527. handle_out(disconnect, ReasonCode, Channel = #channel{proto_ver = ?MQTT_PROTO_V5}) ->
  528. Reason = emqx_reason_codes:name(ReasonCode),
  529. {stop, {shutdown, Reason}, ?DISCONNECT_PACKET(ReasonCode), Channel};
  530. handle_out(disconnect, ReasonCode, Channel = #channel{proto_ver = ProtoVer}) ->
  531. {stop, {shutdown, emqx_reason_codes:name(ReasonCode, ProtoVer)}, Channel};
  532. handle_out(Type, Data, Channel) ->
  533. ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
  534. {ok, Channel}.
  535. %%--------------------------------------------------------------------
  536. %% Handle call
  537. %%--------------------------------------------------------------------
  538. handle_call(Req, Channel) ->
  539. ?LOG(error, "Unexpected call: Req", [Req]),
  540. {ok, ignored, Channel}.
  541. %%--------------------------------------------------------------------
  542. %% Handle cast
  543. %%--------------------------------------------------------------------
  544. handle_cast(discard, Channel) ->
  545. {stop, {shutdown, discarded}, Channel};
  546. handle_cast(Msg, Channel) ->
  547. ?LOG(error, "Unexpected cast: ~p", [Msg]),
  548. {ok, Channel}.
  549. %%--------------------------------------------------------------------
  550. %% Handle Info
  551. %%--------------------------------------------------------------------
  552. -spec(handle_info(Info :: term(), channel())
  553. -> {ok, channel()} | {stop, Reason :: term(), channel()}).
  554. handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) ->
  555. TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
  556. [Client, #{'Internal' => true}],
  557. parse(subscribe, TopicFilters)),
  558. {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
  559. {ok, NChannel};
  560. handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
  561. TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
  562. [Client, #{'Internal' => true}],
  563. parse(unsubscribe, TopicFilters)),
  564. {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
  565. {ok, NChannel};
  566. handle_info(Info, Channel) ->
  567. ?LOG(error, "Unexpected info: ~p~n", [Info]),
  568. {ok, Channel}.
  569. %%--------------------------------------------------------------------
  570. %% Handle timeout
  571. %%--------------------------------------------------------------------
  572. -spec(timeout(reference(), Msg :: term(), channel())
  573. -> {ok, channel()}
  574. | {ok, Result :: term(), channel()}
  575. | {stop, Reason :: term(), channel()}).
  576. timeout(TRef, {emit_stats, Stats}, Channel = #channel{stats_timer = TRef}) ->
  577. ClientId = info(client_id, Channel),
  578. ok = emqx_cm:set_chan_stats(ClientId, Stats),
  579. {ok, Channel#channel{stats_timer = undefined}};
  580. timeout(TRef, retry_deliver, Channel = #channel{%%session = Session,
  581. retry_timer = TRef}) ->
  582. %% case emqx_session:retry(Session) of
  583. %% TODO: ...
  584. {ok, Channel#channel{retry_timer = undefined}};
  585. timeout(_TRef, Msg, Channel) ->
  586. ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
  587. {ok, Channel}.
  588. %%--------------------------------------------------------------------
  589. %% Ensure timers
  590. %%--------------------------------------------------------------------
  591. ensure_timer(emit_stats, Channel = #channel{stats_timer = undefined,
  592. idle_timeout = IdleTimeout
  593. }) ->
  594. Channel#channel{stats_timer = start_timer(IdleTimeout, emit_stats)};
  595. ensure_timer(retry, Channel = #channel{session = Session,
  596. retry_timer = undefined}) ->
  597. Interval = emqx_session:info(retry_interval, Session),
  598. TRef = emqx_misc:start_timer(Interval, retry_deliver),
  599. Channel#channel{retry_timer = TRef};
  600. %% disabled or timer existed
  601. ensure_timer(_Name, Channel) ->
  602. Channel.
  603. %%--------------------------------------------------------------------
  604. %% Terminate
  605. %%--------------------------------------------------------------------
  606. terminate(normal, #channel{client = Client}) ->
  607. ok = emqx_hooks:run('client.disconnected', [Client, normal]);
  608. terminate(Reason, #channel{client = Client, will_msg = WillMsg}) ->
  609. ok = emqx_hooks:run('client.disconnected', [Client, Reason]),
  610. publish_will_msg(WillMsg).
  611. %%TODO: Improve will msg:)
  612. publish_will_msg(undefined) ->
  613. ok;
  614. publish_will_msg(Msg) ->
  615. emqx_broker:publish(Msg).
  616. %%--------------------------------------------------------------------
  617. %% GC the channel.
  618. %%--------------------------------------------------------------------
  619. gc(_Cnt, _Oct, Channel = #channel{gc_state = undefined}) ->
  620. Channel;
  621. gc(Cnt, Oct, Channel = #channel{gc_state = GCSt}) ->
  622. {Ok, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt),
  623. Ok andalso emqx_metrics:inc('channel.gc.cnt'),
  624. Channel#channel{gc_state = GCSt1}.
  625. %%--------------------------------------------------------------------
  626. %% Validate incoming packet
  627. %%--------------------------------------------------------------------
  628. -spec(validate_in(emqx_types:packet(), channel())
  629. -> ok | {error, emqx_types:reason_code()}).
  630. validate_in(Packet, _Channel) ->
  631. try emqx_packet:validate(Packet) of
  632. true -> ok
  633. catch
  634. error:protocol_error ->
  635. {error, ?RC_PROTOCOL_ERROR};
  636. error:subscription_identifier_invalid ->
  637. {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED};
  638. error:topic_alias_invalid ->
  639. {error, ?RC_TOPIC_ALIAS_INVALID};
  640. error:topic_filters_invalid ->
  641. {error, ?RC_TOPIC_FILTER_INVALID};
  642. error:topic_name_invalid ->
  643. {error, ?RC_TOPIC_FILTER_INVALID};
  644. error:_Reason ->
  645. {error, ?RC_MALFORMED_PACKET}
  646. end.
  647. %%--------------------------------------------------------------------
  648. %% Preprocess properties
  649. %%--------------------------------------------------------------------
  650. process_props(#mqtt_packet_connect{
  651. properties = #{'Topic-Alias-Maximum' := Max}
  652. },
  653. Channel = #channel{alias_maximum = AliasMaximum}) ->
  654. NAliasMaximum = if AliasMaximum == undefined ->
  655. #{outbound => Max};
  656. true -> AliasMaximum#{outbound => Max}
  657. end,
  658. {ok, Channel#channel{alias_maximum = NAliasMaximum}};
  659. process_props(Packet, Channel) ->
  660. {ok, Packet, Channel}.
  661. %%--------------------------------------------------------------------
  662. %% Check connect packet
  663. %%--------------------------------------------------------------------
  664. check_connect(ConnPkt, Channel) ->
  665. pipeline([fun check_proto_ver/2,
  666. fun check_client_id/2,
  667. %%fun check_flapping/2,
  668. fun check_banned/2,
  669. fun check_will_topic/2,
  670. fun check_will_retain/2], ConnPkt, Channel).
  671. check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
  672. proto_name = Name}, _Channel) ->
  673. case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
  674. true -> ok;
  675. false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
  676. end.
  677. %% MQTT3.1 does not allow null clientId
  678. check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
  679. client_id = <<>>
  680. }, _Channel) ->
  681. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
  682. %% Issue#599: Null clientId and clean_start = false
  683. check_client_id(#mqtt_packet_connect{client_id = <<>>,
  684. clean_start = false}, _Channel) ->
  685. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
  686. check_client_id(#mqtt_packet_connect{client_id = <<>>,
  687. clean_start = true}, _Channel) ->
  688. ok;
  689. check_client_id(#mqtt_packet_connect{client_id = ClientId},
  690. #channel{client = #{zone := Zone}}) ->
  691. Len = byte_size(ClientId),
  692. MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
  693. case (1 =< Len) andalso (Len =< MaxLen) of
  694. true -> ok;
  695. false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
  696. end.
  697. %%TODO: check banned...
  698. check_banned(#mqtt_packet_connect{client_id = ClientId,
  699. username = Username},
  700. #channel{client = Client = #{zone := Zone}}) ->
  701. case emqx_zone:get_env(Zone, enable_ban, false) of
  702. true ->
  703. case emqx_banned:check(Client#{client_id => ClientId,
  704. username => Username}) of
  705. true -> {error, ?RC_BANNED};
  706. false -> ok
  707. end;
  708. false -> ok
  709. end.
  710. check_will_topic(#mqtt_packet_connect{will_flag = false}, _Channel) ->
  711. ok;
  712. check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _Channel) ->
  713. try emqx_topic:validate(WillTopic) of
  714. true -> ok
  715. catch error:_Error ->
  716. {error, ?RC_TOPIC_NAME_INVALID}
  717. end.
  718. check_will_retain(#mqtt_packet_connect{will_retain = false}, _Channel) ->
  719. ok;
  720. check_will_retain(#mqtt_packet_connect{will_retain = true},
  721. #channel{client = #{zone := Zone}}) ->
  722. case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
  723. true -> ok;
  724. false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
  725. end.
  726. %%--------------------------------------------------------------------
  727. %% Enrich client
  728. %%--------------------------------------------------------------------
  729. enrich_client(ConnPkt, Channel) ->
  730. case pipeline([fun set_username/2,
  731. fun maybe_use_username_as_clientid/2,
  732. fun maybe_assign_clientid/2,
  733. fun set_rest_client_fields/2], ConnPkt, Channel) of
  734. {ok, NConnPkt, NChannel} -> {ok, NConnPkt, NChannel};
  735. Error -> Error
  736. end.
  737. maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = #{username := undefined}}) ->
  738. {ok, Channel};
  739. maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = Client = #{zone := Zone,
  740. username := Username}}) ->
  741. NClient =
  742. case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
  743. true -> Client#{client_id => Username};
  744. false -> Client
  745. end,
  746. {ok, Channel#channel{client = NClient}}.
  747. maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>},
  748. Channel = #channel{client = Client,
  749. ack_props = AckProps}) ->
  750. ClientId = emqx_guid:to_base62(emqx_guid:gen()),
  751. AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
  752. {ok, Channel#channel{client = Client#{client_id => ClientId}, ack_props = AckProps1}};
  753. maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId},
  754. Channel = #channel{client = Client}) ->
  755. {ok, Channel#channel{client = Client#{client_id => ClientId}}}.
  756. %% Username maybe not undefined if peer_cert_as_username
  757. set_username(#mqtt_packet_connect{username = Username},
  758. Channel = #channel{client = Client = #{username := undefined}}) ->
  759. {ok, Channel#channel{client = Client#{username => Username}}};
  760. set_username(_ConnPkt, Channel) ->
  761. {ok, Channel}.
  762. set_rest_client_fields(#mqtt_packet_connect{is_bridge = IsBridge},
  763. Channel = #channel{client = Client}) ->
  764. {ok, Channel#channel{client = Client#{is_bridge => IsBridge}}}.
  765. %%--------------------------------------------------------------------
  766. %% Auth Connect
  767. %%--------------------------------------------------------------------
  768. auth_connect(#mqtt_packet_connect{client_id = ClientId,
  769. username = Username,
  770. password = Password},
  771. Channel = #channel{client = Client}) ->
  772. case authenticate(Client#{password => Password}) of
  773. {ok, AuthResult} ->
  774. {ok, Channel#channel{client = maps:merge(Client, AuthResult)}};
  775. {error, Reason} ->
  776. ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
  777. [ClientId, Username, Reason]),
  778. {error, emqx_reason_codes:connack_error(Reason)}
  779. end.
  780. %%--------------------------------------------------------------------
  781. %% Open session
  782. %%--------------------------------------------------------------------
  783. open_session(#mqtt_packet_connect{clean_start = CleanStart,
  784. properties = ConnProps},
  785. #channel{client = Client = #{zone := Zone}}) ->
  786. MaxInflight = get_property('Receive-Maximum', ConnProps,
  787. emqx_zone:get_env(Zone, max_inflight, 65535)),
  788. Interval = get_property('Session-Expiry-Interval', ConnProps,
  789. emqx_zone:get_env(Zone, session_expiry_interval, 0)),
  790. emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight,
  791. expiry_interval => Interval
  792. }).
  793. %%--------------------------------------------------------------------
  794. %% Process publish message: Client -> Broker
  795. %%--------------------------------------------------------------------
  796. process_alias(Packet = #mqtt_packet{
  797. variable = #mqtt_packet_publish{topic_name = <<>>,
  798. properties = #{'Topic-Alias' := AliasId}
  799. } = Publish
  800. }, Channel = #channel{topic_aliases = Aliases}) ->
  801. case find_alias(AliasId, Aliases) of
  802. {ok, Topic} ->
  803. {ok, Packet#mqtt_packet{
  804. variable = Publish#mqtt_packet_publish{
  805. topic_name = Topic}}, Channel};
  806. false -> {error, ?RC_PROTOCOL_ERROR}
  807. end;
  808. process_alias(#mqtt_packet{
  809. variable = #mqtt_packet_publish{topic_name = Topic,
  810. properties = #{'Topic-Alias' := AliasId}
  811. }
  812. }, Channel = #channel{topic_aliases = Aliases}) ->
  813. {ok, Channel#channel{topic_aliases = save_alias(AliasId, Topic, Aliases)}};
  814. process_alias(_Packet, Channel) ->
  815. {ok, Channel}.
  816. find_alias(_AliasId, undefined) ->
  817. false;
  818. find_alias(AliasId, Aliases) ->
  819. maps:find(AliasId, Aliases).
  820. save_alias(AliasId, Topic, undefined) ->
  821. #{AliasId => Topic};
  822. save_alias(AliasId, Topic, Aliases) ->
  823. maps:put(AliasId, Topic, Aliases).
  824. %% Check Publish
  825. check_publish(Packet, Channel) ->
  826. pipeline([fun check_pub_acl/2,
  827. fun check_pub_alias/2,
  828. fun check_pub_caps/2], Packet, Channel).
  829. %% Check Pub ACL
  830. check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
  831. #channel{client = Client}) ->
  832. case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of
  833. false -> ok;
  834. allow -> ok;
  835. deny -> {error, ?RC_NOT_AUTHORIZED}
  836. end.
  837. %% Check Pub Alias
  838. check_pub_alias(#mqtt_packet{
  839. variable = #mqtt_packet_publish{
  840. properties = #{'Topic-Alias' := AliasId}
  841. }
  842. },
  843. #channel{alias_maximum = Limits}) ->
  844. case (Limits == undefined)
  845. orelse (Max = maps:get(inbound, Limits, 0)) == 0
  846. orelse (AliasId > Max) of
  847. false -> ok;
  848. true -> {error, ?RC_TOPIC_ALIAS_INVALID}
  849. end;
  850. check_pub_alias(_Packet, _Channel) -> ok.
  851. %% Check Pub Caps
  852. check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
  853. retain = Retain
  854. }
  855. },
  856. #channel{client = #{zone := Zone}}) ->
  857. emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
  858. %% Check Sub
  859. check_subscribe(TopicFilter, SubOpts, Channel) ->
  860. case check_sub_acl(TopicFilter, Channel) of
  861. allow -> check_sub_caps(TopicFilter, SubOpts, Channel);
  862. deny -> {error, ?RC_NOT_AUTHORIZED}
  863. end.
  864. %% Check Sub ACL
  865. check_sub_acl(TopicFilter, #channel{client = Client}) ->
  866. case is_acl_enabled(Client) andalso
  867. check_acl(Client, subscribe, TopicFilter) of
  868. false -> allow;
  869. Result -> Result
  870. end.
  871. %% Check Sub Caps
  872. check_sub_caps(TopicFilter, SubOpts, #channel{client = #{zone := Zone}}) ->
  873. emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
  874. enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
  875. [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
  876. enrich_subid(_Properties, TopicFilters) ->
  877. TopicFilters.
  878. enrich_subopts(SubOpts, #channel{proto_ver = ?MQTT_PROTO_V5}) ->
  879. SubOpts;
  880. enrich_subopts(SubOpts, #channel{client = #{zone := Zone, is_bridge := IsBridge}}) ->
  881. Rap = flag(IsBridge),
  882. Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)),
  883. SubOpts#{rap => Rap, nl => Nl}.
  884. %%--------------------------------------------------------------------
  885. %% Is ACL enabled?
  886. %%--------------------------------------------------------------------
  887. is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
  888. (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true).
  889. %%--------------------------------------------------------------------
  890. %% Parse Topic Filters
  891. %%--------------------------------------------------------------------
  892. parse(subscribe, TopicFilters) ->
  893. [emqx_topic:parse(TopicFilter, SubOpts) || {TopicFilter, SubOpts} <- TopicFilters];
  894. parse(unsubscribe, TopicFilters) ->
  895. lists:map(fun emqx_topic:parse/1, TopicFilters).
  896. %%--------------------------------------------------------------------
  897. %% Mount/Unmount
  898. %%--------------------------------------------------------------------
  899. mount(Client = #{mountpoint := MountPoint}, TopicOrMsg) ->
  900. emqx_mountpoint:mount(
  901. emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg).
  902. unmount(Client = #{mountpoint := MountPoint}, TopicOrMsg) ->
  903. emqx_mountpoint:unmount(
  904. emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg).
  905. %%--------------------------------------------------------------------
  906. %% Pipeline
  907. %%--------------------------------------------------------------------
  908. pipeline([], Packet, Channel) ->
  909. {ok, Packet, Channel};
  910. pipeline([Fun|More], Packet, Channel) ->
  911. case Fun(Packet, Channel) of
  912. ok -> pipeline(More, Packet, Channel);
  913. {ok, NChannel} ->
  914. pipeline(More, Packet, NChannel);
  915. {ok, NPacket, NChannel} ->
  916. pipeline(More, NPacket, NChannel);
  917. {error, ReasonCode} ->
  918. {error, ReasonCode, Channel};
  919. {error, ReasonCode, NChannel} ->
  920. {error, ReasonCode, NChannel}
  921. end.
  922. %%--------------------------------------------------------------------
  923. %% Helper functions
  924. %%--------------------------------------------------------------------
  925. set_property(Name, Value, ?NO_PROPS) ->
  926. #{Name => Value};
  927. set_property(Name, Value, Props) ->
  928. Props#{Name => Value}.
  929. get_property(_Name, undefined, Default) ->
  930. Default;
  931. get_property(Name, Props, Default) ->
  932. maps:get(Name, Props, Default).
  933. sp(true) -> 1;
  934. sp(false) -> 0.
  935. flag(true) -> 1;
  936. flag(false) -> 0.