Просмотр исходного кода

Merge pull request #10881 from sstrigler/EMQX-9355-5-0-x-test-coverage-80

IoTDB test coverage
Stefan Strigler 2 лет назад
Родитель
Сommit
e011b5532f

+ 30 - 0
.ci/docker-compose-file/docker-compose-iotdb.yaml

@@ -29,3 +29,33 @@ services:
     #     - "18080:18080"
     networks:
       - emqx_bridge
+
+  iotdb_0_13:
+    container_name: iotdb013
+    hostname: iotdb013
+    image: apache/iotdb:0.13.4-node
+    restart: always
+    environment:
+      - enable_rest_service=true
+      - cn_internal_address=iotdb013
+      - cn_internal_port=10710
+      - cn_consensus_port=10720
+      - cn_target_config_node_list=iotdb013:10710
+      - dn_rpc_address=iotdb013
+      - dn_internal_address=iotdb013
+      - dn_rpc_port=6667
+      - dn_mpp_data_exchange_port=10740
+      - dn_schema_region_consensus_port=10750
+      - dn_data_region_consensus_port=10760
+      - dn_target_config_node_list=iotdb013:10710
+    volumes:
+      - ./iotdb013/iotdb-rest.properties:/iotdb/conf/iotdb-rest.properties
+    #     - ./data:/iotdb/data
+    #     - ./logs:/iotdb/logs
+    expose:
+      - "18080"
+    # IoTDB's REST interface, uncomment for local testing
+    # ports:
+    #     - "18080:18080"
+    networks:
+      - emqx_bridge

+ 1 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -46,6 +46,7 @@ services:
       # IOTDB
       - 14242:4242
       - 28080:18080
+      - 38080:38080
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 58 - 0
.ci/docker-compose-file/iotdb013/iotdb-rest.properties

@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+####################
+### REST Service Configuration
+####################
+
+# Is the REST service enabled
+enable_rest_service=true
+
+# the binding port of the REST service
+# rest_service_port=18080
+
+# the default row limit to a REST query response when the rowSize parameter is not given in request
+# rest_query_default_row_size_limit=10000
+
+# the expiration time of the user login information cache (in seconds)
+# cache_expire_in_seconds=28800
+
+# maximum number of users can be stored in the user login cache.
+# cache_max_num=100
+
+# init capacity of users can be stored in the user login cache.
+# cache_init_num=10
+
+# is SSL enabled
+# enable_https=false
+
+# SSL key store path
+# key_store_path=
+
+# SSL key store password
+# key_store_pwd=
+
+# SSL trust store path
+# trust_store_path=
+
+# SSL trust store password.
+# trust_store_pwd=
+
+# SSL timeout (in seconds)
+# idle_timeout_in_seconds=50000

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -132,6 +132,12 @@
     "upstream": "iotdb:18080",
     "enabled": true
   },
+  {
+    "name": "iotdb013",
+    "listen": "0.0.0.0:38080",
+    "upstream": "iotdb013:18080",
+    "enabled": true
+  },
   {
     "name": "minio_tcp",
     "listen": "0.0.0.0:19000",

+ 21 - 7
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -32,7 +32,7 @@ init_per_group(TestGroup, BridgeType, Config) ->
     {ok, _} = application:ensure_all_started(emqx_connector),
     emqx_mgmt_api_test_util:init_suite(),
     UniqueNum = integer_to_binary(erlang:unique_integer([positive])),
-    MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
+    MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>,
     [
         {proxy_host, ProxyHost},
         {proxy_port, ProxyPort},
@@ -116,6 +116,7 @@ create_bridge(Config, Overrides) ->
     Name = ?config(bridge_name, Config),
     BridgeConfig0 = ?config(bridge_config, Config),
     BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+    ct:pal("creating bridge with config: ~p", [BridgeConfig]),
     emqx_bridge:create(BridgeType, Name, BridgeConfig).
 
 create_bridge_api(Config) ->
@@ -203,7 +204,7 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_sync_query(Config, MakeMessageFun, IsSuccessCheck) ->
+t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
@@ -217,11 +218,13 @@ t_sync_query(Config, MakeMessageFun, IsSuccessCheck) ->
             IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
             ok
         end,
-        []
+        fun(Trace) ->
+            ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
+        end
     ),
     ok.
 
-t_async_query(Config, MakeMessageFun, IsSuccessCheck) ->
+t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     ResourceId = resource_id(Config),
     ReplyFun =
         fun(Pid, Result) ->
@@ -236,10 +239,21 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck) ->
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
             Message = {send_message, MakeMessageFun()},
