emqx_delayed_api.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_delayed_api).
  17. -behaviour(minirest_api).
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include("emqx_modules.hrl").
  21. -import(hoconsc, [mk/2, ref/1, ref/2]).
  22. -export([
  23. status/2,
  24. delayed_messages/2,
  25. delayed_message/2
  26. ]).
  27. -export([
  28. paths/0,
  29. fields/1,
  30. schema/1
  31. ]).
  32. %% for rpc
  33. -export([update_config_/1]).
  34. -export([api_spec/0]).
  35. -define(MAX_PAYLOAD_LENGTH, 2048).
  36. -define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>).
  37. -define(INTERNAL_ERROR, 'INTERNAL_ERROR').
  38. -define(BAD_REQUEST, 'BAD_REQUEST').
  39. -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
  40. -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
  41. -define(INVALID_NODE, 'INVALID_NODE').
  42. api_spec() ->
  43. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
  44. paths() ->
  45. [
  46. "/mqtt/delayed",
  47. "/mqtt/delayed/messages",
  48. "/mqtt/delayed/messages/:node/:msgid"
  49. ].
  50. schema("/mqtt/delayed") ->
  51. #{
  52. 'operationId' => status,
  53. get => #{
  54. tags => ?API_TAG_MQTT,
  55. description => ?DESC(view_status_api),
  56. responses => #{
  57. 200 => ref(emqx_modules_schema, "delayed")
  58. }
  59. },
  60. put => #{
  61. tags => ?API_TAG_MQTT,
  62. description => ?DESC(update_api),
  63. 'requestBody' => ref(emqx_modules_schema, "delayed"),
  64. responses => #{
  65. 200 => mk(
  66. ref(emqx_modules_schema, "delayed"),
  67. #{desc => ?DESC(update_success)}
  68. ),
  69. 400 => emqx_dashboard_swagger:error_codes(
  70. [?BAD_REQUEST],
  71. ?DESC(illegality_limit)
  72. )
  73. }
  74. }
  75. };
  76. schema("/mqtt/delayed/messages/:node/:msgid") ->
  77. #{
  78. 'operationId' => delayed_message,
  79. get => #{
  80. tags => ?API_TAG_MQTT,
  81. description => ?DESC(get_message_api),
  82. parameters => [
  83. {node,
  84. mk(
  85. binary(),
  86. #{in => path, desc => ?DESC(node)}
  87. )},
  88. {msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
  89. ],
  90. responses => #{
  91. 200 => ref("message_without_payload"),
  92. 400 => emqx_dashboard_swagger:error_codes(
  93. [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
  94. ?DESC(bad_msgid_format)
  95. ),
  96. 404 => emqx_dashboard_swagger:error_codes(
  97. [?MESSAGE_ID_NOT_FOUND],
  98. ?DESC(msgid_not_found)
  99. )
  100. }
  101. },
  102. delete => #{
  103. tags => ?API_TAG_MQTT,
  104. description => ?DESC(delete_api),
  105. parameters => [
  106. {node,
  107. mk(
  108. binary(),
  109. #{in => path, desc => ?DESC(node)}
  110. )},
  111. {msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
  112. ],
  113. responses => #{
  114. 204 => <<"Delete delayed message success">>,
  115. 400 => emqx_dashboard_swagger:error_codes(
  116. [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
  117. ?DESC(bad_msgid_format)
  118. ),
  119. 404 => emqx_dashboard_swagger:error_codes(
  120. [?MESSAGE_ID_NOT_FOUND],
  121. ?DESC(msgid_not_found)
  122. )
  123. }
  124. }
  125. };
  126. schema("/mqtt/delayed/messages") ->
  127. #{
  128. 'operationId' => delayed_messages,
  129. get => #{
  130. tags => ?API_TAG_MQTT,
  131. description => ?DESC(list_api),
  132. parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
  133. responses => #{
  134. 200 =>
  135. [
  136. {data, mk(hoconsc:array(ref("message")), #{})},
  137. {meta, [
  138. {page, mk(pos_integer(), #{desc => ?DESC(view_page)})},
  139. {limit, mk(pos_integer(), #{desc => ?DESC(view_limit)})},
  140. {count, mk(non_neg_integer(), #{desc => ?DESC(count)})}
  141. ]}
  142. ]
  143. }
  144. }
  145. }.
  146. fields("message_without_payload") ->
  147. [
  148. {msgid, mk(integer(), #{desc => ?DESC(msgid)})},
  149. {node, mk(binary(), #{desc => ?DESC(node)})},
  150. {publish_at, mk(binary(), #{desc => ?DESC(publish_at)})},
  151. {delayed_interval, mk(pos_integer(), #{desc => ?DESC(delayed_interval)})},
  152. {delayed_remaining, mk(non_neg_integer(), #{desc => ?DESC(delayed_remaining)})},
  153. {expected_at, mk(binary(), #{desc => ?DESC(expected_at)})},
  154. {topic, mk(binary(), #{desc => ?DESC(topic), example => <<"/sys/#">>})},
  155. {qos, mk(emqx_schema:qos(), #{desc => ?DESC(qos)})},
  156. {from_clientid, mk(binary(), #{desc => ?DESC(from_clientid)})},
  157. {from_username, mk(binary(), #{desc => ?DESC(from_username)})}
  158. ];
  159. fields("message") ->
  160. fields("message_without_payload") ++
  161. [{payload, mk(binary(), #{desc => ?DESC(payload)})}].
  162. %%--------------------------------------------------------------------
  163. %% HTTP API
  164. %%--------------------------------------------------------------------
  165. status(get, _Params) ->
  166. {200, get_status()};
  167. status(put, #{body := Body}) ->
  168. update_config(Body).
  169. delayed_messages(get, #{query_string := Qs}) ->
  170. {200, emqx_delayed:cluster_list(Qs)}.
  171. delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
  172. MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1),
  173. MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
  174. with_maybe(
  175. [MaybeNode, MaybeId],
  176. fun(Node, Id) ->
  177. case emqx_delayed:get_delayed_message(Node, Id) of
  178. {ok, Message} ->
  179. Payload = maps:get(payload, Message),
  180. case erlang:byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of
  181. true ->
  182. {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
  183. _ ->
  184. {200, Message#{payload => base64:encode(Payload)}}
  185. end;
  186. {error, not_found} ->
  187. {404, generate_http_code_map(not_found, HexId)};
  188. {badrpc, _} ->
  189. {400, generate_http_code_map(invalid_node, NodeBin)}
  190. end
  191. end
  192. );
  193. delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
  194. MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
  195. MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
  196. with_maybe(
  197. [MaybeNode, MaybeId],
  198. fun(Node, Id) ->
  199. case emqx_delayed:delete_delayed_message(Node, Id) of
  200. ok ->
  201. {204};
  202. {error, not_found} ->
  203. {404, generate_http_code_map(not_found, Id)}
  204. end
  205. end
  206. ).
  207. %%--------------------------------------------------------------------
  208. %% internal function
  209. %%--------------------------------------------------------------------
  210. get_status() ->
  211. emqx_conf:get([delayed], #{}).
  212. update_config(Config) ->
  213. case generate_config(Config) of
  214. {ok, Config} ->
  215. update_config_(Config);
  216. {error, {Code, Message}} ->
  217. {400, #{code => Code, message => Message}}
  218. end.
  219. generate_config(Config) ->
  220. generate_config(Config, [fun generate_max_delayed_messages/1]).
  221. generate_config(Config, []) ->
  222. {ok, Config};
  223. generate_config(Config, [Fun | Tail]) ->
  224. case Fun(Config) of
  225. {ok, Config} ->
  226. generate_config(Config, Tail);
  227. {error, CodeMessage} ->
  228. {error, CodeMessage}
  229. end.
  230. generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 ->
  231. {ok, Config};
  232. generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
  233. {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
  234. generate_max_delayed_messages(Config) ->
  235. {ok, Config}.
  236. update_config_(Config) ->
  237. case emqx_delayed:update_config(Config) of
  238. {ok, #{raw_config := NewDelayed}} ->
  239. {200, NewDelayed};
  240. {error, Reason} ->
  241. Message = list_to_binary(
  242. io_lib:format("Update config failed ~p", [Reason])
  243. ),
  244. {500, ?INTERNAL_ERROR, Message}
  245. end.
  246. generate_http_code_map(id_schema_error, Id) ->
  247. #{
  248. code => ?MESSAGE_ID_SCHEMA_ERROR,
  249. message =>
  250. iolist_to_binary(io_lib:format("Message ID ~s schema error", [Id]))
  251. };
  252. generate_http_code_map(not_found, Id) ->
  253. #{
  254. code => ?MESSAGE_ID_NOT_FOUND,
  255. message =>
  256. iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
  257. };
  258. generate_http_code_map(invalid_node, Node) ->
  259. #{
  260. code => ?INVALID_NODE,
  261. message =>
  262. iolist_to_binary(io_lib:format("The node name ~s is invalid", [Node]))
  263. }.
  264. make_maybe(X, Error, Fun) ->
  265. try Fun(X) of
  266. Right ->
  267. Right
  268. catch
  269. _:_ ->
  270. {left, X, Error}
  271. end.
  272. with_maybe(Maybes, Cont) ->
  273. with_maybe(Maybes, Cont, []).
  274. with_maybe([], Cont, Rights) ->
  275. erlang:apply(Cont, lists:reverse(Rights));
  276. with_maybe([{left, X, Error} | _], _Cont, _Rights) ->
  277. {400, generate_http_code_map(Error, X)};
  278. with_maybe([Right | T], Cont, Rights) ->
  279. with_maybe(T, Cont, [Right | Rights]).