emqx_protocol.erl 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015
  1. %% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  2. %%
  3. %% Licensed under the Apache License, Version 2.0 (the "License");
  4. %% you may not use this file except in compliance with the License.
  5. %% You may obtain a copy of the License at
  6. %%
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. -module(emqx_protocol).
  15. -include("emqx.hrl").
  16. -include("emqx_mqtt.hrl").
  17. -include("logger.hrl").
  18. -export([ init/2
  19. , info/1
  20. , attrs/1
  21. , attr/2
  22. , caps/1
  23. , stats/1
  24. , client_id/1
  25. , credentials/1
  26. , parser/1
  27. , session/1
  28. , received/2
  29. , process/2
  30. , deliver/2
  31. , send/2
  32. , terminate/2
  33. ]).
  34. -export_type([state/0]).
  35. -record(pstate, {
  36. zone,
  37. sendfun,
  38. peername,
  39. peercert,
  40. proto_ver,
  41. proto_name,
  42. client_id,
  43. is_assigned,
  44. conn_pid,
  45. conn_props,
  46. ack_props,
  47. username,
  48. session,
  49. clean_start,
  50. topic_aliases,
  51. packet_size,
  52. will_topic,
  53. will_msg,
  54. keepalive,
  55. is_bridge,
  56. enable_ban,
  57. enable_acl,
  58. enable_flapping_detect,
  59. acl_deny_action,
  60. recv_stats,
  61. send_stats,
  62. connected,
  63. connected_at,
  64. ignore_loop,
  65. topic_alias_maximum,
  66. conn_mod,
  67. credentials,
  68. ws_cookie
  69. }).
  70. -opaque(state() :: #pstate{}).
  71. -ifdef(TEST).
  72. -compile(export_all).
  73. -compile(nowarn_export_all).
  74. -endif.
  75. -define(NO_PROPS, undefined).
  76. %%------------------------------------------------------------------------------
  77. %% Init
  78. %%------------------------------------------------------------------------------
  79. -spec(init(map(), list()) -> state()).
  80. init(SocketOpts = #{ peername := Peername
  81. , peercert := Peercert
  82. , sendfun := SendFun}, Options) ->
  83. Zone = proplists:get_value(zone, Options),
  84. #pstate{zone = Zone,
  85. sendfun = SendFun,
  86. peername = Peername,
  87. peercert = Peercert,
  88. proto_ver = ?MQTT_PROTO_V4,
  89. proto_name = <<"MQTT">>,
  90. client_id = <<>>,
  91. is_assigned = false,
  92. conn_pid = self(),
  93. username = init_username(Peercert, Options),
  94. clean_start = false,
  95. topic_aliases = #{},
  96. packet_size = emqx_zone:get_env(Zone, max_packet_size),
  97. is_bridge = false,
  98. enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
  99. enable_acl = emqx_zone:get_env(Zone, enable_acl),
  100. enable_flapping_detect = emqx_zone:get_env(Zone, enable_flapping_detect, false),
  101. acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore),
  102. recv_stats = #{msg => 0, pkt => 0},
  103. send_stats = #{msg => 0, pkt => 0},
  104. connected = false,
  105. ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
  106. topic_alias_maximum = #{to_client => 0, from_client => 0},
  107. conn_mod = maps:get(conn_mod, SocketOpts, undefined),
  108. credentials = #{},
  109. ws_cookie = maps:get(ws_cookie, SocketOpts, undefined)}.
  110. init_username(Peercert, Options) ->
  111. case proplists:get_value(peer_cert_as_username, Options) of
  112. cn -> esockd_peercert:common_name(Peercert);
  113. dn -> esockd_peercert:subject(Peercert);
  114. crt -> Peercert;
  115. _ -> undefined
  116. end.
  117. set_username(Username, PState = #pstate{username = undefined}) ->
  118. PState#pstate{username = Username};
  119. set_username(_Username, PState) ->
  120. PState.
  121. %%------------------------------------------------------------------------------
  122. %% API
  123. %%------------------------------------------------------------------------------
  124. info(PState = #pstate{conn_props = ConnProps,
  125. ack_props = AckProps,
  126. session = Session,
  127. topic_aliases = Aliases,
  128. will_msg = WillMsg,
  129. enable_acl = EnableAcl}) ->
  130. maps:merge(attrs(PState), #{conn_props => ConnProps,
  131. ack_props => AckProps,
  132. session => Session,
  133. topic_aliases => Aliases,
  134. will_msg => WillMsg,
  135. enable_acl => EnableAcl
  136. }).
  137. attrs(#pstate{zone = Zone,
  138. client_id = ClientId,
  139. username = Username,
  140. peername = Peername,
  141. peercert = Peercert,
  142. clean_start = CleanStart,
  143. proto_ver = ProtoVer,
  144. proto_name = ProtoName,
  145. keepalive = Keepalive,
  146. is_bridge = IsBridge,
  147. connected_at = ConnectedAt,
  148. conn_mod = ConnMod,
  149. credentials = Credentials}) ->
  150. #{ zone => Zone
  151. , client_id => ClientId
  152. , username => Username
  153. , peername => Peername
  154. , peercert => Peercert
  155. , proto_ver => ProtoVer
  156. , proto_name => ProtoName
  157. , clean_start => CleanStart
  158. , keepalive => Keepalive
  159. , is_bridge => IsBridge
  160. , connected_at => ConnectedAt
  161. , conn_mod => ConnMod
  162. , credentials => Credentials
  163. }.
  164. attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
  165. get_property('Receive-Maximum', ConnProps, 65535);
  166. attr(max_inflight, #pstate{zone = Zone}) ->
  167. emqx_zone:get_env(Zone, max_inflight, 65535);
  168. attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
  169. get_property('Session-Expiry-Interval', ConnProps, 0);
  170. attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) ->
  171. case CleanStart of
  172. true -> 0;
  173. false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
  174. end;
  175. attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
  176. get_property('Topic-Alias-Maximum', ConnProps, 0);
  177. attr(topic_alias_maximum, #pstate{zone = Zone}) ->
  178. emqx_zone:get_env(Zone, max_topic_alias, 0);
  179. attr(Name, PState) ->
  180. Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))),
  181. case lists:keyfind(Name, 1, Attrs) of
  182. {_, Value} -> Value;
  183. false -> undefined
  184. end.
  185. caps(#pstate{zone = Zone}) ->
  186. emqx_mqtt_caps:get_caps(Zone).
  187. client_id(#pstate{client_id = ClientId}) ->
  188. ClientId.
  189. credentials(#pstate{zone = Zone,
  190. client_id = ClientId,
  191. username = Username,
  192. peername = Peername,
  193. peercert = Peercert,
  194. ws_cookie = WsCookie}) ->
  195. with_cert(#{zone => Zone,
  196. client_id => ClientId,
  197. username => Username,
  198. peername => Peername,
  199. ws_cookie => WsCookie,
  200. mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
  201. with_cert(Credentials, undefined) -> Credentials;
  202. with_cert(Credentials, Peercert) ->
  203. Credentials#{dn => esockd_peercert:subject(Peercert),
  204. cn => esockd_peercert:common_name(Peercert)}.
  205. keepsafety(Credentials) ->
  206. maps:filter(fun(password, _) -> false;
  207. (dn, _) -> false;
  208. (cn, _) -> false;
  209. (_, _) -> true end, Credentials).
  210. stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
  211. send_stats = #{pkt := SendPkt, msg := SendMsg}}) ->
  212. [{recv_pkt, RecvPkt},
  213. {recv_msg, RecvMsg},
  214. {send_pkt, SendPkt},
  215. {send_msg, SendMsg}].
  216. session(#pstate{session = SPid}) ->
  217. SPid.
  218. parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
  219. emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}).
  220. %%------------------------------------------------------------------------------
  221. %% Packet Received
  222. %%------------------------------------------------------------------------------
  223. set_protover(?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = ProtoVer}), PState) ->
  224. PState#pstate{proto_ver = ProtoVer};
  225. set_protover(_Packet, PState) ->
  226. PState.
  227. -spec(received(emqx_mqtt_types:packet(), state())
  228. -> {ok, state()}
  229. | {error, term()}
  230. | {error, term(), state()}
  231. | {stop, term(), state()}).
  232. received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
  233. {error, proto_not_connected, PState};
  234. received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
  235. {error, proto_unexpected_connect, PState};
  236. received(Packet = ?PACKET(Type), PState) ->
  237. trace(recv, Packet),
  238. PState1 = set_protover(Packet, PState),
  239. try emqx_packet:validate(Packet) of
  240. true ->
  241. case preprocess_properties(Packet, PState1) of
  242. {ok, Packet1, PState2} ->
  243. process(Packet1, inc_stats(recv, Type, PState2));
  244. {error, ReasonCode} ->
  245. {error, ReasonCode, PState1}
  246. end
  247. catch
  248. error:protocol_error ->
  249. deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1),
  250. {error, protocol_error, PState};
  251. error:subscription_identifier_invalid ->
  252. deliver({disconnect, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}, PState1),
  253. {error, subscription_identifier_invalid, PState1};
  254. error:topic_alias_invalid ->
  255. deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState1),
  256. {error, topic_alias_invalid, PState1};
  257. error:topic_filters_invalid ->
  258. deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1),
  259. {error, topic_filters_invalid, PState1};
  260. error:topic_name_invalid ->
  261. deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1),
  262. {error, topic_filters_invalid, PState1};
  263. error:Reason ->
  264. deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1),
  265. {error, Reason, PState1}
  266. end.
  267. %%------------------------------------------------------------------------------
  268. %% Preprocess MQTT Properties
  269. %%------------------------------------------------------------------------------
  270. preprocess_properties(Packet = #mqtt_packet{
  271. variable = #mqtt_packet_connect{
  272. properties = #{'Topic-Alias-Maximum' := ToClient}
  273. }
  274. },
  275. PState = #pstate{topic_alias_maximum = TopicAliasMaximum}) ->
  276. {ok, Packet, PState#pstate{topic_alias_maximum = TopicAliasMaximum#{to_client => ToClient}}};
  277. %% Subscription Identifier
  278. preprocess_properties(Packet = #mqtt_packet{
  279. variable = Subscribe = #mqtt_packet_subscribe{
  280. properties = #{'Subscription-Identifier' := SubId},
  281. topic_filters = TopicFilters
  282. }
  283. },
  284. PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) ->
  285. TopicFilters1 = [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters],
  286. {ok, Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState};
  287. %% Topic Alias Mapping
  288. preprocess_properties(#mqtt_packet{
  289. variable = #mqtt_packet_publish{
  290. properties = #{'Topic-Alias' := 0}}
  291. },
  292. PState) ->
  293. deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
  294. {error, ?RC_TOPIC_ALIAS_INVALID};
  295. preprocess_properties(Packet = #mqtt_packet{
  296. variable = Publish = #mqtt_packet_publish{
  297. topic_name = <<>>,
  298. properties = #{'Topic-Alias' := AliasId}}
  299. },
  300. PState = #pstate{proto_ver = ?MQTT_PROTO_V5,
  301. topic_aliases = Aliases,
  302. topic_alias_maximum = #{from_client := TopicAliasMaximum}}) ->
  303. case AliasId =< TopicAliasMaximum of
  304. true ->
  305. {ok, Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
  306. topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
  307. false ->
  308. deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
  309. {error, ?RC_TOPIC_ALIAS_INVALID}
  310. end;
  311. preprocess_properties(Packet = #mqtt_packet{
  312. variable = #mqtt_packet_publish{
  313. topic_name = Topic,
  314. properties = #{'Topic-Alias' := AliasId}}
  315. },
  316. PState = #pstate{proto_ver = ?MQTT_PROTO_V5,
  317. topic_aliases = Aliases,
  318. topic_alias_maximum = #{from_client := TopicAliasMaximum}}) ->
  319. case AliasId =< TopicAliasMaximum of
  320. true ->
  321. {ok, Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
  322. false ->
  323. deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
  324. {error, ?RC_TOPIC_ALIAS_INVALID}
  325. end;
  326. preprocess_properties(Packet, PState) ->
  327. {ok, Packet, PState}.
  328. %%------------------------------------------------------------------------------
  329. %% Process MQTT Packet
  330. %%------------------------------------------------------------------------------
  331. process(?CONNECT_PACKET(
  332. #mqtt_packet_connect{proto_name = ProtoName,
  333. proto_ver = ProtoVer,
  334. is_bridge = IsBridge,
  335. clean_start = CleanStart,
  336. keepalive = Keepalive,
  337. properties = ConnProps,
  338. client_id = ClientId,
  339. username = Username,
  340. password = Password} = ConnPkt), PState) ->
  341. NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState),
  342. emqx_logger:set_metadata_client_id(NewClientId),
  343. %% TODO: Mountpoint...
  344. %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
  345. PState0 = set_username(Username,
  346. PState#pstate{client_id = NewClientId,
  347. proto_ver = ProtoVer,
  348. proto_name = ProtoName,
  349. clean_start = CleanStart,
  350. keepalive = Keepalive,
  351. conn_props = ConnProps,
  352. is_bridge = IsBridge,
  353. connected_at = os:timestamp()}),
  354. Credentials = credentials(PState0),
  355. PState1 = PState0#pstate{credentials = Credentials},
  356. connack(
  357. case check_connect(ConnPkt, PState1) of
  358. ok ->
  359. case emqx_access_control:authenticate(Credentials#{password => Password}) of
  360. {ok, Credentials0} ->
  361. PState3 = maybe_assign_client_id(PState1),
  362. emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
  363. %% Open session
  364. SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
  365. case try_open_session(SessAttrs, PState3) of
  366. {ok, SPid, SP} ->
  367. PState4 = PState3#pstate{session = SPid, connected = true,
  368. credentials = keepsafety(Credentials0)},
  369. ok = emqx_cm:register_connection(client_id(PState4)),
  370. true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
  371. %% Start keepalive
  372. start_keepalive(Keepalive, PState4),
  373. %% Success
  374. {?RC_SUCCESS, SP, PState4};
  375. {error, Error} ->
  376. ?LOG(error, "[Protocol] Failed to open session: ~p", [Error]),
  377. {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}}
  378. end;
  379. {error, Reason} ->
  380. ?LOG(warning, "[Protocol] Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
  381. {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}}
  382. end;
  383. {error, ReasonCode} ->
  384. {ReasonCode, PState1}
  385. end);
  386. process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
  387. case check_publish(Packet, PState) of
  388. ok ->
  389. do_publish(Packet, PState);
  390. {error, ReasonCode} ->
  391. ?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
  392. [Topic, emqx_reason_codes:text(ReasonCode)]),
  393. do_acl_deny_action(Packet, ReasonCode, PState)
  394. end;
  395. process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
  396. case check_publish(Packet, PState) of
  397. ok ->
  398. do_publish(Packet, PState);
  399. {error, ReasonCode} ->
  400. ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s",
  401. [Topic, emqx_reason_codes:text(ReasonCode)]),
  402. case deliver({puback, PacketId, ReasonCode}, PState) of
  403. {ok, PState1} ->
  404. do_acl_deny_action(Packet, ReasonCode, PState1);
  405. Error -> Error
  406. end
  407. end;
  408. process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
  409. case check_publish(Packet, PState) of
  410. ok ->
  411. do_publish(Packet, PState);
  412. {error, ReasonCode} ->
  413. ?LOG(warning, "[Protocol] Cannot publish qos2 message to ~s for ~s",
  414. [Topic, emqx_reason_codes:text(ReasonCode)]),
  415. case deliver({pubrec, PacketId, ReasonCode}, PState) of
  416. {ok, PState1} ->
  417. do_acl_deny_action(Packet, ReasonCode, PState1);
  418. Error -> Error
  419. end
  420. end;
  421. process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
  422. {ok = emqx_session:puback(SPid, PacketId, ReasonCode), PState};
  423. process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
  424. case emqx_session:pubrec(SPid, PacketId, ReasonCode) of
  425. ok ->
  426. send(?PUBREL_PACKET(PacketId), PState);
  427. {error, NotFound} ->
  428. send(?PUBREL_PACKET(PacketId, NotFound), PState)
  429. end;
  430. process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
  431. case emqx_session:pubrel(SPid, PacketId, ReasonCode) of
  432. ok ->
  433. send(?PUBCOMP_PACKET(PacketId), PState);
  434. {error, NotFound} ->
  435. send(?PUBCOMP_PACKET(PacketId, NotFound), PState)
  436. end;
  437. process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
  438. {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
  439. process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
  440. PState = #pstate{session = SPid, credentials = Credentials}) ->
  441. case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
  442. {ok, TopicFilters} ->
  443. TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
  444. TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0),
  445. ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1),
  446. {ok, PState};
  447. {error, TopicFilters} ->
  448. {SubTopics, ReasonCodes} =
  449. lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
  450. {[Topic|Topics], [?RC_IMPLEMENTATION_SPECIFIC_ERROR | Codes]};
  451. ({Topic, #{rc := Code}}, {Topics, Codes}) ->
  452. {[Topic|Topics], [Code|Codes]}
  453. end, {[], []}, TopicFilters),
  454. ?LOG(warning, "[Protocol] Cannot subscribe ~p for ~p",
  455. [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
  456. case deliver({suback, PacketId, ReasonCodes}, PState) of
  457. {ok, PState1} ->
  458. do_acl_deny_action(Packet, ReasonCodes, PState1);
  459. Error ->
  460. Error
  461. end
  462. end;
  463. process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
  464. PState = #pstate{session = SPid, credentials = Credentials}) ->
  465. TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials],
  466. parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
  467. ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
  468. emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)),
  469. {ok, PState};
  470. process(?PACKET(?PINGREQ), PState) ->
  471. send(?PACKET(?PINGRESP), PState);
  472. process(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}),
  473. PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) ->
  474. case Interval =/= 0 andalso OldInterval =:= 0 of
  475. true ->
  476. deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
  477. {error, protocol_error, PState#pstate{will_msg = undefined}};
  478. false ->
  479. emqx_session:update_expiry_interval(SPid, Interval),
  480. %% Clean willmsg
  481. {stop, normal, PState#pstate{will_msg = undefined}}
  482. end;
  483. process(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
  484. {stop, normal, PState#pstate{will_msg = undefined}};
  485. process(?DISCONNECT_PACKET(_), PState) ->
  486. {stop, {shutdown, abnormal_disconnet}, PState}.
  487. %%------------------------------------------------------------------------------
  488. %% ConnAck --> Client
  489. %%------------------------------------------------------------------------------
  490. connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) ->
  491. ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]),
  492. deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
  493. connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) ->
  494. ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]),
  495. [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
  496. _ = deliver({connack, ReasonCode1}, PState),
  497. {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
  498. %%------------------------------------------------------------------------------
  499. %% Publish Message -> Broker
  500. %%------------------------------------------------------------------------------
  501. do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
  502. PState = #pstate{session = SPid, credentials = Credentials}) ->
  503. Msg = emqx_mountpoint:mount(mountpoint(Credentials),
  504. emqx_packet:to_message(Credentials, Packet)),
  505. puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
  506. %%------------------------------------------------------------------------------
  507. %% Puback -> Client
  508. %%------------------------------------------------------------------------------
  509. puback(?QOS_0, _PacketId, _Result, PState) ->
  510. {ok, PState};
  511. puback(?QOS_1, PacketId, {ok, []}, PState) ->
  512. deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
  513. %%TODO: calc the deliver count?
  514. puback(?QOS_1, PacketId, {ok, _Result}, PState) ->
  515. deliver({puback, PacketId, ?RC_SUCCESS}, PState);
  516. puback(?QOS_1, PacketId, {error, ReasonCode}, PState) ->
  517. deliver({puback, PacketId, ReasonCode}, PState);
  518. puback(?QOS_2, PacketId, {ok, []}, PState) ->
  519. deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
  520. puback(?QOS_2, PacketId, {ok, _Result}, PState) ->
  521. deliver({pubrec, PacketId, ?RC_SUCCESS}, PState);
  522. puback(?QOS_2, PacketId, {error, ReasonCode}, PState) ->
  523. deliver({pubrec, PacketId, ReasonCode}, PState).
  524. %%------------------------------------------------------------------------------
  525. %% Deliver Packet -> Client
  526. %%------------------------------------------------------------------------------
  527. -spec(deliver(list(tuple()) | tuple(), state()) -> {ok, state()} | {error, term()}).
  528. deliver([], PState) ->
  529. {ok, PState};
  530. deliver([Pub|More], PState) ->
  531. case deliver(Pub, PState) of
  532. {ok, PState1} ->
  533. deliver(More, PState1);
  534. {error, _} = Error ->
  535. Error
  536. end;
  537. deliver({connack, ReasonCode}, PState) ->
  538. send(?CONNACK_PACKET(ReasonCode), PState);
  539. deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
  540. proto_ver = ?MQTT_PROTO_V5,
  541. client_id = ClientId,
  542. is_assigned = IsAssigned,
  543. topic_alias_maximum = TopicAliasMaximum}) ->
  544. #{max_packet_size := MaxPktSize,
  545. max_qos_allowed := MaxQoS,
  546. mqtt_retain_available := Retain,
  547. max_topic_alias := MaxAlias,
  548. mqtt_shared_subscription := Shared,
  549. mqtt_wildcard_subscription := Wildcard} = caps(PState),
  550. %% Response-Information is so far not set by broker.
  551. %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
  552. %% According to MQTT 5.0 spec:
  553. %% A common use of this is to pass a globally unique portion of the topic tree which
  554. %% is reserved for this Client for at least the lifetime of its Session.
  555. %% This often cannot just be a random name as both the requesting Client and the
  556. %% responding Client need to be authorized to use it.
  557. %% If we are to support it in the feature, the implementation should be flexible
  558. %% to allow prefixing the response topic based on different ACL config.
  559. %% e.g. prefix by username or client-id, so that unauthorized clients can not
  560. %% subscribe requests or responses that are not intended for them.
  561. Props = #{'Retain-Available' => flag(Retain),
  562. 'Maximum-Packet-Size' => MaxPktSize,
  563. 'Topic-Alias-Maximum' => MaxAlias,
  564. 'Wildcard-Subscription-Available' => flag(Wildcard),
  565. 'Subscription-Identifier-Available' => 1,
  566. %'Response-Information' =>
  567. 'Shared-Subscription-Available' => flag(Shared)},
  568. Props1 = if
  569. MaxQoS =:= ?QOS_2 ->
  570. Props;
  571. true ->
  572. maps:put('Maximum-QoS', MaxQoS, Props)
  573. end,
  574. Props2 = if IsAssigned ->
  575. Props1#{'Assigned-Client-Identifier' => ClientId};
  576. true -> Props1
  577. end,
  578. Props3 = case emqx_zone:get_env(Zone, server_keepalive) of
  579. undefined -> Props2;
  580. Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive}
  581. end,
  582. PState1 = PState#pstate{topic_alias_maximum = TopicAliasMaximum#{from_client => MaxAlias}},
  583. send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState1);
  584. deliver({connack, ReasonCode, SP}, PState) ->
  585. send(?CONNACK_PACKET(ReasonCode, SP), PState);
  586. deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) ->
  587. Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg),
  588. Msg1 = emqx_message:update_expiry(Msg0),
  589. Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1),
  590. send(emqx_packet:from_message(PacketId, Msg2), PState);
  591. deliver({puback, PacketId, ReasonCode}, PState) ->
  592. send(?PUBACK_PACKET(PacketId, ReasonCode), PState);
  593. deliver({pubrel, PacketId}, PState) ->
  594. send(?PUBREL_PACKET(PacketId), PState);
  595. deliver({pubrec, PacketId, ReasonCode}, PState) ->
  596. send(?PUBREC_PACKET(PacketId, ReasonCode), PState);
  597. deliver({suback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) ->
  598. send(?SUBACK_PACKET(PacketId, reason_codes_compat(suback, ReasonCodes, ProtoVer)), PState);
  599. deliver({unsuback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) ->
  600. send(?UNSUBACK_PACKET(PacketId, reason_codes_compat(unsuback, ReasonCodes, ProtoVer)), PState);
  601. %% Deliver a disconnect for mqtt 5.0
  602. deliver({disconnect, ReasonCode}, PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) ->
  603. send(?DISCONNECT_PACKET(ReasonCode), PState);
  604. deliver({disconnect, _ReasonCode}, PState) ->
  605. {ok, PState}.
  606. %%------------------------------------------------------------------------------
  607. %% Send Packet to Client
  608. -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
  609. send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) ->
  610. case Send(Packet, #{version => Ver}) of
  611. {ok, Data} ->
  612. trace(send, Packet),
  613. emqx_metrics:sent(Packet),
  614. emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
  615. {ok, inc_stats(send, Type, PState)};
  616. {error, Reason} ->
  617. {error, Reason}
  618. end.
  619. %%------------------------------------------------------------------------------
  620. %% Maybe use username replace client id
  621. maybe_use_username_as_clientid(ClientId, undefined, _PState) ->
  622. ClientId;
  623. maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
  624. case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
  625. true -> Username;
  626. false -> ClientId
  627. end.
  628. %%------------------------------------------------------------------------------
  629. %% Assign a clientId
  630. maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) ->
  631. ClientId = emqx_guid:to_base62(emqx_guid:gen()),
  632. AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
  633. PState#pstate{client_id = ClientId, is_assigned = true, ack_props = AckProps1};
  634. maybe_assign_client_id(PState) ->
  635. PState.
  636. try_open_session(SessAttrs, PState = #pstate{zone = Zone,
  637. client_id = ClientId,
  638. conn_pid = ConnPid,
  639. username = Username,
  640. clean_start = CleanStart}) ->
  641. case emqx_sm:open_session(
  642. maps:merge(#{zone => Zone,
  643. client_id => ClientId,
  644. conn_pid => ConnPid,
  645. username => Username,
  646. clean_start => CleanStart,
  647. max_inflight => attr(max_inflight, PState),
  648. expiry_interval => attr(expiry_interval, PState),
  649. topic_alias_maximum => attr(topic_alias_maximum, PState)},
  650. SessAttrs)) of
  651. {ok, SPid} ->
  652. {ok, SPid, false};
  653. Other -> Other
  654. end.
  655. set_property(Name, Value, ?NO_PROPS) ->
  656. #{Name => Value};
  657. set_property(Name, Value, Props) ->
  658. Props#{Name => Value}.
  659. get_property(_Name, undefined, Default) ->
  660. Default;
  661. get_property(Name, Props, Default) ->
  662. maps:get(Name, Props, Default).
  663. make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
  664. will_props = WillProps} = ConnPkt) ->
  665. emqx_packet:will_msg(
  666. case ProtoVer of
  667. ?MQTT_PROTO_V5 ->
  668. WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
  669. ConnPkt#mqtt_packet_connect{
  670. will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
  671. _ ->
  672. ConnPkt
  673. end).
  674. %%------------------------------------------------------------------------------
  675. %% Check Packet
  676. %%------------------------------------------------------------------------------
  677. check_connect(Packet, PState) ->
  678. run_check_steps([fun check_proto_ver/2,
  679. fun check_client_id/2,
  680. fun check_flapping/2,
  681. fun check_banned/2,
  682. fun check_will_topic/2], Packet, PState).
  683. check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
  684. proto_name = Name}, _PState) ->
  685. case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
  686. true -> ok;
  687. false -> {error, ?RC_PROTOCOL_ERROR}
  688. end.
  689. %% MQTT3.1 does not allow null clientId
  690. check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
  691. client_id = <<>>}, _PState) ->
  692. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
  693. %% Issue#599: Null clientId and clean_start = false
  694. check_client_id(#mqtt_packet_connect{client_id = <<>>,
  695. clean_start = false}, _PState) ->
  696. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
  697. check_client_id(#mqtt_packet_connect{client_id = <<>>,
  698. clean_start = true}, _PState) ->
  699. ok;
  700. check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}) ->
  701. Len = byte_size(ClientId),
  702. MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
  703. case (1 =< Len) andalso (Len =< MaxLen) of
  704. true -> ok;
  705. false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
  706. end.
  707. check_flapping(#mqtt_packet_connect{}, PState) ->
  708. do_flapping_detect(connect, PState).
  709. check_banned(_ConnPkt, #pstate{enable_ban = false}) ->
  710. ok;
  711. check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
  712. #pstate{peername = Peername}) ->
  713. case emqx_banned:check(#{client_id => ClientId,
  714. username => Username,
  715. peername => Peername}) of
  716. true -> {error, ?RC_BANNED};
  717. false -> ok
  718. end.
  719. check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) ->
  720. ok;
  721. check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) ->
  722. try emqx_topic:validate(WillTopic) of
  723. true -> check_will_acl(ConnPkt, PState)
  724. catch error : _Error ->
  725. {error, ?RC_TOPIC_NAME_INVALID}
  726. end.
  727. check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl ->
  728. ok;
  729. check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) ->
  730. case emqx_access_control:check_acl(Credentials, publish, WillTopic) of
  731. allow -> ok;
  732. deny ->
  733. ?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]),
  734. {error, ?RC_NOT_AUTHORIZED}
  735. end.
  736. check_publish(Packet, PState) ->
  737. run_check_steps([fun check_pub_caps/2,
  738. fun check_pub_acl/2], Packet, PState).
  739. check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain},
  740. variable = #mqtt_packet_publish{properties = _Properties}},
  741. #pstate{zone = Zone}) ->
  742. emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
  743. check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
  744. when IsSuper orelse (not EnableAcl) ->
  745. ok;
  746. check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) ->
  747. case emqx_access_control:check_acl(Credentials, publish, Topic) of
  748. allow -> ok;
  749. deny -> {error, ?RC_NOT_AUTHORIZED}
  750. end.
  751. run_check_steps([], _Packet, _PState) ->
  752. ok;
  753. run_check_steps([Check|Steps], Packet, PState) ->
  754. case Check(Packet, PState) of
  755. ok ->
  756. run_check_steps(Steps, Packet, PState);
  757. Error = {error, _RC} ->
  758. Error
  759. end.
  760. check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
  761. case emqx_mqtt_caps:check_sub(Zone, TopicFilters) of
  762. {ok, TopicFilter1} ->
  763. check_sub_acl(TopicFilter1, PState);
  764. {error, TopicFilter1} ->
  765. {error, TopicFilter1}
  766. end.
  767. check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
  768. when IsSuper orelse (not EnableAcl) ->
  769. {ok, TopicFilters};
  770. check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) ->
  771. lists:foldr(
  772. fun({Topic, SubOpts}, {Ok, Acc}) ->
  773. case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
  774. allow -> {Ok, [{Topic, SubOpts}|Acc]};
  775. deny ->
  776. {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
  777. end
  778. end, {ok, []}, TopicFilters).
  779. trace(recv, Packet) ->
  780. ?LOG(debug, "[Protocol] RECV ~s", [emqx_packet:format(Packet)]);
  781. trace(send, Packet) ->
  782. ?LOG(debug, "[Protocol] SEND ~s", [emqx_packet:format(Packet)]).
  783. inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) ->
  784. PState#pstate{recv_stats = inc_stats(Type, Stats)};
  785. inc_stats(send, Type, PState = #pstate{send_stats = Stats}) ->
  786. PState#pstate{send_stats = inc_stats(Type, Stats)}.
  787. inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
  788. Stats#{pkt := PktCnt + 1, msg := case Type =:= ?PUBLISH of
  789. true -> MsgCnt + 1;
  790. false -> MsgCnt
  791. end}.
  792. terminate(_Reason, #pstate{client_id = undefined}) ->
  793. ok;
  794. terminate(_Reason, PState = #pstate{connected = false}) ->
  795. do_flapping_detect(disconnect, PState),
  796. ok;
  797. terminate(Reason, PState) when Reason =:= conflict;
  798. Reason =:= discard ->
  799. do_flapping_detect(disconnect, PState),
  800. ok;
  801. terminate(Reason, PState = #pstate{credentials = Credentials}) ->
  802. do_flapping_detect(disconnect, PState),
  803. ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]),
  804. ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
  805. start_keepalive(0, _PState) ->
  806. ignore;
  807. start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
  808. Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
  809. self() ! {keepalive, start, round(Secs * Backoff)}.
  810. %%-----------------------------------------------------------------------------
  811. %% Parse topic filters
  812. %%-----------------------------------------------------------------------------
  813. parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
  814. [emqx_topic:parse(RawTopic, SubOpts) || {RawTopic, SubOpts} <- RawTopicFilters];
  815. parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
  816. lists:map(fun emqx_topic:parse/1, RawTopicFilters).
  817. sp(true) -> 1;
  818. sp(false) -> 0.
  819. flag(false) -> 0;
  820. flag(true) -> 1.
  821. %%------------------------------------------------------------------------------
  822. %% Execute actions in case acl deny
  823. do_flapping_detect(Action, #pstate{zone = Zone,
  824. client_id = ClientId,
  825. enable_flapping_detect = true}) ->
  826. BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
  827. Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
  828. Until = erlang:system_time(second) + BanExpiryInterval,
  829. case emqx_flapping:check(Action, ClientId, Threshold) of
  830. flapping ->
  831. emqx_banned:add(#banned{who = {client_id, ClientId},
  832. reason = <<"flapping">>,
  833. by = <<"flapping_checker">>,
  834. until = Until}),
  835. ok;
  836. _Other ->
  837. ok
  838. end;
  839. do_flapping_detect(_Action, _PState) ->
  840. ok.
  841. do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
  842. ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
  843. acl_deny_action = disconnect}) ->
  844. {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
  845. do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload),
  846. ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
  847. acl_deny_action = disconnect}) ->
  848. deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
  849. {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
  850. do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload),
  851. ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer,
  852. acl_deny_action = disconnect}) ->
  853. deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
  854. {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
  855. do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
  856. ReasonCodes, PState = #pstate{proto_ver = ProtoVer,
  857. acl_deny_action = disconnect}) ->
  858. case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of
  859. true ->
  860. deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
  861. {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState};
  862. false ->
  863. {ok, PState}
  864. end;
  865. do_acl_deny_action(_PubSupPacket, _ReasonCode, PState) ->
  866. {ok, PState}.
  867. %% Reason code compat
  868. reason_codes_compat(_PktType, ReasonCodes, ?MQTT_PROTO_V5) ->
  869. ReasonCodes;
  870. reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
  871. undefined;
  872. reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
  873. [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
  874. raw_topic_filters(#pstate{proto_ver = ProtoVer,
  875. is_bridge = IsBridge,
  876. ignore_loop = IgnoreLoop}, RawTopicFilters) ->
  877. case ProtoVer < ?MQTT_PROTO_V5 of
  878. true ->
  879. IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
  880. case IsBridge of
  881. true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
  882. false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
  883. end;
  884. false ->
  885. RawTopicFilters
  886. end.
  887. mountpoint(Credentials) ->
  888. maps:get(mountpoint, Credentials, undefined).