emqx_mqueue.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc A Simple in-memory message queue.
  18. %%
  19. %% Notice that MQTT is not a (on-disk) persistent messaging queue.
  20. %% It assumes that clients should be online in most of the time.
  21. %%
  22. %% This module implements a simple in-memory queue for MQTT persistent session.
  23. %%
  24. %% If the broker restarts or crashes, all queued messages will be lost.
  25. %%
  26. %% Concept of Message Queue and Inflight Window:
  27. %%
  28. %% |<----------------- Max Len ----------------->|
  29. %% -----------------------------------------------
  30. %% IN -> | Messages Queue | Inflight Window | -> Out
  31. %% -----------------------------------------------
  32. %% |<--- Win Size --->|
  33. %%
  34. %%
  35. %% 1. Inflight Window is to store the messages
  36. %% that are delivered but still awaiting for puback.
  37. %%
  38. %% 2. Messages are enqueued to tail when the inflight window is full.
  39. %%
  40. %% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
  41. %% in init options
  42. %%
  43. %% 4. If the queue is full, drop the oldest one
  44. %% unless `max_len' is set to `0' which implies (`infinity').
  45. %%
  46. %% @end
  47. %%--------------------------------------------------------------------
  48. -module(emqx_mqueue).
  49. -include("emqx.hrl").
  50. -include("types.hrl").
  51. -include("emqx_mqtt.hrl").
  52. -export([ init/1
  53. , info/1
  54. , info/2
  55. ]).
  56. -export([ is_empty/1
  57. , len/1
  58. , max_len/1
  59. , in/2
  60. , out/1
  61. , stats/1
  62. , dropped/1
  63. ]).
  64. -export_type([mqueue/0, options/0]).
  65. -type(topic() :: emqx_topic:topic()).
  66. -type(priority() :: infinity | integer()).
  67. -type(pq() :: emqx_pqueue:q()).
  68. -type(count() :: non_neg_integer()).
  69. -type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
  70. -type(options() :: #{max_len := count(),
  71. priorities => p_table(),
  72. default_priority => highest | lowest,
  73. store_qos0 => boolean()
  74. }).
  75. -type(message() :: emqx_types:message()).
  76. -type(stat() :: {len, non_neg_integer()}
  77. | {max_len, non_neg_integer()}
  78. | {dropped, non_neg_integer()}).
  79. -define(PQUEUE, emqx_pqueue).
  80. -define(LOWEST_PRIORITY, 0).
  81. -define(HIGHEST_PRIORITY, infinity).
  82. -define(MAX_LEN_INFINITY, 0).
  83. -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
  84. -record(mqueue, {
  85. store_qos0 = false :: boolean(),
  86. max_len = ?MAX_LEN_INFINITY :: count(),
  87. len = 0 :: count(),
  88. dropped = 0 :: count(),
  89. p_table = ?NO_PRIORITY_TABLE :: p_table(),
  90. default_p = ?LOWEST_PRIORITY :: priority(),
  91. q = ?PQUEUE:new() :: pq()
  92. }).
  93. -type(mqueue() :: #mqueue{}).
  94. -spec(init(options()) -> mqueue()).
  95. init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
  96. MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
  97. true -> MaxLen0;
  98. false -> ?MAX_LEN_INFINITY
  99. end,
  100. #mqueue{max_len = MaxLen,
  101. store_qos0 = QoS_0,
  102. p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
  103. default_p = get_priority_opt(Opts)
  104. }.
  105. -spec(info(mqueue()) -> emqx_types:infos()).
  106. info(MQ) ->
  107. maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]).
  108. -spec(info(atom(), mqueue()) -> term()).
  109. info(store_qos0, #mqueue{store_qos0 = True}) ->
  110. True;
  111. info(max_len, #mqueue{max_len = MaxLen}) ->
  112. MaxLen;
  113. info(len, #mqueue{len = Len}) ->
  114. Len;
  115. info(dropped, #mqueue{dropped = Dropped}) ->
  116. Dropped.
  117. is_empty(#mqueue{len = Len}) -> Len =:= 0.
  118. len(#mqueue{len = Len}) -> Len.
  119. max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
  120. %% @doc Return number of dropped messages.
  121. -spec(dropped(mqueue()) -> count()).
  122. dropped(#mqueue{dropped = Dropped}) -> Dropped.
  123. %% @doc Stats of the mqueue
  124. -spec(stats(mqueue()) -> [stat()]).
  125. stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
  126. [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
  127. %% @doc Enqueue a message.
  128. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
  129. in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
  130. {_Dropped = Msg, MQ};
  131. in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
  132. p_table = PTab,
  133. q = Q,
  134. len = Len,
  135. max_len = MaxLen,
  136. dropped = Dropped
  137. } = MQ) ->
  138. Priority = get_priority(Topic, PTab, Dp),
  139. PLen = ?PQUEUE:plen(Priority, Q),
  140. case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
  141. true ->
  142. %% reached max length, drop the oldest message
  143. {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
  144. Q2 = ?PQUEUE:in(Msg, Priority, Q1),
  145. {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
  146. false ->
  147. {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
  148. end.
  149. -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
  150. out(MQ = #mqueue{len = 0, q = Q}) ->
  151. 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
  152. {empty, MQ};
  153. out(MQ = #mqueue{q = Q, len = Len}) ->
  154. {R, Q1} = ?PQUEUE:out(Q),
  155. {R, MQ#mqueue{q = Q1, len = Len - 1}}.
  156. get_opt(Key, Opts, Default) ->
  157. case maps:get(Key, Opts, Default) of
  158. undefined -> Default;
  159. X -> X
  160. end.
  161. get_priority_opt(Opts) ->
  162. case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
  163. lowest -> ?LOWEST_PRIORITY;
  164. highest -> ?HIGHEST_PRIORITY;
  165. N when is_integer(N) -> N
  166. end.
  167. %% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
  168. %% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
  169. %% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
  170. %% while the highest 'infinity' is a [{infinity, queue:queue()}]
  171. get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
  172. get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).