|
|
@@ -21,6 +21,7 @@
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("kernel/include/file.hrl").
|
|
|
-include_lib("snabbkaffe/include/trace.hrl").
|
|
|
+-include_lib("emqx/include/emqx_trace.hrl").
|
|
|
|
|
|
-export([
|
|
|
publish/1,
|
|
|
@@ -54,8 +55,6 @@
|
|
|
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
|
|
|
|
--include("emqx_trace.hrl").
|
|
|
-
|
|
|
-ifdef(TEST).
|
|
|
-export([
|
|
|
log_file/2,
|
|
|
@@ -147,7 +146,11 @@ list(Enable) ->
|
|
|
|
|
|
-spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) ->
|
|
|
{ok, #?TRACE{}}
|
|
|
- | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
|
|
|
+ | {error,
|
|
|
+ {duplicate_condition, iodata()}
|
|
|
+ | {already_existed, iodata()}
|
|
|
+ | {bad_type, any()}
|
|
|
+ | iodata()}.
|
|
|
create(Trace) ->
|
|
|
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
|
|
true ->
|
|
|
@@ -222,14 +225,16 @@ format(Traces) ->
|
|
|
|
|
|
init([]) ->
|
|
|
erlang:process_flag(trap_exit, true),
|
|
|
+ Fields = record_info(fields, ?TRACE),
|
|
|
ok = mria:create_table(?TRACE, [
|
|
|
{type, set},
|
|
|
{rlog_shard, ?SHARD},
|
|
|
{storage, disc_copies},
|
|
|
{record_name, ?TRACE},
|
|
|
- {attributes, record_info(fields, ?TRACE)}
|
|
|
+ {attributes, Fields}
|
|
|
]),
|
|
|
ok = mria:wait_for_tables([?TRACE]),
|
|
|
+ maybe_migrate_trace(Fields),
|
|
|
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
|
|
ok = filelib:ensure_dir(filename:join([trace_dir(), dummy])),
|
|
|
ok = filelib:ensure_dir(filename:join([zip_dir(), dummy])),
|
|
|
@@ -358,9 +363,10 @@ start_trace(Trace) ->
|
|
|
name = Name,
|
|
|
type = Type,
|
|
|
filter = Filter,
|
|
|
- start_at = Start
|
|
|
+ start_at = Start,
|
|
|
+ payload_encode = PayloadEncode
|
|
|
} = Trace,
|
|
|
- Who = #{name => Name, type => Type, filter => Filter},
|
|
|
+ Who = #{name => Name, type => Type, filter => Filter, payload_encode => PayloadEncode},
|
|
|
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
|
|
|
|
|
|
stop_trace(Finished, Started) ->
|
|
|
@@ -490,6 +496,8 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
|
|
|
end;
|
|
|
to_trace(#{type := Type}, _Rec) ->
|
|
|
{error, io_lib:format("required ~s field", [Type])};
|
|
|
+to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) ->
|
|
|
+ to_trace(maps:remove(payload_encode, Trace), Rec#?TRACE{payload_encode = PayloadEncode});
|
|
|
to_trace(#{start_at := StartAt} = Trace, Rec) ->
|
|
|
{ok, Sec} = to_system_second(StartAt),
|
|
|
to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
|
|
|
@@ -573,3 +581,29 @@ filter_cli_handler(Names) ->
|
|
|
|
|
|
now_second() ->
|
|
|
os:system_time(second).
|
|
|
+
|
|
|
+maybe_migrate_trace(Fields) ->
|
|
|
+ case mnesia:table_info(emqx_trace, attributes) =:= Fields of
|
|
|
+ true ->
|
|
|
+ ok;
|
|
|
+ false ->
|
|
|
+ TransFun = fun(Trace) ->
|
|
|
+ case Trace of
|
|
|
+ {?TRACE, Name, Type, Filter, Enable, StartAt, EndAt} ->
|
|
|
+ #?TRACE{
|
|
|
+ name = Name,
|
|
|
+ type = Type,
|
|
|
+ filter = Filter,
|
|
|
+ enable = Enable,
|
|
|
+ start_at = StartAt,
|
|
|
+ end_at = EndAt,
|
|
|
+ payload_encode = text,
|
|
|
+ extra = #{}
|
|
|
+ };
|
|
|
+ #?TRACE{} ->
|
|
|
+ Trace
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ {atomic, ok} = mnesia:transform_table(?TRACE, TransFun, Fields, ?TRACE),
|
|
|
+ ok
|
|
|
+ end.
|