Explorar o código

Ensure stats timer

Feng Lee %!s(int64=6) %!d(string=hai) anos
pai
achega
f2b552e29e
Modificáronse 3 ficheiros con 60 adicións e 38 borrados
  1. 14 10
      src/emqx_channel.erl
  2. 23 15
      src/emqx_connection.erl
  3. 23 13
      src/emqx_ws_connection.erl

+ 14 - 10
src/emqx_channel.erl

@@ -46,6 +46,8 @@
         , terminate/2
         ]).
 
+-export([ensure_timer/2]).
+
 -export([gc/3]).
 
 -import(emqx_access_control,
@@ -53,6 +55,8 @@
         , check_acl/3
         ]).
 
+-import(emqx_misc, [start_timer/2]).
+
 -export_type([channel/0]).
 
 -record(channel, {
@@ -659,17 +663,17 @@ handle_info(Info, Channel) ->
       -> {ok, channel()}
        | {ok, Result :: term(), channel()}
        | {stop, Reason :: term(), channel()}).
+timeout(TRef, {emit_stats, Stats}, Channel = #channel{stats_timer = TRef}) ->
+    ClientId = info(client_id, Channel),
+    ok = emqx_cm:set_chan_stats(ClientId, Stats),
+    {ok, Channel#channel{stats_timer = undefined}};
+
 timeout(TRef, retry_deliver, Channel = #channel{%%session = Session,
                                                 retry_timer = TRef}) ->
     %% case emqx_session:retry(Session) of
     %% TODO: ...
     {ok, Channel#channel{retry_timer = undefined}};
 
-timeout(TRef, emit_stats, Channel = #channel{stats_timer = TRef}) ->
-    ClientId = info(client_id, Channel),
-    %% ok = emqx_cm:set_chan_stats(ClientId, stats(Channel)),
-    {ok, Channel#channel{stats_timer = undefined}};
-
 timeout(_TRef, Msg, Channel) ->
     ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
     {ok, Channel}.
@@ -678,17 +682,17 @@ timeout(_TRef, Msg, Channel) ->
 %% Ensure timers
 %%--------------------------------------------------------------------
 
+ensure_timer(emit_stats, Channel = #channel{stats_timer = undefined,
+                                            idle_timeout = IdleTimeout
+                                           }) ->
+    Channel#channel{stats_timer = start_timer(IdleTimeout, emit_stats)};
+
 ensure_timer(retry, Channel = #channel{session = Session,
                                        retry_timer = undefined}) ->
     Interval = emqx_session:info(retry_interval, Session),
     TRef = emqx_misc:start_timer(Interval, retry_deliver),
     Channel#channel{retry_timer = TRef};
 
-ensure_timer(stats, Channel = #channel{stats_timer = undefined,
-                                       idle_timeout = IdleTimeout}) ->
-    TRef = emqx_misc:start_timer(IdleTimeout, emit_stats),
-    Channel#channel{stats_timer = TRef};
-
 %% disabled or timer existed
 ensure_timer(_Name, Channel) ->
     Channel.

+ 23 - 15
src/emqx_connection.erl

@@ -360,7 +360,8 @@ handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState})
     ?LOG(debug, "RECV ~p", [Data]),
     emqx_pd:update_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
-    NChanState = emqx_channel:gc(1, Oct, ChanState),
+    NChanState = emqx_channel:ensure_timer(
+                   emit_stats, emqx_channel:gc(1, Oct, ChanState)),
     process_incoming(Data, State#state{chan_state = NChanState});
 
 handle(info, {Error, _Sock, Reason}, State)
@@ -398,24 +399,19 @@ handle(info, activate_socket, State) ->
             shutdown(Reason, NState)
     end;
 
-handle(info, {inet_reply, _Sock, ok}, State) ->
+handle(info, {inet_reply, _Sock, ok}, State = #state{chan_state = ChanState}) ->
     %% something sent
-    keep_state(State);
+    NChanState = emqx_channel:ensure_timer(emit_stats, ChanState),
+    keep_state(State#state{chan_state = NChanState});
 
 handle(info, {inet_reply, _Sock, {error, Reason}}, State) ->
     shutdown(Reason, State);
 
-handle(info, {timeout, TRef, Msg}, State = #state{chan_state = ChanState})
-  when is_reference(TRef) ->
-    case emqx_channel:timeout(TRef, Msg, ChanState) of
-        {ok, NChanState} ->
-            keep_state(State#state{chan_state = NChanState});
-        {ok, Packets, NChanState} ->
-            handle_outgoing(Packets, fun keep_state/1,
-                            State#state{chan_state = NChanState});
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
+handle(info, {timeout, TRef, emit_stats}, State) when is_reference(TRef) ->
+    handle_timeout(TRef, {emit_stats, stats(State)}, State);
+
+handle(info, {timeout, TRef, Msg}, State) when is_reference(TRef) ->
+    handle_timeout(TRef, Msg, State);
 
 handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
     ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]),
@@ -528,7 +524,19 @@ send(IoData, SuccFun, State = #state{transport = Transport,
             shutdown(Reason, State)
     end.
 
-%% TODO: maybe_gc(1, Oct, State)
+%%--------------------------------------------------------------------
+%% Handle timeout
+
+handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) ->
+    case emqx_channel:timeout(TRef, Msg, ChanState) of
+        {ok, NChanState} ->
+            keep_state(State#state{chan_state = NChanState});
+        {ok, Packets, NChanState} ->
+            handle_outgoing(Packets, fun keep_state/1,
+                            State#state{chan_state = NChanState});
+        {stop, Reason, NChanState} ->
+            stop(Reason, State#state{chan_state = NChanState})
+    end.
 
 %%--------------------------------------------------------------------
 %% Ensure keepalive

+ 23 - 13
src/emqx_ws_connection.erl

@@ -198,7 +198,8 @@ websocket_handle({binary, Data}, State = #state{chan_state = ChanState})
     emqx_pd:update_counter(recv_cnt, 1),
     emqx_pd:update_counter(recv_oct, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
-    NChanState = emqx_channel:gc(1, Oct, ChanState),
+    NChanState = emqx_channel:ensure_timer(
+                   emit_stats, emqx_channel:gc(1, Oct, ChanState)),
     process_incoming(Data, State#state{chan_state = NChanState});
 
 %% Pings should be replied with pongs, cowboy does it automatically
@@ -281,16 +282,11 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
             stop(keepalive_error, State)
     end;
 
-websocket_info({timeout, TRef, Msg}, State = #state{chan_state = ChanState})
-  when is_reference(TRef) ->
-    case emqx_channel:timeout(TRef, Msg, ChanState) of
-        {ok, NChanState} ->
-            {ok, State#state{chan_state = NChanState}};
-        {ok, Packets, NChanState} ->
-            reply(enqueue(Packets, State#state{chan_state = NChanState}));
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
+websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) ->
+    handle_timeout(TRef, {emit_stats, stats(State)}, State);
+
+websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
+    handle_timeout(TRef, Msg, State);
 
 websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
     ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
@@ -341,6 +337,19 @@ connected(State = #state{chan_state = ChanState}) ->
             stop(Reason, NState)
     end.
 
+%%--------------------------------------------------------------------
+%% Handle timeout
+
+handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) ->
+    case emqx_channel:timeout(TRef, Msg, ChanState) of
+        {ok, NChanState} ->
+            {ok, State#state{chan_state = NChanState}};
+        {ok, Packets, NChanState} ->
+            reply(enqueue(Packets, State#state{chan_state = NChanState}));
+        {stop, Reason, NChanState} ->
+            stop(Reason, State#state{chan_state = NChanState})
+    end.
+
 %%--------------------------------------------------------------------
 %% Ensure keepalive
 
@@ -429,9 +438,10 @@ inc_outgoing_stats(Type) ->
 
 reply(State = #state{pendings = []}) ->
     {ok, State};
-reply(State = #state{pendings = Pendings}) ->
+reply(State = #state{chan_state = ChanState, pendings = Pendings}) ->
     Reply = handle_outgoing(Pendings, State),
-    {reply, Reply, State#state{pendings = []}}.
+    NChanState = emqx_channel:ensure_timer(emit_stats, ChanState),
+    {reply, Reply, State#state{chan_state = NChanState, pendings = []}}.
 
 stop(Reason, State = #state{pendings = []}) ->
     {stop, State#state{reason = Reason}};