Kaynağa Gözat

Merge pull request #11969 from thalesmg/migrate-gcp-pubsub-actions-m-20231116

feat(gcp_pubsub_producer): migrate GCP PubSub producer to actions
Thales Macedo Garitezi 2 yıl önce
ebeveyn
işleme
18e274e587

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -75,6 +75,7 @@ hard_coded_action_info_modules_ee() ->
     [
         emqx_bridge_azure_event_hub_action_info,
         emqx_bridge_confluent_producer_action_info,
+        emqx_bridge_gcp_pubsub_producer_action_info,
         emqx_bridge_kafka_action_info,
         emqx_bridge_syskeeper_action_info
     ].

+ 22 - 8
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -40,7 +40,11 @@
 
 -export([types/0, types_sc/0]).
 
--export([make_producer_action_schema/1, make_consumer_action_schema/1]).
+-export([
+    make_producer_action_schema/1,
+    make_consumer_action_schema/1,
+    top_level_common_action_keys/0
+]).
 
 -export_type([action_type/0]).
 
@@ -130,6 +134,8 @@ registered_schema_fields() ->
 
 desc(actions) ->
     ?DESC("desc_bridges_v2");
+desc(resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(_) ->
     undefined.
 
@@ -154,6 +160,16 @@ examples(Method) ->
     SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
     lists:foldl(Fun, #{}, SchemaModules).
 
+top_level_common_action_keys() ->
+    [
+        <<"connector">>,
+        <<"description">>,
+        <<"enable">>,
+        <<"local_topic">>,
+        <<"parameters">>,
+        <<"resource_opts">>
+    ].
+
 %%======================================================================================
 %% Helper functions for making HOCON Schema
 %%======================================================================================
@@ -174,7 +190,10 @@ make_consumer_action_schema(ActionParametersRef) ->
         {description, emqx_schema:description_schema()},
         {parameters, ActionParametersRef},
         {resource_opts,
-            mk(ref(?MODULE, resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
+            mk(ref(?MODULE, resource_opts), #{
+                default => #{},
+                desc => ?DESC(emqx_resource_schema, "resource_opts")
+            })}
     ].
 
 -ifdef(TEST).
@@ -196,7 +215,7 @@ schema_homogeneous_test() ->
 
 is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
     Fields = Module:fields(TypeName),
-    ExpectedFieldNames = common_field_names(),
+    ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()),
     MissingFileds = lists:filter(
         fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames
     ),
@@ -211,9 +230,4 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
             }}
     end.
 
-common_field_names() ->
-    [
-        enable, description, local_topic, connector, resource_opts, parameters
-    ].
-
 -endif.

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -17,7 +17,7 @@
     desc/1
 ]).
 
