Kaynağa Gözat

feat(routesync): make syncer a bit more generic and reusable

Andrew Mayorov 1 yıl önce
ebeveyn
işleme
7b95273218

+ 1 - 1
apps/emqx/src/emqx_broker_sup.erl

@@ -47,7 +47,7 @@ init([]) ->
         router_syncer_pool,
         hash,
         PoolSize,
-        {emqx_router_syncer, start_link, []}
+        {emqx_router_syncer, start_link_pooled, []}
     ]),
 
     %% Shared subscription

+ 97 - 27
apps/emqx/src/emqx_router_syncer.erl

@@ -21,11 +21,17 @@
 
 -behaviour(gen_server).
 
+-export([start_link/1]).
 -export([start_link/2]).
+-export([start_link_pooled/2]).
 
 -export([push/4]).
+-export([push/5]).
 -export([wait/1]).
 
+-export([close/1]).
+-export([open/1]).
+
 -export([stats/0]).
 
 -export([
@@ -38,6 +44,15 @@
 
 -type action() :: add | delete.
 
+-type options() :: #{
+    max_batch_size => pos_integer(),
+    min_sync_interval => non_neg_integer(),
+    error_delay => non_neg_integer(),
+    error_retry_interval => non_neg_integer(),
+    initial_state => open | closed,
+    batch_handler => {module(), _Function :: atom(), _Args :: list()}
+}.
+
 -define(POOL, router_syncer_pool).
 
 -define(MAX_BATCH_SIZE, 1000).
@@ -77,13 +92,23 @@
 
 %%
 
--spec start_link(atom(), pos_integer()) ->
+-spec start_link(options()) ->
+    {ok, pid()} | {error, _Reason}.
+start_link(Options) ->
+    gen_server:start_link(?MODULE, mk_state(Options), []).
+
+-spec start_link(_Name, options()) ->
+    {ok, pid()} | {error, _Reason}.
+start_link(Name, Options) ->
+    gen_server:start_link(Name, ?MODULE, mk_state(Options), []).
+
+-spec start_link_pooled(atom(), pos_integer()) ->
     {ok, pid()}.
-start_link(Pool, Id) ->
+start_link_pooled(Pool, Id) ->
     gen_server:start_link(
         {local, emqx_utils:proc_name(?MODULE, Id)},
         ?MODULE,
-        [Pool, Id],
+        {Pool, Id, mk_state(#{})},
         []
     ).
 
@@ -93,9 +118,16 @@ when
     Opts :: #{reply => pid()}.
 push(Action, Topic, Dest, Opts) ->
     Worker = gproc_pool:pick_worker(?POOL, Topic),
+    push(Worker, Action, Topic, Dest, Opts).
+
+-spec push(_Ref, action(), emqx_types:topic(), emqx_router:dest(), Opts) ->
+    ok | _WaitRef :: reference()
+when
+    Opts :: #{reply => pid()}.
+push(Ref, Action, Topic, Dest, Opts) ->
     Prio = designate_prio(Action, Opts),
     Context = mk_push_context(Opts),
-    _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})),
+    _ = gproc:send(Ref, ?PUSH(Prio, {Action, Topic, Dest, Context})),
     case Context of
         [{MRef, _}] ->
             MRef;
@@ -134,6 +166,14 @@ mk_push_context(_) ->
 
 %%
 
+close(Ref) ->
+    gen_server:call(Ref, close, infinity).
+
+open(Ref) ->
+    gen_server:call(Ref, open, infinity).
+
+%%
+
 -type stats() :: #{
     size := non_neg_integer(),
     n_add := non_neg_integer(),
@@ -149,10 +189,34 @@ stats() ->
 
 %%
 
-init([Pool, Id]) ->
-    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    {ok, #{stash => stash_new()}}.
+mk_state(Options) ->
+    #{
+        state => maps:get(initial_state, Options, open),
+        stash => stash_new(),
+        retry_timer => undefined,
+        max_batch_size => maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE),
+        min_sync_interval => maps:get(min_sync_interval, Options, ?MIN_SYNC_INTERVAL),
+        error_delay => maps:get(error_delay, Options, ?ERROR_DELAY),
+        error_retry_interval => maps:get(error_retry_interval, Options, ?ERROR_RETRY_INTERVAL),
+        batch_handler => maps:get(batch_handler, Options, default)
+    }.
+
+%%
 
