Sfoglia il codice sorgente

feat: refactor kinesis bridge to connector and action

Fixes:
https://emqx.atlassian.net/browse/EMQX-11461
Kjell Winblad 2 anni fa
parent
commit
fa842736d2

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -89,6 +89,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_confluent_producer_action_info,
         emqx_bridge_gcp_pubsub_producer_action_info,
         emqx_bridge_kafka_action_info,
+        emqx_bridge_kinesis_action_info,
         emqx_bridge_matrix_action_info,
         emqx_bridge_mongodb_action_info,
         emqx_bridge_influxdb_action_info,

+ 2 - 2
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src

@@ -1,13 +1,13 @@
 {application, emqx_bridge_kinesis, [
     {description, "EMQX Enterprise Amazon Kinesis Bridge"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [
         kernel,
         stdlib,
         erlcloud
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_kinesis_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 161 - 2
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl

@@ -15,7 +15,9 @@
 ]).
 
 -export([
-    conn_bridge_examples/1
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
 ]).
 
 %%-------------------------------------------------------------------------------------------------
@@ -28,6 +30,37 @@ namespace() ->
 roots() ->
     [].
 
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    emqx_connector_schema:api_fields(
+        Field,
+        kinesis,
+        connector_config_fields()
+    );
+fields(action) ->
+    {kinesis,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, kinesis_action)),
+            #{
+                desc => <<"Kinesis Action Config">>,
+                required => false
+            }
+        )};
+fields(action_parameters) ->
+    fields(producer);
+fields(kinesis_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => ?DESC("action_parameters")
+            }
+        )
+    );
 fields("config_producer") ->
     emqx_bridge_schema:common_bridge_fields() ++
         fields("resource_opts") ++
@@ -134,12 +167,38 @@ fields("get_producer") ->
 fields("post_producer") ->
     [type_field_producer(), name_field() | fields("config_producer")];
 fields("put_producer") ->
-    fields("config_producer").
+    fields("config_producer");
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        connector_config_fields() ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields("put_bridge_v2") ->
+    fields(kinesis_action);
+fields("get_bridge_v2") ->
+    fields(kinesis_action);
+fields("post_bridge_v2") ->
+    fields("post", kinesis, kinesis_action).
+
+fields("post", Type, StructName) ->
+    [type_field(Type), name_field() | fields(StructName)].
+
+type_field(Type) ->
+    {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
 
 desc("config_producer") ->
     ?DESC("desc_config");
 desc("creation_opts") ->
     ?DESC(emqx_resource_schema, "creation_opts");
+desc("config_connector") ->
+    ?DESC("config_connector");
+desc(kinesis_action) ->
+    ?DESC("kinesis_action");
+desc(action_parameters) ->
+    ?DESC("action_parameters");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(_) ->
     undefined.
 
@@ -153,6 +212,103 @@ conn_bridge_examples(Method) ->
         }
     ].
 
+connector_examples(Method) ->
+    [
+        #{
+            <<"kinesis">> => #{
+                summary => <<"Kinesis Connector">>,
+                value => values({Method, connector})
+            }
+        }
+    ].
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"kinesis">> => #{
+                summary => <<"Kinesis Action">>,
+                value => values({Method, bridge_v2_producer})
+            }
+        }
+    ].
+
+values({get, connector}) ->
+    maps:merge(
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ],
+            actions => [<<"my_action">>]
+        },
+        values({post, connector})
+    );
+values({get, Type}) ->
+    maps:merge(
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        },
+        values({post, Type})
+    );
+values({post, connector}) ->
+    maps:merge(
+        #{
+            name => <<"my_kinesis_connector">>,
+            type => <<"kinesis">>
+        },
+        values(common_config)
+    );
+values({post, Type}) ->
+    maps:merge(
+        #{
+            name => <<"my_kinesis_action">>,
+            type => <<"kinesis">>
+        },
+        values({put, Type})
+    );
+values({put, bridge_v2_producer}) ->
+    values(bridge_v2_producer);
+values({put, connector}) ->
+    values(common_config);
+values({put, Type}) ->
+    maps:merge(values(common_config), values(Type));
+values(bridge_v2_producer) ->
+    #{
+        enable => true,
+        connector => <<"my_kinesis_connector">>,
+        parameters => values(producer_values),
+        resource_opts => #{
+            <<"batch_size">> => 100,
+            <<"inflight_window">> => 100,
+            <<"max_buffer_bytes">> => <<"256MB">>,
+            <<"request_ttl">> => <<"45s">>
+        }
+    };
+values(common_config) ->
+    #{
+        <<"enable">> => true,
+        <<"aws_access_key_id">> => <<"your_access_key">>,
+        <<"aws_secret_access_key">> => <<"aws_secret_key">>,
+        <<"endpoint">> => <<"http://localhost:4566">>,
+        <<"max_retries">> => 2,
+        <<"pool_size">> => 8
+    };
+values(producer_values) ->
+    #{
+        <<"partition_key">> => <<"any_key">>,
+        <<"payload_template">> => <<"${.}">>,
+        <<"stream_name">> => <<"my_stream">>
+    }.
+
 values(producer, _Method) ->
     #{
         aws_access_key_id => <<"aws_access_key_id">>,
@@ -174,6 +330,9 @@ values(producer, _Method) ->
 %% Helper fns
 %%-------------------------------------------------------------------------------------------------
 
+connector_config_fields() ->
+    fields(connector_config).
+
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 
 mk(Type, Meta) -> hoconsc:mk(Type, Meta).

+ 22 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_action_info.erl

@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+bridge_v1_type_name() -> kinesis_producer.
+
+action_type_name() -> kinesis.
+
+connector_type_name() -> kinesis.
+
+schema_module() -> emqx_bridge_kinesis.

+ 33 - 18
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl

@@ -11,9 +11,7 @@
 -behaviour(gen_server).
 
 -type state() :: #{
-    instance_id := resource_id(),
-    partition_key := binary(),
-    stream_name := binary()
+    instance_id := resource_id()
 }.
 -type record() :: {Data :: binary(), PartitionKey :: binary()}.
 
@@ -23,7 +21,8 @@
 -export([
     start_link/1,
     connection_status/1,
-    query/2
+    connection_status/2,
+    query/3
 ]).
 
 %% gen_server callbacks
@@ -56,8 +55,16 @@ connection_status(Pid) ->
             {error, timeout}
     end.
 
-query(Pid, Records) ->
-    gen_server:call(Pid, {query, Records}, infinity).
+connection_status(Pid, StreamName) ->
+    try
+        gen_server:call(Pid, {connection_status, StreamName}, ?HEALTH_CHECK_TIMEOUT)
+    catch
+        _:_ ->
+            {error, timeout}
+    end.
+
+query(Pid, Records, StreamName) ->
+    gen_server:call(Pid, {query, Records, StreamName}, infinity).
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -72,13 +79,12 @@ start_link(Options) ->
 %%%===================================================================
 
 %% Initialize kinesis connector
--spec init(emqx_bridge_kinesis_impl_producer:config()) -> {ok, state()}.
+-spec init(emqx_bridge_kinesis_impl_producer:config_connector()) ->
+    {ok, state()} | {stop, Reason :: term()}.
 init(#{
     aws_access_key_id := AwsAccessKey,
     aws_secret_access_key := AwsSecretAccessKey,
     endpoint := Endpoint,
-    partition_key := PartitionKey,
-    stream_name := StreamName,
     max_retries := MaxRetries,
     instance_id := InstanceId
 }) ->
@@ -93,9 +99,7 @@ init(#{
             }
         ),
     State = #{
