Просмотр исходного кода

Improve session takeover (#2831)

Implement the session takover/resumption across nodes
Feng Lee 6 лет назад
Родитель
Сommit
82b9a7c301
5 измененных файлов с 119 добавлено и 46 удалено
  1. 53 17
      src/emqx_channel.erl
  2. 12 6
      src/emqx_cm.erl
  3. 19 12
      src/emqx_connection.erl
  4. 5 1
      src/emqx_misc.erl
  5. 30 10
      src/emqx_session.erl

+ 53 - 17
src/emqx_channel.erl

@@ -73,10 +73,14 @@
           oom_policy :: emqx_oom:oom_policy(),
           %% Connected
           connected :: boolean(),
+          %% Connected at
           connected_at :: erlang:timestamp(),
           disconnected_at :: erlang:timestamp(),
-          %% Takeover/Resume
+          %% Takeover
+          takeover :: boolean(),
+          %% Resume
           resuming :: boolean(),
+          %% Pending delivers when takeovering
           pendings :: list()
          }).
 
@@ -125,7 +129,10 @@ init(ConnInfo, Options) ->
              gc_state   = GcState,
              oom_policy = OomPolicy,
              timers     = #{stats_timer => StatsTimer},
-             connected  = false
+             connected  = false,
+             takeover   = false,
+             resuming   = false,
+             pendings   = []
             }.
 
 peer_cert_as_username(Options) ->
@@ -234,7 +241,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
             handle_out({connack, ReasonCode}, NChannel)
     end;
 
-handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{protocol = Protocol}) ->
+handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{protocol = Protocol}) ->
     case pipeline([fun validate_packet/2,
                    fun process_alias/2,
                    fun check_publish/2], Packet, Channel) of
@@ -372,9 +379,15 @@ handle_in(Packet, Channel) ->
 
 process_connect(ConnPkt, Channel) ->
     case open_session(ConnPkt, Channel) of
-        {ok, Session, SP} ->
+        {ok, #{session := Session, present := false}} ->
             NChannel = Channel#channel{session = Session},
-            handle_out({connack, ?RC_SUCCESS, sp(SP)}, NChannel);
+            handle_out({connack, ?RC_SUCCESS, sp(false)}, NChannel);
+        {ok, #{session := Session, present := true, pendings := Pendings}} ->
+            NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
+            NChannel = Channel#channel{session  = Session,
+                                       resuming = true,
+                                       pendings = NPendings},
+            handle_out({connack, ?RC_SUCCESS, sp(true)}, NChannel);
         {error, Reason} ->
             %% TODO: Unknown error?
             ?LOG(error, "Failed to open session: ~p", [Reason]),
@@ -474,8 +487,17 @@ handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) ->
                                    fun enrich_server_keepalive/2,
                                    fun enrich_assigned_clientid/2
                                   ], #{}, Channel),
-    NChannel = ensure_keepalive(AckProps, ensure_connected(Channel)),
-    {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), NChannel};
+    AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
+    Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)),
+    case maybe_resume_session(Channel1) of
+        ignore -> {ok, AckPacket, Channel1};
+        {ok, Publishes, NSession} ->
+            Channel2 = Channel1#channel{session  = NSession,
+                                        resuming = false,
+                                        pendings = []},
+            {ok, Packets, _} = handle_out({publish, Publishes}, Channel2),
+            {ok, [AckPacket|Packets], Channel2}
+    end;
 
 handle_out({connack, ReasonCode}, Channel = #channel{client = Client,
                                                      protocol = Protocol
@@ -489,9 +511,12 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client,
     Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
     {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
 
-handle_out({deliver, Delivers}, Channel = #channel{resuming = true,
-                                                   pendings = Pendings
-                                                  }) ->
+handle_out({deliver, Delivers}, Channel = #channel{session  = Session,
+                                                   connected = false}) ->
+    {ok, Channel#channel{session = emqx_session:enqueue(Delivers, Session)}};
+
+handle_out({deliver, Delivers}, Channel = #channel{takeover = true,
+                                                   pendings = Pendings}) ->
     {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}};
 
 handle_out({deliver, Delivers}, Channel = #channel{session = Session}) ->
@@ -571,20 +596,18 @@ handle_out({Type, Data}, Channel) ->
 %% Handle call
 %%--------------------------------------------------------------------
 
-%%--------------------------------------------------------------------
-%% Takeover session
-%%--------------------------------------------------------------------
-
+%% Session Takeover
 handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
-    {ok, Session, Channel#channel{resuming = true}};
+    {ok, Session, Channel#channel{takeover = true}};
 
 handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
                                                   pendings = Pendings}) ->
     ok = emqx_session:takeover(Session),
-    {stop, {shutdown, takeovered}, Pendings, Channel};
+    AllPendings = lists:append(emqx_misc:drain_deliver(), Pendings),
+    {stop, {shutdown, takeovered}, AllPendings, Channel};
 
 handle_call(Req, Channel) ->
-    ?LOG(error, "Unexpected call: Req", [Req]),
+    ?LOG(error, "Unexpected call: ~p", [Req]),
     {ok, ignored, Channel}.
 
 %%--------------------------------------------------------------------
@@ -1110,6 +1133,19 @@ ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) -
     Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
     ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
 
