Bläddra i källkod

Merge pull request #7271 from EMQ-YangM/add_metrics_and_status_to_authn

feat(emqx_authn_api): add metrics and status to authn
Xinyu Liu 3 år sedan
förälder
incheckning
3cf18a293d

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -1,5 +1,6 @@
 {emqx,1}.
 {emqx_bridge,1}.
+{emqx_authn,1}.
 {emqx_broker,1}.
 {emqx_cm,1}.
 {emqx_conf,1}.

+ 48 - 0
apps/emqx/src/emqx_map_lib.erl

@@ -28,6 +28,7 @@
         , binary_string/1
         , deep_convert/3
         , diff_maps/2
+        , merge_with/3
         ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -170,3 +171,50 @@ covert_keys_to_atom(BinKeyMap, Conv) ->
             (K, V) when is_atom(K) -> {K, V};
             (K, V) when is_binary(K) -> {Conv(K), V}
         end, []).
+
+%% copy from maps.erl OTP24.0
+-compile({inline, [error_with_info/2]}).
+merge_with(Combiner, Map1, Map2) when is_map(Map1),
+                                 is_map(Map2),
+                                 is_function(Combiner, 3) ->
+    case map_size(Map1) > map_size(Map2) of
+        true ->
+            Iterator = maps:iterator(Map2),
+            merge_with_t(maps:next(Iterator),
+                         Map1,
+                         Map2,
+                         Combiner);
+        false ->
+            Iterator = maps:iterator(Map1),
+            merge_with_t(maps:next(Iterator),
+                         Map2,
+                         Map1,
+                         fun(K, V1, V2) -> Combiner(K, V2, V1) end)
+    end;
+merge_with(Combiner, Map1, Map2) ->
+    error_with_info(error_type_merge_intersect(Map1, Map2, Combiner),
+                    [Combiner, Map1, Map2]).
+
+merge_with_t({K, V2, Iterator}, Map1, Map2, Combiner) ->
+    case Map1 of
+        #{ K := V1 } ->
+            NewMap1 = Map1#{ K := Combiner(K, V1, V2) },
+            merge_with_t(maps:next(Iterator), NewMap1, Map2, Combiner);
+        #{ } ->
+            merge_with_t(maps:next(Iterator), maps:put(K, V2, Map1), Map2, Combiner)
+    end;
+merge_with_t(none, Result, _, _) ->
+    Result.
+
+error_type_merge_intersect(M1, M2, Combiner) when is_function(Combiner, 3) ->
+    error_type_two_maps(M1, M2);
+error_type_merge_intersect(_M1, _M2, _Combiner) ->
+    badarg.
+
+error_with_info(_, _) ->
+    {error_info, #{module => erl_stdlib_errors}}.
+
+error_type_two_maps(M1, M2) when is_map(M1) ->
+    {badmap, M2};
+error_type_two_maps(M1, _M2) ->
+    {badmap, M1}.

+ 121 - 5
apps/emqx_authn/src/emqx_authn_api.erl

@@ -59,6 +59,8 @@
         , authenticator_user/2
         , listener_authenticator_users/2
         , listener_authenticator_user/2
+        , lookup_from_local_node/2
+        , lookup_from_all_nodes/2
         ]).
 
 -export([ authenticator_examples/0
@@ -550,7 +552,7 @@ authenticators(get, _Params) ->
     list_authenticators([authentication]).
 
 authenticator(get, #{bindings := #{id := AuthenticatorID}}) ->
-    list_authenticator([authentication], AuthenticatorID);
+    list_authenticator(?GLOBAL, [authentication], AuthenticatorID);
 
 authenticator(put, #{bindings := #{id := AuthenticatorID}, body := Config}) ->
     update_authenticator([authentication], ?GLOBAL, AuthenticatorID, Config);
@@ -574,8 +576,8 @@ listener_authenticators(get, #{bindings := #{listener_id := ListenerID}}) ->
 
 listener_authenticator(get, #{bindings := #{listener_id := ListenerID, id := AuthenticatorID}}) ->
     with_listener(ListenerID,
-                  fun(Type, Name, _) ->
-                        list_authenticator([listeners, Type, Name, authentication],
+                  fun(Type, Name, ChainName) ->
+                        list_authenticator(ChainName, [listeners, Type, Name, authentication],
                                        AuthenticatorID)
                   end);
 listener_authenticator(put,
@@ -750,15 +752,129 @@ list_authenticators(ConfKeyPath) ->
                         || AuthenticatorConfig <- AuthenticatorsConfig],
     {200, NAuthenticators}.
 
-list_authenticator(ConfKeyPath, AuthenticatorID) ->
+list_authenticator(ChainName, ConfKeyPath, AuthenticatorID) ->
     AuthenticatorsConfig = get_raw_config_with_defaults(ConfKeyPath),
     case find_config(AuthenticatorID, AuthenticatorsConfig) of
         {ok, AuthenticatorConfig} ->
-            {200, maps:put(id, AuthenticatorID, convert_certs(AuthenticatorConfig))};
+            StatusAndMetrics = lookup_from_all_nodes(ChainName, AuthenticatorID),
+            Fun = fun ({Key, Val}, Map) -> maps:put(Key, Val, Map) end,
+            AppendList = [{id, AuthenticatorID}, {status_and_metrics, StatusAndMetrics}],
+            {200, lists:foldl(Fun, convert_certs(AuthenticatorConfig), AppendList)};
         {error, Reason} ->
             serialize_error(Reason)
     end.
 
+lookup_from_local_node(ChainName, AuthenticatorID) ->
+    NodeId = node(self()),
+    case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
+        {ok, #{provider := Provider, state := State}} ->
+            case lists:member(Provider, resource_provider()) of
+                false -> {error, {NodeId, resource_unsupport_metrics_and_status}};
+                true ->
+                    #{resource_id := ResourceId} = State,
+                    case emqx_resource:get_instance(ResourceId) of
+                        {error, not_found} -> {error, {NodeId, not_found_resource}};
+                        {ok, _, #{ status := Status, metrics := Metrics }} ->
+                            {ok, {NodeId, Status, Metrics}}
+                    end
+            end;
+        {error, Reason} -> {error, {NodeId, Reason}}
+    end.
+
+resource_provider() ->
+    [ emqx_authn_mysql,
+      emqx_authn_pgsql,
+      emqx_authn_mongodb,
+      emqx_authn_redis,
+      emqx_authn_http
+    ].
+
+lookup_from_all_nodes(ChainName, AuthenticatorID) ->
+    Nodes = mria_mnesia:running_nodes(),
+    case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of
+        {ok, ResList} ->
+            {StatusMap, MetricsMap, ErrorMap} = make_result_map(ResList),
+            AggregateStatus = aggregate_status(maps:values(StatusMap)),
+            AggregateMetrics = aggregate_metrics(maps:values(MetricsMap)),
+            Fun = fun(_, V1) -> restructure_map(V1) end,
+            #{node_status => StatusMap,
+              node_metrics => maps:map(Fun, MetricsMap),
+              node_error => ErrorMap,
+              status => AggregateStatus,
+              metrics => restructure_map(AggregateMetrics)
+            };
+        {error, ErrL} ->
+            {error_msg('INTERNAL_ERROR', ErrL)}
+    end.
+
+aggregate_status([]) -> error_some_strange_happen;
+aggregate_status(AllStatus) ->
+    Head = fun ([A | _]) -> A end,
+    HeadVal = Head(AllStatus),
+    AllRes = lists:all(fun (Val) -> Val == HeadVal end, AllStatus),
+    case AllRes of
+        true -> HeadVal;
+        false -> inconsistent
+    end.
+
+aggregate_metrics([]) -> error_some_strange_happen;
+aggregate_metrics([HeadMetrics | AllMetrics]) ->
+    CombinerFun =
+        fun ComFun(Val1, Val2) ->
+            case erlang:is_map(Val1) of
+                true -> emqx_map_lib:merge_with(ComFun, Val1, Val2);
+                false -> Val1 + Val2
+            end
+        end,
+    Fun = fun (ElemMap, AccMap) ->
+        emqx_map_lib:merge_with(CombinerFun, ElemMap, AccMap) end,
+    lists:foldl(Fun, HeadMetrics, AllMetrics).
+
+make_result_map(ResList) ->
+    Fun =
+        fun(Elem, {StatusMap, MetricsMap, ErrorMap}) ->
+            case Elem of
+                {ok, {NodeId, Status, Metrics}} ->
+                    {maps:put(NodeId, Status, StatusMap),
+                     maps:put(NodeId, Metrics, MetricsMap),
+                     ErrorMap
+                    };
+                {error, {NodeId, Reason}} ->
+                    {StatusMap,
+                     MetricsMap,
+                     maps:put(NodeId, Reason, ErrorMap)
+                    }
+            end
+        end,
+    lists:foldl(Fun, {maps:new(), maps:new(), maps:new()}, ResList).
+
+restructure_map(#{counters := #{failed := Failed, matched := Match, success := Succ},
+                  rate := #{matched := #{current := Rate, last5m := Rate5m, max := RateMax}
+                           }
+                 }
+               ) ->
+    #{matched => Match,
+      success => Succ,
+      failed => Failed,
+      rate => Rate,
+      rate_last5m => Rate5m,
+      rate_max => RateMax
+     };
+restructure_map(Error) ->
+     Error.
+
+error_msg(Code, Msg) ->
+              #{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
+
+bin(S) when is_list(S) ->
+    list_to_binary(S).
+
+is_ok(ResL) ->
+    case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
+        [] -> {ok, [Res || {ok, Res} <- ResL]};
+        ErrL -> {error, ErrL}
+    end.
+
 update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) ->
     case update_config(ConfKeyPath, {update_authenticator, ChainName, AuthenticatorID, Config}) of
         {ok, #{post_config_update := #{emqx_authentication := #{id := ID}},

+ 35 - 0
apps/emqx_authn/src/proto/emqx_authn_proto_v1.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authn_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+        , lookup_from_all_nodes/3
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.0.0".
+
+-spec lookup_from_all_nodes([node()], atom(), binary()) ->
+          emqx_rpc:erpc_multicall().
+lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID) ->
+    erpc:multicall(Nodes, emqx_authn_api, lookup_from_local_node, [ChainName, AuthenticatorID], ?TIMEOUT).

+ 27 - 2
apps/emqx_authn/test/emqx_authn_api_SUITE.erl

@@ -164,10 +164,35 @@ test_authenticator(PathPrefix) ->
                      post,
                      uri(PathPrefix ++ [?CONF_NS]),
                      ValidConfig0),
-    {ok, 200, _} = request(
+    {ok, 200, Res} = request(
                      get,
                      uri(PathPrefix ++ [?CONF_NS, "password_based:http"])),
-
+    {ok, RList} = emqx_json:safe_decode(Res),
+    Snd = fun ({_, Val}) -> Val end,
+    LookupVal = fun LookupV(List, RestJson) ->
+            case List of
+                [Name] -> Snd(lists:keyfind(Name, 1, RestJson));
+                [Name | NS] -> LookupV(NS, Snd(lists:keyfind(Name, 1, RestJson)))
+            end
+        end,
+    LookFun = fun (List) -> LookupVal(List, RList) end,
+    MetricsList = [{<<"failed">>, 0},
+                   {<<"matched">>, 0},
+                   {<<"rate">>, 0.0},
+                   {<<"rate_last5m">>, 0.0},
+                   {<<"rate_max">>, 0.0},
+                   {<<"success">>, 0}],
+    EqualFun = fun ({M, V}) ->
+                   ?assertEqual(V, LookFun([<<"status_and_metrics">>,
+                                            <<"metrics">>,
+                                            M]
+                                          )
+                               ) end,
+    lists:map(EqualFun, MetricsList),
+    ?assertEqual(<<"connected">>,
+                 LookFun([<<"status_and_metrics">>,
+                          <<"status">>
+                         ])),
     {ok, 404, _} = request(
                      get,
                      uri(PathPrefix ++ [?CONF_NS, "password_based:redis"])),