Преглед изворни кода

Support tracing log files for specific topics or clients

terry-xiaoyu пре 7 година
родитељ
комит
92cc171aaf
5 измењених фајлова са 74 додато и 32 уклоњено
  1. 3 0
      etc/emqx.conf
  2. 3 2
      priv/emqx.schema
  3. 1 1
      src/emqx_logger_formatter.erl
  4. 1 1
      src/emqx_protocol.erl
  5. 66 28
      src/emqx_tracer.erl

+ 3 - 0
etc/emqx.conf

@@ -342,6 +342,9 @@ log.to = console
 ##
 ##
 ## Value: debug | info | notice | warning | error | critical | alert | emergency
 ## Value: debug | info | notice | warning | error | critical | alert | emergency
 ##
 ##
+## Note: Only the messages with severity level greater than or equal to
+##       this level will be logged.
+##
 ## Default: error
 ## Default: error
 log.level = error
 log.level = error
 
 

+ 3 - 2
priv/emqx.schema

@@ -429,6 +429,7 @@ end}.
 
 
 {translation, "kernel.logger", fun(Conf) ->
 {translation, "kernel.logger", fun(Conf) ->
     LogTo = cuttlefish:conf_get("log.to", Conf),
     LogTo = cuttlefish:conf_get("log.to", Conf),
+    TopLogLevel = cuttlefish:conf_get("log.level", Conf),
     Formatter = {emqx_logger_formatter,
     Formatter = {emqx_logger_formatter,
                   #{template =>
                   #{template =>
                       [time," [",level,"] ",
                       [time," [",level,"] ",
@@ -449,7 +450,7 @@ end}.
     DefaultHandler =
     DefaultHandler =
         if LogTo =:= console orelse LogTo =:= both ->
         if LogTo =:= console orelse LogTo =:= both ->
                 [{handler, default, logger_std_h,
                 [{handler, default, logger_std_h,
-                    #{level => all,
+                    #{level => TopLogLevel,
                       config => #{type => standard_io},
                       config => #{type => standard_io},
                       formatter => Formatter}}];
                       formatter => Formatter}}];
            true ->
            true ->
@@ -460,7 +461,7 @@ end}.
     FileHandler =
     FileHandler =
         if LogTo =:= file orelse LogTo =:= both ->
         if LogTo =:= file orelse LogTo =:= both ->
               [{handler, file, logger_disk_log_h,
               [{handler, file, logger_disk_log_h,
-                    #{level => all,
+                    #{level => TopLogLevel,
                       config => FileConf(cuttlefish:conf_get("log.file", Conf)),
                       config => FileConf(cuttlefish:conf_get("log.file", Conf)),
                       formatter => Formatter,
                       formatter => Formatter,
                       filesync_repeat_interval => 1000}}];
                       filesync_repeat_interval => 1000}}];

+ 1 - 1
src/emqx_logger_formatter.erl

@@ -133,7 +133,7 @@ to_string(X,_) when is_list(X) ->
         _ -> io_lib:format(?FormatP,[X])
         _ -> io_lib:format(?FormatP,[X])
     end;
     end;
 to_string(X,_) ->
 to_string(X,_) ->
-    io_lib:format(?FormatP,[X]).
+    io_lib:format("~s",[X]).
 
 
 printable_list([]) ->
 printable_list([]) ->
     false;
     false;

+ 1 - 1
src/emqx_protocol.erl

@@ -287,7 +287,7 @@ process_packet(?CONNECT_PACKET(
                                        client_id   = ClientId,
                                        client_id   = ClientId,
                                        username    = Username,
                                        username    = Username,
                                        password    = Password} = Connect), PState) ->
                                        password    = Password} = Connect), PState) ->
-    emqx_logger:add_proc_metadata(#{client_id => binary_to_list(ClientId)}),
+    emqx_logger:add_proc_metadata(#{client_id => ClientId}),
     %% TODO: Mountpoint...
     %% TODO: Mountpoint...
     %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
     %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
     WillMsg = make_will_msg(Connect),
     WillMsg = make_will_msg(Connect),

+ 66 - 28
src/emqx_tracer.erl

@@ -25,12 +25,20 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
          code_change/3]).
 
 
--record(state, {level, traces}).
+-record(state, {level, org_top_level, traces}).
 
 
--type(trace_who() :: {client | topic, binary()}).
+-type(trace_who() :: {client_id | topic, binary()}).
 
 
 -define(TRACER, ?MODULE).
 -define(TRACER, ?MODULE).
--define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
+-define(FORMAT, {emqx_logger_formatter,
+                  #{template =>
+                      [time," [",level,"] ",
+                       {client_id,
+                          [{peername,
+                              [client_id,"@",peername," "],
+                              [client_id, " "]}],
+                          []},
+                       msg,"\n"]}}).
 
 
 -spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
 -spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
 start_link() ->
 start_link() ->
