ソースを参照

feat(gcp_pubsub_consumer): implement GCP PubSub Consumer bridge

Fixes https://emqx.atlassian.net/browse/EMQX-10281
Thales Macedo Garitezi 2 年 前
コミット
b442910ff1

+ 23 - 0
.ci/docker-compose-file/docker-compose-gcp-emulator.yaml

@@ -0,0 +1,23 @@
+version: '3.9'
+
+services:
+  gcp_emulator:
+    container_name: gcp_emulator
+    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:435.0.1-emulators
+    restart: always
+    expose:
+      - "8085"
+    # ports:
+    #   - "8085:8085"
+    networks:
+      - emqx_bridge
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8085"]
+      interval: 30s
+      timeout: 5s
+      retries: 4
+    command:
+      - bash
+      - "-c"
+      - |
+        gcloud beta emulators pubsub start --project=emqx-pubsub --host-port=0.0.0.0:8085 --impersonate-service-account test@emqx.iam.gserviceaccount.com

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -149,5 +149,11 @@
     "listen": "0.0.0.0:19100",
     "upstream": "minio-tls:9100",
     "enabled": true
+  },
+  {
+    "name": "gcp_emulator",
+    "listen": "0.0.0.0:8085",
+    "upstream": "gcp_emulator:8085",
+    "enabled": true
   }
 ]

+ 3 - 1
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -54,7 +54,9 @@
     (TYPE) =:= <<"mqtt">>
 ).
 -define(IS_INGRESS_BRIDGE(TYPE),
-    (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
+    (TYPE) =:= <<"kafka_consumer">> orelse
+        (TYPE) =:= <<"gcp_pubsub_consumer">> orelse
+        ?IS_BI_DIR_BRIDGE(TYPE)
 ).
 
 -if(?EMQX_RELEASE_EDITION == ee).

+ 2 - 0
apps/emqx_bridge_gcp_pubsub/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+gcp_emulator

+ 181 - 23
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl

@@ -7,7 +7,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1]).
 
 %% hocon_schema API
 -export([
@@ -39,11 +39,22 @@ namespace() ->
 roots() ->
     [].
 
-fields("config") ->
+fields("config_producer") ->
     emqx_bridge_schema:common_bridge_fields() ++
         emqx_resource_schema:fields("resource_opts") ++
-        fields(bridge_config);
-fields(bridge_config) ->
+        fields(connector_config) ++ fields(producer);
+fields("config_consumer") ->
+    emqx_bridge_schema:common_bridge_fields() ++
+        [
+            {resource_opts,
+                mk(
+                    ref("consumer_resource_opts"),
+                    #{required => true, desc => ?DESC(emqx_resource_schema, "creation_opts")}
+                )}
+        ] ++
+        fields(connector_config) ++
+        [{consumer, mk(ref(consumer), #{required => true, desc => ?DESC(consumer_opts)})}];
+fields(connector_config) ->
     [
         {connect_timeout,
             sc(
@@ -88,6 +99,20 @@ fields(bridge_config) ->
                     desc => ?DESC("request_timeout")
                 }
             )},
+        {service_account_json,
+            sc(
+                service_account_json(),
+                #{
+                    required => true,
+                    validator => fun ?MODULE:service_account_json_validator/1,
+                    converter => fun ?MODULE:service_account_json_converter/1,
+                    sensitive => true,
+                    desc => ?DESC("service_account_json")
+                }
+            )}
+    ];
+fields(producer) ->
+    [
         {payload_template,
             sc(
                 binary(),
@@ -110,28 +135,88 @@ fields(bridge_config) ->
                     required => true,
                     desc => ?DESC("pubsub_topic")
                 }
+            )}
+    ];
+fields(consumer) ->
+    [
+        {ack_retry_interval,
+            mk(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"5s">>,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )},
-        {service_account_json,
-            sc(
-                service_account_json(),
+        {pull_max_messages,
+            mk(
+                pos_integer(),
+                #{default => 100, desc => ?DESC("consumer_pull_max_messages")}
+            )},
+        {pull_worker_multiplier,
+            mk(
+                pos_integer(),
+                #{
+                    default => 1,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {topic_mapping,
+            mk(
+                hoconsc:array(ref(consumer_topic_mapping)),
                 #{
                     required => true,
-                    validator => fun ?MODULE:service_account_json_validator/1,
-                    converter => fun ?MODULE:service_account_json_converter/1,
-                    sensitive => true,
-                    desc => ?DESC("service_account_json")
+                    validator => fun consumer_topic_mapping_validator/1,
+                    desc => ?DESC("consumer_topic_mapping")
                 }
             )}
     ];
-fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post");
-fields("post") ->
-    [type_field(), name_field() | fields("config")];
-fields("put") ->
-    fields("config").
-
-desc("config") ->
+fields(consumer_topic_mapping) ->
+    [
+        {pubsub_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_pubsub_topic)})},
+        {mqtt_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_mqtt_topic)})},
+        {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
+        {payload_template,
+            mk(
+                string(),
+                #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
+            )}
+    ];
+fields("consumer_resource_opts") ->
+    ResourceFields = emqx_resource_schema:fields("creation_opts"),
+    SupportedFields = [
+        auto_restart_interval,
+        health_check_interval,
+        request_ttl,
+        resume_interval,
+        worker_pool_size
+    ],
+    lists:filter(
+        fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end,
+        ResourceFields
+    );
+fields("get_producer") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_producer");
+fields("post_producer") ->
+    [type_field_producer(), name_field() | fields("config_producer")];
+fields("put_producer") ->
+    fields("config_producer");
+fields("get_consumer") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_consumer");
+fields("post_consumer") ->
+    [type_field_consumer(), name_field() | fields("config_consumer")];
+fields("put_consumer") ->
+    fields("config_consumer").
+
+desc("config_producer") ->
+    ?DESC("desc_config");
+desc("config_consumer") ->
     ?DESC("desc_config");
+desc("consumer_resource_opts") ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc(consumer_topic_mapping) ->
+    ?DESC("consumer_topic_mapping");
+desc(consumer) ->
+    ?DESC("consumer");
 desc(_) ->
     undefined.
 
@@ -139,13 +224,19 @@ conn_bridge_examples(Method) ->
     [
         #{
             <<"gcp_pubsub">> => #{
-                summary => <<"GCP PubSub Bridge">>,
-                value => values(Method)
+                summary => <<"GCP PubSub Producer Bridge">>,
+                value => values(producer, Method)
+            }
+        },
+        #{
+            <<"gcp_pubsub_consumer">> => #{
+                summary => <<"GCP PubSub Consumer Bridge">>,
+                value => values(consumer, Method)
             }
         }
     ].
 
-values(_Method) ->
+values(producer, _Method) ->
     #{
         pubsub_topic => <<"mytopic">>,
         service_account_json =>
@@ -173,17 +264,71 @@ values(_Method) ->
                     <<"https://oauth2.googleapis.com/token">>,
                 type => <<"service_account">>
             }
+    };
+values(consumer, _Method) ->
+    #{
+        connect_timeout => <<"15s">>,
+        consumer =>
+            #{
+                pull_max_messages => 100,
+                topic_mapping => [
+                    #{
+                        pubsub_topic => <<"pubsub-topic-1">>,
+                        mqtt_topic => <<"mqtt/topic/1">>,
+                        qos => 1,
+                        payload_template => <<"${.}">>
+                    },
+                    #{
+                        pubsub_topic => <<"pubsub-topic-2">>,
+                        mqtt_topic => <<"mqtt/topic/2">>,
+                        qos => 2,
+                        payload_template =>
+                            <<"v = ${.value}, a = ${.attributes}, o = ${.ordering_key}">>
+                    }
+                ]
+            },
+        resource_opts => #{request_ttl => <<"20s">>},
+        service_account_json =>
+            #{
+                auth_provider_x509_cert_url =>
+                    <<"https://www.googleapis.com/oauth2/v1/certs">>,
+                auth_uri =>
+                    <<"https://accounts.google.com/o/oauth2/auth">>,
+                client_email =>
+                    <<"test@myproject.iam.gserviceaccount.com">>,
+                client_id => <<"123812831923812319190">>,
+                client_x509_cert_url =>
+                    <<
+                        "https://www.googleapis.com/robot/v1/"
+                        "metadata/x509/test%40myproject.iam.gserviceaccount.com"
+                    >>,
+                private_key =>
+                    <<
+                        "-----BEGIN PRIVATE KEY-----\n"
+                        "MIIEvQI..."
+                    >>,
+                private_key_id => <<"kid">>,
+                project_id => <<"myproject">>,
+                token_uri =>
+                    <<"https://oauth2.googleapis.com/token">>,
+                type => <<"service_account">>
+            }
     }.
 
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
 
