Browse Source

fix(trace): make sure that the payload encode works with nested payloads

This commit makes sure that the trace setting for payload encode works
even when the payload is in a nested structure or when the payload key
is a binary instead of an atom.

Fixes:
https://emqx.atlassian.net/browse/EMQX-12424
Kjell Winblad 1 year ago
parent
commit
fb7688ab94

+ 18 - 2
apps/emqx/src/emqx_trace/emqx_trace_formatter.erl

@@ -48,7 +48,11 @@ format_meta_map(Meta) ->
     format_meta_map(Meta, Encode).
 
 format_meta_map(Meta, Encode) ->
-    format_meta_map(Meta, Encode, [{packet, fun format_packet/2}, {payload, fun format_payload/2}]).
+    format_meta_map(Meta, Encode, [
+        {packet, fun format_packet/2},
+        {payload, fun format_payload/2},
+        {<<"payload">>, fun format_payload/2}
+    ]).
 
 format_meta_map(Meta, _Encode, []) ->
     Meta;
@@ -61,9 +65,21 @@ format_meta_map(Meta, Encode, [{Name, FormatFun} | Rest]) ->
             format_meta_map(Meta, Encode, Rest)
     end.
 
+format_meta_data(Meta0, Encode) when is_map(Meta0) ->
+    Meta1 = format_meta_map(Meta0, Encode),
+    maps:map(fun(_K, V) -> format_meta_data(V, Encode) end, Meta1);
+format_meta_data(Meta, Encode) when is_list(Meta) ->
+    [format_meta_data(Item, Encode) || Item <- Meta];
+format_meta_data(Meta, Encode) when is_tuple(Meta) ->
+    List = erlang:tuple_to_list(Meta),
+    FormattedList = [format_meta_data(Item, Encode) || Item <- List],
+    erlang:list_to_tuple(FormattedList);
+format_meta_data(Meta, _Encode) ->
+    Meta.
+
 format_meta(Meta0, Encode) ->
     Meta1 = maps:without([msg, clientid, peername, trace_tag], Meta0),
-    Meta2 = format_meta_map(Meta1, Encode),
+    Meta2 = format_meta_data(Meta1, Encode),
     kvs_to_iolist(lists:sort(fun compare_meta_kvs/2, maps:to_list(Meta2))).
 
 %% packet always goes first; payload always goes last

+ 15 - 7
apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl

@@ -42,7 +42,7 @@ format(
     %% an external call to create the JSON text
     Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
     LogMap2 = LogMap1#{time => Time},
-    LogMap3 = prepare_log_map(LogMap2, PEncode),
+    LogMap3 = prepare_log_data(LogMap2, PEncode),
     [emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"].
 
 %%%-----------------------------------------------------------------
@@ -85,9 +85,17 @@ do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) -
 do_maybe_format_msg(Msg, Meta, Config) ->
     emqx_logger_jsonfmt:format_msg(Msg, Meta, Config).
 
-prepare_log_map(LogMap, PEncode) ->
+prepare_log_data(LogMap, PEncode) when is_map(LogMap) ->
     NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
-    maps:from_list(NewKeyValuePairs).
+    maps:from_list(NewKeyValuePairs);
+prepare_log_data(V, PEncode) when is_list(V) ->
+    [prepare_log_data(Item, PEncode) || Item <- V];
+prepare_log_data(V, PEncode) when is_tuple(V) ->
+    List = erlang:tuple_to_list(V),
+    PreparedList = [prepare_log_data(Item, PEncode) || Item <- List],
+    erlang:list_to_tuple(PreparedList);
+prepare_log_data(V, _PEncode) ->
+    V.
 
 prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when
     is_integer(I1),
@@ -118,6 +126,8 @@ prepare_key_value(payload = K, V, PEncode) ->
                 V
         end,
     {K, NewV};
+prepare_key_value(<<"payload">>, V, PEncode) ->
+    prepare_key_value(payload, V, PEncode);
 prepare_key_value(packet = K, V, PEncode) ->
     NewV =
         try
@@ -167,10 +177,8 @@ prepare_key_value(action_id = K, V, _PEncode) ->
         _:_ ->
             {K, V}
     end;
-prepare_key_value(K, V, PEncode) when is_map(V) ->
-    {K, prepare_log_map(V, PEncode)};
-prepare_key_value(K, V, _PEncode) ->
-    {K, V}.
+prepare_key_value(K, V, PEncode) ->
+    {K, prepare_log_data(V, PEncode)}.
 
 format_packet(undefined, _) -> "";
 format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).

