Selaa lähdekoodia

Merge pull request #11403 from thalesmg/gcp-produ-attr-20230728

feat(gcp_producer): add support for defining message attributes and ordering key
Thales Macedo Garitezi 2 vuotta sitten
vanhempi
commit
5d707c8b7d

+ 1 - 1
apps/emqx/rebar.config

@@ -30,7 +30,7 @@
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_gcp_pubsub, [
     {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 30 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl

@@ -113,6 +113,22 @@ fields(connector_config) ->
     ];
 fields(producer) ->
     [
+        {attributes_template,
+            sc(
+                hoconsc:array(ref(key_value_pair)),
+                #{
+                    default => [],
+                    desc => ?DESC("attributes_template")
+                }
+            )},
+        {ordering_key_template,
+            sc(
+                binary(),
+                #{
+                    default => <<>>,
+                    desc => ?DESC("ordering_key_template")
+                }
+            )},
         {payload_template,
             sc(
                 binary(),
@@ -203,6 +219,18 @@ fields("consumer_resource_opts") ->
         fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end,
         ResourceFields
     );
+fields(key_value_pair) ->
+    [
+        {key,
+            mk(binary(), #{
+                required => true,
+                validator => [
+                    emqx_resource_validator:not_empty("Key templates must not be empty")
+                ],
+                desc => ?DESC(kv_pair_key)
+            })},
+        {value, mk(binary(), #{required => true, desc => ?DESC(kv_pair_value)})}
+    ];
 fields("get_producer") ->
     emqx_bridge_schema:status_fields() ++ fields("post_producer");
 fields("post_producer") ->
@@ -218,6 +246,8 @@ fields("put_consumer") ->
 
 desc("config_producer") ->
     ?DESC("desc_config");
+desc(key_value_pair) ->
+    ?DESC("kv_pair_desc");
 desc("config_consumer") ->
     ?DESC("desc_config");
 desc("consumer_resource_opts") ->

+ 109 - 7
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -9,15 +9,20 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -type config() :: #{
+    attributes_template := [#{key := binary(), value := binary()}],
     connect_timeout := emqx_schema:duration_ms(),
     max_retries := non_neg_integer(),
+    ordering_key_template := binary(),
+    payload_template := binary(),
     pubsub_topic := binary(),
     resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
     service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
     any() => term()
 }.
 -type state() :: #{
+    attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
     client := emqx_bridge_gcp_pubsub_client:state(),
+    ordering_key_template := emqx_placeholder:tmpl_token(),
     payload_template := emqx_placeholder:tmpl_token(),
     project_id := emqx_bridge_gcp_pubsub_client:project_id(),
     pubsub_topic := binary()
@@ -57,6 +62,8 @@ on_start(InstanceId, Config0) ->
     }),
     Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
     #{
+        attributes_template := AttributesTemplate,
+        ordering_key_template := OrderingKeyTemplate,
         payload_template := PayloadTemplate,
         pubsub_topic := PubSubTopic,
         service_account_json := #{<<"project_id">> := ProjectId}
@@ -65,6 +72,8 @@ on_start(InstanceId, Config0) ->
         {ok, Client} ->
             State = #{
                 client => Client,
+                attributes_template => preproc_attributes(AttributesTemplate),
+                ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
                 payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
                 project_id => ProjectId,
                 pubsub_topic => PubSubTopic
@@ -197,14 +206,107 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
         Request, ReplyFunAndArgs, Client
     ).
 
--spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
-encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
-    Interpolated =
-        case PayloadTemplate of
-            [] -> emqx_utils_json:encode(Selected);
-            _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
+-spec encode_payload(state(), Selected :: map()) ->
+    #{
+        data := binary(),
+        attributes => #{binary() => binary()},
+        'orderingKey' => binary()
+    }.
+encode_payload(State, Selected) ->
+    #{
+        attributes_template := AttributesTemplate,
+        ordering_key_template := OrderingKeyTemplate,
+        payload_template := PayloadTemplate
+    } = State,
+    Data = render_payload(PayloadTemplate, Selected),
+    OrderingKey = render_key(OrderingKeyTemplate, Selected),
+    Attributes = proc_attributes(AttributesTemplate, Selected),
+    Payload0 = #{data => base64:encode(Data)},
+    Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
+    put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>).
+
+put_if(Acc, K, V, true) ->
+    Acc#{K => V};
+put_if(Acc, _K, _V, false) ->
+    Acc.
+
+-spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary().
+render_payload([] = _Template, Selected) ->
+    emqx_utils_json:encode(Selected);
+render_payload(Template, Selected) ->
+    render_value(Template, Selected).
+
+render_key(Template, Selected) ->
+    Opts = #{
+        return => full_binary,
+        var_trans => fun
+            (_Var, undefined) ->
+                <<>>;
+            (Var, X) when is_boolean(X) ->
+                throw({bad_value_for_key, Var, X});
+            (_Var, X) when is_binary(X); is_number(X); is_atom(X) ->
+                emqx_utils_conv:bin(X);
+            (Var, X) ->
+                throw({bad_value_for_key, Var, X})
+        end
+    },
+    try
+        emqx_placeholder:proc_tmpl(Template, Selected, Opts)
+    catch
+        throw:{bad_value_for_key, Var, X} ->
+            ?tp(
+                warning,
+                "gcp_pubsub_producer_bad_value_for_key",
+                #{
+                    placeholder => Var,
+                    value => X,
+                    action => "key ignored",
+                    hint => "only plain values like strings and numbers can be used in keys"
+                }
+            ),
+            <<>>
+    end.
+
+render_value(Template, Selected) ->
+    Opts = #{
+        return => full_binary,
+        var_trans => fun
+            (undefined) -> <<>>;
+            (X) -> emqx_utils_conv:bin(X)
+        end
+    },
+    emqx_placeholder:proc_tmpl(Template, Selected, Opts).
+
+-spec preproc_attributes([#{key := binary(), value := binary()}]) ->
+    #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}.
+preproc_attributes(AttributesTemplate) ->
+    lists:foldl(
+        fun(#{key := K, value := V}, Acc) ->
+            KT = emqx_placeholder:preproc_tmpl(K),
+            VT = emqx_placeholder:preproc_tmpl(V),
+            Acc#{KT => VT}
+        end,
+        #{},
+        AttributesTemplate
+    ).
+
+-spec proc_attributes(#{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, map()) ->
+    #{binary() => binary()}.
+proc_attributes(AttributesTemplate, Selected) ->
+    maps:fold(
+        fun(KT, VT, Acc) ->
+            K = render_key(KT, Selected),
+            case K =:= <<>> of
+                true ->
+                    Acc;
+                false ->
+                    V = render_value(VT, Selected),
+                    Acc#{K => V}
+            end
         end,
-    #{data => base64:encode(Interpolated)}.
+        #{},
+        AttributesTemplate
+    ).
 
 -spec to_pubsub_request([#{data := binary()}]) -> binary().
 to_pubsub_request(Payloads) ->

+ 282 - 4
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -63,7 +63,9 @@ single_config_tests() ->
         t_get_status_down,
         t_get_status_no_worker,
         t_get_status_timeout_calling_workers,
-        t_on_start_ehttpc_pool_already_started
+        t_on_start_ehttpc_pool_already_started,
+        t_attributes,
+        t_bad_attributes
     ].
 
 only_sync_tests() ->
@@ -212,7 +214,9 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
                 Error
         end,
     ct:pal("bridge creation result: ~p", [Res]),
-    ?assertEqual(element(1, ProbeResult), element(1, Res)),
+    ?assertEqual(element(1, ProbeResult), element(1, Res), #{
+        creation_result => Res, probe_result => ProbeResult
+    }),
     case ProbeResult of
         {error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult});
         _ -> ok
@@ -456,6 +460,7 @@ assert_valid_request_headers(Headers, ServiceAccountJSON) ->
 assert_valid_request_body(Body) ->
     BodyMap = emqx_utils_json:decode(Body, [return_maps]),
     ?assertMatch(#{<<"messages">> := [_ | _]}, BodyMap),
+    ct:pal("request: ~p", [BodyMap]),
     #{<<"messages">> := Messages} = BodyMap,
     lists:map(
         fun(Msg) ->
@@ -480,6 +485,31 @@ assert_http_request(ServiceAccountJSON) ->
         error({timeout, #{mailbox => Mailbox}})
     end.
 
+receive_http_requests(ServiceAccountJSON, Opts) ->
+    Default = #{n => 1},
+    #{n := N} = maps:merge(Default, Opts),
+    lists:flatmap(fun(_) -> receive_http_request(ServiceAccountJSON) end, lists:seq(1, N)).
+
+receive_http_request(ServiceAccountJSON) ->
+    receive
+        {http, Headers, Body} ->
+            ct:pal("received publish:\n  ~p", [#{headers => Headers, body => Body}]),
+            assert_valid_request_headers(Headers, ServiceAccountJSON),
+            #{<<"messages">> := Msgs} = emqx_utils_json:decode(Body, [return_maps]),
+            lists:map(
+                fun(Msg) ->
+                    #{<<"data">> := Content64} = Msg,
+                    Content = base64:decode(Content64),
+                    Decoded = emqx_utils_json:decode(Content, [return_maps]),
+                    Msg#{<<"data">> := Decoded}
+                end,
+                Msgs
+            )
+    after 5_000 ->
+        {messages, Mailbox} = process_info(self(), messages),
+        error({timeout, #{mailbox => Mailbox}})
+    end.
+
 install_telemetry_handler(TestCase) ->
     Tid = ets:new(TestCase, [ordered_set, public]),
     HandlerId = TestCase,
@@ -585,8 +615,8 @@ t_publish_success(Config) ->
                 <<"topic">> := Topic,
                 <<"payload">> := Payload,
                 <<"metadata">> := #{<<"rule_id">> := RuleId}
-            }
-        ],
+            } = Msg
+        ] when not (is_map_key(<<"attributes">>, Msg) orelse is_map_key(<<"orderingKey">>, Msg)),
         DecodedMessages
     ),
     %% to avoid test flakiness
@@ -1524,3 +1554,251 @@ t_query_sync(Config) ->
         []
     ),
     ok.
+
+t_attributes(Config) ->
+    Name = ?config(gcp_pubsub_name, Config),
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    LocalTopic = <<"t/topic">>,
+    ?check_trace(
+        begin
+            {ok, _} = create_bridge_http(
+                Config,
+                #{
+                    <<"local_topic">> => LocalTopic,
+                    <<"attributes_template">> =>
+                        [
+                            #{
+                                <<"key">> => <<"${.payload.key}">>,
+                                <<"value">> => <<"fixed_value">>
+                            },
+                            #{
+                                <<"key">> => <<"${.payload.key}2">>,
+                                <<"value">> => <<"${.payload.value}">>
+                            },
+                            #{
+                                <<"key">> => <<"fixed_key">>,
+                                <<"value">> => <<"fixed_value">>
+                            },
+                            #{
+                                <<"key">> => <<"fixed_key2">>,
+                                <<"value">> => <<"${.payload.value}">>
+                            }
+                        ],
+                    <<"ordering_key_template">> => <<"${.payload.ok}">>
+                }
+            ),
+            %% without ordering key
+            Payload0 =
+                emqx_utils_json:encode(
+                    #{
+                        <<"value">> => <<"payload_value">>,
+                        <<"key">> => <<"payload_key">>
+                    }
+                ),
+            Message0 = emqx_message:make(LocalTopic, Payload0),
+            emqx:publish(Message0),
+            DecodedMessages0 = receive_http_request(ServiceAccountJSON),
+            ?assertMatch(
+                [
+                    #{
+                        <<"attributes">> :=
+                            #{
+                                <<"fixed_key">> := <<"fixed_value">>,
+                                <<"fixed_key2">> := <<"payload_value">>,
+                                <<"payload_key">> := <<"fixed_value">>,
+                                <<"payload_key2">> := <<"payload_value">>
+                            },
+                        <<"data">> := #{
+                            <<"topic">> := _,
+                            <<"payload">> := _
+                        }
+                    } = Msg
+                ] when not is_map_key(<<"orderingKey">>, Msg),
+                DecodedMessages0
+            ),
+            %% with ordering key
+            Payload1 =
+                emqx_utils_json:encode(
+                    #{
+                        <<"value">> => <<"payload_value">>,
+                        <<"key">> => <<"payload_key">>,
+                        <<"ok">> => <<"ordering_key">>
+                    }
+                ),
+            Message1 = emqx_message:make(LocalTopic, Payload1),
+            emqx:publish(Message1),
+            DecodedMessages1 = receive_http_request(ServiceAccountJSON),
+            ?assertMatch(
+                [
+                    #{
+                        <<"attributes">> :=
+                            #{
+                                <<"fixed_key">> := <<"fixed_value">>,
+                                <<"fixed_key2">> := <<"payload_value">>,
+                                <<"payload_key">> := <<"fixed_value">>,
+                                <<"payload_key2">> := <<"payload_value">>
+                            },
+                        <<"orderingKey">> := <<"ordering_key">>,
+                        <<"data">> := #{
+                            <<"topic">> := _,
+                            <<"payload">> := _
+                        }
+                    }
+                ],
+                DecodedMessages1
+            ),
+            %% will result in empty key
+            Payload2 =
+                emqx_utils_json:encode(
+                    #{
+                        <<"value">> => <<"payload_value">>,
+                        <<"ok">> => <<"ordering_key">>
+                    }
+                ),
+            Message2 = emqx_message:make(LocalTopic, Payload2),
+            emqx:publish(Message2),
+            [DecodedMessage2] = receive_http_request(ServiceAccountJSON),
+            ?assertEqual(
+                #{
+                    <<"fixed_key">> => <<"fixed_value">>,
+                    <<"fixed_key2">> => <<"payload_value">>,
+                    <<"2">> => <<"payload_value">>
+                },
+                maps:get(<<"attributes">>, DecodedMessage2)
+            ),
+            %% ensure loading cluster override file doesn't mangle the attribute
+            %% placeholders...
+            #{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} =
+                emqx_config:read_override_conf(#{override_to => cluster}),
+            ?assertEqual(
+                [
+                    #{
+                        <<"key">> => <<"${.payload.key}">>,
+                        <<"value">> => <<"fixed_value">>
+                    },
+                    #{
+                        <<"key">> => <<"${.payload.key}2">>,
+                        <<"value">> => <<"${.payload.value}">>
+                    },
+                    #{
+                        <<"key">> => <<"fixed_key">>,
+                        <<"value">> => <<"fixed_value">>
+                    },
+                    #{
+                        <<"key">> => <<"fixed_key2">>,
+                        <<"value">> => <<"${.payload.value}">>
+                    }
+                ],
+                maps:get(<<"attributes_template">>, RawConf)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_bad_attributes(Config) ->
+    ServiceAccountJSON = ?config(service_account_json, Config),
+    LocalTopic = <<"t/topic">>,
+    ?check_trace(
+        begin
+            {ok, _} = create_bridge_http(
+                Config,
+                #{
+                    <<"local_topic">> => LocalTopic,
+                    <<"attributes_template">> =>
+                        [
+                            #{
+                                <<"key">> => <<"${.payload.key}">>,
+                                <<"value">> => <<"${.payload.value}">>
+                            }
+                        ],
+                    <<"ordering_key_template">> => <<"${.payload.ok}">>
+                }
+            ),
+            %% Ok: attribute value is a map or list
+            lists:foreach(
+                fun(OkValue) ->
+                    Payload0 =
+                        emqx_utils_json:encode(
+                            #{
+                                <<"ok">> => <<"ord_key">>,
+                                <<"value">> => OkValue,
+                                <<"key">> => <<"attr_key">>
+                            }
+                        ),
+                    Message0 = emqx_message:make(LocalTopic, Payload0),
+                    emqx:publish(Message0)
+                end,
+                [
+                    #{<<"some">> => <<"map">>},
+                    [1, <<"str">>, #{<<"deep">> => true}]
+                ]
+            ),
+            DecodedMessages0 = receive_http_requests(ServiceAccountJSON, #{n => 1}),
+            ?assertMatch(
+                [
+                    #{
+                        <<"attributes">> :=
+                            #{<<"attr_key">> := <<"{\"some\":\"map\"}">>},
+                        <<"orderingKey">> := <<"ord_key">>
+                    },
+                    #{
+                        <<"attributes">> :=
+                            #{<<"attr_key">> := <<"[1,\"str\",{\"deep\":true}]">>},
+                        <<"orderingKey">> := <<"ord_key">>
+                    }
+                ],
+                DecodedMessages0
+            ),
+            %% Bad: key is not a plain value
+            lists:foreach(
+                fun(BadKey) ->
+                    Payload1 =
+                        emqx_utils_json:encode(
+                            #{
+                                <<"value">> => <<"v">>,
+                                <<"key">> => BadKey,
+                                <<"ok">> => BadKey
+                            }
+                        ),
+                    Message1 = emqx_message:make(LocalTopic, Payload1),
+                    emqx:publish(Message1)
+                end,
+                [
+                    #{<<"some">> => <<"map">>},
+                    [1, <<"list">>, true],
+                    true,
+                    false
+                ]
+            ),
+            DecodedMessages1 = receive_http_request(ServiceAccountJSON),
+            lists:foreach(
+                fun(DMsg) ->
+                    ?assertNot(is_map_key(<<"orderingKey">>, DMsg), #{decoded_message => DMsg}),
+                    ?assertNot(is_map_key(<<"attributes">>, DMsg), #{decoded_message => DMsg}),
+                    ok
+                end,
+                DecodedMessages1
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ct:pal("trace:\n  ~p", [Trace]),
+            ?assertMatch(
+                [
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := #{}},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := #{}},
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := [_ | _]},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := [_ | _]},
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := true},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := true},
+                    #{placeholder := [<<"payload">>, <<"ok">>], value := false},
+                    #{placeholder := [<<"payload">>, <<"key">>], value := false}
+                ],
+                ?of_kind("gcp_pubsub_producer_bad_value_for_key", Trace)
+            ),
+            ok
+        end
+    ),
+    ok.

