Просмотр исходного кода

feat: clear monitor data using the HTTP API

zhongwencool 1 год назад
Родитель
Commit
ebcdb08586

+ 1 - 1
apps/emqx/priv/bpapi.versions

@@ -19,7 +19,7 @@
 {emqx_conf,3}.
 {emqx_conf,4}.
 {emqx_connector,1}.
-{emqx_dashboard,1}.
+{emqx_dashboard,2}.
 {emqx_delayed,1}.
 {emqx_delayed,2}.
 {emqx_delayed,3}.

+ 6 - 3
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -24,7 +24,7 @@
 
 -behaviour(gen_server).
 
--export([create_tables/0]).
+-export([create_tables/0, clear_table/0]).
 -export([start_link/0]).
 
 -export([
@@ -98,6 +98,9 @@ create_tables() ->
     ]),
     [?TAB].
 
+%% @doc Clear the monitor table `?TAB' by delegating to `mria:clear_table/1'.
+%% The result of that call is handed back to the caller unchanged.
+clear_table() ->
+    Tab = ?TAB,
+    mria:clear_table(Tab).
+
 %% -------------------------------------------------------------------------------------------------
 %% API
 
@@ -133,7 +136,7 @@ current_rate(Node) when Node == node() ->
             {ok, maps:merge(maps:from_list(Rate0), non_rate_value())}
     end;
 current_rate(Node) ->
-    case emqx_dashboard_proto_v1:current_rate(Node) of
+    case emqx_dashboard_proto_v2:current_rate(Node) of
         {badrpc, Reason} ->
             {badrpc, #{node => Node, reason => Reason}};
         {ok, Rate} ->
@@ -316,7 +319,7 @@ do_sample(all, Time) when is_integer(Time) ->
 do_sample(Node, Time) when Node == node() andalso is_integer(Time) ->
     do_sample_local(Time);
 do_sample(Node, Time) when is_integer(Time) ->
-    case emqx_dashboard_proto_v1:do_sample(Node, Time) of
+    case emqx_dashboard_proto_v2:do_sample(Node, Time) of
         {badrpc, Reason} ->
             {badrpc, #{node => Node, reason => Reason}};
         Res ->

+ 20 - 1
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -20,6 +20,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hocon_types.hrl").
 -include_lib("emqx_utils/include/emqx_utils_api.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -behaviour(minirest_api).
 
@@ -61,6 +62,13 @@ schema("/monitor") ->
                 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
                 400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
             }
+        },
+        delete => #{
+            tags => [<<"Metrics">>],
+            description => ?DESC(clear_monitor),
+            responses => #{
+                204 => <<"Metrics deleted">>
+            }
         }
     };
 schema("/monitor/nodes/:node") ->
@@ -148,7 +156,18 @@ fields_current(Names) ->
 monitor(get, #{query_string := QS, bindings := Bindings}) ->
     Latest = maps:get(<<"latest">>, QS, infinity),
     RawNode = maps:get(node, Bindings, <<"all">>),
-    emqx_utils_api:with_node_or_cluster(RawNode, dashboard_samplers_fun(Latest)).
+    emqx_utils_api:with_node_or_cluster(RawNode, dashboard_samplers_fun(Latest));
+%% DELETE: ask every running node to clear its monitor table. The request is
+%% best-effort: per-node failures are logged as a warning and the handler
+%% still answers 204 No Content.
+monitor(delete, _) ->
+    Nodes = emqx:running_nodes(),
+    Results = emqx_dashboard_proto_v2:clear_table(Nodes),
+    %% erpc:multicall/5 wraps every successful return value as {ok, Value}, so
+    %% a node that cleared its table reports {ok, {atomic, ok}}. Anything else
+    %% (an aborted transaction or an {error | exit | throw, Reason} tuple) is
+    %% a failure. Comparing against a bare {atomic, ok} would flag every node.
+    NodeErrors = [
+        {Node, Result}
+     || {Node, Result} <- lists:zip(Nodes, Results),
+        Result =/= {ok, {atomic, ok}}
+    ],
+    NodeErrors =:= [] orelse
+        ?SLOG(warning, #{
+            msg => "clear_monitor_metrics_rpc_errors",
+            errors => NodeErrors
+        }),
+    ?NO_CONTENT.
 
 dashboard_samplers_fun(Latest) ->
     fun(NodeOrCluster) ->

