Bläddra i källkod

Merge pull request #13072 from ieQu1/dev/fix-drop-generation

Idempotent drop_generation
ieQu1 1 år sedan
förälder
incheckning
bfd01c425d

+ 4 - 6
apps/emqx/src/emqx_ds_schema.erl

@@ -246,7 +246,8 @@ fields(layout_builtin_reference) ->
                 reference,
                 #{
                     'readOnly' => true,
-                    importance => ?IMPORTANCE_HIDDEN
+                    importance => ?IMPORTANCE_LOW,
+                    desc => ?DESC(layout_builtin_reference_type)
                 }
             )}
     ].
@@ -257,6 +258,8 @@ desc(builtin_local_write_buffer) ->
     ?DESC(builtin_local_write_buffer);
 desc(layout_builtin_wildcard_optimized) ->
     ?DESC(layout_builtin_wildcard_optimized);
+desc(layout_builtin_reference) ->
+    ?DESC(layout_builtin_reference);
 desc(_) ->
     undefined.
 
@@ -273,17 +276,12 @@ ds_schema(Options) ->
         Options
     ).
 
--ifndef(TEST).
-builtin_layouts() ->
-    [ref(layout_builtin_wildcard_optimized)].
--else.
 builtin_layouts() ->
     %% Reference layout stores everything in one stream, so it's not
     %% suitable for production use. However, it's very simple and
     %% produces a very predictabale replay order, which can be useful
     %% for testing and debugging:
     [ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
--endif.
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 

+ 2 - 0
apps/emqx_durable_storage/README.md

@@ -124,6 +124,8 @@ The following application environment variables are available:
 
 - `emqx_durable_storage.egress_flush_interval`: period at which the batches of messages are committed to the durable storage.
 
+- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`.
+
 Runtime settings for the durable storages can be modified via CLI as well as the REST API.
 The following CLI commands are available:
 

+ 28 - 0
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -33,6 +33,12 @@
 ]).
 -export([which_dbs/0, which_shards/1]).
 
+%% Debug:
+-export([
+    get_egress_workers/1,
+    get_shard_workers/1
+]).
+
 %% behaviour callbacks:
 -export([init/1]).
 
@@ -111,6 +117,28 @@ which_dbs() ->
     Key = {n, l, #?db_sup{_ = '_', db = '$1'}},
     gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]).
 
+%% @doc Get pids of all local egress servers for the given DB.
+-spec get_egress_workers(emqx_ds:db()) -> #{_Shard => pid()}.
+get_egress_workers(DB) ->
+    Children = supervisor:which_children(?via(#?egress_sup{db = DB})),
+    L = [{Shard, Child} || {Shard, Child, _, _} <- Children, is_pid(Child)],
+    maps:from_list(L).
+
+%% @doc Get pids of all local shard servers for the given DB.
+-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}.
+get_shard_workers(DB) ->
+    Shards = supervisor:which_children(?via(#?shards_sup{db = DB})),
+    L = lists:flatmap(
+        fun
+            ({_Shard, Sup, _, _}) when is_pid(Sup) ->
+                [{Id, Pid} || {Id, Pid, _, _} <- supervisor:which_children(Sup), is_pid(Pid)];
+            (_) ->
+                []
+        end,
+        Shards
+    ),
+    maps:from_list(L).
+
 %%================================================================================
 %% behaviour callbacks
 %%================================================================================

+ 99 - 24
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -561,12 +561,27 @@ list_nodes() ->
 %% Too large for normal operation, need better backpressure mechanism.
 -define(RA_TIMEOUT, 60 * 1000).
 
--define(SAFERPC(EXPR),
+-define(SAFE_ERPC(EXPR),
     try
         EXPR
     catch
-        error:RPCError = {erpc, _} ->
-            {error, recoverable, RPCError}
+        error:RPCError__ = {erpc, _} ->
+            {error, recoverable, RPCError__}
+    end
+).
+
+-define(SHARD_RPC(DB, SHARD, NODE, BODY),
+    case
+        emqx_ds_replication_layer_shard:servers(
+            DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred)
+        )
+    of
+        [{_, NODE} | _] ->
+            begin
+                BODY
+            end;
+        [] ->
+            {error, recoverable, replica_offline}
     end
 ).
 
@@ -623,44 +638,79 @@ ra_drop_generation(DB, Shard, GenId) ->
     end.
 
 ra_get_streams(DB, Shard, TopicFilter, Time) ->
-    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
     TimestampUs = timestamp_to_timeus(Time),
-    ?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs))
+    ).
 
 ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    ?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time))
+    ).
 
 ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
-    {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
     TimeUs = timestamp_to_timeus(StartTime),
-    ?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs))
+    ).
 
 ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
     TimeUs = timestamp_to_timeus(StartTime),
-    ?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(
+            emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)
+        )
+    ).
 
 ra_update_iterator(DB, Shard, Iter, DSKey) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    ?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey))
+    ).
 
 ra_next(DB, Shard, Iter, BatchSize) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
-        RPCError = {badrpc, _} ->
-            {error, recoverable, RPCError};
-        Other ->
-            Other
-    end.
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
+            Err = {badrpc, _} ->
+                {error, recoverable, Err};
+            Ret ->
+                Ret
+        end
+    ).
 
 ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
+    ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize))
+    ).
 
 ra_list_generations_with_lifetimes(DB, Shard) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
+    Reply = ?SHARD_RPC(
+        DB,
+        Shard,
+        Node,
+        ?SAFE_ERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard))
+    ),
+    case Reply of
         Gens = #{} ->
             maps:map(
                 fun(_GenId, Data = #{since := Since, until := Until}) ->
@@ -711,6 +761,14 @@ apply(
     #{?tag := add_generation, ?since := Since},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
+    ?tp(
+        info,
+        ds_replication_layer_add_generation,
+        #{
+            shard => DBShard,
+            since => Since
+        }
+    ),
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
     State = State0#{latest := Latest},
@@ -721,6 +779,15 @@ apply(
     #{?tag := update_config, ?since := Since, ?config := Opts},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
+    ?tp(
+        notice,
+        ds_replication_layer_update_config,
+        #{
+            shard => DBShard,
+            config => Opts,
+            since => Since
+        }
+    ),
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
     State = State0#{latest := Latest},
@@ -730,6 +797,14 @@ apply(
     #{?tag := drop_generation, ?generation := GenId},
     #{db_shard := DBShard} = State
 ) ->
+    ?tp(
+        info,
+        ds_replication_layer_drop_generation,
+        #{
+            shard => DBShard,
+            generation => GenId
+        }
+    ),
     Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
     {State, Result};
 apply(

+ 16 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -33,7 +33,7 @@
 -export([start_link/2, store_batch/3]).
 
 %% behavior callbacks:
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
 -export([]).
@@ -129,6 +129,21 @@ init([DB, Shard]) ->
     },
     {ok, S}.
 
+format_status(Status) ->
+    maps:map(
+        fun
+            (state, #s{db = DB, shard = Shard, queue = Q}) ->
+                #{
+                    db => DB,
+                    shard => Shard,
+                    queue => queue:len(Q)
+                };
+            (_, Val) ->
+                Val
+        end,
+        Status
+    ).
+
 handle_call(
     #enqueue_req{
         messages = Msgs,

+ 21 - 27
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -28,8 +28,7 @@
 
 %% Dynamic server location API
 -export([
-    servers/3,
-    server/3
+    servers/3
 ]).
 
 %% Membership
@@ -83,16 +82,15 @@ server_name(DB, Shard, Site) ->
 
 %%
 
--spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server(), ...] when
-    Order :: leader_preferred | undefined.
-servers(DB, Shard, _Order = leader_preferred) ->
+-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server()] when
+    Order :: leader_preferred | local_preferred | undefined.
+servers(DB, Shard, leader_preferred) ->
     get_servers_leader_preferred(DB, Shard);
+servers(DB, Shard, local_preferred) ->
+    get_servers_local_preferred(DB, Shard);
 servers(DB, Shard, _Order = undefined) ->
     get_shard_servers(DB, Shard).
 
-server(DB, Shard, _Which = local_preferred) ->
-    get_server_local_preferred(DB, Shard).
-
 get_servers_leader_preferred(DB, Shard) ->
     %% NOTE: Contact last known leader first, then rest of shard servers.
     ClusterName = get_cluster_name(DB, Shard),
@@ -104,17 +102,24 @@ get_servers_leader_preferred(DB, Shard) ->
             get_online_servers(DB, Shard)
     end.
 
-get_server_local_preferred(DB, Shard) ->
-    %% NOTE: Contact either local server or a random replica.
+get_servers_local_preferred(DB, Shard) ->
+    %% Return list of servers, where the local replica (if exists) is
+    %% the first element. Note: result is _NOT_ shuffled. This can be
+    %% bad for the load balancing, but it makes results more
+    %% deterministic. Caller that doesn't care about that can shuffle
+    %% the results by itself.
     ClusterName = get_cluster_name(DB, Shard),
     case ra_leaderboard:lookup_members(ClusterName) of
-        Servers when is_list(Servers) ->
-            pick_local(Servers);
         undefined ->
-            %% TODO
-            %% Leader is unkonwn if there are no servers of this group on the
-            %% local node. We want to pick a replica in that case as well.
-            pick_random(get_online_servers(DB, Shard))
+            Servers = get_online_servers(DB, Shard);
+        Servers when is_list(Servers) ->
+            ok
+    end,
+    case lists:keytake(node(), 2, Servers) of
+        false ->
+            Servers;
+        {value, Local, Rest} ->
+            [Local | Rest]
     end.
 
 lookup_leader(DB, Shard) ->
@@ -139,17 +144,6 @@ filter_online(Servers) ->
 is_server_online({_Name, Node}) ->
     Node == node() orelse lists:member(Node, nodes()).
 
-pick_local(Servers) ->
-    case lists:keyfind(node(), 2, Servers) of
-        Local when is_tuple(Local) ->
-            Local;
-        false ->
-            pick_random(Servers)
-    end.
-
-pick_random(Servers) ->
-    lists:nth(rand:uniform(length(Servers)), Servers).
-
 get_cluster_name(DB, Shard) ->
     memoize(fun cluster_name/2, [DB, Shard]).
 

+ 44 - 14
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -35,7 +35,7 @@
     make_iterator/5,
     make_delete_iterator/5,
     update_iterator/4,
-    next/5,
+    next/6,
     delete_next/6,
     post_creation_actions/1,
 
@@ -161,7 +161,7 @@
 
 %% GVar used for idle detection:
 -define(IDLE_DETECT, idle_detect).
--define(EPOCH(S, TS), (TS bsl S#s.ts_bits)).
+-define(EPOCH(S, TS), (TS bsr S#s.ts_offset)).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
@@ -424,23 +424,21 @@ next(
     Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
     It = #{?storage_key := Stream},
     BatchSize,
-    Now
+    Now,
+    IsCurrent
 ) ->
     init_counters(),
     %% Compute safe cutoff time. It's the point in time where the last
     %% complete epoch ends, so we need to know the current time to
     %% compute it. This is needed because new keys can be added before
     %% the iterator.
-    IsWildcard =
+    %%
+    %% This is needed to avoid situations when the iterator advances
+    %% to position k1, and then a new message with k2, such that k2 <
+    %% k1 is inserted. k2 would be missed.
+    HasCutoff =
         case Stream of
-            {_StaticKey, []} -> false;
-            _ -> true
-        end,
-    SafeCutoffTime =
-        case IsWildcard of
-            true ->
-                (Now bsr TSOffset) bsl TSOffset;
-            false ->
+            {_StaticKey, []} ->
                 %% Iterators scanning streams without varying topic
                 %% levels can operate on incomplete epochs, since new
                 %% matching keys for the single topic are added in
@@ -450,10 +448,27 @@ next(
                 %% filters operating on streams with varying parts:
                 %% iterator can jump to the next topic and then it
                 %% won't backtrack.
+                false;
+            _ ->
+                %% New batches are only added to the current
+                %% generation. We can ignore cutoff time for old
+                %% generations:
+                IsCurrent
+        end,
+    SafeCutoffTime =
+        case HasCutoff of
+            true ->
+                ?EPOCH(Schema, Now) bsl TSOffset;
+            false ->
                 1 bsl TSBits - 1
         end,
     try
-        next_until(Schema, It, SafeCutoffTime, BatchSize)
+        case next_until(Schema, It, SafeCutoffTime, BatchSize) of
+            {ok, _, []} when not IsCurrent ->
+                {ok, end_of_stream};
+            Result ->
+                Result
+        end
     after
         report_counters(Shard)
     end.
@@ -538,6 +553,17 @@ delete_next_until(
     end.
 
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
+    %% If the last message was published more than one epoch ago, and
+    %% the shard remains idle, we need to advance safety cutoff
+    %% interval to make sure the last epoch becomes visible to the
+    %% readers.
+    %%
+    %% We do so by emitting a dummy event that will be persisted by
+    %% the replication layer. Processing it will advance the
+    %% replication layer's clock.
+    %%
+    %% This operation is latched to avoid publishing events on every
+    %% tick.
     case ets:lookup(Gvars, ?IDLE_DETECT) of
         [{?IDLE_DETECT, Latch, LastWrittenTs}] ->
             ok;
@@ -546,13 +572,17 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
             LastWrittenTs = 0
     end,
     case Latch of
-        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) ->
+        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 ->
+            %% Note: + 1 above delays the event by one epoch to add a
+            %% safety margin.
             ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
             [dummy_event];
         _ ->
             []
     end;
 handle_event(_ShardId, _Data, _Time, _Event) ->
+    %% `dummy_event' goes here and does nothing. But it forces update
+    %% of `Time' in the replication layer.
     [].
 
 %%================================================================================

+ 120 - 70
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -52,7 +52,7 @@
 ]).
 
 %% gen_server
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
 -export([db_dir/1]).
@@ -80,6 +80,10 @@
 -define(stream_v2(GENERATION, INNER), [GENERATION | INNER]).
 -define(delete_stream(GENERATION, INNER), [GENERATION | INNER]).
 
+%% Wrappers for the storage events:
+-define(storage_event(GEN_ID, PAYLOAD), #{0 := 3333, 1 := GEN_ID, 2 := PAYLOAD}).
+-define(mk_storage_event(GEN_ID, PAYLOAD), #{0 => 3333, 1 => GEN_ID, 2 => PAYLOAD}).
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -244,8 +248,8 @@
 ) ->
     emqx_ds:make_delete_iterator_result(_Iterator).
 
--callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) ->
-    {ok, Iter, [emqx_types:message()]} | {error, _}.
+-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) ->
+    {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
 
 -callback delete_next(
     shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
@@ -297,25 +301,30 @@ store_batch(Shard, Messages, Options) ->
     [{emqx_ds:time(), emqx_types:message()}],
     emqx_ds:message_store_opts()
 ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
-prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
+prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
     %% NOTE
     %% We assume that batches do not span generations. Callers should enforce this.
     ?tp(emqx_ds_storage_layer_prepare_batch, #{
         shard => Shard, messages => Messages, options => Options
     }),
-    {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
-    T0 = erlang:monotonic_time(microsecond),
-    Result =
-        case Mod:prepare_batch(Shard, GenData, Messages, Options) of
-            {ok, CookedBatch} ->
-                {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
-            Error = {error, _, _} ->
-                Error
-        end,
-    T1 = erlang:monotonic_time(microsecond),
-    %% TODO store->prepare
-    emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
-    Result;
+    %% FIXME: always store messages in the current generation
+    case generation_at(Shard, Time) of
+        {GenId, #{module := Mod, data := GenData}} ->
+            T0 = erlang:monotonic_time(microsecond),
+            Result =
+                case Mod:prepare_batch(Shard, GenData, Messages, Options) of
+                    {ok, CookedBatch} ->
+                        {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
+                    Error = {error, _, _} ->
+                        Error
+                end,
+            T1 = erlang:monotonic_time(microsecond),
+            %% TODO store->prepare
+            emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
+            Result;
+        not_found ->
+            ignore
+    end;
 prepare_batch(_Shard, [], _Options) ->
     ignore.
 
@@ -444,15 +453,12 @@ update_iterator(
 next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) ->
     case generation_get(Shard, GenId) of
         #{module := Mod, data := GenData} ->
-            Current = generation_current(Shard),
-            case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of
-                {ok, _GenIter, []} when GenId < Current ->
-                    %% This is a past generation. Storage layer won't write
-                    %% any more messages here. The iterator reached the end:
-                    %% the stream has been fully replayed.
-                    {ok, end_of_stream};
+            IsCurrent = GenId =:= generation_current(Shard),
+            case Mod:next(Shard, GenData, GenIter0, BatchSize, Now, IsCurrent) of
                 {ok, GenIter, Batch} ->
                     {ok, Iter#{?enc := GenIter}, Batch};
+                {ok, end_of_stream} ->
+                    {ok, end_of_stream};
                 Error = {error, _, _} ->
                     Error
             end;
@@ -513,7 +519,7 @@ add_generation(ShardId, Since) ->
 list_generations_with_lifetimes(ShardId) ->
     gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
 
--spec drop_generation(shard_id(), gen_id()) -> ok.
+-spec drop_generation(shard_id(), gen_id()) -> ok | {error, _}.
 drop_generation(ShardId, GenId) ->
     gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
 
@@ -563,6 +569,7 @@ start_link(Shard = {_, _}, Options) ->
 
 init({ShardId, Options}) ->
     process_flag(trap_exit, true),
+    ?tp(info, ds_storage_init, #{shard => ShardId}),
     logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
     erase_schema_runtime(ShardId),
     clear_all_checkpoints(ShardId),
@@ -586,6 +593,17 @@ init({ShardId, Options}) ->
     commit_metadata(S),
     {ok, S}.
 
+format_status(Status) ->
+    maps:map(
+        fun
+            (state, State) ->
+                format_state(State);
+            (_, Val) ->
+                Val
+        end,
+        Status
+    ).
+
 handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
     case handle_update_config(S0, Since, Options) of
         S = #s{} ->
@@ -758,18 +776,31 @@ handle_drop_generation(S0, GenId) ->
     } = S0,
     #{module := Mod, cf_refs := GenCFRefs} = GenSchema,
     #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
-    case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of
-        ok ->
-            CFRefs = OldCFRefs -- GenCFRefs,
-            Shard = maps:remove(?GEN_KEY(GenId), OldShard),
-            Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
-            S = S0#s{
-                cf_refs = CFRefs,
-                shard = Shard,
-                schema = Schema
-            },
-            {ok, S}
-    end.
+    try
+        Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData)
+    catch
+        EC:Err:Stack ->
+            ?tp(
+                error,
+                ds_storage_layer_failed_to_drop_generation,
+                #{
+                    shard => ShardId,
+                    EC => Err,
+                    stacktrace => Stack,
+                    generation => GenId,
+                    s => format_state(S0)
+                }
+            )
+    end,
+    CFRefs = OldCFRefs -- GenCFRefs,
+    Shard = maps:remove(?GEN_KEY(GenId), OldShard),
+    Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
+    S = S0#s{
+        cf_refs = CFRefs,
+        shard = Shard,
+        schema = Schema
+    },
+    {ok, S}.
 
 -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
     generation().
@@ -815,10 +846,6 @@ new_generation(ShardId, DB, Schema0, Since) ->
 next_generation_id(GenId) ->
     GenId + 1.
 
--spec prev_generation_id(gen_id()) -> gen_id().
-prev_generation_id(GenId) when GenId > 0 ->
-    GenId - 1.
-
 %% @doc Commit current state of the server to both rocksdb and the persistent term
 -spec commit_metadata(server_state()) -> ok.
 commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
@@ -914,23 +941,23 @@ handle_accept_snapshot(ShardId) ->
     Dir = db_dir(ShardId),
     emqx_ds_storage_snapshot:new_writer(Dir).
 
-%% FIXME: currently this interface is a hack to handle safe cutoff
-%% timestamp in LTS. It has many shortcomings (can lead to infinite
-%% loops if the CBM is not careful; events from one generation may be
-%% sent to the next one, etc.) and the API is not well thought out in
-%% general.
-%%
-%% The mechanism of storage layer events should be refined later.
--spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
+-spec handle_event(shard_id(), emqx_ds:time(), Event) -> [Event].
+handle_event(Shard, Time, ?storage_event(GenId, Event)) ->
+    case generation_get(Shard, GenId) of
+        not_found ->
+            [];
+        #{module := Mod, data := GenData} ->
+            case erlang:function_exported(Mod, handle_event, 4) of
+                true ->
+                    NewEvents = Mod:handle_event(Shard, GenData, Time, Event),
+                    [?mk_storage_event(GenId, E) || E <- NewEvents];
+                false ->
+                    []
+            end
+    end;
 handle_event(Shard, Time, Event) ->
-    {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
-    ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
-    case erlang:function_exported(Mod, handle_event, 4) of
-        true ->
-            Mod:handle_event(Shard, GenData, Time, Event);
-        false ->
-            []
-    end.
+    GenId = generation_current(Shard),
+    handle_event(Shard, Time, ?mk_storage_event(GenId, Event)).
 
 %%--------------------------------------------------------------------------------
 %% Schema access
@@ -941,6 +968,25 @@ generation_current(Shard) ->
     #{current_generation := Current} = get_schema_runtime(Shard),
     Current.
 
+%% TODO: remove me
+-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()} | not_found.
+generation_at(Shard, Time) ->
+    Schema = #{current_generation := Current} = get_schema_runtime(Shard),
+    generation_at(Time, Current, Schema).
+
+generation_at(Time, GenId, Schema) ->
+    case Schema of
+        #{?GEN_KEY(GenId) := Gen} ->
+            case Gen of
+                #{since := Since} when Time < Since andalso GenId > 0 ->
+                    generation_at(Time, GenId - 1, Schema);
+                _ ->
+                    {GenId, Gen}
+            end;
+        _ ->
+            not_found
+    end.
+
 -spec generation_get(shard_id(), gen_id()) -> generation() | not_found.
 generation_get(Shard, GenId) ->
     case get_schema_runtime(Shard) of
@@ -964,19 +1010,23 @@ generations_since(Shard, Since) ->
         Schema
     ).
 
--spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}.
-generation_at(Shard, Time) ->
-    Schema = #{current_generation := Current} = get_schema_runtime(Shard),
-    generation_at(Time, Current, Schema).
-
-generation_at(Time, GenId, Schema) ->
-    #{?GEN_KEY(GenId) := Gen} = Schema,
-    case Gen of
-        #{since := Since} when Time < Since andalso GenId > 0 ->
-            generation_at(Time, prev_generation_id(GenId), Schema);
-        _ ->
-            {GenId, Gen}
-    end.
+format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) ->
+    #{
+        id => ShardId,
+        db => DB,
+        cf_refs => CFRefs,
+        schema => Schema,
+        shard =>
+            maps:map(
+                fun
+                    (?GEN_KEY(_), _Schema) ->
+                        '...';
+                    (_K, Val) ->
+                        Val
+                end,
+                Shard
+            )
+    }.
 
 -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
 

+ 8 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -38,7 +38,7 @@
     make_iterator/5,
     make_delete_iterator/5,
     update_iterator/4,
-    next/5,
+    next/6,
     delete_next/6
 ]).
 
@@ -148,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
         last_seen_message_key = DSKey
     }}.
 
-next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
+next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) ->
     #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
     {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
     Action =
@@ -162,7 +162,12 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
     {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),
     rocksdb:iterator_close(ITHandle),
     It = It0#it{last_seen_message_key = Key},
-    {ok, It, lists:reverse(Messages)}.
+    case Messages of
+        [] when not IsCurrent ->
+            {ok, end_of_stream};
+        _ ->
+            {ok, It, lists:reverse(Messages)}
+    end.
 
 delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
     #delete_it{

+ 1 - 2
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl

@@ -179,8 +179,7 @@ make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
     | {ok, end_of_stream}
     | {error, _}.
 delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
-    emqx_rpc:call(
-        Shard,
+    erpc:call(
         Node,
         emqx_ds_replication_layer,
         do_delete_next_v4,

+ 10 - 4
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -67,10 +67,16 @@ t_00_smoke_open_drop(_Config) ->
 %% A simple smoke test that verifies that storing the messages doesn't
 %% crash
 t_01_smoke_store(_Config) ->
-    DB = default,
-    ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
-    Msg = message(<<"foo/bar">>, <<"foo">>, 0),
-    ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])).
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            DB = default,
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msg = message(<<"foo/bar">>, <<"foo">>, 0),
+            ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg]))
+        end,
+        []
+    ).
 
 %% A simple smoke test that verifies that getting the list of streams
 %% doesn't crash and that iterators can be opened.

+ 8 - 9
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -183,7 +183,7 @@ t_rebalance(Config) ->
             ],
             Stream1 = emqx_utils_stream:interleave(
                 [
-                    {50, Stream0},
+                    {10, Stream0},
                     emqx_utils_stream:const(add_generation)
                 ],
                 false
@@ -479,11 +479,13 @@ t_rebalance_offline_restarts(Config) ->
 %%
 
 shard_server_info(Node, DB, Shard, Site, Info) ->
-    Server = shard_server(Node, DB, Shard, Site),
-    {Server, ds_repl_shard(Node, server_info, [Info, Server])}.
-
-shard_server(Node, DB, Shard, Site) ->
-    ds_repl_shard(Node, shard_server, [DB, Shard, Site]).
+    ?ON(
+        Node,
+        begin
+            Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
+            {Server, emqx_ds_replication_layer_shard:server_info(Info, Server)}
+        end
+    ).
 
 ds_repl_meta(Node, Fun) ->
     ds_repl_meta(Node, Fun, []).
@@ -499,9 +501,6 @@ ds_repl_meta(Node, Fun, Args) ->
             error(meta_op_failed)
     end.
 
-ds_repl_shard(Node, Fun, Args) ->
-    erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
-
 shards(Node, DB) ->
     erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
 

+ 1 - 20
apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl

@@ -27,25 +27,6 @@ opts() ->
 
 %%
 
-t_idempotent_store_batch(_Config) ->
-    Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
-    {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
-    %% Push some messages to the shard.
-    Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)],
-    GenTs = 30,
-    Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)],
-    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
-    %% Add new generation and push the same batch + some more.
-    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)),
-    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
-    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
-    %% First batch should have been handled idempotently.
-    ?assertEqual(
-        Msgs1 ++ Msgs2,
-        lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
-    ),
-    ok = stop_shard(Pid).
-
 t_snapshot_take_restore(_Config) ->
     Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
     {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
@@ -77,7 +58,7 @@ t_snapshot_take_restore(_Config) ->
 
     %% Verify that the restored shard contains the messages up until the snapshot.
     {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
-    ?assertEqual(
+    snabbkaffe_diff:assert_lists_eq(
         Msgs1 ++ Msgs2,
         lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
     ).

+ 10 - 0
changes/ce/fix-13072.en.md

@@ -0,0 +1,10 @@
+Various fixes related to the `durable_sessions` feature:
+
+- Add an option to execute read operations on the leader.
+- `drop_generation` operation can be replayed multiple times by the replication layer, but it's not idempotent. This PR adds a workaround that avoids a crash when `drop_generation` doesn't succeed. In the future, however, we want to make `drop_generation` idempotent in a nicer way.
+- Wrap storage layer events in a small structure containing the generation ID, to make sure events are handled by the same layout CBM & context that produced them.
+- Fix crash when storage event arrives to the dropped generation (now removed `storage_layer:generation_at` function didn't handle the case of dropped generations).
+- Implement `format_status` callback for several workers to minimize log spam
+- Move the responsibility of `end_of_stream` detection to the layout CBM. Previously storage layer used a heuristic: old generations that return an empty batch won't produce more data. This was, obviously, incorrect: for example, bitfield-LTS layout MAY return empty batch while waiting for safe cutoff time.
+- `reference` layout has been enabled in prod build. It could be useful for integration testing.
+- Fix incorrect epoch calculation in `bitfield_lts:handle_event` callback that lead to missed safe cutoff time updates, and effectively, subscribers being unable to fetch messages until a fresh batch was published.

+ 13 - 4
rel/i18n/emqx_ds_schema.hocon

@@ -90,11 +90,20 @@ wildcard_optimized_epoch_bits.desc:
 
   Time span covered by each epoch grows exponentially with the value of `epoch_bits`:
 
-  - `epoch_bits = 1`: epoch time = 1 millisecond
-  - `epoch_bits = 2`: 2 milliseconds
+  - `epoch_bits = 1`: epoch time = 2 microseconds
+  - `epoch_bits = 2`: 4 microseconds
   ...
-  - `epoch_bits = 10`: 1024 milliseconds
-  - `epoch_bits = 13`: ~8 seconds
+  - `epoch_bits = 20`: ~1s
   ...~"""
 
+layout_builtin_reference.label: "Reference layout"
+layout_builtin_reference.desc:
+  """~
+  A simplistic layout type that stores all messages from all topics in chronological order in a single stream.
+
+  Not recommended for production use.~"""
+
+layout_builtin_reference_type.label: "Layout type"
+layout_builtin_reference_type.desc: "Reference layout type."
+
 }