Просмотр исходного кода

Merge pull request #14317 from keynslug/test/EEC-1119/accum-query

fix(mgmt): drop incorrect but inactive clause in paging accumulation
Andrew Mayorov 1 год назад
Родитель
Сommit
8900f1f8e3
2 измененных файлов с 99 добавлено и 16 удалено
  1. 98 16
      apps/emqx_management/src/emqx_mgmt_api.erl
  2. 1 0
      changes/ce/fix-14317.en.md

+ 98 - 16
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -54,7 +54,11 @@
 ]).
 
 -ifdef(TEST).
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
 -export([paginate_test_format/1]).
+
 -endif.
 
 -export_type([
@@ -557,18 +561,23 @@ accumulate_query_rows(
     Len = length(Rows),
     case Cursor + Len of
         NCursor when NCursor < PageStart ->
+            %% Haven't reached the required page.
             {more, ResultAcc#{cursor => NCursor}};
         NCursor when NCursor < PageEnd ->
+            %% Rows overlap with the page start
+            %% Throw away rows in the beginning belonging to the previous page(s).
             SubRows = lists:nthtail(max(0, PageStart - Cursor - 1), Rows),
             {more, ResultAcc#{
                 cursor => NCursor,
                 count => Count + length(SubRows),
                 rows => [{Node, SubRows} | RowsAcc]
             }};
-        NCursor when NCursor >= PageEnd + Limit ->
-            {enough, ResultAcc#{cursor => NCursor}};
         NCursor when NCursor >= PageEnd ->
-            SubRows = lists:sublist(Rows, Limit - Count),
+            %% Rows overlap with the page end (and potentially with the page start).
+            %% Throw away rows in the beginning belonging to the previous page(s).
+            %% Then throw away rows in the tail belonging to the next page(s).
+            PageRows = lists:nthtail(max(0, PageStart - Cursor - 1), Rows),
+            SubRows = lists:sublist(PageRows, Limit - Count),
             {enough, ResultAcc#{
                 cursor => NCursor,
                 count => Count + length(SubRows),
@@ -707,20 +716,19 @@ format_query_result(
         end,
     #{
         meta => Meta,
-        data => lists:flatten(
-            lists:foldl(
-                fun({Node, Rows}, Acc) ->
-                    [
-                        lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row, Opts) end, Rows)
-                        | Acc
-                    ]
-                end,
-                [],
-                RowsAcc
-            )
-        )
+        data => format_query_data(FmtFun, RowsAcc, Opts)
     }.
 
+format_query_data(FmtFun, RowsAcc, Opts) ->
+    %% NOTE: `RowsAcc` is reversed in the node-order, `lists:foldl/3` is correct here.
+    lists:foldl(
+        fun({Node, Rows}, Acc) ->
+            [exec_format_fun(FmtFun, Node, R, Opts) || R <- Rows] ++ Acc
+        end,
+        [],
+        RowsAcc
+    ).
+
 exec_format_fun(FmtFun, Node, Row, Opts) ->
     case erlang:fun_info(FmtFun, arity) of
         {arity, 1} -> FmtFun(Row);
@@ -813,7 +821,6 @@ b2i(Any) ->
 %%--------------------------------------------------------------------
 
 -ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
 
 params2qs_test_() ->
     QSchema = [
@@ -926,4 +933,79 @@ assert_paginate_results(Results, Size, Limit) ->
             ?_assertEqual(Size, length(AllData)),
             ?_assertEqual(Size, sets:size(sets:from_list(AllData)))
         ].
+
+accumulate_prop_test() ->
+    ?assert(proper:quickcheck(accumulate_prop(), [{numtests, 1000}])).
+
+accumulate_prop() ->
+    ?FORALL(
+        #{page := Page, limit := Limit, noderows := NodeRows},
+        emqx_proper_types:fixedmap(#{
+            page => page_t(),
+            limit => limit_t(),
+            noderows => noderows_t()
+        }),
+        begin
+            {Status, QRows} = accumulate_page_rows(Page, Limit, NodeRows),
+            {_Status, QRowsNext} = accumulate_page_rows(Page + 1, Limit, NodeRows),
+            measure(
+                #{
+                    "Limit" => Limit,
+                    "Page" => Page,
+                    "NRows" => length(QRows),
+                    "Complete" => emqx_utils_conv:int(Status == enough)
+                },
+                %% Verify page is non-empty if accumulation is complete.
+                accumulate_assert_nonempty(Status, Limit, QRows) and
+                    %% Verify rows across 2 consective pages form continuous sequence.
+                    accumulate_assert_continuous(QRows ++ QRowsNext)
+            )
+        end
+    ).
+
+accumulate_page_rows(Page, Limit, NodeRows) ->
+    QState = #{page => Page, limit => Limit},
+    {Status, #{rows := QRowsAcc}} = lists:foldl(
+        fun
+            ({Node, Rows}, {more, QRAcc}) ->
+                accumulate_query_rows(Node, Rows, QState, QRAcc);
+            (_NodeRows, {enough, QRAcc}) ->
+                {enough, QRAcc}
+        end,
+        {more, init_query_result()},
+        NodeRows
+    ),
+    QRows = format_query_data(fun(N, R) -> {N, R} end, QRowsAcc, #{}),
+    {Status, QRows}.
+
+accumulate_assert_nonempty(enough, Limit, QRows) ->
+    length(QRows) =:= Limit;
+accumulate_assert_nonempty(more, _Limit, _QRows) ->
+    true.
+
+accumulate_assert_continuous([{N, R1} | Rest = [{N, R2} | _]]) ->
+    (R2 - R1 =:= 1) andalso accumulate_assert_continuous(Rest);
+accumulate_assert_continuous([{_N1, _} | Rest = [{_N2, R} | _]]) ->
+    (R =:= 1) andalso accumulate_assert_continuous(Rest);
+accumulate_assert_continuous([_]) ->
+    true;
+accumulate_assert_continuous([]) ->
+    true.
+
+page_t() ->
+    pos_integer().
+
+limit_t() ->
+    emqx_proper_types:scaled(0.6, pos_integer()).
+
+noderows_t() ->
+    ?LET(
+        {Nodes, PageSize},
+        {pos_integer(), limit_t()},
+        [{N, lists:seq(1, PageSize)} || N <- lists:seq(1, Nodes)]
+    ).
+
+measure(NamedSamples, Test) ->
+    maps:fold(fun(Name, Sample, Acc) -> measure(Name, Sample, Acc) end, Test, NamedSamples).
+
 -endif.

+ 1 - 0
changes/ce/fix-14317.en.md

@@ -0,0 +1 @@
+Prevent potential issues where APIs involving paging may return empty pages, in case the internal APIs will be subtly misused in the future.