emqx_gateway_http.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc Gateway Interface Module for HTTP-APIs
  17. -module(emqx_gateway_http).
  18. -include("emqx_gateway.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -include_lib("emqx_auth/include/emqx_authn_chains.hrl").
  21. -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
  22. -import(emqx_gateway_utils, [listener_id/3]).
  23. %% Mgmt APIs - gateway
  24. -export([gateways/1]).
  25. %% Mgmt APIs
  26. -export([
  27. add_listener/2,
  28. remove_listener/1,
  29. update_listener/2
  30. ]).
  31. -export([
  32. authn/1,
  33. authn/2,
  34. add_authn/2,
  35. add_authn/3,
  36. update_authn/2,
  37. update_authn/3,
  38. remove_authn/1,
  39. remove_authn/2
  40. ]).
  41. %% Mgmt APIs - clients
  42. -export([
  43. lookup_client/3,
  44. kickout_client/2,
  45. list_client_subscriptions/2,
  46. client_subscribe/4,
  47. client_unsubscribe/3
  48. ]).
  49. %% Utils for http, swagger, etc.
  50. -export([
  51. return_http_error/2,
  52. with_gateway/2,
  53. with_authn/2,
  54. with_listener_authn/3,
  55. checks/2,
  56. reason2resp/1,
  57. reason2msg/1,
  58. sum_cluster_connections/1
  59. ]).
  60. %% RPC
  61. -export([gateway_status/1, cluster_gateway_status/1]).
  62. -type gateway_summary() ::
  63. #{
  64. name := binary(),
  65. status := running | stopped | unloaded,
  66. created_at => binary(),
  67. started_at => binary(),
  68. stopped_at => binary(),
  69. max_connections => integer(),
  70. current_connections => integer(),
  71. listeners => []
  72. }.
  73. -elvis([
  74. {elvis_style, god_modules, disable},
  75. {elvis_style, no_nested_try_catch, disable},
  76. {elvis_style, invalid_dynamic_call, disable}
  77. ]).
  78. -define(DEFAULT_CALL_TIMEOUT, 15000).
  79. %%--------------------------------------------------------------------
  80. %% Mgmt APIs - gateway
  81. %%--------------------------------------------------------------------
  82. -spec gateways(Status :: all | running | stopped | unloaded) ->
  83. [gateway_summary()].
  84. gateways(Status) ->
  85. Gateways = lists:map(
  86. fun({GwName, _}) ->
  87. case emqx_gateway:lookup(GwName) of
  88. undefined ->
  89. #{name => GwName, status => unloaded};
  90. GwInfo = #{config := Config} ->
  91. GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
  92. [created_at, started_at, stopped_at],
  93. GwInfo
  94. ),
  95. GwInfo1 = maps:with(
  96. [
  97. name,
  98. status,
  99. created_at,
  100. started_at,
  101. stopped_at
  102. ],
  103. GwInfo0
  104. ),
  105. NodeStatus = cluster_gateway_status(GwName),
  106. {MaxCons, CurrCons} = sum_cluster_connections(NodeStatus),
  107. GwInfo1#{
  108. max_connections => MaxCons,
  109. current_connections => CurrCons,
  110. listeners => get_listeners_status(GwName, Config),
  111. node_status => NodeStatus
  112. }
  113. end
  114. end,
  115. emqx_gateway_registry:list()
  116. ),
  117. case Status of
  118. all -> Gateways;
  119. _ -> [Gw || Gw = #{status := S} <- Gateways, S == Status]
  120. end.
  121. gateway_status(GwName) ->
  122. case emqx_gateway:lookup(GwName) of
  123. undefined ->
  124. #{node => node(), status => unloaded};
  125. #{status := Status, config := Config} ->
  126. #{
  127. node => node(),
  128. status => Status,
  129. max_connections => max_connections_count(Config),
  130. current_connections => current_connections_count(GwName)
  131. }
  132. end.
  133. cluster_gateway_status(GwName) ->
  134. Nodes = mria:running_nodes(),
  135. case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of
  136. {Results, []} ->
  137. Results;
  138. {_, _BadNodes} ->
  139. error(badrpc)
  140. end.
  141. %% @private
  142. max_connections_count(Config) ->
  143. Listeners = emqx_gateway_utils:normalize_config(Config),
  144. lists:foldl(
  145. fun({_, _, _, Conf0}, Acc) ->
  146. emqx_gateway_utils:plus_max_connections(
  147. Acc,
  148. maps:get(max_connections, Conf0, 0)
  149. )
  150. end,
  151. 0,
  152. Listeners
  153. ).
  154. %% @private
  155. current_connections_count(GwName) ->
  156. try
  157. InfoTab = emqx_gateway_cm:tabname(info, GwName),
  158. ets:info(InfoTab, size)
  159. catch
  160. _:_ ->
  161. 0
  162. end.
  163. %% @private
  164. get_listeners_status(GwName, Config) ->
  165. Listeners = emqx_gateway_utils:normalize_config(Config),
  166. lists:map(
  167. fun({Type, LisName, ListenOn, _}) ->
  168. Name0 = listener_id(GwName, Type, LisName),
  169. Name = {Name0, ListenOn},
  170. LisO = #{id => Name0, type => Type, name => LisName},
  171. case catch esockd:listener(Name) of
  172. _Pid when is_pid(_Pid) ->
  173. LisO#{running => true};
  174. _ ->
  175. LisO#{running => false}
  176. end
  177. end,
  178. Listeners
  179. ).
  180. %%--------------------------------------------------------------------
  181. %% Mgmt APIs - listeners
  182. %%--------------------------------------------------------------------
  183. -spec add_listener(atom() | binary(), map()) -> {ok, map()}.
  184. add_listener(ListenerId, NewConf0) ->
  185. {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
  186. NewConf = maps:without(
  187. [
  188. <<"id">>,
  189. <<"name">>,
  190. <<"type">>,
  191. <<"running">>
  192. ],
  193. NewConf0
  194. ),
  195. confexp(emqx_gateway_conf:add_listener(GwName, {Type, Name}, NewConf)).
  196. -spec update_listener(atom() | binary(), map()) -> {ok, map()}.
  197. update_listener(ListenerId, NewConf0) ->
  198. {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
  199. NewConf = maps:without(
  200. [
  201. <<"id">>,
  202. <<"name">>,
  203. <<"type">>,
  204. <<"running">>
  205. ],
  206. NewConf0
  207. ),
  208. confexp(emqx_gateway_conf:update_listener(GwName, {Type, Name}, NewConf)).
  209. -spec remove_listener(binary()) -> ok.
  210. remove_listener(ListenerId) ->
  211. {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
  212. confexp(emqx_gateway_conf:remove_listener(GwName, {Type, Name})).
  213. -spec authn(gateway_name()) -> map().
  214. authn(GwName) ->
  215. %% XXX: Need append chain-nanme, authenticator-id?
  216. Path = [gateway, GwName, ?AUTHN],
  217. ChainName = emqx_gateway_utils:global_chain(GwName),
  218. wrap_chain_name(
  219. ChainName,
  220. emqx_utils_maps:jsonable_map(emqx:get_raw_config(Path))
  221. ).
  222. -spec authn(gateway_name(), binary()) -> map().
  223. authn(GwName, ListenerId) ->
  224. {_, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
  225. Path = [gateway, GwName, listeners, Type, Name, ?AUTHN],
  226. ChainName = emqx_gateway_utils:listener_chain(GwName, Type, Name),
  227. wrap_chain_name(
  228. ChainName,
  229. emqx_utils_maps:jsonable_map(emqx:get_raw_config(Path))
  230. ).
  231. wrap_chain_name(ChainName, Conf) ->
  232. case emqx_authn_chains:list_authenticators(ChainName) of
  233. {ok, [#{id := Id} | _]} ->
  234. Conf#{chain_name => ChainName, id => Id};
  235. _ ->
  236. Conf
  237. end.
  238. -spec add_authn(gateway_name(), map()) -> {ok, map()}.
  239. add_authn(GwName, AuthConf) ->
  240. confexp(emqx_gateway_conf:add_authn(GwName, AuthConf)).
  241. -spec add_authn(gateway_name(), binary(), map()) -> {ok, map()}.
  242. add_authn(GwName, ListenerId, AuthConf) ->
  243. {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
  244. confexp(emqx_gateway_conf:add_authn(GwName, {LType, LName}, AuthConf)).
  245. -spec update_authn(gateway_name(), map()) -> {ok, map()}.
  246. update_authn(GwName, AuthConf) ->
  247. confexp(emqx_gateway_conf:update_authn(GwName, AuthConf)).
  248. -spec update_authn(gateway_name(), binary(), map()) -> {ok, map()}.
  249. update_authn(GwName, ListenerId, AuthConf) ->
  250. {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
  251. confexp(emqx_gateway_conf:update_authn(GwName, {LType, LName}, AuthConf)).
  252. -spec remove_authn(gateway_name()) -> ok.
  253. remove_authn(GwName) ->
  254. confexp(emqx_gateway_conf:remove_authn(GwName)).
  255. -spec remove_authn(gateway_name(), binary()) -> ok.
  256. remove_authn(GwName, ListenerId) ->
  257. {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
  258. confexp(emqx_gateway_conf:remove_authn(GwName, {LType, LName})).
  259. confexp(ok) -> ok;
  260. confexp({ok, Res}) -> {ok, Res};
  261. confexp({error, Reason}) -> error(Reason).
  262. %%--------------------------------------------------------------------
  263. %% Mgmt APIs - clients
  264. %%--------------------------------------------------------------------
  265. -spec lookup_client(
  266. gateway_name(),
  267. emqx_types:clientid(),
  268. {module(), atom()}
  269. ) -> list().
  270. lookup_client(GwName, ClientId, {M, F}) ->
  271. [
  272. begin
  273. Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid),
  274. Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid),
  275. M:F({{ClientId, Pid}, Info, Stats})
  276. end
  277. || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)
  278. ].
  279. -spec kickout_client(gateway_name(), emqx_types:clientid()) ->
  280. {error, any()}
  281. | ok.
  282. kickout_client(GwName, ClientId) ->
  283. Results = [
  284. emqx_gateway_cm:kick_session(GwName, ClientId, Pid)
  285. || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)
  286. ],
  287. IsOk = lists:any(fun(Item) -> Item =:= ok end, Results),
  288. case {IsOk, Results} of
  289. {true, _} -> ok;
  290. {_, []} -> {error, not_found};
  291. {false, _} -> lists:last(Results)
  292. end.
  293. -spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) ->
  294. {error, any()}
  295. | {ok, list()}.
  296. list_client_subscriptions(GwName, ClientId) ->
  297. case client_call(GwName, ClientId, subscriptions) of
  298. {error, Reason} ->
  299. {error, Reason};
  300. {ok, Subs} ->
  301. {ok,
  302. lists:map(
  303. fun({Topic, SubOpts}) ->
  304. SubOpts#{topic => Topic}
  305. end,
  306. Subs
  307. )}
  308. end.
  309. -spec client_subscribe(
  310. gateway_name(),
  311. emqx_types:clientid(),
  312. emqx_types:topic(),
  313. emqx_types:subopts()
  314. ) ->
  315. {error, any()}
  316. | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
  317. client_subscribe(GwName, ClientId, Topic, SubOpts) ->
  318. client_call(GwName, ClientId, {subscribe, Topic, SubOpts}).
  319. -spec client_unsubscribe(
  320. gateway_name(),
  321. emqx_types:clientid(),
  322. emqx_types:topic()
  323. ) ->
  324. {error, any()}
  325. | ok.
  326. client_unsubscribe(GwName, ClientId, Topic) ->
  327. client_call(GwName, ClientId, {unsubscribe, Topic}).
  328. client_call(GwName, ClientId, Req) ->
  329. try
  330. emqx_gateway_cm:call(
  331. GwName,
  332. ClientId,
  333. Req,
  334. ?DEFAULT_CALL_TIMEOUT
  335. )
  336. of
  337. undefined ->
  338. {error, not_found};
  339. ignored ->
  340. {error, ignored};
  341. Res ->
  342. Res
  343. catch
  344. throw:noproc ->
  345. {error, not_found};
  346. throw:{badrpc, Reason} ->
  347. {error, {badrpc, Reason}}
  348. end.
  349. %%--------------------------------------------------------------------
  350. %% Utils
  351. %%--------------------------------------------------------------------
  352. -spec reason2resp({atom(), map()} | any()) -> binary() | any().
  353. reason2resp(R) ->
  354. case reason2msg(R) of
  355. error ->
  356. return_http_error(500, R);
  357. Msg ->
  358. return_http_error(400, Msg)
  359. end.
  360. -spec return_http_error(integer(), any()) -> {integer(), atom(), binary()}.
  361. return_http_error(Code, Msg) ->
  362. {Code, codestr(Code), emqx_gateway_utils:stringfy(Msg)}.
  363. -spec reason2msg({atom(), map()} | any()) -> error | string().
  364. reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
  365. NValue =
  366. case emqx_utils_json:safe_encode(Value) of
  367. {ok, Str} -> Str;
  368. {error, _} -> emqx_gateway_utils:stringfy(Value)
  369. end,
  370. fmtstr(
  371. "Bad config value '~s' for '~s', reason: ~s",
  372. [NValue, Key, emqx_gateway_utils:stringfy(Reason)]
  373. );
  374. reason2msg(
  375. {badres, #{
  376. resource := gateway,
  377. gateway := GwName,
  378. reason := not_found
  379. }}
  380. ) ->
  381. fmtstr("The ~s gateway is unloaded", [GwName]);
  382. reason2msg(
  383. {badres, #{
  384. resource := gateway,
  385. gateway := GwName,
  386. reason := already_exist
  387. }}
  388. ) ->
  389. fmtstr("The ~s gateway already loaded", [GwName]);
  390. reason2msg(
  391. {badres, #{
  392. resource := listener,
  393. listener := {GwName, LType, LName},
  394. reason := not_found
  395. }}
  396. ) ->
  397. fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]);
  398. reason2msg(
  399. {badres, #{
  400. resource := listener,
  401. listener := {GwName, LType, LName},
  402. reason := already_exist
  403. }}
  404. ) ->
  405. fmtstr(
  406. "The listener ~s of ~s already exist",
  407. [listener_id(GwName, LType, LName), GwName]
  408. );
  409. reason2msg(
  410. {badres, #{
  411. resource := authn,
  412. gateway := GwName,
  413. reason := not_found
  414. }}
  415. ) ->
  416. fmtstr("The authentication not found on ~s", [GwName]);
  417. reason2msg(
  418. {badres, #{
  419. resource := authn,
  420. gateway := GwName,
  421. reason := already_exist
  422. }}
  423. ) ->
  424. fmtstr("The authentication already exist on ~s", [GwName]);
  425. reason2msg(
  426. {badres, #{
  427. resource := listener_authn,
  428. listener := {GwName, LType, LName},
  429. reason := not_found
  430. }}
  431. ) ->
  432. fmtstr(
  433. "The authentication not found on ~s",
  434. [listener_id(GwName, LType, LName)]
  435. );
  436. reason2msg(
  437. {badres, #{
  438. resource := listener_authn,
  439. listener := {GwName, LType, LName},
  440. reason := already_exist
  441. }}
  442. ) ->
  443. fmtstr(
  444. "The authentication already exist on ~s",
  445. [listener_id(GwName, LType, LName)]
  446. );
  447. reason2msg(
  448. {bad_ssl_config, #{
  449. reason := Reason,
  450. which_options := Options
  451. }}
  452. ) ->
  453. fmtstr("Bad TLS configuration for ~p, reason: ~s", [Options, Reason]);
  454. reason2msg(
  455. {#{roots := [{gateway, _}]}, [_ | _]} = Error
  456. ) ->
  457. Bin = emqx_utils:readable_error_msg(Error),
  458. <<"Invalid configurations: ", Bin/binary>>;
  459. reason2msg(_) ->
  460. error.
  461. codestr(400) -> 'BAD_REQUEST';
  462. codestr(404) -> 'RESOURCE_NOT_FOUND';
  463. codestr(405) -> 'METHOD_NOT_ALLOWED';
  464. codestr(409) -> 'NOT_SUPPORT';
  465. codestr(500) -> 'UNKNOWN_ERROR';
  466. codestr(501) -> 'NOT_IMPLEMENTED'.
  467. fmtstr(Fmt, Args) ->
  468. lists:flatten(io_lib:format(Fmt, Args)).
  469. -spec with_authn(atom(), function()) -> any().
  470. with_authn(GwName0, Fun) ->
  471. with_gateway(GwName0, fun(GwName, _GwConf) ->
  472. Authn = emqx_gateway_http:authn(GwName),
  473. Fun(GwName, Authn)
  474. end).
  475. -spec with_listener_authn(atom(), binary(), function()) -> any().
  476. with_listener_authn(GwName0, Id, Fun) ->
  477. with_gateway(GwName0, fun(GwName, _GwConf) ->
  478. Authn = emqx_gateway_http:authn(GwName, Id),
  479. Fun(GwName, Authn)
  480. end).
  481. -spec with_gateway(atom(), function()) -> any().
  482. with_gateway(GwName, Fun) ->
  483. try
  484. case emqx_gateway:lookup(GwName) of
  485. undefined ->
  486. return_http_error(404, "Gateway not loaded");
  487. Gateway ->
  488. Fun(GwName, Gateway)
  489. end
  490. catch
  491. error:badname ->
  492. return_http_error(404, "Bad gateway name");
  493. %% Exceptions from: checks/2
  494. error:{miss_param, K} ->
  495. return_http_error(400, [K, " is required"]);
  496. %% Exceptions from emqx_gateway_utils:parse_listener_id/1
  497. error:{invalid_listener_id, Id} ->
  498. return_http_error(404, ["Listener not found: ", Id]);
  499. %% Exceptions from emqx:get_config/1
  500. error:{config_not_found, Path0} ->
  501. Path = lists:concat(
  502. lists:join(".", lists:map(fun to_list/1, Path0))
  503. ),
  504. return_http_error(404, "Resource not found. path: " ++ Path);
  505. error:{badmatch, {error, einval}} ->
  506. return_http_error(400, "Invalid bind address");
  507. error:{badauth, Reason} ->
  508. Reason1 = emqx_gateway_utils:stringfy(Reason),
  509. return_http_error(400, ["Bad authentication config: ", Reason1]);
  510. Class:Reason:Stk ->
  511. ?SLOG(error, #{
  512. msg => "uncaught_exception",
  513. exception => Class,
  514. reason => Reason,
  515. stacktrace => Stk
  516. }),
  517. reason2resp(Reason)
  518. end.
  519. -spec checks(list(), map()) -> ok.
  520. checks([], _) ->
  521. ok;
  522. checks([K | Ks], Map) ->
  523. case maps:is_key(K, Map) of
  524. true -> checks(Ks, Map);
  525. false -> error({miss_param, K})
  526. end.
  527. to_list(A) when is_atom(A) ->
  528. atom_to_list(A);
  529. to_list(B) when is_binary(B) ->
  530. binary_to_list(B).
  531. sum_cluster_connections(List) ->
  532. sum_cluster_connections(List, 0, 0).
  533. %%--------------------------------------------------------------------
  534. %% Internal funcs
  535. sum_cluster_connections(
  536. [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc
  537. ) ->
  538. NMaxAcc = emqx_gateway_utils:plus_max_connections(MaxAcc, Max),
  539. sum_cluster_connections(T, NMaxAcc, Current + CurrAcc);
  540. sum_cluster_connections([_ | T], MaxAcc, CurrAcc) ->
  541. sum_cluster_connections(T, MaxAcc, CurrAcc);
  542. sum_cluster_connections([], MaxAcc, CurrAcc) ->
  543. {MaxAcc, CurrAcc}.