Explorar o código

feat(iotdb): introduced a thrift driver for IotDB connector

firest hai 1 ano
pai
achega
75b5d40073

+ 30 - 0
.ci/docker-compose-file/docker-compose-iotdb.yaml

@@ -88,3 +88,33 @@ services:
     #     - "18080:18080"
     networks:
       - emqx_bridge
+
+  iotdb-thrift:
+    container_name: iotdb-thrift
+    hostname: iotdb-thrift
+    image: apache/iotdb:1.3.0-standalone
+    restart: always
+    environment:
+      - enable_rest_service=true
+      - cn_internal_address=iotdb-thrift
+      - cn_internal_port=10710
+      - cn_consensus_port=10720
+      - cn_seed_config_node=iotdb-thrift:10710
+      - dn_rpc_address=iotdb-thrift
+      - dn_internal_address=iotdb-thrift
+      - dn_rpc_port=6667
+      - dn_mpp_data_exchange_port=10740
+      - dn_schema_region_consensus_port=10750
+      - dn_data_region_consensus_port=10760
+      - dn_seed_config_node=iotdb-thrift:10710
+    # volumes:
+    #     - ./data:/iotdb/data
+    #     - ./logs:/iotdb/logs
+    expose:
+      - "18080"
+      - "6667"
+    # IoTDB's REST interface, uncomment for local testing
+    # ports:
+    #     - "18080:18080"
+    networks:
+      - emqx_bridge

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -156,6 +156,18 @@
     "upstream": "iotdb013:18080",
     "enabled": true
   },
+  {
+    "name": "iotdb_thrift",
+    "listen": "0.0.0.0:46667",
+    "upstream": "iotdb-thrift:6667",
+    "enabled": true
+  },
+  {
+    "name": "iotdb_thrift_rest",
+    "listen": "0.0.0.0:48080",
+    "upstream": "iotdb-thrift:18080",
+    "enabled": true
+  },
   {
     "name": "minio_tcp",
     "listen": "0.0.0.0:19000",

+ 7 - 4
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -986,6 +986,8 @@ t_consume(Config, Opts) ->
     ok.
 
 t_create_via_http(Config) ->
+    t_create_via_http(Config, false).
+t_create_via_http(Config, IsOnlyV2) ->
     ?check_trace(
         begin
             ?assertMatch({ok, _}, create_bridge_api(Config)),
@@ -998,10 +1000,11 @@ t_create_via_http(Config) ->
             ),
 
             %% check that v1 list API is fine
-            ?assertMatch(
-                {ok, {{_, 200, _}, _, _}},
-                list_bridges_http_api_v1()
-            ),
+            (not IsOnlyV2) andalso
+                ?assertMatch(
+                    {ok, {{_, 200, _}, _, _}},
+                    list_bridges_http_api_v1()
+                ),
 
             ok
         end,

+ 20 - 3
apps/emqx_bridge/test/emqx_bridge_v2_tests.erl

@@ -75,10 +75,27 @@ connector_resource_opts_test() ->
         start_timeout
     ],
     ConnectorSchemasRefs =
-        lists:map(
-            fun({Type, #{type := ?MAP(_, ?R_REF(SchemaMod, FieldName))}}) ->
-                {Type, find_resource_opts_fields(SchemaMod, FieldName)}
+        lists:foldl(
+            fun
+                ({Type, #{type := ?MAP(_, ?R_REF(SchemaMod, FieldName))}}, Acc) ->
+                    [{Type, find_resource_opts_fields(SchemaMod, FieldName)} | Acc];
+                ({Type, #{type := ?MAP(_, ?UNION(UnionType))}}, Acc) ->
+                    Types =
+                        case UnionType of
+                            List when is_list(List) ->
+                                List;
+                            Func when is_function(Func, 1) ->
+                                Func(all_union_members)
+                        end,
+                    lists:foldl(
+                        fun(?R_REF(SchemaMod, FieldName), InAcc) ->
+                            [{Type, find_resource_opts_fields(SchemaMod, FieldName)} | InAcc]
+                        end,
+                        Acc,
+                        Types
+                    )
             end,
+            [],
             emqx_connector_schema:fields(connectors)
         ),
     ConnectorsMissingRO = [Type || {Type, undefined} <- ConnectorSchemasRefs],

+ 12 - 0
apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl

@@ -11,4 +11,16 @@
 -define(VSN_1_0_X, 'v1.0.x').
 -define(VSN_0_13_X, 'v0.13.x').
 
+-define(THRIFT_HOST_OPTIONS, #{
+    default_port => 6667
+}).
+
+-define(PROTOCOL_V1, 'protocol_v1').
+-define(PROTOCOL_V2, 'protocol_v2').
+-define(PROTOCOL_V3, 'protocol_v3').
+
+-define(THRIFT_NOT_SUPPORT_ASYNC_MSG, <<"The Thrift backend does not support asynchronous calls">>).
+
+-type driver() :: resetapi | thrift.
+
 -endif.

+ 4 - 4
apps/emqx_bridge_iotdb/mix.exs

@@ -1,11 +1,11 @@
-defmodule EMQXBridgeIotdb.MixProject do
+defmodule EMQXBridgeTdengine.MixProject do
   use Mix.Project
   alias EMQXUmbrella.MixProject, as: UMP
 
   def project do
     [
       app: :emqx_bridge_iotdb,
-      version: "0.1.0",
+      version: "0.2.3",
       build_path: "../../_build",
       erlc_options: UMP.erlc_options(),
       erlc_paths: UMP.erlc_paths(),
@@ -23,11 +23,11 @@ defmodule EMQXBridgeIotdb.MixProject do
 
   def deps() do
     [
-      {:emqx, in_umbrella: true},
+      {:iotdb, github: "emqx/iotdb-client-erl", tag: "0.1.4"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false},
-      {:emqx_bridge_http, in_umbrella: true}
+      {:emqx_bridge_http, in_umbrella: true, runtime: false}
     ]
   end
 end

+ 2 - 1
apps/emqx_bridge_iotdb/rebar.config

@@ -9,7 +9,8 @@
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}},
-    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}},
+    {iotdb, {git, "https://github.com/emqx/iotdb-client-erl.git", {tag, "0.1.4"}}}
 ]}.
 {plugins, [rebar3_path_deps]}.
 {project_plugins, [erlfmt]}.

+ 2 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -10,7 +10,8 @@
     {applications, [
         kernel,
         stdlib,
-        emqx_resource
+        emqx_resource,
+        iotdb
     ]},
     {env, [
         {emqx_action_info_modules, [emqx_bridge_iotdb_action_info]},

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl

@@ -53,7 +53,7 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ).
 
 bridge_v1_config_to_connector_config(BridgeV1Config) ->
-    ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, config),
+    ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, "config_rest"),
     emqx_utils_maps:update_if_present(
         <<"resource_opts">>,
         fun emqx_connector_schema:project_to_connector_resource_opts/1,

+ 286 - 56
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -17,6 +17,7 @@
 -export([
     resource_type/0,
     callback_mode/0,
+    callback_mode/1,
     on_start/2,
     on_stop/2,
     on_get_status/2,
@@ -31,6 +32,8 @@
     on_format_query_result/1
 ]).
 
+-export([connect/1, do_get_status/1]).
+
 -export([
     namespace/0,
     roots/0,
@@ -45,7 +48,8 @@
 
 -type config() ::
     #{
-        request_base := #{
+        driver := driver(),
+        request_base => #{
             scheme := http | https,
             host := iolist(),
             port := inet:port_number()
@@ -53,18 +57,17 @@
         connect_timeout := pos_integer(),
         pool_type := random | hash,
         pool_size := pos_integer(),
-        iotdb_version := atom(),
+        iotdb_version => atom(),
+        protocol_version => atom(),
         request => undefined | map(),
         atom() => _
     }.
 
 -type state() ::
     #{
-        connect_timeout := pos_integer(),
-        pool_type := random | hash,
+        driver := driver(),
         channels := map(),
         iotdb_version := atom(),
-        request => undefined | map(),
         atom() => _
     }.
 
@@ -115,13 +118,13 @@ connector_example_values() ->
 namespace() -> "iotdb".
 
 roots() ->
-    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+    [].
 
-fields(config) ->
+fields("config_rest") ->
     proplists_without(
         [url, request, retry_interval, headers],
         emqx_bridge_http_schema:fields("config_connector")
-    ) ++
+    ) ++ common_fields(rest) ++
         fields("connection_fields");
 fields("connection_fields") ->
     [
@@ -143,38 +146,88 @@ fields("connection_fields") ->
             )},
         {authentication,
             mk(
-                hoconsc:union([ref(?MODULE, auth_basic)]),
+                hoconsc:union([ref(?MODULE, authentication)]),
                 #{
                     default => auth_basic, desc => ?DESC("config_authentication")
                 }
             )}
     ];
