emqx_ft_api_SUITE.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_api_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("stdlib/include/assert.hrl").
  21. -define(SECRET_ACCESS_KEY, <<"fake_secret_access_key">>).
  22. -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
  23. all() -> emqx_common_test_helpers:all(?MODULE).
  24. suite() ->
  25. [{timetrap, {seconds, 90}}].
  26. init_per_suite(Config) ->
  27. WorkDir = ?config(priv_dir, Config),
  28. Cluster = mk_cluster_specs(Config),
  29. Nodes = [Node1 | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
  30. {ok, App} = erpc:call(Node1, emqx_common_test_http, create_default_app, []),
  31. [{cluster_nodes, Nodes}, {api, App} | Config].
  32. end_per_suite(Config) ->
  33. ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)).
  34. mk_cluster_specs(_Config) ->
  35. Apps = [
  36. emqx_conf,
  37. {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
  38. {emqx_ft, "file_transfer { enable = true }"},
  39. {emqx_management, #{}}
  40. ],
  41. DashboardConfig =
  42. "dashboard { \n"
  43. " listeners.http { enable = true, bind = 0 } \n"
  44. " default_username = \"\" \n"
  45. " default_password = \"\" \n"
  46. "}\n",
  47. [
  48. {emqx_ft_api_SUITE1, #{
  49. role => core,
  50. apps => Apps ++
  51. [
  52. {emqx_dashboard, DashboardConfig ++ "dashboard.listeners.http.bind = 18083"}
  53. ]
  54. }},
  55. {emqx_ft_api_SUITE2, #{
  56. role => core,
  57. apps => Apps ++ [{emqx_dashboard, DashboardConfig}]
  58. }},
  59. {emqx_ft_api_SUITE3, #{
  60. role => replicant,
  61. apps => Apps ++ [{emqx_dashboard, DashboardConfig}]
  62. }}
  63. ].
  64. init_per_testcase(Case, Config) ->
  65. [{tc, Case} | Config].
  66. end_per_testcase(_Case, Config) ->
  67. ok = reset_ft_config(Config, true),
  68. ok.
  69. %%--------------------------------------------------------------------
  70. %% Tests
  71. %%--------------------------------------------------------------------
  72. t_list_files(Config) ->
  73. ClientId = client_id(Config),
  74. FileId = <<"f1">>,
  75. Node = lists:last(test_nodes(Config)),
  76. ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, "f1", <<"data">>, Node),
  77. {ok, 200, #{<<"files">> := Files}} =
  78. request_json(get, uri(["file_transfer", "files"]), Config),
  79. ?assertMatch(
  80. [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
  81. [File || File = #{<<"clientid">> := CId} <- Files, CId == ClientId]
  82. ),
  83. {ok, 200, #{<<"files">> := FilesTransfer}} =
  84. request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
  85. ?assertMatch(
  86. [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
  87. FilesTransfer
  88. ),
  89. ?assertMatch(
  90. {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
  91. request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]), Config)
  92. ).
  93. t_download_transfer(Config) ->
  94. ClientId = client_id(Config),
  95. FileId = <<"f1">>,
  96. Nodes = [Node | _] = test_nodes(Config),
  97. NodeUpload = lists:last(Nodes),
  98. ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, "f1", <<"data">>, NodeUpload),
  99. ?assertMatch(
  100. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  101. request_json(
  102. get,
  103. uri(["file_transfer", "file"]) ++ query(#{fileref => FileId}),
  104. Config
  105. )
  106. ),
  107. ?assertMatch(
  108. {ok, 503, _},
  109. request(
  110. get,
  111. uri(["file_transfer", "file"]) ++
  112. query(#{fileref => FileId, node => <<"nonode@nohost">>}),
  113. Config
  114. )
  115. ),
  116. ?assertMatch(
  117. {ok, 404, _},
  118. request(
  119. get,
  120. uri(["file_transfer", "file"]) ++
  121. query(#{fileref => <<"unknown_file">>, node => Node}),
  122. Config
  123. )
  124. ),
  125. ?assertMatch(
  126. {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
  127. request_json(
  128. get,
  129. uri(["file_transfer", "file"]) ++
  130. query(#{fileref => <<>>, node => Node}),
  131. Config
  132. )
  133. ),
  134. ?assertMatch(
  135. {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
  136. request_json(
  137. get,
  138. uri(["file_transfer", "file"]) ++
  139. query(#{fileref => <<"/etc/passwd">>, node => Node}),
  140. Config
  141. )
  142. ),
  143. {ok, 200, #{<<"files">> := [File]}} =
  144. request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
  145. {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File), Config),
  146. ?assertEqual(
  147. <<"data">>,
  148. Response
  149. ).
  150. t_list_files_paging(Config) ->
  151. ClientId = client_id(Config),
  152. NFiles = 20,
  153. Nodes = test_nodes(Config),
  154. Uploads = [
  155. {mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)}
  156. || N <- lists:seq(1, NFiles)
  157. ],
  158. ok = lists:foreach(
  159. fun({FileId, Name, Node}) ->
  160. ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, Name, <<"data">>, Node)
  161. end,
  162. Uploads
  163. ),
  164. ?assertMatch(
  165. {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
  166. request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}), Config)
  167. ),
  168. {ok, 200, #{<<"files">> := Files}} =
  169. request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config),
  170. ?assert(length(Files) >= NFiles),
  171. ?assertNotMatch(
  172. {ok, 200, #{<<"cursor">> := _}},
  173. request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config)
  174. ),
  175. ?assertMatch(
  176. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  177. request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}), Config)
  178. ),
  179. ?assertMatch(
  180. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  181. request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}), Config)
  182. ),
  183. ?assertMatch(
  184. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  185. request_json(
  186. get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}), Config
  187. )
  188. ),
  189. ?assertMatch(
  190. {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
  191. request_json(
  192. get,
  193. uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>}),
  194. Config
  195. )
  196. ),
  197. PageThrough = fun PageThrough(Query, Acc) ->
  198. case request_json(get, uri(["file_transfer", "files"]) ++ query(Query), Config) of
  199. {ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} ->
  200. PageThrough(Query#{following => Cursor}, Acc ++ FilesPage);
  201. {ok, 200, #{<<"files">> := FilesPage}} ->
  202. Acc ++ FilesPage
  203. end
  204. end,
  205. ?assertEqual(Files, PageThrough(#{limit => 1}, [])),
  206. ?assertEqual(Files, PageThrough(#{limit => 8}, [])),
  207. ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
  208. t_ft_disabled(Config) ->
  209. ?assertMatch(
  210. {ok, 200, _},
  211. request_json(get, uri(["file_transfer", "files"]), Config)
  212. ),
  213. ?assertMatch(
  214. {ok, 400, _},
  215. request_json(
  216. get,
  217. uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
  218. Config
  219. )
  220. ),
  221. ok = reset_ft_config(Config, false),
  222. ?assertMatch(
  223. {ok, 503, _},
  224. request_json(get, uri(["file_transfer", "files"]), Config)
  225. ),
  226. ?assertMatch(
  227. {ok, 503, _},
  228. request_json(
  229. get,
  230. uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()}),
  231. Config
  232. )
  233. ).
  234. t_configure_file_transfer(Config) ->
  235. Uri = uri(["file_transfer"]),
  236. test_configure(Uri, Config).
  237. t_configure_config_file_transfer(Config) ->
  238. Uri = uri(["configs/file_transfer"]),
  239. test_configure(Uri, Config).
  240. test_configure(Uri, Config) ->
  241. ?assertMatch(
  242. {ok, 200, #{
  243. <<"enable">> := true,
  244. <<"storage">> :=
  245. #{
  246. <<"local">> :=
  247. #{
  248. <<"enable">> := true,
  249. <<"segments">> :=
  250. #{
  251. <<"gc">> :=
  252. #{
  253. %% Match keep the raw conf
  254. %% 1h is not change to 3600000
  255. <<"interval">> := <<"1h">>,
  256. <<"maximum_segments_ttl">> := <<"24h">>,
  257. <<"minimum_segments_ttl">> := <<"5m">>
  258. }
  259. }
  260. }
  261. }
  262. }},
  263. request_json(get, Uri, Config)
  264. ),
  265. ?assertMatch(
  266. {ok, 200, #{<<"enable">> := false}},
  267. request_json(put, Uri, #{<<"enable">> => false}, Config)
  268. ),
  269. ?assertMatch(
  270. {ok, 200, #{<<"enable">> := false}},
  271. request_json(get, Uri, Config)
  272. ),
  273. Storage0 = emqx_ft_test_helpers:local_storage(Config),
  274. Storage = emqx_utils_maps:deep_put(
  275. [
  276. <<"local">>,
  277. <<"segments">>,
  278. <<"gc">>,
  279. <<"maximum_segments_ttl">>
  280. ],
  281. Storage0,
  282. <<"10m">>
  283. ),
  284. ?assertMatch(
  285. {ok, 200, #{
  286. <<"storage">> :=
  287. #{
  288. <<"local">> :=
  289. #{
  290. <<"segments">> :=
  291. #{
  292. <<"gc">> :=
  293. #{
  294. <<"interval">> := <<"1h">>,
  295. %% Match keep the raw conf
  296. %% 10m is not change to 600,000
  297. <<"maximum_segments_ttl">> := <<"10m">>,
  298. <<"minimum_segments_ttl">> := <<"5m">>
  299. }
  300. }
  301. }
  302. }
  303. }},
  304. request_json(
  305. put,
  306. Uri,
  307. #{
  308. <<"enable">> => true,
  309. <<"storage">> => Storage
  310. },
  311. Config
  312. )
  313. ),
  314. ?assertMatch(
  315. {ok, 400, _},
  316. request(
  317. put,
  318. Uri,
  319. #{
  320. <<"enable">> => true,
  321. <<"storage">> => #{
  322. <<"local">> => #{},
  323. <<"remote">> => #{}
  324. }
  325. },
  326. Config
  327. )
  328. ),
  329. ?assertMatch(
  330. {ok, 400, _},
  331. request(
  332. put,
  333. Uri,
  334. #{
  335. <<"enable">> => true,
  336. <<"storage">> => #{
  337. <<"local">> => #{
  338. <<"gc">> => #{<<"interval">> => -42}
  339. }
  340. }
  341. },
  342. Config
  343. )
  344. ),
  345. S3Exporter = #{
  346. <<"host">> => <<"localhost">>,
  347. <<"port">> => 9000,
  348. <<"bucket">> => <<"emqx">>,
  349. <<"url_expire_time">> => <<"2h">>,
  350. <<"secret_access_key">> => ?SECRET_ACCESS_KEY,
  351. <<"transport_options">> => #{
  352. <<"ssl">> => #{
  353. <<"enable">> => true,
  354. <<"certfile">> => emqx_ft_test_helpers:pem_privkey(),
  355. <<"keyfile">> => emqx_ft_test_helpers:pem_privkey()
  356. }
  357. }
  358. },
  359. {ok, 200, GetConfigJson} =
  360. request_json(
  361. put,
  362. Uri,
  363. #{
  364. <<"enable">> => true,
  365. <<"storage">> => #{
  366. <<"local">> => #{
  367. <<"exporter">> => #{
  368. <<"s3">> => S3Exporter
  369. }
  370. }
  371. }
  372. },
  373. Config
  374. ),
  375. ?assertMatch(
  376. #{
  377. <<"enable">> := true,
  378. <<"storage">> := #{
  379. <<"local">> := #{
  380. <<"exporter">> := #{
  381. <<"s3">> := #{
  382. <<"transport_options">> := #{
  383. <<"ssl">> := SSL = #{
  384. <<"enable">> := true,
  385. <<"certfile">> := <<"/", _CertFilepath/bytes>>,
  386. <<"keyfile">> := <<"/", _KeyFilepath/bytes>>
  387. }
  388. },
  389. %% ensure 2h is unchanged
  390. <<"url_expire_time">> := <<"2h">>,
  391. <<"secret_access_key">> := <<"******">>
  392. }
  393. }
  394. }
  395. }
  396. } when not is_map_key(<<"password">>, SSL),
  397. GetConfigJson
  398. ),
  399. ?assertMatch(
  400. {ok, 400, _},
  401. request_json(
  402. put,
  403. Uri,
  404. #{
  405. <<"enable">> => true,
  406. <<"storage">> => #{
  407. <<"local">> => #{
  408. <<"exporter">> => #{
  409. <<"s3">> => emqx_utils_maps:deep_put(
  410. [<<"transport_options">>, <<"ssl">>, <<"keyfile">>],
  411. S3Exporter,
  412. <<>>
  413. )
  414. }
  415. }
  416. }
  417. },
  418. Config
  419. )
  420. ),
  421. ?assertMatch(
  422. {ok, 200, #{}},
  423. request_json(
  424. put,
  425. Uri,
  426. emqx_utils_maps:deep_merge(
  427. GetConfigJson,
  428. #{
  429. <<"enable">> => true,
  430. <<"storage">> => #{
  431. <<"local">> => #{
  432. <<"exporter">> => #{
  433. <<"s3">> => emqx_utils_maps:deep_put(
  434. [<<"transport_options">>, <<"ssl">>, <<"enable">>],
  435. S3Exporter,
  436. false
  437. )
  438. }
  439. }
  440. }
  441. }
  442. ),
  443. Config
  444. )
  445. ),
  446. %% put secret as ******, check the secret is unchanged
  447. ?assertMatch(
  448. #{
  449. <<"storage">> :=
  450. #{
  451. <<"local">> :=
  452. #{
  453. <<"enable">> := true,
  454. <<"exporter">> :=
  455. #{
  456. <<"s3">> :=
  457. #{
  458. <<"enable">> := true,
  459. <<"secret_access_key">> := ?SECRET_ACCESS_KEY
  460. }
  461. }
  462. }
  463. }
  464. },
  465. get_ft_config(Config)
  466. ),
  467. ok.
  468. %%--------------------------------------------------------------------
  469. %% Helpers
  470. %%--------------------------------------------------------------------
  471. test_nodes(Config) ->
  472. ?config(cluster_nodes, Config).
  473. client_id(Config) ->
  474. iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])).
  475. mk_file_id(Prefix, N) ->
  476. iolist_to_binary([Prefix, integer_to_list(N)]).
  477. mk_file_name(N) ->
  478. "file." ++ integer_to_list(N).
  479. request(Method, Url, Config) ->
  480. request(Method, Url, [], Config).
  481. request(Method, Url, Body, Config) ->
  482. Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
  483. request(Method, Url, Body, Opts, Config).
  484. request(Method, Url, Body, Opts, Config) ->
  485. emqx_mgmt_api_test_util:request_api(Method, Url, [], auth_header(Config), Body, Opts).
  486. request_json(Method, Url, Body, Config) ->
  487. case request(Method, Url, Body, Config) of
  488. {ok, Code, RespBody} ->
  489. {ok, Code, json(RespBody)};
  490. Otherwise ->
  491. Otherwise
  492. end.
  493. request_json(Method, Url, Config) ->
  494. request_json(Method, Url, [], Config).
  495. json(Body) when is_binary(Body) ->
  496. try
  497. emqx_utils_json:decode(Body, [return_maps])
  498. catch
  499. _:_ ->
  500. error({bad_json, Body})
  501. end.
  502. query(Params) ->
  503. KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
  504. "?" ++ string:join(KVs, "&").
  505. auth_header(Config) ->
  506. #{api_key := ApiKey, api_secret := Secret} = ?config(api, Config),
  507. emqx_common_test_http:auth_header(binary_to_list(ApiKey), binary_to_list(Secret)).
  508. uri_encode(T) ->
  509. emqx_http_lib:uri_encode(to_list(T)).
  510. to_list(A) when is_atom(A) ->
  511. atom_to_list(A);
  512. to_list(A) when is_integer(A) ->
  513. integer_to_list(A);
  514. to_list(B) when is_binary(B) ->
  515. binary_to_list(B);
  516. to_list(L) when is_list(L) ->
  517. L.
  518. pick(N, List) ->
  519. lists:nth(1 + (N rem length(List)), List).
  520. reset_ft_config(Config, Enable) ->
  521. [Node | _] = test_nodes(Config),
  522. LocalConfig =
  523. #{
  524. <<"enable">> => Enable,
  525. <<"storage">> => #{
  526. <<"local">> => #{
  527. <<"enable">> => true,
  528. <<"segments">> => #{
  529. <<"gc">> => #{
  530. <<"interval">> => <<"1h">>,
  531. <<"maximum_segments_ttl">> => "24h",
  532. <<"minimum_segments_ttl">> => "5m"
  533. }
  534. }
  535. }
  536. }
  537. },
  538. {ok, _} = rpc:call(Node, emqx_ft_conf, update, [LocalConfig]),
  539. ok.
  540. get_ft_config(Config) ->
  541. [Node | _] = test_nodes(Config),
  542. rpc:call(Node, emqx_ft_conf, get_raw, []).