Просмотр исходного кода

fix: params & config generate; api base64 encode payload

DDDHuang 4 лет назад
Родитель
Сommit
cc1b8eca6c
2 измененных файлов с 75 добавлено и 41 удалено
  1. 4 8
      apps/emqx_modules/src/emqx_delayed.erl
  2. 71 33
      apps/emqx_modules/src/emqx_delayed_api.erl

+ 4 - 8
apps/emqx_modules/src/emqx_delayed.erl

@@ -44,7 +44,7 @@
 -export([ enable/0
         , disable/0
         , set_max_delayed_messages/1
-        , update_config/2
+        , update_config/1
         , list/1
         , get_delayed_message/1
         , delete_delayed_message/1
@@ -146,7 +146,7 @@ format_delayed(#delayed_message{key = {TimeStamp, Id},
     },
     case WithPayload of
         true ->
-            Result#{payload => Payload};
+            Result#{payload => base64:encode(Payload)};
         _ ->
             Result
     end.
@@ -172,12 +172,8 @@ delete_delayed_message(Id0) ->
             Timestamp = hd(Rows),
             ekka_mnesia:dirty_delete(?TAB, {Timestamp, Id})
     end.
-
-update_config(Enable, MaxDelayedMessages) ->
-    Opts0 = emqx_config:get_raw([<<"delayed">>], #{}),
-    Opts1 = maps:put(<<"enable">>, Enable, Opts0),
-    Opts = maps:put(<<"max_delayed_messages">>, MaxDelayedMessages, Opts1),
-    {ok, _} = emqx:update_config([delayed], Opts).
+update_config(Config) ->
+    {ok, _} = emqx:update_config([delayed], Config).
 
 %%--------------------------------------------------------------------
 %% gen_server callback

+ 71 - 33
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -25,13 +25,16 @@
                         , request_body_schema/1
                         ]).
 
+-define(MAX_PAYLOAD_LENGTH, 2048).
+-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
+
 -export([ status/2
         , delayed_messages/2
         , delayed_message/2
         ]).
 
 %% for rpc
--export([update_config_/2]).
+-export([update_config_/1]).
 
 -export([api_spec/0]).
 
@@ -40,7 +43,7 @@
 
 -define(BAD_REQUEST, 'BAD_REQUEST').
 
--define(MESSAGE_ID_NOT_FOUND, 'ALREADY_DISABLED').
+-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
 
 api_spec() ->
     {
@@ -66,6 +69,9 @@ delayed_schema(WithPayload) ->
     end.
 
 delayed_message_properties() ->
+    PayloadDesc = list_to_binary(
+        io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
+            [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH])),
     #{
         id => #{
             type => integer,
@@ -82,7 +88,7 @@ delayed_message_properties() ->
             description => <<"Qos">>},
         payload => #{
             type => string,
-            description => <<"Payload">>},
+            description => PayloadDesc},
         form_clientid => #{
             type => string,
             description => <<"Client ID">>},
@@ -110,7 +116,7 @@ status_api() ->
             'requestBody' => request_body_schema(Schema),
             responses => #{
                 <<"200">> =>
-                    response_schema(<<"Enable or disable delayed successfully">>),
+                    response_schema(<<"Enable or disable delayed successfully">>, Schema),
                 <<"400">> =>
                     response_error_schema(<<"Already disabled or enabled">>, [?ALREADY_ENABLED, ?ALREADY_DISABLED])}}},
     {"/mqtt/delayed_messages/status", Metadata, status}.
@@ -156,10 +162,8 @@ status(get, _Request) ->
 
 status(put, Request) ->
     {ok, Body, _} = cowboy_req:read_body(Request),
-    Params = emqx_json:decode(Body, [return_maps]),
-    Enable = maps:get(<<"enable">>, Params),
-    MaxDelayedMessages = maps:get(<<"max_delayed_messages">>, Params),
-    update_config(Enable, MaxDelayedMessages).
+    Config = emqx_json:decode(Body, [return_maps]),
+    update_config(Config).
 
 delayed_messages(get, Request) ->
     Qs = cowboy_req:parse_qs(Request),
@@ -169,7 +173,13 @@ delayed_message(get, Request) ->
     Id = cowboy_req:binding(id, Request),
     case emqx_delayed:get_delayed_message(Id) of
         {ok, Message} ->
