Просмотр исходного кода

Merge pull request #11304 from keynslug/ft/EMQX-9593/ds-session-connect

refactor(session): minimize `emqx_session` interface surface
Andrew Mayorov 2 лет назад
Родитель
Сommit
f2a32e8ed2

+ 6 - 18
apps/emqx/src/emqx_channel.erl

@@ -924,18 +924,13 @@ handle_deliver(
     Delivers,
     Channel = #channel{
         takeover = true,
-        pendings = Pendings,
-        session = Session,
-        clientinfo = #{clientid := ClientId} = ClientInfo
+        pendings = Pendings
     }
 ) ->
     %% NOTE: Order is important here. While the takeover is in
     %% progress, the session cannot enqueue messages, since it already
     %% passed on the queue to the new connection in the session state.
-    NPendings = lists:append(
-        Pendings,
-        emqx_session:ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)
-    ),
+    NPendings = lists:append(Pendings, maybe_nack(Delivers)),
     {ok, Channel#channel{pendings = NPendings}};
 handle_deliver(
     Delivers,
@@ -943,12 +938,11 @@ handle_deliver(
         conn_state = disconnected,
         takeover = false,
         session = Session,
-        clientinfo = #{clientid := ClientId} = ClientInfo
+        clientinfo = ClientInfo
     }
 ) ->
     Delivers1 = maybe_nack(Delivers),
-    Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
-    NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
+    NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session),
     NChannel = Channel#channel{session = NSession},
     {ok, NChannel};
 handle_deliver(
@@ -956,16 +950,10 @@ handle_deliver(
     Channel = #channel{
         session = Session,
         takeover = false,
-        clientinfo = #{clientid := ClientId} = ClientInfo
+        clientinfo = ClientInfo
     }
 ) ->
-    case
-        emqx_session:deliver(
-            ClientInfo,
-            emqx_session:ignore_local(ClientInfo, Delivers, ClientId, Session),
-            Session
-        )
-    of
+    case emqx_session:deliver(ClientInfo, Delivers, Session) of
         {ok, Publishes, NSession} ->
             NChannel = Channel#channel{session = NSession},
             handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));

+ 21 - 31
apps/emqx/src/emqx_session.erl

@@ -64,7 +64,6 @@
 -export([
     info/1,
     info/2,
-    is_session/1,
     stats/1,
     obtain_next_pkt_id/1,
     get_mqueue/1
@@ -88,7 +87,6 @@
     enqueue/3,
     dequeue/2,
     filter_queue/2,
-    ignore_local/4,
     retry/2,
     terminate/3
 ]).
@@ -252,9 +250,6 @@ unpersist(Session) ->
 %% Info, Stats
 %%--------------------------------------------------------------------
 
