Преглед изворни кода

Merge pull request #5511 from DDDHuang/delayed_api

feat: add delayed api
DDDHuang пре 4 година
родитељ
комит
65563e9f8c

+ 3 - 1
apps/emqx_management/src/emqx_mgmt_util.erl

@@ -140,6 +140,8 @@ response_error_schema(Description, Enum) ->
 response_page_schema(Def) when is_atom(Def) ->
     response_page_schema(atom_to_binary(Def, utf8));
 response_page_schema(Def) when is_binary(Def) ->
+    response_page_schema(minirest:ref(Def));
+response_page_schema(ItemSchema) when is_map(ItemSchema) ->
     Schema = #{
         type => object,
         properties => #{
@@ -154,7 +156,7 @@ response_page_schema(Def) when is_binary(Def) ->
                         type => integer}}},
             data => #{
                 type => array,
-                items => minirest:ref(Def)}}},
+                items => ItemSchema}}},
     json_content_schema("", Schema).
 
 response_batch_schema(DefName) when is_atom(DefName) ->

+ 1 - 0
apps/emqx_modules/etc/emqx_modules.conf

@@ -1,5 +1,6 @@
 delayed: {
     enable: true
+    ## 0 is no limit
     max_delayed_messages: 0
 }
 

+ 62 - 1
apps/emqx_modules/src/emqx_delayed.erl

@@ -21,7 +21,6 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 
-
 %% Mnesia bootstrap
 -export([mnesia/1]).
 
@@ -44,6 +43,11 @@
 %% gen_server callbacks
 -export([ enable/0
         , disable/0
+        , set_max_delayed_messages/1
+        , update_config/1
+        , list/1
+        , get_delayed_message/1
+        , delete_delayed_message/1
         ]).
 
 -record(delayed_message, {key, msg}).
@@ -117,6 +121,60 @@ enable() ->
 disable() ->
     gen_server:call(?SERVER, disable).
 
+set_max_delayed_messages(Max) ->
+    gen_server:call(?SERVER, {set_max_delayed_messages, Max}).
+
+list(Params) ->
+    emqx_mgmt_api:paginate(?TAB, Params, fun format_delayed/1).
+
+format_delayed(Delayed) ->
+    format_delayed(Delayed, false).
+
+format_delayed(#delayed_message{key = {TimeStamp, Id},
+            msg = #message{topic = Topic,
+                           from = From,
+                           headers = #{username := Username},
+                           qos = Qos,
+                           payload = Payload}}, WithPayload) ->
+    Result = #{
+        id => emqx_guid:to_hexstr(Id),
+        publish_time => list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, second}])),
+        topic => Topic,
+        qos => Qos,
+        from_clientid => From,
+        from_username => Username
+    },
+    case WithPayload of
+        true ->
+            Result#{payload => base64:encode(Payload)};
+        _ ->
+            Result
+    end.
+
+get_delayed_message(Id0) ->
+    Id = emqx_guid:from_hexstr(Id0),
+    Ms = [{{delayed_message,{'_',Id},'_'},[],['$_']}],
+    case ets:select(?TAB, Ms) of
+        [] ->
+            {error, not_found};
+        Rows ->
+            Message = hd(Rows),
+            {ok, format_delayed(Message, true)}
+    end.
+
+delete_delayed_message(Id0) ->
+    Id = emqx_guid:from_hexstr(Id0),
+    Ms = [{{delayed_message, {'$1', Id}, '_'}, [], ['$1']}],
+    case ets:select(?TAB, Ms) of
+        [] ->
+            {error, not_found};
+        Rows ->
+            Timestamp = hd(Rows),
+            ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id})
+    end.
+update_config(Config) ->
+    {ok, _} = emqx:update_config([delayed], Config).
+
 %%--------------------------------------------------------------------
 %% gen_server callback
 %%--------------------------------------------------------------------
@@ -128,6 +186,9 @@ init([Opts]) ->
                                   publish_at => 0,
                                   max_delayed_messages => MaxDelayedMessages}))}.
 
+handle_call({set_max_delayed_messages, Max}, _From, State) ->
+    {reply, ok, State#{max_delayed_messages => Max}};
+
 handle_call({store, DelayedMsg = #delayed_message{key = Key}},
             _From, State = #{max_delayed_messages := 0}) ->
     ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),

+ 176 - 91
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -20,110 +20,139 @@
 
 -import(emqx_mgmt_util, [ response_schema/1
                         , response_schema/2
+                        , response_error_schema/2
+                        , response_page_schema/1
                         , request_body_schema/1
                         ]).
 
-% -export([cli/1]).
+-define(MAX_PAYLOAD_LENGTH, 2048).
+-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
 
 -export([ status/2
         , delayed_messages/2
-        , delete_delayed_message/2
+        , delayed_message/2
         ]).
 
--export([enable_delayed/2]).
+%% for rpc
+-export([update_config_/1]).
 
 -export([api_spec/0]).
 
+-define(ALREADY_ENABLED, 'ALREADY_ENABLED').
+-define(ALREADY_DISABLED, 'ALREADY_DISABLED').
+
+-define(BAD_REQUEST, 'BAD_REQUEST').
+
+-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
+
 api_spec() ->
-    {[status(), delayed_messages(), delete_delayed_message()],
-     [delayed_message_schema()]}.
+    {
+        [status_api(), delayed_messages_api(), delayed_message_api()],
+        []
+    }.
 
+delayed_schema() ->
+    delayed_schema(false).
 
-delayed_message_schema() ->
-    #{broker_info => #{
-        type => object,
-        properties => #{
-            msgid => #{
-                type => string,
-                description => <<"Message Id">>
+delayed_schema(WithPayload) ->
+    case WithPayload of
+        true ->
+            #{
+                type => object,
+                properties => delayed_message_properties()
+            };
+        _ ->
+            #{
+                type => object,
+                properties => maps:without([payload], delayed_message_properties())
             }
