فهرست منبع

refactor(mgmt): smplify the node_query/cluster_query implementation

JianBo He 3 سال پیش
والد
کامیت
1fe9c105aa

+ 9 - 25
apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl

@@ -47,7 +47,7 @@
 ]).
 
 -export([
-    query/4,
+    qs2ms/2,
     format_user_info/1,
     group_match_spec/1
 ]).
@@ -66,7 +66,6 @@
     {<<"user_group">>, binary},
     {<<"is_superuser">>, atom}
 ]).
--define(QUERY_FUN, {?MODULE, query}).
 
 -type user_group() :: binary().
 
@@ -264,38 +263,23 @@ list_users(QueryString, #{user_group := UserGroup}) ->
     NQueryString = QueryString#{<<"user_group">> => UserGroup},
     emqx_mgmt_api:node_query(
         node(),
-        NQueryString,
         ?TAB,
+        NQueryString,
         ?AUTHN_QSCHEMA,
-        ?QUERY_FUN,
+        fun ?MODULE:qs2ms/2,
         fun ?MODULE:format_user_info/1
     ).
 
 %%--------------------------------------------------------------------
-%% Query Functions
-
-query(Tab, {QString, []}, Continuation, Limit) ->
-    Ms = ms_from_qstring(QString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
-    Ms = ms_from_qstring(QString),
-    FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
+%% QueryString to MatchSpec
 
-%%--------------------------------------------------------------------
-%% Match funcs
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Tab, {QString, Fuzzy}) ->
+    {ms_from_qstring(QString), fuzzy_filter_fun(Fuzzy)}.
 
 %% Fuzzy username funcs
+fuzzy_filter_fun([]) ->
+    undefined;
 fuzzy_filter_fun(Fuzzy) ->
     fun(MsRaws) when is_list(MsRaws) ->
         lists:filter(

+ 9 - 25
apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl

@@ -49,7 +49,7 @@
 ]).
 
 -export([
-    query/4,
+    qs2ms/2,
     format_user_info/1,
     group_match_spec/1
 ]).
@@ -84,7 +84,6 @@
     {<<"user_group">>, binary},
     {<<"is_superuser">>, atom}
 ]).
--define(QUERY_FUN, {?MODULE, query}).
 
 %%------------------------------------------------------------------------------
 %% Mnesia bootstrap
@@ -290,38 +289,23 @@ list_users(QueryString, #{user_group := UserGroup}) ->
     NQueryString = QueryString#{<<"user_group">> => UserGroup},
     emqx_mgmt_api:node_query(
         node(),
-        NQueryString,
         ?TAB,
+        NQueryString,
         ?AUTHN_QSCHEMA,
-        ?QUERY_FUN,
+        fun ?MODULE:qs2ms/2,
         fun ?MODULE:format_user_info/1
     ).
 
 %%--------------------------------------------------------------------
-%% Query Functions
-
-query(Tab, {QString, []}, Continuation, Limit) ->
-    Ms = ms_from_qstring(QString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
-    Ms = ms_from_qstring(QString),
-    FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
+%% QueryString to MatchSpec
 
-%%--------------------------------------------------------------------
-%% Match funcs
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Tab, {QString, FuzzyQString}) ->
+    {ms_from_qstring(QString), fuzzy_filter_fun(FuzzyQString)}.
 
 %% Fuzzy username funcs
+fuzzy_filter_fun([]) ->
+    undefined;
 fuzzy_filter_fun(Fuzzy) ->
     fun(MsRaws) when is_list(MsRaws) ->
         lists:filter(

+ 16 - 46
apps/emqx_authz/src/emqx_authz_api_mnesia.erl

@@ -24,8 +24,8 @@
 
 -import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]).
 
--define(QUERY_USERNAME_FUN, {?MODULE, query_username}).
--define(QUERY_CLIENTID_FUN, {?MODULE, query_clientid}).
+-define(QUERY_USERNAME_FUN, fun ?MODULE:query_username/2).
+-define(QUERY_CLIENTID_FUN, fun ?MODULE:query_clientid/2).
 
 -define(ACL_USERNAME_QSCHEMA, [{<<"like_username">>, binary}]).
 -define(ACL_CLIENTID_QSCHEMA, [{<<"like_clientid">>, binary}]).
@@ -49,12 +49,11 @@
 
 %% query funs
 -export([
-    query_username/4,
-    query_clientid/4
+    query_username/2,
+    query_clientid/2,
+    format_result/1
 ]).
 
--export([format_result/1]).
-
 -define(BAD_REQUEST, 'BAD_REQUEST').
 -define(NOT_FOUND, 'NOT_FOUND').
 -define(ALREADY_EXISTS, 'ALREADY_EXISTS').
@@ -405,8 +404,8 @@ users(get, #{query_string := QueryString}) ->
     case
         emqx_mgmt_api:node_query(
             node(),
-            QueryString,
             ?ACL_TABLE,
+            QueryString,
             ?ACL_USERNAME_QSCHEMA,
             ?QUERY_USERNAME_FUN,
             fun ?MODULE:format_result/1
@@ -441,8 +440,8 @@ clients(get, #{query_string := QueryString}) ->
     case
         emqx_mgmt_api:node_query(
             node(),
-            QueryString,
             ?ACL_TABLE,
+            QueryString,
             ?ACL_CLIENTID_QSCHEMA,
             ?QUERY_CLIENTID_FUN,
             fun ?MODULE:format_result/1
@@ -576,48 +575,19 @@ purge(delete, _) ->
     end.
 
 %%--------------------------------------------------------------------
-%% Query Functions
-
-query_username(Tab, {_QString, []}, Continuation, Limit) ->
-    Ms = emqx_authz_mnesia:list_username_rules(),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
-    Ms = emqx_authz_mnesia:list_username_rules(),
-    FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
+%% QueryString to MatchSpec
 
-query_clientid(Tab, {_QString, []}, Continuation, Limit) ->
-    Ms = emqx_authz_mnesia:list_clientid_rules(),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
-    Ms = emqx_authz_mnesia:list_clientid_rules(),
-    FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
+-spec query_username(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+query_username(_Tab, {_QString, FuzzyQString}) ->
+    {emqx_authz_mnesia:list_username_rules(), fuzzy_filter_fun(FuzzyQString)}.
 
-%%--------------------------------------------------------------------
-%% Match funcs
+-spec query_clientid(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+query_clientid(_Tab, {_QString, FuzzyQString}) ->
+    {emqx_authz_mnesia:list_clientid_rules(), fuzzy_filter_fun(FuzzyQString)}.
 
 %% Fuzzy username funcs
+fuzzy_filter_fun([]) ->
+    undefined;
 fuzzy_filter_fun(Fuzzy) ->
     fun(MsRaws) when is_list(MsRaws) ->
         lists:filter(

+ 12 - 26
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -55,7 +55,7 @@
 
 %% internal exports (for client query)
 -export([
-    query/4,
+    qs2ms/2,
     format_channel_info/1,
     format_channel_info/2
 ]).
@@ -98,8 +98,6 @@ paths() ->
     {<<"lte_lifetime">>, integer}
 ]).
 
--define(QUERY_FUN, {?MODULE, query}).
-
 clients(get, #{
     bindings := #{name := Name0},
     query_string := QString
@@ -110,10 +108,10 @@ clients(get, #{
             case maps:get(<<"node">>, QString, undefined) of
                 undefined ->
                     emqx_mgmt_api:cluster_query(
-                        QString,
                         TabName,
+                        QString,
                         ?CLIENT_QSCHEMA,
-                        ?QUERY_FUN,
+                        fun ?MODULE:qs2ms/2,
                         fun ?MODULE:format_channel_info/2
                     );
                 Node0 ->
@@ -122,10 +120,10 @@ clients(get, #{
                             QStringWithoutNode = maps:without([<<"node">>], QString),
                             emqx_mgmt_api:node_query(
                                 Node1,
-                                QStringWithoutNode,
                                 TabName,
+                                QStringWithoutNode,
                                 ?CLIENT_QSCHEMA,
-                                ?QUERY_FUN,
+                                fun ?MODULE:qs2ms/2,
                                 fun ?MODULE:format_channel_info/2
                             );
                         {error, _} ->
@@ -267,25 +265,11 @@ extra_sub_props(Props) ->
     ).
 
 %%--------------------------------------------------------------------
-%% query funcs
-
-query(Tab, {Qs, []}, Continuation, Limit) ->
-    Ms = qs2ms(Qs),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
-    Ms = qs2ms(Qs),
-    FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
+%% QueryString to MatchSpec
+
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Tab, {Qs, Fuzzy}) ->
+    {qs2ms(Qs), fuzzy_filter_fun(Fuzzy)}.
 
 qs2ms(Qs) ->
     {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
@@ -340,6 +324,8 @@ ms(lifetime, X) ->
 %%--------------------------------------------------------------------
 %% Fuzzy filter funcs
 
+fuzzy_filter_fun([]) ->
+    undefined;
 fuzzy_filter_fun(Fuzzy) ->
     fun(MsRaws) when is_list(MsRaws) ->
         lists:filter(

+ 101 - 76
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -31,11 +31,10 @@
 -export([
     node_query/6,
     cluster_query/5,
-    select_table_with_count/4,
     b2i/1
 ]).
 
--export([do_query/6]).
+-export([do_query/5]).
 
 paginate(Tables, Params, {Module, FormatFun}) ->
     Qh = query_handle(Tables),
@@ -121,15 +120,37 @@ limit(Params) ->
 %% Node Query
 %%--------------------------------------------------------------------
 
-node_query(Node, QString, Tab, QSchema, QueryFun, FmtFun) ->
+-type query_params() :: list() | map().
+
+-type query_schema() :: [{Key :: binary(), Type :: atom | integer | timestamp | ip | ip_port}].
+
+-type query_to_match_spec_fun() ::
+    fun((list(), list()) -> {ets:match_spec(), fun()}).
+
+-type format_result_fun() ::
+    fun((node(), term()) -> term())
+    | fun((term()) -> term()).
+
+-type query_return() :: #{meta := map(), data := [term()]}.
+
+-spec node_query(
+    node(),
+    atom(),
+    query_params(),
+    query_schema(),
+    query_to_match_spec_fun(),
+    format_result_fun()
+) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
+node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
     case parse_pager_params(QString) of
         false ->
             {error, page_limit_invalid};
         Meta ->
             {_CodCnt, NQString} = parse_qstring(QString, QSchema),
             ResultAcc = #{cursor => 0, count => 0, rows => []},
+            QueryState = init_query_state(Meta),
             NResultAcc = do_node_query(
-                Node, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc
+                Node, Tab, NQString, MsFun, QueryState, ResultAcc
             ),
             format_query_result(FmtFun, Meta, NResultAcc)
     end.
@@ -139,31 +160,36 @@ do_node_query(
     Node,
     Tab,
     QString,
-    QueryFun,
-    Continuation,
-    Meta = #{limit := Limit},
+    MsFun,
+    QueryState,
     ResultAcc
 ) ->
-    case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
+    case do_query(Node, Tab, QString, MsFun, QueryState) of
         {error, {badrpc, R}} ->
             {error, Node, {badrpc, R}};
-        {Rows, ?FRESH_SELECT} ->
-            {_, NResultAcc} = accumulate_query_rows(Node, Rows, ResultAcc, Meta),
+        {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
+            {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc),
             NResultAcc;
-        {Rows, NContinuation} ->
-            case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of
+        {Rows, NQueryState} ->
+            case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
                 {enough, NResultAcc} ->
                     NResultAcc;
                 {more, NResultAcc} ->
-                    do_node_query(Node, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc)
+                    do_node_query(Node, Tab, QString, MsFun, NQueryState, NResultAcc)
             end
     end.
 
 %%--------------------------------------------------------------------
 %% Cluster Query
 %%--------------------------------------------------------------------
-
-cluster_query(QString, Tab, QSchema, QueryFun, FmtFun) ->
+-spec cluster_query(
+    atom(),
+    query_params(),
+    query_schema(),
+    query_to_match_spec_fun(),
+    format_result_fun()
+) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
+cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
     case parse_pager_params(QString) of
         false ->
             {error, page_limit_invalid};
@@ -171,42 +197,38 @@ cluster_query(QString, Tab, QSchema, QueryFun, FmtFun) ->
             {_CodCnt, NQString} = parse_qstring(QString, QSchema),
             Nodes = mria_mnesia:running_nodes(),
             ResultAcc = #{cursor => 0, count => 0, rows => []},
+            QueryState = init_query_state(Meta),
             NResultAcc = do_cluster_query(
-                Nodes, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc
+                Nodes, Tab, NQString, MsFun, QueryState, ResultAcc
             ),
             format_query_result(FmtFun, Meta, NResultAcc)
     end.
 
 %% @private
-do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, _Meta, ResultAcc) ->
+do_cluster_query([], _Tab, _QString, _QueryFun, _QueryState, ResultAcc) ->
     ResultAcc;
 do_cluster_query(
     [Node | Tail] = Nodes,
     Tab,
     QString,
-    QueryFun,
-    Continuation,
-    Meta = #{limit := Limit},
+    MsFun,
+    QueryState,
     ResultAcc
 ) ->
-    case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
+    case do_query(Node, Tab, QString, MsFun, QueryState) of
         {error, {badrpc, R}} ->
             {error, Node, {badrpc, R}};
-        {Rows, NContinuation} ->
-            case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of
+        {Rows, NQueryState} ->
+            case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
                 {enough, NResultAcc} ->
                     NResultAcc;
                 {more, NResultAcc} ->
-                    case NContinuation of
-                        ?FRESH_SELECT ->
-                            do_cluster_query(
-                                Tail, Tab, QString, QueryFun, ?FRESH_SELECT, Meta, NResultAcc
-                            );
-                        _ ->
-                            do_cluster_query(
-                                Nodes, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc
-                            )
-                    end
+                    NextNodes =
+                        case NQueryState of
+                            #{continuation := ?FRESH_SELECT} -> Tail;
+                            _ -> Nodes
+                        end,
+                    do_cluster_query(NextNodes, Tab, QString, MsFun, NQueryState, NResultAcc)
             end
     end.
 
@@ -214,16 +236,31 @@ do_cluster_query(
 %% Do Query (or rpc query)
 %%--------------------------------------------------------------------
 
+%% QueryState ::
+%%  #{continuation := ets:continuation(),
+%%    page := pos_integer(),
+%%    limit := pos_integer(),
+%%    total := #{node() := non_neg_integer()}
+%%    }
+init_query_state(_Meta = #{page := Page, limit := Limit}) ->
+    #{
+        continuation => ?FRESH_SELECT,
+        page => Page,
+        limit => Limit,
+        total => []
+    }.
+
 %% @private This function is exempt from BPAPI
-do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() ->
-    erlang:apply(M, F, [Tab, QString, Continuation, Limit]);
-do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
+do_query(Node, Tab, QString, MsFun, QueryState) when Node =:= node(), is_function(MsFun) ->
+    {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
+    do_select(Tab, Ms, FuzzyFun, QueryState);
+do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) ->
     case
         rpc:call(
             Node,
             ?MODULE,
             do_query,
-            [Node, Tab, QString, QueryFun, Continuation, Limit],
+            [Node, Tab, QString, MsFun, QueryState],
             50000
         )
     of
@@ -231,6 +268,31 @@ do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
         Ret -> Ret
     end.
 
+do_select(
+    Tab,
+    Ms,
+    FuzzyFun,
+    QueryState = #{continuation := Continuation, limit := Limit}
+) ->
+    Result =
+        case Continuation of
+            ?FRESH_SELECT ->
+                ets:select(Tab, Ms, Limit);
+            _ ->
+                ets:select(ets:repair_continuation(Continuation, Ms))
+        end,
+    case Result of
+        '$end_of_table' ->
+            {[], QueryState#{continuation => ?FRESH_SELECT}};
+        {Rows, NContinuation} ->
+            NRows =
+                case is_function(FuzzyFun) of
+                    true -> FuzzyFun(Rows);
+                    false -> Rows
+                end,
+            {NRows, QueryState#{continuation => NContinuation}}
+    end.
+
 %% ResultAcc :: #{count := integer(),
 %%                cursor := integer(),
 %%                rows  := [{node(), Rows :: list()}]
@@ -238,8 +300,8 @@ do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
 accumulate_query_rows(
     Node,
     Rows,
-    ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc},
-    _Meta = #{page := Page, limit := Limit}
+    _QueryState = #{page := Page, limit := Limit},
+    ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
 ) ->
     PageStart = (Page - 1) * Limit + 1,
     PageEnd = Page * Limit,
@@ -266,43 +328,6 @@ accumulate_query_rows(
 %% Table Select
 %%--------------------------------------------------------------------
 
-select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit) when
-    is_function(FuzzyFilterFun) andalso Limit > 0
-->
-    case ets:select(Tab, Ms, Limit) of
-        '$end_of_table' ->
-            {[], ?FRESH_SELECT};
-        {RawResult, NContinuation} ->
-            Rows = FuzzyFilterFun(RawResult),
-            {Rows, NContinuation}
-    end;
-select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit) when
-    is_function(FuzzyFilterFun)
-->
-    case ets:select(ets:repair_continuation(Continuation, Ms)) of
-        '$end_of_table' ->
-            {[], ?FRESH_SELECT};
-        {RawResult, NContinuation} ->
-            Rows = FuzzyFilterFun(RawResult),
-            {Rows, NContinuation}
-    end;
-select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit) when
-    Limit > 0
-->
-    case ets:select(Tab, Ms, Limit) of
-        '$end_of_table' ->
-            {[], ?FRESH_SELECT};
-        {RawResult, NContinuation} ->
-            {RawResult, NContinuation}
-    end;
-select_table_with_count(_Tab, Ms, Continuation, _Limit) ->
-    case ets:select(ets:repair_continuation(Continuation, Ms)) of
-        '$end_of_table' ->
-            {[], ?FRESH_SELECT};
-        {RawResult, NContinuation} ->
-            {RawResult, NContinuation}
-    end.
-
 %%--------------------------------------------------------------------
 %% Internal Functions
 %%--------------------------------------------------------------------

+ 6 - 6
apps/emqx_management/src/emqx_mgmt_api_alarms.erl

@@ -29,7 +29,7 @@
 -define(TAGS, [<<"Alarms">>]).
 
 %% internal export (for query)
--export([query/4]).
+-export([qs2ms/2]).
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@@ -114,10 +114,10 @@ alarms(get, #{query_string := QString}) ->
         end,
     case
         emqx_mgmt_api:cluster_query(
-            QString,
             Table,
+            QString,
             [],
-            {?MODULE, query},
+            fun ?MODULE:qs2ms/2,
             fun ?MODULE:format_alarm/2
         )
     of
@@ -136,9 +136,9 @@ alarms(delete, _Params) ->
 %%%==============================================================================================
 %% internal
 
-query(Table, _QsSpec, Continuation, Limit) ->
-    Ms = [{'$1', [], ['$1']}],
-    emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit).
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Tab, {_Qs, _Fuzzy}) ->
+    {[{'$1', [], ['$1']}], undefined}.
 
 format_alarm(WhichNode, Alarm) ->
     emqx_alarm:format(WhichNode, Alarm).

+ 11 - 27
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -46,7 +46,7 @@
 ]).
 
 -export([
-    query/4,
+    qs2ms/2,
     format_channel_info/1,
     format_channel_info/2
 ]).
@@ -74,7 +74,6 @@
     {<<"lte_connected_at">>, timestamp}
 ]).
 
--define(QUERY_FUN, {?MODULE, query}).
 -define(FORMAT_FUN, {?MODULE, format_channel_info}).
 
 -define(CLIENT_ID_NOT_FOUND,
@@ -643,10 +642,10 @@ list_clients(QString) ->
         case maps:get(<<"node">>, QString, undefined) of
             undefined ->
                 emqx_mgmt_api:cluster_query(
-                    QString,
                     ?CLIENT_QTAB,
+                    QString,
                     ?CLIENT_QSCHEMA,
-                    ?QUERY_FUN,
+                    fun ?MODULE:qs2ms/2,
                     fun ?MODULE:format_channel_info/2
                 );
             Node0 ->
@@ -655,10 +654,10 @@ list_clients(QString) ->
                         QStringWithoutNode = maps:without([<<"node">>], QString),
                         emqx_mgmt_api:node_query(
                             Node1,
-                            QStringWithoutNode,
                             ?CLIENT_QTAB,
+                            QStringWithoutNode,
                             ?CLIENT_QSCHEMA,
-                            ?QUERY_FUN,
+                            fun ?MODULE:qs2ms/2,
                             fun ?MODULE:format_channel_info/2
                         );
                     {error, _} ->
@@ -783,30 +782,13 @@ do_unsubscribe(ClientID, Topic) ->
             Res
     end.
 
-%%--------------------------------------------------------------------
-%% Query Functions
-
-query(Tab, {QString, []}, Continuation, Limit) ->
-    Ms = qs2ms(QString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
-    Ms = qs2ms(QString),
-    FuzzyFilterFun = fuzzy_filter_fun(FuzzyQString),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
-
 %%--------------------------------------------------------------------
 %% QueryString to Match Spec
 
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Tab, {QString, FuzzyQString}) ->
+    {qs2ms(QString), fuzzy_filter_fun(FuzzyQString)}.
+
 -spec qs2ms(list()) -> ets:match_spec().
 qs2ms(Qs) ->
     {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
@@ -856,6 +838,8 @@ ms(created_at, X) ->
 %%--------------------------------------------------------------------
 %% Match funcs
 
+fuzzy_filter_fun([]) ->
+    undefined;
 fuzzy_filter_fun(Fuzzy) ->
     fun(MsRaws) when is_list(MsRaws) ->
         lists:filter(

+ 27 - 46
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -32,7 +32,7 @@
 -export([subscriptions/2]).
 
 -export([
-    query/4,
+    qs2ms/2,
     format/2
 ]).
 
@@ -47,8 +47,6 @@
     {<<"match_topic">>, binary}
 ]).
 
--define(QUERY_FUN, {?MODULE, query}).
-
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
 
@@ -139,10 +137,10 @@ subscriptions(get, #{query_string := QString}) ->
         case maps:get(<<"node">>, QString, undefined) of
             undefined ->
                 emqx_mgmt_api:cluster_query(
-                    QString,
                     ?SUBS_QTABLE,
+                    QString,
                     ?SUBS_QSCHEMA,
-                    ?QUERY_FUN,
+                    fun ?MODULE:qs2ms/2,
                     fun ?MODULE:format/2
                 );
             Node0 ->
@@ -150,10 +148,10 @@ subscriptions(get, #{query_string := QString}) ->
                     {ok, Node1} ->
                         emqx_mgmt_api:node_query(
                             Node1,
-                            QString,
                             ?SUBS_QTABLE,
+                            QString,
                             ?SUBS_QSCHEMA,
-                            ?QUERY_FUN,
+                            fun ?MODULE:qs2ms/2,
                             fun ?MODULE:format/2
                         );
                     {error, _} ->
@@ -188,51 +186,21 @@ get_topic(Topic, _) ->
     Topic.
 
 %%--------------------------------------------------------------------
-%% Query Function
+%% QueryString to MatchSpec
 %%--------------------------------------------------------------------
 
-query(Tab, {Qs, []}, Continuation, Limit) ->
-    Ms = qs2ms(Qs),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        Ms,
-        Continuation,
-        Limit
-    );
-query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
-    Ms = qs2ms(Qs),
-    FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
-    emqx_mgmt_api:select_table_with_count(
-        Tab,
-        {Ms, FuzzyFilterFun},
-        Continuation,
-        Limit
-    ).
-
-fuzzy_filter_fun(Fuzzy) ->
-    fun(MsRaws) when is_list(MsRaws) ->
-        lists:filter(
-            fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
-            MsRaws
-        )
-    end.
-
-run_fuzzy_filter(_, []) ->
-    true;
-run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
-    emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
-
-%%--------------------------------------------------------------------
-%% Query String to Match Spec
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Tab, {Qs, Fuzzy}) ->
+    {gen_match_spec(Qs), fuzzy_filter_fun(Fuzzy)}.
 
-qs2ms(Qs) ->
-    MtchHead = qs2ms(Qs, {{'_', '_'}, #{}}),
+gen_match_spec(Qs) ->
+    MtchHead = gen_match_spec(Qs, {{'_', '_'}, #{}}),
     [{MtchHead, [], ['$_']}].
 
-qs2ms([], MtchHead) ->
+gen_match_spec([], MtchHead) ->
     MtchHead;
-qs2ms([{Key, '=:=', Value} | More], MtchHead) ->
-    qs2ms(More, update_ms(Key, Value, MtchHead)).
+gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
+    gen_match_spec(More, update_ms(Key, Value, MtchHead)).
 
 update_ms(clientid, X, {{Pid, Topic}, Opts}) ->
     {{Pid, Topic}, Opts#{subid => X}};
@@ -242,3 +210,16 @@ update_ms(share_group, X, {{Pid, Topic}, Opts}) ->
     {{Pid, Topic}, Opts#{share => X}};
 update_ms(qos, X, {{Pid, Topic}, Opts}) ->
     {{Pid, Topic}, Opts#{qos => X}}.
+
+fuzzy_filter_fun(Fuzzy) ->
+    fun(MsRaws) when is_list(MsRaws) ->
+        lists:filter(
+            fun(E) -> run_fuzzy_filter(E, Fuzzy) end,
+            MsRaws
+        )
+    end.
+
+run_fuzzy_filter(_, []) ->
+    true;
+run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
+    emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).

+ 10 - 11
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -34,7 +34,7 @@
     topic/2
 ]).
 
--export([query/4]).
+-export([qs2ms/2, format/1]).
 
 -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
 
@@ -110,10 +110,10 @@ do_list(Params) ->
     case
         emqx_mgmt_api:node_query(
             node(),
-            Params,
             emqx_route,
+            Params,
             ?TOPICS_QUERY_SCHEMA,
-            {?MODULE, query},
+            fun ?MODULE:qs2ms/2,
             fun ?MODULE:format/1
         )
     of
@@ -143,16 +143,15 @@ generate_topic(Params = #{topic := Topic}) ->
 generate_topic(Params) ->
     Params.
 
-query(Tab, {Qs, _}, Continuation, Limit) ->
-    Ms = qs2ms(Qs, [{{route, '_', '_'}, [], ['$_']}]),
-    emqx_mgmt_api:select_table_with_count(Tab, Ms, Continuation, Limit, fun format/1).
+qs2ms(_Tab, {Qs, _}) ->
+    {gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]), undefined}.
 
-qs2ms([], Res) ->
+gen_match_spec([], Res) ->
     Res;
-qs2ms([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) ->
-    qs2ms(Qs, [{{route, T, N}, [], ['$_']}]);
-qs2ms([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
-    qs2ms(Qs, [{{route, T, N}, [], ['$_']}]).
+gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) ->
+    gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]);
+gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
+    gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
 
 format(#route{topic = Topic, dest = {_, Node}}) ->
     #{topic => Topic, node => Node};

+ 12 - 6
apps/emqx_modules/src/emqx_delayed.erl

@@ -56,10 +56,12 @@
     get_delayed_message/2,
     delete_delayed_message/1,
     delete_delayed_message/2,
-    cluster_list/1,
-    cluster_query/4
+    cluster_list/1
 ]).
 
+%% internal exports
+-export([qs2ms/2]).
+
 -export([
     post_config_update/5
 ]).
@@ -168,12 +170,16 @@ list(Params) ->
 cluster_list(Params) ->
     %% FIXME: why cluster_query???
     emqx_mgmt_api:cluster_query(
-        Params, ?TAB, [], {?MODULE, cluster_query}, fun ?MODULE:format_delayed/1
+        ?TAB,
+        Params,
+        [],
+        fun ?MODULE:qs2ms/2,
+        fun ?MODULE:format_delayed/1
     ).
 
-cluster_query(Table, _QsSpec, Continuation, Limit) ->
-    Ms = [{'$1', [], ['$1']}],
-    emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit).
+-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
+qs2ms(_Table, {_Qs, _Fuzzy}) ->
+    {[{'$1', [], ['$1']}], undefined}.
 
 format_delayed(Delayed) ->
     format_delayed(Delayed, false).

+ 5 - 8
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -34,7 +34,7 @@
 -export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]).
 
 %% query callback
--export([query/4]).
+-export([qs2ms/2, format_rule_resp/1]).
 
 -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
 -define(ERR_BADARGS(REASON), begin
@@ -274,10 +274,10 @@ param_path_id() ->
     case
         emqx_mgmt_api:node_query(
             node(),
-            QueryString,
             ?RULE_TAB,
+            QueryString,
             ?RULE_QS_SCHEMA,
-            {?MODULE, query},
+            fun ?MODULE:qs2ms/2,
             fun ?MODULE:format_rule_resp/1
         )
     of
@@ -553,12 +553,9 @@ filter_out_request_body(Conf) ->
     ],
     maps:without(ExtraConfs, Conf).
 
-query(Tab, {Qs, Fuzzy}, Start, Limit) ->
+qs2ms(_Tab, {Qs, Fuzzy}) ->
     Ms = qs2ms(),
-    FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy),
-    emqx_mgmt_api:select_table_with_count(
-        Tab, {Ms, FuzzyFun}, Start, Limit
-    ).
+    {Ms, fuzzy_match_fun(Qs, Ms, Fuzzy)}.
 
 %% rule is not a record, so everything is fuzzy filter.
 qs2ms() ->