|
|
@@ -33,6 +33,7 @@
|
|
|
-define(WAREHOUSE, <<"testwarehouse">>).
|
|
|
-define(PIPE, <<"testpipe0">>).
|
|
|
-define(PIPE_USER, <<"snowpipeuser">>).
|
|
|
+-define(PIPE_USER_RO, <<"snowpipe_ro_user">>).
|
|
|
|
|
|
-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])).
|
|
|
-define(CONF_COLUMN_ORDER(T), [
|
|
|
@@ -147,7 +148,7 @@ end_per_testcase(_Testcase, Config) ->
|
|
|
timetrap(Config) ->
|
|
|
case ?config(mock, Config) of
|
|
|
true ->
|
|
|
- {seconds, 10};
|
|
|
+ {seconds, 20};
|
|
|
false ->
|
|
|
{seconds, 150}
|
|
|
end.
|
|
|
@@ -1050,28 +1051,42 @@ t_aggreg_invalid_column_values(Config0) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
-t_aggreg_inexistent_database(init, Config) when is_list(Config) ->
|
|
|
- t_aggreg_inexistent_database(init, maps:from_list(Config));
|
|
|
-t_aggreg_inexistent_database(init, #{mock := true} = Config) ->
|
|
|
+%% Checks that we enqueue aggregated buffer errors if the delivery fails, and that
|
|
|
+%% reflects on the action status.
|
|
|
+t_aggreg_failed_delivery(init, Config) when is_list(Config) ->
|
|
|
+ t_aggreg_failed_delivery(init, maps:from_list(Config));
|
|
|
+t_aggreg_failed_delivery(init, #{mock := true} = Config) ->
|
|
|
Mod = ?CONN_MOD,
|
|
|
- meck:expect(Mod, do_stage_file, fun(
|
|
|
- _ConnPid, _Filename, _Database, _Schema, _Stage, _ActionName
|
|
|
+ meck:expect(Mod, do_insert_files_request, fun(
|
|
|
+ _HTTPPool, _Req, _RequestTTL, _MaxRetries
|
|
|
) ->
|
|
|
- Msg =
|
|
|
- "SQL compilation error:, Database 'INEXISTENT' does not"
|
|
|
- " exist or not authorized. SQLSTATE IS: 02000",
|
|
|
- {error, Msg}
|
|
|
+ Headers = [
|
|
|
+ {<<"content-type">>, <<"application/json">>},
|
|
|
+ {<<"date">>, <<"Wed, 09 Oct 2024 13:12:55 GMT">>},
|
|
|
+ {<<"strict-transport-security">>, <<"max-age=31536000">>},
|
|
|
+ {<<"x-content-type-options">>, <<"nosniff">>},
|
|
|
+ {<<"x-frame-options">>, <<"deny">>},
|
|
|
+ {<<"content-length">>, <<"175">>},
|
|
|
+ {<<"connection">>, <<"keep-alive">>}
|
|
|
+ ],
|
|
|
+ Body = <<
|
|
|
+ "{\n \"data\" : null,\n \"code\" : \"390403\",\n "
|
|
|
+ "\"message\" : \"Not authorized to manage the specified object. "
|
|
|
+ "Pipe access permission denied\",\n \"success\" : false,\n "
|
|
|
+ "\"headers\" : null\n}"
|
|
|
+ >>,
|
|
|
+ {ok, 403, Headers, Body}
|
|
|
end),
|
|
|
maps:to_list(Config);
|
|
|
-t_aggreg_inexistent_database(init, #{} = Config) ->
|
|
|
+t_aggreg_failed_delivery(init, #{} = Config) ->
|
|
|
maps:to_list(Config).
|
|
|
-t_aggreg_inexistent_database(Config) ->
|
|
|
+t_aggreg_failed_delivery(Config) ->
|
|
|
?check_trace(
|
|
|
emqx_bridge_v2_testlib:snk_timetrap(),
|
|
|
begin
|
|
|
{ok, _} = emqx_bridge_v2_testlib:create_bridge_api(
|
|
|
Config,
|
|
|
- #{<<"parameters">> => #{<<"database">> => <<"inexistent">>}}
|
|
|
+ #{<<"parameters">> => #{<<"pipe_user">> => ?PIPE_USER_RO}}
|
|
|
),
|
|
|
ActionResId = emqx_bridge_v2_testlib:bridge_id(Config),
|
|
|
%% BeginMark = get_begin_mark(Config, ActionResId),
|
|
|
@@ -1186,4 +1201,3 @@ t_wrong_snowpipe_user(Config) ->
|
|
|
%% + Not supported when using pipes: `ABORT_STATEMENT'
|
|
|
%% - Missing data for a required column
|
|
|
%% * Transient failure when staging file
|
|
|
-%% * Transient failure when calling `insertFiles'
|