Sfoglia il codice sorgente

fix(mgmt): optimize the speed of query tail pages

In the previous, when you query the tail pages, all the front of rows
will be queried out and formatted. It greatly hurts the speed of query.

Currently, we only format the final result rows. i.e, the query for the
last page of data will be 10x faster.
JianBo He 3 anni fa
parent
commit
08121e7df6

+ 9 - 5
apps/emqx/src/emqx_alarm.erl

@@ -41,7 +41,8 @@
     delete_all_deactivated_alarms/0,
     get_alarms/0,
     get_alarms/1,
-    format/1
+    format/1,
+    format/2
 ]).
 
 %% gen_server callbacks
@@ -169,12 +170,15 @@ get_alarms(activated) ->
 get_alarms(deactivated) ->
     gen_server:call(?MODULE, {get_alarms, deactivated}).
 
-format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
+format(Alarm) ->
+    format(node(), Alarm).
+
+format(Node, #activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
     Now = erlang:system_time(microsecond),
     %% mnesia db stored microsecond for high frequency alarm
     %% format for dashboard using millisecond
     #{
-        node => node(),
+        node => Node,
         name => Name,
         message => Message,
         %% to millisecond
@@ -182,7 +186,7 @@ format(#activated_alarm{name = Name, message = Message, activate_at = At, detail
         activate_at => to_rfc3339(At),
         details => Details
     };
-format(#deactivated_alarm{
+format(Node, #deactivated_alarm{
     name = Name,
     message = Message,
     activate_at = At,
@@ -190,7 +194,7 @@ format(#deactivated_alarm{
     deactivate_at = DAt
 }) ->
     #{
-        node => node(),
+        node => Node,
         name => Name,
         message => Message,
         %% to millisecond

+ 10 - 5
apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl

@@ -262,7 +262,14 @@ lookup_user(UserID, #{user_group := UserGroup}) ->
 
 list_users(QueryString, #{user_group := UserGroup}) ->
     NQueryString = QueryString#{<<"user_group">> => UserGroup},
-    emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN).
+    emqx_mgmt_api:node_query(
+        node(),
+        NQueryString,
+        ?TAB,
+        ?AUTHN_QSCHEMA,
+        ?QUERY_FUN,
+        fun ?MODULE:format_user_info/1
+    ).
 
 %%--------------------------------------------------------------------
 %% Query Functions
@@ -273,8 +280,7 @@ query(Tab, {QString, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format_user_info/1
+        Limit
     );
 query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
     Ms = ms_from_qstring(QString),
@@ -283,8 +289,7 @@ query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format_user_info/1
+        Limit
     ).
 
 %%--------------------------------------------------------------------

+ 10 - 5
apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl

@@ -288,7 +288,14 @@ lookup_user(UserID, #{user_group := UserGroup}) ->
 
 list_users(QueryString, #{user_group := UserGroup}) ->
     NQueryString = QueryString#{<<"user_group">> => UserGroup},
-    emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN).
+    emqx_mgmt_api:node_query(
+        node(),
+        NQueryString,
+        ?TAB,
+        ?AUTHN_QSCHEMA,
+        ?QUERY_FUN,
+        fun ?MODULE:format_user_info/1
+    ).
 
 %%--------------------------------------------------------------------
 %% Query Functions
@@ -299,8 +306,7 @@ query(Tab, {QString, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format_user_info/1
+        Limit
     );
 query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
     Ms = ms_from_qstring(QString),
@@ -309,8 +315,7 @@ query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format_user_info/1
+        Limit
     ).
 
 %%--------------------------------------------------------------------

+ 8 - 10
apps/emqx_authz/src/emqx_authz_api_mnesia.erl

@@ -408,7 +408,8 @@ users(get, #{query_string := QueryString}) ->
             QueryString,
             ?ACL_TABLE,
             ?ACL_USERNAME_QSCHEMA,
-            ?QUERY_USERNAME_FUN
+            ?QUERY_USERNAME_FUN,
+            fun ?MODULE:format_result/1
         )
     of
         {error, page_limit_invalid} ->
@@ -443,7 +444,8 @@ clients(get, #{query_string := QueryString}) ->
             QueryString,
             ?ACL_TABLE,
             ?ACL_CLIENTID_QSCHEMA,
-            ?QUERY_CLIENTID_FUN
+            ?QUERY_CLIENTID_FUN,
+            fun ?MODULE:format_result/1
         )
     of
         {error, page_limit_invalid} ->
@@ -582,8 +584,7 @@ query_username(Tab, {_QString, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format_result/1
+        Limit
     );
 query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
     Ms = emqx_authz_mnesia:list_username_rules(),
@@ -592,8 +593,7 @@ query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format_result/1
+        Limit
     ).
 
 query_clientid(Tab, {_QString, []}, Continuation, Limit) ->
@@ -602,8 +602,7 @@ query_clientid(Tab, {_QString, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format_result/1
+        Limit
     );
 query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
     Ms = emqx_authz_mnesia:list_clientid_rules(),
@@ -612,8 +611,7 @@ query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format_result/1
+        Limit
     ).
 
 %%--------------------------------------------------------------------

+ 13 - 9
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -56,7 +56,8 @@
 %% internal exports (for client query)
 -export([
     query/4,
-    format_channel_info/1
+    format_channel_info/1,
+    format_channel_info/2
 ]).
 
 -define(TAGS, [<<"Gateway Clients">>]).
@@ -112,7 +113,8 @@ clients(get, #{
                         QString,
                         TabName,
                         ?CLIENT_QSCHEMA,
-                        ?QUERY_FUN
+                        ?QUERY_FUN,
+                        fun ?MODULE:format_channel_info/2
                     );
                 Node0 ->
                     case emqx_misc:safe_to_existing_atom(Node0) of
@@ -123,7 +125,8 @@ clients(get, #{
                                 QStringWithoutNode,
                                 TabName,
                                 ?CLIENT_QSCHEMA,
-                                ?QUERY_FUN
+                                ?QUERY_FUN,
+                                fun ?MODULE:format_channel_info/2
                             );
                         {error, _} ->
                             {error, Node0, {badrpc, <<"invalid node">>}}
@@ -272,8 +275,7 @@ query(Tab, {Qs, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format_channel_info/1
+        Limit
     );
 query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
     Ms = qs2ms(Qs),
@@ -282,8 +284,7 @@ query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format_channel_info/1
+        Limit
     ).
 
 qs2ms(Qs) ->
@@ -363,8 +364,11 @@ run_fuzzy_filter(
 %%--------------------------------------------------------------------
 %% format funcs
 
-format_channel_info({_, Infos, Stats} = R) ->
-    Node = maps:get(node, Infos, node()),
+format_channel_info(ChannInfo) ->
+    format_channel_info(node(), ChannInfo).
+
+format_channel_info(WhichNode, {_, Infos, Stats} = R) ->
+    Node = maps:get(node, Infos, WhichNode),
     ClientInfo = maps:get(clientinfo, Infos, #{}),
     ConnInfo = maps:get(conninfo, Infos, #{}),
     SessInfo = maps:get(session, Infos, #{}),

+ 128 - 111
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -29,9 +29,9 @@
 
 %% first_next query APIs
 -export([
-    node_query/5,
-    cluster_query/4,
-    select_table_with_count/5,
+    node_query/6,
+    cluster_query/5,
+    select_table_with_count/4,
     b2i/1
 ]).
 
@@ -117,30 +117,24 @@ limit(Params) when is_map(Params) ->
 limit(Params) ->
     proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
 
-init_meta(Params) ->
-    Limit = b2i(limit(Params)),
-    Page = b2i(page(Params)),
-    #{
-        page => Page,
-        limit => Limit,
-        count => 0
-    }.
-
 %%--------------------------------------------------------------------
 %% Node Query
 %%--------------------------------------------------------------------
 
-node_query(Node, QString, Tab, QSchema, QueryFun) ->
-    {_CodCnt, NQString} = parse_qstring(QString, QSchema),
-    page_limit_check_query(
-        init_meta(QString),
-        {fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]}
-    ).
+node_query(Node, QString, Tab, QSchema, QueryFun, FmtFun) ->
+    case parse_pager_params(QString) of
+        false ->
+            {error, page_limit_invalid};
+        Meta ->
+            {_CodCnt, NQString} = parse_qstring(QString, QSchema),
+            ResultAcc = #{cursor => 0, count => 0, rows => []},
+            NResultAcc = do_node_query(
+                Node, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc
+            ),
+            format_query_result(FmtFun, Meta, NResultAcc)
+    end.
 
 %% @private
-do_node_query(Node, Tab, QString, QueryFun, Meta) ->
-    do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
-
 do_node_query(
     Node,
     Tab,
@@ -148,45 +142,44 @@ do_node_query(
     QueryFun,
     Continuation,
     Meta = #{limit := Limit},
-    Results
+    ResultAcc
 ) ->
     case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
         {error, {badrpc, R}} ->
             {error, Node, {badrpc, R}};
-        {Len, Rows, ?FRESH_SELECT} ->
-            {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
-            #{meta => NMeta, data => NResults};
-        {Len, Rows, NContinuation} ->
-            {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
-            do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
+        {Rows, ?FRESH_SELECT} ->
+            {_, NResultAcc} = accumulate_query_rows(Node, Rows, ResultAcc, Meta),
+            NResultAcc;
+        {Rows, NContinuation} ->
+            case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of
+                {enough, NResultAcc} ->
+                    NResultAcc;
+                {more, NResultAcc} ->
+                    do_node_query(Node, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc)
+            end
     end.
 
 %%--------------------------------------------------------------------
 %% Cluster Query
 %%--------------------------------------------------------------------
 
-cluster_query(QString, Tab, QSchema, QueryFun) ->
-    {_CodCnt, NQString} = parse_qstring(QString, QSchema),
-    Nodes = mria_mnesia:running_nodes(),
-    page_limit_check_query(
-        init_meta(QString),
-        {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]}
-    ).
+cluster_query(QString, Tab, QSchema, QueryFun, FmtFun) ->
+    case parse_pager_params(QString) of
+        false ->
+            {error, page_limit_invalid};
+        Meta ->
+            {_CodCnt, NQString} = parse_qstring(QString, QSchema),
+            Nodes = mria_mnesia:running_nodes(),
+            ResultAcc = #{cursor => 0, count => 0, rows => []},
+            NResultAcc = do_cluster_query(
+                Nodes, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc
+            ),
+            format_query_result(FmtFun, Meta, NResultAcc)
+    end.
 
 %% @private
-do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) ->
-    do_cluster_query(
-        Nodes,
-        Tab,
-        QString,
-        QueryFun,
-        _Continuation = ?FRESH_SELECT,
-        Meta,
-        _Results = []
-    ).
-
-do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) ->
-    #{meta => Meta, data => Results};
+do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, _Meta, ResultAcc) ->
+    ResultAcc;
 do_cluster_query(
     [Node | Tail] = Nodes,
     Tab,
@@ -194,17 +187,27 @@ do_cluster_query(
     QueryFun,
     Continuation,
     Meta = #{limit := Limit},
-    Results
+    ResultAcc
 ) ->
     case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
         {error, {badrpc, R}} ->
-            {error, Node, {bar_rpc, R}};
-        {Len, Rows, ?FRESH_SELECT} ->
-            {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
-            do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults);
-        {Len, Rows, NContinuation} ->
-            {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
-            do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
+            {error, Node, {badrpc, R}};
+        {Rows, NContinuation} ->
+            case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of
+                {enough, NResultAcc} ->
+                    NResultAcc;
+                {more, NResultAcc} ->
+                    case NContinuation of
+                        ?FRESH_SELECT ->
+                            do_cluster_query(
+                                Tail, Tab, QString, QueryFun, ?FRESH_SELECT, Meta, NResultAcc
+                            );
+                        _ ->
+                            do_cluster_query(
+                                Nodes, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc
+                            )
+                    end
+            end
     end.
 
 %%--------------------------------------------------------------------
@@ -228,60 +231,76 @@ do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
         Ret -> Ret
     end.
 
-sub_query_result(Len, Rows, Limit, Results, Meta) ->
-    {Flag, NMeta} = judge_page_with_counting(Len, Meta),
-    NResults =
-        case Flag of
-            more ->
-                [];
-            cutrows ->
-                {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta),
-                ThisRows = lists:sublist(Rows, SubStart, NeedNowNum),
-                lists:sublist(lists:append(Results, ThisRows), SubStart, Limit);
-            enough ->
-                lists:sublist(lists:append(Results, Rows), 1, Limit)
-        end,
-    {NMeta, NResults}.
+%% ResultAcc :: #{count := integer(),
+%%                cursor := integer(),
+%%                rows  := [{node(), Rows :: list()}]
+%%               }
+accumulate_query_rows(
+    Node,
+    Rows,
+    ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc},
+    _Meta = #{page := Page, limit := Limit}
+) ->
+    PageStart = (Page - 1) * Limit + 1,
+    PageEnd = Page * Limit,
+    Len = length(Rows),
+    case Cursor + Len of
+        NCursor when NCursor < PageStart ->
+            {more, ResultAcc#{cursor => NCursor}};
+        NCursor when NCursor < PageEnd ->
+            {more, ResultAcc#{
+                cursor => NCursor,
+                count => Count + length(Rows),
+                rows => [{Node, Rows} | RowsAcc]
+            }};
+        NCursor when NCursor >= PageEnd ->
+            SubRows = lists:sublist(Rows, Limit - Count),
+            {enough, ResultAcc#{
+                cursor => NCursor,
+                count => Count + length(SubRows),
+                rows => [{Node, SubRows} | RowsAcc]
+            }}
+    end.
 
 %%--------------------------------------------------------------------
 %% Table Select
 %%--------------------------------------------------------------------
 
-select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when
+select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit) when
     is_function(FuzzyFilterFun) andalso Limit > 0
 ->
     case ets:select(Tab, Ms, Limit) of
         '$end_of_table' ->
-            {0, [], ?FRESH_SELECT};
+            {[], ?FRESH_SELECT};
         {RawResult, NContinuation} ->
             Rows = FuzzyFilterFun(RawResult),
-            {length(Rows), lists:map(FmtFun, Rows), NContinuation}
+            {Rows, NContinuation}
     end;
-select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when
+select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit) when
     is_function(FuzzyFilterFun)
 ->
     case ets:select(ets:repair_continuation(Continuation, Ms)) of
         '$end_of_table' ->
-            {0, [], ?FRESH_SELECT};
+            {[], ?FRESH_SELECT};
         {RawResult, NContinuation} ->
             Rows = FuzzyFilterFun(RawResult),
-            {length(Rows), lists:map(FmtFun, Rows), NContinuation}
+            {Rows, NContinuation}
     end;
-select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when
+select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit) when
     Limit > 0
 ->
     case ets:select(Tab, Ms, Limit) of
         '$end_of_table' ->
-            {0, [], ?FRESH_SELECT};
+            {[], ?FRESH_SELECT};
         {RawResult, NContinuation} ->
-            {length(RawResult), lists:map(FmtFun, RawResult), NContinuation}
+            {RawResult, NContinuation}
     end;
-select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) ->
+select_table_with_count(_Tab, Ms, Continuation, _Limit) ->
     case ets:select(ets:repair_continuation(Continuation, Ms)) of
         '$end_of_table' ->
