Просмотр исходного кода

feat(routing): add route sync process pool

Dedicated to synchronizing local state updates with the global view
of the routing state.
Andrew Mayorov 2 лет назад
Родитель
Сommit
54f8b47455

+ 17 - 11
apps/emqx/src/emqx_broker.erl

@@ -167,7 +167,13 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
     %% `unsubscribe` codepath. So we have to pick a worker according to the topic,
     %% but not shard. If there are topics with high number of shards, then the
     %% load across the pool will be unbalanced.
-    call(pick(Topic), {subscribe, Topic, SubPid, I});
+    Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}),
+    case Sync of
+        ok ->
+            ok;
+        Ref when is_reference(Ref) ->
+            emqx_router_syncer:wait(Ref)
+    end;
 do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
     is_binary(RealTopic)
 ->
@@ -491,8 +497,8 @@ safe_update_stats(Tab, Stat, MaxStat) ->
 call(Broker, Req) ->
     gen_server:call(Broker, Req, infinity).
 
-cast(Broker, Msg) ->
-    gen_server:cast(Broker, Msg).
+cast(Broker, Req) ->
+    gen_server:cast(Broker, Req).
 
 %% Pick a broker
 pick(Topic) ->
@@ -506,18 +512,18 @@ init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
     {ok, #{pool => Pool, id => Id}}.
 
-handle_call({subscribe, Topic, SubPid, 0}, _From, State) ->
+handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) ->
     Existed = ets:member(?SUBSCRIBER, Topic),
     true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
-    Result = maybe_add_route(Existed, Topic),
+    Result = maybe_add_route(Existed, Topic, From),
     {reply, Result, State};
-handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
+handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) ->
     Existed = ets:member(?SUBSCRIBER, Topic),
     true = ets:insert(?SUBSCRIBER, [
         {Topic, {shard, I}},
         {{shard, Topic, I}, SubPid}
     ]),
-    Result = maybe_add_route(Existed, Topic),
+    Result = maybe_add_route(Existed, Topic, From),
     {reply, Result, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
@@ -597,12 +603,12 @@ do_dispatch({shard, I}, Topic, Msg) ->
 
 %%
 
-maybe_add_route(_Existed = false, Topic) ->
-    emqx_router:do_add_route(Topic);
-maybe_add_route(_Existed = true, _Topic) ->
+maybe_add_route(_Existed = false, Topic, ReplyTo) ->
+    emqx_router_syncer:push(add, Topic, node(), #{reply => ReplyTo});
+maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
     ok.
 
 maybe_delete_route(_Exists = false, Topic) ->
-    emqx_router:do_delete_route(Topic);
+    emqx_router_syncer:push(delete, Topic, node(), #{});
 maybe_delete_route(_Exists = true, _Topic) ->
     ok.

+ 9 - 2
apps/emqx/src/emqx_broker_sup.erl

@@ -32,13 +32,20 @@ start_link() ->
 init([]) ->
     %% Broker pool
     PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2),
-    BrokerPool = emqx_pool_sup:spec([
+    BrokerPool = emqx_pool_sup:spec(broker_pool_sup, [
         broker_pool,
         hash,
         PoolSize,
         {emqx_broker, start_link, []}
     ]),
 
+    SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [
+        router_syncer_pool,
+        hash,
+        emqx:get_config([node, syncer_pool_size], emqx_vm:schedulers() * 2),
+        {emqx_router_syncer, start_link, []}
+    ]),
+
     %% Shared subscription
     SharedSub = #{
         id => shared_sub,
@@ -59,4 +66,4 @@ init([]) ->
         modules => [emqx_broker_helper]
     },
 
-    {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}.
+    {ok, {{one_for_all, 0, 1}, [BrokerPool, SyncerPool, SharedSub, Helper]}}.

+ 100 - 30
apps/emqx/src/emqx_router.erl

@@ -45,6 +45,8 @@
     do_delete_route/2
 ]).
 
+-export([do_batch/1]).
+
 -export([cleanup_routes/1]).
 
 -export([
@@ -86,6 +88,8 @@
     deinit_schema/0
 ]).
 
+-export_type([dest/0]).
+
 -type group() :: binary().
 
 -type dest() :: node() | {group(), node()}.
@@ -173,12 +177,12 @@ do_add_route(Topic) when is_binary(Topic) ->
 -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_add_route(Topic, Dest) when is_binary(Topic) ->
     ok = emqx_router_helper:monitor(Dest),
-    mria_insert_route(get_schema_vsn(), Topic, Dest).
+    mria_insert_route(get_schema_vsn(), Topic, Dest, single).
 
