Selaa lähdekoodia

feat(snowflake): mark action as unhealthy when upload fails

Fixes https://emqx.atlassian.net/browse/EMQX-13112

This makes the Snowflake action behave like S3 and Azure Blob Storage actions in
aggregated mode.

Currently, failure metric bumping is not supported when using `emqx_connector_aggregator`.
Thales Macedo Garitezi 1 vuosi sitten
vanhempi
commit
706bbf7ed5

+ 16 - 0
apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl

@@ -392,6 +392,7 @@ stage_file(ODBCPool, Filename, Database, Schema, Stage, ActionName) ->
     {ok, file:filename()} | {error, term()}.
 do_stage_file(ConnPid, Filename, Database, Schema, Stage, ActionName) ->
     SQL = stage_file_sql(Filename, Database, Schema, Stage, ActionName),
+    ?tp(debug, "snowflake_stage_file", #{sql => SQL, action => ActionName}),
     %% Should we also check if it actually succeeded by inspecting reportFiles?
     odbc:sql_query(ConnPid, SQL).
 
@@ -607,6 +608,7 @@ process_complete(TransferState0) ->
             {ok, 200, _, Body} ->
                 {ok, emqx_utils_json:decode(Body, [return_maps])};
             Res ->
+                ?tp("snowflake_insert_files_request_failed", #{response => Res}),
                 %% TODO: retry?
                 exit({insert_failed, Res})
         end
@@ -915,6 +917,7 @@ action_status(ActionResId, #{mode := aggregated} = ActionState) ->
     %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
     Timestamp = erlang:system_time(second),
     ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
+    ok = check_aggreg_upload_errors(AggregId),
     case http_pool_workers_healthy(ActionResId, ConnectTimeout) of
         true ->
             ?status_connected;
@@ -978,6 +981,19 @@ maybe_quote(Identifier) ->
             Identifier
     end.
 
+check_aggreg_upload_errors(AggregId) ->
+    case emqx_connector_aggregator:take_error(AggregId) of
+        [Error] ->
+            ?tp("snowflake_check_aggreg_upload_error_found", #{error => Error}),
+            %% TODO: revisit this approach.
+            %% Each health check takes a single error, so, for example, 3 upload
+            %% failures will mark the channel as unhealthy for 3 consecutive health checks.
+            ErrorMessage = emqx_utils:format(Error),
+            throw({unhealthy_target, ErrorMessage});
+        [] ->
+            ok
+    end.
+
 %%------------------------------------------------------------------------------
 %% Tests
 %%------------------------------------------------------------------------------

+ 89 - 9
apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl

@@ -462,26 +462,36 @@ get_begin_mark(#{mock := false}, ActionResId) ->
         emqx_bridge_snowflake_connector:insert_report(ActionResId, #{}),
     BeginMark.
 
-wait_until_processed(Config, ActionResId, BeginMark) when is_list(Config) ->
-    wait_until_processed(maps:from_list(Config), ActionResId, BeginMark);
-wait_until_processed(#{mock := true} = Config, _ActionResId, _BeginMark) ->
-    {ok, _} = ?block_until(#{?snk_kind := "mock_snowflake_insert_file_request"}),
+wait_until_processed(Config, ActionResId, BeginMark) ->
+    wait_until_processed(Config, ActionResId, BeginMark, _ExpectedNumFiles = 1).
+
+wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles) when is_list(Config) ->
+    wait_until_processed(maps:from_list(Config), ActionResId, BeginMark, ExpectedNumFiles);
+wait_until_processed(#{mock := true} = Config, _ActionResId, _BeginMark, ExpectedNumFiles) ->
+    snabbkaffe:block_until(
+        ?match_n_events(
+            ExpectedNumFiles,
+            #{?snk_kind := "mock_snowflake_insert_file_request"}
+        ),
+        _Timeout = infinity,
+        _BackInTIme = infinity
+    ),
     InsertRes = maps:get(mocked_insert_report, Config, #{}),
     {ok, InsertRes};
-wait_until_processed(#{mock := false} = Config, ActionResId, BeginMark) ->
+wait_until_processed(#{mock := false} = Config, ActionResId, BeginMark, ExpectedNumFiles) ->
     {ok, Res} =
         emqx_bridge_snowflake_connector:insert_report(ActionResId, #{begin_mark => BeginMark}),
     ct:pal("insert report (begin mark ~s):\n  ~p", [BeginMark, Res]),
     case Res of
         #{
-            <<"files">> := [_ | _],
+            <<"files">> := Files,
             <<"statistics">> := #{<<"activeFilesCount">> := 0}
-        } ->
+        } when length(Files) >= ExpectedNumFiles ->
             ct:pal("insertReport response:\n  ~p", [Res]),
             {ok, Res};
         _ ->
             ct:sleep(2_000),
-            wait_until_processed(Config, ActionResId, BeginMark)
+            wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles)
     end.
 
 bin2hex(Bin) ->
@@ -564,7 +574,8 @@ t_aggreg_upload(Config) ->
                     #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}
                 ),
             %% Check the uploaded objects.
-            wait_until_processed(Config, ActionResId, BeginMark),
+            ExpectedNumFiles = 2,
+            wait_until_processed(Config, ActionResId, BeginMark, ExpectedNumFiles),
             Rows = get_all_rows(Config),
             [
                 P1Hex,
@@ -1032,6 +1043,75 @@ t_aggreg_invalid_column_values(Config0) ->
     ),
     ok.
 
+t_aggreg_inexistent_database(init, Config) when is_list(Config) ->
+    t_aggreg_inexistent_database(init, maps:from_list(Config));
+t_aggreg_inexistent_database(init, #{mock := true} = Config) ->
+    Mod = ?CONN_MOD,
+    meck:expect(Mod, do_stage_file, fun(
+        _ConnPid, _Filename, _Database, _Schema, _Stage, _ActionName
+    ) ->
+        Msg =
+            "SQL compilation error:, Database 'INEXISTENT' does not"
+            " exist or not authorized. SQLSTATE IS: 02000",
+        {error, Msg}
+    end),
+    maps:to_list(Config);
+t_aggreg_inexistent_database(init, #{} = Config) ->
+    maps:to_list(Config).
+t_aggreg_inexistent_database(Config) ->
+    ?check_trace(
+        emqx_bridge_v2_testlib:snk_timetrap(),
+        begin
+            {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(
+                Config,
+                #{<<"parameters">> => #{<<"database">> => <<"inexistent">>}}
+            ),
+            ActionResId = emqx_bridge_v2_testlib:bridge_id(Config),
+            %% BeginMark = get_begin_mark(Config, ActionResId),
+            {ok, _Rule} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(
+                    ?ACTION_TYPE_BIN, <<"">>, Config, #{
+                        sql => sql1()
+                    }
+                ),
+            Messages1 = lists:map(fun mk_message/1, [
+                {<<"C1">>, <<"sf/a/b/c">>, <<"{\"hello\":\"world\"}">>},
+                {<<"C2">>, <<"sf/foo/bar">>, <<"baz">>},
+                {<<"C3">>, <<"sf/t/42">>, <<"">>}
+            ]),
+            ok = publish_messages(Messages1),
+            %% Wait until the aggregated buffer delivery fails (staging the file errors out)
+            ct:pal("waiting for delivery to fail..."),
+            ?block_until(#{?snk_kind := "aggregated_buffer_delivery_failed"}),
+            %% When channel health check happens, we check aggregator for errors.
+            %% Current implementation will mark the action as unhealthy.
+            ct:pal("waiting for delivery failure to be noticed by health check..."),
+            ?block_until(#{?snk_kind := "snowflake_check_aggreg_upload_error_found"}),
+
+            ?retry(
+                _Sleep = 500,
+                _Retries = 10,
+                ?assertMatch(
+                    {200, #{
+                        <<"error">> :=
+                            <<"{unhealthy_target,", _/binary>>
+                    }},
+                    emqx_bridge_v2_testlib:simplify_result(
+                        emqx_bridge_v2_testlib:get_action_api(Config)
+                    )
+                )
+            ),
+
+            ?assertEqual(3, emqx_resource_metrics:matched_get(ActionResId)),
+            %% Currently, failure metrics are not bumped when aggregated uploads fail
+            ?assertEqual(0, emqx_resource_metrics:failed_get(ActionResId)),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %% Todo: test scenarios
 %% * User error in rule definition; e.g.:
 %%    - forgot to use `bin2hexstr' to encode the payload

+ 8 - 4
apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl

@@ -158,7 +158,7 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0})
             Delivery#delivery{transfer = Transfer};
         {error, Reason} ->
             %% Todo: handle more gracefully?  Retry?
-            error({transfer_failed, Reason})
+            exit({upload_failed, Reason})
     end.
 
 process_complete(#delivery{id = Id, empty = true}) ->
@@ -169,9 +169,13 @@ process_complete(#delivery{
 }) ->
     Trailer = emqx_connector_aggreg_csv:close(Container),
     Transfer = Mod:process_append(Trailer, Transfer0),
-    {ok, Completed} = Mod:process_complete(Transfer),
-    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
-    ok.
+    case Mod:process_complete(Transfer) of
+        {ok, Completed} ->
+            ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
+            ok;
+        {error, Error} ->
+            exit({upload_failed, Error})
+    end.
 
 %%
 

+ 1 - 2
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl

@@ -415,8 +415,7 @@ handle_delivery_exit(Buffer, {shutdown, {skipped, Reason}}, St = #st{name = Name
     ok = discard_buffer(Buffer),
     St;
 handle_delivery_exit(Buffer, Error, St = #st{name = Name}) ->
-    ?SLOG(error, #{
-        msg => "aggregated_buffer_delivery_failed",
+    ?tp(error, "aggregated_buffer_delivery_failed", #{
         action => Name,
         buffer => {Buffer#buffer.since, Buffer#buffer.seq},
         filename => Buffer#buffer.filename,