Przeglądaj źródła

Merge remote-tracking branch 'origin/master' into release-55

Zaiming (Stone) Shi 2 lata temu
rodzic
commit
58a54adbb4

+ 41 - 11
apps/emqx/src/emqx_broker.erl

@@ -167,7 +167,13 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
     %% `unsubscribe` codepath. So we have to pick a worker according to the topic,
     %% but not shard. If there are topics with high number of shards, then the
     %% load across the pool will be unbalanced.
-    call(pick(Topic), {subscribe, Topic, SubPid, I});
+    Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}),
+    case Sync of
+        ok ->
+            ok;
+        Ref when is_reference(Ref) ->
+            emqx_router_syncer:wait(Ref)
+    end;
 do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
     is_binary(RealTopic)
 ->
@@ -491,8 +497,8 @@ safe_update_stats(Tab, Stat, MaxStat) ->
 call(Broker, Req) ->
     gen_server:call(Broker, Req, infinity).
 
-cast(Broker, Msg) ->
-    gen_server:cast(Broker, Msg).
+cast(Broker, Req) ->
+    gen_server:cast(Broker, Req).
 
 %% Pick a broker
 pick(Topic) ->
@@ -506,18 +512,18 @@ init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
     {ok, #{pool => Pool, id => Id}}.
 
-handle_call({subscribe, Topic, SubPid, 0}, _From, State) ->
+handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) ->
     Existed = ets:member(?SUBSCRIBER, Topic),
     true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
-    Result = maybe_add_route(Existed, Topic),
+    Result = maybe_add_route(Existed, Topic, From),
     {reply, Result, State};
-handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
+handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) ->
     Existed = ets:member(?SUBSCRIBER, Topic),
     true = ets:insert(?SUBSCRIBER, [
         {Topic, {shard, I}},
         {{shard, Topic, I}, SubPid}
     ]),
-    Result = maybe_add_route(Existed, Topic),
+    Result = maybe_add_route(Existed, Topic, From),
     {reply, Result, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
@@ -597,12 +603,36 @@ do_dispatch({shard, I}, Topic, Msg) ->
 
 %%
 
-maybe_add_route(_Existed = false, Topic) ->
-    emqx_router:do_add_route(Topic);
-maybe_add_route(_Existed = true, _Topic) ->
+maybe_add_route(_Existed = false, Topic, ReplyTo) ->
+    sync_route(add, Topic, #{reply => ReplyTo});
+maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
     ok.
 
 maybe_delete_route(_Exists = false, Topic) ->
-    emqx_router:do_delete_route(Topic);
+    sync_route(delete, Topic, #{});
 maybe_delete_route(_Exists = true, _Topic) ->
     ok.
+
+%% Route an `add` / `delete` update for `Topic` either through the batching syncer
+%% or straight to the router, depending on `broker.routing.batch_sync.enable_on`:
+%% `all` always uses the syncer, `none` never does, and any other value uses it
+%% only when it equals what `mria_config:whoami()` reports for this node.
+%% `Opts` is an options map that is passed through unchanged to the syncer push.
+sync_route(Action, Topic, Opts) ->
+    UseSyncer =
+        case emqx_config:get([broker, routing, batch_sync, enable_on]) of
+            all -> true;
+            none -> false;
+            Role -> Role =:= mria_config:whoami()
+        end,
+    case UseSyncer of
+        true -> push_sync_route(Action, Topic, Opts);
+        false -> regular_sync_route(Action, Topic)
+    end.
+
+%% Hand the route operation over to `emqx_router_syncer`, using the local node as
+%% the route destination. Returns whatever `emqx_router_syncer:push/4` returns.
+push_sync_route(Action, Topic, Opts) ->
+    emqx_router_syncer:push(Action, Topic, node(), Opts).
+
+%% Apply the route operation directly through `emqx_router`, bypassing the
+%% batching syncer. The local node is used as the route destination.
+regular_sync_route(add, Topic) ->
+    emqx_router:do_add_route(Topic, node());
+regular_sync_route(delete, Topic) ->
+    emqx_router:do_delete_route(Topic, node()).

+ 9 - 2
apps/emqx/src/emqx_broker_sup.erl

@@ -32,13 +32,20 @@ start_link() ->
 init([]) ->
     %% Broker pool
     PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2),
-    BrokerPool = emqx_pool_sup:spec([
+    BrokerPool = emqx_pool_sup:spec(broker_pool_sup, [
         broker_pool,
         hash,
         PoolSize,
         {emqx_broker, start_link, []}
     ]),
 
+    SyncerPool = emqx_pool_sup:spec(syncer_pool_sup, [
+        router_syncer_pool,
+        hash,
+        PoolSize,
+        {emqx_router_syncer, start_link, []}
+    ]),
+
     %% Shared subscription
     SharedSub = #{
         id => shared_sub,
@@ -59,4 +66,4 @@ init([]) ->
         modules => [emqx_broker_helper]
     },
 
-    {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}.
+    {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper]}}.

+ 108 - 31
apps/emqx/src/emqx_router.erl

@@ -45,6 +45,13 @@
     do_delete_route/2
 ]).
 
+%% Mria Activity RPC targets
+-export([
+    mria_batch_run/2
+]).
+
+-export([do_batch/1]).
+
 -export([cleanup_routes/1]).
 
 -export([
@@ -86,10 +93,15 @@
     deinit_schema/0
 ]).
 
--type group() :: binary().
+-export_type([dest/0]).
 
+-type group() :: binary().
 -type dest() :: node() | {group(), node()}.
 
+%% Operation :: {add, ...} | {delete, ...}.
+-type batch() :: #{batch_route() => _Operation :: tuple()}.
+-type batch_route() :: {emqx_types:topic(), dest()}.
+
 -record(routeidx, {
     entry :: '$1' | emqx_topic_index:key(dest()),
     unused = [] :: nil()
@@ -173,12 +185,12 @@ do_add_route(Topic) when is_binary(Topic) ->
 -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_add_route(Topic, Dest) when is_binary(Topic) ->
     ok = emqx_router_helper:monitor(Dest),
-    mria_insert_route(get_schema_vsn(), Topic, Dest).
+    mria_insert_route(get_schema_vsn(), Topic, Dest, single).
 
-mria_insert_route(v2, Topic, Dest) ->
-    mria_insert_route_v2(Topic, Dest);
-mria_insert_route(v1, Topic, Dest) ->
-    mria_insert_route_v1(Topic, Dest).
+mria_insert_route(v2, Topic, Dest, Ctx) ->
+    mria_insert_route_v2(Topic, Dest, Ctx);
+mria_insert_route(v1, Topic, Dest, Ctx) ->
+    mria_insert_route_v1(Topic, Dest, Ctx).
 
 %% @doc Take a real topic (not filter) as input, return the matching topics and topic
 %% filters associated with route destination.
@@ -225,12 +237,35 @@ do_delete_route(Topic) when is_binary(Topic) ->
 
 -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_delete_route(Topic, Dest) ->
-    mria_delete_route(get_schema_vsn(), Topic, Dest).
+    mria_delete_route(get_schema_vsn(), Topic, Dest, single).
+
+mria_delete_route(v2, Topic, Dest, Ctx) ->
+    mria_delete_route_v2(Topic, Dest, Ctx);
+mria_delete_route(v1, Topic, Dest, Ctx) ->
+    mria_delete_route_v1(Topic, Dest, Ctx).
+
+%% Apply a batch of route operations using the routing schema version reported
+%% by `get_schema_vsn/0`.
+-spec do_batch(batch()) -> #{batch_route() => _Error}.
+do_batch(Batch) ->
+    mria_batch(get_schema_vsn(), Batch).
+
+%% Dispatch the batch to the implementation matching the schema version.
+mria_batch(v2, Batch) ->
+    mria_batch_v2(Batch);
+mria_batch(v1, Batch) ->
+    mria_batch_v1(Batch).
+
+%% Schema v2: run the whole batch as a single `mria:async_dirty/3` activity on
+%% the route shard.
+mria_batch_v2(Batch) ->
+    mria:async_dirty(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v2, Batch]).
+
+%% Schema v1: run the whole batch inside a single `mria:transaction/3` on the
+%% route shard. A committed transaction yields the result of `mria_batch_run/2`;
+%% any other outcome is returned to the caller as is.
+mria_batch_v1(Batch) ->
+    case mria:transaction(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v1, Batch]) of
+        {atomic, Result} ->
+            Result;
+        Error ->
+            Error
+    end.
 
