| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_audit_api_SUITE).
- -compile(export_all).
- -compile(nowarn_export_all).
- -include_lib("eunit/include/eunit.hrl").
%% Common Test entry point: the suite consists of the single `audit' group,
%% referenced here with the `sequence' property.
all() ->
    AuditGroup = {group, audit, [sequence]},
    [AuditGroup].
%% Defines the `audit' group: every case returned by common_tests/0, declared
%% with the `sequence' property.
groups() ->
    Cases = common_tests(),
    [{audit, [sequence], Cases}].
%% Returns the test cases of this suite as reported by
%% emqx_common_test_helpers:all/1 for the current module.
%% NOTE(review): presumably this collects the exported t_* functions -- confirm
%% against the helper.
common_tests() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
%% Baseline configuration loaded by init_per_suite/1: a node section plus an
%% enabled audit log section.
-define(CONF_DEFAULT, #{
    log => #{
        audit => #{
            %% Audit logging is switched on for the whole suite; t_disabled/1
            %% asserts this initial value.
            enable => true,
            level => info,
            ignore_high_frequency_request => true,
            max_filter_size => 15,
            rotation_size => "10MB",
            rotation_count => 2,
            time_offset => "system"
        }
    },
    node => #{
        name => "emqx1@127.0.0.1",
        cookie => "emqxsecretcookie",
        data_dir => "data"
    }
}).
%% Suite setup: wipes existing configuration, starts the applications under
%% test, loads ?CONF_DEFAULT against the enterprise schema, refreshes the
%% logger configuration and loads the `conf' CLI commands.
%% Returns the Common Test Config unchanged.
init_per_suite(Config) ->
    SchemaMod = emqx_enterprise_schema,
    Apps = [emqx_ctl, emqx_conf, emqx_audit],
    %% The result of load/1 is deliberately ignored (the application may
    %% already be loaded).
    _ = application:load(emqx_conf),
    emqx_config:erase_all(),
    emqx_mgmt_api_test_util:init_suite(Apps),
    ok = emqx_common_test_helpers:load_config(SchemaMod, ?CONF_DEFAULT),
    emqx_config:save_schema_mod_and_names(SchemaMod),
    ok = emqx_config_logger:refresh_config(),
    application:set_env(emqx, boot_modules, []),
    %% Used by the test cases that call emqx_ctl:run_command(["conf", ...]).
    emqx_conf_cli:load(),
    Config.
%% Suite teardown: hands the applications listed in init_per_suite/1 to
%% emqx_mgmt_api_test_util:end_suite/1, in reverse order.
end_per_suite(_Config) ->
    Apps = [emqx_audit, emqx_conf, emqx_ctl],
    emqx_mgmt_api_test_util:end_suite(Apps).
%% Updates the global zone through the REST API, then checks that the single
%% entry returned by the audit API (limit=1) describes that PUT request,
%% including the masked authorization header.
t_http_api(_) ->
    process_flag(trap_exit, true),
    AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    %% Change one MQTT setting so that there is a request to audit.
    {ok, Zones0} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
    QosKey = [<<"mqtt">>, <<"max_qos_allowed">>],
    Zones1 = emqx_utils_maps:deep_put(QosKey, Zones0, 1),
    {ok, #{<<"mqtt">> := Mqtt}} = emqx_mgmt_api_configs_SUITE:update_global_zone(Zones1),
    ?assertMatch(#{<<"max_qos_allowed">> := 1}, Mqtt),
    %% Fetch one audit entry and compare it with the request made above.
    {ok, Body} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
    Decoded = emqx_utils_json:decode(Body, [return_maps]),
    ?assertMatch(
        #{
            <<"data">> := [
                #{
                    <<"from">> := <<"rest_api">>,
                    <<"operation_id">> := <<"/configs/global_zone">>,
                    <<"source_ip">> := <<"127.0.0.1">>,
                    <<"source">> := _,
                    <<"http_request">> := #{
                        <<"method">> := <<"put">>,
                        <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}},
                        <<"bindings">> := _,
                        <<"headers">> := #{<<"authorization">> := <<"******">>}
                    },
                    <<"http_status_code">> := 200,
                    <<"operation_result">> := <<"success">>,
                    <<"operation_type">> := <<"configs">>
                }
            ]
        },
        Decoded
    ),
    ok.
%% Disables audit logging through the config API and checks, via the size of
%% the emqx_audit table, which requests are still recorded:
%%   * the request that disables auditing adds one entry,
%%   * a config update made while disabled adds none,
%%   * the request that re-enables auditing adds one entry.
%% While auditing is disabled the audit API request is expected to fail.
t_disabled(_) ->
    EnableKey = [log, audit, enable],
    ?assertEqual(true, emqx:get_config(EnableKey)),
    AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    TableSize = fun() -> mnesia:table_info(emqx_audit, size) end,
    {ok, _} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
    SizeBefore = TableSize(),
    %% Build two variants of the log config: audit enabled and audit disabled.
    {ok, LogConf} = emqx_mgmt_api_configs_SUITE:get_config("log"),
    EnabledConf = emqx_utils_maps:deep_put(
        [<<"audit">>, <<"max_filter_size">>], LogConf, 199
    ),
    DisabledConf = emqx_utils_maps:deep_put(
        [<<"audit">>, <<"enable">>], EnabledConf, false
    ),
    {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", DisabledConf),
    {ok, ConfAfterDisable} = emqx_mgmt_api_configs_SUITE:get_config("log"),
    ?assertEqual(DisabledConf, ConfAfterDisable),
    ?assertMatch(
        {error, _},
        emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader)
    ),
    SizeDisabled = TableSize(),
    %% The request that disabled auditing is itself recorded, hence + 1.
    ?assertEqual(SizeBefore + 1, SizeDisabled),
    {ok, Zones0} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
    Zones1 = emqx_utils_maps:deep_put(
        [<<"mqtt">>, <<"max_topic_levels">>], Zones0, 111
    ),
    {ok, #{<<"mqtt">> := Mqtt}} = emqx_mgmt_api_configs_SUITE:update_global_zone(Zones1),
    ?assertMatch(#{<<"max_topic_levels">> := 111}, Mqtt),
    SizeAfterMqtt = TableSize(),
    %% The mqtt update made while auditing is disabled must not be recorded.
    ?assertEqual(SizeDisabled, SizeAfterMqtt),
    %% Enable auditing again; that request is recorded.
    {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", EnabledConf),
    {ok, ConfAfterEnable} = emqx_mgmt_api_configs_SUITE:get_config("log"),
    ?assertEqual(EnabledConf, ConfAfterEnable),
    SizeReenabled = TableSize(),
    ?assertEqual(SizeAfterMqtt + 1, SizeReenabled),
    ok.
%% Runs one CLI command and checks that it shows up in the audit API, then
%% exercises the `from', `gte_created_at' / `lte_created_at' (microsecond,
%% RFC 3339 and millisecond forms) and `gte_duration_ms' / `lte_duration_ms'
%% query filters.
t_cli(_Config) ->
    SizeBefore = mnesia:table_info(emqx_audit, size),
    %% Reference instant taken just before the CLI command is run.
    SinceUs = erlang:system_time(microsecond) - 1000,
    SinceUsStr = integer_to_list(SinceUs),
    SinceMsStr = integer_to_list(SinceUs div 1000),
    SinceRfc3339 = emqx_http_lib:uri_encode(
        calendar:system_time_to_rfc3339(SinceUs, [{unit, microsecond}])
    ),
    ok = emqx_ctl:run_command(["conf", "show", "log"]),
    AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    %% Fetch/1 returns the raw response body, Rows/1 its decoded `data' list.
    Fetch = fun(Query) ->
        {ok, Raw} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Query, AuthHeader),
        Raw
    end,
    Rows = fun(Raw) ->
        #{<<"data">> := Entries} = emqx_utils_json:decode(Raw, [return_maps]),
        Entries
    end,
    Latest = Rows(Fetch("limit=1")),
    ?assertMatch(
        [
            #{
                <<"from">> := <<"cli">>,
                <<"operation_id">> := <<"">>,
                <<"source_ip">> := <<"">>,
                <<"operation_type">> := <<"conf">>,
                <<"args">> := [<<"show">>, <<"log">>],
                <<"node">> := _,
                <<"source">> := <<"">>,
                <<"http_request">> := <<"">>
            }
        ],
        Latest
    ),
    %% created_at must fall between the reference instant and 5 seconds later.
    [#{<<"created_at">> := CreatedAtBin}] = Latest,
    CreatedAt = calendar:rfc3339_to_system_time(
        binary_to_list(CreatedAtBin), [{unit, microsecond}]
    ),
    ?assert(CreatedAt > SinceUs, CreatedAtBin),
    ?assert(CreatedAt < SinceUs + 5000000, CreatedAtBin),
    %% `from' filter
    FromCli = Rows(Fetch("from=cli")),
    ?assertEqual(Latest, FromCli),
    FromConsole = Rows(Fetch("from=erlang_console")),
    ?assertEqual([], FromConsole),
    %% gte_created_at filter: microsecond, RFC 3339 and millisecond forms
    GteUs = Fetch("gte_created_at=" ++ SinceUsStr),
    ?assertEqual(1, erlang:length(Rows(GteUs))),
    GteRfc3339 = Fetch("gte_created_at=" ++ SinceRfc3339),
    ?assertEqual(GteUs, GteRfc3339),
    GteMs = Fetch("gte_created_at=" ++ SinceMsStr),
    ?assertEqual(GteUs, GteMs),
    %% lte_created_at filter: microsecond, RFC 3339 and millisecond forms
    LteUs = Fetch("lte_created_at=" ++ SinceUsStr),
    ?assertEqual(SizeBefore, erlang:length(Rows(LteUs))),
    LteRfc3339 = Fetch("lte_created_at=" ++ SinceRfc3339),
    ?assertEqual(LteUs, LteRfc3339),
    LteMs = Fetch("lte_created_at=" ++ SinceMsStr),
    ?assertEqual(LteUs, LteMs),
    %% duration_ms filter
    AnyDuration = Rows(Fetch("gte_duration_ms=0")),
    ?assertEqual(SizeBefore + 1, erlang:length(AnyDuration)),
    NegativeDuration = Rows(Fetch("lte_duration_ms=-1")),
    ?assertEqual([], NegativeDuration),
    ok.
%% Raises max_filter_size to 999, generates 100 CLI audit entries and checks
%% that the audit API reports at least 100 more entries than before; then
%% lowers max_filter_size to 10 and checks that the API returns exactly the
%% configured number of entries after a short wait.
t_max_size(_Config) ->
    MaxSizeKey = [log, audit, max_filter_size],
    {ok, _} = emqx:update_config(MaxSizeKey, 999),
    %% Make sure the emqx_audit process is using the latest max_filter_size:
    %% the synchronous call returns only after it has handled earlier messages.
    ?assertEqual(ignore, gen_server:call(emqx_audit, whatever)),
    %% Number of entries the audit API returns for limit=1000.
    CountViaApi = fun() ->
        Path = emqx_mgmt_api_test_util:api_path(["audit"]),
        Header = emqx_mgmt_api_test_util:auth_header_(),
        {ok, Body} = emqx_mgmt_api_test_util:request_api(get, Path, "limit=1000", Header),
        #{<<"data">> := Entries} = emqx_utils_json:decode(Body, [return_maps]),
        erlang:length(Entries)
    end,
    CountBefore = CountViaApi(),
    _ = [ok = emqx_ctl:run_command(["conf", "show", "log"]) || _ <- lists:seq(1, 100)],
    _ = mnesia:dump_log(),
    TableSize = wait_for_dirty_write_log_done(1500),
    CountAfter = CountViaApi(),
    ?assert(CountAfter - CountBefore >= 100, #{
        api => CountAfter,
        init => CountBefore,
        log_size => TableSize,
        config => emqx:get_config(MaxSizeKey)
    }),
    {ok, _} = emqx:update_config(MaxSizeKey, 10),
    %% wait for clean_expired
    timer:sleep(250),
    Expected = emqx:get_config(MaxSizeKey),
    CountTrimmed = CountViaApi(),
    ?assertEqual(Expected, CountTrimmed, {sys:get_state(emqx_audit)}),
    ok.
