DDDHuang 4 лет назад
Родитель
Сommit
5520326ce3

+ 3 - 1
apps/emqx_management/src/emqx_mgmt_util.erl

@@ -140,6 +140,8 @@ response_error_schema(Description, Enum) ->
 response_page_schema(Def) when is_atom(Def) ->
     response_page_schema(atom_to_binary(Def, utf8));
 response_page_schema(Def) when is_binary(Def) ->
+    response_page_schema(minirest:ref(Def));
+response_page_schema(ItemSchema) when is_map(ItemSchema) ->
     Schema = #{
         type => object,
         properties => #{
@@ -154,7 +156,7 @@ response_page_schema(Def) when is_binary(Def) ->
                         type => integer}}},
             data => #{
                 type => array,
-                items => minirest:ref(Def)}}},
+                items => ItemSchema}}},
     json_content_schema("", Schema).
 
 response_batch_schema(DefName) when is_atom(DefName) ->

+ 1 - 0
apps/emqx_modules/etc/emqx_modules.conf

@@ -1,5 +1,6 @@
 delayed: {
     enable: true
+    ## 0 is no limit
     max_delayed_messages: 0
 }
 

+ 66 - 1
apps/emqx_modules/src/emqx_delayed.erl

@@ -21,7 +21,6 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 
-
 %% Mnesia bootstrap
 -export([mnesia/1]).
 
@@ -44,6 +43,11 @@
 %% gen_server callbacks
 -export([ enable/0
         , disable/0
+        , set_max_delayed_messages/1
+        , update_config/2
+        , list/1
+        , get_delayed_message/1
+        , delete_delayed_message/1
         ]).
 
 -record(delayed_message, {key, msg}).
@@ -117,6 +121,64 @@ enable() ->
 disable() ->
     gen_server:call(?SERVER, disable).
 
+set_max_delayed_messages(Max) ->
+    gen_server:call(?SERVER, {set_max_delayed_messages, Max}).
+
+list(Params) ->
+    emqx_mgmt_api:paginate(?TAB, Params, fun format_delayed/1).
+
+format_delayed(Delayed) ->
+    format_delayed(Delayed, false).
+
+format_delayed(#delayed_message{key = {TimeStamp, Id},
+            msg = #message{topic = Topic,
+                           from = From,
+                           headers = #{username := Username},
+                           qos = Qos,
+                           payload = Payload}}, WithPayload) ->
+    Result = #{
+        id => emqx_guid:to_hexstr(Id),
+        publish_time => list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, second}])),
+        topic => Topic,
+        qos => Qos,
+        from_clientid => From,
+        from_username => Username
+    },
+    case WithPayload of
+        true ->
+            Result#{payload => Payload};
+        _ ->
+            Result
+    end.
+
+get_delayed_message(Id0) ->
+    Id = emqx_guid:from_hexstr(Id0),
+    Ms = [{{delayed_message,{'_',Id},'_'},[],['$_']}],
+    case ets:select(?TAB, Ms) of
+        [] ->
+            {error, not_found};
+        Rows ->
+            Message = hd(Rows),
+            {ok, format_delayed(Message, true)}
+    end.
+
+delete_delayed_message(Id0) ->
+    Id = emqx_guid:from_hexstr(Id0),
+    Ms = [{{delayed_message, {'$1', Id}, '_'}, [], ['$1']}],
+    case ets:select(?TAB, Ms) of
+        [] ->
+            {error, not_found};
+        Rows ->
+            Timestamp = hd(Rows),
+            ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id})
+    end.
+
+update_config(Enable, MaxDelayedMessages) ->
+    Opts0 = emqx_config:get_raw([<<"delayed">>], #{}),
+    Opts1 = maps:put(<<"enable">>, Enable, Opts0),
+    Opts = maps:put(<<"max_delayed_messages">>, MaxDelayedMessages, Opts1),
+    {ok, _} = emqx:update_config([delayed], Opts).
+
 %%--------------------------------------------------------------------
 %% gen_server callback
 %%--------------------------------------------------------------------
@@ -128,6 +190,9 @@ init([Opts]) ->
                                   publish_at => 0,
                                   max_delayed_messages => MaxDelayedMessages}))}.
 