-%% emqx_bridge_enterprise "unofficial" API
+%% `emqx_bridge_v2_schema' "unofficial" API
 -export([
     bridge_v2_examples/1,
     conn_bridge_examples/1,

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl

@@ -134,7 +134,7 @@ start(
 
 -spec stop(resource_id()) -> ok | {error, term()}.
 stop(ResourceId) ->
-    ?tp(gcp_pubsub_stop, #{resource_id => ResourceId}),
+    ?tp(gcp_pubsub_stop, #{instance_id => ResourceId, resource_id => ResourceId}),
     ?SLOG(info, #{
         msg => "stopping_gcp_pubsub_bridge",
         connector => ResourceId

+ 129 - 66
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -8,23 +8,30 @@
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--type config() :: #{
-    attributes_template := [#{key := binary(), value := binary()}],
+-type connector_config() :: #{
     connect_timeout := emqx_schema:duration_ms(),
     max_retries := non_neg_integer(),
-    ordering_key_template := binary(),
-    payload_template := binary(),
-    pubsub_topic := binary(),
     resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
-    service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
-    any() => term()
+    service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json()
 }.
--type state() :: #{
-    attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
+-type action_config() :: #{
+    parameters := #{
+        attributes_template := [#{key := binary(), value := binary()}],
+        ordering_key_template := binary(),
+        payload_template := binary(),
+        pubsub_topic := binary()
+    },
+    resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}
+}.
+-type connector_state() :: #{
     client := emqx_bridge_gcp_pubsub_client:state(),
+    installed_actions := #{action_resource_id() => action_state()},
+    project_id := emqx_bridge_gcp_pubsub_client:project_id()
+}.
+-type action_state() :: #{
+    attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
     ordering_key_template := emqx_placeholder:tmpl_token(),
     payload_template := emqx_placeholder:tmpl_token(),
-    project_id := emqx_bridge_gcp_pubsub_client:project_id(),
     pubsub_topic := binary()
 }.
 -type headers() :: emqx_bridge_gcp_pubsub_client:headers().
@@ -41,7 +48,11 @@
     on_query_async/4,
     on_batch_query/3,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
 -export([reply_delegator/2]).
@@ -54,53 +65,45 @@ callback_mode() -> async_if_possible.
 
 query_mode(_Config) -> async.
 
--spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+-spec on_start(connector_resource_id(), connector_config()) ->
+    {ok, connector_state()} | {error, term()}.
 on_start(InstanceId, Config0) ->
     ?SLOG(info, #{
         msg => "starting_gcp_pubsub_bridge",
         config => Config0
     }),
     Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
-    #{
-        attributes_template := AttributesTemplate,
-        ordering_key_template := OrderingKeyTemplate,
-        payload_template := PayloadTemplate,
-        pubsub_topic := PubSubTopic,
-        service_account_json := #{<<"project_id">> := ProjectId}
-    } = Config,
+    #{service_account_json := #{<<"project_id">> := ProjectId}} = Config,
     case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of
         {ok, Client} ->
             State = #{
                 client => Client,
-                attributes_template => preproc_attributes(AttributesTemplate),
-                ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
-                payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
-                project_id => ProjectId,
-                pubsub_topic => PubSubTopic
+                installed_actions => #{},
+                project_id => ProjectId
             },
             {ok, State};
         Error ->
             Error
     end.
 
--spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+-spec on_stop(connector_resource_id(), connector_state()) -> ok | {error, term()}.
 on_stop(InstanceId, _State) ->
     emqx_bridge_gcp_pubsub_client:stop(InstanceId).
 
--spec on_get_status(resource_id(), state()) -> connected | disconnected.
+-spec on_get_status(connector_resource_id(), connector_state()) -> connected | disconnected.
 on_get_status(_InstanceId, #{client := Client} = _State) ->
     emqx_bridge_gcp_pubsub_client:get_status(Client).
 
 -spec on_query(
-    resource_id(),
-    {send_message, map()},
-    state()
+    connector_resource_id(),
+    {message_tag(), map()},
+    connector_state()
 ) ->
     {ok, map()}
     | {error, {recoverable_error, term()}}
     | {error, term()}.
-on_query(ResourceId, {send_message, Selected}, State) ->
-    Requests = [{send_message, Selected}],
+on_query(ResourceId, {MessageTag, Selected}, State) ->
+    Requests = [{MessageTag, Selected}],
     ?TRACE(
         "QUERY_SYNC",
         "gcp_pubsub_received",
@@ -109,24 +112,25 @@ on_query(ResourceId, {send_message, Selected}, State) ->
     do_send_requests_sync(State, Requests, ResourceId).
 
 -spec on_query_async(
-    resource_id(),
-    {send_message, map()},
+    connector_resource_id(),
+    {message_tag(), map()},
     {ReplyFun :: function(), Args :: list()},
-    state()
+    connector_state()
 ) -> {ok, pid()} | {error, no_pool_worker_available}.
-on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
-    Requests = [{send_message, Selected}],
+on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) ->
+    Requests = [{MessageTag, Selected}],
     ?TRACE(
         "QUERY_ASYNC",
         "gcp_pubsub_received",
         #{requests => Requests, connector => ResourceId, state => State}
     ),
+    ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
     do_send_requests_async(State, Requests, ReplyFunAndArgs).
 
 -spec on_batch_query(
-    resource_id(),
-    [{send_message, map()}],
-    state()
+    connector_resource_id(),
+    [{message_tag(), map()}],
+    connector_state()
 ) ->
     {ok, map()}
     | {error, {recoverable_error, term()}}
@@ -140,10 +144,10 @@ on_batch_query(ResourceId, Requests, State) ->
     do_send_requests_sync(State, Requests, ResourceId).
 
 -spec on_batch_query_async(
-    resource_id(),
-    [{send_message, map()}],
+    connector_resource_id(),
+    [{message_tag(), map()}],
     {ReplyFun :: function(), Args :: list()},
-    state()
+    connector_state()
 ) -> {ok, pid()} | {error, no_pool_worker_available}.
 on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
     ?TRACE(
@@ -151,32 +155,92 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
         "gcp_pubsub_received",
         #{requests => Requests, connector => ResourceId, state => State}
     ),
+    ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}),
     do_send_requests_async(State, Requests, ReplyFunAndArgs).
 
+-spec on_add_channel(
+    connector_resource_id(),
+    connector_state(),
+    action_resource_id(),
+    action_config()
+) ->
+    {ok, connector_state()}.
+on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) ->
+    #{installed_actions := InstalledActions0} = ConnectorState0,
+    ChannelState = install_channel(ActionConfig),
+    InstalledActions = InstalledActions0#{ActionId => ChannelState},
+    ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
+    {ok, ConnectorState}.
+
+-spec on_remove_channel(
+    connector_resource_id(),
+    connector_state(),
+    action_resource_id()
+) ->
+    {ok, connector_state()}.
+on_remove_channel(_ConnectorResId, ConnectorState0, ActionId) ->
+    #{installed_actions := InstalledActions0} = ConnectorState0,
+    InstalledActions = maps:remove(ActionId, InstalledActions0),
+    ConnectorState = ConnectorState0#{installed_actions := InstalledActions},
+    {ok, ConnectorState}.
+
+-spec on_get_channels(connector_resource_id()) ->
+    [{action_resource_id(), action_config()}].
+on_get_channels(ConnectorResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
+
+-spec on_get_channel_status(connector_resource_id(), action_resource_id(), connector_state()) ->
+    health_check_status().
+on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) ->
+    %% Should we check the underlying client?  Same as on_get_status?
+    ?status_connected.
+
 %%-------------------------------------------------------------------------------------------------
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
 
+%% TODO: check if topic exists ("unhealthy target")
+install_channel(ActionConfig) ->
+    #{
+        parameters := #{
+            attributes_template := AttributesTemplate,
+            ordering_key_template := OrderingKeyTemplate,
+            payload_template := PayloadTemplate,
+            pubsub_topic := PubSubTopic
+        }
+    } = ActionConfig,
+    #{
+        attributes_template => preproc_attributes(AttributesTemplate),
+        ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
+        payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
+        pubsub_topic => PubSubTopic
+    }.
+
 -spec do_send_requests_sync(
-    state(),
-    [{send_message, map()}],
+    connector_state(),
+    [{message_tag(), map()}],
     resource_id()
 ) ->
     {ok, status_code(), headers()}
     | {ok, status_code(), headers(), body()}
     | {error, {recoverable_error, term()}}
     | {error, term()}.
-do_send_requests_sync(State, Requests, InstanceId) ->
-    #{client := Client} = State,
+do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
+    ?tp(gcp_pubsub_producer_sync, #{instance_id => InstanceId, requests => Requests}),
+    #{client := Client} = ConnectorState,
+    %% is it safe to assume the tag is the same???  And not empty???
+    [{MessageTag, _} | _] = Requests,
+    #{installed_actions := InstalledActions} = ConnectorState,
+    ChannelState = maps:get(MessageTag, InstalledActions),
     Payloads =
         lists:map(
-            fun({send_message, Selected}) ->
-                encode_payload(State, Selected)
+            fun({_MessageTag, Selected}) ->
+                encode_payload(ChannelState, Selected)
             end,
             Requests
         ),
     Body = to_pubsub_request(Payloads),
-    Path = publish_path(State),
+    Path = publish_path(ConnectorState, ChannelState),
     Method = post,
     Request = {prepared_request, {Method, Path, Body}},
     Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
@@ -184,21 +248,25 @@ do_send_requests_sync(State, Requests, InstanceId) ->
     handle_result(Result, Request, QueryMode, InstanceId).
 
 -spec do_send_requests_async(
-    state(),
-    [{send_message, map()}],
+    connector_state(),
+    [{message_tag(), map()}],
     {ReplyFun :: function(), Args :: list()}
 ) -> {ok, pid()} | {error, no_pool_worker_available}.
-do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
-    #{client := Client} = State,
+do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
+    #{client := Client} = ConnectorState,
+    %% is it safe to assume the tag is the same???  And not empty???
+    [{MessageTag, _} | _] = Requests,
+    #{installed_actions := InstalledActions} = ConnectorState,
+    ChannelState = maps:get(MessageTag, InstalledActions),
     Payloads =
         lists:map(
-            fun({send_message, Selected}) ->
-                encode_payload(State, Selected)
+            fun({_MessageTag, Selected}) ->
+                encode_payload(ChannelState, Selected)
             end,
             Requests
         ),
     Body = to_pubsub_request(Payloads),
-    Path = publish_path(State),
+    Path = publish_path(ConnectorState, ChannelState),
     Method = post,
     Request = {prepared_request, {Method, Path, Body}},
     ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
@@ -206,18 +274,18 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
         Request, ReplyFunAndArgs, Client
     ).
 
--spec encode_payload(state(), Selected :: map()) ->
+-spec encode_payload(action_state(), Selected :: map()) ->
     #{
         data := binary(),
         attributes => #{binary() => binary()},
         'orderingKey' => binary()
     }.
-encode_payload(State, Selected) ->
+encode_payload(ActionState, Selected) ->
     #{
         attributes_template := AttributesTemplate,
         ordering_key_template := OrderingKeyTemplate,
         payload_template := PayloadTemplate
-    } = State,
+    } = ActionState,
     Data = render_payload(PayloadTemplate, Selected),
     OrderingKey = render_key(OrderingKeyTemplate, Selected),
     Attributes = proc_attributes(AttributesTemplate, Selected),
@@ -307,13 +375,8 @@ proc_attributes(AttributesTemplate, Selected) ->
 to_pubsub_request(Payloads) ->
     emqx_utils_json:encode(#{messages => Payloads}).
 
--spec publish_path(state()) -> binary().
-publish_path(
-    _State = #{
-        project_id := ProjectId,
-        pubsub_topic := PubSubTopic
-    }
-) ->
+-spec publish_path(connector_state(), action_state()) -> binary().
+publish_path(#{project_id := ProjectId}, #{pubsub_topic := PubSubTopic}) ->
     <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
 
 handle_result({error, Reason}, _Request, QueryMode, ResourceId) when

+ 46 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_producer_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0,
+    bridge_v1_config_to_action_config/2
+]).
+
+bridge_v1_type_name() -> gcp_pubsub.
+
+action_type_name() -> gcp_pubsub_producer.
+
+connector_type_name() -> gcp_pubsub_producer.
+
+schema_module() -> emqx_bridge_gcp_pubsub_producer_schema.
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    CommonActionKeys = emqx_bridge_v2_schema:top_level_common_action_keys(),
+    ParamsKeys = producer_action_parameters_field_keys(),
+    Config1 = maps:with(CommonActionKeys, BridgeV1Config),
+    Params = maps:with(ParamsKeys, BridgeV1Config),
+    Config1#{
+        <<"connector">> => ConnectorName,
+        <<"parameters">> => Params
+    }.
+
+%%------------------------------------------------------------------------------------------
+%% Internal helper fns
+%%------------------------------------------------------------------------------------------
+
+producer_action_parameters_field_keys() ->
+    [
+        to_bin(K)
+     || {K, _} <- emqx_bridge_gcp_pubsub_producer_schema:fields(action_parameters)
+    ].
+
+to_bin(L) when is_list(L) -> list_to_binary(L);
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

+ 223 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl

@@ -0,0 +1,223 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_producer_schema).
+
+-import(hoconsc, [mk/2, ref/2]).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% `hocon_schema' API
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% `emqx_bridge_v2_schema' "unofficial" API
+-export([
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "gcp_pubsub_producer".
+
+roots() ->
+    [].
+
+%%=========================================
+%% Action fields
+%%=========================================
+fields(action) ->
+    {gcp_pubsub_producer,
+        mk(
+            hoconsc:map(name, ref(?MODULE, producer_action)),
+            #{
+                desc => <<"GCP PubSub Producer Action Config">>,
+                required => false
+            }
+        )};
+fields(producer_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(
+            ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => ?DESC(producer_action)
+            }
+        )
+    );
+fields(action_parameters) ->
+    UnsupportedFields = [local_topic],
+    lists:filter(
+        fun({Key, _Schema}) -> not lists:member(Key, UnsupportedFields) end,
+        emqx_bridge_gcp_pubsub:fields(producer)
+    );
+%%=========================================
+%% Connector fields
+%%=========================================
+fields("config_connector") ->
+    %% FIXME
+    emqx_connector_schema:common_fields() ++
+        emqx_bridge_gcp_pubsub:fields(connector_config) ++
+        emqx_resource_schema:fields("resource_opts");
+%%=========================================
+%% HTTP API fields
+%%=========================================
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
+fields("post_bridge_v2") ->
+    [type_field(), name_field() | fields("put_bridge_v2")];
+fields("put_bridge_v2") ->
+    fields(producer_action).
+
+desc("config_connector") ->
+    ?DESC("config_connector");
+desc(action_parameters) ->
+    ?DESC(action_parameters);
+desc(producer_action) ->
+    ?DESC(producer_action);
+desc(_Name) ->
+    undefined.
+
+type_field() ->
+    {type, mk(gcp_pubsub_producer, #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_bridge_v2_schema' "unofficial" API
+%%-------------------------------------------------------------------------------------------------
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"gcp_pubsub_producer">> => #{
+                summary => <<"GCP PubSub Producer Action">>,
+                value => action_example(Method)
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"gcp_pubsub_producer">> => #{
+                summary => <<"GCP PubSub Producer Connector">>,
+                value => connector_example(Method)
+            }
+        }
+    ].
+
+conn_bridge_examples(Method) ->
+    emqx_bridge_gcp_pubsub:conn_bridge_examples(Method).
+
+action_example(post) ->
+    maps:merge(
+        action_example(put),
+        #{
+            type => <<"gcp_pubsub_producer">>,
+            name => <<"my_action">>
+        }
+    );
+action_example(get) ->
+    maps:merge(
+        action_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+action_example(put) ->
+    #{
+        enable => true,
+        connector => <<"my_connector_name">>,
+        description => <<"My action">>,
+        local_topic => <<"local/topic">>,
+        resource_opts =>
+            #{batch_size => 5},
+        parameters =>
+            #{
+                pubsub_topic => <<"mytopic">>,
+                ordering_key_template => <<"${payload.ok}">>,
+                payload_template => <<"${payload}">>,
+                attributes_template =>
+                    [
+                        #{
+                            key => <<"${payload.attrs.k}">>,
+                            value => <<"${payload.attrs.v}">>
+                        }
+                    ]
+            }
+    }.
+
+connector_example(get) ->
+    maps:merge(
+        connector_example(put),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+connector_example(post) ->
+    maps:merge(
+        connector_example(put),
+        #{
+            type => <<"gcp_pubsub_producer">>,
+            name => <<"my_connector">>
+        }
+    );
+connector_example(put) ->
+    #{
+        enable => true,
+        connect_timeout => <<"10s">>,
+        pool_size => 8,
+        pipelining => 100,
+        max_retries => 2,
+        resource_opts => #{request_ttl => <<"60s">>},
+        service_account_json =>
+            #{
+                auth_provider_x509_cert_url =>
+                    <<"https://www.googleapis.com/oauth2/v1/certs">>,
+                auth_uri =>
+                    <<"https://accounts.google.com/o/oauth2/auth">>,
+                client_email =>
+                    <<"test@myproject.iam.gserviceaccount.com">>,
+                client_id => <<"123812831923812319190">>,
+                client_x509_cert_url =>
+                    <<
+                        "https://www.googleapis.com/robot/v1/"
+                        "metadata/x509/test%40myproject.iam.gserviceaccount.com"
+                    >>,
+                private_key =>
+                    <<
+                        "-----BEGIN PRIVATE KEY-----\n"
+                        "MIIEvQI..."
+                    >>,
+                private_key_id => <<"kid">>,
+                project_id => <<"myproject">>,
+                token_uri =>
+                    <<"https://oauth2.googleapis.com/token">>,
+                type => <<"service_account">>
+            }
+    }.

+ 131 - 58
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -13,8 +13,12 @@
 -include_lib("jose/include/jose_jwt.hrl").
 -include_lib("jose/include/jose_jws.hrl").
 
--define(BRIDGE_TYPE, gcp_pubsub).
--define(BRIDGE_TYPE_BIN, <<"gcp_pubsub">>).
+-define(ACTION_TYPE, gcp_pubsub_producer).
+-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>).
+-define(CONNECTOR_TYPE, gcp_pubsub_producer).
+-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>).
+-define(BRIDGE_V1_TYPE, gcp_pubsub).
+-define(BRIDGE_V1_TYPE_BIN, <<"gcp_pubsub">>).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -141,19 +145,24 @@ end_per_testcase(_TestCase, _Config) ->
 
 generate_config(Config0) ->
     #{
-        name := Name,
+        name := ActionName,
         config_string := ConfigString,
         pubsub_config := PubSubConfig,
         service_account_json := ServiceAccountJSON
     } = gcp_pubsub_config(Config0),
-    ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name),
-    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name),
+    %% FIXME
+    %% `emqx_bridge_resource:resource_id' requires an existing connector in the config.....
+    ConnectorName = <<"connector_", ActionName/binary>>,
+    ConnectorResourceId = <<"connector:", ?CONNECTOR_TYPE_BIN/binary, ":", ConnectorName/binary>>,
+    ActionResourceId = emqx_bridge_v2:id(?ACTION_TYPE_BIN, ActionName, ConnectorName),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_V1_TYPE_BIN, ActionName),
     [
-        {gcp_pubsub_name, Name},
+        {gcp_pubsub_name, ActionName},
         {gcp_pubsub_config, PubSubConfig},
         {gcp_pubsub_config_string, ConfigString},
         {service_account_json, ServiceAccountJSON},
-        {resource_id, ResourceId},
+        {connector_resource_id, ConnectorResourceId},
+        {action_resource_id, ActionResourceId},
         {bridge_id, BridgeId}
         | Config0
     ].
