|
@@ -40,11 +40,14 @@
|
|
|
-export([
|
|
-export([
|
|
|
samplers/0,
|
|
samplers/0,
|
|
|
samplers/2,
|
|
samplers/2,
|
|
|
- current_rate/0,
|
|
|
|
|
current_rate/1,
|
|
current_rate/1,
|
|
|
granularity_adapter/1
|
|
granularity_adapter/1
|
|
|
]).
|
|
]).
|
|
|
|
|
|
|
|
|
|
+-ifdef(TEST).
|
|
|
|
|
+-export([current_rate_cluster/0]).
|
|
|
|
|
+-endif.
|
|
|
|
|
+
|
|
|
%% for rpc
|
|
%% for rpc
|
|
|
-export([do_sample/2]).
|
|
-export([do_sample/2]).
|
|
|
|
|
|
|
@@ -112,29 +115,8 @@ granularity_adapter(List) when length(List) > 1000 ->
|
|
|
granularity_adapter(List) ->
|
|
granularity_adapter(List) ->
|
|
|
List.
|
|
List.
|
|
|
|
|
|
|
|
-%% Get the current rate. Not the current sampler data.
|
|
|
|
|
-current_rate() ->
|
|
|
|
|
- Fun =
|
|
|
|
|
- fun
|
|
|
|
|
- (Node, Cluster) when is_map(Cluster) ->
|
|
|
|
|
- case current_rate(Node) of
|
|
|
|
|
- {ok, CurrentRate} ->
|
|
|
|
|
- merge_cluster_rate(CurrentRate, Cluster);
|
|
|
|
|
- {badrpc, Reason} ->
|
|
|
|
|
- {badrpc, {Node, Reason}}
|
|
|
|
|
- end;
|
|
|
|
|
- (_Node, Error) ->
|
|
|
|
|
- Error
|
|
|
|
|
- end,
|
|
|
|
|
- case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
|
|
|
|
|
- {badrpc, Reason} ->
|
|
|
|
|
- {badrpc, Reason};
|
|
|
|
|
- Rate ->
|
|
|
|
|
- {ok, Rate}
|
|
|
|
|
- end.
|
|
|
|
|
-
|
|
|
|
|
current_rate(all) ->
|
|
current_rate(all) ->
|
|
|
- current_rate();
|
|
|
|
|
|
|
+ current_rate_cluster();
|
|
|
current_rate(Node) when Node == node() ->
|
|
current_rate(Node) when Node == node() ->
|
|
|
try
|
|
try
|
|
|
{ok, Rate} = do_call(current_rate),
|
|
{ok, Rate} = do_call(current_rate),
|
|
@@ -148,7 +130,7 @@ current_rate(Node) when Node == node() ->
|
|
|
{Key, 0}
|
|
{Key, 0}
|
|
|
|| Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)
|
|
|| Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)
|
|
|
],
|
|
],
|
|
|
- {ok, maps:from_list(Rate0)}
|
|
|
|
|
|
|
+ {ok, maps:merge(maps:from_list(Rate0), non_rate_value())}
|
|
|
end;
|
|
end;
|
|
|
current_rate(Node) ->
|
|
current_rate(Node) ->
|
|
|
case emqx_dashboard_proto_v1:current_rate(Node) of
|
|
case emqx_dashboard_proto_v1:current_rate(Node) of
|
|
@@ -158,6 +140,27 @@ current_rate(Node) ->
|
|
|
{ok, Rate}
|
|
{ok, Rate}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+%% Get the current rate. Not the current sampler data.
|
|
|
|
|
+current_rate_cluster() ->
|
|
|
|
|
+ Fun =
|
|
|
|
|
+ fun
|
|
|
|
|
+ (Node, Cluster) when is_map(Cluster) ->
|
|
|
|
|
+ case current_rate(Node) of
|
|
|
|
|
+ {ok, CurrentRate} ->
|
|
|
|
|
+ merge_cluster_rate(CurrentRate, Cluster);
|
|
|
|
|
+ {badrpc, Reason} ->
|
|
|
|
|
+ {badrpc, {Node, Reason}}
|
|
|
|
|
+ end;
|
|
|
|
|
+ (_Node, Error) ->
|
|
|
|
|
+ Error
|
|
|
|
|
+ end,
|
|
|
|
|
+ case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
|
|
|
|
|
+ {badrpc, Reason} ->
|
|
|
|
|
+ {badrpc, Reason};
|
|
|
|
|
+ Rate ->
|
|
|
|
|
+ {ok, Rate}
|
|
|
|
|
+ end.
|
|
|
|
|
+
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% gen_server functions
|
|
%% gen_server functions
|
|
|
|
|
|
|
@@ -258,8 +261,13 @@ merge_cluster_sampler_map(M1, M2) ->
|
|
|
merge_cluster_rate(Node, Cluster) ->
|
|
merge_cluster_rate(Node, Cluster) ->
|
|
|
Fun =
|
|
Fun =
|
|
|
fun
|
|
fun
|
|
|
- (topics, Value, NCluster) ->
|
|
|
|
|
- NCluster#{topics => Value};
|
|
|
|
|
|
|
+ %% cluster-synced values
|
|
|
|
|
+ (topics, V, NCluster) ->
|
|
|
|
|
+ NCluster#{topics => V};
|
|
|
|
|
+ (retained_msg_count, V, NCluster) ->
|
|
|
|
|
+ NCluster#{retained_msg_count => V};
|
|
|
|
|
+ (license_quota, V, NCluster) ->
|
|
|
|
|
+ NCluster#{license_quota => V};
|
|
|
(Key, Value, NCluster) ->
|
|
(Key, Value, NCluster) ->
|
|
|
ClusterValue = maps:get(Key, NCluster, 0),
|
|
ClusterValue = maps:get(Key, NCluster, 0),
|
|
|
NCluster#{Key => Value + ClusterValue}
|
|
NCluster#{Key => Value + ClusterValue}
|