Explorar o código

refactor: make it possible to use different tries

In preparation for persistent sessions
Tobias Lindahl %!s(int64=4) %!d(string=hai) anos
pai
achega
fc7b4c0009

+ 8 - 101
apps/emqx/src/emqx_router.erl

@@ -116,8 +116,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
             ok = emqx_router_helper:monitor(Dest),
             case emqx_topic:wildcard(Topic) of
                 true  ->
-                    maybe_trans(fun insert_trie_route/1, [Route]);
-                false -> insert_direct_route(Route)
+                    Fun = fun emqx_router_utils:insert_trie_route/2,
+                    emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
+                false ->
+                    emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
             end
     end.
 
@@ -162,8 +164,10 @@ do_delete_route(Topic, Dest) ->
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
         true  ->
-            maybe_trans(fun delete_trie_route/1, [Route]);
-        false -> delete_direct_route(Route)
+            Fun = fun emqx_router_utils:delete_trie_route/2,
+            emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
+        false ->
+            emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
     end.
 
 -spec(topics() -> list(emqx_types:topic())).
@@ -216,100 +220,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-insert_direct_route(Route) ->
-    mria:dirty_write(?ROUTE_TAB, Route).
-
-insert_trie_route(Route = #route{topic = Topic}) ->
-    case mnesia:wread({?ROUTE_TAB, Topic}) of
-        [] -> emqx_trie:insert(Topic);
-        _  -> ok
-    end,
-    mnesia:write(?ROUTE_TAB, Route, sticky_write).
-
-delete_direct_route(Route) ->
-    mria:dirty_delete_object(?ROUTE_TAB, Route).
-
-delete_trie_route(Route = #route{topic = Topic}) ->
-    case mnesia:wread({?ROUTE_TAB, Topic}) of
-        [Route] -> %% Remove route and trie
-                  ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
-                   emqx_trie:delete(Topic);
-        [_|_]   -> %% Remove route only
-                   mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
-        []      -> ok
-    end.
-
-%% @private
--spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
-maybe_trans(Fun, Args) ->
-    case emqx:get_config([broker, perf, route_lock_type]) of
-        key ->
-            trans(Fun, Args);
-        global ->
-            %% Assert:
-            mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash
-            lock_router(),
-            try mnesia:sync_dirty(Fun, Args)
-            after
-                unlock_router()
-            end;
-        tab ->
-            trans(fun() ->
-                          emqx_trie:lock_tables(),
-                          apply(Fun, Args)
-                  end, [])
-    end.
-
-%% The created fun only terminates with explicit exception
--dialyzer({nowarn_function, [trans/2]}).
-
--spec(trans(function(), list(any())) -> ok | {error, term()}).
-trans(Fun, Args) ->
-    {WPid, RefMon} =
-        spawn_monitor(
-            %% NOTE: this is under the assumption that crashes in Fun
-            %% are caught by mria:transaction/2.
-            %% Future changes should keep in mind that this process
-            %% always exit with database write result.
-            fun() ->
-                    Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of
-                              {atomic, Ok} -> Ok;
-                              {aborted, Reason} -> {error, Reason}
-                          end,
-                    exit({shutdown, Res})
-            end),
-    %% Receive a 'shutdown' exit to pass result from the short-lived process.
-    %% so the receive below can be receive-mark optimized by the compiler.
-    %%
-    %% If the result is sent as a regular message, we'll have to
-    %% either demonitor (with flush which is essentially a 'receive' since
-    %% the process is no longer alive after the result has been received),
-    %% or use a plain 'receive' to drain the normal 'DOWN' message.
-    %% However the compiler does not optimize this second 'receive'.
-    receive
-        {'DOWN', RefMon, process, WPid, Info} ->
-            case Info of
-                {shutdown, Result} -> Result;
-                _ -> {error, {trans_crash, Info}}
-            end
-    end.
-
-lock_router() ->
-    %% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
-    %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
-    case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
-        false ->
-            %% Force to sleep 1ms instead.
-            timer:sleep(1),
-            lock_router();
-        true ->
-            ok
-    end.
-
-unlock_router() ->
-    global:del_lock({?MODULE, self()}).

+ 126 - 0
apps/emqx/src/emqx_router_utils.erl

@@ -0,0 +1,126 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_router_utils).
+
+-include("emqx.hrl").
+
+-export([ delete_direct_route/2
+        , delete_trie_route/2
+        , insert_direct_route/2
+        , insert_trie_route/2
+        , maybe_trans/3
+        ]).
+
+insert_direct_route(Tab, Route) ->
+    mria:dirty_write(Tab, Route).
+
+insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
+    case mnesia:wread({RouteTab, Topic}) of
+        [] when RouteTab =:= emqx_route         -> emqx_trie:insert(Topic);
+        [] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic);
+        _  -> ok
+    end,
+    mnesia:write(RouteTab, Route, sticky_write).
+
+delete_direct_route(RouteTab, Route) ->
+    mria:dirty_delete_object(RouteTab, Route).
+
+delete_trie_route(RouteTab, Route = #route{topic = Topic}) ->
+    case mnesia:wread({RouteTab, Topic}) of
+        [R] when R =:= Route ->
+            %% Remove route and trie
+            ok = mnesia:delete_object(RouteTab, Route, sticky_write),
+            case RouteTab of
+                emqx_route         -> emqx_trie:delete(Topic);
+                emqx_session_route -> emqx_trie:delete_session(Topic)
+            end;
+        [_|_]   ->
+            %% Remove route only
+            mnesia:delete_object(RouteTab, Route, sticky_write);
+        [] ->
+            ok
+    end.
+
+%% @private
+-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}).
+maybe_trans(Fun, Args, Shard) ->
+    case emqx:get_config([broker, perf, route_lock_type]) of
+        key ->
+            trans(Fun, Args, Shard);
+        global ->
+            %% Assert:
+            mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
+            lock_router(Shard),
+            try mnesia:sync_dirty(Fun, Args)
+            after
+                unlock_router(Shard)
+            end;
+        tab ->
+            trans(fun() ->
+                          emqx_trie:lock_tables(),
+                          apply(Fun, Args)
+                  end, [], Shard)
+    end.
+
+%% The created fun only terminates with explicit exception
+-dialyzer({nowarn_function, [trans/3]}).
+
+-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}).
+trans(Fun, Args, Shard) ->
+    {WPid, RefMon} =
+        spawn_monitor(
+            %% NOTE: this is under the assumption that crashes in Fun
+            %% are caught by mnesia:transaction/2.
+            %% Future changes should keep in mind that this process
+            %% always exit with database write result.
+            fun() ->
+                    Res = case mria:transaction(Shard, Fun, Args) of
+                              {atomic, Ok} -> Ok;
+                              {aborted, Reason} -> {error, Reason}
+                          end,
+                    exit({shutdown, Res})
+            end),
+    %% Receive a 'shutdown' exit to pass result from the short-lived process.
+    %% so the receive below can be receive-mark optimized by the compiler.
+    %%
+    %% If the result is sent as a regular message, we'll have to
+    %% either demonitor (with flush which is essentially a 'receive' since
+    %% the process is no longer alive after the result has been received),
+    %% or use a plain 'receive' to drain the normal 'DOWN' message.
+    %% However the compiler does not optimize this second 'receive'.
+    receive
+        {'DOWN', RefMon, process, WPid, Info} ->
+            case Info of
+                {shutdown, Result} -> Result;
+                _ -> {error, {trans_crash, Info}}
+            end
+    end.
+
+lock_router(Shard) ->
+    %% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
+    %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
+    case global:set_lock({{?MODULE, Shard}, self()}, [node() | nodes()], 0) of
+        false ->
+            %% Force to sleep 1ms instead.
+            timer:sleep(1),
+            lock_router(Shard);
+        true ->
+            ok
+    end.
+
+unlock_router(Shard) ->
+    global:del_lock({{?MODULE, Shard}, self()}).

