emqx_message_validation_tests.erl 14 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_message_validation_tests).
  5. -include_lib("eunit/include/eunit.hrl").
  6. -define(VALIDATIONS_PATH, "message_validation.validations").
  7. %%------------------------------------------------------------------------------
  8. %% Helper fns
  9. %%------------------------------------------------------------------------------
  10. parse_and_check(InnerConfigs) ->
  11. RootBin = <<"message_validation">>,
  12. InnerBin = <<"validations">>,
  13. RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
  14. #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
  15. emqx_message_validation_schema,
  16. RawConf,
  17. #{
  18. required => false,
  19. atom_key => false,
  20. make_serializable => false
  21. }
  22. ),
  23. Checked.
  24. validation(Name, Checks) ->
  25. validation(Name, Checks, _Overrides = #{}).
  26. validation(Name, Checks, Overrides) ->
  27. Default = #{
  28. <<"tags">> => [<<"some">>, <<"tags">>],
  29. <<"description">> => <<"my validation">>,
  30. <<"enable">> => true,
  31. <<"name">> => Name,
  32. <<"topics">> => <<"t/+">>,
  33. <<"strategy">> => <<"all_pass">>,
  34. <<"failure_action">> => <<"drop">>,
  35. <<"log_failure">> => #{<<"level">> => <<"warning">>},
  36. <<"checks">> => Checks
  37. },
  38. emqx_utils_maps:deep_merge(Default, Overrides).
  39. sql_check() ->
  40. sql_check(<<"select * where true">>).
  41. sql_check(SQL) ->
  42. #{
  43. <<"type">> => <<"sql">>,
  44. <<"sql">> => SQL
  45. }.
  46. schema_check(Type, SerdeName) ->
  47. schema_check(Type, SerdeName, _Overrides = #{}).
  48. schema_check(Type, SerdeName, Overrides) ->
  49. emqx_utils_maps:deep_merge(
  50. #{
  51. <<"type">> => emqx_utils_conv:bin(Type),
  52. <<"schema">> => SerdeName
  53. },
  54. Overrides
  55. ).
  56. eval_sql(Message, SQL) ->
  57. {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
  58. Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
  59. emqx_message_validation:evaluate_sql_check(Check, Validation, Message).
  60. message() ->
  61. message(_Opts = #{}).
  62. message(Opts) ->
  63. Defaults = #{
  64. id => emqx_guid:gen(),
  65. qos => 0,
  66. from => emqx_guid:to_hexstr(emqx_guid:gen()),
  67. flags => #{retain => false},
  68. headers => #{
  69. proto_ver => v5,
  70. properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
  71. },
  72. topic => <<"t/t">>,
  73. payload => emqx_utils_json:encode(#{value => 10}),
  74. timestamp => 1710272561615,
  75. extra => []
  76. },
  77. emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)).
  78. %%------------------------------------------------------------------------------
  79. %% Test cases
  80. %%------------------------------------------------------------------------------
  81. schema_test_() ->
  82. [
  83. {"topics is always a list 1",
  84. ?_assertMatch(
  85. [#{<<"topics">> := [<<"t/1">>]}],
  86. parse_and_check([
  87. validation(
  88. <<"foo">>,
  89. [sql_check()],
  90. #{<<"topics">> => <<"t/1">>}
  91. )
  92. ])
  93. )},
  94. {"topics is always a list 2",
  95. ?_assertMatch(
  96. [#{<<"topics">> := [<<"t/1">>]}],
  97. parse_and_check([
  98. validation(
  99. <<"foo">>,
  100. [sql_check()],
  101. #{<<"topics">> => [<<"t/1">>]}
  102. )
  103. ])
  104. )},
  105. {"foreach expression is not allowed",
  106. ?_assertThrow(
  107. {_Schema, [
  108. #{
  109. reason := foreach_not_allowed,
  110. kind := validation_error
  111. }
  112. ]},
  113. parse_and_check([
  114. validation(
  115. <<"foo">>,
  116. [sql_check(<<"foreach foo as f where true">>)]
  117. )
  118. ])
  119. )},
  120. {"from clause is not allowed",
  121. ?_assertThrow(
  122. {_Schema, [
  123. #{
  124. reason := non_empty_from_clause,
  125. kind := validation_error
  126. }
  127. ]},
  128. parse_and_check([
  129. validation(
  130. <<"foo">>,
  131. [sql_check(<<"select * from t">>)]
  132. )
  133. ])
  134. )},
  135. {"names are unique",
  136. ?_assertThrow(
  137. {_Schema, [
  138. #{
  139. reason := <<"duplicated name:", _/binary>>,
  140. path := ?VALIDATIONS_PATH,
  141. kind := validation_error
  142. }
  143. ]},
  144. parse_and_check([
  145. validation(<<"foo">>, [sql_check()]),
  146. validation(<<"foo">>, [sql_check()])
  147. ])
  148. )},
  149. {"checks must be non-empty",
  150. ?_assertThrow(
  151. {_Schema, [
  152. #{
  153. reason := "at least one check must be defined",
  154. kind := validation_error
  155. }
  156. ]},
  157. parse_and_check([
  158. validation(
  159. <<"foo">>,
  160. []
  161. )
  162. ])
  163. )},
  164. {"bogus check type",
  165. ?_assertThrow(
  166. {_Schema, [
  167. #{
  168. expected := <<"sql", _/binary>>,
  169. kind := validation_error,
  170. field_name := type
  171. }
  172. ]},
  173. parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
  174. )}
  175. ].
  176. invalid_names_test_() ->
  177. [
  178. {InvalidName,
  179. ?_assertThrow(
  180. {_Schema, [
  181. #{
  182. reason := <<"must conform to regex:", _/binary>>,
  183. kind := validation_error,
  184. path := "message_validation.validations.1.name"
  185. }
  186. ]},
  187. parse_and_check([validation(InvalidName, [sql_check()])])
  188. )}
  189. || InvalidName <- [
  190. <<"">>,
  191. <<"_name">>,
  192. <<"name$">>,
  193. <<"name!">>,
  194. <<"some name">>,
  195. <<"nãme"/utf8>>,
  196. <<"test_哈哈"/utf8>>
  197. ]
  198. ].
  199. check_test_() ->
  200. [
  201. {"denied by payload 1",
  202. ?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))},
  203. {"denied by payload 2",
  204. ?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))},
  205. {"allowed by payload 1",
  206. ?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))},
  207. {"allowed by payload 2",
  208. ?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))},
  209. {"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))},
  210. {"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))},
  211. {"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))},
  212. {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
  213. {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
  214. ].
  215. duplicated_check_test_() ->
  216. [
  217. {"duplicated topics 1",
  218. ?_assertThrow(
  219. {_Schema, [
  220. #{
  221. reason := <<"duplicated topics: t/1">>,
  222. kind := validation_error,
  223. path := "message_validation.validations.1.topics"
  224. }
  225. ]},
  226. parse_and_check([
  227. validation(
  228. <<"foo">>,
  229. [schema_check(json, <<"a">>)],
  230. #{<<"topics">> => [<<"t/1">>, <<"t/1">>]}
  231. )
  232. ])
  233. )},
  234. {"duplicated topics 2",
  235. ?_assertThrow(
  236. {_Schema, [
  237. #{
  238. reason := <<"duplicated topics: t/1">>,
  239. kind := validation_error,
  240. path := "message_validation.validations.1.topics"
  241. }
  242. ]},
  243. parse_and_check([
  244. validation(
  245. <<"foo">>,
  246. [schema_check(json, <<"a">>)],
  247. #{<<"topics">> => [<<"t/1">>, <<"t/#">>, <<"t/1">>]}
  248. )
  249. ])
  250. )},
  251. {"duplicated topics 3",
  252. ?_assertThrow(
  253. {_Schema, [
  254. #{
  255. reason := <<"duplicated topics: t/1, t/2">>,
  256. kind := validation_error,
  257. path := "message_validation.validations.1.topics"
  258. }
  259. ]},
  260. parse_and_check([
  261. validation(
  262. <<"foo">>,
  263. [schema_check(json, <<"a">>)],
  264. #{
  265. <<"topics">> => [
  266. <<"t/1">>,
  267. <<"t/#">>,
  268. <<"t/1">>,
  269. <<"t/2">>,
  270. <<"t/2">>
  271. ]
  272. }
  273. )
  274. ])
  275. )},
  276. {"duplicated sql checks are not checked",
  277. ?_assertMatch(
  278. [#{<<"checks">> := [_, _]}],
  279. parse_and_check([
  280. validation(<<"foo">>, [sql_check(), sql_check()])
  281. ])
  282. )},
  283. {"different serdes with same name",
  284. ?_assertMatch(
  285. [#{<<"checks">> := [_, _, _]}],
  286. parse_and_check([
  287. validation(<<"foo">>, [
  288. schema_check(json, <<"a">>),
  289. schema_check(avro, <<"a">>),
  290. schema_check(
  291. protobuf,
  292. <<"a">>,
  293. #{<<"message_name">> => <<"a">>}
  294. )
  295. ])
  296. ])
  297. )},
  298. {"duplicated serdes 1",
  299. ?_assertThrow(
  300. {_Schema, [
  301. #{
  302. reason := <<"duplicated schema checks: json:a">>,
  303. kind := validation_error,
  304. path := "message_validation.validations.1.checks"
  305. }
  306. ]},
  307. parse_and_check([
  308. validation(<<"foo">>, [
  309. schema_check(json, <<"a">>),
  310. schema_check(json, <<"a">>)
  311. ])
  312. ])
  313. )},
  314. {"duplicated serdes 2",
  315. ?_assertThrow(
  316. {_Schema, [
  317. #{
  318. reason := <<"duplicated schema checks: json:a">>,
  319. kind := validation_error,
  320. path := "message_validation.validations.1.checks"
  321. }
  322. ]},
  323. parse_and_check([
  324. validation(<<"foo">>, [
  325. schema_check(json, <<"a">>),
  326. sql_check(),
  327. schema_check(json, <<"a">>)
  328. ])
  329. ])
  330. )},
  331. {"duplicated serdes 3",
  332. ?_assertThrow(
  333. {_Schema, [
  334. #{
  335. reason := <<"duplicated schema checks: json:a">>,
  336. kind := validation_error,
  337. path := "message_validation.validations.1.checks"
  338. }
  339. ]},
  340. parse_and_check([
  341. validation(<<"foo">>, [
  342. schema_check(json, <<"a">>),
  343. schema_check(json, <<"a">>),
  344. sql_check()
  345. ])
  346. ])
  347. )},
  348. {"duplicated serdes 4",
  349. ?_assertThrow(
  350. {_Schema, [
  351. #{
  352. reason := <<"duplicated schema checks: json:a">>,
  353. kind := validation_error,
  354. path := "message_validation.validations.1.checks"
  355. }
  356. ]},
  357. parse_and_check([
  358. validation(<<"foo">>, [
  359. schema_check(json, <<"a">>),
  360. schema_check(json, <<"a">>),
  361. schema_check(json, <<"a">>)
  362. ])
  363. ])
  364. )},
  365. {"duplicated serdes 4",
  366. ?_assertThrow(
  367. {_Schema, [
  368. #{
  369. reason := <<"duplicated schema checks: ", _/binary>>,
  370. kind := validation_error,
  371. path := "message_validation.validations.1.checks"
  372. }
  373. ]},
  374. parse_and_check([
  375. validation(<<"foo">>, [
  376. schema_check(json, <<"a">>),
  377. schema_check(json, <<"a">>),
  378. schema_check(avro, <<"b">>),
  379. schema_check(avro, <<"b">>)
  380. ])
  381. ])
  382. )}
  383. ].