emqx_ft_schema.erl 12 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_schema).
  17. -behaviour(hocon_schema).
  18. -include_lib("hocon/include/hoconsc.hrl").
  19. -include_lib("typerefl/include/types.hrl").
  20. -export([namespace/0, roots/0, fields/1, tags/0, desc/1]).
  21. -export([schema/1]).
  22. %% Utilities
  23. -export([backend/1, encode/2, decode/2]).
  24. %% Test-only helpers
  25. -export([translate/1]).
  26. -type json_value() ::
  27. null
  28. | boolean()
  29. | binary()
  30. | number()
  31. | [json_value()]
  32. | #{binary() => json_value()}.
  33. -reflect_type([json_value/0]).
  34. %% NOTE
  35. %% This is rather conservative limit, mostly dictated by the filename limitations
  36. %% on most filesystems. Even though, say, S3 does not have such limitations, it's
  37. %% still useful to have a limit on the filename length, to avoid having to deal with
  38. %% limits in the storage backends.
  39. %% Usual realistic limit is 255 bytes actually, but we leave some room for backends
  40. %% to spare.
  41. -define(MAX_FILENAME_BYTELEN, 240).
  42. -import(hoconsc, [ref/2, mk/2]).
  43. namespace() -> file_transfer.
  44. tags() ->
  45. [<<"File Transfer">>].
  46. roots() -> [file_transfer].
  47. fields(file_transfer) ->
  48. [
  49. {enable,
  50. mk(
  51. boolean(),
  52. #{
  53. desc => ?DESC("enable"),
  54. required => false,
  55. default => false
  56. }
  57. )},
  58. {init_timeout,
  59. mk(
  60. emqx_schema:timeout_duration_ms(),
  61. #{
  62. desc => ?DESC("init_timeout"),
  63. required => false,
  64. importance => ?IMPORTANCE_LOW,
  65. default => <<"10s">>
  66. }
  67. )},
  68. {store_segment_timeout,
  69. mk(
  70. emqx_schema:timeout_duration_ms(),
  71. #{
  72. desc => ?DESC("store_segment_timeout"),
  73. required => false,
  74. importance => ?IMPORTANCE_LOW,
  75. default => <<"5m">>
  76. }
  77. )},
  78. {assemble_timeout,
  79. mk(
  80. emqx_schema:timeout_duration_ms(),
  81. #{
  82. desc => ?DESC("assemble_timeout"),
  83. required => false,
  84. importance => ?IMPORTANCE_LOW,
  85. default => <<"5m">>
  86. }
  87. )},
  88. {storage,
  89. mk(
  90. ref(storage_backend),
  91. #{
  92. desc => ?DESC("storage_backend"),
  93. required => false,
  94. validator => validator(backend),
  95. default => #{
  96. <<"local">> => #{}
  97. }
  98. }
  99. )}
  100. ];
  101. fields(storage_backend) ->
  102. [
  103. {local,
  104. mk(
  105. ref(local_storage),
  106. #{
  107. desc => ?DESC("local_storage"),
  108. required => {false, recursively}
  109. }
  110. )}
  111. ];
  112. fields(local_storage) ->
  113. [
  114. {segments,
  115. mk(
  116. ref(local_storage_segments),
  117. #{
  118. desc => ?DESC("local_storage_segments"),
  119. required => false,
  120. default => #{
  121. <<"gc">> => #{}
  122. }
  123. }
  124. )},
  125. {exporter,
  126. mk(
  127. ref(local_storage_exporter_backend),
  128. #{
  129. desc => ?DESC("local_storage_exporter_backend"),
  130. required => false,
  131. validator => validator(backend),
  132. default => #{
  133. <<"local">> => #{}
  134. }
  135. }
  136. )}
  137. ] ++ common_backend_fields();
  138. fields(local_storage_segments) ->
  139. [
  140. {root,
  141. mk(
  142. string(),
  143. #{
  144. desc => ?DESC("local_storage_segments_root"),
  145. required => false
  146. }
  147. )},
  148. {gc,
  149. mk(
  150. ref(local_storage_segments_gc), #{
  151. desc => ?DESC("local_storage_segments_gc"),
  152. required => false
  153. }
  154. )}
  155. ];
  156. fields(local_storage_exporter_backend) ->
  157. [
  158. {local,
  159. mk(
  160. ref(local_storage_exporter),
  161. #{
  162. desc => ?DESC("local_storage_exporter"),
  163. required => {false, recursively}
  164. }
  165. )},
  166. {s3,
  167. mk(
  168. ref(s3_exporter),
  169. #{
  170. desc => ?DESC("s3_exporter"),
  171. required => {false, recursively}
  172. }
  173. )}
  174. ];
  175. fields(local_storage_exporter) ->
  176. [
  177. {root,
  178. mk(
  179. string(),
  180. #{
  181. desc => ?DESC("local_storage_exporter_root"),
  182. required => false
  183. }
  184. )}
  185. ] ++ common_backend_fields();
  186. fields(s3_exporter) ->
  187. emqx_s3_schema:fields(s3) ++ common_backend_fields();
  188. fields(local_storage_segments_gc) ->
  189. [
  190. {interval,
  191. mk(
  192. emqx_schema:timeout_duration_ms(),
  193. #{
  194. desc => ?DESC("storage_gc_interval"),
  195. required => false,
  196. default => <<"1h">>
  197. }
  198. )},
  199. {maximum_segments_ttl,
  200. mk(
  201. %% not used in a `receive ... after' block, just timestamp comparison
  202. emqx_schema:duration_s(),
  203. #{
  204. desc => ?DESC("storage_gc_max_segments_ttl"),
  205. required => false,
  206. default => <<"24h">>
  207. }
  208. )},
  209. {minimum_segments_ttl,
  210. mk(
  211. %% not used in a `receive ... after' block, just timestamp comparison
  212. emqx_schema:duration_s(),
  213. #{
  214. desc => ?DESC("storage_gc_min_segments_ttl"),
  215. required => false,
  216. default => <<"5m">>,
  217. % NOTE
  218. % This setting does not seem to be useful to an end-user.
  219. hidden => true
  220. }
  221. )}
  222. ].
  223. common_backend_fields() ->
  224. [
  225. {enable,
  226. mk(
  227. boolean(), #{
  228. desc => ?DESC("backend_enable"),
  229. required => false,
  230. default => true
  231. }
  232. )}
  233. ].
  234. desc(file_transfer) ->
  235. "File transfer settings";
  236. desc(local_storage) ->
  237. "File transfer local storage settings";
  238. desc(local_storage_segments) ->
  239. "File transfer local segments storage settings";
  240. desc(local_storage_exporter) ->
  241. "Local Exporter settings for the File transfer local storage backend";
  242. desc(s3_exporter) ->
  243. "S3 Exporter settings for the File transfer local storage backend";
  244. desc(local_storage_segments_gc) ->
  245. "Garbage collection settings for the File transfer local segments storage";
  246. desc(local_storage_exporter_backend) ->
  247. "Exporter for the local file system storage backend";
  248. desc(storage_backend) ->
  249. "Storage backend settings for file transfer";
  250. desc(_) ->
  251. undefined.
  252. schema(filemeta) ->
  253. #{
  254. roots => [
  255. {name,
  256. hoconsc:mk(string(), #{
  257. required => true,
  258. validator => validator(filename),
  259. converter => converter(unicode_string)
  260. })},
  261. {size, hoconsc:mk(non_neg_integer())},
  262. {expire_at, hoconsc:mk(non_neg_integer())},
  263. {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
  264. {segments_ttl, hoconsc:mk(pos_integer())},
  265. {user_data, hoconsc:mk(json_value())}
  266. ]
  267. };
  268. schema(command_response) ->
  269. #{
  270. roots => [
  271. {vsn, hoconsc:mk(string(), #{default => <<"0.1">>})},
  272. {topic, hoconsc:mk(string())},
  273. {packet_id, hoconsc:mk(pos_integer())},
  274. {reason_code, hoconsc:mk(non_neg_integer())},
  275. {reason_description, hoconsc:mk(binary())}
  276. ]
  277. }.
  278. validator(filename) ->
  279. [
  280. fun(Value) ->
  281. Bin = unicode:characters_to_binary(Value),
  282. byte_size(Bin) =< ?MAX_FILENAME_BYTELEN orelse {error, max_length_exceeded}
  283. end,
  284. fun emqx_ft_fs_util:is_filename_safe/1
  285. ];
  286. validator(backend) ->
  287. fun(Config) ->
  288. Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
  289. case maps:to_list(Enabled) of
  290. [{_Type, _BackendConfig}] ->
  291. ok;
  292. _Conflicts = [_ | _] ->
  293. {error, multiple_enabled_backends};
  294. _None = [] ->
  295. {error, no_enabled_backend}
  296. end
  297. end.
  298. converter(checksum) ->
  299. fun
  300. (undefined, #{}) ->
  301. undefined;
  302. ({sha256, Bin}, #{make_serializable := true}) ->
  303. _ = is_binary(Bin) orelse throw({expected_type, string}),
  304. _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}),
  305. binary:encode_hex(Bin);
  306. (Hex, #{}) ->
  307. _ = is_binary(Hex) orelse throw({expected_type, string}),
  308. _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
  309. {sha256, binary:decode_hex(Hex)}
  310. end;
  311. converter(unicode_string) ->
  312. fun
  313. (undefined, #{}) ->
  314. undefined;
  315. (Str, #{make_serializable := true}) ->
  316. _ = is_list(Str) orelse throw({expected_type, string}),
  317. unicode:characters_to_binary(Str);
  318. (Str, #{}) ->
  319. _ = is_binary(Str) orelse throw({expected_type, string}),
  320. unicode:characters_to_list(Str)
  321. end.
  322. ref(Ref) ->
  323. ref(?MODULE, Ref).
  324. %% Utilities
  325. -spec backend(emqx_config:config()) ->
  326. {_Type :: atom(), emqx_config:config()}.
  327. backend(Config) ->
  328. catch maps:foreach(fun emit_enabled/2, Config).
  329. -spec emit_enabled(atom(), emqx_config:config()) ->
  330. no_return().
  331. emit_enabled(Type, BConf = #{enable := Enabled}) ->
  332. Enabled andalso throw({Type, BConf}).
  333. decode(SchemaName, Payload) when is_binary(Payload) ->
  334. case emqx_utils_json:safe_decode(Payload, [return_maps]) of
  335. {ok, Map} ->
  336. decode(SchemaName, Map);
  337. {error, Error} ->
  338. {error, {invalid_filemeta_json, Error}}
  339. end;
  340. decode(SchemaName, Map) when is_map(Map) ->
  341. Schema = schema(SchemaName),
  342. try
  343. Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}),
  344. {ok, Meta}
  345. catch
  346. throw:{_Schema, Errors} ->
  347. {error, {invalid_filemeta, Errors}}
  348. end.
  349. encode(SchemaName, Map = #{}) ->
  350. Schema = schema(SchemaName),
  351. hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Map), #{}).
  352. %% Test-only helpers
  353. -spec translate(emqx_config:raw_config()) ->
  354. emqx_config:config().
  355. translate(Conf) ->
  356. [Root] = roots(),
  357. RootRaw = atom_to_binary(Root),
  358. ConfChecked = hocon_tconf:check_plain(?MODULE, #{RootRaw => Conf}, #{}, [Root]),
  359. emqx_utils_maps:unsafe_atom_key_map(maps:get(RootRaw, ConfChecked)).