Просмотр исходного кода

Merge pull request #6452 from zhongwencool/log-trace-api

feat(trace): trace http api schema
zhongwencool 4 года назад
Родитель
Коммит
8be2aaf72c

+ 70 - 66
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -20,10 +20,8 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 
-%% Mnesia bootstrap
--export([mnesia/1]).
-
 -boot_mnesia({mnesia, [boot]}).
+-export([mnesia/1]).
 
 -export([ publish/1
         , subscribe/3
@@ -54,20 +52,22 @@
 -define(MAX_SIZE, 30).
 
 -ifdef(TEST).
--export([log_file/2]).
+-export([ log_file/2
+        , find_closest_time/2
+       ]).
 -endif.
 
 -export_type([ip_address/0]).
 -type ip_address() :: string().
 
--record(?TRACE,
-        { name :: binary() | undefined | '_'
-        , type :: clientid | topic | ip_address | undefined | '_'
-        , filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
-        , enable = true :: boolean() | '_'
-        , start_at :: integer() | undefined | '_'
-        , end_at :: integer() | undefined | '_'
-        }).
+-record(?TRACE, {
+         name :: binary() | undefined | '_'
+       , type :: clientid | topic | ip_address | undefined | '_'
+       , filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
+       , enable = true :: boolean() | '_'
+       , start_at :: integer() | undefined | '_'
+       , end_at :: integer() | undefined | '_'
+       }).
 
 mnesia(boot) ->
     ok = mria:create_table(?TRACE, [
@@ -205,14 +205,14 @@ init([]) ->
     {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{unexpected_call => Req}),
     {reply, ok, State}.
 
 handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) ->
     erlang:monitor(process, Pid),
     {noreply, State#{monitors => Monitors#{Pid => Files}}};
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{unexpected_cast => Msg}),
     {noreply, State}.
 
 handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) ->
@@ -234,7 +234,7 @@ handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
     handle_info({timeout, TRef, update_trace}, State);
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{unexpected_info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
@@ -279,24 +279,22 @@ stop_all_trace_handler() ->
     lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
         emqx_trace_handler:running()).
 get_enable_trace() ->
-    {atomic, Traces} =
-        mria:transaction(?COMMON_SHARD, fun() ->
-            mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
-                           end),
-    Traces.
+    transaction(fun() ->
+        mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
+                end).
 
 find_closest_time(Traces, Now) ->
     Sec =
         lists:foldl(
-            fun(#?TRACE{start_at = Start, end_at = End}, Closest)
-                when Start >= Now andalso Now < End -> %% running
-                min(End - Now, Closest);
-                (#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
-                    min(Now - Start, Closest);
-                (_, Closest) -> Closest %% finished
+            fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
+                min(closest(End, Now, Closest), closest(Start, Now, Closest));
+                (_, Closest) -> Closest
             end, 60 * 15, Traces),
     timer:seconds(Sec).
 
+closest(Time, Now, Closest) when Now >= Time -> Closest;
+closest(Time, Now, Closest) -> min(Time - Now, Closest).
+
 disable_finished([]) -> ok;
 disable_finished(Traces) ->
     transaction(fun() ->
@@ -367,29 +365,31 @@ classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
     classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
 
 to_trace(TraceParam) ->
-    case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of
+    case to_trace(ensure_map(TraceParam), #?TRACE{}) of
         {error, Reason} -> {error, Reason};
         {ok, #?TRACE{name = undefined}} ->
             {error, "name required"};
         {ok, #?TRACE{type = undefined}} ->
             {error, "type=[topic,clientid,ip_address] required"};
-        {ok, #?TRACE{filter = undefined}} ->
-            {error, "topic/clientid/ip_address filter required"};
-        {ok, TraceRec0} ->
+        {ok, TraceRec0 = #?TRACE{}} ->
             case fill_default(TraceRec0) of
                 #?TRACE{start_at = Start, end_at = End} when End =< Start ->
                     {error, "failed by start_at >= end_at"};
-                TraceRec -> {ok, TraceRec}
+                TraceRec ->
+                    {ok, TraceRec}
             end
     end.
 
-ensure_proplists(#{} = Trace) -> maps:to_list(Trace);
-ensure_proplists(Trace) when is_list(Trace) ->
+ensure_map(#{} = Trace) ->
+    maps:fold(fun(K, V, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
+        (K, V, Acc) when is_atom(K) -> Acc#{K => V}
+              end, #{}, Trace);
+ensure_map(Trace) when is_list(Trace) ->
     lists:foldl(
-        fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc];
-            ({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc];
+        fun({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V};
+            ({K, V}, Acc) when is_atom(K) -> Acc#{K => V};
             (_, Acc) -> Acc
-        end, [], Trace).
+        end, #{}, Trace).
 
 fill_default(Trace = #?TRACE{start_at = undefined}) ->
     fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
@@ -397,49 +397,47 @@ fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
     fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
 fill_default(Trace) -> Trace.
 
-to_trace([], Rec) -> {ok, Rec};
-to_trace([{name, Name} | Trace], Rec) ->
-    case io_lib:printable_unicode_list(unicode:characters_to_list(Name, utf8)) of
-        true ->
-            case binary:match(Name, [<<"/">>], []) of
-                nomatch -> to_trace(Trace, Rec#?TRACE{name = Name});
-                _ -> {error, "name cannot contain /"}
-            end;
-        false -> {error, "name must printable unicode"}
-    end;
-to_trace([{type, Type} | Trace], Rec) ->
-    case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of
-        true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)});
-        false -> {error, "incorrect type: only support clientid/topic/ip_address"}
+-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
+
+to_trace(#{name := Name} = Trace, Rec) ->
+    case re:run(Name, ?NAME_RE) of
+        nomatch -> {error, "Name should be " ?NAME_RE};
+        _ -> to_trace(maps:remove(name, Trace), Rec#?TRACE{name = Name})
     end;
-to_trace([{topic, Topic} | Trace], Rec) ->
-    case validate_topic(Topic) of
-        ok -> to_trace(Trace, Rec#?TRACE{filter = Topic});
-        {error, Reason} -> {error, Reason}
+to_trace(#{type := clientid, clientid := Filter} = Trace, Rec) ->
+    Trace0 = maps:without([type, clientid], Trace),
+    to_trace(Trace0, Rec#?TRACE{type = clientid, filter = Filter});
+to_trace(#{type := topic, topic := Filter} = Trace, Rec) ->
+    case validate_topic(Filter) of
+        ok ->
+            Trace0 = maps:without([type, topic], Trace),
+            to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter});
+        Error -> Error
     end;
-to_trace([{clientid, ClientId} | Trace], Rec) ->
-    to_trace(Trace, Rec#?TRACE{filter = ClientId});
-to_trace([{ip_address, IP} | Trace], Rec) ->
-    case inet:parse_address(binary_to_list(IP)) of
-        {ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)});
-        {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
+to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
+    case validate_ip_address(Filter) of
+        ok ->
+            Trace0 = maps:without([type, ip_address], Trace),
+            to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter});
+        Error -> Error
     end;
-to_trace([{start_at, StartAt} | Trace], Rec) ->
+to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])};
+to_trace(#{start_at := StartAt} = Trace, Rec) ->
     case to_system_second(StartAt) of
-        {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec});
+        {ok, Sec} -> to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
         {error, Reason} -> {error, Reason}
     end;
-to_trace([{end_at, EndAt} | Trace], Rec) ->
+to_trace(#{end_at := EndAt} = Trace, Rec) ->
     Now = erlang:system_time(second),
     case to_system_second(EndAt) of
         {ok, Sec} when Sec > Now ->
-            to_trace(Trace, Rec#?TRACE{end_at = Sec});
+            to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec});
         {ok, _Sec} ->
             {error, "end_at time has already passed"};
         {error, Reason} ->
             {error, Reason}
     end;
-to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}.
+to_trace(_, Rec) -> {ok, Rec}.
 
 validate_topic(TopicName) ->
     try emqx_topic:validate(filter, TopicName) of
@@ -448,11 +446,17 @@ validate_topic(TopicName) ->
         error:Error ->
             {error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])}
     end.
+validate_ip_address(IP) ->
+    case inet:parse_address(binary_to_list(IP)) of
+        {ok, _} -> ok;
+        {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
+    end.
 
 to_system_second(At) ->
     try
         Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]),
-        {ok, Sec}
+        Now = erlang:system_time(second),
+        {ok, erlang:max(Now, Sec)}
     catch error: {badmatch, _} ->
         {error, ["The rfc3339 specification not satisfied: ", At]}
     end.

+ 22 - 4
apps/emqx/src/emqx_trace/emqx_trace_api.erl

@@ -107,10 +107,23 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
         case Res of
             {ok, Node, Bin} ->
                 ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
-                ok = file:write_file(ZipName, Bin),
-                [Node ++ "-" ++ TraceLog | Acc];
+                case file:write_file(ZipName, Bin) of
+                ok -> [Node ++ "-" ++ TraceLog | Acc];
+                Error ->
+                    ?SLOG(error, #{
+                        msg => "write_file_failed",
+                        error => Error,
+                        zip_name => ZipName,
+                        byte_size => byte_size(Bin)}),
+                    Acc
+                end;
             {error, Node, Reason} ->
-                ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
+                ?SLOG(error, #{
+                    msg => "download_trace_log_failed",
+                    node => Node,
+                    trace_log => TraceLog,
+                    reason => Reason
+                }),
                 Acc
         end
                 end, [], TraceFiles).
@@ -121,7 +134,12 @@ collect_trace_file(TraceLog) ->
 cluster_call(Mod, Fun, Args, Timeout) ->
     Nodes = mria_mnesia:running_nodes(),
     {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
-    BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
+    BadNodes =/= [] andalso
+        ?SLOG(error, #{
+            msg => "rpc_call_failed",
+            bad_nodes => BadNodes,
+            mfa => {Mod, Fun, Args }
+        }),
     GoodRes.
 
 stream_log_file(#{name := Name}, Params) ->

+ 54 - 44
apps/emqx/src/emqx_trace/emqx_trace_handler.erl

@@ -38,26 +38,10 @@
 -export([handler_id/2]).
 
 -type tracer() :: #{
-                    name := binary(),
-                    type := clientid | topic | ip_address,
-                    filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
-                   }.
-
--define(FORMAT,
-    {logger_formatter, #{
-        template => [
-            time, " [", level, "] ",
-            {clientid,
-                [{peername, [clientid, "@", peername, " "], [clientid, " "]}],
-                [{peername, [peername, " "], []}]
-            },
-            msg, "\n"
-        ],
-        single_line => false,
-        max_size => unlimited,
-        depth => unlimited
-    }}
-).
+         name := binary(),
+         type := clientid | topic | ip_address,
+         filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
+         }.
 
 -define(CONFIG(_LogFile_), #{
     type => halt,
@@ -68,25 +52,25 @@
     overload_kill_qlen => 20000,
     %% disable restart
     overload_kill_restart_after => infinity
-    }).
+}).
 
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
 
 -spec install(Name :: binary() | list(),
-              Type :: clientid | topic | ip_address,
-              Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
-              Level :: logger:level() | all,
-              LogFilePath :: string()) -> ok | {error, term()}.
+    Type :: clientid | topic | ip_address,
+    Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
+    Level :: logger:level() | all,
+    LogFilePath :: string()) -> ok | {error, term()}.
 install(Name, Type, Filter, Level, LogFile) ->
     Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)},
     install(Who, Level, LogFile).
 
 -spec install(Type :: clientid | topic | ip_address,
-              Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
-              Level :: logger:level() | all,
-              LogFilePath :: string()) -> ok | {error, term()}.
+    Filter :: emqx_types:clientid() | emqx_types:topic() | string(),
+    Level :: logger:level() | all,
+    LogFilePath :: string()) -> ok | {error, term()}.
 install(Type, Filter, Level, LogFile) ->
     install(Filter, Type, Filter, Level, LogFile).
 
@@ -111,7 +95,7 @@ install(Who, Level, LogFile) ->
     end.
 
 -spec uninstall(Type :: clientid | topic | ip_address,
-                Name :: binary() | list()) -> ok | {error, term()}.
+    Name :: binary() | list()) -> ok | {error, term()}.
 uninstall(Type, Name) ->
     HandlerId = handler_id(ensure_bin(Name), Type),
     uninstall(HandlerId).
@@ -126,12 +110,12 @@ uninstall(HandlerId) ->
 -spec running() ->
     [
         #{
-            name => binary(),
-            type => topic | clientid | ip_address,
-            id => atom(),
-            filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
-            level => logger:level(),
-            dst => file:filename() | console | unknown
+        name => binary(),
+        type => topic | clientid | ip_address,
+        id => atom(),
+        filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
+        level => logger:level(),
+        dst => file:filename() | console | unknown
         }
     ].
 running() ->
@@ -159,12 +143,13 @@ filter_ip_address(_Log, _ExpectId) -> ignore.
 
 install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
     HandlerId = handler_id(Name, Type),
-    Config = #{ level => Level,
-                formatter => ?FORMAT,
-                filter_default => stop,
-                filters => filters(Who),
-                config => ?CONFIG(LogFile)
-                },
+    Config = #{
+        level => Level,
+        formatter => formatter(Who),
+        filter_default => stop,
+        filters => filters(Who),
+        config => ?CONFIG(LogFile)
+    },
     Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
     show_prompts(Res, Who, "Start trace"),
     Res.
