Kaynağa Gözat

fix(delayed): make it possible to lookup/delete other node's delayed message

firest 4 yıl önce
ebeveyn
işleme
3fc4236565

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -20,3 +20,4 @@
 {emqx_statsd,1}.
 {emqx_telemetry,1}.
 {emqx_topic_metrics,1}.
+{emqx_delayed,1}.

+ 29 - 14
apps/emqx_modules/src/emqx_delayed.erl

@@ -46,7 +46,9 @@
         , update_config/1
         , list/1
         , get_delayed_message/1
+        , get_delayed_message/2
         , delete_delayed_message/1
+        , delete_delayed_message/2
         , post_config_update/5
         , cluster_list/1
         , cluster_query/4
@@ -56,6 +58,10 @@
 
 -record(delayed_message, {key, delayed, msg}).
 -type delayed_message() :: #delayed_message{}.
+-type with_id_return() :: ok | {error, not_found}.
+-type with_id_return(T) :: {ok, T} | {error, not_found}.
+
+-export_type([with_id_return/0, with_id_return/1]).
 
 %% sync ms with record change
 -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
@@ -155,6 +161,7 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed,
     RemainingTime = ExpectTimeStamp - erlang:system_time(second),
     Result = #{
         msgid => emqx_guid:to_hexstr(Id),
+        node => node(),
         publish_at => PublishTime,
         delayed_interval => Delayed,
         delayed_remaining => RemainingTime,
@@ -174,22 +181,24 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed,
 to_rfc3339(Timestamp) ->
     list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
 
-get_delayed_message(Id0) ->
-    try emqx_guid:from_hexstr(Id0) of
-        Id ->
-            case ets:select(?TAB, ?QUERY_MS(Id)) of
-                [] ->
-                    {error, not_found};
-                Rows ->
-                    Message = hd(Rows),
-                    {ok, format_delayed(Message, true)}
-            end
-    catch
-        error:function_clause -> {error, id_schema_error}
+-spec get_delayed_message(binary()) -> with_id_return(map()).
+get_delayed_message(Id) ->
+    case ets:select(?TAB, ?QUERY_MS(Id)) of
+        [] ->
+            {error, not_found};
+        Rows ->
+            Message = hd(Rows),
+            {ok, format_delayed(Message, true)}
     end.
 
-delete_delayed_message(Id0) ->
-    Id = emqx_guid:from_hexstr(Id0),
+get_delayed_message(Node, Id) when Node =:= node() ->
+    get_delayed_message(Id);
+
+get_delayed_message(Node, Id) ->
+    emqx_delayed_proto_v1:get_delayed_message(Node, Id).
+
+-spec delete_delayed_message(binary()) -> with_id_return().
+delete_delayed_message(Id) ->
     case ets:select(?TAB, ?DELETE_MS(Id)) of
         [] ->
             {error, not_found};
@@ -197,6 +206,12 @@ delete_delayed_message(Id0) ->
             Timestamp = hd(Rows),
             mria:dirty_delete(?TAB, {Timestamp, Id})
     end.
+
+delete_delayed_message(Node, Id) when Node =:= node() ->
+    delete_delayed_message(Id);
+delete_delayed_message(Node, Id) ->
+    emqx_delayed_proto_v1:delete_delayed_message(Node, Id).
+
 update_config(Config) ->
     emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
 

+ 77 - 42
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -40,14 +40,12 @@
 
 -export([api_spec/0]).
 
--define(ALREADY_ENABLED, 'ALREADY_ENABLED').
--define(ALREADY_DISABLED, 'ALREADY_DISABLED').
-
 -define(INTERNAL_ERROR, 'INTERNAL_ERROR').
 -define(BAD_REQUEST, 'BAD_REQUEST').
 
 -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
 -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
+-define(INVALID_NODE, 'INVALID_NODE').
 -define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
 
 api_spec() ->
@@ -56,7 +54,7 @@ api_spec() ->
 paths() ->
     [ "/mqtt/delayed"
     , "/mqtt/delayed/messages"
-    , "/mqtt/delayed/messages/:msgid"
+    , "/mqtt/delayed/messages/:node/:msgid"
     ].
 
 schema("/mqtt/delayed") ->
@@ -83,12 +81,16 @@ schema("/mqtt/delayed") ->
         }
     };
 
-schema("/mqtt/delayed/messages/:msgid") ->
+schema("/mqtt/delayed/messages/:node/:msgid") ->
     #{'operationId' => delayed_message,
         get => #{
             tags => ?API_TAG_MQTT,
             description => <<"Get delayed message">>,
-            parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
+            parameters => [ {node,
+                               mk(binary(),
+                                    #{in => path, desc => <<"The node where message from">>})}
+                          , {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})}
+                          ],
             responses => #{
                 200 => ref("message_without_payload"),
                 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
@@ -100,7 +102,12 @@ schema("/mqtt/delayed/messages/:msgid") ->
         delete => #{
             tags => ?API_TAG_MQTT,
             description => <<"Delete delayed message">>,
