Browse Source

Merge pull request #11090 from thalesmg/gcp-pubsub-consumer

feat(gcp_pubsub_consumer): implement GCP PubSub Consumer bridge
Thales Macedo Garitezi 2 years ago
parent
commit
7ef03d9e1f

+ 23 - 0
.ci/docker-compose-file/docker-compose-gcp-emulator.yaml

@@ -0,0 +1,23 @@
+version: '3.9'
+
+services:
+  gcp_emulator:
+    container_name: gcp_emulator
+    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:435.0.1-emulators
+    restart: always
+    expose:  # container-network only; the host port mapping below is left disabled
+      - "8085"
+    # ports:
+    #   - "8085:8085"
+    networks:
+      - emqx_bridge
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8085"]  # probes the emulator's HTTP port
+      interval: 30s
+      timeout: 5s
+      retries: 4
+    command:
+      - bash
+      - "-c"
+      - |
+        gcloud beta emulators pubsub start --project=emqx-pubsub --host-port=0.0.0.0:8085 --impersonate-service-account test@emqx.iam.gserviceaccount.com

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -149,5 +149,11 @@
     "listen": "0.0.0.0:19100",
     "upstream": "minio-tls:9100",
     "enabled": true
+  },
+  {
+    "name": "gcp_emulator",
+    "listen": "0.0.0.0:8085",
+    "upstream": "gcp_emulator:8085",
+    "enabled": true
   }
 ]

File diff suppressed because it is too large
+ 1 - 1
apps/emqx/test/emqx_static_checks_data/5.0.bpapi


+ 3 - 1
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -54,7 +54,9 @@
     (TYPE) =:= <<"mqtt">>
 ).
 -define(IS_INGRESS_BRIDGE(TYPE),
-    (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
+    (TYPE) =:= <<"kafka_consumer">> orelse
+        (TYPE) =:= <<"gcp_pubsub_consumer">> orelse
+        ?IS_BI_DIR_BRIDGE(TYPE)
 ).
 
 -if(?EMQX_RELEASE_EDITION == ee).

+ 2 - 0
apps/emqx_bridge_gcp_pubsub/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+gcp_emulator

+ 22 - 5
apps/emqx_bridge_gcp_pubsub/rebar.config

@@ -1,9 +1,26 @@
 %% -*- mode: erlang; -*-
-{erl_opts, [debug_info]}.
-{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
-       , {emqx_resource, {path, "../../apps/emqx_resource"}}
-       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
-       ]}.
+{erl_opts, [
+    warn_unused_vars,
+    warn_shadow_vars,
+    warn_unused_import,
+    warn_obsolete_guard,
+    warnings_as_errors,
+    debug_info
+]}.
+{deps, [
+    {emqx_connector, {path, "../../apps/emqx_connector"}},
+    {emqx_resource, {path, "../../apps/emqx_resource"}},
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+]}.
+
+{xref_checks, [
+    undefined_function_calls,
+    undefined_functions,
+    locals_not_used,
+    deprecated_function_calls,
+    warnings_as_errors,
+    deprecated_functions
+]}.
 
 {shell, [
     {apps, [emqx_bridge_gcp_pubsub]}

+ 181 - 23
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl

@@ -7,7 +7,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1]).
 
 %% hocon_schema API
 -export([
@@ -39,11 +39,22 @@ namespace() ->
 roots() ->
     [].
 
-fields("config") ->
+fields("config_producer") ->
     emqx_bridge_schema:common_bridge_fields() ++
         emqx_resource_schema:fields("resource_opts") ++
-        fields(bridge_config);
-fields(bridge_config) ->
+        fields(connector_config) ++ fields(producer);
+fields("config_consumer") ->
+    emqx_bridge_schema:common_bridge_fields() ++
+        [
+            {resource_opts,
+                mk(
+                    ref("consumer_resource_opts"),
+                    #{required => true, desc => ?DESC(emqx_resource_schema, "creation_opts")}
+                )}
+        ] ++
+        fields(connector_config) ++
+        [{consumer, mk(ref(consumer), #{required => true, desc => ?DESC(consumer_opts)})}];
+fields(connector_config) ->
     [
         {connect_timeout,
             sc(
@@ -88,6 +99,20 @@ fields(bridge_config) ->
                     desc => ?DESC("request_timeout")
                 }
             )},
+        {service_account_json,
+            sc(
+                service_account_json(),
+                #{
+                    required => true,
+                    validator => fun ?MODULE:service_account_json_validator/1,
+                    converter => fun ?MODULE:service_account_json_converter/1,
+                    sensitive => true,
+                    desc => ?DESC("service_account_json")
+                }
+            )}
+    ];
+fields(producer) ->
+    [
         {payload_template,
             sc(
                 binary(),
@@ -110,28 +135,88 @@ fields(bridge_config) ->
                     required => true,
                     desc => ?DESC("pubsub_topic")
                 }
+            )}
+    ];
+fields(consumer) ->
+    [
+        {ack_retry_interval,
+            mk(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"5s">>,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )},
-        {service_account_json,
-            sc(
-                service_account_json(),
+        {pull_max_messages,
+            mk(
+                pos_integer(),
+                #{default => 100, desc => ?DESC("consumer_pull_max_messages")}
+            )},
+        {consumer_workers_per_topic,
+            mk(
+                pos_integer(),
+                #{
+                    default => 1,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {topic_mapping,
+            mk(
+                hoconsc:array(ref(consumer_topic_mapping)),
                 #{
                     required => true,
-                    validator => fun ?MODULE:service_account_json_validator/1,
-                    converter => fun ?MODULE:service_account_json_converter/1,
-                    sensitive => true,
-                    desc => ?DESC("service_account_json")
+                    validator => fun consumer_topic_mapping_validator/1,
+                    desc => ?DESC("consumer_topic_mapping")
                 }
             )}
     ];
-fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post");
-fields("post") ->
-    [type_field(), name_field() | fields("config")];
-fields("put") ->
-    fields("config").
-
-desc("config") ->
+fields(consumer_topic_mapping) ->
+    [
+        {pubsub_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_pubsub_topic)})},
+        {mqtt_topic, mk(binary(), #{required => true, desc => ?DESC(consumer_mqtt_topic)})},
+        {qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
+        {payload_template,
+            mk(
+                string(),
+                #{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
+            )}
+    ];
+fields("consumer_resource_opts") ->
+    ResourceFields = emqx_resource_schema:fields("creation_opts"),
+    SupportedFields = [
+        auto_restart_interval,
+        health_check_interval,
+        request_ttl,
+        resume_interval,
+        worker_pool_size
+    ],
+    lists:filter(
+        fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end,
+        ResourceFields
+    );
+fields("get_producer") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_producer");
+fields("post_producer") ->
+    [type_field_producer(), name_field() | fields("config_producer")];
+fields("put_producer") ->
+    fields("config_producer");
+fields("get_consumer") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_consumer");
+fields("post_consumer") ->
+    [type_field_consumer(), name_field() | fields("config_consumer")];
+fields("put_consumer") ->
+    fields("config_consumer").
+
+desc("config_producer") ->
+    ?DESC("desc_config");
+desc("config_consumer") ->
     ?DESC("desc_config");
+desc("consumer_resource_opts") ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc(consumer_topic_mapping) ->
+    ?DESC("consumer_topic_mapping");
+desc(consumer) ->
+    ?DESC("consumer");
 desc(_) ->
     undefined.
 
@@ -139,13 +224,19 @@ conn_bridge_examples(Method) ->
     [
         #{
             <<"gcp_pubsub">> => #{
-                summary => <<"GCP PubSub Bridge">>,
-                value => values(Method)
+                summary => <<"GCP PubSub Producer Bridge">>,
+                value => values(producer, Method)
+            }
+        },
+        #{
+            <<"gcp_pubsub_consumer">> => #{
+                summary => <<"GCP PubSub Consumer Bridge">>,
+                value => values(consumer, Method)
             }
         }
     ].
 
-values(_Method) ->
+values(producer, _Method) ->
     #{
         pubsub_topic => <<"mytopic">>,
         service_account_json =>
@@ -173,17 +264,71 @@ values(_Method) ->
                     <<"https://oauth2.googleapis.com/token">>,
                 type => <<"service_account">>
             }
+    };
+values(consumer, _Method) ->
+    #{
+        connect_timeout => <<"15s">>,
+        consumer =>
+            #{
+                pull_max_messages => 100,
+                topic_mapping => [
+                    #{
+                        pubsub_topic => <<"pubsub-topic-1">>,
+                        mqtt_topic => <<"mqtt/topic/1">>,
+                        qos => 1,
+                        payload_template => <<"${.}">>
+                    },
+                    #{
+                        pubsub_topic => <<"pubsub-topic-2">>,
+                        mqtt_topic => <<"mqtt/topic/2">>,
+                        qos => 2,
+                        payload_template =>
+                            <<"v = ${.value}, a = ${.attributes}, o = ${.ordering_key}">>
+                    }
+                ]
+            },
+        resource_opts => #{request_ttl => <<"20s">>},
+        service_account_json =>
+            #{
+                auth_provider_x509_cert_url =>
+                    <<"https://www.googleapis.com/oauth2/v1/certs">>,
+                auth_uri =>
+                    <<"https://accounts.google.com/o/oauth2/auth">>,
+                client_email =>
+                    <<"test@myproject.iam.gserviceaccount.com">>,
+                client_id => <<"123812831923812319190">>,
+                client_x509_cert_url =>
+                    <<
+                        "https://www.googleapis.com/robot/v1/"
+                        "metadata/x509/test%40myproject.iam.gserviceaccount.com"
+                    >>,
+                private_key =>
+                    <<
+                        "-----BEGIN PRIVATE KEY-----\n"
+                        "MIIEvQI..."
+                    >>,
+                private_key_id => <<"kid">>,
+                project_id => <<"myproject">>,
+                token_uri =>
+                    <<"https://oauth2.googleapis.com/token">>,
+                type => <<"service_account">>
+            }
     }.
 
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
 
