|
|
@@ -224,10 +224,13 @@ idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
|
|
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
|
|
|
handle_incoming(Packet, SuccFun, NState);
|
|
|
|
|
|
-idle(cast, {incoming, Packet}, State) ->
|
|
|
+idle(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
|
|
|
?LOG(warning, "Unexpected incoming: ~p", [Packet]),
|
|
|
shutdown(unexpected_incoming_packet, State);
|
|
|
|
|
|
+idle(cast, {incoming, {error, Reason}}, State) ->
|
|
|
+ shutdown(Reason, State);
|
|
|
+
|
|
|
idle(EventType, Content, State) ->
|
|
|
?HANDLE(EventType, Content, State).
|
|
|
|
|
|
@@ -241,6 +244,17 @@ connected(enter, _PrevSt, State) ->
|
|
|
connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
|
|
|
handle_incoming(Packet, fun keep_state/1, State);
|
|
|
|
|
|
+connected(cast, {incoming, {error, Reason}}, State = #connection{chan_state = ChanState}) ->
|
|
|
+ case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of
|
|
|
+ {wait_session_expire, _, NChanState} ->
|
|
|
+ ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
|
|
+ {next_state, disconnected, State#connection{chan_state= NChanState}};
|
|
|
+ {wait_session_expire, _, OutPackets, NChanState} ->
|
|
|
+ ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
|
|
+ NState = State#connection{chan_state= NChanState},
|
|
|
+ {next_state, disconnected, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)}
|
|
|
+ end;
|
|
|
+
|
|
|
connected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
|
|
|
handle_deliver(emqx_misc:drain_deliver([Deliver]), State);
|
|
|
|
|
|
@@ -408,8 +422,7 @@ process_incoming(Data, State) ->
|
|
|
process_incoming(<<>>, Packets, State) ->
|
|
|
{keep_state, State, next_incoming_events(Packets)};
|
|
|
|
|
|
-process_incoming(Data, Packets, State = #connection{parse_state = ParseState,
|
|
|
- chan_state = ChanState}) ->
|
|
|
+process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) ->
|
|
|
try emqx_frame:parse(Data, ParseState) of
|
|
|
{more, NParseState} ->
|
|
|
NState = State#connection{parse_state = NParseState},
|
|
|
@@ -418,32 +431,16 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState,
|
|
|
NState = State#connection{parse_state = NParseState},
|
|
|
process_incoming(Rest, [Packet|Packets], NState);
|
|
|
{error, Reason} ->
|
|
|
- shutdown(Reason, State)
|
|
|
+ {keep_state, State, next_incoming_events({error, Reason})}
|
|
|
catch
|
|
|
error:Reason:Stk ->
|
|
|
- ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Reason, Stk, Data]),
|
|
|
- Result =
|
|
|
- case emqx_channel:info(connected, ChanState) of
|
|
|
- undefined ->
|
|
|
- emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
|
|
- true ->
|
|
|
- emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
|
|
- _ ->
|
|
|
- ignore
|
|
|
- end,
|
|
|
- case Result of
|
|
|
- {stop, Reason0, OutPackets, NChanState} ->
|
|
|
- Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end,
|
|
|
- NState = State#connection{chan_state = NChanState},
|
|
|
- handle_outgoing(OutPackets, Shutdown, NState);
|
|
|
- {stop, Reason0, NChanState} ->
|
|
|
- stop(Reason0, State#connection{chan_state = NChanState});
|
|
|
- ignore ->
|
|
|
- keep_state(State)
|
|
|
- end
|
|
|
+ ?LOG(error, "~nParse failed for ~p~nStacktrace: ~p~nError data:~p", [Reason, Stk, Data]),
|
|
|
+ {keep_state, State, next_incoming_events({error, Reason})}
|
|
|
end.
|
|
|
|
|
|
-compile({inline, [next_incoming_events/1]}).
|
|
|
+next_incoming_events({error, Reason}) ->
|
|
|
+ [next_event(cast, {incoming, {error, Reason}})];
|
|
|
next_incoming_events(Packets) ->
|
|
|
[next_event(cast, {incoming, Packet}) || Packet <- Packets].
|
|
|
|
|
|
@@ -459,14 +456,19 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
|
|
|
{ok, NChanState} ->
|
|
|
SuccFun(State#connection{chan_state= NChanState});
|
|
|
{ok, OutPackets, NChanState} ->
|
|
|
- handle_outgoing(OutPackets, SuccFun,
|
|
|
- State#connection{chan_state = NChanState});
|
|
|
+ handle_outgoing(OutPackets, SuccFun, State#connection{chan_state = NChanState});
|
|
|
+ {wait_session_expire, Reason, NChanState} ->
|
|
|
+ ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
|
|
+ {next_state, disconnected, State#connection{chan_state = NChanState}};
|
|
|
+ {wait_session_expire, Reason, OutPackets, NChanState} ->
|
|
|
+ ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
|
|
+ NState = State#connection{chan_state= NChanState},
|
|
|
+ {next_state, disconnected, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState)};
|
|
|
{stop, Reason, NChanState} ->
|
|
|
stop(Reason, State#connection{chan_state = NChanState});
|
|
|
{stop, Reason, OutPackets, NChanState} ->
|
|
|
- Shutdown = fun(NewSt) -> stop(Reason, NewSt) end,
|
|
|
- NState = State#connection{chan_state = NChanState},
|
|
|
- handle_outgoing(OutPackets, Shutdown, NState)
|
|
|
+ NState = State#connection{chan_state= NChanState},
|
|
|
+ stop(Reason, handle_outgoing(OutPackets, fun(NewSt) -> NewSt end, NState))
|
|
|
end.
|
|
|
|
|
|
%%-------------------------------------------------------------------
|
|
|
@@ -477,10 +479,7 @@ handle_deliver(Delivers, State = #connection{chan_state = ChanState}) ->
|
|
|
{ok, NChanState} ->
|
|
|
keep_state(State#connection{chan_state = NChanState});
|
|
|
{ok, Packets, NChanState} ->
|
|
|
- NState = State#connection{chan_state = NChanState},
|
|
|
- handle_outgoing(Packets, fun keep_state/1, NState);
|
|
|
- {stop, Reason, NChanState} ->
|
|
|
- stop(Reason, State#connection{chan_state = NChanState})
|
|
|
+ handle_outgoing(Packets, fun keep_state/1, State#connection{chan_state = NChanState})
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -534,8 +533,10 @@ handle_timeout(TRef, Msg, State = #connection{chan_state = ChanState}) ->
|
|
|
{ok, NChanState} ->
|
|
|
keep_state(State#connection{chan_state = NChanState});
|
|
|
{ok, Packets, NChanState} ->
|
|
|
- handle_outgoing(Packets, fun keep_state/1,
|
|
|
- State#connection{chan_state = NChanState});
|
|
|
+ handle_outgoing(Packets, fun keep_state/1, State#connection{chan_state = NChanState});
|
|
|
+ {wait_session_expire, Reason, NChanState} ->
|
|
|
+ ?LOG(debug, "Disconnect and wait for session to expire due to ~p", [Reason]),
|
|
|
+ {next_state, disconnected, State#connection{chan_state = NChanState}};
|
|
|
{stop, Reason, NChanState} ->
|
|
|
stop(Reason, State#connection{chan_state = NChanState})
|
|
|
end.
|