+ref(Name) -> hoconsc:ref(?MODULE, Name).
+
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 
-type_field() ->
+type_field_producer() ->
     {type, mk(enum([gcp_pubsub]), #{required => true, desc => ?DESC("desc_type")})}.
 
+type_field_consumer() ->
+    {type, mk(enum([gcp_pubsub_consumer]), #{required => true, desc => ?DESC("desc_type")})}.
+
 name_field() ->
     {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
 
@@ -225,3 +370,16 @@ service_account_json_converter(Map) when is_map(Map) ->
     maps:with(ExpectedKeys, Map);
 service_account_json_converter(Val) ->
     Val.
+
+consumer_topic_mapping_validator(_TopicMapping = []) ->
+    {error, "There must be at least one GCP PubSub-MQTT topic mapping"};
+consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+    NumEntries = length(TopicMapping),
+    PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping],
+    DistinctPubSubTopics = length(lists:usort(PubSubTopics)),
+    case DistinctPubSubTopics =:= NumEntries of
+        true ->
+            ok;
+        false ->
+            {error, "GCP PubSub topics must not be repeated in a bridge"}
+    end.

+ 58 - 17
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl

@@ -24,9 +24,12 @@
 ]).
 -export([reply_delegator/3]).
 
+-export([get_topic/3]).
+
 -export([get_jwt_authorization_header/1]).
 
 -type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
+-type project_id() :: binary().
 -type config() :: #{
     connect_timeout := emqx_schema:duration_ms(),
     max_retries := non_neg_integer(),
@@ -39,17 +42,26 @@
     jwt_config := emqx_connector_jwt:jwt_config(),
     max_retries := non_neg_integer(),
     pool_name := binary(),
-    project_id := binary(),
+    project_id := project_id(),
     request_ttl := infinity | timer:time()
 }.
 -type headers() :: [{binary(), iodata()}].
 -type body() :: iodata().
 -type status_code() :: 100..599.
--type method() :: post.
+-type method() :: get | post | put | patch.
 -type path() :: binary().
 -type prepared_request() :: {method(), path(), body()}.
+-type topic() :: binary().
 
--export_type([service_account_json/0, state/0, headers/0, body/0, status_code/0]).
+-export_type([
+    service_account_json/0,
+    state/0,
+    headers/0,
+    body/0,
+    status_code/0,
+    project_id/0,
+    topic/0
+]).
 
 -define(DEFAULT_PIPELINE_SIZE, 100).
 
@@ -76,11 +88,22 @@ on_start(
     }),
     %% emulating the emulator behavior
     %% https://cloud.google.com/pubsub/docs/emulator
-    HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"),
+    {Transport, HostPort} =
+        case os:getenv("PUBSUB_EMULATOR_HOST") of
+            false ->
+                {tls, "pubsub.googleapis.com:443"};
+            HostPort0 ->
+                %% The emulator is plain HTTP...
+                Transport0 = persistent_term:get({?MODULE, transport}, tcp),
+                {Transport0, HostPort0}
+        end,
     #{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
     PoolType = random,
