浏览代码

Add monitors and alarm handler (#2266)

* Add monitors and alarm handler
tigercl 7 年之前
父节点
当前提交
da755b88c7

+ 2 - 1
Makefile

@@ -37,7 +37,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
 			emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
 			emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
 			emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
 			emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
-			emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
+			emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
+      emqx_vm_mon emqx_alarm_handler
 
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

+ 57 - 0
etc/emqx.conf

@@ -2055,4 +2055,61 @@ sysmon.busy_port = false
 ## Value: true | false
 ## Value: true | false
 sysmon.busy_dist_port = true
 sysmon.busy_dist_port = true
 
 
+## The time interval for the periodic cpu check
+##
+## Value: Duration
+## -h: hour, e.g. '2h' for 2 hours
+## -m: minute, e.g. '5m' for 5 minutes
+## -s: second, e.g. '30s' for 30 seconds
+##
+## Default: 60s
+os_mon.cpu_check_interval = 60s
+
+## The threshold, as percentage of system cpu, for how much system cpu can be used before the corresponding alarm is set.
+##
+## Default: 80%
+os_mon.cpu_high_watermark = 80%
+
+## The threshold, as percentage of system cpu, for how much system cpu can be used before the corresponding alarm is clear.
+##
+## Default: 60%
+os_mon.cpu_low_watermark = 60%
+
+## The time interval for the periodic memory check
+##
+## Value: Duration
+## -h: hour, e.g. '2h' for 2 hours
+## -m: minute, e.g. '5m' for 5 minutes
+## -s: second, e.g. '30s' for 30 seconds
+##
+## Default: 60s
+os_mon.mem_check_interval = 60s
+
+## The threshold, as percentage of system memory, for how much system memory can be allocated before the corresponding alarm is set.
+##
+## Default: 70%
+os_mon.sysmem_high_watermark = 70%
+
+## The threshold, as percentage of system memory, for how much system memory can be allocated by one Erlang process before the corresponding alarm is set.
+##
+## Default: 5%
+os_mon.procmem_high_watermark = 5%
+
+## The time interval for the periodic process limit check
+##
+## Value: Duration
+##
+## Default: 30s
+vm_mon.check_interval = 30s
+
+## The threshold, as percentage of processes, for how many processes can simultaneously exist at the local node before the corresponding alarm is set.
+##
+## Default: 80%
+vm_mon.process_high_watermark = 80%
+
+## The threshold, as percentage of processes, for how many processes can simultaneously exist at the local node before the corresponding alarm is clear.
+##
+## Default: 60%
+vm_mon.process_low_watermark = 60%
+
 {{ additional_configs }}
 {{ additional_configs }}

+ 67 - 0
priv/emqx.schema

@@ -1886,3 +1886,70 @@ end}.
    {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
    {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
    {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
    {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
 end}.
 end}.
+
+%%--------------------------------------------------------------------
+%% Operating System Monitor
+%%--------------------------------------------------------------------
+
+{mapping, "os_mon.cpu_check_interval", "emqx.os_mon", [
+  {default, 60},
+  {datatype, {duration, s}}
+]}.
+
+{mapping, "os_mon.cpu_high_watermark", "emqx.os_mon", [
+  {default, "80%"},
+  {datatype, {percent, float}}
+]}.
+
+{mapping, "os_mon.cpu_low_watermark", "emqx.os_mon", [
+  {default, "60%"},
+  {datatype, {percent, float}}
+]}.
+
+{mapping, "os_mon.mem_check_interval", "emqx.os_mon", [
+  {default, 60},
+  {datatype, {duration, s}}
+]}.
+
+{mapping, "os_mon.sysmem_high_watermark", "emqx.os_mon", [
+  {default, "70%"},
+  {datatype, {percent, float}}
+]}.
+
+{mapping, "os_mon.procmem_high_watermark", "emqx.os_mon", [
+  {default, "5%"},
+  {datatype, {percent, float}}
+]}.
+
+{translation, "emqx.os_mon", fun(Conf) ->
+  [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)},
+   {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf)},
+   {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf)},
+   {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)},
+   {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf)},
+   {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf)}]
+end}.
+
+%%--------------------------------------------------------------------
+%% VM Monitor
+%%--------------------------------------------------------------------
+{mapping, "vm_mon.check_interval", "emqx.vm_mon", [
+  {default, 30},
+  {datatype, {duration, s}}
+]}.
+
+{mapping, "vm_mon.process_high_watermark", "emqx.vm_mon", [
+  {default, "80%"},
+  {datatype, {percent, float}}
+]}.
+
+{mapping, "vm_mon.process_low_watermark", "emqx.vm_mon", [
+  {default, "60%"},
+  {datatype, {percent, float}}
+]}.
+
+{translation, "emqx.vm_mon", fun(Conf) ->
+  [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
+   {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
+   {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
+end}.

+ 1 - 1
src/emqx.app.src

@@ -4,7 +4,7 @@
               {modules,[]},
               {modules,[]},
               {registered,[emqx_sup]},
               {registered,[emqx_sup]},
               {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy,
               {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy,
-                             replayq]},
+                             replayq,sasl,os_mon]},
               {env,[]},
               {env,[]},
               {mod,{emqx_app,[]}},
               {mod,{emqx_app,[]}},
               {maintainers,["Feng Lee <feng@emqx.io>"]},
               {maintainers,["Feng Lee <feng@emqx.io>"]},

+ 169 - 0
src/emqx_alarm_handler.erl

@@ -0,0 +1,169 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_alarm_handler).
+
+-behaviour(gen_event).
+
+-include("emqx.hrl").
+-include("logger.hrl").
+
+%% Mnesia bootstrap
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+-export([init/1,
+         handle_event/2,
+         handle_call/2,
+         handle_info/2,
+         terminate/2]).
+
+-export([load/0,
+         get_alarms/0]).
+
+-record(common_alarm, {id, desc}).
+-record(alarm_history, {id, clear_at}).
+
+-define(ALARM_TAB, emqx_alarm).
+-define(ALARM_HISTORY_TAB, emqx_alarm_history).
+
+%%------------------------------------------------------------------------------
+%% Mnesia bootstrap
+%%------------------------------------------------------------------------------
+
+mnesia(boot) ->
+    ok = ekka_mnesia:create_table(?ALARM_TAB, [
+                {type, set},
+                {disc_copies, [node()]},
+                {local_content, true},
+                {record_name, common_alarm},
+                {attributes, record_info(fields, common_alarm)}]),
+    ok = ekka_mnesia:create_table(?ALARM_HISTORY_TAB, [
+                {type, set},
+                {disc_copies, [node()]},
+                {local_content, true},
+                {record_name, alarm_history},
+                {attributes, record_info(fields, alarm_history)}]);
+mnesia(copy) ->
+    ok = ekka_mnesia:copy_table(?ALARM_TAB),
+    ok = ekka_mnesia:copy_table(?ALARM_HISTORY_TAB).
+
+%%----------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------
+
+load() ->
+    gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {?MODULE, []}).
+
+get_alarms() ->
+    gen_event:call(alarm_handler, ?MODULE, get_alarms).
+
+%%----------------------------------------------------------------------
+%% gen_event callbacks
+%%----------------------------------------------------------------------
+
+init({_Args, {alarm_handler, ExistingAlarms}}) ->
+    init_tables(ExistingAlarms),
+    {ok, []};
+init(_) ->
+    init_tables([]),
+    {ok, []}.
+
+handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) ->
+    handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State);
+handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
+    ?LOG(notice, "Alarm report: set ~p", [Alarm]),
+    case encode_alarm(Alarm) of
+        {ok, Json} ->
+            emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json));
+        {error, Reason} ->
+            ?LOG(error, "Failed to encode alarm: ~p", [Reason])
+    end,
+    set_alarm_(AlarmId, AlarmDesc),
+    {ok, State};
+handle_event({clear_alarm, AlarmId}, State) ->
+    ?LOG(notice, "Alarm report: clear ~p", [AlarmId]),
+    emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)),
+    clear_alarm_(AlarmId),
+    {ok, State};
+handle_event(_, State) ->
+    {ok, State}.
+
+handle_info(_, State) -> {ok, State}.
+
+handle_call(get_alarms, State) ->
+    {ok, get_alarms_(), State};
+handle_call(_Query, State)     -> {ok, {error, bad_query}, State}.
+
+terminate(swap, _State) ->
+    {emqx_alarm_handler, get_alarms_()};
+terminate(_, _) ->
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+init_tables(ExistingAlarms) ->
+    mnesia:clear_table(?ALARM_TAB),
+    lists:foreach(fun({Id, _Desc}) ->
+                      set_alarm_history(Id)
+                  end, ExistingAlarms).
+
+encode_alarm({AlarmId, #alarm{severity  = Severity, 
+                              title     = Title,
+                              summary   = Summary, 
+                              timestamp = Ts}}) ->
+    emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
+                           {desc, [{severity, Severity},
+                                   {title, iolist_to_binary(Title)},
+                                   {summary, iolist_to_binary(Summary)},
+                                   {ts, emqx_time:now_secs(Ts)}]}]);
+encode_alarm({AlarmId, AlarmDesc}) ->
+    emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, 
+                           {desc, maybe_to_binary(AlarmDesc)}]).
+
+alarm_msg(Topic, Payload) ->
+    Msg = emqx_message:make(?MODULE, Topic, Payload),
+    emqx_message:set_headers(#{'Content-Type' => <<"application/json">>},
+                             emqx_message:set_flag(sys, Msg)).
+
+topic(alert, AlarmId) ->
+    emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
+topic(clear, AlarmId) ->
+    emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>).
+
+maybe_to_binary(Data) when is_binary(Data) ->
+    Data;
+maybe_to_binary(Data) ->
+    iolist_to_binary(io_lib:format("~p", [Data])).
+
+set_alarm_(Id, Desc) ->
+    mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}).
+
+clear_alarm_(Id) ->
+    mnesia:dirty_delete(?ALARM_TAB, Id),
+    set_alarm_history(Id).
+
+get_alarms_() ->
+    Alarms = ets:tab2list(?ALARM_TAB),
+    [{Id, Desc} || #common_alarm{id = Id, desc = Desc} <- Alarms].
+
+set_alarm_history(Id) ->
+    mnesia:dirty_write(?ALARM_HISTORY_TAB, #alarm_history{id = Id,
+                                                          clear_at = undefined}).
+
+

