Ery Lee 11 лет назад
Родитель
Сommit
c799e06b8b
37 измененных файлов с 694 добавлено и 397 удалено
  1. 5 0
      CHANGELOG.md
  2. 1 1
      apps/emqttd/include/emqttd.hrl
  3. 1 1
      apps/emqttd/src/emqttd.app.src
  4. 6 2
      apps/emqttd/src/emqttd_access_rule.erl
  5. 6 6
      apps/emqttd/src/emqttd_acl_internal.erl
  6. 19 43
      apps/emqttd/src/emqttd_app.erl
  7. 13 13
      apps/emqttd/src/emqttd_auth_clientid.erl
  8. 12 12
      apps/emqttd/src/emqttd_auth_username.erl
  9. 1 3
      apps/emqttd/src/emqttd_bridge.erl
  10. 15 15
      apps/emqttd/src/emqttd_broker.erl
  11. 12 12
      apps/emqttd/src/emqttd_cm.erl
  12. 2 2
      apps/emqttd/src/emqttd_event.erl
  13. 4 4
      apps/emqttd/src/emqttd_http.erl
  14. 12 11
      apps/emqttd/src/emqttd_metrics.erl
  15. 1 1
      apps/emqttd/src/emqttd_msg_store.erl
  16. 90 0
      apps/emqttd/src/emqttd_pooler.erl
  17. 57 0
      apps/emqttd/src/emqttd_pooler_sup.erl
  18. 1 1
      apps/emqttd/src/emqttd_protocol.erl
  19. 178 164
      apps/emqttd/src/emqttd_pubsub.erl
  20. 56 0
      apps/emqttd/src/emqttd_pubsub_sup.erl
  21. 0 94
      apps/emqttd/src/emqttd_router.erl
  22. 3 3
      apps/emqttd/src/emqttd_session.erl
  23. 6 4
      apps/emqttd/src/emqttd_sm.erl
  24. 3 1
      apps/emqttd/src/emqttd_sysmon.erl
  25. 15 1
      apps/emqttd/src/emqttd_utils.erl
  26. 1 0
      doc/pubsub.md
  27. 19 0
      doc/route.md
  28. 51 0
      plugins/emqttd_plugin_demo/src/emqttd_demo_acl.erl
  29. 41 0
      plugins/emqttd_plugin_demo/src/emqttd_demo_auth.erl
  30. 12 0
      plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src
  31. 16 0
      plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl
  32. 27 0
      plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl
  33. 2 1
      rebar.config
  34. 1 1
      rel/files/acl.config
  35. 2 0
      rel/files/app.config
  36. 1 1
      rel/files/vm.args
  37. 2 0
      rel/reltool.config

+ 5 - 0
CHANGELOG.md

@@ -2,6 +2,11 @@
 eMQTTD ChangeLog
 ==================
 
+0.6.1-alpha (2015-04-20)
+-------------------------
+
+Redesign PUBSUB
+
 0.6.0-alpha (2015-04-08)
 -------------------------
 

+ 1 - 1
apps/emqttd/include/emqttd.hrl

@@ -57,7 +57,7 @@
 -record(mqtt_subscriber, {
     topic    :: binary(),
     qos = 0  :: 0 | 1 | 2,
-    subpid   :: pid()
+    pid      :: pid()
 }).
 
 -type mqtt_subscriber() :: #mqtt_subscriber{}.