-        }
-    }}.
+    end.
+
+delayed_message_properties() ->
+    PayloadDesc = list_to_binary(
+        io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
+            [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH])),
+    #{
+        id => #{
+            type => integer,
+            description => <<"Message Id (MQTT message id hash)">>},
+        publish_time => #{
+            type => string,
+            description => <<"publish time, rfc 3339">>},
+        topic => #{
+            type => string,
+            description => <<"Topic">>},
+        qos => #{
+            type => integer,
+            enum => [0, 1, 2],
+            description => <<"Qos">>},
+        payload => #{
+            type => string,
+            description => PayloadDesc},
+        form_clientid => #{
+            type => string,
+            description => <<"Client ID">>},
+        form_username => #{
+            type => string,
+            description => <<"Username">>}
+    }.
 
-status() ->
+status_api() ->
+    Schema = #{
+        type => object,
+        properties => #{
+            enable => #{
+                type => boolean},
+            max_delayed_messages => #{
+                type => integer,
+                description => <<"Max limit, 0 is no limit">>}}},
     Metadata = #{
         get => #{
             description => "Get delayed status",
             responses => #{
-                <<"200">> => response_schema(<<"Bad Request">>,
-                    #{
-                        type => object,
-                        properties => #{enable => #{type => boolean}}
-                    }
-                )
-            }
-        },
+                <<"200">> => response_schema(<<"Bad Request">>, Schema)}},
         put => #{
-            description => "Enable or disbale delayed",
-            'requestBody' => request_body_schema(#{
-                type => object,
-                properties => #{
-                    enable => #{
-                        type => boolean
-                    }
-                }
-            }),
+            description => "Enable or disable delayed, set max delayed messages",
+            'requestBody' => request_body_schema(Schema),
             responses => #{
                 <<"200">> =>
-                    response_schema(<<"Enable or disbale delayed successfully">>),
+                    response_schema(<<"Enable or disable delayed successfully">>, Schema),
                 <<"400">> =>
-                    response_schema(<<"Bad Request">>,
-                        #{
-                            type => object,
-                            properties => #{
-                                message => #{type => string},
-                                code => #{type => string}
-                            }
-                        }
-                    )
-            }
-        }
-    },
-    {"/delayed/status", Metadata, status}.
+                    response_error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])}}},
+    {"/mqtt/delayed_messages/status", Metadata, status}.
 
-delayed_messages() ->
+delayed_messages_api() ->
     Metadata = #{
         get => #{
-            description => "Get delayed message list",
+            description => "List delayed messages",
             responses => #{
-                <<"200">> => emqx_mgmt_util:response_array_schema(<<>>, delayed_message)
-            }
-        }
-    },
-    {"/delayed/messages", Metadata, delayed_messages}.
+                <<"200">> => response_page_schema(delayed_schema())}}},
+    {"/mqtt/delayed_messages", Metadata, delayed_messages}.
 
-delete_delayed_message() ->
+delayed_message_api() ->
     Metadata = #{
+        get => #{
+            description => "Get delayed message",
+            parameters => [#{
+                name => id,
+                in => path,
+                schema => #{type => string},
+                required => true
+            }],
+            responses => #{
+                <<"200">> => response_schema(<<"Get delayed message success">>, delayed_schema(true)),
+                <<"404">> => response_error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND])}},
         delete => #{
             description => "Delete delayed message",
             parameters => [#{
-                name => msgid,
+                name => id,
                 in => path,
                 schema => #{type => string},
                 required => true
             }],
             responses => #{
-                <<"200">> => response_schema(<<"Bad Request">>,
-                    #{
-                        type => object,
-                        properties => #{enable => #{type => boolean}}
-                    }
-                )
-            }
-        }
-    },
-    {"/delayed/messages/:msgid", Metadata, delete_delayed_message}.
-
+                <<"200">> => response_schema(<<"Delete delayed message success">>)}}},
+    {"/mqtt/delayed_messages/:id", Metadata, delayed_message}.
 
 %%--------------------------------------------------------------------
 %% HTTP API
@@ -133,50 +162,106 @@ status(get, _Request) ->
 
 status(put, Request) ->
     {ok, Body, _} = cowboy_req:read_body(Request),
