Просмотр исходного кода

feat(sessds): Specialize the interval queue for positive numbers

ieQu1 2 лет назад
Родитель
Сommit
5f85105801
1 измененных файлов с 24 добавлено и 25 удалено
  1. 24 25
      apps/emqx/src/emqx_persistent_session_ds_inflight.erl

+ 24 - 25
apps/emqx/src/emqx_persistent_session_ds_inflight.erl

@@ -211,17 +211,17 @@ pubrec(SeqNo, Rec = #inflight{pubrec_queue = Q0}) ->
 %%%% Interval queue:
 
 %% "Interval queue": a data structure that represents a queue of
-%% monotonically increasing integers in a compact manner. It is
-%% functionally equivalent to a `queue:queue(integer())'.
+%% monotonically increasing non-negative integers in a compact manner.
+%% It is functionally equivalent to a `queue:queue(integer())'.
 -record(iqueue, {
     %% Head interval:
-    head :: integer() | undefined,
-    head_end :: integer() | undefined,
+    head = 0 :: integer(),
+    head_end = 0 :: integer(),
     %% Intermediate ranges:
     queue :: queue:queue({integer(), integer()}),
     %% End interval:
-    tail :: integer() | undefined,
-    tail_end :: integer() | undefined
+    tail = 0 :: integer(),
+    tail_end = 0 :: integer()
 }).
 
 -type iqueue() :: #iqueue{}.
@@ -233,17 +233,20 @@ iqueue_new() ->
 
 %% @doc Push a value into the interval queue:
 -spec ipush(integer(), iqueue()) -> iqueue().
-ipush(Val, Q = #iqueue{tail = undefined, tail_end = undefined}) ->
+ipush(Val, Q = #iqueue{tail_end = Val, head_end = Val}) ->
+    %% Optimization: head and tail intervals overlap, and the newly
+    %% inserted value extends both. Attach it to both intervals, to
+    %% avoid `queue:out' in `ipop':
     Q#iqueue{
-        tail = Val,
-        tail_end = Val + 1
+        tail_end = Val + 1,
+        head_end = Val + 1
     };
 ipush(Val, Q = #iqueue{tail_end = Val}) ->
     %% Extend tail interval:
     Q#iqueue{
         tail_end = Val + 1
     };
-ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when Val > End ->
+ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when is_number(Val), Val > End ->
     IQ = queue:in({Tl, End}, IQ0),
     %% Begin a new interval:
     Q#iqueue{
@@ -253,28 +256,24 @@ ipush(Val, Q = #iqueue{tail = Tl, tail_end = End, queue = IQ0}) when Val > End -
     }.
 
 -spec ipop(iqueue()) -> {{value, integer()}, iqueue()} | {empty, iqueue()}.
-ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when is_number(HdEnd), Hd < HdEnd ->
+ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when Hd < HdEnd ->
+    %% Head interval is not empty. Consume a value from it:
     {{value, Hd}, Q#iqueue{head = Hd + 1}};
+ipop(Q = #iqueue{head_end = End, tail_end = End}) ->
+    %% Head interval is fully consumed, and it's overlaps with the
+    %% tail interval. It means the queue is empty:
+    {empty, Q};
 ipop(Q = #iqueue{head = Hd0, tail = Tl, tail_end = TlEnd, queue = IQ0}) ->
+    %% Head interval is fully consumed, and it doesn't overlap with
+    %% the tail interval. Replace the head interval with the next
+    %% interval from the queue or with the tail interval:
     case queue:out(IQ0) of
         {{value, {Hd, HdEnd}}, IQ} ->
-            ipop(Q#iqueue{head = nmax(Hd0, Hd), head_end = HdEnd, queue = IQ});
+            ipop(Q#iqueue{head = max(Hd0, Hd), head_end = HdEnd, queue = IQ});
         {empty, _} ->
-            do_ipop(Q#iqueue{head = nmax(Hd0, Tl), head_end = TlEnd})
+            ipop(Q#iqueue{head = max(Hd0, Tl), head_end = TlEnd})
     end.
 
-do_ipop(Q = #iqueue{head = Hd, head_end = HdEnd}) when is_number(HdEnd), Hd < HdEnd ->
-    {{value, Hd}, Q#iqueue{head = Hd + 1}};
-do_ipop(Q) ->
-    {empty, Q}.
-
-nmax(undefined, N) ->
-    N;
-nmax(N, undefined) ->
-    N;
-nmax(N, M) ->
-    max(N, M).
-
 -ifdef(TEST).
 
 %% Test that behavior of iqueue is identical to that of a regular queue of integers: