Просмотр исходного кода

Merge pull request #12898 from emqx/iotdb-1.3.0

feat: support iotdb 1.3.0
JianBo He 1 год назад
Родитель
Сommit
69bdcd2f24

+ 37 - 8
.ci/docker-compose-file/docker-compose-iotdb.yaml

@@ -1,24 +1,53 @@
 version: '3.9'
 
 services:
-  iotdb:
-    container_name: iotdb
-    hostname: iotdb
+  iotdb_1_3_0:
+    container_name: iotdb130
+    hostname: iotdb130
+    image: apache/iotdb:1.3.0-standalone
+    restart: always
+    environment:
+      - enable_rest_service=true
+      - cn_internal_address=iotdb130
+      - cn_internal_port=10710
+      - cn_consensus_port=10720
+      - cn_seed_config_node=iotdb130:10710
+      - dn_rpc_address=iotdb130
+      - dn_internal_address=iotdb130
+      - dn_rpc_port=6667
+      - dn_mpp_data_exchange_port=10740
+      - dn_schema_region_consensus_port=10750
+      - dn_data_region_consensus_port=10760
+      - dn_seed_config_node=iotdb130:10710
+    # volumes:
+    #     - ./data:/iotdb/data
+    #     - ./logs:/iotdb/logs
+    expose:
+      - "18080"
+    # IoTDB's REST interface, uncomment for local testing
+    # ports:
+    #     - "18080:18080"
+    networks:
+      - emqx_bridge
+
+  iotdb_1_1_0:
+    container_name: iotdb110
+    hostname: iotdb110
     image: apache/iotdb:1.1.0-standalone
     restart: always
     environment:
       - enable_rest_service=true
-      - cn_internal_address=iotdb
+      - cn_internal_address=iotdb110
       - cn_internal_port=10710
       - cn_consensus_port=10720
-      - cn_target_config_node_list=iotdb:10710
-      - dn_rpc_address=iotdb
-      - dn_internal_address=iotdb
+      - cn_target_config_node_list=iotdb110:10710
+      - dn_rpc_address=iotdb110
+      - dn_internal_address=iotdb110
       - dn_rpc_port=6667
       - dn_mpp_data_exchange_port=10740
       - dn_schema_region_consensus_port=10750
       - dn_data_region_consensus_port=10760
-      - dn_target_config_node_list=iotdb:10710
+      - dn_target_config_node_list=iotdb110:10710
     # volumes:
     #     - ./data:/iotdb/data
     #     - ./logs:/iotdb/logs

+ 8 - 2
.ci/docker-compose-file/toxiproxy.json

@@ -139,9 +139,15 @@
     "enabled": true
   },
   {
-    "name": "iotdb",
+    "name": "iotdb110",
     "listen": "0.0.0.0:18080",
-    "upstream": "iotdb:18080",
+    "upstream": "iotdb110:18080",
+    "enabled": true
+  },
+  {
+    "name": "iotdb130",
+    "listen": "0.0.0.0:28080",
+    "upstream": "iotdb130:18080",
     "enabled": true
   },
   {

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -705,7 +705,7 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     ),
     receive
         {result, Result} -> IsSuccessCheck(Result)
-    after 5_000 ->
+    after 8_000 ->
         throw(timeout)
     end,
     ok.

+ 2 - 0
apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl

@@ -5,6 +5,8 @@
 -ifndef(EMQX_BRIDGE_IOTDB_HRL).
 -define(EMQX_BRIDGE_IOTDB_HRL, true).
 
+-define(VSN_1_3_X, 'v1.3.x').
+-define(VSN_1_2_X, 'v1.2.x').
 -define(VSN_1_1_X, 'v1.1.x').
 -define(VSN_1_0_X, 'v1.0.x').
 -define(VSN_0_13_X, 'v0.13.x').

+ 4 - 15
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -66,12 +66,7 @@ fields(action_config) ->
         ]
     );
 fields(action_resource_opts) ->
-    lists:filter(
-        fun({K, _V}) ->
-            not lists:member(K, unsupported_opts())
-        end,
-        emqx_bridge_v2_schema:action_resource_opts_fields()
-    );
+    emqx_bridge_v2_schema:action_resource_opts_fields();
 fields(action_parameters) ->
     [
         {is_aligned,
@@ -152,7 +147,7 @@ fields("get_bridge_v2") ->
 fields("config") ->
     basic_config() ++ request_config();
 fields("creation_opts") ->
-    proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts"));
+    emqx_resource_schema:fields("creation_opts");
 fields(auth_basic) ->
     [
         {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
@@ -222,10 +217,10 @@ basic_config() ->
             )},
         {iotdb_version,
             mk(
-                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
+                hoconsc:enum([?VSN_1_3_X, ?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
                 #{
                     desc => ?DESC("config_iotdb_version"),
-                    default => ?VSN_1_1_X
+                    default => ?VSN_1_3_X
                 }
             )}
     ] ++ resource_creation_opts() ++
@@ -270,12 +265,6 @@ resource_creation_opts() ->
             )}
     ].
 
