Преглед на файлове

Merge pull request #13218 from keynslug/feat/EMQX-12468/wal-less

feat(dsrepl): enable WAL-less batch writes
Andrew Mayorov преди 1 година
родител
ревизия
3ff9440a01

+ 1 - 1
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -163,7 +163,7 @@ mk_clientid(Prefix, ID) ->
 
 restart_node(Node, NodeSpec) ->
     ?tp(will_restart_node, #{}),
-    emqx_cth_cluster:restart(Node, NodeSpec),
+    emqx_cth_cluster:restart(NodeSpec),
     wait_nodeup(Node),
     ?tp(restarted_node, #{}),
     ok.

+ 8 - 8
apps/emqx/test/emqx_cth_cluster.erl

@@ -38,7 +38,7 @@
 %%    in `end_per_suite/1` or `end_per_group/2`) with the result from step 2.
 -module(emqx_cth_cluster).
 
--export([start/1, start/2, restart/1, restart/2]).
+-export([start/1, start/2, restart/1]).
 -export([stop/1, stop_node/1]).
 
 -export([start_bare_nodes/1, start_bare_nodes/2]).
@@ -163,13 +163,13 @@ wait_clustered([Node | Nodes] = All, Check, Deadline) ->
             wait_clustered(All, Check, Deadline)
     end.
 
-restart(NodeSpec) ->
-    restart(maps:get(name, NodeSpec), NodeSpec).
-
-restart(Node, Spec) ->
-    ct:pal("Stopping peer node ~p", [Node]),
-    ok = emqx_cth_peer:stop(Node),
-    start([Spec#{boot_type => restart}]).
+restart(NodeSpecs = [_ | _]) ->
+    Nodes = [maps:get(name, Spec) || Spec <- NodeSpecs],
+    ct:pal("Stopping peer nodes: ~p", [Nodes]),
+    ok = stop(Nodes),
+    start([Spec#{boot_type => restart} || Spec <- NodeSpecs]);
+restart(NodeSpec = #{}) ->
+    restart([NodeSpec]).
 
 mk_nodespecs(Nodes, ClusterOpts) ->
     NodeSpecs = lists:zipwith(

+ 27 - 0
apps/emqx/test/emqx_cth_peer.erl

@@ -22,6 +22,7 @@
 -export([start/2, start/3, start/4]).
 -export([start_link/2, start_link/3, start_link/4]).
 -export([stop/1]).
+-export([kill/1]).
 
 start(Name, Args) ->
     start(Name, Args, []).
@@ -66,6 +67,32 @@ stop(Node) when is_atom(Node) ->
             ok
     end.
 
+%% @doc Kill a node abruptly, through mechanisms provided by OS.
+%% Relies on POSIX `kill`.
+kill(Node) ->
+    try erpc:call(Node, os, getpid, []) of
+        OSPid ->
+            Pid = whereis(Node),
+            _ = is_pid(Pid) andalso unlink(Pid),
+            Result = kill_os_process(OSPid),
+            %% Either ensure control process stops, or try to stop if not killed.
+            _ = is_pid(Pid) andalso catch peer:stop(Pid),
+            Result
+    catch
+        error:{erpc, _} = Reason ->
+            {error, Reason}
+    end.
+
+kill_os_process(OSPid) ->
+    Cmd = "kill -SIGKILL " ++ OSPid,
+    Port = erlang:open_port({spawn, Cmd}, [binary, exit_status, hide]),
+    receive
+        {Port, {exit_status, 0}} ->
+            ok;
+        {Port, {exit_status, EC}} ->
+            {error, EC}
+    end.
+
 parse_node_name(NodeName) ->
     case string:tokens(atom_to_list(NodeName), "@") of
         [Name, Host] ->

+ 142 - 36
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -57,11 +57,14 @@
     ra_store_batch/3
 ]).
 
+-behaviour(ra_machine).
 -export([
     init/1,
     apply/3,
     tick/2,
 
+    state_enter/2,
+
     snapshot_module/0
 ]).
 
@@ -143,7 +146,12 @@
 
 %% Core state of the replication, i.e. the state of ra machine.
 -type ra_state() :: #{
+    %% Shard ID.
     db_shard := {emqx_ds:db(), shard_id()},
+
+    %% Unique timestamp tracking real time closely.
+    %% With microsecond granularity it should be nearly impossible for it to run
+    %% too far ahead of the real time clock.
     latest := timestamp_us()
 }.
 
@@ -374,7 +382,7 @@ init_buffer(_DB, _Shard, _Options) ->
     {ok, #bs{}}.
 
 -spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) ->
-    {egress_state(), ok | {error, recoverable | unrecoverable, _}}.
+    {egress_state(), ok | emqx_ds:error(_)}.
 flush_buffer(DB, Shard, Messages, State) ->
     case ra_store_batch(DB, Shard, Messages) of
         {timeout, ServerId} ->
@@ -574,6 +582,20 @@ list_nodes() ->
 %% Too large for normal operation, need better backpressure mechanism.
 -define(RA_TIMEOUT, 60 * 1000).
 
+%% How often to release Raft logs?
+%% Each time we written approximately this number of bytes.
+%% Close to the RocksDB's default of 64 MiB.
+-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000_000).
+%% ...Or at least each N log entries.
+-define(RA_RELEASE_LOG_MIN_FREQ, 64_000).
+
+-ifdef(TEST).
+-undef(RA_RELEASE_LOG_APPROX_SIZE).
+-undef(RA_RELEASE_LOG_MIN_FREQ).
+-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000).
+-define(RA_RELEASE_LOG_MIN_FREQ, 1_000).
+-endif.
+
 -define(SAFE_ERPC(EXPR),
     try
         EXPR
@@ -603,18 +625,20 @@ list_nodes() ->
 ).
 
 -spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
-    ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err.
+    ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}.
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
         ?tag => ?BATCH,
         ?batch_messages => Messages
     },
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
-    case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
+    case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
             Result;
