소스 검색

Add cli listeners start|stop

turtled 8 년 전
부모
커밋
3e1c69dff1
3개의 변경된 파일141개의 추가작업 그리고 29개의 파일을 삭제
  1. 0 0
      priv/emq.schema
  2. 136 29
      src/emqttd_cli2.erl
  3. 5 0
      src/emqttd_session.erl

priv/emqttd.schema → priv/emq.schema


+ 136 - 29
src/emqttd_cli2.erl

@@ -12,7 +12,7 @@
 
 -behaviour(clique_handler).
 
--import(proplists, [get_value/2]).
+-import(proplists, [get_value/2, get_value/3]).
 
 -define(APP, emqttd).
 
@@ -35,7 +35,7 @@ run([]) ->
     All = clique_usage:find_all(),
     io:format("--------------------------------------------------------------------------------~n"),
     lists:foreach(fun({Cmd, Usage}) -> 
-        io:format("~p usage:", [Cmd]),
+        io:format("~p usage:", [cuttlefish_variable:format(Cmd)]),
         io:format("~ts", [Usage]),
         io:format("--------------------------------------------------------------------------------~n")
     end, lists:sort(All));
@@ -58,6 +58,7 @@ register_usage() ->
     clique:register_usage(["trace"],         trace_usage()),
     clique:register_usage(["status"],        status_usage()),
     clique:register_usage(["listeners"],     listeners_usage()),
+    clique:register_usage(["listeners", "start"], listener_start_usage()),
     clique:register_usage(["listeners", "stop"],listener_stop_usage()),
     clique:register_usage(["mnesia"],        mnesia_usage()).
 
@@ -118,6 +119,7 @@ register_cmd() ->
     trace_off(),
 
     listeners(),
+    listeners_start(),
     listeners_stop().
 
 node_status() ->
@@ -837,26 +839,119 @@ listeners() ->
         end,
     clique:register_command(Cmd, [], [], Callback).
 
-listeners_stop() ->
-    Cmd = ["listeners", "stop"],
-    KeySpecs = [{'address',  [{typecast, fun parse_addr/1}]},
-                {'port',  [{typecast, fun parse_port/1}]},
-                {'type',  [{typecast, fun parse_type/1}]}],
-    FlagSpecs = [{kill, [{shortname, "k"},
-                         {longname, "kill_sessions"}]}],
+listeners_start() ->
+    Cmd = ["listeners", "start"],
+    KeySpecs = [{'address',             [{typecast, fun parse_addr/1}]},
+                {'port',                [{typecast, fun parse_port/1}]},
+                {'type',                [{typecast, fun parse_type/1}]}],
+    FlagSpecs = [{acceptors,            [{longname, "acceptors"},
+                                         {typecast, fun(Acceptors) -> list_to_integer(Acceptors) end}]},
+                 {max_clients,          [{longname, "max_clients"},
+                                         {typecast, fun(MaxClients) -> list_to_integer(MaxClients) end}]},
+                 {buffer,               [{longname, "buffer"},
+                                         {typecast, fun(Buffer) -> list_to_integer(Buffer) end}]},
+                 {tls_versions,         [{longname, "tls_versions"},
+                                         {typecast, fun(TlsVersions) -> list_to_atom(TlsVersions) end}]},
+                 {handshake_timeout,    [{longname, "handshake_timeout"},
+                                         {typecast, fun(HandshakeTimeout) -> list_to_integer(HandshakeTimeout) end}]},
+                 {reuse_sessions,       [{longname, "reuse_sessions"},
+                                         {typecast, fun(ReuseSessions) -> list_to_atom(ReuseSessions) end}]},
+                 {keyfile,              [{longname, "keyfile"},
+                                         {typecast, fun(Keyfile) -> Keyfile end}]},
+                 {certfile,             [{longname, "certfile"},
+                                         {typecast, fun(Certfile) -> Certfile end}]},
+                 {cacertfile,           [{longname, "cacertfile"},
+                                         {typecast, fun(Cacertfile) -> Cacertfile end}]},
+                 {dhfile,               [{longname, "dhfile"},
+                                         {typecast, fun(Dhfile) -> Dhfile end}]},
+                 {verify,               [{longname, "verify"},
+                                         {typecast, fun(Verify) -> list_to_atom(Verify) end}]},
+                 {fail_if_no_peer_cert, [{longname, "fail_if_no_peer_cert"},
+                                         {typecast, fun(FailIfNoPeerCert) -> list_to_atom(FailIfNoPeerCert) end}]}],
     Callback =
         fun (_, Params, Flag) ->
             Address = get_value('address', Params),
             Port  = get_value('port', Params),
             Type = get_value('type', Params),