-unsupported_opts() ->
-    [
-        batch_size,
-        batch_time
-    ].
-
 %%-------------------------------------------------------------------------------------------------
 %% v2 examples
 %%-------------------------------------------------------------------------------------------------

+ 130 - 28
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -21,6 +21,8 @@
     on_get_status/2,
     on_query/3,
     on_query_async/4,
+    on_batch_query/3,
+    on_batch_query_async/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_get_channels/1,
@@ -94,7 +96,7 @@ connector_example_values() ->
         name => <<"iotdb_connector">>,
         type => iotdb,
         enable => true,
-        iotdb_version => ?VSN_1_1_X,
+        iotdb_version => ?VSN_1_3_X,
         authentication => #{
             <<"username">> => <<"root">>,
             <<"password">> => <<"******">>
@@ -133,10 +135,10 @@ fields("connection_fields") ->
             )},
         {iotdb_version,
             mk(
-                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
+                hoconsc:enum([?VSN_1_3_X, ?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
                 #{
                     desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"),
-                    default => ?VSN_1_1_X
+                    default => ?VSN_1_3_X
                 }
             )},
         {authentication,
@@ -280,8 +282,8 @@ on_query(
         state => emqx_utils:redact(State)
     }),
 
-    case try_render_message(Req, IoTDBVsn, Channels) of
-        {ok, IoTDBPayload} ->
+    case try_render_messages([Req], IoTDBVsn, Channels) of
+        {ok, [IoTDBPayload]} ->
             handle_response(
                 emqx_bridge_http_connector:on_query(
                     InstanceId, {ChannelId, IoTDBPayload}, State
@@ -306,8 +308,8 @@ on_query_async(
         send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case try_render_message(Req, IoTDBVsn, Channels) of
-        {ok, IoTDBPayload} ->
+    case try_render_messages([Req], IoTDBVsn, Channels) of
+        {ok, [IoTDBPayload]} ->
             ReplyFunAndArgs =
                 {
                     fun(Result) ->
@@ -323,6 +325,71 @@ on_query_async(
             Error
     end.
 
+on_batch_query_async(
+    InstId,
+    Requests,
+    Callback,
+    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+) ->
+    ?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
+    [{ChannelId, _Message} | _] = Requests,
+    ?SLOG(debug, #{
+        msg => "iotdb_bridge_on_query_batch_async_called",
+        instance_id => InstId,
+        send_message => Requests,
+        state => emqx_utils:redact(State)
+    }),
+    case try_render_messages(Requests, IoTDBVsn, Channels) of
+        {ok, IoTDBPayloads} ->
+            ReplyFunAndArgs =
+                {
+                    fun(Result) ->
+                        Response = handle_response(Result),
+                        emqx_resource:apply_reply_fun(Callback, Response)
+                    end,
+                    []
+                },
+            lists:map(
+                fun(IoTDBPayload) ->
+                    emqx_bridge_http_connector:on_query_async(
+                        InstId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
+                    )
+                end,
+                IoTDBPayloads
+            );
+        Error ->
+            Error
+    end.
+
+on_batch_query(
+    InstId,
+    [{ChannelId, _Message}] = Requests,
+    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+) ->
+    ?tp(iotdb_bridge_on_batch_query, #{instance_id => InstId}),
+    ?SLOG(debug, #{
+        msg => "iotdb_bridge_on_batch_query_called",
+        instance_id => InstId,
+        send_message => Requests,
+        state => emqx_utils:redact(State)
+    }),
+
+    case try_render_messages(Requests, IoTDBVsn, Channels) of
+        {ok, IoTDBPayloads} ->
+            lists:map(
+                fun(IoTDBPayload) ->
+                    handle_response(
+                        emqx_bridge_http_connector:on_query(
+                            InstId, {ChannelId, IoTDBPayload}, State
+                        )
+                    )
+                end,
+                IoTDBPayloads
+            );
+        Error ->
+            Error
+    end.
+
 on_add_channel(
     InstanceId,
     #{iotdb_version := Version, channels := Channels} = OldState0,
@@ -342,6 +409,7 @@ on_add_channel(
             Path =
                 case Version of
                     ?VSN_1_1_X -> InsertTabletPathV2;
+                    ?VSN_1_3_X -> InsertTabletPathV2;
                     _ -> InsertTabletPathV1
                 end,
 
@@ -442,14 +510,14 @@ maybe_preproc_tmpl(Value) when is_binary(Value) ->
 maybe_preproc_tmpl(Value) ->
     Value.
 
-proc_data(PreProcessedData, Msg) ->
+proc_data(PreProcessedData, Msg, IoTDBVsn) ->
     NowNS = erlang:system_time(nanosecond),
     Nows = #{
         now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
         now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
         now_ns => NowNS
     },
-    proc_data(PreProcessedData, Msg, Nows, []).
+    proc_data(PreProcessedData, Msg, Nows, IoTDBVsn, []).
 
 proc_data(
     [
@@ -463,15 +531,16 @@ proc_data(
     ],
     Msg,
     Nows,
+    IotDbVsn,
     Acc
 ) ->
     DataType = list_to_binary(
         string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
     ),
     try
-        proc_data(T, Msg, Nows, [
+        proc_data(T, Msg, Nows, IotDbVsn, [
             #{
-                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
+                timestamp => iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows),
                 measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
                 data_type => DataType,
                 value => proc_value(DataType, ValueTkn, Msg)
@@ -485,23 +554,28 @@ proc_data(
             ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
             {error, invalid_data}
     end;
-proc_data([], _Msg, _Nows, Acc) ->
+proc_data([], _Msg, _Nows, _IotDbVsn, Acc) ->
     {ok, lists:reverse(Acc)}.
 
-iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
+iot_timestamp(_IotDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
-iot_timestamp(TimestampTkn, Msg, Nows) ->
-    iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
+iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows) ->
+    iot_timestamp(IotDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
 
-iot_timestamp(<<"now_us">>, #{now_us := NowUs}) ->
+%% > v1.3.0 don't allow write nanoseconds nor microseconds
+iot_timestamp(?VSN_1_3_X, <<"now_us">>, #{now_ms := NowMs}) ->
+    NowMs;
+iot_timestamp(?VSN_1_3_X, <<"now_ns">>, #{now_ms := NowMs}) ->
+    NowMs;
+iot_timestamp(_IotDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
     NowUs;
-iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) ->
+iot_timestamp(_IotDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
     NowNs;
-iot_timestamp(Timestamp, #{now_ms := NowMs}) when
+iot_timestamp(_IotDbVsn, Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
     NowMs;
-iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
+iot_timestamp(_IotDbVsn, Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
 proc_value(<<"TEXT">>, ValueTkn, Msg) ->
@@ -569,11 +643,10 @@ convert_float(undefined) ->
 make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
     InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
     Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
-    {ok,
-        maps:merge(Rows, #{
-            iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
-            iotdb_field_key(device_id, IoTDBVsn) => DeviceId
-        })}.
+    maps:merge(Rows, #{
+        iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
+        iotdb_field_key(device_id, IoTDBVsn) => DeviceId
+    }).
 
 replace_dtypes(Rows0, IoTDBVsn) ->
     {Types, Rows} = maps:take(dtypes, Rows0),
@@ -633,18 +706,24 @@ insert_value(1, Data, [Value | Values]) ->
 insert_value(Index, Data, [Value | Values]) ->
     [[null | Value] | insert_value(Index - 1, Data, Values)].
 
+iotdb_field_key(is_aligned, ?VSN_1_3_X) ->
+    <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
     <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
     <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
     <<"isAligned">>;
+iotdb_field_key(device_id, ?VSN_1_3_X) ->
+    <<"device">>;
 iotdb_field_key(device_id, ?VSN_1_1_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_1_0_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_0_13_X) ->
     <<"deviceId">>;
+iotdb_field_key(data_types, ?VSN_1_3_X) ->
+    <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_1_1_X) ->
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_1_0_X) ->
@@ -707,14 +786,37 @@ preproc_data_template(DataList) ->
         DataList
     ).
 
-try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) ->
+try_render_messages([{ChannelId, _} | _] = Msgs, IoTDBVsn, Channels) ->
     case maps:find(ChannelId, Channels) of
         {ok, Channel} ->
-            render_channel_message(Channel, IoTDBVsn, Msg);
+            case do_render_message(Msgs, Channel, IoTDBVsn, #{}) of
+                RenderMsgs when is_map(RenderMsgs) ->
+                    {ok,
+                        lists:map(
+                            fun({{DeviceId, IsAligned}, DataList}) ->
+                                make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
+                            end,
+                            maps:to_list(RenderMsgs)
+                        )};
+                Error ->
+                    Error
+            end;
         _ ->
             {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
     end.
 
+do_render_message([], _Channel, _IoTDBVsn, Acc) ->
+    Acc;
+do_render_message([{_, Msg} | Msgs], Channel, IoTDBVsn, Acc) ->
+    case render_channel_message(Channel, IoTDBVsn, Msg) of
+        {ok, NewDataList, DeviceId, IsAligned} ->
+            Fun = fun(V) -> NewDataList ++ V end,
+            Acc1 = maps:update_with({DeviceId, IsAligned}, Fun, NewDataList, Acc),
+            do_render_message(Msgs, Channel, IoTDBVsn, Acc1);
+        Error ->
+            Error
+    end.
+
 render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
     Payloads = to_list(parse_payload(get_payload(Message))),
     case device_id(Message, Payloads, Channel) of
@@ -725,9 +827,9 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message)
                 [] ->
                     {error, invalid_template};
                 DataTemplate ->
-                    case proc_data(DataTemplate, Message) of
+                    case proc_data(DataTemplate, Message, IoTDBVsn) of
                         {ok, DataList} ->
-                            make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn);
+                            {ok, DataList, DeviceId, IsAligned};
                         Error ->
                             Error
                     end

+ 93 - 76
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -20,14 +20,16 @@
 
 all() ->
     [
-        {group, plain},
+        {group, iotdb110},
+        {group, iotdb130},
         {group, legacy}
     ].
 
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {plain, AllTCs},
+        {iotdb110, AllTCs},
+        {iotdb130, AllTCs},
         {legacy, AllTCs}
     ].
 
@@ -37,10 +39,15 @@ init_per_suite(Config) ->
 end_per_suite(Config) ->
     emqx_bridge_v2_testlib:end_per_suite(Config).
 
-init_per_group(plain = Type, Config0) ->
+init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
     Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"),
-    Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", "18080")),
-    ProxyName = "iotdb",
+    ProxyName = atom_to_list(Type),
+    {IotDbVersion, DefaultPort} =
+        case Type of
+            iotdb110 -> {?VSN_1_1_X, "18080"};
+            iotdb130 -> {?VSN_1_3_X, "28080"}
+        end,
+    Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", DefaultPort)),
     case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
         true ->
             Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
@@ -48,7 +55,7 @@ init_per_group(plain = Type, Config0) ->
                 {bridge_host, Host},
                 {bridge_port, Port},
                 {proxy_name, ProxyName},
-                {iotdb_version, ?VSN_1_1_X},
+                {iotdb_version, IotDbVersion},
                 {iotdb_rest_prefix, <<"/rest/v2/">>}
                 | Config
             ];
@@ -87,7 +94,8 @@ init_per_group(_Group, Config) ->
     Config.
 
 end_per_group(Group, Config) when
-    Group =:= plain;
+    Group =:= iotdb110;
+    Group =:= iotdb130;
     Group =:= legacy
 ->
     emqx_bridge_v2_testlib:end_per_group(Config),
@@ -245,7 +253,9 @@ iotdb_query(Config, Query) ->
     iotdb_request(Config, Path, Body, Opts).
 
 is_success_check({ok, 200, _, Body}) ->
-    ?assert(is_code(200, emqx_utils_json:decode(Body))).
+    ?assert(is_code(200, emqx_utils_json:decode(Body)));
+is_success_check(Other) ->
+    throw(Other).
 
 is_code(Code, #{<<"code">> := Code}) -> true;
 is_code(_, _) -> false.
@@ -359,89 +369,96 @@ t_async_query(Config) ->
 
 t_sync_query_aggregated(Config) ->
     DeviceId = iotdb_device(Config),
+    MS = erlang:system_time(millisecond) - 5000,
     Payload = [
-        make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290),
-        make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291),
-        make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292),
-        make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>),
-        make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294),
-        make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295),
-        make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296),
-        %% implicit 'now()' timestamp
-        make_iotdb_payload(DeviceId, "temp", "INT32", "40"),
+        make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000),
+        make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000),
+        make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)),
+        make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000),
         %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
         (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
-        (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>},
-
-        make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290),
-        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291),
-        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292),
-        make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293),
-        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294),
-        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295),
-
-        make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300),
-        make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300),
-        make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300),
-        make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300),
-        make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300),
-        make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300),
-        make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300),
-        make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300),
-        make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300),
-        make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300),
-        make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300),
-        make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300),
-        make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300),
-
-        make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
+
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", MS - 6000),
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, MS - 5000),
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, MS - 4000),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", MS - 3000),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, MS - 2000),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, MS - 1000),
+
+        make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", MS + 1000),
+        make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, MS + 1000),
+        make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, MS + 1000),
+        make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", MS + 1000),
+        make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", MS + 1000),
+        make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", MS + 1000),
+        make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", MS + 1000),
+        make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, MS + 1000),
+        make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, MS + 1000),
+        make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", MS + 1000),
+        make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", MS + 1000),
+        make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", MS + 1000),
+        make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, MS + 1000),
+
+        make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", MS + 1000)
     ],
     MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
     ok = emqx_bridge_v2_testlib:t_sync_query(
         Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
     ),
 
