Feng Lee 11 лет назад
Родитель
Сommit
a72fccf28d
1 измененных файлов с 56 добавлено и 74 удалено
  1. 56 74
      apps/emqttd/src/emqttd_pubsub.erl

+ 56 - 74
apps/emqttd/src/emqttd_pubsub.erl

@@ -48,8 +48,8 @@
 
 -export([topics/0,
         create/1,
-		subscribe/2,
-		unsubscribe/2,
+		subscribe/1,
+		unsubscribe/1,
 		publish/1,
 		publish/2,
         %local node
@@ -108,8 +108,8 @@ topics() ->
 %% @end
 %%------------------------------------------------------------------------------
 -spec create(binary()) -> {atomic,  Reason :: any()} |  {aborted, Reason :: any()}.
-create(Topic) -> 
-    gen_server:call(?SERVER, {create, Topic}).
+create(Topic) when is_binary(Topic) -> 
+	mnesia:transaction(fun trie_add/1, [Topic]).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -117,16 +117,33 @@ create(Topic) ->
 %%
 %% @end
 %%------------------------------------------------------------------------------
--spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}.
-subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
-    subscribe([{Topic, Qos}], SubPid);
-
-%% TODO: 
-%% call will not work when there are 2000K+ clients, 100+ sub requests/sec...
-%% will optimize in 0.9.x...
-%% 
-subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
-    gen_server:call(?SERVER, {subscribe, Topics, SubPid}, infinity).
+-spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}.
+subscribe({Topic, Qos}) when is_binary(Topic) ->
+    case subscribe([{Topic, Qos}]) of
+        {ok, [GrantedQos]} -> {ok, GrantedQos};
+        {error, Error} -> {error, Error}
+    end;
+subscribe(Topics = [{_Topic, _Qos}|_]) ->
+    subscribe(Topics, self(), []).
+
+subscribe([], _SubPid, Acc) ->
+    {ok, lists:reverse(Acc)};
+subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
+    Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid},
+    F = fun() -> 
+                case trie_add(Topic) of
+                    ok ->
+                        mnesia:write(Subscriber);
+                    {atomic, already_exist} ->
+                        mnesia:write(Subscriber);
+                    {aborted, Reason} ->
+                        {aborted, Reason}
+                end
+        end,
+    case mnesia:transaction(F) of
+        ok -> subscribe(Topics, SubPid, [Qos|Acc]);
+        {aborted, Reason} -> {error, {aborted, Reason}}
+    end.
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -134,12 +151,16 @@ subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
 %%
 %% @end
 %%------------------------------------------------------------------------------
--spec unsubscribe(binary() | list(binary()), pid()) -> ok.
-unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
-    unsubscribe([Topic], SubPid);
+-spec unsubscribe(binary() | list(binary())) -> ok.
+unsubscribe(Topic) when is_binary(Topic) ->
+    %% call mnesia directly
+    unsubscribe([Topic]);
+
+unsubscribe(Topics = [Topics|_]) when is_list(Topics) and is_binary(Topic) ->
+    unsubscribe(Topics, self()).
+
+unsubscribe(Topics, SubPid) ->
 
-unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
-	gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -199,42 +220,38 @@ match(Topic) when is_binary(Topic) ->
 %% ------------------------------------------------------------------
 
 init([]) ->
-	mnesia:create_table(topic_trie, [
-		{ram_copies, [node()]},
-		{attributes, record_info(fields, topic_trie)}]),
+    %% trie and topic tables, will be copied by all nodes.
 	mnesia:create_table(topic_trie_node, [
 		{ram_copies, [node()]},
 		{attributes, record_info(fields, topic_trie_node)}]),
+	mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
+	mnesia:create_table(topic_trie, [
+		{ram_copies, [node()]},
+		{attributes, record_info(fields, topic_trie)}]),
+	mnesia:add_table_copy(topic_trie, node(), ram_copies),
 	mnesia:create_table(topic, [
 		{type, bag},
 		{record_name, topic},
 		{ram_copies, [node()]}, 
 		{attributes, record_info(fields, topic)}]),
-	mnesia:add_table_copy(topic_trie, node(), ram_copies),
-	mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
 	mnesia:add_table_copy(topic, node(), ram_copies),
-	ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
+
+    %% local table, not shared with other table
+    mnesia:create_table(topic_subscriber, [
+		{type, bag},
+		{record_name, topic_subscriber},
+		{ram_copies, [node()]}, 
+		{attributes, record_info(fields, topic_subscriber)},
+        {index, [subpid]}, 
+        {local_content, true}]),
 	{ok, #state{}}.
 
 handle_call(getstats, _From, State = #state{max_subs = Max}) ->
     Stats = [{'topics/count', mnesia:table_info(topic, size)},
-             {'subscribers/count', ets:info(topic_subscriber, size)},
+             {'subscribers/count', mnesia:info(topic_subscriber, size)},
              {'subscribers/max', Max}],
     {reply, Stats, State};
 
-handle_call({create, Topic}, _From, State) ->
-	Result = mnesia:transaction(fun trie_add/1, [Topic]),
-    {reply, Result, setstats(State)};
-
-handle_call({subscribe, Topics, SubPid}, _From, State) ->
-    Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics],
-    Reply = 
-    case [Err || Err = {error, _} <- Result] of
-        [] -> {ok, [Qos || {ok, Qos} <- Result]};
-        Errors -> hd(Errors)
-    end,
-    {reply, Reply, setstats(State)};
-
 handle_call(Req, _From, State) ->
 	{stop, {badreq, Req}, State}.
 
@@ -273,41 +290,6 @@ code_change(_OldVsn, State, _Extra) ->
 %%%=============================================================================
 %%% Internal functions
 %%%=============================================================================
-subscribe_topic({Topic, Qos}, SubPid) ->
-	case mnesia:transaction(fun trie_add/1, [Topic]) of
-	{atomic, _} ->	
-		case get({subscriber, SubPid}) of
-		undefined -> 
-            %%TODO: refactor later...
-			MonRef = erlang:monitor(process, SubPid),
-			put({subcriber, SubPid}, MonRef),
-			put({submon, MonRef}, SubPid);
-		_ ->
-			already_monitored
-		end,
-        %% remove duplicated subscribers
-        try_remove_subscriber({Topic, Qos}, SubPid),
-		ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}),
-        %TODO: GrantedQos??
-        {ok, Qos};
-	{aborted, Reason} ->
-		{error, Reason}
-	end.
-
-try_remove_subscriber({Topic, Qos}, SubPid) ->
-    case ets:lookup(topic_subscriber, Topic) of
-        [] -> 
-            not_found;
-        Subs ->
-            DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid} 
-                                    <- Subs, Qos =/= OldQos, OldPid =:= SubPid],
-            case DupSubs of
-                [] -> ok;
-                [DupSub] -> 
-                    lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]), 
-                    ets:delete(topic_subscriber, DupSub)
-            end
-    end.
 
 try_remove_topic(Name) when is_binary(Name) ->
 	case ets:member(topic_subscriber, Name) of