Bläddra i källkod

Make emqx_tracer more reliable

terry-xiaoyu 6 år sedan
förälder
incheckning
0c24bfd78c
3 ändrade filer med 79 tillägg och 121 borttagningar
  1. 1 2
      src/emqx_kernel_sup.erl
  2. 73 115
      src/emqx_tracer.erl
  3. 5 4
      test/emqx_tracer_SUITE.erl

+ 1 - 2
src/emqx_kernel_sup.erl

@@ -32,8 +32,7 @@ init([]) ->
            child_spec(emqx_stats, worker),
            child_spec(emqx_stats, worker),
            child_spec(emqx_metrics, worker),
            child_spec(emqx_metrics, worker),
            child_spec(emqx_ctl, worker),
            child_spec(emqx_ctl, worker),
-           child_spec(emqx_zone, worker),
-           child_spec(emqx_tracer, worker)]}}.
+           child_spec(emqx_zone, worker)]}}.
 
 
 child_spec(M, worker) ->
 child_spec(M, worker) ->
     #{id       => M,
     #{id       => M,

+ 73 - 115
src/emqx_tracer.erl

@@ -16,34 +16,19 @@
 
 
 -module(emqx_tracer).
 -module(emqx_tracer).
 
 
--behaviour(gen_server).
-
 -include("emqx.hrl").
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("logger.hrl").
 
 
 -logger_header("[Tracer]").
 -logger_header("[Tracer]").
 
 
 %% APIs
 %% APIs
--export([start_link/0]).
-
 -export([ trace/2
 -export([ trace/2
         , start_trace/3
         , start_trace/3
         , lookup_traces/0
         , lookup_traces/0
         , stop_trace/1
         , stop_trace/1
         ]).
         ]).
 
 
-%% gen_server callbacks
--export([ init/1
-        , handle_call/3
-        , handle_cast/2
-        , handle_info/2
-        , terminate/2
-        , code_change/3
-        ]).
-
--record(state, {traces}).
-
--type(trace_who() :: {client_id | topic, binary()}).
+-type(trace_who() :: {client_id | topic, binary() | list()}).
 
 
 -define(TRACER, ?MODULE).
 -define(TRACER, ?MODULE).
 -define(FORMAT, {emqx_logger_formatter,
 -define(FORMAT, {emqx_logger_formatter,
@@ -57,120 +42,100 @@
                               [peername," "],
                               [peername," "],
                               []}]},
                               []}]},
                        msg,"\n"]}}).
                        msg,"\n"]}}).
+-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
+-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
+-define(TOPIC_TRACE(T), {topic,T}).
+-define(CLIENT_TRACE(C), {client_id,C}).
+
+-define(is_log_level(L),
+        L =:= emergency orelse
+        L =:= alert orelse
+        L =:= critical orelse
+        L =:= error orelse
+        L =:= warning orelse
+        L =:= notice orelse
+        L =:= info orelse
+        L =:= debug).
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% APIs
 %% APIs
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
-
--spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
-start_link() ->
-    gen_server:start_link({local, ?TRACER}, ?MODULE, [], []).
-
 trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
 trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
-    %% Dont' trace '$SYS' publish
+    %% Do not trace '$SYS' publish
     ignore;
     ignore;
 trace(publish, #message{from = From, topic = Topic, payload = Payload})
 trace(publish, #message{from = From, topic = Topic, payload = Payload})
         when is_binary(From); is_atom(From) ->
         when is_binary(From); is_atom(From) ->
     emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
     emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
 
 
-%%------------------------------------------------------------------------------
-%% Start/Stop trace
-%%------------------------------------------------------------------------------
-
 %% @doc Start to trace client_id or topic.
 %% @doc Start to trace client_id or topic.
 -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
 -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
-start_trace({client_id, ClientId}, Level, LogFile) ->
-    do_start_trace({client_id, ClientId}, Level, LogFile);
-start_trace({topic, Topic}, Level, LogFile) ->
-    do_start_trace({topic, Topic}, Level, LogFile).
-
-do_start_trace(Who, Level, LogFile) ->
-    #{level := PrimaryLevel} = logger:get_primary_config(),
-    try logger:compare_levels(log_level(Level), PrimaryLevel) of
-        lt ->
-            {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
-        _GtOrEq ->
-            gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000)
-    catch
-        _:Error ->
-           {error, Error}
+start_trace(Who, all, LogFile) ->
+    start_trace(Who, debug, LogFile);
+start_trace(Who, Level, LogFile) ->
+    case ?is_log_level(Level) of
+        true ->
+            #{level := PrimaryLevel} = logger:get_primary_config(),
+            try logger:compare_levels(Level, PrimaryLevel) of
+                lt ->
+                    {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
+                _GtOrEq ->
+                    install_trace_handler(Who, Level, LogFile)
+            catch
+                _:Error ->
+                {error, Error}
+            end;
+        false -> {error, {invalid_log_level, Level}}
     end.
     end.
 
 
 %% @doc Stop tracing client_id or topic.
 %% @doc Stop tracing client_id or topic.
 -spec(stop_trace(trace_who()) -> ok | {error, term()}).
 -spec(stop_trace(trace_who()) -> ok | {error, term()}).
-stop_trace({client_id, ClientId}) ->
-    gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
-stop_trace({topic, Topic}) ->
-    gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
+stop_trace(Who) ->
+    uninstall_trance_handler(Who).
 
 
 %% @doc Lookup all traces
 %% @doc Lookup all traces
 -spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
 -spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
 lookup_traces() ->
 lookup_traces() ->
-    gen_server:call(?TRACER, lookup_traces).
+    lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()).
 
 
-%%------------------------------------------------------------------------------
-%% gen_server callbacks
-%%------------------------------------------------------------------------------
-
-init([]) ->
-    {ok, #state{traces = #{}}}.
-
-handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = Traces}) ->
+install_trace_handler(Who, Level, LogFile) ->
     case logger:add_handler(handler_id(Who), logger_disk_log_h,
     case logger:add_handler(handler_id(Who), logger_disk_log_h,
-                                #{level => Level,
-                                  formatter => ?FORMAT,
-                                  filesync_repeat_interval => no_repeat,
-                                  config => #{type => halt, file => LogFile},
-                                  filter_default => stop,
-                                  filters => [{meta_key_filter,
-                                              {fun filter_by_meta_key/2, Who} }]}) of
+                            #{level => Level,
+                              formatter => ?FORMAT,
+                              filesync_repeat_interval => no_repeat,
+                              config => #{type => halt, file => LogFile},
+                              filter_default => stop,
+                              filters => [{meta_key_filter,
+                                          {fun filter_by_meta_key/2, Who}}]})
+    of
         ok ->
         ok ->
