emqx_protocol.erl 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% MQTT Protocol
  17. -module(emqx_protocol).
  18. -include("emqx.hrl").
  19. -include("emqx_mqtt.hrl").
  20. -include("logger.hrl").
  21. -include("types.hrl").
  22. -logger_header("[Protocol]").
  23. -export([ info/1
  24. , info/2
  25. , attrs/1
  26. , caps/1
  27. ]).
  28. -export([ init/2
  29. , handle_in/2
  30. , handle_deliver/2
  31. , handle_out/2
  32. , handle_timeout/3
  33. , terminate/2
  34. ]).
  35. -import(emqx_access_control,
  36. [ authenticate/1
  37. , check_acl/3
  38. ]).
  39. -export_type([proto_state/0]).
  40. -record(protocol, {
  41. client :: emqx_types:client(),
  42. session :: emqx_session:session(),
  43. proto_name :: binary(),
  44. proto_ver :: emqx_types:ver(),
  45. keepalive :: non_neg_integer(),
  46. will_msg :: emqx_types:message(),
  47. topic_aliases :: maybe(map()),
  48. alias_maximum :: maybe(map()),
  49. ack_props :: maybe(emqx_types:properties()) %% Tmp props
  50. }).
  51. -opaque(proto_state() :: #protocol{}).
  52. -define(NO_PROPS, undefined).
  53. -spec(info(proto_state()) -> emqx_types:infos()).
  54. info(#protocol{client = Client,
  55. session = Session,
  56. proto_name = ProtoName,
  57. proto_ver = ProtoVer,
  58. keepalive = Keepalive,
  59. will_msg = WillMsg,
  60. topic_aliases = Aliases}) ->
  61. #{client => Client,
  62. session => emqx_session:info(Session),
  63. proto_name => ProtoName,
  64. proto_ver => ProtoVer,
  65. keepalive => Keepalive,
  66. will_msg => WillMsg,
  67. topic_aliases => Aliases
  68. }.
  69. -spec(info(atom(), proto_state()) -> term()).
  70. info(client, #protocol{client = Client}) ->
  71. Client;
  72. info(zone, #protocol{client = #{zone := Zone}}) ->
  73. Zone;
  74. info(client_id, #protocol{client = #{client_id := ClientId}}) ->
  75. ClientId;
  76. info(session, #protocol{session = Session}) ->
  77. Session;
  78. info(proto_name, #protocol{proto_name = ProtoName}) ->
  79. ProtoName;
  80. info(proto_ver, #protocol{proto_ver = ProtoVer}) ->
  81. ProtoVer;
  82. info(keepalive, #protocol{keepalive = Keepalive}) ->
  83. Keepalive;
  84. info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
  85. Aliases.
  86. attrs(#protocol{client = Client,
  87. session = Session,
  88. proto_name = ProtoName,
  89. proto_ver = ProtoVer,
  90. keepalive = Keepalive}) ->
  91. #{client => Client,
  92. session => emqx_session:attrs(Session),
  93. proto_name => ProtoName,
  94. proto_ver => ProtoVer,
  95. keepalive => Keepalive
  96. }.
  97. caps(#protocol{client = #{zone := Zone}}) ->
  98. emqx_mqtt_caps:get_caps(Zone).
  99. -spec(init(emqx_types:conn(), proplists:proplist()) -> proto_state()).
  100. init(ConnInfo, Options) ->
  101. Zone = proplists:get_value(zone, Options),
  102. Peercert = maps:get(peercert, ConnInfo, undefined),
  103. Username = case peer_cert_as_username(Options) of
  104. cn -> esockd_peercert:common_name(Peercert);
  105. dn -> esockd_peercert:subject(Peercert);
  106. crt -> Peercert;
  107. _ -> undefined
  108. end,
  109. MountPoint = emqx_zone:get_env(Zone, mountpoint),
  110. Client = maps:merge(#{zone => Zone,
  111. username => Username,
  112. mountpoint => MountPoint,
  113. is_bridge => false,
  114. is_superuser => false
  115. }, ConnInfo),
  116. #protocol{client = Client,
  117. proto_name = <<"MQTT">>,
  118. proto_ver = ?MQTT_PROTO_V4
  119. }.
  120. peer_cert_as_username(Options) ->
  121. proplists:get_value(peer_cert_as_username, Options).
  122. %%--------------------------------------------------------------------
  123. %% Handle incoming packet
  124. %%--------------------------------------------------------------------
  125. -spec(handle_in(emqx_types:packet(), proto_state())
  126. -> {ok, proto_state()}
  127. | {ok, emqx_types:packet(), proto_state()}
  128. | {ok, list(emqx_types:packet()), proto_state()}
  129. | {error, Reason :: term(), proto_state()}
  130. | {stop, Error :: atom(), proto_state()}).
  131. handle_in(?CONNECT_PACKET(
  132. #mqtt_packet_connect{proto_name = ProtoName,
  133. proto_ver = ProtoVer,
  134. keepalive = Keepalive,
  135. client_id = ClientId
  136. } = ConnPkt), PState) ->
  137. PState1 = PState#protocol{proto_name = ProtoName,
  138. proto_ver = ProtoVer,
  139. keepalive = Keepalive
  140. },
  141. ok = emqx_logger:set_metadata_client_id(ClientId),
  142. case pipeline([fun validate_in/2,
  143. fun process_props/2,
  144. fun check_connect/2,
  145. fun enrich_client/2,
  146. fun auth_connect/2], ConnPkt, PState1) of
  147. {ok, NConnPkt, NPState} ->
  148. process_connect(NConnPkt, maybe_assign_clientid(NPState));
  149. {error, ReasonCode, NPState} ->
  150. handle_out({disconnect, ReasonCode}, NPState)
  151. end;
  152. handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState) ->
  153. case pipeline([fun validate_in/2,
  154. fun process_alias/2,
  155. fun check_publish/2], Packet, PState) of
  156. {ok, NPacket, NPState} ->
  157. process_publish(NPacket, NPState);
  158. {error, ReasonCode, NPState} ->
  159. ?LOG(warning, "Cannot publish message to ~s due to ~s",
  160. [Topic, emqx_reason_codes:text(ReasonCode)]),
  161. puback(QoS, PacketId, ReasonCode, NPState)
  162. end;
  163. handle_in(?PUBACK_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
  164. case emqx_session:puback(PacketId, ReasonCode, Session) of
  165. {ok, Publishes, NSession} ->
  166. handle_out({publish, Publishes}, PState#protocol{session = NSession});
  167. {ok, NSession} ->
  168. {ok, PState#protocol{session = NSession}};
  169. {error, _NotFound} ->
  170. {ok, PState}
  171. end;
  172. handle_in(?PUBREC_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
  173. case emqx_session:pubrec(PacketId, ReasonCode, Session) of
  174. {ok, NSession} ->
  175. handle_out({pubrel, PacketId}, PState#protocol{session = NSession});
  176. {error, ReasonCode} ->
  177. handle_out({pubrel, PacketId, ReasonCode}, PState)
  178. end;
  179. handle_in(?PUBREL_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
  180. case emqx_session:pubrel(PacketId, ReasonCode, Session) of
  181. {ok, NSession} ->
  182. handle_out({pubcomp, PacketId}, PState#protocol{session = NSession});
  183. {error, ReasonCode} ->
  184. handle_out({pubcomp, PacketId, ReasonCode}, PState)
  185. end;
  186. handle_in(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #protocol{session = Session}) ->
  187. case emqx_session:pubcomp(PacketId, ReasonCode, Session) of
  188. {ok, Publishes, NSession} ->
  189. handle_out({publish, Publishes}, PState#protocol{session = NSession});
  190. {ok, NSession} ->
  191. {ok, PState#protocol{session = NSession}};
  192. {error, _NotFound} ->
  193. {ok, PState}
  194. end;
  195. handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
  196. PState = #protocol{client = Client}) ->
  197. case validate_in(Packet, PState) of
  198. ok -> TopicFilters1 = [emqx_topic:parse(TopicFilter, SubOpts)
  199. || {TopicFilter, SubOpts} <- TopicFilters],
  200. TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
  201. [Client, Properties],
  202. TopicFilters1),
  203. TopicFilters3 = enrich_subid(Properties, TopicFilters2),
  204. {ReasonCodes, NPState} = process_subscribe(TopicFilters3, PState),
  205. handle_out({suback, PacketId, ReasonCodes}, NPState);
  206. {error, ReasonCode} ->
  207. handle_out({disconnect, ReasonCode}, PState)
  208. end;
  209. handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
  210. PState = #protocol{client = Client}) ->
  211. case validate_in(Packet, PState) of
  212. ok -> TopicFilters1 = lists:map(fun emqx_topic:parse/1, TopicFilters),
  213. TopicFilters2 = emqx_hooks:run_fold('client.unsubscribe',
  214. [Client, Properties],
  215. TopicFilters1),
  216. {ReasonCodes, NPState} = process_unsubscribe(TopicFilters2, PState),
  217. handle_out({unsuback, PacketId, ReasonCodes}, NPState);
  218. {error, ReasonCode} ->
  219. handle_out({disconnect, ReasonCode}, PState)
  220. end;
  221. handle_in(?PACKET(?PINGREQ), PState) ->
  222. {ok, ?PACKET(?PINGRESP), PState};
  223. handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
  224. %% Clear will msg
  225. {stop, normal, PState#protocol{will_msg = undefined}};
  226. handle_in(?DISCONNECT_PACKET(RC), PState = #protocol{proto_ver = Ver}) ->
  227. %% TODO:
  228. %% {stop, {shutdown, abnormal_disconnet}, PState};
  229. {stop, {shutdown, emqx_reason_codes:name(RC, Ver)}, PState};
  230. handle_in(?AUTH_PACKET(), PState) ->
  231. %%TODO: implement later.
  232. {ok, PState};
  233. handle_in(Packet, PState) ->
  234. io:format("In: ~p~n", [Packet]),
  235. {ok, PState}.
  236. %%--------------------------------------------------------------------
  237. %% Handle delivers
  238. %%--------------------------------------------------------------------
  239. handle_deliver(Delivers, PState = #protocol{session = Session})
  240. when is_list(Delivers) ->
  241. case emqx_session:deliver(Delivers, Session) of
  242. {ok, Publishes, NSession} ->
  243. handle_out({publish, Publishes}, PState#protocol{session = NSession});
  244. {ok, NSession} ->
  245. {ok, PState#protocol{session = NSession}}
  246. end.
  247. %%--------------------------------------------------------------------
  248. %% Handle outgoing packet
  249. %%--------------------------------------------------------------------
  250. handle_out({connack, ?RC_SUCCESS, SP},
  251. PState = #protocol{client = Client = #{zone := Zone},
  252. ack_props = AckProps,
  253. alias_maximum = AliasMaximum}) ->
  254. ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(PState)]),
  255. #{max_packet_size := MaxPktSize,
  256. max_qos_allowed := MaxQoS,
  257. mqtt_retain_available := Retain,
  258. max_topic_alias := MaxAlias,
  259. mqtt_shared_subscription := Shared,
  260. mqtt_wildcard_subscription := Wildcard
  261. } = caps(PState),
  262. %% Response-Information is so far not set by broker.
  263. %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
  264. %% According to MQTT 5.0 spec:
  265. %% A common use of this is to pass a globally unique portion of the topic tree which
  266. %% is reserved for this Client for at least the lifetime of its Session.
  267. %% This often cannot just be a random name as both the requesting Client and the
  268. %% responding Client need to be authorized to use it.
  269. %% If we are to support it in the feature, the implementation should be flexible
  270. %% to allow prefixing the response topic based on different ACL config.
  271. %% e.g. prefix by username or client-id, so that unauthorized clients can not
  272. %% subscribe requests or responses that are not intended for them.
  273. AckProps1 = if AckProps == undefined -> #{}; true -> AckProps end,
  274. AckProps2 = AckProps1#{'Retain-Available' => flag(Retain),
  275. 'Maximum-Packet-Size' => MaxPktSize,
  276. 'Topic-Alias-Maximum' => MaxAlias,
  277. 'Wildcard-Subscription-Available' => flag(Wildcard),
  278. 'Subscription-Identifier-Available' => 1,
  279. %'Response-Information' =>
  280. 'Shared-Subscription-Available' => flag(Shared),
  281. 'Maximum-QoS' => MaxQoS
  282. },
  283. AckProps3 = case emqx_zone:get_env(Zone, server_keepalive) of
  284. undefined -> AckProps2;
  285. Keepalive -> AckProps2#{'Server-Keep-Alive' => Keepalive}
  286. end,
  287. AliasMaximum1 = set_property(inbound, MaxAlias, AliasMaximum),
  288. PState1 = PState#protocol{alias_maximum = AliasMaximum1,
  289. ack_props = undefined
  290. },
  291. {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps3), PState1};
  292. handle_out({connack, ReasonCode}, PState = #protocol{client = Client,
  293. proto_ver = ProtoVer}) ->
  294. ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(PState)]),
  295. ReasonCode1 = if
  296. ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode;
  297. true -> emqx_reason_codes:compat(connack, ReasonCode)
  298. end,
  299. Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
  300. {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState};
  301. handle_out({publish, Publishes}, PState = #protocol{client = Client}) ->
  302. Packets = [element(2, handle_out(Publish, PState)) || Publish <- Publishes],
  303. {ok, Packets, PState};
  304. handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) ->
  305. Msg1 = emqx_hooks:run_fold('message.deliver', [Client],
  306. emqx_message:update_expiry(Msg)),
  307. Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)),
  308. {ok, Packet, PState};
  309. %% TODO: How to handle the err?
  310. handle_out({puberr, _ReasonCode}, PState) ->
  311. {ok, PState};
  312. handle_out({puback, PacketId, ReasonCode}, PState) ->
  313. {ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState};
  314. handle_out({pubrel, PacketId}, PState) ->
  315. {ok, ?PUBREL_PACKET(PacketId), PState};
  316. handle_out({pubrel, PacketId, ReasonCode}, PState) ->
  317. {ok, ?PUBREL_PACKET(PacketId, ReasonCode), PState};
  318. handle_out({pubrec, PacketId, ReasonCode}, PState) ->
  319. {ok, ?PUBREC_PACKET(PacketId, ReasonCode), PState};
  320. handle_out({pubcomp, PacketId}, PState) ->
  321. {ok, ?PUBCOMP_PACKET(PacketId), PState};
  322. handle_out({pubcomp, PacketId, ReasonCode}, PState) ->
  323. {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), PState};
  324. handle_out({suback, PacketId, ReasonCodes}, PState = #protocol{proto_ver = ?MQTT_PROTO_V5}) ->
  325. %% TODO: ACL Deny
  326. {ok, ?SUBACK_PACKET(PacketId, ReasonCodes), PState};
  327. handle_out({suback, PacketId, ReasonCodes}, PState) ->
  328. %% TODO: ACL Deny
  329. ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
  330. {ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), PState};
  331. handle_out({unsuback, PacketId, ReasonCodes}, PState = #protocol{proto_ver = ?MQTT_PROTO_V5}) ->
  332. {ok, ?UNSUBACK_PACKET(PacketId, ReasonCodes), PState};
  333. %% Ignore reason codes if not MQTT5
  334. handle_out({unsuback, PacketId, _ReasonCodes}, PState) ->
  335. {ok, ?UNSUBACK_PACKET(PacketId), PState};
  336. handle_out(Packet, State) ->
  337. io:format("Out: ~p~n", [Packet]),
  338. {ok, State}.
  339. %%--------------------------------------------------------------------
  340. %% Handle timeout
  341. %%--------------------------------------------------------------------
  342. handle_timeout(TRef, Msg, PState = #protocol{session = Session}) ->
  343. case emqx_session:timeout(TRef, Msg, Session) of
  344. {ok, NSession} ->
  345. {ok, PState#protocol{session = NSession}};
  346. {ok, Publishes, NSession} ->
  347. handle_out({publish, Publishes}, PState#protocol{session = NSession})
  348. end.
  349. terminate(Reason, _PState) ->
  350. io:format("Terminated for ~p~n", [Reason]),
  351. ok.
  352. %%--------------------------------------------------------------------
  353. %% Validate incoming packet
  354. %%--------------------------------------------------------------------
  355. -spec(validate_in(emqx_types:packet(), proto_state())
  356. -> ok | {error, emqx_types:reason_code()}).
  357. validate_in(Packet, _PState) ->
  358. try emqx_packet:validate(Packet) of
  359. true -> ok
  360. catch
  361. error:protocol_error ->
  362. {error, ?RC_PROTOCOL_ERROR};
  363. error:subscription_identifier_invalid ->
  364. {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED};
  365. error:topic_alias_invalid ->
  366. {error, ?RC_TOPIC_ALIAS_INVALID};
  367. error:topic_filters_invalid ->
  368. {error, ?RC_TOPIC_FILTER_INVALID};
  369. error:topic_name_invalid ->
  370. {error, ?RC_TOPIC_FILTER_INVALID};
  371. error:_Reason ->
  372. {error, ?RC_MALFORMED_PACKET}
  373. end.
  374. %%--------------------------------------------------------------------
  375. %% Preprocess properties
  376. %%--------------------------------------------------------------------
  377. process_props(#mqtt_packet_connect{
  378. properties = #{'Topic-Alias-Maximum' := Max}
  379. },
  380. PState = #protocol{alias_maximum = AliasMaximum}) ->
  381. NAliasMaximum = if AliasMaximum == undefined ->
  382. #{outbound => Max};
  383. true -> AliasMaximum#{outbound => Max}
  384. end,
  385. {ok, PState#protocol{alias_maximum = NAliasMaximum}};
  386. process_props(Packet, PState) ->
  387. {ok, Packet, PState}.
  388. %%--------------------------------------------------------------------
  389. %% Check Connect Packet
  390. %%--------------------------------------------------------------------
  391. check_connect(ConnPkt, PState) ->
  392. case pipeline([fun check_proto_ver/2,
  393. fun check_client_id/2,
  394. %%fun check_flapping/2,
  395. fun check_banned/2,
  396. fun check_will_topic/2,
  397. fun check_will_retain/2], ConnPkt, PState) of
  398. ok -> {ok, PState};
  399. Error -> Error
  400. end.
  401. check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
  402. proto_name = Name}, _PState) ->
  403. case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
  404. true -> ok;
  405. false -> {error, ?RC_PROTOCOL_ERROR}
  406. end.
  407. %% MQTT3.1 does not allow null clientId
  408. check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
  409. client_id = <<>>
  410. }, _PState) ->
  411. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
  412. %% Issue#599: Null clientId and clean_start = false
  413. check_client_id(#mqtt_packet_connect{client_id = <<>>,
  414. clean_start = false}, _PState) ->
  415. {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
  416. check_client_id(#mqtt_packet_connect{client_id = <<>>,
  417. clean_start = true}, _PState) ->
  418. ok;
  419. check_client_id(#mqtt_packet_connect{client_id = ClientId},
  420. #protocol{client = #{zone := Zone}}) ->
  421. Len = byte_size(ClientId),
  422. MaxLen = emqx_zone:get_env(Zone, max_clientid_len),
  423. case (1 =< Len) andalso (Len =< MaxLen) of
  424. true -> ok;
  425. false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
  426. end.
  427. %%TODO: check banned...
  428. check_banned(#mqtt_packet_connect{client_id = ClientId,
  429. username = Username},
  430. #protocol{client = Client = #{zone := Zone}}) ->
  431. case emqx_zone:get_env(Zone, enable_ban, false) of
  432. true ->
  433. case emqx_banned:check(Client#{client_id => ClientId,
  434. username => Username}) of
  435. true -> {error, ?RC_BANNED};
  436. false -> ok
  437. end;
  438. false -> ok
  439. end.
  440. check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) ->
  441. ok;
  442. check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _PState) ->
  443. try emqx_topic:validate(WillTopic) of
  444. true -> ok
  445. catch error:_Error ->
  446. {error, ?RC_TOPIC_NAME_INVALID}
  447. end.
  448. check_will_retain(#mqtt_packet_connect{will_retain = false}, _PState) ->
  449. ok;
  450. check_will_retain(#mqtt_packet_connect{will_retain = true},
  451. #protocol{client = #{zone := Zone}}) ->
  452. case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
  453. true -> ok;
  454. false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
  455. end.
  456. %%--------------------------------------------------------------------
  457. %% Enrich client
  458. %%--------------------------------------------------------------------
  459. enrich_client(#mqtt_packet_connect{client_id = ClientId,
  460. username = Username,
  461. is_bridge = IsBridge
  462. },
  463. PState = #protocol{client = Client}) ->
  464. Client1 = set_username(Username, Client#{client_id => ClientId,
  465. is_bridge => IsBridge
  466. }),
  467. {ok, PState#protocol{client = maybe_username_as_clientid(Client1)}}.
  468. %% Username maybe not undefined if peer_cert_as_username
  469. set_username(Username, Client = #{username := undefined}) ->
  470. Client#{username => Username};
  471. set_username(_Username, Client) -> Client.
  472. maybe_username_as_clientid(Client = #{username := undefined}) ->
  473. Client;
  474. maybe_username_as_clientid(Client = #{zone := Zone,
  475. username := Username}) ->
  476. case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
  477. true -> Client#{client_id => Username};
  478. false -> Client
  479. end.
  480. %%--------------------------------------------------------------------
  481. %% Auth Connect
  482. %%--------------------------------------------------------------------
  483. auth_connect(#mqtt_packet_connect{client_id = ClientId,
  484. username = Username,
  485. password = Password},
  486. PState = #protocol{client = Client}) ->
  487. case authenticate(Client#{password => Password}) of
  488. {ok, AuthResult} ->
  489. {ok, PState#protocol{client = maps:merge(Client, AuthResult)}};
  490. {error, Reason} ->
  491. ?LOG(warning, "Client ~s (Username: '~s') login failed for ~p",
  492. [ClientId, Username, Reason]),
  493. {error, emqx_reason_codes:connack_error(Reason)}
  494. end.
  495. %%--------------------------------------------------------------------
  496. %% Assign a random clientId
  497. %%--------------------------------------------------------------------
  498. maybe_assign_clientid(PState = #protocol{client = Client = #{client_id := <<>>},
  499. ack_props = AckProps}) ->
  500. ClientId = emqx_guid:to_base62(emqx_guid:gen()),
  501. Client1 = Client#{client_id => ClientId},
  502. AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
  503. PState#protocol{client = Client1, ack_props = AckProps1};
  504. maybe_assign_clientid(PState) -> PState.
  505. %%--------------------------------------------------------------------
  506. %% Process Connect
  507. %%--------------------------------------------------------------------
  508. process_connect(ConnPkt, PState) ->
  509. case open_session(ConnPkt, PState) of
  510. {ok, Session, SP} ->
  511. WillMsg = emqx_packet:will_msg(ConnPkt),
  512. NPState = PState#protocol{session = Session,
  513. will_msg = WillMsg
  514. },
  515. handle_out({connack, ?RC_SUCCESS, sp(SP)}, NPState);
  516. {error, Reason} ->
  517. %% TODO: Unknown error?
  518. ?LOG(error, "Failed to open session: ~p", [Reason]),
  519. handle_out({connack, ?RC_UNSPECIFIED_ERROR}, PState)
  520. end.
  521. %%--------------------------------------------------------------------
  522. %% Open session
  523. %%--------------------------------------------------------------------
  524. open_session(#mqtt_packet_connect{clean_start = CleanStart,
  525. properties = ConnProps},
  526. #protocol{client = Client = #{zone := Zone}}) ->
  527. MaxInflight = get_property('Receive-Maximum', ConnProps,
  528. emqx_zone:get_env(Zone, max_inflight, 65535)),
  529. Interval = get_property('Session-Expiry-Interval', ConnProps,
  530. emqx_zone:get_env(Zone, session_expiry_interval, 0)),
  531. emqx_cm:open_session(CleanStart, Client, #{max_inflight => MaxInflight,
  532. expiry_interval => Interval
  533. }).
  534. %%--------------------------------------------------------------------
  535. %% Process publish message: Client -> Broker
  536. %%--------------------------------------------------------------------
  537. process_alias(Packet = #mqtt_packet{
  538. variable = #mqtt_packet_publish{topic_name = <<>>,
  539. properties = #{'Topic-Alias' := AliasId}
  540. } = Publish
  541. }, PState = #protocol{topic_aliases = Aliases}) ->
  542. case find_alias(AliasId, Aliases) of
  543. {ok, Topic} ->
  544. {ok, Packet#mqtt_packet{
  545. variable = Publish#mqtt_packet_publish{
  546. topic_name = Topic}}, PState};
  547. false -> {error, ?RC_TOPIC_ALIAS_INVALID}
  548. end;
  549. process_alias(#mqtt_packet{
  550. variable = #mqtt_packet_publish{topic_name = Topic,
  551. properties = #{'Topic-Alias' := AliasId}
  552. }
  553. }, PState = #protocol{topic_aliases = Aliases}) ->
  554. {ok, PState#protocol{topic_aliases = save_alias(AliasId, Topic, Aliases)}};
  555. process_alias(_Packet, PState) ->
  556. {ok, PState}.
  557. find_alias(_AliasId, undefined) ->
  558. false;
  559. find_alias(AliasId, Aliases) ->
  560. maps:find(AliasId, Aliases).
  561. save_alias(AliasId, Topic, undefined) ->
  562. #{AliasId => Topic};
  563. save_alias(AliasId, Topic, Aliases) ->
  564. maps:put(AliasId, Topic, Aliases).
  565. %% Check Publish
  566. check_publish(Packet, PState) ->
  567. pipeline([fun check_pub_acl/2,
  568. fun check_pub_alias/2,
  569. fun check_pub_caps/2], Packet, PState).
  570. %% Check Pub ACL
  571. check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
  572. #protocol{client = Client}) ->
  573. case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of
  574. false -> ok;
  575. allow -> ok;
  576. deny -> {error, ?RC_NOT_AUTHORIZED}
  577. end.
  578. %% Check Pub Alias
  579. check_pub_alias(#mqtt_packet{
  580. variable = #mqtt_packet_publish{
  581. properties = #{'Topic-Alias' := AliasId}
  582. }
  583. },
  584. #protocol{alias_maximum = Limits}) ->
  585. case (Limits == undefined)
  586. orelse (Max = maps:get(inbound, Limits, 0)) == 0
  587. orelse (AliasId > Max) of
  588. false -> ok;
  589. true -> {error, ?RC_TOPIC_ALIAS_INVALID}
  590. end;
  591. check_pub_alias(_Packet, _PState) -> ok.
  592. %% Check Pub Caps
  593. check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
  594. retain = Retain
  595. }
  596. },
  597. #protocol{client = #{zone := Zone}}) ->
  598. emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
  599. %% Process Publish
  600. process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
  601. PState = #protocol{client = Client}) ->
  602. Msg = emqx_packet:to_message(Client, Packet),
  603. %%TODO: Improve later.
  604. Msg1 = emqx_message:set_flag(dup, false, Msg),
  605. process_publish(PacketId, mount(Client, Msg1), PState).
  606. process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, PState) ->
  607. _ = emqx_broker:publish(Msg),
  608. {ok, PState};
  609. process_publish(PacketId, Msg = #message{qos = ?QOS_1}, PState) ->
  610. Deliveries = emqx_broker:publish(Msg),
  611. ReasonCode = emqx_reason_codes:puback(Deliveries),
  612. handle_out({puback, PacketId, ReasonCode}, PState);
  613. process_publish(PacketId, Msg = #message{qos = ?QOS_2},
  614. PState = #protocol{session = Session}) ->
  615. case emqx_session:publish(PacketId, Msg, Session) of
  616. {ok, Deliveries, NSession} ->
  617. ReasonCode = emqx_reason_codes:puback(Deliveries),
  618. handle_out({pubrec, PacketId, ReasonCode},
  619. PState#protocol{session = NSession});
  620. {error, ReasonCode} ->
  621. handle_out({pubrec, PacketId, ReasonCode}, PState)
  622. end.
  623. %%--------------------------------------------------------------------
  624. %% Puback
  625. %%--------------------------------------------------------------------
  626. puback(?QOS_0, _PacketId, ReasonCode, PState) ->
  627. handle_out({puberr, ReasonCode}, PState);
  628. puback(?QOS_1, PacketId, ReasonCode, PState) ->
  629. handle_out({puback, PacketId, ReasonCode}, PState);
  630. puback(?QOS_2, PacketId, ReasonCode, PState) ->
  631. handle_out({pubrec, PacketId, ReasonCode}, PState).
  632. %%--------------------------------------------------------------------
  633. %% Process subscribe request
  634. %%--------------------------------------------------------------------
  635. process_subscribe(TopicFilters, PState) ->
  636. process_subscribe(TopicFilters, [], PState).
  637. process_subscribe([], Acc, PState) ->
  638. {lists:reverse(Acc), PState};
  639. process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
  640. {RC, NPState} = do_subscribe(TopicFilter, SubOpts, PState),
  641. process_subscribe(More, [RC|Acc], NPState).
  642. do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
  643. PState = #protocol{client = Client, session = Session}) ->
  644. case check_subscribe(TopicFilter, PState) of
  645. ok -> TopicFilter1 = mount(Client, TopicFilter),
  646. SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState),
  647. case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
  648. {ok, NSession} ->
  649. {QoS, PState#protocol{session = NSession}};
  650. {error, RC} -> {RC, PState}
  651. end;
  652. {error, RC} -> {RC, PState}
  653. end.
  654. enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
  655. [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
  656. enrich_subid(_Properties, TopicFilters) ->
  657. TopicFilters.
  658. enrich_subopts(SubOpts, #protocol{proto_ver = ?MQTT_PROTO_V5}) ->
  659. SubOpts;
  660. enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge}}) ->
  661. Rap = flag(IsBridge),
  662. Nl = flag(emqx_zone:get_env(Zone, ignore_loop_deliver, false)),
  663. SubOpts#{rap => Rap, nl => Nl}.
  664. %% Check Sub
  665. check_subscribe(TopicFilter, PState) ->
  666. case check_sub_acl(TopicFilter, PState) of
  667. allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState);
  668. deny -> {error, ?RC_NOT_AUTHORIZED}
  669. end.
  670. %% Check Sub ACL
  671. check_sub_acl(TopicFilter, #protocol{client = Client}) ->
  672. case is_acl_enabled(Client) andalso
  673. check_acl(Client, subscribe, TopicFilter) of
  674. false -> allow;
  675. Result -> Result
  676. end.
  677. %% Check Sub Caps
  678. check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) ->
  679. emqx_mqtt_caps:check_sub(Zone, TopicFilter).
  680. %%--------------------------------------------------------------------
  681. %% Process unsubscribe request
  682. %%--------------------------------------------------------------------
  683. process_unsubscribe(TopicFilters, PState) ->
  684. process_unsubscribe(TopicFilters, [], PState).
  685. process_unsubscribe([], Acc, PState) ->
  686. {lists:reverse(Acc), PState};
  687. process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
  688. {RC, PState1} = do_unsubscribe(TopicFilter, SubOpts, PState),
  689. process_unsubscribe(More, [RC|Acc], PState1).
  690. do_unsubscribe(TopicFilter, _SubOpts, PState = #protocol{client = Client,
  691. session = Session}) ->
  692. case emqx_session:unsubscribe(Client, mount(Client, TopicFilter), Session) of
  693. {ok, NSession} ->
  694. {?RC_SUCCESS, PState#protocol{session = NSession}};
  695. {error, RC} -> {RC, PState}
  696. end.
  697. %%--------------------------------------------------------------------
  698. %% Is ACL enabled?
  699. %%--------------------------------------------------------------------
  700. is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
  701. (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true).
  702. %%--------------------------------------------------------------------
  703. %% Pipeline
  704. %%--------------------------------------------------------------------
  705. pipeline([], Packet, PState) ->
  706. {ok, Packet, PState};
  707. pipeline([Fun|More], Packet, PState) ->
  708. case Fun(Packet, PState) of
  709. ok -> pipeline(More, Packet, PState);
  710. {ok, NPState} ->
  711. pipeline(More, Packet, NPState);
  712. {ok, NPacket, NPState} ->
  713. pipeline(More, NPacket, NPState);
  714. {error, ReasonCode} ->
  715. {error, ReasonCode, PState};
  716. {error, ReasonCode, NPState} ->
  717. {error, ReasonCode, NPState}
  718. end.
  719. %%--------------------------------------------------------------------
  720. %% Mount/Unmount
  721. %%--------------------------------------------------------------------
  722. mount(#{mountpoint := MountPoint}, TopicOrMsg) ->
  723. emqx_mountpoint:mount(MountPoint, TopicOrMsg).
  724. unmount(#{mountpoint := MountPoint}, TopicOrMsg) ->
  725. emqx_mountpoint:unmount(MountPoint, TopicOrMsg).
  726. %%--------------------------------------------------------------------
  727. %% Helper functions
  728. %%--------------------------------------------------------------------
  729. set_property(Name, Value, ?NO_PROPS) ->
  730. #{Name => Value};
  731. set_property(Name, Value, Props) ->
  732. Props#{Name => Value}.
  733. get_property(_Name, undefined, Default) ->
  734. Default;
  735. get_property(Name, Props, Default) ->
  736. maps:get(Name, Props, Default).
  737. sp(true) -> 1;
  738. sp(false) -> 0.
  739. flag(true) -> 1;
  740. flag(false) -> 0.