|
|
@@ -0,0 +1,277 @@
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
+%%
|
|
|
+%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
+%% you may not use this file except in compliance with the License.
|
|
|
+%% You may obtain a copy of the License at
|
|
|
+%%
|
|
|
+%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+%%
|
|
|
+%% Unless required by applicable law or agreed to in writing, software
|
|
|
+%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+%% See the License for the specific language governing permissions and
|
|
|
+%% limitations under the License.
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+-module(emqx_rule_engine_api_rule_test_SUITE).
|
|
|
+
|
|
|
+-compile(nowarn_export_all).
|
|
|
+-compile(export_all).
|
|
|
+
|
|
|
+-include_lib("eunit/include/eunit.hrl").
|
|
|
+-include_lib("common_test/include/ct.hrl").
|
|
|
+
|
|
|
+-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
|
|
|
+
|
|
|
+all() ->
|
|
|
+ emqx_common_test_helpers:all(?MODULE).
|
|
|
+
|
|
|
+init_per_suite(Config) ->
|
|
|
+ application:load(emqx_conf),
|
|
|
+ ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
|
|
|
+ ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
|
|
|
+ Config.
|
|
|
+
|
|
|
+end_per_suite(_Config) ->
|
|
|
+ emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_ctx_pub(_) ->
|
|
|
+ SQL = <<"SELECT payload.msg as msg, clientid, username, payload, topic, qos FROM \"t/#\"">>,
|
|
|
+ Context = #{
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => message_publish,
|
|
|
+ payload => <<"{\"msg\": \"hello\"}">>,
|
|
|
+ qos => 1,
|
|
|
+ topic => <<"t/a">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+ Expected = Context#{msg => <<"hello">>},
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_sub(_) ->
|
|
|
+ SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_subscribed\"">>,
|
|
|
+ Context = #{
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => session_subscribed,
|
|
|
+ qos => 1,
|
|
|
+ topic => <<"t/a">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+
|
|
|
+ do_test(SQL, Context, Context).
|
|
|
+
|
|
|
+t_ctx_unsub(_) ->
|
|
|
+ SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_unsubscribed\"">>,
|
|
|
+ Context = #{
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => session_unsubscribed,
|
|
|
+ qos => 1,
|
|
|
+ topic => <<"t/a">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+ do_test(SQL, Context, Context).
|
|
|
+
|
|
|
+t_ctx_delivered(_) ->
|
|
|
+ SQL =
|
|
|
+ <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_delivered\"">>,
|
|
|
+ Context = #{
|
|
|
+ clientid => <<"c_emqx_2">>,
|
|
|
+ event_type => message_delivered,
|
|
|
+ from_clientid => <<"c_emqx_1">>,
|
|
|
+ from_username => <<"u_emqx_1">>,
|
|
|
+ payload => <<"{\"msg\": \"hello\"}">>,
|
|
|
+ qos => 1,
|
|
|
+ topic => <<"t/a">>,
|
|
|
+ username => <<"u_emqx_2">>
|
|
|
+ },
|
|
|
+ Expected = check_result([from_clientid, from_username, topic, qos], [node, timestamp], Context),
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_acked(_) ->
|
|
|
+ SQL =
|
|
|
+ <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_acked\"">>,
|
|
|
+
|
|
|
+ Context = #{
|
|
|
+ clientid => <<"c_emqx_2">>,
|
|
|
+ event_type => message_acked,
|
|
|
+ from_clientid => <<"c_emqx_1">>,
|
|
|
+ from_username => <<"u_emqx_1">>,
|
|
|
+ payload => <<"{\"msg\": \"hello\"}">>,
|
|
|
+ qos => 1,
|
|
|
+ topic => <<"t/a">>,
|
|
|
+ username => <<"u_emqx_2">>
|
|
|
+ },
|
|
|
+
|
|
|
+ Expected = with_node_timestampe([from_clientid, from_username, topic, qos], Context),
|
|
|
+
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_droped(_) ->
|
|
|
+ SQL = <<"SELECT reason, topic, qos, node, timestamp FROM \"$events/message_dropped\"">>,
|
|
|
+ Topic = <<"t/a">>,
|
|
|
+ QoS = 1,
|
|
|
+ Reason = <<"no_subscribers">>,
|
|
|
+ Context = #{
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => message_dropped,
|
|
|
+ payload => <<"{\"msg\": \"hello\"}">>,
|
|
|
+ qos => QoS,
|
|
|
+ reason => Reason,
|
|
|
+ topic => Topic,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+
|
|
|
+ Expected = with_node_timestampe([reason, topic, qos], Context),
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_connected(_) ->
|
|
|
+ SQL =
|
|
|
+ <<"SELECT clientid, username, keepalive, is_bridge FROM \"$events/client_connected\"">>,
|
|
|
+
|
|
|
+ Context =
|
|
|
+ #{
|
|
|
+ clean_start => true,
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => client_connected,
|
|
|
+ is_bridge => false,
|
|
|
+ peername => <<"127.0.0.1:52918">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+ Expected = check_result([clientid, username, keepalive, is_bridge], [], Context),
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_disconnected(_) ->
|
|
|
+ SQL =
|
|
|
+ <<"SELECT clientid, username, reason, disconnected_at, node FROM \"$events/client_disconnected\"">>,
|
|
|
+
|
|
|
+ Context =
|
|
|
+ #{
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => client_disconnected,
|
|
|
+ reason => <<"normal">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+ Expected = check_result([clientid, username, reason], [disconnected_at, node], Context),
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_connack(_) ->
|
|
|
+ SQL =
|
|
|
+ <<"SELECT clientid, username, reason_code, node FROM \"$events/client_connack\"">>,
|
|
|
+
|
|
|
+ Context =
|
|
|
+ #{
|
|
|
+ clean_start => true,
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => client_connack,
|
|
|
+ reason_code => <<"sucess">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+ Expected = check_result([clientid, username, reason_code], [node], Context),
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_check_authz_complete(_) ->
|
|
|
+ SQL =
|
|
|
+ <<
|
|
|
+ "SELECT clientid, username, topic, action, result,\n"
|
|
|
+ "authz_source, node FROM \"$events/client_check_authz_complete\""
|
|
|
+ >>,
|
|
|
+
|
|
|
+ Context =
|
|
|
+ #{
|
|
|
+ action => <<"publish">>,
|
|
|
+ clientid => <<"c_emqx">>,
|
|
|
+ event_type => client_check_authz_complete,
|
|
|
+ result => <<"allow">>,
|
|
|
+ topic => <<"t/1">>,
|
|
|
+ username => <<"u_emqx">>
|
|
|
+ },
|
|
|
+ Expected = check_result(
|
|
|
+ [clientid, username, topic, action],
|
|
|
+ [authz_source, node, result],
|
|
|
+ Context
|
|
|
+ ),
|
|
|
+
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+t_ctx_delivery_dropped(_) ->
|
|
|
+ SQL =
|
|
|
+ <<"SELECT from_clientid, from_username, reason, topic, qos FROM \"$events/delivery_dropped\"">>,
|
|
|
+
|
|
|
+ Context =
|
|
|
+ #{
|
|
|
+ clientid => <<"c_emqx_2">>,
|
|
|
+ event_type => delivery_dropped,
|
|
|
+ from_clientid => <<"c_emqx_1">>,
|
|
|
+ from_username => <<"u_emqx_1">>,
|
|
|
+ payload => <<"{\"msg\": \"hello\"}">>,
|
|
|
+ qos => 1,
|
|
|
+ reason => <<"queue_full">>,
|
|
|
+ topic => <<"t/a">>,
|
|
|
+ username => <<"u_emqx_2">>
|
|
|
+ },
|
|
|
+ Expected = check_result([from_clientid, from_username, reason, qos, topic], [], Context),
|
|
|
+ do_test(SQL, Context, Expected).
|
|
|
+
|
|
|
+do_test(SQL, Context, Expected0) ->
|
|
|
+ Res = emqx_rule_engine_api:'/rule_test'(
|
|
|
+ post,
|
|
|
+ test_rule_params(SQL, Context)
|
|
|
+ ),
|
|
|
+ ?assertMatch({200, _}, Res),
|
|
|
+ {200, Result0} = Res,
|
|
|
+ Result = emqx_utils_maps:unsafe_atom_key_map(Result0),
|
|
|
+ case is_function(Expected0) of
|
|
|
+ false ->
|
|
|
+ Expected = maps:without([event_type], Expected0),
|
|
|
+ ?assertMatch(Expected, Result, Expected);
|
|
|
+ _ ->
|
|
|
+ Expected0(Result)
|
|
|
+ end,
|
|
|
+ ok.
|
|
|
+
|
|
|
+test_rule_params(Sql, Context) ->
|
|
|
+ #{
|
|
|
+ body => #{
|
|
|
+ <<"context">> => Context,
|
|
|
+ <<"sql">> => Sql
|
|
|
+ }
|
|
|
+ }.
|
|
|
+
|
|
|
+with_node_timestampe(Keys, Context) ->
|
|
|
+ check_result(Keys, [node, timestamp], Context).
|
|
|
+
|
|
|
+check_result(Keys, Exists, Context) ->
|
|
|
+ Log = fun(Format, Args) ->
|
|
|
+ lists:flatten(io_lib:format(Format, Args))
|
|
|
+ end,
|
|
|
+
|
|
|
+ Base = maps:with(Keys, Context),
|
|
|
+
|
|
|
+ fun(Result) ->
|
|
|
+ maps:foreach(
|
|
|
+ fun(Key, Value) ->
|
|
|
+ ?assertEqual(
|
|
|
+ Value,
|
|
|
+ maps:get(Key, Result, undefined),
|
|
|
+ Log("Key:~p value error~nResult:~p~n", [Key, Result])
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ Base
|
|
|
+ ),
|
|
|
+
|
|
|
+ NotExists = fun(Key) -> Log("Key:~p not exists in result:~p~n", [Key, Result]) end,
|
|
|
+ lists:foreach(
|
|
|
+ fun(Key) ->
|
|
|
+ Find = maps:find(Key, Result),
|
|
|
+ Formatter = NotExists(Key),
|
|
|
+ ?assertMatch({ok, _}, Find, Formatter),
|
|
|
+ ?assertNotMatch({ok, undefined}, Find, Formatter),
|
|
|
+ ?assertNotMatch({ok, <<"undefined">>}, Find, Formatter)
|
|
|
+ end,
|
|
|
+ Exists
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertEqual(erlang:length(Keys) + erlang:length(Exists), maps:size(Result), Result)
|
|
|
+ end.
|