-        instance_id => InstanceId,
-        partition_key => PartitionKey,
-        stream_name => StreamName
+        instance_id => InstanceId
     },
     %% TODO: teach `erlcloud` to to accept 0-arity closures as passwords.
     ok = erlcloud_config:configure(
@@ -124,18 +128,19 @@ init(#{
             {stop, Reason}
     end.
 
-handle_call(connection_status, _From, #{stream_name := StreamName} = State) ->
+handle_call({connection_status, StreamName}, _From, State) ->
+    Status = get_status(StreamName),
+    {reply, Status, State};
+handle_call(connection_status, _From, State) ->
     Status =
-        case erlcloud_kinesis:describe_stream(StreamName) of
-            {ok, _} ->
+        case erlcloud_kinesis:list_streams() of
+            {ok, _ListStreamsResult} ->
                 {ok, connected};
-            {error, {<<"ResourceNotFoundException">>, _}} ->
-                {error, unhealthy_target};
             Error ->
                 {error, Error}
         end,
     {reply, Status, State};
-handle_call({query, Records}, _From, #{stream_name := StreamName} = State) ->
+handle_call({query, Records, StreamName}, _From, State) ->
     Result = do_query(StreamName, Records),
     {reply, Result, State};
 handle_call(_Request, _From, State) ->
@@ -158,6 +163,16 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+get_status(StreamName) ->
+    case erlcloud_kinesis:describe_stream(StreamName) of
+        {ok, _} ->
+            {ok, connected};
+        {error, {<<"ResourceNotFoundException">>, _}} ->
+            {error, unhealthy_target};
+        Error ->
+            {error, Error}
+    end.
+
 -spec do_query(binary(), [record()]) ->
     {ok, jsx:json_term() | binary()}
     | {error, {unrecoverable_error, term()}}

+ 111 - 26
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl

@@ -13,27 +13,20 @@
     "Kinesis stream is invalid. Please check if the stream exist in Kinesis account."
 ).
 
--type config() :: #{
+-type config_connector() :: #{
     aws_access_key_id := binary(),
     aws_secret_access_key := emqx_secret:t(binary()),
     endpoint := binary(),
-    stream_name := binary(),
-    partition_key := binary(),
-    payload_template := binary(),
     max_retries := non_neg_integer(),
     pool_size := non_neg_integer(),
     instance_id => resource_id(),
     any() => term()
 }.
--type templates() :: #{
-    partition_key := list(),
-    send_message := list()
-}.
 -type state() :: #{
     pool_name := resource_id(),
-    templates := templates()
+    installed_channels := map()
 }.
