Просмотр исходного кода

Merge pull request #12261 from lafirest/fix/iotdb

feat(iotdb): improve the IoTDB bridge to v2 style
lafirest 2 лет назад
Родитель
Сommit
7f57ec47d5

+ 2 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -85,7 +85,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_pgsql_action_info,
         emqx_bridge_syskeeper_action_info,
         emqx_bridge_timescale_action_info,
-        emqx_bridge_redis_action_info
+        emqx_bridge_redis_action_info,
+        emqx_bridge_iotdb_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

+ 1 - 1
apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl

@@ -124,7 +124,7 @@ resource_type(sqlserver) -> emqx_bridge_sqlserver_connector;
 resource_type(opents) -> emqx_bridge_opents_connector;
 resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
 resource_type(oracle) -> emqx_oracle;
-resource_type(iotdb) -> emqx_bridge_iotdb_impl;
+resource_type(iotdb) -> emqx_bridge_iotdb_connector;
 resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
 resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer;
 resource_type(greptimedb) -> emqx_bridge_greptimedb_connector;

+ 3 - 1
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -478,11 +478,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
             BridgeId = bridge_id(Config),
+            BridgeType = ?config(bridge_type, Config),
+            BridgeName = ?config(bridge_name, Config),
             Message = {BridgeId, MakeMessageFun()},
             ?assertMatch(
                 {ok, {ok, _}},
                 ?wait_async_action(
-                    emqx_resource:query(ResourceId, Message, #{
+                    emqx_bridge_v2:query(BridgeType, BridgeName, Message, #{
                         async_reply_fun => {ReplyFun, [self()]}
                     }),
                     #{?snk_kind := TracePoint, instance_id := ResourceId},

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_event_hub, [
     {description, "EMQX Enterprise Azure Event Hub Bridge"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 2
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -25,7 +25,7 @@
 ]).
 
 %% emqx_connector_resource behaviour callbacks
--export([connector_config/1]).
+-export([connector_config/2]).
 
 -export([producer_converter/2, host_opts/0]).
 
@@ -326,7 +326,7 @@ values(producer) ->
 %% `emqx_connector_resource' API
 %%-------------------------------------------------------------------------------------------------
 
-connector_config(Config) ->
+connector_config(Config, _) ->
     %% Default port for AEH is 9093
     BootstrapHosts0 = maps:get(bootstrap_hosts, Config),
     BootstrapHosts = emqx_schema:parse_servers(

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_confluent, [
     {description, "EMQX Enterprise Confluent Connector and Action"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 2
apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl

@@ -24,7 +24,7 @@
 ]).
 
 %% emqx_connector_resource behaviour callbacks
--export([connector_config/1]).
+-export([connector_config/2]).
 
 -export([host_opts/0]).
 
@@ -288,7 +288,7 @@ values(action) ->
 %% `emqx_connector_resource' API
 %%-------------------------------------------------------------------------------------------------
 
-connector_config(Config) ->
+connector_config(Config, _) ->
     %% Default port for Confluent is 9092
     BootstrapHosts0 = maps:get(bootstrap_hosts, Config),
     BootstrapHosts = emqx_schema:parse_servers(

+ 2 - 11
apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl

@@ -136,17 +136,8 @@ fields("get_" ++ Type) ->
 fields("config_bridge_v2") ->
     fields("http_action");
 fields("config_connector") ->
-    [
-        {enable,
-            mk(
-                boolean(),
-                #{
-                    desc => <<"Enable or disable this connector">>,
-                    default => true
-                }
-            )},
-        {description, emqx_schema:description_schema()}
-    ] ++ connector_url_headers() ++
+    emqx_connector_schema:common_fields() ++
+        connector_url_headers() ++
         connector_opts() ++
         emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
 fields(connector_resource_opts) ->

+ 2 - 2
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -1,10 +1,10 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_iotdb, [
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {modules, [
         emqx_bridge_iotdb,
-        emqx_bridge_iotdb_impl
+        emqx_bridge_iotdb_connector
     ]},
     {registered, []},
     {applications, [

+ 194 - 50
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 -module(emqx_bridge_iotdb).
 
@@ -8,7 +8,12 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/2, array/1]).
+
+-export([
+    bridge_v2_examples/1,
+    conn_bridge_examples/1
+]).
 
 %% hocon_schema API
 -export([
@@ -18,8 +23,8 @@
     desc/1
 ]).
 
-%% emqx_bridge_enterprise "unofficial" API
--export([conn_bridge_examples/1]).
+-define(CONNECTOR_TYPE, iotdb).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
 
 %%-------------------------------------------------------------------------------------------------
 %% `hocon_schema' API
@@ -29,24 +34,140 @@ namespace() -> "bridge_iotdb".
 
 roots() -> [].
 
-fields("config") ->
-    basic_config() ++ request_config();
-fields("post") ->
-    [
-        type_field(),
-        name_field()
-    ] ++ fields("config");
-fields("put") ->
-    fields("config");
-fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post");
-fields("creation_opts") ->
+%%-------------------------------------------------------------------------------------------------
+%% v2 schema
+%%-------------------------------------------------------------------------------------------------
+
+fields(action) ->
+    {iotdb,
+        mk(
+            hoconsc:map(name, ref(?MODULE, action_config)),
+            #{
+                desc => <<"IoTDB Action Config">>,
+                required => false
+            }
+        )};
+fields(action_config) ->
+    emqx_resource_schema:override(
+        emqx_bridge_v2_schema:make_producer_action_schema(
+            mk(
+                ref(?MODULE, action_parameters),
+                #{
+                    required => true, desc => ?DESC("action_parameters")
+                }
+            )
+        ),
+        [
+            {resource_opts,
+                mk(ref(?MODULE, action_resource_opts), #{
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, "resource_opts")
+                })}
+        ]
+    );
+fields(action_resource_opts) ->
     lists:filter(
         fun({K, _V}) ->
             not lists:member(K, unsupported_opts())
         end,
-        emqx_resource_schema:fields("creation_opts")
+        emqx_bridge_v2_schema:resource_opts_fields()
     );
+fields(action_parameters) ->
+    [
+        {is_aligned,
+            mk(
+                boolean(),
+                #{
+                    desc => ?DESC("config_is_aligned"),
+                    default => false
+                }
+            )},
+        {device_id,
+            mk(
+                binary(),
+                #{
+                    desc => ?DESC("config_device_id")
+                }
+            )},
+        {iotdb_version,
+            mk(
+                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
+                #{
+                    desc => ?DESC("config_iotdb_version"),
+                    default => ?VSN_1_1_X
+                }
+            )},
+        {data,
+            mk(
+                array(ref(?MODULE, action_parameters_data)),
+                #{
+                    desc => ?DESC("action_parameters_data")
+                }
+            )}
+    ] ++
+        proplists_without(
+            [path, method, body, headers, request_timeout],
+            emqx_bridge_http_schema:fields("parameters_opts")
+        );
+fields(action_parameters_data) ->
+    [
+        {timestamp,
+            mk(
+                binary(),
+                #{
+                    desc => ?DESC("config_parameters_timestamp"),
+                    default => <<"now">>
+                }
+            )},
+        {measurement,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_measurement")
+                }
+            )},
+        {data_type,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_data_type"),
+                    validator => fun(Type) ->
+                        lists:member(Type, [
+                            <<"TEXT">>,
+                            <<"BOOLEAN">>,
+                            <<"INT32">>,
+                            <<"INT64">>,
+                            <<"FLOAT">>,
+                            <<"DOUBLE">>
+                        ])
+                    end
+                }
+            )},
+        {value,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_value")
+                }
+            )}
+    ];
+fields("post_bridge_v2") ->
+    emqx_bridge_schema:type_and_name_fields(enum([iotdb])) ++ fields(action_config);
+fields("put_bridge_v2") ->
+    fields(action_config);
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
+%%-------------------------------------------------------------------------------------------------
+%% v1 schema
+%%-------------------------------------------------------------------------------------------------
+
+fields("config") ->
+    basic_config() ++ request_config();
+fields("creation_opts") ->
+    proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts"));
 fields(auth_basic) ->
     [
         {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
@@ -55,22 +176,32 @@ fields(auth_basic) ->
                 required => true,
                 desc => ?DESC("config_auth_basic_password")
             })}
-    ].
+    ];
+fields("post") ->
+    emqx_bridge_schema:type_and_name_fields(enum([iotdb])) ++ fields("config");
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 desc("config") ->
     ?DESC("desc_config");
+desc(action_config) ->
+    ?DESC("desc_config");
+desc(action_parameters) ->
+    ?DESC("action_parameters");
+desc(action_parameters_data) ->
+    ?DESC("action_parameters_data");
+desc(action_resource_opts) ->
+    "Action Resource Options";
 desc("creation_opts") ->
-    ?DESC(emqx_resource_schema, "creation_opts");
-desc("post") ->
-    ["Configuration for IoTDB using `POST` method."];
-desc(Name) ->
-    lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
-    ?DESC(Name).
-
-struct_names() ->
-    [
-        auth_basic
-    ].
+    "Creation Options";
+desc(auth_basic) ->
+    "Basic Authentication";
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
 
 basic_config() ->
     [
@@ -160,30 +291,43 @@ unsupported_opts() ->
         batch_time
     ].
 
-%%======================================================================================
-
-type_field() ->
-    {type,
-        mk(
-            hoconsc:enum([iotdb]),
-            #{
-                required => true,
-                desc => ?DESC("desc_type")
-            }
-        )}.
+%%-------------------------------------------------------------------------------------------------
+%% v2 examples
+%%-------------------------------------------------------------------------------------------------
 
-name_field() ->
-    {name,
-        mk(
-            binary(),
-            #{
-                required => true,
-                desc => ?DESC("desc_name")
-            }
-        )}.
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"iotdb">> =>
+                #{
+                    summary => <<"Apache IoTDB Bridge">>,
+                    value => emqx_bridge_v2_schema:action_values(
+                        Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                    )
+                }
+        }
+    ].
 
-%%======================================================================================
+action_values() ->
+    #{
+        parameters => #{
+            data => [
+                #{
+                    timestamp => now,
+                    measurement => <<"status">>,
+                    data_type => <<"BOOLEAN">>,
+                    value => <<"${st}">>
+                }
+            ],
+            is_aligned => false,
+            device_id => <<"my_device">>,
+            iotdb_version => ?VSN_1_1_X
+        }
+    }.
 
+%%-------------------------------------------------------------------------------------------------
+%% v1 examples
+%%-------------------------------------------------------------------------------------------------
 conn_bridge_examples(Method) ->
     [
         #{

+ 71 - 0
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl

@@ -0,0 +1,71 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_iotdb_action_info).
+
+-behaviour(emqx_action_info).
+
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
+%% behaviour callbacks
+-export([
+    action_type_name/0,
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1,
+    bridge_v1_type_name/0,
+    connector_action_config_to_bridge_v1_config/2,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(ACTION_TYPE, iotdb).
+-define(SCHEMA_MODULE, emqx_bridge_iotdb).
+
+action_type_name() -> ?ACTION_TYPE.
+bridge_v1_type_name() -> ?ACTION_TYPE.
+connector_type_name() -> ?ACTION_TYPE.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    MergedConfig =
+        emqx_utils_maps:deep_merge(
+            maps:without(
+                [<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>],
+                emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
+            ),
+            ConnectorConfig
+        ),
+    BridgeV1Keys = schema_keys("config"),
+    maps:with(BridgeV1Keys, MergedConfig).
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(action_config),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
+        ActionConfig#{<<"connector">> => ConnectorName}
+    ).
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun emqx_connector_schema:project_to_connector_resource_opts/1,
+        maps:with(ConnectorKeys, BridgeV1Config)
+    ).
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config#{<<"data">> => []}),
+    emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
+
+schema_keys(Name) ->
+    schema_keys(?SCHEMA_MODULE, Name).
+
+schema_keys(Mod, Name) ->
+    [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))].

+ 285 - 54
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -1,10 +1,13 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_bridge_iotdb_impl).
+-module(emqx_bridge_iotdb_connector).
+
+-behaviour(emqx_resource).
 
 -include("emqx_bridge_iotdb.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% `emqx_resource' API
@@ -14,9 +17,25 @@
     on_stop/2,
     on_get_status/2,
     on_query/3,
-    on_query_async/4
+    on_query_async/4,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1,
+    connector_examples/1,
+    connector_example_values/0
 ]).
 
+%% emqx_connector_resource behaviour callbacks
+-export([connector_config/2]).
+
 -type config() ::
     #{
         base_url := #{
@@ -29,33 +48,142 @@
         pool_type := random | hash,
         pool_size := pos_integer(),
         request => undefined | map(),
-        is_aligned => boolean(),
-        iotdb_version => binary(),
-        device_id => binary() | undefined,
         atom() => _
     }.
 
 -type state() ::
     #{
         base_path := _,
