Преглед на файлове

feat: add /audit http api to filter audit log

zhongwencool преди 2 години
родител
ревизия
926c804314

+ 10 - 7
apps/emqx/include/logger.hrl

@@ -61,25 +61,28 @@
     )
 end).
 
--define(AUDIT(_Level_, _From_, _Meta_), begin
+-define(AUDIT(_LevelFun_, _MetaFun_), begin
     case emqx_config:get([log, audit], #{enable => false}) of
         #{enable := false} ->
             ok;
         #{enable := true, level := _AllowLevel_} ->
+            _Level_ = _LevelFun_,
             case logger:compare_levels(_AllowLevel_, _Level_) of
                 _R_ when _R_ == lt; _R_ == eq ->
-                    emqx_trace:log(
-                        _Level_,
-                        [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
-                        _Msg = undefined,
-                        _Meta_#{from => _From_}
-                    );
+                    ?LOG_AUDIT_EVENT(_Level_, _MetaFun_);
                 gt ->
                     ok
             end
     end
 end).
 
+-define(LOG_AUDIT_EVENT(Level, M), begin
+    M1 = (M)#{time => logger:timestamp(), level => Level},
+    Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
+    emqx_trace:log(Level, Filter, undefined, M1),
+    emqx_audit:log(M1)
+end).
+
 %% print to 'user' group leader
 -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
 -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)).

+ 9 - 0
apps/emqx/src/config/emqx_config_logger.erl

@@ -23,6 +23,8 @@
 -export([post_config_update/5]).
 -export([filter_audit/2]).
 
+-include("logger.hrl").
+
 -define(LOG, [log]).
 -define(AUDIT_HANDLER, emqx_audit).
 
@@ -96,6 +98,7 @@ update_log_handlers(NewHandlers) ->
     ok.
 
 update_log_handler({removed, Id}) ->
+    audit("audit_disabled", Id),
     log_to_console("Config override: ~s is removed~n", [id_for_log(Id)]),
     logger:remove_handler(Id);
 update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
@@ -104,6 +107,7 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
     _ = logger:remove_handler(Id),
     case logger:add_handler(Id, Mod, Conf) of
         ok ->
+            audit("audit_enabled", Id),
             ok;
         %% Don't crash here, otherwise the cluster rpc will retry the wrong handler forever.
         {error, Reason} ->
@@ -114,6 +118,11 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
     end,
     ok.
 
+audit(Event, ?AUDIT_HANDLER) ->
+    ?LOG_AUDIT_EVENT(alert, #{event => Event, from => event});
+audit(_, _) ->
+    ok.
+
 id_for_log(console) -> "log.console";
 id_for_log(Other) -> "log.file." ++ atom_to_list(Other).
 

+ 5 - 0
apps/emqx_audit/README.md

@@ -0,0 +1,5 @@
+emqx_audit
+=====
+
+Audit log for EMQX, empowers users to efficiently access the desired audit trail data
+and facilitates auditing, compliance, troubleshooting, and security analysis.

+ 39 - 0
apps/emqx_audit/include/emqx_audit.hrl

@@ -0,0 +1,39 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-define(AUDIT, emqx_audit).
+
+-record(?AUDIT, {
+    seq,
+    %% basic info
+    created_at,
+    node,
+    from,
+    source,
+    source_ip,
+    %% operation info
+    operation_id,
+    operation_type,
+    args,
+    operation_result,
+    failure,
+    %% request detail
+    http_method,
+    http_request,
+    http_status_code,
+    duration_ms,
+    extra
+}).

+ 2 - 0
apps/emqx_audit/rebar.config

@@ -0,0 +1,2 @@
+{erl_opts, [debug_info]}.
+{deps, []}.

+ 10 - 0
apps/emqx_audit/src/emqx_audit.app.src

@@ -0,0 +1,10 @@
+{application, emqx_audit, [
+    {description, "Audit log for EMQX"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {mod, {emqx_audit_app, []}},
+    {applications, [kernel, stdlib, emqx]},
+    {env, []},
+    {modules, []},
+    {links, []}
+]}.

+ 202 - 0
apps/emqx_audit/src/emqx_audit.erl

@@ -0,0 +1,202 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit).
+
+%% API
+-export([]).
+
+-behaviour(gen_server).
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include("emqx_audit.hrl").
+
+%% API
+-export([start_link/1]).
+-export([log/1]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]).
+-define(CLEAN_EXPIRED_MS, 60 * 1000).
+
+to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
+    #?AUDIT{
+        created_at = erlang:system_time(microsecond),
+        node = node(),
+        operation_id = <<"">>,
+        operation_type = atom_to_binary(Cmd),
+        args = Args,
+        operation_result = <<"">>,
+        failure = <<"">>,
+        duration_ms = DurationMs,
+        from = cli,
+        source = <<"">>,
+        source_ip = <<"">>,
+        http_status_code = <<"">>,
+        http_method = <<"">>,
+        http_request = <<"">>
+    };
+to_audit(#{http_method := get}) ->
+    ok;
+to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api ->
+    #{
+        source := Source,
+        source_ip := SourceIp,
+        %% operation info
+        operation_id := OperationId,
+        operation_type := OperationType,
+        operation_result := OperationResult,
+        %% request detail
+        http_status_code := StatusCode,
+        http_method := Method,
+        http_request := Request,
+        duration_ms := DurationMs
+    } = Log,
+    #?AUDIT{
+        created_at = erlang:system_time(microsecond),
+        node = node(),
+        from = From,
+        source = Source,
+        source_ip = SourceIp,
+        %% operation info
+        operation_id = OperationId,
+        operation_type = OperationType,
+        operation_result = OperationResult,
+        failure = maps:get(failure, Log, <<"">>),
+        %% request detail
+        http_status_code = StatusCode,
+        http_method = Method,
+        http_request = Request,
+        duration_ms = DurationMs,
+        args = <<"">>
+    };
+to_audit(#{from := event, event := Event}) ->
+    #?AUDIT{
+        created_at = erlang:system_time(microsecond),
+        node = node(),
+        from = event,
+        source = <<"">>,
+        source_ip = <<"">>,
+        %% operation info
+        operation_id = iolist_to_binary(Event),
+        operation_type = <<"">>,
+        operation_result = <<"">>,
+        failure = <<"">>,
+        %% request detail
+        http_status_code = <<"">>,
+        http_method = <<"">>,
+        http_request = <<"">>,
+        duration_ms = 0,
+        args = <<"">>
+    };
+to_audit(#{from := erlang_console, function := F, args := Args}) ->
+    #?AUDIT{
+        created_at = erlang:system_time(microsecond),
+        node = node(),
+        from = erlang_console,
+        source = <<"">>,
+        source_ip = <<"">>,
+        %% operation info
+        operation_id = <<"">>,
+        operation_type = <<"">>,
+        operation_result = <<"">>,
+        failure = <<"">>,
+        %% request detail
+        http_status_code = <<"">>,
+        http_method = <<"">>,
+        http_request = <<"">>,
+        duration_ms = 0,
+        args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args]))
+    }.
+
+log(Log) ->
+    gen_server:cast(?MODULE, {write, to_audit(Log)}).
+
+start_link(Config) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [Config], []).
+
+init([Config]) ->
+    erlang:process_flag(trap_exit, true),
+    ok = mria:create_table(?AUDIT, [
+        {type, ordered_set},
+        {rlog_shard, ?COMMON_SHARD},
+        {storage, disc_copies},
+        {record_name, ?AUDIT},
+        {attributes, record_info(fields, ?AUDIT)}
+    ]),
+    {ok, Config, {continue, setup}}.
+
+handle_continue(setup, #{max_size := MaxSize} = State) ->
+    ok = mria:wait_for_tables([?AUDIT]),
+    LatestId = latest_id(),
+    clean_expired(LatestId, MaxSize),
+    {noreply, State#{latest_id => LatestId}}.
+
+handle_call(_Request, _From, State = #{}) ->
+    {reply, ok, State}.
+
+handle_cast({write, Log}, State = #{latest_id := LatestId}) ->
+    NewSeq = LatestId + 1,
+    Audit = Log#?AUDIT{seq = NewSeq},
+    mnesia:dirty_write(?AUDIT, Audit),
+    {noreply, State#{latest_id => NewSeq}, ?CLEAN_EXPIRED_MS};
+handle_cast(_Request, State = #{}) ->
+    {noreply, State}.
+
+handle_info(timeout, State = #{max_size := MaxSize, latest_id := LatestId}) ->
+    clean_expired(LatestId, MaxSize),
+    {noreply, State#{latest_id => latest_id()}, hibernate};
+handle_info(_Info, State = #{}) ->
+    {noreply, State}.
+
+terminate(_Reason, _State = #{}) ->
+    ok.
+
+code_change(_OldVsn, State = #{}, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+clean_expired(LatestId, MaxSize) ->
+    Min = LatestId - MaxSize,
+    %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end),
+    MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}],
+    NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]),
+    ?SLOG(debug, #{
+        msg => "clean_audit_log",
+        latest_id => LatestId,
+        min => Min,
+        deleted_number => NumDeleted
+    }),
+    ok.
+
+latest_id() ->
+    case mnesia:dirty_last(?AUDIT) of
+        '$end_of_table' -> 0;
+        Seq -> Seq
+    end.

+ 397 - 0
apps/emqx_audit/src/emqx_audit_api.erl

@@ -0,0 +1,397 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_audit_api).
+
+-behaviour(minirest_api).
+
+%% API
+-export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]).
+-export([audit/2]).
+-export([qs2ms/2, format/1]).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include("emqx_audit.hrl").
+
+-import(hoconsc, [mk/2, ref/2, array/1]).
+
+-define(TAGS, ["Audit"]).
+
+-define(AUDIT_QS_SCHEMA, [
+    {<<"node">>, atom},
+    {<<"from">>, atom},
+    {<<"source">>, binary},
+    {<<"source_ip">>, binary},
+    {<<"operation_id">>, binary},
+    {<<"operation_type">>, binary},
+    {<<"operation_result">>, atom},
+    {<<"http_status_code">>, integer},
+    {<<"http_method">>, atom},
+    {<<"gte_created_at">>, timestamp},
+    {<<"lte_created_at">>, timestamp},
+    {<<"gte_duration_ms">>, timestamp},
+    {<<"lte_duration_ms">>, timestamp}
+]).
+
+namespace() -> "audit".
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    ["/audit"].
+
+schema("/audit") ->
+    #{
+        'operationId' => audit,
+        get => #{
+            tags => ?TAGS,
+            description => ?DESC(audit_get),
+            parameters => [
+                {node,
+                    ?HOCON(binary(), #{
+                        in => query,
+                        required => false,
+                        example => <<"emqx@127.0.0.1">>,
+                        desc => ?DESC(filter_node)
+                    })},
+                {from,
+                    ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{
+                        in => query,
+                        required => false,
+                        example => <<"dashboard">>,
+                        desc => ?DESC(filter_from)
+                    })},
+                {source,
+                    ?HOCON(binary(), #{
+                        in => query,
+                        required => false,
+                        example => <<"admin">>,
+                        desc => ?DESC(filter_source)
+                    })},
+                {source_ip,
+                    ?HOCON(binary(), #{
+                        in => query,
+                        required => false,
+                        example => <<"127.0.0.1">>,
+                        desc => ?DESC(filter_source_ip)
+                    })},
+                {operation_id,
+                    ?HOCON(binary(), #{
+                        in => query,
+                        required => false,
+                        example => <<"/rules/{id}">>,
+                        desc => ?DESC(filter_operation_id)
+                    })},
+                {operation_type,
+                    ?HOCON(binary(), #{
+                        in => query,
+                        example => <<"rules">>,
+                        required => false,
+                        desc => ?DESC(filter_operation_type)
+                    })},
+                {operation_result,
+                    ?HOCON(?ENUM([success, failure]), #{
+                        in => query,
+                        example => failure,
+                        required => false,
+                        desc => ?DESC(filter_operation_result)
+                    })},
+                {http_status_code,
+                    ?HOCON(integer(), #{
+                        in => query,
+                        example => 200,
+                        required => false,
+                        desc => ?DESC(filter_http_status_code)
+                    })},
+                {http_method,
+                    ?HOCON(?ENUM([post, put, delete]), #{
+                        in => query,
+                        example => post,
+                        required => false,
+                        desc => ?DESC(filter_http_method)
+                    })},
+                {gte_duration_ms,
+                    ?HOCON(integer(), #{
+                        in => query,
+                        example => 0,
+                        required => false,
+                        desc => ?DESC(filter_gte_duration_ms)
+                    })},
+                {lte_duration_ms,
+                    ?HOCON(integer(), #{
+                        in => query,
+                        example => 1000,
+                        required => false,
+                        desc => ?DESC(filter_lte_duration_ms)
+                    })},
+                {gte_created_at,
+                    ?HOCON(emqx_utils_calendar:epoch_millisecond(), #{
+                        in => query,
+                        required => false,
+                        example => <<"2023-10-15T00:00:00.820384+08:00">>,
+                        desc => ?DESC(filter_gte_created_at)
+                    })},
+                {lte_created_at,
+                    ?HOCON(emqx_utils_calendar:epoch_millisecond(), #{
+                        in => query,
+                        example => <<"2023-10-16T00:00:00.820384+08:00">>,
+                        required => false,
+                        desc => ?DESC(filter_lte_created_at)
+                    })},
+                ref(emqx_dashboard_swagger, page),
+                ref(emqx_dashboard_swagger, limit)
+            ],
+            summary => <<"List audit logs">>,
+            responses => #{
+                200 =>
+                    emqx_dashboard_swagger:schema_with_example(
+                        array(?REF(audit_list)),
+                        audit_log_list_example()
+                    )
+            }
+        }
+    }.
+
+fields(audit_list) ->
+    [
+        {data, mk(array(?REF(audit)), #{desc => ?DESC("audit_resp")})},
+        {meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
+    ];
+fields(audit) ->
+    [
+        {created_at,
+            ?HOCON(
+                emqx_utils_calendar:epoch_millisecond(),
+                #{
+                    desc => "The time when the log is created"
+                }
+            )},
+        {node,
+            ?HOCON(binary(), #{
+                desc => "The node name to which the log is created"
+            })},
+        {from,
+            ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{
+                desc => "The source type of the log"
+            })},
+        {source,
+            ?HOCON(binary(), #{
+                desc => "The source of the log"
+            })},
+        {source_ip,
+            ?HOCON(binary(), #{
+                desc => "The source ip of the log"
+            })},
+        {operation_id,
+            ?HOCON(binary(), #{
+                desc => "The operation id of the log"
+            })},
+        {operation_type,
+            ?HOCON(binary(), #{
+                desc => "The operation type of the log"
+            })},
+        {operation_result,
+            ?HOCON(?ENUM([success, failure]), #{
+                desc => "The operation result of the log"
+            })},
+        {http_status_code,
+            ?HOCON(integer(), #{
+                desc => "The http status code of the log"
+            })},
+        {http_method,
+            ?HOCON(?ENUM([post, put, delete]), #{
+                desc => "The http method of the log"
+            })},
+        {duration_ms,
+            ?HOCON(integer(), #{
+                desc => "The duration of the log"
+            })},
+        {args,
+            ?HOCON(?ARRAY(binary()), #{
+                desc => "The args of the log"
+            })},
+        {failure,
+            ?HOCON(?ARRAY(binary()), #{
+                desc => "The failure of the log"
+            })},
+        {http_request,
+            ?HOCON(?REF(http_request), #{
+                desc => "The http request of the log"
+            })}
+    ];
+fields(http_request) ->
+    [
+        {bindings, ?HOCON(map(), #{})},
+        {body, ?HOCON(map(), #{})},
+        {headers, ?HOCON(map(), #{})},
+        {method, ?HOCON(?ENUM([post, put, delete]), #{})}
+    ].
+
+audit(get, #{query_string := QueryString}) ->
+    case
+        emqx_mgmt_api:node_query(
+            node(),
+            ?AUDIT,
+            QueryString,
+            ?AUDIT_QS_SCHEMA,
+            fun ?MODULE:qs2ms/2,
+            fun ?MODULE:format/1
+        )
+    of
+        {error, page_limit_invalid} ->
+            {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
+        {error, Node, Error} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
+        Result ->
+            {200, Result}
+    end.
+
+qs2ms(_Tab, {Qs, _}) ->
+    #{
+        match_spec => gen_match_spec(Qs, #?AUDIT{_ = '_'}, []),
+        fuzzy_fun => undefined
+    }.
+
+gen_match_spec([], Audit, Conn) ->
+    [{Audit, Conn, ['$_']}];
+gen_match_spec([{node, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{node = T}, Conn);
+gen_match_spec([{from, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{from = T}, Conn);
+gen_match_spec([{source, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{source = T}, Conn);
+gen_match_spec([{source_ip, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{source_ip = T}, Conn);
+gen_match_spec([{operation_id, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{operation_id = T}, Conn);
+gen_match_spec([{operation_type, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{operation_type = T}, Conn);
+gen_match_spec([{operation_result, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{operation_result = T}, Conn);
+gen_match_spec([{http_status_code, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{http_status_code = T}, Conn);
+gen_match_spec([{http_method, '=:=', T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{http_method = T}, Conn);
+gen_match_spec([{created_at, Hold, T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{created_at = '$1'}, [{'$1', Hold, T} | Conn]);
+gen_match_spec([{created_at, Hold1, T1, Hold2, T2} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{created_at = '$1'}, [
+        {'$1', Hold1, T1}, {'$1', Hold2, T2} | Conn
+    ]);
+gen_match_spec([{duration_ms, Hold, T} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{duration_ms = '$2'}, [{'$2', Hold, T} | Conn]);
+gen_match_spec([{duration_ms, Hold1, T1, Hold2, T2} | Qs], Audit, Conn) ->
+    gen_match_spec(Qs, Audit#?AUDIT{duration_ms = '$2'}, [
+        {'$2', Hold1, T1}, {'$2', Hold2, T2} | Conn
+    ]).
+
+format(Audit) ->
+    #?AUDIT{
+        created_at = CreatedAt,
+        node = Node,
+        from = From,
+        source = Source,
+        source_ip = SourceIp,
+        operation_id = OperationId,
+        operation_type = OperationType,
+        operation_result = OperationResult,
+        http_status_code = HttpStatusCode,
+        http_method = HttpMethod,
+        duration_ms = DurationMs,
+        args = Args,
+        failure = Failure,
+        http_request = HttpRequest
+    } = Audit,
+    #{
+        created_at => emqx_utils_calendar:epoch_to_rfc3339(CreatedAt, microsecond),
+        node => Node,
+        from => From,
+        source => Source,
+        source_ip => SourceIp,
+        operation_id => OperationId,
+        operation_type => OperationType,
+        operation_result => OperationResult,
+        http_status_code => HttpStatusCode,
+        http_method => HttpMethod,
+        duration_ms => DurationMs,
+        args => Args,
+        failure => Failure,
+        http_request => HttpRequest
+    }.
+
+audit_log_list_example() ->
+    #{
+        data => [api_example(), cli_example()],
+        meta => #{
+            <<"count">> => 2,
+            <<"hasnext">> => false,
+            <<"limit">> => 50,
+            <<"page">> => 1
+        }
+    }.
+
+api_example() ->
+    #{
+        <<"args">> => "",
+        <<"created_at">> => "2023-10-17T10:41:20.383993+08:00",
+        <<"duration_ms">> => 0,
+        <<"failure">> => "",
+        <<"from">> => "dashboard",
+        <<"http_method">> => "post",
+        <<"http_request">> => #{
+            <<"bindings">> => #{},
+            <<"body">> => #{
+                <<"password">> => "******",
+                <<"username">> => "admin"
+            },
+            <<"headers">> => #{
+                <<"accept">> => "*/*",
+                <<"authorization">> => "******",
+                <<"connection">> => "keep-alive",
+                <<"content-length">> => "45",
+                <<"content-type">> => "application/json"
+            },
+            <<"method">> => "post"
+        },
+        <<"http_status_code">> => 200,
+        <<"node">> => "emqx@127.0.0.1",
+        <<"operation_id">> => "/login",
+        <<"operation_result">> => "success",
+        <<"operation_type">> => "login",
+        <<"source">> => "admin",
+        <<"source_ip">> => "127.0.0.1"
+    }.
+
+cli_example() ->
+    #{
+        <<"args">> => [<<"show">>, <<"log">>],
+        <<"created_at">> => "2023-10-17T10:45:13.100426+08:00",
+        <<"duration_ms">> => 7,
+        <<"failure">> => "",
+        <<"from">> => "cli",
+        <<"http_method">> => "",
+        <<"http_request">> => "",
+        <<"http_status_code">> => "",
+        <<"node">> => "emqx@127.0.0.1",
+        <<"operation_id">> => "",
+        <<"operation_result">> => "",
+        <<"operation_type">> => "conf",
+        <<"source">> => "",
+        <<"source_ip">> => ""
+    }.

+ 27 - 0
apps/emqx_audit/src/emqx_audit_app.erl

@@ -0,0 +1,27 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+    emqx_audit_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 45 - 0
apps/emqx_audit/src/emqx_audit_sup.erl

@@ -0,0 +1,45 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_all,
+        intensity => 10,
+        period => 10
+    },
+    ChildSpecs = [
+        #{
+            id => emqx_audit,
+            start => {emqx_audit, start_link, [#{max_size => 5000}]},
+            type => worker,
+            restart => transient,
+            shutdown => 1000
+        }
+    ],
+    {ok, {SupFlags, ChildSpecs}}.

+ 8 - 9
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -108,15 +108,14 @@ admins(_) ->
     emqx_ctl:usage(usage_sync()).
 
 audit(Level, From, Log) ->
-    Log1 = redact(Log#{time => logger:timestamp()}),
-    ?AUDIT(Level, From, Log1).
-
-redact(Logs = #{cmd := admins, args := ["add", Username, _Password | Rest]}) ->
-    Logs#{args => ["add", Username, "******" | Rest]};
-redact(Logs = #{cmd := admins, args := ["passwd", Username, _Password]}) ->
-    Logs#{args => ["passwd", Username, "******"]};
-redact(Logs = #{cmd := license, args := ["update", _License]}) ->
-    Logs#{args => ["update", "******"]};
+    ?AUDIT(Level, redact(Log#{from => From})).
+
+redact(Logs = #{cmd := admins, args := [<<"add">>, Username, _Password | Rest]}) ->
+    Logs#{args => [<<"add">>, Username, <<"******">> | Rest]};
+redact(Logs = #{cmd := admins, args := [<<"passwd">>, Username, _Password]}) ->
+    Logs#{args => [<<"passwd">>, Username, <<"******">>]};
+redact(Logs = #{cmd := license, args := [<<"update">>, _License]}) ->
+    Logs#{args => [<<"update">>, "******"]};
 redact(Logs) ->
     Logs.
 

+ 3 - 3
apps/emqx_dashboard/src/emqx_dashboard.erl

@@ -72,7 +72,7 @@ start_listeners(Listeners) ->
         base_path => emqx_dashboard_swagger:base_path(),
         modules => minirest_api:find_api_modules(apps()),
         authorization => Authorization,
-        log => fun emqx_dashboard_audit:log/1,
+        log => fun emqx_dashboard_audit:log/2,
         security => [#{'basicAuth' => []}, #{'bearerAuth' => []}],
         swagger_global_spec => GlobalSpec,
         dispatch => dispatch(),
@@ -222,7 +222,7 @@ authorize(Req) ->
         {bearer, Token} ->
             case emqx_dashboard_admin:verify_token(Req, Token) of
                 {ok, Username} ->
-                    {ok, #{auth_type => jwt_token, username => Username}};
+                    {ok, #{auth_type => jwt_token, source => Username}};
                 {error, token_timeout} ->
                     {401, 'TOKEN_TIME_OUT', <<"Token expired, get new token by POST /login">>};
                 {error, not_found} ->
@@ -253,7 +253,7 @@ api_key_authorize(Req, Key, Secret) ->
     Path = cowboy_req:path(Req),
     case emqx_mgmt_auth:authorize(Path, Req, Key, Secret) of
         ok ->
-            {ok, #{auth_type => api_key, api_key => Key}};
+            {ok, #{auth_type => api_key, source => Key}};
         {error, <<"not_allowed">>} ->
             return_unauthorized(
                 ?BAD_API_KEY_OR_SECRET,

+ 81 - 26
apps/emqx_dashboard/src/emqx_dashboard_audit.erl

@@ -18,29 +18,84 @@
 
 -include_lib("emqx/include/logger.hrl").
 %% API
--export([log/1]).
-
-log(Meta0) ->
-    #{req_start := ReqStart, req_end := ReqEnd, code := Code, method := Method} = Meta0,
-    Duration = erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond),
-    Level = level(Method, Code, Duration),
-    Username = maps:get(username, Meta0, <<"">>),
-    From = from(maps:get(auth_type, Meta0, "")),
-    Meta1 = maps:without([req_start, req_end], Meta0),
-    Meta2 = Meta1#{time => logger:timestamp(), duration_ms => Duration},
-    Meta = emqx_utils:redact(Meta2),
-    ?AUDIT(
-        Level,
-        From,
-        Meta#{username => binary_to_list(Username), node => node()}
-    ),
-    ok.
-
-from(jwt_token) -> "dashboard";
-from(_) -> "rest_api".
-
-level(get, _Code, _) -> debug;
-level(_, Code, _) when Code >= 200 andalso Code < 300 -> info;
-level(_, Code, _) when Code >= 300 andalso Code < 400 -> warning;
-level(_, Code, _) when Code >= 400 andalso Code < 500 -> error;
-level(_, _, _) -> critical.
+-export([log/2]).
+
+%% todo filter high frequency events
+-define(HIGH_FREQUENCY_EVENTS, [
+    mqtt_subscribe,
+    mqtt_unsubscribe,
+    mqtt_subscribe_batch,
+    mqtt_unsubscribe_batch,
+    mqtt_publish,
+    mqtt_publish_batch,
+    kickout_client
+]).
+
+log(#{code := Code, method := Method} = Meta, Req) ->
+    %% Keep level/2 and log_meta/1 inside of this ?AUDIT macro
+    ?AUDIT(level(Method, Code), log_meta(Meta, Req)).
+
+log_meta(Meta, Req) ->
+    Meta1 = #{
+        time => logger:timestamp(),
+        from => from(Meta),
+        source => source(Meta),
+        duration_ms => duration_ms(Meta),
+        source_ip => source_ip(Req),
+        operation_type => operation_type(Meta),
+        %% method for http filter api.
+        http_method => maps:get(method, Meta),
+        http_request => http_request(Meta),
+        http_status_code => maps:get(code, Meta),
+        operation_result => operation_result(Meta),
+        node => node()
+    },
+    Meta2 = maps:without([req_start, req_end, method, headers, body, bindings, code], Meta),
+    emqx_utils:redact(maps:merge(Meta2, Meta1)).
+
+duration_ms(#{req_start := ReqStart, req_end := ReqEnd}) ->
+    erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond).
+
+from(Meta) ->
+    case maps:find(auth_type, Meta) of
+        {ok, jwt_token} ->
+            dashboard;
+        {ok, api_key} ->
+            rest_api;
+        error ->
+            case maps:find(operation_id, Meta) of
+                %% login api create jwt_token, so we don have authorization in it's headers
+                {ok, <<"/login">>} -> dashboard;
+                _ -> unknown
+            end
+    end.
+source(#{source := Source}) -> Source;
+source(#{operation_id := <<"/login">>, body := #{<<"username">> := Username}}) -> Username;
+source(_Meta) -> <<"">>.
+
+source_ip(Req) ->
+    case cowboy_req:header(<<"x-forwarded-for">>, Req, undefined) of
+        undefined ->
+            {RemoteIP, _} = cowboy_req:peer(Req),
+            iolist_to_binary(inet:ntoa(RemoteIP));
+        Addresses ->
+            hd(binary:split(Addresses, <<",">>))
+    end.
+
+operation_type(Meta) ->
+    case maps:find(operation_id, Meta) of
+        {ok, OperationId} -> lists:nth(2, binary:split(OperationId, <<"/">>));
+        _ -> <<"unknown">>
+    end.
+
+http_request(Meta) ->
+    maps:with([method, headers, bindings, body], Meta).
+
+operation_result(#{failure := _}) -> failure;
+operation_result(_) -> success.
+
+level(get, _Code) -> debug;
+level(_, Code) when Code >= 200 andalso Code < 300 -> info;
+level(_, Code) when Code >= 300 andalso Code < 400 -> warning;
+level(_, Code) when Code >= 400 andalso Code < 500 -> error;
+level(_, _) -> critical.

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -33,7 +33,7 @@
     ]
 ).
 
-%% minirest/dashbaord_swagger behaviour callbacks
+%% minirest/dashboard_swagger behaviour callbacks
 -export([
     api_spec/0,
     paths/0,

+ 2 - 1
apps/emqx_machine/priv/reboot_lists.eterm

@@ -124,7 +124,8 @@
             emqx_ft,
             emqx_gcp_device,
             emqx_dashboard_rbac,
-            emqx_dashboard_sso
+            emqx_dashboard_sso,
+            emqx_audit
         ],
     %% must always be of type `load'
     ce_business_apps =>

+ 4 - 1
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -47,7 +47,10 @@ post_boot() ->
     ok = ensure_apps_started(),
     ok = print_vsn(),
     ok = start_autocluster(),
-    ?AUDIT(alert, cli, #{time => logger:timestamp(), event => "emqx_start"}),
+    ?AUDIT(alert, #{
+        event => "emqx_start",
+        from => event
+    }),
     ignore.
 
 -ifdef(TEST).

+ 3 - 3
apps/emqx_machine/src/emqx_machine_terminator.erl

@@ -67,9 +67,9 @@ graceful() ->
 
 %% @doc Shutdown the Erlang VM and wait indefinitely.
 graceful_wait() ->
-    ?AUDIT(alert, cli, #{
-        time => logger:timestamp(),
-        event => emqx_gracefully_stop
+    ?AUDIT(alert, #{
+        event => "emqx_gracefully_stop",
+        from => event
     }),
     ok = graceful(),
     exit_loop().

+ 3 - 3
apps/emqx_machine/src/emqx_restricted_shell.erl

@@ -112,11 +112,11 @@ max_heap_size_warning(MF, Args) ->
 log(_, {?MODULE, prompt_func}, [[{history, _}]]) ->
     ok;
 log(IsAllow, MF, Args) ->
-    ?AUDIT(warning, erlang_console, #{
-        time => logger:timestamp(),
+    ?AUDIT(warning, #{
         function => MF,
         args => pp_args(Args),
-        permission => IsAllow
+        permission => IsAllow,
+        from => erlang_console
     }),
     to_console(IsAllow, MF, Args).
 

+ 2 - 2
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -341,11 +341,11 @@ do_select(
         try
             case maps:get(continuation, QueryState, undefined) of
                 undefined ->
-                    ets:select(Tab, Ms, Limit);
+                    ets:select_reverse(Tab, Ms, Limit);
                 Continuation ->
                     %% XXX: Repair is necessary because we pass Continuation back
                     %% and forth through the nodes in the `do_cluster_query`
-                    ets:select(ets:repair_continuation(Continuation, Ms))
+                    ets:select_reverse(ets:repair_continuation(Continuation, Ms))
             end
         catch
             exit:_ = Exit ->

+ 2 - 1
mix.exs

@@ -214,7 +214,8 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_azure_event_hub,
       :emqx_gcp_device,
       :emqx_dashboard_rbac,
-      :emqx_dashboard_sso
+      :eqmx_dashboard_sso,
+      :emqx_audit
     ])
   end
 

+ 58 - 0
rel/i18n/emqx_audit_api.hocon

@@ -0,0 +1,58 @@
+emqx_audit_api {
+
+audit_get.desc:
+"""Get audit logs based on filter API, empowers users to efficiently
+access the desired audit trail data and facilitates auditing, compliance,
+troubleshooting, and security analysis"""
+
+audit_get.label:
+"List audit logs"
+
+filter_node.desc:
+"Filter logs based on the node name to which the logs are created."
+
+filter_from.desc:
+""""Filter logs based on source type, valid values include:
+`dashboard`: Dashboard request logs, requiring the use of a jwt_token.
+`rest_api`: API KEY request logs.
+`cli`: The emqx command line logs.
+`erlang_console`: The emqx remote_console run function logs.
+`event`: Logs related to events such as emqx_start, emqx_stop, audit_enabled, and audit_disabled."""
+
+filter_source.desc:
+""""Filter logs based on source, Possible values are:
+The login username when logs are generated from the dashboard.
+The API Keys when logs are generated from the REST API.
+empty string when logs are generated from CLI, Erlang console, or an event."""
+
+filter_source_ip.desc:
+"Filter logs based on source ip when logs are generated from dashboard and REST API."
+
+filter_operation_id.desc:
+"Filter log with swagger's operation_id when logs are generated from dashboard and REST API."
+
+filter_operation_type.desc:
+"Filter logs with operation type."
+
+filter_operation_result.desc:
+"Filter logs with operation result."
+
+filter_http_status_code.desc:
+"Filter The HTTP API with response code when logs are generated from dashboard and REST API."
+
+filter_http_method.desc:
+"Filter The HTTP API Request with method when logs are generated from dashboard and REST API."
+
+filter_gte_duration_ms.desc:
+"Filter logs with a duration greater than or equal to given microseconds."
+
+filter_lte_duration_ms.desc:
+"Filter logs with a duration less than or equal to given microseconds."
+
+filter_gte_created_at.desc:
+"Filter logs with a creation time greater than or equal to the given timestamp, rfc3339 or timestamp(millisecond)"
+
+filter_lte_created_at.desc:
+"Filter logs with a creation time less than or equal to the given timestamp, rfc3339 or timestamp(millisecond)"
+
+}