Explorar o código

feat(mqueue): Interleave messages with different priorities

k32 %!s(int64=4) %!d(string=hai) anos
pai
achega
4eacaa29bd
Modificáronse 3 ficheiros con 168 adicións e 7 borrados
  1. 55 4
      src/emqx_mqueue.erl
  2. 9 1
      src/emqx_pqueue.erl
  3. 104 2
      test/emqx_mqueue_SUITE.erl

+ 55 - 4
src/emqx_mqueue.erl

@@ -91,6 +91,11 @@
 -define(MAX_LEN_INFINITY, 0).
 -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
 
+-record(shift_opts, {
+          multiplier :: non_neg_integer(),
+          base       :: integer()
+         }).
+
 -record(mqueue, {
           store_qos0 = false              :: boolean(),
           max_len    = ?MAX_LEN_INFINITY  :: count(),
@@ -98,7 +103,10 @@
           dropped    = 0                  :: count(),
           p_table    = ?NO_PRIORITY_TABLE :: p_table(),
           default_p  = ?LOWEST_PRIORITY   :: priority(),
-          q          = ?PQUEUE:new()      :: pq()
+          q          = ?PQUEUE:new()      :: pq(),
+          shift_opts                      :: #shift_opts{},
+          last_p                          :: non_neg_integer() | undefined,
+          counter                         :: non_neg_integer() | undefined
          }).
 
 -type(mqueue() :: #mqueue{}).
@@ -112,7 +120,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
     #mqueue{max_len = MaxLen,
             store_qos0 = QoS_0,
             p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
-            default_p = get_priority_opt(Opts)
+            default_p = get_priority_opt(Opts),
+            shift_opts = get_shift_opt(Opts)
            }.
 
 -spec(info(mqueue()) -> emqx_types:infos()).
@@ -171,9 +180,25 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
 out(MQ = #mqueue{len = 0, q = Q}) ->
     0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
     {empty, MQ};
-out(MQ = #mqueue{q = Q, len = Len}) ->
+out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
+    {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
+    MQ1 = MQ#mqueue{
+            q = Q1,
+            len = Len - 1,
+            last_p = Prio,
+            counter = init_counter(Prio, ShiftOpts)
+           },
+    {{value, Val}, MQ1};
+out(MQ = #mqueue{q = Q, counter = 0}) ->
+    MQ1 = MQ#mqueue{
+            q      = ?PQUEUE:shift(Q),
+            last_p = undefined
+           },
+    out(MQ1);
+out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
+    ct:pal("Cnt ~p", [Cnt]),
     {R, Q1} = ?PQUEUE:out(Q),
-    {R, MQ#mqueue{q = Q1, len = Len - 1}}.
+    {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}.
 
 get_opt(Key, Opts, Default) ->
     case maps:get(Key, Opts, Default) of
@@ -194,3 +219,29 @@ get_priority_opt(Opts) ->
 %% while the highest 'infinity' is a [{infinity, queue:queue()}]
 get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
 get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
+
+init_counter(?HIGHEST_PRIORITY, Opts) ->
+    Infinity = 1000000,
+    init_counter(Infinity, Opts);
+init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
+    (Prio + Base) * Mult.
+
+get_shift_opt(Opts) ->
+    Mult = maps:get(shift_multiplier, Opts, 10),
+    Min = case Opts of
+              #{p_table := PTab} ->
+                  case maps:size(PTab) of
+                      0 -> 0;
+                      _ -> lists:min(maps:values(PTab))
+                  end;
+              _ ->
+                  ?LOWEST_PRIORITY
+          end,
+    Base = case Min < 0 of
+               true  -> -Min;
+               false -> 0
+           end,
+    #shift_opts{
+       multiplier = Mult,
+       base       = Base
+      }.

+ 9 - 1
src/emqx_pqueue.erl

@@ -55,6 +55,7 @@
         , filter/2
         , fold/3
         , highest/1
+        , shift/1
         ]).
 
 -export_type([q/0]).
@@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
            end,
     {R, NewQ}.
 