-mria_delete_route(v2, Topic, Dest) ->
-    mria_delete_route_v2(Topic, Dest);
-mria_delete_route(v1, Topic, Dest) ->
-    mria_delete_route_v1(Topic, Dest).
+%% The action of a batch operation is the first element of the operation tuple.
+batch_get_action(Op) ->
+    element(1, Op).
 
 -spec select(Spec, _Limit :: pos_integer(), Continuation) ->
     {[emqx_types:route()], Continuation} | '$end_of_table'
@@ -301,47 +336,79 @@ call(Router, Msg) ->
 pick(Topic) ->
     gproc_pool:pick_worker(router_pool, Topic).
 
+%%--------------------------------------------------------------------
+%% Route batch RPC targets
+%%--------------------------------------------------------------------
+
+%% Run every operation of the batch against the given schema version.
+%% Returns a map that contains only the routes whose operation returned something
+%% other than `ok`, each mapped to that returned value.
+-spec mria_batch_run(schemavsn(), batch()) -> #{batch_route() => _Error}.
+mria_batch_run(SchemaVsn, Batch) ->
+    RunOp = fun(Route = {Topic, Dest}, Op, Failed) ->
+        Action = batch_get_action(Op),
+        case mria_batch_operation(SchemaVsn, Action, Topic, Dest) of
+            ok -> Failed;
+            Error -> Failed#{Route => Error}
+        end
+    end,
+    maps:fold(RunOp, #{}, Batch).
+
+%% Execute a single batch operation. The `batch` context tells the insert / delete
+%% helpers that they are called as part of a batch run rather than on their own.
+mria_batch_operation(SchemaVsn, add, Topic, Dest) ->
+    mria_insert_route(SchemaVsn, Topic, Dest, batch);
+mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
+    mria_delete_route(SchemaVsn, Topic, Dest, batch).
+
 %%--------------------------------------------------------------------
 %% Schema v1
 %% --------------------------------------------------------------------
 
-mria_insert_route_v1(Topic, Dest) ->
+mria_insert_route_v1(Topic, Dest, Ctx) ->
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
         true ->
-            mria_route_tab_insert_update_trie(Route);
+            mria_route_tab_insert_update_trie(Route, Ctx);
         false ->
-            mria_route_tab_insert(Route)
+            mria_route_tab_insert(Route, Ctx)
     end.
 
-mria_route_tab_insert_update_trie(Route) ->
+mria_route_tab_insert_update_trie(Route, single) ->
     emqx_router_utils:maybe_trans(
         fun emqx_router_utils:insert_trie_route/2,
         [?ROUTE_TAB, Route],
         ?ROUTE_SHARD
-    ).
+    );
+mria_route_tab_insert_update_trie(Route, batch) ->
+    emqx_router_utils:insert_trie_route(?ROUTE_TAB, Route).
 
-mria_route_tab_insert(Route) ->
-    mria:dirty_write(?ROUTE_TAB, Route).
+mria_route_tab_insert(Route, single) ->
+    mria:dirty_write(?ROUTE_TAB, Route);
+mria_route_tab_insert(Route, batch) ->
+    mnesia:write(?ROUTE_TAB, Route, write).
 
-mria_delete_route_v1(Topic, Dest) ->
+mria_delete_route_v1(Topic, Dest, Ctx) ->
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
         true ->
-            mria_route_tab_delete_update_trie(Route);
+            mria_route_tab_delete_update_trie(Route, Ctx);
         false ->
-            mria_route_tab_delete(Route)
+            mria_route_tab_delete(Route, Ctx)
     end.
 
-mria_route_tab_delete_update_trie(Route) ->
+mria_route_tab_delete_update_trie(Route, single) ->
     emqx_router_utils:maybe_trans(
         fun emqx_router_utils:delete_trie_route/2,
         [?ROUTE_TAB, Route],
         ?ROUTE_SHARD
-    ).
+    );
+mria_route_tab_delete_update_trie(Route, batch) ->
+    emqx_router_utils:delete_trie_route(?ROUTE_TAB, Route).
 
-mria_route_tab_delete(Route) ->
-    mria:dirty_delete_object(?ROUTE_TAB, Route).
+mria_route_tab_delete(Route, single) ->
+    mria:dirty_delete_object(?ROUTE_TAB, Route);
+mria_route_tab_delete(Route, batch) ->
+    mnesia:delete_object(?ROUTE_TAB, Route, write).
 
 match_routes_v1(Topic) ->
     lookup_route_tab(Topic) ++
@@ -410,24 +477,34 @@ fold_routes_v1(FunName, FoldFun, AccIn) ->
 %% topics. Writes go to only one of the two tables at a time.
 %% --------------------------------------------------------------------
 
-mria_insert_route_v2(Topic, Dest) ->
+mria_insert_route_v2(Topic, Dest, Ctx) ->
     case emqx_trie_search:filter(Topic) of
         Words when is_list(Words) ->
             K = emqx_topic_index:make_key(Words, Dest),
-            mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+            mria_filter_tab_insert(K, Ctx);
         false ->
-            mria_route_tab_insert(#route{topic = Topic, dest = Dest})
+            mria_route_tab_insert(#route{topic = Topic, dest = Dest}, Ctx)
     end.
 
-mria_delete_route_v2(Topic, Dest) ->
+mria_filter_tab_insert(K, single) ->
+    mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+mria_filter_tab_insert(K, batch) ->
+    mnesia:write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}, write).
+
+mria_delete_route_v2(Topic, Dest, Ctx) ->
     case emqx_trie_search:filter(Topic) of
         Words when is_list(Words) ->
             K = emqx_topic_index:make_key(Words, Dest),
-            mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+            mria_filter_tab_delete(K, Ctx);
         false ->
