Просмотр исходного кода

Optimize the route and trie modules.

1. Use mnesia:wread/1 to replace mnesia:read/2
2. Update the router supervisor
Feng Lee 7 лет назад
Родитель
Сommit
f008ceb5c8
4 измененных файлов с 23 добавлено и 18 удалено
  1. 8 7
      src/emqx_router.erl
  2. 2 2
      src/emqx_router_helper.erl
  3. 6 2
      src/emqx_router_sup.erl
  4. 7 7
      src/emqx_trie.erl

+ 8 - 7
src/emqx_router.erl

@@ -34,7 +34,8 @@
 -export([has_routes/1, match_routes/1, print_routes/1]).
 -export([topics/0]).
 %% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
 
 -type(destination() :: node() | {binary(), node()}).
 
@@ -45,9 +46,9 @@
 -define(BATCH(Enabled), #batch{enabled = Enabled}).
 -define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}).
 
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Mnesia bootstrap
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 mnesia(boot) ->
     ok = ekka_mnesia:create_table(?ROUTE, [
@@ -132,9 +133,9 @@ cast(Router, Msg) ->
 pick(Topic) ->
     gproc_pool:pick_worker(router, Topic).
 
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% gen_server callbacks
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 init([Pool, Id]) ->
     rand:seed(exsplus, erlang:timestamp()),
@@ -207,9 +208,9 @@ terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%-----------------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) ->
     State;

+ 2 - 2
src/emqx_router_helper.erl

@@ -84,8 +84,8 @@ monitor(Node) when is_atom(Node) ->
 %%------------------------------------------------------------------------------
 
 init([]) ->
-    ekka:monitor(membership),
-    mnesia:subscribe({table, ?ROUTING_NODE, simple}),
+    _ = ekka:monitor(membership),
+    _ = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
     Nodes = lists:foldl(
               fun(Node, Acc) ->
                   case ekka:is_member(Node) of

+ 6 - 2
src/emqx_router_sup.erl

@@ -24,8 +24,12 @@ start_link() ->
 
 init([]) ->
     %% Router helper
-    Helper = {router_helper, {emqx_router_helper, start_link, []},
-              permanent, 5000, worker, [emqx_router_helper]},
+    Helper = #{id       => helper,
+               start    => {emqx_router_helper, start_link, []},
+               restart  => permanent,
+               shutdown => 5000,
+               type     => worker,
+               modules  => [emqx_router_helper]},
 
     %% Router pool
     RouterPool = emqx_pool_sup:spec(emqx_router_pool,

+ 7 - 7
src/emqx_trie.erl

@@ -62,10 +62,10 @@ mnesia(copy) ->
 %% Trie APIs
 %%------------------------------------------------------------------------------
 
-%% @doc Insert a topic into the trie
+%% @doc Insert a topic filter into the trie.
 -spec(insert(emqx_topic:topic()) -> ok).
 insert(Topic) when is_binary(Topic) ->
-    case mnesia:read(?TRIE_NODE, Topic) of
+    case mnesia:wread({?TRIE_NODE, Topic}) of
         [#trie_node{topic = Topic}] ->
             ok;
         [TrieNode = #trie_node{topic = undefined}] ->
@@ -77,21 +77,21 @@ insert(Topic) when is_binary(Topic) ->
             write_trie_node(#trie_node{node_id = Topic, topic = Topic})
     end.
 
-%% @doc Find trie nodes that match the topic
+%% @doc Find trie nodes that match the topic name.
 -spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
 match(Topic) when is_binary(Topic) ->
     TrieNodes = match_node(root, emqx_topic:words(Topic)),
     [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
 
-%% @doc Lookup a trie node
+%% @doc Lookup a trie node.
 -spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
 lookup(NodeId) ->
     mnesia:read(?TRIE_NODE, NodeId).
 
-%% @doc Delete a topic from the trie
+%% @doc Delete a topic filter from the trie.
 -spec(delete(emqx_topic:topic()) -> ok).
 delete(Topic) when is_binary(Topic) ->
-    case mnesia:read(?TRIE_NODE, Topic) of
+    case mnesia:wread({?TRIE_NODE, Topic}) of
         [#trie_node{edge_count = 0}] ->
             mnesia:delete({?TRIE_NODE, Topic}),
             delete_path(lists:reverse(emqx_topic:triples(Topic)));
@@ -108,7 +108,7 @@ delete(Topic) when is_binary(Topic) ->
 %% @doc Add a path to the trie.
 add_path({Node, Word, Child}) ->
     Edge = #trie_edge{node_id = Node, word = Word},
-    case mnesia:read(?TRIE_NODE, Node) of
+    case mnesia:wread({?TRIE_NODE, Node}) of
         [TrieNode = #trie_node{edge_count = Count}] ->
             case mnesia:wread({?TRIE, Edge}) of
                 [] ->