Преглед на файлове

Merge pull request #10976 from JimMoen/fix-shared-sub-unsub

JimMoen преди 2 години
родител
ревизия
a2015f37ae
променени са 32 файла, в които са добавени 666 реда и са изтрити 367 реда
  1. 1 1
      README-CN.md
  2. 0 3
      apps/emqx/include/emqx.hrl
  3. 14 6
      apps/emqx/include/emqx_mqtt.hrl
  4. 56 34
      apps/emqx/src/emqx_broker.erl
  5. 191 114
      apps/emqx/src/emqx_channel.erl
  6. 32 15
      apps/emqx/src/emqx_mountpoint.erl
  7. 12 3
      apps/emqx/src/emqx_mqtt_caps.erl
  8. 1 0
      apps/emqx/src/emqx_reason_codes.erl
  9. 9 3
      apps/emqx/src/emqx_session.erl
  10. 1 1
      apps/emqx/src/emqx_session_mem.erl
  11. 7 11
      apps/emqx/src/emqx_shared_sub.erl
  12. 69 20
      apps/emqx/src/emqx_topic.erl
  13. 8 2
      apps/emqx/src/emqx_types.erl
  14. 8 25
      apps/emqx/test/emqx_broker_SUITE.erl
  15. 9 2
      apps/emqx/test/emqx_channel_SUITE.erl
  16. 39 0
      apps/emqx/test/emqx_mountpoint_SUITE.erl
  17. 3 1
      apps/emqx/test/emqx_mqtt_caps_SUITE.erl
  18. 130 67
      apps/emqx/test/emqx_shared_sub_SUITE.erl
  19. 8 4
      apps/emqx/test/emqx_topic_SUITE.erl
  20. 1 1
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl
  21. 4 1
      apps/emqx_exhook/priv/protos/exhook.proto
  22. 1 1
      apps/emqx_exhook/src/emqx_exhook.app.src
  23. 11 5
      apps/emqx_exhook/src/emqx_exhook_handler.erl
  24. 1 2
      apps/emqx_exhook/test/props/prop_exhook_hooks.erl
  25. 2 9
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  26. 21 20
      apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl
  27. 6 12
      apps/emqx_management/src/emqx_mgmt_api_topics.erl
  28. 2 0
      apps/emqx_modules/src/emqx_rewrite.erl
  29. 2 1
      apps/emqx_retainer/src/emqx_retainer.erl
  30. 6 2
      apps/emqx_rule_engine/src/emqx_rule_events.erl
  31. 2 1
      apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl
  32. 9 0
      changes/ce/fix-10976.en.md

+ 1 - 1
README-CN.md

@@ -77,7 +77,7 @@ EMQX Cloud 文档:[docs.emqx.com/zh/cloud/latest/](https://docs.emqx.com/zh/cl
 
   优雅的跨平台 MQTT 5.0 客户端工具,提供了桌面端、命令行、Web 三种版本,帮助您更快的开发和调试 MQTT 服务和应用。
 
-- [车联网平台搭建从入门到精通 ](https://www.emqx.com/zh/blog/category/internet-of-vehicles)
+- [车联网平台搭建从入门到精通](https://www.emqx.com/zh/blog/category/internet-of-vehicles)
 
   结合 EMQ 在车联网领域的实践经验,从协议选择等理论知识,到平台架构设计等实战操作,分享如何搭建一个可靠、高效、符合行业场景需求的车联网平台。
 

+ 0 - 3
apps/emqx/include/emqx.hrl

@@ -39,9 +39,6 @@
 %% System topic
 -define(SYSTOP, <<"$SYS/">>).
 
-%% Queue topic
--define(QUEUE, <<"$queue/">>).
-
 %%--------------------------------------------------------------------
 %% alarms
 %%--------------------------------------------------------------------

+ 14 - 6
apps/emqx/include/emqx_mqtt.hrl

@@ -55,6 +55,17 @@
 %% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3]
 -define(MAX_TOPIC_LEN, 65535).
 
+%%--------------------------------------------------------------------
+%% MQTT Share-Sub Internal
+%%--------------------------------------------------------------------
+
+-record(share, {group :: emqx_types:group(), topic :: emqx_types:topic()}).
+
+%% guards
+-define(IS_TOPIC(T),
+    (is_binary(T) orelse is_record(T, share))
+).
+
 %%--------------------------------------------------------------------
 %% MQTT QoS Levels
 %%--------------------------------------------------------------------
@@ -661,13 +672,10 @@ end).
 -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}).
 
 -define(SHARE, "$share").
+-define(QUEUE, "$queue").
 -define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
--define(IS_SHARE(Topic),
-    case Topic of
-        <<?SHARE, _/binary>> -> true;
-        _ -> false
-    end
-).
+
+-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
 
 -define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty).
 -define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty).

+ 56 - 34
apps/emqx/src/emqx_broker.erl

@@ -118,18 +118,20 @@ create_tabs() ->
 %% Subscribe API
 %%------------------------------------------------------------------------------
 
--spec subscribe(emqx_types:topic()) -> ok.
-subscribe(Topic) when is_binary(Topic) ->
+-spec subscribe(emqx_types:topic() | emqx_types:share()) -> ok.
+subscribe(Topic) when ?IS_TOPIC(Topic) ->
     subscribe(Topic, undefined).
 
--spec subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok.
-subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) ->
+-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid() | emqx_types:subopts()) ->
+    ok.
+subscribe(Topic, SubId) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId) ->
     subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
-subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
+subscribe(Topic, SubOpts) when ?IS_TOPIC(Topic), is_map(SubOpts) ->
     subscribe(Topic, undefined, SubOpts).
 
--spec subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok.
-subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
+-spec subscribe(emqx_types:topic() | emqx_types:share(), emqx_types:subid(), emqx_types:subopts()) ->
+    ok.
+subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
     SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
     _ = emqx_trace:subscribe(Topic, SubId, SubOpts),
     SubPid = self(),
@@ -151,13 +153,13 @@ with_subid(undefined, SubOpts) ->
 with_subid(SubId, SubOpts) ->
     maps:put(subid, SubId, SubOpts).
 
-%% @private
 do_subscribe(Topic, SubPid, SubOpts) ->
     true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
-    Group = maps:get(share, SubOpts, undefined),
-    do_subscribe(Group, Topic, SubPid, SubOpts).
+    do_subscribe2(Topic, SubPid, SubOpts).
 
-do_subscribe(undefined, Topic, SubPid, SubOpts) ->
+do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) ->
+    %% FIXME: subscribe shard bug
+    %% https://emqx.atlassian.net/browse/EMQX-10214
     case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
         0 ->
             true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
@@ -168,34 +170,40 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) ->
             true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
             call(pick({Topic, I}), {subscribe, Topic, I})
     end;
-%% Shared subscription
-do_subscribe(Group, Topic, SubPid, SubOpts) ->
+do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
+    is_binary(RealTopic)
+->
     true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
-    emqx_shared_sub:subscribe(Group, Topic, SubPid).
+    emqx_shared_sub:subscribe(Group, RealTopic, SubPid).
 
 %%--------------------------------------------------------------------
 %% Unsubscribe API
 %%--------------------------------------------------------------------
 
--spec unsubscribe(emqx_types:topic()) -> ok.
-unsubscribe(Topic) when is_binary(Topic) ->
+-spec unsubscribe(emqx_types:topic() | emqx_types:share()) -> ok.
+unsubscribe(Topic) when ?IS_TOPIC(Topic) ->
     SubPid = self(),
     case ets:lookup(?SUBOPTION, {Topic, SubPid}) of
         [{_, SubOpts}] ->
-            _ = emqx_broker_helper:reclaim_seq(Topic),
             _ = emqx_trace:unsubscribe(Topic, SubOpts),
             do_unsubscribe(Topic, SubPid, SubOpts);
         [] ->
             ok
     end.
 
+-spec do_unsubscribe(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) ->
+    ok.
 do_unsubscribe(Topic, SubPid, SubOpts) ->
     true = ets:delete(?SUBOPTION, {Topic, SubPid}),
     true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
-    Group = maps:get(share, SubOpts, undefined),
-    do_unsubscribe(Group, Topic, SubPid, SubOpts).
-
-do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
+    do_unsubscribe2(Topic, SubPid, SubOpts).
+
+-spec do_unsubscribe2(emqx_types:topic() | emqx_types:share(), pid(), emqx_types:subopts()) ->
+    ok.
+do_unsubscribe2(Topic, SubPid, SubOpts) when
+    is_binary(Topic), is_pid(SubPid), is_map(SubOpts)
+->
+    _ = emqx_broker_helper:reclaim_seq(Topic),
     case maps:get(shard, SubOpts, 0) of
         0 ->
             true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
@@ -205,7 +213,9 @@ do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
             true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
             cast(pick({Topic, I}), {unsubscribed, Topic, I})
     end;
-do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
+do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
+    is_binary(Group), is_binary(Topic), is_pid(SubPid)
+->
     emqx_shared_sub:unsubscribe(Group, Topic, SubPid).
 
 %%--------------------------------------------------------------------
@@ -306,7 +316,9 @@ aggre([], true, Acc) ->
     lists:usort(Acc).
 
 %% @doc Forward message to another node.
--spec forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode :: sync | async) ->
+-spec forward(
+    node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async
+) ->
     emqx_types:deliver_result().
 forward(Node, To, Delivery, async) ->
     true = emqx_broker_proto_v1:forward_async(Node, To, Delivery),