+ 4 - 0
apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v1.erl

@@ -20,6 +20,7 @@
 
 -export([
     introduced_in/0,
+    deprecated_since/0,
     do_sample/2,
     current_rate/1
 ]).
@@ -30,6 +31,9 @@
 %% Release string reported for this protocol module: "5.0.0".
 %% NOTE(review): presumably the release that first shipped v1 of this
 %% protocol; confirm against the bpapi tooling.
 introduced_in() ->
     "5.0.0".
 
+%% Release string from which this protocol version is marked deprecated.
+deprecated_since() -> "5.8.1".
+
 %% Run `emqx_dashboard_monitor:do_sample(Node, Latest)' on `Node' through
 %% `rpc:call/5', bounded by `?RPC_TIMEOUT'. Per the spec the result is either
 %% the list of sample maps or a badrpc term describing the RPC failure.
 -spec do_sample(node(), Latest :: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc().
 do_sample(Node, Latest) ->
     rpc:call(Node, emqx_dashboard_monitor, do_sample, [Node, Latest], ?RPC_TIMEOUT).

+ 43 - 0
apps/emqx_dashboard/src/proto/emqx_dashboard_proto_v2.erl

@@ -0,0 +1,43 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_dashboard_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    do_sample/2,
+    clear_table/1,
+    current_rate/1
+]).
+
+-include("emqx_dashboard.hrl").
+-include_lib("emqx/include/bpapi.hrl").
+
+%% Release string reported for this protocol version.
+introduced_in() ->
+    "5.8.1".
+
+%% Run `emqx_dashboard_monitor:do_sample(Node, Latest)' on `Node'.
+%%
+%% `rpc:call/5' is used rather than `erpc:call/5' on purpose: the spec and the
+%% callers in `emqx_dashboard_monitor' expect a failure to be *returned* as
+%% `{badrpc, Reason}', whereas `erpc:call/5' raises an `{erpc, Reason}' error.
+-spec do_sample(node(), Latest :: pos_integer() | infinity) -> list(map()) | emqx_rpc:badrpc().
+do_sample(Node, Latest) ->
+    rpc:call(Node, emqx_dashboard_monitor, do_sample, [Node, Latest], ?RPC_TIMEOUT).
+
+%% Call `emqx_dashboard_monitor:clear_table/0' on every node in `Nodes'.
+%%
+%% The result list is in the same order as `Nodes'. Each element is
+%% `{ok, Value}' for a call that returned `Value', or `{Class, Reason}' for a
+%% call that failed, as produced by `erpc:multicall/5'.
+-spec clear_table([node()]) -> [{ok, term()} | {error | exit | throw, term()}].
+clear_table(Nodes) ->
+    erpc:multicall(Nodes, emqx_dashboard_monitor, clear_table, [], ?RPC_TIMEOUT).
+
+%% Run `emqx_dashboard_monitor:current_rate(Node)' on `Node'.
+%%
+%% Uses `rpc:call/5' so that an RPC failure is returned as `{badrpc, Reason}',
+%% matching the spec and the caller's pattern match, instead of raising.
+-spec current_rate(node()) -> {ok, map()} | emqx_rpc:badrpc().
+current_rate(Node) ->
+    rpc:call(Node, emqx_dashboard_monitor, current_rate, [Node], ?RPC_TIMEOUT).

+ 16 - 6
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -405,8 +405,8 @@ t_handle_old_monitor_data(_Config) ->
 
     ok = meck:new(emqx, [passthrough, no_history]),
     ok = meck:expect(emqx, running_nodes, fun() -> [node(), 'other@node'] end),
-    ok = meck:new(emqx_dashboard_proto_v1, [passthrough, no_history]),
-    ok = meck:expect(emqx_dashboard_proto_v1, do_sample, fun('other@node', _Time) ->
+    ok = meck:new(emqx_dashboard_proto_v2, [passthrough, no_history]),
+    ok = meck:expect(emqx_dashboard_proto_v2, do_sample, fun('other@node', _Time) ->
         Self ! sample_called,
         FakeOldData
     end),
@@ -421,7 +421,7 @@ t_handle_old_monitor_data(_Config) ->
         hd(emqx_dashboard_monitor:samplers())
     ),
     ?assertReceive(sample_called, 1_000),