-            case Address of
-                undefined -> emqttd_app:stop_listener({Type, Port, []});
-                Address -> emqttd_app:stop_listener({Type, {Address, Port}, []})
+            Text = case {Type, Port}of
+                {undefined, _} ->
+                    io_lib:format("Invalid params type: ~p error~n", [Type]);
+                {_, undefined} ->
+                    io_lib:format("Invalid params port: ~p error~n", [Type]);
+                {_, _} ->
+                    ListenOn = case Address of
+                        undefined -> Port;
+                        _ -> {Address, Port}
+                    end,
+                    Opts = parse_opts(Type, Flag),
+                    case emqttd_app:start_listener({Type, ListenOn, Opts}) of
+                        {ok, _} -> 
+                            io_lib:format("Start mqtt:~p listen on ~p successfully", [Type, ListenOn]); 
+                        Error ->
+                            io_lib:format("Start mqtt:~p listen on ~p failed, error:~p~n", [Type, ListenOn, Error])
+                    end
             end,
-            [clique_status:text("aaa")]  
+            [clique_status:text(Text)]  
         end,
     clique:register_command(Cmd, KeySpecs, FlagSpecs, Callback).
 
+listeners_stop() ->
+    Cmd = ["listeners", "stop"],
+    KeySpecs = [{'address', [{typecast, fun parse_addr/1}]},
+                {'port',    [{typecast, fun parse_port/1}]},
+                {'type',    [{typecast, fun parse_type/1}]}],
+    Callback =
+        fun (_, Params, _) ->
+            Address = get_value('address', Params),
+            Port  = get_value('port', Params),
+            Type = get_value('type', Params),
+            Text = case {Type, Port}of
+                {undefined, _} ->
+                    io_lib:format("Invalid params type: ~p error~n", [Type]);
+                {_, undefined} ->
+                    io_lib:format("Invalid params port: ~p error~n", [Type]);
+                {_, _} ->
+                    case Address of
+                        undefined -> 
+                            emqttd_app:stop_listener({Type, Port, []}),
+                            io_lib:format("stopped mqtt:~p on ~p~n", [Type, Port]);
+                        Address -> 
+                            emqttd_app:stop_listener({Type, {Address, Port}, []}),
+                            io_lib:format("stopped mqtt:~p on ~p:~p~n", [Type, emqttd_net:ntoa(Address), Port])
+                    end
+            end,
+            [clique_status:text(Text)]  
+        end,
+    clique:register_command(Cmd, KeySpecs, [], Callback).
+
+parse_opts(Type, Opts) when Type == ssl 
+                        orelse Type == wss 
+                        orelse Type == https ->
+
+    OptList = [handshake_timeout, reuse_sessions, keyfile, certfile, 
+               cacertfile, dhfile, verify, fail_if_no_peer_cert],
+     SslOpts = lists:foldl(
+        fun(Opt, Acc) -> 
+            case get_value(Opt, Opts) of
+                undefined -> Acc;
+                OptVal    -> [[{Opt, OptVal}] | Acc]
+            end
+        end, [], OptList) ++
+     case get_value(tls_versions, Opts) of
+         undefined  -> [];
+         TlsVersions -> [{versions, [TlsVersions]}]
+     end,
+    case SslOpts of
+        [] -> parse_opts(undefined, Opts);
+        _  -> [{sslopts, SslOpts}] ++ parse_opts(undefined, Opts)
+    end;
+parse_opts(_Type, Opts) ->
+    Acceptors = get_value(acceptors, Opts, 4),
+    MaxClients = get_value(max_clients, Opts, 1024),
+    Buffer = get_value(buffer, Opts, 4096),
+    [{acceptors, Acceptors}, {max_clients, MaxClients}, {sockopts, [{buffer, Buffer}]}].
+
+
 parse_port(Port) ->
     case catch list_to_integer(Port) of
         P when (P >= 0) and (P=<65535) -> P;
@@ -920,11 +1015,11 @@ topics_usage() ->
      "  topics show topic=<Topic>    Show a topic\n"].
 
 subscriptions_usage() ->
-    ["\n  subscriptions list                                                      List all subscriptions\n",
-     "  subscriptions show client_id=<ClientId>                                 Show subscriptions of a client\n",
-     "  subscriptions subscribe client_id=<ClientId> topic=<Topic> qos=<Qos>    Add a static subscription manually\n",
-     "  subscriptions del client_id=<ClientId>                                  Delete static subscriptions manually\n",
-     "  subscriptions unsubscribe client_id=<ClientId> topic=<Topic>            Delete a static subscription manually\n"].
+    ["\n  subscriptions list                                                    List all subscriptions\n",
+     "  subscriptions show client_id=<ClientId>                               Show subscriptions of a client\n",
+     "  subscriptions subscribe client_id=<ClientId> topic=<Topic> qos=<Qos>  Add a static subscription manually\n",
+     "  subscriptions del client_id=<ClientId>                                Delete static subscriptions manually\n",
+     "  subscriptions unsubscribe client_id=<ClientId> topic=<Topic>          Delete a static subscription manually\n"].
 
 plugins_usage() ->
     ["\n  plugins list                           Show loaded plugins\n",
@@ -950,9 +1045,9 @@ vm_usage() ->
      "  vm ports      Show Ports of Erlang VM\n"].
 
 trace_usage() ->
-    ["\n  trace list                                                                      List all traces\n",
-     "  trace type=client|topic client_id=<ClientId> topic=<Topic> log_file=<LogFile>   Start tracing\n",
-     "  trace off type=client|topic client_id=<ClientId> topic=<Topic>                  Stop tracing\n"].
+    ["\n  trace list                                                                     List all traces\n",
+     "  trace type=client|topic client_id=<ClientId> topic=<Topic> log_file=<LogFile>  Start tracing\n",
+     "  trace off type=client|topic client_id=<ClientId> topic=<Topic>                 Stop tracing\n"].
 
 status_usage() ->
     ["\n  status info   Show broker status\n"].
@@ -960,16 +1055,28 @@ status_usage() ->
 listeners_usage() ->
     ["\n  listeners info     List listeners\n",
      "  listeners start    Create and start a listener\n",
-     "  listeners stop     Stop accepting new connections for a running listener\n",
-     "  listeners restart  Restart accepting new connections for a stopped listener\n",
-     "  listeners delete   Delete a stopped listener\n"].
+     "  listeners stop     Stop accepting new connections for a running listener\n"].
+
+listener_start_usage() ->
+    ["\n  listeners start address=<IpAddr> port=<Port> type=<tcp|ssl|ws|wss|http|https>\n",
+     "  Create and start a listener.\n",
+     "Options:\n",
+     "  --acceptors=<integer>                   Size of acceptor pool\n",
+     "  --max_clients=<integer>                 Maximum number of concurrent clients\n",
+     "  --buffer=<integer>                      TCP Socket Options\n",
+     "  --tls_versions=<tlsv1.2|tlsv1.1|tlsv1>  TLS protocol versions\n",
+     "  --handshake_timeout=<integer>           TLS handshake timeout\n",
+     "  --reuse_sessions=<true|false>           TLS allows clients to reuse pre-existing sessions\n",
+     "  --keyfile=<path>                        Path to the file containing the user's private PEM-encoded key\n",
+     "  --certfile=<path>                       Path to a file containing the user certificate\n",
+     "  --cacertfile=<path>                     Path to a file containing PEM-encoded CA certificates\n",
+     "  --dhfile=<path>                         Path to a file containing PEM-encoded Diffie Hellman\n",
+     "  --verify=<verify_none|verify_peer>      A server only does x509-path validation in mode\n",
+     "  --fail_if_no_peer_cert=<true|false>     Used together with {verify, verify_peer} by an SSL server\n"].
 
 listener_stop_usage() ->
-    ["\n  listeners stop address=IpAddr port=Port\n",
-     "  Stops accepting new connections on a running listener.\n",
-     "Options\n",
-     "  -k, --kill_sessions\n"
-     "      kills all sessions accepted with this listener.\n"].
+    ["\n  listeners stop address=<IpAddr> port=<Port> type=<tcp|ssl|ws|wss|http|https>\n",
+     "  Stops accepting new connections on a running listener.\n"].
 
 mnesia_usage() ->
     ["\n  mnesia info   Mnesia system info\n"].

+ 5 - 0
src/emqttd_session.erl

@@ -559,6 +559,11 @@ handle_info({'EXIT', ClientPid, _Reason},
             State = #state{clean_sess = true, client_pid = ClientPid}) ->
     {stop, normal, State};
 
+%% ClientPid was killed
+handle_info({'EXIT', ClientPid, killed}, State) ->
+    ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, killed], State),
+    shutdown(killed, State);
+
 handle_info({'EXIT', ClientPid, Reason},
             State = #state{clean_sess      = false,
                            client_pid      = ClientPid,