payload_gen.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc This module provides lazy, composable producer streams that
  17. %% can be considered counterparts to Archiver's consumer pipes and
  18. %% therefore can facilitate testing
  19. %%
  20. %% Also it comes with an implementation of binary data stream which is
  21. %% able to produce sufficiently large amounts of plausibly
  22. %% pseudorandom binary payload in a deterministic way. It also
  23. %% contains routines to check binary blobs via sampling
  24. -module(payload_gen).
  25. -define(end_of_stream, []).
  26. -dialyzer(no_improper_lists).
  27. %% Generic stream API:
  28. -export([
  29. interleave_streams/1,
  30. retransmits/2,
  31. next/1,
  32. consume/2,
  33. consume/1
  34. ]).
  35. %% Binary payload generator API:
  36. -export([
  37. interleave_chunks/2,
  38. interleave_chunks/1,
  39. mb/1,
  40. generator_fun/2,
  41. generate_chunks/3,
  42. generate_chunk/2,
  43. check_consistency/3,
  44. check_file_consistency/3,
  45. get_byte/2
  46. ]).
  47. %% List to stream generator API:
  48. -export([list_to_stream/1]).
  49. %% Proper generators:
  50. -export([
  51. binary_stream_gen/1,
  52. interleaved_streams_gen/1,
  53. interleaved_binary_gen/1,
  54. interleaved_list_gen/1
  55. ]).
  56. -export_type([payload/0, binary_payload/0]).
  57. -define(hash_size, 16).
  58. -include_lib("proper/include/proper.hrl").
  59. -include_lib("eunit/include/eunit.hrl").
  %% Payload descriptor: the seed the bytes are derived from and the
  %% declared payload size in bytes.
  -type payload() :: {Seed :: term(), Size :: integer()}.

  %% One element of a binary stream: the chunk data, the 1-based number
  %% of the chunk and the total number of chunks in the payload.
  -type binary_payload() :: {
      binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
  }.

  %% For performance reasons we treat regular lists as streams, see `next/1'
  -type cont(Data) ::
      fun(() -> stream(Data))
      | stream(Data).

  %% A stream is either exhausted or a cons cell whose tail is a
  %% continuation (a zero-arity fun or an already materialized stream).
  -type stream(Data) ::
      maybe_improper_list(Data, cont(Data))
      | ?end_of_stream.

  %% Elements produced by `interleave_chunks/1,2': the tag of the source
  %% stream paired with a chunk generated by `generate_chunk/2'.
  -type tagged_binstream() :: stream({Tag :: term(), Payload :: binary_payload()}).

  %% Progress of a single binary stream: `offset' is the byte offset of
  %% the next chunk to generate.
  -record(chunk_state, {
      seed :: term(),
      payload_size :: non_neg_integer(),
      offset :: non_neg_integer(),
      chunk_size :: non_neg_integer()
  }).

  -type chunk_state() :: #chunk_state{}.

  %% Streams that have not been exhausted yet, each with its tag.
  -record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).

  -type interleave_state() :: #interleave_state{}.
  81. %% =============================================================================
  82. %% API functions
  83. %% =============================================================================
  84. %% -----------------------------------------------------------------------------
  85. %% Proper generators
  86. %% -----------------------------------------------------------------------------
  %% @doc PropEr generator of lazy binary chunk streams. The seed is drawn
  %% from `nat()' and the payload size from `range(1, 16#100000)'; the
  %% stream itself is built by `generate_chunk/2'. Only chunk sizes that
  %% are a multiple of `?hash_size' are accepted.
  -spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
  binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
      PayloadGen = {nat(), range(1, 16#100000)},
      ?LET(Payload, PayloadGen, generate_chunk(Payload, ChunkSize)).
  %% @equiv interleaved_streams_gen(10, Type)
  %% @doc Same as `interleaved_streams_gen/2' with at most 10 streams.
  -spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
  interleaved_streams_gen(Type) ->
      DefaultMaxStreams = 10,
      interleaved_streams_gen(DefaultMaxStreams, Type).
  %% @doc PropEr generator that creates a term of type
  %% ```[{_Tag :: binary(), stream()}]''' which can be passed directly to
  %% `interleave_streams/1'. Between 1 and `MaxNStreams' streams are
  %% generated from `StreamType'; the tag of the I-th stream is the
  %% one-byte binary `<<I>>'.
  -spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
      proper_types:type().
  interleaved_streams_gen(MaxNStreams, StreamType) ->
      ?LET(
          Count,
          range(1, MaxNStreams),
          ?LET(
              Generated,
              vector(Count, StreamType),
              [
                  {<<Index/integer>>, Stream}
               || {Index, Stream} <-
                      lists:zip(lists:seq(1, length(Generated)), Generated)
              ]
          )
      ).
  %% @doc PropEr generator of tagged binary streams: every stream comes
  %% from `binary_stream_gen/1' with the given chunk size.
  -spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
  interleaved_binary_gen(ChunkSize) ->
      StreamGen = binary_stream_gen(ChunkSize),
      interleaved_streams_gen(StreamGen).
  %% @doc PropEr generator of tagged list streams: every stream is a
  %% non-empty list whose elements are generated from `Type'.
  -spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
  interleaved_list_gen(Type) ->
      ListGen = non_empty(list(Type)),
      interleaved_streams_gen(ListGen).
  123. %% -----------------------------------------------------------------------------
  124. %% Generic streams
  125. %% -----------------------------------------------------------------------------
  %% @doc Force a continuation: a zero-arity fun is called and its result
  %% returned, any other term (e.g. a plain list) is returned unchanged.
  -spec next(cont(A)) -> stream(A).
  next(Cont) ->
      case is_function(Cont, 0) of
          true -> Cont();
          false -> Cont
      end.
  %% @doc Take a list of tagged streams and return a single stream whose
  %% elements are `{Tag, Element}' pairs, picked from the input streams in
  %% random order.
  %%
  %% Note: this function is more or less generic and it's compatible
  %% with this module's `generate_chunks' function family, as well as
  %% `ets:next', lists and what not
  %%
  %% Consider using simplified versions of this function
  -spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
  interleave_streams(Streams) ->
      InitialState = #interleave_state{streams = Streams},
      do_interleave_streams(InitialState).
  %% @doc Wrap a stream so that each element is emitted twice in a row
  %% whenever `rand:uniform()' falls below `Probability', and once
  %% otherwise.
  %% TODO: Make retransmissions of arbitrary length
  -spec retransmits(stream(Data), float()) -> stream(Data).
  retransmits(Stream, Probability) ->
      case Stream of
          ?end_of_stream ->
              ?end_of_stream;
          [Elem | Rest] ->
              %% The remainder of the stream is wrapped lazily.
              Tail = fun() -> retransmits(next(Rest), Probability) end,
              Duplicate = rand:uniform() < Probability,
              if
                  Duplicate -> [Elem, Elem | Tail];
                  true -> [Elem | Tail]
              end
      end.
  %% @doc Walk the whole stream, applying `Callback' (e.g. brod:produce)
  %% to each element in stream order, and return the list of callback
  %% results in the same order.
  -spec consume(
      stream(A),
      fun((A) -> Ret)
  ) -> [Ret].
  consume(Stream, Callback) ->
      consume_acc(Stream, Callback, []).

  %% Worker for `consume/2'; `Acc' holds the results collected so far in
  %% reverse order.
  consume_acc(Stream, Callback, Acc) ->
      case Stream of
          ?end_of_stream ->
              lists:reverse(Acc);
          [Elem | Rest] ->
              %% The callback runs before the continuation is forced.
              Result = Callback(Elem),
              consume_acc(next(Rest), Callback, [Result | Acc])
      end.
  %% @equiv consume(Stream, fun(A) -> A end)
  %% @doc Materialize the stream into a list of its elements.
  -spec consume(stream(A)) -> [A].
  consume(Stream) ->
      Identity = fun(Elem) -> Elem end,
      consume(Stream, Identity).
  174. %% -----------------------------------------------------------------------------
  175. %% Misc functions
  176. %% -----------------------------------------------------------------------------
  %% @doc Convert `N' megabytes to bytes (1 MB = 1024 * 1024 bytes).
  -spec mb(integer()) -> integer().
  mb(N) ->
      N * (1024 * 1024).
  181. %% -----------------------------------------------------------------------------
  182. %% List streams
  183. %% -----------------------------------------------------------------------------
  %% @doc Turn a list into a stream. Regular lists already are streams,
  %% so the argument is returned unchanged.
  -spec list_to_stream([A]) -> stream(A).
  list_to_stream(List) ->
      List.
  186. %% -----------------------------------------------------------------------------
  187. %% Binary streams
  188. %% -----------------------------------------------------------------------------
  %% @doc Produce the pseudorandom block number `N' for the given binary
  %% seed: the MD5 digest of `N' (encoded as a 32-bit integer) followed by
  %% the seed. This implementation is hardly efficient, but it was chosen
  %% for clarity reasons
  -spec generator_fun(integer(), binary()) -> binary().
  generator_fun(N, Seed) ->
      Input = <<N:32, Seed/binary>>,
      crypto:hash(md5, Input).
  %% @doc Return the byte found at offset `N' of the payload generated
  %% from `Seed'. The seed may be any term; it is hashed first.
  -spec get_byte(integer(), term()) -> byte().
  get_byte(N, Seed) ->
      SeedHash = seed_hash(Seed),
      do_get_byte(N, SeedHash).
  %% @doc Create a lazy stream of binary chunks for the given payload.
  %% Limitation: both the payload size and `ChunkSize' should be divisible
  %% by `?hash_size' (only the latter is enforced by the guard).
  -spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
  generate_chunk({Seed, Size}, ChunkSize) when
      ChunkSize rem ?hash_size =:= 0
  ->
      %% Generation starts at the beginning of the payload.
      generate_chunk(#chunk_state{
          seed = Seed,
          payload_size = Size,
          offset = 0,
          chunk_size = ChunkSize
      }).
  %% @doc Take a list of `{payload(), ChunkSize}' pairs and return a
  %% stream that produces the chunks of all payloads in random order.
  %% The seed of each payload is used as the tag of its chunks.
  %% @see interleave_streams/1
  -spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
      tagged_binstream().
  interleave_chunks(Streams0) ->
      Tagged = [
          {Seed, generate_chunk(Payload, ChunkSize)}
       || {{Seed, _Size} = Payload, ChunkSize} <- Streams0
      ],
      interleave_streams(Tagged).
  %% @doc Take a list of `payload()'s and return a stream that produces
  %% their chunks in random order. The seed of each payload is used as
  %% the tag of its chunks. All streams use the same chunk size
  %% @see interleave_streams/1
  -spec interleave_chunks(
      [payload()],
      non_neg_integer()
  ) -> tagged_binstream().
  interleave_chunks(Streams0, ChunkSize) ->
      Tagged = [
          {Tag, generate_chunk(Payload, ChunkSize)}
       || {Tag, _Size} = Payload <- Streams0
      ],
      interleave_streams(Tagged).
  %% @doc Generate all chunks of `Payload' and feed them, in order, into
  %% `Callback'. The callback receives whole stream elements, i.e.
  %% `{Data, ChunkNum, ChunkCnt}' tuples, not bare binaries. Returns the
  %% list of callback results in chunk order.
  -spec generate_chunks(
      payload(),
      integer(),
      fun((binary_payload()) -> A)
  ) -> [A].
  generate_chunks(Payload, ChunkSize, Callback) ->
      Chunks = generate_chunk(Payload, ChunkSize),
      consume(Chunks, Callback).
  %% @doc Verify by sampling that the data reachable through `Callback'
  %% matches the payload `{Seed, Size}'.
  %%
  %% `Callback' is called with a byte offset and must return `{ok, Byte}'
  %% for an existing byte, or `undefined' when there is no byte at that
  %% offset. The first byte, the last byte and `SampleSize' random
  %% offsets are compared with the expected values; the offset `Size'
  %% must not exist. For an empty payload (`Size' is 0) only the absence
  %% of offset 0 is checked. A mismatch raises an EUnit assertion error.
  -spec check_consistency(
      payload(),
      integer(),
      fun((integer()) -> {ok, byte()} | undefined)
  ) -> ok.
  check_consistency({Seed, Size}, SampleSize, Callback) ->
      SeedHash = seed_hash(Seed),
      Samples =
          case Size of
              0 ->
                  %% There is neither a first nor a last byte, and
                  %% `rand:uniform(0)' would fail.
                  [0];
              _ ->
                  Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
                  %% Always check first and last bytes, and one that should not exist:
                  [0, Size - 1, Size | Random]
          end,
      lists:foreach(
          fun
              (N) when N < Size ->
                  Expected = do_get_byte(N, SeedHash),
                  %% The offset is part of the compared terms so that a
                  %% failure report shows where the mismatch is.
                  ?assertEqual(
                      {N, {ok, Expected}},
                      {N, Callback(N)}
                  );
              (N) ->
                  ?assertMatch(undefined, Callback(N))
          end,
          Samples
      ).
  %% @doc Run `check_consistency/3' against the contents of the file
  %% `FileName'. The file is opened for raw reading and is closed again
  %% even if the check fails.
  -spec check_file_consistency(
      payload(),
      integer(),
      file:filename()
  ) -> ok.
  check_file_consistency(Payload, SampleSize, FileName) ->
      {ok, Device} = file:open(FileName, [read, raw]),
      %% Read a single byte at the given offset; reading at or past the
      %% end of the file is reported as `undefined'.
      ReadByte = fun(Offset) ->
          case file:pread(Device, [{Offset, 1}]) of
              {ok, [eof]} -> undefined;
              {ok, [[Byte]]} -> {ok, Byte}
          end
      end,
      try
          check_consistency(Payload, SampleSize, ReadByte)
      after
          file:close(Device)
      end.
  287. %% =============================================================================
  288. %% Internal functions
  289. %% =============================================================================
  %% Pick one of the remaining streams at random. If it still has an
  %% element, emit it as `{Tag, Element}' and continue lazily with the
  %% advanced stream put back in its position; if it is exhausted, drop
  %% it and pick again. The result ends once no streams are left.
  -spec do_interleave_streams(interleave_state()) -> stream(_Data).
  do_interleave_streams(#interleave_state{streams = []}) ->
      ?end_of_stream;
  do_interleave_streams(#interleave_state{streams = Streams} = State) ->
      %% Not the most efficient implementation (lots of avoidable list
      %% traversals), but we don't expect the number of streams to be the
      %% bottleneck
      Pick = rand:uniform(length(Streams)),
      {Before, [{Tag, Chosen} | After]} = lists:split(Pick - 1, Streams),
      case Chosen of
          ?end_of_stream ->
              do_interleave_streams(
                  State#interleave_state{streams = Before ++ After}
              );
          [Elem | Rest] ->
              %% The continuation of the chosen stream is forced right
              %% away, before the current element is returned.
              Advanced = {Tag, next(Rest)},
              NextState = State#interleave_state{
                  streams = Before ++ [Advanced | After]
              },
              [{Tag, Elem} | fun() -> do_interleave_streams(NextState) end]
      end.
  %% @doc Continue generating chunks from the given state. The stream ends
  %% once the offset reaches the payload size; otherwise the chunk at the
  %% current offset is produced together with a continuation that resumes
  %% one chunk size further. Elements are `{Data, ChunkNum, ChunkCnt}'
  %% tuples.
  -spec generate_chunk(chunk_state()) -> stream(binary_payload()).
  generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
      Offset >= Size
  ->
      ?end_of_stream;
  generate_chunk(
      State0 = #chunk_state{
          seed = Seed,
          payload_size = Size,
          offset = Offset,
          chunk_size = ChunkSize
      }
  ) ->
      Payload = generate_chunk(Seed, Offset, ChunkSize, Size),
      State = State0#chunk_state{offset = Offset + ChunkSize},
      [Payload | fun() -> generate_chunk(State) end].
  %% Build the chunk that starts at byte `Offset' of the payload.
  %%
  %% The data is the concatenation of `generator_fun/2' blocks, from the
  %% block containing `Offset' up to the block containing the last byte of
  %% the chunk (clamped to the payload size). Whole blocks are always
  %% emitted, so the last chunk of a payload whose size is not a multiple
  %% of `?hash_size' extends to the next block boundary.
  %%
  %% Returns `{Data, ChunkNum, ChunkCnt}' where `ChunkNum' is 1-based.
  -spec generate_chunk(term(), non_neg_integer(), integer(), integer()) ->
      binary_payload().
  generate_chunk(Seed, Offset, ChunkSize, Size) ->
      SeedHash = seed_hash(Seed),
      %% Offset of the last byte that belongs to this chunk.
      To = min(Offset + ChunkSize, Size) - 1,
      Payload = iolist_to_binary([
          generator_fun(I, SeedHash)
       || I <- lists:seq(Offset div ?hash_size, To div ?hash_size)
      ]),
      ChunkNum = Offset div ChunkSize + 1,
      ChunkCnt = ceil(Size / ChunkSize),
      {Payload, ChunkNum, ChunkCnt}.
  %% @doc Turn an arbitrary term into a binary seed: the MD5 digest of the
  %% term's external representation.
  -spec seed_hash(term()) -> binary().
  seed_hash(Seed) ->
      Encoded = term_to_binary(Seed),
      crypto:hash(md5, Encoded).
  %% @private Get byte at offset `N' for an already hashed seed: generate
  %% the block that contains the offset and pick the byte at the offset's
  %% position within that block.
  -spec do_get_byte(integer(), binary()) -> byte().
  do_get_byte(N, Seed) ->
      BlockIndex = N div ?hash_size,
      ByteIndex = N rem ?hash_size,
      binary:at(generator_fun(BlockIndex, Seed), ByteIndex).
  341. binary:at(Chunk, N rem ?hash_size).