Просмотр исходного кода

refactor(sessds): Move all subscription logic to the subs module

ieQu1 1 год назад
Родитель
Сommit
f1e6565ddd

+ 21 - 48
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -368,52 +368,31 @@ subscribe(
 subscribe(
     TopicFilter,
     SubOpts,
-    Session = #{id := ID}
+    Session
 ) ->
-    {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe(
-        TopicFilter, SubOpts, Session
-    ),
-    case UpdateRouter of
-        true ->
-            ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID);
-        false ->
-            ok
-    end,
-    S = emqx_persistent_session_ds_state:commit(S1),
-    UpdateRouter andalso
-        ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}),
-    {ok, Session#{s => S}}.
+    case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of
+        {ok, S1} ->
+            S = emqx_persistent_session_ds_state:commit(S1),
+            {ok, Session#{s => S}};
+        Error = {error, _} ->
+            Error
+    end.
 
 -spec unsubscribe(topic_filter(), session()) ->
     {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
 unsubscribe(
     TopicFilter,
-    Session = #{id := ID, s := S0}
+    Session = #{id := SessionId, s := S0}
 ) ->
-    case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
-        undefined ->
-            {error, ?RC_NO_SUBSCRIPTION_EXISTED};
-        Subscription = #{subopts := SubOpts} ->
-            S1 = do_unsubscribe(ID, TopicFilter, Subscription, S0),
-            S = emqx_persistent_session_ds_state:commit(S1),
-            {ok, Session#{s => S}, SubOpts}
+    case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
+        {ok, S1, #{id := SubId, subopts := SubOpts}} ->
+            S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
+            S = emqx_persistent_session_ds_state:commit(S2),
+            {ok, Session#{s => S}, SubOpts};
+        Error = {error, _} ->
+            Error
     end.
 
--spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
-    emqx_persistent_session_ds_state:t().
-do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
-    S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, S0),
-    ?tp(persistent_session_ds_subscription_delete, #{
-        session_id => SessionId, topic_filter => TopicFilter
-    }),
-    S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
-    ?tp_span(
-        persistent_session_ds_subscription_route_delete,
-        #{session_id => SessionId, topic_filter => TopicFilter},
-        ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
-    ),
-    S.
-
 -spec get_subscription(topic_filter(), session()) ->
     emqx_types:subopts() | undefined.
 get_subscription(#share{}, _) ->
@@ -860,18 +839,12 @@ session_ensure_new(
 %% @doc Called when a client reconnects with `clean session=true' or
 %% during session GC
 -spec session_drop(id(), _Reason) -> ok.
-session_drop(ID, Reason) ->
-    case emqx_persistent_session_ds_state:open(ID) of
+session_drop(SessionId, Reason) ->
+    case emqx_persistent_session_ds_state:open(SessionId) of
         {ok, S0} ->
-            ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
-            _S = emqx_persistent_session_ds_subs:fold(
-                fun(TopicFilter, Subscription, S) ->
-                    do_unsubscribe(ID, TopicFilter, Subscription, S)
-                end,
-                S0,
-                S0
-            ),
-            emqx_persistent_session_ds_state:delete(ID);
+            ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}),
+            emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0),
+            emqx_persistent_session_ds_state:delete(SessionId);
         undefined ->
             ok
     end.

+ 45 - 10
apps/emqx/src/emqx_persistent_session_ds_subs.erl

@@ -26,7 +26,8 @@
 %% API:
 -export([
     on_subscribe/3,
-    on_unsubscribe/2,
+    on_unsubscribe/3,
+    on_session_drop/2,
     gc/1,
     lookup/2,
     to_map/1,
@@ -41,6 +42,8 @@
 -export_type([subscription_state_id/0, subscription/0, subscription_state/0]).
 
 -include("emqx_persistent_session_ds.hrl").
+-include("emqx_mqtt.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 %%================================================================================
 %% Type declarations
@@ -81,14 +84,15 @@
     emqx_types:subopts(),
     emqx_persistent_session_ds:session()
 ) ->
-    {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}.
-on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) ->
+    {ok, emqx_persistent_session_ds_state:t()} | {error, ?RC_QUOTA_EXCEEDED}.
+on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props}) ->
     #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props,
     case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
         undefined ->
             %% This is a new subscription:
             case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of
                 true ->
+                    ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
                     {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
                     {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
                     SState = #{
@@ -105,16 +109,19 @@ on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) ->
                     S = emqx_persistent_session_ds_state:put_subscription(
                         TopicFilter, Subscription, S3
                     ),
-                    {true, S};
+                    ?tp(persistent_session_ds_subscription_added, #{
+                        topic_filter => TopicFilter, session => SessionId
+                    }),
+                    {ok, S};
                 false ->
-                    {false, S0}
+                    {error, ?RC_QUOTA_EXCEEDED}
             end;
         Sub0 = #{current_state := SStateId0, id := SubId} ->
             SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
             case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
                 SState ->
                     %% Client resubscribed with the same parameters:
-                    {false, S0};
+                    {ok, S0};
                 _ ->
                     %% Subsription parameters changed:
                     {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
@@ -123,18 +130,46 @@ on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) ->
                     ),
                     Sub = Sub0#{current_state => SStateId},
                     S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
-                    {false, S}
+                    {ok, S}
             end
     end.
 
 %% @doc Process UNSUBSCRIBE
 -spec on_unsubscribe(
+    emqx_persistent_session_ds:id(),
     emqx_persistent_session_ds:topic_filter(),
     emqx_persistent_session_ds_state:t()
 ) ->
-    emqx_persistent_session_ds_state:t().
-on_unsubscribe(TopicFilter, S0) ->
-    emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0).
+    {ok, emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()}
+    | {error, ?RC_NO_SUBSCRIPTION_EXISTED}.
+on_unsubscribe(SessionId, TopicFilter, S0) ->
+    case lookup(TopicFilter, S0) of
+        undefined ->
+            {error, ?RC_NO_SUBSCRIPTION_EXISTED};
+        Subscription ->
+            ?tp(persistent_session_ds_subscription_delete, #{
+                session_id => SessionId, topic_filter => TopicFilter
+            }),
+            ?tp_span(
+                persistent_session_ds_subscription_route_delete,
+                #{session_id => SessionId, topic_filter => TopicFilter},
+                ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
+            ),
+            {ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription}
+    end.
+
+-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok.
+on_session_drop(SessionId, S0) ->
+    fold(
+        fun(TopicFilter, _Subscription, S) ->
+            case on_unsubscribe(SessionId, TopicFilter, S) of
+                {ok, S1, _} -> S1;
+                _ -> S
+            end
+        end,
+        S0,
+        S0
+    ).
 
 %% @doc Remove subscription states that don't have a parent, and that
 %% don't have any unacked messages: