Преглед изворни кода

Delegate serialize fun into sendfun

Gilbert Wong пре 7 година
родитељ
комит
763115e149
3 измењених фајлова са 14 додато и 8 уклоњено
  1. 9 2
      src/emqx_connection.erl
  2. 2 3
      src/emqx_protocol.erl
  3. 3 3
      src/emqx_ws_connection.erl

+ 9 - 2
src/emqx_connection.erl

@@ -141,7 +141,15 @@ init({Transport, RawSocket, Options}) ->
     ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
     EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
     IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
-    SendFun = fun(Data) -> Transport:async_send(Socket, Data) end,
+    SendFun = fun(Packet, SeriaOpts) ->
+                      Data = emqx_frame:serialize(Packet, SeriaOpts),
+                      case Transport:async_send(Socket, Data) of
+                          ok ->
+                              {ok, Data};
+                          {error, Reason} ->
+                              {error, Reason}
+                      end
+              end,
     ProtoState = emqx_protocol:init(#{peername => Peername,
                                       sockname => Sockname,
                                       peercert => Peercert,
@@ -484,4 +492,3 @@ shutdown(Reason, State) ->
 
 stop(Reason, State) ->
     {stop, Reason, State}.
-

+ 2 - 3
src/emqx_protocol.erl

@@ -683,9 +683,8 @@ deliver({disconnect, _ReasonCode}, PState) ->
 
 -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
 send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) ->
-    Data = emqx_frame:serialize(Packet, #{version => Ver}),
-    case Send(Data) of
-        ok ->
+    case Send(Packet, #{version => Ver}) of
+        {ok, Data} ->
             trace(send, Packet),
             emqx_metrics:sent(Packet),
             emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),

+ 3 - 3
src/emqx_ws_connection.erl

@@ -143,12 +143,13 @@ websocket_init(#state{request = Req, options = Options}) ->
                 idle_timeout = IdleTimout}}.
 
 send_fun(WsPid) ->
-    fun(Data) ->
+    fun(Packet, Options) ->
+        Data = emqx_frame:serialize(Packet, Options),
         BinSize = iolist_size(Data),
         emqx_pd:update_counter(send_cnt, 1),
         emqx_pd:update_counter(send_oct, BinSize),
         WsPid ! {binary, iolist_to_binary(Data)},
-        ok
+        {ok, Data}
     end.
 
 stat_fun() ->
@@ -305,4 +306,3 @@ shutdown(Reason, State) ->
 
 wsock_stats() ->
     [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].
-