Przeglądaj źródła

fix(frame): safely serializing and structured log.

JimMoen 4 lat temu
rodzic
commit
c1ff30896a

+ 4 - 2
apps/emqx/include/emqx_mqtt.hrl

@@ -542,7 +542,9 @@
 -define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
 -define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
 -define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end).
 -define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end).
 
 
--define(FRAME_ERROR(Reason), {frame_error, Reason}).
--define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_ERROR(Reason))).
+-define(FRAME_PARSE_ERROR(Reason), {frame_parse_error, Reason}).
+-define(FRAME_SERIALIZE_ERROR(Reason), {frame_serialize_error, Reason}).
+-define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_PARSE_ERROR(Reason))).
+-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw(?FRAME_SERIALIZE_ERROR(Reason))).
 
 
 -endif.
 -endif.

+ 19 - 8
apps/emqx/src/emqx_connection.erl

@@ -644,7 +644,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
             NState = State#state{parse_state = NParseState},
             NState = State#state{parse_state = NParseState},
             parse_incoming(Rest, [Packet|Packets], NState)
             parse_incoming(Rest, [Packet|Packets], NState)
     catch
     catch
-        throw : ?FRAME_ERROR(Reason) ->
+        throw : ?FRAME_PARSE_ERROR(Reason) ->
             ?SLOG(info, #{ reason => Reason
             ?SLOG(info, #{ reason => Reason
                          , at_state => emqx_frame:describe_state(ParseState)
                          , at_state => emqx_frame:describe_state(ParseState)
                          , input_bytes => Data
                          , input_bytes => Data
@@ -652,12 +652,12 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
                          }),
                          }),
             {[{frame_error, Reason} | Packets], State};
             {[{frame_error, Reason} | Packets], State};
         error : Reason : Stacktrace ->
         error : Reason : Stacktrace ->
-            ?SLOG(info, #{ at_state => emqx_frame:describe_state(ParseState)
-                         , input_bytes => Data
-                         , parsed_packets => Packets
-                         , exception => Reason
-                         , stacktrace => Stacktrace
-                         }),
+            ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
+                          , input_bytes => Data
+                          , parsed_packets => Packets
+                          , exception => Reason
+                          , stacktrace => Stacktrace
+                          }),
             {[{frame_error, Reason} | Packets], State}
             {[{frame_error, Reason} | Packets], State}
     end.
     end.
 
 
@@ -707,7 +707,7 @@ handle_outgoing(Packet, State) ->
 
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
     fun(Packet) ->
-        case emqx_frame:serialize_pkt(Packet, Serialize) of
+        try emqx_frame:serialize_pkt(Packet, Serialize) of
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
                          [emqx_packet:format(Packet)]),
                          [emqx_packet:format(Packet)]),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
@@ -716,6 +716,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
             Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
             Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
                     ok = inc_outgoing_stats(Packet),
                     ok = inc_outgoing_stats(Packet),
                     Data
                     Data
+        catch
+            %% Maybe Never happen.
+            throw : ?FRAME_SERIALIZE_ERROR(Reason) ->
+                ?SLOG(info, #{ reason => Reason
+                             , input_packet => Packet}),
+                erlang:error(?FRAME_SERIALIZE_ERROR(Reason));
+            error : Reason : Stacktrace ->
+                ?SLOG(error, #{ input_packet => Packet
+                              , exception => Reason
+                              , stacktrace => Stacktrace}),
+                erlang:raise(error, Reason, Stacktrace)
         end
         end
     end.
     end.
 
 

+ 2 - 1
apps/emqx/src/emqx_frame.erl

@@ -74,6 +74,7 @@
          }).
          }).
 
 
 -define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)).
 -define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)).
