emqx_bridge_s3_upload.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_s3_upload).
  5. -include_lib("typerefl/include/types.hrl").
  6. -include_lib("hocon/include/hoconsc.hrl").
  7. -include("emqx_bridge_s3.hrl").
  8. -define(ACTION, ?ACTION_UPLOAD).
  9. -behaviour(hocon_schema).
  10. -export([
  11. namespace/0,
  12. roots/0,
  13. fields/1,
  14. desc/1
  15. ]).
  16. %% Interpreting options
  17. -export([
  18. mk_key_template/1,
  19. mk_upload_options/1
  20. ]).
  21. -export([
  22. bridge_v2_examples/1
  23. ]).
  24. %% Internal exports
  25. -export([convert_actions/2]).
  26. -define(DEFAULT_AGGREG_BATCH_SIZE, 100).
  27. -define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
  28. %%-------------------------------------------------------------------------------------------------
  29. %% `hocon_schema' API
  30. %%-------------------------------------------------------------------------------------------------
  31. namespace() ->
  32. "bridge_s3".
  33. roots() ->
  34. [].
%% Hocon struct definitions of the S3 upload action.
%%
%% * "get_bridge_v2" / "put_bridge_v2" / "post_bridge_v2": request/response schemas
%%   (presumably for the bridge v2 HTTP API) wrapping the `?ACTION' struct, built by
%%   emqx_bridge_v2_schema:api_fields/3.
%% * `action': NOT a struct. It returns a single `{?ACTION, Schema}' field tuple,
%%   a map of action name => `?ACTION' config, preprocessed by convert_actions/2.
%% * `?ACTION': the action config itself.
%% * The remaining names are the nested structs referenced from the ones above.
fields(Field) when
    Field == "get_bridge_v2";
    Field == "put_bridge_v2";
    Field == "post_bridge_v2"
->
    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
fields(action) ->
    {?ACTION,
        hoconsc:mk(
            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
            #{
                desc => <<"S3 Upload Action Config">>,
                required => false,
                %% Raw action configs are rewritten by convert_actions/2 before being
                %% checked (it forces batch_size = 1 / batch_time = 0 for direct mode).
                converter => fun ?MODULE:convert_actions/2
            }
        )};
fields(?ACTION) ->
    %% The first argument is presumably the schema of the action's `parameters'
    %% field (convert_action/2 reads `parameters.mode'). It is a union whose member
    %% is selected by the value of the `mode' key (see mkunion/2).
    emqx_bridge_v2_schema:make_producer_action_schema(
        hoconsc:mk(
            mkunion(mode, #{
                <<"direct">> => ?R_REF(s3_direct_upload_parameters),
                <<"aggregated">> => ?R_REF(s3_aggregated_upload_parameters)
            }),
            #{
                required => true,
                desc => ?DESC(s3_upload),
                %% NOTE
                %% There seems to be no way to attach validators to union types, thus we
                %% have to attach a "common denominator" validator here.
                validator => validators(s3_upload_parameters)
            }
        ),
        #{
            resource_opts_ref => ?R_REF(s3_upload_resource_opts)
        }
    );
fields(s3_direct_upload_parameters) ->
    %% Direct mode: the S3 upload fields from emqx_s3_schema, plus `mode' and the
    %% object `content' template (defaults to `${.}').
    emqx_s3_schema:fields(s3_upload) ++
        [
            {mode,
                hoconsc:mk(
                    direct,
                    #{
                        required => true,
                        desc => ?DESC(s3_direct_upload_mode)
                    }
                )},
            {content,
                hoconsc:mk(
                    emqx_schema:template(),
                    #{
                        required => false,
                        default => <<"${.}">>,
                        desc => ?DESC(s3_object_content)
                    }
                )}
        ];
