|
|
@@ -21,9 +21,6 @@
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("snabbkaffe/include/trace.hrl").
|
|
|
|
|
|
--boot_mnesia({mnesia, [boot]}).
|
|
|
--export([mnesia/1]).
|
|
|
-
|
|
|
-export([
|
|
|
publish/1,
|
|
|
subscribe/3,
|
|
|
@@ -55,6 +52,7 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
|
|
|
|
-define(TRACE, ?MODULE).
|
|
|
+-define(SHARD, ?COMMON_SHARD).
|
|
|
-define(MAX_SIZE, 30).
|
|
|
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
|
|
|
|
|
|
@@ -77,15 +75,6 @@
|
|
|
end_at :: integer() | undefined | '_'
|
|
|
}).
|
|
|
|
|
|
-mnesia(boot) ->
|
|
|
- ok = mria:create_table(?TRACE, [
|
|
|
- {type, set},
|
|
|
- {rlog_shard, ?COMMON_SHARD},
|
|
|
- {storage, disc_copies},
|
|
|
- {record_name, ?TRACE},
|
|
|
- {attributes, record_info(fields, ?TRACE)}
|
|
|
- ]).
|
|
|
-
|
|
|
publish(#message{topic = <<"$SYS/", _/binary>>}) ->
|
|
|
ignore;
|
|
|
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
|
|
@@ -255,12 +244,19 @@ format(Traces) ->
|
|
|
).
|
|
|
|
|
|
init([]) ->
|
|
|
- ok = mria:wait_for_tables([?TRACE]),
|
|
|
erlang:process_flag(trap_exit, true),
|
|
|
+ ok = mria:create_table(?TRACE, [
|
|
|
+ {type, set},
|
|
|
+ {rlog_shard, ?SHARD},
|
|
|
+ {storage, disc_copies},
|
|
|
+ {record_name, ?TRACE},
|
|
|
+ {attributes, record_info(fields, ?TRACE)}
|
|
|
+ ]),
|
|
|
+ ok = mria:wait_for_tables([?TRACE]),
|
|
|
+ {ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
|
|
ok = filelib:ensure_dir(filename:join([trace_dir(), dummy])),
|
|
|
ok = filelib:ensure_dir(filename:join([zip_dir(), dummy])),
|
|
|
- {ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
|
|
- Traces = get_enable_trace(),
|
|
|
+ Traces = get_enabled_trace(),
|
|
|
TRef = update_trace(Traces),
|
|
|
update_trace_handler(),
|
|
|
{ok, #{timer => TRef, monitors => #{}}}.
|
|
|
@@ -288,7 +284,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor
|
|
|
{noreply, State#{monitors => NewMonitors}}
|
|
|
end;
|
|
|
handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
|
|
|
- Traces = get_enable_trace(),
|
|
|
+ Traces = get_enabled_trace(),
|
|
|
NextTRef = update_trace(Traces),
|
|
|
update_trace_handler(),
|
|
|
?tp(update_trace_done, #{}),
|
|
|
@@ -347,10 +343,13 @@ stop_all_trace_handler() ->
|
|
|
fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
|
|
|
emqx_trace_handler:running()
|
|
|
).
|
|
|
-get_enable_trace() ->
|
|
|
- transaction(fun() ->
|
|
|
- mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
|
|
- end).
|
|
|
+
|
|
|
+get_enabled_trace() ->
|
|
|
+ {atomic, Traces} =
|
|
|
+ mria:ro_transaction(?SHARD, fun() ->
|
|
|
+ mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
|
|
+ end),
|
|
|
+ Traces.
|
|
|
|
|
|
find_closest_time(Traces, Now) ->
|
|
|
Sec =
|