Przeglądaj źródła

feat(dsrepl): release log entries occasionally

Also make tracepoints in `apply/3` callback implementation more
uniform.
Andrew Mayorov 1 rok temu
rodzic
commit
2705226eb5

+ 60 - 12
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -579,6 +579,10 @@ list_nodes() ->
 %% Too large for normal operation, need better backpressure mechanism.
 -define(RA_TIMEOUT, 60 * 1000).
 
+%% How often to release Raft logs?
+%% Each N log entries mark everything up to the last N entries "releasable".
+-define(RA_RELEASE_LOG_FREQ, 1000).
+
 -define(SAFE_ERPC(EXPR),
     try
         EXPR
@@ -746,6 +750,8 @@ ra_drop_shard(DB, Shard) ->
 
 %%
 
+-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
+
 -spec init(_Args :: map()) -> ra_state().
 init(#{db := DB, shard := Shard}) ->
     #{db_shard => {DB, Shard}, latest => 0}.
@@ -753,7 +759,7 @@ init(#{db := DB, shard := Shard}) ->
 -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
     {ra_state(), _Reply, _Effects}.
 apply(
-    #{index := RaftIdx},
+    RaftMeta,
     #{
         ?tag := ?BATCH,
         ?batch_messages := MessagesIn
@@ -765,17 +771,17 @@ apply(
     Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
     State = State0#{latest := Latest},
     set_ts(DBShard, Latest),
-    %% TODO: Need to measure effects of changing frequency of `release_cursor`.
-    Effect = {release_cursor, RaftIdx, State},
-    {State, Result, Effect};
+    Effects = try_release_log(RaftMeta, State),
+    Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
+    {State, Result, Effects};
 apply(
-    _RaftMeta,
+    RaftMeta,
     #{?tag := add_generation, ?since := Since},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
     ?tp(
         info,
-        ds_replication_layer_add_generation,
+        ds_ra_add_generation,
         #{
             shard => DBShard,
             since => Since
@@ -785,15 +791,17 @@ apply(
     Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
     State = State0#{latest := Latest},
     set_ts(DBShard, Latest),
-    {State, Result};
+    Effects = release_log(RaftMeta, State),
+    Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
+    {State, Result, Effects};
 apply(
-    _RaftMeta,
+    RaftMeta,
     #{?tag := update_config, ?since := Since, ?config := Opts},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
     ?tp(
         notice,
-        ds_replication_layer_update_config,
+        ds_ra_update_config,
         #{
             shard => DBShard,
             config => Opts,
@@ -803,7 +811,9 @@ apply(
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
     State = State0#{latest := Latest},
-    {State, Result};
+    Effects = release_log(RaftMeta, State),
+    Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
+    {State, Result, Effects};
 apply(
     _RaftMeta,
     #{?tag := drop_generation, ?generation := GenId},
@@ -811,7 +821,7 @@ apply(
 ) ->
     ?tp(
         info,
-        ds_replication_layer_drop_generation,
+        ds_ra_drop_generation,
         #{
             shard => DBShard,
             generation => GenId
@@ -828,7 +838,7 @@ apply(
     set_ts(DBShard, Latest),
     ?tp(
         debug,
-        emqx_ds_replication_layer_storage_event,
+        ds_ra_storage_event,
         #{
             shard => DBShard, payload => CustomEvent, latest => Latest
         }
@@ -836,6 +846,44 @@ apply(
     Effects = handle_custom_event(DBShard, Latest, CustomEvent),
     {State#{latest => Latest}, ok, Effects}.
 
+try_release_log(RaftMeta = #{index := CurrentIdx}, State) ->
+    %% NOTE
+    %% Release everything up to the last log entry, but only if there were more than
+    %% `?RA_RELEASE_LOG_FREQ` new entries since the last release.
+    case get_log_need_release(RaftMeta) of
+        undefined ->
+            [];
+        PrevIdx when ?RA_RELEASE_LOG_FREQ > CurrentIdx - PrevIdx ->
+            [];
+        _PrevIdx ->
+            %% TODO
+            %% Number of log entries is not the best metric. Because cursor release
+            %% means storage flush (see `emqx_ds_replication_snapshot:write/3`), we
+            %% should do that not too often (so the storage is happy with L0 SST size)
+            %% and not too rarely (so we don't accumulate huge Raft logs).
+            release_log(RaftMeta, State)
+    end.
+
+release_log(RaftMeta = #{index := CurrentIdx}, State) ->
+    %% NOTE
+    %% Release everything up to the last log entry. This is important: any log entries
+    %% following `CurrentIdx` should not contribute to `State` (that will be recovered
+    %% from a snapshot).
+    update_log_need_release(RaftMeta),
+    {release_cursor, CurrentIdx, State}.
+
+get_log_need_release(RaftMeta) ->
+    case erlang:get(?pd_ra_idx_need_release) of
+        undefined ->
+            update_log_need_release(RaftMeta),
+            undefined;
+        LastIdx ->
+            LastIdx
+    end.
+
+update_log_need_release(#{index := CurrentIdx}) ->
+    erlang:put(?pd_ra_idx_need_release, CurrentIdx).
+
 -spec tick(integer(), ra_state()) -> ra_machine:effects().
 tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
     %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),