emqx_mgmt_api_data_backup_SUITE.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_mgmt_api_data_backup_SUITE).
  16. -compile(export_all).
  17. -compile(nowarn_export_all).
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -define(NODE1_PORT, 18085).
  22. -define(NODE2_PORT, 18086).
  23. -define(NODE3_PORT, 18087).
  24. -define(api_base_url(_Port_), ("http://127.0.0.1:" ++ (integer_to_list(_Port_)))).
  25. -define(UPLOAD_EE_BACKUP, "emqx-export-upload-ee.tar.gz").
  26. -define(UPLOAD_CE_BACKUP, "emqx-export-upload-ce.tar.gz").
  27. -define(BAD_UPLOAD_BACKUP, "emqx-export-bad-upload.tar.gz").
  28. -define(BAD_IMPORT_BACKUP, "emqx-export-bad-file.tar.gz").
  29. -define(backup_path(_Config_, _BackupName_),
  30. filename:join(?config(data_dir, _Config_), _BackupName_)
  31. ).
  32. all() ->
  33. emqx_common_test_helpers:all(?MODULE).
  34. init_per_suite(Config) ->
  35. Config.
  36. end_per_suite(_) ->
  37. ok.
  38. init_per_testcase(TC, Config) when
  39. TC =:= t_upload_ee_backup;
  40. TC =:= t_import_ee_backup
  41. ->
  42. case emqx_release:edition() of
  43. ee -> do_init_per_testcase(TC, Config);
  44. ce -> Config
  45. end;
  46. init_per_testcase(TC, Config) ->
  47. do_init_per_testcase(TC, Config).
  48. end_per_testcase(_TC, Config) ->
  49. case ?config(cluster, Config) of
  50. undefined -> ok;
  51. Cluster -> emqx_cth_cluster:stop(Cluster)
  52. end.
  53. t_export_backup(Config) ->
  54. Auth = ?config(auth, Config),
  55. export_test(?NODE1_PORT, Auth),
  56. export_test(?NODE2_PORT, Auth),
  57. export_test(?NODE3_PORT, Auth).
  58. t_delete_backup(Config) ->
  59. test_file_op(delete, Config).
  60. t_get_backup(Config) ->
  61. test_file_op(get, Config).
  62. t_list_backups(Config) ->
  63. Auth = ?config(auth, Config),
  64. [{ok, _} = export_backup(?NODE1_PORT, Auth) || _ <- lists:seq(1, 10)],
  65. [{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)],
  66. {ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>),
  67. #{<<"data">> := Data, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
  68. RespBody
  69. ),
  70. ?assertEqual(20, length(Data)),
  71. {ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>),
  72. #{<<"data">> := EmptyData, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
  73. EmptyRespBody
  74. ),
  75. ?assertEqual(0, length(EmptyData)),
  76. {ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>),
  77. {ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>),
  78. {ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>),
  79. #{<<"data">> := DataP1, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := true}} = emqx_utils_json:decode(
  80. RespBodyP1
  81. ),
  82. ?assertEqual(10, length(DataP1)),
  83. #{<<"data">> := DataP2, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
  84. RespBodyP2
  85. ),
  86. ?assertEqual(10, length(DataP2)),
  87. #{<<"data">> := DataP3, <<"meta">> := #{<<"count">> := 20, <<"hasnext">> := false}} = emqx_utils_json:decode(
  88. RespBodyP3
  89. ),
  90. ?assertEqual(0, length(DataP3)),
  91. ?assertEqual(Data, DataP1 ++ DataP2).
  92. t_upload_ce_backup(Config) ->
  93. upload_backup_test(Config, ?UPLOAD_CE_BACKUP).
  94. t_upload_ee_backup(Config) ->
  95. case emqx_release:edition() of
  96. ee -> upload_backup_test(Config, ?UPLOAD_EE_BACKUP);
  97. ce -> ok
  98. end.
  99. t_import_ce_backup(Config) ->
  100. import_backup_test(Config, ?UPLOAD_CE_BACKUP).
  101. t_import_ee_backup(Config) ->
  102. case emqx_release:edition() of
  103. ee -> import_backup_test(Config, ?UPLOAD_EE_BACKUP);
  104. ce -> ok
  105. end.
  106. do_init_per_testcase(TC, Config) ->
  107. Cluster = [Core1, _Core2, Repl] = cluster(TC, Config),
  108. Auth = auth_header(Core1),
  109. ok = wait_for_auth_replication(Repl),
  110. [{auth, Auth}, {cluster, Cluster} | Config].
  111. test_file_op(Method, Config) ->
  112. Auth = ?config(auth, Config),
  113. {ok, Node1Resp} = export_backup(?NODE1_PORT, Auth),
  114. {ok, Node2Resp} = export_backup(?NODE2_PORT, Auth),
  115. {ok, Node3Resp} = export_backup(?NODE3_PORT, Auth),
  116. ParsedResps = [emqx_utils_json:decode(R) || R <- [Node1Resp, Node2Resp, Node3Resp]],
  117. [Node1Parsed, Node2Parsed, Node3Parsed] = ParsedResps,
  118. %% node param is not set in Query, expect get/delete the backup on the local node
  119. F1 = fun() ->
  120. backup_file_op(Method, ?NODE1_PORT, Auth, maps:get(<<"filename">>, Node1Parsed), [])
  121. end,
  122. ?assertMatch({ok, _}, F1()),
  123. assert_second_call(Method, F1()),
  124. %% Node 2 must get/delete the backup on Node 3 via rpc
  125. F2 = fun() ->
  126. backup_file_op(
  127. Method,
  128. ?NODE2_PORT,
  129. Auth,
  130. maps:get(<<"filename">>, Node3Parsed),
  131. [{<<"node">>, maps:get(<<"node">>, Node3Parsed)}]
  132. )
  133. end,
  134. ?assertMatch({ok, _}, F2()),
  135. assert_second_call(Method, F2()),
  136. %% The same as above but nodes are switched
  137. F3 = fun() ->
  138. backup_file_op(
  139. Method,
  140. ?NODE3_PORT,
  141. Auth,
  142. maps:get(<<"filename">>, Node2Parsed),
  143. [{<<"node">>, maps:get(<<"node">>, Node2Parsed)}]
  144. )
  145. end,
  146. ?assertMatch({ok, _}, F3()),
  147. assert_second_call(Method, F3()).
  148. export_test(NodeApiPort, Auth) ->
  149. {ok, RespBody} = export_backup(NodeApiPort, Auth),
  150. #{
  151. <<"created_at">> := _,
  152. <<"created_at_sec">> := CreatedSec,
  153. <<"filename">> := _,
  154. <<"node">> := _,
  155. <<"size">> := Size
  156. } = emqx_utils_json:decode(RespBody),
  157. ?assert(is_integer(Size)),
  158. ?assert(is_integer(CreatedSec) andalso CreatedSec > 0).
  159. upload_backup_test(Config, BackupName) ->
  160. Auth = ?config(auth, Config),
  161. UploadFile = ?backup_path(Config, BackupName),
  162. BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP),
  163. BadUploadFile = ?backup_path(Config, ?BAD_UPLOAD_BACKUP),
  164. ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)),
  165. %% This file was specially forged to pass upload validation bat fail on import
  166. ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)),
  167. ?assertEqual({error, bad_request}, upload_backup(?NODE1_PORT, Auth, BadUploadFile)),
  168. %% Invalid file must not be kept
  169. ?assertMatch(
  170. {error, {_, 404, _}}, backup_file_op(get, ?NODE1_PORT, Auth, ?BAD_UPLOAD_BACKUP, [])
  171. ).
  172. import_backup_test(Config, BackupName) ->
  173. Auth = ?config(auth, Config),
  174. UploadFile = ?backup_path(Config, BackupName),
  175. BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP),
  176. ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)),
  177. %% This file was specially forged to pass upload validation bat fail on import
  178. ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)),
  179. %% Replicant node must be able to import the file by doing rpc to a core node
  180. ?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)),
  181. [N1, N2, N3] = ?config(cluster, Config),
  182. ?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)),
  183. ?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N3)),
  184. %% Now this node must also have the file locally
  185. ?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N1)),
  186. ?assertMatch({error, {_, 400, _}}, import_backup(?NODE2_PORT, Auth, ?BAD_IMPORT_BACKUP, N2)).
  187. assert_second_call(get, Res) ->
  188. ?assertMatch({ok, _}, Res);
  189. assert_second_call(delete, Res) ->
  190. ?assertMatch({error, {_, 404, _}}, Res).
  191. export_backup(NodeApiPort, Auth) ->
  192. Path = ["data", "export"],
  193. request(post, NodeApiPort, Path, Auth).
  194. import_backup(NodeApiPort, Auth, BackupName) ->
  195. import_backup(NodeApiPort, Auth, BackupName, undefined).
  196. import_backup(NodeApiPort, Auth, BackupName, Node) ->
  197. Path = ["data", "import"],
  198. Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)},
  199. Body1 =
  200. case Node of
  201. undefined -> Body;
  202. _ -> Body#{<<"node">> => Node}
  203. end,
  204. request(post, NodeApiPort, Path, Body1, Auth).
  205. list_backups(NodeApiPort, Auth, Page, Limit) ->
  206. Path = ["data", "files"],
  207. request(get, NodeApiPort, Path, [{<<"page">>, Page}, {<<"limit">>, Limit}], [], Auth).
  208. backup_file_op(Method, NodeApiPort, Auth, BackupName, QueryList) ->
  209. Path = ["data", "files", BackupName],
  210. request(Method, NodeApiPort, Path, QueryList, [], Auth).
  211. upload_backup(NodeApiPort, Auth, BackupFilePath) ->
  212. Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodeApiPort), ["data", "files"]),
  213. Res = emqx_mgmt_api_test_util:upload_request(
  214. Path,
  215. BackupFilePath,
  216. "filename",
  217. <<"application/octet-stream">>,
  218. [],
  219. Auth
  220. ),
  221. case Res of
  222. {ok, {{"HTTP/1.1", 204, _}, _Headers, _}} ->
  223. ok;
  224. {ok, {{"HTTP/1.1", 400, _}, _Headers, _} = Resp} ->
  225. ct:pal("Backup upload failed: ~p", [Resp]),
  226. {error, bad_request};
  227. Err ->
  228. Err
  229. end.
  230. request(Method, NodePort, PathParts, Auth) ->
  231. request(Method, NodePort, PathParts, [], [], Auth).
  232. request(Method, NodePort, PathParts, Body, Auth) ->
  233. request(Method, NodePort, PathParts, [], Body, Auth).
  234. request(Method, NodePort, PathParts, QueryList, Body, Auth) ->
  235. Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodePort), PathParts),
  236. Query = unicode:characters_to_list(uri_string:compose_query(QueryList)),
  237. emqx_mgmt_api_test_util:request_api(Method, Path, Query, Auth, Body).
  238. cluster(TC, Config) ->
  239. Nodes = emqx_cth_cluster:start(
  240. [
  241. {api_data_backup_core1, #{role => core, apps => apps_spec(18085, TC)}},
  242. {api_data_backup_core2, #{role => core, apps => apps_spec(18086, TC)}},
  243. {api_data_backup_replicant, #{role => replicant, apps => apps_spec(18087, TC)}}
  244. ],
  245. #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
  246. ),
  247. Nodes.
  248. auth_header(Node) ->
  249. {ok, API} = erpc:call(Node, emqx_common_test_http, create_default_app, []),
  250. emqx_common_test_http:auth_header(API).
  251. wait_for_auth_replication(ReplNode) ->
  252. wait_for_auth_replication(ReplNode, 100).
  253. wait_for_auth_replication(ReplNode, 0) ->
  254. {error, {ReplNode, auth_not_ready}};
  255. wait_for_auth_replication(ReplNode, Retries) ->
  256. try
  257. {_Header, _Val} = erpc:call(ReplNode, emqx_common_test_http, default_auth_header, []),
  258. ok
  259. catch
  260. _:_ ->
  261. timer:sleep(1),
  262. wait_for_auth_replication(ReplNode, Retries - 1)
  263. end.
  264. apps_spec(APIPort, TC) ->
  265. common_apps_spec() ++
  266. app_spec_dashboard(APIPort) ++
  267. upload_import_apps_spec(TC).
  268. common_apps_spec() ->
  269. [
  270. emqx,
  271. emqx_conf,
  272. emqx_management
  273. ].
  274. app_spec_dashboard(APIPort) ->
  275. [
  276. {emqx_dashboard, #{
  277. config =>
  278. #{
  279. dashboard =>
  280. #{
  281. listeners =>
  282. #{
  283. http =>
  284. #{bind => APIPort}
  285. },
  286. default_username => "",
  287. default_password => ""
  288. }
  289. }
  290. }}
  291. ].
  292. upload_import_apps_spec(TC) when
  293. TC =:= t_upload_ee_backup;
  294. TC =:= t_import_ee_backup;
  295. TC =:= t_upload_ce_backup;
  296. TC =:= t_import_ce_backup
  297. ->
  298. [
  299. emqx_auth,
  300. emqx_auth_http,
  301. emqx_auth_jwt,
  302. emqx_auth_mnesia,
  303. emqx_rule_engine,
  304. emqx_modules,
  305. emqx_bridge
  306. ];
  307. upload_import_apps_spec(_TC) ->
  308. [].