emqttd_protocol.erl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. %%%-----------------------------------------------------------------------------
  2. %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
  3. %%%
  4. %%% Permission is hereby granted, free of charge, to any person obtaining a copy
  5. %%% of this software and associated documentation files (the "Software"), to deal
  6. %%% in the Software without restriction, including without limitation the rights
  7. %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. %%% copies of the Software, and to permit persons to whom the Software is
  9. %%% furnished to do so, subject to the following conditions:
  10. %%%
  11. %%% The above copyright notice and this permission notice shall be included in all
  12. %%% copies or substantial portions of the Software.
  13. %%%
  14. %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. %%% SOFTWARE.
  21. %%%-----------------------------------------------------------------------------
  22. %%% @doc
  23. %%% emqttd protocol.
  24. %%%
  25. %%% @end
  26. %%%-----------------------------------------------------------------------------
  27. -module(emqttd_protocol).
  28. -author("Feng Lee <feng@emqtt.io>").
  29. -include("emqttd.hrl").
  30. -include("emqttd_protocol.hrl").
  31. %% API
  32. -export([init/3, info/1, clientid/1, client/1]).
  33. -export([received/2, send/2, redeliver/2, shutdown/2]).
  34. -export([handle/2]).
  %% Protocol State: one record per client connection, threaded through every
  %% function of this module.
  -record(proto_state, {
            peername,                   %% peer address; rendered with emqttd_net:format/1 in logs
            sendfun,                    %% fun(Data) called by send/2 with the serialised packet
            connected = false,          %% set to true by received/2 once a CONNECT packet arrives
            proto_ver,                  %% protocol version taken from the CONNECT packet
            proto_name,                 %% protocol name taken from the CONNECT packet
            username,                   %% username taken from the CONNECT packet
            client_id,                  %% client id from CONNECT, or generated by maybe_set_clientid/1
            clean_sess,                 %% clean-session flag taken from the CONNECT packet
            session,                    %% session returned by emqttd_sm:start_session/2
            will_msg,                   %% will message built from CONNECT; undefined after DISCONNECT
            keepalive,                  %% keep-alive value taken from the CONNECT packet
            max_clientid_len = ?MAX_CLIENTID_LEN, %% upper bound used by validate_clientid/2
            client_pid,                 %% pid of the process that called init/3
            connected_at                %% os:timestamp() taken while handling CONNECT
           }).

  -type proto_state() :: #proto_state{}.
  %%------------------------------------------------------------------------------
  %% @doc Build the initial protocol state for a new connection.
  %% `Peername' and `SendFun' are stored as given; `Opts' is a proplist whose
  %% optional `max_clientid_len' entry overrides ?MAX_CLIENTID_LEN. The calling
  %% process is recorded as the client pid.
  %% @end
  %%------------------------------------------------------------------------------
  init(Peername, SendFun, Opts) ->
      #proto_state{peername         = Peername,
                   sendfun          = SendFun,
                   max_clientid_len = proplists:get_value(max_clientid_len, Opts,
                                                          ?MAX_CLIENTID_LEN),
                   client_pid       = self()}.
  %% Expose the connection details held in the protocol state as a proplist.
  info(State = #proto_state{}) ->
      [{client_id,    State#proto_state.client_id},
       {username,     State#proto_state.username},
       {peername,     State#proto_state.peername},
       {proto_ver,    State#proto_state.proto_ver},
       {proto_name,   State#proto_state.proto_name},
       {keepalive,    State#proto_state.keepalive},
       {clean_sess,   State#proto_state.clean_sess},
       {will_msg,     State#proto_state.will_msg},
       {connected_at, State#proto_state.connected_at}].
  %% Return the client id currently stored in the protocol state.
  clientid(State = #proto_state{}) ->
      State#proto_state.client_id.
  %% Project the protocol state onto an #mqtt_client{} record.
  %% Only the topic of the will message is carried over; it is `undefined'
  %% when no will message is set.
  client(State = #proto_state{will_msg = WillMsg}) ->
      WillTopic = case WillMsg of
                      undefined -> undefined;
                      _         -> WillMsg#mqtt_message.topic
                  end,
      #mqtt_client{client_id    = State#proto_state.client_id,
                   client_pid   = State#proto_state.client_pid,
                   username     = State#proto_state.username,
                   peername     = State#proto_state.peername,
                   clean_sess   = State#proto_state.clean_sess,
                   proto_ver    = State#proto_state.proto_ver,
                   keepalive    = State#proto_state.keepalive,
                   will_topic   = WillTopic,
                   connected_at = State#proto_state.connected_at}.
  %% Entry point for every packet parsed off the connection.
  %%
  %% A client may send CONNECT only once per network connection, and nothing
  %% else before it:
  %%   - first CONNECT             -> state marked connected, then handle/2
  %%   - a further CONNECT         -> {error, protocol_bad_connect, State}
  %%   - any packet before CONNECT -> {error, protocol_not_connected, State}
  %%   - any other packet          -> traced, validated, then handle/2
  %%
  %% Errors are three-element tuples carrying the state, and handle/2 returns
  %% {stop, normal, State} for DISCONNECT; the spec reflects both.
  -spec received(mqtt_packet(), proto_state()) ->
            {ok, proto_state()}
          | {error, any(), proto_state()}
          | {stop, normal, proto_state()}.
  received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
      handle(Packet, State#proto_state{connected = true});

  received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
      {error, protocol_bad_connect, State};

  %% Received other packets when CONNECT has not arrived yet.
  received(_Packet, State = #proto_state{connected = false}) ->
      {error, protocol_not_connected, State};

  received(Packet = ?PACKET(_Type), State) ->
      trace(recv, Packet, State),
      case validate_packet(Packet) of
          ok ->
              handle(Packet, State);
          {error, Reason} ->
              {error, Reason, State}
      end.
  %%------------------------------------------------------------------------------
  %% @doc Handle one packet, or an internal {subscribe, TopicTable} request,
  %% against the protocol state.
  %%
  %% Returns {ok, State} (directly or via send/2) for everything except
  %% DISCONNECT, which returns {stop, normal, State} with the will message
  %% cleared. While handling CONNECT the process exits with {shutdown, Error}
  %% if the session cannot be started.
  %% @end
  %%------------------------------------------------------------------------------
  handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->

      #mqtt_packet_connect{proto_ver  = ProtoVer,
                           proto_name = ProtoName,
                           username   = Username,
                           password   = Password,
                           clean_sess = CleanSess,
                           keep_alive = KeepAlive,
                           client_id  = ClientId} = Var,

      %% Copy the CONNECT fields into the state before validating, so that
      %% traces and the CONNACK path see them even when the connect is refused.
      State1 = State0#proto_state{proto_ver    = ProtoVer,
                                  proto_name   = ProtoName,
                                  username     = Username,
                                  client_id    = ClientId,
                                  clean_sess   = CleanSess,
                                  keepalive    = KeepAlive,
                                  will_msg     = willmsg(Var),
                                  connected_at = os:timestamp()},

      trace(recv, Packet, State1),

      {ReturnCode1, State3} =
          case validate_connect(Var, State1) of
              ?CONNACK_ACCEPT ->
                  case emqttd_access_control:auth(client(State1), Password) of
                      ok ->
                          %% Generate clientId if null
                          State2 = maybe_set_clientid(State1),
                          %% Start session
                          case emqttd_sm:start_session(CleanSess, clientid(State2)) of
                              {ok, Session} ->
                                  %% Register the client
                                  emqttd_cm:register(client(State2)),
                                  %% Start keepalive
                                  start_keepalive(KeepAlive),
                                  %% ACCEPT
                                  {?CONNACK_ACCEPT, State2#proto_state{session = Session}};
                              {error, Error} ->
                                  exit({shutdown, Error})
                          end;
                      {error, Reason} ->
                          %% Reason is an opaque term from the auth chain, so it is
                          %% printed with ~p; ~s only accepts atoms and chardata.
                          lager:error("~s@~s: username '~s', login failed - ~p",
                                      [ClientId, emqttd_net:format(Peername), Username, Reason]),
                          {?CONNACK_CREDENTIALS, State1}
                  end;
              ReturnCode ->
                  {ReturnCode, State1}
          end,
      %% Run hooks (invoked for every return code, not only on accept)
      emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
      %% Send connack
      send(?CONNACK_PACKET(ReturnCode1), State3);

  %% PUBLISH: checked against the publish ACL; a denied publish is logged and
  %% dropped. The result of publish/2 is not used here.
  handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
         State = #proto_state{client_id = ClientId}) ->
      case check_acl(publish, Topic, State) of
          allow ->
              publish(Packet, State);
          deny ->
              lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
      end,
      {ok, State};

  handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:puback(Session, PacketId),
      {ok, State};

  handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:pubrec(Session, PacketId),
      send(?PUBREL_PACKET(PacketId), State);

  handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:pubrel(Session, PacketId),
      send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);

  handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) ->
      emqttd_session:pubcomp(Session, PacketId),
      {ok, State};

  %% protect from empty topic list
  handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
      send(?SUBACK_PACKET(PacketId, []), State);

  %% SUBSCRIBE: a single denied topic rejects the whole request, and no SUBACK
  %% is sent in that case.
  handle(?SUBSCRIBE_PACKET(PacketId, TopicTable),
         State = #proto_state{client_id = ClientId, session = Session}) ->
      AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
      case lists:member(deny, AllowDenies) of
          true ->
              %%TODO: return 128 QoS when deny... no need to SUBACK?
              lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
              {ok, State};
          false ->
              %%TODO: GrantedQos should be renamed.
              {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
              send(?SUBACK_PACKET(PacketId, GrantedQos), State)
      end;

  %% Internal subscribe request: no ACL check and no SUBACK.
  handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
      {ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
      {ok, State};

  %% protect from empty topic list
  handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
      send(?UNSUBACK_PACKET(PacketId), State);

  handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
      ok = emqttd_session:unsubscribe(Session, Topics),
      send(?UNSUBACK_PACKET(PacketId), State);

  handle(?PACKET(?PINGREQ), State) ->
      send(?PACKET(?PINGRESP), State);

  handle(?PACKET(?DISCONNECT), State) ->
      %% Clear the will message so shutdown/2 does not publish it.
      {stop, normal, State#proto_state{will_msg = undefined}}.
  %% Hand a PUBLISH packet to the session, acknowledging according to QoS:
  %% QoS 0 sends nothing back, QoS 1 answers with PUBACK and QoS 2 with PUBREC.
  %% A session error is logged and no acknowledgement is sent.
  publish(Packet = ?PUBLISH(?QOS_0, _PacketId),
          #proto_state{client_id = ClientId, session = Session}) ->
      Msg = emqttd_message:from_packet(ClientId, Packet),
      emqttd_session:publish(Session, Msg);

  publish(Packet = ?PUBLISH(?QOS_1, PacketId),
          State = #proto_state{client_id = ClientId, session = Session}) ->
      Msg = emqttd_message:from_packet(ClientId, Packet),
      case emqttd_session:publish(Session, Msg) of
          {error, Error} ->
              lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error]);
          ok ->
              send(?PUBACK_PACKET(?PUBACK, PacketId), State)
      end;

  publish(Packet = ?PUBLISH(?QOS_2, PacketId),
          State = #proto_state{client_id = ClientId, session = Session}) ->
      Msg = emqttd_message:from_packet(ClientId, Packet),
      case emqttd_session:publish(Session, Msg) of
          {error, Error} ->
              lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error]);
          ok ->
              send(?PUBACK_PACKET(?PUBREC, PacketId), State)
      end.
  %% Send a message or packet to the client.
  %% A #mqtt_message{} is first converted to a packet. A packet is traced,
  %% counted, serialised, and handed to the state's send fun; the state is
  %% returned unchanged.
  -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
  send(Msg = #mqtt_message{}, State) ->
      send(emqttd_message:to_packet(Msg), State);

  send(Packet = #mqtt_packet{}, State = #proto_state{sendfun = SendFun, peername = Peername}) ->
      trace(send, Packet, State),
      sent_stats(Packet),
      Bytes = emqttd_serialiser:serialise(Packet),
      lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Bytes]),
      emqttd_metrics:inc('bytes/sent', size(Bytes)),
      SendFun(Bytes),
      {ok, State}.
  %% Log a received or sent packet at info level, tagged with the client id so
  %% the entry carries {client, ClientId} metadata.
  trace(recv, Packet, State = #proto_state{}) ->
      #proto_state{peername = Peer, client_id = Id} = State,
      lager:info([{client, Id}], "RECV from ~s@~s: ~s",
                 [Id, emqttd_net:format(Peer), emqttd_packet:format(Packet)]);

  trace(send, Packet, State = #proto_state{}) ->
      #proto_state{peername = Peer, client_id = Id} = State,
      lager:info([{client, Id}], "SEND to ~s@~s: ~s",
                 [Id, emqttd_net:format(Peer), emqttd_packet:format(Packet)]).
  %% @doc Send a PUBREL for the given packet id again.
  redeliver({?PUBREL, PacketId}, State) ->
      Pubrel = ?PUBREL_PACKET(PacketId),
      send(Pubrel, State).
  %% Clean up when the connection goes down.
  %%   - duplicate_id: nothing is done at all.
  %%   - no client id in the state: only logged.
  %%   - otherwise: the will message (if any) is published, the client is
  %%     unregistered and the 'client.disconnected' hooks are run.
  shutdown(duplicate_id, _State) ->
      quiet;

  shutdown(Error, #proto_state{client_id = undefined}) ->
      lager:info("Protocol shutdown ~p", [Error]),
      ignore;

  shutdown(Error, State = #proto_state{client_id = ClientId}) ->
      #proto_state{peername = Peername, will_msg = WillMsg} = State,
      lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
                 [ClientId, emqttd_net:format(Peername), Error]),
      send_willmsg(ClientId, WillMsg),
      emqttd_cm:unregister(ClientId),
      emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]).
  %% Build the will message from the variable header of a CONNECT packet.
  willmsg(Connect = #mqtt_packet_connect{}) ->
      emqttd_message:from_packet(Connect).
  %% Generate a client id when the client supplied none (undefined or empty).
  %% The id is "emqttd_" followed by the middle element of emqttd_guid:new/0.
  %% Any other state is returned untouched.
  maybe_set_clientid(State = #proto_state{client_id = Given})
          when Given =:= undefined; Given =:= <<>> ->
      {_, NPid, _} = emqttd_guid:new(),
      Generated = list_to_binary("emqttd_" ++ integer_to_list(NPid)),
      State#proto_state{client_id = Generated};
  maybe_set_clientid(State) ->
      State.
  %% Publish the client's will message, stamped with the client id as sender.
  %% Does nothing when no will message is set.
  send_willmsg(_ClientId, undefined) ->
      ignore;
  send_willmsg(ClientId, WillMsg) ->
      lager:info("Client ~s send willmsg: ~p", [ClientId, WillMsg]),
      Stamped = WillMsg#mqtt_message{from = ClientId},
      emqttd_pubsub:publish(Stamped).
  %% Ask the owning process to start its keepalive timer.
  %% A keep-alive of 0 disables the timer; otherwise the calling process sends
  %% itself {keepalive, start, Timeout} with 1.5 times the requested interval.
  start_keepalive(0) ->
      ignore;
  start_keepalive(Sec) when Sec > 0 ->
      Timeout = round(Sec * 1.5),
      self() ! {keepalive, start, Timeout}.
  285. %%----------------------------------------------------------------------------
  286. %% Validate Packets
  287. %%----------------------------------------------------------------------------
  %% Validate a CONNECT packet and map the outcome to a CONNACK return code.
  %% The protocol name/version is checked first; the client id is only checked
  %% when the protocol is acceptable.
  validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
      case validate_protocol(Connect) of
          false ->
              ?CONNACK_PROTO_VER;
          true ->
              case validate_clientid(Connect, ProtoState) of
                  false -> ?CONNACK_INVALID_ID;
                  true  -> ?CONNACK_ACCEPT
              end
      end.
  %% True when the {Version, Name} pair of the CONNECT packet is listed in
  %% ?PROTOCOL_NAMES.
  validate_protocol(Connect = #mqtt_packet_connect{}) ->
      Offered = {Connect#mqtt_packet_connect.proto_ver,
                 Connect#mqtt_packet_connect.proto_name},
      lists:member(Offered, ?PROTOCOL_NAMES).
  %% Check the client id of a CONNECT packet.
  %%   - 1..MaxLen bytes: valid for every protocol version.
  %%   - empty: valid only for MQTT 3.1.1, which allows a null client id.
  %%   - anything else (too long, empty on another version, not a binary):
  %%     logged and rejected.
  validate_clientid(#mqtt_packet_connect{client_id = ClientId},
                    #proto_state{max_clientid_len = MaxLen})
          when size(ClientId) >= 1, size(ClientId) =< MaxLen ->
      true;

  %% MQTT3.1.1 allow null clientId.
  validate_clientid(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V311, client_id = ClientId},
                    _ProtoState)
          when size(ClientId) =:= 0 ->
      true;

  validate_clientid(Connect = #mqtt_packet_connect{}, _ProtoState) ->
      #mqtt_packet_connect{proto_ver  = Ver,
                           clean_sess = CleanSess,
                           client_id  = ClientId} = Connect,
      lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
      false.
  %% Validate the topics carried by PUBLISH, SUBSCRIBE and UNSUBSCRIBE packets.
  %% Returns `ok', or {error, badtopic | empty_topics}. Every other packet
  %% type is accepted as is.
  validate_packet(#mqtt_packet{header   = #mqtt_packet_header{type = ?PUBLISH},
                               variable = #mqtt_packet_publish{topic_name = Topic}}) ->
      case emqttd_topic:validate({name, Topic}) of
          true  -> ok;
          false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
      end;

  validate_packet(#mqtt_packet{header   = #mqtt_packet_header{type = ?SUBSCRIBE},
                               variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
      validate_topics(filter, Topics);

  %% Matched with the same ?UNSUBSCRIBE_PACKET macro that handle/2 uses, so the
  %% packet shape validated here is the shape that is later handled.
  %% NOTE(review): this clause previously matched a #mqtt_packet_subscribe{}
  %% variable; presumably UNSUBSCRIBE packets carry a different record and fell
  %% through to the catch-all unvalidated -- confirm in emqttd_protocol.hrl.
  %% An UNSUBSCRIBE with no topics is now rejected with empty_topics, as
  %% SUBSCRIBE already is.
  validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) ->
      validate_topics(filter, Topics);

  validate_packet(_Packet) ->
      ok.

  %% Validate a non-empty list of topics. Entries may be {Topic, Qos} pairs
  %% (SUBSCRIBE) or bare topics; each one is checked rather than skipped when
  %% it is not a pair.
  validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
      lager:error("Empty Topics!"),
      {error, empty_topics};

  validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
      ErrTopics = [topic_of_entry(Entry) || Entry <- Topics,
                                            not is_valid_topic_entry(Type, Entry)],
      case ErrTopics of
          [] -> ok;
          _  -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
      end.

  %% True when the topic (and the QoS, if the entry has one) is acceptable.
  is_valid_topic_entry(Type, {Topic, Qos}) ->
      emqttd_topic:validate({Type, Topic}) andalso validate_qos(Qos);
  is_valid_topic_entry(Type, Topic) ->
      emqttd_topic:validate({Type, Topic}).

  %% Topic part of a topic-list entry, used for error reporting.
  topic_of_entry({Topic, _Qos}) -> Topic;
  topic_of_entry(Topic)         -> Topic.
  %% True for `undefined' (no QoS given) and for integer QoS levels in the
  %% range ?QOS_0..?QOS_2. The lower bound and the integer test matter: in
  %% Erlang term order every negative number and every float below 2 also
  %% satisfies `Qos =< ?QOS_2'.
  validate_qos(undefined) -> true;
  validate_qos(Qos) when is_integer(Qos), Qos >= ?QOS_0, Qos =< ?QOS_2 -> true;
  validate_qos(_) -> false.
  %% Ask the access-control layer whether the client may publish/subscribe to
  %% Topic. Publish decisions are cached in the process dictionary under
  %% {acl, publish, Topic}; subscribe decisions are looked up every time.
  check_acl(publish, Topic, State) ->
      CacheKey = {acl, publish, Topic},
      case get(CacheKey) of
          undefined ->
              Decision = emqttd_access_control:check_acl(client(State), publish, Topic),
              put(CacheKey, Decision),
              Decision;
          Cached ->
              Cached
      end;

  check_acl(subscribe, Topic, State) ->
      Client = client(State),
      emqttd_access_control:check_acl(Client, subscribe, Topic).
  %% Count an outgoing packet: once in the global 'packets/sent' counter and
  %% once in the per-type counter chosen by inc/1.
  sent_stats(?PACKET(PacketType)) ->
      emqttd_metrics:inc('packets/sent'),
      inc(PacketType).
  %% Bump the per-type metric for an outgoing packet. PUBLISH counts both as a
  %% sent message and as a sent publish packet. Packet types without a
  %% dedicated counter are ignored.
  inc(?CONNACK) ->
      emqttd_metrics:inc('packets/connack');
  inc(?PUBLISH) ->
      emqttd_metrics:inc('messages/sent'),
      emqttd_metrics:inc('packets/publish/sent');
  inc(?SUBACK) ->
      emqttd_metrics:inc('packets/suback');
  inc(?UNSUBACK) ->
      emqttd_metrics:inc('packets/unsuback');
  inc(?PINGRESP) ->
      emqttd_metrics:inc('packets/pingresp');
  inc(_) ->
      %% was the misspelled atom `ingore'; the rest of the module uses `ignore'
      ignore.