-    ok = meck:unload([emqx, emqx_dashboard_proto_v1]),
+    ok = meck:unload([emqx, emqx_dashboard_proto_v2]),
     ok.
 
 t_monitor_api(_) ->
@@ -583,6 +583,8 @@ t_monitor_reset(_) ->
         ),
     {ok, Samplers} = request(["monitor"], "latest=1"),
     ?assertEqual(1, erlang:length(Samplers)),
+    ok = delete(["monitor"]),
+    ?assertMatch({ok, []}, request(["monitor"], "latest=1")),
     ok.
 
 t_monitor_api_error(_) ->
@@ -666,7 +668,7 @@ t_persistent_session_stats(Config) ->
                 <<"connections">> := 3,
                 <<"disconnected_durable_sessions">> := 1,
                 %% N.B.: we currently don't perform any deduplication between persistent
-                %% and non-persistent routes, so we count `commont/topic' twice and get 8
+                %% and non-persistent routes, so we count `common/topic' twice and get 8
                 %% instead of 6 here.
                 <<"topics">> := 8,
                 <<"subscriptions">> := 8,
@@ -702,7 +704,7 @@ t_persistent_session_stats(Config) ->
                 <<"connections">> := 3,
                 <<"disconnected_durable_sessions">> := 2,
                 %% N.B.: we currently don't perform any deduplication between persistent
-                %% and non-persistent routes, so we count `commont/topic' twice and get 8
+                %% and non-persistent routes, so we count `common/topic' twice and get 8
                 %% instead of 6 here.
                 <<"topics">> := 8,
                 <<"subscriptions">> := 8,
@@ -712,7 +714,9 @@ t_persistent_session_stats(Config) ->
             ?ON(N1, request(["monitor_current"]))
         )
     end),
-
+    ?assertNotMatch({ok, []}, ?ON(N1, request(["monitor"]))),
+    ?assertMatch(ok, ?ON(N1, delete(["monitor"]))),
+    ?assertMatch({ok, []}, ?ON(N1, request(["monitor"]))),
     ok.
 
 %% Checks that we get consistent data when changing the requested time window for
@@ -842,6 +846,10 @@ get_req_cluster(Config, Path, QS) ->
 %% Build the base URL of an HTTP endpoint on the loopback address for the
 %% integer `Port'.
 host(Port) ->
     PortStr = integer_to_list(Port),
     "http://127.0.0.1:" ++ PortStr.
 
+%% Send an HTTP DELETE for `Path' (no query string) with the auth header
+%% attached, and return whatever `do_request_api/2' yields.
+delete(Path) ->
+    Request = {url(Path, ""), [auth_header_()]},
+    do_request_api(delete, Request).
+
+
 %% Two-argument shortcut for url/3 that uses `?SERVER' as the server.
 url(Parts, QS) ->
     Server = ?SERVER,
     url(Server, Parts, QS).
 
@@ -858,6 +866,8 @@ do_request_api(Method, Request) ->
     case httpc:request(Method, Request, [], []) of
         {error, socket_closed_remotely} ->
             {error, socket_closed_remotely};
+        {ok, {{"HTTP/1.1", 204, _}, _, _}} ->
+            ok;
         {ok, {{"HTTP/1.1", Code, _}, _, Return}} when
             Code >= 200 andalso Code =< 299
         ->

+ 1 - 1
apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl

@@ -239,7 +239,7 @@ transitions(Node, DB) ->
 %% Try to eliminate any ambiguity in the message representation.
 message_canonical_form(Msg0 = #message{}) ->
     message_canonical_form(emqx_message:to_map(Msg0));
-message_canonical_form(#{flags := Flags0, headers := Headers0, payload := Payload0} = Msg) ->
+message_canonical_form(#{flags := Flags0, headers := _Headers0, payload := Payload0} = Msg) ->
     %% Remove flags that are false:
     Flags = maps:filter(
         fun(_Key, Val) -> Val end,

+ 5 - 0
rel/i18n/emqx_dashboard_monitor_api.hocon

@@ -5,6 +5,11 @@ list_monitor.desc:
 list_monitor.label:
 """List cluster stats data"""
 
+clear_monitor.desc:
+"""Clear monitor (statistics) data for the whole cluster."""
+clear_monitor.label:
+"""Clear cluster stats data"""
+
 list_monitor_node.desc:
 """List the monitor (statistics) data on the specified node."""
 list_monitor_node.label: