emqx_mqueue.erl 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc A Simple in-memory message queue.
  18. %%
  19. %% Notice that MQTT is not a (on-disk) persistent messaging queue.
  20. %% It assumes that clients are online most of the time.
  21. %%
  22. %% This module implements a simple in-memory queue for MQTT persistent session.
  23. %%
  24. %% If the broker restarts or crashes, all queued messages will be lost.
  25. %%
  26. %% Concept of Message Queue and Inflight Window:
  27. %%
  28. %%       |<----------------- Max Len ----------------->|
  29. %%       -----------------------------------------------
  30. %% IN -> |      Messages Queue   |  Inflight Window    | -> Out
  31. %%       -----------------------------------------------
  32. %%                               |<---   Win Size  --->|
  33. %%
  34. %%
  35. %% 1. Inflight Window is to store the messages
  36. %% that are delivered but still awaiting puback.
  37. %%
  38. %% 2. Messages are enqueued to tail when the inflight window is full.
  39. %%
  40. %% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true'
  41. %% in init options
  42. %%
  43. %% 4. If the queue is full, drop the oldest one
  44. %% unless `max_len' is set to `0' which implies (`infinity').
  45. %%
  46. %% @end
  47. %%--------------------------------------------------------------------
  48. -module(emqx_mqueue).
  49. -include("emqx.hrl").
  50. -include("types.hrl").
  51. -include("emqx_mqtt.hrl").
  52. -export([
  53. init/1,
  54. info/1,
  55. info/2
  56. ]).
  57. -export([
  58. is_empty/1,
  59. len/1,
  60. max_len/1,
  61. in/2,
  62. out/1,
  63. stats/1,
  64. dropped/1
  65. ]).
  66. -define(NO_PRIORITY_TABLE, disabled).
  67. -export_type([mqueue/0, options/0]).
  68. -type topic() :: emqx_types:topic().
  69. -type priority() :: infinity | integer().
  70. -type pq() :: emqx_pqueue:q().
  71. -type count() :: non_neg_integer().
  72. -type p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}.
  73. -type options() :: #{
  74. max_len := count(),
  75. priorities => p_table(),
  76. default_priority => highest | lowest,
  77. store_qos0 => boolean()
  78. }.
  79. -type message() :: emqx_types:message().
  80. -type stat() ::
  81. {len, non_neg_integer()}
  82. | {max_len, non_neg_integer()}
  83. | {dropped, non_neg_integer()}.
  84. -define(PQUEUE, emqx_pqueue).
  85. -define(LOWEST_PRIORITY, 0).
  86. -define(HIGHEST_PRIORITY, infinity).
  87. -define(MAX_LEN_INFINITY, 0).
  88. -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
  %% Parameters used by get_credits/2 to turn a priority into a number of
  %% consecutive dequeues ("credits"):
  %%   credits = (Priority + base + 1) * multiplier - 1
  -record(shift_opts, {
      %% Scaling factor applied to the shifted priority.
      multiplier :: non_neg_integer(),
      %% Constant added to every priority before scaling.
      base :: integer()
  }).
  %% State of one message queue.
  -record(mqueue, {
      %% When `false', in/2 hands QoS 0 messages straight back as dropped.
      store_qos0 = false :: boolean(),
      %% Length limit checked by in/2 against the length of the message's
      %% own priority level; ?MAX_LEN_INFINITY (0) means unlimited.
      max_len = ?MAX_LEN_INFINITY :: count(),
      %% Number of messages currently held in `q'.
      len = 0 :: count(),
      %% Number of messages evicted by in/2 because the limit was reached.
      dropped = 0 :: count(),
      %% Topic -> priority map, or ?NO_PRIORITY_TABLE.
      p_table = ?NO_PRIORITY_TABLE :: p_table(),
      %% Priority used for topics missing from `p_table'.
      default_p = ?LOWEST_PRIORITY :: priority(),
      %% The underlying priority queue.
      q = ?PQUEUE:new() :: pq(),
      %% Credit parameters; always filled in by init/1.
      shift_opts :: #shift_opts{},
      %% Priority recorded by out/1 at the start of a credit round;
      %% `undefined' makes out/1 start a new round.
      last_prio :: non_neg_integer() | undefined,
      %% Dequeues left in the current round; at 0 out/1 shifts the queue.
      p_credit :: non_neg_integer() | undefined
  }).
  105. -type mqueue() :: #mqueue{}.
  %% @doc Create an empty queue from `Opts'.
  %%
  %% `max_len' is mandatory. Any value that is not an integer greater than
  %% ?MAX_LEN_INFINITY is normalised to ?MAX_LEN_INFINITY (unlimited).
  %% `store_qos0' is optional, as declared in `options()', and defaults to
  %% `false' (the record default) when absent or `undefined'.
  -spec init(options()) -> mqueue().
  init(Opts = #{max_len := MaxLen0}) ->
      MaxLen =
          case is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY of
              true -> MaxLen0;
              false -> ?MAX_LEN_INFINITY
          end,
      #mqueue{
          max_len = MaxLen,
          %% Previously matched in the function head, which made a key that
          %% the type declares optional effectively mandatory.
          store_qos0 = get_opt(store_qos0, Opts, false),
          p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
          default_p = get_priority_opt(Opts),
          shift_opts = get_shift_opt(Opts)
      }.
  %% @doc Collect every item listed in ?INFO_KEYS into a single map.
  -spec info(mqueue()) -> emqx_types:infos().
  info(MQ) ->
      Collect = fun(Key, Acc) -> Acc#{Key => info(Key, MQ)} end,
      lists:foldl(Collect, #{}, ?INFO_KEYS).
  %% @doc Read a single info item. Only `store_qos0', `max_len', `len' and
  %% `dropped' are supported; any other key fails with `function_clause'.
  -spec info(atom(), mqueue()) -> term().
  info(store_qos0, #mqueue{} = MQ) -> MQ#mqueue.store_qos0;
  info(max_len, #mqueue{} = MQ) -> MQ#mqueue.max_len;
  info(len, #mqueue{} = MQ) -> MQ#mqueue.len;
  info(dropped, #mqueue{} = MQ) -> MQ#mqueue.dropped.
  %% @doc Return `true' when the queue holds no messages.
  -spec is_empty(mqueue()) -> boolean().
  is_empty(#mqueue{len = Len}) -> Len =:= 0.
  %% @doc Return the number of messages currently in the queue.
  -spec len(mqueue()) -> count().
  len(#mqueue{len = Len}) -> Len.
  %% @doc Return the configured length limit; ?MAX_LEN_INFINITY (0) means
  %% unlimited.
  -spec max_len(mqueue()) -> count().
  max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
  %% @doc Number of messages that have been dropped from this queue so far.
  -spec dropped(mqueue()) -> count().
  dropped(#mqueue{} = MQ) ->
      MQ#mqueue.dropped.
  %% @doc Queue statistics as a list of `{len | max_len | dropped, Count}'
  %% pairs, in that order.
  -spec stats(mqueue()) -> [stat()].
  stats(#mqueue{len = CurLen, max_len = Limit, dropped = NDropped}) ->
      [
          {len, CurLen},
          {max_len, Limit},
          {dropped, NDropped}
      ].
  %% @doc Enqueue a message.
  %%
  %% Returns `{DroppedMsg, NewMQ}', where `DroppedMsg' is `undefined' when
  %% nothing had to be dropped. A QoS 0 message offered to a queue with
  %% `store_qos0 = false' is returned as the dropped message and the queue is
  %% left untouched.
  -spec in(message(), mqueue()) -> {maybe(message()), mqueue()}.
  in(#message{qos = ?QOS_0} = Msg, #mqueue{store_qos0 = false} = MQ) ->
      {Msg, MQ};
  in(#message{topic = Topic} = Msg, #mqueue{q = Q0, len = Len, max_len = MaxLen} = MQ) ->
      Prio = get_priority(Topic, MQ#mqueue.p_table, MQ#mqueue.default_p),
      %% The limit is compared against the length of this priority level only.
      PrioLen = ?PQUEUE:plen(Prio, Q0),
      if
          MaxLen =/= ?MAX_LEN_INFINITY, PrioLen =:= MaxLen ->
              %% Level is full: evict its oldest message to make room. The
              %% total length is unchanged (one out, one in).
              {{value, Evicted}, Q1} = ?PQUEUE:out(Prio, Q0),
              Q2 = ?PQUEUE:in(Msg, Prio, Q1),
              {Evicted, MQ#mqueue{q = Q2, dropped = MQ#mqueue.dropped + 1}};
          true ->
              Q1 = ?PQUEUE:in(Msg, Prio, Q0),
              {undefined, MQ#mqueue{len = Len + 1, q = Q1}}
      end.
  %% @doc Dequeue the next message, or return `empty'.
  %%
  %% Dequeuing works in rounds. A round starts when `last_prio' is
  %% `undefined': the head message is taken together with its priority and
  %% get_credits/2 decides how many further dequeues the round allows. Once
  %% the credits reach 0 the queue is passed through ?PQUEUE:shift/1 and a
  %% new round begins.
  -spec out(mqueue()) -> {empty | {value, message()}, mqueue()}.
  out(#mqueue{len = 0, q = PQ} = MQ) ->
      %% Consistency assertion between our counter and the real queue;
      %% ?PQUEUE:len is expected to be cheap on an empty queue.
      0 = ?PQUEUE:len(PQ),
      {empty, MQ};
  out(#mqueue{last_prio = undefined, q = PQ0, len = Len, shift_opts = ShiftOpts} = MQ) ->
      %% len > 0 here (previous clause), so out_p must yield a value.
      {{value, Msg, Prio}, PQ1} = ?PQUEUE:out_p(PQ0),
      Credits = get_credits(Prio, ShiftOpts),
      NewMQ = MQ#mqueue{q = PQ1, len = Len - 1, last_prio = Prio, p_credit = Credits},
      {{value, Msg}, NewMQ};
  out(#mqueue{p_credit = 0, q = PQ} = MQ) ->
      %% Round exhausted: shift the queue and start over with a fresh round.
      out(MQ#mqueue{q = ?PQUEUE:shift(PQ), last_prio = undefined});
  out(#mqueue{p_credit = Credits, q = PQ0, len = Len} = MQ) ->
      {Result, PQ1} = ?PQUEUE:out(PQ0),
      {Result, MQ#mqueue{q = PQ1, len = Len - 1, p_credit = Credits - 1}}.
  %% Look up `Key' in the options map. A missing key and an explicit
  %% `undefined' value both yield `Default'.
  get_opt(Key, Opts, Default) ->
      case maps:find(Key, Opts) of
          {ok, Value} when Value =/= undefined -> Value;
          _ -> Default
      end.
  %% Translate the `default_priority' option into a priority value. Accepts
  %% `highest', `lowest' or an integer; defaults to ?LOWEST_PRIORITY.
  get_priority_opt(Opts) ->
      Configured = get_opt(default_priority, Opts, ?LOWEST_PRIORITY),
      case Configured of
          highest -> ?HIGHEST_PRIORITY;
          lowest -> ?LOWEST_PRIORITY;
          _ when is_integer(Configured) -> Configured
      end.
  %% Resolve the priority of a topic.
  %%
  %% MICRO-OPTIMIZATION: with no priority table configured, the default
  %% priority from the config is ignored and ?LOWEST_PRIORITY (0) is always
  %% used, because the lowest priority in emqx_pqueue falls back to a plain
  %% queue:queue(), whereas the highest ('infinity') is stored as
  %% [{infinity, queue:queue()}].
  get_priority(Topic, PTab, DefaultPrio) ->
      case PTab of
          ?NO_PRIORITY_TABLE -> ?LOWEST_PRIORITY;
          _ -> maps:get(Topic, PTab, DefaultPrio)
      end.
  %% Number of additional dequeues granted to a round that started at
  %% priority `Prio0': (Prio + Base + 1) * Mult - 1. The symbolic highest
  %% priority is replaced by a large finite stand-in (1000000) first.
  get_credits(Prio0, #shift_opts{multiplier = Mult, base = Base}) ->
      Prio =
          case Prio0 of
              ?HIGHEST_PRIORITY -> 1000000;
              _ -> Prio0
          end,
      (Prio + Base + 1) * Mult - 1.
  %% Build the #shift_opts{} used by get_credits/2.
  %%
  %% `shift_multiplier' (default 10) must be a positive integer; anything
  %% else fails the assertion below. `base' is chosen so that the lowest
  %% configured priority is shifted up to 0, which keeps credits
  %% non-negative when negative priorities are in use.
  get_shift_opt(Opts) ->
      %% Using 10 as a multiplier by default. This is needed to minimize
      %% overhead of ?PQUEUE:shift
      Mult = maps:get(shift_multiplier, Opts, 10),
      true = is_integer(Mult) andalso Mult > 0,
      %% The priority table is passed under `priorities' (see options() and
      %% init/1). The old lookup used `p_table', which init/1 never sets, so
      %% the base was always 0. `p_table' is still honoured as a fallback.
      PTab =
          case get_opt(priorities, Opts, ?NO_PRIORITY_TABLE) of
              ?NO_PRIORITY_TABLE -> maps:get(p_table, Opts, ?NO_PRIORITY_TABLE);
              Configured -> Configured
          end,
      %% Only integer priorities matter for the shift; `infinity' is handled
      %% separately by get_credits/2.
      IntPrios =
          case is_map(PTab) of
              true -> [P || P <- maps:values(PTab), is_integer(P)];
              false -> []
          end,
      %% `mqueue' module supports negative priorities, but we don't want
      %% the counter to be negative, so all priorities should be shifted
      %% by a constant, if negative priorities are used:
      Min = lists:min([0 | IntPrios]),
      #shift_opts{
          multiplier = Mult,
          %% Min =< 0 by construction, so the base is never negative.
          base = -Min
      }.
  241. }.