|
|
@@ -105,7 +105,11 @@ write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records)
|
|
|
write_records(Name, Buffer = #buffer{fd = Writer, max_records = MaxRecords}, Records, NumWritten) ->
|
|
|
case emqx_connector_aggreg_buffer:write(Records, Writer) of
|
|
|
ok ->
|
|
|
- ?tp(connector_aggreg_records_written, #{action => Name, records => Records}),
|
|
|
+ ?tp(connector_aggreg_records_written, #{
|
|
|
+ action => Name,
|
|
|
+ records => Records,
|
|
|
+ buffer => Buffer
|
|
|
+ }),
|
|
|
case is_number(NumWritten) andalso NumWritten >= MaxRecords of
|
|
|
true ->
|
|
|
rotate_buffer_async(Name, Buffer);
|
|
|
@@ -186,10 +190,10 @@ init(St0 = #st{name = Name}) ->
|
|
|
|
|
|
handle_call({next_buffer, Timestamp}, _From, St0) ->
|
|
|
St = #st{buffer = Buffer} = handle_next_buffer(Timestamp, St0),
|
|
|
- {reply, Buffer, St, 0};
|
|
|
+ {reply, Buffer, St};
|
|
|
handle_call({rotate_buffer, FD}, _From, St0) ->
|
|
|
St = #st{buffer = Buffer} = handle_rotate_buffer(FD, St0),
|
|
|
- {reply, Buffer, St, 0};
|
|
|
+ {reply, Buffer, St};
|
|
|
handle_call(take_error, _From, St0) ->
|
|
|
{MaybeError, St} = handle_take_error(St0),
|
|
|
{reply, MaybeError, St}.
|
|
|
@@ -198,12 +202,12 @@ handle_cast({close_buffer, Timestamp}, St) ->
|
|
|
{noreply, handle_close_buffer(Timestamp, St)};
|
|
|
handle_cast({rotate_buffer, FD}, St0) ->
|
|
|
St = handle_rotate_buffer(FD, St0),
|
|
|
- {noreply, St, 0};
|
|
|
+ {noreply, St};
|
|
|
+handle_cast(enqueue_delivery, St0) ->
|
|
|
+ {noreply, handle_queued_buffer(St0)};
|
|
|
handle_cast(_Cast, St) ->
|
|
|
{noreply, St}.
|
|
|
|
|
|
-handle_info(timeout, St) ->
|
|
|
- {noreply, handle_queued_buffer(St)};
|
|
|
handle_info({'DOWN', MRef, _, Pid, Reason}, St0 = #st{name = Name, deliveries = Ds0}) ->
|
|
|
case maps:take(MRef, Ds0) of
|
|
|
{Buffer, Ds} ->
|
|
|
@@ -254,6 +258,7 @@ handle_rotate_buffer(_ClosedFD, St) ->
|
|
|
St.
|
|
|
|
|
|
enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) ->
|
|
|
+ trigger_enqueue_delivery(),
|
|
|
St#st{queued = Buffer};
|
|
|
enqueue_closed_buffer(Buffer, St0) ->
|
|
|
%% NOTE: Should never really happen unless interval / max records are too tight.
|
|
|
@@ -277,7 +282,7 @@ allocate_buffer(Since, Seq, St = #st{name = Name}) ->
|
|
|
{ok, FD} = file:open(Filename, [write, binary]),
|
|
|
Writer = emqx_connector_aggreg_buffer:new_writer(FD, _Meta = []),
|
|
|
_ = add_counter(Counter),
|
|
|
- ?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
|
|
|
+ ?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename, buffer => Buffer}),
|
|
|
Buffer#buffer{fd = Writer}.
|
|
|
|
|
|
recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
|
|
|
@@ -386,6 +391,9 @@ lookup_current_buffer(Name) ->
|
|
|
|
|
|
%%
|
|
|
|
|
|
+trigger_enqueue_delivery() ->
|
|
|
+ gen_server:cast(self(), enqueue_delivery).
|
|
|
+
|
|
|
enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
|
|
|
case emqx_connector_aggreg_upload_sup:start_delivery(Name, Buffer) of
|
|
|
{ok, Pid} ->
|
|
|
@@ -398,16 +406,14 @@ enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
|
|
|
handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when
|
|
|
Normal == normal; Normal == noproc
|
|
|
->
|
|
|
- ?SLOG(debug, #{
|
|
|
- msg => "aggregated_buffer_delivery_completed",
|
|
|
+ ?tp(debug, "aggregated_buffer_delivery_completed", #{
|
|
|
action => Name,
|
|
|
buffer => Buffer#buffer.filename
|
|
|
}),
|
|
|
ok = discard_buffer(Buffer),
|
|
|
St;
|
|
|
handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name}) ->
|
|
|
- ?SLOG(info, #{
|
|
|
- msg => "aggregated_buffer_delivery_skipped",
|
|
|
+ ?tp(info, "aggregated_buffer_delivery_skipped", #{
|
|
|
action => Name,
|
|
|
buffer => {Buffer#buffer.since, Buffer#buffer.seq},
|
|
|
reason => Reason
|