|
|
@@ -267,8 +267,6 @@ receive_messages(Count, Msgs) ->
|
|
|
receive
|
|
|
{publish, Msg} ->
|
|
|
receive_messages(Count - 1, [Msg | Msgs]);
|
|
|
- {deliver, _Topic, Msg} ->
|
|
|
- receive_messages(Count - 1, [Msg | Msgs]);
|
|
|
_Other ->
|
|
|
receive_messages(Count, Msgs)
|
|
|
after 5000 ->
|
|
|
@@ -375,26 +373,6 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) ->
|
|
|
do_publish(Payload, PublishFun, WaitForUnregister) ->
|
|
|
do_publish([Payload], PublishFun, WaitForUnregister).
|
|
|
|
|
|
-get_replay_messages(ReplayID) ->
|
|
|
- DSShard = <<"local">>,
|
|
|
- case emqx_ds_storage_layer:restore_iterator(DSShard, ReplayID) of
|
|
|
- {ok, It} ->
|
|
|
- do_get_replay_messages(It, []);
|
|
|
- Error ->
|
|
|
- error({"error restoring iterator", #{error => Error, replay_id => ReplayID}})
|
|
|
- end.
|
|
|
-
|
|
|
-do_get_replay_messages(It, Acc) ->
|
|
|
- case emqx_ds_storage_layer:next(It) of
|
|
|
- {value, Val, NewIt} ->
|
|
|
- Msg = erlang:binary_to_term(Val),
|
|
|
- do_get_replay_messages(NewIt, [Msg | Acc]);
|
|
|
- none ->
|
|
|
- {ok, lists:reverse(Acc)};
|
|
|
- {error, Reason} ->
|
|
|
- {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test Cases
|
|
|
%%--------------------------------------------------------------------
|