Browse Source

perf(emqx_cm): use a dedicated pool for channel cleanup

This is to isolate channels cleanup from other async tasks (like routes cleanup),
as channels cleanup can be quite slow under high network latency conditions.

Fixes: EMQX-11743
Serge Tupchii 2 năm trước cách đây
mục cha
commit
b472b56883

+ 2 - 0
apps/emqx/include/emqx_cm.hrl

@@ -30,4 +30,6 @@
 -define(T_GET_INFO, 5_000).
 -define(T_GET_INFO, 5_000).
 -define(T_TAKEOVER, 15_000).
 -define(T_TAKEOVER, 15_000).
 
 
+-define(CM_POOL, emqx_cm_pool).
+
 -endif.
 -endif.

+ 5 - 1
apps/emqx/src/emqx_cm.erl

@@ -670,7 +670,11 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
     ChanPids = [Pid | emqx_utils:drain_down(BatchSize)],
     ChanPids = [Pid | emqx_utils:drain_down(BatchSize)],
     {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
     {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
     lists:foreach(fun mark_channel_disconnected/1, ChanPids),
     lists:foreach(fun mark_channel_disconnected/1, ChanPids),
-    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
+    ok = emqx_pool:async_submit_to_pool(
+        ?CM_POOL,
+        fun lists:foreach/2,
+        [fun ?MODULE:clean_down/1, Items]
+    ),
     {noreply, State#{chan_pmon := PMon1}};
     {noreply, State#{chan_pmon := PMon1}};
 handle_info(Info, State) ->
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),

+ 4 - 0
apps/emqx/src/emqx_cm_sup.erl

@@ -25,6 +25,8 @@
 %% for test
 %% for test
 -export([restart_flapping/0]).
 -export([restart_flapping/0]).
 
 
+-include("emqx_cm.hrl").
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -45,6 +47,7 @@ init([]) ->
     Banned = child_spec(emqx_banned, 1000, worker),
     Banned = child_spec(emqx_banned, 1000, worker),
     Flapping = child_spec(emqx_flapping, 1000, worker),
     Flapping = child_spec(emqx_flapping, 1000, worker),
     Locker = child_spec(emqx_cm_locker, 5000, worker),
     Locker = child_spec(emqx_cm_locker, 5000, worker),
+    CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
     Registry = child_spec(emqx_cm_registry, 5000, worker),
     Registry = child_spec(emqx_cm_registry, 5000, worker),
     Manager = child_spec(emqx_cm, 5000, worker),
     Manager = child_spec(emqx_cm, 5000, worker),
     DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
     DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
@@ -53,6 +56,7 @@ init([]) ->
             Banned,
             Banned,
             Flapping,
             Flapping,
             Locker,
             Locker,
+            CmPool,
             Registry,
             Registry,
             Manager,
             Manager,
             DSSessionGCSup
             DSSessionGCSup

+ 47 - 17
apps/emqx/src/emqx_pool.erl

@@ -28,11 +28,15 @@
     submit/1,
     submit/1,
     submit/2,
     submit/2,
     async_submit/1,
     async_submit/1,
-    async_submit/2
+    async_submit/2,
+    submit_to_pool/2,
+    submit_to_pool/3,
+    async_submit_to_pool/2,
+    async_submit_to_pool/3
 ]).
 ]).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
--export([worker/0, flush_async_tasks/0]).
+-export([worker/0, flush_async_tasks/0, flush_async_tasks/1]).
 -endif.
 -endif.
 
 
 %% gen_server callbacks
 %% gen_server callbacks
