emqttd_protocol.erl 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_protocol).
  17. -author("Feng Lee <feng@emqtt.io>").
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. -include("emqttd_internal.hrl").
  21. -import(proplists, [get_value/2, get_value/3]).
  22. %% API
  23. -export([init/3, init/4, info/1, stats/1, clientid/1, client/1, session/1]).
  24. -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]).
  25. -export([received/2, send/2]).
  26. -export([process/2]).
  %% Packet/message counters kept per connection. Counting only happens
  %% while enable_stats is true (see inc_stats/3).
  %% NOTE: field order matters - stats/1 drops the first field and
  %% inc_stats/6 updates counters by record position.
  -record(proto_stats, {
            enable_stats = false,
            recv_pkt     = 0,
            recv_msg     = 0,
            send_pkt     = 0,
            send_msg     = 0
           }).

  %% Protocol state carried by the connection process.
  %% ws_initial_headers: headers from the first HTTP request of a WebSocket client.
  -record(proto_state, {
            peername,
            sendfun,
            connected = false,
            client_id,
            client_pid,
            clean_sess,
            proto_ver,
            proto_name,
            username,
            is_superuser,
            will_msg,
            keepalive,
            keepalive_backoff,
            max_clientid_len,
            session,
            stats_data,
            mountpoint,
            ws_initial_headers,
            connected_at
           }).

  -type(proto_state() :: #proto_state{}).

  %% Fields of #proto_state{} reported by info/1.
  -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name,
                      keepalive, will_msg, ws_initial_headers, mountpoint,
                      connected_at]).

  %% Counter fields of #proto_stats{}.
  -define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]).

  %% Log through lager at Level, prefixing every line with the client id and
  %% the formatted peername taken from State.
  -define(LOG(Level, Format, Args, State),
          lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format,
                      [State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])).
  %% @doc Build the initial protocol state for a new connection.
  %%
  %% Peername is stored for logging and client info, SendFun is the fun
  %% invoked by send/2 for every outgoing packet, and Opts is a proplist.
  %% Keys read from Opts: keepalive_backoff (default 1.25),
  %% client_enable_stats (default false), max_clientid_len
  %% (default ?MAX_CLIENTID_LEN) and ws_initial_headers (default undefined).
  %% The calling process is recorded as client_pid.
  init(Peername, SendFun, Opts) ->
      Stats = #proto_stats{
                 enable_stats = proplists:get_value(client_enable_stats, Opts, false)
                },
      #proto_state{peername           = Peername,
                   sendfun            = SendFun,
                   client_pid         = self(),
                   is_superuser       = false,
                   max_clientid_len   = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
                   keepalive_backoff  = proplists:get_value(keepalive_backoff, Opts, 1.25),
                   ws_initial_headers = proplists:get_value(ws_initial_headers, Opts),
                   stats_data         = Stats}.
  %% @doc Same as init/3, then folds the options reported by Conn:opts()
  %% into the state (only the mountpoint option is used, see enrich_opt/3).
  init(Conn, Peername, SendFun, Opts) ->
      ConnOpts = Conn:opts(),
      enrich_opt(ConnOpts, Conn, init(Peername, SendFun, Opts)).
  %% Apply connection-level options to the protocol state. Only
  %% {mountpoint, MountPoint} is recognised; every other option is ignored.
  %% When the option occurs more than once, the last occurrence wins.
  enrich_opt(ConnOpts, _Conn, State) ->
      Apply = fun({mountpoint, MountPoint}, Acc) ->
                      Acc#proto_state{mountpoint = MountPoint};
                 (_Other, Acc) ->
                      Acc
              end,
      lists:foldl(Apply, State, ConnOpts).
  %% @doc Protocol state rendered as a proplist restricted to ?INFO_KEYS.
  info(State) ->
      ?record_to_proplist(proto_state, State, ?INFO_KEYS).
  %% @doc Counters of the connection as a proplist. The head of the
  %% converted record (presumably the enable_stats flag, being the first
  %% field) is dropped so that only the remaining entries are returned.
  stats(#proto_state{stats_data = StatsData}) ->
      AllFields = ?record_to_proplist(proto_stats, StatsData),
      tl(AllFields).
  %% @doc Client id currently stored in the protocol state.
  clientid(State = #proto_state{}) ->
      State#proto_state.client_id.
  %% @doc Project the protocol state onto an #mqtt_client{} record.
  %% will_topic is the topic of the stored will message, or undefined when
  %% no will message is set.
  client(State = #proto_state{will_msg = WillMsg}) ->
      WillTopic = case WillMsg of
                      undefined -> undefined;
                      _ -> WillMsg#mqtt_message.topic
                  end,
      #mqtt_client{client_id          = State#proto_state.client_id,
                   client_pid         = State#proto_state.client_pid,
                   username           = State#proto_state.username,
                   peername           = State#proto_state.peername,
                   clean_sess         = State#proto_state.clean_sess,
                   proto_ver          = State#proto_state.proto_ver,
                   keepalive          = State#proto_state.keepalive,
                   will_topic         = WillTopic,
                   ws_initial_headers = State#proto_state.ws_initial_headers,
                   mountpoint         = State#proto_state.mountpoint,
                   connected_at       = State#proto_state.connected_at}.
  %% @doc Session handle stored in the protocol state (undefined until a
  %% CONNECT has been accepted and a session started).
  session(State = #proto_state{}) ->
      State#proto_state.session.
  %% @doc Entry point for every packet read from the network.
  %%
  %% CONNECT - Client requests a connection to a Server.
  %% A Client can only send the CONNECT Packet once over a Network Connection:
  %%   * first CONNECT: marks the state as connected and processes it;
  %%   * a second CONNECT yields {error, protocol_bad_connect, State};
  %%   * any other packet before CONNECT yields {error, protocol_not_connected, State}.
  %% Every other packet is traced, counted, validated and then handed to
  %% process/2; a validation failure yields {error, Reason, State}.
  %%
  %% The spec lists the shapes actually returned: error results always carry
  %% the state, and process/2 may ask the caller to stop.
  -spec(received(mqtt_packet(), proto_state())
        -> {ok, proto_state()}
         | {error, term(), proto_state()}
         | {stop, term(), proto_state()}).
  received(Packet = ?PACKET(?CONNECT),
           State = #proto_state{connected = false, stats_data = Stats}) ->
      trace(recv, Packet, State),
      Stats1 = inc_stats(recv, ?CONNECT, Stats),
      process(Packet, State#proto_state{connected = true, stats_data = Stats1});

  received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
      {error, protocol_bad_connect, State};

  %% Received other packets when CONNECT not arrived.
  received(_Packet, State = #proto_state{connected = false}) ->
      {error, protocol_not_connected, State};

  received(Packet = ?PACKET(Type), State = #proto_state{stats_data = Stats}) ->
      trace(recv, Packet, State),
      Stats1 = inc_stats(recv, Type, Stats),
      case validate_packet(Packet) of
          ok ->
              process(Packet, State#proto_state{stats_data = Stats1});
          {error, Reason} ->
              {error, Reason, State}
      end.
  %% @doc Subscribe on behalf of the client (not triggered by a SUBSCRIBE
  %% packet). The raw topic table is parsed, passed through the
  %% 'client.subscribe' hook, and - unless a hook stops the chain - handed
  %% to the session. Always returns {ok, ProtoState} with the state unchanged.
  subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId,
                                                     username  = Username,
                                                     session   = Session}) ->
      Parsed = parse_topic_table(RawTopicTable),
      HookResult = emqttd_hooks:run('client.subscribe', [ClientId, Username], Parsed),
      case HookResult of
          {stop, _} ->
              ok;
          {ok, Accepted} ->
              emqttd_session:subscribe(Session, Accepted)
      end,
      {ok, ProtoState}.
  %% @doc Unsubscribe on behalf of the client (not triggered by an
  %% UNSUBSCRIBE packet). The raw topics are parsed, passed through the
  %% 'client.unsubscribe' hook, and - unless a hook stops the chain - handed
  %% to the session. Always returns {ok, ProtoState} with the state unchanged.
  unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId,
                                                   username  = Username,
                                                   session   = Session}) ->
      Parsed = parse_topics(RawTopics),
      HookResult = emqttd_hooks:run('client.unsubscribe', [ClientId, Username], Parsed),
      case HookResult of
          {stop, _} ->
              ok;
          {ok, Accepted} ->
              emqttd_session:unsubscribe(Session, Accepted)
      end,
      {ok, ProtoState}.
  %% @doc Send a PUBREL packet for PacketId to the client; returns the
  %% result of send/2.
  pubrel(PacketId, State) ->
      Pubrel = ?PUBREL_PACKET(PacketId),
      send(Pubrel, State).
  %% @doc Handle one MQTT packet that has already been accepted by received/2.
  %%
  %% Returns {ok, State} to keep the connection going or {stop, Reason, State}
  %% to ask the caller to terminate it. The CONNECT clause exits the process
  %% with {shutdown, Error} when the session cannot be started.
  %%
  %% The state returned by send/2 carries the updated send counters, so it is
  %% always threaded through to the result instead of being dropped.
  -spec(process(mqtt_packet(), proto_state())
        -> {ok, proto_state()} | {stop, term(), proto_state()}).
  process(?CONNECT_PACKET(Var), State0) ->

      #mqtt_packet_connect{proto_ver  = ProtoVer,
                           proto_name = ProtoName,
                           username   = Username,
                           password   = Password,
                           clean_sess = CleanSess,
                           keep_alive = KeepAlive,
                           client_id  = ClientId} = Var,

      State1 = State0#proto_state{proto_ver    = ProtoVer,
                                  proto_name   = ProtoName,
                                  username     = Username,
                                  client_id    = ClientId,
                                  clean_sess   = CleanSess,
                                  keepalive    = KeepAlive,
                                  will_msg     = willmsg(Var, State0),
                                  connected_at = os:timestamp()},

      {ReturnCode1, SessPresent, State3} =
      case validate_connect(Var, State1) of
          ?CONNACK_ACCEPT ->
              case authenticate(client(State1), Password) of
                  {ok, IsSuperuser} ->
                      %% Generate clientId if null
                      State2 = maybe_set_clientid(State1),

                      %% Start session
                      case emqttd_sm:start_session(CleanSess, {clientid(State2), Username}) of
                          {ok, Session, SP} ->
                              %% Register the client
                              emqttd_cm:reg(client(State2)),
                              %% Start keepalive
                              start_keepalive(KeepAlive, State2),
                              %% Emit Stats
                              self() ! emit_stats,
                              %% ACCEPT
                              {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
                          {error, Error} ->
                              exit({shutdown, Error})
                      end;
                  {error, Reason}->
                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
                      {?CONNACK_CREDENTIALS, false, State1}
              end;
          ReturnCode ->
              {ReturnCode, false, State1}
      end,
      %% Run hooks
      emqttd_hooks:run('client.connected', [ReturnCode1], client(State3)),
      %% Send connack; keep the state it returns so the CONNACK is counted
      {ok, State4} = send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
      %% stop if authentication failure
      stop_if_auth_failure(ReturnCode1, State4);

  process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
          State = #proto_state{is_superuser = IsSuper}) ->
      case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of
          true ->
              %% publish/2 yields {ok, NewState} when an acknowledgement was
              %% sent; any other result leaves the state as it was.
              case publish(Packet, State) of
                  {ok, State1 = #proto_state{}} -> {ok, State1};
                  _NoAck                        -> {ok, State}
              end;
          false ->
              ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State),
              {ok, State}
      end;

  process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:puback(Session, PacketId),
      {ok, State};

  process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:pubrec(Session, PacketId),
      send(?PUBREL_PACKET(PacketId), State);

  process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:pubrel(Session, PacketId),
      send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);

  process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})->
      emqttd_session:pubcomp(Session, PacketId), {ok, State};

  %% Protect from empty topic table
  process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
      send(?SUBACK_PACKET(PacketId, []), State);

  %% TODO: refactor later...
  %% Superusers skip the ACL check. A single denied topic rejects the whole
  %% request: every entry of the SUBACK carries the failure code 16#80.
  %% On success no SUBACK is sent from here; the packet id is handed to the
  %% session together with the mounted topic table.
  process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable),
          State = #proto_state{client_id    = ClientId,
                               username     = Username,
                               is_superuser = IsSuperuser,
                               mountpoint   = MountPoint,
                               session      = Session}) ->
      Client = client(State), TopicTable = parse_topic_table(RawTopicTable),
      AllowDenies = if
                      IsSuperuser -> [];
                      true -> [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable]
                    end,
      case lists:member(deny, AllowDenies) of
          true ->
              ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
              send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
          false ->
              case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
                  {ok, TopicTable1} ->
                      emqttd_session:subscribe(Session, PacketId, mount(MountPoint, TopicTable1)),
                      {ok, State};
                  {stop, _} ->
                      {ok, State}
              end
      end;

  %% Protect from empty topic list
  process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
      send(?UNSUBACK_PACKET(PacketId), State);

  process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics),
          State = #proto_state{client_id  = ClientId,
                               username   = Username,
                               mountpoint = MountPoint,
                               session    = Session}) ->
      case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
          {ok, TopicTable} ->
              emqttd_session:unsubscribe(Session, mount(MountPoint, TopicTable));
          {stop, _} ->
              ok
      end,
      send(?UNSUBACK_PACKET(PacketId), State);

  process(?PACKET(?PINGREQ), State) ->
      send(?PACKET(?PINGRESP), State);

  process(?PACKET(?DISCONNECT), State) ->
      % Clean willmsg
      {stop, normal, State#proto_state{will_msg = undefined}}.
  %% Publish an incoming PUBLISH packet through the session.
  %% QoS 0: fire and forget - returns whatever emqttd_session:publish/2 returns.
  %% QoS 1: acknowledged with PUBACK, QoS 2: acknowledged with PUBREC
  %% (both via with_puback/3).
  publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State = #proto_state{}) ->
      Message = emqttd_message:from_packet(State#proto_state.username,
                                           State#proto_state.client_id,
                                           Packet),
      Mounted = mount(State#proto_state.mountpoint, Message),
      emqttd_session:publish(State#proto_state.session, Mounted);

  publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
      with_puback(?PUBACK, Packet, State);

  publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
      with_puback(?PUBREC, Packet, State).
  %% Publish a QoS 1/2 message through the session and acknowledge it.
  %%
  %% Type is the acknowledgement packet type to send (PUBACK or PUBREC).
  %% Returns {ok, NewState} from send/2 when the session accepted the
  %% message. When the session reports {error, Error} the failure is logged,
  %% no acknowledgement is sent and {ok, State} is returned unchanged, so
  %% both branches have the same result shape.
  with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
              State = #proto_state{client_id  = ClientId,
                                   username   = Username,
                                   mountpoint = MountPoint,
                                   session    = Session}) ->
      Msg = emqttd_message:from_packet(Username, ClientId, Packet),
      case emqttd_session:publish(Session, mount(MountPoint, Msg)) of
          ok ->
              send(?PUBACK_PACKET(Type, PacketId), State);
          {error, Error} ->
              ?LOG(error, "PUBLISH ~p error: ~p", [PacketId, Error], State),
              {ok, State}
      end.
  %% @doc Deliver a message or a packet to the client.
  %%
  %% A #mqtt_message{} first runs the 'message.delivered' hook, has the
  %% mountpoint stripped from its topic and is converted to a packet.
  %% A packet is traced, reported to emqttd_metrics, passed to the send fun
  %% and counted; the returned state carries the updated counters.
  -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
  send(Msg = #mqtt_message{}, State = #proto_state{client_id  = ClientId,
                                                  username   = Username,
                                                  mountpoint = MountPoint}) ->
      emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
      Unmounted = unmount(MountPoint, Msg),
      send(emqttd_message:to_packet(Unmounted), State);

  send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
      trace(send, Packet, State),
      emqttd_metrics:sent(Packet),
      SendFun(Packet),
      {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
  %% Debug-log a packet together with its direction (recv or send).
  %% The packet is formatted inside the log call on purpose, exactly where
  %% the original placed it.
  trace(recv, Pkt, State) ->
      ?LOG(debug, "RECV ~s", [emqttd_packet:format(Pkt)], State);
  trace(send, Pkt, State) ->
      ?LOG(debug, "SEND ~s", [emqttd_packet:format(Pkt)], State).
  %% Count one packet of the given Type in the given direction (recv | send).
  %% Nothing is counted while enable_stats is false.
  inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) ->
      Stats;
  inc_stats(recv, Type, Stats = #proto_stats{recv_pkt = Pkts, recv_msg = Msgs}) ->
      inc_stats(Type, #proto_stats.recv_pkt, Pkts, #proto_stats.recv_msg, Msgs, Stats);
  inc_stats(send, Type, Stats = #proto_stats{send_pkt = Pkts, send_msg = Msgs}) ->
      inc_stats(Type, #proto_stats.send_pkt, Pkts, #proto_stats.send_msg, Msgs, Stats).
  %% Bump the packet counter at record position PktPos; PUBLISH packets
  %% additionally bump the message counter at MsgPos.
  inc_stats(?PUBLISH, PktPos, PktCnt, MsgPos, MsgCnt, Stats) ->
      WithPkt = setelement(PktPos, Stats, PktCnt + 1),
      setelement(MsgPos, WithPkt, MsgCnt + 1);
  inc_stats(_Type, PktPos, PktCnt, _MsgPos, _MsgCnt, Stats) ->
      setelement(PktPos, Stats, PktCnt + 1).
  %% Turn a CONNACK return code into the result of the CONNECT handling:
  %% the credentials and auth failure codes stop the connection with
  %% {shutdown, auth_failure}; every other code keeps it running.
  stop_if_auth_failure(RC, State) ->
      case (RC == ?CONNACK_CREDENTIALS) orelse (RC == ?CONNACK_AUTH) of
          true  -> {stop, {shutdown, auth_failure}, State};
          false -> {ok, State}
      end.
  %% @doc Protocol-level cleanup when the connection goes down.
  %%
  %% Nothing is done when no client id was ever set, or when the reason is
  %% conflict. Otherwise the will message is published (except after
  %% auth_failure) and the 'client.disconnected' hook is run. The client is
  %% not unregistered from emqttd_cm here.
  shutdown(_Error, #proto_state{client_id = undefined}) ->
      ignore;
  shutdown(conflict, #proto_state{}) ->
      ignore;
  shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
      ?LOG(debug, "Shutdown for ~p", [Error], State),
      Client = client(State),
      %% Auth failure not publish the will message
      if
          Error =:= auth_failure -> ok;
          true -> send_willmsg(Client, WillMsg)
      end,
      emqttd_hooks:run('client.disconnected', [Error], Client),
      ok.
  %% Extract the will message from a CONNECT variable header and prefix its
  %% topic with the mountpoint; undefined when the packet carries no will.
  willmsg(Connect = #mqtt_packet_connect{}, #proto_state{mountpoint = MountPoint}) ->
      WillMsg = emqttd_message:from_packet(Connect),
      case WillMsg of
          undefined -> undefined;
          _ -> mount(MountPoint, WillMsg)
      end.
  %% Generate a client id if the client supplied none (undefined or empty).
  %% The id is "emqttd_" followed by the second element of emqttd_guid:new().
  maybe_set_clientid(State = #proto_state{client_id = NullId})
    when NullId =:= undefined; NullId =:= <<>> ->
      {_, NPid, _} = emqttd_guid:new(),
      Generated = list_to_binary("emqttd_" ++ integer_to_list(NPid)),
      State#proto_state{client_id = Generated};
  maybe_set_clientid(State) ->
      State.
  %% Publish the client's will message, stamped with {ClientId, Username}
  %% as its sender. Does nothing (returns ignore) when there is no will.
  send_willmsg(_Client, undefined) ->
      ignore;
  send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
      From = {ClientId, Username},
      emqttd:publish(WillMsg#mqtt_message{from = From}).
  %% Ask the owning process to start the keepalive timer. A keepalive of 0
  %% disables it; otherwise the interval is the client's value scaled by the
  %% configured backoff factor and rounded.
  start_keepalive(0, _State) ->
      ignore;
  start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 ->
      Interval = round(Sec * Backoff),
      self() ! {keepalive, start, Interval}.
  %%--------------------------------------------------------------------
  %% Validate Packets
  %%--------------------------------------------------------------------

  %% Map a CONNECT variable header to a CONNACK return code: the protocol
  %% name/version is checked first, the client id only when that passed.
  validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
      case validate_protocol(Connect) of
          false ->
              ?CONNACK_PROTO_VER;
          true ->
              case validate_clientid(Connect, ProtoState) of
                  false -> ?CONNACK_INVALID_ID;
                  true  -> ?CONNACK_ACCEPT
              end
      end.
  %% true when the {Version, Name} pair of the CONNECT is listed in
  %% ?PROTOCOL_NAMES.
  validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
      Offered = {Ver, Name},
      lists:member(Offered, ?PROTOCOL_NAMES).
  %% Decide whether the client id of a CONNECT is acceptable.
  %%   * 1..max_clientid_len bytes: accepted;
  %%   * empty with clean_sess = false: rejected (Issue#599);
  %%   * empty with protocol version ?MQTT_PROTO_V4: accepted;
  %%   * anything else: rejected, with a warning in the log.
  validate_clientid(#mqtt_packet_connect{client_id = Id},
                    #proto_state{max_clientid_len = MaxLen})
    when byte_size(Id) >= 1, byte_size(Id) =< MaxLen ->
      true;

  %% Issue#599: Null clientId and clean_sess = false
  validate_clientid(#mqtt_packet_connect{client_id = Id, clean_sess = CleanSess},
                    _ProtoState)
    when byte_size(Id) == 0, not CleanSess ->
      false;

  %% MQTT3.1.1 allow null clientId.
  validate_clientid(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4, client_id = Id},
                    _ProtoState)
    when byte_size(Id) =:= 0 ->
      true;

  validate_clientid(#mqtt_packet_connect{proto_ver = ProtoVer, clean_sess = CleanSess},
                    ProtoState) ->
      ?LOG(warning, "Invalid clientId. ProtoVer: ~p, CleanSess: ~s",
           [ProtoVer, CleanSess], ProtoState),
      false.
  %% Validate the topics carried by a packet. PUBLISH topics are checked as
  %% topic names, SUBSCRIBE/UNSUBSCRIBE topics as topic filters; every other
  %% packet type is accepted as is.
  validate_packet(?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload)) ->
      case emqttd_topic:validate({name, Topic}) of
          false -> {error, badtopic};
          true  -> ok
      end;

  validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) ->
      validate_topics(filter, TopicTable);

  validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) ->
      validate_topics(filter, Topics);

  validate_packet(_Packet) ->
      ok.
  %% Validate a list of topics. Accepts either a topic table of
  %% {Topic, Qos} pairs (topic and QoS are both checked) or a plain list of
  %% binary topics. An empty list is an error of its own.
  validate_topics(_Type, []) ->
      {error, empty_topics};

  validate_topics(Type, TopicTable = [{_Topic, _Qos} | _])
    when Type =:= name; Type =:= filter ->
      IsValid = fun(Topic, Qos) ->
                        emqttd_topic:validate({Type, Topic}) and validate_qos(Qos)
                end,
      Rejected = [Topic || {Topic, Qos} <- TopicTable, not IsValid(Topic, Qos)],
      case Rejected of
          [] -> ok;
          _  -> {error, badtopic}
      end;

  validate_topics(Type, Topics = [First | _]) when is_binary(First) ->
      Rejected = [Topic || Topic <- Topics, not emqttd_topic:validate({Type, Topic})],
      case Rejected of
          [] -> ok;
          _  -> {error, badtopic}
      end.
  %% A QoS is acceptable when it is absent (undefined) or satisfies ?IS_QOS.
  validate_qos(Qos) ->
      case Qos of
          undefined            -> true;
          _ when ?IS_QOS(Qos)  -> true;
          _                    -> false
      end.
  %% Parse every {Topic, Qos} entry of a raw topic table into
  %% {ParsedTopic, Options}, where the QoS is put in front of the options
  %% returned by emqttd_topic:parse/1.
  parse_topic_table(TopicTable) ->
      [parse_topic_entry(Entry) || Entry <- TopicTable].

  %% Parse a single raw topic table entry (see parse_topic_table/1).
  parse_topic_entry({RawTopic, Qos}) ->
      {Topic, Opts} = emqttd_topic:parse(RawTopic),
      {Topic, [{qos, Qos} | Opts]}.
  %% Run emqttd_topic:parse/1 over every topic of the list, keeping order.
  parse_topics(Topics) ->
      lists:map(fun emqttd_topic:parse/1, Topics).
  %% Authenticate the client and normalise the answer of
  %% emqttd_access_control:auth/2 to {ok, IsSuperuser} | {error, Reason}.
  %% A bare ok means "authenticated, not a superuser".
  authenticate(Client, Password) ->
      case emqttd_access_control:auth(Client, Password) of
          ok                     -> {ok, false};
          {ok, _IsSuper} = Ok    -> Ok;
          {error, _Error} = Fail -> Fail
      end.
  %% ACL check for a topic.
  %% PUBLISH decisions are cached in the process dictionary under
  %% {acl, publish, Topic} when the cache_acl setting is true (its default
  %% here); SUBSCRIBE decisions are always looked up.
  check_acl(publish, Topic, Client) ->
      CacheEnabled = emqttd:env(cache_acl, true),
      CacheKey = {acl, publish, Topic},
      case {CacheEnabled, get(CacheKey)} of
          {false, _} ->
              emqttd_access_control:check_acl(Client, publish, Topic);
          {true, undefined} ->
              Decision = emqttd_access_control:check_acl(Client, publish, Topic),
              put(CacheKey, Decision),
              Decision;
          {true, Cached} ->
              Cached
      end;

  check_acl(subscribe, Topic, Client) ->
      emqttd_access_control:check_acl(Client, subscribe, Topic).
  %% Encode the "session present" flag as the integer used in CONNACK.
  sp(Flag) when is_boolean(Flag) ->
      case Flag of
          true  -> 1;
          false -> 0
      end.
  %%--------------------------------------------------------------------
  %% Mount Point
  %%--------------------------------------------------------------------

  %% Prefix topics with the mountpoint. Works on a single #mqtt_message{}
  %% or on a list of {Topic, Opts} pairs; with no mountpoint configured
  %% (undefined) the argument is returned untouched.
  mount(undefined, Unchanged) ->
      Unchanged;
  mount(Prefix, Msg = #mqtt_message{topic = Topic}) ->
      Mounted = <<Prefix/binary, Topic/binary>>,
      Msg#mqtt_message{topic = Mounted};
  mount(Prefix, TopicTable) when is_list(TopicTable) ->
      [{<<Prefix/binary, Topic/binary>>, Opts} || {Topic, Opts} <- TopicTable].
  %% Strip the mountpoint prefix from the topic of an outgoing message.
  %% The message is returned unchanged when no mountpoint is configured
  %% (undefined) or when its topic does not start with the mountpoint,
  %% including topics shorter than the mountpoint.
  %%
  %% The prefix test is a plain binary match, so no exception has to be
  %% raised and swallowed for non-matching topics.
  unmount(undefined, Any) ->
      Any;
  unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) when is_binary(MountPoint) ->
      PrefixLen = byte_size(MountPoint),
      case Topic of
          <<MountPoint:PrefixLen/binary, Topic0/binary>> ->
              Msg#mqtt_message{topic = Topic0};
          _ ->
              Msg
      end;
  unmount(_MountPoint, Msg = #mqtt_message{}) ->
      %% A non-binary mountpoint can never be a prefix of the topic.
      Msg.