@@ -168,7 +177,7 @@ delete_all_bridges() ->
     ).
 
 delete_bridge(Config) ->
-    Type = ?BRIDGE_TYPE,
+    Type = ?BRIDGE_V1_TYPE,
     Name = ?config(gcp_pubsub_name, Config),
     ct:pal("deleting bridge ~p", [{Type, Name}]),
     emqx_bridge:remove(Type, Name).
@@ -177,7 +186,7 @@ create_bridge(Config) ->
     create_bridge(Config, _GCPPubSubConfigOverrides = #{}).
 
 create_bridge(Config, GCPPubSubConfigOverrides) ->
-    TypeBin = ?BRIDGE_TYPE_BIN,
+    TypeBin = ?BRIDGE_V1_TYPE_BIN,
     Name = ?config(gcp_pubsub_name, Config),
     GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config),
     GCPPubSubConfig = emqx_utils_maps:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides),
@@ -190,7 +199,7 @@ create_bridge_http(Config) ->
     create_bridge_http(Config, _GCPPubSubConfigOverrides = #{}).
 
 create_bridge_http(Config, GCPPubSubConfigOverrides) ->
-    TypeBin = ?BRIDGE_TYPE_BIN,
+    TypeBin = ?BRIDGE_V1_TYPE_BIN,
     Name = ?config(gcp_pubsub_name, Config),
     GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config),
     GCPPubSubConfig = emqx_utils_maps:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides),
