Просмотр исходного кода

feat(emqx_exhook): add metrics

lafirest 4 лет назад
Родитель
Сommit
9a6a8a778b

+ 3 - 0
apps/emqx_exhook/include/emqx_exhook.hrl

@@ -18,6 +18,9 @@
 -define(EMQX_EXHOOK_HRL, true).
 
 -define(APP, emqx_exhook).
+-define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter).
+-define(HOOKS_METRICS, emqx_exhook_metrics).
+-define(METRICS_PRECISION, 1).
 
 -define(ENABLED_HOOKS,
       [ {'client.connect',      {emqx_exhook_handler, on_client_connect,       []}}

+ 202 - 88
apps/emqx_exhook/src/emqx_exhook_api.erl

@@ -23,33 +23,36 @@
 
 -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
 
--export([exhooks/2, action_with_name/2, move/2]).
+-export([exhooks/2, action_with_name/2, move/2, server_hooks/2]).
 
--import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
+-import(hoconsc, [mk/2, ref/1, enum/1, array/1, map/2]).
 -import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]).
 
 -define(TAGS, [<<"exhooks">>]).
 -define(BAD_REQUEST, 'BAD_REQUEST').
 -define(BAD_RPC, 'BAD_RPC').
 
+%%--------------------------------------------------------------------
+%% schema
+%%--------------------------------------------------------------------
 namespace() -> "exhook".
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE).
 
-paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move"].
+paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move", "/exhooks/:name/hooks"].
 
 schema(("/exhooks")) ->
     #{
       'operationId' => exhooks,
       get => #{tags => ?TAGS,
                description => <<"List all servers">>,
-               responses => #{200 => mk(array(ref(detailed_server_info)), #{})}
+               responses => #{200 => mk(array(ref(list_server_info)), #{})}
               },
       post => #{tags => ?TAGS,
                 description => <<"Add a servers">>,
                 'requestBody' => server_conf_schema(),
-                responses => #{201 => mk(ref(detailed_server_info), #{}),
+                responses => #{201 => mk(ref(detail_server_info), #{}),
                                500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
                               }
                }
@@ -60,7 +63,7 @@ schema("/exhooks/:name") ->
       get => #{tags => ?TAGS,
                description => <<"Get the detail information of server">>,
                parameters => params_server_name_in_path(),
-               responses => #{200 => mk(ref(detailed_server_info), #{}),
+               responses => #{200 => mk(ref(detail_server_info), #{}),
                               400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
                              }
               },
@@ -77,11 +80,21 @@ schema("/exhooks/:name") ->
                   description => <<"Delete the server">>,
                   parameters => params_server_name_in_path(),
                   responses => #{204 => <<>>,
-                                 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
-                                }
+                                 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)                                }
                  }
      };
 
+schema("/exhooks/:name/hooks") ->
+    #{'operationId' => server_hooks,
+      get => #{tags => ?TAGS,
+               description => <<"Get the hooks information of server">>,
+               parameters => params_server_name_in_path(),
+               responses => #{200 => mk(array(ref(list_hook_info)), #{}),
+                              400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
+                             }
+              }
+     };
+
 schema("/exhooks/:name/move") ->
     #{'operationId' => move,
       post => #{tags => ?TAGS,
@@ -96,25 +109,56 @@ schema("/exhooks/:name/move") ->
      }.
 
 fields(move_req) ->