-            mria_route_tab_delete(#route{topic = Topic, dest = Dest})
+            mria_route_tab_delete(#route{topic = Topic, dest = Dest}, Ctx)
     end.
 
+mria_filter_tab_delete(K, single) ->
+    mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+mria_filter_tab_delete(K, batch) ->
+    mnesia:delete(?ROUTE_TAB_FILTERS, K, write).
+
 match_routes_v2(Topic) ->
     lookup_route_tab(Topic) ++
         [match_to_route(M) || M <- match_filters(Topic)].

+ 410 - 0
apps/emqx/src/emqx_router_syncer.erl

@@ -0,0 +1,410 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_router_syncer).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/2]).
+
+-export([push/4]).
+-export([wait/1]).
+
+-export([stats/0]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+-type action() :: add | delete.
+
+-define(POOL, router_syncer_pool).
+
+-define(MAX_BATCH_SIZE, 1000).
+
+%% How long to idle (ms) after receiving a new operation before triggering batch sync?
+%% Zero effectively just schedules out the process, so that it has a chance to receive
+%% more operations, and introduces no minimum delay.
+-define(MIN_SYNC_INTERVAL, 0).
+
+%% How long (ms) to idle after observing a batch sync error?
+%% Should help to avoid excessive retries in situations when errors are caused by
+%% conditions that take some time to resolve (e.g. restarting an upstream core node).
+-define(ERROR_DELAY, 10).
+
+%% How soon (ms) to retry last failed batch sync attempt?
+%% Only matters in the absence of new operations; otherwise batch sync is triggered
+%% as soon as `?ERROR_DELAY` is over.
+-define(ERROR_RETRY_INTERVAL, 500).
+
+-define(PRIO_HI, 1).
+-define(PRIO_LO, 2).
+-define(PRIO_BG, 3).
+
+-define(PUSH(PRIO, OP), {PRIO, OP}).
+-define(OP(ACT, TOPIC, DEST, CTX), {ACT, TOPIC, DEST, CTX}).
+
+-define(ROUTEOP(ACT), {ACT, _, _}).
+-define(ROUTEOP(ACT, PRIO), {ACT, PRIO, _}).
+-define(ROUTEOP(ACT, PRIO, CTX), {ACT, PRIO, CTX}).
+
+-ifdef(TEST).
+-undef(MAX_BATCH_SIZE).
+-undef(MIN_SYNC_INTERVAL).
+-define(MAX_BATCH_SIZE, 40).
+-define(MIN_SYNC_INTERVAL, 10).
+-endif.
+
+%%
+
+%% Start a syncer worker, registered locally under a name derived from this
+%% module and `Id`. `Pool` and `Id` are passed on to `init/1`.
+-spec start_link(atom(), pos_integer()) ->
+    {ok, pid()}.
+start_link(Pool, Id) ->
+    ServerName = {local, emqx_utils:proc_name(?MODULE, Id)},
+    gen_server:start_link(ServerName, ?MODULE, [Pool, Id], []).
+
+%% Enqueue a route operation on the pool worker picked by `Topic`.
+%% The operation is delivered as a plain message, so the caller does not wait for
+%% the worker. When `Opts` carries `reply`, a fresh reference is returned and the
+%% process given in `reply` is later sent `{Ref, Result}` (see `wait/1`);
+%% otherwise `ok` is returned.
+-spec push(action(), emqx_types:topic(), emqx_router:dest(), Opts) ->
+    ok | _WaitRef :: reference()
+when
+    Opts :: #{reply => pid()}.
+push(Action, Topic, Dest, Opts) ->
+    Worker = gproc_pool:pick_worker(?POOL, Topic),
+    Prio = designate_prio(Action, Opts),
+    Context = mk_push_context(Opts),
+    _ = erlang:send(Worker, ?PUSH(Prio, ?OP(Action, Topic, Dest, Context))),
+    push_result(Context).
+
+%% Translate the push context into what `push/4` hands back to its caller.
+push_result({MRef, _ReplyTo}) ->
+    MRef;
+push_result([]) ->
+    ok.
+
+%% Block until a `{MRef, Result}` message arrives in the calling process'
+%% mailbox and return the `Result` it carries.
+-spec wait(_WaitRef :: reference()) ->
+    ok | {error, _Reason}.
+wait(MRef) ->
+    %% NOTE
+    %% No timeouts here because (as in the `emqx_broker:call/2` case) callers do not
+    %% really expect this to fail with a timeout exception. However, waiting
+    %% indefinitely is not the best option since it blocks the caller from receiving
+    %% other messages, so for instance a channel (connection) process may not be able
+    %% to react to a socket close event in time. A better option would probably be to
+    %% introduce cancellable operations, which would be able to check if the caller
+    %% is still interested in the result.
+    receive
+        {MRef, Result} ->
+            Result
+    end.
+
+%% Pick the priority of a pushed operation: anything with a `reply` option is
+%% high priority regardless of the action; without it, `add` is low priority and
+%% `delete` is background priority.
+designate_prio(_, #{reply := _To}) ->
+    ?PRIO_HI;
+designate_prio(add, #{}) ->
+    ?PRIO_LO;
+designate_prio(delete, #{}) ->
+    ?PRIO_BG.
+
+%% Build the reply context stored alongside an operation: `{Ref, Pid}` with a
+%% fresh reference when a reply is requested, `[]` when nobody expects a reply.
+mk_push_context(#{reply := To}) ->
+    MRef = erlang:make_ref(),
+    {MRef, To};
+mk_push_context(_) ->
+    [].
+
+%%
+
+%% Statistics of a stash as computed by `stash_stats/1`.
+%% For an empty stash `prio_highest` is `none` and `prio_lowest` is `0`: those are
+%% the initial accumulators of the respective folds, and no `undefined` is ever
+%% produced.
+-type stats() :: #{
+    size := non_neg_integer(),
+    n_add := non_neg_integer(),
+    n_delete := non_neg_integer(),
+    prio_highest := non_neg_integer() | none,
+    prio_lowest := non_neg_integer()
+}.
+
+%% Collect stash statistics from every active worker of the syncer pool.
+%% Each worker is asked synchronously, without a timeout.
+-spec stats() -> [stats()].
+stats() ->
+    Workers = gproc_pool:active_workers(?POOL),
+    [gen_server:call(Pid, stats, infinity) || {_Name, Pid} <- Workers].
+
+%%
+
+%% Connect this process to the gproc pool as worker `{Pool, Id}` and start with
+%% an empty stash.
+init([Pool, Id]) ->
+    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
+    {ok, #{stash => stash_new()}}.
+
+%% `stats` replies with the statistics of the current stash; every other call is
+%% answered with `ignored`.
+handle_call(stats, _From, State = #{stash := Stash}) ->
+    {reply, stash_stats(Stash), State};
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+%% Casts are not part of the protocol and are silently dropped.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% Handle raw messages:
+%%  * `{timeout, _, retry}`: the retry timer fired, so re-run the batch loop over
+%%    whatever is currently stashed.
+%%  * `?PUSH(Prio, ?OP(...))`: a new operation sent by `push/4`.
+%%  * Anything else is logged and dropped, so that a stray message cannot crash
+%%    the worker with `function_clause` (here or later inside `stash_add/3`).
+handle_info({timeout, _TRef, retry}, State) ->
+    NState = run_batch_loop([], maps:remove(retry_timer, State)),
+    {noreply, NState};
+handle_info(Push = ?PUSH(_, ?OP(_, _, _, _)), State) ->
+    %% NOTE: Wait a bit to collect potentially overlapping operations.
+    ok = timer:sleep(?MIN_SYNC_INTERVAL),
+    NState = run_batch_loop([Push], State),
+    {noreply, NState};
+handle_info(Info, State) ->
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
+    {noreply, State}.
+
+%% Nothing to clean up on termination.
+terminate(_Reason, _State) ->
+    ok.
+
+%%
+
+%% Core sync loop. Adds the `Incoming` pushes to the stash, drains the pushes
+%% already waiting in the mailbox, cuts a batch off the stash and runs it.
+%%  * If the run yields a map, replies are sent, the retry timer (if any) is
+%%    cancelled, and the loop repeats for as long as operations remain stashed.
+%%  * Otherwise a warning is logged, the stash is kept as it was before the batch
+%%    was cut off (`Stash2`, so no operation is lost), the process idles for
+%%    `?ERROR_DELAY` ms and a retry timer is ensured.
+%% Returns the updated state.
+run_batch_loop(Incoming, State = #{stash := Stash0}) ->
+    Stash1 = stash_add(Incoming, Stash0),
+    Stash2 = stash_drain(Stash1),
+    {Batch, Stash3} = mk_batch(Stash2),
+    ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)),
+    case run_batch(Batch) of
+        Status = #{} ->
+            %% `Status` holds the routes that failed; all others are answered `ok`.
+            ok = send_replies(Status, Batch),
+            NState = cancel_retry_timer(State#{stash := Stash3}),
+            %% NOTE
+            %% We could postpone batches where only `?PRIO_BG` operations are left,
+            %% which would allow doing less work in situations when there are
+            %% intermittently reconnecting clients with moderately unique
+            %% subscriptions. However, this would also require us to forego the
+            %% idempotency of batch syncs (see `merge_route_op/2`).
+            case is_stash_empty(Stash3) of
+                true ->
+                    NState;
+                false ->
+                    run_batch_loop([], NState)
+            end;
+        BatchError ->
+            ?SLOG(warning, #{
+                msg => "router_batch_sync_failed",
+                reason => BatchError,
+                batch => batch_stats(Batch, Stash3)
+            }),
+            %% Keep the pre-batch stash so that the failed batch is retried.
+            NState = State#{stash := Stash2},
+            ok = timer:sleep(?ERROR_DELAY),
+            ensure_retry_timer(NState)
+    end.
+
+%% Start the retry timer unless the state already records one.
+%% NOTE(review): presumably `emqx_utils:start_timer/2` delivers
+%% `{timeout, TRef, retry}` to this process, which is what `handle_info/2`
+%% matches on; confirm against `emqx_utils`.
+ensure_retry_timer(State = #{retry_timer := _TRef}) ->
+    State;
+ensure_retry_timer(State) ->
+    TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry),
+    State#{retry_timer => TRef}.
+
+%% Cancel the retry timer recorded in the state, if any, and forget about it.
+cancel_retry_timer(State = #{retry_timer := TRef}) ->
+    ok = emqx_utils:cancel_timer(TRef),
+    maps:remove(retry_timer, State);
+cancel_retry_timer(State) ->
+    State.
+
+%%
+
+%% Split the stash into `{Batch, StashLeft}`, where the batch holds at most
+%% `?MAX_BATCH_SIZE` operations.
+mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
+    %% This is the perfect situation: the whole stash is used as the batch without
+    %% extra reallocations.
+    {Stash, stash_new()};
+mk_batch(Stash) ->
+    %% Take a subset of stashed operations to form a batch.
+    %% Note that the stash is an unordered map, not a queue. The order of operations
+    %% is not preserved strictly, only loosely, because of how we start from high
+    %% priority operations and go down to low priority ones. This might cause some
+    %% operations to stay in the stash for an unfairly long time when there are many
+    %% high priority operations. However, it's unclear how likely this is to happen
+    %% in practice.
+    mk_batch(Stash, ?MAX_BATCH_SIZE).
+
+%% Cut a batch of at most `BatchSize` operations off the stash, starting with the
+%% highest priority and an empty batch.
+mk_batch(Stash, BatchSize) ->
+    mk_batch(?PRIO_HI, #{}, BatchSize, Stash).
+
+%% Start a pass over the stash for priority `Prio` with a fresh iterator.
+mk_batch(Prio, Batch, SizeLeft, Stash) ->
+    mk_batch(Prio, Batch, SizeLeft, Stash, maps:iterator(Stash)).
+
+%% One pass over the stash: move operations whose priority equals `Prio` from the
+%% stash into the batch until either `SizeLeft` reaches zero or the iterator is
+%% exhausted. In the latter case the next priority level is tried, with a new
+%% iterator over the stash that is left.
+mk_batch(Prio, Batch, SizeLeft, Stash, It) when SizeLeft > 0 ->
+    %% Iterating over stash, only taking operations with priority equal to `Prio`.
+    case maps:next(It) of
+        %% `Prio` is already bound here, so this clause matches only operations
+        %% of exactly that priority.
+        {Route, Op = ?ROUTEOP(_Action, Prio), NIt} ->
+            NBatch = Batch#{Route => Op},
+            NStash = maps:remove(Route, Stash),
+            mk_batch(Prio, NBatch, SizeLeft - 1, NStash, NIt);
+        {_Route, _Op, NIt} ->
+            %% This is a lower priority operation, skip it.
+            mk_batch(Prio, Batch, SizeLeft, Stash, NIt);
+        none ->
+            %% No more operations with priority `Prio`, go to the next priority level.
+            %% Asserts that we never go past the lowest (`?PRIO_BG`) level.
+            true = Prio < ?PRIO_BG,
+            mk_batch(Prio + 1, Batch, SizeLeft, Stash)
+    end;
+mk_batch(_Prio, Batch, _, Stash, _It) ->
+    %% The batch is full.
+    {Batch, Stash}.
+
+%% Answer every operation of the batch: routes present in `Errors` get the error
+%% recorded there, all others get `ok`. Operations without a reply context are
+%% left to `replyctx_send/2`, which sends nothing for them.
+send_replies(Errors, Batch) ->
+    Reply = fun(Route, {_Action, _Prio, Ctx}) ->
+        replyctx_send(maps:get(Route, Errors, ok), Ctx)
+    end,
+    maps:foreach(Reply, Batch).
+
+%% Deliver `Result` to the process recorded in the reply context, tagged with the
+%% context's reference. An empty context (`[]`) means nobody expects a reply, in
+%% which case nothing is sent and `noreply` is returned.
+replyctx_send(_Result, []) ->
+    noreply;
+replyctx_send(Result, {MRef, Pid}) ->
+    _ = erlang:send(Pid, {MRef, Result}),
+    ok.
+
+%%
+
+%% Run a batch against the router and return its outcome.
+%% A map result is the outcome of a completed run (the caller treats it as the
+%% map of failed routes); anything else signals that the batch as a whole failed.
+%% Exceptions are converted into `{error, {Class, Reason, Stacktrace}}` so that
+%% they are never mistaken for a successful run (a thrown map would be, with a
+%% plain `catch`) and the exception class is preserved for logging.
+%% An empty batch is a no-op that trivially succeeds.
+run_batch(Batch) when map_size(Batch) > 0 ->
+    try
+        emqx_router:do_batch(Batch)
+    catch
+        Class:Reason:Stacktrace ->
+            {error, {Class, Reason, Stacktrace}}
+    end;
+run_batch(_Empty) ->
+    #{}.
+
+%%
+
+%% The stash is a map from route (`{Topic, Dest}`) to the pending operation.
+stash_new() ->
+    #{}.
+
+%% True when no operations are stashed.
+is_stash_empty(Stash) ->
+    maps:size(Stash) =:= 0.
+
+%% Move every push message currently in the mailbox into the stash.
+%% Never blocks: returns the stash as soon as no matching message is queued.
+stash_drain(Stash) ->
+    receive
+        ?PUSH(Prio, Op) ->
+            stash_drain(stash_add(Prio, Op, Stash))
+    after 0 ->
+        Stash
+    end.
+
+%% Add a list of push messages to the stash, in list order.
+stash_add(Pushes, Stash) ->
+    lists:foldl(
+        fun(?PUSH(Prio, Op), QAcc) -> stash_add(Prio, Op, QAcc) end,
+        Stash,
+        Pushes
+    ).
+
+%% Add a single operation to the stash. The stash keeps at most one operation per
+%% route, so an operation for an already stashed route is merged with the stashed
+%% one (see `merge_route_op/2`).
+stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
+    Route = {Topic, Dest},
+    Incoming = ?ROUTEOP(Action, Prio, Ctx),
+    case Stash of
+        #{Route := Stashed} ->
+            Stash#{Route := merge_route_op(Stashed, Incoming)};
+        #{} ->
+            Stash#{Route => Incoming}
+    end.
+
+%% Merge a newly pushed operation (`DestOp`) with the one already stashed for the
+%% same route. The new operation always replaces the stashed one, and the stashed
+%% one is answered right away if it has a reply context: with `ignored` when both
+%% carry the same action, with `ok` otherwise.
+merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
+    %% NOTE: This should not happen anyway.
+    _ = replyctx_send(ignored, Ctx1),
+    DestOp;
+merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) ->
+    %% NOTE: The latter cancels the former.
+    %% Strictly speaking, in ideal conditions we could just cancel both, because they
+    %% essentially do not change the global state. However, we decided to stay on the
+    %% safe side and cancel only the former, making batch syncs idempotent.
+    _ = replyctx_send(ok, Ctx1),
+    DestOp.
+
+%%
+
+%% Statistics of a batch, extended with the number of operations still stashed.
+batch_stats(Batch, Stash) ->
+    BatchStats = stash_stats(Batch),
+    BatchStats#{
+        stashed => maps:size(Stash)
+    }.
+
+%% Compute statistics of a stash (or batch) in a single pass: total size, number
+%% of `add` and `delete` operations, and the numerically smallest / largest
+%% priority present. For an empty stash `prio_highest` is `none` and `prio_lowest`
+%% is `0`.
+stash_stats(Stash) ->
+    Init = #{
+        size => maps:size(Stash),
+        n_add => 0,
+        n_delete => 0,
+        prio_highest => none,
+        prio_lowest => 0
+    },
+    maps:fold(fun stash_stats_fold/3, Init, Stash).
+
+%% Account for one stashed operation.
+stash_stats_fold(_Route, ?ROUTEOP(Action, Prio), Acc) ->
+    #{prio_highest := Highest, prio_lowest := Lowest} = Acc,
+    %% NOTE: Numbers sort before atoms, so `min(Prio, none)` is `Prio`.
+    stash_stats_count(Action, Acc#{
+        prio_highest := min(Prio, Highest),
+        prio_lowest := max(Prio, Lowest)
+    }).
+
+stash_stats_count(add, Acc = #{n_add := N}) ->
+    Acc#{n_add := N + 1};
+stash_stats_count(delete, Acc = #{n_delete := N}) ->
+    Acc#{n_delete := N + 1};
+stash_stats_count(_Other, Acc) ->
+    Acc.
+
+%%
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+%% Exercises stashing, merging and batch formation:
+%%  * operations for the same route are merged, with the latest one winning;
+%%  * a batch of 5 is filled from the highest priority downwards;
+%%  * operations superseded during merging are answered immediately.
+batch_test() ->
+    Dest = node(),
+    %% Reply contexts use small integers in place of references, and this very
+    %% process as the reply target, so that replies can be inspected below.
+    Ctx = fun(N) -> {N, self()} end,
+    Stash = stash_add(
+        [
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/1">>, Dest, Ctx(2))),
+            ?PUSH(?PRIO_LO, ?OP(add, <<"t/1">>, Dest, Ctx(3))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(4))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(5))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/4">>, Dest, Ctx(6))),
+            ?PUSH(?PRIO_LO, ?OP(delete, <<"t/3">>, Dest, Ctx(7))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/3">>, Dest, Ctx(8))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(9))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"old/1">>, Dest, Ctx(10))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/2">>, Dest, Ctx(11))),
+            ?PUSH(?PRIO_BG, ?OP(delete, <<"old/2">>, Dest, Ctx(12))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(13))),
+            ?PUSH(?PRIO_HI, ?OP(add, <<"t/3">>, Dest, Ctx(14))),
+            ?PUSH(?PRIO_LO, ?OP(delete, <<"old/3">>, Dest, Ctx(15))),
+            ?PUSH(?PRIO_LO, ?OP(delete, <<"t/2">>, Dest, Ctx(16)))
+        ],
+        stash_new()
+    ),
+    {Batch, StashLeft} = mk_batch(Stash, 5),
+    %% The batch takes the high and low priority operations...
+    ?assertMatch(
+        #{
+            {<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
+            {<<"t/3">>, Dest} := {add, ?PRIO_HI, _},
+            {<<"t/2">>, Dest} := {delete, ?PRIO_LO, _},
+            {<<"t/4">>, Dest} := {add, ?PRIO_HI, _},
+            {<<"old/3">>, Dest} := {delete, ?PRIO_LO, _}
+        },
+        Batch
+    ),
+    %% ...while the background priority ones stay stashed.
+    ?assertMatch(
+        #{
+            {<<"old/1">>, Dest} := {delete, ?PRIO_BG, _},
+            {<<"old/2">>, Dest} := {delete, ?PRIO_BG, _}
+        },
+        StashLeft
+    ),
+    %% Replies sent to this process while merging, in the order they were sent.
+    %% NOTE(review): presumably `emqx_utils_stream:mqueue(0)` reads the messages
+    %% currently in this process' mailbox; confirm against `emqx_utils_stream`.
+    ?assertEqual(
+        [
+            {2, ignored},
+            {1, ok},
+            {5, ok},
+            {7, ignored},
+            {4, ok},
+            {9, ok},
+            {8, ok},
+            {13, ignored},
+            {11, ok}
+        ],
+        emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
+    ).
+
+-endif.

+ 18 - 0
apps/emqx/src/emqx_schema.erl

@@ -1404,6 +1404,24 @@ fields("broker_routing") ->
                     'readOnly' => true,
                     desc => ?DESC(broker_routing_storage_schema)
                 }
