소스 검색

Merge pull request #10906 from JimMoen/fix-shared-sub-unsub

refacting and small bug fix of share-sub.
Part.1 of mqtt client shared sub-unsub.
JimMoen 2 년 전
부모
커밋
d27903c1e9
37개의 변경된 파일255개의 추가작업 그리고 139개의 파일을 삭제
  1. 10 0
      apps/emqx/src/emqx_cm.hrl
  2. 12 0
      apps/emqx/include/emqx_mqtt.hrl
  3. 31 0
      apps/emqx/include/emqx_router.hrl
  4. 4 7
      apps/emqx/src/emqx_broker.erl
  5. 13 7
      apps/emqx/src/emqx_channel.erl
  6. 4 11
      apps/emqx/src/emqx_cm.erl
  7. 7 7
      apps/emqx/src/emqx_cm_registry.erl
  8. 2 0
      apps/emqx/src/emqx_frame.erl
  9. 1 2
      apps/emqx/src/emqx_mqueue.erl
  10. 1 2
      apps/emqx/src/emqx_router.erl
  11. 4 5
      apps/emqx/src/emqx_router_helper.erl
  12. 5 7
      apps/emqx/src/emqx_session_router.erl
  13. 2 2
      apps/emqx/src/emqx_shared_sub.erl
  14. 56 29
      apps/emqx/src/emqx_topic.erl
  15. 3 3
      apps/emqx/src/emqx_trie.erl
  16. 12 3
      apps/emqx/src/emqx_types.erl
  17. 2 1
      apps/emqx/src/emqx_ws_connection.erl
  18. 1 1
      apps/emqx/src/proto/emqx_cm_proto_v1.erl
  19. 1 1
      apps/emqx/src/proto/emqx_cm_proto_v2.erl
  20. 4 3
      apps/emqx/test/emqx_cm_SUITE.erl
  21. 3 2
      apps/emqx/test/emqx_quic_multistreams_SUITE.erl
  22. 2 1
      apps/emqx/test/emqx_router_SUITE.erl
  23. 3 3
      apps/emqx/test/emqx_router_helper_SUITE.erl
  24. 2 1
      apps/emqx/test/emqx_takeover_SUITE.erl
  25. 28 3
      apps/emqx/test/emqx_topic_SUITE.erl
  26. 1 1
      apps/emqx_authz/src/emqx_authz_mnesia.erl
  27. 1 1
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl
  28. 1 1
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl
  29. 2 1
      apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
  30. 2 1
      apps/emqx_gateway/test/test_mqtt_broker.erl
  31. 6 4
      apps/emqx_management/src/emqx_mgmt.erl
  32. 4 4
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  33. 2 1
      apps/emqx_management/src/emqx_mgmt_api_topics.erl
  34. 13 11
      apps/emqx_management/src/emqx_mgmt_cli.erl
  35. 1 2
      apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl
  36. 7 7
      apps/emqx_retainer/src/emqx_retainer_index.erl
  37. 2 4
      rel/i18n/emqx_schema.hocon

+ 10 - 0
apps/emqx/src/emqx_cm.hrl

@@ -13,9 +13,19 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
+
 -ifndef(EMQX_CM_HRL).
 -define(EMQX_CM_HRL, true).
 
+%% Tables for channel management.
+-define(CHAN_TAB, emqx_channel).
+-define(CHAN_CONN_TAB, emqx_channel_conn).
+-define(CHAN_INFO_TAB, emqx_channel_info).
+-define(CHAN_LIVE_TAB, emqx_channel_live).
+
+%% Mria/Mnesia Tables for channel management.
+-define(CHAN_REG_TAB, emqx_channel_registry).
+
 -define(T_KICK, 5_000).
 -define(T_GET_INFO, 5_000).
 -define(T_TAKEOVER, 15_000).

+ 12 - 0
apps/emqx/include/emqx_mqtt.hrl

@@ -48,6 +48,13 @@
     {?MQTT_PROTO_V5, <<"MQTT">>}
 ]).
 
+%%--------------------------------------------------------------------
+%% MQTT Topic and TopitFilter byte length
+%%--------------------------------------------------------------------
+
+%% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3]
+-define(MAX_TOPIC_LEN, 65535).
+
 %%--------------------------------------------------------------------
 %% MQTT QoS Levels
 %%--------------------------------------------------------------------
@@ -662,6 +669,11 @@ end).
     end
 ).
 
+-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty).
+-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty).
+-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter').
+-define(SHARE_NAME_INVALID_CHAR, share_subscription_group_name_cannot_include_wildcard).
+
 -define(FRAME_PARSE_ERROR, frame_parse_error).
 -define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
 -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).

+ 31 - 0
apps/emqx/include/emqx_router.hrl

@@ -0,0 +1,31 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_ROUTER_HRL).
+-define(EMQX_ROUTER_HRL, true).
+
+%% ETS table for message routing
+-define(ROUTE_TAB, emqx_route).
+
+%% Mnesia table for message routing
+-define(ROUTING_NODE, emqx_routing_node).
+
+%% ETS tables for PubSub
+-define(SUBOPTION, emqx_suboption).
+-define(SUBSCRIBER, emqx_subscriber).
+-define(SUBSCRIPTION, emqx_subscription).
+
+-endif.

+ 4 - 7
apps/emqx/src/emqx_broker.erl

@@ -19,6 +19,8 @@
 -behaviour(gen_server).
 
 -include("emqx.hrl").
+-include("emqx_router.hrl").
+
 -include("logger.hrl").
 -include("types.hrl").
 -include("emqx_mqtt.hrl").
@@ -80,11 +82,6 @@
 
 -define(BROKER, ?MODULE).
 
