Просмотр исходного кода

Merge pull request #7290 from EMQ-YangM/add_metrics_and_status_to_authz

feat(emqx_authz): add metrics and status to authz
Yang Miao 3 лет назад
Родитель
Сommit
4c93a71446

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -1,6 +1,7 @@
 {emqx,1}.
 {emqx_bridge,1}.
 {emqx_authn,1}.
+{emqx_authz,1}.
 {emqx_broker,1}.
 {emqx_cm,1}.
 {emqx_conf,1}.

+ 102 - 1
apps/emqx_authz/src/emqx_authz_api_sources.erl

@@ -54,6 +54,8 @@
 
 -export([ get_raw_sources/0
         , get_raw_source/1
+        , lookup_from_local_node/1
+        , lookup_from_all_nodes/1
         ]).
 
 -export([ api_spec/0
@@ -226,7 +228,12 @@ source(get, #{bindings := #{type := Type}}) ->
                             message => bin(Reason)}}
             end;
         [Source] ->
-            {200, read_certs(Source)}
+            case emqx_authz:lookup(Type) of
+                #{annotations := #{id := ResourceId }} ->
+                    StatusAndMetrics = lookup_from_all_nodes(ResourceId),
+                    {200, maps:put(status_and_metrics, StatusAndMetrics, read_certs(Source))};
+                _ -> {200, maps:put(status_and_metrics, resource_not_found, read_certs(Source))}
+            end
     end;
 source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>,
                                                            <<"rules">> := Rules,
@@ -269,6 +276,100 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos
 %% Internal functions
 %%--------------------------------------------------------------------
 
+lookup_from_local_node(ResourceId) ->
+    NodeId = node(self()),
+    case emqx_resource:get_instance(ResourceId) of
+        {error, not_found} -> {error, {NodeId, not_found_resource}};
+        {ok, _, #{ status := Status, metrics := Metrics }} ->
+            {ok, {NodeId, Status, Metrics}}
+    end.
+
+lookup_from_all_nodes(ResourceId) ->
+    Nodes = mria_mnesia:running_nodes(),
+    case is_ok(emqx_authz_proto_v1:lookup_from_all_nodes(Nodes, ResourceId)) of
+        {ok, ResList} ->
+            {StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList),
+            AggregateStatus = aggregate_status(maps:values(StatusMap)),
+            AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
+            Fun = fun(_, V1) -> restructure_map(V1) end,
+            #{node_status => StatusMap,
+              node_metrics => maps:map(Fun, MetricsMap),
+              node_error => ErrorMap,
+              status => AggregateStatus,
+              metrics => restructure_map(AggregateMetrics)
+             };
+        {error, ErrL} ->
+            {error_msg('INTERNAL_ERROR', ErrL)}
+    end.
+
+aggregate_status([]) -> error_some_strange_happen;
+aggregate_status(AllStatus) ->
+    Head = fun ([A | _]) -> A end,
+    HeadVal = Head(AllStatus),
+    AllRes = lists:all(fun (Val) -> Val == HeadVal end, AllStatus),
+    case AllRes of
+        true -> HeadVal;
+        false -> inconsistent
+    end.
+
+aggregate_metrics([]) -> error_some_strange_happen;
+aggregate_metrics([HeadMetrics | AllMetrics]) ->
+    CombinerFun =
+        fun ComFun(Val1, Val2) ->
+            case erlang:is_map(Val1) of
+                true -> emqx_map_lib:merge_with(ComFun, Val1, Val2);
+                false -> Val1 + Val2
+            end
+        end,
+    Fun = fun (ElemMap, AccMap) ->
+        emqx_map_lib:merge_with(CombinerFun, ElemMap, AccMap) end,
+    lists:foldl(Fun, HeadMetrics, AllMetrics).
+
+make_result_map(ResList) ->
+    Fun =
+        fun(Elem, {StatusMap, MetricsMap, ErrorMap}) ->
+            case Elem of
+                {ok, {NodeId, Status, Metrics}} ->
+                    {maps:put(NodeId, Status, StatusMap),
+                     maps:put(NodeId, Metrics, MetricsMap),
+                     ErrorMap
+                    };
+                {error, {NodeId, Reason}} ->
+                    {StatusMap,
+                     MetricsMap,
+                     maps:put(NodeId, Reason, ErrorMap)
+                    }
+            end
+        end,
+    lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
+
+restructure_map(#{counters := #{failed := Failed, matched := Match, success := Succ},
+                  rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}
+                           }
+                 }
+               ) ->
+    #{matched => Match,
+      success => Succ,
+      failed => Failed,
+      rate => Rate,
+      rate_last5m => Rate5m,
+      rate_max => RateMax
+     };
+restructure_map(Error) ->
+     Error.
+
+error_msg(Code, Msg) ->
+              #{code => Code, message => bin_t(io_lib:format("~p", [Msg]))}.
+
+bin_t(S) when is_list(S) ->
+    list_to_binary(S).
+
+is_ok(ResL) ->
+    case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
+        [] -> {ok, [Res || {ok, Res} <- ResL]};
+        ErrL -> {error, ErrL}
+    end.
+
 get_raw_sources() ->
     RawSources = emqx:get_raw_config([authorization, sources], []),
     Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}},