+handle_call({set_max_delayed_messages, Max}, _From, State) ->
+    {reply, ok, State#{max_delayed_messages => Max}};
+
 handle_call({store, DelayedMsg = #delayed_message{key = Key}},
             _From, State = #{max_delayed_messages := 0}) ->
     ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),

+ 138 - 91
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -20,110 +20,133 @@
 
 -import(emqx_mgmt_util, [ response_schema/1
                         , response_schema/2
+                        , response_error_schema/2
+                        , response_page_schema/1
                         , request_body_schema/1
                         ]).
 
-% -export([cli/1]).
-
 -export([ status/2
         , delayed_messages/2
-        , delete_delayed_message/2
+        , delayed_message/2
         ]).
 
--export([enable_delayed/2]).
+%% for rpc
+-export([update_config_/2]).
 
 -export([api_spec/0]).
 
+-define(ALREADY_ENABLED, 'ALREADY_ENABLED').
+-define(ALREADY_DISABLED, 'ALREADY_DISABLED').
+
+-define(BAD_REQUEST, 'BAD_REQUEST').
+
+-define(MESSAGE_ID_NOT_FOUND, 'ALREADY_DISABLED').
+
 api_spec() ->
-    {[status(), delayed_messages(), delete_delayed_message()],
-     [delayed_message_schema()]}.
+    {
+        [status_api(), delayed_messages_api(), delayed_message_api()],
+        []
+    }.
 
+delayed_schema() ->
+    delayed_schema(false).
 
-delayed_message_schema() ->
-    #{broker_info => #{
-        type => object,
-        properties => #{
-            msgid => #{
-                type => string,
-                description => <<"Message Id">>
+delayed_schema(WithPayload) ->
+    case WithPayload of
+        true ->
+            #{
+                type => object,
+                properties => delayed_message_properties()
+            };
+        _ ->
+            #{
+                type => object,
+                properties => maps:without([payload], delayed_message_properties())
             }
-        }
-    }}.
+    end.
 
-status() ->
+delayed_message_properties() ->
+    #{
+        id => #{
+            type => integer,
+            description => <<"Message Id (MQTT message id hash)">>},
+        publish_time => #{
+            type => string,
+            description => <<"publish time, rfc 3339">>},
+        topic => #{
+            type => string,
+            description => <<"Topic">>},
+        qos => #{
+            type => integer,
+            enum => [0, 1, 2],
+            description => <<"Qos">>},
+        payload => #{
+            type => string,
+            description => <<"Payload">>},
+        form_clientid => #{
+            type => string,
+            description => <<"Client ID">>},
+        form_username => #{
+            type => string,
+            description => <<"Username">>}
+    }.
+
+status_api() ->
+    Schema = #{
+        type => object,
+        properties => #{
+            enable => #{
+                type => boolean},
+            max_delayed_messages => #{
+                type => integer,
+                description => <<"Max limit, 0 is no limit">>}}},
     Metadata = #{
         get => #{
             description => "Get delayed status",
             responses => #{
-                <<"200">> => response_schema(<<"Bad Request">>,
-                    #{
-                        type => object,
-                        properties => #{enable => #{type => boolean}}
-                    }
-                )
-            }
-        },
+                <<"200">> => response_schema(<<"Bad Request">>, Schema)}},
         put => #{
-            description => "Enable or disbale delayed",
-            'requestBody' => request_body_schema(#{
-                type => object,
-                properties => #{
-                    enable => #{
-                        type => boolean
-                    }
-                }
-            }),
+            description => "Enable or disable delayed, set max delayed messages",
+            'requestBody' => request_body_schema(Schema),
             responses => #{
                 <<"200">> =>
-                    response_schema(<<"Enable or disbale delayed successfully">>),
+                    response_schema(<<"Enable or disable delayed successfully">>),
                 <<"400">> =>
-                    response_schema(<<"Bad Request">>,
-                        #{
-                            type => object,
-                            properties => #{
-                                message => #{type => string},
-                                code => #{type => string}
-                            }
-                        }
-                    )
-            }
-        }
-    },
-    {"/delayed/status", Metadata, status}.
+                    response_error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])}}},
+    {"/mqtt/delayed_messages/status", Metadata, status}.
 
-delayed_messages() ->
+delayed_messages_api() ->
     Metadata = #{
         get => #{
-            description => "Get delayed message list",
+            description => "List delayed messages",
             responses => #{
-                <<"200">> => emqx_mgmt_util:response_array_schema(<<>>, delayed_message)
-            }
-        }
-    },
-    {"/delayed/messages", Metadata, delayed_messages}.
+                <<"200">> => response_page_schema(delayed_schema())}}},
+    {"/mqtt/delayed_messages", Metadata, delayed_messages}.
 
-delete_delayed_message() ->
+delayed_message_api() ->
     Metadata = #{
+        get => #{
+            description => "Get delayed message",
+            parameters => [#{
+                name => id,
+                in => path,
+                schema => #{type => string},
+                required => true
+            }],
+            responses => #{
+                <<"200">> => response_schema(<<"Get delayed message success">>, delayed_schema(true)),
+                <<"404">> => response_error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND])}},
         delete => #{
             description => "Delete delayed message",
             parameters => [#{
-                name => msgid,
+                name => id,
                 in => path,
                 schema => #{type => string},
                 required => true
             }],
             responses => #{
-                <<"200">> => response_schema(<<"Bad Request">>,
-                    #{
-                        type => object,
-                        properties => #{enable => #{type => boolean}}
-                    }
-                )
-            }
-        }
-    },
-    {"/delayed/messages/:msgid", Metadata, delete_delayed_message}.
-
+                <<"200">> => response_schema(<<"Delete delayed message success">>)}}},
+    {"/mqtt/delayed_messages/:id", Metadata, delayed_message}.
 
 %%--------------------------------------------------------------------
 %% HTTP API
@@ -135,33 +158,60 @@ status(put, Request) ->
     {ok, Body, _} = cowboy_req:read_body(Request),
     Params = emqx_json:decode(Body, [return_maps]),
     Enable = maps:get(<<"enable">>, Params),
-    case Enable =:= get_status() of
-        true ->
-            Reason = case Enable of
-                true -> <<"Telemetry status is already enabled">>;
-                false -> <<"Telemetry status is already disable">>
-            end,
-            {400, #{code => "BAD_REQUEST", message => Reason}};
-        false ->
-            enable_delayed(Enable),
-             {200}
-    end.
-
-delayed_messages(get, _Request) ->
-    {200, []}.
-
-delete_delayed_message(delete, _Request) ->
+    MaxDelayedMessages = maps:get(<<"max_delayed_messages">>, Params),
+    update_config(Enable, MaxDelayedMessages).
+
+delayed_messages(get, Request) ->
+    Qs = cowboy_req:parse_qs(Request),
+    {200, emqx_delayed:list(Qs)}.
+
+delayed_message(get, Request) ->
+    Id = cowboy_req:binding(id, Request),
+    case emqx_delayed:get_delayed_message(Id) of
+        {ok, Message} ->
+            {200, Message};
+        {error, not_found} ->
+            Message = list_to_binary(io_lib:format("Message ID ~p not found", [Id])),
+            {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}}
+    end;
+delayed_message(delete, Request) ->
+    Id = cowboy_req:binding(id, Request),
+    _ = emqx_delayed:delete_delayed_message(Id),
     {200}.
 
 %%--------------------------------------------------------------------
 %% internal function
 %%--------------------------------------------------------------------
-enable_delayed(Enable) ->
+get_status() ->
+    #{
+        enable => emqx:get_config([delayed, enable], true),
+        max_delayed_messages => emqx:get_config([delayed, max_delayed_messages], 0)
+    }.
+
+update_config(Enable, MaxDelayedMessages) when MaxDelayedMessages >= 0 ->
+    case Enable =:= maps:get(enable, get_status()) of
+        true ->
+            update_config_error_response(Enable);
+        _ ->
+            update_config_(Enable, MaxDelayedMessages),
+            {200}
+    end;
+update_config(_Enable, _MaxDelayedMessages) ->
+    {400, #{code => ?BAD_REQUEST, message => <<"Max delayed must be equal or greater than 0">>}}.
+
+update_config_error_response(true) ->
+    {400, #{code => ?ALREADY_ENABLED, message => <<"Delayed message status is already enabled">>}};
+update_config_error_response(false) ->
+    {400, #{code => ?ALREADY_DISABLED, message => <<"Delayed message status is already disable">>}}.
+
+update_config_(Enable, MaxDelayedMessages) ->
     lists:foreach(fun(Node) ->
-        enable_delayed(Node, Enable)
+        update_config_(Node, Enable, MaxDelayedMessages)
     end, ekka_mnesia:running_nodes()).
 
-enable_delayed(Node, Enable) when Node =:= node() ->
+update_config_(Node, Enable, MaxDelayedMessages) when Node =:= node() ->
+    _ = emqx_delayed:update_config(Enable, MaxDelayedMessages),
+    ok = emqx_delayed:set_max_delayed_messages(MaxDelayedMessages),
     case Enable of
         true ->
             emqx_delayed:enable();
@@ -169,14 +219,11 @@ enable_delayed(Node, Enable) when Node =:= node() ->
             emqx_delayed:disable()
     end;
 
-enable_delayed(Node, Enable) ->
-    rpc_call(Node, ?MODULE, enable_delayed, [Node, Enable]).
+update_config_(Node, Enable, MaxDelayedMessages) ->
+    rpc_call(Node, ?MODULE, update_config_, [Node, Enable, MaxDelayedMessages]).
 
 rpc_call(Node, Module, Fun, Args) ->
     case rpc:call(Node, Module, Fun, Args) of
         {badrpc, Reason} -> {error, Reason};
         Result -> Result
     end.
-
-get_status() ->
-    emqx_config:get([delayed, enable], true).

+ 16 - 1
apps/emqx_modules/src/emqx_rewrite_api.erl

@@ -1,3 +1,18 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
 -module(emqx_rewrite_api).
 
 -behaviour(minirest_api).
@@ -44,7 +59,7 @@ rewrite_api() ->
         post => #{
             description => <<"Update topic rewrite">>,
             'requestBody' => emqx_mgmt_util:request_body_array_schema(topic_rewrite_schema()),
-            response => #{
+            responses => #{
                 <<"200">> =>
                     emqx_mgmt_util:response_schema(<<"Update topic rewrite success">>, topic_rewrite_schema()),
                 <<"413">> => emqx_mgmt_util:response_error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])}}},