Quellcode durchsuchen

feat: added support for auditing API and CLI activity in logs

zhongwencool vor 2 Jahren
Ursprung
Commit
21bb209fb1

+ 19 - 0
apps/emqx/include/logger.hrl

@@ -61,6 +61,25 @@
     )
     )
 end).
 end).
 
 
+-define(AUDIT(_Level_, _Msg_, _Meta_), begin
+    case emqx_config:get([log, audit], #{enable => false}) of
+        #{enable := false} ->
+            ok;
+        #{enable := true, level := _AllowLevel_} ->
+            case logger:compare_levels(_AllowLevel_, _Level_) of
+                _R_ when _R_ == lt; _R_ == eq ->
+                    emqx_trace:log(
+                        _Level_,
+                        [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
+                        {report, _Msg_},
+                        _Meta_
+                    );
+                gt ->
+                    ok
+            end
+    end
+end).
+
 %% print to 'user' group leader
 %% print to 'user' group leader
 -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
 -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
 -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)).
 -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)).

+ 50 - 11
apps/emqx/src/config/emqx_config_logger.erl

@@ -21,8 +21,10 @@
 -export([tr_handlers/1, tr_level/1]).
 -export([tr_handlers/1, tr_level/1]).
 -export([add_handler/0, remove_handler/0, refresh_config/0]).
 -export([add_handler/0, remove_handler/0, refresh_config/0]).
 -export([post_config_update/5]).
 -export([post_config_update/5]).
+-export([filter_audit/2]).
 
 
 -define(LOG, [log]).
 -define(LOG, [log]).
+-define(AUDIT_HANDLER, emqx_audit).
 
 
 add_handler() ->
 add_handler() ->
     ok = emqx_config_handler:add_handler(?LOG, ?MODULE),
     ok = emqx_config_handler:add_handler(?LOG, ?MODULE),
@@ -133,8 +135,8 @@ tr_console_handler(Conf) ->
                 {handler, console, logger_std_h, #{
                 {handler, console, logger_std_h, #{
                     level => conf_get("log.console.level", Conf),
                     level => conf_get("log.console.level", Conf),
                     config => (log_handler_conf(ConsoleConf))#{type => standard_io},
                     config => (log_handler_conf(ConsoleConf))#{type => standard_io},
-                    formatter => log_formatter(ConsoleConf),
-                    filters => log_filter(ConsoleConf)
+                    formatter => log_formatter(console, ConsoleConf),
+                    filters => log_filter(console, ConsoleConf)
                 }}
                 }}
             ];
             ];
         false ->
         false ->
@@ -143,7 +145,9 @@ tr_console_handler(Conf) ->
 
 
 %% For the file logger
 %% For the file logger
 tr_file_handlers(Conf) ->
 tr_file_handlers(Conf) ->
-    Handlers = logger_file_handlers(Conf),
+    Files = logger_file_handlers(Conf),
+    Audits = logger_audit_handler(Conf),
+    Handlers = Audits ++ Files,
     lists:map(fun tr_file_handler/1, Handlers).
     lists:map(fun tr_file_handler/1, Handlers).
 
 
 tr_file_handler({HandlerName, SubConf}) ->
 tr_file_handler({HandlerName, SubConf}) ->
@@ -155,17 +159,25 @@ tr_file_handler({HandlerName, SubConf}) ->
             max_no_files => conf_get("rotation_count", SubConf),
             max_no_files => conf_get("rotation_count", SubConf),
             max_no_bytes => conf_get("rotation_size", SubConf)
             max_no_bytes => conf_get("rotation_size", SubConf)
         },
         },
-        formatter => log_formatter(SubConf),
-        filters => log_filter(SubConf),
+        formatter => log_formatter(HandlerName, SubConf),
+        filters => log_filter(HandlerName, SubConf),
         filesync_repeat_interval => no_repeat
         filesync_repeat_interval => no_repeat
     }}.
     }}.
 
 
