Просмотр исходного кода

refactor: add more trace points to pinpoint error

There's some race condition where data is pushed to a buffer, the second in the history of
the action, then immediately afterwards a 3rd, new buffer is allocated but the 2nd buffer
is never enqueued as a delivery, and later, after a long timeout, only the 3rd, empty
buffer delivery is started.

```
2024-10-16T16:42:22.279746+00:00 [$trace_begin] #{'~meta' => #{},begin_system_time => 1729096942279746}.
/emqx/apps/emqx_resource/src/emqx_resource_manager.erl:640 2024-10-16T16:42:22.288646+00:00 [resource_connected_enter] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396873.0>,gl => <0.396499.0>}}.
/emqx/apps/emqx_connector/src/emqx_connector.erl:177 2024-10-16T16:42:22.386853+00:00 [connector_post_config_update_done] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396218.0>,gl => <0.396206.0>}}.
/emqx/apps/emqx_connector_jwt/src/emqx_connector_jwt.erl:139 2024-10-16T16:42:22.407142+00:00 [emqx_connector_jwt_token_stored] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396873.0>,gl => <0.396499.0>},resource_id => <<"action:snowflake:t_aggreg_upload-576460752303422495:connector:snowflake:t_aggreg_upload-576460752303422495">>}.
/emqx/apps/emqx_bridge/src/emqx_bridge_v2.erl:1172 2024-10-16T16:42:22.770083+00:00 [bridge_post_config_update_done] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396218.0>,gl => <0.396206.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:285 2024-10-16T16:42:22.901320+00:00 [connector_aggreg_buffer_allocated] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0000">>,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396897.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:108 2024-10-16T16:42:22.901393+00:00 [connector_aggreg_records_written] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0000">>,seq => 0,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396886.0>,gl => <0.396499.0>,rule_ids => #{},client_ids => #{},rule_trigger_ts => []},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>},records => [#{<<"clientid">> => <<"C1">>,<<"payload">> => <<"7B2268656C6C6F223A22776F726C64227D">>,<<"publish_received_at">> => <<"2024-10-16T16:42:22.800+00:00">>,<<"topic">> => <<"sf/a/b/c">>}]}.
/emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl:62 2024-10-16T16:42:22.901449+00:00 [rule_engine_applied_all_rules] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:108 2024-10-16T16:42:23.003069+00:00 [connector_aggreg_records_written] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0000">>,seq => 0,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396886.0>,gl => <0.396499.0>,rule_ids => #{},client_ids => #{},rule_trigger_ts => []},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>},records => [#{<<"clientid">> => <<"C2">>,<<"payload">> => <<"62617A">>,<<"publish_received_at">> => <<"2024-10-16T16:42:22.800+00:00">>,<<"topic">> => <<"sf/foo/bar">>}]}.
/emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl:62 2024-10-16T16:42:23.003127+00:00 [rule_engine_applied_all_rules] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:108 2024-10-16T16:42:23.104087+00:00 [connector_aggreg_records_written] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0000">>,seq => 0,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396886.0>,gl => <0.396499.0>,rule_ids => #{},client_ids => #{},rule_trigger_ts => []},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>},records => [#{<<"clientid">> => <<"C3">>,<<"payload">> => <<>>,<<"publish_received_at">> => <<"2024-10-16T16:42:22.800+00:00">>,<<"topic">> => <<"sf/t/42">>}]}.
/emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl:62 2024-10-16T16:42:23.104173+00:00 [rule_engine_applied_all_rules] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl:576 2024-10-16T16:42:23.104479+00:00 [published first batch] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:285 2024-10-16T16:42:23.104658+00:00 [connector_aggreg_buffer_allocated] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0001">>,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396897.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl:73 2024-10-16T16:42:23.104866+00:00 [connector_aggreg_delivery_started] #{buffer => {buffer,1729096942,1729096947,0,<<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0000">>,undefined,3,{#Ref<0.609744082.2445672450.135056>,{1729096942,0}}},'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl:607 2024-10-16T16:42:23.106088+00:00 [snowflake_flush_on_complete] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>}}.
/emqx/apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl:569 2024-10-16T16:42:23.106201+00:00 [snowflake_will_stage_file] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>}}.
/emqx/apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl:418 2024-10-16T16:42:23.106625+00:00 [snowflake_stage_file_succeeded] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/tmp/1729096942_0_0.csv">>,pool => <<"connector:snowflake:t_aggreg_upload-576460752303422495">>,result => [#{<<"encryption">> => <<"ENCRYPTED">>,<<"message">> => <<>>,<<"source">> => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/tmp/1729096942_0_0.csv">>,<<"source_compression">> => <<"none">>,<<"source_size">> => <<"5">>,<<"status">> => <<"UPLOADED">>,<<"target">> => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/tmp/1729096942_0_0.csv.gz">>,<<"target_compression">> => <<"gzip">>,<<"target_size">> => <<"32">>}],schema => <<"public">>,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>},stage => <<"teststage0">>,database => <<"testdatabase">>}.
/emqx/apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl:896 2024-10-16T16:42:23.106746+00:00 [snowflake_stage_insert_files_request] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>},action_res_id => <<"action:snowflake:t_aggreg_upload-576460752303422495:connector:snowflake:t_aggreg_upload-576460752303422495">>,staged_files => [#{size => 213,path => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/tmp/1729096942_0_0.csv.gz">>}]}.
/emqx/apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl:225 2024-10-16T16:42:23.106777+00:00 [mock_snowflake_insert_file_request] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl:174 2024-10-16T16:42:23.106789+00:00 [connector_aggreg_delivery_completed] #{transfer => #{},'~meta' => #{node => 'test@127.0.0.1',pid => <0.396912.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl:579 2024-10-16T16:42:23.106916+00:00 [first batch delivered] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:108 2024-10-16T16:42:23.208067+00:00 [connector_aggreg_records_written] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0001">>,seq => 1,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396886.0>,gl => <0.396499.0>,rule_ids => #{},client_ids => #{},rule_trigger_ts => []},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>},records => [#{<<"clientid">> => <<"C4">>,<<"payload">> => <<"7B2268656C6C6F223A22776F726C64227D">>,<<"publish_received_at">> => <<"2024-10-16T16:42:23.106+00:00">>,<<"topic">> => <<"sf/a/b/c">>}]}.
/emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl:62 2024-10-16T16:42:23.208136+00:00 [rule_engine_applied_all_rules] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:108 2024-10-16T16:42:23.309027+00:00 [connector_aggreg_records_written] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0001">>,seq => 1,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396886.0>,gl => <0.396499.0>,rule_ids => #{},client_ids => #{},rule_trigger_ts => []},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>},records => [#{<<"clientid">> => <<"C5">>,<<"payload">> => <<"62617A">>,<<"publish_received_at">> => <<"2024-10-16T16:42:23.106+00:00">>,<<"topic">> => <<"sf/foo/bar">>}]}.
/emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl:62 2024-10-16T16:42:23.309104+00:00 [rule_engine_applied_all_rules] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:108 2024-10-16T16:42:23.410090+00:00 [connector_aggreg_records_written] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0001">>,seq => 1,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396886.0>,gl => <0.396499.0>,rule_ids => #{},client_ids => #{},rule_trigger_ts => []},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>},records => [#{<<"clientid">> => <<"C6">>,<<"payload">> => <<>>,<<"publish_received_at">> => <<"2024-10-16T16:42:23.106+00:00">>,<<"topic">> => <<"sf/t/42">>}]}.
/emqx/apps/emqx_rule_engine/src/emqx_rule_runtime.erl:62 2024-10-16T16:42:23.410172+00:00 [rule_engine_applied_all_rules] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl:590 2024-10-16T16:42:23.410361+00:00 [published second batch] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl:285 2024-10-16T16:42:23.410727+00:00 [connector_aggreg_buffer_allocated] #{filename => <<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0002">>,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396897.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl:73 2024-10-16T16:42:27.414506+00:00 [connector_aggreg_delivery_started] #{buffer => {buffer,1729096942,1729096947,2,<<"/emqx/_build/test/logs/ct_run.test@127.0.0.1.2024-10-16_16.42.04/lib.emqx_bridge_snowflake.emqx_bridge_snowflake_SUITE.logs/run.2024-10-16_16.42.05/log_private/emqx_bridge_snowflake_SUITE_data/bridge/snowflake/t_aggreg_upload-576460752303422495/T1729096942_0002">>,undefined,3,{#Ref<0.609744082.2445672450.135056>,{1729096942,2}}},'~meta' => #{node => 'test@127.0.0.1',pid => <0.396959.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl:165 2024-10-16T16:42:27.414741+00:00 [connector_aggreg_delivery_completed] #{transfer => empty,'~meta' => #{node => 'test@127.0.0.1',pid => <0.396959.0>,gl => <0.396511.0>},action => {<<"snowflake">>,<<"t_aggreg_upload-576460752303422495">>}}.
/emqx/apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl:594 2024-10-16T16:42:27.414909+00:00 [second batch delivered] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396857.0>,gl => <0.395897.0>}}.
/emqx/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl:189 2024-10-16T16:42:30.001933+00:00 [dashboard_monitor_flushed] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396645.0>,gl => <0.396616.0>}}.
/emqx/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl:189 2024-10-16T16:42:40.001905+00:00 [dashboard_monitor_flushed] #{'~meta' => #{node => 'test@127.0.0.1',pid => <0.396645.0>,gl => <0.396616.0>}}.
2024-10-16T16:42:41.115005+00:00 [$trace_end] #{'~meta' => #{}}.
```
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
ca927a15f3

