Przeglądaj źródła

fix issue #427 - Optimization for Route ETS insertion

Feng 10 lat temu
rodzic
commit
4e474ee8b9
1 zmienionych plików z 74 dodań i 62 usunięć
  1. 74 62
      src/emqttd_pubsub.erl

+ 74 - 62
src/emqttd_pubsub.erl

@@ -1,5 +1,5 @@
 %%%-----------------------------------------------------------------------------
-%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved.
 %%%
 %%% Permission is hereby granted, free of charge, to any person obtaining a copy
 %%% of this software and associated documentation files (the "Software"), to deal
@@ -19,7 +19,7 @@
 %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
-%%% @doc emqttd pubsub
+%%% @doc PubSub
 %%%
 %%% @author Feng Lee <feng@emqtt.io>
 %%%-----------------------------------------------------------------------------
@@ -43,9 +43,7 @@
 -export([start_link/4]).
 
 -export([create/2, lookup/2, subscribe/1, subscribe/2,
-         unsubscribe/1, unsubscribe/2, publish/1, delete/2]).
-
-%% Subscriptions API
+         publish/1, unsubscribe/1, unsubscribe/2, delete/2]).
 
 %% Local node
 -export([match/1]).
@@ -62,8 +60,6 @@
 
 -define(ROUTER, emqttd_router).
 
--define(HELPER, emqttd_pubsub_helper).
-
 %%%=============================================================================
 %%% Mnesia callbacks
 %%%=============================================================================
@@ -123,14 +119,11 @@ cache_env(Key) ->
 %%% API
 %%%=============================================================================
 
-%%------------------------------------------------------------------------------
 %% @doc Start one pubsub server
-%% @end
-%%------------------------------------------------------------------------------
 -spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when
     Pool     :: atom(),
     Id       :: pos_integer(),
-    StatsFun :: fun(),
+    StatsFun :: fun((atom()) -> any()),
     Opts     :: list(tuple()).
 start_link(Pool, Id, StatsFun, Opts) ->
     gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []).
@@ -138,11 +131,9 @@ start_link(Pool, Id, StatsFun, Opts) ->
 name(Id) ->
     list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)).
 
-%%------------------------------------------------------------------------------
 %% @doc Create Topic or Subscription.
-%% @end
-%%------------------------------------------------------------------------------
--spec create(topic | subscription, binary() | {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}.
+-spec create(topic, emqttd_topic:topic()) -> ok | {error, any()};
+            (subscription, {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}.
 create(topic, Topic) when is_binary(Topic) ->
     Record = #mqtt_topic{topic = Topic, node = node()},
     case mnesia:transaction(fun add_topic/1, [Record]) of
@@ -151,39 +142,33 @@ create(topic, Topic) when is_binary(Topic) ->
     end;
 
 create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) ->
-    case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of
+    case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, ?QOS_I(Qos)}]) of
         {atomic, ok}     -> ok;
         {aborted, Error} -> {error, Error}
     end.
 
-%%------------------------------------------------------------------------------
 %% @doc Lookup Topic or Subscription.
-%% @end
-%%------------------------------------------------------------------------------
--spec lookup(topic | subscription, binary()) -> list().
-lookup(topic, Topic) ->
+-spec lookup(topic, emqttd_topic:topic()) -> list(mqtt_topic());
+            (subscription, binary())      -> list(mqtt_subscription()).
+lookup(topic, Topic) when is_binary(Topic) ->
     mnesia:dirty_read(topic, Topic);
 
-lookup(subscription, ClientId) ->
-    mnesia:dirty_read(subscription, ClientId).
+lookup(subscription, SubId) when is_binary(SubId) ->
+    mnesia:dirty_read(subscription, SubId).
 
-%%------------------------------------------------------------------------------
 %% @doc Delete Topic or Subscription.
-%% @end
-%%------------------------------------------------------------------------------
+-spec delete(topic, emqttd_topic:topic()) -> ok | {error, any()};
+            (subscription, binary() | {binary(), emqttd_topic:topic()}) -> ok.
 delete(topic, _Topic) ->
     {error, unsupported};
 
-delete(subscription, ClientId) when is_binary(ClientId) ->
-    mnesia:dirty_delete({subscription, ClientId});
+delete(subscription, SubId) when is_binary(SubId) ->
+    mnesia:dirty_delete({subscription, SubId});
 
-delete(subscription, {ClientId, Topic}) when is_binary(ClientId) ->
-    mnesia:async_dirty(fun remove_subscriptions/2, [ClientId, [Topic]]).
+delete(subscription, {SubId, Topic}) when is_binary(SubId) andalso is_binary(Topic) ->
+    mnesia:async_dirty(fun remove_subscriptions/2, [SubId, [Topic]]).
 
-%%------------------------------------------------------------------------------
 %% @doc Subscribe Topics
-%% @end
-%%------------------------------------------------------------------------------
 -spec subscribe({Topic, Qos} | list({Topic, Qos})) ->
     {ok, Qos | list(Qos)} | {error, any()} when
     Topic   :: binary(),
@@ -206,34 +191,28 @@ subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTa
 fixqos(TopicTable) ->
     [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable].
 
-call(Request) ->
-    PubSub = gproc_pool:pick_worker(pubsub, self()),
-    gen_server2:call(PubSub, Request, infinity).
-
-%%------------------------------------------------------------------------------
 %% @doc Unsubscribe Topic or Topics
-%% @end
-%%------------------------------------------------------------------------------
--spec unsubscribe(binary() | list(binary())) -> ok.
+-spec unsubscribe(emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok.
 unsubscribe(Topic) when is_binary(Topic) ->
     unsubscribe([Topic]);
 unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
     cast({unsubscribe, {undefined, self()}, Topics}).
 
--spec unsubscribe(binary(), binary() | list(binary())) -> ok.
+-spec unsubscribe(binary(), emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok.
 unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
     unsubscribe(ClientId, [Topic]);
 unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) ->
     cast({unsubscribe, {ClientId, self()}, Topics}).
 
+call(Request) ->
+    gen_server2:call(pick(self()), Request, infinity).
+
 cast(Msg) ->
-    PubSub = gproc_pool:pick_worker(pubsub, self()),
-    gen_server2:cast(PubSub, Msg).
+    gen_server2:cast(pick(self()), Msg).
+
+pick(Self) -> gproc_pool:pick_worker(pubsub, Self).
 
-%%------------------------------------------------------------------------------
 %% @doc Publish to cluster nodes
-%% @end
-%%------------------------------------------------------------------------------
 -spec publish(Msg :: mqtt_message()) -> ok.
 publish(Msg = #mqtt_message{from = From}) ->
     trace(publish, From, Msg),
@@ -257,35 +236,41 @@ publish(To, Msg) ->
                     end
                   end, match(To)).
 
-%%------------------------------------------------------------------------------
 %% @doc Match Topic Name with Topic Filters
-%% @end
-%%------------------------------------------------------------------------------
--spec match(binary()) -> [mqtt_topic()].
+-spec match(emqttd_topic:topic()) -> [mqtt_topic()].
 match(To) ->
     MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
-    %% ets:lookup for topic table will be replicated.
+    %% ets:lookup for topic table will be replicated to all nodes.
     lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]).
 
 %%%=============================================================================
 %%% gen_server callbacks
 %%%=============================================================================
 
-init([Pool, Id, StatsFun, Opts]) ->
-    ?ROUTER:init(Opts),
+init([Pool, Id, StatsFun, _Opts]) ->
     ?GPROC_POOL(join, Pool, Id),
     {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}.
 
 handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
             State = #state{statsfun = StatsFun}) ->
 
+    %% Monitor SubPid first
+    try_monitor(SubPid),
+
+    %% Topics
     Topics = [Topic || {Topic, _Qos} <- TopicTable],
 
-    %% Add routes first
-    ?ROUTER:add_routes(Topics, SubPid),
+    NewTopics = Topics -- reverse_routes(SubPid),
+
+    %% Add routes
+    ?ROUTER:add_routes(NewTopics, SubPid),
 
-    %% Insert topic records to global topic table
-    Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics],
+    insert_reverse_routes(SubPid, NewTopics),
+
+    StatsFun(route),
+
+    %% Insert topic records to mnesia
+    Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics],
 
     case mnesia:transaction(fun add_topics/1, [Records]) of
         {atomic, _} ->
@@ -307,9 +292,12 @@ handle_call(Req, _From, State) ->
    ?UNEXPECTED_REQ(Req, State).
 
 handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) ->
+
     %% Delete routes first
     ?ROUTER:delete_routes(Topics, SubPid),
 
+    delete_reverse_routes(SubPid, Topics),
+
     %% Remove subscriptions
     if_subscription(
         fun(_) ->
@@ -324,12 +312,11 @@ handle_cast(Msg, State) ->
 
 handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
 
-    Routes = ?ROUTER:lookup_routes(DownPid),
+    Topics = reverse_routes(DownPid),
 
-    %% Delete all routes of the process
-    ?ROUTER:delete_routes(DownPid),
+    ?ROUTER:delete_routes(Topics, DownPid),
 
-    ?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]),
+    delete_reverse_routes(DownPid),
 
     {noreply, State, hibernate};
 
@@ -395,6 +382,31 @@ remove_subscriptions(SubId, Topics) ->
 delete_subscription(Record) ->
     mnesia:delete_object(subscription, Record, write).
 
+reverse_routes(SubPid) ->
+    case ets:member(reverse_route, SubPid) of
+        true  ->
+            try ets:lookup_element(reverse_route, SubPid, 2) catch error:badarg -> [] end;
+        false ->
+            []
+    end.
+
+insert_reverse_routes(SubPid, Topics) ->
+    ets:insert(reverse_route, [{SubPid, Topic} || Topic <- Topics]).
+
+delete_reverse_routes(SubPid, Topics) ->
+    lists:foreach(fun(Topic) ->
+                ets:delete_object(reverse_route, {SubPid, Topic})
+        end, Topics).
+
+delete_reverse_routes(SubPid) ->
+    ets:delete(reverse_route, SubPid).
+
+try_monitor(SubPid) ->
+    case ets:member(reverse_route, SubPid) of
+        true  -> ignore;
+        false -> erlang:monitor(process, SubPid)
+    end.
+
 %%%=============================================================================
 %%% Trace Functions
 %%%=============================================================================