Feng Lee пре 10 година
родитељ
комит
d23bf74d1e
2 измењених фајлова са 23 додато и 16 уклоњено
  1. 10 5
      src/emqttd_mqueue.erl
  2. 13 11
      src/emqttd_session.erl

+ 10 - 5
src/emqttd_mqueue.erl

@@ -59,7 +59,8 @@
 -export([new/3, name/1,
 -export([new/3, name/1,
          is_empty/1, is_full/1,
          is_empty/1, is_full/1,
          len/1, max_len/1,
          len/1, max_len/1,
-         in/2, out/1]).
+         in/2, out/1,
+         stats/1]).
 
 
 -define(LOW_WM, 0.2).
 -define(LOW_WM, 0.2).
 
 
@@ -72,6 +73,7 @@
                  high_wm    = ?HIGH_WM,
                  high_wm    = ?HIGH_WM,
                  max_len    = ?MAX_LEN,
                  max_len    = ?MAX_LEN,
                  qos0       = false,
                  qos0       = false,
+                 dropped    = 0,
                  alarm_fun}).
                  alarm_fun}).
 
 
 -type mqueue() :: #mqueue{}.
 -type mqueue() :: #mqueue{}.
@@ -111,6 +113,9 @@ len(#mqueue{len = Len}) -> Len.
 
 
 max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
 max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
 
 
+stats(#mqueue{max_len = MaxLen, len = Len, dropped = Dropped}) ->
+    [{max_len, MaxLen}, {len, Len}, {dropped, Dropped}].
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% @doc Queue one message.
 %% @doc Queue one message.
 %% @end
 %% @end
@@ -122,11 +127,11 @@ in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
     MQ;
     MQ;
 
 
 %% simply drop the oldest one if queue is full, improve later
 %% simply drop the oldest one if queue is full, improve later
-in(Msg, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen})
+in(Msg, MQ = #mqueue{q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
     when Len =:= MaxLen ->
     when Len =:= MaxLen ->
-    {{value, OldMsg}, Q2} = queue:out(Q),
-    lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]),
-    MQ#mqueue{q = queue:in(Msg, Q2)};
+    {{value, _OldMsg}, Q2} = queue:out(Q),
+    %lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]),
+    MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
 
 
 in(Msg, MQ = #mqueue{q = Q, len = Len}) ->
 in(Msg, MQ = #mqueue{q = Q, len = Len}) ->
     maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}).
     maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}).

+ 13 - 11
src/emqttd_session.erl

@@ -647,21 +647,23 @@ start_collector(Session = #session{collect_interval = Interval}) ->
     TRef = erlang:send_after(Interval * 1000, self(), collect_info),
     TRef = erlang:send_after(Interval * 1000, self(), collect_info),
     Session#session{collect_timer = TRef}.
     Session#session{collect_timer = TRef}.
 
 
-info(#session{clean_sess        = CleanSess,
-              subscriptions     = Subscriptions,
-              inflight_queue    = InflightQueue,
-              max_inflight      = MaxInflight,
-              message_queue     = MessageQueue,
-              awaiting_rel      = AwaitingRel,
-              awaiting_ack      = AwaitingAck,
-              awaiting_comp     = AwaitingComp,
-              timestamp         = CreatedAt}) ->
-    [{pid, self()}, 
+info(#session{clean_sess      = CleanSess,
+              subscriptions   = Subscriptions,
+              inflight_queue  = InflightQueue,
+              max_inflight    = MaxInflight,
+              message_queue   = MessageQueue,
+              awaiting_rel    = AwaitingRel,
+              awaiting_ack    = AwaitingAck,
+              awaiting_comp   = AwaitingComp,
+              timestamp       = CreatedAt}) ->
+    Stats = emqttd_mqueue:stats(MessageQueue),
+    [{pid, self()},
      {clean_sess, CleanSess},
      {clean_sess, CleanSess},
      {subscriptions, Subscriptions},
      {subscriptions, Subscriptions},
      {max_inflight, MaxInflight},
      {max_inflight, MaxInflight},
      {inflight_queue, length(InflightQueue)},
      {inflight_queue, length(InflightQueue)},
-     {message_queue, emqttd_mqueue:len(MessageQueue)},
+     {message_queue, proplists:get_value(len, Stats)},
+     {message_dropped, proplists:get_value(dropped, Stats)},
      {awaiting_rel, maps:size(AwaitingRel)},
      {awaiting_rel, maps:size(AwaitingRel)},
      {awaiting_ack, maps:size(AwaitingAck)},
      {awaiting_ack, maps:size(AwaitingAck)},
      {awaiting_comp, maps:size(AwaitingComp)},
      {awaiting_comp, maps:size(AwaitingComp)},