+-spec(shift(pqueue()) -> pqueue()).
+shift(Q = {queue, _, _, _}) ->
+    Q;
+shift({pqueue, []}) ->
+    {pqueue, []}; %% Shouldn't happen?
+shift({pqueue, [Hd|Rest]}) ->
+    {pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
+
 -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
 out_p({queue, _, _, _}       = Q) -> add_p(out(Q), 0);
 out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
@@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
 
 maybe_negate_priority(infinity) -> infinity;
 maybe_negate_priority(P)        -> -P.
-

+ 104 - 2
test/emqx_mqueue_SUITE.erl

@@ -22,6 +22,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
+-include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 -define(Q, emqx_mqueue).
@@ -121,8 +122,55 @@ t_priority_mqueue(_) ->
     {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
     ?assertEqual(5, ?Q:len(Q6)),
     {{value, Msg}, Q7} = ?Q:out(Q6),
-    ?assertEqual(4, ?Q:len(Q7)),
-    ?assertEqual(<<"t3">>, Msg#message.topic).
+    ?assertEqual(4, ?Q:len(Q7)).
+
+t_priority_mqueue_conservation(_) ->
+    true = proper:quickcheck(conservation_prop()).
+
+t_priority_order(_) ->
+    Opts = #{max_len => 5,
+             shift_multiplier => 1,
+             priorities =>
+                 #{<<"t1">> => 0,
+                   <<"t2">> => 1,
+                   <<"t3">> => 2
+                  },
+            store_qos0 => false
+            },
+    Messages = [{Topic, Message} ||
+                   Topic <- [<<"t1">>, <<"t2">>, <<"t3">>],
+                   Message <- lists:seq(1, 10)],
+    Q = lists:foldl(fun({Topic, Message}, Q) ->
+                            element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
+                    end,
+                    ?Q:init(Opts),
+                    Messages),
+    ?assertMatch([{<<"t3">>, 6},
+                  {<<"t3">>, 7},
+                  {<<"t3">>, 8},
+
+                  {<<"t2">>, 6},
+                  {<<"t2">>, 7},
+
+                  {<<"t1">>, 6},
+
+                  {<<"t3">>, 9},
+                  {<<"t3">>, 10},
+
+                  {<<"t2">>, 8},
+
+                  %% Note: for performance reasons we don't reset the
+                  %% counter when we run out of messages with the
+                  %% current prio, so next is t1:
+                  {<<"t1">>, 7},
+
+                  {<<"t2">>, 9},
+                  {<<"t2">>, 10},
+
+                  {<<"t1">>, 8},
+                  {<<"t1">>, 9},
+                  {<<"t1">>, 10}
+                 ], drain(Q)).
 
 t_infinity_priority_mqueue(_) ->
     Opts = #{max_len => 0,
@@ -163,3 +211,57 @@ t_dropped(_) ->
     {Msg, Q2} = ?Q:in(Msg, Q1),
     ?assertEqual(1, ?Q:dropped(Q2)).
 
+conservation_prop() ->
+    ?FORALL({Priorities, Messages},
+            ?LET(Priorities, topic_priorities(),
+                 {Priorities, messages(Priorities)}),
+            try
+                Opts = #{max_len => 0,
+                         priorities => maps:from_list(Priorities),
+                         store_qos0 => false},
+                %% Put messages in
+                Q1 = lists:foldl(fun({Topic, Message}, Q) ->
+                                         element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
+                                 end,
+                                 ?Q:init(Opts),
+                                 Messages),
+                %% Collect messages
+                Got = lists:sort(drain(Q1)),
+                Expected = lists:sort(Messages),
+                case Expected =:= Got of
+                    true ->
+                        true;
+                    false ->
+                        ct:pal("Mismatch: expected ~p~nGot ~p~n", [Expected, Got]),
+                        false
+                end
+            catch
+                EC:Err:Stack ->
+                    ct:pal("Error: ~p", [{EC, Err, Stack}]),
+                    false
+            end).
+
+%% Proper generators:
+
+topic(Priorities) ->
+    {Topics, _} = lists:unzip(Priorities),
+    oneof(Topics).
+
+topic_priorities() ->
+    non_empty(list({binary(), priority()})).
+
+priority() ->
+    oneof([integer(), infinity]).
+
+messages(Topics) ->
+    list({topic(Topics), binary()}).
+
+%% Internal functions:
+
+drain(Q) ->
+    case ?Q:out(Q) of
+        {empty, _} ->
+            [];
+        {{value, #message{topic = T, payload = P}}, Q1} ->
+            [{T, P}|drain(Q1)]
+    end.