emqx_exhook_api.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_exhook_api).
  17. -behaviour(minirest_api).
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
  21. -export([exhooks/2, action_with_name/2, move/2]).
  22. -import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
  23. -import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]).
  24. -define(TAGS, [<<"exhooks">>]).
  25. -define(BAD_REQUEST, 'BAD_REQUEST').
  26. -define(BAD_RPC, 'BAD_RPC').
  %% @doc Namespace string under which this module's schema definitions live.
  namespace() ->
      "exhook".
  %% @doc Build the API specification for this module by handing the module
  %% name to emqx_dashboard_swagger:spec/1.
  api_spec() ->
      ThisModule = ?MODULE,
      emqx_dashboard_swagger:spec(ThisModule).
  %% @doc List the URL paths served by this module; each one has a matching
  %% schema/1 clause.
  paths() ->
      [
          "/exhooks",
          "/exhooks/:name",
          "/exhooks/:name/move"
      ].
  %% @doc Describe the operations available on each path listed by paths/0.
  %%
  %% Every clause returns a map holding an 'operationId' (the atom matches one
  %% of the handler functions exported by this module: exhooks/2,
  %% action_with_name/2, move/2) plus one entry per supported HTTP method.
  schema("/exhooks") ->
      ListServers = #{
          tags => ?TAGS,
          description => <<"List all servers">>,
          responses => #{200 => mk(array(ref(detailed_server_info)), #{})}
      },
      AddServer = #{
          tags => ?TAGS,
          description => <<"Add a servers">>,
          'requestBody' => server_conf_schema(),
          responses => #{
              201 => mk(ref(detailed_server_info), #{}),
              500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
          }
      },
      #{
          'operationId' => exhooks,
          get => ListServers,
          post => AddServer
      };
  schema("/exhooks/:name") ->
      GetServer = #{
          tags => ?TAGS,
          description => <<"Get the detail information of server">>,
          parameters => params_server_name_in_path(),
          responses => #{
              200 => mk(ref(detailed_server_info), #{}),
              400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
          }
      },
      UpdateServer = #{
          tags => ?TAGS,
          description => <<"Update the server">>,
          parameters => params_server_name_in_path(),
          'requestBody' => server_conf_schema(),
          responses => #{
              200 => <<>>,
              400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
              500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
          }
      },
      DeleteServer = #{
          tags => ?TAGS,
          description => <<"Delete the server">>,
          parameters => params_server_name_in_path(),
          responses => #{
              204 => <<>>,
              500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
          }
      },
      #{
          'operationId' => action_with_name,
          get => GetServer,
          put => UpdateServer,
          delete => DeleteServer
      };
  schema("/exhooks/:name/move") ->
      MoveServer = #{
          tags => ?TAGS,
          description => <<"Move the server">>,
          parameters => params_server_name_in_path(),
          'requestBody' => mk(ref(move_req), #{}),
          responses => #{
              200 => <<>>,
              400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
              500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
          }
      },
      #{
          'operationId' => move,
          post => MoveServer
      }.
  %% @doc Field definitions for the schema structs referenced from schema/1.
  %%
  %% move_req             - request body of the move operation.
  %% detailed_server_info - server configuration plus runtime status fields.
  %% node_status          - status of one server on one node.
  %% server_config        - the server configuration fields from
  %%                        emqx_exhook_schema:server_config/0.
  fields(move_req) ->
      [ {position, mk(enum([top, bottom, before, 'after']), #{})}
      , {related, mk(string(), #{ desc => <<"Relative position of movement">>
                                , default => <<>>
                                , example => <<>>
                                })}
      ];
  fields(detailed_server_info) ->
      [ {status, mk(enum([running, waiting, stopped]), #{})}
      , {hooks, mk(array(string()), #{default => []})}
        %% The handlers in this module attach a list with one entry per node
        %% (see nodes_server_status/1 and nodes_all_server_status/1), so the
        %% field is an array of node_status rather than a single object.
      , {node_status, mk(array(ref(node_status)), #{})}
      ] ++ emqx_exhook_schema:server_config();
  fields(node_status) ->
      [ {node, mk(string(), #{})}
      , {status, mk(enum([running, waiting, stopped, not_found, error]), #{})}
      ];
  fields(server_config) ->
      emqx_exhook_schema:server_config().
  %% Parameter list shared by every operation under /exhooks/:name: a single
  %% required `name' parameter taken from the path.
  params_server_name_in_path() ->
      NameOpts = #{
          in => path,
          required => true,
          example => <<"default">>
      },
      [{name, mk(string(), NameOpts)}].
  %% Request-body schema for adding/updating a server: a reference to the
  %% server_config struct, paired with an example configuration.
  server_conf_schema() ->
      SslExample = #{
          enable => false,
          cacertfile => <<"{{ platform_etc_dir }}/certs/cacert.pem">>,
          certfile => <<"{{ platform_etc_dir }}/certs/cert.pem">>,
          keyfile => <<"{{ platform_etc_dir }}/certs/key.pem">>
      },
      Example = #{
          name => "default",
          enable => true,
          url => <<"http://127.0.0.1:8081">>,
          request_timeout => "5s",
          failed_action => deny,
          auto_reconnect => "60s",
          pool_size => 8,
          ssl => SslExample
      },
      schema_with_example(ref(server_config), Example).
  %% @doc Handler for the `/exhooks' path.
  %%
  %% get  - returns 200 with the servers from emqx_exhook_mgr:list/0, each
  %%        extended with its per-node status.
  %% post - adds the server described by the request body; 201 with the
  %%        result on success, 500 with code BAD_RPC on error.
  exhooks(get, _Request) ->
      Servers = emqx_exhook_mgr:list(),
      {200, nodes_all_server_status(Servers)};
  exhooks(post, #{body := Body}) ->
      Outcome = emqx_exhook_mgr:update_config([emqx_exhook, servers], {add, Body}),
      case Outcome of
          {error, Error} ->
              {500, #{code => <<"BAD_RPC">>, message => Error}};
          {ok, Result} ->
              {201, Result}
      end.
  %% @doc Handler for the `/exhooks/:name' path.
  %%
  %% get    - 200 with the server info plus `node_status' (one entry per
  %%          node), or 400 BAD_REQUEST when the server is not found.
  %% put    - updates the server with the request body; 200 on success,
  %%          400 BAD_REQUEST when the server is not found or the update is
  %%          rejected, 500 BAD_RPC when update_config itself fails.
  %% delete - removes the server; 204 on success (the status code declared
  %%          for this operation in schema/1), 500 BAD_RPC on error.
  action_with_name(get, #{bindings := #{name := Name}}) ->
      case emqx_exhook_mgr:lookup(Name) of
          not_found ->
              {400, #{code => <<"BAD_REQUEST">>,
                      message => <<"Server not found">>
                     }};
          ServerInfo ->
              %% Query the cluster only once the server is known to exist, so
              %% an unknown name does not trigger a call to every node.
              {200, ServerInfo#{node_status => nodes_server_status(Name)}}
      end;
  action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
      case emqx_exhook_mgr:update_config([emqx_exhook, servers],
                                         {update, Name, Body}) of
          {ok, not_found} ->
              {400, #{code => <<"BAD_REQUEST">>,
                      message => <<"Server not found">>
                     }};
          {ok, {error, Reason}} ->
              {400, #{code => <<"BAD_REQUEST">>,
                      message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason]))
                     }};
          {ok, _} ->
              {200};
          {error, Error} ->
              {500, #{code => <<"BAD_RPC">>,
                      message => Error
                     }}
      end;
  action_with_name(delete, #{bindings := #{name := Name}}) ->
      case emqx_exhook_mgr:update_config([emqx_exhook, servers],
                                         {delete, Name}) of
          {ok, _} ->
              %% schema/1 documents 204 for a successful delete.
              {204};
          {error, Error} ->
              {500, #{code => <<"BAD_RPC">>,
                      message => Error
                     }}
      end.
  %% @doc Handler for `POST /exhooks/:name/move'.
  %%
  %% Reads `position' (one of top, bottom, before, after) and the optional
  %% `related' value from the request body and asks emqx_exhook_mgr to move
  %% the server.
  %%
  %% Returns 200 on success, 400 BAD_REQUEST when the position is missing or
  %% not one of the allowed values or when the server is not found, and
  %% 500 BAD_RPC when update_config fails.
  move(post, #{bindings := #{name := Name}, body := Body}) ->
      %% `related' falls back to the default declared in fields(move_req).
      Related = maps:get(<<"related">>, Body, <<>>),
      case parse_move_position(maps:get(<<"position">>, Body, undefined)) of
          {ok, Position} ->
              do_move(Name, Position, Related);
          error ->
              {400, #{code => <<"BAD_REQUEST">>,
                      message => <<"Invalid position">>
                     }}
      end.

  %% Apply an already validated move request and map the outcome to a response.
  do_move(Name, Position, Related) ->
      case emqx_exhook_mgr:update_config([emqx_exhook, servers],
                                         {move, Name, Position, Related}) of
          {ok, ok} ->
              {200};
          {ok, not_found} ->
              {400, #{code => <<"BAD_REQUEST">>,
                      message => <<"Server not found">>
                     }};
          {error, Error} ->
              {500, #{code => <<"BAD_RPC">>,
                      message => Error
                     }}
      end.

  %% Translate the request's position value into the matching atom.
  %% The value comes from the request body, so it is matched against the fixed
  %% set of allowed positions instead of being converted with binary_to_atom:
  %% atoms are never garbage collected and arbitrary input could otherwise
  %% exhaust the atom table.
  parse_move_position(<<"top">>) -> {ok, top};
  parse_move_position(<<"bottom">>) -> {ok, bottom};
  parse_move_position(<<"before">>) -> {ok, before};
  parse_move_position(<<"after">>) -> {ok, 'after'};
  parse_move_position(_Other) -> error.
  %% Ask every node for the status of server `Name' and return one
  %% #{node, status} map per node, in the order call_cluster/3 reports them.
  nodes_server_status(Name) ->
      PerNode = call_cluster(emqx_exhook_mgr, server_status, [Name]),
      [to_node_status(Reply) || Reply <- PerNode].

  %% Convert one {Node, Result} pair; a failed call is reported as `error'.
  to_node_status({Node, {error, _}}) ->
      #{node => Node, status => error};
  to_node_status({Node, Status}) ->
      #{node => Node, status => Status}.
  %% Extend every server map in `ServerL' with a `node_status' list holding
  %% one #{node, status} entry per node that reported on that server.
  %%
  %% Each node's all_servers_status reply is expected to be a map with the
  %% keys running, waiting and stopped, each holding a list of server names.
  %% A node whose call failed ({error, _} from rpc_call/4) is reported with
  %% status `error' for every server, mirroring nodes_server_status/1, so a
  %% single unreachable node no longer fails the whole listing.
  nodes_all_server_status(ServerL) ->
      AllStatusL = call_cluster(emqx_exhook_mgr, all_servers_status, []),
      Empty = maps:from_list([{maps:get(name, Server), []} || Server <- ServerL]),
      Aggregated = lists:foldl(fun merge_node_report/2, Empty, AllStatusL),
      [Server#{node_status => maps:get(maps:get(name, Server), Aggregated)}
       || Server <- ServerL].

  %% Fold one node's reply into the name => [status entry] accumulator.
  merge_node_report({Node, {error, _Reason}}, Acc) ->
      Entry = #{node => Node, status => error},
      maps:map(fun(_Name, StatusL) -> [Entry | StatusL] end, Acc);
  merge_node_report({Node, #{running := Running,
                             waiting := Waiting,
                             stopped := Stopped}}, Acc) ->
      lists:foldl(fun({Status, Names}, Acc0) ->
                          prepend_node_status(Names, Node, Status, Acc0)
                  end,
                  Acc,
                  [{running, Running}, {waiting, Waiting}, {stopped, Stopped}]).

  %% Prepend a {Node, Status} entry to each of the given server names.
  prepend_node_status(Names, Node, Status, Map) ->
      Entry = #{node => Node, status => Status},
      lists:foldl(fun(Name, Acc) ->
                          case Acc of
                              #{Name := StatusL} ->
                                  Acc#{Name := [Entry | StatusL]};
                              #{} ->
                                  %% The node reported a server that is not in
                                  %% ServerL (the two lists are fetched
                                  %% separately); there is nothing to attach
                                  %% the status to, so skip it.
                                  Acc
                          end
                  end,
                  Map,
                  Names).
  %% Invoke Module:Fun(Args) on every node returned by
  %% mria_mnesia:running_nodes/0 and return a list of {Node, Result} pairs.
  call_cluster(Module, Fun, Args) ->
      CallOn = fun(Node) -> {Node, rpc_call(Node, Module, Fun, Args)} end,
      lists:map(CallOn, mria_mnesia:running_nodes()).
  %% Call Module:Fun(Args) on `Node'.
  %% The local node is called directly with apply/3; any other node goes
  %% through rpc:call/4, with {badrpc, Reason} rewritten to {error, Reason}.
  rpc_call(Node, Module, Fun, Args) ->
      case node() of
          Node ->
              erlang:apply(Module, Fun, Args);
          _Remote ->
              case rpc:call(Node, Module, Fun, Args) of
                  {badrpc, Reason} -> {error, Reason};
                  Res -> Res
              end
      end.