-            {200, Message};
+            Payload = maps:get(payload, Message),
+            case size(Payload) > ?MAX_PAYLOAD_LENGTH of
+                true ->
+                    {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
+                _ ->
+                    {200, Message#{payload => base64:encode(Payload)}}
+            end;
         {error, not_found} ->
             Message = list_to_binary(io_lib:format("Message ID ~p not found", [Id])),
             {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}}
@@ -183,44 +193,72 @@ delayed_message(delete, Request) ->
 %% internal function
 %%--------------------------------------------------------------------
 get_status() ->
-    #{
-        enable => emqx:get_config([delayed, enable], true),
-        max_delayed_messages => emqx:get_config([delayed, max_delayed_messages], 0)
-    }.
+    emqx:get_config([delayed], #{}).
 
-update_config(Enable, MaxDelayedMessages) when MaxDelayedMessages >= 0 ->
-    case Enable =:= maps:get(enable, get_status()) of
-        true ->
-            update_config_error_response(Enable);
+update_config(Config) ->
+    case generate_config(Config) of
+        {ok, Config} ->
+            update_config_(Config),
+            {200, get_status()};
+        {error, {Code, Message}} ->
+            {400, #{code => Code, message => Message}}
+    end.
+generate_config(Config) ->
+    generate_config(Config, [fun generate_enable/1, fun generate_max_delayed_messages/1]).
+
+generate_config(Config, []) ->
+    {ok, Config};
+generate_config(Config, [Fun | Tail]) ->
+    case Fun(Config) of
+        {ok, Config} ->
+            generate_config(Config, Tail);
+        {error, CodeMessage} ->
+            {error, CodeMessage}
+    end.
+
+generate_enable(Config = #{<<"enable">> := Enable}) ->
+    case {Enable =:= maps:get(enable, get_status()), Enable} of
+        {true, true} ->
+            {error, {?ALREADY_ENABLED, <<"Delayed message status is already enabled">>}};
+        {true, false} ->
+            {error, {?ALREADY_DISABLED, <<"Delayed message status is already disable">>}};
         _ ->
-            update_config_(Enable, MaxDelayedMessages),
-            {200}
+            {ok, Config}
     end;
-update_config(_Enable, _MaxDelayedMessages) ->
-    {400, #{code => ?BAD_REQUEST, message => <<"Max delayed must be equal or greater than 0">>}}.
+generate_enable(Config) ->
+    {ok, Config}.
 
-update_config_error_response(true) ->
-    {400, #{code => ?ALREADY_ENABLED, message => <<"Delayed message status is already enabled">>}};
-update_config_error_response(false) ->
-    {400, #{code => ?ALREADY_DISABLED, message => <<"Delayed message status is already disable">>}}.
+generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 ->
+    {ok, Config};
+generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
+    {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
+generate_max_delayed_messages(Config) ->
+    {ok, Config}.
 
-update_config_(Enable, MaxDelayedMessages) ->
+update_config_(Config) ->
     lists:foreach(fun(Node) ->
-        update_config_(Node, Enable, MaxDelayedMessages)
+        update_config_(Node, Config)
     end, ekka_mnesia:running_nodes()).
 
-update_config_(Node, Enable, MaxDelayedMessages) when Node =:= node() ->
-    _ = emqx_delayed:update_config(Enable, MaxDelayedMessages),
-    ok = emqx_delayed:set_max_delayed_messages(MaxDelayedMessages),
-    case Enable of
+update_config_(Node, Config) when Node =:= node() ->
+    _ = emqx_delayed:update_config(Config),
+    case maps:get(<<"enable">>, Config, undefined) of
+        undefined ->
+            ignore;
         true ->
             emqx_delayed:enable();
         false ->
             emqx_delayed:disable()
+    end,
+    case maps:get(<<"max_delayed_messages">>, Config, undefined) of
+        undefined ->
+            ignore;
+        Max ->
+            ok = emqx_delayed:set_max_delayed_messages(Max)
     end;
 
-update_config_(Node, Enable, MaxDelayedMessages) ->
-    rpc_call(Node, ?MODULE, update_config_, [Node, Enable, MaxDelayedMessages]).
+update_config_(Node, Config) ->
+    rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
 
 rpc_call(Node, Module, Fun, Args) ->
     case rpc:call(Node, Module, Fun, Args) of