Просмотр исходного кода

feat: add emqx_rpc:multicall_on_running

also move emqx:is_running multicall to emqx_proto_v2:are_running
Ivan Dyachkov 2 лет назад
Родитель
Сommit
9d1a16aae1

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -1,5 +1,6 @@
 %% This file is automatically generated by `make static_checks`, do not edit.
 {emqx,1}.
+{emqx,2}.
 {emqx_authn,1}.
 {emqx_authz,1}.
 {emqx_bridge,1}.

+ 13 - 0
apps/emqx/src/emqx_rpc.erl

@@ -27,6 +27,8 @@
     cast/5,
     multicall/4,
     multicall/5,
+    multicall_on_running/5,
+    on_running/3,
 
     unwrap_erpc/1
 ]).
@@ -91,6 +93,17 @@ multicall(Nodes, Mod, Fun, Args) ->
 multicall(Key, Nodes, Mod, Fun, Args) ->
     gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args).
 
+-spec multicall_on_running([node()], module(), atom(), list(), timeout()) -> [term() | {error, _}].
+multicall_on_running(Nodes, Mod, Fun, Args, Timeout) ->
+    unwrap_erpc(erpc:multicall(Nodes, emqx_rpc, on_running, [Mod, Fun, Args], Timeout)).
+
+-spec on_running(module(), atom(), list()) -> term().
+on_running(Mod, Fun, Args) ->
+    case emqx:is_running() of
+        true -> apply(Mod, Fun, Args);
+        false -> error(emqx_down)
+    end.
+
 -spec cast(node(), module(), atom(), list()) -> cast_result().
 cast(Node, Mod, Fun, Args) ->
     %% Note: using a non-ordered cast here, since the generated key is

+ 86 - 0
apps/emqx/src/proto/emqx_proto_v2.erl

@@ -0,0 +1,86 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-include("bpapi.hrl").
+
+-export([
+    introduced_in/0,
+
+    are_running/1,
+    is_running/1,
+
+    get_alarms/2,
+    get_stats/1,
+    get_metrics/1,
+
+    deactivate_alarm/2,
+    delete_all_deactivated_alarms/1,
+
+    clean_authz_cache/1,
+    clean_authz_cache/2,
+    clean_pem_cache/1
+]).
+
+introduced_in() ->
+    "5.0.22".
+
+-spec is_running(node()) -> boolean() | {badrpc, term()}.
+is_running(Node) ->
+    rpc:call(Node, emqx, is_running, []).
+
+-spec are_running([node()]) -> emqx_rpc:erpc_multicall(boolean()).
+are_running(Nodes) when is_list(Nodes) ->
+    erpc:multicall(Nodes, emqx, is_running, []).
+
+-spec get_alarms(node(), all | activated | deactivated) -> [map()].
+get_alarms(Node, Type) ->
+    rpc:call(Node, emqx_alarm, get_alarms, [Type]).
+
+-spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}.
+get_stats(Node) ->
+    rpc:call(Node, emqx_stats, getstats, []).
+
+-spec get_metrics(node()) -> [{emqx_metrics:metric_name(), non_neg_integer()}] | {badrpc, _}.
+get_metrics(Node) ->
+    rpc:call(Node, emqx_metrics, all, []).
+
+-spec clean_authz_cache(node(), emqx_types:clientid()) ->
+    ok
+    | {error, not_found}
+    | {badrpc, _}.
+clean_authz_cache(Node, ClientId) ->
+    rpc:call(Node, emqx_authz_cache, drain_cache, [ClientId]).
+
+-spec clean_authz_cache(node()) -> ok | {badrpc, _}.
+clean_authz_cache(Node) ->
+    rpc:call(Node, emqx_authz_cache, drain_cache, []).
+
+-spec clean_pem_cache(node()) -> ok | {badrpc, _}.
+clean_pem_cache(Node) ->
+    rpc:call(Node, ssl_pem_cache, clear, []).
+
+-spec deactivate_alarm(node(), binary() | atom()) ->
+    ok | {error, not_found} | {badrpc, _}.
+deactivate_alarm(Node, Name) ->
+    rpc:call(Node, emqx_alarm, deactivate, [Name]).
+
+-spec delete_all_deactivated_alarms(node()) -> ok | {badrpc, _}.
+delete_all_deactivated_alarms(Node) ->
+    rpc:call(Node, emqx_alarm, delete_all_deactivated_alarms, []).

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_stats.erl

@@ -139,7 +139,7 @@ list(get, #{query_string := Qs}) ->
 
 running_nodes() ->
     Nodes = erlang:nodes([visible, this]),
-    RpcResults = erpc:multicall(Nodes, emqx, is_running, [], 15000),
+    RpcResults = emqx_proto_v2:are_running(Nodes),
     [
         Node
      || {Node, IsRunning} <- lists:zip(Nodes, RpcResults),