storage_efficiency.erl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc This script can be loaded to a running EMQX EE node. It will
  17. %% create a number of DS databases with different options and fill
  18. %% them with data of given size.
  19. %%
  20. %% Then it will measure size of the database directories and create
  21. %% a "storage (in)efficiency" report.
  22. -module(storage_efficiency).
  23. -include_lib("emqx_utils/include/emqx_message.hrl").
  24. %% API:
  25. -export([run/0, run/1]).
  26. %%================================================================================
  27. %% API functions
  28. %%================================================================================
  29. run() ->
  30. run(#{}).
  31. run(Custom) ->
  32. RunConf = maps:merge(
  33. #{
  34. %% Sleep between batches:
  35. sleep => 1_000,
  36. %% Don't run test, only plot data:
  37. dry_run => false,
  38. %% Payload size multiplier:
  39. size => 10,
  40. %% Number of batches:
  41. batches => 100,
  42. %% Add generation every N batches:
  43. add_generation => 10
  44. },
  45. Custom
  46. ),
  47. lists:foreach(
  48. fun(DBConf) ->
  49. run(DBConf, RunConf)
  50. end,
  51. configs()
  52. ).
  53. %% erlfmt-ignore
  54. gnuplot_script(Filename) ->
  55. "set terminal qt\n"
  56. %% "set logscale y 10\n"
  57. "set title \"" ++ filename:basename(Filename, ".dat") ++ "\"\n"
  58. "set key autotitle columnheader\n"
  59. "plot for [n=2:*] \"" ++ Filename ++ "\" using 1:n with linespoints".
  60. %%================================================================================
  61. %% Internal functions
  62. %%================================================================================
  63. configs() ->
  64. [
  65. {'benchmark-skipstream-asn1',
  66. db_conf({emqx_ds_storage_skipstream_lts, #{serialization_schema => asn1}})},
  67. {'benchmark-skipstream-v1',
  68. db_conf({emqx_ds_storage_skipstream_lts, #{serialization_schema => v1}})},
  69. {'benchmark-bitfield', db_conf({emqx_ds_storage_bitfield_lts, #{}})}
  70. ].
  71. db_conf(Storage) ->
  72. #{
  73. backend => builtin_local,
  74. %% n_sites => 1,
  75. n_shards => 1,
  76. %% replication_factor => 1,
  77. %% replication_options => #{},
  78. storage => Storage
  79. }.
  80. -record(s, {
  81. data_size = 0,
  82. payload_size = 0,
  83. n_messages = 0,
  84. datapoints = #{},
  85. x_axis = []
  86. }).
  87. run({DB, Config}, RunConf) ->
  88. #{
  89. batches := NBatches,
  90. size := PSMultiplier,
  91. add_generation := AddGeneration,
  92. sleep := Sleep,
  93. dry_run := DryRun
  94. } = RunConf,
  95. {ok, _} = application:ensure_all_started(emqx_ds_backends),
  96. Dir = dir(DB),
  97. Filename = atom_to_list(DB) ++ ".dat",
  98. DryRun orelse
  99. begin
  100. io:format(user, "Running benchmark for ~p in ~p~n", [DB, Dir]),
  101. %% Ensure safe directory:
  102. {match, _} = re:run(Dir, filename:join("data", DB)),
  103. %% Ensure clean state:
  104. ok = emqx_ds:open_db(DB, Config),
  105. ok = emqx_ds:drop_db(DB),
  106. ok = file:del_dir_r(Dir),
  107. %% Open a fresh DB:
  108. ok = emqx_ds:open_db(DB, Config),
  109. S = lists:foldl(
  110. fun(Batch, Acc0) ->
  111. Size = PSMultiplier * Batch,
  112. io:format(user, "Storing batch with payload size ~p~n", [Size]),
  113. Acc1 = store_batch(DB, Size, Acc0),
  114. %% Sleep so all data is hopefully flushed:
  115. timer:sleep(Sleep),
  116. (Batch div AddGeneration) =:= 0 andalso
  117. emqx_ds:add_generation(DB),
  118. collect_datapoint(DB, Acc1)
  119. end,
  120. collect_datapoint(DB, #s{}),
  121. lists:seq(1, NBatches)
  122. ),
  123. {ok, FD} = file:open(Filename, [write]),
  124. io:put_chars(FD, print(S)),
  125. file:close(FD)
  126. end,
  127. os:cmd("echo '" ++ gnuplot_script(Filename) ++ "' | gnuplot --persist -"),
  128. ok.
  129. collect_datapoint(
  130. DB, S0 = #s{n_messages = N, data_size = DS, payload_size = PS, datapoints = DP0, x_axis = X}
  131. ) ->
  132. NewData = [{"$_n", N}, {"$data", DS}, {"$payloads", PS} | dirsize(DB)],
  133. DP = lists:foldl(
  134. fun({Key, Val}, Acc) ->
  135. maps:update_with(
  136. Key,
  137. fun(M) -> M#{N => Val} end,
  138. #{},
  139. Acc
  140. )
  141. end,
  142. DP0,
  143. NewData
  144. ),
  145. S0#s{
  146. datapoints = DP,
  147. x_axis = [N | X]
  148. }.
  149. print(#s{x_axis = XX, datapoints = DP}) ->
  150. Cols = lists:sort(maps:keys(DP)),
  151. Lines = [
  152. %% Print header:
  153. Cols
  154. %% Scan through rows:
  155. | [
  156. %% Scan throgh columns:
  157. [integer_to_binary(maps:get(X, maps:get(Col, DP), 0)) || Col <- Cols]
  158. || X <- lists:reverse(XX)
  159. ]
  160. ],
  161. lists:join(
  162. "\n",
  163. [lists:join(" ", Line) || Line <- Lines]
  164. ).
  165. dirsize(DB) ->
  166. RawOutput = os:cmd("cd " ++ dir(DB) ++ "; du -b --max-depth 1 ."),
  167. [
  168. begin
  169. [Sz, Dir] = string:lexemes(L, "\t"),
  170. {Dir, list_to_integer(Sz)}
  171. end
  172. || L <- string:lexemes(RawOutput, "\n")
  173. ].
  174. dir(DB) ->
  175. filename:join(emqx_ds_storage_layer:base_dir(), DB).
  176. store_batch(DB, PayloadSize, S0 = #s{n_messages = N, data_size = DS, payload_size = PS}) ->
  177. From = rand:bytes(16),
  178. BatchSize = 50,
  179. Batch = [
  180. #message{
  181. id = emqx_guid:gen(),
  182. timestamp = emqx_message:timestamp_now(),
  183. payload = rand:bytes(PayloadSize),
  184. from = From,
  185. topic = emqx_topic:join([
  186. <<"blah">>,
  187. <<"blah">>,
  188. '',
  189. <<"blah">>,
  190. From,
  191. <<"bazzzzzzzzzzzzzzzzzzzzzzz">>,
  192. integer_to_binary(I)
  193. ])
  194. }
  195. || I <- lists:seq(1, BatchSize)
  196. ],
  197. ok = emqx_ds:store_batch(DB, Batch, #{sync => true}),
  198. S0#s{
  199. n_messages = N + length(Batch),
  200. data_size = DS + lists:sum(lists:map(fun msg_size/1, Batch)),
  201. payload_size = PS + length(Batch) * PayloadSize
  202. }.
  203. %% We consider MQTT wire encoding to be "close to the ideal".
  204. msg_size(Msg = #message{}) ->
  205. iolist_size(emqx_frame:serialize(emqx_message:to_packet(undefined, Msg))).