Procházet zdrojové kódy

feat(iotdb): implemented batch support for IoTDB

firest před 1 rokem
rodič
revize
9b3ca2d84f

+ 0 - 59
.ci/docker-compose-file/docker-compose-iotdb.yaml

@@ -30,65 +30,6 @@ services:
     networks:
       - emqx_bridge
 
-  iotdb_1_1_0:
-    container_name: iotdb110
-    hostname: iotdb110
-    image: apache/iotdb:1.1.0-standalone
-    restart: always
-    environment:
-      - enable_rest_service=true
-      - cn_internal_address=iotdb110
-      - cn_internal_port=10710
-      - cn_consensus_port=10720
-      - cn_target_config_node_list=iotdb110:10710
-      - dn_rpc_address=iotdb110
-      - dn_internal_address=iotdb110
-      - dn_rpc_port=6667
-      - dn_mpp_data_exchange_port=10740
-      - dn_schema_region_consensus_port=10750
-      - dn_data_region_consensus_port=10760
-      - dn_target_config_node_list=iotdb110:10710
-    # volumes:
-    #     - ./data:/iotdb/data
-    #     - ./logs:/iotdb/logs
-    expose:
-      - "18080"
-    # IoTDB's REST interface, uncomment for local testing
-    # ports:
-    #     - "18080:18080"
-    networks:
-      - emqx_bridge
-
-  iotdb_0_13:
-    container_name: iotdb013
-    hostname: iotdb013
-    image: apache/iotdb:0.13.4-node
-    restart: always
-    environment:
-      - enable_rest_service=true
-      - cn_internal_address=iotdb013
-      - cn_internal_port=10710
-      - cn_consensus_port=10720
-      - cn_target_config_node_list=iotdb013:10710
-      - dn_rpc_address=iotdb013
-      - dn_internal_address=iotdb013
-      - dn_rpc_port=6667
-      - dn_mpp_data_exchange_port=10740
-      - dn_schema_region_consensus_port=10750
-      - dn_data_region_consensus_port=10760
-      - dn_target_config_node_list=iotdb013:10710
-    volumes:
-      - ./iotdb013/iotdb-rest.properties:/iotdb/conf/iotdb-rest.properties
-    #     - ./data:/iotdb/data
-    #     - ./logs:/iotdb/logs
-    expose:
-      - "18080"
-    # IoTDB's REST interface, uncomment for local testing
-    # ports:
-    #     - "18080:18080"
-    networks:
-      - emqx_bridge
-
   iotdb-thrift:
     container_name: iotdb-thrift
     hostname: iotdb-thrift

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_iotdb, [
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
-    {vsn, "0.2.6"},
+    {vsn, "0.2.7"},
     {modules, [
         emqx_bridge_iotdb,
         emqx_bridge_iotdb_connector

+ 200 - 343
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -458,7 +458,7 @@ connect(Opts) ->
 on_query(
     InstanceId,
     {ChannelId, _Message} = Req,
-    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+    State
 ) ->
     ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -468,12 +468,10 @@ on_query(
         state => emqx_utils:redact(State)
     }),
 
-    case try_render_messages([Req], IoTDBVsn, Channels) of
-        {ok, [IoTDBPayload]} ->
+    case try_render_records([Req], State) of
+        {ok, Records} ->
             handle_response(
-                do_on_query(
-                    InstanceId, ChannelId, IoTDBPayload, State
-                )
+                do_on_query(InstanceId, ChannelId, Records, State)
             );
         Error ->
             Error
@@ -485,7 +483,7 @@ on_query_async(
     InstanceId,
     {ChannelId, _Message} = Req,
     ReplyFunAndArgs0,
-    #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
+    #{driver := restapi} = State
 ) ->
     ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -494,8 +492,8 @@ on_query_async(
         send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case try_render_messages([Req], IoTDBVsn, Channels) of
-        {ok, [IoTDBPayload]} ->
+    case try_render_records([Req], State) of
+        {ok, Records} ->
             ReplyFunAndArgs =
                 {
                     fun(Result) ->
@@ -505,7 +503,7 @@ on_query_async(
                     []
                 },
             emqx_bridge_http_connector:on_query_async(
-                InstanceId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
+                InstanceId, {ChannelId, Records}, ReplyFunAndArgs, State
             );
         Error ->
             Error
@@ -528,7 +526,7 @@ on_batch_query_async(
     InstId,
     Requests,
     Callback,
-    #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
+    #{driver := restapi} = State
 ) ->
     ?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
     [{ChannelId, _Message} | _] = Requests,
@@ -538,8 +536,8 @@ on_batch_query_async(
         send_message => Requests,
         state => emqx_utils:redact(State)
     }),
-    case try_render_messages(Requests, IoTDBVsn, Channels) of
-        {ok, IoTDBPayloads} ->
+    case try_render_records(Requests, State) of
+        {ok, Records} ->
             ReplyFunAndArgs =
                 {
                     fun(Result) ->
@@ -548,13 +546,10 @@ on_batch_query_async(
                     end,
                     []
                 },
-            lists:map(
-                fun(IoTDBPayload) ->
-                    emqx_bridge_http_connector:on_query_async(
-                        InstId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
-                    )
-                end,
-                IoTDBPayloads
+            handle_response(
+                emqx_bridge_http_connector:on_query_async(
+                    InstId, {ChannelId, Records}, ReplyFunAndArgs, State
+                )
             );
         Error ->
             Error
@@ -573,15 +568,10 @@ on_batch_query_async(
     }),
     {error, not_support}.
 
-%% TODO:
-%% Currently, the batch mode is not really `batch` for the Rest API and Thrift drivers.
-%% 1. For Rest API we need to upgrade from v1 to v2 which has a batch endpoint `insertRecords`,
-%%    and we should take care to ensure this is not a breaking change
-%% 2. For the Thrift, we can use the `tSInsertTabletsReq` or `tSInsertRecordsReq` protocol
 on_batch_query(
     InstId,
     [{ChannelId, _Message} | _] = Requests,
-    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+    State
 ) ->
     ?tp(iotdb_bridge_on_batch_query, #{instance_id => InstId}),
     ?SLOG(debug, #{
@@ -591,17 +581,10 @@ on_batch_query(
         state => emqx_utils:redact(State)
     }),
 
-    case try_render_messages(Requests, IoTDBVsn, Channels) of
-        {ok, IoTDBPayloads} ->
-            lists:map(
-                fun(IoTDBPayload) ->
-                    handle_response(
-                        do_on_query(
-                            InstId, ChannelId, IoTDBPayload, State
-                        )
-                    )
-                end,
-                IoTDBPayloads
+    case try_render_records(Requests, State) of
+        {ok, Records} ->
+            handle_response(
+                do_on_query(InstId, ChannelId, Records, State)
             );
         Error ->
             Error
@@ -610,6 +593,15 @@ on_batch_query(
 on_format_query_result(Result) ->
     emqx_bridge_http_connector:on_format_query_result(Result).
 
+on_add_channel(
+    _InstanceId,
+    _State0,
+    _ChannelId,
+    #{
+        parameters := #{data := []} = _Parameter
+    }
+) ->
+    {error, <<"The data template cannot be empty">>};
 on_add_channel(
     InstanceId,
     #{driver := restapi, iotdb_version := Version, channels := Channels} = OldState0,
@@ -623,45 +615,41 @@ on_add_channel(
             {error, already_exists};
         _ ->
             %% update HTTP channel
-            InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
-            InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
-
-            Path =
-                case Version of
-                    ?VSN_1_1_X -> InsertTabletPathV2;
-                    ?VSN_1_3_X -> InsertTabletPathV2;
-                    _ -> InsertTabletPathV1
-                end,
-
-            HTTPReq = #{
-                parameters => Parameter#{
-                    path => Path,
-                    method => <<"post">>
-                }
-            },
-
-            {ok, OldState} = emqx_bridge_http_connector:on_add_channel(
-                InstanceId, OldState0, ChannelId, HTTPReq
-            ),
-
-            %% update IoTDB channel
-            DeviceId = maps:get(device_id, Parameter, <<>>),
-            Channel = Parameter#{
-                device_id => emqx_placeholder:preproc_tmpl(DeviceId),
-                data := preproc_data_template(Data)
-            },
-            Channels2 = Channels#{ChannelId => Channel},
-            {ok, OldState#{channels := Channels2}}
+            case Version of
+                ?VSN_1_3_X ->
+                    Path = <<"rest/v2/insertRecords">>,
+                    HTTPReq = #{
+                        parameters => Parameter#{
+                            path => Path,
+                            method => <<"post">>
+                        }
+                    },
+
+                    {ok, OldState} = emqx_bridge_http_connector:on_add_channel(
+                        InstanceId, OldState0, ChannelId, HTTPReq
+                    ),
+
+                    %% update IoTDB channel
+                    DeviceId = maps:get(device_id, Parameter, <<>>),
+                    Channel = Parameter#{
+                        device_id => emqx_placeholder:preproc_tmpl(DeviceId),
+                        data := preproc_data_template(Data)
+                    },
+                    Channels2 = Channels#{ChannelId => Channel},
+                    {ok, OldState#{channels := Channels2}};
+                _ ->
+                    {error, <<"REST API only supports IoTDB 1.3.x and later">>}
+            end
     end;
 on_add_channel(
     _InstanceId,
     #{driver := thrift},
     _ChannelId,
     #{
-        resource_opts := #{query_mode := QueryMode, batch_size := BatchSize}
+        resource_opts := #{query_mode := async}
     }
-) when QueryMode =:= async; BatchSize > 1 ->
-    {error, <<"Thrift does not support async or batch mode">>};
+) ->
+    {error, <<"Thrift does not support async mode">>};
 on_add_channel(
     _InstanceId,
     #{driver := thrift, channels := Channels} = OldState,
@@ -678,7 +666,9 @@ on_add_channel(
             DeviceId = maps:get(device_id, Parameter, <<>>),
             Channel = Parameter#{
                 device_id => emqx_placeholder:preproc_tmpl(DeviceId),
-                data := preproc_data_template(Data)
+                %% The template process will reverse the order of the values
+                %% so we can reverse the template here to reduce some runtime cost                                 %%
+                data := lists:reverse(preproc_data_template(Data))
             },
             Channels2 = Channels#{ChannelId => Channel},
             {ok, OldState#{channels := Channels2}}
@@ -717,118 +707,25 @@ get_payload(Payload) ->
 parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
     ParsedPayload;
 parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
-    emqx_utils_json:decode(UnparsedPayload);
-parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) ->
-    lists:map(fun parse_payload/1, UnparsedPayloads).
-
-preproc_data_list(DataList) ->
-    lists:foldl(
-        fun preproc_data/2,
-        [],
-        DataList
-    ).
-
-preproc_data(
-    #{
-        <<"measurement">> := Measurement,
-        <<"data_type">> := DataType,
-        <<"value">> := Value
-    } = Data,
-    Acc
-) ->
-    [
-        #{
-            timestamp => maybe_preproc_tmpl(
-                maps:get(<<"timestamp">>, Data, <<"now">>)
-            ),
-            measurement => emqx_placeholder:preproc_tmpl(Measurement),
-            data_type => emqx_placeholder:preproc_tmpl(DataType),
-            value => maybe_preproc_tmpl(Value)
-        }
-        | Acc
-    ];
-preproc_data(_NoMatch, Acc) ->
-    ?SLOG(
-        warning,
-        #{
-            msg => "iotdb_bridge_preproc_data_failed",
-            required_fields => ['measurement', 'data_type', 'value'],
-            received => _NoMatch
-        }
-    ),
-    Acc.
-
-maybe_preproc_tmpl(Value) when is_binary(Value) ->
-    emqx_placeholder:preproc_tmpl(Value);
-maybe_preproc_tmpl(Value) ->
-    Value.
-
-proc_data(PreProcessedData, Msg, IoTDBVsn) ->
-    NowNS = erlang:system_time(nanosecond),
-    Nows = #{
-        now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
-        now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
-        now_ns => NowNS
-    },
-    proc_data(PreProcessedData, Msg, Nows, IoTDBVsn, []).
+    emqx_utils_json:decode(UnparsedPayload).
 
