|
|
@@ -38,6 +38,7 @@
|
|
|
|
|
|
-include("emqx_ds.hrl").
|
|
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
|
|
+-include_lib("kernel/include/logger.hrl").
|
|
|
|
|
|
%%================================================================================
|
|
|
%% Type declarations
|
|
|
@@ -106,7 +107,7 @@ update_stats(Worker, {_Key, Msg}) ->
|
|
|
|
|
|
%%%%%
|
|
|
-spec start_link(#{
|
|
|
- topic := binary(),
|
|
|
+ topic := string() | [emqx_types:word()],
|
|
|
name => term(),
|
|
|
db => emqx_ds:db(),
|
|
|
callback => callback(),
|
|
|
@@ -197,7 +198,7 @@ more() ->
|
|
|
%% Number of streams with currently inflight poll requests:
|
|
|
inflight = 0 :: non_neg_integer(),
|
|
|
%% Number of poll requests sent to the DS so far, or since the
|
|
|
- %% last `next{}' call:
|
|
|
+ %% last `next()' call:
|
|
|
poll_count = 0 :: non_neg_integer()
|
|
|
}).
|
|
|
|
|
|
@@ -299,11 +300,14 @@ handle_poll_reply(
|
|
|
};
|
|
|
handle_poll_reply(Stream, {error, recoverable, _Err}, S = #s{pq = PQ0}) ->
|
|
|
S#s{pq = queue:in(Stream, PQ0)};
|
|
|
-handle_poll_reply(Stream, EOSEvent, S = #s{eos = EOS}) ->
|
|
|
+handle_poll_reply(Stream, EOSEvent, S = #s{eos = EOS, db = DB}) ->
|
|
|
case EOSEvent of
|
|
|
{ok, end_of_stream} ->
|
|
|
ok;
|
|
|
- {error, unrecoverable, _Err} ->
|
|
|
+ {error, unrecoverable, Err} ->
|
|
|
+ ?LOG_ERROR(#{
|
|
|
+ msg => "Unrecoverable stream poll error", stream => Stream, error => Err, db => DB
|
|
|
+ }),
|
|
|
ok
|
|
|
end,
|
|
|
S#s{eos = EOS#{Stream => true}}.
|
|
|
@@ -375,6 +379,6 @@ grab_more(#s{active = Active, pq = PQ, poll_count = PC}) ->
|
|
|
do_poll(S = #s{db = DB, poll_timeout = Timeout, polls = Polls0, its = Its}, PollStreams) ->
|
|
|
Req = [{Stream, maps:get(Stream, Its)} || Stream <- PollStreams],
|
|
|
{ok, Ref} = emqx_ds:poll(DB, Req, #{timeout => Timeout}),
|
|
|
- NewPolls = maps:from_list([{Stream, Ref} || Stream <- PollStreams]),
|
|
|
+ NewPolls = maps:from_keys(PollStreams, Ref),
|
|
|
Polls = maps:merge(Polls0, NewPolls),
|
|
|
S#s{polls = Polls}.
|