emqx_bridge_api_SUITE.erl 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -import(emqx_mgmt_api_test_util, [uri/1]).
  19. -import(emqx_common_test_helpers, [on_exit/1]).
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  22. -include_lib("snabbkaffe/include/test_macros.hrl").
  23. -define(BRIDGE_TYPE_HTTP, <<"webhook">>).
  24. -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
  25. -define(URL(PORT, PATH),
  26. list_to_binary(
  27. io_lib:format(
  28. "http://localhost:~s/~s",
  29. [integer_to_list(PORT), PATH]
  30. )
  31. )
  32. ).
  33. -define(BRIDGE(NAME, TYPE), #{
  34. <<"ssl">> => #{<<"enable">> => false},
  35. <<"type">> => TYPE,
  36. <<"name">> => NAME
  37. }).
  38. -define(BRIDGE_TYPE_MQTT, <<"mqtt">>).
  39. -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{
  40. <<"server">> => SERVER,
  41. <<"username">> => <<"user1">>,
  42. <<"password">> => <<"">>,
  43. <<"proto_ver">> => <<"v5">>,
  44. <<"egress">> => #{
  45. <<"remote">> => #{
  46. <<"topic">> => <<"emqx/${topic}">>,
  47. <<"qos">> => <<"${qos}">>,
  48. <<"retain">> => false
  49. }
  50. }
  51. }).
  52. -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
  53. -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{
  54. <<"url">> => URL,
  55. <<"local_topic">> => <<"emqx_webhook/#">>,
  56. <<"method">> => <<"post">>,
  57. <<"body">> => <<"${payload}">>,
  58. <<"headers">> => #{
  59. % NOTE
  60. % The Pascal-Case is important here.
  61. % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts
  62. % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS
  63. % when this happens (while the 'content-type' does not).
  64. <<"Content-Type">> => <<"application/json">>
  65. }
  66. }).
  67. -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
  68. -define(APPSPECS, [
  69. emqx,
  70. emqx_conf,
  71. emqx_auth,
  72. emqx_auth_mnesia,
  73. emqx_management,
  74. emqx_connector,
  75. emqx_bridge_http,
  76. {emqx_bridge, "actions {}\n bridges {}"},
  77. {emqx_rule_engine, "rule_engine { rules {} }"}
  78. ]).
  79. -define(APPSPEC_DASHBOARD,
  80. {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
  81. ).
  82. all() ->
  83. [
  84. {group, single},
  85. {group, cluster_later_join},
  86. {group, cluster}
  87. ].
  88. groups() ->
  89. AllTCs = emqx_common_test_helpers:all(?MODULE),
  90. SingleOnlyTests = [
  91. t_broken_bpapi_vsn,
  92. t_old_bpapi_vsn,
  93. t_bridges_probe
  94. ],
  95. ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
  96. [
  97. {single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
  98. {cluster_later_join, [], ClusterLaterJoinOnlyTCs},
  99. {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs}
  100. ].
  101. suite() ->
  102. [{timetrap, {seconds, 120}}].
  103. init_per_suite(Config) ->
  104. Config.
  105. end_per_suite(_Config) ->
  106. ok.
  107. init_per_group(cluster = Name, Config) ->
  108. Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
  109. init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
  110. init_per_group(cluster_later_join = Name, Config) ->
  111. Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
  112. init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
  113. init_per_group(_Name, Config) ->
  114. WorkDir = emqx_cth_suite:work_dir(Config),
  115. Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
  116. init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]).
  117. init_api(Config) ->
  118. APINode = ?config(node, Config),
  119. {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []),
  120. [{api, App} | Config].
  121. mk_cluster(Name, Config) ->
  122. mk_cluster(Name, Config, #{}).
  123. mk_cluster(Name, Config, Opts) ->
  124. Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
  125. Node2Apps = ?APPSPECS,
  126. emqx_cth_cluster:start(
  127. [
  128. {emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}},
  129. {emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}}
  130. ],
  131. #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
  132. ).
  133. end_per_group(Group, Config) when
  134. Group =:= cluster;
  135. Group =:= cluster_later_join
  136. ->
  137. ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
  138. end_per_group(_, Config) ->
  139. emqx_cth_suite:stop(?config(group_apps, Config)),
  140. ok.
  141. init_per_testcase(t_broken_bpapi_vsn, Config) ->
  142. meck:new(emqx_bpapi, [passthrough]),
  143. meck:expect(emqx_bpapi, supported_version, 2, -1),
  144. meck:new(emqx_bridge_api, [passthrough]),
  145. meck:expect(emqx_bridge_api, supported_versions, 1, []),
  146. init_per_testcase(common, Config);
  147. init_per_testcase(t_old_bpapi_vsn, Config) ->
  148. meck:new(emqx_bpapi, [passthrough]),
  149. meck:expect(emqx_bpapi, supported_version, 1, 1),
  150. meck:expect(emqx_bpapi, supported_version, 2, 1),
  151. init_per_testcase(common, Config);
  152. init_per_testcase(_, Config) ->
  153. {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
  154. [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
  155. end_per_testcase(t_broken_bpapi_vsn, Config) ->
  156. meck:unload(),
  157. end_per_testcase(common, Config);
  158. end_per_testcase(t_old_bpapi_vsn, Config) ->
  159. meck:unload(),
  160. end_per_testcase(common, Config);
  161. end_per_testcase(_, Config) ->
  162. Sock = ?config(sock, Config),
  163. Acceptor = ?config(acceptor, Config),
  164. Node = ?config(node, Config),
  165. ok = emqx_common_test_helpers:call_janitor(),
  166. ok = stop_http_server(Sock, Acceptor),
  167. ok = erpc:call(Node, fun clear_resources/0),
  168. ok.
  169. clear_resources() ->
  170. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
  171. lists:foreach(
  172. fun(#{type := Type, name := Name}) ->
  173. ok = emqx_bridge:remove(Type, Name)
  174. end,
  175. emqx_bridge:list()
  176. ).
  177. %%------------------------------------------------------------------------------
  178. %% HTTP server for testing
  179. %%------------------------------------------------------------------------------
  180. start_http_server(HandleFun) ->
  181. process_flag(trap_exit, true),
  182. Parent = self(),
  183. {Port, Sock} = listen_on_random_port(),
  184. Acceptor = spawn_link(fun() ->
  185. accept_loop(Sock, HandleFun, Parent)
  186. end),
  187. timer:sleep(100),
  188. {Port, Sock, Acceptor}.
  189. stop_http_server(Sock, Acceptor) ->
  190. exit(Acceptor, kill),
  191. gen_tcp:close(Sock).
  192. listen_on_random_port() ->
  193. SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
  194. case gen_tcp:listen(0, SockOpts) of
  195. {ok, Sock} ->
  196. {ok, Port} = inet:port(Sock),
  197. {Port, Sock};
  198. {error, Reason} when Reason /= eaddrinuse ->
  199. {error, Reason}
  200. end.
  201. accept_loop(Sock, HandleFun, Parent) ->
  202. process_flag(trap_exit, true),
  203. {ok, Conn} = gen_tcp:accept(Sock),
  204. Handler = spawn_link(fun() -> HandleFun(Conn, Parent) end),
  205. gen_tcp:controlling_process(Conn, Handler),
  206. accept_loop(Sock, HandleFun, Parent).
  207. make_response(CodeStr, Str) ->
  208. B = iolist_to_binary(Str),
  209. iolist_to_binary(
  210. io_lib:fwrite(
  211. "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
  212. [CodeStr, size(B), B]
  213. )
  214. ).
  215. handle_fun_200_ok(Conn, Parent) ->
  216. case gen_tcp:recv(Conn, 0) of
  217. {ok, ReqStr} ->
  218. ct:pal("the http handler got request: ~p", [ReqStr]),
  219. Req = parse_http_request(ReqStr),
  220. Parent ! {http_server, received, Req},
  221. gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
  222. handle_fun_200_ok(Conn, Parent);
  223. {error, Reason} ->
  224. ct:pal("the http handler recv error: ~p", [Reason]),
  225. timer:sleep(100),
  226. gen_tcp:close(Conn)
  227. end.
  228. parse_http_request(ReqStr0) ->
  229. [Method, ReqStr1] = string:split(ReqStr0, " ", leading),
  230. [Path, ReqStr2] = string:split(ReqStr1, " ", leading),
  231. [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
  232. [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading),
  233. #{method => Method, path => Path, body => Body}.
  234. %%------------------------------------------------------------------------------
  235. %% Testcases
  236. %%------------------------------------------------------------------------------
  237. t_http_crud_apis(Config) ->
  238. Port = ?config(port, Config),
  239. %% assert we there's no bridges at first
  240. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  241. {ok, 404, _} = request(get, uri(["bridges", "foo"]), Config),
  242. {ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), Config),
  243. %% then we add a webhook bridge, using POST
  244. %% POST /bridges/ will create a bridge
  245. URL1 = ?URL(Port, "path1"),
  246. Name = ?BRIDGE_NAME,
  247. ?assertMatch(
  248. {ok, 201, #{
  249. <<"type">> := ?BRIDGE_TYPE_HTTP,
  250. <<"name">> := Name,
  251. <<"enable">> := true,
  252. <<"status">> := _,
  253. <<"node_status">> := [_ | _],
  254. <<"url">> := URL1
  255. }},
  256. request_json(
  257. post,
  258. uri(["bridges"]),
  259. ?HTTP_BRIDGE(URL1, Name),
  260. Config
  261. )
  262. ),
  263. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  264. %% send an message to emqx and the message should be forwarded to the HTTP server
  265. Body = <<"my msg">>,
  266. _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
  267. ?assert(
  268. receive
  269. {http_server, received, #{
  270. method := <<"POST">>,
  271. path := <<"/path1">>,
  272. body := Body
  273. }} ->
  274. true;
  275. Msg ->
  276. ct:pal("error: http got unexpected request: ~p", [Msg]),
  277. false
  278. after 100 ->
  279. false
  280. end
  281. ),
  282. %% update the request-path of the bridge
  283. URL2 = ?URL(Port, "path2"),
  284. ?assertMatch(
  285. {ok, 200, #{
  286. <<"type">> := ?BRIDGE_TYPE_HTTP,
  287. <<"name">> := Name,
  288. <<"enable">> := true,
  289. <<"status">> := _,
  290. <<"node_status">> := [_ | _],
  291. <<"url">> := URL2
  292. }},
  293. request_json(
  294. put,
  295. uri(["bridges", BridgeID]),
  296. ?HTTP_BRIDGE(URL2, Name),
  297. Config
  298. )
  299. ),
  300. %% list all bridges again, assert Bridge2 is in it
  301. ?assertMatch(
  302. {ok, 200, [
  303. #{
  304. <<"type">> := ?BRIDGE_TYPE_HTTP,
  305. <<"name">> := Name,
  306. <<"enable">> := true,
  307. <<"status">> := _,
  308. <<"node_status">> := [_ | _],
  309. <<"url">> := URL2
  310. }
  311. ]},
  312. request_json(get, uri(["bridges"]), Config)
  313. ),
  314. %% get the bridge by id
  315. ?assertMatch(
  316. {ok, 200, #{
  317. <<"type">> := ?BRIDGE_TYPE_HTTP,
  318. <<"name">> := Name,
  319. <<"enable">> := true,
  320. <<"status">> := _,
  321. <<"node_status">> := [_ | _],
  322. <<"url">> := URL2
  323. }},
  324. request_json(get, uri(["bridges", BridgeID]), Config)
  325. ),
  326. %% send an message to emqx again, check the path has been changed
  327. _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
  328. ?assert(
  329. receive
  330. {http_server, received, #{path := <<"/path2">>}} ->
  331. true;
  332. Msg2 ->
  333. ct:pal("error: http got unexpected request: ~p", [Msg2]),
  334. false
  335. after 100 ->
  336. false
  337. end
  338. ),
  339. %% Test bad updates
  340. %% ================
  341. %% Add bridge with a name that is too long
  342. %% We only support bridge names up to 255 characters
  343. LongName = list_to_binary(lists:duplicate(256, $a)),
  344. NameTooLongRequestResult = request_json(
  345. post,
  346. uri(["bridges"]),
  347. ?HTTP_BRIDGE(URL1, LongName),
  348. Config
  349. ),
  350. ?assertMatch(
  351. {ok, 400, _},
  352. NameTooLongRequestResult
  353. ),
  354. {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult,
  355. %% Use regex to check that the message contains the name
  356. Match = re:run(NameTooLongMessage, LongName),
  357. ?assertMatch({match, _}, Match),
  358. %% Add bridge without the URL field
  359. {ok, 400, PutFail1} = request_json(
  360. put,
  361. uri(["bridges", BridgeID]),
  362. maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name)),
  363. Config
  364. ),
  365. ?assertMatch(
  366. #{<<"reason">> := <<"required_field">>},
  367. json(maps:get(<<"message">>, PutFail1))
  368. ),
  369. {ok, 400, PutFail2} = request_json(
  370. put,
  371. uri(["bridges", BridgeID]),
  372. maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name))),
  373. Config
  374. ),
  375. ?assertMatch(
  376. #{<<"reason">> := <<"required_field">>},
  377. json(maps:get(<<"message">>, PutFail2))
  378. ),
  379. {ok, 400, _} = request_json(
  380. put,
  381. uri(["bridges", BridgeID]),
  382. ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
  383. Config
  384. ),
  385. {ok, 400, PutFail3} = request_json(
  386. put,
  387. uri(["bridges", BridgeID]),
  388. ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
  389. Config
  390. ),
  391. ?assertMatch(
  392. #{<<"kind">> := <<"validation_error">>},
  393. json(maps:get(<<"message">>, PutFail3))
  394. ),
  395. %% delete the bridge
  396. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
  397. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  398. %% update a deleted bridge returns an error
  399. ?assertMatch(
  400. {ok, 404, #{
  401. <<"code">> := <<"NOT_FOUND">>,
  402. <<"message">> := _
  403. }},
  404. request_json(
  405. put,
  406. uri(["bridges", BridgeID]),
  407. ?HTTP_BRIDGE(URL2, Name),
  408. Config
  409. )
  410. ),
  411. %% try delete bad bridge id
  412. ?assertMatch(
  413. {ok, 404, #{
  414. <<"code">> := <<"NOT_FOUND">>,
  415. <<"message">> := <<"Invalid bridge ID", _/binary>>
  416. }},
  417. request_json(delete, uri(["bridges", "foo"]), Config)
  418. ),
  419. %% Deleting a non-existing bridge should result in an error
  420. ?assertMatch(
  421. {ok, 404, #{
  422. <<"code">> := <<"NOT_FOUND">>,
  423. <<"message">> := _
  424. }},
  425. request_json(delete, uri(["bridges", BridgeID]), Config)
  426. ),
  427. %% Create non working bridge
  428. BrokenURL = ?URL(Port + 1, "foo"),
  429. {ok, 201, BrokenBridge} = request(
  430. post,
  431. uri(["bridges"]),
  432. ?HTTP_BRIDGE(BrokenURL, Name),
  433. fun json/1,
  434. Config
  435. ),
  436. ?assertMatch(
  437. #{
  438. <<"type">> := ?BRIDGE_TYPE_HTTP,
  439. <<"name">> := Name,
  440. <<"enable">> := true,
  441. <<"status">> := <<"disconnected">>,
  442. <<"status_reason">> := <<"Connection refused">>,
  443. <<"node_status">> := [
  444. #{
  445. <<"status">> := <<"disconnected">>,
  446. <<"status_reason">> := <<"Connection refused">>
  447. }
  448. | _
  449. ],
  450. <<"url">> := BrokenURL
  451. },
  452. BrokenBridge
  453. ),
  454. {ok, 200, FixedBridge} = request_json(
  455. put,
  456. uri(["bridges", BridgeID]),
  457. ?HTTP_BRIDGE(URL1),
  458. Config
  459. ),
  460. ?assertMatch(
  461. #{
  462. <<"status">> := <<"connected">>,
  463. <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _]
  464. } when
  465. not is_map_key(<<"status_reason">>, FixedBridge) andalso
  466. not is_map_key(<<"status_reason">>, FixedNodeStatus),
  467. FixedBridge
  468. ),
  469. %% Try create bridge with bad characters as name
  470. {ok, 400, _} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, <<"隋达"/utf8>>), Config),
  471. %% Missing scheme in URL
  472. {ok, 400, _} = request(
  473. post,
  474. uri(["bridges"]),
  475. ?HTTP_BRIDGE(<<"localhost:1234/foo">>, <<"missing_url_scheme">>),
  476. Config
  477. ),
  478. %% Invalid port
  479. {ok, 400, _} = request(
  480. post,
  481. uri(["bridges"]),
  482. ?HTTP_BRIDGE(<<"http://localhost:12341234/foo">>, <<"invalid_port">>),
  483. Config
  484. ),
  485. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
  486. t_http_bridges_local_topic(Config) ->
  487. Port = ?config(port, Config),
  488. %% assert we there's no bridges at first
  489. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  490. %% then we add a webhook bridge, using POST
  491. %% POST /bridges/ will create a bridge
  492. URL1 = ?URL(Port, "path1"),
  493. Name1 = <<"t_http_bridges_with_local_topic1">>,
  494. Name2 = <<"t_http_bridges_without_local_topic1">>,
  495. %% create one http bridge with local_topic
  496. {ok, 201, _} = request(
  497. post,
  498. uri(["bridges"]),
  499. ?HTTP_BRIDGE(URL1, Name1),
  500. Config
  501. ),
  502. %% and we create another one without local_topic
  503. {ok, 201, _} = request(
  504. post,
  505. uri(["bridges"]),
  506. maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2)),
  507. Config
  508. ),
  509. BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1),
  510. BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2),
  511. %% Send an message to emqx and the message should be forwarded to the HTTP server.
  512. %% This is to verify we can have 2 bridges with and without local_topic fields
  513. %% at the same time.
  514. Body = <<"my msg">>,
  515. _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
  516. ?assert(
  517. receive
  518. {http_server, received, #{
  519. method := <<"POST">>,
  520. path := <<"/path1">>,
  521. body := Body
  522. }} ->
  523. true;
  524. Msg ->
  525. ct:pal("error: http got unexpected request: ~p", [Msg]),
  526. false
  527. after 100 ->
  528. false
  529. end
  530. ),
  531. %% delete the bridge
  532. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), Config),
  533. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), Config).
  534. t_check_dependent_actions_on_delete(Config) ->
  535. Port = ?config(port, Config),
  536. %% assert we there's no bridges at first
  537. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  538. %% then we add a webhook bridge, using POST
  539. %% POST /bridges/ will create a bridge
  540. URL1 = ?URL(Port, "path1"),
  541. Name = <<"t_http_crud_apis">>,
  542. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  543. {ok, 201, _} = request(
  544. post,
  545. uri(["bridges"]),
  546. ?HTTP_BRIDGE(URL1, Name),
  547. Config
  548. ),
  549. {ok, 201, #{<<"id">> := RuleId}} = request_json(
  550. post,
  551. uri(["rules"]),
  552. #{
  553. <<"name">> => <<"t_http_crud_apis">>,
  554. <<"enable">> => true,
  555. <<"actions">> => [BridgeID],
  556. <<"sql">> => <<"SELECT * from \"t\"">>
  557. },
  558. Config
  559. ),
  560. %% deleting the bridge should fail because there is a rule that depends on it
  561. {ok, 400, Body} = request(
  562. delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", Config
  563. ),
  564. ?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])),
  565. %% delete the rule first
  566. {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config),
  567. %% then delete the bridge is OK
  568. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
  569. {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
  570. t_cascade_delete_actions(Config) ->
  571. Port = ?config(port, Config),
  572. %% assert we there's no bridges at first
  573. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  574. %% then we add a webhook bridge, using POST
  575. %% POST /bridges/ will create a bridge
  576. URL1 = ?URL(Port, "path1"),
  577. Name = <<"t_http_crud_apis">>,
  578. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  579. {ok, 201, _} = request(
  580. post,
  581. uri(["bridges"]),
  582. ?HTTP_BRIDGE(URL1, Name),
  583. Config
  584. ),
  585. {ok, 201, #{<<"id">> := RuleId}} = request_json(
  586. post,
  587. uri(["rules"]),
  588. #{
  589. <<"name">> => <<"t_http_crud_apis">>,
  590. <<"enable">> => true,
  591. <<"actions">> => [BridgeID],
  592. <<"sql">> => <<"SELECT * from \"t\"">>
  593. },
  594. Config
  595. ),
  596. %% delete the bridge will also delete the actions from the rules
  597. {ok, 204, _} = request(
  598. delete,
  599. uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true",
  600. Config
  601. ),
  602. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  603. ?assertMatch(
  604. {ok, 200, #{<<"actions">> := []}},
  605. request_json(get, uri(["rules", RuleId]), Config)
  606. ),
  607. {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config),
  608. {ok, 201, _} = request(
  609. post,
  610. uri(["bridges"]),
  611. ?HTTP_BRIDGE(URL1, Name),
  612. Config
  613. ),
  614. {ok, 201, _} = request(
  615. post,
  616. uri(["rules"]),
  617. #{
  618. <<"name">> => <<"t_http_crud_apis">>,
  619. <<"enable">> => true,
  620. <<"actions">> => [BridgeID],
  621. <<"sql">> => <<"SELECT * from \"t\"">>
  622. },
  623. Config
  624. ),
  625. {ok, 204, _} = request(
  626. delete,
  627. uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions",
  628. Config
  629. ),
  630. {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
  631. t_broken_bpapi_vsn(Config) ->
  632. Port = ?config(port, Config),
  633. URL1 = ?URL(Port, "abc"),
  634. Name = <<"t_bad_bpapi_vsn">>,
  635. {ok, 201, _Bridge} = request(
  636. post,
  637. uri(["bridges"]),
  638. ?HTTP_BRIDGE(URL1, Name),
  639. Config
  640. ),
  641. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  642. %% still works since we redirect to 'restart'
  643. {ok, 501, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config),
  644. {ok, 501, <<>>} = request(post, {operation, node, start, BridgeID}, Config),
  645. ok.
  646. t_old_bpapi_vsn(Config) ->
  647. Port = ?config(port, Config),
  648. URL1 = ?URL(Port, "abc"),
  649. Name = <<"t_bad_bpapi_vsn">>,
  650. {ok, 201, _Bridge} = request(
  651. post,
  652. uri(["bridges"]),
  653. ?HTTP_BRIDGE(URL1, Name),
  654. Config
  655. ),
  656. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  657. {ok, 204, <<>>} = request(post, {operation, cluster, stop, BridgeID}, Config),
  658. {ok, 204, <<>>} = request(post, {operation, node, stop, BridgeID}, Config),
  659. %% still works since we redirect to 'restart'
  660. {ok, 204, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config),
  661. {ok, 204, <<>>} = request(post, {operation, node, start, BridgeID}, Config),
  662. {ok, 204, <<>>} = request(post, {operation, cluster, restart, BridgeID}, Config),
  663. {ok, 204, <<>>} = request(post, {operation, node, restart, BridgeID}, Config),
  664. ok.
  665. t_start_bridge_unknown_node(Config) ->
  666. {ok, 404, _} =
  667. request(
  668. post,
  669. uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]),
  670. Config
  671. ),
  672. {ok, 404, _} =
  673. request(
  674. post,
  675. uri(["nodes", "undefined", "bridges", "webhook:foo", start]),
  676. Config
  677. ).
  678. t_start_stop_bridges_node(Config) ->
  679. do_start_stop_bridges(node, Config).
  680. t_start_stop_bridges_cluster(Config) ->
  681. do_start_stop_bridges(cluster, Config).
  682. do_start_stop_bridges(Type, Config) ->
  683. %% assert we there's no bridges at first
  684. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  685. Port = ?config(port, Config),
  686. URL1 = ?URL(Port, "abc"),
  687. Name = atom_to_binary(Type),
  688. ?assertMatch(
  689. {ok, 201, #{
  690. <<"type">> := ?BRIDGE_TYPE_HTTP,
  691. <<"name">> := Name,
  692. <<"enable">> := true,
  693. <<"status">> := <<"connected">>,
  694. <<"node_status">> := [_ | _],
  695. <<"url">> := URL1
  696. }},
  697. request_json(
  698. post,
  699. uri(["bridges"]),
  700. ?HTTP_BRIDGE(URL1, Name),
  701. Config
  702. )
  703. ),
  704. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  705. ExpectedStatus =
  706. case ?config(group, Config) of
  707. cluster when Type == node ->
  708. <<"inconsistent">>;
  709. _ ->
  710. <<"stopped">>
  711. end,
  712. %% stop it
  713. {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config),
  714. ?assertMatch(
  715. {ok, 200, #{<<"status">> := ExpectedStatus}},
  716. request_json(get, uri(["bridges", BridgeID]), Config)
  717. ),
  718. %% start again
  719. {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config),
  720. ?assertMatch(
  721. {ok, 200, #{<<"status">> := <<"connected">>}},
  722. request_json(get, uri(["bridges", BridgeID]), Config)
  723. ),
  724. %% start a started bridge
  725. {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config),
  726. ?assertMatch(
  727. {ok, 200, #{<<"status">> := <<"connected">>}},
  728. request_json(get, uri(["bridges", BridgeID]), Config)
  729. ),
  730. %% restart an already started bridge
  731. {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config),
  732. ?assertMatch(
  733. {ok, 200, #{<<"status">> := <<"connected">>}},
  734. request_json(get, uri(["bridges", BridgeID]), Config)
  735. ),
  736. %% stop it again
  737. {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config),
  738. %% restart a stopped bridge
  739. {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config),
  740. ?assertMatch(
  741. {ok, 200, #{<<"status">> := <<"connected">>}},
  742. request_json(get, uri(["bridges", BridgeID]), Config)
  743. ),
  744. {ok, 404, _} = request(post, {operation, Type, invalidop, BridgeID}, Config),
  745. %% delete the bridge
  746. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
  747. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  748. %% Fail parse-id check
  749. {ok, 404, _} = request(post, {operation, Type, start, <<"wreckbook_fugazi">>}, Config),
  750. %% Looks ok but doesn't exist
  751. {ok, 404, _} = request(post, {operation, Type, start, <<"webhook:cptn_hook">>}, Config),
  752. %% Create broken bridge
  753. {ListenPort, Sock} = listen_on_random_port(),
  754. %% Connecting to this endpoint should always timeout
  755. BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
  756. BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
  757. CreateRes0 = request_json(
  758. post,
  759. uri(["bridges"]),
  760. ?MQTT_BRIDGE(BadServer, BadName),
  761. Config
  762. ),
  763. ?assertMatch(
  764. {ok, 201, #{
  765. <<"type">> := ?BRIDGE_TYPE_MQTT,
  766. <<"name">> := BadName,
  767. <<"enable">> := true,
  768. <<"server">> := BadServer
  769. }},
  770. CreateRes0
  771. ),
  772. {ok, 201, CreateRes1} = CreateRes0,
  773. case CreateRes1 of
  774. #{
  775. <<"node_status">> := [
  776. #{
  777. <<"status">> := <<"disconnected">>,
  778. <<"status_reason">> := <<"connack_timeout">>
  779. },
  780. #{<<"status">> := <<"connecting">>}
  781. | _
  782. ],
  783. %% `inconsistent': one node is `?status_disconnected' (because it has already
  784. %% timed out), the other node is `?status_connecting' (started later and
  785. %% haven't timed out yet)
  786. <<"status">> := <<"inconsistent">>,
  787. <<"status_reason">> := <<"connack_timeout">>
  788. } ->
  789. ok;
  790. #{
  791. <<"node_status">> := [_, _ | _],
  792. <<"status">> := <<"disconnected">>,
  793. <<"status_reason">> := <<"connack_timeout">>
  794. } ->
  795. ok;
  796. #{
  797. <<"node_status">> := [_],
  798. <<"status">> := <<"connecting">>
  799. } ->
  800. ok;
  801. _ ->
  802. error({unexpected_result, CreateRes1})
  803. end,
  804. BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
  805. ?assertMatch(
  806. %% request from product: return 400 on such errors
  807. {ok, SC, _} when SC == 500 orelse SC == 400,
  808. request(post, {operation, Type, start, BadBridgeID}, Config)
  809. ),
  810. ok = gen_tcp:close(Sock),
  811. ok.
  812. t_start_stop_inconsistent_bridge_node(Config) ->
  813. start_stop_inconsistent_bridge(node, Config).
  814. t_start_stop_inconsistent_bridge_cluster(Config) ->
  815. start_stop_inconsistent_bridge(cluster, Config).
  816. start_stop_inconsistent_bridge(Type, Config) ->
  817. Port = ?config(port, Config),
  818. URL = ?URL(Port, "abc"),
  819. Node = ?config(node, Config),
  820. erpc:call(Node, fun() ->
  821. meck:new(emqx_bridge_resource, [passthrough, no_link]),
  822. meck:expect(
  823. emqx_bridge_resource,
  824. stop,
  825. fun
  826. (_, <<"bridge_not_found">>) -> {error, not_found};
  827. (BridgeType, Name) -> meck:passthrough([BridgeType, Name])
  828. end
  829. )
  830. end),
  831. on_exit(fun() ->
  832. erpc:call(Node, fun() ->
  833. meck:unload([emqx_bridge_resource])
  834. end)
  835. end),
  836. {ok, 201, _Bridge} = request(
  837. post,
  838. uri(["bridges"]),
  839. ?HTTP_BRIDGE(URL, <<"bridge_not_found">>),
  840. Config
  841. ),
  842. {ok, 503, _} = request(
  843. post, {operation, Type, stop, <<"webhook:bridge_not_found">>}, Config
  844. ).
  845. t_enable_disable_bridges(Config) ->
  846. %% assert we there's no bridges at first
  847. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  848. Name = ?BRIDGE_NAME,
  849. Port = ?config(port, Config),
  850. URL1 = ?URL(Port, "abc"),
  851. ?assertMatch(
  852. {ok, 201, #{
  853. <<"type">> := ?BRIDGE_TYPE_HTTP,
  854. <<"name">> := Name,
  855. <<"enable">> := true,
  856. <<"status">> := <<"connected">>,
  857. <<"node_status">> := [_ | _],
  858. <<"url">> := URL1
  859. }},
  860. request_json(
  861. post,
  862. uri(["bridges"]),
  863. ?HTTP_BRIDGE(URL1, Name),
  864. Config
  865. )
  866. ),
  867. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  868. %% disable it
  869. {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config),
  870. ?assertMatch(
  871. {ok, 200, #{<<"status">> := <<"stopped">>}},
  872. request_json(get, uri(["bridges", BridgeID]), Config)
  873. ),
  874. %% enable again
  875. {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
  876. ?assertMatch(
  877. {ok, 200, #{<<"status">> := <<"connected">>}},
  878. request_json(get, uri(["bridges", BridgeID]), Config)
  879. ),
  880. %% enable an already started bridge
  881. {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
  882. ?assertMatch(
  883. {ok, 200, #{<<"status">> := <<"connected">>}},
  884. request_json(get, uri(["bridges", BridgeID]), Config)
  885. ),
  886. %% disable it again
  887. {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config),
  888. %% bad param
  889. {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config),
  890. {ok, 404, _} = request(put, enable_path(true, "foo"), Config),
  891. {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config),
  892. {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config),
  893. ?assertEqual(
  894. #{
  895. <<"code">> => <<"BAD_REQUEST">>,
  896. <<"message">> => <<"Forbidden operation, bridge not enabled">>
  897. },
  898. Res
  899. ),
  900. {ok, 400, Res} = request(post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config),
  901. %% enable a stopped bridge
  902. {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
  903. ?assertMatch(
  904. {ok, 200, #{<<"status">> := <<"connected">>}},
  905. request_json(get, uri(["bridges", BridgeID]), Config)
  906. ),
  907. %% delete the bridge
  908. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
  909. {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
  910. t_reset_bridges(Config) ->
  911. %% assert there's no bridges at first
  912. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  913. Name = ?BRIDGE_NAME,
  914. Port = ?config(port, Config),
  915. URL1 = ?URL(Port, "abc"),
  916. ?assertMatch(
  917. {ok, 201, #{
  918. <<"type">> := ?BRIDGE_TYPE_HTTP,
  919. <<"name">> := Name,
  920. <<"enable">> := true,
  921. <<"status">> := <<"connected">>,
  922. <<"node_status">> := [_ | _],
  923. <<"url">> := URL1
  924. }},
  925. request_json(
  926. post,
  927. uri(["bridges"]),
  928. ?HTTP_BRIDGE(URL1, Name),
  929. Config
  930. )
  931. ),
  932. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  933. {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), Config),
  934. %% delete the bridge
  935. {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
  936. {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
  937. t_with_redact_update(Config) ->
  938. ok = snabbkaffe:start_trace(),
  939. on_exit(fun() -> ok = snabbkaffe:stop() end),
  940. Name = <<"redact_update">>,
  941. Type = <<"mqtt">>,
  942. Password = <<"123456">>,
  943. Template = #{
  944. <<"type">> => Type,
  945. <<"name">> => Name,
  946. <<"server">> => <<"127.0.0.1:1883">>,
  947. <<"username">> => <<"test">>,
  948. <<"password">> => Password,
  949. <<"ingress">> =>
  950. #{<<"remote">> => #{<<"topic">> => <<"t/#">>}}
  951. },
  952. {ok, 201, _} = request(
  953. post,
  954. uri(["bridges"]),
  955. Template,
  956. Config
  957. ),
  958. %% update with redacted config
  959. BridgeConf = emqx_utils:redact(Template),
  960. BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
  961. {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config),
  962. %% bridge is migrated after creation
  963. ConfigRootKey = connectors,
  964. ?assertEqual(
  965. Password,
  966. get_raw_config([ConfigRootKey, Type, Name, password], Config)
  967. ),
  968. %% probe with new password; should not be considered redacted
  969. {_, {ok, #{params := UsedParams}}} =
  970. ?wait_async_action(
  971. request(
  972. post,
  973. uri(["bridges_probe"]),
  974. Template#{<<"password">> := <<"newpassword">>},
  975. Config
  976. ),
  977. #{?snk_kind := bridge_v1_api_dry_run},
  978. 1_000
  979. ),
  980. UsedPassword0 = maps:get(<<"password">>, UsedParams),
  981. %% the password field schema makes
  982. %% `emqx_dashboard_swagger:filter_check_request_and_translate_body' wrap the password.
  983. %% hack: this fails with `badfun' in CI only, due to cover compile, if not evaluated
  984. %% in the original node...
  985. PrimaryNode = ?config(node, Config),
  986. erpc:call(PrimaryNode, fun() -> ?assertEqual(<<"newpassword">>, UsedPassword0()) end),
  987. ok = snabbkaffe:stop(),
  988. ok.
  989. t_bridges_probe(Config) ->
  990. Port = ?config(port, Config),
  991. URL = ?URL(Port, "some_path"),
  992. {ok, 204, <<>>} = request(
  993. post,
  994. uri(["bridges_probe"]),
  995. ?HTTP_BRIDGE(URL),
  996. Config
  997. ),
  998. %% second time with same name is ok since no real bridge created
  999. {ok, 204, <<>>} = request(
  1000. post,
  1001. uri(["bridges_probe"]),
  1002. ?HTTP_BRIDGE(URL),
  1003. Config
  1004. ),
  1005. %% with descriptions is ok.
  1006. {ok, 204, <<>>} = request(
  1007. post,
  1008. uri(["bridges_probe"]),
  1009. (?HTTP_BRIDGE(URL))#{<<"description">> => <<"Test Description">>},
  1010. Config
  1011. ),
  1012. ?assertMatch(
  1013. {ok, 400, #{
  1014. <<"code">> := <<"TEST_FAILED">>,
  1015. <<"message">> := _
  1016. }},
  1017. request_json(
  1018. post,
  1019. uri(["bridges_probe"]),
  1020. ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>),
  1021. Config
  1022. )
  1023. ),
  1024. %% Missing scheme in URL
  1025. ?assertMatch(
  1026. {ok, 400, #{
  1027. <<"code">> := <<"TEST_FAILED">>,
  1028. <<"message">> := _
  1029. }},
  1030. request_json(
  1031. post,
  1032. uri(["bridges_probe"]),
  1033. ?HTTP_BRIDGE(<<"203.0.113.3:1234/foo">>),
  1034. Config
  1035. )
  1036. ),
  1037. %% Invalid port
  1038. ?assertMatch(
  1039. {ok, 400, #{
  1040. <<"code">> := <<"TEST_FAILED">>,
  1041. <<"message">> := _
  1042. }},
  1043. request_json(
  1044. post,
  1045. uri(["bridges_probe"]),
  1046. ?HTTP_BRIDGE(<<"http://203.0.113.3:12341234/foo">>),
  1047. Config
  1048. )
  1049. ),
  1050. {ok, 204, _} = request(
  1051. post,
  1052. uri(["bridges_probe"]),
  1053. ?MQTT_BRIDGE(<<"127.0.0.1:1883">>),
  1054. Config
  1055. ),
  1056. ?assertMatch(
  1057. {ok, 400, #{
  1058. <<"code">> := <<"TEST_FAILED">>,
  1059. <<"message">> := <<"Connection refused">>
  1060. }},
  1061. request_json(
  1062. post,
  1063. uri(["bridges_probe"]),
  1064. ?MQTT_BRIDGE(<<"127.0.0.1:2883">>),
  1065. Config
  1066. )
  1067. ),
  1068. ?assertMatch(
  1069. {ok, 400, #{
  1070. <<"code">> := <<"TEST_FAILED">>,
  1071. <<"message">> := <<"Could not resolve host">>
  1072. }},
  1073. request_json(
  1074. post,
  1075. uri(["bridges_probe"]),
  1076. ?MQTT_BRIDGE(<<"nohost:2883">>),
  1077. Config
  1078. )
  1079. ),
  1080. AuthnConfig = #{
  1081. <<"mechanism">> => <<"password_based">>,
  1082. <<"backend">> => <<"built_in_database">>,
  1083. <<"user_id_type">> => <<"username">>
  1084. },
  1085. Chain = 'mqtt:global',
  1086. {ok, _} = update_config(
  1087. [authentication],
  1088. {create_authenticator, Chain, AuthnConfig},
  1089. Config
  1090. ),
  1091. User = #{user_id => <<"u">>, password => <<"p">>},
  1092. AuthenticatorID = <<"password_based:built_in_database">>,
  1093. {ok, _} = add_user_auth(
  1094. Chain,
  1095. AuthenticatorID,
  1096. User,
  1097. Config
  1098. ),
  1099. on_exit(fun() ->
  1100. delete_user_auth(Chain, AuthenticatorID, User, Config)
  1101. end),
  1102. ?assertMatch(
  1103. {ok, 400, #{
  1104. <<"code">> := <<"TEST_FAILED">>,
  1105. <<"message">> := <<"Unauthorized client">>
  1106. }},
  1107. request_json(
  1108. post,
  1109. uri(["bridges_probe"]),
  1110. ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>},
  1111. Config
  1112. )
  1113. ),
  1114. ?assertMatch(
  1115. {ok, 400, #{
  1116. <<"code">> := <<"TEST_FAILED">>,
  1117. <<"message">> := <<"Bad username or password">>
  1118. }},
  1119. request_json(
  1120. post,
  1121. uri(["bridges_probe"]),
  1122. ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{
  1123. <<"proto_ver">> => <<"v4">>,
  1124. <<"password">> => <<"mySecret">>,
  1125. <<"username">> => <<"u">>
  1126. },
  1127. Config
  1128. )
  1129. ),
  1130. ?assertMatch(
  1131. {ok, 400, #{
  1132. <<"code">> := <<"TEST_FAILED">>,
  1133. <<"message">> := <<"Not authorized">>
  1134. }},
  1135. request_json(
  1136. post,
  1137. uri(["bridges_probe"]),
  1138. ?MQTT_BRIDGE(<<"127.0.0.1:1883">>),
  1139. Config
  1140. )
  1141. ),
  1142. ?assertMatch(
  1143. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  1144. request_json(
  1145. post,
  1146. uri(["bridges_probe"]),
  1147. ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>),
  1148. Config
  1149. )
  1150. ),
  1151. ok.
  1152. t_metrics(Config) ->
  1153. Port = ?config(port, Config),
  1154. %% assert we there's no bridges at first
  1155. {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
  1156. %% then we add a webhook bridge, using POST
  1157. %% POST /bridges/ will create a bridge
  1158. URL1 = ?URL(Port, "path1"),
  1159. Name = ?BRIDGE_NAME,
  1160. ?assertMatch(
  1161. {ok, 201,
  1162. Bridge = #{
  1163. <<"type">> := ?BRIDGE_TYPE_HTTP,
  1164. <<"name">> := Name,
  1165. <<"enable">> := true,
  1166. <<"status">> := _,
  1167. <<"node_status">> := [_ | _],
  1168. <<"url">> := URL1
  1169. }} when
  1170. %% assert that the bridge return doesn't contain metrics anymore
  1171. not is_map_key(<<"metrics">>, Bridge) andalso
  1172. not is_map_key(<<"node_metrics">>, Bridge),
  1173. request_json(
  1174. post,
  1175. uri(["bridges"]),
  1176. ?HTTP_BRIDGE(URL1, Name),
  1177. Config
  1178. )
  1179. ),
  1180. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  1181. %% check for empty bridge metrics
  1182. ?assertMatch(
  1183. {ok, 200, #{
  1184. <<"metrics">> := #{<<"success">> := 0},
  1185. <<"node_metrics">> := [_ | _]
  1186. }},
  1187. request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
  1188. ),
  1189. %% check that the bridge doesn't contain metrics anymore
  1190. {ok, 200, Bridge} = request_json(get, uri(["bridges", BridgeID]), Config),
  1191. ?assertNot(maps:is_key(<<"metrics">>, Bridge)),
  1192. ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)),
  1193. %% send an message to emqx and the message should be forwarded to the HTTP server
  1194. Body = <<"my msg">>,
  1195. _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
  1196. ?assert(
  1197. receive
  1198. {http_server, received, #{
  1199. method := <<"POST">>,
  1200. path := <<"/path1">>,
  1201. body := Body
  1202. }} ->
  1203. true;
  1204. Msg ->
  1205. ct:pal("error: http got unexpected request: ~p", [Msg]),
  1206. false
  1207. after 100 ->
  1208. false
  1209. end
  1210. ),
  1211. %% check for non-empty bridge metrics
  1212. ?assertMatch(
  1213. {ok, 200, #{
  1214. <<"metrics">> := #{<<"success">> := _},
  1215. <<"node_metrics">> := [_ | _]
  1216. }},
  1217. request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
  1218. ),
  1219. %% check for absence of metrics when listing all bridges
  1220. {ok, 200, Bridges} = request_json(get, uri(["bridges"]), Config),
  1221. ?assertNotMatch(
  1222. [
  1223. #{
  1224. <<"metrics">> := #{},
  1225. <<"node_metrics">> := [_ | _]
  1226. }
  1227. ],
  1228. Bridges
  1229. ),
  1230. ok.
  1231. %% request_timeout in bridge root should match request_ttl in
  1232. %% resource_opts.
  1233. t_inconsistent_webhook_request_timeouts(Config) ->
  1234. Port = ?config(port, Config),
  1235. URL1 = ?URL(Port, "path1"),
  1236. Name = ?BRIDGE_NAME,
  1237. BadBridgeParams =
  1238. emqx_utils_maps:deep_merge(
  1239. ?HTTP_BRIDGE(URL1, Name),
  1240. #{
  1241. <<"request_timeout">> => <<"1s">>,
  1242. <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
  1243. }
  1244. ),
  1245. %% root request_timeout is deprecated for bridge.
  1246. {ok, 201,
  1247. #{
  1248. <<"resource_opts">> := ResourceOpts
  1249. } = Response} =
  1250. request_json(
  1251. post,
  1252. uri(["bridges"]),
  1253. BadBridgeParams,
  1254. Config
  1255. ),
  1256. ?assertNot(maps:is_key(<<"request_timeout">>, Response)),
  1257. ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
  1258. validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
  1259. ok.
  1260. t_cluster_later_join_metrics(Config) ->
  1261. Port = ?config(port, Config),
  1262. [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config),
  1263. URL1 = ?URL(Port, "path1"),
  1264. Name = ?BRIDGE_NAME,
  1265. BridgeParams = ?HTTP_BRIDGE(URL1, Name),
  1266. BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  1267. ?check_trace(
  1268. #{timetrap => 15_000},
  1269. begin
  1270. %% Create a bridge on only one of the nodes.
  1271. ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)),
  1272. %% Pre-condition.
  1273. ?assertMatch(
  1274. {ok, 200, #{
  1275. <<"metrics">> := #{<<"success">> := _},
  1276. <<"node_metrics">> := [_ | _]
  1277. }},
  1278. request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
  1279. ),
  1280. ct:print("node joining cluster"),
  1281. %% Now join the other node join with the api node.
  1282. ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
  1283. %% Hack / workaround for the fact that `emqx_machine_boot' doesn't restart the
  1284. %% applications, in particular `emqx_conf' doesn't restart and synchronize the
  1285. %% transaction id. It's also unclear at the moment why the equivalent test in
  1286. %% `emqx_bridge_v2_api_SUITE' doesn't need this hack.
  1287. ok = erpc:call(OtherNode, application, stop, [emqx_conf]),
  1288. ok = erpc:call(OtherNode, application, start, [emqx_conf]),
  1289. ct:print("node joined cluster"),
  1290. %% assert: wait for the bridge to be ready on the other node.
  1291. {_, {ok, _}} =
  1292. ?wait_async_action(
  1293. {emqx_cluster_rpc, OtherNode} ! wake_up,
  1294. #{?snk_kind := cluster_rpc_caught_up, ?snk_meta := #{node := OtherNode}},
  1295. 10_000
  1296. ),
  1297. %% Check metrics; shouldn't crash even if the bridge is not
  1298. %% ready on the node that just joined the cluster.
  1299. ?assertMatch(
  1300. {ok, 200, #{
  1301. <<"metrics">> := #{<<"success">> := _},
  1302. <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
  1303. }},
  1304. request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
  1305. ),
  1306. ok
  1307. end,
  1308. []
  1309. ),
  1310. ok.
  1311. t_create_with_bad_name(Config) ->
  1312. Port = ?config(port, Config),
  1313. URL1 = ?URL(Port, "path1"),
  1314. Name = <<"test_哈哈">>,
  1315. BadBridgeParams =
  1316. emqx_utils_maps:deep_merge(
  1317. ?HTTP_BRIDGE(URL1, Name),
  1318. #{
  1319. <<"ssl">> =>
  1320. #{
  1321. <<"enable">> => true,
  1322. <<"certfile">> => cert_file("certfile")
  1323. }
  1324. }
  1325. ),
  1326. {ok, 400, #{
  1327. <<"code">> := <<"BAD_REQUEST">>,
  1328. <<"message">> := Msg0
  1329. }} =
  1330. request_json(
  1331. post,
  1332. uri(["bridges"]),
  1333. BadBridgeParams,
  1334. Config
  1335. ),
  1336. Msg = emqx_utils_json:decode(Msg0, [return_maps]),
  1337. ?assertMatch(
  1338. #{
  1339. <<"kind">> := <<"validation_error">>,
  1340. <<"reason">> := <<"Invalid name format.", _/binary>>
  1341. },
  1342. Msg
  1343. ),
  1344. ok.
  1345. validate_resource_request_ttl(single, Timeout, Name) ->
  1346. SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
  1347. _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
  1348. ?check_trace(
  1349. begin
  1350. {ok, Res} =
  1351. ?wait_async_action(
  1352. do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
  1353. #{?snk_kind := async_query},
  1354. 1000
  1355. ),
  1356. ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
  1357. end,
  1358. fun(Trace0) ->
  1359. Trace = ?of_kind(async_query, Trace0),
  1360. ?assertMatch([#{query_opts := #{timeout := Timeout}}], Trace),
  1361. ok
  1362. end
  1363. );
  1364. validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
  1365. ignore.
  1366. do_send_message(BridgeV1Type, Name, Message) ->
  1367. Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
  1368. emqx_bridge_v2:send_message(Type, Name, Message, #{}).
  1369. %%
  1370. request(Method, URL, Config) ->
  1371. request(Method, URL, [], Config).
  1372. request(Method, {operation, Type, Op, BridgeID}, Body, Config) ->
  1373. URL = operation_path(Type, Op, BridgeID, Config),
  1374. request(Method, URL, Body, Config);
  1375. request(Method, URL, Body, Config) ->
  1376. AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)),
  1377. Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
  1378. emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts).
  1379. request(Method, URL, Body, Decoder, Config) ->
  1380. case request(Method, URL, Body, Config) of
  1381. {ok, Code, Response} ->
  1382. {ok, Code, Decoder(Response)};
  1383. Otherwise ->
  1384. Otherwise
  1385. end.
  1386. request_json(Method, URLLike, Config) ->
  1387. request(Method, URLLike, [], fun json/1, Config).
  1388. request_json(Method, URLLike, Body, Config) ->
  1389. request(Method, URLLike, Body, fun json/1, Config).
  1390. operation_path(node, Oper, BridgeID, Config) ->
  1391. uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]);
  1392. operation_path(cluster, Oper, BridgeID, _Config) ->
  1393. uri(["bridges", BridgeID, Oper]).
  1394. enable_path(Enable, BridgeID) ->
  1395. uri(["bridges", BridgeID, "enable", Enable]).
  1396. publish_message(Topic, Body, Config) ->
  1397. Node = ?config(node, Config),
  1398. erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]).
  1399. update_config(Path, Value, Config) ->
  1400. Node = ?config(node, Config),
  1401. erpc:call(Node, emqx, update_config, [Path, Value]).
  1402. get_raw_config(Path, Config) ->
  1403. Node = ?config(node, Config),
  1404. erpc:call(Node, emqx, get_raw_config, [Path]).
  1405. add_user_auth(Chain, AuthenticatorID, User, Config) ->
  1406. Node = ?config(node, Config),
  1407. erpc:call(Node, emqx_authn_chains, add_user, [Chain, AuthenticatorID, User]).
  1408. delete_user_auth(Chain, AuthenticatorID, User, Config) ->
  1409. Node = ?config(node, Config),
  1410. erpc:call(Node, emqx_authn_chains, delete_user, [Chain, AuthenticatorID, User]).
  1411. str(S) when is_list(S) -> S;
  1412. str(S) when is_binary(S) -> binary_to_list(S).
  1413. json(B) when is_binary(B) ->
  1414. emqx_utils_json:decode(B, [return_maps]).
  1415. data_file(Name) ->
  1416. Dir = code:lib_dir(emqx_bridge, test),
  1417. {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),
  1418. Bin.
  1419. cert_file(Name) ->
  1420. data_file(filename:join(["certs", Name])).