-            emqx_resource:query(ResourceId, Message, #{async_reply_fun => {ReplyFun, [self()]}}),
+            ?assertMatch(
+                {ok, {ok, _}},
+                ?wait_async_action(
+                    emqx_resource:query(ResourceId, Message, #{
+                        async_reply_fun => {ReplyFun, [self()]}
+                    }),
+                    #{?snk_kind := TracePoint, instance_id := ResourceId},
+                    5_000
+                )
+            ),
             ok
         end,
-        []
+        fun(Trace) ->
+            ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
+        end
     ),
     receive
         {result, Result} -> IsSuccessCheck(Result)
@@ -318,7 +332,7 @@ t_start_stop(Config, StopTracePoint) ->
         end,
         fun(Trace) ->
             %% one for each probe, one for real
-            ?assertMatch([_, _, _], ?of_kind(StopTracePoint, Trace)),
+            ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)),
             ok
         end
     ),

+ 110 - 87
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -72,7 +72,7 @@ on_start(InstanceId, Config) ->
                 instance_id => InstanceId,
                 request => maps:get(request, State, <<>>)
             }),
-            ?tp(iotdb_bridge_started, #{}),
+            ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
             {ok, maps:merge(Config, State)};
         {error, Reason} ->
             ?SLOG(error, #{
@@ -104,83 +104,108 @@ on_get_status(InstanceId, State) ->
     | {ok, pos_integer(), [term()]}
     | {error, term()}.
 on_query(InstanceId, {send_message, Message}, State) ->
+    ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_called",
         instance_id => InstanceId,
         send_message => Message,
         state => emqx_utils:redact(State)
     }),
-    IoTDBPayload = make_iotdb_insert_request(Message, State),
-    handle_response(
-        emqx_connector_http:on_query(
-            InstanceId, {send_message, IoTDBPayload}, State
-        )
-    ).
+    case make_iotdb_insert_request(Message, State) of
+        {ok, IoTDBPayload} ->
+            handle_response(
+                emqx_connector_http:on_query(
+                    InstanceId, {send_message, IoTDBPayload}, State
+                )
+            );
+        Error ->
+            Error
+    end.
 
 -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
-    {ok, pid()}.
+    {ok, pid()} | {error, empty_request}.
 on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
+    ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_async_called",
         instance_id => InstanceId,
         send_message => Message,
         state => emqx_utils:redact(State)
     }),
-    IoTDBPayload = make_iotdb_insert_request(Message, State),
-    ReplyFunAndArgs =
-        {
-            fun(Result) ->
-                Response = handle_response(Result),
-                emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
-            end,
-            []
-        },
-    emqx_connector_http:on_query_async(
-        InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
-    ).
+    case make_iotdb_insert_request(Message, State) of
+        {ok, IoTDBPayload} ->
+            ReplyFunAndArgs =
+                {
+                    fun(Result) ->
+                        Response = handle_response(Result),
+                        emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
+                    end,
+                    []
+                },
+            emqx_connector_http:on_query_async(
+                InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
+            );
+        Error ->
+            Error
+    end.
 
 %%--------------------------------------------------------------------
 %% Internal Functions
 %%--------------------------------------------------------------------
 
