Sfoglia il codice sorgente

fix(gcp_consumer): handle 403 responses

Fixes https://emqx.atlassian.net/browse/EMQX-10736
Thales Macedo Garitezi 2 anni fa
parent
commit
ba956ebe88

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_gcp_pubsub, [
     {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl

@@ -205,7 +205,7 @@ get_topic(Topic, ConnectorState) ->
     Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
     Body = <<>>,
     PreparedRequest = {prepared_request, {Method, Path, Body}},
-    query_sync(PreparedRequest, ConnectorState).
+    ?MODULE:query_sync(PreparedRequest, ConnectorState).
 
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns

+ 22 - 5
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -217,7 +217,9 @@ handle_continue(?ensure_subscription, State0) ->
             {noreply, State0, {continue, ?ensure_subscription}};
         not_found ->
             %% there's nothing much to do if the topic suddenly doesn't exist anymore.
-            {stop, {error, topic_not_found}, State0}
+            {stop, {error, topic_not_found}, State0};
+        permission_denied ->
+            {stop, {error, permission_denied}, State0}
     end;
 handle_continue(?patch_subscription, State0) ->
     ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}),
@@ -291,14 +293,17 @@ handle_info(Msg, State0) ->
     }),
     {noreply, State0}.
 
-terminate({error, topic_not_found} = _Reason, State) ->
+terminate({error, Reason}, State) when
+    Reason =:= topic_not_found;
+    Reason =:= permission_denied
+->
     #{
         instance_id := InstanceId,
         topic := _Topic
     } = State,
     optvar:unset(?OPTVAR_SUB_OK(self())),
-    emqx_bridge_gcp_pubsub_impl_consumer:mark_topic_as_nonexistent(InstanceId),
-    ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => _Topic}),
+    emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(InstanceId, Reason),
+    ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}),
     ok;
 terminate(_Reason, _State) ->
     optvar:unset(?OPTVAR_SUB_OK(self())),
@@ -329,7 +334,8 @@ ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) ->
 ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) ->
     State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}.
 
--spec ensure_subscription_exists(state()) -> continue | retry | not_found | already_exists.
+-spec ensure_subscription_exists(state()) ->
+    continue | retry | not_found | permission_denied | already_exists.
 ensure_subscription_exists(State) ->
     ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
     #{
@@ -367,6 +373,17 @@ ensure_subscription_exists(State) ->
                 }
             ),
             not_found;