-is_session(#session{}) -> true;
-is_session(_) -> false.
-
 %% @doc Get infos of the session.
 -spec info(session()) -> emqx_types:infos().
 info(Session) ->
@@ -309,27 +304,6 @@ info(created_at, #session{created_at = CreatedAt}) ->
 -spec stats(session()) -> emqx_types:stats().
 stats(Session) -> info(?STATS_KEYS, Session).
 
-%%--------------------------------------------------------------------
-%% Ignore local messages
-%%--------------------------------------------------------------------
-
-ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
-    Subs = info(subscriptions, Session),
-    lists:filter(
-        fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
-            case maps:find(Topic, Subs) of
-                {ok, #{nl := 1}} when Subscriber =:= Publisher ->
-                    ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
-                    ok = emqx_metrics:inc('delivery.dropped'),
-                    ok = emqx_metrics:inc('delivery.dropped.no_local'),
-                    false;
-                _ ->
-                    true
-            end
-        end,
-        Delivers
-    ).
-
 %%--------------------------------------------------------------------
 %% Client -> Broker: SUBSCRIBE
 %%--------------------------------------------------------------------
@@ -610,7 +584,10 @@ deliver_msg(
             MarkedMsg = mark_begin_deliver(Msg),
             Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight),
             {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
-    end.
+    end;
+deliver_msg(ClientInfo, {drop, Msg, Reason}, Session) ->
+    handle_dropped(ClientInfo, Msg, Reason, Session),
+    {ok, Session}.
 
 -spec enqueue(
     emqx_types:clientinfo(),
@@ -629,7 +606,10 @@ enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
 enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
     {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
     (Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session),
-    Session#session{mqueue = NewQ}.
+    Session#session{mqueue = NewQ};
+enqueue(ClientInfo, {drop, Msg, Reason}, Session) ->
+    handle_dropped(ClientInfo, Msg, Reason, Session),
+    Session.
 
 handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
     Payload = emqx_message:to_log_map(Msg),
@@ -666,8 +646,18 @@ handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mq
             )
     end.
 
+handle_dropped(ClientInfo, Msg, Reason, _Session) ->
+    ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, Reason]),
+    ok = emqx_metrics:inc('delivery.dropped'),
+    ok = emqx_metrics:inc('delivery.dropped.no_local').
+
 enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
-    enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
+    enrich_deliver(Msg, maps:find(Topic, Subs), Session).
+
+enrich_deliver(Msg = #message{from = ClientId}, {ok, #{nl := 1}}, #session{clientid = ClientId}) ->
+    {drop, Msg, no_local};
+enrich_deliver(Msg, SubOpts, Session) ->
+    enrich_subopts(mk_subopts(SubOpts), Msg, Session).
 
 maybe_ack(Msg) ->
     emqx_shared_sub:maybe_ack(Msg).
@@ -675,8 +665,8 @@ maybe_ack(Msg) ->
 maybe_nack(Msg) ->
     emqx_shared_sub:maybe_nack_dropped(Msg).
 
-get_subopts(Topic, SubMap) ->
-    case maps:find(Topic, SubMap) of
+mk_subopts(SubOpts) ->
+    case SubOpts of
         {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
             [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}];
         {ok, #{nl := Nl, qos := QoS, rap := Rap}} ->

+ 7 - 9
apps/emqx/src/persistent_session/emqx_persistent_session.erl

@@ -272,7 +272,7 @@ remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) ->
 %% Must be called inside a emqx_cm_locker transaction.
 -spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session()) ->
     {emqx_session:session(), [emqx_types:deliver()]}.
-resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
+resume(ClientInfo, ConnInfo, Session) ->
     SessionID = emqx_session:info(id, Session),
     ?tp(ps_resuming, #{from => db, sid => SessionID}),
 
@@ -281,7 +281,6 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
     %% 1. Get pending messages from DB.
     ?tp(ps_initial_pendings, #{sid => SessionID}),
     Pendings1 = pending(SessionID),
-    Pendings2 = emqx_session:ignore_local(ClientInfo, Pendings1, ClientID, Session),
     ?tp(ps_got_initial_pendings, #{
         sid => SessionID,
         msgs => Pendings1
@@ -290,11 +289,11 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
     %% 2. Enqueue messages to mimic that the process was alive
     %%    when the messages were delivered.
     ?tp(ps_persist_pendings, #{sid => SessionID}),
-    Session1 = emqx_session:enqueue(ClientInfo, Pendings2, Session),
+    Session1 = emqx_session:enqueue(ClientInfo, Pendings1, Session),
     Session2 = persist(ClientInfo, ConnInfo, Session1),
-    mark_as_delivered(SessionID, Pendings2),
+    mark_as_delivered(SessionID, Pendings1),
     ?tp(ps_persist_pendings_msgs, #{
-        msgs => Pendings2,
+        msgs => Pendings1,
         sid => SessionID
     }),
 
@@ -312,11 +311,10 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
     %% 5. Get pending messages from DB until we find all markers.
     ?tp(ps_marker_pendings, #{sid => SessionID}),
     MarkerIDs = [Marker || {_, Marker} <- NodeMarkers],
-    Pendings3 = pending(SessionID, MarkerIDs),
-    Pendings4 = emqx_session:ignore_local(ClientInfo, Pendings3, ClientID, Session),
+    Pendings2 = pending(SessionID, MarkerIDs),
     ?tp(ps_marker_pendings_msgs, #{
         sid => SessionID,
-        msgs => Pendings4
+        msgs => Pendings2
     }),
 
     %% 6. Get pending messages from writers.
@@ -329,7 +327,7 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
 
     %% 7. Drain the inbox and usort the messages
     %%    with the pending messages. (Should be done by caller.)
-    {Session2, Pendings4 ++ WriterPendings}.
+    {Session2, Pendings2 ++ WriterPendings}.
 
 resume_begin(Nodes, SessionID) ->
     Res = emqx_persistent_session_proto_v1:resume_begin(Nodes, self(), SessionID),

+ 8 - 5
apps/emqx/test/emqx_channel_SUITE.erl

@@ -584,7 +584,7 @@ t_handle_deliver(_) ->
 
 t_handle_deliver_nl(_) ->
     ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
-    Session = session(#{subscriptions => #{<<"t1">> => #{nl => 1}}}),
+    Session = session(ClientInfo, #{subscriptions => #{<<"t1">> => #{nl => 1}}}),
     Channel = channel(#{clientinfo => ClientInfo, session => Session}),
     Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
     NMsg = emqx_message:set_flag(nl, Msg),
@@ -1071,11 +1071,14 @@ connpkt(Props) ->
         password = <<"passwd">>
     }.
 
-session() -> session(#{}).
-session(InitFields) when is_map(InitFields) ->
+session() -> session(#{zone => default, clientid => <<"fake-test">>}, #{}).
+session(InitFields) -> session(#{zone => default, clientid => <<"fake-test">>}, InitFields).
+session(ClientInfo, InitFields) when is_map(InitFields) ->
     Conf = emqx_cm:get_session_confs(
-        #{zone => default, clientid => <<"fake-test">>}, #{
-            receive_maximum => 0, expiry_interval => 0
+        ClientInfo,
+        #{
+            receive_maximum => 0,
+            expiry_interval => 0
         }
     ),
     Session = emqx_session:init(Conf),

+ 5 - 16
apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl

@@ -180,30 +180,24 @@ handle_deliver(
     Delivers,
     #{
         takeover := true,
-        pendings := Pendings,
-        session := Session,
-        clientinfo := #{clientid := ClientId} = ClientInfo
+        pendings := Pendings
     } = Channel
 ) ->
     %% NOTE: Order is important here. While the takeover is in
     %% progress, the session cannot enqueue messages, since it already
     %% passed on the queue to the new connection in the session state.
-    NPendings = lists:append(
-        Pendings,
-        emqx_session:ignore_local(ClientInfo, emqx_channel:maybe_nack(Delivers), ClientId, Session)
-    ),
+    NPendings = lists:append(Pendings, emqx_channel:maybe_nack(Delivers)),
     Channel#{pendings => NPendings};
 handle_deliver(
     Delivers,
     #{
         takeover := false,
         session := Session,
-        clientinfo := #{clientid := ClientId} = ClientInfo
+        clientinfo := ClientInfo
     } = Channel
 ) ->
     Delivers1 = emqx_channel:maybe_nack(Delivers),
-    Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
-    NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
+    NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session),
     Channel#{session := NSession}.
 
 cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) ->
@@ -248,12 +242,7 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
             Pendings1 = lists:usort(lists:append(Pendings0, emqx_utils:drain_deliver())),
             NSession = emqx_session:enqueue(
                 ClientInfo,
-                emqx_session:ignore_local(
-                    ClientInfo,
-                    emqx_channel:maybe_nack(Pendings1),
-                    ClientId,
-                    Session
-                ),
+                emqx_channel:maybe_nack(Pendings1),
                 Session
             ),
             NChannel = Channel#{session => NSession},