emqttd_mqueue.erl 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc A Simple in-memory message queue.
  17. %%
  18. %% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
  19. %% should be online in most of the time.
  20. %%
  21. %% This module implements a simple in-memory queue for MQTT persistent session.
  22. %%
  23. %% If the broker restarted or crashed, all the messages queued will be gone.
  24. %%
  25. %% Concept of Message Queue and Inflight Window:
  26. %%
  27. %% |<----------------- Max Len ----------------->|
  28. %% -----------------------------------------------
  29. %% IN -> | Messages Queue | Inflight Window | -> Out
  30. %% -----------------------------------------------
  31. %% |<--- Win Size --->|
  32. %%
  33. %%
  34. %% 1. Inflight Window to store the messages delivered and awaiting for puback.
  35. %%
  36. %% 2. Enqueue messages when the inflight window is full.
  37. %%
  38. %% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
  39. %% otherwise dropped the oldest one.
  40. %%
  41. %% @end
  42. -module(emqttd_mqueue).
  43. -include("emqttd.hrl").
  44. -include("emqttd_protocol.hrl").
  45. -import(proplists, [get_value/3]).
  46. -export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]).
  47. -define(LOW_WM, 0.2).
  48. -define(HIGH_WM, 0.6).
  49. -type priority() :: {iolist(), pos_integer()}.
  50. -type option() :: {type, simple | priority}
  51. | {max_length, pos_integer() | infinity}
  52. | {priority, list(priority())}
  53. | {low_watermark, float()} %% Low watermark
  54. | {high_watermark, float()} %% High watermark
  55. | {queue_qos0, boolean()}. %% Queue Qos0?
  56. -type mqueue_option() :: {max_length, pos_integer()} %% Max queue length
  57. | {low_watermark, float()} %% Low watermark
  58. | {high_watermark, float()} %% High watermark
  59. | {queue_qos0, boolean()}. %% Queue Qos0
  60. -type stat() :: {max_len, infinity | pos_integer()}
  61. | {len, non_neg_integer()}
  62. | {dropped, non_neg_integer()}.
  63. -record(mqueue, {type :: simple | priority,
  64. name, q :: queue:queue() | priority_queue:q(),
  65. %% priority table
  66. pseq = 0, priorities = [],
  67. %% len of simple queue
  68. len = 0, max_len = ?MAX_LEN,
  69. low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
  70. qos0 = false, dropped = 0,
  71. alarm_fun}).
  72. -type mqueue() :: #mqueue{}.
  73. -export_type([mqueue/0, priority/0, option/0]).
  74. %% @doc New Queue.
  75. -spec(new(iolist(), list(mqueue_option()), fun()) -> mqueue()).
  76. new(Name, Opts, AlarmFun) ->
  77. Type = get_value(type, Opts, simple),
  78. MaxLen = get_value(max_length, Opts, infinity),
  79. init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
  80. len = 0, max_len = MaxLen,
  81. low_wm = low_wm(MaxLen, Opts),
  82. high_wm = high_wm(MaxLen, Opts),
  83. qos0 = get_value(queue_qos0, Opts, false),
  84. alarm_fun = AlarmFun}, Opts).
  85. init_q(MQ = #mqueue{type = simple}, _Opts) ->
  86. MQ#mqueue{q = queue:new()};
  87. init_q(MQ = #mqueue{type = priority}, Opts) ->
  88. Priorities = get_value(priority, Opts, []),
  89. init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
  90. init_p([], MQ) ->
  91. MQ;
  92. init_p([{Topic, P} | L], MQ) ->
  93. {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
  94. init_p(L, MQ1).
  95. insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
  96. <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
  97. {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
  98. low_wm(infinity, _Opts) ->
  99. infinity;
  100. low_wm(MaxLen, Opts) ->
  101. round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
  102. high_wm(infinity, _Opts) ->
  103. infinity;
  104. high_wm(MaxLen, Opts) ->
  105. round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
  106. -spec(name(mqueue()) -> iolist()).
  107. name(#mqueue{name = Name}) ->
  108. Name.
  109. -spec(type(mqueue()) -> atom()).
  110. type(#mqueue{type = Type}) ->
  111. Type.
  112. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
  113. is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q).
  114. len(#mqueue{type = simple, len = Len}) -> Len;
  115. len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q).
  116. max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
  117. %% @doc Stats of the mqueue
  118. -spec(stats(mqueue()) -> [stat()]).
  119. stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
  120. [{len, case Type of
  121. simple -> Len;
  122. priority -> priority_queue:len(Q)
  123. end} | [{max_len, MaxLen}, {dropped, Dropped}]].
  124. %% @doc Enqueue a message.
  125. -spec(in(mqtt_message(), mqueue()) -> mqueue()).
  126. in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
  127. MQ;
  128. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
  129. MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
  130. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
  131. when Len >= MaxLen ->
  132. {{value, _Old}, Q2} = queue:out(Q),
  133. MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
  134. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
  135. maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
  136. in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
  137. priorities = Priorities,
  138. max_len = infinity}) ->
  139. case lists:keysearch(Topic, 1, Priorities) of
  140. {value, {_, Pri}} ->
  141. MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)};
  142. false ->
  143. {Pri, MQ1} = insert_p(Topic, 0, MQ),
  144. MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
  145. end;
  146. in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
  147. priorities = Priorities,
  148. max_len = MaxLen}) ->
  149. case lists:keysearch(Topic, 1, Priorities) of
  150. {value, {_, Pri}} ->
  151. case priority_queue:plen(Pri, Q) >= MaxLen of
  152. true ->
  153. {_, Q1} = priority_queue:out(Pri, Q),
  154. MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)};
  155. false ->
  156. MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}
  157. end;
  158. false ->
  159. {Pri, MQ1} = insert_p(Topic, 0, MQ),
  160. MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
  161. end.
  162. out(MQ = #mqueue{type = simple, len = 0}) ->
  163. {empty, MQ};
  164. out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
  165. {R, Q2} = queue:out(Q),
  166. {R, MQ#mqueue{q = Q2, len = Len - 1}};
  167. out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
  168. {R, Q2} = queue:out(Q),
  169. {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
  170. out(MQ = #mqueue{type = priority, q = Q}) ->
  171. {R, Q2} = priority_queue:out(Q),
  172. {R, MQ#mqueue{q = Q2}}.
  173. maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
  174. when Len > HighWM ->
  175. Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
  176. severity = warning,
  177. title = io_lib:format("Queue ~s high-water mark", [Name]),
  178. summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
  179. MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
  180. maybe_set_alarm(MQ) ->
  181. MQ.
  182. maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
  183. when Len < LowWM ->
  184. MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
  185. maybe_clear_alarm(MQ) ->
  186. MQ.