|
|
@@ -231,19 +231,12 @@ terminate(_Reason, #st{name = Name}) ->
|
|
|
%%
|
|
|
|
|
|
handle_next_buffer(Timestamp, St = #st{buffer = #buffer{until = Until}}) when Timestamp < Until ->
|
|
|
- ?tp(connector_aggreg_handle_next_buffer_too_soon, #{
|
|
|
- timestamp => Timestamp, buffer => St#st.buffer
|
|
|
- }),
|
|
|
St;
|
|
|
handle_next_buffer(Timestamp, St0 = #st{buffer = Buffer = #buffer{since = PrevSince}}) ->
|
|
|
- ?tp(connector_aggreg_handle_next_buffer, #{timestamp => Timestamp, buffer => Buffer}),
|
|
|
BufferClosed = close_buffer(Buffer),
|
|
|
St = enqueue_closed_buffer(BufferClosed, St0),
|
|
|
handle_next_buffer(Timestamp, PrevSince, St);
|
|
|
handle_next_buffer(Timestamp, St = #st{buffer = undefined}) ->
|
|
|
- ?tp(connector_aggreg_handle_next_buffer_undefined, #{
|
|
|
- timestamp => Timestamp, buffer => St#st.buffer
|
|
|
- }),
|
|
|
handle_next_buffer(Timestamp, Timestamp, St).
|
|
|
|
|
|
handle_next_buffer(Timestamp, PrevSince, St0) ->
|
|
|
@@ -256,14 +249,12 @@ handle_rotate_buffer(
|
|
|
FD,
|
|
|
St0 = #st{buffer = Buffer = #buffer{since = Since, seq = Seq, fd = FD}}
|
|
|
) ->
|
|
|
- ?tp(connector_aggreg_rotate_buffer, #{fd => FD, buffer => Buffer}),
|
|
|
BufferClosed = close_buffer(Buffer),
|
|
|
NextBuffer = allocate_buffer(Since, Seq + 1, St0),
|
|
|
St = enqueue_closed_buffer(BufferClosed, St0#st{buffer = NextBuffer}),
|
|
|
_ = announce_current_buffer(St),
|
|
|
St;
|
|
|
handle_rotate_buffer(_ClosedFD, St) ->
|
|
|
- ?tp(connector_aggreg_rotate_buffer_nop, #{fd => _ClosedFD, buffer => St#st.buffer}),
|
|
|
St.
|
|
|
|
|
|
enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) ->
|
|
|
@@ -366,12 +357,10 @@ handle_close_buffer(
|
|
|
Timestamp,
|
|
|
St0 = #st{buffer = Buffer = #buffer{until = Until}}
|
|
|
) when Timestamp >= Until ->
|
|
|
- ?tp(connector_aggreg_close_buffer, #{timestamp => Timestamp, buffer => Buffer}),
|
|
|
St = St0#st{buffer = undefined},
|
|
|
_ = announce_current_buffer(St),
|
|
|
enqueue_delivery(close_buffer(Buffer), St);
|
|
|
handle_close_buffer(_Timestamp, St = #st{buffer = undefined}) ->
|
|
|
- ?tp(connector_aggreg_close_buffer_nop, #{timestamp => _Timestamp, buffer => undefined}),
|
|
|
St.
|
|
|
|
|
|
close_buffer(Buffer = #buffer{fd = FD}) ->
|