Преглед изворни кода

Merge pull request #1905 from emqx/Fix-for-mqtt-sn

Fix the init_proc_mng_policy bug
turtleDeng пре 7 година
родитељ
комит
0a337cbcb4
5 измењених фајлова са 12 додато и 14 уклоњено
  1. 2 2
      src/emqx_connection.erl
  2. 1 2
      src/emqx_frame.erl
  3. 4 3
      src/emqx_misc.erl
  4. 1 4
      src/emqx_protocol.erl
  5. 4 3
      src/emqx_ws_connection.erl

+ 2 - 2
src/emqx_connection.erl

@@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) ->
     esockd_rate_limit:new(Rate, Burst).
 
 send_fun(Transport, Socket, Peername) ->
-    fun(Data) ->
+    fun(Packet, Options) ->
+        Data = emqx_frame:serialize(Packet, Options),
         try Transport:async_send(Socket, Data) of
             ok ->
                 ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),
@@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
     ok = emqx_gc:inc(1, Oct);
 maybe_gc(_, _) ->
     ok.
-

+ 1 - 2
src/emqx_frame.erl

@@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
       WillQoS      : 2,
       WillFlag     : 1,
       CleanStart   : 1,
-      0            : 1,         
+      0            : 1,
       KeepAlive    : 16/big,
       Rest2/binary>> = Rest1,
 
@@ -634,4 +634,3 @@ fixqos(?PUBREL, 0)      -> 1;
 fixqos(?SUBSCRIBE, 0)   -> 1;
 fixqos(?UNSUBSCRIBE, 0) -> 1;
 fixqos(_Type, QoS)      -> QoS.
-

+ 4 - 3
src/emqx_misc.erl

@@ -62,9 +62,11 @@ proc_stats(Pid) ->
 
 -define(DISABLED, 0).
 
+init_proc_mng_policy(undefined) -> ok;
 init_proc_mng_policy(Zone) ->
-    #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy =
-        emqx_zone:get_env(Zone, force_shutdown_policy),
+    #{max_heap_size := MaxHeapSizeInBytes}
+        = ShutdownPolicy
+        = emqx_zone:get_env(Zone, force_shutdown_policy),
     MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize),
     _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded
     erlang:put(force_shutdown_policy, ShutdownPolicy),
@@ -106,4 +108,3 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
 proc_info(Key) ->
     {Key, Value} = erlang:process_info(self(), Key),
     Value.
-

+ 1 - 4
src/emqx_protocol.erl

@@ -586,13 +586,10 @@ deliver({disconnect, _ReasonCode}, PState) ->
 -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
 send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
     trace(send, Packet, PState),
-    case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of
+    case SendFun(Packet, #{version => Ver}) of
         ok ->
             emqx_metrics:sent(Packet),
             {ok, inc_stats(send, Type, PState)};
-        {binary, _Data} ->
-            emqx_metrics:sent(Packet),
-            {ok, inc_stats(send, Type, PState)};
         {error, Reason} ->
             {error, Reason}
     end.

+ 4 - 3
src/emqx_ws_connection.erl

@@ -144,12 +144,14 @@ websocket_init(#state{request = Req, options = Options}) ->
                 idle_timeout = IdleTimout}}.
 
 send_fun(WsPid) ->
-    fun(Data) ->
+    fun(Packet, Options) ->
+        Data = emqx_frame:serialize(Packet, Options),
         BinSize = iolist_size(Data),
         emqx_metrics:inc('bytes/sent', BinSize),
         put(send_oct, get(send_oct) + BinSize),
         put(send_cnt, get(send_cnt) + 1),
-        WsPid ! {binary, iolist_to_binary(Data)}
+        WsPid ! {binary, iolist_to_binary(Data)},
+        ok
     end.
 
 stat_fun() ->
@@ -299,4 +301,3 @@ stop(Error, State) ->
 
 wsock_stats() ->
     [{Key, get(Key)} || Key <- ?SOCK_STATS].
-