-mria_insert_route(v2, Topic, Dest) ->
-    mria_insert_route_v2(Topic, Dest);
-mria_insert_route(v1, Topic, Dest) ->
-    mria_insert_route_v1(Topic, Dest).
+mria_insert_route(v2, Topic, Dest, Ctx) ->
+    mria_insert_route_v2(Topic, Dest, Ctx);
+mria_insert_route(v1, Topic, Dest, Ctx) ->
+    mria_insert_route_v1(Topic, Dest, Ctx).
 
 %% @doc Take a real topic (not filter) as input, return the matching topics and topic
 %% filters associated with route destination.
@@ -225,12 +229,60 @@ do_delete_route(Topic) when is_binary(Topic) ->
 
 -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_delete_route(Topic, Dest) ->
-    mria_delete_route(get_schema_vsn(), Topic, Dest).
+    mria_delete_route(get_schema_vsn(), Topic, Dest, single).
+
+mria_delete_route(v2, Topic, Dest, Ctx) ->
+    mria_delete_route_v2(Topic, Dest, Ctx);
+mria_delete_route(v1, Topic, Dest, Ctx) ->
+    mria_delete_route_v1(Topic, Dest, Ctx).
+
+do_batch(Batch) ->
+    Nodes = batch_get_dest_nodes(Batch),
+    ok = lists:foreach(fun emqx_router_helper:monitor/1, Nodes),
+    mria_batch(get_schema_vsn(), Batch).
+
+mria_batch(v2, Batch) ->
+    mria_batch_v2(Batch);
+mria_batch(v1, Batch) ->
+    mria_batch_v1(Batch).
+
+mria_batch_v2(Batch) ->
+    mria:async_dirty(?ROUTE_SHARD, fun mria_batch_run/2, [v2, Batch]).
+
+mria_batch_v1(Batch) ->
+    {atomic, Res} = mria:transaction(?ROUTE_SHARD, fun mria_batch_run/2, [v1, Batch]),
+    Res.
+
+mria_batch_run(SchemaVsn, Batch) ->
+    maps:fold(
+        fun({Topic, Dest}, Op, Errors) ->
+            case mria_batch_operation(SchemaVsn, Op, Topic, Dest) of
+                ok ->
+                    Errors;
+                Error ->
+                    Errors#{{Topic, Dest} => Error}
+            end
+        end,
+        #{},
+        Batch
+    ).
 
-mria_delete_route(v2, Topic, Dest) ->
-    mria_delete_route_v2(Topic, Dest);
-mria_delete_route(v1, Topic, Dest) ->
-    mria_delete_route_v1(Topic, Dest).
+mria_batch_operation(SchemaVsn, add, Topic, Dest) ->
+    mria_insert_route(SchemaVsn, Topic, Dest, batch);
+mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
+    mria_delete_route(SchemaVsn, Topic, Dest, batch).
+
+batch_get_dest_nodes(Batch) ->
+    maps:fold(
+        fun
+            ({_Topic, Dest}, add, Acc) ->
+                ordsets:add_element(get_dest_node(Dest), Acc);
+            (_, delete, Acc) ->
+                Acc
+        end,
+        ordsets:new(),
+        Batch
+    ).
 
 -spec select(Spec, _Limit :: pos_integer(), Continuation) ->
     {[emqx_types:route()], Continuation} | '$end_of_table'
@@ -305,43 +357,51 @@ pick(Topic) ->
 %% Schema v1
 %% --------------------------------------------------------------------
 
-mria_insert_route_v1(Topic, Dest) ->
+mria_insert_route_v1(Topic, Dest, Ctx) ->
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
         true ->
-            mria_route_tab_insert_update_trie(Route);
+            mria_route_tab_insert_update_trie(Route, Ctx);
         false ->
-            mria_route_tab_insert(Route)
+            mria_route_tab_insert(Route, Ctx)
     end.
 
-mria_route_tab_insert_update_trie(Route) ->
+mria_route_tab_insert_update_trie(Route, single) ->
     emqx_router_utils:maybe_trans(
         fun emqx_router_utils:insert_trie_route/2,
         [?ROUTE_TAB, Route],
         ?ROUTE_SHARD
-    ).
+    );
+mria_route_tab_insert_update_trie(Route, batch) ->
+    emqx_router_utils:insert_trie_route(?ROUTE_TAB, Route).
 
-mria_route_tab_insert(Route) ->
-    mria:dirty_write(?ROUTE_TAB, Route).
+mria_route_tab_insert(Route, single) ->
+    mria:dirty_write(?ROUTE_TAB, Route);
+mria_route_tab_insert(Route, batch) ->
+    mnesia:write(?ROUTE_TAB, Route, write).
 
-mria_delete_route_v1(Topic, Dest) ->
+mria_delete_route_v1(Topic, Dest, Ctx) ->
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
         true ->
-            mria_route_tab_delete_update_trie(Route);
+            mria_route_tab_delete_update_trie(Route, Ctx);
         false ->