-proc_data(
-    [
-        #{
-            timestamp := TimestampTkn,
-            measurement := Measurement,
-            data_type := DataType0,
-            value := ValueTkn
-        }
-        | T
-    ],
-    Msg,
-    Nows,
-    IoTDbVsn,
-    Acc
-) ->
-    DataType = list_to_binary(
-        string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
-    ),
-    try
-        proc_data(T, Msg, Nows, IoTDbVsn, [
-            #{
-                timestamp => iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows),
-                measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
-                data_type => DataType,
-                value => proc_value(DataType, ValueTkn, Msg)
-            }
-            | Acc
-        ])
-    catch
-        throw:Reason ->
-            {error, Reason};
-        Error:Reason:Stacktrace ->
-            ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
-            {error, invalid_data}
-    end;
-proc_data([], _Msg, _Nows, _IoTDbVsn, Acc) ->
-    {ok, lists:reverse(Acc)}.
-
-iot_timestamp(_IoTDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
+iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
-iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows) ->
-    iot_timestamp(IoTDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
+iot_timestamp(TimestampTkn, Msg, Nows) ->
+    iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
 
-%% > v1.3.0 don't allow write nanoseconds nor microseconds
-iot_timestamp(?VSN_1_3_X, <<"now_us">>, #{now_ms := NowMs}) ->
-    NowMs;
-iot_timestamp(?VSN_1_3_X, <<"now_ns">>, #{now_ms := NowMs}) ->
-    NowMs;
-iot_timestamp(_IoTDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
+%% IoTDB allows us/ms/ns,
+%% but an instance only supports one time unit,
+%% and the time unit cannot be changed after the database is started.
+iot_timestamp(<<"now_us">>, #{now_us := NowUs}) ->
     NowUs;
-iot_timestamp(_IoTDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
+iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) ->
     NowNs;
-iot_timestamp(_IoTDbVsn, Timestamp, #{now_ms := NowMs}) when
+iot_timestamp(Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
     NowMs;
-iot_timestamp(_IoTDbVsn, Timestamp, _) when is_binary(Timestamp) ->
+iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
 proc_value(<<"TEXT">>, ValueTkn, Msg) ->
@@ -893,117 +790,11 @@ convert_float(null) ->
 convert_float(undefined) ->
     null.
 
-make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
-    InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
-    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
-    maps:merge(Rows, #{
-        iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
-        iotdb_field_key(device_id, IoTDBVsn) => DeviceId
-    }).
-
-replace_dtypes(Rows0, IoTDBVsn) ->
-    {Types, Rows} = maps:take(dtypes, Rows0),
-    Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}.
-
-aggregate_rows(DataList, InitAcc) ->
-    lists:foldr(
-        fun(
-            #{
-                timestamp := Timestamp,
-                measurement := Measurement,
-                data_type := DataType,
-                value := Data
-            },
-            #{
-                timestamps := AccTs,
-                measurements := AccM,
-                dtypes := AccDt,
-                values := AccV
-            } = Acc
-        ) ->
-            Timestamps = [Timestamp | AccTs],
-            case index_of(Measurement, AccM) of
-                0 ->
-                    Acc#{
-                        timestamps => Timestamps,
-                        values => [pad_value(Data, length(AccTs)) | pad_existing_values(AccV)],
-                        measurements => [Measurement | AccM],
-                        dtypes => [DataType | AccDt]
-                    };
-                Index ->
-                    Acc#{
-                        timestamps => Timestamps,
-                        values => insert_value(Index, Data, AccV),
-                        measurements => AccM,
-                        dtypes => AccDt
-                    }
-            end
-        end,
-        InitAcc,
-        DataList
-    ).
-
-pad_value(Data, N) ->
-    [Data | lists:duplicate(N, null)].
-
-pad_existing_values(Values) ->
-    [[null | Value] || Value <- Values].
-
-index_of(E, List) ->
-    string:str(List, [E]).
-
-insert_value(_Index, _Data, []) ->
-    [];
-insert_value(1, Data, [Value | Values]) ->
-    [[Data | Value] | insert_value(0, Data, Values)];
-insert_value(Index, Data, [Value | Values]) ->
-    [[null | Value] | insert_value(Index - 1, Data, Values)].
-
-iotdb_field_key(is_aligned, ?VSN_1_3_X) ->
-    <<"is_aligned">>;
-iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
-    <<"is_aligned">>;
-iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
-    <<"is_aligned">>;
-iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
-    <<"isAligned">>;
-iotdb_field_key(is_aligned, Vsn) when
-    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
-->
-    'isAligned';
-iotdb_field_key(device_id, ?VSN_1_3_X) ->
-    <<"device">>;
-iotdb_field_key(device_id, ?VSN_1_1_X) ->
-    <<"device">>;
-iotdb_field_key(device_id, ?VSN_1_0_X) ->
-    <<"device">>;
-iotdb_field_key(device_id, ?VSN_0_13_X) ->
-    <<"deviceId">>;
-iotdb_field_key(device_id, Vsn) when
-    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
-->
-    'deviceId';
-iotdb_field_key(data_types, ?VSN_1_3_X) ->
-    <<"data_types">>;
-iotdb_field_key(data_types, ?VSN_1_1_X) ->
-    <<"data_types">>;
-iotdb_field_key(data_types, ?VSN_1_0_X) ->
-    <<"data_types">>;
-iotdb_field_key(data_types, ?VSN_0_13_X) ->
-    <<"dataTypes">>;
-iotdb_field_key(data_types, Vsn) when
-    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
-->
-    dtypes.
-
-to_list(List) when is_list(List) -> List;
-to_list(Data) -> [Data].
-
 %% If device_id is missing from the channel data, try to find it from the payload
-device_id(Message, Payloads, Channel) ->
+device_id(Message, Payload, Channel) ->
     case maps:get(device_id, Channel, []) of
         [] ->
-            maps:get(<<"device_id">>, hd(Payloads), undefined);
+            maps:get(<<"device_id">>, Payload, undefined);
         DeviceIdTkn ->
             emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
     end.
@@ -1046,76 +837,17 @@ preproc_data_template(DataList) ->
             #{
                 timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)),
                 measurement => emqx_placeholder:preproc_tmpl(Measurement),
-                data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)),
+                data_type => Atom2Bin(DataType),
                 value => emqx_placeholder:preproc_tmpl(Value)
             }
         end,
         DataList
     ).
 
-try_render_messages([{ChannelId, _} | _] = Msgs, IoTDBVsn, Channels) ->
-    case maps:find(ChannelId, Channels) of
-        {ok, Channel} ->
-            case do_render_message(Msgs, Channel, IoTDBVsn, #{}) of
-                RenderMsgs when is_map(RenderMsgs) ->
-                    {ok,
-                        lists:map(
-                            fun({{DeviceId, IsAligned}, DataList}) ->
-                                make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
-                            end,
-                            maps:to_list(RenderMsgs)
-                        )};
-                Error ->
-                    Error
-            end;
-        _ ->
-            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
-    end.
-
-do_render_message([], _Channel, _IoTDBVsn, Acc) ->
-    Acc;
-do_render_message([{_, Msg} | Msgs], Channel, IoTDBVsn, Acc) ->
-    case render_channel_message(Channel, IoTDBVsn, Msg) of
-        {ok, NewDataList, DeviceId, IsAligned} ->
-            Fun = fun(V) -> NewDataList ++ V end,
-            Acc1 = maps:update_with({DeviceId, IsAligned}, Fun, NewDataList, Acc),
-            do_render_message(Msgs, Channel, IoTDBVsn, Acc1);
-        Error ->
-            Error
-    end.
-
-render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
-    Payloads = to_list(parse_payload(get_payload(Message))),
-    case device_id(Message, Payloads, Channel) of
-        undefined ->
-            {error, device_id_missing};
-        DeviceId ->
-            case get_data_template(Channel, Payloads) of
-                [] ->
-                    {error, invalid_template};
-                DataTemplate ->
-                    case proc_data(DataTemplate, Message, IoTDBVsn) of
-                        {ok, DataList} ->
-                            {ok, DataList, DeviceId, IsAligned};
-                        Error ->
-                            Error
-                    end
-            end
-    end.
-
-%% Get the message template.
-%% In order to be compatible with 4.4, the template version has higher priority
-%% This is a template, using it
-get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
-    Data;
-%% This is a self-describing message
-get_data_template(#{data := []}, Payloads) ->
-    preproc_data_list(Payloads).
-
 do_on_query(InstanceId, ChannelId, Data, #{driver := restapi} = State) ->
     emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
 do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
-    ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).
+    ecpool:pick_and_do(InstanceId, {iotdb, insert_records, [Data]}, no_handover).
 
 %% 1. The default timeout in Thrift is `infinity`, but it may cause stuck
 %% 2. The schema of `timeout` accepts a zero value, but the Thrift driver not
@@ -1138,3 +870,128 @@ normalize_thrift_timeout(Timeouts) ->
         end,
         Timeouts
     ).
+
+%%-------------------------------------------------------------------------------------
+%% batch
+%%-------------------------------------------------------------------------------------
+try_render_records([{ChannelId, _} | _] = Msgs, #{driver := Driver, channels := Channels}) ->
+    case maps:find(ChannelId, Channels) of
+        {ok, #{is_aligned := IsAligned} = Channel} ->
+            EmptyRecords = #{
+                timestamps => [],
+                measurements_list => [],
+                data_types_list => [],
+                values_list => [],
+                devices => [],
+                is_aligned_name(Driver) => IsAligned
+            },
+            do_render_record(Msgs, Channel, EmptyRecords);
+        _ ->
+            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
+    end.
+
+do_render_record([], _Channel, Acc) ->
+    {ok, Acc};
+do_render_record([{_, Msg} | Msgs], Channel, Acc) ->
+    case render_channel_record(Channel, Msg) of
+        {ok, Record} ->
+            do_render_record(Msgs, Channel, append_record(Record, Acc));
+        Error ->
+            Error
+    end.
+
+render_channel_record(#{data := DataTemplate} = Channel, Msg) ->
+    Payload = parse_payload(get_payload(Msg)),
+    case device_id(Msg, Payload, Channel) of
+        undefined ->
+            {error, device_id_missing};
+        DeviceId ->
+            #{timestamp := TimestampTkn} = hd(DataTemplate),
+            NowNS = erlang:system_time(nanosecond),
+            Nows = #{
+                now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
+                now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
+                now_ns => NowNS
+            },
+            case
+                proc_record_data(
+                    DataTemplate,
+                    Msg,
+                    [],
+                    [],
+                    []
+                )
+            of
+                {ok, MeasurementAcc, TypeAcc, ValueAcc} ->
+                    {ok, #{
+                        timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
+                        measurements => MeasurementAcc,
+                        data_types => TypeAcc,
+                        values => ValueAcc,
+                        device_id => DeviceId
+                    }};
+                Error ->
+                    Error
+            end
+    end.
+
+proc_record_data(
+    [
+        #{
+            measurement := Measurement,
+            data_type := DataType,
+            value := ValueTkn
+        }
+        | T
+    ],
+    Msg,
+    MeasurementAcc,
+    TypeAcc,
+    ValueAcc
+) ->
+    try
+        proc_record_data(
+            T,
+            Msg,
+            [emqx_placeholder:proc_tmpl(Measurement, Msg) | MeasurementAcc],
+            [DataType | TypeAcc],
+            [proc_value(DataType, ValueTkn, Msg) | ValueAcc]
+        )
+    catch
+        throw:Reason ->
+            {error, Reason};
+        Error:Reason:Stacktrace ->
+            ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
+            {error, invalid_data}
+    end;
+proc_record_data([], _Msg, MeasurementAcc, TypeAcc, ValueAcc) ->
+    {ok, MeasurementAcc, TypeAcc, ValueAcc}.
+
+append_record(
+    #{
+        timestamp := Ts,
+        measurements := Measurements,
+        data_types := DataTypes,
+        values := Vals,
+        device_id := DeviceId
+    },
+    #{
+        timestamps := TsL,
+        measurements_list := MeasL,
+        data_types_list := DtL,
+        values_list := ValL,
+        devices := DevL
+    } = Records
+) ->
+    Records#{
+        timestamps := [Ts | TsL],
+        measurements_list := [Measurements | MeasL],
+        data_types_list := [DataTypes | DtL],
+        values_list := [Vals | ValL],
+        devices := [DeviceId | DevL]
+    }.
+
+is_aligned_name(restapi) ->
+    is_aligned;
+is_aligned_name(thrift) ->
+    'isAligned'.

