emqx_ft_content_gen.erl 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% Inspired by
  17. %% https://github.com/kafka4beam/kflow/blob/master/src/testbed/payload_gen.erl
  18. -module(emqx_ft_content_gen).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -dialyzer(no_improper_lists).
  21. -export([new/2]).
  22. -export([generate/3]).
  23. -export([next/1]).
  24. -export([consume/1]).
  25. -export([consume/2]).
  26. -export([fold/3]).
  27. -export([hash/2]).
  28. -export([check_file_consistency/3]).
  29. -export_type([cont/1]).
  30. -export_type([stream/1]).
  31. -export_type([binary_payload/0]).
  %% Size in bytes of one generator block. It equals the length of the MD5
  %% digest returned by `generator_fun/2'.
  -define(hash_size, 16).
  %% Payload description: the seed the content is derived from and the
  %% total payload size in bytes.
  -type payload() :: {Seed :: term(), Size :: integer()}.
  %% One generated chunk: its bytes, its 1-based position in the stream,
  %% and metadata describing how the payload was chunked.
  %% NOTE: the type `#{}' would mean the *empty* map; the metadata always
  %% carries both keys below.
  -type binary_payload() :: {
      binary(),
      _ChunkNum :: non_neg_integer(),
      _Meta :: #{
          chunk_size := non_neg_integer(),
          chunk_count := non_neg_integer()
      }
  }.
  %% A continuation is either a zero-arity fun that produces the next
  %% stream cell lazily, or an already materialized stream cell.
  -type cont(Data) ::
      fun(() -> stream(Data))
      | stream(Data).
  %% A stream is either `eos' or a (possibly improper) list whose tail is a
  %% continuation.
  -type stream(Data) ::
      maybe_improper_list(Data, cont(Data))
      | eos.
  %% Internal state of a binary chunk stream (see `new/2').
  -record(chunk_state, {
      seed :: term(),
      payload_size :: non_neg_integer(),
      %% Byte offset of the next chunk to generate.
      offset :: non_neg_integer(),
      chunk_size :: non_neg_integer()
  }).
  -type chunk_state() :: #chunk_state{}.
  50. %% -----------------------------------------------------------------------------
  51. %% Generic streams
  52. %% -----------------------------------------------------------------------------
  %% @doc Force one step of a stream continuation.
  %% A zero-arity fun is called to obtain the next stream cell. Any other
  %% term is already a stream cell and is returned unchanged.
  -spec next(cont(A)) -> stream(A).
  next(Cont) ->
      case is_function(Cont, 0) of
          true -> Cont();
          false -> Cont
      end.
  %% @doc Consume all elements of the stream and feed them into a
  %% callback (e.g. brod:produce). The callback results are returned as a
  %% list in stream order.
  %%
  %% `Callback' is applied strictly in stream order. Each result is bound
  %% before the rest of the stream is forced, so side-effecting callbacks
  %% do not depend on the unspecified order in which the head and tail of
  %% a cons expression are evaluated. The loop is tail-recursive and
  %% accumulates the results in reverse.
  -spec consume(cont(A), fun((A) -> Ret)) -> [Ret].
  consume(Stream, Callback) ->
      lists:reverse(consume_loop(Stream, Callback, [])).

  %% Walk the stream, prepending each callback result to `Acc'.
  consume_loop([Data | Cont], Callback, Acc) ->
      consume_loop(Cont, Callback, [Callback(Data) | Acc]);
  consume_loop(Cont, Callback, Acc) when is_function(Cont, 0) ->
      consume_loop(Cont(), Callback, Acc);
  consume_loop(eos, _Callback, Acc) ->
      Acc.
  %% @equiv consume(Stream, fun(A) -> A end)
  -spec consume(cont(A)) -> [A].
  consume(Stream) ->
      %% Identity callback: collect the stream elements unchanged.
      Identity = fun(Elem) -> Elem end,
      consume(Stream, Identity).
  %% @doc Left fold over every element of the stream.
  %% `Fun' is called as `Fun(Element, Acc)' for each element in stream
  %% order. The final accumulator is returned once `eos' is reached.
  -spec fold(fun((A, Acc) -> Acc), Acc, cont(A)) -> Acc.
  fold(Fun, Acc0, [Elem | Rest]) ->
      Acc = Fun(Elem, Acc0),
      fold(Fun, Acc, Rest);
  fold(Fun, Acc, Thunk) when is_function(Thunk, 0) ->
      fold(Fun, Acc, Thunk());
  fold(_Fun, Acc, eos) ->
      Acc.
  79. %% -----------------------------------------------------------------------------
  80. %% Binary streams
  81. %% -----------------------------------------------------------------------------
  %% @doc Stream of binary chunks.
  %% The payload is `Size' bytes derived deterministically from `Seed'. It
  %% is split into chunks of `ChunkSize' bytes, and the last chunk may be
  %% shorter. Nothing is generated until the returned continuation is
  %% forced.
  %%
  %% `ChunkSize' must be a positive integer divisible by `?hash_size'.
  %% Non-positive sizes are rejected here, because the stream cannot work
  %% with them: a zero size divides by zero while generating chunks, and a
  %% negative size moves the offset backwards so `eos' is never reached.
  -spec new(payload(), pos_integer()) -> cont(binary_payload()).
  new({Seed, Size}, ChunkSize) when
      is_integer(ChunkSize), ChunkSize > 0, ChunkSize rem ?hash_size =:= 0
  ->
      fun() ->
          generate_next_chunk(#chunk_state{
              seed = Seed,
              payload_size = Size,
              chunk_size = ChunkSize,
              offset = 0
          })
      end.
  %% @doc Generate the whole payload in chunks of `ChunkSize' bytes and
  %% pass each chunk to `Callback'. The callback results are returned in
  %% chunk order.
  -spec generate(payload(), integer(), fun((binary_payload()) -> A)) -> [A].
  generate(Payload, ChunkSize, Callback) ->
      Stream = new(Payload, ChunkSize),
      consume(Stream, Callback).
  %% @doc Feed the bytes of every chunk in `Stream' into the hash context
  %% `HashCtxIn' (e.g. one returned by `crypto:hash_init/1') and return the
  %% final digest. Chunk numbers and metadata are not hashed.
  -spec hash(cont(binary_payload()), crypto:hash_state()) -> binary().
  hash(Stream, HashCtxIn) ->
      AddChunk =
          fun({Chunk, _ChunkNum, _Meta}, Ctx) ->
              crypto:hash_update(Ctx, Chunk)
          end,
      FinalCtx = fold(AddChunk, HashCtxIn, Stream),
      crypto:hash_final(FinalCtx).
  %% @doc Spot-check that the bytes reported by `Callback' match the
  %% content generated for `{Seed, Size}'.
  %%
  %% `Callback(N)' must return `{ok, Byte}' for an offset inside the
  %% payload and `undefined' for an offset past its end. The offsets
  %% checked are:
  %%   - the first and last byte;
  %%   - `SampleSize' random in-range offsets;
  %%   - offset `Size', which must not exist.
  %% Mismatches fail through EUnit assertions.
  -spec check_consistency(
      payload(),
      integer(),
      fun((integer()) -> {ok, byte()} | undefined)
  ) -> ok.
  check_consistency({Seed, Size}, SampleSize, Callback) ->
      SeedHash = seed_hash(Seed),
      lists:foreach(
          fun
              (N) when N < Size ->
                  Expected = do_get_byte(N, SeedHash),
                  ?assertEqual(
                      {N, {ok, Expected}},
                      {N, Callback(N)}
                  );
              (N) ->
                  ?assertMatch(undefined, Callback(N))
          end,
          consistency_samples(Size, SampleSize)
      ).

  %% Offsets to sample for `check_consistency/3'. An empty payload needs
  %% special handling: it has no last byte (`Size - 1' would be -1) and
  %% `rand:uniform/1' requires a positive range. For it, only the
  %% past-the-end offset 0 is checked.
  consistency_samples(0, _SampleSize) ->
      [0];
  consistency_samples(Size, SampleSize) ->
      Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
      [0, Size - 1, Size | Random].
  %% @doc Check that `FileName' holds the payload described by `Payload',
  %% using `check_consistency/3'. Bytes are fetched one at a time with
  %% positional reads, and a read that hits end-of-file yields `undefined'.
  %% The file descriptor is closed even when an assertion fails.
  -spec check_file_consistency(
      payload(),
      integer(),
      file:filename()
  ) -> ok.
  check_file_consistency(Payload, SampleSize, FileName) ->
      {ok, Fd} = file:open(FileName, [read, raw]),
      ReadByteAt =
          fun(Pos) ->
              case file:pread(Fd, [{Pos, 1}]) of
                  {ok, [eof]} -> undefined;
                  {ok, [[Byte]]} -> {ok, Byte}
              end
          end,
      try
          check_consistency(Payload, SampleSize, ReadByteAt)
      after
          file:close(Fd)
      end.
  151. %% =============================================================================
  152. %% Internal functions
  153. %% =============================================================================
  %% @doc Produce the stream cell for the chunk that starts at the state's
  %% offset, or `eos' once the offset reaches the payload size. The tail of
  %% the cell is a continuation that generates the following chunk lazily.
  %% Elements are `{Chunk, ChunkNum, Meta}' tuples, not bare binaries.
  -spec generate_next_chunk(chunk_state()) -> stream(binary_payload()).
  generate_next_chunk(#chunk_state{offset = Offset, payload_size = Size}) when
      Offset >= Size
  ->
      eos;
  generate_next_chunk(
      #chunk_state{
          seed = Seed,
          offset = Offset,
          chunk_size = ChunkSize,
          payload_size = Size
      } = State
  ) ->
      Chunk = generate_chunk(Seed, Offset, ChunkSize, Size),
      NextState = State#chunk_state{offset = Offset + ChunkSize},
      [Chunk | fun() -> generate_next_chunk(NextState) end].
  %% @doc Build the chunk that starts at byte `Offset' of the payload.
  %% The content is the concatenation of the `?hash_size'-byte generator
  %% blocks covering `[Offset, min(Offset + ChunkSize, Size))'. The final
  %% chunk of the payload is truncated to the bytes that remain.
  %% Returns `{Chunk, ChunkNum, Meta}' with a 1-based chunk number.
  %% Assumes `Offset' is a multiple of `ChunkSize' and `ChunkSize' is a
  %% multiple of `?hash_size'. `new/2' and `generate_next_chunk/1' keep
  %% both true.
  generate_chunk(Seed, Offset, ChunkSize, Size) ->
      SeedHash = seed_hash(Seed),
      Last = min(Offset + ChunkSize, Size) - 1,
      Blocks = [
          generator_fun(I, SeedHash)
       || I <- lists:seq(Offset div ?hash_size, Last div ?hash_size)
      ],
      Payload = iolist_to_binary(Blocks),
      ChunkNum = Offset div ChunkSize + 1,
      Meta = #{
          chunk_size => ChunkSize,
          %% Integer ceiling division: exact for any size, with no float
          %% rounding.
          chunk_count => (Size + ChunkSize - 1) div ChunkSize
      },
      %% Only the final chunk can be shorter than `ChunkSize'. For every
      %% other chunk this takes the whole generated payload.
      Chunk = binary:part(Payload, 0, min(ChunkSize, Size - Offset)),
      {Chunk, ChunkNum, Meta}.
  %% @doc Deterministic 16-byte block generator. It returns the MD5 digest
  %% of the block index `N' followed by the seed hash `Seed'. `N' is
  %% encoded as a 32-bit big-endian integer, so only its low 32 bits
  %% matter. The implementation favors clarity over speed.
  -spec generator_fun(integer(), binary()) -> binary().
  generator_fun(N, Seed) ->
      Input = <<N:32, Seed/binary>>,
      erlang:md5(Input).
  %% @doc MD5 digest of the external term format of `Seed', so any Erlang
  %% term can serve as a seed.
  -spec seed_hash(term()) -> binary().
  seed_hash(Seed) ->
      Encoded = term_to_binary(Seed),
      erlang:md5(Encoded).
  %% @private Expected payload byte at offset `N', given the seed hash
  %% `Seed'. The offset is split into the index of its generator block and
  %% the position of the byte inside that block.
  -spec do_get_byte(integer(), binary()) -> byte().
  do_get_byte(N, Seed) ->
      BlockIndex = N div ?hash_size,
      PosInBlock = N rem ?hash_size,
      Block = generator_fun(BlockIndex, Seed),
      binary:at(Block, PosInBlock).