-%% ETS tables for PubSub
--define(SUBOPTION, emqx_suboption).
--define(SUBSCRIBER, emqx_subscriber).
--define(SUBSCRIPTION, emqx_subscription).
-
 %% Guards
 -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
 
@@ -106,10 +103,10 @@ start_link(Pool, Id) ->
 create_tabs() ->
     TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
 
-    %% SubOption: {Topic, SubPid} -> SubOption
+    %% SubOption: {TopicFilter, SubPid} -> SubOption
     ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]),
 
-    %% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
+    %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ...
     %% duplicate_bag: o(1) insert
     ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
 

+ 13 - 7
apps/emqx/src/emqx_channel.erl

@@ -484,13 +484,13 @@ handle_in(
             {ok, Channel}
     end;
 handle_in(
-    Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
+    SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
     Channel = #channel{clientinfo = ClientInfo}
 ) ->
-    case emqx_packet:check(Packet) of
+    case emqx_packet:check(SubPkt) of
         ok ->
             TopicFilters0 = parse_topic_filters(TopicFilters),
-            TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0),
+            TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
             TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
             HasAuthzDeny = lists:any(
                 fun({_TopicFilter, ReasonCode}) ->
@@ -503,7 +503,10 @@ handle_in(
                 true ->
                     handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
                 false ->
-                    TopicFilters2 = [TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
+                    TopicFilters2 = [
+                        TopicFilter
+                     || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0
+                    ],
                     TopicFilters3 = run_hooks(
                         'client.subscribe',
                         [ClientInfo, Properties],
@@ -1866,6 +1869,9 @@ check_pub_caps(
 %%--------------------------------------------------------------------
 %% Check Sub Authorization
 
+%% TODO: not only check topic filter. Qos chould be checked too.
+%% Not implemented yet:
+%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7]
 check_sub_authzs(TopicFilters, Channel) ->
     check_sub_authzs(TopicFilters, Channel, []).
 
@@ -1876,7 +1882,7 @@ check_sub_authzs(
 ) ->
     case emqx_access_control:authorize(ClientInfo, subscribe, Topic) of
         allow ->
-            check_sub_authzs(More, Channel, [{TopicFilter, 0} | Acc]);
+            check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
         deny ->
             check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
     end;
@@ -1892,9 +1898,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
 %%--------------------------------------------------------------------
 %% Enrich SubId
 
-put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
+enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
     [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
-put_subid_in_subopts(_Properties, TopicFilters) ->
+enrich_subopts_subid(_Properties, TopicFilters) ->
     TopicFilters.
 
 %%--------------------------------------------------------------------

+ 4 - 11
apps/emqx/src/emqx_cm.erl

@@ -20,6 +20,7 @@
 -behaviour(gen_server).
 
 -include("emqx.hrl").
+-include("emqx_cm.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -118,14 +119,6 @@
     _Stats :: emqx_types:stats()
 }.
 
--include("emqx_cm.hrl").
-
-%% Tables for channel management.
--define(CHAN_TAB, emqx_channel).
--define(CHAN_CONN_TAB, emqx_channel_conn).
--define(CHAN_INFO_TAB, emqx_channel_info).
--define(CHAN_LIVE_TAB, emqx_channel_live).
-
 -define(CHAN_STATS, [
     {?CHAN_TAB, 'channels.count', 'channels.max'},
     {?CHAN_TAB, 'sessions.count', 'sessions.max'},
@@ -669,12 +662,12 @@ lookup_client({username, Username}) ->
     MatchSpec = [
         {{'_', #{clientinfo => #{username => '$1'}}, '_'}, [{'=:=', '$1', Username}], ['$_']}
     ],
-    ets:select(emqx_channel_info, MatchSpec);
+    ets:select(?CHAN_INFO_TAB, MatchSpec);
 lookup_client({clientid, ClientId}) ->
     [
         Rec
-     || Key <- ets:lookup(emqx_channel, ClientId),
-        Rec <- ets:lookup(emqx_channel_info, Key)
+     || Key <- ets:lookup(?CHAN_TAB, ClientId),
+        Rec <- ets:lookup(?CHAN_INFO_TAB, Key)
     ].
 
 %% @private

+ 7 - 7
apps/emqx/src/emqx_cm_registry.erl

@@ -20,6 +20,7 @@
 -behaviour(gen_server).
 
 -include("emqx.hrl").
+-include("emqx_cm.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 
@@ -50,7 +51,6 @@
 ]).
 
 -define(REGISTRY, ?MODULE).
--define(TAB, emqx_channel_registry).
 -define(LOCK, {?MODULE, cleanup_down}).
 
 -record(channel, {chid, pid}).
@@ -78,7 +78,7 @@ register_channel(ClientId) when is_binary(ClientId) ->
     register_channel({ClientId, self()});
 register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
-        true -> mria:dirty_write(?TAB, record(ClientId, ChanPid));
+        true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
         false -> ok
     end.
 
@@ -91,14 +91,14 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
     unregister_channel({ClientId, self()});
 unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
     case is_enabled() of
-        true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid));
+        true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
         false -> ok
     end.
 
 %% @doc Lookup the global channels.
 -spec lookup_channels(emqx_types:clientid()) -> list(pid()).
 lookup_channels(ClientId) ->
-    [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)].
+    [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
 
 record(ClientId, ChanPid) ->
     #channel{chid = ClientId, pid = ChanPid}.
@@ -109,7 +109,7 @@ record(ClientId, ChanPid) ->
 
 init([]) ->
     mria_config:set_dirty_shard(?CM_SHARD, true),
-    ok = mria:create_table(?TAB, [
+    ok = mria:create_table(?CHAN_REG_TAB, [
         {type, bag},
         {rlog_shard, ?CM_SHARD},
         {storage, ram_copies},
@@ -166,7 +166,7 @@ cleanup_channels(Node) ->
 
 do_cleanup_channels(Node) ->
     Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
-    lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)).
+    lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).
 
 delete_channel(Chan) ->
-    mnesia:delete_object(?TAB, Chan, write).
+    mnesia:delete_object(?CHAN_REG_TAB, Chan, write).

+ 2 - 0
apps/emqx/src/emqx_frame.erl

@@ -300,6 +300,7 @@ parse_connect2(
     ConnPacket = #mqtt_packet_connect{
         proto_name = ProtoName,
         proto_ver = ProtoVer,
+        %% For bridge mode, non-standard implementation
         is_bridge = (BridgeTag =:= 8),
         clean_start = bool(CleanStart),
         will_flag = bool(WillFlag),
@@ -762,6 +763,7 @@ serialize_variable(
     #mqtt_packet_connect{
         proto_name = ProtoName,
         proto_ver = ProtoVer,
+        %% For bridge mode, non-standard implementation
         is_bridge = IsBridge,
         clean_start = CleanStart,
         will_flag = WillFlag,

+ 1 - 2
apps/emqx/src/emqx_mqueue.erl

@@ -75,11 +75,10 @@
 
 -export_type([mqueue/0, options/0]).
 
--type topic() :: emqx_types:topic().
 -type priority() :: infinity | integer().
 -type pq() :: emqx_pqueue:q().
 -type count() :: non_neg_integer().
--type p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}.
+-type p_table() :: ?NO_PRIORITY_TABLE | #{emqx_types:topic() := priority()}.
 -type options() :: #{
     max_len := count(),
     priorities => p_table(),

+ 1 - 2
apps/emqx/src/emqx_router.erl

@@ -22,6 +22,7 @@
 -include("logger.hrl").
 -include("types.hrl").
 -include_lib("mria/include/mria.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 
 %% Mnesia bootstrap
 -export([mnesia/1]).
@@ -69,8 +70,6 @@
 
 -type dest() :: node() | {group(), node()}.
 
--define(ROUTE_TAB, emqx_route).
-
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------

+ 4 - 5
apps/emqx/src/emqx_router_helper.erl

@@ -19,6 +19,7 @@
 -behaviour(gen_server).
 
 -include("emqx.hrl").
+-include("emqx_router.hrl").
 -include("logger.hrl").
 -include("types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -54,8 +55,6 @@
 
 -record(routing_node, {name, const = unused}).
 
--define(ROUTE, emqx_route).
--define(ROUTING_NODE, emqx_routing_node).
 -define(LOCK, {?MODULE, cleanup_routes}).
 
 -dialyzer({nowarn_function, [cleanup_routes/1]}).
@@ -185,7 +184,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 stats_fun() ->
-    case ets:info(?ROUTE, size) of
+    case ets:info(?ROUTE_TAB, size) of
         undefined ->
             ok;
         Size ->
@@ -198,6 +197,6 @@ cleanup_routes(Node) ->
         #route{_ = '_', dest = {'_', Node}}
     ],
     [
-        mnesia:delete_object(?ROUTE, Route, write)
-     || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)
+        mnesia:delete_object(?ROUTE_TAB, Route, write)
+     || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
     ].

+ 5 - 7
apps/emqx/src/emqx_session_router.erl

@@ -57,9 +57,7 @@
     code_change/3
 ]).
 
--type group() :: binary().
-
--type dest() :: node() | {group(), node()}.
+-type dest() :: node() | {emqx_types:group(), node()}.
 
 -define(ROUTE_RAM_TAB, emqx_session_route_ram).
 -define(ROUTE_DISC_TAB, emqx_session_route_disc).
@@ -114,7 +112,7 @@ start_link(Pool, Id) ->
 %% Route APIs
 %%--------------------------------------------------------------------
 
--spec do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}.
+-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_add_route(Topic, SessionID) when is_binary(Topic) ->
     Route = #route{topic = Topic, dest = SessionID},
     case lists:member(Route, lookup_routes(Topic)) of
@@ -135,7 +133,7 @@ do_add_route(Topic, SessionID) when is_binary(Topic) ->
     end.
 
 %% @doc Match routes
--spec match_routes(emqx_topic:topic()) -> [emqx_types:route()].
+-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
 match_routes(Topic) when is_binary(Topic) ->
     case match_trie(Topic) of
         [] -> lookup_routes(Topic);
@@ -153,7 +151,7 @@ match_trie(Topic) ->
 delete_routes(SessionID, Subscriptions) ->
     cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}).
 
--spec do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}.
+-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_delete_route(Topic, SessionID) ->
     Route = #route{topic = Topic, dest = SessionID},
     case emqx_topic:wildcard(Topic) of
@@ -165,7 +163,7 @@ do_delete_route(Topic, SessionID) ->
     end.
 
 %% @doc Print routes to a topic
--spec print_routes(emqx_topic:topic()) -> ok.
+-spec print_routes(emqx_types:topic()) -> ok.
 print_routes(Topic) ->
     lists:foreach(
         fun(#route{topic = To, dest = SessionID}) ->

+ 2 - 2
apps/emqx/src/emqx_shared_sub.erl

@@ -97,7 +97,7 @@
 -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
 -define(SUBSCRIBER_DOWN, noproc).
 
--type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
+-type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()).
 
 -record(state, {pmon}).
 
@@ -156,7 +156,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
             end
     end.
 
--spec strategy(emqx_topic:group()) -> strategy().
+-spec strategy(emqx_types:group()) -> strategy().
 strategy(Group) ->
     try
         emqx:get_config([

+ 56 - 29
apps/emqx/src/emqx_topic.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_topic).
 
+-include("emqx_mqtt.hrl").
+
 %% APIs
 -export([
     match/2,
@@ -33,18 +35,14 @@
     parse/2
 ]).
 
--export_type([
-    group/0,
-    topic/0,
-    word/0
-]).
-
--type group() :: binary().
--type topic() :: binary().
--type word() :: '' | '+' | '#' | binary().
--type words() :: list(word()).
+-type topic() :: emqx_types:topic().
+-type word() :: emqx_types:word().
+-type words() :: emqx_types:words().
 
--define(MAX_TOPIC_LEN, 65535).
+%% Guards
+-define(MULTI_LEVEL_WILDCARD_NOT_LAST(C, REST),
+    ((C =:= '#' orelse C =:= <<"#">>) andalso REST =/= [])
+).
 
 %%--------------------------------------------------------------------
 %% APIs
@@ -97,11 +95,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
 
 -spec validate(name | filter, topic()) -> true.
 validate(_, <<>>) ->
+    %% MQTT-5.0 [MQTT-4.7.3-1]
     error(empty_topic);
 validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
+    %% MQTT-5.0 [MQTT-4.7.3-3]
     error(topic_too_long);
-validate(filter, Topic) when is_binary(Topic) ->
-    validate2(words(Topic));
+validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) ->
+    validate_share(SharedFilter);
+validate(filter, Filter) when is_binary(Filter) ->
+    validate2(words(Filter));
 validate(name, Topic) when is_binary(Topic) ->
     Words = words(Topic),
     validate2(Words) andalso
@@ -113,7 +115,8 @@ validate2([]) ->
 % end with '#'
 validate2(['#']) ->
     true;
-validate2(['#' | Words]) when length(Words) > 0 ->
+%% MQTT-5.0 [MQTT-4.7.1-1]
+validate2([C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) ->
     error('topic_invalid_#');
 validate2(['' | Words]) ->
     validate2(Words);
@@ -129,6 +132,32 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
 validate3(<<_/utf8, Rest/binary>>) ->
     validate3(Rest).
 
+validate_share(<<"$share/", Rest/binary>>) when
+    Rest =:= <<>> orelse Rest =:= <<"/">>
+->
+    %% MQTT-5.0 [MQTT-4.8.2-1]
+    error(?SHARE_EMPTY_FILTER);
+validate_share(<<"$share/", Rest/binary>>) ->
+    case binary:split(Rest, <<"/">>) of
+        %% MQTT-5.0 [MQTT-4.8.2-1]
+        [<<>>, _] ->
+            error(?SHARE_EMPTY_GROUP);
+        %% MQTT-5.0 [MQTT-4.7.3-1]
+        [_, <<>>] ->
+            error(?SHARE_EMPTY_FILTER);
+        [ShareName, Filter] ->
+            validate_share(ShareName, Filter)
+    end.
+
+validate_share(_, <<"$share/", _Rest/binary>>) ->
+    error(?SHARE_RECURSIVELY);
+validate_share(ShareName, Filter) ->
+    case binary:match(ShareName, [<<"+">>, <<"#">>]) of
+        %% MQTT-5.0 [MQTT-4.8.2-2]
+        nomatch -> validate2(words(Filter));
+        _ -> error(?SHARE_NAME_INVALID_CHAR)
+    end.
+
 %% @doc Prepend a topic prefix.
 %% Ensured to have only one / between prefix and suffix.
 prepend(undefined, W) ->
@@ -142,6 +171,7 @@ prepend(Parent0, W) ->
         _ -> <<Parent/binary, $/, (bin(W))/binary>>
     end.
 
+-spec bin(word()) -> binary().
 bin('') -> <<>>;
 bin('+') -> <<"+">>;
 bin('#') -> <<"#">>;
@@ -163,6 +193,7 @@ tokens(Topic) ->
 words(Topic) when is_binary(Topic) ->
     [word(W) || W <- tokens(Topic)].
 
+-spec word(binary()) -> word().
 word(<<>>) -> '';
 word(<<"+">>) -> '+';
 word(<<"#">>) -> '#';
@@ -185,23 +216,19 @@ feed_var(Var, Val, [Var | Words], Acc) ->
 feed_var(Var, Val, [W | Words], Acc) ->
     feed_var(Var, Val, Words, [W | Acc]).
 
--spec join(list(binary())) -> binary().
+-spec join(list(word())) -> binary().
 join([]) ->
     <<>>;
-join([W]) ->
-    bin(W);
-join(Words) ->
-    {_, Bin} = lists:foldr(
-        fun
-            (W, {true, Tail}) ->
-                {false, <<W/binary, Tail/binary>>};
-            (W, {false, Tail}) ->
-                {false, <<W/binary, "/", Tail/binary>>}
-        end,
-        {true, <<>>},
-        [bin(W) || W <- Words]
-    ),
-    Bin.
+join([Word | Words]) ->
+    do_join(bin(Word), Words).
+
+do_join(TopicAcc, []) ->
+    TopicAcc;
+%% MQTT-5.0 [MQTT-4.7.1-1]
+do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) ->
+    error('topic_invalid_#');
+do_join(TopicAcc, [Word | Words]) ->
+    do_join(<<TopicAcc/binary, "/", (bin(Word))/binary>>, Words).
 
 -spec parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}.
 parse(TopicFilter) when is_binary(TopicFilter) ->

+ 3 - 3
apps/emqx/src/emqx_trie.erl

@@ -114,7 +114,7 @@ create_session_trie(Type) ->
 insert(Topic) when is_binary(Topic) ->
     insert(Topic, ?TRIE).
 
--spec insert_session(emqx_topic:topic()) -> ok.
+-spec insert_session(emqx_types:topic()) -> ok.
 insert_session(Topic) when is_binary(Topic) ->
     insert(Topic, session_trie()).
 
@@ -132,7 +132,7 @@ delete(Topic) when is_binary(Topic) ->
     delete(Topic, ?TRIE).
 
 %% @doc Delete a topic filter from the trie.
--spec delete_session(emqx_topic:topic()) -> ok.
+-spec delete_session(emqx_types:topic()) -> ok.
 delete_session(Topic) when is_binary(Topic) ->
     delete(Topic, session_trie()).
 
@@ -148,7 +148,7 @@ delete(Topic, Trie) when is_binary(Topic) ->
 match(Topic) when is_binary(Topic) ->
     match(Topic, ?TRIE).
 
--spec match_session(emqx_topic:topic()) -> list(emqx_topic:topic()).
+-spec match_session(emqx_types:topic()) -> list(emqx_types:topic()).
 match_session(Topic) when is_binary(Topic) ->
     match(Topic, session_trie()).
 

+ 12 - 3
apps/emqx/src/emqx_types.erl

@@ -29,10 +29,16 @@
 -export_type([
     zone/0,
     pubsub/0,
-    topic/0,
     subid/0
 ]).
 
+-export_type([
+    group/0,
+    topic/0,
+    word/0,
+    words/0
+]).
+
 -export_type([
     socktype/0,
     sockstate/0,
@@ -122,9 +128,13 @@
 
 -type zone() :: atom().
 -type pubsub() :: publish | subscribe.
--type topic() :: emqx_topic:topic().
 -type subid() :: binary() | atom().
 
+-type group() :: binary() | undefined.
+-type topic() :: binary().
+-type word() :: '' | '+' | '#' | binary().
+-type words() :: list(word()).
+
 -type socktype() :: tcp | udp | ssl | proxy | atom().
 -type sockstate() :: idle | running | blocked | closed.
 -type conninfo() :: #{
@@ -230,7 +240,6 @@
     | {share, topic(), deliver_result()}
 ].
 -type route() :: #route{}.
--type group() :: emqx_topic:group().
 -type route_entry() :: {topic(), node()} | {topic, group()}.
 -type command() :: #command{}.
 

+ 2 - 1
apps/emqx/src/emqx_ws_connection.erl

@@ -18,6 +18,7 @@
 -module(emqx_ws_connection).
 
 -include("emqx.hrl").
+-include("emqx_cm.hrl").
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include("types.hrl").
@@ -1034,7 +1035,7 @@ check_max_connection(Type, Listener) ->
             allow;
         Max ->
             MatchSpec = [{{'_', emqx_ws_connection}, [], [true]}],
-            Curr = ets:select_count(emqx_channel_conn, MatchSpec),
+            Curr = ets:select_count(?CHAN_CONN_TAB, MatchSpec),
             case Curr >= Max of
                 false ->
                     allow;

+ 1 - 1
apps/emqx/src/proto/emqx_cm_proto_v1.erl

@@ -33,7 +33,7 @@
 ]).
 
 -include("bpapi.hrl").
--include("src/emqx_cm.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 
 introduced_in() ->
     "5.0.0".

+ 1 - 1
apps/emqx/src/proto/emqx_cm_proto_v2.erl

@@ -34,7 +34,7 @@
 ]).
 
 -include("bpapi.hrl").