%% Checks that running kickout_clients/0 leaves the audit API response
%% unchanged: the response body is captured before and after and compared.
t_kickout_clients_without_log(_) ->
    %% kickout_clients/0 starts linked emqtt clients; trap their exits.
    process_flag(trap_exit, true),
    AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
    {ok, Before} = emqx_mgmt_api_test_util:request_api(get, AuditPath),
    kickout_clients(),
    {ok, After} = emqx_mgmt_api_test_util:request_api(get, AuditPath),
    ?assertEqual(Before, After),
    ok.
%% Connects three MQTT clients, verifies through the clients API that all of
%% them are listed, kicks them out with one bulk request and verifies that the
%% client list is empty afterwards.
%%
%% The emqtt clients are started with start_link, so the calling process is
%% linked to them; callers are expected to trap exits.
%% Returns the result of the final ?assertMatch (i.e. `ok').
kickout_clients() ->
    ClientId1 = <<"client1">>,
    ClientId2 = <<"client2">>,
    ClientId3 = <<"client3">>,

    %% The first client uses MQTT v5 with a session expiry interval; the other
    %% two use the emqtt defaults.
    {ok, C1} = emqtt:start_link(#{
        clientid => ClientId1,
        proto_ver => v5,
        properties => #{'Session-Expiry-Interval' => 120}
    }),
    {ok, _} = emqtt:connect(C1),
    {ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
    {ok, _} = emqtt:connect(C2),
    {ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
    {ok, _} = emqtt:connect(C3),

    %% Give the connections time to show up in the clients listing.
    timer:sleep(300),

    %% get /clients
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
    ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
    ClientsPage = maps:get(<<"page">>, ClientsMeta),
    ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
    ClientsCount = maps:get(<<"count">>, ClientsMeta),

    %% ?assertEqual takes (Expected, Actual); keeping that order makes a
    %% failure report label the two values correctly.
    ?assertEqual(1, ClientsPage),
    ?assertEqual(emqx_mgmt:default_row_limit(), ClientsLimit),
    ?assertEqual(3, ClientsCount),

    %% kickout clients
    KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
    KickoutBody = [ClientId1, ClientId2, ClientId3],
    {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),

    {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
    ?assertMatch(#{<<"data">> := []}, ClientsResponse2).
%% Reads the current size of the emqx_audit table and delegates to
%% wait_for_dirty_write_log_done/2 with MaxMs as the waiting budget.
%% Returns the table size observed by that function.
wait_for_dirty_write_log_done(MaxMs) ->
    InitialSize = mnesia:table_info(emqx_audit, size),
    wait_for_dirty_write_log_done(InitialSize, MaxMs).
%% Polls the size of the emqx_audit table until it stops changing or the
%% budget is used up.
%%
%% While budget remains: sleep 100 ms and re-read the size. If it equals the
%% previously seen size, sleep another 200 ms and return it; otherwise poll
%% again with the new size and the budget reduced by 100 ms.
%% With no budget left, the last seen size is returned immediately.
wait_for_dirty_write_log_done(LastSize, BudgetMs) when BudgetMs > 0 ->
    PollMs = 100,
    ct:sleep(PollMs),
    CurrentSize = mnesia:table_info(emqx_audit, size),
    if
        CurrentSize =:= LastSize ->
            ct:sleep(2 * PollMs),
            LastSize;
        true ->
            wait_for_dirty_write_log_done(CurrentSize, BudgetMs - PollMs)
    end;
wait_for_dirty_write_log_done(LastSize, _BudgetMs) ->
    LastSize.
|