فهرست منبع

Merge pull request #13116 from keynslug/fix/sessds/monitor-api-metrics

fix(monitor): reflect durable subscriptions in historical metrics
Andrew Mayorov 1 سال پیش
والد
کامیت
2908da4a8a

+ 7 - 2
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -73,14 +73,14 @@
 
 -define(GAUGE_SAMPLER_LIST, [
     disconnected_durable_sessions,
-    durable_subscriptions,
+    subscriptions_durable,
     subscriptions,
     topics,
     connections,
     live_connections
 ]).
 
--define(SAMPLER_LIST, ?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST).
+-define(SAMPLER_LIST, (?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST)).
 
 -define(DELTA_SAMPLER_RATE_MAP, #{
     received => received_msg_rate,
@@ -102,6 +102,11 @@
     ] ++ ?LICENSE_QUOTA
 ).
 
+-define(CLUSTERONLY_SAMPLER_LIST, [
+    subscriptions_durable,
+    disconnected_durable_sessions
+]).
+
 -if(?EMQX_RELEASE_EDITION == ee).
 -define(LICENSE_QUOTA, [license_quota]).
 -else.

+ 20 - 17
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -118,8 +118,7 @@ current_rate(all) ->
     current_rate_cluster();
 current_rate(Node) when Node == node() ->
     try
-        {ok, Rate} = do_call(current_rate),
-        {ok, adjust_individual_node_metrics(Rate)}
+        do_call(current_rate)
     catch
         _E:R ->
             ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
@@ -222,8 +221,11 @@ do_sample(Node, Time) ->
             Res
     end.
 
-do_sample([], _Time, Res) ->
-    Res;
+do_sample([], _Time, Samples) ->
+    maps:map(
+        fun(_TS, Sample) -> adjust_synthetic_cluster_metrics(Sample) end,
+        Samples
+    );
 do_sample([Node | Nodes], Time, Res) ->
     case do_sample(Node, Time) of
         {badrpc, Reason} ->
@@ -237,22 +239,27 @@ match_spec(infinity) ->
 match_spec(Time) ->
     [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
 
-merge_cluster_samplers(Node, Cluster) ->
-    maps:fold(fun merge_cluster_samplers/3, Cluster, Node).
+merge_cluster_samplers(NodeSamples, Cluster) ->
+    maps:fold(fun merge_cluster_samplers/3, Cluster, NodeSamples).
 
-merge_cluster_samplers(TS, NodeData, Cluster) ->
+merge_cluster_samplers(TS, NodeSample, Cluster) ->
     case maps:get(TS, Cluster, undefined) of
         undefined ->
-            Cluster#{TS => NodeData};
-        ClusterData ->
-            Cluster#{TS => merge_cluster_sampler_map(NodeData, ClusterData)}
+            Cluster#{TS => NodeSample};
+        ClusterSample ->
+            Cluster#{TS => merge_cluster_sampler_map(NodeSample, ClusterSample)}
     end.
 
 merge_cluster_sampler_map(M1, M2) ->
     Fun =
         fun
-            (topics, Map) ->
-                Map#{topics => maps:get(topics, M1)};
+            (Key, Map) when
+                %% cluster-synced values
+                Key =:= topics;
+                Key =:= subscriptions_durable;
+                Key =:= disconnected_durable_sessions
+            ->
+                Map#{Key => maps:get(Key, M1)};
             (Key, Map) ->
                 Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)}
         end,
@@ -283,10 +290,6 @@ merge_cluster_rate(Node, Cluster) ->
         end,
     maps:fold(Fun, Cluster, Node).
 
-adjust_individual_node_metrics(Metrics0) ->
-    %% ensure renamed
-    emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0).
-
 adjust_synthetic_cluster_metrics(Metrics0) ->
     DSSubs = maps:get(subscriptions_durable, Metrics0, 0),
     RamSubs = maps:get(subscriptions, Metrics0, 0),
@@ -445,7 +448,7 @@ stats(connections) ->
     emqx_stats:getstat('connections.count');
 stats(disconnected_durable_sessions) ->
     emqx_persistent_session_bookkeeper:get_disconnected_session_count();
-stats(durable_subscriptions) ->
+stats(subscriptions_durable) ->
     emqx_stats:getstat('durable_subscriptions.count');
 stats(live_connections) ->
     emqx_stats:getstat('live_connections.count');

+ 3 - 8
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -202,11 +202,10 @@ swagger_desc(persisted) ->
     swagger_desc_format("Messages saved to the durable storage ");
 swagger_desc(disconnected_durable_sessions) ->
     <<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
-swagger_desc(durable_subscriptions) ->
+swagger_desc(subscriptions_durable) ->
     <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(subscriptions) ->
-    <<"Subscriptions at the time of sampling (not considering durable sessions).",
-        ?APPROXIMATE_DESC>>;
+    <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(topics) ->
     <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
 swagger_desc(connections) ->
@@ -252,8 +251,4 @@ swagger_desc_format(Format, Type) ->
 maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
     Rates;
 maybe_reject_cluster_only_metrics(_Node, Rates) ->
-    ClusterOnlyMetrics = [
-        subscriptions_durable,
-        disconnected_durable_sessions
-    ],
-    maps:without(ClusterOnlyMetrics, Rates).
+    maps:without(?CLUSTERONLY_SAMPLER_LIST, Rates).

+ 24 - 6
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -224,13 +224,16 @@ t_monitor_current_api(_) ->
     ],
     ?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
     ?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)),
-    ClusterOnlyMetrics = [durable_subscriptions, disconnected_durable_sessions],
     {ok, NodeRate} = request(["monitor_current", "nodes", node()]),
-    [
-        ?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate), #{key => Key, rates => NodeRate})
-     || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
-        not lists:member(Key, ClusterOnlyMetrics)
-    ],
+    ExpectedKeys = lists:map(
+        fun atom_to_binary/1,
+        (?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)) -- ?CLUSTERONLY_SAMPLER_LIST
+    ),
+    ?assertEqual(
+        [],
+        ExpectedKeys -- maps:keys(NodeRate),
+        NodeRate
+    ),
     ?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)),
     ?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)),
     ?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)),
@@ -426,6 +429,21 @@ t_persistent_session_stats(Config) ->
             ?ON(N1, request(["monitor_current"]))
         )
     end),
+    %% Verify that historical metrics are in line with the current ones.
+    ?assertMatch(
+        {ok, [
+            #{
+                <<"time_stamp">> := _,
+                <<"connections">> := 3,
+                <<"disconnected_durable_sessions">> := 1,
+                <<"topics">> := 8,
+                <<"subscriptions">> := 8,
+                <<"subscriptions_ram">> := 4,
+                <<"subscriptions_durable">> := 4
+            }
+        ]},
+        ?ON(N1, request(["monitor"], "latest=1"))
+    ),
     {ok, {ok, _}} =
         ?wait_async_action(
             emqtt:disconnect(PSClient2),