+ 32 - 171
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -19,9 +19,7 @@
 
 all() ->
     [
-        {group, iotdb110},
         {group, iotdb130},
-        {group, legacy},
         {group, thrift}
     ].
 
@@ -29,16 +27,11 @@ groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE) -- [t_thrift_auto_recon],
     Async = [
         t_async_device_id_missing,
-        t_async_invalid_template,
         t_async_query,
-        t_extract_device_id_from_rule_engine_message,
-        %%todo
-        t_sync_query_aggregated
+        t_extract_device_id_from_rule_engine_message
     ],
     [
-        {iotdb110, AllTCs},
         {iotdb130, AllTCs},
-        {legacy, AllTCs},
         {thrift, (AllTCs -- Async) ++ [t_thrift_auto_recon]}
     ].
 
@@ -60,14 +53,11 @@ init_per_suite(Config) ->
 end_per_suite(Config) ->
     emqx_bridge_v2_testlib:end_per_suite(Config).
 
-init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
+init_per_group(iotdb130 = Type, Config0) ->
     Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"),
     ProxyName = atom_to_list(Type),
-    {IotDbVersion, DefaultPort} =
-        case Type of
-            iotdb110 -> {?VSN_1_1_X, "18080"};
-            iotdb130 -> {?VSN_1_3_X, "28080"}
-        end,
+    IotDbVersion = ?VSN_1_3_X,
+    DefaultPort = "28080",
     Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", DefaultPort)),
     case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
@@ -89,30 +79,6 @@ init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
                     {skip, no_iotdb}
             end
     end;
-init_per_group(legacy = Type, Config0) ->
-    Host = os:getenv("IOTDB_LEGACY_HOST", "toxiproxy.emqx.net"),
-    Port = list_to_integer(os:getenv("IOTDB_LEGACY_PORT", "38080")),
-    ProxyName = "iotdb013",
-    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
-        true ->
-            Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
-            [
-                {bridge_host, Host},
-                {bridge_port, Port},
-                {rest_port, Port},
-                {proxy_name, ProxyName},
-                {iotdb_version, ?VSN_0_13_X},
-                {iotdb_rest_prefix, <<"/rest/v1/">>}
-                | Config
-            ];
-        false ->
-            case os:getenv("IS_CI") of
-                "yes" ->
-                    throw(no_iotdb);
-                _ ->
-                    {skip, no_iotdb}
-            end
-    end;
 init_per_group(thrift = Type, Config0) ->
     Host = os:getenv("IOTDB_THRIFT_HOST", "toxiproxy.emqx.net"),
     Port = list_to_integer(os:getenv("IOTDB_THRIFT_PORT", "46667")),
@@ -140,11 +106,7 @@ init_per_group(thrift = Type, Config0) ->
 init_per_group(_Group, Config) ->
     Config.
 
-end_per_group(Group, Config) when
-    Group =:= iotdb110;
-    Group =:= iotdb130;
-    Group =:= legacy
-->
+end_per_group(iotdb130, Config) ->
     emqx_bridge_v2_testlib:end_per_group(Config),
     ok;
 end_per_group(_Group, _Config) ->
@@ -318,13 +280,26 @@ is_error_check(Reason) ->
 action_config(TestCase, Name, Config) ->
     Type = ?config(bridge_type, Config),
     QueryMode = query_mode(TestCase),
+    DataTemplate =
+        case TestCase of
+            t_template ->
+                <<"">>;
+            _ ->
+                emqx_utils_json:encode(
+                    #{
+                        <<"measurement">> => <<"${payload.measurement}">>,
+                        <<"data_type">> => test_case_data_type(TestCase),
+                        <<"value">> => <<"${payload.value}">>
+                    }
+                )
+        end,
     ConfigString =
         io_lib:format(
             "actions.~s.~s {\n"
             "  enable = true\n"
             "  connector = \"~s\"\n"
             "  parameters = {\n"
-            "     data = []\n"
+            "     data = [~s]\n"
             "  }\n"
             "  resource_opts = {\n"
             "     query_mode = \"~s\"\n"
@@ -334,6 +309,7 @@ action_config(TestCase, Name, Config) ->
                 Type,
                 Name,
                 Name,
+                DataTemplate,
                 QueryMode
             ]
         ),