+            )},
+        {"batch_sync",
+            sc(
+                ref("broker_routing_batch_sync"),
+                #{importance => ?IMPORTANCE_HIDDEN}
+            )}
+    ];
+fields("broker_routing_batch_sync") ->
+    [
+        {"enable_on",
+            sc(
+                hoconsc:enum([none, core, replicant, all]),
+                #{
+                    %% TODO
+                    %% Make `replicant` the default value after initial release.
+                    default => none,
+                    desc => ?DESC(broker_routing_batch_sync_enable_on)
+                }
             )}
     ];
 fields("shared_subscription_group") ->

+ 1 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -72,6 +72,7 @@
 -export([stop_apps/1]).
 
 -export([merge_appspec/2]).
+-export([merge_config/2]).
 
 %% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
 -export([schema_module/0, upgrade_raw_conf/1]).

+ 250 - 29
apps/emqx/test/emqx_routing_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_router.hrl").
 
@@ -33,27 +34,80 @@ all() ->
     ].
 
 groups() ->
-    TCs = [
+    GroupVsn = [
+        {group, batch_sync_on},
+        {group, batch_sync_replicants},
+        {group, batch_sync_off}
+    ],
+    ClusterTCs = [
         t_cluster_routing,
         t_slow_rlog_routing_consistency
     ],
+    SingleTCs = [t_concurrent_routing_updates],
+    BatchSyncTCs = lists:duplicate(5, t_concurrent_routing_updates_with_errors),
     [
-        {routing_schema_v1, [], TCs},
-        {routing_schema_v2, [], TCs}
+        {routing_schema_v1, [], GroupVsn},
+        {routing_schema_v2, [], GroupVsn},
+        {batch_sync_on, [], [{group, cluster}, {group, single_batch_on}]},
+        {batch_sync_replicants, [], [{group, cluster}, {group, single}]},
+        {batch_sync_off, [], [{group, cluster}, {group, single}]},
+        {cluster, [], ClusterTCs},
+        {single_batch_on, [], SingleTCs ++ BatchSyncTCs},
+        {single, [], SingleTCs}
     ].
 
-init_per_group(GroupName, Config) ->
-    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
+init_per_group(routing_schema_v1, Config) ->
+    [{emqx_config, "broker.routing.storage_schema = v1"} | Config];
+init_per_group(routing_schema_v2, Config) ->
+    [{emqx_config, "broker.routing.storage_schema = v2"} | Config];
+init_per_group(batch_sync_on, Config) ->
+    [{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
+init_per_group(batch_sync_replicants, Config) ->
+    [{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
+init_per_group(batch_sync_off, Config) ->
+    [{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
+init_per_group(cluster, Config) ->
+    WorkDir = emqx_cth_suite:work_dir(Config),
     NodeSpecs = [
-        {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}},
-        {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}},
-        {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}}
+        {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
+        {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
+        {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
     ],
     Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
-    [{cluster, Nodes} | Config].
+    [{cluster, Nodes} | Config];
+init_per_group(GroupName, Config) when
+    GroupName =:= single_batch_on;
+    GroupName =:= single
+->
+    WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                config => mk_config_broker(Config),
+                %% NOTE
+                %% Artificially increasing pool workers contention by forcing small pool size.
+                before_start => fun() ->
+                    % NOTE
+                    % This one is actually defined on `emqx_conf_schema` level, but used
+                    % in `emqx_broker`. Thus we have to resort to this ugly hack.
+                    emqx_config:force_put([node, broker_pool_size], 2),
+                    emqx_app:set_config_loader(?MODULE)
+                end
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    [{group_apps, Apps} | Config].
 
-end_per_group(_GroupName, Config) ->
-    emqx_cth_cluster:stop(?config(cluster, Config)).
+end_per_group(cluster, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config));
+end_per_group(GroupName, Config) when
+    GroupName =:= single_batch_on;
+    GroupName =:= single
+->
+    emqx_cth_suite:stop(?config(group_apps, Config));
+end_per_group(_, _Config) ->
+    ok.
 
 init_per_testcase(TC, Config) ->
     emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
@@ -61,9 +115,9 @@ init_per_testcase(TC, Config) ->
 end_per_testcase(TC, Config) ->
     emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
 
-mk_emqx_appspec(GroupName, N) ->
+mk_emqx_appspec(N, Config) ->
     {emqx, #{
-        config => mk_config(GroupName, N),
+        config => mk_config(N, Config),
         after_start => fun() ->
             % NOTE
             % This one is actually defined on `emqx_conf_schema` level, but used
@@ -77,24 +131,28 @@ mk_genrpc_appspec() ->
         override_env => [{port_discovery, stateless}]
     }}.
 
-mk_config(GroupName, N) ->
-    #{
-        broker => mk_config_broker(GroupName),
-        listeners => mk_config_listeners(N)
-    }.
+mk_config(N, ConfigOrVsn) ->
+    emqx_cth_suite:merge_config(
+        mk_config_broker(ConfigOrVsn),
+        mk_config_listeners(N)
+    ).
 
-mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 ->
-    #{routing => #{storage_schema => v1}};
-mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 ->
-    #{routing => #{storage_schema => v2}}.
+mk_config_broker(v1) ->
+    "broker.routing.storage_schema = v1";
+mk_config_broker(v2) ->
+    "broker.routing.storage_schema = v2";
+mk_config_broker(CTConfig) ->
+    string:join(proplists:get_all_values(emqx_config, CTConfig), "\n").
 
 mk_config_listeners(N) ->
     Port = 1883 + N,
     #{
-        tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
-        ssl => #{default => #{enable => false}},
-        ws => #{default => #{enable => false}},
-        wss => #{default => #{enable => false}}
+        listeners => #{
+            tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
+            ssl => #{default => #{enable => false}},
+            ws => #{default => #{enable => false}},
+            wss => #{default => #{enable => false}}
+        }
     }.
 
 %%
@@ -182,6 +240,169 @@ unsubscribe(C, Topic) ->
 
 %%
 
+-define(SUBSCRIBE_TOPICS, [
+    <<"t/#">>,
+    <<"t/fixed">>,
+    <<"t/1/+">>,
+    <<"t/2/+">>,
+    <<"t/42/+/+">>,
+    <<"client/${i}/+">>,
+    <<"client/${i}/fixed">>,
+    <<"client/${i}/#">>,
+    <<"rand/${r}/+">>,
+    <<"rand/${r}/fixed">>
+]).
+
+t_concurrent_routing_updates(init, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config;
+t_concurrent_routing_updates('end', _Config) ->
+    ok = snabbkaffe:stop().
+
+t_concurrent_routing_updates(_Config) ->
+    NClients = 400,
+    NRTopics = 250,
+    MCommands = 8,
+    Port = get_mqtt_tcp_port(node()),
+    Clients = [
+        spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics])
+     || I <- lists:seq(1, NClients)
+    ],
+    ok = lists:foreach(fun ping_concurrent_client/1, Clients),
+    ok = timer:sleep(200),
+    Subscribers = ets:tab2list(?SUBSCRIBER),
+    Topics = maps:keys(maps:from_list(Subscribers)),
+    ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())),
+    ok = lists:foreach(fun stop_concurrent_client/1, Clients),
+    ok = timer:sleep(1000),
+    ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]),
+    ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
+    ?assertEqual([], emqx_router:topics()).
+
+t_concurrent_routing_updates_with_errors(init, Config) ->
+    ok = snabbkaffe:start_trace(),
+    ok = meck:new(emqx_router, [passthrough, no_history]),
+    Config;
+t_concurrent_routing_updates_with_errors('end', _Config) ->
+    ok = meck:unload(emqx_router),
+    ok = snabbkaffe:stop().
+
+t_concurrent_routing_updates_with_errors(_Config) ->
+    NClients = 100,
+    NRTopics = 80,
+    MCommands = 6,
+    PSyncError = 0.1,
+    Port = get_mqtt_tcp_port(node()),
+    %% Crash the batch sync operation with some small probability.
+    ok = meck:expect(emqx_router, mria_batch_run, fun(Vsn, Batch) ->
+        case rand:uniform() < PSyncError of
+            false -> meck:passthrough([Vsn, Batch]);
+            true -> error(overload)
+        end
+    end),
+    Clients = [
+        spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics])
+     || I <- lists:seq(1, NClients)
+    ],
+    ok = lists:foreach(fun ping_concurrent_client/1, Clients),
+    0 = ?retry(
+        _Interval = 500,
+        _NTimes = 10,
+        0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()])
+    ),
+    Subscribers = ets:tab2list(?SUBSCRIBER),
+    Topics = maps:keys(maps:from_list(Subscribers)),
+    ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())),
+    ok = lists:foreach(fun stop_concurrent_client/1, Clients),
+    ok = timer:sleep(100),
+    0 = ?retry(
+        500,
+        10,
+        0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()])
+    ),
+    ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]),
+    ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
+    ?assertEqual([], emqx_router:topics()).
+
+run_concurrent_client(I, Port, MCommands, NRTopics) ->
+    % _ = rand:seed(default, I),
+    Ctx = #{
+        i => I,
+        r => rand:uniform(NRTopics)
+    },
+    {ok, C} = emqtt:start_link(#{port => Port, clientid => render("client:${i}", Ctx)}),
+    {ok, _Props} = emqtt:connect(C),
+    NCommands = rand:uniform(MCommands),
+    Commands = gen_concurrent_client_plan(NCommands, Ctx),
+    ok = subscribe_concurrent_client(C, Commands),
+    run_concurrent_client_loop(C).
+
+gen_concurrent_client_plan(N, Ctx) ->
+    lists:foldl(
+        fun(_, Acc) -> mixin(pick_random_command(Ctx), Acc) end,
+        [],
+        lists:seq(1, N)
+    ).
+
+subscribe_concurrent_client(C, Commands) ->
+    lists:foreach(
+        fun
+            ({subscribe, Topic}) ->
+                {ok, _Props, [0]} = emqtt:subscribe(C, Topic);
+            ({unsubscribe, Topic}) ->
+                {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic)
+        end,
+        Commands
+    ).
+
+pick_random_command(Ctx) ->
+    Topic = render(randpick(?SUBSCRIBE_TOPICS), Ctx),
+    randpick([
+        [{subscribe, Topic}],
+        [{subscribe, Topic}, {unsubscribe, Topic}]
+    ]).
+
+render(Template, Ctx) ->
+    iolist_to_binary(emqx_template:render_strict(emqx_template:parse(Template), Ctx)).
+
+run_concurrent_client_loop(C) ->
+    receive
+        {From, Ref, F} ->
+            Reply = F(C),
+            From ! {Ref, Reply},
+            run_concurrent_client_loop(C)
+    end.
+
+ping_concurrent_client(Pid) ->
+    Ref = make_ref(),
+    Pid ! {self(), Ref, fun emqtt:ping/1},
+    receive
+        {Ref, Reply} -> Reply
+    after 5000 ->
+        error(timeout)
+    end.
+
+stop_concurrent_client(Pid) ->
+    MRef = erlang:monitor(process, Pid),
+    true = erlang:unlink(Pid),
+    true = erlang:exit(Pid, shutdown),
+    receive
+        {'DOWN', MRef, process, Pid, Reason} -> Reason
+    end.
+
+randpick(List) ->
+    lists:nth(rand:uniform(length(List)), List).
+
+mixin(L = [H | T], Into = [HInto | TInto]) ->
+    case rand:uniform(length(Into) + 1) of
+        1 -> [H | mixin(T, Into)];
+        _ -> [HInto | mixin(L, TInto)]
+    end;
+mixin(L, Into) ->
+    L ++ Into.
+
+%%
+
 t_routing_schema_switch_v1(Config) ->
     WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
     t_routing_schema_switch(_From = v2, _To = v1, WorkDir).
@@ -195,7 +416,7 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
     [Node1] = emqx_cth_cluster:start(
         [
             {routing_schema_switch1, #{
-                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)]
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, VTo)]
             }}
         ],
         #{work_dir => WorkDir}