+ 12 - 1
apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl

@@ -44,6 +44,11 @@
     | T
 ]).
 
+-define(tpal(MSG), begin
+    ct:pal(MSG),
+    ?tp(notice, MSG, #{})
+end).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------
@@ -568,8 +573,10 @@ t_aggreg_upload(Config) ->
                 {<<"C4">>, <<"t/42">>, <<"won't appear in results">>}
             ]),
             ok = publish_messages(Messages1),
+            ?tpal("published first batch"),
             %% Wait until the delivery is completed.
             ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
+            ?tpal("first batch delivered"),
             %% Send a second batch of messages to be staged in a second file
             Messages2 = lists:map(fun mk_message/1, [
                 {<<"C4">>, T4 = <<"sf/a/b/c">>, P4 = <<"{\"hello\":\"world\"}">>},
@@ -578,9 +585,13 @@ t_aggreg_upload(Config) ->
             ]),
             {ok, {ok, _}} =
                 ?wait_async_action(
-                    publish_messages(Messages2),
+                    begin
+                        publish_messages(Messages2),
+                        ?tpal("published second batch")
+                    end,
                     #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}
                 ),
+            ?tpal("second batch delivered"),
             %% Check the uploaded objects.
             ExpectedNumFiles = 2,
             wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles),

+ 1 - 1
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src

@@ -1,6 +1,6 @@
 {application, emqx_connector_aggregator, [
     {description, "EMQX Enterprise Connector Data Aggregator"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [
         kernel,

+ 19 - 6
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl

@@ -105,7 +105,11 @@ write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records)
 write_records(Name, Buffer = #buffer{fd = Writer, max_records = MaxRecords}, Records, NumWritten) ->
     case emqx_connector_aggreg_buffer:write(Records, Writer) of
         ok ->
-            ?tp(connector_aggreg_records_written, #{action => Name, records => Records}),
+            ?tp(connector_aggreg_records_written, #{
+                action => Name,
+                records => Records,
+                buffer => Buffer
+            }),
             case is_number(NumWritten) andalso NumWritten >= MaxRecords of
                 true ->
                     rotate_buffer_async(Name, Buffer);
@@ -227,12 +231,19 @@ terminate(_Reason, #st{name = Name}) ->
 %%
 
 handle_next_buffer(Timestamp, St = #st{buffer = #buffer{until = Until}}) when Timestamp < Until ->
+    ?tp(connector_aggreg_handle_next_buffer_too_soon, #{
+        timestamp => Timestamp, buffer => St#st.buffer
+    }),
     St;
 handle_next_buffer(Timestamp, St0 = #st{buffer = Buffer = #buffer{since = PrevSince}}) ->
+    ?tp(connector_aggreg_handle_next_buffer, #{timestamp => Timestamp, buffer => Buffer}),
     BufferClosed = close_buffer(Buffer),
     St = enqueue_closed_buffer(BufferClosed, St0),
     handle_next_buffer(Timestamp, PrevSince, St);
 handle_next_buffer(Timestamp, St = #st{buffer = undefined}) ->
+    ?tp(connector_aggreg_handle_next_buffer_undefined, #{
+        timestamp => Timestamp, buffer => St#st.buffer
+    }),
     handle_next_buffer(Timestamp, Timestamp, St).
 
 handle_next_buffer(Timestamp, PrevSince, St0) ->
@@ -245,12 +256,14 @@ handle_rotate_buffer(
     FD,
     St0 = #st{buffer = Buffer = #buffer{since = Since, seq = Seq, fd = FD}}
 ) ->
+    ?tp(connector_aggreg_rotate_buffer, #{fd => FD, buffer => Buffer}),
     BufferClosed = close_buffer(Buffer),
     NextBuffer = allocate_buffer(Since, Seq + 1, St0),
     St = enqueue_closed_buffer(BufferClosed, St0#st{buffer = NextBuffer}),
     _ = announce_current_buffer(St),
     St;
 handle_rotate_buffer(_ClosedFD, St) ->
+    ?tp(connector_aggreg_rotate_buffer_nop, #{fd => _ClosedFD, buffer => St#st.buffer}),
     St.
 
 enqueue_closed_buffer(Buffer, St = #st{queued = undefined}) ->
@@ -277,7 +290,7 @@ allocate_buffer(Since, Seq, St = #st{name = Name}) ->
     {ok, FD} = file:open(Filename, [write, binary]),
     Writer = emqx_connector_aggreg_buffer:new_writer(FD, _Meta = []),
     _ = add_counter(Counter),
-    ?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename}),
+    ?tp(connector_aggreg_buffer_allocated, #{action => Name, filename => Filename, buffer => Buffer}),
     Buffer#buffer{fd = Writer}.
 
 recover_buffer(Buffer = #buffer{filename = Filename, cnt_records = Counter}) ->
@@ -352,10 +365,12 @@ handle_close_buffer(
     Timestamp,
     St0 = #st{buffer = Buffer = #buffer{until = Until}}
 ) when Timestamp >= Until ->
+    ?tp(connector_aggreg_close_buffer, #{timestamp => Timestamp, buffer => Buffer}),
     St = St0#st{buffer = undefined},
     _ = announce_current_buffer(St),
     enqueue_delivery(close_buffer(Buffer), St);
 handle_close_buffer(_Timestamp, St = #st{buffer = undefined}) ->
+    ?tp(connector_aggreg_close_buffer_nop, #{timestamp => _Timestamp, buffer => undefined}),
     St.
 
 close_buffer(Buffer = #buffer{fd = FD}) ->
@@ -398,16 +413,14 @@ enqueue_delivery(Buffer, St = #st{name = Name, deliveries = Ds}) ->
 handle_delivery_exit(Buffer, Normal, St = #st{name = Name}) when
     Normal == normal; Normal == noproc
 ->
-    ?SLOG(debug, #{
-        msg => "aggregated_buffer_delivery_completed",
+    ?tp(debug, "aggregated_buffer_delivery_completed", #{
         action => Name,
         buffer => Buffer#buffer.filename
     }),
     ok = discard_buffer(Buffer),
     St;
 handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name}) ->
-    ?SLOG(info, #{
-        msg => "aggregated_buffer_delivery_skipped",
+    ?tp(info, "aggregated_buffer_delivery_skipped", #{
         action => Name,
         buffer => {Buffer#buffer.since, Buffer#buffer.seq},
         reason => Reason