|
@@ -258,13 +258,18 @@ query_resource(Config, Request) ->
|
|
|
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
|
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
|
|
|
|
|
|
|
query_resource_async(Config, Request) ->
|
|
query_resource_async(Config, Request) ->
|
|
|
|
|
+ query_resource_async(Config, Request, _Opts = #{}).
|
|
|
|
|
+
|
|
|
|
|
+query_resource_async(Config, Request, Opts) ->
|
|
|
Name = ?config(pgsql_name, Config),
|
|
Name = ?config(pgsql_name, Config),
|
|
|
BridgeType = ?config(pgsql_bridge_type, Config),
|
|
BridgeType = ?config(pgsql_bridge_type, Config),
|
|
|
Ref = alias([reply]),
|
|
Ref = alias([reply]),
|
|
|
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
|
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
|
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
|
|
|
|
+ Timeout = maps:get(timeout, Opts, 500),
|
|
|
Return = emqx_resource:query(ResourceID, Request, #{
|
|
Return = emqx_resource:query(ResourceID, Request, #{
|
|
|
- timeout => 500, async_reply_fun => {AsyncReplyFun, []}
|
|
|
|
|
|
|
+ timeout => Timeout,
|
|
|
|
|
+ async_reply_fun => {AsyncReplyFun, []}
|
|
|
}),
|
|
}),
|
|
|
{Return, Ref}.
|
|
{Return, Ref}.
|
|
|
|
|
|
|
@@ -498,9 +503,9 @@ t_write_timeout(Config) ->
|
|
|
Config,
|
|
Config,
|
|
|
#{
|
|
#{
|
|
|
<<"resource_opts">> => #{
|
|
<<"resource_opts">> => #{
|
|
|
- <<"request_timeout">> => 500,
|
|
|
|
|
- <<"resume_interval">> => 100,
|
|
|
|
|
- <<"health_check_interval">> => 100
|
|
|
|
|
|
|
+ <<"auto_restart_interval">> => <<"100ms">>,
|
|
|
|
|
+ <<"resume_interval">> => <<"100ms">>,
|
|
|
|
|
+ <<"health_check_interval">> => <<"100ms">>
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
),
|
|
),
|
|
@@ -515,7 +520,7 @@ t_write_timeout(Config) ->
|
|
|
Res1 =
|
|
Res1 =
|
|
|
case QueryMode of
|
|
case QueryMode of
|
|
|
async ->
|
|
async ->
|
|
|
- query_resource_async(Config, {send_message, SentData});
|
|
|
|
|
|
|
+ query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
|
|
|
sync ->
|
|
sync ->
|
|
|
query_resource(Config, {send_message, SentData})
|
|
query_resource(Config, {send_message, SentData})
|
|
|
end,
|
|
end,
|
|
@@ -526,7 +531,17 @@ t_write_timeout(Config) ->
|
|
|
{_, Ref} when is_reference(Ref) ->
|
|
{_, Ref} when is_reference(Ref) ->
|
|
|
case receive_result(Ref, 15_000) of
|
|
case receive_result(Ref, 15_000) of
|
|
|
{ok, Res} ->
|
|
{ok, Res} ->
|
|
|
- ?assertMatch({error, {unrecoverable_error, _}}, Res);
|
|
|
|
|
|
|
+ %% we may receive a successful result depending on
|
|
|
|
|
+ %% timing, if the request is retried after the
|
|
|
|
|
+ %% failure is healed.
|
|
|
|
|
+ case Res of
|
|
|
|
|
+ {error, {unrecoverable_error, _}} ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+ {ok, _} ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ ct:fail("unexpected result: ~p", [Res])
|
|
|
|
|
+ end;
|
|
|
timeout ->
|
|
timeout ->
|
|
|
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
|
|
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
|
|
|
ct:fail("no response received")
|
|
ct:fail("no response received")
|