Feng 10 лет назад
Родитель
Сommit
3745d7a24b
2 измененных файлов с 43 добавлено и 14 удалено
  1. 42 13
      src/emqttd_pubsub_sup.erl
  2. 1 1
      src/emqttd_router.erl

+ 42 - 13
src/emqttd_pubsub_sup.erl

@@ -1,5 +1,5 @@
 %%%-----------------------------------------------------------------------------
-%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved.
 %%%
 %%% Permission is hereby granted, free of charge, to any person obtaining a copy
 %%% of this software and associated documentation files (the "Software"), to deal
@@ -19,7 +19,7 @@
 %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
-%%% @doc PubSub Supervisor
+%%% @doc PubSub Supervisor.
 %%%
 %%% @author Feng Lee <feng@emqtt.io>
 %%%-----------------------------------------------------------------------------
@@ -31,6 +31,8 @@
 
 -define(HELPER, emqttd_pubsub_helper).
 
+-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
+
 %% API
 -export([start_link/0]).
 
@@ -40,24 +42,51 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_broker:env(pubsub)]).
 
-init([Opts]) ->
+init([Env]) ->
+    %% Create tabs
+    create_tab(route), create_tab(reverse_route),
+
     %% PubSub Helper
-    Helper = {helper, {?HELPER, start_link, [fun stats/1, Opts]},
+    Helper = {helper, {?HELPER, start_link, [fun setstats/1]},
                 permanent, infinity, worker, [?HELPER]},
 
+    %% Router Pool Sup
+    RouterMFA = {emqttd_router, start_link, [fun setstats/1, Env]},
+    %% Pool_size / 2
+    RouterSup = emqttd_pool_sup:spec(router_pool, [router, hash, pool_size(Env) div 2, RouterMFA]),
+
     %% PubSub Pool Sup
-    MFA = {emqttd_pubsub, start_link, [fun stats/1, Opts]},
-    PoolSup = emqttd_pool_sup:spec([pubsub, hash, pool_size(Opts), MFA]),
-    {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}.
+    PubSubMFA = {emqttd_pubsub, start_link, [fun setstats/1, Env]},
+    PubSubSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]),
+
+    {ok, {{one_for_all, 10, 60}, [Helper, RouterSup, PubSubSup]}}.
+
+create_tab(route) ->
+    %% Route Table: Topic -> Pid1, Pid2, ..., PidN
+    %% duplicate_bag: o(1) insert
+    ensure_tab(route, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
 
-pool_size(Opts) ->
+create_tab(reverse_route) ->
+    %% Reverse Route Table: Pid -> Topic1, Topic2, ..., TopicN
+    ensure_tab(reverse_route, [public, named_table, bag | ?CONCURRENCY_OPTS]).
+
+ensure_tab(Tab, Opts) ->
+    case ets:info(Tab, name) of
+        undefined -> ets:new(Tab, Opts);
+        _ -> ok
+    end.
+
+pool_size(Env) ->
     Schedulers = erlang:system_info(schedulers),
-    proplists:get_value(pool_size, Opts, Schedulers).
+    proplists:get_value(pool_size, Env, Schedulers).
+
+setstats(route) ->
+    emqttd_stats:setstat('routes/count', ets:info(route, size));
+
+setstats(topic) ->
+    emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size));
 
-stats(topic) ->
-    emqttd_stats:setstats('topics/count', 'topics/max',
-                          mnesia:table_info(topic, size));
-stats(subscription) ->
+setstats(subscription) ->
     emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
                           mnesia:table_info(subscription, size)).
 

+ 1 - 1
src/emqttd_router.erl

@@ -192,7 +192,7 @@ handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) ->
     case has_route(Topic) of
         false ->
             {noreply, State#state{aging = store_aged(Topic, Aging)}};
-        true -> 
+        true ->
             {noreply, State}
     end;