-    Transport = tls,
-    TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),
+    TransportOpts =
+        case Transport of
+            tls -> emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none});
+            tcp -> []
+        end,
     NTransportOpts = emqx_utils:ipv6_probe(TransportOpts),
     PoolOpts = [
         {host, Host},
@@ -91,7 +114,7 @@ on_start(
         {pool_size, PoolSize},
         {transport, Transport},
         {transport_opts, NTransportOpts},
-        {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
+        {enable_pipelining, maps:get(pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
     ],
     #{
         jwt_config := JWTConfig,
@@ -150,10 +173,7 @@ on_stop(ResourceId, _State) ->
     {prepared_request, prepared_request()},
     state()
 ) ->
-    {ok, status_code(), headers()}
-    | {ok, status_code(), headers(), body()}
-    | {error, {recoverable_error, term()}}
-    | {error, term()}.
+    {ok, map()} | {error, {recoverable_error, term()} | term()}.
 on_query(ResourceId, {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
     ?TRACE(
         "QUERY_SYNC",
@@ -194,6 +214,19 @@ on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
             disconnected
     end.
 
+%%-------------------------------------------------------------------------------------------------
+%% API
+%%-------------------------------------------------------------------------------------------------
+
+-spec get_topic(resource_id(), topic(), state()) -> {ok, map()} | {error, term()}.
+get_topic(ResourceId, Topic, ConnectorState) ->
+    #{project_id := ProjectId} = ConnectorState,
+    Method = get,
+    Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
+    Body = <<>>,
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    on_query(ResourceId, PreparedRequest, ConnectorState).
+
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
@@ -266,10 +299,7 @@ get_jwt_authorization_header(JWTConfig) ->
     {prepared_request, prepared_request()},
     resource_id()
 ) ->
-    {ok, status_code(), headers()}
-    | {ok, status_code(), headers(), body()}
-    | {error, {recoverable_error, term()}}
-    | {error, term()}.
+    {ok, map()} | {error, {recoverable_error, term()} | term()}.
 do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) ->
     #{
         jwt_config := JWTConfig,
@@ -280,12 +310,17 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI
     ?tp(
         gcp_pubsub_bridge_do_send_requests,
         #{
+            request => {prepared_request, {Method, Path, Body}},
             query_mode => sync,
             resource_id => ResourceId
         }
     ),
     Headers = get_jwt_authorization_header(JWTConfig),
-    Request = {Path, Headers, Body},
+    Request =
+        case {Method, Body} of
+            {get, <<>>} -> {Path, Headers};
+            _ -> {Path, Headers, Body}
+        end,
     Response = ehttpc:request(
         PoolName,
         Method,
@@ -312,12 +347,17 @@ do_send_requests_async(
     ?tp(
         gcp_pubsub_bridge_do_send_requests,
         #{
+            request => {prepared_request, {Method, Path, Body}},
             query_mode => async,
             resource_id => ResourceId
         }
     ),
     Headers = get_jwt_authorization_header(JWTConfig),
-    Request = {Path, Headers, Body},
+    Request =
+        case {Method, Body} of
+            {get, <<>>} -> {Path, Headers};
+            _ -> {Path, Headers, Body}
+        end,
     Worker = ehttpc_pool:pick_worker(PoolName),
     ok = ehttpc:request_async(
         Worker,
@@ -328,6 +368,7 @@ do_send_requests_async(
     ),
     {ok, Worker}.
 
+-spec handle_response(term(), resource_id(), sync | async) -> {ok, map()} | {error, term()}.
 handle_response(Result, ResourceId, QueryMode) ->
     case Result of
         {error, Reason} ->

+ 552 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -0,0 +1,552 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_consumer_worker).
+
+-behaviour(ecpool_worker).
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `ecpool_worker' API
+-export([connect/1, health_check/1]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_info/2,
+    handle_cast/2,
+    handle_call/3,
+    handle_continue/2,
+    terminate/2
+]).
+
+-export([get_subscription/1]).
+-export([reply_delegator/3, pull_async/1, process_pull_response/2, ensure_subscription/1]).
+
+-type subscription_id() :: binary().
+-type bridge_name() :: atom() | binary().
+-type ack_id() :: binary().
+-type config() :: #{
+    ack_retry_interval := emqx_schema:timeout_duration_ms(),
+    connector_state := emqx_bridge_gcp_pubsub_connector:state(),
+    ecpool_worker_id => non_neg_integer(),
+    hookpoint := binary(),
+    instance_id := binary(),
+    mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+    pull_max_messages := non_neg_integer(),
+    subscription_id => subscription_id(),
+    topic => emqx_bridge_gcp_pubsub_connector:topic()
+}.
+-type state() :: #{
+    ack_retry_interval := emqx_schema:timeout_duration_ms(),
+    ack_timer := undefined | reference(),
+    async_workers := #{pid() => reference()},
+    connector_state := emqx_bridge_gcp_pubsub_connector:state(),
+    ecpool_worker_id := non_neg_integer(),
+    hookpoint := binary(),
+    instance_id := binary(),
+    mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+    pending_acks => [ack_id()],
+    pull_max_messages := non_neg_integer(),
+    subscription_id => subscription_id(),
+    topic => emqx_bridge_gcp_pubsub_connector:topic()
+}.
+-type decoded_message() :: map().
+
+-define(HEALTH_CHECK_TIMEOUT, 10_000).
+-define(OPTVAR_SUB_OK(PID), {?MODULE, PID}).
+
+%%-------------------------------------------------------------------------------------------------
+%% API used by `reply_delegator'
+%%-------------------------------------------------------------------------------------------------
+
+-spec pull_async(pid()) -> ok.
+pull_async(WorkerPid) ->
+    gen_server:cast(WorkerPid, pull_async).
+
+-spec process_pull_response(pid(), binary()) -> ok.
+process_pull_response(WorkerPid, RespBody) ->
+    gen_server:cast(WorkerPid, {process_pull_response, RespBody}).
+
+-spec ensure_subscription(pid()) -> ok.
+ensure_subscription(WorkerPid) ->
+    gen_server:cast(WorkerPid, ensure_subscription).
+
+-spec reply_delegator(pid(), binary(), {ok, map()} | {error, timeout | term()}) -> ok.
+reply_delegator(WorkerPid, InstanceId, Result) ->
+    case Result of
+        {error, timeout} ->
+            ?MODULE:pull_async(WorkerPid);
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "gcp_pubsub_consumer_worker_pull_error",
+                instance_id => InstanceId,
+                reason => Reason
+            }),
+            case Reason of
+                #{status_code := 409} ->
+                    %% the subscription was not found; deleted?!
+                    ?MODULE:ensure_subscription(WorkerPid);
+                _ ->
+                    ?MODULE:pull_async(WorkerPid)
+            end;
+        {ok, #{status_code := 200, body := RespBody}} ->
+            ?MODULE:process_pull_response(WorkerPid, RespBody)
+    end.
+
+%%-------------------------------------------------------------------------------------------------
+%% Debugging API
+%%-------------------------------------------------------------------------------------------------
+
+-spec get_subscription(pid()) -> {ok, map()} | {error, term()}.
+get_subscription(WorkerPid) ->
+    gen_server:call(WorkerPid, get_subscription, 5_000).
+
+%%-------------------------------------------------------------------------------------------------
+%% `ecpool' health check
+%%-------------------------------------------------------------------------------------------------
+
+-spec health_check(pid()) -> boolean().
+health_check(WorkerPid) ->
+    case optvar:read(?OPTVAR_SUB_OK(WorkerPid), ?HEALTH_CHECK_TIMEOUT) of
+        {ok, _} ->
+            true;
+        timeout ->
+            false
+    end.
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+connect(Opts0) ->
+    Opts = maps:from_list(Opts0),
+    #{
+        ack_retry_interval := AckRetryInterval,
+        bridge_name := BridgeName,
+        connector_state := ConnectorState,
+        ecpool_worker_id := WorkerId,
+        hookpoint := Hookpoint,
+        instance_id := InstanceId,
+        pull_max_messages := PullMaxMessages,
+        topic_mapping := TopicMapping
+    } = Opts,
+    TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)),
+    Index = 1 + (WorkerId rem map_size(TopicMapping)),
+    {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList),
+    Config = #{
+        ack_retry_interval => AckRetryInterval,
+        connector_state => ConnectorState,
+        hookpoint => Hookpoint,
+        instance_id => InstanceId,
+        mqtt_config => MQTTConfig,
+        pull_max_messages => PullMaxMessages,
+        topic => Topic,
+        subscription_id => subscription_id(BridgeName, Topic)
+    },
+    start_link(Config).
+
+%%-------------------------------------------------------------------------------------------------
+%% `gen_server' API
+%%-------------------------------------------------------------------------------------------------
+
+-spec init(config()) -> {ok, state(), {continue, ensure_subscription}}.
+init(Config) ->
+    process_flag(trap_exit, true),
+    State = Config#{
+        ack_timer => undefined,
+        async_workers => #{},
+        pending_acks => []
+    },
+    {ok, State, {continue, ensure_subscription}}.
+
+handle_continue(ensure_subscription, State0) ->
+    case ensure_subscription_exists(State0) of
+        ok ->
+            #{instance_id := InstanceId} = State0,
+            ?tp(
+                debug,
+                "gcp_pubsub_consumer_worker_subscription_ready",
+                #{instance_id => InstanceId}
+            ),
+            ?MODULE:pull_async(self()),
+            optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
+            {noreply, State0};
+        error ->
+            %% FIXME: add delay if topic does not exist?!
+            %% retry
+            {noreply, State0, {continue, ensure_subscription}}
+    end.
+
+handle_call(get_subscription, _From, State0) ->
+    Res = do_get_subscription(State0),
+    {reply, Res, State0};
+handle_call(_Request, _From, State0) ->
+    {reply, {error, unknown_call}, State0}.
+
+handle_cast(pull_async, State0) ->
+    State = do_pull_async(State0),
+    {noreply, State};
+handle_cast({process_pull_response, RespBody}, State0) ->
+    State = do_process_pull_response(State0, RespBody),
+    {noreply, State};
+handle_cast(ensure_subscription, State0) ->
+    {noreply, State0, {continue, ensure_subscription}};
+handle_cast(_Request, State0) ->
+    {noreply, State0}.
+
+handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) ->
+    State1 = acknowledge(State0),
+    State = ensure_ack_timer(State1),
+    {noreply, State};
+handle_info(
+    {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0}
+) when
+    is_map_key(AsyncWorkerPid, Workers0)
+->
+    Workers = maps:remove(AsyncWorkerPid, Workers0),
+    State1 = State0#{async_workers := Workers},
+    State = do_pull_async(State1),
+    {noreply, State};
+handle_info(Msg, State0) ->
+    #{
+        instance_id := InstanceId,
+        topic := Topic
+    } = State0,
+    ?SLOG(debug, #{
+        msg => "gcp_pubsub_consumer_worker_unexpected_message",
+        unexpected_msg => Msg,
+        instance_id => InstanceId,
+        topic => Topic
+    }),
+    {noreply, State0}.
+
+terminate(_Reason, _State) ->
+    optvar:unset(?OPTVAR_SUB_OK(self())),
+    ok.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+-spec start_link(config()) -> gen_server:start_ret().
+start_link(Config) ->
+    gen_server:start_link(?MODULE, Config, []).
+
+-spec ensure_ack_timer(state()) -> state().
+ensure_ack_timer(State = #{pending_acks := []}) ->
+    State;
+ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) ->
+    State;
+ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) ->
+    State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}.
+
+-spec ensure_subscription_exists(state()) -> ok | error.
+ensure_subscription_exists(State) ->
+    #{
+        connector_state := ConnectorState,
+        instance_id := InstanceId,
+        subscription_id := SubscriptionId,
+        topic := Topic
+    } = State,
+    Method = put,
+    Path = path(State, create),
+    Body = body(State, create),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
+    case Res of
+        {error, #{status_code := 409}} ->
+            %% already exists
+            ?SLOG(debug, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_already_exists",
+                instance_id => InstanceId,
+                topic => Topic,
+                subscription_id => SubscriptionId
+            }),
+            Method1 = patch,
+            Path1 = path(State, create),
+            Body1 = body(State, patch_subscription),
+            PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
+            Res1 = emqx_bridge_gcp_pubsub_connector:on_query(
+                InstanceId, PreparedRequest1, ConnectorState
+            ),
+            ?SLOG(debug, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_patch",
+                instance_id => InstanceId,
+                topic => Topic,
+                subscription_id => SubscriptionId,
+                result => Res1
+            }),
+            ok;
+        {ok, #{status_code := 200}} ->
+            ?SLOG(debug, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_created",
+                instance_id => InstanceId,
+                topic => Topic,
+                subscription_id => SubscriptionId
+            }),
+            ok;
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_error",
+                instance_id => InstanceId,
+                topic => Topic,
+                reason => Reason
+            }),
+            error
+    end.
+
+%% We use async requests so that this process will be more responsive to system messages.
+do_pull_async(State) ->
+    #{
+        connector_state := ConnectorState,
+        instance_id := InstanceId
+    } = State,
+    Method = post,
+    Path = path(State, pull),
+    Body = body(State, pull),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]},
+    {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_connector:on_query_async(
+        InstanceId,
+        PreparedRequest,
+        ReplyFunAndArgs,
+        ConnectorState
+    ),
+    ensure_async_worker_monitored(State, AsyncWorkerPid).
+
+-spec ensure_async_worker_monitored(state(), pid()) -> state().
+ensure_async_worker_monitored(State = #{async_workers := Workers0}, AsyncWorkerPid) ->
+    case is_map_key(AsyncWorkerPid, Workers0) of
+        true ->
+            State;
+        false ->
+            Ref = monitor(process, AsyncWorkerPid),
+            Workers = Workers0#{AsyncWorkerPid => Ref},
+            State#{async_workers := Workers}
+    end.
+
+-spec do_process_pull_response(state(), binary()) -> state().
+do_process_pull_response(State0, RespBody) ->
+    Messages = decode_response(RespBody),
+    AckIds = lists:map(fun(Msg) -> handle_message(State0, Msg) end, Messages),
+    State1 = maps:update_with(pending_acks, fun(AckIds0) -> AckIds0 ++ AckIds end, State0),
+    State2 = acknowledge(State1),
+    pull_async(self()),
+    ensure_ack_timer(State2).
+
+-spec acknowledge(state()) -> state().
+acknowledge(State0 = #{pending_acks := []}) ->
+    State0;
+acknowledge(State0) ->
+    State1 = State0#{ack_timer := undefined},
+    #{
+        connector_state := ConnectorState,
+        instance_id := InstanceId,
+        pending_acks := AckIds
+    } = State1,
+    Method = post,
+    Path = path(State1, ack),
+    Body = body(State1, ack, #{ack_ids => AckIds}),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
+    case Res of
+        {error, Reason} ->
+            ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}),
+            State1;
+        {ok, #{status_code := 200}} ->
+            ?tp(gcp_pubsub_consumer_worker_acknowledged, #{ack_ids => AckIds}),
+            State1#{pending_acks := []};
+        {ok, Details} ->
+            ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", details => Details}),
+            State1
+    end.
+
+do_get_subscription(State) ->
+    #{
+        connector_state := ConnectorState,
+        instance_id := InstanceId
+    } = State,
+    Method = get,
+    Path = path(State, get_subscription),
+    Body = body(State, get_subscription),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    Res = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, PreparedRequest, ConnectorState),
+    case Res of
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "gcp_pubsub_consumer_worker_get_subscription_error",
+                reason => Reason
+            }),
+            {error, Reason};
+        {ok, #{status_code := 200, body := RespBody}} ->
+            DecodedBody = emqx_utils_json:decode(RespBody, [return_maps]),
+            {ok, DecodedBody};
+        {ok, Details} ->
+            ?SLOG(warning, #{
+                msg => "gcp_pubsub_consumer_worker_get_subscription_unexpected_response",
+                details => Details
+            }),
+            {error, Details}
+    end.
+
+-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_connector:topic()) -> subscription_id().
+subscription_id(BridgeName0, Topic) ->
+    %% The real GCP PubSub accepts colons in subscription names, but its emulator
+    %% doesn't...  We currently validate bridge names to not include that character.  The
+    %% exception is the prefix from the probe API.
+    BridgeName1 = to_bin(BridgeName0),
+    BridgeName = binary:replace(BridgeName1, <<":">>, <<"-">>),
+    to_bin(uri_string:quote(<<"emqx-sub-", BridgeName/binary, "-", Topic/binary>>)).
+
+-spec path(state(), pull | create | ack | get_subscription) -> binary().
+path(State, Type) ->
+    #{
+        connector_state := #{project_id := ProjectId},
+        subscription_id := SubscriptionId
+    } = State,
+    SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
+    case Type of
+        pull ->
+            <<"/v1/", SubscriptionResource/binary, ":pull">>;
+        create ->
+            <<"/v1/", SubscriptionResource/binary>>;
+        ack ->
+            <<"/v1/", SubscriptionResource/binary, ":acknowledge">>;
+        get_subscription ->
+            <<"/v1/", SubscriptionResource/binary>>
+    end.
+
+-spec body(state(), pull | create | patch_subscription | get_subscription) -> binary().
+body(State, pull) ->
+    #{pull_max_messages := PullMaxMessages} = State,
+    emqx_utils_json:encode(#{<<"maxMessages">> => PullMaxMessages});
+body(State, create) ->
+    #{
+        ack_retry_interval := AckRetryInterval,
+        connector_state := #{project_id := ProjectId},
+        topic := PubSubTopic
+    } = State,
+    TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
+    AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
+    JSON = #{
+        <<"topic">> => TopicResource,
+        <<"ackDeadlineSeconds">> => AckDeadlineSeconds
+    },
+    emqx_utils_json:encode(JSON);
+body(State, patch_subscription) ->
+    #{
+        ack_retry_interval := AckRetryInterval,
+        connector_state := #{project_id := ProjectId},
+        topic := PubSubTopic,
+        subscription_id := SubscriptionId
+    } = State,
+    TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
+    SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
+    AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
+    JSON = #{
+        <<"subscription">> =>
+            #{
+                <<"ackDeadlineSeconds">> => AckDeadlineSeconds,
+                <<"name">> => SubscriptionResource,
+                <<"topic">> => TopicResource
+            },
+        %% topic is immutable; don't add it here.
+        <<"updateMask">> => <<"ackDeadlineSeconds">>
+    },
+    emqx_utils_json:encode(JSON);
+body(_State, get_subscription) ->
+    <<>>.
+
+-spec body(state(), ack, map()) -> binary().
+body(_State, ack, Opts) ->
+    #{ack_ids := AckIds} = Opts,
+    JSON = #{<<"ackIds">> => AckIds},
+    emqx_utils_json:encode(JSON).
+
+-spec subscription_resource(emqx_bridge_gcp_pubsub_connector:project_id(), subscription_id()) ->
+    binary().
+subscription_resource(ProjectId, SubscriptionId) ->
+    <<"projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>.
+
+-spec decode_response(binary()) -> [decoded_message()].
+decode_response(RespBody) ->
+    case emqx_utils_json:decode(RespBody, [return_maps]) of
+        #{<<"receivedMessages">> := Msgs0} ->
+            lists:map(
+                fun(Msg0 = #{<<"message">> := InnerMsg0}) ->
+                    InnerMsg = emqx_utils_maps:update_if_present(
+                        <<"data">>, fun base64:decode/1, InnerMsg0
+                    ),
+                    Msg0#{<<"message">> := InnerMsg}
+                end,
+                Msgs0
+            );
+        #{} ->
+            []
+    end.
+
+-spec handle_message(state(), decoded_message()) -> [ack_id()].
+handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Message) ->
+    ?tp(
+        debug,
+        "gcp_pubsub_consumer_worker_handle_message",
+        #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}
+    ),
+    #{
+        instance_id := InstanceId,
+        hookpoint := Hookpoint,
+        mqtt_config := #{
+            payload_template := PayloadTemplate,
+            qos := MQTTQoS,
+            mqtt_topic := MQTTTopic
+        },
+        topic := Topic
+    } = State,
+    #{
+        <<"messageId">> := MessageId,
+        <<"publishTime">> := PublishTime
+    } = InnerMsg,
+    FullMessage0 = #{
+        message_id => MessageId,
+        publish_time => PublishTime,
+        topic => Topic
+    },
+    FullMessage =
+        lists:foldl(
+            fun({FromKey, ToKey}, Acc) ->
+                add_if_present(FromKey, InnerMsg, ToKey, Acc)
+            end,
+            FullMessage0,
+            [
+                {<<"data">>, value},
+                {<<"attributes">>, attributes},
+                {<<"orderingKey">>, ordering_key}
+            ]
+        ),
+    Payload = render(FullMessage, PayloadTemplate),
+    MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload),
+    _ = emqx:publish(MQTTMessage),
+    emqx:run_hook(Hookpoint, [FullMessage]),
+    emqx_resource_metrics:received_inc(InstanceId),
+    AckId.
+
+-spec add_if_present(any(), map(), any(), map()) -> map().
+add_if_present(FromKey, Message, ToKey, Map) ->
+    case maps:get(FromKey, Message, undefined) of
+        undefined ->
+            Map;
+        Value ->
+            Map#{ToKey => Value}
+    end.
+
+render(FullMessage, PayloadTemplate) ->
+    Opts = #{return => full_binary},
+    emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
+
+to_bin(A) when is_atom(A) -> atom_to_binary(A);
+to_bin(L) when is_list(L) -> iolist_to_binary(L);
+to_bin(B) when is_binary(B) -> B.