+ref(Name) -> hoconsc:ref(?MODULE, Name).  % shorthand: reference struct Name as defined by this module
+
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 
-type_field() ->
+type_field_producer() ->
     {type, mk(enum([gcp_pubsub]), #{required => true, desc => ?DESC("desc_type")})}.
 
+type_field_consumer() ->
+    {type, mk(enum([gcp_pubsub_consumer]), #{required => true, desc => ?DESC("desc_type")})}.
+
 name_field() ->
     {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
 
@@ -225,3 +370,16 @@ service_account_json_converter(Map) when is_map(Map) ->
     maps:with(ExpectedKeys, Map);
 service_account_json_converter(Val) ->
     Val.
+
+consumer_topic_mapping_validator(_TopicMapping = []) ->  % validator for the consumer `topic_mapping' field
+    {error, "There must be at least one GCP PubSub-MQTT topic mapping"};
+consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+    NumEntries = length(TopicMapping),
+    PubSubTopics = [KT || #{<<"pubsub_topic">> := KT} <- TopicMapping],  % entries without the binary key are skipped
+    DistinctPubSubTopics = length(lists:usort(PubSubTopics)),  % usort/1 sorts and removes duplicates
+    case DistinctPubSubTopics =:= NumEntries of  % mismatch: a topic is repeated (or an entry had no pubsub_topic)
+        true ->
+            ok;
+        false ->
+            {error, "GCP PubSub topics must not be repeated in a bridge"}
+    end.

+ 157 - 256
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl

@@ -2,9 +2,7 @@
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_gcp_pubsub_connector).
-
--behaviour(emqx_resource).
+-module(emqx_bridge_gcp_pubsub_client).
 
 -include_lib("jose/include/jose_jwk.hrl").
 -include_lib("emqx_connector/include/emqx_connector_tables.hrl").
@@ -13,74 +11,79 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
-%% `emqx_resource' API
+%% API
 -export([
-    callback_mode/0,
-    on_start/2,
-    on_stop/2,
-    on_query/3,
-    on_query_async/4,
-    on_batch_query/3,
-    on_batch_query_async/4,
-    on_get_status/2
+    start/2,
+    stop/1,
+    query_sync/2,
+    query_async/3,
+    get_status/1
 ]).
 -export([reply_delegator/3]).
 
+-export([get_topic/2]).
+
+-export([get_jwt_authorization_header/1]).
+
 -type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json().
+-type project_id() :: binary().
 -type config() :: #{
     connect_timeout := emqx_schema:duration_ms(),
     max_retries := non_neg_integer(),
-    pubsub_topic := binary(),
     resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
     service_account_json := service_account_json(),
     any() => term()
 }.
--type state() :: #{
+-opaque state() :: #{
     connect_timeout := timer:time(),
     jwt_config := emqx_connector_jwt:jwt_config(),
     max_retries := non_neg_integer(),
-    payload_template := emqx_placeholder:tmpl_token(),
     pool_name := binary(),
-    project_id := binary(),
-    pubsub_topic := binary(),
+    project_id := project_id(),
     request_ttl := infinity | timer:time()
 }.
 -type headers() :: [{binary(), iodata()}].
 -type body() :: iodata().
 -type status_code() :: 100..599.
+-type method() :: get | post | put | patch.
+-type path() :: binary().
+-type prepared_request() :: {method(), path(), body()}.
+-type topic() :: binary().
+
+-export_type([
+    service_account_json/0,
+    state/0,
+    headers/0,
+    body/0,
+    status_code/0,
+    project_id/0,
+    topic/0
+]).
 
 -define(DEFAULT_PIPELINE_SIZE, 100).
 
 %%-------------------------------------------------------------------------------------------------
-%% emqx_resource API
+%% API
 %%-------------------------------------------------------------------------------------------------
 
-callback_mode() -> async_if_possible.
-
--spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
-on_start(
+-spec start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+start(
     ResourceId,
     #{
         connect_timeout := ConnectTimeout,
         max_retries := MaxRetries,
-        payload_template := PayloadTemplate,
         pool_size := PoolSize,
-        pubsub_topic := PubSubTopic,
         resource_opts := #{request_ttl := RequestTTL}
     } = Config
 ) ->
-    ?SLOG(info, #{
-        msg => "starting_gcp_pubsub_bridge",
-        connector => ResourceId,
-        config => Config
-    }),
-    %% emulating the emulator behavior
-    %% https://cloud.google.com/pubsub/docs/emulator
-    HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"),
+    {Transport, HostPort} = get_transport(),
     #{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
     PoolType = random,
-    Transport = tls,
-    TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),
+    TransportOpts =
+        case Transport of
+            tls -> emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none});
+            tcp -> []
+        end,
     NTransportOpts = emqx_utils:ipv6_probe(TransportOpts),
     PoolOpts = [
         {host, Host},
@@ -91,7 +94,7 @@ on_start(
         {pool_size, PoolSize},
         {transport, Transport},
         {transport_opts, NTransportOpts},
-        {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
+        {enable_pipelining, maps:get(pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
     ],
     #{
         jwt_config := JWTConfig,
@@ -101,10 +104,8 @@ on_start(
         connect_timeout => ConnectTimeout,
         jwt_config => JWTConfig,
         max_retries => MaxRetries,
-        payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
         pool_name => ResourceId,
         project_id => ProjectId,
-        pubsub_topic => PubSubTopic,
         request_ttl => RequestTTL
     },
     ?tp(
@@ -130,8 +131,8 @@ on_start(
             {error, Reason}
     end.
 
--spec on_stop(resource_id(), state()) -> ok | {error, term()}.
-on_stop(ResourceId, _State) ->
+-spec stop(resource_id()) -> ok | {error, term()}.
+stop(ResourceId) ->
     ?tp(gcp_pubsub_stop, #{resource_id => ResourceId}),
     ?SLOG(info, #{
         msg => "stopping_gcp_pubsub_bridge",
@@ -147,73 +148,41 @@ on_stop(ResourceId, _State) ->
             Error
     end.
 
--spec on_query(
-    resource_id(),
-    {send_message, map()},
+-spec query_sync(
+    {prepared_request, prepared_request()},
     state()
 ) ->
-    {ok, status_code(), headers()}
-    | {ok, status_code(), headers(), body()}
-    | {error, {recoverable_error, term()}}
-    | {error, term()}.
-on_query(ResourceId, {send_message, Selected}, State) ->
-    Requests = [{send_message, Selected}],
+    {ok, map()} | {error, {recoverable_error, term()} | term()}.
+query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) ->
+    PoolName = maps:get(pool_name, State),
     ?TRACE(
         "QUERY_SYNC",
         "gcp_pubsub_received",
-        #{requests => Requests, connector => ResourceId, state => State}
+        #{requests => PreparedRequest, connector => PoolName, state => State}
     ),
-    do_send_requests_sync(State, Requests, ResourceId).
+    do_send_requests_sync(State, {prepared_request, PreparedRequest}).
 
--spec on_query_async(
-    resource_id(),
-    {send_message, map()},
+-spec query_async(
+    {prepared_request, prepared_request()},
     {ReplyFun :: function(), Args :: list()},
     state()
 ) -> {ok, pid()}.
-on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
-    Requests = [{send_message, Selected}],
-    ?TRACE(
-        "QUERY_ASYNC",
-        "gcp_pubsub_received",
-        #{requests => Requests, connector => ResourceId, state => State}
-    ),
-    do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
-
--spec on_batch_query(
-    resource_id(),
-    [{send_message, map()}],
-    state()
+query_async(
+    {prepared_request, PreparedRequest = {_Method, _Path, _Body}},
+    ReplyFunAndArgs,
+    State
 ) ->
-    {ok, status_code(), headers()}
-    | {ok, status_code(), headers(), body()}
-    | {error, {recoverable_error, term()}}
-    | {error, term()}.
-on_batch_query(ResourceId, Requests, State) ->
-    ?TRACE(
-        "QUERY_SYNC",
-        "gcp_pubsub_received",
-        #{requests => Requests, connector => ResourceId, state => State}
-    ),
-    do_send_requests_sync(State, Requests, ResourceId).
-
--spec on_batch_query_async(
-    resource_id(),
-    [{send_message, map()}],
-    {ReplyFun :: function(), Args :: list()},
-    state()
-) -> {ok, pid()}.
-on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
+    PoolName = maps:get(pool_name, State),
     ?TRACE(
         "QUERY_ASYNC",
         "gcp_pubsub_received",
-        #{requests => Requests, connector => ResourceId, state => State}
+        #{requests => PreparedRequest, connector => PoolName, state => State}
     ),
-    do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId).
+    do_send_requests_async(State, {prepared_request, PreparedRequest}, ReplyFunAndArgs).
 
--spec on_get_status(resource_id(), state()) -> connected | disconnected.
-on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
-    case do_get_status(ResourceId, Timeout) of
+-spec get_status(state()) -> connected | disconnected.
+get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) ->
+    case do_get_status(PoolName, Timeout) of
         true ->
             connected;
         false ->
@@ -224,6 +193,19 @@ on_get_status(ResourceId, #{connect_timeout := Timeout} = State) ->
             disconnected
     end.
 
+%%-------------------------------------------------------------------------------------------------
+%% Convenience API (prepared requests built on top of query_sync/2)
+%%-------------------------------------------------------------------------------------------------
+
+-spec get_topic(topic(), state()) -> {ok, map()} | {error, term()}.
+get_topic(Topic, ConnectorState) ->  % synchronous GET of a topic resource in the client's project
+    #{project_id := ProjectId} = ConnectorState,
+    Method = get,
+    Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,  % Topic is spliced in verbatim (no URL-encoding here)
+    Body = <<>>,  % with `get' + empty body, to_ehttpc_request/4 builds a body-less {Path, Headers} request
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    query_sync(PreparedRequest, ConnectorState).
+
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
@@ -286,28 +268,6 @@ parse_jwt_config(ResourceId, #{
         project_id => ProjectId
     }.
 
--spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
-encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
-    Interpolated =
-        case PayloadTemplate of
-            [] -> emqx_utils_json:encode(Selected);
-            _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
-        end,
-    #{data => base64:encode(Interpolated)}.
-
--spec to_pubsub_request([#{data := binary()}]) -> binary().
-to_pubsub_request(Payloads) ->
-    emqx_utils_json:encode(#{messages => Payloads}).
-
--spec publish_path(state()) -> binary().
-publish_path(
-    _State = #{
-        project_id := ProjectId,
-        pubsub_topic := PubSubTopic
-    }
-) ->
-    <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
-
 -spec get_jwt_authorization_header(emqx_connector_jwt:jwt_config()) -> [{binary(), binary()}].
 get_jwt_authorization_header(JWTConfig) ->
     JWT = emqx_connector_jwt:ensure_jwt(JWTConfig),
@@ -315,16 +275,11 @@ get_jwt_authorization_header(JWTConfig) ->
 
 -spec do_send_requests_sync(
     state(),
-    [{send_message, map()}],
-    resource_id()
+    {prepared_request, prepared_request()}
 ) ->
-    {ok, status_code(), headers()}
-    | {ok, status_code(), headers(), body()}
-    | {error, {recoverable_error, term()}}
-    | {error, term()}.
-do_send_requests_sync(State, Requests, ResourceId) ->
+    {ok, map()} | {error, {recoverable_error, term()} | term()}.
+do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) ->
     #{
-        jwt_config := JWTConfig,
         pool_name := PoolName,
         max_retries := MaxRetries,
         request_ttl := RequestTTL
@@ -332,192 +287,125 @@ do_send_requests_sync(State, Requests, ResourceId) ->
     ?tp(
         gcp_pubsub_bridge_do_send_requests,
         #{
+            request => {prepared_request, {Method, Path, Body}},
             query_mode => sync,
-            resource_id => ResourceId,
-            requests => Requests
+            resource_id => PoolName
         }
     ),
+    Request = to_ehttpc_request(State, Method, Path, Body),
+    Response = ehttpc:request(
+        PoolName,
+        Method,
+        Request,
+        RequestTTL,
+        MaxRetries
+    ),
+    handle_response(Response, PoolName, _QueryMode = sync).
+
+-spec do_send_requests_async(
+    state(),
+    {prepared_request, prepared_request()},
+    {ReplyFun :: function(), Args :: list()}
+) -> {ok, pid()}.  % the pid is the picked ehttpc worker; the HTTP result arrives via ReplyFun
+do_send_requests_async(
+    State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs
+) ->
+    #{
+        pool_name := PoolName,
+        request_ttl := RequestTTL
+    } = State,
+    ?tp(
+        gcp_pubsub_bridge_do_send_requests,
+        #{
+            request => {prepared_request, {Method, Path, Body}},
+            query_mode => async,
+            resource_id => PoolName  % the pool name doubles as the resource id in trace events
+        }
+    ),
+    Request = to_ehttpc_request(State, Method, Path, Body),  % attaches the JWT authorization header
+    Worker = ehttpc_pool:pick_worker(PoolName),
+    ok = ehttpc:request_async(  % assertive match: any return other than `ok' crashes the caller
+        Worker,
+        Method,
+        Request,
+        RequestTTL,
+        {fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]}  % reply_delegator/3 runs handle_response/3 before ReplyFun
+    ),
+    {ok, Worker}.
+
+to_ehttpc_request(State, Method, Path, Body) ->
+    #{jwt_config := JWTConfig} = State,
     Headers = get_jwt_authorization_header(JWTConfig),
-    Payloads =
-        lists:map(
-            fun({send_message, Selected}) ->
-                encode_payload(State, Selected)
-            end,
-            Requests
-        ),
-    Body = to_pubsub_request(Payloads),
-    Path = publish_path(State),
-    Method = post,
-    Request = {Path, Headers, Body},
-    case
-        ehttpc:request(
-            PoolName,
-            Method,
-            Request,
-            RequestTTL,
-            MaxRetries
-        )
-    of
-        {error, Reason} when
-            Reason =:= econnrefused;
-            %% this comes directly from `gun'...
-            Reason =:= {closed, "The connection was lost."};
-            Reason =:= timeout
-        ->
-            ?tp(
-                warning,
-                gcp_pubsub_request_failed,
-                #{
-                    reason => Reason,
-                    query_mode => sync,
-                    recoverable_error => true,
-                    connector => ResourceId
-                }
-            ),
-            {error, {recoverable_error, Reason}};
-        {error, Reason} = Result ->
+    case {Method, Body} of
+        {get, <<>>} -> {Path, Headers};
+        _ -> {Path, Headers, Body}
+    end.
+
+-spec handle_response(term(), resource_id(), sync | async) -> {ok, map()} | {error, term()}.
+handle_response(Result, ResourceId, QueryMode) ->
+    case Result of
+        {error, Reason} ->
             ?tp(
-                error,
                 gcp_pubsub_request_failed,
                 #{
                     reason => Reason,
-                    query_mode => sync,
-                    recoverable_error => false,
+                    query_mode => QueryMode,
                     connector => ResourceId
                 }
             ),
-            Result;
-        {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
+            {error, Reason};
+        {ok, StatusCode, RespHeaders} when StatusCode >= 200 andalso StatusCode < 300 ->
             ?tp(
                 gcp_pubsub_response,
                 #{
                     response => Result,
-                    query_mode => sync,
+                    query_mode => QueryMode,
                     connector => ResourceId
                 }
             ),
-            Result;
-        {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
+            {ok, #{status_code => StatusCode, headers => RespHeaders}};
+        {ok, StatusCode, RespHeaders, RespBody} when
+            StatusCode >= 200 andalso StatusCode < 300
+        ->
             ?tp(
                 gcp_pubsub_response,
                 #{
                     response => Result,
-                    query_mode => sync,
+                    query_mode => QueryMode,
                     connector => ResourceId
                 }
             ),
-            Result;
+            {ok, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}};
         {ok, StatusCode, RespHeaders} = _Result ->
             ?tp(
                 gcp_pubsub_response,
                 #{
                     response => _Result,
-                    query_mode => sync,
+                    query_mode => QueryMode,
                     connector => ResourceId
                 }
             ),
-            ?SLOG(error, #{
-                msg => "gcp_pubsub_error_response",
-                request => Request,
-                connector => ResourceId,
-                status_code => StatusCode
-            }),
             {error, #{status_code => StatusCode, headers => RespHeaders}};
         {ok, StatusCode, RespHeaders, RespBody} = _Result ->
             ?tp(
                 gcp_pubsub_response,
                 #{
                     response => _Result,
-                    query_mode => sync,
+                    query_mode => QueryMode,
                     connector => ResourceId
                 }
             ),
-            ?SLOG(error, #{
-                msg => "gcp_pubsub_error_response",
-                request => Request,
-                connector => ResourceId,
-                status_code => StatusCode
-            }),
             {error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}}
     end.
 
--spec do_send_requests_async(
-    state(),
-    [{send_message, map()}],
-    {ReplyFun :: function(), Args :: list()},
-    resource_id()
-) -> {ok, pid()}.
-do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
-    #{
-        jwt_config := JWTConfig,
-        pool_name := PoolName,
-        request_ttl := RequestTTL
-    } = State,
-    ?tp(
-        gcp_pubsub_bridge_do_send_requests,
-        #{
-            query_mode => async,
-            resource_id => ResourceId,
-            requests => Requests
-        }
-    ),
-    Headers = get_jwt_authorization_header(JWTConfig),
-    Payloads =
-        lists:map(
-            fun({send_message, Selected}) ->
-                encode_payload(State, Selected)
-            end,
-            Requests
-        ),
-    Body = to_pubsub_request(Payloads),
-    Path = publish_path(State),
-    Method = post,
-    Request = {Path, Headers, Body},
-    Worker = ehttpc_pool:pick_worker(PoolName),
-    ok = ehttpc:request_async(
-        Worker,
-        Method,
-        Request,
-        RequestTTL,
-        {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
-    ),
-    {ok, Worker}.
-
 -spec reply_delegator(
     resource_id(),
     {ReplyFun :: function(), Args :: list()},
     term() | {error, econnrefused | timeout | term()}
 ) -> ok.
-reply_delegator(_ResourceId, ReplyFunAndArgs, Result) ->
-    case Result of
-        {error, Reason} when
-            Reason =:= econnrefused;
-            %% this comes directly from `gun'...
-            Reason =:= {closed, "The connection was lost."};
-            Reason =:= timeout
-        ->
-            ?tp(
-                gcp_pubsub_request_failed,
-                #{
-                    reason => Reason,
-                    query_mode => async,
-                    recoverable_error => true,
-                    connector => _ResourceId
-                }
-            ),
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
-        _ ->
-            ?tp(
-                gcp_pubsub_response,
-                #{
-                    response => Result,
-                    query_mode => async,
-                    connector => _ResourceId
-                }
-            ),
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
-    end.
+reply_delegator(ResourceId, ReplyFunAndArgs, Response) ->
+    Result = handle_response(Response, ResourceId, _QueryMode = async),
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
 
 -spec do_get_status(resource_id(), timer:time()) -> boolean().
 do_get_status(ResourceId, Timeout) ->
@@ -546,3 +434,16 @@ do_get_status(ResourceId, Timeout) ->
         exit:timeout ->
             false
     end.
+
+-spec get_transport() -> {tls | tcp, string()}.
+get_transport() ->
+    %% Follows the emulator convention described at
+    %% https://cloud.google.com/pubsub/docs/emulator: when the
+    %% `PUBSUB_EMULATOR_HOST' environment variable is set, that host:port is
+    %% used instead of the real service endpoint.
+    EmulatorHostPort = os:getenv("PUBSUB_EMULATOR_HOST"),
+    case EmulatorHostPort of
+        false ->
+            {tls, "pubsub.googleapis.com:443"};
+        _ ->
+            %% The emulator is plain HTTP, hence `tcp' unless a different
+            %% transport was stored under this module's persistent term.
+            Transport = persistent_term:get({?MODULE, transport}, tcp),
+            {Transport, EmulatorHostPort}
+    end.

+ 568 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -0,0 +1,568 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_consumer_worker).
+
+-behaviour(ecpool_worker).
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `ecpool_worker' API
+-export([connect/1, health_check/1]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_info/2,
+    handle_cast/2,
+    handle_call/3,
+    handle_continue/2,
+    terminate/2
+]).
+
+-export([get_subscription/1]).
+-export([reply_delegator/3, pull_async/1, process_pull_response/2, ensure_subscription/1]).
+
+-type subscription_id() :: binary().
+-type bridge_name() :: atom() | binary().
+-type ack_id() :: binary().
+-type config() :: #{
+    ack_retry_interval := emqx_schema:timeout_duration_ms(),
+    client := emqx_bridge_gcp_pubsub_client:state(),
+    ecpool_worker_id => non_neg_integer(),
+    hookpoint := binary(),
+    instance_id := binary(),
+    mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+    project_id := emqx_bridge_gcp_pubsub_client:project_id(),
+    pull_max_messages := non_neg_integer(),
+    subscription_id => subscription_id(),
+    topic => emqx_bridge_gcp_pubsub_client:topic()
+}.
+-type state() :: #{
+    ack_retry_interval := emqx_schema:timeout_duration_ms(),
+    ack_timer := undefined | reference(),
+    async_workers := #{pid() => reference()},
+    client := emqx_bridge_gcp_pubsub_client:state(),
+    ecpool_worker_id := non_neg_integer(),
+    hookpoint := binary(),
+    instance_id := binary(),
+    mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+    pending_acks => [ack_id()],
+    project_id := emqx_bridge_gcp_pubsub_client:project_id(),
+    pull_max_messages := non_neg_integer(),
+    pull_timer := undefined | reference(),
+    subscription_id => subscription_id(),
+    topic => emqx_bridge_gcp_pubsub_client:topic()
+}.
+-type decoded_message() :: map().
+
+-define(HEALTH_CHECK_TIMEOUT, 10_000).
+-define(OPTVAR_SUB_OK(PID), {?MODULE, PID}).
+-define(PULL_INTERVAL, 5_000).
+
+%%-------------------------------------------------------------------------------------------------
+%% API used by `reply_delegator'
+%%-------------------------------------------------------------------------------------------------
+
+%% Asks the worker, via a non-blocking cast, to issue a new pull request.
+-spec pull_async(pid()) -> ok.
+pull_async(WorkerPid) ->
+    gen_server:cast(WorkerPid, pull_async).
+
+%% Hands the raw body of a successful pull response to the worker (via cast) so
+%% that decoding and message handling happen inside the worker process.
+-spec process_pull_response(pid(), binary()) -> ok.
+process_pull_response(WorkerPid, RespBody) ->
+    gen_server:cast(WorkerPid, {process_pull_response, RespBody}).
+
+%% Asks the worker (via cast) to run its `ensure_subscription' continuation again.
+-spec ensure_subscription(pid()) -> ok.
+ensure_subscription(WorkerPid) ->
+    gen_server:cast(WorkerPid, ensure_subscription).
+
+%% Callback handed to the async HTTP client for pull requests.  It only
+%% translates the request outcome into a cast to the owning worker:
+%%   * timeout              -> pull again (silently);
+%%   * error with HTTP 404  -> the subscription is gone; re-create it;
+%%   * any other error      -> log a warning and pull again;
+%%   * HTTP 200             -> let the worker process the response body.
+-spec reply_delegator(pid(), binary(), {ok, map()} | {error, timeout | term()}) -> ok.
+reply_delegator(WorkerPid, InstanceId, Result) ->
+    case Result of
+        {error, timeout} ->
+            ?MODULE:pull_async(WorkerPid);
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "gcp_pubsub_consumer_worker_pull_error",
+                instance_id => InstanceId,
+                reason => Reason
+            }),
+            case Reason of
+                #{status_code := 404} ->
+                    %% the subscription was not found; deleted?!
+                    %% (NOT_FOUND is HTTP 404; 409 means "already exists".)
+                    ?MODULE:ensure_subscription(WorkerPid);
+                _ ->
+                    ?MODULE:pull_async(WorkerPid)
+            end;
+        {ok, #{status_code := 200, body := RespBody}} ->
+            ?MODULE:process_pull_response(WorkerPid, RespBody)
+    end.
+
+%%-------------------------------------------------------------------------------------------------
+%% Debugging API
+%%-------------------------------------------------------------------------------------------------
+
+%% Synchronously asks the worker to fetch its subscription; the call itself
+%% times out after 5 seconds.
+-spec get_subscription(pid()) -> {ok, map()} | {error, term()}.
+get_subscription(WorkerPid) ->
+    gen_server:call(WorkerPid, get_subscription, 5_000).
+
+%%-------------------------------------------------------------------------------------------------
+%% `ecpool' health check
+%%-------------------------------------------------------------------------------------------------
+
+%% A worker is considered healthy once the `?OPTVAR_SUB_OK' variable for its pid
+%% has been set.  Waits up to `?HEALTH_CHECK_TIMEOUT' milliseconds for that to
+%% happen before reporting the worker as unhealthy.
+-spec health_check(pid()) -> boolean().
+health_check(WorkerPid) ->
+    case optvar:read(?OPTVAR_SUB_OK(WorkerPid), ?HEALTH_CHECK_TIMEOUT) of
+        {ok, _} ->
+            true;
+        timeout ->
+            false
+    end.
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+%% Starts one pull worker from the option list given by the pool.
+%%
+%% Each worker serves exactly one PubSub topic: the topic mapping is sorted by
+%% topic name and the worker id is used to pick an entry from it, wrapping
+%% around when there are more workers than topics.
+connect(Opts0) ->
+    Opts = maps:from_list(Opts0),
+    #{
+        ack_retry_interval := AckRetryInterval,
+        bridge_name := BridgeName,
+        client := Client,
+        ecpool_worker_id := WorkerId,
+        hookpoint := Hookpoint,
+        instance_id := InstanceId,
+        project_id := ProjectId,
+        pull_max_messages := PullMaxMessages,
+        topic_mapping := TopicMapping
+    } = Opts,
+    TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)),
+    Index = 1 + (WorkerId rem map_size(TopicMapping)),
+    {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList),
+    Config = #{
+        ack_retry_interval => AckRetryInterval,
+        %% Note: the `client' value here must be immutable and not changed by the
+        %% bridge during `on_get_status', since we have handed it over to the pull
+        %% workers.
+        client => Client,
+        %% Kept in the worker state, as declared mandatory by `state()'.
+        ecpool_worker_id => WorkerId,
+        hookpoint => Hookpoint,
+        instance_id => InstanceId,
+        mqtt_config => MQTTConfig,
+        project_id => ProjectId,
+        pull_max_messages => PullMaxMessages,
+        topic => Topic,
+        subscription_id => subscription_id(BridgeName, Topic)
+    },
+    start_link(Config).
+
+%%-------------------------------------------------------------------------------------------------
+%% `gen_server' API
+%%-------------------------------------------------------------------------------------------------
+
+%% Builds the initial worker state from the config (no timers, no monitored
+%% async workers, no pending acks) and defers the subscription setup to
+%% `handle_continue/2'.  Exits are trapped by this process.
+-spec init(config()) -> {ok, state(), {continue, ensure_subscription}}.
+init(Config) ->
+    process_flag(trap_exit, true),
+    State = Config#{
+        ack_timer => undefined,
+        async_workers => #{},
+        pending_acks => [],
+        pull_timer => undefined
+    },
+    {ok, State, {continue, ensure_subscription}}.
+
+%% Makes sure the subscription exists.  On success, kicks off the first pull and
+%% flags this worker as ready for the health check.  On failure, schedules a
+%% delayed retry.
+handle_continue(ensure_subscription, State0) ->
+    case ensure_subscription_exists(State0) of
+        ok ->
+            #{instance_id := InstanceId} = State0,
+            ?tp(
+                debug,
+                "gcp_pubsub_consumer_worker_subscription_ready",
+                #{instance_id => InstanceId}
+            ),
+            ?MODULE:pull_async(self()),
+            optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
+            {noreply, State0};
+        error ->
+            %% Retry after a short pause, and do it through the mailbox (the
+            %% `ensure_subscription' cast) rather than by returning another
+            %% `continue': chaining continues never reaches the receive loop, so
+            %% the worker would hammer the API and stay deaf to calls and
+            %% shutdown requests while the subscription cannot be created.
+            RetryDelayMs = 1_000,
+            {ok, _TRef} = timer:apply_after(
+                RetryDelayMs, ?MODULE, ensure_subscription, [self()]
+            ),
+            {noreply, State0}
+    end.
+
+%% `get_subscription' performs a blocking lookup of the subscription and replies
+%% with its result; any other call is answered with `{error, unknown_call}'.
+handle_call(get_subscription, _From, State0) ->
+    Res = do_get_subscription(State0),
+    {reply, Res, State0};
+handle_call(_Request, _From, State0) ->
+    {reply, {error, unknown_call}, State0}.
+
+%% Casts sent by the API functions of this module:
+%%   * `pull_async'                 - issue a new async pull request;
+%%   * `{process_pull_response, B}' - decode and handle a pull response body;
+%%   * `ensure_subscription'        - re-run the subscription setup continuation.
+%% Unknown casts are ignored.
+handle_cast(pull_async, State0) ->
+    State = do_pull_async(State0),
+    {noreply, State};
+handle_cast({process_pull_response, RespBody}, State0) ->
+    State = do_process_pull_response(State0, RespBody),
+    {noreply, State};
+handle_cast(ensure_subscription, State0) ->
+    {noreply, State0, {continue, ensure_subscription}};
+handle_cast(_Request, State0) ->
+    {noreply, State0}.
+
+%% Ack timer fired (only honoured when the reference is the current one): try to
+%% acknowledge the pending ack ids and re-arm the timer if some are still pending.
+handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) ->
+    State1 = acknowledge(State0),
+    State = ensure_ack_timer(State1),
+    {noreply, State};
+%% Pull timer fired (only honoured when the reference is the current one): issue
+%% a pull request and re-arm the timer.
+handle_info({timeout, TRef, pull}, State0 = #{pull_timer := TRef}) ->
+    State1 = State0#{pull_timer := undefined},
+    State2 = do_pull_async(State1),
+    State = ensure_pull_timer(State2),
+    {noreply, State};
+%% One of the monitored async HTTP workers went down: forget it and issue a new
+%% pull request.
+handle_info(
+    {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0}
+) when
+    is_map_key(AsyncWorkerPid, Workers0)
+->
+    Workers = maps:remove(AsyncWorkerPid, Workers0),
+    State1 = State0#{async_workers := Workers},
+    State = do_pull_async(State1),
+    {noreply, State};
+%% Anything else (including timeouts carrying a stale timer reference) is only
+%% logged at debug level.
+handle_info(Msg, State0) ->
+    #{
+        instance_id := InstanceId,
+        topic := Topic
+    } = State0,
+    ?SLOG(debug, #{
+        msg => "gcp_pubsub_consumer_worker_unexpected_message",
+        unexpected_msg => Msg,
+        instance_id => InstanceId,
+        topic => Topic
+    }),
+    {noreply, State0}.
+
+%% Clears the readiness variable that `health_check/1' reads for this pid.
+terminate(_Reason, _State) ->
+    optvar:unset(?OPTVAR_SUB_OK(self())),
+    ok.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+%% Starts an unregistered worker process linked to the caller.
+-spec start_link(config()) -> gen_server:start_ret().
+start_link(Config) ->
+    gen_server:start_link(?MODULE, Config, []).
+
+%% Arms the ack retry timer when there are pending ack ids and no timer is
+%% currently recorded in the state; otherwise leaves the state untouched.
+-spec ensure_ack_timer(state()) -> state().
+ensure_ack_timer(State = #{pending_acks := []}) ->
+    State;
+ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) ->
+    State;
+ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) ->
+    State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}.
+
+%% Arms the periodic pull timer (`?PULL_INTERVAL') unless one is already
+%% recorded in the state.
+-spec ensure_pull_timer(state()) -> state().
+ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) ->
+    State;
+ensure_pull_timer(State) ->
+    State#{pull_timer := emqx_utils:start_timer(?PULL_INTERVAL, pull)}.
+
+%% Creates the subscription for this worker's topic with a synchronous `PUT'.
+%%
+%% Returns `ok' when the subscription was created (HTTP 200) or when it already
+%% existed (HTTP 409); in the latter case a `PATCH' is sent to update the ack
+%% deadline, and its result is only logged.  Any other error is logged and
+%% reported as `error'.
+-spec ensure_subscription_exists(state()) -> ok | error.
+ensure_subscription_exists(State) ->
+    #{
+        client := Client,
+        instance_id := InstanceId,
+        subscription_id := SubscriptionId,
+        topic := Topic
+    } = State,
+    Method = put,
+    Path = path(State, create),
+    Body = body(State, create),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
+    case Res of
+        {error, #{status_code := 409}} ->
+            %% already exists
+            ?SLOG(debug, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_already_exists",
+                instance_id => InstanceId,
+                topic => Topic,
+                subscription_id => SubscriptionId
+            }),
+            %% The patch goes to the same resource path as the create request.
+            Method1 = patch,
+            Path1 = path(State, create),
+            Body1 = body(State, patch_subscription),
+            PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
+            Res1 = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client),
+            ?SLOG(debug, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_patch",
+                instance_id => InstanceId,
+                topic => Topic,
+                subscription_id => SubscriptionId,
+                result => Res1
+            }),
+            ok;
+        {ok, #{status_code := 200}} ->
+            ?SLOG(debug, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_created",
+                instance_id => InstanceId,
+                topic => Topic,
+                subscription_id => SubscriptionId
+            }),
+            ok;
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "gcp_pubsub_consumer_worker_subscription_error",
+                instance_id => InstanceId,
+                topic => Topic,
+                reason => Reason
+            }),
+            error
+    end.
+
+%% Sends an asynchronous pull request for this worker's subscription.  The reply
+%% is delivered to `reply_delegator/3' together with this process' pid and the
+%% instance id.  The HTTP worker that took the request is monitored, so that its
+%% death can be noticed by `handle_info/2'.
+%%
+%% We use async requests so that this process will be more responsive to system messages.
+do_pull_async(State) ->
+    #{
+        client := Client,
+        instance_id := InstanceId
+    } = State,
+    Method = post,
+    Path = path(State, pull),
+    Body = body(State, pull),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]},
+    {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_client:query_async(
+        PreparedRequest,
+        ReplyFunAndArgs,
+        Client
+    ),
+    ensure_async_worker_monitored(State, AsyncWorkerPid).
+
+%% Starts monitoring the given async worker unless it is already tracked in
+%% `async_workers'; each tracked pid maps to its monitor reference.
+-spec ensure_async_worker_monitored(state(), pid()) -> state().
+ensure_async_worker_monitored(State = #{async_workers := Tracked}, AsyncWorkerPid) when
+    is_map_key(AsyncWorkerPid, Tracked)
+->
+    State;
+ensure_async_worker_monitored(State = #{async_workers := Tracked}, AsyncWorkerPid) ->
+    MRef = monitor(process, AsyncWorkerPid),
+    State#{async_workers := Tracked#{AsyncWorkerPid => MRef}}.
+
+%% Handles every message found in a pull response, queues their ack ids after
+%% the ones already pending, tries to acknowledge all of them right away, asks
+%% for the next pull, and arms the ack retry timer if ids are still pending.
+-spec do_process_pull_response(state(), binary()) -> state().
+do_process_pull_response(State0, RespBody) ->
+    NewAckIds = [handle_message(State0, Message) || Message <- decode_response(RespBody)],
+    AppendAckIds = fun(PendingAckIds) -> PendingAckIds ++ NewAckIds end,
+    State1 = acknowledge(maps:update_with(pending_acks, AppendAckIds, State0)),
+    pull_async(self()),
+    ensure_ack_timer(State1).
+
+%% Synchronously acknowledges all pending ack ids in a single request.
+%%
+%% Nothing is sent when there is nothing pending.  Otherwise the recorded ack
+%% timer is reset to `undefined' and the pending list is cleared only on an
+%% HTTP 200 response; on any other outcome the ids are kept for a later retry
+%% and a warning is logged.
+-spec acknowledge(state()) -> state().
+acknowledge(State0 = #{pending_acks := []}) ->
+    State0;
+acknowledge(State0) ->
+    State1 = State0#{ack_timer := undefined},
+    #{
+        client := Client,
+        pending_acks := AckIds
+    } = State1,
+    Method = post,
+    Path = path(State1, ack),
+    Body = body(State1, ack, #{ack_ids => AckIds}),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
+    case Res of
+        {error, Reason} ->
+            ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}),
+            State1;
+        {ok, #{status_code := 200}} ->
+            ?tp(gcp_pubsub_consumer_worker_acknowledged, #{ack_ids => AckIds}),
+            State1#{pending_acks := []};
+        {ok, Details} ->
+            ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", details => Details}),
+            State1
+    end.
+
+%% Synchronously fetches this worker's subscription.  Returns the JSON-decoded
+%% body (as a map) on HTTP 200; any error or other response is logged as a
+%% warning and returned as `{error, _}'.
+do_get_subscription(State) ->
+    #{
+        client := Client
+    } = State,
+    Method = get,
+    Path = path(State, get_subscription),
+    Body = body(State, get_subscription),
+    PreparedRequest = {prepared_request, {Method, Path, Body}},
+    Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
+    case Res of
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "gcp_pubsub_consumer_worker_get_subscription_error",
+                reason => Reason
+            }),
+            {error, Reason};
+        {ok, #{status_code := 200, body := RespBody}} ->
+            DecodedBody = emqx_utils_json:decode(RespBody, [return_maps]),
+            {ok, DecodedBody};
+        {ok, Details} ->
+            ?SLOG(warning, #{
+                msg => "gcp_pubsub_consumer_worker_get_subscription_unexpected_response",
+                details => Details
+            }),
+            {error, Details}
+    end.
+
+%% Derives the subscription id from the bridge name and the PubSub topic:
+%% `emqx-sub-<bridge name>-<topic>', percent-encoded.
+-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_client:topic()) -> subscription_id().
+subscription_id(BridgeName0, Topic) ->
+    %% The real GCP PubSub accepts colons in subscription names, but its emulator
+    %% doesn't...  We currently validate bridge names to not include that character.  The
+    %% exception is the prefix from the probe API.
+    BridgeName1 = to_bin(BridgeName0),
+    %% `global' so that every colon is replaced, not just the first one.
+    BridgeName = binary:replace(BridgeName1, <<":">>, <<"-">>, [global]),
+    to_bin(uri_string:quote(<<"emqx-sub-", BridgeName/binary, "-", Topic/binary>>)).
+
+%% Builds the REST path for an operation on this worker's subscription.  The
+%% `create' and `get_subscription' operations address the bare resource, while
+%% `pull' and `ack' append their custom verb to it.
+-spec path(state(), pull | create | ack | get_subscription) -> binary().
+path(State, Type) ->
+    #{client := #{project_id := ProjectId}, subscription_id := SubscriptionId} = State,
+    Resource = subscription_resource(ProjectId, SubscriptionId),
+    Verb =
+        case Type of
+            pull -> <<":pull">>;
+            ack -> <<":acknowledge">>;
+            create -> <<>>;
+            get_subscription -> <<>>
+        end,
+    <<"/v1/", Resource/binary, Verb/binary>>.
+
+%% Builds the JSON request body for each operation:
+%%   * `pull'               - only the maximum number of messages to return;
+%%   * `create'             - the full topic resource and the ack deadline;
+%%   * `patch_subscription' - the subscription plus an update mask restricted to
+%%                            the ack deadline;
+%%   * `get_subscription'   - no body.
+%% The ack deadline is the ack retry interval converted to seconds, plus 5.
+-spec body(state(), pull | create | patch_subscription | get_subscription) -> binary().
+body(State, pull) ->
+    #{pull_max_messages := PullMaxMessages} = State,
+    emqx_utils_json:encode(#{<<"maxMessages">> => PullMaxMessages});
+body(State, create) ->
+    #{
+        ack_retry_interval := AckRetryInterval,
+        project_id := ProjectId,
+        topic := PubSubTopic
+    } = State,
+    TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
+    AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
+    JSON = #{
+        <<"topic">> => TopicResource,
+        <<"ackDeadlineSeconds">> => AckDeadlineSeconds
+    },
+    emqx_utils_json:encode(JSON);
+body(State, patch_subscription) ->
+    #{
+        ack_retry_interval := AckRetryInterval,
+        project_id := ProjectId,
+        topic := PubSubTopic,
+        subscription_id := SubscriptionId
+    } = State,
+    TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
+    SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
+    AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
+    JSON = #{
+        <<"subscription">> =>
+            #{
+                <<"ackDeadlineSeconds">> => AckDeadlineSeconds,
+                <<"name">> => SubscriptionResource,
+                <<"topic">> => TopicResource
+            },
+        %% topic is immutable; don't add it here.
+        <<"updateMask">> => <<"ackDeadlineSeconds">>
+    },
+    emqx_utils_json:encode(JSON);
+body(_State, get_subscription) ->
+    <<>>.
+
+%% JSON body of an acknowledge request: the ack ids given in `Opts'.
+-spec body(state(), ack, map()) -> binary().
+body(_State, ack, Opts) ->
+    #{ack_ids := AckIds} = Opts,
+    emqx_utils_json:encode(#{<<"ackIds">> => AckIds}).
+
+%% Full resource name of a subscription: `projects/<id>/subscriptions/<id>'.
+-spec subscription_resource(emqx_bridge_gcp_pubsub_client:project_id(), subscription_id()) ->
+    binary().
+subscription_resource(ProjectId, SubscriptionId) ->
+    <<"projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>.
+
+%% Decodes the JSON body of a pull response into its list of received messages,
+%% base64-decoding the `data' field of each inner message when it is present.
+%% A response without `receivedMessages' yields an empty list.
+-spec decode_response(binary()) -> [decoded_message()].
+decode_response(RespBody) ->
+    case emqx_utils_json:decode(RespBody, [return_maps]) of
+        #{<<"receivedMessages">> := Msgs0} ->
+            lists:map(
+                fun(Msg0 = #{<<"message">> := InnerMsg0}) ->
+                    InnerMsg = emqx_utils_maps:update_if_present(
+                        <<"data">>, fun base64:decode/1, InnerMsg0
+                    ),
+                    Msg0#{<<"message">> := InnerMsg}
+                end,
+                Msgs0
+            );
+        #{} ->
+            []
+    end.
+
+%% Processes one received PubSub message: builds the message map exposed to the
+%% payload template and to the hookpoint, publishes the rendered payload to the
+%% configured MQTT topic, runs the bridge hookpoint, bumps the `received'
+%% metric, and returns the ack id of the message so the caller can acknowledge
+%% it.
+-spec handle_message(state(), decoded_message()) -> ack_id().
+handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Message) ->
+    ?tp(
+        debug,
+        "gcp_pubsub_consumer_worker_handle_message",
+        #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}
+    ),
+    #{
+        instance_id := InstanceId,
+        hookpoint := Hookpoint,
+        mqtt_config := #{
+            payload_template := PayloadTemplate,
+            qos := MQTTQoS,
+            mqtt_topic := MQTTTopic
+        },
+        topic := Topic
+    } = State,
+    #{
+        <<"messageId">> := MessageId,
+        <<"publishTime">> := PublishTime
+    } = InnerMsg,
+    FullMessage0 = #{
+        message_id => MessageId,
+        publish_time => PublishTime,
+        topic => Topic
+    },
+    %% Optional fields are copied over only when the message carries them.
+    FullMessage =
+        lists:foldl(
+            fun({FromKey, ToKey}, Acc) ->
+                add_if_present(FromKey, InnerMsg, ToKey, Acc)
+            end,
+            FullMessage0,
+            [
+                {<<"data">>, value},
+                {<<"attributes">>, attributes},
+                {<<"orderingKey">>, ordering_key}
+            ]
+        ),
+    Payload = render(FullMessage, PayloadTemplate),
+    MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload),
+    _ = emqx:publish(MQTTMessage),
+    emqx:run_hook(Hookpoint, [FullMessage]),
+    emqx_resource_metrics:received_inc(InstanceId),
+    AckId.
+
+%% Copies the value stored under `FromKey' in `Message' into `Map' under
+%% `ToKey'.  `Map' is returned unchanged when the key is missing or its value
+%% is the atom `undefined'.
+-spec add_if_present(any(), map(), any(), map()) -> map().
+add_if_present(FromKey, Message, ToKey, Map) ->
+    case maps:find(FromKey, Message) of
+        {ok, Value} when Value =/= undefined ->
+            maps:put(ToKey, Value, Map);
+        _ ->
+            Map
+    end.
+
+%% Renders the pre-processed payload template against the message map,
+%% requesting the result as a single binary.
+render(FullMessage, PayloadTemplate) ->
+    Opts = #{return => full_binary},
+    emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
+
+%% Converts an atom, an iolist or a binary to a binary; any other term crashes
+%% with `function_clause'.
+to_bin(A) when is_atom(A) -> atom_to_binary(A);
+to_bin(L) when is_list(L) -> iolist_to_binary(L);
+to_bin(B) when is_binary(B) -> B.

+ 201 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -0,0 +1,201 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_impl_consumer).
+
+-behaviour(emqx_resource).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    query_mode/1,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2
+]).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-type mqtt_config() :: #{
+    mqtt_topic := emqx_types:topic(),
+    qos := emqx_types:qos(),
+    payload_template := string()
+}.
+-type config() :: #{
+    connect_timeout := emqx_schema:duration_ms(),
+    max_retries := non_neg_integer(),
+    pool_size := non_neg_integer(),
+    resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
+    service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
+    any() => term()
+}.
+-type state() :: #{
+    client := emqx_bridge_gcp_pubsub_client:state()
+}.
+
+-export_type([mqtt_config/0]).
+
+-define(AUTO_RECONNECT_S, 2).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+%% `emqx_resource' callback: this resource declares `async_if_possible'.
+-spec callback_mode() -> callback_mode().
+callback_mode() -> async_if_possible.
+
+%% `emqx_resource' callback: a consumer bridge declares `no_queries',
+%% regardless of its configuration.
+-spec query_mode(any()) -> query_mode().
+query_mode(_Config) -> no_queries.
+
+%% Starts the shared PubSub client first and, if that succeeds, the pool of
+%% consumer workers.  A client start failure is returned unchanged.
+-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+on_start(InstanceId, Config) ->
+    case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
+        {ok, Client} ->
+            start_consumers(InstanceId, Client, Config);
+        Error ->
+            Error
+    end.
+
+%% Stops the consumer worker pool first, then the PubSub client they use.
+-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, _State) ->
+    ok = stop_consumers(InstanceId),
+    emqx_bridge_gcp_pubsub_client:stop(InstanceId).
+
+%% Reports `connected' when the pool-wide health check of the consumer workers
+%% passes, and `connecting' otherwise.
+-spec on_get_status(resource_id(), state()) -> connected | connecting.
+on_get_status(InstanceId, _State) ->
+    %% Note: do *not* alter the `client' value here.  It must be immutable, since
+    %% we have handed it over to the pull workers.
+    case
+        emqx_resource_pool:health_check_workers(
+            InstanceId,
+            fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1
+        )
+    of
+        true -> connected;
+        false -> connecting
+    end.
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+%% Validates the configured PubSub topics and starts the consumer worker pool.
+%%
+%% The pool size is the number of mapped topics times the configured number of
+%% workers per topic.  If a topic cannot be validated, the client is stopped and
+%% an explanatory string is thrown.  If the pool fails to start, the client is
+%% stopped and the error is returned.
+start_consumers(InstanceId, Client, Config) ->
+    #{
+        bridge_name := BridgeName,
+        consumer := ConsumerConfig0,
+        hookpoint := Hookpoint,
+        service_account_json := #{project_id := ProjectId}
+    } = Config,
+    %% Turn the list of mappings into a map keyed by PubSub topic.
+    ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
+    TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
+    ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1),
+    PoolSize = map_size(TopicMapping) * ConsumerWorkersPerTopic,
+    ConsumerConfig = ConsumerConfig1#{
+        auto_reconnect => ?AUTO_RECONNECT_S,
+        bridge_name => BridgeName,
+        client => Client,
+        hookpoint => Hookpoint,
+        instance_id => InstanceId,
+        pool_size => PoolSize,
+        project_id => ProjectId
+    },
+    %% The pool takes its options as a list of key-value pairs.
+    ConsumerOpts = maps:to_list(ConsumerConfig),
+    %% FIXME: mark as unhealthy if topics do not exist!
+    case validate_pubsub_topics(TopicMapping, Client) of
+        ok ->
+            ok;
+        error ->
+            _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
+            throw(
+                "GCP PubSub topics are invalid.  Please check the logs, check if the "
+                "topic exists in GCP and if the service account has permissions to use them."
+            )
+    end,
+    case
+        emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
+    of
+        ok ->
+            State = #{
+                client => Client,
+                pool_name => InstanceId
+            },
+            {ok, State};
+        {error, Reason} ->
+            _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
+            {error, Reason}
+    end.
+
+%% Best-effort stop of the consumer worker pool: a failure to stop it is logged
+%% and otherwise ignored, so this always returns `ok'.
+stop_consumers(InstanceId) ->
+    _ = log_when_error(
+        fun() ->
+            ok = emqx_resource_pool:stop(InstanceId)
+        end,
+        #{
+            msg => "failed_to_stop_pull_worker_pool",
+            instance_id => InstanceId
+        }
+    ),
+    ok.
+
+%% Converts the configured list of topic mappings into a map keyed by PubSub
+%% topic.  Each value holds the MQTT topic, the QoS and the pre-processed
+%% payload template; when a topic is listed more than once, the last entry wins.
+convert_topic_mapping(TopicMappingList) ->
+    ToEntry =
+        fun(Fields) ->
+            #{
+                pubsub_topic := PubSubTopic,
+                mqtt_topic := MQTTTopic,
+                qos := QoS,
+                payload_template := RawTemplate
+            } = Fields,
+            MQTTConfig = #{
+                payload_template => emqx_placeholder:preproc_tmpl(RawTemplate),
+                mqtt_topic => MQTTTopic,
+                qos => QoS
+            },
+            {PubSubTopic, MQTTConfig}
+        end,
+    maps:from_list(lists:map(ToEntry, TopicMappingList)).
+
+%% Checks every PubSub topic used as a key of the topic mapping; returns `ok' if
+%% all of them pass the check and `error' otherwise.
+validate_pubsub_topics(TopicMapping, Client) ->
+    PubSubTopics = maps:keys(TopicMapping),
+    do_validate_pubsub_topics(Client, PubSubTopics).
+
+%% Walks the topic list, stopping with `error' at the first topic whose
+%% existence check fails.
+do_validate_pubsub_topics(_Client, []) ->
+    %% we already validate that the mapping is not empty in the config schema.
+    ok;
+do_validate_pubsub_topics(Client, [PubSubTopic | Remaining]) ->
+    case check_for_topic_existence(PubSubTopic, Client) of
+        {error, _} ->
+            error;
+        ok ->
+            do_validate_pubsub_topics(Client, Remaining)
+    end.
+
+%% Asks the client for the topic.  A 404 is reported as `{error, not_found}'
+%% without tracing; any other error is traced at warning level and returned
+%% as-is.
+check_for_topic_existence(Topic, Client) ->
+    Res = emqx_bridge_gcp_pubsub_client:get_topic(Topic, Client),
+    case Res of
+        {ok, _} ->
+            ok;
+        {error, #{status_code := 404}} ->
+            {error, not_found};
+        {error, Details} ->
+            ?tp(warning, "gcp_pubsub_consumer_check_topic_error", Details),
+            {error, Details}
+    end.
+
+%% Runs `Fun' and returns its result.  Any exception is caught and logged at
+%% error level, with the exception class and reason added to the `Log' map.
+log_when_error(Fun, Log) ->
+    try
+        Fun()
+    catch
+        C:E ->
+            ?SLOG(error, Log#{
+                exception => C,
+                reason => E
+            })
+    end.