+init({Pool, Id, State}) ->
+    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+    {ok, State};
+init(State) ->
+    {ok, State}.
+
+handle_call(close, _From, State) ->
+    NState = State#{state := closed},
+    {reply, ok, NState};
+handle_call(open, _From, State = #{state := closed}) ->
+    NState = run_batch_loop([], State#{state := open}),
+    {reply, ok, NState};
+handle_call(open, _From, State) ->
+    {reply, ok, State};
 handle_call(stats, _From, State = #{stash := Stash}) ->
     {reply, stash_stats(Stash), State};
 handle_call(_Call, _From, State) ->
@@ -162,11 +226,11 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({timeout, _TRef, retry}, State) ->
-    NState = run_batch_loop([], maps:remove(retry_timer, State)),
+    NState = run_batch_loop([], State#{retry_timer := undefined}),
     {noreply, NState};
-handle_info(Push = ?PUSH(_, _), State) ->
+handle_info(Push = ?PUSH(_, _), State = #{min_sync_interval := MSI}) ->
     %% NOTE: Wait a bit to collect potentially overlapping operations.
-    ok = timer:sleep(?MIN_SYNC_INTERVAL),
+    ok = timer:sleep(MSI),
     NState = run_batch_loop([Push], State),
     {noreply, NState}.
 
@@ -175,12 +239,16 @@ terminate(_Reason, _State) ->
 
 %%
 
-run_batch_loop(Incoming, State = #{stash := Stash0}) ->
+run_batch_loop(Incoming, State = #{stash := Stash0, state := closed}) ->
+    Stash1 = stash_add(Incoming, Stash0),
+    Stash2 = stash_drain(Stash1),
+    State#{stash := Stash2};
+run_batch_loop(Incoming, State = #{stash := Stash0, max_batch_size := MBS}) ->
     Stash1 = stash_add(Incoming, Stash0),
     Stash2 = stash_drain(Stash1),
-    {Batch, Stash3} = mk_batch(Stash2),
+    {Batch, Stash3} = mk_batch(Stash2, MBS),
     ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)),
-    case run_batch(Batch) of
+    case run_batch(Batch, State) of
         Status = #{} ->
             ok = send_replies(Status, Batch),
             NState = cancel_retry_timer(State#{stash := Stash3}),
@@ -203,37 +271,37 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) ->
                 batch => batch_stats(Batch, Stash3)
             }),
             NState = State#{stash := Stash2},
-            ok = timer:sleep(?ERROR_DELAY),
+            ok = error_cooldown(NState),
             ensure_retry_timer(NState)
     end.
 
+error_cooldown(#{error_delay := ED}) ->
+    timer:sleep(ED).
+
+ensure_retry_timer(State = #{retry_timer := undefined, error_retry_interval := ERI}) ->
+    TRef = emqx_utils:start_timer(ERI, retry),
+    State#{retry_timer := TRef};
 ensure_retry_timer(State = #{retry_timer := _TRef}) ->
-    State;
-ensure_retry_timer(State) ->
-    TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry),
-    State#{retry_timer => TRef}.
+    State.
 
 cancel_retry_timer(State = #{retry_timer := TRef}) ->
     ok = emqx_utils:cancel_timer(TRef),
-    maps:remove(retry_timer, State);
+    State#{retry_timer := undefined};
 cancel_retry_timer(State) ->
     State.
 
 %%
 
-mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
+mk_batch(Stash, BatchSize) when map_size(Stash) =< BatchSize ->
     %% This is perfect situation, we just use stash as batch w/o extra reallocations.
     {Stash, stash_new()};
-mk_batch(Stash) ->
+mk_batch(Stash, BatchSize) ->
     %% Take a subset of stashed operations to form a batch.
     %% Note that stash is an unordered map, it's not a queue. The order of operations is
     %% not preserved strictly, only loosely, because of how we start from high priority
     %% operations and go down to low priority ones. This might cause some operations to
     %% stay in stash for unfairly long time, when there are many high priority operations.
     %% However, it's unclear how likely this is to happen in practice.
-    mk_batch(Stash, ?MAX_BATCH_SIZE).
-
-mk_batch(Stash, BatchSize) ->
     mk_batch(?PRIO_HI, #{}, BatchSize, Stash).
 
 mk_batch(Prio, Batch, SizeLeft, Stash) ->
@@ -278,10 +346,12 @@ replyctx_send(Result, RefsPids) ->
 
 %%
 
-run_batch(Batch) when map_size(Batch) > 0 ->
+run_batch(Empty, _State) when Empty =:= #{} ->
+    #{};
+run_batch(Batch, #{batch_handler := default}) ->
     catch emqx_router:do_batch(Batch);
-run_batch(_Empty) ->
-    #{}.
+run_batch(Batch, #{batch_handler := {Module, Function, Args}}) ->
+    erlang:apply(Module, Function, [Batch | Args]).
 
 %%