Просмотр исходного кода

Merge pull request #14414 from terry-xiaoyu/tablestore-integration-2

Tablestore integration
Xinyu Liu 1 год назад
Родитель
Сommit
9ea899efd2

+ 19 - 0
apps/emqx_bridge_tablestore/.gitignore

@@ -0,0 +1,19 @@
+.rebar3
+_*
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+log
+erl_crash.dump
+.rebar
+logs
+_build
+.idea
+*.iml
+rebar3.crashdump
+*~

+ 94 - 0
apps/emqx_bridge_tablestore/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2028-01-26
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 24 - 0
apps/emqx_bridge_tablestore/README.md

@@ -0,0 +1,24 @@
+# EMQX Tablestore Bridge
+
+The `emqx_bridge_tablestore` is a bridge for EMQX to integrate with the [Aliyun Tablestore](https://cn.aliyun.com/product/ots).
+
+# Documentation
+
+- Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
+  for the EMQX rules engine introduction.
+
+# HTTP APIs
+
+- Several APIs are provided for bridge management, which includes create bridge,
+  update bridge, get bridge, stop or restart bridge and list bridges etc.
+
+  Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges)
+  for more detailed information.
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 32 - 0
apps/emqx_bridge_tablestore/mix.exs

@@ -0,0 +1,32 @@
+defmodule EMQXBridgeTablestore.MixProject do
+  use Mix.Project
+  alias EMQXUmbrella.MixProject, as: UMP
+
+  def project do
+    [
+      app: :emqx_bridge_tablestore,
+      version: "0.1.0",
+      build_path: "../../_build",
+      erlc_options: UMP.erlc_options(),
+      erlc_paths: UMP.erlc_paths(),
+      deps_path: "../../deps",
+      lockfile: "../../mix.lock",
+      elixir: "~> 1.14",
+      start_permanent: Mix.env() == :prod,
+      deps: deps()
+    ]
+  end
+
+  def application do
+    [extra_applications: UMP.extra_applications()]
+  end
+
+  def deps() do
+    [
+      UMP.common_dep(:tablestore),
+      {:emqx_connector, in_umbrella: true, runtime: false},
+      {:emqx_resource, in_umbrella: true},
+      {:emqx_bridge, in_umbrella: true, runtime: false}
+    ]
+  end
+end

+ 7 - 0
apps/emqx_bridge_tablestore/rebar.config

@@ -0,0 +1,7 @@
+{erl_opts, [debug_info]}.
+{deps, [
+    {ots_erl, {git, "https://github.com/emqx/ots_erl.git", {tag, "0.2.2"}}},
+    {emqx_connector, {path, "../../apps/emqx_connector"}},
+    {emqx_resource, {path, "../../apps/emqx_resource"}},
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+]}.

+ 16 - 0
apps/emqx_bridge_tablestore/src/emqx_bridge_tablestore.app.src

@@ -0,0 +1,16 @@
+{application, emqx_bridge_tablestore, [
+    {description, "EMQX Enterprise Tablestore Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        ots_erl
+    ]},
+    {env, [
+        {emqx_action_info_modules, [emqx_bridge_tablestore_action_info]},
+        {emqx_connector_info_modules, [emqx_bridge_tablestore_connector_info]}
+    ]},
+    {modules, []},
+    {links, []}
+]}.

+ 317 - 0
apps/emqx_bridge_tablestore/src/emqx_bridge_tablestore.erl

