emqx_bridge_iotdb_impl_SUITE.erl 28 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_iotdb_impl_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include("emqx_bridge_iotdb.hrl").
  8. -include_lib("eunit/include/eunit.hrl").
  9. -include_lib("common_test/include/ct.hrl").
  10. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  11. -define(BRIDGE_TYPE_BIN, <<"iotdb">>).
  12. %%------------------------------------------------------------------------------
  13. %% CT boilerplate
  14. %%------------------------------------------------------------------------------
  15. all() ->
  16. [
  17. {group, iotdb110},
  18. {group, iotdb130},
  19. {group, legacy},
  20. {group, thrift}
  21. ].
  22. groups() ->
  23. AllTCs = emqx_common_test_helpers:all(?MODULE) -- [t_thrift_auto_recon],
  24. Async = [
  25. t_async_device_id_missing,
  26. t_async_invalid_template,
  27. t_async_query,
  28. t_extract_device_id_from_rule_engine_message,
  29. %%todo
  30. t_sync_query_aggregated
  31. ],
  32. [
  33. {iotdb110, AllTCs},
  34. {iotdb130, AllTCs},
  35. {legacy, AllTCs},
  36. {thrift, (AllTCs -- Async) ++ [t_thrift_auto_recon]}
  37. ].
  38. init_per_suite(Config) ->
  39. emqx_bridge_v2_testlib:init_per_suite(
  40. Config,
  41. [
  42. emqx,
  43. emqx_conf,
  44. emqx_bridge_iotdb,
  45. emqx_connector,
  46. emqx_bridge,
  47. emqx_rule_engine,
  48. emqx_management,
  49. emqx_mgmt_api_test_util:emqx_dashboard()
  50. ]
  51. ).
  52. end_per_suite(Config) ->
  53. emqx_bridge_v2_testlib:end_per_suite(Config).
  54. init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
  55. Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"),
  56. ProxyName = atom_to_list(Type),
  57. {IotDbVersion, DefaultPort} =
  58. case Type of
  59. iotdb110 -> {?VSN_1_1_X, "18080"};
  60. iotdb130 -> {?VSN_1_3_X, "28080"}
  61. end,
  62. Port = list_to_integer(os:getenv("IOTDB_PLAIN_PORT", DefaultPort)),
  63. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  64. true ->
  65. Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
  66. [
  67. {bridge_host, Host},
  68. {bridge_port, Port},
  69. {rest_port, Port},
  70. {proxy_name, ProxyName},
  71. {iotdb_version, IotDbVersion},
  72. {iotdb_rest_prefix, <<"/rest/v2/">>}
  73. | Config
  74. ];
  75. false ->
  76. case os:getenv("IS_CI") of
  77. "yes" ->
  78. throw(no_iotdb);
  79. _ ->
  80. {skip, no_iotdb}
  81. end
  82. end;
  83. init_per_group(legacy = Type, Config0) ->
  84. Host = os:getenv("IOTDB_LEGACY_HOST", "toxiproxy.emqx.net"),
  85. Port = list_to_integer(os:getenv("IOTDB_LEGACY_PORT", "38080")),
  86. ProxyName = "iotdb013",
  87. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  88. true ->
  89. Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
  90. [
  91. {bridge_host, Host},
  92. {bridge_port, Port},
  93. {rest_port, Port},
  94. {proxy_name, ProxyName},
  95. {iotdb_version, ?VSN_0_13_X},
  96. {iotdb_rest_prefix, <<"/rest/v1/">>}
  97. | Config
  98. ];
  99. false ->
  100. case os:getenv("IS_CI") of
  101. "yes" ->
  102. throw(no_iotdb);
  103. _ ->
  104. {skip, no_iotdb}
  105. end
  106. end;
  107. init_per_group(thrift = Type, Config0) ->
  108. Host = os:getenv("IOTDB_THRIFT_HOST", "toxiproxy.emqx.net"),
  109. Port = list_to_integer(os:getenv("IOTDB_THRIFT_PORT", "46667")),
  110. ProxyName = "iotdb_thrift",
  111. case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
  112. true ->
  113. Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
  114. [
  115. {bridge_host, Host},
  116. {bridge_port, Port},
  117. {rest_port, 48080},
  118. {proxy_name, ProxyName},
  119. {iotdb_version, ?PROTOCOL_V3},
  120. {iotdb_rest_prefix, <<"/rest/v2/">>}
  121. | Config
  122. ];
  123. false ->
  124. case os:getenv("IS_CI") of
  125. "yes" ->
  126. throw(no_iotdb);
  127. _ ->
  128. {skip, no_iotdb}
  129. end
  130. end;
  131. init_per_group(_Group, Config) ->
  132. Config.
  133. end_per_group(Group, Config) when
  134. Group =:= iotdb110;
  135. Group =:= iotdb130;
  136. Group =:= legacy
  137. ->
  138. emqx_bridge_v2_testlib:end_per_group(Config),
  139. ok;
  140. end_per_group(_Group, _Config) ->
  141. ok.
  142. init_per_testcase(TestCase, Config0) ->
  143. Type = ?config(bridge_type, Config0),
  144. UniqueNum = integer_to_binary(erlang:unique_integer()),
  145. Name = <<
  146. (atom_to_binary(TestCase))/binary, UniqueNum/binary
  147. >>,
  148. {_ConfigString, ConnectorConfig} = connector_config(TestCase, Name, Config0),
  149. {_, ActionConfig} = action_config(TestCase, Name, Config0),
  150. Config = [
  151. {connector_type, Type},
  152. {connector_name, Name},
  153. {connector_config, ConnectorConfig},
  154. {bridge_type, Type},
  155. {bridge_name, Name},
  156. {bridge_config, ActionConfig}
  157. | Config0
  158. ],
  159. iotdb_reset(Config),
  160. ok = snabbkaffe:start_trace(),
  161. Config.
  162. end_per_testcase(TestCase, Config) ->
  163. emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
  164. %%------------------------------------------------------------------------------
  165. %% Helper fns
  166. %%------------------------------------------------------------------------------
  167. iotdb_server_url(Host, Port) ->
  168. iolist_to_binary([
  169. "http://",
  170. Host,
  171. ":",
  172. integer_to_binary(Port)
  173. ]).
  174. bridge_config(TestCase, Config) ->
  175. UniqueNum = integer_to_binary(erlang:unique_integer()),
  176. Host = ?config(bridge_host, Config),
  177. Port = ?config(bridge_port, Config),
  178. Version = ?config(iotdb_version, Config),
  179. Type = ?config(bridge_type, Config),
  180. Name = <<
  181. (atom_to_binary(TestCase))/binary, UniqueNum/binary
  182. >>,
  183. ServerURL = iotdb_server_url(Host, Port),
  184. QueryMode = query_mode(TestCase),
  185. ConfigString =
  186. io_lib:format(
  187. "bridges.~s.~s {\n"
  188. " enable = true\n"
  189. " base_url = \"~s\"\n"
  190. " authentication = {\n"
  191. " username = \"root\"\n"
  192. " password = \"root\"\n"
  193. " }\n"
  194. " iotdb_version = \"~s\"\n"
  195. " pool_size = 1\n"
  196. " resource_opts = {\n"
  197. " health_check_interval = \"1s\"\n"
  198. " request_ttl = 30s\n"
  199. " query_mode = \"~s\"\n"
  200. " worker_pool_size = 1\n"
  201. " }\n"
  202. "}\n",
  203. [
  204. Type,
  205. Name,
  206. ServerURL,
  207. Version,
  208. QueryMode
  209. ]
  210. ),
  211. {ok, InnerConfigMap} = hocon:binary(ConfigString),
  212. {Name, ConfigString, emqx_bridge_v2_testlib:parse_and_check(Type, Name, InnerConfigMap)}.
  213. make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
  214. #{
  215. measurement => s_to_b(Measurement),
  216. data_type => s_to_b(Type),
  217. value => s_to_b(Value),
  218. device_id => DeviceId,
  219. is_aligned => true
  220. }.
  221. make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) ->
  222. Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value),
  223. Payload#{timestamp => Timestamp}.
  224. s_to_b(S) when is_list(S) -> list_to_binary(S);
  225. s_to_b(V) -> V.
  226. make_message_fun(Topic, Payload) ->
  227. fun() ->
  228. MsgId = erlang:unique_integer([positive]),
  229. #{
  230. topic => Topic,
  231. id => MsgId,
  232. payload => emqx_utils_json:encode(Payload),
  233. retain => true
  234. }
  235. end.
  236. iotdb_topic(Config) ->
  237. ?config(mqtt_topic, Config).
  238. iotdb_device(Config) ->
  239. Topic = iotdb_topic(Config),
  240. topic_to_iotdb_device(Topic).
  241. topic_to_iotdb_device(Topic) ->
  242. Device = re:replace(Topic, "/", ".", [global, {return, binary}]),
  243. <<"root.", Device/binary>>.
  244. iotdb_request(Config, Path, Body) ->
  245. iotdb_request(Config, Path, Body, #{}).
  246. iotdb_request(Config, Path, Body, Opts) ->
  247. BridgeConfig = ?config(connector_config, Config),
  248. Host = ?config(bridge_host, Config),
  249. Port = ?config(rest_port, Config),
  250. Username = <<"root">>,
  251. Password = <<"root">>,
  252. BaseURL = iotdb_server_url(Host, Port),
  253. ct:pal("bridge config: ~p", [BridgeConfig]),
  254. URL = <<BaseURL/binary, Path/binary>>,
  255. BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
  256. Headers = [
  257. {"Content-type", "application/json"},
  258. {"Authorization", binary_to_list(BasicToken)}
  259. ],
  260. emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts).
  261. iotdb_reset(Config) ->
  262. Device = iotdb_device(Config),
  263. iotdb_reset(Config, Device).
  264. iotdb_reset(Config, Device) ->
  265. Prefix = ?config(iotdb_rest_prefix, Config),
  266. Body = #{sql => <<"delete from ", Device/binary, ".*">>},
  267. {ok, _} = iotdb_request(Config, <<Prefix/binary, "nonQuery">>, Body).
  268. iotdb_query(Config, Query) ->
  269. Prefix = ?config(iotdb_rest_prefix, Config),
  270. Path = <<Prefix/binary, "query">>,
  271. Opts = #{return_all => true},
  272. Body = #{sql => Query},
  273. iotdb_request(Config, Path, Body, Opts).
  274. is_success_check({ok, 200, _, Body}) ->
  275. ?assert(is_code(200, emqx_utils_json:decode(Body)));
  276. is_success_check({ok, _}) ->
  277. ok;
  278. is_success_check(Other) ->
  279. throw(Other).
  280. is_code(Code, #{<<"code">> := Code}) -> true;
  281. is_code(_, _) -> false.
  282. is_error_check(Reason) ->
  283. fun(Result) ->
  284. ?assertEqual({error, Reason}, Result)
  285. end.
  286. action_config(TestCase, Name, Config) ->
  287. Type = ?config(bridge_type, Config),
  288. QueryMode = query_mode(TestCase),
  289. ConfigString =
  290. io_lib:format(
  291. "actions.~s.~s {\n"
  292. " enable = true\n"
  293. " connector = \"~s\"\n"
  294. " parameters = {\n"
  295. " data = []\n"
  296. " }\n"
  297. " resource_opts = {\n"
  298. " query_mode = \"~s\"\n"
  299. " }\n"
  300. "}\n",
  301. [
  302. Type,
  303. Name,
  304. Name,
  305. QueryMode
  306. ]
  307. ),
  308. ct:pal("ActionConfig:~ts~n", [ConfigString]),
  309. {ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
  310. connector_config(TestCase, Name, Config) ->
  311. Host = ?config(bridge_host, Config),
  312. Port = ?config(bridge_port, Config),
  313. Type = ?config(bridge_type, Config),
  314. Version = ?config(iotdb_version, Config),
  315. ServerURL = iotdb_server_url(Host, Port),
  316. ConfigString =
  317. case ?config(test_group, Config) of
  318. thrift ->
  319. Server = make_thrift_server(TestCase, Config),
  320. io_lib:format(
  321. "connectors.~s.~s {\n"
  322. " enable = true\n"
  323. " driver = \"thrift\"\n"
  324. " server = \"~s\"\n"
  325. " protocol_version = \"~p\"\n"
  326. " username = \"root\"\n"
  327. " password = \"root\"\n"
  328. " zoneId = \"Asia/Shanghai\"\n"
  329. " ssl.enable = false\n"
  330. "}\n",
  331. [
  332. Type,
  333. Name,
  334. Server,
  335. Version
  336. ]
  337. );
  338. _ ->
  339. io_lib:format(
  340. "connectors.~s.~s {\n"
  341. " enable = true\n"
  342. " base_url = \"~s\"\n"
  343. " iotdb_version = \"~s\"\n"
  344. " authentication = {\n"
  345. " username = \"root\"\n"
  346. " password = \"root\"\n"
  347. " }\n"
  348. "}\n",
  349. [
  350. Type,
  351. Name,
  352. ServerURL,
  353. Version
  354. ]
  355. )
  356. end,
  357. ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
  358. {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
  359. parse_action_and_check(ConfigString, BridgeType, Name) ->
  360. parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
  361. parse_connector_and_check(ConfigString, ConnectorType, Name) ->
  362. parse_and_check(
  363. ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
  364. ).
  365. %% emqx_utils_maps:safe_atom_key_map(Config).
  366. parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
  367. Type = to_bin(Type0),
  368. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  369. hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
  370. #{RootKey := #{Type := #{Name := Config}}} = RawConf,
  371. Config.
  372. to_bin(List) when is_list(List) ->
  373. unicode:characters_to_binary(List, utf8);
  374. to_bin(Atom) when is_atom(Atom) ->
  375. erlang:atom_to_binary(Atom);
  376. to_bin(Bin) when is_binary(Bin) ->
  377. Bin.
  378. %%------------------------------------------------------------------------------
  379. %% Testcases
  380. %%------------------------------------------------------------------------------
  381. t_sync_query_simple(Config) ->
  382. DeviceId = iotdb_device(Config),
  383. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
  384. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  385. ok = emqx_bridge_v2_testlib:t_sync_query(
  386. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
  387. ),
  388. Query = <<"select temp from ", DeviceId/binary>>,
  389. {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
  390. ?assertMatch(
  391. #{<<"values">> := [[36]]},
  392. emqx_utils_json:decode(IoTDBResult)
  393. ).
  394. t_async_query(Config) ->
  395. DeviceId = iotdb_device(Config),
  396. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
  397. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  398. ok = emqx_bridge_v2_testlib:t_async_query(
  399. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
  400. ),
  401. Query = <<"select temp from ", DeviceId/binary>>,
  402. {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
  403. ?assertMatch(
  404. #{<<"values">> := [[36]]},
  405. emqx_utils_json:decode(IoTDBResult)
  406. ).
  407. t_sync_query_aggregated(Config) ->
  408. DeviceId = iotdb_device(Config),
  409. MS = erlang:system_time(millisecond) - 5000,
  410. Payload = [
  411. make_iotdb_payload(DeviceId, "temp", "INT32", "36", MS - 7000),
  412. make_iotdb_payload(DeviceId, "temp", "INT32", 37, MS - 6000),
  413. make_iotdb_payload(DeviceId, "temp", "INT64", 38.7, MS - 5000),
  414. make_iotdb_payload(DeviceId, "temp", "INT64", "39", integer_to_binary(MS - 4000)),
  415. make_iotdb_payload(DeviceId, "temp", "INT64", "34", MS - 3000),
  416. make_iotdb_payload(DeviceId, "temp", "INT32", 33.7, MS - 2000),
  417. make_iotdb_payload(DeviceId, "temp", "INT32", 32, MS - 1000),
  418. %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
  419. (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
  420. make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", MS - 6000),
  421. make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, MS - 5000),
  422. make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, MS - 4000),
  423. make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", MS - 3000),
  424. make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, MS - 2000),
  425. make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, MS - 1000),
  426. make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", MS + 1000),
  427. make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, MS + 1000),
  428. make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, MS + 1000),
  429. make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", MS + 1000),
  430. make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", MS + 1000),
  431. make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", MS + 1000),
  432. make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", MS + 1000),
  433. make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, MS + 1000),
  434. make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, MS + 1000),
  435. make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", MS + 1000),
  436. make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", MS + 1000),
  437. make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", MS + 1000),
  438. make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, MS + 1000),
  439. make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", MS + 1000)
  440. ],
  441. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  442. ok = emqx_bridge_v2_testlib:t_sync_query(
  443. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
  444. ),
  445. Time = integer_to_binary(MS - 20000),
  446. %% check weight
  447. QueryWeight = <<"select weight from ", DeviceId/binary, " where time > ", Time/binary>>,
  448. {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
  449. ?assertMatch(
  450. #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
  451. emqx_utils_json:decode(ResultWeight)
  452. ),
  453. %% [FIXME] https://github.com/apache/iotdb/issues/12375
  454. %% null don't seem to be supported by IoTDB insertTablet when 1.3.0
  455. case ?config(iotdb_version, Config) of
  456. ?VSN_1_3_X ->
  457. skip;
  458. _ ->
  459. %% check rest ts = MS + 1000
  460. CheckTime = integer_to_binary(MS + 1000),
  461. QueryRest = <<"select * from ", DeviceId/binary, " where time = ", CheckTime/binary>>,
  462. {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
  463. #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
  464. ResultRest
  465. ),
  466. Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
  467. Exp = #{
  468. exp(DeviceId, "charged") => true,
  469. exp(DeviceId, "floated") => true,
  470. exp(DeviceId, "started") => true,
  471. exp(DeviceId, "stoked") => true,
  472. exp(DeviceId, "enriched") => true,
  473. exp(DeviceId, "gutted") => true,
  474. exp(DeviceId, "drained") => false,
  475. exp(DeviceId, "toasted") => false,
  476. exp(DeviceId, "uncharted") => false,
  477. exp(DeviceId, "dazzled") => false,
  478. exp(DeviceId, "unplugged") => false,
  479. exp(DeviceId, "unraveled") => false,
  480. exp(DeviceId, "undecided") => null,
  481. exp(DeviceId, "foo") => <<"bar">>,
  482. exp(DeviceId, "temp") => null,
  483. exp(DeviceId, "weight") => null
  484. },
  485. ?assertEqual(Exp, Results),
  486. %% check temp
  487. QueryTemp = <<"select temp from ", DeviceId/binary, " where time > ", Time/binary>>,
  488. {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
  489. ?assertMatch(
  490. #{<<"values">> := [[36, 37, 38, 39, 34, 33, 32, 41]]},
  491. emqx_utils_json:decode(ResultTemp)
  492. )
  493. end,
  494. ok.
  495. exp(Dev, M0) ->
  496. M = s_to_b(M0),
  497. <<Dev/binary, ".", M/binary>>.
  498. t_sync_query_fail(Config) ->
  499. DeviceId = iotdb_device(Config),
  500. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
  501. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  502. IsSuccessCheck =
  503. fun(Result) ->
  504. ?assertEqual(error, element(1, Result))
  505. end,
  506. emqx_bridge_v2_testlib:t_sync_query(
  507. Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
  508. ).
  509. t_sync_device_id_missing(Config) ->
  510. emqx_bridge_v2_testlib:t_sync_query(
  511. Config,
  512. make_message_fun(iotdb_topic(Config), #{foo => bar}),
  513. is_error_check(device_id_missing),
  514. iotdb_bridge_on_query
  515. ).
  516. t_extract_device_id_from_rule_engine_message(Config) ->
  517. BridgeType = ?config(bridge_type, Config),
  518. RuleTopic = <<"t/iotdb">>,
  519. DeviceId = iotdb_device(Config),
  520. Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"),
  521. Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
  522. ?check_trace(
  523. begin
  524. {ok, _} = emqx_bridge_v2_testlib:create_bridge(Config),
  525. SQL = <<
  526. "SELECT\n"
  527. " payload.measurement, payload.data_type, payload.value, payload.device_id\n"
  528. "FROM\n"
  529. " \"",
  530. RuleTopic/binary,
  531. "\""
  532. >>,
  533. Opts = #{sql => SQL},
  534. {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
  535. BridgeType, RuleTopic, Config, Opts
  536. ),
  537. emqx:publish(Message),
  538. ?block_until(handle_async_reply, 5_000),
  539. ok
  540. end,
  541. fun(Trace) ->
  542. ?assertMatch(
  543. [#{action := ack, result := {ok, 200, _, _}}],
  544. ?of_kind(handle_async_reply, Trace)
  545. ),
  546. ok
  547. end
  548. ),
  549. ok.
  550. t_sync_invalid_template(Config) ->
  551. emqx_bridge_v2_testlib:t_sync_query(
  552. Config,
  553. make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
  554. is_error_check(invalid_template),
  555. iotdb_bridge_on_query
  556. ).
  557. t_async_device_id_missing(Config) ->
  558. emqx_bridge_v2_testlib:t_async_query(
  559. Config,
  560. make_message_fun(iotdb_topic(Config), #{foo => bar}),
  561. is_error_check(device_id_missing),
  562. iotdb_bridge_on_query_async
  563. ).
  564. t_async_invalid_template(Config) ->
  565. emqx_bridge_v2_testlib:t_async_query(
  566. Config,
  567. make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
  568. is_error_check(invalid_template),
  569. iotdb_bridge_on_query_async
  570. ).
  571. t_create_via_http(Config) ->
  572. emqx_bridge_v2_testlib:t_create_via_http(
  573. Config,
  574. thrift =:= ?config(test_group, Config)
  575. ).
  576. t_start_stop(Config) ->
  577. emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped).
  578. t_on_get_status(Config) ->
  579. emqx_bridge_v2_testlib:t_on_get_status(Config).
  580. t_device_id(Config) ->
  581. %% Create without device_id configured
  582. ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
  583. ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
  584. BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
  585. ?retry(
  586. _Sleep = 1_000,
  587. _Attempts = 20,
  588. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  589. ),
  590. ConfiguredDevice = <<"root.someOtherDevice234">>,
  591. DeviceId = <<"root.deviceFooBar123">>,
  592. Topic = <<"some/random/topic">>,
  593. iotdb_reset(Config, DeviceId),
  594. iotdb_reset(Config, ConfiguredDevice),
  595. Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
  596. MessageF1 = make_message_fun(Topic, Payload1),
  597. is_success_check(
  598. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  599. ),
  600. {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
  601. ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
  602. #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
  603. ?assertNot(is_empty(Values1_1)),
  604. iotdb_reset(Config, DeviceId),
  605. iotdb_reset(Config, ConfiguredDevice),
  606. %% reconfigure bridge with device_id
  607. {ok, _} =
  608. emqx_bridge_v2_testlib:update_bridge_api(Config, #{
  609. <<"parameters">> => #{<<"device_id">> => ConfiguredDevice}
  610. }),
  611. is_success_check(
  612. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  613. ),
  614. %% even though we had a device_id in the message it's not being used
  615. {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
  616. #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
  617. ?assert(is_empty(Values2_1)),
  618. {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
  619. Config, <<"select * from ", ConfiguredDevice/binary>>
  620. ),
  621. #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
  622. ?assertNot(is_empty(Values2_2)),
  623. iotdb_reset(Config, DeviceId),
  624. iotdb_reset(Config, ConfiguredDevice),
  625. ok.
  626. t_template(Config) ->
  627. %% Create without data configured
  628. ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
  629. ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
  630. BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
  631. ?retry(
  632. _Sleep = 1_000,
  633. _Attempts = 20,
  634. ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
  635. ),
  636. TemplateDeviceId = <<"root.deviceWithTemplate">>,
  637. DeviceId = <<"root.deviceWithoutTemplate">>,
  638. Topic = <<"some/random/topic">>,
  639. iotdb_reset(Config, DeviceId),
  640. iotdb_reset(Config, TemplateDeviceId),
  641. Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
  642. MessageF1 = make_message_fun(Topic, Payload1),
  643. is_success_check(
  644. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  645. ),
  646. {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
  647. ?assertMatch(#{<<"values">> := [[true]]}, emqx_utils_json:decode(Res1_1)),
  648. iotdb_reset(Config, DeviceId),
  649. iotdb_reset(Config, TemplateDeviceId),
  650. %% reconfigure with data template
  651. {ok, _} =
  652. emqx_bridge_v2_testlib:update_bridge_api(Config, #{
  653. <<"parameters">> => #{
  654. <<"device_id">> => TemplateDeviceId,
  655. <<"data">> => [
  656. #{
  657. <<"measurement">> => <<"${payload.measurement}">>,
  658. <<"data_type">> => "TEXT",
  659. <<"value">> => <<"${payload.device_id}">>
  660. }
  661. ]
  662. }
  663. }),
  664. is_success_check(
  665. emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()})
  666. ),
  667. {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
  668. Config, <<"select * from ", TemplateDeviceId/binary>>
  669. ),
  670. ?assertMatch(#{<<"values">> := [[<<DeviceId/binary>>]]}, emqx_utils_json:decode(Res2_2)),
  671. iotdb_reset(Config, DeviceId),
  672. iotdb_reset(Config, TemplateDeviceId),
  673. ok.
  674. t_sync_query_case(Config) ->
  675. DeviceId = iotdb_device(Config),
  676. Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"),
  677. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  678. ok = emqx_bridge_v2_testlib:t_sync_query(
  679. Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
  680. ),
  681. Query = <<"select temp from ", DeviceId/binary>>,
  682. {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
  683. ?assertMatch(
  684. #{<<"values">> := [[36]]},
  685. emqx_utils_json:decode(IoTDBResult)
  686. ).
  687. t_sync_query_invalid_type(Config) ->
  688. DeviceId = iotdb_device(Config),
  689. Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
  690. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  691. IsInvalidType = fun(Result) -> ?assertMatch({error, #{reason := invalid_type}}, Result) end,
  692. ok = emqx_bridge_v2_testlib:t_sync_query(
  693. Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
  694. ).
  695. t_sync_query_unmatched_type(Config) ->
  696. DeviceId = iotdb_device(Config),
  697. Payload = make_iotdb_payload(DeviceId, "temp", "BOOLEAN", "not boolean"),
  698. MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
  699. IsInvalidType = fun(Result) -> ?assertMatch({error, invalid_data}, Result) end,
  700. ok = emqx_bridge_v2_testlib:t_sync_query(
  701. Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
  702. ).
  703. t_thrift_auto_recon(Config) ->
  704. emqx_bridge_v2_testlib:t_on_get_status(Config).
  705. is_empty(null) -> true;
  706. is_empty([]) -> true;
  707. is_empty([[]]) -> true;
  708. is_empty(_) -> false.
  709. make_thrift_server(t_thrift_auto_recon, Config) ->
  710. Host = ?config(bridge_host, Config),
  711. Port = ?config(bridge_port, Config),
  712. lists:flatten(io_lib:format("127.0.0.1:9999,~s:~p", [Host, Port]));
  713. make_thrift_server(_, Config) ->
  714. Host = ?config(bridge_host, Config),
  715. Port = ?config(bridge_port, Config),
  716. lists:flatten(io_lib:format("~s:~p", [Host, Port])).
  717. query_mode(TestCase) ->
  718. Name = erlang:atom_to_list(TestCase),
  719. Tokens = string:tokens(Name, "_"),
  720. case lists:member("async", Tokens) of
  721. true ->
  722. async;
  723. _ ->
  724. sync
  725. end.