Explorar o código

feat: refactor DynamoDB bridge to connector and action

Fixes:
https://emqx.atlassian.net/browse/EMQX-11456
Kjell Winblad hai 1 ano
pai
achega
6561d989d6

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -88,6 +88,7 @@ hard_coded_action_info_modules_ee() ->
     [
         emqx_bridge_azure_event_hub_action_info,
         emqx_bridge_confluent_producer_action_info,
+        emqx_bridge_dynamo_action_info,
         emqx_bridge_gcp_pubsub_consumer_action_info,
         emqx_bridge_gcp_pubsub_producer_action_info,
         emqx_bridge_kafka_action_info,

+ 1 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src

@@ -8,7 +8,7 @@
         emqx_resource,
         erlcloud
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_dynamo_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 139 - 2
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl

@@ -11,7 +11,6 @@
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
 -export([
-    conn_bridge_examples/1,
     values/1
 ]).
 
@@ -22,6 +21,14 @@
     desc/1
 ]).
 
+-export([
+    bridge_v2_examples/1,
+    connector_examples/1,
+    conn_bridge_examples/1
+]).
+
+-define(CONNECTOR_TYPE, dynamo).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
 -define(DEFAULT_TEMPLATE, <<>>).
 
 %% -------------------------------------------------------------------------------------------------
@@ -59,12 +66,134 @@ values(_Method) ->
         }
     }.
 
+connector_examples(Method) ->
+    [
+        #{
+            <<"dynamo">> =>
+                #{
+                    summary => <<"DynamoDB Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_values()
+                    )
+                }
+        }
+    ].
+
+connector_values() ->
+    #{
+        <<"enable">> => true,
+        <<"url">> => <<"http://127.0.0.1:8000">>,
+        <<"aws_access_key_id">> => <<"root">>,
+        <<"aws_secret_access_key">> => <<"******">>,
+        <<"pool_size">> => 8,
+        <<"resource_opts">> =>
+            #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"start_timeout">> => <<"5s">>
+            }
+    }.
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"dynamo">> =>
+                #{
+                    summary => <<"DynamoDB Action">>,
+                    value => emqx_bridge_v2_schema:action_values(
+                        Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                    )
+                }
+        }
+    ].
+
+action_values() ->
+    #{
+        <<"parameters">> =>
+            #{
+                <<"table">> => <<"mqtt_msg">>,
+                <<"template">> => ?DEFAULT_TEMPLATE
+            }
+    }.
+
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions
 namespace() -> "bridge_dynamo".
 
 roots() -> [].
 
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    emqx_connector_schema:api_fields(
+        Field,
+        ?CONNECTOR_TYPE,
+        fields("config_connector") -- emqx_connector_schema:common_fields()
+    );
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(dynamo_action));
+fields(action) ->
+    {?ACTION_TYPE,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, dynamo_action)),
+            #{
+                desc => <<"DynamoDB Action Config">>,
+                required => false
+            }
+        )};
+fields(dynamo_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => ?DESC("action_parameters")
+            }
+        )
+    );
+fields(action_parameters) ->
+    Parameters =
+        [
+            {template,
+                mk(
+                    binary(),
+                    #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
+                )}
+        ] ++ emqx_bridge_dynamo_connector:fields(config),
+    lists:foldl(
+        fun(Key, Acc) ->
+            proplists:delete(Key, Acc)
+        end,
+        Parameters,
+        [
+            url,
+            aws_access_key_id,
+            aws_secret_access_key,
+            pool_size,
+            auto_reconnect
+        ]
+    );
+fields("config_connector") ->
+    Config =
+        emqx_connector_schema:common_fields() ++
+            emqx_bridge_dynamo_connector:fields(config) ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    lists:foldl(
+        fun(Key, Acc) ->
+            proplists:delete(Key, Acc)
+        end,
+        Config,
+        [
+            table
+        ]
+    );
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -102,9 +231,17 @@ fields("get") ->
 desc("config") ->
     ?DESC("desc_config");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
-    ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
+    ["Configuration for DynamoDB using `", string:to_upper(Method), "` method."];
 desc("creation_opts" = Name) ->
     emqx_resource_schema:desc(Name);
+desc("config_connector") ->
+    ?DESC("config_connector");
+desc(dynamo_action) ->
+    ?DESC("dynamo_action");
+desc(action_parameters) ->
+    ?DESC("action_parameters");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(_) ->
     undefined.
 

