emqx_exhook_api.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_exhook_api).
  17. -behaviour(minirest_api).
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
  21. -export([exhooks/2, action_with_name/2, move/2, server_hooks/2]).
  22. -import(hoconsc, [mk/2, ref/1, enum/1, array/1, map/2]).
  23. -import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]).
  24. -define(TAGS, [<<"exhooks">>]).
  25. -define(BAD_REQUEST, 'BAD_REQUEST').
  26. -define(BAD_RPC, 'BAD_RPC').
  27. -dialyzer([{nowarn_function, [ fill_cluster_server_info/5
  28. , nodes_server_info/5
  29. , fill_server_hooks_info/4
  30. ]}]).
  31. %%--------------------------------------------------------------------
  32. %% schema
  33. %%--------------------------------------------------------------------
  34. namespace() -> "exhook".
  35. api_spec() ->
  36. emqx_dashboard_swagger:spec(?MODULE).
  37. paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move", "/exhooks/:name/hooks"].
  38. schema(("/exhooks")) ->
  39. #{
  40. 'operationId' => exhooks,
  41. get => #{tags => ?TAGS,
  42. description => <<"List all servers">>,
  43. responses => #{200 => mk(array(ref(detail_server_info)), #{})}
  44. },
  45. post => #{tags => ?TAGS,
  46. description => <<"Add a servers">>,
  47. 'requestBody' => server_conf_schema(),
  48. responses => #{201 => mk(ref(detail_server_info), #{}),
  49. 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
  50. }
  51. }
  52. };
  53. schema("/exhooks/:name") ->
  54. #{'operationId' => action_with_name,
  55. get => #{tags => ?TAGS,
  56. description => <<"Get the detail information of server">>,
  57. parameters => params_server_name_in_path(),
  58. responses => #{200 => mk(ref(detail_server_info), #{}),
  59. 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
  60. }
  61. },
  62. put => #{tags => ?TAGS,
  63. description => <<"Update the server">>,
  64. parameters => params_server_name_in_path(),
  65. 'requestBody' => server_conf_schema(),
  66. responses => #{200 => <<>>,
  67. 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
  68. 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
  69. }
  70. },
  71. delete => #{tags => ?TAGS,
  72. description => <<"Delete the server">>,
  73. parameters => params_server_name_in_path(),
  74. responses => #{204 => <<>>,
  75. 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
  76. }
  77. }
  78. };
  79. schema("/exhooks/:name/hooks") ->
  80. #{'operationId' => server_hooks,
  81. get => #{tags => ?TAGS,
  82. description => <<"Get the hooks information of server">>,
  83. parameters => params_server_name_in_path(),
  84. responses => #{200 => mk(array(ref(list_hook_info)), #{}),
  85. 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
  86. }
  87. }
  88. };
  89. schema("/exhooks/:name/move") ->
  90. #{'operationId' => move,
  91. post => #{tags => ?TAGS,
  92. description => <<"Move the server">>,
  93. parameters => params_server_name_in_path(),
  94. 'requestBody' => mk(ref(move_req), #{}),
  95. responses => #{200 => <<>>,
  96. 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
  97. 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
  98. }
  99. }
  100. }.
  101. fields(move_req) ->
  102. [ {position, mk(enum([top, bottom, before, 'after']), #{})}
  103. , {related, mk(string(), #{desc => <<"Relative position of movement">>,
  104. default => <<>>,
  105. example => <<>>
  106. })}
  107. ];
  108. fields(detail_server_info) ->
  109. [ {metrics, mk(ref(metrics), #{})}
  110. , {node_metrics, mk(array(ref(node_metrics)), #{})}
  111. , {node_status, mk(array(ref(node_status)), #{})}
  112. , {hooks, mk(array(ref(hook_info)), #{})}
  113. ] ++ emqx_exhook_schema:server_config();
  114. fields(list_hook_info) ->
  115. [ {name, mk(binary(), #{desc => <<"The hook's name">>})}
  116. , {params, mk(map(name, binary()),
  117. #{desc => <<"The parameters used when the hook is registered">>})}
  118. , {metrics, mk(ref(metrics), #{})}
  119. , {node_metrics, mk(array(ref(node_metrics)), #{})}
  120. ];
  121. fields(node_metrics) ->
  122. [ {node, mk(string(), #{})}
  123. , {metrics, mk(ref(metrics), #{})}
  124. ];
  125. fields(node_status) ->
  126. [ {node, mk(string(), #{})}
  127. , {status, mk(enum([running, waiting, stopped, error]), #{})}
  128. ];
  129. fields(hook_info) ->
  130. [ {name, mk(binary(), #{desc => <<"The hook's name">>})}
  131. , {params, mk(map(name, binary()),
  132. #{desc => <<"The parameters used when the hook is registered">>})}
  133. ];
  134. fields(metrics) ->
  135. [ {succeed, mk(integer(), #{})}
  136. , {failed, mk(integer(), #{})}
  137. , {rate, mk(integer(), #{})}
  138. , {max_rate, mk(integer(), #{})}
  139. ];
  140. fields(server_config) ->
  141. emqx_exhook_schema:server_config().
  142. params_server_name_in_path() ->
  143. [{name, mk(string(), #{in => path,
  144. required => true,
  145. example => <<"default">>})}
  146. ].
  147. server_conf_schema() ->
  148. schema_with_example(ref(server_config),
  149. #{ name => "default"
  150. , enable => true
  151. , url => <<"http://127.0.0.1:8081">>
  152. , request_timeout => "5s"
  153. , failed_action => deny
  154. , auto_reconnect => "60s"
  155. , pool_size => 8
  156. , ssl => #{ enable => false
  157. , cacertfile => <<"{{ platform_etc_dir }}/certs/cacert.pem">>
  158. , certfile => <<"{{ platform_etc_dir }}/certs/cert.pem">>
  159. , keyfile => <<"{{ platform_etc_dir }}/certs/key.pem">>
  160. }
  161. }).
  162. %%--------------------------------------------------------------------
  163. %% API
  164. %%--------------------------------------------------------------------
  165. exhooks(get, _) ->
  166. Confs = emqx:get_config([exhook, servers]),
  167. Infos = nodes_all_server_info(Confs),
  168. {200, Infos};
  169. exhooks(post, #{body := Body}) ->
  170. {ok, _} = emqx_exhook_mgr:update_config([exhook, servers], {add, Body}),
  171. #{<<"name">> := Name} = Body,
  172. get_nodes_server_info(Name).
  173. action_with_name(get, #{bindings := #{name := Name}}) ->
  174. get_nodes_server_info(Name);
  175. action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
  176. case emqx_exhook_mgr:update_config([exhook, servers],
  177. {update, Name, Body}) of
  178. {ok, not_found} ->
  179. {400, #{code => <<"BAD_REQUEST">>,
  180. message => <<"Server not found">>
  181. }};
  182. {ok, {error, Reason}} ->
  183. {400, #{code => <<"BAD_REQUEST">>,
  184. message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason]))
  185. }};
  186. {ok, _} ->
  187. {200};
  188. {error, Error} ->
  189. {500, #{code => <<"BAD_RPC">>,
  190. message => Error
  191. }}
  192. end;
  193. action_with_name(delete, #{bindings := #{name := Name}}) ->
  194. case emqx_exhook_mgr:update_config([exhook, servers],
  195. {delete, Name}) of
  196. {ok, _} ->
  197. {200};
  198. {error, Error} ->
  199. {500, #{code => <<"BAD_RPC">>,
  200. message => Error
  201. }}
  202. end.
  203. move(post, #{bindings := #{name := Name}, body := Body}) ->
  204. #{<<"position">> := PositionT, <<"related">> := Related} = Body,
  205. Position = erlang:binary_to_atom(PositionT),
  206. case emqx_exhook_mgr:update_config([exhook, servers],
  207. {move, Name, Position, Related}) of
  208. {ok, ok} ->
  209. {200};
  210. {ok, not_found} ->
  211. {400, #{code => <<"BAD_REQUEST">>,
  212. message => <<"Server not found">>
  213. }};
  214. {error, Error} ->
  215. {500, #{code => <<"BAD_RPC">>,
  216. message => Error
  217. }}
  218. end.
  219. server_hooks(get, #{bindings := #{name := Name}}) ->
  220. Confs = emqx:get_config([exhook, servers]),
  221. case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
  222. false ->
  223. {400, #{code => <<"BAD_REQUEST">>,
  224. message => <<"Server not found">>
  225. }};
  226. _ ->
  227. Info = get_nodes_server_hooks_info(Name),
  228. {200, Info}
  229. end.
  230. get_nodes_server_info(Name) ->
  231. Confs = emqx:get_config([exhook, servers]),
  232. case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
  233. false ->
  234. {400, #{code => <<"BAD_REQUEST">>,
  235. message => <<"Server not found">>
  236. }};
  237. {value, Conf} ->
  238. NodeStatus = nodes_server_info(Name),
  239. {200, maps:merge(Conf, NodeStatus)}
  240. end.
  241. %%--------------------------------------------------------------------
  242. %% GET /exhooks
  243. %%--------------------------------------------------------------------
  244. nodes_all_server_info(ConfL) ->
  245. AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:all_servers_info(Nodes) end),
  246. Default = emqx_exhook_metrics:new_metrics_info(),
  247. node_all_server_info(ConfL, AllInfos, Default, []).
  248. node_all_server_info([#{name := ServerName} = Conf | T], AllInfos, Default, Acc) ->
  249. Info = fill_cluster_server_info(AllInfos, [], [], ServerName, Default),
  250. AllInfo = maps:merge(Conf, Info),
  251. node_all_server_info(T, AllInfos, Default, [AllInfo | Acc]);
  252. node_all_server_info([], _, _, Acc) ->
  253. lists:reverse(Acc).
  254. fill_cluster_server_info([{Node, {error, _}} | T], StatusL, MetricsL, ServerName, Default) ->
  255. fill_cluster_server_info(T,
  256. [#{node => Node, status => error} | StatusL],
  257. [#{node => Node, metrics => Default} | MetricsL],
  258. ServerName,
  259. Default);
  260. fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, Default) ->
  261. #{status := Status, metrics := Metrics} = Result,
  262. fill_cluster_server_info(T,
  263. [#{node => Node, status => maps:get(ServerName, Status, error)} | StatusL],
  264. [#{node => Node, metrics => maps:get(ServerName, Metrics, Default)} | MetricsL],
  265. ServerName,
  266. Default);
  267. fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
  268. Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
  269. #{metrics => Metrics,
  270. node_metrics => MetricsL,
  271. node_status => StatusL,
  272. hooks => emqx_exhook_mgr:hooks(ServerName)
  273. }.
  274. %%--------------------------------------------------------------------
  275. %% GET /exhooks/{name}
  276. %%--------------------------------------------------------------------
  277. nodes_server_info(Name) ->
  278. InfoL = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_info(Nodes, Name) end),
  279. Default = emqx_exhook_metrics:new_metrics_info(),
  280. nodes_server_info(InfoL, Name, Default, [], []).
  281. nodes_server_info([{Node, {error, _}} | T], Name, Default, StatusL, MetricsL) ->
  282. nodes_server_info(T,
  283. Name,
  284. Default,
  285. [#{node => Node, status => error} | StatusL],
  286. [#{node => Node, metrics => Default} | MetricsL]
  287. );
  288. nodes_server_info([{Node, Result} | T], Name, Default, StatusL, MetricsL) ->
  289. #{status := Status, metrics := Metrics} = Result,
  290. nodes_server_info(T,
  291. Name,
  292. Default,
  293. [#{node => Node, status => Status} | StatusL],
  294. [#{node => Node, metrics => Metrics} | MetricsL]
  295. );
  296. nodes_server_info([], Name, _, StatusL, MetricsL) ->
  297. #{metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
  298. node_status => StatusL,
  299. node_metrics => MetricsL,
  300. hooks => emqx_exhook_mgr:hooks(Name)
  301. }.
  302. %%--------------------------------------------------------------------
  303. %% GET /exhooks/{name}/hooks
  304. %%--------------------------------------------------------------------
  305. get_nodes_server_hooks_info(Name) ->
  306. case emqx_exhook_mgr:hooks(Name) of
  307. [] -> [];
  308. Hooks ->
  309. AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name) end),
  310. Default = emqx_exhook_metrics:new_metrics_info(),
  311. get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
  312. end.
  313. get_nodes_server_hooks_info([#{name := Name} = Spec | T], AllInfos, Default, Acc) ->
  314. Info = fill_server_hooks_info(AllInfos, Name, Default, []),
  315. AllInfo = maps:merge(Spec, Info),
  316. get_nodes_server_hooks_info(T, AllInfos, Default, [AllInfo | Acc]);
  317. get_nodes_server_hooks_info([], _, _, Acc) ->
  318. Acc.
  319. fill_server_hooks_info([{_, {error, _}} | T], Name, Default, MetricsL) ->
  320. fill_server_hooks_info(T, Name, Default, MetricsL);
  321. fill_server_hooks_info([{Node, MetricsMap} | T], Name, Default, MetricsL) ->
  322. Metrics = maps:get(Name, MetricsMap, Default),
  323. NodeMetrics = #{node => Node, metrics => Metrics},
  324. fill_server_hooks_info(T, Name, Default, [NodeMetrics | MetricsL]);
  325. fill_server_hooks_info([], _Name, _Default, MetricsL) ->
  326. Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
  327. #{metrics => Metrics, node_metrics => MetricsL}.
  328. %%--------------------------------------------------------------------
  329. %% cluster call
  330. %%--------------------------------------------------------------------
  331. -spec call_cluster(fun(([node()]) -> emqx_rpc:erpc_multicall(A))) ->
  332. [{node(), A | {error, _Err}}].
  333. call_cluster(Fun) ->
  334. Nodes = mria_mnesia:running_nodes(),
  335. Ret = Fun(Nodes),
  336. lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)).