Pārlūkot izejas kodu

feat(dssubs): support paging in List API

Andrew Mayorov 1 gadu atpakaļ
vecāks
revīzija
2d9d7e8197

+ 84 - 27
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -10,6 +10,11 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-define(DESC_BAD_REQUEST, <<"Erroneous request">>).
+-define(RESP_BAD_REQUEST(MSG),
+    {400, #{code => <<"BAD_REQUEST">>, message => MSG}}
+).
+
 -define(DESC_NOT_FOUND, <<"Queue not found">>).
 -define(RESP_NOT_FOUND,
     {404, #{code => <<"NOT_FOUND">>, message => ?DESC_NOT_FOUND}}
@@ -71,11 +76,13 @@ schema("/durable_queues") ->
             tags => ?TAGS,
             summary => <<"List declared durable queues">>,
             description => ?DESC("durable_queues_get"),
+            parameters => [
+                hoconsc:ref(emqx_dashboard_swagger, cursor),
+                hoconsc:ref(emqx_dashboard_swagger, limit)
+            ],
             responses => #{
-                200 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queues_get(),
-                    durable_queues_get_example()
-                )
+                200 => resp_list_durable_queues(),
+                400 => error_codes(['BAD_REQUEST'], ?DESC_BAD_REQUEST)
             }
         },
         post => #{
@@ -84,10 +91,7 @@ schema("/durable_queues") ->
             description => ?DESC("durable_queues_post"),
             'requestBody' => durable_queue_post(),
             responses => #{
-                201 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queue_get(),
-                    durable_queue_get_example()
-                ),
+                201 => resp_create_durable_queue(),
                 409 => error_codes(['CONFLICT'], ?DESC_CREATE_CONFICT)
             }
         }
@@ -121,8 +125,23 @@ schema("/durable_queues/:id") ->
         }
     }.
 
