Преглед изворни кода

feat(dssubs): partially implement Management API handlers

Andrew Mayorov пре 1 година
родитељ
комит
9b740818e7

+ 68 - 55
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -10,6 +10,24 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-define(RESP_NOT_FOUND,
+    {404, #{
+        code => <<"NOT_FOUND">>,
+        message => <<"Queue not found">>
+    }}
+).
+
+-define(RESP_CREATE_CONFLICT,
+    {409, #{
+        code => <<"CONFLICT">>,
+        message => <<"Queue with given group name and topic filter already exists">>
+    }}
+).
+
+-define(RESP_INTERNAL_ERROR(MSG),
+    {500, #{code => <<"INTERNAL_ERROR">>, message => MSG}}
+).
+
 %% Swagger specs from hocon schema
 -export([
     api_spec/0,
@@ -60,6 +78,18 @@ schema("/durable_queues") ->
                     durable_queues_get_example()
                 )
             }
+        },
+        post => #{
+            tags => ?TAGS,
+            summary => <<"Declare a durable queue">>,
+            description => ?DESC("durable_queues_post"),
+            'requestBody' => durable_queue_post(),
+            responses => #{
+                201 => emqx_dashboard_swagger:schema_with_example(
+                    durable_queue_get(),
+                    durable_queue_get_example()
+                )
+            }
         }
     };
 schema("/durable_queues/:id") ->
@@ -87,69 +117,62 @@ schema("/durable_queues/:id") ->
                 200 => <<"Queue deleted">>,
                 404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
             }
-        },
-        put => #{
-            tags => ?TAGS,
-            summary => <<"Declare a durable queue">>,
-            description => ?DESC("durable_queues_put"),
-            parameters => [param_queue_id()],
-            'requestBody' => durable_queue_put(),
-            responses => #{
-                200 => emqx_dashboard_swagger:schema_with_example(
-                    durable_queue_get(),
-                    durable_queue_get_example()
-                )
-            }
         }
     }.
 
 '/durable_queues'(get, _Params) ->
-    {200, queue_list()}.
+    {200, queue_list()};
+'/durable_queues'(post, #{body := Params}) ->
+    case queue_declare(Params) of
+        {ok, Queue} ->
+            {201, encode_queue(Queue)};
+        exists ->
+            ?RESP_CREATE_CONFLICT;
+        {error, _Class, Reason} ->
+            ?RESP_INTERNAL_ERROR(emqx_utils:readable_error_msg(Reason))
+    end.
 
 '/durable_queues/:id'(get, Params) ->
     case queue_get(Params) of
-        {ok, Queue} -> {200, Queue};
-        not_found -> serialize_error(not_found)
+        Queue when Queue =/= false ->
+            {200, encode_queue(Queue)};
+        false ->
+            ?RESP_NOT_FOUND
     end;
 '/durable_queues/:id'(delete, Params) ->
     case queue_delete(Params) of
-        ok -> {200, <<"Queue deleted">>};
-        not_found -> serialize_error(not_found)
-    end;
-'/durable_queues/:id'(put, Params) ->
-    {200, queue_put(Params)}.
+        ok ->
+            {200, <<"Queue deleted">>};
+        not_found ->
+            ?RESP_NOT_FOUND
+    end.
 
 %%--------------------------------------------------------------------
 %% Actual handlers: stubs
 %%--------------------------------------------------------------------
 
 queue_list() ->
-    persistent_term:get({?MODULE, queues}, []).
+    %% TODO
+    [].
 
-queue_get(#{bindings := #{id := ReqId}}) ->
-    case [Q || #{id := Id} = Q <- queue_list(), Id =:= ReqId] of
-        [Queue] -> {ok, Queue};
-        [] -> not_found
-    end.
+queue_get(#{bindings := #{id := ID}}) ->
+    emqx_ds_shared_sub_queue:lookup(ID).
 
-queue_delete(#{bindings := #{id := ReqId}}) ->
-    Queues0 = queue_list(),
-    Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId],
-    persistent_term:put({?MODULE, queues}, Queues1),
-    case Queues0 =:= Queues1 of
-        true -> not_found;
-        false -> ok
-    end.
+queue_delete(#{bindings := #{id := ID}}) ->
+    emqx_ds_shared_sub_queue:destroy(ID).
 
-queue_put(#{bindings := #{id := ReqId}}) ->
-    Queues0 = queue_list(),
-    Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId],
-    NewQueue = #{
-        id => ReqId
-    },
-    Queues2 = [NewQueue | Queues1],
-    persistent_term:put({?MODULE, queues}, Queues2),
-    NewQueue.
+queue_declare(#{<<"group">> := Group, <<"topic">> := TopicFilter} = Params) ->
+    CreatedAt = emqx_message:timestamp_now(),
+    StartTime = maps:get(<<"start_time">>, Params, CreatedAt),
+    emqx_ds_shared_sub_queue:declare(Group, TopicFilter, CreatedAt, StartTime).
+
+%%--------------------------------------------------------------------
+
+encode_queue(Queue) ->
+    maps:merge(
+        #{id => emqx_ds_shared_sub_queue:id(Queue)},
+        emqx_ds_shared_sub_queue:properties(Queue)
+    ).
 
 %%--------------------------------------------------------------------
 %% Schemas
@@ -178,7 +201,7 @@ durable_queues_get() ->
 durable_queue_get() ->
     ref(durable_queue_get).
 
-durable_queue_put() ->
+durable_queue_post() ->
     map().
 
 roots() -> [].
@@ -206,13 +229,3 @@ durable_queues_get_example() ->
             id => <<"queue2">>
         }
     ].
-
-%%--------------------------------------------------------------------
-%% Error codes
-%%--------------------------------------------------------------------
-
-serialize_error(not_found) ->
-    {404, #{
-        code => <<"NOT_FOUND">>,
-        message => <<"Queue Not Found">>
-    }}.

+ 15 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl

@@ -11,9 +11,15 @@
     lookup/2,
     exists/2,
     declare/4,
+    destroy/1,
     destroy/2
 ]).
 
