Quellcode durchsuchen

Merge pull request #9660 from olcai/refactor-ingress-bridge-counter

refactor(emqx_resource): ingress bridge counter
Erik Timan vor 3 Jahren
Ursprung
Commit
f767db4d8f

+ 1 - 1
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -136,7 +136,7 @@ drop_bridge(Name) ->
 %% When use this bridge as a data source, ?MODULE:on_message_received will be called
 %% When use this bridge as a data source, ?MODULE:on_message_received will be called
 %% if the bridge received msgs from the remote broker.
 %% if the bridge received msgs from the remote broker.
 on_message_received(Msg, HookPoint, ResId) ->
 on_message_received(Msg, HookPoint, ResId) ->
-    emqx_resource:inc_received(ResId),
+    emqx_resource_metrics:received_inc(ResId),
     emqx:run_hook(HookPoint, [Msg]).
     emqx:run_hook(HookPoint, [Msg]).
 
 
 %% ===================================================================
 %% ===================================================================

+ 1 - 4
apps/emqx_resource/src/emqx_resource.erl

@@ -111,7 +111,7 @@
     list_group_instances/1
     list_group_instances/1
 ]).
 ]).
 
 
--export([inc_received/1, apply_reply_fun/2]).
+-export([apply_reply_fun/2]).
 
 
 -optional_callbacks([
 -optional_callbacks([
     on_query/3,
     on_query/3,
@@ -467,8 +467,5 @@ apply_reply_fun(From, Result) ->
 
 
 %% =================================================================================
 %% =================================================================================
 
 
-inc_received(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received').
-
 filter_instances(Filter) ->
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].

+ 16 - 0
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -55,6 +55,9 @@
     matched_inc/1,
     matched_inc/1,
     matched_inc/2,
     matched_inc/2,
     matched_get/1,
     matched_get/1,
+    received_inc/1,
+    received_inc/2,
+    received_get/1,
     retried_inc/1,
     retried_inc/1,
     retried_inc/2,
     retried_inc/2,
     retried_get/1,
     retried_get/1,
@@ -87,6 +90,7 @@ events() ->
             inflight,
             inflight,
             matched,
             matched,
             queuing,
             queuing,
+            received,
             retried_failed,
             retried_failed,
             retried_success,
             retried_success,
             success
             success
@@ -134,6 +138,8 @@ handle_telemetry_event(
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
         matched ->
         matched ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'matched', Val);
+        received ->
+            emqx_metrics_worker:inc(?RES_METRICS, ID, 'received', Val);
         retried_failed ->
         retried_failed ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'retried', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val),
@@ -309,6 +315,16 @@ matched_inc(ID, Val) ->
 matched_get(ID) ->
 matched_get(ID) ->
     emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
     emqx_metrics_worker:get(?RES_METRICS, ID, 'matched').
 
 
+%% @doc The number of messages that have been received from a bridge
+received_inc(ID) ->
+    received_inc(ID, 1).
+
+received_inc(ID, Val) ->
+    telemetry:execute([?TELEMETRY_PREFIX, received], #{counter_inc => Val}, #{resource_id => ID}).
+
+received_get(ID) ->
+    emqx_metrics_worker:get(?RES_METRICS, ID, 'received').
+
 %% @doc The number of times message sends have been retried
 %% @doc The number of times message sends have been retried
 retried_inc(ID) ->
 retried_inc(ID) ->
     retried_inc(ID, 1).
     retried_inc(ID, 1).