@@ -329,7 +341,8 @@ forward(Node, To, Delivery, sync) ->
             Result
     end.
 
--spec dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
+-spec dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) ->
+    emqx_types:deliver_result().
 dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     case emqx:is_running() of
         true ->
@@ -353,7 +366,11 @@ inc_dropped_cnt(Msg) ->
     end.
 
 -compile({inline, [subscribers/1]}).
--spec subscribers(emqx_types:topic() | {shard, emqx_types:topic(), non_neg_integer()}) ->
+-spec subscribers(
+    emqx_types:topic()
+    | emqx_types:share()
+    | {shard, emqx_types:topic() | emqx_types:share(), non_neg_integer()}
+) ->
     [pid()].
 subscribers(Topic) when is_binary(Topic) ->
     lookup_value(?SUBSCRIBER, Topic, []);
@@ -372,7 +389,7 @@ subscriber_down(SubPid) ->
                 SubOpts when is_map(SubOpts) ->
                     _ = emqx_broker_helper:reclaim_seq(Topic),
                     true = ets:delete(?SUBOPTION, {Topic, SubPid}),
-                    do_unsubscribe(undefined, Topic, SubPid, SubOpts);
+                    do_unsubscribe2(Topic, SubPid, SubOpts);
                 undefined ->
                     ok
             end
@@ -386,7 +403,7 @@ subscriber_down(SubPid) ->
 %%--------------------------------------------------------------------
 
 -spec subscriptions(pid() | emqx_types:subid()) ->
-    [{emqx_types:topic(), emqx_types:subopts()}].
+    [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}].
 subscriptions(SubPid) when is_pid(SubPid) ->
     [
         {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})}
@@ -400,20 +417,22 @@ subscriptions(SubId) ->
             []
     end.
 
--spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()].
+-spec subscriptions_via_topic(emqx_types:topic() | emqx_types:share()) -> [emqx_types:subopts()].
 subscriptions_via_topic(Topic) ->
     MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}],
     ets:select(?SUBOPTION, MatchSpec).
 
--spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean().
+-spec subscribed(
+    pid() | emqx_types:subid(), emqx_types:topic() | emqx_types:share()
+) -> boolean().
 subscribed(SubPid, Topic) when is_pid(SubPid) ->
     ets:member(?SUBOPTION, {Topic, SubPid});
 subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
     SubPid = emqx_broker_helper:lookup_subpid(SubId),
     ets:member(?SUBOPTION, {Topic, SubPid}).
 
--spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()).
-get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
+-spec get_subopts(pid(), emqx_types:topic() | emqx_types:share()) -> maybe(emqx_types:subopts()).
+get_subopts(SubPid, Topic) when is_pid(SubPid), ?IS_TOPIC(Topic) ->
     lookup_value(?SUBOPTION, {Topic, SubPid});
 get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
     case emqx_broker_helper:lookup_subpid(SubId) of
@@ -423,7 +442,7 @@ get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
             undefined
     end.
 
--spec set_subopts(emqx_types:topic(), emqx_types:subopts()) -> boolean().
+-spec set_subopts(emqx_types:topic() | emqx_types:share(), emqx_types:subopts()) -> boolean().
 set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
     set_subopts(self(), Topic, NewOpts).
 
@@ -437,7 +456,7 @@ set_subopts(SubPid, Topic, NewOpts) ->
             false
     end.
 
--spec topics() -> [emqx_types:topic()].
+-spec topics() -> [emqx_types:topic() | emqx_types:share()].
 topics() ->
     emqx_router:topics().
 
