Explorar el Código

Merge pull request #13053 from kjellwinblad/kjell/fix_rule_trace_issues/EMQX-12327/EMQX-12335/EMQX-12336

Fix small action trace issues
Kjell Winblad hace 1 año
padre
commit
8279d8c787

+ 11 - 1
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -221,7 +221,7 @@ best_effort_unicode(Input, Config) ->
 
 best_effort_json_obj(List, Config) when is_list(List) ->
     try
-        json_obj(maps:from_list(List), Config)
+        json_obj(convert_tuple_list_to_map(List), Config)
     catch
         _:_ ->
             [json(I, Config) || I <- List]
@@ -234,6 +234,16 @@ best_effort_json_obj(Map, Config) ->
             do_format_msg("~p", [Map], Config)
     end.
 
+%% This function will raise an error if the list does not contain only
+%% key-value tuples or if there are duplicate keys.
+convert_tuple_list_to_map(List) ->
+    %% Crash if this is not a tuple list
+    CandidateMap = maps:from_list(List),
+    %% Crash if there are duplicates
+    NumberOfItems = length(List),
+    NumberOfItems = maps:size(CandidateMap),
+    CandidateMap.
+
 json(A, _) when is_atom(A) -> A;
 json(I, _) when is_integer(I) -> I;
 json(F, _) when is_float(F) -> F;

+ 5 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -12,6 +12,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 
 %% schema
 -export([roots/0, fields/1, desc/1, namespace/0]).
@@ -273,11 +274,14 @@ do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) ->
             _ -> none
         end,
     emqx_trace:rendered_action_template(ChannelID, #{
-        cqls => CQLs
+        cqls => #emqx_trace_format_func_data{data = CQLs, function = fun trace_format_cql_tuples/1}
     }),
     Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
     handle_result(Res).
 