-            parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
+            parameters => [ {node,
+                               mk(binary(),
+                                    #{in => path, desc => <<"The node where message from">>})}
+                          , {msgid,
+                               mk(binary(), #{in => path, desc => <<"Delay message ID">>})}
+                          ],
             responses => #{
                 204 => <<"Delete delayed message success">>,
                 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
@@ -133,15 +140,16 @@ schema("/mqtt/delayed/messages") ->
 
 fields("message_without_payload") ->
     [
-        {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
-        {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
-        {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
-        {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
-        {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
-        {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
-        {qos, mk(binary(), #{desc => <<"QoS">>})},
-        {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
-        {from_username, mk(binary(), #{desc => <<"From Username">>})}
+      {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
+      {node, mk(binary(), #{desc => <<"The node where message from">>})},
+      {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
+      {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
+      {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
+      {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
+      {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
+      {qos, mk(binary(), #{desc => <<"QoS">>})},
+      {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
+      {from_username, mk(binary(), #{desc => <<"From Username">>})}
     ];
 fields("message") ->
     PayloadDesc = io_lib:format(
@@ -162,31 +170,37 @@ status(put, #{body := Body}) ->
 delayed_messages(get, #{query_string := Qs}) ->
     {200, emqx_delayed:cluster_list(Qs)}.
 
-delayed_message(get, #{bindings := #{msgid := Id}}) ->
-    case emqx_delayed:get_delayed_message(Id) of
-        {ok, Message} ->
-            Payload = maps:get(payload, Message),
-            case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
-                true ->
-                    {200, Message};
-                _ ->
-                    {200, Message#{payload => base64:encode(Payload)}}
-            end;
-        {error, id_schema_error} ->
-            {400, generate_http_code_map(id_schema_error, Id)};
-        {error, not_found} ->
-            {404, generate_http_code_map(not_found, Id)}
-    end;
-delayed_message(delete, #{bindings := #{msgid := Id}}) ->
-    case emqx_delayed:get_delayed_message(Id) of
-        {ok, _Message} ->
-            _ = emqx_delayed:delete_delayed_message(Id),
-            {204};
-        {error, id_schema_error} ->
-            {400, generate_http_code_map(id_schema_error, Id)};
-        {error, not_found} ->
-            {404, generate_http_code_map(not_found, Id)}
-    end.
+delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
+    MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
+    MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
+    with_maybe([MaybeNode, MaybeId],
+               fun(Node, Id) ->
+                       case emqx_delayed:get_delayed_message(Node, Id) of
+                           {ok, Message} ->
+                               Payload = maps:get(payload, Message),
+                               case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
+                                   true ->
+                                       {200, Message};
+                                   _ ->
+                                       {200, Message#{payload => base64:encode(Payload)}}
+                               end;
+                           {error, not_found} ->
+                               {404, generate_http_code_map(not_found, Id)}
+                       end
+               end);
+
+delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
+    MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
+    MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
+    with_maybe([MaybeNode, MaybeId],
+               fun(Node, Id) ->
+                       case emqx_delayed:delete_delayed_message(Node, Id) of
+                           ok ->
+                               {204};
+                           {error, not_found} ->
+                               {404, generate_http_code_map(not_found, Id)}
+                       end
+               end).
 
 %%--------------------------------------------------------------------
 %% internal function
@@ -236,4 +250,25 @@ generate_http_code_map(id_schema_error, Id) ->
           iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))};
 generate_http_code_map(not_found, Id) ->
     #{code => ?MESSAGE_ID_NOT_FOUND, message =>
-          iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
+          iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))};
+generate_http_code_map(invalid_node, Node) ->
+    #{code => ?INVALID_NODE, message =>
+          iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node]))}.
+
+make_maybe(X, Error, Fun) ->
+    try Fun(X) of
+        Right ->
+            Right
+    catch _:_ ->
+            {left, X, Error}
+    end.
+
+with_maybe(Maybes, Cont) ->
+    with_maybe(Maybes, Cont, []).
+
+with_maybe([], Cont, Rights) ->
+    erlang:apply(Cont, lists:reverse(Rights));
+with_maybe([{left, X, Error} | _], _Cont, _Rights) ->
+    {400, generate_http_code_map(Error, X)};
+with_maybe([Right | T], Cont, Rights) ->
+    with_maybe(T, Cont, [Right | Rights]).

+ 37 - 0
apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl

@@ -0,0 +1,37 @@
+%%--------------------------------------------------------------------
+%%Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_delayed_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+        , get_delayed_message/2
+        , delete_delayed_message/2
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec get_delayed_message(node(), binary()) -> emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
+get_delayed_message(Node, HexId) ->
+    rpc:call(Node, emqx_delayed, get_delayed_message, [HexId]).
+
+-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
+delete_delayed_message(Node, HexId) ->
+    rpc:call(Node, emqx_delayed, delete_delayed_message, [HexId]).