-            mria_route_tab_delete(Route)
+            mria_route_tab_delete(Route, Ctx)
     end.
 
-mria_route_tab_delete_update_trie(Route) ->
+mria_route_tab_delete_update_trie(Route, single) ->
     emqx_router_utils:maybe_trans(
         fun emqx_router_utils:delete_trie_route/2,
         [?ROUTE_TAB, Route],
         ?ROUTE_SHARD
-    ).
+    );
+mria_route_tab_delete_update_trie(Route, batch) ->
+    emqx_router_utils:delete_trie_route(?ROUTE_TAB, Route).
 
-mria_route_tab_delete(Route) ->
-    mria:dirty_delete_object(?ROUTE_TAB, Route).
+mria_route_tab_delete(Route, single) ->
+    mria:dirty_delete_object(?ROUTE_TAB, Route);
+mria_route_tab_delete(Route, batch) ->
+    mnesia:delete_object(?ROUTE_TAB, Route, write).
 
 match_routes_v1(Topic) ->
     lookup_route_tab(Topic) ++
@@ -410,24 +470,34 @@ fold_routes_v1(FunName, FoldFun, AccIn) ->
 %% topics. Writes go to only one of the two tables at a time.
 %% --------------------------------------------------------------------
 
-mria_insert_route_v2(Topic, Dest) ->
+mria_insert_route_v2(Topic, Dest, Ctx) ->
     case emqx_trie_search:filter(Topic) of
         Words when is_list(Words) ->
             K = emqx_topic_index:make_key(Words, Dest),
-            mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+            mria_filter_tab_insert(K, Ctx);
         false ->
-            mria_route_tab_insert(#route{topic = Topic, dest = Dest})
+            mria_route_tab_insert(#route{topic = Topic, dest = Dest}, Ctx)
     end.
 
-mria_delete_route_v2(Topic, Dest) ->
+mria_filter_tab_insert(K, single) ->
+    mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+mria_filter_tab_insert(K, batch) ->
+    mnesia:write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}, write).
+
+mria_delete_route_v2(Topic, Dest, Ctx) ->
     case emqx_trie_search:filter(Topic) of
         Words when is_list(Words) ->
             K = emqx_topic_index:make_key(Words, Dest),
-            mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+            mria_filter_tab_delete(K, Ctx);
         false ->
