emqttd_protocol.erl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. %%%-----------------------------------------------------------------------------
  2. %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
  3. %%%
  4. %%% Permission is hereby granted, free of charge, to any person obtaining a copy
  5. %%% of this software and associated documentation files (the "Software"), to deal
  6. %%% in the Software without restriction, including without limitation the rights
  7. %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. %%% copies of the Software, and to permit persons to whom the Software is
  9. %%% furnished to do so, subject to the following conditions:
  10. %%%
  11. %%% The above copyright notice and this permission notice shall be included in all
  12. %%% copies or substantial portions of the Software.
  13. %%%
  14. %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. %%% SOFTWARE.
  21. %%%-----------------------------------------------------------------------------
  22. %%% @doc
  23. %%% emqttd protocol.
  24. %%%
  25. %%% @end
  26. %%%-----------------------------------------------------------------------------
  27. -module(emqttd_protocol).
  28. -author("Feng Lee <feng@emqtt.io>").
  29. -include("emqttd.hrl").
  30. -include("emqttd_protocol.hrl").
  31. %% API
  32. -export([init/3, info/1, clientid/1, client/1, session/1]).
  33. -export([received/2, send/2, redeliver/2, shutdown/2]).
  34. -export([process/2]).
  35. %% Protocol State
  36. -record(proto_state, {peername,
  37. sendfun,
  38. connected = false, %received CONNECT action?
  39. proto_ver,
  40. proto_name,
  41. username,
  42. client_id,
  43. clean_sess,
  44. session,
  45. will_msg,
  46. keepalive,
  47. max_clientid_len = ?MAX_CLIENTID_LEN,
  48. client_pid,
  49. ws_initial_headers, %% Headers from first HTTP request for websocket client
  50. connected_at}).
  51. -type proto_state() :: #proto_state{}.
  52. %%------------------------------------------------------------------------------
  53. %% @doc Init protocol
  54. %% @end
  55. %%------------------------------------------------------------------------------
  56. init(Peername, SendFun, Opts) ->
  57. MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
  58. WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts),
  59. #proto_state{peername = Peername,
  60. sendfun = SendFun,
  61. max_clientid_len = MaxLen,
  62. client_pid = self(),
  63. ws_initial_headers = WsInitialHeaders}.
  64. info(#proto_state{client_id = ClientId,
  65. username = Username,
  66. peername = Peername,
  67. proto_ver = ProtoVer,
  68. proto_name = ProtoName,
  69. keepalive = KeepAlive,
  70. clean_sess = CleanSess,
  71. ws_initial_headers = WsInitialHeaders,
  72. will_msg = WillMsg,
  73. connected_at = ConnectedAt}) ->
  74. [{client_id, ClientId},
  75. {username, Username},
  76. {peername, Peername},
  77. {proto_ver, ProtoVer},
  78. {proto_name, ProtoName},
  79. {keepalive, KeepAlive},
  80. {clean_sess, CleanSess},
  81. {ws_initial_headers, WsInitialHeaders},
  82. {will_msg, WillMsg},
  83. {connected_at, ConnectedAt}].
  84. clientid(#proto_state{client_id = ClientId}) ->
  85. ClientId.
  86. client(#proto_state{client_id = ClientId,
  87. peername = Peername,
  88. username = Username,
  89. clean_sess = CleanSess,
  90. proto_ver = ProtoVer,
  91. keepalive = Keepalive,
  92. will_msg = WillMsg,
  93. client_pid = Pid,
  94. ws_initial_headers = WsInitialHeaders,
  95. connected_at = Time}) ->
  96. WillTopic = if
  97. WillMsg =:= undefined -> undefined;
  98. true -> WillMsg#mqtt_message.topic
  99. end,
  100. #mqtt_client{client_id = ClientId,
  101. client_pid = Pid,
  102. username = Username,
  103. peername = Peername,
  104. clean_sess = CleanSess,
  105. proto_ver = ProtoVer,
  106. keepalive = Keepalive,
  107. will_topic = WillTopic,
  108. ws_initial_headers = WsInitialHeaders,
  109. connected_at = Time}.
  110. session(#proto_state{session = Session}) ->
  111. Session.
  112. %% CONNECT – Client requests a connection to a Server
  113. %%A Client can only send the CONNECT Packet once over a Network Connection.
  114. -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
  115. received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
  116. process(Packet, State#proto_state{connected = true});
  117. received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
  118. {error, protocol_bad_connect, State};
  119. %%Received other packets when CONNECT not arrived.
  120. received(_Packet, State = #proto_state{connected = false}) ->
  121. {error, protocol_not_connected, State};
  122. received(Packet = ?PACKET(_Type), State) ->
  123. trace(recv, Packet, State),
  124. case validate_packet(Packet) of
  125. ok ->
  126. process(Packet, State);
  127. {error, Reason} ->
  128. {error, Reason, State}
  129. end.
  130. process(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
  131. #mqtt_packet_connect{proto_ver = ProtoVer,
  132. proto_name = ProtoName,
  133. username = Username,
  134. password = Password,
  135. clean_sess = CleanSess,
  136. keep_alive = KeepAlive,
  137. client_id = ClientId} = Var,
  138. State1 = State0#proto_state{proto_ver = ProtoVer,
  139. proto_name = ProtoName,
  140. username = Username,
  141. client_id = ClientId,
  142. clean_sess = CleanSess,
  143. keepalive = KeepAlive,
  144. will_msg = willmsg(Var),
  145. connected_at = os:timestamp()},
  146. trace(recv, Packet, State1),
  147. {ReturnCode1, State3} =
  148. case validate_connect(Var, State1) of
  149. ?CONNACK_ACCEPT ->
  150. case emqttd_access_control:auth(client(State1), Password) of
  151. ok ->
  152. %% Generate clientId if null
  153. State2 = maybe_set_clientid(State1),
  154. %% Start session
  155. case emqttd_sm:start_session(CleanSess, clientid(State2)) of
  156. {ok, Session} ->
  157. %% Register the client
  158. emqttd_cm:register(client(State2)),
  159. %% Start keepalive
  160. start_keepalive(KeepAlive),
  161. %% ACCEPT
  162. {?CONNACK_ACCEPT, State2#proto_state{session = Session}};
  163. {error, Error} ->
  164. exit({shutdown, Error})
  165. end;
  166. {error, Reason}->
  167. lager:error("~s@~s: username '~s' login failed for ~s",
  168. [ClientId, emqttd_net:format(Peername), Username, Reason]),
  169. {?CONNACK_CREDENTIALS, State1}
  170. end;
  171. ReturnCode ->
  172. {ReturnCode, State1}
  173. end,
  174. %% Run hooks
  175. emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
  176. %% Send connack
  177. send(?CONNACK_PACKET(ReturnCode1), State3);
  178. process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
  179. State = #proto_state{client_id = ClientId}) ->
  180. case check_acl(publish, Topic, State) of
  181. allow ->
  182. publish(Packet, State);
  183. deny ->
  184. lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
  185. end,
  186. {ok, State};
  187. process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
  188. emqttd_session:puback(Session, PacketId), {ok, State};
  189. process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
  190. emqttd_session:pubrec(Session, PacketId),
  191. send(?PUBREL_PACKET(PacketId), State);
  192. process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
  193. emqttd_session:pubrel(Session, PacketId),
  194. send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);
  195. process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})->
  196. emqttd_session:pubcomp(Session, PacketId), {ok, State};
  197. %% protect from empty topic list
  198. process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
  199. send(?SUBACK_PACKET(PacketId, []), State);
  200. process(?SUBSCRIBE_PACKET(PacketId, TopicTable),
  201. State = #proto_state{client_id = ClientId, session = Session}) ->
  202. AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
  203. case lists:member(deny, AllowDenies) of
  204. true ->
  205. lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
  206. send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
  207. false ->
  208. AckFun = fun(GrantedQos) ->
  209. send(?SUBACK_PACKET(PacketId, GrantedQos), State)
  210. end,
  211. emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State}
  212. end;
  213. %% protect from empty topic list
  214. process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
  215. send(?UNSUBACK_PACKET(PacketId), State);
  216. process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
  217. emqttd_session:unsubscribe(Session, Topics),
  218. send(?UNSUBACK_PACKET(PacketId), State);
  219. process(?PACKET(?PINGREQ), State) ->
  220. send(?PACKET(?PINGRESP), State);
  221. process(?PACKET(?DISCONNECT), State) ->
  222. % clean willmsg
  223. {stop, normal, State#proto_state{will_msg = undefined}}.
  224. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
  225. #proto_state{client_id = ClientId, session = Session}) ->
  226. Msg = emqttd_message:from_packet(ClientId, Packet),
  227. emqttd_session:publish(Session, Msg);
  228. publish(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId),
  229. State = #proto_state{client_id = ClientId, session = Session}) ->
  230. Msg = emqttd_message:from_packet(ClientId, Packet),
  231. case emqttd_session:publish(Session, Msg) of
  232. ok ->
  233. send(?PUBACK_PACKET(?PUBACK, PacketId), State);
  234. {error, Error} ->
  235. lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error])
  236. end;
  237. publish(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
  238. State = #proto_state{client_id = ClientId, session = Session}) ->
  239. Msg = emqttd_message:from_packet(ClientId, Packet),
  240. case emqttd_session:publish(Session, Msg) of
  241. ok ->
  242. send(?PUBACK_PACKET(?PUBREC, PacketId), State);
  243. {error, Error} ->
  244. lager:error("Client(~s): publish qos2 error - ~p", [ClientId, Error])
  245. end.
  246. -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
  247. send(Msg, State) when is_record(Msg, mqtt_message) ->
  248. send(emqttd_message:to_packet(Msg), State);
  249. send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername})
  250. when is_record(Packet, mqtt_packet) ->
  251. trace(send, Packet, State),
  252. emqttd_metrics:sent(Packet),
  253. Data = emqttd_serialiser:serialise(Packet),
  254. lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
  255. emqttd_metrics:inc('bytes/sent', size(Data)),
  256. SendFun(Data),
  257. {ok, State}.
  258. trace(recv, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
  259. lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
  260. [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]);
  261. trace(send, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
  262. lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
  263. [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]).
  264. %% @doc redeliver PUBREL PacketId
  265. redeliver({?PUBREL, PacketId}, State) ->
  266. send(?PUBREL_PACKET(PacketId), State).
  267. shutdown(Error, #proto_state{client_id = undefined}) ->
  268. lager:info("Protocol shutdown ~p", [Error]),
  269. ignore;
  270. shutdown(duplicate_id, #proto_state{client_id = ClientId}) ->
  271. %% unregister the device
  272. emqttd_cm:unregister(ClientId);
  273. %% TODO: ClientId??
  274. shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
  275. lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
  276. [ClientId, emqttd_net:format(Peername), Error]),
  277. send_willmsg(ClientId, WillMsg),
  278. emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]),
  279. emqttd_cm:unregister(ClientId).
  280. willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
  281. emqttd_message:from_packet(Packet).
  282. %% Generate a client if if nulll
  283. maybe_set_clientid(State = #proto_state{client_id = NullId})
  284. when NullId =:= undefined orelse NullId =:= <<>> ->
  285. {_, NPid, _} = emqttd_guid:new(),
  286. ClientId = iolist_to_binary(["emqttd_", integer_to_list(NPid)]),
  287. State#proto_state{client_id = ClientId};
  288. maybe_set_clientid(State) ->
  289. State.
  290. send_willmsg(_ClientId, undefined) ->
  291. ignore;
  292. send_willmsg(ClientId, WillMsg) ->
  293. lager:info("Client ~s send willmsg: ~p", [ClientId, WillMsg]),
  294. emqttd_pubsub:publish(WillMsg#mqtt_message{from = ClientId}).
  295. start_keepalive(0) -> ignore;
  296. start_keepalive(Sec) when Sec > 0 ->
  297. self() ! {keepalive, start, round(Sec * 1.2)}.
  298. %%----------------------------------------------------------------------------
  299. %% Validate Packets
  300. %%----------------------------------------------------------------------------
  301. validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
  302. case validate_protocol(Connect) of
  303. true ->
  304. case validate_clientid(Connect, ProtoState) of
  305. true ->
  306. ?CONNACK_ACCEPT;
  307. false ->
  308. ?CONNACK_INVALID_ID
  309. end;
  310. false ->
  311. ?CONNACK_PROTO_VER
  312. end.
  313. validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
  314. lists:member({Ver, Name}, ?PROTOCOL_NAMES).
  315. validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen})
  316. when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) ->
  317. true;
  318. %% MQTT3.1.1 allow null clientId.
  319. validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311,
  320. client_id = ClientId}, _ProtoState)
  321. when size(ClientId) =:= 0 ->
  322. true;
  323. validate_clientid(#mqtt_packet_connect{proto_ver = Ver,
  324. clean_sess = CleanSess,
  325. client_id = ClientId}, _ProtoState) ->
  326. lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
  327. false.
  328. validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
  329. variable = #mqtt_packet_publish{topic_name = Topic}}) ->
  330. case emqttd_topic:validate({name, Topic}) of
  331. true -> ok;
  332. false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
  333. end;
  334. validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE},
  335. variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
  336. validate_topics(filter, Topics);
  337. validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE},
  338. variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
  339. validate_topics(filter, Topics);
  340. validate_packet(_Packet) ->
  341. ok.
  342. validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
  343. lager:error("Empty Topics!"),
  344. {error, empty_topics};
  345. validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
  346. ErrTopics = [Topic || {Topic, Qos} <- Topics,
  347. not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))],
  348. case ErrTopics of
  349. [] -> ok;
  350. _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
  351. end.
  352. validate_qos(undefined) ->
  353. true;
  354. validate_qos(Qos) when ?IS_QOS(Qos) ->
  355. true;
  356. validate_qos(_) ->
  357. false.
  358. %% publish ACL is cached in process dictionary.
  359. check_acl(publish, Topic, State) ->
  360. case get({acl, publish, Topic}) of
  361. undefined ->
  362. AllowDeny = emqttd_access_control:check_acl(client(State), publish, Topic),
  363. put({acl, publish, Topic}, AllowDeny),
  364. AllowDeny;
  365. AllowDeny ->
  366. AllowDeny
  367. end;
  368. check_acl(subscribe, Topic, State) ->
  369. emqttd_access_control:check_acl(client(State), subscribe, Topic).