Feng 9 anos atrás
pai
commit
2d81464fc0

+ 11 - 0
src/emqttd.erl

@@ -35,6 +35,9 @@
 %% Hooks API
 -export([hook/4, hook/3, unhook/2, run_hooks/3]).
 
+%% Adapter
+-export([adapter/1]).
+
 %% Debug API
 -export([dump/0]).
 
@@ -157,6 +160,14 @@ unhook(Hook, Function) ->
 run_hooks(Hook, Args, Acc) ->
     emqttd_hook:run(Hook, Args, Acc).
 
+%%--------------------------------------------------------------------
+%% Adapter
+%%--------------------------------------------------------------------
+
+adapter(server) -> env(pubsub_server,  emqttd_server);
+adapter(pubsub) -> env(pubsub_adapter, emqttd_pubsub);
+adapter(bridge) -> env(bridge_adapter, emqttd_bridge).
+
 %%--------------------------------------------------------------------
 %% Debug
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqttd_app.erl

@@ -91,7 +91,7 @@ start_servers(Sup) ->
                {"emqttd broker", emqttd_broker},
                {"emqttd alarm", emqttd_alarm},
                {"emqttd mod supervisor", emqttd_mod_sup},
-               {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
+               {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup_sup}},
                {"emqttd access control", emqttd_access_control},
                {"emqttd system monitor", {supervisor, emqttd_sysmon_sup}}],
     [start_server(Sup, Server) || Server <- Servers].

+ 8 - 39
src/emqttd_bridge_sup.erl

@@ -18,56 +18,25 @@
 
 -behavior(supervisor).
 
--export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]).
+-export([start_link/3]).
 
 -export([init/1]).
 
--define(BRIDGE_ID(Node, Topic), {bridge, Node, Topic}).
-
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
 %% @doc Start bridge supervisor
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-%% @doc List all bridges
--spec(bridges() -> [{tuple(), pid()}]).
-bridges() ->
-    [{{Node, Topic}, Pid} || {?BRIDGE_ID(Node, Topic), Pid, worker, _}
-                             <- supervisor:which_children(?MODULE)].
-
-%% @doc Start a bridge
--spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}).
-start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
-    start_bridge(Node, Topic, []).
-
--spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
-start_bridge(Node, _Topic, _Options) when Node =:= node() ->
-    {error, bridge_to_self};
-start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
-    Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
-    supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
-
-%% @doc Stop a bridge
--spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok).
-stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
-    ChildId = ?BRIDGE_ID(Node, Topic),
-    case supervisor:terminate_child(?MODULE, ChildId) of
-        ok    -> supervisor:delete_child(?MODULE, ChildId);
-        Error -> Error
-    end.
+-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
+start_link(Node, Topic, Options) ->
+    supervisor:start_link(?MODULE, [Node, Topic, Options]).
 
 %%--------------------------------------------------------------------
 %% Supervisor callbacks
 %%--------------------------------------------------------------------
 
-init([]) ->
-    {ok, {{one_for_one, 10, 100}, []}}.
-
-bridge_spec(Node, Topic, Options) ->
-    ChildId = ?BRIDGE_ID(Node, Topic),
-    {ChildId, {emqttd_bridge, start_link, [Node, Topic, Options]},
-        transient, 10000, worker, [emqttd_bridge]}.
+init([Node, Topic, Options]) ->
+    {ok, {{one_for_all, 10, 100},
+          [{bridge, {emqttd_bridge, start_link, [Node, Topic, Options]},
+            transient, 10000, worker, [emqttd_bridge]}]}}.
 

+ 76 - 0
src/emqttd_bridge_sup_sup.erl

@@ -0,0 +1,76 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_bridge_sup_sup).
+
+-behavior(supervisor).
+
+-export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]).
+
+-export([init/1]).
+
+-define(CHILD_ID(Node, Topic), {bridge_sup, Node, Topic}).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% @doc List all bridges
+-spec(bridges() -> [{node(), binary(), pid()}]).
+bridges() ->
+    [{Node, Topic, Pid} || {?CHILD_ID(Node, Topic), Pid, supervisor, _}
+                             <- supervisor:which_children(?MODULE)].
+
+%% @doc Start a bridge
+-spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}).
+start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
+    start_bridge(Node, Topic, []).
+
+-spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
+start_bridge(Node, _Topic, _Options) when Node =:= node() ->
+    {error, bridge_to_self};
+start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
+    Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
+    supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
+
+%% @doc Stop a bridge
+-spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok).
+stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
+    ChildId = ?CHILD_ID(Node, Topic),
+    case supervisor:terminate_child(?MODULE, ChildId) of
+        ok    -> supervisor:delete_child(?MODULE, ChildId);
+        Error -> Error
+    end.
+
+%%--------------------------------------------------------------------
+%% Supervisor callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    {ok, {{one_for_one, 10, 100}, []}}.
+
+bridge_spec(Node, Topic, Options) ->
+    SupMod = sup_mod(emqttd:adapter(bridge)),
+    {?CHILD_ID(Node, Topic),
+      {SupMod, start_link, [Node, Topic, Options]},
+        permanent, infinity, supervisor, [SupMod]}.
+
+sup_mod(Adaper) ->
+    list_to_atom(atom_to_list(Adaper) ++ "_sup").
+