+ 22 - 0
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_action_info.erl

@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_dynamo_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+bridge_v1_type_name() -> dynamo.
+
+action_type_name() -> dynamo.
+
+connector_type_name() -> dynamo.
+
+schema_module() -> emqx_bridge_dynamo.

+ 107 - 21
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -21,7 +21,11 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
 -export([
@@ -70,7 +74,6 @@ on_start(
         url := Url,
         aws_access_key_id := AccessKeyID,
         aws_secret_access_key := SecretAccessKey,
-        table := Table,
         pool_size := PoolSize
     } = Config
 ) ->
@@ -95,12 +98,9 @@ on_start(
         }},
         {pool_size, PoolSize}
     ],
-
-    Templates = parse_template(Config),
     State = #{
         pool_name => InstanceId,
-        table => Table,
-        templates => Templates
+        installed_channels => #{}
     },
     case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
         ok ->
@@ -109,32 +109,83 @@ on_start(
             Error
     end.
 
+on_add_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId,
+    ChannelConfig
+) ->
+    {ok, ChannelState} = create_channel_state(ChannelConfig),
+    NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+create_channel_state(
+    #{parameters := Conf} = _ChannelConfig
+) ->
+    #{
+        table := Table
+    } = Conf,
+    Templates = parse_template_from_conf(Conf),
+    State = #{
+        table => Table,
+        templates => Templates
+    },
+    {ok, State}.
+
+on_remove_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId
+) ->
+    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_get_channel_status(
+    _ResId,
+    _ChannelId,
+    _State
+) ->
+    ?status_connected.
+
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
+
 on_stop(InstanceId, _State) ->
     ?SLOG(info, #{
         msg => "stopping_dynamo_connector",
         connector => InstanceId
     }),
+    ?tp(
+        dynamo_connector_on_stop,
+        #{instance_id => InstanceId}
+    ),
     emqx_resource_pool:stop(InstanceId).
 
 on_query(InstanceId, Query, State) ->
     do_query(InstanceId, Query, State).
 
 %% we only support batch insert
-on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) ->
+on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) ->
     do_query(InstanceId, Query, State);
 on_batch_query(_InstanceId, Query, _State) ->
     {error, {unrecoverable_error, {invalid_request, Query}}}.
 
-%% we only support batch insert
-
 on_get_status(_InstanceId, #{pool_name := Pool}) ->
     Health = emqx_resource_pool:health_check_workers(
         Pool, {emqx_bridge_dynamo_connector_client, is_connected, []}
     ),
     status_result(Health).
 
-status_result(_Status = true) -> connected;
-status_result(_Status = false) -> connecting.
+status_result(_Status = true) -> ?status_connected;
+status_result(_Status = false) -> ?status_connecting.
 
 %%========================================================================================
 %% Helper fns
@@ -143,29 +194,44 @@ status_result(_Status = false) -> connecting.
 do_query(
     InstanceId,
     Query,
-    #{pool_name := PoolName, templates := Templates, table := Table} = State
+    #{
+        pool_name := PoolName,
+        installed_channels := Channels
+    } = State
 ) ->
     ?TRACE(
         "QUERY",
         "dynamo_connector_received",
         #{connector => InstanceId, query => Query, state => State}
     ),
-    Result = ecpool:pick_and_do(
-        PoolName,
-        {emqx_bridge_dynamo_connector_client, query, [Table, Query, Templates]},
-        no_handover
-    ),
+    ChannelId = get_channel_id(Query),
+    QueryTuple = get_query_tuple(Query),
+    ChannelState = maps:get(ChannelId, Channels),
+    #{
+        table := Table,
+        templates := Templates
+    } = ChannelState,
+    Result =
+        ecpool:pick_and_do(
+            PoolName,
+            {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]},
+            no_handover
+        ),
 
     case Result of
         {error, Reason} ->
             ?tp(
                 dynamo_connector_query_return,
-                #{error => Reason}
+                #{
+                    error => Reason,
+                    instance_id => InstanceId
+                }
             ),
             ?SLOG(error, #{
                 msg => "dynamo_connector_do_query_failed",
                 connector => InstanceId,
-                query => Query,
+                channel => ChannelId,
+                query => QueryTuple,
                 reason => Reason
             }),
             case Reason of
@@ -177,16 +243,36 @@ do_query(
         _ ->
             ?tp(
                 dynamo_connector_query_return,
-                #{result => Result}
+                #{
+                    result => Result,
+                    instance_id => InstanceId
+                }
             ),
             Result
     end.
 
