emqttd_protocol.erl 15 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc MQTT Protocol Processor.
  17. -module(emqttd_protocol).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. -include("emqttd_internal.hrl").
  21. %% API
  22. -export([init/3, info/1, clientid/1, client/1, session/1]).
  23. -export([received/2, send/2, redeliver/2, shutdown/2]).
  24. -export([process/2]).
  25. %% Protocol State
  26. -record(proto_state, {peername, sendfun, connected = false,
  27. client_id, client_pid, clean_sess,
  28. proto_ver, proto_name, username,
  29. will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
  30. session, ws_initial_headers, %% Headers from first HTTP request for websocket client
  31. connected_at}).
  32. -type proto_state() :: #proto_state{}.
  33. -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name,
  34. keepalive, will_msg, ws_initial_headers, connected_at]).
  35. -define(LOG(Level, Format, Args, State),
  36. lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format,
  37. [State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])).
  38. %% @doc Init protocol
  39. init(Peername, SendFun, Opts) ->
  40. MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
  41. WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts),
  42. #proto_state{peername = Peername,
  43. sendfun = SendFun,
  44. max_clientid_len = MaxLen,
  45. client_pid = self(),
  46. ws_initial_headers = WsInitialHeaders}.
  47. info(ProtoState) ->
  48. ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
  49. clientid(#proto_state{client_id = ClientId}) ->
  50. ClientId.
  51. client(#proto_state{client_id = ClientId,
  52. client_pid = ClientPid,
  53. peername = Peername,
  54. username = Username,
  55. clean_sess = CleanSess,
  56. proto_ver = ProtoVer,
  57. keepalive = Keepalive,
  58. will_msg = WillMsg,
  59. ws_initial_headers = WsInitialHeaders,
  60. connected_at = Time}) ->
  61. WillTopic = if
  62. WillMsg =:= undefined -> undefined;
  63. true -> WillMsg#mqtt_message.topic
  64. end,
  65. #mqtt_client{client_id = ClientId,
  66. client_pid = ClientPid,
  67. username = Username,
  68. peername = Peername,
  69. clean_sess = CleanSess,
  70. proto_ver = ProtoVer,
  71. keepalive = Keepalive,
  72. will_topic = WillTopic,
  73. ws_initial_headers = WsInitialHeaders,
  74. connected_at = Time}.
  75. session(#proto_state{session = Session}) ->
  76. Session.
  77. %% CONNECT – Client requests a connection to a Server
  78. %% A Client can only send the CONNECT Packet once over a Network Connection.
  79. -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
  80. received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
  81. process(Packet, State#proto_state{connected = true});
  82. received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
  83. {error, protocol_bad_connect, State};
  84. %% Received other packets when CONNECT not arrived.
  85. received(_Packet, State = #proto_state{connected = false}) ->
  86. {error, protocol_not_connected, State};
  87. received(Packet = ?PACKET(_Type), State) ->
  88. trace(recv, Packet, State),
  89. case validate_packet(Packet) of
  90. ok ->
  91. process(Packet, State);
  92. {error, Reason} ->
  93. {error, Reason, State}
  94. end.
  95. process(Packet = ?CONNECT_PACKET(Var), State0) ->
  96. #mqtt_packet_connect{proto_ver = ProtoVer,
  97. proto_name = ProtoName,
  98. username = Username,
  99. password = Password,
  100. clean_sess = CleanSess,
  101. keep_alive = KeepAlive,
  102. client_id = ClientId} = Var,
  103. State1 = State0#proto_state{proto_ver = ProtoVer,
  104. proto_name = ProtoName,
  105. username = Username,
  106. client_id = ClientId,
  107. clean_sess = CleanSess,
  108. keepalive = KeepAlive,
  109. will_msg = willmsg(Var),
  110. connected_at = os:timestamp()},
  111. trace(recv, Packet, State1),
  112. {ReturnCode1, SessPresent, State3} =
  113. case validate_connect(Var, State1) of
  114. ?CONNACK_ACCEPT ->
  115. case emqttd_access_control:auth(client(State1), Password) of
  116. ok ->
  117. %% Generate clientId if null
  118. State2 = maybe_set_clientid(State1),
  119. %% Start session
  120. case emqttd_sm:start_session(CleanSess, clientid(State2)) of
  121. {ok, Session, SP} ->
  122. %% Register the client
  123. emqttd_cm:register(client(State2)),
  124. %% Start keepalive
  125. start_keepalive(KeepAlive),
  126. %% ACCEPT
  127. {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}};
  128. {error, Error} ->
  129. exit({shutdown, Error})
  130. end;
  131. {error, Reason}->
  132. ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
  133. {?CONNACK_CREDENTIALS, false, State1}
  134. end;
  135. ReturnCode ->
  136. {ReturnCode, false, State1}
  137. end,
  138. %% Run hooks
  139. emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
  140. %% Send connack
  141. send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
  142. process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
  143. case check_acl(publish, Topic, client(State)) of
  144. allow ->
  145. publish(Packet, State);
  146. deny ->
  147. ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
  148. end,
  149. {ok, State};
  150. process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
  151. emqttd_session:puback(Session, PacketId),
  152. {ok, State};
  153. process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
  154. emqttd_session:pubrec(Session, PacketId),
  155. send(?PUBREL_PACKET(PacketId), State);
  156. process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
  157. emqttd_session:pubrel(Session, PacketId),
  158. send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);
  159. process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})->
  160. emqttd_session:pubcomp(Session, PacketId), {ok, State};
  161. %% Protect from empty topic table
  162. process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
  163. send(?SUBACK_PACKET(PacketId, []), State);
  164. process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
  165. Client = client(State),
  166. AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
  167. case lists:member(deny, AllowDenies) of
  168. true ->
  169. ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
  170. send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
  171. false ->
  172. emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
  173. end;
  174. %% Protect from empty topic list
  175. process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
  176. send(?UNSUBACK_PACKET(PacketId), State);
  177. process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
  178. emqttd_session:unsubscribe(Session, Topics),
  179. send(?UNSUBACK_PACKET(PacketId), State);
  180. process(?PACKET(?PINGREQ), State) ->
  181. send(?PACKET(?PINGRESP), State);
  182. process(?PACKET(?DISCONNECT), State) ->
  183. % Clean willmsg
  184. {stop, normal, State#proto_state{will_msg = undefined}}.
  185. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
  186. #proto_state{client_id = ClientId, session = Session}) ->
  187. emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
  188. publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
  189. with_puback(?PUBACK, Packet, State);
  190. publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
  191. with_puback(?PUBREC, Packet, State).
  192. with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
  193. State = #proto_state{client_id = ClientId, session = Session}) ->
  194. Msg = emqttd_message:from_packet(ClientId, Packet),
  195. case emqttd_session:publish(Session, Msg) of
  196. ok ->
  197. send(?PUBACK_PACKET(Type, PacketId), State);
  198. {error, Error} ->
  199. ?LOG(error, "PUBLISH ~p error: ~p", [PacketId, Error], State)
  200. end.
  201. -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
  202. send(Msg, State) when is_record(Msg, mqtt_message) ->
  203. send(emqttd_message:to_packet(Msg), State);
  204. send(Packet, State = #proto_state{sendfun = SendFun})
  205. when is_record(Packet, mqtt_packet) ->
  206. trace(send, Packet, State),
  207. emqttd_metrics:sent(Packet),
  208. Data = emqttd_serializer:serialize(Packet),
  209. ?LOG(debug, "SEND ~p", [Data], State),
  210. emqttd_metrics:inc('bytes/sent', size(Data)),
  211. SendFun(Data),
  212. {ok, State}.
  213. trace(recv, Packet, ProtoState) ->
  214. ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
  215. trace(send, Packet, ProtoState) ->
  216. ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
  217. %% @doc redeliver PUBREL PacketId
  218. redeliver({?PUBREL, PacketId}, State) ->
  219. send(?PUBREL_PACKET(PacketId), State).
  220. shutdown(_Error, #proto_state{client_id = undefined}) ->
  221. ignore;
  222. shutdown(conflict, #proto_state{client_id = _ClientId}) ->
  223. %% let it down
  224. %% emqttd_cm:unregister(ClientId);
  225. ignore;
  226. shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
  227. ?LOG(info, "Shutdown for ~p", [Error], State),
  228. send_willmsg(ClientId, WillMsg),
  229. emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]),
  230. %% let it down
  231. %% emqttd_cm:unregister(ClientId).
  232. ok.
  233. willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
  234. emqttd_message:from_packet(Packet).
  235. %% Generate a client if if nulll
  236. maybe_set_clientid(State = #proto_state{client_id = NullId})
  237. when NullId =:= undefined orelse NullId =:= <<>> ->
  238. {_, NPid, _} = emqttd_guid:new(),
  239. ClientId = iolist_to_binary(["emqttd_", integer_to_list(NPid)]),
  240. State#proto_state{client_id = ClientId};
  241. maybe_set_clientid(State) ->
  242. State.
  243. send_willmsg(_ClientId, undefined) ->
  244. ignore;
  245. send_willmsg(ClientId, WillMsg) ->
  246. emqttd_pubsub:publish(WillMsg#mqtt_message{from = ClientId}).
  247. start_keepalive(0) -> ignore;
  248. start_keepalive(Sec) when Sec > 0 ->
  249. self() ! {keepalive, start, round(Sec * 1.2)}.
  250. %%--------------------------------------------------------------------
  251. %% Validate Packets
  252. %%--------------------------------------------------------------------
  253. validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
  254. case validate_protocol(Connect) of
  255. true ->
  256. case validate_clientid(Connect, ProtoState) of
  257. true ->
  258. ?CONNACK_ACCEPT;
  259. false ->
  260. ?CONNACK_INVALID_ID
  261. end;
  262. false ->
  263. ?CONNACK_PROTO_VER
  264. end.
  265. validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
  266. lists:member({Ver, Name}, ?PROTOCOL_NAMES).
  267. validate_clientid(#mqtt_packet_connect{client_id = ClientId},
  268. #proto_state{max_clientid_len = MaxLen})
  269. when (size(ClientId) >= 1) andalso (size(ClientId) =< MaxLen) ->
  270. true;
  271. %% MQTT3.1.1 allow null clientId.
  272. validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311,
  273. client_id = ClientId}, _ProtoState)
  274. when size(ClientId) =:= 0 ->
  275. true;
  276. validate_clientid(#mqtt_packet_connect{proto_ver = ProtoVer,
  277. clean_sess = CleanSess}, ProtoState) ->
  278. ?LOG(warning, "Invalid clientId. ProtoVer: ~p, CleanSess: ~s",
  279. [ProtoVer, CleanSess], ProtoState),
  280. false.
  281. validate_packet(?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload)) ->
  282. case emqttd_topic:validate({name, Topic}) of
  283. true -> ok;
  284. false -> {error, badtopic}
  285. end;
  286. validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) ->
  287. validate_topics(filter, TopicTable);
  288. validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) ->
  289. validate_topics(filter, Topics);
  290. validate_packet(_Packet) ->
  291. ok.
  292. validate_topics(_Type, []) ->
  293. {error, empty_topics};
  294. validate_topics(Type, TopicTable = [{_Topic, _Qos}|_])
  295. when Type =:= name orelse Type =:= filter ->
  296. Valid = fun(Topic, Qos) ->
  297. emqttd_topic:validate({Type, Topic}) and validate_qos(Qos)
  298. end,
  299. case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of
  300. [] -> ok;
  301. _ -> {error, badtopic}
  302. end;
  303. validate_topics(Type, Topics = [Topic0|_]) when is_binary(Topic0) ->
  304. case [Topic || Topic <- Topics, not emqttd_topic:validate({Type, Topic})] of
  305. [] -> ok;
  306. _ -> {error, badtopic}
  307. end.
  308. validate_qos(undefined) ->
  309. true;
  310. validate_qos(Qos) when ?IS_QOS(Qos) ->
  311. true;
  312. validate_qos(_) ->
  313. false.
  314. %% PUBLISH ACL is cached in process dictionary.
  315. check_acl(publish, Topic, Client) ->
  316. case get({acl, publish, Topic}) of
  317. undefined ->
  318. AllowDeny = emqttd_access_control:check_acl(Client, publish, Topic),
  319. put({acl, publish, Topic}, AllowDeny),
  320. AllowDeny;
  321. AllowDeny ->
  322. AllowDeny
  323. end;
  324. check_acl(subscribe, Topic, Client) ->
  325. emqttd_access_control:check_acl(Client, subscribe, Topic).
  326. sp(true) -> 1;
  327. sp(false) -> 0.