--export_type([config/0]).
+-export_type([config_connector/0]).
 
 %% `emqx_resource' API
 -export([
@@ -42,7 +35,11 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
 -export([
@@ -55,7 +52,7 @@
 
 callback_mode() -> always_sync.
 
--spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+-spec on_start(resource_id(), config_connector()) -> {ok, state()} | {error, term()}.
 on_start(
     InstanceId,
     #{
@@ -72,10 +69,9 @@ on_start(
         {config, Config},
         {pool_size, PoolSize}
     ],
-    Templates = parse_template(Config),
     State = #{
         pool_name => InstanceId,
-        templates => Templates
+        installed_channels => #{}
     },
 
     case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
@@ -123,31 +119,111 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
             disconnected
     end.
 
+on_add_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId,
+    ChannelConfig
+) ->
+    {ok, ChannelState} = create_channel_state(ChannelConfig),
+    NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+create_channel_state(
+    #{parameters := Parameters} = _ChannelConfig
+) ->
+    #{
+        stream_name := StreamName,
+        partition_key := PartitionKey
+    } = Parameters,
+    {ok, #{
+        templates => parse_template(Parameters),
+        stream_name => StreamName,
+        partition_key => PartitionKey
+    }}.
+
+on_remove_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId
+) ->
+    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_get_channel_status(
+    _ResId,
+    ChannelId,
+    #{
+        pool_name := PoolName,
+        installed_channels := Channels
+    } = State
+) ->
+    #{stream_name := StreamName} = maps:get(ChannelId, Channels),
+    case
+        emqx_resource_pool:health_check_workers(
+            PoolName,
+            {emqx_bridge_kinesis_connector_client, connection_status, [StreamName]},
+            ?HEALTH_CHECK_TIMEOUT,
+            #{return_values => true}
+        )
+    of
+        {ok, Values} ->
+            AllOk = lists:all(fun(S) -> S =:= {ok, connected} end, Values),
+            case AllOk of
+                true ->
+                    connected;
+                false ->
+                    Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values),
+                    case Unhealthy of
+                        true -> {disconnected, {unhealthy_target, ?TOPIC_MESSAGE}};
+                        false -> disconnected
+                    end
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "kinesis_producer_get_status_failed",
+                state => State,
+                reason => Reason
+            }),
+            disconnected
+    end.
+
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
+
 -spec on_query(
     resource_id(),
-    {send_message, map()},
+    {channel_id(), map()},
     state()
 ) ->
     {ok, map()}
     | {error, {recoverable_error, term()}}
     | {error, term()}.
-on_query(ResourceId, {send_message, Message}, State) ->
-    Requests = [{send_message, Message}],
+on_query(ResourceId, {ChannelId, Message}, State) ->
+    Requests = [{ChannelId, Message}],
     ?tp(emqx_bridge_kinesis_impl_producer_sync_query, #{message => Message}),
-    do_send_requests_sync(ResourceId, Requests, State).
+    do_send_requests_sync(ResourceId, Requests, State, ChannelId).
 
 -spec on_batch_query(
     resource_id(),
-    [{send_message, map()}],
+    [{channel_id(), map()}],
     state()
 ) ->
     {ok, map()}
     | {error, {recoverable_error, term()}}
     | {error, term()}.
 %% we only support batch insert
-on_batch_query(ResourceId, [{send_message, _} | _] = Requests, State) ->
+on_batch_query(ResourceId, [{ChannelId, _} | _] = Requests, State) ->
     ?tp(emqx_bridge_kinesis_impl_producer_sync_batch_query, #{requests => Requests}),
-    do_send_requests_sync(ResourceId, Requests, State).
+    do_send_requests_sync(ResourceId, Requests, State, ChannelId).
 
 connect(Opts) ->
     Options = proplists:get_value(config, Opts),
@@ -159,8 +235,9 @@ connect(Opts) ->
 
 -spec do_send_requests_sync(
     resource_id(),
-    [{send_message, map()}],
-    state()
+    [{channel_id(), map()}],
+    state(),
+    channel_id()
 ) ->
     {ok, jsx:json_term() | binary()}
     | {error, {recoverable_error, term()}}
@@ -171,12 +248,20 @@ connect(Opts) ->
 do_send_requests_sync(
     InstanceId,
     Requests,
-    #{pool_name := PoolName, templates := Templates}
+    #{
+        pool_name := PoolName,
+        installed_channels := InstalledChannels
+    } = _State,
+    ChannelId
 ) ->
+    #{
+        templates := Templates,
+        stream_name := StreamName
+    } = maps:get(ChannelId, InstalledChannels),
     Records = render_records(Requests, Templates),
     Result = ecpool:pick_and_do(
         PoolName,
-        {emqx_bridge_kinesis_connector_client, query, [Records]},
+        {emqx_bridge_kinesis_connector_client, query, [Records, StreamName]},
         no_handover
     ),
     handle_result(Result, Requests, InstanceId).
@@ -239,7 +324,7 @@ render_records(Items, Templates) ->
 render_messages([], _Templates, RenderedMsgs) ->
     RenderedMsgs;
 render_messages(
-    [{send_message, Msg} | Others],
+    [{_, Msg} | Others],
     {MsgTemplate, PartitionKeyTemplate} = Templates,
     RenderedMsgs
 ) ->

+ 75 - 33
apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl

@@ -13,6 +13,7 @@
 
 -define(BRIDGE_TYPE, kinesis_producer).
 -define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>).
+-define(BRIDGE_V2_TYPE_BIN, <<"kinesis">>).
 -define(KINESIS_PORT, 4566).
 -define(KINESIS_ACCESS_KEY, "aws_access_key_id").
 -define(KINESIS_SECRET_KEY, "aws_secret_access_key").
@@ -48,7 +49,7 @@ init_per_suite(Config) ->
     [
         {proxy_host, ProxyHost},
         {proxy_port, ProxyPort},
-        {kinesis_port, ?KINESIS_PORT},
+        {kinesis_port, list_to_integer(os:getenv("KINESIS_PORT", integer_to_list(?KINESIS_PORT)))},
         {kinesis_secretfile, SecretFile},
         {proxy_name, ProxyName}
         | Config
@@ -116,7 +117,7 @@ generate_config(Config0) ->
             }
         ),
     ErlcloudConfig = erlcloud_kinesis:new("access_key", "secret", Host, Port, Scheme ++ "://"),
-    ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name),
+    ResourceId = connector_resource_id(?BRIDGE_V2_TYPE_BIN, Name),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name),
     [
         {kinesis_name, Name},
@@ -129,6 +130,9 @@ generate_config(Config0) ->
         | Config0
     ].
 
+connector_resource_id(BridgeType, Name) ->
+    <<"connector:", BridgeType/binary, ":", Name/binary>>.
+
 kinesis_config(Config) ->
     QueryMode = proplists:get_value(query_mode, Config, async),
     Scheme = proplists:get_value(connection_scheme, Config, "http"),
@@ -505,7 +509,7 @@ t_start_failed_then_fix(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
-    ResourceId = ?config(resource_id, Config),
+    Name = ?config(kinesis_name, Config),
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         ct:sleep(1000),
         ?wait_async_action(
@@ -517,7 +521,7 @@ t_start_failed_then_fix(Config) ->
     ?retry(
         _Sleep1 = 1_000,
         _Attempts1 = 30,
-        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+        ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name))
     ),
     ok.
 
@@ -538,40 +542,58 @@ t_stop(Config) ->
     ok.
 
 t_get_status_ok(Config) ->
-    ResourceId = ?config(resource_id, Config),
+    Name = ?config(kinesis_name, Config),
     {ok, _} = create_bridge(Config),
-    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+    ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)),
     ok.
 
 t_create_unhealthy(Config) ->
     delete_stream(Config),
-    ResourceId = ?config(resource_id, Config),
+    Name = ?config(kinesis_name, Config),
     {ok, _} = create_bridge(Config),
-    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
     ?assertMatch(
-        {ok, _, #{error := {unhealthy_target, _}}},
-        emqx_resource_manager:lookup_cached(ResourceId)
+        #{
+            status := disconnected,
+            error := {unhealthy_target, _}
+        },
+        emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)
     ),
     ok.
 
 t_get_status_unhealthy(Config) ->
-    delete_stream(Config),
-    ResourceId = ?config(resource_id, Config),
+    Name = ?config(kinesis_name, Config),
     {ok, _} = create_bridge(Config),
-    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
     ?assertMatch(
-        {ok, _, #{error := {unhealthy_target, _}}},
-        emqx_resource_manager:lookup_cached(ResourceId)
+        #{
+            status := connected
+        },
+        emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)
+    ),
+    delete_stream(Config),
+    ?retry(
+        100,
+        100,
+        fun() ->
+            ?assertMatch(
+                #{
+                    status := disconnected,
+                    error := {unhealthy_target, _}
+                },
+                emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)
+            )
+        end
     ),
     ok.
 
 t_publish_success(Config) ->
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    Name = ?config(kinesis_name, Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
+    assert_empty_metrics(ActionId),
     ShardIt = get_shard_iterator(Config),
     Payload = <<"payload">>,
     Message = emqx_message:make(?TOPIC, Payload),
@@ -590,7 +612,7 @@ t_publish_success(Config) ->
             retried => 0,
             success => 1
         },
-        ResourceId
+        ActionId
     ),
     Record = wait_record(Config, ShardIt, 100, 10),
     ?assertEqual(Payload, proplists:get_value(<<"Data">>, Record)),
@@ -599,6 +621,7 @@ t_publish_success(Config) ->
 t_publish_success_with_template(Config) ->
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    Name = ?config(kinesis_name, Config),
     Overrides =
         #{
             <<"payload_template">> => <<"${payload.data}">>,
@@ -607,7 +630,8 @@ t_publish_success_with_template(Config) ->
     ?assertMatch({ok, _}, create_bridge(Config, Overrides)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
+    assert_empty_metrics(ActionId),
     ShardIt = get_shard_iterator(Config),
     Payload = <<"{\"key\":\"my_key\", \"data\":\"my_data\"}">>,
     Message = emqx_message:make(?TOPIC, Payload),
@@ -626,7 +650,7 @@ t_publish_success_with_template(Config) ->
             retried => 0,
             success => 1
         },
-        ResourceId
+        ActionId
     ),
     Record = wait_record(Config, ShardIt, 100, 10),
     ?assertEqual(<<"my_data">>, proplists:get_value(<<"Data">>, Record)),
@@ -635,10 +659,12 @@ t_publish_success_with_template(Config) ->
 t_publish_multiple_msgs_success(Config) ->
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    Name = ?config(kinesis_name, Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
+    assert_empty_metrics(ActionId),
     ShardIt = get_shard_iterator(Config),
     lists:foreach(
         fun(I) ->
@@ -675,17 +701,19 @@ t_publish_multiple_msgs_success(Config) ->
             retried => 0,
             success => 10
         },
-        ResourceId
+        ActionId
     ),
     ok.
 
 t_publish_unhealthy(Config) ->
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    Name = ?config(kinesis_name, Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
+    assert_empty_metrics(ActionId),
     ShardIt = get_shard_iterator(Config),
     Payload = <<"payload">>,
     Message = emqx_message:make(?TOPIC, Payload),
@@ -709,22 +737,26 @@ t_publish_unhealthy(Config) ->
             retried => 0,
             success => 0
         },
-        ResourceId
+        ActionId
     ),
-    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
     ?assertMatch(
-        {ok, _, #{error := {unhealthy_target, _}}},
-        emqx_resource_manager:lookup_cached(ResourceId)
+        #{
+            status := disconnected,
+            error := {unhealthy_target, _}
+        },
+        emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)
     ),
     ok.
 
 t_publish_big_msg(Config) ->
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    Name = ?config(kinesis_name, Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
+    assert_empty_metrics(ActionId),
     % Maximum size is 1MB. Using 1MB + 1 here.
     Payload = binary:copy(<<"a">>, 1 * 1024 * 1024 + 1),
     Message = emqx_message:make(?TOPIC, Payload),
@@ -743,7 +775,7 @@ t_publish_big_msg(Config) ->
             retried => 0,
             success => 0
         },
-        ResourceId
+        ActionId
     ),
     ok.
 
@@ -754,15 +786,20 @@ t_publish_connection_down(Config0) ->
     ProxyName = ?config(proxy_name, Config),
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
+    Name = ?config(kinesis_name, Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     ?retry(
         _Sleep1 = 1_000,
         _Attempts1 = 30,
-        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+        ?assertMatch(
+            #{status := connected},
+            emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)
+        )
     ),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
-    assert_empty_metrics(ResourceId),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
+    assert_empty_metrics(ActionId),
     ShardIt = get_shard_iterator(Config),
     Payload = <<"payload">>,
     Message = emqx_message:make(?TOPIC, Payload),
@@ -784,7 +821,10 @@ t_publish_connection_down(Config0) ->
     ?retry(
         _Sleep3 = 1_000,
         _Attempts3 = 20,
-        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+        ?assertMatch(
+            #{status := connected},
+            emqx_bridge_v2:health_check(?BRIDGE_V2_TYPE_BIN, Name)
+        )
     ),
     Record = wait_record(Config, ShardIt, 2000, 10),
     %% to avoid test flakiness
@@ -802,7 +842,7 @@ t_publish_connection_down(Config0) ->
             success => 1,
             retried_success => 1
         },
-        ResourceId
+        ActionId
     ),
     Data = proplists:get_value(<<"Data">>, Record),
     ?assertEqual(Payload, Data),
@@ -880,9 +920,11 @@ t_empty_payload_template(Config) ->
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
     Removes = [<<"payload_template">>],
+    Name = ?config(kinesis_name, Config),
     ?assertMatch({ok, _}, create_bridge(Config, #{}, Removes)),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+    ActionId = emqx_bridge_v2:id(?BRIDGE_V2_TYPE_BIN, Name),
     assert_empty_metrics(ResourceId),
     ShardIt = get_shard_iterator(Config),
     Payload = <<"payload">>,
@@ -902,7 +944,7 @@ t_empty_payload_template(Config) ->
             retried => 0,
             success => 1
         },
-        ResourceId
+        ActionId
     ),
     Record = wait_record(Config, ShardIt, 100, 10),
     Data = proplists:get_value(<<"Data">>, Record),

+ 12 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -30,6 +30,8 @@ resource_type(gcp_pubsub_producer) ->
     emqx_bridge_gcp_pubsub_impl_producer;
 resource_type(kafka_producer) ->
     emqx_bridge_kafka_impl_producer;
+resource_type(kinesis) ->
+    emqx_bridge_kinesis_impl_producer;
 resource_type(matrix) ->
     emqx_postgresql;
 resource_type(mongodb) ->
@@ -112,6 +114,14 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {kinesis,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_kinesis, "config_connector")),
+                #{
+                    desc => <<"Kinesis Connector Config">>,
+                    required => false
+                }
+            )},
         {matrix,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
@@ -224,6 +234,7 @@ schema_modules() ->
         emqx_bridge_confluent_producer,
         emqx_bridge_gcp_pubsub_producer_schema,
         emqx_bridge_kafka,
+        emqx_bridge_kinesis,
         emqx_bridge_matrix,
         emqx_bridge_mongodb,
         emqx_bridge_influxdb,
@@ -255,6 +266,7 @@ api_schemas(Method) ->
             Method ++ "_connector"
         ),
         api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
         api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),

+ 2 - 0
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -131,6 +131,8 @@ connector_type_to_bridge_types(gcp_pubsub_producer) ->
     [gcp_pubsub, gcp_pubsub_producer];
 connector_type_to_bridge_types(kafka_producer) ->
     [kafka, kafka_producer];
+connector_type_to_bridge_types(kinesis) ->
+    [kinesis, kinesis_producer];
 connector_type_to_bridge_types(matrix) ->
     [matrix];
 connector_type_to_bridge_types(mongodb) ->

+ 5 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -1247,6 +1247,11 @@ channel_status({?status_connecting, Error}) ->
         status => ?status_connecting,
         error => Error
     };
+channel_status({?status_disconnected, Error}) ->
+    #{
+        status => ?status_disconnected,
+        error => Error
+    };
 channel_status(?status_disconnected) ->
     #{
         status => ?status_disconnected,

+ 19 - 0
rel/i18n/emqx_bridge_kinesis.hocon

@@ -82,4 +82,23 @@ max_retries.desc:
 max_retries.label:
 """Max Retries"""
 
+action_parameters.desc:
+"""Action specific configuration."""
+
+action_parameters.label:
+"""Action"""
+
+kinesis_action.desc:
+"""Configuration for Kinesis Action"""
+
+kinesis_action.label:
+"""Kinesis Action Configuration"""
+
+
+config_connector.desc:
+"""Configuration for a Kinesis Client."""
+
+config_connector.label:
+"""Kinesis Client Configuration"""
+
 }