-            ?LOG(info, "Start trace for ~p", [Who]),
-            {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
+            ?LOG(info, "Start trace for ~p", [Who]);
         {error, Reason} ->
         {error, Reason} ->
             ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
             ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
-            {reply, {error, Reason}, State}
-    end;
-
-handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
-    case maps:find(Who, Traces) of
-        {ok, _LogFile} ->
-            case logger:remove_handler(handler_id(Who)) of
-                ok ->
-                    ?LOG(info, "Stop trace for ~p", [Who]);
-                {error, Reason} ->
-                    ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason])
-            end,
-            {reply, ok, State#state{traces = maps:remove(Who, Traces)}};
-        error ->
-            {reply, {error, not_found}, State}
-    end;
-
-handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
-    {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
-
-handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
-    {reply, ignored, State}.
-
-handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
-    {noreply, State}.
-
-handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-handler_id({topic, Topic}) ->
-    list_to_atom("topic_" ++ binary_to_list(Topic));
-handler_id({client_id, ClientId}) ->
-    list_to_atom("clientid_" ++ binary_to_list(ClientId)).
+            {error, Reason}
+    end.
+
+uninstall_trance_handler(Who) ->
+    case logger:remove_handler(handler_id(Who)) of
+        ok ->
+            ?LOG(info, "Stop trace for ~p", [Who]);
+        {error, Reason} ->
+            ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
+            {error, Reason}
+    end.
+
+filter_traces({Id, Level, Dst}, Acc) ->
+    case atom_to_list(Id) of
+        ?TOPIC_TRACE_ID(T)->
+            [{?TOPIC_TRACE(T), {Level,Dst}} | Acc];
+        ?CLIENT_TRACE_ID(C) ->
+            [{?CLIENT_TRACE(C), {Level,Dst}} | Acc];
+        _ -> Acc
+    end.
+
+handler_id(?TOPIC_TRACE(Topic)) ->
+    list_to_atom(?TOPIC_TRACE_ID(str(Topic)));
+handler_id(?CLIENT_TRACE(ClientId)) ->
+    list_to_atom(?CLIENT_TRACE_ID(str(ClientId))).
 
 
 filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
 filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
     case maps:find(MetaKey, Meta) of
     case maps:find(MetaKey, Meta) of
@@ -183,13 +148,6 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
         _ -> ignore
         _ -> ignore
     end.
     end.
 
 
-log_level(emergency) -> emergency;
-log_level(alert) -> alert;
-log_level(critical) -> critical;
-log_level(error) -> error;
-log_level(warning) -> warning;
-log_level(notice) -> notice;
-log_level(info) -> info;
-log_level(debug) -> debug;
-log_level(all) -> debug;
-log_level(_) -> throw(invalid_log_level).
+str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
+str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
+str(Str) when is_list(Str) -> Str.

+ 5 - 4
test/emqx_tracer_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
+
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 
 
 all() -> [t_start_traces].
 all() -> [t_start_traces].
@@ -44,7 +45,7 @@ t_start_traces(_Config) ->
     emqx_logger:set_log_level(debug),
     emqx_logger:set_log_level(debug),
     ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
     ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
     ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
     ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
-    {error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
+    {error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
     ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
     ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
     ct:sleep(100),
     ct:sleep(100),
 
 
@@ -54,9 +55,9 @@ t_start_traces(_Config) ->
     ?assert(filelib:is_regular("tmp/topic_trace.log")),
     ?assert(filelib:is_regular("tmp/topic_trace.log")),
 
 
     %% Get current traces
     %% Get current traces
-    ?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}},
-                  {{client_id,<<"client2">>},{all,"tmp/client2.log"}},
-                  {{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
+    ?assertEqual([{{client_id,"client"},{debug,"tmp/client.log"}},
+                  {{client_id,"client2"},{debug,"tmp/client2.log"}},
+                  {{topic,"a/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
 
 
     %% set the overall log level to debug
     %% set the overall log level to debug
     emqx_logger:set_log_level(debug),
     emqx_logger:set_log_level(debug),