+ 43 - 34
src/emqttd_cli.erl

@@ -217,41 +217,45 @@ topics(_) ->
             {"topics show <Topic>", "Show a topic"}]).
 
 subscriptions(["list"]) ->
-    lists:foreach(fun({Sub, Topic, Opts}) when is_pid(Sub) ->
-                ?PRINT("~p -> ~s: ~p~n", [Sub, Topic, Opts]);
-                 ({Sub, Topic, Opts}) ->
-                ?PRINT("~s -> ~s: ~p~n", [Sub, Topic, Opts])
-        end, emqttd:subscriptions());
+    lists:foreach(fun(Subscription) ->
+                      print(subscription, Subscription)
+                  end, []); %%emqttd:subscriptions());
 
 subscriptions(["show", ClientId]) ->
-    case ets:dirty_read(mqtt_subscription, bin(ClientId)) of
+    case ets:lookup(mqtt_subscription, bin(ClientId)) of
         []      -> ?PRINT_MSG("Not Found.~n");
-        Records -> print(Records)
+        Records -> [print(subscription, Subscription) || Subscription <- Records]
     end;
 
-subscriptions(["add", ClientId, Topic, QoS]) ->
-    Add = fun(IntQos) ->
-            Subscription = #mqtt_subscription{subid = bin(ClientId),
-                                              topic = bin(Topic),
-                                              qos   = IntQos},
-            case emqttd_backend:add_subscription(Subscription) of
-                ok ->
-                    ?PRINT_MSG("ok~n");
-                {error, already_existed} ->
-                    ?PRINT_MSG("Error: already existed~n");
-                {error, Reason} ->
-                    ?PRINT("Error: ~p~n", [Reason])
-            end
-          end,
-    if_valid_qos(QoS, Add);
-
-subscriptions(["del", ClientId]) ->
-    Ok = emqttd_backend:del_subscriptions(bin(ClientId)),
-    ?PRINT("~p~n", [Ok]);
-
-subscriptions(["del", ClientId, Topic]) ->
-    Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)),
-    ?PRINT("~p~n", [Ok]);
+%%
+%% subscriptions(["add", ClientId, Topic, QoS]) ->
+%%    Add = fun(IntQos) ->
+%%            Subscription = #mqtt_subscription{subid = bin(ClientId),
+%%                                              topic = bin(Topic),
+%%                                              qos   = IntQos},
+%%            case emqttd_backend:add_subscription(Subscription) of
+%%                ok ->
+%%                    ?PRINT_MSG("ok~n");
+%%                {error, already_existed} ->
+%%                    ?PRINT_MSG("Error: already existed~n");
+%%                {error, Reason} ->
+%%                    ?PRINT("Error: ~p~n", [Reason])
+%%            end
+%%          end,
+%%    if_valid_qos(QoS, Add);
+%%
+
+%%
+%% subscriptions(["del", ClientId]) ->
+%%    Ok = emqttd_backend:del_subscriptions(bin(ClientId)),
+%%    ?PRINT("~p~n", [Ok]);
+%%
+
+%%
+%% subscriptions(["del", ClientId, Topic]) ->
+%%    Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)),
+%%    ?PRINT("~p~n", [Ok]);
+%%
 
 subscriptions(_) ->
     ?USAGE([{"subscriptions list",                         "List all subscriptions"},
@@ -306,7 +310,7 @@ plugins(_) ->
 bridges(["list"]) ->
     foreach(fun({{Node, Topic}, _Pid}) ->
                 ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node])
-            end, emqttd_bridge_sup:bridges());
+            end, emqttd_bridge_sup_sup:bridges());
 
 bridges(["options"]) ->
     ?PRINT_MSG("Options:~n"),
