Bläddra i källkod

refactor: move topic() types def in `emqx_types.erl`

JimMoen 2 år sedan
förälder
incheckning
ea81a924f1

+ 7 - 0
apps/emqx/include/emqx_mqtt.hrl

@@ -48,6 +48,13 @@
     {?MQTT_PROTO_V5, <<"MQTT">>}
 ]).
 
+%%--------------------------------------------------------------------
+%% MQTT Topic and TopitFilter byte length
+%%--------------------------------------------------------------------
+
+%% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3]
+-define(MAX_TOPIC_LEN, 65535).
+
 %%--------------------------------------------------------------------
 %% MQTT QoS Levels
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx/src/emqx_broker.erl

@@ -103,10 +103,10 @@ start_link(Pool, Id) ->
 create_tabs() ->
     TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
 
-    %% SubOption: {Topic, SubPid} -> SubOption
+    %% SubOption: {TopicFilter, SubPid} -> SubOption
     ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]),
 
-    %% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
+    %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ...
     %% duplicate_bag: o(1) insert
     ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
 

+ 2 - 0
apps/emqx/src/emqx_frame.erl

@@ -300,6 +300,7 @@ parse_connect2(
     ConnPacket = #mqtt_packet_connect{
         proto_name = ProtoName,
         proto_ver = ProtoVer,
+        %% For bridge mode, non-standard implementation
         is_bridge = (BridgeTag =:= 8),
         clean_start = bool(CleanStart),
         will_flag = bool(WillFlag),
@@ -762,6 +763,7 @@ serialize_variable(
     #mqtt_packet_connect{
         proto_name = ProtoName,
         proto_ver = ProtoVer,
+        %% For bridge mode, non-standard implementation
         is_bridge = IsBridge,
         clean_start = CleanStart,
         will_flag = WillFlag,

+ 1 - 2
apps/emqx/src/emqx_mqueue.erl

@@ -75,11 +75,10 @@
 
 -export_type([mqueue/0, options/0]).
 
--type topic() :: emqx_types:topic().
 -type priority() :: infinity | integer().
 -type pq() :: emqx_pqueue:q().
 -type count() :: non_neg_integer().
--type p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}.
+-type p_table() :: ?NO_PRIORITY_TABLE | #{emqx_types:topic() := priority()}.
 -type options() :: #{
     max_len := count(),
     priorities => p_table(),

+ 5 - 7
apps/emqx/src/emqx_session_router.erl

@@ -57,9 +57,7 @@
     code_change/3
 ]).
 
--type group() :: binary().
-
--type dest() :: node() | {group(), node()}.
+-type dest() :: node() | {emqx_types:group(), node()}.
 
 -define(ROUTE_RAM_TAB, emqx_session_route_ram).
 -define(ROUTE_DISC_TAB, emqx_session_route_disc).
@@ -114,7 +112,7 @@ start_link(Pool, Id) ->
 %% Route APIs
 %%--------------------------------------------------------------------
 
--spec do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}.
+-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_add_route(Topic, SessionID) when is_binary(Topic) ->
     Route = #route{topic = Topic, dest = SessionID},
     case lists:member(Route, lookup_routes(Topic)) of
@@ -135,7 +133,7 @@ do_add_route(Topic, SessionID) when is_binary(Topic) ->
     end.
 
 %% @doc Match routes
--spec match_routes(emqx_topic:topic()) -> [emqx_types:route()].
+-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
 match_routes(Topic) when is_binary(Topic) ->
     case match_trie(Topic) of
         [] -> lookup_routes(Topic);
@@ -153,7 +151,7 @@ match_trie(Topic) ->
 delete_routes(SessionID, Subscriptions) ->
     cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}).
 
--spec do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}.
+-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_delete_route(Topic, SessionID) ->
     Route = #route{topic = Topic, dest = SessionID},
     case emqx_topic:wildcard(Topic) of
@@ -165,7 +163,7 @@ do_delete_route(Topic, SessionID) ->
     end.
 
 %% @doc Print routes to a topic
