|
|
@@ -28,13 +28,14 @@
|
|
|
|
|
|
-import(emqx_mgmt_api_configs, [gen_schema/1]).
|
|
|
-import(emqx_mgmt_util, [ object_array_schema/2
|
|
|
+ , object_schema/2
|
|
|
, schema/1
|
|
|
, schema/2
|
|
|
, error_schema/2
|
|
|
, page_params/0
|
|
|
, properties/1]).
|
|
|
|
|
|
--define(MAX_BASE64_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
|
|
|
+-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
|
|
|
|
|
|
api_spec() ->
|
|
|
{[lookup_retained_api(), with_topic_api(), config_api()], []}.
|
|
|
@@ -64,7 +65,7 @@ parameters() ->
|
|
|
lookup_retained_api() ->
|
|
|
Metadata = #{
|
|
|
get => #{
|
|
|
- description => <<"lookup matching messages">>,
|
|
|
+ description => <<"List retained messages">>,
|
|
|
parameters => page_params(),
|
|
|
responses => #{
|
|
|
<<"200">> => object_array_schema(
|
|
|
@@ -80,9 +81,10 @@ with_topic_api() ->
|
|
|
MetaData = #{
|
|
|
get => #{
|
|
|
description => <<"lookup matching messages">>,
|
|
|
- parameters => parameters() ++ page_params(),
|
|
|
+ parameters => parameters(),
|
|
|
responses => #{
|
|
|
- <<"200">> => object_array_schema(message_props(), <<"List retained messages">>),
|
|
|
+ <<"200">> => object_schema(message_props(), <<"List retained messages">>),
|
|
|
+ <<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']),
|
|
|
<<"405">> => schema(<<"NotAllowed">>)
|
|
|
}
|
|
|
},
|
|
|
@@ -139,35 +141,27 @@ config(put, #{body := Body}) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Interval Funcs
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-lookup_retained(get, Params) ->
|
|
|
- lookup(undefined, Params, fun format_message/1).
|
|
|
+lookup_retained(get, #{query_string := Qs}) ->
|
|
|
+ Page = maps:get(page, Qs, 1),
|
|
|
+ Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
|
|
|
+ {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
|
|
|
+ {200, [format_message(Msg) || Msg <- Msgs]}.
|
|
|
|
|
|
-with_topic(get, #{bindings := Bindings} = Params) ->
|
|
|
+with_topic(get, #{bindings := Bindings}) ->
|
|
|
Topic = maps:get(topic, Bindings),
|
|
|
- lookup(Topic, Params, fun format_detail_message/1);
|
|
|
+ {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1),
|
|
|
+ case Msgs of
|
|
|
+ [H | _] ->
|
|
|
+ {200, format_detail_message(H)};
|
|
|
+ _ ->
|
|
|
+ {404, #{code => 'NOT_FOUND'}}
|
|
|
+ end;
|
|
|
|
|
|
with_topic(delete, #{bindings := Bindings}) ->
|
|
|
Topic = maps:get(topic, Bindings),
|
|
|
emqx_retainer_mnesia:delete_message(undefined, Topic),
|
|
|
{204}.
|
|
|
|
|
|
--spec lookup(undefined | binary(),
|
|
|
- map(),
|
|
|
- fun((emqx_types:message()) -> map())) ->
|
|
|
- {200, map()}.
|
|
|
-lookup(Topic, #{query_string := Qs}, Formatter) ->
|
|
|
- Page = maps:get(page, Qs, 1),
|
|
|
- Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
|
|
|
- {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
|
|
|
- {200, format_message(Msgs, Formatter)}.
|
|
|
-
|
|
|
-
|
|
|
-format_message(Messages, Formatter) when is_list(Messages)->
|
|
|
- [Formatter(Message) || Message <- Messages];
|
|
|
-
|
|
|
-format_message(Message, Formatter) ->
|
|
|
- Formatter(Message).
|
|
|
-
|
|
|
format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
|
|
|
, timestamp = Timestamp, headers = Headers}) ->
|
|
|
#{msgid => emqx_guid:to_hexstr(ID),
|
|
|
@@ -181,12 +175,11 @@ format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
|
|
|
|
|
|
format_detail_message(#message{payload = Payload} = Msg) ->
|
|
|
Base = format_message(Msg),
|
|
|
- EncodePayload = base64:encode(Payload),
|
|
|
- case erlang:byte_size(EncodePayload) =< ?MAX_BASE64_PAYLOAD_SIZE of
|
|
|
+ case erlang:byte_size(Payload) =< ?MAX_PAYLOAD_SIZE of
|
|
|
true ->
|
|
|
- Base#{payload => EncodePayload};
|
|
|
+ Base#{payload => base64:encode(Payload)};
|
|
|
_ ->
|
|
|
- Base#{payload => base64:encode(<<"PAYLOAD_TOO_LARGE">>)}
|
|
|
+ Base
|
|
|
end.
|
|
|
|
|
|
to_bin_string(Data) when is_binary(Data) ->
|