-'/durable_queues'(get, _Params) ->
-    {200, [encode_props(ID, Props) || {ID, Props} <- queue_list()]};
+'/durable_queues'(get, #{query_string := QString}) ->
+    Cursor = maps:get(<<"cursor">>, QString, undefined),
+    Limit = maps:get(<<"limit">>, QString),
+    try queue_list(Cursor, Limit) of
+        {Queues, CursorNext} ->
+            Data = [encode_props(ID, Props) || {ID, Props} <- Queues],
+            case CursorNext of
+                undefined ->
+                    Meta = #{hasnext => false};
+                _Cursor ->
+                    Meta = #{hasnext => true, cursor => CursorNext}
+            end,
+            {200, #{data => Data, meta => Meta}}
+    catch
+        throw:Error ->
+            ?RESP_BAD_REQUEST(emqx_utils:readable_error_msg(Error))
+    end;
 '/durable_queues'(post, #{body := Params}) ->
     case queue_declare(Params) of
         {ok, Queue} ->
@@ -152,8 +171,8 @@ schema("/durable_queues/:id") ->
             ?RESP_INTERNAL_ERROR(emqx_utils:readable_error_msg(Reason))
     end.
 
-queue_list() ->
-    emqx_ds_shared_sub_queue:list().
+queue_list(Cursor, Limit) ->
+    emqx_ds_shared_sub_queue:list(Cursor, Limit).
 
 queue_get(#{bindings := #{id := ID}}) ->
     emqx_ds_shared_sub_queue:lookup(ID).
@@ -192,26 +211,50 @@ param_queue_id() ->
         })
     }.
 
+resp_list_durable_queues() ->
+    emqx_dashboard_swagger:schema_with_example(
+        ref(resp_list_durable_queues),
+        durable_queues_list_example()
+    ).
+
+resp_create_durable_queue() ->
+    emqx_dashboard_swagger:schema_with_example(
+        durable_queue_post(),
+        durable_queue_get_example()
+    ).
+
 validate_queue_id(Id) ->
     case emqx_topic:words(Id) of
         [Segment] when is_binary(Segment) -> true;
         _ -> {error, <<"Invalid queue id">>}
     end.
 
-durable_queues_get() ->
-    hoconsc:array(ref(durable_queue_get)).
-
 durable_queue_get() ->
-    ref(durable_queue_get).
+    ref(durable_queue).
 
 durable_queue_post() ->
     map().
 
 roots() -> [].
 
-fields(durable_queue_get) ->
+fields(durable_queue) ->
+    [
+        {id,
+            mk(binary(), #{
+                desc => <<"Identifier assigned at creation time">>
+            })},
+        {created_at,
+            mk(emqx_utils_calendar:epoch_millisecond(), #{
+                desc => <<"Queue creation time">>
+            })},
+        {group, mk(binary(), #{})},
+        {topic, mk(binary(), #{})},
+        {start_time, mk(emqx_utils_calendar:epoch_millisecond(), #{})}
+    ];
+fields(resp_list_durable_queues) ->
     [
-        {id, mk(binary(), #{})}
+        {data, hoconsc:mk(hoconsc:array(durable_queue_get()), #{})},
+        {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta_with_cursor), #{})}
     ].
 
 %%--------------------------------------------------------------------
@@ -220,15 +263,29 @@ fields(durable_queue_get) ->
 
 durable_queue_get_example() ->
     #{
-        id => <<"queue1">>
+        id => <<"mygrp:1234EF">>,
+        created_at => <<"2024-01-01T12:34:56.789+02:00">>,
+        group => <<"mygrp">>,
+        topic => <<"t/devices/#">>,
+        start_time => <<"2024-01-01T00:00:00.000+02:00">>
     }.
 
-durable_queues_get_example() ->
-    [
-        #{
-            id => <<"queue1">>
-        },
-        #{
-            id => <<"queue2">>
+durable_queues_list_example() ->
+    #{
+        data => [
+            durable_queue_get_example(),
+            #{
+                id => <<"mygrp:567890AABBCC">>,
+                created_at => <<"2024-02-02T22:33:44.000+02:00">>,
+                group => <<"mygrp">>,
+                topic => <<"t/devices/#">>,
+                start_time => <<"1970-01-01T02:00:00+02:00">>
+            }
+        ],
+        %% TODO: Probably needs to be defined in `emqx_dashboard_swagger`.
+        meta => #{
+            <<"count">> => 2,
+            <<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>,
+            <<"hasnext">> => true
         }
-    ].
+    }.

+ 33 - 9
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl

@@ -11,7 +11,7 @@
     declare/4,
     destroy/1,
     destroy/2,
-    list/0
+    list/2
 ]).
 
 -export([
@@ -74,8 +74,13 @@ destroy(ID) ->
             not_found
     end.
 
-list() ->
-    consume_select(emqx_ds_shared_sub_store:select(properties)).
+list(undefined, Limit) ->
+    list(select_properties(), Limit);
+list(Cursor, Limit) when is_binary(Cursor) ->
+    list(select_properties(Cursor), Limit);
+list(Select, Limit) ->
+    {Records, SelectNext} = emqx_ds_shared_sub_store:select_next(Select, Limit),
+    {Records, preserve_cursor(SelectNext)}.
 
 ensure_route(Topic, QueueID) ->
     _ = emqx_persistent_session_ds_router:do_add_route(Topic, QueueID),
@@ -93,14 +98,33 @@ ensure_delete_route(Topic, QueueID) ->
 
 %%
 
-consume_select(It0) ->
-    case emqx_ds_shared_sub_store:iter_next(It0, _ChunkSize = 100) of
-        {Records, end_of_iterator} ->
-            Records;
-        {Records, It} ->
-            Records ++ consume_select(It)
+select_properties() ->
+    emqx_ds_shared_sub_store:select(properties).
+
+select_properties(Cursor) ->
+    try
+        emqx_ds_shared_sub_store:select(properties, decode_cursor(Cursor))
+    catch
+        error:_ ->
+            throw("Invalid cursor")
+    end.
+
+preserve_cursor(end_of_iterator) ->
+    undefined;
+preserve_cursor(Select) ->
+    case emqx_ds_shared_sub_store:select_next(Select, 1) of
+        {[], end_of_iterator} ->
+            undefined;
+        {[_], _} ->
+            encode_cursor(emqx_ds_shared_sub_store:select_preserve(Select))
     end.
 
+encode_cursor(Cursor) ->
+    emqx_base62:encode(term_to_binary(Cursor)).
+
+decode_cursor(Cursor) ->
+    binary_to_term(emqx_base62:decode(Cursor), [safe]).
+
 %%
 
 id(Queue) ->

+ 57 - 24
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl

@@ -51,7 +51,9 @@
 
 -export([
     select/1,
-    iter_next/2
+    select/2,
+    select_next/2,
+    select_preserve/1
 ]).
 
 %% Messing with IDs
@@ -301,6 +303,7 @@ mk_leader_topic(ID) ->
 
 -type properties() :: #{
     %% TODO: Efficient encoding.
+    group => emqx_types:group(),
     topic => emqx_types:topic(),
     start_time => emqx_message:timestamp(),
     created_at => emqx_message:timestamp()
@@ -311,9 +314,6 @@ mk_leader_topic(ID) ->
     rank => emqx_ds:stream_rank()
 }.
 
--type iter_select() ::
-    {emqx_ds:topic_filter(), emqx_ds:iterator(), [emqx_ds:stream()]} | nil().
-
 -spec init(id()) -> t().
 init(ID) ->
     %% NOTE: Empty store is impicitly dirty because rootset needs to be persisted.
@@ -707,20 +707,36 @@ mk_store_payload(?STORE_SK(_SpaceName, ID), Value) ->
 mk_store_payload(_VarName, Value) ->
     Value.
 
--spec select(var_name()) -> iter_select().
+%%
+
+-record(select, {tf, start, it, streams}).
+
+-type select() :: #select{}.
+
+-spec select(var_name()) -> select().
 select(VarName) ->
-    TopicFilter = mk_store_varname_wildcard(VarName),
-    Streams = [S || {_Rank, S} <- emqx_ds:get_streams(?DS_DB, TopicFilter, _StartTime = 0)],
-    iter_jump(TopicFilter, Streams).
+    select_jump(select_new(VarName)).
 
-iter_jump(TopicFilter, [Stream | Rest]) ->
-    {TopicFilter, ds_make_iterator(Stream, TopicFilter, _StartTime = 0), Rest};
-iter_jump(_TopicFilter, []) ->
-    [].
+-spec select(var_name(), _Cursor) -> select().
+select(VarName, Cursor) ->
+    select_restore(Cursor, select_new(VarName)).
 
--spec iter_next(iter_select(), _N) -> {[{id(), _Var}], iter_select() | end_of_iterator}.
-iter_next(ItSelect, N) ->
-    iter_fold(
+select_new(VarName) ->
+    TopicFilter = mk_store_varname_wildcard(VarName),
+    StartTime = 0,
+    RankedStreams = emqx_ds:get_streams(?DS_DB, TopicFilter, StartTime),
+    Streams = [S || {_Rank, S} <- lists:sort(RankedStreams)],
+    #select{tf = TopicFilter, start = StartTime, streams = Streams}.
+
+select_jump(It = #select{tf = TopicFilter, start = StartTime, streams = [Stream | Rest]}) ->
+    DSIt = ds_make_iterator(Stream, TopicFilter, StartTime),
+    It#select{it = DSIt, streams = Rest};
+select_jump(It = #select{streams = []}) ->
+    It#select{it = undefined}.
+
+-spec select_next(select(), _N) -> {[{id(), _Var}], select() | end_of_iterator}.
+select_next(ItSelect, N) ->
+    select_fold(
         ItSelect,
         N,
         fun(Message, Acc) ->
@@ -729,21 +745,38 @@ iter_next(ItSelect, N) ->
         []
     ).
 
-iter_fold({TopicFilter, It0, Streams}, N, Fun, Acc0) ->
-    case emqx_ds:next(?DS_DB, It0, N) of
-        {ok, It, Messages} ->
+select_fold(#select{it = undefined}, _, _Fun, Acc) ->
+    {Acc, end_of_iterator};
+select_fold(It = #select{it = DSIt0}, N, Fun, Acc0) ->
+    case emqx_ds:next(?DS_DB, DSIt0, N) of
+        {ok, DSIt, Messages} ->
             Acc = lists:foldl(fun({_Key, Msg}, Acc) -> Fun(Msg, Acc) end, Acc0, Messages),
             case length(Messages) of
                 N ->
-                    {Acc, {TopicFilter, It, Streams}};
+                    {Acc, It#select{it = DSIt}};
                 NLess when NLess < N ->
-                    iter_fold(iter_jump(TopicFilter, Streams), N - NLess, Fun, Acc)
+                    select_fold(select_jump(It), N - NLess, Fun, Acc)
             end;
         {ok, end_of_stream} ->
-            iter_fold(iter_jump(TopicFilter, Streams), N, Fun, Acc0)
-    end;
-iter_fold([], _N, _Fun, Acc) ->
-    {Acc, end_of_iterator}.
+            select_fold(select_jump(It), N, Fun, Acc0)
+    end.
+
+-spec select_preserve(select()) -> _Cursor.
+select_preserve(#select{it = It, streams = Streams}) ->
+    case Streams of
+        [StreamNext | _Rest] ->
+            %% Preserve only the subsequent stream.
+            [It | StreamNext];
+        [] ->
+            %% Iterating over last stream, preserve only iterator.
+            [It]
+    end.
+
+select_restore([It], Select) ->
+    Select#select{it = It, streams = []};
+select_restore([It | StreamNext], Select = #select{streams = Streams}) ->
+    StreamsRest = lists:dropwhile(fun(S) -> S =/= StreamNext end, Streams),
+    Select#select{it = It, streams = StreamsRest}.
 
 mk_store_root_topic(ID) ->
     emqx_topic:join([?STORE_TOPIC_PREFIX, ID]).

+ 112 - 20
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -10,14 +10,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--import(
-    emqx_mgmt_api_test_util,
-    [
-        request_api/2,
-        request/3,
-        uri/1
-    ]
-).
+-import(emqx_mgmt_api_test_util, [uri/1]).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -33,10 +26,12 @@ init_per_suite(Config) ->
                     },
                     <<"durable_storage">> => #{
                         <<"messages">> => #{
-                            <<"backend">> => <<"builtin_raft">>
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"n_shards">> => 4
                         },
                         <<"queues">> => #{
-                            <<"backend">> => <<"builtin_raft">>
+                            <<"backend">> => <<"builtin_raft">>,
+                            <<"n_shards">> => 4
                         }
                     }
                 }
@@ -67,6 +62,7 @@ init_per_testcase(_TC, Config) ->
 end_per_testcase(_TC, _Config) ->
     ok = snabbkaffe:stop(),
     ok = emqx_ds_shared_sub_registry:purge(),
+    ok = destroy_queues(),
     ok.
 %%--------------------------------------------------------------------
 %% Tests
@@ -74,7 +70,7 @@ end_per_testcase(_TC, _Config) ->
 
 t_basic_crud(_Config) ->
     ?assertMatch(
-        {ok, []},
+        {ok, #{<<"data">> := []}},
         api_get(["durable_queues"])
     ),
 
@@ -126,7 +122,7 @@ t_basic_crud(_Config) ->
 
     {ok, 201, #{<<"id">> := QueueID2}} = Resp2,
     ?assertMatch(
-        {ok, [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}]},
+        {ok, #{<<"data">> := [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}]}},
         api_get(["durable_queues"])
     ),
 
@@ -140,10 +136,90 @@ t_basic_crud(_Config) ->
     ),
 
     ?assertMatch(
-        {ok, [#{<<"id">> := QueueID2}]},
+        {ok, #{<<"data">> := [#{<<"id">> := QueueID2}]}},
         api_get(["durable_queues"])
     ).
 
+t_list_queues(_Config) ->
+    {ok, 201, #{<<"id">> := QID1}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq1">>,
+        <<"topic">> => <<"#">>,
+        <<"start_time">> => 42
+    }),
+    {ok, 201, #{<<"id">> := QID2}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq2">>,
+        <<"topic">> => <<"specific/topic">>,
+        <<"start_time">> => 0
+    }),
+    {ok, 201, #{<<"id">> := QID3}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq3">>,
+        <<"topic">> => <<"1/2/3/+">>,
+        <<"start_time">> => emqx_message:timestamp_now()
+    }),
+    {ok, 201, #{<<"id">> := QID4}} = api(post, ["durable_queues"], #{
+        <<"group">> => <<"glq4">>,
+        <<"topic">> => <<"4/5/6/#">>,
+        <<"start_time">> => emqx_message:timestamp_now()
+    }),
+
+    {ok, Resp} = api_get(["durable_queues"]),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}, #{}, #{}, #{}],
+            <<"meta">> := #{<<"hasnext">> := false}
+        },
+        Resp
+    ),
+
+    {ok, Resp1} = api_get(["durable_queues"], #{limit => <<"1">>}),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}],
+            <<"meta">> := #{<<"hasnext">> := true, <<"cursor">> := _}
+        },
+        Resp1
+    ),
+
+    {ok, Resp2} = api_get(["durable_queues"], #{
+        limit => <<"1">>,
+        cursor => emqx_utils_maps:deep_get([<<"meta">>, <<"cursor">>], Resp1)
+    }),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}],
+            <<"meta">> := #{<<"hasnext">> := true, <<"cursor">> := _}
+        },
+        Resp2
+    ),
+
+    {ok, Resp3} = api_get(["durable_queues"], #{
+        limit => <<"2">>,
+        cursor => emqx_utils_maps:deep_get([<<"meta">>, <<"cursor">>], Resp2)
+    }),
+    ?assertMatch(
+        #{
+            <<"data">> := [#{}, #{}],
+            <<"meta">> := #{<<"hasnext">> := false}
+        },
+        Resp3
+    ),
+
+    Data = maps:get(<<"data">>, Resp),
+    ?assertEqual(
+        [QID1, QID2, QID3, QID4],
+        lists:sort([ID || #{<<"id">> := ID} <- Data]),
+        Resp
+    ),
+
+    Data1 = maps:get(<<"data">>, Resp1),
+    Data2 = maps:get(<<"data">>, Resp2),
+    Data3 = maps:get(<<"data">>, Resp3),
+    ?assertEqual(
+        [QID1, QID2, QID3, QID4],
+        lists:sort([ID || D <- [Data1, Data2, Data3], #{<<"id">> := ID} <- D]),
+        [Resp1, Resp2, Resp3]
+    ).
+
 t_duplicate_queue(_Config) ->
     ?assertMatch(
         {ok, 201, #{
@@ -168,20 +244,36 @@ t_duplicate_queue(_Config) ->
         })
     ).
 
+%%--------------------------------------------------------------------
+
+destroy_queues() ->
+    case api_get(["durable_queues"], #{limit => <<"100">>}) of
+        {ok, #{<<"data">> := Queues}} ->
+            lists:foreach(fun destroy_queue/1, Queues);
+        Error ->
+            Error
+    end.
+
+destroy_queue(#{<<"id">> := QueueID}) ->
+    {ok, 200, _Deleted} = api(delete, ["durable_queues", QueueID], #{}).
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
 
 api_get(Path) ->
-    case request_api(get, uri(Path)) of
-        {ok, ResponseBody} ->
-            {ok, jiffy:decode(list_to_binary(ResponseBody), [return_maps])};
-        {error, _} = Error ->
-            Error
-    end.
+    api_response(emqx_mgmt_api_test_util:request_api(get, uri(Path))).
+
+api_get(Path, Query) ->
+    api_response(emqx_mgmt_api_test_util:request_api(get, uri(Path), Query, [])).
+
+api_response({ok, ResponseBody}) ->
+    {ok, jiffy:decode(iolist_to_binary(ResponseBody), [return_maps])};
+api_response({error, _} = Error) ->
+    Error.
 
 api(Method, Path, Data) ->
-    case request(Method, uri(Path), Data) of
+    case emqx_mgmt_api_test_util:request(Method, uri(Path), Data) of
         {ok, Code, ResponseBody} ->
             Res =
                 case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of

+ 9 - 2
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -108,8 +108,8 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, [], Opts) when
 ->
     NewUrl =
         case QueryParams of
-            "" -> Url;
-            _ -> Url ++ "?" ++ QueryParams
+            [] -> Url;
+            _ -> Url ++ "?" ++ build_query_string(QueryParams)
         end,
     do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, Opts);
 request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
@@ -165,6 +165,13 @@ simplify_result(Res) ->
 auth_header_() ->
     emqx_common_test_http:default_auth_header().
 
+build_query_string(Query = #{}) ->
+    build_query_string(maps:to_list(Query));
+build_query_string(Query = [{_, _} | _]) ->
+    uri_string:compose_query([{emqx_utils_conv:bin(K), V} || {K, V} <- Query]);
+build_query_string(QueryString) ->
+    unicode:characters_to_list(QueryString).
+
 build_http_header(X) when is_list(X) ->
     X;
 build_http_header(X) ->