Просмотр исходного кода

refactor(alarm): new data structure and support regular cleaning of deactivated alarms

zhouzb 5 лет назад
Родитель
Сommit
1ba4743213
5 измененных файлов с 253 добавлено и 90 удалено
  1. 27 0
      etc/emqx.conf
  2. 33 4
      priv/emqx.schema
  3. 137 85
      src/emqx_alarm.erl
  4. 1 1
      src/emqx_sys_sup.erl
  5. 55 0
      test/emqx_alarm_SUITE.erl

+ 27 - 0
etc/emqx.conf

@@ -2158,4 +2158,31 @@ vm_mon.process_high_watermark = 80%
 ## Default: 60%
 vm_mon.process_low_watermark = 60%
 
+## Specifies the actions to take when an alarm is activated
+##
+## Value: String
+##  - log
+##  - publish
+##
+## Default: log,publish
+alarm.actions = log,publish
+
+## The maximum number of deactivated alarms
+##
+## Value: Integer 
+##
+## Default: 1000
+alarm.size_limit = 1000
+
+## Validity Period of deactivated alarms
+##
+## Value: Duration
+##  - h: hour
+##  - m: minute
+##  - s: second
+##  - ms: milliseconds
+##
+## Default: 24h
+alarm.validity_period = 24h
+
 {{ additional_configs }}

+ 33 - 4
priv/emqx.schema

@@ -2102,8 +2102,12 @@ end}.
 ]}.
 
 {translation, "emqx.os_mon", fun(Conf) ->
-    Configs = cuttlefish_variable:filter_by_prefix("os_mon", Conf),
-    [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs]
+    [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)},
+     {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf) * 100},
+     {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf) * 100},
+     {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)},
+     {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf) * 100},
+     {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf) * 100}]
 end}.
 
 %%--------------------------------------------------------------------
@@ -2125,6 +2129,31 @@ end}.
 ]}.
 
 {translation, "emqx.vm_mon", fun(Conf) ->
-    Configs = cuttlefish_variable:filter_by_prefix("vm_mon", Conf),
-    [{list_to_atom(Name), Value * 100} || {[_, Name], Value} <- Configs]
+    [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
+     {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf) * 100},
+     {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf) * 100}]
+end}.
+
+%%--------------------------------------------------------------------
+%% Alarm
+%%--------------------------------------------------------------------
+{mapping, "alarm.actions", "emqx.alarm", [
+  {default, "log,publish"},
+  {datatype, string}
+]}.
+
+{mapping, "alarm.size_limit", "emqx.alarm", [
+  {default, 1000},
+  {datatype, integer}
+]}.
+
+{mapping, "alarm.validity_period", "emqx.alarm", [
+  {default, "24h"},
+  {datatype, {duration, s}}
+]}.
+
+{translation, "emqx.alarm", fun(Conf) ->
+    [{actions, [list_to_atom(Action) || Action <- string:tokens(cuttlefish:conf_get("alarm.actions", Conf), ",")]},
+     {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)},
+     {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}]
 end}.

+ 137 - 85
src/emqx_alarm.erl

@@ -23,7 +23,9 @@
 
 -logger_header("[Alarm Handler]").
 
--export([start_link/0, stop/0]).
+-export([ start_link/1
+        , stop/0
+        ]).
 
 %% API
 -export([ activate/1
@@ -43,27 +45,43 @@
         , code_change/3
         ]).
 
--record(alarm, {
+-record(activated_alarm, {
           name :: binary() | atom(),
 
           details :: map() | list(),
     
           message :: binary(),
     
+          activate_at :: integer()
+        }).
+
+-record(deactivated_alarm, {
           activate_at :: integer(),
+
+          name :: binary() | atom(),
+
+          details :: map() | list(),
     
-          deactivate_at :: integer() | infinity,
+          message :: binary(),
     
-          activated :: boolean()
+          deactivate_at :: integer() | infinity
         }).
 
 -record(state, {
-          actions :: [action()]
+          actions :: [action()],
+
+          size_limit :: non_neg_integer(),
+
+          validity_period :: non_neg_integer(),
+
+          timer = undefined :: undefined | reference()
         }).
 
 -type action() :: log | publish | event.
 
--define(TAB, emqx_alarm).
+-define(ACTIVATED_ALARM, emqx_activated_alarm).
+
+-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
 
 -ifdef(TEST).
 -compile(export_all).
@@ -74,9 +92,8 @@
 %% API
 %%--------------------------------------------------------------------
 
--spec(start_link() -> emqx_types:startlink_ret()).
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(Opts) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
 
 stop() ->
     gen_server:stop(?MODULE).
@@ -113,81 +130,85 @@ init([]) ->
     Opts = [{actions, [log, publish]}],
     init([Opts]);
 init([Opts]) ->
-    ok = ekka_mnesia:create_table(?TAB,
-             [{type, bag},
+    ok = ekka_mnesia:create_table(?ACTIVATED_ALARM,
+             [{type, set},
+              {disc_copies, [node()]},
+              {local_content, true},
+              {record_name, activated_alarm},
+              {attributes, record_info(fields, activated_alarm)}]),
+    ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM,
+             [{type, ordered_set},
               {disc_copies, [node()]},
               {local_content, true},
-              {record_name, alarm},
-              {attributes, record_info(fields, alarm)}]),
-    Actions = proplists:get_value(actions, Opts, [log, publish]),
+              {record_name, deactivated_alarm},
+              {attributes, record_info(fields, deactivated_alarm)}]),
     deactivate_all_alarms(),
-    {ok, #state{actions = Actions}}.
+    Actions = proplists:get_value(actions, Opts),
+    SizeLimit = proplists:get_value(size_limit, Opts),
+    ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)),
+    {ok, ensure_delete_timer(#state{actions = Actions,
+                                    size_limit = SizeLimit,
+                                    validity_period = ValidityPeriod})}.
 
 handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) ->
-    case get(Name) of
-        set ->
+    case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
+        [#activated_alarm{name = Name}] ->
             {reply, {error, already_existed}, State};
-        undefined ->
-            Alarm = #alarm{name = Name,
-                           details = Details,
-                           message = normalize_message(Name, Details),
-                           activate_at = erlang:system_time(millisecond),
-                           deactivate_at = infinity,
-                           activated = true},
-            mnesia:dirty_write(?TAB, Alarm),
-            put(Name, set),
+        [] ->
+            Alarm = #activated_alarm{name = Name,
+                                     details = Details,
+                                     message = normalize_message(Name, Details),
+                                     activate_at = erlang:system_time(microsecond)},
+            mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
             do_actions(activate, Alarm, Actions),
             {reply, ok, State}
     end;
 
-handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions}) ->
-    case get(Name) of
-        set ->
-            MatchSpec = [{#alarm{name = '$1', activated = '$2', _ = '_'},
-                         [{'==', '$1', Name}, {'==', '$2', true}],
-                         ['$_']}],
-            case mnesia:dirty_select(?TAB, MatchSpec) of
-                [] ->
-                    erase(Name),
-                    {reply, {error, not_found}, State};
-                [Alarm | _] ->
-                    NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond),
-                                         activated = false},
-                    mnesia:dirty_delete_object(?TAB, Alarm),
-                    mnesia:dirty_write(?TAB, NAlarm),
-                    erase(Name),
-                    do_actions(deactivate, NAlarm, Actions),
-                    {reply, ok, State}
-            end;
-        undefined ->
-            {reply, {error, not_found}, State}
+handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions,
+                                                            size_limit = SizeLimit}) ->
+    case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
+        [] ->
+            {reply, {error, not_found}, State};
+        [#activated_alarm{name = Name,
+                          details = Details,
+                          message = Message,
+                          activate_at = ActivateAt}] ->
+            case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
+                true ->
+                    case mnesia:dirty_first(?DEACTIVATED_ALARM) of
+                        '$end_of_table' ->
+                            ok;
+                        ActivateAt2 ->
+                            mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
+                    end;
+                false ->
+                    ok
+            end,
+            Alarm = #deactivated_alarm{activate_at = ActivateAt,
+                                        name = Name,
+                                        details = Details,
+                                        message = Message,
+                                        deactivate_at = erlang:system_time(microsecond)},
+            mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
+            mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm),
+            do_actions(deactivate, Alarm, Actions),
+            {reply, ok, State}
     end;
 
 handle_call(delete_all_deactivated_alarms, _From, State) ->
-    MatchSpec = [{#alarm{activated = '$1', _ = '_'},
-                 [{'==', '$1', false}],
-                 ['$_']}],
-    lists:foreach(fun(Alarm) ->
-                      mnesia:dirty_delete_object(?TAB, Alarm)
-                  end, mnesia:dirty_select(?TAB, MatchSpec)),
+    mnesia:clear_table(?DEACTIVATED_ALARM),
     {reply, ok, State};
 
 handle_call({get_alarms, all}, _From, State) ->
-    Alarms = ets:tab2list(?TAB),
-    {reply, [normalize(Alarm) || Alarm <- Alarms], State};
+    Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM) ++ ets:tab2list(?DEACTIVATED_ALARM)],
+    {reply, Alarms, State};
 
 handle_call({get_alarms, activated}, _From, State) ->
-    MatchSpec = [{#alarm{activated = '$1', _ = '_'},
-                 [{'==', '$1', true}],
-                 ['$_']}],
-    Alarms = [normalize(Alarm) || Alarm <- mnesia:dirty_select(?TAB, MatchSpec)],
+    Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM)],
     {reply, Alarms, State};
 
 handle_call({get_alarms, deactivated}, _From, State) ->
-    MatchSpec = [{#alarm{activated = '$1', _ = '_'},
-                 [{'==', '$1', false}],
-                 ['$_']}],
-    Alarms = [normalize(Alarm) || Alarm <- mnesia:dirty_select(?TAB, MatchSpec)],
+    Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?DEACTIVATED_ALARM)],
     {reply, Alarms, State};
 
 handle_call(Req, _From, State) ->
@@ -198,6 +219,12 @@ handle_cast(Msg, State) ->
     ?LOG(error, "Unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
+handle_info({timeout, TRef, delete_expired_deactivated_alarm},
+            State = #state{timer = TRef,
+                           validity_period = ValidityPeriod}) ->
+    delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000),
+    {noreply, ensure_delete_timer(State)};
+
 handle_info(Info, State) ->
     ?LOG(error, "Unexpected info: ~p", [Info]),
     {noreply, State}.
@@ -213,27 +240,43 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 deactivate_all_alarms() ->
-    MatchSpec = [{#alarm{activated = '$1', _ = '_'},
-                 [{'==', '$1', true}],
-                 ['$_']}],
-    case mnesia:dirty_select(?TAB, MatchSpec) of
-        [] ->
-            ok;
-        Alarms ->
-            lists:foreach(fun(Alarm) ->
-                              NAlarm = Alarm#alarm{deactivate_at = erlang:system_time(millisecond),
-                                                   activated = false},
-                              mnesia:dirty_delete_object(?TAB, Alarm),
-                              mnesia:dirty_write(?TAB, NAlarm)
-                          end, Alarms)
+    lists:foreach(fun(#activated_alarm{name = Name,
+                                       details = Details,
+                                       message = Message,
+                                       activate_at = ActivateAt}) ->
+                      mnesia:dirty_write(?DEACTIVATED_ALARM,
+                                         #deactivated_alarm{activate_at = ActivateAt,
+                                                            name = Name,
+                                                            details = Details,
+                                                            message = Message,
+                                                            deactivate_at = erlang:system_time(microsecond)})
+                  end, ets:tab2list(?ACTIVATED_ALARM)),
+    mnesia:clear_table(?ACTIVATED_ALARM).
+
+ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) ->
+    State#state{timer = emqx_misc:start_timer(ValidityPeriod div 1, delete_expired_deactivated_alarm)}.
+
+delete_expired_deactivated_alarms(Checkpoint) ->
+    delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint).
+
+delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) ->
+    ok;
+delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) ->
+    case ActivatedAt =< Checkpoint of
+        true ->
+            mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
+            NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt),
+            delete_expired_deactivated_alarms(NActivatedAt, Checkpoint);
+        false ->
+            ok
     end.
 
 do_actions(_, _, []) ->
     ok;
