Przeglądaj źródła

feat(queue): add stub for CRUD API

Ilya Averyanov 1 rok temu
rodzic
commit
303ff95e10

+ 218 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -0,0 +1,218 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_api).
+
+-behaviour(minirest_api).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% Swagger specs from hocon schema
+-export([
+    api_spec/0,
+    paths/0,
+    schema/1,
+    namespace/0
+]).
+
+-export([
+    fields/1,
+    roots/0
+]).
+
+-define(TAGS, [<<"Durable Queues">>]).
+
+%% API callbacks
+-export([
+    '/durable_queues'/2,
+    '/durable_queues/:id'/2
+]).
+
+-import(hoconsc, [mk/2, ref/1, ref/2]).
+-import(emqx_dashboard_swagger, [error_codes/2]).
+
+namespace() -> "durable_queues".
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/durable_queues",
+        "/durable_queues/:id"
+    ].
+
+-define(NOT_FOUND, 'NOT_FOUND').
+
+schema("/durable_queues") ->
+    #{
+        'operationId' => '/durable_queues',
+        get => #{
+            tags => ?TAGS,
+            summary => <<"List declared durable queues">>,
+            description => ?DESC("durable_queues_get"),
+            responses => #{
+                200 => emqx_dashboard_swagger:schema_with_example(
+                    durable_queues_get(),
+                    durable_queues_get_example()
+                )
+            }
+        }
+    };
+schema("/durable_queues/:id") ->
+    #{
+        'operationId' => '/durable_queues/:id',
+        get => #{
+            tags => ?TAGS,
+            summary => <<"Get a declared durable queue">>,
+            description => ?DESC("durable_queue_get"),
+            parameters => [param_queue_id()],
+            responses => #{
+                200 => emqx_dashboard_swagger:schema_with_example(
+                    durable_queue_get(),
+                    durable_queue_get_example()
+                ),
+                404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
+            }
+        },
+        delete => #{
+            tags => ?TAGS,
+            summary => <<"Delete a declared durable queue">>,
+            description => ?DESC("durable_queue_delete"),
+            parameters => [param_queue_id()],
+            responses => #{
+                200 => <<"Queue deleted">>,
+                404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
+            }
+        },
+        put => #{
+            tags => ?TAGS,
+            summary => <<"Declare a durable queue">>,
+            description => ?DESC("durable_queues_put"),
+            parameters => [param_queue_id()],
+            'requestBody' => durable_queue_put(),
+            responses => #{
+                200 => emqx_dashboard_swagger:schema_with_example(
+                    durable_queue_get(),
+                    durable_queue_get_example()
+                )
+            }
+        }
+    }.
+
+'/durable_queues'(get, _Params) ->
+    {200, queue_list()}.
+
+'/durable_queues/:id'(get, Params) ->
+    case queue_get(Params) of
+        {ok, Queue} -> {200, Queue};
+        not_found -> serialize_error(not_found)
+    end;
+'/durable_queues/:id'(delete, Params) ->
+    case queue_delete(Params) of
+        ok -> {200, <<"Queue deleted">>};
+        not_found -> serialize_error(not_found)
+    end;
+'/durable_queues/:id'(put, Params) ->
+    {200, queue_put(Params)}.
+
+%%--------------------------------------------------------------------
+%% Actual handlers: stubs
+%%--------------------------------------------------------------------
+
+queue_list() ->
+    persistent_term:get({?MODULE, queues}, []).
+
+queue_get(#{bindings := #{id := ReqId}}) ->
+    case [Q || #{id := Id} = Q <- queue_list(), Id =:= ReqId] of
+        [Queue] -> {ok, Queue};
+        [] -> not_found
+    end.
+
+queue_delete(#{bindings := #{id := ReqId}}) ->
+    Queues0 = queue_list(),
+    Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId],
+    persistent_term:put({?MODULE, queues}, Queues1),
+    case Queues0 =:= Queues1 of
+        true -> not_found;
+        false -> ok
+    end.
+
+queue_put(#{bindings := #{id := ReqId}}) ->
+    Queues0 = queue_list(),
+    Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId],
+    NewQueue = #{
+        id => ReqId
+    },
+    Queues2 = [NewQueue | Queues1],
+    persistent_term:put({?MODULE, queues}, Queues2),
+    NewQueue.
+
+%%--------------------------------------------------------------------
+%% Schemas
+%%--------------------------------------------------------------------
+
+param_queue_id() ->
+    {
+        id,
+        mk(binary(), #{
+            in => path,
+            desc => ?DESC(param_queue_id),
+            required => true,
+            validator => fun validate_queue_id/1
+        })
+    }.
+
+validate_queue_id(Id) ->
+    case emqx_topic:words(Id) of
+        [Segment] when is_binary(Segment) -> true;
+        _ -> {error, <<"Invalid queue id">>}
+    end.
+
+durable_queues_get() ->
+    hoconsc:array(ref(durable_queue_get)).
+
+durable_queue_get() ->
+    ref(durable_queue_get).
+
+durable_queue_put() ->
+    map().
+
+roots() -> [].
+
+fields(durable_queue_get) ->
+    [
+        {id, mk(binary(), #{})}
+    ].
+
+%%--------------------------------------------------------------------
+%% Examples
+%%--------------------------------------------------------------------
+
+durable_queue_get_example() ->
+    #{
+        id => <<"queue1">>
+    }.
+
+durable_queues_get_example() ->
+    [
+        #{
+            id => <<"queue1">>
+        },
+        #{
+            id => <<"queue2">>
+        }
+    ].
+
+%%--------------------------------------------------------------------
+%% Error codes
+%%--------------------------------------------------------------------
+
+serialize_error(not_found) ->
+    {404, #{
+        code => <<"NOT_FOUND">>,
+        message => <<"Queue Not Found">>
+    }}.

+ 140 - 0
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -0,0 +1,140 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(
+    emqx_mgmt_api_test_util,
+    [
+        request_api/2,
+        request/3,
+        uri/1
+    ]
+).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                config => #{
+                    <<"durable_sessions">> => #{
+                        <<"enable">> => true,
+                        <<"renew_streams_interval">> => "100ms"
+                    },
+                    <<"durable_storage">> => #{
+                        <<"messages">> => #{
+                            <<"backend">> => <<"builtin_raft">>
+                        }
+                    }
+                }
+            }},
+            emqx_ds_shared_sub,
+            emqx_management,
+            emqx_mgmt_api_test_util:emqx_dashboard()
+        ],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.
+
+init_per_testcase(_TC, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_TC, _Config) ->
+    ok = snabbkaffe:stop(),
+    ok = terminate_leaders(),
+    ok.
+%%--------------------------------------------------------------------
+%% Tests
+%%--------------------------------------------------------------------
+
+t_basic_crud(_Config) ->
+    ?assertMatch(
+        {ok, []},
+        api_get(["durable_queues"])
+    ),
+
+    ?assertMatch(
+        {ok, 200, #{
+            <<"id">> := <<"q1">>
+        }},
+        api(put, ["durable_queues", "q1"], #{})
+    ),
+
+    ?assertMatch(
+        {error, {_, 404, _}},
+        api_get(["durable_queues", "q2"])
+    ),
+
+    ?assertMatch(
+        {ok, 200, #{
+            <<"id">> := <<"q2">>
+        }},
+        api(put, ["durable_queues", "q2"], #{})
+    ),
+
+    ?assertMatch(
+        {ok, #{
+            <<"id">> := <<"q2">>
+        }},
+        api_get(["durable_queues", "q2"])
+    ),
+
+    ?assertMatch(
+        {ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]},
+        api_get(["durable_queues"])
+    ),
+
+    ?assertMatch(
+        {ok, 200, <<"Queue deleted">>},
+        api(delete, ["durable_queues", "q2"], #{})
+    ),
+
+    ?assertMatch(
+        {ok, [#{<<"id">> := <<"q1">>}]},
+        api_get(["durable_queues"])
+    ).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+api_get(Path) ->
+    case request_api(get, uri(Path)) of
+        {ok, ResponseBody} ->
+            {ok, jiffy:decode(list_to_binary(ResponseBody), [return_maps])};
+        {error, _} = Error ->
+            Error
+    end.
+
+api(Method, Path, Data) ->
+    case request(Method, uri(Path), Data) of
+        {ok, Code, ResponseBody} ->
+            Res =
+                case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
+                    {ok, Decoded} -> Decoded;
+                    {error, _} -> ResponseBody
+                end,
+            {ok, Code, Res};
+        {error, _} = Error ->
+            Error
+    end.
+
+terminate_leaders() ->
+    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
+    {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
+    ok.

+ 34 - 0
rel/i18n/emqx_ds_shared_sub_api.hocon

@@ -0,0 +1,34 @@
+emqx_ds_shared_sub_api {
+
+param_queue_id.desc:
+"""The ID of the durable queue."""
+
+param_queue_id.label:
+"""Queue ID"""
+
+durable_queues_get.desc:
+"""Get the list of durable queues."""
+
+durable_queues_get.label:
+"""Durable Queues"""
+
+durable_queue_get.desc:
+"""Get the information of a durable queue."""
+
+durable_queue_get.label:
+"""Durable Queue"""
+
+durable_queue_delete.desc:
+"""Delete a durable queue."""
+
+durable_queue_delete.label:
+"""Delete Durable Queue"""
+
+durable_queues_put.desc:
+"""Create a durable queue."""
+
+durable_queues_put.label:
+"""Create Durable Queue"""
+
+
+}