Просмотр исходного кода

feat: add unrecoverable_resource_error throttle

zhongwencool 1 год назад
Родитель
Сommit
2924ec582a

+ 2 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -80,7 +80,8 @@
     cannot_publish_to_topic_due_to_not_authorized,
     cannot_publish_to_topic_due_to_quota_exceeded,
     connection_rejected_due_to_license_limit_reached,
-    dropped_msg_due_to_mqueue_is_full
+    dropped_msg_due_to_mqueue_is_full,
+    unrecoverable_resource_error
 ]).
 
 %% Callback to upgrade config after loaded from config file but before validation.

+ 44 - 24
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) ->
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
-    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}),
+    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}),
     handle_async_worker_down(Data0, Pid);
 running(info, Info, _St) ->
-    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}),
+    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}),
     keep_state_and_data.
 
 blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
@@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) ->
 blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
-    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}),
+    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}),
     handle_async_worker_down(Data0, Pid);
 blocked(info, Info, _Data) ->
-    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}),
+    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}),
     keep_state_and_data.
 
 terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
@@ -981,7 +981,11 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
+                    ?SLOG_THROTTLE(error, #{
+                        resource_id => Id,
+                        msg => unrecoverable_resource_error,
+                        reason => Reason
+                    }),
                     ok
                 end,
             Counters =
@@ -1021,7 +1025,11 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
+                    ?SLOG_THROTTLE(error, #{
+                        resource_id => Id,
+                        msg => unrecoverable_resource_error,
+                        reason => Reason
+                    }),
                     ok
                 end,
             Counters =
@@ -1141,12 +1149,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte
         false ->
             ok;
         true ->
-            ?SLOG(info, #{
-                msg => "buffer_worker_dropped_expired_messages",
-                resource_id => Id,
-                worker_index => Index,
-                expired_count => ExpiredCount
-            }),
+            ?SLOG(
+                info,
+                #{
+                    msg => "buffer_worker_dropped_expired_messages",
+                    resource_id => Id,
+                    worker_index => Index,
+                    expired_count => ExpiredCount
+                },
+                #{tag => ?TAG}
+            ),
             ok
     end.
 
@@ -1556,7 +1568,7 @@ handle_async_reply1(
     case is_expired(ExpireAt, Now) of
         true ->
             IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
-            %% evalutate metrics call here since we're not inside
+            %% evaluate metrics call here since we're not inside
             %% buffer worker
             IsAcked andalso
                 begin
@@ -1797,12 +1809,16 @@ append_queue(Id, Index, Q, Queries) ->
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
                 Counters = #{dropped_queue_full => Dropped},
-                ?SLOG(info, #{
-                    msg => "buffer_worker_overflow",
-                    resource_id => Id,
-                    worker_index => Index,
-                    dropped => Dropped
-                }),
+                ?SLOG(
+                    info,
+                    #{
+                        msg => "buffer_worker_overflow",
+                        resource_id => Id,
+                        worker_index => Index,
+                        dropped => Dropped
+                    },
+                    #{tag => ?TAG}
+                ),
                 {Items2, Q1, Counters}
         end,
     ?tp(
@@ -2236,11 +2252,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) ->
     BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
     case BatchTime =:= BatchTime0 of
         false ->
-            ?SLOG(info, #{
-                id => Id,
-                msg => "adjusting_buffer_worker_batch_time",
-                new_batch_time => BatchTime
-            });
+            ?SLOG(
+                info,
+                #{
+                    resource_id => Id,
+                    msg => "adjusting_buffer_worker_batch_time",
+                    new_batch_time => BatchTime
+                },
+                #{tag => ?TAG}
+            );
         true ->
             ok
     end,