-    [
-     {position, mk(enum([top, bottom, before, 'after']), #{})},
-     {related, mk(string(), #{desc => <<"Relative position of movement">>,
-                              default => <<>>,
-                              example => <<>>
-                             })}
+    [ {position, mk(enum([top, bottom, before, 'after']), #{})}
+    , {related, mk(string(), #{desc => <<"Relative position of movement">>,
+                               default => <<>>,
+                               example => <<>>
+                              })}
     ];
 
-fields(detailed_server_info) ->
-    [ {status, mk(enum([running, waiting, stopped]), #{})}
-    , {hooks, mk(array(string()), #{default => []})}
-    , {node_status, mk(ref(node_status), #{})}
+fields(list_server_info) ->
+    [ {metrics, mk(ref(metrics), #{})}
+    , {node_metrics, mk(array(ref(node_metrics)), #{})}
+    , {node_status, mk(array(ref(node_status)), #{})}
+    , {hooks, mk(array(ref(hook_info)), #{})}
+    ] ++ emqx_exhook_schema:server_config();
+
+fields(detail_server_info) ->
+    [ {metrics, mk(ref(metrics), #{})}
+    , {node_metrics, mk(array(ref(node_metrics)), #{})}
+    , {node_status, mk(array(ref(node_status)), #{})}
+    , {hooks, mk(array(ref(hook_info)), #{})}
     ] ++ emqx_exhook_schema:server_config();
 
+fields(list_hook_info) ->
+    [ {name, mk(binary(), #{})}
+    , {params, mk(map(name, binary()), #{})}
+    , {metrics, mk(ref(metrics), #{})}
+    , {node_metrics, mk(array(ref(node_metrics)), #{})}
+    ];
+
+fields(node_metrics) ->
+    [ {node, mk(string(), #{})}
+    , {metrics, mk(ref(metrics), #{})}
+    ];
+
 fields(node_status) ->
     [ {node, mk(string(), #{})}
     , {status, mk(enum([running, waiting, stopped, not_found, error]), #{})}
     ];
 
+fields(hook_info) ->
+    [ {name, mk(binary(), #{})}
+    , {params, mk(map(name, binary()), #{})}
+    ];
+
+fields(metrics) ->
+    [ {succeed, mk(integer(), #{})}
+    , {failed, mk(integer(), #{})}
+    , {rate, mk(integer(), #{})}
+    , {max_rate, mk(integer(), #{})}
+    ];
+
 fields(server_config) ->
     emqx_exhook_schema:server_config().
 
@@ -140,16 +184,19 @@ server_conf_schema() ->
                                    }
                          }).
 
-
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
 exhooks(get, _) ->
-    ServerL = emqx_exhook_mgr:list(),
-    ServerL2 = nodes_all_server_status(ServerL),
-    {200, ServerL2};
+    Confs = emqx:get_config([exhook, servers]),
+    Infos = nodes_all_server_info(Confs),
+    {200, Infos};
 
 exhooks(post, #{body := Body}) ->
     case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of
-        {ok, Result} ->
-            {201, Result};
+        {ok, _} ->
+            #{<<"name">> := Name} = Body,
+            get_nodes_server_info(Name);
         {error, Error} ->
             {500, #{code => <<"BAD_RPC">>,
                     message => Error
@@ -157,16 +204,7 @@ exhooks(post, #{body := Body}) ->
     end.
 
 action_with_name(get, #{bindings := #{name := Name}}) ->
-    Result = emqx_exhook_mgr:lookup(Name),
-    NodeStatus = nodes_server_status(Name),
-    case Result of
-        not_found ->
-            {400, #{code => <<"BAD_REQUEST">>,
-                    message => <<"Server not found">>
-                   }};
-        ServerInfo ->
-            {200, ServerInfo#{node_status => NodeStatus}}
-    end;
+    get_nodes_server_info(Name);
 
 action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
     case emqx_exhook_mgr:update_config([exhook, servers],
@@ -177,8 +215,7 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
                    }};
         {ok, {error, Reason}} ->
             {400, #{code => <<"BAD_REQUEST">>,
-                    message => unicode:characters_to_binary(
-                                 io_lib:format("Error Reason:~p~n", [Reason]))
+                    message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason]))
                    }};
         {ok, _} ->
             {200};
@@ -216,59 +253,136 @@ move(post, #{bindings := #{name := Name}, body := Body}) ->
                    }}
     end.
 
-nodes_server_status(Name) ->
-    StatusL = call_cluster(emqx_exhook_mgr, server_status, [Name]),
-
-    Handler = fun({Node, {error, _}}) ->
-                      #{node => Node,
-                        status => error
-                       };
-                 ({Node, Status}) ->
-                      #{node => Node,
-                        status => Status
-                       }
-              end,
-
-    lists:map(Handler, StatusL).
-
-nodes_all_server_status(ServerL) ->
-    AllStatusL = call_cluster(emqx_exhook_mgr, all_servers_status, []),
-
-    AggreMap = lists:foldl(fun(#{name := Name}, Acc) ->
-                                   Acc#{Name => []}
-                           end,
-                           #{},
-                           ServerL),
-
-    AddToMap = fun(Servers, Node, Status, Map) ->
-                       lists:foldl(fun(Name, Acc) ->
-                                           StatusL = maps:get(Name, Acc),
-                                           StatusL2 = [#{node => Node,
-                                                         status => Status
-                                                        } | StatusL],
-                                           Acc#{Name := StatusL2}
-                                   end,
-                                   Map,
-                                   Servers)
-               end,
-
-    AggreMap2 = lists:foldl(fun({Node, #{running := Running,
-                                         waiting := Waiting,
-                                         stopped := Stopped}},
-                                Acc) ->
-                                    AddToMap(Stopped, Node, stopped,
-                                             AddToMap(Waiting, Node, waiting,
-                                                      AddToMap(Running, Node, running, Acc)))
-                            end,
-                            AggreMap,
-                            AllStatusL),
-
-    Handler = fun(#{name := Name} = Server) ->
-                      Server#{node_status => maps:get(Name, AggreMap2)}
-              end,
-
-    lists:map(Handler, ServerL).
+server_hooks(get, #{bindings := #{name := Name}}) ->
+    Confs = emqx:get_config([exhook, servers]),
+    case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
+        false ->
+            {400, #{code => <<"BAD_REQUEST">>,
+                    message => <<"Server not found">>
+                   }};
+        _ ->
+            Info = get_nodes_server_hooks_info(Name),
+            {200, Info}
+    end.
+
+get_nodes_server_info(Name) ->
+    Confs = emqx:get_config([exhook, servers]),
+    case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
+        false ->
+            {400, #{code => <<"BAD_REQUEST">>,
+                    message => <<"Server not found">>
+                   }};
+        {value, Conf} ->
+            NodeStatus = nodes_server_info(Name),
+            {200, maps:merge(Conf, NodeStatus)}
+    end.
 
+%%--------------------------------------------------------------------
+%% GET /exhooks
+%%--------------------------------------------------------------------
+nodes_all_server_info(ConfL) ->
+    AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []),
+    Default = emqx_exhook_metrics:new_metrics_info(),
+    node_all_server_info(ConfL, AllInfos, Default, []).
+
+node_all_server_info([#{name := ServerName} = Conf | T], AllInfos, Default, Acc) ->
+    Info = fill_cluster_server_info(AllInfos, [], [], ServerName, Default),
+    AllInfo = maps:merge(Conf, Info),
+    node_all_server_info(T, AllInfos, Default, [AllInfo | Acc]);
+
+node_all_server_info([], _, _, Acc) ->
+    lists:reverse(Acc).
+
+fill_cluster_server_info([{Node, {error, _}} | T], StatusL, MetricsL, ServerName, Default) ->
+    fill_cluster_server_info(T,
+                             [#{node => Node, status => error} | StatusL],
+                             [#{node => Node, metrics => Default} | MetricsL],
+                             ServerName,
+                             Default);
+
+fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, Default) ->
+    #{status := Status, metrics := Metrics} = Result,
+    fill_cluster_server_info(T,
+                             [#{node => Node, status => maps:get(ServerName, Status, error)} | StatusL],
+                             [#{node => Node, metrics => maps:get(ServerName, Metrics, Default)} | MetricsL],
+                             ServerName,
+                             Default);
+
+fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
+    Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
+    #{metrics => Metrics,
+      node_metrics => MetricsL,
+      node_status => StatusL,
+      hooks => emqx_exhook_mgr:hooks(ServerName)
+     }.
+
+%%--------------------------------------------------------------------
+%% GET /exhooks/{name}
+%%--------------------------------------------------------------------
+nodes_server_info(Name) ->
+    InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]),
+    Default = emqx_exhook_metrics:new_metrics_info(),
+    nodes_server_info(InfoL, Name, Default, [], []).
+
+nodes_server_info([{Node, {error, _}} | T], Name, Default, StatusL, MetricsL) ->
+    nodes_server_info(T,
+                      Name,
+                      Default,
+                      [#{node => Node, status => error} | StatusL],
+                      [#{node => Node, metrics => Default} | MetricsL]
+                     );
+
+nodes_server_info([{Node, Result} | T], Name, Default, StatusL, MetricsL) ->
+    #{status := Status, metrics := Metrics} = Result,
+    nodes_server_info(T,
+                      Name,
+                      Default,
+                      [#{node => Node, status => Status} | StatusL],
+                      [#{node => Node, metrics => Metrics} | MetricsL]
+                     );
+
+nodes_server_info([], Name, _, StatusL, MetricsL) ->
+    #{metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
+      node_status => StatusL,
+      node_metrics => MetricsL,
+      hooks => emqx_exhook_mgr:hooks(Name)
+     }.
+
+%%--------------------------------------------------------------------
+%% GET /exhooks/{name}/hooks
+%%--------------------------------------------------------------------
+get_nodes_server_hooks_info(Name) ->
+    case emqx_exhook_mgr:hooks(Name) of
+        [] -> [];
+        Hooks ->
+            AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]),
+            Default = emqx_exhook_metrics:new_metrics_info(),
+            get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
+    end.
+
+get_nodes_server_hooks_info([#{name := Name} = Spec | T], AllInfos, Default, Acc) ->
+    Info = fill_server_hooks_info(AllInfos, Name, Default, []),
+    AllInfo = maps:merge(Spec, Info),
+    get_nodes_server_hooks_info(T, AllInfos, Default, [AllInfo | Acc]);
+
+get_nodes_server_hooks_info([], _, _, Acc) ->
+    Acc.
+
+fill_server_hooks_info([{_, {error, _}} | T], Name, Default, MetricsL) ->
+    fill_server_hooks_info(T, Name, Default, MetricsL);
+
+fill_server_hooks_info([{Node, MetricsMap} | T], Name, Default, MetricsL) ->
+    Metrics = maps:get(Name, MetricsMap, Default),
+    NodeMetrics = #{node => Node, metrics => Metrics},
+    fill_server_hooks_info(T, Name, Default, [NodeMetrics | MetricsL]);
+
+fill_server_hooks_info([], _Name, _Default, MetricsL) ->
+    Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
+    #{metrics => Metrics, node_metrics => MetricsL}.
+
+%%--------------------------------------------------------------------
+%% cluster call
+%%--------------------------------------------------------------------
 call_cluster(Module, Fun, Args) ->
     Nodes = mria_mnesia:running_nodes(),
     [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes].

+ 247 - 0
apps/emqx_exhook/src/emqx_exhook_metrics.erl

@@ -0,0 +1,247 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_metrics).
+
+-include("emqx_exhook.hrl").
+
+%% API
+-export([ init/0, succeed/2, failed/2
+        , update/1, new_metrics_info/0, servers_metrics/0
+        , delete_server/1, server_metrics/1, hooks_metrics/1
+        , metrics_aggregate/1, metrics_aggregate_by_key/2, hooks_metrics_aggregate/1
+        , metrics_aggregate_by/2
+        ]).
+
+-record(metrics, {
+                  index :: index()
+                 ,succeed = 0 :: non_neg_integer()
+                 ,failed = 0 :: non_neg_integer()
+                 ,rate = 0 :: non_neg_integer()
+                 ,max_rate = 0 :: non_neg_integer()
+                 ,window_rate :: integer()
+                 }).
+
+-type server_name() :: emqx_exhook_mgr:server_name().
+-type hookpoint() :: emqx_exhook_server:hookpoint().
+-type index() :: {server_name(), hookpoint()}.
+-type hooks_metrics()  :: #{hookpoint() => metrics_info()}.
+-type servers_metrics()  :: #{server_name() => metrics_info()}.
+
+-type metrics_info() :: #{ succeed := non_neg_integer()
+                         , failed := non_neg_integer()
+                         , rate := number()
+                         , max_rate := number()
+                         }.
+
+-define(INDEX(ServerName, HookPoint), {ServerName, HookPoint}).
+-export_type([metrics_info/0, servers_metrics/0, hooks_metrics/0]).
+
+%%--------------------------------------------------------------------
+%%% API
+%%--------------------------------------------------------------------
+init() ->
+    _ = ets:new(?HOOKS_METRICS,
+                [ set, named_table, public
+                , {keypos, #metrics.index}, {write_concurrency, true}
+                , {read_concurrency, true}
+                ]),
+    ok.
+
+-spec new_metric_info() -> metrics_info().
+new_metric_info() ->
+    #{succeed => 0,
+      failed => 0,
+      rate => 0,
+      max_rate => 0
+     }.
+
+-spec succeed(server_name(), hookpoint()) -> integer().
+succeed(Server, Hook) ->
+    inc(Server, Hook, #metrics.succeed,
+        #metrics{index = {Server, Hook}
+                ,window_rate = 1
+                ,succeed = 1
+                }).
+
+-spec failed(server_name(), hookpoint()) -> integer().
+failed(Server, Hook) ->
+    inc(Server, Hook, #metrics.failed,
+        #metrics{index = {Server, Hook}
+                ,window_rate = 1
+                ,failed = 1
+                }).
+
+-spec update(pos_integer()) -> true.
+update(Interval) ->
+    Fun = fun(#metrics{rate = Rate,
+                       window_rate = WindowRate,
+                       max_rate = MaxRate} = Metrics,
+              _) ->
+                  case calc_metric(WindowRate, Interval) of
+                      Rate -> true;
+                      NewRate ->
+                          MaxRate2 = erlang:max(MaxRate, NewRate),
+                          Metrics2 = Metrics#metrics{rate = NewRate,
+                                                     window_rate = 0,
+                                                     max_rate = MaxRate2},
+                          ets:insert(?HOOKS_METRICS, Metrics2)
+                  end
+          end,
+
+    ets:foldl(Fun, true, ?HOOKS_METRICS).
+
+-spec delete_server(server_name()) -> true.
+delete_server(Name) ->
+    ets:match_delete(?HOOKS_METRICS,
+                     {metrics, {Name, '_'}, '_', '_', '_', '_', '_'}).
+
+-spec server_metrics(server_name()) -> metrics_info().
+server_metrics(SvrName) ->
+    Hooks = ets:match(?HOOKS_METRICS,
+                      {metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}),
+
+    Fold = fun(#metrics{succeed = Succeed,
+                        failed = Failed,
+                        rate = Rate,
+                        max_rate = MaxRate},
+               Acc) ->
+                   [#{ succeed => Succeed
+                     , failed => Failed
+                     , rate => Rate
+                     , max_rate => MaxRate
+                     } | Acc]
+           end,
+
+    AllMetrics = lists:foldl(Fold, [], Hooks),
+    metrics_aggregate(AllMetrics).
+
+-spec servers_metrics() -> servers_metrics().
+servers_metrics() ->
+    AllMetrics = ets:tab2list(?HOOKS_METRICS),
+
+    GroupFun = fun(#metrics{index = ?INDEX(ServerName, _),
+                            succeed = Succeed,
+                            failed = Failed,
+                            rate = Rate,
+                            max_rate = MaxRate
+                           },
+                   Acc) ->
+                       SvrGroup = maps:get(ServerName, Acc, []),
+                       Metrics = #{ succeed => Succeed
+                                  , failed => Failed
+                                  , rate => Rate
+                                  , max_rate => MaxRate
+                                  },
+                       Acc#{ServerName => [Metrics | SvrGroup]}
+               end,
+
+    GroupBySever = lists:foldl(GroupFun, #{}, AllMetrics),
+
+    MapFun = fun(_SvrName, Group) -> metrics_aggregate(Group) end,
+    maps:map(MapFun, GroupBySever).
+
+-spec hooks_metrics(server_name()) -> hooks_metrics().
+hooks_metrics(SvrName) ->
+    Hooks = ets:match(?HOOKS_METRICS,
+                      {metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}),
+
+    Fold = fun(#metrics{index = ?INDEX(_, HookPoint),
+                        succeed = Succeed,
+                        failed = Failed,
+                        rate = Rate,
+                        max_rate = MaxRate},
+               Acc) ->
+                   Acc#{HookPoint => #{ succeed => Succeed
+                                      , failed => Failed
+                                      , rate => Rate
+                                      , max_rate => MaxRate
+                                      }}
+           end,
+
+    lists:foldl(Fold, #{}, Hooks).
+
+-spec metrics_aggregate(list(metrics_info())) -> metrics_info().
+metrics_aggregate(MetricsL) ->
+    metrics_aggregate_by(fun(X) -> X end, MetricsL).
+
+-spec metrics_aggregate_by_key(Key, list(HasMetrics)) -> metrics_info()
+              when Key :: any(),
+                   HasMetrics :: #{Key => metrics_info()}.
+metrics_aggregate_by_key(Key, MetricsL) ->
+    metrics_aggregate_by(fun(X) -> maps:get(Key, X, new_metrics_info()) end,
+                         MetricsL).
+
+-spec hooks_metrics_aggregate(list(hooks_metrics())) -> hooks_metrics().
+hooks_metrics_aggregate([]) ->
+    #{};
+
+hooks_metrics_aggregate([H | _] = MapL) ->
+    Hooks = maps:keys(H),
+
+    Fold = fun(Hook, Acc) ->
+                   Metrics = metrics_aggregate_by_key(Hook, MapL),
+                   Acc#{Hook => Metrics}
+           end,
+
+    lists:foldl(Fold, #{}, Hooks).
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+-spec inc(server_name(), hookpoint(), pos_integer(), #metrics{}) -> integer().
+inc(Server, Hook, Pos, Default) ->
+    Index = {Server, Hook},
+    ets:update_counter(?HOOKS_METRICS,
+                       Index,
+                       [{#metrics.window_rate, 1}, {Pos, 1}],
+                       Default).
+
+-spec new_metrics_info() -> metrics_info().
+new_metrics_info() ->
+    #{ succeed => 0
+     , failed => 0
+     , rate => 0
+     , max_rate => 0
+     }.
+
+-spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
+calc_metric(Val, Interval) ->
+    erlang:ceil(Val * ?METRICS_PRECISION / Interval).
+
+-spec metrics_add(metrics_info(), metrics_info()) -> metrics_info().
+metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1}
+           , #{succeed := S2, failed := F2, rate := R2, max_rate := M2} = Acc) ->
+    Acc#{ succeed := S1 + S2
+        , failed := F1 + F2
+        , rate := R1 + R2
+        , max_rate := M1 + M2
+        }.
+
+-spec metrics_aggregate_by(fun((any()) -> metrics_info()), list(metrics_info())) -> metrics_info().
+metrics_aggregate_by(_, []) ->
+    new_metric_info();
+
+metrics_aggregate_by(Fun, MetricsL) ->
+    Fold = fun(E, Acc) -> metrics_add(Fun(E), Acc) end,
+    #{rate := Rate,
+      max_rate := MaxRate} = Result = lists:foldl(Fold, new_metric_info(), MetricsL),
+
+    Len = erlang:length(MetricsL),
+
+    Result#{rate := Rate div Len,
+            max_rate := MaxRate div Len
+           }.

+ 59 - 33
apps/emqx_exhook/src/emqx_exhook_mgr.erl

@@ -30,14 +30,16 @@
         , lookup/1
         , enable/1
         , disable/1
-        , server_status/1
-        , all_servers_status/0
+        , server_info/1
+        , all_servers_info/0
+        , server_hooks_metrics/1
         ]).
 
 %% Helper funcs
 -export([ running/0
         , server/1
-        , init_counter_table/0
+        , hooks/1
+        , init_ref_counter_table/0
         ]).
 
 -export([ update_config/2
@@ -86,9 +88,9 @@
                         }.
 
 -define(DEFAULT_TIMEOUT, 60000).
--define(CNTER, emqx_exhook_counter).
+-define(REFRESH_INTERVAL, timer:seconds(5)).
 
--export_type([server_info/0]).
+-export_type([servers/0, server/0, server_info/0]).
 
 %%--------------------------------------------------------------------
 %% APIs
@@ -113,17 +115,20 @@ enable(Name) ->
 disable(Name) ->
     update_config([exhook, servers], {enable, Name, false}).
 
-server_status(Name) ->
-    call({server_status, Name}).
+server_info(Name) ->
+    call({?FUNCTION_NAME, Name}).
 
-all_servers_status() ->
-    call(all_servers_status).
+all_servers_info() ->
+    call(?FUNCTION_NAME).
+
+server_hooks_metrics(Name) ->
+    call({?FUNCTION_NAME, Name}).
 
 call(Req) ->
     gen_server:call(?MODULE, Req, ?DEFAULT_TIMEOUT).
 
-init_counter_table() ->
-    _ = ets:new(?CNTER, [named_table, public]).
+init_ref_counter_table() ->
+    _ = ets:new(?HOOKS_REF_COUNTER, [named_table, public]).
 
 %%=====================================================================
 %% Hocon schema
@@ -180,6 +185,7 @@ init([]) ->
     ServerL = emqx:get_config([exhook, servers]),
     {Waiting, Running, Stopped} = load_all_servers(ServerL),
     Orders = reorder(ServerL),
+    refresh_tick(),
     {ok, ensure_reload_timer(
            #{waiting => Waiting,
              running => Running,
@@ -235,6 +241,8 @@ handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
                      orders := maps:remove(ToDelete, Orders)
                     },
 
+    emqx_exhook_metrics:delete_server(ToDelete),
+
     {reply, ok, State3};
 
 handle_call({update_config, {add, RawConf}, NewConfL},
@@ -245,32 +253,22 @@ handle_call({update_config, {add, RawConf}, NewConfL},
     case emqx_exhook_server:load(Name, Conf) of
         {ok, ServerState} ->
             save(Name, ServerState),
-            Status = running,
-            Hooks = hooks(Name),
             State2 = State#{running := Running#{Name => Conf}};
         {error, _} ->
-            Status = running,
-            Hooks = [],
             StateT = State#{waiting := Waitting#{Name => Conf}},
             State2 = ensure_reload_timer(StateT);
         disable ->
-            Status = stopped,
-            Hooks = [],
             State2 = State#{stopped := Stopped#{Name => Conf}}
     end,
     Orders = reorder(NewConfL),
-    Resulte = maps:merge(Conf, #{status => Status, hooks => Hooks}),
-    {reply, Resulte, State2#{orders := Orders}};
+    {reply, ok, State2#{orders := Orders}};
 
 handle_call({lookup, Name}, _From, State) ->
     case where_is_server(Name, State) of
         not_found ->
             Result = not_found;
         {Where, #{Name := Conf}} ->
-            Result = maps:merge(Conf,
-                                #{ status => Where
-                                 , hooks => hooks(Name)
-                                 })
+            Result = maps:merge(Conf, #{status => Where})
     end,
     {reply, Result, State};
 
@@ -282,21 +280,41 @@ handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
     {Result, State2} = restart_server(Name, NewConfL, State),
     {reply, Result, State2};
 
-handle_call({server_status, Name}, _From, State) ->
+handle_call({server_info, Name}, _From, State) ->
     case where_is_server(Name, State) of
         not_found ->
             Result = not_found;
         {Status, _} ->
-            Result = Status
+            HooksMetrics = emqx_exhook_metrics:server_metrics(Name),
+            Result = #{ status => Status
+                      , metrics => HooksMetrics
+                      }
     end,
     {reply, Result, State};
 
-handle_call(all_servers_status, _From, #{running := Running,
-                                         waiting := Waiting,
-                                         stopped := Stopped} = State) ->
-    {reply, #{running => maps:keys(Running),
-              waiting => maps:keys(Waiting),
-              stopped => maps:keys(Stopped)}, State};
+handle_call(all_servers_info, _From, #{running := Running,
+                                       waiting := Waiting,
+                                       stopped := Stopped} = State) ->
+    MakeStatus = fun(Status, Servers, Acc) ->
+                         lists:foldl(fun(Name, IAcc) -> IAcc#{Name => Status} end,
+                                     Acc,
+                                     maps:keys(Servers))
+                 end,
+    Status = lists:foldl(fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end,
+                         #{},
+                         [{running, Running}, {waiting, Waiting}, {stopped, Stopped}]),
+
+    Metrics = emqx_exhook_metrics:servers_metrics(),
+
+    Result = #{ status => Status
+              , metrics => Metrics
+              },
+
+    {reply, Result, State};
+
+handle_call({server_hooks_metrics, Name}, _From, State) ->
+    Result = emqx_exhook_metrics:hooks_metrics(Name),
+    {reply, Result, State};
 
 handle_call(_Request, _From, State) ->
     Reply = ok,
@@ -318,6 +336,11 @@ handle_info({timeout, _Ref, {reload, Name}}, State) ->
             {noreply, ensure_reload_timer(NState)}
     end;
 
+handle_info(refresh_tick, State) ->
+    refresh_tick(),
+    emqx_exhook_metrics:update(?REFRESH_INTERVAL),
+    {noreply, State};
+
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -490,7 +513,6 @@ get_servers_info(Status, Map) ->
            end,
     maps:fold(Fold, [], Map).
 
-
 where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) ->
     {running, Running};
 
@@ -549,6 +571,10 @@ sort_name_by_order(Names, Orders) ->
                        maps:get(A, Orders) < maps:get(B, Orders)
                end,
                Names).
+
+refresh_tick() ->
+    erlang:send_after(?REFRESH_INTERVAL, self(), ?FUNCTION_NAME).
+
 %%--------------------------------------------------------------------
 %% Server state persistent
 save(Name, ServerState) ->
@@ -590,5 +616,5 @@ hooks(Name) ->
         undefined ->
             [];
         Service ->
-            emqx_exhook_server:hookpoints(Service)
+            emqx_exhook_server:hooks(Service)
     end.

+ 12 - 9
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -19,8 +19,6 @@
 -include("emqx_exhook.hrl").
 -include_lib("emqx/include/logger.hrl").
 
-
--define(CNTER, emqx_exhook_counter).
 -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
 
 %% Load/Unload
@@ -33,7 +31,7 @@
 
 %% Infos
 -export([ name/1
-        , hookpoints/1
+        , hooks/1
         , format/1
         , failed_action/1
         ]).
@@ -72,7 +70,7 @@
                    | 'message.acked'
                    | 'message.dropped'.
 
--export_type([server/0]).
+-export_type([server/0, hookpoint/0]).
 
 -dialyzer({nowarn_function, [inc_metrics/2]}).
 
@@ -215,20 +213,20 @@ ensure_hooks(HookSpecs) ->
                 ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
             {Hookpoint, {M, F, A}} ->
                 emqx_hooks:put(Hookpoint, {M, F, A}),
-                ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
+                ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
         end
     end, maps:keys(HookSpecs)).
 
 may_unload_hooks(HookSpecs) ->
     lists:foreach(fun(Hookpoint) ->
-        case ets:update_counter(?CNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
+        case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
             Cnt when Cnt =< 0 ->
                 case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
                     {Hookpoint, {M, F, _A}} ->
                         emqx_hooks:del(Hookpoint, {M, F});
                     _ -> ok
                 end,
-                ets:delete(?CNTER, Hookpoint);
+                ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
             _ -> ok
         end
     end, maps:keys(HookSpecs)).
@@ -244,8 +242,13 @@ format(#{name := Name, hookspec := Hooks}) ->
 name(#{name := Name}) ->
     Name.
 
-hookpoints(#{hookspec := Hooks}) ->
-    maps:keys(Hooks).
+hooks(#{hookspec := Hooks}) ->
+    FoldFun = fun(Hook, Params, Acc) ->
+                      [#{ name => Hook
+                        , params => Params
+                        } | Acc]
+              end,
+    maps:fold(FoldFun, [], Hooks).
 
 -spec call(hookpoint(), map(), server()) -> ignore
               | {ok, Resp :: term()}

+ 2 - 1
apps/emqx_exhook/src/emqx_exhook_sup.erl

@@ -42,7 +42,8 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    _ = emqx_exhook_mgr:init_counter_table(),
+    _ = emqx_exhook_metrics:init(),
+    _ = emqx_exhook_mgr:init_ref_counter_table(),
     Mngr = ?CHILD(emqx_exhook_mgr, worker, []),
     {ok, {{one_for_one, 10, 100}, [Mngr]}}.
 

+ 27 - 4
apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl

@@ -37,7 +37,7 @@ exhook {
 ">>).
 
 all() ->
-    [t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_update].
+    [t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_hooks, t_update].
 
 init_per_suite(Config) ->
     application:load(emqx_conf),
@@ -94,7 +94,11 @@ t_list(_) ->
     [Svr] = List,
 
     ?assertMatch(#{name := <<"default">>,
-                   status := <<"running">>}, Svr).
+                   metrics := _,
+                   node_metrics := _,
+                   node_status := _,
+                   hooks := _
+                  }, Svr).
 
 t_get(_) ->
     {ok, Data} = request_api(get, api_path(["exhooks", "default"]), "",
@@ -103,7 +107,11 @@ t_get(_) ->
     Svr = decode_json(Data),
 
     ?assertMatch(#{name := <<"default">>,
-                   status := <<"running">>}, Svr).
+                   metrics := _,
+                   node_metrics := _,
+                   node_status := _,
+                   hooks := _
+                  }, Svr).
 
 t_add(Cfg) ->
     Template = proplists:get_value(template, Cfg),
@@ -116,7 +124,10 @@ t_add(Cfg) ->
     Svr = decode_json(Data),
 
     ?assertMatch(#{name := <<"test1">>,
-                   status := <<"running">>}, Svr),
+                   metrics := _,
+                   node_metrics := _,
+                   node_status := _,
+                   hooks := _}, Svr),
 
     ?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
 
@@ -143,6 +154,18 @@ t_delete(_) ->
     ?assertMatch({ok, <<>>}, Result),
     ?assertMatch([<<"default">>], emqx_exhook_mgr:running()).
 
+t_hooks(_Cfg) ->
+    {ok, Data} = request_api(get, api_path(["exhooks", "default", "hooks"]), "",
+                             auth_header_()),
+
+    [Hook1 | _] = decode_json(Data),
+
+    ?assertMatch(#{name := _,
+                   params := _,
+                   metrics := _,
+                   node_metrics := _
+                  }, Hook1).
+
 t_update(Cfg) ->
     Template = proplists:get_value(template, Cfg),
     Instance = Template#{enable => false},