Kaynağa Gözat

uupdate go timeout

hejin 11 yıl önce
ebeveyn
işleme
acc491c799
2 değiştirilmiş dosya ile 15 ekleme ve 5 silme
  1. 7 4
      src/emqtt_client.erl
  2. 8 1
      src/emqtt_sup.erl

+ 7 - 4
src/emqtt_client.erl

@@ -59,7 +59,7 @@ start_link() ->
     gen_server2:start_link(?MODULE, [], []).
 
 go(Pid, Sock) ->
-	gen_server2:call(Pid, {go, Sock}).
+	gen_server2:call(Pid, {go, Sock}, infinity).
 
 info(Pid) ->
 	gen_server2:call(Pid, info).
@@ -85,7 +85,7 @@ handle_call({go, Sock}, _From, _State) ->
     {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
 	%FIXME: merge to registry
 	emqtt_client_monitor:mon(self()),
-    ?INFO("accepting connection (~s)", [ConnStr]),
+    ?ERROR("accepting connection (~s)", [ConnStr]),
     {reply, ok, 
 	  control_throttle(
        #state{ socket           = Sock,
@@ -163,6 +163,7 @@ handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
 	end;
 
 handle_info(Info, State) ->
+	?ERROR("unext info :~p",[Info]),
 	{stop, {badinfo, Info}, State}.
 
 terminate(_Reason, #state{client_id=ClientId, keep_alive=KeepAlive}) ->
@@ -252,7 +253,7 @@ process_request(?CONNECT,
                         ?ERROR_MSG("MQTT login failed - no credentials"),
                         {?CONNACK_CREDENTIALS, State};
                     true ->
-						?INFO("connect from clientid: ~s, ~p", [ClientId, AlivePeriod]),
+						?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
 						ok = emqtt_registry:register(ClientId, self()),
 						KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
 						{?CONNACK_ACCEPT,
@@ -261,6 +262,7 @@ process_request(?CONNECT,
 											 keep_alive = KeepAlive}}
                 end
         end,
+		?INFO("recv conn...:~p", [ReturnCode]),
 		send_frame(Sock, #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?CONNACK},
 								 variable = #mqtt_frame_connack{
                                          return_code = ReturnCode }}),
@@ -394,12 +396,13 @@ send_will_msg(#state{will_msg = WillMsg }) ->
 	emqtt_router:publish(WillMsg).
 
 send_frame(Sock, Frame) ->
+	?INFO("send frame:~p", [Frame]),
     erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
 
 %%----------------------------------------------------------------------------
 network_error(Reason,
               State = #state{ conn_name  = ConnStr}) ->
-    ?INFO("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
+    ?ERROR("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
     send_will_msg(State),
     % todo: flush channel after publish
     stop({shutdown, conn_closed}, State).

+ 8 - 1
src/emqtt_sup.erl

@@ -52,14 +52,21 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
 %% ===================================================================
 
 init([Listeners]) ->
+	Listeners2 = lists:map(fun({Port, Args}) ->
+								{Port, Args};
+							({Port, Size, Args}) ->
+								[{Port+I, Args} || I <- lists:seq(0,Size)]
+		end, Listeners),
+	
     {ok, { {one_for_all, 5, 10}, [
+		?CHILD(emqtt_monitor, worker),
 		?CHILD(emqtt_auth, worker),
 		?CHILD(emqtt_retained, worker),
 		?CHILD(emqtt_router, worker),
 		?CHILD(emqtt_registry, worker),
 		?CHILD(emqtt_client_monitor, worker),
 		?CHILD(emqtt_client_sup, supervisor)
-		| listener_children(Listeners) ]}
+		| listener_children(lists:flatten(Listeners2)) ]}
 	}.
 
 listener_children(Listeners) ->