-        Error ->
-            Error
+        {timeout, _} = Timeout ->
+            Timeout;
+        {error, Reason = servers_unreachable} ->
+            {error, recoverable, Reason}
     end.
 
 ra_add_generation(DB, Shard) ->
@@ -741,6 +765,9 @@ ra_drop_shard(DB, Shard) ->
 
 %%
 
+-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
+-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
+
 -spec init(_Args :: map()) -> ra_state().
 init(#{db := DB, shard := Shard}) ->
     #{db_shard => {DB, Shard}, latest => 0}.
@@ -748,33 +775,29 @@ init(#{db := DB, shard := Shard}) ->
 -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
     {ra_state(), _Reply, _Effects}.
 apply(
-    #{index := RaftIdx},
+    RaftMeta,
     #{
         ?tag := ?BATCH,
         ?batch_messages := MessagesIn
     },
     #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
 ) ->
-    %% NOTE
-    %% Unique timestamp tracking real time closely.
-    %% With microsecond granularity it should be nearly impossible for it to run
-    %% too far ahead than the real time clock.
-    ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}),
-    {Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
-    Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
+    ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
+    {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
+    Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}),
     State = State0#{latest := Latest},
     set_ts(DBShard, Latest),
-    %% TODO: Need to measure effects of changing frequency of `release_cursor`.
-    Effect = {release_cursor, RaftIdx, State},
-    {State, Result, Effect};
+    Effects = try_release_log(Stats, RaftMeta, State),
+    Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
+    {State, Result, Effects};
 apply(
-    _RaftMeta,
+    RaftMeta,
     #{?tag := add_generation, ?since := Since},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
     ?tp(
         info,
-        ds_replication_layer_add_generation,
+        ds_ra_add_generation,
         #{
             shard => DBShard,
             since => Since
@@ -784,15 +807,17 @@ apply(
     Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
     State = State0#{latest := Latest},
     set_ts(DBShard, Latest),
-    {State, Result};
+    Effects = release_log(RaftMeta, State),
+    Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
+    {State, Result, Effects};
 apply(
-    _RaftMeta,
+    RaftMeta,
     #{?tag := update_config, ?since := Since, ?config := Opts},
     #{db_shard := DBShard, latest := Latest0} = State0
 ) ->
     ?tp(
         notice,
-        ds_replication_layer_update_config,
+        ds_ra_update_config,
         #{
             shard => DBShard,
             config => Opts,
@@ -802,7 +827,9 @@ apply(
     {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
     Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
     State = State0#{latest := Latest},
-    {State, Result};
+    Effects = release_log(RaftMeta, State),
+    Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
+    {State, Result, Effects};
 apply(
     _RaftMeta,
     #{?tag := drop_generation, ?generation := GenId},
@@ -810,7 +837,7 @@ apply(
 ) ->
     ?tp(
         info,
-        ds_replication_layer_drop_generation,
+        ds_ra_drop_generation,
         #{
             shard => DBShard,
             generation => GenId
@@ -827,7 +854,7 @@ apply(
     set_ts(DBShard, Latest),
     ?tp(
         debug,
-        emqx_ds_replication_layer_storage_event,
+        ds_ra_storage_event,
         #{
             shard => DBShard, payload => CustomEvent, latest => Latest
         }
@@ -835,27 +862,83 @@ apply(
     Effects = handle_custom_event(DBShard, Latest, CustomEvent),
     {State#{latest => Latest}, ok, Effects}.
 
+try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
+    %% NOTE
+    %% Because cursor release means storage flush (see
+    %% `emqx_ds_replication_snapshot:write/3`), we should do that not too often
+    %% (so the storage is happy with L0 SST sizes) and not too rarely (so we don't
+    %% accumulate huge Raft logs).
+    case inc_bytes_need_release(BatchSize) of
+        AccSize when AccSize > ?RA_RELEASE_LOG_APPROX_SIZE ->
+            release_log(RaftMeta, State);
+        _NotYet ->
+            case get_log_need_release(RaftMeta) of
+                undefined ->
+                    [];
+                PrevIdx when CurrentIdx - PrevIdx > ?RA_RELEASE_LOG_MIN_FREQ ->
+                    %% Release everything up to the last log entry, but only if there were
+                    %% more than %% `?RA_RELEASE_LOG_MIN_FREQ` new entries since the last
+                    %% release.
+                    release_log(RaftMeta, State);
+                _ ->
+                    []
+            end
+    end.
+
+release_log(RaftMeta = #{index := CurrentIdx}, State) ->
+    %% NOTE
+    %% Release everything up to the last log entry. This is important: any log entries
+    %% following `CurrentIdx` should not contribute to `State` (that will be recovered
+    %% from a snapshot).
+    update_log_need_release(RaftMeta),
+    reset_bytes_need_release(),
+    {release_cursor, CurrentIdx, State}.
+
+get_log_need_release(RaftMeta) ->
+    case erlang:get(?pd_ra_idx_need_release) of
+        undefined ->
+            update_log_need_release(RaftMeta),
+            undefined;
+        LastIdx ->
+            LastIdx
+    end.
+
+update_log_need_release(#{index := CurrentIdx}) ->
+    erlang:put(?pd_ra_idx_need_release, CurrentIdx).
+
+get_bytes_need_release() ->
+    emqx_maybe:define(erlang:get(?pd_ra_bytes_need_release), 0).
+
+inc_bytes_need_release(Size) ->
+    Acc = get_bytes_need_release() + Size,
+    erlang:put(?pd_ra_bytes_need_release, Acc),
+    Acc.
+
+reset_bytes_need_release() ->
+    erlang:put(?pd_ra_bytes_need_release, 0).
+
 -spec tick(integer(), ra_state()) -> ra_machine:effects().
 tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
     %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
     {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest),
-    ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
+    ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
     handle_custom_event(DBShard, Timestamp, tick).
 
 assign_timestamps(Latest, Messages) ->
-    assign_timestamps(Latest, Messages, []).
+    assign_timestamps(Latest, Messages, [], 0, 0).
 
-assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
-    case emqx_message:timestamp(MessageIn, microsecond) of
-        TimestampUs when TimestampUs > Latest ->
-            Message = assign_timestamp(TimestampUs, MessageIn),
-            assign_timestamps(TimestampUs, Rest, [Message | Acc]);
+assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
+    case emqx_message:timestamp(Message0, microsecond) of
+        TimestampUs when TimestampUs > Latest0 ->
+            Latest = TimestampUs,
+            Message = assign_timestamp(TimestampUs, Message0);
         _Earlier ->
-            Message = assign_timestamp(Latest + 1, MessageIn),
-            assign_timestamps(Latest + 1, Rest, [Message | Acc])
-    end;
-assign_timestamps(Latest, [], Acc) ->
-    {Latest, lists:reverse(Acc)}.
+            Latest = Latest0 + 1,
+            Message = assign_timestamp(Latest, Message0)
+    end,
+    assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0));
+assign_timestamps(Latest, [], Acc, N, Size) ->
+    {{N, Size}, Latest, lists:reverse(Acc)}.
 
 assign_timestamp(TimestampUs, Message) ->
     {TimestampUs, Message}.
@@ -888,3 +971,26 @@ handle_custom_event(DBShard, Latest, Event) ->
 
 set_ts({DB, Shard}, TS) ->
     emqx_ds_builtin_raft_sup:set_gvar(DB, ?gv_timestamp(Shard), TS).
+
+%%
+
+-spec state_enter(ra_server:ra_state() | eol, ra_state()) -> ra_machine:effects().
+state_enter(MemberState, #{db_shard := {DB, Shard}, latest := Latest}) ->
+    ?tp(
+        ds_ra_state_enter,
+        #{db => DB, shard => Shard, latest => Latest, state => MemberState}
+    ),
+    [].
+
+%%
+
+approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) ->
+    %% NOTE: Overhead here is basically few empty maps + 8-byte message id.
+    %% TODO: Probably need to ask the storage layer about the footprint.
+    MinOverhead = 40,
+    MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload).
+
+clientid_size(ClientID) when is_binary(ClientID) ->
+    byte_size(ClientID);
+clientid_size(ClientID) ->
+    erlang:external_size(ClientID).

+ 52 - 18
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl

@@ -4,6 +4,8 @@
 
 -module(emqx_ds_replication_layer_shard).
 
+-include_lib("snabbkaffe/include/trace.hrl").
+
 %% API:
 -export([start_link/3]).
 
@@ -19,6 +21,12 @@
     servers/3
 ]).
 
+%% Safe Process Command API
+-export([
+    process_command/3,
+    try_servers/3
+]).
+
 %% Membership
 -export([
     add_local_server/2,
@@ -37,6 +45,12 @@
 
 -type server() :: ra:server_id().
 
+-type server_error() :: server_error(none()).
+-type server_error(Reason) ::
+    {timeout, server()}
+    | {error, server(), Reason}
+    | {error, servers_unreachable}.
+
 -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
 
 %%
@@ -146,6 +160,40 @@ local_site() ->
 
 %%
 
+-spec process_command([server()], _Command, timeout()) ->
+    {ok, _Result, _Leader :: server()} | server_error().
+process_command(Servers, Command, Timeout) ->
+    try_servers(Servers, fun ra:process_command/3, [Command, Timeout]).
+
+-spec try_servers([server()], function(), [_Arg]) ->
+    {ok, _Result, _Leader :: server()} | server_error(_Reason).
+try_servers([Server | Rest], Fun, Args) ->
+    case is_server_online(Server) andalso erlang:apply(Fun, [Server | Args]) of
+        {ok, R, Leader} ->
+            {ok, R, Leader};
+        _Online = false ->
+            ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => offline}),
+            try_servers(Rest, Fun, Args);
+        {error, Reason = noproc} ->
+            ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => Reason}),
+            try_servers(Rest, Fun, Args);
+        {error, Reason} when Reason =:= nodedown orelse Reason =:= shutdown ->
+            %% NOTE
+            %% Conceptually, those error conditions basically mean the same as a plain
+            %% timeout: "it's impossible to tell if operation has succeeded or not".
+            ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => Reason}),
+            {timeout, Server};
+        {timeout, _} = Timeout ->
+            ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => timeout}),
+            Timeout;
+        {error, Reason} ->
+            {error, Server, Reason}
+    end;
+try_servers([], _Fun, _Args) ->
+    {error, servers_unreachable}.
+
+%%
+
 %% @doc Add a local server to the shard cluster.
 %% It's recommended to have the local server running before calling this function.
 %% This function is idempotent.
@@ -174,10 +222,10 @@ add_local_server(DB, Shard) ->
             }
     end,
     Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
-    case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of
+    case try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of
         {ok, _, _Leader} ->
             ok;
-        {error, already_member} ->
+        {error, _Server, already_member} ->
             ok;
         Error ->
             {error, recoverable, Error}
@@ -208,10 +256,10 @@ drop_local_server(DB, Shard) ->
 remove_server(DB, Shard, Server) ->
     ShardServers = shard_servers(DB, Shard),
     Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
-    case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of
+    case try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of
         {ok, _, _Leader} ->
             ok;
-        {error, not_member} ->
+        {error, _Server, not_member} ->
             ok;
         Error ->
             {error, recoverable, Error}
@@ -261,20 +309,6 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership}
 member_readiness(#{}) ->
     unknown.
 
-%%
-
-ra_try_servers([Server | Rest], Fun, Args) ->
-    case erlang:apply(Fun, [Server | Args]) of
-        {ok, R, Leader} ->
-            {ok, R, Leader};
-        {error, Reason} when Reason == noproc; Reason == nodedown ->
-            ra_try_servers(Rest, Fun, Args);
-        ErrorOrTimeout ->
-            ErrorOrTimeout
-    end;
-ra_try_servers([], _Fun, _Args) ->
-    {error, servers_unreachable}.
-
 ra_overview(Server) ->
     case ra:member_overview(Server) of
         {ok, Overview, _Leader} ->

+ 6 - 3
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl

@@ -69,6 +69,8 @@ prepare(Index, State) ->
 -spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
     ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
 write(Dir, Meta, MachineState) ->
+    ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}),
+    ok = emqx_ds_storage_layer:flush(shard_id(MachineState)),
     ra_log_snapshot:write(Dir, Meta, MachineState).
 
 %% Reading a snapshot.
