| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_bridge_api_SUITE).
- -compile(nowarn_export_all).
- -compile(export_all).
- -import(emqx_mgmt_api_test_util, [uri/1]).
- -import(emqx_common_test_helpers, [on_exit/1]).
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("common_test/include/ct.hrl").
- -include_lib("snabbkaffe/include/test_macros.hrl").
- -define(BRIDGE_TYPE_HTTP, <<"webhook">>).
- -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
- -define(URL(PORT, PATH),
- list_to_binary(
- io_lib:format(
- "http://localhost:~s/~s",
- [integer_to_list(PORT), PATH]
- )
- )
- ).
- -define(BRIDGE(NAME, TYPE), #{
- <<"ssl">> => #{<<"enable">> => false},
- <<"type">> => TYPE,
- <<"name">> => NAME
- }).
- -define(BRIDGE_TYPE_MQTT, <<"mqtt">>).
- -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{
- <<"server">> => SERVER,
- <<"username">> => <<"user1">>,
- <<"password">> => <<"">>,
- <<"proto_ver">> => <<"v5">>,
- <<"egress">> => #{
- <<"remote">> => #{
- <<"topic">> => <<"emqx/${topic}">>,
- <<"qos">> => <<"${qos}">>,
- <<"retain">> => false
- }
- }
- }).
- -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
- -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{
- <<"url">> => URL,
- <<"local_topic">> => <<"emqx_webhook/#">>,
- <<"method">> => <<"post">>,
- <<"body">> => <<"${payload}">>,
- <<"headers">> => #{
- % NOTE
- % The Pascal-Case is important here.
- % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts
- % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS
- % when this happens (while the 'content-type' does not).
- <<"Content-Type">> => <<"application/json">>
- }
- }).
- -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
- -define(APPSPECS, [
- emqx,
- emqx_conf,
- emqx_auth,
- emqx_auth_mnesia,
- emqx_management,
- emqx_connector,
- emqx_bridge_http,
- {emqx_bridge, "actions {}\n bridges {}"},
- {emqx_rule_engine, "rule_engine { rules {} }"}
- ]).
- -define(APPSPEC_DASHBOARD,
- {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
- ).
- all() ->
- [
- {group, single},
- {group, cluster_later_join},
- {group, cluster}
- ].
- groups() ->
- AllTCs = emqx_common_test_helpers:all(?MODULE),
- SingleOnlyTests = [
- t_broken_bpapi_vsn,
- t_old_bpapi_vsn,
- t_bridges_probe
- ],
- ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
- [
- {single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
- {cluster_later_join, [], ClusterLaterJoinOnlyTCs},
- {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs}
- ].
- suite() ->
- [{timetrap, {seconds, 120}}].
- init_per_suite(Config) ->
- Config.
- end_per_suite(_Config) ->
- ok.
- init_per_group(cluster = Name, Config) ->
- Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
- init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
- init_per_group(cluster_later_join = Name, Config) ->
- Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
- init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
- init_per_group(_Name, Config) ->
- WorkDir = emqx_cth_suite:work_dir(Config),
- Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
- init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]).
- init_api(Config) ->
- APINode = ?config(node, Config),
- {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []),
- [{api, App} | Config].
- mk_cluster(Name, Config) ->
- mk_cluster(Name, Config, #{}).
- mk_cluster(Name, Config, Opts) ->
- Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
- Node2Apps = ?APPSPECS,
- emqx_cth_cluster:start(
- [
- {emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}},
- {emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}}
- ],
- #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
- ).
- end_per_group(Group, Config) when
- Group =:= cluster;
- Group =:= cluster_later_join
- ->
- ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
- end_per_group(_, Config) ->
- emqx_cth_suite:stop(?config(group_apps, Config)),
- ok.
- init_per_testcase(t_broken_bpapi_vsn, Config) ->
- meck:new(emqx_bpapi, [passthrough]),
- meck:expect(emqx_bpapi, supported_version, 2, -1),
- meck:new(emqx_bridge_api, [passthrough]),
- meck:expect(emqx_bridge_api, supported_versions, 1, []),
- init_per_testcase(common, Config);
- init_per_testcase(t_old_bpapi_vsn, Config) ->
- meck:new(emqx_bpapi, [passthrough]),
- meck:expect(emqx_bpapi, supported_version, 1, 1),
- meck:expect(emqx_bpapi, supported_version, 2, 1),
- init_per_testcase(common, Config);
- init_per_testcase(_, Config) ->
- {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
- [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
- end_per_testcase(t_broken_bpapi_vsn, Config) ->
- meck:unload(),
- end_per_testcase(common, Config);
- end_per_testcase(t_old_bpapi_vsn, Config) ->
- meck:unload(),
- end_per_testcase(common, Config);
- end_per_testcase(_, Config) ->
- Sock = ?config(sock, Config),
- Acceptor = ?config(acceptor, Config),
- Node = ?config(node, Config),
- ok = emqx_common_test_helpers:call_janitor(),
- ok = stop_http_server(Sock, Acceptor),
- ok = erpc:call(Node, fun clear_resources/0),
- ok.
- clear_resources() ->
- emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
- lists:foreach(
- fun(#{type := Type, name := Name}) ->
- ok = emqx_bridge:remove(Type, Name)
- end,
- emqx_bridge:list()
- ).
- %%------------------------------------------------------------------------------
- %% HTTP server for testing
- %%------------------------------------------------------------------------------
- start_http_server(HandleFun) ->
- process_flag(trap_exit, true),
- Parent = self(),
- {Port, Sock} = listen_on_random_port(),
- Acceptor = spawn_link(fun() ->
- accept_loop(Sock, HandleFun, Parent)
- end),
- timer:sleep(100),
- {Port, Sock, Acceptor}.
- stop_http_server(Sock, Acceptor) ->
- exit(Acceptor, kill),
- gen_tcp:close(Sock).
- listen_on_random_port() ->
- SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
- case gen_tcp:listen(0, SockOpts) of
- {ok, Sock} ->
- {ok, Port} = inet:port(Sock),
- {Port, Sock};
- {error, Reason} when Reason /= eaddrinuse ->
- {error, Reason}
- end.
- accept_loop(Sock, HandleFun, Parent) ->
- process_flag(trap_exit, true),
- {ok, Conn} = gen_tcp:accept(Sock),
- Handler = spawn_link(fun() -> HandleFun(Conn, Parent) end),
- gen_tcp:controlling_process(Conn, Handler),
- accept_loop(Sock, HandleFun, Parent).
- make_response(CodeStr, Str) ->
- B = iolist_to_binary(Str),
- iolist_to_binary(
- io_lib:fwrite(
- "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
- [CodeStr, size(B), B]
- )
- ).
- handle_fun_200_ok(Conn, Parent) ->
- case gen_tcp:recv(Conn, 0) of
- {ok, ReqStr} ->
- ct:pal("the http handler got request: ~p", [ReqStr]),
- Req = parse_http_request(ReqStr),
- Parent ! {http_server, received, Req},
- gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
- handle_fun_200_ok(Conn, Parent);
- {error, Reason} ->
- ct:pal("the http handler recv error: ~p", [Reason]),
- timer:sleep(100),
- gen_tcp:close(Conn)
- end.
- parse_http_request(ReqStr0) ->
- [Method, ReqStr1] = string:split(ReqStr0, " ", leading),
- [Path, ReqStr2] = string:split(ReqStr1, " ", leading),
- [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
- [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading),
- #{method => Method, path => Path, body => Body}.
- %%------------------------------------------------------------------------------
- %% Testcases
- %%------------------------------------------------------------------------------
- t_http_crud_apis(Config) ->
- Port = ?config(port, Config),
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- {ok, 404, _} = request(get, uri(["bridges", "foo"]), Config),
- {ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), Config),
- %% then we add a webhook bridge, using POST
- %% POST /bridges/ will create a bridge
- URL1 = ?URL(Port, "path1"),
- Name = ?BRIDGE_NAME,
- ?assertMatch(
- {ok, 201, #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := _,
- <<"node_status">> := [_ | _],
- <<"url">> := URL1
- }},
- request_json(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- )
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- %% send an message to emqx and the message should be forwarded to the HTTP server
- Body = <<"my msg">>,
- _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
- ?assert(
- receive
- {http_server, received, #{
- method := <<"POST">>,
- path := <<"/path1">>,
- body := Body
- }} ->
- true;
- Msg ->
- ct:pal("error: http got unexpected request: ~p", [Msg]),
- false
- after 100 ->
- false
- end
- ),
- %% update the request-path of the bridge
- URL2 = ?URL(Port, "path2"),
- ?assertMatch(
- {ok, 200, #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := _,
- <<"node_status">> := [_ | _],
- <<"url">> := URL2
- }},
- request_json(
- put,
- uri(["bridges", BridgeID]),
- ?HTTP_BRIDGE(URL2, Name),
- Config
- )
- ),
- %% list all bridges again, assert Bridge2 is in it
- ?assertMatch(
- {ok, 200, [
- #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := _,
- <<"node_status">> := [_ | _],
- <<"url">> := URL2
- }
- ]},
- request_json(get, uri(["bridges"]), Config)
- ),
- %% get the bridge by id
- ?assertMatch(
- {ok, 200, #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := _,
- <<"node_status">> := [_ | _],
- <<"url">> := URL2
- }},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% send an message to emqx again, check the path has been changed
- _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
- ?assert(
- receive
- {http_server, received, #{path := <<"/path2">>}} ->
- true;
- Msg2 ->
- ct:pal("error: http got unexpected request: ~p", [Msg2]),
- false
- after 100 ->
- false
- end
- ),
- %% Test bad updates
- %% ================
- %% Add bridge with a name that is too long
- %% We only support bridge names up to 255 characters
- LongName = list_to_binary(lists:duplicate(256, $a)),
- NameTooLongRequestResult = request_json(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, LongName),
- Config
- ),
- ?assertMatch(
- {ok, 400, _},
- NameTooLongRequestResult
- ),
- {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult,
- %% Use regex to check that the message contains the name
- Match = re:run(NameTooLongMessage, LongName),
- ?assertMatch({match, _}, Match),
- %% Add bridge without the URL field
- {ok, 400, PutFail1} = request_json(
- put,
- uri(["bridges", BridgeID]),
- maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name)),
- Config
- ),
- ?assertMatch(
- #{<<"reason">> := <<"required_field">>},
- json(maps:get(<<"message">>, PutFail1))
- ),
- {ok, 400, PutFail2} = request_json(
- put,
- uri(["bridges", BridgeID]),
- maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name))),
- Config
- ),
- ?assertMatch(
- #{<<"reason">> := <<"required_field">>},
- json(maps:get(<<"message">>, PutFail2))
- ),
- {ok, 400, _} = request_json(
- put,
- uri(["bridges", BridgeID]),
- ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
- Config
- ),
- {ok, 400, PutFail3} = request_json(
- put,
- uri(["bridges", BridgeID]),
- ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
- Config
- ),
- ?assertMatch(
- #{<<"kind">> := <<"validation_error">>},
- json(maps:get(<<"message">>, PutFail3))
- ),
- %% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- %% update a deleted bridge returns an error
- ?assertMatch(
- {ok, 404, #{
- <<"code">> := <<"NOT_FOUND">>,
- <<"message">> := _
- }},
- request_json(
- put,
- uri(["bridges", BridgeID]),
- ?HTTP_BRIDGE(URL2, Name),
- Config
- )
- ),
- %% try delete bad bridge id
- ?assertMatch(
- {ok, 404, #{
- <<"code">> := <<"NOT_FOUND">>,
- <<"message">> := <<"Invalid bridge ID", _/binary>>
- }},
- request_json(delete, uri(["bridges", "foo"]), Config)
- ),
- %% Deleting a non-existing bridge should result in an error
- ?assertMatch(
- {ok, 404, #{
- <<"code">> := <<"NOT_FOUND">>,
- <<"message">> := _
- }},
- request_json(delete, uri(["bridges", BridgeID]), Config)
- ),
- %% Create non working bridge
- BrokenURL = ?URL(Port + 1, "foo"),
- {ok, 201, BrokenBridge} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(BrokenURL, Name),
- fun json/1,
- Config
- ),
- ?assertMatch(
- #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := <<"disconnected">>,
- <<"status_reason">> := <<"Connection refused">>,
- <<"node_status">> := [
- #{
- <<"status">> := <<"disconnected">>,
- <<"status_reason">> := <<"Connection refused">>
- }
- | _
- ],
- <<"url">> := BrokenURL
- },
- BrokenBridge
- ),
- {ok, 200, FixedBridge} = request_json(
- put,
- uri(["bridges", BridgeID]),
- ?HTTP_BRIDGE(URL1),
- Config
- ),
- ?assertMatch(
- #{
- <<"status">> := <<"connected">>,
- <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _]
- } when
- not is_map_key(<<"status_reason">>, FixedBridge) andalso
- not is_map_key(<<"status_reason">>, FixedNodeStatus),
- FixedBridge
- ),
- %% Try create bridge with bad characters as name
- {ok, 400, _} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, <<"隋达"/utf8>>), Config),
- %% Missing scheme in URL
- {ok, 400, _} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(<<"localhost:1234/foo">>, <<"missing_url_scheme">>),
- Config
- ),
- %% Invalid port
- {ok, 400, _} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(<<"http://localhost:12341234/foo">>, <<"invalid_port">>),
- Config
- ),
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
- t_http_bridges_local_topic(Config) ->
- Port = ?config(port, Config),
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- %% then we add a webhook bridge, using POST
- %% POST /bridges/ will create a bridge
- URL1 = ?URL(Port, "path1"),
- Name1 = <<"t_http_bridges_with_local_topic1">>,
- Name2 = <<"t_http_bridges_without_local_topic1">>,
- %% create one http bridge with local_topic
- {ok, 201, _} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name1),
- Config
- ),
- %% and we create another one without local_topic
- {ok, 201, _} = request(
- post,
- uri(["bridges"]),
- maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2)),
- Config
- ),
- BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1),
- BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2),
- %% Send an message to emqx and the message should be forwarded to the HTTP server.
- %% This is to verify we can have 2 bridges with and without local_topic fields
- %% at the same time.
- Body = <<"my msg">>,
- _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
- ?assert(
- receive
- {http_server, received, #{
- method := <<"POST">>,
- path := <<"/path1">>,
- body := Body
- }} ->
- true;
- Msg ->
- ct:pal("error: http got unexpected request: ~p", [Msg]),
- false
- after 100 ->
- false
- end
- ),
- %% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), Config),
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), Config).
- t_check_dependent_actions_on_delete(Config) ->
- Port = ?config(port, Config),
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- %% then we add a webhook bridge, using POST
- %% POST /bridges/ will create a bridge
- URL1 = ?URL(Port, "path1"),
- Name = <<"t_http_crud_apis">>,
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- {ok, 201, _} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- ),
- {ok, 201, #{<<"id">> := RuleId}} = request_json(
- post,
- uri(["rules"]),
- #{
- <<"name">> => <<"t_http_crud_apis">>,
- <<"enable">> => true,
- <<"actions">> => [BridgeID],
- <<"sql">> => <<"SELECT * from \"t\"">>
- },
- Config
- ),
- %% deleting the bridge should fail because there is a rule that depends on it
- {ok, 400, Body} = request(
- delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", Config
- ),
- ?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])),
- %% delete the rule first
- {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config),
- %% then delete the bridge is OK
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
- t_cascade_delete_actions(Config) ->
- Port = ?config(port, Config),
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- %% then we add a webhook bridge, using POST
- %% POST /bridges/ will create a bridge
- URL1 = ?URL(Port, "path1"),
- Name = <<"t_http_crud_apis">>,
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- {ok, 201, _} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- ),
- {ok, 201, #{<<"id">> := RuleId}} = request_json(
- post,
- uri(["rules"]),
- #{
- <<"name">> => <<"t_http_crud_apis">>,
- <<"enable">> => true,
- <<"actions">> => [BridgeID],
- <<"sql">> => <<"SELECT * from \"t\"">>
- },
- Config
- ),
- %% delete the bridge will also delete the actions from the rules
- {ok, 204, _} = request(
- delete,
- uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true",
- Config
- ),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- ?assertMatch(
- {ok, 200, #{<<"actions">> := []}},
- request_json(get, uri(["rules", RuleId]), Config)
- ),
- {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config),
- {ok, 201, _} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- ),
- {ok, 201, _} = request(
- post,
- uri(["rules"]),
- #{
- <<"name">> => <<"t_http_crud_apis">>,
- <<"enable">> => true,
- <<"actions">> => [BridgeID],
- <<"sql">> => <<"SELECT * from \"t\"">>
- },
- Config
- ),
- {ok, 204, _} = request(
- delete,
- uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions",
- Config
- ),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
- t_broken_bpapi_vsn(Config) ->
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "abc"),
- Name = <<"t_bad_bpapi_vsn">>,
- {ok, 201, _Bridge} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- %% still works since we redirect to 'restart'
- {ok, 501, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config),
- {ok, 501, <<>>} = request(post, {operation, node, start, BridgeID}, Config),
- ok.
- t_old_bpapi_vsn(Config) ->
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "abc"),
- Name = <<"t_bad_bpapi_vsn">>,
- {ok, 201, _Bridge} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- {ok, 204, <<>>} = request(post, {operation, cluster, stop, BridgeID}, Config),
- {ok, 204, <<>>} = request(post, {operation, node, stop, BridgeID}, Config),
- %% still works since we redirect to 'restart'
- {ok, 204, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config),
- {ok, 204, <<>>} = request(post, {operation, node, start, BridgeID}, Config),
- {ok, 204, <<>>} = request(post, {operation, cluster, restart, BridgeID}, Config),
- {ok, 204, <<>>} = request(post, {operation, node, restart, BridgeID}, Config),
- ok.
- t_start_bridge_unknown_node(Config) ->
- {ok, 404, _} =
- request(
- post,
- uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]),
- Config
- ),
- {ok, 404, _} =
- request(
- post,
- uri(["nodes", "undefined", "bridges", "webhook:foo", start]),
- Config
- ).
- t_start_stop_bridges_node(Config) ->
- do_start_stop_bridges(node, Config).
- t_start_stop_bridges_cluster(Config) ->
- do_start_stop_bridges(cluster, Config).
- do_start_stop_bridges(Type, Config) ->
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "abc"),
- Name = atom_to_binary(Type),
- ?assertMatch(
- {ok, 201, #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := <<"connected">>,
- <<"node_status">> := [_ | _],
- <<"url">> := URL1
- }},
- request_json(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- )
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- ExpectedStatus =
- case ?config(group, Config) of
- cluster when Type == node ->
- <<"inconsistent">>;
- _ ->
- <<"stopped">>
- end,
- %% stop it
- {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := ExpectedStatus}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% start again
- {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% start a started bridge
- {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% restart an already started bridge
- {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% stop it again
- {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config),
- %% restart a stopped bridge
- {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- {ok, 404, _} = request(post, {operation, Type, invalidop, BridgeID}, Config),
- %% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- %% Fail parse-id check
- {ok, 404, _} = request(post, {operation, Type, start, <<"wreckbook_fugazi">>}, Config),
- %% Looks ok but doesn't exist
- {ok, 404, _} = request(post, {operation, Type, start, <<"webhook:cptn_hook">>}, Config),
- %% Create broken bridge
- {ListenPort, Sock} = listen_on_random_port(),
- %% Connecting to this endpoint should always timeout
- BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
- BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
- CreateRes0 = request_json(
- post,
- uri(["bridges"]),
- ?MQTT_BRIDGE(BadServer, BadName),
- Config
- ),
- ?assertMatch(
- {ok, 201, #{
- <<"type">> := ?BRIDGE_TYPE_MQTT,
- <<"name">> := BadName,
- <<"enable">> := true,
- <<"server">> := BadServer
- }},
- CreateRes0
- ),
- {ok, 201, CreateRes1} = CreateRes0,
- case CreateRes1 of
- #{
- <<"node_status">> := [
- #{
- <<"status">> := <<"disconnected">>,
- <<"status_reason">> := <<"connack_timeout">>
- },
- #{<<"status">> := <<"connecting">>}
- | _
- ],
- %% `inconsistent': one node is `?status_disconnected' (because it has already
- %% timed out), the other node is `?status_connecting' (started later and
- %% haven't timed out yet)
- <<"status">> := <<"inconsistent">>,
- <<"status_reason">> := <<"connack_timeout">>
- } ->
- ok;
- #{
- <<"node_status">> := [_, _ | _],
- <<"status">> := <<"disconnected">>,
- <<"status_reason">> := <<"connack_timeout">>
- } ->
- ok;
- #{
- <<"node_status">> := [_],
- <<"status">> := <<"connecting">>
- } ->
- ok;
- _ ->
- error({unexpected_result, CreateRes1})
- end,
- BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
- ?assertMatch(
- %% request from product: return 400 on such errors
- {ok, SC, _} when SC == 500 orelse SC == 400,
- request(post, {operation, Type, start, BadBridgeID}, Config)
- ),
- ok = gen_tcp:close(Sock),
- ok.
- t_start_stop_inconsistent_bridge_node(Config) ->
- start_stop_inconsistent_bridge(node, Config).
- t_start_stop_inconsistent_bridge_cluster(Config) ->
- start_stop_inconsistent_bridge(cluster, Config).
- start_stop_inconsistent_bridge(Type, Config) ->
- Port = ?config(port, Config),
- URL = ?URL(Port, "abc"),
- Node = ?config(node, Config),
- erpc:call(Node, fun() ->
- meck:new(emqx_bridge_resource, [passthrough, no_link]),
- meck:expect(
- emqx_bridge_resource,
- stop,
- fun
- (_, <<"bridge_not_found">>) -> {error, not_found};
- (BridgeType, Name) -> meck:passthrough([BridgeType, Name])
- end
- )
- end),
- on_exit(fun() ->
- erpc:call(Node, fun() ->
- meck:unload([emqx_bridge_resource])
- end)
- end),
- {ok, 201, _Bridge} = request(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL, <<"bridge_not_found">>),
- Config
- ),
- {ok, 503, _} = request(
- post, {operation, Type, stop, <<"webhook:bridge_not_found">>}, Config
- ).
- t_enable_disable_bridges(Config) ->
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- Name = ?BRIDGE_NAME,
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "abc"),
- ?assertMatch(
- {ok, 201, #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := <<"connected">>,
- <<"node_status">> := [_ | _],
- <<"url">> := URL1
- }},
- request_json(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- )
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- %% disable it
- {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"stopped">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% enable again
- {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% enable an already started bridge
- {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% disable it again
- {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config),
- %% bad param
- {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config),
- {ok, 404, _} = request(put, enable_path(true, "foo"), Config),
- {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config),
- {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config),
- ?assertEqual(
- #{
- <<"code">> => <<"BAD_REQUEST">>,
- <<"message">> => <<"Forbidden operation, bridge not enabled">>
- },
- Res
- ),
- {ok, 400, Res} = request(post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config),
- %% enable a stopped bridge
- {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
- ?assertMatch(
- {ok, 200, #{<<"status">> := <<"connected">>}},
- request_json(get, uri(["bridges", BridgeID]), Config)
- ),
- %% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
- t_reset_bridges(Config) ->
- %% assert there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- Name = ?BRIDGE_NAME,
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "abc"),
- ?assertMatch(
- {ok, 201, #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := <<"connected">>,
- <<"node_status">> := [_ | _],
- <<"url">> := URL1
- }},
- request_json(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- )
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), Config),
- %% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config).
- t_with_redact_update(Config) ->
- ok = snabbkaffe:start_trace(),
- on_exit(fun() -> ok = snabbkaffe:stop() end),
- Name = <<"redact_update">>,
- Type = <<"mqtt">>,
- Password = <<"123456">>,
- Template = #{
- <<"type">> => Type,
- <<"name">> => Name,
- <<"server">> => <<"127.0.0.1:1883">>,
- <<"username">> => <<"test">>,
- <<"password">> => Password,
- <<"ingress">> =>
- #{<<"remote">> => #{<<"topic">> => <<"t/#">>}}
- },
- {ok, 201, _} = request(
- post,
- uri(["bridges"]),
- Template,
- Config
- ),
- %% update with redacted config
- BridgeConf = emqx_utils:redact(Template),
- BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
- {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config),
- %% bridge is migrated after creation
- ConfigRootKey = connectors,
- ?assertEqual(
- Password,
- get_raw_config([ConfigRootKey, Type, Name, password], Config)
- ),
- %% probe with new password; should not be considered redacted
- {_, {ok, #{params := UsedParams}}} =
- ?wait_async_action(
- request(
- post,
- uri(["bridges_probe"]),
- Template#{<<"password">> := <<"newpassword">>},
- Config
- ),
- #{?snk_kind := bridge_v1_api_dry_run},
- 1_000
- ),
- UsedPassword0 = maps:get(<<"password">>, UsedParams),
- %% the password field schema makes
- %% `emqx_dashboard_swagger:filter_check_request_and_translate_body' wrap the password.
- %% hack: this fails with `badfun' in CI only, due to cover compile, if not evaluated
- %% in the original node...
- PrimaryNode = ?config(node, Config),
- erpc:call(PrimaryNode, fun() -> ?assertEqual(<<"newpassword">>, UsedPassword0()) end),
- ok = snabbkaffe:stop(),
- ok.
- t_bridges_probe(Config) ->
- Port = ?config(port, Config),
- URL = ?URL(Port, "some_path"),
- {ok, 204, <<>>} = request(
- post,
- uri(["bridges_probe"]),
- ?HTTP_BRIDGE(URL),
- Config
- ),
- %% second time with same name is ok since no real bridge created
- {ok, 204, <<>>} = request(
- post,
- uri(["bridges_probe"]),
- ?HTTP_BRIDGE(URL),
- Config
- ),
- %% with descriptions is ok.
- {ok, 204, <<>>} = request(
- post,
- uri(["bridges_probe"]),
- (?HTTP_BRIDGE(URL))#{<<"description">> => <<"Test Description">>},
- Config
- ),
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := _
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>),
- Config
- )
- ),
- %% Missing scheme in URL
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := _
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?HTTP_BRIDGE(<<"203.0.113.3:1234/foo">>),
- Config
- )
- ),
- %% Invalid port
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := _
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?HTTP_BRIDGE(<<"http://203.0.113.3:12341234/foo">>),
- Config
- )
- ),
- {ok, 204, _} = request(
- post,
- uri(["bridges_probe"]),
- ?MQTT_BRIDGE(<<"127.0.0.1:1883">>),
- Config
- ),
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := <<"Connection refused">>
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?MQTT_BRIDGE(<<"127.0.0.1:2883">>),
- Config
- )
- ),
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := <<"Could not resolve host">>
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?MQTT_BRIDGE(<<"nohost:2883">>),
- Config
- )
- ),
- AuthnConfig = #{
- <<"mechanism">> => <<"password_based">>,
- <<"backend">> => <<"built_in_database">>,
- <<"user_id_type">> => <<"username">>
- },
- Chain = 'mqtt:global',
- {ok, _} = update_config(
- [authentication],
- {create_authenticator, Chain, AuthnConfig},
- Config
- ),
- User = #{user_id => <<"u">>, password => <<"p">>},
- AuthenticatorID = <<"password_based:built_in_database">>,
- {ok, _} = add_user_auth(
- Chain,
- AuthenticatorID,
- User,
- Config
- ),
- on_exit(fun() ->
- delete_user_auth(Chain, AuthenticatorID, User, Config)
- end),
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := <<"Unauthorized client">>
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>},
- Config
- )
- ),
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := <<"Bad username or password">>
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{
- <<"proto_ver">> => <<"v4">>,
- <<"password">> => <<"mySecret">>,
- <<"username">> => <<"u">>
- },
- Config
- )
- ),
- ?assertMatch(
- {ok, 400, #{
- <<"code">> := <<"TEST_FAILED">>,
- <<"message">> := <<"Not authorized">>
- }},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?MQTT_BRIDGE(<<"127.0.0.1:1883">>),
- Config
- )
- ),
- ?assertMatch(
- {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
- request_json(
- post,
- uri(["bridges_probe"]),
- ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>),
- Config
- )
- ),
- ok.
- t_metrics(Config) ->
- Port = ?config(port, Config),
- %% assert we there's no bridges at first
- {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
- %% then we add a webhook bridge, using POST
- %% POST /bridges/ will create a bridge
- URL1 = ?URL(Port, "path1"),
- Name = ?BRIDGE_NAME,
- ?assertMatch(
- {ok, 201,
- Bridge = #{
- <<"type">> := ?BRIDGE_TYPE_HTTP,
- <<"name">> := Name,
- <<"enable">> := true,
- <<"status">> := _,
- <<"node_status">> := [_ | _],
- <<"url">> := URL1
- }} when
- %% assert that the bridge return doesn't contain metrics anymore
- not is_map_key(<<"metrics">>, Bridge) andalso
- not is_map_key(<<"node_metrics">>, Bridge),
- request_json(
- post,
- uri(["bridges"]),
- ?HTTP_BRIDGE(URL1, Name),
- Config
- )
- ),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- %% check for empty bridge metrics
- ?assertMatch(
- {ok, 200, #{
- <<"metrics">> := #{<<"success">> := 0},
- <<"node_metrics">> := [_ | _]
- }},
- request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
- ),
- %% check that the bridge doesn't contain metrics anymore
- {ok, 200, Bridge} = request_json(get, uri(["bridges", BridgeID]), Config),
- ?assertNot(maps:is_key(<<"metrics">>, Bridge)),
- ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)),
- %% send an message to emqx and the message should be forwarded to the HTTP server
- Body = <<"my msg">>,
- _ = publish_message(<<"emqx_webhook/1">>, Body, Config),
- ?assert(
- receive
- {http_server, received, #{
- method := <<"POST">>,
- path := <<"/path1">>,
- body := Body
- }} ->
- true;
- Msg ->
- ct:pal("error: http got unexpected request: ~p", [Msg]),
- false
- after 100 ->
- false
- end
- ),
- %% check for non-empty bridge metrics
- ?assertMatch(
- {ok, 200, #{
- <<"metrics">> := #{<<"success">> := _},
- <<"node_metrics">> := [_ | _]
- }},
- request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
- ),
- %% check for absence of metrics when listing all bridges
- {ok, 200, Bridges} = request_json(get, uri(["bridges"]), Config),
- ?assertNotMatch(
- [
- #{
- <<"metrics">> := #{},
- <<"node_metrics">> := [_ | _]
- }
- ],
- Bridges
- ),
- ok.
- %% request_timeout in bridge root should match request_ttl in
- %% resource_opts.
- t_inconsistent_webhook_request_timeouts(Config) ->
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "path1"),
- Name = ?BRIDGE_NAME,
- BadBridgeParams =
- emqx_utils_maps:deep_merge(
- ?HTTP_BRIDGE(URL1, Name),
- #{
- <<"request_timeout">> => <<"1s">>,
- <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
- }
- ),
- %% root request_timeout is deprecated for bridge.
- {ok, 201,
- #{
- <<"resource_opts">> := ResourceOpts
- } = Response} =
- request_json(
- post,
- uri(["bridges"]),
- BadBridgeParams,
- Config
- ),
- ?assertNot(maps:is_key(<<"request_timeout">>, Response)),
- ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
- validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
- ok.
- t_cluster_later_join_metrics(Config) ->
- Port = ?config(port, Config),
- [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config),
- URL1 = ?URL(Port, "path1"),
- Name = ?BRIDGE_NAME,
- BridgeParams = ?HTTP_BRIDGE(URL1, Name),
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- ?check_trace(
- #{timetrap => 15_000},
- begin
- %% Create a bridge on only one of the nodes.
- ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)),
- %% Pre-condition.
- ?assertMatch(
- {ok, 200, #{
- <<"metrics">> := #{<<"success">> := _},
- <<"node_metrics">> := [_ | _]
- }},
- request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
- ),
- ct:print("node joining cluster"),
- %% Now join the other node join with the api node.
- ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
- %% Hack / workaround for the fact that `emqx_machine_boot' doesn't restart the
- %% applications, in particular `emqx_conf' doesn't restart and synchronize the
- %% transaction id. It's also unclear at the moment why the equivalent test in
- %% `emqx_bridge_v2_api_SUITE' doesn't need this hack.
- ok = erpc:call(OtherNode, application, stop, [emqx_conf]),
- ok = erpc:call(OtherNode, application, start, [emqx_conf]),
- ct:print("node joined cluster"),
- %% assert: wait for the bridge to be ready on the other node.
- {_, {ok, _}} =
- ?wait_async_action(
- {emqx_cluster_rpc, OtherNode} ! wake_up,
- #{?snk_kind := cluster_rpc_caught_up, ?snk_meta := #{node := OtherNode}},
- 10_000
- ),
- %% Check metrics; shouldn't crash even if the bridge is not
- %% ready on the node that just joined the cluster.
- ?assertMatch(
- {ok, 200, #{
- <<"metrics">> := #{<<"success">> := _},
- <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
- }},
- request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
- ),
- ok
- end,
- []
- ),
- ok.
- t_create_with_bad_name(Config) ->
- Port = ?config(port, Config),
- URL1 = ?URL(Port, "path1"),
- Name = <<"test_哈哈">>,
- BadBridgeParams =
- emqx_utils_maps:deep_merge(
- ?HTTP_BRIDGE(URL1, Name),
- #{
- <<"ssl">> =>
- #{
- <<"enable">> => true,
- <<"certfile">> => cert_file("certfile")
- }
- }
- ),
- {ok, 400, #{
- <<"code">> := <<"BAD_REQUEST">>,
- <<"message">> := Msg0
- }} =
- request_json(
- post,
- uri(["bridges"]),
- BadBridgeParams,
- Config
- ),
- Msg = emqx_utils_json:decode(Msg0, [return_maps]),
- ?assertMatch(
- #{
- <<"kind">> := <<"validation_error">>,
- <<"reason">> := <<"Invalid name format.", _/binary>>
- },
- Msg
- ),
- ok.
- validate_resource_request_ttl(single, Timeout, Name) ->
- SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
- _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- ?check_trace(
- begin
- {ok, Res} =
- ?wait_async_action(
- do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
- #{?snk_kind := async_query},
- 1000
- ),
- ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
- end,
- fun(Trace0) ->
- Trace = ?of_kind(async_query, Trace0),
- ?assertMatch([#{query_opts := #{timeout := Timeout}}], Trace),
- ok
- end
- );
- validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
- ignore.
- do_send_message(BridgeV1Type, Name, Message) ->
- Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
- emqx_bridge_v2:send_message(Type, Name, Message, #{}).
- %%
- request(Method, URL, Config) ->
- request(Method, URL, [], Config).
- request(Method, {operation, Type, Op, BridgeID}, Body, Config) ->
- URL = operation_path(Type, Op, BridgeID, Config),
- request(Method, URL, Body, Config);
- request(Method, URL, Body, Config) ->
- AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)),
- Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
- emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts).
- request(Method, URL, Body, Decoder, Config) ->
- case request(Method, URL, Body, Config) of
- {ok, Code, Response} ->
- {ok, Code, Decoder(Response)};
- Otherwise ->
- Otherwise
- end.
- request_json(Method, URLLike, Config) ->
- request(Method, URLLike, [], fun json/1, Config).
- request_json(Method, URLLike, Body, Config) ->
- request(Method, URLLike, Body, fun json/1, Config).
- operation_path(node, Oper, BridgeID, Config) ->
- uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]);
- operation_path(cluster, Oper, BridgeID, _Config) ->
- uri(["bridges", BridgeID, Oper]).
- enable_path(Enable, BridgeID) ->
- uri(["bridges", BridgeID, "enable", Enable]).
- publish_message(Topic, Body, Config) ->
- Node = ?config(node, Config),
- erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]).
- update_config(Path, Value, Config) ->
- Node = ?config(node, Config),
- erpc:call(Node, emqx, update_config, [Path, Value]).
- get_raw_config(Path, Config) ->
- Node = ?config(node, Config),
- erpc:call(Node, emqx, get_raw_config, [Path]).
- add_user_auth(Chain, AuthenticatorID, User, Config) ->
- Node = ?config(node, Config),
- erpc:call(Node, emqx_authn_chains, add_user, [Chain, AuthenticatorID, User]).
- delete_user_auth(Chain, AuthenticatorID, User, Config) ->
- Node = ?config(node, Config),
- erpc:call(Node, emqx_authn_chains, delete_user, [Chain, AuthenticatorID, User]).
- str(S) when is_list(S) -> S;
- str(S) when is_binary(S) -> binary_to_list(S).
- json(B) when is_binary(B) ->
- emqx_utils_json:decode(B, [return_maps]).
- data_file(Name) ->
- Dir = code:lib_dir(emqx_bridge, test),
- {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),
- Bin.
- cert_file(Name) ->
- data_file(filename:join(["certs", Name])).
|