@@ -176,11 +161,36 @@ filters(#{type := topic, filter := Filter, name := Name}) ->
 filters(#{type := ip_address, filter := Filter, name := Name}) ->
     [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
 
+formatter(#{type := Type}) ->
+    {logger_formatter,
+        #{
+            template => template(Type),
+            single_line => false,
+            max_size => unlimited,
+            depth => unlimited
+        }
+    }.
+
+%% Don't log clientid: clientid traces only support exact match, so it is the same on every line.
+%% Also, logger_formatter renders a non-latin clientid with `~tp` (it should really be `~ts`),
+%% which makes a utf8 clientid very difficult to read.
+template(clientid) ->
+    [time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"];
+%% TODO better format when clientid is utf8.
+template(_) ->
+    [time, " [", level, "] ",
+        {clientid,
+            [{peername, [clientid, "@", peername, " "], [clientid, " "]}],
+            [{peername, [peername, " "], []}]
+        },
+        msg, "\n"
+    ].
+
 filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
     Init = #{id => Id, level => Level, dst => Dst},
     case Filters of
         [{Type, {_FilterFun, {Filter, Name}}}] when
-                Type =:= topic orelse
+            Type =:= topic orelse
                 Type =:= clientid orelse
                 Type =:= ip_address ->
             [Init#{type => Type, filter => Filter, name => Name} | Acc];
@@ -201,7 +211,7 @@ handler_id(Name, Type) ->
 do_handler_id(Name, Type) ->
     TypeStr = atom_to_list(Type),
     NameStr = unicode:characters_to_list(Name, utf8),
-    FullNameStr = "trace_" ++ TypeStr ++ "_" ++  NameStr,
+    FullNameStr = "trace_" ++ TypeStr ++ "_" ++ NameStr,
     true = io_lib:printable_unicode_list(FullNameStr),
     FullNameBin = unicode:characters_to_binary(FullNameStr, utf8),
     binary_to_atom(FullNameBin, utf8).
@@ -209,7 +219,7 @@ do_handler_id(Name, Type) ->
 ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
 ensure_bin(Bin) when is_binary(Bin) -> Bin.
 
-ensure_list(Bin) when is_binary(Bin) -> binary_to_list(Bin);
+ensure_list(Bin) when is_binary(Bin) -> unicode:characters_to_list(Bin, utf8);
 ensure_list(List) when is_list(List) -> List.
 
 show_prompts(ok, Who, Msg) ->

+ 90 - 62
apps/emqx/test/emqx_trace_SUITE.erl

@@ -22,7 +22,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/emqx.hrl").
--import(emqx_trace_handler_SUITE, [filesync/2]).
+
 -record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}).
 
 %%--------------------------------------------------------------------
@@ -33,15 +33,22 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    application:load(emqx_plugin_libs),
     emqx_common_test_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([]).
 
-t_base_create_delete(_Config) ->
+init_per_testcase(_, Config) ->
     ok = emqx_trace:clear(),
+    reload(),
+    ct:pal("load:~p~n", [erlang:whereis(emqx_trace)]),
+    Config.
+
+end_per_testcase(_TestCase, _Config) -> %% arity 2: Common Test never calls end_per_testcase/1
+    ok.
+
+t_base_create_delete(_Config) ->
     Now = erlang:system_time(second),
     Start = to_rfc3339(Now),
     End = to_rfc3339(Now + 30 * 60),
@@ -49,7 +56,7 @@ t_base_create_delete(_Config) ->
     ClientId = <<"test-device">>,
     Trace = #{
         name => Name,
-        type => <<"clientid">>,
+        type => clientid,
         clientid => ClientId,
         start_at => Start,
         end_at => End
@@ -84,15 +91,14 @@ t_base_create_delete(_Config) ->
     ok.
 
 t_create_size_max(_Config) ->
-    emqx_trace:clear(),
     lists:map(fun(Seq) ->
         Name = list_to_binary("name" ++ integer_to_list(Seq)),
-        Trace = [{name, Name}, {type, <<"topic">>},
+        Trace = [{name, Name}, {type, topic},
             {topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
         ok = emqx_trace:create(Trace)
               end, lists:seq(1, 30)),
     Trace31 = [{<<"name">>, <<"name31">>},
-        {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/31">>}],
+        {<<"type">>, topic}, {<<"topic">>, <<"/x/y/31">>}],
     {error, _} = emqx_trace:create(Trace31),
     ok = emqx_trace:delete(<<"name30">>),
     ok = emqx_trace:create(Trace31),
@@ -100,54 +106,52 @@ t_create_size_max(_Config) ->
     ok.
 
 t_create_failed(_Config) ->
-    ok = emqx_trace:clear(),
-    UnknownField = [{<<"unknown">>, 12}],
+    Name = {<<"name">>, <<"test">>},
+    UnknownField = [Name, {<<"unknown">>, 12}],
     {error, Reason1} = emqx_trace:create(UnknownField),
-    ?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)),
+    ?assertEqual(<<"type=[topic,clientid,ip_address] required">>, iolist_to_binary(Reason1)),
 
-    InvalidTopic = [{<<"topic">>, "#/#//"}],
+    InvalidTopic = [Name, {<<"topic">>, "#/#//"}, {<<"type">>, topic}],
     {error, Reason2} = emqx_trace:create(InvalidTopic),
     ?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
 
-    InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}],
+    InvalidStart = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>},
+        {<<"start_at">>, <<"2021-12-3:12">>}],
     {error, Reason3} = emqx_trace:create(InvalidStart),
     ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
         iolist_to_binary(Reason3)),
 
-    InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}],
+    InvalidEnd = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>},
+        {<<"end_at">>, <<"2021-12-3:12">>}],
     {error, Reason4} = emqx_trace:create(InvalidEnd),
     ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
         iolist_to_binary(Reason4)),
 
-    {error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]),
-    ?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)),
+    {error, Reason7} = emqx_trace:create([Name, {<<"type">>, clientid}]),
+    ?assertEqual(<<"required clientid field">>, iolist_to_binary(Reason7)),
 
     InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
-        {<<"type">>, <<"clientid">>}],
+        {<<"type">>, clientid}],
     {error, Reason9} = emqx_trace:create(InvalidPackets4),
-    ?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)),
+    ?assertEqual(<<"Name should be ^[A-Za-z]+[A-Za-z0-9-_]*$">>, iolist_to_binary(Reason9)),
 
     ?assertEqual({error, "type=[topic,clientid,ip_address] required"},
         emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])),
 
-    ?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"},
-        emqx_trace:create([{<<"name">>, <<"test-name">>},
-            {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
-
     ?assertEqual({error, "ip address: einval"},
-        emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])),
+        emqx_trace:create([Name, {<<"type">>, ip_address},
+            {<<"ip_address">>, <<"test-name">>}])),
     ok.
 
 t_create_default(_Config) ->
-    ok = emqx_trace:clear(),
     {error, "name required"} = emqx_trace:create([]),
     ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
-        {<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]),
+        {<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]),
     [#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
     ok = emqx_trace:clear(),
     Trace = [
         {<<"name">>, <<"test-name">>},
-        {<<"type">>, <<"topic">>},
+        {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/z">>},
         {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
         {<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
@@ -156,25 +160,38 @@ t_create_default(_Config) ->
     Now = erlang:system_time(second),
     Trace2 = [
         {<<"name">>, <<"test-name">>},
-        {<<"type">>, <<"topic">>},
+        {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/z">>},
         {<<"start_at">>, to_rfc3339(Now + 10)},
         {<<"end_at">>, to_rfc3339(Now + 3)}
     ],
     {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
     ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
-        {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}]),
+        {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}]),
     [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
     ?assertEqual(10 * 60, End - Start),
     ?assertEqual(true, Start - erlang:system_time(second) < 5),
     ok.
 
-t_update_enable(_Config) ->
+t_create_with_extra_fields(_Config) ->
     ok = emqx_trace:clear(),
+    Trace = [
+        {<<"name">>, <<"test-name">>},
+        {<<"type">>, topic},
+        {<<"topic">>, <<"/x/y/z">>},
+        {<<"clientid">>, <<"dev001">>},
+        {<<"ip_address">>, <<"127.0.0.1">>}
+    ],
+    ok = emqx_trace:create(Trace),
+    ?assertMatch([#emqx_trace{name = <<"test-name">>, filter = <<"/x/y/z">>, type = topic}],
+        emqx_trace:list()),
+    ok.
+
+t_update_enable(_Config) ->
     Name = <<"test-name">>,
     Now = erlang:system_time(second),
     End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
-    ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"topic">>},
+    ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
     [#emqx_trace{enable = Enable}] = emqx_trace:list(),
     ?assertEqual(Enable, true),
@@ -192,16 +209,14 @@ t_update_enable(_Config) ->
     ok.
 
 t_load_state(_Config) ->
-    emqx_trace:clear(),
-    load(),
     Now = erlang:system_time(second),
-    Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
-        {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
-        {<<"end_at">>, to_rfc3339(Now + 2)}],
-    Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
+    Running = #{name => <<"Running">>, type => topic,
+        topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1),
+        end_at => to_rfc3339(Now + 2)},
+    Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
         {<<"end_at">>, to_rfc3339(Now + 8)}],
-    Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>},
+    Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)},
         {<<"end_at">>, to_rfc3339(Now)}],
     ok = emqx_trace:create(Running),
