|
|
@@ -46,6 +46,7 @@
|
|
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
|
|
|
|
|
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
|
|
|
+-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
|
|
|
|
|
|
api_spec() ->
|
|
|
{
|
|
|
@@ -117,14 +118,17 @@ delayed_message_api() ->
|
|
|
description => <<"Get delayed message">>,
|
|
|
parameters => parameters(),
|
|
|
responses => #{
|
|
|
- <<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>),
|
|
|
- <<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND])
|
|
|
+ <<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]),
|
|
|
+ <<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]),
|
|
|
+ <<"200">> => object_schema(maps:without([payload], properties()), <<"Get delayed message success">>)
|
|
|
}
|
|
|
},
|
|
|
delete => #{
|
|
|
description => <<"Delete delayed message">>,
|
|
|
parameters => parameters(),
|
|
|
responses => #{
|
|
|
+ <<"400">> => error_schema(<<"Message ID Schema error">>, [?MESSAGE_ID_SCHEMA_ERROR]),
|
|
|
+ <<"404">> => error_schema(<<"Message ID not found">>, [?MESSAGE_ID_NOT_FOUND]),
|
|
|
<<"200">> => schema(<<"Delete delayed message success">>)
|
|
|
}
|
|
|
}
|
|
|
@@ -153,13 +157,21 @@ delayed_message(get, #{bindings := #{msgid := Id}}) ->
|
|
|
_ ->
|
|
|
{200, Message#{payload => base64:encode(Payload)}}
|
|
|
end;
|
|
|
+ {error, id_schema_error} ->
|
|
|
+ {400, generate_http_code_map(id_schema_error, Id)};
|
|
|
{error, not_found} ->
|
|
|
- Message = iolist_to_binary(io_lib:format("Message ID ~p not found", [Id])),
|
|
|
- {404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}}
|
|
|
+ {404, generate_http_code_map(not_found, Id)}
|
|
|
end;
|
|
|
delayed_message(delete, #{bindings := #{msgid := Id}}) ->
|
|
|
- _ = emqx_delayed:delete_delayed_message(Id),
|
|
|
- {200}.
|
|
|
+ case emqx_delayed:get_delayed_message(Id) of
|
|
|
+ {ok, _Message} ->
|
|
|
+ _ = emqx_delayed:delete_delayed_message(Id),
|
|
|
+ {200};
|
|
|
+ {error, id_schema_error} ->
|
|
|
+ {400, generate_http_code_map(id_schema_error, Id)};
|
|
|
+ {error, not_found} ->
|
|
|
+ {404, generate_http_code_map(not_found, Id)}
|
|
|
+ end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% internal function
|
|
|
@@ -220,6 +232,11 @@ update_config_(Node, Config) when Node =:= node() ->
|
|
|
update_config_(Node, Config) ->
|
|
|
rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
|
|
|
|
|
|
+generate_http_code_map(id_schema_error, Id) ->
|
|
|
+ #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))};
|
|
|
+generate_http_code_map(not_found, Id) ->
|
|
|
+ #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
|
|
|
+
|
|
|
rpc_call(Node, Module, Fun, Args) ->
|
|
|
case rpc:call(Node, Module, Fun, Args) of
|
|
|
{badrpc, Reason} -> {error, Reason};
|