Explorar el Código

Merge pull request #12920 from cisiqo/patch-6

fix: STOMP heartbeat
JianBo He hace 1 año
padre
commit
4f341bef8b

+ 10 - 13
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -662,6 +662,13 @@ handle_in(
                 ]
                 ]
         end,
         end,
     {ok, Outgoings, Channel};
     {ok, Outgoings, Channel};
+handle_in(
+    ?PACKET(?CMD_HEARTBEAT),
+    Channel = #channel{heartbeat = Heartbeat}
+) ->
+    NewVal = emqx_pd:get_counter(recv_pkt),
+    NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, NewVal, Heartbeat),
+    {ok, Channel#channel{heartbeat = NewHeartbeat}};
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
     shutdown(Reason, Channel);
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
@@ -1125,19 +1132,9 @@ handle_timeout(
     {keepalive_send, NewVal},
     {keepalive_send, NewVal},
     Channel = #channel{heartbeat = HrtBt}
     Channel = #channel{heartbeat = HrtBt}
 ) ->
 ) ->
-    case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
-        {error, timeout} ->
-            NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt),
-            NChannel = Channel#channel{heartbeat = NHrtBt},
-            {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)},
-                reset_timer(outgoing_timer, NChannel)};
-        {ok, NHrtBt} ->
-            {ok,
-                reset_timer(
-                    outgoing_timer,
-                    Channel#channel{heartbeat = NHrtBt}
-                )}
-    end;
+    NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt),
+    NChannel = Channel#channel{heartbeat = NHrtBt},
+    {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, reset_timer(outgoing_timer, NChannel)};
 handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) ->
 handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) ->
     Now = erlang:system_time(millisecond),
     Now = erlang:system_time(millisecond),
     NTrans = maps:filter(
     NTrans = maps:filter(

+ 3 - 0
apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl

@@ -140,6 +140,9 @@ g(Key, Opts, Val) ->
 -spec parse(binary(), parse_state()) -> parse_result().
 -spec parse(binary(), parse_state()) -> parse_result().
 parse(<<>>, Parser) ->
 parse(<<>>, Parser) ->
     {more, Parser};
     {more, Parser};
+%% treat the \n as a heartbeat frame
+parse(<<$\n>>, Parser = #{phase := none}) ->
+    {ok, #stomp_frame{command = ?CMD_HEARTBEAT}, <<>>, Parser};
 parse(Bytes, #{phase := body, length := Len, state := State}) ->
 parse(Bytes, #{phase := body, length := Len, state := State}) ->
     parse(body, Bytes, State, Len);
     parse(body, Bytes, State, Len);
 parse(<<?LF, Bytes/binary>>, #{phase := hdname, state := State}) ->
 parse(<<?LF, Bytes/binary>>, #{phase := hdname, state := State}) ->

+ 1 - 1
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -190,7 +190,7 @@ t_heartbeat(_) ->
                     {<<"host">>, <<"127.0.0.1:61613">>},
                     {<<"host">>, <<"127.0.0.1:61613">>},
                     {<<"login">>, <<"guest">>},
                     {<<"login">>, <<"guest">>},
                     {<<"passcode">>, <<"guest">>},
                     {<<"passcode">>, <<"guest">>},
-                    {<<"heart-beat">>, <<"1000,800">>}
+                    {<<"heart-beat">>, <<"500,800">>}
                 ]
                 ]
             )
             )
         ),
         ),