Bläddra i källkod

chore(rebalance): use emqx_utils_streams instead of qlc

Ilya Averyanov 1 år sedan
förälder
incheckning
4ae6b6d7be

+ 46 - 18
apps/emqx/src/emqx_cm.erl

@@ -25,7 +25,6 @@
 -include("emqx_mqtt.hrl").
 -include("emqx_external_trace.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
--include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
 
 -export([start_link/0]).
@@ -75,8 +74,8 @@
 
 %% Client management
 -export([
-    all_channels_table/1,
-    live_connection_table/1
+    all_channels_stream/1,
+    live_connection_stream/1
 ]).
 
 %% gen_server callbacks
@@ -140,6 +139,8 @@
 %% Batch drain
 -define(BATCH_SIZE, 100000).
 
+-define(CHAN_INFO_SELECT_LIMIT, 100).
+
 %% Server name
 -define(CM, ?MODULE).
 
@@ -650,30 +651,57 @@ all_channels() ->
     ets:select(?CHAN_TAB, Pat).
 
 %% @doc Get clientinfo for all clients
-all_channels_table(ConnModuleList) ->
+-spec all_channels_stream([module()]) ->
+    emqx_utils_stream:stream({
+        emqx_types:clientid(),
+        _ConnState :: atom(),
+        emqx_types:conninfo(),
+        emqx_types:clientinfo()
+    }).
+all_channels_stream(ConnModuleList) ->
     Ms = ets:fun2ms(
         fun({{ClientId, _ChanPid}, Info, _Stats}) ->
             {ClientId, Info}
         end
     ),
-    Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
     ConnModules = sets:from_list(ConnModuleList, [{version, 2}]),
-    qlc:q([
-        {ClientId, ConnState, ConnInfo, ClientInfo}
-     || {ClientId, #{
-            conn_state := ConnState,
-            clientinfo := ClientInfo,
-            conninfo := #{conn_mod := ConnModule} = ConnInfo
-        }} <-
-            Table,
-        sets:is_element(ConnModule, ConnModules)
-    ]).
+    AllChanInfoStream = emqx_utils_stream:ets(fun
+        (undefined) -> ets:select(?CHAN_INFO_TAB, Ms, ?CHAN_INFO_SELECT_LIMIT);
+        (Cont) -> ets:select(Cont)
+    end),
+    WithModulesFilteredStream = emqx_utils_stream:filter(
+        fun({_, #{conninfo := #{conn_mod := ConnModule}}}) ->
+            sets:is_element(ConnModule, ConnModules)
+        end,
+        AllChanInfoStream
+    ),
+    %% Map to the plain tuples
+    emqx_utils_stream:map(
+        fun(
+            {ClientId, #{
+                conn_state := ConnState,
+                clientinfo := ClientInfo,
+                conninfo := ConnInfo
+            }}
+        ) ->
+            {ClientId, ConnState, ConnInfo, ClientInfo}
+        end,
+        WithModulesFilteredStream
+    ).
 
 %% @doc Get all local connection query handle
-live_connection_table(ConnModules) ->
+-spec live_connection_stream([module()]) ->
+    emqx_utils_stream:stream({emqx_types:clientid(), pid()}).
+live_connection_stream(ConnModules) ->
     Ms = lists:map(fun live_connection_ms/1, ConnModules),
-    Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]),
-    qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ChanPid)]).
+    AllConnStream = emqx_utils_stream:ets(fun
+        (undefined) -> ets:select(?CHAN_CONN_TAB, Ms, ?CHAN_INFO_SELECT_LIMIT);
+        (Cont) -> ets:select(Cont)
+    end),
+    emqx_utils_stream:filter(
+        fun({_ClientId, ChanPid}) -> is_channel_connected(ChanPid) end,
+        AllConnStream
+    ).
 
 live_connection_ms(ConnModule) ->
     {{{'$1', '$2'}, ConnModule}, [], [{{'$1', '$2'}}]}.

+ 2 - 2
apps/emqx/test/emqx_connection_conninfo_SUITE.erl

@@ -59,8 +59,8 @@ t_inconsistent_chan_info(_Config) ->
 
     ClientIds = [
         ClientId
-     || {ClientId, _ConnState, _ConnInfo, _ClientInfo} <- qlc:eval(
-            emqx_cm:all_channels_table([emqx_connection])
+     || {ClientId, _ConnState, _ConnInfo, _ClientInfo} <- emqx_utils_stream:consume(
+            emqx_cm:all_channels_stream([emqx_connection])
         )
     ],
 

+ 1 - 1
apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src

@@ -1,6 +1,6 @@
 {application, emqx_eviction_agent, [
     {description, "EMQX Eviction Agent"},
-    {vsn, "5.1.8"},
+    {vsn, "5.1.9"},
     {registered, [
         emqx_eviction_agent_sup,
         emqx_eviction_agent,

+ 46 - 40
apps/emqx_eviction_agent/src/emqx_eviction_agent.erl

@@ -9,7 +9,6 @@
 -include_lib("emqx/include/types.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 
--include_lib("stdlib/include/qlc.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -export([
@@ -267,25 +266,31 @@ stats() ->
         sessions => session_count()
     }.
 
-connection_table() ->
-    emqx_cm:live_connection_table(?CONN_MODULES).
+connection_stream() ->
+    emqx_cm:live_connection_stream(?CONN_MODULES).
 
 connection_count() ->
-    table_count(connection_table()).
-
-channel_table(any) ->
-    qlc:q([
-        {ClientId, ConnInfo, ClientInfo}
-     || {ClientId, _, ConnInfo, ClientInfo} <-
-            emqx_cm:all_channels_table(?CONN_MODULES)
-    ]);
-channel_table(RequiredConnState) ->
-    qlc:q([
-        {ClientId, ConnInfo, ClientInfo}
-     || {ClientId, ConnState, ConnInfo, ClientInfo} <-
-            emqx_cm:all_channels_table(?CONN_MODULES),
-        RequiredConnState =:= ConnState
-    ]).
+    stream_count(connection_stream()).
+
+channel_stream(any) ->
+    emqx_utils_stream:map(
+        fun({ClientId, _, ConnInfo, ClientInfo}) ->
+            {ClientId, ConnInfo, ClientInfo}
+        end,
+        emqx_cm:all_channels_stream(?CONN_MODULES)
+    );
+channel_stream(RequiredConnState) ->
+    WithRequiredConnStateStream =
+        emqx_utils_stream:filter(
+            fun({_, ConnState, _, _}) -> RequiredConnState =:= ConnState end,
+            emqx_cm:all_channels_stream(?CONN_MODULES)
+        ),
+    emqx_utils_stream:map(
+        fun({ClientId, _, ConnInfo, ClientInfo}) ->
+            {ClientId, ConnInfo, ClientInfo}
+        end,
+        WithRequiredConnStateStream
+    ).
 
 -spec all_channels_count() -> non_neg_integer().
 all_channels_count() ->
@@ -312,7 +317,7 @@ all_channels_count() ->
 
 -spec all_local_channels_count() -> non_neg_integer().
 all_local_channels_count() ->
-    table_count(channel_table(any)).
+    stream_count(channel_stream(any)).
 
 session_count() ->
     session_count(any) + durable_session_count().
@@ -321,34 +326,27 @@ durable_session_count() ->
     emqx_persistent_session_bookkeeper:get_disconnected_session_count().
 
 session_count(ConnState) ->
-    table_count(channel_table(ConnState)).
+    stream_count(channel_stream(ConnState)).
 
-table_count(QH) ->
-    qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH).
+stream_count(Stream) ->
+    emqx_utils_stream:fold(fun(_, Acc) -> Acc + 1 end, 0, Stream).
 
 take_connections(N) ->
-    ChanQH = qlc:q([ChanPid || {_ClientId, ChanPid} <- connection_table()]),
-    ChanPidCursor = qlc:cursor(ChanQH),
-    ChanPids = qlc:next_answers(ChanPidCursor, N),
-    ok = qlc:delete_cursor(ChanPidCursor),
-    ChanPids.
+    PidStream = emqx_utils_stream:map(
+        fun({_ClientId, ChanPid}) -> ChanPid end,
+        connection_stream()
+    ),
+    consume(N, PidStream).
 
 take_channels(N) ->
-    QH = qlc:q([
-        {ClientId, ConnInfo, ClientInfo}
-     || {ClientId, _, ConnInfo, ClientInfo} <-
-            emqx_cm:all_channels_table(?CONN_MODULES)
-    ]),
-    ChanPidCursor = qlc:cursor(QH),
-    Channels = qlc:next_answers(ChanPidCursor, N),
-    ok = qlc:delete_cursor(ChanPidCursor),
-    Channels.
+    Stream = emqx_utils_stream:map(
+        fun({ClientId, _, ConnInfo, ClientInfo}) -> {ClientId, ConnInfo, ClientInfo} end,
+        emqx_cm:all_channels_stream(?CONN_MODULES)
+    ),
+    consume(N, Stream).
 
 take_channels(N, ConnState) ->
-    ChanPidCursor = qlc:cursor(channel_table(ConnState)),
-    Channels = qlc:next_answers(ChanPidCursor, N),
-    ok = qlc:delete_cursor(ChanPidCursor),
-    Channels.
+    consume(N, channel_stream(ConnState)).
 
 do_evict_connections(N, ServerReference) when N > 0 ->
     ChanPids = take_connections(N),
@@ -490,3 +488,11 @@ do_purge_durable_sessions(N) when N > 0 ->
 
 select_random(List) when length(List) > 0 ->
     lists:nth(rand:uniform(length(List)), List).
+
+consume(N, Stream) ->
+    case emqx_utils_stream:consume(N, Stream) of
+        {Items, _Stream} ->
+            Items;
+        Items when is_list(Items) ->
+            Items
+    end.