|
|
@@ -119,10 +119,7 @@ begin_read(Dir, _Context) ->
|
|
|
|
|
|
start_snapshot_reader(Meta, RS) ->
|
|
|
ShardId = shard_id(RS),
|
|
|
- logger:info(#{
|
|
|
- msg => "dsrepl_snapshot_read_started",
|
|
|
- shard => ShardId
|
|
|
- }),
|
|
|
+ ?tp(info, "dsrepl_snapshot_read_started", #{shard => ShardId}),
|
|
|
{ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(ShardId),
|
|
|
{ok, Meta, RS#rs{reader = SnapReader}}.
|
|
|
|
|
|
@@ -134,10 +131,11 @@ read_chunk(RS = #rs{phase = machine_state, state = MachineState}, _Size, _Dir) -
|
|
|
read_chunk(RS = #rs{phase = storage_snapshot, reader = SnapReader0}, Size, _Dir) ->
|
|
|
case emqx_ds_storage_snapshot:read_chunk(SnapReader0, Size) of
|
|
|
{next, Chunk, SnapReader} ->
|
|
|
+ ?tp(dsrepl_snapshot_read, #{shard => shard_id(RS), reader => SnapReader, last => false}),
|
|
|
{ok, Chunk, {next, RS#rs{reader = SnapReader}}};
|
|
|
{last, Chunk, SnapReader} ->
|
|
|
%% TODO: idempotence?
|
|
|
- ?tp(dsrepl_snapshot_read_complete, #{reader => SnapReader}),
|
|
|
+ ?tp(dsrepl_snapshot_read, #{shard => shard_id(RS), reader => SnapReader, last => true}),
|
|
|
_ = complete_read(RS#rs{reader = SnapReader}),
|
|
|
{ok, Chunk, last};
|
|
|
{error, Reason} ->
|
|
|
@@ -148,8 +146,7 @@ read_chunk(RS = #rs{phase = storage_snapshot, reader = SnapReader0}, Size, _Dir)
|
|
|
|
|
|
complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) ->
|
|
|
_ = emqx_ds_storage_snapshot:release_reader(SnapReader),
|
|
|
- logger:info(#{
|
|
|
- msg => "dsrepl_snapshot_read_complete",
|
|
|
+ ?tp(info, "dsrepl_snapshot_read_complete", #{
|
|
|
shard => shard_id(RS),
|
|
|
duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
|
|
|
read_bytes => emqx_ds_storage_snapshot:reader_info(bytes_read, SnapReader)
|
|
|
@@ -194,19 +191,21 @@ accept_chunk(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
|
|
|
%% TODO: idempotence?
|
|
|
case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
|
|
|
{next, SnapWriter} ->
|
|
|
+ ?tp(dsrepl_snapshot_write, #{shard => shard_id(WS), writer => SnapWriter, last => false}),
|
|
|
{ok, WS#ws{writer = SnapWriter}};
|
|
|
{error, Reason} ->
|
|
|
- ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
|
|
|
+ ?tp(dsrepl_snapshot_write_error, #{
|
|
|
+ shard => shard_id(WS),
|
|
|
+ reason => Reason,
|
|
|
+ writer => SnapWriter0
|
|
|
+ }),
|
|
|
_ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
|
|
|
error(Reason)
|
|
|
end.
|
|
|
|
|
|
start_snapshot_writer(WS) ->
|
|
|
ShardId = shard_id(WS),
|
|
|
- logger:info(#{
|
|
|
- msg => "dsrepl_snapshot_write_started",
|
|
|
- shard => ShardId
|
|
|
- }),
|
|
|
+ ?tp(info, "dsrepl_snapshot_write_started", #{shard => ShardId}),
|
|
|
_ = emqx_ds_builtin_raft_db_sup:terminate_storage(ShardId),
|
|
|
{ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(ShardId),
|
|
|
{ok, WS#ws{phase = storage_snapshot, writer = SnapWriter}}.
|
|
|
@@ -216,21 +215,24 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0})
|
|
|
%% TODO: idempotence?
|
|
|
case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
|
|
|
{last, SnapWriter} ->
|
|
|
- ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
|
|
|
+ ?tp(dsrepl_snapshot_write, #{shard => shard_id(WS), writer => SnapWriter, last => true}),
|
|
|
_ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
|
|
|
Result = complete_accept(WS#ws{writer = SnapWriter}),
|
|
|
?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS), state => WS#ws.state}),
|
|
|
Result;
|
|
|
{error, Reason} ->
|
|
|
- ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
|
|
|
+ ?tp(dsrepl_snapshot_write_error, #{
|
|
|
+ shard => shard_id(WS),
|
|
|
+ reason => Reason,
|
|
|
+ writer => SnapWriter0
|
|
|
+ }),
|
|
|
_ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
|
|
|
error(Reason)
|
|
|
end.
|
|
|
|
|
|
complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
|
|
|
ShardId = shard_id(WS),
|
|
|
- logger:info(#{
|
|
|
- msg => "dsrepl_snapshot_write_complete",
|
|
|
+ ?tp(info, "dsrepl_snapshot_write_complete", #{
|
|
|
shard => ShardId,
|
|
|
duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
|
|
|
bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)
|