|
|
@@ -208,6 +208,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
|
|
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
|
|
|
RequestTimeout = maps:get(request_timeout, Config, "10s"),
|
|
|
ResumeInterval = maps:get(resume_interval, Config, "1s"),
|
|
|
+ HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"),
|
|
|
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
|
|
|
LocalTopic =
|
|
|
case maps:find(local_topic, Config) of
|
|
|
@@ -232,7 +233,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
|
|
" body = \"${id}\"\n"
|
|
|
" resource_opts {\n"
|
|
|
" inflight_window = 100\n"
|
|
|
- " health_check_interval = \"200ms\"\n"
|
|
|
+ " health_check_interval = \"~s\"\n"
|
|
|
" max_buffer_bytes = \"1GB\"\n"
|
|
|
" query_mode = \"~s\"\n"
|
|
|
" request_ttl = \"~p\"\n"
|
|
|
@@ -254,6 +255,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
|
|
LocalTopic,
|
|
|
PoolSize,
|
|
|
RequestTimeout,
|
|
|
+ HealthCheckInterval,
|
|
|
QueryMode,
|
|
|
ResourceRequestTTL,
|
|
|
ResumeInterval
|
|
|
@@ -350,19 +352,27 @@ t_send_async_connection_timeout(Config) ->
|
|
|
port => Port,
|
|
|
pool_size => 1,
|
|
|
query_mode => "async",
|
|
|
- connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s",
|
|
|
+ connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms",
|
|
|
request_timeout => "10s",
|
|
|
+ resume_interval => "200ms",
|
|
|
+ health_check_interval => "200ms",
|
|
|
resource_request_ttl => "infinity"
|
|
|
}),
|
|
|
+ ResourceId = emqx_bridge_resource:resource_id(BridgeID),
|
|
|
+ ?retry(
|
|
|
+ _Interval0 = 200,
|
|
|
+ _NAttempts0 = 20,
|
|
|
+ ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
+ ),
|
|
|
NumberOfMessagesToSend = 10,
|
|
|
[
|
|
|
emqx_bridge:send_message(BridgeID, #{<<"id">> => Id})
|
|
|
|| Id <- lists:seq(1, NumberOfMessagesToSend)
|
|
|
],
|
|
|
- %% Make sure server recive all messages
|
|
|
+ %% Make sure server receives all messages
|
|
|
ct:pal("Sent messages\n"),
|
|
|
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
|
|
|
- receive_request_notifications(MessageIDs, ResponseDelayMS),
|
|
|
+ receive_request_notifications(MessageIDs, ResponseDelayMS, []),
|
|
|
ok.
|
|
|
|
|
|
t_async_free_retries(Config) ->
|
|
|
@@ -569,15 +579,16 @@ do_t_async_retries(TestContext, Error, Fn) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
-receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
|
|
|
+receive_request_notifications(MessageIDs, _ResponseDelay, _Acc) when map_size(MessageIDs) =:= 0 ->
|
|
|
ok;
|
|
|
-receive_request_notifications(MessageIDs, ResponseDelay) ->
|
|
|
+receive_request_notifications(MessageIDs, ResponseDelay, Acc) ->
|
|
|
receive
|
|
|
{http_server, received, Req} ->
|
|
|
RemainingMessageIDs = remove_message_id(MessageIDs, Req),
|
|
|
- receive_request_notifications(RemainingMessageIDs, ResponseDelay)
|
|
|
+ receive_request_notifications(RemainingMessageIDs, ResponseDelay, [Req | Acc])
|
|
|
after (30 * 1000) ->
|
|
|
- ct:pal("Waited to long time but did not get any message\n"),
|
|
|
+ ct:pal("Waited a long time but did not get any message"),
|
|
|
+ ct:pal("Messages received so far:\n ~p", [Acc]),
|
|
|
ct:fail("All requests did not reach server at least once")
|
|
|
end.
|
|
|
|