+ 200 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -0,0 +1,200 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_impl_consumer).
+
+-behaviour(emqx_resource).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    query_mode/1,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2
+]).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-type mqtt_config() :: #{
+    mqtt_topic := emqx_types:topic(),
+    qos := emqx_types:qos(),
+    payload_template := string()
+}.
+-type config() :: #{
+    connect_timeout := emqx_schema:duration_ms(),
+    max_retries := non_neg_integer(),
+    pool_size := non_neg_integer(),
+    resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
+    service_account_json := emqx_bridge_gcp_pubsub_connector:service_account_json(),
+    any() => term()
+}.
+-type state() :: #{
+    connector_state := emqx_bridge_gcp_pubsub_connector:state()
+}.
+
+-export_type([mqtt_config/0]).
+
+-define(AUTO_RECONNECT_S, 2).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+-spec callback_mode() -> callback_mode().
+callback_mode() -> async_if_possible.
+
+-spec query_mode(any()) -> query_mode().
+query_mode(_Config) -> no_queries.
+
+-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+on_start(InstanceId, Config) ->
+    case emqx_bridge_gcp_pubsub_connector:on_start(InstanceId, Config) of
+        {ok, ConnectorState} ->
+            start_consumers(InstanceId, ConnectorState, Config);
+        Error ->
+            Error
+    end.
+
+-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, #{connector_state := ConnectorState}) ->
+    ok = stop_consumers(InstanceId),
+    emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState);
+on_stop(InstanceId, undefined = _State) ->
+    ok = stop_consumers(InstanceId),
+    emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, undefined).
+
+-spec on_get_status(resource_id(), state()) -> connected | disconnected.
+on_get_status(InstanceId, _State) ->
+    case
+        emqx_resource_pool:health_check_workers(
+            InstanceId,
+            fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1
+        )
+    of
+        true -> connected;
+        false -> connecting
+    end.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+start_consumers(InstanceId, ConnectorState, Config) ->
+    #{
+        bridge_name := BridgeName,
+        consumer := ConsumerConfig0,
+        hookpoint := Hookpoint
+    } = Config,
+    ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
+    TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
+    PullWorkerMultiplier = maps:get(pull_worker_multiplier, ConsumerConfig1),
+    PoolSize = map_size(TopicMapping) * PullWorkerMultiplier,
+    ConsumerConfig = ConsumerConfig1#{
+        auto_reconnect => ?AUTO_RECONNECT_S,
+        bridge_name => BridgeName,
+        connector_state => ConnectorState,
+        hookpoint => Hookpoint,
+        instance_id => InstanceId,
+        pool_size => PoolSize
+    },
+    ConsumerOpts = maps:to_list(ConsumerConfig),
+    %% FIXME: mark as unhealthy if topics do not exist!
+    case validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) of
+        ok ->
+            ok;
+        error ->
+            _ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState),
+            throw(
+                "GCP PubSub topics are invalid.  Please check the logs, check if the "
+                "topic exists in GCP and if the service account has permissions to use them."
+            )
+    end,
+    case
+        emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
+    of
+        ok ->
+            State = #{
+                connector_state => ConnectorState,
+                pool_name => InstanceId
+            },
+            {ok, State};
+        {error, Reason} ->
+            _ = emqx_bridge_gcp_pubsub_connector:on_stop(InstanceId, ConnectorState),
+            {error, Reason}
+    end.
+
+stop_consumers(InstanceId) ->
+    _ = log_when_error(
+        fun() ->
+            ok = emqx_resource_pool:stop(InstanceId)
+        end,
+        #{
+            msg => "failed_to_stop_pull_worker_pool",
+            instance_id => InstanceId
+        }
+    ),
+    ok.
+
+convert_topic_mapping(TopicMappingList) ->
+    lists:foldl(
+        fun(Fields, Acc) ->
+            #{
+                pubsub_topic := PubSubTopic,
+                mqtt_topic := MQTTTopic,
+                qos := QoS,
+                payload_template := PayloadTemplate0
+            } = Fields,
+            PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
+            Acc#{
+                PubSubTopic => #{
+                    payload_template => PayloadTemplate,
+                    mqtt_topic => MQTTTopic,
+                    qos => QoS
+                }
+            }
+        end,
+        #{},
+        TopicMappingList
+    ).
+
+validate_pubsub_topics(InstanceId, TopicMapping, ConnectorState) ->
+    PubSubTopics = maps:keys(TopicMapping),
+    do_validate_pubsub_topics(InstanceId, ConnectorState, PubSubTopics).
+
+do_validate_pubsub_topics(InstanceId, ConnectorState, [Topic | Rest]) ->
+    case check_for_topic_existence(InstanceId, Topic, ConnectorState) of
+        ok ->
+            do_validate_pubsub_topics(InstanceId, ConnectorState, Rest);
+        {error, _} ->
+            error
+    end;
+do_validate_pubsub_topics(_InstanceId, _ConnectorState, []) ->
+    %% we already validate that the mapping is not empty in the config schema.
+    ok.
+
+check_for_topic_existence(InstanceId, Topic, ConnectorState) ->
+    Res = emqx_bridge_gcp_pubsub_connector:get_topic(InstanceId, Topic, ConnectorState),
+    case Res of
+        {ok, _} ->
+            ok;
+        {error, #{status_code := 404}} ->
+            {error, not_found};
+        {error, Details} ->
+            ?tp(warning, "gcp_pubsub_consumer_check_topic_error", Details),
+            {error, Details}
+    end.
+
+log_when_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.

+ 690 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -0,0 +1,690 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_consumer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("jose/include/jose_jwt.hrl").
+-include_lib("jose/include/jose_jws.hrl").
+
+-define(BRIDGE_TYPE, gcp_pubsub_consumer).
+-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
+-define(REPUBLISH_TOPIC, <<"republish/t">>).
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    GCPEmulatorHost = os:getenv("GCP_EMULATOR_HOST", "toxiproxy"),
+    GCPEmulatorPortStr = os:getenv("GCP_EMULATOR_PORT", "8085"),
+    GCPEmulatorPort = list_to_integer(GCPEmulatorPortStr),
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    ProxyName = "gcp_emulator",
+    case emqx_common_test_helpers:is_tcp_server_available(GCPEmulatorHost, GCPEmulatorPort) of
+        true ->
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([
+                emqx_resource, emqx_bridge, emqx_rule_engine
+            ]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            emqx_mgmt_api_test_util:init_suite(),
+            HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
+            true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
+            ConnectorState = start_control_connector(),
+            [
+                {proxy_name, ProxyName},
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
+                {gcp_emulator_host, GCPEmulatorHost},
+                {gcp_emulator_port, GCPEmulatorPort},
+                {connector_state, ConnectorState}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_gcp_emulator);
+                _ ->
+                    {skip, no_gcp_emulator}
+            end
+    end.
+
+end_per_suite(Config) ->
+    ConnectorState = ?config(connector_state, Config),
+    stop_control_connector(ConnectorState),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
+    _ = application:stop(emqx_connector),
+    os:unsetenv("PUBSUB_EMULATOR_HOST"),
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+common_init_per_testcase(TestCase, Config0) ->
+    ct:timetrap(timer:seconds(60)),
+    emqx_bridge_testlib:delete_all_bridges(),
+    emqx_config:delete_override_conf_files(),
+    ConsumerTopic =
+        <<
+            (atom_to_binary(TestCase))/binary,
+            (integer_to_binary(erlang:unique_integer()))/binary
+        >>,
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>),
+    MQTTQoS = proplists:get_value(mqtt_qos, Config0, 0),
+    DefaultTopicMapping = [
+        #{
+            pubsub_topic => ConsumerTopic,
+            mqtt_topic => MQTTTopic,
+            qos => MQTTQoS,
+            payload_template => <<"${.}">>
+        }
+    ],
+    TopicMapping = proplists:get_value(topic_mapping, Config0, DefaultTopicMapping),
+    ServiceAccountJSON =
+        #{<<"project_id">> := ProjectId} =
+        emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
+    Config = [
+        {consumer_topic, ConsumerTopic},
+        {topic_mapping, TopicMapping},
+        {service_account_json, ServiceAccountJSON},
+        {project_id, ProjectId}
+        | Config0
+    ],
+    {Name, ConfigString, ConsumerConfig} = consumer_config(TestCase, Config),
+    ensure_topics(Config),
+    ok = snabbkaffe:start_trace(),
+    [
+        {consumer_name, Name},
+        {consumer_config_string, ConfigString},
+        {consumer_config, ConsumerConfig}
+        | Config
+    ].
+
+end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ok;
+        false ->
+            ProxyHost = ?config(proxy_host, Config),
+            ProxyPort = ?config(proxy_port, Config),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            emqx_bridge_testlib:delete_all_bridges(),
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop(),
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+consumer_config(TestCase, Config) ->
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    ConsumerTopic = ?config(consumer_topic, Config),
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    Name = <<
+        (atom_to_binary(TestCase))/binary, UniqueNum/binary
+    >>,
+    ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
+    MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
+    MQTTQoS = proplists:get_value(mqtt_qos, Config, 0),
+    PullWorkerMultiplier = proplists:get_value(pull_worker_multiplier, Config, 1),
+    DefaultTopicMapping = [
+        #{
+            pubsub_topic => ConsumerTopic,
+            mqtt_topic => MQTTTopic,
+            qos => MQTTQoS,
+            payload_template => <<"${.}">>
+        }
+    ],
+    TopicMapping0 = proplists:get_value(topic_mapping, Config, DefaultTopicMapping),
+    TopicMappingStr = topic_mapping(TopicMapping0),
+    ConfigString =
+        io_lib:format(
+            "bridges.gcp_pubsub_consumer.~s {\n"
+            "  enable = true\n"
+            %% gcp pubsub emulator doesn't do pipelining very well...
+            "  pipelining = 1\n"
+            "  connect_timeout = \"15s\"\n"
+            "  service_account_json = ~s\n"
+            "  consumer {\n"
+            "    ack_retry_interval = \"5s\"\n"
+            "    pull_max_messages = 10\n"
+            "    pull_worker_multiplier = ~b\n"
+            %% topic mapping
+            "~s"
+            "  }\n"
+            "  max_retries = 2\n"
+            "  pipelining = 100\n"
+            "  pool_size = 8\n"
+            "  resource_opts {\n"
+            "    health_check_interval = \"1s\"\n"
+            "    request_ttl = \"15s\"\n"
+            "  }\n"
+            "}\n",
+            [
+                Name,
+                ServiceAccountJSONStr,
+                PullWorkerMultiplier,
+                TopicMappingStr
+            ]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Name)}.
+
+parse_and_check(ConfigString, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+topic_mapping(TopicMapping0) ->
+    Template0 = <<
+        "{pubsub_topic = \"{{ pubsub_topic }}\","
+        " mqtt_topic = \"{{ mqtt_topic }}\","
+        " qos = {{ qos }},"
+        " payload_template = \"{{{ payload_template }}}\" }"
+    >>,
+    Template = bbmustache:parse_binary(Template0),
+    Entries =
+        lists:map(
+            fun(Params) ->
+                bbmustache:compile(Template, Params, [{key_type, atom}])
+            end,
+            TopicMapping0
+        ),
+    iolist_to_binary(
+        [
+            "  topic_mapping = [",
+            lists:join(<<",\n">>, Entries),
+            "]\n"
+        ]
+    ).
+
+ensure_topics(Config) ->
+    TopicMapping = ?config(topic_mapping, Config),
+    lists:foreach(
+        fun(#{pubsub_topic := T}) ->
+            ensure_topic(Config, T)
+        end,
+        TopicMapping
+    ).
+
+ensure_topic(Config, Topic) ->
+    ProjectId = ?config(project_id, Config),
+    ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config),
+    Method = put,
+    Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
+    Body = <<"{}">>,
+    Res = emqx_bridge_gcp_pubsub_connector:on_query(
+        PoolName,
+        {prepared_request, {Method, Path, Body}},
+        ConnectorState
+    ),
+    case Res of
+        {ok, _} ->
+            ok;
+        {error, #{status_code := 409}} ->
+            %% already exists
+            ok
+    end,
+    ok.
+
+start_control_connector() ->
+    RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
+    ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(RawServiceAccount),
+    ConnectorConfig =
+        #{
+            connect_timeout => 5_000,
+            max_retries => 0,
+            pool_size => 1,
+            resource_opts => #{request_ttl => 5_000},
+            service_account_json => ServiceAccount
+        },
+    PoolName = <<"control_connector">>,
+    {ok, ConnectorState} = emqx_bridge_gcp_pubsub_connector:on_start(PoolName, ConnectorConfig),
+    ConnectorState.
+
+stop_control_connector(ConnectorState) ->
+    #{pool_name := PoolName} = ConnectorState,
+    ok = emqx_bridge_gcp_pubsub_connector:on_stop(PoolName, ConnectorState),
+    ok.
+
+pubsub_publish(Config, Topic, Messages0) ->
+    ConnectorState = #{pool_name := PoolName} = ?config(connector_state, Config),
+    ProjectId = ?config(project_id, Config),
+    Method = post,
+    Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary, ":publish">>,
+    Messages =
+        lists:map(
+            fun(Msg) ->
+                emqx_utils_maps:update_if_present(
+                    <<"data">>,
+                    fun
+                        (D) when is_binary(D) -> base64:encode(D);
+                        (M) when is_map(M) -> base64:encode(emqx_utils_json:encode(M))
+                    end,
+                    Msg
+                )
+            end,
+            Messages0
+        ),
+    Body = emqx_utils_json:encode(#{<<"messages">> => Messages}),
+    {ok, _} = emqx_bridge_gcp_pubsub_connector:on_query(
+        PoolName,
+        {prepared_request, {Method, Path, Body}},
+        ConnectorState
+    ),
+    ok.
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(consumer_name, Config),
+    BridgeConfig0 = ?config(consumer_config, Config),
+    BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+    emqx_bridge:create(Type, Name, BridgeConfig).
+
+create_bridge_api(Config) ->
+    create_bridge_api(Config, _Overrides = #{}).
+
+create_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(consumer_name, Config),
+    BridgeConfig0 = ?config(consumer_config, Config),
+    BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+    Params = BridgeConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("creating bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headers, Body0}} ->
+                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("bridge create result: ~p", [Res]),
+    Res.
+
+probe_bridge_api(Config) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(consumer_name, Config),
+    ConsumerConfig = ?config(consumer_config, Config),
+    Params = ConsumerConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+            Error -> Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.
+
+start_and_subscribe_mqtt(Config) ->
+    TopicMapping = ?config(topic_mapping, Config),
+    {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+    on_exit(fun() -> emqtt:stop(C) end),
+    {ok, _} = emqtt:connect(C),
+    lists:foreach(
+        fun(#{mqtt_topic := MQTTTopic}) ->
+            {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, _QoS = 2)
+        end,
+        TopicMapping
+    ),
+    ok.
+
+resource_id(Config) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(consumer_name, Config),
+    emqx_bridge_resource:resource_id(Type, Name).
+
+receive_published() ->
+    receive_published(#{}).
+
+receive_published(Opts0) ->
+    Default = #{n => 1, timeout => 20_000},
+    Opts = maps:merge(Default, Opts0),
+    receive_published(Opts, []).
+
+receive_published(#{n := N, timeout := _Timeout}, Acc) when N =< 0 ->
+    {ok, lists:reverse(Acc)};
+receive_published(#{n := N, timeout := Timeout} = Opts, Acc) ->
+    receive
+        {publish, Msg0 = #{payload := Payload}} ->
+            Msg =
+                case emqx_utils_json:safe_decode(Payload, [return_maps]) of
+                    {ok, Decoded} -> Msg0#{payload := Decoded};
+                    {error, _} -> Msg0
+                end,
+            receive_published(Opts#{n := N - 1}, [Msg | Acc])
+    after Timeout ->
+        {timeout, #{
+            msgs_so_far => Acc,
+            mailbox => process_info(self(), messages),
+            expected_remaining => N
+        }}
+    end.
+
+create_rule_and_action_http(Config) ->
+    ConsumerName = ?config(consumer_name, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, ConsumerName),
+    ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
+    Params = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"$bridges/", BridgeId/binary, "\"">>,
+        actions =>
+            [
+                #{
+                    <<"function">> => <<"republish">>,
+                    <<"args">> =>
+                        #{
+                            <<"topic">> => ?REPUBLISH_TOPIC,
+                            <<"payload">> => <<>>,
+                            <<"qos">> => 0,
+                            <<"retain">> => false,
+                            <<"user_properties">> => <<"${headers}">>
+                        }
+                },
+                #{<<"function">> => ActionFn}
+            ]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("rule action params: ~p", [Params]),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res = #{<<"id">> := RuleId}} ->
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error ->
+            Error
+    end.
+
+action_response(Selected, Envs, Args) ->
+    ?tp(action_response, #{
+        selected => Selected,
+        envs => Envs,
+        args => Args
+    }),
+    ok.
+
+assert_non_received_metrics(BridgeName) ->
+    Metrics = emqx_bridge:get_metrics(?BRIDGE_TYPE, BridgeName),
+    #{counters := Counters0, gauges := Gauges} = Metrics,
+    Counters = maps:remove(received, Counters0),
+    ?assert(lists:all(fun(V) -> V == 0 end, maps:values(Counters)), #{metrics => Metrics}),
+    ?assert(lists:all(fun(V) -> V == 0 end, maps:values(Gauges)), #{metrics => Metrics}),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Trace properties
+%%------------------------------------------------------------------------------
+
+prop_pulled_only_once(Trace) ->
+    PulledIds = ?projection(
+        message_id, ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace)
+    ),
+    NumPulled = length(PulledIds),
+    UniqueNumPulled = sets:size(sets:from_list(PulledIds, [{version, 2}])),
+    ?assertEqual(UniqueNumPulled, NumPulled),
+    ok.
+
+prop_all_pulled_are_acked(Trace) ->
+    PulledAckIds = ?projection(
+        ack_id, ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace)
+    ),
+    AckedIds0 = ?projection(ack_ids, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)),
+    AckedIds = lists:flatten(AckedIds0),
+    ?assertEqual(
+        sets:from_list(PulledAckIds, [{version, 2}]),
+        sets:from_list(AckedIds, [{version, 2}])
+    ),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_consume_ok(Config) ->
+    BridgeName = ?config(consumer_name, Config),
+    TopicMapping = ?config(topic_mapping, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            start_and_subscribe_mqtt(Config),
+            ?assertMatch(
+                {{ok, _}, {ok, _}},
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
+                    40_000
+                )
+            ),
+            [
+                #{
+                    pubsub_topic := Topic,
+                    mqtt_topic := MQTTTopic,
+                    qos := QoS
+                }
+            ] = TopicMapping,
+            Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Messages0 = [
+                #{
+                    <<"data">> => Data0 = #{<<"value">> => Payload0},
+                    <<"attributes">> => Attributes0 = #{<<"key">> => <<"value">>},
+                    <<"orderingKey">> => <<"some_ordering_key">>
+                }
+            ],
+            pubsub_publish(Config, Topic, Messages0),
+            {ok, Published0} = receive_published(),
+            EncodedData0 = emqx_utils_json:encode(Data0),
+            ?assertMatch(
+                [
+                    #{
+                        qos := QoS,
+                        topic := MQTTTopic,
+                        payload :=
+                            #{
+                                <<"attributes">> := Attributes0,
+                                <<"message_id">> := MsgId,
+                                <<"ordering_key">> := <<"some_ordering_key">>,
+                                <<"publish_time">> := PubTime,
+                                <<"topic">> := Topic,
+                                <<"value">> := EncodedData0
+                            }
+                    }
+                ] when is_binary(MsgId) andalso is_binary(PubTime),
+                Published0
+            ),
+            %% no need to check return value; we check the property in
+            %% the check phase.  this is just to give it a chance to do
+            %% so and avoid flakiness.  should be fast.
+            ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 1_000),
+            ?retry(
+                _Interval = 200,
+                _NAttempts = 20,
+                ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
+            ),
+
+            %% Batch with only data and only attributes
+            Payload1 = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Messages1 = [
+                #{<<"data">> => Data1 = #{<<"val">> => Payload1}},
+                #{<<"attributes">> => Attributes1 = #{<<"other_key">> => <<"other_value">>}}
+            ],
+            pubsub_publish(Config, Topic, Messages1),
+            {ok, Published1} = receive_published(#{n => 2}),
+            EncodedData1 = emqx_utils_json:encode(Data1),
+            ?assertMatch(
+                [
+                    #{
+                        qos := QoS,
+                        topic := MQTTTopic,
+                        payload :=
+                            #{
+                                <<"message_id">> := _,
+                                <<"publish_time">> := _,
+                                <<"topic">> := Topic,
+                                <<"value">> := EncodedData1
+                            }
+                    },
+                    #{
+                        qos := QoS,
+                        topic := MQTTTopic,
+                        payload :=
+                            #{
+                                <<"attributes">> := Attributes1,
+                                <<"message_id">> := _,
+                                <<"publish_time">> := _,
+                                <<"topic">> := Topic
+                            }
+                    }
+                ],
+                Published1
+            ),
+            ?assertNotMatch(
+                [
+                    #{payload := #{<<"attributes">> := _, <<"ordering_key">> := _}},
+                    #{payload := #{<<"value">> := _, <<"ordering_key">> := _}}
+                ],
+                Published1
+            ),
+            %% no need to check return value; we check the property in
+            %% the check phase.  this is just to give it a chance to do
+            %% so and avoid flakiness.  should be fast.
+            ?block_until(
+                #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged, ack_ids := [_, _]}, 1_000
+            ),
+            ?retry(
+                _Interval = 200,
+                _NAttempts = 20,
+                ?assertEqual(3, emqx_resource_metrics:received_get(ResourceId))
+            ),
+
+            %% Check that the bridge probe API doesn't leak atoms.
+            ProbeRes0 = probe_bridge_api(Config),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
+            AtomsBefore = erlang:system_info(atom_count),
+            %% Probe again; shouldn't have created more atoms.
+            ProbeRes1 = probe_bridge_api(Config),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
+            AtomsAfter = erlang:system_info(atom_count),
+            ?assertEqual(AtomsBefore, AtomsAfter),
+
+            assert_non_received_metrics(BridgeName),
+
+            ok
+        end,
+        [
+            {"all pulled ack ids are acked", fun ?MODULE:prop_all_pulled_are_acked/1},
+            {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}
+        ]
+    ),
+    ok.
+
+t_bridge_rule_action_source(Config) ->
+    BridgeName = ?config(consumer_name, Config),
+    TopicMapping = ?config(topic_mapping, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {{ok, _}, {ok, _}},
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
+                    40_000
+                )
+            ),
+            {ok, _} = create_rule_and_action_http(Config),
+
+            [#{pubsub_topic := PubSubTopic}] = TopicMapping,
+            {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+            on_exit(fun() -> emqtt:stop(C) end),
+            {ok, _} = emqtt:connect(C),
+            {ok, _, [0]} = emqtt:subscribe(C, ?REPUBLISH_TOPIC),
+
+            Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Messages0 = [
+                #{
+                    <<"data">> => Data0 = #{<<"payload">> => Payload0},
+                    <<"attributes">> => Attributes0 = #{<<"key">> => <<"value">>}
+                }
+            ],
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    pubsub_publish(Config, PubSubTopic, Messages0),
+                    #{?snk_kind := action_response},
+                    5_000
+                ),
+            Published0 = receive_published(),
+            EncodedData0 = emqx_utils_json:encode(Data0),
+            ?assertMatch(
+                {ok, [
+                    #{
+                        topic := ?REPUBLISH_TOPIC,
+                        qos := 0,
+                        payload := #{
+                            <<"event">> := <<"$bridges/", _/binary>>,
+                            <<"message_id">> := _,
+                            <<"metadata">> := #{<<"rule_id">> := _},
+                            <<"publish_time">> := _,
+                            <<"topic">> := PubSubTopic,
+                            <<"attributes">> := Attributes0,
+                            <<"value">> := EncodedData0
+                        }
+                    }
+                ]},
+                Published0
+            ),
+            ?retry(
+                _Interval = 200,
+                _NAttempts = 20,
+                ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
+            ),
+
+            assert_non_received_metrics(BridgeName),
+
+            #{payload => Payload0}
+        end,
+        [{"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}]
+    ),
+    ok.
+
+%% TODO TEST:
+%%   * multi-topic mapping
+%%   * get status
+%%   * 2+ pull workers do not duplicate delivered messages
+%%   * inexistent topic
+%%   * connection cut then restored
+%%   * pull worker death
+%%   * async worker death mid-pull
+%%   * ensure subscription creation error
+%%   * cluster subscription
+%%   * connection down during pull
+%%   * connection down during ack
+%%   * topic deleted while consumer is running
+%%   * subscription deleted while consumer is running

+ 7 - 31
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -74,6 +74,7 @@ init_per_suite(Config) ->
     ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
     {ok, _} = application:ensure_all_started(emqx_connector),
     emqx_mgmt_api_test_util:init_suite(),
+    persistent_term:put({emqx_bridge_gcp_pubsub_connector, transport}, tls),
     Config.
 
 end_per_suite(_Config) ->
@@ -81,6 +82,7 @@ end_per_suite(_Config) ->
     ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
     _ = application:stop(emqx_connector),
+    persistent_term:erase({emqx_bridge_gcp_pubsub_connector, transport}),
     ok.
 
 init_per_group(sync_query, Config) ->
@@ -276,7 +278,7 @@ gcp_pubsub_config(Config) ->
     PayloadTemplate = proplists:get_value(payload_template, Config, ""),
     PubSubTopic = proplists:get_value(pubsub_topic, Config, <<"mytopic">>),
     PipelineSize = proplists:get_value(pipeline_size, Config, 100),
-    ServiceAccountJSON = proplists:get_value(pubsub_topic, Config, generate_service_account_json()),
+    ServiceAccountJSON = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
     ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
     GUID = emqx_guid:to_hexstr(emqx_guid:gen()),
     Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
@@ -324,32 +326,6 @@ parse_and_check(ConfigString, Name) ->
     #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
     Config.
 
-generate_service_account_json() ->
-    PrivateKeyPEM = generate_private_key_pem(),
-    service_account_json(PrivateKeyPEM).
-
-generate_private_key_pem() ->
-    PublicExponent = 65537,
-    Size = 2048,
-    Key = public_key:generate_key({rsa, Size, PublicExponent}),
-    DERKey = public_key:der_encode('PrivateKeyInfo', Key),
-    public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
-
-service_account_json(PrivateKeyPEM) ->
-    #{
-        <<"type">> => <<"service_account">>,
-        <<"project_id">> => <<"myproject">>,
-        <<"private_key_id">> => <<"kid">>,
-        <<"private_key">> => PrivateKeyPEM,
-        <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>,
-        <<"client_id">> => <<"123812831923812319190">>,
-        <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>,
-        <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>,
-        <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>,
-        <<"client_x509_cert_url">> =>
-            <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">>
-    }.
-
 metrics_mapping() ->
     #{
         dropped => fun emqx_resource_metrics:dropped_get/1,