@@ -225,7 +234,7 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
 
 create_rule_and_action_http(Config) ->
     GCPPubSubName = ?config(gcp_pubsub_name, Config),
-    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, GCPPubSubName),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_V1_TYPE_BIN, GCPPubSubName),
     Params = #{
         enable => true,
         sql => <<"SELECT * FROM \"t/topic\"">>,
@@ -382,9 +391,14 @@ assert_metrics(ExpectedMetrics, ResourceId) ->
     CurrentMetrics = current_metrics(ResourceId),
     TelemetryTable = get(telemetry_table),
     RecordedEvents = ets:tab2list(TelemetryTable),
-    ?assertEqual(ExpectedMetrics, Metrics, #{
-        current_metrics => CurrentMetrics, recorded_events => RecordedEvents
-    }),
+    ?retry(
+        _Sleep0 = 300,
+        _Attempts = 20,
+        ?assertEqual(ExpectedMetrics, Metrics, #{
+            current_metrics => CurrentMetrics,
+            recorded_events => RecordedEvents
+        })
+    ),
     ok.
 
 assert_empty_metrics(ResourceId) ->
@@ -535,8 +549,30 @@ install_telemetry_handler(TestCase) ->
     end),
     Tid.
 
+mk_res_id_filter(ResourceId) ->
+    fun(Event) ->
+        case Event of
+            #{metadata := #{resource_id := ResId}} when ResId =:= ResourceId ->
+                true;
+            _ ->
+                false
+        end
+    end.
+
 wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
