|
|
@@ -1174,12 +1174,13 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
|
|
|
{ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
|
|
|
{error, {unrecoverable_error, unhealthy_target}};
|
|
|
{ok, _Group, Resource} ->
|
|
|
+ PrevLoggerProcessMetadata = logger:get_process_metadata(),
|
|
|
QueryResult =
|
|
|
try
|
|
|
set_rule_id_trace_meta_data(Query),
|
|
|
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource)
|
|
|
after
|
|
|
- unset_rule_id_trace_meta_data()
|
|
|
+ reset_logger_process_metadata(PrevLoggerProcessMetadata)
|
|
|
end,
|
|
|
QueryResult;
|
|
|
{error, not_found} ->
|
|
|
@@ -1216,6 +1217,11 @@ set_rule_id_trace_meta_data(Request) ->
|
|
|
set_rule_id_trace_meta_data([Request]),
|
|
|
ok.
|
|
|
|
|
|
+reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
|
|
|
+ logger:unset_process_metadata();
|
|
|
+reset_logger_process_metadata(PrevProcessMetadata) ->
|
|
|
+ logger:set_process_metadata(PrevProcessMetadata).
|
|
|
+
|
|
|
collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) ->
|
|
|
Acc#{RuleId => true};
|
|
|
collect_rule_id(?QUERY(_, _, _, _, _), Acc) ->
|
|
|
@@ -1231,25 +1237,6 @@ collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc)
|
|
|
collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
|
|
|
Acc.
|
|
|
|
|
|
-unset_rule_id_trace_meta_data() ->
|
|
|
- case logger:get_process_metadata() of
|
|
|
- undefined ->
|
|
|
- ok;
|
|
|
- OldLoggerProcessMetadata ->
|
|
|
- NewLoggerProcessMetadata =
|
|
|
- maps:without(
|
|
|
- [
|
|
|
- rule_ids,
|
|
|
- client_ids,
|
|
|
- stop_action_after_render,
|
|
|
- rule_trigger_ts
|
|
|
- ],
|
|
|
- OldLoggerProcessMetadata
|
|
|
- ),
|
|
|
- logger:set_process_metadata(NewLoggerProcessMetadata),
|
|
|
- ok
|
|
|
- end.
|
|
|
-
|
|
|
%% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
|
|
|
extract_connector_id(Id) when is_binary(Id) ->
|
|
|
case binary:split(Id, <<":">>, [global]) of
|