Feng Lee 9 лет назад
Родитель
Сommit
bc5cfb4b36
4 измененных файлов с 21 добавлено и 49 удалено
  1. 11 0
      src/emqttd.erl
  2. 1 1
      src/emqttd_app.erl
  3. 8 39
      src/emqttd_bridge_sup.erl
  4. 1 9
      src/emqttd_pubsub_sup.erl

+ 11 - 0
src/emqttd.erl

@@ -35,6 +35,9 @@
 %% Hooks API
 -export([hook/4, hook/3, unhook/2, run_hooks/3]).
 
+%% Adapter
+-export([adapter/1]).
+
 %% Debug API
 -export([dump/0]).
 
@@ -157,6 +160,14 @@ unhook(Hook, Function) ->
 run_hooks(Hook, Args, Acc) ->
     emqttd_hook:run(Hook, Args, Acc).
 
+%%--------------------------------------------------------------------
+%% Adapter
+%%--------------------------------------------------------------------
+
+adapter(server) -> env(pubsub_server,  emqttd_server);
+adapter(pubsub) -> env(pubsub_adapter, emqttd_pubsub);
+adapter(bridge) -> env(bridge_adapter, emqttd_bridge).
+
 %%--------------------------------------------------------------------
 %% Debug
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqttd_app.erl

@@ -91,7 +91,7 @@ start_servers(Sup) ->
                {"emqttd broker", emqttd_broker},
                {"emqttd alarm", emqttd_alarm},
                {"emqttd mod supervisor", emqttd_mod_sup},
-               {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
+               {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup_sup}},
                {"emqttd access control", emqttd_access_control},
                {"emqttd system monitor", {supervisor, emqttd_sysmon_sup}}],
     [start_server(Sup, Server) || Server <- Servers].

+ 8 - 39
src/emqttd_bridge_sup.erl

@@ -18,56 +18,25 @@
 
 -behavior(supervisor).
 
--export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]).
+-export([start_link/3]).
 
 -export([init/1]).
 
--define(BRIDGE_ID(Node, Topic), {bridge, Node, Topic}).
-
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
 %% @doc Start bridge supervisor
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-%% @doc List all bridges
--spec(bridges() -> [{tuple(), pid()}]).
-bridges() ->
-    [{{Node, Topic}, Pid} || {?BRIDGE_ID(Node, Topic), Pid, worker, _}
-                             <- supervisor:which_children(?MODULE)].
-
-%% @doc Start a bridge
--spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}).
-start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
-    start_bridge(Node, Topic, []).
-
--spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
-start_bridge(Node, _Topic, _Options) when Node =:= node() ->
-    {error, bridge_to_self};
-start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
-    Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
-    supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
-
-%% @doc Stop a bridge
--spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok).
-stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
-    ChildId = ?BRIDGE_ID(Node, Topic),
-    case supervisor:terminate_child(?MODULE, ChildId) of
-        ok    -> supervisor:delete_child(?MODULE, ChildId);
-        Error -> Error
-    end.
+-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
+start_link(Node, Topic, Options) ->
+    supervisor:start_link(?MODULE, [Node, Topic, Options]).
 
 %%--------------------------------------------------------------------
 %% Supervisor callbacks
 %%--------------------------------------------------------------------
 
-init([]) ->
-    {ok, {{one_for_one, 10, 100}, []}}.
-
-bridge_spec(Node, Topic, Options) ->
-    ChildId = ?BRIDGE_ID(Node, Topic),
-    {ChildId, {emqttd_bridge, start_link, [Node, Topic, Options]},
-        transient, 10000, worker, [emqttd_bridge]}.
+init([Node, Topic, Options]) ->
+    {ok, {{one_for_all, 10, 100},
+          [{bridge, {emqttd_bridge, start_link, [Node, Topic, Options]},
+            transient, 10000, worker, [emqttd_bridge]}]}}.
 

+ 1 - 9
src/emqttd_pubsub_sup.erl

@@ -57,17 +57,9 @@ pool_size(Env) ->
 
 pool_sup(Name, Env) ->
     Pool = list_to_atom(atom_to_list(Name) ++ "_pool"),
-    MFA = {adapter(Name), start_link, [Env]},
+    MFA = {emqttd:adapter(Name), start_link, [Env]},
     emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]).
 
-%%--------------------------------------------------------------------
-%% Adapter
-%%--------------------------------------------------------------------
-
-adapter(server) ->
-    emqttd:env(pubsub_server, emqttd_server);
-adapter(pubsub) ->
-    emqttd:env(pubsub_adapter, emqttd_pubsub).
 
 %%--------------------------------------------------------------------
 %% Create PubSub Tables