emqx_ft_api_SUITE.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_api_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("stdlib/include/assert.hrl").
  21. -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
  %% Common Test callback: the list of test cases is delegated to
  %% `emqx_common_test_helpers:all/1', called with this module's name.
  all() ->
      Suite = ?MODULE,
      emqx_common_test_helpers:all(Suite).
  %% Common Test callback: suite-wide settings. Only a 90-second timetrap
  %% is configured here.
  suite() ->
      Timetrap = {timetrap, {seconds, 90}},
      [Timetrap].
  %% Common Test callback: starts the test cluster described by
  %% `mk_cluster_specs/1' (using the suite's `priv_dir' as work directory),
  %% creates a default API app on the first started node, and stores both the
  %% node list (`cluster_nodes') and the app (`api') in the returned config.
  init_per_suite(Config) ->
      ClusterSpecs = mk_cluster_specs(Config),
      StartOpts = #{work_dir => ?config(priv_dir, Config)},
      [FirstNode | _] = Nodes = emqx_cth_cluster:start(ClusterSpecs, StartOpts),
      {ok, ApiApp} = erpc:call(FirstNode, emqx_common_test_http, create_default_app, []),
      [{cluster_nodes, Nodes}, {api, ApiApp} | Config].
  %% Common Test callback: stops the nodes recorded under `cluster_nodes'
  %% by `init_per_suite/1'. Crashes unless the stop call returns `ok'.
  end_per_suite(Config) ->
      Nodes = ?config(cluster_nodes, Config),
      ok = emqx_cth_cluster:stop(Nodes).
  %% Builds the specs for the three test nodes: two `core' nodes and one
  %% `replicant'. Every node runs the same base apps followed by
  %% `emqx_dashboard'; only the first node additionally pins the dashboard
  %% HTTP listener to port 18083 (the others keep `bind = 0').
  %% The `_Config' argument is not used.
  mk_cluster_specs(_Config) ->
      DashboardConfig =
          "dashboard { \n"
          " listeners.http { enable = true, bind = 0 } \n"
          " default_username = \"\" \n"
          " default_password = \"\" \n"
          "}\n",
      BaseApps = [
          emqx_conf,
          {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
          {emqx_ft, "file_transfer { enable = true }"},
          {emqx_management, #{}}
      ],
      %% A node spec differs only in its role and its dashboard config string.
      NodeSpec = fun(Role, DashboardConf) ->
          #{role => Role, apps => BaseApps ++ [{emqx_dashboard, DashboardConf}]}
      end,
      FixedPortConfig = DashboardConfig ++ "dashboard.listeners.http.bind = 18083",
      [
          {emqx_ft_api_SUITE1, NodeSpec(core, FixedPortConfig)},
          {emqx_ft_api_SUITE2, NodeSpec(core, DashboardConfig)},
          {emqx_ft_api_SUITE3, NodeSpec(replicant, DashboardConfig)}
      ].
  %% Common Test callback: records the running test case name under the
  %% `tc' key so helpers such as `client_id/1' can read it back.
  init_per_testcase(Case, Config) ->
      TestCaseEntry = {tc, Case},
      [TestCaseEntry | Config].
  %% Common Test callback: re-applies the file transfer config with
  %% `enable = true' after every test case, so a case that disabled the
  %% feature does not leak that state into the next one.
  end_per_testcase(_Case, Config) ->
      ok = reset_ft_config(Config, true).
  68. %%--------------------------------------------------------------------
  69. %% Tests
  70. %%--------------------------------------------------------------------
  %% Uploads one file through the last cluster node, then checks the listing
  %% endpoints: the file shows up in the global listing (filtered by this
  %% test's client id), in the per-transfer listing, and an unknown file id
  %% yields 404 `FILES_NOT_FOUND'.
  t_list_files(Config) ->
      ClientId = client_id(Config),
      FileId = <<"f1">>,
      UploadNode = lists:last(test_nodes(Config)),
      ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, UploadNode),

      %% Only entries carrying this test's client id are considered; entries
      %% without a `clientid' key are dropped as well.
      {ok, 200, #{<<"files">> := AllFiles}} =
          request_json(get, uri(["file_transfer", "files"]), Config),
      OwnFiles = lists:filter(
          fun
              (#{<<"clientid">> := Owner}) -> Owner == ClientId;
              (_Other) -> false
          end,
          AllFiles
      ),
      ?assertMatch(
          [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
          OwnFiles
      ),

      {ok, 200, #{<<"files">> := TransferFiles}} =
          request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
      ?assertMatch(
          [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
          TransferFiles
      ),

      MissingUrl = uri(["file_transfer", "files", ClientId, <<"no-such-file">>]),
      ?assertMatch(
          {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
          request_json(get, MissingUrl, Config)
      ).
  %% Exercises the `file_transfer/file' download endpoint after uploading a
  %% file through the last cluster node. Covers the error responses for a
  %% missing `node' parameter, an unknown node, an unknown fileref and
  %% malformed filerefs, then downloads the file via the `uri' reported by
  %% the listing endpoint and compares the payload.
  t_download_transfer(Config) ->
      ClientId = client_id(Config),
      FileId = <<"f1">>,
      [FirstNode | _] = Nodes = test_nodes(Config),
      ok = emqx_ft_test_helpers:upload_file(
          ClientId, FileId, "f1", <<"data">>, lists:last(Nodes)
      ),
      DownloadUrl = fun(Params) ->
          uri(["file_transfer", "file"]) ++ query(Params)
      end,

      %% No `node' parameter at all: rejected as a bad request.
      ?assertMatch(
          {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
          request_json(get, DownloadUrl(#{fileref => FileId}), Config)
      ),
      %% A node name that is not one of the cluster nodes: 503.
      ?assertMatch(
          {ok, 503, _},
          request(
              get,
              DownloadUrl(#{fileref => FileId, node => <<"nonode@nohost">>}),
              Config
          )
      ),
      %% A cluster node, but a fileref nothing was uploaded under: 404.
      ?assertMatch(
          {ok, 404, _},
          request(
              get,
              DownloadUrl(#{fileref => <<"unknown_file">>, node => FirstNode}),
              Config
          )
      ),
      %% Empty and path-like filerefs are both reported as invalid parameters.
      ?assertMatch(
          {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
          request_json(get, DownloadUrl(#{fileref => <<>>, node => FirstNode}), Config)
      ),
      ?assertMatch(
          {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
          request_json(
              get,
              DownloadUrl(#{fileref => <<"/etc/passwd">>, node => FirstNode}),
              Config
          )
      ),

      %% Happy path: follow the download link advertised by the listing.
      {ok, 200, #{<<"files">> := [Listed]}} =
          request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
      DownloadLink = host() ++ maps:get(<<"uri">>, Listed),
      {ok, 200, Payload} = request(get, DownloadLink, Config),
      ?assertEqual(<<"data">>, Payload).
  %% Uploads 20 files spread over the cluster nodes and checks pagination of
  %% the `file_transfer/files' listing: page size is honoured, a cursor is
  %% returned only when more results remain, invalid `limit'/`following'
  %% values are rejected with 400, and paging through with several page
  %% sizes reproduces the single-request listing exactly.
  t_list_files_paging(Config) ->
      ClientId = client_id(Config),
      NFiles = 20,
      Nodes = test_nodes(Config),
      lists:foreach(
          fun(N) ->
              ok = emqx_ft_test_helpers:upload_file(
                  ClientId,
                  mk_file_id("file:", N),
                  mk_file_name(N),
                  <<"data">>,
                  pick(N, Nodes)
              )
          end,
          lists:seq(1, NFiles)
      ),
      ListFiles = fun(Params) ->
          request_json(get, uri(["file_transfer", "files"]) ++ query(Params), Config)
      end,

      %% A small limit yields exactly that many entries plus a cursor.
      ?assertMatch(
          {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
          ListFiles(#{limit => 3})
      ),
      %% A limit of 100 returns everything in one page, without a cursor.
      {ok, 200, #{<<"files">> := Files}} = ListFiles(#{limit => 100}),
      ?assert(length(Files) >= NFiles),
      ?assertNotMatch(
          {ok, 200, #{<<"cursor">> := _}},
          ListFiles(#{limit => 100})
      ),

      %% Invalid paging parameters.
      ?assertMatch(
          {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
          ListFiles(#{limit => 0})
      ),
      ?assertMatch(
          {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
          ListFiles(#{following => <<>>})
      ),
      ?assertMatch(
          {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
          ListFiles(#{following => <<"{\"\":}">>})
      ),
      ?assertMatch(
          {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
          ListFiles(#{following => <<"whatsthat!?">>})
      ),

      %% Follows cursors until a page arrives without one, concatenating the
      %% pages in the order they were received.
      Collect = fun Loop(Params, Seen) ->
          case ListFiles(Params) of
              {ok, 200, #{<<"files">> := Page, <<"cursor">> := Next}} ->
                  Loop(Params#{following => Next}, Seen ++ Page);
              {ok, 200, #{<<"files">> := Page}} ->
                  Seen ++ Page
          end
      end,
      ?assertEqual(Files, Collect(#{limit => 1}, [])),
      ?assertEqual(Files, Collect(#{limit => 8}, [])),
      ?assertEqual(Files, Collect(#{limit => NFiles}, [])).
  %% Checks that the file transfer API answers 503 once the feature is
  %% disabled.
  %%
  %% First establishes the baseline with the feature enabled (listing works,
  %% a download request without `node' is a plain 400), then disables file
  %% transfer via `reset_ft_config/2' and expects 503 from both endpoints.
  %% `end_per_testcase/2' re-enables the feature afterwards.
  t_ft_disabled(Config) ->
      %% The download request must target a real cluster node. The previous
      %% `node()' is the node running this suite, and `t_download_transfer'
      %% shows that an unknown node already yields 503 while the feature is
      %% enabled, so that request could not tell "disabled" from "bad node".
      [Node | _] = test_nodes(Config),

      ?assertMatch(
          {ok, 200, _},
          request_json(get, uri(["file_transfer", "files"]), Config)
      ),
      ?assertMatch(
          {ok, 400, _},
          request_json(
              get,
              uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
              Config
          )
      ),

      ok = reset_ft_config(Config, false),

      ?assertMatch(
          {ok, 503, _},
          request_json(get, uri(["file_transfer", "files"]), Config)
      ),
      ?assertMatch(
          {ok, 503, _},
          request_json(
              get,
              uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => Node}),
              Config
          )
      ).
  %% Drives the `file_transfer' configuration endpoint through a sequence of
  %% GET/PUT requests: toggling `enable', restoring local storage, rejecting
  %% invalid storage payloads with 400, and configuring an S3 exporter with
  %% SSL options. The requests are order-dependent: each one runs against
  %% the config left behind by the previous one.
  t_configure(Config) ->
      Url = uri(["file_transfer"]),
      %% Payload builders: an enabled config with the given `local' storage
      %% settings, and the same with an S3 exporter nested inside it.
      EnabledWithLocal = fun(Local) ->
          #{<<"enable">> => true, <<"storage">> => #{<<"local">> => Local}}
      end,
      EnabledWithS3 = fun(S3) ->
          EnabledWithLocal(#{<<"exporter">> => #{<<"s3">> => S3}})
      end,

      %% Initial state, then disable and read the change back.
      ?assertMatch(
          {ok, 200, #{<<"enable">> := true, <<"storage">> := #{}}},
          request_json(get, Url, Config)
      ),
      ?assertMatch(
          {ok, 200, #{<<"enable">> := false}},
          request_json(put, Url, #{<<"enable">> => false}, Config)
      ),
      ?assertMatch(
          {ok, 200, #{<<"enable">> := false}},
          request_json(get, Url, Config)
      ),

      %% Re-enable with the local storage settings from the test helpers.
      ReEnable = #{
          <<"enable">> => true,
          <<"storage">> => emqx_ft_test_helpers:local_storage(Config)
      },
      ?assertMatch({ok, 200, #{}}, request_json(put, Url, ReEnable, Config)),

      %% A storage map with both `local' and `remote' keys is rejected.
      TwoBackends = #{
          <<"enable">> => true,
          <<"storage">> => #{<<"local">> => #{}, <<"remote">> => #{}}
      },
      ?assertMatch({ok, 400, _}, request(put, Url, TwoBackends, Config)),

      %% A negative GC interval is rejected.
      NegativeGcInterval = EnabledWithLocal(#{<<"gc">> => #{<<"interval">> => -42}}),
      ?assertMatch({ok, 400, _}, request(put, Url, NegativeGcInterval, Config)),

      %% NOTE(review): `certfile' is filled from `pem_privkey/0' just like
      %% `keyfile' - presumably any PEM content is enough here; confirm.
      SslOptions = #{
          <<"enable">> => true,
          <<"certfile">> => emqx_ft_test_helpers:pem_privkey(),
          <<"keyfile">> => emqx_ft_test_helpers:pem_privkey()
      },
      S3Exporter = #{
          <<"host">> => <<"localhost">>,
          <<"port">> => 9000,
          <<"bucket">> => <<"emqx">>,
          <<"transport_options">> => #{<<"ssl">> => SslOptions}
      },
      SslPath = [<<"transport_options">>, <<"ssl">>],

      %% With SSL enabled, the response carries `certfile'/`keyfile' values
      %% that start with "/" rather than the submitted content.
      ?assertMatch(
          {ok, 200, #{
              <<"enable">> := true,
              <<"storage">> := #{
                  <<"local">> := #{
                      <<"exporter">> := #{
                          <<"s3">> := #{
                              <<"transport_options">> := #{
                                  <<"ssl">> := #{
                                      <<"enable">> := true,
                                      <<"certfile">> := <<"/", _CertFilepath/bytes>>,
                                      <<"keyfile">> := <<"/", _KeyFilepath/bytes>>
                                  }
                              }
                          }
                      }
                  }
              }
          }},
          request_json(put, Url, EnabledWithS3(S3Exporter), Config)
      ),

      %% An empty `keyfile' is rejected while SSL is enabled.
      EmptyKeyfile = emqx_utils_maps:deep_put(SslPath ++ [<<"keyfile">>], S3Exporter, <<>>),
      ?assertMatch(
          {ok, 400, _},
          request_json(put, Url, EnabledWithS3(EmptyKeyfile), Config)
      ),

      %% The same exporter with SSL switched off is accepted.
      SslDisabled = emqx_utils_maps:deep_put(SslPath ++ [<<"enable">>], S3Exporter, false),
      ?assertMatch(
          {ok, 200, #{}},
          request_json(put, Url, EnabledWithS3(SslDisabled), Config)
      ),
      ok.
  381. %%--------------------------------------------------------------------
  382. %% Helpers
  383. %%--------------------------------------------------------------------
  %% Returns the value stored under `cluster_nodes' in the CT config, i.e.
  %% the node list that `init_per_suite/1' put there.
  test_nodes(Config) ->
      Nodes = ?config(cluster_nodes, Config),
      Nodes.
  %% Builds a per-test-case client id of the form `<<"Group.TestCase">>'
  %% from the `group' and `tc' entries of the CT config.
  %% NOTE(review): nothing visible in this suite stores a `group' entry, so
  %% the prefix is presumably always "undefined" - confirm whether that is
  %% intended.
  client_id(Config) ->
      Group = ?config(group, Config),
      TestCase = ?config(tc, Config),
      Formatted = io_lib:format("~s.~s", [Group, TestCase]),
      iolist_to_binary(Formatted).
  %% Returns a binary file id made of `Prefix' (any iodata) followed by the
  %% decimal representation of the integer `N'.
  mk_file_id(Prefix, N) ->
      Number = integer_to_binary(N),
      iolist_to_binary([Prefix, Number]).
  %% Returns the file name string `"file.N"' for the integer `N'.
  mk_file_name(N) ->
      Suffix = integer_to_list(N),
      lists:append("file.", Suffix).
  %% Issues an API request with an empty list as the request body.
  %% See `request/4' for the options used.
  request(Method, Url, Config) ->
      NoBody = [],
      request(Method, Url, NoBody, Config).
  %% Issues an API request with the suite's default options: compatible
  %% mode enabled and the response body requested as a binary.
  request(Method, Url, Body, Config) ->
      HttpcOpts = [{body_format, binary}],
      DefaultOpts = #{compatible_mode => true, httpc_req_opts => HttpcOpts},
      request(Method, Url, Body, DefaultOpts, Config).
  %% Lowest-level request helper: forwards to
  %% `emqx_mgmt_api_test_util:request_api/6' with an empty query-params
  %% argument and the auth header derived from the suite's API app.
  request(Method, Url, Body, Opts, Config) ->
      AuthHeader = auth_header(Config),
      emqx_mgmt_api_test_util:request_api(Method, Url, [], AuthHeader, Body, Opts).
  %% Like `request/4', but decodes the body of an `{ok, Code, Body}' response
  %% as JSON. Any other result is returned untouched.
  request_json(Method, Url, Body, Config) ->
      decode_json_response(request(Method, Url, Body, Config)).

  %% Decodes the body of a successful request result; passes anything else on.
  decode_json_response({ok, Code, RespBody}) ->
      {ok, Code, json(RespBody)};
  decode_json_response(Otherwise) ->
      Otherwise.
  %% Same as `request_json/4' with an empty list as the request body.
  request_json(Method, Url, Config) ->
      NoBody = [],
      request_json(Method, Url, NoBody, Config).
  %% Decodes a binary JSON document, asking the decoder to return maps.
  %% Only binaries are accepted; anything else fails with `function_clause'.
  json(<<_/binary>> = Body) ->
      DecodeOpts = [return_maps],
      emqx_utils_json:decode(Body, DecodeOpts).
  %% Renders a map of parameters as a URL query string: a leading "?"
  %% followed by URI-encoded `key=value' pairs joined with "&". Pairs appear
  %% in the order produced by `maps:to_list/1'.
  query(Params) ->
      Pairs = [uri_encode(K) ++ "=" ++ uri_encode(V) || {K, V} <- maps:to_list(Params)],
      "?" ++ lists:append(lists:join("&", Pairs)).
  %% Builds the auth header for API requests from the key and secret of the
  %% app stored under `api' in the CT config (see `init_per_suite/1').
  auth_header(Config) ->
      #{api_key := KeyBin, api_secret := SecretBin} = ?config(api, Config),
      Key = binary_to_list(KeyBin),
      Secret = binary_to_list(SecretBin),
      emqx_common_test_http:auth_header(Key, Secret).
  %% URI-encodes a term after converting it to a string with `to_list/1'.
  uri_encode(T) ->
      Plain = to_list(T),
      emqx_http_lib:uri_encode(Plain).
  %% Converts a list, binary, integer or atom to a list (string); lists are
  %% returned unchanged. Any other term fails with `function_clause'.
  to_list(Chars) when is_list(Chars) ->
      Chars;
  to_list(Bin) when is_binary(Bin) ->
      binary_to_list(Bin);
  to_list(Int) when is_integer(Int) ->
      integer_to_list(Int);
  to_list(Atom) when is_atom(Atom) ->
      atom_to_list(Atom).
  %% Picks an element of `List' in round-robin fashion based on `N': the
  %% 1-based position is `1 + (N rem length(List))'.
  pick(N, List) ->
      Position = (N rem length(List)) + 1,
      lists:nth(Position, List).
  %% Applies a file transfer config with local storage enabled and the given
  %% `Enable' flag, by calling `emqx_ft_conf:update/1' on the first cluster
  %% node. Returns `ok'; crashes if the update does not return `{ok, _}'.
  %%
  %% Uses `erpc:call/4', like `init_per_suite/1' does, so a failed remote
  %% call raises right away instead of returning `{badrpc, _}' and failing
  %% later on the `{ok, _}' match.
  reset_ft_config(Config, Enable) ->
      [Node | _] = test_nodes(Config),
      LocalConfig =
          #{
              <<"enable">> => Enable,
              <<"storage">> => #{
                  <<"local">> => #{
                      <<"enable">> => true
                  }
              }
          },
      {ok, _} = erpc:call(Node, emqx_ft_conf, update, [LocalConfig]),
      ok.
  438. },
  439. {ok, _} = rpc:call(Node, emqx_ft_conf, update, [LocalConfig]),
  440. ok.