-            {0, [], ?FRESH_SELECT};
+            {[], ?FRESH_SELECT};
         {RawResult, NContinuation} ->
-            {length(RawResult), lists:map(FmtFun, RawResult), NContinuation}
+            {RawResult, NContinuation}
     end.
 
 %%--------------------------------------------------------------------
@@ -379,40 +398,38 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
 is_fuzzy_key(_) ->
     false.
 
-page_start(1, _) -> 1;
-page_start(Page, Limit) -> (Page - 1) * Limit + 1.
+format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
+    Error;
+format_query_result(
+    FmtFun, Meta, _ResultAcc = #{count := _Count, cursor := Cursor, rows := RowsAcc}
+) ->
+    #{
+        meta => Meta#{count => Cursor},
+        data => lists:flatten(
+            lists:foldr(
+                fun({Node, Rows}, Acc) ->
+                    [lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc]
+                end,
+                [],
+                RowsAcc
+            )
+        )
+    }.
 
-judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) ->
-    PageStart = page_start(Page, Limit),
-    PageEnd = Page * Limit,
-    case Count + Len of
-        NCount when NCount < PageStart ->
-            {more, Meta#{count => NCount}};
-        NCount when NCount < PageEnd ->
-            {cutrows, Meta#{count => NCount}};
-        NCount when NCount >= PageEnd ->
-            {enough, Meta#{count => NCount}}
+exec_format_fun(FmtFun, Node, Row) ->
+    case erlang:fun_info(FmtFun, arity) of
+        {arity, 1} -> FmtFun(Row);
+        {arity, 2} -> FmtFun(Node, Row)
     end.
 
-rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) ->
-    PageStart = page_start(Page, Limit),
-    case (Count - Len) < PageStart of
+parse_pager_params(Params) ->
+    Page = b2i(page(Params)),
+    Limit = b2i(limit(Params)),
+    case Page > 0 andalso Limit > 0 of
         true ->
-            NeedNowNum = Count - PageStart + 1,
-            SubStart = Len - NeedNowNum + 1,
-            {SubStart, NeedNowNum};
+            #{page => Page, limit => Limit, count => 0};
         false ->
-            {_SubStart = 1, _NeedNowNum = Len}
-    end.
-
-page_limit_check_query(Meta, {F, A}) ->
-    case Meta of
-        #{page := Page, limit := Limit} when
-            Page < 1; Limit < 1
-        ->
-            {error, page_limit_invalid};
-        _ ->
-            erlang:apply(F, A)
+            false
     end.
 
 %%--------------------------------------------------------------------

+ 13 - 7
apps/emqx_management/src/emqx_mgmt_api_alarms.erl

@@ -24,7 +24,7 @@
 
 -export([api_spec/0, paths/0, schema/1, fields/1]).
 
--export([alarms/2]).
+-export([alarms/2, format_alarm/2]).
 
 -define(TAGS, [<<"Alarms">>]).
 
@@ -112,7 +112,15 @@ alarms(get, #{query_string := QString}) ->
             true -> ?ACTIVATED_ALARM;
             false -> ?DEACTIVATED_ALARM
         end,
-    case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of
+    case
+        emqx_mgmt_api:cluster_query(
+            QString,
+            Table,
+            [],
+            {?MODULE, query},
+            fun ?MODULE:format_alarm/2
+        )
+    of
         {error, page_limit_invalid} ->
             {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
         {error, Node, {badrpc, R}} ->
@@ -130,9 +138,7 @@ alarms(delete, _Params) ->
 
 query(Table, _QsSpec, Continuation, Limit) ->
     Ms = [{'$1', [], ['$1']}],
-    emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_alarm/1).
+    emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit).
 
-format_alarm(Alarms) when is_list(Alarms) ->
-    [emqx_alarm:format(Alarm) || Alarm <- Alarms];
-format_alarm(Alarm) ->
-    emqx_alarm:format(Alarm).
+format_alarm(WhichNode, Alarm) ->
+    emqx_alarm:format(WhichNode, Alarm).

+ 13 - 13
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -47,7 +47,8 @@
 
 -export([
     query/4,
-    format_channel_info/1
+    format_channel_info/1,
+    format_channel_info/2
 ]).
 
 %% for batch operation
@@ -645,7 +646,8 @@ list_clients(QString) ->
                     QString,
                     ?CLIENT_QTAB,
                     ?CLIENT_QSCHEMA,
-                    ?QUERY_FUN
+                    ?QUERY_FUN,
+                    fun ?MODULE:format_channel_info/2
                 );
             Node0 ->
                 case emqx_misc:safe_to_existing_atom(Node0) of
@@ -656,7 +658,8 @@ list_clients(QString) ->
                             QStringWithoutNode,
                             ?CLIENT_QTAB,
                             ?CLIENT_QSCHEMA,
-                            ?QUERY_FUN
+                            ?QUERY_FUN,
+                            fun ?MODULE:format_channel_info/2
                         );
                     {error, _} ->
                         {error, Node0, {badrpc, <<"invalid node">>}}
