emqx_schema_validation_tests.erl 14 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_schema_validation_tests).
  5. -include_lib("eunit/include/eunit.hrl").
  6. -define(VALIDATIONS_PATH, "schema_validation.validations").
  7. %%------------------------------------------------------------------------------
  8. %% Helper fns
  9. %%------------------------------------------------------------------------------
  %% Test helper: nests `InnerConfigs' under the binary keys
  %% `schema_validation' -> `validations', checks that raw config with
  %% `hocon_tconf:check_plain/3' against `emqx_schema_validation_schema', and
  %% returns whatever the checked config holds at that same path.
  %%
  %% There is no `try' here, so anything thrown by the check reaches the caller;
  %% the `?_assertThrow' cases in this file depend on that.
  parse_and_check(InnerConfigs) ->
      Root = <<"schema_validation">>,
      Inner = <<"validations">>,
      CheckOpts = #{
          required => false,
          atom_key => false,
          make_serializable => false
      },
      Raw = #{Root => #{Inner => InnerConfigs}},
      CheckedConf = hocon_tconf:check_plain(emqx_schema_validation_schema, Raw, CheckOpts),
      %% Assertive match: a result without the expected path is a badmatch.
      #{Root := #{Inner := Validations}} = CheckedConf,
      Validations.
  %% Builds a raw (binary-keyed) validation config named `Name' that runs
  %% `Checks', using the default values for every other field.
  validation(Name, Checks) ->
      NoOverrides = #{},
      validation(Name, Checks, NoOverrides).

  %% Same as validation/2, but `Overrides' is merged into the defaults with
  %% `emqx_utils_maps:deep_merge/2'. The test cases in this file rely on an
  %% overridden key (e.g. `<<"topics">>') replacing the default value.
  validation(Name, Checks, Overrides) ->
      Base = #{
          <<"name">> => Name,
          <<"checks">> => Checks,
          <<"enable">> => true,
          <<"description">> => <<"my validation">>,
          <<"tags">> => [<<"some">>, <<"tags">>],
          <<"topics">> => <<"t/+">>,
          <<"strategy">> => <<"all_pass">>,
          <<"failure_action">> => <<"drop">>,
          <<"log_failure">> => #{<<"level">> => <<"warning">>}
      },
      emqx_utils_maps:deep_merge(Base, Overrides).
  %% Raw config for an SQL check whose statement is `select * where true'.
  sql_check() ->
      DefaultSQL = <<"select * where true">>,
      sql_check(DefaultSQL).

  %% Raw config for an SQL check that carries the given `SQL' statement.
  sql_check(SQL) ->
      Tagged = #{<<"type">> => <<"sql">>},
      Tagged#{<<"sql">> => SQL}.
  %% Raw config for a schema-based check of kind `Type' that refers to the
  %% schema named `SerdeName'; no extra fields are added.
  schema_check(Type, SerdeName) ->
      NoOverrides = #{},
      schema_check(Type, SerdeName, NoOverrides).

  %% Same as schema_check/2, with `Overrides' merged in through
  %% `emqx_utils_maps:deep_merge/2' (used in this file to add
  %% `<<"message_type">>' to a protobuf check). `Type' is converted with
  %% `emqx_utils_conv:bin/1' before being stored under `<<"type">>'.
  schema_check(Type, SerdeName, Overrides) ->
      TypeBin = emqx_utils_conv:bin(Type),
      Base = #{
          <<"schema">> => SerdeName,
          <<"type">> => TypeBin
      },
      emqx_utils_maps:deep_merge(Base, Overrides).
  %% Parses `SQL' with `emqx_schema_validation:parse_sql_check/1' (asserting an
  %% `{ok, _}' result) and evaluates the parsed check against `Message' using a
  %% minimal validation context. Returns the result of
  %% `emqx_schema_validation:evaluate_sql_check/3' unchanged.
  eval_sql(Message, SQL) ->
      %% Only the fields given here are supplied as validation context.
      ValidationCtx = #{
          name => <<"validation">>,
          log_failure => #{level => warning}
      },
      {ok, ParsedCheck} = emqx_schema_validation:parse_sql_check(SQL),
      emqx_schema_validation:evaluate_sql_check(ParsedCheck, ValidationCtx, Message).
  %% Builds a test message from the default fields only.
  message() ->
      NoOpts = #{},
      message(NoOpts).

  %% Builds a test message via `emqx_message:from_map/1'. `Opts' is merged into
  %% the default field map with `emqx_utils_maps:deep_merge/2' first. The default
  %% payload is the JSON encoding of `#{value => 10}', on topic `t/t'.
  message(Opts) ->
      MsgId = emqx_guid:gen(),
      %% The sender id is the hex string of a second, separately generated GUID.
      Sender = emqx_guid:to_hexstr(emqx_guid:gen()),
      Payload = emqx_utils_json:encode(#{value => 10}),
      Headers = #{
          proto_ver => v5,
          properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
      },
      Defaults = #{
          id => MsgId,
          qos => 0,
          from => Sender,
          flags => #{retain => false},
          headers => Headers,
          topic => <<"t/t">>,
          payload => Payload,
          timestamp => 1710272561615,
          extra => []
      },
      Merged = emqx_utils_maps:deep_merge(Defaults, Opts),
      emqx_message:from_map(Merged).
  78. %%------------------------------------------------------------------------------
  79. %% Test cases
  80. %%------------------------------------------------------------------------------
  %% EUnit generator covering basic schema rules for validations: how `topics'
  %% comes back after checking, which SQL statements are rejected, name
  %% uniqueness, and the accepted check types. The expected error shapes are
  %% matched against what parse_and_check/1 throws.
  schema_test_() ->
      [
          %% A single binary topic comes back as a one-element list.
          {"topics is always a list 1",
              ?_assertMatch(
                  [#{<<"topics">> := [<<"t/1">>]}],
                  parse_and_check([
                      validation(<<"foo">>, [sql_check()], #{<<"topics">> => <<"t/1">>})
                  ])
              )},
          %% A topic list is kept as a list.
          {"topics is always a list 2",
              ?_assertMatch(
                  [#{<<"topics">> := [<<"t/1">>]}],
                  parse_and_check([
                      validation(<<"foo">>, [sql_check()], #{<<"topics">> => [<<"t/1">>]})
                  ])
              )},
          {"topics must be non-empty",
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          kind := validation_error,
                          value := [],
                          reason := <<"at least one topic filter must be defined", _/binary>>
                      }
                  ]},
                  parse_and_check([
                      validation(
                          <<"foo">>,
                          [sql_check()],
                          #{<<"topics">> => []}
                      )
                  ])
              )},
          {"foreach expression is not allowed",
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          kind := validation_error,
                          reason := foreach_not_allowed
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [sql_check(<<"foreach foo as f where true">>)])
                  ])
              )},
          {"from clause is not allowed",
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          kind := validation_error,
                          reason := non_empty_from_clause
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [sql_check(<<"select * from t">>)])
                  ])
              )},
          %% Two validations with the same name are reported at the list level.
          {"names are unique",
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          kind := validation_error,
                          path := ?VALIDATIONS_PATH,
                          reason := <<"duplicated name:", _/binary>>
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [sql_check()]),
                      validation(<<"foo">>, [sql_check()])
                  ])
              )},
          %% Note: the expected reason here is a string, not a binary.
          {"checks must be non-empty",
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          kind := validation_error,
                          reason := "at least one check must be defined"
                      }
                  ]},
                  parse_and_check([validation(<<"foo">>, [])])
              )},
          {"bogus check type",
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          kind := validation_error,
                          field_name := type,
                          expected := <<"sql", _/binary>>
                      }
                  ]},
                  parse_and_check([
                      validation(
                          <<"foo">>,
                          [#{<<"type">> => <<"foo">>}]
                      )
                  ])
              )}
      ].
  %% EUnit generator: one test per candidate name, titled with the name itself.
  %% Each name is expected to make parse_and_check/1 throw a validation error
  %% located at the `name' field of the first validation.
  invalid_names_test_() ->
      %% long name
      TooLong = binary:copy(<<"a">>, 256),
      BadNames = [
          <<"">>,
          <<"_name">>,
          <<"name$">>,
          <<"name!">>,
          <<"some name">>,
          <<"nãme"/utf8>>,
          <<"test_哈哈"/utf8>>,
          TooLong
      ],
      [
          {BadName,
              ?_assertThrow(
                  {_SchemaMod, [
                      #{
                          path := "schema_validation.validations.1.name",
                          kind := validation_error
                      }
                  ]},
                  parse_and_check([validation(BadName, [sql_check()])])
              )}
       || BadName <- BadNames
      ].
  %% EUnit generator for SQL check evaluation. Each row is
  %% `{Title, Outcome, SQL}': the statement is evaluated with eval_sql/2 against
  %% the default message() (whose payload is built from `#{value => 10}');
  %% `pass' rows must evaluate to `true' and `deny' rows to `false'.
  check_test_() ->
      ToTest = fun
          (Title, pass, SQL) ->
              {Title, ?_assert(eval_sql(message(), SQL))};
          (Title, deny, SQL) ->
              {Title, ?_assertNot(eval_sql(message(), SQL))}
      end,
      Table = [
          {"denied by payload 1", deny, <<"select * where payload.value > 15">>},
          {"denied by payload 2", deny, <<"select payload.value as x where x > 15">>},
          {"allowed by payload 1", pass, <<"select * where payload.value > 5">>},
          {"allowed by payload 2", pass, <<"select payload.value as x where x > 5">>},
          {"always passes 1", pass, <<"select * where true">>},
          {"always passes 2", pass, <<"select * where 1 = 1">>},
          {"never passes 1", deny, <<"select * where false">>},
          {"never passes 2", deny, <<"select * where 1 = 2">>},
          {"never passes 3", deny, <<"select * where true and false">>}
      ],
      [ToTest(Title, Outcome, SQL) || {Title, Outcome, SQL} <- Table].
  %% EUnit generator for duplicate detection inside a single validation:
  %%   * repeated topic filters are rejected and listed in the error reason;
  %%   * repeated SQL checks are accepted;
  %%   * schema checks are rejected only when both the type and the schema name
  %%     repeat (the same name under different types is accepted).
  %%
  %% Every test title is unique so that a failure can be traced to one case.
  duplicated_check_test_() ->
      [
          {"duplicated topics 1",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated topics: t/1">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.topics"
                      }
                  ]},
                  parse_and_check([
                      validation(
                          <<"foo">>,
                          [schema_check(json, <<"a">>)],
                          #{<<"topics">> => [<<"t/1">>, <<"t/1">>]}
                      )
                  ])
              )},
          %% The duplicates do not have to be adjacent.
          {"duplicated topics 2",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated topics: t/1">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.topics"
                      }
                  ]},
                  parse_and_check([
                      validation(
                          <<"foo">>,
                          [schema_check(json, <<"a">>)],
                          #{<<"topics">> => [<<"t/1">>, <<"t/#">>, <<"t/1">>]}
                      )
                  ])
              )},
          %% Several distinct duplicated topics are all listed in the reason.
          {"duplicated topics 3",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated topics: t/1, t/2">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.topics"
                      }
                  ]},
                  parse_and_check([
                      validation(
                          <<"foo">>,
                          [schema_check(json, <<"a">>)],
                          #{
                              <<"topics">> => [
                                  <<"t/1">>,
                                  <<"t/#">>,
                                  <<"t/1">>,
                                  <<"t/2">>,
                                  <<"t/2">>
                              ]
                          }
                      )
                  ])
              )},
          {"duplicated sql checks are not checked",
              ?_assertMatch(
                  [#{<<"checks">> := [_, _]}],
                  parse_and_check([
                      validation(<<"foo">>, [sql_check(), sql_check()])
                  ])
              )},
          %% Same schema name under three different types: not a duplicate.
          {"different serdes with same name",
              ?_assertMatch(
                  [#{<<"checks">> := [_, _, _]}],
                  parse_and_check([
                      validation(<<"foo">>, [
                          schema_check(json, <<"a">>),
                          schema_check(avro, <<"a">>),
                          schema_check(
                              protobuf,
                              <<"a">>,
                              #{<<"message_type">> => <<"a">>}
                          )
                      ])
                  ])
              )},
          {"duplicated serdes 1",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated schema checks: json:a">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.checks"
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [
                          schema_check(json, <<"a">>),
                          schema_check(json, <<"a">>)
                      ])
                  ])
              )},
          %% An SQL check between the two duplicates does not hide them.
          {"duplicated serdes 2",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated schema checks: json:a">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.checks"
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [
                          schema_check(json, <<"a">>),
                          sql_check(),
                          schema_check(json, <<"a">>)
                      ])
                  ])
              )},
          {"duplicated serdes 3",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated schema checks: json:a">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.checks"
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [
                          schema_check(json, <<"a">>),
                          schema_check(json, <<"a">>),
                          sql_check()
                      ])
                  ])
              )},
          %% A check repeated three times is still reported once.
          {"duplicated serdes 4",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated schema checks: json:a">>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.checks"
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [
                          schema_check(json, <<"a">>),
                          schema_check(json, <<"a">>),
                          schema_check(json, <<"a">>)
                      ])
                  ])
              )},
          %% Two different duplicated checks: only the reason prefix is matched,
          %% so the test does not depend on the order in which they are listed.
          %% (This case previously reused the title "duplicated serdes 4".)
          {"duplicated serdes 5",
              ?_assertThrow(
                  {_Schema, [
                      #{
                          reason := <<"duplicated schema checks: ", _/binary>>,
                          kind := validation_error,
                          path := "schema_validation.validations.1.checks"
                      }
                  ]},
                  parse_and_check([
                      validation(<<"foo">>, [
                          schema_check(json, <<"a">>),
                          schema_check(json, <<"a">>),
                          schema_check(avro, <<"b">>),
                          schema_check(avro, <<"b">>)
                      ])
                  ])
              )}
      ].