%% emqx_schema_validation_tests.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_schema_validation_tests).
  5. -include_lib("eunit/include/eunit.hrl").
  6. -define(VALIDATIONS_PATH, "schema_validation.validations").
  7. %%------------------------------------------------------------------------------
  8. %% Helper fns
  9. %%------------------------------------------------------------------------------
  10. parse_and_check(InnerConfigs) ->
  11. RootBin = <<"schema_validation">>,
  12. InnerBin = <<"validations">>,
  13. RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
  14. #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
  15. emqx_schema_validation_schema,
  16. RawConf,
  17. #{
  18. required => false,
  19. atom_key => false,
  20. make_serializable => false
  21. }
  22. ),
  23. Checked.
  24. validation(Name, Checks) ->
  25. validation(Name, Checks, _Overrides = #{}).
  26. validation(Name, Checks, Overrides) ->
  27. Default = #{
  28. <<"tags">> => [<<"some">>, <<"tags">>],
  29. <<"description">> => <<"my validation">>,
  30. <<"enable">> => true,
  31. <<"name">> => Name,
  32. <<"topics">> => <<"t/+">>,
  33. <<"strategy">> => <<"all_pass">>,
  34. <<"failure_action">> => <<"drop">>,
  35. <<"log_failure">> => #{<<"level">> => <<"warning">>},
  36. <<"checks">> => Checks
  37. },
  38. emqx_utils_maps:deep_merge(Default, Overrides).
  39. sql_check() ->
  40. sql_check(<<"select * where true">>).
  41. sql_check(SQL) ->
  42. #{
  43. <<"type">> => <<"sql">>,
  44. <<"sql">> => SQL
  45. }.
  46. schema_check(Type, SerdeName) ->
  47. schema_check(Type, SerdeName, _Overrides = #{}).
  48. schema_check(Type, SerdeName, Overrides) ->
  49. emqx_utils_maps:deep_merge(
  50. #{
  51. <<"type">> => emqx_utils_conv:bin(Type),
  52. <<"schema">> => SerdeName
  53. },
  54. Overrides
  55. ).
  56. eval_sql(Message, SQL) ->
  57. {ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
  58. Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
  59. emqx_schema_validation:evaluate_sql_check(Check, Validation, Message).
  60. message() ->
  61. message(_Opts = #{}).
  62. message(Opts) ->
  63. Defaults = #{
  64. id => emqx_guid:gen(),
  65. qos => 0,
  66. from => emqx_guid:to_hexstr(emqx_guid:gen()),
  67. flags => #{retain => false},
  68. headers => #{
  69. proto_ver => v5,
  70. properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
  71. },
  72. topic => <<"t/t">>,
  73. payload => emqx_utils_json:encode(#{value => 10}),
  74. timestamp => 1710272561615,
  75. extra => []
  76. },
  77. emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)).
  78. %%------------------------------------------------------------------------------
  79. %% Test cases
  80. %%------------------------------------------------------------------------------
  81. schema_test_() ->
  82. [
  83. {"topics is always a list 1",
  84. ?_assertMatch(
  85. [#{<<"topics">> := [<<"t/1">>]}],
  86. parse_and_check([
  87. validation(
  88. <<"foo">>,
  89. [sql_check()],
  90. #{<<"topics">> => <<"t/1">>}
  91. )
  92. ])
  93. )},
  94. {"topics is always a list 2",
  95. ?_assertMatch(
  96. [#{<<"topics">> := [<<"t/1">>]}],
  97. parse_and_check([
  98. validation(
  99. <<"foo">>,
  100. [sql_check()],
  101. #{<<"topics">> => [<<"t/1">>]}
  102. )
  103. ])
  104. )},
  105. {"foreach expression is not allowed",
  106. ?_assertThrow(
  107. {_Schema, [
  108. #{
  109. reason := foreach_not_allowed,
  110. kind := validation_error
  111. }
  112. ]},
  113. parse_and_check([
  114. validation(
  115. <<"foo">>,
  116. [sql_check(<<"foreach foo as f where true">>)]
  117. )
  118. ])
  119. )},
  120. {"from clause is not allowed",
  121. ?_assertThrow(
  122. {_Schema, [
  123. #{
  124. reason := non_empty_from_clause,
  125. kind := validation_error
  126. }
  127. ]},
  128. parse_and_check([
  129. validation(
  130. <<"foo">>,
  131. [sql_check(<<"select * from t">>)]
  132. )
  133. ])
  134. )},
  135. {"names are unique",
  136. ?_assertThrow(
  137. {_Schema, [
  138. #{
  139. reason := <<"duplicated name:", _/binary>>,
  140. path := ?VALIDATIONS_PATH,
  141. kind := validation_error
  142. }
  143. ]},
  144. parse_and_check([
  145. validation(<<"foo">>, [sql_check()]),
  146. validation(<<"foo">>, [sql_check()])
  147. ])
  148. )},
  149. {"checks must be non-empty",
  150. ?_assertThrow(
  151. {_Schema, [
  152. #{
  153. reason := "at least one check must be defined",
  154. kind := validation_error
  155. }
  156. ]},
  157. parse_and_check([
  158. validation(
  159. <<"foo">>,
  160. []
  161. )
  162. ])
  163. )},
  164. {"bogus check type",
  165. ?_assertThrow(
  166. {_Schema, [
  167. #{
  168. expected := <<"sql", _/binary>>,
  169. kind := validation_error,
  170. field_name := type
  171. }
  172. ]},
  173. parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
  174. )}
  175. ].
  176. invalid_names_test_() ->
  177. [
  178. {InvalidName,
  179. ?_assertThrow(
  180. {_Schema, [
  181. #{
  182. kind := validation_error,
  183. path := "schema_validation.validations.1.name"
  184. }
  185. ]},
  186. parse_and_check([validation(InvalidName, [sql_check()])])
  187. )}
  188. || InvalidName <- [
  189. <<"">>,
  190. <<"_name">>,
  191. <<"name$">>,
  192. <<"name!">>,
  193. <<"some name">>,
  194. <<"nãme"/utf8>>,
  195. <<"test_哈哈"/utf8>>,
  196. %% long name
  197. binary:copy(<<"a">>, 256)
  198. ]
  199. ].
  200. check_test_() ->
  201. [
  202. {"denied by payload 1",
  203. ?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))},
  204. {"denied by payload 2",
  205. ?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))},
  206. {"allowed by payload 1",
  207. ?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))},
  208. {"allowed by payload 2",
  209. ?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))},
  210. {"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))},
  211. {"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))},
  212. {"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))},
  213. {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
  214. {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
  215. ].
  216. duplicated_check_test_() ->
  217. [
  218. {"duplicated topics 1",
  219. ?_assertThrow(
  220. {_Schema, [
  221. #{
  222. reason := <<"duplicated topics: t/1">>,
  223. kind := validation_error,
  224. path := "schema_validation.validations.1.topics"
  225. }
  226. ]},
  227. parse_and_check([
  228. validation(
  229. <<"foo">>,
  230. [schema_check(json, <<"a">>)],
  231. #{<<"topics">> => [<<"t/1">>, <<"t/1">>]}
  232. )
  233. ])
  234. )},
  235. {"duplicated topics 2",
  236. ?_assertThrow(
  237. {_Schema, [
  238. #{
  239. reason := <<"duplicated topics: t/1">>,
  240. kind := validation_error,
  241. path := "schema_validation.validations.1.topics"
  242. }
  243. ]},
  244. parse_and_check([
  245. validation(
  246. <<"foo">>,
  247. [schema_check(json, <<"a">>)],
  248. #{<<"topics">> => [<<"t/1">>, <<"t/#">>, <<"t/1">>]}
  249. )
  250. ])
  251. )},
  252. {"duplicated topics 3",
  253. ?_assertThrow(
  254. {_Schema, [
  255. #{
  256. reason := <<"duplicated topics: t/1, t/2">>,
  257. kind := validation_error,
  258. path := "schema_validation.validations.1.topics"
  259. }
  260. ]},
  261. parse_and_check([
  262. validation(
  263. <<"foo">>,
  264. [schema_check(json, <<"a">>)],
  265. #{
  266. <<"topics">> => [
  267. <<"t/1">>,
  268. <<"t/#">>,
  269. <<"t/1">>,
  270. <<"t/2">>,
  271. <<"t/2">>
  272. ]
  273. }
  274. )
  275. ])
  276. )},
  277. {"duplicated sql checks are not checked",
  278. ?_assertMatch(
  279. [#{<<"checks">> := [_, _]}],
  280. parse_and_check([
  281. validation(<<"foo">>, [sql_check(), sql_check()])
  282. ])
  283. )},
  284. {"different serdes with same name",
  285. ?_assertMatch(
  286. [#{<<"checks">> := [_, _, _]}],
  287. parse_and_check([
  288. validation(<<"foo">>, [
  289. schema_check(json, <<"a">>),
  290. schema_check(avro, <<"a">>),
  291. schema_check(
  292. protobuf,
  293. <<"a">>,
  294. #{<<"message_type">> => <<"a">>}
  295. )
  296. ])
  297. ])
  298. )},
  299. {"duplicated serdes 1",
  300. ?_assertThrow(
  301. {_Schema, [
  302. #{
  303. reason := <<"duplicated schema checks: json:a">>,
  304. kind := validation_error,
  305. path := "schema_validation.validations.1.checks"
  306. }
  307. ]},
  308. parse_and_check([
  309. validation(<<"foo">>, [
  310. schema_check(json, <<"a">>),
  311. schema_check(json, <<"a">>)
  312. ])
  313. ])
  314. )},
  315. {"duplicated serdes 2",
  316. ?_assertThrow(
  317. {_Schema, [
  318. #{
  319. reason := <<"duplicated schema checks: json:a">>,
  320. kind := validation_error,
  321. path := "schema_validation.validations.1.checks"
  322. }
  323. ]},
  324. parse_and_check([
  325. validation(<<"foo">>, [
  326. schema_check(json, <<"a">>),
  327. sql_check(),
  328. schema_check(json, <<"a">>)
  329. ])
  330. ])
  331. )},
  332. {"duplicated serdes 3",
  333. ?_assertThrow(
  334. {_Schema, [
  335. #{
  336. reason := <<"duplicated schema checks: json:a">>,
  337. kind := validation_error,
  338. path := "schema_validation.validations.1.checks"
  339. }
  340. ]},
  341. parse_and_check([
  342. validation(<<"foo">>, [
  343. schema_check(json, <<"a">>),
  344. schema_check(json, <<"a">>),
  345. sql_check()
  346. ])
  347. ])
  348. )},
  349. {"duplicated serdes 4",
  350. ?_assertThrow(
  351. {_Schema, [
  352. #{
  353. reason := <<"duplicated schema checks: json:a">>,
  354. kind := validation_error,
  355. path := "schema_validation.validations.1.checks"
  356. }
  357. ]},
  358. parse_and_check([
  359. validation(<<"foo">>, [
  360. schema_check(json, <<"a">>),
  361. schema_check(json, <<"a">>),
  362. schema_check(json, <<"a">>)
  363. ])
  364. ])
  365. )},
  366. {"duplicated serdes 4",
  367. ?_assertThrow(
  368. {_Schema, [
  369. #{
  370. reason := <<"duplicated schema checks: ", _/binary>>,
  371. kind := validation_error,
  372. path := "schema_validation.validations.1.checks"
  373. }
  374. ]},
  375. parse_and_check([
  376. validation(<<"foo">>, [
  377. schema_check(json, <<"a">>),
  378. schema_check(json, <<"a">>),
  379. schema_check(avro, <<"b">>),
  380. schema_check(avro, <<"b">>)
  381. ])
  382. ])
  383. )}
  384. ].