emqttd_sysmon.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc VM System Monitor
  17. -module(emqttd_sysmon).
  18. -author("Feng Lee <feng@emqtt.io>").
  19. -behavior(gen_server).
  20. -include("emqttd_internal.hrl").
  21. -export([start_link/1]).
  22. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  23. terminate/2, code_change/3]).
  24. -record(state, {tickref, events = [], tracelog}).
  25. -define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]).
  26. -define(LOG(Msg, ProcInfo),
  27. lager:warning([{sysmon, true}], "~s~n~p", [WarnMsg, ProcInfo])).
  28. -define(LOG(Msg, ProcInfo, PortInfo),
  29. lager:warning([{sysmon, true}], "~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
  30. %% @doc Start system monitor
  31. -spec(start_link(Opts :: list(tuple())) ->
  32. {ok, pid()} | ignore | {error, term()}).
  33. start_link(Opts) ->
  34. gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
  35. %%--------------------------------------------------------------------
  36. %% gen_server callbacks
  37. %%--------------------------------------------------------------------
  38. init([Opts]) ->
  39. erlang:system_monitor(self(), parse_opt(Opts)),
  40. {ok, TRef} = timer:send_interval(timer:seconds(1), reset),
  41. %%TODO: don't trace for performance issue.
  42. %%{ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)),
  43. {ok, #state{tickref = TRef}}.
  44. parse_opt(Opts) ->
  45. parse_opt(Opts, []).
  46. parse_opt([], Acc) ->
  47. Acc;
  48. parse_opt([{long_gc, false}|Opts], Acc) ->
  49. parse_opt(Opts, Acc);
  50. parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
  51. parse_opt(Opts, [{long_gc, Ms}|Acc]);
  52. parse_opt([{long_schedule, false}|Opts], Acc) ->
  53. parse_opt(Opts, Acc);
  54. parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
  55. parse_opt(Opts, [{long_schedule, Ms}|Acc]);
  56. parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) ->
  57. parse_opt(Opts, [{large_heap, Size}|Acc]);
  58. parse_opt([{busy_port, true}|Opts], Acc) ->
  59. parse_opt(Opts, [busy_port|Acc]);
  60. parse_opt([{busy_port, false}|Opts], Acc) ->
  61. parse_opt(Opts, Acc);
  62. parse_opt([{busy_dist_port, true}|Opts], Acc) ->
  63. parse_opt(Opts, [busy_dist_port|Acc]);
  64. parse_opt([{busy_dist_port, false}|Opts], Acc) ->
  65. parse_opt(Opts, Acc);
  66. parse_opt([_Opt|Opts], Acc) ->
  67. parse_opt(Opts, Acc).
%% @doc This server defines no synchronous API. Every call is delegated to the
%% ?UNEXPECTED_REQ macro from emqttd_internal.hrl; presumably that macro logs
%% the request and builds the gen_server reply (TODO confirm in the header).
handle_call(Req, _From, State) ->
    ?UNEXPECTED_REQ(Req, State).
%% @doc This server defines no casts. Every cast is delegated to the
%% ?UNEXPECTED_MSG macro from emqttd_internal.hrl; presumably that macro logs
%% the message and returns a noreply tuple (TODO confirm in the header).
handle_cast(Msg, State) ->
    ?UNEXPECTED_MSG(Msg, State).
  72. handle_info({monitor, Pid, long_gc, Info}, State) ->
  73. suppress({long_gc, Pid}, fun() ->
  74. WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
  75. ?LOG(WarnMsg, procinfo(Pid)),
  76. publish(long_gc, WarnMsg)
  77. end, State);
  78. handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
  79. suppress({long_schedule, Pid}, fun() ->
  80. WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
  81. ?LOG(WarnMsg, procinfo(Pid)),
  82. publish(long_schedule, WarnMsg)
  83. end, State);
  84. handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
  85. suppress({long_schedule, Port}, fun() ->
  86. WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
  87. ?LOG(WarnMsg, erlang:port_info(Port)),
  88. publish(long_schedule, WarnMsg)
  89. end, State);
  90. handle_info({monitor, Pid, large_heap, Info}, State) ->
  91. suppress({large_heap, Pid}, fun() ->
  92. WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
  93. ?LOG(WarnMsg, procinfo(Pid)),
  94. publish(large_heap, WarnMsg)
  95. end, State);
  96. handle_info({monitor, SusPid, busy_port, Port}, State) ->
  97. suppress({busy_port, Port}, fun() ->
  98. WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
  99. ?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
  100. publish(busy_port, WarnMsg)
  101. end, State);
  102. handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
  103. suppress({busy_dist_port, Port}, fun() ->
  104. WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
  105. ?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
  106. publish(busy_dist_port, WarnMsg)
  107. end, State);
  108. handle_info(reset, State) ->
  109. {noreply, State#state{events = []}, hibernate};
  110. handle_info(Info, State) ->
  111. ?UNEXPECTED_INFO(Info, State).
  112. terminate(_Reason, #state{tickref = TRef, tracelog = TraceLog}) ->
  113. timer:cancel(TRef),
  114. cancel_tracelog(TraceLog).
  115. code_change(_OldVsn, State, _Extra) ->
  116. {ok, State}.
  117. suppress(Key, SuccFun, State = #state{events = Events}) ->
  118. case lists:member(Key, Events) of
  119. true ->
  120. {noreply, State};
  121. false ->
  122. SuccFun(),
  123. {noreply, State#state{events = [Key|Events]}}
  124. end.
  125. procinfo(Pid) ->
  126. case {emqttd_vm:get_process_info(Pid), emqttd_vm:get_process_gc(Pid)} of
  127. {undefined, _} -> undefined;
  128. {_, undefined} -> undefined;
  129. {Info, GcInfo} -> Info ++ GcInfo
  130. end.
  131. publish(Sysmon, WarnMsg) ->
  132. Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)),
  133. emqttd:publish(emqttd_message:set_flag(sys, Msg)).
  134. topic(Sysmon) ->
  135. emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).
  136. %% start_tracelog(undefined) ->
  137. %% {ok, undefined};
  138. %% start_tracelog(LogFile) ->
  139. %% lager:trace_file(LogFile, [{sysmon, true}], info, ?LOG_FMT).
  140. cancel_tracelog(undefined) ->
  141. ok;
  142. cancel_tracelog(TraceLog) ->
  143. lager:stop_trace(TraceLog).