|
@@ -18,32 +18,83 @@
|
|
|
|
|
|
|
|
-ifdef(EMQX_BENCHMARK).
|
|
-ifdef(EMQX_BENCHMARK).
|
|
|
|
|
|
|
|
--export([start/1, run1/0, run1/2]).
|
|
|
|
|
|
|
+-export([run/1, run1/0, run1/4]).
|
|
|
|
|
|
|
|
-run1() -> run1(4, 1000).
|
|
|
|
|
|
|
+-define(T(Expr), timer:tc(fun() -> Expr end)).
|
|
|
|
|
|
|
|
-run1(Factor, Limit) ->
|
|
|
|
|
- start(#{factor => Factor,
|
|
|
|
|
- limit => Limit,
|
|
|
|
|
- sub_ptn => <<"device/{{id}}/+/{{num}}/#">>,
|
|
|
|
|
- pub_ptn => <<"device/{{id}}/xays/{{num}}/foo/bar/baz">>}).
|
|
|
|
|
|
|
+run1() -> run1(80, 1000, 80, 10000).
|
|
|
|
|
+
|
|
|
|
|
+run1(Subs, SubOps, Pubs, PubOps) ->
|
|
|
|
|
+ run(#{subscribers => Subs,
|
|
|
|
|
+ publishers => Pubs,
|
|
|
|
|
+ sub_ops => SubOps,
|
|
|
|
|
+ pub_ops => PubOps,
|
|
|
|
|
+ sub_ptn => <<"device/{{id}}/+/{{num}}/#">>,
|
|
|
|
|
+ pub_ptn => <<"device/{{id}}/foo/{{num}}/bar">>
|
|
|
|
|
+ }).
|
|
|
|
|
|
|
|
%% setting fields:
|
|
%% setting fields:
|
|
|
-%% - factor: spawn broker-pool-size * factor number of callers
|
|
|
|
|
-%% - limit: limit the total number of topics for each caller
|
|
|
|
|
|
|
+%% - subscribers: spawn this number of subscriber workers
|
|
|
|
|
+%% - publishers: spawn this number of publisher workers
|
|
|
|
|
+%% - sub_ops: the number of subscribes (route insert) each subscriber runs
|
|
|
|
|
+%% - pub_ops: the number of publish (route lookups) each publisher runs
|
|
|
%% - sub_ptn: subscribe topic pattern like a/+/b/+/c/#
|
|
%% - sub_ptn: subscribe topic pattern like a/+/b/+/c/#
|
|
|
%% or a/+/{{id}}/{{num}}/# to generate pattern with {{id}}
|
|
%% or a/+/{{id}}/{{num}}/# to generate pattern with {{id}}
|
|
|
%% replaced by worker id and {{num}} replaced by topic number.
|
|
%% replaced by worker id and {{num}} replaced by topic number.
|
|
|
%% - pub_ptn: topic pattern used to benchmark publish (match) performance
|
|
%% - pub_ptn: topic pattern used to benchmark publish (match) performance
|
|
|
%% e.g. a/x/{{id}}/{{num}}/foo/bar
|
|
%% e.g. a/x/{{id}}/{{num}}/foo/bar
|
|
|
-start(#{factor := Factor} = Settings) ->
|
|
|
|
|
- BrokerPoolSize = emqx_vm:schedulers() * 2,
|
|
|
|
|
- Pids = start_callers(BrokerPoolSize * Factor, Settings),
|
|
|
|
|
- R = collect_results(Pids, #{subscribe => 0, match => 0}),
|
|
|
|
|
|
|
+run(#{subscribers := Subs,
|
|
|
|
|
+ publishers := Pubs,
|
|
|
|
|
+ sub_ops := SubOps,
|
|
|
|
|
+ pub_ops := PubOps
|
|
|
|
|
+ } = Settings) ->
|
|
|
|
|
+ SubsPids = start_callers(Subs, fun start_subscriber/1, Settings),
|
|
|
|
|
+ PubsPids = start_callers(Pubs, fun start_publisher/1, Settings),
|
|
|
|
|
+ _ = collect_results(SubsPids, subscriber_ready),
|
|
|
|
|
+ io:format(user, "subscribe ...~n", []),
|
|
|
|
|
+ {T1, SubsTime} =
|
|
|
|
|
+ ?T(begin
|
|
|
|
|
+ lists:foreach(fun(Pid) -> Pid ! start_subscribe end, SubsPids),
|
|
|
|
|
+ collect_results(SubsPids, subscribe_time)
|
|
|
|
|
+ end),
|
|
|
|
|
+ io:format(user, "InsertTotalTime: ~s~n", [ns(T1)]),
|
|
|
|
|
+ io:format(user, "InsertTimeAverage: ~s~n", [ns(SubsTime / Subs)]),
|
|
|
|
|
+ io:format(user, "InsertRps: ~p~n", [rps(Subs * SubOps, T1)]),
|
|
|
|
|
+
|
|
|
|
|
+ io:format(user, "lookup ...~n", []),
|
|
|
|
|
+ {T2, PubsTime} =
|
|
|
|
|
+ ?T(begin
|
|
|
|
|
+ lists:foreach(fun(Pid) -> Pid ! start_lookup end, PubsPids),
|
|
|
|
|
+ collect_results(PubsPids, lookup_time)
|
|
|
|
|
+ end),
|
|
|
|
|
+ io:format(user, "LookupTotalTime: ~s~n", [ns(T2)]),
|
|
|
|
|
+ io:format(user, "LookupTimeAverage: ~s~n", [ns(PubsTime / Pubs)]),
|
|
|
|
|
+ io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]),
|
|
|
|
|
+
|
|
|
io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]),
|
|
io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]),
|
|
|
- io:format(user, "~p~n", [erlang:memory()]),
|
|
|
|
|
- io:format(user, "~p~n", [R]),
|
|
|
|
|
- lists:foreach(fun(Pid) -> Pid ! stop end, Pids).
|
|
|
|
|
|
|
+ io:format(user, "erlang memory: ~p~n", [erlang:memory()]),
|
|
|
|
|
+
|
|
|
|
|
+ io:format(user, "unsubscribe ...~n", []),
|
|
|
|
|
+ {T3, ok} =
|
|
|
|
|
+ ?T(begin
|
|
|
|
|
+ lists:foreach(fun(Pid) -> Pid ! stop end, SubsPids),
|
|
|
|
|
+ wait_until_empty()
|
|
|
|
|
+ end),
|
|
|
|
|
+ io:format(user, "TimeToUnsubscribeAll: ~s~n", [ns(T3)]).
|
|
|
|
|
+
|
|
|
|
|
+wait_until_empty() ->
|
|
|
|
|
+ case emqx_trie:empty() of
|
|
|
|
|
+ true -> ok;
|
|
|
|
|
+ false ->
|
|
|
|
|
+ timer:sleep(5),
|
|
|
|
|
+ wait_until_empty()
|
|
|
|
|
+ end.
|
|
|
|
|
+
|
|
|
|
|
+rps(N, NanoSec) -> N * 1_000_000 / NanoSec.
|
|
|
|
|
+
|
|
|
|
|
+ns(T) when T > 1_000_000 -> io_lib:format("~p(s)", [T / 1_000_000]);
|
|
|
|
|
+ns(T) when T > 1_000 -> io_lib:format("~p(ms)", [T / 1_000]);
|
|
|
|
|
+ns(T) -> io_lib:format("~p(ns)", [T]).
|
|
|
|
|
|
|
|
ram_bytes() ->
|
|
ram_bytes() ->
|
|
|
Wordsize = erlang:system_info(wordsize),
|
|
Wordsize = erlang:system_info(wordsize),
|
|
@@ -56,48 +107,69 @@ ram_bytes() ->
|
|
|
0
|
|
0
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-start_callers(0, _) -> [];
|
|
|
|
|
-start_callers(N, Settings) ->
|
|
|
|
|
- [start_caller(Settings#{id => N}) | start_callers(N - 1, Settings)].
|
|
|
|
|
|
|
+start_callers(N, F, Settings) ->
|
|
|
|
|
+ start_callers(N, F, Settings, []).
|
|
|
|
|
+
|
|
|
|
|
+start_callers(0, _F, _Settings, Acc) ->
|
|
|
|
|
+ lists:reverse(Acc);
|
|
|
|
|
+start_callers(N, F, Settings, Acc) ->
|
|
|
|
|
+ start_callers(N - 1, F, Settings, [F(Settings#{id => N}) | Acc]).
|
|
|
|
|
|
|
|
-collect_results([], R) -> R;
|
|
|
|
|
-collect_results([Pid | Pids], Acc = #{subscribe := Sr, match := Mr}) ->
|
|
|
|
|
|
|
+collect_results(Pids, Tag) ->
|
|
|
|
|
+ collect_results(Pids, Tag, 0).
|
|
|
|
|
+
|
|
|
|
|
+collect_results([], _Tag, R) -> R;
|
|
|
|
|
+collect_results([Pid | Pids], Tag, R) ->
|
|
|
receive
|
|
receive
|
|
|
- {Pid, #{subscribe := Srd, match := Mrd}} ->
|
|
|
|
|
- collect_results(Pids, Acc#{subscribe := Sr + Srd, match := Mr + Mrd})
|
|
|
|
|
|
|
+ {Pid, Tag, N} ->
|
|
|
|
|
+ collect_results(Pids, Tag, N + R)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-%% ops per second
|
|
|
|
|
-rps(T, N) -> round(N / (T / 1000000)).
|
|
|
|
|
-
|
|
|
|
|
-start_caller(#{id := Id, limit := N, sub_ptn := SubPtn, pub_ptn := PubPtn}) ->
|
|
|
|
|
|
|
+start_subscriber(#{id := Id, sub_ops := N, sub_ptn := SubPtn}) ->
|
|
|
Parent = self(),
|
|
Parent = self(),
|
|
|
proc_lib:spawn_link(
|
|
proc_lib:spawn_link(
|
|
|
fun() ->
|
|
fun() ->
|
|
|
SubTopics = make_topics(SubPtn, Id, N),
|
|
SubTopics = make_topics(SubPtn, Id, N),
|
|
|
- {Ts, _} = timer:tc(fun() -> subscribe(SubTopics) end),
|
|
|
|
|
- PubTopics = make_topics(PubPtn, Id, N),
|
|
|
|
|
- {Tm, _} = timer:tc(fun() -> match(PubTopics) end),
|
|
|
|
|
- _ = erlang:send(Parent, {self(), #{subscribe => rps(Ts, N), match => rps(Tm, N)}}),
|
|
|
|
|
|
|
+ Parent ! {self(), subscriber_ready, 0},
|
|
|
|
|
+ receive
|
|
|
|
|
+ start_subscribe ->
|
|
|
|
|
+ ok
|
|
|
|
|
+ end,
|
|
|
|
|
+ {Ts, _} = ?T(subscribe(SubTopics)),
|
|
|
|
|
+ _ = erlang:send(Parent, {self(), subscribe_time, Ts/ N}),
|
|
|
|
|
+ %% subscribers should not exit before publish test is done
|
|
|
receive
|
|
receive
|
|
|
stop ->
|
|
stop ->
|
|
|
ok
|
|
ok
|
|
|
end
|
|
end
|
|
|
end).
|
|
end).
|
|
|
|
|
|
|
|
-match([]) -> ok;
|
|
|
|
|
-match([Topic | Topics]) ->
|
|
|
|
|
- _ = emqx_router:lookup_routes(Topic),
|
|
|
|
|
- match(Topics).
|
|
|
|
|
|
|
+start_publisher(#{id := Id, pub_ops := N, pub_ptn := PubPtn, subscribers := Subs}) ->
|
|
|
|
|
+ Parent = self(),
|
|
|
|
|
+ proc_lib:spawn_link(
|
|
|
|
|
+ fun() ->
|
|
|
|
|
+ L = lists:seq(1, N),
|
|
|
|
|
+ [Topic] = make_topics(PubPtn, (Id rem Subs) + 1, 1),
|
|
|
|
|
+ receive
|
|
|
|
|
+ start_lookup ->
|
|
|
|
|
+ ok
|
|
|
|
|
+ end,
|
|
|
|
|
+ {Tm, ok} = ?T(lists:foreach(fun(_) -> match(Topic) end, L)),
|
|
|
|
|
+ _ = erlang:send(Parent, {self(), lookup_time, Tm / N}),
|
|
|
|
|
+ ok
|
|
|
|
|
+ end).
|
|
|
|
|
+
|
|
|
|
|
+match(Topic) ->
|
|
|
|
|
+ [_] = emqx_router:match_routes(Topic).
|
|
|
|
|
|
|
|
subscribe([]) -> ok;
|
|
subscribe([]) -> ok;
|
|
|
subscribe([Topic | Rest]) ->
|
|
subscribe([Topic | Rest]) ->
|
|
|
ok = emqx_broker:subscribe(Topic),
|
|
ok = emqx_broker:subscribe(Topic),
|
|
|
subscribe(Rest).
|
|
subscribe(Rest).
|
|
|
|
|
|
|
|
-make_topics(SubPtn0, Id, Limit) ->
|
|
|
|
|
- SubPtn = emqx_topic:words(SubPtn0),
|
|
|
|
|
- F = fun(N) -> render(Id, N, SubPtn) end,
|
|
|
|
|
|
|
+make_topics(Ptn0, Id, Limit) ->
|
|
|
|
|
+ Ptn = emqx_topic:words(Ptn0),
|
|
|
|
|
+ F = fun(N) -> render(Id, N, Ptn) end,
|
|
|
lists:map(F, lists:seq(1, Limit)).
|
|
lists:map(F, lists:seq(1, Limit)).
|
|
|
|
|
|
|
|
render(ID, N, Ptn) ->
|
|
render(ID, N, Ptn) ->
|