@@ -165,6 +167,7 @@ complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) ->
 -spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) ->
     {ok, ws()}.
 begin_accept(Dir, Meta) ->
+    ?tp(dsrepl_snapshot_accept_started, #{meta => Meta}),
     WS = #ws{
         phase = machine_state,
         started_at = erlang:monotonic_time(millisecond),
@@ -207,7 +210,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0})
             ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
             _ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
             Result = complete_accept(WS#ws{writer = SnapWriter}),
-            ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}),
+            ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS), state => WS#ws.state}),
             Result;
         {error, Reason} ->
             ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
@@ -218,7 +221,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0})
 complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
     ShardId = shard_id(WS),
     logger:info(#{
-        msg => "dsrepl_snapshot_read_complete",
+        msg => "dsrepl_snapshot_write_complete",
         shard => ShardId,
         duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
         bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)
@@ -227,7 +230,7 @@ complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
     write_machine_snapshot(WS).
 
 write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
-    write(Dir, Meta, MachineState).
+    ra_log_snapshot:write(Dir, Meta, MachineState).
 
 %% Restoring machine state from a snapshot.
 %% This is equivalent to restoring from a log snapshot.

+ 116 - 3
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -140,6 +140,7 @@ t_replication_transfers_snapshots(Config) ->
 
             %% Stop the DB on the "offline" node.
             ok = emqx_cth_cluster:stop_node(NodeOffline),
