Просмотр исходного кода

refactor(session): update eviction channel session logic

The changes partially reflect `emqx_channel` changes with respect to
in-memory session specific logic. The difference is that eviction
channel does not replay post-takeover, instead enqueues messages.
Andrew Mayorov 2 лет назад
Родитель
Сommit
45d44df11d

+ 1 - 1
apps/emqx/src/emqx_channel.erl

@@ -1177,7 +1177,7 @@ handle_call(
     ok = emqx_session_mem:takeover(Session),
     %% TODO: Should not drain deliver here (side effect)
     Delivers = emqx_utils:drain_deliver(),
-    AllPendings = lists:append(Delivers, Pendings),
+    AllPendings = lists:append(Pendings, maybe_nack(Delivers)),
     ?tp(
         debug,
         emqx_channel_takeover_end,

+ 15 - 13
apps/emqx/src/emqx_session_mem.erl

@@ -100,7 +100,8 @@
     resume/2,
     enqueue/3,
     dequeue/2,
-    replay/2
+    replay/2,
+    dedup/4
 ]).
 
 %% Export for CT
@@ -669,19 +670,10 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
 -spec replay(emqx_types:clientinfo(), [emqx_types:message()], session()) ->
     {ok, replies(), session()}.
 replay(ClientInfo, Pendings, Session) ->
-    PendingsLocal = emqx_session:enrich_delivers(
-        ClientInfo,
-        emqx_utils:drain_deliver(),
-        Session
-    ),
-    PendingsLocal1 = lists:filter(
-        fun(Msg) -> not lists:keymember(Msg#message.id, #message.id, Pendings) end,
-        PendingsLocal
-    ),
+    PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session),
     {ok, PubsResendQueued, Session1} = replay(ClientInfo, Session),
-    {ok, Pubs1, Session2} = deliver(ClientInfo, Pendings, Session1),
-    {ok, Pubs2, Session3} = deliver(ClientInfo, PendingsLocal1, Session2),
-    {ok, append(append(PubsResendQueued, Pubs1), Pubs2), Session3}.
+    {ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1),
+    {ok, append(PubsResendQueued, PubsPending), Session2}.
 
 -spec replay(emqx_types:clientinfo(), session()) ->
     {ok, replies(), session()}.
@@ -698,6 +690,16 @@ replay(ClientInfo, Session) ->
     {ok, More, Session1} = dequeue(ClientInfo, Session),
     {ok, append(PubsResend, More), Session1}.
 
+-spec dedup(clientinfo(), [emqx_types:message()], [emqx_types:deliver()], session()) ->
+    [emqx_types:message()].
+dedup(ClientInfo, Pendings, DeliversLocal, Session) ->
+    PendingsLocal1 = emqx_session:enrich_delivers(ClientInfo, DeliversLocal, Session),
+    PendingsLocal2 = lists:filter(
+        fun(Msg) -> not lists:keymember(Msg#message.id, #message.id, Pendings) end,
+        PendingsLocal1
+    ),
+    append(Pendings, PendingsLocal2).
+
 append(L1, []) -> L1;
 append(L1, L2) -> L1 ++ L2.
 

+ 11 - 10
apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl

@@ -7,7 +7,6 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_channel.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/types.hrl").
 
@@ -122,7 +121,9 @@ handle_call(
         pendings := Pendings
     } = Channel
 ) ->
-    ok = emqx_session:takeover(Session),
+    % NOTE
+    % This is essentially part of `emqx_session_mem` logic, thus call it directly.
+    ok = emqx_session_mem:takeover(Session),
     %% TODO: Should not drain deliver here (side effect)
     Delivers = emqx_utils:drain_deliver(),
     AllPendings = lists:append(Delivers, Pendings),
@@ -196,8 +197,11 @@ handle_deliver(
         clientinfo := ClientInfo
     } = Channel
 ) ->
+    % NOTE
+    % This is essentially part of `emqx_session_mem` logic, thus call it directly.
     Delivers1 = emqx_channel:maybe_nack(Delivers),
-    NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session),
+    Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
+    NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
     Channel#{session := NSession}.
 
 cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) ->
@@ -230,7 +234,7 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
                 }
             ),
             {error, no_session};
-        {ok, #{session := Session, present := true, pendings := Pendings0}} ->
+        {ok, #{session := Session, present := true, replay := Pendings}} ->
             ?SLOG(
                 info,
                 #{
@@ -239,12 +243,9 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
                     node => node()
                 }
             ),
-            Pendings1 = lists:usort(lists:append(Pendings0, emqx_utils:drain_deliver())),
-            NSession = emqx_session:enqueue(
-                ClientInfo,
-                emqx_channel:maybe_nack(Pendings1),
-                Session
-            ),
+            DeliversLocal = emqx_channel:maybe_nack(emqx_utils:drain_deliver()),
+            PendingsAll = emqx_session_mem:dedup(ClientInfo, Pendings, DeliversLocal, Session),
+            NSession = emqx_session_mem:enqueue(ClientInfo, PendingsAll, Session),
             NChannel = Channel#{session => NSession},
             ok = emqx_cm:insert_channel_info(ClientId, info(NChannel), stats(NChannel)),
             ?SLOG(