+ 1 - 1
apps/emqttd/src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.6.0"},
+  {vsn, "0.6.1"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 6 - 2
apps/emqttd/src/emqttd_access_rule.erl

@@ -58,7 +58,7 @@ compile({A, all}) when (A =:= allow) orelse (A =:= deny) ->
     {A, all};
 
 compile({A, Who, Access, TopicFilters}) when (A =:= allow) orelse (A =:= deny) ->
-    {A, compile(who, Who), Access, [compile(topic, bin(Topic)) || Topic <- TopicFilters]}.
+    {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}.
 
 compile(who, all) -> 
     all;
@@ -74,8 +74,10 @@ compile(who, {user, all}) ->
 compile(who, {user, Username}) ->
     {user, bin(Username)};
 
+compile(topic, {eq, Topic}) ->
+    {eq, emqtt_topic:words(bin(Topic))};
 compile(topic, Topic) ->
-    Words = emqtt_topic:words(Topic),
+    Words = emqtt_topic:words(bin(Topic)),
     case 'pattern?'(Words) of
         true -> {pattern, Words};
         false -> Words
@@ -138,6 +140,8 @@ match_topics(Client, Topic, [TopicFilter|Filters]) ->
     false -> match_topics(Client, Topic, Filters)
     end.
 
+match_topic(Topic, {eq, TopicFilter}) ->
+    Topic =:= TopicFilter;
 match_topic(Topic, TopicFilter) ->
     emqtt_topic:match(Topic, TopicFilter).
 

+ 6 - 6
apps/emqttd/src/emqttd_acl_internal.erl

@@ -37,7 +37,7 @@
 %% ACL callbacks
 -export([init/1, check_acl/2, reload_acl/1, description/0]).
 
--define(ACL_RULE_TABLE, mqtt_acl_rule).
+-define(ACL_RULE_TAB, mqtt_acl_rule).
 
 -record(state, {acl_file, nomatch = allow}).
 
@@ -48,7 +48,7 @@
 %% @doc Read all rules.
 -spec all_rules() -> list(emqttd_access_rule:rule()).
 all_rules() ->
-    case ets:lookup(?ACL_RULE_TABLE, all_rules) of
+    case ets:lookup(?ACL_RULE_TAB, all_rules) of
         [] -> [];
         [{_, Rules}] -> Rules
     end.
@@ -60,7 +60,7 @@ all_rules() ->
 %% @doc init internal ACL.
 -spec init(AclOpts :: list()) -> {ok, State :: any()}.
 init(AclOpts) ->
-    ets:new(?ACL_RULE_TABLE, [set, public, named_table, {read_concurrency, true}]),
+    ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
     AclFile = proplists:get_value(file, AclOpts),
     Default = proplists:get_value(nomatch, AclOpts, allow),
     State = #state{acl_file = AclFile, nomatch = Default},
@@ -71,10 +71,10 @@ load_rules(#state{acl_file = AclFile}) ->
     {ok, Terms} = file:consult(AclFile),
     Rules = [emqttd_access_rule:compile(Term) || Term <- Terms],
     lists:foreach(fun(PubSub) ->
-        ets:insert(?ACL_RULE_TABLE, {PubSub,
+        ets:insert(?ACL_RULE_TAB, {PubSub,
             lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
         end, [publish, subscribe]),
-    ets:insert(?ACL_RULE_TABLE, {all_rules, Terms}).
+    ets:insert(?ACL_RULE_TAB, {all_rules, Terms}).
 
 filter(_PubSub, {allow, all}) ->
     true;
@@ -103,7 +103,7 @@ check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) ->
     end.
 
 lookup(PubSub) ->
-    case ets:lookup(?ACL_RULE_TABLE, PubSub) of
+    case ets:lookup(?ACL_RULE_TAB, PubSub) of
         [] -> [];
         [{PubSub, Rules}] -> Rules
     end.

+ 19 - 43
apps/emqttd/src/emqttd_app.erl

@@ -33,19 +33,6 @@
 %% Application callbacks
 -export([start/2, stop/1]).
 
-%% Servers
--define(SERVERS, [config,
-                  event,
-                  client,
-                  session,
-                  pubsub,
-                  router,
-                  broker,
-                  metrics,
-                  bridge,
-                  access_control,
-                  sysmon]).
-
 -define(PRINT_MSG(Msg), io:format(Msg)).
 
 -define(PRINT(Format, Args), io:format(Format, Args)).
@@ -79,7 +66,25 @@ print_vsn() ->
 	?PRINT("~s ~s is running now~n", [Desc, Vsn]).
 
 start_servers(Sup) ->
-    Servers = lists:flatten([server(Srv) || Srv <- ?SERVERS]),
+    {ok, SessOpts} = application:get_env(session),
+    {ok, PubSubOpts} = application:get_env(pubsub),
+    {ok, BrokerOpts} = application:get_env(broker),
+    {ok, MetricOpts} = application:get_env(metrics),
+    {ok, AccessOpts} = application:get_env(access_control),
+    Servers = [
+            {"emqttd config", emqttd_config},
+            {"emqttd event", emqttd_event},
+            {"emqttd pooler", {supervisor, emqttd_pooler_sup}},
+            {"emqttd client manager", emqttd_cm},
+            {"emqttd session manager", emqttd_sm},
+            {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
+            {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
+            %{"emqttd router", emqttd_router},
+            {"emqttd broker", emqttd_broker, BrokerOpts},
+            {"emqttd metrics", emqttd_metrics, MetricOpts},
+            {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
+            {"emqttd access control", emqttd_access_control, AccessOpts},
+            {"emqttd system monitor", emqttd_sysmon}],
     [start_server(Sup, Server) || Server <- Servers].
 
 start_server(_Sup, {Name, F}) when is_function(F) ->
@@ -97,35 +102,6 @@ start_server(Sup, {Name, Server, Opts}) ->
     start_child(Sup, Server, Opts),
     ?PRINT_MSG("[done]~n").
 
-%%TODO: redesign later...
-server(config) ->
-    {"emqttd config", emqttd_config};
-server(event) ->
-    {"emqttd event", emqttd_event}; 
-server(client) ->
-    {"emqttd client manager", emqttd_cm};
-server(session) ->
-    {ok, SessOpts} = application:get_env(session),
-    [{"emqttd session manager", emqttd_sm},
-     {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}];
-server(pubsub) ->
-    {"emqttd pubsub", emqttd_pubsub};
-server(router) ->
-    {"emqttd router", emqttd_router};
-server(broker) ->
-    {ok, BrokerOpts} = application:get_env(broker),
-    {"emqttd broker", emqttd_broker, BrokerOpts};
-server(metrics) ->
-    {ok, MetricOpts} = application:get_env(metrics),
-    {"emqttd metrics", emqttd_metrics, MetricOpts};
-server(bridge) ->
-    {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}};
-server(access_control) ->
-    {ok, AcOpts} = application:get_env(access_control),
-    {"emqttd access control", emqttd_access_control, AcOpts};
-server(sysmon) ->
-    {"emqttd system monitor", emqttd_sysmon}.
-
 start_child(Sup, {supervisor, Name}) ->
     supervisor:start_child(Sup, supervisor_spec(Name));
 start_child(Sup, Name) when is_atom(Name) ->

+ 13 - 13
apps/emqttd/src/emqttd_auth_clientid.erl

@@ -39,9 +39,9 @@
 %% emqttd_auth callbacks
 -export([init/1, check/3, description/0]).
 
--define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid).
+-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
 
--record(?AUTH_CLIENTID_TABLE, {clientid, ipaddr, password}).
+-record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}).
 
 add_clientid(ClientId) when is_binary(ClientId) ->
     R = #mqtt_auth_clientid{clientid = ClientId},
@@ -52,19 +52,19 @@ add_clientid(ClientId, Password) ->
     mnesia:transaction(fun() -> mnesia:write(R) end).
 
 lookup_clientid(ClientId) ->
-	mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId).
+	mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
 
 all_clientids() ->
-	mnesia:dirty_all_keys(?AUTH_CLIENTID_TABLE).
+	mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
 
 remove_clientid(ClientId) ->
-    mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TABLE, ClientId}) end).
+    mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end).
 
 init(Opts) ->
-	mnesia:create_table(?AUTH_CLIENTID_TABLE, [
+	mnesia:create_table(?AUTH_CLIENTID_TAB, [
 		{ram_copies, [node()]},
-		{attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]),
-	mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), ram_copies),
+		{attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]),
+	mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
     case proplists:get_value(file, Opts) of
         undefined -> ok;
         File -> load(File)
@@ -80,9 +80,9 @@ check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password,
 check(_Client, undefined, [{password, yes}|_]) ->
     {error, "Password undefined"};
 check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) ->
-    case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
+    case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
         [] -> {error, "ClientId Not Found"};
-        [#?AUTH_CLIENTID_TABLE{password = Password}]  -> ok; %% TODO: plaintext??
+        [#?AUTH_CLIENTID_TAB{password = Password}]  -> ok; %% TODO: plaintext??
         _ -> {error, "Password Not Right"}
     end.
 
@@ -118,10 +118,10 @@ load(Fd, eof, Clients) ->
     file:close(Fd).
 
 check_clientid_only(ClientId, IpAddr) ->
-    case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
+    case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
         [] -> {error, "ClientId Not Found"};
-        [#?AUTH_CLIENTID_TABLE{ipaddr = undefined}]  -> ok;
-        [#?AUTH_CLIENTID_TABLE{ipaddr = {_, {Start, End}}}] ->
+        [#?AUTH_CLIENTID_TAB{ipaddr = undefined}]  -> ok;
+        [#?AUTH_CLIENTID_TAB{ipaddr = {_, {Start, End}}}] ->
             I = esockd_access:atoi(IpAddr),
             case I >= Start andalso I =< End of
                 true -> ok;

+ 12 - 12
apps/emqttd/src/emqttd_auth_username.erl

@@ -38,35 +38,35 @@
 %% emqttd_auth callbacks
 -export([init/1, check/3, description/0]).
 
--define(AUTH_USERNAME_TABLE, mqtt_auth_username).
+-define(AUTH_USERNAME_TAB, mqtt_auth_username).
 
--record(?AUTH_USERNAME_TABLE, {username, password}).
+-record(?AUTH_USERNAME_TAB, {username, password}).
 
 %%%=============================================================================
 %%% API 
 %%%=============================================================================
 
 add_user(Username, Password) ->
-    R = #?AUTH_USERNAME_TABLE{username = Username, password = hash(Password)},
+    R = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
     mnesia:transaction(fun() -> mnesia:write(R) end).
 
 lookup_user(Username) ->
-    mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username).
+    mnesia:dirty_read(?AUTH_USERNAME_TAB, Username).
 
 remove_user(Username) ->
-    mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TABLE, Username}) end).
+    mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TAB, Username}) end).
 
 all_users() ->
-    mnesia:dirty_all_keys(?AUTH_USERNAME_TABLE).
+    mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
 
 %%%=============================================================================
 %%% emqttd_auth callbacks
 %%%=============================================================================
 init(Opts) ->