+trace_format_cql_tuples(Tuples) ->
+    [CQL || {_, CQL} <- Tuples].
+
 parse_request_to_cql({query, CQL}) ->
     {query, CQL, #{}};
 parse_request_to_cql({query, CQL, Params}) ->

+ 2 - 2
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl

@@ -422,7 +422,7 @@ is_auth_key(_) ->
 %% -------------------------------------------------------------------------------------------------
 %% Query
 do_query(InstId, Channel, Client, Points) ->
-    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}),
+    emqx_trace:rendered_action_template(Channel, #{points => Points}),
     case greptimedb:write_batch(Client, Points) of
         {ok, #{response := {affected_rows, #{value := Rows}}}} ->
             ?SLOG(debug, #{
@@ -465,7 +465,7 @@ do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
         connector => InstId,
         points => Points
     }),
-    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}),
+    emqx_trace:rendered_action_template(Channel, #{points => Points}),
     WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
     ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
 

+ 2 - 2
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -119,7 +119,7 @@ on_query(
             ),
             emqx_trace:rendered_action_template(
                 MessageTag,
-                #{command => Cmd, batch => false, mode => sync}
+                #{command => Cmd, batch => false}
             ),
             Result = query(InstId, {cmd, Cmd}, RedisConnSt),
             ?tp(
@@ -143,7 +143,7 @@ on_batch_query(
             [{ChannelID, _} | _] = BatchData,
             emqx_trace:rendered_action_template(
                 ChannelID,
-                #{commands => Cmds, batch => ture, mode => sync}
+                #{commands => Cmds, batch => ture}
             ),
             Result = query(InstId, {cmds, Cmds}, RedisConnSt),
             ?tp(

+ 29 - 0
apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl

@@ -271,6 +271,16 @@ t_http_test_json_formatter(_Config) ->
     }),
     %% We should handle report style logging
     ?SLOG(error, #{msg => "recursive_republish_detected"}, #{topic => Topic}),
+    ?TRACE("CUSTOM", "my_log_msg", #{
+        topic => Topic,
+        %% This will be converted to a map
+        map_key => [{a, a}, {b, b}]
+    }),
+    ?TRACE("CUSTOM", "my_log_msg", #{
+        topic => Topic,
+        %% We should not convert this to a map as we will lose information
+        map_key => [{a, a}, {a, b}]
+    }),
     ok = emqx_trace_handler_SUITE:filesync(Name, topic),
     {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
     {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
@@ -425,6 +435,25 @@ t_http_test_json_formatter(_Config) ->
         },
         NextFun()
     ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{
+                <<"map_key">> := #{
+                    <<"a">> := <<"a">>,
+                    <<"b">> := <<"b">>
+                }
+            }
+        },
+        NextFun()
+    ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{
+                <<"map_key">> := [_, _]
+            }
+        },
+        NextFun()
+    ),
     {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
     ?assertEqual(<<>>, Delete),
 

+ 7 - 1
apps/emqx_mysql/src/emqx_mysql.erl

@@ -507,7 +507,13 @@ on_sql_query(
     LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
     ?TRACE("QUERY", "mysql_connector_received", LogMeta),
     ChannelID = maps:get(channel_id, State, no_channel),
-    emqx_trace:rendered_action_template(ChannelID, #{sql => SQLOrKey}),
+    emqx_trace:rendered_action_template(
+        ChannelID,
+        #{
+            sql_or_key => SQLOrKey,
+            parameters => Params
+        }
+    ),
     Worker = ecpool:get_client(PoolName),
     case ecpool_worker:client(Worker) of
         {ok, Conn} ->

+ 23 - 20
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1174,12 +1174,13 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
         {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
             {error, {unrecoverable_error, unhealthy_target}};
         {ok, _Group, Resource} ->
+            PrevLoggerProcessMetadata = logger:get_process_metadata(),
             QueryResult =
                 try
                     set_rule_id_trace_meta_data(Query),
                     do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource)
                 after
-                    unset_rule_id_trace_meta_data()
+                    reset_logger_process_metadata(PrevLoggerProcessMetadata)
                 end,
             QueryResult;
         {error, not_found} ->
@@ -1190,27 +1191,37 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     %% Get the rule ids from requests
     RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
     ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
-    RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
-    StopAfterRenderVal =
+    RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
+    RuleTriggerTimes = lists:flatten(RuleTriggerTimes0),
+    TraceMetadata =
         case Requests of
             %% We know that the batch is not mixed since we prevent this by
             %% using a stop_after function in the replayq:pop call
             [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] ->
-                true;
+                #{
+                    rule_ids => RuleIDs,
+                    client_ids => ClientIDs,
+                    rule_trigger_ts => RuleTriggerTimes,
+                    stop_action_after_render => true
+                };
             [?QUERY(_, _, _, _, _TraceCTX) | _] ->
-                false
+                #{
+                    rule_ids => RuleIDs,
+                    client_ids => ClientIDs,
+                    rule_trigger_ts => RuleTriggerTimes
+                }
         end,
-    logger:update_process_metadata(#{
-        rule_ids => RuleIDs,
-        client_ids => ClientIDs,
-        rule_trigger_times => RuleTriggerTimes,
-        stop_action_after_render => StopAfterRenderVal
-    }),
+    logger:update_process_metadata(TraceMetadata),
     ok;
 set_rule_id_trace_meta_data(Request) ->
     set_rule_id_trace_meta_data([Request]),
     ok.
 
+reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
+    logger:unset_process_metadata();
+reset_logger_process_metadata(PrevProcessMetadata) ->
+    logger:set_process_metadata(PrevProcessMetadata).
+
 collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) ->
     Acc#{RuleId => true};
 collect_rule_id(?QUERY(_, _, _, _, _), Acc) ->
@@ -1221,19 +1232,11 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
 collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
-collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) ->
+collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc) ->
     [Time | Acc];
 collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
-unset_rule_id_trace_meta_data() ->
-    logger:update_process_metadata(#{
-        rule_ids => #{},
-        client_ids => #{},
-        stop_action_after_render => false,
-        rule_trigger_times => []
-    }).
-
 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
 extract_connector_id(Id) when is_binary(Id) ->
     case binary:split(Id, <<":">>, [global]) of

