- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- %% @doc This module provides lazy, composable producer streams that can be
- %% considered counterparts to Archiver's consumer pipes and can therefore
- %% facilitate testing.
- %%
- %% It also comes with an implementation of a binary data stream that can
- %% produce sufficiently large amounts of plausibly pseudorandom binary
- %% payload in a deterministic way, as well as routines for checking binary
- %% blobs via sampling.
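- %%
- %% A minimal usage sketch (the seed `my_seed' and the sizes below are purely
- %% illustrative): generate a 1 MB payload as 1024-byte chunks and collect
- %% them eagerly:
- %% ```
- %%    Stream = payload_gen:generate_chunk({my_seed, payload_gen:mb(1)}, 1024),
- %%    Chunks = payload_gen:consume(Stream)
- %% '''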
- -module(payload_gen).
- -define(end_of_stream, []).
- -dialyzer(no_improper_lists).
- %% Generic stream API:
- -export([
- interleave_streams/1,
- retransmits/2,
- next/1,
- consume/2,
- consume/1
- ]).
- %% Binary payload generator API:
- -export([
- interleave_chunks/2,
- interleave_chunks/1,
- mb/1,
- generator_fun/2,
- generate_chunks/3,
- generate_chunk/2,
- check_consistency/3,
- check_file_consistency/3,
- get_byte/2
- ]).
- %% List to stream generator API:
- -export([list_to_stream/1]).
- %% Proper generators:
- -export([
- binary_stream_gen/1,
- interleaved_streams_gen/1,
- interleaved_binary_gen/1,
- interleaved_list_gen/1
- ]).
- -export_type([payload/0, binary_payload/0]).
- -define(hash_size, 16).
- -include_lib("proper/include/proper.hrl").
- -include_lib("eunit/include/eunit.hrl").
- -type payload() :: {Seed :: term(), Size :: integer()}.
- -type binary_payload() :: {
- binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
- }.
- %% For performance reasons we treat regular lists as streams, see `next/1'
- -type cont(Data) ::
- fun(() -> stream(Data))
- | stream(Data).
- -type stream(Data) ::
- maybe_improper_list(Data, cont(Data))
- | ?end_of_stream.
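- %% For illustration, a finite stream of three integers can be represented
- %% either eagerly as `[1, 2, 3]' or lazily as
- %% `[1 | fun() -> [2 | fun() -> [3] end] end]'.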
- -type tagged_binstream() :: stream({Tag :: term(), Payload :: chunk_state()}).
- -record(chunk_state, {
- seed :: term(),
- payload_size :: non_neg_integer(),
- offset :: non_neg_integer(),
- chunk_size :: non_neg_integer()
- }).
- -type chunk_state() :: #chunk_state{}.
- -record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).
- -type interleave_state() :: #interleave_state{}.
- %% =============================================================================
- %% API functions
- %% =============================================================================
- %% -----------------------------------------------------------------------------
- %% Proper generators
- %% -----------------------------------------------------------------------------
- %% @doc Proper generator that creates a binary stream
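- %% For illustration, a property could consume the generated stream directly
- %% (a hypothetical property; 256 is an arbitrary chunk size that is a
- %% multiple of `?hash_size'):
- %% ```
- %%    ?FORALL(Stream, binary_stream_gen(256), length(consume(Stream)) > 0)
- %% '''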
- -spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
- binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
- ?LET(
- {Seed, Size},
- {nat(), range(1, 16#100000)},
- generate_chunk({Seed, Size}, ChunkSize)
- ).
- %% @equiv interleaved_streams_gen(10, Type)
- -spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
- interleaved_streams_gen(Type) ->
- interleaved_streams_gen(10, Type).
- %% @doc Proper generator that creates a term of type
- %% ```[{_Tag :: binary(), stream()}]''' that is ready to be fed
- %% into the `interleave_streams/1' function.
- -spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
- proper_types:type().
- interleaved_streams_gen(MaxNStreams, StreamType) ->
- ?LET(
- NStreams,
- range(1, MaxNStreams),
- ?LET(
- Streams,
- vector(NStreams, StreamType),
- begin
- Tags = [<<I/integer>> || I <- lists:seq(1, length(Streams))],
- lists:zip(Tags, Streams)
- end
- )
- ).
- -spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
- interleaved_binary_gen(ChunkSize) ->
- interleaved_streams_gen(binary_stream_gen(ChunkSize)).
- -spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
- interleaved_list_gen(Type) ->
- interleaved_streams_gen(non_empty(list(Type))).
- %% -----------------------------------------------------------------------------
- %% Generic streams
- %% -----------------------------------------------------------------------------
- %% @doc Advance the stream: force the continuation (if any) and return the
- %% next cell of the stream.
- -spec next(cont(A)) -> stream(A).
- next(Fun) when is_function(Fun, 0) ->
- Fun();
- next(L) ->
- L.
- %% @doc Take a list of tagged streams and return a stream where
- %% elements of the streams are tagged and randomly interleaved.
- %%
- %% Note: this function is more or less generic: it is compatible with this
- %% module's `generate_chunks' function family, as well as `ets:next', plain
- %% lists, and so on.
- %%
- %% Consider using the simplified wrappers `interleave_chunks/1' and
- %% `interleave_chunks/2' instead; see also the example below.
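- %% For illustration (the interleaving order is random, so the result below
- %% is only one possible outcome):
- %% ```
- %%    Streams = [{a, [1, 2]}, {b, [3]}],
- %%    consume(interleave_streams(Streams))
- %%    %% => e.g. [{a, 1}, {b, 3}, {a, 2}]
- %% '''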
- -spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
- interleave_streams(Streams) ->
- do_interleave_streams(
- #interleave_state{streams = Streams}
- ).
- %% @doc Take an arbitrary stream and randomly repeat some of its elements,
- %% simulating retransmissions.
- %% TODO: Make retransmissions of arbitrary length
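- %%
- %% For illustration, with probability 1.0 every element is duplicated and
- %% with 0.0 none are:
- %% ```
- %%    [a, a, b, b] = consume(retransmits([a, b], 1.0)),
- %%    [a, b] = consume(retransmits([a, b], 0.0))
- %% '''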
- -spec retransmits(stream(Data), float()) -> stream(Data).
- retransmits(Stream, Probability) ->
- case Stream of
- [Data | Cont0] ->
- Cont = fun() -> retransmits(next(Cont0), Probability) end,
- case rand:uniform() < Probability of
- true -> [Data, Data | Cont];
- false -> [Data | Cont]
- end;
- ?end_of_stream ->
- ?end_of_stream
- end.
- %% @doc Consume all elements of the stream and feed them into a
- %% callback (e.g. brod:produce)
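- %%
- %% For illustration (with a hypothetical callback):
- %% ```
- %%    [10, 20, 30] = consume([1, 2 | fun() -> [3] end], fun(X) -> X * 10 end)
- %% '''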
- -spec consume(
- stream(A),
- fun((A) -> Ret)
- ) -> [Ret].
- consume(Stream, Callback) ->
- case Stream of
- [Data | Cont] -> [Callback(Data) | consume(next(Cont), Callback)];
- ?end_of_stream -> []
- end.
- %% @equiv consume(Stream, fun(A) -> A end)
- -spec consume(stream(A)) -> [A].
- consume(Stream) ->
- consume(Stream, fun(A) -> A end).
- %% -----------------------------------------------------------------------------
- %% Misc functions
- %% -----------------------------------------------------------------------------
- %% @doc Return number of bytes in `N' megabytes
- -spec mb(integer()) -> integer().
- mb(N) ->
- N * 1048576.
- %% -----------------------------------------------------------------------------
- %% List streams
- %% -----------------------------------------------------------------------------
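- %% @doc Lists are already valid streams (see `next/1'), so this is simply
- %% the identity function.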
- -spec list_to_stream([A]) -> stream(A).
- list_to_stream(L) -> L.
- %% -----------------------------------------------------------------------------
- %% Binary streams
- %% -----------------------------------------------------------------------------
- %% @doc The first argument is a chunk number, the second one is a seed.
- %% This implementation is hardly efficient, but it was chosen for the sake
- %% of clarity.
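- %%
- %% For illustration, every call returns one `?hash_size'-byte MD5 block
- %% (the seed below is arbitrary):
- %% ```
- %%    16 = byte_size(generator_fun(0, <<"any seed">>))
- %% '''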
- -spec generator_fun(integer(), binary()) -> binary().
- generator_fun(N, Seed) ->
- crypto:hash(md5, <<N:32, Seed/binary>>).
- %% @doc Get byte at offset `N'
- -spec get_byte(integer(), term()) -> byte().
- get_byte(N, Seed) ->
- do_get_byte(N, seed_hash(Seed)).
- %% @doc Stream of binary chunks. Limitation: both the payload size and
- %% `ChunkSize' should be divisible by `?hash_size'.
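- %%
- %% For illustration (hypothetical seed; a 1024-byte payload split into two
- %% 512-byte chunks):
- %% ```
- %%    Stream = generate_chunk({my_seed, 1024}, 512),
- %%    [{_, 1, 2}, {_, 2, 2}] = consume(Stream)
- %% '''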
- -spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
- generate_chunk({Seed, Size}, ChunkSize) when
- ChunkSize rem ?hash_size =:= 0
- ->
- State = #chunk_state{
- seed = Seed,
- payload_size = Size,
- chunk_size = ChunkSize,
- offset = 0
- },
- generate_chunk(State).
- %% @doc Take a list of `{payload(), ChunkSize}' tuples and produce the
- %% payloads as a randomly interleaved stream of chunks. The seed is used
- %% as a tag.
- %% @see interleave_streams/1
- -spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
- tagged_binstream().
- interleave_chunks(Streams0) ->
- Streams = [
- {Tag, generate_chunk(Payload, ChunkSize)}
- || {Payload = {Tag, _}, ChunkSize} <- Streams0
- ],
- interleave_streams(Streams).
- %% @doc Take a list of `payload()'s and produce the payloads as a randomly
- %% interleaved stream of chunks. The seed is used as a tag. All streams
- %% use the same chunk size.
- %% @see interleave_streams/1
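- %%
- %% For illustration (hypothetical seeds; each 1024-byte payload yields two
- %% 512-byte chunks, so the interleaved stream carries four tagged chunks):
- %% ```
- %%    Stream = interleave_chunks([{seed_a, 1024}, {seed_b, 1024}], 512),
- %%    4 = length(consume(Stream))
- %% '''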
- -spec interleave_chunks(
- [payload()],
- non_neg_integer()
- ) -> tagged_binstream().
- interleave_chunks(Streams0, ChunkSize) ->
- Streams = [
- {Seed, generate_chunk({Seed, Size}, ChunkSize)}
- || {Seed, Size} <- Streams0
- ],
- interleave_streams(Streams).
- %% @doc Generate chunks of data and feed them into
- %% `Callback'
- -spec generate_chunks(
- payload(),
- integer(),
- fun((binary()) -> A)
- ) -> [A].
- generate_chunks(Payload, ChunkSize, Callback) ->
- consume(generate_chunk(Payload, ChunkSize), Callback).
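- %% @doc Verify the contents of a payload by sampling: check `SampleSize'
- %% random byte offsets plus the first byte, the last byte, and one offset
- %% just past the end. `Callback' is expected to return `{ok, Byte}' for
- %% offsets inside the payload and `undefined' for the out-of-range one.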
- -spec check_consistency(
- payload(),
- integer(),
- fun((integer()) -> {ok, binary()} | undefined)
- ) -> ok.
- check_consistency({Seed, Size}, SampleSize, Callback) ->
- SeedHash = seed_hash(Seed),
- Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
- %% Always check first and last bytes, and one that should not exist:
- Samples = [0, Size - 1, Size | Random],
- lists:foreach(
- fun
- (N) when N < Size ->
- Expected = do_get_byte(N, SeedHash),
- ?assertEqual(
- {N, {ok, Expected}},
- {N, Callback(N)}
- );
- (N) ->
- ?assertMatch(undefined, Callback(N))
- end,
- Samples
- ).
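- %% @doc Like `check_consistency/3', but reads the sampled bytes from the
- %% file `FileName' using `file:pread'.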
- -spec check_file_consistency(
- payload(),
- integer(),
- file:filename()
- ) -> ok.
- check_file_consistency(Payload, SampleSize, FileName) ->
- {ok, FD} = file:open(FileName, [read, raw]),
- try
- Fun = fun(N) ->
- case file:pread(FD, [{N, 1}]) of
- {ok, [[X]]} -> {ok, X};
- {ok, [eof]} -> undefined
- end
- end,
- check_consistency(Payload, SampleSize, Fun)
- after
- file:close(FD)
- end.
- %% =============================================================================
- %% Internal functions
- %% =============================================================================
- -spec do_interleave_streams(interleave_state()) -> stream(_Data).
- do_interleave_streams(#interleave_state{streams = []}) ->
- ?end_of_stream;
- do_interleave_streams(#interleave_state{streams = Streams} = State0) ->
- %% Not the most efficient implementation (lots of avoidable list
- %% traversals), but we don't expect the number of streams to be the
- %% bottleneck
- N = rand:uniform(length(Streams)),
- {Hd, [{Tag, SC} | Tl]} = lists:split(N - 1, Streams),
- case SC of
- [Payload | SC1] ->
- State = State0#interleave_state{streams = Hd ++ [{Tag, next(SC1)} | Tl]},
- Cont = fun() -> do_interleave_streams(State) end,
- [{Tag, Payload} | Cont];
- ?end_of_stream ->
- State = State0#interleave_state{streams = Hd ++ Tl},
- do_interleave_streams(State)
- end.
- %% @doc Continue generating chunks
- -spec generate_chunk(chunk_state()) -> stream(binary_payload()).
- generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
- Offset >= Size
- ->
- ?end_of_stream;
- generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
- State = State0#chunk_state{offset = Offset + ChunkSize},
- Payload = generate_chunk(
- State#chunk_state.seed,
- Offset,
- ChunkSize,
- State#chunk_state.payload_size
- ),
- [Payload | fun() -> generate_chunk(State) end].
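- %% Generate a single chunk: concatenate the `?hash_size'-byte blocks that
- %% cover offsets `Offset' up to `min(Offset + ChunkSize, Size)', and return
- %% it together with its 1-based chunk number and the total chunk count.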
- generate_chunk(Seed, Offset, ChunkSize, Size) ->
- SeedHash = seed_hash(Seed),
- To = min(Offset + ChunkSize, Size) - 1,
- Payload = iolist_to_binary([
- generator_fun(I, SeedHash)
- || I <- lists:seq(Offset div ?hash_size, To div ?hash_size)
- ]),
- ChunkNum = Offset div ChunkSize + 1,
- ChunkCnt = ceil(Size / ChunkSize),
- {Payload, ChunkNum, ChunkCnt}.
- %% @doc Hash any term
- -spec seed_hash(term()) -> binary().
- seed_hash(Seed) ->
- crypto:hash(md5, term_to_binary(Seed)).
- %% @private Get byte at offset `N'
- -spec do_get_byte(integer(), binary()) -> byte().
- do_get_byte(N, Seed) ->
- Chunk = generator_fun(N div ?hash_size, Seed),
- binary:at(Chunk, N rem ?hash_size).