-	mnesia:create_table(?AUTH_USERNAME_TABLE, [
-		{ram_copies, [node()]},
-		{attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]),
-	mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), ram_copies),
+	mnesia:create_table(?AUTH_USERNAME_TAB, [
+		{disc_copies, [node()]},
+		{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
+	mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies),
     {ok, Opts}.
 
 check(#mqtt_client{username = undefined}, _Password, _Opts) ->
@@ -74,10 +74,10 @@ check(#mqtt_client{username = undefined}, _Password, _Opts) ->
 check(_User, undefined, _Opts) ->
     {error, "Password undefined"};
 check(#mqtt_client{username = Username}, Password, _Opts) ->
-	case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of
+	case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of
         [] -> 
             {error, "Username Not Found"};
-        [#?AUTH_USERNAME_TABLE{password = <<Salt:4/binary, Hash/binary>>}] ->
+        [#?AUTH_USERNAME_TAB{password = <<Salt:4/binary, Hash/binary>>}] ->
             case Hash =:= md5_hash(Salt, Password) of
                 true -> ok;
                 false -> {error, "Password Not Right"}

+ 1 - 3
apps/emqttd/src/emqttd_bridge.erl

@@ -113,7 +113,7 @@ handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}
     {noreply, State};
 
 handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) ->
-    rpc:cast(Node, emqttd_router, route, [transform(Msg, State)]),
+    rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
     {noreply, State};
 
 handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
@@ -172,5 +172,3 @@ transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos,
     end,
     Msg1#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
 
-
-

+ 15 - 15
apps/emqttd/src/emqttd_broker.erl

@@ -34,7 +34,7 @@
 
 -define(SERVER, ?MODULE).
 
--define(BROKER_TABLE, mqtt_broker).
+-define(BROKER_TAB, mqtt_broker).
 
 %% API Function Exports
 -export([start_link/1]).
@@ -115,7 +115,7 @@ datetime() ->
 %%------------------------------------------------------------------------------
 -spec getstats() -> [{atom(), non_neg_integer()}].
 getstats() ->
-    ets:tab2list(?BROKER_TABLE).
+    ets:tab2list(?BROKER_TAB).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -125,7 +125,7 @@ getstats() ->
 %%------------------------------------------------------------------------------
 -spec getstat(atom()) -> non_neg_integer() | undefined.
 getstat(Name) ->
-    case ets:lookup(?BROKER_TABLE, Name) of
+    case ets:lookup(?BROKER_TAB, Name) of
         [{Name, Val}] -> Val;
         [] -> undefined
     end.
@@ -138,7 +138,7 @@ getstat(Name) ->
 %%------------------------------------------------------------------------------
 -spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean().
 setstat(Stat, Val) ->
-    ets:update_element(?BROKER_TABLE, Stat, {2, Val}).
+    ets:update_element(?BROKER_TAB, Stat, {2, Val}).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -148,13 +148,13 @@ setstat(Stat, Val) ->
 %%------------------------------------------------------------------------------
 -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
 setstats(Stat, MaxStat, Val) ->
-    MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2),
+    MaxVal = ets:lookup_element(?BROKER_TAB, MaxStat, 2),
     if
         Val > MaxVal -> 
-            ets:update_element(?BROKER_TABLE, MaxStat, {2, Val});
+            ets:update_element(?BROKER_TAB, MaxStat, {2, Val});
         true -> ok
     end,
-    ets:update_element(?BROKER_TABLE, Stat, {2, Val}).
+    ets:update_element(?BROKER_TAB, Stat, {2, Val}).
 
 %%%=============================================================================
 %%% gen_server callbacks
@@ -162,9 +162,9 @@ setstats(Stat, MaxStat, Val) ->
 
 init([Options]) ->
     random:seed(now()),
-    ets:new(?BROKER_TABLE, [set, public, named_table, {write_concurrency, true}]),
+    ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]),
     Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
-    [ets:insert(?BROKER_TABLE, {Topic, 0}) || Topic <- Topics],
+    [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics],
     % Create $SYS Topics
     [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
     [ok = create(systop(Topic)) || Topic <- Topics],
@@ -191,7 +191,7 @@ handle_info(tick, State) ->
     publish(systop(uptime), list_to_binary(uptime(State))),
     publish(systop(datetime), list_to_binary(datetime())),
     [publish(systop(Stat), i2b(Val)) 
-        || {Stat, Val} <- ets:tab2list(?BROKER_TABLE)],
+        || {Stat, Val} <- ets:tab2list(?BROKER_TAB)],
     {noreply, tick(State), hibernate};
 
 handle_info(_Info, State) ->
@@ -214,13 +214,13 @@ create(Topic) ->
     emqttd_pubsub:create(Topic).
 
 retain(Topic, Payload) when is_binary(Payload) ->
-    emqttd_router:route(broker, #mqtt_message{retain = true,
-                                              topic = Topic,
-                                              payload = Payload}).
+    emqttd_pubsub:publish(broker, #mqtt_message{retain = true,
+                                                topic = Topic,
+                                                payload = Payload}).
 
 publish(Topic, Payload) when is_binary(Payload) ->
-    emqttd_router:route(broker, #mqtt_message{topic = Topic,
-                                              payload = Payload}).
+    emqttd_pubsub:publish(broker, #mqtt_message{topic = Topic,
+                                                payload = Payload}).
 
 uptime(#state{started_at = Ts}) ->
     Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,

+ 12 - 12
apps/emqttd/src/emqttd_cm.erl

@@ -32,8 +32,6 @@
 
 -define(SERVER, ?MODULE).
 
--define(CLIENT_TABLE, mqtt_client).
-
 %% API Exports 
 -export([start_link/0]).
 
@@ -52,6 +50,8 @@
 
 -record(state, {tab}).
 
+-define(CLIENT_TAB, mqtt_client).
+
 %%%=============================================================================
 %%% API
 %%%=============================================================================
@@ -74,7 +74,7 @@ start_link() ->
 %%------------------------------------------------------------------------------
 -spec lookup(ClientId :: binary()) -> pid() | undefined.
 lookup(ClientId) when is_binary(ClientId) ->
-	case ets:lookup(?CLIENT_TABLE, ClientId) of
+	case ets:lookup(?CLIENT_TAB, ClientId) of
 	[{_, Pid, _}] -> Pid;
 	[] -> undefined
 	end.
@@ -87,7 +87,7 @@ lookup(ClientId) when is_binary(ClientId) ->
 register(ClientId) when is_binary(ClientId) ->
     Pid = self(),
     %% this is atomic
-    case ets:insert_new(?CLIENT_TABLE, {ClientId, Pid, undefined}) of
+    case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of
         true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid});
         false -> gen_server:cast(?SERVER, {register, ClientId, Pid})
     end.
@@ -117,10 +117,10 @@ getstats() ->
 %%%=============================================================================
 
 init([]) ->
-    TabId = ets:new(?CLIENT_TABLE, [set,
-                                    named_table,
-                                    public,
-                                    {write_concurrency, true}]),
+    TabId = ets:new(?CLIENT_TAB, [set,
+                                  named_table,
+                                  public,
+                                  {write_concurrency, true}]),
     {ok, #state{tab = TabId}}.
 
 handle_call(Req, _From, State) ->
@@ -144,10 +144,10 @@ handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) ->
     {noreply, setstats(State)};
 
 handle_cast({unregister, ClientId, Pid}, State) ->
-	case ets:lookup(?CLIENT_TABLE, ClientId) of
+	case ets:lookup(?CLIENT_TAB, ClientId) of
 	[{_, Pid, MRef}] ->
 		erlang:demonitor(MRef, [flush]),
