فهرست منبع

fix(ds): Use leader's clock for computing LTS safe cutoff time

ieQu1 1 سال پیش
والد
کامیت
b2a633aca1

+ 3 - 2
apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl

@@ -77,7 +77,8 @@ stop_db(DB) ->
 %% @doc Set a DB-global variable. Please don't abuse this API.
 %% @doc Set a DB-global variable. Please don't abuse this API.
 -spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok.
 -spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok.
 set_gvar(DB, Key, Val) ->
 set_gvar(DB, Key, Val) ->
-    ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}).
+    ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}),
+    ok.
 
 
 -spec get_gvar(emqx_ds:db(), _Key, Val) -> Val.
 -spec get_gvar(emqx_ds:db(), _Key, Val) -> Val.
 get_gvar(DB, Key, Default) ->
 get_gvar(DB, Key, Default) ->
@@ -123,7 +124,7 @@ init(?top) ->
         type => supervisor,
         type => supervisor,
         shutdown => infinity
         shutdown => infinity
     },
     },
-    ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]),
+    _ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]),
     %%
     %%
     SupFlags = #{
     SupFlags = #{
         strategy => one_for_all,
         strategy => one_for_all,

+ 45 - 5
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -36,7 +36,8 @@
     update_iterator/3,
     update_iterator/3,
     next/3,
     next/3,
     delete_next/4,
     delete_next/4,
-    shard_of_message/3
+    shard_of_message/3,
+    current_timestamp/2
 ]).
 ]).
 
 
 %% internal exports:
 %% internal exports:
@@ -65,6 +66,7 @@
 -export([
 -export([
     init/1,
     init/1,
     apply/3,
     apply/3,
+    tick/2,
 
 
     snapshot_module/0
     snapshot_module/0
 ]).
 ]).
@@ -161,6 +163,8 @@
 
 
 -type timestamp_us() :: non_neg_integer().
 -type timestamp_us() :: non_neg_integer().
 
 
+-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}).
+
 %%================================================================================
 %%================================================================================
 %% API functions
 %% API functions
 %%================================================================================
 %%================================================================================
@@ -367,6 +371,12 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
 foreach_shard(DB, Fun) ->
 foreach_shard(DB, Fun) ->
     lists:foreach(Fun, list_shards(DB)).
     lists:foreach(Fun, list_shards(DB)).
 
 
+%% @doc Messages have been replicated up to this timestamp on the
+%% local server
+-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time().
+current_timestamp(DB, Shard) ->
+    emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0).
+
 %%================================================================================
 %%================================================================================
 %% behavior callbacks
 %% behavior callbacks
 %%================================================================================
 %%================================================================================
@@ -491,7 +501,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
     ShardId = {DB, Shard},
     ?IF_STORAGE_RUNNING(
     ?IF_STORAGE_RUNNING(
         ShardId,
         ShardId,
-        emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us())
+        emqx_ds_storage_layer:next(
+            ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
+        )
     ).
     ).
 
 
 -spec do_delete_next_v4(
 -spec do_delete_next_v4(
@@ -504,7 +516,11 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
     emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()).
     emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()).
 do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
 do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
     emqx_ds_storage_layer:delete_next(
     emqx_ds_storage_layer:delete_next(
-        {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us()
+        {DB, Shard},
+        Iter,
+        Selector,
+        BatchSize,
+        emqx_ds_replication_layer:current_timestamp(DB, Shard)
     ).
     ).
 
 
 -spec do_add_generation_v2(emqx_ds:db()) -> no_return().
 -spec do_add_generation_v2(emqx_ds:db()) -> no_return().
@@ -675,7 +691,7 @@ apply(
         ?tag := ?BATCH,
         ?tag := ?BATCH,
         ?batch_messages := MessagesIn
         ?batch_messages := MessagesIn
     },
     },
-    #{db_shard := DBShard, latest := Latest0} = State
+    #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State
 ) ->
 ) ->
     %% NOTE
     %% NOTE
     %% Unique timestamp tracking real time closely.
     %% Unique timestamp tracking real time closely.
