Parcourir la source

feat(retain): add two new endpoints for retain API

firest il y a 2 ans
Parent
commit
c180325e0a

+ 28 - 10
apps/emqx_retainer/src/emqx_retainer_api.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
 -export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]).
 
 -export([
-    lookup_retained_warp/2,
+    '/messages'/2,
     with_topic_warp/2,
     config/2
 ]).
@@ -69,11 +69,11 @@ schema(?PREFIX) ->
     };
 schema(?PREFIX ++ "/messages") ->
     #{
-        'operationId' => lookup_retained_warp,
+        'operationId' => '/messages',
         get => #{
             tags => ?TAGS,
             description => ?DESC(list_retained_api),
-            parameters => page_params(),
+            parameters => parameters(query, false, query_match_topic) ++ page_params(),
             responses => #{
                 200 => [
                     {data, mk(array(ref(message_summary)), #{desc => ?DESC(retained_list)})},
@@ -81,6 +81,13 @@ schema(?PREFIX ++ "/messages") ->
                 ],
                 400 => error_codes(['BAD_REQUEST'], ?DESC(unsupported_backend))
             }
+        },
+        delete => #{
+            tags => ?TAGS,
+            description => ?DESC(delete_messages),
+            responses => #{
+                204 => <<>>
+            }
         }
     };
 schema(?PREFIX ++ "/message/:topic") ->
@@ -118,12 +125,15 @@ conf_schema() ->
     ref(emqx_retainer_schema, "retainer").
 
 parameters() ->
+    parameters(path, true, topic).
+
+parameters(In, Required, Desc) ->
     [
         {topic,
             mk(binary(), #{
-                in => path,
-                required => true,
-                desc => ?DESC(topic)
+                in => In,
+                required => Required,
+                desc => ?DESC(Desc)
             })}
     ].
 
@@ -142,8 +152,10 @@ fields(message) ->
         | fields(message_summary)
     ].
 
-lookup_retained_warp(Type, Params) ->
-    check_backend(Type, Params, fun lookup_retained/2).
+'/messages'(get, Params) ->
+    check_backend(get, Params, fun lookup_retained/2);
+'/messages'(delete, Params) ->
+    delete_messages(delete, Params).
 
 with_topic_warp(Type, Params) ->
     check_backend(Type, Params, fun with_topic/2).
@@ -168,7 +180,9 @@ config(put, #{body := Body}) ->
 lookup_retained(get, #{query_string := Qs}) ->
     Page = maps:get(<<"page">>, Qs, 1),
     Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
-    {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
+    Topic = maps:get(<<"topic">>, Qs, undefined),
+    ct:print("Qs:~p~n", [Qs]),
+    {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
     {200, #{
         data => [format_message(Msg) || Msg <- Msgs],
         meta => #{page => Page, limit => Limit, count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}
@@ -199,6 +213,10 @@ with_topic(delete, #{bindings := Bindings}) ->
             {204}
     end.
 
+delete_messages(delete, _) ->
+    emqx_retainer:clean(),
+    {204}.
+
 format_message(#message{
     id = ID,
     qos = Qos,

+ 35 - 2
apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -25,7 +25,9 @@
 
 -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
--import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]).
+-import(emqx_mgmt_api_test_util, [
+    request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
+]).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -308,6 +310,37 @@ t_change_storage_type(_Config) ->
 
     ok.
 
+t_match_and_clean(_) ->
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    emqx_retainer:clean(),
+    timer:sleep(300),
+
+    _ = [
+        emqtt:publish(C1, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}])
+     || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>]
+    ],
+
+    timer:sleep(1000),
+
+    API = api_path(["mqtt", "retainer", "messages"]),
+    {ok, LookupJson} = request_api(get, API, "topic=t/%2B", auth_header_()),
+    LookupResult = decode_json(LookupJson),
+
+    Expected = lists:usort([<<"t/1">>, <<"t/2">>, <<"t/3">>]),
+    ?assertMatch(
+        Expected,
+        lists:usort([Topic || #{topic := Topic} <- maps:get(data, LookupResult)])
+    ),
+
+    CleanAPI = api_path(["mqtt", "retainer", "messages"]),
+    {ok, []} = request_api(delete, CleanAPI),
+
+    {ok, LookupJson2} = request_api(get, API),
+    ?assertMatch(#{data := []}, decode_json(LookupJson2)),
+
+    ok = emqtt:disconnect(C1).
+
 %%--------------------------------------------------------------------
 %% HTTP Request
 %%--------------------------------------------------------------------

+ 9 - 1
rel/i18n/emqx_retainer_api.hocon

@@ -27,8 +27,13 @@ list_retained_api.desc:
 list_retained_api.label:
 """List retained messages."""
 
+delete_messages.desc:
+"""Delete all retained messages"""
+delete_messages.label:
+"""Delete Retained Messages"""
+
 lookup_api.desc:
-"""Lookup a message by a topic without wildcards."""
+"""Lookup a message by a topic without wildcards, ."""
 lookup_api.label:
 """Lookup a message"""
 
@@ -56,6 +61,9 @@ retained_list.desc:
 topic.desc:
 """Topic."""
 
+query_match_topic.desc:
+"""Topic filter, supports wildcards, omit this to match all messages."""
+
 unsupported_backend.desc:
 """Unsupported backend."""