Sfoglia il codice sorgente

Try to simulate a '{ssl_passive, Sock}' message:(

Feng Lee 7 anni fa
parent
commit
3fec9cdf0a
1 ha cambiato i file con 39 aggiunte e 15 eliminazioni
  1. 39 15
      src/emqx_connection.erl

+ 39 - 15
src/emqx_connection.erl

@@ -214,8 +214,8 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
 
 
 handle_info({timeout, Timer, emit_stats},
 handle_info({timeout, Timer, emit_stats},
             State = #state{stats_timer = Timer,
             State = #state{stats_timer = Timer,
-                           proto_state = ProtoState
-                          }) ->
+                           proto_state = ProtoState,
+                           gc_state = GcState}) ->
     emqx_metrics:commit(),
     emqx_metrics:commit(),
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     NewState = State#state{stats_timer = undefined},
     NewState = State#state{stats_timer = undefined},
@@ -224,8 +224,9 @@ handle_info({timeout, Timer, emit_stats},
         continue ->
         continue ->
             {noreply, NewState};
             {noreply, NewState};
         hibernate ->
         hibernate ->
-            ok = emqx_gc:reset(),
-            {noreply, NewState, hibernate};
+            %% going to hibernate, reset gc stats
+            GcState1 = emqx_gc:reset(GcState),
+            {noreply, NewState#state{gc_state = GcState1}, hibernate};
         {shutdown, Reason} ->
         {shutdown, Reason} ->
             ?LOG(warning, "shutdown due to ~p", [Reason]),
             ?LOG(warning, "shutdown due to ~p", [Reason]),
             shutdown(Reason, NewState)
             shutdown(Reason, NewState)
@@ -246,22 +247,29 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
     shutdown(conflict, State);
     shutdown(conflict, State);
 
 
 handle_info({tcp, _Sock, Data}, State) ->
 handle_info({tcp, _Sock, Data}, State) ->
-    ?LOG(debug, "RECV ~p", [Data]),
-    Oct = iolist_size(Data),
-    emqx_pd:update_counter(incoming_bytes, Oct),
-    emqx_metrics:trans(inc, 'bytes/received', Oct),
-    handle_packet(Data, maybe_gc({1, Oct}, State));
+    process_incoming(Data, State);
+%% FIXME Later
+handle_info({ssl, _Sock, Data}, State) ->
+    process_incoming(Data, run_socket(State));
 
 
 %% Rate limit here, cool:)
 %% Rate limit here, cool:)
 handle_info({tcp_passive, _Sock}, State) ->
 handle_info({tcp_passive, _Sock}, State) ->
     {noreply, run_socket(ensure_rate_limit(State))};
     {noreply, run_socket(ensure_rate_limit(State))};
+%% FIXME Later
+handle_info({ssl_passive, _Sock}, State) ->
+    {noreply, run_socket(ensure_rate_limit(State))};
 
 
 handle_info({tcp_error, _Sock, Reason}, State) ->
 handle_info({tcp_error, _Sock, Reason}, State) ->
     shutdown(Reason, State);
     shutdown(Reason, State);
+handle_info({ssl_error, _Sock, Reason}, State) ->
+    shutdown(Reason, State);
 
 
 handle_info({tcp_closed, _Sock}, State) ->
 handle_info({tcp_closed, _Sock}, State) ->
     shutdown(closed, State);
     shutdown(closed, State);
+handle_info({ssl_closed, _Sock}, State) ->
+    shutdown(closed, State);
 
 
+%% Rate limit timer
 handle_info(activate_sock, State) ->
 handle_info(activate_sock, State) ->
     {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})};
     {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})};
 
 
@@ -319,12 +327,24 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
-%% Parse and handle packets
+%% Internals: process incoming, parse and handle packets
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-%% Receive and parse data
+process_incoming(Data, State) ->
+    Oct = iolist_size(Data),
+    ?LOG(debug, "RECV ~p", [Data]),
+    emqx_pd:update_counter(incoming_bytes, Oct),
+    emqx_metrics:trans(inc, 'bytes/received', Oct),
+    case handle_packet(Data, State) of
+        {noreply, State1} ->
+            State2 = maybe_gc({1, Oct}, State1),
+            {noreply, ensure_stats_timer(State2)};
+        Shutdown -> Shutdown
+    end.
+
+%% Parse and handle packets
 handle_packet(<<>>, State) ->
 handle_packet(<<>>, State) ->
-    {noreply, ensure_stats_timer(State)};
+    {noreply, State};
 
 
 handle_packet(Data, State = #state{proto_state  = ProtoState,
 handle_packet(Data, State = #state{proto_state  = ProtoState,
                                    parser_state = ParserState,
                                    parser_state = ParserState,
@@ -384,8 +404,8 @@ run_socket(State = #state{conn_state = blocked}) ->
     State;
     State;
 run_socket(State = #state{transport = Transport,
 run_socket(State = #state{transport = Transport,
                           socket = Socket,
                           socket = Socket,
-                          active_n = ActiveN}) ->
-    Transport:setopts(Socket, [{active, ActiveN}]),
+                          active_n = N}) ->
+    ensure_ok_or_exit(Transport:setopts(Socket, [{active, N}])),
     State.
     State.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -393,7 +413,7 @@ run_socket(State = #state{transport = Transport,
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
 ensure_stats_timer(State = #state{enable_stats = true,
 ensure_stats_timer(State = #state{enable_stats = true,
-                                  stats_timer  = undefined,
+                                  stats_timer = undefined,
                                   idle_timeout = IdleTimeout}) ->
                                   idle_timeout = IdleTimeout}) ->
     State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
     State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
 ensure_stats_timer(State) -> State.
 ensure_stats_timer(State) -> State.
@@ -415,4 +435,8 @@ maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) ->
 maybe_gc(_, State) ->
 maybe_gc(_, State) ->
     State.
     State.
 
 
+ensure_ok_or_exit(ok) ->
+    ok;
+ensure_ok_or_exit({error, Reason}) ->
+    self() ! {shutdown, Reason}.