Sfoglia il codice sorgente

Merge pull request #11501 from thalesmg/gcp-consu-err401-r52-20230823

fix(gcp_consumer): handle 401 errors
Thales Macedo Garitezi 2 anni fa
parent
commit
4ac0972979

+ 15 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -218,6 +218,8 @@ handle_continue(?ensure_subscription, State0) ->
         not_found ->
             %% there's nothing much to do if the topic suddenly doesn't exist anymore.
             {stop, {error, topic_not_found}, State0};
+        bad_credentials ->
+            {stop, {error, bad_credentials}, State0};
         permission_denied ->
             {stop, {error, permission_denied}, State0}
     end;
@@ -295,6 +297,7 @@ handle_info(Msg, State0) ->
 
 terminate({error, Reason}, State) when
     Reason =:= topic_not_found;
+    Reason =:= bad_credentials;
     Reason =:= permission_denied
 ->
     #{
@@ -335,7 +338,7 @@ ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) ->
     State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}.
 
 -spec ensure_subscription_exists(state()) ->
-    continue | retry | not_found | permission_denied | already_exists.
+    continue | retry | not_found | permission_denied | bad_credentials | already_exists.
 ensure_subscription_exists(State) ->
     ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
     #{
@@ -384,6 +387,17 @@ ensure_subscription_exists(State) ->
                 }
             ),
             permission_denied;
+        {error, #{status_code := 401}} ->
+            %% bad credentials
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_bad_credentials",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic
+                }
+            ),
+            bad_credentials;
         {ok, #{status_code := 200}} ->
             ?tp(
                 debug,

+ 21 - 2
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -94,6 +94,8 @@ on_get_status(InstanceId, State) ->
             {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
         {error, permission_denied} ->
             {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}};
+        {error, bad_credentials} ->
+            {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}};
         ok ->
             #{client := Client} = State,
             check_workers(InstanceId, Client)
@@ -103,7 +105,12 @@ on_get_status(InstanceId, State) ->
 %% Health check API (signalled by consumer worker)
 %%-------------------------------------------------------------------------------------------------
 
--spec mark_as_unhealthy(resource_id(), topic_not_found | permission_denied) -> ok.
+-spec mark_as_unhealthy(
+    resource_id(),
+    topic_not_found
+    | permission_denied
+    | bad_credentials
+) -> ok.
 mark_as_unhealthy(InstanceId, Reason) ->
     optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason),
     ok.
@@ -114,7 +121,12 @@ clear_unhealthy(InstanceId) ->
     ?tp(gcp_pubsub_consumer_clear_unhealthy, #{}),
     ok.
 
--spec check_if_unhealthy(resource_id()) -> ok | {error, topic_not_found | permission_denied}.
+-spec check_if_unhealthy(resource_id()) ->
+    ok
+    | {error,
+        topic_not_found
+        | permission_denied
+        | bad_credentials}.
 check_if_unhealthy(InstanceId) ->
     case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of
         {ok, Reason} ->
@@ -164,6 +176,11 @@ start_consumers(InstanceId, Client, Config) ->
             throw(
                 {unhealthy_target, ?PERMISSION_MESSAGE}
             );
+        {error, bad_credentials} ->
+            _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
+            throw(
+                {unhealthy_target, ?PERMISSION_MESSAGE}
+            );
         {error, _} ->
             %% connection might be down; we'll have to check topic existence during health
             %% check, or the workers will kill themselves when they realized there's no
@@ -242,6 +259,8 @@ check_for_topic_existence(Topic, Client) ->
             {error, not_found};
         {error, #{status_code := 403}} ->
             {error, permission_denied};
+        {error, #{status_code := 401}} ->
+            {error, bad_credentials};
         {error, Reason} ->
             ?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}),
             {error, Reason}

+ 130 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -818,6 +818,61 @@ permission_denied_response() ->
         )
     }}.
 
