|
|
@@ -448,12 +448,11 @@ en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ,
|
|
|
NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)),
|
|
|
State#state{writeq = [Msg], replayq = NewReplayQ}.
|
|
|
|
|
|
-publish_readq_msg(_ClientPid, [], ReadQ) ->
|
|
|
- {ok, ReadQ};
|
|
|
-publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], ReadQ) ->
|
|
|
- io:format("~n replay msg: ~p ~n", [Msg]),
|
|
|
+publish_readq_msg(_ClientPid, [], NewReadQ) ->
|
|
|
+ {ok, NewReadQ};
|
|
|
+publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) ->
|
|
|
{ok, PktId} = emqx_client:publish(ClientPid, Msg),
|
|
|
- publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | ReadQ]).
|
|
|
+ publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
|
|
|
|
|
|
delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
|
|
|
ok = replayq:ack(ReplayQ, AckRef),
|