Jelajahi Sumber

chore(dsraft): bump `ra` to 2.14.0

Andrew Mayorov 1 tahun lalu
induk
melakukan
285b8f536b

+ 1 - 1
apps/emqx_ds_builtin_raft/rebar.config

@@ -2,5 +2,5 @@
 
 
 {deps, [
 {deps, [
     {emqx_durable_storage, {path, "../emqx_durable_storage"}},
     {emqx_durable_storage, {path, "../emqx_durable_storage"}},
-    {ra, "2.7.3"}
+    {ra, "2.14.0"}
 ]}.
 ]}.

+ 3 - 2
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl

@@ -438,6 +438,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     MutableConfig = #{tick_timeout => 100},
     MutableConfig = #{tick_timeout => 100},
     case ra:restart_server(DB, LocalServer, MutableConfig) of
     case ra:restart_server(DB, LocalServer, MutableConfig) of
         {error, name_not_registered} ->
         {error, name_not_registered} ->
+            UID = server_uid(DB, Shard),
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
             LogOpts = maps:with(
             LogOpts = maps:with(
                 [
                 [
@@ -448,11 +449,11 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
             ),
             ),
             ok = ra:start_server(DB, MutableConfig#{
             ok = ra:start_server(DB, MutableConfig#{
                 id => LocalServer,
                 id => LocalServer,
-                uid => server_uid(DB, Shard),
+                uid => UID,
                 cluster_name => ClusterName,
                 cluster_name => ClusterName,
                 initial_members => Servers,
                 initial_members => Servers,
                 machine => Machine,
                 machine => Machine,
-                log_init_args => LogOpts
+                log_init_args => LogOpts#{uid => UID}
             }),
             }),
             {_NewServer = true, LocalServer};
             {_NewServer = true, LocalServer};
         ok ->
         ok ->

+ 15 - 5
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl

@@ -21,7 +21,8 @@
 -behaviour(ra_snapshot).
 -behaviour(ra_snapshot).
 -export([
 -export([
     prepare/2,
     prepare/2,
-    write/3,
+    write/4,
+    sync/1,
 
 
     begin_read/2,
     begin_read/2,
     read_chunk/3,
     read_chunk/3,
@@ -66,12 +67,20 @@
 prepare(Index, State) ->
 prepare(Index, State) ->
     ra_log_snapshot:prepare(Index, State).
     ra_log_snapshot:prepare(Index, State).
 
 
--spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
+-spec write(
+    _SnapshotDir :: file:filename(),
+    ra_snapshot:meta(),
+    _State :: ra_state(),
+    _Sync :: boolean()
+) ->
     ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
     ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
-write(Dir, Meta, MachineState) ->
+write(Dir, Meta, MachineState, Sync) ->
     ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}),
     ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}),
     ok = emqx_ds_storage_layer:flush(shard_id(MachineState)),
     ok = emqx_ds_storage_layer:flush(shard_id(MachineState)),
-    ra_log_snapshot:write(Dir, Meta, MachineState).
+    ra_log_snapshot:write(Dir, Meta, MachineState, Sync).
+
+sync(Dir) ->
+    ra_log_snapshot:sync(Dir).
 
 
 %% Reading a snapshot.
 %% Reading a snapshot.
 %%
 %%
@@ -230,7 +239,8 @@ complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
     write_machine_snapshot(WS).
     write_machine_snapshot(WS).
 
 
 write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
 write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
-    ra_log_snapshot:write(Dir, Meta, MachineState).
+    {ok, _Bytes} = ra_log_snapshot:write(Dir, Meta, MachineState, _Sync = false),
+    ok.
 
 
 %% Restoring machine state from a snapshot.
 %% Restoring machine state from a snapshot.
 %% This is equivalent to restoring from a log snapshot.
 %% This is equivalent to restoring from a log snapshot.

+ 1 - 1
apps/emqx_durable_storage/mix.exs

@@ -37,7 +37,7 @@ defmodule EMQXDurableStorage.MixProject do
       {:emqx_utils, in_umbrella: true},
       {:emqx_utils, in_umbrella: true},
       UMP.common_dep(:rocksdb),
       UMP.common_dep(:rocksdb),
       UMP.common_dep(:gproc),
       UMP.common_dep(:gproc),
-      {:ra, "2.7.3"},
+      {:ra, "2.14.0"},
     ]
     ]
   end
   end
 end
 end

+ 1 - 1
mix.exs

@@ -208,7 +208,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:telemetry), do: {:telemetry, "1.1.0", override: true}
   def common_dep(:telemetry), do: {:telemetry, "1.1.0", override: true}
   # in conflict by grpc and eetcd
   # in conflict by grpc and eetcd
   def common_dep(:gpb), do: {:gpb, "4.19.9", override: true, runtime: false}
   def common_dep(:gpb), do: {:gpb, "4.19.9", override: true, runtime: false}
-  def common_dep(:ra), do: {:ra, "2.7.3", override: true}
+  def common_dep(:ra), do: {:ra, "2.14.0", override: true}
 
 
   # in conflict by emqx_connector and system_monitor
   # in conflict by emqx_connector and system_monitor
   def common_dep(:epgsql), do: {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true}
   def common_dep(:epgsql), do: {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true}

+ 1 - 1
rebar.config

@@ -111,7 +111,7 @@
     {ssl_verify_fun, "1.1.7"},
     {ssl_verify_fun, "1.1.7"},
     {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
     {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}},
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}},
-    {ra, "2.7.3"}
+    {ra, "2.14.0"}
 ]}.
 ]}.
 
 
 {xref_ignores,
 {xref_ignores,