-    Events = receive_all_events(GaugeName, Timeout),
+    wait_until_gauge_is(#{
+        gauge_name => GaugeName,
+        expected => ExpectedValue,
+        timeout => Timeout
+    }).
+
+wait_until_gauge_is(#{} = Opts) ->
+    GaugeName = maps:get(gauge_name, Opts),
+    ExpectedValue = maps:get(expected, Opts),
+    Timeout = maps:get(timeout, Opts),
+    MaxEvents = maps:get(max_events, Opts, 10),
+    FilterFn = maps:get(filter_fn, Opts, fun(_Event) -> true end),
+    Events = receive_all_events(GaugeName, Timeout, MaxEvents, FilterFn),
     case length(Events) > 0 andalso lists:last(Events) of
         #{measurements := #{gauge_set := ExpectedValue}} ->
             ok;
@@ -550,15 +586,36 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
             ct:pal("no ~p gauge events received!", [GaugeName])
     end.
 
-receive_all_events(EventName, Timeout) ->
-    receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []).
+receive_all_events(EventName, Timeout, MaxEvents, FilterFn) ->
+    receive_all_events(EventName, Timeout, MaxEvents, FilterFn, _Count = 0, _Acc = []).
 
-receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
+receive_all_events(_EventName, _Timeout, MaxEvents, _FilterFn, Count, Acc) when
+    Count >= MaxEvents
+->
     lists:reverse(Acc);
-receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
+receive_all_events(EventName, Timeout, MaxEvents, FilterFn, Count, Acc) ->
     receive
         {telemetry, #{name := [_, _, EventName]} = Event} ->
-            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
+            case FilterFn(Event) of
+                true ->
+                    receive_all_events(
+                        EventName,
+                        Timeout,
+                        MaxEvents,
+                        FilterFn,
+                        Count + 1,
+                        [Event | Acc]
+                    );
+                false ->
+                    receive_all_events(
+                        EventName,
+                        Timeout,
+                        MaxEvents,
+                        FilterFn,
+                        Count,
+                        Acc
+                    )
+            end
     after Timeout ->
         lists:reverse(Acc)
     end.
@@ -597,14 +654,14 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
 %%------------------------------------------------------------------------------
 
 t_publish_success(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     Topic = <<"t/topic">>,
     ?assertMatch({ok, _}, create_bridge(Config)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    assert_empty_metrics(ActionResourceId),
     Payload = <<"payload">>,
     Message = emqx_message:make(Topic, Payload),
     emqx:publish(Message),
@@ -620,7 +677,7 @@ t_publish_success(Config) ->
         DecodedMessages
     ),
     %% to avoid test flakiness
-    wait_telemetry_event(TelemetryTable, success, ResourceId),
+    wait_telemetry_event(TelemetryTable, success, ActionResourceId),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -633,7 +690,7 @@ t_publish_success(Config) ->
             retried => 0,
             success => 1
         },
-        ResourceId
+        ActionResourceId
     ),
     ok.
 
@@ -662,12 +719,12 @@ t_publish_success_infinity_timeout(Config) ->
     ok.
 
 t_publish_success_local_topic(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     LocalTopic = <<"local/topic">>,
     {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
-    assert_empty_metrics(ResourceId),
+    assert_empty_metrics(ActionResourceId),
     Payload = <<"payload">>,
     Message = emqx_message:make(LocalTopic, Payload),
     emqx:publish(Message),
@@ -682,7 +739,7 @@ t_publish_success_local_topic(Config) ->
         DecodedMessages
     ),
     %% to avoid test flakiness
-    wait_telemetry_event(TelemetryTable, success, ResourceId),
+    wait_telemetry_event(TelemetryTable, success, ActionResourceId),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -695,7 +752,7 @@ t_publish_success_local_topic(Config) ->
             retried => 0,
             success => 1
         },
-        ResourceId
+        ActionResourceId
     ),
     ok.
 
@@ -704,7 +761,7 @@ t_create_via_http(Config) ->
     ok.
 
 t_publish_templated(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     Topic = <<"t/topic">>,
@@ -721,7 +778,7 @@ t_publish_templated(Config) ->
     ),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    assert_empty_metrics(ActionResourceId),
     Payload = <<"payload">>,
     Message =
         emqx_message:set_header(
@@ -747,7 +804,7 @@ t_publish_templated(Config) ->
         DecodedMessages
     ),
     %% to avoid test flakiness
-    wait_telemetry_event(TelemetryTable, success, ResourceId),
+    wait_telemetry_event(TelemetryTable, success, ActionResourceId),
     wait_until_gauge_is(queuing, 0, 500),
     wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
@@ -760,7 +817,7 @@ t_publish_templated(Config) ->
             retried => 0,
             success => 1
         },
-        ResourceId
+        ActionResourceId
     ),
     ok.
 
@@ -774,7 +831,7 @@ t_publish_success_batch(Config) ->
     end.
 
 test_publish_success_batch(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
     ServiceAccountJSON = ?config(service_account_json, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     Topic = <<"t/topic">>,
@@ -796,7 +853,7 @@ test_publish_success_batch(Config) ->
     ),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    assert_empty_metrics(ActionResourceId),
     NumMessages = BatchSize * 2,
     Messages = [emqx_message:make(Topic, integer_to_binary(N)) || N <- lists:seq(1, NumMessages)],
     %% publish in parallel to avoid each client blocking and then
@@ -822,7 +879,7 @@ test_publish_success_batch(Config) ->
     wait_telemetry_event(
         TelemetryTable,
         success,
-        ResourceId,
+        ActionResourceId,
         #{timeout => 15_000, n_events => NumMessages}
     ),
     wait_until_gauge_is(queuing, 0, _Timeout = 400),
@@ -837,7 +894,7 @@ test_publish_success_batch(Config) ->
             retried => 0,
             success => NumMessages
         },
-        ResourceId
+        ActionResourceId
     ),
     ok.
 
@@ -1045,7 +1102,7 @@ t_jose_other_error(Config) ->
         fun(Res, Trace) ->
             ?assertMatch({ok, _}, Res),
             ?assertMatch(
-                [#{error := {invalid_private_key, {unknown, error}}}],
+                [#{error := {invalid_private_key, {unknown, error}}} | _],
                 ?of_kind(gcp_pubsub_connector_startup_error, Trace)
             ),
             ok
@@ -1054,7 +1111,7 @@ t_jose_other_error(Config) ->
     ok.
 
 t_publish_econnrefused(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ResourceId = ?config(connector_resource_id, Config),
     %% set pipelining to 1 so that one of the 2 requests is `pending'
     %% in ehttpc.
     {ok, _} = create_bridge(
@@ -1071,7 +1128,7 @@ t_publish_econnrefused(Config) ->
     do_econnrefused_or_timeout_test(Config, econnrefused).
 
 t_publish_timeout(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
     %% set pipelining to 1 so that one of the 2 requests is `pending'
     %% in ehttpc. also, we set the batch size to 1 to also ensure the
     %% requests are done separately.
@@ -1079,12 +1136,13 @@ t_publish_timeout(Config) ->
         <<"pipelining">> => 1,
         <<"resource_opts">> => #{
             <<"batch_size">> => 1,
-            <<"resume_interval">> => <<"1s">>
+            <<"resume_interval">> => <<"1s">>,
+            <<"metrics_flush_interval">> => <<"700ms">>
         }
     }),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    assert_empty_metrics(ActionResourceId),
     TestPid = self(),
     TimeoutHandler =
         fun(Req0, State) ->