-make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
-    emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
-make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
-    lists:map(fun make_parsed_payload/1, PayloadUnparsed);
-make_parsed_payload(
-    #{
-        measurement := Measurement,
-        data_type := DataType,
-        value := Value
-    } = Data
-) ->
-    Data#{
-        <<"measurement">> => Measurement,
-        <<"data_type">> => DataType,
-        <<"value">> => Value
-    }.
+get_payload(#{payload := Payload}) ->
+    Payload;
+get_payload(#{<<"payload">> := Payload}) ->
+    Payload.
+
+parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
+    ParsedPayload;
+parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
+    emqx_utils_json:decode(UnparsedPayload);
+parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) ->
+    lists:map(fun parse_payload/1, UnparsedPayloads).
+
+preproc_data_list(DataList) ->
+    lists:foldl(
+        fun preproc_data/2,
+        [],
+        DataList
+    ).
 
 preproc_data(
     #{
         <<"measurement">> := Measurement,
         <<"data_type">> := DataType,
         <<"value">> := Value
-    } = Data
+    } = Data,
+    Acc
 ) ->
-    #{
-        timestamp => emqx_plugin_libs_rule:preproc_tmpl(
-            maps:get(<<"timestamp">>, Data, <<"now">>)
-        ),
-        measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
-        data_type => DataType,
-        value => emqx_plugin_libs_rule:preproc_tmpl(Value)
-    }.
-
-preproc_data_list(DataList) ->
-    lists:map(
-        fun preproc_data/1,
-        DataList
-    ).
+    [
+        #{
+            timestamp => maybe_preproc_tmpl(
+                maps:get(<<"timestamp">>, Data, <<"now">>)
+            ),
+            measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
+            data_type => DataType,
+            value => maybe_preproc_tmpl(Value)
+        }
+        | Acc
+    ];
+preproc_data(_NoMatch, Acc) ->
+    ?SLOG(
+        warning,
+        #{
+            msg => "iotdb_bridge_preproc_data_failed",
+            required_fields => ['measurement', 'data_type', 'value'],
+            received => _NoMatch
+        }
+    ),
+    Acc.
+
+maybe_preproc_tmpl(Value) when is_binary(Value) ->
+    emqx_plugin_libs_rule:preproc_tmpl(Value);
+maybe_preproc_tmpl(Value) ->
+    Value.
 
 proc_data(PreProcessedData, Msg) ->
     NowNS = erlang:system_time(nanosecond),
@@ -199,9 +224,7 @@ proc_data(PreProcessedData, Msg) ->
             }
         ) ->
             #{
-                timestamp => iot_timestamp(
-                    emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows
-                ),
+                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
                 measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg),
                 data_type => DataType,
                 value => proc_value(DataType, ValueTkn, Msg)
@@ -210,6 +233,11 @@ proc_data(PreProcessedData, Msg) ->
         PreProcessedData
     ).
 
+iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
+    Timestamp;
+iot_timestamp(TimestampTkn, Msg, Nows) ->
+    iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows).
+
 iot_timestamp(Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
@@ -240,6 +268,7 @@ replace_var(Val, _Data) ->
     Val.
 
 convert_bool(B) when is_boolean(B) -> B;
+convert_bool(null) -> null;
 convert_bool(1) -> true;
 convert_bool(0) -> false;
 convert_bool(<<"1">>) -> true;
@@ -249,8 +278,7 @@ convert_bool(<<"True">>) -> true;
 convert_bool(<<"TRUE">>) -> true;
 convert_bool(<<"false">>) -> false;
 convert_bool(<<"False">>) -> false;
-convert_bool(<<"FALSE">>) -> false;
-convert_bool(undefined) -> null.
+convert_bool(<<"FALSE">>) -> false.
 
 convert_int(Int) when is_integer(Int) -> Int;
 convert_int(Float) when is_float(Float) -> floor(Float);
@@ -276,24 +304,29 @@ convert_float(Str) when is_binary(Str) ->
 convert_float(undefined) ->
     null.
 
-make_iotdb_insert_request(MessageUnparsedPayload, State) ->
-    Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
+make_iotdb_insert_request(Message, State) ->
+    Payloads = to_list(parse_payload(get_payload(Message))),
     IsAligned = maps:get(is_aligned, State, false),
-    DeviceId = device_id(Message, State),
     IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
-    Payload = make_list(maps:get(payload, Message)),
-    PreProcessedData = preproc_data_list(Payload),
-    DataList = proc_data(PreProcessedData, Message),
-    InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
-    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
-    maps:merge(Rows, #{
-        iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
-        iotdb_field_key(device_id, IotDBVsn) => DeviceId
-    }).
-
-replace_dtypes(Rows, IotDBVsn) ->
-    {Types, Map} = maps:take(dtypes, Rows),
-    Map#{iotdb_field_key(data_types, IotDBVsn) => Types}.
+    case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of
+        {undefined, _} ->
+            {error, device_id_missing};
+        {_, []} ->
+            {error, invalid_data};
+        {DeviceId, PreProcessedData} ->
+            DataList = proc_data(PreProcessedData, Message),
+            InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
+            Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
+            {ok,
+                maps:merge(Rows, #{
+                    iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
+                    iotdb_field_key(device_id, IotDBVsn) => DeviceId
+                })}
+    end.
+
+replace_dtypes(Rows0, IotDBVsn) ->
+    {Types, Rows} = maps:take(dtypes, Rows0),
+    Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}.
 
 aggregate_rows(DataList, InitAcc) ->
     lists:foldr(
@@ -368,24 +401,14 @@ iotdb_field_key(data_types, ?VSN_1_0_X) ->
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
     <<"dataTypes">>.
 
-make_list(List) when is_list(List) -> List;
-make_list(Data) -> [Data].
+to_list(List) when is_list(List) -> List;
+to_list(Data) -> [Data].
 
-device_id(Message, State) ->
+device_id(Message, Payloads, State) ->
     case maps:get(device_id, State, undefined) of
         undefined ->
-            case maps:get(payload, Message) of
-                #{<<"device_id">> := DeviceId} ->
-                    DeviceId;
-                #{device_id := DeviceId} ->
-                    DeviceId;
-                _NotFound ->
-                    Topic = maps:get(topic, Message),
-                    case re:replace(Topic, "/", ".", [global, {return, binary}]) of
-                        <<"root.", _/binary>> = Device -> Device;
-                        Device -> <<"root.", Device/binary>>
-                    end
-            end;
+            %% [FIXME] there could be conflicting device-ids in the Payloads
+            maps:get(<<"device_id">>, hd(Payloads), undefined);
         DeviceId ->
             DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId),
             emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message)

+ 309 - 73
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -6,8 +6,10 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
+-include("emqx_bridge_iotdb.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(BRIDGE_TYPE_BIN, <<"iotdb">>).
 -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]).
@@ -18,13 +20,15 @@
 
 all() ->
     [
-        {group, plain}
+        {group, plain},
+        {group, legacy}
     ].
 
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {plain, AllTCs}
+        {plain, AllTCs},
+        {legacy, AllTCs}
     ].
 
 init_per_suite(Config) ->
@@ -43,7 +47,32 @@ init_per_group(plain = Type, Config0) ->
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
-                {proxy_name, ProxyName}
+                {proxy_name, ProxyName},
+                {iotdb_version, ?VSN_1_1_X},
+                {iotdb_rest_prefix, <<"/rest/v2/">>}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_iotdb);
+                _ ->
+                    {skip, no_iotdb}
+            end
+    end;
+init_per_group(legacy = Type, Config0) ->
+    Host = os:getenv("IOTDB_LEGACY_HOST", "toxiproxy.emqx.net"),
+    Port = list_to_integer(os:getenv("IOTDB_LEGACY_PORT", "38080")),
+    ProxyName = "iotdb013",
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
+            [
+                {bridge_host, Host},
+                {bridge_port, Port},
+                {proxy_name, ProxyName},
+                {iotdb_version, ?VSN_0_13_X},
+                {iotdb_rest_prefix, <<"/rest/v1/">>}
                 | Config
             ];
         false ->
