|
@@ -22,6 +22,7 @@
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
|
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
|
|
|
|
|
|
|
|
-export([
|
|
-export([
|
|
|
api_spec/0,
|
|
api_spec/0,
|
|
@@ -51,8 +52,7 @@
|
|
|
-define(MAX_SINT32, 2147483647).
|
|
-define(MAX_SINT32, 2147483647).
|
|
|
|
|
|
|
|
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
|
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
|
|
--define(NOT_FOUND(N), {404, #{code => 'NOT_FOUND', message => ?TO_BIN([N, " NOT FOUND"])}}).
|
|
|
|
|
--define(SERVICE_UNAVAILABLE(C, M), {503, #{code => C, message => ?TO_BIN(M)}}).
|
|
|
|
|
|
|
+-define(NOT_FOUND_WITH_MSG(N), ?NOT_FOUND(?TO_BIN([N, " NOT FOUND"]))).
|
|
|
-define(TAGS, [<<"Trace">>]).
|
|
-define(TAGS, [<<"Trace">>]).
|
|
|
|
|
|
|
|
namespace() -> "trace".
|
|
namespace() -> "trace".
|
|
@@ -476,13 +476,13 @@ format_trace(Trace0) ->
|
|
|
delete_trace(delete, #{bindings := #{name := Name}}) ->
|
|
delete_trace(delete, #{bindings := #{name := Name}}) ->
|
|
|
case emqx_trace:delete(Name) of
|
|
case emqx_trace:delete(Name) of
|
|
|
ok -> {204};
|
|
ok -> {204};
|
|
|
- {error, not_found} -> ?NOT_FOUND(Name)
|
|
|
|
|
|
|
+ {error, not_found} -> ?NOT_FOUND_WITH_MSG(Name)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
update_trace(put, #{bindings := #{name := Name}}) ->
|
|
update_trace(put, #{bindings := #{name := Name}}) ->
|
|
|
case emqx_trace:update(Name, false) of
|
|
case emqx_trace:update(Name, false) of
|
|
|
ok -> {200, #{enable => false, name => Name}};
|
|
ok -> {200, #{enable => false, name => Name}};
|
|
|
- {error, not_found} -> ?NOT_FOUND(Name)
|
|
|
|
|
|
|
+ {error, not_found} -> ?NOT_FOUND_WITH_MSG(Name)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
|
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
|
@@ -493,64 +493,85 @@ download_trace_log(get, #{bindings := #{name := Name}, query_string := Query}) -
|
|
|
case parse_node(Query, undefined) of
|
|
case parse_node(Query, undefined) of
|
|
|
{ok, Node} ->
|
|
{ok, Node} ->
|
|
|
TraceFiles = collect_trace_file(Node, TraceLog),
|
|
TraceFiles = collect_trace_file(Node, TraceLog),
|
|
|
- %% We generate a session ID so that we name files
|
|
|
|
|
- %% with unique names. Then we won't cause
|
|
|
|
|
- %% overwrites for concurrent requests.
|
|
|
|
|
- SessionId = emqx_utils:gen_id(),
|
|
|
|
|
- ZipDir = filename:join([emqx_trace:zip_dir(), SessionId]),
|
|
|
|
|
- ok = file:make_dir(ZipDir),
|
|
|
|
|
- %% Write files to ZipDir and create an in-memory zip file
|
|
|
|
|
- Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
|
|
|
|
- ZipName = binary_to_list(Name) ++ ".zip",
|
|
|
|
|
- Binary =
|
|
|
|
|
- try
|
|
|
|
|
- {ok, {ZipName, Bin}} = zip:zip(ZipName, Zips, [memory, {cwd, ZipDir}]),
|
|
|
|
|
- Bin
|
|
|
|
|
- after
|
|
|
|
|
- %% emqx_trace:delete_files_after_send(ZipFileName, Zips),
|
|
|
|
|
- %% TODO use file replace file_binary.(delete file after send is not ready now).
|
|
|
|
|
- ok = file:del_dir_r(ZipDir)
|
|
|
|
|
- end,
|
|
|
|
|
- ?tp(trace_api_download_trace_log, #{
|
|
|
|
|
- files => Zips,
|
|
|
|
|
- name => Name,
|
|
|
|
|
- session_id => SessionId,
|
|
|
|
|
- zip_dir => ZipDir,
|
|
|
|
|
- zip_name => ZipName
|
|
|
|
|
- }),
|
|
|
|
|
- Headers = #{
|
|
|
|
|
- <<"content-type">> => <<"application/x-zip">>,
|
|
|
|
|
- <<"content-disposition">> => iolist_to_binary(
|
|
|
|
|
- "attachment; filename=" ++ ZipName
|
|
|
|
|
- )
|
|
|
|
|
- },
|
|
|
|
|
- {200, Headers, {file_binary, ZipName, Binary}};
|
|
|
|
|
|
|
+ maybe_download_trace_log(Name, TraceLog, TraceFiles);
|
|
|
{error, not_found} ->
|
|
{error, not_found} ->
|
|
|
- ?NOT_FOUND(<<"Node">>)
|
|
|
|
|
|
|
+ ?NOT_FOUND_WITH_MSG(<<"Node">>)
|
|
|
end;
|
|
end;
|
|
|
{error, not_found} ->
|
|
{error, not_found} ->
|
|
|
- ?NOT_FOUND(Name)
|
|
|
|
|
|
|
+ ?NOT_FOUND_WITH_MSG(Name)
|
|
|
|
|
+ end.
|
|
|
|
|
+
|
|
|
|
|
+maybe_download_trace_log(Name, TraceLog, TraceFiles) ->
|
|
|
|
|
+ case group_trace_files(TraceLog, TraceFiles) of
|
|
|
|
|
+ #{nonempty := Files} ->
|
|
|
|
|
+ do_download_trace_log(Name, TraceLog, Files);
|
|
|
|
|
+ #{error := Reasons} ->
|
|
|
|
|
+ ?INTERNAL_ERROR(Reasons);
|
|
|
|
|
+ #{empty := _} ->
|
|
|
|
|
+ ?NOT_FOUND(<<"Trace is empty">>)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+do_download_trace_log(Name, TraceLog, TraceFiles) ->
|
|
|
|
|
+ %% We generate a session ID so that we name files
|
|
|
|
|
+ %% with unique names. Then we won't cause
|
|
|
|
|
+ %% overwrites for concurrent requests.
|
|
|
|
|
+ SessionId = emqx_utils:gen_id(),
|
|
|
|
|
+ ZipDir = filename:join([emqx_trace:zip_dir(), SessionId]),
|
|
|
|
|
+ ok = file:make_dir(ZipDir),
|
|
|
|
|
+ %% Write files to ZipDir and create an in-memory zip file
|
|
|
|
|
+ Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
|
|
|
|
+ ZipName = binary_to_list(Name) ++ ".zip",
|
|
|
|
|
+ Binary =
|
|
|
|
|
+ try
|
|
|
|
|
+ {ok, {ZipName, Bin}} = zip:zip(ZipName, Zips, [memory, {cwd, ZipDir}]),
|
|
|
|
|
+ Bin
|
|
|
|
|
+ after
|
|
|
|
|
+ %% emqx_trace:delete_files_after_send(ZipFileName, Zips),
|
|
|
|
|
+ %% TODO use file replace file_binary.(delete file after send is not ready now).
|
|
|
|
|
+ ok = file:del_dir_r(ZipDir)
|
|
|
|
|
+ end,
|
|
|
|
|
+ ?tp(trace_api_download_trace_log, #{
|
|
|
|
|
+ files => Zips,
|
|
|
|
|
+ name => Name,
|
|
|
|
|
+ session_id => SessionId,
|
|
|
|
|
+ zip_dir => ZipDir,
|
|
|
|
|
+ zip_name => ZipName
|
|
|
|
|
+ }),
|
|
|
|
|
+ Headers = #{
|
|
|
|
|
+ <<"content-type">> => <<"application/x-zip">>,
|
|
|
|
|
+ <<"content-disposition">> => iolist_to_binary(
|
|
|
|
|
+ "attachment; filename=" ++ ZipName
|
|
|
|
|
+ )
|
|
|
|
|
+ },
|
|
|
|
|
+ {200, Headers, {file_binary, ZipName, Binary}}.
|
|
|
|
|
+
|
|
|
|
|
+group_trace_files(TraceLog, TraceFiles) ->
|
|
|
|
|
+ maps:groups_from_list(
|
|
|
|
|
+ fun
|
|
|
|
|
+ ({ok, _Node, <<>>}) ->
|
|
|
|
|
+ empty;
|
|
|
|
|
+ ({ok, _Node, _Bin}) ->
|
|
|
|
|
+ nonempty;
|
|
|
|
|
+ ({error, Node, Reason}) ->
|
|
|
|
|
+ ?SLOG(error, #{
|
|
|
|
|
+ msg => "download_trace_log_error",
|
|
|
|
|
+ node => Node,
|
|
|
|
|
+ log => TraceLog,
|
|
|
|
|
+ reason => Reason
|
|
|
|
|
+ }),
|
|
|
|
|
+ error
|
|
|
|
|
+ end,
|
|
|
|
|
+ TraceFiles
|
|
|
|
|
+ ).
|
|
|
|
|
+
|
|
|
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
|
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
|
|
lists:foldl(
|
|
lists:foldl(
|
|
|
- fun(Res, Acc) ->
|
|
|
|
|
- case Res of
|
|
|
|
|
- {ok, Node, Bin} ->
|
|
|
|
|
- FileName = Node ++ "-" ++ TraceLog,
|
|
|
|
|
- ZipName = filename:join([ZipDir, FileName]),
|
|
|
|
|
- case file:write_file(ZipName, Bin) of
|
|
|
|
|
- ok -> [FileName | Acc];
|
|
|
|
|
- _ -> Acc
|
|
|
|
|
- end;
|
|
|
|
|
- {error, Node, Reason} ->
|
|
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
- msg => "download_trace_log_error",
|
|
|
|
|
- node => Node,
|
|
|
|
|
- log => TraceLog,
|
|
|
|
|
- reason => Reason
|
|
|
|
|
- }),
|
|
|
|
|
- Acc
|
|
|
|
|
|
|
+ fun({ok, Node, Bin}, Acc) ->
|
|
|
|
|
+ FileName = Node ++ "-" ++ TraceLog,
|
|
|
|
|
+ ZipName = filename:join([ZipDir, FileName]),
|
|
|
|
|
+ case file:write_file(ZipName, Bin) of
|
|
|
|
|
+ ok -> [FileName | Acc];
|
|
|
|
|
+ _ -> Acc
|
|
|
end
|
|
end
|
|
|
end,
|
|
end,
|
|
|
[],
|
|
[],
|
|
@@ -578,7 +599,7 @@ log_file_detail(get, #{bindings := #{name := Name}}) ->
|
|
|
TraceFiles = collect_trace_file_detail(TraceLog),
|
|
TraceFiles = collect_trace_file_detail(TraceLog),
|
|
|
{200, group_trace_file_detail(TraceFiles)};
|
|
{200, group_trace_file_detail(TraceFiles)};
|
|
|
{error, not_found} ->
|
|
{error, not_found} ->
|
|
|
- ?NOT_FOUND(Name)
|
|
|
|
|
|
|
+ ?NOT_FOUND_WITH_MSG(Name)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
group_trace_file_detail(TraceLogDetail) ->
|
|
group_trace_file_detail(TraceLogDetail) ->
|
|
@@ -609,7 +630,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
|
|
|
Meta = #{<<"position">> => Position, <<"bytes">> => Bytes},
|
|
Meta = #{<<"position">> => Position, <<"bytes">> => Bytes},
|
|
|
{200, #{meta => Meta, items => <<"">>}};
|
|
{200, #{meta => Meta, items => <<"">>}};
|
|
|
{error, not_found} ->
|
|
{error, not_found} ->
|
|
|
- ?NOT_FOUND(Name);
|
|
|
|
|
|
|
+ ?NOT_FOUND_WITH_MSG(Name);
|
|
|
{error, enomem} ->
|
|
{error, enomem} ->
|
|
|
?SLOG(warning, #{
|
|
?SLOG(warning, #{
|
|
|
code => not_enough_mem,
|
|
code => not_enough_mem,
|
|
@@ -617,12 +638,12 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
|
|
|
bytes => Bytes,
|
|
bytes => Bytes,
|
|
|
name => Name
|
|
name => Name
|
|
|
}),
|
|
}),
|
|
|
- ?SERVICE_UNAVAILABLE('SERVICE_UNAVAILABLE', <<"Requested chunk size too big">>);
|
|
|
|
|
|
|
+ ?SERVICE_UNAVAILABLE(<<"Requested chunk size too big">>);
|
|
|
{badrpc, nodedown} ->
|
|
{badrpc, nodedown} ->
|
|
|
- ?NOT_FOUND(<<"Node">>)
|
|
|
|
|
|
|
+ ?NOT_FOUND_WITH_MSG(<<"Node">>)
|
|
|
end;
|
|
end;
|
|
|
{error, not_found} ->
|
|
{error, not_found} ->
|
|
|
- ?NOT_FOUND(<<"Node">>)
|
|
|
|
|
|
|
+ ?NOT_FOUND_WITH_MSG(<<"Node">>)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}.
|
|
-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}.
|