| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_delayed_api).
- -behaviour(minirest_api).
- -include_lib("typerefl/include/types.hrl").
- -include_lib("hocon/include/hoconsc.hrl").
- -include("emqx_modules.hrl").
- -import(hoconsc, [mk/2, ref/1, ref/2]).
- -export([
- status/2,
- delayed_messages/2,
- delayed_message/2
- ]).
- -export([
- paths/0,
- fields/1,
- schema/1
- ]).
- %% for rpc
- -export([update_config_/1]).
- -export([api_spec/0]).
%% Payload handling for the single-message GET endpoint: a payload larger
%% than ?MAX_PAYLOAD_LENGTH bytes is replaced by ?PAYLOAD_TOO_LARGE in the
%% response instead of being base64-encoded.
-define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>).

%% Error codes used in HTTP error responses and in the API schema.
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(INVALID_NODE, 'INVALID_NODE').
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
%% @doc Builds this module's API specification through
%% emqx_dashboard_swagger:spec/2 with the `check_schema' option enabled.
api_spec() ->
    SpecOptions = #{check_schema => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOptions).
%% @doc Lists the REST paths served by this module. schema/1 has one clause
%% for each entry.
paths() ->
    StatusPath = "/mqtt/delayed",
    ListPath = "/mqtt/delayed/messages",
    MessagePath = "/mqtt/delayed/messages/:node/:msgid",
    [StatusPath, ListPath, MessagePath].
%% @doc Describes one REST path: the handler function name ('operationId')
%% plus, per HTTP method, its tags, description, parameters and responses.
schema("/mqtt/delayed") ->
    DelayedConf = ref(emqx_modules_schema, "delayed"),
    BadRequest = emqx_dashboard_swagger:error_codes(
        [?BAD_REQUEST],
        ?DESC(illegality_limit)
    ),
    #{
        'operationId' => status,
        get => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(view_status_api),
            responses => #{200 => DelayedConf}
        },
        put => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(update_api),
            'requestBody' => DelayedConf,
            responses => #{
                200 => mk(DelayedConf, #{desc => ?DESC(update_success)}),
                400 => BadRequest
            }
        }
    };
schema("/mqtt/delayed/messages/:node/:msgid") ->
    %% GET and DELETE take the same path parameters and share their
    %% 400/404 responses; only the success response differs.
    PathParams = message_path_params(),
    ErrorResponses = message_error_responses(),
    #{
        'operationId' => delayed_message,
        get => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(get_message_api),
            parameters => PathParams,
            responses => ErrorResponses#{200 => ref("message_without_payload")}
        },
        delete => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(delete_api),
            parameters => PathParams,
            responses => ErrorResponses#{204 => <<"Delete delayed message success">>}
        }
    };
schema("/mqtt/delayed/messages") ->
    Meta = [
        {page, mk(pos_integer(), #{desc => ?DESC(view_page)})},
        {limit, mk(pos_integer(), #{desc => ?DESC(view_limit)})},
        {count, mk(non_neg_integer(), #{desc => ?DESC(count)})}
    ],
    Listing = [
        {data, mk(hoconsc:array(ref("message")), #{})},
        {meta, Meta}
    ],
    #{
        'operationId' => delayed_messages,
        get => #{
            tags => ?API_TAG_MQTT,
            description => ?DESC(list_api),
            parameters => [
                ref(emqx_dashboard_swagger, page),
                ref(emqx_dashboard_swagger, limit)
            ],
            responses => #{200 => Listing}
        }
    }.

%% The `node' and `msgid' path parameters of the single-message endpoint.
message_path_params() ->
    [
        {node, mk(binary(), #{in => path, desc => ?DESC(node)})},
        {msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
    ].

%% The 400 and 404 responses of the single-message endpoint.
message_error_responses() ->
    #{
        400 => emqx_dashboard_swagger:error_codes(
            [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
            ?DESC(bad_msgid_format)
        ),
        404 => emqx_dashboard_swagger:error_codes(
            [?MESSAGE_ID_NOT_FOUND],
            ?DESC(msgid_not_found)
        )
    }.
%% @doc Field lists for the structs referenced from schema/1.
%% "message" is "message_without_payload" followed by a `payload' field.
fields("message") ->
    PayloadField = {payload, with_desc(binary(), ?DESC(payload))},
    fields("message_without_payload") ++ [PayloadField];
fields("message_without_payload") ->
    TopicMeta = #{desc => ?DESC(topic), example => <<"/sys/#">>},
    [
        {msgid, with_desc(integer(), ?DESC(msgid))},
        {node, with_desc(binary(), ?DESC(node))},
        {publish_at, with_desc(binary(), ?DESC(publish_at))},
        {delayed_interval, with_desc(pos_integer(), ?DESC(delayed_interval))},
        {delayed_remaining, with_desc(non_neg_integer(), ?DESC(delayed_remaining))},
        {expected_at, with_desc(binary(), ?DESC(expected_at))},
        {topic, mk(binary(), TopicMeta)},
        {qos, with_desc(emqx_schema:qos(), ?DESC(qos))},
        {from_clientid, with_desc(binary(), ?DESC(from_clientid))},
        {from_username, with_desc(binary(), ?DESC(from_username))}
    ].

%% Builds a field schema of the given type whose only metadata is `desc'.
with_desc(Type, Desc) ->
    mk(Type, #{desc => Desc}).
- %%--------------------------------------------------------------------
- %% HTTP API
- %%--------------------------------------------------------------------
%% @doc Handler for "/mqtt/delayed".
%% GET replies 200 with the value returned by get_status/0; PUT hands the
%% request body to update_config/1 and returns its reply.
status(get, _Request) ->
    CurrentConf = get_status(),
    {200, CurrentConf};
status(put, #{body := NewConf}) ->
    update_config(NewConf).
%% @doc Handler for GET "/mqtt/delayed/messages": replies 200 with whatever
%% emqx_delayed:cluster_list/1 returns for the request's query string.
delayed_messages(get, #{query_string := QueryString}) ->
    Listing = emqx_delayed:cluster_list(QueryString),
    {200, Listing}.
%% @doc Handler for "/mqtt/delayed/messages/:node/:msgid".
%%
%% Both methods first decode the two path bindings; if either fails to
%% decode, with_maybe/2 replies 400 without touching emqx_delayed.
%%
%% GET replies 200 with the message (payload base64-encoded, or replaced by
%% ?PAYLOAD_TOO_LARGE when it exceeds ?MAX_PAYLOAD_LENGTH bytes).
%% DELETE replies 204 on success.
%% Both reply 404 when the message is not found and 400 on `{badrpc, _}'.
delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
    with_maybe(
        decode_bindings(NodeBin, HexId),
        fun(Node, Id) ->
            case emqx_delayed:get_delayed_message(Node, Id) of
                {ok, Message} ->
                    {200, render_payload(Message)};
                {error, not_found} ->
                    {404, generate_http_code_map(not_found, HexId)};
                {badrpc, _} ->
                    {400, generate_http_code_map(invalid_node, NodeBin)}
            end
        end
    );
delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
    with_maybe(
        decode_bindings(NodeBin, HexId),
        fun(Node, Id) ->
            case emqx_delayed:delete_delayed_message(Node, Id) of
                ok ->
                    {204};
                {error, not_found} ->
                    %% Report the hex string the client sent, not the decoded
                    %% id, which is not printable text.
                    {404, generate_http_code_map(not_found, HexId)};
                %% NOTE(review): assumes delete_delayed_message/2 reports RPC
                %% failures as {badrpc, _} the way get_delayed_message/2
                %% does - confirm. The schema already advertises
                %% INVALID_NODE for DELETE.
                {badrpc, _} ->
                    {400, generate_http_code_map(invalid_node, NodeBin)}
            end
        end
    ).

%% Decodes the `node' and `msgid' path bindings into the list expected by
%% with_maybe/2. The node name comes straight from the URL, so it is only
%% ever converted to an atom that already exists: creating atoms from
%% request data would let a client fill the atom table.
decode_bindings(NodeBin, HexId) ->
    [
        make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1),
        make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1)
    ].

%% Replaces the message payload with its base64 encoding, or with
%% ?PAYLOAD_TOO_LARGE when it is longer than ?MAX_PAYLOAD_LENGTH bytes.
render_payload(Message) ->
    Payload = maps:get(payload, Message),
    Rendered =
        case byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of
            true -> ?PAYLOAD_TOO_LARGE;
            false -> base64:encode(Payload)
        end,
    Message#{payload => Rendered}.
- %%--------------------------------------------------------------------
%% Internal functions
- %%--------------------------------------------------------------------
%% Reads the value stored under the [delayed] configuration path, falling
%% back to an empty map.
get_status() ->
    ConfPath = [delayed],
    emqx_conf:get(ConfPath, #{}).
%% Validates the request body and, when it passes, applies it through
%% update_config_/1. A validation failure is turned into a 400 reply that
%% carries the validator's code and message.
update_config(Config) ->
    case generate_config(Config) of
        %% Bind the result to a fresh variable: matching against the
        %% already-bound Config would crash with case_clause as soon as a
        %% validator returned an adjusted configuration.
        {ok, Validated} ->
            update_config_(Validated);
        {error, {Code, Message}} ->
            {400, #{code => Code, message => Message}}
    end.
%% Runs every configuration validator over Config; see generate_config/2.
generate_config(Config) ->
    Validators = [fun generate_max_delayed_messages/1],
    generate_config(Config, Validators).
%% Threads Config through the validator funs from left to right.
%% Each fun returns {ok, NewConfig} or {error, CodeMessage}. The first error
%% stops the run and is returned unchanged; otherwise the result is
%% {ok, Config} as produced by the last validator.
generate_config(Config, []) ->
    {ok, Config};
generate_config(Config, [Fun | Tail]) ->
    case Fun(Config) of
        %% A fresh variable lets a validator hand back an adjusted config;
        %% matching on the bound Config would crash with case_clause then.
        {ok, NewConfig} ->
            generate_config(NewConfig, Tail);
        {error, _CodeMessage} = Error ->
            Error
    end.
%% Validator for the <<"max_delayed_messages">> key: a value below zero is
%% rejected with a ?BAD_REQUEST error; any other input, including one
%% without the key, is accepted unchanged.
generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
    {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
generate_max_delayed_messages(Config) ->
    {ok, Config}.
%% @doc Applies Config through emqx_delayed:update_config/1 (exported for
%% RPC). Replies 200 with the resulting raw configuration, or 500 with
%% ?INTERNAL_ERROR and a message that includes the failure reason.
update_config_(Config) ->
    case emqx_delayed:update_config(Config) of
        {error, Reason} ->
            Text = io_lib:format("Update config failed ~p", [Reason]),
            {500, ?INTERNAL_ERROR, iolist_to_binary(Text)};
        {ok, #{raw_config := NewDelayed}} ->
            {200, NewDelayed}
    end.
%% Builds the `#{code, message}' body of an error response. The first
%% argument selects the error code and message template; the second is the
%% message id or node name, as text, that gets embedded in the message.
generate_http_code_map(id_schema_error, Id) ->
    error_body(?MESSAGE_ID_SCHEMA_ERROR, "Message ID ~s schema error", Id);
generate_http_code_map(not_found, Id) ->
    error_body(?MESSAGE_ID_NOT_FOUND, "Message ID ~s not found", Id);
generate_http_code_map(invalid_node, Node) ->
    error_body(?INVALID_NODE, "The node name ~s is invalid", Node).

%% Formats Subject into Template and pairs the resulting binary with Code.
error_body(Code, Template, Subject) ->
    Text = io_lib:format(Template, [Subject]),
    #{code => Code, message => iolist_to_binary(Text)}.
%% Applies Fun to X and returns its result. If Fun raises anything (error,
%% exit or throw), returns {left, X, Error} instead, so the caller can report
%% which input failed and why.
make_maybe(X, Error, Fun) ->
    try
        Fun(X)
    catch
        _Class:_Reason ->
            {left, X, Error}
    end.
%% Takes a list of make_maybe/3 results. If it contains a {left, X, Error}
%% entry, replies 400 with the error body for the first such entry and does
%% not call Cont. Otherwise calls Cont with the list elements, in order, as
%% its arguments.
with_maybe(Maybes, Cont) ->
    IsLeft = fun
        ({left, _, _}) -> true;
        (_) -> false
    end,
    case lists:search(IsLeft, Maybes) of
        {value, {left, X, Error}} ->
            {400, generate_http_code_map(Error, X)};
        false ->
            erlang:apply(Cont, Maybes)
    end.
|