@@ -686,6 +702,7 @@ apply(
     NState = State#{latest := Latest},
     NState = State#{latest := Latest},
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     %% TODO: Need to measure effects of changing frequency of `release_cursor`.
     Effect = {release_cursor, RaftIdx, NState},
     Effect = {release_cursor, RaftIdx, NState},
+    emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest),
     {NState, Result, Effect};
     {NState, Result, Effect};
 apply(
 apply(
     _RaftMeta,
     _RaftMeta,
@@ -711,7 +728,20 @@ apply(
     #{db_shard := DBShard} = State
     #{db_shard := DBShard} = State
 ) ->
 ) ->
     Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
     Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
-    {State, Result}.
+    {State, Result};
+apply(
+    _RaftMeta,
+    #{?tag := storage_event, ?payload := CustomEvent},
+    #{db_shard := DBShard, latest := Latest0} = State
+) ->
+    {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0),
+    Effects = handle_custom_event(DBShard, Timestamp, CustomEvent),
+    {State#{latest := Latest}, ok, Effects}.
+
+-spec tick(integer(), ra_state()) -> ra_machine:effects().
+tick(TimeMs, #{db_shard := DBShard, latest := Latest}) ->
+    {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest),
+    handle_custom_event(DBShard, Timestamp, tick).
 
 
 assign_timestamps(Latest, Messages) ->
 assign_timestamps(Latest, Messages) ->
     assign_timestamps(Latest, Messages, []).
     assign_timestamps(Latest, Messages, []).
@@ -744,3 +774,13 @@ timeus_to_timestamp(TimestampUs) ->
 
 
 snapshot_module() ->
 snapshot_module() ->
     emqx_ds_replication_snapshot.
     emqx_ds_replication_snapshot.
+
+handle_custom_event(DBShard, Latest, Event) ->
+    try
+        Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event),
+        [{append, #{?tag => storage_event, ?payload => I}} || I <- Events]
+    catch
+        EC:Err:Stacktrace ->
+            logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}),
+            []
+    end.

+ 3 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -41,4 +41,7 @@
 %% drop_generation
 %% drop_generation
 -define(generation, 2).
 -define(generation, 2).
 
 
+%% custom events
+-define(payload, 2).
+
 -endif.
 -endif.

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -16,6 +16,7 @@
 
 
 -module(emqx_ds_replication_layer_shard).
 -module(emqx_ds_replication_layer_shard).
 
 
+%% API:
 -export([start_link/3]).
 -export([start_link/3]).
 
 
 %% Static server configuration
 %% Static server configuration

+ 34 - 8
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -36,7 +36,9 @@
     update_iterator/4,
     update_iterator/4,
     next/5,
     next/5,
     delete_next/6,
     delete_next/6,
-    post_creation_actions/1
+    post_creation_actions/1,
+
+    handle_event/4
 ]).
 ]).
 
 
 %% internal exports:
 %% internal exports:
@@ -90,7 +92,8 @@
     trie :: emqx_ds_lts:trie(),
     trie :: emqx_ds_lts:trie(),
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
     ts_bits :: non_neg_integer(),
     ts_bits :: non_neg_integer(),
-    ts_offset :: non_neg_integer()
+    ts_offset :: non_neg_integer(),
+    gvars :: ets:table()
 }).
 }).
 
 
 -type s() :: #s{}.
 -type s() :: #s{}.
@@ -142,6 +145,10 @@
 
 
 -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
 -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
 
 
+%% GVar used for idle detection:
+-define(IDLE_DETECT, idle_detect).
+-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)).
+
 -ifdef(TEST).
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -endif.
 -endif.
@@ -215,7 +222,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         trie = Trie,
         trie = Trie,
         keymappers = KeymapperCache,
         keymappers = KeymapperCache,
         ts_offset = TSOffsetBits,
         ts_offset = TSOffsetBits,
-        ts_bits = TSBits
+        ts_bits = TSBits,
+        gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
     }.
 
 
 -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
 -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
@@ -240,8 +248,9 @@ post_creation_actions(
     s()
     s()
 ) ->
 ) ->
     ok.
     ok.
-drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) ->
+drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
     emqx_ds_lts:destroy(Trie),
     emqx_ds_lts:destroy(Trie),
+    catch ets:delete(GVars),
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
     ok = rocksdb:drop_column_family(DBHandle, DataCF),
     ok = rocksdb:drop_column_family(DBHandle, DataCF),
@@ -255,18 +264,21 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) ->
     emqx_ds:message_store_opts()
     emqx_ds:message_store_opts()
 ) ->
 ) ->
     emqx_ds:store_batch_result().
     emqx_ds:store_batch_result().
-store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
+store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) ->
     {ok, Batch} = rocksdb:batch(),
     {ok, Batch} = rocksdb:batch(),
