Prechádzať zdrojové kódy

chore(mqueue): Implement live upgrade

k32 4 rokov pred
rodič
commit
5fc1036cf7
2 zmenil súbory, kde vykonal 69 pridanie a 8 odobranie
  1. 42 8
      src/emqx_mqueue.erl
  2. 27 0
      test/emqx_mqueue_SUITE.erl

+ 42 - 8
src/emqx_mqueue.erl

@@ -67,6 +67,9 @@
         , dropped/1
         ]).
 
+-export([ live_upgrade/1
+        ]).
+
 -export_type([mqueue/0, options/0]).
 
 -type(topic() :: emqx_topic:topic()).
@@ -111,6 +114,8 @@
 
 -type(mqueue() :: #mqueue{}).
 
+-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
+
 -spec(init(options()) -> mqueue()).
 init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
     MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
@@ -136,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
 info(len, #mqueue{len = Len}) ->
     Len;
 info(dropped, #mqueue{dropped = Dropped}) ->
-    Dropped.
+    Dropped;
+info(Info, ?OLD(MQ)) ->
+    info(Info, live_upgrade(MQ)).
 
-is_empty(#mqueue{len = Len}) -> Len =:= 0.
+is_empty(#mqueue{len = Len}) -> Len =:= 0;
+is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
 
-len(#mqueue{len = Len}) -> Len.
+len(#mqueue{len = Len}) -> Len;
+len(?OLD(MQ)) -> len(live_upgrade(MQ)).
 
-max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
+max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
+max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
 
 %% @doc Return number of dropped messages.
 -spec(dropped(mqueue()) -> count()).
-dropped(#mqueue{dropped = Dropped}) -> Dropped.
+dropped(#mqueue{dropped = Dropped}) -> Dropped;
+dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
 
 %% @doc Stats of the mqueue
 -spec(stats(mqueue()) -> [stat()]).
 stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
-    [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
+    [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
+stats(?OLD(MQ)) ->
+    stats(live_upgrade(MQ)).
 
 %% @doc Enqueue a message.
 -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
@@ -174,7 +187,9 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
             {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
         false ->
             {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
-    end.
+    end;
+in(Msg, ?OLD(MQ)) ->
+    in(Msg, live_upgrade(MQ)).
 
 -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
 out(MQ = #mqueue{len = 0, q = Q}) ->
@@ -198,7 +213,9 @@ out(MQ = #mqueue{q = Q, counter = 0}) ->
 out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
     ct:pal("Cnt ~p", [Cnt]),
     {R, Q1} = ?PQUEUE:out(Q),
-    {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}.
+    {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
+out(?OLD(MQ)) ->
+    out(live_upgrade(MQ)).
 
 get_opt(Key, Opts, Default) ->
     case maps:get(Key, Opts, Default) of
@@ -245,3 +262,20 @@ get_shift_opt(Opts) ->
        multiplier = Mult,
        base       = Base
       }.
+
+live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
+    ShiftOpts = case is_map(PTable) of
+                    true  -> get_shift_opt(#{p_table => PTable});
+                    false -> get_shift_opt(#{})
+                end,
+    #mqueue{ store_qos0 = StoreQos0
+           , max_len    = MaxLen
+           , dropped    = Dropped
+           , p_table    = PTable
+           , default_p  = DefaultP
+           , len        = Len
+           , q          = Q
+           , shift_opts = ShiftOpts
+           , last_p     = undefined
+           , counter    = undefined
+           }.

+ 27 - 0
test/emqx_mqueue_SUITE.erl

@@ -211,6 +211,33 @@ t_dropped(_) ->
     {Msg, Q2} = ?Q:in(Msg, Q1),
     ?assertEqual(1, ?Q:dropped(Q2)).
 
+t_live_upgrade(_) ->
+    Q = {mqueue,true,1,0,0,none,0,
+                   {queue,[],[],0}},
+    ?assertMatch(#{}, ?Q:info(Q)),
+    ?assertMatch(true, ?Q:is_empty(Q)),
+    ?assertMatch(0, ?Q:len(Q)),
+    ?assertMatch(1, ?Q:max_len(Q)),
+    ?assertMatch({undefined, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)),
+    ?assertMatch({empty, _}, ?Q:out(Q)),
+    ?assertMatch([_|_], ?Q:stats(Q)),
+    ?assertMatch(0, ?Q:dropped(Q)).
+
+
+t_live_upgrade2(_) ->
+    Q = {mqueue,false,10,0,0,
+                   #{<<"t">> => 1},
+                   0,
+                   {queue,[],[],0}},
+    ?assertMatch(#{}, ?Q:info(Q)),
+    ?assertMatch(true, ?Q:is_empty(Q)),
+    ?assertMatch(0, ?Q:len(Q)),
+    ?assertMatch(10, ?Q:max_len(Q)),
+    ?assertMatch({_, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)),
+    ?assertMatch({empty, _}, ?Q:out(Q)),
+    ?assertMatch([_|_], ?Q:stats(Q)),
+    ?assertMatch(0, ?Q:dropped(Q)).
+
 conservation_prop() ->
     ?FORALL({Priorities, Messages},
             ?LET(Priorities, topic_priorities(),