@@ -789,8 +792,7 @@ query(Tab, {QString, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format_channel_info/1
+        Limit
     );
 query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
     Ms = qs2ms(QString),
@@ -799,8 +801,7 @@ query(Tab, {QString, FuzzyQString}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format_channel_info/1
+        Limit
     ).
 
 %%--------------------------------------------------------------------
@@ -876,12 +877,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} |
 %%--------------------------------------------------------------------
 %% format funcs
 
-format_channel_info({_, ClientInfo0, ClientStats}) ->
-    Node =
-        case ClientInfo0 of
-            #{node := N} -> N;
-            _ -> node()
-        end,
+format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
+    format_channel_info(node(), ChannInfo).
+
+format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
+    Node = maps:get(node, ClientInfo0, WhichNode),
     ClientInfo1 = emqx_map_lib:deep_remove([conninfo, clientid], ClientInfo0),
     ClientInfo2 = emqx_map_lib:deep_remove([conninfo, username], ClientInfo1),
     StatsMap = maps:without(

+ 9 - 13
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -33,7 +33,7 @@
 
 -export([
     query/4,
-    format/1
+    format/2
 ]).
 
 -define(SUBS_QTABLE, emqx_suboption).
@@ -142,7 +142,8 @@ subscriptions(get, #{query_string := QString}) ->
                     QString,
                     ?SUBS_QTABLE,
                     ?SUBS_QSCHEMA,
-                    ?QUERY_FUN
+                    ?QUERY_FUN,
+                    fun ?MODULE:format/2
                 );
             Node0 ->
                 case emqx_misc:safe_to_existing_atom(Node0) of
@@ -152,7 +153,8 @@ subscriptions(get, #{query_string := QString}) ->
                             QString,
                             ?SUBS_QTABLE,
                             ?SUBS_QSCHEMA,
-                            ?QUERY_FUN
+                            ?QUERY_FUN,
+                            fun ?MODULE:format/2
                         );
                     {error, _} ->
                         {error, Node0, {badrpc, <<"invalid node">>}}
@@ -168,16 +170,12 @@ subscriptions(get, #{query_string := QString}) ->
             {200, Result}
     end.
 
-format(Items) when is_list(Items) ->
-    [format(Item) || Item <- Items];
-format({{Subscriber, Topic}, Options}) ->
-    format({Subscriber, Topic, Options});
-format({_Subscriber, Topic, Options}) ->
+format(WhichNode, {{_Subscriber, Topic}, Options}) ->
     maps:merge(
         #{
             topic => get_topic(Topic, Options),
             clientid => maps:get(subid, Options),
-            node => node()
+            node => WhichNode
         },
         maps:with([qos, nl, rap, rh], Options)
     ).
@@ -199,8 +197,7 @@ query(Tab, {Qs, []}, Continuation, Limit) ->
         Tab,
         Ms,
         Continuation,
-        Limit,
-        fun format/1
+        Limit
     );
 query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
     Ms = qs2ms(Qs),
