Quellcode durchsuchen

Support to pass ws compressing options (#2356)

Add new config entries about websocket
Gilbert vor 7 Jahren
Ursprung
Commit
2534b8dc64
5 geänderte Dateien mit 294 neuen und 29 gelöschten Zeilen
  1. 1 1
      Makefile
  2. 149 12
      etc/emqx.conf
  3. 130 11
      priv/emqx.schema
  4. 1 2
      rebar.config
  5. 13 3
      src/emqx_ws_connection.erl

+ 1 - 1
Makefile

@@ -10,7 +10,7 @@ dep_gproc   = git-emqx https://github.com/uwiger/gproc 0.8.0
 dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
 dep_esockd  = git-emqx https://github.com/emqx/esockd v5.4.4
 dep_ekka    = git-emqx https://github.com/emqx/ekka v0.5.3
-dep_cowboy  = git-emqx https://github.com/ninenines/cowboy 2.4.0
+dep_cowboy  = git-emqx https://github.com/ninenines/cowboy 2.6.1
 dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
 
 NO_AUTOPATCH = cuttlefish

+ 149 - 12
etc/emqx.conf

@@ -1261,7 +1261,7 @@ listener.ws.external.zone = external
 
 ## The access control for the MQTT/WebSocket listener.
 ##
-## See: listener.tcp.$name.access
+## See: listener.ws.$name.access
 ##
 ## Value: ACL Rule
 listener.ws.external.access.1 = allow all
@@ -1286,74 +1286,143 @@ listener.ws.external.verify_protocol_header = on
 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
 ## HAProxy or Nginx.
 ##
-## See: listener.tcp.$name.proxy_protocol
+## See: listener.ws.$name.proxy_protocol
 ##
 ## Value: on | off
 ## listener.ws.external.proxy_protocol = on
 
 ## Sets the timeout for proxy protocol.
 ##
-## See: listener.tcp.$name.proxy_protocol_timeout
+## See: listener.ws.$name.proxy_protocol_timeout
 ##
 ## Value: Duration
 ## listener.ws.external.proxy_protocol_timeout = 3s
 
 ## The TCP backlog of external MQTT/WebSocket Listener.
 ##
-## See: listener.tcp.$name.backlog
+## See: listener.ws.$name.backlog
 ##
 ## Value: Number >= 0
 listener.ws.external.backlog = 1024
 
 ## The TCP send timeout for external MQTT/WebSocket connections.
 ##
-## See: listener.tcp.$name.send_timeout
+## See: listener.ws.$name.send_timeout
 ##
 ## Value: Duration
 listener.ws.external.send_timeout = 15s
 
 ## Close the MQTT/WebSocket connection if send timeout.
 ##
-## See: listener.tcp.$name.send_timeout_close
+## See: listener.ws.$name.send_timeout_close
 ##
 ## Value: on | off
 listener.ws.external.send_timeout_close = on
 
 ## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections.
 ##
-## See: listener.tcp.$name.recbuf
+## See: listener.ws.$name.recbuf
 ##
 ## Value: Bytes
 ## listener.ws.external.recbuf = 2KB
 
 ## The TCP send buffer(os kernel) for external MQTT/WebSocket connections.
 ##
-## See: listener.tcp.$name.sndbuf
+## See: listener.ws.$name.sndbuf
 ##
 ## Value: Bytes
 ## listener.ws.external.sndbuf = 2KB
 
 ## The size of the user-level software buffer used by the driver.
 ##
-## See: listener.tcp.$name.buffer
+## See: listener.ws.$name.buffer
 ##
 ## Value: Bytes
 ## listener.ws.external.buffer = 2KB
 
 ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
 ##
-## See: listener.tcp.$name.tune_buffer
+## See: listener.ws.$name.tune_buffer
 ##
 ## Value: on | off
 ## listener.ws.external.tune_buffer = off
 
 ## The TCP_NODELAY flag for external MQTT/WebSocket connections.
 ##
-## See: listener.tcp.$name.nodelay
+## See: listener.ws.$name.nodelay
 ##
 ## Value: true | false
 listener.ws.external.nodelay = true
 
+## The compress flag for external MQTT/WebSocket connections.
+##
+## If this Value is set true,the websocket message would be compressed
+##
+## Value: true | false
+## listener.ws.external.compress = true
+
+## The level of deflate options for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.level
+##
+## Value: none | default | best_compression | best_speed
+## listener.ws.external.deflate_opts.level = default
+
+## The mem_level of deflate options for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.mem_level
+##
+## Valid range is 1-9
+## listener.ws.external.deflate_opts.mem_level = 8
+
+## The strategy of deflate options for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.strategy
+##
+## Value: default | filtered | huffman_only | rle
+## listener.ws.external.deflate_opts.strategy = default
+
+## The deflate option for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.server_context_takeover
+##
+## Value: takeover | no_takeover
+## listener.ws.external.deflate_opts.server_context_takeover = takeover
+
+## The deflate option for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.client_context_takeover
+##
+## Value: takeover | no_takeover
+## listener.ws.external.deflate_opts.client_context_takeover = takeover
+
+## The deflate options for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.server_max_window_bits
+##
+## Valid range is 8-15
+## listener.ws.external.deflate_opts.server_max_window_bits = 15
+
+## The deflate options for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.deflate_opts.client_max_window_bits
+##
+## Valid range is 8-15
+## listener.ws.external.deflate_opts.client_max_window_bits = 15
+
+## The idle timeout for external MQTT/WebSocket connections.
+##
+## See: listener.ws.$name.idle_timeout
+##
+## Value: Duration
+## listener.ws.external.idle_timeout = 60s
+
+## The max frame size for external MQTT/WebSocket connections.
+##
+##
+## Value: Number
+## listener.ws.external.max_frame_size = 0
+
 ##--------------------------------------------------------------------
 ## External WebSocket/SSL listener for MQTT Protocol
 
@@ -1486,7 +1555,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
 ## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot
 ## be configured at the same time.
 ## See 'https://tools.ietf.org/html/rfc4279#section-2'.
-#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
+## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
 
 ## See: listener.ssl.$name.secure_renegotiate
 ##
@@ -1557,6 +1626,74 @@ listener.wss.external.send_timeout_close = on
 ## Value: true | false
 ## listener.wss.external.nodelay = true
 
+## The compress flag for external WebSocket/SSL connections.
+##
+## If this Value is set true,the websocket message would be compressed
+##
+## Value: true | false
+## listener.wss.external.compress = true
+
+## The level of deflate options for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.level
+##
+## Value: none | default | best_compression | best_speed
+## listener.wss.external.deflate_opts.level = default
+
+## The mem_level of deflate options for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.mem_level
+##
+## Valid range is 1-9
+## listener.wss.external.deflate_opts.mem_level = 8
+
+## The strategy of deflate options for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.strategy
+##
+## Value: default | filtered | huffman_only | rle
+## listener.wss.external.deflate_opts.strategy = default
+
+## The deflate option for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.server_context_takeover
+##
+## Value: takeover | no_takeover
+## listener.wss.external.deflate_opts.server_context_takeover = takeover
+
+## The deflate option for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.client_context_takeover
+##
+## Value: takeover | no_takeover
+## listener.wss.external.deflate_opts.client_context_takeover = takeover
+
+## The deflate options for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.server_max_window_bits
+##
+## Valid range is 8-15
+## listener.wss.external.deflate_opts.server_max_window_bits = 15
+
+## The deflate options for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.deflate_opts.client_max_window_bits
+##
+## Valid range is 8-15
+## listener.wss.external.deflate_opts.client_max_window_bits = 15
+
+## The idle timeout for external WebSocket/SSL connections.
+##
+## See: listener.wss.$name.idle_timeout
+##
+## Value: Duration
+## listener.wss.external.idle_timeout = 60s
+
+## The max frame size for external WebSocket/SSL connections.
+##
+## Value: Number
+## listener.wss.external.max_frame_size = 0
+
 ##--------------------------------------------------------------------
 ## Bridges
 ##--------------------------------------------------------------------

+ 130 - 11
priv/emqx.schema

@@ -1238,6 +1238,57 @@ end}.
   hidden
 ]}.
 
