Kaynağa Gözat

feat: support to delete delayed messages via topic name

JianBo He 2 yıl önce
ebeveyn
işleme
9ce96bafa3

+ 17 - 0
apps/emqx_modules/src/emqx_delayed.erl

@@ -58,6 +58,7 @@
     get_delayed_message/2,
     delete_delayed_message/1,
     delete_delayed_message/2,
+    delete_delayed_messages_by_topic_name/1,
     clear_all/0,
     %% rpc target
     clear_all_local/0,
@@ -95,6 +96,13 @@
 %% sync ms with record change
 -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
 -define(DELETE_MS(Id), [{{delayed_message, {'$1', Id}, '_', '_'}, [], ['$1']}]).
+-define(DELETE_BY_TOPIC_MS(Topic), [
+    {
+        {delayed_message, '$1', '_', {message, '_', '_', '_', '_', '_', Topic, '_', '_', '_'}},
+        [],
+        ['$1']
+    }
+]).
 
 -define(TAB, ?MODULE).
 -define(SERVER, ?MODULE).
@@ -267,6 +275,15 @@ delete_delayed_message(Node, Id) when Node =:= node() ->
 delete_delayed_message(Node, Id) ->
     emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
 
+-spec delete_delayed_messages_by_topic_name(binary()) -> with_id_return().
+delete_delayed_messages_by_topic_name(TopicName) when is_binary(TopicName) ->
+    case ets:select(?TAB, ?DELETE_BY_TOPIC_MS(TopicName)) of
+        [] ->
+            {error, not_found};
+        Rows ->
+            lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, Rows)
+    end.
+
 -spec clear_all() -> ok.
 clear_all() ->
     Nodes = emqx:running_nodes(),

+ 59 - 1
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -27,7 +27,8 @@
 -export([
     status/2,
     delayed_messages/2,
-    delayed_message/2
+    delayed_message/2,
+    delayed_message_topic/2
 ]).
 
 -export([
@@ -51,6 +52,9 @@
 -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
 -define(INVALID_NODE, 'INVALID_NODE').
 
+-define(INVALID_TOPIC, 'INVALID_TOPIC_NAME').
+-define(MESSAGE_NOT_FOUND, 'MESSAGE_NOT_FOUND').
+
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
 
@@ -58,6 +62,7 @@ paths() ->
     [
         "/mqtt/delayed",
         "/mqtt/delayed/messages",
+        "/mqtt/delayed/messages/:topic",
         "/mqtt/delayed/messages/:node/:msgid"
     ].
 
@@ -87,6 +92,32 @@ schema("/mqtt/delayed") ->
             }
         }
     };
+schema("/mqtt/delayed/messages/:topic") ->
+    #{
+        'operationId' => delayed_message_topic,
+        delete => #{
+            tags => ?API_TAG_MQTT,
+            description => ?DESC(delete_api),
+            parameters => [
+                {topic,
+                    mk(
+                        binary(),
+                        #{in => path, desc => ?DESC(topic)}
+                    )}
+            ],
+            responses => #{
+                204 => <<"Delete delayed message success">>,
+                400 => emqx_dashboard_swagger:error_codes(
+                    [?INVALID_TOPIC],
+                    ?DESC(bad_topic_name)
+                ),
+                404 => emqx_dashboard_swagger:error_codes(
+                    [?MESSAGE_NOT_FOUND],
+                    ?DESC(no_delayed_message)
+                )
+            }
+        }
+    };
 schema("/mqtt/delayed/messages/:node/:msgid") ->
     #{
         'operationId' => delayed_message,
@@ -223,6 +254,19 @@ delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
         end
     ).
 
+delayed_message_topic(delete, #{bindings := #{topic := Topic}}) ->
+    MaybeTopic = make_maybe(Topic, invalid_topic_name, fun validate_topic_name/1),
+    with_maybe(
+        [MaybeTopic],
+        fun(TopicName) ->
+            case emqx_delayed:delete_delayed_messages_by_topic_name(TopicName) of
+                ok ->
+                    {204};
+                {error, not_found} ->
+                    {404, generate_http_code_map(message_not_found, TopicName)}
+            end
+        end
+    ).
 %%--------------------------------------------------------------------
 %% internal function
 %%--------------------------------------------------------------------
@@ -279,6 +323,12 @@ generate_http_code_map(not_found, Id) ->
         message =>
             iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
     };
+generate_http_code_map(message_not_found, Topic) ->
+    #{
+        code => ?MESSAGE_NOT_FOUND,
+        message =>
+            iolist_to_binary(io_lib:format("Not found messages for ~s", [Topic]))
+    };
 generate_http_code_map(invalid_node, Node) ->
     #{
         code => ?INVALID_NODE,
@@ -295,6 +345,14 @@ make_maybe(X, Error, Fun) ->
             {left, X, Error}
     end.
 
+validate_topic_name(Topic) ->
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            error(badarg);
+        false ->
+            Topic
+    end.
+
 with_maybe(Maybes, Cont) ->
     with_maybe(Maybes, Cont, []).
 

+ 35 - 0
apps/emqx_modules/test/emqx_delayed_api_SUITE.erl

@@ -189,6 +189,30 @@ t_messages(_) ->
 
     ok = emqtt:disconnect(C1).
 
+t_delete_messages_via_topic(_) ->
+    clear_all_record(),
+    emqx_delayed:load(),
+
+    OriginTopic = <<"t/a">>,
+    Topic = <<"$delayed/123/", OriginTopic/binary>>,
+
+    publish_a_delayed_message(Topic),
+    publish_a_delayed_message(Topic),
+
+    %% assert: delayed messages are saved
+    ?assertMatch([_, _], get_messages(2)),
+
+    %% delete these messages via topic
+    TopicInUrl = uri_string:quote(OriginTopic),
+    {ok, 204, _} = request(
+        delete,
+        uri(["mqtt", "delayed", "messages", TopicInUrl])
+    ),
+
+    %% assert: messages are deleted
+    ?assertEqual([], get_messages(0)),
+    ok.
+
 t_large_payload(_) ->
     clear_all_record(),
     emqx_delayed:load(),
@@ -246,3 +270,14 @@ get_messages(Len) ->
         )
     ),
     Msgs.
+
+publish_a_delayed_message(Topic) ->
+    {ok, C1} = emqtt:start_link([{clean_start, true}]),
+    {ok, _} = emqtt:connect(C1),
+    emqtt:publish(
+        C1,
+        Topic,
+        <<"This is a delayed messages">>,
+        [{qos, 1}]
+    ),
+    ok = emqtt:disconnect(C1).

+ 10 - 0
rel/i18n/emqx_delayed_api.hocon

@@ -65,6 +65,16 @@ msgid_not_found.desc:
 msgid_not_found.label:
 """Message ID not found"""
 
+bad_topic_name.desc:
+"""Bad Topic Name"""
+bad_topic_name.label:
+"""Bad Topic Name"""
+
+no_delayed_message.desc:
+"""Not found delayed message for this topic"""
+no_delayed_message.label:
+"""Not found delayed message for this topic"""
+
 node.desc:
 """The node where message from"""
 node.label: