priority_queue.erl 9.4 KB


  1. %% The contents of this file are subject to the Mozilla Public License
  2. %% Version 1.1 (the "License"); you may not use this file except in
  3. %% compliance with the License. You may obtain a copy of the License
  4. %% at http://www.mozilla.org/MPL/
  5. %%
  6. %% Software distributed under the License is distributed on an "AS IS"
  7. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  8. %% the License for the specific language governing rights and
  9. %% limitations under the License.
  10. %%
  11. %% The Original Code is RabbitMQ.
  12. %%
  13. %% The Initial Developer of the Original Code is GoPivotal, Inc.
  14. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
  15. %%
  16. %% Priority queues have essentially the same interface as ordinary
  17. %% queues, except that a) there is an in/3 that takes a priority, and
  18. %% b) we have only implemented the core API we need.
  19. %%
  20. %% Priorities should be integers - the higher the value the higher the
  21. %% priority - but we don't actually check that.
  22. %%
  23. %% in/2 inserts items with priority 0.
  24. %%
  25. %% We optimise the case where a priority queue is being used just like
  26. %% an ordinary queue. When that is the case we represent the priority
  27. %% queue as an ordinary queue. We could just call into the 'queue'
  28. %% module for that, but for efficiency we implement the relevant
  29. %% functions directly in here, thus saving on inter-module calls and
  30. %% eliminating a level of boxing.
  31. %%
  32. %% When the queue contains items with non-zero priorities, it is
  33. %% represented as a sorted kv list with the inverted Priority as the
  34. %% key and an ordinary queue as the value. Here again we use our own
  35. %% ordinary queue implementation for efficiency, often making recursive
  36. %% calls into the same function knowing that ordinary queues represent
  37. %% a base case.
  38. -module(priority_queue).
  39. -export([new/0, is_queue/1, is_empty/1, len/1, plen/2, to_list/1, from_list/1,
  40. in/2, in/3, out/1, out/2, out_p/1, join/2, filter/2, fold/3, highest/1]).
  41. %%----------------------------------------------------------------------------
  %% Type and function specifications. This whole section is only compiled
  %% when the use_specs macro is defined.
  -ifdef(use_specs).

  %% q() is the exported alias for the queue type.
  -type(q() :: pqueue()).
  %% A priority is an integer or the atom 'infinity'.
  -type(priority() :: integer() | 'infinity').
  %% A plain queue: two item lists plus the stored length.
  -type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
  %% A priority queue is either a plain queue or a key/value list of plain
  %% queues, one per priority level.
  -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).

  -export_type([q/0]).

  -spec(new/0 :: () -> pqueue()).
  -spec(is_queue/1 :: (any()) -> boolean()).
  -spec(is_empty/1 :: (pqueue()) -> boolean()).
  -spec(len/1 :: (pqueue()) -> non_neg_integer()).
  -spec(plen/2 :: (priority(), pqueue()) -> non_neg_integer()).
  -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
  -spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
  -spec(in/2 :: (any(), pqueue()) -> pqueue()).
  -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
  -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
  %% out/2 is exported as well, so it gets a spec like every other export.
  -spec(out/2 ::
          (priority(), pqueue()) -> {empty | {value, any()}, pqueue()}).
  -spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
  -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
  -spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
  -spec(fold/3 ::
          (fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
  -spec(highest/1 :: (pqueue()) -> priority() | 'empty').

  -endif.
  65. %%----------------------------------------------------------------------------
  %% Returns the empty queue: a plain queue tuple with two empty item lists
  %% and a stored length of 0.
  new() -> {queue, [], [], 0}.
  %% Returns true when the term has the shape of a priority queue, false
  %% otherwise.
  %%
  %% A plain queue is {queue, R, F, L} with two lists and an integer length.
  %% A pqueue is {pqueue, Levels} where every element of Levels is
  %% {infinity, Q} or {P, Q} with an integer P, and Q itself passes
  %% is_queue/1.
  is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) ->
      true;
  is_queue({pqueue, Queues}) when is_list(Queues) ->
      lists:all(fun ({infinity, Q}) -> is_queue(Q);
                    ({P, Q})        -> is_integer(P) andalso is_queue(Q);
                    %% Anything that is not a two-element tuple is not a
                    %% valid level; answer false instead of crashing with
                    %% function_clause.
                    (_)             -> false
                end, Queues);
  is_queue(_) ->
      false.
  %% True only for the empty plain queue {queue, [], [], 0}; every other
  %% term yields false.
  is_empty(Q) ->
      Q =:= {queue, [], [], 0}.
  %% Number of items in the queue.
  %%
  %% A plain queue carries its length in the fourth tuple element. For a
  %% pqueue the lengths of all per-priority queues are added up; list
  %% elements that are not two-element tuples are skipped.
  len({queue, _Rear, _Front, Length}) ->
      Length;
  len({pqueue, Levels}) ->
      lists:foldl(fun ({_Key, SubQ}, Total) -> Total + len(SubQ);
                      (_Other, Total)       -> Total
                  end, 0, Levels).
  %% Number of items stored at the given priority.
  %%
  %% A plain queue only holds priority-0 items, so any other priority has
  %% length 0. For a pqueue the level is looked up under the key produced by
  %% maybe_negate_priority/1; a missing level has length 0.
  plen(0, {queue, _Rear, _Front, Length}) ->
      Length;
  plen(_, {queue, _Rear, _Front, _Length}) ->
      0;
  plen(Priority, {pqueue, Levels}) ->
      Key = maybe_negate_priority(Priority),
      case lists:keyfind(Key, 1, Levels) of
          {_, SubQ} -> len(SubQ);
          false     -> 0
      end.
  %% Converts the queue into a list of {Priority, Item} pairs.
  %%
  %% Items of a plain queue are all reported with priority 0, front list
  %% first, followed by the reversed rear list. For a pqueue the levels are
  %% walked in stored order and each level's key is converted back with
  %% maybe_negate_priority/1.
  to_list({queue, Rear, Front, _Len}) when is_list(Rear), is_list(Front) ->
      Items = Front ++ lists:reverse(Rear),
      [{0, Item} || Item <- Items];
  to_list({pqueue, Levels}) ->
      [{maybe_negate_priority(Key), Item}
       || {Key, SubQ} <- Levels,
          {0, Item} <- to_list(SubQ)].
  %% Builds a queue from a list of {Priority, Item} pairs by inserting them,
  %% in list order, into an initially empty queue.
  from_list(L) ->
      Insert = fun ({Priority, Item}, Acc) -> in(Item, Priority, Acc) end,
      lists:foldl(Insert, new(), L).
  %% Inserts Item with priority 0; shorthand for in(Item, 0, Q).
  in(Item, Q) -> in(Item, 0, Q).
  %% Inserts X with the given priority and returns the new queue.
  %%
  %% Priority-0 inserts into a plain queue stay within the plain-queue
  %% representation. Any other priority first turns the plain queue into a
  %% pqueue (an empty one, or one whose only level is {0, Q}) and then
  %% inserts into that.
  in(X, 0, {queue, [_] = In, [], 1}) ->
      %% One item in the rear list and none in front: move the existing
      %% item to the front list and start a new rear list with X.
      {queue, [X], In, 2};
  in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
      {queue, [X | In], Out, Len + 1};
  in(X, Priority, {queue, [], [], 0}) ->
      in(X, Priority, {pqueue, []});
  in(X, Priority, {queue, _, _, _} = Q) ->
      in(X, Priority, {pqueue, [{0, Q}]});
  in(X, Priority, {pqueue, Levels}) ->
      Key = maybe_negate_priority(Priority),
      Levels1 =
          case lists:keyfind(Key, 1, Levels) of
              {_, SubQ} ->
                  %% The level already exists: append to its plain queue.
                  lists:keyreplace(Key, 1, Levels, {Key, in(X, SubQ)});
              false ->
                  add_level(Key, {queue, [X], [], 1}, Levels)
          end,
      {pqueue, Levels1}.

  %% Adds a new {Key, Queue} level to the level list. An 'infinity' level
  %% always goes to the head; an existing 'infinity' head is kept in place
  %% while the remaining levels are re-sorted by key.
  add_level(infinity, NewQ, Levels) ->
      [{infinity, NewQ} | Levels];
  add_level(Key, NewQ, [{infinity, InfQ} | Rest]) ->
      [{infinity, InfQ} | lists:keysort(1, [{Key, NewQ} | Rest])];
  add_level(Key, NewQ, Levels) ->
      lists:keysort(1, [{Key, NewQ} | Levels]).
  %% Removes the next item and returns {{value, Item}, NewQueue}, or
  %% {empty, Queue} when the queue is the empty plain queue.
  %%
  %% For a pqueue the item is taken from the first level in the list. When
  %% that level becomes empty it is dropped, and the result collapses back
  %% to a plain queue if nothing, or only a priority-0 level, remains.
  out({queue, [], [], 0} = Q) ->
      {empty, Q};
  out({queue, [V], [], 1}) ->
      {{value, V}, {queue, [], [], 0}};
  out({queue, [Newest | Rear], [], Len}) ->
      %% The front list is empty: reverse the rear list (minus its newest
      %% element) to obtain the oldest item and the new front list.
      [V | Front] = lists:reverse(Rear),
      {{value, V}, {queue, [Newest], Front, Len - 1}};
  out({queue, Rear, [V], Len}) when is_list(Rear) ->
      %% Taking the last front item: r2f/2 rebuilds the queue from the rear
      %% list.
      {{value, V}, r2f(Rear, Len - 1)};
  out({queue, Rear, [V | Front], Len}) when is_list(Rear) ->
      {{value, V}, {queue, Rear, Front, Len - 1}};
  out({pqueue, [{Key, SubQ} | Levels]}) ->
      case out(SubQ) of
          {Result, {queue, [], [], 0}} ->
              %% The first level is now empty and is dropped.
              Remaining = case Levels of
                              []             -> {queue, [], [], 0};
                              [{0, OnlyQ}]   -> OnlyQ;
                              [_ | _]        -> {pqueue, Levels}
                          end,
              {Result, Remaining};
          {Result, SubQ1} ->
              {Result, {pqueue, [{Key, SubQ1} | Levels]}}
      end.
  %% Like out/1, but a returned value also carries its priority:
  %% {{value, Item, Priority}, NewQueue}. Plain queues report priority 0;
  %% a pqueue reports the priority of its first level.
  out_p({queue, _, _, _} = Q) ->
      add_p(out(Q), 0);
  out_p({pqueue, [{Key, _} | _]} = Q) ->
      Result = out(Q),
      add_p(Result, maybe_negate_priority(Key)).
  %% Removes the next item stored at the given priority.
  %%
  %% Returns {{value, Item}, NewQueue}, or {empty, Queue} when a pqueue has
  %% no level for that priority. On a plain queue only priority 0 is
  %% accepted (it behaves like out/1); any other priority raises
  %% error:badarg.
  out(0, {queue, _, _, _} = Q) ->
      out(Q);
  out(Priority, {queue, _, _, _} = Q) ->
      %% Pass the complete argument list so the stack trace reports this
      %% function with its real arity (out/2).
      erlang:error(badarg, [Priority, Q]);
  out(Priority, {pqueue, Queues}) ->
      P = maybe_negate_priority(Priority),
      case lists:keyfind(P, 1, Queues) of
          {_, Q} ->
              {R, Q1} = out(Q),
              %% Drop the level once its queue is empty, otherwise store
              %% the shortened queue back under the same key.
              Queues1 = case is_empty(Q1) of
                            true  -> lists:keydelete(P, 1, Queues);
                            false -> lists:keyreplace(P, 1, Queues, {P, Q1})
                        end,
              %% Collapse back to a plain queue when nothing, or only a
              %% priority-0 level, is left.
              {R, case Queues1 of
                      []           -> {queue, [], [], 0};
                      [{0, OnlyQ}] -> OnlyQ;
                      [_ | _]      -> {pqueue, Queues1}
                  end};
          false ->
              {empty, {pqueue, Queues}}
      end.
  %% Attaches priority P to the result of an out/1 call: {value, V} becomes
  %% {value, V, P}; an 'empty' result is returned unchanged.
  add_p({empty, Q}, _P) ->
      {empty, Q};
  add_p({{value, V}, Q}, P) ->
      {{value, V, P}, Q}.
  %% Joins two queues; within one priority level the items of A come before
  %% the items of B.
  %%
  %% Joining with the empty plain queue returns the other argument. Two
  %% plain queues are concatenated directly. A plain queue joined with a
  %% pqueue is merged into that pqueue's priority-0 level (created if
  %% missing). Two pqueues are combined level by level with merge/3.
  join(A, {queue, [], [], 0}) ->
      A;
  join({queue, [], [], 0}, B) ->
      B;
  join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) ->
      %% New front list: A's front, then A's rear reversed, then B's front.
      Front = AOut ++ lists:reverse(AIn, BOut),
      {queue, BIn, Front, ALen + BLen};
  join({queue, _, _, _} = A, {pqueue, BPQ}) ->
      {pqueue, join_plain(BPQ, A, fun (Zero) -> join(A, Zero) end)};
  join({pqueue, APQ}, {queue, _, _, _} = B) ->
      {pqueue, join_plain(APQ, B, fun (Zero) -> join(Zero, B) end)};
  join({pqueue, APQ}, {pqueue, BPQ}) ->
      {pqueue, merge(APQ, BPQ, [])}.

  %% Places the plain queue Plain into the priority-0 level of Levels.
  %% Levels whose key is negative or 'infinity' stay in front. If a level
  %% with key 0 follows, it is replaced by JoinZero applied to its queue;
  %% otherwise a new {0, Plain} level is inserted at that position.
  join_plain(Levels, Plain, JoinZero) ->
      IsBeforeZero = fun ({P, _}) -> P < 0 orelse P == infinity end,
      {Before, FromZero} = lists:splitwith(IsBeforeZero, Levels),
      Tail = case FromZero of
                 [{0, ZeroQueue} | Rest] -> [{0, JoinZero(ZeroQueue)} | Rest];
                 _                       -> [{0, Plain} | FromZero]
             end,
      Before ++ Tail.
  %% Merges the level lists of two pqueues into one list, accumulating the
  %% result in reverse in Acc.
  %%
  %% Both inputs hold {Key, Queue} pairs with an optional 'infinity' level
  %% first, followed by the remaining levels in ascending key order. Levels
  %% with the same key are combined with join/2 (first argument's items
  %% first).
  merge([], BPQ, Acc) ->
      lists:reverse(Acc, BPQ);
  merge(APQ, [], Acc) ->
      lists:reverse(Acc, APQ);
  merge([{P, A} | As], [{P, B} | Bs], Acc) ->
      merge(As, Bs, [{P, join(A, B)} | Acc]);
  merge([{PA, A} | As], Bs = [{PB, _} | _], Acc)
    when PA == infinity orelse (PB =/= infinity andalso PA < PB) ->
      %% A's level goes first when it is the 'infinity' level, or when both
      %% keys are ordinary and A's is smaller. The explicit PB =/= infinity
      %% test is required because numbers compare less than atoms in Erlang
      %% term order, so a bare PA < PB would put a numeric level ahead of
      %% B's 'infinity' level.
      merge(As, Bs, [{PA, A} | Acc]);
  merge(As = [{_, _} | _], [{PB, B} | Bs], Acc) ->
      merge(As, Bs, [{PB, B} | Acc]).
  %% Returns a queue holding only the items for which Pred returns true,
  %% each re-inserted with its original priority. The queue is traversed
  %% with fold/3 and rebuilt from an empty queue.
  filter(Pred, Q) ->
      Keep = fun (Item, Priority, Acc) ->
                     case Pred(Item) of
                         true  -> in(Item, Priority, Acc);
                         false -> Acc
                     end
             end,
      fold(Keep, new(), Q).
  %% Folds Fun over the queue in the order out_p/1 yields its items. Fun is
  %% called as Fun(Item, Priority, Acc) and returns the next accumulator;
  %% the final accumulator is returned once out_p/1 reports 'empty'.
  fold(Fun, Init, Q) ->
      fold_step(Fun, Init, out_p(Q)).

  %% Consumes one out_p/1 result: stop on 'empty', otherwise apply Fun and
  %% continue with the remaining queue.
  fold_step(_Fun, Acc, {empty, _Q}) ->
      Acc;
  fold_step(Fun, Acc, {{value, Item, Priority}, Rest}) ->
      fold_step(Fun, Fun(Item, Priority, Acc), out_p(Rest)).
  %% Returns the priority of the first level of the queue, or 'empty' for
  %% the empty plain queue. A non-empty plain queue only holds priority-0
  %% items; for a pqueue the first level's key is converted back with
  %% maybe_negate_priority/1.
  highest({queue, [], [], 0}) ->
      empty;
  highest({queue, _Rear, _Front, _Len}) ->
      0;
  highest({pqueue, [{Key, _SubQ} | _Rest]}) ->
      maybe_negate_priority(Key).
  %% Rebuilds a plain queue from a rear list (newest item first) and its
  %% length, for use when the front list has run out.
  %%
  %% With up to two items the oldest one is placed in the front list. With
  %% more, the two newest items stay in the rear list and the rest is
  %% reversed into the front list.
  r2f([], 0) ->
      {queue, [], [], 0};
  r2f([_] = Rear, 1) ->
      {queue, [], Rear, 1};
  r2f([Newest, Oldest], 2) ->
      {queue, [Newest], [Oldest], 2};
  r2f([Newest, Next | Older], Len) ->
      Front = lists:reverse(Older),
      {queue, [Newest, Next], Front, Len}.
  %% Converts between a priority and the key stored in a pqueue level list.
  %% Numeric priorities are negated; 'infinity' is left as it is. Applying
  %% the function twice gives back the original value.
  maybe_negate_priority(infinity) ->
      infinity;
  maybe_negate_priority(Priority) ->
      -Priority.