emqx_bridge_iotdb_impl_SUITE.erl 25 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_iotdb_impl_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include("emqx_bridge_iotdb.hrl").
  8. -include_lib("eunit/include/eunit.hrl").
  9. -include_lib("common_test/include/ct.hrl").
  10. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  11. -define(BRIDGE_TYPE_BIN, <<"iotdb">>).
  12. -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]).
  13. %%------------------------------------------------------------------------------
  14. %% CT boilerplate
  15. %%------------------------------------------------------------------------------
  16. all() ->
  17. [
  18. {group, iotdb110},
  19. {group, iotdb130},
  20. {group, legacy}
  21. ].
  22. groups() ->
  23. AllTCs = emqx_common_test_helpers:all(?MODULE),
  24. [
  25. {iotdb110, AllTCs},
  26. {iotdb130, AllTCs},
  27. {legacy, AllTCs}
  28. ].
  29. init_per_suite(Config) ->
  30. emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
  31. end_per_suite(Config) ->
  32. emqx_bridge_v2_testlib:end_per_suite(Config).
  33. init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
  34. Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"),
  35. ProxyName = atom_to_list(Type),
  36. {IotDbVersion, DefaultPort} =
  37. case Type of
  38. iotdb110 -> {?VSN_1_1_X, "18080"};
  39. iotdb130 -> {?VSN_1_3_X, "28080"}
  40. end,
  41. Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", DefaultPort)),
  42. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  43. true ->
  44. Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
  45. [
  46. {bridge_host, Host},
  47. {bridge_port, Port},
  48. {proxy_name, ProxyName},
  49. {iotdb_version, IotDbVersion},
  50. {iotdb_rest_prefix, <<"/rest/v2/">>}
  51. | Config
  52. ];
  53. false ->
  54. case os:getenv("IS_CI") of
  55. "yes" ->
  56. throw(no_iotdb);
  57. _ ->
  58. {skip, no_iotdb}
  59. end
  60. end;
  61. init_per_group(legacy = Type, Config0) ->
  62. Host = os:getenv("IOTDB_LEGACY_HOST", "toxiproxy.emqx.net"),
  63. Port = list_to_integer(os:getenv("IOTDB_LEGACY_PORT", "38080")),
  64. ProxyName = "iotdb013",
  65. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  66. true ->
  67. Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
  68. [
  69. {bridge_host, Host},
  70. {bridge_port, Port},
  71. {proxy_name, ProxyName},
  72. {iotdb_version, ?VSN_0_13_X},
  73. {iotdb_rest_prefix, <<"/rest/v1/">>}
  74. | Config
  75. ];
  76. false ->
  77. case os:getenv("IS_CI") of
  78. "yes" ->
  79. throw(no_iotdb);
  80. _ ->
  81. {skip, no_iotdb}
  82. end
  83. end;
  84. init_per_group(_Group, Config) ->
  85. Config.
  86. end_per_group(Group, Config) when
  87. Group =:= iotdb110;
  88. Group =:= iotdb130;
  89. Group =:= legacy
  90. ->
  91. emqx_bridge_v2_testlib:end_per_group(Config),
  92. ok;
  93. end_per_group(_Group, _Config) ->
  94. ok.
  95. init_per_testcase(TestCase, Config0) ->
  96. Type = ?config(bridge_type, Config0),
  97. UniqueNum = integer_to_binary(erlang:unique_integer()),
  98. Name = <<
  99. (atom_to_binary(TestCase))/binary, UniqueNum/binary
  100. >>,
  101. {_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
  102. {_, ActionConfig} = action_config(Name, Config0),
  103. Config = [
  104. {connector_type, Type},
  105. {connector_name, Name},
  106. {connector_config, ConnectorConfig},
  107. {bridge_type, Type},
  108. {bridge_name, Name},
  109. {bridge_config, ActionConfig}
  110. | Config0
  111. ],
  112. iotdb_reset(Config),
  113. ok = snabbkaffe:start_trace(),
  114. Config.
  115. end_per_testcase(TestCase, Config) ->
  116. emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
  117. %%------------------------------------------------------------------------------
  118. %% Helper fns
  119. %%------------------------------------------------------------------------------
  120. iotdb_server_url(Host, Port) ->
  121. iolist_to_binary([
  122. "http://",
  123. Host,
  124. ":",
  125. integer_to_binary(Port)
  126. ]).
  127. bridge_config(TestCase, Config) ->
  128. UniqueNum = integer_to_binary(erlang:unique_integer()),
  129. Host = ?config(bridge_host, Config),
  130. Port = ?config(bridge_port, Config),
  131. Version = ?config(iotdb_version, Config),
  132. Type = ?config(bridge_type, Config),
  133. Name = <<
  134. (atom_to_binary(TestCase))/binary, UniqueNum/binary
  135. >>,
  136. ServerURL = iotdb_server_url(Host, Port),
  137. ConfigString =
  138. io_lib:format(
  139. "bridges.~s.~s {\n"
  140. " enable = true\n"
  141. " base_url = \"~s\"\n"
  142. " authentication = {\n"
  143. " username = \"root\"\n"
  144. " password = \"root\"\n"
  145. " }\n"
  146. " iotdb_version = \"~s\"\n"
  147. " pool_size = 1\n"
  148. " resource_opts = {\n"
  149. " health_check_interval = \"1s\"\n"
  150. " request_ttl = 30s\n"
  151. " query_mode = \"async\"\n"
  152. " worker_pool_size = 1\n"
  153. " }\n"
  154. "}\n",
  155. [
  156. Type,
  157. Name,
  158. ServerURL,
  159. Version
  160. ]
  161. ),
  162. {ok, InnerConfigMap} = hocon:binary(ConfigString),
  163. {Name, ConfigString, emqx_bridge_v2_testlib:parse_and_check(Type, Name, InnerConfigMap)}.
  164. make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
  165. #{
  166. measurement => s_to_b(Measurement),
  167. data_type => s_to_b(Type),
  168. value => s_to_b(Value),
  169. device_id => DeviceId,
  170. is_aligned => true
  171. }.
  172. make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) ->
  173. Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value),
  174. Payload#{timestamp => Timestamp}.
  175. s_to_b(S) when is_list(S) -> list_to_binary(S);
  176. s_to_b(V) -> V.
  177. make_message_fun(Topic, Payload) ->
  178. fun() ->
  179. MsgId = erlang:unique_integer([positive]),
  180. #{
  181. topic => Topic,
  182. id => MsgId,
  183. payload => emqx_utils_json:encode(Payload),
  184. retain => true
  185. }
  186. end.
  187. iotdb_topic(Config) ->
  188. ?config(mqtt_topic, Config).
  189. iotdb_device(Config) ->
  190. Topic = iotdb_topic(Config),
  191. topic_to_iotdb_device(Topic).
  192. topic_to_iotdb_device(Topic) ->
  193. Device = re:replace(Topic, "/", ".", [global, {return, binary}]),
  194. <<"root.", Device/binary>>.
  195. iotdb_request(Config, Path, Body) ->
  196. iotdb_request(Config, Path, Body, #{}).
  197. iotdb_request(Config, Path, Body, Opts) ->
  198. _BridgeConfig =
  199. #{
  200. <<"base_url">> := BaseURL,
  201. <<"authentication">> := #{
  202. <<"username">> := Username,
  203. <<"password">> := Password
  204. }
  205. } =
  206. ?config(connector_config, Config),
  207. ct:pal("bridge config: ~p", [_BridgeConfig]),
  208. URL = <<BaseURL/binary, Path/binary>>,
  209. BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
  210. Headers = [
  211. {"Content-type", "application/json"},
  212. {"Authorization", binary_to_list(BasicToken)}
  213. ],
  214. emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts).
  215. iotdb_reset(Config) ->
  216. Device = iotdb_device(Config),
  217. iotdb_reset(Config, Device).
  218. iotdb_reset(Config, Device) ->
  219. Prefix = ?config(iotdb_rest_prefix, Config),
  220. Body = #{sql => <<"delete from ", Device/binary, ".*">>},
  221. {ok, _} = iotdb_request(Config, <<Prefix/binary, "nonQuery">>, Body).
  222. iotdb_query(Config, Query) ->
  223. Prefix = ?config(iotdb_rest_prefix, Config),
  224. Path = <<Prefix/binary, "query">>,
  225. Opts = #{return_all => true},
  226. Body = #{sql => Query},
  227. iotdb_request(Config, Path, Body, Opts).
  228. is_success_check({ok, 200, _, Body}) ->
  229. ?assert(is_code(200, emqx_utils_json:decode(Body)));
  230. is_success_check(Other) ->
  231. throw(Other).
  232. is_code(Code, #{<<"code">> := Code}) -> true;
  233. is_code(_, _) -> false.
  234. is_error_check(Reason) ->
  235. fun(Result) ->
  236. ?assertEqual({error, Reason}, Result)
  237. end.
  238. action_config(Name, Config) ->
  239. Type = ?config(bridge_type, Config),
  240. ConfigString =
  241. io_lib:format(
  242. "actions.~s.~s {\n"
  243. " enable = true\n"
  244. " connector = \"~s\"\n"
  245. " parameters = {\n"
  246. " data = []\n"
  247. " }\n"
  248. "}\n",
  249. [
  250. Type,
  251. Name,
  252. Name
  253. ]
  254. ),
  255. ct:pal("ActionConfig:~ts~n", [ConfigString]),
  256. {ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
  257. connector_config(Name, Config) ->
  258. Host = ?config(bridge_host, Config),
  259. Port = ?config(bridge_port, Config),
  260. Type = ?config(bridge_type, Config),
  261. Version = ?config(iotdb_version, Config),
  262. ServerURL = iotdb_server_url(Host, Port),
  263. ConfigString =
  264. io_lib:format(
  265. "connectors.~s.~s {\n"
  266. " enable = true\n"
  267. " base_url = \"~s\"\n"
  268. " iotdb_version = \"~s\"\n"
  269. " authentication = {\n"
  270. " username = \"root\"\n"
  271. " password = \"root\"\n"
  272. " }\n"
  273. "}\n",
  274. [
  275. Type,
  276. Name,
  277. ServerURL,
  278. Version
  279. ]
  280. ),
  281. ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
  282. {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
  283. parse_action_and_check(ConfigString, BridgeType, Name) ->
  284. parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
  285. parse_connector_and_check(ConfigString, ConnectorType, Name) ->
  286. parse_and_check(
  287. ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
  288. ).
  289. %% emqx_utils_maps:safe_atom_key_map(Config).
  290. parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
  291. Type = to_bin(Type0),
  292. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  293. hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
  294. #{RootKey := #{Type := #{Name := Config}}} = RawConf,
  295. Config.
  296. to_bin(List) when is_list(List) ->
  297. unicode:characters_to_binary(List, utf8);
  298. to_bin(Atom) when is_atom(Atom) ->
  299. erlang:atom_to_binary(Atom);
  300. to_bin(Bin) when is_binary(Bin) ->
  301. Bin.
  302. %%------------------------------------------------------------------------------
  303. %% Testcases
  304. %%------------------------------------------------------------------------------
  305. t_sync_query_simple(Config) ->
  306. DeviceId = iotdb_device(Config),
  307. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
  308. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  309. ok = emqx_bridge_v2_testlib:t_sync_query(
  310. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
  311. ),
  312. Query = <<"select temp from ", DeviceId/binary>>,
  313. {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
  314. ?assertMatch(
  315. #{<<"values">> := [[36]]},
  316. emqx_utils_json:decode(IoTDBResult)
  317. ).
  318. t_async_query(Config) ->
  319. DeviceId = iotdb_device(Config),
  320. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
  321. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  322. ok = emqx_bridge_v2_testlib:t_async_query(
  323. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
  324. ),
  325. Query = <<"select temp from ", DeviceId/binary>>,
  326. {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
  327. ?assertMatch(
  328. #{<<"values">> := [[36]]},
  329. emqx_utils_json:decode(IoTDBResult)
  330. ).
  331. t_sync_query_aggregated(Config) ->
  332. DeviceId = iotdb_device(Config),
  333. MS = erlang:system_time(millisecond) - 5000,
  334. Payload = [
  335. make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000),
  336. make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000),
  337. make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000),
  338. make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)),
  339. make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000),
  340. make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000),
  341. make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000),
  342. %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
  343. (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
  344. make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", MS - 6000),
  345. make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, MS - 5000),
  346. make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, MS - 4000),
  347. make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", MS - 3000),
  348. make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, MS - 2000),
  349. make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, MS - 1000),
  350. make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", MS + 1000),
  351. make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, MS + 1000),
  352. make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, MS + 1000),
  353. make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", MS + 1000),
  354. make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", MS + 1000),
  355. make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", MS + 1000),
  356. make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", MS + 1000),
  357. make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, MS + 1000),
  358. make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, MS + 1000),
  359. make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", MS + 1000),
  360. make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", MS + 1000),
  361. make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", MS + 1000),
  362. make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, MS + 1000),
  363. make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", MS + 1000)
  364. ],
  365. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  366. ok = emqx_bridge_v2_testlib:t_sync_query(
  367. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
  368. ),
  369. Time = integer_to_binary(MS - 20000),
  370. %% check weight
  371. QueryWeight = <<"select weight from ", DeviceId/binary, " where time > ", Time/binary>>,
  372. {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
  373. ?assertMatch(
  374. #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
  375. emqx_utils_json:decode(ResultWeight)
  376. ),
  377. %% [FIXME] https://github.com/apache/iotdb/issues/12375
  378. %% null don't seem to be supported by IoTDB insertTablet when 1.3.0
  379. case ?config(iotdb_version, Config) of
  380. ?VSN_1_3_X ->
  381. skip;
  382. _ ->
  383. %% check rest ts = MS + 1000
  384. CheckTime = integer_to_binary(MS + 1000),
  385. QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>,
  386. {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
  387. #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
  388. ResultRest
  389. ),
  390. Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
  391. Exp = #{
  392. exp(DeviceId, "charged") => true,
  393. exp(DeviceId, "floated") => true,
  394. exp(DeviceId, "started") => true,
  395. exp(DeviceId, "stoked") => true,
  396. exp(DeviceId, "enriched") => true,
  397. exp(DeviceId, "gutted") => true,
  398. exp(DeviceId, "drained") => false,
  399. exp(DeviceId, "toasted") => false,
  400. exp(DeviceId, "uncharted") => false,
  401. exp(DeviceId, "dazzled") => false,
  402. exp(DeviceId, "unplugged") => false,
  403. exp(DeviceId, "unraveled") => false,
  404. exp(DeviceId, "undecided") => null,
  405. exp(DeviceId, "foo") => <<"bar">>,
  406. exp(DeviceId, "temp") => null,
  407. exp(DeviceId, "weight") => null
  408. },
  409. ?assertEqual(Exp, Results),
  410. %% check temp
  411. QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>,
  412. {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
  413. ?assertMatch(
  414. #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]},
  415. emqx_utils_json:decode(ResultTemp)
  416. )
  417. end,
  418. ok.
  419. exp(Dev, M0) ->
  420. M = s_to_b(M0),
  421. <<Dev/binary, ".", M/binary>>.
  422. t_sync_query_fail(Config) ->
  423. DeviceId = iotdb_device(Config),
  424. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
  425. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  426. IsSuccessCheck =
  427. fun(Result) ->
  428. ?assertEqual(error, element(1, Result))
  429. end,
  430. emqx_bridge_v2_testlib:t_sync_query(
  431. Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
  432. ).
  433. t_sync_device_id_missing(Config) ->
  434. emqx_bridge_v2_testlib:t_sync_query(
  435. Config,
  436. make_message_fun(iotdb_topic(Config), #{foo => bar}),
  437. is_error_check(device_id_missing),
  438. iotdb_bridge_on_query
  439. ).
  440. t_extract_device_id_from_rule_engine_message(Config) ->
  441. BridgeType = ?config(bridge_type, Config),
  442. RuleTopic = <<"t/iotdb">>,
  443. DeviceId = iotdb_device(Config),
  444. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"),
  445. Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
  446. ?check_trace(
  447. begin
  448. {ok, _} = emqx_bridge_v2_testlib:create_bridge(Config),
  449. SQL = <<
  450. "SELECT\n"
  451. " payload.measurement, payload.data_type, payload.value, payload.device_id\n"
  452. "FROM\n"
  453. " \"",
  454. RuleTopic/binary,
  455. "\""
  456. >>,
  457. Opts = #{sql => SQL},
  458. {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
  459. BridgeType, RuleTopic, Config, Opts
  460. ),
  461. emqx:publish(Message),
  462. ?block_until(handle_async_reply, 5_000),
  463. ok
  464. end,
  465. fun(Trace) ->
  466. ?assertMatch(
  467. [#{action := ack, result := {ok, 200, _, _}}],
  468. ?of_kind(handle_async_reply, Trace)
  469. ),
  470. ok
  471. end
  472. ),
  473. ok.
  474. t_sync_invalid_template(Config) ->
  475. emqx_bridge_v2_testlib:t_sync_query(
  476. Config,
  477. make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
  478. is_error_check(invalid_template),
  479. iotdb_bridge_on_query
  480. ).
  481. t_async_device_id_missing(Config) ->
  482. emqx_bridge_v2_testlib:t_async_query(
  483. Config,
  484. make_message_fun(iotdb_topic(Config), #{foo => bar}),
  485. is_error_check(device_id_missing),
  486. iotdb_bridge_on_query_async
  487. ).
  488. t_async_invalid_template(Config) ->
  489. emqx_bridge_v2_testlib:t_async_query(
  490. Config,
  491. make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
  492. is_error_check(invalid_template),
  493. iotdb_bridge_on_query_async
  494. ).
  495. t_create_via_http(Config) ->
  496. emqx_bridge_v2_testlib:t_create_via_http(Config).
  497. t_start_stop(Config) ->
  498. emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped).
  499. t_on_get_status(Config) ->
  500. emqx_bridge_v2_testlib:t_on_get_status(Config).
  501. t_device_id(Config) ->
  502. %% Create without device_id configured
  503. ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
  504. ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
  505. BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
  506. ?retry(
  507. _Sleep = 1_000,
  508. _Attempts = 20,
  509. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  510. ),
  511. ConfiguredDevice = <<"root.someOtherDevice234">>,
  512. DeviceId = <<"root.deviceFooBar123">>,
  513. Topic = <<"some/random/topic">>,
  514. iotdb_reset(Config, DeviceId),
  515. iotdb_reset(Config, ConfiguredDevice),
  516. Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
  517. MessageF1 = make_message_fun(Topic, Payload1),
  518. is_success_check(
  519. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  520. ),
  521. {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
  522. ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
  523. #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
  524. ?assertNot(is_empty(Values1_1)),
  525. iotdb_reset(Config, DeviceId),
  526. iotdb_reset(Config, ConfiguredDevice),
  527. %% reconfigure bridge with device_id
  528. {ok, _} =
  529. emqx_bridge_v2_testlib:update_bridge_api(Config, #{
  530. <<"parameters">> => #{<<"device_id">> => ConfiguredDevice}
  531. }),
  532. is_success_check(
  533. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  534. ),
  535. %% even though we had a device_id in the message it's not being used
  536. {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
  537. #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
  538. ?assert(is_empty(Values2_1)),
  539. {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
  540. Config, <<"select * from ", ConfiguredDevice/binary>>
  541. ),
  542. #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
  543. ?assertNot(is_empty(Values2_2)),
  544. iotdb_reset(Config, DeviceId),
  545. iotdb_reset(Config, ConfiguredDevice),
  546. ok.
  547. t_template(Config) ->
  548. %% Create without data configured
  549. ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
  550. ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
  551. BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
  552. ?retry(
  553. _Sleep = 1_000,
  554. _Attempts = 20,
  555. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  556. ),
  557. TemplateDeviceId = <<"root.deviceWithTemplate">>,
  558. DeviceId = <<"root.deviceWithoutTemplate">>,
  559. Topic = <<"some/random/topic">>,
  560. iotdb_reset(Config, DeviceId),
  561. iotdb_reset(Config, TemplateDeviceId),
  562. Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
  563. MessageF1 = make_message_fun(Topic, Payload1),
  564. is_success_check(
  565. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  566. ),
  567. {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
  568. ?assertMatch(#{<<"values">> := [[true]]}, emqx_utils_json:decode(Res1_1)),
  569. iotdb_reset(Config, DeviceId),
  570. iotdb_reset(Config, TemplateDeviceId),
  571. %% reconfigure with data template
  572. {ok, _} =
  573. emqx_bridge_v2_testlib:update_bridge_api(Config, #{
  574. <<"parameters">> => #{
  575. <<"device_id">> => TemplateDeviceId,
  576. <<"data">> => [
  577. #{
  578. <<"measurement">> => <<"${payload.measurement}">>,
  579. <<"data_type">> => "TEXT",
  580. <<"value">> => <<"${payload.device_id}">>
  581. }
  582. ]
  583. }
  584. }),
  585. is_success_check(
  586. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  587. ),
  588. {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
  589. Config, <<"select * from ", TemplateDeviceId/binary>>
  590. ),
  591. ?assertMatch(#{<<"values">> := [[<<DeviceId/binary>>]]}, emqx_utils_json:decode(Res2_2)),
  592. iotdb_reset(Config, DeviceId),
  593. iotdb_reset(Config, TemplateDeviceId),
  594. ok.
  595. t_sync_query_case(Config) ->
  596. DeviceId = iotdb_device(Config),
  597. Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"),
  598. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  599. ok = emqx_bridge_v2_testlib:t_sync_query(
  600. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
  601. ),
  602. Query = <<"select temp from ", DeviceId/binary>>,
  603. {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
  604. ?assertMatch(
  605. #{<<"values">> := [[36]]},
  606. emqx_utils_json:decode(IoTDBResult)
  607. ).
  608. t_sync_query_invalid_type(Config) ->
  609. DeviceId = iotdb_device(Config),
  610. Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
  611. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  612. IsInvalidType = fun(Result) -> ?assertMatch({error, #{reason := invalid_type}}, Result) end,
  613. ok = emqx_bridge_v2_testlib:t_sync_query(
  614. Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
  615. ).
  616. t_sync_query_unmatched_type(Config) ->
  617. DeviceId = iotdb_device(Config),
  618. Payload = make_iotdb_payload(DeviceId, "temp", "BOOLEAN", "not boolean"),
  619. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  620. IsInvalidType = fun(Result) -> ?assertMatch({error, invalid_data}, Result) end,
  621. ok = emqx_bridge_v2_testlib:t_sync_query(
  622. Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
  623. ).
  624. is_empty(null) -> true;
  625. is_empty([]) -> true;
  626. is_empty([[]]) -> true;
  627. is_empty(_) -> false.