+logger_audit_handler(Conf) ->
+    Handlers = [{?AUDIT_HANDLER, conf_get("log.audit", Conf, #{})}],
+    logger_handlers(Handlers).
+
 logger_file_handlers(Conf) ->
 logger_file_handlers(Conf) ->
+    Handlers = maps:to_list(conf_get("log.file", Conf, #{})),
+    logger_handlers(Handlers).
+
+logger_handlers(Handlers) ->
     lists:filter(
     lists:filter(
         fun({_Name, Handler}) ->
         fun({_Name, Handler}) ->
             conf_get("enable", Handler, false)
             conf_get("enable", Handler, false)
         end,
         end,
-        maps:to_list(conf_get("log.file", Conf, #{}))
+        Handlers
     ).
     ).
 
 
 conf_get(Key, Conf) -> emqx_schema:conf_get(Key, Conf).
 conf_get(Key, Conf) -> emqx_schema:conf_get(Key, Conf).
@@ -190,7 +202,7 @@ log_handler_conf(Conf) ->
         burst_limit_window_time => conf_get("window_time", BurstLimit)
         burst_limit_window_time => conf_get("window_time", BurstLimit)
     }.
     }.
 
 
-log_formatter(Conf) ->
+log_formatter(HandlerName, Conf) ->
     CharsLimit =
     CharsLimit =
         case conf_get("chars_limit", Conf) of
         case conf_get("chars_limit", Conf) of
             unlimited -> unlimited;
             unlimited -> unlimited;
@@ -204,17 +216,38 @@ log_formatter(Conf) ->
         end,
         end,
     SingleLine = conf_get("single_line", Conf),
     SingleLine = conf_get("single_line", Conf),
     Depth = conf_get("max_depth", Conf),
     Depth = conf_get("max_depth", Conf),
-    do_formatter(conf_get("formatter", Conf), CharsLimit, SingleLine, TimeOffSet, Depth).
+    do_formatter(
+        HandlerName, conf_get("formatter", Conf), CharsLimit, SingleLine, TimeOffSet, Depth
+    ).
 
 
 %% helpers
 %% helpers
-do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) ->
+do_formatter(?AUDIT_HANDLER, _, CharsLimit, SingleLine, TimeOffSet, Depth) ->
+    {emqx_logger_jsonfmt, #{
+        template => [
+            time,
+            " [",
+            level,
+            "] ",
+            %% http api
+            {method, [code, " ", method, " ", operate_id, " ", username, " "], []},
+            %% cli
+            {cmd, [cmd, " "], []},
+            msg,
+            "\n"
+        ],
+        chars_limit => CharsLimit,
+        single_line => SingleLine,
+        time_offset => TimeOffSet,
+        depth => Depth
+    }};
+do_formatter(_, json, CharsLimit, SingleLine, TimeOffSet, Depth) ->
     {emqx_logger_jsonfmt, #{
     {emqx_logger_jsonfmt, #{
         chars_limit => CharsLimit,
         chars_limit => CharsLimit,
         single_line => SingleLine,
         single_line => SingleLine,
         time_offset => TimeOffSet,
         time_offset => TimeOffSet,
         depth => Depth
         depth => Depth
     }};
     }};
-do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
+do_formatter(_, text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
     {emqx_logger_textfmt, #{
     {emqx_logger_textfmt, #{
         template => [time, " [", level, "] ", msg, "\n"],
         template => [time, " [", level, "] ", msg, "\n"],
         chars_limit => CharsLimit,
         chars_limit => CharsLimit,
@@ -223,12 +256,18 @@ do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
         depth => Depth
         depth => Depth
     }}.
     }}.
 
 
-log_filter(Conf) ->
+%% Don't record all logger message
+%% only use it for ?AUDIT/1
+log_filter(?AUDIT_HANDLER, _Conf) ->
+    [{filter_audit, {fun ?MODULE:filter_audit/2, stop}}];
+log_filter(_, Conf) ->
     case conf_get("supervisor_reports", Conf) of
     case conf_get("supervisor_reports", Conf) of
         error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
         error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
         progress -> []
         progress -> []
     end.
     end.
 
 
+filter_audit(_, _) -> stop.
+
 tr_level(Conf) ->
 tr_level(Conf) ->
     ConsoleLevel = conf_get("log.console.level", Conf, undefined),
     ConsoleLevel = conf_get("log.console.level", Conf, undefined),
     FileLevels = [conf_get("level", SubConf) || {_, SubConf} <- logger_file_handlers(Conf)],
     FileLevels = [conf_get("level", SubConf) || {_, SubConf} <- logger_file_handlers(Conf)],

+ 2 - 1
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -74,7 +74,8 @@ format(#{level := Level, msg := Msg, meta := Meta} = Event, Config0) when is_map
     MsgBin = format(Msg, Meta#{level => Level}, Config),
     MsgBin = format(Msg, Meta#{level => Level}, Config),
     logger_formatter:format(Event#{msg => {string, MsgBin}}, Config).
     logger_formatter:format(Event#{msg => {string, MsgBin}}, Config).
 
 
-format(Msg, Meta, Config) ->
+format(Msg, Meta0, Config) ->
+    Meta = maps:without([time, level], Meta0),
     Data0 =
     Data0 =
         try maybe_format_msg(Msg, Meta, Config) of
         try maybe_format_msg(Msg, Meta, Config) of
             Map when is_map(Map) ->
             Map when is_map(Map) ->

+ 6 - 2
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -27,7 +27,8 @@
     publish/1,
     publish/1,
     subscribe/3,
     subscribe/3,
     unsubscribe/2,
     unsubscribe/2,
-    log/3
+    log/3,
+    log/4
 ]).
 ]).
 
 
 -export([
 -export([
@@ -83,7 +84,10 @@ unsubscribe(Topic, SubOpts) ->
     ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
     ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
 
 
 log(List, Msg, Meta) ->
 log(List, Msg, Meta) ->
-    Log = #{level => debug, meta => enrich_meta(Meta), msg => Msg},
+    log(debug, List, Msg, Meta).
+
+log(Level, List, Msg, Meta) ->
+    Log = #{level => Level, meta => enrich_meta(Meta), msg => Msg},
     log_filter(List, Log).
     log_filter(List, Log).
 
 
 enrich_meta(Meta) ->
 enrich_meta(Meta) ->

+ 10 - 2
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -23,6 +23,7 @@
     load/0,
     load/0,
     admins/1,
     admins/1,
     conf/1,
     conf/1,
+    audit/3,
     unload/0
     unload/0
 ]).
 ]).
 
 
@@ -33,15 +34,19 @@
 %% kept cluster_call for compatibility
 %% kept cluster_call for compatibility
 -define(CLUSTER_CALL, cluster_call).
 -define(CLUSTER_CALL, cluster_call).
 -define(CONF, conf).
 -define(CONF, conf).
+-define(AUDIT_MOD, audit).
 -define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited").
 -define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited").
 
 
 load() ->
 load() ->
     emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
     emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
-    emqx_ctl:register_command(?CONF, {?MODULE, conf}, []).
+    emqx_ctl:register_command(?CONF, {?MODULE, conf}, []),
+    emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]),
+    ok.
 
 
 unload() ->
 unload() ->
     emqx_ctl:unregister_command(?CLUSTER_CALL),
     emqx_ctl:unregister_command(?CLUSTER_CALL),