+ 287 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -0,0 +1,287 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_impl_producer).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% Bridge configuration handed to `on_start/2'.  Only the keys listed here
+%% are required by the type; `on_start/2' additionally reads
+%% `payload_template', which is admitted through the `any() => term()'
+%% association.
+-type config() :: #{
+    connect_timeout := emqx_schema:duration_ms(),
+    max_retries := non_neg_integer(),
+    pubsub_topic := binary(),
+    resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
+    service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
+    any() => term()
+}.
+%% Resource state built by `on_start/2' and passed to every query callback.
+-type state() :: #{
+    client := emqx_bridge_gcp_pubsub_client:state(),
+    payload_template := emqx_placeholder:tmpl_token(),
+    project_id := emqx_bridge_gcp_pubsub_client:project_id(),
+    pubsub_topic := binary()
+}.
+%% Local aliases for the HTTP-level types exported by the client module.
+-type headers() :: emqx_bridge_gcp_pubsub_client:headers().
+-type body() :: emqx_bridge_gcp_pubsub_client:body().
+-type status_code() :: emqx_bridge_gcp_pubsub_client:status_code().
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    query_mode/1,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_query_async/4,
+    on_batch_query/3,
+    on_batch_query_async/4,
+    on_get_status/2
+]).
+
+-export([reply_delegator/2]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+%% Reports `async_if_possible' as the callback mode of this resource.
+callback_mode() ->
+    async_if_possible.
+
+%% The query mode is always `async', whatever the bridge configuration is.
+query_mode(_BridgeConfig) ->
+    async.
+
+%% Starts the GCP PubSub client for this bridge instance and, on success,
+%% builds the resource state used by the query callbacks (the payload
+%% template is pre-processed here).  Any result from the client other than
+%% `{ok, Client}' is returned to the caller unchanged.
+-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+on_start(InstanceId, Config) ->
+    ?SLOG(info, #{
+        msg => "starting_gcp_pubsub_bridge",
+        %% the service account JSON carries the account credentials; keep
+        %% it out of the logs.
+        config => maps:without([service_account_json], Config)
+    }),
+    #{
+        payload_template := PayloadTemplate,
+        pubsub_topic := PubSubTopic,
+        service_account_json := #{project_id := ProjectId}
+    } = Config,
+    case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
+        {ok, Client} ->
+            State = #{
+                client => Client,
+                payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
+                project_id => ProjectId,
+                pubsub_topic => PubSubTopic
+            },
+            {ok, State};
+        Error ->
+            Error
+    end.
+
+%% Stops the client that was started for this instance id.  The resource
+%% state is not needed for that and is ignored.
+-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, _ResourceState) ->
+    emqx_bridge_gcp_pubsub_client:stop(InstanceId).
+
+%% Health check: delegates to the client stored in the resource state.
+-spec on_get_status(resource_id(), state()) -> connected | disconnected.
+on_get_status(_ResourceId, #{client := PubSubClient}) ->
+    emqx_bridge_gcp_pubsub_client:get_status(PubSubClient).
+
+%% Synchronously publishes a single message.  The request is wrapped in a
+%% one-element list so that it goes through the same code path as a batch.
+-spec on_query(
+    resource_id(),
+    {send_message, map()},
+    state()
+) ->
+    {ok, map()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+on_query(ResourceId, {send_message, _} = Request, State) ->
+    Batch = [Request],
+    ?TRACE(
+        "QUERY_SYNC",
+        "gcp_pubsub_received",
+        #{requests => Batch, connector => ResourceId, state => State}
+    ),
+    do_send_requests_sync(State, Batch, ResourceId).
+
+%% Asynchronously publishes a single message.  The request is wrapped in a
+%% one-element list and handed, together with the reply callback, to the
+%% shared asynchronous send path.
+-spec on_query_async(
+    resource_id(),
+    {send_message, map()},
+    {ReplyFun :: function(), Args :: list()},
+    state()
+) -> {ok, pid()}.
+on_query_async(ResourceId, {send_message, _} = Request, ReplyFunAndArgs, State) ->
+    Batch = [Request],
+    ?TRACE(
+        "QUERY_ASYNC",
+        "gcp_pubsub_received",
+        #{requests => Batch, connector => ResourceId, state => State}
+    ),
+    do_send_requests_async(State, Batch, ReplyFunAndArgs).
+
+%% Synchronously publishes a batch of messages in a single request.
+-spec on_batch_query(
+    resource_id(),
+    [{send_message, map()}],
+    state()
+) ->
+    {ok, map()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+on_batch_query(ResourceId, Batch, State) ->
+    TraceMeta = #{requests => Batch, connector => ResourceId, state => State},
+    ?TRACE("QUERY_SYNC", "gcp_pubsub_received", TraceMeta),
+    do_send_requests_sync(State, Batch, ResourceId).
+
+%% Asynchronously publishes a batch of messages in a single request; the
+%% outcome is delivered through ReplyFunAndArgs.
+-spec on_batch_query_async(
+    resource_id(),
+    [{send_message, map()}],
+    {ReplyFun :: function(), Args :: list()},
+    state()
+) -> {ok, pid()}.
+on_batch_query_async(ResourceId, Batch, ReplyFunAndArgs, State) ->
+    TraceMeta = #{requests => Batch, connector => ResourceId, state => State},
+    ?TRACE("QUERY_ASYNC", "gcp_pubsub_received", TraceMeta),
+    do_send_requests_async(State, Batch, ReplyFunAndArgs).
+
+%%-------------------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------------------
+
+%% Encodes every request of the batch, posts them to the topic's publish
+%% endpoint in one synchronous call and passes the outcome through
+%% handle_result/4 for logging and error classification.
+%%
+%% NOTE(review): the spec lists `{ok, StatusCode, Headers}' and
+%% `{ok, StatusCode, Headers, Body}', yet handle_result/4 only has a
+%% success clause for `{ok, _}' -- confirm which shape is the real one.
+-spec do_send_requests_sync(
+    state(),
+    [{send_message, map()}],
+    resource_id()
+) ->
+    {ok, status_code(), headers()}
+    | {ok, status_code(), headers(), body()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+do_send_requests_sync(State, Requests, InstanceId) ->
+    #{client := Client} = State,
+    Encode = fun({send_message, Selected}) -> encode_payload(State, Selected) end,
+    Body = to_pubsub_request(lists:map(Encode, Requests)),
+    Request = {prepared_request, {post, publish_path(State), Body}},
+    Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
+    handle_result(Result, Request, _QueryMode = sync, InstanceId).
+
+%% Encodes every request of the batch and posts them to the topic's
+%% publish endpoint asynchronously.  The caller's reply callback is wrapped
+%% in reply_delegator/2, which gets the raw response first.
+-spec do_send_requests_async(
+    state(),
+    [{send_message, map()}],
+    {ReplyFun :: function(), Args :: list()}
+) -> {ok, pid()}.
+do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
+    #{client := Client} = State,
+    Encode = fun({send_message, Selected}) -> encode_payload(State, Selected) end,
+    Body = to_pubsub_request(lists:map(Encode, Requests)),
+    Request = {prepared_request, {post, publish_path(State), Body}},
+    Delegator = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
+    emqx_bridge_gcp_pubsub_client:query_async(Request, Delegator, Client).
+
+%% Produces one PubSub message from the selected data.  With an empty
+%% pre-processed template the whole selection is JSON-encoded; otherwise
+%% the template is rendered against it.  The result is base64-encoded.
+-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
+encode_payload(#{payload_template := []}, Selected) ->
+    #{data => base64:encode(emqx_utils_json:encode(Selected))};
+encode_payload(#{payload_template := Template}, Selected) ->
+    #{data => base64:encode(emqx_placeholder:proc_tmpl(Template, Selected))}.
+
+%% Wraps the encoded messages under the `messages' key and serialises the
+%% result to JSON, giving the body of a publish request.
+-spec to_pubsub_request([#{data := binary()}]) -> binary().
+to_pubsub_request(Payloads) ->
+    Envelope = #{messages => Payloads},
+    emqx_utils_json:encode(Envelope).
+
+%% Builds the publish endpoint path for the project and topic held in the
+%% resource state.
+-spec publish_path(state()) -> binary().
+publish_path(#{project_id := ProjectId, pubsub_topic := PubSubTopic}) ->
+    ProjectPrefix = <<"/v1/projects/", ProjectId/binary>>,
+    <<ProjectPrefix/binary, "/topics/", PubSubTopic/binary, ":publish">>.
+
+%% Post-processes the outcome of a synchronous request.  Clause order
+%% matters:
+%%   1. connection refused / lost / timeout are traced as warnings and
+%%      re-tagged as `{error, {recoverable_error, Reason}}';
+%%   2. error responses carrying a status code (with or without a body) are
+%%      logged, with the request redacted, and returned unchanged;
+%%   3. any other error is traced as a non-recoverable error and returned
+%%      unchanged;
+%%   4. `{ok, _}' results are returned unchanged.
+handle_result({error, Reason}, _Request, QueryMode, ResourceId) when
+    Reason =:= econnrefused;
+    %% this comes directly from `gun'...
+    Reason =:= {closed, "The connection was lost."};
+    Reason =:= timeout
+->
+    ?tp(
+        warning,
+        gcp_pubsub_request_failed,
+        #{
+            reason => Reason,
+            query_mode => QueryMode,
+            recoverable_error => true,
+            connector => ResourceId
+        }
+    ),
+    {error, {recoverable_error, Reason}};
+handle_result(
+    {error, #{status_code := StatusCode, body := RespBody}} = Result,
+    Request,
+    _QueryMode,
+    ResourceId
+) ->
+    ?SLOG(error, #{
+        msg => "gcp_pubsub_error_response",
+        request => emqx_connector_http:redact_request(Request),
+        connector => ResourceId,
+        status_code => StatusCode,
+        resp_body => RespBody
+    }),
+    Result;
+handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) ->
+    ?SLOG(error, #{
+        msg => "gcp_pubsub_error_response",
+        request => emqx_connector_http:redact_request(Request),
+        connector => ResourceId,
+        status_code => StatusCode
+    }),
+    Result;
+handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) ->
+    ?tp(
+        error,
+        gcp_pubsub_request_failed,
+        #{
+            reason => Reason,
+            query_mode => QueryMode,
+            recoverable_error => false,
+            connector => ResourceId
+        }
+    ),
+    Result;
+handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) ->
+    Result.
+
+%% Reply callback used for asynchronous requests.  Connection refused,
+%% connection lost and timeout errors are re-tagged as recoverable before
+%% being handed to the original reply function; every other response is
+%% forwarded untouched.
+reply_delegator(ReplyFunAndArgs, {error, Reason}) when
+    Reason =:= econnrefused;
+    %% this comes directly from `gun'...
+    Reason =:= {closed, "The connection was lost."};
+    Reason =:= timeout
+->
+    Recoverable = {error, {recoverable_error, Reason}},
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Recoverable);
+reply_delegator(ReplyFunAndArgs, Response) ->
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response).

+ 688 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -0,0 +1,688 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_consumer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("jose/include/jose_jwt.hrl").
+-include_lib("jose/include/jose_jws.hrl").
+
+-define(BRIDGE_TYPE, gcp_pubsub_consumer).
+-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
+-define(REPUBLISH_TOPIC, <<"republish/t">>).
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+%% Common Test entry point: the list of test cases is computed from this
+%% module by the shared test helper.
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% Suite setup.  Reads the emulator and proxy locations from the
+%% environment (with defaults) and checks that the emulator endpoint
+%% accepts TCP connections.  If it does, the required applications are
+%% started, `PUBSUB_EMULATOR_HOST' is exported and a control client is
+%% started and stored in the CT config.  If it does not, the suite fails
+%% when `IS_CI' is "yes" and is skipped otherwise.
+init_per_suite(Config) ->
+    GCPEmulatorHost = os:getenv("GCP_EMULATOR_HOST", "toxiproxy"),
+    GCPEmulatorPortStr = os:getenv("GCP_EMULATOR_PORT", "8085"),
+    GCPEmulatorPort = list_to_integer(GCPEmulatorPortStr),
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    ProxyName = "gcp_emulator",
+    case emqx_common_test_helpers:is_tcp_server_available(GCPEmulatorHost, GCPEmulatorPort) of
+        true ->
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([
+                emqx_resource, emqx_bridge, emqx_rule_engine
+            ]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            emqx_mgmt_api_test_util:init_suite(),
+            HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
+            true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
+            Client = start_control_connector(),
+            [
+                {proxy_name, ProxyName},
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
+                {gcp_emulator_host, GCPEmulatorHost},
+                {gcp_emulator_port, GCPEmulatorPort},
+                {client, Client}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_gcp_emulator);
+                _ ->
+                    {skip, no_gcp_emulator}
+            end
+    end.
+
+%% Suite teardown: stops the control client, stops the applications started
+%% by init_per_suite/1 and removes the `PUBSUB_EMULATOR_HOST' variable.
+end_per_suite(Config) ->
+    Client = ?config(client, Config),
+    stop_control_connector(Client),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
+    _ = application:stop(emqx_connector),
+    os:unsetenv("PUBSUB_EMULATOR_HOST"),
+    ok.
+
+%% Every test case uses the same setup; see common_init_per_testcase/2.
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+%% Per-test-case setup.  Removes leftover bridges and override config
+%% files, derives a PubSub topic name from the test case name plus a unique
+%% integer, builds the topic mapping (overridable through `topic_mapping',
+%% `mqtt_topic' and `mqtt_qos' in the incoming config), generates a service
+%% account, makes sure the PubSub topics exist and starts a snabbkaffe
+%% trace.  The consumer name and configuration are added to the CT config.
+common_init_per_testcase(TestCase, Config0) ->
+    ct:timetrap(timer:seconds(60)),
+    emqx_bridge_testlib:delete_all_bridges(),
+    emqx_config:delete_override_conf_files(),
+    ConsumerTopic =
+        <<
+            (atom_to_binary(TestCase))/binary,
+            (integer_to_binary(erlang:unique_integer()))/binary
+        >>,
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>),
+    MQTTQoS = proplists:get_value(mqtt_qos, Config0, 0),
+    DefaultTopicMapping = [
+        #{
+            pubsub_topic => ConsumerTopic,
+            mqtt_topic => MQTTTopic,
+            qos => MQTTQoS,
+            payload_template => <<"${.}">>
+        }
+    ],
+    TopicMapping = proplists:get_value(topic_mapping, Config0, DefaultTopicMapping),
+    ServiceAccountJSON =
+        #{<<"project_id">> := ProjectId} =
+        emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
+    Config = [
+        {consumer_topic, ConsumerTopic},
+        {topic_mapping, TopicMapping},
+        {service_account_json, ServiceAccountJSON},
+        {project_id, ProjectId}
+        | Config0
+    ],
+    {Name, ConfigString, ConsumerConfig} = consumer_config(TestCase, Config),
+    ensure_topics(Config),
+    ok = snabbkaffe:start_trace(),
+    [
+        {consumer_name, Name},
+        {consumer_config_string, ConfigString},
+        {consumer_config, ConsumerConfig}
+        | Config
+    ].
+
+%% Per-test-case teardown.  Unless the config carries the
+%% `skip_does_not_apply' flag, the proxy is reset, all bridges are deleted,
+%% the on_exit callbacks are run through the janitor and the snabbkaffe
+%% trace is stopped.
+end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        false ->
+            emqx_common_test_helpers:reset_proxy(
+                ?config(proxy_host, Config),
+                ?config(proxy_port, Config)
+            ),
+            emqx_bridge_testlib:delete_all_bridges(),
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop();
+        true ->
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+%% Builds the consumer bridge configuration for a test case.
+%%
+%% Returns `{Name, ConfigString, ParsedConfig}': a bridge name unique to
+%% this call, the HOCON text of the bridge definition and the raw config
+%% map extracted from it by parse_and_check/2.  The topic mapping, the MQTT
+%% topic/QoS and the number of consumer workers per topic can be overridden
+%% through the CT config.
+consumer_config(TestCase, Config) ->
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    ConsumerTopic = ?config(consumer_topic, Config),
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    Name = <<
+        (atom_to_binary(TestCase))/binary, UniqueNum/binary
+    >>,
+    ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
+    MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
+    MQTTQoS = proplists:get_value(mqtt_qos, Config, 0),
+    ConsumerWorkersPerTopic = proplists:get_value(consumer_workers_per_topic, Config, 1),
+    DefaultTopicMapping = [
+        #{
+            pubsub_topic => ConsumerTopic,
+            mqtt_topic => MQTTTopic,
+            qos => MQTTQoS,
+            payload_template => <<"${.}">>
+        }
+    ],
+    TopicMapping0 = proplists:get_value(topic_mapping, Config, DefaultTopicMapping),
+    TopicMappingStr = topic_mapping(TopicMapping0),
+    ConfigString =
+        io_lib:format(
+            "bridges.gcp_pubsub_consumer.~s {\n"
+            "  enable = true\n"
+            %% gcp pubsub emulator doesn't do pipelining very well...
+            %% `pipelining' must be set only once: a second assignment
+            %% further down would override this value.
+            "  pipelining = 1\n"
+            "  connect_timeout = \"15s\"\n"
+            "  service_account_json = ~s\n"
+            "  consumer {\n"
+            "    ack_retry_interval = \"5s\"\n"
+            "    pull_max_messages = 10\n"
+            "    consumer_workers_per_topic = ~b\n"
+            %% topic mapping
+            "~s"
+            "  }\n"
+            "  max_retries = 2\n"
+            "  pool_size = 8\n"
+            "  resource_opts {\n"
+            "    health_check_interval = \"1s\"\n"
+            "    request_ttl = \"15s\"\n"
+            "  }\n"
+            "}\n",
+            [
+                Name,
+                ServiceAccountJSONStr,
+                ConsumerWorkersPerTopic,
+                TopicMappingStr
+            ]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Name)}.
+
+%% Parses the HOCON text, runs it through the bridge schema check (the
+%% checked result itself is discarded) and returns the raw config map of
+%% the bridge called Name.
+parse_and_check(ConfigString, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    CheckOpts = #{required => false, atom_key => false},
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, CheckOpts),
+    #{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := BridgeConf}}} = RawConf,
+    BridgeConf.
+
+%% Renders the list of topic mapping maps as the HOCON `topic_mapping'
+%% array, one mustache-rendered object per entry, separated by ",\n".
+topic_mapping(TopicMapping0) ->
+    Template = bbmustache:parse_binary(<<
+        "{pubsub_topic = \"{{ pubsub_topic }}\","
+        " mqtt_topic = \"{{ mqtt_topic }}\","
+        " qos = {{ qos }},"
+        " payload_template = \"{{{ payload_template }}}\" }"
+    >>),
+    Entries = [
+        bbmustache:compile(Template, Params, [{key_type, atom}])
+     || Params <- TopicMapping0
+    ],
+    Joined = lists:join(<<",\n">>, Entries),
+    iolist_to_binary(["  topic_mapping = [", Joined, "]\n"]).
+
+%% Makes sure every PubSub topic named in the configured topic mapping
+%% exists.
+ensure_topics(Config) ->
+    CreateTopic =
+        fun(#{pubsub_topic := PubSubTopic}) ->
+            ensure_topic(Config, PubSubTopic)
+        end,
+    lists:foreach(CreateTopic, ?config(topic_mapping, Config)).
+
+%% Creates the PubSub topic with a PUT request through the control client.
+%% A 409 response means the topic already exists and is accepted; any
+%% other error crashes the caller.
+ensure_topic(Config, Topic) ->
+    Client = ?config(client, Config),
+    ProjectId = ?config(project_id, Config),
+    Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>,
+    Request = {prepared_request, {put, Path, <<"{}">>}},
+    case emqx_bridge_gcp_pubsub_client:query_sync(Request, Client) of
+        {error, #{status_code := 409}} ->
+            %% already exists
+            ok;
+        {ok, _} ->
+            ok
+    end.
+
+%% Starts a stand-alone client, registered under the name
+%% `control_connector', that the suite uses to talk to PubSub directly
+%% (creating topics, publishing messages).  Returns the client value.
+start_control_connector() ->
+    ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(
+        emqx_bridge_gcp_pubsub_utils:generate_service_account_json()
+    ),
+    ConnectorConfig = #{
+        connect_timeout => 5_000,
+        max_retries => 0,
+        pool_size => 1,
+        resource_opts => #{request_ttl => 5_000},
+        service_account_json => ServiceAccount
+    },
+    {ok, Client} = emqx_bridge_gcp_pubsub_client:start(
+        <<"control_connector">>, ConnectorConfig
+    ),
+    Client.
+
+%% Stops the control client and asserts that stopping succeeded.
+%%
+%% NOTE(review): the producer's on_stop/2 calls
+%% `emqx_bridge_gcp_pubsub_client:stop/1' with the id that was given to
+%% `start/2', whereas this passes the value returned by `start/2' --
+%% confirm that `stop/1' accepts both.
+stop_control_connector(Client) ->
+    ok = emqx_bridge_gcp_pubsub_client:stop(Client).
+
+%% Publishes Messages0 to the PubSub topic through the control client.
+%% When a message has a `<<"data">>' field it is base64-encoded first; a
+%% map value is JSON-encoded before that.  Crashes unless the publish
+%% request succeeds.
+pubsub_publish(Config, Topic, Messages0) ->
+    Client = ?config(client, Config),
+    ProjectId = ?config(project_id, Config),
+    Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary, ":publish">>,
+    EncodeData =
+        fun
+            (Data) when is_binary(Data) -> base64:encode(Data);
+            (Data) when is_map(Data) -> base64:encode(emqx_utils_json:encode(Data))
+        end,
+    Messages = [
+        emqx_utils_maps:update_if_present(<<"data">>, EncodeData, Msg)
+     || Msg <- Messages0
+    ],
+    Body = emqx_utils_json:encode(#{<<"messages">> => Messages}),
+    Request = {prepared_request, {post, Path, Body}},
+    {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
+    ok.
+
+%% Creates the consumer bridge straight through `emqx_bridge:create/3',
+%% using the configuration prepared for the test case.
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+%% Same as create_bridge/1, with Overrides deep-merged into the prepared
+%% configuration first.
+create_bridge(Config, Overrides) ->
+    BaseConfig = ?config(consumer_config, Config),
+    emqx_bridge:create(
+        ?BRIDGE_TYPE_BIN,
+        ?config(consumer_name, Config),
+        emqx_utils_maps:deep_merge(BaseConfig, Overrides)
+    ).
+
+%% Creates the consumer bridge through the HTTP management API.
+create_bridge_api(Config) ->
+    create_bridge_api(Config, _Overrides = #{}).
+
+%% Same as create_bridge_api/1, with Overrides deep-merged into the
+%% prepared configuration.  On success the response body is JSON-decoded;
+%% any other result is returned as received.  Request and result are
+%% printed to the CT log.
+create_bridge_api(Config, Overrides) ->
+    Merged = emqx_utils_maps:deep_merge(?config(consumer_config, Config), Overrides),
+    Params = Merged#{
+        <<"type">> => ?BRIDGE_TYPE_BIN,
+        <<"name">> => ?config(consumer_name, Config)
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("creating bridge (via http): ~p", [Params]),
+    Response = emqx_mgmt_api_test_util:request_api(
+        post, Path, "", AuthHeader, Params, #{return_all => true}
+    ),
+    Res =
+        case Response of
+            {ok, {Status, Headers, RawBody}} ->
+                {ok, {Status, Headers, emqx_utils_json:decode(RawBody, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("bridge create result: ~p", [Res]),
+    Res.
+
+%% Calls the bridge probe HTTP endpoint with the prepared configuration.
+%% A 204 response is returned as `{ok, Response}'; any other result is
+%% returned as received.  Request and result are printed to the CT log.
+probe_bridge_api(Config) ->
+    BaseConfig = ?config(consumer_config, Config),
+    Params = BaseConfig#{
+        <<"type">> => ?BRIDGE_TYPE_BIN,
+        <<"name">> => ?config(consumer_name, Config)
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Response = emqx_mgmt_api_test_util:request_api(
+        post, Path, "", AuthHeader, Params, #{return_all => true}
+    ),
+    Res =
+        case Response of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+            Error -> Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.
+
+%% Starts an MQTT v5 client linked to the calling process, registers its
+%% shutdown as an on_exit callback and subscribes it, with QoS 2, to every
+%% MQTT topic of the configured topic mapping.
+start_and_subscribe_mqtt(Config) ->
+    {ok, Client} = emqtt:start_link([{proto_ver, v5}]),
+    on_exit(fun() -> emqtt:stop(Client) end),
+    {ok, _} = emqtt:connect(Client),
+    Subscribe =
+        fun(#{mqtt_topic := MQTTTopic}) ->
+            {ok, _, [2]} = emqtt:subscribe(Client, MQTTTopic, _QoS = 2)
+        end,
+    lists:foreach(Subscribe, ?config(topic_mapping, Config)),
+    ok.
+
+%% Resource id of the consumer bridge created for the current test case.
+resource_id(Config) ->
+    emqx_bridge_resource:resource_id(
+        ?BRIDGE_TYPE_BIN,
+        ?config(consumer_name, Config)
+    ).
+
+%% Waits for one `{publish, Msg}' message with the default timeout.
+receive_published() ->
+    receive_published(#{}).
+
+%% Waits for `n' published messages (default 1).  `timeout' (default
+%% 20_000 ms) applies to each individual message, not to the whole wait.
+receive_published(Opts0) ->
+    Opts = maps:merge(#{n => 1, timeout => 20_000}, Opts0),
+    receive_published(Opts, []).
+
+%% Collects messages in arrival order.  Payloads that decode as JSON are
+%% replaced by the decoded term; other payloads are kept as received.
+%% Returns `{ok, Msgs}', or `{timeout, Details}' when a message fails to
+%% arrive in time.
+receive_published(#{n := Remaining, timeout := Timeout} = Opts, Acc) ->
+    case Remaining =< 0 of
+        true ->
+            {ok, lists:reverse(Acc)};
+        false ->
+            receive
+                {publish, Msg0 = #{payload := Payload}} ->
+                    Msg =
+                        case emqx_utils_json:safe_decode(Payload, [return_maps]) of
+                            {error, _} -> Msg0;
+                            {ok, Decoded} -> Msg0#{payload := Decoded}
+                        end,
+                    receive_published(Opts#{n := Remaining - 1}, [Msg | Acc])
+            after Timeout ->
+                {timeout, #{
+                    msgs_so_far => Acc,
+                    mailbox => process_info(self(), messages),
+                    expected_remaining => Remaining
+                }}
+            end
+    end.
+
+%% Creates, through the HTTP management API, a rule that selects everything
+%% coming from the consumer bridge and runs two actions: a republish to
+%% ?REPUBLISH_TOPIC and a call to action_response/3 of this module.
+%%
+%% On success the response body is JSON-decoded, deletion of the new rule
+%% is registered as an on_exit callback and `{ok, DecodedBody}' is
+%% returned.  Any other result is returned as received.
+create_rule_and_action_http(Config) ->
+    ConsumerName = ?config(consumer_name, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, ConsumerName),
+    ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
+    Params = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"$bridges/", BridgeId/binary, "\"">>,
+        actions =>
+            [
+                #{
+                    <<"function">> => <<"republish">>,
+                    <<"args">> =>
+                        #{
+                            <<"topic">> => ?REPUBLISH_TOPIC,
+                            <<"payload">> => <<>>,
+                            <<"qos">> => 0,
+                            <<"retain">> => false,
+                            <<"user_properties">> => <<"${headers}">>
+                        }
+                },
+                #{<<"function">> => ActionFn}
+            ]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("rule action params: ~p", [Params]),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res0} ->
+            %% the body has to be decoded before the rule id can be read
+            %% from it; matching on the id here also guarantees that the
+            %% cleanup below is always registered.
+            Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            {ok, Res};
+        Error ->
+            Error
+    end.
+
+%% Rule action referenced by create_rule_and_action_http/1.  It only emits
+%% an `action_response' trace point carrying its arguments, which lets a
+%% test wait for the rule to have run.
+action_response(Selected, Envs, Args) ->
+    ?tp(action_response, #{
+        selected => Selected,
+        envs => Envs,
+        args => Args
+    }),
+    ok.
+
+%% Asserts that every counter except `received', and every gauge, of the
+%% bridge's metrics is zero.  The full metrics map is attached to a failing
+%% assertion.
+assert_non_received_metrics(BridgeName) ->
+    Metrics = emqx_bridge:get_metrics(?BRIDGE_TYPE, BridgeName),
+    #{counters := AllCounters, gauges := Gauges} = Metrics,
+    OtherCounters = maps:remove(received, AllCounters),
+    IsZero = fun(V) -> V == 0 end,
+    ?assert(lists:all(IsZero, maps:values(OtherCounters)), #{metrics => Metrics}),
+    ?assert(lists:all(IsZero, maps:values(Gauges)), #{metrics => Metrics}),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Trace properties
+%%------------------------------------------------------------------------------
+
+%% Trace property: no message id shows up more than once among the
+%% "handle message" events of the consumer workers.
+prop_pulled_only_once(Trace) ->
+    HandleEvents = ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace),
+    PulledIds = ?projection(message_id, HandleEvents),
+    DistinctIds = sets:from_list(PulledIds, [{version, 2}]),
+    ?assertEqual(sets:size(DistinctIds), length(PulledIds)),
+    ok.
+
+%% Trace property: the set of ack ids seen in "handle message" events is
+%% equal to the set of ack ids reported in "acknowledged" events.
+prop_all_pulled_are_acked(Trace) ->
+    HandleEvents = ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace),
+    PulledAckIds = ?projection(ack_id, HandleEvents),
+    AckEvents = ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace),
+    AckedIds = lists:flatten(?projection(ack_ids, AckEvents)),
+    ToSet = fun(Ids) -> sets:from_list(Ids, [{version, 2}]) end,
+    ?assertEqual(ToSet(PulledAckIds), ToSet(AckedIds)),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+%% Happy path: messages published to the PubSub topic are delivered to the
+%% mapped MQTT topic.  Covers a message with data, attributes and ordering
+%% key, then a batch of a data-only and an attributes-only message, checks
+%% the `received' metric after each step and finally checks that probing
+%% the bridge twice does not change the atom count.
+t_consume_ok(Config) ->
+    BridgeName = ?config(consumer_name, Config),
+    TopicMapping = ?config(topic_mapping, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            start_and_subscribe_mqtt(Config),
+            ?assertMatch(
+                {{ok, _}, {ok, _}},
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
+                    40_000
+                )
+            ),
+            [
+                #{
+                    pubsub_topic := Topic,
+                    mqtt_topic := MQTTTopic,
+                    qos := QoS
+                }
+            ] = TopicMapping,
+            Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Messages0 = [
+                #{
+                    <<"data">> => Data0 = #{<<"value">> => Payload0},
+                    <<"attributes">> => Attributes0 = #{<<"key">> => <<"value">>},
+                    <<"orderingKey">> => <<"some_ordering_key">>
+                }
+            ],
+            pubsub_publish(Config, Topic, Messages0),
+            {ok, Published0} = receive_published(),
+            EncodedData0 = emqx_utils_json:encode(Data0),
+            ?assertMatch(
+                [
+                    #{
+                        qos := QoS,
+                        topic := MQTTTopic,
+                        payload :=
+                            #{
+                                <<"attributes">> := Attributes0,
+                                <<"message_id">> := MsgId,
+                                <<"ordering_key">> := <<"some_ordering_key">>,
+                                <<"publish_time">> := PubTime,
+                                <<"topic">> := Topic,
+                                <<"value">> := EncodedData0
+                            }
+                    }
+                ] when is_binary(MsgId) andalso is_binary(PubTime),
+                Published0
+            ),
+            %% no need to check return value; we check the property in
+            %% the check phase.  this is just to give it a chance to do
+            %% so and avoid flakiness.  should be fast.
+            ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 1_000),
+            ?retry(
+                _Interval = 200,
+                _NAttempts = 20,
+                ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
+            ),
+
+            %% Batch with only data and only attributes
+            Payload1 = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Messages1 = [
+                #{<<"data">> => Data1 = #{<<"val">> => Payload1}},
+                #{<<"attributes">> => Attributes1 = #{<<"other_key">> => <<"other_value">>}}
+            ],
+            pubsub_publish(Config, Topic, Messages1),
+            {ok, Published1} = receive_published(#{n => 2}),
+            EncodedData1 = emqx_utils_json:encode(Data1),
+            ?assertMatch(
+                [
+                    #{
+                        qos := QoS,
+                        topic := MQTTTopic,
+                        payload :=
+                            #{
+                                <<"message_id">> := _,
+                                <<"publish_time">> := _,
+                                <<"topic">> := Topic,
+                                <<"value">> := EncodedData1
+                            }
+                    },
+                    #{
+                        qos := QoS,
+                        topic := MQTTTopic,
+                        payload :=
+                            #{
+                                <<"attributes">> := Attributes1,
+                                <<"message_id">> := _,
+                                <<"publish_time">> := _,
+                                <<"topic">> := Topic
+                            }
+                    }
+                ],
+                Published1
+            ),
+            %% NOTE(review): this only fails when the first message has
+            %% both attributes and an ordering key AND the second has both
+            %% a value and an ordering key, so it passes for almost any
+            %% input -- confirm whether the intent was "no ordering key in
+            %% either message".
+            ?assertNotMatch(
+                [
+                    #{payload := #{<<"attributes">> := _, <<"ordering_key">> := _}},
+                    #{payload := #{<<"value">> := _, <<"ordering_key">> := _}}
+                ],
+                Published1
+            ),
+            %% no need to check return value; we check the property in
+            %% the check phase.  this is just to give it a chance to do
+            %% so and avoid flakiness.  should be fast.
+            ?block_until(
+                #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged, ack_ids := [_, _]}, 1_000
+            ),
+            ?retry(
+                _Interval = 200,
+                _NAttempts = 20,
+                ?assertEqual(3, emqx_resource_metrics:received_get(ResourceId))
+            ),
+
+            %% Check that the bridge probe API doesn't leak atoms.
+            ProbeRes0 = probe_bridge_api(Config),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
+            AtomsBefore = erlang:system_info(atom_count),
+            %% Probe again; shouldn't have created more atoms.
+            ProbeRes1 = probe_bridge_api(Config),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
+            AtomsAfter = erlang:system_info(atom_count),
+            ?assertEqual(AtomsBefore, AtomsAfter),
+
+            assert_non_received_metrics(BridgeName),
+
+            ok
+        end,
+        [
+            {"all pulled ack ids are acked", fun ?MODULE:prop_all_pulled_are_acked/1},
+            {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}
+        ]
+    ),
+    ok.
+
+%% Uses the consumer bridge as the source of a rule.  After the bridge and
+%% the rule are created, one message is published to the PubSub topic; the
+%% test waits for the rule's `action_response' trace point, then checks the
+%% message republished to ?REPUBLISH_TOPIC and the bridge metrics.
+t_bridge_rule_action_source(Config) ->
+    BridgeName = ?config(consumer_name, Config),
+    TopicMapping = ?config(topic_mapping, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {{ok, _}, {ok, _}},
+                ?wait_async_action(
+                    create_bridge(Config),
+                    #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
+                    40_000
+                )
+            ),
+            {ok, _} = create_rule_and_action_http(Config),
+
+            [#{pubsub_topic := PubSubTopic}] = TopicMapping,
+            {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+            on_exit(fun() -> emqtt:stop(C) end),
+            {ok, _} = emqtt:connect(C),
+            {ok, _, [0]} = emqtt:subscribe(C, ?REPUBLISH_TOPIC),
+
+            Payload0 = emqx_guid:to_hexstr(emqx_guid:gen()),
+            Messages0 = [
+                #{
+                    <<"data">> => Data0 = #{<<"payload">> => Payload0},
+                    <<"attributes">> => Attributes0 = #{<<"key">> => <<"value">>}
+                }
+            ],
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    pubsub_publish(Config, PubSubTopic, Messages0),
+                    #{?snk_kind := action_response},
+                    5_000
+                ),
+            Published0 = receive_published(),
+            EncodedData0 = emqx_utils_json:encode(Data0),
+            ?assertMatch(
+                {ok, [
+                    #{
+                        topic := ?REPUBLISH_TOPIC,
+                        qos := 0,
+                        payload := #{
+                            <<"event">> := <<"$bridges/", _/binary>>,
+                            <<"message_id">> := _,
+                            <<"metadata">> := #{<<"rule_id">> := _},
+                            <<"publish_time">> := _,
+                            <<"topic">> := PubSubTopic,
+                            <<"attributes">> := Attributes0,
+                            <<"value">> := EncodedData0
+                        }
+                    }
+                ]},
+                Published0
+            ),
+            ?retry(
+                _Interval = 200,
+                _NAttempts = 20,
+                ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId))
+            ),
+
+            assert_non_received_metrics(BridgeName),
+
+            #{payload => Payload0}
+        end,
+        [{"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}]
+    ),
+    ok.
+
+%% TODO TEST:
+%%   * multi-topic mapping
+%%   * get status
+%%   * 2+ pull workers do not duplicate delivered messages
+%%   * nonexistent topic
+%%   * connection cut then restored
+%%   * pull worker death
+%%   * async worker death mid-pull
+%%   * ensure subscription creation error
+%%   * cluster subscription
+%%   * connection down during pull
+%%   * connection down during ack
+%%   * topic deleted while consumer is running
+%%   * subscription deleted while consumer is running
+%%   * ensure client is terminated when bridge stops

+ 46 - 40
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_gcp_pubsub_SUITE).
+-module(emqx_bridge_gcp_pubsub_producer_SUITE).
 
 -compile(nowarn_export_all).
 -compile(export_all).