@@ -218,54 +233,48 @@ t_load_state(_Config) ->
     Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
     ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
     ?assertEqual(ExpectEnables2, lists:sort(Enables2)),
-    unload(),
     ok.
 
 t_client_event(_Config) ->
     application:set_env(emqx, allow_anonymous, true),
-    emqx_trace:clear(),
     ClientId = <<"client-test">>,
-    load(),
     Now = erlang:system_time(second),
     Start = to_rfc3339(Now),
     Name = <<"test_client_id_event">>,
     ok = emqx_trace:create([{<<"name">>, Name},
-        {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
-    ct:sleep(200),
+        {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
     {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
     {ok, _} = emqtt:connect(Client),
     emqtt:ping(Client),
     ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
     ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
     ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
-        {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
-    ok = filesync(Name, clientid),
-    ok = filesync(<<"test_topic">>, topic),
+        {<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
+    ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
     {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
     ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
     ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
     ok = emqtt:disconnect(Client),
-    ok = filesync(Name, clientid),
-    ok = filesync(<<"test_topic">>, topic),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
+    ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
     {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
     {ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
     ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
     ?assert(erlang:byte_size(Bin) > 0),
     ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
     ?assert(erlang:byte_size(Bin3) > 0),
-    unload(),
     ok.
 
 t_get_log_filename(_Config) ->
-    ok = emqx_trace:clear(),
-    load(),
     Now = erlang:system_time(second),
     Start = calendar:system_time_to_rfc3339(Now),
     End = calendar:system_time_to_rfc3339(Now + 2),
     Name = <<"name1">>,
     Trace = [
         {<<"name">>, Name},
-        {<<"type">>, <<"ip_address">>},
+        {<<"type">>, ip_address},
         {<<"ip_address">>, <<"127.0.0.1">>},
         {<<"start_at">>, list_to_binary(Start)},
         {<<"end_at">>, list_to_binary(End)}
@@ -275,7 +284,6 @@ t_get_log_filename(_Config) ->
     ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
     ct:sleep(3000),
     ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
-    unload(),
     ok.
 
 t_trace_file(_Config) ->
@@ -291,29 +299,49 @@ t_trace_file(_Config) ->
     ok.
 
 t_download_log(_Config) ->
-    emqx_trace:clear(),
-    load(),
-    ClientId = <<"client-test">>,
+    ClientId = <<"client-test-download">>,
     Now = erlang:system_time(second),
     Start = to_rfc3339(Now),
     Name = <<"test_client_id">>,
     ok = emqx_trace:create([{<<"name">>, Name},
-        {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
+        {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
+    ct:sleep(50),
     {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
     {ok, _} = emqtt:connect(Client),
     [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
-    ok = filesync(Name, clientid),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
     {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
     ?assert(filelib:file_size(ZipFile) > 0),
     ok = emqtt:disconnect(Client),
-    unload(),
+    ok.
+
+t_find_closed_time(_Config) ->
+    DefaultMs = 60 * 15000,
+    Now = erlang:system_time(second),
+    Traces2 = [],
+    ?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces2, Now)),
+    Traces3 = [#emqx_trace{name = <<"disable">>, start_at = Now + 1,
+        end_at = Now + 2, enable = false}],
+    ?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces3, Now)),
+    Traces4 = [#emqx_trace{name = <<"running">>, start_at = Now, end_at = Now + 10, enable = true}],
+    ?assertEqual(10000, emqx_trace:find_closest_time(Traces4, Now)),
+    Traces5 = [#emqx_trace{name = <<"waiting">>, start_at = Now + 2,
+        end_at = Now + 10, enable = true}],
+    ?assertEqual(2000, emqx_trace:find_closest_time(Traces5, Now)),
+    Traces = [
+        #emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 2, enable = true},
+        #emqx_trace{name = <<"running0">>, start_at = Now, end_at = Now + 5, enable = true},
+        #emqx_trace{name = <<"running1">>, start_at = Now - 1, end_at = Now + 1, enable = true},
+        #emqx_trace{name = <<"finished">>, start_at = Now - 2, end_at = Now - 1, enable = true},
+        #emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 1, enable = true},
+        #emqx_trace{name = <<"stopped">>, start_at = Now, end_at = Now + 10, enable = false}
+    ],
+    ?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)),
     ok.
 
 to_rfc3339(Second) ->
     list_to_binary(calendar:system_time_to_rfc3339(Second)).
 
-load() ->
-    emqx_trace:start_link().
-
-unload() ->
-    gen_server:stop(emqx_trace).
+reload() ->
+    catch ok = gen_server:stop(emqx_trace),
+    {ok, _Pid} = emqx_trace:start_link().

+ 19 - 2
apps/emqx/test/emqx_trace_handler_SUITE.erl

@@ -28,7 +28,7 @@
                  {password, <<"pass">>}
                 ]).
 
-all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address].
+%% Common Test callback: the test cases of this suite.
+all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8].
 
 init_per_suite(Config) ->
     emqx_common_test_helpers:boot_modules(all),
@@ -85,7 +85,6 @@ t_trace_clientid(_Config) ->
     emqtt:connect(T),
     emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
     emqtt:ping(T),
-
     ok = filesync(<<"client">>, clientid),
     ok = filesync(<<"client2">>, clientid),
     ok = filesync(<<"client3">>, clientid),
@@ -106,6 +105,22 @@ t_trace_clientid(_Config) ->
     emqtt:disconnect(T),
     ?assertEqual([], emqx_trace_handler:running()).
 
+%% Installs a clientid trace handler whose client id contains non-ASCII (UTF-8)
+%% characters, generates some traffic with that client, then checks the handler
+%% can be synced and uninstalled, leaving no running handlers behind.
+t_trace_clientid_utf8(_) ->
+    emqx_logger:set_log_level(debug),
+
+    ClientId = <<"client 漢字編碼"/utf8>>,
+    ok = emqx_trace_handler:install(clientid, ClientId, debug, "tmp/client-utf8.log"),
+    {ok, Client} = emqtt:start_link([{clientid, ClientId}]),
+    emqtt:connect(Client),
+    lists:foreach(fun(_) -> emqtt:publish(Client, <<"a/b/c">>, <<"hi">>) end,
+                  lists:seq(1, 10)),
+    emqtt:ping(Client),
+
+    ok = filesync(ClientId, clientid),
+    ok = emqx_trace_handler:uninstall(clientid, ClientId),
+    emqtt:disconnect(Client),
+    ?assertEqual([], emqx_trace_handler:running()),
+    ok.
+
 t_trace_topic(_Config) ->
     {ok, T} = emqtt:start_link(?CLIENT),
     emqtt:connect(T),
@@ -161,6 +176,7 @@ t_trace_ip_address(_Config) ->
     ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
     ok = filesync(<<"127.0.0.1">>, ip_address),
     ok = filesync(<<"192.168.1.1">>, ip_address),
+
     %% Verify the tracing file exits
     ?assert(filelib:is_regular("tmp/ip_trace_x.log")),
     ?assert(filelib:is_regular("tmp/ip_trace_y.log")),
@@ -198,6 +214,7 @@ t_trace_ip_address(_Config) ->
     emqtt:disconnect(T),
     ?assertEqual([], emqx_trace_handler:running()).
 
+
 %% Sleeps 50 ms, then delegates to filesync/3 with a third argument of 3
 %% (presumably an attempt counter -- confirm against filesync/3).
 filesync(Name, Type) ->
     ct:sleep(50),
     filesync(Name, Type, 3).

+ 408 - 0
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -0,0 +1,408 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_trace).
+
+-behaviour(minirest_api).
+
+-include_lib("kernel/include/file.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-export([ api_spec/0
+        , fields/1
+        , paths/0
+        , schema/1
+        , namespace/0
+        ]).
+
+-export([ trace/2
+        , delete_trace/2
+        , update_trace/2
+        , download_trace_log/2
+        , stream_log_file/2
+        ]).
+
+-export([validate_name/1]).
+
+%% for rpc
+-export([read_trace_file/3
+        , get_trace_size/0
+        ]).
+
+-define(TO_BIN(_B_), iolist_to_binary(_B_)).
+-define(NOT_FOUND(N), {404, #{code => 'NOT_FOUND', message => ?TO_BIN([N, " NOT FOUND"])}}).
+
+%% Returns the namespace string for this module's schema definitions
+%% (presumably used by the hocon/swagger layer to scope the refs declared in
+%% fields/1 -- confirm against emqx_dashboard_swagger).
+namespace() -> "trace".
+
+%% Builds the API spec for this module via emqx_dashboard_swagger, with schema
+%% checking and body translation switched on.
+api_spec() ->
+    Options = #{check_schema => true, translate_body => true},
+    emqx_dashboard_swagger:spec(?MODULE, Options).
+
+%% The HTTP paths served by this module; each one has a matching schema/1 clause.
+paths() ->
+    [ "/trace"
+    , "/trace/:name/stop"
+    , "/trace/:name/download"
+    , "/trace/:name/log"
+    , "/trace/:name"
+    ].
+
+
+%% Schema for every path returned by paths/0. Each clause returns a map with an
+%% 'operationId' (matching one of the handler functions exported by this module)
+%% and, per HTTP method, its description, parameters/request body and responses.
+%%
+%% NOTE(review): a few declared responses differ from what the handlers in this
+%% module return -- trace(get, _) replies with a list of traces while 200 is
+%% declared as a single ref(trace); trace(delete, _) and delete_trace/2 reply
+%% {200} while 204 is declared. Confirm which side is intended.
+schema("/trace") ->
+    #{
+        'operationId' => trace,
+        get => #{
+            description => "List all trace",
+            responses => #{
+                200 => hoconsc:ref(trace)
+            }
+        },
+        post => #{
+            description => "Create new trace",
+            %% the request body is the trace record minus the server-computed fields
+            'requestBody' => delete([status, log_size], fields(trace)),
+            responses => #{
+                200 => hoconsc:ref(trace)
+            }
+        },
+        delete => #{
+            description => "Clear all traces",
+            responses => #{
+                204 => <<"No Content">>
+            }
+        }
+    };
+schema("/trace/:name") ->
+    #{
+        'operationId' => delete_trace,
+        delete => #{
+            description => "Delete trace by name",
+            parameters => [hoconsc:ref(name)],
+            responses => #{
+                204 => <<"Delete successfully">>,
+                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
+            }
+        }
+    };
+schema("/trace/:name/stop") ->
+    #{
+        'operationId' => update_trace,
+        put => #{
+            description => "Stop trace by name",
+            parameters => [hoconsc:ref(name)],
+            responses => #{
+                200 => hoconsc:ref(trace),
+                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
+            }
+        }
+    };
+schema("/trace/:name/download") ->
+    #{
+        'operationId' => download_trace_log,
+        get => #{
+            description => "Download trace log by name",
+            parameters => [hoconsc:ref(name)],
+            %% todo zip file octet-stream
+            responses => #{
+                200 => <<"TODO octet-stream">>
+            }
+        }
+    };
+schema("/trace/:name/log") ->
+    #{
+        'operationId' => stream_log_file,
+        get => #{
+            description => "view trace log",
+            parameters => [
+                hoconsc:ref(name),
+                hoconsc:ref(bytes),
+                hoconsc:ref(position),
+                hoconsc:ref(node)
+            ],
+            %% todo response data
+            responses => #{
+                200 => <<"TODO">>
+            }
+        }
+    }.
+
+%% Field definitions referenced from schema/1 via hoconsc:ref/1.
+%%   trace    - the trace record (request body for POST, element of the listing)
+%%   name     - the `:name' path parameter
+%%   node     - `node' query parameter of the log endpoint
+%%   bytes    - `bytes' query parameter of the log endpoint (default 1000)
+%%   position - `position' query parameter of the log endpoint (default 0)
+%%
+%% Descriptions are plain string literals: the former """text""" spelling only
+%% worked as adjacent-literal concatenation ("" "text" "") and is parsed as a
+%% triple-quoted string (and rejected) by OTP 27 and later.
+fields(trace) ->
+    [
+        {name, hoconsc:mk(binary(),
+            #{desc => "Unique and format by [a-zA-Z0-9-_]",
+                validator => fun ?MODULE:validate_name/1,
+                nullable => false,
+                example => <<"EMQX-TRACE-1">>})},
+        {type, hoconsc:mk(hoconsc:enum([clientid, topic, ip_address]),
+            #{desc => "Filter type",
+                nullable => false,
+                example => <<"clientid">>})},
+        {topic, hoconsc:mk(binary(),
+            #{desc => "support mqtt wildcard topic.",
+                nullable => true,
+                example => <<"/dev/#">>})},
+        {clientid, hoconsc:mk(binary(),
+            #{desc => "mqtt clientid.",
+                nullable => true,
+                example => <<"dev-001">>})},
+        %% TODO add ip_address type in emqx_schema.erl
+        {ip_address, hoconsc:mk(binary(),
+            #{desc => "client ip address",
+                nullable => true,
+                example => <<"127.0.0.1">>
+            })},
+        {status, hoconsc:mk(hoconsc:enum([running, stopped, waiting]),
+            #{desc => "trace status",
+                nullable => true,
+                example => running
+            })},
+        {start_at, hoconsc:mk(binary(),
+            #{desc => "rfc3339 timestamp",
+                nullable => true,
+                example => <<"2021-11-04T18:17:38+08:00">>
+            })},
+        {end_at, hoconsc:mk(binary(),
+            #{desc => "rfc3339 timestamp",
+                nullable => true,
+                example => <<"2021-11-05T18:17:38+08:00">>
+            })},
+        {log_size, hoconsc:mk(hoconsc:array(map()),
+            #{desc => "trace log size",
+                example => [#{<<"node">> => <<"emqx@127.0.0.1">>, <<"size">> => 1024}],
+                nullable => true})}
+    ];
+fields(name) ->
+    [{name, hoconsc:mk(binary(),
+        #{
+            desc => <<"[a-zA-Z0-9-_]">>,
+            example => <<"EMQX-TRACE-1">>,
+            in => path,
+            validator => fun ?MODULE:validate_name/1
+        })}
+    ];
+fields(node) ->
+    [{node, hoconsc:mk(binary(),
+        #{
+            desc => "Node name",
+            in => query,
+            nullable => true
+        })}];
+fields(bytes) ->
+    [{bytes, hoconsc:mk(integer(),
+        #{
+            desc => "Maximum number of bytes to store in request",
+            in => query,
+            nullable => true,
+            default => 1000
+        })}];
+fields(position) ->
+    [{position, hoconsc:mk(integer(),
+        #{
+            desc => "Offset from the current trace position.",
+            in => query,
+            nullable => true,
+            default => 0
+        })}].
+
+-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
+
+%% Validator for trace names (used by the `name' fields of the schema).
+%% A valid name is 1..256 bytes long and matches ?NAME_RE: it starts with a
+%% letter and continues with letters, digits, `-' or `_'.
+%% Returns `ok' or `{error, Message}' with a human readable message.
+validate_name(Name) ->
+    NameLen = byte_size(Name),
+    case NameLen > 0 andalso NameLen =< 256 of
+        true ->
+            %% dollar_endonly: without it `$' also matches just before a trailing
+            %% newline, so <<"abc\n">> would be accepted as a name.
+            case re:run(Name, ?NAME_RE, [dollar_endonly, {capture, none}]) of
+                match -> ok;
+                nomatch -> {error, "Name should be " ?NAME_RE}
+            end;
+        false ->
+            %% covers both the empty name and names longer than 256 bytes
+            {error, "Name length must be between 1 and 256 bytes"}
+    end.
+
+%% Removes, for each key in Keys, the first tuple of Fields whose first element
+%% is that key. Keys that are not present are ignored.
+delete([], Fields) ->
+    Fields;
+delete([Key | Rest], Fields) ->
+    delete(Rest, lists:keydelete(Key, 1, Fields)).
+
+%% Handler for the "/trace" path.
+%%   get    - lists all traces, newest start_at first, each with its per-node
+%%            log size, rfc3339 start/end times and a computed status
+%%   post   - creates a trace from the request body; failures are reported as 400
+%%   delete - clears all traces
+trace(get, _Params) ->
+    case emqx_trace:list() of
+        [] ->
+            {200, []};
+        Records ->
+            NewestFirst = fun(#{start_at := A}, #{start_at := B}) -> A > B end,
+            Sorted = lists:sort(NewestFirst, emqx_trace:format(Records)),
+            Nodes = mria_mnesia:running_nodes(),
+            PerNodeSizes = cluster_call(?MODULE, get_trace_size, [], 30000),
+            %% every node reports a #{{Node, File} => Size} map; merge them into one
+            AllFileSize = lists:foldl(fun(Sizes, Acc) -> maps:merge(Acc, Sizes) end,
+                                      #{}, PerNodeSizes),
+            Now = erlang:system_time(second),
+            {200, [trace_view(Trace, Nodes, AllFileSize, Now) || Trace <- Sorted]}
+    end;
+trace(post, #{body := Param}) ->
+    case emqx_trace:create(Param) of
+        ok ->
+            {200};
+        {error, {already_existed, Name}} ->
+            bad_request('ALREADY_EXISTED', [Name, " Already Exists"]);
+        {error, {duplicate_condition, Name}} ->
+            bad_request('DUPLICATE_CONDITION', [Name, " Duplication Condition"]);
+        {error, Reason} ->
+            bad_request('INCORRECT_PARAMS', Reason)
+    end;
+trace(delete, _Param) ->
+    ok = emqx_trace:clear(),
+    {200}.
+
+%% Turns one formatted trace map into its API representation: `enable' and
+%% `filter' are dropped, the filter value is stored under the key named by the
+%% trace type, and log_size/start_at/end_at/status are filled in.
+trace_view(Trace = #{name := Name, start_at := Start, end_at := End,
+                     enable := Enable, type := Type, filter := Filter},
+           Nodes, AllFileSize, Now) ->
+    FileName = emqx_trace:filename(Name, Start),
+    LogSize = collect_file_size(Nodes, FileName, AllFileSize),
+    Base = maps:without([enable, filter], Trace),
+    Base#{ log_size => LogSize
+         , Type => iolist_to_binary(Filter)
+         , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
+         , end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
+         , status => status(Enable, Start, End, Now)
+         }.
+
+%% 400 reply with the given error code; Message is iodata.
+bad_request(Code, Message) ->
+    {400, #{code => Code, message => ?TO_BIN(Message)}}.
+
+%% Handler for DELETE "/trace/:name": deletes the named trace, replying 404
+%% when emqx_trace reports it as not found.
+delete_trace(delete, #{bindings := #{name := Name}}) ->
+    Result = emqx_trace:delete(Name),
+    case Result of
+        {error, not_found} ->
+            ?NOT_FOUND(Name);
+        ok ->
+            {200}
+    end.
+
+%% Handler for PUT "/trace/:name/stop": disables the named trace, replying 404
+%% when emqx_trace reports it as not found.
+update_trace(put, #{bindings := #{name := Name}}) ->
+    Result = emqx_trace:update(Name, false),
+    case Result of
+        {error, not_found} ->
+            ?NOT_FOUND(Name);
+        ok ->
+            {200, #{enable => false, name => Name}}
+    end.
+
+%% Handler for GET "/trace/:name/download".
+%% Collects the trace log from the cluster nodes, writes each node's copy into
+%% the zip directory, packs them into "<name>.zip" and replies with the zip.
+%% emqx_trace:delete_files_after_send/2 is handed the archive and its entries.
+%%
+%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
+%% cowboy_compress_h will auto encode gzip format.
+download_trace_log(get, #{bindings := #{name := Name}}) ->
+    case emqx_trace:get_trace_filename(Name) of
+        {error, not_found} ->
+            ?NOT_FOUND(Name);
+        {ok, TraceLog} ->
+            NodeResults = collect_trace_file(TraceLog),
+            ZipDir = emqx_trace:zip_dir(),
+            Entries = group_trace_file(ZipDir, TraceLog, NodeResults),
+            Archive = ZipDir ++ binary_to_list(Name) ++ ".zip",
+            {ok, ZipFile} = zip:zip(Archive, Entries, [{cwd, ZipDir}]),
+            emqx_trace:delete_files_after_send(Archive, Entries),
+            {200, ZipFile}
+    end.
+
+%% Writes every successfully fetched per-node trace log into ZipDir as
+%% "<Node>-<TraceLog>" and returns the list of those file names (relative to
+%% ZipDir). Nodes whose log could not be fetched or written are logged and
+%% left out, so the download stays best-effort.
+group_trace_file(ZipDir, TraceLog, TraceFiles) ->
+    lists:foldl(fun(Res, Acc) -> add_trace_file(ZipDir, TraceLog, Res, Acc) end,
+                [], TraceFiles).
+
+%% Handles one per-node result of the trace_file rpc.
+add_trace_file(ZipDir, TraceLog, {ok, Node, Bin}, Acc) ->
+    Entry = Node ++ "-" ++ TraceLog,
+    case file:write_file(ZipDir ++ Entry, Bin) of
+        ok ->
+            [Entry | Acc];
+        {error, Reason} ->
+            %% previously dropped silently; keep going but leave a trace of it
+            ?LOG(error, "write trace log error:~p", [{Node, Entry, Reason}]),
+            Acc
+    end;
+add_trace_file(_ZipDir, TraceLog, {error, Node, Reason}, Acc) ->
+    ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
+    Acc;
+add_trace_file(_ZipDir, TraceLog, {badrpc, Reason}, Acc) ->
+    %% rpc:multicall reports a call that failed on a reachable node this way
+    ?LOG(error, "download trace log error:~p", [{badrpc, TraceLog, Reason}]),
+    Acc.
+
+%% Calls emqx_trace:trace_file/1 for TraceLog on the running cluster nodes
+%% (60 second timeout) and returns the per-node results.
+collect_trace_file(TraceLog) ->
+    Timeout = 60000,
+    cluster_call(emqx_trace, trace_file, [TraceLog], Timeout).
+
+%% Runs Mod:Fun(Args) on every running node and returns the list of successful
+%% results. Failures are logged and left out of the returned list:
+%%   - nodes that could not be called at all (BadNodes of rpc:multicall/5)
+%%   - `{badrpc, Reason}' entries in the result list, which rpc:multicall/5
+%%     produces when the call itself failed on a reachable node. Callers fold
+%%     over the result and would crash on such an entry.
+cluster_call(Mod, Fun, Args, Timeout) ->
+    Nodes = mria_mnesia:running_nodes(),
+    {Results, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
+    BadNodes =/= [] andalso
+        ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
+    {BadRes, GoodRes} = lists:partition(fun is_badrpc/1, Results),
+    BadRes =/= [] andalso
+        ?LOG(error, "rpc call returned ~p for ~p", [BadRes, {Mod, Fun, Args}]),
+    GoodRes.
+
+%% True for the error tuples rpc places in a multicall result list.
+is_badrpc({badrpc, _Reason}) -> true;
+is_badrpc(_Result) -> false.
+
+%% Handler for GET "/trace/:name/log": returns a chunk of the named trace log.
+%% Query parameters (all optional):
+%%   node     - node to read from, defaults to the local node
+%%   position - offset to start reading at, defaults to 0
+%%   bytes    - maximum number of bytes to return, defaults to 1000
+%% Replies 200 with `items' (the chunk, empty at end of file) and `meta'
+%% (the position to continue from and the requested byte count), or 400 when
+%% the node is unknown, the rpc fails or the file cannot be read.
+stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
+    Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())),
+    Position = maps:get(<<"position">>, Query, 0),
+    Bytes = maps:get(<<"bytes">>, Query, 1000),
+    case to_node(Node0) of
+        {ok, Node} ->
+            case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
+                {ok, Bin} ->
+                    Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
+                    {200, #{meta => Meta, items => Bin}};
+                {eof, Size} ->
+                    Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
+                    {200, #{meta => Meta, items => <<"">>}};
+                {error, Reason} ->
+                    logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]),
+                    {400, #{code => 'READ_FILE_ERROR', message => Reason}};
+                {badrpc, nodedown} ->
+                    {400, #{code => 'RPC_ERROR', message => "BadRpc node down"}};
+                {badrpc, Reason} ->
+                    %% any other rpc failure (timeout, remote crash, ...) used to
+                    %% end in a case_clause crash of the handler
+                    logger:log(error, "read_file_rpc_failed ~p", [{Node, Name, Reason}]),
+                    Message = ?TO_BIN(io_lib:format("BadRpc ~0p", [Reason])),
+                    {400, #{code => 'RPC_ERROR', message => Message}}
+            end;
+        {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}}
+    end.
+
+%% rpc target: returns #{{node(), File} => SizeInBytes} for the entries of the
+%% local trace directory, leaving out the entry named "zip". An unreadable
+%% directory yields an empty map.
+get_trace_size() ->
+    TraceDir = emqx_trace:trace_dir(),
+    Node = node(),
+    case file:list_dir(TraceDir) of
+        {ok, AllFiles} ->
+            maps:from_list(
+                [{{Node, File}, filelib:file_size(filename:join(TraceDir, File))}
+                 || File <- lists:delete("zip", AllFiles)]);
+        _ ->
+            #{}
+    end.
+
+%% this is an rpc call for stream_log_file/2
+%% Resolves the log file of trace Name inside the local trace directory and
+%% reads up to Limit bytes starting at Position; a failed name lookup is
+%% returned unchanged.
+read_trace_file(Name, Position, Limit) ->
+    case emqx_trace:get_trace_filename(Name) of
+        {ok, TraceFile} ->
+            TracePath = filename:join([emqx_trace:trace_dir(), TraceFile]),
+            read_file(TracePath, Position, Limit);
+        {error, _} = Error ->
+            Error
+    end.
+
+%% Reads up to Bytes bytes of the file at Path, starting Offset bytes from the
+%% beginning of the file.
+%% Returns:
+%%   {ok, Bin}       - the data read (may be shorter than Bytes)
+%%   {eof, Size}     - nothing left to read at Offset; Size is the file size
+%%   {error, Reason} - the file could not be opened, positioned or read
+%% The file is always closed again before returning.
+read_file(Path, Offset, Bytes) ->
+    case file:open(Path, [read, raw, binary]) of
+        {ok, IoDevice} ->
+            try
+                %% a failed seek (e.g. a negative offset) is reported instead of
+                %% silently reading from the start of the file
+                case seek(IoDevice, Offset) of
+                    ok -> read_chunk(IoDevice, Bytes);
+                    {error, _} = Error -> Error
+                end
+            after
+                file:close(IoDevice)
+            end;
+        {error, Reason} -> {error, Reason}
+    end.
+
+%% Moves the read position to Offset bytes from the beginning of the file.
+seek(_IoDevice, 0) ->
+    ok;
+seek(IoDevice, Offset) ->
+    case file:position(IoDevice, {bof, Offset}) of
+        {ok, _NewPosition} -> ok;
+        {error, _} = Error -> Error
+    end.
+
+%% Reads the next chunk; at end of file the file size is reported instead.
+read_chunk(IoDevice, Bytes) ->
+    case file:read(IoDevice, Bytes) of
+        {ok, Bin} -> {ok, Bin};
+        {error, Reason} -> {error, Reason};
+        eof ->
+            {ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
+            {eof, Size}
+    end.
+
+%% Converts a node name given as a binary into an atom without creating new
+%% atoms: `{ok, Atom}' when the atom already exists, `{error, not_found}' for
+%% anything else (including input that is not a binary).
+to_node(Node) ->
+    try binary_to_existing_atom(Node) of
+        Atom -> {ok, Atom}
+    catch
+        _:_ -> {error, not_found}
+    end.
+
+%% Returns #{Node => Size} for every node in Nodes, where Size is looked up
+%% under {Node, FileName} in AllFiles and defaults to 0 when absent.
+collect_file_size(Nodes, FileName, AllFiles) ->
+    maps:from_list([{Node, maps:get({Node, FileName}, AllFiles, 0)} || Node <- Nodes]).
+
+%% Status shown for a trace, from its enable flag and its time window:
+%%   disabled                -> <<"stopped">>
+%%   enabled, Now < Start    -> <<"waiting">>
+%%   enabled, Now >= End     -> <<"stopped">>
+%%   enabled, otherwise      -> <<"running">>
+status(false, _Start, _End, _Now) ->
+    <<"stopped">>;
+status(true, Start, End, Now) ->
+    enabled_status(Start, End, Now).
+
+%% Status of an enabled trace, decided by where Now lies relative to the window.
+enabled_status(Start, _End, Now) when Now < Start -> <<"waiting">>;
+enabled_status(_Start, End, Now) when Now >= End -> <<"stopped">>;
+enabled_status(_Start, _End, _Now) -> <<"running">>.

+ 190 - 0
apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl

@@ -0,0 +1,190 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_trace_api_SUITE).
+
+%% API
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+
+-define(HOST, "http://127.0.0.1:18083/").
+-define(API_VERSION, "v5").
+-define(BASE_PATH, "api").
+
+%%--------------------------------------------------------------------
+%% Setups
+%%--------------------------------------------------------------------
+
+%% Common Test callback: the list of test cases is produced by
+%% emqx_common_test_helpers:all/1 for this module.
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% Common Test callback: runs the shared management-API suite setup and passes
+%% Config through unchanged.
+init_per_suite(Config) ->
+    emqx_mgmt_api_test_util:init_suite(),
+    Config.
+
+%% Common Test callback: runs the shared management-API suite teardown.
+end_per_suite(_) ->
+    emqx_mgmt_api_test_util:end_suite().
+
+%% Walks the trace HTTP API end to end over real HTTP requests:
+%% empty listing, rejected create (missing name), create, list, stop, list
+%% again (status and fields of the stopped trace), delete, delete of a missing
+%% trace (404), and finally clear-all.
+t_http_test(_Config) ->
+    %% start from an empty trace table with a running emqx_trace server
+    emqx_trace:clear(),
+    load(),
+    Header = auth_header_(),
+    %% list
+    {ok, Empty} = request_api(get, api_path("trace"), Header),
+    ?assertEqual([], json(Empty)),
+    %% create
+    %% an empty body is rejected by the schema check because `name' is required
+    ErrorTrace = #{},
+    {error, {"HTTP/1.1", 400, "Bad Request"}, Body} =
+        request_api(post, api_path("trace"), Header, ErrorTrace),
+    ?assertEqual(
+        #{
+            <<"code">> => <<"BAD_REQUEST">>,
+            <<"message">> => <<"name : not_nullable">>
+        }, json(Body)),
+
+    Name = <<"test-name">>,
+    Trace = [
+        {<<"name">>, Name},
+        {<<"type">>, <<"topic">>},
+        {<<"topic">>, <<"/x/y/z">>}
+    ],
+
+    %% a successful create replies with an empty body
+    {ok, Create} = request_api(post, api_path("trace"), Header, Trace),
+    ?assertEqual(<<>>, Create),
+
+    {ok, List} = request_api(get, api_path("trace"), Header),
+    [Data] = json(List),
+    ?assertEqual(Name, maps:get(<<"name">>, Data)),
+
+    %% update
+    {ok, Update} = request_api(put, api_path("trace/test-name/stop"), Header, #{}),
+    ?assertEqual(#{<<"enable">> => false,
+        <<"name">> => <<"test-name">>}, json(Update)),
+
+    %% the stopped trace is still listed, with status "stopped" and a log_size
+    %% entry for the local node
+    {ok, List1} = request_api(get, api_path("trace"), Header),
+    [Data1] = json(List1),
+    Node = atom_to_binary(node()),
+    ?assertMatch(#{
+        <<"status">> := <<"stopped">>,
+        <<"name">> := <<"test-name">>,
+        <<"log_size">> := #{Node := _},
+        <<"start_at">> := _,
+        <<"end_at">> := _,
+        <<"type">> := <<"topic">>,
+        <<"topic">> := <<"/x/y/z">>
+    }, Data1),
+
+    %% delete
+    {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header),
+    ?assertEqual(<<>>, Delete),
+
+    %% deleting the same trace again reports 404
+    {error, {"HTTP/1.1", 404, "Not Found"}, DeleteNotFound}
+        = request_api(delete, api_path("trace/test-name"), Header),
+    ?assertEqual(#{<<"code">> => <<"NOT_FOUND">>,
+        <<"message">> => <<"test-name NOT FOUND">>}, json(DeleteNotFound)),
+
+    {ok, List2} = request_api(get, api_path("trace"), Header),
+    ?assertEqual([], json(List2)),
+
+    %% clear
+    {ok, Create1} = request_api(post, api_path("trace"), Header, Trace),
+    ?assertEqual(<<>>, Create1),
+
+    {ok, Clear} = request_api(delete, api_path("trace"), Header),
+    ?assertEqual(<<>>, Clear),
+
+    unload(),
+    ok.
+
+%% Creates a clientid trace, produces log output with an MQTT client using that
+%% client id, then reads the trace log through GET trace/<name>/log and checks
+%% the `bytes' and `position' query parameters: each request returns exactly
+%% the requested number of bytes and `meta.position' points just past them.
+t_stream_log(_Config) ->
+    application:set_env(emqx, allow_anonymous, true),
+    emqx_trace:clear(),
+    load(),
+    ClientId = <<"client-stream">>,
+    Now = erlang:system_time(second),
+    Name = <<"test_stream_log">>,
+    %% start time in the past so the trace is already due when created
+    Start = to_rfc3339(Now - 10),
+    ok = emqx_trace:create(#{<<"name">> => Name,
+        <<"type">> => clientid, <<"clientid">> => ClientId, <<"start_at">> => Start}),
+    %% NOTE(review): presumably gives emqx_trace time to activate the trace -- confirm
+    ct:sleep(200),
+    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
+    {ok, _} = emqtt:connect(Client),
+    [begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)],
+    emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]),
+    emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]),
+    ok = emqtt:disconnect(Client),
+    %% NOTE(review): presumably lets the log lines reach the file -- confirm
+    ct:sleep(200),
+    File = emqx_trace:log_file(Name, Now),
+    ct:pal("FileName: ~p", [File]),
+    {ok, FileBin} = file:read_file(File),
+    ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]),
+    Header = auth_header_(),
+    %% first chunk: default position 0, 10 bytes
+    {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header),
+    #{<<"meta">> := Meta, <<"items">> := Bin} = json(Binary),
+    ?assertEqual(10, byte_size(Bin)),
+    ?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta),
+    %% explicit position: 10 bytes starting at offset 20
+    Path = api_path("trace/test_stream_log/log?position=20&bytes=10"),
+    {ok, Binary1} = request_api(get, Path, Header),
+    #{<<"meta">> := Meta1, <<"items">> := Bin1} = json(Binary1),
+    ?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1),
+    ?assertEqual(10, byte_size(Bin1)),
+    unload(),
+    ok.
+
+%% Formats a system time given in seconds as an RFC 3339 timestamp binary.
+to_rfc3339(Second) ->
+    Timestamp = calendar:system_time_to_rfc3339(Second),
+    list_to_binary(Timestamp).
+
+%% Basic-auth header for the "admin"/"public" credentials used by this suite.
+auth_header_() ->
+    auth_header_("admin", "public").
+
+%% Builds an HTTP Basic `Authorization' header tuple from a user name and a
+%% password, both given as strings.
+auth_header_(User, Pass) ->
+    Credentials = User ++ ":" ++ Pass,
+    {"Authorization", "Basic " ++ base64:encode_to_string(Credentials)}.
+
+%% Sends a request without a body, with Auth as its only header.
+request_api(Method, Url, Auth) ->
+    do_request_api(Method, {Url, [Auth]}).
+
+%% Sends a request whose body is Body encoded as JSON, with Auth as its only header.
+request_api(Method, Url, Auth, Body) ->
+    Payload = emqx_json:encode(Body),
+    do_request_api(Method, {Url, [Auth], "application/json", Payload}).
+
+%% Performs the HTTP request through httpc and normalises the outcome:
+%%   {ok, Body}                  - HTTP/1.1 reply with status 200 or 201
+%%   {error, StatusLine, Body}   - any other HTTP reply
+%%   {error, Reason}             - connection closed by the server
+do_request_api(Method, Request) ->
+    ct:pal("Method: ~p, Request: ~p", [Method, Request]),
+    Response = httpc:request(Method, Request, [], [{body_format, binary}]),
+    case Response of
+        {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when Code =:= 200; Code =:= 201 ->
+            {ok, Return};
+        {ok, {Reason, _Header, Body}} ->
+            {error, Reason, Body};
+        {error, socket_closed_remotely} ->
+            {error, socket_closed_remotely};
+        {error, {shutdown, server_closed}} ->
+            {error, server_closed}
+    end.
+
+%% Full URL of an API endpoint: ?HOST followed by ?BASE_PATH, ?API_VERSION and
+%% Path joined with "/".
+api_path(Path) ->
+    Relative = filename:join([?BASE_PATH, ?API_VERSION, Path]),
+    ?HOST ++ Relative.
+
+%% Decodes a JSON document into maps; fails the test (badmatch) on invalid JSON.
+json(Data) ->
+    {ok, Decoded} = emqx_json:safe_decode(Data, [return_maps]),
+    Decoded.
+
+%% Starts the emqx_trace server and returns the result of start_link/0 as is.
+%% NOTE(review): callers ignore the result, so an `already_started' error (for
+%% example after an earlier test case failed before unload/0) goes unnoticed.
+load() ->
+    emqx_trace:start_link().
+
+%% Stops the emqx_trace server started by load/0.
+unload() ->
+    gen_server:stop(emqx_trace).