emqx_ft_assembler_SUITE.erl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_assembler_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("stdlib/include/assert.hrl").
  21. -include_lib("kernel/include/file.hrl").
  %% Common Test callback: returns the testcases of this suite in the order
  %% they are listed here.
  all() ->
      AssemblyCases = [
          t_assemble_empty_transfer,
          t_assemble_complete_local_transfer,
          t_assemble_incomplete_transfer,
          t_assemble_no_meta
      ],
      % NOTE
      % `t_list_transfers` depends on the side effects of all the testcases
      % above, which is why it comes last.
      AssemblyCases ++ [t_list_transfers].
  %% Suite setup: starts `gproc` together with its dependencies and records the
  %% list of started applications in the config under `suite_apps`.
  init_per_suite(Config) ->
      {ok, StartedApps} = application:ensure_all_started(gproc),
      SuiteApps = {suite_apps, StartedApps},
      [SuiteApps | Config].
  %% Suite teardown: hands the applications recorded under `suite_apps` to
  %% emqx_cth_suite:stop_apps/1.
  end_per_suite(Config) ->
      SuiteApps = ?config(suite_apps, Config),
      emqx_cth_suite:stop_apps(SuiteApps).
  %% Per-testcase setup: starts a snabbkaffe trace and the assembler supervisor,
  %% then prepends the per-case settings to the config:
  %%   storage_root  - root directory used for stored segments,
  %%   exports_root  - root directory used for exported files,
  %%   file_id       - the testcase name as a binary,
  %%   assembler_sup - pid of the started assembler supervisor.
  init_per_testcase(TC, Config) ->
      ok = snabbkaffe:start_trace(),
      {ok, SupPid} = emqx_ft_assembler_sup:start_link(),
      PerCase = [
          {storage_root, <<"file_transfer_root">>},
          {exports_root, <<"file_transfer_exports">>},
          {file_id, atom_to_binary(TC)},
          {assembler_sup, SupPid}
      ],
      PerCase ++ Config.
  %% Per-testcase teardown: logs the contents of the storage root, stops the
  %% assembler supervisor started in init_per_testcase/2 and stops snabbkaffe.
  end_per_testcase(_TC, Config) ->
      ok = inspect_storage_root(Config),
      AssemblerSup = ?config(assembler_sup, Config),
      ok = gen:stop(AssemblerSup),
      ok = snabbkaffe:stop().
  52. %%
  %% Client IDs used as the first element of the `{ClientId, FileId}` transfer
  %% tuples built by the testcases in this suite.
  53. -define(CLIENTID1, <<"thatsme">>).
  54. -define(CLIENTID2, <<"thatsnotme">>).
  %% A transfer consisting of nothing but a filemeta declaring size 0 is
  %% expected to assemble with `{shutdown, ok}` and to show up as a single
  %% export of size 0.
  t_assemble_empty_transfer(Config) ->
      Storage = storage(Config),
      FileId = ?config(file_id, Config),
      Transfer = {?CLIENTID1, FileId},
      Meta = #{name => "important.pdf", size => 0, expire_at => 42},
      ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
      %% The filemeta just stored must be the only fragment listed.
      Listing = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
      ?assertMatch(
          {ok, [
              #{path := _, timestamp := {{_, _, _}, {_, _, _}}, fragment := {filemeta, Meta}}
          ]},
          Listing
      ),
      ExitReason = complete_assemble(Storage, Transfer, 0),
      ?assertEqual({shutdown, ok}, ExitReason),
      {ok, [_Result = #{size := _Size = 0}]} = list_exports(Config, Transfer),
      % ?assertEqual(
      % {error, eof},
      % emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
      % ),
      ok.
  %% Stores a filemeta and every segment of a randomly sized transfer, assembles
  %% it, and verifies the resulting export: listed size, on-disk file info and
  %% content consistency against the generator.
  t_assemble_complete_local_transfer(Config) ->
      Storage = storage(Config),
      Transfer = {?CLIENTID2, ?config(file_id, Config)},
      TransferSize = 10000 + rand:uniform(50000),
      SegmentSize = 4096,
      Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
      Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
      Meta = #{
          name => "topsecret.pdf",
          checksum => {sha256, Hash},
          expire_at => 42
      },
      ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
      %% Segment numbers are 1-based here, offsets are 0-based.
      StoreSegment = fun({Content, SegmentNum, _Meta}) ->
          Offset = (SegmentNum - 1) * SegmentSize,
          ?assertEqual(
              ok,
              emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
          )
      end,
      _ = emqx_ft_content_gen:consume(Gen, StoreSegment),
      {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
      %% NOTE(review): the expected count looks like "full segments + one
      %% trailing partial segment + the filemeta", which presumably assumes that
      %% TransferSize is not an exact multiple of SegmentSize -- confirm.
      ExpectedFragments = (TransferSize div SegmentSize) + 1 + 1,
      ?assertEqual(ExpectedFragments, length(Fragments)),
      %% Exactly one filemeta fragment, equal to what was stored.
      FileMetas = [FM || #{fragment := {filemeta, FM}} <- Fragments],
      ?assertEqual([Meta], FileMetas, Fragments),
      ExitReason = complete_assemble(Storage, Transfer, TransferSize),
      ?assertEqual({shutdown, ok}, ExitReason),
      ?assertMatch(
          {ok, [#{size := TransferSize, meta := #{}}]},
          list_exports(Config, Transfer)
      ),
      {ok, [#{path := ExportPath}]} = list_exports(Config, Transfer),
      ?assertMatch(
          {ok, #file_info{type = regular, size = TransferSize}},
          file:read_file_info(ExportPath)
      ),
      ok = emqx_ft_content_gen:check_file_consistency(
          {Transfer, TransferSize},
          100,
          ExportPath
      ).
  %% Stores only a filemeta (declaring a non-zero size) and no segments at all;
  %% the assembler is expected to terminate with `{shutdown, {error, _}}`.
  t_assemble_incomplete_transfer(Config) ->
      Storage = storage(Config),
      Transfer = {?CLIENTID2, ?config(file_id, Config)},
      TransferSize = 10000 + rand:uniform(50000),
      SegmentSize = 4096,
      Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
      Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
      Meta = #{
          name => "incomplete.pdf",
          checksum => {sha256, Hash},
          size => TransferSize,
          expire_at => 42
      },
      ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
      ExitReason = complete_assemble(Storage, Transfer, TransferSize),
      ?assertMatch({shutdown, {error, _}}, ExitReason).
  %% Assembling a transfer for which nothing was stored is expected to
  %% terminate with `{shutdown, {error, {incomplete, _}}}`.
  t_assemble_no_meta(Config) ->
      FileId = ?config(file_id, Config),
      Transfer = {?CLIENTID2, FileId},
      ExitReason = complete_assemble(storage(Config), Transfer, 42),
      ?assertMatch({shutdown, {error, {incomplete, _}}}, ExitReason).
  %% Same as complete_assemble/4, waiting at most 1000 ms for the assembler
  %% process to terminate.
  complete_assemble(Storage, Transfer, Size) ->
      DefaultTimeout = 1000,
      complete_assemble(Storage, Transfer, Size, DefaultTimeout).
  %% Starts an assembler for `Transfer`, sends it `kickoff` and waits for the
  %% process to go down. Returns the exit reason reported by the monitor, or
  %% fails the testcase if no 'DOWN' message arrives within `Timeout` ms.
  complete_assemble(Storage, Transfer, Size, Timeout) ->
      {async, Assembler} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
      %% Monitor before kicking off so the termination cannot be missed.
      MonitorRef = erlang:monitor(process, Assembler),
      Assembler ! kickoff,
      receive
          {'DOWN', MonitorRef, process, Assembler, ExitReason} -> ExitReason
      after Timeout ->
          ct:fail("Assembler did not finish in time")
      end.
  169. %%
  %% Lists all local exports and expects exactly the two produced by
  %% t_assemble_complete_local_transfer and t_assemble_empty_transfer.
  %% NOTE(review): the expected order relies on how lists:sort/1 orders the
  %% export maps -- confirm if the export map shape ever changes.
  t_list_transfers(Config) ->
      {ok, Exports} = list_exports(Config),
      SortedExports = lists:sort(Exports),
      ?assertMatch(
          [
              #{
                  transfer := {?CLIENTID2, <<"t_assemble_complete_local_transfer">>},
                  path := _,
                  size := Size,
                  meta := #{name := "topsecret.pdf"}
              },
              #{
                  transfer := {?CLIENTID1, <<"t_assemble_empty_transfer">>},
                  path := _,
                  size := 0,
                  meta := #{name := "important.pdf"}
              }
          ] when Size > 0,
          SortedExports
      ).
  189. %%
  190. -include_lib("kernel/include/file.hrl").
  %% Logs the files found under the `storage_root` directory of the config.
  inspect_storage_root(Config) ->
      StorageRoot = ?config(storage_root, Config),
      inspect_dir(StorageRoot).
  %% Recursively walks `Dir`, collects `inspect_file/1` results for every file
  %% into an orddict keyed by filename, and prints it with ct:pal/2.
  inspect_dir(Dir) ->
      Collect = fun(Filename, Acc) ->
          orddict:store(Filename, inspect_file(Filename), Acc)
      end,
      %% ".*" matches every filename; `true` enables recursion into subdirs.
      FileInfos = filelib:fold_files(Dir, ".*", true, Collect, orddict:new()),
      ct:pal("inspect '~s': ~p", [Dir, FileInfos]).
  %% Returns `{Type, Size, MTime}` taken from the file info of `Filename`.
  %% Crashes with a badmatch if the file info cannot be read.
  inspect_file(Filename) ->
      {ok, #file_info{type = Type, size = Size, mtime = MTime}} =
          file:read_file_info(Filename),
      {Type, Size, MTime}.
  %% Builds a file id from the current system time in milliseconds, rendered
  %% as a decimal binary.
  mk_fileid() ->
      NowMs = erlang:system_time(millisecond),
      integer_to_binary(NowMs).
  %% Lists all local exports using the filesystem exporter options derived
  %% from the testcase config.
  list_exports(Config) ->
      {emqx_ft_storage_exporter_fs, ExporterOpts} = exporter(Config),
      emqx_ft_storage_exporter_fs:list_local(ExporterOpts).
  %% Lists the local exports of a single `Transfer` using the filesystem
  %% exporter options derived from the testcase config.
  list_exports(Config, Transfer) ->
      {emqx_ft_storage_exporter_fs, ExporterOpts} = exporter(Config),
      emqx_ft_storage_exporter_fs:list_local_transfer(ExporterOpts, Transfer).
  %% Resolves the exporter for the storage built from the testcase config.
  exporter(Config) ->
      Storage = storage(Config),
      emqx_ft_storage_exporter:exporter(Storage).
  %% Builds the local storage configuration for a testcase: a raw (binary-keyed)
  %% config pointing segments at `storage_root` and the enabled local exporter
  %% at `exports_root` is run through emqx_ft_schema:translate/1, and the
  %% `[storage, local]` part of the result is returned.
  storage(Config) ->
      Segments = #{<<"root">> => ?config(storage_root, Config)},
      LocalExporter = #{
          <<"enable">> => true,
          <<"root">> => ?config(exports_root, Config)
      },
      RawConfig = #{
          <<"storage">> => #{
              <<"local">> => #{
                  <<"segments">> => Segments,
                  <<"exporter">> => #{<<"local">> => LocalExporter}
              }
          }
      },
      Translated = emqx_ft_schema:translate(RawConfig),
      emqx_utils_maps:deep_get([storage, local], Translated).