+ 49 - 32
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -70,6 +70,7 @@ apply_rule_discard_result(Rule, Columns, Envs) ->
     ok.
 
 apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
+    PrevProcessMetadata = logger:get_process_metadata(),
     set_process_trace_metadata(RuleID, Columns),
     trace_rule_sql(
         "rule_activated",
@@ -137,21 +138,26 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
             ),
             {error, {Error, StkTrace}}
     after
-        reset_process_trace_metadata(Columns)
+        reset_logger_process_metadata(PrevProcessMetadata)
     end.
 
 set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
     logger:update_process_metadata(#{
         clientid => ClientID,
         rule_id => RuleID,
-        rule_trigger_time => rule_trigger_time(Columns)
+        rule_trigger_ts => [rule_trigger_time(Columns)]
     });
 set_process_trace_metadata(RuleID, Columns) ->
     logger:update_process_metadata(#{
         rule_id => RuleID,
-        rule_trigger_time => rule_trigger_time(Columns)
+        rule_trigger_ts => [rule_trigger_time(Columns)]
     }).
 
+reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
+    logger:unset_process_metadata();
+reset_logger_process_metadata(PrevProcessMetadata) ->
+    logger:set_process_metadata(PrevProcessMetadata).
+
 rule_trigger_time(Columns) ->
     case Columns of
         #{timestamp := Timestamp} ->
@@ -160,18 +166,6 @@ rule_trigger_time(Columns) ->
             erlang:system_time(millisecond)
     end.
 
-reset_process_trace_metadata(#{clientid := _ClientID}) ->
-    Meta = logger:get_process_metadata(),
-    Meta1 = maps:remove(clientid, Meta),
-    Meta2 = maps:remove(rule_id, Meta1),
-    Meta3 = maps:remove(rule_trigger_time, Meta2),
-    logger:set_process_metadata(Meta3);
-reset_process_trace_metadata(_) ->
-    Meta = logger:get_process_metadata(),
-    Meta1 = maps:remove(rule_id, Meta),
-    Meta2 = maps:remove(rule_trigger_time, Meta1),
-    logger:set_process_metadata(Meta2).
-
 do_apply_rule(
     #{
         id := RuleId,
@@ -528,30 +522,40 @@ do_handle_action_get_trace_inc_metrics_context(RuleID, Action) ->
     end.
 
 do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta) ->
-    StopAfterRender = maps:get(stop_action_after_render, TraceMeta, false),
+    StopAfterRenderMap =
+        case maps:get(stop_action_after_render, TraceMeta, false) of
+            false ->
+                #{};
+            true ->
+                #{stop_action_after_render => true}
+        end,
     case TraceMeta of
         #{
             rule_id := RuleID,
             clientid := ClientID,
-            rule_trigger_time := Timestamp
+            rule_trigger_ts := Timestamp
         } ->
-            #{
-                rule_id => RuleID,
-                clientid => ClientID,
-                action_id => Action,
-                stop_action_after_render => StopAfterRender,
-                rule_trigger_time => Timestamp
-            };
+            maps:merge(
+                #{
+                    rule_id => RuleID,
+                    clientid => ClientID,
+                    action_id => Action,
+                    rule_trigger_ts => Timestamp
+                },
+                StopAfterRenderMap
+            );
         #{
             rule_id := RuleID,
-            rule_trigger_time := Timestamp
+            rule_trigger_ts := Timestamp
         } ->
-            #{
-                rule_id => RuleID,
-                action_id => Action,
-                stop_action_after_render => StopAfterRender,
-                rule_trigger_time => Timestamp
-            }
+            maps:merge(
+                #{
+                    rule_id => RuleID,
+                    action_id => Action,
+                    rule_trigger_ts => Timestamp
+                },
+                StopAfterRenderMap
+            )
     end.
 
 action_info({bridge, BridgeType, BridgeName, _ResId}) ->
