Преглед изворни кода

support trie topic structure

erylee пре 13 година
родитељ
комит
31c687f26c
5 измењених фајлова са 184 додато и 80 уклоњено
  1. 7 1
      include/emqtt.hrl
  2. 1 1
      src/emqtt_client.erl
  3. 2 2
      src/emqtt_frame.erl
  4. 119 51
      src/emqtt_router.erl
  5. 55 25
      src/emqtt_topic.erl

+ 7 - 1
include/emqtt.hrl

@@ -29,7 +29,13 @@
 %name: <<"a/b/c">>
 %node: node()
 %words: [<<"a">>, <<"b">>, <<"c">>]
--record(topic, {name, node, words}).
+-record(topic, {name, node}).
+
+-record(trie, {edge, node_id}).
+
+-record(trie_node, {node_id, edge_count=0, topic}).
+
+-record(trie_edge, {node_id, word}).
 
 %topic: topic name
 

+ 1 - 1
src/emqtt_client.erl

@@ -383,7 +383,7 @@ stop(Reason, State ) ->
     {stop, Reason, State}.
 
 valid_client_id(ClientId) ->
-    ClientIdLen = size(ClientId),
+    ClientIdLen = length(ClientId),
     1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
 
 retained(false, _Topic, _Msg) ->

+ 2 - 2
src/emqtt_frame.erl

@@ -23,7 +23,7 @@
 -export([serialise/1]).
 
 -define(RESERVED, 0).
--define(PROTOCOL_MAGIC, <<"MQIsdp">>).
+-define(PROTOCOL_MAGIC, "MQIsdp").
 -define(MAX_LEN, 16#fffffff).
 -define(HIGHBIT, 2#10000000).
 -define(LOWBITS, 2#01111111).
@@ -145,7 +145,7 @@ parse_utf(Bin, _) ->
     parse_utf(Bin).
 
 parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
-    {Str, Rest}.
+    {binary_to_list(Str), Rest}.
 
 parse_msg(Bin, 0) ->
     {undefined, Bin};

+ 119 - 51
src/emqtt_router.erl

@@ -11,7 +11,6 @@
 %% Developer of the eMQTT Code is <ery.lee@gmail.com>
 %% Copyright (c) 2012 Ery Lee.  All rights reserved.
 %%
-
 -module(emqtt_router).
 
 -include("emqtt.hrl").
@@ -22,11 +21,12 @@
 
 -export([start_link/0]).
 
--export([topics/1,
+-export([topics/0,
 		subscribe/2,
 		unsubscribe/2,
 		publish/2,
 		route/2,
+		match/1,
 		down/1]).
 
 -behaviour(gen_server).
@@ -43,20 +43,17 @@
 start_link() ->
 	gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-topics(direct) ->
-	mnesia:dirty_all_keys(direct_topic);
-
-topics(wildcard) ->
-	mnesia:dirty_all_keys(wildcard_topic).
+topics() ->
+	mnesia:dirty_all_keys(topic).
 
 subscribe({Topic, Qos}, Client) when is_pid(Client) ->
 	gen_server2:call(?MODULE, {subscribe, {Topic, Qos}, Client}).
 
-unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
+unsubscribe(Topic, Client) when is_list(Topic) and is_pid(Client) ->
 	gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}).
 
 %publish to cluster node.
-publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) ->
+publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) ->
 	lists:foreach(fun(#topic{name=Name, node=Node}) ->
 		case Node == node() of
 		true -> route(Name, Msg);
@@ -68,63 +65,54 @@ publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) ->
 route(Topic, Msg) ->
 	[Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Topic)].
 
-match(Topic) when is_binary(Topic)  ->
-	DirectMatches = mnesia:dirty_read(direct_topic, Topic),
-	TopicWords = emqtt_topic:words(Topic),
-	WildcardQuery = qlc:q([T || T = #topic{words=Words}
-								<- mnesia:table(wildcard_topic), 
-									emqtt_topic:match(TopicWords, Words)]), %
-	
-	{atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end),
-	%mnesia:async_dirty(fun qlc:e/1, WildcardQuery),
-	%?INFO("~p", [WildcardMatches]),
-	DirectMatches ++ WildcardMatches.
+match(Topic) when is_list(Topic) ->
+	TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),
+    Names = [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined],
+	lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]).
 
 down(Client) when is_pid(Client) ->
 	gen_server2:cast(?MODULE, {down, Client}).
 
 init([]) ->
-	mnesia:create_table(direct_topic, [
-		{type, bag},
-		{record_name, topic},
-		{ram_copies, [node()]}, 
-		{attributes, record_info(fields, topic)}]),
-	mnesia:add_table_copy(direct_topic, node(), ram_copies),
-	mnesia:create_table(wildcard_topic, [
+	mnesia:create_table(trie, [
+		{ram_copies, [node()]},
+		{attributes, record_info(fields, trie)}]),
+	mnesia:add_table_copy(trie, node(), ram_copies),
+	mnesia:create_table(trie_node, [
+		{ram_copies, [node()]},
+		{attributes, record_info(fields, trie_node)}]),
+	mnesia:add_table_copy(trie_node, node(), ram_copies),
+	mnesia:create_table(topic, [
 		{type, bag},
-		{index, [#topic.words]},
 		{record_name, topic},
 		{ram_copies, [node()]}, 
 		{attributes, record_info(fields, topic)}]),
-	mnesia:add_table_copy(wildcard_topic, node(), ram_copies),
+	mnesia:add_table_copy(topic, node(), ram_copies),
 	ets:new(subscriber, [bag, named_table, {keypos, 2}]),
 	?INFO_MSG("emqtt_router is started."),
 	{ok, #state{}}.
 
-handle_call({subscribe, {Name, Qos}, Client}, _From, State) ->
-	Topic = #topic{name=Name, node=node(), words=emqtt_topic:words(Name)},
-	case emqtt_topic:type(Topic) of
-	direct -> 
-		ok = mnesia:dirty_write(direct_topic, Topic);
-	wildcard -> 
-		ok = mnesia:dirty_write(wildcard_topic, Topic)
-	end,
-	ets:insert(subscriber, #subscriber{topic=Name, qos=Qos, client=Client}),
-	emqtt_retained:send(Name, Client),
-	{reply, ok, State};
+handle_call({subscribe, {Topic, Qos}, Client}, _From, State) ->
+	case mnesia:transaction(fun trie_add/1, [Topic]) of
+	{atomic, _} ->	
+		ets:insert(subscriber, #subscriber{topic=Topic, qos=Qos, client=Client}),
+		emqtt_retained:send(Topic, Client),
+		{reply, ok, State};
+	{aborted, Reason} ->
+		{reply, {error, Reason}, State}
+	end;
 
 handle_call(Req, _From, State) ->
 	{stop, {badreq, Req}, State}.
 
 handle_cast({unsubscribe, Topic, Client}, State) ->
-	ets:match_delete(subscriber, {subscriber, Topic, '_', Client}),
+	ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client, _='_'}),
 	try_remove_topic(Topic),
 	{noreply, State};
 
 handle_cast({down, Client}, State) ->
-	case ets:match_object(subscriber, {subscriber, '_', '_', Client}) of
-	[] -> 
-		ignore;
+	case ets:match_object(subscriber, #subscriber{client=Client, _='_'}) of
+	[] -> ignore;
 	Subs -> 
 		[ets:delete_object(subscriber, Sub) || Sub <- Subs],
 		[try_remove_topic(Topic) || #subscriber{topic=Topic} <- Subs]
@@ -134,7 +122,6 @@ handle_cast({down, Client}, State) ->
 handle_cast(Msg, State) ->
 	{stop, {badmsg, Msg}, State}.
 
-
 handle_info(Info, State) ->
 	{stop, {badinfo, Info}, State}.
 
@@ -151,12 +138,93 @@ try_remove_topic(Name) ->
 	case ets:member(subscriber, Name) of
 	false -> 
 		Topic = emqtt_topic:new(Name),
-		case emqtt_topic:type(Topic) of
-		direct ->
-			mnesia:dirty_delete_object(direct_topic, Topic);
-		wildcard -> 
-			mnesia:dirty_delete_object(wildcard_topic, Topic)
+		Fun = fun() -> 
+			mnesia:delete_object(topic, Topic),
+			case mnesia:read(topic, Topic) of
+			[] -> trie_delete(Name);		
+			_ -> ignore
+			end
+		end,
+		mnesia:transaction(Fun);
+	true -> 
+		ok
+	end.
+
+trie_add(Topic) ->
+	mnesia:write(emqtt_topic:new(Topic)),
+	case mnesia:read(trie_node, Topic) of
+	[TrieNode=#trie_node{topic=undefined}] ->
+		mnesia:write(TrieNode#trie_node{topic=Topic});
+	[#trie_node{topic=Topic}] ->
+		ignore;
+	[] ->
+		%add trie path
+		[trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],
+		%add last node
+		mnesia:write(#trie_node{node_id=Topic, topic=Topic})
+	end.
+
+trie_delete(Topic) ->
+	case mnesia:read(trie_node, Topic) of
+	[#trie_node{edge_count=0}] -> 
+		mnesia:delete({trie_node, Topic}),
+		trie_delete_path(lists:reverse(emqtt_topic:triples(Topic)));
+	[TrieNode] ->
+		mnesia:write(TrieNode#trie_node{topic=Topic});
+	[] ->
+		ignore
+	end.
+	
+trie_match(Words) ->
+	trie_match(root, Words, []).
+
+trie_match(NodeId, [], ResAcc) ->
+	mnesia:read(trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc);
+
+trie_match(NodeId, [W|Words], ResAcc) ->
+	lists:foldl(fun(WArg, Acc) ->
+		case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of
+		[#trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc);
+		[] -> Acc
+		end
+	end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]).
+
+'trie_match_#'(NodeId, ResAcc) ->
+	case mnesia:read(trie, #trie_edge{node_id=NodeId, word="#"}) of
+	[#trie{node_id=ChildId}] ->
+		mnesia:read(trie_node, ChildId) ++ ResAcc;	
+	[] ->
+		ResAcc
+	end.
+
+trie_add_path({Node, Word, Child}) ->
+	Edge = #trie_edge{node_id=Node, word=Word},
+	case mnesia:read(trie_node, Node) of
+	[TrieNode = #trie_node{edge_count=Count}] ->
+		case mnesia:read(trie, Edge) of
+		[] -> 
+			mnesia:write(TrieNode#trie_node{edge_count=Count+1}),
+			mnesia:write(#trie{edge=Edge, node_id=Child});
+		[_] -> 
+			ok
 		end;
-	true -> ok
+	[] ->
+		mnesia:write(#trie_node{node_id=Node, edge_count=1}),
+		mnesia:write(#trie{edge=Edge, node_id=Child})
+	end.
+
+trie_delete_path([{NodeId, Word, _} | RestPath]) ->
+	Edge = #trie_edge{node_id=NodeId, word=Word},
+	mnesia:delete({trie, Edge}),
+	case mnesia:read(trie_node, NodeId) of
+	[#trie_node{edge_count=1, topic=undefined}] -> 
+		mnesia:delete({trie_node, NodeId}),
+		trie_delete_path(RestPath);
+	[TrieNode=#trie_node{edge_count=1, topic=_}] -> 
+		mnesia:write(TrieNode#trie_node{edge_count=0});
+	[TrieNode=#trie_node{edge_count=C}] ->
+		mnesia:write(TrieNode#trie_node{edge_count=C-1});
+	[] ->
+		throw({notfound, NodeId}) 
 	end.
 

+ 55 - 25
src/emqtt_topic.erl

@@ -13,6 +13,8 @@
 %%
 -module(emqtt_topic).
 
+-import(string, [rchr/2, substr/2, substr/3]).
+
 %% ------------------------------------------------------------------------
 %% Topic semantics and usage
 %% ------------------------------------------------------------------------
@@ -35,25 +37,30 @@
 
 -include("emqtt.hrl").
  
--export([new/1, type/1, match/2, validate/1, words/1]).
+-export([new/1,
+		 type/1,
+		 match/2,
+		 validate/1,
+		 triples/1,
+		 words/1]).
 
 -export([test/0]).
 
 -define(MAX_LEN, 64*1024).
 
-new(Name) when is_binary(Name) ->
-	#topic{name=Name, node=node(), words=words(Name)}.
+new(Name) when is_list(Name) ->
+	#topic{name=Name, node=node()}.
 
 %% ------------------------------------------------------------------------
 %% topic type: direct or wildcard
 %% ------------------------------------------------------------------------
-type(#topic{words=Words}) ->
-	type(Words);
+type(#topic{name=Name}) ->
+	type(words(Name));
 type([]) ->
 	direct;
-type([<<"#">>]) ->
+type(["#"]) ->
 	wildcard;
-type([<<"+">>|_T]) ->
+type(["+"|_T]) ->
 	wildcard;
 type([_|T]) ->
 	type(T).
@@ -65,9 +72,9 @@ match([], []) ->
 	true;
 match([H|T1], [H|T2]) ->
 	match(T1, T2);
-match([_H|T1], [<<"+">>|T2]) ->
+match([_H|T1], ["+"|T2]) ->
 	match(T1, T2);
-match(_, [<<"#">>]) ->
+match(_, ["#"]) ->
 	true;
 match([_H1|_], [_H2|_]) ->
 	false;
@@ -78,38 +85,61 @@ match([], [_H|_T2]) ->
 %% ------------------------------------------------------------------------
 %% topic validate
 %% ------------------------------------------------------------------------
-validate({_, <<>>}) ->
+validate({_, ""}) ->
 	false;
-validate({_, Topic}) when size(Topic) > ?MAX_LEN ->
+validate({_, Topic}) when length(Topic) > ?MAX_LEN ->
 	false;
-validate({subscribe, Topic}) when is_binary(Topic) ->
+validate({subscribe, Topic}) when is_list(Topic) ->
 	valid(words(Topic));
-validate({publish, Topic}) when is_binary(Topic) ->
+validate({publish, Topic}) when is_list(Topic) ->
 	Words = words(Topic),
 	valid(Words) and (not include_wildcard(Words)).
 
-words(Topic) when is_binary(Topic) ->
-	binary:split(Topic, [<<"/">>], [global]).
+triples(S) when is_list(S) ->
+	triples(S, []).
+
+triples(S, Acc) ->
+	triples(rchr(S, $/), S, Acc).
+
+triples(0, S, Acc) ->
+	[{root, S, S}|Acc];
+
+triples(I, S, Acc) ->
+	S1 = substr(S, 1, I-1),
+	S2 = substr(S, I+1),
+	triples(S1, [{S1, S2, S}|Acc]).
+
+words(Topic) when is_list(Topic) ->
+	words(Topic, [], []).
+
+words([], Word, ResAcc) ->
+	lists:reverse([Word|ResAcc]);
+
+words([$/|Topic], Word, ResAcc) ->
+	words(Topic, [], [Word|ResAcc]);
+
+words([C|Topic], Word, ResAcc) ->
+	words(Topic, lists:reverse([C|Word]), ResAcc).
 
-valid([<<>>|Words]) -> valid2(Words);
+valid([""|Words]) -> valid2(Words);
 valid(Words) -> valid2(Words).
 
-valid2([<<>>|_Words]) -> false;
-valid2([<<"#">>|Words]) when length(Words) > 0 -> false; 
+valid2([""|_Words]) -> false;
+valid2(["#"|Words]) when length(Words) > 0 -> false; 
 valid2([_|Words]) -> valid2(Words);
 valid2([]) -> true.
 
 include_wildcard([]) -> false;
-include_wildcard([<<"#">>|_T]) -> true;
-include_wildcard([<<"+">>|_T]) -> true;
+include_wildcard(["#"|_T]) -> true;
+include_wildcard(["+"|_T]) -> true;
 include_wildcard([_H|T]) -> include_wildcard(T).
 
 
 test() ->
-	true = validate({subscribe, <<"a/b/c">>}),
-	true = validate({subscribe, <<"/a/b">>}),
-	true = validate({subscribe, <<"/+/x">>}),
-	true = validate({subscribe, <<"/a/b/c/#">>}),
-	false = validate({subscribe, <<"a/#/c">>}),
+	true = validate({subscribe, "a/b/c"}),
+	true = validate({subscribe, "/a/b"}),
+	true = validate({subscribe, "/+/x"}),
+	true = validate({subscribe, "/a/b/c/#"}),
+	false = validate({subscribe, "a/#/c"}),
 	ok.