@@ -1107,7 +1165,8 @@ t_publish_timeout(Config) ->
     do_econnrefused_or_timeout_test(Config, timeout).
 
 do_econnrefused_or_timeout_test(Config, Error) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
+    ConnectorResourceId = ?config(connector_resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     Topic = <<"t/topic">>,
     Payload = <<"payload">>,
@@ -1156,9 +1215,9 @@ do_econnrefused_or_timeout_test(Config, Error) ->
             case Error of
                 econnrefused ->
                     case ?of_kind(gcp_pubsub_request_failed, Trace) of
-                        [#{reason := Error, connector := ResourceId} | _] ->
+                        [#{reason := Error, connector := ConnectorResourceId} | _] ->
                             ok;
-                        [#{reason := {closed, _Msg}, connector := ResourceId} | _] ->
+                        [#{reason := {closed, _Msg}, connector := ConnectorResourceId} | _] ->
                             %% _Msg = "The connection was lost."
                             ok;
                         Trace0 ->
@@ -1182,7 +1241,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
             %% even waiting, hard to avoid flakiness... simpler to just sleep
             %% a bit until stabilization.
             ct:sleep(200),
-            CurrentMetrics = current_metrics(ResourceId),
+            CurrentMetrics = current_metrics(ActionResourceId),
             RecordedEvents = ets:tab2list(TelemetryTable),
             ct:pal("telemetry events: ~p", [RecordedEvents]),
             ?assertMatch(
@@ -1198,7 +1257,19 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 CurrentMetrics
             );
         timeout ->
-            wait_until_gauge_is(inflight, 0, _Timeout = 1_000),
+            wait_telemetry_event(
+                TelemetryTable,
+                late_reply,
+                ActionResourceId,
+                #{timeout => 5_000, n_events => 2}
+            ),
+            wait_until_gauge_is(#{
+                gauge_name => inflight,
+                expected => 0,
+                filter_fn => mk_res_id_filter(ActionResourceId),
+                timeout => 1_000,
+                max_events => 20
+            }),
             wait_until_gauge_is(queuing, 0, _Timeout = 1_000),
             assert_metrics(
                 #{
@@ -1211,7 +1282,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                     success => 0,
                     late_reply => 2
                 },
-                ResourceId
+                ActionResourceId
             )
     end,
 
@@ -1334,7 +1405,8 @@ t_failure_no_body(Config) ->
     ok.
 
 t_unrecoverable_error(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ActionResourceId = ?config(action_resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
     TestPid = self(),
     FailureNoBodyHandler =
         fun(Req0, State) ->
@@ -1358,7 +1430,7 @@ t_unrecoverable_error(Config) ->
     ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
     Topic = <<"t/topic">>,
     {ok, _} = create_bridge(Config),
-    assert_empty_metrics(ResourceId),
+    assert_empty_metrics(ActionResourceId),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
     Payload = <<"payload">>,
@@ -1386,6 +1458,7 @@ t_unrecoverable_error(Config) ->
     %% removed, this inflight should be 1, because we retry if
     %% the worker is killed.
     wait_until_gauge_is(inflight, 0, _Timeout = 400),
+    wait_telemetry_event(TelemetryTable, failed, ActionResourceId),
     assert_metrics(
         #{
             dropped => 0,
@@ -1398,7 +1471,7 @@ t_unrecoverable_error(Config) ->
             retried => 0,
             success => 0
         },
-        ResourceId
+        ActionResourceId
     ),
     ok.
 
@@ -1407,7 +1480,7 @@ t_stop(Config) ->
     {ok, _} = create_bridge(Config),
     ?check_trace(
         ?wait_async_action(
-            emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
+            emqx_bridge_resource:stop(?BRIDGE_V1_TYPE, Name),
             #{?snk_kind := gcp_pubsub_stop},
             5_000
         ),
@@ -1421,13 +1494,13 @@ t_stop(Config) ->
     ok.
 
 t_get_status_ok(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ResourceId = ?config(connector_resource_id, Config),
     {ok, _} = create_bridge(Config),
     ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
     ok.
 
 t_get_status_no_worker(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ResourceId = ?config(connector_resource_id, Config),
     {ok, _} = create_bridge(Config),
     emqx_common_test_helpers:with_mock(
         ehttpc,
@@ -1441,7 +1514,7 @@ t_get_status_no_worker(Config) ->
     ok.
 
 t_get_status_down(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ResourceId = ?config(connector_resource_id, Config),
     {ok, _} = create_bridge(Config),
     emqx_common_test_helpers:with_mock(
         ehttpc,
@@ -1457,7 +1530,7 @@ t_get_status_down(Config) ->
     ok.
 
 t_get_status_timeout_calling_workers(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    ResourceId = ?config(connector_resource_id, Config),
     {ok, _} = create_bridge(Config),
     emqx_common_test_helpers:with_mock(
         ehttpc,
@@ -1520,7 +1593,7 @@ t_on_start_ehttpc_pool_start_failure(Config) ->
         ),
         fun(Trace) ->
             ?assertMatch(
-                [#{reason := some_error}],
+                [#{reason := some_error} | _],
                 ?of_kind(gcp_pubsub_ehttpc_pool_start_failure, Trace)
             ),
             ok
@@ -1668,7 +1741,7 @@ t_attributes(Config) ->
             ),
             %% ensure loading cluster override file doesn't mangle the attribute
             %% placeholders...
-            #{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} =
+            #{<<"actions">> := #{?ACTION_TYPE_BIN := #{Name := RawConf}}} =
                 emqx_config:read_override_conf(#{override_to => cluster}),
             ?assertEqual(
                 [
@@ -1689,7 +1762,7 @@ t_attributes(Config) ->
                         <<"value">> => <<"${.payload.value}">>
                     }
                 ],
-                maps:get(<<"attributes_template">>, RawConf)
+                emqx_utils_maps:deep_get([<<"parameters">>, <<"attributes_template">>], RawConf)
             ),
             ok
         end,

+ 10 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -25,6 +25,8 @@ resource_type(azure_event_hub_producer) ->
     emqx_bridge_kafka_impl_producer;
 resource_type(confluent_producer) ->
     emqx_bridge_kafka_impl_producer;
+resource_type(gcp_pubsub_producer) ->
+    emqx_bridge_gcp_pubsub_impl_producer;
 resource_type(kafka_producer) ->
     emqx_bridge_kafka_impl_producer;
 resource_type(syskeeper_forwarder) ->
@@ -65,6 +67,14 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {gcp_pubsub_producer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_producer_schema, "config_connector")),
+                #{
+                    desc => <<"GCP PubSub Producer Connector Config">>,
+                    required => false
+                }
+            )},
         {kafka_producer,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),

+ 18 - 6
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -24,7 +24,8 @@
 
 -export([
     transform_bridges_v1_to_connectors_and_bridges_v2/1,
-    transform_bridge_v1_config_to_action_config/4
+    transform_bridge_v1_config_to_action_config/4,
+    top_level_common_connector_keys/0
 ]).
 
 -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
@@ -32,6 +33,7 @@
 -export([get_response/0, put_request/0, post_request/0]).
 
 -export([connector_type_to_bridge_types/1]).
+-export([common_fields/0]).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 enterprise_api_schemas(Method) ->
@@ -64,6 +66,7 @@ enterprise_fields_connectors() -> [].
 
 connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
 connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
+connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub_producer];
 connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
 connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
 connector_type_to_bridge_types(syskeeper_proxy) -> [].
@@ -159,17 +162,20 @@ transform_bridge_v1_config_to_action_config(
         BridgeV1Conf, ConnectorName, ConnectorFields
     ).
 
-transform_bridge_v1_config_to_action_config(
-    BridgeV1Conf, ConnectorName, ConnectorFields
-) ->
-    TopKeys = [
+top_level_common_connector_keys() ->
+    [
         <<"enable">>,
         <<"connector">>,
         <<"local_topic">>,
         <<"resource_opts">>,
         <<"description">>,
         <<"parameters">>
-    ],
+    ].
+
+transform_bridge_v1_config_to_action_config(
+    BridgeV1Conf, ConnectorName, ConnectorFields
+) ->
+    TopKeys = top_level_common_connector_keys(),
     TopKeysMap = maps:from_keys(TopKeys, true),
     %% Remove connector fields
     ActionMap0 = lists:foldl(
@@ -352,6 +358,12 @@ desc(connectors) ->
 desc(_) ->
     undefined.
 
+common_fields() ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {description, emqx_schema:description_schema()}
+    ].
+
 %%======================================================================================
 %% Helper Functions
 %%======================================================================================

+ 4 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -111,6 +111,10 @@
     | {error, {recoverable_error, term()}}
     | {error, term()}.
 
+-type action_resource_id() :: resource_id().
+-type connector_resource_id() :: resource_id().
+-type message_tag() :: action_resource_id().
+
 -define(WORKER_POOL_SIZE, 16).
 
 -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024).

+ 18 - 0
rel/i18n/emqx_bridge_gcp_pubsub_producer_schema.hocon

@@ -0,0 +1,18 @@
+emqx_bridge_gcp_pubsub_producer_schema {
+
+  action_parameters.desc:
+  """Action specific configs."""
+  action_parameters.label:
+  """Action"""
+
+  producer_action.desc:
+  """Action configs."""
+  producer_action.label:
+  """Action"""
+
+  config_connector.desc:
+  """Configuration for a GCP PubSub Producer Client."""
+  config_connector.label:
+  """GCP PubSub Producer Client Configuration"""
+
+}

+ 10 - 0
rel/i18n/emqx_bridge_v2_schema.hocon

@@ -6,4 +6,14 @@ desc_bridges_v2.desc:
 desc_bridges_v2.label:
 """Bridge Configuration"""
 
+mqtt_topic.desc:
+"""MQTT topic or topic filter as data source (action input).  If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in the remote system."""
+mqtt_topic.label:
+"""Source MQTT Topic"""
+
+config_enable.desc:
+"""Enable (true) or disable (false) this action."""
+config_enable.label:
+"""Enable or Disable"""
+
 }

+ 5 - 3
rel/i18n/emqx_connector_schema.hocon

@@ -2,15 +2,17 @@ emqx_connector_schema {
 
 desc_connectors.desc:
 """Connectors that are used to connect to external systems"""
-
 desc_connectors.label:
 """Connectors"""
 
-
 connector_field.desc:
 """Name of connector used to connect to the resource where the action is to be performed."""
-
 connector_field.label:
 """Connector"""
 
+config_enable.desc:
+"""Enable (true) or disable (false) this connector."""
+config_enable.label:
+"""Enable or Disable"""
+
 }