@@ -740,7 +744,20 @@ nested_put(Alias, Val, Columns0) ->
     emqx_rule_maps:nested_put(Alias, Val, Columns).
 
 inc_action_metrics(TraceCtx, Result) ->
-    _ = do_inc_action_metrics(TraceCtx, Result),
+    SavedMetaData = logger:get_process_metadata(),
+    try
+        %% To not pollute the trace we temporarily remove the process metadata
+        logger:unset_process_metadata(),
+        _ = do_inc_action_metrics(TraceCtx, Result)
+    after
+        %% Setting process metadata to undefined yields an error
+        case SavedMetaData of
+            undefined ->
+                ok;
+            _ ->
+                logger:set_process_metadata(SavedMetaData)
+        end
+    end,
     Result.
 
 do_inc_action_metrics(

+ 24 - 17
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -52,7 +52,8 @@ do_apply_rule(
                     do_apply_matched_rule(
                         Rule,
                         Context,
-                        StopAfterRender
+                        StopAfterRender,
+                        EventTopics
                     );
                 false ->
                     {error, nomatch}
@@ -61,21 +62,29 @@ do_apply_rule(
             case lists:member(InTopic, EventTopics) of
                 true ->
                     %% the rule is for both publish and events, test it directly
-                    do_apply_matched_rule(Rule, Context, StopAfterRender);
+                    do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics);
                 false ->
                     {error, nomatch}
             end
     end.
 
-do_apply_matched_rule(Rule, Context, StopAfterRender) ->
-    update_process_trace_metadata(StopAfterRender),
-    ApplyRuleRes = emqx_rule_runtime:apply_rule(
-        Rule,
-        Context,
-        apply_rule_environment()
-    ),
-    reset_trace_process_metadata(StopAfterRender),
-    ApplyRuleRes.
+do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics) ->
+    PrevLoggerProcessMetadata = logger:get_process_metadata(),
+    try
+        update_process_trace_metadata(StopAfterRender),
+        FullContext = fill_default_values(
+            hd(EventTopics),
+            emqx_rule_maps:atom_key_map(Context)
+        ),
+        ApplyRuleRes = emqx_rule_runtime:apply_rule(
+            Rule,
+            FullContext,
+            apply_rule_environment()
+        ),
+        ApplyRuleRes
+    after
+        reset_logger_process_metadata(PrevLoggerProcessMetadata)
+    end.
 
 update_process_trace_metadata(true = _StopAfterRender) ->
     logger:update_process_metadata(#{
@@ -84,12 +93,10 @@ update_process_trace_metadata(true = _StopAfterRender) ->
 update_process_trace_metadata(false = _StopAfterRender) ->
     ok.
 
-reset_trace_process_metadata(true = _StopAfterRender) ->
-    Meta = logger:get_process_metadata(),
-    NewMeta = maps:remove(stop_action_after_render, Meta),
-    logger:set_process_metadata(NewMeta);
-reset_trace_process_metadata(false = _StopAfterRender) ->
-    ok.
+reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
+    logger:unset_process_metadata();
+reset_logger_process_metadata(PrevProcessMetadata) ->
+    logger:set_process_metadata(PrevProcessMetadata).
 
 %% At the time of writing the environment passed to the apply rule function is
 %% not used at all for normal actions. When it is used for custom functions it

+ 18 - 24
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -216,18 +216,15 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
                 end
             )
     end,
-    %% Check that rule_trigger_time meta field is present in all log entries
+    %% Check that rule_trigger_ts meta field is present in all log entries
     Log0 = read_rule_trace_file(TraceName, TraceType, Now),
     Log1 = binary:split(Log0, <<"\n">>, [global, trim]),
     Log2 = lists:join(<<",\n">>, Log1),
     Log3 = iolist_to_binary(["[", Log2, "]"]),
     {ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]),