-    emqx_ctl:unregister_command(?CONF).
+    emqx_ctl:unregister_command(?CONF),
+    emqx_ctl:unregister_command(?AUDIT_MOD).
 
 
 conf(["show_keys" | _]) ->
 conf(["show_keys" | _]) ->
     print_keys(keys());
     print_keys(keys());
@@ -102,6 +107,9 @@ admins(["fast_forward", Node0, ToTnxId]) ->
 admins(_) ->
 admins(_) ->
     emqx_ctl:usage(usage_sync()).
     emqx_ctl:usage(usage_sync()).
 
 
+audit(Level, From, Log) ->
+    ?AUDIT(Level, From, Log#{time => logger:timestamp()}).
+
 usage_conf() ->
 usage_conf() ->
     [
     [
         {"conf reload --replace|--merge", "reload etc/emqx.conf on local node"},
         {"conf reload --replace|--merge", "reload etc/emqx.conf on local node"},

+ 61 - 6
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -952,10 +952,19 @@ fields("log") ->
                     aliases => [file_handlers],
                     aliases => [file_handlers],
                     importance => ?IMPORTANCE_HIGH
                     importance => ?IMPORTANCE_HIGH
                 }
                 }
+            )},
+        {"audit",
+            sc(
+                ?R_REF("log_audit_handler"),
+                #{
+                    desc => ?DESC("log_audit_handler"),
+                    importance => ?IMPORTANCE_HIGH,
+                    default => #{<<"enable">> => true, <<"level">> => <<"info">>}
+                }
             )}
             )}
     ];
     ];
 fields("console_handler") ->
 fields("console_handler") ->
-    log_handler_common_confs(console);
+    log_handler_common_confs(console, #{});
 fields("log_file_handler") ->
 fields("log_file_handler") ->
     [
     [
         {"path",
         {"path",
@@ -992,7 +1001,50 @@ fields("log_file_handler") ->
                     importance => ?IMPORTANCE_MEDIUM
                     importance => ?IMPORTANCE_MEDIUM
                 }
                 }
             )}
             )}
-    ] ++ log_handler_common_confs(file);
+    ] ++ log_handler_common_confs(file, #{});
+fields("log_audit_handler") ->
+    [
+        {"path",
+            sc(
+                file(),
+                #{
+                    desc => ?DESC("audit_file_handler_path"),
+                    default => <<"${EMQX_LOG_DIR}/audit.log">>,
+                    importance => ?IMPORTANCE_HIGH,
+                    converter => fun(Path, Opts) ->
+                        emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
+                    end
+                }
+            )},
+        {"rotation_count",
+            sc(
+                range(1, 128),
+                #{
+                    default => 10,
+                    converter => fun convert_rotation/2,
+                    desc => ?DESC("log_rotation_count"),
+                    importance => ?IMPORTANCE_MEDIUM
+                }
+            )},
+        {"rotation_size",
+            sc(
+                hoconsc:union([infinity, emqx_schema:bytesize()]),
+                #{
+                    default => <<"50MB">>,
+                    desc => ?DESC("log_file_handler_max_size"),
+                    importance => ?IMPORTANCE_MEDIUM
+                }
+            )}
+    ] ++
+        %% Only support json
+        lists:keydelete(
+            "formatter",
+            1,
+            log_handler_common_confs(
+                file,
+                #{level => info, level_desc => "audit_handler_level"}
+            )
+        );
 fields("log_overload_kill") ->
 fields("log_overload_kill") ->
     [
     [
         {"enable",
         {"enable",
@@ -1083,6 +1135,8 @@ desc("console_handler") ->
     ?DESC("desc_console_handler");
     ?DESC("desc_console_handler");
 desc("log_file_handler") ->
 desc("log_file_handler") ->
     ?DESC("desc_log_file_handler");
     ?DESC("desc_log_file_handler");
+desc("log_audit_handler") ->
+    ?DESC("desc_audit_log_handler");
 desc("log_rotation") ->
 desc("log_rotation") ->
     ?DESC("desc_log_rotation");
     ?DESC("desc_log_rotation");
 desc("log_overload_kill") ->
 desc("log_overload_kill") ->
@@ -1204,7 +1258,7 @@ tr_cluster_discovery(Conf) ->
     Strategy = conf_get("cluster.discovery_strategy", Conf),
     Strategy = conf_get("cluster.discovery_strategy", Conf),
     {Strategy, filter(cluster_options(Strategy, Conf))}.
     {Strategy, filter(cluster_options(Strategy, Conf))}.
 
 
-log_handler_common_confs(Handler) ->
+log_handler_common_confs(Handler, Default) ->
     %% we rarely support dynamic defaults like this
     %% we rarely support dynamic defaults like this
     %% for this one, we have build-time default the same as runtime default
     %% for this one, we have build-time default the same as runtime default
     %% so it's less tricky
     %% so it's less tricky
@@ -1215,13 +1269,14 @@ log_handler_common_confs(Handler) ->
         end,
         end,
     EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"),
     EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"),
     Enable = lists:member(EnvValue, EnableValues),
     Enable = lists:member(EnvValue, EnableValues),
+    LevelDesc = maps:get(level_desc, Default, "common_handler_level"),
     [
     [
         {"level",
         {"level",
             sc(
             sc(
                 log_level(),
                 log_level(),
                 #{
                 #{
-                    default => warning,
-                    desc => ?DESC("common_handler_level"),
+                    default => maps:get(level, Default, warning),
+                    desc => ?DESC(LevelDesc),
                     importance => ?IMPORTANCE_HIGH
                     importance => ?IMPORTANCE_HIGH
                 }
                 }
             )},
             )},
@@ -1238,7 +1293,7 @@ log_handler_common_confs(Handler) ->
             sc(
             sc(
                 hoconsc:enum([text, json]),
                 hoconsc:enum([text, json]),
                 #{
                 #{
-                    default => text,
+                    default => maps:get(formatter, Default, text),
                     desc => ?DESC("common_handler_formatter"),
                     desc => ?DESC("common_handler_formatter"),
                     importance => ?IMPORTANCE_MEDIUM
                     importance => ?IMPORTANCE_MEDIUM
                 }
                 }

+ 10 - 1
apps/emqx_conf/test/emqx_conf_logger_SUITE.erl

@@ -78,7 +78,16 @@ t_log_conf(_Conf) ->
                 <<"time_offset">> => <<"system">>
                 <<"time_offset">> => <<"system">>
             },
             },
         <<"file">> =>
         <<"file">> =>
-            #{<<"default">> => FileExpect}
+            #{<<"default">> => FileExpect},
+        <<"audit">> =>
+            #{
+                <<"enable">> => true,
+                <<"level">> => <<"info">>,
+                <<"path">> => <<"log/audit.log">>,
+                <<"rotation_count">> => 10,
+                <<"rotation_size">> => <<"50MB">>,
+                <<"time_offset">> => <<"system">>
+            }
     },
     },
     ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
     ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])),
     UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1),
     UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1),

