Kaynağa Gözat

feat(sessds): Support max subscriptions

ieQu1 1 yıl önce
ebeveyn
işleme
113a990482

+ 4 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -367,10 +367,10 @@ subscribe(
 subscribe(
 subscribe(
     TopicFilter,
     TopicFilter,
     SubOpts,
     SubOpts,
-    Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}}
+    Session = #{id := ID}
 ) ->
 ) ->
     {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe(
     {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe(
-        TopicFilter, UpgradeQoS, SubOpts, S0
+        TopicFilter, SubOpts, Session
     ),
     ),
     case UpdateRouter of
     case UpdateRouter of
         true ->
         true ->
@@ -379,9 +379,8 @@ subscribe(
             ok
             ok
     end,
     end,
     S = emqx_persistent_session_ds_state:commit(S1),
     S = emqx_persistent_session_ds_state:commit(S1),
-    ?tp(persistent_session_ds_subscription_added, #{
-        topic_filter => TopicFilter, is_new => UpdateRouter
-    }),
+    UpdateRouter andalso
+        ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}),
     {ok, Session#{s => S}}.
     {ok, Session#{s => S}}.
 
 
 -spec unsubscribe(topic_filter(), session()) ->
 -spec unsubscribe(topic_filter(), session()) ->

+ 26 - 15
apps/emqx/src/emqx_persistent_session_ds_subs.erl

@@ -25,7 +25,7 @@
 
 
 %% API:
 %% API:
 -export([
 -export([
-    on_subscribe/4,
+    on_subscribe/3,
     on_unsubscribe/2,
     on_unsubscribe/2,
     gc/1,
     gc/1,
     lookup/2,
     lookup/2,
@@ -73,26 +73,37 @@
 %% @doc Process a new subscription
 %% @doc Process a new subscription
 -spec on_subscribe(
 -spec on_subscribe(
     emqx_persistent_session_ds:topic_filter(),
     emqx_persistent_session_ds:topic_filter(),
-    boolean(),
     emqx_types:subopts(),
     emqx_types:subopts(),
-    emqx_persistent_session_ds_state:t()
+    emqx_persistent_session_ds:session()
 ) ->
 ) ->
     {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}.
     {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}.
-on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) ->
+on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) ->
+    #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props,
     case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
     case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
         undefined ->
         undefined ->
             %% This is a new subscription:
             %% This is a new subscription:
-            {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
-            {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
-            SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
-            S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2),
-            Subscription = #{
-                id => SubId,
-                current_state => SStateId,
-                start_time => now_ms()
-            },
-            S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3),
-            {true, S};
+            case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of
+                true ->
+                    {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
+                    {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
+                    SState = #{
+                        parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts
+                    },
+                    S3 = emqx_persistent_session_ds_state:put_subscription_state(
+                        SStateId, SState, S2
+                    ),
+                    Subscription = #{
+                        id => SubId,
+                        current_state => SStateId,
+                        start_time => now_ms()
+                    },
+                    S = emqx_persistent_session_ds_state:put_subscription(
+                        TopicFilter, Subscription, S3
+                    ),
+                    {true, S};
+                false ->
+                    {false, S0}
+            end;
         Sub0 = #{current_state := SStateId0, id := SubId} ->
         Sub0 = #{current_state := SStateId0, id := SubId} ->
             SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
             SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
             case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
             case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of