+{mapping, "listener.ws.$name.compress", "emqx.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [
+  {datatype, {enum, [none, default, best_compression, best_speed]}},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [
+  {datatype, integer},
+  {validators, ["range:1-9"]},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [
+  {datatype, {enum, [default, filtered, huffman_only, rle]}},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
+  {datatype, {enum, [takeover, no_takeover]}},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
+  {datatype, {enum, [takeover, no_takeover]}},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  hidden
+]}.
+
+{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT/WebSocket/SSL Listeners
 
@@ -1393,6 +1444,61 @@ end}.
   {datatype, {enum, [cn, dn, crt]}}
 ]}.
 
+{mapping, "listener.wss.$name.compress", "emqx.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [
+  {datatype, {enum, [none, default, best_compression, best_speed]}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [
+  {datatype, integer},
+  {validators, ["range:1-9"]},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [
+  {datatype, {enum, [default, filtered, huffman_only, rle]}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
+  {datatype, {enum, [takeover, no_takeover]}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
+  {datatype, {enum, [takeover, no_takeover]}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
+  {datatype, integer},
+  {validators, ["range:8-15"]},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
+  {datatype, integer},
+  {validators, ["range:8-15"]},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  hidden
+]}.
+
+{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [
+  {datatype, integer},
+  hidden
+]}.
+
+
+
 {translation, "emqx.listeners", fun(Conf) ->
 
     Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
@@ -1431,19 +1537,30 @@ end}.
                           {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
                           {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
                           {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
+                          {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
+                          {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
+                          {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
                           {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
               end,
+    DeflateOpts = fun(Prefix) ->
+                      Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
+                              {mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)},
+                              {strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)},
+                              {server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)},
+                              {client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)},
+                              {server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)},
+                              {client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}])
+                  end,
     TcpOpts = fun(Prefix) ->
-                   Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
-                           {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
-                           {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
-                           {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
-                           {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
-                           {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
-                           {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
-                           {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
+                  Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
+                          {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
+                          {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
+                          {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
+                          {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
+                          {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
+                          {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
+                          {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
               end,
-
     SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
     MapPSKCiphers = fun(PSKCiphers) ->
                       lists:map(
@@ -1496,7 +1613,8 @@ end}.
                        case cuttlefish:conf_get(Prefix, Conf, undefined) of
                            undefined -> [];
                            ListenOn  ->
-                               [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
+                               [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
+                                                        {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
                        end
                    end,
 
@@ -1506,7 +1624,8 @@ end}.
                            undefined ->
                                [];
                            ListenOn ->
-                               [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)},
+                               [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
+                                                        {tcp_options, TcpOpts(Prefix)},
                                                         {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}]
                        end
                    end,

+ 1 - 2
rebar.config

@@ -1,6 +1,6 @@
 {deps, [{jsx, "2.9.0"},
         {gproc, "0.8.0"},
-        {cowboy, "2.4.0"},
+        {cowboy, "2.6.1"},
         {meck, "0.8.13"} %% temp workaround for version check
        ]}.
 
@@ -28,4 +28,3 @@
 {cover_export_enabled, true}.
 
 {plugins, [coveralls]}.
-

+ 13 - 3
src/emqx_ws_connection.erl

@@ -113,12 +113,23 @@ call(WSPid, Req) when is_pid(WSPid) ->
 %%------------------------------------------------------------------------------
 
 init(Req, Opts) ->
+    IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000),
+    DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
+    MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
+                       0 -> infinity;
+                       MFS -> MFS
+                   end,
+    Compress = proplists:get_value(compress, Opts, false),
+    Options = #{compress => Compress,
+                deflate_opts => DeflateOptions,
+                max_frame_size => MaxFrameSize,
+                idle_timeout => IdleTimeout},
     case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
         undefined ->
-            {cowboy_websocket, Req, #state{}};
+            {cowboy_websocket, Req, #state{}, Options};
         [<<"mqtt", Vsn/binary>>] ->
             Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req),
-            {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}};
+            {cowboy_websocket, Resp, #state{request = Req, options = Opts}, Options};
         _ ->
             {ok, cowboy_req:reply(400, Req), #state{}}
     end.
@@ -308,4 +319,3 @@ shutdown(Reason, State) ->
 
 wsock_stats() ->
     [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].
-