--include("src/emqx_cm.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 
 introduced_in() ->
     "5.0.0".

+ 4 - 3
apps/emqx/test/emqx_cm_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -200,10 +201,10 @@ t_open_session_race_condition(_) ->
         end,
     Winner = WaitForDowns(Pids),
 
-    ?assertMatch([_], ets:lookup(emqx_channel, ClientId)),
+    ?assertMatch([_], ets:lookup(?CHAN_TAB, ClientId)),
     ?assertEqual([Winner], emqx_cm:lookup_channels(ClientId)),
-    ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Winner})),
-    ?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)),
+    ?assertMatch([_], ets:lookup(?CHAN_CONN_TAB, {ClientId, Winner})),
+    ?assertMatch([_], ets:lookup(?CHAN_REG_TAB, ClientId)),
 
     exit(Winner, kill),
     receive

+ 3 - 2
apps/emqx/test/emqx_quic_multistreams_SUITE.erl

@@ -23,6 +23,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("quicer/include/quicer.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -1465,7 +1466,7 @@ t_multi_streams_emqx_ctrl_kill(Config) ->
     ),
 
     ClientId = proplists:get_value(clientid, emqtt:info(C)),
-    [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
+    [{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId),
     exit(TransPid, kill),
 
     %% Client should be closed
@@ -1518,7 +1519,7 @@ t_multi_streams_emqx_ctrl_exit_normal(Config) ->
     ),
 
     ClientId = proplists:get_value(clientid, emqtt:info(C)),
-    [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
+    [{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId),
 
     emqx_connection:stop(TransPid),
     %% Client exit normal.

+ 2 - 1
apps/emqx/test/emqx_router_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -127,5 +128,5 @@ t_unexpected(_) ->
 clear_tables() ->
     lists:foreach(
         fun mnesia:clear_table/1,
-        [emqx_route, emqx_trie, emqx_trie_node]
+        [?ROUTE_TAB, ?TRIE, emqx_trie_node]
     ).

+ 3 - 3
apps/emqx/test/emqx_router_helper_SUITE.erl

@@ -20,11 +20,11 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(ROUTER_HELPER, emqx_router_helper).
--define(ROUTE_TAB, emqx_route).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
@@ -82,9 +82,9 @@ t_monitor(_) ->
     emqx_router_helper:monitor(undefined).
 
 t_mnesia(_) ->
-    ?ROUTER_HELPER ! {mnesia_table_event, {delete, {emqx_routing_node, node()}, undefined}},
+    ?ROUTER_HELPER ! {mnesia_table_event, {delete, {?ROUTING_NODE, node()}, undefined}},
     ?ROUTER_HELPER ! {mnesia_table_event, testing},
-    ?ROUTER_HELPER ! {mnesia_table_event, {write, {emqx_routing_node, node()}, undefined}},
+    ?ROUTER_HELPER ! {mnesia_table_event, {write, {?ROUTING_NODE, node()}, undefined}},
     ?ROUTER_HELPER ! {membership, testing},
     ?ROUTER_HELPER ! {membership, {mnesia, down, node()}},
     ct:sleep(200).

+ 2 - 1
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -117,7 +118,7 @@ load_meck(ClientId) ->
     [ChanPid] = emqx_cm:lookup_channels(ClientId),
     ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId),
     NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}},
