emqx_message_validation_tests.erl 12 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_message_validation_tests).
  5. -include_lib("eunit/include/eunit.hrl").
  6. -define(VALIDATIONS_PATH, "message_validation.validations").
  7. %%------------------------------------------------------------------------------
  8. %% Helper fns
  9. %%------------------------------------------------------------------------------
  10. parse_and_check(InnerConfigs) ->
  11. RootBin = <<"message_validation">>,
  12. InnerBin = <<"validations">>,
  13. RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
  14. #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
  15. emqx_message_validation_schema,
  16. RawConf,
  17. #{
  18. required => false,
  19. atom_key => false,
  20. make_serializable => false
  21. }
  22. ),
  23. Checked.
  24. validation(Name, Checks) ->
  25. validation(Name, Checks, _Overrides = #{}).
  26. validation(Name, Checks, Overrides) ->
  27. Default = #{
  28. <<"tags">> => [<<"some">>, <<"tags">>],
  29. <<"description">> => <<"my validation">>,
  30. <<"enable">> => true,
  31. <<"name">> => Name,
  32. <<"topics">> => <<"t/+">>,
  33. <<"strategy">> => <<"all_pass">>,
  34. <<"failure_action">> => <<"drop">>,
  35. <<"log_failure">> => #{<<"level">> => <<"warning">>},
  36. <<"checks">> => Checks
  37. },
  38. emqx_utils_maps:deep_merge(Default, Overrides).
  39. sql_check() ->
  40. sql_check(<<"select * where true">>).
  41. sql_check(SQL) ->
  42. #{
  43. <<"type">> => <<"sql">>,
  44. <<"sql">> => SQL
  45. }.
  46. schema_check(Type, SerdeName) ->
  47. schema_check(Type, SerdeName, _Overrides = #{}).
  48. schema_check(Type, SerdeName, Overrides) ->
  49. emqx_utils_maps:deep_merge(
  50. #{
  51. <<"type">> => emqx_utils_conv:bin(Type),
  52. <<"schema">> => SerdeName
  53. },
  54. Overrides
  55. ).
  56. eval_sql(Message, SQL) ->
  57. {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
  58. Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
  59. emqx_message_validation:evaluate_sql_check(Check, Validation, Message).
  60. message() ->
  61. message(_Opts = #{}).
  62. message(Opts) ->
  63. Defaults = #{
  64. id => emqx_guid:gen(),
  65. qos => 0,
  66. from => emqx_guid:to_hexstr(emqx_guid:gen()),
  67. flags => #{retain => false},
  68. headers => #{
  69. proto_ver => v5,
  70. properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
  71. },
  72. topic => <<"t/t">>,
  73. payload => emqx_utils_json:encode(#{value => 10}),
  74. timestamp => 1710272561615,
  75. extra => []
  76. },
  77. emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)).
  78. %%------------------------------------------------------------------------------
  79. %% Test cases
  80. %%------------------------------------------------------------------------------
  81. schema_test_() ->
  82. [
  83. {"topics is always a list 1",
  84. ?_assertMatch(
  85. [#{<<"topics">> := [<<"t/1">>]}],
  86. parse_and_check([
  87. validation(
  88. <<"foo">>,
  89. [sql_check()],
  90. #{<<"topics">> => <<"t/1">>}
  91. )
  92. ])
  93. )},
  94. {"topics is always a list 2",
  95. ?_assertMatch(
  96. [#{<<"topics">> := [<<"t/1">>]}],
  97. parse_and_check([
  98. validation(
  99. <<"foo">>,
  100. [sql_check()],
  101. #{<<"topics">> => [<<"t/1">>]}
  102. )
  103. ])
  104. )},
  105. {"foreach expression is not allowed",
  106. ?_assertThrow(
  107. {_Schema, [
  108. #{
  109. reason := foreach_not_allowed,
  110. kind := validation_error
  111. }
  112. ]},
  113. parse_and_check([
  114. validation(
  115. <<"foo">>,
  116. [sql_check(<<"foreach foo as f where true">>)]
  117. )
  118. ])
  119. )},
  120. {"from clause is not allowed",
  121. ?_assertThrow(
  122. {_Schema, [
  123. #{
  124. reason := non_empty_from_clause,
  125. kind := validation_error
  126. }
  127. ]},
  128. parse_and_check([
  129. validation(
  130. <<"foo">>,
  131. [sql_check(<<"select * from t">>)]
  132. )
  133. ])
  134. )},
  135. {"names are unique",
  136. ?_assertThrow(
  137. {_Schema, [
  138. #{
  139. reason := <<"duplicated name:", _/binary>>,
  140. path := ?VALIDATIONS_PATH,
  141. kind := validation_error
  142. }
  143. ]},
  144. parse_and_check([
  145. validation(<<"foo">>, [sql_check()]),
  146. validation(<<"foo">>, [sql_check()])
  147. ])
  148. )},
  149. {"checks must be non-empty",
  150. ?_assertThrow(
  151. {_Schema, [
  152. #{
  153. reason := "at least one check must be defined",
  154. kind := validation_error
  155. }
  156. ]},
  157. parse_and_check([
  158. validation(
  159. <<"foo">>,
  160. []
  161. )
  162. ])
  163. )},
  164. {"bogus check type",
  165. ?_assertThrow(
  166. {_Schema, [
  167. #{
  168. expected := <<"sql", _/binary>>,
  169. kind := validation_error,
  170. field_name := type
  171. }
  172. ]},
  173. parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
  174. )}
  175. ].
  176. invalid_names_test_() ->
  177. [
  178. {InvalidName,
  179. ?_assertThrow(
  180. {_Schema, [
  181. #{
  182. reason := <<"must conform to regex:", _/binary>>,
  183. kind := validation_error,
  184. path := "message_validation.validations.1.name"
  185. }
  186. ]},
  187. parse_and_check([validation(InvalidName, [sql_check()])])
  188. )}
  189. || InvalidName <- [
  190. <<"">>,
  191. <<"_name">>,
  192. <<"name$">>,
  193. <<"name!">>,
  194. <<"some name">>,
  195. <<"nãme"/utf8>>,
  196. <<"test_哈哈"/utf8>>
  197. ]
  198. ].
  199. check_test_() ->
  200. [
  201. {"denied by payload 1",
  202. ?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))},
  203. {"denied by payload 2",
  204. ?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))},
  205. {"allowed by payload 1",
  206. ?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))},
  207. {"allowed by payload 2",
  208. ?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))},
  209. {"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))},
  210. {"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))},
  211. {"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))},
  212. {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
  213. {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
  214. ].
  215. duplicated_check_test_() ->
  216. [
  217. {"duplicated sql checks are not checked",
  218. ?_assertMatch(
  219. [#{<<"checks">> := [_, _]}],
  220. parse_and_check([
  221. validation(<<"foo">>, [sql_check(), sql_check()])
  222. ])
  223. )},
  224. {"different serdes with same name",
  225. ?_assertMatch(
  226. [#{<<"checks">> := [_, _, _]}],
  227. parse_and_check([
  228. validation(<<"foo">>, [
  229. schema_check(json, <<"a">>),
  230. schema_check(avro, <<"a">>),
  231. schema_check(
  232. protobuf,
  233. <<"a">>,
  234. #{<<"message_name">> => <<"a">>}
  235. )
  236. ])
  237. ])
  238. )},
  239. {"duplicated serdes 1",
  240. ?_assertThrow(
  241. {_Schema, [
  242. #{
  243. reason := <<"duplicated schema checks: json:a">>,
  244. kind := validation_error,
  245. path := "message_validation.validations.1.checks"
  246. }
  247. ]},
  248. parse_and_check([
  249. validation(<<"foo">>, [
  250. schema_check(json, <<"a">>),
  251. schema_check(json, <<"a">>)
  252. ])
  253. ])
  254. )},
  255. {"duplicated serdes 2",
  256. ?_assertThrow(
  257. {_Schema, [
  258. #{
  259. reason := <<"duplicated schema checks: json:a">>,
  260. kind := validation_error,
  261. path := "message_validation.validations.1.checks"
  262. }
  263. ]},
  264. parse_and_check([
  265. validation(<<"foo">>, [
  266. schema_check(json, <<"a">>),
  267. sql_check(),
  268. schema_check(json, <<"a">>)
  269. ])
  270. ])
  271. )},
  272. {"duplicated serdes 3",
  273. ?_assertThrow(
  274. {_Schema, [
  275. #{
  276. reason := <<"duplicated schema checks: json:a">>,
  277. kind := validation_error,
  278. path := "message_validation.validations.1.checks"
  279. }
  280. ]},
  281. parse_and_check([
  282. validation(<<"foo">>, [
  283. schema_check(json, <<"a">>),
  284. schema_check(json, <<"a">>),
  285. sql_check()
  286. ])
  287. ])
  288. )},
  289. {"duplicated serdes 4",
  290. ?_assertThrow(
  291. {_Schema, [
  292. #{
  293. reason := <<"duplicated schema checks: json:a">>,
  294. kind := validation_error,
  295. path := "message_validation.validations.1.checks"
  296. }
  297. ]},
  298. parse_and_check([
  299. validation(<<"foo">>, [
  300. schema_check(json, <<"a">>),
  301. schema_check(json, <<"a">>),
  302. schema_check(json, <<"a">>)
  303. ])
  304. ])
  305. )},
  306. {"duplicated serdes 4",
  307. ?_assertThrow(
  308. {_Schema, [
  309. #{
  310. reason := <<"duplicated schema checks: ", _/binary>>,
  311. kind := validation_error,
  312. path := "message_validation.validations.1.checks"
  313. }
  314. ]},
  315. parse_and_check([
  316. validation(<<"foo">>, [
  317. schema_check(json, <<"a">>),
  318. schema_check(json, <<"a">>),
  319. schema_check(avro, <<"b">>),
  320. schema_check(avro, <<"b">>)
  321. ])
  322. ])
  323. )}
  324. ].