emqx_bridge_iotdb_impl_SUITE.erl 18 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_iotdb_impl_SUITE).
  5. -compile(nowarn_export_all).
  6. -compile(export_all).
  7. -include("emqx_bridge_iotdb.hrl").
  8. -include_lib("eunit/include/eunit.hrl").
  9. -include_lib("common_test/include/ct.hrl").
  10. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  11. -define(BRIDGE_TYPE_BIN, <<"iotdb">>).
  12. -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]).
  13. %%------------------------------------------------------------------------------
  14. %% CT boilerplate
  15. %%------------------------------------------------------------------------------
  %% Common Test entry point: the suite consists of two groups, `plain' and
  %% `legacy', returned in that order.
  all() ->
      [{group, Group} || Group <- [plain, legacy]].
  %% Group definitions: both `plain' and `legacy' contain the same list of test
  %% cases, namely whatever `emqx_common_test_helpers:all/1' returns for this
  %% module.
  groups() ->
      TestCases = emqx_common_test_helpers:all(?MODULE),
      [{Group, TestCases} || Group <- [plain, legacy]].
  %% Suite setup: delegated to the shared bridge test library, which receives
  %% the application list from ?APPS.
  init_per_suite(Config) ->
      Apps = ?APPS,
      emqx_bridge_testlib:init_per_suite(Config, Apps).
  %% Suite teardown: delegated to the shared bridge test library.
  end_per_suite(Config) -> emqx_bridge_testlib:end_per_suite(Config).
  %% Group setup.
  %%
  %% The `plain' group is configured with ?VSN_1_1_X and the "/rest/v2/" prefix,
  %% the `legacy' group with ?VSN_0_13_X and the "/rest/v1/" prefix. Host and
  %% port come from the IOTDB_<GROUP>_HOST / IOTDB_<GROUP>_PORT environment
  %% variables, with defaults. Any other group gets its config back untouched.
  %%
  %% Returns the extended config, or `{skip, no_iotdb}' when the endpoint is not
  %% reachable; throws `no_iotdb' instead when IS_CI is "yes".
  init_per_group(plain = Type, Config) ->
      init_iotdb_group(Type, Config, #{
          host_env => "IOTDB_PLAIN_HOST",
          port_env => "IOTDB_PLAIN_PORT",
          default_port => "18080",
          proxy_name => "iotdb",
          iotdb_version => ?VSN_1_1_X,
          iotdb_rest_prefix => <<"/rest/v2/">>
      });
  init_per_group(legacy = Type, Config) ->
      init_iotdb_group(Type, Config, #{
          host_env => "IOTDB_LEGACY_HOST",
          port_env => "IOTDB_LEGACY_PORT",
          default_port => "38080",
          proxy_name => "iotdb013",
          iotdb_version => ?VSN_0_13_X,
          iotdb_rest_prefix => <<"/rest/v1/">>
      });
  init_per_group(_Group, Config) ->
      Config.

  %% Shared body of the `plain' and `legacy' clauses: resolve the endpoint, probe
  %% it, and on success prepend the group-specific entries to the config that
  %% the bridge test library produced.
  init_iotdb_group(Type, Config0, #{
      host_env := HostEnv,
      port_env := PortEnv,
      default_port := DefaultPort,
      proxy_name := ProxyName,
      iotdb_version := Version,
      iotdb_rest_prefix := RestPrefix
  }) ->
      Host = os:getenv(HostEnv, "toxiproxy.emqx.net"),
      Port = list_to_integer(os:getenv(PortEnv, DefaultPort)),
      case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
          true ->
              Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
              [
                  {bridge_host, Host},
                  {bridge_port, Port},
                  {proxy_name, ProxyName},
                  {iotdb_version, Version},
                  {iotdb_rest_prefix, RestPrefix}
                  | Config
              ];
          false ->
              iotdb_unavailable(os:getenv("IS_CI"))
      end.

  %% Outcome when the IoTDB endpoint cannot be reached: fail loudly when IS_CI
  %% is "yes", otherwise ask Common Test to skip the group.
  iotdb_unavailable("yes") -> throw(no_iotdb);
  iotdb_unavailable(_IsCI) -> {skip, no_iotdb}.
  %% Group teardown: the `plain' and `legacy' groups are cleaned up through the
  %% bridge test library; every other group is a no-op. Always returns `ok'.
  end_per_group(Group, Config) ->
      case lists:member(Group, [plain, legacy]) of
          true ->
              emqx_bridge_testlib:end_per_group(Config),
              ok;
          false ->
              ok
      end.
  %% Test case setup: let the bridge test library build the per-case config
  %% (using bridge_config/3 as the config generator), then delete the data of
  %% the device derived from the test topic so each case starts clean.
  init_per_testcase(TestCase, Config0) ->
      BridgeConfigFun = fun bridge_config/3,
      Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, BridgeConfigFun),
      iotdb_reset(Config),
      Config.
  %% Test case teardown: delegated to the shared bridge test library.
  end_per_testcase(TestCase, Config) -> emqx_bridge_testlib:end_per_testcase(TestCase, Config).
  93. %%------------------------------------------------------------------------------
  94. %% Helper fns
  95. %%------------------------------------------------------------------------------
  %% Builds the binary "http://Host:Port". `Host' is iodata, `Port' an integer.
  iotdb_server_url(Host, Port) ->
      PortBin = integer_to_binary(Port),
      iolist_to_binary(["http://", Host, $:, PortBin]).
  %% Builds the bridge configuration for one test case.
  %%
  %% Reads `bridge_host', `bridge_port', `iotdb_version' and `bridge_type' from
  %% the CT config and renders a HOCON snippet for a bridge named
  %% <TestCase><UniqueNum>.
  %%
  %% Returns `{Name, ConfigString, ParsedConfig}', where `ParsedConfig' is the
  %% result of `emqx_bridge_testlib:parse_and_check/3'.
  bridge_config(TestCase, _TestGroup, Config) ->
      %% `positive' keeps a minus sign out of the bridge name (and therefore out
      %% of the HOCON path below); plain unique_integer/0 may return negatives.
      UniqueNum = integer_to_binary(erlang:unique_integer([positive])),
      Host = ?config(bridge_host, Config),
      Port = ?config(bridge_port, Config),
      Version = ?config(iotdb_version, Config),
      Type = ?config(bridge_type, Config),
      Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
      ServerURL = iotdb_server_url(Host, Port),
      ConfigString =
          io_lib:format(
              "bridges.~s.~s {\n"
              " enable = true\n"
              " base_url = \"~s\"\n"
              " authentication = {\n"
              " username = \"root\"\n"
              " password = \"root\"\n"
              " }\n"
              " iotdb_version = \"~s\"\n"
              " pool_size = 1\n"
              " resource_opts = {\n"
              " health_check_interval = \"5s\"\n"
              " request_ttl = 30s\n"
              " query_mode = \"async\"\n"
              " worker_pool_size = 1\n"
              " }\n"
              "}\n",
              [Type, Name, ServerURL, Version]
          ),
      {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}.
  %% Builds a payload map for the bridge. `Measurement', `Type' and `Value' are
  %% passed through s_to_b/1 (lists become binaries, anything else is kept);
  %% `DeviceId' is stored as given and `is_aligned' is always `true'.
  make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
      #{
          device_id => DeviceId,
          is_aligned => true,
          measurement => s_to_b(Measurement),
          data_type => s_to_b(Type),
          value => s_to_b(Value)
      }.

  %% Same as make_iotdb_payload/4, with an explicit `timestamp' entry added.
  make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) ->
      maps:put(timestamp, Timestamp, make_iotdb_payload(DeviceId, Measurement, Type, Value)).
  %% "String to binary": lists are converted with list_to_binary/1, every other
  %% term is returned unchanged.
  s_to_b(Other) when not is_list(Other) -> Other;
  s_to_b(Chars) -> list_to_binary(Chars).
  %% Returns a zero-arity fun that produces a message map for `Topic'. Each call
  %% gets a fresh positive unique integer as `id'; the payload is JSON-encoded
  %% with `emqx_utils_json:encode/1' and `retain' is always `true'.
  make_message_fun(Topic, Payload) ->
      fun() ->
          #{
              id => erlang:unique_integer([positive]),
              topic => Topic,
              payload => emqx_utils_json:encode(Payload),
              retain => true
          }
      end.
  %% The MQTT topic of the current test case, read from the `mqtt_topic' entry
  %% of the CT config.
  iotdb_topic(Config) ->
      Topic = ?config(mqtt_topic, Config),
      Topic.
  %% The IoTDB device name derived from the test case's MQTT topic.
  iotdb_device(Config) ->
      topic_to_iotdb_device(iotdb_topic(Config)).
  %% Maps a topic to a device name: every "/" becomes "." and the result is
  %% prefixed with "root.". Always returns a binary.
  topic_to_iotdb_device(Topic) ->
      Dotted = re:replace(Topic, "/", ".", [global]),
      iolist_to_binary(["root.", Dotted]).
  %% Sends a POST request directly to the IoTDB REST API (bypassing the bridge).
  %% iotdb_request/3 uses an empty option map.
  iotdb_request(Config, Path, Body) ->
      iotdb_request(Config, Path, Body, #{}).

  %% `Path' is a binary appended to the bridge's `base_url'; credentials are
  %% taken from the `authentication' section of the `bridge_config' entry in the
  %% CT config. `Body' and `Opts' are handed to
  %% `emqx_mgmt_api_test_util:request_api/6' unchanged, and its result is
  %% returned as-is.
  iotdb_request(Config, Path, Body, Opts) ->
      BridgeConfig = ?config(bridge_config, Config),
      #{
          <<"base_url">> := BaseURL,
          <<"authentication">> := #{
              <<"username">> := Username,
              <<"password">> := Password
          }
      } = BridgeConfig,
      ct:pal("bridge config: ~p", [BridgeConfig]),
      URL = <<BaseURL/binary, Path/binary>>,
      Credentials = base64:encode(<<Username/binary, ":", Password/binary>>),
      Headers = [
          {"Content-type", "application/json"},
          %% HTTP Basic authentication requires the "Basic " scheme in front of
          %% the base64 credentials (RFC 7617); the IoTDB REST documentation
          %% uses the same form.
          {"Authorization", "Basic " ++ binary_to_list(Credentials)}
      ],
      emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts).
  %% Deletes all data of the device derived from the test case's topic.
  iotdb_reset(Config) ->
      iotdb_reset(Config, iotdb_device(Config)).

  %% Issues "delete from <Device>.*" against the group's `nonQuery' REST
  %% endpoint; crashes with badmatch unless the request returns `{ok, _}'.
  iotdb_reset(Config, Device) ->
      RestPrefix = ?config(iotdb_rest_prefix, Config),
      Statement = <<"delete from ", Device/binary, ".*">>,
      Path = <<RestPrefix/binary, "nonQuery">>,
      {ok, _} = iotdb_request(Config, Path, #{sql => Statement}).
  %% Runs `Query' (a SQL binary) against the group's `query' REST endpoint and
  %% returns the raw result of iotdb_request/4, requested with
  %% `return_all => true'.
  iotdb_query(Config, Query) ->
      RestPrefix = ?config(iotdb_rest_prefix, Config),
      iotdb_request(
          Config,
          <<RestPrefix/binary, "query">>,
          #{sql => Query},
          #{return_all => true}
      ).
  %% Asserts that a bridge query result is an HTTP 200 whose JSON body carries
  %% `"code": 200'. Any result of a different shape fails with function_clause.
  is_success_check({ok, 200, _Headers, Body}) ->
      ?assert(is_code(200, emqx_utils_json:decode(Body))).
  %% True when `Response' is a map whose <<"code">> entry is exactly `Code';
  %% false for anything else (including non-maps).
  is_code(Code, Response) ->
      case Response of
          #{<<"code">> := Code} -> true;
          _ -> false
      end.
  %% Returns a one-argument checker that asserts its argument equals
  %% `{error, Reason}'.
  is_error_check(Reason) ->
      Expected = {error, Reason},
      fun(Result) -> ?assertEqual(Expected, Result) end.
  211. %%------------------------------------------------------------------------------
  212. %% Testcases
  213. %%------------------------------------------------------------------------------
  %% Sends a single INT32 "temp" reading of "36" through the shared sync-query
  %% test, then reads it back from IoTDB and expects the stored value 36.
  t_sync_query_simple(Config) ->
      DeviceId = iotdb_device(Config),
      Topic = iotdb_topic(Config),
      MakeMessageFun = make_message_fun(
          Topic, make_iotdb_payload(DeviceId, "temp", "INT32", "36")
      ),
      ok = emqx_bridge_testlib:t_sync_query(
          Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
      ),
      {ok, {{_, 200, _}, _, IoTDBResult}} =
          iotdb_query(Config, <<"select temp from ", DeviceId/binary>>),
      ?assertMatch(
          #{<<"values">> := [[36]]},
          emqx_utils_json:decode(IoTDBResult)
      ).
  %% Async counterpart of t_sync_query_simple/1: the same single INT32 reading
  %% goes through the shared async-query test and is then read back from IoTDB.
  t_async_query(Config) ->
      DeviceId = iotdb_device(Config),
      Topic = iotdb_topic(Config),
      MakeMessageFun = make_message_fun(
          Topic, make_iotdb_payload(DeviceId, "temp", "INT32", "36")
      ),
      ok = emqx_bridge_testlib:t_async_query(
          Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
      ),
      {ok, {{_, 200, _}, _, IoTDBResult}} =
          iotdb_query(Config, <<"select temp from ", DeviceId/binary>>),
      ?assertMatch(
          #{<<"values">> := [[36]]},
          emqx_utils_json:decode(IoTDBResult)
      ).
  %% Sends one message whose payload is a list of readings covering several data
  %% types (INT32, INT64, FLOAT, DOUBLE, BOOLEAN, TEXT), several value encodings
  %% (strings, numbers, atoms) and several timestamp forms, then verifies what
  %% IoTDB returns for the `temp' and `weight' series and for the row at
  %% timestamp 1685112026300.
  t_sync_query_aggregated(Config) ->
      DeviceId = iotdb_device(Config),
      TempPayloads = [
          make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290),
          make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291),
          make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292),
          make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>),
          make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294),
          make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295),
          make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296),
          %% implicit 'now()' timestamp
          make_iotdb_payload(DeviceId, "temp", "INT32", "40"),
          %% [FIXME] neither nanoseconds nor microseconds seem to be supported by IoTDB
          make_iotdb_payload(DeviceId, "temp", "INT32", "41", <<"now_us">>),
          make_iotdb_payload(DeviceId, "temp", "INT32", "42", <<"now_ns">>)
      ],
      WeightPayloads = [
          make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290),
          make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291),
          make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292),
          make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293),
          make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294),
          make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295)
      ],
      %% All boolean readings share one timestamp so they can be fetched as a
      %% single row further down.
      BooleanPayloads = [
          make_iotdb_payload(DeviceId, Measurement, "BOOLEAN", Value, 1685112026300)
       || {Measurement, Value} <- [
              {"charged", "1"},
              {"floated", 1},
              {"started", true},
              {"stoked", "true"},
              {"enriched", "TRUE"},
              {"gutted", "True"},
              {"drained", "0"},
              {"toasted", 0},
              {"uncharted", false},
              {"dazzled", "false"},
              {"unplugged", "FALSE"},
              {"unraveled", "False"},
              {"undecided", null}
          ]
      ],
      TextPayloads = [
          make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
      ],
      Payload = TempPayloads ++ WeightPayloads ++ BooleanPayloads ++ TextPayloads,
      MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
      ok = emqx_bridge_testlib:t_sync_query(
          Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
      ),

      %% check temp
      {ok, {{_, 200, _}, _, ResultTemp}} =
          iotdb_query(Config, <<"select temp from ", DeviceId/binary>>),
      ?assertMatch(
          #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]},
          emqx_utils_json:decode(ResultTemp)
      ),

      %% check weight
      {ok, {{_, 200, _}, _, ResultWeight}} =
          iotdb_query(Config, <<"select weight from ", DeviceId/binary>>),
      ?assertMatch(
          #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
          emqx_utils_json:decode(ResultWeight)
      ),

      %% check rest ts = 1685112026300
      QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>,
      {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
      #{<<"values">> := Values, <<"expressions">> := Expressions} =
          emqx_utils_json:decode(ResultRest),
      %% Pair each expression (column name) with the single value of its column.
      Results = maps:from_list(
          lists:map(
              fun({K, [V]}) -> {K, V} end,
              lists:zip(Expressions, Values)
          )
      ),
      Exp = maps:from_list([
          {exp(DeviceId, Measurement), Expected}
       || {Measurement, Expected} <- [
              {"charged", true},
              {"floated", true},
              {"started", true},
              {"stoked", true},
              {"enriched", true},
              {"gutted", true},
              {"drained", false},
              {"toasted", false},
              {"uncharted", false},
              {"dazzled", false},
              {"unplugged", false},
              {"unraveled", false},
              {"undecided", null},
              {"foo", <<"bar">>},
              {"temp", null},
              {"weight", null}
          ]
      ]),
      ?assertEqual(Exp, Results),
      ok.
  %% Fully qualified series name "<Dev>.<Measurement>" as a binary; the
  %% measurement may be given as a string or as a binary.
  exp(Dev, M0) ->
      <<Dev/binary, $., (s_to_b(M0))/binary>>.
  %% Sends a reading whose INT32 value is the non-numeric string "Anton" and
  %% expects the query result to be a tuple tagged `error'.
  t_sync_query_fail(Config) ->
      DeviceId = iotdb_device(Config),
      MakeMessageFun = make_message_fun(
          iotdb_topic(Config),
          make_iotdb_payload(DeviceId, "temp", "INT32", "Anton")
      ),
      ExpectError = fun(Result) -> ?assertEqual(error, element(1, Result)) end,
      emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, ExpectError, iotdb_bridge_on_query).
  %% A payload without `device_id' must make the sync query return
  %% `{error, device_id_missing}'.
  t_sync_device_id_missing(Config) ->
      MakeMessageFun = make_message_fun(iotdb_topic(Config), #{foo => bar}),
      Check = is_error_check(device_id_missing),
      emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, Check, iotdb_bridge_on_query).
  %% Publishes a message on a rule-engine topic whose rule selects the payload
  %% fields (including `payload.device_id') and forwards them to the bridge,
  %% then checks that exactly one `handle_async_reply' trace event was produced,
  %% with `action := ack' and an `{ok, 200, _, _}' result.
  t_extract_device_id_from_rule_engine_message(Config) ->
      BridgeType = ?config(bridge_type, Config),
      RuleTopic = <<"t/iotdb">>,
      DeviceId = iotdb_device(Config),
      Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"),
      Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
      ?check_trace(
          begin
              {ok, _} = emqx_bridge_testlib:create_bridge(Config),
              SQL = <<
                  "SELECT\n"
                  " payload.measurement, payload.data_type, payload.value, payload.device_id\n"
                  "FROM\n"
                  " \"",
                  RuleTopic/binary,
                  "\""
              >>,
              Opts = #{sql => SQL},
              {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
                  BridgeType, RuleTopic, Config, Opts
              ),
              emqx:publish(Message),
              %% ?block_until takes an event *pattern*. A bare atom never
              %% matches a trace event (events are maps), so it would always
              %% wait out the full timeout. Match on the event kind and make
              %% sure the event really arrived.
              {ok, _} = ?block_until(#{?snk_kind := handle_async_reply}, 5_000),
              ok
          end,
          fun(Trace) ->
              ?assertMatch(
                  [#{action := ack, result := {ok, 200, _, _}}],
                  ?of_kind(handle_async_reply, Trace)
              ),
              ok
          end
      ),
      ok.
  %% A payload that has a `device_id' but none of the measurement fields must
  %% make the sync query return `{error, invalid_data}'.
  t_sync_invalid_data(Config) ->
      Payload = #{foo => bar, device_id => <<"root.sg27">>},
      MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
      Check = is_error_check(invalid_data),
      emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, Check, iotdb_bridge_on_query).
  %% Async variant: a payload without `device_id' must yield
  %% `{error, device_id_missing}'.
  t_async_device_id_missing(Config) ->
      MakeMessageFun = make_message_fun(iotdb_topic(Config), #{foo => bar}),
      Check = is_error_check(device_id_missing),
      emqx_bridge_testlib:t_async_query(
          Config, MakeMessageFun, Check, iotdb_bridge_on_query_async
      ).
  %% Async variant: a payload with a `device_id' but no measurement fields must
  %% yield `{error, invalid_data}'.
  t_async_invalid_data(Config) ->
      Payload = #{foo => bar, device_id => <<"root.sg27">>},
      MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
      Check = is_error_check(invalid_data),
      emqx_bridge_testlib:t_async_query(
          Config, MakeMessageFun, Check, iotdb_bridge_on_query_async
      ).
  %% Runs the shared "create via HTTP" test from the bridge test library.
  t_create_via_http(Config) -> emqx_bridge_testlib:t_create_via_http(Config).
  %% Runs the shared start/stop test from the bridge test library.
  %% NOTE(review): `iotdb_bridge_stopped' is presumably the trace event kind
  %% emitted when the bridge stops -- confirm against emqx_bridge_testlib.
  t_start_stop(Config) ->
      StopEvent = iotdb_bridge_stopped,
      emqx_bridge_testlib:t_start_stop(Config, StopEvent).
  %% Runs the shared status test from the bridge test library.
  t_on_get_status(Config) -> emqx_bridge_testlib:t_on_get_status(Config).
  %% Checks which device a message is written to:
  %%   1. with no `device_id' in the bridge config, data lands under the device
  %%      named in the message payload;
  %%   2. after the bridge is updated with a `device_id', the same message lands
  %%      under the configured device and nothing is written to the payload's.
  t_device_id(Config) ->
      ResourceId = emqx_bridge_testlib:resource_id(Config),
      %% Create without device_id configured
      ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)),
      ?retry(
          _Sleep = 1_000,
          _Attempts = 20,
          ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
      ),
      ConfiguredDevice = <<"root.someOtherDevice234">>,
      DeviceId = <<"root.deviceFooBar123">>,
      Topic = <<"some/random/topic">>,

      %% Local helpers for the steps this test repeats.
      ResetDevices = fun() ->
          iotdb_reset(Config, DeviceId),
          iotdb_reset(Config, ConfiguredDevice)
      end,
      SelectAll = fun(Device) ->
          {ok, {{_, 200, _}, _, Res}} =
              iotdb_query(Config, <<"select * from ", Device/binary>>),
          emqx_utils_json:decode(Res)
      end,
      MessageF1 = make_message_fun(Topic, make_iotdb_payload(DeviceId, "test", "BOOLEAN", true)),
      SendMessage = fun() ->
          emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
      end,

      ResetDevices(),
      is_success_check(SendMessage()),
      Decoded1_1 = SelectAll(DeviceId),
      ct:pal("device_id result: ~p", [Decoded1_1]),
      #{<<"values">> := Values1_1} = Decoded1_1,
      ?assertNot(is_empty(Values1_1)),
      ResetDevices(),

      %% reconfigure bridge with device_id
      {ok, _} =
          emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}),
      is_success_check(SendMessage()),

      %% even though we had a device_id in the message it's not being used
      #{<<"values">> := Values2_1} = SelectAll(DeviceId),
      ?assert(is_empty(Values2_1)),
      #{<<"values">> := Values2_2} = SelectAll(ConfiguredDevice),
      ?assertNot(is_empty(Values2_2)),
      ResetDevices(),
      ok.
  %% True for the three shapes this suite treats as "no values" in a query
  %% result: `null', `[]' and `[[]]'. Everything else is non-empty.
  is_empty(Values) ->
      lists:member(Values, [null, [], [[]]]).