emqttd_mqueue.erl 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc A Simple in-memory message queue.
  17. %%
  18. %% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
  19. %% should be online in most of the time.
  20. %%
  21. %% This module implements a simple in-memory queue for MQTT persistent session.
  22. %%
  23. %% If the broker restarted or crashed, all the messages queued will be gone.
  24. %%
  25. %% Concept of Message Queue and Inflight Window:
  26. %%
  27. %% |<----------------- Max Len ----------------->|
  28. %% -----------------------------------------------
  29. %% IN -> | Messages Queue | Inflight Window | -> Out
  30. %% -----------------------------------------------
  31. %% |<--- Win Size --->|
  32. %%
  33. %%
  34. %% 1. Inflight Window to store the messages delivered and awaiting for puback.
  35. %%
  36. %% 2. Enqueue messages when the inflight window is full.
  37. %%
  38. %% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
  39. %% otherwise dropped the oldest one.
  40. %%
  41. %% @end
  42. -module(emqttd_mqueue).
  43. -author("Feng Lee <feng@emqtt.io>").
  44. -include("emqttd.hrl").
  45. -include("emqttd_protocol.hrl").
  46. -import(proplists, [get_value/3]).
  47. -export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1,
  48. dropped/1, stats/1]).
  49. -define(LOW_WM, 0.2).
  50. -define(HIGH_WM, 0.6).
  51. -type(priority() :: {iolist(), pos_integer()}).
  52. -type(option() :: {type, simple | priority}
  53. | {max_length, pos_integer() | infinity}
  54. | {priority, list(priority())}
  55. | {low_watermark, float()} %% Low watermark
  56. | {high_watermark, float()} %% High watermark
  57. | {queue_qos0, boolean()}). %% Queue Qos0?
  58. -type(stat() :: {max_len, infinity | pos_integer()}
  59. | {len, non_neg_integer()}
  60. | {dropped, non_neg_integer()}).
  61. -record(mqueue, {type :: simple | priority,
  62. name, q :: queue:queue() | priority_queue:q(),
  63. %% priority table
  64. pseq = 0, priorities = [],
  65. %% len of simple queue
  66. len = 0, max_len = infinity,
  67. low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
  68. qos0 = false, dropped = 0,
  69. alarm_fun}).
  70. -type(mqueue() :: #mqueue{}).
  71. -export_type([mqueue/0, priority/0, option/0]).
  72. %% @doc New Queue.
  73. -spec(new(iolist(), list(option()), fun()) -> mqueue()).
  74. new(Name, Opts, AlarmFun) ->
  75. Type = get_value(type, Opts, simple),
  76. MaxLen = get_value(max_length, Opts, infinity),
  77. init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
  78. len = 0, max_len = MaxLen,
  79. low_wm = low_wm(MaxLen, Opts),
  80. high_wm = high_wm(MaxLen, Opts),
  81. qos0 = get_value(queue_qos0, Opts, false),
  82. alarm_fun = AlarmFun}, Opts).
  83. init_q(MQ = #mqueue{type = simple}, _Opts) ->
  84. MQ#mqueue{q = queue:new()};
  85. init_q(MQ = #mqueue{type = priority}, Opts) ->
  86. Priorities = get_value(priority, Opts, []),
  87. init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
  88. init_p([], MQ) ->
  89. MQ;
  90. init_p([{Topic, P} | L], MQ) ->
  91. {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
  92. init_p(L, MQ1).
  93. insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
  94. <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
  95. {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
  96. low_wm(infinity, _Opts) ->
  97. infinity;
  98. low_wm(MaxLen, Opts) ->
  99. round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
  100. high_wm(infinity, _Opts) ->
  101. infinity;
  102. high_wm(MaxLen, Opts) ->
  103. round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
  104. -spec(name(mqueue()) -> iolist()).
  105. name(#mqueue{name = Name}) ->
  106. Name.
  107. -spec(type(mqueue()) -> atom()).
  108. type(#mqueue{type = Type}) ->
  109. Type.
  110. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
  111. is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q).
  112. len(#mqueue{type = simple, len = Len}) -> Len;
  113. len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q).
  114. max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
  115. %% @doc Dropped of the mqueue
  116. -spec(dropped(mqueue()) -> non_neg_integer()).
  117. dropped(#mqueue{dropped = Dropped}) -> Dropped.
  118. %% @doc Stats of the mqueue
  119. -spec(stats(mqueue()) -> [stat()]).
  120. stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
  121. [{len, case Type of
  122. simple -> Len;
  123. priority -> priority_queue:len(Q)
  124. end} | [{max_len, MaxLen}, {dropped, Dropped}]].
  125. %% @doc Enqueue a message.
  126. -spec(in(mqtt_message(), mqueue()) -> mqueue()).
  127. in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
  128. MQ;
  129. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
  130. MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
  131. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
  132. when Len >= MaxLen ->
  133. {{value, _Old}, Q2} = queue:out(Q),
  134. MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
  135. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
  136. maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
  137. in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
  138. priorities = Priorities,
  139. max_len = infinity}) ->
  140. case lists:keysearch(Topic, 1, Priorities) of
  141. {value, {_, Pri}} ->
  142. MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)};
  143. false ->
  144. {Pri, MQ1} = insert_p(Topic, 0, MQ),
  145. MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
  146. end;
  147. in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
  148. priorities = Priorities,
  149. max_len = MaxLen}) ->
  150. case lists:keysearch(Topic, 1, Priorities) of
  151. {value, {_, Pri}} ->
  152. case priority_queue:plen(Pri, Q) >= MaxLen of
  153. true ->
  154. {_, Q1} = priority_queue:out(Pri, Q),
  155. MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)};
  156. false ->
  157. MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}
  158. end;
  159. false ->
  160. {Pri, MQ1} = insert_p(Topic, 0, MQ),
  161. MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
  162. end.
  163. out(MQ = #mqueue{type = simple, len = 0}) ->
  164. {empty, MQ};
  165. out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
  166. {R, Q2} = queue:out(Q),
  167. {R, MQ#mqueue{q = Q2, len = Len - 1}};
  168. out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
  169. {R, Q2} = queue:out(Q),
  170. {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
  171. out(MQ = #mqueue{type = priority, q = Q}) ->
  172. {R, Q2} = priority_queue:out(Q),
  173. {R, MQ#mqueue{q = Q2}}.
  174. maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
  175. when Len > HighWM ->
  176. Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
  177. severity = warning,
  178. title = io_lib:format("Queue ~s high-water mark", [Name]),
  179. summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
  180. MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
  181. maybe_set_alarm(MQ) ->
  182. MQ.
  183. maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
  184. when Len < LowWM ->
  185. MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
  186. maybe_clear_alarm(MQ) ->
  187. MQ.