-    [#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries,
+    [#{<<"meta">> := #{<<"rule_trigger_ts">> := [RuleTriggerTime]}} | _] = LogEntries,
     [
-        ?assert(
-            (maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse
-                (lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, [])))
-        )
+        ?assert(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_ts">>, Meta, [])))
      || #{<<"meta">> := Meta} <- LogEntries
     ],
     ok.
@@ -265,8 +262,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) ->
                             <<"result">> := <<"ok">>
                         },
                     <<"rule_id">> := _,
-                    <<"rule_trigger_time">> := _,
-                    <<"stop_action_after_render">> := false,
+                    <<"rule_trigger_ts">> := _,
                     <<"trace_tag">> := <<"ACTION">>
                 },
             <<"msg">> := <<"action_success">>,
@@ -360,9 +356,10 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
                 ok;
             CheckBatchesFunRec(CurCount) ->
                 receive
-                    [{_, #{<<"stop_after_render">> := StopValue}} | _] = List ->
+                    [{_, FirstMsg} | _] = List ->
+                        StopValue = maps:get(<<"stop_after_render">>, FirstMsg, false),
                         [
-                            ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg)
+                            ?assertEqual(StopValue, maps:get(<<"stop_after_render">>, Msg, false))
                          || {_, Msg} <- List
                         ],
                         Len = length(List),
@@ -419,21 +416,20 @@ t_apply_rule_test_format_action_failed(_Config) ->
                             <<"name">> := _,
                             <<"type">> := <<"rule_engine_test">>
                         },
-                        <<"client_ids">> := [],
                         <<"clientid">> := _,
                         <<"reason">> := <<"MY REASON">>,
                         <<"rule_id">> := _,
-                        <<"rule_ids">> := [],
-                        <<"rule_trigger_time">> := _,
-                        <<"rule_trigger_times">> := [],
-                        <<"stop_action_after_render">> := false,
+                        <<"rule_trigger_ts">> := _,
                         <<"trace_tag">> := <<"ACTION">>
                     },
                     <<"msg">> := <<"action_failed">>,
                     <<"time">> := _
                 },
                 LastEntryJSON
-            )
+            ),
+            MetaMap = maps:get(<<"meta">>, LastEntryJSON),
+            ?assert(not maps:is_key(<<"client_ids">>, MetaMap)),
+            ?assert(not maps:is_key(<<"rule_ids">>, MetaMap))
         end,
     do_apply_rule_test_format_action_failed_test(1, CheckFun).
 
@@ -494,8 +490,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                         <<"clientid">> := _,
                         <<"reason">> := <<"request_expired">>,
                         <<"rule_id">> := _,
-                        <<"rule_trigger_time">> := _,
-                        <<"stop_action_after_render">> := false,
+                        <<"rule_trigger_ts">> := _,
                         <<"trace_tag">> := <<"ACTION">>
                     },
                 <<"msg">> := <<"action_failed">>,
@@ -511,7 +506,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                 <<"level">> := <<"debug">>,
                 <<"meta">> :=
                     #{
-                        <<"client_ids">> := [],
                         <<"clientid">> := _,
                         <<"id">> := _,
                         <<"reason">> :=
@@ -521,17 +515,17 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                                 <<"msg">> := <<"MY_RECOVERABLE_REASON">>
                             },
                         <<"rule_id">> := _,
-                        <<"rule_ids">> := [],
-                        <<"rule_trigger_time">> := _,
-                        <<"rule_trigger_times">> := [],
-                        <<"stop_action_after_render">> := false,
+                        <<"rule_trigger_ts">> := _,
                         <<"trace_tag">> := <<"ERROR">>
                     },
                 <<"msg">> := SendErrorMsg,
                 <<"time">> := _
             },
             ReasonEntryJSON
-        )
+        ),
+        MetaMap = maps:get(<<"meta">>, ReasonEntryJSON),
+        ?assert(not maps:is_key(<<"client_ids">>, MetaMap)),
+        ?assert(not maps:is_key(<<"rule_ids">>, MetaMap))
     end.
 
 meck_test_connector_recoverable_errors(Reason) ->