fields(s3_aggregated_upload_parameters) ->
    %% Aggregated mode. The field list is assembled from three parts, in this order:
    %% mode-specific fields, the S3 upload fields with `key' re-described, and the
    %% S3 uploader fields.
    lists:append([
        [
            {mode,
                hoconsc:mk(
                    aggregated,
                    #{
                        required => true,
                        desc => ?DESC(s3_aggregated_upload_mode)
                    }
                )},
            %% Union selected by the container's `type'; only `csv' is defined.
            {container,
                hoconsc:mk(
                    mkunion(type, #{
                        <<"csv">> => ?REF(s3_aggregated_container_csv)
                    }),
                    #{
                        required => true,
                        default => #{<<"type">> => <<"csv">>},
                        desc => ?DESC(s3_aggregated_container)
                    }
                )},
            {aggregation,
                hoconsc:mk(
                    ?REF(s3_aggregation),
                    #{
                        required => true,
                        desc => ?DESC(s3_aggregation)
                    }
                )}
        ],
        emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
            {key, #{desc => ?DESC(s3_aggregated_upload_key)}}
        ]),
        emqx_s3_schema:fields(s3_uploader)
    ]);
fields(s3_aggregated_container_csv) ->
    %% CSV container: `type' discriminator plus an optional column ordering
    %% (empty by default).
    [
        {type,
            hoconsc:mk(
                csv,
                #{
                    required => true,
                    desc => ?DESC(s3_aggregated_container_csv)
                }
            )},
        {column_order,
            hoconsc:mk(
                hoconsc:array(string()),
                #{
                    required => false,
                    default => [],
                    desc => ?DESC(s3_aggregated_container_csv_column_order)
                }
            )}
    ];
fields(s3_aggregation) ->
    %% Aggregation limits: a time interval (default 30m) and a record count
    %% (default 100000).
    [
        %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
        {time_interval,
            hoconsc:mk(
                emqx_schema:duration_s(),
                #{
                    required => false,
                    default => <<"30m">>,
                    desc => ?DESC(s3_aggregation_interval)
                }
            )},
        {max_records,
            hoconsc:mk(
                pos_integer(),
                #{
                    required => false,
                    default => <<"100000">>,
                    desc => ?DESC(s3_aggregation_max_records)
                }
            )}
    ];
fields(s3_upload_resource_opts) ->
    %% NOTE: Aggregated action should benefit from generous batching defaults.
    %% (batch_size 100 and batch_time 10ms, per the module-level defines; direct
    %% mode overrides these in convert_action/2.)
    emqx_bridge_v2_schema:action_resource_opts_fields([
        {batch_size, #{default => ?DEFAULT_AGGREG_BATCH_SIZE}},
        {batch_time, #{default => ?DEFAULT_AGGREG_BATCH_TIME}}
    ]).
  176. mkunion(Field, Schemas) ->
  177. hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
  178. scunion(_Field, Schemas, all_union_members) ->
  179. maps:values(Schemas);
  180. scunion(Field, Schemas, {value, Value}) ->
  181. Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
  182. case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
  183. {ok, Schema} ->
  184. [Schema];
  185. _Error ->
  186. throw(#{field_name => Field, expected => maps:keys(Schemas)})
  187. end.
  188. desc(s3) ->
  189. ?DESC(s3_upload);
  190. desc(Name) when
  191. Name == s3_upload;
  192. Name == s3_direct_upload_parameters;
  193. Name == s3_aggregated_upload_parameters;
  194. Name == s3_aggregation;
  195. Name == s3_aggregated_container_csv
  196. ->
  197. ?DESC(Name);
  198. desc(s3_upload_resource_opts) ->
  199. ?DESC(emqx_resource_schema, resource_opts);
  200. desc(_Name) ->
  201. undefined.
  202. validators(s3_upload_parameters) ->
  203. emqx_s3_schema:validators(s3_uploader).
  204. convert_actions(Conf = #{}, Opts) ->
  205. maps:map(fun(_Name, ConfAction) -> convert_action(ConfAction, Opts) end, Conf);
  206. convert_actions(undefined, _) ->
  207. undefined.
  208. convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) ->
  209. case Params of
  210. #{<<"mode">> := <<"direct">>} ->
  211. %% NOTE: Disable batching for direct uploads.
  212. NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0},
  213. Conf#{<<"resource_opts">> := NResourceOpts};
  214. #{} ->
  215. Conf
  216. end.
  217. %% Interpreting options
  218. -spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
  219. mk_key_template(#{key := Key}) ->
  220. Template = emqx_template:parse(Key),
  221. {_, BindingErrors} = emqx_template:render(Template, #{}),
  222. {UsedBindings, _} = lists:unzip(BindingErrors),
  223. SuffixTemplate = mk_suffix_template(UsedBindings),
  224. case emqx_template:is_const(SuffixTemplate) of
  225. true ->
  226. Template;
  227. false ->
  228. Template ++ SuffixTemplate
  229. end.
  230. mk_suffix_template(UsedBindings) ->
  231. RequiredBindings = ["action", "node", "datetime.", "sequence"],
  232. SuffixBindings = [
  233. mk_default_binding(RB)
  234. || RB <- RequiredBindings,
  235. lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
  236. ],
  237. SuffixTemplate = [["/", B] || B <- SuffixBindings],
  238. emqx_template:parse(SuffixTemplate).
  239. mk_default_binding("datetime.") ->
  240. "${datetime.rfc3339utc}";
  241. mk_default_binding(Binding) ->
  242. "${" ++ Binding ++ "}".
  243. -spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
  244. mk_upload_options(Parameters) ->
  245. Headers = mk_upload_headers(Parameters),
  246. #{
  247. headers => Headers,
  248. acl => maps:get(acl, Parameters, undefined)
  249. }.
  250. mk_upload_headers(Parameters = #{container := Container}) ->
  251. Headers = normalize_headers(maps:get(headers, Parameters, #{})),
  252. ContainerHeaders = mk_container_headers(Container),
  253. maps:merge(ContainerHeaders, Headers).
  254. normalize_headers(Headers) ->
  255. maps:fold(
  256. fun(Header, Value, Acc) ->
  257. maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
  258. end,
  259. #{},
  260. Headers
  261. ).
  262. mk_container_headers(#{type := csv}) ->
  263. #{"content-type" => "text/csv"};
  264. mk_container_headers(#{}) ->
  265. #{}.
  266. %% Examples
  267. bridge_v2_examples(Method) ->
  268. [
  269. #{
  270. <<"s3">> => #{
  271. summary => <<"S3 Direct Upload">>,
  272. value => s3_upload_action_example(Method, direct)
  273. },
  274. <<"s3_aggreg">> => #{
  275. summary => <<"S3 Aggregated Upload">>,
  276. value => s3_upload_action_example(Method, aggreg)
  277. }
  278. }
  279. ].
  280. s3_upload_action_example(post, Mode) ->
  281. maps:merge(
  282. s3_upload_action_example(put, Mode),
  283. #{
  284. type => atom_to_binary(?ACTION_UPLOAD),
  285. name => <<"my_s3_action">>,
  286. enable => true,
  287. connector => <<"my_s3_connector">>
  288. }
  289. );
  290. s3_upload_action_example(get, Mode) ->
  291. maps:merge(
  292. s3_upload_action_example(put, Mode),
  293. #{
  294. enable => true,
  295. connector => <<"my_s3_connector">>,
  296. status => <<"connected">>,
  297. node_status => [
  298. #{
  299. node => <<"emqx@localhost">>,
  300. status => <<"connected">>
  301. }
  302. ]
  303. }
  304. );
  305. s3_upload_action_example(put, direct) ->
  306. #{
  307. description => <<"My upload action">>,
  308. parameters => #{
  309. mode => <<"direct">>,
  310. bucket => <<"${clientid}">>,
  311. key => <<"${topic}">>,
  312. content => <<"${payload}">>,
  313. acl => <<"public_read">>
  314. },
  315. resource_opts => #{
  316. query_mode => <<"sync">>,
  317. inflight_window => 10
  318. }
  319. };
  320. s3_upload_action_example(put, aggreg) ->
  321. #{
  322. description => <<"My aggregated upload action">>,
  323. parameters => #{
  324. mode => <<"aggregated">>,
  325. bucket => <<"mqtt-aggregated">>,
  326. key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
  327. acl => <<"public_read">>,
  328. aggregation => #{
  329. time_interval => <<"15m">>,
  330. max_records => 100_000
  331. },
  332. <<"container">> => #{
  333. type => <<"csv">>,
  334. column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
  335. }
  336. },
  337. resource_opts => #{
  338. health_check_interval => <<"10s">>,
  339. query_mode => <<"async">>,
  340. inflight_window => 100
  341. }
  342. }.