@@ -445,104 +421,6 @@ t_async_query(Config) ->
         emqx_utils_json:decode(IoTDBResult)
     ).
 
-t_sync_query_aggregated(Config) ->
-    DeviceId = iotdb_device(Config),
-    MS = erlang:system_time(millisecond) - 5000,
-    Payload = [
-        make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000),
-        make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000),
-        make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000),
-        make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)),
-        make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000),
-        make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000),
-        make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000),
-        %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
-        (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
-
-        make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", MS - 6000),
-        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, MS - 5000),
-        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, MS - 4000),
-        make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", MS - 3000),
-        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, MS - 2000),
-        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, MS - 1000),
-
-        make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", MS + 1000),
-        make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, MS + 1000),
-        make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, MS + 1000),
-        make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", MS + 1000),
-        make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", MS + 1000),
-        make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", MS + 1000),
-        make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", MS + 1000),
-        make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, MS + 1000),
-        make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, MS + 1000),
-        make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", MS + 1000),
-        make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", MS + 1000),
-        make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", MS + 1000),
-        make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, MS + 1000),
-
-        make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", MS + 1000)
-    ],
-    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
-    ok = emqx_bridge_v2_testlib:t_sync_query(
-        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
-    ),
-
-    Time = integer_to_binary(MS - 20000),
-    %% check weight
-    QueryWeight = <<"select weight from ", DeviceId/binary, " where time > ", Time/binary>>,
-    {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
-    ?assertMatch(
-        #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
-        emqx_utils_json:decode(ResultWeight)
-    ),
-    %% [FIXME] https://github.com/apache/iotdb/issues/12375
-    %% null don't seem to be supported by IoTDB insertTablet when 1.3.0
-    case ?config(iotdb_version, Config) of
-        ?VSN_1_3_X ->
-            skip;
-        _ ->
-            %% check rest ts = MS + 1000
-            CheckTime = integer_to_binary(MS + 1000),
-            QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>,
-            {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
-            #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
-                ResultRest
-            ),
-            Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
-            Exp = #{
-                exp(DeviceId, "charged") => true,
-                exp(DeviceId, "floated") => true,
-                exp(DeviceId, "started") => true,
-                exp(DeviceId, "stoked") => true,
-                exp(DeviceId, "enriched") => true,
-                exp(DeviceId, "gutted") => true,
-                exp(DeviceId, "drained") => false,
-                exp(DeviceId, "toasted") => false,
-                exp(DeviceId, "uncharted") => false,
-                exp(DeviceId, "dazzled") => false,
-                exp(DeviceId, "unplugged") => false,
-                exp(DeviceId, "unraveled") => false,
-                exp(DeviceId, "undecided") => null,
-                exp(DeviceId, "foo") => <<"bar">>,
-                exp(DeviceId, "temp") => null,
-                exp(DeviceId, "weight") => null
-            },
-            ?assertEqual(Exp, Results),
-
-            %% check temp
-            QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>,
-            {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
-            ?assertMatch(
-                #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]},
-                emqx_utils_json:decode(ResultTemp)
-            )
-    end,
-    ok.
-
-exp(Dev, M0) ->
-    M = s_to_b(M0),
-    <<Dev/binary, ".", M/binary>>.
-
 t_sync_query_fail(Config) ->
     DeviceId = iotdb_device(Config),
     Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
@@ -598,14 +476,6 @@ t_extract_device_id_from_rule_engine_message(Config) ->
     ),
     ok.
 
