Browse Source

feat: add exclusive subscription

firest 3 năm trước cách đây
mục cha
commit
af5bf52ddf

+ 12 - 1
apps/emqx/i18n/emqx_schema_i18n.conf

@@ -754,6 +754,17 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 <code>zone<
         }
     }
 
+    mqtt_exclusive_subscription {
+        desc {
+            en: """Whether to enable support for MQTT exclusive subscription."""
+            zh: """是否启用对 MQTT 独占订阅的支持。"""
+        }
+        label: {
+            en: """Exclusive Subscription Available"""
+            zh: """独占订阅可用"""
+        }
+    }
+
     mqtt_ignore_loop_deliver {
         desc {
             en: """Ignore loop delivery of messages for MQTT v3.1.1/v3.1.0, similar to <code>No Local</code> subscription option in MQTT 5.0"""
@@ -2066,7 +2077,7 @@ Type of the rate limit.
 base_listener_enable_authn {
     desc {
         en: """
-Set <code>true</code> (default) to enable client authentication on this listener. 
+Set <code>true</code> (default) to enable client authentication on this listener.
 When set to <code>false</code> clients will be allowed to connect without authentication.
 """
         zh: """

+ 2 - 1
apps/emqx/src/emqx_broker.erl

@@ -196,7 +196,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
     true = ets:delete(?SUBOPTION, {SubPid, Topic}),
     true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
     Group = maps:get(share, SubOpts, undefined),
-    do_unsubscribe(Group, Topic, SubPid, SubOpts).
+    do_unsubscribe(Group, Topic, SubPid, SubOpts),
+    emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
 
 do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
     case maps:get(shard, SubOpts, 0) of

+ 2 - 2
apps/emqx/src/emqx_channel.erl

@@ -1865,8 +1865,8 @@ check_sub_authzs([], _Channel, Acc) ->
 %%--------------------------------------------------------------------
 %% Check Sub Caps
 
-check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
-    emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
+check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
+    emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
 
 %%--------------------------------------------------------------------
 %% Enrich SubId

+ 106 - 0
apps/emqx/src/emqx_exclusive_subscription.erl

@@ -0,0 +1,106 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exclusive_subscription).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-logger_header("[exclusive]").
+
+%% Mnesia bootstrap
+-export([mnesia/1]).
+
+-boot_mnesia({mnesia, [boot]}).
+-copy_mnesia({mnesia, [copy]}).
+
+-export([
+    check_subscribe/2,
+    unsubscribe/2
+]).
+
+-record(exclusive_subscription, {
+    topic :: emqx_types:topic(),
+    clientid :: emqx_types:clientid()
+}).
+
+-define(TAB, emqx_exclusive_subscription).
+-define(EXCLUSIVE_SHARD, emqx_exclusive_shard).
+
+%%--------------------------------------------------------------------
+%% Mnesia bootstrap
+%%--------------------------------------------------------------------
+
+mnesia(boot) ->
+    StoreProps = [
+        {ets, [
+            {read_concurrency, true},
+            {write_concurrency, true}
+        ]}
+    ],
+    ok = mria:create_table(?TAB, [
+        {rlog_shard, ?EXCLUSIVE_SHARD},
+        {type, set},
+        {storage, ram_copies},
+        {record_name, exclusive_subscription},
+        {attributes, record_info(fields, exclusive_subscription)},
+        {storage_properties, StoreProps}
+    ]),
+    ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
+    allow | deny.
+check_subscribe(#{clientid := ClientId}, Topic) ->
+    Fun = fun() ->
+        try_subscribe(ClientId, Topic)
+    end,
+    case mnesia:transaction(Fun) of
+        {atomic, Res} ->
+            Res;
+        {aborted, Reason} ->
+            ?SLOG(warning, #{
+                msg => "Cannot check subscribe ~p due to ~p.", topic => Topic, reason => Reason
+            }),
+            deny
+    end.
+
+unsubscribe(Topic, #{is_exclusive := true}) ->
+    _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end),
+    ok;
+unsubscribe(_Topic, _SubOpts) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+try_subscribe(ClientId, Topic) ->
+    case mnesia:wread({?TAB, Topic}) of
+        [] ->
+            mnesia:write(
+                ?TAB,
+                #exclusive_subscription{
+                    clientid = ClientId,
+                    topic = Topic
+                },
+                write
+            ),
+            allow;
+        [_] ->
+            deny
+    end.

+ 24 - 10
apps/emqx/src/emqx_mqtt_caps.erl

@@ -38,7 +38,8 @@
     retain_available => boolean(),
     wildcard_subscription => boolean(),
     subscription_identifiers => boolean(),
-    shared_subscription => boolean()
+    shared_subscription => boolean(),
+    exclusive_subscription => boolean()
 }.
 
 -define(MAX_TOPIC_LEVELS, 65535).
@@ -53,7 +54,8 @@
     max_topic_levels,
     max_qos_allowed,
     wildcard_subscription,
-    shared_subscription
+    shared_subscription,
+    exclusive_subscription
 ]).
 
 -define(DEFAULT_CAPS, #{
@@ -65,7 +67,8 @@
     retain_available => true,
     wildcard_subscription => true,
     subscription_identifiers => true,
-    shared_subscription => true
+    shared_subscription => true,
+    exclusive_subscription => true
 }).
 
 -spec check_pub(
@@ -102,12 +105,12 @@ do_check_pub(_Flags, _Caps) ->
     ok.
 
 -spec check_sub(
-    emqx_types:zone(),
+    emqx_types:clientinfo(),
     emqx_types:topic(),
     emqx_types:subopts()
 ) ->
     ok_or_error(emqx_types:reason_code()).
-check_sub(Zone, Topic, SubOpts) ->
+check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
     Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone)),
     Flags = lists:foldl(
         fun
@@ -117,6 +120,8 @@ check_sub(Zone, Topic, SubOpts) ->
                 Map#{is_wildcard => emqx_topic:wildcard(Topic)};
             (shared_subscription, Map) ->
                 Map#{is_shared => maps:is_key(share, SubOpts)};
+            (exclusive_subscription, Map) ->
+                Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)};
             %% Ignore
             (_Key, Map) ->
                 Map
@@ -124,17 +129,26 @@ check_sub(Zone, Topic, SubOpts) ->
         #{},
         maps:keys(Caps)
     ),
-    do_check_sub(Flags, Caps).
+    do_check_sub(Flags, Caps, ClientInfo, Topic).
 
-do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) when
+do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when
     Limit > 0, Levels > Limit
 ->
     {error, ?RC_TOPIC_FILTER_INVALID};
-do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
+do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) ->
     {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
-do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
+do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
     {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
-do_check_sub(_Flags, _Caps) ->
+do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
+    {error, ?RC_TOPIC_FILTER_INVALID};
+do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
+    case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
+        deny ->
+            {error, ?RC_QUOTA_EXCEEDED};
+        _ ->
+            ok
+    end;
+do_check_sub(_Flags, _Caps, _, _) ->
     ok.
 
 get_caps(Zone) ->

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -439,6 +439,14 @@ fields("mqtt") ->
                     desc => ?DESC(mqtt_shared_subscription)
                 }
             )},
+        {"exclusive_subscription",
+            sc(
+                boolean(),
+                #{
+                    default => true,
+                    desc => ?DESC(mqtt_exclusive_subscription)
+                }
+            )},
         {"ignore_loop_deliver",
             sc(
                 boolean(),

+ 7 - 0
apps/emqx/src/emqx_topic.erl

@@ -226,5 +226,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
                 _ -> error({invalid_topic_filter, TopicFilter})
             end
     end;
+parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
+    case Topic of
+        <<>> ->
+            error({invalid_topic_filter, TopicFilter});
+        _ ->
+            {Topic, Options#{is_exclusive => true}}
+    end;
 parse(TopicFilter, Options) ->
     {TopicFilter, Options}.

+ 5 - 4
apps/emqx/test/emqx_mqtt_caps_SUITE.erl

@@ -55,17 +55,18 @@ t_check_sub(_) ->
     emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false),
     emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false),
     timer:sleep(50),
-    ok = emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts),
+    ClientInfo = #{zone => default},
+    ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
     ?assertEqual(
         {error, ?RC_TOPIC_FILTER_INVALID},
-        emqx_mqtt_caps:check_sub(default, <<"a/b/c/d">>, SubOpts)
+        emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)
     ),
     ?assertEqual(
         {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
-        emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts)
+        emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)
     ),
     ?assertEqual(
         {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
-        emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true})
+        emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})
     ),
     emqx_config:put([zones], OldConf).