emqttd_protocol.erl 15 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc MQTT Protocol Processor.
  17. -module(emqttd_protocol).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. -include("emqttd_internal.hrl").
  21. -import(proplists, [get_value/2, get_value/3]).
  22. %% API
  23. -export([init/3, info/1, clientid/1, client/1, session/1]).
  24. -export([received/2, send/2, redeliver/2, shutdown/2]).
  25. -export([process/2]).
  %% Protocol state carried by the connection process for one MQTT client.
  %% Field order is significant (a record is a tuple at runtime).
  -record(proto_state, {
            peername,            %% peer address; rendered with esockd_net:format/1 when logging
            sendfun,             %% fun/1 invoked with each outgoing #mqtt_packet{}
            connected = false,   %% flipped to true once a CONNECT packet has been received
            client_id,           %% client id from CONNECT (generated when the client sent none)
            client_pid,          %% pid of the process that called init/3
            clean_sess,          %% clean-session flag from CONNECT
            proto_ver,           %% protocol version from CONNECT
            proto_name,          %% protocol name from CONNECT
            username,            %% username from CONNECT
            will_msg,            %% will message built from CONNECT; reset to undefined on DISCONNECT
            keepalive,           %% keep-alive value from CONNECT
            max_clientid_len = ?MAX_CLIENTID_LEN, %% upper bound accepted for a client id
            session,             %% session returned by emqttd_sm:start_session/2
            ws_initial_headers,  %% Headers from first HTTP request for websocket client
            connected_at         %% os:timestamp() taken while processing CONNECT
           }).

  -type proto_state() :: #proto_state{}.
  %% Record fields exposed by info/1.
  -define(INFO_KEYS, [client_id,
                      username,
                      clean_sess,
                      proto_ver,
                      proto_name,
                      keepalive,
                      will_msg,
                      ws_initial_headers,
                      connected_at]).
  %% Log through lager at Level with the client id attached as metadata.
  %% Every message is prefixed with "Client(<client_id>@<peername>): ";
  %% Fmt and FmtArgs supply the remainder of the message.
  -define(LOG(Level, Fmt, FmtArgs, ProtoState),
          lager:Level([{client, ProtoState#proto_state.client_id}],
                      "Client(~s@~s): " ++ Fmt,
                      [ProtoState#proto_state.client_id,
                       esockd_net:format(ProtoState#proto_state.peername) | FmtArgs])).
  %% @doc Build the initial protocol state for a new connection.
  %%
  %% Peername is the peer address, SendFun is the fun used by send/2 to emit
  %% packets, and Opts is a proplist from which `max_clientid_len' (default
  %% ?MAX_CLIENTID_LEN) and `ws_initial_headers' (default undefined) are read.
  %% The calling process is recorded as the client pid.
  init(Peername, SendFun, Opts) ->
      #proto_state{peername           = Peername,
                   sendfun            = SendFun,
                   max_clientid_len   = proplists:get_value(max_clientid_len, Opts,
                                                            ?MAX_CLIENTID_LEN),
                   client_pid         = self(),
                   ws_initial_headers = proplists:get_value(ws_initial_headers, Opts)}.
  %% @doc Convert the protocol state into a property list restricted to
  %% ?INFO_KEYS, using the ?record_to_proplist macro from an included header.
  info(State) ->
      ?record_to_proplist(proto_state, State, ?INFO_KEYS).
  %% @doc Return the client id stored in the protocol state.
  clientid(#proto_state{client_id = Id}) -> Id.
  %% @doc Project the protocol state onto an #mqtt_client{} record.
  %%
  %% `will_topic' is the topic of the stored will message, or undefined when
  %% no will message is set.
  client(State = #proto_state{will_msg = WillMsg}) ->
      WillTopic = case WillMsg of
                      undefined -> undefined;
                      _         -> WillMsg#mqtt_message.topic
                  end,
      #mqtt_client{client_id          = State#proto_state.client_id,
                   client_pid         = State#proto_state.client_pid,
                   username           = State#proto_state.username,
                   peername           = State#proto_state.peername,
                   clean_sess         = State#proto_state.clean_sess,
                   proto_ver          = State#proto_state.proto_ver,
                   keepalive          = State#proto_state.keepalive,
                   will_topic         = WillTopic,
                   ws_initial_headers = State#proto_state.ws_initial_headers,
                   connected_at       = State#proto_state.connected_at}.
  %% @doc Return the session stored in the protocol state
  %% (undefined until a CONNECT has been accepted).
  session(#proto_state{session = Sess}) -> Sess.
  %% @doc Entry point for every packet read from the client.
  %%
  %% CONNECT - Client requests a connection to a Server.
  %% A Client can only send the CONNECT Packet once over a Network Connection:
  %%   * the first CONNECT marks the state as connected and is handed to process/2;
  %%   * a second CONNECT is rejected with `protocol_bad_connect';
  %%   * any other packet arriving before CONNECT is rejected with
  %%     `protocol_not_connected'.
  %% All remaining packets are traced, validated and then dispatched to process/2.
  %%
  %% Errors are reported as a 3-tuple that carries the (unchanged) state, and
  %% process/2 answers DISCONNECT with `{stop, normal, State}'; the spec below
  %% reflects both shapes.
  -spec(received(mqtt_packet(), proto_state())
        -> {ok, proto_state()}
         | {error, any(), proto_state()}
         | {stop, normal, proto_state()}).
  received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
      %% Tracing of CONNECT happens inside process/2, once the client id is known.
      process(Packet, State#proto_state{connected = true});

  received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
      {error, protocol_bad_connect, State};

  %% Received other packets when CONNECT not arrived.
  received(_Packet, State = #proto_state{connected = false}) ->
      {error, protocol_not_connected, State};

  received(Packet = ?PACKET(_Type), State) ->
      trace(recv, Packet, State),
      case validate_packet(Packet) of
          ok ->
              process(Packet, State);
          {error, Reason} ->
              {error, Reason, State}
      end.
  %% @doc Process one MQTT packet and return the resulting protocol state.
  %%
  %% Returns `{ok, State}' for every packet except DISCONNECT, which yields
  %% `{stop, normal, State}' with the will message cleared. A CONNECT whose
  %% session cannot be started terminates the process with
  %% `exit({shutdown, Error})'.

  %% CONNECT: record the connection parameters, validate, authenticate, open
  %% the session, then run the 'client.connected' hook and reply with CONNACK.
  process(Packet = ?CONNECT_PACKET(Var), State0) ->
      #mqtt_packet_connect{proto_ver  = ProtoVer,
                           proto_name = ProtoName,
                           username   = Username,
                           password   = Password,
                           clean_sess = CleanSess,
                           keep_alive = KeepAlive,
                           client_id  = ClientId} = Var,
      State1 = State0#proto_state{proto_ver    = ProtoVer,
                                  proto_name   = ProtoName,
                                  username     = Username,
                                  client_id    = ClientId,
                                  clean_sess   = CleanSess,
                                  keepalive    = KeepAlive,
                                  will_msg     = willmsg(Var),
                                  connected_at = os:timestamp()},
      trace(recv, Packet, State1),
      {ConnAck, SessPresent, State2} =
          case validate_connect(Var, State1) of
              ?CONNACK_ACCEPT -> authenticate(Password, State1);
              Rejected        -> {Rejected, false, State1}
          end,
      %% Run hooks (also on rejection; the return code is passed along)
      emqttd:run_hooks('client.connected', [ConnAck], client(State2)),
      %% Send connack
      send(?CONNACK_PACKET(ConnAck, sp(SessPresent)), State2);

  %% PUBLISH: only forwarded when the ACL allows it; a denial is just logged.
  process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
      case check_acl(publish, Topic, client(State)) of
          allow -> publish(Packet, State);
          deny  -> ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
      end,
      {ok, State};

  %% PUBACK / PUBREC / PUBREL / PUBCOMP: notify the session and, where the
  %% handshake requires it, answer with the next packet of the flow.
  process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Sess}) ->
      emqttd_session:puback(Sess, PacketId),
      {ok, State};

  process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Sess}) ->
      emqttd_session:pubrec(Sess, PacketId),
      send(?PUBREL_PACKET(PacketId), State);

  process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Sess}) ->
      emqttd_session:pubrel(Sess, PacketId),
      send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);

  process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sess}) ->
      emqttd_session:pubcomp(Sess, PacketId),
      {ok, State};

  %% Protect from empty topic table
  process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
      send(?SUBACK_PACKET(PacketId, []), State);

  %% SUBSCRIBE: a single denied topic fails the whole request; each topic is
  %% then answered with the failure code 16#80.
  process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Sess}) ->
      Client = client(State),
      Verdicts = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
      case lists:member(deny, Verdicts) of
          false ->
              emqttd_session:subscribe(Sess, PacketId, TopicTable),
              {ok, State};
          true ->
              ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
              send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State)
      end;

  %% Protect from empty topic list
  process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
      send(?UNSUBACK_PACKET(PacketId), State);

  process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Sess}) ->
      emqttd_session:unsubscribe(Sess, Topics),
      send(?UNSUBACK_PACKET(PacketId), State);

  process(?PACKET(?PINGREQ), State) ->
      send(?PACKET(?PINGRESP), State);

  process(?PACKET(?DISCONNECT), State) ->
      %% Clean willmsg
      {stop, normal, State#proto_state{will_msg = undefined}}.

  %% Authenticate the connecting client; on success continue with the session
  %% setup, otherwise log the failure and answer with ?CONNACK_CREDENTIALS.
  authenticate(Password, State = #proto_state{username = Username}) ->
      case emqttd_access_control:auth(client(State), Password) of
          ok ->
              %% Generate clientId if null
              open_session(maybe_set_clientid(State));
          {error, Reason} ->
              ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State),
              {?CONNACK_CREDENTIALS, false, State}
      end.

  %% Start the session, register the client and kick off the keepalive timer.
  open_session(State = #proto_state{clean_sess = CleanSess,
                                    username   = Username,
                                    keepalive  = KeepAlive}) ->
      case emqttd_sm:start_session(CleanSess, {clientid(State), Username}) of
          {ok, Session, SessPresent} ->
              %% Register the client
              emqttd_cm:reg(client(State)),
              %% Start keepalive
              start_keepalive(KeepAlive),
              %% ACCEPT
              {?CONNACK_ACCEPT, SessPresent, State#proto_state{session = Session}};
          {error, Error} ->
              exit({shutdown, Error})
      end.
  %% Hand a PUBLISH packet to the session. QoS 0 is published without any
  %% acknowledgement; QoS 1 is acknowledged with PUBACK and QoS 2 with PUBREC
  %% (see with_puback/3).
  publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
          #proto_state{client_id = ClientId, username = Username, session = Sess}) ->
      emqttd_session:publish(Sess, emqttd_message:from_packet(Username, ClientId, Packet));

  publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
      with_puback(?PUBACK, Packet, State);

  publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
      with_puback(?PUBREC, Packet, State).
  %% Publish the message through the session and, when that succeeds, send the
  %% acknowledgement packet of the given Type back to the client. On failure
  %% the error is only logged and no acknowledgement is sent.
  with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
              State = #proto_state{client_id = ClientId,
                                   username  = Username,
                                   session   = Sess}) ->
      Message = emqttd_message:from_packet(Username, ClientId, Packet),
      case emqttd_session:publish(Sess, Message) of
          ok ->
              send(?PUBACK_PACKET(Type, PacketId), State);
          {error, Error} ->
              ?LOG(error, "PUBLISH ~p error: ~p", [PacketId, Error], State)
      end.
  %% @doc Send a message or a packet to the client.
  %%
  %% An #mqtt_message{} first triggers the 'message.delivered' hook and is then
  %% converted to a packet. A packet is traced, counted in the metrics and
  %% handed to the send fun stored in the state.
  -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
  send(Msg = #mqtt_message{}, State = #proto_state{client_id = ClientId,
                                                   username  = Username}) ->
      emqttd:run_hooks('message.delivered', [{ClientId, Username}], Msg),
      send(emqttd_message:to_packet(Msg), State);

  send(Packet = #mqtt_packet{}, State = #proto_state{sendfun = Emit}) ->
      trace(send, Packet, State),
      emqttd_metrics:sent(Packet),
      Emit(Packet),
      {ok, State}.
  %% Log a received (recv) or sent (send) packet at info level.
  %% The packet is formatted inside the log arguments on purpose, so the
  %% original evaluation order around the lager call is kept.
  trace(recv, Pkt, State) ->
      ?LOG(info, "RECV ~s", [emqttd_packet:format(Pkt)], State);

  trace(send, Pkt, State) ->
      ?LOG(info, "SEND ~s", [emqttd_packet:format(Pkt)], State).
  %% @doc Re-send the PUBREL packet for the given packet id.
  %% Only `{?PUBREL, PacketId}' requests are handled.
  redeliver({?PUBREL, PktId}, State) ->
      send(?PUBREL_PACKET(PktId), State).
  %% @doc Called when the connection goes down.
  %%
  %% Nothing is done when no client id was ever set, or when the shutdown
  %% reason is `conflict'. Otherwise the will message (if any) is published and
  %% the 'client.disconnected' hook is run with the shutdown reason.
  shutdown(_Error, #proto_state{client_id = undefined}) ->
      ignore;

  %% The client is deliberately not unregistered from emqttd_cm here.
  shutdown(conflict, #proto_state{}) ->
      ignore;

  shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
      ?LOG(info, "Shutdown for ~p", [Error], State),
      Client = client(State),
      send_willmsg(Client, WillMsg),
      emqttd:run_hooks('client.disconnected', [Error], Client),
      %% The client is deliberately not unregistered from emqttd_cm here.
      ok.
  %% Build the will message from the CONNECT variable header by delegating to
  %% emqttd_message:from_packet/1.
  willmsg(Connect = #mqtt_packet_connect{}) ->
      emqttd_message:from_packet(Connect).
  %% Generate a client id if the client sent none (undefined or empty binary).
  %% The generated id is "emqttd_" followed by the integer taken from the
  %% second element of emqttd_guid:new/0. Any other state is returned untouched.
  maybe_set_clientid(State = #proto_state{client_id = Id})
    when Id =:= undefined; Id =:= <<>> ->
      {_, NPid, _} = emqttd_guid:new(),
      Suffix = integer_to_list(NPid),
      State#proto_state{client_id = iolist_to_binary(["emqttd_", Suffix])};

  maybe_set_clientid(State) ->
      State.
  %% Publish the will message on behalf of the client, stamping it with
  %% {ClientId, Username} as sender. Does nothing when no will message is set.
  send_willmsg(_Client, undefined) ->
      ignore;

  send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
      Sender = {ClientId, Username},
      emqttd:publish(WillMsg#mqtt_message{from = Sender}).
  %% Ask the owning process (self()) to start the keepalive timer.
  %% A keepalive of 0 disables the timer; otherwise the timeout is the
  %% client's keepalive stretched by 25% and rounded to an integer.
  start_keepalive(0) ->
      ignore;

  start_keepalive(Sec) when Sec > 0 ->
      Timeout = round(Sec * 1.25),
      self() ! {keepalive, start, Timeout}.
  254. %%--------------------------------------------------------------------
  255. %% Validate Packets
  256. %%--------------------------------------------------------------------
  %% Validate a CONNECT packet and return the CONNACK return code:
  %% ?CONNACK_PROTO_VER for an unsupported protocol, ?CONNACK_INVALID_ID for a
  %% bad client id, ?CONNACK_ACCEPT otherwise. The client id is only checked
  %% when the protocol is acceptable.
  validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
      ProtoOk = validate_protocol(Connect),
      case ProtoOk andalso validate_clientid(Connect, ProtoState) of
          true               -> ?CONNACK_ACCEPT;
          false when ProtoOk -> ?CONNACK_INVALID_ID;
          false              -> ?CONNACK_PROTO_VER
      end.
  %% True when the {version, name} pair of the CONNECT packet is one of the
  %% pairs listed in ?PROTOCOL_NAMES.
  validate_protocol(#mqtt_packet_connect{proto_ver = Version, proto_name = ProtoName}) ->
      Supported = ?PROTOCOL_NAMES,
      lists:member({Version, ProtoName}, Supported).
  %% Check the client id of a CONNECT packet. Returns a boolean.
  %%
  %%   * 1..MaxLen bytes                            -> valid
  %%   * empty id with clean_sess = false           -> invalid (Issue#599)
  %%   * empty id with MQTT 3.1.1 (clean session)   -> valid, an id is generated later
  %%   * anything else (too long, empty on another
  %%     protocol version, not a binary)            -> invalid, logged as a warning
  %%
  %% byte_size/1 and binary patterns are used instead of the polymorphic size/1
  %% so the clauses state explicitly that a binary is expected.
  validate_clientid(#mqtt_packet_connect{client_id = ClientId},
                    #proto_state{max_clientid_len = MaxLen})
    when byte_size(ClientId) >= 1, byte_size(ClientId) =< MaxLen ->
      true;

  %% Issue#599: Null clientId and clean_sess = false
  validate_clientid(#mqtt_packet_connect{client_id  = <<>>,
                                         clean_sess = false}, _ProtoState) ->
      false;

  %% MQTT3.1.1 allow null clientId.
  validate_clientid(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V311,
                                         client_id = <<>>}, _ProtoState) ->
      true;

  validate_clientid(#mqtt_packet_connect{proto_ver  = ProtoVer,
                                         clean_sess = CleanSess}, ProtoState) ->
      ?LOG(warning, "Invalid clientId. ProtoVer: ~p, CleanSess: ~s",
           [ProtoVer, CleanSess], ProtoState),
      false.
  %% Validate the topics carried by a packet. Returns ok or {error, Reason}.
  %% PUBLISH topics are checked as topic names, SUBSCRIBE/UNSUBSCRIBE topics as
  %% topic filters; every other packet type is accepted as is.
  validate_packet(?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload)) ->
      case emqttd_topic:validate({name, Topic}) of
          false -> {error, badtopic};
          true  -> ok
      end;

  validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) ->
      validate_topics(filter, TopicTable);

  validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) ->
      validate_topics(filter, Topics);

  validate_packet(_Other) ->
      ok.
  %% Validate a list of topics of the given Type (name | filter).
  %%
  %% Accepts either a topic table `[{Topic, Qos}]' (SUBSCRIBE) or a plain list
  %% of binary topics (UNSUBSCRIBE); the shape is chosen from the first element.
  %% Returns ok, {error, empty_topics} for an empty list, or {error, badtopic}
  %% as soon as one entry is invalid (checking stops at the first bad entry).
  validate_topics(_Type, []) ->
      {error, empty_topics};

  validate_topics(Type, TopicTable = [{_Topic, _Qos}|_])
    when Type =:= name orelse Type =:= filter ->
      IsValid = fun({Topic, Qos}) ->
                        %% andalso: no need to look at the QoS of a bad topic
                        emqttd_topic:validate({Type, Topic}) andalso validate_qos(Qos);
                   (_Malformed) ->
                        %% an entry that is not a {Topic, Qos} pair is rejected
                        %% rather than silently skipped
                        false
                end,
      case lists:all(IsValid, TopicTable) of
          true  -> ok;
          false -> {error, badtopic}
      end;

  validate_topics(Type, Topics = [Topic0|_]) when is_binary(Topic0) ->
      IsValid = fun(Topic) -> emqttd_topic:validate({Type, Topic}) end,
      case lists:all(IsValid, Topics) of
          true  -> ok;
          false -> {error, badtopic}
      end.
  %% True when the QoS is absent (undefined) or satisfies ?IS_QOS;
  %% false for everything else.
  validate_qos(Qos) when Qos =:= undefined; ?IS_QOS(Qos) ->
      true;

  validate_qos(_Other) ->
      false.
  %% Ask the access-control layer whether Client may subscribe or publish to
  %% Topic; returns its verdict.
  %%
  %% PUBLISH ACL is cached in process dictionary: when the `cache_acl' setting
  %% (default true) is on, the verdict for a topic is stored under
  %% {acl, publish, Topic} on first use and reused afterwards. SUBSCRIBE
  %% verdicts are never cached.
  check_acl(subscribe, Topic, Client) ->
      emqttd_access_control:check_acl(Client, subscribe, Topic);

  check_acl(publish, Topic, Client) ->
      CacheKey = {acl, publish, Topic},
      case {emqttd:conf(cache_acl, true), get(CacheKey)} of
          {false, _} ->
              emqttd_access_control:check_acl(Client, publish, Topic);
          {true, undefined} ->
              Verdict = emqttd_access_control:check_acl(Client, publish, Topic),
              put(CacheKey, Verdict),
              Verdict;
          {true, Cached} ->
              Cached
      end.
  %% Encode the "session present" flag as the integer used in CONNACK.
  sp(false) -> 0;
  sp(true)  -> 1.