Ery Lee пре 11 година
родитељ
комит
d556c8a356

+ 10 - 1
CHANGELOG.md

@@ -1,11 +1,20 @@
+
 eMQTT ChangeLog
 ==================
 
-v0.3.1-beta (2015-01-24)
+v0.3.0-beta (2015-01-19)
 ------------------------
 
 Feature: HTTP POST API to support 'qos', 'retain' parameters
 
+Feature: $SYS system topics support
+
+Change: Rewrite emqtt_topic.erl, use '', '#', '+' to replace <<"">>, <<"#">>, <<"+">>
+
+Change: fix emqtt_pubsub.erl to match '#', '+'
+
+Tests: emqtt_topic_tests.erl add more test cases
+
 v0.3.0-alpha (2015-01-18)
 ------------------------
 

+ 1 - 1
README.md

@@ -1,6 +1,6 @@
 # eMQTT
 
-eMQTT is a clusterable, scalable, fault-tolerant and extensible MQTT V3.1.1 broker written in Erlang/OTP.
+eMQTT is a clusterable, massively scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP.
 
 eMQTT support MQTT V3.1/V3.1.1 Protocol Specification.
 

+ 13 - 13
apps/emqtt/include/emqtt_topic.hrl

@@ -1,5 +1,5 @@
 %%-----------------------------------------------------------------------------
-%% Copyright (c) 2015, Feng Lee <feng@emqtt.io>
+%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
 %% 
 %% Permission is hereby granted, free of charge, to any person obtaining a copy
 %% of this software and associated documentation files (the "Software"), to deal
@@ -24,32 +24,32 @@
 %% Core PubSub Topic
 %%------------------------------------------------------------------------------
 -record(topic, {
-    name	:: binary(),
-    node	:: node()
+    name    :: binary(),
+    node    :: node()
 }).
 
 -type topic() :: #topic{}.
 
 -record(topic_subscriber, {
-	topic	:: binary(),
-	qos = 0	:: non_neg_integer(),
-	subpid	:: pid()
+    topic    :: binary(),
+    qos = 0  :: non_neg_integer(),
+    subpid   :: pid()
 }).
 
 -record(topic_trie_node, {
-	node_id			:: binary(),
-	edge_count = 0	:: non_neg_integer(),
-	topic			:: binary()
+    node_id        	:: binary() | atom(),
+    edge_count = 0  :: non_neg_integer(),
+    topic    		:: binary()
 }).
 
 -record(topic_trie_edge, {
-	node_id	:: binary(),
-	word	:: binary() | char()
+    node_id :: binary() | atom(),
+    word    :: binary() | atom()
 }).
 
 -record(topic_trie, {
-	edge	:: #topic_trie_edge{},
-	node_id	:: binary()
+    edge    :: #topic_trie_edge{},
+    node_id :: binary() | atom()
 }).
 
 %%------------------------------------------------------------------------------

+ 4 - 0
apps/emqtt/src/emqtt_broker.erl

@@ -0,0 +1,4 @@
+
+-module(emqtt_broker).
+
+

+ 1 - 1
apps/emqtt/src/emqtt_http.erl

@@ -83,7 +83,7 @@ validate(qos, Qos) ->
     (Qos >= ?QOS_0) and (Qos =< ?QOS_2); 
 
 validate(topic, Topic) ->
-    emqtt_topic:validate({publish, Topic}).
+    emqtt_topic:validate({name, Topic}).
 
 int(S) -> list_to_integer(S).
 

+ 1 - 0
apps/emqtt/src/emqtt_plugin.erl

@@ -0,0 +1 @@
+-module(emqtt_plugin).

+ 5 - 5
apps/emqtt/src/emqtt_protocol.erl