-		ets:delete(?CLIENT_TABLE, ClientId);
+		ets:delete(?CLIENT_TAB, ClientId);
 	[_] -> 
 		ignore;
 	[] ->
@@ -159,7 +159,7 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
-	ets:match_delete(?CLIENT_TABLE, {'_', DownPid, MRef}),
+	ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
     {noreply, setstats(State)};
 
 handle_info(_Info, State) ->
@@ -191,6 +191,6 @@ registerd(Tab, {ClientId, Pid}) ->
 setstats(State) ->
     emqttd_broker:setstats('clients/count',
                            'clients/max',
-                           ets:info(?CLIENT_TABLE, size)), State.
+                           ets:info(?CLIENT_TAB, size)), State.
 
 

+ 2 - 2
apps/emqttd/src/emqttd_event.erl

@@ -75,13 +75,13 @@ init([]) ->
 handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
     Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
     Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
-    emqttd_router:route(event, Msg),
+    emqttd_pubsub:publish(event, Msg),
     {ok, State};
 
 handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
     Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
     Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
-    emqttd_router:route(event, Msg),
+    emqttd_pubsub:publish(event, Msg),
     {ok, State};
 
 handle_event({subscribed, ClientId, TopicTable}, State) ->

+ 4 - 4
apps/emqttd/src/emqttd_http.erl

@@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) ->
     Message = list_to_binary(get_value("message", Params)),
     case {validate(qos, Qos), validate(topic, Topic)} of
         {true, true} ->
-            emqttd_router:route(http, #mqtt_message{qos     = Qos,
-                                                    retain  = Retain,
-                                                    topic   = Topic,
-                                                    payload = Message}),
+            emqttd_pubsub:publish(http, #mqtt_message{qos     = Qos,
+                                                      retain  = Retain,
+                                                      topic   = Topic,
+                                                      payload = Message}),
             Req:ok({"text/plan", <<"ok\n">>});
        {false, _} ->
             Req:respond({400, [], <<"Bad QoS">>});

+ 12 - 11
apps/emqttd/src/emqttd_metrics.erl

@@ -36,7 +36,7 @@
 
 -define(SERVER, ?MODULE).
 
--define(METRIC_TABLE, mqtt_broker_metric).
+-define(METRIC_TAB, mqtt_broker_metric).
 
 %% API Function Exports
 -export([start_link/1]).
@@ -81,7 +81,7 @@ all() ->
                         {ok, Count} -> maps:put(Metric, Count+Val, Map);
                         error -> maps:put(Metric, Val, Map)
                     end
-            end, #{}, ?METRIC_TABLE)).
+            end, #{}, ?METRIC_TAB)).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -91,7 +91,7 @@ all() ->
 %%------------------------------------------------------------------------------
 -spec value(atom()) -> non_neg_integer().
 value(Metric) ->
-    lists:sum(ets:select(?METRIC_TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
+    lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -125,9 +125,9 @@ inc(Metric, Val) when is_atom(Metric) and is_integer(Val) ->
 %%------------------------------------------------------------------------------
 -spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
 inc(gauge, Metric, Val) ->
-    ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, Val});
+    ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val});
 inc(counter, Metric, Val) ->
-    ets:update_counter(?METRIC_TABLE, key(counter, Metric), {2, Val}).
+    ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -147,7 +147,7 @@ dec(gauge, Metric) ->
 %%------------------------------------------------------------------------------
 -spec dec(gauge, atom(), pos_integer()) -> integer().
 dec(gauge, Metric, Val) ->
-    ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, -Val}).
+    ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -158,7 +158,7 @@ dec(gauge, Metric, Val) ->
 set(Metric, Val) when is_atom(Metric) ->
     set(gauge, Metric, Val).
 set(gauge, Metric, Val) ->
-    ets:insert(?METRIC_TABLE, {key(gauge, Metric), Val}).
+    ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}).
 
 %%------------------------------------------------------------------------------
 %% @doc
@@ -180,7 +180,7 @@ init([Options]) ->
     random:seed(now()),
     Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
     % Create metrics table
-    ets:new(?METRIC_TABLE, [set, public, named_table, {write_concurrency, true}]),
+    ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
     % Init metrics
     [new_metric(Metric) ||  Metric <- Metrics],
     % $SYS Topics for metrics
@@ -220,14 +220,15 @@ systop(Name) when is_atom(Name) ->
     list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
 
 publish(Topic, Payload) ->
-    emqttd_router:route(metrics, #mqtt_message{topic = Topic, payload = Payload}).
+    emqttd_pubsub:publish(metrics, #mqtt_message{topic = Topic,
+                                                 payload = Payload}).
 
 new_metric({gauge, Name}) ->
-    ets:insert(?METRIC_TABLE, {{Name, 0}, 0});
+    ets:insert(?METRIC_TAB, {{Name, 0}, 0});
 
 new_metric({counter, Name}) ->
     Schedulers = lists:seq(1, erlang:system_info(schedulers)),
-    [ets:insert(?METRIC_TABLE, {{Name, I}, 0}) || I <- Schedulers].
+    [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
 
 tick(State = #state{pub_interval = PubInterval}) ->
     tick(PubInterval, State).

+ 1 - 1
apps/emqttd/src/emqttd_msg_store.erl

@@ -84,7 +84,7 @@ retain(Msg = #mqtt_message{topic = Topic,
             lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
        {_, false}->
             lager:error("Dropped retained message(topic=~s, payload=~p) for payload is too big!", [Topic, size(Payload)])
-    end.
+    end, ok.
 
 limit(table) ->
     proplists:get_value(max_message_num, env());

+ 90 - 0
apps/emqttd/src/emqttd_pooler.erl

@@ -0,0 +1,90 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd pooler.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_pooler).
+
+-author('feng@emqtt.io').
+
+-behaviour(gen_server).
+
+%% API Exports 
+-export([start_link/1, submit/1, async_submit/1]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {id}).
+
+%%%=============================================================================
+%%% API
+%%%=============================================================================
+-spec start_link(I :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
+start_link(I) ->
+    gen_server:start_link(?MODULE, [I], []).
+
+submit(Fun) ->
+   gen_server:call(gproc_pool:pick(pooler), {submit, Fun}, infinity).
+
+async_submit(Fun) ->
+    gen_server:cast(gproc_pool:pick(pooler), {async_submit, Fun}).
+
+init([I]) ->
+    gproc_pool:connect_worker(pooler, {pooler, I}),
+    {ok, #state{id = I}}.
+
+handle_call({submit, Fun}, _From, State) ->
+    {reply, run(Fun), State};
+
+handle_call(_Req, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast({async_submit, Fun}, State) ->
+    run(Fun),
+    {noreply, State};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #state{id = I}) ->
+    gproc_pool:disconnect_worker(pooler, {pooler, I}), ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
+run({M, F, A}) ->
+    erlang:apply(M, F, A);
+run(Fun) when is_function(Fun) ->
+    Fun().
+
+

+ 57 - 0
apps/emqttd/src/emqttd_pooler_sup.erl

@@ -0,0 +1,57 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd pooler supervisor.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_pooler_sup).
+
+-author('feng@emqtt.io').
+
+-include("emqttd.hrl").
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, start_link/1]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+start_link() ->
+    start_link(erlang:system_info(schedulers)).
+
+start_link(PoolSize) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, [PoolSize]).
+
+init([PoolSize]) ->
+    gproc_pool:new(pooler, random, [{size, PoolSize}]),
+    Children = lists:map(
+                 fun(I) ->
+                         gproc_pool:add_worker(pooler, {pooler, I}, I),
+                         {{emqttd_pooler, I},
+                            {emqttd_pooler, start_link, [I]},
+                                permanent, 5000, worker, [emqttd_pooler]}
+                 end, lists:seq(1, PoolSize)),
+    {ok, {{one_for_all, 10, 100}, Children}}.
+

+ 1 - 1
apps/emqttd/src/emqttd_protocol.erl

@@ -266,7 +266,7 @@ send_willmsg(_ClientId, undefined) ->
     ignore;
 %%TODO:should call session...
 send_willmsg(ClientId, WillMsg) -> 
-    emqttd_router:route(ClientId, WillMsg).
+    emqttd_pubsub:publish(ClientId, WillMsg).
 
 start_keepalive(0) -> ignore;
 start_keepalive(Sec) when Sec > 0 ->

+ 178 - 164
apps/emqttd/src/emqttd_pubsub.erl

@@ -28,15 +28,11 @@
 
 -author('feng@emqtt.io').
 
--include_lib("emqtt/include/emqtt.hrl").
-
 -include("emqttd.hrl").
 
--behaviour(gen_server).
-
--define(SERVER, ?MODULE).
+-include_lib("emqtt/include/emqtt.hrl").
 
--define(SUBACK_ERR, 128).
+-include_lib("emqtt/include/emqtt_packet.hrl").
 
 %% Mnesia Callbacks
 -export([mnesia/1]).
@@ -44,13 +40,15 @@
 -boot_mnesia({mnesia, [boot]}).
 -copy_mnesia({mnesia, [copy]}).
 
+-behaviour(gen_server).
+
 %% API Exports 
--export([start_link/0]).
+-export([start_link/2]).
 
 -export([create/1,
-         subscribe/1, subscribe/2,
+         subscribe/1,
          unsubscribe/1,
-         publish/1, publish/2,
+         publish/2,
          %local node
          dispatch/2, match/1]).
 
@@ -58,7 +56,9 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {submap :: map()}).
+-define(POOL, pubsub).
+
+-record(state, {id, submap :: map()}).
 
 %%%=============================================================================
 %%% Mnesia callbacks
@@ -76,7 +76,7 @@ mnesia(boot) ->
                 {ram_copies, [node()]},
                 {record_name, mqtt_subscriber},
                 {attributes, record_info(fields, mqtt_subscriber)},
-                {index, [subpid]},
+                {index, [pid]},
                 {local_content, true}]);
 
 mnesia(copy) ->
@@ -88,96 +88,71 @@ mnesia(copy) ->
 %%%=============================================================================
 
 %%------------------------------------------------------------------------------
-%% @doc
-%% Start Pubsub.
-%%
-%% @end
+%% @doc Start one pubsub.
 %%------------------------------------------------------------------------------
--spec start_link() -> {ok, pid()} | ignore | {error, any()}.
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+-spec start_link(Id, Opts) -> {ok, pid()} | ignore | {error, any()} when
+    Id   :: pos_integer(),
+    Opts :: list().
+start_link(Id, Opts) ->
+    gen_server:start_link(?MODULE, [Id, Opts], []).
 
 %%------------------------------------------------------------------------------
-%% @doc
-%% Create topic.
-%%
-%% @end
+%% @doc Create topic. Notice That this transaction is not protected by pubsub pool.
 %%------------------------------------------------------------------------------
--spec create(binary()) -> ok.
+-spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
 create(Topic) when is_binary(Topic) ->
-    Record = #mqtt_topic{topic = Topic, node = node()},
-    {atomic, ok} = mnesia:transaction(fun insert_topic/1, [Record]), ok.
+    TopicR = #mqtt_topic{topic = Topic, node = node()},
+    case mnesia:transaction(fun add_topic/1, [TopicR]) of
+        {atomic, ok} -> setstats(topics), ok;
+        {aborted, Error} -> {error, Error}
+    end.  
 
 %%------------------------------------------------------------------------------
-%% @doc
-%% Subscribe topics
-%%
-%% @end
+%% @doc Subscribe topic.
 %%------------------------------------------------------------------------------
--spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when 
+-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when
     Topic   :: binary(),
     Qos     :: mqtt_qos().
-subscribe(Topics = [{_Topic, _Qos} | _]) ->
-    {ok, lists:map(fun({Topic, Qos}) ->
-            case subscribe(Topic, Qos) of
-                {ok, GrantedQos} -> 
-                    GrantedQos;
-                Error -> 
-                    lager:error("Failed to subscribe '~s': ~p", [Topic, Error]), 
-                    ?SUBACK_ERR
-            end
-        end, Topics)}.
 
--spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
-subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
-    TopicRecord = #mqtt_topic{topic = Topic, node = node()},
-    Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()},
-    F = fun() ->
-            case insert_topic(TopicRecord) of
-               ok -> insert_subscriber(Subscriber);
-               Error -> Error
-            end
-        end,
-    case mnesia:transaction(F) of
-        {atomic, ok} -> 
-            {ok, Qos};
-        {aborted, Reason} -> 
-            {error, Reason}
-    end.
+subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
+    call({subscribe, self(), Topic, Qos});
 
-%%------------------------------------------------------------------------------
-%% @doc
-%% Unsubscribe Topic or Topics
-%%
-%% @end
-%%------------------------------------------------------------------------------
+subscribe(Topics = [{_Topic, _Qos} | _]) ->
+    call({subscribe, self(), Topics}).
+
+%% @doc Unsubscribe Topic or Topics
 -spec unsubscribe(binary() | list(binary())) -> ok.
 unsubscribe(Topic) when is_binary(Topic) ->