@@ -38,13 +38,13 @@ groups() ->
         {group, sync_query},
         {group, async_query}
     ],
-    ResourceGroups = [{group, gcp_pubsub}],
+    SyncTCs = MatrixTCs,
+    AsyncTCs = MatrixTCs -- only_sync_tests(),
     [
         {with_batch, SynchronyGroups},
         {without_batch, SynchronyGroups},
-        {sync_query, ResourceGroups},
-        {async_query, ResourceGroups},
-        {gcp_pubsub, MatrixTCs}
+        {sync_query, SyncTCs},
+        {async_query, AsyncTCs}
     ].
 
 %% these should not be influenced by the batch/no
@@ -66,11 +66,15 @@ single_config_tests() ->
         t_on_start_ehttpc_pool_already_started
     ].
 
+%% Test cases that `groups/0' removes from the `async_query' group, so they
+%% only run under `sync_query'.
+only_sync_tests() -> [t_query_sync].
+
 init_per_suite(Config) ->
     ok = emqx_common_test_helpers:start_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
     {ok, _} = application:ensure_all_started(emqx_connector),
     emqx_mgmt_api_test_util:init_suite(),
+    %% Stores `tls' under this persistent term for the whole suite; it is
+    %% erased again in `end_per_suite/1'.
+    %% NOTE(review): presumably read by `emqx_bridge_gcp_pubsub_client' to pick
+    %% the transport -- confirm against that module.
+    persistent_term:put({emqx_bridge_gcp_pubsub_client, transport}, tls),
     Config.
 
 end_per_suite(_Config) ->