+ 20 - 2
apps/emqx_conf/test/emqx_conf_schema_tests.erl

@@ -164,7 +164,8 @@ validate_log(Conf) ->
     ?assertEqual(silent, proplists:get_value(error_logger, Kernel)),
     ?assertEqual(silent, proplists:get_value(error_logger, Kernel)),
     ?assertEqual(debug, proplists:get_value(logger_level, Kernel)),
     ?assertEqual(debug, proplists:get_value(logger_level, Kernel)),
     Loggers = proplists:get_value(logger, Kernel),
     Loggers = proplists:get_value(logger, Kernel),
-    FileHandler = lists:keyfind(logger_disk_log_h, 3, Loggers),
+    FileHandlers = lists:filter(fun(L) -> element(3, L) =:= logger_disk_log_h end, Loggers),
+    FileHandler = lists:keyfind(default, 2, FileHandlers),
     ?assertEqual(
     ?assertEqual(
         {handler, default, logger_disk_log_h, #{
         {handler, default, logger_disk_log_h, #{
             config => ?LOG_CONFIG#{
             config => ?LOG_CONFIG#{
@@ -180,6 +181,23 @@ validate_log(Conf) ->
         }},
         }},
         FileHandler
         FileHandler
     ),
     ),
+    AuditHandler = lists:keyfind(emqx_audit, 2, FileHandlers),
+    %% default is enable and log level is info.
+    ?assertMatch(
+        {handler, emqx_audit, logger_disk_log_h, #{
+            config := #{
+                type := wrap,
+                file := "log/audit.log",
+                max_no_bytes := _,
+                max_no_files := _
+            },
+            filesync_repeat_interval := no_repeat,
+            filters := [{filter_audit, {_, stop}}],
+            formatter := _,
+            level := info
+        }},
+        AuditHandler
+    ),
     ConsoleHandler = lists:keyfind(logger_std_h, 3, Loggers),
     ConsoleHandler = lists:keyfind(logger_std_h, 3, Loggers),
     ?assertEqual(
     ?assertEqual(
         {handler, console, logger_std_h, #{
         {handler, console, logger_std_h, #{
@@ -244,7 +262,7 @@ log_rotation_count_limit_test() ->
             {handler, default, logger_disk_log_h, #{
             {handler, default, logger_disk_log_h, #{
                 config := #{max_no_files := Count}
                 config := #{max_no_files := Count}
             }},
             }},
-            lists:keyfind(logger_disk_log_h, 3, Loggers)
+            lists:keyfind(default, 2, Loggers)
         )
         )
                   end,
                   end,
         [{to_bin(Format, [1]), 1}, {to_bin(Format, [128]), 128}]),
         [{to_bin(Format, [1]), 1}, {to_bin(Format, [128]), 128}]),

+ 53 - 18
apps/emqx_ctl/src/emqx_ctl.erl

@@ -116,25 +116,35 @@ run_command([Cmd | Args]) ->
 run_command(help, []) ->
 run_command(help, []) ->
     help();
     help();
 run_command(Cmd, Args) when is_atom(Cmd) ->
 run_command(Cmd, Args) when is_atom(Cmd) ->