+ 149 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_tests.erl

@@ -0,0 +1,149 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_gcp_pubsub_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%===========================================================================
+%% Data section
+%%===========================================================================
+
+%% erlfmt-ignore
+gcp_pubsub_producer_hocon() ->
+"""
+bridges.gcp_pubsub.my_producer {
+  attributes_template = [
+    {key = \"${payload.key}\", value = fixed_value}
+    {key = \"${payload.key}2\", value = \"${.payload.value}\"}
+    {key = fixed_key, value = fixed_value}
+    {key = fixed_key2, value = \"${.payload.value}\"}
+  ]
+  connect_timeout = 15s
+  enable = false
+  local_topic = \"t/gcp/produ\"
+  max_retries = 2
+  ordering_key_template = \"${.payload.ok}\"
+  payload_template = \"${.}\"
+  pipelining = 100
+  pool_size = 8
+  pubsub_topic = my-topic
+  resource_opts {
+    batch_size = 1
+    batch_time = 0ms
+    health_check_interval = 15s
+    inflight_window = 100
+    max_buffer_bytes = 256MB
+    query_mode = async
+    request_ttl = 15s
+    start_after_created = true
+    start_timeout = 5s
+    worker_pool_size = 16
+  }
+  service_account_json {
+    auth_provider_x509_cert_url = \"https://www.googleapis.com/oauth2/v1/certs\"
+    auth_uri = \"https://accounts.google.com/o/oauth2/auth\"
+    client_email = \"test@myproject.iam.gserviceaccount.com\"
+    client_id = \"123812831923812319190\"
+    client_x509_cert_url = \"https://www.googleapis.com/robot/v1/metadata/x509/...\"
+    private_key = \"-----BEGIN PRIVATE KEY-----...\"
+    private_key_id = \"kid\"
+    project_id = myproject
+    token_uri = \"https://oauth2.googleapis.com/token\"
+    type = service_account
+  }
+}
+""".
+
+%%===========================================================================
+%% Helper functions
+%%===========================================================================
+
+parse(Hocon) ->
+    {ok, Conf} = hocon:binary(Hocon),
+    Conf.
+
+check(Conf) when is_map(Conf) ->
+    hocon_tconf:check_plain(emqx_bridge_schema, Conf).
+
+-define(validation_error(Reason, Value),
+    {emqx_bridge_schema, [
+        #{
+            kind := validation_error,
+            reason := Reason,
+            value := Value
+        }
+    ]}
+).
+
+-define(ok_config(Cfg), #{
+    <<"bridges">> :=
+        #{
+            <<"gcp_pubsub">> :=
+                #{
+                    <<"my_producer">> :=
+                        Cfg
+                }
+        }
+}).
+
+%%===========================================================================
+%% Test cases
+%%===========================================================================
+
+producer_attributes_validator_test_() ->
+    %% ensure this module is loaded when testing only this file
+    _ = emqx_bridge_enterprise:module_info(),
+    BaseConf = parse(gcp_pubsub_producer_hocon()),
+    Override = fun(Cfg) ->
+        emqx_utils_maps:deep_merge(
+            BaseConf,
+            #{
+                <<"bridges">> =>
+                    #{
+                        <<"gcp_pubsub">> =>
+                            #{<<"my_producer">> => Cfg}
+                    }
+            }
+        )
+    end,
+    [
+        {"base config",
+            ?_assertMatch(
+                ?ok_config(#{
+                    <<"attributes_template">> := [_, _, _, _]
+                }),
+                check(BaseConf)
+            )},
+        {"empty key template",
+            ?_assertThrow(
+                ?validation_error("Key templates must not be empty", _),
+                check(
+                    Override(#{
+                        <<"attributes_template">> => [
+                            #{
+                                <<"key">> => <<>>,
+                                <<"value">> => <<"some_value">>
+                            }
+                        ]
+                    })
+                )
+            )},
+        {"empty value template",
+            ?_assertMatch(
+                ?ok_config(#{
+                    <<"attributes_template">> := [_]
+                }),
+                check(
+                    Override(#{
+                        <<"attributes_template">> => [
+                            #{
+                                <<"key">> => <<"some_key">>,
+                                <<"value">> => <<>>
+                            }
+                        ]
+                    })
+                )
+            )}
+    ].

