Sfoglia il codice sorgente

fix: the exclusive topics aren't removed when the session has already been cleaned

firest 3 anni fa
parent
commit
3bdffca488

+ 3 - 10
apps/emqx/src/emqx_broker.erl

@@ -196,13 +196,13 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
     true = ets:delete(?SUBOPTION, {Topic, SubPid}),
     true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
     Group = maps:get(share, SubOpts, undefined),
-    do_unsubscribe(Group, Topic, SubPid, SubOpts),
-    emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
+    do_unsubscribe(Group, Topic, SubPid, SubOpts).
 
 do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
     case maps:get(shard, SubOpts, 0) of
         0 ->
             true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
+            emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
             cast(pick(Topic), {unsubscribed, Topic});
         I ->
             true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
@@ -366,14 +366,7 @@ subscriber_down(SubPid) ->
                 SubOpts when is_map(SubOpts) ->
                     _ = emqx_broker_helper:reclaim_seq(Topic),
                     true = ets:delete(?SUBOPTION, {Topic, SubPid}),
-                    case maps:get(shard, SubOpts, 0) of
-                        0 ->
-                            true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
-                            ok = cast(pick(Topic), {unsubscribed, Topic});
-                        I ->
-                            true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
-                            ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
-                    end;
+                    do_unsubscribe(undefined, Topic, SubPid, SubOpts);
                 undefined ->
                     ok
             end

+ 6 - 2
apps/emqx/src/emqx_exclusive_subscription.erl

@@ -32,7 +32,8 @@
 
 -export([
     check_subscribe/2,
-    unsubscribe/2
+    unsubscribe/2,
+    clear/0
 ]).
 
 %% Internal exports (RPC)
@@ -77,7 +78,7 @@ on_add_module() ->
     mnesia(boot).
 
 on_delete_module() ->
-    mria:clear_table(?EXCLUSIVE_SHARD).
+    clear().
 
 %%--------------------------------------------------------------------
 %% APIs
@@ -101,6 +102,9 @@ unsubscribe(Topic, #{is_exclusive := true}) ->
 unsubscribe(_Topic, _SubOpts) ->
     ok.
 
+clear() ->
+    mria:clear_table(?TAB).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 159 - 0
apps/emqx/test/emqx_exclusive_sub_SUITE.erl

@@ -0,0 +1,159 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exclusive_sub_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>).
+-define(NORMAL_TOPIC, <<"t/1">>).
+
+-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)).
+-define(CHECK_SUB(Client, Topic, Code),
+    {ok, _, [Code]} = emqtt:subscribe(Client, Topic, [])
+).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:start_apps([]),
+    ok = ekka:start(),
+    OldConf = emqx:get_config([zones], #{}),
+    emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], true),
+    timer:sleep(50),
+    [{old_conf, OldConf} | Config].
+
+end_per_suite(Config) ->
+    emqx_config:put([zones], proplists:get_value(old_conf, Config)),
+    ekka:stop(),
+    mria:stop(),
+    mria_mnesia:delete_schema(),
+    emqx_common_test_helpers:stop_apps([]).
+
+end_per_testcase(_TestCase, _Config) ->
+    emqx_exclusive_subscription:clear().
+
+t_exclusive_sub(_) ->
+    {ok, C1} = emqtt:start_link([
+        {clientid, <<"client1">>},
+        {clean_start, false},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 100}}
+    ]),
+    {ok, _} = emqtt:connect(C1),
+    ?CHECK_SUB(C1, 0),
+
+    {ok, C2} = emqtt:start_link([
+        {clientid, <<"client2">>},
+        {clean_start, false},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C2),
+    ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
+
+    %% keep exclusive even disconnected
+    ok = emqtt:disconnect(C1),
+    timer:sleep(1000),
+
+    ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
+
+    ok = emqtt:disconnect(C2).
+
+t_allow_normal_sub(_) ->
+    {ok, C1} = emqtt:start_link([
+        {clientid, <<"client1">>},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C1),
+    ?CHECK_SUB(C1, 0),
+
+    {ok, C2} = emqtt:start_link([
+        {clientid, <<"client2">>},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C2),
+    ?CHECK_SUB(C2, ?NORMAL_TOPIC, 0),
+
+    ok = emqtt:disconnect(C1),
+    ok = emqtt:disconnect(C2).
+
+t_unsub(_) ->
+    {ok, C1} = emqtt:start_link([
+        {clientid, <<"client1">>},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C1),
+    ?CHECK_SUB(C1, 0),
+
+    {ok, C2} = emqtt:start_link([
+        {clientid, <<"client2">>},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C2),
+    ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
+
+    {ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC),
+
+    ?CHECK_SUB(C2, 0),
+
+    ok = emqtt:disconnect(C1),
+    ok = emqtt:disconnect(C2).
+
+t_clean_session(_) ->
+    erlang:process_flag(trap_exit, true),
+    {ok, C1} = emqtt:start_link([
+        {clientid, <<"client1">>},
+        {clean_start, true},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 0}}
+    ]),
+    {ok, _} = emqtt:connect(C1),
+    ?CHECK_SUB(C1, 0),
+
+    {ok, C2} = emqtt:start_link([
+        {clientid, <<"client2">>},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C2),
+    ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
+
+    %% auto clean when session was cleand
+    ok = emqtt:disconnect(C1),
+
+    timer:sleep(1000),
+
+    ?CHECK_SUB(C2, 0),
+
+    ok = emqtt:disconnect(C2).
+
+t_feat_disabled(_) ->
+    OldConf = emqx:get_config([zones], #{}),
+    emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], false),
+
+    {ok, C1} = emqtt:start_link([
+        {clientid, <<"client1">>},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(C1),
+    ?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID),
+    ok = emqtt:disconnect(C1),
+
+    emqx_config:put([zones], OldConf).