emqx_bridge_iotdb_impl.erl 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_iotdb_impl).
  5. -include("emqx_bridge_iotdb.hrl").
  6. -include_lib("emqx/include/logger.hrl").
  7. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  8. %% `emqx_resource' API
  9. -export([
  10. callback_mode/0,
  11. on_start/2,
  12. on_stop/2,
  13. on_get_status/2,
  14. on_query/3,
  15. on_query_async/4
  16. ]).
  17. -type config() ::
  18. #{
  19. base_url := #{
  20. scheme := http | https,
  21. host := iolist(),
  22. port := inet:port_number(),
  23. path := _
  24. },
  25. connect_timeout := pos_integer(),
  26. pool_type := random | hash,
  27. pool_size := pos_integer(),
  28. request => undefined | map(),
  29. is_aligned => boolean(),
  30. iotdb_version => binary(),
  31. device_id => binary() | undefined,
  32. atom() => _
  33. }.
  34. -type state() ::
  35. #{
  36. base_path := _,
  37. base_url := #{
  38. scheme := http | https,
  39. host := iolist(),
  40. port := inet:port_number(),
  41. path := _
  42. },
  43. connect_timeout := pos_integer(),
  44. pool_type := random | hash,
  45. pool_size := pos_integer(),
  46. request => undefined | map(),
  47. is_aligned => boolean(),
  48. iotdb_version => binary(),
  49. device_id => binary() | undefined,
  50. atom() => _
  51. }.
  52. -type manager_id() :: binary().
  53. %%-------------------------------------------------------------------------------------
  54. %% `emqx_resource' API
  55. %%-------------------------------------------------------------------------------------
  56. callback_mode() -> async_if_possible.
  57. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
  58. on_start(InstanceId, Config) ->
  59. %% [FIXME] The configuration passed in here is pre-processed and transformed
  60. %% in emqx_bridge_resource:parse_confs/2.
  61. case emqx_connector_http:on_start(InstanceId, Config) of
  62. {ok, State} ->
  63. ?SLOG(info, #{
  64. msg => "iotdb_bridge_started",
  65. instance_id => InstanceId,
  66. request => maps:get(request, State, <<>>)
  67. }),
  68. ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
  69. {ok, maps:merge(Config, State)};
  70. {error, Reason} ->
  71. ?SLOG(error, #{
  72. msg => "failed_to_start_iotdb_bridge",
  73. instance_id => InstanceId,
  74. base_url => maps:get(request, Config, <<>>),
  75. reason => Reason
  76. }),
  77. throw(failed_to_start_iotdb_bridge)
  78. end.
  79. -spec on_stop(manager_id(), state()) -> ok | {error, term()}.
  80. on_stop(InstanceId, State) ->
  81. ?SLOG(info, #{
  82. msg => "stopping_iotdb_bridge",
  83. connector => InstanceId
  84. }),
  85. Res = emqx_connector_http:on_stop(InstanceId, State),
  86. ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
  87. Res.
  88. -spec on_get_status(manager_id(), state()) ->
  89. {connected, state()} | {disconnected, state(), term()}.
  90. on_get_status(InstanceId, State) ->
  91. emqx_connector_http:on_get_status(InstanceId, State).
  92. -spec on_query(manager_id(), {send_message, map()}, state()) ->
  93. {ok, pos_integer(), [term()], term()}
  94. | {ok, pos_integer(), [term()]}
  95. | {error, term()}.
  96. on_query(InstanceId, {send_message, Message}, State) ->
  97. ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
  98. ?SLOG(debug, #{
  99. msg => "iotdb_bridge_on_query_called",
  100. instance_id => InstanceId,
  101. send_message => Message,
  102. state => emqx_utils:redact(State)
  103. }),
  104. case make_iotdb_insert_request(Message, State) of
  105. {ok, IoTDBPayload} ->
  106. handle_response(
  107. emqx_connector_http:on_query(
  108. InstanceId, {send_message, IoTDBPayload}, State
  109. )
  110. );
  111. Error ->
  112. Error
  113. end.
  114. -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
  115. {ok, pid()} | {error, empty_request}.
  116. on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
  117. ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
  118. ?SLOG(debug, #{
  119. msg => "iotdb_bridge_on_query_async_called",
  120. instance_id => InstanceId,
  121. send_message => Message,
  122. state => emqx_utils:redact(State)
  123. }),
  124. case make_iotdb_insert_request(Message, State) of
  125. {ok, IoTDBPayload} ->
  126. ReplyFunAndArgs =
  127. {
  128. fun(Result) ->
  129. Response = handle_response(Result),
  130. emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
  131. end,
  132. []
  133. },
  134. emqx_connector_http:on_query_async(
  135. InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
  136. );
  137. Error ->
  138. Error
  139. end.
  140. %%--------------------------------------------------------------------
  141. %% Internal Functions
  142. %%--------------------------------------------------------------------
  143. get_payload(#{payload := Payload}) ->
  144. Payload;
  145. get_payload(#{<<"payload">> := Payload}) ->
  146. Payload.
  147. parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
  148. ParsedPayload;
  149. parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
  150. emqx_utils_json:decode(UnparsedPayload);
  151. parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) ->
  152. lists:map(fun parse_payload/1, UnparsedPayloads).
  153. preproc_data_list(DataList) ->
  154. lists:foldl(
  155. fun preproc_data/2,
  156. [],
  157. DataList
  158. ).
  159. preproc_data(
  160. #{
  161. <<"measurement">> := Measurement,
  162. <<"data_type">> := DataType,
  163. <<"value">> := Value
  164. } = Data,
  165. Acc
  166. ) ->
  167. [
  168. #{
  169. timestamp => maybe_preproc_tmpl(
  170. maps:get(<<"timestamp">>, Data, <<"now">>)
  171. ),
  172. measurement => emqx_placeholder:preproc_tmpl(Measurement),
  173. data_type => DataType,
  174. value => maybe_preproc_tmpl(Value)
  175. }
  176. | Acc
  177. ];
  178. preproc_data(_NoMatch, Acc) ->
  179. ?SLOG(
  180. warning,
  181. #{
  182. msg => "iotdb_bridge_preproc_data_failed",
  183. required_fields => ['measurement', 'data_type', 'value'],
  184. received => _NoMatch
  185. }
  186. ),
  187. Acc.
  188. maybe_preproc_tmpl(Value) when is_binary(Value) ->
  189. emqx_placeholder:preproc_tmpl(Value);
  190. maybe_preproc_tmpl(Value) ->
  191. Value.
  192. proc_data(PreProcessedData, Msg) ->
  193. NowNS = erlang:system_time(nanosecond),
  194. Nows = #{
  195. now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
  196. now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
  197. now_ns => NowNS
  198. },
  199. lists:map(
  200. fun(
  201. #{
  202. timestamp := TimestampTkn,
  203. measurement := Measurement,
  204. data_type := DataType,
  205. value := ValueTkn
  206. }
  207. ) ->
  208. #{
  209. timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
  210. measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
  211. data_type => DataType,
  212. value => proc_value(DataType, ValueTkn, Msg)
  213. }
  214. end,
  215. PreProcessedData
  216. ).
  217. iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
  218. Timestamp;
  219. iot_timestamp(TimestampTkn, Msg, Nows) ->
  220. iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
  221. iot_timestamp(Timestamp, #{now_ms := NowMs}) when
  222. Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
  223. ->
  224. NowMs;
  225. iot_timestamp(Timestamp, #{now_us := NowUs}) when Timestamp =:= <<"now_us">> ->
  226. NowUs;
  227. iot_timestamp(Timestamp, #{now_ns := NowNs}) when Timestamp =:= <<"now_ns">> ->
  228. NowNs;
  229. iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
  230. binary_to_integer(Timestamp).
  231. proc_value(<<"TEXT">>, ValueTkn, Msg) ->
  232. case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
  233. <<"undefined">> -> null;
  234. Val -> Val
  235. end;
  236. proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
  237. convert_bool(replace_var(ValueTkn, Msg));
  238. proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
  239. convert_int(replace_var(ValueTkn, Msg));
  240. proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
  241. convert_float(replace_var(ValueTkn, Msg)).
  242. replace_var(Tokens, Data) when is_list(Tokens) ->
  243. [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
  244. Val;
  245. replace_var(Val, _Data) ->
  246. Val.
  247. convert_bool(B) when is_boolean(B) -> B;
  248. convert_bool(null) -> null;
  249. convert_bool(1) -> true;
  250. convert_bool(0) -> false;
  251. convert_bool(<<"1">>) -> true;
  252. convert_bool(<<"0">>) -> false;
  253. convert_bool(<<"true">>) -> true;
  254. convert_bool(<<"True">>) -> true;
  255. convert_bool(<<"TRUE">>) -> true;
  256. convert_bool(<<"false">>) -> false;
  257. convert_bool(<<"False">>) -> false;
  258. convert_bool(<<"FALSE">>) -> false.
  259. convert_int(Int) when is_integer(Int) -> Int;
  260. convert_int(Float) when is_float(Float) -> floor(Float);
  261. convert_int(Str) when is_binary(Str) ->
  262. try
  263. binary_to_integer(Str)
  264. catch
  265. _:_ ->
  266. convert_int(binary_to_float(Str))
  267. end;
  268. convert_int(undefined) ->
  269. null.
  270. convert_float(Float) when is_float(Float) -> Float;
  271. convert_float(Int) when is_integer(Int) -> Int * 10 / 10;
  272. convert_float(Str) when is_binary(Str) ->
  273. try
  274. binary_to_float(Str)
  275. catch
  276. _:_ ->
  277. convert_float(binary_to_integer(Str))
  278. end;
  279. convert_float(undefined) ->
  280. null.
  281. make_iotdb_insert_request(Message, State) ->
  282. Payloads = to_list(parse_payload(get_payload(Message))),
  283. IsAligned = maps:get(is_aligned, State, false),
  284. IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
  285. case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of
  286. {undefined, _} ->
  287. {error, device_id_missing};
  288. {_, []} ->
  289. {error, invalid_data};
  290. {DeviceId, PreProcessedData} ->
  291. DataList = proc_data(PreProcessedData, Message),
  292. InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
  293. Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
  294. {ok,
  295. maps:merge(Rows, #{
  296. iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
  297. iotdb_field_key(device_id, IotDBVsn) => DeviceId
  298. })}
  299. end.
  300. replace_dtypes(Rows0, IotDBVsn) ->
  301. {Types, Rows} = maps:take(dtypes, Rows0),
  302. Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}.
  303. aggregate_rows(DataList, InitAcc) ->
  304. lists:foldr(
  305. fun(
  306. #{
  307. timestamp := Timestamp,
  308. measurement := Measurement,
  309. data_type := DataType,
  310. value := Data
  311. },
  312. #{
  313. timestamps := AccTs,
  314. measurements := AccM,
  315. dtypes := AccDt,
  316. values := AccV
  317. } = Acc
  318. ) ->
  319. Timestamps = [Timestamp | AccTs],
  320. case index_of(Measurement, AccM) of
  321. 0 ->
  322. Acc#{
  323. timestamps => Timestamps,
  324. values => [pad_value(Data, length(AccTs)) | pad_existing_values(AccV)],
  325. measurements => [Measurement | AccM],
  326. dtypes => [DataType | AccDt]
  327. };
  328. Index ->
  329. Acc#{
  330. timestamps => Timestamps,
  331. values => insert_value(Index, Data, AccV),
  332. measurements => AccM,
  333. dtypes => AccDt
  334. }
  335. end
  336. end,
  337. InitAcc,
  338. DataList
  339. ).
  340. pad_value(Data, N) ->
  341. [Data | lists:duplicate(N, null)].
  342. pad_existing_values(Values) ->
  343. [[null | Value] || Value <- Values].
  344. index_of(E, List) ->
  345. string:str(List, [E]).
  346. insert_value(_Index, _Data, []) ->
  347. [];
  348. insert_value(1, Data, [Value | Values]) ->
  349. [[Data | Value] | insert_value(0, Data, Values)];
  350. insert_value(Index, Data, [Value | Values]) ->
  351. [[null | Value] | insert_value(Index - 1, Data, Values)].
  352. iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
  353. <<"is_aligned">>;
  354. iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
  355. <<"is_aligned">>;
  356. iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
  357. <<"isAligned">>;
  358. iotdb_field_key(device_id, ?VSN_1_1_X) ->
  359. <<"device">>;
  360. iotdb_field_key(device_id, ?VSN_1_0_X) ->
  361. <<"device">>;
  362. iotdb_field_key(device_id, ?VSN_0_13_X) ->
  363. <<"deviceId">>;
  364. iotdb_field_key(data_types, ?VSN_1_1_X) ->
  365. <<"data_types">>;
  366. iotdb_field_key(data_types, ?VSN_1_0_X) ->
  367. <<"data_types">>;
  368. iotdb_field_key(data_types, ?VSN_0_13_X) ->
  369. <<"dataTypes">>.
  370. to_list(List) when is_list(List) -> List;
  371. to_list(Data) -> [Data].
  372. device_id(Message, Payloads, State) ->
  373. case maps:get(device_id, State, undefined) of
  374. undefined ->
  375. %% [FIXME] there could be conflicting device-ids in the Payloads
  376. maps:get(<<"device_id">>, hd(Payloads), undefined);
  377. DeviceId ->
  378. DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId),
  379. emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
  380. end.
  381. handle_response({ok, 200, _Headers, Body} = Resp) ->
  382. eval_response_body(Body, Resp);
  383. handle_response({ok, 200, Body} = Resp) ->
  384. eval_response_body(Body, Resp);
  385. handle_response({ok, Code, _Headers, Body}) ->
  386. {error, #{code => Code, body => Body}};
  387. handle_response({ok, Code, Body}) ->
  388. {error, #{code => Code, body => Body}};
  389. handle_response({error, _} = Error) ->
  390. Error.
  391. eval_response_body(Body, Resp) ->
  392. case emqx_utils_json:decode(Body) of
  393. #{<<"code">> := 200} -> Resp;
  394. Reason -> {error, Reason}
  395. end.