chengshq 1 год назад
Родитель
Сommit
75dab4dff0

+ 4 - 1
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -710,7 +710,10 @@ parse_incoming(
 ) ->
     try FrameMod:parse(Data, ParseState) of
         {more, NParseState} ->
-            {Packets, State#state{parse_state = NParseState}};
+            if
+                Data == <<$\n>> -> {[Data], State#state{parse_state = NParseState}};
+                true -> {Packets, State#state{parse_state = NParseState}}
+            end;
         {ok, Packet, Rest, NParseState} ->
             NState = State#state{parse_state = NParseState},
             parse_incoming(Rest, [Packet | Packets], NState)

+ 9 - 13
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -653,6 +653,12 @@ handle_in(
                 ]
         end,
     {ok, Outgoings, Channel};
+handle_in(
+    <<$\n>>,
+    Channel = #channel{heartbeat = Heartbeat}
+) ->
+    NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat),
+    {ok, Channel#channel{heartbeat = NewHeartbeat}};
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
@@ -1116,19 +1122,9 @@ handle_timeout(
     {keepalive_send, NewVal},
     Channel = #channel{heartbeat = HrtBt}
 ) ->
-    case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
-        {error, timeout} ->
-            NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt),
-            NChannel = Channel#channel{heartbeat = NHrtBt},
-            {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)},
-                reset_timer(outgoing_timer, NChannel)};
-        {ok, NHrtBt} ->
-            {ok,
-                reset_timer(
-                    outgoing_timer,
-                    Channel#channel{heartbeat = NHrtBt}
-                )}
-    end;
+    NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt),
+    NChannel = Channel#channel{heartbeat = NHrtBt},
+    {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, reset_timer(outgoing_timer, NChannel)};
 handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) ->
     Now = erlang:system_time(millisecond),
     NTrans = maps:filter(

+ 3 - 1
apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl

@@ -346,7 +346,9 @@ serialize_pkt(
 serialize_pkt(header, {Name, Val}) when is_integer(Val) ->
     [escape(Name), ?COLON, integer_to_list(Val), ?LF];
 serialize_pkt(header, {Name, Val}) ->
-    [escape(Name), ?COLON, escape(Val), ?LF].
+    [escape(Name), ?COLON, escape(Val), ?LF];
+serialize_pkt(<<$\n>>, _SerializeOpts) ->
+    <<$\n>>.
 
 escape(Bin) when is_binary(Bin) ->
     <<<<(escape(Ch))/binary>> || <<Ch>> <= Bin>>;

+ 1 - 1
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -153,7 +153,7 @@ t_heartbeat(_) ->
                     {<<"host">>, <<"127.0.0.1:61613">>},
                     {<<"login">>, <<"guest">>},
                     {<<"passcode">>, <<"guest">>},
-                    {<<"heart-beat">>, <<"1000,800">>}
+                    {<<"heart-beat">>, <<"500,800">>}
                 ]
             )
         ),