@@ -318,20 +322,20 @@ bridges(["options"]) ->
     ?PRINT_MSG("  qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
 
 bridges(["start", SNode, Topic]) ->
-    case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
+    case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
         {ok, _}        -> ?PRINT_MSG("bridge is started.~n");
         {error, Error} -> ?PRINT("error: ~p~n", [Error])
     end;
 
 bridges(["start", SNode, Topic, OptStr]) ->
     Opts = parse_opts(bridge, OptStr),
-    case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of
+    case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of
         {ok, _}        -> ?PRINT_MSG("bridge is started.~n");
         {error, Error} -> ?PRINT("error: ~p~n", [Error])
     end;
 
 bridges(["stop", SNode, Topic]) ->
-    case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
+    case emqttd_bridge_sup_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
         ok             -> ?PRINT_MSG("bridge is stopped.~n");
         {error, Error} -> ?PRINT("error: ~p~n", [Error])
     end;
@@ -526,6 +530,11 @@ print({ClientId, _ClientPid, CleanSess, SessInfo}) ->
            "created_at=~w)~n",
             [ClientId, CleanSess | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
 
+print(subscription, {Sub, Topic, Opts}) when is_pid(Sub) ->
+    ?PRINT("~p -> ~s: ~p~n", [Sub, Topic, Opts]);
+print(subscription, {Sub, Topic, Opts}) ->
+    ?PRINT("~s -> ~s: ~p~n", [Sub, Topic, Opts]).
+
 format(created_at, Val) ->
     emqttd_time:now_to_secs(Val);
 

+ 3 - 3
src/emqttd_pool_sup.erl

@@ -41,10 +41,10 @@ start_link(Pool, Type, MFA) ->
 
 -spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}).
 start_link(Pool, Type, Size, MFA) ->
-    supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]).
+    supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]).
 
-sup_name(Pool) when is_atom(Pool) ->
-    list_to_atom(atom_to_list(Pool) ++ "_pool_sup").
+%% sup_name(Pool) when is_atom(Pool) ->
+%%    list_to_atom(atom_to_list(Pool) ++ "_pool_sup").
 
 init([Pool, Type, Size, {M, F, Args}]) ->
     ensure_pool(Pool, Type, [{size, Size}]),

+ 4 - 8
src/emqttd_pubsub.erl

@@ -162,18 +162,14 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 add_subscriber_(Topic, Subscriber) ->
-    case ets:member(mqtt_subscriber, Topic) of
-        false -> emqttd_router:add_route(Topic, node());
-        true  -> ok
-    end,
+    (not ets:member(mqtt_subscriber, Topic))
+        andalso emqttd_router:add_route(Topic),
     ets:insert(mqtt_subscriber, {Topic, Subscriber}).
 
 del_subscriber_(Topic, Subscriber) ->
     ets:delete_object(mqtt_subscriber, {Topic, Subscriber}),
-    case ets:member(mqtt_subscriber, Topic) of
-        false -> emqttd_router:del_route(Topic, node());
-        true  -> ok
-    end.
+    (not ets:member(mqtt_subscriber, Topic))
+        andalso emqttd_router:del_route(Topic).
 
 setstats(State) ->
     emqttd_stats:setstats('subscribers/count', 'subscribers/max',

+ 1 - 9
src/emqttd_pubsub_sup.erl

@@ -57,17 +57,9 @@ pool_size(Env) ->
 
 pool_sup(Name, Env) ->
     Pool = list_to_atom(atom_to_list(Name) ++ "_pool"),
-    MFA = {adapter(Name), start_link, [Env]},
+    MFA = {emqttd:adapter(Name), start_link, [Env]},
     emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]).
 
-%%--------------------------------------------------------------------
-%% Adapter
-%%--------------------------------------------------------------------
-
-adapter(server) ->
-    emqttd:env(pubsub_server, emqttd_server);
-adapter(pubsub) ->
-    emqttd:env(pubsub_adapter, emqttd_pubsub).
 
 %%--------------------------------------------------------------------
 %% Create PubSub Tables

+ 11 - 11
src/emqttd_server.erl