+ 0 - 144
src/emqx_alarm_mgr.erl

@@ -1,144 +0,0 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-
--module(emqx_alarm_mgr).
-
--behaviour(gen_event).
-
--include("emqx.hrl").
--include("logger.hrl").
-
--export([start_link/0]).
--export([alarm_fun/0, get_alarms/0, set_alarm/1, clear_alarm/1]).
--export([add_alarm_handler/1, add_alarm_handler/2, delete_alarm_handler/1]).
-
-%% gen_event callbacks
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
-         code_change/3]).
-
--define(ALARM_MGR, ?MODULE).
-
-start_link() ->
-    start_with(
-      fun(Pid) ->
-          gen_event:add_handler(Pid, ?MODULE, [])
-      end).
-
-start_with(Fun) ->
-    case gen_event:start_link({local, ?ALARM_MGR}) of
-        {ok, Pid} -> Fun(Pid), {ok, Pid};
-        Error     -> Error
-    end.
-
-alarm_fun() -> alarm_fun(false).
-
-alarm_fun(Bool) ->
-    fun(alert, _Alarm)   when Bool =:= true  -> alarm_fun(true);
-       (alert,  Alarm)   when Bool =:= false -> set_alarm(Alarm), alarm_fun(true);
-       (clear,  AlarmId) when Bool =:= true  -> clear_alarm(AlarmId), alarm_fun(false);
-       (clear, _AlarmId) when Bool =:= false -> alarm_fun(false)
-    end.
-
--spec(set_alarm(emqx_types:alarm()) -> ok).
-set_alarm(Alarm) when is_record(Alarm, alarm) ->
-    gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}).
-
--spec(clear_alarm(any()) -> ok).
-clear_alarm(AlarmId) when is_binary(AlarmId) ->
-    gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}).
-
--spec(get_alarms() -> list(emqx_types:alarm())).
-get_alarms() ->
-    gen_event:call(?ALARM_MGR, ?MODULE, get_alarms).
-
-add_alarm_handler(Module) when is_atom(Module) ->
-    gen_event:add_handler(?ALARM_MGR, Module, []).
-
-add_alarm_handler(Module, Args) when is_atom(Module) ->
-    gen_event:add_handler(?ALARM_MGR, Module, Args).
-
-delete_alarm_handler(Module) when is_atom(Module) ->
-    gen_event:delete_handler(?ALARM_MGR, Module, []).
-
-%%------------------------------------------------------------------------------
-%% Default Alarm handler
-%%------------------------------------------------------------------------------
-
-init(_) -> {ok, #{alarms => []}}.
-
-handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)->
-    handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State);
-
-handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) ->
-    case encode_alarm(Alarm) of
-        {ok, Json} ->
-            emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
-        {error, Reason} ->
-            ?ERROR("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
-    end,
-    {ok, State#{alarms := [Alarm|Alarms]}};
-
-handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) ->
-    case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of
-        {ok, Json} ->
-            emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json));
-        {error, Reason} ->
-            ?ERROR("[AlarmMgr] Failed to encode clear: ~p", [Reason])
-    end,
-    {ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
-
-handle_event(Event, State)->
-    ?ERROR("[AlarmMgr] unexpected event: ~p", [Event]),
-    {ok, State}.
-
-handle_info(Info, State) ->
-    ?ERROR("[AlarmMgr] unexpected info: ~p", [Info]),
-    {ok, State}.
-
-handle_call(get_alarms, State = #{alarms := Alarms}) ->
-    {ok, Alarms, State};
-
-handle_call(Req, State) ->
-    ?ERROR("[AlarmMgr] unexpected call: ~p", [Req]),
-    {ok, ignored, State}.
-
-terminate(swap, State) ->
-    {?MODULE, State};
-terminate(_, _) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%------------------------------------------------------------------------------
-%% Internal functions
-%%------------------------------------------------------------------------------
-
-encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title,
-                    summary = Summary, timestamp = Ts}) ->
-    emqx_json:safe_encode([{id, AlarmId}, {severity, Severity},
-                           {title, iolist_to_binary(Title)},
-                           {summary, iolist_to_binary(Summary)},
-                           {ts, emqx_time:now_secs(Ts)}]).
-
-alarm_msg(Type, AlarmId, Json) ->
-    Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json),
-    emqx_message:set_headers( #{'Content-Type' => <<"application/json">>},
-                              emqx_message:set_flag(sys, Msg)).
-
-topic(alert, AlarmId) ->
-    emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
-topic(clear, AlarmId) ->
-    emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>).
-

+ 4 - 0
src/emqx_app.erl

@@ -40,6 +40,10 @@ start(_Type, _Args) ->
     emqx_listeners:start(),
     emqx_listeners:start(),
     start_autocluster(),
     start_autocluster(),
     register(emqx, self()),
     register(emqx, self()),
+
+    emqx_alarm_handler:load(),
+    emqx_logger_handler:init(),
+    
     print_vsn(),
     print_vsn(),
     {ok, Sup}.
     {ok, Sup}.
 
 

+ 0 - 1
src/emqx_kernel_sup.erl

@@ -26,7 +26,6 @@ start_link() ->
 init([]) ->
 init([]) ->
     {ok, {{one_for_one, 10, 100},
     {ok, {{one_for_one, 10, 100},
           [child_spec(emqx_pool_sup, supervisor),
           [child_spec(emqx_pool_sup, supervisor),
-           child_spec(emqx_alarm_mgr, worker),
            child_spec(emqx_hooks, worker),
            child_spec(emqx_hooks, worker),
            child_spec(emqx_stats, worker),
            child_spec(emqx_stats, worker),
            child_spec(emqx_metrics, worker),
            child_spec(emqx_metrics, worker),

+ 42 - 0
src/emqx_logger_handler.erl

@@ -0,0 +1,42 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_logger_handler).
+
+-export([log/2]).
+-export([init/0]).
+
+init() ->
+    logger:add_handler(emqx_logger_handler, 
+                       emqx_logger_handler, 
+                       #{level => error,
+                         filters => [{easy_filter, {fun filter_by_level/2, []}}],
+                         filters_default => stop}).
+
+-spec log(LogEvent, Config) -> ok when LogEvent :: logger:log_event(), Config :: logger:handler_config().
+log(#{msg := {report, #{report := [{supervisor, SupName},
+                                   {errorContext, Error},
+                                   {reason, Reason},
+                                   {offender, _}]}}}, _Config) ->
+    alarm_handler:set_alarm({supervisor_report, [{supervisor, SupName},
+                                                 {errorContext, Error},
+                                                 {reason, Reason}]}),
+    ok;
+log(_LogEvent, _Config) ->
+    ok.
+
+filter_by_level(LogEvent = #{level := error}, _Extra) ->
+    LogEvent;
+filter_by_level(_LogEvent, _Extra) ->
+    stop.

+ 153 - 0
src/emqx_os_mon.erl

@@ -0,0 +1,153 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_os_mon).
+
+-behaviour(gen_server).
+
+-include("logger.hrl").
+
+-export([start_link/1]).
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+-export([get_cpu_check_interval/0,
+         set_cpu_check_interval/1,
+         get_cpu_high_watermark/0,
+         set_cpu_high_watermark/1,
+         get_cpu_low_watermark/0,
+         set_cpu_low_watermark/1,
+         get_mem_check_interval/0,
+         set_mem_check_interval/1,
+         get_sysmem_high_watermark/0,
+         set_sysmem_high_watermark/1,
+         get_procmem_high_watermark/0,
+         set_procmem_high_watermark/1]).
+
+-define(OS_MON, ?MODULE).
+
+%%----------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------
+
+start_link(Opts) ->
+    gen_server:start_link({local, ?OS_MON}, ?MODULE, [Opts], []).
+
+get_cpu_check_interval() ->
+    call(get_cpu_check_interval).
+
+set_cpu_check_interval(Seconds) ->
+    call({set_cpu_check_interval, Seconds}).
+
+get_cpu_high_watermark() ->
+    call(get_cpu_high_watermark).
+
+set_cpu_high_watermark(Float) ->
+    call({set_cpu_high_watermark, Float}).
+
+get_cpu_low_watermark() ->
+    call(get_cpu_low_watermark).
+
+set_cpu_low_watermark(Float) ->
+    call({set_cpu_low_watermark, Float}).
+
+get_mem_check_interval() ->
+    memsup:get_check_interval() div 1000.
+
+set_mem_check_interval(Seconds) ->
+    memsup:set_check_interval(Seconds div 60).
+
+get_sysmem_high_watermark() ->
+    memsup:get_sysmem_high_watermark() / 100.
+
+set_sysmem_high_watermark(Float) ->
+    memsup:set_sysmem_high_watermark(Float).
+
+get_procmem_high_watermark() ->
+    memsup:get_procmem_high_watermark() / 100.
+
+set_procmem_high_watermark(Float) ->
+    memsup:set_procmem_high_watermark(Float).
+
+%%----------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------
+
+init([Opts]) ->
+    _ = cpu_sup:util(),
+    set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)),
+    set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)),
+    set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)),
+    {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80),
+                              cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60),
+                              cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60),
+                              timer => undefined})}.
+
+handle_call(get_cpu_check_interval, _From, State) ->
+    {reply, maps:get(cpu_check_interval, State, undefined), State};
+handle_call({set_cpu_check_interval, Seconds}, _From, State) ->
+    {reply, ok, State#{cpu_check_interval := Seconds}};
+
+handle_call(get_cpu_high_watermark, _From, State) ->
+    {reply, maps:get(cpu_high_watermark, State, undefined), State};
+handle_call({set_cpu_high_watermark, Float}, _From, State) ->
+    {reply, ok, State#{cpu_high_watermark := Float}};
+
+handle_call(get_cpu_low_watermark, _From, State) ->
+    {reply, maps:get(cpu_low_watermark, State, undefined), State};
+handle_call({set_cpu_low_watermark, Float}, _From, State) ->
+    {reply, ok, State#{cpu_low_watermark := Float}};
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info({timeout, Timer, check}, State = #{timer := Timer, 
+                                               cpu_high_watermark := CPUHighWatermark,
+                                               cpu_low_watermark := CPULowWatermark}) ->
+    case cpu_sup:util() of
+        0 ->
+            {noreply, State#{timer := undefined}};
+        {error, Reason} ->
+            ?LOG(warning, "Failed to get cpu utilization: ~p", [Reason]),
+            {noreply, ensure_check_timer(State)};
+        Busy when Busy / 100 >= CPUHighWatermark ->
+            alarm_handler:set_alarm({cpu_high_watermark, Busy}),
+            {noreply, ensure_check_timer(State)};
+        Busy when Busy / 100 < CPULowWatermark ->
+            alarm_handler:clear_alarm(cpu_high_watermark),
+            {noreply, ensure_check_timer(State)}
+    end.
+
+terminate(_Reason, #{timer := Timer}) ->
+    emqx_misc:cancel_timer(Timer).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%----------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------
+call(Req) ->
+    gen_server:call(?OS_MON, Req, infinity).
+
+ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
+    State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.

+ 2 - 1
src/emqx_sys_mon.erl

@@ -163,5 +163,6 @@ safe_publish(Event, WarnMsg) ->
     emqx_broker:safe_publish(sysmon_msg(Topic, iolist_to_binary(WarnMsg))).
     emqx_broker:safe_publish(sysmon_msg(Topic, iolist_to_binary(WarnMsg))).
 
 
 sysmon_msg(Topic, Payload) ->
 sysmon_msg(Topic, Payload) ->
-    emqx_message:make(?SYSMON, #{sys => true}, Topic, Payload).
+    Msg = emqx_message:make(?SYSMON, Topic, Payload),
+    emqx_message:set_flag(sys, Msg).
 
 

+ 19 - 13
src/emqx_sys_sup.erl

@@ -24,17 +24,23 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 
 init([]) ->
 init([]) ->
-    Sys = #{id       => sys,
-            start    => {emqx_sys, start_link, []},
-            restart  => permanent,
-            shutdown => 5000,
-            type     => worker,
-            modules  => [emqx_sys]},
-    Sysmon = #{id       => sys_mon,
-               start    => {emqx_sys_mon, start_link, [emqx_config:get_env(sysmon, [])]},
-               restart  => permanent,
-               shutdown => 5000,
-               type     => worker,
-               modules  => [emqx_sys_mon]},
-    {ok, {{one_for_one, 10, 100}, [Sys, Sysmon]}}.
+    {ok, {{one_for_one, 10, 100}, [child_spec(emqx_sys, worker),
+                                   child_spec(emqx_sys_mon, worker, [emqx_config:get_env(sysmon, [])]),
+                                   child_spec(emqx_os_mon, worker, [emqx_config:get_env(os_mon, [])]),
+                                   child_spec(emqx_vm_mon, worker, [emqx_config:get_env(vm_mon, [])])]}}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+child_spec(M, worker) ->
+    child_spec(M, worker, []).
+
+child_spec(M, worker, A) ->
+    #{id       => M,
+      start    => {M, start_link, A},
+      restart  => permanent,
+      shutdown => 5000,
+      type     => worker,
+      modules  => [M]}.
 
 

+ 118 - 0
src/emqx_vm_mon.erl

@@ -0,0 +1,118 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_vm_mon).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2,
+         terminate/2,
+         code_change/3]).
+
+-export([get_check_interval/0,
+         set_check_interval/1,
+         get_process_high_watermark/0,
+         set_process_high_watermark/1,
+         get_process_low_watermark/0,
+         set_process_low_watermark/1]).
+
+-define(VM_MON, ?MODULE).
+
+%%----------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------
+
+start_link(Opts) ->
+    gen_server:start_link({local, ?VM_MON}, ?MODULE, [Opts], []).
+
+get_check_interval() ->
+    call(get_check_interval).
+
+set_check_interval(Seconds) ->
+    call({set_check_interval, Seconds}).
+
+get_process_high_watermark() ->
+    call(get_process_high_watermark).
+
+set_process_high_watermark(Float) ->
+    call({set_process_high_watermark, Float}).
+
+get_process_low_watermark() ->
+    call(get_process_low_watermark).
+
+set_process_low_watermark(Float) ->
+    call({set_process_low_watermark, Float}).
+
+%%----------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------
+
+init([Opts]) ->
+    {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30),
+                              process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70),
+                              process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50),
+                              timer => undefined})}.
+
+handle_call(get_check_interval, _From, State) ->
+    {reply, maps:get(check_interval, State, undefined), State};
+handle_call({set_check_interval, Seconds}, _From, State) ->
+    {reply, ok, State#{check_interval := Seconds}};
+
+handle_call(get_process_high_watermark, _From, State) ->
+    {reply, maps:get(process_high_watermark, State, undefined), State};
+handle_call({set_process_high_watermark, Float}, _From, State) ->
+    {reply, ok, State#{process_high_watermark := Float}};
+
+handle_call(get_process_low_watermark, _From, State) ->
+    {reply, maps:get(process_low_watermark, State, undefined), State};
+handle_call({set_process_low_watermark, Float}, _From, State) ->
+    {reply, ok, State#{process_low_watermark := Float}};
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info({timeout, Timer, check}, State = #{timer := Timer,
+                                               process_high_watermark := ProcHighWatermark,
+                                               process_low_watermark := ProcLowWatermark}) ->
+    ProcessCount = erlang:system_info(process_count),
+    case ProcessCount / erlang:system_info(process_limit) of
+        Percent when Percent >= ProcHighWatermark ->
+            alarm_handler:set_alarm({too_many_processes, ProcessCount});
+        Percent when Percent < ProcLowWatermark ->
+            alarm_handler:clear_alarm(too_many_processes)
+    end,
+    {noreply, ensure_check_timer(State)}.
+
+terminate(_Reason, #{timer := Timer}) ->
+    emqx_misc:cancel_timer(Timer).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%----------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------
+call(Req) ->
+    gen_server:call(?VM_MON, Req, infinity).
+
+ensure_check_timer(State = #{check_interval := Interval}) ->
+    State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.

+ 145 - 0
test/emqx_alarm_handler_SUITE.erl

@@ -0,0 +1,145 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_alarm_handler_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+
+-include("emqx_mqtt.hrl").
+-include("emqx.hrl").
+
+all() -> [t_alarm_handler, t_logger_handler].
+
+init_per_suite(Config) ->
+    [start_apps(App, {SchemaFile, ConfigFile}) ||
+        {App, SchemaFile, ConfigFile}
+            <- [{emqx, local_path("priv/emqx.schema"),
+                       local_path("etc/emqx.conf")}]],
+    Config.
+
+end_per_suite(_Config) ->
+    application:stop(emqx).
+
+local_path(RelativePath) ->
+    filename:join([get_base_dir(), RelativePath]).
+
+get_base_dir() ->
+    {file, Here} = code:is_loaded(?MODULE),
+    filename:dirname(filename:dirname(Here)).
+
+start_apps(App, {SchemaFile, ConfigFile}) ->
+    read_schema_configs(App, {SchemaFile, ConfigFile}),
+    set_special_configs(App),
+    application:ensure_all_started(App).
+
+read_schema_configs(App, {SchemaFile, ConfigFile}) ->
+    ct:pal("Read configs - SchemaFile: ~p, ConfigFile: ~p", [SchemaFile, ConfigFile]),
+    Schema = cuttlefish_schema:files([SchemaFile]),
+    Conf = conf_parse:file(ConfigFile),
+    NewConfig = cuttlefish_generator:map(Schema, Conf),
+    Vals = proplists:get_value(App, NewConfig, []),
+    [application:set_env(App, Par, Value) || {Par, Value} <- Vals].
+
+set_special_configs(_App) ->
+    ok.
+
+with_connection(DoFun) ->
+    {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
+                                          [binary, {packet, raw}, {active, false}],
+                                          3000),
+    try
+        DoFun(Sock)
+    after
+        emqx_client_sock:close(Sock)
+    end.
+
+t_alarm_handler(_) ->
+    with_connection(
+        fun(Sock) ->
+            emqx_client_sock:send(Sock,
+                                  raw_send_serialize(
+                                      ?CONNECT_PACKET(
+                                          #mqtt_packet_connect{
+                                          proto_ver  = ?MQTT_PROTO_V5}),
+                                      #{version => ?MQTT_PROTO_V5}
+                                  )),
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, ?CONNACK_PACKET(?RC_SUCCESS), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
+
+            Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
+            Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
+            SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
+            emqx_client_sock:send(Sock, 
+                                  raw_send_serialize(
+                                      ?SUBSCRIBE_PACKET(
+                                          1, 
+                                          [{Topic1, SubOpts},
+                                           {Topic2, SubOpts}]), 
+                                      #{version => ?MQTT_PROTO_V5})),
+
+            {ok, Data2} = gen_tcp:recv(Sock, 0),
+            {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
+
+            alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test,
+                                                            severity = error,
+                                                            title="alarm title",
+                                                            summary="alarm summary"}}),
+
+            {ok, Data3} = gen_tcp:recv(Sock, 0),
+
+            {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
+
+            ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())),
+
+            alarm_handler:clear_alarm(alarm_for_test),
+
+            {ok, Data4} = gen_tcp:recv(Sock, 0),
+
+            {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), _} = raw_recv_parse(Data4, ?MQTT_PROTO_V5),
+
+            ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms()))
+
+        end).
+
+t_logger_handler(_) ->
+    %% Meck supervisor report
+    logger:log(error, #{label => {supervisor, start_error}, 
+                        report => [{supervisor, {local, tmp_sup}}, 
+                                   {errorContext, shutdown}, 
+                                   {reason, reached_max_restart_intensity}, 
+                                   {offender, [{pid, meck},
+                                               {id, meck},
+                                               {mfargs, {meck, start_link, []}},
+                                               {restart_type, permanent},
+                                               {shutdown, 5000},
+                                               {child_type, worker}]}]}, 
+               #{logger_formatter => #{title => "SUPERVISOR REPORT"},
+                 report_cb => fun logger:format_otp_report/1}),
+    ?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())).
+
+raw_send_serialize(Packet) ->
+    emqx_frame:serialize(Packet).
+
+raw_send_serialize(Packet, Opts) ->
+    emqx_frame:serialize(Packet, Opts).
+
+raw_recv_parse(P, ProtoVersion) ->
+    emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
+                                 version         => ProtoVersion}}).
+