-    SubPid = self(),
-    TopicRecord = #mqtt_topic{topic = Topic, node = node()},
-    F = fun() ->
-        %%TODO record name...
-        Pattern = #mqtt_subscriber{topic = Topic, _ = '_', subpid = SubPid},
-        [mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
-        try_remove_topic(TopicRecord)
-    end,
-    {atomic, _} = mneisa:transaction(F), ok;
+    cast({unsubscribe, self(), Topic});
 
 unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
-    lists:foreach(fun(T) -> unsubscribe(T) end, Topics).
+    cast({unsubscribe, self(), Topics}).
+
+call(Req) ->
+    Pid = gproc_pool:pick_worker(?POOL, self()),
+    lager:info("~p call ~p", [self(), Pid]),
+    gen_server:call(Pid, Req, infinity).
+
+cast(Msg) ->
+    Pid = gproc_pool:pick_worker(?POOL, self()),
+    gen_server:cast(Pid, Msg).
 
 %%------------------------------------------------------------------------------
-%% @doc
-%% Publish to cluster node.
-%%
-%% @end
+%% @doc Publish to cluster nodes.
 %%------------------------------------------------------------------------------
--spec publish(Msg :: mqtt_message()) -> ok.
-publish(Msg=#mqtt_message{topic=Topic}) ->
-	publish(Topic, Msg).
-
--spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any().
-publish(Topic, Msg) when is_binary(Topic) ->
+-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
+publish(From, Msg=#mqtt_message{topic=Topic}) ->
+    lager:info("~s PUBLISH to ~s", [From, Topic]),
+    %% Retain message first. Don't create retained topic.
+    case emqttd_msg_store:retain(Msg) of
+        ok ->
+            %TODO: why unset 'retain' flag?
+            publish(From, Topic, emqtt_message:unset_flag(Msg));
+        ignore ->
+            publish(From, Topic, Msg)
+     end.
+    
+publish(_From, Topic, Msg) when is_binary(Topic) ->
 	lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
         case Node =:= node() of
             true -> dispatch(Name, Msg);
@@ -185,107 +160,113 @@ publish(Topic, Msg) when is_binary(Topic) ->
         end
 	end, match(Topic)).
 
-%%------------------------------------------------------------------------------
-%% @doc
-%% Dispatch Locally. Should only be called by publish.
-%%
-%% @end
-%%------------------------------------------------------------------------------
+%% @doc Dispatch message locally. should only be called by publish.
 -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
 dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
-    case mnesia:dirty_read(subscriber, Topic) of
-        [] -> 
-            %%TODO: not right when clusted...
-            setstats(dropped);
-        Subscribers ->
-            lists:foreach(
-                fun(#mqtt_subscriber{qos = SubQos, subpid=SubPid}) ->
-                        Msg1 = if
-                            Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
-                            true -> Msg
-                        end,
-                        SubPid ! {dispatch, {self(), Msg1}}
-                end, Subscribers)
-    end.
+    Subscribers = mnesia:dirty_read(subscriber, Topic),
+    setstats(dropped, Subscribers =:= []), %%TODO:...
+    lists:foreach(
+        fun(#mqtt_subscriber{qos = SubQos, pid=SubPid}) ->
+                Msg1 = if
+                    Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
+                    true -> Msg
+                end,
+                SubPid ! {dispatch, {self(), Msg1}}
+            end, Subscribers), 
+    length(Subscribers).
 
-%%------------------------------------------------------------------------------
-%% @doc
-%% @private
-%% Match topic.
-%%
-%% @end
-%%------------------------------------------------------------------------------
 -spec match(Topic :: binary()) -> [mqtt_topic()].
 match(Topic) when is_binary(Topic) ->
 	MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]),
-	lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
+	lists:append([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]).
 
 %%%=============================================================================
 %%% gen_server callbacks
 %%%=============================================================================
 
-init([]) ->
-    %%TODO: really need?
-    process_flag(priority, high),
+init([Id, _Opts]) ->
     process_flag(min_heap_size, 1024*1024),
-    mnesia:subscribe({table, topic, simple}),
-    mnesia:subscribe({table, subscriber, simple}),
-    {ok, #state{submap = maps:new()}}.
+    gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
+    {ok, #state{id = Id, submap = maps:new()}}.
+
+handle_call({subscribe, SubPid, Topics}, _From, State) ->
+    TopicSubs = lists:map(fun({Topic, Qos}) ->
+                    {#mqtt_topic{topic = Topic, node = node()},
+                     #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}
+                end, Topics),
+    F = fun() ->
+            lists:map(fun add_subscriber/1, TopicSubs) 
+        end,
+    case mnesia:transaction(F) of
+        {atomic, _Result} ->
+            setstats(all),
+            NewState = monitor_subscriber(SubPid, State),
+            %% grant all qos
+            {reply, {ok, [Qos || {_Topic, Qos} <- Topics]}, NewState};
+        {aborted, Error} ->
+            {reply, {error, Error}, State}
+    end;
+
+handle_call({subscribe, SubPid, Topic, Qos}, _From, State) ->
+    TopicR = #mqtt_topic{topic = Topic, node = node()},
+    Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid},
+    case mnesia:transaction(fun add_subscriber/1, [{TopicR, Subscriber}]) of
+        {atomic, ok} -> 
+            setstats(all),
+            {reply, {ok, Qos}, monitor_subscriber(SubPid, State)};
+        {aborted, Error} -> 
+            {reply, {error, Error}, State}
+    end;
 
 handle_call(Req, _From, State) ->
     lager:error("Bad Request: ~p", [Req]),
 	{reply, {error, badreq}, State}.
 
-handle_cast(Msg, State) ->
-    lager:error("Bad Msg: ~p", [Msg]),
-	{noreply, State}.
-
-handle_info({mnesia_table_event, {write, #mqtt_subscriber{subpid = Pid}, _ActivityId}},
-            State = #state{submap = SubMap}) ->
-    NewSubMap =
-    case maps:is_key(Pid, SubMap) of
-        false ->
-            maps:put(Pid, erlang:monitor(process, Pid), SubMap);
-        true ->
-            SubMap
+handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
+    TopicSubs = lists:map(fun(Topic) ->
+                    {#mqtt_topic{topic = Topic, node = node()},
+                     #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid}}
+                end, Topics),
+    F = fun() -> lists:foreach(fun remove_subscriber/1, TopicSubs) end,
+    case mnesia:transaction(F) of
+        {atomic, _} -> ok;
+        {aborted, Error} -> lager:error("unsubscribe ~p error: ~p", [Topics, Error])
     end,
-    setstats(subscribers),
-    {noreply, State#state{submap = NewSubMap}};
-
-handle_info({mnesia_table_event, {write, #mqtt_topic{}, _ActivityId}}, State) ->
-    %%TODO: this is not right when clusterd.
-    setstats(topics),
+    setstats(all),
     {noreply, State};
 
-%% {write, #topic{}, _ActivityId}
-%% {delete_object, _OldRecord, _ActivityId}
-%% {delete, {Tab, Key}, ActivityId}
-handle_info({mnesia_table_event, _Event}, State) ->
-    setstats(topics),
-    setstats(subscribers),
+handle_cast({unsubscribe, SubPid, Topic}, State) ->
+    TopicR = #mqtt_topic{topic = Topic, node = node()},
+    Subscriber = #mqtt_subscriber{topic = Topic, _ = '_', pid = SubPid},
+    case mnesia:transaction(fun remove_subscriber/1, [{TopicR, Subscriber}]) of
+        {atomic, _} -> ok;
+        {aborted, Error} -> lager:error("unsubscribe ~s error: ~p", [Topic, Error])
+    end,
+    setstats(all),
     {noreply, State};
 
+handle_cast(Msg, State) ->
+    lager:error("Bad Msg: ~p", [Msg]),
+	{noreply, State}.
+
 handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) ->
     case maps:is_key(DownPid, SubMap) of
         true ->
             Node = node(),
             F = fun() -> 
-                    Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid),
+                    Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.pid),
                     lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) ->
                                 mnesia:delete_object(subscriber, Sub, write),
                                 try_remove_topic(#mqtt_topic{topic = Topic, node = Node})
                         end, Subscribers)
             end,
-            NewState =
             case catch mnesia:transaction(F) of
-                {atomic, _} ->
-                    State#state{submap = maps:remove(DownPid, SubMap)};
+                {atomic, _} -> ok;
                 {aborted, Reason} ->
-                    lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]),
-                    State
+                    lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason])
             end,
-            setstats(topics), setstats(subscribers),
-            {noreply, NewState};
+            setstats(all),
+            {noreply, State#state{submap = maps:remove(DownPid, SubMap)}};
         false ->
             lager:error("Unexpected 'DOWN' from ~p", [DownPid]),
             {noreply, State}
@@ -296,10 +277,13 @@ handle_info(Info, State) ->
 	{noreply, State}.
 
 terminate(_Reason, _State) ->
-    mnesia:unsubscribe({table, topic, simple}),
-    mnesia:unsubscribe({table, subscriber, simple}),
-    %%TODO: clear topics belongs to this node???
-    ok.
+    TopicR = #mqtt_topic{_ = '_', node = node()},
+    F = fun() ->
+            [mnesia:delete_object(topic, R, write) || R <- mnesia:match_object(topic, TopicR, write)]
+            %%TODO: remove trie??
+        end,
+    mnesia:transaction(F),
+    setstats(all).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -307,25 +291,45 @@ code_change(_OldVsn, State, _Extra) ->
 %%%=============================================================================
 %%% Internal functions
 %%%=============================================================================
-insert_topic(Record = #mqtt_topic{topic = Topic}) ->
+
+add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
     case mnesia:wread({topic, Topic}) of
         [] ->
             ok = emqttd_trie:insert(Topic),
-            mnesia:write(topic, Record, write);
+            mnesia:write(topic, TopicR, write);
         Records ->
-            case lists:member(Record, Records) of
+            case lists:member(TopicR, Records) of
                 true -> ok;
-                false -> mnesia:write(topic, Record, write)
+                false -> mnesia:write(topic, TopicR, write)
             end
     end.
 
-insert_subscriber(Subscriber) ->
-    mnesia:write(subscriber, Subscriber, write).
+add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
+    case add_topic(TopicR) of
+        ok ->
+            mnesia:write(subscriber, Subscriber, write);
+        Error -> 
+            Error
+    end.
 
-try_remove_topic(Record = #mqtt_topic{topic = Topic}) ->
+monitor_subscriber(SubPid, State = #state{submap = SubMap}) ->
+    NewSubMap = case maps:is_key(SubPid, SubMap) of
+        false ->
+            maps:put(SubPid, erlang:monitor(process, SubPid), SubMap);
+        true ->
+            SubMap
+    end,
+    State#state{submap = NewSubMap}.
+
+remove_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
+    [mnesia:delete_object(subscriber, Sub, write) || 
+        Sub <- mnesia:match_object(subscriber, Subscriber, write)],
+    try_remove_topic(TopicR).
+
+try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
     case mnesia:read({subscriber, Topic}) of
         [] ->
-            mnesia:delete_object(topic, Record, write),
+            mnesia:delete_object(topic, TopicR, write),
             case mnesia:read(topic, Topic) of
                 [] -> emqttd_trie:delete(Topic);		
                 _ -> ok
@@ -334,13 +338,23 @@ try_remove_topic(Record = #mqtt_topic{topic = Topic}) ->
             ok
  	end.
 
+%%%=============================================================================
+%%% Stats functions
+%%%=============================================================================
+setstats(all) ->
+    setstats(topics),
+    setstats(subscribers);
 setstats(topics) ->
-    emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size));
-
+    emqttd_broker:setstat('topics/count',
+                          mnesia:table_info(topic, size));
 setstats(subscribers) ->
     emqttd_broker:setstats('subscribers/count',
                            'subscribers/max',
-                           mnesia:table_info(subscriber, size));
-setstats(dropped) ->
+                           mnesia:table_info(subscriber, size)).
+
+setstats(dropped, false) ->
+    ignore;
+setstats(dropped, true) ->
     emqttd_metrics:inc('messages/dropped').
 
+

+ 56 - 0
apps/emqttd/src/emqttd_pubsub_sup.erl

@@ -0,0 +1,56 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd pubsub supervisor.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_pubsub_sup).
+
+-author('feng@emqtt.io').
+
+-include("emqttd.hrl").
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/1]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+start_link(Opts) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]).
+
+init([Opts]) ->
+    Schedulers = erlang:system_info(schedulers),
+    PoolSize = proplists:get_value(pool_size, Opts, Schedulers),
+    gproc_pool:new(pubsub, hash, [{size, PoolSize}]),
+    Children = lists:map(
+                 fun(I) ->
+                    Name = {emqttd_pubsub, I},
+                    gproc_pool:add_worker(pubsub, Name, I),
+                    {Name, {emqttd_pubsub, start_link, [I, Opts]},
+                        permanent, 5000, worker, [emqttd_pubsub]}
+                 end, lists:seq(1, PoolSize)),
+    {ok, {{one_for_all, 10, 100}, Children}}.
+

+ 0 - 94
apps/emqttd/src/emqttd_router.erl

@@ -1,94 +0,0 @@
-%%-----------------------------------------------------------------------------
-%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
-%% 
-%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%% of this software and associated documentation files (the "Software"), to deal
-%% in the Software without restriction, including without limitation the rights
-%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the Software is
-%% furnished to do so, subject to the following conditions:
-%% 
-%% The above copyright notice and this permission notice shall be included in all
-%% copies or substantial portions of the Software.
-%% 
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%% SOFTWARE.
-%%------------------------------------------------------------------------------
-
-%%TODO: route chain... statistics
--module(emqttd_router).
-
--include_lib("emqtt/include/emqtt.hrl").
-
--behaviour(gen_server).
-
--define(SERVER, ?MODULE).
-
-%% API Function Exports
--export([start_link/0]).
-
-%%Router Chain--> --->In Out<---
--export([route/2]).
-
-%% gen_server Function Exports
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
--record(state, {}).
-
-%%%=============================================================================
-%%% API
-%%%=============================================================================
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Start emqttd router.
-%%
-%% @end
-%%------------------------------------------------------------------------------
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%%------------------------------------------------------------------------------
-%% @doc
-%% Route mqtt message. From is clienid or module.
-%%
-%% @end
-%%------------------------------------------------------------------------------
--spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok.
-route(From, Msg) ->
-    lager:info("Route ~s from ~s", [emqtt_message:format(Msg), From]),
-    emqttd_msg_store:retain(Msg),
-	emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)).
-
-%%%=============================================================================
-%%% gen_server callbacks
-%%%=============================================================================
-init([]) ->
-    {ok, #state{}, hibernate}.
-
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-	
-%%%=============================================================================
-%%% Internal functions
-%%%=============================================================================
-

+ 3 - 3
apps/emqttd/src/emqttd_session.erl

@@ -103,10 +103,10 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
 %%------------------------------------------------------------------------------
 -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session().
 publish(Session, ClientId, {?QOS_0, Message}) ->
-    emqttd_router:route(ClientId, Message), Session;
+    emqttd_pubsub:publish(ClientId, Message), Session;
 
 publish(Session, ClientId, {?QOS_1, Message}) ->
-	emqttd_router:route(ClientId, Message), Session;
+	emqttd_pubsub:publish(ClientId, Message), Session;
 
 publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId,
         {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
@@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
 puback(SessState = #session_state{clientid = ClientId,
                                   awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
     case maps:find(PacketId, Awaiting) of
-        {ok, Msg} -> emqttd_router:route(ClientId, Msg);
+        {ok, Msg} -> emqttd_pubsub:publish(ClientId, Msg);
         error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId])
     end,
     SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)};

