emqx_pqueue.erl 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. %% The contents of this file are subject to the Mozilla Public License
  2. %% Version 1.1 (the "License"); you may not use this file except in
  3. %% compliance with the License. You may obtain a copy of the License
  4. %% at http://www.mozilla.org/MPL/
  5. %%
  6. %% Software distributed under the License is distributed on an "AS IS"
  7. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  8. %% the License for the specific language governing rights and
  9. %% limitations under the License.
  10. %%
  11. %% The Original Code is RabbitMQ.
  12. %%
  13. %% The Initial Developer of the Original Code is GoPivotal, Inc.
  14. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
  15. %%
  16. %% Priority queues have essentially the same interface as ordinary
  17. %% queues, except that a) there is an in/3 that takes a priority, and
  18. %% b) we have only implemented the core API we need.
  19. %%
  20. %% Priorities should be integers - the higher the value the higher the
  21. %% priority - but we don't actually check that.
  22. %%
  23. %% in/2 inserts items with priority 0.
  24. %%
  25. %% We optimise the case where a priority queue is being used just like
  26. %% an ordinary queue. When that is the case we represent the priority
  27. %% queue as an ordinary queue. We could just call into the 'queue'
  28. %% module for that, but for efficiency we implement the relevant
  29. %% functions directly in here, thus saving on inter-module calls and
  30. %% eliminating a level of boxing.
  31. %%
  32. %% When the queue contains items with non-zero priorities, it is
  33. %% represented as a sorted kv list with the inverted Priority as the
  34. %% key and an ordinary queue as the value. Here again we use our own
  35. %% ordinary queue implementation for efficiency, often making recursive
  36. %% calls into the same function knowing that ordinary queues represent
  37. %% a base case.
  38. -module(emqx_pqueue).
  39. -export([
  40. new/0,
  41. is_queue/1,
  42. is_empty/1,
  43. len/1,
  44. plen/2,
  45. to_list/1,
  46. from_list/1,
  47. in/2,
  48. in/3,
  49. out/1,
  50. out/2,
  51. out_p/1,
  52. join/2,
  53. filter/2,
  54. fold/3,
  55. highest/1,
  56. shift/1
  57. ]).
  58. -export_type([q/0]).
  59. %%----------------------------------------------------------------------------
  %% A priority as supplied by callers: an integer or the atom 'infinity'.
  -type priority() :: integer() | 'infinity'.
  %% A plain queue, {queue, In, Out, Len}. to_list/1 yields Out followed by
  %% the reverse of In, so In holds the most recently added items first.
  -type squeue() :: {queue, [any()], [any()], non_neg_integer()}.
  %% Either a plain queue or {pqueue, KVList}. in/3 stores each plain queue
  %% under maybe_negate_priority(Priority) and keeps an 'infinity' key at
  %% the head of the list.
  -type pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}.
  %% Exported alias for pqueue().
  -type q() :: pqueue().
  64. %%----------------------------------------------------------------------------
  %% @doc Return an empty queue. An empty priority queue is always
  %% represented as the empty plain queue.
  -spec new() -> pqueue().
  new() -> {queue, [], [], 0}.
  %% @doc Structural check: true for a `{queue, In, Out, Len}' tuple with
  %% list/list/integer fields, or for a `{pqueue, List}' whose entries are
  %% all `{Key, Queue}' pairs with an integer or `infinity' key and a value
  %% that itself passes this check. Anything else is false.
  -spec is_queue(any()) -> boolean().
  is_queue({queue, Rear, Front, Len}) when is_list(Rear), is_list(Front), is_integer(Len) ->
      true;
  is_queue({pqueue, Buckets}) when is_list(Buckets) ->
      ValidBucket =
          fun
              ({infinity, Bucket}) -> is_queue(Bucket);
              ({Key, Bucket}) when is_integer(Key) -> is_queue(Bucket);
              ({_Key, _Bucket}) -> false
          end,
      lists:all(ValidBucket, Buckets);
  is_queue(_Other) ->
      false.
  %% @doc True only for the canonical empty plain queue; every other term,
  %% including any `{pqueue, _}', is reported as non-empty.
  -spec is_empty(pqueue()) -> boolean().
  is_empty(Q) ->
      Q =:= {queue, [], [], 0}.
  %% @doc Total number of items. A plain queue carries its length in the
  %% tuple; a priority queue sums the lengths of all its buckets.
  -spec len(pqueue()) -> non_neg_integer().
  len({queue, _Rear, _Front, Len}) ->
      Len;
  len({pqueue, Buckets}) ->
      BucketLens = [len(Bucket) || {_Key, Bucket} <- Buckets],
      lists:foldl(fun erlang:'+'/2, 0, BucketLens).
  %% @doc Number of items stored at exactly the given priority. A plain
  %% queue only holds priority-0 items, so any other priority yields 0.
  -spec plen(priority(), pqueue()) -> non_neg_integer().
  plen(0, {queue, _Rear, _Front, Len}) ->
      Len;
  plen(_Priority, {queue, _Rear, _Front, _Len}) ->
      0;
  plen(Priority, {pqueue, Buckets}) ->
      %% Buckets are keyed by the negated priority.
      case lists:keyfind(maybe_negate_priority(Priority), 1, Buckets) of
          false -> 0;
          {_Key, Bucket} -> len(Bucket)
      end.
  %% @doc Flatten the queue into `{Priority, Item}' pairs in bucket order,
  %% oldest item first within each bucket. Items of a plain queue are
  %% reported with priority 0.
  -spec to_list(pqueue()) -> [{priority(), any()}].
  to_list({queue, Rear, Front, _Len}) when is_list(Rear), is_list(Front) ->
      %% Rear is stored newest-first, so reverse it behind Front.
      [{0, Item} || Item <- Front ++ lists:reverse(Rear)];
  to_list({pqueue, Buckets}) ->
      lists:append([
          [{maybe_negate_priority(Key), Item} || {0, Item} <- to_list(Bucket)]
       || {Key, Bucket} <- Buckets
      ]).
  %% @doc Build a queue by inserting each `{Priority, Item}' pair, in list
  %% order, into an initially empty queue.
  -spec from_list([{priority(), any()}]) -> pqueue().
  from_list(Pairs) ->
      Insert = fun({Priority, Item}, Acc) -> in(Item, Priority, Acc) end,
      lists:foldl(Insert, new(), Pairs).
  %% @doc Insert an item at the default priority, 0.
  -spec in(any(), pqueue()) -> pqueue().
  in(Element, Queue) -> in(Element, 0, Queue).
  %% @doc Insert an item at the given priority.
  %%
  %% Priority-0 inserts into a plain queue stay in the plain representation.
  %% Any other priority promotes a plain queue to `{pqueue, Buckets}', with a
  %% non-empty plain queue becoming the bucket under key 0. Buckets are keyed
  %% by the negated priority and kept key-sorted, except that an `infinity'
  %% bucket always stays at the head.
  -spec in(any(), priority(), pqueue()) -> pqueue().
  in(Item, 0, {queue, [_] = Rear, [], 1}) ->
      %% Move the single existing item to the front list.
      {queue, [Item], Rear, 2};
  in(Item, 0, {queue, Rear, Front, Len}) when is_list(Rear), is_list(Front) ->
      {queue, [Item | Rear], Front, Len + 1};
  in(Item, Priority, {queue, [], [], 0}) ->
      in(Item, Priority, {pqueue, []});
  in(Item, Priority, {queue, _, _, _} = Plain) ->
      in(Item, Priority, {pqueue, [{0, Plain}]});
  in(Item, Priority, {pqueue, Buckets}) ->
      Key = maybe_negate_priority(Priority),
      Buckets1 =
          case lists:keyfind(Key, 1, Buckets) of
              {_, Bucket} ->
                  lists:keyreplace(Key, 1, Buckets, {Key, in(Item, Bucket)});
              false ->
                  in_new_bucket(Key, {queue, [Item], [], 1}, Buckets)
          end,
      {pqueue, Buckets1}.

  %% Add a bucket for a key that is not present yet. An `infinity' key goes
  %% straight to the head; otherwise the integer-keyed buckets are re-sorted
  %% while an existing `infinity' head is left in place.
  in_new_bucket(Key, Single, Buckets) when Key == infinity ->
      [{Key, Single} | Buckets];
  in_new_bucket(Key, Single, [{infinity, InfBucket} | Rest]) ->
      [{infinity, InfBucket} | lists:keysort(1, [{Key, Single} | Rest])];
  in_new_bucket(Key, Single, Buckets) ->
      lists:keysort(1, [{Key, Single} | Buckets]).
  %% @doc Remove the next item: the oldest item of a plain queue, or the
  %% oldest item of the first bucket of a priority queue.
  %%
  %% Returns `{{value, Item}, Queue1}', or `{empty, Queue}' for the empty
  %% queue. When the first bucket drains it is dropped, and a priority queue
  %% left with nothing, or with only a key-0 bucket, collapses back to a
  %% plain queue.
  -spec out(pqueue()) -> {empty | {value, any()}, pqueue()}.
  out({queue, [], [], 0} = Empty) ->
      {empty, Empty};
  out({queue, [Only], [], 1}) ->
      {{value, Only}, {queue, [], [], 0}};
  out({queue, [Newest | Rear], [], Len}) ->
      %% Front is exhausted: refill it from the reversed rear list.
      [Oldest | Front] = lists:reverse(Rear, []),
      {{value, Oldest}, {queue, [Newest], Front, Len - 1}};
  out({queue, Rear, [Last], Len}) when is_list(Rear) ->
      {{value, Last}, r2f(Rear, Len - 1)};
  out({queue, Rear, [Next | Front], Len}) when is_list(Rear) ->
      {{value, Next}, {queue, Rear, Front, Len - 1}};
  out({pqueue, [{Key, Bucket} | Others]}) ->
      {Result, Bucket1} = out(Bucket),
      Remaining =
          case {is_empty(Bucket1), Others} of
              {false, _} -> {pqueue, [{Key, Bucket1} | Others]};
              {true, []} -> {queue, [], [], 0};
              {true, [{0, Plain}]} -> Plain;
              {true, [_ | _]} -> {pqueue, Others}
          end,
      {Result, Remaining}.
  %% @doc Rotate the bucket list of a priority queue by moving its first
  %% bucket to the end. Plain queues are returned unchanged.
  -spec shift(pqueue()) -> pqueue().
  shift({queue, _, _, _} = Plain) ->
      Plain;
  shift({pqueue, []} = NoBuckets) ->
      %% Not expected to occur; returned as-is.
      NoBuckets;
  shift({pqueue, [First | Others]}) ->
      %% Appending is linear in the number of buckets, which is hoped to be small.
      {pqueue, lists:append(Others, [First])}.
  %% @doc Like out/1, but a returned item is tagged with its priority:
  %% `{{value, Item, Priority}, Queue1}'. Items of a plain queue have
  %% priority 0; otherwise the priority is that of the first bucket.
  -spec out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}.
  out_p({queue, _, _, _} = Plain) ->
      add_p(out(Plain), 0);
  out_p({pqueue, [{Key, _Bucket} | _Others]} = PQueue) ->
      Result = out(PQueue),
      add_p(Result, maybe_negate_priority(Key)).
  %% @doc Remove the oldest item stored at exactly the given priority.
  %%
  %% Returns `{{value, Item}, Queue1}', or `{empty, Queue}' when nothing is
  %% stored at that priority. A plain queue only holds priority-0 items, so
  %% asking it for any other priority raises `badarg'. As in out/1, a
  %% priority queue left with nothing, or with only a key-0 bucket,
  %% collapses back to a plain queue.
  -spec out(priority(), pqueue()) -> {empty | {value, any()}, pqueue()}.
  out(0, {queue, _, _, _} = Q) ->
      out(Q);
  out(Priority, {queue, _, _, _} = Q) ->
      %% Pass the complete argument list so the error is attributed to out/2.
      erlang:error(badarg, [Priority, Q]);
  out(Priority, {pqueue, Queues}) ->
      %% Buckets are keyed by the negated priority.
      P = maybe_negate_priority(Priority),
      case lists:keyfind(P, 1, Queues) of
          {_, Q} ->
              {R, Q1} = out(Q),
              Queues1 =
                  case is_empty(Q1) of
                      true -> lists:keydelete(P, 1, Queues);
                      false -> lists:keyreplace(P, 1, Queues, {P, Q1})
                  end,
              {R,
                  case Queues1 of
                      [] -> {queue, [], [], 0};
                      [{0, OnlyQ}] -> OnlyQ;
                      [_ | _] -> {pqueue, Queues1}
                  end};
          false ->
              {empty, {pqueue, Queues}}
      end.
  %% Attach priority P to the `{value, V}' part of an out/1 result; an
  %% `empty' result is passed through untouched.
  add_p({empty, Q}, _P) ->
      {empty, Q};
  add_p({{value, V}, Q}, P) ->
      {{value, V, P}, Q}.
  %% @doc Concatenate two queues; within any one priority the items of the
  %% first queue come before those of the second.
  %%
  %% Joining with an empty plain queue returns the other queue unchanged. A
  %% plain queue joined with a priority queue is merged into that queue's
  %% key-0 bucket (created if absent). Two priority queues are merged bucket
  %% by bucket via merge/3.
  -spec join(pqueue(), pqueue()) -> pqueue().
  join(Left, {queue, [], [], 0}) ->
      Left;
  join({queue, [], [], 0}, Right) ->
      Right;
  join({queue, LRear, LFront, LLen}, {queue, RRear, RFront, RLen}) ->
      %% Left's items (front, then reversed rear) go ahead of Right's front;
      %% Right's rear list becomes the rear of the result.
      {queue, RRear, LFront ++ lists:reverse(LRear, RFront), LLen + RLen};
  join({queue, _, _, _} = Plain, {pqueue, Buckets}) ->
      {pqueue, join_at_zero(Plain, Buckets, fun(Zero) -> join(Plain, Zero) end)};
  join({pqueue, Buckets}, {queue, _, _, _} = Plain) ->
      {pqueue, join_at_zero(Plain, Buckets, fun(Zero) -> join(Zero, Plain) end)};
  join({pqueue, LBuckets}, {pqueue, RBuckets}) ->
      {pqueue, merge(LBuckets, RBuckets, [])}.

  %% Place Plain at key 0 of Buckets. The leading buckets whose key is
  %% negative or `infinity' are kept ahead of it; an existing key-0 bucket is
  %% replaced by Combine(ExistingBucket), otherwise Plain becomes a new key-0
  %% bucket in front of whatever follows.
  join_at_zero(Plain, Buckets, Combine) ->
      IsAboveZero = fun({Key, _}) -> Key < 0 orelse Key == infinity end,
      {Above, FromZero} = lists:splitwith(IsAboveZero, Buckets),
      case FromZero of
          [{0, Zero} | Below] -> Above ++ [{0, Combine(Zero)} | Below];
          _ -> Above ++ [{0, Plain} | FromZero]
      end.
  %% Merge two bucket lists into one, accumulating the result in reverse in
  %% Acc. Buckets with the same key are combined with join/2 (A's items
  %% first). Otherwise the bucket that sorts first is emitted: `infinity'
  %% ahead of every integer key, then integer keys in ascending order.
  merge([], BPQ, Acc) ->
      lists:reverse(Acc, BPQ);
  merge(APQ, [], Acc) ->
      lists:reverse(Acc, APQ);
  merge([{P, A} | As], [{P, B} | Bs], Acc) ->
      merge(As, Bs, [{P, join(A, B)} | Acc]);
  merge([{PA, A} | As], [{PB, _} | _] = Bs, Acc) when
      PA =:= infinity; PB =/= infinity, PA < PB
  ->
      %% `PA < PB' alone is not enough: numbers sort before atoms in Erlang
      %% term order, so an integer PA would otherwise be placed ahead of an
      %% `infinity' bucket in B.
      merge(As, Bs, [{PA, A} | Acc]);
  merge([{_, _} | _] = As, [{PB, B} | Bs], Acc) ->
      merge(As, Bs, [{PB, B} | Acc]).
  %% @doc Build a new queue holding only the items for which Pred returns
  %% true. Items are visited in fold/3 order and re-inserted at their
  %% original priority.
  -spec filter(fun((any()) -> boolean()), pqueue()) -> pqueue().
  filter(Pred, Q) ->
      KeepIfMatching =
          fun(Item, Priority, Kept) ->
              case Pred(Item) of
                  true -> in(Item, Priority, Kept);
                  false -> Kept
              end
          end,
      fold(KeepIfMatching, new(), Q).
  %% @doc Fold over the queue in removal order: items are taken one at a
  %% time with out_p/1 and `Fun(Item, Priority, Acc)' is applied to each
  %% until the queue is empty.
  -spec fold(fun((any(), priority(), A) -> A), A, pqueue()) -> A.
  fold(Fun, Init, Q) ->
      fold_next(out_p(Q), Fun, Init).

  %% Consume one out_p/1 result: stop on `empty', otherwise apply Fun and
  %% continue with the remaining queue.
  fold_next({empty, _Drained}, _Fun, Acc) ->
      Acc;
  fold_next({{value, Item, Priority}, Rest}, Fun, Acc) ->
      fold(Fun, Fun(Item, Priority, Acc), Rest).
  %% @doc Priority of the item that out/1 would return next: `empty' for
  %% the empty queue, 0 for any other plain queue, and the priority of the
  %% first bucket for a priority queue.
  -spec highest(pqueue()) -> priority() | 'empty'.
  highest({queue, [], [], 0}) ->
      empty;
  highest({queue, _Rear, _Front, _Len}) ->
      0;
  highest({pqueue, [{Key, _Bucket} | _Others]}) ->
      maybe_negate_priority(Key).
  %% Rebuild a plain queue of length Len from a rear (newest-first) list
  %% alone. With three or more items the two newest stay in the rear list
  %% and the rest are reversed into the front list.
  r2f([], 0) ->
      {queue, [], [], 0};
  r2f([_] = Rear, 1) ->
      {queue, [], Rear, 1};
  r2f([Newer, Older], 2) ->
      {queue, [Newer], [Older], 2};
  r2f([Newest, Next | Older], Len) ->
      {queue, [Newest, Next], lists:reverse(Older), Len}.
  %% Convert between a caller-facing priority and its bucket key: `infinity'
  %% maps to itself, any other value is arithmetically negated. Applying the
  %% function twice gives back the original value.
  maybe_negate_priority(P) when P =:= infinity ->
      infinity;
  maybe_negate_priority(P) ->
      -P.