| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- %% The contents of this file are subject to the Mozilla Public License
- %% Version 1.1 (the "License"); you may not use this file except in
- %% compliance with the License. You may obtain a copy of the License
- %% at http://www.mozilla.org/MPL/
- %%
- %% Software distributed under the License is distributed on an "AS IS"
- %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- %% the License for the specific language governing rights and
- %% limitations under the License.
- %%
- %% The Original Code is RabbitMQ.
- %%
- %% The Initial Developer of the Original Code is GoPivotal, Inc.
- %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
- %%
- %% Priority queues have essentially the same interface as ordinary
- %% queues, except that a) there is an in/3 that takes a priority, and
- %% b) we have only implemented the core API we need.
- %%
- %% Priorities should be integers - the higher the value the higher the
- %% priority - but we don't actually check that.
- %%
- %% in/2 inserts items with priority 0.
- %%
- %% We optimise the case where a priority queue is being used just like
- %% an ordinary queue. When that is the case we represent the priority
- %% queue as an ordinary queue. We could just call into the 'queue'
- %% module for that, but for efficiency we implement the relevant
- %% functions directly in here, thus saving on inter-module calls and
- %% eliminating a level of boxing.
- %%
- %% When the queue contains items with non-zero priorities, it is
- %% represented as a sorted kv list with the inverted Priority as the
- %% key and an ordinary queue as the value. Here again we use our own
- %% ordinary queue implementation for efficiency, often making recursive
- %% calls into the same function knowing that ordinary queues represent
- %% a base case.
- -module(emqx_pqueue).
- -export([
- new/0,
- is_queue/1,
- is_empty/1,
- len/1,
- plen/2,
- to_list/1,
- from_list/1,
- in/2,
- in/3,
- out/1,
- out/2,
- out_p/1,
- join/2,
- filter/2,
- fold/3,
- highest/1,
- shift/1
- ]).
- -export_type([q/0]).
- %%----------------------------------------------------------------------------
- -type priority() :: integer() | 'infinity'.
- -type squeue() :: {queue, [any()], [any()], non_neg_integer()}.
- -type pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}.
- -type q() :: pqueue().
- %%----------------------------------------------------------------------------
- -spec new() -> pqueue().
- new() ->
- {queue, [], [], 0}.
- -spec is_queue(any()) -> boolean().
- is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) ->
- true;
- is_queue({pqueue, Queues}) when is_list(Queues) ->
- lists:all(
- fun
- ({infinity, Q}) -> is_queue(Q);
- ({P, Q}) -> is_integer(P) andalso is_queue(Q)
- end,
- Queues
- );
- is_queue(_) ->
- false.
- -spec is_empty(pqueue()) -> boolean().
- is_empty({queue, [], [], 0}) ->
- true;
- is_empty(_) ->
- false.
- -spec len(pqueue()) -> non_neg_integer().
- len({queue, _R, _F, L}) ->
- L;
- len({pqueue, Queues}) ->
- lists:sum([len(Q) || {_, Q} <- Queues]).
- -spec plen(priority(), pqueue()) -> non_neg_integer().
- plen(0, {queue, _R, _F, L}) ->
- L;
- plen(_, {queue, _R, _F, _}) ->
- 0;
- plen(P, {pqueue, Queues}) ->
- case lists:keysearch(maybe_negate_priority(P), 1, Queues) of
- {value, {_, Q}} -> len(Q);
- false -> 0
- end.
- -spec to_list(pqueue()) -> [{priority(), any()}].
- to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) ->
- [{0, V} || V <- Out ++ lists:reverse(In, [])];
- to_list({pqueue, Queues}) ->
- [
- {maybe_negate_priority(P), V}
- || {P, Q} <- Queues,
- {0, V} <- to_list(Q)
- ].
- -spec from_list([{priority(), any()}]) -> pqueue().
- from_list(L) ->
- lists:foldl(fun({P, E}, Q) -> in(E, P, Q) end, new(), L).
- -spec in(any(), pqueue()) -> pqueue().
- in(Item, Q) ->
- in(Item, 0, Q).
- -spec in(any(), priority(), pqueue()) -> pqueue().
- in(X, 0, {queue, [_] = In, [], 1}) ->
- {queue, [X], In, 2};
- in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
- {queue, [X | In], Out, Len + 1};
- in(X, Priority, _Q = {queue, [], [], 0}) ->
- in(X, Priority, {pqueue, []});
- in(X, Priority, Q = {queue, _, _, _}) ->
- in(X, Priority, {pqueue, [{0, Q}]});
- in(X, Priority, {pqueue, Queues}) ->
- P = maybe_negate_priority(Priority),
- {pqueue,
- case lists:keysearch(P, 1, Queues) of
- {value, {_, Q}} ->
- lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
- false when P == infinity ->
- [{P, {queue, [X], [], 1}} | Queues];
- false ->
- case Queues of
- [{infinity, InfQueue} | Queues1] ->
- [
- {infinity, InfQueue}
- | lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])
- ];
- _ ->
- lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues])
- end
- end}.
- -spec out(pqueue()) -> {empty | {value, any()}, pqueue()}.
- out({queue, [], [], 0} = Q) ->
- {empty, Q};
- out({queue, [V], [], 1}) ->
- {{value, V}, {queue, [], [], 0}};
- out({queue, [Y | In], [], Len}) ->
- [V | Out] = lists:reverse(In, []),
- {{value, V}, {queue, [Y], Out, Len - 1}};
- out({queue, In, [V], Len}) when is_list(In) ->
- {{value, V}, r2f(In, Len - 1)};
- out({queue, In, [V | Out], Len}) when is_list(In) ->
- {{value, V}, {queue, In, Out, Len - 1}};
- out({pqueue, [{P, Q} | Queues]}) ->
- {R, Q1} = out(Q),
- NewQ =
- case is_empty(Q1) of
- true ->
- case Queues of
- [] -> {queue, [], [], 0};
- [{0, OnlyQ}] -> OnlyQ;
- [_ | _] -> {pqueue, Queues}
- end;
- false ->
- {pqueue, [{P, Q1} | Queues]}
- end,
- {R, NewQ}.
- -spec shift(pqueue()) -> pqueue().
- shift(Q = {queue, _, _, _}) ->
- Q;
- shift({pqueue, []}) ->
- %% Shouldn't happen?
- {pqueue, []};
- shift({pqueue, [Hd | Rest]}) ->
- %% Let's hope there are not many priorities.
- {pqueue, Rest ++ [Hd]}.
- -spec out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}.
- out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
- out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
- out(0, {queue, _, _, _} = Q) ->
- out(Q);
- out(Priority, {queue, _, _, _}) ->
- erlang:error(badarg, [Priority]);
- out(Priority, {pqueue, Queues}) ->
- P = maybe_negate_priority(Priority),
- case lists:keysearch(P, 1, Queues) of
- {value, {_, Q}} ->
- {R, Q1} = out(Q),
- Queues1 =
- case is_empty(Q1) of
- true -> lists:keydelete(P, 1, Queues);
- false -> lists:keyreplace(P, 1, Queues, {P, Q1})
- end,
- {R,
- case Queues1 of
- [] -> {queue, [], [], 0};
- [{0, OnlyQ}] -> OnlyQ;
- [_ | _] -> {pqueue, Queues1}
- end};
- false ->
- {empty, {pqueue, Queues}}
- end.
- add_p(R, P) ->
- case R of
- {empty, Q} -> {empty, Q};
- {{value, V}, Q} -> {{value, V, P}, Q}
- end.
- -spec join(pqueue(), pqueue()) -> pqueue().
- join(A, {queue, [], [], 0}) ->
- A;
- join({queue, [], [], 0}, B) ->
- B;
- join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) ->
- {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen};
- join(A = {queue, _, _, _}, {pqueue, BPQ}) ->
- {Pre, Post} =
- lists:splitwith(fun({P, _}) -> P < 0 orelse P == infinity end, BPQ),
- Post1 =
- case Post of
- [] -> [{0, A}];
- [{0, ZeroQueue} | Rest] -> [{0, join(A, ZeroQueue)} | Rest];
- _ -> [{0, A} | Post]
- end,
- {pqueue, Pre ++ Post1};
- join({pqueue, APQ}, B = {queue, _, _, _}) ->
- {Pre, Post} =
- lists:splitwith(fun({P, _}) -> P < 0 orelse P == infinity end, APQ),
- Post1 =
- case Post of
- [] -> [{0, B}];
- [{0, ZeroQueue} | Rest] -> [{0, join(ZeroQueue, B)} | Rest];
- _ -> [{0, B} | Post]
- end,
- {pqueue, Pre ++ Post1};
- join({pqueue, APQ}, {pqueue, BPQ}) ->
- {pqueue, merge(APQ, BPQ, [])}.
- merge([], BPQ, Acc) ->
- lists:reverse(Acc, BPQ);
- merge(APQ, [], Acc) ->
- lists:reverse(Acc, APQ);
- merge([{P, A} | As], [{P, B} | Bs], Acc) ->
- merge(As, Bs, [{P, join(A, B)} | Acc]);
- merge([{PA, A} | As], Bs = [{PB, _} | _], Acc) when PA < PB orelse PA == infinity ->
- merge(As, Bs, [{PA, A} | Acc]);
- merge(As = [{_, _} | _], [{PB, B} | Bs], Acc) ->
- merge(As, Bs, [{PB, B} | Acc]).
- -spec filter(fun((any()) -> boolean()), pqueue()) -> pqueue().
- filter(Pred, Q) ->
- fold(
- fun(V, P, Acc) ->
- case Pred(V) of
- true -> in(V, P, Acc);
- false -> Acc
- end
- end,
- new(),
- Q
- ).
- -spec fold(fun((any(), priority(), A) -> A), A, pqueue()) -> A.
- fold(Fun, Init, Q) ->
- case out_p(Q) of
- {empty, _Q} -> Init;
- {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
- end.
- -spec highest(pqueue()) -> priority() | 'empty'.
- highest({queue, [], [], 0}) -> empty;
- highest({queue, _, _, _}) -> 0;
- highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
- r2f([], 0) -> {queue, [], [], 0};
- r2f([_] = R, 1) -> {queue, [], R, 1};
- r2f([X, Y], 2) -> {queue, [X], [Y], 2};
- r2f([X, Y | R], L) -> {queue, [X, Y], lists:reverse(R, []), L}.
- maybe_negate_priority(infinity) -> infinity;
- maybe_negate_priority(P) -> -P.
|