+ 6 - 4
apps/emqttd/src/emqttd_sm.erl

@@ -44,7 +44,7 @@
 
 -define(SERVER, ?MODULE).
 
--define(SESSION_TABLE, mqtt_session).
+-define(SESSION_TAB, mqtt_session).
 
 %% API Function Exports
 -export([start_link/0]).
@@ -72,7 +72,7 @@ start_link() ->
 %%------------------------------------------------------------------------------
 -spec lookup_session(binary()) -> pid() | undefined.
 lookup_session(ClientId) ->
-    case ets:lookup(?SESSION_TABLE, ClientId) of
+    case ets:lookup(?SESSION_TAB, ClientId) of
         [{_, SessPid, _}] ->  SessPid;
         [] -> undefined
     end.
@@ -103,7 +103,7 @@ destroy_session(ClientId) ->
 
 init([]) ->
     process_flag(trap_exit, true),
-    TabId = ets:new(?SESSION_TABLE, [set, protected, named_table]),
+    TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
     {ok, #state{tab = TabId}}.
 
 handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) ->
@@ -157,8 +157,10 @@ code_change(_OldVsn, State, _Extra) ->
 %%%=============================================================================
 %%% Internal functions
 %%%=============================================================================
+
 setstats(State) ->
     emqttd_broker:setstats('sessions/count',
                            'sessions/max',
-                           ets:info(?SESSION_TABLE, size)), State.
+                           ets:info(?SESSION_TAB, size)), State.
+
 

