Explorar el Código

feat(cluster-link): add simple replication actor GC process

Andrew Mayorov hace 1 año
padre
commit
0219b8bd4d

+ 19 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -13,6 +13,8 @@
 
 -define(MQTT_HOST_OPTS, #{default_port => 1883}).
 
+-define(DEFAULT_ACTOR_TTL, 30_000).
+
 -export([
     %% General
     cluster/0,
@@ -22,7 +24,11 @@
     topic_filters/1,
     %% Connections
     emqtt_options/1,
-    mk_emqtt_options/1
+    mk_emqtt_options/1,
+    %% Actor Lifecycle
+    actor_ttl/0,
+    actor_gc_interval/0,
+    actor_heartbeat_interval/0
 ]).
 
 -export([
@@ -58,6 +64,18 @@ emqtt_options(LinkName) ->
 topic_filters(LinkName) ->
     maps:get(topics, ?MODULE:link(LinkName), []).
 
+-spec actor_ttl() -> _Milliseconds :: pos_integer().
+actor_ttl() ->
+    ?DEFAULT_ACTOR_TTL.
+
+-spec actor_gc_interval() -> _Milliseconds :: pos_integer().
+actor_gc_interval() ->
+    actor_ttl().
+
+-spec actor_heartbeat_interval() -> _Milliseconds :: pos_integer().
+actor_heartbeat_interval() ->
+    actor_ttl() div 3.
+
 %%
 
 mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->

+ 16 - 15
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -67,8 +67,6 @@
 
 -include_lib("emqx/include/emqx.hrl").
 
--define(DEFAULT_ACTOR_TTL_MS, 30_000).
-
 -define(EXTROUTE_SHARD, ?MODULE).
 -define(EXTROUTE_TAB, emqx_external_router_route).
 -define(EXTROUTE_ACTOR_TAB, emqx_external_router_actor).
@@ -280,16 +278,22 @@ apply_operation(Entry, MCounter, OpName, Lane) ->
             MCounter
     end.
 
--spec actor_gc(env()) -> ok.
+-spec actor_gc(env()) -> _NumCleaned :: non_neg_integer().
 actor_gc(#{timestamp := Now}) ->
     MS = [{#actor{until = '$1', _ = '_'}, [{'<', '$1', Now}], ['$_']}],
-    case mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS) of
-        [Rec | _Rest] ->
-            %% NOTE: One at a time.
-            clean_incarnation(Rec);
-        [] ->
-            ok
-    end.
+    Dead = mnesia:dirty_select(?EXTROUTE_ACTOR_TAB, MS),
+    try_clean_incarnation(Dead).
+
+try_clean_incarnation([Rec | Rest]) ->
+    %% NOTE: One at a time.
+    case clean_incarnation(Rec) of
+        ok ->
+            1;
+        stale ->
+            try_clean_incarnation(Rest)
+    end;
+try_clean_incarnation([]) ->
+    0.
 
 mnesia_assign_lane(Cluster) ->
     Assignment = lists:foldl(
@@ -323,7 +327,7 @@ mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = La
             _ = clean_lane(Lane),
             mnesia:delete(?EXTROUTE_ACTOR_TAB, Actor, write);
         _Renewed ->
-            ok
+            stale
     end.
 
 clean_lane(Lane) ->
@@ -368,7 +372,4 @@ first_zero_bit(N, I) ->
 %%
 
 bump_actor_ttl(TS) ->
-    TS + get_actor_ttl().
-
-get_actor_ttl() ->
-    ?DEFAULT_ACTOR_TTL_MS.
+    TS + emqx_cluster_link_config:actor_ttl().

+ 95 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl

@@ -0,0 +1,95 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_extrouter_gc).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([start_link/0]).
+
+-export([run/0]).
+
+-behaviour(gen_server).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+-define(SERVER, ?MODULE).
+
+-define(REPEAT_GC_INTERVAL, 5_000).
+
+%%
+
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+run() ->
+    gen_server:call(?SERVER, run).
+
+%%
+
+-record(st, {
+    gc_timer :: reference()
+}).
+
+init(_) ->
+    {ok, schedule_gc(#st{})}.
+
+handle_call(run, _From, St) ->
+    Result = run_gc(),
+    Timeout = choose_timeout(Result),
+    {reply, Result, reschedule_gc(Timeout, St)};
+handle_call(_Call, _From, St) ->
+    {reply, ignored, St}.
+
+handle_cast(Cast, State) ->
+    ?SLOG(warning, #{msg => "unexpected_cast", cast => Cast}),
+    {noreply, State}.
+
+handle_info({timeout, TRef, _GC}, St = #st{gc_timer = TRef}) ->
+    Result = run_gc_exclusive(),
+    Timeout = choose_timeout(Result),
+    {noreply, schedule_gc(Timeout, St#st{gc_timer = undefined})};
+handle_info(Info, St) ->
+    ?SLOG(warning, #{msg => "unexpected_info", info => Info}),
+    {noreply, St}.
+
+%%
+
+run_gc_exclusive() ->
+    case is_responsible() of
+        true -> run_gc();
+        false -> 0
+    end.
+
+is_responsible() ->
+    Nodes = lists:sort(mria_membership:running_core_nodelist()),
+    Nodes =/= [] andalso hd(Nodes) == node().
+
+-spec run_gc() -> _NumCleaned :: non_neg_integer().
+run_gc() ->
+    Env = #{timestamp => erlang:system_time(millisecond)},
+    emqx_cluster_link_extrouter:actor_gc(Env).
+
+choose_timeout(_NumCleaned = 0) ->
+    emqx_cluster_link_config:actor_gc_interval();
+choose_timeout(_NumCleaned) ->
+    %% NOTE: There could likely be more outdated actors.
+    ?REPEAT_GC_INTERVAL.
+
+schedule_gc(St) ->
+    schedule_gc(emqx_cluster_link_config:actor_gc_interval(), St).
+
+schedule_gc(Timeout, St = #st{gc_timer = undefined}) ->
+    TRef = erlang:start_timer(Timeout, self(), gc),
+    St#st{gc_timer = TRef}.
+
+reschedule_gc(Timeout, St = #st{gc_timer = undefined}) ->
+    schedule_gc(Timeout, St);
+reschedule_gc(Timeout, St = #st{gc_timer = TRef}) ->
+    ok = emqx_utils:cancel_timer(TRef),
+    schedule_gc(Timeout, St#st{gc_timer = undefined}).

+ 2 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -54,7 +54,6 @@
 
 -define(RECONNECT_TIMEOUT, 5_000).
 -define(ACTOR_REINIT_TIMEOUT, 7000).
--define(HEARTBEAT_INTERVAL, 10_000).
 
 -define(CLIENT_SUFFIX, ":routesync:").
 -define(PS_CLIENT_SUFFIX, ":routesync-ps:").
@@ -475,7 +474,8 @@ process_heartbeat(St = #st{client = ClientPid, actor = Actor, incarnation = Inca
     schedule_heartbeat(St).
 
 schedule_heartbeat(St = #st{heartbeat_timer = undefined}) ->
-    TRef = erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat),
+    Timeout = emqx_cluster_link_config:actor_heartbeat_interval(),
+    TRef = erlang:start_timer(Timeout, self(), heartbeat),
     St#st{heartbeat_timer = TRef}.
 
 %% Bootstrapping.

+ 12 - 4
apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl

@@ -10,7 +10,6 @@
 
 -export([init/1]).
 
--define(COORD_SUP, emqx_cluster_link_coord_sup).
 -define(SERVER, ?MODULE).
 
 start_link(LinksConf) ->
@@ -22,12 +21,21 @@ init(LinksConf) ->
         intensity => 10,
         period => 5
     },
-    %% Children = [sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf)],
-    Children = [
+    ExtrouterGC = extrouter_gc_spec(),
+    RouteActors = [
         sup_spec(Name, emqx_cluster_link_router_syncer, [Name])
      || #{upstream := Name} <- LinksConf
     ],
-    {ok, {SupFlags, Children}}.
+    {ok, {SupFlags, [ExtrouterGC | RouteActors]}}.
+
+extrouter_gc_spec() ->
+    %% NOTE: This one is currently global, not per-link.
+    #{
+        id => {extrouter, gc},
+        start => {emqx_cluster_link_extrouter_gc, start_link, []},
+        restart => permanent,
+        type => worker
+    }.
 
 sup_spec(Id, Mod, Args) ->
     #{

+ 9 - 3
apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl

@@ -124,7 +124,10 @@ t_actor_gc(_Config) ->
         topics_sorted()
     ),
     _AS13 = apply_operation(heartbeat, AS12, 50_000),
-    ok = emqx_cluster_link_extrouter:actor_gc(env(60_000)),
+    ?assertEqual(
+        1,
+        emqx_cluster_link_extrouter:actor_gc(env(60_000))
+    ),
     ?assertEqual(
         [<<"topic/#">>, <<"topic/42/+">>],
         topics_sorted()
@@ -133,7 +136,10 @@ t_actor_gc(_Config) ->
         _IncarnationMismatch,
         apply_operation({add, {<<"toolate/#">>, id}}, AS21)
     ),
-    ok = emqx_cluster_link_extrouter:actor_gc(env(120_000)),
+    ?assertEqual(
+        1,
+        emqx_cluster_link_extrouter:actor_gc(env(120_000))
+    ),
     ?assertEqual(
         [],
         topics_sorted()
@@ -273,7 +279,7 @@ run_actor({Actor, Seq}) ->
             ({TS, heartbeat}, AS) ->
                 apply_operation(heartbeat, AS, TS);
             ({TS, gc}, AS) ->
-                ok = emqx_cluster_link_extrouter:actor_gc(env(TS)),
+                _NC = emqx_cluster_link_extrouter:actor_gc(env(TS)),
                 AS;
             ({_TS, {sleep, MS}}, AS) ->
                 ok = timer:sleep(MS),