Jelajahi Sumber

Merge pull request #1379 from emqtt/develop

Version 2.3.2
huangdan 8 tahun lalu
induk
melakukan
b5c063f3cb
9 mengubah file dengan 109 tambahan dan 12 penghapusan
  1. 2 2
      Makefile
  2. 36 0
      etc/emq.conf
  3. 54 2
      priv/emq.schema
  4. 1 1
      src/emqttd.app.src
  5. 3 1
      src/emqttd_client.erl
  6. 2 4
      src/emqttd_protocol.erl
  7. 1 0
      src/emqttd_ws.erl
  8. 5 1
      src/emqttd_ws_client.erl
  9. 5 1
      test/emqttd_SUITE.erl

+ 2 - 2
Makefile

@@ -1,6 +1,6 @@
 PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
-PROJECT_VERSION = 2.3.1
+PROJECT_VERSION = 2.3.2
 
 DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
 
@@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqtt/cuttlefish
 
 TEST_DEPS = emqttc emq_dashboard
 dep_emqttc = git https://github.com/emqtt/emqttc
-dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard
+dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop
 
 TEST_ERLC_OPTS += +debug_info
 TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'

+ 36 - 0
etc/emq.conf

@@ -343,6 +343,10 @@ listener.tcp.external.access.2 = allow all
 ## TCP Socket Options
 listener.tcp.external.backlog = 1024
 
+listener.tcp.external.send_timeout = 15s
+
+listener.tcp.external.send_timeout_close = on
+
 #listener.tcp.external.recbuf = 4KB
 
 #listener.tcp.external.sndbuf = 4KB
@@ -371,6 +375,10 @@ listener.tcp.internal.max_clients = 102400
 ## TCP Socket Options
 listener.tcp.internal.backlog = 512
 
+listener.tcp.internal.send_timeout = 15s
+
+listener.tcp.external.send_timeout_close = on
+
 listener.tcp.internal.tune_buffer = on
 
 listener.tcp.internal.buffer = 1MB
@@ -477,6 +485,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 ## SSL Socket Options
 ## listener.ssl.external.backlog = 1024
 
+## listener.ssl.external.send_timeout = 15s
+
+## listener.ssl.external.send_timeout_close = on
+
 ## listener.ssl.external.recbuf = 4KB
 
 ## listener.ssl.external.sndbuf = 4KB
@@ -499,6 +511,10 @@ listener.ws.external.access.1 = allow all
 ## TCP Options
 listener.ws.external.backlog = 1024
 
+listener.ws.external.send_timeout = 15s
+
+listener.ws.external.send_timeout_close = on
+
 listener.ws.external.recbuf = 4KB
 
 listener.ws.external.sndbuf = 4KB
@@ -531,6 +547,20 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 
 ## listener.wss.external.fail_if_no_peer_cert = true
 
+listener.wss.external.backlog = 1024
+
+listener.wss.external.send_timeout = 15s
+
+listener.wss.external.send_timeout_close = on
+
+## listener.wss.external.recbuf = 4KB
+
+## listener.wss.external.sndbuf = 4KB
+
+## listener.wss.external.buffer = 4KB
+
+## listener.wss.external.nodelay = true
+
 ##--------------------------------------------------------------------
 ## HTTP Management API Listener
 
@@ -542,6 +572,12 @@ listener.api.mgmt.max_clients = 64
 
 listener.api.mgmt.access.1 = allow all
 
+listener.api.mgmt.backlog = 512
+
+listener.api.mgmt.send_timeout = 15s
+
+listener.api.mgmt.send_timeout_close = on
+
 ##-------------------------------------------------------------------
 ## System Monitor
 ##-------------------------------------------------------------------

+ 54 - 2
priv/emq.schema

@@ -805,8 +805,18 @@ end}.
 ]}.
 
 {mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [
-  {default, 1024},
-  {datatype, integer}
+  {datatype, integer},
+  {default, 1024}
+]}.
+
+{mapping, "listener.tcp.$name.send_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.tcp.$name.send_timeout_close", "emqttd.listeners", [
+  {datatype, flag},
+  {default, on}
 ]}.
 
 {mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [
@@ -883,6 +893,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.ssl.$name.send_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.ssl.$name.send_timeout_close", "emqttd.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [
   {datatype, bytesize},
   hidden
@@ -996,6 +1016,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.ws.$name.send_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.ws.$name.send_timeout_close", "emqttd.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [
   {datatype, bytesize},
   hidden
@@ -1059,6 +1089,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.wss.$name.send_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.wss.$name.send_timeout_close", "emqttd.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [
   {datatype, bytesize},
   hidden
@@ -1145,6 +1185,8 @@ end}.
               end,
     TcpOpts = fun(Prefix) ->
                    Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
+                           {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
+                           {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
                            {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
                            {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
                            {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
@@ -1252,6 +1294,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.api.$name.send_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.api.$name.send_timeout_close", "emqttd.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.api.$name.recbuf", "emqttd.listeners", [
   {datatype, bytesize},
   hidden

+ 1 - 1
src/emqttd.app.src

@@ -1,6 +1,6 @@
 {application,emqttd,
              [{description,"Erlang MQTT Broker"},
-              {vsn,"2.3.1"},
+              {vsn,"2.3.2"},
               {modules,[]},
               {registered,[emqttd_sup]},
               {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,

+ 3 - 1
src/emqttd_client.erl

@@ -140,7 +140,9 @@ send_fun(Conn, Peername) ->
         ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
         emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
         try Conn:async_send(Data) of
-            true -> ok
+            ok -> ok;
+            true -> ok; %% Compatible with esockd 4.x
+            {error, Reason} -> Self ! {shutdown, Reason}
         catch
             error:Error -> Self ! {shutdown, Error}
         end

+ 2 - 4
src/emqttd_protocol.erl

@@ -341,13 +341,11 @@ send(Msg, State = #proto_state{client_id  = ClientId,
     emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
     send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
 
-send(Packet = ?PACKET(Type),
-     State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
+send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
     trace(send, Packet, State),
     emqttd_metrics:sent(Packet),
     SendFun(Packet),
-    Stats1 = inc_stats(send, Type, Stats),
-    {ok, State#proto_state{stats_data = Stats1}}.
+    {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
 
 trace(recv, Packet, ProtoState) ->
     ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);

+ 1 - 0
src/emqttd_ws.erl

@@ -38,6 +38,7 @@ handle_request(Req) ->
 %%--------------------------------------------------------------------
 %% MQTT Over WebSocket
 %%--------------------------------------------------------------------
+
 handle_request('GET', "/mqtt", Req) ->
     lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
     Upgrade = Req:get_header_value("Upgrade"),

+ 5 - 1
src/emqttd_ws_client.erl

@@ -272,10 +272,14 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 send_fun(ReplyChannel) ->
+    Self = self(),
     fun(Packet) ->
         Data = emqttd_serializer:serialize(Packet),
         emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
-        ReplyChannel({binary, Data})
+        case ReplyChannel({binary, Data}) of
+            ok -> ok;
+            {error, Reason} -> Self ! {shutdown, Reason}
+        end
     end.
 
 stat_fun(Conn) ->

+ 5 - 1
test/emqttd_SUITE.erl

@@ -328,7 +328,10 @@ router_print(_) ->
               #mqtt_route{topic = <<"#">>,     node = node()},
               #mqtt_route{topic = <<"+/#">>,   node = node()}],
     lists:foreach(fun(R) -> emqttd_router:add_route(R) end, Routes),
-    emqttd_router:print(<<"a/b/c">>).
+    emqttd_router:print(<<"a/b/c">>),
+    emqttd_router:del_route(<<"+/#">>),
+    emqttd_router:del_route(<<"a/b/c">>),
+    emqttd_router:del_route(<<"#">>).
 
 router_unused(_) ->
     gen_server:call(emqttd_router, bad_call),
@@ -598,6 +601,7 @@ conflict_listeners(_) ->
     L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners),
     ?assertEqual(1, proplists:get_value(current_clients, L)),
     ?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))),
+    timer:sleep(100),
     emqttc:disconnect(C2).
 
 cli_vm(_) ->