|
|
@@ -1766,12 +1766,6 @@ t_async_pool_worker_death(_Config) ->
|
|
|
?assertEqual(NumReqs, Inflight0),
|
|
|
|
|
|
%% grab one of the worker pids and kill it
|
|
|
- {ok, SRef1} =
|
|
|
- snabbkaffe:subscribe(
|
|
|
- ?match_event(#{?snk_kind := buffer_worker_worker_down_update}),
|
|
|
- NumBufferWorkers,
|
|
|
- 10_000
|
|
|
- ),
|
|
|
{ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
|
|
|
MRef = monitor(process, Pid0),
|
|
|
ct:pal("will kill ~p", [Pid0]),
|
|
|
@@ -1785,13 +1779,27 @@ t_async_pool_worker_death(_Config) ->
|
|
|
end,
|
|
|
|
|
|
%% inflight requests should have been marked as retriable
|
|
|
- {ok, _} = snabbkaffe:receive_events(SRef1),
|
|
|
+ wait_until_all_marked_as_retriable(NumReqs),
|
|
|
Inflight1 = emqx_resource_metrics:inflight_get(?ID),
|
|
|
?assertEqual(NumReqs, Inflight1),
|
|
|
|
|
|
- ok
|
|
|
+ NumReqs
|
|
|
end,
|
|
|
- []
|
|
|
+ fun(NumReqs, Trace) ->
|
|
|
+ Events = ?of_kind(buffer_worker_async_agent_down, Trace),
|
|
|
+ %% At least one buffer worker should have marked its
|
|
|
+ %% requests as retriable. If a single one has
|
|
|
+ %% received all requests, that's all we got.
|
|
|
+ ?assertMatch([_ | _], Events),
|
|
|
+ %% All requests distributed over all buffer workers
|
|
|
+ %% should have been marked as retriable, by the time
|
|
|
+ %% the inflight has been drained.
|
|
|
+ ?assertEqual(
|
|
|
+ NumReqs,
|
|
|
+ lists:sum([N || #{num_affected := N} <- Events])
|
|
|
+ ),
|
|
|
+ ok
|
|
|
+ end
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
@@ -3017,3 +3025,33 @@ trace_between_span(Trace0, Marker) ->
|
|
|
{Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0),
|
|
|
{[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1),
|
|
|
Trace2.
|
|
|
+
|
|
|
+wait_until_all_marked_as_retriable(NumExpected) when NumExpected =< 0 ->
|
|
|
+ ok;
|
|
|
+wait_until_all_marked_as_retriable(NumExpected) ->
|
|
|
+ Seen = #{},
|
|
|
+ do_wait_until_all_marked_as_retriable(NumExpected, Seen).
|
|
|
+
|
|
|
+do_wait_until_all_marked_as_retriable(NumExpected, _Seen) when NumExpected =< 0 ->
|
|
|
+ ok;
|
|
|
+do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
|
|
|
+ Res = ?block_until(
|
|
|
+ #{?snk_kind := buffer_worker_async_agent_down, ?snk_meta := #{pid := P}} when
|
|
|
+ not is_map_key(P, Seen),
|
|
|
+ 10_000
|
|
|
+ ),
|
|
|
+ case Res of
|
|
|
+ {timeout, Evts} ->
|
|
|
+ ct:pal("events so far:\n ~p", [Evts]),
|
|
|
+ ct:fail("timeout waiting for events");
|
|
|
+ {ok, #{num_affected := NumAffected, ?snk_meta := #{pid := Pid}}} ->
|
|
|
+ ct:pal("affected: ~p; pid: ~p", [NumAffected, Pid]),
|
|
|
+ case NumAffected >= NumExpected of
|
|
|
+ true ->
|
|
|
+ ok;
|
|
|
+ false ->
|
|
|
+ do_wait_until_all_marked_as_retriable(NumExpected - NumAffected, Seen#{
|
|
|
+ Pid => true
|
|
|
+ })
|
|
|
+ end
|
|
|
+ end.
|