-    case lookup_command(Cmd) of
-        [{Mod, Fun}] ->
-            try
-                apply(Mod, Fun, [Args])
-            catch
-                _:Reason:Stacktrace ->
-                    ?LOG_ERROR(#{
-                        msg => "ctl_command_crashed",
-                        stacktrace => Stacktrace,
-                        reason => Reason
-                    }),
-                    {error, Reason}
-            end;
-        Error ->
-            help(),
-            Error
-    end.
+    Start = erlang:monotonic_time(),
+    Result =
+        case lookup_command(Cmd) of
+            [{Mod, Fun}] ->
+                try
+                    apply(Mod, Fun, [Args])
+                catch
+                    _:Reason:Stacktrace ->
+                        ?LOG_ERROR(#{
+                            msg => "ctl_command_crashed",
+                            stacktrace => Stacktrace,
+                            reason => Reason
+                        }),
+                        {error, Reason}
+                end;
+            Error ->
+                help(),
+                Error
+        end,
+    Duration = erlang:convert_time_unit(erlang:monotonic_time() - Start, native, millisecond),
+
+    audit_log(
+        audit_level(Result, Duration),
+        "from_cli",
+        #{duration_ms => Duration, result => Result, cmd => Cmd, args => Args, node => node()}
+    ),
+    Result.
 
 
--spec lookup_command(cmd()) -> [{module(), atom()}].
+-spec lookup_command(cmd()) -> [{module(), atom()}] | {error, any()}.
 lookup_command(Cmd) when is_atom(Cmd) ->
 lookup_command(Cmd) when is_atom(Cmd) ->
     case is_initialized() of
     case is_initialized() of
         true ->
         true ->
@@ -304,3 +314,28 @@ safe_to_existing_atom(Str) ->
 
 
 is_initialized() ->
 is_initialized() ->
     ets:info(?CMD_TAB) =/= undefined.
     ets:info(?CMD_TAB) =/= undefined.
+
+audit_log(Level, From, Log) ->
+    case lookup_command(audit) of
+        {error, _} ->
+            ignore;
+        [{Mod, Fun}] ->
+            try
+                apply(Mod, Fun, [Level, From, Log])
+            catch
+                _:Reason:Stacktrace ->
+                    ?LOG_ERROR(#{
+                        msg => "ctl_command_crashed",
+                        stacktrace => Stacktrace,
+                        reason => Reason
+                    })
+            end
+    end.
+
+-define(TOO_SLOW, 3000).
+
+audit_level(ok, Duration) when Duration >= ?TOO_SLOW -> warning;
+audit_level({ok, _}, Duration) when Duration >= ?TOO_SLOW -> warning;
+audit_level(ok, _Duration) -> info;
+audit_level({ok, _}, _Duration) -> info;
+audit_level(_, _) -> error.

+ 3 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -5,7 +5,9 @@
     {vsn, "5.0.27"},
     {vsn, "5.0.27"},
     {modules, []},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {registered, [emqx_dashboard_sup]},
-    {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},
+    {applications, [
+        kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http
+    ]},
     {mod, {emqx_dashboard_app, []}},
     {mod, {emqx_dashboard_app, []}},
     {env, []},
     {env, []},
     {licenses, ["Apache-2.0"]},
     {licenses, ["Apache-2.0"]},

+ 17 - 7
apps/emqx_dashboard/src/emqx_dashboard.erl

@@ -72,6 +72,7 @@ start_listeners(Listeners) ->
         base_path => emqx_dashboard_swagger:base_path(),
         base_path => emqx_dashboard_swagger:base_path(),
         modules => minirest_api:find_api_modules(apps()),
         modules => minirest_api:find_api_modules(apps()),
         authorization => Authorization,
         authorization => Authorization,
