瀏覽代碼

Merge pull request #12912 from kjellwinblad/kjell/rule_trigger_time/EMQX-12025

feat(rule tracing): add rule trigger time meta data field
Kjell Winblad 1 年之前
父節點
當前提交
e1eed30b5d

+ 14 - 2
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1163,6 +1163,7 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     %% Get the rule ids from requests
     RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
     ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
+    RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
     StopAfterRenderVal =
         case Requests of
             %% We know that the batch is not mixed since we prevent this by
@@ -1173,7 +1174,10 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
                 false
         end,
     logger:update_process_metadata(#{
-        rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal
+        rule_ids => RuleIDs,
+        client_ids => ClientIDs,
+        rule_trigger_times => RuleTriggerTimes,
+        stop_action_after_render => StopAfterRenderVal
     }),
     ok;
 set_rule_id_trace_meta_data(Request) ->
@@ -1190,9 +1194,17 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
 collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
+collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) ->
+    [Time | Acc];
+collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
+    Acc.
+
 unset_rule_id_trace_meta_data() ->
     logger:update_process_metadata(#{
-        rule_ids => #{}, client_ids => #{}, stop_action_after_render => false
+        rule_ids => #{},
+        client_ids => #{},
+        stop_action_after_render => false,
+        rule_trigger_times => []
     }).
 
 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1

+ 25 - 11
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -139,25 +139,35 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
         reset_process_trace_metadata(Columns)
     end.
 
-set_process_trace_metadata(RuleID, #{clientid := ClientID}) ->
+set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
     logger:update_process_metadata(#{
-        rule_id => RuleID,
         clientid => ClientID
-    });
-set_process_trace_metadata(RuleID, _) ->
+    }),
+    set_process_trace_metadata(RuleID, maps:remove(clientid, Columns));
+set_process_trace_metadata(RuleID, Columns) ->
+    EventTimestamp =
+        case Columns of
+            #{timestamp := Timestamp} ->
+                Timestamp;
+            _ ->
+                erlang:system_time(millisecond)
+        end,
     logger:update_process_metadata(#{
-        rule_id => RuleID
+        rule_id => RuleID,
+        rule_trigger_time => EventTimestamp
     }).
 
 reset_process_trace_metadata(#{clientid := _ClientID}) ->
     Meta = logger:get_process_metadata(),
     Meta1 = maps:remove(clientid, Meta),
     Meta2 = maps:remove(rule_id, Meta1),
-    logger:set_process_metadata(Meta2);
+    Meta3 = maps:remove(rule_trigger_time, Meta2),
+    logger:set_process_metadata(Meta3);
 reset_process_trace_metadata(_) ->
     Meta = logger:get_process_metadata(),
     Meta1 = maps:remove(rule_id, Meta),
-    logger:set_process_metadata(Meta1).
+    Meta2 = maps:remove(rule_trigger_time, Meta1),
+    logger:set_process_metadata(Meta2).
 
 do_apply_rule(
     #{
@@ -499,21 +509,25 @@ do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta
     case TraceMeta of
         #{
             rule_id := RuleID,
-            clientid := ClientID
+            clientid := ClientID,
+            rule_trigger_time := Timestamp
         } ->
             #{
                 rule_id => RuleID,
                 clientid => ClientID,
                 action_id => Action,
-                stop_action_after_render => StopAfterRender
+                stop_action_after_render => StopAfterRender,
+                rule_trigger_time => Timestamp
             };
         #{
-            rule_id := RuleID
+            rule_id := RuleID,
+            rule_trigger_time := Timestamp
         } ->
             #{
                 rule_id => RuleID,
                 action_id => Action,
-                stop_action_after_render => StopAfterRender
+                stop_action_after_render => StopAfterRender,
+                rule_trigger_time => Timestamp
             }
     end.
 

+ 14 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -159,6 +159,20 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
                 end
             )
     end,
+    %% Check that rule_trigger_time meta field is present in all log entries
+    Log0 = read_rule_trace_file(TraceName, TraceType, Now),
+    Log1 = binary:split(Log0, <<"\n">>, [global, trim]),
+    Log2 = lists:join(<<",\n">>, Log1),
+    Log3 = iolist_to_binary(["[", Log2, "]"]),
+    {ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]),
+    [#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries,
+    [
+        ?assert(
+            (maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse
+                (lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, [])))
+        )
+     || #{<<"meta">> := Meta} <- LogEntries
+    ],
     emqx_trace:delete(TraceName),
     ok.