Просмотр исходного кода

refactor: new metrics for resources

Shawn 3 лет назад
Родитель
Сommit
6fde37791c

+ 11 - 8
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -129,17 +129,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         ?RES_METRICS,
         ResId,
         [
-            matched,
-            sent,
-            dropped,
-            queued,
-            batched,
-            inflight,
+            'matched',
+            'sent',
+            'dropped',
+            'queued',
+            'batched',
+            'retried',
             'sent.success',
             'sent.failed',
             'sent.exception',
-            'dropped.inflight',
-            'dropped.queued',
+            'sent.inflight',
+            'dropped.queue_not_enabled',
+            'dropped.queue_full',
+            'dropped.resource_not_found',
+            'dropped.resource_stopped',
             'dropped.other'
         ],
         [matched]

+ 43 - 18
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -77,27 +77,27 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
     Result = call_query(sync, Id, ?QUERY(self(), Request), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
     Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false),
     Result.
 
@@ -252,6 +252,7 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
     case handle_query_result(Id, Result, false) of
         %% Send failed because resource down
         true ->
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
             {keep_state, St0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
@@ -263,18 +264,20 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
                         inflight_drop(Name, Ref),
                         St0;
                     _ ->
-                        St0#{queue => drop_head(Q)}
+                        St0#{queue => drop_head(Id, Q)}
                 end,
             {keep_state, St, {state_timeout, 0, resume}}
     end.
 
-drop_head(Q) ->
+drop_head(Id, Q) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
     ok = replayq:ack(Q1, AckRef),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1),
     Q1.
 
-query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) ->
+query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request) | Acc],
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'),
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
@@ -308,6 +311,7 @@ flush(
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)
     },
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)),
     Result = call_query(configured, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
@@ -322,8 +326,20 @@ maybe_append_queue(Id, undefined, _Items) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
     undefined;
 maybe_append_queue(Id, Q, Items) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'),
-    replayq:append(Q, Items).
+    case replayq:overflow(Q) of
+        Overflow when Overflow =< 0 ->
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'),
+            replayq:append(Q, Items);
+        Overflow ->
+            PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
+            {Q1, QAckRef, Items} = replayq:pop(Q, PopOpts),
+            ok = replayq:ack(Q1, QAckRef),
+            Dropped = length(Items),
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped),
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+            ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
+            Q1
+    end.
 
 batch_reply_caller(Id, BatchResult, Batch) ->
     lists:foldl(
@@ -375,7 +391,8 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
 handle_query_result(Id, {error, _}, BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
     BlockWorker;
-handle_query_result(_Id, {recoverable_error, _}, _BlockWorker) ->
+handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
     true;
 handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
     true;
@@ -426,7 +443,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
 
 apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
     ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
 apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
@@ -438,7 +455,8 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
                 ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
                 ReplyFun = fun ?MODULE:reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = [self(), Id, Name, Ref, Query],
@@ -451,7 +469,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
 apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Requests = [Request || ?QUERY(_From, Request) <- Batch],
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
     ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
@@ -463,7 +481,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
                 ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, sent, length(Batch)),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@@ -477,14 +496,20 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
 
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
     case reply_caller(Id, ?REPLY(From, Request, Result)) of
-        true -> ?MODULE:block(Pid);
-        false -> inflight_drop(Name, Ref)
+        true ->
+            ?MODULE:block(Pid);
+        false ->
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
+            inflight_drop(Name, Ref)
     end.
 
 batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
     case batch_reply_caller(Id, Result, Batch) of
-        true -> ?MODULE:block(Pid);
-        false -> inflight_drop(Name, Ref)
+        true ->
+            ?MODULE:block(Pid);
+        false ->
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)),
+            inflight_drop(Name, Ref)
     end.
 %%==============================================================================
 %% the inflight queue for async query