+-export([
+    id/1,
+    properties/1
+]).
+
 %%
 
 lookup(Group, Topic) ->
@@ -48,7 +54,15 @@ destroy(ID) ->
     %% FIXME: Sync on leader.
     case lookup(ID) of
         false ->
-            notfound;
+            not_found;
         Queue ->
             emqx_ds_shared_sub_store:destroy(Queue)
     end.
+
+%%
+
+id(Queue) ->
+    emqx_ds_shared_sub_store:id(Queue).
+
+properties(Queue) ->
+    emqx_ds_shared_sub_store:get(properties, Queue).

+ 68 - 21
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -78,45 +78,97 @@ t_basic_crud(_Config) ->
         api_get(["durable_queues"])
     ),
 
+    Resp1 = api(post, ["durable_queues"], #{
+        <<"group">> => <<"g1">>,
+        <<"topic">> => <<"#">>,
+        <<"start_time">> => 42
+    }),
     ?assertMatch(
-        {ok, 200, #{
-            <<"id">> := <<"q1">>
+        {ok, 201, #{
+            <<"id">> := _QueueID,
+            <<"created_at">> := _,
+            <<"group">> := <<"g1">>,
+            <<"topic">> := <<"#">>,
+            <<"start_time">> := 42
         }},
-        api(put, ["durable_queues", "q1"], #{})
+        Resp1
     ),
 
     ?assertMatch(
         {error, {_, 404, _}},
-        api_get(["durable_queues", "q2"])
+        api_get(["durable_queues", "non-existent-queue"])
     ),
 
+    {ok, 201, #{<<"id">> := QueueID1}} = Resp1,
     ?assertMatch(
-        {ok, 200, #{
-            <<"id">> := <<"q2">>
+        {ok, #{
+            <<"id">> := QueueID1,
+            <<"group">> := <<"g1">>,
+            <<"topic">> := <<"#">>
         }},
-        api(put, ["durable_queues", "q2"], #{})
+        api_get(["durable_queues", QueueID1])
     ),
 
+    Resp2 = api(post, ["durable_queues"], #{
+        <<"group">> => <<"g1">>,
+        <<"topic">> => <<"another/topic/filter/+">>,
+        <<"start_time">> => 0
+    }),
     ?assertMatch(
-        {ok, #{
-            <<"id">> := <<"q2">>
+        {ok, 201, #{
+            <<"id">> := _QueueID,
+            <<"group">> := <<"g1">>,
+            <<"topic">> := <<"another/topic/filter/+">>,
+            <<"start_time">> := 0
         }},
-        api_get(["durable_queues", "q2"])
+        Resp2
     ),
 
+    %% TODO
+    %% ?assertMatch(
+    %%     {ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]},
+    %%     api_get(["durable_queues"])
+    %% ),
+
     ?assertMatch(
-        {ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]},
-        api_get(["durable_queues"])
+        {ok, 200, <<"Queue deleted">>},
+        api(delete, ["durable_queues", QueueID1], #{})
+    ),
+    ?assertMatch(
+        {ok, 404, #{<<"code">> := <<"NOT_FOUND">>}},
+        api(delete, ["durable_queues", QueueID1], #{})
     ),
 
+    %% TODO
+    %% ?assertMatch(
+    %%     {ok, [#{<<"id">> := <<"q1">>}]},
+    %%     api_get(["durable_queues"])
+    %% ).
+
+    ok.
+
+t_duplicate_queue(_Config) ->
     ?assertMatch(
-        {ok, 200, <<"Queue deleted">>},
-        api(delete, ["durable_queues", "q2"], #{})
+        {ok, 201, #{
+            <<"id">> := _QueueID,
+            <<"group">> := <<"g1">>,
+            <<"topic">> := <<"#">>,
+            <<"start_time">> := 42
+        }},
+        api(post, ["durable_queues"], #{
+            <<"group">> => <<"g1">>,
+            <<"topic">> => <<"#">>,
+            <<"start_time">> => 42
+        })
     ),
 
     ?assertMatch(
-        {ok, [#{<<"id">> := <<"q1">>}]},
-        api_get(["durable_queues"])
+        {ok, 409, #{<<"code">> := <<"CONFLICT">>}},
+        api(post, ["durable_queues"], #{
+            <<"group">> => <<"g1">>,
+            <<"topic">> => <<"#">>,
+            <<"start_time">> => 0
+        })
     ).
 
 %%--------------------------------------------------------------------
@@ -143,8 +195,3 @@ api(Method, Path, Data) ->
         {error, _} = Error ->
             Error
     end.
-
-terminate_leaders() ->
-    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
-    {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
-    ok.

+ 4 - 4
rel/i18n/emqx_ds_shared_sub_api.hocon

@@ -24,11 +24,11 @@ durable_queue_delete.desc:
 durable_queue_delete.label:
 """Delete Durable Queue"""
 
-durable_queues_put.desc:
-"""Create a durable queue."""
+durable_queues_post.desc:
+"""Declare new durable queue."""
 
-durable_queues_put.label:
-"""Create Durable Queue"""
+durable_queues_post.label:
+"""Declare Durable Queue"""
 
 
 }