|
|
@@ -51,7 +51,8 @@
|
|
|
|
|
|
%% Name of the ETS table read by this module; it is the module name itself.
-define(TAB, ?MODULE).
|
|
|
|
|
|
%% Time-unit helper macros. ?ONE_SECOND is 1_000, so the unit is presumably
%% milliseconds -- confirm against the callers. ?SECONDS is kept as an alias
%% of ?ONE_SECOND, and the minute/hour macros are built on top of it.
--define(SECONDS, 1_000).
|
|
|
+-define(ONE_SECOND, 1_000).
|
|
|
+-define(SECONDS, ?ONE_SECOND).
|
|
|
-define(ONE_MINUTE, 60 * ?SECONDS).
|
|
|
-define(MINUTES, ?ONE_MINUTE).
|
|
|
-define(ONE_HOUR, 60 * ?MINUTES).
|
|
|
@@ -204,17 +205,10 @@ do_call(Request) ->
|
|
|
gen_server:call(?MODULE, Request, 5000).
|
|
|
|
|
|
%% Cluster-wide sampling clause.
%% New behaviour (the `+` lines): sample the local node first, merge the
%% samples of every other running node into that result via sample_nodes/3,
%% then apply adjust_synthetic_cluster_metrics/1 to each value of the map.
%% The `-` lines are the previous implementation removed by this change.
%% NOTE(review): the removed code ran downsample/1 over the merged result;
%% the new code does not -- confirm that dropping it is intended.
do_sample(all, Time) ->
|
|
|
- AllNodes = emqx:running_nodes(),
|
|
|
- case [node()] =:= AllNodes of
|
|
|
- true ->
|
|
|
- %% micro-optimization to avoid downsample twice for single node
|
|
|
- do_sample(node(), Time);
|
|
|
- false ->
|
|
|
- %% downsample again after merged all data points from all nodes
|
|
|
- %% this is necessary because some nodes might be running older version
|
|
|
- %% which returns ALL data points.
|
|
|
- downsample(sample_nodes(AllNodes, Time, #{}))
|
|
|
- end;
|
|
|
%% Despite its name, AllNodes below holds only the *other* running nodes:
%% the local node is subtracted because it is sampled separately as Local.
+ AllNodes = emqx:running_nodes() -- [node()],
|
|
|
+ Local = do_sample(node(), Time),
|
|
|
+ All = sample_nodes(AllNodes, Time, Local),
|
|
|
+ maps:map(fun(_, S) -> adjust_synthetic_cluster_metrics(S) end, All);
|
|
|
do_sample(Node, Time) when Node == node() ->
|
|
|
MS = match_spec(Time),
|
|
|
FromDB = ets:select(?TAB, MS),
|
|
|
@@ -224,32 +218,36 @@ do_sample(Node, Time) when Node == node() ->
|
|
|
%% Remote-node clause: fetch the samples of Node through
%% emqx_dashboard_proto_v1:do_sample/2.
%% A {badrpc, Reason} reply is re-tagged with the node it came from; this
%% change switches that payload from the tuple {Node, Reason} to the map
%% #{node => Node, reason => Reason}. Any other reply is returned unchanged.
do_sample(Node, Time) ->
|
|
|
case emqx_dashboard_proto_v1:do_sample(Node, Time) of
|
|
|
{badrpc, Reason} ->
|
|
|
- {badrpc, {Node, Reason}};
|
|
|
+ {badrpc, #{node => Node, reason => Reason}};
|
|
|
Res ->
|
|
|
Res
|
|
|
end.
|
|
|
|
|
|
-sample_nodes([], _Time, Samples) ->
|
|
|
- maps:map(
|
|
|
- fun(_TS, Sample) -> adjust_synthetic_cluster_metrics(Sample) end,
|
|
|
- Samples
|
|
|
- );
|
|
|
-sample_nodes([Node | Nodes], Time, Res) ->
|
|
|
- case do_sample(Node, Time) of
|
|
|
- {badrpc, Reason} ->
|
|
|
- {badrpc, Reason};
|
|
|
- Samplers ->
|
|
|
- sample_nodes(Nodes, Time, merge_cluster_samplers(Samplers, Res))
|
|
|
- end.
|
|
|
%% Sample Nodes concurrently and fold every successful result into Local
%% with merge_samplers/2, returning the merged accumulator.
%% Results shaped {badrpc, _} are separated out and reported in a single
%% warning log entry; they are not merged and do not fail the call.
+sample_nodes(Nodes, Time, Local) ->
|
|
|
+ ResList = concurrently_sample_nodes(Nodes, Time),
|
|
|
+ {Failed, Success} = lists:partition(
|
|
|
+ fun
|
|
|
+ ({badrpc, _}) -> true;
|
|
|
+ (_) -> false
|
|
|
+ end,
|
|
|
+ ResList
|
|
|
+ ),
|
|
|
%% Log only when at least one node failed; the value of this expression
%% is discarded.
+ Failed =/= [] andalso
|
|
|
+ ?SLOG(warning, #{msg => "failed_to_sample_monitor_data", errors => Failed}),
|
|
|
+ lists:foldl(fun merge_samplers/2, Local, Success).
|
|
|
+
|
|
|
%% Call do_sample/2 for every node in Nodes through emqx_utils:pmap/3 and
%% return the list of per-node results.
%% NOTE(review): how emqx_utils:pmap/3 reports its own timeout is not visible
%% here -- confirm that sample_nodes/3 copes with it.
+concurrently_sample_nodes(Nodes, Time) ->
|
|
|
+ %% emqx_dashboard_proto_v1:do_sample has a timeout (5s), so give pmap ?RPC_TIMEOUT plus one extra second (presumably so the RPC times out first -- confirm)
|
|
|
+ Timeout = ?RPC_TIMEOUT + ?ONE_SECOND,
|
|
|
+ %% call emqx_utils:pmap here instead of a rpc multicall
|
|
|
+ %% to avoid having to introduce a new bpapi proto version
|
|
|
+ emqx_utils:pmap(fun(Node) -> do_sample(Node, Time) end, Nodes, Timeout).
|
|
|
|
|
|
%% Build the match specification passed to ets:select/2 on ?TAB.
%% `infinity` matches every object in the table and returns it whole.
%% Otherwise only 3-tuples whose second element is >= Time are returned
%% (as whole objects); that element is presumably the sample timestamp --
%% confirm against the table writer.
match_spec(infinity) ->
|
|
|
[{'$1', [], ['$1']}];
|
|
|
match_spec(Time) ->
|
|
|
[{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
|
|
|
|
|
|
-merge_cluster_samplers(NodeSamples, Cluster) ->
|
|
|
- merge_samplers(NodeSamples, Cluster).
|
|
|
-
|
|
|
%% Fold every entry of the map Increment into Base using
%% merge_samplers_loop/3 and return the resulting accumulator.
%% The argument order (Increment, Base) matches the (Elem, Acc) order that
%% lists:foldl/3 uses when this function is passed as `fun merge_samplers/2`.
merge_samplers(Increment, Base) ->
|
|
|
maps:fold(fun merge_samplers_loop/3, Base, Increment).
|
|
|
|