emqx_bridge_api.erl 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_bridge_api).
  17. -behaviour(minirest_api).
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -import(hoconsc, [mk/2, array/1, enum/1]).
  22. %% Swagger specs from hocon schema
  23. -export([
  24. api_spec/0,
  25. paths/0,
  26. schema/1,
  27. namespace/0
  28. ]).
  29. %% API callbacks
  30. -export([
  31. '/bridges'/2,
  32. '/bridges/:id'/2,
  33. '/bridges/:id/operation/:operation'/2,
  34. '/nodes/:node/bridges/:id/operation/:operation'/2,
  35. '/bridges/:id/reset_metrics'/2
  36. ]).
  37. -export([lookup_from_local_node/2]).
  38. -define(CONN_TYPES, [mqtt]).
  39. -define(TRY_PARSE_ID(ID, EXPR),
  40. try emqx_bridge_resource:parse_bridge_id(Id) of
  41. {BridgeType, BridgeName} ->
  42. EXPR
  43. catch
  44. error:{invalid_bridge_id, Id0} ->
  45. {400,
  46. error_msg(
  47. 'INVALID_ID',
  48. <<"invalid_bridge_id: ", Id0/binary,
  49. ". Bridge Ids must be of format {type}:{name}">>
  50. )}
  51. end
  52. ).
  53. -define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{
  54. matched => MATCH,
  55. success => SUCC,
  56. failed => FAILED,
  57. rate => RATE,
  58. rate_last5m => RATE_5,
  59. rate_max => RATE_MAX
  60. }).
  61. -define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{
  62. matched := MATCH,
  63. success := SUCC,
  64. failed := FAILED,
  65. rate := RATE,
  66. rate_last5m := RATE_5,
  67. rate_max := RATE_MAX
  68. }).
  69. namespace() -> "bridge".
  70. api_spec() ->
  71. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
  72. paths() ->
  73. [
  74. "/bridges",
  75. "/bridges/:id",
  76. "/bridges/:id/operation/:operation",
  77. "/nodes/:node/bridges/:id/operation/:operation",
  78. "/bridges/:id/reset_metrics"
  79. ].
  80. error_schema(Code, Message) when is_atom(Code) ->
  81. error_schema([Code], Message);
  82. error_schema(Codes, Message) when is_list(Message) ->
  83. error_schema(Codes, list_to_binary(Message));
  84. error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
  85. emqx_dashboard_swagger:error_codes(Codes, Message).
  86. get_response_body_schema() ->
  87. emqx_dashboard_swagger:schema_with_examples(
  88. emqx_bridge_schema:get_response(),
  89. bridge_info_examples(get)
  90. ).
  91. param_path_operation_cluster() ->
  92. {operation,
  93. mk(
  94. enum([enable, disable, stop, restart]),
  95. #{
  96. in => path,
  97. required => true,
  98. example => <<"start">>,
  99. desc => ?DESC("desc_param_path_operation_cluster")
  100. }
  101. )}.
  102. param_path_operation_on_node() ->
  103. {operation,
  104. mk(
  105. enum([stop, restart]),
  106. #{
  107. in => path,
  108. required => true,
  109. example => <<"start">>,
  110. desc => ?DESC("desc_param_path_operation_on_node")
  111. }
  112. )}.
  113. param_path_node() ->
  114. {node,
  115. mk(
  116. binary(),
  117. #{
  118. in => path,
  119. required => true,
  120. example => <<"emqx@127.0.0.1">>,
  121. desc => ?DESC("desc_param_path_node")
  122. }
  123. )}.
  124. param_path_id() ->
  125. {id,
  126. mk(
  127. binary(),
  128. #{
  129. in => path,
  130. required => true,
  131. example => <<"webhook:my_webhook">>,
  132. desc => ?DESC("desc_param_path_id")
  133. }
  134. )}.
  135. bridge_info_array_example(Method) ->
  136. [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
  137. bridge_info_examples(Method) ->
  138. maps:merge(conn_bridge_examples(Method), #{
  139. <<"my_webhook">> => #{
  140. summary => <<"WebHook">>,
  141. value => info_example(webhook, awesome, Method)
  142. }
  143. }).
  144. conn_bridge_examples(Method) ->
  145. lists:foldl(
  146. fun(Type, Acc) ->
  147. SType = atom_to_list(Type),
  148. KeyIngress = bin(SType ++ "_ingress"),
  149. KeyEgress = bin(SType ++ "_egress"),
  150. maps:merge(Acc, #{
  151. KeyIngress => #{
  152. summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
  153. value => info_example(Type, ingress, Method)
  154. },
  155. KeyEgress => #{
  156. summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
  157. value => info_example(Type, egress, Method)
  158. }
  159. })
  160. end,
  161. #{},
  162. ?CONN_TYPES
  163. ).
  164. info_example(Type, Direction, Method) ->
  165. maps:merge(
  166. info_example_basic(Type, Direction),
  167. method_example(Type, Direction, Method)
  168. ).
  169. method_example(Type, Direction, Method) when Method == get; Method == post ->
  170. SType = atom_to_list(Type),
  171. SDir = atom_to_list(Direction),
  172. SName =
  173. case Type of
  174. webhook -> "my_" ++ SType;
  175. _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
  176. end,
  177. TypeNameExamp = #{
  178. type => bin(SType),
  179. name => bin(SName)
  180. },
  181. maybe_with_metrics_example(TypeNameExamp, Method);
  182. method_example(_Type, _Direction, put) ->
  183. #{}.
  184. maybe_with_metrics_example(TypeNameExamp, get) ->
  185. TypeNameExamp#{
  186. metrics => ?METRICS(0, 0, 0, 0, 0, 0),
  187. node_metrics => [
  188. #{
  189. node => node(),
  190. metrics => ?METRICS(0, 0, 0, 0, 0, 0)
  191. }
  192. ]
  193. };
  194. maybe_with_metrics_example(TypeNameExamp, _) ->
  195. TypeNameExamp.
  196. info_example_basic(webhook, _) ->
  197. #{
  198. enable => true,
  199. url => <<"http://localhost:9901/messages/${topic}">>,
  200. request_timeout => <<"15s">>,
  201. connect_timeout => <<"15s">>,
  202. max_retries => 3,
  203. retry_interval => <<"10s">>,
  204. pool_type => <<"random">>,
  205. pool_size => 4,
  206. enable_pipelining => 100,
  207. ssl => #{enable => false},
  208. local_topic => <<"emqx_webhook/#">>,
  209. method => post,
  210. body => <<"${payload}">>
  211. };
  212. info_example_basic(mqtt, ingress) ->
  213. #{
  214. enable => true,
  215. connector => <<"mqtt:my_mqtt_connector">>,
  216. direction => ingress,
  217. remote_topic => <<"aws/#">>,
  218. remote_qos => 1,
  219. local_topic => <<"from_aws/${topic}">>,
  220. local_qos => <<"${qos}">>,
  221. payload => <<"${payload}">>,
  222. retain => <<"${retain}">>
  223. };
  224. info_example_basic(mqtt, egress) ->
  225. #{
  226. enable => true,
  227. connector => <<"mqtt:my_mqtt_connector">>,
  228. direction => egress,
  229. local_topic => <<"emqx/#">>,
  230. remote_topic => <<"from_emqx/${topic}">>,
  231. remote_qos => <<"${qos}">>,
  232. payload => <<"${payload}">>,
  233. retain => false
  234. }.
  235. schema("/bridges") ->
  236. #{
  237. 'operationId' => '/bridges',
  238. get => #{
  239. tags => [<<"bridges">>],
  240. summary => <<"List Bridges">>,
  241. description => ?DESC("desc_api1"),
  242. responses => #{
  243. 200 => emqx_dashboard_swagger:schema_with_example(
  244. array(emqx_bridge_schema:get_response()),
  245. bridge_info_array_example(get)
  246. )
  247. }
  248. },
  249. post => #{
  250. tags => [<<"bridges">>],
  251. summary => <<"Create Bridge">>,
  252. description => ?DESC("desc_api2"),
  253. 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
  254. emqx_bridge_schema:post_request(),
  255. bridge_info_examples(post)
  256. ),
  257. responses => #{
  258. 201 => get_response_body_schema(),
  259. 400 => error_schema('ALREADY_EXISTS', "Bridge already exists")
  260. }
  261. }
  262. };
  263. schema("/bridges/:id") ->
  264. #{
  265. 'operationId' => '/bridges/:id',
  266. get => #{
  267. tags => [<<"bridges">>],
  268. summary => <<"Get Bridge">>,
  269. description => ?DESC("desc_api3"),
  270. parameters => [param_path_id()],
  271. responses => #{
  272. 200 => get_response_body_schema(),
  273. 404 => error_schema('NOT_FOUND', "Bridge not found")
  274. }
  275. },
  276. put => #{
  277. tags => [<<"bridges">>],
  278. summary => <<"Update Bridge">>,
  279. description => ?DESC("desc_api4"),
  280. parameters => [param_path_id()],
  281. 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
  282. emqx_bridge_schema:put_request(),
  283. bridge_info_examples(put)
  284. ),
  285. responses => #{
  286. 200 => get_response_body_schema(),
  287. 404 => error_schema('NOT_FOUND', "Bridge not found"),
  288. 400 => error_schema(['BAD_REQUEST', 'INVALID_ID'], "Update bridge failed")
  289. }
  290. },
  291. delete => #{
  292. tags => [<<"bridges">>],
  293. summary => <<"Delete Bridge">>,
  294. description => ?DESC("desc_api5"),
  295. parameters => [param_path_id()],
  296. responses => #{
  297. 204 => <<"Bridge deleted">>,
  298. 400 => error_schema(['INVALID_ID'], "Update bridge failed"),
  299. 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
  300. }
  301. }
  302. };
  303. schema("/bridges/:id/reset_metrics") ->
  304. #{
  305. 'operationId' => '/bridges/:id/reset_metrics',
  306. put => #{
  307. tags => [<<"bridges">>],
  308. summary => <<"Reset Bridge Metrics">>,
  309. description => ?DESC("desc_api6"),
  310. parameters => [param_path_id()],
  311. responses => #{
  312. 200 => <<"Reset success">>,
  313. 400 => error_schema(['BAD_REQUEST'], "RPC Call Failed")
  314. }
  315. }
  316. };
  317. schema("/bridges/:id/operation/:operation") ->
  318. #{
  319. 'operationId' => '/bridges/:id/operation/:operation',
  320. post => #{
  321. tags => [<<"bridges">>],
  322. summary => <<"Enable/Disable/Stop/Restart Bridge">>,
  323. description => ?DESC("desc_api7"),
  324. parameters => [
  325. param_path_id(),
  326. param_path_operation_cluster()
  327. ],
  328. responses => #{
  329. 200 => <<"Operation success">>,
  330. 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"),
  331. 400 => error_schema('INVALID_ID', "Bad bridge ID")
  332. }
  333. }
  334. };
  335. schema("/nodes/:node/bridges/:id/operation/:operation") ->
  336. #{
  337. 'operationId' => '/nodes/:node/bridges/:id/operation/:operation',
  338. post => #{
  339. tags => [<<"bridges">>],
  340. summary => <<"Stop/Restart Bridge">>,
  341. description => ?DESC("desc_api8"),
  342. parameters => [
  343. param_path_node(),
  344. param_path_id(),
  345. param_path_operation_on_node()
  346. ],
  347. responses => #{
  348. 200 => <<"Operation success">>,
  349. 400 => error_schema('INVALID_ID', "Bad bridge ID"),
  350. 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
  351. 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
  352. }
  353. }
  354. }.
  355. '/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
  356. Conf = filter_out_request_body(Conf0),
  357. case emqx_bridge:lookup(BridgeType, BridgeName) of
  358. {ok, _} ->
  359. {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
  360. {error, not_found} ->
  361. case ensure_bridge_created(BridgeType, BridgeName, Conf) of
  362. ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
  363. {error, Error} -> {400, Error}
  364. end
  365. end;
  366. '/bridges'(get, _Params) ->
  367. {200,
  368. zip_bridges([
  369. [format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)]
  370. || Node <- mria_mnesia:running_nodes()
  371. ])}.
  372. '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
  373. ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
  374. '/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
  375. Conf = filter_out_request_body(Conf0),
  376. ?TRY_PARSE_ID(
  377. Id,
  378. case emqx_bridge:lookup(BridgeType, BridgeName) of
  379. {ok, _} ->
  380. case ensure_bridge_created(BridgeType, BridgeName, Conf) of
  381. ok ->
  382. lookup_from_all_nodes(BridgeType, BridgeName, 200);
  383. {error, Error} ->
  384. {400, Error}
  385. end;
  386. {error, not_found} ->
  387. {404, error_msg('NOT_FOUND', <<"bridge not found">>)}
  388. end
  389. );
  390. '/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
  391. ?TRY_PARSE_ID(
  392. Id,
  393. case emqx_bridge:remove(BridgeType, BridgeName) of
  394. {ok, _} -> {204};
  395. {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
  396. {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
  397. end
  398. ).
  399. '/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) ->
  400. ?TRY_PARSE_ID(
  401. Id,
  402. case
  403. emqx_bridge_resource:reset_metrics(
  404. emqx_bridge_resource:resource_id(BridgeType, BridgeName)
  405. )
  406. of
  407. ok -> {200, <<"Reset success">>};
  408. Reason -> {400, error_msg('BAD_REQUEST', Reason)}
  409. end
  410. ).
  411. lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
  412. Nodes = mria_mnesia:running_nodes(),
  413. case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
  414. {ok, [{ok, _} | _] = Results} ->
  415. {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
  416. {ok, [{error, not_found} | _]} ->
  417. {404, error_msg('NOT_FOUND', <<"not_found">>)};
  418. {error, ErrL} ->
  419. {500, error_msg('INTERNAL_ERROR', ErrL)}
  420. end.
  421. lookup_from_local_node(BridgeType, BridgeName) ->
  422. case emqx_bridge:lookup(BridgeType, BridgeName) of
  423. {ok, Res} -> {ok, format_resp(Res)};
  424. Error -> Error
  425. end.
  426. '/bridges/:id/operation/:operation'(post, #{
  427. bindings :=
  428. #{id := Id, operation := Op}
  429. }) ->
  430. ?TRY_PARSE_ID(
  431. Id,
  432. case operation_func(Op) of
  433. invalid ->
  434. {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
  435. OperFunc when OperFunc == enable; OperFunc == disable ->
  436. case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of
  437. {ok, _} ->
  438. {200};
  439. {error, {pre_config_update, _, bridge_not_found}} ->
  440. {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
  441. {error, {_, _, timeout}} ->
  442. {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
  443. {error, timeout} ->
  444. {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
  445. {error, Reason} ->
  446. {500, error_msg('INTERNAL_ERROR', Reason)}
  447. end;
  448. OperFunc ->
  449. Nodes = mria_mnesia:running_nodes(),
  450. operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName)
  451. end
  452. ).
  453. '/nodes/:node/bridges/:id/operation/:operation'(post, #{
  454. bindings :=
  455. #{id := Id, operation := Op, node := Node}
  456. }) ->
  457. ?TRY_PARSE_ID(
  458. Id,
  459. case node_operation_func(Op) of
  460. invalid ->
  461. {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
  462. OperFunc ->
  463. TargetNode = binary_to_atom(Node, utf8),
  464. ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]),
  465. case maps:get(enable, ConfMap, false) of
  466. false ->
  467. {403,
  468. error_msg(
  469. 'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">>
  470. )};
  471. true ->
  472. case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of
  473. ok ->
  474. {200};
  475. {error, timeout} ->
  476. {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
  477. {error, Reason} ->
  478. {500, error_msg('INTERNAL_ERROR', Reason)}
  479. end
  480. end
  481. end
  482. ).
  483. node_operation_func(<<"stop">>) -> stop_bridge_to_node;
  484. node_operation_func(<<"restart">>) -> restart_bridge_to_node;
  485. node_operation_func(_) -> invalid.
  486. operation_func(<<"stop">>) -> stop;
  487. operation_func(<<"restart">>) -> restart;
  488. operation_func(<<"enable">>) -> enable;
  489. operation_func(<<"disable">>) -> disable;
  490. operation_func(_) -> invalid.
  491. operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
  492. RpcFunc =
  493. case OperFunc of
  494. restart -> restart_bridges_to_all_nodes;
  495. stop -> stop_bridges_to_all_nodes
  496. end,
  497. case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
  498. {ok, _} ->
  499. {200};
  500. {error, [timeout | _]} ->
  501. {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
  502. {error, ErrL} ->
  503. {500, error_msg('INTERNAL_ERROR', ErrL)}
  504. end.
  505. ensure_bridge_created(BridgeType, BridgeName, Conf) ->
  506. case emqx_bridge:create(BridgeType, BridgeName, Conf) of
  507. {ok, _} -> ok;
  508. {error, Reason} -> {error, error_msg('BAD_REQUEST', Reason)}
  509. end.
  510. zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
  511. lists:foldl(
  512. fun(#{type := Type, name := Name}, Acc) ->
  513. Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
  514. [format_bridge_info(Bridges) | Acc]
  515. end,
  516. [],
  517. BridgesFirstNode
  518. ).
  519. pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
  520. lists:foldl(
  521. fun(BridgesOneNode, Acc) ->
  522. case
  523. [
  524. Bridge
  525. || Bridge = #{type := Type0, name := Name0} <- BridgesOneNode,
  526. Type0 == Type,
  527. Name0 == Name
  528. ]
  529. of
  530. [BridgeInfo] ->
  531. [BridgeInfo | Acc];
  532. [] ->
  533. ?SLOG(warning, #{
  534. msg => "bridge_inconsistent_in_cluster",
  535. reason => not_found,
  536. type => Type,
  537. name => Name,
  538. bridge => emqx_bridge_resource:bridge_id(Type, Name)
  539. }),
  540. Acc
  541. end
  542. end,
  543. [],
  544. BridgesAllNodes
  545. ).
  546. format_bridge_info([FirstBridge | _] = Bridges) ->
  547. Res = maps:remove(node, FirstBridge),
  548. NodeStatus = collect_status(Bridges),
  549. NodeMetrics = collect_metrics(Bridges),
  550. Res#{
  551. status => aggregate_status(NodeStatus),
  552. node_status => NodeStatus,
  553. metrics => aggregate_metrics(NodeMetrics),
  554. node_metrics => NodeMetrics
  555. }.
  556. collect_status(Bridges) ->
  557. [maps:with([node, status], B) || B <- Bridges].
  558. aggregate_status(AllStatus) ->
  559. Head = fun([A | _]) -> A end,
  560. HeadVal = maps:get(status, Head(AllStatus), connecting),
  561. AllRes = lists:all(fun(#{status := Val}) -> Val == HeadVal end, AllStatus),
  562. case AllRes of
  563. true -> HeadVal;
  564. false -> inconsistent
  565. end.
  566. collect_metrics(Bridges) ->
  567. [maps:with([node, metrics], B) || B <- Bridges].
  568. aggregate_metrics(AllMetrics) ->
  569. InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0),
  570. lists:foldl(
  571. fun(
  572. #{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
  573. ?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0)
  574. ) ->
  575. ?METRICS(
  576. Match1 + Match0,
  577. Succ1 + Succ0,
  578. Failed1 + Failed0,
  579. Rate1 + Rate0,
  580. Rate5m1 + Rate5m0,
  581. RateMax1 + RateMax0
  582. )
  583. end,
  584. InitMetrics,
  585. AllMetrics
  586. ).
  587. format_resp(Data) ->
  588. format_resp(Data, node()).
  589. format_resp(
  590. #{
  591. type := Type,
  592. name := BridgeName,
  593. raw_config := RawConf,
  594. resource_data := #{status := Status, metrics := Metrics}
  595. },
  596. Node
  597. ) ->
  598. RawConfFull = fill_defaults(Type, RawConf),
  599. RawConfFull#{
  600. type => Type,
  601. name => maps:get(<<"name">>, RawConf, BridgeName),
  602. node => Node,
  603. status => Status,
  604. metrics => format_metrics(Metrics)
  605. }.
  606. format_metrics(#{
  607. counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ},
  608. rate := #{
  609. matched := #{current := Rate, last5m := Rate5m, max := RateMax}
  610. }
  611. }) ->
  612. ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
  613. fill_defaults(Type, RawConf) ->
  614. PackedConf = pack_bridge_conf(Type, RawConf),
  615. FullConf = emqx_config:fill_defaults(emqx_bridge_schema, PackedConf, #{}),
  616. unpack_bridge_conf(Type, FullConf).
  617. pack_bridge_conf(Type, RawConf) ->
  618. #{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
  619. unpack_bridge_conf(Type, PackedConf) ->
  620. #{<<"bridges">> := Bridges} = PackedConf,
  621. #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges),
  622. RawConf.
  623. is_ok(ResL) ->
  624. case
  625. lists:filter(
  626. fun
  627. ({ok, _}) -> false;
  628. (ok) -> false;
  629. (_) -> true
  630. end,
  631. ResL
  632. )
  633. of
  634. [] -> {ok, [Res || {ok, Res} <- ResL]};
  635. ErrL -> {error, ErrL}
  636. end.
  637. filter_out_request_body(Conf) ->
  638. ExtraConfs = [
  639. <<"id">>,
  640. <<"type">>,
  641. <<"name">>,
  642. <<"status">>,
  643. <<"node_status">>,
  644. <<"node_metrics">>,
  645. <<"metrics">>,
  646. <<"node">>
  647. ],
  648. maps:without(ExtraConfs, Conf).
  649. error_msg(Code, Msg) ->
  650. #{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
  651. bin(S) when is_list(S) ->
  652. list_to_binary(S);
  653. bin(S) when is_atom(S) ->
  654. atom_to_binary(S, utf8);
  655. bin(S) when is_binary(S) ->
  656. S.