%% File: emqx_rule_engine_api_SUITE.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_api_SUITE).
  17. -compile(nowarn_export_all).
  18. -compile(export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
  22. -define(SIMPLE_RULE(NAME_SUFFIX), #{
  23. <<"description">> => <<"A simple rule">>,
  24. <<"enable">> => true,
  25. <<"actions">> => [#{<<"function">> => <<"console">>}],
  26. <<"sql">> => <<"SELECT * from \"t/1\"">>,
  27. <<"name">> => <<"test_rule", NAME_SUFFIX/binary>>
  28. }).
  29. -define(SIMPLE_RULE(ID, NAME_SUFFIX), ?SIMPLE_RULE(NAME_SUFFIX)#{<<"id">> => ID}).
  30. all() ->
  31. emqx_common_test_helpers:all(?MODULE).
  32. init_per_suite(Config) ->
  33. application:load(emqx_conf),
  34. ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
  35. ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
  36. Config.
  37. end_per_suite(_Config) ->
  38. emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
  39. ok.
  40. init_per_testcase(t_crud_rule_api, Config) ->
  41. meck:new(emqx_utils_json, [passthrough]),
  42. init_per_testcase(common, Config);
  43. init_per_testcase(_, Config) ->
  44. Config.
  45. end_per_testcase(t_crud_rule_api, Config) ->
  46. meck:unload(emqx_utils_json),
  47. end_per_testcase(common, Config);
  48. end_per_testcase(_, _Config) ->
  49. {200, #{data := Rules}} =
  50. emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
  51. lists:foreach(
  52. fun(#{id := Id}) ->
  53. {204} = emqx_rule_engine_api:'/rules/:id'(
  54. delete,
  55. #{bindings => #{id => Id}}
  56. )
  57. end,
  58. Rules
  59. ).
  60. t_crud_rule_api(_Config) ->
  61. RuleId = <<"my_rule">>,
  62. Rule = simple_rule_fixture(RuleId, <<>>),
  63. ?assertEqual(RuleId, maps:get(id, Rule)),
  64. {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
  65. ct:pal("RList : ~p", [Rules]),
  66. ?assert(length(Rules) > 0),
  67. %% if we post again with the same id, it return with 400 "rule id already exists"
  68. ?assertMatch(
  69. {400, #{code := _, message := _Message}},
  70. emqx_rule_engine_api:'/rules'(post, #{body => ?SIMPLE_RULE(RuleId, <<"some_other">>)})
  71. ),
  72. {204} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
  73. bindings => #{id => RuleId}
  74. }),
  75. {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
  76. ct:pal("RShow : ~p", [Rule1]),
  77. ?assertEqual(Rule, Rule1),
  78. {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleId}}),
  79. ct:pal("RMetrics : ~p", [Metrics]),
  80. ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics),
  81. %% simulating a node joining a cluster and lagging the configuration replication; in
  82. %% such cases, when fetching metrics, a rule may exist in the cluster but not on the
  83. %% new node. We just check that it doesn't provoke a crash.
  84. emqx_common_test_helpers:with_mock(
  85. emqx_metrics_worker,
  86. get_metrics,
  87. fun(HandlerName, MetricId) ->
  88. %% change the metric id to some unknown id.
  89. meck:passthrough([HandlerName, <<"unknown-", MetricId/binary>>])
  90. end,
  91. fun() ->
  92. {200, Metrics1} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{
  93. bindings => #{id => RuleId}
  94. }),
  95. ct:pal("RMetrics : ~p", [Metrics1]),
  96. ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics1),
  97. ok
  98. end
  99. ),
  100. {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
  101. bindings => #{id => RuleId},
  102. body => ?SIMPLE_RULE(RuleId)#{<<"sql">> => <<"select * from \"t/b\"">>}
  103. }),
  104. {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
  105. %ct:pal("RShow : ~p", [Rule3]),
  106. ?assertEqual(Rule3, Rule2),
  107. ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
  108. {404, _} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => <<"unknown_rule">>}}),
  109. {404, _} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{
  110. bindings => #{id => <<"unknown_rule">>}
  111. }),
  112. {404, _} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
  113. bindings => #{id => <<"unknown_rule">>}
  114. }),
  115. ?assertMatch(
  116. {204},
  117. emqx_rule_engine_api:'/rules/:id'(
  118. delete,
  119. #{bindings => #{id => RuleId}}
  120. )
  121. ),
  122. ?assertMatch(
  123. {404, #{code := 'NOT_FOUND'}},
  124. emqx_rule_engine_api:'/rules/:id'(
  125. delete,
  126. #{bindings => #{id => RuleId}}
  127. )
  128. ),
  129. ?assertMatch(
  130. {404, #{code := _, message := _Message}},
  131. emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
  132. ),
  133. {400, #{
  134. code := 'BAD_REQUEST',
  135. message := SelectAndTransformJsonError
  136. }} =
  137. emqx_rule_engine_api:'/rule_test'(
  138. post,
  139. test_rule_params(<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hel">>)
  140. ),
  141. ?assertMatch(
  142. #{<<"select_and_transform_error">> := <<"decode_json_failed">>},
  143. emqx_utils_json:decode(SelectAndTransformJsonError, [return_maps])
  144. ),
  145. {400, #{
  146. code := 'BAD_REQUEST',
  147. message := SelectAndTransformBadArgError
  148. }} =
  149. emqx_rule_engine_api:'/rule_test'(
  150. post,
  151. test_rule_params(
  152. <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
  153. )
  154. ),
  155. ?assertMatch(
  156. #{<<"select_and_transform_error">> := <<"badarg">>},
  157. emqx_utils_json:decode(SelectAndTransformBadArgError, [return_maps])
  158. ),
  159. {400, #{
  160. code := 'BAD_REQUEST',
  161. message := BadSqlMessage
  162. }} = emqx_rule_engine_api:'/rule_test'(
  163. post,
  164. test_rule_params(
  165. <<"BAD_SQL">>, <<"{\"msg\": \"hello\"}">>
  166. )
  167. ),
  168. ?assertMatch({match, _}, re:run(BadSqlMessage, "syntax error")),
  169. meck:expect(emqx_utils_json, safe_encode, 1, {error, foo}),
  170. ?assertMatch(
  171. {400, #{
  172. code := 'BAD_REQUEST',
  173. message := <<"{select_and_transform_error,badarg}">>
  174. }},
  175. emqx_rule_engine_api:'/rule_test'(
  176. post,
  177. test_rule_params(
  178. <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
  179. )
  180. )
  181. ),
  182. ok.
  183. t_list_rule_api(_Config) ->
  184. AddIds = rules_fixture(20),
  185. ct:pal("rule ids: ~p", [AddIds]),
  186. {200, #{data := Rules, meta := #{count := Count}}} =
  187. emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
  188. ?assertEqual(20, length(AddIds)),
  189. ?assertEqual(20, length(Rules)),
  190. ?assertEqual(20, Count),
  191. [RuleId | _] = AddIds,
  192. UpdateParams = #{
  193. <<"description">> => <<"中文的描述也能搜索"/utf8>>,
  194. <<"enable">> => false,
  195. <<"actions">> => [#{<<"function">> => <<"console">>}],
  196. <<"sql">> => <<"SELECT * from \"t/1/+\"">>,
  197. <<"name">> => <<"test_rule_update1">>
  198. },
  199. {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
  200. bindings => #{id => RuleId},
  201. body => UpdateParams
  202. }),
  203. QueryStr1 = #{query_string => #{<<"enable">> => false}},
  204. {200, Result1 = #{meta := #{count := Count1}}} = emqx_rule_engine_api:'/rules'(get, QueryStr1),
  205. ?assertEqual(1, Count1),
  206. QueryStr2 = #{query_string => #{<<"like_description">> => <<"也能"/utf8>>}},
  207. {200, Result2} = emqx_rule_engine_api:'/rules'(get, QueryStr2),
  208. ?assertEqual(maps:get(data, Result1), maps:get(data, Result2)),
  209. QueryStr3 = #{query_string => #{<<"from">> => <<"t/1">>}},
  210. {200, #{data := Data3}} = emqx_rule_engine_api:'/rules'(get, QueryStr3),
  211. ?assertEqual(19, length(Data3)),
  212. QueryStr4 = #{query_string => #{<<"like_from">> => <<"t/1/+">>}},
  213. {200, Result4} = emqx_rule_engine_api:'/rules'(get, QueryStr4),
  214. ?assertEqual(maps:get(data, Result1), maps:get(data, Result4)),
  215. QueryStr5 = #{query_string => #{<<"match_from">> => <<"t/+/+">>}},
  216. {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5),
  217. ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)),
  218. QueryStr6 = #{query_string => #{<<"like_id">> => RuleId}},
  219. {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6),
  220. ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)),
  221. ok.
  222. t_reset_metrics_on_disable(_Config) ->
  223. #{id := RuleId} = simple_rule_fixture(),
  224. %% generate some fake metrics
  225. emqx_metrics_worker:inc(rule_metrics, RuleId, 'matched', 10),
  226. emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed', 10),
  227. {200, #{metrics := Metrics0}} = emqx_rule_engine_api:'/rules/:id/metrics'(
  228. get,
  229. #{bindings => #{id => RuleId}}
  230. ),
  231. ?assertMatch(#{passed := 10, matched := 10}, Metrics0),
  232. %% disable the rule; metrics should be reset
  233. {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
  234. bindings => #{id => RuleId},
  235. body => #{<<"enable">> => false}
  236. }),
  237. {200, #{metrics := Metrics1}} = emqx_rule_engine_api:'/rules/:id/metrics'(
  238. get,
  239. #{bindings => #{id => RuleId}}
  240. ),
  241. ?assertMatch(#{passed := 0, matched := 0}, Metrics1),
  242. ok.
  243. test_rule_params(Sql, Payload) ->
  244. #{
  245. body => #{
  246. <<"context">> =>
  247. #{
  248. <<"clientid">> => <<"c_emqx">>,
  249. <<"event_type">> => <<"message_publish">>,
  250. <<"payload">> => Payload,
  251. <<"qos">> => 1,
  252. <<"topic">> => <<"t/a">>,
  253. <<"username">> => <<"u_emqx">>
  254. },
  255. <<"sql">> => Sql
  256. }
  257. }.
  258. t_rule_engine(_) ->
  259. _ = simple_rule_fixture(),
  260. {200, Config} = emqx_rule_engine_api:'/rule_engine'(get, #{}),
  261. ?assert(not maps:is_key(rules, Config)),
  262. {200, #{
  263. jq_function_default_timeout := 12000
  264. % hidden! jq_implementation_module := jq_port
  265. }} = emqx_rule_engine_api:'/rule_engine'(put, #{
  266. body => #{
  267. <<"jq_function_default_timeout">> => <<"12s">>,
  268. <<"jq_implementation_module">> => <<"jq_port">>
  269. }
  270. }),
  271. SomeRule = #{<<"sql">> => <<"SELECT * FROM \"t/#\"">>},
  272. {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{
  273. body => #{<<"rules">> => #{<<"some_rule">> => SomeRule}}
  274. }),
  275. {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
  276. t_dont_downgrade_bridge_type(_) ->
  277. case emqx_release:edition() of
  278. ee ->
  279. do_t_dont_downgrade_bridge_type();
  280. ce ->
  281. %% downgrade is not supported in CE
  282. ok
  283. end.
  284. do_t_dont_downgrade_bridge_type() ->
  285. %% Create a rule using a bridge V1 ID
  286. #{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}),
  287. ?assertMatch(
  288. %% returns an action ID
  289. {200, #{data := [#{actions := [<<"kafka_producer:name">>]}]}},
  290. emqx_rule_engine_api:'/rules'(get, #{query_string => #{}})
  291. ),
  292. ?assertMatch(
  293. %% returns an action ID
  294. {200, #{actions := [<<"kafka_producer:name">>]}},
  295. emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
  296. ),
  297. ok.
  298. rules_fixture(N) ->
  299. lists:map(
  300. fun(Seq0) ->
  301. Seq = integer_to_binary(Seq0),
  302. #{id := Id} = simple_rule_fixture(Seq),
  303. Id
  304. end,
  305. lists:seq(1, N)
  306. ).
  307. simple_rule_fixture() ->
  308. simple_rule_fixture(<<>>).
  309. simple_rule_fixture(NameSuffix) ->
  310. create_rule(?SIMPLE_RULE(NameSuffix)).
  311. simple_rule_fixture(Id, NameSuffix) ->
  312. create_rule(?SIMPLE_RULE(Id, NameSuffix)).
  313. create_rule(Params) ->
  314. {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params}),
  315. Rule.