@@ -1019,7 +995,7 @@ t_publish_timeout(Config) ->
         <<"pipelining">> => 1,
         <<"resource_opts">> => #{
             <<"batch_size">> => 1,
-            <<"resume_interval">> => <<"15s">>
+            <<"resume_interval">> => <<"1s">>
         }
     }),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@@ -1085,8 +1061,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                     emqx:publish(Message),
                     {ok, _} = snabbkaffe:block_until(
                         ?match_n_events(2, #{
-                            ?snk_kind := gcp_pubsub_response,
-                            query_mode := async
+                            ?snk_kind := handle_async_reply_expired,
+                            expired := [_]
                         }),
                         _Timeout1 = 15_000
                     )
@@ -1107,7 +1083,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 timeout ->
                     ?assertMatch(
                         [_, _ | _],
-                        ?of_kind(gcp_pubsub_response, Trace)
+                        ?of_kind(handle_async_reply_expired, Trace)
                     )
             end,
             ok

+ 34 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_utils.erl

@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_utils).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+generate_service_account_json() ->
+    PrivateKeyPEM = generate_private_key_pem(),
+    service_account_json(PrivateKeyPEM).
+
+generate_private_key_pem() ->
+    PublicExponent = 65537,
+    Size = 2048,
+    Key = public_key:generate_key({rsa, Size, PublicExponent}),
+    DERKey = public_key:der_encode('PrivateKeyInfo', Key),
+    public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
+
+service_account_json(PrivateKeyPEM) ->
+    #{
+        <<"type">> => <<"service_account">>,
+        <<"project_id">> => <<"myproject">>,
+        <<"private_key_id">> => <<"kid">>,
+        <<"private_key">> => PrivateKeyPEM,
+        <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>,
+        <<"client_id">> => <<"123812831923812319190">>,
+        <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>,
+        <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>,
+        <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>,
+        <<"client_x509_cert_url">> =>
+            <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">>
+    }.