-    true = ets:update_element(emqx_channel_info, {ClientId, ChanPid}, {2, NChanInfo}).
+    true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}).
 
 unload_meck(_ClientId) ->
     meck:unload(fake_conn_mod).

+ 28 - 3
apps/emqx/test/emqx_topic_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
 
 -import(
@@ -130,14 +131,35 @@ t_validate(_) ->
     true = validate({filter, <<"x">>}),
     true = validate({name, <<"x//y">>}),
     true = validate({filter, <<"sport/tennis/#">>}),
+    %% MQTT-5.0 [MQTT-4.7.3-1]
     ?assertError(empty_topic, validate({name, <<>>})),
+    ?assertError(empty_topic, validate({filter, <<>>})),
     ?assertError(topic_name_error, validate({name, <<"abc/#">>})),
     ?assertError(topic_too_long, validate({name, long_topic()})),
-    ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})),
     ?assertError(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})),
     ?assertError(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})),
     ?assertError(topic_invalid_char, validate({filter, <<"sport/tennis#">>})),
-    ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})).
+    %% MQTT-5.0 [MQTT-4.7.1-1]
+    ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})),
+    ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})),
+    %% MQTT-5.0 [MQTT-4.8.2-1]
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/">>})),
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share//">>})),
+    ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//t">>})),
+    ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//test">>})),
+    %% MQTT-5.0 [MQTT-4.7.3-1] for shared-sub
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g/">>})),
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g2/">>})),
+    %% MQTT-5.0 [MQTT-4.8.2-2]
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/p+q/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/m+/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/+n/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#y/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/#y/1">>})),
+    %% share recursively
+    ?assertError(?SHARE_RECURSIVELY, validate({filter, <<"$share/g1/$share/t">>})),
+    true = validate({filter, <<"$share/g1/topic/$share">>}).
 
 t_sigle_level_validate(_) ->
     true = validate({filter, <<"+">>}),