@@ -58,7 +87,8 @@ init_per_group(_Group, Config) ->
     Config.
 
 end_per_group(Group, Config) when
-    Group =:= plain
+    Group =:= plain;
+    Group =:= legacy
 ->
     emqx_bridge_testlib:end_per_group(Config),
     ok;
@@ -67,7 +97,7 @@ end_per_group(_Group, _Config) ->
 
 init_per_testcase(TestCase, Config0) ->
     Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3),
-    reset_service(Config),
+    iotdb_reset(Config),
     Config.
 
 end_per_testcase(TestCase, Config) ->
@@ -76,20 +106,23 @@ end_per_testcase(TestCase, Config) ->
 %%------------------------------------------------------------------------------
 %% Helper fns
 %%------------------------------------------------------------------------------
+iotdb_server_url(Host, Port) ->
+    iolist_to_binary([
+        "http://",
+        Host,
+        ":",
+        integer_to_binary(Port)
+    ]).
 
 bridge_config(TestCase, _TestGroup, Config) ->
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
+    Version = ?config(iotdb_version, Config),
     Name = <<
         (atom_to_binary(TestCase))/binary, UniqueNum/binary
     >>,
-    ServerURL = iolist_to_binary([
-        "http://",
-        Host,
-        ":",
-        integer_to_binary(Port)
-    ]),
+    ServerURL = iotdb_server_url(Host, Port),
     ConfigString =
         io_lib:format(
             "bridges.iotdb.~s {\n"
@@ -99,6 +132,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
             "     username = \"root\"\n"
             "     password = \"root\"\n"
             "  }\n"
+            "iotdb_version = \"~s\"\n"
             "  pool_size = 1\n"
             "  resource_opts = {\n"
             "     auto_restart_interval = 5000\n"
@@ -109,12 +143,54 @@ bridge_config(TestCase, _TestGroup, Config) ->
             "}\n",
             [
                 Name,
-                ServerURL
+                ServerURL,
+                Version
             ]
         ),
     {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}.
 
-reset_service(Config) ->
+make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
+    #{
+        measurement => s_to_b(Measurement),
+        data_type => s_to_b(Type),
+        value => s_to_b(Value),
+        device_id => DeviceId,
+        is_aligned => true
+    }.
+
+make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) ->
+    Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value),
+    Payload#{timestamp => Timestamp}.
+
+s_to_b(S) when is_list(S) -> list_to_binary(S);
+s_to_b(V) -> V.
+
+make_message_fun(Topic, Payload) ->
+    fun() ->
+        MsgId = erlang:unique_integer([positive]),
+        #{
+            topic => Topic,
+            id => MsgId,
+            payload => emqx_utils_json:encode(Payload),
+            retain => true
+        }
+    end.
+
+iotdb_topic(Config) ->
+    ?config(mqtt_topic, Config).
+
+iotdb_device(Config) ->
+    Topic = iotdb_topic(Config),
+    topic_to_iotdb_device(Topic).
+
+topic_to_iotdb_device(Topic) ->
+    Device = re:replace(Topic, "/", ".", [global, {return, binary}]),
+    <<"root.", Device/binary>>.
+
+iotdb_request(Config, Path, Body) ->
+    iotdb_request(Config, Path, Body, #{}).
+
+iotdb_request(Config, Path, Body, Opts) ->
     _BridgeConfig =
         #{
             <<"base_url">> := BaseURL,
@@ -125,43 +201,40 @@ reset_service(Config) ->
         } =
         ?config(bridge_config, Config),
     ct:pal("bridge config: ~p", [_BridgeConfig]),
-    Path = <<BaseURL/binary, "/rest/v2/nonQuery">>,
+    URL = <<BaseURL/binary, Path/binary>>,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
     Headers = [
         {"Content-type", "application/json"},
         {"Authorization", binary_to_list(BasicToken)}
     ],
