Преглед изворни кода

Merge pull request #11454 from zhongwencool/huge-payload

fix: don't crash when debug huge payload
zhongwencool пре 2 година
родитељ
комит
b23da691c0

+ 1 - 0
apps/emqx/include/emqx_mqtt.hrl

@@ -680,6 +680,7 @@ end).
 -define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).
 
 -define(MAX_PAYLOAD_FORMAT_SIZE, 1024).
+-define(TRUNCATED_PAYLOAD_SIZE, 100).
 -define(MAX_PAYLOAD_FORMAT_LIMIT(Bin), (byte_size(Bin) =< ?MAX_PAYLOAD_FORMAT_SIZE)).
 
 -endif.

+ 21 - 7
apps/emqx/src/emqx_packet.erl

@@ -55,6 +55,8 @@
     format/2
 ]).
 
+-export([format_truncated_payload/3]).
+
 -define(TYPE_NAMES,
     {'CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL', 'PUBCOMP', 'SUBSCRIBE',
         'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK', 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH'}
@@ -614,21 +616,33 @@ format_password(undefined) -> "";
 format_password(<<>>) -> "";
 format_password(_Password) -> "******".
 
+format_payload(_, hidden) ->
+    "Payload=******";
 format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
     ["Payload=", unicode:characters_to_list(Payload)];
 format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
     ["Payload(hex)=", binary:encode_hex(Payload)];
-format_payload(_, hidden) ->
-    "Payload=******";
-format_payload(<<Part:100, _/binary>> = Payload, _) ->
+format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
     [
         "Payload=",
-        Part,
-        "... The ",
-        integer_to_list(byte_size(Payload) - 100),
-        " bytes of this log are truncated"
+        format_truncated_payload(Part, byte_size(Payload), Type)
     ].
 
+format_truncated_payload(Bin, Size, Type) ->
+    Bin2 =
+        case Type of
+            text -> Bin;
+            hex -> binary:encode_hex(Bin)
+        end,
+    unicode:characters_to_list(
+        [
+            Bin2,
+            "... The ",
+            integer_to_list(Size - ?TRUNCATED_PAYLOAD_SIZE),
+            " bytes of this log are truncated"
+        ]
+    ).
+
 i(true) -> 1;
 i(false) -> 0;
 i(I) when is_integer(I) -> I.

+ 2 - 7
apps/emqx/src/emqx_trace/emqx_trace_formatter.erl

@@ -76,13 +76,8 @@ format_payload(_, hidden) ->
 format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
     unicode:characters_to_list(Payload);
 format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload);
-format_payload(<<Part:100, _/binary>> = Payload, _) ->
-    [
-        Part,
-        "... The ",
-        integer_to_list(byte_size(Payload) - 100),
-        " bytes of this log are truncated"
-    ].
+format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
+    emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type).
 
 to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
 to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);

+ 54 - 0
apps/emqx/test/emqx_trace_SUITE.erl

@@ -311,6 +311,60 @@ t_client_event(_Config) ->
     ?assert(erlang:byte_size(Bin3) > 0),
     ok.
 
+t_client_huge_payload_truncated(_Config) ->
+    ClientId = <<"client-truncated1">>,
+    Now = erlang:system_time(second),
+    Name = <<"test_client_id_truncated1">>,
+    {ok, _} = emqx_trace:create([
+        {<<"name">>, Name},
+        {<<"type">>, clientid},
+        {<<"clientid">>, ClientId},
+        {<<"start_at">>, Now}
+    ]),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
+    {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
+    {ok, _} = emqtt:connect(Client),
+    emqtt:ping(Client),
+    NormalPayload = iolist_to_binary(lists:duplicate(1024, "x")),
+    ok = emqtt:publish(Client, <<"/test">>, #{}, NormalPayload, [{qos, 0}]),
+    HugePayload1 = iolist_to_binary(lists:duplicate(1025, "y")),
+    ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload1, [{qos, 0}]),
+    HugePayload2 = iolist_to_binary(lists:duplicate(1024 * 10, "y")),
+    ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload2, [{qos, 0}]),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
+    {ok, _} = emqx_trace:create([
+        {<<"name">>, <<"test_topic">>},
+        {<<"type">>, topic},
+        {<<"topic">>, <<"/test">>},
+        {<<"start_at">>, Now}
+    ]),
+    ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
+    {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
+    ok = emqtt:publish(Client, <<"/test">>, #{}, NormalPayload, [{qos, 0}]),
+    ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload1, [{qos, 0}]),
+    ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload2, [{qos, 0}]),
+    ok = emqtt:disconnect(Client),
+    ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
+    ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
+    {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
+    {ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
+    ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
+    ?assert(erlang:byte_size(Bin) > 1024),
+    ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
+    ?assert(erlang:byte_size(Bin3) > 1024),
+
+    %% Don't have format crash
+    CrashBin = <<"CRASH">>,
+    ?assertEqual(nomatch, binary:match(Bin, [CrashBin])),
+    ?assertEqual(nomatch, binary:match(Bin2, [CrashBin])),
+    ?assertEqual(nomatch, binary:match(Bin3, [CrashBin])),
+    %% have "this log are truncated" for huge payload
+    TruncatedLog = <<"this log are truncated">>,
+    ?assertNotEqual(nomatch, binary:match(Bin, [TruncatedLog])),
+    ?assertNotEqual(nomatch, binary:match(Bin2, [TruncatedLog])),
+    ?assertNotEqual(nomatch, binary:match(Bin3, [TruncatedLog])),
+    ok.
+
 t_get_log_filename(_Config) ->
     Now = erlang:system_time(second),
     Name = <<"name1">>,

+ 1 - 0
changes/ce/fix-11454.en.md

@@ -0,0 +1 @@
+Fixed crashing when debugging/tracing with large payloads(introduce when [#11279](https://github.com/emqx/emqx/pull/11279))