+ 2 - 13
test/emqx_broker_SUITE.erl

@@ -29,8 +29,7 @@ all() ->
     [{group, pubsub},
     [{group, pubsub},
      {group, session},
      {group, session},
      {group, metrics},
      {group, metrics},
-     {group, stats},
-     {group, alarms}].
+     {group, stats}].
 
 
 groups() ->
 groups() ->
     [
     [
@@ -41,8 +40,7 @@ groups() ->
                            'pubsub#', 'pubsub+']},
                            'pubsub#', 'pubsub+']},
      {session, [sequence], [start_session]},
      {session, [sequence], [start_session]},
      {metrics, [sequence], [inc_dec_metric]},
      {metrics, [sequence], [inc_dec_metric]},
-     {stats, [sequence], [set_get_stat]},
-     {alarms, [sequence], [set_alarms]}
+     {stats, [sequence], [set_get_stat]}
     ].
     ].
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
@@ -171,12 +169,3 @@ inc_dec_metric(_) ->
 set_get_stat(_) ->
 set_get_stat(_) ->
     emqx_stats:setstat('retained/max', 99),
     emqx_stats:setstat('retained/max', 99),
     99 = emqx_stats:getstat('retained/max').
     99 = emqx_stats:getstat('retained/max').
-
-set_alarms(_) ->
-    AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
-    emqx_alarm_mgr:set_alarm(AlarmTest),
-    Alarms = emqx_alarm_mgr:get_alarms(),
-    ct:log("Alarms Length: ~p ~n", [length(Alarms)]),
-    ?assertEqual(1, length(Alarms)),
-    emqx_alarm_mgr:clear_alarm(<<"1">>),
-    [] = emqx_alarm_mgr:get_alarms().

