Prechádzať zdrojové kódy

start_listeners, stop_listeners

Feng 10 rokov pred
rodič
commit
67f1e15d4a
1 zmenil súbory, kde vykonal 90 pridanie a 9 odobranie
  1. 90 9
      src/emqttd_app.erl

+ 90 - 9
src/emqttd_app.erl

@@ -16,18 +16,30 @@
 
 
 -module(emqttd_app).
 -module(emqttd_app).
 
 
--include("emqttd_cli.hrl").
-
 -behaviour(application).
 -behaviour(application).
 
 
+-include("emqttd_cli.hrl").
+
 %% Application callbacks
 %% Application callbacks
 -export([start/2, stop/1]).
 -export([start/2, stop/1]).
 
 
+-export([start_listener/1, stop_listener/1, is_mod_enabled/1]).
+
+%% MQTT SockOpts
+-define(MQTT_SOCKOPTS, [
+        binary,
+        {packet,    raw},
+        {reuseaddr, true},
+        {backlog,   512},
+        {nodelay,   true}]).
+
+-type listener() :: {atom(), inet:port_number(), [esockd:option()]}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Application callbacks
 %% Application callbacks
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
--spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when 
+-spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when
     StartType :: normal | {takeover, node()} | {failover, node()},
     StartType :: normal | {takeover, node()} | {failover, node()},
     StartArgs :: term(),
     StartArgs :: term(),
     State     :: term(),
     State     :: term(),
@@ -38,13 +50,21 @@ start(_StartType, _StartArgs) ->
     {ok, Sup} = emqttd_sup:start_link(),
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
     start_servers(Sup),
     emqttd_cli:load(),
     emqttd_cli:load(),
-    emqttd:load_all_mods(),
+    load_all_mods(),
     emqttd_plugins:load(),
     emqttd_plugins:load(),
-    emqttd:start_listeners(),
+    start_listeners(),
     register(emqttd, self()),
     register(emqttd, self()),
     print_vsn(),
     print_vsn(),
     {ok, Sup}.
     {ok, Sup}.
 
 
+-spec stop(State :: term()) -> term().
+stop(_State) ->
+    catch stop_listeners().
+
+%%--------------------------------------------------------------------
+%% Print Banner
+%%--------------------------------------------------------------------
+
 print_banner() ->
 print_banner() ->
     ?PRINT("starting emqttd on node '~s'~n", [node()]).
     ?PRINT("starting emqttd on node '~s'~n", [node()]).
 
 
@@ -53,14 +73,18 @@ print_vsn() ->
     {ok, Desc} = application:get_key(description),
     {ok, Desc} = application:get_key(description),
     ?PRINT("~s ~s is running now~n", [Desc, Vsn]).
     ?PRINT("~s ~s is running now~n", [Desc, Vsn]).
 
 
+%%--------------------------------------------------------------------
+%% Start Servers
+%%--------------------------------------------------------------------
+
 start_servers(Sup) ->
 start_servers(Sup) ->
     Servers = [{"emqttd ctl", emqttd_ctl},
     Servers = [{"emqttd ctl", emqttd_ctl},
-               {"emqttd trace", {supervisor, emqttd_trace_sup}},
                {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
                {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
                {"emqttd stats", emqttd_stats},
                {"emqttd stats", emqttd_stats},
                {"emqttd metrics", emqttd_metrics},
                {"emqttd metrics", emqttd_metrics},
                {"emqttd retainer", emqttd_retainer},
                {"emqttd retainer", emqttd_retainer},
                {"emqttd pooler", {supervisor, emqttd_pooler}},
                {"emqttd pooler", {supervisor, emqttd_pooler}},
+               {"emqttd trace", {supervisor, emqttd_trace_sup}},
                {"emqttd client manager", {supervisor, emqttd_cm_sup}},
                {"emqttd client manager", {supervisor, emqttd_cm_sup}},
                {"emqttd session manager", {supervisor, emqttd_sm_sup}},
                {"emqttd session manager", {supervisor, emqttd_sm_sup}},
                {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
                {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
@@ -117,7 +141,64 @@ worker_spec(Module, Opts) when is_atom(Module) ->
 worker_spec(M, F, A) ->
 worker_spec(M, F, A) ->
     {M, {M, F, A}, permanent, 10000, worker, [M]}.
     {M, {M, F, A}, permanent, 10000, worker, [M]}.
 
 
--spec stop(State :: term()) -> term().
-stop(_State) ->
-    catch emqttd:stop_listeners().
+%%--------------------------------------------------------------------
+%% Load Modules
+%%--------------------------------------------------------------------
+
+%% @doc load all modules
+load_all_mods() ->
+    lists:foreach(fun load_mod/1, emqttd:env(modules)).
+
+load_mod({Name, Opts}) ->
+    Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
+    case catch Mod:load(Opts) of
+        ok               -> lager:info("Load module ~s successfully", [Name]);
+        {error, Error}   -> lager:error("Load module ~s error: ~p", [Name, Error]);
+        {'EXIT', Reason} -> lager:error("Load module ~s error: ~p", [Name, Reason])
+    end.
+
+%% @doc Is module enabled?
+-spec is_mod_enabled(Name :: atom()) -> boolean().
+is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined.
+
+%%--------------------------------------------------------------------
+%% Start Listeners
+%%--------------------------------------------------------------------
+
+%% @doc Start Listeners of the broker.
+-spec start_listeners() -> any().
+start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)).
+
+%% Start mqtt listener
+-spec start_listener(listener()) -> any().
+start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts);
+
+%% Start mqtt(SSL) listener
+start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts);
+
+%% Start http listener
+start_listener({http, Port, Opts}) ->
+    mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []});
+
+%% Start https listener
+start_listener({https, Port, Opts}) ->
+    mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}).
+
+start_listener(Protocol, Port, Opts) ->
+    MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]},
+    esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs).
+
+merge_sockopts(Options) ->
+    SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,
+                                 proplists:get_value(sockopts, Options, [])),
+    emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
+
+%%--------------------------------------------------------------------
+%% Stop Listeners
+%%--------------------------------------------------------------------
+
+%% @doc Stop Listeners
+stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)).
+
+stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}).