-            mria_route_tab_delete(#route{topic = Topic, dest = Dest})
+            mria_route_tab_delete(#route{topic = Topic, dest = Dest}, Ctx)
     end.
 
+mria_filter_tab_delete(K, single) ->
+    mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+mria_filter_tab_delete(K, batch) ->
+    mnesia:delete(?ROUTE_TAB_FILTERS, K, write).
+
 match_routes_v2(Topic) ->
     lookup_route_tab(Topic) ++
         [match_to_route(M) || M <- match_filters(Topic)].

+ 235 - 0
apps/emqx/src/emqx_router_syncer.erl

@@ -0,0 +1,235 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_router_syncer).
+
+-behaviour(gen_server).
+
+-export([start_link/2]).
+
+-export([push/4]).
+-export([wait/1]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-type action() :: add | delete.
+
+-define(POOL, router_syncer_pool).
+
+-define(MAX_BATCH_SIZE, 4000).
+-define(MIN_SYNC_INTERVAL, 1).
+
+-define(HIGHEST_PRIO, 1).
+-define(LOWEST_PRIO, 4).
+
+-define(PUSH(PRIO, OP), {PRIO, OP}).
+
+-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}).
+
+%%
+
+-spec start_link(atom(), pos_integer()) ->
+    {ok, pid()}.
+start_link(Pool, Id) ->
+    gen_server:start_link(
+        {local, emqx_utils:proc_name(?MODULE, Id)},
+        ?MODULE,
+        [Pool, Id],
+        []
+    ).
+
+-spec push(action(), emqx_types:topic(), emqx_router:dest(), Opts) ->
+    ok | _WaitRef :: reference()
+when
+    Opts :: #{reply => pid()}.
+push(Action, Topic, Dest, Opts) ->
+    Worker = gproc_pool:pick_worker(?POOL, Topic),
+    Prio = designate_prio(Action, Opts),
+    Context = mk_push_context(Opts),
+    Worker ! ?PUSH(Prio, {Action, Topic, Dest, Context}),
+    case Context of
+        {MRef, _} ->
+            MRef;
+        [] ->
+            ok
+    end.
+
+-spec wait(_WaitRef :: reference()) ->
+    ok | {error, _Reason}.
+wait(MRef) ->
+    %% FIXME: timeouts
+    receive
+        {MRef, Result} ->
+            Result
+    end.
+
+designate_prio(_, #{reply := true}) ->
+    ?HIGHEST_PRIO;
+designate_prio(add, #{}) ->
+    2;
+designate_prio(delete, #{}) ->
+    3.
+
+mk_push_context(#{reply := To}) ->
+    MRef = erlang:make_ref(),
+    {MRef, To};
+mk_push_context(_) ->
+    [].
+
+%%
+
+init([Pool, Id]) ->
+    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+    {ok, #{queue => []}}.
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(Push = ?PUSH(_, _), State) ->
+    %% NOTE: Wait a bit to collect potentially overlapping operations.
+    ok = timer:sleep(?MIN_SYNC_INTERVAL),
+    NState = run_batch_loop([Push], State),
+    {noreply, NState}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%%
+
+run_batch_loop(Incoming, State = #{queue := Queue}) ->
+    NQueue = queue_join(Queue, gather_operations(Incoming)),
+    {Batch, N, FQueue} = mk_batch(NQueue),
+    %% TODO: retry if error?
+    Errors = run_batch(Batch),
+    0 = send_replies(Errors, N, NQueue),
+    %% TODO: squash queue
+    NState = State#{queue := queue_fix(FQueue)},
+    case queue_empty(FQueue) of
+        true ->
+            NState;
+        false ->
+            run_batch_loop([], NState)
+    end.
+
+%%
+
+mk_batch(Queue) ->
+    mk_batch(Queue, 0, #{}).
+
+mk_batch(Queue, N, Batch) when map_size(Batch) =:= ?MAX_BATCH_SIZE ->
+    {Batch, N, Queue};
+mk_batch([Op = ?OP(_, _, _, _) | Queue], N, Batch) ->
+    NBatch = batch_add_operation(Op, Batch),
+    mk_batch(Queue, N + 1, NBatch);
+mk_batch([Run | Queue], N, Batch) when is_list(Run) ->
+    case mk_batch(Run, N, Batch) of
+        {NBatch, N1, []} ->
+            mk_batch(Queue, N1, NBatch);
+        {NBatch, N1, Left} ->
+            {NBatch, N1, [Left | Queue]}
+    end;
+mk_batch([], N, Batch) ->
+    {Batch, N, []}.
+
+batch_add_operation(?OP(Action, Topic, Dest, _ReplyCtx), Batch) ->
+    case Batch of
+        #{{Topic, Dest} := Action} ->
+            Batch;
+        #{{Topic, Dest} := delete} when Action == add ->
+            Batch#{{Topic, Dest} := add};
+        #{{Topic, Dest} := add} when Action == delete ->
+            maps:remove({Topic, Dest}, Batch);
+        #{} ->
+            maps:put({Topic, Dest}, Action, Batch)
+    end.
+
+send_replies(_Result, 0, _Queue) ->
+    0;
+send_replies(Result, N, [Op = ?OP(_, _, _, _) | Queue]) ->
+    _ = replyctx_send(Result, Op),
+    send_replies(Result, N - 1, Queue);
+send_replies(Result, N, [Run | Queue]) when is_list(Run) ->
+    N1 = send_replies(Result, N, Run),
+    send_replies(Result, N1, Queue);
+send_replies(_Result, N, []) ->
+    N.
+
+replyctx_send(_Result, ?OP(_, _, _, [])) ->
+    noreply;
+replyctx_send(Result, ?OP(_, Topic, Dest, {MRef, Pid})) ->
+    case Result of
+        #{{Topic, Dest} := Error} ->
+            Pid ! {MRef, Error};
+        #{} ->
+            Pid ! {MRef, ok}
+    end.
+
+%%
+
+run_batch(Batch) ->
+    emqx_router:do_batch(Batch).
+
+%%
+
+queue_fix([]) ->
+    [];
+queue_fix(Queue) when length(Queue) < ?LOWEST_PRIO ->
+    queue_fix([[] | Queue]);
+queue_fix(Queue) ->
+    Queue.
+
+queue_join(Q1, []) ->
+    Q1;
+queue_join([], Q2) ->
+    Q2;
+queue_join(Q1, Q2) ->
+    lists:zipwith(fun join_list/2, Q1, Q2).
+
+join_list(L1, []) ->
+    L1;
+join_list([], L2) ->
+    L2;
+join_list(L1, L2) ->
+    [L1, L2].
+
+queue_empty(Queue) ->
+    lists:all(fun(L) -> L == [] end, Queue).
+
+gather_operations(Incoming) ->
+    [
+        pick_operations(Prio, Incoming) ++ drain_operations(Prio)
+     || Prio <- lists:seq(?HIGHEST_PRIO, ?LOWEST_PRIO)
+    ].
+
+drain_operations(Prio) ->
+    receive
+        {Prio, Op} ->
+            [Op | drain_operations(Prio)]
+    after 0 ->
+        []
+    end.
+
+pick_operations(Prio, Incoming) ->
+    [Op || {P, Op} <- Incoming, P =:= Prio].