|
|
@@ -50,7 +50,7 @@
|
|
|
parse_state :: emqx_frame:parse_state(),
|
|
|
chan_state :: emqx_channel:channel(),
|
|
|
pendings :: list(),
|
|
|
- reason :: term()
|
|
|
+ stop_reason :: term()
|
|
|
}).
|
|
|
|
|
|
-type(state() :: #state{}).
|
|
|
@@ -97,6 +97,12 @@ stats(#state{chan_state = ChanState}) ->
|
|
|
ChanStats = emqx_channel:stats(ChanState),
|
|
|
lists:append([ProcStats, wsock_stats(), conn_stats(), ChanStats]).
|
|
|
|
|
|
+wsock_stats() ->
|
|
|
+ [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].
|
|
|
+
|
|
|
+conn_stats() ->
|
|
|
+ [{Name, emqx_pd:get_counter(Name)} || Name <- ?CONN_STATS].
|
|
|
+
|
|
|
-spec(kick(pid()) -> ok).
|
|
|
kick(CPid) ->
|
|
|
call(CPid, kick).
|
|
|
@@ -311,8 +317,8 @@ websocket_info(Info, State = #state{chan_state = ChanState}) ->
|
|
|
stop(Reason, State#state{chan_state = NChanState})
|
|
|
end.
|
|
|
|
|
|
-terminate(SockError, _Req, #state{chan_state = ChanState,
|
|
|
- reason = Reason}) ->
|
|
|
+terminate(SockError, _Req, #state{chan_state = ChanState,
|
|
|
+ stop_reason = Reason}) ->
|
|
|
?LOG(debug, "Terminated for ~p, sockerror: ~p",
|
|
|
[Reason, SockError]),
|
|
|
emqx_channel:terminate(Reason, ChanState).
|
|
|
@@ -424,20 +430,14 @@ reply(State = #state{chan_state = ChanState, pendings = Pendings}) ->
|
|
|
{reply, Reply, State#state{chan_state = NChanState, pendings = []}}.
|
|
|
|
|
|
stop(Reason, State = #state{pendings = []}) ->
|
|
|
- {stop, State#state{reason = Reason}};
|
|
|
+ {stop, State#state{stop_reason = Reason}};
|
|
|
stop(Reason, State = #state{pendings = Pendings}) ->
|
|
|
Reply = handle_outgoing(Pendings, State),
|
|
|
{reply, [Reply, close],
|
|
|
- State#state{pendings = [], reason = Reason}}.
|
|
|
+ State#state{pendings = [], stop_reason = Reason}}.
|
|
|
|
|
|
enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
|
|
|
enqueue([Packet], State);
|
|
|
enqueue(Packets, State = #state{pendings = Pendings}) ->
|
|
|
State#state{pendings = lists:append(Pendings, Packets)}.
|
|
|
|
|
|
-wsock_stats() ->
|
|
|
- [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].
|
|
|
-
|
|
|
-conn_stats() ->
|
|
|
- [{Name, emqx_pd:get_counter(Name)} || Name <- ?CONN_STATS].
|
|
|
-
|