-    %% check temp
-    QueryTemp = <<"select temp from ", DeviceId/binary>>,
-    {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
-    ?assertMatch(
-        #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]},
-        emqx_utils_json:decode(ResultTemp)
-    ),
+    Time = integer_to_binary(MS - 20000),
     %% check weight
-    QueryWeight = <<"select weight from ", DeviceId/binary>>,
+    QueryWeight = <<"select weight from ", DeviceId/binary, " where time > ", Time/binary>>,
     {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
     ?assertMatch(
         #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
         emqx_utils_json:decode(ResultWeight)
     ),
-    %% check rest ts = 1685112026300
-    QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>,
-    {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
-    #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
-        ResultRest
-    ),
-    Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
-    Exp = #{
-        exp(DeviceId, "charged") => true,
-        exp(DeviceId, "floated") => true,
-        exp(DeviceId, "started") => true,
-        exp(DeviceId, "stoked") => true,
-        exp(DeviceId, "enriched") => true,
-        exp(DeviceId, "gutted") => true,
-        exp(DeviceId, "drained") => false,
-        exp(DeviceId, "toasted") => false,
-        exp(DeviceId, "uncharted") => false,
-        exp(DeviceId, "dazzled") => false,
-        exp(DeviceId, "unplugged") => false,
-        exp(DeviceId, "unraveled") => false,
-        exp(DeviceId, "undecided") => null,
-        exp(DeviceId, "foo") => <<"bar">>,
-        exp(DeviceId, "temp") => null,
-        exp(DeviceId, "weight") => null
-    },
-    ?assertEqual(Exp, Results),
-
+    %% [FIXME] https://github.com/apache/iotdb/issues/12375
+    %% null don't seem to be supported by IoTDB insertTablet when 1.3.0
+    case ?config(iotdb_version, Config) of
+        ?VSN_1_3_X ->
+            skip;
+        _ ->
+            %% check rest ts = MS + 1000
+            CheckTime = integer_to_binary(MS + 1000),
+            QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>,
+            {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
+            #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
+                ResultRest
+            ),
+            Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
+            Exp = #{
+                exp(DeviceId, "charged") => true,
+                exp(DeviceId, "floated") => true,
+                exp(DeviceId, "started") => true,
+                exp(DeviceId, "stoked") => true,
+                exp(DeviceId, "enriched") => true,
+                exp(DeviceId, "gutted") => true,
+                exp(DeviceId, "drained") => false,
+                exp(DeviceId, "toasted") => false,
+                exp(DeviceId, "uncharted") => false,
+                exp(DeviceId, "dazzled") => false,
+                exp(DeviceId, "unplugged") => false,
+                exp(DeviceId, "unraveled") => false,
+                exp(DeviceId, "undecided") => null,
+                exp(DeviceId, "foo") => <<"bar">>,
+                exp(DeviceId, "temp") => null,
+                exp(DeviceId, "weight") => null
+            },
+            ?assertEqual(Exp, Results),
+
+            %% check temp
+            QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>,
+            {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
+            ?assertMatch(
+                #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]},
+                emqx_utils_json:decode(ResultTemp)
+            )
+    end,
     ok.
 
 exp(Dev, M0) ->

+ 23 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1024,7 +1024,29 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
 handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
     {ack, fun() -> ok end, #{}};
 handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
-    {ack, fun() -> ok end, #{}}.
+    {ack, fun() -> ok end, #{}};
+handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) ->
+    All = fun(L) ->
+        case L of
+            {ok, Pid} -> is_pid(Pid);
+            _ -> false
+        end
+    end,
+    case lists:all(All, Results) of
+        true ->
+            {ack, fun() -> ok end, #{}};
+        false ->
+            PostFn = fun() ->
+                ?SLOG(error, #{
+                    id => Id,
+                    msg => "async_batch_send_error",
+                    reason => Results,
+                    has_been_sent => HasBeenSent
+                }),
+                ok
+            end,
+            {nack, PostFn, #{}}
+    end.
 
 -spec aggregate_counters(data(), counters()) -> data().
 aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->