+            _ = ?block_until(#{?snk_kind := ds_ra_state_enter, state := leader}, 500, 0),
 
             %% Fill the storage with messages and few additional generations.
             emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
@@ -232,14 +233,14 @@ t_rebalance(Config) ->
             ],
             Stream1 = emqx_utils_stream:interleave(
                 [
-                    {10, Stream0},
+                    {20, Stream0},
                     emqx_utils_stream:const(add_generation)
                 ],
                 false
             ),
             Stream = emqx_utils_stream:interleave(
                 [
-                    {50, Stream0},
+                    {50, Stream1},
                     emqx_utils_stream:list(Sequence)
                 ],
                 true
@@ -604,7 +605,7 @@ t_drop_generation(Config) ->
         after
             emqx_cth_cluster:stop(Nodes)
         end,
-        fun(Trace) ->
+        fun(_Trace) ->
             %% TODO: some idempotency errors still happen
             %% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)),
             true
@@ -794,6 +795,118 @@ t_store_batch_fail(_Config) ->
         ]
     ).
 
+t_crash_restart_recover(init, Config) ->
+    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {t_crash_stop_recover1, #{apps => Apps}},
+            {t_crash_stop_recover2, #{apps => Apps}},
+            {t_crash_stop_recover3, #{apps => Apps}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    Nodes = emqx_cth_cluster:start(Specs),
+    [{nodes, Nodes}, {nodespecs, Specs} | Config];
+t_crash_restart_recover('end', Config) ->
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).
+
+t_crash_restart_recover(Config) ->
+    %% This testcase verifies that in the event of abrupt site failure message data is
+    %% correctly preserved.
+    Nodes = [N1, N2, N3] = ?config(nodes, Config),
+    _Specs = [_, NS2, NS3] = ?config(nodespecs, Config),
+    DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}),
+
+    %% Prepare test event stream.
+    NMsgs = 400,
+    NClients = 8,
+    {Stream0, TopicStreams} =
+        emqx_ds_test_helpers:interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
+    Stream1 = emqx_utils_stream:interleave(
+        [
+            {300, Stream0},
+            emqx_utils_stream:const(add_generation)
+        ],
+        false
+    ),
+    Stream = emqx_utils_stream:interleave(
+        [
+            {1000, Stream1},
+            emqx_utils_stream:list([
+                fun() -> kill_restart_node_async(N2, NS2, DBOpts) end,
+                fun() -> kill_restart_node_async(N3, NS3, DBOpts) end
+            ])
+        ],
+        true
+    ),
+
+    ?check_trace(
+        begin
+            %% Initialize DB on all nodes.
+            ?assertEqual(
+                [{ok, ok} || _ <- Nodes],
+                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
+            ),
+
+            %% Apply the test events, including simulated node crashes.
+            NodeStream = emqx_utils_stream:const(N1),
+            emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
+
+            %% It's expected to lose few messages when leaders are abruptly killed.
+            MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}),
+            {ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity),
+            {timeout, Events} = snabbkaffe:receive_events(SubRef),
+            LostMessages = [M || #{batch := Messages} <- Events, M <- Messages],
+            ct:pal("Some messages were lost: ~p", [LostMessages]),
+            ?assert(length(LostMessages) < NMsgs div 20),
+
+            %% Verify that all the successfully persisted messages are there.
+            VerifyClient = fun({ClientId, ExpectedStream}) ->
+                Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
+                ClientNodes = nodes_of_clientid(ClientId, Nodes),
+                DSStream1 = ds_topic_stream(ClientId, Topic, hd(ClientNodes)),
+                %% Do nodes contain same messages for a client?
+                lists:foreach(
+                    fun(ClientNode) ->
+                        DSStream = ds_topic_stream(ClientId, Topic, ClientNode),
+                        ?defer_assert(emqx_ds_test_helpers:diff_messages(DSStream1, DSStream))
+                    end,
+                    tl(ClientNodes)
+                ),
+                %% Does any messages were lost unexpectedly?
+                {_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)),
+                ExpectedMessages = emqx_utils_stream:consume(ExpectedStream),
+                MissingMessages = ExpectedMessages -- DSMessages,
+                ?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages))
+            end,
+            lists:foreach(VerifyClient, TopicStreams)
+        end,
+        []
+    ).
+
+nodes_of_clientid(ClientId, Nodes) ->
+    emqx_ds_test_helpers:nodes_of_clientid(?DB, ClientId, Nodes).
+
+ds_topic_stream(ClientId, ClientTopic, Node) ->
+    emqx_ds_test_helpers:ds_topic_stream(?DB, ClientId, ClientTopic, Node).
+
+is_message_lost(Message, MessagesLost) ->
+    lists:any(
+        fun(ML) ->
+            emqx_ds_test_helpers:message_eq([clientid, topic, payload], Message, ML)
+        end,
+        MessagesLost
+    ).
+
+kill_restart_node_async(Node, Spec, DBOpts) ->
+    erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]).
+
+kill_restart_node(Node, Spec, DBOpts) ->
+    ok = emqx_cth_peer:kill(Node),
+    ?tp(test_cluster_node_killed, #{node => Node}),
+    _ = emqx_cth_cluster:restart(Spec),
+    ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]).
+
 %%
 
 shard_server_info(Node, DB, Shard, Site, Info) ->

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -314,7 +314,7 @@ do_flush(
             ?tp(
                 debug,
                 emqx_ds_buffer_flush_failed,
-                #{db => DB, shard => Shard, error => Err}
+                #{db => DB, shard => Shard, batch => Messages, error => Err}
             ),
             emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics),
             Reply =