@@ -542,7 +561,8 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
--spec do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
+-spec do_dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) ->
+    emqx_types:deliver_result().
 do_dispatch(Topic, #delivery{message = Msg}) ->
     DispN = lists:foldl(
         fun(Sub, N) ->
@@ -560,6 +580,8 @@ do_dispatch(Topic, #delivery{message = Msg}) ->
             {ok, DispN}
     end.
 
+%% Donot dispatch to share subscriber here.
+%% we do it in `emqx_shared_sub.erl` with configured strategy
 do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
         true ->

+ 191 - 114
apps/emqx/src/emqx_channel.erl

@@ -476,60 +476,27 @@ handle_in(
             ok = emqx_metrics:inc('packets.pubcomp.missed'),
             {ok, Channel}
     end;
-handle_in(
-    SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
-    Channel = #channel{clientinfo = ClientInfo}
-) ->
-    case emqx_packet:check(SubPkt) of
-        ok ->
-            TopicFilters0 = parse_topic_filters(TopicFilters),
-            TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
-            TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
-            HasAuthzDeny = lists:any(
-                fun({_TopicFilter, ReasonCode}) ->
-                    ReasonCode =:= ?RC_NOT_AUTHORIZED
-                end,
-                TupleTopicFilters0
-            ),
-            DenyAction = emqx:get_config([authorization, deny_action], ignore),
-            case DenyAction =:= disconnect andalso HasAuthzDeny of
-                true ->
-                    handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
-                false ->
-                    TopicFilters2 = [
-                        TopicFilter
-                     || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0
-                    ],
-                    TopicFilters3 = run_hooks(
-                        'client.subscribe',
-                        [ClientInfo, Properties],
-                        TopicFilters2
-                    ),
-                    {TupleTopicFilters1, NChannel} = process_subscribe(
-                        TopicFilters3,
-                        Properties,
-                        Channel
-                    ),
-                    TupleTopicFilters2 =
-                        lists:foldl(
-                            fun
-                                ({{Topic, Opts = #{deny_subscription := true}}, _QoS}, Acc) ->
-                                    Key = {Topic, maps:without([deny_subscription], Opts)},
-                                    lists:keyreplace(Key, 1, Acc, {Key, ?RC_UNSPECIFIED_ERROR});
-                                (Tuple = {Key, _Value}, Acc) ->
-                                    lists:keyreplace(Key, 1, Acc, Tuple)
-                            end,
-                            TupleTopicFilters0,
-                            TupleTopicFilters1
-                        ),
-                    ReasonCodes2 = [
-                        ReasonCode
-                     || {_TopicFilter, ReasonCode} <- TupleTopicFilters2
-                    ],
-                    handle_out(suback, {PacketId, ReasonCodes2}, NChannel)
-            end;
-        {error, ReasonCode} ->
-            handle_out(disconnect, ReasonCode, Channel)
+handle_in(SubPkt = ?SUBSCRIBE_PACKET(PacketId, _Properties, _TopicFilters0), Channel0) ->
+    Pipe = pipeline(
+        [
+            fun check_subscribe/2,
+            fun enrich_subscribe/2,
+            %% TODO && FIXME (EMQX-10786): mount topic before authz check.
+            fun check_sub_authzs/2,
+            fun check_sub_caps/2
+        ],
+        SubPkt,
+        Channel0
+    ),
+    case Pipe of
+        {ok, NPkt = ?SUBSCRIBE_PACKET(_PacketId, TFChecked), Channel} ->
+            {TFSubedWithNRC, NChannel} = process_subscribe(run_sub_hooks(NPkt, Channel), Channel),
+            ReasonCodes = gen_reason_codes(TFChecked, TFSubedWithNRC),
+            handle_out(suback, {PacketId, ReasonCodes}, NChannel);
+        {error, {disconnect, RC}, Channel} ->
+            %% funcs in pipeline always cause action: `disconnect`
+            %% And Only one ReasonCode in DISCONNECT packet
+            handle_out(disconnect, RC, Channel)
     end;
 handle_in(
     Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
@@ -540,7 +507,7 @@ handle_in(
             TopicFilters1 = run_hooks(
                 'client.unsubscribe',
                 [ClientInfo, Properties],
-                parse_topic_filters(TopicFilters)
+                parse_raw_topic_filters(TopicFilters)
             ),
             {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
             handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
@@ -782,32 +749,14 @@ after_message_acked(ClientInfo, Msg, PubAckProps) ->
 %% Process Subscribe
 %%--------------------------------------------------------------------
 
--compile({inline, [process_subscribe/3]}).
-process_subscribe(TopicFilters, SubProps, Channel) ->
-    process_subscribe(TopicFilters, SubProps, Channel, []).
+process_subscribe(TopicFilters, Channel) ->
+    process_subscribe(TopicFilters, Channel, []).
 
-process_subscribe([], _SubProps, Channel, Acc) ->
+process_subscribe([], Channel, Acc) ->
     {lists:reverse(Acc), Channel};
-process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) ->
-    case check_sub_caps(TopicFilter, SubOpts, Channel) of
-        ok ->
-            {ReasonCode, NChannel} = do_subscribe(
-                TopicFilter,
-                SubOpts#{sub_props => SubProps},
-                Channel
-            ),
-            process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]);
-        {error, ReasonCode} ->
-            ?SLOG(
-                warning,
-                #{
-                    msg => "cannot_subscribe_topic_filter",
-                    reason => emqx_reason_codes:name(ReasonCode)
-                },
-                #{topic => TopicFilter}
-            ),
-            process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
-    end.
+process_subscribe([Filter = {TopicFilter, SubOpts} | More], Channel, Acc) ->
+    {NReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
+    process_subscribe(More, NChannel, [{Filter, NReasonCode} | Acc]).
 
 do_subscribe(
     TopicFilter,
@@ -818,11 +767,13 @@ do_subscribe(
             session = Session
         }
 ) ->
+    %% TODO && FIXME (EMQX-10786): mount topic before authz check.
     NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
-    NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
-    case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
+    case emqx_session:subscribe(ClientInfo, NTopicFilter, SubOpts, Session) of
         {ok, NSession} ->
-            {QoS, Channel#channel{session = NSession}};
+            %% TODO && FIXME (EMQX-11216): QoS as ReasonCode(max granted QoS) for now
+            RC = QoS,
+            {RC, Channel#channel{session = NSession}};
         {error, RC} ->
             ?SLOG(
                 warning,
@@ -835,6 +786,30 @@ do_subscribe(
             {RC, Channel}
     end.
 
+gen_reason_codes(TFChecked, TFSubedWitNhRC) ->
+    do_gen_reason_codes([], TFChecked, TFSubedWitNhRC).
+
+%% Initial RC is `RC_SUCCESS | RC_NOT_AUTHORIZED`, generated by check_sub_authzs/2
+%% And then TF with `RC_SUCCESS` will passing through `process_subscribe/2` and
+%% NRC should override the initial RC.
+do_gen_reason_codes(Acc, [], []) ->
+    lists:reverse(Acc);
+do_gen_reason_codes(
+    Acc,
+    [{_, ?RC_SUCCESS} | RestTF],
+    [{_, NRC} | RestWithNRC]
+) ->
+    %% will passing through `process_subscribe/2`
+    %% use NRC to override IintialRC
+    do_gen_reason_codes([NRC | Acc], RestTF, RestWithNRC);
+do_gen_reason_codes(
+    Acc,
+    [{_, InitialRC} | Rest],
+    RestWithNRC
+) ->
+    %% InitialRC is not `RC_SUCCESS`, use it.
+    do_gen_reason_codes([InitialRC | Acc], Rest, RestWithNRC).
+
 %%--------------------------------------------------------------------
 %% Process Unsubscribe
 %%--------------------------------------------------------------------
@@ -1213,13 +1188,8 @@ handle_call(Req, Channel) ->
     ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
 
 handle_info({subscribe, TopicFilters}, Channel) ->
-    {_, NChannel} = lists:foldl(
-        fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
-            do_subscribe(TopicFilter, SubOpts, ChannelAcc)
-        end,
-        {[], Channel},
-        parse_topic_filters(TopicFilters)
-    ),
+    NTopicFilters = enrich_subscribe(TopicFilters, Channel),
+    {_TopicFiltersWithRC, NChannel} = process_subscribe(NTopicFilters, Channel),
     {ok, NChannel};
 handle_info({unsubscribe, TopicFilters}, Channel) ->
     {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
@@ -1858,48 +1828,155 @@ check_pub_caps(
     emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
 
 %%--------------------------------------------------------------------
-%% Check Sub Authorization
+%% Check Subscribe Packet
+
+check_subscribe(SubPkt, _Channel) ->
+    case emqx_packet:check(SubPkt) of
+        ok -> ok;
+        {error, RC} -> {error, {disconnect, RC}}
+    end.
 
-check_sub_authzs(TopicFilters, Channel) ->
-    check_sub_authzs(TopicFilters, Channel, []).
+%%--------------------------------------------------------------------
+%% Check Sub Authorization
 
 check_sub_authzs(
-    [TopicFilter = {Topic, _} | More],
-    Channel = #channel{clientinfo = ClientInfo},
-    Acc
+    ?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters0),
+    Channel = #channel{clientinfo = ClientInfo}
 ) ->
+    CheckResult = do_check_sub_authzs(TopicFilters0, ClientInfo),
+    HasAuthzDeny = lists:any(
+        fun({{_TopicFilter, _SubOpts}, ReasonCode}) ->
+            ReasonCode =:= ?RC_NOT_AUTHORIZED
+        end,
+        CheckResult
+    ),
+    DenyAction = emqx:get_config([authorization, deny_action], ignore),
+    case DenyAction =:= disconnect andalso HasAuthzDeny of
+        true ->
+            {error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel};
+        false ->
+            {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}
+    end.
+
+do_check_sub_authzs(TopicFilters, ClientInfo) ->
+    do_check_sub_authzs(ClientInfo, TopicFilters, []).
+
+do_check_sub_authzs(_ClientInfo, [], Acc) ->
+    lists:reverse(Acc);
+do_check_sub_authzs(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc) ->
+    %% subsclibe authz check only cares the real topic filter when shared-sub
+    %% e.g. only check <<"t/#">> for <<"$share/g/t/#">>
     Action = authz_action(TopicFilter),
-    case emqx_access_control:authorize(ClientInfo, Action, Topic) of
+    case
+        emqx_access_control:authorize(
+            ClientInfo,
+            Action,
+            emqx_topic:get_shared_real_topic(Topic)
+        )
+    of
+        %% TODO: support maximum QoS granted
+        %% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7]
+        %% Not implemented yet:
+        %% {allow, RC} -> do_check_sub_authzs(ClientInfo, More, [{TopicFilter, RC} | Acc]);
         allow ->
-            check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
+            do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_SUCCESS} | Acc]);
         deny ->
-            check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
-    end;
-check_sub_authzs([], _Channel, Acc) ->
-    lists:reverse(Acc).
+            do_check_sub_authzs(ClientInfo, More, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
+    end.
 
 %%--------------------------------------------------------------------
 %% Check Sub Caps
 
-check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
-    emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
+check_sub_caps(
+    ?SUBSCRIBE_PACKET(PacketId, SubProps, TopicFilters),
+    Channel = #channel{clientinfo = ClientInfo}
+) ->
+    CheckResult = do_check_sub_caps(ClientInfo, TopicFilters),
+    {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}.
+
+do_check_sub_caps(ClientInfo, TopicFilters) ->
+    do_check_sub_caps(ClientInfo, TopicFilters, []).
+
+do_check_sub_caps(_ClientInfo, [], Acc) ->
+    lists:reverse(Acc);
+do_check_sub_caps(ClientInfo, [TopicFilter = {{Topic, SubOpts}, ?RC_SUCCESS} | More], Acc) ->
+    case emqx_mqtt_caps:check_sub(ClientInfo, Topic, SubOpts) of
+        ok ->
+            do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]);
+        {error, NRC} ->
+            ?SLOG(
+                warning,
+                #{
+                    msg => "cannot_subscribe_topic_filter",
+                    reason => emqx_reason_codes:name(NRC)
+                },
+                #{topic => Topic}
+            ),
+            do_check_sub_caps(ClientInfo, More, [{{Topic, SubOpts}, NRC} | Acc])
+    end;
+do_check_sub_caps(ClientInfo, [TopicFilter = {{_Topic, _SubOpts}, _OtherRC} | More], Acc) ->
+    do_check_sub_caps(ClientInfo, More, [TopicFilter | Acc]).
+
+%%--------------------------------------------------------------------
+%% Run Subscribe Hooks
+
+run_sub_hooks(
+    ?SUBSCRIBE_PACKET(_PacketId, Properties, TopicFilters0),
+    _Channel = #channel{clientinfo = ClientInfo}
+) ->
+    TopicFilters = [
+        TopicFilter
+     || {TopicFilter, ?RC_SUCCESS} <- TopicFilters0
+    ],
+    _NTopicFilters = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters).
 
 %%--------------------------------------------------------------------
-%% Enrich SubId
+%% Enrich SubOpts
 
-enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
+%% for api subscribe without sub-authz check and sub-caps check.
+enrich_subscribe(TopicFilters, Channel) when is_list(TopicFilters) ->
+    do_enrich_subscribe(#{}, TopicFilters, Channel);
+%% for mqtt clients sent subscribe packet.
+enrich_subscribe(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel) ->
+    NTopicFilters = do_enrich_subscribe(Properties, TopicFilters, Channel),
+    {ok, ?SUBSCRIBE_PACKET(PacketId, Properties, NTopicFilters), Channel}.
+
+do_enrich_subscribe(Properties, TopicFilters, Channel) ->
+    _NTopicFilters = run_fold(
+        [
+            %% TODO: do try catch with reason code here
+            fun(TFs, _) -> parse_raw_topic_filters(TFs) end,
+            fun enrich_subopts_subid/2,
+            fun enrich_subopts_porps/2,
+            fun enrich_subopts_flags/2
+        ],
+        TopicFilters,
+        #{sub_props => Properties, channel => Channel}
+    ).
+
+enrich_subopts_subid(TopicFilters, #{sub_props := #{'Subscription-Identifier' := SubId}}) ->
     [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
-enrich_subopts_subid(_Properties, TopicFilters) ->
+enrich_subopts_subid(TopicFilters, _State) ->
     TopicFilters.
 
-%%--------------------------------------------------------------------
-%% Enrich SubOpts
+enrich_subopts_porps(TopicFilters, #{sub_props := SubProps}) ->
+    [{Topic, SubOpts#{sub_props => SubProps}} || {Topic, SubOpts} <- TopicFilters].
+
+enrich_subopts_flags(TopicFilters, #{channel := Channel}) ->
+    do_enrich_subopts_flags(TopicFilters, Channel).
 
-enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
-    SubOpts;
-enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
+do_enrich_subopts_flags(TopicFilters, ?IS_MQTT_V5) ->
+    [{Topic, merge_default_subopts(SubOpts)} || {Topic, SubOpts} <- TopicFilters];
+do_enrich_subopts_flags(TopicFilters, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
+    Rap = flag(IsBridge),
     NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)),
-    SubOpts#{rap => flag(IsBridge), nl => NL}.
+    [
+        {Topic, (merge_default_subopts(SubOpts))#{rap => Rap, nl => NL}}
+     || {Topic, SubOpts} <- TopicFilters
+    ].
+
+merge_default_subopts(SubOpts) ->
+    maps:merge(?DEFAULT_SUBOPTS, SubOpts).
 
 %%--------------------------------------------------------------------
 %% Enrich ConnAck Caps
@@ -2089,8 +2166,8 @@ maybe_shutdown(Reason, _Intent = shutdown, Channel) ->
 %%--------------------------------------------------------------------
 %% Parse Topic Filters
 
--compile({inline, [parse_topic_filters/1]}).
-parse_topic_filters(TopicFilters) ->
+%% [{<<"$share/group/topic">>, _SubOpts = #{}} | _]
+parse_raw_topic_filters(TopicFilters) ->
     lists:map(fun emqx_topic:parse/1, TopicFilters).
 
 %%--------------------------------------------------------------------

+ 32 - 15
apps/emqx/src/emqx_mountpoint.erl

@@ -17,6 +17,7 @@
 -module(emqx_mountpoint).
 
 -include("emqx.hrl").
+-include("emqx_mqtt.hrl").
 -include("emqx_placeholder.hrl").
 -include("types.hrl").
 
@@ -34,38 +35,54 @@
 -spec mount(maybe(mountpoint()), Any) -> Any when
     Any ::
         emqx_types:topic()
+        | emqx_types:share()
         | emqx_types:message()
         | emqx_types:topic_filters().
 mount(undefined, Any) ->
     Any;
-mount(MountPoint, Topic) when is_binary(Topic) ->
-    prefix(MountPoint, Topic);
-mount(MountPoint, Msg = #message{topic = Topic}) ->
-    Msg#message{topic = prefix(MountPoint, Topic)};
+mount(MountPoint, Topic) when ?IS_TOPIC(Topic) ->
+    prefix_maybe_share(MountPoint, Topic);
+mount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) ->
+    Msg#message{topic = prefix_maybe_share(MountPoint, Topic)};
 mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
-    [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
+    [{prefix_maybe_share(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
 
-%% @private
--compile({inline, [prefix/2]}).
-prefix(MountPoint, Topic) ->
-    <<MountPoint/binary, Topic/binary>>.
+-spec prefix_maybe_share(maybe(mountpoint()), Any) -> Any when
+    Any ::
+        emqx_types:topic()
+        | emqx_types:share().
+prefix_maybe_share(MountPoint, Topic) when
+    is_binary(MountPoint) andalso is_binary(Topic)
+->
+    <<MountPoint/binary, Topic/binary>>;
+prefix_maybe_share(MountPoint, #share{group = Group, topic = Topic}) when
+    is_binary(MountPoint) andalso is_binary(Topic)
+->
+    #share{group = Group, topic = prefix_maybe_share(MountPoint, Topic)}.
 
 -spec unmount(maybe(mountpoint()), Any) -> Any when
     Any ::
         emqx_types:topic()
+        | emqx_types:share()
         | emqx_types:message().
 unmount(undefined, Any) ->
     Any;
-unmount(MountPoint, Topic) when is_binary(Topic) ->
+unmount(MountPoint, Topic) when ?IS_TOPIC(Topic) ->
+    unmount_maybe_share(MountPoint, Topic);
+unmount(MountPoint, Msg = #message{topic = Topic}) when is_binary(Topic) ->
+    Msg#message{topic = unmount_maybe_share(MountPoint, Topic)}.
+
+unmount_maybe_share(MountPoint, Topic) when
+    is_binary(MountPoint) andalso is_binary(Topic)
+->
     case string:prefix(Topic, MountPoint) of
         nomatch -> Topic;
         Topic1 -> Topic1
     end;
-unmount(MountPoint, Msg = #message{topic = Topic}) ->
-    case string:prefix(Topic, MountPoint) of
-        nomatch -> Msg;
-        Topic1 -> Msg#message{topic = Topic1}
-    end.
+unmount_maybe_share(MountPoint, TopicFilter = #share{topic = Topic}) when
+    is_binary(MountPoint) andalso is_binary(Topic)
+->
+    TopicFilter#share{topic = unmount_maybe_share(MountPoint, Topic)}.
 
 -spec replvar(maybe(mountpoint()), map()) -> maybe(mountpoint()).
 replvar(undefined, _Vars) ->

+ 12 - 3
apps/emqx/src/emqx_mqtt_caps.erl

@@ -102,16 +102,19 @@ do_check_pub(_Flags, _Caps) ->
 
 -spec check_sub(
     emqx_types:clientinfo(),
-    emqx_types:topic(),
+    emqx_types:topic() | emqx_types:share(),
     emqx_types:subopts()
 ) ->
     ok_or_error(emqx_types:reason_code()).
 check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
     Caps = emqx_config:get_zone_conf(Zone, [mqtt]),
     Flags = #{
+        %% TODO: qos check
+        %% (max_qos_allowed, Map) ->
+        %% max_qos_allowed => maps:get(max_qos_allowed, Caps, 2),
         topic_levels => emqx_topic:levels(Topic),
         is_wildcard => emqx_topic:wildcard(Topic),
-        is_shared => maps:is_key(share, SubOpts),
+        is_shared => erlang:is_record(Topic, share),
         is_exclusive => maps:get(is_exclusive, SubOpts, false)
     },
     do_check_sub(Flags, Caps, ClientInfo, Topic).
@@ -126,13 +129,19 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
     {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
 do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
     {error, ?RC_TOPIC_FILTER_INVALID};
-do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
+do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) when
+    is_binary(Topic)
+->
     case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
         deny ->
             {error, ?RC_QUOTA_EXCEEDED};
         _ ->
             ok
     end;
+%% for max_qos_allowed
+%% see: RC_GRANTED_QOS_0, RC_GRANTED_QOS_1, RC_GRANTED_QOS_2
+%% do_check_sub(_, _) ->
+%%     {ok, RC};
 do_check_sub(_Flags, _Caps, _, _) ->
     ok.
 

+ 1 - 0
apps/emqx/src/emqx_reason_codes.erl

@@ -177,6 +177,7 @@ compat(connack, 16#9D) -> ?CONNACK_SERVER;
 compat(connack, 16#9F) -> ?CONNACK_SERVER;
 compat(suback, Code) when Code =< ?QOS_2 -> Code;
 compat(suback, Code) when Code >= 16#80 -> 16#80;
+%% TODO: 16#80(qos0) 16#81(qos1) 16#82(qos2) for mqtt-v3.1.1
 compat(unsuback, _Code) -> undefined;
 compat(_Other, _Code) -> undefined.
 

+ 9 - 3
apps/emqx/src/emqx_session.erl

@@ -259,7 +259,7 @@ destroy(Session) ->
 
 -spec subscribe(
     clientinfo(),
-    emqx_types:topic(),
+    emqx_types:topic() | emqx_types:share(),
     emqx_types:subopts(),
     t()
 ) ->
@@ -279,7 +279,7 @@ subscribe(ClientInfo, TopicFilter, SubOpts, Session) ->
 
 -spec unsubscribe(
     clientinfo(),
-    emqx_types:topic(),
+    emqx_types:topic() | emqx_types:share(),
     emqx_types:subopts(),
     t()
 ) ->
@@ -410,7 +410,13 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) ->
     end.
 
 enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
-    SubOpts = ?IMPL(Session):get_subscription(Topic, Session),
+    SubOpts =
+        case Msg of
+            #message{headers = #{redispatch_to := ?REDISPATCH_TO(Group, T)}} ->
+                ?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session);
+            _ ->
+                ?IMPL(Session):get_subscription(Topic, Session)
+        end,
     enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS).
 
 enrich_message(

+ 1 - 1
apps/emqx/src/emqx_session_mem.erl

@@ -314,7 +314,7 @@ unsubscribe(
             {error, ?RC_NO_SUBSCRIPTION_EXISTED}
     end.
 
--spec get_subscription(emqx_types:topic(), session()) ->
+-spec get_subscription(emqx_types:topic() | emqx_types:share(), session()) ->
     emqx_types:subopts() | undefined.
 get_subscription(Topic, #session{subscriptions = Subs}) ->
     maps:get(Topic, Subs, undefined).

+ 7 - 11
apps/emqx/src/emqx_shared_sub.erl

@@ -95,7 +95,6 @@
 -define(ACK, shared_sub_ack).
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
--define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
 -define(SUBSCRIBER_DOWN, noproc).
 
 -type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()).
@@ -234,19 +233,16 @@ without_group_ack(Msg) ->
 get_group_ack(Msg) ->
     emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
 
-with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
-    Msg;
+%% always add `redispatch_to` header to the message
+%% for QOS_0 msgs, redispatch_to is not needed and filtered out in is_redispatch_needed/1
 with_redispatch_to(Msg, Group, Topic) ->
     emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
 
-%% @hidden Redispatch is needed only for the messages with redispatch_to header added.
-is_redispatch_needed(#message{} = Msg) ->
-    case get_redispatch_to(Msg) of
-        ?REDISPATCH_TO(_, _) ->
-            true;
-        _ ->
-            false
-    end.
+%% @hidden Redispatch is needed only for the messages which not QOS_0
+is_redispatch_needed(#message{qos = ?QOS_0}) ->
+    false;
+is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) ->
+    true.
 
 %% @doc Redispatch shared deliveries to other members in the group.
 redispatch(Messages0) ->

+ 69 - 20
apps/emqx/src/emqx_topic.erl

@@ -36,9 +36,16 @@
     parse/2
 ]).
 
+-export([
+    maybe_format_share/1,
+    get_shared_real_topic/1,
+    make_shared_record/2
+]).
+
 -type topic() :: emqx_types:topic().
 -type word() :: emqx_types:word().
 -type words() :: emqx_types:words().
+-type share() :: emqx_types:share().
 
 %% Guards
 -define(MULTI_LEVEL_WILDCARD_NOT_LAST(C, REST),
@@ -50,7 +57,9 @@
 %%--------------------------------------------------------------------
 
 %% @doc Is wildcard topic?
--spec wildcard(topic() | words()) -> true | false.
+-spec wildcard(topic() | share() | words()) -> true | false.
+wildcard(#share{topic = Topic}) when is_binary(Topic) ->
+    wildcard(Topic);
 wildcard(Topic) when is_binary(Topic) ->
     wildcard(words(Topic));
 wildcard([]) ->
@@ -64,7 +73,7 @@ wildcard([_H | T]) ->
 
 %% @doc Match Topic name with filter.
 -spec match(Name, Filter) -> boolean() when
-    Name :: topic() | words(),
+    Name :: topic() | share() | words(),
     Filter :: topic() | words().
 match(<<$$, _/binary>>, <<$+, _/binary>>) ->
     false;
@@ -72,6 +81,8 @@ match(<<$$, _/binary>>, <<$#, _/binary>>) ->
     false;
 match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
     match(words(Name), words(Filter));
+match(#share{} = Name, Filter) ->
+    match_share(Name, Filter);
 match([], []) ->
     true;
 match([H | T1], [H | T2]) ->
@@ -87,12 +98,26 @@ match([_H1 | _], []) ->
 match([], [_H | _T2]) ->
     false.
 
+-spec match_share(Name, Filter) -> boolean() when
+    Name :: share(),
+    Filter :: topic() | share().
+match_share(#share{topic = Name}, Filter) when is_binary(Filter) ->
+    %% only match real topic filter for normal topic filter.
+    match(words(Name), words(Filter));
+match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = Filter}) ->
+    %% Matching real topic filter When subed same share group.
+    match(words(Name), words(Filter));
+match_share(#share{}, _) ->
+    %% Otherwise, non-matched.
+    false.
+
 -spec match_any(Name, [Filter]) -> boolean() when
     Name :: topic() | words(),
     Filter :: topic() | words().
 match_any(Topic, Filters) ->
     lists:any(fun(Filter) -> match(Topic, Filter) end, Filters).
 
+%% TODO: validate share topic #share{} for emqx_trace.erl
 %% @doc Validate topic name or filter
 -spec validate(topic() | {name | filter, topic()}) -> true.
 validate(Topic) when is_binary(Topic) ->
@@ -107,7 +132,7 @@ validate(_, <<>>) ->
 validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
     %% MQTT-5.0 [MQTT-4.7.3-3]
     error(topic_too_long);
-validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) ->
+validate(filter, SharedFilter = <<?SHARE, "/", _Rest/binary>>) ->
     validate_share(SharedFilter);
 validate(filter, Filter) when is_binary(Filter) ->
     validate2(words(Filter));
@@ -139,12 +164,12 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
 validate3(<<_/utf8, Rest/binary>>) ->
     validate3(Rest).
 
-validate_share(<<"$share/", Rest/binary>>) when
+validate_share(<<?SHARE, "/", Rest/binary>>) when
     Rest =:= <<>> orelse Rest =:= <<"/">>
 ->
     %% MQTT-5.0 [MQTT-4.8.2-1]
     error(?SHARE_EMPTY_FILTER);
-validate_share(<<"$share/", Rest/binary>>) ->
+validate_share(<<?SHARE, "/", Rest/binary>>) ->
     case binary:split(Rest, <<"/">>) of
         %% MQTT-5.0 [MQTT-4.8.2-1]
         [<<>>, _] ->
@@ -156,7 +181,7 @@ validate_share(<<"$share/", Rest/binary>>) ->
             validate_share(ShareName, Filter)
     end.
 
-validate_share(_, <<"$share/", _Rest/binary>>) ->
+validate_share(_, <<?SHARE, "/", _Rest/binary>>) ->
     error(?SHARE_RECURSIVELY);
 validate_share(ShareName, Filter) ->
     case binary:match(ShareName, [<<"+">>, <<"#">>]) of
@@ -185,7 +210,9 @@ bin('#') -> <<"#">>;
 bin(B) when is_binary(B) -> B;
 bin(L) when is_list(L) -> list_to_binary(L).
 
--spec levels(topic()) -> pos_integer().
+-spec levels(topic() | share()) -> pos_integer().
+levels(#share{topic = Topic}) when is_binary(Topic) ->
+    levels(Topic);
 levels(Topic) when is_binary(Topic) ->
     length(tokens(Topic)).
 
@@ -197,6 +224,8 @@ tokens(Topic) ->
 
 %% @doc Split Topic Path to Words
 -spec words(topic()) -> words().
+words(#share{topic = Topic}) when is_binary(Topic) ->
+    words(Topic);
 words(Topic) when is_binary(Topic) ->
     [word(W) || W <- tokens(Topic)].
 
@@ -237,26 +266,29 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) ->
 do_join(TopicAcc, [Word | Words]) ->
     do_join(<<TopicAcc/binary, "/", (bin(Word))/binary>>, Words).
 
--spec parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}.
+-spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}.
 parse(TopicFilter) when is_binary(TopicFilter) ->
     parse(TopicFilter, #{});
 parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
     parse(TopicFilter, Options).
 
--spec parse(topic(), map()) -> {topic(), map()}.
-parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
-    error({invalid_topic_filter, TopicFilter});
-parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
-    error({invalid_topic_filter, TopicFilter});
-parse(<<"$queue/", TopicFilter/binary>>, Options) ->
-    parse(TopicFilter, Options#{share => <<"$queue">>});
-parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
+-spec parse(topic() | share(), map()) -> {topic() | share(), map()}.
+%% <<"$queue/[real_topic_filter]>">> equivalent to <<"$share/$queue/[real_topic_filter]">>
+%% So the head of `real_topic_filter` MUST NOT be `<<$queue>>` or `<<$share>>`
+parse(#share{topic = Topic = <<?QUEUE, "/", _/binary>>}, _Options) ->
+    error({invalid_topic_filter, Topic});
+parse(#share{topic = Topic = <<?SHARE, "/", _/binary>>}, _Options) ->
+    error({invalid_topic_filter, Topic});
+parse(<<?QUEUE, "/", Topic/binary>>, Options) ->
+    parse(#share{group = <<?QUEUE>>, topic = Topic}, Options);
+parse(TopicFilter = <<?SHARE, "/", Rest/binary>>, Options) ->
     case binary:split(Rest, <<"/">>) of
         [_Any] ->
             error({invalid_topic_filter, TopicFilter});
-        [ShareName, Filter] ->
-            case binary:match(ShareName, [<<"+">>, <<"#">>]) of
-                nomatch -> parse(Filter, Options#{share => ShareName});
+        %% `Group` could be `$share` or `$queue`
+        [Group, Topic] ->
+            case binary:match(Group, [<<"+">>, <<"#">>]) of
+                nomatch -> parse(#share{group = Group, topic = Topic}, Options);
                 _ -> error({invalid_topic_filter, TopicFilter})
             end
     end;
@@ -267,5 +299,22 @@ parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
         _ ->
             {Topic, Options#{is_exclusive => true}}
     end;
-parse(TopicFilter, Options) ->
+parse(TopicFilter, Options) when
+    ?IS_TOPIC(TopicFilter)
+->
     {TopicFilter, Options}.
+
+get_shared_real_topic(#share{topic = TopicFilter}) ->
+    TopicFilter;
+get_shared_real_topic(TopicFilter) when is_binary(TopicFilter) ->
+    TopicFilter.
+
+make_shared_record(Group, Topic) ->
+    #share{group = Group, topic = Topic}.
+
+maybe_format_share(#share{group = <<?QUEUE>>, topic = Topic}) ->
+    join([<<?QUEUE>>, Topic]);
+maybe_format_share(#share{group = Group, topic = Topic}) ->
+    join([<<?SHARE>>, Group, Topic]);
+maybe_format_share(Topic) ->
+    join([Topic]).

+ 8 - 2
apps/emqx/src/emqx_types.erl

@@ -40,6 +40,10 @@
     words/0
 ]).
 
+-export_type([
+    share/0
+]).
+
 -export_type([
     socktype/0,
     sockstate/0,
@@ -136,11 +140,14 @@
 
 -type subid() :: binary() | atom().
 
--type group() :: binary() | undefined.
+%% '_' for match spec
+-type group() :: binary() | '_'.
 -type topic() :: binary().
 -type word() :: '' | '+' | '#' | binary().
 -type words() :: list(word()).
 
+-type share() :: #share{}.
+
 -type socktype() :: tcp | udp | ssl | proxy | atom().
 -type sockstate() :: idle | running | blocked | closed.
 -type conninfo() :: #{
@@ -207,7 +214,6 @@
     rap := 0 | 1,
     nl := 0 | 1,
     qos := qos(),
-    share => binary(),
     atom() => term()
 }.
 -type reason_code() :: 0..16#FF.

+ 8 - 25
apps/emqx/test/emqx_broker_SUITE.erl

@@ -299,14 +299,19 @@ t_nosub_pub(Config) when is_list(Config) ->
     ?assertEqual(1, emqx_metrics:val('messages.dropped')).
 
 t_shared_subscribe({init, Config}) ->
-    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
+    emqx_broker:subscribe(
+        emqx_topic:make_shared_record(<<"group">>, <<"topic">>), <<"clientid">>, #{}
+    ),
     ct:sleep(100),
     Config;
 t_shared_subscribe(Config) when is_list(Config) ->
     emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
     ?assert(
         receive
-            {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
+            {deliver, <<"topic">>, #message{
+                headers = #{redispatch_to := ?REDISPATCH_TO(<<"group">>, <<"topic">>)},
+                payload = <<"hello">>
+            }} ->
                 true;
             Msg ->
                 ct:pal("Msg: ~p", [Msg]),
@@ -316,7 +321,7 @@ t_shared_subscribe(Config) when is_list(Config) ->
         end
     );
 t_shared_subscribe({'end', _Config}) ->
-    emqx_broker:unsubscribe(<<"$share/group/topic">>).
+    emqx_broker:unsubscribe(emqx_topic:make_shared_record(<<"group">>, <<"topic">>)).
 
 t_shared_subscribe_2({init, Config}) ->
     Config;
@@ -723,24 +728,6 @@ t_connack_auth_error(Config) when is_list(Config) ->
     ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
     ok.
 
-t_handle_in_empty_client_subscribe_hook({init, Config}) ->
-    Hook = {?MODULE, client_subscribe_delete_all_hook, []},
-    ok = emqx_hooks:put('client.subscribe', Hook, _Priority = 100),
-    Config;
-t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
-    emqx_hooks:del('client.subscribe', {?MODULE, client_subscribe_delete_all_hook}),
-    ok;
-t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
-    {ok, C} = emqtt:start_link(),
-    {ok, _} = emqtt:connect(C),
-    try
-        {ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
-        ?assertEqual([?RC_UNSPECIFIED_ERROR], RCs),
-        ok
-    after
-        emqtt:disconnect(C)
-    end.
-
 authenticate_deny(_Credentials, _Default) ->
     {stop, {error, bad_username_or_password}}.
 
@@ -800,7 +787,3 @@ recv_msgs(Count, Msgs) ->
     after 100 ->
         Msgs
     end.
-
-client_subscribe_delete_all_hook(_ClientInfo, _Username, TopicFilter) ->
-    EmptyFilters = [{T, Opts#{deny_subscription => true}} || {T, Opts} <- TopicFilter],
-    {stop, EmptyFilters}.

+ 9 - 2
apps/emqx/test/emqx_channel_SUITE.erl

@@ -456,7 +456,7 @@ t_process_subscribe(_) ->
     ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
     TopicFilters = [TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}],
     {[{TopicFilter, ?RC_SUCCESS}], _Channel} =
-        emqx_channel:process_subscribe(TopicFilters, #{}, channel()).
+        emqx_channel:process_subscribe(TopicFilters, channel()).
 
 t_process_unsubscribe(_) ->
     ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end),
@@ -914,7 +914,13 @@ t_check_pub_alias(_) ->
 t_check_sub_authzs(_) ->
     emqx_config:put_zone_conf(default, [authorization, enable], true),
     TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
-    [{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()).
+    SubPkt = ?SUBSCRIBE_PACKET(1, #{}, [TopicFilter]),
+    CheckedSubPkt = ?SUBSCRIBE_PACKET(1, #{}, [{TopicFilter, ?RC_SUCCESS}]),
+    Channel = channel(),
+    ?assertEqual(
+        {ok, CheckedSubPkt, Channel},
+        emqx_channel:check_sub_authzs(SubPkt, Channel)
+    ).
 
 t_enrich_connack_caps(_) ->
     ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
@@ -1061,6 +1067,7 @@ clientinfo(InitProps) ->
             clientid => <<"clientid">>,
             username => <<"username">>,
             is_superuser => false,
+            is_bridge => false,
             mountpoint => undefined
         },
         InitProps

+ 39 - 0
apps/emqx/test/emqx_mountpoint_SUITE.erl

@@ -29,6 +29,7 @@
 ).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
@@ -52,6 +53,27 @@ t_mount(_) ->
         mount(<<"device/1/">>, TopicFilters)
     ).
 
+t_mount_share(_) ->
+    T = {TopicFilter, Opts} = emqx_topic:parse(<<"$share/group/topic">>),
+    TopicFilters = [T],
+    ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}),
+
+    %% should not mount share topic when make message.
+    Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>),
+
+    ?assertEqual(
+        TopicFilter,
+        mount(undefined, TopicFilter)
+    ),
+    ?assertEqual(
+        #share{group = <<"group">>, topic = <<"device/1/topic">>},
+        mount(<<"device/1/">>, TopicFilter)
+    ),
+    ?assertEqual(
+        [{#share{group = <<"group">>, topic = <<"device/1/topic">>}, Opts}],
+        mount(<<"device/1/">>, TopicFilters)
+    ).
+
 t_unmount(_) ->
     Msg = emqx_message:make(<<"clientid">>, <<"device/1/topic">>, <<"payload">>),
     ?assertEqual(<<"topic">>, unmount(undefined, <<"topic">>)),
@@ -61,6 +83,23 @@ t_unmount(_) ->
     ?assertEqual(<<"device/1/topic">>, unmount(<<"device/2/">>, <<"device/1/topic">>)),
     ?assertEqual(Msg#message{topic = <<"device/1/topic">>}, unmount(<<"device/2/">>, Msg)).
 
+t_unmount_share(_) ->
+    {TopicFilter, _Opts} = emqx_topic:parse(<<"$share/group/topic">>),
+    MountedTopicFilter = #share{group = <<"group">>, topic = <<"device/1/topic">>},
+
+    ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}),
+
+    %% should not unmount share topic when make message.
+    Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>),
+    ?assertEqual(
+        TopicFilter,
+        unmount(undefined, TopicFilter)
+    ),
+    ?assertEqual(
+        #share{group = <<"group">>, topic = <<"topic">>},
+        unmount(<<"device/1/">>, MountedTopicFilter)
+    ).
+
 t_replvar(_) ->
     ?assertEqual(undefined, replvar(undefined, #{})),
     ?assertEqual(

+ 3 - 1
apps/emqx/test/emqx_mqtt_caps_SUITE.erl

@@ -76,6 +76,8 @@ t_check_sub(_) ->
     ),
     ?assertEqual(
         {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
-        emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})
+        emqx_mqtt_caps:check_sub(
+            ClientInfo, #share{group = <<"group">>, topic = <<"topic">>}, SubOpts
+        )
     ),
     emqx_config:put([zones], OldConf).

+ 130 - 67
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -137,7 +137,8 @@ t_random_basic(Config) when is_list(Config) ->
     ClientId = <<"ClientId">>,
     Topic = <<"foo">>,
     Payload = <<"hello">>,
-    emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
+    Group = <<"group1">>,
+    emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}),
     MsgQoS2 = emqx_message:make(ClientId, 2, Topic, Payload),
     %% wait for the subscription to show up
     ct:sleep(200),
@@ -402,7 +403,7 @@ t_hash(Config) when is_list(Config) ->
     ok = ensure_config(hash_clientid, false),
     test_two_messages(hash_clientid).
 
-t_hash_clinetid(Config) when is_list(Config) ->
+t_hash_clientid(Config) when is_list(Config) ->
     ok = ensure_config(hash_clientid, false),
     test_two_messages(hash_clientid).
 
@@ -528,14 +529,15 @@ last_message(ExpectedPayload, Pids, Timeout) ->
 t_dispatch(Config) when is_list(Config) ->
     ok = ensure_config(random),
     Topic = <<"foo">>,
+    Group = <<"group1">>,
     ?assertEqual(
         {error, no_subscribers},
-        emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
+        emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}})
     ),
-    emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
+    emqx_broker:subscribe(emqx_topic:make_shared_record(Group, Topic), #{qos => 2}),
     ?assertEqual(
         {ok, 1},
-        emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
+        emqx_shared_sub:dispatch(Group, Topic, #delivery{message = #message{}})
     ).
 
 t_uncovered_func(Config) when is_list(Config) ->
@@ -991,37 +993,110 @@ t_session_kicked(Config) when is_list(Config) ->
     ?assertEqual([], collect_msgs(0)),
     ok.
 
-%% FIXME: currently doesn't work
-%% t_different_groups_same_topic({init, Config}) ->
-%%     TestName = atom_to_binary(?FUNCTION_NAME),
-%%     ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
-%%     {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
-%%     {ok, _} = emqtt:connect(C),
-%%     [{client, C}, {clientid, ClientId} | Config];
-%% t_different_groups_same_topic({'end', Config}) ->
-%%     C = ?config(client, Config),
-%%     emqtt:stop(C),
-%%     ok;
-%% t_different_groups_same_topic(Config) when is_list(Config) ->
-%%     C = ?config(client, Config),
-%%     ClientId = ?config(clientid, Config),
-%%     %% Subscribe and unsubscribe to both $queue and $shared topics
-%%     Topic = <<"t/1">>,
-%%     SharedTopic0 = <<"$share/aa/", Topic/binary>>,
-%%     SharedTopic1 = <<"$share/bb/", Topic/binary>>,
-%%     {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}),
-%%     {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}),
-
-%%     Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
-%%     emqx:publish(Message0),
-%%     ?assertMatch([ {publish, #{payload := <<"hi">>}}
-%%                  , {publish, #{payload := <<"hi">>}}
-%%                  ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}),
-
-%%     {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0),
-%%     {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1),
-
-%%     ok.
+-define(UPDATE_SUB_QOS(ConnPid, Topic, QoS),
+    ?assertMatch({ok, _, [QoS]}, emqtt:subscribe(ConnPid, {Topic, QoS}))
+).
+
+t_different_groups_same_topic({init, Config}) ->
+    TestName = atom_to_binary(?FUNCTION_NAME),
+    ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
+    {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+    [{client, C}, {clientid, ClientId} | Config];
+t_different_groups_same_topic({'end', Config}) ->
+    C = ?config(client, Config),
+    emqtt:stop(C),
+    ok;
+t_different_groups_same_topic(Config) when is_list(Config) ->
+    C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
+    %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic
+    GroupA = <<"aa">>,
+    GroupB = <<"bb">>,
+    Topic = <<"t/1">>,
+
+    SharedTopicGroupA = ?SHARE(GroupA, Topic),
+    ?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
+    SharedTopicGroupB = ?SHARE(GroupB, Topic),
+    ?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
+
+    ?retry(
+        _Sleep0 = 100,
+        _Attempts0 = 50,
+        begin
+            ?assertEqual(2, length(emqx_router:match_routes(Topic)))
+        end
+    ),
+
+    Message0 = emqx_message:make(ClientId, ?QOS_2, Topic, <<"hi">>),
+    emqx:publish(Message0),
+    ?assertMatch(
+        [
+            {publish, #{payload := <<"hi">>}},
+            {publish, #{payload := <<"hi">>}}
+        ],
+        collect_msgs(5_000),
+        #{routes => ets:tab2list(emqx_route)}
+    ),
+
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA),
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB),
+
+    ok.
+
+t_different_groups_update_subopts({init, Config}) ->
+    TestName = atom_to_binary(?FUNCTION_NAME),
+    ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
+    {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+    [{client, C}, {clientid, ClientId} | Config];
+t_different_groups_update_subopts({'end', Config}) ->
+    C = ?config(client, Config),
+    emqtt:stop(C),
+    ok;
+t_different_groups_update_subopts(Config) when is_list(Config) ->
+    C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
+    %% Subscribe and unsubscribe to different group `aa` and `bb` with same topic
+    Topic = <<"t/1">>,
+    GroupA = <<"aa">>,
+    GroupB = <<"bb">>,
+    SharedTopicGroupA = ?SHARE(GroupA, Topic),
+    SharedTopicGroupB = ?SHARE(GroupB, Topic),
+
+    Fun = fun(Group, QoS) ->
+        ?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS),
+        ?assertMatch(
+            #{qos := QoS},
+            emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
+        )
+    end,
+
+    [Fun(Group, QoS) || QoS <- [?QOS_0, ?QOS_1, ?QOS_2], Group <- [GroupA, GroupB]],
+
+    ?retry(
+        _Sleep0 = 100,
+        _Attempts0 = 50,
+        begin
+            ?assertEqual(2, length(emqx_router:match_routes(Topic)))
+        end
+    ),
+
+    Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
+    emqx:publish(Message0),
+    ?assertMatch(
+        [
+            {publish, #{payload := <<"hi">>}},
+            {publish, #{payload := <<"hi">>}}
+        ],
+        collect_msgs(5_000),
+        #{routes => ets:tab2list(emqx_route)}
+    ),
+
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupA),
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopicGroupB),
+
+    ok.
 
 t_queue_subscription({init, Config}) ->
     TestName = atom_to_binary(?FUNCTION_NAME),
@@ -1038,23 +1113,19 @@ t_queue_subscription({'end', Config}) ->
 t_queue_subscription(Config) when is_list(Config) ->
     C = ?config(client, Config),
     ClientId = ?config(clientid, Config),
-    %% Subscribe and unsubscribe to both $queue and $shared topics
+    %% Subscribe and unsubscribe to both $queue share and $share/<group> with same topic
     Topic = <<"t/1">>,
     QueueTopic = <<"$queue/", Topic/binary>>,
     SharedTopic = <<"$share/aa/", Topic/binary>>,
-    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}),
-    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}),
 
-    %% FIXME: we should actually see 2 routes, one for each group
-    %% ($queue and aa), but currently the latest subscription
-    %% overwrites the existing one.
+    ?UPDATE_SUB_QOS(C, QueueTopic, ?QOS_2),
+    ?UPDATE_SUB_QOS(C, SharedTopic, ?QOS_2),
+
     ?retry(
         _Sleep0 = 100,
         _Attempts0 = 50,
         begin
-            ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
-            %% FIXME: should ensure we have 2 subscriptions
-            [_] = emqx_router:lookup_routes(Topic)
+            ?assertEqual(2, length(emqx_router:match_routes(Topic)))
         end
     ),
 
@@ -1063,37 +1134,29 @@ t_queue_subscription(Config) when is_list(Config) ->
     emqx:publish(Message0),
     ?assertMatch(
         [
+            {publish, #{payload := <<"hi">>}},
             {publish, #{payload := <<"hi">>}}
-            %% FIXME: should receive one message from each group
-            %% , {publish, #{payload := <<"hi">>}}
         ],
-        collect_msgs(5_000)
+        collect_msgs(5_000),
+        #{routes => ets:tab2list(emqx_route)}
     ),
 
     {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
-    %% FIXME: return code should be success instead of 17 ("no_subscription_existed")
-    {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic),
-
-    %% FIXME: this should eventually be true, but currently we leak
-    %% the previous group subscription...
-    %% ?retry(
-    %%     _Sleep0 = 100,
-    %%     _Attempts0 = 50,
-    %%    begin
-    %%     ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
-    %%     [] = emqx_router:lookup_routes(Topic)
-    %%    end
-    %%   ),
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, SharedTopic),
+
+    ?retry(
+        _Sleep0 = 100,
+        _Attempts0 = 50,
+        begin
+            ?assertEqual(0, length(emqx_router:match_routes(Topic)))
+        end
+    ),
     ct:sleep(500),
 
     Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
     emqx:publish(Message1),
-    %% FIXME: we should *not* receive any messages...
-    %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
-    %% This is from the leaked group...
-    ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{
-        routes => ets:tab2list(emqx_route)
-    }),
+    %% we should *not* receive any messages.
+    ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
 
     ok.
 

+ 8 - 4
apps/emqx/test/emqx_topic_SUITE.erl

@@ -238,11 +238,11 @@ long_topic() ->
 t_parse(_) ->
     ?assertError(
         {invalid_topic_filter, <<"$queue/t">>},
-        parse(<<"$queue/t">>, #{share => <<"g">>})
+        parse(#share{group = <<"$queue">>, topic = <<"$queue/t">>}, #{})
     ),
     ?assertError(
         {invalid_topic_filter, <<"$share/g/t">>},
-        parse(<<"$share/g/t">>, #{share => <<"g">>})
+        parse(#share{group = <<"g">>, topic = <<"$share/g/t">>}, #{})
     ),
     ?assertError(
         {invalid_topic_filter, <<"$share/t">>},
@@ -254,8 +254,12 @@ t_parse(_) ->
     ),
     ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
     ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
-    ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
-    ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
+    ?assertEqual(
+        {#share{group = <<"$queue">>, topic = <<"topic">>}, #{}}, parse(<<"$queue/topic">>)
+    ),
+    ?assertEqual(
+        {#share{group = <<"group">>, topic = <<"topic">>}, #{}}, parse(<<"$share/group/topic">>)
+    ),
     %% The '$local' and '$fastlane' topics have been deprecated.
     ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
     ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -96,7 +96,7 @@ choose_ingress_pool_size(
     #{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
 ) ->
     case emqx_topic:parse(RemoteTopic) of
-        {_Filter, #{share := _Name}} ->
+        {#share{} = _Filter, _SubOpts} ->
             % NOTE: this is shared subscription, many workers may subscribe
             PoolSize;
         {_Filter, #{}} when PoolSize > 1 ->

+ 4 - 1
apps/emqx_exhook/priv/protos/exhook.proto

@@ -460,8 +460,11 @@ message SubOpts {
   // The QoS level
   uint32 qos = 1;
 
+  // deprecated
+  reserved 2;
+  reserved "share";
   // The group name for shared subscription
-  string share = 2;
+  // string share = 2;
 
   // The Retain Handling option (MQTT v5.0)
   //

+ 1 - 1
apps/emqx_exhook/src/emqx_exhook.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_exhook, [
     {description, "EMQX Extension for Hook"},
-    {vsn, "5.0.14"},
+    {vsn, "5.0.15"},
     {modules, []},
     {registered, []},
     {mod, {emqx_exhook_app, []}},

+ 11 - 5
apps/emqx_exhook/src/emqx_exhook_handler.erl

@@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) ->
     Req = #{
         clientinfo => clientinfo(ClientInfo),
         type => Type,
-        topic => Topic,
+        topic => emqx_topic:maybe_format_share(Topic),
         result => Bool
     },
     case
@@ -191,15 +191,15 @@ on_session_created(ClientInfo, _SessInfo) ->
 on_session_subscribed(ClientInfo, Topic, SubOpts) ->
     Req = #{
         clientinfo => clientinfo(ClientInfo),
-        topic => Topic,
-        subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
+        topic => emqx_topic:maybe_format_share(Topic),
+        subopts => maps:with([qos, rh, rap, nl], SubOpts)
     },
     cast('session.subscribed', Req).
 
 on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
     Req = #{
         clientinfo => clientinfo(ClientInfo),
-        topic => Topic
+        topic => emqx_topic:maybe_format_share(Topic)
     },
     cast('session.unsubscribed', Req).
 
@@ -413,7 +413,13 @@ enrich_header(Headers, Message) ->
     end.
 
 topicfilters(Tfs) when is_list(Tfs) ->
-    [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
+    GetQos = fun(SubOpts) ->
+        maps:get(qos, SubOpts, 0)
+    end,
+    [
+        #{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)}
+     || {Topic, SubOpts} <- Tfs
+    ].
 
 ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
     list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));

+ 1 - 2
apps/emqx_exhook/test/props/prop_exhook_hooks.erl

@@ -546,8 +546,7 @@ subopts(SubOpts) ->
         qos => maps:get(qos, SubOpts, 0),
         rh => maps:get(rh, SubOpts, 0),
         rap => maps:get(rap, SubOpts, 0),
-        nl => maps:get(nl, SubOpts, 0),
-        share => maps:get(share, SubOpts, <<>>)
+        nl => maps:get(nl, SubOpts, 0)
     }.
 
 authresult_to_bool(AuthResult) ->

+ 2 - 9
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -629,15 +629,8 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
             {200, []};
         {Node, Subs} ->
             Formatter =
-                fun({Topic, SubOpts}) ->
-                    maps:merge(
-                        #{
-                            node => Node,
-                            clientid => ClientID,
-                            topic => Topic
-                        },
-                        maps:with([qos, nl, rap, rh], SubOpts)
-                    )
+                fun(_Sub = {Topic, SubOpts}) ->
+                    emqx_mgmt_api_subscriptions:format(Node, {{Topic, ClientID}, SubOpts})
                 end,
             {200, lists:map(Formatter, Subs)}
     end.

+ 21 - 20
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -20,6 +20,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
@@ -38,8 +39,6 @@
     format/2
 ]).
 
--define(SUBS_QTABLE, emqx_suboption).
-
 -define(SUBS_QSCHEMA, [
     {<<"clientid">>, binary},
     {<<"topic">>, binary},
@@ -146,7 +145,7 @@ subscriptions(get, #{query_string := QString}) ->
         case maps:get(<<"node">>, QString, undefined) of
             undefined ->
                 emqx_mgmt_api:cluster_query(
-                    ?SUBS_QTABLE,
+                    ?SUBOPTION,
                     QString,
                     ?SUBS_QSCHEMA,
                     fun ?MODULE:qs2ms/2,
@@ -157,7 +156,7 @@ subscriptions(get, #{query_string := QString}) ->
                     {ok, Node1} ->
                         emqx_mgmt_api:node_query(
                             Node1,
-                            ?SUBS_QTABLE,
+                            ?SUBOPTION,
                             QString,
                             ?SUBS_QSCHEMA,
                             fun ?MODULE:qs2ms/2,
@@ -177,23 +176,16 @@ subscriptions(get, #{query_string := QString}) ->
             {200, Result}
     end.
 
-format(WhichNode, {{Topic, _Subscriber}, Options}) ->
+format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
     maps:merge(
         #{
-            topic => get_topic(Topic, Options),
-            clientid => maps:get(subid, Options, null),
+            topic => emqx_topic:maybe_format_share(Topic),
+            clientid => maps:get(subid, SubOpts, null),
             node => WhichNode
         },
-        maps:with([qos, nl, rap, rh], Options)
+        maps:with([qos, nl, rap, rh], SubOpts)
     ).
 
-get_topic(Topic, #{share := <<"$queue">> = Group}) ->
-    emqx_topic:join([Group, Topic]);
-get_topic(Topic, #{share := Group}) ->
-    emqx_topic:join([<<"$share">>, Group, Topic]);
-get_topic(Topic, _) ->
-    Topic.
-
 %%--------------------------------------------------------------------
 %% QueryString to MatchSpec
 %%--------------------------------------------------------------------
@@ -213,10 +205,18 @@ gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
 
 update_ms(clientid, X, {{Topic, Pid}, Opts}) ->
     {{Topic, Pid}, Opts#{subid => X}};
-update_ms(topic, X, {{_Topic, Pid}, Opts}) ->
+update_ms(topic, X, {{Topic, Pid}, Opts}) when
+    is_record(Topic, share)
+->
+    {{#share{group = '_', topic = X}, Pid}, Opts};
+update_ms(topic, X, {{Topic, Pid}, Opts}) when
+    is_binary(Topic) orelse Topic =:= '_'
+->
     {{X, Pid}, Opts};
-update_ms(share_group, X, {{Topic, Pid}, Opts}) ->
-    {{Topic, Pid}, Opts#{share => X}};
+update_ms(share_group, X, {{Topic, Pid}, Opts}) when
+    not is_record(Topic, share)
+->
+    {{#share{group = X, topic = Topic}, Pid}, Opts};
 update_ms(qos, X, {{Topic, Pid}, Opts}) ->
     {{Topic, Pid}, Opts#{qos => X}}.
 
@@ -227,5 +227,6 @@ fuzzy_filter_fun(Fuzzy) ->
 
 run_fuzzy_filter(_, []) ->
     true;
-run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
-    emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
+run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
+    {Filter, _SubOpts} = emqx_topic:parse(TopicFilter),
+    emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy).

+ 6 - 12
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -17,6 +17,7 @@
 -module(emqx_mgmt_api_topics).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_router.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -101,10 +102,10 @@ fields(topic) ->
 %%%==============================================================================================
 %% parameters trans
 topics(get, #{query_string := Qs}) ->
-    do_list(generate_topic(Qs)).
+    do_list(Qs).
 
 topic(get, #{bindings := Bindings}) ->
-    lookup(generate_topic(Bindings)).
+    lookup(Bindings).
 
 %%%==============================================================================================
 %% api apply
@@ -139,13 +140,6 @@ lookup(#{topic := Topic}) ->
 
 %%%==============================================================================================
 %% internal
-generate_topic(Params = #{<<"topic">> := Topic}) ->
-    Params#{<<"topic">> => Topic};
-generate_topic(Params = #{topic := Topic}) ->
-    Params#{topic => Topic};
-generate_topic(Params) ->
-    Params.
-
 -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
 qs2ms(_Tab, {Qs, _}) ->
     #{
@@ -160,9 +154,9 @@ gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) ->
 gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
     gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
 
-format(#route{topic = Topic, dest = {_, Node}}) ->
-    #{topic => Topic, node => Node};
-format(#route{topic = Topic, dest = Node}) ->
+format(#route{topic = Topic, dest = {Group, Node}}) ->
+    #{topic => ?SHARE(Group, Topic), node => Node};
+format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
     #{topic => Topic, node => Node}.
 
 topic_param(In) ->

+ 2 - 0
apps/emqx_modules/src/emqx_rewrite.erl

@@ -158,6 +158,8 @@ match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) ->
         false -> match_and_rewrite(Topic, Rules, Binds)
     end.
 
+rewrite(SharedRecord = #share{topic = Topic}, MP, Dest, Binds) ->
+    SharedRecord#share{topic = rewrite(Topic, MP, Dest, Binds)};
 rewrite(Topic, MP, Dest, Binds) ->
     case re:run(Topic, MP, [{capture, all_but_first, list}]) of
         {match, Captured} ->

+ 2 - 1
apps/emqx_retainer/src/emqx_retainer.erl

@@ -20,6 +20,7 @@
 
 -include("emqx_retainer.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 
 -export([start_link/0]).
@@ -87,7 +88,7 @@
 %% Hook API
 %%------------------------------------------------------------------------------
 -spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any().
-on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined ->
+on_session_subscribed(_, #share{} = _Topic, _SubOpts, _) ->
     ok;
 on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
     IsNew = maps:get(is_new, Opts, true),

+ 6 - 2
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -190,7 +190,9 @@ on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) ->
     apply_event(
         'session.subscribed',
         fun() ->
-            eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
+            eventmsg_sub_or_unsub(
+                'session.subscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts
+            )
         end,
         Conf
     ).
@@ -199,7 +201,9 @@ on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) ->
     apply_event(
         'session.unsubscribed',
         fun() ->
-            eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
+            eventmsg_sub_or_unsub(
+                'session.unsubscribed', ClientInfo, emqx_topic:maybe_format_share(Topic), SubOpts
+            )
         end,
         Conf
     ).

+ 2 - 1
apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
@@ -42,7 +43,7 @@ t_printable_maps(_) ->
         peerhost => {127, 0, 0, 1},
         peername => {{127, 0, 0, 1}, 9980},
         sockname => {{127, 0, 0, 1}, 1883},
-        redispatch_to => {<<"group">>, <<"sub/topic/+">>},
+        redispatch_to => ?REDISPATCH_TO(<<"group">>, <<"sub/topic/+">>),
         shared_dispatch_ack => {self(), ref}
     },
     Converted = emqx_rule_events:printable_maps(Headers),

+ 9 - 0
changes/ce/fix-10976.en.md

@@ -0,0 +1,9 @@
+Fix topic-filter overlapping handling in shared subscription.
+In the previous implementation, the storage method for subscription options did not provide adequate support for shared subscriptions. This resulted in message routing failures and leakage of routing tables between nodes during the "subscribe-unsubscribe" process with specific order and topics.
+
+## Breaking changes
+* Hook callback `session.subscribed` and `client.subscribe` will now receive shared subscription in its full representation, e.g. `$share/group1/topic1/#`, and the `share` property is deleted from `subopts`.
+* Hook callback `session.unsubscribed` and `client.unsubscribe` will now receive shared subscription in its full representation, e.g. `$share/group1/topic1/#` instead of just `topic1/#`.
+* ExHook Proto changed. The `share` field in message `SubOpts` was deprecated.
+  ExHook Server will now receive shared subscription in its full representation, e.g. `$share/group1/topic1/#`, and the `share` property is deleted from message `SubOpts`.
+* `session.subscribed` and `session.unsubscribed` rule-engine events will have shared subscriptions in their full representation for `topic`, e.g. `$share/group1/topic1/#` instead of just `topic1/#`.