Sfoglia il codice sorgente

fix(cluster link metrics): use periodic full table scan and gauge to count routes

Thales Macedo Garitezi 1 anno fa
parent
commit
dda73651c5

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_api.erl

@@ -281,7 +281,7 @@ aggregate_metrics(NodeMetrics) ->
 
 format_metrics(Node, RouterMetrics, ResourceMetrics) ->
     Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end,
-    Routes = Get([counters, ?route_metric], RouterMetrics),
+    Routes = Get([gauges, ?route_metric], RouterMetrics),
     #{
         node => Node,
         metrics => #{

+ 84 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl

@@ -0,0 +1,84 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_bookkeeper).
+
+%% API
+-export([
+    start_link/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%% call/cast/info events
+-record(tally_routes, {}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    State = #{},
+    {ok, State, {continue, #tally_routes{}}}.
+
+handle_continue(#tally_routes{}, State) ->
+    handle_tally_routes(),
+    {noreply, State}.
+
+handle_call(_Call, _From, State) ->
+    {reply, {error, bad_call}, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(#tally_routes{}, State) ->
+    handle_tally_routes(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+cluster_names() ->
+    Links = emqx_cluster_link_config:links(),
+    lists:map(fun(#{name := Name}) -> Name end, Links).
+
+ensure_timer(Event, Timeout) ->
+    _ = erlang:send_after(Timeout, self(), Event),
+    ok.
+
+handle_tally_routes() ->
+    ClusterNames = cluster_names(),
+    tally_routes(ClusterNames),
+    ensure_timer(#tally_routes{}, emqx_cluster_link_config:tally_routes_interval()),
+    ok.
+
+tally_routes([ClusterName | ClusterNames]) ->
+    Tab = emqx_cluster_link_extrouter:extroute_tab(),
+    Pat = emqx_cluster_link_extrouter:cluster_routes_ms(ClusterName),
+    NumRoutes = ets:select_count(Tab, Pat),
+    emqx_cluster_link_metrics:routes_set(ClusterName, NumRoutes),
+    tally_routes(ClusterNames);
+tally_routes([]) ->
+    ok.

+ 7 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -46,7 +46,9 @@
     %% Actor Lifecycle
     actor_ttl/0,
     actor_gc_interval/0,
-    actor_heartbeat_interval/0
+    actor_heartbeat_interval/0,
+    %% Metrics
+    tally_routes_interval/0
 ]).
 
 -export([
@@ -163,6 +165,10 @@ actor_gc_interval() ->
 actor_heartbeat_interval() ->
     actor_ttl() div 3.
 
+-spec tally_routes_interval() -> _Milliseconds :: timeout().
+tally_routes_interval() ->
+    emqx_config:get([cluster, tally_routes_interval]).
+
 %%
 
 mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->

+ 23 - 11
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -34,6 +34,9 @@
     apply_actor_operation/5
 ]).
 
+%% Internal export for bookkeeping
+-export([cluster_routes_ms/1, extroute_tab/0]).
+
 %% Strictly monotonically increasing integer.
 -type smint() :: integer().
 
@@ -147,6 +150,18 @@ make_extroute_rec_pat(Entry) ->
         [{1, extroute}, {#extroute.entry, Entry}]
     ).
 
+%% Internal exports for bookkeeping
+cluster_routes_ms(ClusterName) ->
+    TopicPat = '_',
+    RouteIDPat = '_',
+    Pat = make_extroute_rec_pat(
+        emqx_trie_search:make_pat(TopicPat, ?ROUTE_ID(ClusterName, RouteIDPat))
+    ),
+    [{Pat, [], [true]}].
+
+extroute_tab() ->
+    ?EXTROUTE_TAB.
+
 %%
 
 -record(state, {
@@ -256,33 +271,31 @@ actor_apply_operation(
 
 apply_actor_operation(ActorID, Incarnation, Entry, OpName, Lane) ->
     _ = assert_current_incarnation(ActorID, Incarnation),
-    apply_operation(ActorID, Entry, OpName, Lane).
+    apply_operation(Entry, OpName, Lane).
 
-apply_operation(ActorID, Entry, OpName, Lane) ->
+apply_operation(Entry, OpName, Lane) ->
     %% NOTE
     %% This is safe sequence of operations only on core nodes. On replicants,
     %% `mria:dirty_update_counter/3` will be replicated asynchronously, which
     %% means this read can be stale.
     case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of
         [#extroute{mcounter = MCounter}] ->
-            apply_operation(ActorID, Entry, MCounter, OpName, Lane);
+            apply_operation(Entry, MCounter, OpName, Lane);
         [] ->
-            apply_operation(ActorID, Entry, 0, OpName, Lane)
+            apply_operation(Entry, 0, OpName, Lane)
     end.
 
-apply_operation(ActorID, Entry, MCounter, OpName, Lane) ->
+apply_operation(Entry, MCounter, OpName, Lane) ->
     %% NOTE
     %% We are relying on the fact that changes to each individual lane of this
     %% multi-counter are synchronized. Without this, such counter updates would
     %% be unsafe. Instead, we would have to use another, more complex approach,
     %% that runs `ets:lookup/2` + `ets:select_replace/2` in a loop until the
     %% counter is updated accordingly.
-    ?ACTOR_ID(ClusterName, _Actor) = ActorID,
     Marker = 1 bsl Lane,
     case MCounter band Marker of
         0 when OpName =:= add ->
             Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker),
-            _ = emqx_cluster_link_metrics:routes_inc(ClusterName, 1),
             ?tp("cluster_link_extrouter_route_added", #{}),
             Res;
         Marker when OpName =:= add ->
@@ -293,7 +306,6 @@ apply_operation(ActorID, Entry, MCounter, OpName, Lane) ->
                 0 ->
                     Record = #extroute{entry = Entry, mcounter = 0},
                     ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record),
-                    _ = emqx_cluster_link_metrics:routes_inc(ClusterName, -1),
                     ?tp("cluster_link_extrouter_route_deleted", #{}),
                     0;
                 C ->
@@ -368,16 +380,16 @@ clean_incarnation(Rec = #actor{id = {Cluster, Actor}}) ->
 mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) ->
     case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of
         [#actor{incarnation = Incarnation}] ->
-            _ = clean_lane(Actor, Lane),
+            _ = clean_lane(Lane),
             mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write);
         _Renewed ->
             stale
     end.
 
-clean_lane(ActorID, Lane) ->
+clean_lane(Lane) ->
     ets:foldl(
         fun(#extroute{entry = Entry, mcounter = MCounter}, _) ->
-            apply_operation(ActorID, Entry, MCounter, delete, Lane)
+            apply_operation(Entry, MCounter, delete, Lane)
         end,
         0,
         ?EXTROUTE_TAB

+ 5 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl

@@ -11,7 +11,7 @@
     drop_metrics/1,
 
     get_metrics/1,
-    routes_inc/2
+    routes_set/2
 ]).
 
 %%--------------------------------------------------------------------
@@ -50,8 +50,10 @@ maybe_create_metrics(ClusterName) ->
 drop_metrics(ClusterName) ->
     ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName).
 
-routes_inc(ClusterName, Val) ->
-    catch emqx_metrics_worker:inc(?METRIC_NAME, ClusterName, ?route_metric, Val).
+routes_set(ClusterName, Val) ->
+    catch emqx_metrics_worker:set_gauge(
+        ?METRIC_NAME, ClusterName, <<"singleton">>, ?route_metric, Val
+    ).
 
 %%--------------------------------------------------------------------
 %% Internal functions

+ 12 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -30,7 +30,18 @@ namespace() -> "cluster".
 roots() -> [].
 
 injected_fields() ->
-    #{cluster => [{links, links_schema(#{})}]}.
+    #{
+        cluster => [
+            {links, links_schema(#{})},
+            {tally_routes_interval,
+                hoconsc:mk(
+                    emqx_schema:timeout_duration(), #{
+                        default => <<"15s">>,
+                        importance => ?IMPORTANCE_HIDDEN
+                    }
+                )}
+        ]
+    }.
 
 links_schema(Meta) ->
     ?HOCON(?ARRAY(?R_REF("link")), Meta#{

+ 11 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl

@@ -30,12 +30,13 @@ init(LinksConf) ->
         period => 5
     },
     Metrics = emqx_metrics_worker:child_spec(metrics, ?METRIC_NAME),
+    BookKeeper = bookkeeper_spec(),
     ExtrouterGC = extrouter_gc_spec(),
     RouteActors = [
         sup_spec(Name, ?ACTOR_MODULE, [LinkConf])
      || #{name := Name} = LinkConf <- LinksConf
     ],
-    {ok, {SupFlags, [Metrics, ExtrouterGC | RouteActors]}}.
+    {ok, {SupFlags, [Metrics, BookKeeper, ExtrouterGC | RouteActors]}}.
 
 extrouter_gc_spec() ->
     %% NOTE: This one is currently global, not per-link.
@@ -56,6 +57,15 @@ sup_spec(Id, Mod, Args) ->
         modules => [Mod]
     }.
 
+bookkeeper_spec() ->
+    #{
+        id => bookkeeper,
+        start => {emqx_cluster_link_bookkeeper, start_link, []},
+        restart => permanent,
+        type => worker,
+        shutdown => 5_000
+    }.
+
 ensure_actor(#{name := Name} = LinkConf) ->
     case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of
         {ok, Pid} ->

+ 2 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -53,6 +53,7 @@ mk_source_cluster(BaseName, Config) ->
     SourceConf =
         "cluster {"
         "\n name = cl.source"
+        "\n tally_routes_interval = 300ms"
         "\n links = ["
         "\n   { enable = true"
         "\n     name = cl.target"
@@ -75,6 +76,7 @@ mk_target_cluster(BaseName, Config) ->
     TargetConf =
         "cluster {"
         "\n name = cl.target"
+        "\n tally_routes_interval = 300ms"
         "\n links = ["
         "\n   { enable = true"
         "\n     name = cl.source"

+ 40 - 21
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -600,14 +600,22 @@ t_metrics(Config) ->
             #{?snk_kind := clink_route_sync_complete}
         ),
 
-    %% Routes = 2 in source cluster, because the target cluster has some topic filters
-    %% configured and subscribers to them, which were replicated to the source cluster.
-    ?assertMatch(
-        {200, #{
-            <<"metrics">> := #{<<"routes">> := 2},
-            <<"node_metrics">> := _
-        }},
-        get_metrics(source, SourceName)
+    %% Routes = 4 in source cluster, because the target cluster has some topic filters
+    %% configured and subscribers to them, which were replicated to the source cluster,
+    %% and we have 2 nodes with 2 routes each.
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"routes">> := 4},
+                <<"node_metrics">> := [
+                    #{<<"metrics">> := #{<<"routes">> := 2}},
+                    #{<<"metrics">> := #{<<"routes">> := 2}}
+                ]
+            }},
+            get_metrics(source, SourceName)
+        )
     ),
     ?assertMatch(
         {200, #{
@@ -627,12 +635,19 @@ t_metrics(Config) ->
             #{?snk_kind := clink_route_sync_complete}
         ),
 
-    ?assertMatch(
-        {200, #{
-            <<"metrics">> := #{<<"routes">> := 1},
-            <<"node_metrics">> := _
-        }},
-        get_metrics(source, SourceName)
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"routes">> := 2},
+                <<"node_metrics">> := [
+                    #{<<"metrics">> := #{<<"routes">> := 1}},
+                    #{<<"metrics">> := #{<<"routes">> := 1}}
+                ]
+            }},
+            get_metrics(source, SourceName)
+        )
     ),
 
     %% Disabling the link should remove the routes.
@@ -658,12 +673,16 @@ t_metrics(Config) ->
             #{?snk_kind := "cluster_link_extrouter_route_deleted"}
         ),
 
-    ?assertMatch(
-        {200, #{
-            <<"metrics">> := #{<<"routes">> := 0},
-            <<"node_metrics">> := _
-        }},
-        get_metrics(source, SourceName)
+    ?retry(
+        300,
+        10,
+        ?assertMatch(
+            {200, #{
+                <<"metrics">> := #{<<"routes">> := 0},
+                <<"node_metrics">> := _
+            }},
+            get_metrics(source, SourceName)
+        )
     ),
 
     %% Enabling again
@@ -678,7 +697,7 @@ t_metrics(Config) ->
 
     ?assertMatch(
         {200, #{
-            <<"metrics">> := #{<<"routes">> := 1},
+            <<"metrics">> := #{<<"routes">> := 2},
             <<"node_metrics">> := _
         }},
         get_metrics(source, SourceName)