@@ -172,13 +172,13 @@ init([Pool, Id, Env]) ->
     {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
 
 handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
-    case subscribe_(Topic, Subscriber, Options, State) of
+    case do_subscribe_(Topic, Subscriber, Options, State) of
         {ok, NewState} -> {reply, ok, setstats(NewState)};
         {error, Error} -> {reply, {error, Error}, State}
     end;
 
 handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
-    case unsubscribe_(Topic, Subscriber, State) of
+    case do_unsubscribe_(Topic, Subscriber, State) of
         {ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
         {error, Error} -> {reply, {error, Error}, State}
     end;
@@ -198,13 +198,13 @@ handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 
 handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
-    case subscribe_(Topic, Subscriber, Options, State) of
+    case do_subscribe_(Topic, Subscriber, Options, State) of
         {ok, NewState}  -> {noreply, setstats(NewState)};
         {error, _Error} -> {noreply, State}
     end;
 
 handle_cast({unsubscribe, Topic, Subscriber}, State) ->
-    case unsubscribe_(Topic, Subscriber, State) of
+    case do_unsubscribe_(Topic, Subscriber, State) of
         {ok, NewState}  -> {noreply, setstats(NewState), hibernate};
         {error, _Error} -> {noreply, State}
     end;
@@ -233,7 +233,7 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Functions
 %%--------------------------------------------------------------------
 
-subscribe_(Topic, Subscriber, Options, State) ->
+do_subscribe_(Topic, Subscriber, Options, State) ->
     case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
         [] ->
             emqttd_pubsub:async_subscribe(Topic, Subscriber),
@@ -244,7 +244,12 @@ subscribe_(Topic, Subscriber, Options, State) ->
             {error, {already_subscribed, Topic}}
     end.
 
-unsubscribe_(Topic, Subscriber, State) ->
+monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
+    State#state{submon = PMon:monitor(SubPid)};
+monitor_subpid(_SubPid, State) ->
+    State.
+
+do_unsubscribe_(Topic, Subscriber, State) ->
     case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
         [_] ->
             emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
@@ -258,11 +263,6 @@ unsubscribe_(Topic, Subscriber, State) ->
             {error, {subscription_not_found, Topic}}
     end.
 
-monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
-    State#state{submon = PMon:monitor(SubPid)};
-monitor_subpid(_SubPid, State) ->
-    State.
-
 demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
     State#state{submon = PMon:demonitor(SubPid)};
 demonitor_subpid(_SubPid, State) ->

+ 16 - 10
src/emqttd_session.erl

@@ -284,14 +284,18 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
 handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 
-handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
-                                                                 subscriptions = Subscriptions}) ->
-
+handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id     = ClientId,
+                                                                   subscriptions = Subscriptions}) ->
+    %% TODO: Ugly...
+    TopicTable0 = lists:map(fun({T, Q}) ->
+                                {T1, Opts} = emqttd_topic:strip(T),
+                                {T1, [{qos, Q} | Opts]}
+                            end, RawTopicTable),
     case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of
         {ok, TopicTable} ->
             ?LOG(info, "Subscribe ~p", [TopicTable], Session),
             Subscriptions1 = lists:foldl(
-                fun({Topic, Qos}, SubDict) ->
+                fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) ->
                     case dict:find(Topic, SubDict) of
                         {ok, Qos} ->
                             ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
@@ -301,7 +305,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     =
                             ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
                             dict:store(Topic, Qos, SubDict);
                         error ->
-                            emqttd:subscribe(Topic, ClientId, [{qos, Qos}]),
+                            emqttd:subscribe(Topic, ClientId, Opts),
                             %%TODO: the design is ugly...
                             %% <MQTT V3.1.1>: 3.8.4
                             %% Where the Topic Filter is not identical to any existing Subscription’s filter,
@@ -311,7 +315,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     =
                             dict:store(Topic, Qos, SubDict)
                     end
                 end, Subscriptions, TopicTable),
-            AckFun([Qos || {_, Qos} <- TopicTable]),
+            AckFun([Qos || {_, Qos} <- RawTopicTable]),
             emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable),
             hibernate(Session#session{subscriptions = Subscriptions1});
         {stop, TopicTable} ->
@@ -319,9 +323,11 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     =
             hibernate(Session)
     end;
 
-handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
-                                                       subscriptions = Subscriptions}) ->
-
+handle_cast({unsubscribe, RawTopics}, Session = #session{client_id     = ClientId,
+                                                         subscriptions = Subscriptions}) ->
+    Topics0 = lists:map(fun(Topic) ->
+                            {T, _Opts} = emqttd_topic:strip(Topic), T
+                        end, RawTopics),
     case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of
         {ok, Topics} ->
             ?LOG(info, "unsubscribe ~p", [Topics], Session),
@@ -329,7 +335,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
                 fun(Topic, SubDict) ->
                     case dict:find(Topic, SubDict) of
                         {ok, _Qos} ->
-                            emqttd:unsubscribe(ClientId, Topic),
+                            emqttd:unsubscribe(Topic, ClientId),
                             dict:erase(Topic, SubDict);
                         error ->
                             SubDict