emqttd_mqueue.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. %%%-----------------------------------------------------------------------------
  2. %%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
  3. %%%
  4. %%% Permission is hereby granted, free of charge, to any person obtaining a copy
  5. %%% of this software and associated documentation files (the "Software"), to deal
  6. %%% in the Software without restriction, including without limitation the rights
  7. %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. %%% copies of the Software, and to permit persons to whom the Software is
  9. %%% furnished to do so, subject to the following conditions:
  10. %%%
  11. %%% The above copyright notice and this permission notice shall be included in all
  12. %%% copies or substantial portions of the Software.
  13. %%%
  14. %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. %%% SOFTWARE.
  21. %%%-----------------------------------------------------------------------------
  22. %%% @doc
  23. %%%
  24. %%% A Simple in-memory message queue.
  25. %%%
  26. %%% Notice that MQTT is not an enterprise messaging queue. MQTT assumes that clients
  27. %%% are online most of the time.
  28. %%%
  29. %%% This module implements a simple in-memory queue for MQTT persistent session.
  30. %%%
  31. %%% If the broker restarted or crashed, all the messages queued will be gone.
  32. %%%
  33. %%% Design of the Queue:
  34. %%%       |<----------------- Max Len ----------------->|
  35. %%%       -----------------------------------------------
  36. %%% IN -> |     Pending Messages     | Inflight Window  | -> Out
  37. %%%       -----------------------------------------------
  38. %%%                                  |<--- Win Size --->|
  39. %%%
  40. %%%
  41. %%% 1. The Inflight Window stores the messages awaiting ack.
  42. %%%
  43. %%% 2. Suspend IN messages when the queue is inactive, or the inflight window is full.
  44. %%%
  45. %%% 3. If the queue is full, drop qos0 messages if store_qos0 is true,
  46. %%% otherwise drop the oldest pending one.
  47. %%%
  48. %%% @end
  49. %%%
  50. %%% @author Feng Lee <feng@emqtt.io>
  51. %%%-----------------------------------------------------------------------------
  52. -module(emqttd_mqueue).
  53. -include("emqttd.hrl").
  54. -include("emqttd_protocol.hrl").
  55. -export([new/3, name/1,
  56. is_empty/1, is_full/1,
  57. len/1, max_len/1,
  58. in/2, out/1,
  59. stats/1]).
  %% Default watermark ratios, expressed as a fraction of the maximum length.
  -define(LOW_WM, 0.2).
  -define(HIGH_WM, 0.6).

  %% State of one message queue.
  %%
  %% low_wm and high_wm hold absolute queue lengths, not ratios: new/3 stores
  %% round(MaxLen * Ratio) in them, and maybe_set_alarm/1 / maybe_clear_alarm/1
  %% compare them directly against len. The defaults below follow that same
  %% convention, so a record built without new/3 is still self-consistent.
  %%
  %% NOTE(review): ?MAX_LEN is not defined in this file; presumably it comes
  %% from one of the included headers -- confirm.
  -record(mqueue, {name,                    %% queue name, also used to build the alarm id
                   q         = queue:new(), %% pending queue
                   len       = 0,           %% current queue len
                   low_wm    = round(?MAX_LEN * ?LOW_WM),  %% alarm is cleared when len < low_wm
                   high_wm   = round(?MAX_LEN * ?HIGH_WM), %% alarm is raised when len > high_wm
                   max_len   = ?MAX_LEN,    %% len at which in/2 starts evicting the oldest message
                   qos0      = false,       %% when false, in/2 does not queue QoS 0 messages
                   dropped   = 0,           %% number of messages evicted because the queue was full
                   alarm_fun}).             %% called as AlarmFun(alert, Alarm) / AlarmFun(clear, AlarmId);
                                            %% its return value replaces this field

  -type mqueue() :: #mqueue{}.

  -type mqueue_option() :: {max_length, pos_integer()}  %% Max queue length
                         | {low_watermark, float()}     %% Low watermark
                         | {high_watermark, float()}    %% High watermark
                         | {queue_qos0, boolean()}.     %% Queue Qos0

  -export_type([mqueue/0]).
  %%------------------------------------------------------------------------------
  %% @doc Build a queue called Name.
  %%
  %% Four settings are read from Opts through emqttd_opts:g/3, each call passing
  %% the fallback shown here as its third argument:
  %%   max_length     (1000)      stored as max_len
  %%   low_watermark  (?LOW_WM)   multiplied by max_len and rounded -> low_wm
  %%   high_watermark (?HIGH_WM)  multiplied by max_len and rounded -> high_wm
  %%   queue_qos0     (true)      stored as qos0
  %%
  %% AlarmFun is stored untouched in the alarm_fun field.
  %% @end
  %%------------------------------------------------------------------------------
  -spec new(binary(), list(mqueue_option()), fun()) -> mqueue().
  new(Name, Opts, AlarmFun) ->
      MaxLen    = emqttd_opts:g(max_length, Opts, 1000),
      LowRatio  = emqttd_opts:g(low_watermark, Opts, ?LOW_WM),
      HighRatio = emqttd_opts:g(high_watermark, Opts, ?HIGH_WM),
      StoreQos0 = emqttd_opts:g(queue_qos0, Opts, true),
      %% Watermarks are kept as absolute lengths so the alarm guards can compare
      %% them against len without recomputing the product on every call.
      #mqueue{name      = Name,
              alarm_fun = AlarmFun,
              qos0      = StoreQos0,
              max_len   = MaxLen,
              low_wm    = round(MaxLen * LowRatio),
              high_wm   = round(MaxLen * HighRatio)}.
  %% @doc Return the name the queue was created with.
  name(#mqueue{} = MQ) ->
      MQ#mqueue.name.
  %% @doc true when the argument is a queue whose len field is 0; false for
  %% anything else.
  is_empty(MQ) ->
      case MQ of
          #mqueue{len = 0} -> true;
          _Other           -> false
      end.
  %% @doc true when the argument is a queue whose len is exactly equal to its
  %% max_len; false for anything else.
  is_full(MQ) ->
      case MQ of
          %% Binding both fields to the same variable is an exact-equality match.
          #mqueue{len = Size, max_len = Size} -> true;
          _Other                              -> false
      end.
  %% @doc Return the current number of queued messages.
  len(#mqueue{} = MQ) ->
      MQ#mqueue.len.
  %% @doc Return the configured maximum queue length.
  max_len(#mqueue{} = MQ) ->
      MQ#mqueue.max_len.
  %% @doc Return a property list with the max_len, len and dropped counters of
  %% the queue, in that order.
  stats(#mqueue{} = MQ) ->
      [{max_len, MQ#mqueue.max_len},
       {len,     MQ#mqueue.len},
       {dropped, MQ#mqueue.dropped}].
  %%------------------------------------------------------------------------------
  %% @doc Enqueue one message and return the updated queue.
  %%
  %% A QoS 0 message offered to a queue whose qos0 flag is false is discarded
  %% and the queue is returned unchanged. When len equals max_len the oldest
  %% pending message is evicted to make room and the dropped counter is bumped;
  %% len stays the same. Otherwise the message is appended, len grows by one
  %% and the high-watermark alarm check runs.
  %% @end
  %%------------------------------------------------------------------------------
  -spec in(mqtt_message(), mqueue()) -> mqueue().
  in(#mqtt_message{qos = ?QOS_0}, #mqueue{qos0 = false} = MQ) ->
      MQ;
  in(Msg, #mqueue{q = Pending, len = Len, max_len = MaxLen, dropped = Dropped} = MQ) ->
      case Len of
          MaxLen ->
              %% Queue is full: simply drop the oldest one, improve later.
              {{value, _Evicted}, Remaining} = queue:out(Pending),
              MQ#mqueue{q = queue:in(Msg, Remaining), dropped = Dropped + 1};
          _NotFull ->
              maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Pending), len = Len + 1})
      end.
  %% @doc Dequeue the oldest pending message.
  %% Returns {empty, MQ} when len is 0. Otherwise returns {Head, MQ2}, where
  %% Head is the first element of queue:out/1's result and MQ2 has len
  %% decremented and the low-watermark alarm check applied.
  out(#mqueue{len = 0} = MQ) ->
      {empty, MQ};
  out(#mqueue{q = Pending, len = Len} = MQ) ->
      {Head, Remaining} = queue:out(Pending),
      Shrunk = MQ#mqueue{q = Remaining, len = Len - 1},
      {Head, maybe_clear_alarm(Shrunk)}.
  %% @doc Raise the high-watermark alarm when len exceeds high_wm.
  %% The alarm callback is invoked as AlarmFun(alert, Alarm) and whatever it
  %% returns is stored back as the queue's alarm_fun. Any other argument is
  %% returned unchanged.
  maybe_set_alarm(#mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun} = MQ)
          when Len > HighWM ->
      AlarmId = list_to_binary(["queue_high_watermark.", Name]),
      Title   = io_lib:format("Queue ~s high-water mark", [Name]),
      Summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM]),
      Alarm   = #mqtt_alarm{id       = AlarmId,
                            severity = warning,
                            title    = Title,
                            summary  = Summary},
      NextAlarmFun = AlarmFun(alert, Alarm),
      MQ#mqueue{alarm_fun = NextAlarmFun};
  maybe_set_alarm(MQ) ->
      MQ.
  %% @doc Clear the high-watermark alarm when len has fallen below low_wm.
  %% The alarm callback is invoked as AlarmFun(clear, AlarmId) and whatever it
  %% returns is stored back as the queue's alarm_fun. Any other argument is
  %% returned unchanged.
  maybe_clear_alarm(#mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun} = MQ)
          when Len < LowWM ->
      AlarmId = list_to_binary(["queue_high_watermark.", Name]),
      NextAlarmFun = AlarmFun(clear, AlarmId),
      MQ#mqueue{alarm_fun = NextAlarmFun};
  maybe_clear_alarm(MQ) ->
      MQ.