+ 11 - 1
apps/emqx_utils/src/emqx_utils_maps.erl

@@ -32,7 +32,8 @@
     deep_convert/3,
     diff_maps/2,
     best_effort_recursive_sum/3,
-    if_only_to_toggle_enable/2
+    if_only_to_toggle_enable/2,
+    update_if_present/3
 ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -293,3 +294,12 @@ if_only_to_toggle_enable(OldConf, Conf) ->
         {_, _, _} ->
             false
     end.
+
+%% Like `maps:update_with', but does nothing if key does not exist.
+update_if_present(Key, Fun, Map) ->
+    case Map of
+        #{Key := Val} ->
+            Map#{Key := Fun(Val)};
+        _ ->
+            Map
+    end.

+ 1 - 0
changes/ee/feat-11090.en.md

@@ -0,0 +1 @@
+Implemented GCP PubSub Consumer data integration bridge.

+ 1 - 0
changes/ee/fix-11090.en.md

@@ -0,0 +1 @@
+Fixed a configuration that prevented the pipelining option from being correctly set for GCP PubSub Producer bridge.

+ 25 - 10
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -16,7 +16,8 @@ api_schemas(Method) ->
     [
         %% We need to map the `type' field of a request (binary) to a
         %% bridge schema module.
-        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method),
+        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method ++ "_producer"),
+        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub_consumer">>, Method ++ "_consumer"),
         api_ref(emqx_bridge_kafka, <<"kafka_consumer">>, Method ++ "_consumer"),
         %% TODO: rename this to `kafka_producer' after alias support is added
         %% to hocon; keeping this as just `kafka' for backwards compatibility.
