Explorar o código

Merge pull request #14096 from HJianBo/add-metrics-for-commit-tx

feat: expose tnx_id of cluster_rpc for prometheus
JianBo He hai 1 ano
pai
achega
c80159ed3a

+ 7 - 0
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -41,6 +41,7 @@
     commit/2,
     commit_status_trans/2,
     get_cluster_tnx_id/0,
+    get_current_tnx_id/0,
     get_node_tnx_id/1,
     init_mfa/2,
     force_sync_tnx_id/3,
@@ -486,6 +487,12 @@ get_cluster_tnx_id() ->
         Id -> Id
     end.
 
+get_current_tnx_id() ->
+    case mnesia:dirty_read(?CLUSTER_COMMIT, node()) of
+        [] -> ?DEFAULT_INIT_TXN_ID;
+        [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
+    end.
+
 get_oldest_mfa_id() ->
     case mnesia:first(?CLUSTER_MFA) of
         '$end_of_table' -> 0;

+ 25 - 1
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -216,6 +216,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
     ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)),
 
     ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)),
+    ok = add_collect_family(Callback, cluster_rpc_meta(), ?MG(cluster_rpc, RawData)),
     ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)),
     ok = maybe_add_ds_collect_family(Callback, RawData),
     ok = maybe_license_add_collect_family(Callback, RawData),
@@ -258,7 +259,8 @@ collect(<<"json">>) ->
         olp => collect_json_data(?MG(emqx_olp_data, RawData)),
         acl => collect_json_data(?MG(emqx_acl_data, RawData)),
         authn => collect_json_data(?MG(emqx_authn_data, RawData)),
-        certs => collect_cert_json_data(?MG(cert_data, RawData))
+        certs => collect_cert_json_data(?MG(cert_data, RawData)),
+        cluster_rpc => collect_json_data(?MG(cluster_rpc, RawData))
     };
 collect(<<"prometheus">>) ->
     prometheus_text_format:format(?PROMETHEUS_DEFAULT_REGISTRY).
@@ -288,6 +290,7 @@ fetch_from_local_node(Mode) ->
         emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
         emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
         emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode),
+        cluster_rpc => cluster_rpc_data(Mode),
         mria_data => mria_data(Mode)
     }}.
 
@@ -310,6 +313,7 @@ aggre_or_zip_init_acc() ->
         emqx_olp_data => meta_to_init_from(olp_metric_meta()),
         emqx_acl_data => meta_to_init_from(acl_metric_meta()),
         emqx_authn_data => meta_to_init_from(authn_metric_meta()),
+        cluster_rpc => meta_to_init_from(cluster_rpc_meta()),
         mria_data => meta_to_init_from(mria_metric_meta())
     }.
 
@@ -495,6 +499,8 @@ emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D));
 %%--------------------------------------------------------------------
 %% Certs
 emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D));
+%% Cluster RPC
+emqx_collect(K = emqx_conf_sync_txid, D) -> gauge_metrics(?MG(K, D));
 %% Mria
 %% ========== core
 emqx_collect(K = emqx_mria_last_intercepted_trans, D) -> gauge_metrics(?MG(K, D, []));
@@ -998,6 +1004,13 @@ not_after_epoch(#'Certificate'{
 not_after_epoch(_) ->
     0.
 
+%%========================================
+%% Cluster RPC
+%%========================================
+
+cluster_rpc_meta() ->
+    [{emqx_conf_sync_txid, gauge, undefined}].
+
 %%========================================
 %% Mria
 %%========================================
@@ -1021,6 +1034,17 @@ mria_metric_meta(replicant) ->
         {emqx_mria_replayq_len, gauge, replayq_len}
     ].
 
+cluster_rpc_data(Mode) ->
+    Labels =
+        case Mode of
+            ?PROM_DATA_MODE__NODE -> [];
+            _ -> [{node, node(self())}]
+        end,
+    DataFun = fun() -> emqx_cluster_rpc:get_current_tnx_id() end,
+    #{
+        emqx_conf_sync_txid => [{Labels, catch_all(DataFun)}]
+    }.
+
 mria_data(Mode) ->
     case mria_rlog:backend() of
         rlog ->

+ 24 - 0
apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl

@@ -336,6 +336,7 @@ metric_meta(<<"emqx_vm_total_memory">>) -> ?meta(0, 1, 1);
 metric_meta(<<"emqx_vm_used_memory">>) -> ?meta(0, 1, 1);
 metric_meta(<<"emqx_cluster_nodes_running">>) -> ?meta(0, 1, 1);
 metric_meta(<<"emqx_cluster_nodes_stopped">>) -> ?meta(0, 1, 1);
+metric_meta(<<"emqx_conf_sync_txid">>) -> ?meta(0, 1, 1);
 %% END
 metric_meta(<<"emqx_cert_expiry_at">>) -> ?meta(2, 2, 2);
 metric_meta(<<"emqx_license_expiry_at">>) -> ?meta(0, 0, 0);
@@ -641,6 +642,29 @@ assert_json_data__certs(Ms, _) ->
         Ms
     ).
 
+assert_json_data__cluster_rpc(Ms, Mode) when
+    Mode =:= ?PROM_DATA_MODE__NODE;
+    Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED
+->
+    ?assertMatch(
+        #{
+            emqx_conf_sync_txid := _
+        },
+        Ms
+    );
+assert_json_data__cluster_rpc(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
+    lists:foreach(
+        fun(M) ->
+            ?assertMatch(
+                #{
+                    emqx_conf_sync_txid := _
+                },
+                M
+            )
+        end,
+        Ms
+    ).
+
 eval_foreach_assert(FunctionName, Ms) ->
     Fun = fun() ->
         ok = lists:foreach(

+ 1 - 0
changes/ce/feat-14096.en.md

@@ -0,0 +1 @@
+Expose `cluster_rpc_txid` as a Prometheus metric for monitoring the configuration file synchronization status of each node in the cluster.