@@ -209,8 +206,7 @@ query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
         Tab,
         {Ms, FuzzyFilterFun},
         Continuation,
-        Limit,
-        fun format/1
+        Limit
     ).
 
 fuzzy_filter_fun(Fuzzy) ->

+ 6 - 1
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -109,7 +109,12 @@ topic(get, #{bindings := Bindings}) ->
 do_list(Params) ->
     case
         emqx_mgmt_api:node_query(
-            node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query}
+            node(),
+            Params,
+            emqx_route,
+            ?TOPICS_QUERY_SCHEMA,
+            {?MODULE, query},
+            fun ?MODULE:format/1
         )
     of
         {error, page_limit_invalid} ->

+ 5 - 2
apps/emqx_modules/src/emqx_delayed.erl

@@ -166,11 +166,14 @@ list(Params) ->
     emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
 
 cluster_list(Params) ->
-    emqx_mgmt_api:cluster_query(Params, ?TAB, [], {?MODULE, cluster_query}).
+    %% FIXME: why cluster_query???
+    emqx_mgmt_api:cluster_query(
+        Params, ?TAB, [], {?MODULE, cluster_query}, fun ?MODULE:format_delayed/1
+    ).
 
 cluster_query(Table, _QsSpec, Continuation, Limit) ->
     Ms = [{'$1', [], ['$1']}],
-    emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_delayed/1).
+    emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit).
 
 format_delayed(Delayed) ->
     format_delayed(Delayed, false).

+ 3 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -277,7 +277,8 @@ param_path_id() ->
             QueryString,
             ?RULE_TAB,
             ?RULE_QS_SCHEMA,
-            {?MODULE, query}
+            {?MODULE, query},
+            fun ?MODULE:format_rule_resp/1
         )
     of
         {error, page_limit_invalid} ->
@@ -556,7 +557,7 @@ query(Tab, {Qs, Fuzzy}, Start, Limit) ->
     Ms = qs2ms(),
     FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy),
     emqx_mgmt_api:select_table_with_count(
-        Tab, {Ms, FuzzyFun}, Start, Limit, fun format_rule_resp/1
+        Tab, {Ms, FuzzyFun}, Start, Limit
     ).
 
 %% rule is not a record, so everything is fuzzy filter.