@@ -92,6 +93,7 @@ resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
 resource_type(cassandra) -> emqx_bridge_cassandra_connector;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer;
+resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer;
 resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
 resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
 resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
@@ -125,14 +127,6 @@ fields(bridges) ->
                     required => false
                 }
             )},
-        {gcp_pubsub,
-            mk(
-                hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config")),
-                #{
-                    desc => <<"EMQX Enterprise Config">>,
-                    required => false
-                }
-            )},
         {mysql,
             mk(
                 hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
@@ -198,7 +192,8 @@ fields(bridges) ->
                     required => false
                 }
             )}
-    ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
+    ] ++ kafka_structs() ++ pulsar_structs() ++ gcp_pubsub_structs() ++ mongodb_structs() ++
+        influxdb_structs() ++
         redis_structs() ++
         pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs().
 
@@ -249,6 +244,26 @@ pulsar_structs() ->
             )}
     ].
 
+gcp_pubsub_structs() ->
+    [
+        {gcp_pubsub,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config_producer")),
+                #{
+                    desc => <<"EMQX Enterprise Config">>,
+                    required => false
+                }
+            )},
+        {gcp_pubsub_consumer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config_consumer")),
+                #{
+                    desc => <<"EMQX Enterprise Config">>,
+                    required => false
+                }
+            )}
+    ].
+
 influxdb_structs() ->
     [
         {Protocol,

+ 1 - 1
mix.exs

@@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do
       {:redbug, "2.0.8"},
       {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
       {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true},
+      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.11", override: true},
       {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {gpb, "4.19.7"}
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
     , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
-    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}}
+    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.11"}}}
     , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}

+ 48 - 0
rel/i18n/emqx_bridge_gcp_pubsub.hocon

@@ -71,4 +71,52 @@ When a GCP Service Account is created (as described in https://developers.google
 service_account_json.label:
 """GCP Service Account Credentials"""
 
+  consumer_opts {
+    desc: "Local MQTT publish and GCP PubSub consumer configs."
+    label: "GCP PubSub to MQTT"
+  }
+
+  consumer_pull_max_messages {
+    desc: "The maximum number of messages to retrieve from GCP PubSub in a single pull request."
+          " The actual number may be less than the specified value."
+    label: "Maximum Messages to Pull"
+  }
+
+  consumer_topic_mapping {
+    desc: "Defines the mapping between GCP PubSub topics and MQTT topics. Must contain at least one item."
+    label: "Topic Mapping"
+  }
+
+  consumer_pubsub_topic {
+    desc: "GCP PubSub topic to consume from."
+    label: "GCP PubSub"
+  }
+
+  consumer_mqtt_topic {
+    desc: "Local topic to which consumed GCP PubSub messages should be published to."
+    label: "MQTT Topic"
+  }
+
+  consumer_mqtt_qos {
+    desc: "MQTT QoS used to publish messages consumed from GCP PubSub."
+    label: "QoS"
+  }
+
+consumer_mqtt_payload.desc:
+"""The template for transforming the incoming GCP PubSub message.  By default, it will use JSON format to serialize inputs from the GCP PubSub message.  Available fields are:
+<code>message_id</code>: the message ID assigned by GCP PubSub.
+<code>publish_time</code>: message timestamp assigned by GCP PubSub.
+<code>topic</code>: GCP PubSub topic.
+<code>value</code>: the payload of the GCP PubSub message.  Omitted if there's no payload.
+<code>attributes</code>: an object containing string key-value pairs.  Omitted if there are no attributes.
+<code>ordering_key</code>: GCP PubSub message ordering key.  Omitted if there's none."""
+
+consumer_mqtt_payload.label:
+"Payload Template"
+
+  consumer {
+    desc: "GCP PubSub Consumer configuration."
+    label: "GCP PubSub Consumer"
+  }
+
 }

+ 3 - 0
scripts/ct/run.sh

@@ -213,6 +213,9 @@ for dep in ${CT_DEPS}; do
             FILES+=( '.ci/docker-compose-file/docker-compose-minio-tcp.yaml'
                      '.ci/docker-compose-file/docker-compose-minio-tls.yaml' )
             ;;
+        gcp_emulator)
+            FILES+=( '.ci/docker-compose-file/docker-compose-gcp-emulator.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1