Explorar o código

refactor(pluglib): move `emqx_plugin_libs_proto_v1` into emqx app

And rename it to `emqx_metrics_proto_v1` in the process.
Andrew Mayorov %!s(int64=2) %!d(string=hai) anos
pai
achega
79d430cf03

+ 10 - 9
apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl

@@ -14,25 +14,26 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_plugin_libs_proto_v1).
+-module(emqx_metrics_proto_v1).
 
 -behaviour(emqx_bpapi).
 
 -export([
     introduced_in/0,
 
-    get_metrics/3
+    get_metrics/4
 ]).
 
--include_lib("emqx/include/bpapi.hrl").
+-include("bpapi.hrl").
 
 introduced_in() ->
-    "5.0.0".
+    "5.1.0".
 
 -spec get_metrics(
-    node(),
+    [node()],
     emqx_metrics_worker:handler_name(),
-    emqx_metrics_worker:metric_id()
-) -> emqx_metrics_worker:metrics() | {badrpc, _}.
-get_metrics(Node, HandlerName, MetricId) ->
-    rpc:call(Node, emqx_metrics_worker, get_metrics, [HandlerName, MetricId]).
+    emqx_metrics_worker:metric_id(),
+    timeout()
+) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
+get_metrics(Nodes, HandlerName, MetricId, Timeout) ->
+    erpc:multicall(Nodes, emqx_metrics_worker, get_metrics, [HandlerName, MetricId], Timeout).

+ 1 - 1
apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugin_libs, [
     {description, "EMQX Plugin utility libs"},
-    {vsn, "4.3.11"},
+    {vsn, "4.3.12"},
     {modules, []},
     {applications, [kernel, stdlib]},
     {env, []}

+ 76 - 68
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -44,10 +44,13 @@
 %% query callback
 -export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]).
 
+-define(RPC_GET_METRICS_TIMEOUT, 5000).
+
 -define(ERR_BADARGS(REASON), begin
     R0 = err_msg(REASON),
     <<"Bad Arguments: ", R0/binary>>
 end).
+
 -define(CHECK_PARAMS(PARAMS, TAG, EXPR),
     case emqx_rule_api_schema:check_params(PARAMS, TAG) of
         {ok, CheckedParams} ->
@@ -56,6 +59,7 @@ end).
             {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}}
     end
 ).
+
 -define(METRICS(
     MATCH,
     PASS,
@@ -87,6 +91,7 @@ end).
         'matched.rate.last5m' => RATE_5
     }
 ).
+
 -define(metrics(
     MATCH,
     PASS,
@@ -527,74 +532,77 @@ printable_function_name(Mod, Func) ->
     list_to_binary(lists:concat([Mod, ":", Func])).
 
 get_rule_metrics(Id) ->
-    Format = fun
-        (
-            Node,
-            #{
-                counters :=
-                    #{
-                        'matched' := Matched,
-                        'passed' := Passed,
-                        'failed' := Failed,
-                        'failed.exception' := FailedEx,
-                        'failed.no_result' := FailedNoRes,
-                        'actions.total' := OTotal,
-                        'actions.failed' := OFailed,
-                        'actions.failed.out_of_service' := OFailedOOS,
-                        'actions.failed.unknown' := OFailedUnknown,
-                        'actions.success' := OFailedSucc
-                    },
-                rate :=
-                    #{
-                        'matched' :=
-                            #{current := Current, max := Max, last5m := Last5M}
-                    }
-            }
-        ) ->
-            #{
-                metrics => ?METRICS(
-                    Matched,
-                    Passed,
-                    Failed,
-                    FailedEx,
-                    FailedNoRes,
-                    OTotal,
-                    OFailed,
-                    OFailedOOS,
-                    OFailedUnknown,
-                    OFailedSucc,
-                    Current,
-                    Max,
-                    Last5M
-                ),
-                node => Node
-            };
-        (Node, _Metrics) ->
-            %% Empty metrics: can happen when a node joins another and a bridge is not yet
-            %% replicated to it, so the counters map is empty.
-            #{
-                metrics => ?METRICS(
-                    _Matched = 0,
-                    _Passed = 0,
-                    _Failed = 0,
-                    _FailedEx = 0,
-                    _FailedNoRes = 0,
-                    _OTotal = 0,
-                    _OFailed = 0,
-                    _OFailedOOS = 0,
-                    _OFailedUnknown = 0,
-                    _OFailedSucc = 0,
-                    _Current = 0,
-                    _Max = 0,
-                    _Last5M = 0
-                ),
-                node => Node
-            }
-    end,
-    [
-        Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id))
-     || Node <- mria:running_nodes()
-    ].
+    Nodes = emqx:running_nodes(),
+    Results = emqx_metrics_proto_v1:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT),
+    NodeResults = lists:zip(Nodes, Results),
+    NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults],
+    NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok],
+    NodeErrors == [] orelse
+        ?SLOG(warning, #{
+            msg => "rpc_get_rule_metrics_errors",
+            errors => NodeErrors
+        }),
+    NodeMetrics.
+
+format_metrics(Node, #{
+    counters :=
+        #{
+            'matched' := Matched,
+            'passed' := Passed,
+            'failed' := Failed,
+            'failed.exception' := FailedEx,
+            'failed.no_result' := FailedNoRes,
+            'actions.total' := OTotal,
+            'actions.failed' := OFailed,
+            'actions.failed.out_of_service' := OFailedOOS,
+            'actions.failed.unknown' := OFailedUnknown,
+            'actions.success' := OFailedSucc
+        },
+    rate :=
+        #{
+            'matched' :=
+                #{current := Current, max := Max, last5m := Last5M}
+        }
+}) ->
+    #{
+        metrics => ?METRICS(
+            Matched,
+            Passed,
+            Failed,
+            FailedEx,
+            FailedNoRes,
+            OTotal,
+            OFailed,
+            OFailedOOS,
+            OFailedUnknown,
+            OFailedSucc,
+            Current,
+            Max,
+            Last5M
+        ),
+        node => Node
+    };
+format_metrics(Node, _Metrics) ->
+    %% Empty metrics: can happen when a node joins another and a bridge is not yet
+    %% replicated to it, so the counters map is empty.
+    #{
+        metrics => ?METRICS(
+            _Matched = 0,
+            _Passed = 0,
+            _Failed = 0,
+            _FailedEx = 0,
+            _FailedNoRes = 0,
+            _OTotal = 0,
+            _OFailed = 0,
+            _OFailedOOS = 0,
+            _OFailedUnknown = 0,
+            _OFailedSucc = 0,
+            _Current = 0,
+            _Max = 0,
+            _Last5M = 0
+        ),
+        node => Node
+    }.
 
 aggregate_metrics(AllMetrics) ->
     InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),