Przeglądaj źródła

feat(dsrepl): implement raft-based replication

Still very rough but mostly working.
Andrew Mayorov 2 lat temu
rodzic
commit
146f082fdc

+ 1 - 1
apps/emqx/src/emqx_ds_schema.erl

@@ -201,7 +201,7 @@ fields(layout_builtin_wildcard_optimized) ->
             sc(
                 range(0, 64),
                 #{
-                    default => 10,
+                    default => 20,
                     importance => ?IMPORTANCE_HIDDEN,
                     desc => ?DESC(wildcard_optimized_epoch_bits)
                 }

+ 5 - 0
apps/emqx/src/emqx_message.erl

@@ -66,6 +66,7 @@
 
 -export([
     is_expired/2,
+    set_timestamp/2,
     update_expiry/1,
     timestamp_now/0
 ]).
@@ -288,6 +289,10 @@ is_expired(#message{timestamp = CreatedAt}, Zone) ->
         Interval -> elapsed(CreatedAt) > Interval
     end.
 
+-spec set_timestamp(integer(), emqx_types:message()) -> emqx_types:message().
+set_timestamp(Timestamp, Msg) ->
+    Msg#message{timestamp = Timestamp}.
+
 -spec update_expiry(emqx_types:message()) -> emqx_types:message().
 update_expiry(
     Msg = #message{

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -323,7 +323,7 @@ subscribe(
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
             {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
             Subscription = #{
-                start_time => now_ms(),
+                start_time => emqx_ds:timestamp_us(),
                 props => SubOpts,
                 id => SubId,
                 deleted => false

+ 0 - 4
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -33,10 +33,6 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    %% avoid inter-suite flakiness...
-    %% TODO: remove after other suites start to use `emx_cth_suite'
-    application:stop(emqx),
-    application:stop(emqx_durable_storage),
     Config.
 
 end_per_suite(_Config) ->

+ 9 - 2
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -43,6 +43,7 @@
 
 %% Misc. API:
 -export([count/1]).
+-export([timestamp_us/0]).
 
 -export_type([
     create_db_opts/0,
@@ -147,9 +148,11 @@
 -type error(Reason) :: {error, recoverable | unrecoverable, Reason}.
 
 %% Timestamp
+%% Each message must have unique timestamp.
 %% Earliest possible timestamp is 0.
-%% TODO granularity?  Currently, we should always use milliseconds, as that's the unit we
-%% use in emqx_guid.  Otherwise, the iterators won't match the message timestamps.
+%% Granularity: microsecond.
+%% TODO: Currently, we should always use milliseconds, as that's the unit we
+%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
 -type message_store_opts() ::
@@ -394,6 +397,10 @@ count(DB) ->
 %% Internal exports
 %%================================================================================
 
+-spec timestamp_us() -> time().
+timestamp_us() ->
+    erlang:system_time(microsecond).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================

+ 13 - 9
apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl

@@ -87,14 +87,6 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
     %% Spec for the top-level supervisor for the database:
     logger:notice("Starting DS DB ~p", [DB]),
     _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
-    %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards:
-    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    lists:foreach(
-        fun(Shard) ->
-            emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard)
-        end,
-        MyShards
-    ),
     Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
     SupFlags = #{
         strategy => one_for_all,
@@ -106,7 +98,11 @@ init({#?shard_sup{db = DB}, _}) ->
     %% Spec for the supervisor that manages the worker processes for
     %% each local shard of the DB:
     MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    Children = [shard_spec(DB, Shard) || Shard <- MyShards],
+    Children = [
+        Child
+     || Shard <- MyShards,
+        Child <- [shard_spec(DB, Shard), shard_replication_spec(DB, Shard)]
+    ],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 10,
@@ -154,6 +150,14 @@ shard_spec(DB, Shard) ->
         type => worker
     }.
 
+shard_replication_spec(DB, Shard) ->
+    #{
+        id => {Shard, replication},
+        start => {emqx_ds_replication_layer, ra_start_shard, [DB, Shard]},
+        restart => transient,
+        type => worker
+    }.
+
 egress_spec(DB, Shard) ->
     #{
         id => Shard,

+ 176 - 10
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -56,7 +56,16 @@
     do_drop_generation_v3/3,
     do_get_delete_streams_v4/4,
     do_make_delete_iterator_v4/5,
-    do_delete_next_v4/5
+    do_delete_next_v4/5,
+
+    %% FIXME
+    ra_start_shard/2,
+    ra_store_batch/3
+]).
+
+-export([
+    init/1,
+    apply/3
 ]).
 
 -export_type([
@@ -208,10 +217,9 @@ get_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
             Streams =
                 try
-                    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime)
+                    ra_get_streams(DB, Shard, TopicFilter, StartTime)
                 catch
                     error:{erpc, _} ->
                         %% TODO: log?
@@ -251,8 +259,7 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?stream_v2(Shard, StorageStream) = Stream,
-    Node = node_of_shard(DB, Shard),
-    try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
+    try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
@@ -282,8 +289,7 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 update_iterator(DB, OldIter, DSKey) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
-    Node = node_of_shard(DB, Shard),
-    try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of
+    try ra_update_iterator(DB, Shard, StorageIter, DSKey) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
@@ -296,7 +302,6 @@ update_iterator(DB, OldIter, DSKey) ->
 -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
 next(DB, Iter0, BatchSize) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
-    Node = node_of_shard(DB, Shard),
     %% TODO: iterator can contain information that is useful for
     %% reconstructing messages sent over the network. For example,
     %% when we send messages with the learned topic index, we could
@@ -305,7 +310,7 @@ next(DB, Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of
+    case ra_next(DB, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -379,7 +384,8 @@ do_drop_db_v1(DB) ->
     emqx_ds_builtin_sup:stop_db(DB),
     lists:foreach(
         fun(Shard) ->
-            emqx_ds_storage_layer:drop_shard({DB, Shard})
+            emqx_ds_storage_layer:drop_shard({DB, Shard}),
+            ra_drop_shard(DB, Shard)
         end,
         MyShards
     ).
@@ -510,3 +516,163 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
 
 list_nodes() ->
     mria:running_nodes().
+
+%%
+
+ra_start_shard(DB, Shard) ->
+    System = default,
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    ClusterName = ra_cluster_name(DB, Shard),
+    LocalServer = ra_local_server(DB, Shard),
+    Servers = ra_shard_servers(DB, Shard),
+    case ra:restart_server(System, LocalServer) of
+        ok ->
+            ok;
+        {error, name_not_registered} ->
+            ok = ra:start_server(System, #{
+                id => LocalServer,
+                uid => <<ClusterName/binary, "_", Site/binary>>,
+                cluster_name => ClusterName,
+                initial_members => Servers,
+                machine => {module, ?MODULE, #{db => DB, shard => Shard}},
+                log_init_args => #{}
+            })
+    end,
+    case Servers of
+        [LocalServer | _] ->
+            %% TODO
+            %% Not super robust, but we probably don't expect nodes to be down
+            %% when we bring up a fresh consensus group. Triggering election
+            %% is not really required otherwise.
+            %% TODO
+            %% Ensure that doing that on node restart does not disrupt consensus.
+            ok = ra:trigger_election(LocalServer);
+        _ ->
+            ok
+    end,
+    ignore.
+
+ra_store_batch(DB, Shard, Messages) ->
+    Command = #{
+        ?tag => ?BATCH,
+        ?batch_messages => Messages,
+        ?timestamp => emqx_ds:timestamp_us()
+    },
+    case ra:process_command(ra_leader_servers(DB, Shard), Command) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_get_streams(DB, Shard, TopicFilter, Time) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
+
+ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+
+ra_update_iterator(DB, Shard, Iter, DSKey) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
+
+ra_next(DB, Shard, Iter, BatchSize) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
+
+ra_drop_shard(DB, Shard) ->
+    %% TODO: clean dsrepl state
+    ra:stop_server(_System = default, ra_local_server(DB, Shard)).
+
+ra_shard_servers(DB, Shard) ->
+    {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
+    [
+        {ra_server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
+     || Site <- ReplicaSet
+    ].
+
+ra_local_server(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    {ra_server_name(DB, Shard, Site), node()}.
+
+ra_leader_servers(DB, Shard) ->
+    %% NOTE: Contact last known leader first, then rest of shard servers.
+    ClusterName = ra_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_leader(ClusterName) of
+        Leader when Leader /= undefined ->
+            Servers = ra_leaderboard:lookup_members(ClusterName),
+            [Leader | lists:delete(Leader, Servers)];
+        undefined ->
+            %% TODO: Dynamic membership.
+            ra_shard_servers(DB, Shard)
+    end.
+
+ra_random_replica(DB, Shard) ->
+    %% NOTE: Contact random replica that is not a known leader.
+    %% TODO: Replica may be down, so we may need to retry.
+    ClusterName = ra_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_members(ClusterName) of
+        Servers when is_list(Servers) ->
+            Leader = ra_leaderboard:lookup_leader(ClusterName),
+            ra_pick_replica(Servers, Leader);
+        undefined ->
+            %% TODO
+            %% Leader is unkonwn if there are no servers of this group on the
+            %% local node. We want to pick a replica in that case as well.
+            %% TODO: Dynamic membership.
+            ra_pick_server(ra_shard_servers(DB, Shard))
+    end.
+
+ra_pick_replica(Servers, Leader) ->
+    case lists:delete(Leader, Servers) of
+        [] ->
+            Leader;
+        Followers ->
+            ra_pick_server(Followers)
+    end.
+
+ra_pick_server(Servers) ->
+    lists:nth(rand:uniform(length(Servers)), Servers).
+
+ra_cluster_name(DB, Shard) ->
+    iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
+
+ra_server_name(DB, Shard, Site) ->
+    DBBin = atom_to_binary(DB),
+    binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
+
+%%
+
+init(#{db := DB, shard := Shard}) ->
+    _ = erlang:put(emqx_ds_db_shard, {DB, Shard}),
+    #{latest => 0}.
+
+apply(
+    #{index := RaftIdx},
+    #{
+        ?tag := ?BATCH,
+        ?batch_messages := MessagesIn,
+        ?timestamp := TimestampLocal
+    },
+    #{latest := Latest} = State
+) ->
+    %% NOTE
+    %% Unique timestamp tracking real time closely.
+    %% With microsecond granularity it should be nearly impossible for it to run
+    %% too far ahead than the real time clock.
+    Timestamp = max(Latest + 1, TimestampLocal),
+    Messages = assign_timestamps(Timestamp, MessagesIn),
+    Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
+    %% NOTE: Last assigned timestamp.
+    NLatest = Timestamp + length(Messages) - 1,
+    NState = State#{latest := NLatest},
+    %% TODO: Need to measure effects of changing frequency of `release_cursor`.
+    Effect = {release_cursor, RaftIdx, NState},
+    {NState, Result, Effect}.
+
+assign_timestamps(Timestamp, [MessageIn | Rest]) ->
+    Message = emqx_message:set_timestamp(Timestamp, MessageIn),
+    [Message | assign_timestamps(Timestamp + 1, Rest)];
+assign_timestamps(_Timestamp, []) ->
+    [].

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl

@@ -28,6 +28,7 @@
 %% keys:
 -define(tag, 1).
 -define(shard, 2).
+-define(timestamp, 3).
 -define(enc, 3).
 -define(batch_messages, 2).
 

+ 3 - 18
apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl

@@ -40,7 +40,6 @@
 
 -export_type([]).
 
--include("emqx_ds_replication_layer.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 %%================================================================================
@@ -109,7 +108,6 @@ store_batch(DB, Messages, Opts) ->
 -record(s, {
     db :: emqx_ds:db(),
     shard :: emqx_ds_replication_layer:shard_id(),
-    leader :: node(),
     n = 0 :: non_neg_integer(),
     tref :: reference(),
     batch = [] :: [emqx_types:message()],
@@ -119,12 +117,9 @@ store_batch(DB, Messages, Opts) ->
 init([DB, Shard]) ->
     process_flag(trap_exit, true),
     process_flag(message_queue_data, off_heap),
-    %% TODO: adjust leader dynamically
-    Leader = shard_leader(DB, Shard),
     S = #s{
         db = DB,
         shard = Shard,
-        leader = Leader,
         tref = start_timer()
     },
     {ok, S}.
@@ -159,10 +154,10 @@ terminate(_Reason, _S) ->
 do_flush(S = #s{batch = []}) ->
     S#s{tref = start_timer()};
 do_flush(
-    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader}
+    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
 ) ->
-    Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
-    ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
+    %% FIXME
+    ok = emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)),
     [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
     ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
     erlang:garbage_collect(),
@@ -212,13 +207,3 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
 start_timer() ->
     Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
     erlang:send_after(Interval, self(), ?flush).
-
-shard_leader(DB, Shard) ->
-    %% TODO: use optvar
-    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
-        {ok, Leader} ->
-            Leader;
-        {error, no_leader_for_shard} ->
-            timer:sleep(500),
-            shard_leader(DB, Shard)
-    end.

+ 56 - 27
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -34,6 +34,7 @@
     replica_set/2,
     in_sync_replicas/2,
     sites/0,
+    node/1,
     open_db/2,
     get_options/1,
     update_db_config/2,
@@ -113,7 +114,7 @@
 
 -spec print_status() -> ok.
 print_status() ->
-    io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]),
+    io:format("THIS SITE:~n~s~n", [this_site()]),
     io:format("~nSITES:~n", []),
     Nodes = [node() | nodes()],
     lists:foreach(
@@ -123,28 +124,18 @@ print_status() ->
                     true -> up;
                     false -> down
                 end,
-            io:format("~s    ~p    ~p~n", [base64:encode(Site), Node, Status])
+            io:format("~s    ~p    ~p~n", [Site, Node, Status])
         end,
         eval_qlc(mnesia:table(?NODE_TAB))
     ),
     io:format(
-        "~nSHARDS:~nId                             Leader                            Status~n", []
+        "~nSHARDS:~nId                             Replicas~n", []
     ),
     lists:foreach(
-        fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) ->
+        fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
             ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
-            LeaderStr = string:pad(atom_to_list(Leader), 33),
-            Status =
-                case lists:member(Leader, Nodes) of
-                    true ->
-                        case node() of
-                            Leader -> "up *";
-                            _ -> "up"
-                        end;
-                    false ->
-                        "down"
-                end,
-            io:format("~s ~s ~s~n", [ShardStr, LeaderStr, Status])
+            ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
+            io:format("~s ~s~n", [ShardStr, ReplicasStr])
         end,
         eval_qlc(mnesia:table(?SHARD_TAB))
     ).
@@ -169,8 +160,8 @@ shards(DB) ->
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),
-    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) ->
-        lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) ->
+        lists:member(Site, ReplicaSet)
     end).
 
 -spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
@@ -219,6 +210,15 @@ in_sync_replicas(DB, ShardId) ->
 sites() ->
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).
 
+-spec node(site()) -> node() | undefined.
+node(Site) ->
+    case mnesia:dirty_read(?NODE_TAB, Site) of
+        [#?NODE_TAB{node = Node}] ->
+            Node;
+        [] ->
+            undefined
+    end.
+
 -spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     {ok, node()} | {error, no_leader_for_shard}.
 shard_leader(DB, Shard) ->
@@ -248,8 +248,17 @@ get_options(DB) ->
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db(DB, DefaultOpts) ->
-    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
-    Opts.
+    case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of
+        {atomic, Opts} ->
+            Opts;
+        {aborted, {siteless_nodes, Nodes}} ->
+            %% TODO
+            %% This is ugly. We need a good story of how to fairly allocate shards in a
+            %% fresh cluster.
+            logger:notice("Aborting shard allocation, siteless nodes found: ~p", [Nodes]),
+            ok = timer:sleep(1000),
+            open_db(DB, DefaultOpts)
+    end.
 
 -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     ok | {error, _}.
@@ -273,12 +282,26 @@ drop_db(DB) ->
 init([]) ->
     process_flag(trap_exit, true),
     logger:set_process_metadata(#{domain => [ds, meta]}),
+    init_ra(),
     ensure_tables(),
     ensure_site(),
     {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
     S = #s{},
     {ok, S}.
 
+init_ra() ->
+    DataDir = filename:join([emqx:data_dir(), "dsrepl"]),
+    Config = maps:merge(ra_system:default_config(), #{
+        data_dir => DataDir,
+        wal_data_dir => DataDir
+    }),
+    case ra_system:start(Config) of
+        {ok, _System} ->
+            ok;
+        {error, {already_started, _System}} ->
+            ok
+    end.
+
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -431,8 +454,8 @@ ensure_site() ->
         {ok, [Site]} ->
             ok;
         _ ->
-            Site = crypto:strong_rand_bytes(8),
-            logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]),
+            Site = binary:encode_hex(crypto:strong_rand_bytes(4)),
+            logger:notice("Creating a new site with ID=~s", [Site]),
             ok = filelib:ensure_dir(Filename),
             {ok, FD} = file:open(Filename, [write]),
             io:format(FD, "~p.", [Site]),
@@ -445,17 +468,23 @@ ensure_site() ->
 -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
 create_shards(DB, NShards, ReplicationFactor) ->
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
-    AllSites = sites(),
+    AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
+    Nodes = mria_mnesia:running_nodes(),
+    case Nodes -- [N || #?NODE_TAB{node = N} <- AllSites] of
+        [] ->
+            ok;
+        NodesSiteless ->
+            mnesia:abort({siteless_nodes, NodesSiteless})
+    end,
     lists:foreach(
         fun(Shard) ->
-            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
+            Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites],
             Hashes = lists:sort(Hashes0),
             {_, Sites} = lists:unzip(Hashes),
-            [First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor),
+            ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
             Record = #?SHARD_TAB{
                 shard = {DB, Shard},
-                replica_set = ReplicaSet,
-                in_sync_replicas = [First]
+                replica_set = ReplicaSet
             },
             mnesia:write(Record)
         end,

+ 7 - 11
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -160,8 +160,8 @@ create(_ShardId, DBHandle, GenId, Options) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
-    %% 10 bits -> 1024 ms -> ~1 sec
-    TSOffsetBits = maps:get(epoch_bits, Options, 10),
+    %% 20 bits -> 1048576 us -> ~1 sec
+    TSOffsetBits = maps:get(epoch_bits, Options, 20),
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
@@ -345,7 +345,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
-    Now = emqx_message:timestamp_now(),
+    Now = emqx_ds:timestamp_us(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
     next_until(Schema, It, SafeCutoffTime, BatchSize).
 
@@ -436,9 +436,7 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key
     %% Make filter:
     Inequations = [
         {'=', TopicIndex},
-        {StartTime, '..', SafeCutoffTime - 1},
-        %% Unique integer:
-        any
+        {StartTime, '..', SafeCutoffTime - 1}
         %% Varying topic levels:
         | lists:map(
             fun
@@ -666,11 +664,10 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam
 ]) ->
     binary().
 make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
-    UniqueInteger = erlang:unique_integer([monotonic, positive]),
     emqx_ds_bitmask_keymapper:key_to_bitstring(
         KeyMapper,
         emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [
-            TopicIndex, Timestamp, UniqueInteger | Varying
+            TopicIndex, Timestamp | Varying
         ])
     ).
 
@@ -726,10 +723,9 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
     %% Dimension Offset        Bitsize
         [{1,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
          {2,     TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
-        [{3 + I, 0,            BitsPerTopicLevel           }       %% Varying topic levels
+        [{2 + I, 0,            BitsPerTopicLevel           }       %% Varying topic levels
                                                            || I <- lists:seq(1, N)] ++
-        [{2,     0,            TSOffsetBits                },      %% Timestamp offset
-         {3,     0,            64                          }],     %% Unique integer
+        [{2,     0,            TSOffsetBits                }],     %% Timestamp offset
     Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
     %% Assert:
     case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -5,7 +5,7 @@
     {vsn, "0.1.12"},
     {modules, []},
     {registered, []},
-    {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},
+    {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},
     {mod, {emqx_ds_app, []}},
     {env, []}
 ]}.

+ 2 - 1
rebar.config

@@ -110,7 +110,8 @@
     {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}},
     {ssl_verify_fun, "1.1.7"},
     {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
-    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}}
+    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}},
+    {ra, "2.7.3"}
 ]}.
 
 {xref_ignores,