-        base_url := #{
-            scheme := http | https,
-            host := iolist(),
-            port := inet:port_number(),
-            path := _
-        },
         connect_timeout := pos_integer(),
         pool_type := random | hash,
-        pool_size := pos_integer(),
+        channels := map(),
         request => undefined | map(),
-        is_aligned => boolean(),
-        iotdb_version => binary(),
-        device_id => binary() | undefined,
         atom() => _
     }.
 
 -type manager_id() :: binary().
 
+-define(CONNECTOR_TYPE, iotdb).
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+%%-------------------------------------------------------------------------------------
+%% connector examples
+%%-------------------------------------------------------------------------------------
+connector_examples(Method) ->
+    [
+        #{
+            <<"iotdb">> =>
+                #{
+                    summary => <<"Apache IoTDB Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_example_values()
+                    )
+                }
+        }
+    ].
+
+connector_example_values() ->
+    #{
+        name => <<"iotdb_connector">>,
+        type => iotdb,
+        enable => true,
+        authentication => #{
+            <<"username">> => <<"root">>,
+            <<"password">> => <<"*****">>
+        },
+        base_url => <<"http://iotdb.local:18080/">>,
+        connect_timeout => <<"15s">>,
+        pool_type => <<"random">>,
+        pool_size => 8,
+        enable_pipelining => 100,
+        ssl => #{enable => false}
+    }.
+
+%%-------------------------------------------------------------------------------------
+%% schema
+%%-------------------------------------------------------------------------------------
+namespace() -> "iotdb".
+
+roots() ->
+    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+
+fields(config) ->
+    proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++
+        fields("connection_fields");
+fields("connection_fields") ->
+    [
+        {base_url,
+            mk(
+                emqx_schema:url(),
+                #{
+                    required => true,
+                    desc => ?DESC(emqx_bridge_iotdb, "config_base_url")
+                }
+            )},
+        {authentication,
+            mk(
+                hoconsc:union([ref(?MODULE, auth_basic)]),
+                #{
+                    default => auth_basic, desc => ?DESC("config_authentication")
+                }
+            )}
+    ];
+fields(auth_basic) ->
+    [
+        {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
+        {password,
+            emqx_schema_secret:mk(#{
+                required => true,
+                desc => ?DESC("config_auth_basic_password")
+            })}
+    ];
+fields("post") ->
+    emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config);
+fields("put") ->
+    fields(config);
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc(config) ->
+    ?DESC("desc_config");
+desc(auth_basic) ->
+    "Basic Authentication";
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
+    #{
+        base_url := BaseUrl,
+        authentication :=
+            #{
+                username := Username,
+                password := Password0
+            }
+    } = Conf,
+
+    Password = emqx_secret:unwrap(Password0),
+    BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
+
+    WebhookConfig =
+        Conf#{
+            url => BaseUrl,
+            headers => [
+                {<<"Content-type">>, <<"application/json">>},
+                {<<"Authorization">>, BasicToken}
+            ]
+        },
+    ParseConfs(
+        <<"http">>,
+        Name,
+        WebhookConfig
+    ).
+
+proplists_without(Keys, List) ->
+    [El || El = {K, _} <- List, not lists:member(K, Keys)].
+
 %%-------------------------------------------------------------------------------------
 %% `emqx_resource' API
 %%-------------------------------------------------------------------------------------
@@ -73,7 +201,7 @@ on_start(InstanceId, Config) ->
                 request => maps:get(request, State, <<>>)
             }),
             ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
-            {ok, maps:merge(Config, State)};
+            {ok, State#{channels => #{}}};
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
@@ -103,19 +231,20 @@ on_get_status(InstanceId, State) ->
     {ok, pos_integer(), [term()], term()}
     | {ok, pos_integer(), [term()]}
     | {error, term()}.
-on_query(InstanceId, {send_message, Message}, State) ->
+on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) ->
     ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_called",
         instance_id => InstanceId,
-        send_message => Message,
+        send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case make_iotdb_insert_request(Message, State) of
+
+    case try_render_message(Req, Channels) of
         {ok, IoTDBPayload} ->
             handle_response(
                 emqx_bridge_http_connector:on_query(
-                    InstanceId, {send_message, IoTDBPayload}, State
+                    InstanceId, {ChannelId, IoTDBPayload}, State
                 )
             );
         Error ->
@@ -124,15 +253,17 @@ on_query(InstanceId, {send_message, Message}, State) ->
 
 -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
     {ok, pid()} | {error, empty_request}.
-on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
+on_query_async(
+    InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
+) ->
     ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_async_called",
         instance_id => InstanceId,
-        send_message => Message,
+        send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case make_iotdb_insert_request(Message, State) of
+    case try_render_message(Req, Channels) of
         {ok, IoTDBPayload} ->
             ReplyFunAndArgs =
                 {
@@ -143,12 +274,71 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
                     []
                 },
             emqx_bridge_http_connector:on_query_async(
-                InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
+                InstanceId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
             );
         Error ->
             Error
     end.
 
+on_add_channel(
+    InstanceId,
+    #{channels := Channels} = OldState0,
+    ChannelId,
+    #{
+        parameters := #{iotdb_version := Version, data := Data} = Parameter
+    }
+) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        _ ->
+            %% update HTTP channel
+            InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
+            InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
+
+            Path =
+                case Version of
+                    ?VSN_1_1_X -> InsertTabletPathV2;
+                    _ -> InsertTabletPathV1
+                end,
+
+            HTTPReq = #{
+                parameters => Parameter#{
+                    path => Path,
+                    method => <<"post">>
+                }
+            },
+
+            {ok, OldState} = emqx_bridge_http_connector:on_add_channel(
+                InstanceId, OldState0, ChannelId, HTTPReq
+            ),
+
+            %% update IoTDB channel
+            DeviceId = maps:get(device_id, Parameter, <<>>),
+            Channel = Parameter#{
+                device_id => emqx_placeholder:preproc_tmpl(DeviceId),
+                data := preproc_data_template(Data)
+            },
+            Channels2 = Channels#{ChannelId => Channel},
+            {ok, OldState#{channels := Channels2}}
+    end.
+
+on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
+    {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
+    Channels2 = maps:remove(ChannelId, Channels),
+    {ok, OldState#{channels => Channels2}}.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            connected;
+        _ ->
+            {error, not_exists}
+    end.
+
 %%--------------------------------------------------------------------
 %% Internal Functions
 %%--------------------------------------------------------------------
@@ -238,14 +428,14 @@ iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
 iot_timestamp(TimestampTkn, Msg, Nows) ->
     iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
 
+iot_timestamp(<<"now_us">>, #{now_us := NowUs}) ->
+    NowUs;
+iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) ->
+    NowNs;
 iot_timestamp(Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
     NowMs;
-iot_timestamp(Timestamp, #{now_us := NowUs}) when Timestamp =:= <<"now_us">> ->
-    NowUs;
-iot_timestamp(Timestamp, #{now_ns := NowNs}) when Timestamp =:= <<"now_ns">> ->
-    NowNs;
 iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
@@ -304,25 +494,14 @@ convert_float(Str) when is_binary(Str) ->
 convert_float(undefined) ->
     null.
 
-make_iotdb_insert_request(Message, State) ->
-    Payloads = to_list(parse_payload(get_payload(Message))),
-    IsAligned = maps:get(is_aligned, State, false),
-    IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
-    case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of
-        {undefined, _} ->
-            {error, device_id_missing};
-        {_, []} ->
-            {error, invalid_data};
-        {DeviceId, PreProcessedData} ->
-            DataList = proc_data(PreProcessedData, Message),
-            InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
-            Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
-            {ok,
-                maps:merge(Rows, #{
-                    iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
-                    iotdb_field_key(device_id, IotDBVsn) => DeviceId
-                })}
-    end.
+make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) ->
+    InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
+    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
+    {ok,
+        maps:merge(Rows, #{
+            iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
+            iotdb_field_key(device_id, IotDBVsn) => DeviceId
+        })}.
 
 replace_dtypes(Rows0, IotDBVsn) ->
     {Types, Rows} = maps:take(dtypes, Rows0),
@@ -404,13 +583,12 @@ iotdb_field_key(data_types, ?VSN_0_13_X) ->
 to_list(List) when is_list(List) -> List;
 to_list(Data) -> [Data].
 
-device_id(Message, Payloads, State) ->
-    case maps:get(device_id, State, undefined) of
-        undefined ->
-            %% [FIXME] there could be conflicting device-ids in the Payloads
+%% If device_id is missing from the channel data, try to find it from the payload
+device_id(Message, Payloads, Channel) ->
+    case maps:get(device_id, Channel, []) of
+        [] ->
             maps:get(<<"device_id">>, hd(Payloads), undefined);
-        DeviceId ->
-            DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId),
+        DeviceIdTkn ->
             emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
     end.
 
@@ -430,3 +608,56 @@ eval_response_body(Body, Resp) ->
         #{<<"code">> := 200} -> Resp;
         Reason -> {error, Reason}
     end.
+
+preproc_data_template(DataList) ->
+    lists:map(
+        fun(
+            #{
+                timestamp := Timestamp,
+                measurement := Measurement,
+                data_type := DataType,
+                value := Value
+            }
+        ) ->
+            #{
+                timestamp => emqx_placeholder:preproc_tmpl(Timestamp),
+                measurement => emqx_placeholder:preproc_tmpl(Measurement),
+                data_type => DataType,
+                value => emqx_placeholder:preproc_tmpl(Value)
+            }
+        end,
+        DataList
+    ).
+
+try_render_message({ChannelId, Msg}, Channels) ->
+    case maps:find(ChannelId, Channels) of
+        {ok, Channel} ->
+            render_channel_message(Channel, Msg);
+        _ ->
+            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
+    end.
+
+render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) ->
+    Payloads = to_list(parse_payload(get_payload(Message))),
+    case device_id(Message, Payloads, Channel) of
+        undefined ->
+            {error, device_id_missing};
+        DeviceId ->
+            case get_data_template(Channel, Payloads) of
+                [] ->
+                    {error, invalid_data};
+                DataTemplate ->
+                    DataList = proc_data(DataTemplate, Message),
+
+                    make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
+            end
+    end.
+
+%% Get the message template.
+%% In order to be compatible with 4.4, the template version has higher priority
+%% This is a template, using it
+get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
+    Data;
+%% This is a self-describing message
+get_data_template(#{data := []}, Payloads) ->
+    preproc_data_list(Payloads).

+ 178 - 28
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -32,10 +32,10 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_bridge_testlib:init_per_suite(Config, ?APPS).
+    emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
 
 end_per_suite(Config) ->
-    emqx_bridge_testlib:end_per_suite(Config).
+    emqx_bridge_v2_testlib:end_per_suite(Config).
 
 init_per_group(plain = Type, Config0) ->
     Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"),
@@ -43,7 +43,7 @@ init_per_group(plain = Type, Config0) ->
     ProxyName = "iotdb",
     case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
-            Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
+            Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
@@ -66,7 +66,7 @@ init_per_group(legacy = Type, Config0) ->
     ProxyName = "iotdb013",
     case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
-            Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
+            Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
@@ -90,18 +90,34 @@ end_per_group(Group, Config) when
     Group =:= plain;
     Group =:= legacy
 ->
-    emqx_bridge_testlib:end_per_group(Config),
+    emqx_bridge_v2_testlib:end_per_group(Config),
     ok;
 end_per_group(_Group, _Config) ->
     ok.
 
 init_per_testcase(TestCase, Config0) ->
-    Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3),
+    Type = ?config(bridge_type, Config0),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = <<
+        (atom_to_binary(TestCase))/binary, UniqueNum/binary
+    >>,
+    {_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
+    {_, ActionConfig} = action_config(Name, Config0),
+    Config = [
+        {connector_type, Type},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig},
+        {bridge_type, Type},
+        {bridge_name, Name},
+        {bridge_config, ActionConfig}
+        | Config0
+    ],
     iotdb_reset(Config),
+    ok = snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(TestCase, Config) ->
-    emqx_bridge_testlib:end_per_testcase(TestCase, Config).
+    emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
 
 %%------------------------------------------------------------------------------
 %% Helper fns
@@ -114,7 +130,7 @@ iotdb_server_url(Host, Port) ->
         integer_to_binary(Port)
     ]).
 
-bridge_config(TestCase, _TestGroup, Config) ->
+bridge_config(TestCase, Config) ->
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
@@ -149,7 +165,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
                 Version
             ]
         ),
-    {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}.
+    {Name, ConfigString, emqx_bridge_v2_testlib:parse_and_check(Type, Name, ConfigString)}.
 
 make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
     #{
@@ -201,7 +217,7 @@ iotdb_request(Config, Path, Body, Opts) ->
                 <<"password">> := Password
             }
         } =
-        ?config(bridge_config, Config),
+        ?config(connector_config, Config),
     ct:pal("bridge config: ~p", [_BridgeConfig]),
     URL = <<BaseURL/binary, Path/binary>>,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
@@ -238,6 +254,76 @@ is_error_check(Reason) ->
         ?assertEqual({error, Reason}, Result)
     end.
 
+action_config(Name, Config) ->
+    Version = ?config(iotdb_version, Config),
+    Type = ?config(bridge_type, Config),
+    ConfigString =
+        io_lib:format(
+            "actions.~s.~s {\n"
+            "  enable = true\n"
+            "  connector = \"~s\"\n"
+            "  parameters = {\n"
+            "     iotdb_version = \"~s\"\n"
+            "     data = []\n"
+            "  }\n"
+            "}\n",
+            [
+                Type,
+                Name,
+                Name,
+                Version
+            ]
+        ),
+    ct:pal("ActionConfig:~ts~n", [ConfigString]),
+    {ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
+
+connector_config(Name, Config) ->
+    Host = ?config(bridge_host, Config),
+    Port = ?config(bridge_port, Config),
+    Type = ?config(bridge_type, Config),
+    ServerURL = iotdb_server_url(Host, Port),
+    ConfigString =
+        io_lib:format(
+            "connectors.~s.~s {\n"
+            "  enable = true\n"
+            "  base_url = \"~s\"\n"
+            "  authentication = {\n"
+            "     username = \"root\"\n"
+            "     password = \"root\"\n"
+            "  }\n"
+            "}\n",
+            [
+                Type,
+                Name,
+                ServerURL
+            ]
+        ),
+    ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
+    {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
+
+parse_action_and_check(ConfigString, BridgeType, Name) ->
+    parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
+
+parse_connector_and_check(ConfigString, ConnectorType, Name) ->
+    parse_and_check(
+        ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
+    ).
+%%    emqx_utils_maps:safe_atom_key_map(Config).
+
+parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
+    Type = to_bin(Type0),
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
+    #{RootKey := #{Type := #{Name := Config}}} = RawConf,
+    Config.
+
+to_bin(List) when is_list(List) ->
+    unicode:characters_to_binary(List, utf8);
+to_bin(Atom) when is_atom(Atom) ->
+    erlang:atom_to_binary(Atom);
+to_bin(Bin) when is_binary(Bin) ->
+    Bin.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -246,7 +332,7 @@ t_sync_query_simple(Config) ->
     DeviceId = iotdb_device(Config),
     Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
     MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
-    ok = emqx_bridge_testlib:t_sync_query(
+    ok = emqx_bridge_v2_testlib:t_sync_query(
         Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
     ),
     Query = <<"select temp from ", DeviceId/binary>>,
@@ -260,7 +346,7 @@ t_async_query(Config) ->
     DeviceId = iotdb_device(Config),
     Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
     MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
-    ok = emqx_bridge_testlib:t_async_query(
+    ok = emqx_bridge_v2_testlib:t_async_query(
         Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
     ),
     Query = <<"select temp from ", DeviceId/binary>>,
@@ -310,7 +396,7 @@ t_sync_query_aggregated(Config) ->
         make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
     ],
     MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
-    ok = emqx_bridge_testlib:t_sync_query(
+    ok = emqx_bridge_v2_testlib:t_sync_query(
         Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
     ),
 
@@ -369,10 +455,12 @@ t_sync_query_fail(Config) ->
         fun(Result) ->
             ?assertEqual(error, element(1, Result))
         end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query).
+    emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
+    ).
 
 t_sync_device_id_missing(Config) ->
-    emqx_bridge_testlib:t_sync_query(
+    emqx_bridge_v2_testlib:t_sync_query(
         Config,
         make_message_fun(iotdb_topic(Config), #{foo => bar}),
         is_error_check(device_id_missing),
@@ -387,7 +475,7 @@ t_extract_device_id_from_rule_engine_message(Config) ->
     Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
     ?check_trace(
         begin
-            {ok, _} = emqx_bridge_testlib:create_bridge(Config),
+            {ok, _} = emqx_bridge_v2_testlib:create_bridge(Config),
             SQL = <<
                 "SELECT\n"
                 "  payload.measurement, payload.data_type, payload.value, payload.device_id\n"
@@ -397,7 +485,7 @@ t_extract_device_id_from_rule_engine_message(Config) ->
                 "\""
             >>,
             Opts = #{sql => SQL},
-            {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
                 BridgeType, RuleTopic, Config, Opts
             ),
             emqx:publish(Message),
@@ -415,7 +503,7 @@ t_extract_device_id_from_rule_engine_message(Config) ->
     ok.
 
 t_sync_invalid_data(Config) ->
-    emqx_bridge_testlib:t_sync_query(
+    emqx_bridge_v2_testlib:t_sync_query(
         Config,
         make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
         is_error_check(invalid_data),
@@ -423,7 +511,7 @@ t_sync_invalid_data(Config) ->
     ).
 
 t_async_device_id_missing(Config) ->
-    emqx_bridge_testlib:t_async_query(
+    emqx_bridge_v2_testlib:t_async_query(
         Config,
         make_message_fun(iotdb_topic(Config), #{foo => bar}),
         is_error_check(device_id_missing),
@@ -431,7 +519,7 @@ t_async_device_id_missing(Config) ->
     ).
 
 t_async_invalid_data(Config) ->
-    emqx_bridge_testlib:t_async_query(
+    emqx_bridge_v2_testlib:t_async_query(
         Config,
         make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
         is_error_check(invalid_data),
@@ -439,18 +527,19 @@ t_async_invalid_data(Config) ->
     ).
 
 t_create_via_http(Config) ->
-    emqx_bridge_testlib:t_create_via_http(Config).
+    emqx_bridge_v2_testlib:t_create_via_http(Config).
 
 t_start_stop(Config) ->
-    emqx_bridge_testlib:t_start_stop(Config, iotdb_bridge_stopped).
+    emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped).
 
 t_on_get_status(Config) ->
-    emqx_bridge_testlib:t_on_get_status(Config).
+    emqx_bridge_v2_testlib:t_on_get_status(Config).
 
 t_device_id(Config) ->
-    ResourceId = emqx_bridge_testlib:resource_id(Config),
     %% Create without device_id configured
-    ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)),
+    ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+    BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
     ?retry(
         _Sleep = 1_000,
         _Attempts = 20,
@@ -463,9 +552,11 @@ t_device_id(Config) ->
     iotdb_reset(Config, ConfiguredDevice),
     Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
     MessageF1 = make_message_fun(Topic, Payload1),
+
     is_success_check(
-        emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
     ),
+
     {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
     ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
     #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
@@ -476,10 +567,12 @@ t_device_id(Config) ->
 
     %% reconfigure bridge with device_id
     {ok, _} =
-        emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}),
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{<<"device_id">> => ConfiguredDevice}
+        }),
 
     is_success_check(
-        emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
     ),
 
     %% even though we had a device_id in the message it's not being used
@@ -496,6 +589,63 @@ t_device_id(Config) ->
     iotdb_reset(Config, ConfiguredDevice),
     ok.
 
+t_template(Config) ->
+    %% Create without data  configured
+    ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+    BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    TemplateDeviceId = <<"root.deviceWithTemplate">>,
+    DeviceId = <<"root.deviceWithoutTemplate">>,
+    Topic = <<"some/random/topic">>,
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, TemplateDeviceId),
+    Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
+    MessageF1 = make_message_fun(Topic, Payload1),
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
+    ),
+
+    {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
+    ?assertMatch(#{<<"values">> := [[true]]}, emqx_utils_json:decode(Res1_1)),
+
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, TemplateDeviceId),
+
+    %% reconfigure with data template
+    {ok, _} =
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{
+                <<"device_id">> => TemplateDeviceId,
+                <<"data">> => [
+                    #{
+                        <<"measurement">> => <<"${payload.measurement}">>,
+                        <<"data_type">> => "TEXT",
+                        <<"value">> => <<"${payload.device_id}">>
+                    }
+                ]
+            }
+        }),
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
+    ),
+
+    {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
+        Config, <<"select * from ", TemplateDeviceId/binary>>
+    ),
+
+    ?assertMatch(#{<<"values">> := [[<<DeviceId/binary>>]]}, emqx_utils_json:decode(Res2_2)),
+
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, TemplateDeviceId),
+    ok.
+
 is_empty(null) -> true;
 is_empty([]) -> true;
 is_empty([[]]) -> true;

+ 10 - 46
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -302,54 +302,18 @@ parse_confs(
                 method => undefined
             }
     };
-parse_confs(<<"iotdb">>, Name, Conf) ->
-    %% [FIXME] this has no place here, it's used in parse_confs/3, which should
-    %% rather delegate to a behavior callback than implementing domain knowledge
-    %% here (reversed dependency)
-    InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
-    InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
-    #{
-        base_url := BaseURL,
-        authentication :=
-            #{
-                username := Username,
-                password := Password
-            }
-    } = Conf,
-    BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
-    %% This version atom correspond to the macro ?VSN_1_1_X in
-    %% emqx_connector_iotdb.hrl. It would be better to use the macro directly, but
-    %% this cannot be done without introducing a dependency on the
-    %% emqx_iotdb_connector app (which is an EE app).
-    DefaultIOTDBConnector = 'v1.1.x',
-    Version = maps:get(iotdb_version, Conf, DefaultIOTDBConnector),
-    InsertTabletPath =
-        case Version of
-            DefaultIOTDBConnector -> InsertTabletPathV2;
-            _ -> InsertTabletPathV1
-        end,
-    WebhookConfig =
-        Conf#{
-            method => <<"post">>,
-            url => <<BaseURL/binary, InsertTabletPath/binary>>,
-            headers => [
-                {<<"Content-type">>, <<"application/json">>},
-                {<<"Authorization">>, BasicToken}
-            ]
-        },
-    parse_confs(
-        <<"webhook">>,
-        Name,
-        WebhookConfig
-    );
-parse_confs(ConnectorType, _Name, Config) ->
-    connector_config(ConnectorType, Config).
-
-connector_config(ConnectorType, Config) ->
+parse_confs(ConnectorType, Name, Config) ->
+    connector_config(ConnectorType, Name, Config).
+
+connector_config(ConnectorType, Name, Config) ->
     Mod = connector_impl_module(ConnectorType),
-    case erlang:function_exported(Mod, connector_config, 1) of
+    case erlang:function_exported(Mod, connector_config, 2) of
         true ->
-            Mod:connector_config(Config);
+            Mod:connector_config(Config, #{
+                type => ConnectorType,
+                name => Name,
+                parse_confs => fun parse_confs/3
+            });
         false ->
             Config
     end.

+ 16 - 2
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -48,6 +48,8 @@ resource_type(timescale) ->
     emqx_postgresql;
 resource_type(redis) ->
     emqx_bridge_redis_connector;
+resource_type(iotdb) ->
+    emqx_bridge_iotdb_connector;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
 
@@ -58,6 +60,8 @@ connector_impl_module(azure_event_hub_producer) ->
     emqx_bridge_azure_event_hub;
 connector_impl_module(confluent_producer) ->
     emqx_bridge_confluent_producer;
+connector_impl_module(iotdb) ->
+    emqx_bridge_iotdb_connector;
 connector_impl_module(_ConnectorType) ->
     undefined.
 
@@ -169,6 +173,14 @@ connector_structs() ->
                     desc => <<"Timescale Connector Config">>,
                     required => false
                 }
+            )},
+        {iotdb,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_iotdb_connector, config)),
+                #{
+                    desc => <<"IoTDB Connector Config">>,
+                    required => false
+                }
             )}
     ].
 
@@ -186,7 +198,8 @@ schema_modules() ->
         emqx_bridge_syskeeper_proxy,
         emqx_bridge_timescale,
         emqx_postgresql_connector_schema,
-        emqx_bridge_redis_schema
+        emqx_bridge_redis_schema,
+        emqx_bridge_iotdb_connector
     ].
 
 api_schemas(Method) ->
@@ -213,7 +226,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
         api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
         api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
-        api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector")
+        api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method)
     ].
 
 api_ref(Module, Type, Method) ->

+ 3 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -145,7 +145,9 @@ connector_type_to_bridge_types(syskeeper_forwarder) ->
 connector_type_to_bridge_types(syskeeper_proxy) ->
     [];
 connector_type_to_bridge_types(timescale) ->
-    [timescale].
+    [timescale];
+connector_type_to_bridge_types(iotdb) ->
+    [iotdb].
 
 actions_config_name() -> <<"actions">>.
 

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.26"},
+    {vsn, "0.1.27"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -23,7 +23,7 @@
 
 -export([namespace/0, roots/0, fields/1, desc/1]).
 
--export([create_opts/1, resource_opts_meta/0]).
+-export([create_opts/1, resource_opts_meta/0, override/2]).
 
 %% range interval in ms
 -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1).

+ 1 - 0
changes/feat-12261.en.md

@@ -0,0 +1 @@
+The bridges for IoTDB have been split so it is available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.

+ 48 - 0
rel/i18n/emqx_bridge_iotdb.hocon

@@ -70,4 +70,52 @@ desc_name.desc:
 
 desc_name.label:
 """Bridge Name"""
+
+config_parameters_timestamp.desc:
+"""Timestamp. Placeholders in format of ${var} is supported, the final value can be:</br>
+- now: use the `now_ms` which is contained in the payload as timestamp
+- now_ms: same as above
+- now_us: use the `now_us` which is contained in the payload as timestamp
+- now_ns: use the `now_ns` which is contained in the payload as timestamp
+- any other: use the value directly as the timestamp"""
+
+config_parameters_timestamp.label:
+"""Timestamp"""
+
+config_parameters_measurement.desc:
+"""Measurement. Placeholders in format of ${var} is supported"""
+
+config_parameters_measurement.label:
+"""Measurement"""
+
+config_parameters_data_type.desc:
+"""Data Type, can be:</br>
+- TEXT
+- BOOLEAN
+- INT32
+- INT64
+- FLOAT
+- DOUBLE"""
+
+config_parameters_data_type.label:
+"""Data type"""
+
+config_parameters_value.desc:
+"""Value. Placeholders in format of ${var} is supported"""
+
+config_parameters_value.label:
+"""Value"""
+
+action_parameters_data.desc:
+"""IoTDB action parameter data"""
+
+action_parameters_data.label:
+"""Parameter Data"""
+
+action_parameters.desc:
+"""IoTDB action parameters"""
+
+action_parameters.label:
+"""Parameters"""
+
 }

+ 44 - 0
rel/i18n/emqx_bridge_iotdb_connector.hocon

@@ -0,0 +1,44 @@
+emqx_bridge_iotdb_connector {
+
+config_authentication.desc:
+"""Authentication configuration"""
+
+config_authentication.label:
+"""Authentication"""
+
+auth_basic.desc:
+"""Parameters for basic authentication."""
+
+auth_basic.label:
+"""Basic auth params"""
+
+config_auth_basic_username.desc:
+"""The username as configured at the IoTDB REST interface"""
+
+config_auth_basic_username.label:
+  """HTTP Basic Auth Username"""
+
+config_auth_basic_password.desc:
+"""The password as configured at the IoTDB REST interface"""
+
+config_auth_basic_password.label:
+"""HTTP Basic Auth Password"""
+
+config_base_url.desc:
+"""The base URL of the external IoTDB service's REST interface."""
+config_base_url.label:
+"""IoTDB REST Service Base URL"""
+
+config_max_retries.desc:
+"""HTTP request max retry times if failed."""
+
+config_max_retries.label:
+"""HTTP Request Max Retries"""
+
+desc_config.desc:
+"""Configuration for Apache IoTDB bridge."""
+
+desc_config.label:
+"""IoTDB Bridge Configuration"""
+
+}

+ 2 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -295,3 +295,5 @@ upstream
 priv
 Syskeeper
 msacc
+now_us
+ns