|
|
@@ -8,6 +8,7 @@
|
|
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
-define(BRIDGE_TYPE_BIN, <<"iotdb">>).
|
|
|
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]).
|
|
|
@@ -143,9 +144,15 @@ make_message_fun(Topic, Payload) ->
|
|
|
}
|
|
|
end.
|
|
|
|
|
|
+iotdb_topic(Config) ->
|
|
|
+ ?config(mqtt_topic, Config).
|
|
|
+
|
|
|
iotdb_device(Config) ->
|
|
|
- MQTTTopic = ?config(mqtt_topic, Config),
|
|
|
- Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]),
|
|
|
+ Topic = iotdb_topic(Config),
|
|
|
+ topic_to_iotdb_device(Topic).
|
|
|
+
|
|
|
+topic_to_iotdb_device(Topic) ->
|
|
|
+ Device = re:replace(Topic, "/", ".", [global, {return, binary}]),
|
|
|
<<"root.", Device/binary>>.
|
|
|
|
|
|
iotdb_request(Config, Path, Body) ->
|
|
|
@@ -172,6 +179,9 @@ iotdb_request(Config, Path, Body, Opts) ->
|
|
|
|
|
|
iotdb_reset(Config) ->
|
|
|
Device = iotdb_device(Config),
|
|
|
+ iotdb_reset(Config, Device).
|
|
|
+
|
|
|
+iotdb_reset(Config, Device) ->
|
|
|
Body = #{sql => <<"delete from ", Device/binary, ".*">>},
|
|
|
{ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body).
|
|
|
|
|
|
@@ -181,6 +191,12 @@ iotdb_query(Config, Query) ->
|
|
|
Body = #{sql => Query},
|
|
|
iotdb_request(Config, Path, Body, Opts).
|
|
|
|
|
|
+is_success_check({ok, 200, _, Body}) ->
|
|
|
+ ?assert(is_code(200, emqx_utils_json:decode(Body))).
|
|
|
+
|
|
|
+is_code(Code, #{<<"code">> := Code}) -> true;
|
|
|
+is_code(_, _) -> false.
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Testcases
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -188,13 +204,9 @@ iotdb_query(Config, Query) ->
|
|
|
t_sync_query_simple(Config) ->
|
|
|
DeviceId = iotdb_device(Config),
|
|
|
Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
|
|
|
- MakeMessageFun = make_message_fun(DeviceId, Payload),
|
|
|
- IsSuccessCheck =
|
|
|
- fun(Result) ->
|
|
|
- ?assertEqual(ok, element(1, Result))
|
|
|
- end,
|
|
|
+ MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
|
|
ok = emqx_bridge_testlib:t_sync_query(
|
|
|
- Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
|
|
|
+ Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
|
|
|
),
|
|
|
Query = <<"select temp from ", DeviceId/binary>>,
|
|
|
{ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
|
|
|
@@ -206,13 +218,9 @@ t_sync_query_simple(Config) ->
|
|
|
t_async_query(Config) ->
|
|
|
DeviceId = iotdb_device(Config),
|
|
|
Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
|
|
|
- MakeMessageFun = make_message_fun(DeviceId, Payload),
|
|
|
- IsSuccessCheck =
|
|
|
- fun(Result) ->
|
|
|
- ?assertEqual(ok, element(1, Result))
|
|
|
- end,
|
|
|
+ MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
|
|
ok = emqx_bridge_testlib:t_async_query(
|
|
|
- Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async
|
|
|
+ Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
|
|
|
),
|
|
|
Query = <<"select temp from ", DeviceId/binary>>,
|
|
|
{ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
|
|
|
@@ -260,13 +268,9 @@ t_sync_query_aggregated(Config) ->
|
|
|
|
|
|
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
|
|
|
],
|
|
|
- MakeMessageFun = make_message_fun(DeviceId, Payload),
|
|
|
- IsSuccessCheck =
|
|
|
- fun(Result) ->
|
|
|
- ?assertEqual(ok, element(1, Result))
|
|
|
- end,
|
|
|
+ MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
|
|
ok = emqx_bridge_testlib:t_sync_query(
|
|
|
- Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
|
|
|
+ Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
|
|
|
),
|
|
|
|
|
|
%% check temp
|
|
|
@@ -319,7 +323,7 @@ exp(Dev, M0) ->
|
|
|
t_sync_query_fail(Config) ->
|
|
|
DeviceId = iotdb_device(Config),
|
|
|
Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
|
|
|
- MakeMessageFun = make_message_fun(DeviceId, Payload),
|
|
|
+ MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
|
|
IsSuccessCheck =
|
|
|
fun(Result) ->
|
|
|
?assertEqual(error, element(1, Result))
|
|
|
@@ -327,7 +331,6 @@ t_sync_query_fail(Config) ->
|
|
|
emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query).
|
|
|
|
|
|
t_sync_query_badpayload(Config) ->
|
|
|
- DeviceId = iotdb_device(Config),
|
|
|
BadPayload = #{foo => bar},
|
|
|
IsSuccessCheck =
|
|
|
fun(Result) ->
|
|
|
@@ -335,14 +338,13 @@ t_sync_query_badpayload(Config) ->
|
|
|
end,
|
|
|
emqx_bridge_testlib:t_sync_query(
|
|
|
Config,
|
|
|
- make_message_fun(DeviceId, BadPayload),
|
|
|
+ make_message_fun(iotdb_topic(Config), BadPayload),
|
|
|
IsSuccessCheck,
|
|
|
iotdb_bridge_on_query
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
t_async_query_badpayload(Config) ->
|
|
|
- DeviceId = iotdb_device(Config),
|
|
|
BadPayload = #{foo => bar},
|
|
|
IsSuccessCheck =
|
|
|
fun(Result) ->
|
|
|
@@ -350,7 +352,7 @@ t_async_query_badpayload(Config) ->
|
|
|
end,
|
|
|
emqx_bridge_testlib:t_async_query(
|
|
|
Config,
|
|
|
- make_message_fun(DeviceId, BadPayload),
|
|
|
+ make_message_fun(iotdb_topic(Config), BadPayload),
|
|
|
IsSuccessCheck,
|
|
|
iotdb_bridge_on_query_async
|
|
|
),
|
|
|
@@ -364,3 +366,78 @@ t_start_stop(Config) ->
|
|
|
|
|
|
t_on_get_status(Config) ->
|
|
|
emqx_bridge_testlib:t_on_get_status(Config).
|
|
|
+
|
|
|
+t_device_id(Config) ->
|
|
|
+ ResourceId = emqx_bridge_testlib:resource_id(Config),
|
|
|
+ %% Create without device_id configured
|
|
|
+ ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)),
|
|
|
+ ?retry(
|
|
|
+ _Sleep = 1_000,
|
|
|
+ _Attempts = 20,
|
|
|
+ ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
+ ),
|
|
|
+ ConfiguredDevice = <<"root.someOtherDevice234">>,
|
|
|
+ DeviceId = <<"root.deviceFooBar123">>,
|
|
|
+ Topic = <<"some/random/topic">>,
|
|
|
+ TopicDevice = topic_to_iotdb_device(Topic),
|
|
|
+ iotdb_reset(Config, DeviceId),
|
|
|
+ iotdb_reset(Config, TopicDevice),
|
|
|
+ iotdb_reset(Config, ConfiguredDevice),
|
|
|
+ Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
|
|
|
+ MessageF1 = make_message_fun(Topic, Payload1),
|
|
|
+ ?assertNotEqual(DeviceId, TopicDevice),
|
|
|
+ is_success_check(
|
|
|
+ emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
|
|
|
+ ),
|
|
|
+ {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
|
|
|
+ #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
|
|
|
+ ?assertNotEqual([], Values1_1),
|
|
|
+ {ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>),
|
|
|
+ #{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2),
|
|
|
+ ?assertEqual([], Values1_2),
|
|
|
+
|
|
|
+ %% test without device_id in message, taking it from topic
|
|
|
+ iotdb_reset(Config, DeviceId),
|
|
|
+ iotdb_reset(Config, TopicDevice),
|
|
|
+ iotdb_reset(Config, ConfiguredDevice),
|
|
|
+ Payload2 = maps:remove(device_id, make_iotdb_payload(DeviceId, "root", "BOOLEAN", true)),
|
|
|
+ MessageF2 = make_message_fun(Topic, Payload2),
|
|
|
+ is_success_check(
|
|
|
+ emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF2()})
|
|
|
+ ),
|
|
|
+ {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
|
|
|
+ #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
|
|
|
+ ?assertEqual([], Values2_1),
|
|
|
+ {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>),
|
|
|
+ #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
|
|
|
+ ?assertNotEqual([], Values2_2),
|
|
|
+
|
|
|
+ iotdb_reset(Config, DeviceId),
|
|
|
+ iotdb_reset(Config, TopicDevice),
|
|
|
+ iotdb_reset(Config, ConfiguredDevice),
|
|
|
+
|
|
|
+ %% reconfigure bridge with device_id
|
|
|
+ {ok, _} =
|
|
|
+ emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}),
|
|
|
+
|
|
|
+ is_success_check(
|
|
|
+ emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% even though we had a device_id in the message it's not being used
|
|
|
+ {ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
|
|
|
+ #{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1),
|
|
|
+ ?assertEqual([], Values3_1),
|
|
|
+ {ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>),
|
|
|
+ #{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2),
|
|
|
+ ?assertEqual([], Values3_2),
|
|
|
+ {ok, {{_, 200, _}, _, Res3_3}} = iotdb_query(
|
|
|
+ Config, <<"select * from ", ConfiguredDevice/binary>>
|
|
|
+ ),
|
|
|
+ #{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3),
|
|
|
+ ?assertNotEqual([], Values3_3),
|
|
|
+
|
|
|
+ iotdb_reset(Config, DeviceId),
|
|
|
+ iotdb_reset(Config, TopicDevice),
|
|
|
+ iotdb_reset(Config, ConfiguredDevice),
|
|
|
+ ok.
|