+ 56 - 46
apps/emqx/src/emqx_trie.erl

@@ -75,24 +75,32 @@ mnesia(boot) ->
 %% @doc Insert a topic filter into the trie.
 -spec(insert(emqx_types:topic()) -> ok).
 insert(Topic) when is_binary(Topic) ->
+    insert(Topic, ?TRIE).
+
+insert(Topic, Trie) when is_binary(Topic) ->
     {TopicKey, PrefixKeys} = make_keys(Topic),
-    case mnesia:wread({?TRIE, TopicKey}) of
+    case mnesia:wread({Trie, TopicKey}) of
         [_] -> ok; %% already inserted
-        [] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys])
+        [] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys])
     end.
 
 %% @doc Delete a topic filter from the trie.
 -spec(delete(emqx_types:topic()) -> ok).
 delete(Topic) when is_binary(Topic) ->
+    delete(Topic, ?TRIE).
+
+delete(Topic, Trie) when is_binary(Topic) ->
     {TopicKey, PrefixKeys} = make_keys(Topic),
-    case [] =/= mnesia:wread({?TRIE, TopicKey}) of
-        true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]);
+    case [] =/= mnesia:wread({Trie, TopicKey}) of
+        true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]);
         false -> ok
     end.
 
 %% @doc Find trie nodes that matchs the topic name.
 -spec(match(emqx_types:topic()) -> list(emqx_types:topic())).
 match(Topic) when is_binary(Topic) ->
+    match(Topic, ?TRIE).
+match(Topic, Trie) when is_binary(Topic) ->
     Words = emqx_topic:words(Topic),
     case emqx_topic:wildcard(Words) of
         true ->
@@ -105,12 +113,14 @@ match(Topic) when is_binary(Topic) ->
             %% Such clients will get disconnected.
             [];
         false ->
-            do_match(Words)
+            do_match(Words, Trie)
     end.
 
 %% @doc Is the trie empty?
 -spec(empty() -> boolean()).
-empty() -> ets:first(?TRIE) =:= '$end_of_table'.
+empty() -> empty(?TRIE).
+
+empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
 
 -spec lock_tables() -> ok.
 lock_tables() ->
@@ -163,70 +173,70 @@ make_prefixes([H | T], Prefix0, Acc0) ->
     Acc = [Prefix | Acc0],
     make_prefixes(T, Prefix, Acc).
 
-insert_key(Key) ->
-    T = case mnesia:wread({?TRIE, Key}) of
+insert_key(Key, Trie) ->
+    T = case mnesia:wread({Trie, Key}) of
             [#?TRIE{count = C} = T1] ->
                 T1#?TRIE{count = C + 1};
              [] ->
                 #?TRIE{key = Key, count = 1}
          end,
-    ok = mnesia:write(T).
+    ok = mnesia:write(Trie, T, write).
 
-delete_key(Key) ->
-    case mnesia:wread({?TRIE, Key}) of
+delete_key(Key, Trie) ->
+    case mnesia:wread({Trie, Key}) of
         [#?TRIE{count = C} = T] when C > 1 ->
-            ok = mnesia:write(T#?TRIE{count = C - 1});
+            ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write);
         [_] ->
-            ok = mnesia:delete(?TRIE, Key, write);
+            ok = mnesia:delete(Trie, Key, write);
         [] ->
             ok
     end.
 
 %% micro-optimization: no need to lookup when topic is not wildcard
 %% because we only insert wildcards to emqx_trie
-lookup_topic(_Topic, false) -> [];
-lookup_topic(Topic, true) -> lookup_topic(Topic).
+lookup_topic(_Topic,_Trie, false) -> [];
+lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie).
 
-lookup_topic(Topic) when is_binary(Topic) ->
-    case ets:lookup(?TRIE, ?TOPIC(Topic)) of
+lookup_topic(Topic, Trie) when is_binary(Topic) ->
+    case ets:lookup(Trie, ?TOPIC(Topic)) of
         [#?TRIE{count = C}] -> [Topic || C > 0];
         [] -> []
     end.
 
-has_prefix(empty) -> true; %% this is the virtual tree root
-has_prefix(Prefix) ->
-    case ets:lookup(?TRIE, ?PREFIX(Prefix)) of
+has_prefix(empty, _Trie) -> true; %% this is the virtual tree root
+has_prefix(Prefix, Trie) ->
+    case ets:lookup(Trie, ?PREFIX(Prefix)) of
         [#?TRIE{count = C}] -> C > 0;
         [] -> false
     end.
 
-do_match([<<"$", _/binary>> = Prefix | Words]) ->
+do_match([<<"$", _/binary>> = Prefix | Words], Trie) ->
     %% For topics having dollar sign prefix,
     %% we do not match root level + or #,
     %% fast forward to the next level.
     case Words =:= [] of
-        true -> lookup_topic(Prefix);
+        true -> lookup_topic(Prefix, Trie);
         false -> []
-    end ++ do_match(Words, Prefix);
-do_match(Words) ->
-    do_match(Words, empty).
+    end ++ do_match(Words, Prefix, Trie);
+do_match(Words, Trie) ->
+    do_match(Words, empty, Trie).
 
-do_match(Words, Prefix) ->
+do_match(Words, Prefix, Trie) ->
     case is_compact() of
-        true -> match_compact(Words, Prefix, false, []);
-        false -> match_no_compact(Words, Prefix, false, [])
+        true -> match_compact(Words, Prefix, Trie, false, []);
+        false -> match_no_compact(Words, Prefix, Trie, false, [])
     end.
 
-match_no_compact([], Topic, IsWildcard, Acc) ->
-    'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/#
-    lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+
+match_no_compact([], Topic, Trie, IsWildcard, Acc) ->
+    'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/#
+    lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+
     Acc;
-match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
-    case has_prefix(Prefix) of
+match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
+    case has_prefix(Prefix, Trie) of
         true ->
-            Acc1 = 'match_#'(Prefix) ++ Acc0,
-            Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1),
-            match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc);
+            Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
+            Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1),
+            match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc);
         false ->
             %% non-compact paths in database
             %% if there is no prefix matches the current topic prefix
