Feng Lee 10 лет назад
Родитель
Сommit
2ff78d7fc6
4 измененных файлов с 97 добавлено и 42 удалено
  1. 15 2
      include/emqttd.hrl
  2. 57 18
      src/emqttd_alarm.erl
  3. 24 21
      src/emqttd_mqueue.erl
  4. 1 1
      src/emqttd_session.erl

+ 15 - 2
include/emqttd.hrl

@@ -111,14 +111,15 @@
 -type mqtt_msgid() :: undefined | 1..16#ffff.
 
 -record(mqtt_message, {
-    topic           :: binary(),          %% The topic published to
+    topic           :: binary(),          %% Topic that the message is published to
     from            :: binary() | atom(), %% ClientId of publisher
     qos    = 0      :: 0 | 1 | 2,         %% Message QoS
     retain = false  :: boolean(),         %% Retain flag
     dup    = false  :: boolean(),         %% Dup flag
     sys    = false  :: boolean(),         %% $SYS flag
     msgid           :: mqtt_msgid(),      %% Message ID
-    payload         :: binary()           %% Payload
+    payload         :: binary(),          %% Payload
+    timestamp       :: erlang:timestamp() %% Timestamp
 }).
 
 -type mqtt_message() :: #mqtt_message{}.
@@ -135,4 +136,16 @@
 
 -type mqtt_plugin() :: #mqtt_plugin{}.
 
+%%------------------------------------------------------------------------------
+%% MQTT Alarm
+%%------------------------------------------------------------------------------
+-record(mqtt_alarm, {
+    id          :: binary(),
+    severity    :: warning | error | critical,
+    title       :: binary(),
+    summary     :: binary(),
+    timestamp   :: erlang:timestamp() %% Timestamp
+}).
+
+-type mqtt_alarm() :: #mqtt_alarm{}.
 

+ 57 - 18
src/emqttd_alarm.erl

@@ -27,17 +27,18 @@
 
 -module(emqttd_alarm).
 
--export([start_link/0, set_alarm/1, clear_alarm/1, get_alarms/0,
-	 add_alarm_handler/1, add_alarm_handler/2,
-	 delete_alarm_handler/1]).
+-include("emqttd.hrl").
+
+-export([start_link/0, alarm_fun/0, get_alarms/0,
+         set_alarm/1, clear_alarm/1,
+         add_alarm_handler/1, add_alarm_handler/2,
+         delete_alarm_handler/1]).
 
 -export([init/1, handle_event/2, handle_call/2, handle_info/2,
          terminate/2]).
 
 -define(SERVER, ?MODULE).
 
--type alarm() :: {AlarmId :: any(), AlarmDescription :: string() | binary()}.
-
 start_link() ->
     case gen_event:start_link({local, ?SERVER}) of
 	{ok, Pid} ->
@@ -47,12 +48,22 @@ start_link() ->
         Error
     end.
 
--spec set_alarm(alarm()) -> ok.
-set_alarm(Alarm) ->
+alarm_fun() ->
+    alarm_fun(false).
+
+alarm_fun(Bool) ->
+    fun(alert, _Alarm)   when Bool =:= true  -> alarm_fun(true);
+       (alert,  Alarm)   when Bool =:= false -> set_alarm(Alarm), alarm_fun(true);
+       (clear,  AlarmId) when Bool =:= true  -> clear_alarm(AlarmId), alarm_fun(false);
+       (clear, _AlarmId) when Bool =:= false -> alarm_fun(false)
+    end.
+
+-spec set_alarm(mqtt_alarm()) -> ok.
+set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) ->
     gen_event:notify(?SERVER, {set_alarm, Alarm}).
 
 -spec clear_alarm(any()) -> ok.
-clear_alarm(AlarmId) ->
+clear_alarm(AlarmId) when is_binary(AlarmId) ->
     gen_event:notify(?SERVER, {clear_alarm, AlarmId}).
 
 get_alarms() ->
@@ -70,25 +81,38 @@ delete_alarm_handler(Module) when is_atom(Module) ->
 %%-----------------------------------------------------------------
 %% Default Alarm handler
 %%-----------------------------------------------------------------
-   
-init(_) -> {ok, []}.
+init(_) ->
+    {ok, []}.
     
-handle_event({set_alarm, Alarm}, Alarms)->
-    %%TODO: publish to $SYS
-    {ok, [Alarm | Alarms]};
+handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId,
+                                             severity = Severity,
+                                             title = Title,
+                                             summary = Summary}}, Alarms)->
+    Timestamp = os:timestamp(),
+    Json = mochijson2:encode([{id, AlarmId},
+                              {severity, Severity},
+                              {title, iolist_to_binary(Title)},
+                              {summary, iolist_to_binary(Summary)},
+                              {ts, emqttd_util:now_to_secs(Timestamp)}]),
+    emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)),
+    {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]};
 
 handle_event({clear_alarm, AlarmId}, Alarms)->