@@ -78,6 +82,7 @@ end_per_suite(_Config) ->
     ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
     _ = application:stop(emqx_connector),
+    persistent_term:erase({emqx_bridge_gcp_pubsub_client, transport}),
     ok.
 
 init_per_group(sync_query, Config) ->
@@ -273,7 +278,7 @@ gcp_pubsub_config(Config) ->
     PayloadTemplate = proplists:get_value(payload_template, Config, ""),
     PubSubTopic = proplists:get_value(pubsub_topic, Config, <<"mytopic">>),
     PipelineSize = proplists:get_value(pipeline_size, Config, 100),
-    ServiceAccountJSON = proplists:get_value(pubsub_topic, Config, generate_service_account_json()),
+    ServiceAccountJSON = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
     ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
     GUID = emqx_guid:to_hexstr(emqx_guid:gen()),
     Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
@@ -321,32 +326,6 @@ parse_and_check(ConfigString, Name) ->
     #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
     Config.
 
-generate_service_account_json() ->
-    PrivateKeyPEM = generate_private_key_pem(),
-    service_account_json(PrivateKeyPEM).
-
-generate_private_key_pem() ->
-    PublicExponent = 65537,
-    Size = 2048,
-    Key = public_key:generate_key({rsa, Size, PublicExponent}),
-    DERKey = public_key:der_encode('PrivateKeyInfo', Key),
-    public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
-
-service_account_json(PrivateKeyPEM) ->
-    #{
-        <<"type">> => <<"service_account">>,
-        <<"project_id">> => <<"myproject">>,
-        <<"private_key_id">> => <<"kid">>,
-        <<"private_key">> => PrivateKeyPEM,
-        <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>,
-        <<"client_id">> => <<"123812831923812319190">>,
-        <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>,
-        <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>,
-        <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>,
-        <<"client_x509_cert_url">> =>
-            <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">>
-    }.
-
 metrics_mapping() ->
     #{
         dropped => fun emqx_resource_metrics:dropped_get/1,
@@ -1016,7 +995,7 @@ t_publish_timeout(Config) ->
         <<"pipelining">> => 1,
         <<"resource_opts">> => #{
             <<"batch_size">> => 1,
-            <<"resume_interval">> => <<"15s">>
+            <<"resume_interval">> => <<"1s">>
         }
     }),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@@ -1068,7 +1047,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                             #{
                                 ?snk_kind := gcp_pubsub_request_failed,
                                 query_mode := async,
-                                recoverable_error := true
+                                reason := econnrefused
                             },
                             15_000
                         );
@@ -1082,8 +1061,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                     emqx:publish(Message),
                     {ok, _} = snabbkaffe:block_until(
                         ?match_n_events(2, #{
-                            ?snk_kind := gcp_pubsub_response,
-                            query_mode := async
+                            ?snk_kind := handle_async_reply_expired,
+                            expired := [_]
                         }),
                         _Timeout1 = 15_000
                     )
@@ -1104,7 +1083,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 timeout ->
                     ?assertMatch(
                         [_, _ | _],
-                        ?of_kind(gcp_pubsub_response, Trace)
+                        ?of_kind(handle_async_reply_expired, Trace)
                     )
             end,
             ok
@@ -1304,13 +1283,13 @@ t_unrecoverable_error(Config) ->
         {_, {ok, _}} =
             ?wait_async_action(
                 emqx:publish(Message),
-                #{?snk_kind := gcp_pubsub_response},
+                #{?snk_kind := gcp_pubsub_request_failed},
                 5_000
             ),
         fun(Trace) ->
             ?assertMatch(
-                [#{response := {error, killed}}],
-                ?of_kind(gcp_pubsub_response, Trace)
+                [#{reason := killed}],
+                ?of_kind(gcp_pubsub_request_failed, Trace)
             ),
             ok
         end
@@ -1464,3 +1443,30 @@ t_on_start_ehttpc_pool_start_failure(Config) ->
         end
     ),
     ok.
+
+%% Covers the synchronous query path, which is usually not exercised because
+%% the bridge has the `async_if_possible' callback mode.  `callback_mode/0' of
+%% the producer implementation is mocked to return `always_sync' around the
+%% test body.
+t_query_sync(Config) ->
+    ConfiguredBatchSize = ?config(batch_size, Config),
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    %% publish at most two messages, fewer if the batch size is smaller
+    NumMessages = min(2, ConfiguredBatchSize),
+    PublishAndCheck = fun() ->
+        {ok, _} = create_bridge(Config),
+        {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+        on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+        Message = emqx_message:make(<<"t/topic">>, <<"payload">>),
+        Publish = fun(_N) -> emqx:publish(Message) end,
+        emqx_utils:pmap(Publish, lists:seq(1, NumMessages)),
+        DecodedMessages = assert_http_request(ServiceAccountJSON),
+        ?assertEqual(NumMessages, length(DecodedMessages)),
+        ok
+    end,
+    ?check_trace(
+        emqx_common_test_helpers:with_mock(
+            emqx_bridge_gcp_pubsub_impl_producer,
+            callback_mode,
+            fun() -> always_sync end,
+            PublishAndCheck
+        ),
+        []
+    ),
+    ok.

+ 34 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_utils.erl

@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_utils).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+%% Returns a service account credentials map built around a freshly generated
+%% RSA private key.
+generate_service_account_json() ->
+    service_account_json(generate_private_key_pem()).
+
+%% Generates a 2048-bit RSA key (public exponent 65537) and returns it as an
+%% unencrypted, PEM-encoded `PrivateKeyInfo' entry.
+generate_private_key_pem() ->
+    RSAKey = public_key:generate_key({rsa, 2048, 65537}),
+    PEMEntry = {'PrivateKeyInfo', public_key:der_encode('PrivateKeyInfo', RSAKey), not_encrypted},
+    public_key:pem_encode([PEMEntry]).
+
+%% Builds a service account credentials map (binary keys and values) that
+%% carries `PrivateKeyPEM' as its private key; every other field is a fixed
+%% value.
+service_account_json(PrivateKeyPEM) ->
+    ClientX509CertURL =
+        <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">>,
+    #{
+        <<"type">> => <<"service_account">>,
+        <<"project_id">> => <<"myproject">>,
+        <<"client_id">> => <<"123812831923812319190">>,
+        <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>,
+        <<"private_key_id">> => <<"kid">>,
+        <<"private_key">> => PrivateKeyPEM,
+        <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>,
+        <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>,
+        <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>,
+        <<"client_x509_cert_url">> => ClientX509CertURL
+    }.

+ 3 - 0
apps/emqx_connector/src/emqx_connector_http.erl

@@ -41,6 +41,9 @@
     namespace/0
 ]).
 
+%% for other webhook-like connectors.
+-export([redact_request/1]).
+
 -export([validate_method/1, join_paths/2]).
 
 -type connect_timeout() :: emqx_schema:duration() | infinity.

+ 1 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -22,7 +22,7 @@
 -type resource_state() :: term().
 -type resource_status() :: connected | disconnected | connecting | stopped.
 -type callback_mode() :: always_sync | async_if_possible.
--type query_mode() :: async | sync | simple_async | simple_sync | dynamic.
+-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
 -type result() :: term().
 -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
 -type query_opts() :: #{

+ 3 - 5
apps/emqx_resource/src/emqx_resource.erl

@@ -122,6 +122,7 @@
 -export([apply_reply_fun/2]).
 
 -export_type([
+    query_mode/0,
     resource_id/0,
     resource_data/0,
     resource_status/0
@@ -174,8 +175,7 @@
     | {resource_status(), resource_state()}
     | {resource_status(), resource_state(), term()}.
 
--callback query_mode(Config :: term()) ->
-    simple_sync | simple_async | sync | async | no_queries.
+-callback query_mode(Config :: term()) -> query_mode().
 
 -spec list_types() -> [module()].
 list_types() ->
@@ -415,9 +415,7 @@ call_stop(ResId, Mod, ResourceState) ->
         Res
     end).
 
--spec query_mode(module(), term(), creation_opts()) ->
-    simple_sync | simple_async | sync | async | no_queries.
-
+-spec query_mode(module(), term(), creation_opts()) -> query_mode().
 query_mode(Mod, Config, Opts) ->
     case erlang:function_exported(Mod, query_mode, 1) of
         true ->

+ 11 - 1
apps/emqx_utils/src/emqx_utils_maps.erl

@@ -32,7 +32,8 @@
     deep_convert/3,
     diff_maps/2,
     best_effort_recursive_sum/3,
-    if_only_to_toggle_enable/2
+    if_only_to_toggle_enable/2,
+    update_if_present/3
 ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -293,3 +294,12 @@ if_only_to_toggle_enable(OldConf, Conf) ->
         {_, _, _} ->
             false
     end.
+
+%% Counterpart of `maps:update_with/3' that tolerates a missing key: when
+%% `Key' is present its value is replaced by `Fun(Value)', otherwise `Map' is
+%% returned untouched.
+update_if_present(Key, Fun, Map) when is_map_key(Key, Map) ->
+    Map#{Key := Fun(maps:get(Key, Map))};
+update_if_present(_Key, _Fun, Map) ->
+    Map.

+ 1 - 0
changes/ee/feat-11090.en.md

@@ -0,0 +1 @@
+Implemented the GCP PubSub Consumer data integration bridge.

+ 1 - 0
changes/ee/fix-11090.en.md

@@ -0,0 +1 @@
+Fixed a configuration issue that prevented the pipelining option from being correctly set for the GCP PubSub Producer bridge.

+ 26 - 11
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -16,7 +16,8 @@ api_schemas(Method) ->
     [
         %% We need to map the `type' field of a request (binary) to a
         %% bridge schema module.
-        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method),
+        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub">>, Method ++ "_producer"),
+        api_ref(emqx_bridge_gcp_pubsub, <<"gcp_pubsub_consumer">>, Method ++ "_consumer"),
         api_ref(emqx_bridge_kafka, <<"kafka_consumer">>, Method ++ "_consumer"),
         %% TODO: rename this to `kafka_producer' after alias support is added
         %% to hocon; keeping this as just `kafka' for backwards compatibility.
@@ -91,7 +92,8 @@ resource_type(kafka_consumer) -> emqx_bridge_kafka_impl_consumer;
 resource_type(kafka) -> emqx_bridge_kafka_impl_producer;
 resource_type(cassandra) -> emqx_bridge_cassandra_connector;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
-resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_connector;
+resource_type(gcp_pubsub) -> emqx_bridge_gcp_pubsub_impl_producer;
+resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer;
 resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
 resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
 resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
@@ -125,14 +127,6 @@ fields(bridges) ->
                     required => false
                 }
             )},
-        {gcp_pubsub,
-            mk(
-                hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, "config")),
-                #{
-                    desc => <<"EMQX Enterprise Config">>,
-                    required => false
-                }
-            )},
         {mysql,
             mk(
                 hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
@@ -198,7 +192,8 @@ fields(bridges) ->
                     required => false
                 }
             )}
-    ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
+    ] ++ kafka_structs() ++ pulsar_structs() ++ gcp_pubsub_structs() ++ mongodb_structs() ++
+        influxdb_structs() ++
         redis_structs() ++
         pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs().
 
@@ -249,6 +244,26 @@ pulsar_structs() ->
             )}
     ].
 
+%% Schema entries for the GCP PubSub bridges: the producer lives under the
+%% `gcp_pubsub' key and the consumer under `gcp_pubsub_consumer'.
+gcp_pubsub_structs() ->
+    [
+        {Type,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_gcp_pubsub, StructName)),
+                #{
+                    desc => <<"EMQX Enterprise Config">>,
+                    required => false
+                }
+            )}
+     || {Type, StructName} <- [
+            {gcp_pubsub, "config_producer"},
+            {gcp_pubsub_consumer, "config_consumer"}
+        ]
+    ].
+
 influxdb_structs() ->
     [
         {Protocol,

+ 1 - 1
mix.exs

@@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do
       {:redbug, "2.0.8"},
       {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
       {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true},
+      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.11", override: true},
       {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {gpb, "4.19.7"}
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
     , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
-    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}}
+    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.11"}}}
     , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}

+ 48 - 0
rel/i18n/emqx_bridge_gcp_pubsub.hocon

@@ -71,4 +71,52 @@ When a GCP Service Account is created (as described in https://developers.google
 service_account_json.label:
 """GCP Service Account Credentials"""
 
+  consumer_opts {
+    desc: "Local MQTT publish and GCP PubSub consumer configs."
+    label: "GCP PubSub to MQTT"
+  }
+
+  consumer_pull_max_messages {
+    desc: "The maximum number of messages to retrieve from GCP PubSub in a single pull request."
+          " The actual number may be less than the specified value."
+    label: "Maximum Messages to Pull"
+  }
+
+  consumer_topic_mapping {
+    desc: "Defines the mapping between GCP PubSub topics and MQTT topics. Must contain at least one item."
+    label: "Topic Mapping"
+  }
+
+  consumer_pubsub_topic {
+    desc: "GCP PubSub topic to consume from."
+    label: "GCP PubSub Topic"
+  }
+
+  consumer_mqtt_topic {
+    desc: "Local topic to which consumed GCP PubSub messages should be published."
+    label: "MQTT Topic"
+  }
+
+  consumer_mqtt_qos {
+    desc: "MQTT QoS used to publish messages consumed from GCP PubSub."
+    label: "QoS"
+  }
+
+consumer_mqtt_payload.desc:
+"""The template for transforming the incoming GCP PubSub message.  By default, it will use JSON format to serialize inputs from the GCP PubSub message.  Available fields are:
+<code>message_id</code>: the message ID assigned by GCP PubSub.
+<code>publish_time</code>: message timestamp assigned by GCP PubSub.
+<code>topic</code>: GCP PubSub topic.
+<code>value</code>: the payload of the GCP PubSub message.  Omitted if there's no payload.
+<code>attributes</code>: an object containing string key-value pairs.  Omitted if there are no attributes.
+<code>ordering_key</code>: GCP PubSub message ordering key.  Omitted if there's none."""
+
+consumer_mqtt_payload.label:
+"Payload Template"
+
+  consumer {
+    desc: "GCP PubSub Consumer configuration."
+    label: "GCP PubSub Consumer"
+  }
+
 }

+ 3 - 0
scripts/ct/run.sh

@@ -213,6 +213,9 @@ for dep in ${CT_DEPS}; do
             FILES+=( '.ci/docker-compose-file/docker-compose-minio-tcp.yaml'
                      '.ci/docker-compose-file/docker-compose-minio-tls.yaml' )
             ;;
+        gcp_emulator)
+            FILES+=( '.ci/docker-compose-file/docker-compose-gcp-emulator.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1