emqx_audit_api_SUITE.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_audit_api_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. all() ->
  21. [
  22. {group, audit, [sequence]}
  23. ].
  24. groups() ->
  25. [
  26. {audit, [sequence], common_tests()}
  27. ].
  28. common_tests() ->
  29. emqx_common_test_helpers:all(?MODULE).
  30. -define(CONF_DEFAULT, #{
  31. node =>
  32. #{
  33. name => "emqx1@127.0.0.1",
  34. cookie => "emqxsecretcookie",
  35. data_dir => "data"
  36. },
  37. log => #{
  38. audit =>
  39. #{
  40. enable => true,
  41. ignore_high_frequency_request => true,
  42. level => info,
  43. max_filter_size => 15,
  44. rotation_count => 2,
  45. rotation_size => "10MB",
  46. time_offset => "system"
  47. }
  48. }
  49. }).
  50. init_per_suite(Config) ->
  51. _ = application:load(emqx_conf),
  52. emqx_config:erase_all(),
  53. emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]),
  54. ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT),
  55. emqx_config:save_schema_mod_and_names(emqx_enterprise_schema),
  56. ok = emqx_config_logger:refresh_config(),
  57. application:set_env(emqx, boot_modules, []),
  58. emqx_conf_cli:load(),
  59. Config.
  60. end_per_suite(_) ->
  61. emqx_mgmt_api_test_util:end_suite([emqx_audit, emqx_conf, emqx_ctl]).
  62. t_http_api(_) ->
  63. process_flag(trap_exit, true),
  64. AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
  65. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  66. {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
  67. NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1),
  68. {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones),
  69. ?assertMatch(#{<<"max_qos_allowed">> := 1}, Res),
  70. {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
  71. ?assertMatch(
  72. #{
  73. <<"data">> := [
  74. #{
  75. <<"from">> := <<"rest_api">>,
  76. <<"operation_id">> := <<"/configs/global_zone">>,
  77. <<"source_ip">> := <<"127.0.0.1">>,
  78. <<"source">> := _,
  79. <<"http_request">> := #{
  80. <<"method">> := <<"put">>,
  81. <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}},
  82. <<"bindings">> := _,
  83. <<"headers">> := #{<<"authorization">> := <<"******">>}
  84. },
  85. <<"http_status_code">> := 200,
  86. <<"operation_result">> := <<"success">>,
  87. <<"operation_type">> := <<"configs">>
  88. }
  89. ]
  90. },
  91. emqx_utils_json:decode(Res1, [return_maps])
  92. ),
  93. ok.
  94. t_disabled(_) ->
  95. Enable = [log, audit, enable],
  96. ?assertEqual(true, emqx:get_config(Enable)),
  97. AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
  98. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  99. {ok, _} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
  100. Size1 = mnesia:table_info(emqx_audit, size),
  101. {ok, Logs} = emqx_mgmt_api_configs_SUITE:get_config("log"),
  102. Logs1 = emqx_utils_maps:deep_put([<<"audit">>, <<"max_filter_size">>], Logs, 199),
  103. NewLogs = emqx_utils_maps:deep_put([<<"audit">>, <<"enable">>], Logs1, false),
  104. {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", NewLogs),
  105. {ok, GetLog1} = emqx_mgmt_api_configs_SUITE:get_config("log"),
  106. ?assertEqual(NewLogs, GetLog1),
  107. ?assertMatch(
  108. {error, _},
  109. emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader)
  110. ),
  111. Size2 = mnesia:table_info(emqx_audit, size),
  112. %% Record the audit disable action, so the size + 1
  113. ?assertEqual(Size1 + 1, Size2),
  114. {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
  115. NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_topic_levels">>], Zones, 111),
  116. {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones),
  117. ?assertMatch(#{<<"max_topic_levels">> := 111}, Res),
  118. Size3 = mnesia:table_info(emqx_audit, size),
  119. %% Don't record mqtt update request.
  120. ?assertEqual(Size2, Size3),
  121. %% enabled again
  122. {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", Logs1),
  123. {ok, GetLog2} = emqx_mgmt_api_configs_SUITE:get_config("log"),
  124. ?assertEqual(Logs1, GetLog2),
  125. Size4 = mnesia:table_info(emqx_audit, size),
  126. ?assertEqual(Size3 + 1, Size4),
  127. ok.
  128. t_cli(_Config) ->
  129. Size = mnesia:table_info(emqx_audit, size),
  130. TimeInt = erlang:system_time(microsecond) - 1000,
  131. Time = integer_to_list(TimeInt),
  132. DateStr = calendar:system_time_to_rfc3339(TimeInt, [{unit, microsecond}]),
  133. Date = emqx_http_lib:uri_encode(DateStr),
  134. ok = emqx_ctl:run_command(["conf", "show", "log"]),
  135. AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
  136. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  137. {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
  138. #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]),
  139. ?assertMatch(
  140. [
  141. #{
  142. <<"from">> := <<"cli">>,
  143. <<"operation_id">> := <<"">>,
  144. <<"source_ip">> := <<"">>,
  145. <<"operation_type">> := <<"conf">>,
  146. <<"args">> := [<<"show">>, <<"log">>],
  147. <<"node">> := _,
  148. <<"source">> := <<"">>,
  149. <<"http_request">> := <<"">>
  150. }
  151. ],
  152. Data
  153. ),
  154. %% check create at is valid
  155. [#{<<"created_at">> := CreateAtRaw}] = Data,
  156. CreateAt = calendar:rfc3339_to_system_time(binary_to_list(CreateAtRaw), [{unit, microsecond}]),
  157. ?assert(CreateAt > TimeInt, CreateAtRaw),
  158. ?assert(CreateAt < TimeInt + 5000000, CreateAtRaw),
  159. %% check cli filter
  160. {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader),
  161. #{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]),
  162. ?assertEqual(Data, Data1),
  163. {ok, Res2} = emqx_mgmt_api_test_util:request_api(
  164. get, AuditPath, "from=erlang_console", AuthHeader
  165. ),
  166. ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])),
  167. %% check created_at filter microsecond
  168. {ok, Res3} = emqx_mgmt_api_test_util:request_api(
  169. get, AuditPath, "gte_created_at=" ++ Time, AuthHeader
  170. ),
  171. #{<<"data">> := Data3} = emqx_utils_json:decode(Res3, [return_maps]),
  172. ?assertEqual(1, erlang:length(Data3)),
  173. %% check created_at filter rfc3339
  174. {ok, Res31} = emqx_mgmt_api_test_util:request_api(
  175. get, AuditPath, "gte_created_at=" ++ Date, AuthHeader
  176. ),
  177. ?assertEqual(Res3, Res31),
  178. %% check created_at filter millisecond
  179. TimeMs = integer_to_list(TimeInt div 1000),
  180. {ok, Res32} = emqx_mgmt_api_test_util:request_api(
  181. get, AuditPath, "gte_created_at=" ++ TimeMs, AuthHeader
  182. ),
  183. ?assertEqual(Res3, Res32),
  184. %% check created_at filter microsecond
  185. {ok, Res4} = emqx_mgmt_api_test_util:request_api(
  186. get, AuditPath, "lte_created_at=" ++ Time, AuthHeader
  187. ),
  188. #{<<"data">> := Data4} = emqx_utils_json:decode(Res4, [return_maps]),
  189. ?assertEqual(Size, erlang:length(Data4)),
  190. %% check created_at filter rfc3339
  191. {ok, Res41} = emqx_mgmt_api_test_util:request_api(
  192. get, AuditPath, "lte_created_at=" ++ Date, AuthHeader
  193. ),
  194. ?assertEqual(Res4, Res41),
  195. %% check created_at filter millisecond
  196. {ok, Res42} = emqx_mgmt_api_test_util:request_api(
  197. get, AuditPath, "lte_created_at=" ++ TimeMs, AuthHeader
  198. ),
  199. ?assertEqual(Res4, Res42),
  200. %% check duration_ms filter
  201. {ok, Res5} = emqx_mgmt_api_test_util:request_api(
  202. get, AuditPath, "gte_duration_ms=0", AuthHeader
  203. ),
  204. #{<<"data">> := Data5} = emqx_utils_json:decode(Res5, [return_maps]),
  205. ?assertEqual(Size + 1, erlang:length(Data5)),
  206. {ok, Res6} = emqx_mgmt_api_test_util:request_api(
  207. get, AuditPath, "lte_duration_ms=-1", AuthHeader
  208. ),
  209. ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res6, [return_maps])),
  210. ok.
  211. t_max_size(_Config) ->
  212. {ok, _} = emqx:update_config([log, audit, max_filter_size], 999),
  213. %% Make sure this process is using latest max_filter_size.
  214. ?assertEqual(ignore, gen_server:call(emqx_audit, whatever)),
  215. SizeFun =
  216. fun() ->
  217. AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
  218. AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
  219. Limit = "limit=1000",
  220. {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Limit, AuthHeader),
  221. #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]),
  222. erlang:length(Data)
  223. end,
  224. InitSize = SizeFun(),
  225. lists:foreach(
  226. fun(_) ->
  227. ok = emqx_ctl:run_command(["conf", "show", "log"])
  228. end,
  229. lists:duplicate(100, 1)
  230. ),
  231. _ = mnesia:dump_log(),
  232. LogCount = wait_for_dirty_write_log_done(1500),
  233. Size1 = SizeFun(),
  234. ?assert(Size1 - InitSize >= 100, #{
  235. api => Size1,
  236. init => InitSize,
  237. log_size => LogCount,
  238. config => emqx:get_config([log, audit, max_filter_size])
  239. }),
  240. {ok, _} = emqx:update_config([log, audit, max_filter_size], 10),
  241. %% wait for clean_expired
  242. timer:sleep(250),
  243. ExpectSize = emqx:get_config([log, audit, max_filter_size]),
  244. Size2 = SizeFun(),
  245. ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}),
  246. ok.
  247. t_kickout_clients_without_log(_) ->
  248. process_flag(trap_exit, true),
  249. AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
  250. {ok, AuditLogs1} = emqx_mgmt_api_test_util:request_api(get, AuditPath),
  251. kickout_clients(),
  252. {ok, AuditLogs2} = emqx_mgmt_api_test_util:request_api(get, AuditPath),
  253. ?assertEqual(AuditLogs1, AuditLogs2),
  254. ok.
  255. kickout_clients() ->
  256. ClientId1 = <<"client1">>,
  257. ClientId2 = <<"client2">>,
  258. ClientId3 = <<"client3">>,
  259. {ok, C1} = emqtt:start_link(#{
  260. clientid => ClientId1,
  261. proto_ver => v5,
  262. properties => #{'Session-Expiry-Interval' => 120}
  263. }),
  264. {ok, _} = emqtt:connect(C1),
  265. {ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
  266. {ok, _} = emqtt:connect(C2),
  267. {ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
  268. {ok, _} = emqtt:connect(C3),
  269. timer:sleep(300),
  270. %% get /clients
  271. ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
  272. {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
  273. ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
  274. ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
  275. ClientsPage = maps:get(<<"page">>, ClientsMeta),
  276. ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
  277. ClientsCount = maps:get(<<"count">>, ClientsMeta),
  278. ?assertEqual(ClientsPage, 1),
  279. ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
  280. ?assertEqual(ClientsCount, 3),
  281. %% kickout clients
  282. KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
  283. KickoutBody = [ClientId1, ClientId2, ClientId3],
  284. {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),
  285. {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
  286. ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
  287. ?assertMatch(#{<<"data">> := []}, ClientsResponse2).
  288. wait_for_dirty_write_log_done(MaxMs) ->
  289. Size = mnesia:table_info(emqx_audit, size),
  290. wait_for_dirty_write_log_done(Size, MaxMs).
  291. wait_for_dirty_write_log_done(Size, RemainMs) when RemainMs =< 0 -> Size;
  292. wait_for_dirty_write_log_done(Prev, RemainMs) ->
  293. SleepMs = 100,
  294. ct:sleep(SleepMs),
  295. case mnesia:table_info(emqx_audit, size) of
  296. Prev ->
  297. ct:sleep(SleepMs * 2),
  298. Prev;
  299. New ->
  300. wait_for_dirty_write_log_done(New, RemainMs - SleepMs)
  301. end.