+        {error, #{status_code := 403}} ->
+            %% permission denied
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_permission_denied",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic
+                }
+            ),
+            permission_denied;
         {ok, #{status_code := 200}} ->
             ?tp(
                 debug,

+ 35 - 22
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -17,9 +17,9 @@
 
 %% health check API
 -export([
-    mark_topic_as_nonexistent/1,
-    unset_nonexistent_topic/1,
-    is_nonexistent_topic/1
+    mark_as_unhealthy/2,
+    clear_unhealthy/1,
+    check_if_unhealthy/1
 ]).
 
 -include_lib("emqx/include/logger.hrl").
@@ -47,11 +47,15 @@
 
 -define(AUTO_RECONNECT_S, 2).
 -define(DEFAULT_FORGET_INTERVAL, timer:seconds(60)).
--define(OPTVAR_TOPIC_NOT_FOUND(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}).
+-define(OPTVAR_UNHEALTHY(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}).
 -define(TOPIC_MESSAGE,
     "GCP PubSub topics are invalid.  Please check the logs, check if the "
     "topics exist in GCP and if the service account has permissions to use them."
 ).
+-define(PERMISSION_MESSAGE,
+    "Permission denied while verifying topic existence.  Please check that the "
+    "provided service account has the correct permissions configured."
+).
 
 %%-------------------------------------------------------------------------------------------------
 %% `emqx_resource' API
@@ -77,7 +81,7 @@ on_start(InstanceId, Config0) ->
 -spec on_stop(resource_id(), state()) -> ok | {error, term()}.
 on_stop(InstanceId, _State) ->
     ?tp(gcp_pubsub_consumer_stop_enter, #{}),
-    unset_nonexistent_topic(InstanceId),
+    clear_unhealthy(InstanceId),
     ok = stop_consumers(InstanceId),
     emqx_bridge_gcp_pubsub_client:stop(InstanceId).
 
@@ -85,10 +89,12 @@ on_stop(InstanceId, _State) ->
 on_get_status(InstanceId, State) ->
     %% We need to check this flag separately because the workers might be gone when we
     %% check them.
-    case is_nonexistent_topic(InstanceId) of
-        true ->
+    case check_if_unhealthy(InstanceId) of
+        {error, topic_not_found} ->
             {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
-        false ->
+        {error, permission_denied} ->
+            {disconnected, State, {unhealthy_target, ?PERMISSION_MESSAGE}};
+        ok ->
             #{client := Client} = State,
             check_workers(InstanceId, Client)
     end.
@@ -97,24 +103,24 @@ on_get_status(InstanceId, State) ->
 %% Health check API (signalled by consumer worker)
 %%-------------------------------------------------------------------------------------------------
 
--spec mark_topic_as_nonexistent(resource_id()) -> ok.
-mark_topic_as_nonexistent(InstanceId) ->
-    optvar:set(?OPTVAR_TOPIC_NOT_FOUND(InstanceId), true),
+-spec mark_as_unhealthy(resource_id(), topic_not_found | permission_denied) -> ok.
+mark_as_unhealthy(InstanceId, Reason) ->
+    optvar:set(?OPTVAR_UNHEALTHY(InstanceId), Reason),
     ok.
 
--spec unset_nonexistent_topic(resource_id()) -> ok.
-unset_nonexistent_topic(InstanceId) ->
-    optvar:unset(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)),
-    ?tp(gcp_pubsub_consumer_unset_nonexistent_topic, #{}),
+-spec clear_unhealthy(resource_id()) -> ok.
+clear_unhealthy(InstanceId) ->
+    optvar:unset(?OPTVAR_UNHEALTHY(InstanceId)),
+    ?tp(gcp_pubsub_consumer_clear_unhealthy, #{}),
     ok.
 
--spec is_nonexistent_topic(resource_id()) -> boolean().
-is_nonexistent_topic(InstanceId) ->
-    case optvar:peek(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)) of
-        {ok, true} ->
-            true;
-        _ ->
-            false
+-spec check_if_unhealthy(resource_id()) -> ok | {error, topic_not_found | permission_denied}.
+check_if_unhealthy(InstanceId) ->
+    case optvar:peek(?OPTVAR_UNHEALTHY(InstanceId)) of
+        {ok, Reason} ->
+            {error, Reason};
+        undefined ->
+            ok
     end.
 
 %%-------------------------------------------------------------------------------------------------
@@ -153,6 +159,11 @@ start_consumers(InstanceId, Client, Config) ->
             throw(
                 {unhealthy_target, ?TOPIC_MESSAGE}
             );
+        {error, permission_denied} ->
+            _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
+            throw(
+                {unhealthy_target, ?PERMISSION_MESSAGE}
+            );
         {error, _} ->
             %% connection might be down; we'll have to check topic existence during health
             %% check, or the workers will kill themselves when they realized there's no
@@ -229,6 +240,8 @@ check_for_topic_existence(Topic, Client) ->
             ok;
         {error, #{status_code := 404}} ->
             {error, not_found};
+        {error, #{status_code := 403}} ->
+            {error, permission_denied};
         {error, Reason} ->
             ?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}),
             {error, Reason}

+ 134 - 1
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -760,6 +760,64 @@ prop_acked_ids_eventually_forgotten(Trace) ->
     ),
     ok.
 
+permission_denied_response() ->
+    Link =
+        <<"https://console.developers.google.com/project/9999/apiui/credential">>,
+    {error, #{
+        status_code => 403,
+        headers =>
+            [
+                {<<"vary">>, <<"X-Origin">>},
+                {<<"vary">>, <<"Referer">>},
+                {<<"content-type">>, <<"application/json; charset=UTF-8">>},
+                {<<"date">>, <<"Tue, 15 Aug 2023 13:59:09 GMT">>},
+                {<<"server">>, <<"ESF">>},
+                {<<"cache-control">>, <<"private">>},
+                {<<"x-xss-protection">>, <<"0">>},
+                {<<"x-frame-options">>, <<"SAMEORIGIN">>},
+                {<<"x-content-type-options">>, <<"nosniff">>},
+                {<<"alt-svc">>, <<"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000">>},
+                {<<"accept-ranges">>, <<"none">>},
+                {<<"vary">>, <<"Origin,Accept-Encoding">>},
+                {<<"transfer-encoding">>, <<"chunked">>}
+            ],
+        body => emqx_utils_json:encode(
+            #{
+                <<"error">> =>
+                    #{
+                        <<"code">> => 403,
+                        <<"details">> =>
+                            [
+                                #{
+                                    <<"@type">> => <<"type.googleapis.com/google.rpc.Help">>,
+                                    <<"links">> =>
+                                        [
+                                            #{
+                                                <<"description">> =>
+                                                    <<"Google developer console API key">>,
+                                                <<"url">> =>
+                                                    Link
+                                            }
+                                        ]
+                                },
+                                #{
+                                    <<"@type">> => <<"type.googleapis.com/google.rpc.ErrorInfo">>,
+                                    <<"domain">> => <<"googleapis.com">>,
+                                    <<"metadata">> =>
+                                        #{
+                                            <<"consumer">> => <<"projects/9999">>,
+                                            <<"service">> => <<"pubsub.googleapis.com">>
+                                        },
+                                    <<"reason">> => <<"CONSUMER_INVALID">>
+                                }
+                            ],
+                        <<"message">> => <<"Project #9999 has been deleted.">>,
+                        <<"status">> => <<"PERMISSION_DENIED">>
+                    }
+            }
+        )
+    }}.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -785,7 +843,7 @@ t_start_stop(Config) ->
             prop_client_stopped(),
             prop_workers_stopped(PubSubTopic),
             fun(Trace) ->
-                ?assertMatch([_], ?of_kind(gcp_pubsub_consumer_unset_nonexistent_topic, Trace)),
+                ?assertMatch([_], ?of_kind(gcp_pubsub_consumer_clear_unhealthy, Trace)),
                 ok
             end
         ]
@@ -1992,6 +2050,81 @@ t_get_subscription(Config) ->
     ),
     ok.
 
+t_permission_denied_topic_check(Config) ->
+    [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            %% the emulator does not check any credentials
+            emqx_common_test_helpers:with_mock(
+                emqx_bridge_gcp_pubsub_client,
+                query_sync,
+                fun(PreparedRequest = {prepared_request, {Method, Path, _Body}}, Client) ->
+                    RE = iolist_to_binary(["/topics/", PubSubTopic, "$"]),
+                    case {Method =:= get, re:run(Path, RE)} of
+                        {true, {match, _}} ->
+                            permission_denied_response();
+                        _ ->
+                            meck:passthrough([PreparedRequest, Client])
+                    end
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            create_bridge(Config),
+                            #{?snk_kind := gcp_pubsub_stop},
+                            5_000
+                        ),
+                    ?assertMatch(
+                        {ok, disconnected},
+                        emqx_resource_manager:health_check(ResourceId)
+                    ),
+                    ?assertMatch(
+                        {ok, _Group, #{error := {unhealthy_target, "Permission denied" ++ _}}},
+                        emqx_resource_manager:lookup_cached(ResourceId)
+                    ),
+                    ok
+                end
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_permission_denied_worker(Config) ->
+    ?check_trace(
+        begin
+            emqx_common_test_helpers:with_mock(
+                emqx_bridge_gcp_pubsub_client,
+                query_sync,
+                fun(PreparedRequest = {prepared_request, {Method, _Path, _Body}}, Client) ->
+                    case Method =:= put of
+                        true ->
+                            permission_denied_response();
+                        false ->
+                            meck:passthrough([PreparedRequest, Client])
+                    end
+                end,
+                fun() ->
+                    {{ok, _}, {ok, _}} =
+                        ?wait_async_action(
+                            create_bridge(
+                                Config
+                            ),
+                            #{?snk_kind := gcp_pubsub_consumer_worker_terminate},
+                            10_000
+                        ),
+
+                    ok
+                end
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_cluster_subscription(Config) ->
     [
         #{