emqx_mqueue.erl 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %%--------------------------------------------------------------------
  17. %% @doc A Simple in-memory message queue.
  18. %%
  19. %% Notice that MQTT is not a (on-disk) persistent messaging queue.
  20. %% It assumes that clients should be online in most of the time.
  21. %%
  22. %% This module implements a simple in-memory queue for MQTT persistent session.
  23. %%
  24. %% If the broker restarts or crashes, all queued messages will be lost.
  25. %%
  26. %% Concept of Message Queue and Inflight Window:
  27. %%
  28. %% |<----------------- Max Len ----------------->|
  29. %% -----------------------------------------------
  30. %% IN -> | Messages Queue | Inflight Window | -> Out
  31. %% -----------------------------------------------
  32. %% |<--- Win Size --->|
  33. %%
  34. %%
  35. %% 1. Inflight Window is to store the messages
  36. %% that are delivered but still awaiting for puback.
  37. %%
  38. %% 2. Messages are enqueued to tail when the inflight window is full.
  39. %%
  40. %% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
  41. %% in init options
  42. %%
  43. %% 4. If the queue is full, drop the oldest one
  44. %% unless `max_len' is set to `0' which implies (`infinity').
  45. %%
  46. %% @end
  47. %%--------------------------------------------------------------------
  48. -module(emqx_mqueue).
  49. -include("emqx.hrl").
  50. -include("types.hrl").
  51. -include("emqx_mqtt.hrl").
  52. -export([ init/1
  53. , info/1
  54. , info/2
  55. ]).
  56. -export([ is_empty/1
  57. , len/1
  58. , max_len/1
  59. , in/2
  60. , out/1
  61. , stats/1
  62. , dropped/1
  63. ]).
  64. -define(NO_PRIORITY_TABLE, disabled).
  65. -export_type([mqueue/0, options/0]).
  66. -type(topic() :: emqx_types:topic()).
  67. -type(priority() :: infinity | integer()).
  68. -type(pq() :: emqx_pqueue:q()).
  69. -type(count() :: non_neg_integer()).
  70. -type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
  71. -type(options() :: #{max_len := count(),
  72. priorities => p_table(),
  73. default_priority => highest | lowest,
  74. store_qos0 => boolean()
  75. }).
  76. -type(message() :: emqx_types:message()).
  77. -type(stat() :: {len, non_neg_integer()}
  78. | {max_len, non_neg_integer()}
  79. | {dropped, non_neg_integer()}).
  80. -define(PQUEUE, emqx_pqueue).
  81. -define(LOWEST_PRIORITY, 0).
  82. -define(HIGHEST_PRIORITY, infinity).
  83. -define(MAX_LEN_INFINITY, 0).
  84. -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
  85. -record(shift_opts, {
  86. multiplier :: non_neg_integer(),
  87. base :: integer()
  88. }).
  89. -record(mqueue, {
  90. store_qos0 = false :: boolean(),
  91. max_len = ?MAX_LEN_INFINITY :: count(),
  92. len = 0 :: count(),
  93. dropped = 0 :: count(),
  94. p_table = ?NO_PRIORITY_TABLE :: p_table(),
  95. default_p = ?LOWEST_PRIORITY :: priority(),
  96. q = ?PQUEUE:new() :: pq(),
  97. shift_opts :: #shift_opts{},
  98. last_prio :: non_neg_integer() | undefined,
  99. p_credit :: non_neg_integer() | undefined
  100. }).
  101. -type(mqueue() :: #mqueue{}).
  102. -spec(init(options()) -> mqueue()).
  103. init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
  104. MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
  105. true -> MaxLen0;
  106. false -> ?MAX_LEN_INFINITY
  107. end,
  108. #mqueue{max_len = MaxLen,
  109. store_qos0 = QoS_0,
  110. p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
  111. default_p = get_priority_opt(Opts),
  112. shift_opts = get_shift_opt(Opts)
  113. }.
  114. -spec(info(mqueue()) -> emqx_types:infos()).
  115. info(MQ) ->
  116. maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]).
  117. -spec(info(atom(), mqueue()) -> term()).
  118. info(store_qos0, #mqueue{store_qos0 = True}) ->
  119. True;
  120. info(max_len, #mqueue{max_len = MaxLen}) ->
  121. MaxLen;
  122. info(len, #mqueue{len = Len}) ->
  123. Len;
  124. info(dropped, #mqueue{dropped = Dropped}) ->
  125. Dropped.
  126. is_empty(#mqueue{len = Len}) -> Len =:= 0.
  127. len(#mqueue{len = Len}) -> Len.
  128. max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
  129. %% @doc Return number of dropped messages.
  130. -spec(dropped(mqueue()) -> count()).
  131. dropped(#mqueue{dropped = Dropped}) -> Dropped.
  132. %% @doc Stats of the mqueue
  133. -spec(stats(mqueue()) -> [stat()]).
  134. stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
  135. [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
  136. %% @doc Enqueue a message.
  137. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
  138. in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
  139. {_Dropped = Msg, MQ};
  140. in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
  141. p_table = PTab,
  142. q = Q,
  143. len = Len,
  144. max_len = MaxLen,
  145. dropped = Dropped
  146. } = MQ) ->
  147. Priority = get_priority(Topic, PTab, Dp),
  148. PLen = ?PQUEUE:plen(Priority, Q),
  149. case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
  150. true ->
  151. %% reached max length, drop the oldest message
  152. {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
  153. Q2 = ?PQUEUE:in(Msg, Priority, Q1),
  154. {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
  155. false ->
  156. {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
  157. end.
  158. -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
  159. out(MQ = #mqueue{len = 0, q = Q}) ->
  160. 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
  161. {empty, MQ};
  162. out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts}) ->
  163. {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
  164. MQ1 = MQ#mqueue{
  165. q = Q1,
  166. len = Len - 1,
  167. last_prio = Prio,
  168. p_credit = get_credits(Prio, ShiftOpts)
  169. },
  170. {{value, Val}, MQ1};
  171. out(MQ = #mqueue{q = Q, p_credit = 0}) ->
  172. MQ1 = MQ#mqueue{
  173. q = ?PQUEUE:shift(Q),
  174. last_prio = undefined
  175. },
  176. out(MQ1);
  177. out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) ->
  178. {R, Q1} = ?PQUEUE:out(Q),
  179. {R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}.
  180. get_opt(Key, Opts, Default) ->
  181. case maps:get(Key, Opts, Default) of
  182. undefined -> Default;
  183. X -> X
  184. end.
  185. get_priority_opt(Opts) ->
  186. case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
  187. lowest -> ?LOWEST_PRIORITY;
  188. highest -> ?HIGHEST_PRIORITY;
  189. N when is_integer(N) -> N
  190. end.
  191. %% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
  192. %% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
  193. %% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
  194. %% while the highest 'infinity' is a [{infinity, queue:queue()}]
  195. get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
  196. get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
  197. get_credits(?HIGHEST_PRIORITY, Opts) ->
  198. Infinity = 1000000,
  199. get_credits(Infinity, Opts);
  200. get_credits(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
  201. (Prio + Base + 1) * Mult - 1.
  202. get_shift_opt(Opts) ->
  203. %% Using 10 as a multiplier by default. This is needed to minimize
  204. %% overhead of ?PQUEUE:rotate
  205. Mult = maps:get(shift_multiplier, Opts, 10),
  206. true = is_integer(Mult) andalso Mult > 0,
  207. Min = case Opts of
  208. #{p_table := PTab} ->
  209. case maps:size(PTab) of
  210. 0 -> 0;
  211. _ -> lists:min(maps:values(PTab))
  212. end;
  213. _ ->
  214. ?LOWEST_PRIORITY
  215. end,
  216. %% `mqueue' module supports negative priorities, but we don't want
  217. %% the counter to be negative, so all priorities should be shifted
  218. %% by a constant, if negative priorities are used:
  219. Base = case Min < 0 of
  220. true -> -Min;
  221. false -> 0
  222. end,
  223. #shift_opts{
  224. multiplier = Mult,
  225. base = Base
  226. }.