+ 2 - 2
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -1089,7 +1089,7 @@ t_strategy_key_validation(Config) ->
                     #{
                         <<"kind">> := <<"validation_error">>,
                         <<"reason">> := <<"Message key cannot be empty", _/binary>>
-                    } = Msg
+                    }
             }}},
         probe_bridge_api(
             Config,
@@ -1103,7 +1103,7 @@ t_strategy_key_validation(Config) ->
                     #{
                         <<"kind">> := <<"validation_error">>,
                         <<"reason">> := <<"Message key cannot be empty", _/binary>>
-                    } = Msg
+                    }
             }}},
         create_bridge_api(
             Config,

+ 3 - 0
changes/ee/feat-11403.en.md

@@ -0,0 +1,3 @@
+Added support for defining message attributes and ordering key templates for GCP PubSub Producer bridge.
+
+Also updated our HOCON library to fix an issue where objects in an array were being concatenated even if they lay on different lines.

+ 1 - 1
mix.exs

@@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
-      {:hocon, github: "emqx/hocon", tag: "0.39.14", override: true},
+      {:hocon, github: "emqx/hocon", tag: "0.39.16", override: true},
       {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},

+ 1 - 1
rebar.config

@@ -75,7 +75,7 @@
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

+ 42 - 0
rel/i18n/emqx_bridge_gcp_pubsub.hocon

@@ -46,6 +46,18 @@ payload_template.desc:
 payload_template.label:
 """Payload template"""
 
+attributes_template.desc:
+"""The template for formatting the outgoing message attributes.  Undefined values will be rendered as empty string values.  Empty keys are removed from the attribute map."""
+
+attributes_template.label:
+"""Attributes template"""
+
+ordering_key_template.desc:
+"""The template for formatting the outgoing message ordering key.  Undefined values will be rendered as empty string values.  This value will not be added to the message if it's empty."""
+
+ordering_key_template.label:
+"""Ordering Key template"""
+
 pipelining.desc:
 """A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request."""
 
@@ -64,6 +76,36 @@ pubsub_topic.desc:
 pubsub_topic.label:
 """GCP PubSub Topic"""
 
+producer_attributes.desc:
+"""List of key-value pairs representing templates to construct the attributes for a given GCP PubSub message.  Both keys and values support the placeholder `${var_name}` notation.  Keys that are undefined or resolve to an empty string are omitted from the attribute map."""
+
+producer_attributes.label:
+"""Attributes Template"""
+
+producer_ordering_key.desc:
+"""Template for the Ordering Key of a given GCP PubSub message.  If the resolved value is undefined or an empty string, the ordering key property is omitted from the message."""
+
+producer_ordering_key.label:
+"""Ordering Key Template"""
+
+kv_pair_desc.desc:
+"""Key-value pair."""
+
+kv_pair_desc.label:
+"""Key-value pair"""
+
+kv_pair_key.desc:
+"""Key"""
+
+kv_pair_key.label:
+"""Key"""
+
+kv_pair_value.desc:
+"""Value"""
+
+kv_pair_value.label:
+"""Value"""
+
 service_account_json.desc:
 """JSON containing the GCP Service Account credentials to be used with PubSub.
 When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form.  That's the file needed."""