emqttd_mqueue.erl 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc A Simple in-memory message queue.
  17. %%
  18. %% Notice that MQTT is not an enterprise messaging queue. MQTT assumes that
  19. %% clients are online most of the time.
  20. %%
  21. %% This module implements a simple in-memory queue for MQTT persistent session.
  22. %%
  23. %% If the broker restarted or crashed, all the messages queued will be gone.
  24. %%
  25. %% Concept of Message Queue and Inflight Window:
  26. %%
  27. %%       |<----------------- Max Len ----------------->|
  28. %%       -----------------------------------------------
  29. %% IN -> |      Messages Queue   |  Inflight Window    | -> Out
  30. %%       -----------------------------------------------
  31. %%                               |<---   Win Size  --->|
  32. %%
  33. %%
  34. %% 1. The Inflight Window stores messages that have been delivered and are awaiting puback.
  35. %%
  36. %% 2. Enqueue messages when the inflight window is full.
  37. %%
  38. %% 3. QoS0 messages are enqueued only if store_qos0 is true. If the queue is
  39. %% full, the oldest message is dropped.
  40. %%
  41. %% @end
  42. -module(emqttd_mqueue).
  43. -author("Feng Lee <feng@emqtt.io>").
  44. -include("emqttd.hrl").
  45. -include("emqttd_protocol.hrl").
  46. -import(proplists, [get_value/3]).
  47. -export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1,
  48. dropped/1, stats/1]).
  49. -define(LOW_WM, 0.2).
  50. -define(HIGH_WM, 0.6).
  51. -define(PQUEUE, priority_queue).
  52. -type(priority() :: {iolist(), pos_integer()}).
  53. -type(option() :: {type, simple | priority}
  54. | {max_length, non_neg_integer()} %% Max queue length
  55. | {priority, list(priority())}
  56. | {low_watermark, float()} %% Low watermark
  57. | {high_watermark, float()} %% High watermark
  58. | {store_qos0, boolean()}). %% Queue Qos0?
  59. -type(stat() :: {max_len, non_neg_integer()}
  60. | {len, non_neg_integer()}
  61. | {dropped, non_neg_integer()}).
  62. -record(mqueue, {type :: simple | priority,
  63. name, q :: queue:queue() | ?PQUEUE:q(),
  64. %% priority table
  65. pseq = 0, priorities = [],
  66. %% len of simple queue
  67. len = 0, max_len = 0,
  68. low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
  69. qos0 = false, dropped = 0,
  70. alarm_fun}).
  71. -type(mqueue() :: #mqueue{}).
  72. -export_type([mqueue/0, priority/0, option/0]).
  73. %% @doc New Queue.
  74. -spec(new(iolist(), list(option()), fun()) -> mqueue()).
  75. new(Name, Opts, AlarmFun) ->
  76. Type = get_value(type, Opts, simple),
  77. MaxLen = get_value(max_length, Opts, 0),
  78. init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
  79. len = 0, max_len = MaxLen,
  80. low_wm = low_wm(MaxLen, Opts),
  81. high_wm = high_wm(MaxLen, Opts),
  82. qos0 = get_value(store_qos0, Opts, false),
  83. alarm_fun = AlarmFun}, Opts).
  84. init_q(MQ = #mqueue{type = simple}, _Opts) ->
  85. MQ#mqueue{q = queue:new()};
  86. init_q(MQ = #mqueue{type = priority}, Opts) ->
  87. Priorities = get_value(priority, Opts, []),
  88. init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
  89. init_p([], MQ) ->
  90. MQ;
  91. init_p([{Topic, P} | L], MQ) ->
  92. {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
  93. init_p(L, MQ1).
  94. insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
  95. <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
  96. {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
  97. low_wm(0, _Opts) ->
  98. undefined;
  99. low_wm(MaxLen, Opts) ->
  100. round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
  101. high_wm(0, _Opts) ->
  102. undefined;
  103. high_wm(MaxLen, Opts) ->
  104. round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
  105. -spec(name(mqueue()) -> iolist()).
  106. name(#mqueue{name = Name}) ->
  107. Name.
  108. -spec(type(mqueue()) -> atom()).
  109. type(#mqueue{type = Type}) ->
  110. Type.
  111. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
  112. is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).
  113. len(#mqueue{type = simple, len = Len}) -> Len;
  114. len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q).
  115. max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
  116. %% @doc Dropped of the mqueue
  117. -spec(dropped(mqueue()) -> non_neg_integer()).
  118. dropped(#mqueue{dropped = Dropped}) -> Dropped.
  119. %% @doc Stats of the mqueue
  120. -spec(stats(mqueue()) -> [stat()]).
  121. stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
  122. [{len, case Type of
  123. simple -> Len;
  124. priority -> ?PQUEUE:len(Q)
  125. end} | [{max_len, MaxLen}, {dropped, Dropped}]].
  126. %% @doc Enqueue a message.
  127. -spec(in(mqtt_message(), mqueue()) -> mqueue()).
  128. in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
  129. MQ;
  130. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
  131. MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
  132. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
  133. when Len >= MaxLen ->
  134. {{value, _Old}, Q2} = queue:out(Q),
  135. MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
  136. in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
  137. maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
  138. in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
  139. priorities = Priorities,
  140. max_len = 0}) ->
  141. case lists:keysearch(Topic, 1, Priorities) of
  142. {value, {_, Pri}} ->
  143. MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)};
  144. false ->
  145. {Pri, MQ1} = insert_p(Topic, 0, MQ),
  146. MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
  147. end;
  148. in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
  149. priorities = Priorities,
  150. max_len = MaxLen}) ->
  151. case lists:keysearch(Topic, 1, Priorities) of
  152. {value, {_, Pri}} ->
  153. case ?PQUEUE:plen(Pri, Q) >= MaxLen of
  154. true ->
  155. {_, Q1} = ?PQUEUE:out(Pri, Q),
  156. MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)};
  157. false ->
  158. MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
  159. end;
  160. false ->
  161. {Pri, MQ1} = insert_p(Topic, 0, MQ),
  162. MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
  163. end.
  164. out(MQ = #mqueue{type = simple, len = 0}) ->
  165. {empty, MQ};
  166. out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
  167. {R, Q2} = queue:out(Q),
  168. {R, MQ#mqueue{q = Q2, len = Len - 1}};
  169. out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
  170. {R, Q2} = queue:out(Q),
  171. {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
  172. out(MQ = #mqueue{type = priority, q = Q}) ->
  173. {R, Q2} = ?PQUEUE:out(Q),
  174. {R, MQ#mqueue{q = Q2}}.
  175. maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) ->
  176. MQ;
  177. maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
  178. when Len > HighWM ->
  179. Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
  180. severity = warning,
  181. title = io_lib:format("Queue ~s high-water mark", [Name]),
  182. summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
  183. MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
  184. maybe_set_alarm(MQ) ->
  185. MQ.
  186. maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) ->
  187. MQ;
  188. maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
  189. when Len < LowWM ->
  190. MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
  191. maybe_clear_alarm(MQ) ->
  192. MQ.