Просмотр исходного кода

fix(rule trace): rename rule_trigger_time(s) and cleaups

This commit renames trace fields rule_trigger_time and
rule_trigger_times to rule_trigger_ts and makes sure that the value for
rule_trigger_ts will always be a list of timestamps.
Kjell Winblad 1 год назад
Родитель
Сommit
2e6db85578

+ 21 - 9
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1190,7 +1190,8 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     %% Get the rule ids from requests
     RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
     ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
-    RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
+    RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
+    RuleTriggerTimes = lists:flatten(RuleTriggerTimes0),
     StopAfterRenderVal =
         case Requests of
             %% We know that the batch is not mixed since we prevent this by
@@ -1203,7 +1204,7 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     logger:update_process_metadata(#{
         rule_ids => RuleIDs,
         client_ids => ClientIDs,
-        rule_trigger_times => RuleTriggerTimes,
+        rule_trigger_ts => RuleTriggerTimes,
         stop_action_after_render => StopAfterRenderVal
     }),
     ok;
@@ -1221,18 +1222,29 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
 collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
-collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) ->
+collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc) ->
     [Time | Acc];
 collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
 unset_rule_id_trace_meta_data() ->
-    logger:update_process_metadata(#{
-        rule_ids => #{},
-        client_ids => #{},
-        stop_action_after_render => false,
-        rule_trigger_times => []
-    }).
+    case logger:get_process_metadata() of
+        undefined ->
+            ok;
+        OldLoggerProcessMetadata ->
+            NewLoggerProcessMetadata =
+                maps:without(
+                    [
+                        rule_ids,
+                        client_ids,
+                        stop_action_after_render,
+                        rule_trigger_ts
+                    ],
+                    OldLoggerProcessMetadata
+                ),
+            logger:set_process_metadata(NewLoggerProcessMetadata),
+            ok
+    end.
 
 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
 extract_connector_id(Id) when is_binary(Id) ->

+ 25 - 15
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -144,12 +144,12 @@ set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
     logger:update_process_metadata(#{
         clientid => ClientID,
         rule_id => RuleID,
-        rule_trigger_time => rule_trigger_time(Columns)
+        rule_trigger_ts => [rule_trigger_time(Columns)]
     });
 set_process_trace_metadata(RuleID, Columns) ->
     logger:update_process_metadata(#{
         rule_id => RuleID,
-        rule_trigger_time => rule_trigger_time(Columns)
+        rule_trigger_ts => [rule_trigger_time(Columns)]
     }).
 
 rule_trigger_time(Columns) ->
@@ -161,16 +161,26 @@ rule_trigger_time(Columns) ->
     end.
 
 reset_process_trace_metadata(#{clientid := _ClientID}) ->
-    Meta = logger:get_process_metadata(),
-    Meta1 = maps:remove(clientid, Meta),
-    Meta2 = maps:remove(rule_id, Meta1),
-    Meta3 = maps:remove(rule_trigger_time, Meta2),
-    logger:set_process_metadata(Meta3);
+    Meta0 = logger:get_process_metadata(),
+    Meta1 = maps:without(
+        [
+            clientid,
+            rule_id,
+            rule_trigger_ts
+        ],
+        Meta0
+    ),
+    logger:set_process_metadata(Meta1);
 reset_process_trace_metadata(_) ->
-    Meta = logger:get_process_metadata(),
-    Meta1 = maps:remove(rule_id, Meta),
-    Meta2 = maps:remove(rule_trigger_time, Meta1),
-    logger:set_process_metadata(Meta2).
+    Meta0 = logger:get_process_metadata(),
+    Meta1 = maps:without(
+        [
+            rule_id,
+            rule_trigger_ts
+        ],
+        Meta0
+    ),
+    logger:set_process_metadata(Meta1).
 
 do_apply_rule(
     #{
@@ -533,24 +543,24 @@ do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta
         #{
             rule_id := RuleID,
             clientid := ClientID,
-            rule_trigger_time := Timestamp
+            rule_trigger_ts := Timestamp
         } ->
             #{
                 rule_id => RuleID,
                 clientid => ClientID,
                 action_id => Action,
                 stop_action_after_render => StopAfterRender,
-                rule_trigger_time => Timestamp
+                rule_trigger_ts => Timestamp
             };
         #{
             rule_id := RuleID,