+unauthenticated_response() ->
+    Msg = <<
+        "Request had invalid authentication credentials. Expected OAuth 2 access token,"
+        " login cookie or other valid authentication credential. "
+        "See https://developers.google.com/identity/sign-in/web/devconsole-project."
+    >>,
+    {error, #{
+        body =>
+            #{
+                <<"error">> =>
+                    #{
+                        <<"code">> => 401,
+                        <<"details">> =>
+                            [
+                                #{
+                                    <<"@type">> =>
+                                        <<"type.googleapis.com/google.rpc.ErrorInfo">>,
+                                    <<"domain">> => <<"googleapis.com">>,
+                                    <<"metadata">> =>
+                                        #{
+                                            <<"email">> =>
+                                                <<"test-516@emqx-cloud-pubsub.iam.gserviceaccount.com">>,
+                                            <<"method">> =>
+                                                <<"google.pubsub.v1.Publisher.CreateTopic">>,
+                                            <<"service">> =>
+                                                <<"pubsub.googleapis.com">>
+                                        },
+                                    <<"reason">> => <<"ACCOUNT_STATE_INVALID">>
+                                }
+                            ],
+                        <<"message">> => Msg,
+
+                        <<"status">> => <<"UNAUTHENTICATED">>
+                    }
+            },
+        headers =>
+            [
+                {<<"www-authenticate">>, <<"Bearer realm=\"https://accounts.google.com/\"">>},
+                {<<"vary">>, <<"X-Origin">>},
+                {<<"vary">>, <<"Referer">>},
+                {<<"content-type">>, <<"application/json; charset=UTF-8">>},
+                {<<"date">>, <<"Wed, 23 Aug 2023 12:41:40 GMT">>},
+                {<<"server">>, <<"ESF">>},
+                {<<"cache-control">>, <<"private">>},
+                {<<"x-xss-protection">>, <<"0">>},
+                {<<"x-frame-options">>, <<"SAMEORIGIN">>},
+                {<<"x-content-type-options">>, <<"nosniff">>},
+                {<<"alt-svc">>, <<"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000">>},
+                {<<"accept-ranges">>, <<"none">>},
+                {<<"vary">>, <<"Origin,Accept-Encoding">>},
+                {<<"transfer-encoding">>, <<"chunked">>}
+            ],
+        status_code => 401
+    }}.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -2125,6 +2180,81 @@ t_permission_denied_worker(Config) ->
     ),
     ok.
 
+t_unauthenticated_topic_check(Config) ->
+    [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            %% the emulator does not check any credentials
+            emqx_common_test_helpers:with_mock(
+                emqx_bridge_gcp_pubsub_client,
+                query_sync,
+                fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) ->
+                    RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
+                    case {Method =:= get, re:run(Path, RE)} of
+                        {true, {match, _}} ->
+                            unauthenticated_response();
+                        _ ->
+                            meck:passthrough([PreparedRequest, Client])
+                    end
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            create_bridge(Config),
+                            #{?snk_kind := gcp_pubsub_stop},
+                            5_000
+                        ),
+                    ?assertMatch(
+                        {ok, disconnected},
+                        emqx_resource_manager:health_check(ResourceId)
+                    ),
+                    ?assertMatch(
+                        {ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}},
+                        emqx_resource_manager:lookup_cached(ResourceId)
+                    ),
+                    ok
+                end
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_unauthenticated_worker(Config) ->
+    ?check_trace(
+        begin
+            emqx_common_test_helpers:with_mock(
+                emqx_bridge_gcp_pubsub_client,
+                query_sync,
+                fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) ->
+                    case Method =:= put of
+                        true ->
+                            unauthenticated_response();
+                        false ->
+                            meck:passthrough([PreparedRequest, Client])
+                    end
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            create_bridge(
+                                Config
+                            ),
+                            #{?snk_kind := gcp_pubsub_consumer_worker_terminate},
+                            10_000
+                        ),
+
+                    ok
+                end
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_cluster_subscription(Config) ->
     [
         #{