Просмотр исходного кода

fix: delete error log when file enoent.

delete emqx_trace_api, replace LOG by SLOG
zhongwencool 4 лет назад
Родитель
Сommit
07ba4ad05e

+ 0 - 228
apps/emqx/src/emqx_trace/emqx_trace_api.erl

@@ -1,228 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_trace_api).
--include_lib("emqx/include/logger.hrl").
--include_lib("kernel/include/file.hrl").
-
-%% API
--export([ list_trace/2
-        , create_trace/2
-        , update_trace/2
-        , delete_trace/2
-        , clear_traces/2
-        , download_zip_log/2
-        , stream_log_file/2
-]).
--export([ read_trace_file/3
-        , get_trace_size/0
-        ]).
-
--define(TO_BIN(_B_), iolist_to_binary(_B_)).
--define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, " NOT FOUND"])}).
-
-list_trace(_, _Params) ->
-    case emqx_trace:list() of
-        [] -> {ok, []};
-        List0 ->
-            List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, List0),
-            Nodes = mria_mnesia:running_nodes(),
-            TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000),
-            AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
-            Now = erlang:system_time(second),
-            Traces =
-                lists:map(fun(Trace = #{name := Name, start_at := Start,
-                    end_at := End, enable := Enable, type := Type, filter := Filter}) ->
-                    FileName = emqx_trace:filename(Name, Start),
-                    LogSize = collect_file_size(Nodes, FileName, AllFileSize),
-                    Trace0 = maps:without([enable, filter], Trace),
-                    Trace0#{ log_size => LogSize
-                           , Type => Filter
-                           , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
-                           , end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
-                           , status => status(Enable, Start, End, Now)
-                           }
-                          end, emqx_trace:format(List)),
-            {ok, Traces}
-    end.
-
-create_trace(_, Param) ->
-    case emqx_trace:create(Param) of
-        ok -> ok;
-        {error, {already_existed, Name}} ->
-            {error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])};
-        {error, {duplicate_condition, Name}} ->
-            {error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])};
-        {error, Reason} ->
-            {error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)}
-    end.
-
-delete_trace(#{name := Name}, _Param) ->
-    case emqx_trace:delete(Name) of
-        ok -> ok;
-        {error, not_found} -> ?NOT_FOUND(Name)
-    end.
-
-clear_traces(_, _) ->
-    emqx_trace:clear().
-
-update_trace(#{name := Name, operation := Operation}, _Param) ->
-    Enable = case Operation of disable -> false; enable -> true end,
-    case emqx_trace:update(Name, Enable) of
-        ok -> {ok, #{enable => Enable, name => Name}};
-        {error, not_found} -> ?NOT_FOUND(Name)
-    end.
-
-%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
-%% cowboy_compress_h will auto encode gzip format.
-download_zip_log(#{name := Name}, _Param) ->
-    case emqx_trace:get_trace_filename(Name) of
-        {ok, TraceLog} ->
-            TraceFiles = collect_trace_file(TraceLog),
-            ZipDir = emqx_trace:zip_dir(),
-            Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
-            ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
-            {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
-            emqx_trace:delete_files_after_send(ZipFileName, Zips),
-            {ok, ZipFile};
-        {error, Reason} ->
-            {error, Reason}
-    end.
-
-group_trace_file(ZipDir, TraceLog, TraceFiles) ->
-    lists:foldl(fun(Res, Acc) ->
-        case Res of
-            {ok, Node, Bin} ->
-                ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
-                case file:write_file(ZipName, Bin) of
-                ok -> [Node ++ "-" ++ TraceLog | Acc];
-                Error ->
-                    ?SLOG(error, #{
-                        msg => "write_file_failed",
-                        error => Error,
-                        zip_name => ZipName,
-                        byte_size => byte_size(Bin)}),
-                    Acc
-                end;
-            {error, Node, Reason} ->
-                ?SLOG(error, #{
-                    msg => "download_trace_log_failed",
-                    node => Node,
-                    trace_log => TraceLog,
-                    reason => Reason
-                }),
-                Acc
-        end
-                end, [], TraceFiles).
-
-collect_trace_file(TraceLog) ->
-    cluster_call(emqx_trace, trace_file, [TraceLog], 60000).
-
-cluster_call(Mod, Fun, Args, Timeout) ->
-    Nodes = mria_mnesia:running_nodes(),
-    {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
-    BadNodes =/= [] andalso
-        ?SLOG(error, #{
-            msg => "rpc_call_failed",
-            bad_nodes => BadNodes,
-            mfa => {Mod, Fun, Args }
-        }),
-    GoodRes.
-
-stream_log_file(#{name := Name}, Params) ->
-    Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
-    Position0 = proplists:get_value(<<"position">>, Params, <<"0">>),
-    Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>),
-    case to_node(Node0) of
-        {ok, Node} ->
-            Position = binary_to_integer(Position0),
-            Bytes = binary_to_integer(Bytes0),
-            case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
-                {ok, Bin} ->
-                    Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
-                    {ok, #{meta => Meta, items => Bin}};
-                {eof, Size} ->
-                    Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
-                    {ok, #{meta => Meta, items => <<"">>}};
-                {error, Reason} ->
-                    logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
-                    {error, Reason};
-                {badrpc, nodedown} ->
-                    {error, "BadRpc node down"}
-            end;
-        {error, Reason} -> {error, Reason}
-    end.
-
-get_trace_size() ->
-    TraceDir = emqx_trace:trace_dir(),
-    Node = node(),
-    case file:list_dir(TraceDir) of
-        {ok, AllFiles} ->
-            lists:foldl(fun(File, Acc) ->
-                FullFileName = filename:join(TraceDir, File),
-                Acc#{{Node, File} => filelib:file_size(FullFileName)}
-                        end, #{}, lists:delete("zip", AllFiles));
-        _ -> #{}
-    end.
-
-%% this is an rpc call for stream_log_file/2
-read_trace_file(Name, Position, Limit) ->
-    TraceDir = emqx_trace:trace_dir(),
-    {ok, AllFiles} = file:list_dir(TraceDir),
-    TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_",
-    Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end,
-    case lists:filter(Filter, AllFiles) of
-        [TraceFile] ->
-            TracePath = filename:join([TraceDir, TraceFile]),
-            read_file(TracePath, Position, Limit);
-        [] -> {error, not_found}
-    end.
-
-read_file(Path, Offset, Bytes) ->
-    {ok, IoDevice} = file:open(Path, [read, raw, binary]),
-    try
-        _ = case Offset of
-                0 -> ok;
-                _ -> file:position(IoDevice, {bof, Offset})
-            end,
-        case file:read(IoDevice, Bytes) of
-            {ok, Bin} -> {ok, Bin};
-            {error, Reason} -> {error, Reason};
-            eof ->
-                {ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
-                {eof, Size}
-        end
-    after
-        file:close(IoDevice)
-    end.
-
-to_node(Node) ->
-    try {ok, binary_to_existing_atom(Node)}
-    catch _:_ ->
-        {error, "node not found"}
-    end.
-
-collect_file_size(Nodes, FileName, AllFiles) ->
-    lists:foldl(fun(Node, Acc) ->
-        Size = maps:get({Node, FileName}, AllFiles, 0),
-        Acc#{Node => Size}
-                end, #{}, Nodes).
-
-%% status(false, _Start, End, Now) when End > Now -> <<"stopped">>;
-status(false, _Start, _End, _Now) -> <<"stopped">>;
-status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
-status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
-status(true, _Start, _End, _Now) -> <<"running">>.

+ 0 - 17
apps/emqx/test/emqx_trace_SUITE.erl

@@ -298,23 +298,6 @@ t_trace_file(_Config) ->
     ok = file:delete(File),
     ok.
 
-t_download_log(_Config) ->
-    ClientId = <<"client-test-download">>,
-    Now = erlang:system_time(second),
-    Start = to_rfc3339(Now),
-    Name = <<"test_client_id">>,
-    ok = emqx_trace:create([{<<"name">>, Name},
-        {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
-    ct:sleep(50),
-    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
-    {ok, _} = emqtt:connect(Client),
-    [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
-    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
-    {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
-    ?assert(filelib:file_size(ZipFile) > 0),
-    ok = emqtt:disconnect(Client),
-    ok.
-
 t_find_closed_time(_Config) ->
     DefaultMs = 60 * 15000,
     Now = erlang:system_time(second),

+ 7 - 2
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -338,7 +338,8 @@ collect_trace_file(TraceLog) ->
 cluster_call(Mod, Fun, Args, Timeout) ->
     Nodes = mria_mnesia:running_nodes(),
     {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
-    BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
+    BadNodes =/= [] andalso
+        ?SLOG(error, #{msg => "rpc call failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}),
     GoodRes.
 
 stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
@@ -354,8 +355,12 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
                 {eof, Size} ->
                     Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
                     {200, #{meta => Meta, items => <<"">>}};
+                {error, enoent} -> %% the waiting trace should return "" not error.
+                    Meta = #{<<"position">> => Position, <<"bytes">> => Bytes},
+                    {200, #{meta => Meta, items => <<"">>}};
                 {error, Reason} ->
-                    logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]),
+                    ?SLOG(error, #{msg => "read_file_failed",
+                        node => Node, name => Name, reason => Reason, position => Position, bytes => Bytes}),
                     {400, #{code => 'READ_FILE_ERROR', message => Reason}};
                 {badrpc, nodedown} ->
                     {400, #{code => 'RPC_ERROR', message => "BadRpc node down"}}

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -442,7 +442,7 @@ trace_off(Type, Filter) ->
 %%--------------------------------------------------------------------
 %% @doc Trace Cluster Command
 traces(["list"]) ->
-    {ok, List} = emqx_trace_api:list_trace(get, []),
+    {200, List} = emqx_mgmt_api_trace:trace(get, []),
     case List of
         [] ->
             emqx_ctl:print("Cluster Trace is empty~n", []);

+ 26 - 0
apps/emqx_management/test/emqx_mgmt_trace_api_SUITE.erl

@@ -22,6 +22,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("stdlib/include/zip.hrl").
 
 -define(HOST, "http://127.0.0.1:18083/").
 -define(API_VERSION, "v5").
@@ -112,6 +114,30 @@ t_http_test(_Config) ->
     unload(),
     ok.
 
+t_download_log(_Config) ->
+    ClientId = <<"client-test-download">>,
+    Now = erlang:system_time(second),
+    Start = to_rfc3339(Now),
+    Name = <<"test_client_id">>,
+    load(),
+    ok = emqx_trace:create([{<<"name">>, Name},
+        {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
+    ct:sleep(50),
+    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
+    {ok, _} = emqtt:connect(Client),
+    [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
+    Header = auth_header_(),
+    {ok, Binary} = request_api(get, api_path("trace/test_client_id/download"), Header),
+    {ok, [_Comment,
+        #zip_file{name = ZipName, info = #file_info{size = Size, type = regular, access = read_write}}]}
+        = zip:table(Binary),
+    ?assert(Size > 0),
+    ZipNamePrefix = lists:flatten(io_lib:format("~s-trace_~s", [node(), Name])),
+    ?assertNotEqual(nomatch, re:run(ZipName, [ZipNamePrefix])),
+    ok = emqtt:disconnect(Client),
+    ok.
+
 t_stream_log(_Config) ->
     application:set_env(emqx, allow_anonymous, true),
     emqx_trace:clear(),