emqx_bridge_v2_api_SUITE.erl 48 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_v2_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -import(emqx_mgmt_api_test_util, [uri/1]).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -include_lib("snabbkaffe/include/test_macros.hrl").
  22. -define(ACTIONS_ROOT, "actions").
  23. -define(SOURCES_ROOT, "sources").
  24. -define(ACTION_CONNECTOR_NAME, <<"my_connector">>).
  25. -define(SOURCE_CONNECTOR_NAME, <<"my_connector">>).
  26. -define(RESOURCE(NAME, TYPE), #{
  27. <<"enable">> => true,
  28. %<<"ssl">> => #{<<"enable">> => false},
  29. <<"type">> => TYPE,
  30. <<"name">> => NAME
  31. }).
  32. -define(ACTION_CONNECTOR_TYPE_STR, "kafka_producer").
  33. -define(ACTION_CONNECTOR_TYPE, <<?ACTION_CONNECTOR_TYPE_STR>>).
  34. -define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>).
  35. -define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?RESOURCE(Name, ?ACTION_CONNECTOR_TYPE)#{
  36. <<"authentication">> => <<"none">>,
  37. <<"bootstrap_hosts">> => BootstrapHosts,
  38. <<"connect_timeout">> => <<"5s">>,
  39. <<"metadata_request_timeout">> => <<"5s">>,
  40. <<"min_metadata_refresh_interval">> => <<"3s">>,
  41. <<"socket_opts">> =>
  42. #{
  43. <<"nodelay">> => true,
  44. <<"recbuf">> => <<"1024KB">>,
  45. <<"sndbuf">> => <<"1024KB">>,
  46. <<"tcp_keepalive">> => <<"none">>
  47. }
  48. }).
  49. -define(ACTIONS_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
  50. -define(ACTIONS_CONNECTOR, ?ACTIONS_CONNECTOR(?ACTION_CONNECTOR_NAME)).
  51. -define(MQTT_LOCAL_TOPIC, <<"mqtt/local/topic">>).
  52. -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
  53. -define(ACTION_TYPE_STR, "kafka_producer").
  54. -define(ACTION_TYPE, <<?ACTION_TYPE_STR>>).
  55. -define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?ACTION_TYPE)#{
  56. <<"connector">> => Connector,
  57. <<"kafka">> => #{
  58. <<"buffer">> => #{
  59. <<"memory_overload_protection">> => true,
  60. <<"mode">> => <<"hybrid">>,
  61. <<"per_partition_limit">> => <<"2GB">>,
  62. <<"segment_bytes">> => <<"100MB">>
  63. },
  64. <<"compression">> => <<"no_compression">>,
  65. <<"kafka_ext_headers">> => [
  66. #{
  67. <<"kafka_ext_header_key">> => <<"clientid">>,
  68. <<"kafka_ext_header_value">> => <<"${clientid}">>
  69. },
  70. #{
  71. <<"kafka_ext_header_key">> => <<"topic">>,
  72. <<"kafka_ext_header_value">> => <<"${topic}">>
  73. }
  74. ],
  75. <<"kafka_header_value_encode_mode">> => <<"none">>,
  76. <<"kafka_headers">> => <<"${pub_props}">>,
  77. <<"max_batch_bytes">> => <<"896KB">>,
  78. <<"max_inflight">> => 10,
  79. <<"message">> => #{
  80. <<"key">> => <<"${.clientid}">>,
  81. <<"timestamp">> => <<"${.timestamp}">>,
  82. <<"value">> => <<"${.}">>
  83. },
  84. <<"partition_count_refresh_interval">> => <<"60s">>,
  85. <<"partition_strategy">> => <<"random">>,
  86. <<"required_acks">> => <<"all_isr">>,
  87. <<"topic">> => <<"kafka-topic">>
  88. },
  89. <<"local_topic">> => ?MQTT_LOCAL_TOPIC,
  90. <<"resource_opts">> => #{
  91. <<"health_check_interval">> => <<"32s">>
  92. }
  93. }).
  94. -define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?ACTION_CONNECTOR_NAME)).
  95. -define(KAFKA_BRIDGE_UPDATE(Name, Connector),
  96. maps:without([<<"name">>, <<"type">>], ?KAFKA_BRIDGE(Name, Connector))
  97. ).
  98. -define(SOURCE_TYPE_STR, "mqtt").
  99. -define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
  100. -define(APPSPECS, [
  101. emqx_conf,
  102. emqx,
  103. emqx_auth,
  104. emqx_management,
  105. emqx_connector,
  106. {emqx_bridge, "actions {}"},
  107. {emqx_rule_engine, "rule_engine { rules {} }"}
  108. ]).
  109. -define(APPSPEC_DASHBOARD,
  110. {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
  111. ).
  112. %%------------------------------------------------------------------------------
  113. %% CT boilerplate
  114. %%------------------------------------------------------------------------------
  115. -if(?EMQX_RELEASE_EDITION == ee).
  116. %% For now we got only kafka implementing `bridge_v2` and that is enterprise only.
  117. all() ->
  118. All0 = emqx_common_test_helpers:all(?MODULE),
  119. All = All0 -- matrix_cases(),
  120. Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
  121. Groups ++ All.
  122. -else.
  123. all() ->
  124. [].
  125. -endif.
  126. matrix_cases() ->
  127. emqx_common_test_helpers:all(?MODULE).
  128. groups() ->
  129. emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
  130. suite() ->
  131. [{timetrap, {seconds, 60}}].
  132. init_per_suite(Config) ->
  133. Config.
  134. end_per_suite(_Config) ->
  135. ok.
  136. init_per_group(cluster = Name, Config) ->
  137. Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
  138. init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
  139. init_per_group(cluster_later_join = Name, Config) ->
  140. Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
  141. init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
  142. init_per_group(single = Group, Config) ->
  143. WorkDir = filename:join(?config(priv_dir, Config), Group),
  144. Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
  145. init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]);
  146. init_per_group(actions, Config) ->
  147. [{bridge_kind, action} | Config];
  148. init_per_group(sources, Config) ->
  149. [{bridge_kind, source} | Config];
  150. init_per_group(_Group, Config) ->
  151. Config.
  152. init_api(Config) ->
  153. Node = ?config(node, Config),
  154. {ok, ApiKey} = erpc:call(Node, emqx_common_test_http, create_default_app, []),
  155. [{api_key, ApiKey} | Config].
  156. mk_cluster(Name, Config) ->
  157. mk_cluster(Name, Config, #{}).
  158. mk_cluster(Name, Config, Opts) ->
  159. Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
  160. Node2Apps = ?APPSPECS,
  161. emqx_cth_cluster:start(
  162. [
  163. {emqx_bridge_v2_api_SUITE_1, Opts#{role => core, apps => Node1Apps}},
  164. {emqx_bridge_v2_api_SUITE_2, Opts#{role => core, apps => Node2Apps}}
  165. ],
  166. #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
  167. ).
  168. end_per_group(Group, Config) when
  169. Group =:= cluster;
  170. Group =:= cluster_later_join
  171. ->
  172. ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
  173. end_per_group(single, Config) ->
  174. emqx_cth_suite:stop(?config(group_apps, Config)),
  175. ok;
  176. end_per_group(_Group, _Config) ->
  177. ok.
  178. init_per_testcase(t_action_types, Config) ->
  179. case ?config(cluster_nodes, Config) of
  180. undefined ->
  181. init_mocks();
  182. Nodes ->
  183. [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
  184. end,
  185. Config;
  186. init_per_testcase(_TestCase, Config) ->
  187. case ?config(cluster_nodes, Config) of
  188. undefined ->
  189. init_mocks();
  190. Nodes ->
  191. [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
  192. end,
  193. case ?config(bridge_kind, Config) of
  194. action ->
  195. {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config);
  196. source ->
  197. {ok, 201, _} = request(
  198. post,
  199. uri(["connectors"]),
  200. source_connector_create_config(#{}),
  201. Config
  202. )
  203. end,
  204. Config.
  205. end_per_testcase(_TestCase, Config) ->
  206. Node = ?config(node, Config),
  207. ok = erpc:call(Node, fun clear_resources/0),
  208. case ?config(cluster_nodes, Config) of
  209. undefined ->
  210. meck:unload();
  211. ClusterNodes ->
  212. [erpc:call(ClusterNode, meck, unload, []) || ClusterNode <- ClusterNodes]
  213. end,
  214. ok = emqx_common_test_helpers:call_janitor(),
  215. ok.
  216. %%------------------------------------------------------------------------------
  217. %% Helper fns
  218. %%------------------------------------------------------------------------------
  219. -define(CONNECTOR_IMPL, emqx_bridge_v2_dummy_connector).
  220. init_mocks() ->
  221. meck:new(emqx_connector_resource, [passthrough, no_link]),
  222. meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL),
  223. meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
  224. meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible),
  225. meck:expect(
  226. ?CONNECTOR_IMPL,
  227. on_start,
  228. fun
  229. (<<"connector:", ?ACTION_CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) ->
  230. {ok, bad_connector_state};
  231. (_I, _C) ->
  232. {ok, connector_state}
  233. end
  234. ),
  235. meck:expect(?CONNECTOR_IMPL, on_stop, 2, ok),
  236. meck:expect(
  237. ?CONNECTOR_IMPL,
  238. on_get_status,
  239. fun
  240. (_, bad_connector_state) -> connecting;
  241. (_, _) -> connected
  242. end
  243. ),
  244. meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
  245. meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
  246. meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
  247. ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
  248. emqx_bridge_v2:get_channels_for_connector(ResId)
  249. end),
  250. meck:expect(?CONNECTOR_IMPL, on_query_async, fun(_ResId, _Req, ReplyFunAndArgs, _ConnState) ->
  251. emqx_resource:apply_reply_fun(ReplyFunAndArgs, ok),
  252. {ok, self()}
  253. end),
  254. ok.
  255. clear_resources() ->
  256. emqx_bridge_v2_testlib:delete_all_bridges_and_connectors().
  257. expect_on_all_nodes(Mod, Function, Fun, Config) ->
  258. case ?config(cluster_nodes, Config) of
  259. undefined ->
  260. ok = meck:expect(Mod, Function, Fun);
  261. Nodes ->
  262. [erpc:call(Node, meck, expect, [Mod, Function, Fun]) || Node <- Nodes]
  263. end,
  264. ok.
  265. connector_operation(Config, ConnectorType, ConnectorName, OperationName) ->
  266. case ?config(group, Config) of
  267. cluster ->
  268. case ?config(cluster_nodes, Config) of
  269. undefined ->
  270. Node = ?config(node, Config),
  271. ok = rpc:call(
  272. Node,
  273. emqx_connector_resource,
  274. OperationName,
  275. [ConnectorType, ConnectorName],
  276. 500
  277. );
  278. Nodes ->
  279. erpc:multicall(
  280. Nodes,
  281. emqx_connector_resource,
  282. OperationName,
  283. [ConnectorType, ConnectorName],
  284. 500
  285. )
  286. end;
  287. _ ->
  288. ok = emqx_connector_resource:OperationName(ConnectorType, ConnectorName)
  289. end.
  290. listen_on_random_port() ->
  291. SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
  292. case gen_tcp:listen(0, SockOpts) of
  293. {ok, Sock} ->
  294. {ok, Port} = inet:port(Sock),
  295. {Port, Sock};
  296. {error, Reason} when Reason /= eaddrinuse ->
  297. {error, Reason}
  298. end.
  299. request(Method, URL, Config) ->
  300. request(Method, URL, [], Config).
  301. request(Method, {operation, Type, Op, BridgeID}, Body, Config) ->
  302. URL = operation_path(Type, Op, BridgeID, Config),
  303. request(Method, URL, Body, Config);
  304. request(Method, URL, Body, Config) ->
  305. AuthHeader = emqx_common_test_http:auth_header(?config(api_key, Config)),
  306. Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
  307. emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts).
  308. request(Method, URL, Body, Decoder, Config) ->
  309. case request(Method, URL, Body, Config) of
  310. {ok, Code, Response} ->
  311. case Decoder(Response) of
  312. {error, _} = Error -> Error;
  313. Decoded -> {ok, Code, Decoded}
  314. end;
  315. Otherwise ->
  316. Otherwise
  317. end.
  318. request_json(Method, URLLike, Config) ->
  319. request(Method, URLLike, [], fun json/1, Config).
  320. request_json(Method, URLLike, Body, Config) ->
  321. request(Method, URLLike, Body, fun json/1, Config).
  322. operation_path(node, Oper, BridgeID, Config) ->
  323. [_SingleOrCluster, Kind | _] = group_path(Config),
  324. #{api_root_key := APIRootKey} = get_common_values(Kind, <<"unused">>),
  325. uri(["nodes", ?config(node, Config), APIRootKey, BridgeID, Oper]);
  326. operation_path(cluster, Oper, BridgeID, Config) ->
  327. [_SingleOrCluster, Kind | _] = group_path(Config),
  328. #{api_root_key := APIRootKey} = get_common_values(Kind, <<"unused">>),
  329. uri([APIRootKey, BridgeID, Oper]).
  330. enable_path(Enable, BridgeID) ->
  331. uri([?ACTIONS_ROOT, BridgeID, "enable", Enable]).
  332. publish_message(Topic, Body, Config) ->
  333. Node = ?config(node, Config),
  334. publish_message(Topic, Body, Node, Config).
  335. publish_message(Topic, Body, Node, _Config) ->
  336. erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]).
  337. update_config(Path, Value, Config) ->
  338. Node = ?config(node, Config),
  339. erpc:call(Node, emqx, update_config, [Path, Value]).
  340. get_raw_config(Path, Config) ->
  341. Node = ?config(node, Config),
  342. erpc:call(Node, emqx, get_raw_config, [Path]).
  343. add_user_auth(Chain, AuthenticatorID, User, Config) ->
  344. Node = ?config(node, Config),
  345. erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]).
  346. delete_user_auth(Chain, AuthenticatorID, User, Config) ->
  347. Node = ?config(node, Config),
  348. erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]).
  349. str(S) when is_list(S) -> S;
  350. str(S) when is_binary(S) -> binary_to_list(S).
  351. json(B) when is_binary(B) ->
  352. case emqx_utils_json:safe_decode(B, [return_maps]) of
  353. {ok, Term} ->
  354. Term;
  355. {error, Reason} = Error ->
  356. ct:pal("Failed to decode json: ~p~n~p", [Reason, B]),
  357. Error
  358. end.
  359. group_path(Config) ->
  360. case emqx_common_test_helpers:group_path(Config) of
  361. [] ->
  362. undefined;
  363. Path ->
  364. Path
  365. end.
  366. source_connector_config_base() ->
  367. #{
  368. <<"enable">> => true,
  369. <<"description">> => <<"my connector">>,
  370. <<"pool_size">> => 3,
  371. <<"proto_ver">> => <<"v5">>,
  372. <<"server">> => <<"127.0.0.1:1883">>,
  373. <<"resource_opts">> => #{
  374. <<"health_check_interval">> => <<"15s">>,
  375. <<"start_after_created">> => true,
  376. <<"start_timeout">> => <<"5s">>
  377. }
  378. }.
  379. source_connector_create_config(Overrides0) ->
  380. Overrides = emqx_utils_maps:binary_key_map(Overrides0),
  381. Conf0 = maps:merge(
  382. source_connector_config_base(),
  383. #{
  384. <<"enable">> => true,
  385. <<"type">> => ?SOURCE_TYPE,
  386. <<"name">> => ?SOURCE_CONNECTOR_NAME
  387. }
  388. ),
  389. maps:merge(
  390. Conf0,
  391. Overrides
  392. ).
  393. source_config_base() ->
  394. #{
  395. <<"enable">> => true,
  396. <<"connector">> => ?SOURCE_CONNECTOR_NAME,
  397. <<"parameters">> =>
  398. #{
  399. <<"topic">> => <<"remote/topic">>,
  400. <<"qos">> => 2
  401. },
  402. <<"resource_opts">> => #{
  403. <<"health_check_interval">> => <<"15s">>,
  404. <<"resume_interval">> => <<"15s">>
  405. }
  406. }.
  407. source_create_config(Overrides0) ->
  408. Overrides = emqx_utils_maps:binary_key_map(Overrides0),
  409. Conf0 = maps:merge(
  410. source_config_base(),
  411. #{
  412. <<"enable">> => true,
  413. <<"type">> => ?SOURCE_TYPE
  414. }
  415. ),
  416. maps:merge(
  417. Conf0,
  418. Overrides
  419. ).
  420. source_update_config(Overrides0) ->
  421. Overrides = emqx_utils_maps:binary_key_map(Overrides0),
  422. maps:merge(
  423. source_config_base(),
  424. Overrides
  425. ).
  426. get_common_values(Kind, FnName) ->
  427. case Kind of
  428. actions ->
  429. #{
  430. api_root_key => ?ACTIONS_ROOT,
  431. type => ?ACTION_TYPE,
  432. default_connector_name => ?ACTION_CONNECTOR_NAME,
  433. create_config_fn =>
  434. fun(Overrides) ->
  435. Name = maps:get(name, Overrides, FnName),
  436. ConnectorName = maps:get(connector, Overrides, ?ACTION_CONNECTOR_NAME),
  437. ?KAFKA_BRIDGE(Name, ConnectorName)
  438. end,
  439. update_config_fn =>
  440. fun(Overrides) ->
  441. Name = maps:get(name, Overrides, FnName),
  442. ConnectorName = maps:get(connector, Overrides, ?ACTION_CONNECTOR_NAME),
  443. ?KAFKA_BRIDGE_UPDATE(Name, ConnectorName)
  444. end,
  445. create_connector_config_fn =>
  446. fun(Overrides) ->
  447. ConnectorName = maps:get(name, Overrides, ?ACTION_CONNECTOR_NAME),
  448. ?ACTIONS_CONNECTOR(ConnectorName)
  449. end
  450. };
  451. sources ->
  452. #{
  453. api_root_key => ?SOURCES_ROOT,
  454. type => ?SOURCE_TYPE,
  455. default_connector_name => ?SOURCE_CONNECTOR_NAME,
  456. create_config_fn => fun(Overrides0) ->
  457. Overrides =
  458. case Overrides0 of
  459. #{name := _} -> Overrides0;
  460. _ -> Overrides0#{name => FnName}
  461. end,
  462. source_create_config(Overrides)
  463. end,
  464. update_config_fn => fun source_update_config/1,
  465. create_connector_config_fn => fun source_connector_create_config/1
  466. }
  467. end.
  468. maybe_get_other_node(Config) ->
  469. %% In the single node test group, this simply returns the lone node. Otherwise, it'll
  470. %% return a node that's not the primary one that receives API calls.
  471. PrimaryNode = ?config(node, Config),
  472. case proplists:get_value(cluster_nodes, Config, []) -- [PrimaryNode] of
  473. [] ->
  474. PrimaryNode;
  475. [OtherNode | _] ->
  476. OtherNode
  477. end.
  478. %%------------------------------------------------------------------------------
  479. %% Testcases
  480. %%------------------------------------------------------------------------------
  481. %% We have to pretend testing a kafka bridge since at this point that's the
  482. %% only one that's implemented.
  483. t_bridges_lifecycle(matrix) ->
  484. [
  485. [single, actions],
  486. [single, sources],
  487. [cluster, actions],
  488. [cluster, sources]
  489. ];
  490. t_bridges_lifecycle(Config) ->
  491. [_SingleOrCluster, Kind | _] = group_path(Config),
  492. FnName = atom_to_binary(?FUNCTION_NAME),
  493. #{
  494. api_root_key := APIRootKey,
  495. type := Type,
  496. default_connector_name := DefaultConnectorName,
  497. create_config_fn := CreateConfigFn,
  498. update_config_fn := UpdateConfigFn,
  499. create_connector_config_fn := CreateConnectorConfigFn
  500. } = get_common_values(Kind, FnName),
  501. %% assert we there's no bridges at first
  502. {ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
  503. {ok, 404, _} = request(get, uri([APIRootKey, "foo"]), Config),
  504. {ok, 404, _} = request(get, uri([APIRootKey, "kafka_producer:foo"]), Config),
  505. %% need a var for patterns below
  506. BridgeName = FnName,
  507. CreateRes = request_json(
  508. post,
  509. uri([APIRootKey]),
  510. CreateConfigFn(#{}),
  511. Config
  512. ),
  513. ?assertMatch(
  514. {ok, 201, #{
  515. <<"type">> := Type,
  516. <<"name">> := BridgeName,
  517. <<"enable">> := true,
  518. <<"status">> := <<"connected">>,
  519. <<"node_status">> := [_ | _],
  520. <<"connector">> := DefaultConnectorName,
  521. <<"parameters">> := #{},
  522. <<"resource_opts">> := _
  523. }},
  524. CreateRes,
  525. #{name => BridgeName, type => Type, connector => DefaultConnectorName}
  526. ),
  527. case Kind of
  528. actions ->
  529. ?assertMatch({ok, 201, #{<<"local_topic">> := _}}, CreateRes);
  530. sources ->
  531. ok
  532. end,
  533. %% list all bridges, assert bridge is in it
  534. ?assertMatch(
  535. {ok, 200, [
  536. #{
  537. <<"type">> := Type,
  538. <<"name">> := BridgeName,
  539. <<"enable">> := true,
  540. <<"status">> := _,
  541. <<"node_status">> := [_ | _]
  542. }
  543. ]},
  544. request_json(get, uri([APIRootKey]), Config)
  545. ),
  546. %% list all bridges, assert bridge is in it
  547. ?assertMatch(
  548. {ok, 200, [
  549. #{
  550. <<"type">> := Type,
  551. <<"name">> := BridgeName,
  552. <<"enable">> := true,
  553. <<"status">> := _,
  554. <<"node_status">> := [_ | _]
  555. }
  556. ]},
  557. request_json(get, uri([APIRootKey]), Config)
  558. ),
  559. %% get the bridge by id
  560. BridgeID = emqx_bridge_resource:bridge_id(Type, ?BRIDGE_NAME),
  561. ?assertMatch(
  562. {ok, 200, #{
  563. <<"type">> := Type,
  564. <<"name">> := BridgeName,
  565. <<"enable">> := true,
  566. <<"status">> := _,
  567. <<"node_status">> := [_ | _]
  568. }},
  569. request_json(get, uri([APIRootKey, BridgeID]), Config)
  570. ),
  571. ?assertMatch(
  572. {ok, 400, #{
  573. <<"code">> := <<"BAD_REQUEST">>,
  574. <<"message">> := _
  575. }},
  576. request_json(post, uri([APIRootKey, BridgeID, "brababbel"]), Config)
  577. ),
  578. %% update bridge config
  579. {ok, 201, _} = request(
  580. post,
  581. uri(["connectors"]),
  582. CreateConnectorConfigFn(#{name => <<"foobla">>}),
  583. Config
  584. ),
  585. ?assertMatch(
  586. {ok, 200, #{
  587. <<"type">> := Type,
  588. <<"name">> := BridgeName,
  589. <<"connector">> := <<"foobla">>,
  590. <<"enable">> := true,
  591. <<"status">> := _,
  592. <<"node_status">> := [_ | _]
  593. }},
  594. request_json(
  595. put,
  596. uri([APIRootKey, BridgeID]),
  597. UpdateConfigFn(#{connector => <<"foobla">>}),
  598. Config
  599. )
  600. ),
  601. %% update bridge with unknown connector name
  602. {ok, 400, #{
  603. <<"code">> := <<"BAD_REQUEST">>,
  604. <<"message">> := Message1
  605. }} =
  606. request_json(
  607. put,
  608. uri([APIRootKey, BridgeID]),
  609. UpdateConfigFn(#{connector => <<"does_not_exist">>}),
  610. Config
  611. ),
  612. ?assertMatch(
  613. #{<<"reason">> := <<"connector_not_found_or_wrong_type">>},
  614. emqx_utils_json:decode(Message1)
  615. ),
  616. %% update bridge with connector of wrong type
  617. {ok, 201, _} =
  618. request(
  619. post,
  620. uri(["connectors"]),
  621. (?ACTIONS_CONNECTOR(<<"foobla2">>))#{
  622. <<"type">> => <<"azure_event_hub_producer">>,
  623. <<"authentication">> => #{
  624. <<"username">> => <<"emqxuser">>,
  625. <<"password">> => <<"topSecret">>,
  626. <<"mechanism">> => <<"plain">>
  627. },
  628. <<"ssl">> => #{
  629. <<"enable">> => true,
  630. <<"server_name_indication">> => <<"auto">>,
  631. <<"verify">> => <<"verify_none">>,
  632. <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
  633. }
  634. },
  635. Config
  636. ),
  637. {ok, 400, #{
  638. <<"code">> := <<"BAD_REQUEST">>,
  639. <<"message">> := Message2
  640. }} =
  641. request_json(
  642. put,
  643. uri([APIRootKey, BridgeID]),
  644. UpdateConfigFn(#{connector => <<"foobla2">>}),
  645. Config
  646. ),
  647. ?assertMatch(
  648. #{<<"reason">> := <<"connector_not_found_or_wrong_type">>},
  649. emqx_utils_json:decode(Message2)
  650. ),
  651. %% delete the bridge
  652. {ok, 204, <<>>} = request(delete, uri([APIRootKey, BridgeID]), Config),
  653. {ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
  654. %% try create with unknown connector name
  655. {ok, 400, #{
  656. <<"code">> := <<"BAD_REQUEST">>,
  657. <<"message">> := Message3
  658. }} =
  659. request_json(
  660. post,
  661. uri([APIRootKey]),
  662. CreateConfigFn(#{connector => <<"does_not_exist">>}),
  663. Config
  664. ),
  665. ?assertMatch(
  666. #{<<"reason">> := <<"connector_not_found_or_wrong_type">>},
  667. emqx_utils_json:decode(Message3)
  668. ),
  669. %% try create bridge with connector of wrong type
  670. {ok, 400, #{
  671. <<"code">> := <<"BAD_REQUEST">>,
  672. <<"message">> := Message4
  673. }} =
  674. request_json(
  675. post,
  676. uri([APIRootKey]),
  677. CreateConfigFn(#{connector => <<"foobla2">>}),
  678. Config
  679. ),
  680. ?assertMatch(
  681. #{<<"reason">> := <<"connector_not_found_or_wrong_type">>},
  682. emqx_utils_json:decode(Message4)
  683. ),
  684. %% make sure nothing has been created above
  685. {ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
  686. %% update a deleted bridge returns an error
  687. ?assertMatch(
  688. {ok, 404, #{
  689. <<"code">> := <<"NOT_FOUND">>,
  690. <<"message">> := _
  691. }},
  692. request_json(
  693. put,
  694. uri([APIRootKey, BridgeID]),
  695. UpdateConfigFn(#{}),
  696. Config
  697. )
  698. ),
  699. %% deleting a non-existing bridge should result in an error
  700. ?assertMatch(
  701. {ok, 404, #{
  702. <<"code">> := <<"NOT_FOUND">>,
  703. <<"message">> := _
  704. }},
  705. request_json(delete, uri([APIRootKey, BridgeID]), Config)
  706. ),
  707. %% try delete unknown bridge id
  708. ?assertMatch(
  709. {ok, 404, #{
  710. <<"code">> := <<"NOT_FOUND">>,
  711. <<"message">> := <<"Invalid bridge ID", _/binary>>
  712. }},
  713. request_json(delete, uri([APIRootKey, "foo"]), Config)
  714. ),
  715. %% Try create bridge with bad characters as name
  716. {ok, 400, _} = request(
  717. post, uri([APIRootKey]), CreateConfigFn(#{name => <<"隋达"/utf8>>}), Config
  718. ),
  719. {ok, 400, _} = request(post, uri([APIRootKey]), CreateConfigFn(#{name => <<"a.b">>}), Config),
  720. ok.
  721. t_broken_bridge_config(matrix) ->
  722. [
  723. [single, actions]
  724. ];
  725. t_broken_bridge_config(Config) ->
  726. emqx_cth_suite:stop_apps([emqx_bridge]),
  727. BridgeName = ?BRIDGE_NAME,
  728. StartOps =
  729. #{
  730. config =>
  731. "actions {\n"
  732. " "
  733. ?ACTION_TYPE_STR
  734. " {\n"
  735. " " ++ binary_to_list(BridgeName) ++
  736. " {\n"
  737. " connector = does_not_exist\n"
  738. " enable = true\n"
  739. " kafka {\n"
  740. " topic = test-topic-one-partition\n"
  741. " }\n"
  742. " local_topic = \"mqtt/local/topic\"\n"
  743. " resource_opts {health_check_interval = 32s}\n"
  744. " }\n"
  745. " }\n"
  746. "}\n"
  747. "\n",
  748. schema_mod => emqx_bridge_v2_schema
  749. },
  750. emqx_cth_suite:start_app(emqx_bridge, StartOps),
  751. ?assertMatch(
  752. {ok, 200, [
  753. #{
  754. <<"name">> := BridgeName,
  755. <<"type">> := ?ACTION_TYPE,
  756. <<"connector">> := <<"does_not_exist">>,
  757. <<"status">> := <<"disconnected">>,
  758. <<"error">> := <<"Not installed">>
  759. }
  760. ]},
  761. request_json(get, uri([?ACTIONS_ROOT]), Config)
  762. ),
  763. BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME),
  764. ?assertEqual(
  765. {ok, 204, <<>>},
  766. request(delete, uri([?ACTIONS_ROOT, BridgeID]), Config)
  767. ),
  768. ?assertEqual(
  769. {ok, 200, []},
  770. request_json(get, uri([?ACTIONS_ROOT]), Config)
  771. ),
  772. ok.
  773. t_fix_broken_bridge_config(matrix) ->
  774. [
  775. [single, actions]
  776. ];
  777. t_fix_broken_bridge_config(Config) ->
  778. emqx_cth_suite:stop_apps([emqx_bridge]),
  779. BridgeName = ?BRIDGE_NAME,
  780. StartOps =
  781. #{
  782. config =>
  783. "actions {\n"
  784. " "
  785. ?ACTION_TYPE_STR
  786. " {\n"
  787. " " ++ binary_to_list(BridgeName) ++
  788. " {\n"
  789. " connector = does_not_exist\n"
  790. " enable = true\n"
  791. " kafka {\n"
  792. " topic = test-topic-one-partition\n"
  793. " }\n"
  794. " local_topic = \"mqtt/local/topic\"\n"
  795. " resource_opts {health_check_interval = 32s}\n"
  796. " }\n"
  797. " }\n"
  798. "}\n"
  799. "\n",
  800. schema_mod => emqx_bridge_v2_schema
  801. },
  802. emqx_cth_suite:start_app(emqx_bridge, StartOps),
  803. ?assertMatch(
  804. {ok, 200, [
  805. #{
  806. <<"name">> := BridgeName,
  807. <<"type">> := ?ACTION_TYPE,
  808. <<"connector">> := <<"does_not_exist">>,
  809. <<"status">> := <<"disconnected">>,
  810. <<"error">> := <<"Not installed">>
  811. }
  812. ]},
  813. request_json(get, uri([?ACTIONS_ROOT]), Config)
  814. ),
  815. BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME),
  816. request_json(
  817. put,
  818. uri([?ACTIONS_ROOT, BridgeID]),
  819. ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?ACTION_CONNECTOR_NAME),
  820. Config
  821. ),
  822. ?assertMatch(
  823. {ok, 200, #{
  824. <<"connector">> := ?ACTION_CONNECTOR_NAME,
  825. <<"status">> := <<"connected">>
  826. }},
  827. request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config)
  828. ),
  829. ok.
  830. t_start_bridge_unknown_node(matrix) ->
  831. [
  832. [single, actions],
  833. [cluster, actions]
  834. ];
  835. t_start_bridge_unknown_node(Config) ->
  836. {ok, 404, _} =
  837. request(
  838. post,
  839. uri(["nodes", "thisbetterbenotanatomyet", ?ACTIONS_ROOT, "kafka_producer:foo", start]),
  840. Config
  841. ),
  842. {ok, 404, _} =
  843. request(
  844. post,
  845. uri(["nodes", "undefined", ?ACTIONS_ROOT, "kafka_producer:foo", start]),
  846. Config
  847. ).
  848. t_start_bridge_node(matrix) ->
  849. [
  850. [single, actions],
  851. [single, sources],
  852. [cluster, actions],
  853. [cluster, sources]
  854. ];
  855. t_start_bridge_node(Config) ->
  856. do_start_bridge(node, Config).
  857. t_start_bridge_cluster(matrix) ->
  858. [
  859. [single, actions],
  860. [single, sources],
  861. [cluster, actions],
  862. [cluster, sources]
  863. ];
  864. t_start_bridge_cluster(Config) ->
  865. do_start_bridge(cluster, Config).
  866. do_start_bridge(TestType, Config) ->
  867. [_SingleOrCluster, Kind | _] = group_path(Config),
  868. Name = atom_to_binary(TestType),
  869. #{
  870. api_root_key := APIRootKey,
  871. type := Type,
  872. default_connector_name := DefaultConnectorName,
  873. create_config_fn := CreateConfigFn
  874. } = get_common_values(Kind, Name),
  875. %% assert we there's no bridges at first
  876. {ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
  877. ?assertMatch(
  878. {ok, 201, #{
  879. <<"type">> := Type,
  880. <<"name">> := Name,
  881. <<"enable">> := true,
  882. <<"status">> := <<"connected">>,
  883. <<"node_status">> := [_ | _]
  884. }},
  885. request_json(
  886. post,
  887. uri([APIRootKey]),
  888. CreateConfigFn(#{name => Name}),
  889. Config
  890. )
  891. ),
  892. BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
  893. %% start again
  894. {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config),
  895. ?assertMatch(
  896. {ok, 200, #{<<"status">> := <<"connected">>}},
  897. request_json(get, uri([APIRootKey, BridgeID]), Config)
  898. ),
  899. %% start a started bridge
  900. {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config),
  901. ?assertMatch(
  902. {ok, 200, #{<<"status">> := <<"connected">>}},
  903. request_json(get, uri([APIRootKey, BridgeID]), Config)
  904. ),
  905. {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config),
  906. %% Make start bridge fail
  907. expect_on_all_nodes(
  908. ?CONNECTOR_IMPL,
  909. on_add_channel,
  910. fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end,
  911. Config
  912. ),
  913. connector_operation(Config, Type, DefaultConnectorName, stop),
  914. connector_operation(Config, Type, DefaultConnectorName, start),
  915. {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config),
  916. %% Make start bridge succeed
  917. expect_on_all_nodes(
  918. ?CONNECTOR_IMPL,
  919. on_add_channel,
  920. fun(_, _, _ResId, _Channel) -> {ok, connector_state} end,
  921. Config
  922. ),
  923. %% try to start again
  924. {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config),
  925. %% delete the bridge
  926. {ok, 204, <<>>} = request(delete, uri([APIRootKey, BridgeID]), Config),
  927. {ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
  928. %% Fail parse-id check
  929. {ok, 404, _} = request(post, {operation, TestType, start, <<"wreckbook_fugazi">>}, Config),
  930. %% Looks ok but doesn't exist
  931. {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config),
  932. ok.
  933. %% t_start_stop_inconsistent_bridge_node(Config) ->
  934. %% start_stop_inconsistent_bridge(node, Config).
  935. %% t_start_stop_inconsistent_bridge_cluster(Config) ->
  936. %% start_stop_inconsistent_bridge(cluster, Config).
  937. %% start_stop_inconsistent_bridge(Type, Config) ->
  938. %% Node = ?config(node, Config),
  939. %% erpc:call(Node, fun() ->
  940. %% meck:new(emqx_bridge_resource, [passthrough, no_link]),
  941. %% meck:expect(
  942. %% emqx_bridge_resource,
  943. %% stop,
  944. %% fun
  945. %% (_, <<"bridge_not_found">>) -> {error, not_found};
  946. %% (BridgeType, Name) -> meck:passthrough([BridgeType, Name])
  947. %% end
  948. %% )
  949. %% end),
  950. %% emqx_common_test_helpers:on_exit(fun() ->
  951. %% erpc:call(Node, fun() ->
  952. %% meck:unload([emqx_bridge_resource])
  953. %% end)
  954. %% end),
  955. %% {ok, 201, _Bridge} = request(
  956. %% post,
  957. %% uri([?ROOT]),
  958. %% ?KAFKA_BRIDGE(<<"bridge_not_found">>),
  959. %% Config
  960. %% ),
  961. %% {ok, 503, _} = request(
  962. %% post, {operation, Type, stop, <<"kafka:bridge_not_found">>}, Config
  963. %% ).
  964. %% [TODO] This is a mess, need to clarify what the actual behavior needs to be
  965. %% like.
  966. %% t_enable_disable_bridges(Config) ->
  967. %% %% assert we there's no bridges at first
  968. %% {ok, 200, []} = request_json(get, uri([?ROOT]), Config),
  969. %% Name = ?BRIDGE_NAME,
  970. %% ?assertMatch(
  971. %% {ok, 201, #{
  972. %% <<"type">> := ?BRIDGE_TYPE,
  973. %% <<"name">> := Name,
  974. %% <<"enable">> := true,
  975. %% <<"status">> := <<"connected">>,
  976. %% <<"node_status">> := [_ | _]
  977. %% }},
  978. %% request_json(
  979. %% post,
  980. %% uri([?ROOT]),
  981. %% ?KAFKA_BRIDGE(Name),
  982. %% Config
  983. %% )
  984. %% ),
  985. %% BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
  986. %% %% disable it
  987. %% meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connecting),
  988. %% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config),
  989. %% ?assertMatch(
  990. %% {ok, 200, #{<<"status">> := <<"stopped">>}},
  991. %% request_json(get, uri([?ROOT, BridgeID]), Config)
  992. %% ),
  993. %% %% enable again
  994. %% meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
  995. %% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
  996. %% ?assertMatch(
  997. %% {ok, 200, #{<<"status">> := <<"connected">>}},
  998. %% request_json(get, uri([?ROOT, BridgeID]), Config)
  999. %% ),
  1000. %% %% enable an already started bridge
  1001. %% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
  1002. %% ?assertMatch(
  1003. %% {ok, 200, #{<<"status">> := <<"connected">>}},
  1004. %% request_json(get, uri([?ROOT, BridgeID]), Config)
  1005. %% ),
  1006. %% %% disable it again
  1007. %% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config),
  1008. %% %% bad param
  1009. %% {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config),
  1010. %% {ok, 404, _} = request(put, enable_path(true, "foo"), Config),
  1011. %% {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config),
  1012. %% {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config),
  1013. %% ?assertEqual(
  1014. %% #{
  1015. %% <<"code">> => <<"BAD_REQUEST">>,
  1016. %% <<"message">> => <<"Forbidden operation, bridge not enabled">>
  1017. %% },
  1018. %% Res
  1019. %% ),
  1020. %% {ok, 400, Res} = request(
  1021. %% post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config
  1022. %% ),
  1023. %% %% enable a stopped bridge
  1024. %% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config),
  1025. %% ?assertMatch(
  1026. %% {ok, 200, #{<<"status">> := <<"connected">>}},
  1027. %% request_json(get, uri([?ROOT, BridgeID]), Config)
  1028. %% ),
  1029. %% %% delete the bridge
  1030. %% {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config),
  1031. %% {ok, 200, []} = request_json(get, uri([?ROOT]), Config).
  1032. t_bridges_probe(matrix) ->
  1033. [
  1034. [single, actions]
  1035. ];
  1036. t_bridges_probe(Config) ->
  1037. {ok, 204, <<>>} = request(
  1038. post,
  1039. uri(["actions_probe"]),
  1040. ?KAFKA_BRIDGE(?BRIDGE_NAME),
  1041. Config
  1042. ),
  1043. %% second time with same name is ok since no real bridge created
  1044. {ok, 204, <<>>} = request(
  1045. post,
  1046. uri(["actions_probe"]),
  1047. ?KAFKA_BRIDGE(?BRIDGE_NAME),
  1048. Config
  1049. ),
  1050. meck:expect(?CONNECTOR_IMPL, on_start, 2, {error, on_start_error}),
  1051. ?assertMatch(
  1052. {ok, 400, #{
  1053. <<"code">> := <<"TEST_FAILED">>,
  1054. <<"message">> := _
  1055. }},
  1056. request_json(
  1057. post,
  1058. uri(["actions_probe"]),
  1059. ?KAFKA_BRIDGE(<<"broken_bridge">>, <<"brokenhost:1234">>),
  1060. Config
  1061. )
  1062. ),
  1063. meck:expect(?CONNECTOR_IMPL, on_start, 2, {ok, bridge_state}),
  1064. ?assertMatch(
  1065. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  1066. request_json(
  1067. post,
  1068. uri(["actions_probe"]),
  1069. ?RESOURCE(<<"broken_bridge">>, <<"unknown_type">>),
  1070. Config
  1071. )
  1072. ),
  1073. ok.
  1074. t_cascade_delete_actions(matrix) ->
  1075. [
  1076. [single, actions],
  1077. [cluster, actions]
  1078. ];
  1079. t_cascade_delete_actions(Config) ->
  1080. %% assert we there's no bridges at first
  1081. {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
  1082. %% then we add a a bridge, using POST
  1083. %% POST /actions/ will create a bridge
  1084. BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME),
  1085. {ok, 201, _} = request(
  1086. post,
  1087. uri([?ACTIONS_ROOT]),
  1088. ?KAFKA_BRIDGE(?BRIDGE_NAME),
  1089. Config
  1090. ),
  1091. {ok, 201, #{<<"id">> := RuleId}} = request_json(
  1092. post,
  1093. uri(["rules"]),
  1094. #{
  1095. <<"name">> => <<"t_http_crud_apis">>,
  1096. <<"enable">> => true,
  1097. <<"actions">> => [BridgeID],
  1098. <<"sql">> => <<"SELECT * from \"t\"">>
  1099. },
  1100. Config
  1101. ),
  1102. %% delete the bridge will also delete the actions from the rules
  1103. {ok, 204, _} = request(
  1104. delete,
  1105. uri([?ACTIONS_ROOT, BridgeID]) ++ "?also_delete_dep_actions=true",
  1106. Config
  1107. ),
  1108. {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
  1109. ?assertMatch(
  1110. {ok, 200, #{<<"actions">> := []}},
  1111. request_json(get, uri(["rules", RuleId]), Config)
  1112. ),
  1113. {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config),
  1114. {ok, 201, _} = request(
  1115. post,
  1116. uri([?ACTIONS_ROOT]),
  1117. ?KAFKA_BRIDGE(?BRIDGE_NAME),
  1118. Config
  1119. ),
  1120. {ok, 201, _} = request(
  1121. post,
  1122. uri(["rules"]),
  1123. #{
  1124. <<"name">> => <<"t_http_crud_apis">>,
  1125. <<"enable">> => true,
  1126. <<"actions">> => [BridgeID],
  1127. <<"sql">> => <<"SELECT * from \"t\"">>
  1128. },
  1129. Config
  1130. ),
  1131. {ok, 400, Body} = request(
  1132. delete,
  1133. uri([?ACTIONS_ROOT, BridgeID]),
  1134. Config
  1135. ),
  1136. ?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])),
  1137. {ok, 200, [_]} = request_json(get, uri([?ACTIONS_ROOT]), Config),
  1138. %% Cleanup
  1139. {ok, 204, _} = request(
  1140. delete,
  1141. uri([?ACTIONS_ROOT, BridgeID]) ++ "?also_delete_dep_actions=true",
  1142. Config
  1143. ),
  1144. {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config).
  1145. t_action_types(matrix) ->
  1146. [
  1147. [single, actions],
  1148. [cluster, actions]
  1149. ];
  1150. t_action_types(Config) ->
  1151. Res = request_json(get, uri(["action_types"]), Config),
  1152. ?assertMatch({ok, 200, _}, Res),
  1153. {ok, 200, Types} = Res,
  1154. ?assert(is_list(Types), #{types => Types}),
  1155. ?assert(lists:all(fun is_binary/1, Types), #{types => Types}),
  1156. ok.
  1157. t_bad_name(matrix) ->
  1158. [
  1159. [single, actions],
  1160. [single, sources],
  1161. [cluster, actions],
  1162. [cluster, sources]
  1163. ];
  1164. t_bad_name(Config) ->
  1165. [_SingleOrCluster, Kind | _] = group_path(Config),
  1166. Name = <<"_bad_name">>,
  1167. #{
  1168. api_root_key := APIRootKey,
  1169. create_config_fn := CreateConfigFn
  1170. } = get_common_values(Kind, Name),
  1171. Res = request_json(
  1172. post,
  1173. uri([APIRootKey]),
  1174. CreateConfigFn(#{}),
  1175. Config
  1176. ),
  1177. ?assertMatch({ok, 400, #{<<"message">> := _}}, Res),
  1178. {ok, 400, #{<<"message">> := Msg0}} = Res,
  1179. Msg = emqx_utils_json:decode(Msg0, [return_maps]),
  1180. ?assertMatch(
  1181. #{
  1182. <<"kind">> := <<"validation_error">>,
  1183. <<"reason">> := <<"Invalid name format.", _/binary>>
  1184. },
  1185. Msg
  1186. ),
  1187. ok.
  1188. t_metrics(matrix) ->
  1189. [
  1190. [single, actions],
  1191. [cluster, actions]
  1192. ];
  1193. t_metrics(Config) ->
  1194. {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
  1195. ActionName = ?BRIDGE_NAME,
  1196. ?assertMatch(
  1197. {ok, 201, _},
  1198. request_json(
  1199. post,
  1200. uri([?ACTIONS_ROOT]),
  1201. ?KAFKA_BRIDGE(?BRIDGE_NAME),
  1202. Config
  1203. )
  1204. ),
  1205. ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ActionName),
  1206. ?assertMatch(
  1207. {ok, 200, #{
  1208. <<"metrics">> := #{<<"matched">> := 0},
  1209. <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _]
  1210. }},
  1211. request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
  1212. ),
  1213. {ok, 200, Bridge} = request_json(get, uri([?ACTIONS_ROOT, ActionID]), Config),
  1214. ?assertNot(maps:is_key(<<"metrics">>, Bridge)),
  1215. ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)),
  1216. Body = <<"my msg">>,
  1217. _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config),
  1218. %% check for non-empty bridge metrics
  1219. ?retry(
  1220. _Sleep0 = 200,
  1221. _Retries0 = 20,
  1222. ?assertMatch(
  1223. {ok, 200, #{
  1224. <<"metrics">> := #{<<"matched">> := 1},
  1225. <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 1}} | _]
  1226. }},
  1227. request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
  1228. )
  1229. ),
  1230. %% check for absence of metrics when listing all bridges
  1231. {ok, 200, Bridges} = request_json(get, uri([?ACTIONS_ROOT]), Config),
  1232. ?assertNotMatch(
  1233. [
  1234. #{
  1235. <<"metrics">> := #{},
  1236. <<"node_metrics">> := [_ | _]
  1237. }
  1238. ],
  1239. Bridges
  1240. ),
  1241. ok.
  1242. t_reset_metrics(matrix) ->
  1243. [
  1244. [single, actions],
  1245. [cluster, actions]
  1246. ];
  1247. t_reset_metrics(Config) ->
  1248. %% assert there's no bridges at first
  1249. {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
  1250. ActionName = ?BRIDGE_NAME,
  1251. ?assertMatch(
  1252. {ok, 201, _},
  1253. request_json(
  1254. post,
  1255. uri([?ACTIONS_ROOT]),
  1256. ?KAFKA_BRIDGE(?BRIDGE_NAME),
  1257. Config
  1258. )
  1259. ),
  1260. ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ActionName),
  1261. Body = <<"my msg">>,
  1262. OtherNode = maybe_get_other_node(Config),
  1263. _ = publish_message(?MQTT_LOCAL_TOPIC, Body, OtherNode, Config),
  1264. ?retry(
  1265. _Sleep0 = 200,
  1266. _Retries0 = 20,
  1267. ?assertMatch(
  1268. {ok, 200, #{
  1269. <<"metrics">> := #{<<"matched">> := 1},
  1270. <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
  1271. }},
  1272. request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
  1273. )
  1274. ),
  1275. {ok, 204, <<>>} = request(put, uri([?ACTIONS_ROOT, ActionID, "metrics", "reset"]), Config),
  1276. Res = ?retry(
  1277. _Sleep0 = 200,
  1278. _Retries0 = 20,
  1279. begin
  1280. Res0 = request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config),
  1281. ?assertMatch(
  1282. {ok, 200, #{
  1283. <<"metrics">> := #{<<"matched">> := 0},
  1284. <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
  1285. }},
  1286. Res0
  1287. ),
  1288. Res0
  1289. end
  1290. ),
  1291. {ok, 200, #{<<"node_metrics">> := NodeMetrics}} = Res,
  1292. ?assert(
  1293. lists:all(
  1294. fun(#{<<"metrics">> := #{<<"matched">> := Matched}}) ->
  1295. Matched == 0
  1296. end,
  1297. NodeMetrics
  1298. ),
  1299. #{node_metrics => NodeMetrics}
  1300. ),
  1301. ok.
  1302. t_cluster_later_join_metrics(matrix) ->
  1303. [
  1304. [cluster_later_join, actions]
  1305. ];
  1306. t_cluster_later_join_metrics(Config) ->
  1307. [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config),
  1308. Name = ?BRIDGE_NAME,
  1309. ActionParams = ?KAFKA_BRIDGE(Name),
  1310. ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, Name),
  1311. ?check_trace(
  1312. begin
  1313. %% Create a bridge on only one of the nodes.
  1314. ?assertMatch(
  1315. {ok, 201, _}, request_json(post, uri([?ACTIONS_ROOT]), ActionParams, Config)
  1316. ),
  1317. %% Pre-condition.
  1318. ?assertMatch(
  1319. {ok, 200, #{
  1320. <<"metrics">> := #{<<"success">> := _},
  1321. <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
  1322. }},
  1323. request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
  1324. ),
  1325. %% Now join the other node join with the api node.
  1326. ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
  1327. %% Check metrics; shouldn't crash even if the bridge is not
  1328. %% ready on the node that just joined the cluster.
  1329. ?assertMatch(
  1330. {ok, 200, #{
  1331. <<"metrics">> := #{<<"success">> := _},
  1332. <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
  1333. }},
  1334. request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
  1335. ),
  1336. ok
  1337. end,
  1338. []
  1339. ),
  1340. ok.
  1341. t_raw_config_response_defaults(matrix) ->
  1342. [
  1343. [single, actions],
  1344. [single, sources],
  1345. [cluster, actions],
  1346. [cluster, sources]
  1347. ];
  1348. t_raw_config_response_defaults(Config) ->
  1349. [_SingleOrCluster, Kind | _] = group_path(Config),
  1350. Name = atom_to_binary(?FUNCTION_NAME),
  1351. #{
  1352. api_root_key := APIRootKey,
  1353. create_config_fn := CreateConfigFn
  1354. } = get_common_values(Kind, Name),
  1355. Params = maps:remove(<<"enable">>, CreateConfigFn(#{})),
  1356. ?assertMatch(
  1357. {ok, 201, #{<<"enable">> := true}},
  1358. request_json(
  1359. post,
  1360. uri([APIRootKey]),
  1361. Params,
  1362. Config
  1363. )
  1364. ),
  1365. ok.
  1366. t_older_version_nodes_in_cluster(matrix) ->
  1367. [
  1368. [cluster, actions],
  1369. [cluster, sources]
  1370. ];
  1371. t_older_version_nodes_in_cluster(Config) ->
  1372. [_, Kind | _] = group_path(Config),
  1373. PrimaryNode = ?config(node, Config),
  1374. OtherNode = maybe_get_other_node(Config),
  1375. ?assertNotEqual(OtherNode, PrimaryNode),
  1376. Name = atom_to_binary(?FUNCTION_NAME),
  1377. ?check_trace(
  1378. begin
  1379. #{api_root_key := APIRootKey} = get_common_values(Kind, Name),
  1380. erpc:call(PrimaryNode, fun() ->
  1381. meck:new(emqx_bpapi, [no_history, passthrough, no_link]),
  1382. meck:expect(emqx_bpapi, supported_version, fun(N, Api) ->
  1383. case N =:= OtherNode of
  1384. true -> 1;
  1385. false -> meck:passthrough([N, Api])
  1386. end
  1387. end)
  1388. end),
  1389. erpc:call(OtherNode, fun() ->
  1390. meck:new(emqx_bridge_v2, [no_history, passthrough, no_link]),
  1391. meck:expect(emqx_bridge_v2, list, fun(_ConfRootKey) ->
  1392. error(should_not_be_called)
  1393. end)
  1394. end),
  1395. ?assertMatch(
  1396. {ok, 200, _},
  1397. request_json(
  1398. get,
  1399. uri([APIRootKey]),
  1400. Config
  1401. )
  1402. ),
  1403. ok
  1404. end,
  1405. []
  1406. ),
  1407. ok.