Browse Source

feature(mqtt piggyback): transporting mutli MQTT packets at once or single

zhouzb 5 years ago
parent
commit
ec12acc4ef
3 changed files with 49 additions and 19 deletions
  1. 10 0
      etc/emqx.conf
  2. 13 2
      priv/emqx.schema
  3. 26 17
      src/emqx_ws_connection.erl

+ 10 - 0
etc/emqx.conf

@@ -1651,6 +1651,11 @@ listener.ws.external.nodelay = true
 ## Value: Number
 ## Value: Number
 ## listener.ws.external.max_frame_size = 0
 ## listener.ws.external.max_frame_size = 0
 
 
+## Whether a WebSocket message is allowed to contain multiple MQTT packets
+##
+## Value: single | multiple
+listener.ws.external.mqtt_piggyback = multiple
+
 ##--------------------------------------------------------------------
 ##--------------------------------------------------------------------
 ## External WebSocket/SSL listener for MQTT Protocol
 ## External WebSocket/SSL listener for MQTT Protocol
 
 
@@ -1911,6 +1916,11 @@ listener.wss.external.send_timeout_close = on
 ## Value: Number
 ## Value: Number
 ## listener.wss.external.max_frame_size = 0
 ## listener.wss.external.max_frame_size = 0
 
 
+## Whether a WebSocket message is allowed to contain multiple MQTT packets
+##
+## Value: single | multiple
+listener.wss.external.mqtt_piggyback = multiple
+
 ##--------------------------------------------------------------------
 ##--------------------------------------------------------------------
 ## Modules
 ## Modules
 ##--------------------------------------------------------------------
 ##--------------------------------------------------------------------

+ 13 - 2
priv/emqx.schema

@@ -1540,6 +1540,12 @@ end}.
   hidden
   hidden
 ]}.
 ]}.
 
 
+{mapping, "listener.ws.$name.mqtt_piggyback", "emqx.listeners", [
+  {datatype, {enum, [single, multiple]}},
+  {default, multiple},
+  hidden
+]}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% MQTT/WebSocket/SSL Listeners
 %% MQTT/WebSocket/SSL Listeners
 
 
@@ -1743,7 +1749,11 @@ end}.
   hidden
   hidden
 ]}.
 ]}.
 
 
-
+{mapping, "listener.wss.$name.mqtt_piggyback", "emqx.listeners", [
+  {datatype, {enum, [single, multiple]}},
+  {default, multiple},
+  hidden
+]}.
 
 
 {translation, "emqx.listeners", fun(Conf) ->
 {translation, "emqx.listeners", fun(Conf) ->
 
 
@@ -1793,7 +1803,8 @@ end}.
                           {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
                           {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
                           {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
                           {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
                           {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
                           {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
-                          {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)} | AccOpts(Prefix)])
+                          {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
+                          {mqtt_piggyback, cuttlefish:conf_get(Prefix ++ ".mqtt_piggyback", Conf, undefined)} | AccOpts(Prefix)])
               end,
               end,
     DeflateOpts = fun(Prefix) ->
     DeflateOpts = fun(Prefix) ->
                       Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
                       Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},

+ 26 - 17
src/emqx_ws_connection.erl

@@ -62,6 +62,8 @@
           sockstate :: emqx_types:sockstate(),
           sockstate :: emqx_types:sockstate(),
           %% Simulate the active_n opt
           %% Simulate the active_n opt
           active_n :: pos_integer(),
           active_n :: pos_integer(),
+          %% MQTT Piggyback
+          mqtt_piggyback :: single | multiple, 
           %% Limiter
           %% Limiter
           limiter :: maybe(emqx_limiter:limiter()),
           limiter :: maybe(emqx_limiter:limiter()),
           %% Limit Timer
           %% Limit Timer
@@ -226,6 +228,7 @@ websocket_init([Req, Opts]) ->
     RateLimit = emqx_zone:ratelimit(Zone),
     RateLimit = emqx_zone:ratelimit(Zone),
     Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
     Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
     ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
     ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
+    MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
     FrameOpts = emqx_zone:mqtt_frame_options(Zone),
     FrameOpts = emqx_zone:mqtt_frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     Serialize = emqx_frame:serialize_fun(),
     Serialize = emqx_frame:serialize_fun(),
@@ -237,19 +240,20 @@ websocket_init([Req, Opts]) ->
     IdleTimer = start_timer(IdleTimeout, idle_timeout),
     IdleTimer = start_timer(IdleTimeout, idle_timeout),
     emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
     emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
     emqx_logger:set_metadata_peername(esockd:format(Peername)),
     emqx_logger:set_metadata_peername(esockd:format(Peername)),
-    {ok, #state{peername     = Peername,
-                sockname     = Sockname,
-                sockstate    = running,
-                active_n     = ActiveN,
-                limiter      = Limiter,
-                parse_state  = ParseState,
-                serialize    = Serialize,
-                channel      = Channel,
-                gc_state     = GcState,
-                postponed    = [],
-                stats_timer  = StatsTimer,
-                idle_timeout = IdleTimeout,
-                idle_timer   = IdleTimer
+    {ok, #state{peername       = Peername,
+                sockname       = Sockname,
+                sockstate      = running,
+                active_n       = ActiveN,
+                mqtt_piggyback = MQTTPiggyback,
+                limiter        = Limiter,
+                parse_state    = ParseState,
+                serialize      = Serialize,
+                channel        = Channel,
+                gc_state       = GcState,
+                postponed      = [],
+                stats_timer    = StatsTimer,
+                idle_timeout   = IdleTimeout,
+                idle_timer     = IdleTimer
                }, hibernate}.
                }, hibernate}.
 
 
 websocket_handle({binary, Data}, State) when is_list(Data) ->
 websocket_handle({binary, Data}, State) when is_list(Data) ->
@@ -514,7 +518,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
 %% Handle outgoing packets
 %% Handle outgoing packets
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-handle_outgoing(Packets, State = #state{active_n = ActiveN}) ->
+handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) ->
     IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
     IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
     Oct = iolist_size(IoData),
     Oct = iolist_size(IoData),
     ok = inc_sent_stats(length(Packets), Oct),
     ok = inc_sent_stats(length(Packets), Oct),
@@ -526,7 +530,12 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN}) ->
                      postpone({check_gc, Stats}, State);
                      postpone({check_gc, Stats}, State);
                  false -> State
                  false -> State
              end,
              end,
-    {{binary, IoData}, ensure_stats_timer(NState)}.
+    
+    {case MQTTPiggyback of
+         single -> {binary, IoData};
+         multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData)
+     end,
+     ensure_stats_timer(NState)}.
 
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
     fun(Packet) ->
@@ -637,8 +646,8 @@ return(State = #state{postponed = Postponed}) ->
         {[], []}   -> {ok, State1};
         {[], []}   -> {ok, State1};
         {[], Cmds} -> {Cmds, State1};
         {[], Cmds} -> {Cmds, State1};
         {Packets, Cmds} ->
         {Packets, Cmds} ->
-            {Frame, State2} = handle_outgoing(Packets, State1),
-            {[Frame|Cmds], State2}
+            {Frames, State2} = handle_outgoing(Packets, State1),
+            {Frames ++ Cmds, State2}
     end.
     end.
 
 
 classify([], Packets, Cmds, Events) ->
 classify([], Packets, Cmds, Events) ->