+ 16 - 6
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -29,7 +29,7 @@
     open/5,
     drop/5,
     prepare_batch/4,
-    commit_batch/3,
+    commit_batch/4,
     get_streams/4,
     get_delete_streams/4,
     make_iterator/5,
@@ -270,7 +270,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
     emqx_ds_storage_layer:shard_id(),
     s(),
     [{emqx_ds:time(), emqx_types:message()}, ...],
-    emqx_ds:message_store_opts()
+    emqx_ds_storage_layer:batch_store_opts()
 ) ->
     {ok, cooked_batch()}.
 prepare_batch(_ShardId, S, Messages, _Options) ->
@@ -294,12 +294,14 @@ prepare_batch(_ShardId, S, Messages, _Options) ->
 -spec commit_batch(
     emqx_ds_storage_layer:shard_id(),
     s(),
-    cooked_batch()
+    cooked_batch(),
+    emqx_ds_storage_layer:batch_store_opts()
 ) -> ok | emqx_ds:error(_).
 commit_batch(
     _ShardId,
     _Data,
-    #{?cooked_payloads := [], ?cooked_lts_ops := LTS}
+    #{?cooked_payloads := [], ?cooked_lts_ops := LTS},
+    _Options
 ) ->
     %% Assert:
     [] = LTS,
@@ -307,7 +309,8 @@ commit_batch(
 commit_batch(
     _ShardId,
     #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
-    #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}
+    #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs},
+    Options
 ) ->
     {ok, Batch} = rocksdb:batch(),
     %% Commit LTS trie to the storage:
@@ -326,7 +329,7 @@ commit_batch(
         end,
         Payloads
     ),
-    Result = rocksdb:write_batch(DB, Batch, []),
+    Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
     rocksdb:release_batch(Batch),
     ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
     %% NOTE
@@ -964,6 +967,13 @@ pop_lts_persist_ops() ->
             L
     end.
 
+-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) ->
+    _RocksDBOpts :: [{atom(), _}].
+write_batch_opts(#{durable := false}) ->
+    [{disable_wal, true}];
+write_batch_opts(#{}) ->
+    [].
+
 -ifdef(TEST).
 
 serialize(Msg) ->

+ 196 - 33
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -27,7 +27,7 @@
     %% Data
     store_batch/3,
     prepare_batch/3,
-    commit_batch/2,
+    commit_batch/3,
 
     get_streams/3,
     get_delete_streams/3,
@@ -44,6 +44,7 @@
     drop_generation/2,
 
     %% Snapshotting
+    flush/1,
     take_snapshot/1,
     accept_snapshot/1,
 
@@ -69,7 +70,8 @@
     shard_id/0,
     options/0,
     prototype/0,
-    cooked_batch/0
+    cooked_batch/0,
+    batch_store_opts/0
 ]).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -108,10 +110,28 @@
 
 -type shard_id() :: {emqx_ds:db(), binary()}.
 
--type cf_refs() :: [{string(), rocksdb:cf_handle()}].
+-type cf_ref() :: {string(), rocksdb:cf_handle()}.
+-type cf_refs() :: [cf_ref()].
 
 -type gen_id() :: 0..16#ffff.
 
+%% Options affecting how batches should be stored.
+%% See also: `emqx_ds:message_store_opts()'.
+-type batch_store_opts() ::
+    #{
+        %% Whether the whole batch given to `store_batch' should be inserted atomically as
+        %% a unit. Default: `false'.
+        atomic => boolean(),
+        %% Should the storage make sure that the batch is written durably? Non-durable
+        %% writes are in general unsafe but require much less resources, i.e. with RocksDB
+        %% non-durable (WAL-less) writes do not usually involve _any_ disk I/O.
+        %% Default: `true'.
+        durable => boolean()
+    }.
+
+%% Options affecting how batches should be prepared.
+-type batch_prepare_opts() :: #{}.
+
 %% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6
 -opaque stream_v1() ::
     #{
@@ -159,7 +179,7 @@
     %% Module-specific data defined at generation creation time:
     data := Data,
     %% Column families used by this generation
