- %%--------------------------------------------------------------------
- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- %% @doc This script can be loaded into a running EMQX EE node. It
- %% creates a number of DS databases with different options and fills
- %% them with data of a given size.
- %%
- %% It then measures the size of the database directories and creates
- %% a "storage (in)efficiency" report.
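- %%
- %% Usage sketch (an illustration, not part of the original script;
- %% it assumes the module is already compiled and loaded on the node
- %% and that gnuplot is available in the PATH). From the node's
- %% Erlang shell:
- %%
- %%   storage_efficiency:run(#{batches => 20, size => 100, add_generation => 5}).
- %%
- %% Options omitted from the map fall back to the defaults in run/1.
- %% To re-plot existing .dat files without touching the databases:
- %%
- %%   storage_efficiency:run(#{dry_run => true}).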
- -module(storage_efficiency).
- -include_lib("emqx_utils/include/emqx_message.hrl").
- %% API:
- -export([run/0, run/1]).
- %%================================================================================
- %% API functions
- %%================================================================================
- run() ->
- run(#{}).
- run(Custom) ->
- RunConf = maps:merge(
- #{
- %% Sleep between batches:
- sleep => 1_000,
- %% Don't run test, only plot data:
- dry_run => false,
- %% Payload size multiplier:
- size => 10,
- %% Number of batches:
- batches => 100,
- %% Add generation every N batches:
- add_generation => 10
- },
- Custom
- ),
- lists:foreach(
- fun(DBConf) ->
- run(DBConf, RunConf)
- end,
- configs()
- ).
- %% erlfmt-ignore
- gnuplot_script(Filename) ->
- "set terminal qt\n"
- %% "set logscale y 10\n"
- "set title \"" ++ filename:basename(Filename, ".dat") ++ "\"\n"
- "set key autotitle columnheader\n"
- "plot for [n=2:*] \"" ++ Filename ++ "\" using 1:n with linespoints".
- %%================================================================================
- %% Internal functions
- %%================================================================================
- configs() ->
- [
- {'benchmark-skipstream-asn1',
- db_conf({emqx_ds_storage_skipstream_lts, #{serialization_schema => asn1}})},
- {'benchmark-skipstream-v1',
- db_conf({emqx_ds_storage_skipstream_lts, #{serialization_schema => v1}})},
- {'benchmark-bitfield', db_conf({emqx_ds_storage_bitfield_lts, #{}})}
- ].
- db_conf(Storage) ->
- #{
- backend => builtin_local,
- %% n_sites => 1,
- n_shards => 1,
- %% replication_factor => 1,
- %% replication_options => #{},
- storage => Storage
- }.
- -record(s, {
- data_size = 0,
- payload_size = 0,
- n_messages = 0,
- datapoints = #{},
- x_axis = []
- }).
- run({DB, Config}, RunConf) ->
- #{
- batches := NBatches,
- size := PSMultiplier,
- add_generation := AddGeneration,
- sleep := Sleep,
- dry_run := DryRun
- } = RunConf,
- {ok, _} = application:ensure_all_started(emqx_ds_backends),
- Dir = dir(DB),
- Filename = atom_to_list(DB) ++ ".dat",
- DryRun orelse
- begin
- io:format(user, "Running benchmark for ~p in ~p~n", [DB, Dir]),
- %% Ensure safe directory:
- {match, _} = re:run(Dir, filename:join("data", DB)),
- %% Ensure clean state:
- ok = emqx_ds:open_db(DB, Config),
- ok = emqx_ds:drop_db(DB),
- ok = file:del_dir_r(Dir),
- %% Open a fresh DB:
- ok = emqx_ds:open_db(DB, Config),
- S = lists:foldl(
- fun(Batch, Acc0) ->
- Size = PSMultiplier * Batch,
- io:format(user, "Storing batch with payload size ~p~n", [Size]),
- Acc1 = store_batch(DB, Size, Acc0),
- %% Sleep so all data is hopefully flushed:
- timer:sleep(Sleep),
- (Batch rem AddGeneration) =:= 0 andalso
- emqx_ds:add_generation(DB),
- collect_datapoint(DB, Acc1)
- end,
- collect_datapoint(DB, #s{}),
- lists:seq(1, NBatches)
- ),
- {ok, FD} = file:open(Filename, [write]),
- io:put_chars(FD, print(S)),
- file:close(FD)
- end,
- os:cmd("echo '" ++ gnuplot_script(Filename) ++ "' | gnuplot --persist -"),
- ok.
- collect_datapoint(
- DB, S0 = #s{n_messages = N, data_size = DS, payload_size = PS, datapoints = DP0, x_axis = X}
- ) ->
- NewData = [{"$_n", N}, {"$data", DS}, {"$payloads", PS} | dirsize(DB)],
- DP = lists:foldl(
- fun({Key, Val}, Acc) ->
- maps:update_with(
- Key,
- fun(M) -> M#{N => Val} end,
- #{N => Val},
- Acc
- )
- end,
- DP0,
- NewData
- ),
- S0#s{
- datapoints = DP,
- x_axis = [N | X]
- }.
- print(#s{x_axis = XX, datapoints = DP}) ->
- Cols = lists:sort(maps:keys(DP)),
- Lines = [
- %% Print header:
- Cols
- %% Scan through rows:
- | [
- %% Scan through columns:
- [integer_to_binary(maps:get(X, maps:get(Col, DP), 0)) || Col <- Cols]
- || X <- lists:reverse(XX)
- ]
- ],
- lists:join(
- "\n",
- [lists:join(" ", Line) || Line <- Lines]
- ).
- dirsize(DB) ->
- RawOutput = os:cmd("cd " ++ dir(DB) ++ "; du -b --max-depth 1 ."),
- [
- begin
- [Sz, Dir] = string:lexemes(L, "\t"),
- {Dir, list_to_integer(Sz)}
- end
- || L <- string:lexemes(RawOutput, "\n")
- ].
- dir(DB) ->
- filename:join(emqx_ds_storage_layer:base_dir(), DB).
- store_batch(DB, PayloadSize, S0 = #s{n_messages = N, data_size = DS, payload_size = PS}) ->
- From = rand:bytes(16),
- BatchSize = 50,
- Batch = [
- #message{
- id = emqx_guid:gen(),
- timestamp = emqx_message:timestamp_now(),
- payload = rand:bytes(PayloadSize),
- from = From,
- topic = emqx_topic:join([
- <<"blah">>,
- <<"blah">>,
- '',
- <<"blah">>,
- From,
- <<"bazzzzzzzzzzzzzzzzzzzzzzz">>,
- integer_to_binary(I)
- ])
- }
- || I <- lists:seq(1, BatchSize)
- ],
- ok = emqx_ds:store_batch(DB, Batch, #{sync => true}),
- S0#s{
- n_messages = N + length(Batch),
- data_size = DS + lists:sum(lists:map(fun msg_size/1, Batch)),
- payload_size = PS + length(Batch) * PayloadSize
- }.
- %% We consider MQTT wire encoding to be "close to the ideal".
- msg_size(Msg = #message{}) ->
- iolist_size(emqx_frame:serialize(emqx_message:to_packet(undefined, Msg))).