--spec print_routes(emqx_topic:topic()) -> ok.
+-spec print_routes(emqx_types:topic()) -> ok.
 print_routes(Topic) ->
     lists:foreach(
         fun(#route{topic = To, dest = SessionID}) ->

+ 2 - 2
apps/emqx/src/emqx_shared_sub.erl

@@ -97,7 +97,7 @@
 -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
 -define(SUBSCRIBER_DOWN, noproc).
 
--type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
+-type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()).
 
 -record(state, {pmon}).
 
@@ -156,7 +156,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
             end
     end.
 
--spec strategy(emqx_topic:group()) -> strategy().
+-spec strategy(emqx_types:group()) -> strategy().
 strategy(Group) ->
     try
         emqx:get_config([

+ 8 - 13
apps/emqx/src/emqx_topic.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_topic).
 
+-include("emqx_mqtt.hrl").
+
 %% APIs
 -export([
     match/2,
@@ -33,18 +35,9 @@
     parse/2
 ]).
 
--export_type([
-    group/0,
-    topic/0,
-    word/0
-]).
-
--type group() :: binary().
--type topic() :: binary().
--type word() :: '' | '+' | '#' | binary().
--type words() :: list(word()).
-
--define(MAX_TOPIC_LEN, 65535).
+-type topic() :: emqx_types:topic().
+-type word() :: emqx_types:word().
+-type words() :: emqx_types:words().
 
 %%--------------------------------------------------------------------
 %% APIs
@@ -142,6 +135,7 @@ prepend(Parent0, W) ->
         _ -> <<Parent/binary, $/, (bin(W))/binary>>
     end.
 
+-spec bin(word()) -> binary().
 bin('') -> <<>>;
 bin('+') -> <<"+">>;
 bin('#') -> <<"#">>;
@@ -163,6 +157,7 @@ tokens(Topic) ->
 words(Topic) when is_binary(Topic) ->
     [word(W) || W <- tokens(Topic)].
 
+-spec word(binary()) -> word().
 word(<<>>) -> '';
 word(<<"+">>) -> '+';
 word(<<"#">>) -> '#';
@@ -185,7 +180,7 @@ feed_var(Var, Val, [Var | Words], Acc) ->
 feed_var(Var, Val, [W | Words], Acc) ->
     feed_var(Var, Val, Words, [W | Acc]).
 
--spec join(list(binary())) -> binary().
+-spec join(list(word())) -> binary().
 join([]) ->
     <<>>;
 join([W]) ->

+ 3 - 3
apps/emqx/src/emqx_trie.erl

@@ -114,7 +114,7 @@ create_session_trie(Type) ->
 insert(Topic) when is_binary(Topic) ->
     insert(Topic, ?TRIE).
 
--spec insert_session(emqx_topic:topic()) -> ok.
+-spec insert_session(emqx_types:topic()) -> ok.
 insert_session(Topic) when is_binary(Topic) ->
     insert(Topic, session_trie()).
 
@@ -132,7 +132,7 @@ delete(Topic) when is_binary(Topic) ->
     delete(Topic, ?TRIE).
 
 %% @doc Delete a topic filter from the trie.
--spec delete_session(emqx_topic:topic()) -> ok.
+-spec delete_session(emqx_types:topic()) -> ok.
 delete_session(Topic) when is_binary(Topic) ->
     delete(Topic, session_trie()).
 
@@ -148,7 +148,7 @@ delete(Topic, Trie) when is_binary(Topic) ->
 match(Topic) when is_binary(Topic) ->
     match(Topic, ?TRIE).
 
--spec match_session(emqx_topic:topic()) -> list(emqx_topic:topic()).
+-spec match_session(emqx_types:topic()) -> list(emqx_types:topic()).
 match_session(Topic) when is_binary(Topic) ->
     match(Topic, session_trie()).
 

+ 12 - 3
apps/emqx/src/emqx_types.erl

@@ -29,10 +29,16 @@
 -export_type([
     zone/0,
     pubsub/0,
-    topic/0,
     subid/0
 ]).
 
+-export_type([
+    group/0,
+    topic/0,
+    word/0,
+    words/0
+]).
+
 -export_type([
     socktype/0,
     sockstate/0,
@@ -122,9 +128,13 @@
 
 -type zone() :: atom().
 -type pubsub() :: publish | subscribe.
--type topic() :: emqx_topic:topic().
 -type subid() :: binary() | atom().
 
+-type group() :: binary() | undefined.
+-type topic() :: binary().
+-type word() :: '' | '+' | '#' | binary().
+-type words() :: list(word()).
+
 -type socktype() :: tcp | udp | ssl | proxy | atom().
 -type sockstate() :: idle | running | blocked | closed.
 -type conninfo() :: #{
@@ -230,7 +240,6 @@
     | {share, topic(), deliver_result()}
 ].
 -type route() :: #route{}.
--type group() :: emqx_topic:group().
 -type route_entry() :: {topic(), node()} | {topic, group()}.
 -type command() :: #command{}.
 

+ 1 - 1
apps/emqx_authz/src/emqx_authz_mnesia.erl

@@ -33,7 +33,7 @@
 -type clientid() :: {clientid, binary()}.
 -type who() :: username() | clientid() | all.
 
--type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_topic:topic()}.
+-type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_types:topic()}.
 -type rules() :: [rule()].
 
 -record(emqx_acl, {

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl

@@ -49,7 +49,7 @@
 
 -type egress() :: #{
     local => #{
-        topic => emqx_topic:topic()
+        topic => emqx_types:topic()
     },
     remote := emqx_bridge_mqtt_msg:msgvars()
 }.

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl

@@ -43,7 +43,7 @@
 -type ingress() :: #{
     server := string(),
     remote := #{
-        topic := emqx_topic:topic(),
+        topic := emqx_types:topic(),
         qos => emqx_types:qos()
     },
     local := emqx_bridge_mqtt_msg:msgvars(),

