|
|
@@ -1,5 +1,5 @@
|
|
|
%%-----------------------------------------------------------------------------
|
|
|
-%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
|
|
|
+%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
|
|
%%
|
|
|
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
%% of this software and associated documentation files (the "Software"), to deal
|
|
|
@@ -24,71 +24,89 @@
|
|
|
|
|
|
-include("emqtt.hrl").
|
|
|
|
|
|
--include("emqtt_frame.hrl").
|
|
|
+-include("emqtt_packet.hrl").
|
|
|
|
|
|
-record(proto_state, {
|
|
|
socket,
|
|
|
- message_id,
|
|
|
+ connected = false, %received CONNECT action?
|
|
|
+ packet_id,
|
|
|
client_id,
|
|
|
clean_sess,
|
|
|
will_msg,
|
|
|
+ subscriptions,
|
|
|
awaiting_ack,
|
|
|
- subtopics,
|
|
|
awaiting_rel
|
|
|
}).
|
|
|
|
|
|
-type proto_state() :: #proto_state{}.
|
|
|
|
|
|
--export([initial_state/1, handle_frame/2, send_message/2, client_terminated/1]).
|
|
|
+-export([initial_state/1]).
|
|
|
+
|
|
|
+-export([handle_packet/2, send_packet/2, client_terminated/1]).
|
|
|
|
|
|
-export([info/1]).
|
|
|
|
|
|
--define(FRAME_TYPE(Frame, Type),
|
|
|
- Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
|
|
|
+-define(PACKET_TYPE(Packet, Type),
|
|
|
+ Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}).
|
|
|
|
|
|
initial_state(Socket) ->
|
|
|
#proto_state{
|
|
|
socket = Socket,
|
|
|
- message_id = 1,
|
|
|
+ packet_id = 1,
|
|
|
+ subscriptions = [],
|
|
|
awaiting_ack = gb_trees:empty(),
|
|
|
- subtopics = [],
|
|
|
awaiting_rel = gb_trees:empty()
|
|
|
}.
|
|
|
|
|
|
-info(#proto_state{ message_id = MsgId,
|
|
|
+info(#proto_state{ packet_id = PacketId,
|
|
|
client_id = ClientId,
|
|
|
clean_sess = CleanSess,
|
|
|
will_msg = WillMsg,
|
|
|
- subtopics = SubTopics}) ->
|
|
|
- [ {message_id, MsgId},
|
|
|
- {client_id, ClientId},
|
|
|
- {clean_sess, CleanSess},
|
|
|
- {will_msg, WillMsg},
|
|
|
- {subtopics, SubTopics} ].
|
|
|
-
|
|
|
--spec handle_frame(Frame, State) -> {ok, NewState} | {error, any()} when
|
|
|
- Frame :: #mqtt_frame{},
|
|
|
- State :: proto_state(),
|
|
|
- NewState :: proto_state().
|
|
|
-
|
|
|
-handle_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
|
|
+ subscriptions= Subs }) ->
|
|
|
+ [ {packet_id, PacketId},
|
|
|
+ {client_id, ClientId},
|
|
|
+ {clean_sess, CleanSess},
|
|
|
+ {will_msg, WillMsg},
|
|
|
+ {subscriptions, Subs} ].
|
|
|
+
|
|
|
+-spec handle_packet(Packet, State) -> {ok, NewState} | {error, any()} when
|
|
|
+ Packet :: mqtt_packet(),
|
|
|
+ State :: proto_state(),
|
|
|
+ NewState :: proto_state().
|
|
|
+
|
|
|
+%%CONNECT – Client requests a connection to a Server
|
|
|
+
|
|
|
+%%A Client can only send the CONNECT Packet once over a Network Connection. 369
|
|
|
+handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = false}) ->
|
|
|
+ handle_packet(?CONNECT, Packet, State#proto_state{connected = true});
|
|
|
+
|
|
|
+handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = true}) ->
|
|
|
+ {error, protocol_bad_connect, State};
|
|
|
+
|
|
|
+%%Received other packets when CONNECT not arrived.
|
|
|
+handle_packet(_Packet, State = #proto_state{connected = false}) ->
|
|
|
+ {error, protocol_not_connected, State};
|
|
|
+
|
|
|
+handle_packet(?PACKET_TYPE(Packet, Type),
|
|
|
State = #proto_state{client_id = ClientId}) ->
|
|
|
- lager:info("frame from ~s: ~p", [ClientId, Frame]),
|
|
|
- case validate_frame(Type, Frame) of
|
|
|
+ lager:info("packet from ~s: ~p", [ClientId, Packet]),
|
|
|
+ case validate_packet(Type, Packet) of
|
|
|
ok ->
|
|
|
- handle_request(Type, Frame, State);
|
|
|
+ handle_packet(Type, Packet, State);
|
|
|
{error, Reason} ->
|
|
|
{error, Reason, State}
|
|
|
end.
|
|
|
|
|
|
-handle_request(?CONNECT,
|
|
|
- #mqtt_frame{ variable = #mqtt_frame_connect{
|
|
|
- username = Username,
|
|
|
- password = Password,
|
|
|
- proto_ver = ProtoVersion,
|
|
|
- clean_sess = CleanSess,
|
|
|
- keep_alive = AlivePeriod,
|
|
|
- client_id = ClientId } = Var}, State0 = #proto_state{socket = Sock}) ->
|
|
|
+handle_packet(?CONNECT, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_connect {
|
|
|
+ username = Username,
|
|
|
+ password = Password,
|
|
|
+ proto_ver = ProtoVersion,
|
|
|
+ clean_sess = CleanSess,
|
|
|
+ keep_alive = AlivePeriod,
|
|
|
+ client_id = ClientId } = Var },
|
|
|
+ State0 = #proto_state{socket = Sock}) ->
|
|
|
+
|
|
|
State = State0#proto_state{client_id = ClientId},
|
|
|
{ReturnCode, State1} =
|
|
|
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
|
|
@@ -113,202 +131,204 @@ handle_request(?CONNECT,
|
|
|
end
|
|
|
end,
|
|
|
lager:info("recv conn...:~p", [ReturnCode]),
|
|
|
- send_frame(Sock, #mqtt_frame{
|
|
|
- fixed = #mqtt_frame_fixed{ type = ?CONNACK },
|
|
|
- variable = #mqtt_frame_connack{
|
|
|
- return_code = ReturnCode }}),
|
|
|
+ send_packet(Sock, #mqtt_packet {
|
|
|
+ header = #mqtt_packet_header { type = ?CONNACK },
|
|
|
+ variable = #mqtt_packet_connack{ return_code = ReturnCode }}),
|
|
|
{ok, State1};
|
|
|
|
|
|
-handle_request(?PUBLISH, Frame=#mqtt_frame{
|
|
|
- fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
|
|
|
- emqtt_router:route(make_msg(Frame)),
|
|
|
+handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
|
|
+ header = #mqtt_packet_header {qos = ?QOS_0}}, State) ->
|
|
|
+ emqtt_router:route(make_message(Packet)),
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?PUBLISH,
|
|
|
- Frame=#mqtt_frame{
|
|
|
- fixed = #mqtt_frame_fixed{qos = ?QOS_1},
|
|
|
- variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
|
+handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
|
|
+ header = #mqtt_packet_header { qos = ?QOS_1 },
|
|
|
+ variable = #mqtt_packet_publish{packet_id = PacketId}},
|
|
|
State=#proto_state{socket=Sock}) ->
|
|
|
- emqtt_pubsub:publish(make_msg(Frame)),
|
|
|
- send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
|
|
- variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
|
+ emqtt_router:route(make_message(Packet)),
|
|
|
+ send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBACK },
|
|
|
+ variable = #mqtt_packet_puback { packet_id = PacketId}}),
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?PUBLISH,
|
|
|
- Frame=#mqtt_frame{
|
|
|
- fixed = #mqtt_frame_fixed{qos = ?QOS_2},
|
|
|
- variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
|
+handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
|
|
+ header = #mqtt_packet_header { qos = ?QOS_2 },
|
|
|
+ variable = #mqtt_packet_publish{packet_id = PacketId}},
|
|
|
State=#proto_state{socket=Sock}) ->
|
|
|
- emqtt_pubsub:publish(make_msg(Frame)),
|
|
|
- put({msg, MsgId}, pubrec),
|
|
|
- send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
|
|
|
- variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
|
-
|
|
|
+ %%FIXME: this is not right...should store it first...
|
|
|
+ emqtt_router:route(make_message(Packet)),
|
|
|
+ put({msg, PacketId}, pubrec),
|
|
|
+ send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBREC },
|
|
|
+ variable = #mqtt_packet_puback { packet_id = PacketId}}),
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?PUBACK, #mqtt_frame{}, State) ->
|
|
|
- %TODO: fixme later
|
|
|
+handle_packet(?PUBACK, #mqtt_packet {}, State) ->
|
|
|
+ %FIXME Later
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?PUBREC, #mqtt_frame{
|
|
|
- variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
|
- State=#proto_state{socket=Sock}) ->
|
|
|
- %TODO: fixme later
|
|
|
- send_frame(Sock,
|
|
|
- #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
|
|
|
- variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
|
+handle_packet(?PUBREC, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_puback { packet_id = PktId }},
|
|
|
+ State=#proto_state{socket=Sock}) ->
|
|
|
+ %FIXME Later: should release the message here
|
|
|
+ send_packet(Sock, #mqtt_packet {
|
|
|
+ header = #mqtt_packet_header { type = ?PUBREL},
|
|
|
+ variable = #mqtt_packet_puback { packet_id = PktId}}),
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?PUBREL,
|
|
|
- #mqtt_frame{
|
|
|
- variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
|
- State=#proto_state{socket=Sock}) ->
|
|
|
- erase({msg, MsgId}),
|
|
|
- send_frame(Sock,
|
|
|
- #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
|
|
|
- variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
|
+handle_packet(?PUBREL, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_puback { packet_id = PktId}},
|
|
|
+ State=#proto_state{socket=Sock}) ->
|
|
|
+ %%FIXME: not right...
|
|
|
+ erase({msg, PktId}),
|
|
|
+ send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?PUBCOMP},
|
|
|
+ variable = #mqtt_packet_puback { packet_id = PktId}}),
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?PUBCOMP, #mqtt_frame{
|
|
|
- variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
|
|
|
+handle_packet(?PUBCOMP, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_puback{packet_id = _PktId}}, State) ->
|
|
|
%TODO: fixme later
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?SUBSCRIBE,
|
|
|
- #mqtt_frame{
|
|
|
- variable = #mqtt_frame_subscribe{message_id = MessageId,
|
|
|
- topic_table = Topics},
|
|
|
- payload = undefined},
|
|
|
- #proto_state{socket=Sock} = State) ->
|
|
|
+handle_packet(?SUBSCRIBE, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_subscribe{
|
|
|
+ packet_id = PacketId,
|
|
|
+ topic_table = Topics},
|
|
|
+ payload = undefined},
|
|
|
+ State = #proto_state{socket=Sock}) ->
|
|
|
|
|
|
+ %%FIXME: this is not right...
|
|
|
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
|
|
|
#mqtt_topic{name=Name, qos=Qos} <- Topics],
|
|
|
|
|
|
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
|
|
|
|
|
- send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
|
|
|
- variable = #mqtt_frame_suback{
|
|
|
- message_id = MessageId,
|
|
|
- qos_table = GrantedQos}}),
|
|
|
+ send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
|
|
|
+ variable = #mqtt_packet_suback{
|
|
|
+ packet_id = PacketId,
|
|
|
+ qos_table = GrantedQos }}),
|
|
|
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?UNSUBSCRIBE,
|
|
|
- #mqtt_frame{
|
|
|
- variable = #mqtt_frame_subscribe{message_id = MessageId,
|
|
|
- topic_table = Topics },
|
|
|
- payload = undefined}, #proto_state{socket = Sock, client_id = ClientId} = State) ->
|
|
|
-
|
|
|
+handle_packet(?UNSUBSCRIBE, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_subscribe{
|
|
|
+ packet_id = PacketId,
|
|
|
+ topic_table = Topics },
|
|
|
+ payload = undefined},
|
|
|
+ State = #proto_state{socket = Sock, client_id = ClientId}) ->
|
|
|
|
|
|
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
|
|
|
|
|
- send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
|
|
- variable = #mqtt_frame_suback{message_id = MessageId }}),
|
|
|
+ send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
|
|
|
+ variable = #mqtt_packet_suback{packet_id = PacketId }}),
|
|
|
|
|
|
{ok, State};
|
|
|
|
|
|
-%, keep_alive=KeepAlive
|
|
|
-handle_request(?PINGREQ, #mqtt_frame{}, #proto_state{socket=Sock}=State) ->
|
|
|
- %Keep alive timer
|
|
|
- %%TODO:...
|
|
|
- %%KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
|
|
- send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
|
|
+handle_packet(?PINGREQ, #mqtt_packet{}, #proto_state{socket=Sock}=State) ->
|
|
|
+ send_packet(Sock, make_packet(?PINGRESP)),
|
|
|
{ok, State};
|
|
|
|
|
|
-handle_request(?DISCONNECT, #mqtt_frame{}, State=#proto_state{client_id=ClientId}) ->
|
|
|
+handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{client_id=ClientId}) ->
|
|
|
lager:info("~s disconnected", [ClientId]),
|
|
|
{stop, State}.
|
|
|
|
|
|
+
|
|
|
+make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT ->
|
|
|
+ #mqtt_packet{ header = #mqtt_packet_header { type = Type } }.
|
|
|
+
|
|
|
-spec send_message(Message, State) -> {ok, NewState} when
|
|
|
- Message :: mqtt_msg(),
|
|
|
+ Message :: mqtt_message(),
|
|
|
State :: proto_state(),
|
|
|
NewState :: proto_state().
|
|
|
|
|
|
-send_message(Message, State = #proto_state{socket = Sock, message_id = MsgId}) ->
|
|
|
-
|
|
|
- #mqtt_msg{retain = Retain,
|
|
|
- qos = Qos,
|
|
|
- topic = Topic,
|
|
|
- dup = Dup,
|
|
|
- payload = Payload,
|
|
|
- encoder = Encoder} = Message,
|
|
|
-
|
|
|
- Payload1 =
|
|
|
- if
|
|
|
- Encoder == undefined -> Payload;
|
|
|
- true -> Encoder(Payload)
|
|
|
- end,
|
|
|
- Frame = #mqtt_frame{
|
|
|
- fixed = #mqtt_frame_fixed{type = ?PUBLISH,
|
|
|
- qos = Qos,
|
|
|
- retain = Retain,
|
|
|
- dup = Dup},
|
|
|
- variable = #mqtt_frame_publish{topic_name = Topic,
|
|
|
- message_id = if
|
|
|
- Qos == ?QOS_0 -> undefined;
|
|
|
- true -> MsgId
|
|
|
- end},
|
|
|
- payload = Payload1},
|
|
|
-
|
|
|
- send_frame(Sock, Frame),
|
|
|
+send_message(Message = #mqtt_message{
|
|
|
+ retain = Retain,
|
|
|
+ qos = Qos,
|
|
|
+ topic = Topic,
|
|
|
+ dup = Dup,
|
|
|
+ payload = Payload},
|
|
|
+ State = #proto_state{socket = Sock, packet_id = PacketId}) ->
|
|
|
+
|
|
|
+ Packet = #mqtt_packet {
|
|
|
+ header = #mqtt_packet_header {
|
|
|
+ type = ?PUBLISH,
|
|
|
+ qos = Qos,
|
|
|
+ retain = Retain,
|
|
|
+ dup = Dup },
|
|
|
+ variable = #mqtt_packet_publish {
|
|
|
+ topic_name = Topic,
|
|
|
+ packet_id = if
|
|
|
+ Qos == ?QOS_0 -> undefined;
|
|
|
+ true -> PacketId
|
|
|
+ end },
|
|
|
+ payload = Payload},
|
|
|
+
|
|
|
+ send_packet(Sock, Packet),
|
|
|
if
|
|
|
Qos == ?QOS_0 ->
|
|
|
{ok, State};
|
|
|
true ->
|
|
|
- {ok, next_msg_id(State)}
|
|
|
+ {ok, next_packet_id(State)}
|
|
|
end.
|
|
|
|
|
|
-send_frame(Sock, Frame) ->
|
|
|
- lager:info("send frame:~p", [Frame]),
|
|
|
- erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
|
|
|
+send_packet(Sock, Packet) ->
|
|
|
+ lager:info("send packet:~p", [Packet]),
|
|
|
+ %%FIXME Later...
|
|
|
+ erlang:port_command(Sock, emqtt_packet:serialise(Packet)).
|
|
|
|
|
|
%%TODO: fix me later...
|
|
|
client_terminated(#proto_state{client_id = ClientId} = State) ->
|
|
|
ok.
|
|
|
%emqtt_cm:unregister(ClientId, self()).
|
|
|
|
|
|
-make_msg(#mqtt_frame{
|
|
|
- fixed = #mqtt_frame_fixed{qos = Qos,
|
|
|
- retain = Retain,
|
|
|
- dup = Dup},
|
|
|
- variable = #mqtt_frame_publish{topic_name = Topic,
|
|
|
- message_id = MessageId},
|
|
|
- payload = Payload}) ->
|
|
|
- #mqtt_msg{retain = Retain,
|
|
|
- qos = Qos,
|
|
|
- topic = Topic,
|
|
|
- dup = Dup,
|
|
|
- msgid = MessageId,
|
|
|
- payload = Payload}.
|
|
|
-
|
|
|
-make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
|
|
|
+make_message(#mqtt_packet {
|
|
|
+ header = #mqtt_packet_header{
|
|
|
+ qos = Qos,
|
|
|
+ retain = Retain,
|
|
|
+ dup = Dup },
|
|
|
+ variable = #mqtt_packet_publish{
|
|
|
+ topic_name = Topic,
|
|
|
+ packet_id = PacketId },
|
|
|
+ payload = Payload }) ->
|
|
|
+
|
|
|
+ #mqtt_message{ retain = Retain,
|
|
|
+ qos = Qos,
|
|
|
+ topic = Topic,
|
|
|
+ dup = Dup,
|
|
|
+ msgid = PacketId,
|
|
|
+ payload = Payload}.
|
|
|
+
|
|
|
+make_will_msg(#mqtt_packet_connect{ will_flag = false }) ->
|
|
|
undefined;
|
|
|
-make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
|
|
|
- will_qos = Qos,
|
|
|
- will_topic = Topic,
|
|
|
- will_msg = Msg }) ->
|
|
|
- #mqtt_msg{retain = Retain,
|
|
|
- qos = Qos,
|
|
|
- topic = Topic,
|
|
|
- dup = false,
|
|
|
- payload = Msg }.
|
|
|
-
|
|
|
-next_msg_id(State = #proto_state{ message_id = 16#ffff }) ->
|
|
|
- State #proto_state{ message_id = 1 };
|
|
|
-next_msg_id(State = #proto_state{ message_id = MsgId }) ->
|
|
|
- State #proto_state{ message_id = MsgId + 1 }.
|
|
|
+
|
|
|
+make_will_msg(#mqtt_packet_connect{ will_retain = Retain,
|
|
|
+ will_qos = Qos,
|
|
|
+ will_topic = Topic,
|
|
|
+ will_msg = Msg }) ->
|
|
|
+ #mqtt_message{ retain = Retain,
|
|
|
+ qos = Qos,
|
|
|
+ topic = Topic,
|
|
|
+ dup = false,
|
|
|
+ payload = Msg }.
|
|
|
+
|
|
|
+next_packet_id(State = #proto_state{ packet_id = 16#ffff }) ->
|
|
|
+ State #proto_state{ packet_id = 1 };
|
|
|
+next_packet_id(State = #proto_state{ packet_id = PacketId }) ->
|
|
|
+ State #proto_state{ packet_id = PacketId + 1 }.
|
|
|
|
|
|
valid_client_id(ClientId) ->
|
|
|
ClientIdLen = size(ClientId),
|
|
|
- 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
|
|
|
+ 1 =< ClientIdLen andalso ClientIdLen =< ?MAX_CLIENTID_LEN.
|
|
|
|
|
|
-validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) ->
|
|
|
+validate_packet(?PUBLISH, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_publish{
|
|
|
+ topic_name = Topic }}) ->
|
|
|
case emqtt_topic:validate({publish, Topic}) of
|
|
|
true -> ok;
|
|
|
false -> {error, badtopic}
|
|
|
end;
|
|
|
|
|
|
-validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
|
|
+validate_packet(?UNSUBSCRIBE, #mqtt_packet {
|
|
|
+ variable = #mqtt_packet_subscribe{
|
|
|
+ topic_table = Topics }}) ->
|
|
|
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
|
|
not emqtt_topic:validate({subscribe, Topic})],
|
|
|
case ErrTopics of
|
|
|
@@ -316,7 +336,7 @@ validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_
|
|
|
_ -> lager:error("error topics: ~p", [ErrTopics]), {error, badtopic}
|
|
|
end;
|
|
|
|
|
|
-validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
|
|
+validate_packet(?SUBSCRIBE, #mqtt_packet{variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
|
|
|
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
|
|
not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))],
|
|
|
case ErrTopics of
|
|
|
@@ -324,7 +344,7 @@ validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_ta
|
|
|
_ -> lager:error("error topics: ~p", [ErrTopics]), {error, badtopic}
|
|
|
end;
|
|
|
|
|
|
-validate_frame(_Type, _Frame) ->
|
|
|
+validate_packet(_Type, _Frame) ->
|
|
|
ok.
|
|
|
|
|
|
maybe_clean_sess(false, _Conn, _ClientId) ->
|
|
|
@@ -337,3 +357,4 @@ send_will_msg(#proto_state{will_msg = undefined}) ->
|
|
|
ignore;
|
|
|
send_will_msg(#proto_state{will_msg = WillMsg }) ->
|
|
|
emqtt_router:route(WillMsg).
|
|
|
+
|