Просмотр исходного кода

fix(stomp): fix the sticky tcp stream parsing

JianBo He 4 лет назад
Родитель
Сommit
fd88f484d2
1 измененных файлов с 9 добавлено и 1 удалено
  1. 9 1
      apps/emqx_stomp/src/emqx_stomp_frame.erl

+ 9 - 1
apps/emqx_stomp/src/emqx_stomp_frame.erl

@@ -123,6 +123,8 @@ parse(<<>>, Parser) ->
 
 parse(Bytes, #{phase := body, length := Len, state := State}) ->
     parse(body, Bytes, State, Len);
+parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
+    parse(Phase, Bytes, State);
 
 parse(Bytes, Parser = #{pre := Pre}) ->
     parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
@@ -153,6 +155,8 @@ parse(command, <<?LF, Rest/binary>>, State = #parser_state{acc = Acc}) ->
     parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
 parse(command, <<Ch:8, Rest/binary>>, State) ->
     parse(command, Rest, acc(Ch, State));
+parse(command, <<>>, State) ->
+    {more, #{phase => command, state => State}};
 
 parse(headers, <<?LF, Rest/binary>>, State) ->
     parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@@ -165,11 +169,15 @@ parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
     parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
 parse(hdname, <<Ch:8, Rest/binary>>, State) ->
     parse(hdname, Rest, acc(Ch, State));
+parse(hdname, <<>>, State) ->
+    {more, #{phase => hdname, state => State}};
 
 parse(hdvalue, <<?LF, Rest/binary>>, State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
     parse(headers, Rest, State#parser_state{headers = add_header(Name, Acc, Headers), hdname = undefined, acc = <<>>});
 parse(hdvalue, <<Ch:8, Rest/binary>>, State) ->
-    parse(hdvalue, Rest, acc(Ch, State)).
+    parse(hdvalue, Rest, acc(Ch, State));
+parse(hdvalue, <<>>, State) ->
+    {more, #{phase => hdvalue, state => State}}.
 
 %% @private
 parse(body, <<>>, State, Length) ->