|
|
@@ -12,9 +12,9 @@
|
|
|
|
|
|
%% API
|
|
|
-export([start_link/0]).
|
|
|
--export([log/1, log/2]).
|
|
|
+-export([log/3]).
|
|
|
|
|
|
--export([dirty_clean_expired/1]).
|
|
|
+-export([trans_clean_expired/2]).
|
|
|
|
|
|
%% gen_server callbacks
|
|
|
-export([
|
|
|
@@ -32,7 +32,7 @@
|
|
|
-ifdef(TEST).
|
|
|
-define(INTERVAL, 100).
|
|
|
-else.
|
|
|
--define(INTERVAL, 2500).
|
|
|
+-define(INTERVAL, 10000).
|
|
|
-endif.
|
|
|
|
|
|
to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
|
|
|
@@ -50,8 +50,6 @@ to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
|
|
|
http_method = <<"">>,
|
|
|
http_request = <<"">>
|
|
|
};
|
|
|
-to_audit(#{http_method := get}) ->
|
|
|
- ok;
|
|
|
to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api ->
|
|
|
#{
|
|
|
source := Source,
|
|
|
@@ -82,23 +80,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api
|
|
|
duration_ms = DurationMs,
|
|
|
args = <<"">>
|
|
|
};
|
|
|
-to_audit(#{from := event, event := Event}) ->
|
|
|
- #?AUDIT{
|
|
|
- from = event,
|
|
|
- source = <<"">>,
|
|
|
- source_ip = <<"">>,
|
|
|
- %% operation info
|
|
|
- operation_id = iolist_to_binary(Event),
|
|
|
- operation_type = <<"">>,
|
|
|
- operation_result = <<"">>,
|
|
|
- failure = <<"">>,
|
|
|
- %% request detail
|
|
|
- http_status_code = <<"">>,
|
|
|
- http_method = <<"">>,
|
|
|
- http_request = <<"">>,
|
|
|
- duration_ms = 0,
|
|
|
- args = <<"">>
|
|
|
- };
|
|
|
to_audit(#{from := erlang_console, function := F, args := Args}) ->
|
|
|
#?AUDIT{
|
|
|
from = erlang_console,
|
|
|
@@ -117,15 +98,22 @@ to_audit(#{from := erlang_console, function := F, args := Args}) ->
|
|
|
args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args]))
|
|
|
}.
|
|
|
|
|
|
-log(_Level, undefined) ->
|
|
|
+log(_Level, undefined, _Handler) ->
|
|
|
ok;
|
|
|
-log(Level, Meta1) ->
|
|
|
+log(Level, Meta1, Handler) ->
|
|
|
Meta2 = Meta1#{time => logger:timestamp(), level => Level},
|
|
|
- Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
|
|
|
- emqx_trace:log(Level, Filter, undefined, Meta2),
|
|
|
- emqx_audit:log(Meta2).
|
|
|
+ log_to_file(Level, Meta2, Handler),
|
|
|
+ log_to_db(Meta2),
|
|
|
+ remove_handler_when_disabled().
|
|
|
+
|
|
|
+remove_handler_when_disabled() ->
|
|
|
+ case emqx_config:get([log, audit, enable], false) of
|
|
|
+ true -> ok;
|
|
|
+ false -> _ = logger:remove_handler(?AUDIT_HANDLER)
|
|
|
+ end,
|
|
|
+ ok.
|
|
|
|
|
|
-log(Log) ->
|
|
|
+log_to_db(Log) ->
|
|
|
Audit0 = to_audit(Log),
|
|
|
Audit = Audit0#?AUDIT{
|
|
|
node = node(),
|
|
|
@@ -144,68 +132,112 @@ init([]) ->
|
|
|
{record_name, ?AUDIT},
|
|
|
{attributes, record_info(fields, ?AUDIT)}
|
|
|
]),
|
|
|
- case mria_rlog:role() of
|
|
|
- core -> {ok, #{}, {continue, setup}};
|
|
|
- _ -> {ok, #{}}
|
|
|
- end.
|
|
|
+ {ok, #{}, {continue, setup}}.
|
|
|
|
|
|
handle_continue(setup, State) ->
|
|
|
ok = mria:wait_for_tables([?AUDIT]),
|
|
|
- clean_expired(),
|
|
|
- Interval = clean_expired_interval(),
|
|
|
- {noreply, State#{interval => Interval}, Interval}.
|
|
|
-
|
|
|
-handle_call(_Request, _From, State = #{interval := Interval}) ->
|
|
|
- {reply, ignore, State, Interval}.
|
|
|
-
|
|
|
-handle_cast(_Request, State = #{interval := Interval}) ->
|
|
|
- {noreply, State, Interval}.
|
|
|
-
|
|
|
-handle_info(timeout, State = #{interval := Interval}) ->
|
|
|
- clean_expired(),
|
|
|
- {noreply, State, Interval};
|
|
|
-handle_info(_Info, State = #{interval := Interval}) ->
|
|
|
- {noreply, State, Interval}.
|
|
|
-
|
|
|
-terminate(_Reason, _State = #{}) ->
|
|
|
+ NewState = State#{role => mria_rlog:role()},
|
|
|
+ ?AUDIT(alert, #{
|
|
|
+ cmd => emqx,
|
|
|
+ args => ["start"],
|
|
|
+ version => emqx_release:version(),
|
|
|
+ from => cli,
|
|
|
+ duration_ms => 0
|
|
|
+ }),
|
|
|
+ {noreply, NewState, interval(NewState)}.
|
|
|
+
|
|
|
+handle_call(_Request, _From, State) ->
|
|
|
+ {reply, ignore, State, interval(State)}.
|
|
|
+
|
|
|
+handle_cast(_Request, State) ->
|
|
|
+ {noreply, State, interval(State)}.
|
|
|
+
|
|
|
+handle_info(timeout, State) ->
|
|
|
+ ExtraWait = clean_expired_logs(),
|
|
|
+ {noreply, State, interval(State) + ExtraWait};
|
|
|
+handle_info(_Info, State) ->
|
|
|
+ {noreply, State, interval(State)}.
|
|
|
+
|
|
|
+terminate(_Reason, _State) ->
|
|
|
ok.
|
|
|
|
|
|
-code_change(_OldVsn, State = #{}, _Extra) ->
|
|
|
+code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
%%%===================================================================
|
|
|
%%% Internal functions
|
|
|
%%%===================================================================
|
|
|
|
|
|
-clean_expired() ->
|
|
|
+%% if clean_expired transaction aborted, it will be scheduled with extra 60 seconds.
|
|
|
+clean_expired_logs() ->
|
|
|
MaxSize = max_size(),
|
|
|
+ Oldest = mnesia:dirty_first(?AUDIT),
|
|
|
CurSize = mnesia:table_info(?AUDIT, size),
|
|
|
case CurSize - MaxSize of
|
|
|
- DelCount when DelCount > 0 ->
|
|
|
- mria:async_dirty(
|
|
|
- ?COMMON_SHARD,
|
|
|
- fun ?MODULE:dirty_clean_expired/1,
|
|
|
- [DelCount]
|
|
|
- );
|
|
|
+ DelSize when DelSize > 0 ->
|
|
|
+ case
|
|
|
+ mria:transaction(
|
|
|
+ ?COMMON_SHARD,
|
|
|
+ fun ?MODULE:trans_clean_expired/2,
|
|
|
+ [Oldest, DelSize]
|
|
|
+ )
|
|
|
+ of
|
|
|
+ {atomic, ok} ->
|
|
|
+ 0;
|
|
|
+ {aborted, Reason} ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => "clean_expired_audit_aborted",
|
|
|
+ reason => Reason,
|
|
|
+ delete_size => DelSize,
|
|
|
+ current_size => CurSize,
|
|
|
+ max_count => MaxSize
|
|
|
+ }),
|
|
|
+ 60000
|
|
|
+ end;
|
|
|
_ ->
|
|
|
- ok
|
|
|
+ 0
|
|
|
end.
|
|
|
|
|
|
-dirty_clean_expired(DelCount) ->
|
|
|
- dirty_clean_expired(mnesia:dirty_first(?AUDIT), DelCount).
|
|
|
+trans_clean_expired(Oldest, DelCount) ->
|
|
|
+ First = mnesia:first(?AUDIT),
|
|
|
+ %% Other node already clean from the oldest record.
|
|
|
+ %% ensure not delete twice, otherwise records that should not be deleted will be deleted.
|
|
|
+ case First =:= Oldest of
|
|
|
+ true -> do_clean_expired(First, DelCount);
|
|
|
+ false -> ok
|
|
|
+ end.
|
|
|
|
|
|
-dirty_clean_expired(_, DelCount) when DelCount =< 0 -> ok;
|
|
|
-dirty_clean_expired('$end_of_table', _DelCount) ->
|
|
|
+do_clean_expired(_, DelSize) when DelSize =< 0 -> ok;
|
|
|
+do_clean_expired('$end_of_table', _DelSize) ->
|
|
|
ok;
|
|
|
-dirty_clean_expired(CurKey, DeleteCount) ->
|
|
|
- mnesia:dirty_delete(?AUDIT, CurKey),
|
|
|
- dirty_clean_expired(mnesia:dirty_next(?AUDIT, CurKey), DeleteCount - 1).
|
|
|
+do_clean_expired(CurKey, DeleteSize) ->
|
|
|
+ mnesia:delete(?AUDIT, CurKey, sticky_write),
|
|
|
+ do_clean_expired(mnesia:next(?AUDIT, CurKey), DeleteSize - 1).
|
|
|
|
|
|
max_size() ->
|
|
|
emqx_conf:get([log, audit, max_filter_size], 5000).
|
|
|
|
|
|
-%% Try to make the time interval of each node is different.
|
|
|
-%% 2 * Interval ~ 3 * Interval (5000~7500)
|
|
|
-clean_expired_interval() ->
|
|
|
- Interval = ?INTERVAL,
|
|
|
- Interval * 2 + erlang:phash2(node(), Interval).
|
|
|
+interval(#{role := replicant}) -> hibernate;
|
|
|
+interval(#{role := core}) -> ?INTERVAL + rand:uniform(?INTERVAL).
|
|
|
+
|
|
|
+log_to_file(Level, Meta, #{module := Module} = Handler) ->
|
|
|
+ Log = #{level => Level, meta => Meta, msg => undefined},
|
|
|
+ Handler1 = maps:without(?OWN_KEYS, Handler),
|
|
|
+ try
|
|
|
+ erlang:apply(Module, log, [Log, Handler1])
|
|
|
+ catch
|
|
|
+ C:R:S ->
|
|
|
+ case logger:remove_handler(?AUDIT_HANDLER) of
|
|
|
+ ok ->
|
|
|
+ logger:internal_log(
|
|
|
+ error, {removed_failing_handler, ?AUDIT_HANDLER, C, R, S}
|
|
|
+ );
|
|
|
+ {error, {not_found, _}} ->
|
|
|
+ ok;
|
|
|
+ {error, Reason} ->
|
|
|
+ logger:internal_log(
|
|
|
+ error,
|
|
|
+ {removed_handler_failed, ?AUDIT_HANDLER, Reason, C, R, S}
|
|
|
+ )
|
|
|
+ end
|
|
|
+ end.
|