-    Params = emqx_json:decode(Body, [return_maps]),
-    Enable = maps:get(<<"enable">>, Params),
-    case Enable =:= get_status() of
-        true ->
-            Reason = case Enable of
-                true -> <<"Telemetry status is already enabled">>;
-                false -> <<"Telemetry status is already disable">>
-            end,
-            {400, #{code => "BAD_REQUEST", message => Reason}};
-        false ->
-            enable_delayed(Enable),
-             {200}
-    end.
+    Config = emqx_json:decode(Body, [return_maps]),
+    update_config(Config).
 
-delayed_messages(get, _Request) ->
-    {200, []}.
+delayed_messages(get, Request) ->
+    Qs = cowboy_req:parse_qs(Request),
+    {200, emqx_delayed:list(Qs)}.
 
-delete_delayed_message(delete, _Request) ->
+delayed_message(get, Request) ->
+    Id = cowboy_req:binding(id, Request),
+    case emqx_delayed:get_delayed_message(Id) of
+        {ok, Message} ->
+            Payload = maps:get(payload, Message),
+            case size(Payload) > ?MAX_PAYLOAD_LENGTH of
+                true ->
+                    {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
+                _ ->
+                    {200, Message#{payload => base64:encode(Payload)}}
+            end;
+        {error, not_found} ->
+            Message = list_to_binary(io_lib:format("Message ID ~p not found", [Id])),
+            {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}}
+    end;
+delayed_message(delete, Request) ->
+    Id = cowboy_req:binding(id, Request),
+    _ = emqx_delayed:delete_delayed_message(Id),
     {200}.
 
 %%--------------------------------------------------------------------
 %% internal function
 %%--------------------------------------------------------------------
-enable_delayed(Enable) ->
+get_status() ->
+    emqx:get_config([delayed], #{}).
+
+update_config(Config) ->
+    case generate_config(Config) of
+        {ok, Config} ->
+            update_config_(Config),
+            {200, get_status()};
+        {error, {Code, Message}} ->
+            {400, #{code => Code, message => Message}}
+    end.
+generate_config(Config) ->
+    generate_config(Config, [fun generate_enable/1, fun generate_max_delayed_messages/1]).
+
+generate_config(Config, []) ->
+    {ok, Config};
+generate_config(Config, [Fun | Tail]) ->
+    case Fun(Config) of
+        {ok, Config} ->
+            generate_config(Config, Tail);
+        {error, CodeMessage} ->
+            {error, CodeMessage}
+    end.
+
+generate_enable(Config = #{<<"enable">> := Enable}) ->
+    case {Enable =:= maps:get(enable, get_status()), Enable} of
+        {true, true} ->
+            {error, {?ALREADY_ENABLED, <<"Delayed message status is already enabled">>}};
+        {true, false} ->
+            {error, {?ALREADY_DISABLED, <<"Delayed message status is already disable">>}};
+        _ ->
+            {ok, Config}
+    end;
+generate_enable(Config) ->
+    {ok, Config}.
+
+generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 ->
+    {ok, Config};
+generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
+    {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
+generate_max_delayed_messages(Config) ->
+    {ok, Config}.
+
+update_config_(Config) ->
     lists:foreach(fun(Node) ->
-        enable_delayed(Node, Enable)
+        update_config_(Node, Config)
     end, ekka_mnesia:running_nodes()).
 
-enable_delayed(Node, Enable) when Node =:= node() ->
-    case Enable of
+update_config_(Node, Config) when Node =:= node() ->
+    _ = emqx_delayed:update_config(Config),
+    case maps:get(<<"enable">>, Config, undefined) of
+        undefined ->
+            ignore;
         true ->
             emqx_delayed:enable();
         false ->
             emqx_delayed:disable()
+    end,
+    case maps:get(<<"max_delayed_messages">>, Config, undefined) of
+        undefined ->
+            ignore;
+        Max ->
+            ok = emqx_delayed:set_max_delayed_messages(Max)
     end;
 
-enable_delayed(Node, Enable) ->
-    rpc_call(Node, ?MODULE, enable_delayed, [Node, Enable]).
+update_config_(Node, Config) ->
+    rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
 
 rpc_call(Node, Module, Fun, Args) ->
     case rpc:call(Node, Module, Fun, Args) of
         {badrpc, Reason} -> {error, Reason};
         Result -> Result
     end.
-
-get_status() ->
-    emqx:get_config([delayed, enable], true).

+ 16 - 1
apps/emqx_modules/src/emqx_rewrite_api.erl

@@ -1,3 +1,18 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
 -module(emqx_rewrite_api).
 
 -behaviour(minirest_api).
@@ -44,7 +59,7 @@ rewrite_api() ->
         post => #{
             description => <<"Update topic rewrite">>,
             'requestBody' => emqx_mgmt_util:request_body_array_schema(topic_rewrite_schema()),
-            response => #{
+            responses => #{
                 <<"200">> =>
                     emqx_mgmt_util:response_schema(<<"Update topic rewrite success">>, topic_rewrite_schema()),
                 <<"413">> => emqx_mgmt_util:response_error_schema(<<"Rules count exceed max limit">>, [?EXCEED_LIMIT])}}},