+ 21 - 11
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -91,13 +91,16 @@ end_per_testcase(_TestCase, _Config) ->
     ok.
 
 t_basic_apply_rule_trace_ruleid(Config) ->
-    basic_apply_rule_test_helper(get_action(Config), ruleid, false).
+    basic_apply_rule_test_helper(get_action(Config), ruleid, false, text).
+
+t_basic_apply_rule_trace_ruleid_hidden_payload(Config) ->
+    basic_apply_rule_test_helper(get_action(Config), ruleid, false, hidden).
 
 t_basic_apply_rule_trace_clientid(Config) ->
-    basic_apply_rule_test_helper(get_action(Config), clientid, false).
+    basic_apply_rule_test_helper(get_action(Config), clientid, false, text).
 
 t_basic_apply_rule_trace_ruleid_stop_after_render(Config) ->
-    basic_apply_rule_test_helper(get_action(Config), ruleid, true).
+    basic_apply_rule_test_helper(get_action(Config), ruleid, true, text).
 
 get_action(Config) ->
     case ?config(group_name, Config) of
@@ -135,10 +138,10 @@ republish_action() ->
 console_print_action() ->
     #{<<"function">> => <<"console">>}.
 
-basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
+basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
     %% Create Rule
     RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
-    SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
+    SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,
     {ok, #{<<"id">> := RuleId}} =
         emqx_bridge_testlib:create_rule_and_action(
             Action,
@@ -157,12 +160,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
             clientid ->
                 ClientId
         end,
-    create_trace(TraceName, TraceType, TraceValue),
+    create_trace(TraceName, TraceType, TraceValue, PayloadEncode),
     %% ===================================
     Context = #{
         clientid => ClientId,
         event_type => message_publish,
-        payload => <<"{\"msg\": \"hello\"}">>,
+        payload => <<"{\"msg\": \"my_payload_msg\"}">>,
         qos => 1,
         topic => RuleTopic,
         username => <<"u_emqx">>
@@ -179,6 +182,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
         begin
             Bin = read_rule_trace_file(TraceName, TraceType, Now),
             io:format("THELOG:~n~s", [Bin]),
+            case PayloadEncode of
+                hidden ->
+                    ?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]));
+                text ->
+                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]))
+            end,
             ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
             ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])),
             case Action of
@@ -273,7 +282,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) ->
 do_final_log_check(_, _) ->
     ok.
 
-create_trace(TraceName, TraceType, TraceValue) ->
+create_trace(TraceName, TraceType, TraceValue, PayloadEncode) ->
     Now = erlang:system_time(second) - 10,
     Start = Now,
     End = Now + 60,
@@ -283,7 +292,8 @@ create_trace(TraceName, TraceType, TraceValue) ->
         TraceType => TraceValue,
         start_at => Start,
         end_at => End,
-        formatter => json
+        formatter => json,
+        payload_encode => PayloadEncode
     },
     {ok, _} = CreateRes = emqx_trace:create(Trace),
     emqx_common_test_helpers:on_exit(fun() ->
@@ -323,7 +333,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
         ?FUNCTION_NAME,
         SQL
     ),
-    create_trace(Name, ruleid, RuleID),
+    create_trace(Name, ruleid, RuleID, text),
     Now = erlang:system_time(second) - 10,
     %% Stop
     ParmsStopAfterRender = apply_rule_parms(true, Name),
@@ -588,7 +598,7 @@ do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun)
         ?FUNCTION_NAME,
         SQL
     ),
-    create_trace(Name, ruleid, RuleID),
+    create_trace(Name, ruleid, RuleID, text),
     Now = erlang:system_time(second) - 10,
     %% Stop
     ParmsNoStopAfterRender = apply_rule_parms(false, Name),