@@ -57,7 +61,7 @@
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 start_link(Pool, Id) ->
 start_link(Pool, Id) ->
     gen_server:start_link(
     gen_server:start_link(
-        {local, emqx_utils:proc_name(?MODULE, Id)},
+        {local, emqx_utils:proc_name(Pool, Id)},
         ?MODULE,
         ?MODULE,
         [Pool, Id],
         [Pool, Id],
         [{hibernate_after, 1000}]
         [{hibernate_after, 1000}]
@@ -66,32 +70,48 @@ start_link(Pool, Id) ->
 %% @doc Submit work to the pool.
 %% @doc Submit work to the pool.
 -spec submit(task()) -> any().
 -spec submit(task()) -> any().
 submit(Task) ->
 submit(Task) ->
-    call({submit, Task}).
+    submit_to_pool(?POOL, Task).
 
 
 -spec submit(fun(), list(any())) -> any().
 -spec submit(fun(), list(any())) -> any().
 submit(Fun, Args) ->
 submit(Fun, Args) ->
-    call({submit, {Fun, Args}}).
-
-%% @private
-call(Req) ->
-    gen_server:call(worker(), Req, infinity).
+    submit_to_pool(?POOL, Fun, Args).
 
 
 %% @doc Submit work to the pool asynchronously.
 %% @doc Submit work to the pool asynchronously.
 -spec async_submit(task()) -> ok.
 -spec async_submit(task()) -> ok.
 async_submit(Task) ->
 async_submit(Task) ->
-    cast({async_submit, Task}).
+    async_submit_to_pool(?POOL, Task).
 
 
 -spec async_submit(fun(), list(any())) -> ok.
 -spec async_submit(fun(), list(any())) -> ok.
 async_submit(Fun, Args) ->
 async_submit(Fun, Args) ->
-    cast({async_submit, {Fun, Args}}).
+    async_submit_to_pool(?POOL, Fun, Args).
+
+-spec submit_to_pool(any(), task()) -> any().
+submit_to_pool(Pool, Task) ->
+    call(Pool, {submit, Task}).
+
+-spec submit_to_pool(any(), fun(), list(any())) -> any().
+submit_to_pool(Pool, Fun, Args) ->
+    call(Pool, {submit, {Fun, Args}}).
+
+-spec async_submit_to_pool(any(), task()) -> ok.
+async_submit_to_pool(Pool, Task) ->
+    cast(Pool, {async_submit, Task}).
+
+-spec async_submit_to_pool(any(), fun(), list(any())) -> ok.
+async_submit_to_pool(Pool, Fun, Args) ->
+    cast(Pool, {async_submit, {Fun, Args}}).
 
 
 %% @private
 %% @private
-cast(Msg) ->
-    gen_server:cast(worker(), Msg).
+call(Pool, Req) ->
+    gen_server:call(worker(Pool), Req, infinity).
 
 
 %% @private
 %% @private
-worker() ->
-    gproc_pool:pick_worker(?POOL).
+cast(Pool, Msg) ->
+    gen_server:cast(worker(Pool), Msg).
+
+%% @private
+worker(Pool) ->
+    gproc_pool:pick_worker(Pool).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %% gen_server callbacks
@@ -146,15 +166,25 @@ run(Fun) when is_function(Fun) ->
     Fun().
     Fun().
 
 
 -ifdef(TEST).
 -ifdef(TEST).
+
+worker() ->
+    worker(?POOL).
+
+flush_async_tasks() ->
+    flush_async_tasks(?POOL).
+
 %% This help function creates a large enough number of async tasks
 %% This help function creates a large enough number of async tasks
 %% to force flush the pool workers.
 %% to force flush the pool workers.
 %% The number of tasks should be large enough to ensure all workers have
 %% The number of tasks should be large enough to ensure all workers have
 %% the chance to work on at least one of the tasks.
 %% the chance to work on at least one of the tasks.
-flush_async_tasks() ->
+flush_async_tasks(Pool) ->
     Ref = make_ref(),
     Ref = make_ref(),
     Self = self(),
     Self = self(),
     L = lists:seq(1, 997),
     L = lists:seq(1, 997),
-    lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, Ref, I} end, []) end, L),
+    lists:foreach(
+        fun(I) -> emqx_pool:async_submit_to_pool(Pool, fun() -> Self ! {done, Ref, I} end, []) end,
+        L
+    ),
     lists:foreach(
     lists:foreach(
         fun(I) ->
         fun(I) ->
             receive
             receive

+ 6 - 4
apps/emqx/test/emqx_cm_SUITE.erl

@@ -221,7 +221,7 @@ t_open_session_race_condition(_) ->
     end,
     end,
     %% sync
     %% sync
     ignored = gen_server:call(?CM, ignore, infinity),
     ignored = gen_server:call(?CM, ignore, infinity),
-    ok = emqx_pool:flush_async_tasks(),
+    ok = emqx_pool:flush_async_tasks(?CM_POOL),
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
 
 t_kick_session_discard_normal(_) ->
 t_kick_session_discard_normal(_) ->
@@ -343,7 +343,7 @@ test_stepdown_session(Action, Reason) ->
     end,
     end,
     % sync
     % sync
     ignored = gen_server:call(?CM, ignore, infinity),
     ignored = gen_server:call(?CM, ignore, infinity),
-    ok = flush_emqx_pool(),
+    ok = flush_emqx_cm_pool(),
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
 
 %% Channel deregistration is delegated to emqx_pool as a sync tasks.
 %% Channel deregistration is delegated to emqx_pool as a sync tasks.
@@ -353,10 +353,12 @@ test_stepdown_session(Action, Reason) ->
 %% to sync with the pool workers.
 %% to sync with the pool workers.
 %% The number of tasks should be large enough to ensure all workers have
 %% The number of tasks should be large enough to ensure all workers have
 %% the chance to work on at least one of the tasks.
 %% the chance to work on at least one of the tasks.
-flush_emqx_pool() ->
+flush_emqx_cm_pool() ->
     Self = self(),
     Self = self(),
     L = lists:seq(1, 1000),
     L = lists:seq(1, 1000),
-    lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
+    lists:foreach(
+        fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L
+    ),
     lists:foreach(
     lists:foreach(
         fun(I) ->
         fun(I) ->
             receive
             receive

+ 2 - 0
changes/ce/perf-12336.en.md

@@ -0,0 +1,2 @@
+Isolate channels cleanup from other async tasks (like routes cleanup) by using a dedicated pool,
+as this task can be quite slow under high network latency conditions.