Browse Source

Merge pull request #338 from emqtt/dev-feng

Refactor Sysmon and suppress 'monitor' messages
Feng Lee 10 years ago
parent
commit
f0cc42b9e8

+ 4 - 3
Makefile

@@ -53,12 +53,13 @@ APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \
 
 check_plt: compile
 	dialyzer --check_plt --plt $(PLT) --apps $(APPS) \
-		deps/*/ebin ./ebin
+		deps/*/ebin ./ebin plugins/*/ebin
 
 build_plt: compile
 	dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) \
-		deps/*/ebin ./ebin
+		deps/*/ebin ./ebin plugins/*/ebin
 
 dialyzer: compile
-	dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin
+	dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin plugins/*/ebin
+
 

+ 0 - 1
include/emqttd_cli.hrl

@@ -32,4 +32,3 @@
 -define(USAGE(CmdList),
     [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]).
 
-

+ 21 - 0
rel/files/emqttd.config.development

@@ -241,6 +241,27 @@
                 %{buffer, 4096},
             ]}
         ]}
+    ]},
+
+    %% Erlang System Monitor 
+    {sysmon, [
+
+        %% Long GC(ms)
+        {long_gc, 100},
+
+        %% Long Schedule(ms)
+        {long_schedule, 50},
+
+        %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
+        %% 8 * 1024 * 1024
+        {large_heap, 8388608},
+
+        %% Busy Port
+        {busy_port, true},
+
+        %% Busy Dist Port
+        {busy_dist_port, true}
+
     ]}
  ]}
 ].

+ 23 - 0
rel/files/emqttd.config.production

@@ -233,7 +233,30 @@
                 %{buffer, 4096},
             ]}
         ]}
+    ]},
+
+    %% Erlang System Monitor
+    {sysmon, [
+
+        %% Long GC: disabled in production mode; for the reason, see:
+        %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
+        {long_gc, false},
+
+        %% Long Schedule(ms)
+        {long_schedule, 50},
+
+        %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
+        %% 8 * 1024 * 1024
+        {large_heap, 8388608},
+
+        %% Busy Port
+        {busy_port, true},
+
+        %% Busy Dist Port
+        {busy_dist_port, true}
+
     ]}
+
  ]}
 ].
 

+ 1 - 1
src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.12.0"},
+  {vsn, "0.12.1"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 1 - 1
src/emqttd_app.erl

@@ -88,7 +88,7 @@ start_servers(Sup) ->
                {"emqttd mode supervisor", emqttd_mod_sup},
                {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
                {"emqttd access control", emqttd_access_control},
-               {"emqttd system monitor", emqttd_sysmon}],
+               {"emqttd system monitor", emqttd_sysmon, emqttd:env(sysmon)}],
     [start_server(Sup, Server) || Server <- Servers].
 
 start_server(_Sup, {Name, F}) when is_function(F) ->

+ 4 - 1
src/emqttd_pubsub.erl

@@ -107,7 +107,10 @@ mnesia(copy) ->
     Id   :: pos_integer(),
     Opts :: list().
 start_link(Id, Opts) ->
-    gen_server2:start_link(?MODULE, [Id, Opts], []).
+    gen_server2:start_link({local, name(Id)}, ?MODULE, [Id, Opts], []).
+
+name(Id) ->
+    list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)).
 
 %%------------------------------------------------------------------------------
 %% @doc Create topic. Notice That this transaction is not protected by pubsub pool

+ 4 - 1
src/emqttd_sm.erl

@@ -85,7 +85,10 @@ mnesia(copy) ->
 %%------------------------------------------------------------------------------
 -spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
 start_link(Id) ->
-    gen_server2:start_link(?MODULE, [Id], []).
+    gen_server2:start_link({local, name(Id)}, ?MODULE, [Id], []).
+
+name(Id) ->
+    list_to_atom("emqttd_sm_" ++ integer_to_list(Id)).
 
 %%------------------------------------------------------------------------------
 %% @doc Pool name.

+ 95 - 27
src/emqttd_sysmon.erl

@@ -25,69 +25,137 @@
 %%% @end
 %%%-----------------------------------------------------------------------------
 
-%%TODO: this is a demo module....
-
 -module(emqttd_sysmon).
 
 -author("Feng Lee <feng@emqtt.io>").
 
 -behavior(gen_server).
 
--export([start_link/0]).
+-export([start_link/1]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {}).
+-record(state, {tref, events = []}).
 
 %%------------------------------------------------------------------------------
 %% @doc Start system monitor
 %% @end
 %%------------------------------------------------------------------------------
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+-spec start_link(Opts :: list(tuple())) ->
+    {ok, pid()} | ignore | {error, term()}.
+start_link(Opts) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
 
 %%%=============================================================================
 %%% gen_server callbacks
 %%%=============================================================================
 
-init([]) ->
-    erlang:system_monitor(self(), [{long_gc, 5000},
-                                   {large_heap, 8 * 1024 * 1024},
-                                   busy_port]),
-    {ok, #state{}}.
+init([Opts]) ->
+    erlang:system_monitor(self(), parse_opt(Opts)),
+    {ok, TRef} = timer:send_interval(1000, reset),
+    {ok, #state{tref = TRef}}.
+
+%% Translate the `sysmon' config proplist into the option list passed to
+%% erlang:system_monitor/2. Threshold monitors (long_gc, long_schedule,
+%% large_heap) take an integer or `false' to disable them; port monitors
+%% (busy_port, busy_dist_port) take a boolean. Disabled monitors are simply
+%% left out of the result. Unknown options are not matched on purpose, so a
+%% malformed config fails fast with function_clause.
+parse_opt(Opts) ->
+    parse_opt(Opts, []).
+parse_opt([], Acc) ->
+    Acc;
+parse_opt([{long_gc, false}|Opts], Acc) ->
+    parse_opt(Opts, Acc);
+parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
+    parse_opt(Opts, [{long_gc, Ms}|Acc]);
+%% `false' is accepted here too, consistent with long_gc.
+parse_opt([{long_schedule, false}|Opts], Acc) ->
+    parse_opt(Opts, Acc);
+parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
+    parse_opt(Opts, [{long_schedule, Ms}|Acc]);
+%% `false' is accepted here too, consistent with long_gc.
+parse_opt([{large_heap, false}|Opts], Acc) ->
+    parse_opt(Opts, Acc);
+parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) ->
+    parse_opt(Opts, [{large_heap, Size}|Acc]);
+parse_opt([{busy_port, true}|Opts], Acc) ->
+    parse_opt(Opts, [busy_port|Acc]);
+parse_opt([{busy_port, false}|Opts], Acc) ->
+    parse_opt(Opts, Acc);
+parse_opt([{busy_dist_port, true}|Opts], Acc) ->
+    parse_opt(Opts, [busy_dist_port|Acc]);
+parse_opt([{busy_dist_port, false}|Opts], Acc) ->
+    parse_opt(Opts, Acc).
 
 handle_call(Request, _From, State) ->
     lager:error("Unexpected request: ~p", [Request]),
     {reply, {error, unexpected_request}, State}.
 
 handle_cast(Msg, State) ->
-    lager:error("unexpected msg: ~p", [Msg]),
+    lager:error("Unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({monitor, GcPid, long_gc, Info}, State) ->
-    lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid, 
-		[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
-    {noreply, State};
-
-handle_info({monitor, GcPid, large_heap, Info}, State) ->
-    lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid, 
-		[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
-    {noreply, State};
+handle_info({monitor, Pid, long_gc, Info}, State) ->
+    suppress({long_gc, Pid}, fun() ->
+            WarnMsg = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]),
+            lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]),
+            publish(long_gc, WarnMsg)
+        end, State);
+
+handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
+    suppress({long_schedule, Pid}, fun() ->
+            WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
+            lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]),
+            publish(long_schedule, WarnMsg)
+        end, State);
+
+handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
+    suppress({long_schedule, Port}, fun() ->
+        WarnMsg  = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
+        lager:error("~s~n~p", [WarnMsg, erlang:port_info(Port)]),
+        publish(long_schedule, WarnMsg)
+    end, State);
+
+handle_info({monitor, Pid, large_heap, Info}, State) ->
+    suppress({large_heap, Pid}, fun() ->
+        WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
+        lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]),
+        publish(large_heap, WarnMsg)
+    end, State);
 
 handle_info({monitor, SusPid, busy_port, Port}, State) ->
-    lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid, 
-		[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]),
-    {noreply, State};
+    suppress({busy_port, Port}, fun() ->
+        WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
+        lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
+        publish(busy_port, WarnMsg)
+    end, State);
+
+handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
+    suppress({busy_dist_port, Port}, fun() ->
+        WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
+        lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
+        publish(busy_dist_port, WarnMsg)
+    end, State);
+
+handle_info(reset, State) ->
+    {noreply, State#state{events = []}};
 
 handle_info(Info, State) ->
     lager:error("Unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(_Reason, _State) ->
-    ok.
+terminate(_Reason, #state{tref = TRef}) ->
+    timer:cancel(TRef), ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+suppress(Key, SuccFun, State = #state{events = Events}) ->
+    case lists:member(Key, Events) of
+        true  ->
+            {noreply, State};
+        false ->
+            SuccFun(),
+            {noreply, State#state{events = [Key|Events]}}
+    end.
+
+%% Gather process info and GC info for Pid for inclusion in a log line.
+%% Both emqttd_vm helpers call process_info/2, which yields `undefined'
+%% once the process has exited; the monitored process may already be gone
+%% when the monitor message is handled, so return `undefined' in that case
+%% instead of crashing on `undefined ++ undefined'.
+procinfo(Pid) ->
+    case {emqttd_vm:get_process_info(Pid), emqttd_vm:get_process_gc(Pid)} of
+        {Info, GcInfo} when is_list(Info), is_list(GcInfo) ->
+            Info ++ GcInfo;
+        _Exited ->
+            undefined
+    end.
+
+publish(Sysmon, WarnMsg) ->
+    Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)),
+    emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
+
+topic(Sysmon) ->
+    emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).
+

+ 4 - 2
src/emqttd_vm.erl

@@ -39,7 +39,9 @@
 
 -export([get_process_list/0,
          get_process_info/0,
+         get_process_info/1,
          get_process_gc/0,
+         get_process_gc/1,
          get_process_group_leader_info/1,
          get_process_limit/0]).
 	
@@ -311,12 +313,12 @@ get_process_list(Pid) when is_pid(Pid) ->
 get_process_info() ->
     [get_process_info(Pid) || Pid <- processes()].
 get_process_info(Pid) when is_pid(Pid) ->
-    [process_info(Pid, Key) || Key <- ?PROCESS_INFO].
+    process_info(Pid, ?PROCESS_INFO).
 
 get_process_gc() ->
     [get_process_gc(Pid) || Pid <- processes()].
 get_process_gc(Pid) when is_pid(Pid) ->
-    [process_info(Pid, Key) || Key <- ?PROCESS_GC].
+    process_info(Pid, ?PROCESS_GC).
 
 get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) ->
     [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO)].