Просмотр исходного кода

Merge pull request #1145 from emqtt/master

Merge emq22
turtleDeng 8 лет назад
Родитель
Сommit
523dd9c8b0
8 измененных файлов с 70 добавлено и 16 удалено
  1. 5 0
      Makefile
  2. 1 1
      src/emqttd.erl
  3. 16 1
      src/emqttd_app.erl
  4. 27 3
      src/emqttd_cli.erl
  5. 14 4
      src/emqttd_ctl.erl
  6. 3 3
      src/emqttd_metrics.erl
  7. 1 1
      src/emqttd_server.erl
  8. 3 3
      src/emqttd_session.erl

+ 5 - 0
Makefile

@@ -40,6 +40,11 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1
 
 COVER = true
 
+PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia
+DIALYZER_DIRS := ebin/
+DIALYZER_OPTS := --verbose --statistics -Werror_handling \
+                 -Wrace_conditions #-Wunmatched_returns
+
 include erlang.mk
 
 app:: rebar.config

+ 1 - 1
src/emqttd.erl

@@ -125,7 +125,7 @@ topics() -> emqttd_router:topics().
 subscribers(Topic) ->
     emqttd_server:subscribers(iolist_to_binary(Topic)).
 
--spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
+-spec(subscriptions(subscriber()) -> [{binary(), binary(), list(suboption())}]).
 subscriptions(Subscriber) ->
     emqttd_server:subscriptions(Subscriber).
 

+ 16 - 1
src/emqttd_app.erl

@@ -27,7 +27,7 @@
 %% Application callbacks
 -export([start/2, stop/1]).
 
--export([start_listener/1, stop_listener/1]).
+-export([start_listener/1, stop_listener/1, restart_listener/1]).
 
 -type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
 
@@ -214,6 +214,21 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == api ->
 stop_listener({Proto, ListenOn, _Opts}) ->
     esockd:close(Proto, ListenOn).
 
+%% @doc Restart Listeners
+restart_listener({tcp, ListenOn, _Opts}) ->
+    esockd:reopen('mqtt:tcp', ListenOn);
+restart_listener({ssl, ListenOn, _Opts}) ->
+    esockd:reopen('mqtt:ssl', ListenOn);
+restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
+    mochiweb:restart_http('mqtt:ws', ListenOn);
+restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
+    mochiweb:restart_http('mqtt:wss', ListenOn);
+restart_listener({Proto, ListenOn, _Opts}) when Proto == api ->
+    mochiweb:restart_http('mqtt:api', ListenOn);
+restart_listener({Proto, ListenOn, _Opts}) ->
+    esockd:reopen(Proto, ListenOn).
+
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 merge_sockopts_test_() ->

+ 27 - 3
src/emqttd_cli.erl

@@ -249,8 +249,6 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
            case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
                ok ->
                    ?PRINT_MSG("ok~n");
-               {error, already_existed} ->
-                   ?PRINT_MSG("Error: already existed~n");
                {error, Reason} ->
                    ?PRINT("Error: ~p~n", [Reason])
            end
@@ -477,8 +475,34 @@ listeners([]) ->
                         end, Info)
             end, esockd:listeners());
 
+listeners(["restart", Proto, ListenOn]) ->
+    ListenOn1 = case string:tokens(ListenOn, ":") of
+        [Port]     -> list_to_integer(Port);
+        [IP, Port] -> {IP, list_to_integer(Port)}
+    end,
+    case emqttd_app:restart_listener({list_to_atom(Proto), ListenOn1, []}) of
+        {ok, _Pid} ->
+            io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]);
+        {error, Error} ->
+            io:format("Failed to restart ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
+    end;
+
+listeners(["stop", Proto, ListenOn]) ->
+    ListenOn1 = case string:tokens(ListenOn, ":") of
+        [Port]     -> list_to_integer(Port);
+        [IP, Port] -> {IP, list_to_integer(Port)}
+    end,
+    case emqttd_app:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
+        ok ->
+            io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
+        {error, Error} ->
+            io:format("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
+    end;
+
 listeners(_) ->
-    ?PRINT_CMD("listeners", "List listeners").
+    ?USAGE([{"listeners",                        "List listeners"},
+            {"listeners restart <Proto> <Port>", "Restart a listener"},
+            {"listeners stop    <Proto> <Port>", "Stop a listener"}]).
 
 %%--------------------------------------------------------------------
 %% Dump ETS

+ 14 - 4
src/emqttd_ctl.erl

@@ -64,14 +64,24 @@ cast(Msg) -> gen_server:cast(?SERVER, Msg).
 
 %% @doc Run a command
 -spec(run([string()]) -> any()).
-run([]) -> usage();
+run([]) -> usage(), ok;
 
-run(["help"]) -> usage();
+run(["help"]) -> usage(), ok;
 
 run([CmdS|Args]) ->
     case lookup(list_to_atom(CmdS)) of
-        [{Mod, Fun}] -> Mod:Fun(Args);
-        [] -> usage() 
+        [{Mod, Fun}] ->
+            try Mod:Fun(Args) of
+               _ -> ok
+            catch
+                _:Reason ->
+                    io:format("Reason:~p, get_stacktrace:~p~n",
+                              [Reason, erlang:get_stacktrace()]),
+                    {error, Reason}
+            end;
+        [] ->
+            usage(),
+            {error, cmd_not_found}
     end.
 
 %% @doc Lookup a command

+ 3 - 3
src/emqttd_metrics.erl

@@ -102,7 +102,7 @@ start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
 %% @doc Count packets received.
--spec(received(mqtt_packet()) -> ok).
+-spec(received(mqtt_packet()) -> ignore | non_neg_integer()).
 received(Packet) ->
     inc('packets/received'),
     received1(Packet).
@@ -140,7 +140,7 @@ qos_received(?QOS_2) ->
     inc('messages/qos2/received').
 
 %% @doc Count packets received. Will not count $SYS PUBLISH.
--spec(sent(mqtt_packet()) -> ok).
+-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()).
 sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
     ignore;
 sent(Packet) ->
@@ -169,7 +169,7 @@ sent2(?UNSUBACK) ->
 sent2(?PINGRESP) ->
     inc('packets/pingresp');
 sent2(_Type) ->
-    ingore.
+    ignore.
 qos_sent(?QOS_0) ->
     inc('messages/qos0/sent');
 qos_sent(?QOS_1) ->

+ 1 - 1
src/emqttd_server.erl

@@ -130,7 +130,7 @@ async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
 setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
     call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
 
--spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]).
+-spec(subscriptions(emqttd:subscriber()) -> [{binary(), binary(), list(emqttd:suboption())}]).
 subscriptions(Subscriber) ->
     lists:map(fun({_, {_Share, Topic}}) ->
                 subscription(Topic, Subscriber);

+ 3 - 3
src/emqttd_session.erl

@@ -120,7 +120,7 @@
          retry_interval = 20000 :: timeout(),
 
          %% Retry Timer
-         retry_timer :: reference(),
+         retry_timer :: reference() | undefined,
 
          %% All QoS1, QoS2 messages published to when client is disconnected.
          %% QoS 1 and QoS 2 messages pending transmission to the Client.
@@ -138,13 +138,13 @@
          await_rel_timeout = 20000 :: timeout(),
 
          %% Awaiting PUBREL timer
-         await_rel_timer :: reference(),
+         await_rel_timer :: reference() | undefined,
 
          %% Session Expiry Interval
          expiry_interval = 7200000 :: timeout(),
 
          %% Expired Timer
-         expiry_timer :: reference(),
+         expiry_timer :: reference() | undefined,
 
          %% Enable Stats
          enable_stats :: boolean(),