emqx_schema_validation_schema.erl 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_schema_validation_schema).
  5. -include_lib("typerefl/include/types.hrl").
  6. -include_lib("hocon/include/hoconsc.hrl").
  7. %% `hocon_schema' API
  8. -export([
  9. namespace/0,
  10. roots/0,
  11. fields/1
  12. ]).
  13. %% `minirest_trails' API
  14. -export([
  15. api_schema/1
  16. ]).
  17. %%------------------------------------------------------------------------------
  18. %% Type declarations
  19. %%------------------------------------------------------------------------------
  20. %%------------------------------------------------------------------------------
  21. %% `hocon_schema' API
  22. %%------------------------------------------------------------------------------
  %% `hocon_schema' callback: the namespace atom of this schema module.
  namespace() ->
      schema_validation.
  %% `hocon_schema' callback: declares the single root field, `schema_validation',
  %% which refers to the `schema_validation' struct and carries hidden importance.
  roots() ->
      RootType = ref(schema_validation),
      RootMeta = #{importance => ?IMPORTANCE_HIDDEN},
      [{schema_validation, mk(RootType, RootMeta)}].
  %% `hocon_schema' callback: field definitions for each struct declared here.
  %%
  %% Structs covered: `schema_validation' (the root), `validation', `log_failure',
  %% and one struct per check type (`check_sql', `check_json', `check_protobuf',
  %% `check_avro').  Any other struct name fails with `function_clause'.
  fields(schema_validation) ->
      %% The whole list of validations is checked for duplicated names.
      ValidationsMeta = #{
          validator => fun validate_unique_names/1,
          desc => ?DESC("validations"),
          default => []
      },
      [{validations, mk(hoconsc:array(ref(validation)), ValidationsMeta)}];
  fields(validation) ->
      NameMeta = #{
          desc => ?DESC("name"),
          validator => fun emqx_resource:validate_name/1,
          required => true
      },
      %% A single binary is accepted as well as an array; the converter wraps a
      %% lone value into a list before the uniqueness validator runs on it.
      TopicsMeta = #{
          required => true,
          converter => fun ensure_array/2,
          validator => fun validate_unique_topics/1,
          desc => ?DESC("topics")
      },
      %% Each element is resolved to a concrete check struct by the selector.
      ChecksMeta = #{
          desc => ?DESC("checks"),
          validator => fun validate_unique_schema_checks/1,
          required => true
      },
      [
          {tags, emqx_schema:tags_schema()},
          {description, emqx_schema:description_schema()},
          {enable, mk(boolean(), #{default => true, desc => ?DESC("config_enable")})},
          {name, mk(binary(), NameMeta)},
          {topics, mk(hoconsc:union([binary(), hoconsc:array(binary())]), TopicsMeta)},
          {strategy,
              mk(
                  hoconsc:enum([any_pass, all_pass]),
                  #{required => true, desc => ?DESC("strategy")}
              )},
          {failure_action,
              mk(
                  hoconsc:enum([drop, disconnect, ignore]),
                  #{required => true, desc => ?DESC("failure_action")}
              )},
          {log_failure,
              mk(
                  ref(log_failure),
                  #{default => #{}, desc => ?DESC("log_failure_at")}
              )},
          {checks,
              mk(
                  hoconsc:array(hoconsc:union(fun checks_union_member_selector/1)),
                  ChecksMeta
              )}
      ];
  fields(log_failure) ->
      LevelType = hoconsc:enum([error, warning, notice, info, debug, none]),
      [{level, mk(LevelType, #{default => info, desc => ?DESC("log_failure_at")})}];
  fields(check_sql) ->
      %% NOTE(review): `sql' reuses the "check_sql_type" description id of the
      %% `type' field; possibly a copy-paste -- confirm against the i18n file.
      SqlMeta = #{
          validator => fun validate_sql/1,
          desc => ?DESC("check_sql_type"),
          required => true
      },
      [
          {type, mk(sql, #{default => sql, desc => ?DESC("check_sql_type")})},
          {sql, mk(binary(), SqlMeta)}
      ];
  fields(check_json) ->
      %% NOTE(review): `schema' reuses the "check_json_type" description id,
      %% unlike the avro/protobuf structs -- confirm against the i18n file.
      [
          {type, mk(json, #{default => json, desc => ?DESC("check_json_type")})},
          {schema, mk(binary(), #{required => true, desc => ?DESC("check_json_type")})}
      ];
  fields(check_protobuf) ->
      SchemaMeta = #{required => true, desc => ?DESC("check_protobuf_schema")},
      MessageTypeMeta = #{required => true, desc => ?DESC("check_protobuf_message_type")},
      [
          {type, mk(protobuf, #{default => protobuf, desc => ?DESC("check_protobuf_type")})},
          {schema, mk(binary(), SchemaMeta)},
          {message_type, mk(binary(), MessageTypeMeta)}
      ];
  fields(check_avro) ->
      [
          {type, mk(avro, #{default => avro, desc => ?DESC("check_avro_type")})},
          {schema, mk(binary(), #{required => true, desc => ?DESC("check_avro_schema")})}
      ].
  %% Member selector for the union used as element type of the `checks' array.
  %%
  %% Given `{value, V}' it narrows the union to the struct matching `V' (see
  %% `checks_refs/1'); given `all_union_members' it returns a reference to every
  %% check struct listed by `check_types/0'.
  checks_union_member_selector({value, Value}) ->
      checks_refs(Value);
  checks_union_member_selector(all_union_members) ->
      [ref(CheckType) || CheckType <- check_types()].
  %% References to all check structs, in the order given by `check_types/0'.
  checks_refs() ->
      lists:map(fun ref/1, check_types()).
  %% Names of the check structs that may appear in a validation's `checks' list.
  %% Every name carries the `check_' prefix, which `checks_refs/1' strips when
  %% it builds the list of expected type names for its error.
  check_types() ->
      [check_sql, check_json, check_avro, check_protobuf].
  %% Narrows the `checks' union to the single struct matching the value's type.
  %%
  %% The value is expected to be a map with a binary `<<"type">>' key whose
  %% content is either an atom or a binary; atoms are converted to binaries
  %% before matching.  For anything else (unknown type, missing key, non-map
  %% value) a map is thrown that names the `type' field and lists the accepted
  %% type names, derived from `check_types/0' with the `check_' prefix removed.
  checks_refs(Value) ->
      Type =
          case Value of
              #{<<"type">> := Atom} when is_atom(Atom) -> atom_to_binary(Atom);
              #{<<"type">> := Other} -> Other;
              _ -> undefined
          end,
      case Type of
          <<"sql">> ->
              [ref(check_sql)];
          <<"json">> ->
              [ref(check_json)];
          <<"avro">> ->
              [ref(check_avro)];
          <<"protobuf">> ->
              [ref(check_protobuf)];
          _ ->
              TypeNames = lists:filtermap(
                  fun(CheckType) ->
                      case atom_to_list(CheckType) of
                          "check_" ++ Name -> {true, Name};
                          _ -> false
                      end
                  end,
                  check_types()
              ),
              throw(#{
                  field_name => type,
                  expected => iolist_to_binary(lists:join(" | ", TypeNames))
              })
      end.
  160. %%------------------------------------------------------------------------------
  161. %% `minirest_trails' API
  162. %%------------------------------------------------------------------------------
  %% Schema used by the HTTP API for each kind of operation: `list' deals with
  %% an array of validations, while `lookup', `post' and `put' deal with a
  %% single validation.  Any other operation fails with `function_clause'.
  api_schema(list) ->
      hoconsc:array(ref(validation));
  api_schema(Op) when Op =:= lookup; Op =:= post; Op =:= put ->
      ref(validation).
  171. %%------------------------------------------------------------------------------
  172. %% Internal exports
  173. %%------------------------------------------------------------------------------
  174. %%------------------------------------------------------------------------------
  175. %% Internal fns
  176. %%------------------------------------------------------------------------------
  %% Local shorthand for `hoconsc:mk/2'.
  mk(FieldType, FieldMeta) ->
      hoconsc:mk(FieldType, FieldMeta).
  %% Reference to the struct `StructName' declared by this module.
  ref(StructName) ->
      hoconsc:ref(?MODULE, StructName).
  %% Converter for the `topics' field: `undefined' and lists pass through
  %% untouched, any other value is wrapped into a one-element list.  The second
  %% argument is ignored.
  ensure_array(Value, _Opts) ->
      case Value of
          undefined -> undefined;
          _ when is_list(Value) -> Value;
          _ -> [Value]
      end.
  %% Validator for the `sql' field of a SQL check: the statement is valid when
  %% `emqx_schema_validation:parse_sql_check/1' can parse it.  The parsed form is
  %% discarded; a parse error is returned unchanged.
  validate_sql(SQL) ->
      case emqx_schema_validation:parse_sql_check(SQL) of
          {error, _Reason} = Failure ->
              Failure;
          {ok, _Parsed} ->
              ok
      end.
  %% Validator for the `validations' list: passes the input through
  %% `emqx_utils_maps:binary_key_map/1' and then rejects it if two entries share
  %% the same `<<"name">>'.
  validate_unique_names(Validations0) ->
      do_validate_unique_names(emqx_utils_maps:binary_key_map(Validations0), #{}).
  %% Walks the validations, remembering every name seen so far as a key of
  %% `SeenNames'.  Stops at the first repeated name with
  %% `{error, <<"duplicated name: ...">>}'; returns `ok' when the list is
  %% exhausted.  Each entry must be a map holding a binary under `<<"name">>'.
  do_validate_unique_names([], _SeenNames) ->
      ok;
  do_validate_unique_names([#{<<"name">> := Name} | Remaining], SeenNames) ->
      case maps:is_key(Name, SeenNames) of
          true ->
              {error, <<"duplicated name: ", Name/binary>>};
          false ->
              do_validate_unique_names(Remaining, SeenNames#{Name => true})
      end.
  %% Validator for the `checks' list: an empty list is rejected, otherwise the
  %% checks are scanned for repeated `{Type, Schema}' pairs, starting from an
  %% empty "seen" set and an empty "duplicated" set.
  %%
  %% NOTE(review): the empty-list reason is a string (character list) whereas
  %% the other validators in this module return binaries; kept as is because
  %% callers may match on the exact term -- confirm before unifying.
  validate_unique_schema_checks(Checks) ->
      case Checks of
          [] ->
              {error, "at least one check must be defined"};
          _ ->
              EmptySet = sets:new([{version, 2}]),
              do_validate_unique_schema_checks(Checks, EmptySet, EmptySet)
      end.
  %% Scans `Checks' for repeated `{Type, Schema}' pairs.
  %%
  %% Only checks carrying both `<<"type">>' and `<<"schema">>' take part; any
  %% other element (for instance a SQL check, which has no `<<"schema">>'
  %% field) is skipped.  The first occurrence of a pair goes into the "seen"
  %% set, every further occurrence into the "duplicated" set.  Returns `ok' when
  %% nothing is duplicated, otherwise `{error, Msg}' where `Msg' lists the
  %% duplicated pairs as `Type:Schema'.  `Type' must be an atom at this point.
  do_validate_unique_schema_checks(Checks, Seen0, Duplicated0) ->
      Track = fun
          (#{<<"type">> := Type, <<"schema">> := SerdeName}, {Seen, Dups}) ->
              Check = {Type, SerdeName},
              case sets:is_element(Check, Seen) of
                  true -> {Seen, sets:add_element(Check, Dups)};
                  false -> {sets:add_element(Check, Seen), Dups}
              end;
          (_NoSchemaReference, Acc) ->
              Acc
      end,
      {_Seen, Duplicated} = lists:foldl(Track, {Seen0, Duplicated0}, Checks),
      case sets:to_list(Duplicated) of
          [] ->
              ok;
          Repeated ->
              Labels = [
                  [atom_to_binary(Type), ":", SerdeName]
               || {Type, SerdeName} <- Repeated
              ],
              {error,
                  iolist_to_binary([
                      <<"duplicated schema checks: ">>,
                      lists:join(", ", Labels)
                  ])}
      end.
  %% Validator for the `topics' list: rejects an empty list and any list in
  %% which a topic filter occurs more than once.  On duplicates the error
  %% message names each repeated topic once.
  validate_unique_topics([]) ->
      {error, <<"at least one topic filter must be defined">>};
  validate_unique_topics(Topics) ->
      %% Group equal topics together, then keep the groups with two or more members.
      Occurrences = maps:groups_from_list(fun(Topic) -> Topic end, Topics),
      Repeated = maps:filter(
          fun
              (_Topic, [_, _ | _]) -> true;
              (_Topic, _Single) -> false
          end,
          Occurrences
      ),
      case maps:keys(Repeated) of
          [] ->
              ok;
          Duplicated ->
              {error,
                  iolist_to_binary([
                      <<"duplicated topics: ">>,
                      lists:join(", ", Duplicated)
                  ])}
      end.
  258. end.