|
|
@@ -26,7 +26,7 @@
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
--export([start_link/1, info/1, go/2]).
|
|
|
+-export([start_link/1, info/1]).
|
|
|
|
|
|
-export([init/1,
|
|
|
handle_call/3,
|
|
|
@@ -40,8 +40,9 @@
|
|
|
-include("emqtt_packet.hrl").
|
|
|
|
|
|
%%Client State...
|
|
|
--record(state, {
|
|
|
- socket,
|
|
|
+-record(state, {
|
|
|
+ transport,
|
|
|
+ socket,
|
|
|
peer_name,
|
|
|
conn_name,
|
|
|
await_recv,
|
|
|
@@ -52,32 +53,28 @@
|
|
|
keepalive
|
|
|
}).
|
|
|
|
|
|
-start_link(Sock) ->
|
|
|
- gen_server:start_link(?MODULE, [Sock], []).
|
|
|
+start_link(SockArgs) ->
|
|
|
+ {ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}.
|
|
|
|
|
|
info(Pid) ->
|
|
|
gen_server:call(Pid, info).
|
|
|
|
|
|
-go(Pid, Sock) ->
|
|
|
- gen_server:call(Pid, {go, Sock}).
|
|
|
-
|
|
|
-init([Sock]) ->
|
|
|
- {ok, #state{socket = Sock}, 1000}.
|
|
|
-
|
|
|
-handle_call({go, Sock}, _From, #state{socket = Sock}) ->
|
|
|
+init([SockArgs = {Transport, Sock, _SockFun}]) ->
|
|
|
+ %%TODO: replace emqtt_net??
|
|
|
{ok, Peername} = emqtt_net:peer_string(Sock),
|
|
|
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
|
|
|
lager:info("Connect from ~s", [ConnStr]),
|
|
|
- {reply, ok,
|
|
|
- control_throttle(
|
|
|
- #state{ socket = Sock,
|
|
|
- peer_name = Peername,
|
|
|
- conn_name = ConnStr,
|
|
|
- await_recv = false,
|
|
|
- conn_state = running,
|
|
|
- conserve = false,
|
|
|
- parse_state = emqtt_packet:initial_state(),
|
|
|
- proto_state = emqtt_protocol:initial_state(Sock, Peername)}), 10000};
|
|
|
+ {ok, NewSock} = esockd_connection:accept(SockArgs),
|
|
|
+ State = control_throttle(#state{transport = Transport,
|
|
|
+ socket = Sock,
|
|
|
+ peer_name = Peername,
|
|
|
+ conn_name = ConnStr,
|
|
|
+ await_recv = false,
|
|
|
+ conn_state = running,
|
|
|
+ conserve = false,
|
|
|
+ parse_state = emqtt_packet:initial_state(),
|
|
|
+ proto_state = emqtt_protocol:initial_state(Transport, NewSock, Peername)}),
|
|
|
+ gen_server:enter_loop(?MODULE, [], State, 10000).
|
|
|
|
|
|
handle_call(info, _From, State = #state{
|
|
|
conn_name=ConnName, proto_state = ProtoState}) ->
|
|
|
@@ -157,12 +154,6 @@ terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_sta
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
-async_recv(Sock, Length, infinity) when is_port(Sock) ->
|
|
|
- prim_inet:async_recv(Sock, Length, -1);
|
|
|
-
|
|
|
-async_recv(Sock, Length, Timeout) when is_port(Sock) ->
|
|
|
- prim_inet:async_recv(Sock, Length, Timeout).
|
|
|
-
|
|
|
%-------------------------------------------------------
|
|
|
% receive and parse tcp data
|
|
|
%-------------------------------------------------------
|
|
|
@@ -203,12 +194,12 @@ network_error(Reason, State = #state{ peer_name = PeerName }) ->
|
|
|
lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]),
|
|
|
stop({shutdown, conn_closed}, State).
|
|
|
|
|
|
-run_socket(State = #state{ conn_state = blocked }) ->
|
|
|
+run_socket(State = #state{conn_state = blocked}) ->
|
|
|
State;
|
|
|
-run_socket(State = #state{ await_recv = true }) ->
|
|
|
+run_socket(State = #state{await_recv = true}) ->
|
|
|
State;
|
|
|
-run_socket(State = #state{ socket = Sock }) ->
|
|
|
- async_recv(Sock, 0, infinity),
|
|
|
+run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
|
|
+ Transport:async_recv(Sock, 0, infinity),
|
|
|
State#state{ await_recv = true }.
|
|
|
|
|
|
control_throttle(State = #state{ conn_state = Flow,
|
|
|
@@ -223,4 +214,3 @@ control_throttle(State = #state{ conn_state = Flow,
|
|
|
stop(Reason, State ) ->
|
|
|
{stop, Reason, State}.
|
|
|
|
|
|
-
|