priority_queue.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. %% The contents of this file are subject to the Mozilla Public License
  2. %% Version 1.1 (the "License"); you may not use this file except in
  3. %% compliance with the License. You may obtain a copy of the License
  4. %% at http://www.mozilla.org/MPL/
  5. %%
  6. %% Software distributed under the License is distributed on an "AS IS"
  7. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  8. %% the License for the specific language governing rights and
  9. %% limitations under the License.
  10. %%
  11. %% The Original Code is RabbitMQ.
  12. %%
  13. %% The Initial Developer of the Original Code is VMware, Inc.
  14. %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
  15. %%
  16. %% Priority queues have essentially the same interface as ordinary
  17. %% queues, except that a) there is an in/3 that takes a priority, and
  18. %% b) we have only implemented the core API we need.
  19. %%
  20. %% Priorities should be integers - the higher the value the higher the
  21. %% priority - but we don't actually check that.
  22. %%
  23. %% in/2 inserts items with priority 0.
  24. %%
  25. %% We optimise the case where a priority queue is being used just like
  26. %% an ordinary queue. When that is the case we represent the priority
  27. %% queue as an ordinary queue. We could just call into the 'queue'
  28. %% module for that, but for efficiency we implement the relevant
  29. %% functions directly in here, thus saving on inter-module calls and
  30. %% eliminating a level of boxing.
  31. %%
  32. %% When the queue contains items with non-zero priorities, it is
  33. %% represented as a sorted kv list with the inverted Priority as the
  34. %% key and an ordinary queue as the value. Here again we use our own
  35. %% ordinary queue implementation for efficiency, often making recursive
  36. %% calls into the same function knowing that ordinary queues represent
  37. %% a base case.
  38. -module(priority_queue).
  39. -export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
  40. out/1, join/2]).
  41. %%----------------------------------------------------------------------------
  42. -ifdef(use_specs).
  43. -export_type([q/0]).
  44. -type(q() :: pqueue()).
  45. -type(priority() :: integer() | 'infinity').
  46. -type(squeue() :: {queue, [any()], [any()]}).
  47. -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
  48. -spec(new/0 :: () -> pqueue()).
  49. -spec(is_queue/1 :: (any()) -> boolean()).
  50. -spec(is_empty/1 :: (pqueue()) -> boolean()).
  51. -spec(len/1 :: (pqueue()) -> non_neg_integer()).
  52. -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
  53. -spec(in/2 :: (any(), pqueue()) -> pqueue()).
  54. -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
  55. -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
  56. -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
  57. -endif.
  58. %%----------------------------------------------------------------------------
  59. new() ->
  60. {queue, [], []}.
  61. is_queue({queue, R, F}) when is_list(R), is_list(F) ->
  62. true;
  63. is_queue({pqueue, Queues}) when is_list(Queues) ->
  64. lists:all(fun ({infinity, Q}) -> is_queue(Q);
  65. ({P, Q}) -> is_integer(P) andalso is_queue(Q)
  66. end, Queues);
  67. is_queue(_) ->
  68. false.
  69. is_empty({queue, [], []}) ->
  70. true;
  71. is_empty(_) ->
  72. false.
  73. len({queue, R, F}) when is_list(R), is_list(F) ->
  74. length(R) + length(F);
  75. len({pqueue, Queues}) ->
  76. lists:sum([len(Q) || {_, Q} <- Queues]).
  77. to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
  78. [{0, V} || V <- Out ++ lists:reverse(In, [])];
  79. to_list({pqueue, Queues}) ->
  80. [{maybe_negate_priority(P), V} || {P, Q} <- Queues,
  81. {0, V} <- to_list(Q)].
  82. in(Item, Q) ->
  83. in(Item, 0, Q).
  84. in(X, 0, {queue, [_] = In, []}) ->
  85. {queue, [X], In};
  86. in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
  87. {queue, [X|In], Out};
  88. in(X, Priority, _Q = {queue, [], []}) ->
  89. in(X, Priority, {pqueue, []});
  90. in(X, Priority, Q = {queue, _, _}) ->
  91. in(X, Priority, {pqueue, [{0, Q}]});
  92. in(X, Priority, {pqueue, Queues}) ->
  93. P = maybe_negate_priority(Priority),
  94. {pqueue, case lists:keysearch(P, 1, Queues) of
  95. {value, {_, Q}} ->
  96. lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
  97. false when P == infinity ->
  98. [{P, {queue, [X], []}} | Queues];
  99. false ->
  100. case Queues of
  101. [{infinity, InfQueue} | Queues1] ->
  102. [{infinity, InfQueue} |
  103. lists:keysort(1, [{P, {queue, [X], []}} | Queues1])];
  104. _ ->
  105. lists:keysort(1, [{P, {queue, [X], []}} | Queues])
  106. end
  107. end}.
  108. out({queue, [], []} = Q) ->
  109. {empty, Q};
  110. out({queue, [V], []}) ->
  111. {{value, V}, {queue, [], []}};
  112. out({queue, [Y|In], []}) ->
  113. [V|Out] = lists:reverse(In, []),
  114. {{value, V}, {queue, [Y], Out}};
  115. out({queue, In, [V]}) when is_list(In) ->
  116. {{value,V}, r2f(In)};
  117. out({queue, In,[V|Out]}) when is_list(In) ->
  118. {{value, V}, {queue, In, Out}};
  119. out({pqueue, [{P, Q} | Queues]}) ->
  120. {R, Q1} = out(Q),
  121. NewQ = case is_empty(Q1) of
  122. true -> case Queues of
  123. [] -> {queue, [], []};
  124. [{0, OnlyQ}] -> OnlyQ;
  125. [_|_] -> {pqueue, Queues}
  126. end;
  127. false -> {pqueue, [{P, Q1} | Queues]}
  128. end,
  129. {R, NewQ}.
  130. join(A, {queue, [], []}) ->
  131. A;
  132. join({queue, [], []}, B) ->
  133. B;
  134. join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
  135. {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
  136. join(A = {queue, _, _}, {pqueue, BPQ}) ->
  137. {Pre, Post} =
  138. lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
  139. Post1 = case Post of
  140. [] -> [ {0, A} ];
  141. [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
  142. _ -> [ {0, A} | Post ]
  143. end,
  144. {pqueue, Pre ++ Post1};
  145. join({pqueue, APQ}, B = {queue, _, _}) ->
  146. {Pre, Post} =
  147. lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
  148. Post1 = case Post of
  149. [] -> [ {0, B} ];
  150. [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
  151. _ -> [ {0, B} | Post ]
  152. end,
  153. {pqueue, Pre ++ Post1};
  154. join({pqueue, APQ}, {pqueue, BPQ}) ->
  155. {pqueue, merge(APQ, BPQ, [])}.
  156. merge([], BPQ, Acc) ->
  157. lists:reverse(Acc, BPQ);
  158. merge(APQ, [], Acc) ->
  159. lists:reverse(Acc, APQ);
  160. merge([{P, A}|As], [{P, B}|Bs], Acc) ->
  161. merge(As, Bs, [ {P, join(A, B)} | Acc ]);
  162. merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
  163. merge(As, Bs, [ {PA, A} | Acc ]);
  164. merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
  165. merge(As, Bs, [ {PB, B} | Acc ]).
  166. r2f([]) -> {queue, [], []};
  167. r2f([_] = R) -> {queue, [], R};
  168. r2f([X,Y]) -> {queue, [X], [Y]};
  169. r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
  170. maybe_negate_priority(infinity) -> infinity;
  171. maybe_negate_priority(P) -> -P.