%% emqx_delayed_api.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_delayed_api).
  17. -behaviour(minirest_api).
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include("emqx_modules.hrl").
  21. -import(hoconsc, [mk/2, ref/1, ref/2]).
  22. -export([
  23. status/2,
  24. delayed_messages/2,
  25. delayed_message/2,
  26. delayed_message_topic/2
  27. ]).
  28. -export([
  29. paths/0,
  30. fields/1,
  31. schema/1,
  32. namespace/0
  33. ]).
  34. %% for rpc
  35. -export([update_config_/1]).
  36. -export([api_spec/0]).
  37. -define(MAX_PAYLOAD_LENGTH, 2048).
  38. -define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>).
  39. -define(INTERNAL_ERROR, 'INTERNAL_ERROR').
  40. -define(BAD_REQUEST, 'BAD_REQUEST').
  41. -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
  42. -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
  43. -define(INVALID_NODE, 'INVALID_NODE').
  44. -define(INVALID_TOPIC, 'INVALID_TOPIC_NAME').
  45. -define(MESSAGE_NOT_FOUND, 'MESSAGE_NOT_FOUND').
  46. namespace() -> undefined.
  47. api_spec() ->
  48. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
  49. paths() ->
  50. [
  51. "/mqtt/delayed",
  52. "/mqtt/delayed/messages",
  53. "/mqtt/delayed/messages/:topic",
  54. "/mqtt/delayed/messages/:node/:msgid"
  55. ].
  56. schema("/mqtt/delayed") ->
  57. #{
  58. 'operationId' => status,
  59. get => #{
  60. tags => ?API_TAG_MQTT,
  61. description => ?DESC(view_status_api),
  62. responses => #{
  63. 200 => ref(emqx_modules_schema, "delayed")
  64. }
  65. },
  66. put => #{
  67. tags => ?API_TAG_MQTT,
  68. description => ?DESC(update_api),
  69. 'requestBody' => ref(emqx_modules_schema, "delayed"),
  70. responses => #{
  71. 200 => mk(
  72. ref(emqx_modules_schema, "delayed"),
  73. #{desc => ?DESC(update_success)}
  74. ),
  75. 400 => emqx_dashboard_swagger:error_codes(
  76. [?BAD_REQUEST],
  77. ?DESC(illegality_limit)
  78. )
  79. }
  80. }
  81. };
  82. schema("/mqtt/delayed/messages/:topic") ->
  83. #{
  84. 'operationId' => delayed_message_topic,
  85. delete => #{
  86. tags => ?API_TAG_MQTT,
  87. description => ?DESC(delete_api),
  88. parameters => [
  89. {topic,
  90. mk(
  91. binary(),
  92. #{in => path, desc => ?DESC(topic)}
  93. )}
  94. ],
  95. responses => #{
  96. 204 => <<"Delete delayed message success">>,
  97. 400 => emqx_dashboard_swagger:error_codes(
  98. [?INVALID_TOPIC],
  99. ?DESC(bad_topic_name)
  100. ),
  101. 404 => emqx_dashboard_swagger:error_codes(
  102. [?MESSAGE_NOT_FOUND],
  103. ?DESC(no_delayed_message)
  104. )
  105. }
  106. }
  107. };
  108. schema("/mqtt/delayed/messages/:node/:msgid") ->
  109. #{
  110. 'operationId' => delayed_message,
  111. get => #{
  112. tags => ?API_TAG_MQTT,
  113. description => ?DESC(get_message_api),
  114. parameters => [
  115. {node,
  116. mk(
  117. binary(),
  118. #{in => path, desc => ?DESC(node)}
  119. )},
  120. {msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
  121. ],
  122. responses => #{
  123. 200 => ref("message_without_payload"),
  124. 400 => emqx_dashboard_swagger:error_codes(
  125. [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
  126. ?DESC(bad_msgid_format)
  127. ),
  128. 404 => emqx_dashboard_swagger:error_codes(
  129. [?MESSAGE_ID_NOT_FOUND],
  130. ?DESC(msgid_not_found)
  131. )
  132. }
  133. },
  134. delete => #{
  135. tags => ?API_TAG_MQTT,
  136. description => ?DESC(delete_api),
  137. parameters => [
  138. {node,
  139. mk(
  140. binary(),
  141. #{in => path, desc => ?DESC(node)}
  142. )},
  143. {msgid, mk(binary(), #{in => path, desc => ?DESC(msgid)})}
  144. ],
  145. responses => #{
  146. 204 => <<"Delete delayed message success">>,
  147. 400 => emqx_dashboard_swagger:error_codes(
  148. [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE],
  149. ?DESC(bad_msgid_format)
  150. ),
  151. 404 => emqx_dashboard_swagger:error_codes(
  152. [?MESSAGE_ID_NOT_FOUND],
  153. ?DESC(msgid_not_found)
  154. )
  155. }
  156. }
  157. };
  158. schema("/mqtt/delayed/messages") ->
  159. #{
  160. 'operationId' => delayed_messages,
  161. get => #{
  162. tags => ?API_TAG_MQTT,
  163. description => ?DESC(list_api),
  164. parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
  165. responses => #{
  166. 200 =>
  167. [
  168. {data, mk(hoconsc:array(ref("message")), #{})},
  169. {meta, [
  170. {page, mk(pos_integer(), #{desc => ?DESC(view_page)})},
  171. {limit, mk(pos_integer(), #{desc => ?DESC(view_limit)})},
  172. {count, mk(non_neg_integer(), #{desc => ?DESC(count)})}
  173. ]}
  174. ]
  175. }
  176. }
  177. }.
  178. fields("message_without_payload") ->
  179. [
  180. {msgid, mk(integer(), #{desc => ?DESC(msgid)})},
  181. {node, mk(binary(), #{desc => ?DESC(node)})},
  182. {publish_at, mk(binary(), #{desc => ?DESC(publish_at)})},
  183. {delayed_interval, mk(pos_integer(), #{desc => ?DESC(delayed_interval)})},
  184. {delayed_remaining, mk(non_neg_integer(), #{desc => ?DESC(delayed_remaining)})},
  185. {expected_at, mk(binary(), #{desc => ?DESC(expected_at)})},
  186. {topic, mk(binary(), #{desc => ?DESC(topic), example => <<"/sys/#">>})},
  187. {qos, mk(emqx_schema:qos(), #{desc => ?DESC(qos)})},
  188. {from_clientid, mk(binary(), #{desc => ?DESC(from_clientid)})},
  189. {from_username, mk(binary(), #{desc => ?DESC(from_username)})}
  190. ];
  191. fields("message") ->
  192. fields("message_without_payload") ++
  193. [{payload, mk(binary(), #{desc => ?DESC(payload)})}].
  194. %%--------------------------------------------------------------------
  195. %% HTTP API
  196. %%--------------------------------------------------------------------
  197. status(get, _Params) ->
  198. {200, get_status()};
  199. status(put, #{body := Body}) ->
  200. update_config(Body).
  201. delayed_messages(get, #{query_string := Qs}) ->
  202. {200, emqx_delayed:cluster_list(Qs)}.
  203. delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
  204. MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1),
  205. MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
  206. with_maybe(
  207. [MaybeNode, MaybeId],
  208. fun(Node, Id) ->
  209. case emqx_delayed:get_delayed_message(Node, Id) of
  210. {ok, Message} ->
  211. Payload = maps:get(payload, Message),
  212. case erlang:byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of
  213. true ->
  214. {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
  215. _ ->
  216. {200, Message#{payload => base64:encode(Payload)}}
  217. end;
  218. {error, not_found} ->
  219. {404, generate_http_code_map(not_found, HexId)};
  220. {badrpc, _} ->
  221. {400, generate_http_code_map(invalid_node, NodeBin)}
  222. end
  223. end
  224. );
  225. delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
  226. MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
  227. MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
  228. with_maybe(
  229. [MaybeNode, MaybeId],
  230. fun(Node, Id) ->
  231. case emqx_delayed:delete_delayed_message(Node, Id) of
  232. ok ->
  233. {204};
  234. {error, not_found} ->
  235. {404, generate_http_code_map(not_found, Id)}
  236. end
  237. end
  238. ).
  239. delayed_message_topic(delete, #{bindings := #{topic := Topic}}) ->
  240. MaybeTopic = make_maybe(Topic, invalid_topic_name, fun validate_topic_name/1),
  241. with_maybe(
  242. [MaybeTopic],
  243. fun(TopicName) ->
  244. case emqx_delayed:delete_delayed_messages_by_topic_name(TopicName) of
  245. ok ->
  246. {204};
  247. {error, not_found} ->
  248. {404, generate_http_code_map(message_not_found, TopicName)}
  249. end
  250. end
  251. ).
  252. %%--------------------------------------------------------------------
  253. %% internal function
  254. %%--------------------------------------------------------------------
  255. get_status() ->
  256. emqx_conf:get([delayed], #{}).
  257. update_config(Config) ->
  258. case generate_config(Config) of
  259. {ok, Config} ->
  260. update_config_(Config);
  261. {error, {Code, Message}} ->
  262. {400, #{code => Code, message => Message}}
  263. end.
  264. generate_config(Config) ->
  265. generate_config(Config, [fun generate_max_delayed_messages/1]).
  266. generate_config(Config, []) ->
  267. {ok, Config};
  268. generate_config(Config, [Fun | Tail]) ->
  269. case Fun(Config) of
  270. {ok, Config} ->
  271. generate_config(Config, Tail);
  272. {error, CodeMessage} ->
  273. {error, CodeMessage}
  274. end.
  275. generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 ->
  276. {ok, Config};
  277. generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
  278. {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
  279. generate_max_delayed_messages(Config) ->
  280. {ok, Config}.
  281. update_config_(Config) ->
  282. case emqx_delayed:update_config(Config) of
  283. {ok, #{raw_config := NewDelayed}} ->
  284. {200, NewDelayed};
  285. {error, Reason} ->
  286. Message = list_to_binary(
  287. io_lib:format("Update config failed ~p", [Reason])
  288. ),
  289. {500, ?INTERNAL_ERROR, Message}
  290. end.
  291. generate_http_code_map(id_schema_error, Id) ->
  292. #{
  293. code => ?MESSAGE_ID_SCHEMA_ERROR,
  294. message =>
  295. iolist_to_binary(io_lib:format("Message ID ~s schema error", [Id]))
  296. };
  297. generate_http_code_map(not_found, Id) ->
  298. #{
  299. code => ?MESSAGE_ID_NOT_FOUND,
  300. message =>
  301. iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
  302. };
  303. generate_http_code_map(message_not_found, Topic) ->
  304. #{
  305. code => ?MESSAGE_NOT_FOUND,
  306. message =>
  307. iolist_to_binary(io_lib:format("Not found messages for ~s", [Topic]))
  308. };
  309. generate_http_code_map(invalid_topic_name, Topic) ->
  310. #{
  311. code => ?INVALID_TOPIC,
  312. message =>
  313. iolist_to_binary(io_lib:format("The topic name ~s is invalid", [Topic]))
  314. };
  315. generate_http_code_map(invalid_node, Node) ->
  316. #{
  317. code => ?INVALID_NODE,
  318. message =>
  319. iolist_to_binary(io_lib:format("The node name ~s is invalid", [Node]))
  320. }.
  321. make_maybe(X, Error, Fun) ->
  322. try Fun(X) of
  323. Right ->
  324. Right
  325. catch
  326. _:_ ->
  327. {left, X, Error}
  328. end.
  329. validate_topic_name(Topic) ->
  330. case emqx_topic:wildcard(Topic) of
  331. true ->
  332. error(badarg);
  333. false ->
  334. Topic
  335. end.
  336. with_maybe(Maybes, Cont) ->
  337. with_maybe(Maybes, Cont, []).
  338. with_maybe([], Cont, Rights) ->
  339. erlang:apply(Cont, lists:reverse(Rights));
  340. with_maybe([{left, X, Error} | _], _Cont, _Rights) ->
  341. {400, generate_http_code_map(Error, X)};
  342. with_maybe([Right | T], Cont, Rights) ->
  343. with_maybe(T, Cont, [Right | Rights]).