|
|
@@ -1494,10 +1494,11 @@ t_pull_worker_death(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_async_worker_death_mid_pull(Config) ->
|
|
|
- ct:timetrap({seconds, 120}),
|
|
|
+ ct:timetrap({seconds, 122}),
|
|
|
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
|
|
|
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
?check_trace(
|
|
|
+ #{timetrap => 120_000},
|
|
|
begin
|
|
|
start_and_subscribe_mqtt(Config),
|
|
|
|
|
|
@@ -1513,18 +1514,22 @@ t_async_worker_death_mid_pull(Config) ->
|
|
|
#{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator}
|
|
|
),
|
|
|
spawn_link(fun() ->
|
|
|
+ ct:pal("will kill async worker"),
|
|
|
?tp_span(
|
|
|
kill_async_worker,
|
|
|
#{},
|
|
|
begin
|
|
|
%% produce a message while worker is being killed
|
|
|
Messages = [#{<<"data">> => Payload}],
|
|
|
+ ct:pal("publishing message"),
|
|
|
pubsub_publish(Config, PubSubTopic, Messages),
|
|
|
+ ct:pal("published message"),
|
|
|
|
|
|
AsyncWorkerPids = get_async_worker_pids(Config),
|
|
|
emqx_utils:pmap(
|
|
|
fun(AsyncWorkerPid) ->
|
|
|
Ref = monitor(process, AsyncWorkerPid),
|
|
|
+ ct:pal("killing pid ~p", [AsyncWorkerPid]),
|
|
|
sys:terminate(AsyncWorkerPid, die),
|
|
|
receive
|
|
|
{'DOWN', Ref, process, AsyncWorkerPid, _} ->
|
|
|
@@ -1538,7 +1543,8 @@ t_async_worker_death_mid_pull(Config) ->
|
|
|
|
|
|
ok
|
|
|
end
|
|
|
- )
|
|
|
+ ),
|
|
|
+ ct:pal("killed async worker")
|
|
|
end),
|
|
|
|
|
|
?assertMatch(
|