@@ -208,12 +429,12 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
     [Node2, Node3] = emqx_cth_cluster:start(
         [
             {routing_schema_switch2, #{
-                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)],
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, VFrom)],
                 base_port => 20000,
                 join_to => Node1
             }},
             {routing_schema_switch3, #{
-                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)],
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, VFrom)],
                 base_port => 20100,
                 join_to => Node1
             }}

+ 11 - 4
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -50,8 +50,8 @@
 ]).
 
 -export([
-    make_producer_action_schema/1,
-    make_consumer_action_schema/1,
+    make_producer_action_schema/1, make_producer_action_schema/2,
+    make_consumer_action_schema/1, make_consumer_action_schema/2,
     top_level_common_action_keys/0,
     project_to_actions_resource_opts/1
 ]).
@@ -282,12 +282,19 @@ top_level_common_action_keys() ->
 %%======================================================================================
 
 make_producer_action_schema(ActionParametersRef) ->
+    make_producer_action_schema(ActionParametersRef, _Opts = #{}).
+
+make_producer_action_schema(ActionParametersRef, Opts) ->
     [
         {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}
-        | make_consumer_action_schema(ActionParametersRef)
+        | make_consumer_action_schema(ActionParametersRef, Opts)
     ].
 
 make_consumer_action_schema(ActionParametersRef) ->
+    make_consumer_action_schema(ActionParametersRef, _Opts = #{}).
+
+make_consumer_action_schema(ActionParametersRef, Opts) ->
+    ResourceOptsRef = maps:get(resource_opts_ref, Opts, ref(?MODULE, resource_opts)),
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
         {connector,
@@ -297,7 +304,7 @@ make_consumer_action_schema(ActionParametersRef) ->
         {description, emqx_schema:description_schema()},
         {parameters, ActionParametersRef},
         {resource_opts,
-            mk(ref(?MODULE, resource_opts), #{
+            mk(ResourceOptsRef, #{
                 default => #{},
                 desc => ?DESC(emqx_resource_schema, "resource_opts")
             })}

+ 1 - 10
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -89,14 +89,6 @@ fields(action_parameters) ->
                     desc => ?DESC("config_device_id")
                 }
             )},
