erylee 13 лет назад
Родитель
Сommit
0fabf9feb2
3 измененных файлов с 107 добавлено и 8 удалено
  1. 7 6
      etc/emqtt.config
  2. 14 2
      src/emqtt_client.erl
  3. 86 0
      src/emqtt_net.erl

+ 7 - 6
etc/emqtt.config

@@ -12,12 +12,13 @@
  ]},
  ]},
  {emqtt, [
  {emqtt, [
     {tcp_listeners, [1883]},
     {tcp_listeners, [1883]},
-    {tcp_listen_options, [
-        binary,
-        {packet,    raw},
-        {reuseaddr, true},
-        {backlog,   128},
-        {nodelay,   true}]}
+	{tcp_listen_options, [binary,
+					   {packet,        raw},
+					   {reuseaddr,     true},
+					   {backlog,       128},
+					   {nodelay,       true},
+					   {linger,        {true, 0}},
+					   {exit_on_close, false}]}
  ]}
  ]}
 ].
 ].
 
 

+ 14 - 2
src/emqtt_client.erl

@@ -23,8 +23,12 @@ init([]) ->
     {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
     {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
 
 
 handle_call({go, Sock}, _From, State) ->
 handle_call({go, Sock}, _From, State) ->
-	error_logger:info_msg("go.... sock: ~p", [Sock]),
-	inet:setopts(Sock, [{active, true}]),
+    process_flag(trap_exit, true),
+    ok = throw_on_error(
+           inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end),
+    {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
+    error_logger:info_msg("accepting MQTT connection (~s)~n", [ConnStr]),
+	%inet:setopts(Sock, [{active, once}]),
 	{reply, ok, State}.
 	{reply, ok, State}.
 
 
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
@@ -39,3 +43,11 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
 	
 	
+throw_on_error(E, Thunk) ->
+    case Thunk() of
+        {error, Reason} -> throw({E, Reason});
+        {ok, Res}       -> Res;
+        Res             -> Res
+    end.
+
+

+ 86 - 0
src/emqtt_net.erl

@@ -0,0 +1,86 @@
+-module(emqtt_net).
+
+-export([tune_buffer_size/1, connection_string/2]).
+
+-include_lib("kernel/include/inet.hrl").
+
+tune_buffer_size(Sock) ->
+    case getopts(Sock, [sndbuf, recbuf, buffer]) of
+        {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+                          setopts(Sock, [{buffer, BufSz}]);
+        Err            -> Err
+    end.
+
+connection_string(Sock, Direction) ->
+    case socket_ends(Sock, Direction) of
+        {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
+            {ok, format(
+                   "~s:~p -> ~s:~p",
+                   [maybe_ntoab(FromAddress), FromPort,
+                    maybe_ntoab(ToAddress),   ToPort])};
+        Error ->
+            Error
+    end.
+
+format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+
+socket_ends(Sock, Direction) ->
+    {From, To} = sock_funs(Direction),
+    case {From(Sock), To(Sock)} of
+        {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+            {ok, {rdns(FromAddress), FromPort,
+                  rdns(ToAddress),   ToPort}};
+        {{error, _Reason} = Error, _} ->
+            Error;
+        {_, {error, _Reason} = Error} ->
+            Error
+    end.
+
+maybe_ntoab(Addr) when is_tuple(Addr) -> ntoab(Addr);
+maybe_ntoab(Host)                     -> Host.
+
+rdns(Addr) -> Addr.
+
+sock_funs(inbound)  -> {fun peername/1, fun sockname/1};
+sock_funs(outbound) -> {fun sockname/1, fun peername/1}.
+
+getopts(Sock, Options) when is_port(Sock) ->
+    inet:getopts(Sock, Options).
+
+setopts(Sock, Options) when is_port(Sock) ->
+    inet:setopts(Sock, Options).
+
+sockname(Sock)   when is_port(Sock) -> inet:sockname(Sock).
+
+peername(Sock)   when is_port(Sock) -> inet:peername(Sock).
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+ntoa(IP) ->
+    inet_parse:ntoa(IP).
+
+ntoab(IP) ->
+    Str = ntoa(IP),
+    case string:str(Str, ":") of
+        0 -> Str;
+        _ -> "[" ++ Str ++ "]"
+    end.
+
+tcp_host({0,0,0,0}) ->
+    hostname();
+
+tcp_host({0,0,0,0,0,0,0,0}) ->
+    hostname();
+
+tcp_host(IPAddress) ->
+    case inet:gethostbyaddr(IPAddress) of
+        {ok, #hostent{h_name = Name}} -> Name;
+        {error, _Reason} -> ntoa(IPAddress)
+    end.
+
+hostname() ->
+    {ok, Hostname} = inet:gethostname(),
+    case inet:gethostbyname(Hostname) of
+        {ok,    #hostent{h_name = Name}} -> Name;
+        {error, _Reason}                 -> Hostname
+    end.