-fields(auth_basic) ->
+fields(authentication) ->
     [
-        {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
+        {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_username")})},
         {password,
             emqx_schema_secret:mk(#{
                 required => true,
-                desc => ?DESC("config_auth_basic_password")
+                desc => ?DESC("config_auth_password")
             })}
     ];
-fields("post") ->
-    emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config);
-fields("put") ->
-    fields(config);
-fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post").
-
-desc(config) ->
-    ?DESC("desc_config");
-desc(auth_basic) ->
-    "Basic Authentication";
-desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
-    ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+fields("config_thrift") ->
+    Meta = #{desc => ?DESC("server")},
+    emqx_connector_schema:common_fields() ++
+        common_fields(thrift) ++
+        [
+            {server, emqx_schema:servers_sc(Meta, ?THRIFT_HOST_OPTIONS)},
+            {protocol_version,
+                mk(
+                    hoconsc:enum([?PROTOCOL_V1, ?PROTOCOL_V2, ?PROTOCOL_V3]),
+                    #{
+                        desc => ?DESC("config_protocol_version"),
+                        default => ?PROTOCOL_V3
+                    }
+                )},
+            {'zoneId',
+                mk(
+                    binary(),
+                    #{default => <<"Asia/Shanghai">>, desc => ?DESC("config_zoneId")}
+                )},
+            {pool_size,
+                mk(
+                    pos_integer(),
+                    #{
+                        default => 8,
+                        desc => ?DESC("pool_size")
+                    }
+                )}
+        ] ++ fields(authentication) ++ emqx_connector_schema_lib:ssl_fields() ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields("post_" ++ Driver) ->
+    emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields("config_" ++ Driver);
+fields("put_" ++ Driver) ->
+    fields("config_" ++ Driver);
+fields("get_" ++ Driver) ->
+    emqx_bridge_schema:status_fields() ++ fields("post_" ++ Driver).
+
+common_fields(Driver) ->
+    [
+        {driver,
+            mk(
+                hoconsc:enum([Driver]),
+                #{
+                    desc => ?DESC("config_driver"),
+                    default => <<"rest">>
+                }
+            )}
+    ].
+
+desc(authentication) ->
+    ?DESC("config_authentication");
+desc(connector_resource_opts) ->
+    "Connector resource options";
+desc(Struct) when is_list(Struct) ->
+    case string:split(Struct, "_") of
+        ["config", _] ->
+            ?DESC("desc_config");
+        [Method, _] when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+            ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+        _ ->
+            undefined
+    end;
 desc(_) ->
     undefined.
 
-connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
+connector_config(#{driver := rest} = Conf, #{name := Name, parse_confs := ParseConfs}) ->
     #{
         base_url := BaseUrl,
         authentication :=
@@ -199,7 +252,9 @@ connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
         <<"http">>,
         Name,
         WebhookConfig
-    ).
+    );
+connector_config(Conf, _) ->
+    Conf.
 
 proplists_without(Keys, List) ->
     [El || El = {K, _} <- List, not lists:member(K, Keys)].
@@ -211,8 +266,13 @@ resource_type() -> iotdb.
 
 callback_mode() -> async_if_possible.
 
+callback_mode(#{driver := rest}) ->
+    async_if_possible;
+callback_mode(#{driver := thrift}) ->
+    always_sync.
+
 -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
-on_start(InstanceId, #{iotdb_version := Version} = Config) ->
+on_start(InstanceId, #{driver := rest, iotdb_version := Version} = Config) ->
     %% [FIXME] The configuration passed in here is pre-processed and transformed
     %% in emqx_bridge_resource:parse_confs/2.
     case emqx_bridge_http_connector:on_start(InstanceId, Config) of
@@ -222,8 +282,78 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) ->
                 instance_id => InstanceId,
                 request => emqx_utils:redact(maps:get(request, State, <<>>))
             }),
-            ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
-            {ok, State#{iotdb_version => Version, channels => #{}}};
+            ?tp(iotdb_bridge_started, #{driver => rest, instance_id => InstanceId}),
+            {ok, State#{driver => rest, iotdb_version => Version, channels => #{}}};
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_iotdb_bridge",
+                instance_id => InstanceId,
+                request => emqx_utils:redact(maps:get(request, Config, <<>>)),
+                reason => Reason
+            }),
+            throw(failed_to_start_iotdb_bridge)
+    end;
+on_start(
+    InstanceId,
+    #{
+        driver := thrift,
+        protocol_version := ProtocolVsn,
+        server := Server,
+        pool_size := PoolSize,
+        ssl := SSL
+    } = Config
+) ->
+    IoTDBOpts0 = maps:with(['zoneId', username, password], Config),
+
+    Version =
+        case ProtocolVsn of
+            ?PROTOCOL_V1 ->
+                0;
+            ?PROTOCOL_V2 ->
+                1;
+            ?PROTOCOL_V3 ->
+                2
+        end,
+
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?THRIFT_HOST_OPTIONS),
+
+    TransportOpts =
+        case maps:get(enable, SSL) of
+            true ->
+                #{
+                    ssltransport => true,
+                    ssloptions => emqx_tls_lib:to_client_opts(SSL)
+                };
+            false ->
+                #{}
+        end,
+
+    IoTDBOpts = IoTDBOpts0#{
+        version => Version,
+        host => Host,
+        port => Port,
+        options => TransportOpts
+    },
+
+    Options = [
+        {pool_size, PoolSize},
+        {iotdb_options, IoTDBOpts}
+    ],
+
+    case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
+        ok ->
+            ?SLOG(info, #{
+                msg => "iotdb_bridge_started",
+                instance_id => InstanceId
+            }),
+
+            ?tp(iotdb_bridge_started, #{driver => thrift, instance_id => InstanceId}),
+
+            {ok, #{
+                driver => thrift,
+                iotdb_version => ProtocolVsn,
+                channels => #{}
+            }};
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
@@ -235,18 +365,26 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) ->
     end.
 
 -spec on_stop(manager_id(), state()) -> ok | {error, term()}.
-on_stop(InstanceId, State) ->
+on_stop(InstanceId, #{driver := rest} = State) ->
     ?SLOG(info, #{
         msg => "stopping_iotdb_bridge",
         connector => InstanceId
     }),
     Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
     ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
-    Res.
+    Res;
+on_stop(InstanceId, #{driver := thrift} = _State) ->
+    ?SLOG(info, #{
+        msg => "stopping_iotdb_bridge",
+        connector => InstanceId
+    }),
+
+    ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
+    emqx_resource_pool:stop(InstanceId).
 
 -spec on_get_status(manager_id(), state()) ->
     connected | connecting | {disconnected, state(), term()}.
-on_get_status(InstanceId, State) ->
+on_get_status(InstanceId, #{driver := rest} = State) ->
     Func = fun(Worker, Timeout) ->
         Request = {?IOTDB_PING_PATH, [], undefined},
         NRequest = emqx_bridge_http_connector:formalize_request(get, Request),
@@ -265,7 +403,27 @@ on_get_status(InstanceId, State) ->
                 {error, {unexpected_ping_result, Result}}
         end
     end,
-    emqx_bridge_http_connector:on_get_status(InstanceId, State, Func).
+    emqx_bridge_http_connector:on_get_status(InstanceId, State, Func);
+on_get_status(InstanceId, #{driver := thrift} = _State) ->
+    case emqx_resource_pool:health_check_workers(InstanceId, fun ?MODULE:do_get_status/1) of
+        true ->
+            ?status_connected;
+        false ->
+            ?status_disconnected
+    end.
+
+do_get_status(Conn) ->
+    case iotdb:ping(Conn) of
+        {ok, _} ->
+            true;
+        {error, _} ->
+            false
+    end.
+
+connect(Opts) ->
+    {iotdb_options, #{password := Password} = IoTDBOpts0} = lists:keyfind(iotdb_options, 1, Opts),
+    IoTDBOpts = IoTDBOpts0#{password := emqx_secret:unwrap(Password)},
+    iotdb:start_link(IoTDBOpts).
 
 -spec on_query(manager_id(), {send_message, map()}, state()) ->
     {ok, pos_integer(), [term()], term()}
@@ -287,8 +445,8 @@ on_query(
     case try_render_messages([Req], IoTDBVsn, Channels) of
         {ok, [IoTDBPayload]} ->
             handle_response(
-                emqx_bridge_http_connector:on_query(
-                    InstanceId, {ChannelId, IoTDBPayload}, State
+                do_on_query(
+                    InstanceId, ChannelId, IoTDBPayload, State
                 )
             );
         Error ->
@@ -301,7 +459,7 @@ on_query_async(
     InstanceId,
     {ChannelId, _Message} = Req,
     ReplyFunAndArgs0,
-    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+    #{driver := rest, iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -325,13 +483,27 @@ on_query_async(
             );
         Error ->
             Error
-    end.
+    end;
+on_query_async(
+    InstanceId,
+    Req,
+    _ReplyFunAndArgs0,
+    #{driver := thrift} = State
+) ->
+    ?SLOG(error, #{
+        msg => "iotdb_bridge_async_query_failed",
+        instance_id => InstanceId,
+        send_message => Req,
+        reason => ?THRIFT_NOT_SUPPORT_ASYNC_MSG,
+        state => emqx_utils:redact(State)
+    }),
+    {error, not_support}.
 
 on_batch_query_async(
     InstId,
     Requests,
     Callback,
-    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+    #{driver := rest, iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
     [{ChannelId, _Message} | _] = Requests,
@@ -361,8 +533,23 @@ on_batch_query_async(
             );
         Error ->
             Error
-    end.
+    end;
+on_batch_query_async(
+    InstanceId,
+    Req,
+    _ReplyFunAndArgs0,
+    #{driver := thrift} = State
+) ->
+    ?SLOG(error, #{
+        msg => "iotdb_bridge_async_query_failed",
+        instance_id => InstanceId,
+        send_message => Req,
+        reason => ?THRIFT_NOT_SUPPORT_ASYNC_MSG,
+        state => emqx_utils:redact(State)
+    }),
+    {error, not_support}.
 
+%% todo
 on_batch_query(
     InstId,
     [{ChannelId, _Message}] = Requests,
@@ -381,8 +568,8 @@ on_batch_query(
             lists:map(
                 fun(IoTDBPayload) ->
                     handle_response(
-                        emqx_bridge_http_connector:on_query(
-                            InstId, {ChannelId, IoTDBPayload}, State
+                        do_on_query(
+                            InstId, ChannelId, IoTDBPayload, State
                         )
                     )
                 end,
@@ -397,7 +584,7 @@ on_format_query_result(Result) ->
 
 on_add_channel(
     InstanceId,
-    #{iotdb_version := Version, channels := Channels} = OldState0,
+    #{driver := rest, iotdb_version := Version, channels := Channels} = OldState0,
     ChannelId,
     #{
         parameters := #{data := Data} = Parameter
@@ -429,6 +616,27 @@ on_add_channel(
                 InstanceId, OldState0, ChannelId, HTTPReq
             ),
 
+            %% update IoTDB channel
+            DeviceId = maps:get(device_id, Parameter, <<>>),
+            Channel = Parameter#{
+                device_id => emqx_placeholder:preproc_tmpl(DeviceId),
+                data := preproc_data_template(Data)
+            },
+            Channels2 = Channels#{ChannelId => Channel},
+            {ok, OldState#{channels := Channels2}}
+    end;
+on_add_channel(
+    _InstanceId,
+    #{driver := thrift, channels := Channels} = OldState,
+    ChannelId,
+    #{
+        parameters := #{data := Data} = Parameter
+    }
+) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        _ ->
             %% update IoTDB channel
             DeviceId = maps:get(device_id, Parameter, <<>>),
             Channel = Parameter#{
@@ -439,8 +647,11 @@ on_add_channel(
             {ok, OldState#{channels := Channels2}}
     end.
 
-on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
+on_remove_channel(InstanceId, #{driver := rest, channels := Channels} = OldState0, ChannelId) ->
     {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
+    Channels2 = maps:remove(ChannelId, Channels),
+    {ok, OldState#{channels => Channels2}};
+on_remove_channel(_InstanceId, #{driver := thrift, channels := Channels} = OldState, ChannelId) ->
     Channels2 = maps:remove(ChannelId, Channels),
     {ok, OldState#{channels => Channels2}}.
 
@@ -536,16 +747,16 @@ proc_data(
     ],
     Msg,
     Nows,
-    IotDbVsn,
+    IoTDbVsn,
     Acc
 ) ->
     DataType = list_to_binary(
         string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
     ),
     try
-        proc_data(T, Msg, Nows, IotDbVsn, [
+        proc_data(T, Msg, Nows, IoTDbVsn, [
             #{
-                timestamp => iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows),
+                timestamp => iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows),
                 measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
                 data_type => DataType,
                 value => proc_value(DataType, ValueTkn, Msg)
@@ -559,28 +770,28 @@ proc_data(
             ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
             {error, invalid_data}
     end;
-proc_data([], _Msg, _Nows, _IotDbVsn, Acc) ->
+proc_data([], _Msg, _Nows, _IoTDbVsn, Acc) ->
     {ok, lists:reverse(Acc)}.
 
-iot_timestamp(_IotDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
+iot_timestamp(_IoTDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
-iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows) ->
-    iot_timestamp(IotDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
+iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows) ->
+    iot_timestamp(IoTDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
 
 %% > v1.3.0 don't allow write nanoseconds nor microseconds
 iot_timestamp(?VSN_1_3_X, <<"now_us">>, #{now_ms := NowMs}) ->
     NowMs;
 iot_timestamp(?VSN_1_3_X, <<"now_ns">>, #{now_ms := NowMs}) ->
     NowMs;
-iot_timestamp(_IotDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
+iot_timestamp(_IoTDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
     NowUs;
-iot_timestamp(_IotDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
+iot_timestamp(_IoTDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
     NowNs;
-iot_timestamp(_IotDbVsn, Timestamp, #{now_ms := NowMs}) when
+iot_timestamp(_IoTDbVsn, Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
     NowMs;
-iot_timestamp(_IotDbVsn, Timestamp, _) when is_binary(Timestamp) ->
+iot_timestamp(_IoTDbVsn, Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
 proc_value(<<"TEXT">>, ValueTkn, Msg) ->
@@ -719,6 +930,10 @@ iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
     <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
     <<"isAligned">>;
+iotdb_field_key(is_aligned, Vsn) when
+    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
+->
+    'isAligned';
 iotdb_field_key(device_id, ?VSN_1_3_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_1_1_X) ->
@@ -727,6 +942,10 @@ iotdb_field_key(device_id, ?VSN_1_0_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_0_13_X) ->
     <<"deviceId">>;
+iotdb_field_key(device_id, Vsn) when
+    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
+->
+    'deviceId';
 iotdb_field_key(data_types, ?VSN_1_3_X) ->
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_1_1_X) ->
@@ -734,7 +953,11 @@ iotdb_field_key(data_types, ?VSN_1_1_X) ->
 iotdb_field_key(data_types, ?VSN_1_0_X) ->
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
-    <<"dataTypes">>.
+    <<"dataTypes">>;
+iotdb_field_key(data_types, Vsn) when
+    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
+->
+    dtypes.
 
 to_list(List) when is_list(List) -> List;
 to_list(Data) -> [Data].
@@ -756,6 +979,8 @@ handle_response({ok, Code, _Headers, Body}) ->
     {error, #{code => Code, body => Body}};
 handle_response({ok, Code, Body}) ->
     {error, #{code => Code, body => Body}};
+handle_response({ok, _} = Resp) ->
+    Resp;
 handle_response({error, _} = Error) ->
     Error.
 
@@ -849,3 +1074,8 @@ get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
 %% This is a self-describing message
 get_data_template(#{data := []}, Payloads) ->
     preproc_data_list(Payloads).
+
+do_on_query(InstanceId, ChannelId, Data, #{driver := rest} = State) ->
+    emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
+do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
+    ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).

+ 39 - 4
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector_info.erl

@@ -16,6 +16,10 @@
     api_schema/1
 ]).
 
+-define(CONNECTOR, emqx_bridge_iotdb_connector).
+-define(DRIVER_REST, "rest").
+-define(DRIVER_THRIFT, "thrift").
+
 type_name() ->
     iotdb.
 
@@ -31,7 +35,7 @@ config_transform_module() ->
 config_schema() ->
     {iotdb,
         hoconsc:mk(
-            hoconsc:map(name, hoconsc:ref(emqx_bridge_iotdb_connector, config)),
+            hoconsc:map(name, hoconsc:union(fun driver_union_selector/1)),
             #{
                 desc => <<"IoTDB Connector Config">>,
                 required => false
@@ -42,6 +46,37 @@ schema_module() ->
     emqx_bridge_iotdb_connector.
 
 api_schema(Method) ->
-    emqx_connector_schema:api_ref(
-        emqx_bridge_iotdb_connector, <<"iotdb">>, Method
-    ).
+    {<<"iotdb">>, hoconsc:union(mk_api_union_selector(Method))}.
+
+driver_union_selector(all_union_members) ->
+    [
+        ref(?DRIVER_REST, "config"),
+        ref(?DRIVER_THRIFT, "config")
+    ];
+driver_union_selector({value, Value}) ->
+    case Value of
+        #{<<"driver">> := <<"thrift">>} ->
+            [ref(?DRIVER_THRIFT, "config")];
+        _ ->
+            [ref(?DRIVER_REST, "config")]
+    end.
+
+mk_api_union_selector(Method) ->
+    fun
+        (all_union_members) ->
+            [
+                ref(?DRIVER_REST, Method),
+                ref(?DRIVER_THRIFT, Method)
+            ];
+        ({value, Value}) ->
+            case Value of
+                #{<<"driver">> := <<"thrift">>} ->
+                    [ref(?DRIVER_THRIFT, Method)];
+                _ ->
+                    [ref(?DRIVER_REST, Method)]
+            end
+    end.
+
+ref(Driver, Field) ->
+    Name = Field ++ "_" ++ Driver,
+    hoconsc:ref(?CONNECTOR, Name).

+ 93 - 30
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -21,15 +21,25 @@ all() ->
     [
         {group, iotdb110},
         {group, iotdb130},
-        {group, legacy}
+        {group, legacy},
+        {group, thrift}
     ].
 
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
+    Async = [
+        t_async_device_id_missing,
+        t_async_invalid_template,
+        t_async_query,
+        t_extract_device_id_from_rule_engine_message,
+        %%todo
+        t_sync_query_aggregated
+    ],
     [
         {iotdb110, AllTCs},
         {iotdb130, AllTCs},
-        {legacy, AllTCs}
+        {legacy, AllTCs},
+        {thrift, AllTCs -- Async}
     ].
 
 init_per_suite(Config) ->
@@ -65,6 +75,7 @@ init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
+                {rest_port, Port},
                 {proxy_name, ProxyName},
                 {iotdb_version, IotDbVersion},
                 {iotdb_rest_prefix, <<"/rest/v2/">>}
@@ -88,6 +99,7 @@ init_per_group(legacy = Type, Config0) ->
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
+                {rest_port, Port},
                 {proxy_name, ProxyName},
                 {iotdb_version, ?VSN_0_13_X},
                 {iotdb_rest_prefix, <<"/rest/v1/">>}
@@ -101,6 +113,30 @@ init_per_group(legacy = Type, Config0) ->
                     {skip, no_iotdb}
             end
     end;
+init_per_group(thrift = Type, Config0) ->
+    Host = os:getenv("IOTDB_THRIFT_HOST", "toxiproxy.emqx.net"),
+    Port = list_to_integer(os:getenv("IOTDB_THRIFT_PORT", "46667")),
+    ProxyName = "iotdb_thrift",
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
+            [
+                {bridge_host, Host},
+                {bridge_port, Port},
+                {rest_port, 48080},
+                {proxy_name, ProxyName},
+                {iotdb_version, ?PROTOCOL_V3},
+                {iotdb_rest_prefix, <<"/rest/v2/">>}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_iotdb);
+                _ ->
+                    {skip, no_iotdb}
+            end
+    end;
 init_per_group(_Group, Config) ->
     Config.
 
@@ -121,6 +157,7 @@ init_per_testcase(TestCase, Config0) ->
         (atom_to_binary(TestCase))/binary, UniqueNum/binary
     >>,
     {_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
+
     {_, ActionConfig} = action_config(Name, Config0),
     Config = [
         {connector_type, Type},
@@ -229,16 +266,14 @@ iotdb_request(Config, Path, Body) ->
     iotdb_request(Config, Path, Body, #{}).
 
 iotdb_request(Config, Path, Body, Opts) ->
-    _BridgeConfig =
-        #{
-            <<"base_url">> := BaseURL,
-            <<"authentication">> := #{
-                <<"username">> := Username,
-                <<"password">> := Password
-            }
-        } =
-        ?config(connector_config, Config),
-    ct:pal("bridge config: ~p", [_BridgeConfig]),
+    BridgeConfig = ?config(connector_config, Config),
+    Host = ?config(bridge_host, Config),
+    Port = ?config(rest_port, Config),
+    Username = <<"root">>,
+    Password = <<"root">>,
+    BaseURL = iotdb_server_url(Host, Port),
+
+    ct:pal("bridge config: ~p", [BridgeConfig]),
     URL = <<BaseURL/binary, Path/binary>>,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
     Headers = [
@@ -265,6 +300,8 @@ iotdb_query(Config, Query) ->
 
 is_success_check({ok, 200, _, Body}) ->
     ?assert(is_code(200, emqx_utils_json:decode(Body)));
+is_success_check({ok, _}) ->
+    ok;
 is_success_check(Other) ->
     throw(Other).
 
@@ -303,23 +340,46 @@ connector_config(Name, Config) ->
     Version = ?config(iotdb_version, Config),
     ServerURL = iotdb_server_url(Host, Port),
     ConfigString =
-        io_lib:format(
-            "connectors.~s.~s {\n"
-            "  enable = true\n"
-            "  base_url = \"~s\"\n"
-            "  iotdb_version = \"~s\"\n"
-            "  authentication = {\n"
-            "     username = \"root\"\n"
-            "     password = \"root\"\n"
-            "  }\n"
-            "}\n",
-            [
-                Type,
-                Name,
-                ServerURL,
-                Version
-            ]
-        ),
+        case ?config(test_group, Config) of
+            thrift ->
+                io_lib:format(
+                    "connectors.~s.~s {\n"
+                    "  enable = true\n"
+                    "  driver = \"thrift\"\n"
+                    "  server = \"~s:~p\"\n"
+                    "  protocol_version = \"~p\"\n"
+                    "  username = \"root\"\n"
+                    "  password = \"root\"\n"
+                    "  zoneId = \"Asia/Shanghai\"\n"
+                    "  ssl.enable = false\n"
+                    "}\n",
+                    [
+                        Type,
+                        Name,
+                        Host,
+                        Port,
+                        Version
+                    ]
+                );
+            _ ->
+                io_lib:format(
+                    "connectors.~s.~s {\n"
+                    "  enable = true\n"
+                    "  base_url = \"~s\"\n"
+                    "  iotdb_version = \"~s\"\n"
+                    "  authentication = {\n"
+                    "     username = \"root\"\n"
+                    "     password = \"root\"\n"
+                    "  }\n"
+                    "}\n",
+                    [
+                        Type,
+                        Name,
+                        ServerURL,
+                        Version
+                    ]
+                )
+        end,
     ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
     {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
 
@@ -556,7 +616,10 @@ t_async_invalid_template(Config) ->
     ).
 
 t_create_via_http(Config) ->
-    emqx_bridge_v2_testlib:t_create_via_http(Config).
+    emqx_bridge_v2_testlib:t_create_via_http(
+        Config,
+        thrift =:= ?config(test_group, Config)
+    ).
 
 t_start_stop(Config) ->
     emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped).

+ 45 - 11
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -352,7 +352,12 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
         end,
         RawConfig,
         ActionConnectorTuples
-    ).
+    );
+transform_old_style_bridges_to_connector_and_actions_of_type(
+    {_ConnectorType, #{type := ?MAP(_Name, _)}},
+    RawConfig
+) ->
+    RawConfig.
 
 transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) ->
     ConnectorFields = ?MODULE:fields(connectors),
@@ -636,10 +641,11 @@ status() ->
 -include_lib("hocon/include/hocon_types.hrl").
 schema_homogeneous_test() ->
     case
-        lists:filtermap(
-            fun({_Name, Schema}) ->
-                is_bad_schema(Schema)
+        lists:foldl(
+            fun({_Name, Schema}, Bads) ->
+                is_bad_schema(Schema, Bads)
             end,
+            [],
             fields(connectors)
         )
     of
@@ -649,7 +655,32 @@ schema_homogeneous_test() ->
             throw(List)
     end.
 
-is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
+is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}, Bads) ->
+    is_bad_schema_type(Module, TypeName, Bads);
+is_bad_schema(#{type := ?MAP(_, ?UNION(Types))}, Bads) when is_list(Types) ->
+    is_bad_schema_types(Types, Bads);
+is_bad_schema(#{type := ?MAP(_, ?UNION(Func))}, Bads) when is_function(Func, 1) ->
+    Types = Func(all_union_members),
+    is_bad_schema_types(Types, Bads).
+
+is_bad_schema_types(Types, Bads) ->
+    lists:foldl(
+        fun
+            (?R_REF(Module, TypeName), Acc) ->
+                is_bad_schema_type(Module, TypeName, Acc);
+            (Type, Acc) ->
+                [
+                    #{
+                        type => Type
+                    }
+                    | Acc
+                ]
+        end,
+        Bads,
+        Types
+    ).
+
+is_bad_schema_type(Module, TypeName, Bads) ->
     Fields = Module:fields(TypeName),
     ExpectedFieldNames = common_field_names(),
     MissingFields = lists:filter(
@@ -657,13 +688,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
     ),
     case MissingFields of
         [] ->
-            false;
+            Bads;
         _ ->
-            {true, #{
-                schema_module => Module,
-                type_name => TypeName,
-                missing_fields => MissingFields
-            }}
+            [
+                #{
+                    schema_module => Module,
+                    type_name => TypeName,
+                    missing_fields => MissingFields
+                }
+                | Bads
+            ]
     end.
 
 common_field_names() ->

+ 18 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -97,6 +97,7 @@
     %% get the callback mode of a specific module
     get_callback_mode/1,
     get_resource_type/1,
+    get_callback_mode/2,
     %% start the instance
     call_start/3,
     %% verify if the resource is working normally
@@ -160,7 +161,8 @@
     on_remove_channel/3,
     on_get_channels/1,
     query_mode/1,
-    on_format_query_result/1
+    on_format_query_result/1,
+    callback_mode/1
 ]).
 
 %% when calling emqx_resource:start/1
@@ -171,7 +173,10 @@
 -callback on_stop(resource_id(), resource_state()) -> term().
 
 %% when calling emqx_resource:get_callback_mode/1
--callback callback_mode() -> callback_mode().
+-callback callback_mode() -> callback_mode() | undefined.
+
+%% when calling emqx_resource:get_callback_mode/1
+-callback callback_mode(resource_state()) -> callback_mode().
 
 %% when calling emqx_resource:query/3
 -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
@@ -491,9 +496,20 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
 get_callback_mode(Mod) ->
     Mod:callback_mode().
 
+<<<<<<< variant A
 -spec get_resource_type(module()) -> resource_type().
 get_resource_type(Mod) ->
     Mod:resource_type().
+>>>>>>> variant B
+-spec get_callback_mode(module(), resource_state()) -> callback_mode() | undefined.
+get_callback_mode(Mod, State) ->
+    case erlang:function_exported(Mod, callback_mode, 1) of
+        true ->
+            Mod:callback_mode(State);
+        _ ->
+            undefined
+    end.
+======= end
 
 -spec call_start(resource_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.

+ 11 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -795,8 +795,10 @@ start_resource(Data, From) ->
             UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
             %% Perform an initial health_check immediately before transitioning into a connected state
             UpdatedData2 = add_channels(UpdatedData1),
+            UpdatedData3 = maybe_update_callback_mode(UpdatedData2),
+
             Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
-            {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions};
+            {next_state, ?state_connecting, update_state(UpdatedData3, Data), Actions};
         {error, Reason} = Err ->
             IsDryRun = emqx_resource:is_dry_run(ResId),
             ?SLOG(
@@ -836,6 +838,14 @@ add_channels(Data) ->
     ),
     Data#data{added_channels = NewChannels}.
 
+maybe_update_callback_mode(Data = #data{mod = ResourceType, state = ResourceState}) ->
+    case emqx_resource:get_callback_mode(ResourceType, ResourceState) of
+        undefined ->
+            Data;
+        CallMode ->
+            Data#data{callback_mode = CallMode}
+    end.
+
 add_channels_in_list([], Data) ->
     Data;
 add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->

+ 25 - 8
rel/i18n/emqx_bridge_iotdb_connector.hocon

@@ -12,17 +12,17 @@ auth_basic.desc:
 auth_basic.label:
 """Basic auth params"""
 
-config_auth_basic_username.desc:
-"""The username as configured at the IoTDB REST interface"""
+config_auth_username.desc:
+"""The username as configured at the IoTDB REST/Thrift interface"""
 
-config_auth_basic_username.label:
-  """HTTP Basic Auth Username"""
+config_auth_username.label:
+  """Auth Username"""
 
-config_auth_basic_password.desc:
-"""The password as configured at the IoTDB REST interface"""
+config_auth_password.desc:
+"""The password as configured at the IoTDB/Thrift REST interface"""
 
-config_auth_basic_password.label:
-"""HTTP Basic Auth Password"""
+config_auth_password.label:
+"""Auth Password"""
 
 config_base_url.desc:
 """The base URL of the external IoTDB service's REST interface."""
@@ -41,4 +41,21 @@ desc_config.desc:
 desc_config.label:
 """IoTDB Bridge Configuration"""
 
+config_driver.desc:
+"""The drivers for IoTDB connector, can be:
+- rest
+- thrift"""
+
+server.desc:
+"""The address of the IoTDB Thrift server (host:port)"""
+
+config_protocol_version.desc:
+"""The version of IoTDB Thrift protocol"""
+
+config_zoneId.desc:
+"""Timezone for IoTDB Thrift session"""
+
+pool_size.desc:
+"""Size for the IoTDB Thrift connection pool"""
+
 }