Procházet zdrojové kódy

fix(dsraft): guard remote polls with shard readiness check

Also make it slightly more robust by introducing a trivial
"sentinel" process as the last of the supervisor children.
Andrew Mayorov před 1 rokem
rodič
revize
f5c241a53a

+ 46 - 6
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl

@@ -14,6 +14,7 @@
     start_shard/1,
     start_egress/1,
     stop_shard/1,
+    shard_info/2,
     terminate_storage/1,
     restart_storage/1,
     ensure_shard/1,
@@ -31,23 +32,26 @@
 -export([init/1]).
 
 %% internal exports:
--export([start_link_sup/2]).
+-export([start_link_sup/2, start_link_sentinel/1, init_sentinel/2]).
 
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
--define(via(REC), {via, gproc, {n, l, REC}}).
+-define(name(REC), {n, l, REC}).
+-define(via(REC), {via, gproc, ?name(REC)}).
 
 -define(db_sup, ?MODULE).
--define(shards_sup, emqx_ds_builtin_db_shards_sup).
--define(egress_sup, emqx_ds_builtin_db_egress_sup).
--define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(shards_sup, emqx_ds_builtin_raft_db_shards_sup).
+-define(egress_sup, emqx_ds_builtin_raft_db_egress_sup).
+-define(shard_sup, emqx_ds_builtin_raft_db_shard_sup).
+-define(shard_sentinel, emqx_ds_builtin_raft_db_shard_sentinel).
 
 -record(?db_sup, {db}).
 -record(?shards_sup, {db}).
 -record(?egress_sup, {db}).
 -record(?shard_sup, {db, shard}).
+-record(?shard_sentinel, {shardid}).
 
 %%================================================================================
 %% API functions
@@ -77,6 +81,13 @@ stop_shard({DB, Shard}) ->
             {error, Reason}
     end.
 
+-spec shard_info(emqx_ds_storage_layer:shard_id(), ready) -> boolean() | down.
+shard_info(ShardId = {DB, Shard}, Info) ->
+    case sentinel_alive(ShardId) of
+        true -> emqx_ds_replication_layer_shard:shard_info(DB, Shard, Info);
+        false -> down
+    end.
+
 -spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
 terminate_storage({DB, Shard}) ->
     Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
@@ -181,7 +192,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
     Children = [
         shard_storage_spec(DB, Shard, Opts),
         shard_replication_spec(DB, Shard, Opts),
-        shard_beamformers_spec(DB, Shard)
+        shard_beamformers_spec(DB, Shard),
+        shard_sentinel_spec(DB, Shard)
     ],
     {ok, {SupFlags, Children}}.
 
@@ -220,6 +232,22 @@ start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
 start_link_sup(Id, Options) ->
     supervisor:start_link(?via(Id), ?MODULE, {Id, Options}).
 
+-spec start_link_sentinel(emqx_ds_storage_layer:shard_id()) -> {ok, pid()}.
+start_link_sentinel(Id) ->
+    proc_lib:start_link(?MODULE, init_sentinel, [self(), Id]).
+
+-spec init_sentinel(pid(), emqx_ds_storage_layer:shard_id()) -> no_return().
+init_sentinel(Parent, Id) ->
+    Name = ?name(#?shard_sentinel{shardid = Id}),
+    gproc:reg(Name),
+    proc_lib:init_ack(Parent, {ok, self()}),
+    receive
+        %% Not trapping exits, but just in case.
+        {'EXIT', _Pid, Reason} ->
+            gproc:unreg(Name),
+            exit(Reason)
+    end.
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -292,6 +320,18 @@ shard_beamformers_spec(DB, Shard) ->
             ]}
     }.
 
+shard_sentinel_spec(DB, Shard) ->
+    #{
+        id => {Shard, sentinel},
+        type => worker,
+        restart => permanent,
+        shutdown => brutal_kill,
+        start => {?MODULE, start_link_sentinel, [{DB, Shard}]}
+    }.
+
+sentinel_alive(Id) ->
+    gproc:where(?name(#?shard_sentinel{shardid = Id})) =/= undefined.
+
 ensure_started(Res) ->
     case Res of
         {ok, _Pid} ->

+ 17 - 20
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -527,10 +527,10 @@ shards_of_batch(_DB, [], Acc) ->
 %% TODO
 %% There's a possibility of a race condition: storage may shut down right after we
 %% ask for its status.
--define(IF_SHARD_READY(DB, SHARD, EXPR),
-    case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
+-define(IF_SHARD_READY(SHARDID, EXPR),
+    case emqx_ds_builtin_raft_db_sup:shard_info(SHARDID, ready) of
         true -> EXPR;
-        false -> {error, recoverable, shard_unavailable}
+        _Unready -> {error, recoverable, shard_unavailable}
     end
 ).
 
@@ -574,8 +574,7 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
     ).
 
@@ -602,8 +601,7 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
     ).
 
@@ -638,8 +636,7 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:next(
             ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
         )
@@ -672,8 +669,7 @@ do_add_generation_v2(_DB) ->
 do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
     ).
 
@@ -700,11 +696,14 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
 do_poll_v1(SourceNode, DB, Shard, Iterators, PollOpts) ->
     ShardId = {DB, Shard},
     ?tp(ds_raft_do_poll, #{shard => ShardId, iterators => Iterators}),
-    lists:foreach(
-        fun({RAddr, It}) ->
-            emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
-        end,
-        Iterators
+    ?IF_SHARD_READY(
+        ShardId,
+        lists:foreach(
+            fun({RAddr, It}) ->
+                emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
+            end,
+            Iterators
+        )
     ).
 
 %%================================================================================
@@ -1249,11 +1248,9 @@ snapshot_module() ->
 unpack_iterator(Shard, #{?tag := ?IT, ?enc := Iterator}) ->
     emqx_ds_storage_layer:unpack_iterator(Shard, Iterator).
 
-scan_stream(ShardId, Stream, TopicFilter, StartMsg, BatchSize) ->
-    {DB, Shard} = ShardId,
+scan_stream(ShardId = {DB, Shard}, Stream, TopicFilter, StartMsg, BatchSize) ->
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         begin
             Now = current_timestamp(DB, Shard),
             emqx_ds_storage_layer:scan_stream(