|
|
@@ -29,19 +29,18 @@
|
|
|
-author("Feng Lee <feng@emqtt.io>").
|
|
|
|
|
|
-include_lib("emqtt/include/emqtt.hrl").
|
|
|
+
|
|
|
-include_lib("emqtt/include/emqtt_packet.hrl").
|
|
|
|
|
|
-include("emqttd.hrl").
|
|
|
|
|
|
%% API
|
|
|
--export([init/3, clientid/1]).
|
|
|
+-export([init/3, info/1, clientid/1, client/1]).
|
|
|
|
|
|
-export([received/2, send/2, redeliver/2, shutdown/2]).
|
|
|
|
|
|
-export([handle/2]).
|
|
|
|
|
|
--export([info/1]).
|
|
|
-
|
|
|
%% Protocol State
|
|
|
-record(proto_state, {
|
|
|
peername,
|
|
|
@@ -49,30 +48,29 @@
|
|
|
connected = false, %received CONNECT action?
|
|
|
proto_ver,
|
|
|
proto_name,
|
|
|
- %packet_id,
|
|
|
username,
|
|
|
clientid,
|
|
|
clean_sess,
|
|
|
- session, %% session state or session pid
|
|
|
+ session, %% session state or session pid
|
|
|
will_msg,
|
|
|
- max_clientid_len = ?MAX_CLIENTID_LEN
|
|
|
+ max_clientid_len = ?MAX_CLIENTID_LEN,
|
|
|
+ client_pid
|
|
|
}).
|
|
|
|
|
|
-type proto_state() :: #proto_state{}.
|
|
|
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% @doc Init protocol
|
|
|
+%% @end
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
init(Peername, SendFun, Opts) ->
|
|
|
MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
|
|
|
#proto_state{
|
|
|
peername = Peername,
|
|
|
sendfun = SendFun,
|
|
|
- max_clientid_len = MaxLen}.
|
|
|
-
|
|
|
-clientid(#proto_state{clientid = ClientId}) -> ClientId.
|
|
|
+ max_clientid_len = MaxLen,
|
|
|
+ client_pid = self()}.
|
|
|
|
|
|
-client(#proto_state{peername = {Addr, _Port}, clientid = ClientId, username = Username}) ->
|
|
|
- #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}.
|
|
|
-
|
|
|
-%%SHOULD be registered in emqttd_cm
|
|
|
info(#proto_state{proto_ver = ProtoVer,
|
|
|
proto_name = ProtoName,
|
|
|
clientid = ClientId,
|
|
|
@@ -80,11 +78,27 @@ info(#proto_state{proto_ver = ProtoVer,
|
|
|
will_msg = WillMsg}) ->
|
|
|
[{proto_ver, ProtoVer},
|
|
|
{proto_name, ProtoName},
|
|
|
- {clientid, ClientId},
|
|
|
+ {clientid, ClientId},
|
|
|
{clean_sess, CleanSess},
|
|
|
{will_msg, WillMsg}].
|
|
|
|
|
|
-%%CONNECT – Client requests a connection to a Server
|
|
|
+clientid(#proto_state{clientid = ClientId}) ->
|
|
|
+ ClientId.
|
|
|
+
|
|
|
+client(#proto_state{peername = {Addr, _Port},
|
|
|
+ clientid = ClientId,
|
|
|
+ username = Username,
|
|
|
+ clean_sess = CleanSess,
|
|
|
+ proto_ver = ProtoVer,
|
|
|
+ client_pid = Pid}) ->
|
|
|
+ #mqtt_client{clientid = ClientId,
|
|
|
+ username = Username,
|
|
|
+ ipaddress = Addr,
|
|
|
+ clean_sess = CleanSess,
|
|
|
+ proto_ver = ProtoVer,
|
|
|
+ client_pid = Pid}.
|
|
|
+
|
|
|
+%% CONNECT – Client requests a connection to a Server
|
|
|
|
|
|
%%A Client can only send the CONNECT Packet once over a Network Connection.
|
|
|
-spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
|
|
|
@@ -107,42 +121,45 @@ received(Packet = ?PACKET(_Type), State) ->
|
|
|
{error, Reason, State}
|
|
|
end.
|
|
|
|
|
|
-handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {Addr, _}}) ->
|
|
|
+handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
|
|
|
|
|
|
#mqtt_packet_connect{proto_ver = ProtoVer,
|
|
|
+ proto_name = ProtoName,
|
|
|
username = Username,
|
|
|
password = Password,
|
|
|
clean_sess = CleanSess,
|
|
|
keep_alive = KeepAlive,
|
|
|
- clientid = ClientId} = Var,
|
|
|
+ clientid = ClientId} = Var,
|
|
|
|
|
|
- trace(recv, Packet, State#proto_state{clientid = ClientId}), %%TODO: fix later...
|
|
|
+ State1 = State0#proto_state{proto_ver = ProtoVer,
|
|
|
+ proto_name = ProtoName,
|
|
|
+ username = Username,
|
|
|
+ clientid = ClientId,
|
|
|
+ clean_sess = CleanSess},
|
|
|
|
|
|
- State1 = State#proto_state{proto_ver = ProtoVer,
|
|
|
- username = Username,
|
|
|
- clientid = ClientId,
|
|
|
- clean_sess = CleanSess},
|
|
|
- {ReturnCode1, State2} =
|
|
|
- case validate_connect(Var, State) of
|
|
|
+ trace(recv, Packet, State1),
|
|
|
+
|
|
|
+ {ReturnCode1, State3} =
|
|
|
+ case validate_connect(Var, State1) of
|
|
|
?CONNACK_ACCEPT ->
|
|
|
- Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr},
|
|
|
- case emqttd_access_control:auth(Client, Password) of
|
|
|
+ case emqttd_access_control:auth(client(State1), Password) of
|
|
|
ok ->
|
|
|
- %% Generate one if null
|
|
|
- ClientId1 = clientid(ClientId, State),
|
|
|
- %% Register clientId
|
|
|
- emqttd_cm:register(ClientId1),
|
|
|
+ %% Generate clientId if null
|
|
|
+ State2 = State1#proto_state{clientid = clientid(ClientId, State1)},
|
|
|
+
|
|
|
+ %% Register the client to cm
|
|
|
+ emqttd_cm:register(client(State2)),
|
|
|
+
|
|
|
%%Starting session
|
|
|
- {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}),
|
|
|
+ {ok, Session} = emqttd_session:start({CleanSess, clientid(State2), self()}),
|
|
|
+
|
|
|
%% Start keepalive
|
|
|
start_keepalive(KeepAlive),
|
|
|
- %% Run hooks
|
|
|
- emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]),
|
|
|
- {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
|
|
|
- session = Session,
|
|
|
- will_msg = willmsg(Var)}};
|
|
|
+
|
|
|
+ %% ACCEPT
|
|
|
+ {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}};
|
|
|
{error, Reason}->
|
|
|
- lager:error("~s@~s: username '~s' login failed - ~s",
|
|
|
+ lager:error("~s@~s: username '~s', login failed - ~s",
|
|
|
[ClientId, emqttd_net:format(Peername), Username, Reason]),
|
|
|
{?CONNACK_CREDENTIALS, State1}
|
|
|
|
|
|
@@ -150,9 +167,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
|
|
|
ReturnCode ->
|
|
|
{ReturnCode, State1}
|
|
|
end,
|
|
|
- %%TODO: this is not right...
|
|
|
- notify(connected, ReturnCode1, State2),
|
|
|
- send(?CONNACK_PACKET(ReturnCode1), State2);
|
|
|
+ %% Run hooks
|
|
|
+ emqttd_broker:foreach_hooks(client_connected, [ReturnCode1, client(State3)]),
|
|
|
+ %% Send connack
|
|
|
+ send(?CONNACK_PACKET(ReturnCode1), State3);
|
|
|
|
|
|
handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
|
|
|
State = #proto_state{clientid = ClientId, session = Session}) ->
|
|
|
@@ -251,7 +269,6 @@ send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
|
|
|
%% message from session
|
|
|
send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
|
|
|
send(emqtt_message:to_packet(Message), State);
|
|
|
-
|
|
|
%% message(qos1, qos2) not from session
|
|
|
send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session})
|
|
|
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
|
|
|
@@ -279,12 +296,24 @@ trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
|
|
redeliver({?PUBREL, PacketId}, State) ->
|
|
|
send(?PUBREL_PACKET(PacketId), State).
|
|
|
|
|
|
+shutdown(duplicate_id, _State) ->
|
|
|
+ quiet; %%
|
|
|
+
|
|
|
+shutdown(_, #proto_state{clientid = undefined}) ->
|
|
|
+ ignore;
|
|
|
+
|
|
|
+shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) ->
|
|
|
+ lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
|
|
|
+ [ClientId, emqttd_net:format(Peername)]),
|
|
|
+ try_unregister(ClientId),
|
|
|
+ emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]);
|
|
|
+
|
|
|
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
|
|
|
- send_willmsg(ClientId, WillMsg),
|
|
|
- try_unregister(ClientId, self()),
|
|
|
- lager:info([{client, ClientId}], "Protocol ~s@~s Shutdown: ~p",
|
|
|
+ lager:info([{client, ClientId}], "Protocol ~s@~s: Shutdown for ~p",
|
|
|
[ClientId, emqttd_net:format(Peername), Error]),
|
|
|
- ok.
|
|
|
+ send_willmsg(ClientId, WillMsg),
|
|
|
+ try_unregister(ClientId),
|
|
|
+ emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]).
|
|
|
|
|
|
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
|
|
|
emqtt_message:from_packet(Packet).
|
|
|
@@ -377,8 +406,8 @@ validate_qos(undefined) -> true;
|
|
|
validate_qos(Qos) when Qos =< ?QOS_2 -> true;
|
|
|
validate_qos(_) -> false.
|
|
|
|
|
|
-try_unregister(undefined, _) -> ok;
|
|
|
-try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId).
|
|
|
+try_unregister(undefined) -> ok;
|
|
|
+try_unregister(ClientId) -> emqttd_cm:unregister(ClientId).
|
|
|
|
|
|
%% publish ACL is cached in process dictionary.
|
|
|
check_acl(publish, Topic, State) ->
|
|
|
@@ -411,18 +440,3 @@ inc(?PINGRESP) ->
|
|
|
inc(_) ->
|
|
|
ingore.
|
|
|
|
|
|
-notify(connected, ReturnCode, #proto_state{peername = Peername,
|
|
|
- proto_ver = ProtoVer,
|
|
|
- clientid = ClientId,
|
|
|
- clean_sess = CleanSess}) ->
|
|
|
- Sess = case CleanSess of
|
|
|
- true -> false;
|
|
|
- false -> true
|
|
|
- end,
|
|
|
- Params = [{from, emqttd_net:format(Peername)},
|
|
|
- {protocol, ProtoVer},
|
|
|
- {session, Sess},
|
|
|
- {connack, ReturnCode}],
|
|
|
- emqttd_event:notify({connected, ClientId, Params}).
|
|
|
-
|
|
|
-
|