payload_gen.erl 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. %% @doc This module provides lazy, composable producer streams that
  2. %% can be considered counterparts to Archiver's consumer pipes and
  3. %% therefore can facilitate testing
  4. %%
  5. %% Also it comes with an implementation of binary data stream which is
  6. %% able to produce sufficiently large amounts of plausibly
  7. %% pseudorandom binary payload in a deterministic way. It also
  8. %% contains routines to check binary blobs via sampling
  9. -module(payload_gen).
  10. -define(end_of_stream, []).
  11. -dialyzer(no_improper_lists).
  12. %% Generic stream API:
  13. -export([
  14. interleave_streams/1,
  15. retransmits/2,
  16. next/1,
  17. consume/2,
  18. consume/1
  19. ]).
  20. %% Binary payload generator API:
  21. -export([
  22. interleave_chunks/2,
  23. interleave_chunks/1,
  24. mb/1,
  25. generator_fun/2,
  26. generate_chunks/3,
  27. generate_chunk/2,
  28. check_consistency/3,
  29. check_file_consistency/3,
  30. get_byte/2
  31. ]).
  32. %% List to stream generator API:
  33. -export([list_to_stream/1]).
  34. %% Proper generators:
  35. -export([
  36. binary_stream_gen/1,
  37. interleaved_streams_gen/1,
  38. interleaved_binary_gen/1,
  39. interleaved_list_gen/1
  40. ]).
  41. -export_type([payload/0, binary_payload/0]).
  42. -define(hash_size, 16).
  43. -include_lib("proper/include/proper.hrl").
  44. -include_lib("eunit/include/eunit.hrl").
  45. -type payload() :: {Seed :: term(), Size :: integer()}.
  46. -type binary_payload() :: {
  47. binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
  48. }.
  49. %% For performance reasons we treat regular lists as streams, see `next/1'
  50. -opaque cont(Data) ::
  51. fun(() -> stream(Data))
  52. | stream(Data).
  53. -type stream(Data) ::
  54. maybe_improper_list(Data, cont(Data))
  55. | ?end_of_stream.
  56. -type tagged_binstream() :: stream({Tag :: term(), Payload :: chunk_state()}).
  57. -record(chunk_state, {
  58. seed :: term(),
  59. payload_size :: non_neg_integer(),
  60. offset :: non_neg_integer(),
  61. chunk_size :: non_neg_integer()
  62. }).
  63. -opaque chunk_state() :: #chunk_state{}.
  64. -record(interleave_state, {streams :: [{Tag :: term(), Stream :: term()}]}).
  65. -opaque interleave_state() :: #interleave_state{}.
  66. %% =============================================================================
  67. %% API functions
  68. %% =============================================================================
  69. %% -----------------------------------------------------------------------------
  70. %% Proper generators
  71. %% -----------------------------------------------------------------------------
  72. %% @doc Proper generator that creates a binary stream
  73. -spec binary_stream_gen(_ChunkSize :: non_neg_integer()) -> proper_types:type().
  74. binary_stream_gen(ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
  75. ?LET(
  76. {Seed, Size},
  77. {nat(), range(1, 16#100000)},
  78. generate_chunk({Seed, Size}, ChunkSize)
  79. ).
  80. %% @equiv interleaved_streams_gen(10, Type)
  81. -spec interleaved_streams_gen(proper_types:type()) -> proper_types:type().
  82. interleaved_streams_gen(Type) ->
  83. interleaved_streams_gen(10, Type).
  84. %% @doc Proper generator that creates a term of type
  85. %% ```[{_Tag :: binary(), stream()}]''' that is ready to be fed
  86. %% into `interleave_streams/1' function
  87. -spec interleaved_streams_gen(non_neg_integer(), proper_types:type()) ->
  88. proper_types:type().
  89. interleaved_streams_gen(MaxNStreams, StreamType) ->
  90. ?LET(
  91. NStreams,
  92. range(1, MaxNStreams),
  93. ?LET(
  94. Streams,
  95. vector(NStreams, StreamType),
  96. begin
  97. Tags = [<<I/integer>> || I <- lists:seq(1, length(Streams))],
  98. lists:zip(Tags, Streams)
  99. end
  100. )
  101. ).
  102. -spec interleaved_binary_gen(non_neg_integer()) -> proper_types:type().
  103. interleaved_binary_gen(ChunkSize) ->
  104. interleaved_streams_gen(binary_stream_gen(ChunkSize)).
  105. -spec interleaved_list_gen(proper_types:type()) -> proper_types:type().
  106. interleaved_list_gen(Type) ->
  107. interleaved_streams_gen(non_empty(list(Type))).
  108. %% -----------------------------------------------------------------------------
  109. %% Generic streams
  110. %% -----------------------------------------------------------------------------
  111. %% @doc Consume one element from the stream.
  112. -spec next(cont(A)) -> stream(A).
  113. next(Fun) when is_function(Fun, 0) ->
  114. Fun();
  115. next(L) ->
  116. L.
  117. %% @doc Take a list of tagged streams and return a stream where
  118. %% elements of the streams are tagged and randomly interleaved.
  119. %%
  120. %% Note: this function is more or less generic and it's compatible
  121. %% with this module's `generate_chunks' function family, as well as
  122. %% `ets:next', lists and what not
  123. %%
  124. %% Consider using simplified versions of this function
  125. -spec interleave_streams([{Tag, stream(Data)}]) -> stream({Tag, Data}).
  126. interleave_streams(Streams) ->
  127. do_interleave_streams(
  128. #interleave_state{streams = Streams}
  129. ).
  130. %% @doc Take an arbitrary stream and add repetitions of the elements
  131. %% TODO: Make retransmissions of arbitrary length
  132. -spec retransmits(stream(Data), float()) -> stream(Data).
  133. retransmits(Stream, Probability) ->
  134. case Stream of
  135. [Data | Cont0] ->
  136. Cont = fun() -> retransmits(next(Cont0), Probability) end,
  137. case rand:uniform() < Probability of
  138. true -> [Data, Data | Cont];
  139. false -> [Data | Cont]
  140. end;
  141. ?end_of_stream ->
  142. ?end_of_stream
  143. end.
  144. %% @doc Consume all elements of the stream and feed them into a
  145. %% callback (e.g. brod:produce)
  146. -spec consume(
  147. stream(A),
  148. fun((A) -> Ret)
  149. ) -> [Ret].
  150. consume(Stream, Callback) ->
  151. case Stream of
  152. [Data | Cont] -> [Callback(Data) | consume(next(Cont), Callback)];
  153. ?end_of_stream -> []
  154. end.
  155. %% @equiv consume(Stream, fun(A) -> A end)
  156. -spec consume(stream(A)) -> [A].
  157. consume(Stream) ->
  158. consume(Stream, fun(A) -> A end).
  159. %% -----------------------------------------------------------------------------
  160. %% Misc functions
  161. %% -----------------------------------------------------------------------------
  162. %% @doc Return number of bytes in `N' megabytes
  163. -spec mb(integer()) -> integer().
  164. mb(N) ->
  165. N * 1048576.
  166. %% -----------------------------------------------------------------------------
  167. %% List streams
  168. %% -----------------------------------------------------------------------------
  169. -spec list_to_stream([A]) -> stream(A).
  170. list_to_stream(L) -> L.
  171. %% -----------------------------------------------------------------------------
  172. %% Binary streams
  173. %% -----------------------------------------------------------------------------
  174. %% @doc First argument is a chunk number, the second one is a seed.
  175. %% This implementation is hardly efficient, but it was chosen for
  176. %% clarity reasons
  177. -spec generator_fun(integer(), binary()) -> binary().
  178. generator_fun(N, Seed) ->
  179. crypto:hash(md5, <<N:32, Seed/binary>>).
  180. %% @doc Get byte at offset `N'
  181. -spec get_byte(integer(), term()) -> byte().
  182. get_byte(N, Seed) ->
  183. do_get_byte(N, seed_hash(Seed)).
  184. %% @doc Stream of binary chunks. Limitation: both payload size and
  185. %% `ChunkSize' should be dividable by `?hash_size'
  186. -spec generate_chunk(payload(), integer()) -> stream(binary_payload()).
  187. generate_chunk({Seed, Size}, ChunkSize) when
  188. ChunkSize rem ?hash_size =:= 0
  189. ->
  190. State = #chunk_state{
  191. seed = Seed,
  192. payload_size = Size,
  193. chunk_size = ChunkSize,
  194. offset = 0
  195. },
  196. generate_chunk(State).
  197. %% @doc Take a list of `payload()'s and a callback function, and start
  198. %% producing the payloads in random order. Seed is used as a tag
  199. %% @see interleave_streams/4
  200. -spec interleave_chunks([{payload(), ChunkSize :: non_neg_integer()}]) ->
  201. tagged_binstream().
  202. interleave_chunks(Streams0) ->
  203. Streams = [
  204. {Tag, generate_chunk(Payload, ChunkSize)}
  205. || {Payload = {Tag, _}, ChunkSize} <- Streams0
  206. ],
  207. interleave_streams(Streams).
  208. %% @doc Take a list of `payload()'s and a callback function, and start
  209. %% consuming the payloads in a random order. Seed is used as a
  210. %% tag. All streams use the same chunk size
  211. %% @see interleave_streams/2
  212. -spec interleave_chunks(
  213. [payload()],
  214. non_neg_integer()
  215. ) -> tagged_binstream().
  216. interleave_chunks(Streams0, ChunkSize) ->
  217. Streams = [
  218. {Seed, generate_chunk({Seed, Size}, ChunkSize)}
  219. || {Seed, Size} <- Streams0
  220. ],
  221. interleave_streams(Streams).
  222. %% @doc Generate chunks of data and feed them into
  223. %% `Callback'
  224. -spec generate_chunks(
  225. payload(),
  226. integer(),
  227. fun((binary()) -> A)
  228. ) -> [A].
  229. generate_chunks(Payload, ChunkSize, Callback) ->
  230. consume(generate_chunk(Payload, ChunkSize), Callback).
  231. -spec check_consistency(
  232. payload(),
  233. integer(),
  234. fun((integer()) -> {ok, binary()} | undefined)
  235. ) -> ok.
  236. check_consistency({Seed, Size}, SampleSize, Callback) ->
  237. SeedHash = seed_hash(Seed),
  238. Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
  239. %% Always check first and last bytes, and one that should not exist:
  240. Samples = [0, Size - 1, Size | Random],
  241. lists:foreach(
  242. fun
  243. (N) when N < Size ->
  244. Expected = do_get_byte(N, SeedHash),
  245. ?assertEqual(
  246. {N, {ok, Expected}},
  247. {N, Callback(N)}
  248. );
  249. (N) ->
  250. ?assertMatch(undefined, Callback(N))
  251. end,
  252. Samples
  253. ).
  254. -spec check_file_consistency(
  255. payload(),
  256. integer(),
  257. file:filename()
  258. ) -> ok.
  259. check_file_consistency(Payload, SampleSize, FileName) ->
  260. {ok, FD} = file:open(FileName, [read, raw]),
  261. try
  262. Fun = fun(N) ->
  263. case file:pread(FD, [{N, 1}]) of
  264. {ok, [[X]]} -> {ok, X};
  265. {ok, [eof]} -> undefined
  266. end
  267. end,
  268. check_consistency(Payload, SampleSize, Fun)
  269. after
  270. file:close(FD)
  271. end.
  272. %% =============================================================================
  273. %% Internal functions
  274. %% =============================================================================
  275. -spec do_interleave_streams(interleave_state()) -> stream(_Data).
  276. do_interleave_streams(#interleave_state{streams = []}) ->
  277. ?end_of_stream;
  278. do_interleave_streams(#interleave_state{streams = Streams} = State0) ->
  279. %% Not the most efficient implementation (lots of avoidable list
  280. %% traversals), but we don't expect the number of streams to be the
  281. %% bottleneck
  282. N = rand:uniform(length(Streams)),
  283. {Hd, [{Tag, SC} | Tl]} = lists:split(N - 1, Streams),
  284. case SC of
  285. [Payload | SC1] ->
  286. State = State0#interleave_state{streams = Hd ++ [{Tag, next(SC1)} | Tl]},
  287. Cont = fun() -> do_interleave_streams(State) end,
  288. [{Tag, Payload} | Cont];
  289. ?end_of_stream ->
  290. State = State0#interleave_state{streams = Hd ++ Tl},
  291. do_interleave_streams(State)
  292. end.
  293. %% @doc Continue generating chunks
  294. -spec generate_chunk(chunk_state()) -> stream(binary()).
  295. generate_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
  296. Offset >= Size
  297. ->
  298. ?end_of_stream;
  299. generate_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
  300. State = State0#chunk_state{offset = Offset + ChunkSize},
  301. Payload = generate_chunk(
  302. State#chunk_state.seed,
  303. Offset,
  304. ChunkSize,
  305. State#chunk_state.payload_size
  306. ),
  307. [Payload | fun() -> generate_chunk(State) end].
  308. generate_chunk(Seed, Offset, ChunkSize, Size) ->
  309. SeedHash = seed_hash(Seed),
  310. To = min(Offset + ChunkSize, Size) - 1,
  311. Payload = iolist_to_binary([
  312. generator_fun(I, SeedHash)
  313. || I <- lists:seq(Offset div 16, To div 16)
  314. ]),
  315. ChunkNum = Offset div ChunkSize + 1,
  316. ChunkCnt = ceil(Size / ChunkSize),
  317. {Payload, ChunkNum, ChunkCnt}.
  318. %% @doc Hash any term
  319. -spec seed_hash(term()) -> binary().
  320. seed_hash(Seed) ->
  321. crypto:hash(md5, term_to_binary(Seed)).
  322. %% @private Get byte at offset `N'
  323. -spec do_get_byte(integer(), binary()) -> byte().
  324. do_get_byte(N, Seed) ->
  325. Chunk = generator_fun(N div ?hash_size, Seed),
  326. binary:at(Chunk, N rem ?hash_size).