+ 1 - 0
test/emqx_mqtt_packet_SUITE.erl

@@ -10,6 +10,7 @@
 %% distributed under the License is distributed on an "AS IS" BASIS,
 %% distributed under the License is distributed on an "AS IS" BASIS,
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
+%% limitations under the License.
 
 
 -module(emqx_mqtt_packet_SUITE).
 -module(emqx_mqtt_packet_SUITE).
 
 

+ 56 - 0
test/emqx_os_mon_SUITE.erl

@@ -0,0 +1,56 @@
+%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_os_mon_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+
+all() -> [t_api].
+
+init_per_suite(Config) ->
+    application:ensure_all_started(os_mon),
+    Config.
+
+end_per_suite(_Config) ->
+    application:stop(os_mon).
+
+t_api(_) ->
+    gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
+    {ok, _} = emqx_os_mon:start_link([{cpu_check_interval, 1},
+                                      {cpu_high_watermark, 0.05},
+                                      {cpu_low_watermark, 0.80},
+                                      {mem_check_interval, 60},
+                                      {sysmem_high_watermark, 0.70},
+                                      {procmem_high_watermark, 0.05}]),
+    ?assertEqual(1, emqx_os_mon:get_cpu_check_interval()),
+    ?assertEqual(0.05, emqx_os_mon:get_cpu_high_watermark()),
+    ?assertEqual(0.80, emqx_os_mon:get_cpu_low_watermark()),
+    ?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
+    ?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()),
+    ?assertEqual(0.05, emqx_os_mon:get_procmem_high_watermark()),
+    % timer:sleep(2000),
+    % ?assertEqual(true, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())),
+
+    emqx_os_mon:set_cpu_high_watermark(0.8),
+    emqx_os_mon:set_cpu_low_watermark(0.75),
+    ?assertEqual(0.8, emqx_os_mon:get_cpu_high_watermark()),
+    ?assertEqual(0.75, emqx_os_mon:get_cpu_low_watermark()),
+    % timer:sleep(3000),
+    % ?assertEqual(false, lists:keymember(cpu_high_watermark, 1, alarm_handler:get_alarms())),
+    ok.

+ 50 - 0
test/emqx_vm_mon_SUITE.erl

@@ -0,0 +1,50 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2013-2013-2019 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_vm_mon_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+
+all() -> [t_api].
+
+init_per_suite(Config) ->
+    application:ensure_all_started(sasl),
+    Config.
+
+end_per_suite(_Config) ->
+    application:stop(sasl).
+
+t_api(_) ->
+    gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
+    {ok, _} = emqx_vm_mon:start_link([{check_interval, 1},
+                                      {process_high_watermark, 0},
+                                      {process_low_watermark, 0.6}]),
+    timer:sleep(2000),
+    ?assertEqual(true, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
+    emqx_vm_mon:set_process_high_watermark(0.8),
+    emqx_vm_mon:set_process_low_watermark(0.75),
+    ?assertEqual(0.8, emqx_vm_mon:get_process_high_watermark()),
+    ?assertEqual(0.75, emqx_vm_mon:get_process_low_watermark()),
+    timer:sleep(3000),
+    ?assertEqual(false, lists:keymember(too_many_processes, 1, alarm_handler:get_alarms())),
+    emqx_vm_mon:set_check_interval(20),
+    ?assertEqual(20, emqx_vm_mon:get_check_interval()),
+    ok.