@@ -40,26 +48,26 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
     %% Dont' trace '$SYS' publish
     %% Dont' trace '$SYS' publish
     ignore;
     ignore;
 trace(publish, #message{from = From, topic = Topic, payload = Payload})
 trace(publish, #message{from = From, topic = Topic, payload = Payload})
-    when is_binary(From); is_atom(From) ->
+        when is_binary(From); is_atom(From) ->
     emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
     emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Start/Stop trace
 %% Start/Stop trace
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-%% @doc Start to trace client or topic.
+%% @doc Start to trace client_id or topic.
 -spec(start_trace(trace_who(), string()) -> ok | {error, term()}).
 -spec(start_trace(trace_who(), string()) -> ok | {error, term()}).
-start_trace({client, ClientId}, LogFile) ->
-    start_trace({start_trace, {client, ClientId}, LogFile});
+start_trace({client_id, ClientId}, LogFile) ->
+    start_trace({start_trace, {client_id, ClientId}, LogFile});
 start_trace({topic, Topic}, LogFile) ->
 start_trace({topic, Topic}, LogFile) ->
     start_trace({start_trace, {topic, Topic}, LogFile}).
     start_trace({start_trace, {topic, Topic}, LogFile}).
 
 
 start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
 start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
 
 
-%% @doc Stop tracing client or topic.
+%% @doc Stop tracing client_id or topic.
 -spec(stop_trace(trace_who()) -> ok | {error, term()}).
 -spec(stop_trace(trace_who()) -> ok | {error, term()}).
-stop_trace({client, ClientId}) ->
-    gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
+stop_trace({client_id, ClientId}) ->
+    gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
 stop_trace({topic, Topic}) ->
 stop_trace({topic, Topic}) ->
     gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
     gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
 
 
@@ -73,37 +81,45 @@ lookup_traces() ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
 init([]) ->
 init([]) ->
-    {ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}.
+    {ok, #state{level = emqx_config:get_env(trace_level, debug),
+                org_top_level = get_top_level(),
+                traces = #{}}}.
 
 
 handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
 handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
-    case catch logger:trace_file(LogFile, [Who], Level, ?OPTIONS) of
-        {ok, exists} ->
-            {reply, {error, already_exists}, State};
-        {ok, Trace} ->
-            {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}};
+    case logger:add_handler(handler_id(Who), logger_disk_log_h,
+                                #{level => Level,
+                                  formatter => ?FORMAT,
+                                  filesync_repeat_interval => 1000,
+                                  config => #{type => halt, file => LogFile},
+                                  filter_default => stop,
+                                  filters => [{meta_key_filter,
+                                               {fun filter_by_meta_key/2, Who} }]}) of
+        ok ->
+            set_top_level(all), % open the top logger level to 'all'
+            emqx_logger:info("[Tracer] start trace for ~p", [Who]),
+            {reply, ok, State#state{traces = maps:put(Who, LogFile, Traces)}};
         {error, Reason} ->
         {error, Reason} ->
-            emqx_logger:error("[Tracer] trace error: ~p", [Reason]),
-            {reply, {error, Reason}, State};
-        {'EXIT', Error} ->
-            emqx_logger:error("[Tracer] trace exit: ~p", [Error]),
-            {reply, {error, Error}, State}
+            emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]),
+            {reply, {error, Reason}, State}
     end;
     end;
 
 
-handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
+handle_call({stop_trace, Who}, _From, State = #state{org_top_level = OrgTopLevel, traces = Traces}) ->
     case maps:find(Who, Traces) of
     case maps:find(Who, Traces) of
-        {ok, {Trace, _LogFile}} ->
-            case logger:stop_trace(Trace) of
-                ok -> ok;
-                {error, Error} ->
-                    emqx_logger:error("[Tracer] stop trace ~p error: ~p", [Who, Error])
+        {ok, _LogFile} ->
+            case logger:remove_handler(handler_id(Who)) of
+                ok ->
+                    emqx_logger:info("[Tracer] stop trace for ~p", [Who]);
+                {error, Reason} ->
+                    emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason])
             end,
             end,
+            set_top_level(OrgTopLevel), % reset the top logger level to original value
             {reply, ok, State#state{traces = maps:remove(Who, Traces)}};
             {reply, ok, State#state{traces = maps:remove(Who, Traces)}};
         error ->
         error ->
             {reply, {error, not_found}, State}
             {reply, {error, not_found}, State}
     end;
     end;
 
 
 handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
 handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
-    {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(Traces)], State};
+    {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
 
 
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
     emqx_logger:error("[Tracer] unexpected call: ~p", [Req]),
     emqx_logger:error("[Tracer] unexpected call: ~p", [Req]),
@@ -123,3 +139,25 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
 
 
+handler_id({topic, Topic}) ->
+    list_to_atom("topic_" ++ binary_to_list(Topic));
+handler_id({client_id, ClientId}) ->
+    list_to_atom("clientid_" ++ binary_to_list(ClientId)).
+
+get_top_level() ->
+    #{level := OrgTopLevel} = logger:get_primary_config(),
+    OrgTopLevel.
+
+set_top_level(Level) ->
+    logger:set_primary_config(level, Level).
+
+filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
+    case maps:find(MetaKey, Meta) of
+        {ok, MetaValue} -> LogEvent;
+        {ok, Topic} when MetaKey =:= topic ->
+            case emqx_topic:match(Topic, MetaValue) of
+                true -> LogEvent;
+                false -> ignore
+            end;
+        _ -> ignore
+    end.