浏览代码

fix(gcp_pubsub_producer): check for topic existence when creating action

Fixes https://emqx.atlassian.net/browse/EMQX-11949
Thales Macedo Garitezi 1 年之前
父节点
当前提交
dc16e59f2c

+ 3 - 3
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl

@@ -198,13 +198,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
 %%-------------------------------------------------------------------------------------------------
 
 -spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}.
-get_topic(Topic, ConnectorState, ReqOpts) ->
-    #{project_id := ProjectId} = ConnectorState,
+get_topic(Topic, ClientState, ReqOpts) ->
+    #{project_id := ProjectId} = ClientState,
     Method = get,
     Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
     Body = <<>>,
     PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts},
-    ?MODULE:query_sync(PreparedRequest, ConnectorState).
+    ?MODULE:query_sync(PreparedRequest, ClientState).
 
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns

+ 30 - 13
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -186,10 +186,14 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) ->
     {ok, connector_state()}.
 on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) ->
     #{installed_actions := InstalledActions0} = ConnectorState0,
-    ChannelState = install_channel(ActionConfig),
-    InstalledActions = InstalledActions0#{ActionId => ChannelState},
-    ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
-    {ok, ConnectorState}.
+    case install_channel(ActionConfig, ConnectorState0) of
+        {ok, ChannelState} ->
+            InstalledActions = InstalledActions0#{ActionId => ChannelState},
+            ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
+            {ok, ConnectorState};
+        Error = {error, _} ->
+            Error
+    end.
 
 -spec on_remove_channel(
     connector_resource_id(),
@@ -218,8 +222,7 @@ on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) ->
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
 
-%% TODO: check if topic exists ("unhealthy target")
-install_channel(ActionConfig) ->
+install_channel(ActionConfig, ConnectorState) ->
     #{
         parameters := #{
             attributes_template := AttributesTemplate,
@@ -231,13 +234,27 @@ install_channel(ActionConfig) ->
             request_ttl := RequestTTL
         }
     } = ActionConfig,