+-define(SERIALIZE_ERR(Reason), ?THROW_SERIALIZE_ERROR(Reason)).
 
 
 -define(MULTIPLIER_MAX, 16#200000).
 -define(MULTIPLIER_MAX, 16#200000).
 
 
@@ -781,7 +782,7 @@ serialize_binary_data(Bin) ->
     [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
     [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
 
 
 serialize_utf8_string(undefined, false) ->
 serialize_utf8_string(undefined, false) ->
-    ?PARSE_ERR(utf8_string_undefined);
+    ?SERIALIZE_ERR(utf8_string_undefined);
 serialize_utf8_string(undefined, true) ->
 serialize_utf8_string(undefined, true) ->
     <<>>;
     <<>>;
 serialize_utf8_string(String, _AllowNull) ->
 serialize_utf8_string(String, _AllowNull) ->

+ 18 - 7
apps/emqx/src/emqx_ws_connection.erl

@@ -547,7 +547,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
             NState = State#state{parse_state = NParseState},
             NState = State#state{parse_state = NParseState},
             parse_incoming(Rest, postpone({incoming, Packet}, NState))
             parse_incoming(Rest, postpone({incoming, Packet}, NState))
     catch
     catch
-        throw : ?FRAME_ERROR(Reason) ->
+        throw : ?FRAME_PARSE_ERROR(Reason) ->
             ?SLOG(info, #{ reason => Reason
             ?SLOG(info, #{ reason => Reason
                          , at_state => emqx_frame:describe_state(ParseState)
                          , at_state => emqx_frame:describe_state(ParseState)
                          , input_bytes => Data
                          , input_bytes => Data
@@ -555,11 +555,11 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
             FrameError = {frame_error, Reason},
             FrameError = {frame_error, Reason},
             postpone({incoming, FrameError}, State);
             postpone({incoming, FrameError}, State);
         error : Reason : Stacktrace ->
         error : Reason : Stacktrace ->
-            ?SLOG(info, #{ at_state => emqx_frame:describe_state(ParseState)
-                         , input_bytes => Data
-                         , exception => Reason
-                         , stacktrace => Stacktrace
-                         }),
+            ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
+                          , input_bytes => Data
+                          , exception => Reason
+                          , stacktrace => Stacktrace
+                          }),
             FrameError = {frame_error, Reason},
             FrameError = {frame_error, Reason},
             postpone({incoming, FrameError}, State)
             postpone({incoming, FrameError}, State)
     end.
     end.
@@ -627,7 +627,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
 
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
     fun(Packet) ->
-        case emqx_frame:serialize_pkt(Packet, Serialize) of
+        try emqx_frame:serialize_pkt(Packet, Serialize) of
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
                          [emqx_packet:format(Packet)]),
                          [emqx_packet:format(Packet)]),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
@@ -636,6 +636,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
             Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
             Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
                     ok = inc_outgoing_stats(Packet),
                     ok = inc_outgoing_stats(Packet),
                     Data
                     Data
+        catch
+            %% Maybe Never happen.
+            throw : ?FRAME_SERIALIZE_ERROR(Reason) ->
+                ?SLOG(info, #{ reason => Reason
+                             , input_packet => Packet}),
+                erlang:error(?FRAME_SERIALIZE_ERROR(Reason));
+            error : Reason : Stacktrace ->
+                ?SLOG(error, #{ input_packet => Packet
+                              , exception => Reason
+                              , stacktrace => Stacktrace}),
+                erlang:raise(error, Reason, Stacktrace)
         end
         end
     end.
     end.
 
 

+ 1 - 2
apps/emqx/test/emqx_frame_SUITE.erl

@@ -24,7 +24,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 
 
 -define(ASSERT_FRAME_THROW(Reason, Expr),
 -define(ASSERT_FRAME_THROW(Reason, Expr),
-        ?assertThrow(?FRAME_ERROR(Reason), Expr)).
+        ?assertThrow(?FRAME_PARSE_ERROR(Reason), Expr)).
 
 
 all() ->
 all() ->
     [{group, parse},
     [{group, parse},
@@ -552,4 +552,3 @@ parse_to_packet(Bin, Opts) ->
     Packet.
     Packet.
 
 
 payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
 payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
-