+get_channel_id([{ChannelId, _Req} | _]) ->
+    ChannelId;
+get_channel_id({ChannelId, _Req}) ->
+    ChannelId.
+
+get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
+    {QueryType, Data};
+get_query_tuple({_ChannelId, Data} = _Query) ->
+    {send_message, Data};
+get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
+    error(
+        {unrecoverable_error,
+            {invalid_request, <<"The only query type that support batching is insert.">>}}
+    );
+get_query_tuple([InsertQuery | _]) ->
+    get_query_tuple(InsertQuery).
+
 connect(Opts) ->
     Config = proplists:get_value(config, Opts),
     {ok, _Pid} = emqx_bridge_dynamo_connector_client:start_link(Config).
 
-parse_template(Config) ->
+parse_template_from_conf(Config) ->
     Templates =
         case maps:get(template, Config, undefined) of
             undefined -> #{};

+ 1 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl

@@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) ->
 %% 1. we can simply replace the `send_message` to `put`
 %% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`,
 %%    so we can reduce some list map cost
-apply_template([{send_message, _Msg} | _] = Msgs, Templates) ->
+apply_template([{_, _Msg} | _] = Msgs, Templates) ->
     lists:map(
         fun(Req) ->
             {_, Msg} = apply_template(Req, Templates),

+ 78 - 5
apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl

@@ -117,7 +117,11 @@ common_init(ConfigT) ->
         {host, Host},
         {port, Port},
         {query_mode, sync},
-        {proxy_name, "dynamo"}
+        {proxy_name, "dynamo"},
+        {bridge_type, <<"dynamo">>},
+        {bridge_name, <<"my_dynamo_action">>},
+        {connector_type, <<"dynamo">>},
+        {connector_name, <<"my_dynamo_connector">>}
         | ConfigT
     ],
 
@@ -143,6 +147,8 @@ common_init(ConfigT) ->
                     {dynamo_config, TDConf},
                     {dynamo_bridge_type, BridgeType},
                     {dynamo_name, Name},
+                    {bridge_config, action_config(Config0)},
+                    {connector_config, connector_config(Config0)},
                     {proxy_host, ProxyHost},
                     {proxy_port, ProxyPort}
                     | Config0
@@ -193,6 +199,48 @@ dynamo_config(BridgeType, Config) ->
         ),
     {Name, parse_and_check(ConfigString, BridgeType, Name)}.
 
+action_config(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    BatchSize = ?config(batch_size, Config),
+    QueryMode = ?config(query_mode, Config),
+    #{
+        <<"connector">> => ConnectorName,
+        <<"enable">> => true,
+        <<"parameters">> =>
+            #{
+                <<"table">> => ?TABLE
+            },
+        <<"resource_opts">> =>
+            #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"worker_pool_size">> => 16,
+                <<"query_mode">> => QueryMode,
+                <<"batch_size">> => BatchSize
+            }
+    }.
+
+connector_config(Config) ->
+    Host = ?config(host, Config),
+    Port = ?config(port, Config),
+    URL = list_to_binary("http://" ++ Host ++ ":" ++ integer_to_list(Port)),
+    SecretFile = ?config(dynamo_secretfile, Config),
+    AccessKey = "file://" ++ SecretFile,
+    #{
+        <<"url">> => URL,
+        <<"aws_access_key_id">> => ?ACCESS_KEY_ID,
+        <<"aws_secret_access_key">> => AccessKey,
+        <<"enable">> => true,
+        <<"pool_size">> => 8,
+        <<"resource_opts">> =>
+            #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"start_timeout">> => <<"5s">>
+            }
+    }.
+
 parse_and_check(ConfigString, BridgeType, Name) ->
     {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
     hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
@@ -234,8 +282,9 @@ send_message(Config, Payload) ->
 query_resource(Config, Request) ->
     Name = ?config(dynamo_name, Config),
     BridgeType = ?config(dynamo_bridge_type, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
+    ID = emqx_bridge_v2:id(BridgeType, Name),
+    ResID = emqx_connector_resource:resource_id(BridgeType, Name),
+    emqx_resource:query(ID, Request, #{timeout => 500, connector_resource_id => ResID}).
 
 %% create a table, use the apps/emqx_bridge_dynamo/priv/dynamo/mqtt_msg.json as template
 create_table(Config) ->
@@ -403,7 +452,10 @@ t_simple_query(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {get_item, {<<"id">>, <<"not_exists">>}},
+    BridgeType = ?config(dynamo_bridge_type, Config),
+    Name = ?config(dynamo_name, Config),
+    ActionID = emqx_bridge_v2:id(BridgeType, Name),
+    Request = {ActionID, {get_item, {<<"id">>, <<"not_exists">>}}},
     Result = query_resource(Config, Request),
     case ?config(batch_size, Config) of
         ?BATCH_SIZE ->
@@ -427,11 +479,32 @@ t_bad_parameter(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {insert_item, bad_parameter},
+    BridgeType = ?config(dynamo_bridge_type, Config),
+    Name = ?config(dynamo_name, Config),
+    ActionID = emqx_bridge_v2:id(BridgeType, Name),
+    Request = {ActionID, {insert_item, bad_parameter}},
     Result = query_resource(Config, Request),
     ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result),
     ok.
 
+%% Connector Action Tests
+
+t_action_on_get_status(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
+
+t_action_create_via_http(Config) ->
+    emqx_bridge_v2_testlib:t_create_via_http(Config).
+
+t_action_sync_query(Config) ->
+    MakeMessageFun = fun() -> #{id => <<"the_message_id">>, payload => ?PAYLOAD} end,
+    IsSuccessCheck = fun(Result) -> ?assertEqual({ok, []}, Result) end,
+    TracePoint = dynamo_connector_query_return,
+    emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint).
+
+t_action_start_stop(Config) ->
+    StopTracePoint = dynamo_connector_on_stop,
+    emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint).
+
 to_bin(List) when is_list(List) ->
     unicode:characters_to_binary(List, utf8);
 to_bin(Bin) when is_binary(Bin) ->

+ 14 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -26,6 +26,8 @@ resource_type(azure_event_hub_producer) ->
     emqx_bridge_kafka_impl_producer;
 resource_type(confluent_producer) ->
     emqx_bridge_kafka_impl_producer;
+resource_type(dynamo) ->
+    emqx_bridge_dynamo_connector;
 resource_type(gcp_pubsub_consumer) ->
     emqx_bridge_gcp_pubsub_impl_consumer;
 resource_type(gcp_pubsub_producer) ->
@@ -122,6 +124,14 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {dynamo,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_dynamo, "config_connector")),
+                #{
+                    desc => <<"DynamoDB Connector Config">>,
+                    required => false
+                }
+            )},
         {gcp_pubsub_consumer,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_consumer_schema, "config_connector")),
@@ -329,6 +339,7 @@ schema_modules() ->
     [
         emqx_bridge_azure_event_hub,
         emqx_bridge_confluent_producer,
+        emqx_bridge_dynamo,
         emqx_bridge_gcp_pubsub_consumer_schema,
         emqx_bridge_gcp_pubsub_producer_schema,
         emqx_bridge_hstreamdb,
@@ -366,6 +377,9 @@ api_schemas(Method) ->
         api_ref(
             emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
         ),
+        api_ref(
+            emqx_bridge_dynamo, <<"dynamo">>, Method ++ "_connector"
+        ),
         api_ref(
             emqx_bridge_gcp_pubsub_consumer_schema,
             <<"gcp_pubsub_consumer">>,

+ 2 - 0
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -126,6 +126,8 @@ connector_type_to_bridge_types(azure_event_hub_producer) ->
     [azure_event_hub_producer];
 connector_type_to_bridge_types(confluent_producer) ->
     [confluent_producer];
+connector_type_to_bridge_types(dynamo) ->
+    [dynamo];
 connector_type_to_bridge_types(gcp_pubsub_consumer) ->
     [gcp_pubsub_consumer];
 connector_type_to_bridge_types(gcp_pubsub_producer) ->

+ 18 - 0
rel/i18n/emqx_bridge_dynamo.hocon

@@ -42,4 +42,22 @@ The template can be any valid JSON with placeholders and make sure all keys for
 template.label:
 """Template"""
 
+action_parameters.desc:
+"""Action specific configuration."""
+
+action_parameters.label:
+"""Action"""
+
+dynamo_action.desc:
+"""Configuration for DynamoDB action."""
+
+dynamo_action.label:
+"""DynamoDB Action Configuration"""
+
+config_connector.desc:
+"""Configuration for an DynamoDB connector."""
+
+config_connector.label:
+"""DynamoDB Connector Configuration"""
+
 }