|
|
@@ -222,8 +222,11 @@ do_sample(Node, Time) ->
|
|
|
Res
|
|
|
end.
|
|
|
|
|
|
-do_sample([], _Time, Res) ->
|
|
|
- Res;
|
|
|
+do_sample([], _Time, Samples) ->
|
|
|
+ maps:map(
|
|
|
+ fun(_TS, Sample) -> adjust_synthetic_cluster_metrics(Sample) end,
|
|
|
+ Samples
|
|
|
+ );
|
|
|
do_sample([Node | Nodes], Time, Res) ->
|
|
|
case do_sample(Node, Time) of
|
|
|
{badrpc, Reason} ->
|
|
|
@@ -237,22 +240,28 @@ match_spec(infinity) ->
|
|
|
match_spec(Time) ->
|
|
|
[{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
|
|
|
|
|
|
-merge_cluster_samplers(Node, Cluster) ->
|
|
|
- maps:fold(fun merge_cluster_samplers/3, Cluster, Node).
|
|
|
+merge_cluster_samplers(NodeSamples, Cluster) ->
|
|
|
+ maps:fold(fun merge_cluster_samplers/3, Cluster, NodeSamples).
|
|
|
|
|
|
-merge_cluster_samplers(TS, NodeData, Cluster) ->
|
|
|
+merge_cluster_samplers(TS, NodeSample0, Cluster) ->
|
|
|
+ NodeSample = adjust_individual_node_metrics(NodeSample0),
|
|
|
case maps:get(TS, Cluster, undefined) of
|
|
|
undefined ->
|
|
|
- Cluster#{TS => NodeData};
|
|
|
- ClusterData ->
|
|
|
- Cluster#{TS => merge_cluster_sampler_map(NodeData, ClusterData)}
|
|
|
+ Cluster#{TS => NodeSample};
|
|
|
+ ClusterSample ->
|
|
|
+ Cluster#{TS => merge_cluster_sampler_map(NodeSample, ClusterSample)}
|
|
|
end.
|
|
|
|
|
|
merge_cluster_sampler_map(M1, M2) ->
|
|
|
Fun =
|
|
|
fun
|
|
|
- (topics, Map) ->
|
|
|
- Map#{topics => maps:get(topics, M1)};
|
|
|
+ (Key, Map) when
|
|
|
+ %% cluster-synced values
|
|
|
+ Key =:= topics;
|
|
|
+ Key =:= subscriptions_durable;
|
|
|
+ Key =:= disconnected_durable_sessions
|
|
|
+ ->
|
|
|
+ Map#{Key => maps:get(Key, M1)};
|
|
|
(Key, Map) ->
|
|
|
Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)}
|
|
|
end,
|