@@ -243,26 +253,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
             Acc0
     end.
 
-match_compact([], Topic, IsWildcard, Acc) ->
-    'match_#'(Topic) ++ %% try match foo/bar/#
-    lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar
+match_compact([], Topic, Trie, IsWildcard, Acc) ->
+    'match_#'(Topic, Trie) ++ %% try match foo/bar/#
+    lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar
     Acc;
-match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
-    Acc1 = 'match_#'(Prefix) ++ Acc0,
-    Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1),
+match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
+    Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
+    Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1),
     WildcardPrefix = join(Prefix, '+'),
     %% go deeper to match current_prefix/+ only when:
     %% 1. current word is the last
     %% OR
     %% 2. there is a prefix = 'current_prefix/+'
-    case Words =:= [] orelse has_prefix(WildcardPrefix) of
-        true -> match_compact(Words, WildcardPrefix, true, Acc);
+    case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of
+        true -> match_compact(Words, WildcardPrefix, Trie, true, Acc);
         false -> Acc
     end.
 
-'match_#'(Prefix) ->
+'match_#'(Prefix, Trie) ->
     MlTopic = join(Prefix, '#'),
-    lookup_topic(MlTopic).
+    lookup_topic(MlTopic, Trie).
 
 is_compact() ->
     emqx:get_config([broker, perf, trie_compaction], true).

+ 1 - 1
apps/emqx/test/emqx_trie_SUITE.erl

@@ -183,7 +183,7 @@ t_delete3(_) ->
               ?TRIE:delete(<<"sensor/+/unknown">>)
           end),
     ?assertEqual([], ?TRIE:match(<<"sensor">>)),
-    ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)).
+    ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)).
 
 clear_tables() -> emqx_trie:clear_tables().