-        {iotdb_version,
-            mk(
-                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
-                #{
-                    desc => ?DESC("config_iotdb_version"),
-                    default => ?VSN_1_1_X
-                }
-            )},
         {data,
             mk(
                 array(ref(?MODULE, action_parameters_data)),
@@ -311,8 +303,7 @@ action_values() ->
                 }
             ],
             is_aligned => false,
-            device_id => <<"my_device">>,
-            iotdb_version => ?VSN_1_1_X
+            device_id => <<"my_device">>
         }
     }.
 

+ 91 - 60
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -47,6 +47,7 @@
         connect_timeout := pos_integer(),
         pool_type := random | hash,
         pool_size := pos_integer(),
+        iotdb_version := atom(),
         request => undefined | map(),
         atom() => _
     }.
@@ -57,6 +58,7 @@
         connect_timeout := pos_integer(),
         pool_type := random | hash,
         channels := map(),
+        iotdb_version := atom(),
         request => undefined | map(),
         atom() => _
     }.
@@ -88,6 +90,7 @@ connector_example_values() ->
         name => <<"iotdb_connector">>,
         type => iotdb,
         enable => true,
+        iotdb_version => ?VSN_1_1_X,
         authentication => #{
             <<"username">> => <<"root">>,
             <<"password">> => <<"******">>
@@ -124,6 +127,14 @@ fields("connection_fields") ->
                     desc => ?DESC(emqx_bridge_iotdb, "config_base_url")
                 }
             )},
+        {iotdb_version,
+            mk(
+                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
+                #{
+                    desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"),
+                    default => ?VSN_1_1_X
+                }
+            )},
         {authentication,
             mk(
                 hoconsc:union([ref(?MODULE, auth_basic)]),
@@ -193,7 +204,7 @@ proplists_without(Keys, List) ->
 callback_mode() -> async_if_possible.
 
 -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
-on_start(InstanceId, Config) ->
+on_start(InstanceId, #{iotdb_version := Version} = Config) ->
     %% [FIXME] The configuration passed in here is pre-processed and transformed
     %% in emqx_bridge_resource:parse_confs/2.
     case emqx_bridge_http_connector:on_start(InstanceId, Config) of
@@ -204,7 +215,7 @@ on_start(InstanceId, Config) ->
                 request => maps:get(request, State, <<>>)
             }),
             ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
-            {ok, State#{channels => #{}}};
+            {ok, State#{iotdb_version => Version, channels => #{}}};
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
@@ -234,7 +245,11 @@ on_get_status(InstanceId, State) ->
     {ok, pos_integer(), [term()], term()}
     | {ok, pos_integer(), [term()]}
     | {error, term()}.
-on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) ->
+on_query(
+    InstanceId,
+    {ChannelId, _Message} = Req,
+    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+) ->
     ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_called",
@@ -243,7 +258,7 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat
         state => emqx_utils:redact(State)
     }),
 
-    case try_render_message(Req, Channels) of
+    case try_render_message(Req, IoTDBVsn, Channels) of
         {ok, IoTDBPayload} ->
             handle_response(
                 emqx_bridge_http_connector:on_query(
@@ -257,7 +272,10 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat
 -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
     {ok, pid()} | {error, empty_request}.
 on_query_async(
-    InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
+    InstanceId,
+    {ChannelId, _Message} = Req,
+    ReplyFunAndArgs0,
+    #{iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -266,7 +284,7 @@ on_query_async(
         send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case try_render_message(Req, Channels) of
+    case try_render_message(Req, IoTDBVsn, Channels) of
         {ok, IoTDBPayload} ->
             ReplyFunAndArgs =
                 {
@@ -285,10 +303,10 @@ on_query_async(
 
 on_add_channel(
     InstanceId,
-    #{channels := Channels} = OldState0,
+    #{iotdb_version := Version, channels := Channels} = OldState0,
     ChannelId,
     #{
-        parameters := #{iotdb_version := Version, data := Data} = Parameter
+        parameters := #{data := Data} = Parameter
     }
 ) ->
     case maps:is_key(ChannelId, Channels) of
@@ -407,25 +425,41 @@ proc_data(PreProcessedData, Msg) ->
         now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
         now_ns => NowNS
     },
-    lists:map(
-        fun(
-            #{
-                timestamp := TimestampTkn,
-                measurement := Measurement,
-                data_type := DataType0,
-                value := ValueTkn
-            }
-        ) ->
-            DataType = emqx_placeholder:proc_tmpl(DataType0, Msg),
-            #{
-                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
-                measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
-                data_type => DataType,
-                value => proc_value(DataType, ValueTkn, Msg)
-            }
-        end,
-        PreProcessedData
-    ).
+    proc_data(PreProcessedData, Msg, Nows, []).
+
+proc_data(
+    [
+        #{
+            timestamp := TimestampTkn,
+            measurement := Measurement,
+            data_type := DataType0,
+            value := ValueTkn
+        }
+        | T
+    ],
+    Msg,
+    Nows,
+    Acc
+) ->
+    DataType = list_to_binary(
+        string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
+    ),
+    case proc_value(DataType, ValueTkn, Msg) of
+        {ok, Value} ->
+            proc_data(T, Msg, Nows, [
+                #{
+                    timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
+                    measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
+                    data_type => DataType,
+                    value => Value
+                }
+                | Acc
+            ]);
+        Error ->
+            Error
+    end;
+proc_data([], _Msg, _Nows, Acc) ->
+    {ok, lists:reverse(Acc)}.
 
 iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
@@ -444,16 +478,19 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
 proc_value(<<"TEXT">>, ValueTkn, Msg) ->
-    case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
-        <<"undefined">> -> null;
-        Val -> Val
-    end;
+    {ok,
+        case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
+            <<"undefined">> -> null;
+            Val -> Val
+        end};
 proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
-    convert_bool(replace_var(ValueTkn, Msg));
+    {ok, convert_bool(replace_var(ValueTkn, Msg))};
 proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
-    convert_int(replace_var(ValueTkn, Msg));
+    {ok, convert_int(replace_var(ValueTkn, Msg))};
 proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
-    convert_float(replace_var(ValueTkn, Msg)).
+    {ok, convert_float(replace_var(ValueTkn, Msg))};
+proc_value(Type, _, _) ->
+    {error, {invalid_type, Type}}.
 
 replace_var(Tokens, Data) when is_list(Tokens) ->
     [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
@@ -498,18 +535,18 @@ convert_float(Str) when is_binary(Str) ->
 convert_float(undefined) ->
     null.
 
-make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) ->
+make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
     InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
-    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
+    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
     {ok,
         maps:merge(Rows, #{
-            iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
-            iotdb_field_key(device_id, IotDBVsn) => DeviceId
+            iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
+            iotdb_field_key(device_id, IoTDBVsn) => DeviceId
         })}.
 