-do_actions(activate, Alarm = #alarm{name = Name, message = Message}, [log | More]) ->
+do_actions(activate, Alarm = #activated_alarm{name = Name, message = Message}, [log | More]) ->
     ?LOG(warning, "Alarm ~p is activated, ~s", [Name, Message]),
     do_actions(activate, Alarm, More);
-do_actions(deactivate, Alarm = #alarm{name = Name}, [log | More]) ->
+do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) ->
     ?LOG(warning, "Alarm ~p is deactivated", [Name]),
     do_actions(deactivate, Alarm, More);
 do_actions(Operation, Alarm, [publish | More]) ->
@@ -252,18 +295,27 @@ topic(activate) ->
 topic(deactivate) ->
     emqx_topic:systop(<<"alarms/deactivate">>).
 
-normalize(#alarm{name = Name,
-                 details = Details,
-                 message = Message,
-                 activate_at = ActivateAt,
-                 deactivate_at = DeactivateAt,
-                 activated = Activated}) ->
+normalize(#activated_alarm{name = Name,
+                           details = Details,
+                           message = Message,
+                           activate_at = ActivateAt}) ->
+    #{name => Name,
+      details => Details,
+      message => Message,
+      activate_at => ActivateAt,
+      deactivate_at => infinity,
+      activated => true};
+normalize(#deactivated_alarm{activate_at = ActivateAt,
+                             name = Name,
+                             details = Details,
+                             message = Message,
+                             deactivate_at = DeactivateAt}) ->
     #{name => Name,
       details => Details,
       message => Message,
       activate_at => ActivateAt,
       deactivate_at => DeactivateAt,
-      activated => Activated}.
+      activated => false}.
 
 normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
     list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));

+ 1 - 1
src/emqx_sys_sup.erl

@@ -27,7 +27,7 @@ start_link() ->
 
 init([]) ->
     Childs = [child_spec(emqx_sys),
-              child_spec(emqx_alarm),
+              child_spec(emqx_alarm,   [config(alarm)]),
               child_spec(emqx_sys_mon, [config(sysmon)]),
               child_spec(emqx_os_mon,  [config(os_mon)]),
               child_spec(emqx_vm_mon,  [config(vm_mon)])],

+ 55 - 0
test/emqx_alarm_SUITE.erl

@@ -33,6 +33,38 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+init_per_testcase(t_size_limit, Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([],
+        fun(emqx) ->
+            application:set_env(emqx, alarm, [{actions, [log,publish]},
+                                              {size_limit, 2},
+                                              {validity_period, 3600}]),
+            ok;
+           (_) ->
+            ok
+        end),
+    Config;
+init_per_testcase(t_validity_period, Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([],
+        fun(emqx) ->
+            application:set_env(emqx, alarm, [{actions, [log,publish]},
+                                              {size_limit, 1000},
+                                              {validity_period, 1}]),
+            ok;
+           (_) ->
+            ok
+        end),
+    Config;
+init_per_testcase(_, Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
 t_alarm(_) ->
     ok = emqx_alarm:activate(unknown_alarm),
     {error, already_existed} = emqx_alarm:activate(unknown_alarm),
@@ -59,6 +91,29 @@ t_deactivate_all_alarms(_) ->
     emqx_alarm:delete_all_deactivated_alarms(),
     ?assertEqual({error, not_found}, get_alarm(unknown_alarm, emqx_alarm:get_alarms(deactivated))).
 
+t_size_limit(_) ->
+    ok = emqx_alarm:activate(a),
+    ok = emqx_alarm:deactivate(a),
+    ok = emqx_alarm:activate(b),
+    ok = emqx_alarm:deactivate(b),
+    ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
+    ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
+    ok = emqx_alarm:activate(c),
+    ok = emqx_alarm:deactivate(c),
+    ?assertNotEqual({error, not_found}, get_alarm(c, emqx_alarm:get_alarms(deactivated))),
+    ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
+    emqx_alarm:delete_all_deactivated_alarms().
+
+t_validity_period(_) ->
+    ok = emqx_alarm:activate(a),
+    ok = emqx_alarm:deactivate(a),
+    dbg:tracer(),
+    dbg:p(all, c),
+    dbg:tpl(emqx_alarm, delete_expired_deactivated_alarms, cx),
+    ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))),
+    ct:sleep(2000),
+    ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))).
+
 get_alarm(Name, [Alarm = #{name := Name} | _More]) ->
     Alarm;
 get_alarm(Name, [_Alarm | More]) ->