@@ -317,7 +317,7 @@ validate_clientid(#mqtt_packet_connect { proto_ver = Ver, clean_sess = CleanSess
 
 validate_packet(#mqtt_packet { header  = #mqtt_packet_header { type = ?PUBLISH }, 
                                variable = #mqtt_packet_publish{ topic_name = Topic }}) ->
-	case emqtt_topic:validate({publish, Topic}) of
+	case emqtt_topic:validate({name, Topic}) of
 	true -> ok;
 	false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
 	end;
@@ -325,21 +325,21 @@ validate_packet(#mqtt_packet { header  = #mqtt_packet_header { type = ?PUBLISH }
 validate_packet(#mqtt_packet { header  = #mqtt_packet_header { type = ?SUBSCRIBE }, 
                                variable = #mqtt_packet_subscribe{topic_table = Topics }}) ->
 
-    validate_topics(subscribe, Topics);
+    validate_topics(filter, Topics);
 
 validate_packet(#mqtt_packet{ header  = #mqtt_packet_header { type = ?UNSUBSCRIBE }, 
                               variable = #mqtt_packet_subscribe{ topic_table = Topics }}) ->
 
-    validate_topics(unsubscribe, Topics);
+    validate_topics(filter, Topics);
 
 validate_packet(_Packet) -> 
     ok.
 
-validate_topics(Type, []) when Type =:= subscribe orelse Type =:= unsubscribe ->
+validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
 	lager:error("Empty Topics!"),
     {error, empty_topics};
 
-validate_topics(Type, Topics) when Type =:= subscribe orelse Type =:= unsubscribe ->
+validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
 	ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
 						not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
 	case ErrTopics of

+ 2 - 2
apps/emqtt/src/emqtt_pubsub.erl

@@ -300,10 +300,10 @@ trie_match(NodeId, [W|Words], ResAcc) ->
 		[#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc);
 		[] -> Acc
 		end
-	end, 'trie_match_#'(NodeId, ResAcc), [W, <<"+">>]).
+	end, 'trie_match_#'(NodeId, ResAcc), [W, '+']).
 
 'trie_match_#'(NodeId, ResAcc) ->
-	case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = <<"#">>}) of
+	case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = '#'}) of
 	[#topic_trie{node_id=ChildId}] ->
 		mnesia:read(topic_trie_node, ChildId) ++ ResAcc;	
 	[] ->

+ 108 - 71
apps/emqtt/src/emqtt_topic.erl

@@ -26,8 +26,6 @@
 
 -import(lists, [reverse/1]).
 
--import(string, [rchr/2, substr/2, substr/3]).
-
 %% ------------------------------------------------------------------------
 %% Topic semantics and usage
 %% ------------------------------------------------------------------------
@@ -50,107 +48,146 @@
 
 -include("emqtt_topic.hrl").
  
--export([new/1,
-		 type/1,
-		 match/2,
-		 validate/1,
-		 triples/1,
-		 words/1]).
+-export([new/1, type/1, match/2, validate/1, triples/1, words/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec new( binary() ) -> topic().
+
+-spec type(topic() | binary()) -> direct | wildcard.
+
+-spec match(binary(), binary()) -> boolean().
 
--define(MAX_LEN, 1024).
+-spec validate({name | filter, binary()}) -> boolean().
 
--spec new(Name :: binary()) -> topic().
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(MAX_TOPIC_LEN, 65535).
+
+%% ------------------------------------------------------------------------
+%% New Topic
+%% ------------------------------------------------------------------------
 new(Name) when is_binary(Name) ->
-	#topic{name=Name, node=node()}.
+	#topic{ name = Name, node = node() }.
 
 %% ------------------------------------------------------------------------
-%% topic type: direct or wildcard
+%% Topic Type: direct or wildcard
 %% ------------------------------------------------------------------------
--spec type(Topic :: topic()) -> direct | wildcard.
-type(#topic{name=Name}) when is_binary(Name) ->
-	type(words(Name));
-type([]) -> 
-	direct;
-type([<<>>|T]) -> 
-	type(T);
-type([<<$#, _/binary>>|_]) ->
-	wildcard;
-type([<<$+, _/binary>>|_]) ->
-	wildcard;
-type([_|T]) ->
-	type(T).
+type(#topic{ name = Name }) when is_binary(Name) ->
+	type(Name);
+type(Topic) when is_binary(Topic) ->
+	type2(words(Topic)).
+
+type2([]) -> 
+    direct;
+type2(['#'|_]) ->
+    wildcard;
+type2(['+'|_]) ->
+    wildcard;
+type2([_H |T]) ->
+    type2(T).
 
 %% ------------------------------------------------------------------------
-%% topic match
+%% Match Topic. B1 is Topic Name, B2 is Topic Filter.
 %% ------------------------------------------------------------------------
--spec match(B1 :: binary(), B2 :: binary()) -> boolean().
-match(B1, B2) when is_binary(B1) and is_binary(B2) ->
-	match(words(B1), words(B2));
+match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
+	match(words(Name), words(Filter));
 match([], []) ->
 	true;
 match([H|T1], [H|T2]) ->
 	match(T1, T2);
-match([_H|T1], [<<"+">>|T2]) ->
+match([<<$$, _/binary>>|_], ['+'|_]) ->
+    false;
+match([_H|T1], ['+'|T2]) ->
 	match(T1, T2);
-match(_, [<<"#">>]) ->
+match([<<$$, _/binary>>|_], ['#']) ->
+    false;
+match(_, ['#']) ->
 	true;
 match([_H1|_], [_H2|_]) ->
 	false;
+match([_H1|_], []) ->
+	false;
 match([], [_H|_T2]) ->
 	false.
 
 %% ------------------------------------------------------------------------
-%% topic validate
+%% Validate Topic 
 %% ------------------------------------------------------------------------
--spec validate({Type :: subscribe | publish, Topic :: binary()}) -> boolean().
 validate({_, <<>>}) ->
 	false;
-validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_LEN) ->
+validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
 	false;
-validate({subscribe, Topic}) when is_binary(Topic) ->
-	valid(words(Topic));
-validate({publish, Topic}) when is_binary(Topic) ->
+validate({filter, Topic}) when is_binary(Topic) ->
+	validate2(words(Topic));
+validate({name, Topic}) when is_binary(Topic) ->
 	Words = words(Topic),
-	valid(Words) and (not include_wildcard(Topic)).
-
-triples(B) when is_binary(B) ->
-	triples(binary_to_list(B), []).
-
-triples(S, Acc) ->
-	triples(rchr(S, $/), S, Acc).
+	validate2(Words) and (not include_wildcard(Words)).
+
+validate2([]) ->
+    true;
+validate2(['#']) -> % end with '#'
+    true;
+validate2(['#'|Words]) when length(Words) > 0 -> 
+    false; 
+validate2([''|Words]) ->
+    validate2(Words);
+validate2(['+'|Words]) ->
+    validate2(Words);
+validate2([W|Words]) ->
+    case validate3(W) of
+        true -> validate2(Words);
+        false -> false
+    end.
+
+validate3(<<>>) ->
+    true;
+validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
+    false;
+validate3(<<_/utf8, Rest/binary>>) ->
+    validate3(Rest).
+
+include_wildcard([])        -> false;
+include_wildcard(['#'|_T])  -> true;
+include_wildcard(['+'|_T])  -> true;
+include_wildcard([ _ | T])  -> include_wildcard(T).
 
-triples(0, S, Acc) ->
-	[{root, l2b(S), l2b(S)}|Acc];
-
-triples(I, S, Acc) ->
-	S1 = substr(S, 1, I-1),
-	S2 = substr(S, I+1),
-	triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]).
-
-words(Topic) when is_binary(Topic) ->
-	words(binary_to_list(Topic), [], []).
+%% ------------------------------------------------------------------------
+%% Topic to Triples
+%% ------------------------------------------------------------------------
+triples(Topic) when is_binary(Topic) ->
+	triples(words(Topic), root, []).
 
-words([], Word, ResAcc) ->
-	reverse([l2b(reverse(W)) || W <- [Word|ResAcc]]);
+triples([], _Parent, Acc) ->
+    reverse(Acc);
 
-words([$/|Topic], Word, ResAcc) ->
-	words(Topic, [], [Word|ResAcc]);
+triples([W|Words], Parent, Acc) ->
+    Node = join(Parent, W),
+    triples(Words, Node, [{Parent, W, Node}|Acc]).
 
-words([C|Topic], Word, ResAcc) ->
-	words(Topic, [C|Word], ResAcc).
+join(root, W) -> 
+    W;
+join(Parent, W) ->
+    <<(bin(Parent))/binary, $/, (bin(W))/binary>>.
 
-valid([<<>>|Words]) -> valid2(Words);
-valid(Words) -> valid2(Words).
+bin('')  -> <<>>;
+bin('+') -> <<"+">>;
+bin('#') -> <<"#">>;
+bin( B ) when is_binary(B) -> B.
 
-valid2([<<>>|_Words]) -> false;
-valid2([<<"#">>|Words]) when length(Words) > 0 -> false; 
-valid2([_|Words]) -> valid2(Words);
-valid2([]) -> true.
+%% ------------------------------------------------------------------------
+%% Split Topic to Words
+%% ------------------------------------------------------------------------
+words(Topic) when is_binary(Topic) ->
+    [word(W) || W <- binary:split(Topic, <<"/">>, [global])].
 
-include_wildcard(<<>>) -> false;
-include_wildcard(<<$#, _T/binary>>) -> true;
-include_wildcard(<<$+, _T/binary>>) -> true;
-include_wildcard(<<_H, T/binary>>) -> include_wildcard(T).
+word(<<>>)    -> '';
+word(<<"+">>) -> '+';
+word(<<"#">>) -> '#';
+word(Bin)     -> Bin.
 
-l2b(L) -> list_to_binary(L).
 

+ 76 - 5
apps/emqtt/test/emqtt_topic_tests.erl

@@ -30,17 +30,88 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+-define(N, 100000).
+
 validate_test() ->
-	?assert( validate({subscribe, <<"a/b/c">>}) ),
-	?assert( validate({subscribe, <<"/a/b">>}) ),
-	?assert( validate({subscribe, <<"/+/x">>}) ),
-	?assert( validate({subscribe, <<"/a/b/c/#">>}) ),
-	?assertNot( validate({subscribe, <<"a/#/c">>}) ).
+	?assert( validate({filter, <<"sport/tennis/#">>}) ),
+	?assert( validate({filter, <<"a/b/c">>}) ),
+	?assert( validate({filter, <<"/a/b">>}) ),
+	?assert( validate({filter, <<"/+/x">>}) ),
+	?assert( validate({filter, <<"/a/b/c/#">>}) ),
+	?assertNot( validate({filter, <<"a/#/c">>}) ),
+	?assertNot( validate({filter, <<"sport/tennis#">>}) ),
+	?assertNot( validate({filter, <<"sport/tennis/#/ranking">>}) ).
+
+sigle_level_validate_test() ->
+    ?assert( validate({filter, <<"+">>}) ),
+    ?assert( validate({filter, <<"+/tennis/#">>}) ),
+    ?assertNot( validate({filter, <<"sport+">>}) ),
+    ?assert( validate({filter, <<"sport/+/player1">>}) ).
+
+match_test() ->
+    ?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/player1/#">>) ),
+    ?assert( match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/player1/#">>) ),
+    ?assert( match(<<"sport/tennis/player1/score/wimbledon">>, <<"sport/tennis/player1/#">>) ),
+
+    ?assert( match(<<"sport">>, <<"sport/#">>) ),
+    ?assert( match(<<"sport">>, <<"#">>) ),
+    ?assert( match(<<"/sport/football/score/1">>, <<"#">>) ).
+
+sigle_level_match_test() ->
+    ?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>) ),
+    ?assertNot( match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/+">>) ),
+    ?assertNot( match(<<"sport">>, <<"sport/+">>) ),
+    ?assert( match(<<"sport/">>, <<"sport/+">>) ),
+    ?assert( match(<<"/finance">>, <<"+/+">>) ),
+    ?assert( match(<<"/finance">>, <<"/+">>) ),
+    ?assertNot( match(<<"/finance">>, <<"+">>) ).
+
+sys_match_test() ->
+    ?assert( match(<<"$SYS/borker/clients/testclient">>, <<"$SYS/#">>) ),
+    ?assert( match(<<"$SYS/borker">>, <<"$SYS/+">>) ),
+    ?assertNot( match(<<"$SYS/borker">>, <<"+/+">>) ),
+    ?assertNot( match(<<"$SYS/borker">>, <<"#">>) ).
+
+match_perf_test() ->
+    ?assert( match(<<"a/b/ccc">>, <<"a/#">>) ),
+    Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
+    Filter = <<"/abkc/19383/+/akakdkkdkak/#">>,
+    ?assert( match(Name, Filter) ),
+    %?debugFmt("Match ~p with ~p", [Name, Filter]),
+    {Time, _} = timer:tc(fun() -> 
+                [match(Name, Filter) || _I <- lists:seq(1, ?N)]
+        end),
+    ?debugFmt("Time for match: ~p(micro)", [Time/?N]),
+    ok.
+
+triples_test() ->
+    Triples = [{root, <<"a">>, <<"a">>}, {<<"a">>, <<"b">>, <<"a/b">>}],
+    ?assertMatch(Triples, triples(<<"a/b">>) ). 
+
+triples_perf_test() ->
+    Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
+    {Time, _} = timer:tc(fun() -> 
+                [triples(Topic) || _I <- lists:seq(1, ?N)]
+        end),
+    ?debugFmt("Time for triples: ~p(micro)", [Time/?N]),
+    ok.
 
 type_test() ->
 	?assertEqual(direct, type(#topic{name = <<"/a/b/cdkd">>})),
 	?assertEqual(wildcard, type(#topic{name = <<"/a/+/d">>})),
 	?assertEqual(wildcard, type(#topic{name = <<"/a/b/#">>})).
 
+words_test() ->
+    ?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'],  words(<<"/abkc/19383/+/akakdkkdkak/#">>)),
+    {Time, _} = timer:tc(fun() -> 
+                [words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)]
+        end),
+    ?debugFmt("Time for words: ~p(micro)", [Time/?N]),
+    {Time2, _} = timer:tc(fun() -> 
+                [binary:split(<<"/abkc/19383/+/akakdkkdkak/#">>, <<"/">>, [global]) || _I <- lists:seq(1, ?N)]
+        end),
+    ?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]),
+    ok.
+
 -endif.
 

+ 20 - 0
doc/pubsub.md

@@ -17,3 +17,23 @@ PubQos | SubQos | In Message | Out Message
 ## Publish
 
 
+## Performance
+
+Mac Air(11): 
+
+Function     | Time(microseconds)
+-------------|--------------------
+match        | 6.25086
+triples      | 13.86881
+words        | 3.41177
+binary:split | 3.03776
+
+iMac:
+
+Function     | Time(microseconds)
+-------------|--------------------
+match        | 3.2348
+triples      | 6.93524
+words        | 1.89616
+binary:split | 1.65243
+