-replace_dtypes(Rows0, IotDBVsn) ->
+replace_dtypes(Rows0, IoTDBVsn) ->
     {Types, Rows} = maps:take(dtypes, Rows0),
-    Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}.
+    Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}.
 
 aggregate_rows(DataList, InitAcc) ->
     lists:foldr(
@@ -615,9 +652,9 @@ eval_response_body(Body, Resp) ->
 
 preproc_data_template(DataList) ->
     Atom2Bin = fun
-        (Atom, Converter) when is_atom(Atom) ->
-            Converter(Atom);
-        (Bin, _) ->
+        (Atom) when is_atom(Atom) ->
+            erlang:atom_to_binary(Atom);
+        (Bin) ->
             Bin
     end,
     lists:map(
@@ -630,33 +667,24 @@ preproc_data_template(DataList) ->
             }
         ) ->
             #{
-                timestamp => emqx_placeholder:preproc_tmpl(
-                    Atom2Bin(Timestamp, fun erlang:atom_to_binary/1)
-                ),
+                timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)),
                 measurement => emqx_placeholder:preproc_tmpl(Measurement),
-                data_type => emqx_placeholder:preproc_tmpl(
-                    Atom2Bin(
-                        DataType,
-                        fun(Atom) ->
-                            erlang:list_to_binary(string:uppercase(erlang:atom_to_list(Atom)))
-                        end
-                    )
-                ),
+                data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)),
                 value => emqx_placeholder:preproc_tmpl(Value)
             }
         end,
         DataList
     ).
 
-try_render_message({ChannelId, Msg}, Channels) ->
+try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) ->
     case maps:find(ChannelId, Channels) of
         {ok, Channel} ->
-            render_channel_message(Channel, Msg);
+            render_channel_message(Channel, IoTDBVsn, Msg);
         _ ->
             {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
     end.
 
-render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) ->
+render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
     Payloads = to_list(parse_payload(get_payload(Message))),
     case device_id(Message, Payloads, Channel) of
         undefined ->
@@ -666,9 +694,12 @@ render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = C
                 [] ->
                     {error, invalid_data};
                 DataTemplate ->
-                    DataList = proc_data(DataTemplate, Message),
-
-                    make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
+                    case proc_data(DataTemplate, Message) of
+                        {ok, DataList} ->
+                            make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn);
+                        Error ->
+                            Error
+                    end
             end
     end.
 

+ 28 - 5
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -255,7 +255,6 @@ is_error_check(Reason) ->
     end.
 
 action_config(Name, Config) ->
-    Version = ?config(iotdb_version, Config),
     Type = ?config(bridge_type, Config),
     ConfigString =
         io_lib:format(
@@ -263,15 +262,13 @@ action_config(Name, Config) ->
             "  enable = true\n"
             "  connector = \"~s\"\n"
             "  parameters = {\n"
-            "     iotdb_version = \"~s\"\n"
             "     data = []\n"
             "  }\n"
             "}\n",
             [
                 Type,
                 Name,
-                Name,
-                Version
+                Name
             ]
         ),
     ct:pal("ActionConfig:~ts~n", [ConfigString]),
@@ -281,12 +278,14 @@ connector_config(Name, Config) ->
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
     Type = ?config(bridge_type, Config),
+    Version = ?config(iotdb_version, Config),
     ServerURL = iotdb_server_url(Host, Port),
     ConfigString =
         io_lib:format(
             "connectors.~s.~s {\n"
             "  enable = true\n"
             "  base_url = \"~s\"\n"
+            "  iotdb_version = \"~s\"\n"
             "  authentication = {\n"
             "     username = \"root\"\n"
             "     password = \"root\"\n"
@@ -295,7 +294,8 @@ connector_config(Name, Config) ->
             [
                 Type,
                 Name,
-                ServerURL
+                ServerURL,
+                Version
             ]
         ),
     ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
@@ -646,6 +646,29 @@ t_template(Config) ->
     iotdb_reset(Config, TemplateDeviceId),
     ok.
 
+t_sync_query_case(Config) ->
+    DeviceId = iotdb_device(Config),
+    Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
+    ),
+    Query = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
+    ?assertMatch(
+        #{<<"values">> := [[36]]},
+        emqx_utils_json:decode(IoTDBResult)
+    ).
+
+t_sync_query_invalid_type(Config) ->
+    DeviceId = iotdb_device(Config),
+    Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end,
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
+    ).
+
 is_empty(null) -> true;
 is_empty([]) -> true;
 is_empty([[]]) -> true;

+ 1 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mongodb, [
     {description, "EMQX Enterprise MongoDB Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {applications, [
         kernel,

+ 12 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl

@@ -86,7 +86,8 @@ fields(mongodb_action) ->
     emqx_bridge_v2_schema:make_producer_action_schema(
         mk(ref(?MODULE, action_parameters), #{
             required => true, desc => ?DESC(action_parameters)
-        })
+        }),
+        #{resource_opts_ref => ref(?MODULE, action_resource_opts)}
     );
 fields(action_parameters) ->
     [
@@ -95,6 +96,14 @@ fields(action_parameters) ->
     ];
 fields(connector_resource_opts) ->
     emqx_connector_schema:resource_opts_fields();
+fields(action_resource_opts) ->
+    emqx_bridge_v2_schema:resource_opts_fields([
+        {batch_size, #{
+            importance => ?IMPORTANCE_HIDDEN,
+            converter => fun(_, _) -> 1 end,
+            desc => ?DESC("batch_size")
+        }}
+    ]);
 fields(resource_opts) ->
     fields("creation_opts");
 fields(mongodb_rs) ->
@@ -213,6 +222,8 @@ desc("creation_opts") ->
     ?DESC(emqx_resource_schema, "creation_opts");
 desc(resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
+desc(action_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(connector_resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
 desc(mongodb_rs) ->

+ 13 - 0
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -20,6 +20,7 @@
 -export([
     empty/0,
     list/1,
+    mqueue/1,
     map/2,
     chain/2
 ]).
@@ -59,6 +60,18 @@ list([]) ->
 list([X | Rest]) ->
     fun() -> [X | list(Rest)] end.
 
+%% @doc Make a stream out of process message queue.
+-spec mqueue(timeout()) -> stream(any()).
+mqueue(Timeout) ->
+    fun() ->
+        receive
+            X ->
+                [X | mqueue(Timeout)]
+        after Timeout ->
+            []
+        end
+    end.
+
 %% @doc Make a stream by applying a function to each element of the underlying stream.
 -spec map(fun((X) -> Y), stream(X)) -> stream(Y).
 map(F, S) ->

+ 9 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -73,3 +73,12 @@ chain_list_map_test() ->
         ["1", "2", "3", "4", "5", "6"],
         emqx_utils_stream:consume(S)
     ).
+
+mqueue_test() ->
+    _ = erlang:send_after(1, self(), 1),
+    _ = erlang:send_after(100, self(), 2),
+    _ = erlang:send_after(20, self(), 42),
+    ?assertEqual(
+        [1, 42, 2],
+        emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
+    ).

+ 1 - 0
changes/ee/fix-12317.en.md

@@ -0,0 +1 @@
+Removed the `resource_opts.batch_size` field from the MongoDB Action schema, as it's still not supported.

+ 9 - 0
rel/i18n/emqx_schema.hocon

@@ -1541,6 +1541,15 @@ Set <code>v1</code> to use the former schema.
 NOTE: Schema <code>v2</code> is still experimental.
 NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
 
+broker_routing_batch_sync_enable_on.desc:
+"""Use separate process pool to synchronize subscriptions with the global routing table in a batched manner.
+Especially useful in clusters interconnected through links with non-negligible latency, but might help in other scenarios by ensuring that the broker pool has less chance of being overloaded.
+The selected value determines which nodes in the cluster will have this feature enabled.
+- <code>all</code>: enables it unconditionally on each node,
+- <code>replicant</code>: enables it only on replicants (i.e. those where <code>node.role = replicant</code>),
+- <code>core</code>: enables it only on core nodes,
+- <code>none</code>: disables this altogether."""
+
 broker_perf_trie_compaction.desc:
 """Enable trie path compaction.
 Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber.