emqttd_mqueue.erl 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc A Simple in-memory message queue.
  17. %%
  18. %% Note that MQTT is not an enterprise messaging queue. MQTT assumes that clients
  19. %% are online most of the time.
  20. %%
  21. %% This module implements a simple in-memory queue for MQTT persistent session.
  22. %%
  23. %% If the broker restarted or crashed, all the messages queued will be gone.
  24. %%
  25. %% Concept of Message Queue and Inflight Window:
  26. %%
  27. %%        |<----------------- Max Len ----------------->|
  28. %%        -----------------------------------------------
  29. %% IN ->  |     Messages Queue     |   Inflight Window   | -> Out
  30. %%        -----------------------------------------------
  31. %%                                 |<---- Win Size ---->|
  32. %%
  33. %%
  34. %% 1. The Inflight Window stores the messages that were delivered and are awaiting puback.
  35. %%
  36. %% 2. Enqueue messages when the inflight window is full.
  37. %%
  38. %% 3. QoS0 messages are queued only if the queue_qos0 option is true. If the queue
  39. %% is full, the oldest message is dropped to make room for the new one.
  40. %%
  41. %% @end
  42. -module(emqttd_mqueue).
  43. -include("emqttd.hrl").
  44. -include("emqttd_protocol.hrl").
  45. -export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]).
  %% Default low/high watermarks. new/3 multiplies them by the maximum
  %% length, so they are fractions of the queue capacity.
  -define(LOW_WM, 0.2).
  -define(HIGH_WM, 0.6).

  %% A topic together with the priority assigned to it.
  -type priority() :: {iolist(), pos_integer()}.

  %% Options accepted when creating a queue.
  -type option() ::
          {type, simple | priority}
        | {max_length, pos_integer() | infinity}
        | {priority, list(priority())}
        | {low_watermark, float()}     %% Low watermark
        | {high_watermark, float()}    %% High watermark
        | {queue_qos0, boolean()}.     %% Queue QoS0 messages?

  -type mqueue_option() ::
          {max_length, pos_integer()}  %% Max queue length
        | {low_watermark, float()}     %% Low watermark
        | {high_watermark, float()}    %% High watermark
        | {queue_qos0, boolean()}.     %% Queue QoS0 messages?

  %% One entry of the list returned by stats/1.
  -type stat() ::
          {max_len, infinity | pos_integer()}
        | {len, non_neg_integer()}
        | {dropped, non_neg_integer()}.

  %% Queue state.
  %% NOTE(review): ?MAX_LEN is not defined in this file section; presumably
  %% it comes from one of the included headers -- confirm.
  -record(mqueue, {
            type :: simple | priority,
            name,
            q :: queue:queue() | priority_queue:q(),
            %% priority table: sequence counter and [{Topic, PriorityKey}]
            pseq = 0,
            priorities = [],
            %% number of messages held by a simple queue
            len = 0,
            max_len = ?MAX_LEN,
            low_wm = ?LOW_WM,
            high_wm = ?HIGH_WM,
            %% whether QoS0 messages are queued
            qos0 = false,
            %% number of messages evicted because the queue was full
            dropped = 0,
            %% fun called as AlarmFun(alert | clear, Arg); its result replaces it
            alarm_fun
           }).

  -type mqueue() :: #mqueue{}.

  -export_type([mqueue/0, priority/0, option/0]).
  %% @doc Create a new message queue.
  %%
  %% `Name' is flattened to a binary and kept as the queue name.
  %% `Opts' is looked up through emqttd_opts:g/3 for the following keys:
  %% `type' (default `simple'), `max_length' (default `infinity'),
  %% `low_watermark', `high_watermark', `queue_qos0' (default `false') and,
  %% for priority queues only, `priority'.
  %% `AlarmFun' is stored in the queue and invoked by the watermark checks.
  %%
  %% The spec uses option() rather than mqueue_option() because this function
  %% reads the `type' and `priority' keys and accepts `max_length = infinity',
  %% none of which mqueue_option() allows.
  -spec new(iolist(), list(option()), fun()) -> mqueue().
  new(Name, Opts, AlarmFun) ->
      Type = emqttd_opts:g(type, Opts, simple),
      MaxLen = emqttd_opts:g(max_length, Opts, infinity),
      init_q(#mqueue{type = Type,
                     name = iolist_to_binary(Name),
                     len = 0,
                     max_len = MaxLen,
                     %% watermarks are stored as absolute lengths (or infinity)
                     low_wm = low_wm(MaxLen, Opts),
                     high_wm = high_wm(MaxLen, Opts),
                     qos0 = emqttd_opts:g(queue_qos0, Opts, false),
                     alarm_fun = AlarmFun}, Opts).
  %% Install the empty underlying queue that matches the queue type.
  %% A priority queue additionally gets its priority table seeded from the
  %% `priority' option.
  init_q(#mqueue{type = simple} = Queue, _Opts) ->
      Queue#mqueue{q = queue:new()};
  init_q(#mqueue{type = priority} = Queue, Opts) ->
      Table = emqttd_opts:g(priority, Opts, []),
      Empty = Queue#mqueue{q = priority_queue:new()},
      init_p(Table, Empty).
  %% Register every configured {Topic, Priority} pair in the priority table,
  %% in list order. Topics are flattened to binaries before insertion.
  init_p(Priorities, MQ) ->
      lists:foldl(fun({Topic, P}, Acc) ->
                          {_, Next} = insert_p(iolist_to_binary(Topic), P, Acc),
                          Next
                  end, MQ, Priorities).
  %% Add Topic to the priority table and return {PriorityKey, UpdatedQueue}.
  %%
  %% The key is a 48-bit integer laid out as
  %% <<Priority:8, phash2(Topic):32, Seq:8>>. Bit syntax truncates integers to
  %% the segment size, so only the low 8 bits of P and of the sequence counter
  %% end up in the key.
  insert_p(Topic, P, #mqueue{priorities = Tab, pseq = Seq} = MQ) ->
      Hash = erlang:phash2(Topic),
      <<PInt:48>> = <<P:8, Hash:32, Seq:8>>,
      Updated = MQ#mqueue{priorities = [{Topic, PInt} | Tab],
                          pseq = Seq + 1},
      {PInt, Updated}.
  %% Low watermark as an absolute queue length: the configured
  %% `low_watermark' ratio (default ?LOW_WM) times MaxLen, rounded.
  %% An unbounded queue has no watermark.
  low_wm(infinity, _) ->
      infinity;
  low_wm(MaxLen, Opts) ->
      Ratio = emqttd_opts:g(low_watermark, Opts, ?LOW_WM),
      round(MaxLen * Ratio).
  %% High watermark as an absolute queue length: the configured
  %% `high_watermark' ratio (default ?HIGH_WM) times MaxLen, rounded.
  %% An unbounded queue has no watermark.
  high_wm(infinity, _) ->
      infinity;
  high_wm(MaxLen, Opts) ->
      Ratio = emqttd_opts:g(high_watermark, Opts, ?HIGH_WM),
      round(MaxLen * Ratio).
  %% @doc Name of the queue.
  %%
  %% new/3 stores the name as `iolist_to_binary(Name)', so the value returned
  %% here is a binary; the spec reflects that instead of `iolist()'.
  -spec name(mqueue()) -> binary().
  name(#mqueue{name = Name}) ->
      Name.
  %% @doc Kind of the queue, as stored in the `type' field of the record.
  -spec type(mqueue()) -> atom().
  type(#mqueue{type = Kind}) -> Kind.
  %% @doc True when the queue holds no messages.
  %% A simple queue is judged by its length counter, a priority queue by
  %% asking the underlying priority_queue.
  is_empty(#mqueue{type = simple, len = 0}) ->
      true;
  is_empty(#mqueue{type = simple}) ->
      false;
  is_empty(#mqueue{type = priority, q = PQ}) ->
      priority_queue:is_empty(PQ).
  %% @doc Number of messages currently queued.
  %% A simple queue returns its length counter; a priority queue asks the
  %% underlying priority_queue.
  len(#mqueue{type = simple, len = Count}) ->
      Count;
  len(#mqueue{type = priority, q = PQ}) ->
      priority_queue:len(PQ).
  %% @doc Configured maximum length of the queue (may be `infinity').
  max_len(#mqueue{max_len = Limit}) ->
      Limit.
  %% @doc Statistics of the queue: current length, maximum length and the
  %% number of dropped messages, in that order.
  -spec stats(mqueue()) -> [stat()].
  stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
      %% only simple queues maintain the len counter
      Current = case Type of
                    simple -> Len;
                    priority -> priority_queue:len(Q)
                end,
      [{len, Current}, {max_len, MaxLen}, {dropped, Dropped}].
  %% @doc Enqueue a message.
  %%
  %% QoS0 messages are discarded (the queue is returned unchanged) unless the
  %% queue was created with `queue_qos0' enabled.
  %%
  %% Simple queue: an unbounded queue always appends. A bounded queue that is
  %% already full evicts its oldest message, appends the new one and counts
  %% the eviction in `dropped'; otherwise it appends and checks the high
  %% watermark alarm.
  %%
  %% Priority queue: the message is queued under the priority key registered
  %% for its topic. A topic seen for the first time is registered with
  %% priority 0. When the queue is bounded and that priority already holds
  %% `max_len' messages, the oldest message of the same priority is evicted
  %% and counted in `dropped', so stats/1 reports evictions for both queue
  %% types.
  -spec in(mqtt_message(), mqueue()) -> mqueue().
  in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
      MQ;
  in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
      MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
  in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
          when Len >= MaxLen ->
      %% full: replace the oldest message, so len stays the same
      {{value, _Old}, Q2} = queue:out(Q),
      MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped + 1};
  in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
      maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
  in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
                                                      priorities = Priorities}) ->
      case lists:keyfind(Topic, 1, Priorities) of
          {_, Pri} ->
              in_priority(Msg, Pri, MQ);
          false ->
              %% unknown topic: register it with the lowest priority; its
              %% sub-queue is new, so no length check is needed
              {Pri, MQ1} = insert_p(Topic, 0, MQ),
              MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
      end.

  %% Enqueue Msg under an already registered priority key, evicting the oldest
  %% message of that priority when a bounded queue is full.
  in_priority(Msg, Pri, MQ = #mqueue{q = Q, max_len = infinity}) ->
      MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)};
  in_priority(Msg, Pri, MQ = #mqueue{q = Q, max_len = MaxLen, dropped = Dropped}) ->
      case priority_queue:plen(Pri, Q) >= MaxLen of
          true ->
              {_, Q1} = priority_queue:out(Pri, Q),
              MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1), dropped = Dropped + 1};
          false ->
              MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}
      end.
  %% @doc Dequeue the next message.
  %%
  %% Returns {Result, Queue} where Result is whatever the underlying queue's
  %% out/1 returned, or `empty' for a simple queue whose length counter is 0.
  %% Taking a message from a bounded simple queue also runs the low watermark
  %% check; unbounded and priority queues never touch the alarm.
  out(#mqueue{type = simple, len = 0} = MQ) ->
      {empty, MQ};
  out(#mqueue{type = simple, q = Q, len = Len, max_len = MaxLen} = MQ) ->
      {Result, Rest} = queue:out(Q),
      Shrunk = MQ#mqueue{q = Rest, len = Len - 1},
      case MaxLen of
          infinity -> {Result, Shrunk};
          _ -> {Result, maybe_clear_alarm(Shrunk)}
      end;
  out(#mqueue{type = priority, q = Q} = MQ) ->
      {Result, Rest} = priority_queue:out(Q),
      {Result, MQ#mqueue{q = Rest}}.
  %% Raise the high watermark alarm when the queue length exceeds `high_wm'.
  %% The alarm fun is called as AlarmFun(alert, Alarm) and whatever it returns
  %% is stored as the queue's new alarm fun. With `high_wm = infinity' the
  %% guard never holds, because numbers sort before atoms in Erlang term order.
  maybe_set_alarm(#mqueue{name = Name, len = Len, high_wm = HighWM,
                          alarm_fun = AlarmFun} = MQ)
          when Len > HighWM ->
      AlarmId = iolist_to_binary(["queue_high_watermark.", Name]),
      Title = io_lib:format("Queue ~s high-water mark", [Name]),
      Summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM]),
      Alarm = #mqtt_alarm{id = AlarmId,
                          severity = warning,
                          title = Title,
                          summary = Summary},
      MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
  maybe_set_alarm(MQ) ->
      MQ.
  %% Clear the high watermark alarm once the queue length falls below `low_wm'.
  %% The alarm fun is called as AlarmFun(clear, AlarmId) and whatever it
  %% returns is stored as the queue's new alarm fun.
  %%
  %% The id is built with iolist_to_binary/1, the same call maybe_set_alarm/1
  %% uses, so the id being cleared is built exactly like the id that was raised.
  maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
          when Len < LowWM ->
      AlarmId = iolist_to_binary(["queue_high_watermark.", Name]),
      MQ#mqueue{alarm_fun = AlarmFun(clear, AlarmId)};
  maybe_clear_alarm(MQ) ->
      MQ.