emqx_mqueue.erl 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc A Simple in-memory message queue.
  18. %%
  19. %% Notice that MQTT is not a (on-disk) persistent messaging queue.
  20. %% It assumes that clients should be online in most of the time.
  21. %%
  22. %% This module implements a simple in-memory queue for MQTT persistent session.
  23. %%
  24. %% If the broker restarts or crashes, all queued messages will be lost.
  25. %%
  26. %% Concept of Message Queue and Inflight Window:
  27. %%
  28. %% |<----------------- Max Len ----------------->|
  29. %% -----------------------------------------------
  30. %% IN -> | Messages Queue | Inflight Window | -> Out
  31. %% -----------------------------------------------
  32. %% |<--- Win Size --->|
  33. %%
  34. %%
  35. %% 1. Inflight Window is to store the messages
  36. %% that are delivered but still awaiting for puback.
  37. %%
  38. %% 2. Messages are enqueued to tail when the inflight window is full.
  39. %%
  40. %% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
  41. %% in init options
  42. %%
  43. %% 4. If the queue is full, drop the oldest one
  44. %% unless `max_len' is set to `0' which implies (`infinity').
  45. %%
  46. %% @end
  47. %%--------------------------------------------------------------------
  48. -module(emqx_mqueue).
  49. -include("emqx.hrl").
  50. -include("types.hrl").
  51. -include("emqx_mqtt.hrl").
  52. -export([
  53. init/1,
  54. info/1,
  55. info/2
  56. ]).
  57. -export([
  58. is_empty/1,
  59. len/1,
  60. max_len/1,
  61. in/2,
  62. out/1,
  63. stats/1,
  64. dropped/1,
  65. to_list/1,
  66. filter/2
  67. ]).
  68. -define(NO_PRIORITY_TABLE, disabled).
  69. -export_type([mqueue/0, options/0]).
  70. -type priority() :: infinity | integer().
  71. -type pq() :: emqx_pqueue:q().
  72. -type count() :: non_neg_integer().
  73. -type p_table() :: ?NO_PRIORITY_TABLE | #{emqx_types:topic() := priority()}.
  74. -type options() :: #{
  75. max_len := count(),
  76. priorities => p_table(),
  77. default_priority => highest | lowest,
  78. store_qos0 => boolean()
  79. }.
  80. -type message() :: emqx_types:message().
  81. -type stat() ::
  82. {len, non_neg_integer()}
  83. | {max_len, non_neg_integer()}
  84. | {dropped, non_neg_integer()}.
  85. -define(PQUEUE, emqx_pqueue).
  86. -define(LOWEST_PRIORITY, 0).
  87. -define(HIGHEST_PRIORITY, infinity).
  88. -define(MAX_LEN_INFINITY, 0).
  89. -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
  90. -record(shift_opts, {
  91. multiplier :: non_neg_integer(),
  92. base :: integer()
  93. }).
  94. -record(mqueue, {
  95. store_qos0 = false :: boolean(),
  96. max_len = ?MAX_LEN_INFINITY :: count(),
  97. len = 0 :: count(),
  98. dropped = 0 :: count(),
  99. p_table = ?NO_PRIORITY_TABLE :: p_table(),
  100. default_p = ?LOWEST_PRIORITY :: priority(),
  101. q = emqx_pqueue:new() :: pq(),
  102. shift_opts :: #shift_opts{},
  103. last_prio :: non_neg_integer() | undefined,
  104. p_credit :: non_neg_integer() | undefined
  105. }).
  106. -type mqueue() :: #mqueue{}.
  107. -spec init(options()) -> mqueue().
  108. init(Opts = #{max_len := MaxLen0, store_qos0 := Qos0}) ->
  109. MaxLen =
  110. case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
  111. true -> MaxLen0;
  112. false -> ?MAX_LEN_INFINITY
  113. end,
  114. #mqueue{
  115. max_len = MaxLen,
  116. store_qos0 = Qos0,
  117. p_table = p_table(get_opt(priorities, Opts, ?NO_PRIORITY_TABLE)),
  118. default_p = get_priority_opt(Opts),
  119. shift_opts = get_shift_opt(Opts)
  120. }.
  121. -spec info(mqueue()) -> emqx_types:infos().
  122. info(MQ) ->
  123. maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]).
  124. -spec info(atom(), mqueue()) -> term().
  125. info(store_qos0, #mqueue{store_qos0 = True}) ->
  126. True;
  127. info(max_len, #mqueue{max_len = MaxLen}) ->
  128. MaxLen;
  129. info(len, #mqueue{len = Len}) ->
  130. Len;
  131. info(dropped, #mqueue{dropped = Dropped}) ->
  132. Dropped.
  133. is_empty(#mqueue{len = Len}) -> Len =:= 0.
  134. len(#mqueue{len = Len}) -> Len.
  135. max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
  136. %% @doc Return all queued items in a list.
  137. -spec to_list(mqueue()) -> list().
  138. to_list(MQ) ->
  139. to_list(MQ, []).
  140. -spec filter(fun((any()) -> boolean()), mqueue()) -> mqueue().
  141. filter(_Pred, #mqueue{len = 0} = MQ) ->
  142. MQ;
  143. filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
  144. Q2 = ?PQUEUE:filter(Pred, Q),
  145. case ?PQUEUE:len(Q2) of
  146. Len ->
  147. MQ;
  148. Len2 ->
  149. Diff = Len - Len2,
  150. MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
  151. end.
  152. to_list(MQ, Acc) ->
  153. case out(MQ) of
  154. {empty, _MQ} ->
  155. lists:reverse(Acc);
  156. {{value, Msg}, Q1} ->
  157. to_list(Q1, [Msg | Acc])
  158. end.
  159. %% @doc Return number of dropped messages.
  160. -spec dropped(mqueue()) -> count().
  161. dropped(#mqueue{dropped = Dropped}) -> Dropped.
  162. %% @doc Stats of the mqueue
  163. -spec stats(mqueue()) -> [stat()].
  164. stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
  165. [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
  166. %% @doc Enqueue a message.
  167. -spec in(message(), mqueue()) -> {maybe(message()), mqueue()}.
  168. in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
  169. {_Dropped = Msg, MQ};
  170. in(
  171. Msg = #message{topic = Topic},
  172. MQ =
  173. #mqueue{
  174. default_p = Dp,
  175. p_table = PTab,
  176. q = Q,
  177. len = Len,
  178. max_len = MaxLen,
  179. dropped = Dropped
  180. } = MQ
  181. ) ->
  182. Priority = get_priority(Topic, PTab, Dp),
  183. PLen = ?PQUEUE:plen(Priority, Q),
  184. case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
  185. true ->
  186. %% reached max length, drop the oldest message
  187. {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
  188. Q2 = ?PQUEUE:in(Msg, Priority, Q1),
  189. {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
  190. false ->
  191. {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
  192. end.
  193. -spec out(mqueue()) -> {empty | {value, message()}, mqueue()}.
  194. out(MQ = #mqueue{len = 0, q = Q}) ->
  195. %% assert, in this case, ?PQUEUE:len should be very cheap
  196. 0 = ?PQUEUE:len(Q),
  197. {empty, MQ};
  198. out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts}) ->
  199. %% Shouldn't fail, since we've checked the length
  200. {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q),
  201. MQ1 = MQ#mqueue{
  202. q = Q1,
  203. len = Len - 1,
  204. last_prio = Prio,
  205. p_credit = get_credits(Prio, ShiftOpts)
  206. },
  207. {{value, Val}, MQ1};
  208. out(MQ = #mqueue{q = Q, p_credit = 0}) ->
  209. MQ1 = MQ#mqueue{
  210. q = ?PQUEUE:shift(Q),
  211. last_prio = undefined
  212. },
  213. out(MQ1);
  214. out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) ->
  215. {R, Q1} = ?PQUEUE:out(Q),
  216. {R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}.
  217. get_opt(Key, Opts, Default) ->
  218. case maps:get(Key, Opts, Default) of
  219. undefined -> Default;
  220. X -> X
  221. end.
  222. get_priority_opt(Opts) ->
  223. case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
  224. lowest -> ?LOWEST_PRIORITY;
  225. highest -> ?HIGHEST_PRIORITY;
  226. N when is_integer(N) -> N
  227. end.
  228. %% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
  229. %% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
  230. %% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
  231. %% while the highest 'infinity' is a [{infinity, queue:queue()}]
  232. get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
  233. get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
  234. get_credits(?HIGHEST_PRIORITY, Opts) ->
  235. Infinity = 1000000,
  236. get_credits(Infinity, Opts);
  237. get_credits(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
  238. (Prio + Base + 1) * Mult - 1.
  239. get_shift_opt(Opts) ->
  240. %% Using 10 as a multiplier by default. This is needed to minimize
  241. %% overhead of ?PQUEUE:rotate
  242. Mult = maps:get(shift_multiplier, Opts, 10),
  243. true = is_integer(Mult) andalso Mult > 0,
  244. Min =
  245. case Opts of
  246. #{p_table := PTab} ->
  247. case maps:size(PTab) of
  248. 0 -> 0;
  249. _ -> lists:min(maps:values(PTab))
  250. end;
  251. _ ->
  252. ?LOWEST_PRIORITY
  253. end,
  254. %% `mqueue' module supports negative priorities, but we don't want
  255. %% the counter to be negative, so all priorities should be shifted
  256. %% by a constant, if negative priorities are used:
  257. Base =
  258. case Min < 0 of
  259. true -> -Min;
  260. false -> 0
  261. end,
  262. #shift_opts{
  263. multiplier = Mult,
  264. base = Base
  265. }.
  266. %% topic from mqtt.mqueue_priorities(map()) is atom.
  267. p_table(PTab = #{}) ->
  268. maps:fold(
  269. fun
  270. (Topic, Priority, Acc) when is_atom(Topic) ->
  271. maps:put(atom_to_binary(Topic), Priority, Acc);
  272. (Topic, Priority, Acc) when is_binary(Topic) ->
  273. maps:put(Topic, Priority, Acc)
  274. end,
  275. #{},
  276. PTab
  277. );
  278. p_table(PTab) ->
  279. PTab.