|
|
@@ -1944,7 +1944,7 @@ t_expiration_async_batch_after_reply(_Config) ->
|
|
|
#{name => test_resource},
|
|
|
#{
|
|
|
query_mode => async,
|
|
|
- batch_size => 2,
|
|
|
+ batch_size => 3,
|
|
|
batch_time => 100,
|
|
|
worker_pool_size => 1,
|
|
|
resume_interval => 2_000
|
|
|
@@ -1959,7 +1959,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
|
|
|
NAcks =
|
|
|
case IsBatch of
|
|
|
batch -> 1;
|
|
|
- single -> 2
|
|
|
+ single -> 3
|
|
|
end,
|
|
|
?force_ordering(
|
|
|
#{?snk_kind := buffer_worker_flush_ack},
|
|
|
@@ -1980,6 +1980,10 @@ do_t_expiration_async_after_reply(IsBatch) ->
|
|
|
ok,
|
|
|
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
|
|
|
),
|
|
|
+ ?assertEqual(
|
|
|
+ ok,
|
|
|
+ emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
|
|
|
+ ),
|
|
|
?assertEqual(
|
|
|
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
|
|
|
),
|
|
|
@@ -2004,23 +2008,37 @@ do_t_expiration_async_after_reply(IsBatch) ->
|
|
|
ok
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
- ?assertMatch(
|
|
|
- [
|
|
|
- #{
|
|
|
- expired := [{query, _, {inc_counter, 199}, _, _}]
|
|
|
- }
|
|
|
- ],
|
|
|
- ?of_kind(handle_async_reply_expired, Trace)
|
|
|
- ),
|
|
|
+ case IsBatch of
|
|
|
+ batch ->
|
|
|
+ ?assertMatch(
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ expired := [
|
|
|
+ {query, _, {inc_counter, 199}, _, _},
|
|
|
+ {query, _, {inc_counter, 299}, _, _}
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ ?of_kind(handle_async_reply_expired, Trace)
|
|
|
+ );
|
|
|
+ single ->
|
|
|
+ ?assertMatch(
|
|
|
+ [
|
|
|
+ #{expired := [{query, _, {inc_counter, 199}, _, _}]},
|
|
|
+ #{expired := [{query, _, {inc_counter, 299}, _, _}]}
|
|
|
+ ],
|
|
|
+ ?of_kind(handle_async_reply_expired, Trace)
|
|
|
+ )
|
|
|
+ end,
|
|
|
Metrics = tap_metrics(?LINE),
|
|
|
?assertMatch(
|
|
|
#{
|
|
|
counters := #{
|
|
|
- matched := 2,
|
|
|
+ matched := 3,
|
|
|
%% the request with infinity timeout.
|
|
|
success := 1,
|
|
|
dropped := 0,
|
|
|
- late_reply := 1,
|
|
|
+ late_reply := 2,
|
|
|
retried := 0,
|
|
|
failed := 0
|
|
|
}
|
|
|
@@ -2042,7 +2060,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
|
|
|
#{name => test_resource},
|
|
|
#{
|
|
|
query_mode => async,
|
|
|
- batch_size => 2,
|
|
|
+ batch_size => 3,
|
|
|
batch_time => 100,
|
|
|
worker_pool_size => 1,
|
|
|
resume_interval => ResumeInterval
|
|
|
@@ -2067,6 +2085,10 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
|
|
|
ok,
|
|
|
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
|
|
|
),
|
|
|
+ ?assertEqual(
|
|
|
+ ok,
|
|
|
+ emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS})
|
|
|
+ ),
|
|
|
Pid0 =
|
|
|
spawn_link(fun() ->
|
|
|
?tp(delay_enter, #{}),
|
|
|
@@ -2087,7 +2109,10 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
|
|
|
?assertMatch(
|
|
|
[
|
|
|
#{
|
|
|
- expired := [{query, _, {inc_counter, 199}, _, _}]
|
|
|
+ expired := [
|
|
|
+ {query, _, {inc_counter, 199}, _, _},
|
|
|
+ {query, _, {inc_counter, 299}, _, _}
|
|
|
+ ]
|
|
|
}
|
|
|
],
|
|
|
?of_kind(handle_async_reply_expired, Trace)
|
|
|
@@ -2096,10 +2121,10 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
|
|
|
?assertMatch(
|
|
|
#{
|
|
|
counters := #{
|
|
|
- matched := 1,
|
|
|
+ matched := 2,
|
|
|
success := 0,
|
|
|
dropped := 0,
|
|
|
- late_reply := 1,
|
|
|
+ late_reply := 2,
|
|
|
retried := 0,
|
|
|
failed := 0
|
|
|
},
|