-    #{
-        attributes_template => preproc_attributes(AttributesTemplate),
-        ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
-        payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
-        pubsub_topic => PubSubTopic,
-        request_ttl => RequestTTL
-    }.
+    #{client := Client} = ConnectorState,
+    case
+        emqx_bridge_gcp_pubsub_client:get_topic(PubSubTopic, Client, #{request_ttl => RequestTTL})
+    of
+        {error, #{status_code := 404}} ->
+            {error, {unhealthy_target, <<"Topic does not exist">>}};
+        {error, #{status_code := 403}} ->
+            {error, {unhealthy_target, <<"Permission denied for topic">>}};
+        {error, #{status_code := 401}} ->
+            {error, {unhealthy_target, <<"Bad credentials">>}};
+        {error, Reason} ->
+            {error, Reason};
+        {ok, _} ->
+            {ok, #{
+                attributes_template => preproc_attributes(AttributesTemplate),
+                ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
+                payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
+                pubsub_topic => PubSubTopic,
+                request_ttl => RequestTTL
+            }}
+    end.
 
 -spec do_send_requests_sync(
     connector_state(),

+ 100 - 55
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -76,6 +76,7 @@ only_sync_tests() ->
     [t_query_sync].
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [
             emqx,
@@ -257,20 +258,31 @@ create_rule_and_action_http(Config) ->
 success_http_handler() ->
     TestPid = self(),
     fun(Req0, State) ->
-        {ok, Body, Req} = cowboy_req:read_body(Req0),
-        TestPid ! {http, cowboy_req:headers(Req), Body},
-        Rep = cowboy_req:reply(
-            200,
-            #{<<"content-type">> => <<"application/json">>},
-            emqx_utils_json:encode(#{messageIds => [<<"6058891368195201">>]}),
-            Req
-        ),
-        {ok, Rep, State}
+        case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
+            {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
+                Rep = cowboy_req:reply(
+                    200,
+                    #{<<"content-type">> => <<"application/json">>},
+                    <<"{}">>,
+                    Req0
+                ),
+                {ok, Rep, State};
+            _ ->
+                {ok, Body, Req} = cowboy_req:read_body(Req0),
+                TestPid ! {http, cowboy_req:headers(Req), Body},
+                Rep = cowboy_req:reply(
+                    200,
+                    #{<<"content-type">> => <<"application/json">>},
+                    emqx_utils_json:encode(#{messageIds => [<<"6058891368195201">>]}),
+                    Req
+                ),
+                {ok, Rep, State}
+        end
     end.
 
 start_echo_http_server() ->
     HTTPHost = "localhost",
-    HTTPPath = <<"/v1/projects/myproject/topics/mytopic:publish">>,
+    HTTPPath = '_',
     ServerSSLOpts =
         [
             {verify, verify_none},
@@ -656,6 +668,20 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
         error({timeout_waiting_for_telemetry, EventName})
     end.
 
+kill_gun_process(EhttpcPid) ->
+    State = ehttpc:get_state(EhttpcPid, minimal),
+    GunPid = maps:get(client, State),
+    true = is_pid(GunPid),
+    _ = exit(GunPid, kill),
+    ok.
+
+kill_gun_processes(ConnectorResourceId) ->
+    Pool = ehttpc:workers(ConnectorResourceId),
+    Workers = lists:map(fun({_, Pid}) -> Pid end, Pool),
+    %% assert there is at least one pool member
+    ?assertMatch([_ | _], Workers),
+    lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1343,15 +1369,26 @@ t_failure_with_body(Config) ->
     TestPid = self(),
     FailureWithBodyHandler =
         fun(Req0, State) ->
-            {ok, Body, Req} = cowboy_req:read_body(Req0),
-            TestPid ! {http, cowboy_req:headers(Req), Body},
-            Rep = cowboy_req:reply(
-                400,
-                #{<<"content-type">> => <<"application/json">>},
-                emqx_utils_json:encode(#{}),
-                Req
-            ),
-            {ok, Rep, State}
+            case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
+                {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
+                    Rep = cowboy_req:reply(
+                        200,
+                        #{<<"content-type">> => <<"application/json">>},
+                        <<"{}">>,
+                        Req0
+                    ),
+                    {ok, Rep, State};
+                _ ->
+                    {ok, Body, Req} = cowboy_req:read_body(Req0),
+                    TestPid ! {http, cowboy_req:headers(Req), Body},
+                    Rep = cowboy_req:reply(
+                        400,
+                        #{<<"content-type">> => <<"application/json">>},
+                        emqx_utils_json:encode(#{}),
+                        Req
+                    ),
+                    {ok, Rep, State}
+            end
         end,
     ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler),
     Topic = <<"t/topic">>,
@@ -1381,15 +1418,26 @@ t_failure_no_body(Config) ->
     TestPid = self(),
     FailureNoBodyHandler =
         fun(Req0, State) ->
-            {ok, Body, Req} = cowboy_req:read_body(Req0),
-            TestPid ! {http, cowboy_req:headers(Req), Body},
-            Rep = cowboy_req:reply(
-                400,
-                #{<<"content-type">> => <<"application/json">>},
-                <<>>,
-                Req
-            ),
-            {ok, Rep, State}
+            case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
+                {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
+                    Rep = cowboy_req:reply(
+                        200,
+                        #{<<"content-type">> => <<"application/json">>},
+                        <<"{}">>,
+                        Req0
+                    ),
+                    {ok, Rep, State};
+                _ ->
+                    {ok, Body, Req} = cowboy_req:read_body(Req0),
+                    TestPid ! {http, cowboy_req:headers(Req), Body},
+                    Rep = cowboy_req:reply(
+                        400,
+                        #{<<"content-type">> => <<"application/json">>},
+                        <<>>,
+                        Req
+                    ),
+                    {ok, Rep, State}
+            end
         end,
     ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
     Topic = <<"t/topic">>,
@@ -1415,20 +1463,6 @@ t_failure_no_body(Config) ->
     ),
     ok.
 
-kill_gun_process(EhttpcPid) ->
-    State = ehttpc:get_state(EhttpcPid, minimal),
-    GunPid = maps:get(client, State),
-    true = is_pid(GunPid),
-    _ = exit(GunPid, kill),
-    ok.
-
-kill_gun_processes(ConnectorResourceId) ->
-    Pool = ehttpc:workers(ConnectorResourceId),
-    Workers = lists:map(fun({_, Pid}) -> Pid end, Pool),
-    %% assert there is at least one pool member
-    ?assertMatch([_ | _], Workers),
-    lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers).
-
 t_unrecoverable_error(Config) ->
     ActionResourceId = ?config(action_resource_id, Config),
     ConnectorResourceId = ?config(connector_resource_id, Config),
@@ -1436,19 +1470,30 @@ t_unrecoverable_error(Config) ->
     TestPid = self(),
     FailureNoBodyHandler =
         fun(Req0, State) ->
-            {ok, Body, Req} = cowboy_req:read_body(Req0),
-            TestPid ! {http, cowboy_req:headers(Req), Body},
-            %% kill the gun process while it's waiting for the
-            %% response so we provoke an `{error, _}' response from
-            %% ehttpc.
-            ok = kill_gun_processes(ConnectorResourceId),
-            Rep = cowboy_req:reply(
-                200,
-                #{<<"content-type">> => <<"application/json">>},
-                <<>>,
-                Req
-            ),
-            {ok, Rep, State}
+            case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of
+                {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} ->
+                    Rep = cowboy_req:reply(
+                        200,
+                        #{<<"content-type">> => <<"application/json">>},
+                        <<"{}">>,
+                        Req0
+                    ),
+                    {ok, Rep, State};
+                _ ->
+                    {ok, Body, Req} = cowboy_req:read_body(Req0),
+                    TestPid ! {http, cowboy_req:headers(Req), Body},
+                    %% kill the gun process while it's waiting for the
+                    %% response so we provoke an `{error, _}' response from
+                    %% ehttpc.
+                    ok = kill_gun_processes(ConnectorResourceId),
+                    Rep = cowboy_req:reply(
+                        200,
+                        #{<<"content-type">> => <<"application/json">>},
+                        <<>>,
+                        Req
+                    ),
+                    {ok, Rep, State}
+            end
         end,
     ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
     Topic = <<"t/topic">>,

+ 215 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl

@@ -0,0 +1,215 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_v2_gcp_pubsub_producer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>).
+-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
+    emqx_bridge_gcp_pubsub_consumer_SUITE:init_per_suite(Config).
+
+end_per_suite(Config) ->
+    emqx_bridge_gcp_pubsub_consumer_SUITE:end_per_suite(Config).
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+common_init_per_testcase(TestCase, Config0) ->
+    ct:timetrap(timer:seconds(60)),
+    ServiceAccountJSON =
+        #{<<"project_id">> := ProjectId} =
+        emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
+    ConnectorConfig = connector_config(Name, ServiceAccountJSON),
+    PubsubTopic = Name,
+    ActionConfig = action_config(#{
+        connector => Name,
+        parameters => #{pubsub_topic => PubsubTopic}
+    }),
+    Config = [
+        {bridge_kind, action},
+        {action_type, ?ACTION_TYPE_BIN},
+        {action_name, Name},
+        {action_config, ActionConfig},
+        {connector_name, Name},
+        {connector_type, ?CONNECTOR_TYPE_BIN},
+        {connector_config, ConnectorConfig},
+        {service_account_json, ServiceAccountJSON},
+        {project_id, ProjectId},
+        {pubsub_topic, PubsubTopic}
+        | Config0
+    ],
+    ok = emqx_bridge_gcp_pubsub_consumer_SUITE:ensure_topic(Config, PubsubTopic),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    emqx_common_test_helpers:call_janitor(60_000),
+    ok = snabbkaffe:stop(),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config(Name, ServiceAccountJSON) ->
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"tags">> => [<<"bridge">>],
+            <<"description">> => <<"my cool bridge">>,
+            <<"connect_timeout">> => <<"5s">>,
+            <<"pool_size">> => 8,
+            <<"pipelining">> => <<"100">>,
+            <<"max_retries">> => <<"2">>,
+            <<"service_account_json">> => ServiceAccountJSON,
+            <<"resource_opts">> =>
+                #{
+                    <<"health_check_interval">> => <<"1s">>,
+                    <<"start_after_created">> => true,
+                    <<"start_timeout">> => <<"5s">>
+                }
+        },
+    emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0).
+
+action_config(Overrides0) ->
+    Overrides = emqx_utils_maps:binary_key_map(Overrides0),
+    CommonConfig =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => <<"please override">>,
+            <<"parameters">> =>
+                #{
+                    <<"pubsub_topic">> => <<"please override">>
+                },
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1,
+                <<"batch_time">> => <<"0ms">>,
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"resume_interval">> => <<"15s">>,
+                <<"worker_pool_size">> => <<"1">>
+            }
+        },
+    maps:merge(CommonConfig, Overrides).
+
+assert_persisted_service_account_json_is_binary(ConnectorName) ->
+    %% ensure cluster.hocon has a binary encoded json string as the value
+    {ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]),
+    ?assertMatch(
+        Bin when is_binary(Bin),
+        emqx_utils_maps:deep_get(
+            [
+                <<"connectors">>,
+                <<"gcp_pubsub_producer">>,
+                ConnectorName,
+                <<"service_account_json">>
+            ],
+            Hocon
+        )
+    ),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_stop(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop),
+    ok.
+
+t_create_via_http(Config) ->
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.
+
+t_create_via_http_json_object_service_account(Config0) ->
+    %% After the config goes through the roundtrip with `hocon_tconf:check_plain', service
+    %% account json comes back as a binary even if the input is a json object.
+    ConnectorName = ?config(connector_name, Config0),
+    ConnConfig0 = ?config(connector_config, Config0),
+    Config1 = proplists:delete(connector_config, Config0),
+    ConnConfig1 = maps:update_with(
+        <<"service_account_json">>,
+        fun(X) ->
+            ?assert(is_binary(X), #{json => X}),
+            JSON = emqx_utils_json:decode(X, [return_maps]),
+            ?assert(is_map(JSON)),
+            JSON
+        end,
+        ConnConfig0
+    ),
+    Config = [{connector_config, ConnConfig1} | Config1],
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    assert_persisted_service_account_json_is_binary(ConnectorName),
+    ok.
+
+%% Check that creating an action (V2) with a non-existent topic leads returns an error.
+t_bad_topic(Config) ->
+    ?check_trace(
+        begin
+            %% Should it really be 201 here?
+            ?assertMatch(
+                {ok, {{_, 201, _}, _, #{}}},
+                emqx_bridge_v2_testlib:create_bridge_api(
+                    Config,
+                    #{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}}
+                )
+            ),
+            #{
+                kind := Kind,
+                type := Type,
+                name := Name
+            } = emqx_bridge_v2_testlib:get_common_values(Config),
+            ActionConfig0 = emqx_bridge_v2_testlib:get_value(action_config, Config),
+            ProbeRes = emqx_bridge_v2_testlib:probe_bridge_api(
+                Kind,
+                Type,
+                Name,
+                emqx_utils_maps:deep_merge(
+                    ActionConfig0,
+                    #{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}}
+                )
+            ),
+            ?assertMatch(
+                {error, {{_, 400, _}, _, _}},
+                ProbeRes
+            ),
+            {error, {{_, 400, _}, _, #{<<"message">> := Msg}}} = ProbeRes,
+            ?assertMatch(match, re:run(Msg, <<"unhealthy_target">>, [{capture, none}]), #{
+                msg => Msg
+            }),
+            ?assertMatch(match, re:run(Msg, <<"Topic does not exist">>, [{capture, none}]), #{
+                msg => Msg
+            }),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
changes/ee/fix-12656.en.md

@@ -0,0 +1 @@
+Added a topic check when creating a GCP PubSub Producer action, so it now fails when the topic does not exist or the provided credentials do not have enough permissions to use it.