+maybe_resume_session(#channel{resuming = false}) ->
+    ignore;
+maybe_resume_session(#channel{session  = Session,
+                              resuming = true,
+                              pendings = Pendings}) ->
+    {ok, Publishes, Session1} = emqx_session:redeliver(Session),
+    case emqx_session:deliver(Pendings, Session1) of
+        {ok, Session2} ->
+            {ok, Publishes, Session2};
+        {ok, More, Session2} ->
+            {ok, lists:append(Publishes, More), Session2}
+    end.
+
 %%--------------------------------------------------------------------
 %% Is ACL enabled?
 %%--------------------------------------------------------------------

+ 12 - 6
src/emqx_cm.erl

@@ -157,11 +157,15 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
 
 %% @doc Open a session.
 -spec(open_session(boolean(), emqx_types:client(), map())
-      -> {ok, emqx_session:session()} | {error, Reason :: term()}).
+      -> {ok, #{session  := emqx_session:session(),
+                present  := boolean(),
+                pendings => list()}}
+       | {error, Reason :: term()}).
 open_session(true, Client = #{client_id := ClientId}, Options) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId),
-                     {ok, emqx_session:init(Client, Options), false}
+                     Session = emqx_session:init(Client, Options),
+                     {ok, #{session => Session, present => false}}
                  end,
     emqx_cm_locker:trans(ClientId, CleanStart);
 
@@ -169,12 +173,14 @@ open_session(false, Client = #{client_id := ClientId}, Options) ->
     ResumeStart = fun(_) ->
                       case takeover_session(ClientId) of
                           {ok, ConnMod, ChanPid, Session} ->
-                              NSession = emqx_session:resume(ClientId, Session),
+                              ok = emqx_session:resume(ClientId, Session),
                               Pendings = ConnMod:takeover(ChanPid, 'end'),
-                              io:format("Pending Delivers: ~p~n", [Pendings]),
-                              {ok, NSession, true};
+                              {ok, #{session  => Session,
+                                     present  => true,
+                                     pendings => Pendings}};
                           {error, not_found} ->
-                              {ok, emqx_session:init(Client, Options), false}
+                              Session = emqx_session:init(Client, Options),
+                              {ok, #{session => Session, present => false}}
                       end
                   end,
     emqx_cm_locker:trans(ClientId, ResumeStart).

+ 19 - 12
src/emqx_connection.erl

@@ -255,18 +255,8 @@ connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) ->
 connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
     handle_incoming(Packet, fun keep_state/1, State);
 
-connected(info, Deliver = {deliver, _Topic, _Msg},
-          State = #state{chan_state = ChanState}) ->
-    Delivers = emqx_misc:drain_deliver([Deliver]),
-    case emqx_channel:handle_out({deliver, Delivers}, ChanState) of
-        {ok, NChanState} ->
-            keep_state(State#state{chan_state = NChanState});
-        {ok, Packets, NChanState} ->
-            NState = State#state{chan_state = NChanState},
-            handle_outgoing(Packets, fun keep_state/1, NState);
-        {stop, Reason, NChanState} ->
-            stop(Reason, State#state{chan_state = NChanState})
-    end;
+connected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
+    handle_deliver(emqx_misc:drain_deliver([Deliver]), State);
 
 connected(EventType, Content, State) ->
     ?HANDLE(EventType, Content, State).
@@ -279,6 +269,9 @@ disconnected(enter, _, _State) ->
     %% CleanStart is true
     keep_state_and_data;
 
+disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) ->
+    handle_deliver([Deliver], State);
+
 disconnected(EventType, Content, State) ->
     ?HANDLE(EventType, Content, State).
 
@@ -469,6 +462,20 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun,
             handle_outgoing(OutPacket, Shutdown, State#state{chan_state = NChanState})
     end.
 
+%%-------------------------------------------------------------------
+%% Handle deliver
+
+handle_deliver(Delivers,  State = #state{chan_state = ChanState}) ->
+    case emqx_channel:handle_out({deliver, Delivers}, ChanState) of
+        {ok, NChanState} ->
+            keep_state(State#state{chan_state = NChanState});
+        {ok, Packets, NChanState} ->
+            NState = State#state{chan_state = NChanState},
+            handle_outgoing(Packets, fun keep_state/1, NState);
+        {stop, Reason, NChanState} ->
+            stop(Reason, State#state{chan_state = NChanState})
+    end.
+
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 

+ 5 - 1
src/emqx_misc.erl

@@ -29,7 +29,8 @@
         , proc_stats/1
         ]).
 
--export([ drain_deliver/1
+-export([ drain_deliver/0
+        , drain_deliver/1
         , drain_down/1
         ]).
 
@@ -96,6 +97,9 @@ proc_stats(Pid) ->
     end.
 
 %% @doc Drain delivers from the channel's mailbox.
+drain_deliver() ->
+    drain_deliver([]).
+
 drain_deliver(Acc) ->
     receive
         Deliver = {deliver, _Topic, _Msg} ->

+ 30 - 10
src/emqx_session.erl

@@ -72,11 +72,13 @@
         ]).
 
 -export([ deliver/2
+        , enqueue/2
         , retry/1
         ]).
 
 -export([ takeover/1
         , resume/2
+        , redeliver/1
         ]).
 
 -export([expire/2]).
@@ -264,18 +266,30 @@ takeover(#session{subscriptions = Subs}) ->
                           ok = emqx_broker:unsubscribe(TopicFilter)
                   end, maps:to_list(Subs)).
 
--spec(resume(emqx_types:client_id(), session()) -> session()).
-resume(ClientId, Session = #session{subscriptions = Subs}) ->
+-spec(resume(emqx_types:client_id(), session()) -> ok).
+resume(ClientId, #session{subscriptions = Subs}) ->
     ?LOG(info, "Session is resumed."),
-    %% 1. Subscribe again
-    ok = lists:foreach(fun({TopicFilter, SubOpts}) ->
-                               ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
-                       end, maps:to_list(Subs)),
+    %% 1. Subscribe again.
+    lists:foreach(fun({TopicFilter, SubOpts}) ->
+                          ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
+                  end, maps:to_list(Subs)).
     %% 2. Run hooks.
-    ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(Session)]),
+    %% ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(Session)]),
     %% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages
-    %% noreply(dequeue(retry_delivery(true, State1)));
-    Session.
+    %%Session.
+
+redeliver(Session = #session{inflight = Inflight}) ->
+    Publishes = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
+                                  {pubrel, PacketId, ?RC_SUCCESS};
+                             ({PacketId, {Msg, _Ts}}) ->
+                                  {publish, PacketId, Msg}
+                          end, emqx_inflight:to_list(Inflight)),
+    case dequeue(Session) of
+        {ok, NSession} ->
+            {ok, Publishes, NSession};
+        {ok, More, NSession} ->
+            {ok, lists:append(Publishes, More), NSession}
+    end.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: SUBSCRIBE
@@ -501,7 +515,13 @@ deliver([Msg = #message{qos = QoS}|More], Acc,
             deliver(More, [Publish|Acc], next_pkt_id(Session1))
     end.
 
-enqueue(Msg, Session = #session{mqueue = Q}) ->
+enqueue(Delivers, Session = #session{subscriptions = Subs})
+  when is_list(Delivers) ->
+    Msgs = [enrich(get_subopts(Topic, Subs), Msg, Session)
+            || {deliver, Topic, Msg} <- Delivers],
+    lists:foldl(fun enqueue/2, Session, Msgs);
+
+enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
     emqx_pd:update_counter(enqueue_stats, 1),
     {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
     if