+ 3 - 1
apps/emqttd/src/emqttd_sysmon.erl

@@ -55,7 +55,9 @@ start_link() ->
 %%%=============================================================================
 
 init([]) ->
-    erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]),
+    erlang:system_monitor(self(), [{long_gc, 5000},
+                                   {large_heap, 8 * 1024 * 1024},
+                                   busy_port]),
     {ok, #state{}}.
 
 handle_call(Request, _From, State) ->

+ 15 - 1
apps/emqttd/src/emqttd_utils.erl

@@ -40,7 +40,7 @@ all_module_attributes(Name) ->
         lists:usort(
           lists:append(
             [[{App, Module} || Module <- Modules] ||
-                {App, _, _}   <- application:loaded_applications(),
+                {App, _, _}   <- ignore_lib_apps(application:loaded_applications()),
                 {ok, Modules} <- [application:get_key(App, modules)]])),
     lists:foldl(
       fun ({App, Module}, Acc) ->
@@ -62,3 +62,17 @@ module_attributes(Module) ->
             V
     end.
 
+ignore_lib_apps(Apps) ->
+    LibApps = [kernel, stdlib, sasl,
+               syntax_tools, ssl, crypto,
+               mnesia, os_mon, inets,
+               goldrush, lager, gproc,
+               runtime_tools, snmp, otp_mibs,
+               public_key, asn1, ssh,
+               common_test, observer, webtool,
+               xmerl, tools, test_server,
+               compiler, debugger, eunit,
+               et, gen_logger, wx,
+               hipe, esockd, mochiweb],
+    [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].
+

+ 1 - 0
doc/pubsub.md

@@ -14,6 +14,7 @@ PubQos | SubQos | In Message | Out Message
    2   |   1    |   -        | - 
    2   |   2    |   -        | - 
 
+
 ## Publish
 
 

+ 19 - 0
doc/route.md

@@ -0,0 +1,19 @@
+
+
+ClientA -> SessionA -> Route -> PubSub -> SessionB -> ClientB
+
+
+ClientA -> Session -> PubSub -> Route -> SessionB -> ClientB
+                        |        |
+                      Trie    Subscriber                 
+
+
+ClientPidA -> ClientPidB
+
+
+ClientPidA -> SessionPidB -> ClientB
+
+
+ClientPidA -> SessionPidA -> SessionPidB -> ClientPidB
+
+

+ 51 - 0
plugins/emqttd_plugin_demo/src/emqttd_demo_acl.erl

@@ -0,0 +1,51 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd demo acl module.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_demo_acl).
+
+-author('feng@emqtt.io').
+
+-include_lib("emqttd/include/emqttd.hrl").
+
+-behaviour(emqttd_acl).
+
+-export([check_acl/3, reload_acl/0, description/0]).
+
+-spec check_acl(User, PubSub, Topic) -> {ok, allow | deny} | ignore | {error, any()} when
+    User     :: mqtt_user(),
+    PubSub   :: publish | subscribe,
+    Topic    :: binary().
+check_acl(_User, _PubSub, _Topic) ->
+    ignore.
+
+reload_acl() ->
+    ok.
+
+description() ->
+    "Demo ACL Module".
+
+
+

+ 41 - 0
plugins/emqttd_plugin_demo/src/emqttd_demo_auth.erl

@@ -0,0 +1,41 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd demo auth module.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_demo_auth).
+
+-author('feng@emqtt.io').
+
+-include_lib("emqttd/include/emqttd.hrl").
+
+-behaviour(emqttd_auth).
+
+%% callbacks...
+-export([check_login/2]).
+
+-spec check_login(mqtt_user(), undefined | binary()) -> true | false | ignore.
+check_login(User, Password) ->
+    true.
+

+ 12 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo.app.src

@@ -0,0 +1,12 @@
+{application, emqttd_plugin_demo,
+ [
+  {description, ""},
+  {vsn, "1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { emqttd_plugin_demo_app, []}},
+  {env, []}
+ ]}.

+ 16 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_app.erl

@@ -0,0 +1,16 @@
+-module(emqttd_plugin_demo_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    emqttd_plugin_demo_sup:start_link().
+
+stop(_State) ->
+    ok.

+ 27 - 0
plugins/emqttd_plugin_demo/src/emqttd_plugin_demo_sup.erl

@@ -0,0 +1,27 @@
+-module(emqttd_plugin_demo_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, []} }.
+

+ 2 - 1
rebar.config

@@ -18,11 +18,12 @@
 {validate_app_modules, true}.
 
 {sub_dirs, [
-    "rel", 
+    "rel",
     "apps/emqtt",
     "apps/emqttd"]}.
 
 {deps, [
+	{gproc, "0.3.*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}},
 	{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
 	{esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
 	{mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}}

+ 1 - 1
rel/files/acl.config

@@ -20,7 +20,7 @@
 
 {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
 
-{deny, all, subscribe, ["$SYS/#", "#"]}.
+{deny, all, subscribe, [{eq, "$SYS/#"}, {eq, "#"}]}.
 
 {allow, all}.
 

+ 2 - 0
rel/files/app.config

@@ -72,6 +72,8 @@
         {max_message_num, 100000},
         {max_playload_size, 16#ffff}
     ]},
+    %% PubSub
+    {pubsub, []},
     %% Broker
     {broker, [
         {sys_interval, 60}

+ 1 - 1
rel/files/vm.args

@@ -24,5 +24,5 @@
 #-env ERL_MAX_ETS_TABLES 1024
 
 ## Tweak GC to run more often
-##-env ERL_FULLSWEEP_AFTER 10
+##-env ERL_FULLSWEEP_AFTER 1000
 #

+ 2 - 0
rel/reltool.config

@@ -15,6 +15,7 @@
 		 inets,
 		 goldrush,
 		 lager,
+         gproc,
 		 esockd,
 		 mochiweb,
          emqttd
@@ -45,6 +46,7 @@
        {app, inets, [{mod_cond, app},{incl_cond, include}]},
        {app, goldrush, [{mod_cond, app}, {incl_cond, include}]},
        {app, lager, [{mod_cond, app}, {incl_cond, include}]},
+       {app, gproc, [{mod_cond, app}, {incl_cond, include}]},
        {app, esockd, [{mod_cond, app}, {incl_cond, include}]},
        {app, mochiweb, [{mod_cond, app}, {incl_cond, include}]},
        {app, emqtt, [{mod_cond, app}, {incl_cond, include}]},