+        log => fun emqx_dashboard_audit:log/1,
         security => [#{'basicAuth' => []}, #{'bearerAuth' => []}],
         security => [#{'basicAuth' => []}, #{'bearerAuth' => []}],
         swagger_global_spec => GlobalSpec,
         swagger_global_spec => GlobalSpec,
         dispatch => dispatch(),
         dispatch => dispatch(),
@@ -189,10 +190,19 @@ ranch_opts(Options) ->
         end,
         end,
     RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
     RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
 
 
-proto_opts(#{proxy_header := ProxyHeader}) ->
-    #{proxy_header => ProxyHeader};
-proto_opts(_Opts) ->
-    #{}.
+init_proto_opts() ->
+    %% cowboy_stream_h is required by default
+    %% will integrate cowboy_telemetry_h when OTEL trace is ready
+    #{stream_handlers => [cowboy_stream_h]}.
+
+proto_opts(Opts) ->
+    Init = init_proto_opts(),
+    proxy_header_opt(Init, Opts).
+
+proxy_header_opt(Init, #{proxy_header := ProxyHeader}) ->
+    Init#{proxy_header => ProxyHeader};
+proxy_header_opt(Init, _Opts) ->
+    Init.
 
 
 filter_false(_K, false, S) -> S;
 filter_false(_K, false, S) -> S;
 filter_false(K, V, S) -> [{K, V} | S].
 filter_false(K, V, S) -> [{K, V} | S].
@@ -206,8 +216,8 @@ authorize(Req) ->
             api_key_authorize(Req, Username, Password);
             api_key_authorize(Req, Username, Password);
         {bearer, Token} ->
         {bearer, Token} ->
             case emqx_dashboard_admin:verify_token(Req, Token) of
             case emqx_dashboard_admin:verify_token(Req, Token) of
-                ok ->
-                    ok;
+                {ok, Username} ->
+                    {ok, #{auth_type => jwt_token, username => Username}};
                 {error, token_timeout} ->
                 {error, token_timeout} ->
                     {401, 'TOKEN_TIME_OUT', <<"Token expired, get new token by POST /login">>};
                     {401, 'TOKEN_TIME_OUT', <<"Token expired, get new token by POST /login">>};
                 {error, not_found} ->
                 {error, not_found} ->
@@ -257,7 +267,7 @@ api_key_authorize(Req, Key, Secret) ->
     Path = cowboy_req:path(Req),
     Path = cowboy_req:path(Req),
     case emqx_mgmt_auth:authorize(Path, Key, Secret) of
     case emqx_mgmt_auth:authorize(Path, Key, Secret) of
         ok ->
         ok ->
-            ok;
+            {ok, #{auth_type => api_key, api_key => Key}};
         {error, <<"not_allowed">>} ->
         {error, <<"not_allowed">>} ->
             return_unauthorized(
             return_unauthorized(
                 ?BAD_API_KEY_OR_SECRET,
                 ?BAD_API_KEY_OR_SECRET,

+ 1 - 0
apps/emqx_dashboard/src/emqx_dashboard_app.erl

@@ -31,6 +31,7 @@ start(_StartType, _StartArgs) ->
     case emqx_dashboard:start_listeners() of
     case emqx_dashboard:start_listeners() of
         ok ->
         ok ->
             emqx_dashboard_cli:load(),
             emqx_dashboard_cli:load(),
+            %emqx_dashboard_log:setup(),
             {ok, _} = emqx_dashboard_admin:add_default_user(),
             {ok, _} = emqx_dashboard_admin:add_default_user(),
             {ok, Sup};
             {ok, Sup};
         {error, Reason} ->
         {error, Reason} ->

+ 51 - 0
apps/emqx_dashboard/src/emqx_dashboard_audit.erl

@@ -0,0 +1,51 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_dashboard_audit).
+
+-include_lib("emqx/include/logger.hrl").
+%% API
+-export([log/1]).
+
+log(Meta0) ->
+    #{req_start := ReqStart, req_end := ReqEnd, code := Code, method := Method} = Meta0,
+    Duration = erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond),
+    Level = level(Method, Code, Duration),
+    Username = maps:get(username, Meta0, <<"">>),
+    Meta1 = maps:without([req_start, req_end], Meta0),
+    Meta2 = Meta1#{time => logger:timestamp(), duration_ms => Duration},
+    Meta = emqx_utils:redact(Meta2),
+    ?AUDIT(
+        Level,
+        "from_api",
+        Meta#{
+            from => from(maps:get(auth_type, Meta0, "")),
+            username => binary_to_list(Username),
+            node => node()
+        }
+    ),
+    ok.
+
+from(jwt_token) -> "dashboard";
+from(api_key) -> "aip_key";
+from(_) -> "unauthorized".
+
+level(_, _Code, Duration) when Duration > 3000 -> warning;
+level(get, Code, _) when Code >= 200 andalso Code < 300 -> debug;
+level(_, Code, _) when Code >= 200 andalso Code < 300 -> info;
+level(_, Code, _) when Code >= 300 andalso Code < 400 -> warning;
+level(_, Code, _) when Code >= 400 andalso Code < 500 -> error;
+level(_, _, _) -> critical.

+ 3 - 3
apps/emqx_dashboard/src/emqx_dashboard_token.erl

@@ -62,7 +62,7 @@ sign(User, Password) ->
 
 
 -spec verify(_, Token :: binary()) ->
 -spec verify(_, Token :: binary()) ->
     Result ::
     Result ::
-        ok
+        {ok, binary()}
         | {error, token_timeout | not_found | unauthorized_role}.
         | {error, token_timeout | not_found | unauthorized_role}.
 verify(Req, Token) ->
 verify(Req, Token) ->
     do_verify(Req, Token).
     do_verify(Req, Token).
@@ -124,7 +124,7 @@ do_sign(#?ADMIN{username = Username} = User, Password) ->
 
 
 do_verify(Req, Token) ->
 do_verify(Req, Token) ->
     case lookup(Token) of
     case lookup(Token) of
-        {ok, JWT = #?ADMIN_JWT{exptime = ExpTime, extra = Extra}} ->
+        {ok, JWT = #?ADMIN_JWT{exptime = ExpTime, extra = Extra, username = Username}} ->
             case ExpTime > erlang:system_time(millisecond) of
             case ExpTime > erlang:system_time(millisecond) of
                 true ->
                 true ->
                     case check_rbac(Req, Extra) of
                     case check_rbac(Req, Extra) of
@@ -135,7 +135,7 @@ do_verify(Req, Token) ->
                                 fun mnesia:write/1,
                                 fun mnesia:write/1,
                                 [NewJWT]
                                 [NewJWT]
                             ),
                             ),
-                            Res;
+                            {Res, Username};
                         _ ->
                         _ ->
                             {error, unauthorized_role}
                             {error, unauthorized_role}
                     end;
                     end;

+ 2 - 2
apps/emqx_dashboard/test/emqx_dashboard_admin_SUITE.erl

@@ -176,14 +176,14 @@ t_clean_token(_) ->
     {ok, _} = emqx_dashboard_admin:add_user(Username, Password, ?ROLE_SUPERUSER, <<"desc">>),
     {ok, _} = emqx_dashboard_admin:add_user(Username, Password, ?ROLE_SUPERUSER, <<"desc">>),
     {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
     {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
     FakeReq = #{method => <<"GET">>},
     FakeReq = #{method => <<"GET">>},
-    ok = emqx_dashboard_admin:verify_token(FakeReq, Token),
+    {ok, Username} = emqx_dashboard_admin:verify_token(FakeReq, Token),
     %% change password
     %% change password
     {ok, _} = emqx_dashboard_admin:change_password(Username, Password, NewPassword),
     {ok, _} = emqx_dashboard_admin:change_password(Username, Password, NewPassword),
     timer:sleep(5),
     timer:sleep(5),
     {error, not_found} = emqx_dashboard_admin:verify_token(FakeReq, Token),
     {error, not_found} = emqx_dashboard_admin:verify_token(FakeReq, Token),
     %% remove user
     %% remove user
     {ok, Token2} = emqx_dashboard_admin:sign_token(Username, NewPassword),
     {ok, Token2} = emqx_dashboard_admin:sign_token(Username, NewPassword),
-    ok = emqx_dashboard_admin:verify_token(FakeReq, Token2),
+    {ok, Username} = emqx_dashboard_admin:verify_token(FakeReq, Token2),
     {ok, _} = emqx_dashboard_admin:remove_user(Username),
     {ok, _} = emqx_dashboard_admin:remove_user(Username),
     timer:sleep(5),
     timer:sleep(5),
     {error, not_found} = emqx_dashboard_admin:verify_token(FakeReq, Token2),
     {error, not_found} = emqx_dashboard_admin:verify_token(FakeReq, Token2),

+ 2 - 2
apps/emqx_dashboard_rbac/test/emqx_dashboard_rbac_SUITE.erl

@@ -137,11 +137,11 @@ t_clean_token(_) ->
     {ok, _} = emqx_dashboard_admin:add_user(Username, Password, ?ROLE_SUPERUSER, Desc),
     {ok, _} = emqx_dashboard_admin:add_user(Username, Password, ?ROLE_SUPERUSER, Desc),
     {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
     {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
     FakeReq = #{method => <<"GET">>},
     FakeReq = #{method => <<"GET">>},
-    ok = emqx_dashboard_admin:verify_token(FakeReq, Token),
+    {ok, Username} = emqx_dashboard_admin:verify_token(FakeReq, Token),
     %% change description
     %% change description
     {ok, _} = emqx_dashboard_admin:update_user(Username, ?ROLE_SUPERUSER, NewDesc),
     {ok, _} = emqx_dashboard_admin:update_user(Username, ?ROLE_SUPERUSER, NewDesc),
     timer:sleep(5),
     timer:sleep(5),
-    ok = emqx_dashboard_admin:verify_token(FakeReq, Token),
+    {ok, Username} = emqx_dashboard_admin:verify_token(FakeReq, Token),
     %% change role
     %% change role
     {ok, _} = emqx_dashboard_admin:update_user(Username, ?ROLE_VIEWER, NewDesc),
     {ok, _} = emqx_dashboard_admin:update_user(Username, ?ROLE_VIEWER, NewDesc),
     timer:sleep(5),
     timer:sleep(5),

+ 2 - 0
apps/emqx_machine/src/emqx_machine_app.erl

@@ -16,6 +16,8 @@
 
 
 -module(emqx_machine_app).
 -module(emqx_machine_app).
 
 
+-include_lib("emqx/include/logger.hrl").
+
 -export([
 -export([
     start/2,
     start/2,
     stop/1
     stop/1

+ 1 - 0
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -47,6 +47,7 @@ post_boot() ->
     ok = ensure_apps_started(),
     ok = ensure_apps_started(),
     ok = print_vsn(),
     ok = print_vsn(),
     ok = start_autocluster(),
     ok = start_autocluster(),
+    ?AUDIT(alert, "from_cli", #{time => logger:timestamp(), event => "emqx_start"}),
     ignore.
     ignore.
 
 
 -ifdef(TEST).
 -ifdef(TEST).

+ 3 - 0
apps/emqx_machine/src/emqx_machine_terminator.erl

@@ -67,6 +67,9 @@ graceful() ->
 
 
 %% @doc Shutdown the Erlang VM and wait indefinitely.
 %% @doc Shutdown the Erlang VM and wait indefinitely.
 graceful_wait() ->
 graceful_wait() ->
+    ?AUDIT(alert, "from_cli", #{
+        time => logger:timestamp(), msg => "run_emqx_stop_to_grace_shutdown"
+    }),
     ok = graceful(),
     ok = graceful(),
     exit_loop().
     exit_loop().
 
 

+ 15 - 4
apps/emqx_machine/src/emqx_restricted_shell.erl

@@ -65,7 +65,7 @@ check_allowed(MF, NotAllowed) ->
     case {lists:member(MF, NotAllowed), is_locked()} of
     case {lists:member(MF, NotAllowed), is_locked()} of
         {true, false} -> exempted;
         {true, false} -> exempted;
         {true, true} -> prohibited;
         {true, true} -> prohibited;
-        {false, _} -> ignore
+        {false, _} -> ok
     end.
     end.
 
 
 is_allowed(prohibited) -> false;
 is_allowed(prohibited) -> false;
@@ -109,15 +109,26 @@ max_heap_size_warning(MF, Args) ->
             })
             })
     end.
     end.
 
 
-log(prohibited, MF, Args) ->
+log(_, {?MODULE, prompt_func}, [[{history, _}]]) ->
+    ok;
+log(IsAllow, MF, Args) ->
+    ?AUDIT(warning, "from_remote_console", #{
+        time => logger:timestamp(),
+        function => MF,
+        args => Args,
+        permission => IsAllow
+    }),
+    to_console(IsAllow, MF, Args).
+
+to_console(prohibited, MF, Args) ->
     warning("DANGEROUS FUNCTION: FORBIDDEN IN SHELL!!!!!", []),
     warning("DANGEROUS FUNCTION: FORBIDDEN IN SHELL!!!!!", []),
     ?SLOG(error, #{msg => "execute_function_in_shell_prohibited", function => MF, args => Args});
     ?SLOG(error, #{msg => "execute_function_in_shell_prohibited", function => MF, args => Args});
-log(exempted, MF, Args) ->
+to_console(exempted, MF, Args) ->
     limit_warning(MF, Args),
     limit_warning(MF, Args),
     ?SLOG(error, #{
     ?SLOG(error, #{
         msg => "execute_dangerous_function_in_shell_exempted", function => MF, args => Args
         msg => "execute_dangerous_function_in_shell_exempted", function => MF, args => Args
     });
     });
-log(ignore, MF, Args) ->
+to_console(ok, MF, Args) ->
     limit_warning(MF, Args).
     limit_warning(MF, Args).
 
 
 warning(Format, Args) ->
 warning(Format, Args) ->

+ 1 - 1
apps/emqx_telemetry/test/emqx_telemetry_api_SUITE.erl

@@ -48,7 +48,7 @@ end_per_suite(_Config) ->
         }
         }
     ),
     ),
     emqx_mgmt_api_test_util:end_suite([
     emqx_mgmt_api_test_util:end_suite([
-        emqx_conf, emqx_authn, emqx_authz, emqx_telemetry
+        emqx_conf, emqx_authn, emqx_management, emqx_authz, emqx_telemetry
     ]),
     ]),
     ok.
     ok.
 
 

+ 1 - 1
mix.exs

@@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
       {:ekka, github: "emqx/ekka", tag: "0.15.13", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.15.13", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.1.0", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.1.0", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
-      {:minirest, github: "emqx/minirest", tag: "1.3.12", override: true},
+      {:minirest, github: "emqx/minirest", tag: "1.3.13", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},

+ 1 - 1
rebar.config

@@ -65,7 +65,7 @@
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.13"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.13"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
-    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.12"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}

+ 30 - 0
rel/i18n/emqx_conf_schema.hocon

@@ -146,6 +146,12 @@ desc_log_file_handler.desc:
 desc_log_file_handler.label:
 desc_log_file_handler.label:
 """Files Log Handler"""
 """Files Log Handler"""
 
 
+desc_audit_log_handler.desc:
+"""Audit log handler that prints log events to files."""
+
+desc_audit_log_handler.label:
+"""Audit Log Handler"""
+
 rpc_socket_keepalive_count.desc:
 rpc_socket_keepalive_count.desc:
 """How many times the keepalive probe message can fail to receive a reply
 """How many times the keepalive probe message can fail to receive a reply
 until the RPC connection is considered lost."""
 until the RPC connection is considered lost."""
@@ -395,6 +401,12 @@ log_file_handler_file.desc:
 log_file_handler_file.label:
 log_file_handler_file.label:
 """Log File Name"""
 """Log File Name"""
 
 
+audit_file_handler_path.desc:
+"""Name the audit log file."""
+
+audit_file_handler_path.label:
+"""Audit Log File Name"""
+
 node_dist_net_ticktime.desc:
 node_dist_net_ticktime.desc:
 """This is the approximate time an EMQX node may be unresponsive until it is considered down and thereby disconnected."""
 """This is the approximate time an EMQX node may be unresponsive until it is considered down and thereby disconnected."""
 
 
@@ -630,6 +642,12 @@ log_file_handlers.desc:
 log_file_handlers.label:
 log_file_handlers.label:
 """File Handler"""
 """File Handler"""
 
 
+log_audit_handler.desc:
+"""Audit file-based log handler."""
+
+log_audit_handler.label:
+"""Audit log Handler"""
+
 node_global_gc_interval.desc:
 node_global_gc_interval.desc:
 """Periodic garbage collection interval. Set to <code>disabled</code> to have it disabled."""
 """Periodic garbage collection interval. Set to <code>disabled</code> to have it disabled."""
 
 
@@ -678,6 +696,18 @@ Defaults to warning."""
 common_handler_level.label:
 common_handler_level.label:
 """Log Level"""
 """Log Level"""
 
 
+audit_handler_level.desc:
+"""The log level for the audit log handler.<br/>
+- Requests that take longer than 3 seconds to process are logged as <code>warning</code> logs.<br/>
+- GET requests with HTTP status codes between 200-300 are logged as <code>debug</code> logs.<br/>
+- Non-GET Requests with HTTP status codes between 200-300 are logged as <code>info</code> logs.<br/>
+- Requests with HTTP status codes between 300-400 are logged as <code>warning</code> logs.<br/>
+- Requests with HTTP status codes between 400-500 are logged as <code>error</code> logs.<br/>
+- Defaults to info."""
+
+audit_handler_level.label:
+"""Log Level"""
+
 desc_rpc.desc:
 desc_rpc.desc:
 """EMQX uses a library called <code>gen_rpc</code> for inter-broker communication.<br/>
 """EMQX uses a library called <code>gen_rpc</code> for inter-broker communication.<br/>
 Most of the time the default config should work,
 Most of the time the default config should work,