|
|
@@ -41,15 +41,15 @@
|
|
|
-compile(nowarn_export_all).
|
|
|
-endif.
|
|
|
|
|
|
+-define(TRIE, emqx_trie).
|
|
|
-define(PREFIX(Prefix), {Prefix, 0}).
|
|
|
-define(TOPIC(Topic), {Topic, 1}).
|
|
|
|
|
|
--record(emqx_topic,
|
|
|
+-record(?TRIE,
|
|
|
{ key :: ?TOPIC(binary()) | ?PREFIX(binary())
|
|
|
, count = 0 :: non_neg_integer()
|
|
|
}).
|
|
|
|
|
|
--define(TOPICS_TAB, emqx_topic).
|
|
|
-define(IS_COMPACT, true).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -68,15 +68,16 @@ put_default_compaction_flag() ->
|
|
|
mnesia(boot) ->
|
|
|
%% Optimize storage
|
|
|
StoreProps = [{ets, [{read_concurrency, true},
|
|
|
- {write_concurrency, true}]}],
|
|
|
- ok = ekka_mnesia:create_table(?TOPICS_TAB, [
|
|
|
+ {write_concurrency, true}
|
|
|
+ ]}],
|
|
|
+ ok = ekka_mnesia:create_table(?TRIE, [
|
|
|
{ram_copies, [node()]},
|
|
|
- {record_name, emqx_topic},
|
|
|
- {attributes, record_info(fields, emqx_topic)},
|
|
|
+ {record_name, ?TRIE},
|
|
|
+ {attributes, record_info(fields, ?TRIE)},
|
|
|
{storage_properties, StoreProps}]);
|
|
|
mnesia(copy) ->
|
|
|
%% Copy topics table
|
|
|
- ok = ekka_mnesia:copy_table(?TOPICS_TAB, ram_copies).
|
|
|
+ ok = ekka_mnesia:copy_table(?TRIE, ram_copies).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Topics APIs
|
|
|
@@ -86,7 +87,7 @@ mnesia(copy) ->
|
|
|
-spec(insert(emqx_topic:topic()) -> ok).
|
|
|
insert(Topic) when is_binary(Topic) ->
|
|
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
|
|
- case mnesia:wread({?TOPICS_TAB, TopicKey}) of
|
|
|
+ case mnesia:wread({?TRIE, TopicKey}) of
|
|
|
[_] -> ok; %% already inserted
|
|
|
[] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys])
|
|
|
end.
|
|
|
@@ -95,7 +96,7 @@ insert(Topic) when is_binary(Topic) ->
|
|
|
-spec(delete(emqx_topic:topic()) -> ok).
|
|
|
delete(Topic) when is_binary(Topic) ->
|
|
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
|
|
- case [] =/= mnesia:wread({?TOPICS_TAB, TopicKey}) of
|
|
|
+ case [] =/= mnesia:wread({?TRIE, TopicKey}) of
|
|
|
true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]);
|
|
|
false -> ok
|
|
|
end.
|
|
|
@@ -104,16 +105,16 @@ delete(Topic) when is_binary(Topic) ->
|
|
|
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
|
|
match(Topic) when is_binary(Topic) ->
|
|
|
Words = emqx_topic:words(Topic),
|
|
|
- false = emqx_topic:wildcard(Words), %% assert
|
|
|
+ false = emqx_topic:wildcard(Words), % assert
|
|
|
do_match(Words).
|
|
|
|
|
|
%% @doc Is the trie empty?
|
|
|
-spec(empty() -> boolean()).
|
|
|
-empty() -> ets:info(?TOPICS_TAB, size) == 0.
|
|
|
+empty() -> ets:info(?TRIE, size) == 0.
|
|
|
|
|
|
-spec lock_tables() -> ok.
|
|
|
lock_tables() ->
|
|
|
- mnesia:write_lock_table(?TOPICS_TAB).
|
|
|
+ mnesia:write_lock_table(?TRIE).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
@@ -159,34 +160,34 @@ make_prefixes([H | T], Prefix0, Acc0) ->
|
|
|
make_prefixes(T, Prefix, Acc).
|
|
|
|
|
|
insert_key(Key) ->
|
|
|
- T = case mnesia:wread({?TOPICS_TAB, Key}) of
|
|
|
- [#emqx_topic{count = C} = T1] ->
|
|
|
- T1#emqx_topic{count = C + 1};
|
|
|
+ T = case mnesia:wread({?TRIE, Key}) of
|
|
|
+ [#?TRIE{count = C} = T1] ->
|
|
|
+ T1#?TRIE{count = C + 1};
|
|
|
[] ->
|
|
|
- #emqx_topic{key = Key, count = 1}
|
|
|
+ #?TRIE{key = Key, count = 1}
|
|
|
end,
|
|
|
ok = mnesia:write(T).
|
|
|
|
|
|
delete_key(Key) ->
|
|
|
- case mnesia:wread({?TOPICS_TAB, Key}) of
|
|
|
- [#emqx_topic{count = C} = T] when C > 1 ->
|
|
|
- ok = mnesia:write(T#emqx_topic{count = C - 1});
|
|
|
+ case mnesia:wread({?TRIE, Key}) of
|
|
|
+ [#?TRIE{count = C} = T] when C > 1 ->
|
|
|
+ ok = mnesia:write(T#?TRIE{count = C - 1});
|
|
|
[_] ->
|
|
|
- ok = mnesia:delete(?TOPICS_TAB, Key, write);
|
|
|
+ ok = mnesia:delete(?TRIE, Key, write);
|
|
|
[] ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
lookup_topic(Topic) when is_binary(Topic) ->
|
|
|
- case ets:lookup(?TOPICS_TAB, ?TOPIC(Topic)) of
|
|
|
- [#emqx_topic{count = C}] -> [Topic || C > 0];
|
|
|
+ case ets:lookup(?TRIE, ?TOPIC(Topic)) of
|
|
|
+ [#?TRIE{count = C}] -> [Topic || C > 0];
|
|
|
[] -> []
|
|
|
end.
|
|
|
|
|
|
has_prefix(empty) -> true; %% this is the virtual tree root
|
|
|
has_prefix(Prefix) ->
|
|
|
- case ets:lookup(?TOPICS_TAB, ?PREFIX(Prefix)) of
|
|
|
- [#emqx_topic{count = C}] -> C > 0;
|
|
|
+ case ets:lookup(?TRIE, ?PREFIX(Prefix)) of
|
|
|
+ [#?TRIE{count = C}] -> C > 0;
|
|
|
[] -> false
|
|
|
end.
|
|
|
|
|
|
@@ -307,4 +308,6 @@ do_compact_test() ->
|
|
|
?assertEqual([<<"a/b/+">>, <<"c">>], do_compact(words(<<"a/b/+/c">>))),
|
|
|
?assertEqual([<<"a/+">>, <<"+">>, <<"b">>], do_compact(words(<<"a/+/+/b">>))).
|
|
|
|
|
|
+clear_tables() -> mnesia:clear_table(?TRIE).
|
|
|
+
|
|
|
-endif. % TEST
|