emqttd_sysmon.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. %%%-----------------------------------------------------------------------------
  2. %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
  3. %%%
  4. %%% Permission is hereby granted, free of charge, to any person obtaining a copy
  5. %%% of this software and associated documentation files (the "Software"), to deal
  6. %%% in the Software without restriction, including without limitation the rights
  7. %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. %%% copies of the Software, and to permit persons to whom the Software is
  9. %%% furnished to do so, subject to the following conditions:
  10. %%%
  11. %%% The above copyright notice and this permission notice shall be included in all
  12. %%% copies or substantial portions of the Software.
  13. %%%
  14. %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. %%% SOFTWARE.
  21. %%%-----------------------------------------------------------------------------
  22. %%% @doc
  23. %%% emqttd system monitor.
  24. %%%
  25. %%% @end
  26. %%%-----------------------------------------------------------------------------
  %% Module identity and authorship.
  -module(emqttd_sysmon).
  -author("Feng Lee <feng@emqtt.io>").
  %% This module is a gen_server callback module.
  -behavior(gen_server).
  %% Public API.
  -export([start_link/1]).
  %% gen_server callbacks.
  -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  terminate/2, code_change/3]).
  %% Server state:
  %%   tick_tref - reference of the interval timer created in init/1 and
  %%               cancelled in terminate/2.
  %%   events    - keys of the monitor events already reported since the last
  %%               `reset' message; used by suppress/3 to drop duplicates.
  -record(state, {tick_tref, events = []}).
  %%------------------------------------------------------------------------------
  %% @doc Start the system monitor server and register it locally under the
  %% module name. `Opts' is passed to init/1 wrapped in a one-element list.
  %% @end
  %%------------------------------------------------------------------------------
  -spec start_link(Opts :: list(tuple())) ->
      {ok, pid()} | ignore | {error, term()}.
  start_link(Opts) ->
      ServerName = {local, ?MODULE},
      InitArgs = [Opts],
      gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
  42. %%%=============================================================================
  43. %%% gen_server callbacks
  44. %%%=============================================================================
  %% @private
  %% @doc gen_server init callback.
  %% Registers the calling process as the receiver of system monitor messages,
  %% using the options produced by parse_opt/1, and starts an interval timer that
  %% sends `reset' to this process every second. The timer reference is kept in
  %% the state so terminate/2 can cancel it.
  init([Opts]) ->
      MonitorOpts = parse_opt(Opts),
      erlang:system_monitor(self(), MonitorOpts),
      Interval = timer:seconds(1),
      {ok, Timer} = timer:send_interval(Interval, reset),
      {ok, #state{tick_tref = Timer}}.
  %% @private
  %% @doc Translate the configured option list into the option list handed to
  %% erlang:system_monitor/2.
  %%
  %% Recognised entries:
  %%   {long_gc, Ms}              - kept as-is when Ms is an integer
  %%   {long_schedule, Ms}        - kept as-is when Ms is an integer
  %%   {large_heap, Size}         - kept as-is when Size is an integer
  %%   {busy_port, true}          - becomes the bare atom busy_port
  %%   {busy_dist_port, true}     - becomes the bare atom busy_dist_port
  %%   {AnyOfTheAbove, false}     - dropped (that monitor stays disabled)
  %%
  %% Entries are prepended to the accumulator, so the result is in reverse input
  %% order. Any other entry has no matching clause and fails with function_clause.
  parse_opt(Opts) ->
      parse_opt(Opts, []).

  parse_opt([], Acc) ->
      Acc;
  parse_opt([{long_gc, false}|Opts], Acc) ->
      parse_opt(Opts, Acc);
  parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
      parse_opt(Opts, [{long_gc, Ms}|Acc]);
  %% `false' disables long_schedule, consistently with the other options.
  parse_opt([{long_schedule, false}|Opts], Acc) ->
      parse_opt(Opts, Acc);
  parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
      parse_opt(Opts, [{long_schedule, Ms}|Acc]);
  %% `false' disables large_heap, consistently with the other options.
  parse_opt([{large_heap, false}|Opts], Acc) ->
      parse_opt(Opts, Acc);
  parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) ->
      parse_opt(Opts, [{large_heap, Size}|Acc]);
  parse_opt([{busy_port, true}|Opts], Acc) ->
      parse_opt(Opts, [busy_port|Acc]);
  parse_opt([{busy_port, false}|Opts], Acc) ->
      parse_opt(Opts, Acc);
  parse_opt([{busy_dist_port, true}|Opts], Acc) ->
      parse_opt(Opts, [busy_dist_port|Acc]);
  parse_opt([{busy_dist_port, false}|Opts], Acc) ->
      parse_opt(Opts, Acc).
  %% @private
  %% @doc gen_server call callback. No synchronous requests are supported: the
  %% request is logged and the caller gets {error, unexpected_request}.
  handle_call(Req, _From, State) ->
      lager:error("Unexpected request: ~p", [Req]),
      Reply = {error, unexpected_request},
      {reply, Reply, State}.
  %% @private
  %% @doc gen_server cast callback. No casts are supported: the message is logged
  %% and the state is returned unchanged.
  handle_cast(Cast, State) ->
      lager:error("Unexpected msg: ~p", [Cast]),
      {noreply, State}.
  %% @private
  %% @doc gen_server info callback.
  %%
  %% Handles the `{monitor, ...}' messages for long_gc, long_schedule (pid or
  %% port), large_heap, busy_port and busy_dist_port. Each clause builds a report
  %% closure that formats a warning, logs it together with process/port details
  %% and publishes it; the closure is only run by suppress/3 when the same event
  %% key has not already been reported since the last `reset' message.
  %%
  %% `reset' (sent by the interval timer started in init/1) clears the list of
  %% reported keys. Anything else is logged and ignored.
  handle_info({monitor, Pid, long_gc, Info}, State) ->
      Report = fun() ->
          Text = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]),
          lager:error("~s~n~p", [Text, procinfo(Pid)]),
          publish(long_gc, Text)
      end,
      suppress({long_gc, Pid}, Report, State);

  handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
      Report = fun() ->
          Text = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
          lager:error("~s~n~p", [Text, procinfo(Pid)]),
          publish(long_schedule, Text)
      end,
      suppress({long_schedule, Pid}, Report, State);

  handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
      Report = fun() ->
          Text = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
          lager:error("~s~n~p", [Text, erlang:port_info(Port)]),
          publish(long_schedule, Text)
      end,
      suppress({long_schedule, Port}, Report, State);

  handle_info({monitor, Pid, large_heap, Info}, State) ->
      Report = fun() ->
          Text = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
          lager:error("~s~n~p", [Text, procinfo(Pid)]),
          publish(large_heap, Text)
      end,
      suppress({large_heap, Pid}, Report, State);

  %% For the busy_* events the suppression key is the port, not the suspended pid.
  handle_info({monitor, SusPid, busy_port, Port}, State) ->
      Report = fun() ->
          Text = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
          lager:error("~s~n~p~n~p", [Text, procinfo(SusPid), erlang:port_info(Port)]),
          publish(busy_port, Text)
      end,
      suppress({busy_port, Port}, Report, State);

  handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
      Report = fun() ->
          Text = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
          lager:error("~s~n~p~n~p", [Text, procinfo(SusPid), erlang:port_info(Port)]),
          publish(busy_dist_port, Text)
      end,
      suppress({busy_dist_port, Port}, Report, State);

  %% Periodic tick: forget which events were already reported.
  handle_info(reset, State) ->
      Cleared = State#state{events = []},
      {noreply, Cleared};

  handle_info(Other, State) ->
      lager:error("Unexpected info: ~p", [Other]),
      {noreply, State}.
  %% @private
  %% @doc gen_server terminate callback. Cancels the interval timer whose
  %% reference was stored in the state by init/1 and returns the result of
  %% timer:cancel/1.
  terminate(_Reason, #state{tick_tref = Timer}) ->
      timer:cancel(Timer).
  %% @private
  %% @doc gen_server code_change callback. The state needs no conversion, so it
  %% is returned unchanged.
  code_change(_OldVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  %% @private
  %% @doc Run `SuccFun' at most once per `Key' between two `reset' messages.
  %% If `Key' is not yet in the state's event list, `SuccFun' is called and the
  %% key is recorded; otherwise nothing happens. Always returns a
  %% `{noreply, State}' tuple suitable as a handle_info/2 result.
  suppress(Key, SuccFun, #state{events = Seen} = State) ->
      case lists:member(Key, Seen) of
          false ->
              SuccFun(),
              {noreply, State#state{events = [Key | Seen]}};
          true ->
              {noreply, State}
      end.
  %% @private
  %% @doc Collect diagnostic details about `Pid' for logging.
  %% Both emqttd_vm lookups are always performed. If either one returns
  %% `undefined' the result is `undefined', otherwise the two lists are
  %% concatenated (process info first, then GC info).
  procinfo(Pid) ->
      ProcInfo = emqttd_vm:get_process_info(Pid),
      GcInfo = emqttd_vm:get_process_gc(Pid),
      if
          ProcInfo =:= undefined; GcInfo =:= undefined ->
              undefined;
          true ->
              ProcInfo ++ GcInfo
      end.
  %% @private
  %% @doc Publish a warning for the given monitor kind.
  %% Builds a message from `sysmon' on the topic returned by topic/1, with
  %% `WarnMsg' flattened to a binary as payload, sets the `sys' flag on it and
  %% hands it to emqttd_pubsub:publish/1.
  publish(Sysmon, WarnMsg) ->
      Topic = topic(Sysmon),
      Payload = iolist_to_binary(WarnMsg),
      Msg = emqttd_message:make(sysmon, Topic, Payload),
      SysMsg = emqttd_message:set_flag(sys, Msg),
      emqttd_pubsub:publish(SysMsg).
  %% @private
  %% @doc Build the topic for a monitor kind: "sysmon/" followed by `Sysmon',
  %% converted to a binary and passed through emqttd_topic:systop/1.
  %% NOTE(review): systop/1 presumably adds the broker's system-topic prefix;
  %% confirm against emqttd_topic.
  topic(Sysmon) ->
      Suffix = lists:concat(['sysmon/', Sysmon]),
      emqttd_topic:systop(list_to_binary(Suffix)).