|
|
@@ -270,7 +270,7 @@ websocket_info(Deliver = {deliver, _Topic, _Msg},
|
|
|
{ok, NProtoState} ->
|
|
|
reply(State#state{proto_state = NProtoState});
|
|
|
{ok, Packets, NProtoState} ->
|
|
|
- reply(Packets, State#state{proto_state = NProtoState});
|
|
|
+ reply(enqueue(Packets, State#state{proto_state = NProtoState}));
|
|
|
{error, Reason} ->
|
|
|
stop(Reason, State);
|
|
|
{error, Reason, NProtoState} ->
|
|
|
@@ -282,7 +282,6 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
|
|
{ok, KeepAlive1} ->
|
|
|
{ok, State#state{keepalive = KeepAlive1}};
|
|
|
{error, timeout} ->
|
|
|
- ?LOG(debug, "Keepalive Timeout!"),
|
|
|
stop(keepalive_timeout, State);
|
|
|
{error, Error} ->
|
|
|
?LOG(error, "Keepalive error: ~p", [Error]),
|
|
|
@@ -315,7 +314,7 @@ websocket_info({timeout, Timer, Msg},
|
|
|
{ok, NProtoState} ->
|
|
|
{ok, State#state{proto_state = NProtoState}};
|
|
|
{ok, Packets, NProtoState} ->
|
|
|
- reply(Packets, State#state{proto_state = NProtoState});
|
|
|
+ reply(enqueue(Packets, State#state{proto_state = NProtoState}));
|
|
|
{error, Reason} ->
|
|
|
stop(Reason, State);
|
|
|
{error, Reason, NProtoState} ->
|
|
|
@@ -364,8 +363,7 @@ connected(State = #state{proto_state = ProtoState}) ->
|
|
|
%% Ensure keepalive after connected successfully.
|
|
|
Interval = emqx_protocol:info(keepalive, ProtoState),
|
|
|
case ensure_keepalive(Interval, NState) of
|
|
|
- ignore ->
|
|
|
- reply(NState);
|
|
|
+ ignore -> reply(NState);
|
|
|
{ok, KeepAlive} ->
|
|
|
reply(NState#state{keepalive = KeepAlive});
|
|
|
{error, Reason} ->
|
|
|
@@ -377,16 +375,10 @@ connected(State = #state{proto_state = ProtoState}) ->
|
|
|
|
|
|
ensure_keepalive(0, _State) ->
|
|
|
ignore;
|
|
|
-ensure_keepalive(Interval, State = #state{proto_state = ProtoState}) ->
|
|
|
+ensure_keepalive(Interval, #state{proto_state = ProtoState}) ->
|
|
|
Backoff = emqx_zone:get_env(emqx_protocol:info(zone, ProtoState),
|
|
|
keepalive_backoff, 0.75),
|
|
|
- case emqx_keepalive:start(stat_fun(), round(Interval * Backoff), {keepalive, check}) of
|
|
|
- {ok, KeepAlive} ->
|
|
|
- {ok, State#state{keepalive = KeepAlive}};
|
|
|
- {error, Reason} ->
|
|
|
- ?LOG(warning, "Keepalive error: ~p", [Reason]),
|
|
|
- stop(Reason, State)
|
|
|
- end.
|
|
|
+ emqx_keepalive:start(stat_fun(), round(Interval * Backoff), {keepalive, check}).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process incoming data
|
|
|
@@ -415,8 +407,7 @@ process_incoming(Data, State = #state{parse_state = ParseState}) ->
|
|
|
%% Handle incoming packets
|
|
|
|
|
|
handle_incoming(Packet = ?PACKET(Type), SuccFun,
|
|
|
- State = #state{proto_state = ProtoState,
|
|
|
- pendings = Pendings}) ->
|
|
|
+ State = #state{proto_state = ProtoState}) ->
|
|
|
_ = inc_incoming_stats(Type),
|
|
|
ok = emqx_metrics:inc_recv(Packet),
|
|
|
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
|
|
@@ -424,9 +415,7 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
|
|
|
{ok, NProtoState} ->
|
|
|
SuccFun(State#state{proto_state = NProtoState});
|
|
|
{ok, OutPackets, NProtoState} ->
|
|
|
- Pendings1 = lists:append(Pendings, OutPackets),
|
|
|
- SuccFun(State#state{proto_state = NProtoState,
|
|
|
- pendings = Pendings1});
|
|
|
+ SuccFun(enqueue(OutPackets, State#state{proto_state = NProtoState}));
|
|
|
{error, Reason, NProtoState} ->
|
|
|
stop(Reason, State#state{proto_state = NProtoState});
|
|
|
{stop, Error, NProtoState} ->
|
|
|
@@ -436,9 +425,6 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Handle outgoing packets
|
|
|
|
|
|
-handle_outgoing(Packet, State) when is_tuple(Packet) ->
|
|
|
- handle_outgoing([Packet], State);
|
|
|
-
|
|
|
handle_outgoing(Packets, #state{serialize = Serialize}) ->
|
|
|
Data = lists:map(Serialize, Packets),
|
|
|
emqx_pd:update_counter(send_oct, iolist_size(Data)),
|
|
|
@@ -471,10 +457,6 @@ inc_outgoing_stats(Type) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Reply or Stop
|
|
|
|
|
|
-reply(Packets, State = #state{pendings = Pendings}) ->
|
|
|
- Pendings1 = lists:append(Pendings, Packets),
|
|
|
- reply(State#state{pendings = Pendings1}).
|
|
|
-
|
|
|
reply(State = #state{pendings = []}) ->
|
|
|
{ok, State};
|
|
|
reply(State = #state{pendings = Pendings}) ->
|
|
|
@@ -488,6 +470,11 @@ stop(Reason, State = #state{pendings = Pendings}) ->
|
|
|
{reply, [Reply, close],
|
|
|
State#state{pendings = [], reason = Reason}}.
|
|
|
|
|
|
+enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
|
|
|
+ enqueue([Packet], State);
|
|
|
+enqueue(Packets, State = #state{pendings = Pendings}) ->
|
|
|
+ State#state{pendings = lists:append(Pendings, Packets)}.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Ensure stats timer
|
|
|
|