|
|
@@ -56,7 +56,7 @@
|
|
|
]).
|
|
|
|
|
|
%% Internal callback
|
|
|
--export([wakeup_from_hib/2]).
|
|
|
+-export([wakeup_from_hib/2, recvloop/2]).
|
|
|
|
|
|
%% Export for CT
|
|
|
-export([set_field/3]).
|
|
|
@@ -284,15 +284,22 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
|
handle_recv({system, From, Request}, Parent, State) ->
|
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
|
|
|
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
|
|
|
+ %% FIXME: it's not trapping exit, should never receive an EXIT
|
|
|
terminate(Reason, State);
|
|
|
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|
|
- process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State)).
|
|
|
+ case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
|
|
|
+ {ok, NewState} ->
|
|
|
+ ?MODULE:recvloop(Parent, NewState);
|
|
|
+ {stop, Reason, NewSate} ->
|
|
|
+ terminate(Reason, NewSate)
|
|
|
+ end.
|
|
|
|
|
|
hibernate(Parent, State) ->
|
|
|
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
|
|
|
|
|
|
%% Maybe do something here later.
|
|
|
-wakeup_from_hib(Parent, State) -> recvloop(Parent, State).
|
|
|
+wakeup_from_hib(Parent, State) ->
|
|
|
+ ?MODULE:recvloop(Parent, State).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Ensure/cancel stats timer
|
|
|
@@ -311,22 +318,31 @@ cancel_stats_timer(State) -> State.
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process next Msg
|
|
|
|
|
|
-process_msg([], Parent, State) -> recvloop(Parent, State);
|
|
|
-
|
|
|
-process_msg([Msg|More], Parent, State) ->
|
|
|
- case catch handle_msg(Msg, State) of
|
|
|
- ok ->
|
|
|
- process_msg(More, Parent, State);
|
|
|
- {ok, NState} ->
|
|
|
- process_msg(More, Parent, NState);
|
|
|
- {ok, Msgs, NState} ->
|
|
|
- process_msg(append_msg(More, Msgs), Parent, NState);
|
|
|
- {stop, Reason} ->
|
|
|
- terminate(Reason, State);
|
|
|
- {stop, Reason, NState} ->
|
|
|
- terminate(Reason, NState);
|
|
|
- {'EXIT', Reason} ->
|
|
|
- terminate(Reason, State)
|
|
|
+process_msg([], State) ->
|
|
|
+ {ok, State};
|
|
|
+process_msg([Msg|More], State) ->
|
|
|
+ try
|
|
|
+ case handle_msg(Msg, State) of
|
|
|
+ ok ->
|
|
|
+ process_msg(More, State);
|
|
|
+ {ok, NState} ->
|
|
|
+ process_msg(More, NState);
|
|
|
+ {ok, Msgs, NState} ->
|
|
|
+ process_msg(append_msg(More, Msgs), NState);
|
|
|
+ {stop, Reason, NState} ->
|
|
|
+ {stop, Reason, NState}
|
|
|
+ end
|
|
|
+ catch
|
|
|
+ exit : normal ->
|
|
|
+ {stop, normal, State};
|
|
|
+ exit : shutdown ->
|
|
|
+ {stop, shutdown, State};
|
|
|
+ exit : {shutdown, _} = Shutdown ->
|
|
|
+ {stop, Shutdown, State};
|
|
|
+ Exception : Context : Stack ->
|
|
|
+ {stop, #{exception => Exception,
|
|
|
+ context => Context,
|
|
|
+ stacktrace => Stack}, State}
|
|
|
end.
|
|
|
|
|
|
-compile({inline, [append_msg/2]}).
|
|
|
@@ -450,18 +466,37 @@ handle_msg(Msg, State) ->
|
|
|
-spec terminate(any(), state()) -> no_return().
|
|
|
terminate(Reason, State = #state{channel = Channel, transport = Transport,
|
|
|
socket = Socket}) ->
|
|
|
- ?tp(debug, terminate, #{reason => Reason}),
|
|
|
- Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
|
|
|
- emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
|
|
- emqx_channel:terminate(Reason, Channel1),
|
|
|
+ try
|
|
|
+ Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
|
|
|
+ emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
|
|
+ emqx_channel:terminate(Reason, Channel1),
|
|
|
+ close_socket_ok(State)
|
|
|
+ catch
|
|
|
+ E : C : S ->
|
|
|
+ ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
|
|
|
+ end,
|
|
|
+ ?tp(debug, terminate, #{}),
|
|
|
+ maybe_raise_excption(Reason).
|
|
|
+
|
|
|
+%% close socket, discard new state, always return ok.
|
|
|
+close_socket_ok(State) ->
|
|
|
_ = close_socket(State),
|
|
|
+ ok.
|
|
|
+
|
|
|
+%% tell truth about the original exception
|
|
|
+maybe_raise_excption(#{exception := Exception,
|
|
|
+ context := Context,
|
|
|
+ stacktrace := Stacktrace
|
|
|
+ }) ->
|
|
|
+ erlang:raise(Exception, Context, Stacktrace);
|
|
|
+maybe_raise_excption(Reason) ->
|
|
|
exit(Reason).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Sys callbacks
|
|
|
|
|
|
system_continue(Parent, _Debug, State) ->
|
|
|
- recvloop(Parent, State).
|
|
|
+ ?MODULE:recvloop(Parent, State).
|
|
|
|
|
|
system_terminate(Reason, _Parent, _Debug, State) ->
|
|
|
terminate(Reason, State).
|