emqx_rule_engine_api_SUITE.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_api_SUITE).
  17. -compile(nowarn_export_all).
  18. -compile(export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -define(SIMPLE_RULE(NAME_SUFFIX), #{
  22. <<"description">> => <<"A simple rule">>,
  23. <<"enable">> => true,
  24. <<"actions">> => [#{<<"function">> => <<"console">>}],
  25. <<"sql">> => <<"SELECT * from \"t/1\"">>,
  26. <<"name">> => <<"test_rule", NAME_SUFFIX/binary>>
  27. }).
  28. -define(SIMPLE_RULE(ID, NAME_SUFFIX), ?SIMPLE_RULE(NAME_SUFFIX)#{<<"id">> => ID}).
  29. all() ->
  30. emqx_common_test_helpers:all(?MODULE).
  31. init_per_suite(Config) ->
  32. Apps = emqx_cth_suite:start(
  33. [
  34. emqx,
  35. emqx_conf,
  36. emqx_rule_engine
  37. ],
  38. #{work_dir => emqx_cth_suite:work_dir(Config)}
  39. ),
  40. [{apps, Apps} | Config].
  41. end_per_suite(Config) ->
  42. Apps = ?config(apps, Config),
  43. emqx_cth_suite:stop(Apps),
  44. ok.
  45. init_per_testcase(t_crud_rule_api, Config) ->
  46. meck:new(emqx_utils_json, [passthrough]),
  47. init_per_testcase(common, Config);
  48. init_per_testcase(_, Config) ->
  49. Config.
  50. end_per_testcase(t_crud_rule_api, Config) ->
  51. meck:unload(emqx_utils_json),
  52. end_per_testcase(common, Config);
  53. end_per_testcase(_, _Config) ->
  54. {200, #{data := Rules}} =
  55. emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
  56. lists:foreach(
  57. fun(#{id := Id}) ->
  58. {204} = emqx_rule_engine_api:'/rules/:id'(
  59. delete,
  60. #{bindings => #{id => Id}}
  61. )
  62. end,
  63. Rules
  64. ).
  65. t_crud_rule_api(_Config) ->
  66. RuleId = <<"my_rule">>,
  67. Rule = simple_rule_fixture(RuleId, <<>>),
  68. ?assertEqual(RuleId, maps:get(id, Rule)),
  69. {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
  70. ct:pal("RList : ~p", [Rules]),
  71. ?assert(length(Rules) > 0),
  72. %% if we post again with the same id, it return with 400 "rule id already exists"
  73. ?assertMatch(
  74. {400, #{code := _, message := _Message}},
  75. emqx_rule_engine_api:'/rules'(post, #{body => ?SIMPLE_RULE(RuleId, <<"some_other">>)})
  76. ),
  77. {204} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
  78. bindings => #{id => RuleId}
  79. }),
  80. {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
  81. ct:pal("RShow : ~p", [Rule1]),
  82. ?assertEqual(Rule, Rule1),
  83. {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleId}}),
  84. ct:pal("RMetrics : ~p", [Metrics]),
  85. ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics),
  86. %% simulating a node joining a cluster and lagging the configuration replication; in
  87. %% such cases, when fetching metrics, a rule may exist in the cluster but not on the
  88. %% new node. We just check that it doesn't provoke a crash.
  89. emqx_common_test_helpers:with_mock(
  90. emqx_metrics_worker,
  91. get_metrics,
  92. fun(HandlerName, MetricId) ->
  93. %% change the metric id to some unknown id.
  94. meck:passthrough([HandlerName, <<"unknown-", MetricId/binary>>])
  95. end,
  96. fun() ->
  97. {200, Metrics1} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{
  98. bindings => #{id => RuleId}
  99. }),
  100. ct:pal("RMetrics : ~p", [Metrics1]),
  101. ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics1),
  102. ok
  103. end
  104. ),
  105. {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
  106. bindings => #{id => RuleId},
  107. body => ?SIMPLE_RULE(RuleId)#{<<"sql">> => <<"select * from \"t/b\"">>}
  108. }),
  109. {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
  110. %ct:pal("RShow : ~p", [Rule3]),
  111. ?assertEqual(Rule3, Rule2),
  112. ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
  113. {404, _} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => <<"unknown_rule">>}}),
  114. {404, _} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{
  115. bindings => #{id => <<"unknown_rule">>}
  116. }),
  117. {404, _} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
  118. bindings => #{id => <<"unknown_rule">>}
  119. }),
  120. ?assertMatch(
  121. {204},
  122. emqx_rule_engine_api:'/rules/:id'(
  123. delete,
  124. #{bindings => #{id => RuleId}}
  125. )
  126. ),
  127. ?assertMatch(
  128. {404, #{code := 'NOT_FOUND'}},
  129. emqx_rule_engine_api:'/rules/:id'(
  130. delete,
  131. #{bindings => #{id => RuleId}}
  132. )
  133. ),
  134. ?assertMatch(
  135. {404, #{code := _, message := _Message}},
  136. emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
  137. ),
  138. {400, #{
  139. code := 'BAD_REQUEST',
  140. message := SelectAndTransformJsonError
  141. }} =
  142. emqx_rule_engine_api:'/rule_test'(
  143. post,
  144. test_rule_params(<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hel">>)
  145. ),
  146. ?assertMatch(
  147. #{<<"select_and_transform_error">> := <<"decode_json_failed">>},
  148. emqx_utils_json:decode(SelectAndTransformJsonError, [return_maps])
  149. ),
  150. {400, #{
  151. code := 'BAD_REQUEST',
  152. message := SelectAndTransformBadArgError
  153. }} =
  154. emqx_rule_engine_api:'/rule_test'(
  155. post,
  156. test_rule_params(
  157. <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
  158. )
  159. ),
  160. ?assertMatch(
  161. #{<<"select_and_transform_error">> := <<"badarg">>},
  162. emqx_utils_json:decode(SelectAndTransformBadArgError, [return_maps])
  163. ),
  164. {400, #{
  165. code := 'BAD_REQUEST',
  166. message := BadSqlMessage
  167. }} = emqx_rule_engine_api:'/rule_test'(
  168. post,
  169. test_rule_params(
  170. <<"BAD_SQL">>, <<"{\"msg\": \"hello\"}">>
  171. )
  172. ),
  173. ?assertMatch({match, _}, re:run(BadSqlMessage, "syntax error")),
  174. meck:expect(emqx_utils_json, safe_encode, 1, {error, foo}),
  175. ?assertMatch(
  176. {400, #{
  177. code := 'BAD_REQUEST',
  178. message := <<"{select_and_transform_error,badarg}">>
  179. }},
  180. emqx_rule_engine_api:'/rule_test'(
  181. post,
  182. test_rule_params(
  183. <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
  184. )
  185. )
  186. ),
  187. ok.
  188. t_list_rule_api(_Config) ->
  189. AddIds = rules_fixture(20),
  190. ct:pal("rule ids: ~p", [AddIds]),
  191. {200, #{data := Rules, meta := #{count := Count}}} =
  192. emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
  193. ?assertEqual(20, length(AddIds)),
  194. ?assertEqual(20, length(Rules)),
  195. ?assertEqual(20, Count),
  196. [RuleId | _] = AddIds,
  197. UpdateParams = #{
  198. <<"description">> => <<"中文的描述也能搜索"/utf8>>,
  199. <<"enable">> => false,
  200. <<"actions">> => [#{<<"function">> => <<"console">>}],
  201. <<"sql">> => <<"SELECT * from \"t/1/+\"">>,
  202. <<"name">> => <<"test_rule_update1">>
  203. },
  204. {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
  205. bindings => #{id => RuleId},
  206. body => UpdateParams
  207. }),
  208. QueryStr1 = #{query_string => #{<<"enable">> => false}},
  209. {200, Result1 = #{meta := #{count := Count1}}} = emqx_rule_engine_api:'/rules'(get, QueryStr1),
  210. ?assertEqual(1, Count1),
  211. QueryStr2 = #{query_string => #{<<"like_description">> => <<"也能"/utf8>>}},
  212. {200, Result2} = emqx_rule_engine_api:'/rules'(get, QueryStr2),
  213. ?assertEqual(maps:get(data, Result1), maps:get(data, Result2)),
  214. QueryStr3 = #{query_string => #{<<"from">> => <<"t/1">>}},
  215. {200, #{data := Data3}} = emqx_rule_engine_api:'/rules'(get, QueryStr3),
  216. ?assertEqual(19, length(Data3)),
  217. QueryStr4 = #{query_string => #{<<"like_from">> => <<"t/1/+">>}},
  218. {200, Result4} = emqx_rule_engine_api:'/rules'(get, QueryStr4),
  219. ?assertEqual(maps:get(data, Result1), maps:get(data, Result4)),
  220. QueryStr5 = #{query_string => #{<<"match_from">> => <<"t/+/+">>}},
  221. {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5),
  222. ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)),
  223. QueryStr6 = #{query_string => #{<<"like_id">> => RuleId}},
  224. {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6),
  225. ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)),
  226. ok.
  227. t_reset_metrics_on_disable(_Config) ->
  228. #{id := RuleId} = simple_rule_fixture(),
  229. %% generate some fake metrics
  230. emqx_metrics_worker:inc(rule_metrics, RuleId, 'matched', 10),
  231. emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed', 10),
  232. {200, #{metrics := Metrics0}} = emqx_rule_engine_api:'/rules/:id/metrics'(
  233. get,
  234. #{bindings => #{id => RuleId}}
  235. ),
  236. ?assertMatch(#{passed := 10, matched := 10}, Metrics0),
  237. %% disable the rule; metrics should be reset
  238. {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
  239. bindings => #{id => RuleId},
  240. body => #{<<"enable">> => false}
  241. }),
  242. {200, #{metrics := Metrics1}} = emqx_rule_engine_api:'/rules/:id/metrics'(
  243. get,
  244. #{bindings => #{id => RuleId}}
  245. ),
  246. ?assertMatch(#{passed := 0, matched := 0}, Metrics1),
  247. ok.
  248. test_rule_params(Sql, Payload) ->
  249. #{
  250. body => #{
  251. <<"context">> =>
  252. #{
  253. <<"clientid">> => <<"c_emqx">>,
  254. <<"event_type">> => <<"message_publish">>,
  255. <<"payload">> => Payload,
  256. <<"qos">> => 1,
  257. <<"topic">> => <<"t/a">>,
  258. <<"username">> => <<"u_emqx">>
  259. },
  260. <<"sql">> => Sql
  261. }
  262. }.
  263. t_rule_engine(_) ->
  264. _ = simple_rule_fixture(),
  265. {200, Config} = emqx_rule_engine_api:'/rule_engine'(get, #{}),
  266. ?assert(not maps:is_key(rules, Config)),
  267. {200, #{
  268. jq_function_default_timeout := 12000
  269. % hidden! jq_implementation_module := jq_port
  270. }} = emqx_rule_engine_api:'/rule_engine'(put, #{
  271. body => #{
  272. <<"jq_function_default_timeout">> => <<"12s">>,
  273. <<"jq_implementation_module">> => <<"jq_port">>
  274. }
  275. }),
  276. SomeRule = #{<<"sql">> => <<"SELECT * FROM \"t/#\"">>},
  277. {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{
  278. body => #{<<"rules">> => #{<<"some_rule">> => SomeRule}}
  279. }),
  280. {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
  281. t_dont_downgrade_bridge_type(_) ->
  282. case emqx_release:edition() of
  283. ee ->
  284. do_t_dont_downgrade_bridge_type();
  285. ce ->
  286. %% downgrade is not supported in CE
  287. ok
  288. end.
  289. do_t_dont_downgrade_bridge_type() ->
  290. %% Create a rule using a bridge V1 ID
  291. #{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}),
  292. ?assertMatch(
  293. %% returns an action ID
  294. {200, #{data := [#{actions := [<<"kafka_producer:name">>]}]}},
  295. emqx_rule_engine_api:'/rules'(get, #{query_string => #{}})
  296. ),
  297. ?assertMatch(
  298. %% returns an action ID
  299. {200, #{actions := [<<"kafka_producer:name">>]}},
  300. emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
  301. ),
  302. ok.
  303. rules_fixture(N) ->
  304. lists:map(
  305. fun(Seq0) ->
  306. Seq = integer_to_binary(Seq0),
  307. #{id := Id} = simple_rule_fixture(Seq),
  308. Id
  309. end,
  310. lists:seq(1, N)
  311. ).
  312. simple_rule_fixture() ->
  313. simple_rule_fixture(<<>>).
  314. simple_rule_fixture(NameSuffix) ->
  315. create_rule(?SIMPLE_RULE(NameSuffix)).
  316. simple_rule_fixture(Id, NameSuffix) ->
  317. create_rule(?SIMPLE_RULE(Id, NameSuffix)).
  318. create_rule(Params) ->
  319. {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params}),
  320. Rule.