+    emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts).
+
+iotdb_reset(Config) ->
     Device = iotdb_device(Config),
+    iotdb_reset(Config, Device).
+
+iotdb_reset(Config, Device) ->
+    Prefix = ?config(iotdb_rest_prefix, Config),
     Body = #{sql => <<"delete from ", Device/binary, ".*">>},
-    {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}).
+    {ok, _} = iotdb_request(Config, <<Prefix/binary, "nonQuery">>, Body).
 
-make_iotdb_payload(DeviceId) ->
-    make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36").
+iotdb_query(Config, Query) ->
+    Prefix = ?config(iotdb_rest_prefix, Config),
+    Path = <<Prefix/binary, "query">>,
+    Opts = #{return_all => true},
+    Body = #{sql => Query},
+    iotdb_request(Config, Path, Body, Opts).
 
-make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
-    #{
-        measurement => Measurement,
-        data_type => Type,
-        value => Value,
-        device_id => DeviceId,
-        is_aligned => false
-    }.
+is_success_check({ok, 200, _, Body}) ->
+    ?assert(is_code(200, emqx_utils_json:decode(Body))).
 
-make_message_fun(Topic, Payload) ->
-    fun() ->
-        MsgId = erlang:unique_integer([positive]),
-        #{
-            topic => Topic,
-            id => MsgId,
-            payload => Payload,
-            retain => true
-        }
-    end.
+is_code(Code, #{<<"code">> := Code}) -> true;
+is_code(_, _) -> false.
 
-iotdb_device(Config) ->
-    MQTTTopic = ?config(mqtt_topic, Config),
-    Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]),
-    <<"root.", Device/binary>>.
+is_error_check(Reason) ->
+    fun(Result) ->
+        ?assertEqual({error, Reason}, Result)
+    end.
 
 %%------------------------------------------------------------------------------
 %% Testcases
@@ -169,55 +242,164 @@ iotdb_device(Config) ->
 
 t_sync_query_simple(Config) ->
     DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"),
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
-    IsSuccessCheck =
-        fun(Result) ->
-            ?assertEqual(ok, element(1, Result))
-        end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck).
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
+    ),
+    Query = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
+    ?assertMatch(
+        #{<<"values">> := [[36]]},
+        emqx_utils_json:decode(IoTDBResult)
+    ).
 
 t_async_query(Config) ->
     DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"),
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
-    IsSuccessCheck =
-        fun(Result) ->
-            ?assertEqual(ok, element(1, Result))
-        end,
-    emqx_bridge_testlib:t_async_query(Config, MakeMessageFun, IsSuccessCheck).
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_testlib:t_async_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
+    ),
+    Query = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
+    ?assertMatch(
+        #{<<"values">> := [[36]]},
+        emqx_utils_json:decode(IoTDBResult)
+    ).
 
 t_sync_query_aggregated(Config) ->
     DeviceId = iotdb_device(Config),
     Payload = [
-        make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"),
-        (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"mow_us">>},
-        (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"mow_ns">>},
-        make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"),
-        make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"),
-        make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>),
-        make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"),
-        make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"),
-        make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>),
-        make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"),
-        make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>)
+        make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292),
+        make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>),
+        make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294),
+        make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295),
+        make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296),
+        %% implicit 'now()' timestamp
+        make_iotdb_payload(DeviceId, "temp", "INT32", "40"),
+        %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
+        (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
+        (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>},
+
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290),
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291),
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295),
+
+        make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300),
+        make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300),
+        make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300),
+        make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300),
+        make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300),
+        make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300),
+        make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300),
+        make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300),
+        make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300),
+        make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300),
+        make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300),
+        make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300),
+        make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300),
+
+        make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
     ],
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
-    IsSuccessCheck =
-        fun(Result) ->
-            ?assertEqual(ok, element(1, Result))
-        end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck).
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
+    ),
+
+    %% check temp
+    QueryTemp = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
+    ?assertMatch(
+        #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]},
+        emqx_utils_json:decode(ResultTemp)
+    ),
+    %% check weight
+    QueryWeight = <<"select weight from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
+    ?assertMatch(
+        #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
+        emqx_utils_json:decode(ResultWeight)
+    ),
+    %% check rest ts = 1685112026300
+    QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>,
+    {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
+    #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
+        ResultRest
+    ),
+    Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
+    Exp = #{
+        exp(DeviceId, "charged") => true,
+        exp(DeviceId, "floated") => true,
+        exp(DeviceId, "started") => true,
+        exp(DeviceId, "stoked") => true,
+        exp(DeviceId, "enriched") => true,
+        exp(DeviceId, "gutted") => true,
+        exp(DeviceId, "drained") => false,
+        exp(DeviceId, "toasted") => false,
+        exp(DeviceId, "uncharted") => false,
+        exp(DeviceId, "dazzled") => false,
+        exp(DeviceId, "unplugged") => false,
+        exp(DeviceId, "unraveled") => false,
+        exp(DeviceId, "undecided") => null,
+        exp(DeviceId, "foo") => <<"bar">>,
+        exp(DeviceId, "temp") => null,
+        exp(DeviceId, "weight") => null
+    },
+    ?assertEqual(Exp, Results),
+
+    ok.
+
+exp(Dev, M0) ->
+    M = s_to_b(M0),
+    <<Dev/binary, ".", M/binary>>.
 
 t_sync_query_fail(Config) ->
     DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "Anton"),
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
     IsSuccessCheck =
         fun(Result) ->
             ?assertEqual(error, element(1, Result))
         end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck).