-    %TODO: publish to $SYS
-    {ok, lists:keydelete(AlarmId, 1, Alarms)};
+    Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_util:now_to_secs()}]),
+    emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)),
+    {ok, lists:keydelete(AlarmId, 2, Alarms)};
 
 handle_event(_, Alarms)->
     {ok, Alarms}.
 
-handle_info(_, Alarms) -> {ok, Alarms}.
+handle_info(_, Alarms) ->
+    {ok, Alarms}.
 
-handle_call(get_alarms, Alarms) -> {ok, Alarms, Alarms};
+handle_call(get_alarms, Alarms) ->
+    {ok, Alarms, Alarms};
 
-handle_call(_Query, Alarms)     -> {ok, {error, bad_query}, Alarms}.
+handle_call(_Query, Alarms) -> 
+    {ok, {error, bad_query}, Alarms}.
 
 terminate(swap, Alarms) ->
     {?MODULE, Alarms};
@@ -96,3 +120,18 @@ terminate(swap, Alarms) ->
 terminate(_, _) ->
     ok.
 
+alarm_msg(Type, AlarmId, Json) ->
+    #mqtt_message{from    = alarm,
+                  qos     = 1,
+                  sys     = true,
+                  topic   = topic(Type, AlarmId),
+                  payload = iolist_to_binary(Json),
+                  timestamp = os:timestamp()}.
+
+topic(alert, AlarmId) ->
+    emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
+
+topic(clear, AlarmId) ->
+    emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>).
+
+

+ 24 - 21
src/emqttd_mqueue.erl

@@ -55,7 +55,7 @@
 -include("emqttd.hrl").
 -include("emqttd_protocol.hrl").
 
--export([new/2, name/1,
+-export([new/3, name/1,
          is_empty/1, is_full/1,
          len/1, in/2, out/1]).
 
@@ -64,18 +64,17 @@
 -define(HIGH_WM, 0.6).
 
 -record(mqueue, {name,
-                 q        = queue:new(), %% pending queue
-                 len      = 0,           %% current queue len
-                 low_wm   = ?LOW_WM,
-                 high_wm  = ?HIGH_WM,
-                 max_len  = ?MAX_LEN,
-                 qos0     = false,
-                 alarm    = false}).
+                 q          = queue:new(), %% pending queue
+                 len        = 0,           %% current queue len
+                 low_wm     = ?LOW_WM,
+                 high_wm    = ?HIGH_WM,
+                 max_len    = ?MAX_LEN,
+                 qos0       = false,
+                 alarm_fun}).
 
 -type mqueue() :: #mqueue{}.
 
 -type mqueue_option() :: {max_length, pos_integer()}      %% Max queue length
-                       | {inflight_window, pos_integer()} %% Inflight Window
                        | {low_watermark, float()}         %% Low watermark
                        | {high_watermark, float()}        %% High watermark
                        | {queue_qos0, boolean()}.         %% Queue Qos0
@@ -86,14 +85,15 @@
 %% @doc New Queue.
 %% @end
 %%------------------------------------------------------------------------------
--spec new(binary(), list(mqueue_option())) -> mqueue().
-new(Name, Opts) ->
+-spec new(binary(), list(mqueue_option()), fun()) -> mqueue().
+new(Name, Opts, AlarmFun) ->
     MaxLen = emqttd_opts:g(max_length, Opts, 1000),
     #mqueue{name     = Name,
             max_len  = MaxLen,
             low_wm   = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)),
             high_wm  = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)),
-            qos0     = emqttd_opts:g(queue_qos0, Opts, true)}.
+            qos0     = emqttd_opts:g(queue_qos0, Opts, true),
+            alarm_fun = AlarmFun}.
 
 name(#mqueue{name = Name}) ->
     Name.
@@ -135,18 +135,21 @@ out(MQ = #mqueue{q = Q, len = Len}) ->
     {Result, Q2} = queue:out(Q),
     {Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}.
 
-maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm = false})
-    when Len >= HighWM ->
-    AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]),
-    emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}),
-    MQ#mqueue{alarm = true};
+maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
+    when Len > HighWM ->
+    Alarm = #mqtt_alarm{id = list_to_binary(["queue_high_watermark.", Name]),
+                        severity = warning,
+                        title = io_lib:format("Queue ~s high-water mark", [Name]),
+                        summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
+    MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
+
 maybe_set_alarm(MQ) ->
     MQ.
 
-maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm = true})
-    when Len =< LowWM ->
-    emqttd_alarm:clear_alarm({queue_high_watermark, Name}),
-    MQ#mqueue{alarm = false};
+maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
+    when Len < LowWM ->
+    MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
+
 maybe_clear_alarm(MQ) ->
     MQ.
 

+ 1 - 1
src/emqttd_session.erl

@@ -222,7 +222,7 @@ init([CleanSess, ClientId, ClientPid]) ->
             subscriptions     = [],
             inflight_queue    = [],
             max_inflight      = emqttd_opts:g(max_inflight, SessEnv, 0),
-            message_queue     = emqttd_mqueue:new(ClientId, QEnv),
+            message_queue     = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
             awaiting_rel      = #{},
             awaiting_ack      = #{},
             awaiting_comp     = #{},