|
|
@@ -246,11 +246,8 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
|
|
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
|
|
|
shutdown(conflict, State);
|
|
|
|
|
|
-handle_info({tcp, _Sock, Data}, State) ->
|
|
|
+handle_info({TcpOrSsL, _Sock, Data}, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
|
|
|
process_incoming(Data, State);
|
|
|
-%% FIXME Later
|
|
|
-handle_info({ssl, _Sock, Data}, State) ->
|
|
|
- process_incoming(Data, run_socket(State));
|
|
|
|
|
|
%% Rate limit here, cool:)
|
|
|
handle_info({tcp_passive, _Sock}, State) ->
|
|
|
@@ -259,14 +256,10 @@ handle_info({tcp_passive, _Sock}, State) ->
|
|
|
handle_info({ssl_passive, _Sock}, State) ->
|
|
|
{noreply, run_socket(ensure_rate_limit(State))};
|
|
|
|
|
|
-handle_info({tcp_error, _Sock, Reason}, State) ->
|
|
|
- shutdown(Reason, State);
|
|
|
-handle_info({ssl_error, _Sock, Reason}, State) ->
|
|
|
+handle_info({Err, _Sock, Reason}, State) when Err =:= tcp_error; Err =:= ssl_error ->
|
|
|
shutdown(Reason, State);
|
|
|
|
|
|
-handle_info({tcp_closed, _Sock}, State) ->
|
|
|
- shutdown(closed, State);
|
|
|
-handle_info({ssl_closed, _Sock}, State) ->
|
|
|
+handle_info({Closed, _Sock}, State) when Closed =:= tcp_closed; Closed =:= ssl_closed ->
|
|
|
shutdown(closed, State);
|
|
|
|
|
|
%% Rate limit timer
|
|
|
@@ -380,7 +373,6 @@ reset_parser(State = #state{proto_state = ProtoState}) ->
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Ensure rate limit
|
|
|
-%%------------------------------------------------------------------------------
|
|
|
|
|
|
ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) ->
|
|
|
Limiters = [{Pl, #state.pub_limit, emqx_pd:reset_counter(incoming_pubs)},
|
|
|
@@ -400,17 +392,26 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
|
|
|
setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1)
|
|
|
end.
|
|
|
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Activate socket
|
|
|
+
|
|
|
run_socket(State = #state{conn_state = blocked}) ->
|
|
|
State;
|
|
|
-run_socket(State = #state{transport = Transport,
|
|
|
- socket = Socket,
|
|
|
- active_n = N}) ->
|
|
|
- ensure_ok_or_exit(Transport:setopts(Socket, [{active, N}])),
|
|
|
+
|
|
|
+run_socket(State = #state{transport = Transport, socket = Socket, active_n = N}) ->
|
|
|
+ TrueOrN = case Transport:is_ssl(Socket) of
|
|
|
+ true -> true; %% Cannot set '{active, N}' for SSL:(
|
|
|
+ false -> N
|
|
|
+ end,
|
|
|
+ ensure_ok_or_exit(Transport:setopts(Socket, [{active, TrueOrN}])),
|
|
|
State.
|
|
|
|
|
|
+ensure_ok_or_exit(ok) -> ok;
|
|
|
+ensure_ok_or_exit({error, Reason}) ->
|
|
|
+ self() ! {shutdown, Reason}.
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Ensure stats timer
|
|
|
-%%------------------------------------------------------------------------------
|
|
|
|
|
|
ensure_stats_timer(State = #state{enable_stats = true,
|
|
|
stats_timer = undefined,
|
|
|
@@ -418,11 +419,8 @@ ensure_stats_timer(State = #state{enable_stats = true,
|
|
|
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
|
|
ensure_stats_timer(State) -> State.
|
|
|
|
|
|
-shutdown(Reason, State) ->
|
|
|
- stop({shutdown, Reason}, State).
|
|
|
-
|
|
|
-stop(Reason, State) ->
|
|
|
- {stop, Reason, State}.
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Maybe GC
|
|
|
|
|
|
maybe_gc(_, State = #state{gc_state = undefined}) ->
|
|
|
State;
|
|
|
@@ -435,8 +433,11 @@ maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) ->
|
|
|
maybe_gc(_, State) ->
|
|
|
State.
|
|
|
|
|
|
-ensure_ok_or_exit(ok) ->
|
|
|
- ok;
|
|
|
-ensure_ok_or_exit({error, Reason}) ->
|
|
|
- self() ! {shutdown, Reason}.
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Shutdown or stop
|
|
|
+
|
|
|
+shutdown(Reason, State) ->
|
|
|
+ stop({shutdown, Reason}, State).
|
|
|
|
|
|
+stop(Reason, State) ->
|
|
|
+ {stop, Reason, State}.
|