+ 35 - 0
apps/emqx_authz/src/proto/emqx_authz_proto_v1.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+        , lookup_from_all_nodes/2
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.0.0".
+
+-spec lookup_from_all_nodes([node()], binary()) ->
+          emqx_rpc:erpc_multicall().
+lookup_from_all_nodes(Nodes, ResourceId) ->
+    erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [ResourceId], ?TIMEOUT).

+ 34 - 0
apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl

@@ -176,6 +176,34 @@ t_api(_) ->
                            [?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]),
     {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1),
 
+    Snd = fun ({_, Val}) -> Val end,
+    LookupVal = fun LookupV(List, RestJson) ->
+                        case List of
+                            [Name] -> Snd(lists:keyfind(Name, 1, RestJson));
+                            [Name | NS] -> LookupV(NS, Snd(lists:keyfind(Name, 1, RestJson)))
+                        end
+                end,
+    EqualFun = fun (RList) ->
+                   fun ({M, V}) ->
+                       ?assertEqual(V,
+                                    LookupVal([<<"status_and_metrics">>,
+                                               <<"metrics">>, M],
+                                             RList)
+                                   )
+                   end
+               end,
+    AssertFun =
+        fun (ResultJson) ->
+            {ok, RList} = emqx_json:safe_decode(ResultJson),
+            MetricsList = [{<<"failed">>, 0},
+                           {<<"matched">>, 0},
+                           {<<"rate">>, 0.0},
+                           {<<"rate_last5m">>, 0.0},
+                           {<<"rate_max">>, 0.0},
+                           {<<"success">>, 0}],
+            lists:map(EqualFun(RList), MetricsList)
+        end,
+
     {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []),
     Sources = get_sources(Result2),
     ?assertMatch([ #{<<"type">> := <<"http">>}
@@ -190,6 +218,10 @@ t_api(_) ->
     {ok, 204, _} = request(put, uri(["authorization", "sources", "http"]),
                            ?SOURCE1#{<<"enable">> := false}),
     {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []),
+    {ok, RList3} = emqx_json:safe_decode(Result3),
+    ?assertEqual(<<"resource_not_found">>,
+                 LookupVal([<<"status_and_metrics">>
+                           ], RList3)),
     ?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result3)),
 
     Keyfile = emqx_common_test_helpers:app_path(
@@ -211,6 +243,7 @@ t_api(_) ->
                                          <<"verify">> => <<"verify_none">>
                                         }}),
     {ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []),
+    AssertFun(Result4),
     ?assertMatch(#{<<"type">> := <<"mongodb">>,
                    <<"ssl">> := #{<<"enable">> := <<"true">>,
                                   <<"cacertfile">> := ?MATCH_CERT,
@@ -233,6 +266,7 @@ t_api(_) ->
                                          <<"verify">> => <<"verify_none">>
                                         }}),
     {ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []),
+    AssertFun(Result5),
     ?assertMatch(#{<<"type">> := <<"mongodb">>,
                    <<"ssl">> := #{<<"enable">> := <<"true">>,
                                   <<"cacertfile">> := ?MATCH_CERT,