|
|
@@ -38,7 +38,7 @@
|
|
|
|
|
|
-export([received/2, send/2, redeliver/2, shutdown/2]).
|
|
|
|
|
|
--export([handle/2]).
|
|
|
+-export([process/2]).
|
|
|
|
|
|
%% Protocol State
|
|
|
-record(proto_state, {peername,
|
|
|
@@ -65,8 +65,8 @@
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
init(Peername, SendFun, Opts) ->
|
|
|
- MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
|
|
|
- WsInitialHeaders = proplists:get_value(ws_initial_headers, Opts),
|
|
|
+ MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
|
|
|
+ WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts),
|
|
|
#proto_state{peername = Peername,
|
|
|
sendfun = SendFun,
|
|
|
max_clientid_len = MaxLen,
|
|
|
@@ -130,7 +130,7 @@ session(#proto_state{session = Session}) ->
|
|
|
%%A Client can only send the CONNECT Packet once over a Network Connection.
|
|
|
-spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
|
|
|
received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
|
|
|
- handle(Packet, State#proto_state{connected = true});
|
|
|
+ process(Packet, State#proto_state{connected = true});
|
|
|
|
|
|
received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
|
|
|
{error, protocol_bad_connect, State};
|
|
|
@@ -143,19 +143,19 @@ received(Packet = ?PACKET(_Type), State) ->
|
|
|
trace(recv, Packet, State),
|
|
|
case validate_packet(Packet) of
|
|
|
ok ->
|
|
|
- handle(Packet, State);
|
|
|
+ process(Packet, State);
|
|
|
{error, Reason} ->
|
|
|
{error, Reason, State}
|
|
|
end.
|
|
|
|
|
|
-handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
|
|
|
+process(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
|
|
|
|
|
|
#mqtt_packet_connect{proto_ver = ProtoVer,
|
|
|
proto_name = ProtoName,
|
|
|
username = Username,
|
|
|
password = Password,
|
|
|
clean_sess = CleanSess,
|
|
|
- keep_alive = KeepAlive,
|
|
|
+ keep_alive = KeepAlive,
|
|
|
client_id = ClientId} = Var,
|
|
|
|
|
|
State1 = State0#proto_state{proto_ver = ProtoVer,
|
|
|
@@ -190,7 +190,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
|
|
|
exit({shutdown, Error})
|
|
|
end;
|
|
|
{error, Reason}->
|
|
|
- lager:error("~s@~s: username '~s', login failed - ~s",
|
|
|
+ lager:error("~s@~s: username '~s' login failed for ~s",
|
|
|
[ClientId, emqttd_net:format(Peername), Username, Reason]),
|
|
|
{?CONNACK_CREDENTIALS, State1}
|
|
|
|
|
|
@@ -203,8 +203,8 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
|
|
|
%% Send connack
|
|
|
send(?CONNACK_PACKET(ReturnCode1), State3);
|
|
|
|
|
|
-handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
|
|
|
- State = #proto_state{client_id = ClientId}) ->
|
|
|
+process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
|
|
|
+ State = #proto_state{client_id = ClientId}) ->
|
|
|
|
|
|
case check_acl(publish, Topic, State) of
|
|
|
allow ->
|
|
|
@@ -214,70 +214,76 @@ handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
|
|
|
end,
|
|
|
{ok, State};
|
|
|
|
|
|
-handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
|
|
|
- emqttd_session:puback(Session, PacketId),
|
|
|
- {ok, State};
|
|
|
+process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
|
|
|
+ emqttd_session:puback(Session, PacketId), {ok, State};
|
|
|
|
|
|
-handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
|
|
|
+process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
|
|
|
emqttd_session:pubrec(Session, PacketId),
|
|
|
send(?PUBREL_PACKET(PacketId), State);
|
|
|
|
|
|
-handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
|
|
|
+process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
|
|
|
emqttd_session:pubrel(Session, PacketId),
|
|
|
send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);
|
|
|
|
|
|
-handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) ->
|
|
|
- emqttd_session:pubcomp(Session, PacketId),
|
|
|
- {ok, State};
|
|
|
+process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})->
|
|
|
+ emqttd_session:pubcomp(Session, PacketId), {ok, State};
|
|
|
|
|
|
%% protect from empty topic list
|
|
|
-handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
+process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
send(?SUBACK_PACKET(PacketId, []), State);
|
|
|
|
|
|
-handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
+process(?SUBSCRIBE_PACKET(PacketId, TopicTable),
|
|
|
+ State = #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
|
|
|
case lists:member(deny, AllowDenies) of
|
|
|
true ->
|
|
|
- %%TODO: return 128 QoS when deny... no need to SUBACK?
|
|
|
- lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]);
|
|
|
+ lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
|
|
+ send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
|
|
|
false ->
|
|
|
- Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end,
|
|
|
- emqttd_session:subscribe(Session, TopicTable, Callback)
|
|
|
- end,
|
|
|
- {ok, State};
|
|
|
+ AckFun = fun(GrantedQos) ->
|
|
|
+ send(?SUBACK_PACKET(PacketId, GrantedQos), State)
|
|
|
+ end,
|
|
|
+ emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State}
|
|
|
+ end;
|
|
|
|
|
|
%% protect from empty topic list
|
|
|
-handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
+process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
send(?UNSUBACK_PACKET(PacketId), State);
|
|
|
|
|
|
-handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
|
|
+process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
|
|
emqttd_session:unsubscribe(Session, Topics),
|
|
|
send(?UNSUBACK_PACKET(PacketId), State);
|
|
|
|
|
|
-handle(?PACKET(?PINGREQ), State) ->
|
|
|
+process(?PACKET(?PINGREQ), State) ->
|
|
|
send(?PACKET(?PINGRESP), State);
|
|
|
|
|
|
-handle(?PACKET(?DISCONNECT), State) ->
|
|
|
+process(?PACKET(?DISCONNECT), State) ->
|
|
|
% clean willmsg
|
|
|
{stop, normal, State#proto_state{will_msg = undefined}}.
|
|
|
|
|
|
-publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
- emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
|
|
|
+publish(Packet = ?PUBLISH(?QOS_0, _PacketId),
|
|
|
+ #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
+ Msg = emqttd_message:from_packet(ClientId, Packet),
|
|
|
+ emqttd_session:publish(Session, Msg);
|
|
|
|
|
|
-publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
- case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
|
|
+publish(Packet = ?PUBLISH(?QOS_1, PacketId),
|
|
|
+ State = #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
+ Msg = emqttd_message:from_packet(ClientId, Packet),
|
|
|
+ case emqttd_session:publish(Session, Msg) of
|
|
|
ok ->
|
|
|
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
|
|
|
{error, Error} ->
|
|
|
- lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error])
|
|
|
+ lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error])
|
|
|
end;
|
|
|
|
|
|
-publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
- case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
|
|
+publish(Packet = ?PUBLISH(?QOS_2, PacketId),
|
|
|
+ State = #proto_state{client_id = ClientId, session = Session}) ->
|
|
|
+ Msg = emqttd_message:from_packet(ClientId, Packet),
|
|
|
+ case emqttd_session:publish(Session, Msg) of
|
|
|
ok ->
|
|
|
send(?PUBACK_PACKET(?PUBREC, PacketId), State);
|
|
|
{error, Error} ->
|
|
|
- lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error])
|
|
|
+ lager:error("Client(~s): publish qos2 error - ~p", [ClientId, Error])
|
|
|
end.
|
|
|
|
|
|
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
|
|
@@ -285,9 +291,9 @@ send(Msg, State) when is_record(Msg, mqtt_message) ->
|
|
|
send(emqttd_message:to_packet(Msg), State);
|
|
|
|
|
|
send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername})
|
|
|
- when is_record(Packet, mqtt_packet) ->
|
|
|
+ when is_record(Packet, mqtt_packet) ->
|
|
|
trace(send, Packet, State),
|
|
|
- sent_stats(Packet),
|
|
|
+ emqttd_metrics:sent(Packet),
|
|
|
Data = emqttd_serialiser:serialise(Packet),
|
|
|
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
|
|
emqttd_metrics:inc('bytes/sent', size(Data)),
|
|
|
@@ -370,28 +376,31 @@ validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_c
|
|
|
true;
|
|
|
|
|
|
%% MQTT3.1.1 allow null clientId.
|
|
|
-validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState)
|
|
|
+validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311,
|
|
|
+ client_id = ClientId}, _ProtoState)
|
|
|
when size(ClientId) =:= 0 ->
|
|
|
true;
|
|
|
|
|
|
-validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) ->
|
|
|
+validate_clientid(#mqtt_packet_connect{proto_ver = Ver,
|
|
|
+ clean_sess = CleanSess,
|
|
|
+ client_id = ClientId}, _ProtoState) ->
|
|
|
lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
|
|
|
false.
|
|
|
|
|
|
-validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
|
|
+validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
|
|
|
variable = #mqtt_packet_publish{topic_name = Topic}}) ->
|
|
|
case emqttd_topic:validate({name, Topic}) of
|
|
|
- true -> ok;
|
|
|
- false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
|
|
+ true -> ok;
|
|
|
+ false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
|
|
end;
|
|
|
|
|
|
-validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE},
|
|
|
+validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE},
|
|
|
variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
|
|
|
|
|
|
validate_topics(filter, Topics);
|
|
|
|
|
|
-validate_packet(#mqtt_packet{ header = #mqtt_packet_header{type = ?UNSUBSCRIBE},
|
|
|
- variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
|
|
|
+validate_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE},
|
|
|
+ variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
|
|
|
|
|
|
validate_topics(filter, Topics);
|
|
|
|
|
|
@@ -406,13 +415,16 @@ validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
|
|
ErrTopics = [Topic || {Topic, Qos} <- Topics,
|
|
|
not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
|
|
case ErrTopics of
|
|
|
- [] -> ok;
|
|
|
- _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
|
|
|
+ [] -> ok;
|
|
|
+ _ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
|
|
|
end.
|
|
|
|
|
|
-validate_qos(undefined) -> true;
|
|
|
-validate_qos(Qos) when Qos =< ?QOS_2 -> true;
|
|
|
-validate_qos(_) -> false.
|
|
|
+validate_qos(undefined) ->
|
|
|
+ true;
|
|
|
+validate_qos(Qos) when ?IS_QOS(Qos) ->
|
|
|
+ true;
|
|
|
+validate_qos(_) ->
|
|
|
+ false.
|
|
|
|
|
|
%% publish ACL is cached in process dictionary.
|
|
|
check_acl(publish, Topic, State) ->
|
|
|
@@ -428,20 +440,3 @@ check_acl(publish, Topic, State) ->
|
|
|
check_acl(subscribe, Topic, State) ->
|
|
|
emqttd_access_control:check_acl(client(State), subscribe, Topic).
|
|
|
|
|
|
-sent_stats(?PACKET(Type)) ->
|
|
|
- emqttd_metrics:inc('packets/sent'),
|
|
|
- inc(Type).
|
|
|
-inc(?CONNACK) ->
|
|
|
- emqttd_metrics:inc('packets/connack');
|
|
|
-inc(?PUBLISH) ->
|
|
|
- emqttd_metrics:inc('messages/sent'),
|
|
|
- emqttd_metrics:inc('packets/publish/sent');
|
|
|
-inc(?SUBACK) ->
|
|
|
- emqttd_metrics:inc('packets/suback');
|
|
|
-inc(?UNSUBACK) ->
|
|
|
- emqttd_metrics:inc('packets/unsuback');
|
|
|
-inc(?PINGRESP) ->
|
|
|
- emqttd_metrics:inc('packets/pingresp');
|
|
|
-inc(_) ->
|
|
|
- ingore.
|
|
|
-
|