Explorar o código

Merge pull request #11261 from paulozulato/feat-bridge-kinesis

Implement Amazon Kinesis Producer bridge
Paulo Zulato %!s(int64=2) %!d(string=hai) anos
pai
achega
27630ca215

+ 1 - 0
.ci/docker-compose-file/.env

@@ -9,6 +9,7 @@ DYNAMO_TAG=1.21.0
 CASSANDRA_TAG=3.11.6
 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z
 OPENTS_TAG=9aa7f88
+KINESIS_TAG=2.1
 
 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
 SQLSERVER_TAG=2019-CU19-ubuntu-20.04

+ 12 - 0
.ci/docker-compose-file/docker-compose-kinesis.yaml

@@ -0,0 +1,12 @@
+version: '3.9'
+
+services:
+  kinesis:
+    container_name: kinesis
+    image: localstack/localstack:2.1
+    environment:
+      - KINESIS_ERROR_PROBABILITY=0.0
+      - KINESIS_LATENCY=0
+    restart: always
+    networks:
+      - emqx_bridge

+ 2 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -49,6 +49,8 @@ services:
       - 38080:38080
       # HStreamDB
       - 15670:5670
+      # Kinesis
+      - 4566:4566
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -161,5 +161,11 @@
     "listen": "0.0.0.0:6570",
     "upstream": "hstreamdb:6570",
     "enabled": true
+  },
+  {
+    "name": "kinesis",
+    "listen": "0.0.0.0:4566",
+    "upstream": "kinesis:4566",
+    "enabled": true
   }
 ]

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -88,7 +88,8 @@
     T == sqlserver;
     T == pulsar_producer;
     T == oracle;
-    T == iotdb
+    T == iotdb;
+    T == kinesis_producer
 ).
 
 -define(ROOT_KEY, bridges).

+ 2 - 0
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -374,6 +374,8 @@ parse_confs(<<"kafka">> = _Type, Name, Conf) ->
     Conf#{bridge_name => Name};
 parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
     Conf#{bridge_name => Name};
+parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) ->
+    Conf#{bridge_name => Name};
 parse_confs(_Type, _Name, Conf) ->
     Conf.
 

+ 20 - 4
apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl

@@ -49,7 +49,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_pulsar, <<"pulsar_producer">>, Method ++ "_producer"),
         api_ref(emqx_bridge_oracle, <<"oracle">>, Method),
         api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method),
-        api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method)
+        api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method),
+        api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer")
     ].
 
 schema_modules() ->
@@ -74,7 +75,8 @@ schema_modules() ->
         emqx_bridge_pulsar,
         emqx_bridge_oracle,
         emqx_bridge_iotdb,
-        emqx_bridge_rabbitmq
+        emqx_bridge_rabbitmq,
+        emqx_bridge_kinesis
     ].
 
 examples(Method) ->
@@ -119,7 +121,8 @@ resource_type(opents) -> emqx_bridge_opents_connector;
 resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
 resource_type(oracle) -> emqx_oracle;
 resource_type(iotdb) -> emqx_bridge_iotdb_impl;
-resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector.
+resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
+resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer.
 
 fields(bridges) ->
     [
@@ -199,7 +202,8 @@ fields(bridges) ->
     ] ++ kafka_structs() ++ pulsar_structs() ++ gcp_pubsub_structs() ++ mongodb_structs() ++
         influxdb_structs() ++
         redis_structs() ++
-        pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs().
+        pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++
+        kinesis_structs().
 
 mongodb_structs() ->
     [
@@ -365,6 +369,18 @@ rabbitmq_structs() ->
             )}
     ].
 
+kinesis_structs() ->
+    [
+        {kinesis_producer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_kinesis, "config_producer")),
+                #{
+                    desc => <<"Amazon Kinesis Producer Bridge Config">>,
+                    required => false
+                }
+            )}
+    ].
+
 api_ref(Module, Type, Method) ->
     {Type, ref(Module, Method)}.
 

+ 1 - 1
apps/emqx_bridge_dynamo/rebar.config

@@ -1,6 +1,6 @@
 %% -*- mode: erlang; -*-
 {erl_opts, [debug_info]}.
