| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_rule_engine_api_SUITE).
- -compile(nowarn_export_all).
- -compile(export_all).
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
- -define(SIMPLE_RULE(NAME_SUFFIX), #{
- <<"description">> => <<"A simple rule">>,
- <<"enable">> => true,
- <<"actions">> => [#{<<"function">> => <<"console">>}],
- <<"sql">> => <<"SELECT * from \"t/1\"">>,
- <<"name">> => <<"test_rule", NAME_SUFFIX/binary>>
- }).
- -define(SIMPLE_RULE(ID, NAME_SUFFIX), ?SIMPLE_RULE(NAME_SUFFIX)#{<<"id">> => ID}).
- all() ->
- emqx_common_test_helpers:all(?MODULE).
- init_per_suite(Config) ->
- application:load(emqx_conf),
- ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
- ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
- Config.
- end_per_suite(_Config) ->
- emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
- ok.
- init_per_testcase(t_crud_rule_api, Config) ->
- meck:new(emqx_utils_json, [passthrough]),
- init_per_testcase(common, Config);
- init_per_testcase(_, Config) ->
- Config.
- end_per_testcase(t_crud_rule_api, Config) ->
- meck:unload(emqx_utils_json),
- end_per_testcase(common, Config);
- end_per_testcase(_, _Config) ->
- {200, #{data := Rules}} =
- emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
- lists:foreach(
- fun(#{id := Id}) ->
- {204} = emqx_rule_engine_api:'/rules/:id'(
- delete,
- #{bindings => #{id => Id}}
- )
- end,
- Rules
- ).
- t_crud_rule_api(_Config) ->
- RuleId = <<"my_rule">>,
- Rule = simple_rule_fixture(RuleId, <<>>),
- ?assertEqual(RuleId, maps:get(id, Rule)),
- {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
- ct:pal("RList : ~p", [Rules]),
- ?assert(length(Rules) > 0),
- %% if we post again with the same id, it return with 400 "rule id already exists"
- ?assertMatch(
- {400, #{code := _, message := _Message}},
- emqx_rule_engine_api:'/rules'(post, #{body => ?SIMPLE_RULE(RuleId, <<"some_other">>)})
- ),
- {204} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
- bindings => #{id => RuleId}
- }),
- {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
- ct:pal("RShow : ~p", [Rule1]),
- ?assertEqual(Rule, Rule1),
- {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleId}}),
- ct:pal("RMetrics : ~p", [Metrics]),
- ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics),
- %% simulating a node joining a cluster and lagging the configuration replication; in
- %% such cases, when fetching metrics, a rule may exist in the cluster but not on the
- %% new node. We just check that it doesn't provoke a crash.
- emqx_common_test_helpers:with_mock(
- emqx_metrics_worker,
- get_metrics,
- fun(HandlerName, MetricId) ->
- %% change the metric id to some unknown id.
- meck:passthrough([HandlerName, <<"unknown-", MetricId/binary>>])
- end,
- fun() ->
- {200, Metrics1} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{
- bindings => #{id => RuleId}
- }),
- ct:pal("RMetrics : ~p", [Metrics1]),
- ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics1),
- ok
- end
- ),
- {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
- bindings => #{id => RuleId},
- body => ?SIMPLE_RULE(RuleId)#{<<"sql">> => <<"select * from \"t/b\"">>}
- }),
- {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
- %ct:pal("RShow : ~p", [Rule3]),
- ?assertEqual(Rule3, Rule2),
- ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
- {404, _} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => <<"unknown_rule">>}}),
- {404, _} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{
- bindings => #{id => <<"unknown_rule">>}
- }),
- {404, _} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
- bindings => #{id => <<"unknown_rule">>}
- }),
- ?assertMatch(
- {204},
- emqx_rule_engine_api:'/rules/:id'(
- delete,
- #{bindings => #{id => RuleId}}
- )
- ),
- ?assertMatch(
- {404, #{code := 'NOT_FOUND'}},
- emqx_rule_engine_api:'/rules/:id'(
- delete,
- #{bindings => #{id => RuleId}}
- )
- ),
- ?assertMatch(
- {404, #{code := _, message := _Message}},
- emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
- ),
- {400, #{
- code := 'BAD_REQUEST',
- message := SelectAndTransformJsonError
- }} =
- emqx_rule_engine_api:'/rule_test'(
- post,
- test_rule_params(<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hel">>)
- ),
- ?assertMatch(
- #{<<"select_and_transform_error">> := <<"decode_json_failed">>},
- emqx_utils_json:decode(SelectAndTransformJsonError, [return_maps])
- ),
- {400, #{
- code := 'BAD_REQUEST',
- message := SelectAndTransformBadArgError
- }} =
- emqx_rule_engine_api:'/rule_test'(
- post,
- test_rule_params(
- <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
- )
- ),
- ?assertMatch(
- #{<<"select_and_transform_error">> := <<"badarg">>},
- emqx_utils_json:decode(SelectAndTransformBadArgError, [return_maps])
- ),
- {400, #{
- code := 'BAD_REQUEST',
- message := BadSqlMessage
- }} = emqx_rule_engine_api:'/rule_test'(
- post,
- test_rule_params(
- <<"BAD_SQL">>, <<"{\"msg\": \"hello\"}">>
- )
- ),
- ?assertMatch({match, _}, re:run(BadSqlMessage, "syntax error")),
- meck:expect(emqx_utils_json, safe_encode, 1, {error, foo}),
- ?assertMatch(
- {400, #{
- code := 'BAD_REQUEST',
- message := <<"{select_and_transform_error,badarg}">>
- }},
- emqx_rule_engine_api:'/rule_test'(
- post,
- test_rule_params(
- <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
- )
- )
- ),
- ok.
- t_list_rule_api(_Config) ->
- AddIds = rules_fixture(20),
- ct:pal("rule ids: ~p", [AddIds]),
- {200, #{data := Rules, meta := #{count := Count}}} =
- emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
- ?assertEqual(20, length(AddIds)),
- ?assertEqual(20, length(Rules)),
- ?assertEqual(20, Count),
- [RuleId | _] = AddIds,
- UpdateParams = #{
- <<"description">> => <<"中文的描述也能搜索"/utf8>>,
- <<"enable">> => false,
- <<"actions">> => [#{<<"function">> => <<"console">>}],
- <<"sql">> => <<"SELECT * from \"t/1/+\"">>,
- <<"name">> => <<"test_rule_update1">>
- },
- {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
- bindings => #{id => RuleId},
- body => UpdateParams
- }),
- QueryStr1 = #{query_string => #{<<"enable">> => false}},
- {200, Result1 = #{meta := #{count := Count1}}} = emqx_rule_engine_api:'/rules'(get, QueryStr1),
- ?assertEqual(1, Count1),
- QueryStr2 = #{query_string => #{<<"like_description">> => <<"也能"/utf8>>}},
- {200, Result2} = emqx_rule_engine_api:'/rules'(get, QueryStr2),
- ?assertEqual(maps:get(data, Result1), maps:get(data, Result2)),
- QueryStr3 = #{query_string => #{<<"from">> => <<"t/1">>}},
- {200, #{data := Data3}} = emqx_rule_engine_api:'/rules'(get, QueryStr3),
- ?assertEqual(19, length(Data3)),
- QueryStr4 = #{query_string => #{<<"like_from">> => <<"t/1/+">>}},
- {200, Result4} = emqx_rule_engine_api:'/rules'(get, QueryStr4),
- ?assertEqual(maps:get(data, Result1), maps:get(data, Result4)),
- QueryStr5 = #{query_string => #{<<"match_from">> => <<"t/+/+">>}},
- {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5),
- ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)),
- QueryStr6 = #{query_string => #{<<"like_id">> => RuleId}},
- {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6),
- ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)),
- ok.
- t_reset_metrics_on_disable(_Config) ->
- #{id := RuleId} = simple_rule_fixture(),
- %% generate some fake metrics
- emqx_metrics_worker:inc(rule_metrics, RuleId, 'matched', 10),
- emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed', 10),
- {200, #{metrics := Metrics0}} = emqx_rule_engine_api:'/rules/:id/metrics'(
- get,
- #{bindings => #{id => RuleId}}
- ),
- ?assertMatch(#{passed := 10, matched := 10}, Metrics0),
- %% disable the rule; metrics should be reset
- {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
- bindings => #{id => RuleId},
- body => #{<<"enable">> => false}
- }),
- {200, #{metrics := Metrics1}} = emqx_rule_engine_api:'/rules/:id/metrics'(
- get,
- #{bindings => #{id => RuleId}}
- ),
- ?assertMatch(#{passed := 0, matched := 0}, Metrics1),
- ok.
- test_rule_params(Sql, Payload) ->
- #{
- body => #{
- <<"context">> =>
- #{
- <<"clientid">> => <<"c_emqx">>,
- <<"event_type">> => <<"message_publish">>,
- <<"payload">> => Payload,
- <<"qos">> => 1,
- <<"topic">> => <<"t/a">>,
- <<"username">> => <<"u_emqx">>
- },
- <<"sql">> => Sql
- }
- }.
- t_rule_engine(_) ->
- _ = simple_rule_fixture(),
- {200, Config} = emqx_rule_engine_api:'/rule_engine'(get, #{}),
- ?assert(not maps:is_key(rules, Config)),
- {200, #{
- jq_function_default_timeout := 12000
- % hidden! jq_implementation_module := jq_port
- }} = emqx_rule_engine_api:'/rule_engine'(put, #{
- body => #{
- <<"jq_function_default_timeout">> => <<"12s">>,
- <<"jq_implementation_module">> => <<"jq_port">>
- }
- }),
- SomeRule = #{<<"sql">> => <<"SELECT * FROM \"t/#\"">>},
- {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{
- body => #{<<"rules">> => #{<<"some_rule">> => SomeRule}}
- }),
- {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
- t_dont_downgrade_bridge_type(_) ->
- case emqx_release:edition() of
- ee ->
- do_t_dont_downgrade_bridge_type();
- ce ->
- %% downgrade is not supported in CE
- ok
- end.
- do_t_dont_downgrade_bridge_type() ->
- %% Create a rule using a bridge V1 ID
- #{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}),
- ?assertMatch(
- %% returns an action ID
- {200, #{data := [#{actions := [<<"kafka_producer:name">>]}]}},
- emqx_rule_engine_api:'/rules'(get, #{query_string => #{}})
- ),
- ?assertMatch(
- %% returns an action ID
- {200, #{actions := [<<"kafka_producer:name">>]}},
- emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
- ),
- ok.
- rules_fixture(N) ->
- lists:map(
- fun(Seq0) ->
- Seq = integer_to_binary(Seq0),
- #{id := Id} = simple_rule_fixture(Seq),
- Id
- end,
- lists:seq(1, N)
- ).
- simple_rule_fixture() ->
- simple_rule_fixture(<<>>).
- simple_rule_fixture(NameSuffix) ->
- create_rule(?SIMPLE_RULE(NameSuffix)).
- simple_rule_fixture(Id, NameSuffix) ->
- create_rule(?SIMPLE_RULE(Id, NameSuffix)).
- create_rule(Params) ->
- {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params}),
- Rule.
|