emqx_delayed_api.erl 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  -module(emqx_delayed_api).

  -behaviour(minirest_api).

  -include_lib("typerefl/include/types.hrl").

  -import(hoconsc, [mk/2, ref/1, ref/2]).

  %% Payloads larger than ?MAX_PAYLOAD_LENGTH are replaced by
  %% ?PAYLOAD_TOO_LARGE in the single-message GET response.
  -define(MAX_PAYLOAD_LENGTH, 2048).
  -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').

  %% HTTP handlers; the names match the operationId values in schema/1.
  -export([status/2
          , delayed_messages/2
          , delayed_message/2
          ]).

  -export([paths/0, fields/1, schema/1]).

  %% for rpc
  %% update_config_/2 is the function actually invoked on remote nodes
  %% (rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]) from inside
  %% update_config_/2), so it must be exported: rpc can only apply exported
  %% functions, and without this export every remote update fails with undef.
  -export([update_config_/1, update_config_/2]).

  -export([api_spec/0]).

  %% Error codes placed in the `code' field of HTTP error bodies.
  -define(ALREADY_ENABLED, 'ALREADY_ENABLED').
  -define(ALREADY_DISABLED, 'ALREADY_DISABLED').
  -define(BAD_REQUEST, 'BAD_REQUEST').
  -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
  -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
  %% Returns whatever emqx_dashboard_swagger:spec/1 builds for this module.
  api_spec() ->
      Module = ?MODULE,
      emqx_dashboard_swagger:spec(Module).
  %% Lists the HTTP paths served by this module; each one has a matching
  %% schema/1 clause.
  paths() ->
      Base = "/mqtt/delayed",
      Messages = Base ++ "/messages",
      [Base, Messages, Messages ++ "/:msgid"].
  %% Describes, per path, the operationId (the handler function in this module)
  %% and the request/response schema of every supported HTTP method.

  %% Status endpoint: read or change the delayed-publish settings.
  schema("/mqtt/delayed") ->
      DelayedRef = ref(emqx_modules_schema, "delayed"),
      Get = #{ tags => [<<"mqtt">>]
             , description => <<"Get delayed status">>
             , summary => <<"Get delayed status">>
             , responses => #{200 => DelayedRef}
             },
      PutResponses =
          #{ 200 => mk(DelayedRef, #{desc => <<"Enable or disable delayed successfully">>})
           , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Max limit illegality">>)
           },
      Put = #{ tags => [<<"mqtt">>]
             , description => <<"Enable or disable delayed, set max delayed messages">>
             , requestBody => DelayedRef
             , responses => PutResponses
             },
      #{operationId => status, get => Get, put => Put};
  %% Single-message endpoint: fetch or delete one delayed message by id.
  schema("/mqtt/delayed/messages/:msgid") ->
      %% The path parameter and the two error responses are shared by GET and DELETE.
      MsgIdParam = {msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})},
      BadFormat = emqx_dashboard_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR],
                                                     <<"Bad MsgId format">>),
      NotFound = emqx_dashboard_swagger:error_codes([?MESSAGE_ID_NOT_FOUND],
                                                    <<"MsgId not found">>),
      Get = #{ tags => [<<"mqtt">>]
             , description => <<"Get delayed message">>
             , parameters => [MsgIdParam]
             , responses => #{ 200 => ref("message_without_payload")
                             , 400 => BadFormat
                             , 404 => NotFound
                             }
             },
      Delete = #{ tags => [<<"mqtt">>]
                , description => <<"Delete delayed message">>
                , parameters => [MsgIdParam]
                , responses => #{ 200 => <<"Delete delayed message success">>
                                , 400 => BadFormat
                                , 404 => NotFound
                                }
                },
      #{operationId => delayed_message, get => Get, delete => Delete};
  %% Collection endpoint: paginated listing of delayed messages.
  schema("/mqtt/delayed/messages") ->
      PageParams = [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
      Meta = [ {page, mk(integer(), #{})}
             , {limit, mk(integer(), #{})}
             , {count, mk(integer(), #{})}
             ],
      Listing = [ {data, mk(hoconsc:array(ref("message")), #{})}
                , {meta, Meta}
                ],
      Get = #{ tags => [<<"mqtt">>]
             , description => <<"List delayed messages">>
             , parameters => PageParams
             , responses => #{200 => Listing}
             },
      #{operationId => delayed_messages, get => Get}.
  %% Field definitions for the schema structs referenced by ref/1 in schema/1.

  %% Metadata of a delayed message, without its payload.
  fields("message_without_payload") ->
      %% Most fields only carry a description; build those through one helper.
      Described = fun(Name, Type, Desc) -> {Name, mk(Type, #{desc => Desc})} end,
      [ Described(msgid, integer(), <<"Message Id (MQTT message id hash)">>)
      , Described(publish_at, binary(), <<"Client publish message time, rfc 3339">>)
      , Described(delayed_interval, integer(), <<"Delayed interval, second">>)
      , Described(delayed_remaining, integer(), <<"Delayed remaining, second">>)
      , Described(expected_at, binary(), <<"Expect publish time, rfc 3339">>)
      , {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}
      , Described(qos, binary(), <<"QoS">>)
      , Described(from_clientid, binary(), <<"From ClientId">>)
      , Described(from_username, binary(), <<"From Username">>)
      ];
  %% Same fields as above plus the payload field.
  fields("message") ->
      DescText = io_lib:format(
                   "Payload, base64 encode. Payload will be ~p if length large than ~p",
                   [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
      PayloadField = {payload, mk(binary(), #{desc => iolist_to_binary(DescText)})},
      lists:append(fields("message_without_payload"), [PayloadField]).
  120. %%--------------------------------------------------------------------
  121. %% HTTP API
  122. %%--------------------------------------------------------------------
  %% Handler for "/mqtt/delayed".
  %% GET answers 200 with the current delayed configuration; PUT hands the
  %% request body to update_config/1 and returns its reply unchanged.
  status(get, _Request) ->
      Current = get_status(),
      {200, Current};
  status(put, #{body := NewConfig}) ->
      update_config(NewConfig).
  %% Handler for "/mqtt/delayed/messages": answers 200 with the result of
  %% emqx_delayed:list/1 applied to the request's query string.
  delayed_messages(get, #{query_string := Query}) ->
      Listing = emqx_delayed:list(Query),
      {200, Listing}.
  %% Handler for "/mqtt/delayed/messages/:msgid".
  %%
  %% GET    -> {200, Message}; a payload longer than ?MAX_PAYLOAD_LENGTH bytes is
  %%           replaced by ?PAYLOAD_TOO_LARGE.
  %% DELETE -> {200} once the message has been looked up and deleted.
  %% Both   -> {400, ErrorMap} when emqx_delayed reports id_schema_error,
  %%           {404, ErrorMap} when it reports not_found.
  delayed_message(get, #{bindings := #{msgid := Id}}) ->
      case emqx_delayed:get_delayed_message(Id) of
          {ok, Message} ->
              Payload = maps:get(payload, Message),
              %% byte_size/1 rather than size/1: the payload is measured in
              %% bytes, and size/1 would also silently accept a tuple.
              case byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of
                  true ->
                      {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
                  false ->
                      %% Payload was read from Message, so it is returned as is.
                      {200, Message}
              end;
          {error, id_schema_error} ->
              {400, generate_http_code_map(id_schema_error, Id)};
          {error, not_found} ->
              {404, generate_http_code_map(not_found, Id)}
      end;
  delayed_message(delete, #{bindings := #{msgid := Id}}) ->
      %% Look the message up first so that a malformed or unknown id yields
      %% 400/404 rather than a blind delete.
      case emqx_delayed:get_delayed_message(Id) of
          {ok, _Message} ->
              _ = emqx_delayed:delete_delayed_message(Id),
              {200};
          {error, id_schema_error} ->
              {400, generate_http_code_map(id_schema_error, Id)};
          {error, not_found} ->
              {404, generate_http_code_map(not_found, Id)}
      end.
  154. %%--------------------------------------------------------------------
  155. %% internal function
  156. %%--------------------------------------------------------------------
  %% Reads the `delayed' configuration via emqx_conf, defaulting to an empty map.
  get_status() ->
      KeyPath = [delayed],
      Default = #{},
      emqx_conf:get(KeyPath, Default).
  %% Validates the request body and, when valid, applies it on the running nodes.
  %%
  %% Returns {200, Status} with the configuration read back after the update, or
  %% {400, #{code => Code, message => Message}} when validation rejects the body.
  update_config(Config) ->
      case generate_config(Config) of
          %% Bind a fresh variable: matching against the already-bound Config
          %% would raise case_clause as soon as validation returns a config
          %% that differs from the input.
          {ok, Validated} ->
              _ = update_config_(Validated),
              {200, get_status()};
          {error, {Code, Message}} ->
              {400, #{code => Code, message => Message}}
      end.
  %% Runs the request body through the list of validators used by this API
  %% (currently only the max_delayed_messages check).
  generate_config(Config) ->
      Validators = [fun generate_max_delayed_messages/1],
      generate_config(Config, Validators).
  %% Threads Config through each validator fun in order.
  %%
  %% Every fun takes the current config and returns {ok, NewConfig} or
  %% {error, CodeMessage}. The first error stops the pipeline and is returned
  %% unchanged; otherwise the result is {ok, FinalConfig}.
  generate_config(Config, []) ->
      {ok, Config};
  generate_config(Config, [Fun | Tail]) ->
      case Fun(Config) of
          %% Bind the validator's result to a new variable so a validator may
          %% return a normalised config; matching on the bound Config would
          %% crash with case_clause in that case.
          {ok, NewConfig} ->
              generate_config(NewConfig, Tail);
          {error, _CodeMessage} = Error ->
              Error
      end.
  %% Validates the optional <<"max_delayed_messages">> entry of the request body.
  %%
  %% Returns {ok, Config} when the key is absent or holds a non-negative integer,
  %% and {error, {?BAD_REQUEST, Message}} for any other value.
  %%
  %% The is_integer/1 test is required: in Erlang's term order every atom, binary
  %% or list compares greater than every number, so a bare `Max >= 0' guard would
  %% accept values such as <<"abc">> or null.
  generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max})
    when is_integer(Max), Max >= 0 ->
      {ok, Config};
  generate_max_delayed_messages(#{<<"max_delayed_messages">> := _Invalid}) ->
      {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
  generate_max_delayed_messages(Config) ->
      {ok, Config}.
  %% Applies Config on every node returned by mria_mnesia:running_nodes/0.
  %%
  %% Always returns ok. A node whose update yields {error, Reason} (for example a
  %% failed RPC) is logged instead of being silently ignored, and the remaining
  %% nodes are still updated.
  update_config_(Config) ->
      lists:foreach(
        fun(Node) ->
                case update_config_(Node, Config) of
                    {error, Reason} ->
                        logger:warning(#{ msg => "update_delayed_config_failed"
                                        , node => Node
                                        , reason => Reason
                                        });
                    _Other ->
                        ok
                end
        end,
        mria_mnesia:running_nodes()).
  %% Applies Config on Node.
  %%
  %% For any other node the call is forwarded through rpc_call/4 to this same
  %% function on that node. NOTE(review): the rpc target is update_config_/2,
  %% which must be exported for the remote call to resolve.
  %%
  %% For the local node the config is handed to emqx_delayed:update_config/1,
  %% then the optional <<"enable">> and <<"max_delayed_messages">> entries are
  %% applied. The local clause returns `ignore' when no maximum was given and
  %% `ok' otherwise.
  update_config_(Node, Config) when Node =/= node() ->
      rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]);
  update_config_(_LocalNode, Config) ->
      _ = emqx_delayed:update_config(Config),
      EnableFlag = maps:get(<<"enable">>, Config, undefined),
      MaxMessages = maps:get(<<"max_delayed_messages">>, Config, undefined),
      case EnableFlag of
          true ->
              emqx_delayed:enable();
          false ->
              emqx_delayed:disable();
          undefined ->
              ignore
      end,
      case MaxMessages of
          undefined ->
              ignore;
          _ ->
              ok = emqx_delayed:set_max_delayed_messages(MaxMessages)
      end.
  %% Builds the JSON error body (#{code, message}) for a failed message lookup.
  %% The id is rendered with ~p, so a binary id appears in <<"...">> form.
  generate_http_code_map(id_schema_error, Id) ->
      Text = io_lib:format("Message ID ~p schema error", [Id]),
      #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => iolist_to_binary(Text)};
  generate_http_code_map(not_found, Id) ->
      Text = io_lib:format("Message ID ~p not found", [Id]),
      #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(Text)}.
  %% Calls Module:Fun(Args...) on Node through rpc:call/4.
  %% A {badrpc, Reason} reply is turned into {error, Reason}; any other reply is
  %% returned untouched.
  rpc_call(Node, Module, Fun, Args) ->
      normalize_rpc_reply(rpc:call(Node, Module, Fun, Args)).

  %% Maps rpc's failure tuple onto the {error, Reason} convention.
  normalize_rpc_reply({badrpc, Reason}) ->
      {error, Reason};
  normalize_rpc_reply(Reply) ->
      Reply.
  214. end.