@@ -27,8 +27,9 @@
 %% Trie APIs
 -export([ insert/1
         , match/1
-        , lookup/1
         , delete/1
+        , put_compaction_flag/1
+        , put_default_compaction_flag/0
         ]).

 -export([ empty/0
@@ -40,177 +41,270 @@
 -compile(nowarn_export_all).
 -endif.

--type(triple() :: {root | binary(), emqx_topic:word(), binary()}).
+-define(PREFIX(Prefix), {Prefix, 0}).
+-define(TOPIC(Topic), {Topic, 1}).

-%% Mnesia tables
--define(TRIE_TAB, emqx_trie).
--define(TRIE_NODE_TAB, emqx_trie_node).
+-record(emqx_topic,
+        { key :: ?TOPIC(binary()) | ?PREFIX(binary())
+        , count = 0 :: non_neg_integer()
+        }).

--elvis([{elvis_style, function_naming_convention, disable}]).
+-define(TOPICS_TAB, emqx_topic).
+-define(IS_COMPACT, true).

 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------

-%% @doc Create or replicate trie tables.
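+%% @doc Override the topic-filter compaction flag at runtime (stored in persistent_term).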
+put_compaction_flag(Bool) when is_boolean(Bool) ->
+    _ = persistent_term:put({?MODULE, compaction}, Bool),
+    ok.
+
+put_default_compaction_flag() ->
+    ok = put_compaction_flag(?IS_COMPACT).
+
+%% @doc Create or replicate topics table.
 -spec(mnesia(boot | copy) -> ok).
 mnesia(boot) ->
     %% Optimize storage
     StoreProps = [{ets, [{read_concurrency, true},
                          {write_concurrency, true}]}],
-    %% Trie table
-    ok = ekka_mnesia:create_table(?TRIE_TAB, [
+    ok = ekka_mnesia:create_table(?TOPICS_TAB, [
                 {ram_copies, [node()]},
-                {record_name, trie},
-                {attributes, record_info(fields, trie)},
-                {storage_properties, StoreProps}]),
-    %% Trie node table
-    ok = ekka_mnesia:create_table(?TRIE_NODE_TAB, [
-                {ram_copies, [node()]},
-                {record_name, trie_node},
-                {attributes, record_info(fields, trie_node)},
+                {record_name, emqx_topic},
+                {attributes, record_info(fields, emqx_topic)},
                 {storage_properties, StoreProps}]);
-
 mnesia(copy) ->
-    %% Copy trie table
-    ok = ekka_mnesia:copy_table(?TRIE_TAB, ram_copies),
-    %% Copy trie_node table
-    ok = ekka_mnesia:copy_table(?TRIE_NODE_TAB, ram_copies).
+    %% Copy topics table
+    ok = ekka_mnesia:copy_table(?TOPICS_TAB, ram_copies).

 %%--------------------------------------------------------------------
-%% Trie APIs
+%% Topics APIs
 %%--------------------------------------------------------------------

 %% @doc Insert a topic filter into the trie.
 -spec(insert(emqx_topic:topic()) -> ok).
 insert(Topic) when is_binary(Topic) ->
-    case mnesia:wread({?TRIE_NODE_TAB, Topic}) of
-        [#trie_node{topic = Topic}] ->
-            ok;
-        [TrieNode = #trie_node{topic = undefined}] ->
-            write_trie_node(TrieNode#trie_node{topic = Topic});
-        [] ->
-            %% Add trie path
-            ok = lists:foreach(fun add_path/1, triples(Topic)),
-            %% Add last node
-            write_trie_node(#trie_node{node_id = Topic, topic = Topic})
+    {TopicKey, PrefixKeys} = make_keys(Topic),
+    case mnesia:wread({?TOPICS_TAB, TopicKey}) of
+        [_] -> ok; %% already inserted
+        [] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys])
     end.
-%% @doc Find trie nodes that match the topic name.
--spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
-match(Topic) when is_binary(Topic) ->
-    TrieNodes = match_node(root, emqx_topic:words(Topic)),
-    [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
-
-%% @doc Lookup a trie node.
--spec(lookup(NodeId :: binary()) -> [trie_node()]).
-lookup(NodeId) ->
-    mnesia:read(?TRIE_NODE_TAB, NodeId).
-
 %% @doc Delete a topic filter from the trie.
 -spec(delete(emqx_topic:topic()) -> ok).
 delete(Topic) when is_binary(Topic) ->
-    case mnesia:wread({?TRIE_NODE_TAB, Topic}) of
-        [#trie_node{edge_count = 0}] ->
-            ok = mnesia:delete({?TRIE_NODE_TAB, Topic}),
-            delete_path(lists:reverse(triples(Topic)));
-        [TrieNode] ->
-            write_trie_node(TrieNode#trie_node{topic = undefined});
-        [] -> ok
+    {TopicKey, PrefixKeys} = make_keys(Topic),
+    case [] =/= mnesia:wread({?TOPICS_TAB, TopicKey}) of
+        true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]);
+        false -> ok
     end.

+%% @doc Find topic filters that match the topic name.
+-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
+match(Topic) when is_binary(Topic) ->
+    Words = emqx_topic:words(Topic),
+    false = emqx_topic:wildcard(Words), %% assert
+    do_match(Words).
+
 %% @doc Is the trie empty?
 -spec(empty() -> boolean()).
-empty() ->
-    ets:info(?TRIE_TAB, size) == 0.
+empty() -> ets:info(?TOPICS_TAB, size) == 0.

 -spec lock_tables() -> ok.
 lock_tables() ->
-    mnesia:write_lock_table(?TRIE_TAB),
-    mnesia:write_lock_table(?TRIE_NODE_TAB).
+    mnesia:write_lock_table(?TOPICS_TAB).

 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

-%% Topic to triples.
--spec(triples(emqx_topic:topic()) -> list(triple())).
-triples(Topic) when is_binary(Topic) ->
-    triples(emqx_topic:words(Topic), root, []).
-
-triples([], _Parent, Acc) ->
-    lists:reverse(Acc);
-triples([W|Words], Parent, Acc) ->
-    Node = join(Parent, W),
-    triples(Words, Node, [{Parent, W, Node}|Acc]).
-
-join(root, W) ->
-    emqx_topic:join([W]);
-join(Parent, W) ->
-    emqx_topic:join([Parent, W]).
-
-%% Add a path to the trie.
-add_path({Node, Word, Child}) ->
-    Edge = #trie_edge{node_id = Node, word = Word},
-    case mnesia:wread({?TRIE_NODE_TAB, Node}) of
-        [TrieNode = #trie_node{edge_count = Count}] ->
-            case mnesia:wread({?TRIE_TAB, Edge}) of
-                [] ->
-                    ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}),
-                    write_trie(#trie{edge = Edge, node_id = Child});
-                [_] -> ok
-            end;
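+%% Build the table keys for a topic filter: the ?TOPIC key for the filter
+%% itself, plus one ?PREFIX key per prefix on the path leading to it.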
+make_keys(Topic) ->
+    Words = emqx_topic:words(Topic),
+    {?TOPIC(Topic), [?PREFIX(Prefix) || Prefix <- make_prefixes(Words)]}.
+
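+%% Compact the word list into larger segments when compaction is enabled.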
+compact(Words) ->
+    case is_compact() of
+        true -> do_compact(Words);
+        false -> Words
+    end.
+
+%% a/b/c/+/d/# => [a/b/c/+, d/#]
+%% a/+/+/b => [a/+, +, b]
+do_compact(Words) ->
+    do_compact(Words, empty, []).
+
+do_compact([], empty, Acc) -> lists:reverse(Acc);
+do_compact([], Seg, Acc) -> lists:reverse([Seg | Acc]);
+do_compact([Word | Words], Seg, Acc) when Word =:= '+' orelse Word =:= '#' ->
+    do_compact(Words, empty, [join(Seg, Word) | Acc]);
+do_compact([Word | Words], Seg, Acc) ->
+    do_compact(Words, join(Seg, Word), Acc).
+
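+%% Join a (possibly empty) prefix and the next word into a binary prefix.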
+join(empty, '+') -> <<"+">>;
+join(empty, '#') -> <<"#">>;
+join(empty, '') -> <<>>;
+join(empty, Word) -> Word;
+join(Prefix, Word) -> emqx_topic:join([Prefix, Word]).
+
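+%% All proper prefixes of the (possibly compacted) word list, longest first;
+%% the full topic itself is not included.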
+make_prefixes(Words) ->
+    lists:map(fun emqx_topic:join/1,
+              make_prefixes(compact(Words), [], [])).
+
+make_prefixes([_LastWord], _Prefix, Acc) ->
+    lists:map(fun lists:reverse/1, Acc);
+make_prefixes([H | T], Prefix0, Acc0) ->
+    Prefix = [H | Prefix0],
+    Acc = [Prefix | Acc0],
+    make_prefixes(T, Prefix, Acc).
+
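+%% Insert a key, or bump its reference count if it is already present.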
+insert_key(Key) ->
+    T = case mnesia:wread({?TOPICS_TAB, Key}) of
+            [#emqx_topic{count = C} = T1] ->
+                T1#emqx_topic{count = C + 1};
+            [] ->
+                #emqx_topic{key = Key, count = 1}
+        end,
+    ok = mnesia:write(T).
+
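+%% Decrement the reference count of a key; delete the record when it reaches zero.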
+delete_key(Key) ->
+    case mnesia:wread({?TOPICS_TAB, Key}) of
+        [#emqx_topic{count = C} = T] when C > 1 ->
+            ok = mnesia:write(T#emqx_topic{count = C - 1});
+        [_] ->
+            ok = mnesia:delete(?TOPICS_TAB, Key, write);
         [] ->
-            ok = write_trie_node(#trie_node{node_id = Node, edge_count = 1}),
-            write_trie(#trie{edge = Edge, node_id = Child})
+            ok
     end.

-%% Match node with word or '+'.
-match_node(root, [NodeId = <<$$, _/binary>>|Words]) ->
-    match_node(NodeId, Words, []);
-
-match_node(NodeId, Words) ->
-    match_node(NodeId, Words, []).
-
-match_node(NodeId, [], ResAcc) ->
-    mnesia:read(?TRIE_NODE_TAB, NodeId) ++ 'match_#'(NodeId, ResAcc);
-
-match_node(NodeId, [W|Words], ResAcc) ->
-    lists:foldl(fun(WArg, Acc) ->
-        case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = WArg}) of
-            [#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
-            [] -> Acc
-        end
-    end, 'match_#'(NodeId, ResAcc), [W, '+']).
-
-%% Match node with '#'.
-'match_#'(NodeId, ResAcc) ->
-    case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = '#'}) of
-        [#trie{node_id = ChildId}] ->
-            mnesia:read(?TRIE_NODE_TAB, ChildId) ++ ResAcc;
-        [] -> ResAcc
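+%% Return [Topic] if the topic filter itself is stored (count > 0), [] otherwise.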
+lookup_topic(Topic) when is_binary(Topic) ->
+    case ets:lookup(?TOPICS_TAB, ?TOPIC(Topic)) of
+        [#emqx_topic{count = C}] -> [Topic || C > 0];
+        [] -> []
     end.

-%% Delete paths from the trie.
-delete_path([]) ->
-    ok;
-delete_path([{NodeId, Word, _} | RestPath]) ->
-    ok = mnesia:delete({?TRIE_TAB, #trie_edge{node_id = NodeId, word = Word}}),
-    case mnesia:wread({?TRIE_NODE_TAB, NodeId}) of
-        [#trie_node{edge_count = 1, topic = undefined}] ->
-            ok = mnesia:delete({?TRIE_NODE_TAB, NodeId}),
-            delete_path(RestPath);
-        [TrieNode = #trie_node{edge_count = 1, topic = _}] ->
-            write_trie_node(TrieNode#trie_node{edge_count = 0});
-        [TrieNode = #trie_node{edge_count = C}] ->
-            write_trie_node(TrieNode#trie_node{edge_count = C-1});
-        [] ->
-            mnesia:abort({node_not_found, NodeId})
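+%% Check whether at least one stored filter passes through the given prefix.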
+has_prefix(empty) -> true; %% this is the virtual tree root
+has_prefix(Prefix) ->
+    case ets:lookup(?TOPICS_TAB, ?PREFIX(Prefix)) of
+        [#emqx_topic{count = C}] -> C > 0;
+        [] -> false
     end.

-write_trie(Trie) ->
-    mnesia:write(?TRIE_TAB, Trie, write).
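+%% Match a concrete topic name (no wildcards) against the stored filters.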
+do_match([<<"$", _/binary>> = Prefix | Words]) ->
+    %% For topics with a dollar-sign prefix,
+    %% do not match root-level + or #;
+    %% fast-forward to the next level.
+    case Words =:= [] of
+        true -> lookup_topic(Prefix);
+        false -> []
+    end ++ do_match(Words, Prefix);
+do_match(Words) ->
+    do_match(Words, empty).
+
+do_match(Words, Prefix) ->
+    match(is_compact(), Words, Prefix, []).
+
+match(_IsCompact, [], Topic, Acc) ->
+    match_any_level(Topic) ++ %% try to match foo/bar/#
+    lookup_topic(Topic) ++ %% try to match foo/bar
+    Acc;
+match(IsCompact, [Word | Words], Prefix, Acc0) ->
+    case {has_prefix(Prefix), IsCompact} of
+        {false, false} ->
+            %% non-compact paths in the database:
+            %% if no stored prefix matches the current topic prefix,
+            %% we can simply return from here.
+            %% e.g. a/b/c/+ results in the entries
+            %% - a
+            %% - a/b
+            %% - a/b/c
+            %% - a/b/c/+
+            %% if the topic to match is 'a/x/y',
+            %% then at the second level we look up prefix a/x;
+            %% no such prefix is found, meaning there is no point
+            %% searching for 'a/x/y', 'a/x/+' or 'a/x/#'
+            Acc0;
+        _ ->
+            %% compact paths in the database:
+            %% we have to enumerate all possible prefixes.
+            %% e.g. a/+/b/# results in the entries
+            %% - a/+
+            %% - a/+/b/#
+            %% when matching a/x/y, we need to enumerate
+            %% - a
+            %% - a/x
+            %% - a/x/y
+            %% with '+' and '#' tried at each level
+            Acc1 = match_any_level(Prefix) ++ Acc0,
+            Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1),
+            match(IsCompact, Words, join(Prefix, Word), Acc)
+    end.
+
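+%% Try the multi-level wildcard: look up Prefix/# as a stored filter.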
+match_any_level(Prefix) ->
+    MlTopic = join(Prefix, '#'),
+    lookup_topic(MlTopic).
+
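+%% Read the compaction flag; fall back to the trie_compaction application
+%% environment and cache the result in persistent_term.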
+is_compact() ->
+    case persistent_term:get({?MODULE, compaction}, undefined) of
+        undefined ->
+            Default = ?IS_COMPACT,
+            FromEnv = emqx:get_env(trie_compaction, Default),
+            _ = put_compaction_flag(FromEnv),
+            true = is_boolean(FromEnv),
+            FromEnv;
+        Value when is_boolean(Value) ->
+            Value
+    end.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+make_keys_test_() ->
+    [{"no compact", fun() -> with_compact_flag(false, fun make_keys_no_compact/0) end},
+     {"compact", fun() -> with_compact_flag(true, fun make_keys_compact/0) end}
+    ].
+
+make_keys_no_compact() ->
+    ?assertEqual({?TOPIC(<<"#">>), []}, make_keys(<<"#">>)),
+    ?assertEqual({?TOPIC(<<"a/+">>),
+                  [?PREFIX(<<"a">>)]}, make_keys(<<"a/+">>)),
+    ?assertEqual({?TOPIC(<<"+">>), []}, make_keys(<<"+">>)).
+
+make_keys_compact() ->
+    ?assertEqual({?TOPIC(<<"#">>), []}, make_keys(<<"#">>)),
+    ?assertEqual({?TOPIC(<<"a/+">>), []}, make_keys(<<"a/+">>)),
+    ?assertEqual({?TOPIC(<<"+">>), []}, make_keys(<<"+">>)),
+    ?assertEqual({?TOPIC(<<"a/+/c">>),
+                  [?PREFIX(<<"a/+">>)]}, make_keys(<<"a/+/c">>)).
+
+words(T) -> emqx_topic:words(T).
+
+make_prefixes_t(Topic) -> make_prefixes(words(Topic)).
+
+with_compact_flag(IsCompact, F) ->
+    put_compaction_flag(IsCompact),
+    try F()
+    after put_default_compaction_flag()
+    end.
+
+make_prefixes_test_() ->
+    [{"no compact", fun() -> with_compact_flag(false, fun make_prefixes_no_compact/0) end},
+     {"compact", fun() -> with_compact_flag(true, fun make_prefixes_compact/0) end}
+    ].
+
+make_prefixes_no_compact() ->
+    ?assertEqual([<<"a/b">>, <<"a">>], make_prefixes_t(<<"a/b/+">>)),
+    ?assertEqual([<<"a/b/+/c">>, <<"a/b/+">>, <<"a/b">>, <<"a">>],
+                 make_prefixes_t(<<"a/b/+/c/#">>)).
+
+make_prefixes_compact() ->
+    ?assertEqual([], make_prefixes_t(<<"a/b/+">>)),
+    ?assertEqual([<<"a/b/+">>], make_prefixes_t(<<"a/b/+/c/#">>)).

-write_trie_node(TrieNode) ->
-    mnesia:write(?TRIE_NODE_TAB, TrieNode, write).
+do_compact_test() ->
+    ?assertEqual([<<"/+">>], do_compact(words(<<"/+">>))),
+    ?assertEqual([<<"/#">>], do_compact(words(<<"/#">>))),
+    ?assertEqual([<<"a/b/+">>, <<"c">>], do_compact(words(<<"a/b/+/c">>))),
+    ?assertEqual([<<"a/+">>, <<"+">>, <<"b">>], do_compact(words(<<"a/+/+/b">>))).

+-endif. % TEST
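
A minimal usage sketch (not part of the patch), assuming the diff applies to the emqx_trie
module on a running broker where mnesia(boot) has created the table; insert/1 runs inside an
mnesia transaction, while match/1 only reads the ram copies:

    {atomic, ok} = mnesia:transaction(fun() -> emqx_trie:insert(<<"a/+/c">>) end),
    %% the concrete topic a/b/c should now match the stored filter:
    [<<"a/+/c">>] = emqx_trie:match(<<"a/b/c">>).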