|
|
@@ -52,16 +52,18 @@
|
|
|
-export([prioritise_call/4, prioritise_info/3, handle_pre_hibernate/1]).
|
|
|
|
|
|
%% Client State
|
|
|
--record(client_state, {connection, connname, peername, peerhost, peerport, await_recv,
|
|
|
- conn_state, rate_limit, packet_limit, parse_state, proto_state,
|
|
|
+%% Unused fields: connname, peerhost, peerport
|
|
|
+-record(client_state, {connection, peername, conn_state, await_recv,
|
|
|
+ rate_limit, packet_size, parser, proto_state,
|
|
|
keepalive, enable_stats}).
|
|
|
|
|
|
--define(INFO_KEYS, [connname, peername, peerhost, peerport, await_recv, conn_state]).
|
|
|
+-define(INFO_KEYS, [peername, conn_state, await_recv]).
|
|
|
|
|
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
|
|
|
|
|
-define(LOG(Level, Format, Args, State),
|
|
|
- lager:Level("Client(~s): " ++ Format, [State#client_state.connname | Args])).
|
|
|
+ lager:Level("Client(~s): " ++ Format,
|
|
|
+ [esockd_net:format(State#client_state.peername) | Args])).
|
|
|
|
|
|
start_link(Conn, Env) ->
|
|
|
{ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
|
|
|
@@ -96,59 +98,56 @@ session(CPid) ->
|
|
|
|
|
|
init([Conn0, Env]) ->
|
|
|
{ok, Conn} = Conn0:wait(),
|
|
|
- {PeerHost, PeerPort, PeerName} =
|
|
|
case Conn:peername() of
|
|
|
- {ok, Peer = {Host, Port}} ->
|
|
|
- {Host, Port, Peer};
|
|
|
- {error, enotconn} ->
|
|
|
- Conn:fast_close(),
|
|
|
- exit(normal);
|
|
|
- {error, Reason} ->
|
|
|
- Conn:fast_close(),
|
|
|
- exit({shutdown, Reason})
|
|
|
- end,
|
|
|
- ConnName = esockd_net:format(PeerName),
|
|
|
- Self = self(),
|
|
|
- %% Send Packet...
|
|
|
- SendFun = fun(Packet) ->
|
|
|
- Data = emqttd_serializer:serialize(Packet),
|
|
|
- ?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}),
|
|
|
- emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
|
|
|
- try Conn:async_send(Data) of
|
|
|
- true -> ok
|
|
|
- catch
|
|
|
- error:Error -> Self ! {shutdown, Error}
|
|
|
- end
|
|
|
- end,
|
|
|
+ {ok, Peername} -> do_init(Conn, Env, Peername);
|
|
|
+ {error, enotconn} -> Conn:fast_close(),
|
|
|
+ exit(normal);
|
|
|
+ {error, Reason} -> Conn:fast_close(),
|
|
|
+ exit({shutdown, Reason})
|
|
|
+ end.
|
|
|
+
|
|
|
+do_init(Conn, Env, Peername) ->
|
|
|
+ %% Send Fun
|
|
|
+ SendFun = send_fun(Conn, Peername),
|
|
|
RateLimit = get_value(rate_limit, Conn:opts()),
|
|
|
- PacketLimit = proplists:get_value(max_packet_size, Env, ?MAX_PACKET_LEN),
|
|
|
- ParseState = emqttd_parser:initial_state(PacketLimit),
|
|
|
- ProtoState = emqttd_protocol:init(PeerName, SendFun, Env),
|
|
|
+ PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
|
|
|
+ Parser = emqttd_parser:initial_state(PacketSize),
|
|
|
+ ProtoState = emqttd_protocol:init(Peername, SendFun, Env),
|
|
|
EnableStats = get_value(client_enable_stats, Env, false),
|
|
|
State = run_socket(#client_state{connection = Conn,
|
|
|
- connname = ConnName,
|
|
|
- peername = PeerName,
|
|
|
- peerhost = PeerHost,
|
|
|
- peerport = PeerPort,
|
|
|
+ peername = Peername,
|
|
|
await_recv = false,
|
|
|
conn_state = running,
|
|
|
rate_limit = RateLimit,
|
|
|
- packet_limit = PacketLimit,
|
|
|
- parse_state = ParseState,
|
|
|
+ packet_size = PacketSize,
|
|
|
+ parser = Parser,
|
|
|
proto_state = ProtoState,
|
|
|
enable_stats = EnableStats}),
|
|
|
IdleTimout = get_value(client_idle_timeout, Env, 30000),
|
|
|
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
|
|
|
{backoff, 1000, 1000, 5000}).
|
|
|
|
|
|
+send_fun(Conn, Peername) ->
|
|
|
+ Self = self(),
|
|
|
+ fun(Packet) ->
|
|
|
+ Data = emqttd_serializer:serialize(Packet),
|
|
|
+ ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
|
|
|
+ emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
|
|
|
+ try Conn:async_send(Data) of
|
|
|
+ true -> ok
|
|
|
+ catch
|
|
|
+ error:Error -> Self ! {shutdown, Error}
|
|
|
+ end
|
|
|
+ end.
|
|
|
+
|
|
|
prioritise_call(Msg, _From, _Len, _State) ->
|
|
|
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
|
|
|
|
|
|
prioritise_info(Msg, _Len, _State) ->
|
|
|
case Msg of {redeliver, _} -> 5; _ -> 0 end.
|
|
|
|
|
|
-handle_pre_hibernate(State = #client_state{connname = Connname}) ->
|
|
|
- io:format("Client(~s) will hibernate!~n", [Connname]),
|
|
|
+handle_pre_hibernate(State = #client_state{peername = Peername}) ->
|
|
|
+ io:format("Client(~s) will hibernate!~n", [esockd_net:format(Peername)]),
|
|
|
{hibernate, emit_stats(State)}.
|
|
|
|
|
|
handle_call(info, From, State = #client_state{proto_state = ProtoState}) ->
|
|
|
@@ -295,17 +294,17 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
received(<<>>, State) ->
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
-received(Bytes, State = #client_state{parse_state = ParseState,
|
|
|
- packet_limit = PacketLimit,
|
|
|
+received(Bytes, State = #client_state{parser = Parser,
|
|
|
+ packet_size = PacketSize,
|
|
|
proto_state = ProtoState}) ->
|
|
|
- case catch emqttd_parser:parse(Bytes, ParseState) of
|
|
|
- {more, NewParseState} ->
|
|
|
- {noreply, run_socket(State#client_state{parse_state = NewParseState}), hibernate};
|
|
|
+ case catch emqttd_parser:parse(Bytes, Parser) of
|
|
|
+ {more, NewParser} ->
|
|
|
+ {noreply, run_socket(State#client_state{parser = NewParser}), hibernate};
|
|
|
{ok, Packet, Rest} ->
|
|
|
emqttd_metrics:received(Packet),
|
|
|
case emqttd_protocol:received(Packet, ProtoState) of
|
|
|
{ok, ProtoState1} ->
|
|
|
- received(Rest, State#client_state{parse_state = emqttd_parser:initial_state(PacketLimit),
|
|
|
+ received(Rest, State#client_state{parser = emqttd_parser:initial_state(PacketSize),
|
|
|
proto_state = ProtoState1});
|
|
|
{error, Error} ->
|
|
|
?LOG(error, "Protocol error - ~p", [Error], State),
|