Просмотр исходного кода

fix(emqx_authn_api): format metrics and status

EMQ-YangM 4 лет назад
Родитель
Сommit
dce602c251

+ 54 - 0
apps/emqx/src/emqx_map_lib.erl

@@ -28,6 +28,7 @@
         , binary_string/1
         , deep_convert/3
         , diff_maps/2
+        , merge_with/3
         ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -170,3 +171,56 @@ covert_keys_to_atom(BinKeyMap, Conv) ->
             (K, V) when is_atom(K) -> {K, V};
             (K, V) when is_binary(K) -> {Conv(K), V}
         end, []).
+
+%% copy from maps.erl OTP24.0
+-compile({inline, [error_with_info/2]}).
+-spec merge_with(Combiner, Map1, Map2) -> Map3 when
+    Map1 :: #{Key1 => Value1},
+    Map2 :: #{Key2 => Value2},
+    Combiner :: fun((Key1, Value1, Value2) -> CombineResult),
+    Map3 :: #{Key1 => CombineResult, Key1 => Value1, Key2 => Value2}.
+
+merge_with(Combiner, Map1, Map2) when is_map(Map1),
+                                 is_map(Map2),
+                                 is_function(Combiner, 3) ->
+    case map_size(Map1) > map_size(Map2) of
+        true ->
+            Iterator = maps:iterator(Map2),
+            merge_with_1(maps:next(Iterator),
+                         Map1,
+                         Map2,
+                         Combiner);
+        false ->
+            Iterator = maps:iterator(Map1),
+            merge_with_1(maps:next(Iterator),
+                         Map2,
+                         Map1,
+                         fun(K, V1, V2) -> Combiner(K, V2, V1) end)
+    end;
+merge_with(Combiner, Map1, Map2) ->
+    error_with_info(error_type_merge_intersect(Map1, Map2, Combiner),
+                    [Combiner, Map1, Map2]).
+
+merge_with_1({K, V2, Iterator}, Map1, Map2, Combiner) ->
+    case Map1 of
+        #{ K := V1 } ->
+            NewMap1 = Map1#{ K := Combiner(K, V1, V2) },
+            merge_with_1(maps:next(Iterator), NewMap1, Map2, Combiner);
+        #{ } ->
+            merge_with_1(maps:next(Iterator), maps:put(K, V2, Map1), Map2, Combiner)
+    end;
+merge_with_1(none, Result, _, _) ->
+    Result.
+
+error_type_merge_intersect(M1, M2, Combiner) when is_function(Combiner, 3) ->
+    error_type_two_maps(M1, M2);
+error_type_merge_intersect(_M1, _M2, _Combiner) ->
+    badarg.
+
+error_with_info(Reason, Args) ->
+    erlang:error(Reason, Args, [{error_info, #{module => erl_stdlib_errors}}]).
+
+error_type_two_maps(M1, M2) when is_map(M1) ->
+    {badmap, M2};
+error_type_two_maps(M1, _M2) ->
+    {badmap, M1}.

+ 28 - 46
apps/emqx_authn/src/emqx_authn_api.erl

@@ -30,7 +30,6 @@
 -define(BAD_REQUEST, 'BAD_REQUEST').
 -define(NOT_FOUND, 'NOT_FOUND').
 -define(CONFLICT, 'CONFLICT').
--define(TIMEOUT, 15000).
 
 % Swagger
 
@@ -61,7 +60,7 @@
         , listener_authenticator_users/2
         , listener_authenticator_user/2
         , lookup_from_local_node/2
-        , lookup_from_all_node/2
+        , lookup_from_all_nodes/2
         ]).
 
 -export([ authenticator_examples/0
@@ -757,20 +756,14 @@ list_authenticator(ConfKeyPath, AuthenticatorID) ->
     AuthenticatorsConfig = get_raw_config_with_defaults(ConfKeyPath),
     case find_config(AuthenticatorID, AuthenticatorsConfig) of
         {ok, AuthenticatorConfig} ->
-            {200, maps:put(id, AuthenticatorID, convert_certs(AuthenticatorConfig))};
+            Status_And_Metrics = lookup_from_all_nodes(?GLOBAL, AuthenticatorID),
+            Fun = fun ({Key, Val}, Map) -> maps:put(Key, Val, Map) end,
+            AppendList = [{id, AuthenticatorID}, {status_and_metrics, Status_And_Metrics}],
+            {200, lists:foldl(Fun, convert_certs(AuthenticatorConfig), AppendList)};
         {error, Reason} ->
             serialize_error(Reason)
     end.
 
-%% example:
-%%
-%% emqx_authn_api:lookup_from_local_node('mqtt:global', <<"password-based:http">>)
-%%
-%% {ok,{'emqx@127.0.0.1',connecting,
-%%      #{counters =>
-%%            #{exception => 0,failed => 0,matched => 0,success => 0},
-%%        rate =>
-%%            #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}}}
 lookup_from_local_node(ChainName, AuthenticatorID) ->
     NodeId = node(self()),
     case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
@@ -796,39 +789,19 @@ resource_provider() ->
       emqx_authn_http
     ].
 
-%% example:
-%%
-%% emqx_authn_api:lookup_from_all_node('mqtt:global', <<"password-based:http">>)
-%%
-%% #{aggregate_metrics =>
-%%       #{counters =>
-%%             #{exception => 0,failed => 0,matched => 0,success => 0},
-%%         rate =>
-%%             #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}},
-%%   aggregate_status => connecting,all_node_error => #{},
-%%   all_node_metrics =>
-%%       #{'emqx@127.0.0.1' =>
-%%             #{counters =>
-%%                   #{exception => 0,failed => 0,matched => 0,success => 0},
-%%               rate =>
-%%                   #{matched => #{current => 0.0,last5m => 0.0,max => 0.0}}}},
-%%   all_node_state => #{'emqx@127.0.0.1' => connecting}}
-lookup_from_all_node(ChainName, AuthenticatorID) ->
+lookup_from_all_nodes(ChainName, AuthenticatorID) ->
     Nodes = mria_mnesia:running_nodes(),
-    case is_ok(erpc:multicall(Nodes,
-                              emqx_authn_api,
-                              lookup_from_local_node,
-                              [ChainName, AuthenticatorID],
-                              ?TIMEOUT)) of
+    case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of
         {ok, ResList} ->
             {StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList),
             AggregateStatus = aggregate_status(maps:values(StatusMap)),
             AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
-            #{all_node_state => StatusMap,
-              all_node_metrics => MetricsMap,
-              all_node_error => ErrorMap,
-              aggregate_status => AggregateStatus,
-              aggregate_metrics => AggregateMetrics
+            Fun = fun(_, V1) -> restructure_map(V1) end,
+            #{node_status => StatusMap,
+              node_metrics => maps:map(Fun, MetricsMap),
+              node_error => ErrorMap,
+              status => AggregateStatus,
+              metrics => restructure_map(AggregateMetrics)
             };
         {error, ErrL} ->
             {error_msg('INTERNAL_ERROR', ErrL)}
@@ -851,12 +824,13 @@ aggregate_metrics([HeadMetrics | AllMetrics]) ->
         fun (FixVal) ->
             fun (_, Val1, Val2) ->
                 case erlang:is_map(Val1) of
-                    true -> maps:merge_with(FixVal(FixVal), Val1, Val2);
+                    true -> emqx_map_lib:merge_with(FixVal(FixVal), Val1, Val2);
                     false -> Val1 + Val2
                 end
             end
         end,
-    Fun = fun (ElemMap, AccMap) -> maps:merge_with(CombinerFun(CombinerFun), ElemMap, AccMap) end,
+    Fun = fun (ElemMap, AccMap) ->
+        emqx_map_lib:merge_with(CombinerFun(CombinerFun), ElemMap, AccMap) end,
     lists:foldl(Fun, HeadMetrics, AllMetrics).
 
 make_result_map(ResList) ->
@@ -877,10 +851,18 @@ make_result_map(ResList) ->
         end,
     lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
 
-
-
-
-
+restructure_map(#{counters := #{failed := Failed, matched := Match, success := Succ},
+                  rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}
+                           }
+                 }
+               ) ->
+    #{matched => Match,
+      success => Succ,
+      failed => Failed,
+      rate => Rate,
+      rate_last5m => Rate5m,
+      rate_max => RateMax
+     }.
 
 error_msg(Code, Msg) when is_binary(Msg) ->
               #{code => Code, message => Msg};

+ 37 - 0
apps/emqx_authn/src/proto/emqx_authn_proto_v1.erl

@@ -0,0 +1,37 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authn_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+        , lookup_from_all_nodes/3
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.0.0".
+
+-type key() :: atom() | binary() | [byte()].
+
+-spec lookup_from_all_nodes([node()], key(), key()) ->
+          emqx_rpc:erpc_multicall().
+lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID) ->
+    erpc:multicall(Nodes, emqx_authn_api, lookup_from_local_node, [ChainName, AuthenticatorID], ?TIMEOUT).