@@ -177,7 +199,10 @@ t_join(_) ->
     ?assertEqual(<<"+//#">>, join(['+', '', '#'])),
     ?assertEqual(<<"x/y/z/+">>, join([<<"x">>, <<"y">>, <<"z">>, '+'])),
     ?assertEqual(<<"/ab/cd/ef/">>, join(words(<<"/ab/cd/ef/">>))),
-    ?assertEqual(<<"ab/+/#">>, join(words(<<"ab/+/#">>))).
+    ?assertEqual(<<"ab/+/#">>, join(words(<<"ab/+/#">>))),
+    %% MQTT-5.0 [MQTT-4.7.1-1]
+    ?assertError('topic_invalid_#', join(['+', <<"a">>, '#', <<"b">>, '', '+'])),
+    ?assertError('topic_invalid_#', join(['+', <<"c">>, <<"#">>, <<"d">>, '', '+'])).
 
 t_systop(_) ->
     SysTop1 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/xyz"]),

+ 1 - 1
apps/emqx_authz/src/emqx_authz_mnesia.erl

@@ -33,7 +33,7 @@
 -type clientid() :: {clientid, binary()}.
 -type who() :: username() | clientid() | all.
 
--type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_topic:topic()}.
+-type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_types:topic()}.
 -type rules() :: [rule()].
 
 -record(emqx_acl, {

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl

@@ -49,7 +49,7 @@
 
 -type egress() :: #{
     local => #{
-        topic => emqx_topic:topic()
+        topic => emqx_types:topic()
     },
     remote := emqx_bridge_mqtt_msg:msgvars()
 }.

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl

@@ -43,7 +43,7 @@
 -type ingress() :: #{
     server := string(),
     remote := #{
-        topic := emqx_topic:topic(),
+        topic := emqx_types:topic(),
         qos => emqx_types:qos()
     },
     local := emqx_bridge_mqtt_msg:msgvars(),

+ 2 - 1
apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl

@@ -11,6 +11,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 
 -import(
     emqx_eviction_agent_test_helpers,
@@ -295,7 +296,7 @@ t_session_serialization(_Config) ->
     ?assertMatch(
         #{data := [#{clientid := <<"client_with_session">>}]},
         emqx_mgmt_api:cluster_query(
-            emqx_channel_info,
+            ?CHAN_INFO_TAB,
             #{},
             [],
             fun emqx_mgmt_api_clients:qs2ms/2,

+ 2 - 1
apps/emqx_gateway/test/test_mqtt_broker.erl

@@ -24,6 +24,7 @@
 -record(state, {subscriber}).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
@@ -50,7 +51,7 @@ unsubscribe(Topic) ->
     gen_server:call(?MODULE, {unsubscribe, Topic}).
 
 get_subscrbied_topics() ->
-    [Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)].
+    [Topic || {_Client, Topic} <- ets:tab2list(?SUBSCRIPTION)].
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

+ 6 - 4
apps/emqx_management/src/emqx_mgmt.erl

@@ -17,6 +17,8 @@
 -module(emqx_mgmt).
 
 -include("emqx_mgmt.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
+
 -elvis([{elvis_style, invalid_dynamic_call, disable}]).
 -elvis([{elvis_style, god_modules, disable}]).
 
@@ -139,7 +141,7 @@ node_info() ->
         max_fds => proplists:get_value(
             max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))
         ),
-        connections => ets:info(emqx_channel, size),
+        connections => ets:info(?CHAN_TAB, size),
         node_status => 'running',
         uptime => proplists:get_value(uptime, BrokerInfo),
         version => iolist_to_binary(proplists:get_value(version, BrokerInfo)),
@@ -487,7 +489,7 @@ subscribe([], _ClientId, _TopicTables) ->
 -spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) ->
     {subscribe, _} | {error, atom()}.
 do_subscribe(ClientId, TopicTables) ->
-    case ets:lookup(emqx_channel, ClientId) of
+    case ets:lookup(?CHAN_TAB, ClientId) of
         [] -> {error, channel_not_found};
         [{_, Pid}] -> Pid ! {subscribe, TopicTables}
     end.
@@ -514,7 +516,7 @@ unsubscribe([], _ClientId, _Topic) ->
 -spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
     {unsubscribe, _} | {error, _}.
 do_unsubscribe(ClientId, Topic) ->
-    case ets:lookup(emqx_channel, ClientId) of
+    case ets:lookup(?CHAN_TAB, ClientId) of
         [] -> {error, channel_not_found};
         [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
     end.
@@ -537,7 +539,7 @@ unsubscribe_batch([], _ClientId, _Topics) ->
 -spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
     {unsubscribe_batch, _} | {error, _}.
 do_unsubscribe_batch(ClientId, Topics) ->
-    case ets:lookup(emqx_channel, ClientId) of
+    case ets:lookup(?CHAN_TAB, ClientId) of
         [] -> {error, channel_not_found};
         [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]}
     end.

+ 4 - 4
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -20,6 +20,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
 -include_lib("emqx/include/logger.hrl").
@@ -57,7 +58,6 @@
 %% for batch operation
 -export([do_subscribe/3]).
 
