| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% Common Test suite exercising file-transfer assembly through
%% `emqx_ft_storage_fs' and the local filesystem exporter.
-module(emqx_ft_assembler_SUITE).

%% Export every function (suite callbacks, testcases and helpers alike)
%% and silence the warning that `export_all' would otherwise produce.
-compile([export_all, nowarn_export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("kernel/include/file.hrl").

%% Returns the testcases of this suite, in the order they are listed.
all() ->
    AssembleCases = [
        t_assemble_empty_transfer,
        t_assemble_complete_local_transfer,
        t_assemble_incomplete_transfer,
        t_assemble_no_meta
    ],
    % NOTE
    % `t_list_transfers' has to stay last: it depends on the side effects of
    % all the testcases listed before it.
    AssembleCases ++ [t_list_transfers].

%% Starts `gproc' together with its dependencies and stores the list of
%% started applications in the config under `suite_apps'.
init_per_suite(Config) ->
    {ok, StartedApps} = application:ensure_all_started(gproc),
    SuiteApps = {suite_apps, StartedApps},
    [SuiteApps | Config].

%% Stops the applications stored in the config under `suite_apps'.
end_per_suite(Config) ->
    SuiteApps = ?config(suite_apps, Config),
    emqx_cth_suite:stop_apps(SuiteApps).

%% Starts a snabbkaffe trace and the assembler supervisor, then extends the
%% config with the storage/export roots, a file id derived from the testcase
%% name, and the supervisor pid.
init_per_testcase(TC, Config) ->
    ok = snabbkaffe:start_trace(),
    {ok, SupPid} = emqx_ft_assembler_sup:start_link(),
    TestcaseConfig = [
        {storage_root, <<"file_transfer_root">>},
        {exports_root, <<"file_transfer_exports">>},
        % The testcase name doubles as the file id of its transfer
        {file_id, atom_to_binary(TC)},
        {assembler_sup, SupPid}
    ],
    TestcaseConfig ++ Config.

%% Logs the contents of the storage root, then stops the assembler
%% supervisor started by `init_per_testcase/2' and snabbkaffe.
end_per_testcase(_TC, Config) ->
    ok = inspect_storage_root(Config),
    SupPid = ?config(assembler_sup, Config),
    ok = gen:stop(SupPid),
    ok = snabbkaffe:stop().

%%

%% Client ids used as the first element of the transfer tuples in the testcases.
-define(CLIENTID1, <<"thatsme">>).
-define(CLIENTID2, <<"thatsnotme">>).

%% Stores only the filemeta of a zero-sized transfer, checks that it is listed
%% as the single fragment, assembles the transfer and expects exactly one
%% export of size 0.
t_assemble_empty_transfer(Config) ->
    Storage = storage(Config),
    FileId = ?config(file_id, Config),
    Transfer = {?CLIENTID1, FileId},
    Meta = #{
        name => "important.pdf",
        size => 0,
        expire_at => 42
    },
    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
    ListedFragments = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
    ?assertMatch(
        {ok, [
            #{
                path := _,
                timestamp := {{_, _, _}, {_, _, _}},
                fragment := {filemeta, Meta}
            }
        ]},
        ListedFragments
    ),
    ?assertEqual({shutdown, ok}, complete_assemble(Storage, Transfer, 0)),
    {ok, [#{size := 0}]} = list_exports(Config, Transfer),
    % TODO
    % A check that reading the empty export back through
    % `emqx_ft_storage_fs:pread/5' yields `{error, eof}' is currently disabled.
    ok.

%% Stores the filemeta and every generated segment of a randomly sized
%% transfer, assembles it, and then verifies the export: its listing, the size
%% of the file on disk and the consistency of its content.
t_assemble_complete_local_transfer(Config) ->
    Storage = storage(Config),
    Transfer = {?CLIENTID2, ?config(file_id, Config)},
    SegmentSize = 4096,
    TransferSize = 10000 + rand:uniform(50000),
    Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
    Checksum = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
    Meta = #{
        name => "topsecret.pdf",
        checksum => {sha256, Checksum},
        expire_at => 42
    },
    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
    StoreSegment = fun({Content, SegmentNum, _SegmentMeta}) ->
        % The offset is computed as if segment numbers start at 1
        Offset = (SegmentNum - 1) * SegmentSize,
        ?assertEqual(
            ok,
            emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
        )
    end,
    _ = emqx_ft_content_gen:consume(Gen, StoreSegment),
    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
    % Expected fragments: all the segments plus one filemeta.
    % NOTE(review): `div + 1' equals the number of segments only while
    % TransferSize is not an exact multiple of SegmentSize -- confirm how
    % `emqx_ft_content_gen' splits such sizes.
    ExpectedFragments = (TransferSize div SegmentSize) + 1 + 1,
    ?assertEqual(ExpectedFragments, length(Fragments)),
    Filemetas = [FM || #{fragment := {filemeta, FM}} <- Fragments],
    ?assertEqual([Meta], Filemetas, Fragments),
    ?assertEqual({shutdown, ok}, complete_assemble(Storage, Transfer, TransferSize)),
    ?assertMatch(
        {ok, [#{size := TransferSize, meta := #{}}]},
        list_exports(Config, Transfer)
    ),
    {ok, [#{path := AssemblyFilename}]} = list_exports(Config, Transfer),
    ?assertMatch(
        {ok, #file_info{type = regular, size = TransferSize}},
        file:read_file_info(AssemblyFilename)
    ),
    ok = emqx_ft_content_gen:check_file_consistency(
        {Transfer, TransferSize},
        100,
        AssemblyFilename
    ).

%% Stores only the filemeta of a non-empty transfer (no segments at all) and
%% expects the assembler to shut down with an error.
t_assemble_incomplete_transfer(Config) ->
    Storage = storage(Config),
    Transfer = {?CLIENTID2, ?config(file_id, Config)},
    TransferSize = 10000 + rand:uniform(50000),
    Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, 4096),
    Checksum = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
    Meta = #{
        name => "incomplete.pdf",
        checksum => {sha256, Checksum},
        size => TransferSize,
        expire_at => 42
    },
    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
    ?assertMatch(
        {shutdown, {error, _}},
        complete_assemble(Storage, Transfer, TransferSize)
    ).

%% Assembles a transfer for which nothing was stored and expects the assembler
%% to shut down with an `incomplete' error.
t_assemble_no_meta(Config) ->
    Storage = storage(Config),
    FileId = ?config(file_id, Config),
    Transfer = {?CLIENTID2, FileId},
    ?assertMatch(
        {shutdown, {error, {incomplete, _}}},
        complete_assemble(Storage, Transfer, 42)
    ).

%% Same as `complete_assemble/4', waiting at most 1000 ms for the assembler.
complete_assemble(Storage, Transfer, Size) ->
    DefaultTimeout = 1000,
    complete_assemble(Storage, Transfer, Size, DefaultTimeout).

%% Starts assembling `Transfer', monitors the returned process, sends it
%% `kickoff' and returns the reason it terminated with. Fails the testcase if
%% no 'DOWN' message arrives within `Timeout' milliseconds.
complete_assemble(Storage, Transfer, Size, Timeout) ->
    {async, Assembler} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
    Monitor = erlang:monitor(process, Assembler),
    _ = erlang:send(Assembler, kickoff),
    receive
        {'DOWN', Monitor, process, Assembler, ExitReason} ->
            ExitReason
    after Timeout ->
        ct:fail("Assembler did not finish in time")
    end.

%%

%% Lists all local exports and expects, after sorting, exactly the exports
%% of `t_assemble_complete_local_transfer' and `t_assemble_empty_transfer',
%% in that order.
t_list_transfers(Config) ->
    {ok, Exports} = list_exports(Config),
    % The expected order below is the one `lists:sort/1' yields for these maps
    SortedExports = lists:sort(Exports),
    ?assertMatch(
        [
            #{
                transfer := {?CLIENTID2, <<"t_assemble_complete_local_transfer">>},
                path := _,
                size := Size,
                meta := #{name := "topsecret.pdf"}
            },
            #{
                transfer := {?CLIENTID1, <<"t_assemble_empty_transfer">>},
                path := _,
                size := 0,
                meta := #{name := "important.pdf"}
            }
        ] when Size > 0,
        SortedExports
    ).

%%

-include_lib("kernel/include/file.hrl").

%% Logs the files found under the `storage_root' directory of the config.
inspect_storage_root(Config) ->
    StorageRoot = ?config(storage_root, Config),
    inspect_dir(StorageRoot).

%% Recursively walks `Dir', collects the `inspect_file/1' result of every file
%% into an orddict keyed by filename, and prints it with `ct:pal/2'.
inspect_dir(Dir) ->
    Collect = fun(Filename, Acc) ->
        orddict:store(Filename, inspect_file(Filename), Acc)
    end,
    FileInfos = filelib:fold_files(Dir, ".*", true, Collect, orddict:new()),
    ct:pal("inspect '~s': ~p", [Dir, FileInfos]).

%% Returns `{Type, Size, MTime}' taken from `file:read_file_info/1'.
%% Fails with `badmatch' when the file info cannot be read.
inspect_file(Filename) ->
    {ok, #file_info{type = Type, size = Size, mtime = MTime}} =
        file:read_file_info(Filename),
    {Type, Size, MTime}.

%% Returns the current system time in milliseconds, rendered as a decimal binary.
mk_fileid() ->
    NowMs = erlang:system_time(millisecond),
    integer_to_binary(NowMs).

%% Lists local exports using the options of the configured exporter.
%% Fails with `badmatch' unless the exporter is `emqx_ft_storage_exporter_fs'.
list_exports(Config) ->
    {emqx_ft_storage_exporter_fs, ExporterOpts} = exporter(Config),
    emqx_ft_storage_exporter_fs:list_local(ExporterOpts).

%% Lists the local exports of a single `Transfer' using the options of the
%% configured exporter.
%% Fails with `badmatch' unless the exporter is `emqx_ft_storage_exporter_fs'.
list_exports(Config, Transfer) ->
    {emqx_ft_storage_exporter_fs, ExporterOpts} = exporter(Config),
    emqx_ft_storage_exporter_fs:list_local_transfer(ExporterOpts, Transfer).

%% Returns what `emqx_ft_storage_exporter:exporter/1' yields for the storage
%% built from the testcase config.
exporter(Config) ->
    Storage = storage(Config),
    emqx_ft_storage_exporter:exporter(Storage).

%% Builds a raw storage config from the `storage_root' and `exports_root'
%% config values, passes it through `emqx_ft_schema:translate/1' and returns
%% the value found under `[storage, local]' in the result.
storage(Config) ->
    Segments = #{
        <<"root">> => ?config(storage_root, Config)
    },
    LocalExporter = #{
        <<"enable">> => true,
        <<"root">> => ?config(exports_root, Config)
    },
    RawConfig = #{
        <<"storage">> => #{
            <<"local">> => #{
                <<"segments">> => Segments,
                <<"exporter">> => #{<<"local">> => LocalExporter}
            }
        }
    },
    Translated = emqx_ft_schema:translate(RawConfig),
    emqx_utils_maps:deep_get([storage, local], Translated).
|