+ 7 - 7
apps/emqx_retainer/src/emqx_retainer_index.erl

@@ -31,7 +31,7 @@
 -type index() :: list(pos_integer()).
 
 %% @doc Index key is a term that can be effectively searched in the index table.
--type index_key() :: {index(), {emqx_topic:words(), emqx_topic:words()}}.
+-type index_key() :: {index(), {emqx_types:words(), emqx_types:words()}}.
 
 -type match_pattern_part() :: term().
 
@@ -42,7 +42,7 @@
 %% @doc Given words of a concrete topic (`Tokens') and a list of `Indices',
 %% constructs index keys for the topic and each of the indices.
 %% `Fun' is called with each of these keys.
--spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_topic:words()) -> ok.
+-spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_types:words()) -> ok.
 foreach_index_key(_Fun, [], _Tokens) ->
     ok;
 foreach_index_key(Fun, [Index | Indices], Tokens) ->
@@ -59,7 +59,7 @@ foreach_index_key(Fun, [Index | Indices], Tokens) ->
 %% returns `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' term.
 %%
 %% @see foreach_index_key/3
--spec to_index_key(index(), emqx_topic:words()) -> index_key().
+-spec to_index_key(index(), emqx_types:words()) -> index_key().
 to_index_key(Index, Tokens) ->
     {Index, split_index_tokens(Index, Tokens, 1, [], [])}.
 
@@ -73,7 +73,7 @@ to_index_key(Index, Tokens) ->
 %%
 %% @see foreach_index_key/3
 %% @see to_index_key/2
--spec index_score(index(), emqx_topic:words()) -> non_neg_integer().
+-spec index_score(index(), emqx_types:words()) -> non_neg_integer().
 index_score(Index, Tokens) ->
     index_score(Index, Tokens, 1, 0).
 
@@ -92,7 +92,7 @@ select_index(Tokens, Indices) ->
 %%
 %% E.g. for `[2, 3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic
 %% returns <code>{[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}</code> pattern.
--spec condition(index(), emqx_topic:words()) -> match_pattern_part().
+-spec condition(index(), emqx_types:words()) -> match_pattern_part().
 condition(Index, Tokens) ->
     {Index, condition(Index, Tokens, 1, [], [])}.
 
@@ -100,7 +100,7 @@ condition(Index, Tokens) ->
 %%
 %% E.g. for <code>['+', <<"b">>, '+', <<"d">>, '#']</code> wildcard topic
 %% returns <code>['_', <<"b">>, '_', <<"d">> | '_']</code> pattern.
--spec condition(emqx_topic:words()) -> match_pattern_part().
+-spec condition(emqx_types:words()) -> match_pattern_part().
 condition(Tokens) ->
     Tokens1 = [
         case W =:= '+' of
@@ -118,7 +118,7 @@ condition(Tokens) ->
 %%
 %% E.g given `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' index key
 %% returns `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic.
--spec restore_topic(index_key()) -> emqx_topic:words().
+-spec restore_topic(index_key()) -> emqx_types:words().
 restore_topic({Index, {IndexTokens, OtherTokens}}) ->
     restore_topic(Index, IndexTokens, OtherTokens, 1, []).