+    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query).
+
+t_sync_device_id_missing(Config) ->
+    emqx_bridge_testlib:t_sync_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar}),
+        is_error_check(device_id_missing),
+        iotdb_bridge_on_query
+    ).
+
+t_sync_invalid_data(Config) ->
+    emqx_bridge_testlib:t_sync_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
+        is_error_check(invalid_data),
+        iotdb_bridge_on_query
+    ).
+
+t_async_device_id_missing(Config) ->
+    emqx_bridge_testlib:t_async_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar}),
+        is_error_check(device_id_missing),
+        iotdb_bridge_on_query_async
+    ).
+
+t_async_invalid_data(Config) ->
+    emqx_bridge_testlib:t_async_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
+        is_error_check(invalid_data),
+        iotdb_bridge_on_query_async
+    ).
 
 t_create_via_http(Config) ->
     emqx_bridge_testlib:t_create_via_http(Config).
@@ -227,3 +409,57 @@ t_start_stop(Config) ->
 
 t_on_get_status(Config) ->
     emqx_bridge_testlib:t_on_get_status(Config).
+
+t_device_id(Config) ->
+    ResourceId = emqx_bridge_testlib:resource_id(Config),
+    %% Create without device_id configured
+    ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    ConfiguredDevice = <<"root.someOtherDevice234">>,
+    DeviceId = <<"root.deviceFooBar123">>,
+    Topic = <<"some/random/topic">>,
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, ConfiguredDevice),
+    Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
+    MessageF1 = make_message_fun(Topic, Payload1),
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
+    ),
+    {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
+    ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
+    #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
+    ?assertNot(is_empty(Values1_1)),
+
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, ConfiguredDevice),
+
+    %% reconfigure bridge with device_id
+    {ok, _} =
+        emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}),
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
+    ),
+
+    %% even though we had a device_id in the message it's not being used
+    {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
+    #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
+    ?assert(is_empty(Values2_1)),
+    {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
+        Config, <<"select * from ", ConfiguredDevice/binary>>
+    ),
+    #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
+    ?assertNot(is_empty(Values2_2)),
+
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, ConfiguredDevice),
+    ok.
+
+is_empty(null) -> true;
+is_empty([]) -> true;
+is_empty([[]]) -> true;
+is_empty(_) -> false.