|
|
@@ -927,6 +927,7 @@ merge_counters(OldCounters, DeltaCounters) ->
|
|
|
flush_metrics(Data = #{id := Id, counters := Counters}) ->
|
|
|
bump_counters(Id, Counters),
|
|
|
set_gauges(Data),
|
|
|
+ log_expired_message_count(Data),
|
|
|
ensure_metrics_flush_timer(Data#{counters := #{}}).
|
|
|
|
|
|
-spec ensure_metrics_flush_timer(data()) -> data().
|
|
|
@@ -966,6 +967,22 @@ do_bump_counters1(dropped_resource_not_found, Val, Id) ->
|
|
|
do_bump_counters1(dropped_resource_stopped, Val, Id) ->
|
|
|
emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val).
|
|
|
|
|
|
+-spec log_expired_message_count(data()) -> ok.
|
|
|
+log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counters}) ->
|
|
|
+ ExpiredCount = maps:get(dropped_expired, Counters, 0),
|
|
|
+ case ExpiredCount > 0 of
|
|
|
+ false ->
|
|
|
+ ok;
|
|
|
+ true ->
|
|
|
+ ?SLOG(info, #{
|
|
|
+ msg => "buffer_worker_dropped_expired_messages",
|
|
|
+ resource_id => Id,
|
|
|
+ worker_index => Index,
|
|
|
+ expired_count => ExpiredCount
|
|
|
+ }),
|
|
|
+ ok
|
|
|
+ end.
|
|
|
+
|
|
|
-spec set_gauges(data()) -> ok.
|
|
|
set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
|
|
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
|