@@ -0,0 +1,317 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_tablestore).
+
+-behaviour(emqx_connector_examples).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%% Examples
+-export([
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
+]).
+
+-define(CONNECTOR_TYPE, tablestore).
+-define(ACTION_TYPE, tablestore).
+
+%% Examples
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"tablestore_timeseries">> => #{
+                summary => <<"Tablestore Timeseries Bridge">>,
+                value => values(timeseries, Method)
+            }
+        }
+    ].
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"tablestore_timeseries">> => #{
+                summary => <<"Tablestore Timeseries Action">>,
+                value => emqx_bridge_v2_schema:action_values(
+                    Method, tablestore, tablestore, action_parameters_example(timeseries)
+                )
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"tablestore_timeseries">> => #{
+                summary => <<"Tablestore Timeseries Connector">>,
+                value => emqx_connector_schema:connector_values(
+                    Method, tablestore, connector_values(timeseries)
+                )
+            }
+        }
+    ].
+
+connector_values(timeseries) ->
+    #{
+        name => <<"tablestore_connector">>,
+        enable => true,
+        endpoint => <<"https://myinstance.cn-hangzhou.ots.aliyuncs.com">>,
+        storage_model_type => timeseries,
+        instance_name => <<"myinstance">>,
+        access_key_id => <<"******">>,
+        access_key_secret => <<"******">>
+    }.
+
+action_parameters_example(timeseries) ->
+    #{
+        parameters => #{
+            storage_model_type => timeseries,
+            table_name => <<"table_name">>,
+            data_source => <<"${data_source}">>,
+            measurement => <<"${measurement}">>,
+            tags => #{
+                tag1 => <<"${tag1}">>,
+                tag2 => <<"${tag2}">>
+            },
+            fields => [
+                #{
+                    column => <<"${column}">>,
+                    value => <<"${value}">>,
+                    isint => true
+                }
+            ],
+            meta_update_model => <<"MUM_IGNORE">>
+        }
+    }.
+
+values(StorageType, get) ->
+    values(StorageType, post);
+values(StorageType, put) ->
+    values(StorageType, post);
+values(timeseries, post) ->
+    #{
+        name => <<"tablestore_connector">>,
+        enable => true,
+        local_topic => <<"local/topic/#">>,
+        endpoint => <<"https://myinstance.cn-hangzhou.ots.aliyuncs.com">>,
+        storage_model_type => timeseries,
+        instance_name => <<"myinstance">>,
+        access_key_id => <<"******">>,
+        access_key_secret => <<"******">>,
+        table_name => <<"table_name">>,
+        measurement => <<"measurement">>,
+        tags => #{
+            tag1 => <<"${tag1}">>,
+            tag2 => <<"${tag2}">>
+        },
+        fields => [
+            #{
+                column => <<"${column}">>,
+                value => <<"${value}">>,
+                isint => true
+            }
+        ],
+        data_source => <<"${data_source}">>,
+        meta_update_model => <<"MUM_IGNORE">>,
+        resource_opts => #{
+            batch_size => 1,
+            batch_time => <<"20ms">>
+        },
+        pool_size => 8,
+        ssl => #{enable => false}
+    }.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_tablestore".
+
+roots() -> [].
+
+fields("connector") ->
+    [
+        storage_model_type_field(),
+        {endpoint, mk(binary(), #{required => true, desc => ?DESC("desc_endpoint")})},
+        {instance_name, mk(binary(), #{required => true, desc => ?DESC("desc_instance_name")})},
+        {access_key_id,
+            emqx_schema_secret:mk(
+                #{
+                    required => true,
+                    sensitive => true,
+                    desc => ?DESC("desc_access_key_id")
+                }
+            )},
+        {access_key_secret,
+            emqx_schema_secret:mk(
+                #{
+                    required => true,
+                    sensitive => true,
+                    desc => ?DESC("desc_access_key_secret")
+                }
+            )},
+        {pool_size,
+            mk(
+                integer(),
+                #{
+                    required => false,
+                    default => 8,
+                    desc => ?DESC("pool_size")
+                }
+            )}
+    ] ++ emqx_connector_schema_lib:ssl_fields();
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        fields("connector") ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(action) ->
+    {tablestore,
+        mk(
+            hoconsc:map(name, ref(?MODULE, tablestore_action)),
+            #{desc => <<"Tablestore Action Config">>, required => false}
+        )};
+fields(tablestore_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(ref(?MODULE, action_parameters), #{
+            required => true, desc => ?DESC(action_parameters)
+        })
+    );
+fields(action_parameters) ->
+    [
+        storage_model_type_field(),
+        {table_name,
+            mk(binary(), #{
+                required => true,
+                is_template => true,
+                desc => ?DESC("desc_table_name")
+            })},
+        {measurement,
+            mk(binary(), #{
+                required => true,
+                is_template => true,
+                desc => ?DESC("desc_measurement")
+            })},
+        tags_field(),
+        fields_field(),
+        {data_source,
+            mk(binary(), #{
+                required => false,
+                is_template => true,
+                desc => ?DESC("desc_data_source")
+            })},
+        {timestamp,
+            mk(hoconsc:union([integer(), binary()]), #{
+                required => false,
+                is_template => true,
+                desc => ?DESC("desc_timestamp")
+            })},
+        {meta_update_model,
+            mk(
+                enum(['MUM_IGNORE', 'MUM_NORMAL']), #{
+                    required => false,
+                    default => 'MUM_NORMAL',
+                    desc => ?DESC("desc_meta_update_model")
+                }
+            )}
+    ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    Fields =
+        fields("connector") ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(tablestore_action));
+fields("tablestore_fields") ->
+    [
+        {column,
+            mk(binary(), #{
+                required => true,
+                is_template => true,
+                desc => ?DESC("tablestore_fields_column")
+            })},
+        {value,
+            mk(hoconsc:union([boolean(), number(), binary()]), #{
+                required => true,
+                is_template => true,
+                desc => ?DESC("tablestore_fields_value")
+            })},
+        {isint,
+            mk(hoconsc:union([boolean(), binary()]), #{
+                required => false,
+                is_template => true,
+                desc => ?DESC("tablestore_fields_isint")
+            })},
+        {isbinary,
+            mk(hoconsc:union([boolean(), binary()]), #{
+                required => false,
+                is_template => true,
+                desc => ?DESC("tablestore_fields_isbinary")
+            })}
+    ].
+
+storage_model_type_field() ->
+    {storage_model_type,
+        mk(
+            enum([timeseries]), #{
+                required => false,
+                default => timeseries,
+                desc => ?DESC("storage_model_type")
+            }
+        )}.
+
+tags_field() ->
+    {tags,
+        mk(
+            map(), #{
+                default => #{},
+                desc => ?DESC("desc_tags"),
+                is_template => true
+            }
+        )}.
+
+fields_field() ->
+    {fields,
+        mk(
+            hoconsc:array(ref(?MODULE, "tablestore_fields")), #{
+                default => [],
+                desc => ?DESC("desc_fields")
+            }
+        )}.
+
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Tablestore using `", string:to_upper(Method), "` method."];
+desc("connector") ->
+    ?DESC("connector");
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc(action_parameters) ->
+    ?DESC("action_parameters");
+desc(tablestore_action) ->
+    ?DESC("tablestore_action");
+desc("tablestore_fields") ->
+    ?DESC("tablestore_fields");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
+desc(_) ->
+    undefined.

+ 20 - 0
apps/emqx_bridge_tablestore/src/emqx_bridge_tablestore_action_info.erl

@@ -0,0 +1,20 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_tablestore_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-define(SCHEMA_MODULE, emqx_bridge_tablestore).
+
+action_type_name() -> tablestore.
+
+connector_type_name() -> tablestore.
+
+schema_module() -> ?SCHEMA_MODULE.

+ 315 - 0
apps/emqx_bridge_tablestore/src/emqx_bridge_tablestore_connector.erl

@@ -0,0 +1,315 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_tablestore_connector).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    resource_type/0,
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channel_status/3,
+    on_get_channels/1,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+-export([
+    mk_tablestore_data/3,
+    mk_tablestore_batch_data/2
+]).
+
+-define(OTS_CLIENT_NAME(ID), list_to_binary("tablestore:" ++ str(ID))).
+-define(TKS(STR), {tmpl_tokens, STR}).
+-define(LOG_T(LEVEL, LOG), ?SLOG(LEVEL, LOG, #{tag => "TABLE_STORE"})).
+
+%%--------------------------------------------------------------------
+%% resource callback
+
+resource_type() -> tablestore.
+
+callback_mode() -> always_sync.
+
+on_add_channel(_InstId, #{channels := Channels} = OldState, ChannelId, Conf) ->
+    #{parameters := #{storage_model_type := timeseries} = Params} = Conf,
+    Channels1 = maps:get(ChannelId, Channels, #{}),
+    NewState = OldState#{
+        channels => Channels1#{
+            ChannelId => #{
+                timestamp => maybe_preproc(maps:get(timestamp, Params, <<"now">>)),
+                table_name => maybe_preproc(maps:get(table_name, Params)),
+                measurement => maybe_preproc(maps:get(measurement, Params)),
+                tags => preproc_tags(maps:get(tags, Params)),
+                fields => preproc_fields(maps:get(fields, Params)),
+                data_source => maybe_preproc(maps:get(data_source, Params, <<>>)),
+                meta_update_model => maps:get(meta_update_model, Params)
+            }
+        }
+    },
+    {ok, NewState}.
+
+on_remove_channel(_InstId, #{channels := Channels} = State, ChannelId) ->
+    {ok, State#{
+        channels => maps:remove(ChannelId, Channels)
+    }}.
+
+on_get_channel_status(InstId, _ChannelId, State) ->
+    on_get_status(InstId, State).
+
+on_get_channels(InstId) ->
+    emqx_bridge_influxdb_connector:on_get_channels(InstId).
+
+on_start(InstId, Config) ->
+    BaseOpts = [
+        {instance, maps:get(instance_name, Config)},
+        {pool, ?OTS_CLIENT_NAME(InstId)},
+        {endpoint, maps:get(endpoint, Config)},
+        {pool_size, maps:get(pool_size, Config)}
+    ],
+    SecretOpts = [
+        {access_key, maps:get(access_key_id, Config)},
+        {access_secret, maps:get(access_key_secret, Config)}
+    ],
+    ?LOG_T(info, #{msg => ots_start, ots_opts => BaseOpts}),
+    {ok, ClientRef} = start_ots_ts_client(InstId, BaseOpts ++ SecretOpts),
+    case list_ots_tables(ClientRef) of
+        {ok, _} ->
+            {ok, #{client_ref => ClientRef, channels => #{}, ots_opts => BaseOpts}};
+        {error, Reason} ->
+            _ = ots_ts_client:stop(ClientRef),
+            {error, Reason}
+    end.
+
+on_stop(_InstId, #{client_ref := ClientRef} = State) ->
+    ots_ts_client:stop(ClientRef),
+    State.
+
+on_get_status(_InstId, #{client_ref := ClientRef}) ->
+    case list_ots_tables(ClientRef) of
+        {ok, _} -> ?status_connected;
+        _ -> ?status_connecting
+    end.
+
+on_query(_InstId, {ChannelId, Message}, #{client_ref := ClientRef, channels := Channels}) ->
+    case maps:find(ChannelId, Channels) of
+        {ok, #{table_name := TableName0, meta_update_model := MetaUpdateMode} = ChannelState} ->
+            try
+                Row = mk_tablestore_data_row(Message, ChannelState),
+                TableName = render_table_name(TableName0, Message),
+                mk_tablestore_data(TableName, MetaUpdateMode, [Row])
+            of
+                Data ->
+                    LogMsg = #{msg => ots_query, channel => ChannelId, data => Data},
+                    ?LOG_T(debug, LogMsg),
+                    case ots_ts_client:put(ClientRef, Data) of
+                        {ok, _Res} -> ok;
+                        {error, Reason} -> {error, {unrecoverable_error, Reason}}
+                    end
+            catch
+                throw:{bad_ots_data, _} = Reason ->
+                    {error, {unrecoverable_error, Reason}}
+            end;
+        error ->
+            {error, {unrecoverable_error, channel_not_found}}
+    end.
+
+on_batch_query(_, [{ChannelId, _} | _] = MsgList, #{client_ref := ClientRef, channels := Channels}) ->
+    case maps:find(ChannelId, Channels) of
+        {ok, ChannelState} ->
+            try mk_tablestore_batch_data(MsgList, ChannelState) of
+                BatchDataList ->
+                    send_batch_data(BatchDataList, ClientRef, ChannelId)
+            catch
+                throw:{bad_ots_data, _} = Reason ->
+                    {error, {unrecoverable_error, Reason}}
+            end;
+        error ->
+            {error, {unrecoverable_error, channel_not_found}}
+    end.
+
+send_batch_data(BatchDataList, ClientRef, ChannelId) ->
+    LogMsg = #{msg => ots_batch_query, channel => ChannelId, batch_data_list => BatchDataList},
+    ?LOG_T(debug, LogMsg),
+    Res = [ots_ts_client:put(ClientRef, BatchData) || BatchData <- BatchDataList],
+    Filter = fun
+        ({error, _}) -> true;
+        (_) -> false
+    end,
+    case lists:filter(Filter, Res) of
+        [] -> ok;
+        Errors -> {error, {unrecoverable_error, Errors}}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+start_ots_ts_client(_, OtsOpts) ->
+    ots_ts_client:start(OtsOpts).
+
+list_ots_tables(ClientRef) ->
+    try
+        ots_ts_client:list_tables(ClientRef)
+    catch
+        Err:Reason:ST ->
+            {error, #{error => Err, reason => Reason, stacktrace => ST}}
+    end.
+
+preproc_tags(Tags) ->
+    maps:fold(
+        fun(K, V, AccIn) ->
+            AccIn#{maybe_preproc(bin(K)) => maybe_preproc(V)}
+        end,
+        #{},
+        Tags
+    ).
+
+preproc_fields(Fields) ->
+    lists:map(
+        fun(#{column := C, value := V} = Row0) ->
+            #{
+                column => maybe_preproc(C),
+                value => maybe_preproc(V),
+                isint => maybe_preproc(maps:get(isint, Row0, undefined)),
+                isbinary => maybe_preproc(maps:get(isbinary, Row0, undefined))
+            }
+        end,
+        Fields
+    ).
+
+render_table_name(TableName, Message) ->
+    case render_tmpl(TableName, Message) of
+        undefined -> throw({bad_ots_data, no_table_name});
+        TName -> TName
+    end.
+
+render_tags(Tags, Message) ->
+    maps:fold(
+        fun(K, V, AccIn) ->
+            case {render_tmpl(K, Message), render_tmpl(V, Message)} of
+                {Key, Value} when Key =/= undefined, Value =/= undefined ->
+                    AccIn#{Key => Value};
+                _ ->
+                    AccIn
+            end
+        end,
+        #{},
+        Tags
+    ).
+
+render_fields(Fields, Message) ->
+    lists:filtermap(
+        fun(#{column := Column, value := Value} = Row) ->
+            case {render_tmpl(Column, Message), render_tmpl(Value, Message)} of
+                {Col, Val} when Col =/= undefined, Val =/= undefined ->
+                    {true, {Col, Val, field_opts([isint, isbinary], Row, Message, #{})}};
+                {_Col, _Val} ->
+                    false
+            end
+        end,
+        Fields
+    ).
+
+field_opts([Key | Keys], Row, Message, Opts) ->
+    case render_tmpl(maps:get(Key, Row, undefined), Message) of
+        undefined ->
+            field_opts(Keys, Row, Message, Opts);
+        Val ->
+            field_opts(Keys, Row, Message, Opts#{Key => Val})
+    end;
+field_opts([], _, _, Opts) ->
+    Opts.
+
+maybe_preproc(Str) when is_binary(Str) ->
+    case string:find(Str, "${") of
+        nomatch -> Str;
+        _ -> ?TKS(emqx_placeholder:preproc_tmpl(Str))
+    end;
+maybe_preproc(Any) ->
+    Any.
+
+mk_tablestore_batch_data(MsgList, #{table_name := TableName0} = ChannelState) ->
+    #{meta_update_model := MetaUpdateMode} = ChannelState,
+    GrpRows = lists:foldr(
+        fun({_, Message}, Res) ->
+            TableName = render_table_name(TableName0, Message),
+            Row = mk_tablestore_data_row(Message, ChannelState),
+            Res#{TableName => [Row | maps:get(TableName, Res, [])]}
+        end,
+        #{},
+        MsgList
+    ),
+    [
+        mk_tablestore_data(TableName, MetaUpdateMode, Rows)
+     || {TableName, Rows} <- maps:to_list(GrpRows)
+    ].
+
+mk_tablestore_data(TableName, MetaUpdateMode, Rows) ->
+    #{
+        table_name => TableName,
+        rows_data => Rows,
+        meta_update_mode => MetaUpdateMode
+    }.
+
+mk_tablestore_data_row(Message, #{measurement := Measurement0} = ChannelState) ->
+    Measurement = render_tmpl(Measurement0, Message),
+    do_mk_tablestore_data_row(Message, ChannelState, Measurement).
+
+do_mk_tablestore_data_row(_, _, undefined) ->
+    throw({bad_ots_data, no_measurement});
+do_mk_tablestore_data_row(Message, ChannelState, Measurement) ->
+    #{tags := Tags0, fields := Fields0, data_source := DataSource0, timestamp := Ts0} =
+        ChannelState,
+    DataSource = trans_data_source(render_tmpl(DataSource0, Message)),
+    Tags = render_tags(Tags0, Message),
+    Fields = render_fields(Fields0, Message),
+    Timestamp =
+        case render_tmpl(Ts0, Message) of
+            <<"now">> -> os:system_time(microsecond);
+            <<"NOW">> -> os:system_time(microsecond);
+            undefined -> os:system_time(microsecond);
+            Ts1 when is_integer(Ts1) -> Ts1;
+            Ts2 -> throw({bad_ots_data, {bad_timestamp, Ts2}})
+        end,
+    #{
+        measurement => Measurement,
+        data_source => DataSource,
+        tags => Tags,
+        fields => Fields,
+        time => Timestamp
+    }.
+
+trans_data_source(undefined) -> <<>>;
+trans_data_source(DataSource) -> DataSource.
+
+render_tmpl(?TKS(Tokens), Message) ->
+    do_render_tmpl(Tokens, Message);
+render_tmpl(Val, _) ->
+    Val.
+
+do_render_tmpl(Tokens, Message) ->
+    RawResult = emqx_placeholder:proc_tmpl(Tokens, Message, #{return => rawlist}),
+    filter_vars(RawResult).
+
+filter_vars([undefined]) ->
+    undefined;
+filter_vars([RawResult]) ->
+    RawResult;
+filter_vars(RawResult) when is_list(RawResult) ->
+    erlang:iolist_to_binary([str(R) || R <- RawResult]).
+
+str(Data) ->
+    emqx_utils_conv:str(Data).
+
+bin(Data) ->
+    emqx_utils_conv:bin(Data).

+ 43 - 0
apps/emqx_bridge_tablestore/src/emqx_bridge_tablestore_connector_info.erl

@@ -0,0 +1,43 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_tablestore_connector_info).
+
+-behaviour(emqx_connector_info).
+
+-export([
+    type_name/0,
+    bridge_types/0,
+    resource_callback_module/0,
+    config_schema/0,
+    schema_module/0,
+    api_schema/1
+]).
+
+type_name() ->
+    tablestore.
+
+bridge_types() ->
+    [tablestore, tablestore_timeseries].
+
+resource_callback_module() ->
+    emqx_bridge_tablestore_connector.
+
+config_schema() ->
+    {tablestore,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(emqx_bridge_tablestore, "config_connector")),
+            #{
+                desc => <<"Tablestore Connector Config">>,
+                required => false
+            }
+        )}.
+
+schema_module() ->
+    emqx_bridge_tablestore.
+
+api_schema(Method) ->
+    emqx_connector_schema:api_ref(
+        emqx_bridge_tablestore, <<"tablestore">>, Method ++ "_connector"
+    ).

+ 220 - 0
apps/emqx_bridge_tablestore/test/emqx_bridge_tablestore_connector_tests.erl

@@ -0,0 +1,220 @@
+-module(emqx_bridge_tablestore_connector_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(CONF, #{
+    instance_name => <<"instance">>,
+    endpoint => <<"endpoint">>,
+    access_key_id => <<"access_key_id">>,
+    access_key_secret => <<"access_key_secret">>,
+    pool_size => 8
+}).
+
+-define(ACT_CONF, #{
+    parameters => #{
+        storage_model_type => timeseries,
+        timestamp => <<"NOW">>,
+        table_name => <<"${table}">>,
+        measurement => <<"${measurement}">>,
+        meta_update_model => 'MUM_NORMAL',
+        data_source => <<"data_source">>,
+        tags => #{
+            '${tag1}' => <<"${tag1_value}">>,
+            '${tag2}' => <<"${tag2_value}">>
+        },
+        fields => [
+            #{column => <<"str_field0">>, value => <<"str_val0">>},
+            #{column => <<"${str_field}">>, value => <<"${str_val}">>},
+            #{column => <<"${int_field}">>, value => <<"${int_val}">>, isint => true},
+            #{column => <<"${float_field}">>, value => <<"${float_val}">>, isint => false},
+            #{column => <<"${bool_field}">>, value => <<"${bool_val}">>},
+            #{column => <<"${binary_field}">>, value => <<"${binary_val}">>, isbinary => true}
+        ]
+    }
+}).
+
+-define(MSG, #{
+    table => <<"table">>,
+    measurement => <<"measurement">>,
+    tag1 => <<"tag1">>,
+    tag2 => <<"tag2">>,
+    tag1_value => <<"tag1_value">>,
+    tag2_value => <<"tag2_value">>,
+    str_field => <<"str_field">>,
+    str_val => <<"str_val">>,
+    int_field => <<"int_field">>,
+    int_val => 123,
+    float_field => <<"float_field">>,
+    float_val => 123.456,
+    bool_field => <<"bool_field">>,
+    bool_val => true,
+    binary_field => <<"binary_field">>,
+    binary_val => <<"binary_val">>
+}).
+
+start_connector_test_() ->
+    {timeout, 30,
+        {setup,
+            fun() ->
+                meck:new(ots_ts_client, [no_history]),
+                ok = meck:expect(ots_ts_client, start, fun(_OtsOpts) ->
+                    {ok, dummy_client_ref}
+                end),
+                ok = meck:expect(ots_ts_client, list_tables, fun(_CRef) ->
+                    {ok, []}
+                end),
+                ok = meck:expect(ots_ts_client, stop, fun(_CRef) ->
+                    ok
+                end),
+                emqx_bridge_tablestore_connector:on_start(test_inst, ?CONF)
+            end,
+            fun(_) ->
+                meck:unload(ots_ts_client)
+            end,
+            fun({ok, #{client_ref := ClientRef, ots_opts := OtsOpts}}) ->
+                [
+                    ?_assertEqual(dummy_client_ref, ClientRef),
+                    ?_assertEqual(<<"endpoint">>, proplists:get_value(endpoint, OtsOpts)),
+                    ?_assertEqual(8, proplists:get_value(pool_size, OtsOpts))
+                ]
+            end}}.
+
+start_connector_failure_test_() ->
+    {setup,
+        fun() ->
+            meck:new(ots_ts_client, [no_history]),
+            ok = meck:expect(ots_ts_client, start, fun(_OtsOpts) ->
+                {ok, dummy_client_ref}
+            end),
+            ok = meck:expect(ots_ts_client, list_tables, fun(_CRef) ->
+                {error, not_found}
+            end),
+            ok = meck:expect(ots_ts_client, stop, fun(_CRef) ->
+                ok
+            end)
+        end,
+        fun(_) ->
+            meck:unload(ots_ts_client)
+        end,
+        fun(_) ->
+            [
+                ?_assertMatch(
+                    {error, not_found}, emqx_bridge_tablestore_connector:on_start(test_inst, ?CONF)
+                )
+            ]
+        end}.
+
+on_query_test_() ->
+    {setup,
+        fun() ->
+            ets:new(on_query_test, [named_table, public]),
+            meck:new(ots_ts_client, [no_history]),
+            ok = meck:expect(ots_ts_client, put, fun(_CRef, Query) ->
+                ets:insert(on_query_test, {query, Query}),
+                {ok, []}
+            end),
+            emqx_bridge_tablestore_connector:on_add_channel(
+                test_inst,
+                #{channels => #{}},
+                channelid1,
+                ?ACT_CONF
+            )
+        end,
+        fun(_) ->
+            meck:unload(ots_ts_client),
+            ets:delete(on_query_test)
+        end,
+        fun({ok, State}) ->
+            ok = emqx_bridge_tablestore_connector:on_query(
+                test_inst,
+                {channelid1, ?MSG},
+                State#{client_ref => dummy_client_ref}
+            ),
+            [{query, Query}] = ets:lookup(on_query_test, query),
+            #{
+                table_name := TableName,
+                rows_data := [Row],
+                meta_update_mode := MetaUpdateMode
+            } = Query,
+            [
+                ?_assertMatch('MUM_NORMAL', MetaUpdateMode),
+                ?_assertMatch(<<"table">>, TableName),
+                ?_assertMatch(Ts when is_integer(Ts), maps:get(time, Row)),
+                ?_assertMatch(<<"measurement">>, maps:get(measurement, Row)),
+                ?_assertMatch(<<"data_source">>, maps:get(data_source, Row)),
+                ?_assertMatch(
+                    #{<<"tag1">> := <<"tag1_value">>, <<"tag2">> := <<"tag2_value">>},
+                    maps:get(tags, Row)
+                ),
+                ?_assertMatch(
+                    [
+                        {<<"str_field0">>, <<"str_val0">>, #{}},
+                        {<<"str_field">>, <<"str_val">>, #{}},
+                        {<<"int_field">>, 123, #{isint := true}},
+                        {<<"float_field">>, 123.456, #{isint := false}},
+                        {<<"bool_field">>, true, #{}},
+                        {<<"binary_field">>, <<"binary_val">>, #{isbinary := true}}
+                    ],
+                    maps:get(fields, Row)
+                )
+            ]
+        end}.
+
+on_batch_query_test_() ->
+    {setup,
+        fun() ->
+            ets:new(on_query_test, [named_table, public]),
+            meck:new(ots_ts_client, [no_history]),
+            ok = meck:expect(ots_ts_client, put, fun(_CRef, Query) ->
+                ets:insert(on_query_test, {query, Query}),
+                {ok, []}
+            end),
+            emqx_bridge_tablestore_connector:on_add_channel(
+                test_inst,
+                #{channels => #{}},
+                channelid1,
+                ?ACT_CONF
+            )
+        end,
+        fun(_) ->
+            meck:unload(ots_ts_client),
+            ets:delete(on_query_test)
+        end,
+        fun({ok, State}) ->
+            BatchMsgs = [{channelid1, ?MSG} || _ <- lists:seq(1, 3)],
+            ok = emqx_bridge_tablestore_connector:on_batch_query(
+                test_inst,
+                BatchMsgs,
+                State#{client_ref => dummy_client_ref}
+            ),
+            [{query, Query}] = ets:lookup(on_query_test, query),
+            #{
+                table_name := TableName,
+                rows_data := Rows,
+                meta_update_mode := MetaUpdateMode
+            } = Query,
+            Row = hd(Rows),
+            [
+                ?_assert(length(BatchMsgs) =:= length(Rows)),
+                ?_assertMatch('MUM_NORMAL', MetaUpdateMode),
+                ?_assertMatch(<<"table">>, TableName),
+                ?_assertMatch(Ts when is_integer(Ts), maps:get(time, Row)),
+                ?_assertMatch(<<"measurement">>, maps:get(measurement, Row)),
+                ?_assertMatch(<<"data_source">>, maps:get(data_source, Row)),
+                ?_assertMatch(
+                    #{<<"tag1">> := <<"tag1_value">>, <<"tag2">> := <<"tag2_value">>},
+                    maps:get(tags, Row)
+                ),
+                ?_assertMatch(
+                    [
+                        {<<"str_field0">>, <<"str_val0">>, #{}},
+                        {<<"str_field">>, <<"str_val">>, #{}},
+                        {<<"int_field">>, 123, #{isint := true}},
+                        {<<"float_field">>, 123.456, #{isint := false}},
+                        {<<"bool_field">>, true, #{}},
+                        {<<"binary_field">>, <<"binary_val">>, #{isbinary := true}}
+                    ],
+                    maps:get(fields, Row)
+                )
+            ]
+        end}.

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -118,6 +118,7 @@
             emqx_bridge_rabbitmq,
             emqx_bridge_azure_event_hub,
             emqx_bridge_datalayers,
+            emqx_bridge_tablestore,
             emqx_s3,
             emqx_bridge_s3,
             emqx_bridge_azure_blob_storage,

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.3.7"},
+    {vsn, "0.3.8"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl, redbug]},

+ 6 - 0
apps/emqx_utils/src/emqx_utils_redact.erl

@@ -41,6 +41,12 @@ is_sensitive_key(<<"secret">>) -> true;
 is_sensitive_key(secret_access_key) -> true;
 is_sensitive_key("secret_access_key") -> true;
 is_sensitive_key(<<"secret_access_key">>) -> true;
+is_sensitive_key(access_key_secret) -> true;
+is_sensitive_key("access_key_secret") -> true;
+is_sensitive_key(<<"access_key_secret">>) -> true;
+is_sensitive_key(access_key_id) -> true;
+is_sensitive_key("access_key_id") -> true;
+is_sensitive_key(<<"access_key_id">>) -> true;
 is_sensitive_key(secret_key) -> true;
 is_sensitive_key("secret_key") -> true;
 is_sensitive_key(<<"secret_key">>) -> true;

+ 1 - 0
changes/ee/feat-14410.en.md

@@ -0,0 +1 @@
+Add new data integration support for the [Aliyun Tablestore](https://cn.aliyun.com/product/ots).

+ 4 - 0
mix.exs

@@ -269,6 +269,9 @@ defmodule EMQXUmbrella.MixProject do
       system_env: emqx_app_system_env()
     }
 
+  def common_dep(:tablestore),
+    do: {:tablestore, github: "emqx/ots_erl", tag: "0.2.2", override: true}
+
   def common_dep(:influxdb),
     do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}
 
@@ -396,6 +399,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_ds_builtin_raft,
       :emqx_auth_kerberos,
       :emqx_bridge_datalayers,
+      :emqx_bridge_tablestore,
       :emqx_auth_cinfo
     ])
   end

+ 1 - 0
rebar.config.erl

@@ -101,6 +101,7 @@ is_community_umbrella_app("apps/emqx_bridge_timescale") -> false;
 is_community_umbrella_app("apps/emqx_bridge_oracle") -> false;
 is_community_umbrella_app("apps/emqx_bridge_sqlserver") -> false;
 is_community_umbrella_app("apps/emqx_bridge_datalayers") -> false;
+is_community_umbrella_app("apps/emqx_bridge_tablestore") -> false;
 is_community_umbrella_app("apps/emqx_oracle") -> false;
 is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
 is_community_umbrella_app("apps/emqx_ft") -> false;

+ 123 - 0
rel/i18n/emqx_bridge_tablestore.hocon

@@ -0,0 +1,123 @@
+emqx_bridge_tablestore {
+
+desc_config.label:
+"""Tablestore Bridge Configuration"""
+desc_config.desc:
+"""Configuration for a Tablestore bridge."""
+
+desc_endpoint.label:
+"""Endpoint"""
+desc_endpoint.desc:
+"""Endpoint for the Tablestore. e.g. https://myinstance.cn-hangzhou.ots.aliyuncs.com"""
+
+desc_instance_name.label:
+"""Instance Name"""
+desc_instance_name.desc:
+"""Instance name."""
+
+desc_access_key_id.label:
+"""Key ID"""
+desc_access_key_id.desc:
+"""Key ID. e.g. NTS**********************"""
+
+desc_access_key_secret.label:
+"""Key Secret"""
+desc_access_key_secret.desc:
+"""Key secret. e.g. 7NR2****************************************"""
+
+pool_size.label:
+"""Pool Size"""
+pool_size.desc:
+"""The pool size."""
+
+desc_table_name.label:
+"""Table Name"""
+desc_table_name.desc:
+"""Table name. It can either be a static value or a placeholder like `${payload.table_name}`."""
+
+desc_measurement.label:
+"""Measurement"""
+desc_measurement.desc:
+"""The measurement. It can either be a static value or a placeholder like `${payload.measurement}`."""
+
+desc_data_source.label:
+"""Data Source"""
+desc_data_source.desc:
+"""The data source. It can either be a static value or a placeholder like `${payload.data_source}`."""
+
+desc_timestamp.label:
+"""Timestamp"""
+desc_timestamp.desc:
+"""~
+The timestamp in microsecond of the field.
+It can either be a static value or a placeholder like `${payload.microsecond_timestamp}`.
+If not provided or set to `NOW`, the millisecond timestamp when EMQX writes to Tablestore will be used.~"""
+
+desc_meta_update_model.label:
+"""Meta Update Model"""
+desc_meta_update_model.desc:
+"""~
+The update mode for time-series metadata. Can be one of:
+- MUM_NORMAL: Normal mode. When sending messages in this mode, Tablestore will create the timeseries metadata if not exits.
+- MUM_IGNORE: Do not update metadata. When sending messages in this mode, Tablestore will not try to create the timeseries metadata.
+Defaults to MUM_NORMAL~"""
+
+tablestore_fields_column.label:
+"""Column"""
+tablestore_fields_column.desc:
+"""Column name of the field. It can either be a static value or a placeholder like `${payload.column}`"""
+
+tablestore_fields_value.label:
+"""Value"""
+tablestore_fields_value.desc:
+"""Value of the field. It can either be a static value or a placeholder like `${payload.value}`"""
+
+tablestore_fields_isint.label:
+"""Is Int"""
+tablestore_fields_isint.desc:
+"""~
+Whether try to write numeric value as `integer`. Defaults to `false`, means that write integers as floats.
+It can either be a static value or a placeholder like `${payload.is_int}`.~"""
+
+tablestore_fields_isbinary.label:
+"""Is Binary"""
+tablestore_fields_isbinary.desc:
+"""~Whether try to write binary values as `binary` type. Defaults to `false`, means that write binary values as strings.
+It can either be a static value or a placeholder like `${payload.is_binary}`.~"""
+
+storage_model_type.label:
+"""Storage Model Type"""
+storage_model_type.desc:
+"""Storage model type. Can be one of `timeseries` or `order`."""
+
+desc_tags.label:
+"""Tags"""
+desc_tags.desc:
+"""Tags. The tag key and tag value can either be static strings or a placeholder like `${payload.tag_key}` and `${payload.tag_value}`."""
+
+desc_fields.label:
+"""Fields"""
+desc_fields.desc:
+"""Fields. The field column and value can either be static values or a placeholder like `${payload.column_name}` and `${payload.column_value}`."""
+
+connector.label:
+"""Tablestore Connector Configuration"""
+connector.desc:
+"""Configuration for a Tablestore connector."""
+
+action_parameters.label:
+"""Action Parameters"""
+action_parameters.desc:
+"""Additional parameters specific to this action type"""
+
+tablestore_action.label:
+"""Tablestore Action"""
+tablestore_action.desc:
+"""Action to interact with a Tablestore connector"""
+
+tablestore_fields.label:
+"""Tablestore Fields"""
+tablestore_fields.desc:
+"""Tablestore fields."""
+
+}

+ 2 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -283,6 +283,8 @@ Syskeeper
 sysmem
 sysmon
 systemd
+tablestore
+Tablestore
 tcp
 TCP
 TDengine