-{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}}
+{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 94 - 0
apps/emqx_bridge_kinesis/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 22 - 0
apps/emqx_bridge_kinesis/README.md

@@ -0,0 +1,22 @@
+# Amazon Kinesis Data Integration Bridge
+
+This application houses the Amazon Kinesis Producer data
+integration bridge for EMQX Enterprise Edition. It provides the means to
+connect to Amazon Kinesis Data Streams and publish messages to it.
+
+# Documentation links
+
+For more information about Amazon Kinesis Data Streams, please see its
+[official site](https://aws.amazon.com/kinesis/data-streams/).
+
+# Configurations
+
+Please see [Ingest Data into Kinesis](https://docs.emqx.com/en/enterprise/v5.1/data-integration/data-bridge-kinesis.html) for more detailed info.
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 2 - 0
apps/emqx_bridge_kinesis/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+kinesis

+ 11 - 0
apps/emqx_bridge_kinesis/rebar.config

@@ -0,0 +1,11 @@
+%% -*- mode: erlang; -*-
+{erl_opts, [debug_info]}.
+{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
+
+{shell, [
+    {apps, [emqx_bridge_kinesis]}
+]}.

+ 13 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src

@@ -0,0 +1,13 @@
+{application, emqx_bridge_kinesis, [
+    {description, "EMQX Enterprise Amazon Kinesis Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        erlcloud
+    ]},
+    {env, []},
+    {modules, []},
+    {links, []}
+]}.

+ 167 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl

@@ -0,0 +1,167 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis).
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% hocon_schema API
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_kinesis".
+
+roots() ->
+    [].
+
+fields("config_producer") ->
+    emqx_bridge_schema:common_bridge_fields() ++
+        emqx_resource_schema:fields("resource_opts") ++
+        fields(connector_config) ++ fields(producer);
+fields(connector_config) ->
+    [
+        {aws_access_key_id,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("aws_access_key_id")
+                }
+            )},
+        {aws_secret_access_key,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("aws_secret_access_key"),
+                    sensitive => true
+                }
+            )},
+        {endpoint,
+            mk(
+                binary(),
+                #{
+                    default => <<"https://kinesis.us-east-1.amazonaws.com">>,
+                    desc => ?DESC("endpoint")
+                }
+            )},
+        {max_retries,
+            mk(
+                non_neg_integer(),
+                #{
+                    required => false,
+                    default => 2,
+                    desc => ?DESC("max_retries")
+                }
+            )},
+        {pool_size,
+            sc(
+                pos_integer(),
+                #{
+                    default => 8,
+                    desc => ?DESC("pool_size")
+                }
+            )}
+    ];
+fields(producer) ->
+    [
+        {payload_template,
+            sc(
+                binary(),
+                #{
+                    default => <<>>,
+                    desc => ?DESC("payload_template")
+                }
+            )},
+        {local_topic,
+            sc(
+                binary(),
+                #{
+                    desc => ?DESC("local_topic")
+                }
+            )},
+        {stream_name,
+            sc(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("stream_name")
+                }
+            )},
+        {partition_key,
+            sc(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("partition_key")
+                }
+            )}
+    ];
+fields("get_producer") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_producer");
+fields("post_producer") ->
+    [type_field_producer(), name_field() | fields("config_producer")];
+fields("put_producer") ->
+    fields("config_producer").
+
+desc("config_producer") ->
+    ?DESC("desc_config");
+desc(_) ->
+    undefined.
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"kinesis_producer">> => #{
+                summary => <<"Amazon Kinesis Producer Bridge">>,
+                value => values(producer, Method)
+            }
+        }
+    ].
+
+values(producer, _Method) ->
+    #{
+        aws_access_key_id => <<"aws_access_key_id">>,
+        aws_secret_access_key => <<"******">>,
+        endpoint => <<"https://kinesis.us-east-1.amazonaws.com">>,
+        max_retries => 3,
+        stream_name => <<"stream_name">>,
+        partition_key => <<"key">>,
+        resource_opts => #{
+            worker_pool_size => 1,
+            health_check_interval => 15000,
+            query_mode => async,
+            inflight_window => 100,
+            max_buffer_bytes => 100 * 1024 * 1024
+        }
+    }.
+
+%%-------------------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------------------
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+enum(OfSymbols) -> hoconsc:enum(OfSymbols).
+
+type_field_producer() ->
+    {type, mk(enum([kinesis_producer]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 178 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl

@@ -0,0 +1,178 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_connector_client).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("erlcloud/include/erlcloud_aws.hrl").
+
+-behaviour(gen_server).
+
+-type state() :: #{
+    instance_id := resource_id(),
+    partition_key := binary(),
+    stream_name := binary()
+}.
+-type record() :: {Data :: binary(), PartitionKey :: binary()}.
+
+-define(DEFAULT_PORT, 443).
+
+%% API
+-export([
+    start_link/1,
+    connection_status/1,
+    query/2
+]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-ifdef(TEST).
+-export([execute/2]).
+-endif.
+
+%% The default timeout for Kinesis API calls is 10 seconds,
+%% but this value for `gen_server:call` is 5s,
+%% so we should adjust timeout for `gen_server:call`
+-define(HEALTH_CHECK_TIMEOUT, 15000).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+connection_status(Pid) ->
+    try
+        gen_server:call(Pid, connection_status, ?HEALTH_CHECK_TIMEOUT)
+    catch
+        _:_ ->
+            {error, timeout}
+    end.
+
+query(Pid, Records) ->
+    gen_server:call(Pid, {query, Records}, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts Bridge which communicates to Amazon Kinesis Data Streams
+%% @end
+%%--------------------------------------------------------------------
+start_link(Options) ->
+    gen_server:start_link(?MODULE, Options, []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% Initialize kinesis connector
+-spec init(emqx_bridge_kinesis_impl_producer:config()) -> {ok, state()}.
+init(#{
+    aws_access_key_id := AwsAccessKey,
+    aws_secret_access_key := AwsSecretAccessKey,
+    endpoint := Endpoint,
+    partition_key := PartitionKey,
+    stream_name := StreamName,
+    max_retries := MaxRetries,
+    instance_id := InstanceId
+}) ->
+    process_flag(trap_exit, true),
+
+    #{scheme := Scheme, hostname := Host, port := Port} =
+        emqx_schema:parse_server(
+            Endpoint,
+            #{
+                default_port => ?DEFAULT_PORT,
+                supported_schemes => ["http", "https"]
+            }
+        ),
+    State = #{
+        instance_id => InstanceId,
+        partition_key => PartitionKey,
+        stream_name => StreamName
+    },
+    New =
+        fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) ->
+            Config0 = erlcloud_kinesis:new(
+                AccessKeyID,
+                SecretAccessKey,
+                HostAddr,
+                HostPort,
+                ConnectionScheme ++ "://"
+            ),
+            Config0#aws_config{retry_num = MaxRetries}
+        end,
+    erlcloud_config:configure(
+        to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New
+    ),
+    {ok, State}.
+
+handle_call(connection_status, _From, #{stream_name := StreamName} = State) ->
+    Status =
+        case erlcloud_kinesis:describe_stream(StreamName) of
+            {ok, _} ->
+                {ok, connected};
+            {error, {<<"ResourceNotFoundException">>, _}} ->
+                {error, unhealthy_target};
+            Error ->
+                {error, Error}
+        end,
+    {reply, Status, State};
+handle_call({query, Records}, _From, #{stream_name := StreamName} = State) ->
+    Result = do_query(StreamName, Records),
+    {reply, Result, State};
+handle_call(_Request, _From, State) ->
+    {reply, {error, unknown_call}, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(Reason, #{instance_id := InstanceId} = _State) ->
+    ?tp(kinesis_stop, #{instance_id => InstanceId, reason => Reason}),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+-spec do_query(binary(), [record()]) ->
+    {ok, jsx:json_term() | binary()}
+    | {error, {unrecoverable_error, term()}}
+    | {error, term()}.
+do_query(StreamName, Records) ->
+    try
+        execute(put_record, {StreamName, Records})
+    catch
+        _Type:Reason ->
+            {error, {unrecoverable_error, {invalid_request, Reason}}}
+    end.
+
+-spec execute(put_record, {binary(), [record()]}) ->
+    {ok, jsx:json_term() | binary()}
+    | {error, term()}.
+execute(put_record, {StreamName, [{Data, PartitionKey}] = Record}) ->
+    Result = erlcloud_kinesis:put_record(StreamName, PartitionKey, Data),
+    ?tp(kinesis_put_record, #{records => Record, result => Result}),
+    Result;
+execute(put_record, {StreamName, Items}) when is_list(Items) ->
+    Result = erlcloud_kinesis:put_records(StreamName, Items),
+    ?tp(kinesis_put_record, #{records => Items, result => Result}),
+    Result.
+
+-spec to_str(list() | binary()) -> list().
+to_str(List) when is_list(List) ->
+    List;
+to_str(Bin) when is_binary(Bin) ->
+    erlang:binary_to_list(Bin).

+ 247 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl

@@ -0,0 +1,247 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_impl_producer).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(HEALTH_CHECK_TIMEOUT, 15000).
+-define(TOPIC_MESSAGE,
+    "Kinesis stream is invalid. Please check if the stream exist in Kinesis account."
+).
+
+-type config() :: #{
+    aws_access_key_id := binary(),
+    aws_secret_access_key := binary(),
+    endpoint := binary(),
+    stream_name := binary(),
+    partition_key := binary(),
+    payload_template := binary(),
+    max_retries := non_neg_integer(),
+    pool_size := non_neg_integer(),
+    instance_id => resource_id(),
+    any() => term()
+}.
+-type templates() :: #{
+    partition_key := list(),
+    send_message := list()
+}.
+-type state() :: #{
+    pool_name := resource_id(),
+    templates := templates()
+}.
+-export_type([config/0]).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+-export([
+    connect/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+callback_mode() -> always_sync.
+
+-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+on_start(
+    InstanceId,
+    #{
+        pool_size := PoolSize
+    } = Config0
+) ->
+    ?SLOG(info, #{
+        msg => "starting_kinesis_bridge",
+        connector => InstanceId,
+        config => redact(Config0)
+    }),
+    Config = Config0#{instance_id => InstanceId},
+    Options = [
+        {config, Config},
+        {pool_size, PoolSize}
+    ],
+    Templates = parse_template(Config),
+    State = #{
+        pool_name => InstanceId,
+        templates => Templates
+    },
+
+    case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
+        ok ->
+            ?tp(emqx_bridge_kinesis_impl_producer_start_ok, #{config => Config}),
+            {ok, State};
+        Error ->
+            ?tp(emqx_bridge_kinesis_impl_producer_start_failed, #{config => Config}),
+            Error
+    end.
+
+-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, _State) ->
+    emqx_resource_pool:stop(InstanceId).
+
+-spec on_get_status(resource_id(), state()) ->
+    connected | disconnected | {disconnected, state(), {unhealthy_target, string()}}.
+on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
+    case
+        emqx_resource_pool:health_check_workers(
+            Pool,
+            {emqx_bridge_kinesis_connector_client, connection_status, []},
+            ?HEALTH_CHECK_TIMEOUT,
+            #{return_values => true}
+        )
+    of
+        {ok, Values} ->
+            AllOk = lists:all(fun(S) -> S =:= {ok, connected} end, Values),
+            case AllOk of
+                true ->
+                    connected;
+                false ->
+                    Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values),
+                    case Unhealthy of
+                        true -> {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
+                        false -> disconnected
+                    end
+            end;
+        {error, _} ->
+            disconnected
+    end.
+
+-spec on_query(
+    resource_id(),
+    {send_message, map()},
+    state()
+) ->
+    {ok, map()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+on_query(ResourceId, {send_message, Message}, State) ->
+    Requests = [{send_message, Message}],
+    ?tp(emqx_bridge_kinesis_impl_producer_sync_query, #{message => Message}),
+    do_send_requests_sync(ResourceId, Requests, State).
+
+-spec on_batch_query(
+    resource_id(),
+    [{send_message, map()}],
+    state()
+) ->
+    {ok, map()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+%% we only support batch insert
+on_batch_query(ResourceId, [{send_message, _} | _] = Requests, State) ->
+    ?tp(emqx_bridge_kinesis_impl_producer_sync_batch_query, #{requests => Requests}),
+    do_send_requests_sync(ResourceId, Requests, State).
+
+connect(Opts) ->
+    Options = proplists:get_value(config, Opts),
+    emqx_bridge_kinesis_connector_client:start_link(Options).
+
+%%-------------------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------------------
+
+-spec do_send_requests_sync(
+    resource_id(),
+    [{send_message, map()}],
+    state()
+) ->
+    {ok, jsx:json_term() | binary()}
+    | {error, {recoverable_error, term()}}
+    | {error, {unrecoverable_error, {invalid_request, term()}}}
+    | {error, {unrecoverable_error, {unhealthy_target, string()}}}
+    | {error, {unrecoverable_error, term()}}
+    | {error, term()}.
+do_send_requests_sync(
+    InstanceId,
+    Requests,
+    #{pool_name := PoolName, templates := Templates}
+) ->
+    Records = render_records(Requests, Templates),
+    Result = ecpool:pick_and_do(
+        PoolName,
+        {emqx_bridge_kinesis_connector_client, query, [Records]},
+        no_handover
+    ),
+    handle_result(Result, Requests, InstanceId).
+
+handle_result({ok, _} = Result, _Requests, _InstanceId) ->
+    Result;
+handle_result({error, {<<"ResourceNotFoundException">>, _} = Reason}, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {unrecoverable_error, {unhealthy_target, ?TOPIC_MESSAGE}}};
+handle_result(
+    {error, {<<"ProvisionedThroughputExceededException">>, _} = Reason}, Requests, InstanceId
+) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {recoverable_error, Reason}};
+handle_result({error, {<<"InvalidArgumentException">>, _} = Reason}, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {unrecoverable_error, Reason}};
+handle_result({error, {econnrefused = Reason, _}}, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {recoverable_error, Reason}};
+handle_result({error, Reason} = Error, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    Error.
+
+parse_template(Config) ->
+    #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config,
+    Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate},
+    maps:map(fun(_K, V) -> emqx_placeholder:preproc_tmpl(V) end, Templates).
+
+render_records(Items, Templates) ->
+    PartitionKeyTemplate = maps:get(partition_key, Templates),
+    MsgTemplate = maps:get(send_message, Templates),
+    render_messages(Items, {MsgTemplate, PartitionKeyTemplate}, []).
+
+render_messages([], _Templates, RenderedMsgs) ->
+    RenderedMsgs;
+render_messages(
+    [{send_message, Msg} | Others],
+    {MsgTemplate, PartitionKeyTemplate} = Templates,
+    RenderedMsgs
+) ->
+    Data = emqx_placeholder:proc_tmpl(MsgTemplate, Msg),
+    PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTemplate, Msg),
+    RenderedMsg = {Data, PartitionKey},
+    render_messages(Others, Templates, [RenderedMsg | RenderedMsgs]).
+
+redact(Config) ->
+    emqx_utils:redact(Config, fun(Any) -> Any =:= aws_secret_access_key end).

+ 817 - 0
apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl

@@ -0,0 +1,817 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_impl_producer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(PRODUCER, emqx_bridge_kinesis_impl_producer).
+-define(BRIDGE_TYPE, kinesis_producer).
+-define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>).
+-define(KINESIS_PORT, 4566).
+-define(TOPIC, <<"t/topic">>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, with_batch},
+        {group, without_batch}
+    ].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {with_batch, TCs},
+        {without_batch, TCs}
+    ].
+
+init_per_suite(Config) ->
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    ProxyName = "kinesis",
+    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
+    {ok, _} = application:ensure_all_started(emqx_connector),
+    emqx_mgmt_api_test_util:init_suite(),
+    [
+        {proxy_host, ProxyHost},
+        {proxy_port, ProxyPort},
+        {kinesis_port, ?KINESIS_PORT},
+        {proxy_name, ProxyName}
+        | Config
+    ].
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
+    _ = application:stop(emqx_connector),
+    ok.
+
+init_per_group(with_batch, Config) ->
+    [{batch_size, 100} | Config];
+init_per_group(without_batch, Config) ->
+    [{batch_size, 1} | Config];
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(TestCase, Config0) ->
+    ok = snabbkaffe:start_trace(),
+    ProxyHost = ?config(proxy_host, Config0),
+    ProxyPort = ?config(proxy_port, Config0),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    TimeTrap =
+        case TestCase of
+            t_wrong_server -> 60;
+            _ -> 30
+        end,
+    ct:timetrap({seconds, TimeTrap}),
+    delete_all_bridges(),
+    Tid = install_telemetry_handler(TestCase),
+    put(telemetry_table, Tid),
+    Config = generate_config(Config0),
+    create_stream(Config),
+    [{telemetry_table, Tid} | Config].
+
+end_per_testcase(_TestCase, Config) ->
+    ok = snabbkaffe:stop(),
+    delete_all_bridges(),
+    delete_stream(Config),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+generate_config(Config0) ->
+    #{
+        name := Name,
+        config_string := ConfigString,
+        kinesis_config := KinesisConfig
+    } = kinesis_config(Config0),
+    Endpoint = map_get(<<"endpoint">>, KinesisConfig),
+    #{scheme := Scheme, hostname := Host, port := Port} =
+        emqx_schema:parse_server(
+            Endpoint,
+            #{
+                default_port => 443,
+                supported_schemes => ["http", "https"]
+            }
+        ),
+    ErlcloudConfig = erlcloud_kinesis:new("access_key", "secret", Host, Port, Scheme ++ "://"),
+    ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name),
+    [
+        {kinesis_name, Name},
+        {connection_scheme, Scheme},
+        {kinesis_config, KinesisConfig},
+        {kinesis_config_string, ConfigString},
+        {resource_id, ResourceId},
+        {bridge_id, BridgeId},
+        {erlcloud_config, ErlcloudConfig}
+        | Config0
+    ].
+
+kinesis_config(Config) ->
+    QueryMode = proplists:get_value(query_mode, Config, async),
+    Scheme = proplists:get_value(connection_scheme, Config, "http"),
+    ProxyHost = proplists:get_value(proxy_host, Config),
+    KinesisPort = proplists:get_value(kinesis_port, Config),
+    BatchSize = proplists:get_value(batch_size, Config, 100),
+    BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>),
+    PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"),
+    StreamName = proplists:get_value(stream_name, Config, <<"mystream">>),
+    PartitionKey = proplists:get_value(partition_key, Config, <<"key">>),
+    MaxRetries = proplists:get_value(max_retries, Config, 3),
+    GUID = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
+    ConfigString =
+        io_lib:format(
+            "bridges.kinesis_producer.~s {\n"
+            "  enable = true\n"
+            "  aws_access_key_id = \"aws_access_key_id\"\n"
+            "  aws_secret_access_key = \"aws_secret_access_key\"\n"
+            "  endpoint = \"~s://~s:~b\"\n"
+            "  stream_name = \"~s\"\n"
+            "  partition_key = \"~s\"\n"
+            "  payload_template = \"~s\"\n"
+            "  max_retries = ~b\n"
+            "  pool_size = 1\n"
+            "  resource_opts = {\n"
+            "    health_check_interval = \"3s\"\n"
+            "    request_ttl = 30s\n"
+            "    resume_interval = 1s\n"
+            "    metrics_flush_interval = \"700ms\"\n"
+            "    worker_pool_size = 1\n"
+            "    query_mode = ~s\n"
+            "    batch_size = ~b\n"
+            "    batch_time = \"~s\"\n"
+            "  }\n"
+            "}\n",
+            [
+                Name,
+                Scheme,
+                ProxyHost,
+                KinesisPort,
+                StreamName,
+                PartitionKey,
+                PayloadTemplate,
+                MaxRetries,
+                QueryMode,
+                BatchSize,
+                BatchTime
+            ]
+        ),
+    #{
+        name => Name,
+        config_string => ConfigString,
+        kinesis_config => parse_and_check(ConfigString, Name)
+    }.
+
+parse_and_check(ConfigString, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = <<"kinesis_producer">>,
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+delete_all_bridges() ->
+    ct:pal("deleting all bridges"),
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ).
+
+delete_bridge(Config) ->
+    Type = ?BRIDGE_TYPE,
+    Name = ?config(kinesis_name, Config),
+    ct:pal("deleting bridge ~p", [{Type, Name}]),
+    emqx_bridge:remove(Type, Name).
+
+create_bridge_http(Config) ->
+    create_bridge_http(Config, _KinesisConfigOverrides = #{}).
+
+create_bridge_http(Config, KinesisConfigOverrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(kinesis_name, Config),
+    KinesisConfig0 = ?config(kinesis_config, Config),
+    KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides),
+    Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    ProbeResult = emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params),
+    ct:pal("creating bridge (via http): ~p", [Params]),
+    ct:pal("probe result: ~p", [ProbeResult]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+            {ok, Res0} -> {ok, emqx_utils_json:decode(Res0, [return_maps])};
+            Error -> Error
+        end,
+    ct:pal("bridge creation result: ~p", [Res]),
+    ?assertEqual(element(1, ProbeResult), element(1, Res)),
+    Res.
+
+create_bridge(Config) ->
+    create_bridge(Config, _KinesisConfigOverrides = #{}).
+
+create_bridge(Config, KinesisConfigOverrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(kinesis_name, Config),
+    KinesisConfig0 = ?config(kinesis_config, Config),
+    KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides),
+    ct:pal("creating bridge: ~p", [KinesisConfig]),
+    Res = emqx_bridge:create(TypeBin, Name, KinesisConfig),
+    ct:pal("bridge creation result: ~p", [Res]),
+    Res.
+
+create_rule_and_action_http(Config) ->
+    BridgeId = ?config(bridge_id, Config),
+    Params = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"", ?TOPIC/binary, "\"">>,
+        actions => [BridgeId]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+create_stream(Config) ->
+    KinesisConfig = ?config(kinesis_config, Config),
+    ErlcloudConfig = ?config(erlcloud_config, Config),
+    StreamName = map_get(<<"stream_name">>, KinesisConfig),
+    {ok, _} = application:ensure_all_started(erlcloud),
+    delete_stream(StreamName, ErlcloudConfig),
+    {ok, _} = erlcloud_kinesis:create_stream(StreamName, 1, ErlcloudConfig),
+    ?retry(
+        _Sleep = 100,
+        _Attempts = 10,
+        begin
+            {ok, [{<<"StreamDescription">>, StreamInfo}]} =
+                erlcloud_kinesis:describe_stream(StreamName, ErlcloudConfig),
+            ?assertEqual(
+                <<"ACTIVE">>,
+                proplists:get_value(<<"StreamStatus">>, StreamInfo)
+            )
+        end
+    ),
+    ok.
+
+delete_stream(Config) ->
+    KinesisConfig = ?config(kinesis_config, Config),
+    ErlcloudConfig = ?config(erlcloud_config, Config),
+    StreamName = map_get(<<"stream_name">>, KinesisConfig),
+    {ok, _} = application:ensure_all_started(erlcloud),
+    delete_stream(StreamName, ErlcloudConfig),
+    ok.
+
+delete_stream(StreamName, ErlcloudConfig) ->
+    case erlcloud_kinesis:delete_stream(StreamName, ErlcloudConfig) of
+        {ok, _} ->
+            ?retry(
+                _Sleep = 100,
+                _Attempts = 10,
+                ?assertMatch(
+                    {error, {<<"ResourceNotFoundException">>, _}},
+                    erlcloud_kinesis:describe_stream(StreamName, ErlcloudConfig)
+                )
+            );
+        _ ->
+            ok
+    end,
+    ok.
+
+wait_record(Config, ShardIt, Timeout, Attempts) ->
+    [Record] = wait_records(Config, ShardIt, 1, Timeout, Attempts),
+    Record.
+
+wait_records(Config, ShardIt, Count, Timeout, Attempts) ->
+    ErlcloudConfig = ?config(erlcloud_config, Config),
+    ?retry(
+        Timeout,
+        Attempts,
+        begin
+            {ok, Ret} = erlcloud_kinesis:get_records(ShardIt, ErlcloudConfig),
+            Records = proplists:get_value(<<"Records">>, Ret),
+            Count = length(Records),
+            Records
+        end
+    ).
+
+get_shard_iterator(Config) ->
+    get_shard_iterator(Config, 1).
+
+get_shard_iterator(Config, Index) ->
+    KinesisConfig = ?config(kinesis_config, Config),
+    ErlcloudConfig = ?config(erlcloud_config, Config),
+    StreamName = map_get(<<"stream_name">>, KinesisConfig),
+    {ok, [{<<"Shards">>, Shards}]} = erlcloud_kinesis:list_shards(StreamName, ErlcloudConfig),
+    Shard = lists:nth(Index, lists:sort(Shards)),
+    ShardId = proplists:get_value(<<"ShardId">>, Shard),
+    {ok, [{<<"ShardIterator">>, ShardIt}]} =
+        erlcloud_kinesis:get_shard_iterator(StreamName, ShardId, <<"LATEST">>, ErlcloudConfig),
+    ShardIt.
+
+install_telemetry_handler(TestCase) ->
+    Tid = ets:new(TestCase, [ordered_set, public]),
+    HandlerId = TestCase,
+    TestPid = self(),
+    _ = telemetry:attach_many(
+        HandlerId,
+        emqx_resource_metrics:events(),
+        fun(EventName, Measurements, Metadata, _Config) ->
+            Data = #{
+                name => EventName,
+                measurements => Measurements,
+                metadata => Metadata
+            },
+            ets:insert(Tid, {erlang:monotonic_time(), Data}),
+            TestPid ! {telemetry, Data},
+            ok
+        end,
+        unused_config
+    ),
+    emqx_common_test_helpers:on_exit(fun() ->
+        telemetry:detach(HandlerId),
+        ets:delete(Tid)
+    end),
+    Tid.
+
+current_metrics(ResourceId) ->
+    Mapping = metrics_mapping(),
+    maps:from_list([
+        {Metric, F(ResourceId)}
+     || {Metric, F} <- maps:to_list(Mapping)
+    ]).
+
+metrics_mapping() ->
+    #{
+        dropped => fun emqx_resource_metrics:dropped_get/1,
+        dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1,
+        dropped_other => fun emqx_resource_metrics:dropped_other_get/1,
+        dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1,
+        dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1,
+        dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1,
+        late_reply => fun emqx_resource_metrics:late_reply_get/1,
+        failed => fun emqx_resource_metrics:failed_get/1,
+        inflight => fun emqx_resource_metrics:inflight_get/1,
+        matched => fun emqx_resource_metrics:matched_get/1,
+        queuing => fun emqx_resource_metrics:queuing_get/1,
+        retried => fun emqx_resource_metrics:retried_get/1,
+        retried_failed => fun emqx_resource_metrics:retried_failed_get/1,
+        retried_success => fun emqx_resource_metrics:retried_success_get/1,
+        success => fun emqx_resource_metrics:success_get/1
+    }.
+
+assert_metrics(ExpectedMetrics, ResourceId) ->
+    Mapping = metrics_mapping(),
+    Metrics =
+        lists:foldl(
+            fun(Metric, Acc) ->
+                #{Metric := Fun} = Mapping,
+                Value = Fun(ResourceId),
+                Acc#{Metric => Value}
+            end,
+            #{},
+            maps:keys(ExpectedMetrics)
+        ),
+    CurrentMetrics = current_metrics(ResourceId),
+    TelemetryTable = get(telemetry_table),
+    RecordedEvents = ets:tab2list(TelemetryTable),
+    ?assertEqual(ExpectedMetrics, Metrics, #{
+        current_metrics => CurrentMetrics, recorded_events => RecordedEvents
+    }),
+    ok.
+
+assert_empty_metrics(ResourceId) ->
+    Mapping = metrics_mapping(),
+    ExpectedMetrics =
+        lists:foldl(
+            fun(Metric, Acc) ->
+                Acc#{Metric => 0}
+            end,
+            #{},
+            maps:keys(Mapping)
+        ),
+    assert_metrics(ExpectedMetrics, ResourceId).
+
+wait_telemetry_event(TelemetryTable, EventName, ResourceId) ->
+    wait_telemetry_event(TelemetryTable, EventName, ResourceId, #{timeout => 5_000, n_events => 1}).
+
+wait_telemetry_event(
+    TelemetryTable,
+    EventName,
+    ResourceId,
+    _Opts = #{
+        timeout := Timeout,
+        n_events := NEvents
+    }
+) ->
+    wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName).
+
+wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when NEvents =< 0 ->
+    ok;
+wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
+    receive
+        {telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} ->
+            ct:pal("telemetry event: ~p", [Event]),
+            wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName)
+    after Timeout ->
+        RecordedEvents = ets:tab2list(TelemetryTable),
+        CurrentMetrics = current_metrics(ResourceId),
+        ct:pal("recorded events: ~p", [RecordedEvents]),
+        ct:pal("current metrics: ~p", [CurrentMetrics]),
+        error({timeout_waiting_for_telemetry, EventName})
+    end.
+
+wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
+    Events = receive_all_events(GaugeName, Timeout),
+    case length(Events) > 0 andalso lists:last(Events) of
+        #{measurements := #{gauge_set := ExpectedValue}} ->
+            ok;
+        #{measurements := #{gauge_set := Value}} ->
+            ct:pal("events: ~p", [Events]),
+            ct:fail(
+                "gauge ~p didn't reach expected value ~p; last value: ~p",
+                [GaugeName, ExpectedValue, Value]
+            );
+        false ->
+            ct:pal("no ~p gauge events received!", [GaugeName])
+    end.
+
+receive_all_events(EventName, Timeout) ->
+    receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []).
+
+receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
+    lists:reverse(Acc);
+receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
+    receive
+        {telemetry, #{name := [_, _, EventName]} = Event} ->
+            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
+    after Timeout ->
+        lists:reverse(Acc)
+    end.
+
+to_str(List) when is_list(List) ->
+    List;
+to_str(Bin) when is_binary(Bin) ->
+    erlang:binary_to_list(Bin);
+to_str(Int) when is_integer(Int) ->
+    erlang:integer_to_list(Int).
+
+to_bin(Str) when is_list(Str) ->
+    erlang:list_to_binary(Str).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_create_via_http(Config) ->
+    ?assertMatch({ok, _}, create_bridge_http(Config)),
+    ok.
+
+t_start_failed_then_fix(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = ?config(resource_id, Config),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ct:sleep(1000),
+        ?wait_async_action(
+            create_bridge(Config),
+            #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_failed},
+            20_000
+        )
+    end),
+    ?retry(
+        _Sleep1 = 1_000,
+        _Attempts1 = 30,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    ok.
+
+t_stop(Config) ->
+    Name = ?config(kinesis_name, Config),
+    {ok, _} = create_bridge(Config),
+    ?check_trace(
+        ?wait_async_action(
+            emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
+            #{?snk_kind := kinesis_stop},
+            5_000
+        ),
+        fun(Trace) ->
+            ?assertMatch([_], ?of_kind(kinesis_stop, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_get_status_ok(Config) ->
+    ResourceId = ?config(resource_id, Config),
+    {ok, _} = create_bridge(Config),
+    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+    ok.
+
+t_create_unhealthy(Config) ->
+    delete_stream(Config),
+    ResourceId = ?config(resource_id, Config),
+    {ok, _} = create_bridge(Config),
+    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
+    ?assertMatch(
+        {ok, _, #{error := {unhealthy_target, _}}},
+        emqx_resource_manager:lookup_cached(ResourceId)
+    ),
+    ok.
+
+t_get_status_unhealthy(Config) ->
+    delete_stream(Config),
+    ResourceId = ?config(resource_id, Config),
+    {ok, _} = create_bridge(Config),
+    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
+    ?assertMatch(
+        {ok, _, #{error := {unhealthy_target, _}}},
+        emqx_resource_manager:lookup_cached(ResourceId)
+    ),
+    ok.
+
+t_publish_success(Config) ->
+    ResourceId = ?config(resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    assert_empty_metrics(ResourceId),
+    ShardIt = get_shard_iterator(Config),
+    Payload = <<"payload">>,
+    Message = emqx_message:make(?TOPIC, Payload),
+    emqx:publish(Message),
+    %% to avoid test flakiness
+    wait_telemetry_event(TelemetryTable, success, ResourceId),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
+    assert_metrics(
+        #{
+            dropped => 0,
+            failed => 0,
+            inflight => 0,
+            matched => 1,
+            queuing => 0,
+            retried => 0,
+            success => 1
+        },
+        ResourceId
+    ),
+    Record = wait_record(Config, ShardIt, 100, 10),
+    ?assertEqual(Payload, proplists:get_value(<<"Data">>, Record)),
+    ok.
+
+t_publish_success_with_template(Config) ->
+    ResourceId = ?config(resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
+    Overrides =
+        #{
+            <<"payload_template">> => <<"${payload.data}">>,
+            <<"partition_key">> => <<"${payload.key}">>
+        },
+    ?assertMatch({ok, _}, create_bridge(Config, Overrides)),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    assert_empty_metrics(ResourceId),
+    ShardIt = get_shard_iterator(Config),
+    Payload = <<"{\"key\":\"my_key\", \"data\":\"my_data\"}">>,
+    Message = emqx_message:make(?TOPIC, Payload),
+    emqx:publish(Message),
+    %% to avoid test flakiness
+    wait_telemetry_event(TelemetryTable, success, ResourceId),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
+    assert_metrics(
+        #{
+            dropped => 0,
+            failed => 0,
+            inflight => 0,
+            matched => 1,
+            queuing => 0,
+            retried => 0,
+            success => 1
+        },
+        ResourceId
+    ),
+    Record = wait_record(Config, ShardIt, 100, 10),
+    ?assertEqual(<<"my_data">>, proplists:get_value(<<"Data">>, Record)),
+    ok.
+
+t_publish_multiple_msgs_success(Config) ->
+    ResourceId = ?config(resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    assert_empty_metrics(ResourceId),
+    ShardIt = get_shard_iterator(Config),
+    lists:foreach(
+        fun(I) ->
+            Payload = "payload_" ++ to_str(I),
+            Message = emqx_message:make(?TOPIC, Payload),
+            emqx:publish(Message)
+        end,
+        lists:seq(1, 10)
+    ),
+    Records = wait_records(Config, ShardIt, 10, 100, 10),
+    ReceivedPayloads =
+        lists:map(fun(Record) -> proplists:get_value(<<"Data">>, Record) end, Records),
+    lists:foreach(
+        fun(I) ->
+            ExpectedPayload = to_bin("payload_" ++ to_str(I)),
+            ?assertEqual(
+                {ExpectedPayload, true},
+                {ExpectedPayload, lists:member(ExpectedPayload, ReceivedPayloads)}
+            )
+        end,
+        lists:seq(1, 10)
+    ),
+    %% to avoid test flakiness
+    wait_telemetry_event(TelemetryTable, success, ResourceId),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
+    assert_metrics(
+        #{
+            dropped => 0,
+            failed => 0,
+            inflight => 0,
+            matched => 10,
+            queuing => 0,
+            retried => 0,
+            success => 10
+        },
+        ResourceId
+    ),
+    ok.
+
+t_publish_unhealthy(Config) ->
+    ResourceId = ?config(resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    assert_empty_metrics(ResourceId),
+    ShardIt = get_shard_iterator(Config),
+    Payload = <<"payload">>,
+    Message = emqx_message:make(?TOPIC, Payload),
+    delete_stream(Config),
+    emqx:publish(Message),
+    ?assertError(
+        {badmatch, {error, {<<"ResourceNotFoundException">>, _}}},
+        wait_record(Config, ShardIt, 100, 10)
+    ),
+    %% to avoid test flakiness
+    wait_telemetry_event(TelemetryTable, failed, ResourceId),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
+    assert_metrics(
+        #{
+            dropped => 0,
+            failed => 1,
+            inflight => 0,
+            matched => 1,
+            queuing => 0,
+            retried => 0,
+            success => 0
+        },
+        ResourceId
+    ),
+    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
+    ?assertMatch(
+        {ok, _, #{error := {unhealthy_target, _}}},
+        emqx_resource_manager:lookup_cached(ResourceId)
+    ),
+    ok.
+
+t_publish_big_msg(Config) ->
+    ResourceId = ?config(resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    assert_empty_metrics(ResourceId),
+    % Maximum size is 1MB. Using 1MB + 1 here.
+    Payload = binary:copy(<<"a">>, 1 * 1024 * 1024 + 1),
+    Message = emqx_message:make(?TOPIC, Payload),
+    emqx:publish(Message),
+    %% to avoid test flakiness
+    wait_telemetry_event(TelemetryTable, failed, ResourceId),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
+    assert_metrics(
+        #{
+            dropped => 0,
+            failed => 1,
+            inflight => 0,
+            matched => 1,
+            queuing => 0,
+            retried => 0,
+            success => 0
+        },
+        ResourceId
+    ),
+    ok.
+
+t_publish_connection_down(Config0) ->
+    Config = generate_config([{max_retries, 2} | Config0]),
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = ?config(resource_id, Config),
+    TelemetryTable = ?config(telemetry_table, Config),
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
+    ?retry(
+        _Sleep1 = 1_000,
+        _Attempts1 = 30,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    assert_empty_metrics(ResourceId),
+    ShardIt = get_shard_iterator(Config),
+    Payload = <<"payload">>,
+    Message = emqx_message:make(?TOPIC, Payload),
+    Kind =
+        case proplists:get_value(batch_size, Config) of
+            1 -> emqx_bridge_kinesis_impl_producer_sync_query;
+            _ -> emqx_bridge_kinesis_impl_producer_sync_batch_query
+        end,
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ct:sleep(1000),
+        ?wait_async_action(
+            emqx:publish(Message),
+            #{?snk_kind := Kind},
+            5_000
+        ),
+        ct:sleep(1000)
+    end),
+    % Wait for reconnection.
+    ?retry(
+        _Sleep3 = 1_000,
+        _Attempts3 = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    Record = wait_record(Config, ShardIt, 2000, 10),
+    %% to avoid test flakiness
+    wait_telemetry_event(TelemetryTable, retried_success, ResourceId),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
+    assert_metrics(
+        #{
+            dropped => 0,
+            failed => 0,
+            inflight => 0,
+            matched => 1,
+            queuing => 0,
+            retried => 1,
+            success => 1,
+            retried_success => 1
+        },
+        ResourceId
+    ),
+    Data = proplists:get_value(<<"Data">>, Record),
+    ?assertEqual(Payload, Data),
+    ok.
+
+t_wrong_server(Config) ->
+    Name = ?config(kinesis_name, Config),
+    ResourceId = ?config(resource_id, Config),
+    Overrides =
+        #{
+            <<"max_retries">> => 0,
+            <<"endpoint">> => <<"https://wrong_server:12345">>,
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"60s">>
+            }
+        },
+    ?wait_async_action(
+        create_bridge(Config, Overrides),
+        #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok},
+        30_000
+    ),
+    ?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)),
+    emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
+    emqx_bridge_resource:remove(?BRIDGE_TYPE, Name),
+    ok.

+ 1 - 1
apps/emqx_s3/rebar.config

@@ -1,6 +1,6 @@
 {deps, [
     {emqx, {path, "../../apps/emqx"}},
-    {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}},
+    {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}},
     {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 

+ 1 - 0
changes/ee/feat-11261.en.md

@@ -0,0 +1 @@
+Implemented Amazon Kinesis Data Streams producer data integration bridge .

+ 4 - 2
mix.exs

@@ -191,7 +191,8 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_ft,
       :emqx_s3,
       :emqx_schema_registry,
-      :emqx_enterprise
+      :emqx_enterprise,
+      :emqx_bridge_kinesis
     ])
   end
 
@@ -423,7 +424,8 @@ defmodule EMQXUmbrella.MixProject do
           emqx_schema_registry: :permanent,
           emqx_eviction_agent: :permanent,
           emqx_node_rebalance: :permanent,
-          emqx_ft: :permanent
+          emqx_ft: :permanent,
+          emqx_bridge_kinesis: :permanent
         ],
         else: [
           emqx_telemetry: :permanent

+ 3 - 1
rebar.config.erl

@@ -104,6 +104,7 @@ is_community_umbrella_app("apps/emqx_ft") -> false;
 is_community_umbrella_app("apps/emqx_s3") -> false;
 is_community_umbrella_app("apps/emqx_schema_registry") -> false;
 is_community_umbrella_app("apps/emqx_enterprise") -> false;
+is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;
 is_community_umbrella_app(_) -> true.
 
 is_jq_supported() ->
@@ -491,7 +492,8 @@ relx_apps_per_edition(ee) ->
         emqx_schema_registry,
         emqx_eviction_agent,
         emqx_node_rebalance,
-        emqx_ft
+        emqx_ft,
+        emqx_bridge_kinesis
     ];
 relx_apps_per_edition(ce) ->
     [emqx_telemetry].

+ 85 - 0
rel/i18n/emqx_bridge_kinesis.hocon

@@ -0,0 +1,85 @@
+emqx_bridge_kinesis {
+
+config_enable.desc:
+"""Enable or disable this bridge"""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+desc_config.desc:
+"""Configuration for an Amazon Kinesis bridge."""
+
+desc_config.label:
+"""Amazon Kinesis Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name."""
+
+desc_name.label:
+"""Bridge Name"""
+
+desc_type.desc:
+"""The Bridge Type"""
+
+desc_type.label:
+"""Bridge Type"""
+
+pool_size.desc:
+"""The pool size."""
+
+pool_size.label:
+"""Pool Size"""
+
+local_topic.desc:
+"""The MQTT topic filter to be forwarded to Amazon Kinesis. All MQTT `PUBLISH` messages with the topic
+matching the `local_topic` will be forwarded.</br>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also `local_topic` is
+configured, then both the data got from the rule and the MQTT messages that match `local_topic`
+will be forwarded."""
+
+local_topic.label:
+"""Local Topic"""
+
+payload_template.desc:
+"""The template for formatting the outgoing messages.  If undefined, will send all the available context in JSON format."""
+
+payload_template.label:
+"""Payload template"""
+
+aws_access_key_id.desc:
+"""Access Key ID for connecting to Amazon Kinesis."""
+
+aws_access_key_id.label:
+"""AWS Access Key ID"""
+
+aws_secret_access_key.desc:
+"""AWS Secret Access Key for connecting to Amazon Kinesis."""
+
+aws_secret_access_key.label:
+"""AWS Secret Access Key"""
+
+endpoint.desc:
+"""The url of Amazon Kinesis endpoint."""
+
+endpoint.label:
+"""Amazon Kinesis Endpoint"""
+
+stream_name.desc:
+"""The Amazon Kinesis Stream to publish messages to."""
+
+stream_name.label:
+"""Amazon Kinesis Stream"""
+
+partition_key.desc:
+"""The Amazon Kinesis Partition Key associated to published message. Placeholders in format of ${var} are supported."""
+
+partition_key.label:
+"""Partition key"""
+
+max_retries.desc:
+"""Max retry times if an error occurs when sending a request."""
+
+max_retries.label:
+"""Max Retries"""
+
+}

+ 3 - 0
scripts/ct/run.sh

@@ -219,6 +219,9 @@ for dep in ${CT_DEPS}; do
         hstreamdb)
             FILES+=( '.ci/docker-compose-file/docker-compose-hstreamdb.yaml' )
             ;;
+        kinesis)
+            FILES+=( '.ci/docker-compose-file/docker-compose-kinesis.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1