-    cf_refs := cf_refs(),
+    cf_names := [string()],
     %% Time at which this was created.  Might differ from `since', in particular for the
     %% first generation.
     created_at := emqx_message:timestamp(),
@@ -225,14 +245,15 @@
     shard_id(),
     generation_data(),
     [{emqx_ds:time(), emqx_types:message()}, ...],
-    emqx_ds:message_store_opts()
+    batch_store_opts()
 ) ->
     {ok, term()} | emqx_ds:error(_).
 
 -callback commit_batch(
     shard_id(),
     generation_data(),
-    _CookedBatch
+    _CookedBatch,
+    batch_store_opts()
 ) -> ok | emqx_ds:error(_).
 
 -callback get_streams(
@@ -279,6 +300,7 @@
 -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
 -record(call_list_generations_with_lifetimes, {}).
 -record(call_drop_generation, {gen_id :: gen_id()}).
+-record(call_flush, {}).
 -record(call_take_snapshot, {}).
 
 -spec drop_shard(shard_id()) -> ok.
@@ -288,16 +310,13 @@ drop_shard(Shard) ->
 -spec store_batch(
     shard_id(),
     [{emqx_ds:time(), emqx_types:message()}],
-    emqx_ds:message_store_opts()
+    batch_store_opts()
 ) ->
     emqx_ds:store_batch_result().
 store_batch(Shard, Messages, Options) ->
-    ?tp(emqx_ds_storage_layer_store_batch, #{
-        shard => Shard, messages => Messages, options => Options
-    }),
-    case prepare_batch(Shard, Messages, Options) of
+    case prepare_batch(Shard, Messages, #{}) of
         {ok, CookedBatch} ->
-            commit_batch(Shard, CookedBatch);
+            commit_batch(Shard, CookedBatch, Options);
         ignore ->
             ok;
         Error = {error, _, _} ->
@@ -307,7 +326,7 @@ store_batch(Shard, Messages, Options) ->
 -spec prepare_batch(
     shard_id(),
     [{emqx_ds:time(), emqx_types:message()}],
-    emqx_ds:message_store_opts()
+    batch_prepare_opts()
 ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
 prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
     %% NOTE
@@ -336,11 +355,15 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
 prepare_batch(_Shard, [], _Options) ->
     ignore.
 
--spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result().
-commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) ->
+-spec commit_batch(
+    shard_id(),
+    cooked_batch(),
+    batch_store_opts()
+) -> emqx_ds:store_batch_result().
+commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) ->
     #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard),
     T0 = erlang:monotonic_time(microsecond),
-    Result = Mod:commit_batch(Shard, GenData, CookedBatch),
+    Result = Mod:commit_batch(Shard, GenData, CookedBatch, Options),
     T1 = erlang:monotonic_time(microsecond),
     emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
     Result.
@@ -539,6 +562,10 @@ shard_info(ShardId, status) ->
         error:badarg -> down
     end.
 
+-spec flush(shard_id()) -> ok | {error, _}.
+flush(ShardId) ->
+    gen_server:call(?REF(ShardId), #call_flush{}, infinity).
+
 -spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
 take_snapshot(ShardId) ->
     case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of
@@ -566,6 +593,7 @@ start_link(Shard = {_, _}, Options) ->
     shard_id :: shard_id(),
     db :: rocksdb:db_handle(),
     cf_refs :: cf_refs(),
+    cf_need_flush :: gen_id(),
     schema :: shard_schema(),
     shard :: shard()
 }).
@@ -591,10 +619,12 @@ init({ShardId, Options}) ->
                 {Scm, CFRefs0}
         end,
     Shard = open_shard(ShardId, DB, CFRefs, Schema),
+    CurrentGenId = maps:get(current_generation, Schema),
     S = #s{
         shard_id = ShardId,
         db = DB,
         cf_refs = CFRefs,
+        cf_need_flush = CurrentGenId,
         schema = Schema,
         shard = Shard
     },
@@ -635,6 +665,9 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
 handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
     {Reply, S} = handle_drop_generation(S0, GenId),
     {reply, Reply, S};
+handle_call(#call_flush{}, _From, S0) ->
+    {Reply, S} = handle_flush(S0),
+    {reply, Reply, S};
 handle_call(#call_take_snapshot{}, _From, S) ->
     Snapshot = handle_take_snapshot(S),
     {reply, Snapshot, S};
@@ -750,9 +783,9 @@ handle_drop_generation(S0, GenId) ->
     #s{
         shard_id = ShardId,
         db = DB,
-        schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema,
-        shard = OldShard,
-        cf_refs = OldCFRefs
+        schema = #{?GEN_KEY(GenId) := GenSchema} = Schema0,
+        shard = #{?GEN_KEY(GenId) := #{data := RuntimeData}} = Shard0,
+        cf_refs = CFRefs0
     } = S0,
     %% 1. Commit the metadata first, so other functions are less
     %% likely to see stale data, and replicas don't end up
@@ -761,16 +794,16 @@ handle_drop_generation(S0, GenId) ->
     %%
     %% Note: in theory, this operation may be interrupted in the
     %% middle. This will leave column families hanging.
-    Shard = maps:remove(?GEN_KEY(GenId), OldShard),
-    Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
+    Shard = maps:remove(?GEN_KEY(GenId), Shard0),
+    Schema = maps:remove(?GEN_KEY(GenId), Schema0),
     S1 = S0#s{
         shard = Shard,
         schema = Schema
     },
     commit_metadata(S1),
     %% 2. Now, actually drop the data from RocksDB:
-    #{module := Mod, cf_refs := GenCFRefs} = GenSchema,
-    #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
+    #{module := Mod, cf_names := GenCFNames} = GenSchema,
+    GenCFRefs = [cf_ref(Name, CFRefs0) || Name <- GenCFNames],
     try
         Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData)
     catch
@@ -787,7 +820,7 @@ handle_drop_generation(S0, GenId) ->
                 }
             )
     end,
-    CFRefs = OldCFRefs -- GenCFRefs,
+    CFRefs = CFRefs0 -- GenCFRefs,
     S = S1#s{cf_refs = CFRefs},
     {ok, S}.
 
@@ -839,7 +872,7 @@ new_generation(ShardId, DB, Schema0, Shard0, Since) ->
     GenSchema = #{
         module => Mod,
         data => GenData,
-        cf_refs => NewCFRefs,
+        cf_names => cf_names(NewCFRefs),
         created_at => erlang:system_time(millisecond),
         since => Since,
         until => undefined
@@ -866,6 +899,10 @@ rocksdb_open(Shard, Options) ->
     DBOptions = [
         {create_if_missing, true},
         {create_missing_column_families, true},
+        %% NOTE
+        %% With WAL-less writes, it's important to have CFs flushed atomically.
+        %% For example, bitfield-lts backend needs data + trie CFs to be consistent.
+        {atomic_flush, true},
         {enable_write_thread_adaptive_yield, false}
         | maps:get(db_options, Options, [])
     ],
@@ -921,6 +958,34 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
             {error, overlaps_existing_generations}
     end.
 
+handle_flush(S = #s{db = DB, cf_refs = CFRefs, cf_need_flush = NeedFlushGenId, shard = Shard}) ->
+    %% NOTE
+    %% There could have been few generations added since the last time `flush/1` was
+    %% called. Strictly speaking, we don't need to flush them all at once as part of
+    %% a single atomic flush, but the error handling is a bit easier this way.
+    CurrentGenId = maps:get(current_generation, Shard),
+    GenIds = lists:seq(NeedFlushGenId, CurrentGenId),
+    CFHandles = lists:flatmap(
+        fun(GenId) ->
+            case Shard of
+                #{?GEN_KEY(GenId) := #{cf_names := CFNames}} ->
+                    [cf_handle(N, CFRefs) || N <- CFNames];
+                #{} ->
+                    %% Generation was probably dropped.
+                    []
+            end
+        end,
+        GenIds
+    ),
+    case rocksdb:flush(DB, CFHandles, [{wait, true}]) of
+        ok ->
+            %% Current generation will always need a flush.
+            ?tp(ds_storage_flush_complete, #{gens => GenIds, cfs => CFHandles}),
+            {ok, S#s{cf_need_flush = CurrentGenId}};
+        {error, _} = Error ->
+            {Error, S}
+    end.
+
 handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
     Name = integer_to_list(erlang:system_time(millisecond)),
     Dir = checkpoint_dir(ShardId, Name),
@@ -954,6 +1019,21 @@ handle_event(Shard, Time, Event) ->
     GenId = generation_current(Shard),
     handle_event(Shard, Time, ?mk_storage_event(GenId, Event)).
 
+%%--------------------------------------------------------------------------------
+
+-spec cf_names(cf_refs()) -> [string()].
+cf_names(CFRefs) ->
+    {CFNames, _CFHandles} = lists:unzip(CFRefs),
+    CFNames.
+
+-spec cf_ref(_Name :: string(), cf_refs()) -> cf_ref().
+cf_ref(Name, CFRefs) ->
+    lists:keyfind(Name, 1, CFRefs).
+
+-spec cf_handle(_Name :: string(), cf_refs()) -> rocksdb:cf_handle().
+cf_handle(Name, CFRefs) ->
+    element(2, cf_ref(Name, CFRefs)).
+
 %%--------------------------------------------------------------------------------
 %% Schema access
 %%--------------------------------------------------------------------------------
@@ -1041,23 +1121,106 @@ erase_schema_runtime(Shard) ->
 
 -undef(PERSISTENT_TERM).
 
--define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>).
+-define(ROCKSDB_SCHEMA_KEY(V), <<"schema_", V>>).
+
+-define(ROCKSDB_SCHEMA_KEY, ?ROCKSDB_SCHEMA_KEY("v2")).
+-define(ROCKSDB_SCHEMA_KEYS, [
+    ?ROCKSDB_SCHEMA_KEY,
+    ?ROCKSDB_SCHEMA_KEY("v1")
+]).
 
 -spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found.
 get_schema_persistent(DB) ->
-    case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of
+    get_schema_persistent(DB, ?ROCKSDB_SCHEMA_KEYS).
+
+get_schema_persistent(DB, [Key | Rest]) ->
+    case rocksdb:get(DB, Key, []) of
         {ok, Blob} ->
-            Schema = binary_to_term(Blob),
-            %% Sanity check:
-            #{current_generation := _, prototype := _} = Schema,
-            Schema;
+            deserialize_schema(Key, Blob);
         not_found ->
-            not_found
-    end.
+            get_schema_persistent(DB, Rest)
+    end;
+get_schema_persistent(_DB, []) ->
+    not_found.
 
 -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok.
 put_schema_persistent(DB, Schema) ->
     Blob = term_to_binary(Schema),
     rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []).
 
+-spec deserialize_schema(_SchemaVsn :: binary(), binary()) -> shard_schema().
+deserialize_schema(SchemaVsn, Blob) ->
+    %% Sanity check:
+    Schema = #{current_generation := _, prototype := _} = binary_to_term(Blob),
+    decode_schema(SchemaVsn, Schema).
+
+decode_schema(?ROCKSDB_SCHEMA_KEY, Schema) ->
+    Schema;
+decode_schema(?ROCKSDB_SCHEMA_KEY("v1"), Schema) ->
+    maps:map(fun decode_schema_v1/2, Schema).
+
+decode_schema_v1(?GEN_KEY(_), Generation = #{}) ->
+    decode_generation_schema_v1(Generation);
+decode_schema_v1(_, V) ->
+    V.
+
+decode_generation_schema_v1(SchemaV1 = #{cf_refs := CFRefs}) ->
+    %% Drop potentially dead CF references from the time generation was created.
+    Schema = maps:remove(cf_refs, SchemaV1),
+    Schema#{cf_names => cf_names(CFRefs)};
+decode_generation_schema_v1(Schema = #{}) ->
+    Schema.
+
+%%--------------------------------------------------------------------------------
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+decode_schema_v1_test() ->
+    SchemaV1 = #{
+        current_generation => 42,
+        prototype => {emqx_ds_storage_reference, #{}},
+        ?GEN_KEY(41) => #{
+            module => emqx_ds_storage_reference,
+            data => {schema},
+            cf_refs => [{"emqx_ds_storage_reference41", erlang:make_ref()}],
+            created_at => 12345,
+            since => 0,
+            until => 123456
+        },
+        ?GEN_KEY(42) => #{
+            module => emqx_ds_storage_reference,
+            data => {schema},
+            cf_refs => [{"emqx_ds_storage_reference42", erlang:make_ref()}],
+            created_at => 54321,
+            since => 123456,
+            until => undefined
+        }
+    },
+    ?assertEqual(
+        #{
+            current_generation => 42,
+            prototype => {emqx_ds_storage_reference, #{}},
+            ?GEN_KEY(41) => #{
+                module => emqx_ds_storage_reference,
+                data => {schema},
+                cf_names => ["emqx_ds_storage_reference41"],
+                created_at => 12345,
+                since => 0,
+                until => 123456
+            },
+            ?GEN_KEY(42) => #{
+                module => emqx_ds_storage_reference,
+                data => {schema},
+                cf_names => ["emqx_ds_storage_reference42"],
+                created_at => 54321,
+                since => 123456,
+                until => undefined
+            }
+        },
+        deserialize_schema(?ROCKSDB_SCHEMA_KEY("v1"), term_to_binary(SchemaV1))
+    ).
+
+-endif.
+
 -undef(ROCKSDB_SCHEMA_KEY).

+ 10 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -32,7 +32,7 @@
     open/5,
     drop/5,
     prepare_batch/4,
-    commit_batch/3,
+    commit_batch/4,
     get_streams/4,
     get_delete_streams/4,
     make_iterator/5,
@@ -105,7 +105,7 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
 prepare_batch(_ShardId, _Data, Messages, _Options) ->
     {ok, Messages}.
 
-commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
+commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) ->
     {ok, Batch} = rocksdb:batch(),
     lists:foreach(
         fun({TS, Msg}) ->
@@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
         end,
         Messages
     ),
-    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
     rocksdb:release_batch(Batch),
     Res.
 
@@ -284,3 +284,10 @@ do_delete_next(
 -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
 data_cf(GenId) ->
     "emqx_ds_storage_reference" ++ integer_to_list(GenId).
+
+-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) ->
+    _RocksDBOpts :: [{atom(), _}].
+write_batch_opts(#{durable := false}) ->
+    [{disable_wal, true}];
+write_batch_opts(#{}) ->
+    [].

+ 8 - 8
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -64,7 +64,7 @@ t_iterate(_Config) ->
         {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
-    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
     %% Iterate through individual topics:
     [
         begin
@@ -94,7 +94,7 @@ t_delete(_Config) ->
         {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
-    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
 
     %% Iterate through topics:
     StartTime = 0,
@@ -125,7 +125,7 @@ t_get_streams(_Config) ->
         {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
-    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
     GetStream = fun(Topic) ->
         StartTime = 0,
         emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime)
@@ -152,7 +152,7 @@ t_get_streams(_Config) ->
         end
      || I <- lists:seq(1, 200)
     ],
-    ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []),
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}),
     %% Check that "foo/bar/baz" topic now appears in two streams:
     %% "foo/bar/baz" and "foo/bar/+":
     NewStreams = lists:sort(GetStream("foo/bar/baz")),
@@ -180,7 +180,7 @@ t_new_generation_inherit_trie(_Config) ->
              || I <- lists:seq(1, 200),
                 Suffix <- [<<"foo">>, <<"bar">>]
             ],
-            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
             %% Now we create a new generation with the same LTS module.  It should inherit the
             %% learned trie.
             ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
@@ -194,7 +194,7 @@ t_new_generation_inherit_trie(_Config) ->
              || I <- lists:seq(1, 200),
                 Suffix <- [<<"foo">>, <<"bar">>]
             ],
-            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
             %% We should get only two streams for wildcard query, for "foo" and for "bar".
             ?assertMatch(
                 [_Foo, _Bar],
@@ -217,13 +217,13 @@ t_replay(_Config) ->
         {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
      || Topic <- Topics, PublishedAt <- Timestamps
     ],
-    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
     %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
     Batch2 = [
         {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
      || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
     ],
-    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
+    ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
     %% Check various topic filters:
     Messages = [M || {_TS, M} <- Batch1 ++ Batch2],
     %% Missing topics (no ghost messages):

+ 16 - 6
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -188,12 +188,14 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
             ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
             apply_stream(DB, NodeStream, Stream, N + 1);
         [add_generation | Stream] ->
-            %% FIXME:
+            ?tp(notice, test_add_generation, #{}),
             [Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
             ?ON(Node, emqx_ds:add_generation(DB)),
             apply_stream(DB, NodeStream, Stream, N);
         [{Node, Operation, Arg} | Stream] when
-            Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
+            Operation =:= join_db_site;
+            Operation =:= leave_db_site;
+            Operation =:= assign_db_sites
         ->
             ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
             %% Apply the transition.
@@ -207,7 +209,12 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
             %% Give some time for at least one transition to complete.
             Transitions = transitions(Node, DB),
             ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
-            ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
+            case Transitions of
+                [_ | _] ->
+                    ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB)));
+                [] ->
+                    ok
+            end,
             apply_stream(DB, NodeStream0, Stream, N);
         [Fun | Stream] when is_function(Fun) ->
             Fun(),
@@ -259,15 +266,18 @@ verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) ->
     ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
     ?defer_assert(
         begin
-            snabbkaffe_diff:assert_lists_eq(
+            diff_messages(
                 ExpectedStream,
-                ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node),
-                message_diff_options([id, qos, from, flags, headers, topic, payload, extra])
+                ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node)
             ),
             ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
         end
     ).
 
+diff_messages(Expected, Got) ->
+    Fields = [id, qos, from, flags, headers, topic, payload, extra],
+    diff_messages(Fields, Expected, Got).
+
 diff_messages(Fields, Expected, Got) ->
     snabbkaffe_diff:assert_lists_eq(Expected, Got, message_diff_options(Fields)).
 

+ 1 - 1
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -190,7 +190,7 @@ transpose_tail(S, Tail) ->
 %% @doc Make a stream by concatenating multiple streams.
 -spec chain([stream(X)]) -> stream(X).
 chain(L) ->
-    lists:foldl(fun chain/2, empty(), L).
+    lists:foldr(fun chain/2, empty(), L).
 
 %% @doc Make a stream by chaining (concatenating) two streams.
 %% The second stream begins to produce values only after the first one is exhausted.

+ 1 - 1
mix.exs

@@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
       {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true},
-      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true},
+      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-6", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.19.4", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},

+ 1 - 1
rebar.config

@@ -82,7 +82,7 @@
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
-    {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
+    {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.4"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},