--define(CLIENT_QTAB, emqx_channel_info).
 -define(TAGS, [<<"Clients">>]).
 
 -define(CLIENT_QSCHEMA, [
@@ -666,7 +666,7 @@ list_clients(QString) ->
         case maps:get(<<"node">>, QString, undefined) of
             undefined ->
                 emqx_mgmt_api:cluster_query(
-                    ?CLIENT_QTAB,
+                    ?CHAN_INFO_TAB,
                     QString,
                     ?CLIENT_QSCHEMA,
                     fun ?MODULE:qs2ms/2,
@@ -678,7 +678,7 @@ list_clients(QString) ->
                         QStringWithoutNode = maps:without([<<"node">>], QString),
                         emqx_mgmt_api:node_query(
                             Node1,
-                            ?CLIENT_QTAB,
+                            ?CHAN_INFO_TAB,
                             QStringWithoutNode,
                             ?CLIENT_QSCHEMA,
                             fun ?MODULE:qs2ms/2,
@@ -753,7 +753,7 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
     %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2),
     %% as the emqx_channel_info table will only be populated after the hook `client.connected`
     %% has returned. So if one want to subscribe topics in this hook, it will fail.
-    case ets:lookup(emqx_channel, ClientID) of
+    case ets:lookup(?CHAN_TAB, ClientID) of
         [] ->
             {404, ?CLIENTID_NOT_FOUND};
         _ ->

+ 2 - 1
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -17,6 +17,7 @@
 -module(emqx_mgmt_api_topics).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
@@ -111,7 +112,7 @@ do_list(Params) ->
     case
         emqx_mgmt_api:node_query(
             node(),
-            emqx_route,
+            ?ROUTE_TAB,
             Params,
             ?TOPICS_QUERY_SCHEMA,
             fun ?MODULE:qs2ms/2,

+ 13 - 11
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -17,6 +17,8 @@
 -module(emqx_mgmt_cli).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_cm.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/logger.hrl").
 
@@ -168,7 +170,7 @@ sort_map_list_field(Field, Map) ->
 %% @doc Query clients
 
 clients(["list"]) ->
-    dump(emqx_channel, client);
+    dump(?CHAN_TAB, client);
 clients(["show", ClientId]) ->
     if_client(ClientId, fun print/1);
 clients(["kick", ClientId]) ->
@@ -182,7 +184,7 @@ clients(_) ->
     ]).
 
 if_client(ClientId, Fun) ->
-    case ets:lookup(emqx_channel, (bin(ClientId))) of
+    case ets:lookup(?CHAN_TAB, (bin(ClientId))) of
         [] -> emqx_ctl:print("Not Found.~n");
         [Channel] -> Fun({client, Channel})
     end.
@@ -191,9 +193,9 @@ if_client(ClientId, Fun) ->
 %% @doc Topics Command
 
 topics(["list"]) ->
-    dump(emqx_route, emqx_topic);
+    dump(?ROUTE_TAB, emqx_topic);
 topics(["show", Topic]) ->
-    Routes = ets:lookup(emqx_route, bin(Topic)),
+    Routes = ets:lookup(?ROUTE_TAB, bin(Topic)),
     [print({emqx_topic, Route}) || Route <- Routes];
 topics(_) ->
     emqx_ctl:usage([
@@ -204,23 +206,23 @@ topics(_) ->
 subscriptions(["list"]) ->
     lists:foreach(
         fun(Suboption) ->
-            print({emqx_suboption, Suboption})
+            print({?SUBOPTION, Suboption})
         end,
-        ets:tab2list(emqx_suboption)
+        ets:tab2list(?SUBOPTION)
     );
 subscriptions(["show", ClientId]) ->
     case ets:lookup(emqx_subid, bin(ClientId)) of
         [] ->
             emqx_ctl:print("Not Found.~n");
         [{_, Pid}] ->
-            case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of
+            case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of
                 [] -> emqx_ctl:print("Not Found.~n");
-                Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption]
+                Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption]
             end
     end;
 subscriptions(["add", ClientId, Topic, QoS]) ->
     if_valid_qos(QoS, fun(IntQos) ->
-        case ets:lookup(emqx_channel, bin(ClientId)) of
+        case ets:lookup(?CHAN_TAB, bin(ClientId)) of
             [] ->
                 emqx_ctl:print("Error: Channel not found!");
             [{_, Pid}] ->
@@ -230,7 +232,7 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
         end
     end);
 subscriptions(["del", ClientId, Topic]) ->
-    case ets:lookup(emqx_channel, bin(ClientId)) of
+    case ets:lookup(?CHAN_TAB, bin(ClientId)) of
         [] ->
             emqx_ctl:print("Error: Channel not found!");
         [{_, Pid}] ->
@@ -841,7 +843,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) ->
     emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
 print({emqx_topic, #route{topic = Topic, dest = Node}}) ->
     emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
-print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
+print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
     SubId = maps:get(subid, Options),
     QoS = maps:get(qos, Options, 0),
     NL = maps:get(nl, Options, 0),

+ 1 - 2
apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl

@@ -18,11 +18,10 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--define(ROUTE_TAB, emqx_route).
-
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 

+ 7 - 7
apps/emqx_retainer/src/emqx_retainer_index.erl

@@ -31,7 +31,7 @@
 -type index() :: list(pos_integer()).
 
 %% @doc Index key is a term that can be effectively searched in the index table.
--type index_key() :: {index(), {emqx_topic:words(), emqx_topic:words()}}.
+-type index_key() :: {index(), {emqx_types:words(), emqx_types:words()}}.
 
 -type match_pattern_part() :: term().
 
@@ -42,7 +42,7 @@
 %% @doc Given words of a concrete topic (`Tokens') and a list of `Indices',
 %% constructs index keys for the topic and each of the indices.
 %% `Fun' is called with each of these keys.
--spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_topic:words()) -> ok.
+-spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_types:words()) -> ok.
 foreach_index_key(_Fun, [], _Tokens) ->
     ok;
 foreach_index_key(Fun, [Index | Indices], Tokens) ->
@@ -59,7 +59,7 @@ foreach_index_key(Fun, [Index | Indices], Tokens) ->
 %% returns `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' term.
 %%
 %% @see foreach_index_key/3
--spec to_index_key(index(), emqx_topic:words()) -> index_key().
+-spec to_index_key(index(), emqx_types:words()) -> index_key().
 to_index_key(Index, Tokens) ->
     {Index, split_index_tokens(Index, Tokens, 1, [], [])}.
 
@@ -73,7 +73,7 @@ to_index_key(Index, Tokens) ->
 %%
 %% @see foreach_index_key/3
 %% @see to_index_key/2
--spec index_score(index(), emqx_topic:words()) -> non_neg_integer().
+-spec index_score(index(), emqx_types:words()) -> non_neg_integer().
 index_score(Index, Tokens) ->
     index_score(Index, Tokens, 1, 0).
 
@@ -92,7 +92,7 @@ select_index(Tokens, Indices) ->
 %%
 %% E.g. for `[2, 3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic
 %% returns <code>{[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}</code> pattern.
--spec condition(index(), emqx_topic:words()) -> match_pattern_part().
+-spec condition(index(), emqx_types:words()) -> match_pattern_part().
 condition(Index, Tokens) ->
     {Index, condition(Index, Tokens, 1, [], [])}.
 
@@ -100,7 +100,7 @@ condition(Index, Tokens) ->
 %%
 %% E.g. for <code>['+', <<"b">>, '+', <<"d">>, '#']</code> wildcard topic
 %% returns <code>['_', <<"b">>, '_', <<"d">> | '_']</code> pattern.
--spec condition(emqx_topic:words()) -> match_pattern_part().
+-spec condition(emqx_types:words()) -> match_pattern_part().
 condition(Tokens) ->
     Tokens1 = [
         case W =:= '+' of
@@ -118,7 +118,7 @@ condition(Tokens) ->
 %%
 %% E.g given `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' index key
 %% returns `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic.
--spec restore_topic(index_key()) -> emqx_topic:words().
+-spec restore_topic(index_key()) -> emqx_types:words().
 restore_topic({Index, {IndexTokens, OtherTokens}}) ->
     restore_topic(Index, IndexTokens, OtherTokens, 1, []).
 

+ 2 - 4
rel/i18n/emqx_schema.hocon

@@ -976,15 +976,13 @@ fields_tcp_opts_nodelay.label:
 """TCP_NODELAY"""
 
 fields_tcp_opts_keepalive.desc:
-"""
-Enable TCP keepalive for MQTT connections over TCP or SSL.
+"""Enable TCP keepalive for MQTT connections over TCP or SSL.
 The value is three comma separated numbers in the format of 'Idle,Interval,Probes'
  - Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
  - Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
  - Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
 For example "240,30,5" means: EMQX should start sending TCP keepalive probes after the connection is in idle for 240 seconds, and the probes are sent every 30 seconds until a response is received from the MQTT client, if it misses 5 consecutive responses, EMQX should close the connection.
-Default: 'none'
-"""
+Default: 'none'"""
 
 fields_tcp_opts_keepalive.label:
 """TCP keepalive options"""