-t_sync_invalid_template(Config) ->
-    emqx_bridge_v2_testlib:t_sync_query(
-        Config,
-        make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
-        is_error_check(invalid_template),
-        iotdb_bridge_on_query
-    ).
-
 t_async_device_id_missing(Config) ->
     emqx_bridge_v2_testlib:t_async_query(
         Config,
@@ -614,14 +484,6 @@ t_async_device_id_missing(Config) ->
         iotdb_bridge_on_query_async
     ).
 
-t_async_invalid_template(Config) ->
-    emqx_bridge_v2_testlib:t_async_query(
-        Config,
-        make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
-        is_error_check(invalid_template),
-        iotdb_bridge_on_query_async
-    ).
-
 t_create_via_http(Config) ->
     emqx_bridge_v2_testlib:t_create_via_http(
         Config,
@@ -706,13 +568,10 @@ t_template(Config) ->
     Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
     MessageF1 = make_message_fun(Topic, Payload1),
 
-    is_success_check(
+    is_error_check(
         emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
     ),
 
-    {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
-    ?assertMatch(#{<<"values">> := [[true]]}, emqx_utils_json:decode(Res1_1)),
-
     iotdb_reset(Config, DeviceId),
     iotdb_reset(Config, TemplateDeviceId),
 
@@ -759,15 +618,6 @@ t_sync_query_case(Config) ->
         emqx_utils_json:decode(IoTDBResult)
     ).
 
-t_sync_query_invalid_type(Config) ->
-    DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
-    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
-    IsInvalidType = fun(Result) -> ?assertMatch({error, #{reason := invalid_type}}, Result) end,
-    ok = emqx_bridge_v2_testlib:t_sync_query(
-        Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
-    ).
-
 t_sync_query_unmatched_type(Config) ->
     DeviceId = iotdb_device(Config),
     Payload = make_iotdb_payload(DeviceId, "temp", "BOOLEAN", "not boolean"),
@@ -803,3 +653,14 @@ query_mode(TestCase) ->
         _ ->
             sync
     end.
+
+exp(Dev, M0) ->
+    M = s_to_b(M0),
+    <<Dev/binary, ".", M/binary>>.
+
+test_case_data_type(t_device_id) ->
+    <<"BOOLEAN">>;
+test_case_data_type(t_sync_query_unmatched_type) ->
+    <<"BOOLEAN">>;
+test_case_data_type(_) ->
+    <<"INT32">>.

+ 6 - 0
changes/ee/breaking-14295.en.md

@@ -0,0 +1,6 @@
+Refactored IoTD connector, the following are no longer supported:
+- The self-describing template has been removed
+- One rule can only carry one `payload`, arrays of payloads are no longer supported
+- The `data type` is now a plain value, not a template value
+- The REST API driver only supports IoTDB 1.3.x and later
+- Thrift driver has add support for "batch" mode