-            rule_trigger_time := Timestamp
+            rule_trigger_ts := Timestamp
         } ->
             #{
                 rule_id => RuleID,
                 action_id => Action,
                 stop_action_after_render => StopAfterRender,
-                rule_trigger_time => Timestamp
+                rule_trigger_ts => Timestamp
             }
     end.
 

+ 12 - 16
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -216,18 +216,15 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
                 end
             )
     end,
-    %% Check that rule_trigger_time meta field is present in all log entries
+    %% Check that rule_trigger_ts meta field is present in all log entries
     Log0 = read_rule_trace_file(TraceName, TraceType, Now),
     Log1 = binary:split(Log0, <<"\n">>, [global, trim]),
     Log2 = lists:join(<<",\n">>, Log1),
     Log3 = iolist_to_binary(["[", Log2, "]"]),
     {ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]),
-    [#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries,
+    [#{<<"meta">> := #{<<"rule_trigger_ts">> := [RuleTriggerTime]}} | _] = LogEntries,
     [
-        ?assert(
-            (maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse
-                (lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, [])))
-        )
+        ?assert(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_ts">>, Meta, [])))
      || #{<<"meta">> := Meta} <- LogEntries
     ],
     ok.
@@ -265,7 +262,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) ->
                             <<"result">> := <<"ok">>
                         },
                     <<"rule_id">> := _,
-                    <<"rule_trigger_time">> := _,
+                    <<"rule_trigger_ts">> := _,
                     <<"stop_action_after_render">> := false,
                     <<"trace_tag">> := <<"ACTION">>
                 },
@@ -422,7 +419,7 @@ t_apply_rule_test_format_action_failed(_Config) ->
                         <<"clientid">> := _,
                         <<"reason">> := <<"MY REASON">>,
                         <<"rule_id">> := _,
-                        <<"rule_trigger_time">> := _,
+                        <<"rule_trigger_ts">> := _,
                         <<"stop_action_after_render">> := false,
                         <<"trace_tag">> := <<"ACTION">>
                     },
@@ -433,8 +430,7 @@ t_apply_rule_test_format_action_failed(_Config) ->
             ),
             MetaMap = maps:get(<<"meta">>, LastEntryJSON),
             ?assert(not maps:is_key(<<"client_ids">>, MetaMap)),
-            ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)),
-            ?assert(not maps:is_key(<<"rule_trigger_times">>, MetaMap))
+            ?assert(not maps:is_key(<<"rule_ids">>, MetaMap))
         end,
     do_apply_rule_test_format_action_failed_test(1, CheckFun).
 
@@ -495,7 +491,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                         <<"clientid">> := _,
                         <<"reason">> := <<"request_expired">>,
                         <<"rule_id">> := _,
-                        <<"rule_trigger_time">> := _,
+                        <<"rule_trigger_ts">> := _,
                         <<"stop_action_after_render">> := false,
                         <<"trace_tag">> := <<"ACTION">>
                     },
@@ -512,7 +508,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                 <<"level">> := <<"debug">>,
                 <<"meta">> :=
                     #{
-                        <<"client_ids">> := [],
                         <<"clientid">> := _,
                         <<"id">> := _,
                         <<"reason">> :=
@@ -522,9 +517,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                                 <<"msg">> := <<"MY_RECOVERABLE_REASON">>
                             },
                         <<"rule_id">> := _,
-                        <<"rule_ids">> := [],
-                        <<"rule_trigger_time">> := _,
-                        <<"rule_trigger_times">> := [],
+                        <<"rule_trigger_ts">> := _,
                         <<"stop_action_after_render">> := false,
                         <<"trace_tag">> := <<"ERROR">>
                     },
@@ -532,7 +525,10 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
                 <<"time">> := _
             },
             ReasonEntryJSON
-        )
+        ),
+        MetaMap = maps:get(<<"meta">>, ReasonEntryJSON),
+        ?assert(not maps:is_key(<<"client_ids">>, MetaMap)),
+        ?assert(not maps:is_key(<<"rule_ids">>, MetaMap))
     end.
 
 meck_test_connector_recoverable_errors(Reason) ->