emqx_delayed_api.erl 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_delayed_api).
  17. -behaviour(minirest_api).
  18. -include_lib("typerefl/include/types.hrl").
  19. -import(hoconsc, [mk/2, ref/1, ref/2]).
  20. -define(MAX_PAYLOAD_LENGTH, 2048).
  21. -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
  22. -export([status/2
  23. , delayed_messages/2
  24. , delayed_message/2
  25. ]).
  26. -export([paths/0, fields/1, schema/1]).
  27. %% for rpc
  28. -export([update_config_/1]).
  29. -export([api_spec/0]).
  30. -define(ALREADY_ENABLED, 'ALREADY_ENABLED').
  31. -define(ALREADY_DISABLED, 'ALREADY_DISABLED').
  32. -define(BAD_REQUEST, 'BAD_REQUEST').
  33. -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
  34. -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
  35. api_spec() ->
  36. emqx_dashboard_swagger:spec(?MODULE).
  37. paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"].
  38. schema("/mqtt/delayed") ->
  39. #{
  40. 'operationId' => status,
  41. get => #{
  42. tags => [<<"mqtt">>],
  43. description => <<"Get delayed status">>,
  44. summary => <<"Get delayed status">>,
  45. responses => #{
  46. 200 => ref(emqx_modules_schema, "delayed")
  47. }
  48. },
  49. put => #{
  50. tags => [<<"mqtt">>],
  51. description => <<"Enable or disable delayed, set max delayed messages">>,
  52. 'requestBody' => ref(emqx_modules_schema, "delayed"),
  53. responses => #{
  54. 200 => mk(ref(emqx_modules_schema, "delayed"),
  55. #{desc => <<"Enable or disable delayed successfully">>}),
  56. 400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST]
  57. , <<"Max limit illegality">>)
  58. }
  59. }
  60. };
  61. schema("/mqtt/delayed/messages/:msgid") ->
  62. #{'operationId' => delayed_message,
  63. get => #{
  64. tags => [<<"mqtt">>],
  65. description => <<"Get delayed message">>,
  66. parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
  67. responses => #{
  68. 200 => ref("message_without_payload"),
  69. 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
  70. , <<"Bad MsgId format">>),
  71. 404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND]
  72. , <<"MsgId not found">>)
  73. }
  74. },
  75. delete => #{
  76. tags => [<<"mqtt">>],
  77. description => <<"Delete delayed message">>,
  78. parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
  79. responses => #{
  80. 204 => <<"Delete delayed message success">>,
  81. 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
  82. , <<"Bad MsgId format">>),
  83. 404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND]
  84. , <<"MsgId not found">>)
  85. }
  86. }
  87. };
  88. schema("/mqtt/delayed/messages") ->
  89. #{
  90. 'operationId' => delayed_messages,
  91. get => #{
  92. tags => [<<"mqtt">>],
  93. description => <<"List delayed messages">>,
  94. parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
  95. responses => #{
  96. 200 =>
  97. [
  98. {data, mk(hoconsc:array(ref("message")), #{})},
  99. {meta, [
  100. {page, mk(integer(), #{})},
  101. {limit, mk(integer(), #{})},
  102. {count, mk(integer(), #{})}
  103. ]}
  104. ]
  105. }
  106. }
  107. }.
  108. fields("message_without_payload") ->
  109. [
  110. {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
  111. {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
  112. {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
  113. {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
  114. {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
  115. {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
  116. {qos, mk(binary(), #{desc => <<"QoS">>})},
  117. {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
  118. {from_username, mk(binary(), #{desc => <<"From Username">>})}
  119. ];
  120. fields("message") ->
  121. PayloadDesc = io_lib:format(
  122. "Payload, base64 encode. Payload will be ~p if length large than ~p",
  123. [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
  124. fields("message_without_payload") ++
  125. [{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}].
  126. %%--------------------------------------------------------------------
  127. %% HTTP API
  128. %%--------------------------------------------------------------------
  129. status(get, _Params) ->
  130. {200, get_status()};
  131. status(put, #{body := Body}) ->
  132. update_config(Body).
  133. delayed_messages(get, #{query_string := Qs}) ->
  134. {200, emqx_delayed:list(Qs)}.
  135. delayed_message(get, #{bindings := #{msgid := Id}}) ->
  136. case emqx_delayed:get_delayed_message(Id) of
  137. {ok, Message} ->
  138. Payload = maps:get(payload, Message),
  139. case size(Payload) > ?MAX_PAYLOAD_LENGTH of
  140. true ->
  141. {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
  142. _ ->
  143. {200, Message#{payload => Payload}}
  144. end;
  145. {error, id_schema_error} ->
  146. {400, generate_http_code_map(id_schema_error, Id)};
  147. {error, not_found} ->
  148. {404, generate_http_code_map(not_found, Id)}
  149. end;
  150. delayed_message(delete, #{bindings := #{msgid := Id}}) ->
  151. case emqx_delayed:get_delayed_message(Id) of
  152. {ok, _Message} ->
  153. _ = emqx_delayed:delete_delayed_message(Id),
  154. {204};
  155. {error, id_schema_error} ->
  156. {400, generate_http_code_map(id_schema_error, Id)};
  157. {error, not_found} ->
  158. {404, generate_http_code_map(not_found, Id)}
  159. end.
  160. %%--------------------------------------------------------------------
  161. %% internal function
  162. %%--------------------------------------------------------------------
  163. get_status() ->
  164. emqx_conf:get([delayed], #{}).
  165. update_config(Config) ->
  166. case generate_config(Config) of
  167. {ok, Config} ->
  168. update_config_(Config),
  169. {200, get_status()};
  170. {error, {Code, Message}} ->
  171. {400, #{code => Code, message => Message}}
  172. end.
  173. generate_config(Config) ->
  174. generate_config(Config, [fun generate_max_delayed_messages/1]).
  175. generate_config(Config, []) ->
  176. {ok, Config};
  177. generate_config(Config, [Fun | Tail]) ->
  178. case Fun(Config) of
  179. {ok, Config} ->
  180. generate_config(Config, Tail);
  181. {error, CodeMessage} ->
  182. {error, CodeMessage}
  183. end.
  184. generate_max_delayed_messages(Config = #{<<"max_delayed_messages">> := Max}) when Max >= 0 ->
  185. {ok, Config};
  186. generate_max_delayed_messages(#{<<"max_delayed_messages">> := Max}) when Max < 0 ->
  187. {error, {?BAD_REQUEST, <<"Max delayed must be equal or greater than 0">>}};
  188. generate_max_delayed_messages(Config) ->
  189. {ok, Config}.
  190. update_config_(Config) ->
  191. lists:foreach(fun(Node) ->
  192. update_config_(Node, Config)
  193. end, mria_mnesia:running_nodes()).
  194. update_config_(Node, Config) when Node =:= node() ->
  195. _ = emqx_delayed:update_config(Config),
  196. case maps:get(<<"enable">>, Config, undefined) of
  197. undefined ->
  198. ignore;
  199. true ->
  200. emqx_delayed:enable();
  201. false ->
  202. emqx_delayed:disable()
  203. end,
  204. case maps:get(<<"max_delayed_messages">>, Config, undefined) of
  205. undefined ->
  206. ignore;
  207. Max ->
  208. ok = emqx_delayed:set_max_delayed_messages(Max)
  209. end;
  210. update_config_(Node, Config) ->
  211. rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
  212. generate_http_code_map(id_schema_error, Id) ->
  213. #{code => ?MESSAGE_ID_SCHEMA_ERROR, message =>
  214. iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))};
  215. generate_http_code_map(not_found, Id) ->
  216. #{code => ?MESSAGE_ID_NOT_FOUND, message =>
  217. iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
  218. rpc_call(Node, Module, Fun, Args) ->
  219. case rpc:call(Node, Module, Fun, Args) of
  220. {badrpc, Reason} -> {error, Reason};
  221. Result -> Result
  222. end.