Просмотр исходного кода

Merge pull request #14027 from ieQu1/dev/handle-unpack-iterator-errors

Handle errors in beamformer:unpack_iterator
ieQu1 1 год назад
Родитель
Сommit
e378169e52

+ 24 - 28
apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl

@@ -584,34 +584,30 @@ t_drop_generation_with_used_once_iterator(Config) ->
         emqx_ds_test_helpers:consume_iter(DB, Iter1)
         emqx_ds_test_helpers:consume_iter(DB, Iter1)
     ).
     ).
 
 
-%% t_drop_generation_update_iterator(Config) ->
-%%     %% This checks the behavior of `emqx_ds:update_iterator' after the generation
-%%     %% underlying the iterator has been dropped.
-
-%%     DB = ?FUNCTION_NAME,
-%%     ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
-%%     [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
-
-%%     TopicFilter = emqx_topic:words(<<"foo/+">>),
-%%     StartTime = 0,
-%%     Msgs0 = [
-%%         message(<<"foo/bar">>, <<"1">>, 0),
-%%         message(<<"foo/baz">>, <<"2">>, 1)
-%%     ],
-%%     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)),
-
-%%     [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
-%%     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime),
-%%     {ok, Iter1, _Batch1} = emqx_ds:next(DB, Iter0, 1),
-%%     {ok, _Iter2, [{Key2, _Msg}]} = emqx_ds:next(DB, Iter1, 1),
-
-%%     ok = emqx_ds:add_generation(DB),
-%%     ok = emqx_ds:drop_generation(DB, GenId0),
-
-%%     ?assertEqual(
-%%         {error, unrecoverable, generation_not_found},
-%%         emqx_ds:update_iterator(DB, Iter1, Key2)
-%%     ).
+%% Validate that polling the iterator from the deleted generation is
+%% handled gracefully:
+t_poll_missing_generation(Config) ->
+    %% Open the DB and push some messages to create a stream:
+    DB = ?FUNCTION_NAME,
+    ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
+    Msgs = [message(<<"foo/bar">>, <<"1">>, 0)],
+    ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
+    timer:sleep(1_000),
+    %% Create an iterator:
+    TopicFilter = [<<"foo">>, '+'],
+    [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, 0),
+    {ok, It} = emqx_ds:make_iterator(DB, Stream, TopicFilter, 0),
+    %% Rotate generations:
+    [GenId0] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+    emqx_ds:add_generation(DB),
+    emqx_ds:drop_generation(DB, GenId0),
+    timer:sleep(1_000),
+    [GenId1] = maps:keys(emqx_ds:list_generations_with_lifetimes(DB)),
+    ?assertNotEqual(GenId0, GenId1),
+    %% Poll iterator:
+    Tag = ?FUNCTION_NAME,
+    {ok, Ref} = emqx_ds:poll(DB, [{Tag, It}], #{timeout => 1_000}),
+    ?assertReceive(#poll_reply{ref = Ref, userdata = Tag, payload = {error, unrecoverable, _}}).
 
 
 t_make_iterator_stale_stream(Config) ->
 t_make_iterator_stale_stream(Config) ->
     %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying
     %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying

+ 55 - 41
apps/emqx_durable_storage/src/emqx_ds_beamformer.erl

@@ -226,45 +226,53 @@
     ok.
     ok.
 poll(Node, ReturnAddr, Shard, Iterator, Opts = #{timeout := Timeout}) ->
 poll(Node, ReturnAddr, Shard, Iterator, Opts = #{timeout := Timeout}) ->
     CBM = emqx_ds_beamformer_sup:cbm(Shard),
     CBM = emqx_ds_beamformer_sup:cbm(Shard),
-    #{
-        stream := Stream,
-        topic_filter := TF,
-        last_seen_key := DSKey,
-        timestamp := Timestamp,
-        message_matcher := MsgMatcher
-    } = CBM:unpack_iterator(Shard, Iterator),
-    Deadline = erlang:monotonic_time(millisecond) + Timeout,
-    ?tp(beamformer_poll, #{
-        shard => Shard, key => DSKey, timeout => Timeout, deadline => Deadline
-    }),
-    %% Try to maximize likelyhood of sending similar iterators to the
-    %% same worker:
-    Token = {Stream, Timestamp div 10_000_000},
-    Worker = gproc_pool:pick_worker(
-        emqx_ds_beamformer_sup:pool(Shard),
-        Token
-    ),
-    %% Make request:
-    Req = #poll_req{
-        key = {Stream, TF, DSKey},
-        node = Node,
-        return_addr = ReturnAddr,
-        it = Iterator,
-        opts = Opts,
-        deadline = Deadline,
-        msg_matcher = MsgMatcher
-    },
-    emqx_ds_builtin_metrics:inc_poll_requests(shard_metrics_id(Shard), 1),
-    %% Currently we implement backpressure by ignoring transient
-    %% errors (gen_server timeouts, `too_many_requests'), and just
-    %% letting poll requests expire at the higher level. This should
-    %% hold back the caller.
-    try gen_server:call(Worker, Req, Timeout) of
-        ok -> ok;
-        {error, recoverable, too_many_requests} -> ok
-    catch
-        exit:timeout ->
-            ok
+    case CBM:unpack_iterator(Shard, Iterator) of
+        #{
+            stream := Stream,
+            topic_filter := TF,
+            last_seen_key := DSKey,
+            timestamp := Timestamp,
+            message_matcher := MsgMatcher
+        } ->
+            Deadline = erlang:monotonic_time(millisecond) + Timeout,
+            ?tp(beamformer_poll, #{
+                shard => Shard, key => DSKey, timeout => Timeout, deadline => Deadline
+            }),
+            %% Try to maximize likelyhood of sending similar iterators to the
+            %% same worker:
+            Token = {Stream, Timestamp div 10_000_000},
+            Worker = gproc_pool:pick_worker(
+                emqx_ds_beamformer_sup:pool(Shard),
+                Token
+            ),
+            %% Make request:
+            Req = #poll_req{
+                key = {Stream, TF, DSKey},
+                node = Node,
+                return_addr = ReturnAddr,
+                it = Iterator,
+                opts = Opts,
+                deadline = Deadline,
+                msg_matcher = MsgMatcher
+            },
+            emqx_ds_builtin_metrics:inc_poll_requests(shard_metrics_id(Shard), 1),
+            %% Currently we implement backpressure by ignoring transient
+            %% errors (gen_server timeouts, `too_many_requests'), and just
+            %% letting poll requests expire at the higher level. This should
+            %% hold back the caller.
+            try gen_server:call(Worker, Req, Timeout) of
+                ok -> ok;
+                {error, recoverable, too_many_requests} -> ok
+            catch
+                exit:timeout ->
+                    ok
+            end;
+        Err = {error, _, _} ->
+            Beam = #beam{
+                iterators = [{ReturnAddr, Iterator}],
+                pack = Err
+            },
+            send_out(Node, Beam)
     end.
     end.
 
 
 %% @doc This internal API notifies the beamformer that new data is
 %% @doc This internal API notifies the beamformer that new data is
@@ -755,8 +763,14 @@ send_out(Node, Beam) ->
         dest_node => Node,
         dest_node => Node,
         beam => Beam
         beam => Beam
     }),
     }),
-    emqx_ds_beamsplitter_proto_v1:dispatch(Node, Beam),
-    ok.
+    %% gen_rpc currently doesn't optimize local casts:
+    case node() of
+        Node ->
+            erlang:spawn(?MODULE, do_dispatch, [Beam]),
+            ok;
+        _ ->
+            emqx_ds_beamsplitter_proto_v1:dispatch(Node, Beam)
+    end.
 
 
 shard_metrics_id({DB, Shard}) ->
 shard_metrics_id({DB, Shard}) ->
     emqx_ds_builtin_metrics:shard_metric_id(DB, Shard).
     emqx_ds_builtin_metrics:shard_metric_id(DB, Shard).

+ 2 - 1
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -68,7 +68,8 @@
 -callback buffer_config
 -callback buffer_config
     (emqx_ds:db(), _Shard, _State, batch_size | batch_bytes) ->
     (emqx_ds:db(), _Shard, _State, batch_size | batch_bytes) ->
         {ok, size_limit()} | undefined;
         {ok, size_limit()} | undefined;
-    (emqx_ds:db(), _Shard, _State, flush_interval) -> pos_integer().
+    (emqx_ds:db(), _Shard, _State, flush_interval) ->
+        {ok, pos_integer()} | undefined.
 
 
 -optional_callbacks([buffer_config/4]).
 -optional_callbacks([buffer_config/4]).