-    lists:foreach(
-        fun({Timestamp, Msg}) ->
+    MaxTs = lists:foldl(
+        fun({Timestamp, Msg}, Acc) ->
             {Key, _} = make_key(S, Timestamp, Msg),
             {Key, _} = make_key(S, Timestamp, Msg),
             Val = serialize(Msg),
             Val = serialize(Msg),
-            rocksdb:put(DB, Data, Key, Val, [])
+            ok = rocksdb:put(DB, Data, Key, Val, []),
+            max(Acc, Timestamp)
         end,
         end,
+        0,
         Messages
         Messages
     ),
     ),
     Result = rocksdb:write_batch(DB, Batch, []),
     Result = rocksdb:write_batch(DB, Batch, []),
     rocksdb:release_batch(Batch),
     rocksdb:release_batch(Batch),
+    ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
     %% NOTE
     %% NOTE
     %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to
     %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to
     %% observe until there's `{no_slowdown, true}` in write options.
     %% observe until there's `{no_slowdown, true}` in write options.
@@ -469,6 +481,20 @@ delete_next_until(
         rocksdb:iterator_close(ITHandle)
         rocksdb:iterator_close(ITHandle)
     end.
     end.
 
 
+handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
+    %% Cause replication layer to bump timestamp when idle
+    case ets:lookup(Gvars, ?IDLE_DETECT) of
+        [{?IDLE_DETECT, false, LastWrittenTs}] when
+            ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time)
+        ->
+            ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
+            [emqx_ds_storage_bitfield_lts_dummy_event];
+        _ ->
+            []
+    end;
+handle_event(_ShardId, _Data, _Time, _Event) ->
+    [].
+
 %%================================================================================
 %%================================================================================
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================

+ 26 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -42,7 +42,10 @@
 
 
     %% Snapshotting
     %% Snapshotting
     take_snapshot/1,
     take_snapshot/1,
-    accept_snapshot/1
+    accept_snapshot/1,
+
+    %% Custom events
+    handle_event/3
 ]).
 ]).
 
 
 %% gen_server
 %% gen_server
@@ -79,7 +82,6 @@
 
 
 %% # "Record" integer keys.  We use maps with integer keys to avoid persisting and sending
 %% # "Record" integer keys.  We use maps with integer keys to avoid persisting and sending
 %% records over the wire.
 %% records over the wire.
-
 %% tags:
 %% tags:
 -define(STREAM, 1).
 -define(STREAM, 1).
 -define(IT, 2).
 -define(IT, 2).
@@ -201,6 +203,7 @@
 -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
 -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
     _Data.
     _Data.
 
 
+%% Delete the schema and data
 -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
 -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
     ok | {error, _Reason}.
     ok | {error, _Reason}.
 
 
@@ -231,9 +234,11 @@
 ) ->
 ) ->
     {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
     {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
 
 
+-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
+
 -callback post_creation_actions(post_creation_context()) -> _Data.
 -callback post_creation_actions(post_creation_context()) -> _Data.
 
 
--optional_callbacks([post_creation_actions/1]).
+-optional_callbacks([post_creation_actions/1, handle_event/4]).
 
 
 %%================================================================================
 %%================================================================================
 %% API for the replication layer
 %% API for the replication layer
@@ -857,6 +862,24 @@ handle_accept_snapshot(ShardId) ->
     Dir = db_dir(ShardId),
     Dir = db_dir(ShardId),
     emqx_ds_storage_snapshot:new_writer(Dir).
     emqx_ds_storage_snapshot:new_writer(Dir).
 
 
+%% FIXME: currently this interface is a hack to handle safe cutoff
+%% timestamp in LTS. It has many shortcomings (can lead to infinite
+%% loops if the CBM is not careful; events from one generation may be
+%% sent to the next one, etc.) and the API is not well thought out in
+%% general.
+%%
+%% The mechanism of storage layer events should be refined later.
+-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
+handle_event(Shard, Time, Event) ->
+    #{module := Mod, data := GenData} = generation_at(Shard, Time),
+    ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
+    case erlang:function_exported(Mod, handle_event, 4) of
+        true ->
+            Mod:handle_event(Shard, GenData, Time, Event);
+        false ->
+            []
+    end.
+
 %%--------------------------------------------------------------------------------
 %%--------------------------------------------------------------------------------
 %% Schema access
 %% Schema access
 %%--------------------------------------------------------------------------------
 %%--------------------------------------------------------------------------------