Selaa lähdekoodia

fix(influxdb): check authentication

Checks authentication on bridge start and get status. Also, handle
authentication error when sending message.

Fixes https://emqx.atlassian.net/browse/EMQX-10213
Paulo Zulato 2 vuotta sitten
vanhempi
commit
a573b9b38c

+ 1 - 1
apps/emqx_bridge_influxdb/rebar.config

@@ -1,7 +1,7 @@
 {erl_opts, [debug_info]}.
 
 {deps, [
-    {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
+    {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.10"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 39 - 12
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl

@@ -150,7 +150,7 @@ on_batch_query_async(
     end.
 
 on_get_status(_InstId, #{client := Client}) ->
-    case influxdb:is_alive(Client) of
+    case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
         true ->
             connected;
         false ->
@@ -262,17 +262,32 @@ do_start_client(
         {ok, Client} ->
             case influxdb:is_alive(Client, true) of
                 true ->
-                    State = #{
-                        client => Client,
-                        write_syntax => to_config(Lines, Precision)
-                    },
-                    ?SLOG(info, #{
-                        msg => "starting influxdb connector success",
-                        connector => InstId,
-                        client => redact_auth(Client),
-                        state => redact_auth(State)
-                    }),
-                    {ok, State};
+                    case influxdb:check_auth(Client) of
+                        ok ->
+                            State = #{
+                                client => Client,
+                                write_syntax => to_config(Lines, Precision)
+                            },
+                            ?SLOG(info, #{
+                                msg => "starting influxdb connector success",
+                                connector => InstId,
+                                client => redact_auth(Client),
+                                state => redact_auth(State)
+                            }),
+                            {ok, State};
+                        Error ->
+                            ?tp(influxdb_connector_start_failed, #{error => auth_error}),
+                            ?SLOG(warning, #{
+                                msg => "failed_to_start_influxdb_connector",
+                                error => Error,
+                                connector => InstId,
+                                client => redact_auth(Client),
+                                reason => auth_error
+                            }),
+                            %% no leak
+                            _ = influxdb:stop_client(Client),
+                            {error, influxdb_client_auth_error}
+                    end;
                 {false, Reason} ->
                     ?tp(influxdb_connector_start_failed, #{
                         error => influxdb_client_not_alive, reason => Reason
@@ -388,6 +403,14 @@ do_query(InstId, Client, Points) ->
                 connector => InstId,
                 points => Points
             });
+        {error, {401, _, _}} ->
+            ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
+            ?SLOG(error, #{
+                msg => "influxdb_authorization_failed",
+                client => redact_auth(Client),
+                connector => InstId
+            }),
+            {error, {unrecoverable_error, <<"authorization failure">>}};
         {error, Reason} = Err ->
             ?tp(influxdb_connector_do_query_failure, #{error => Reason}),
             ?SLOG(error, #{
@@ -421,6 +444,10 @@ reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
             Result = {error, {recoverable_error, Reason}},
             emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
     end;
+reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) ->
+    ?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
+    Result = {error, {unrecoverable_error, <<"authorization failure">>}},
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
 reply_callback(ReplyFunAndArgs, Result) ->
     emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
 

+ 128 - 0
apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl

@@ -1058,3 +1058,131 @@ t_missing_field(Config) ->
         end
     ),
     ok.
+
+t_authentication_error(Config0) ->
+    InfluxDBType = ?config(influxdb_type, Config0),
+    InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
+    InfluxConfig =
+        case InfluxDBType of
+            apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
+            apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
+        end,
+    Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                create_bridge(Config),
+                #{?snk_kind := influxdb_connector_start_failed},
+                10_000
+            )
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{error := auth_error} | _],
+                ?of_kind(influxdb_connector_start_failed, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_authentication_error_on_get_status(Config0) ->
+    ResourceId = resource_id(Config0),
+
+    % Fake initialization to simulate credential update after bridge was created.
+    emqx_common_test_helpers:with_mock(
+        influxdb,
+        check_auth,
+        fun(_) ->
+            ok
+        end,
+        fun() ->
+            InfluxDBType = ?config(influxdb_type, Config0),
+            InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
+            InfluxConfig =
+                case InfluxDBType of
+                    apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
+                    apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
+                end,
+            Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
+            {ok, _} = create_bridge(Config),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 10,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            )
+        end
+    ),
+
+    % Now back to wrong credentials
+    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
+    ok.
+
+t_authentication_error_on_send_message(Config0) ->
+    ResourceId = resource_id(Config0),
+    QueryMode = proplists:get_value(query_mode, Config0, sync),
+    InfluxDBType = ?config(influxdb_type, Config0),
+    InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
+    InfluxConfig =
+        case InfluxDBType of
+            apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
+            apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
+        end,
+    Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
+
+    % Fake initialization to simulate credential update after bridge was created.
+    emqx_common_test_helpers:with_mock(
+        influxdb,
+        check_auth,
+        fun(_) ->
+            ok
+        end,
+        fun() ->
+            {ok, _} = create_bridge(Config),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 10,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            )
+        end
+    ),
+
+    % Now back to wrong credentials
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        int_key => -123,
+        bool => true,
+        float_key => 24.5,
+        uint_key => 123
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"timestamp">> => erlang:system_time(millisecond),
+        <<"payload">> => Payload
+    },
+    case QueryMode of
+        sync ->
+            ?assertMatch(
+                {error, {unrecoverable_error, <<"authorization failure">>}},
+                send_message(Config, SentData)
+            );
+        async ->
+            ?check_trace(
+                begin
+                    ?wait_async_action(
+                        ?assertEqual(ok, send_message(Config, SentData)),
+                        #{?snk_kind := handle_async_reply},
+                        1_000
+                    )
+                end,
+                fun(Trace) ->
+                    ?assertMatch(
+                        [#{error := <<"authorization failure">>} | _],
+                        ?of_kind(influxdb_connector_do_query_failure, Trace)
+                    ),
+                    ok
+                end
+            )
+    end,
+    ok.

+ 1 - 0
changes/ee/fix-11031.en.md

@@ -0,0 +1 @@
+Fixed credential validation when creating bridge and checking status for InfluxDB Bridges.

+ 1 - 1
mix.exs

@@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
-      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.9", override: true},
+      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.10", override: true},
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},