Преглед на файлове

Merge branch 'master' into feat-add-emqx-plugins-app

zhongwencool преди 4 години
родител
ревизия
b4e2aa0dcf

+ 5 - 3
apps/emqx/src/emqx_cm.erl

@@ -291,8 +291,9 @@ create_session(ClientInfo, ConnInfo) ->
     ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
     Session.
 
-get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_interval := EI}) ->
-    #{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
+get_session_confs(#{zone := Zone, clientid := ClientId}, #{receive_maximum := MaxInflight, expiry_interval := EI}) ->
+    #{clientid => ClientId,
+      max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
       upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
       max_inflight => MaxInflight,
       retry_interval => get_mqtt_conf(Zone, retry_interval),
@@ -301,7 +302,8 @@ get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_inte
       %% TODO: Add conf for allowing/disallowing persistent sessions.
       %% Note that the connection info is already enriched to have
       %% default config values for session expiry.
-      is_persistent => EI > 0
+      is_persistent => EI > 0,
+      latency_stats => emqx_config:get_zone_conf(Zone, [latency_stats])
      }.
 
 mqueue_confs(Zone) ->

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -172,6 +172,9 @@ roots(low) ->
    , {"persistent_session_store",
        sc(ref("persistent_session_store"),
           #{})}
+    , {"latency_stats",
+       sc(ref("latency_stats"),
+          #{})}
     ].
 
 fields("persistent_session_store") ->
@@ -964,6 +967,11 @@ when deactivated, but after the retention time.
 """
            })
       }
+    ];
+
+fields("latency_stats") ->
+    [ {"samples", sc(integer(), #{default => 10,
+                                  desc => "the number of smaples for calculate the average latency of delivery"})}
     ].
 
 mqtt_listener() ->

+ 68 - 16
apps/emqx/src/emqx_session.erl

@@ -98,7 +98,8 @@
              ]).
 
 -record(session, {
-          %% sessionID, fresh for all new sessions unless it is a resumed persistent session
+          %% Client's id
+          clientid :: emqx_types:clientid(),
           id :: sessionID(),
           %% Is this session a persistent session i.e. was it started with Session-Expiry > 0
           is_persistent :: boolean(),
@@ -128,9 +129,16 @@
           %% Awaiting PUBREL Timeout (Unit: millsecond)
           await_rel_timeout :: timeout(),
           %% Created at
-          created_at :: pos_integer()
+          created_at :: pos_integer(),
+          %% Message deliver latency stats
+          latency_stats :: emqx_message_latency_stats:stats()
          }).
 
+%% in the previous code, we will replace the message record with the pubrel atom
+%% in the pubrec function, this will lose the creation time of the message,
+%% but now we need this time to calculate latency, so now pubrel atom is changed to this record
+-record(pubrel_await, {timestamp :: non_neg_integer()}).
+
 -type(session() :: #session{}).
 
 -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
@@ -157,7 +165,8 @@
                      mqueue_dropped,
                      next_pkt_id,
                      awaiting_rel_cnt,
-                     awaiting_rel_max
+                     awaiting_rel_max,
+                     latency_stats
                     ]).
 
 -define(DEFAULT_BATCH_N, 1000).
@@ -170,6 +179,8 @@
                     , max_inflight => integer()
                     , mqueue => emqx_mqueue:options()
                     , is_persistent => boolean()
+                    , clientid => emqx_types:clientid()
+                    , latency_stats => emqx_message_latency_stats:create_options()
                     }.
 
 %%--------------------------------------------------------------------
@@ -185,6 +196,7 @@ init(Opts) ->
                    }, maps:get(mqueue, Opts, #{})),
     #session{
        id                = emqx_guid:gen(),
+       clientid          = maps:get(clientid, Opts, <<>>),
        is_persistent     = maps:get(is_persistent, Opts, false),
        max_subscriptions = maps:get(max_subscriptions, Opts, infinity),
        subscriptions     = #{},
@@ -196,7 +208,8 @@ init(Opts) ->
        awaiting_rel      = #{},
        max_awaiting_rel  = maps:get(max_awaiting_rel, Opts, 100),
        await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
-       created_at        = erlang:system_time(millisecond)
+       created_at        = erlang:system_time(millisecond),
+       latency_stats     = emqx_message_latency_stats:new(maps:get(latency_stats, Opts, #{}))
       }.
 
 %%--------------------------------------------------------------------
@@ -252,7 +265,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
 info(created_at, #session{created_at = CreatedAt}) ->
-    CreatedAt.
+    CreatedAt;
+info(latency_stats, #session{latency_stats = Stats}) ->
+    emqx_message_latency_stats:latency(Stats).
 
 %% @doc Get stats of the session.
 -spec(stats(session()) -> emqx_types:stats()).
@@ -365,7 +380,8 @@ puback(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
-            return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
+            Session2 = update_latency(Msg, Session),
+            return_with(Msg, dequeue(Session2#session{inflight = Inflight1}));
         {value, {_Pubrel, _Ts}} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -388,9 +404,10 @@ return_with(Msg, {ok, Publishes, Session}) ->
 pubrec(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
-            Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
+            Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}),
+            Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
             {ok, Msg, Session#session{inflight = Inflight1}};
-        {value, {pubrel, _Ts}} ->
+        {value, {_Pubrel, _Ts}} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@@ -419,9 +436,10 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
        | {error, emqx_types:reason_code()}).
 pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
-        {value, {pubrel, _Ts}} ->
+        {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) ->
+            Session2 = update_latency(Pubrel, Session),
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
-            dequeue(Session#session{inflight = Inflight1});
+            dequeue(Session2#session{inflight = Inflight1});
         {value, _Other} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -588,11 +606,16 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
 %%--------------------------------------------------------------------
 
 -spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
-retry(Session = #session{inflight = Inflight}) ->
+retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
     case emqx_inflight:is_empty(Inflight) of
         true  -> {ok, Session};
-        false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
-                                [], erlang:system_time(millisecond), Session)
+        false ->
+            Now = erlang:system_time(millisecond),
+            Session2 = check_expire_latency(Now, RetryInterval, Session),
+            retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
+                           [],
+                           Now,
+                           Session2)
     end.
 
 retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
@@ -619,8 +642,8 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -
             {[{PacketId, Msg1}|Acc], Inflight1}
     end;
 
-retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
-    Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
+retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) ->
+    Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight),
     {[{pubrel, PacketId}|Acc], Inflight1}.
 
 %%--------------------------------------------------------------------
@@ -664,7 +687,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
 
 -spec(replay(session()) -> {ok, replies(), session()}).
 replay(Session = #session{inflight = Inflight}) ->
-    Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
+    Pubs = lists:map(fun({PacketId, {Pubrel,  _Ts}}) when is_record(Pubrel, pubrel_await) ->
                              {pubrel, PacketId};
                         ({PacketId, {Msg, _Ts}}) ->
                              {PacketId, emqx_message:set_flag(dup, true, Msg)}
@@ -715,6 +738,35 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
 next_pkt_id(Session = #session{next_pkt_id = Id}) ->
     Session#session{next_pkt_id = Id + 1}.
 
+%%--------------------------------------------------------------------
+%% Message Latency Stats
+%%--------------------------------------------------------------------
+update_latency(Msg,
+               #session{clientid = ClientId,
+                        latency_stats = Stats,
+                        created_at = CreateAt} = S) ->
+    case get_birth_timestamp(Msg, CreateAt) of
+        0 -> S;
+        Ts ->
+            Latency = erlang:system_time(millisecond) - Ts,
+            Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats),
+            S#session{latency_stats = Stats2}
+    end.
+
+check_expire_latency(Now, Interval,
+                     #session{clientid = ClientId, latency_stats = Stats} = S) ->
+    Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats),
+    S#session{latency_stats = Stats2}.
+
+get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
+    Ts;
+
+get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
+    Ts;
+
+get_birth_timestamp(_, _) ->
+    0.
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------

+ 120 - 0
apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl

@@ -0,0 +1,120 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_message_latency_stats).
+
+%% API
+-export([new/1, update/3, check_expire/4, latency/1]).
+
+-export([get_threshold/0, update_threshold/1]).
+
+-define(NOW, erlang:system_time(millisecond)).
+-define(MINIMUM_INSERT_INTERVAL, 1000).
+-define(MINIMUM_THRESHOLD, 100).
+-define(DEFAULT_THRESHOLD, 500).
+-define(DEFAULT_SAMPLES, 10).
+-define(THRESHOLD_KEY, {?MODULE, threshold}).
+
+-opaque stats() :: #{ ema := emqx_moving_average:ema()
+                    , last_update_time := timestamp()
+                    , last_access_time := timestamp()  %% timestamp of last access top-k
+                    , last_insert_value := non_neg_integer()
+                    }.
+
+-type timestamp() :: non_neg_integer().
+-type timespan() :: number().
+
+-type latency_type() :: average
+                      | expire.
+
+-type create_options() :: #{samples => pos_integer()}.
+
+-export_type([stats/0, latency_type/0, create_options/0]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+-spec new(non_neg_integer() | create_options()) -> stats().
+new(SamplesT) when is_integer(SamplesT) ->
+    Samples = erlang:max(1, SamplesT),
+    #{ ema => emqx_moving_average:new(exponential, #{period => Samples})
+     , last_update_time => 0
+     , last_access_time => 0
+     , last_insert_value => 0
+     };
+
+new(OptsT) ->
+    Opts = maps:merge(#{samples => ?DEFAULT_SAMPLES}, OptsT),
+    #{samples := Samples} = Opts,
+    new(Samples).
+
+-spec update(emqx_types:clientid(), number(), stats()) -> stats().
+update(ClientId, Val, #{ema := EMA} = Stats) ->
+    Now = ?NOW,
+    #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA),
+    Stats2 = call_hook(ClientId, Now, average, Latency, Stats),
+    Stats2#{ ema := EMA2
+           , last_update_time := ?NOW}.
+
+-spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats().
+check_expire(_, Now, Interval, #{last_update_time := LUT} = S)
+  when LUT >= Now - Interval ->
+    S;
+
+check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
+    Latency = Now - LUT,
+    call_hook(ClientId, Now, expire, Latency, S).
+
+-spec latency(stats()) -> number().
+latency(#{ema := #{average := Average}}) ->
+    Average.
+
+-spec update_threshold(pos_integer()) -> pos_integer().
+update_threshold(Threshold) ->
+    Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD),
+    persistent_term:put(?THRESHOLD_KEY, Val),
+    Val.
+
+get_threshold() ->
+    persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD).
+
+%%--------------------------------------------------------------------
+%%  Internal functions
+%%--------------------------------------------------------------------
+-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
+call_hook(_, _, _, Latency, S)
+  when Latency =< ?MINIMUM_THRESHOLD ->
+    S;
+
+call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
+  when Now =< LIT + ?MINIMUM_INSERT_INTERVAL ->
+    S;
+
+call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
+    case Latency =< get_threshold() of
+        true ->
+            Stats#{last_access_time := Now};
+        _ ->
+            ToInsert = erlang:floor(Latency),
+            Arg = #{clientid => ClientId,
+                    latency => ToInsert,
+                    type => Type,
+                    last_insert_value => LIV,
+                    update_time => Now},
+            emqx:run_hook('message.slow_subs_stats', [Arg]),
+            Stats#{last_insert_value := ToInsert,
+                   last_access_time := Now}
+    end.

+ 90 - 0
apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl

@@ -0,0 +1,90 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @see https://en.wikipedia.org/wiki/Moving_average
+
+-module(emqx_moving_average).
+
+%% API
+-export([new/0, new/1, new/2, update/2]).
+
+-type type() :: cumulative
+              | exponential.
+
+-type ema() :: #{ type := exponential
+                , average := 0 | float()
+                , coefficient := float()
+                }.
+
+-type cma() :: #{ type := cumulative
+                , average := 0 | float()
+                , count := non_neg_integer()
+                }.
+
+-type moving_average() :: ema()
+                        | cma().
+
+-define(DEF_EMA_ARG, #{period => 10}).
+-define(DEF_AVG_TYPE, exponential).
+
+-export_type([type/0, moving_average/0, ema/0, cma/0]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+-spec new() -> moving_average().
+new() ->
+    new(?DEF_AVG_TYPE, #{}).
+
+-spec new(type()) -> moving_average().
+new(Type) ->
+    new(Type, #{}).
+
+-spec new(type(), Args :: map()) -> moving_average().
+new(cumulative, _) ->
+    #{ type => cumulative
+     , average => 0
+     , count => 0
+     };
+
+new(exponential, Arg) ->
+    #{period := Period} = maps:merge(?DEF_EMA_ARG, Arg),
+    #{ type => exponential
+     , average => 0
+       %% coefficient = 2/(N+1) is a common convention, see the wiki link for details
+     , coefficient => 2 / (Period + 1)
+     }.
+
+-spec update(number(), moving_average()) -> moving_average().
+
+update(Val, #{average := 0} = Avg) ->
+    Avg#{average := Val};
+
+update(Val, #{ type := cumulative
+             , average := Average
+             , count := Count} = CMA) ->
+    NewCount = Count + 1,
+    CMA#{average := (Count * Average + Val) / NewCount,
+         count := NewCount};
+
+update(Val, #{ type := exponential
+             , average := Average
+             , coefficient := Coefficient} = EMA) ->
+    EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}.
+
+%%--------------------------------------------------------------------
+%%  Internal functions
+%%--------------------------------------------------------------------

+ 3 - 2
apps/emqx/src/emqx_tls_lib.erl

@@ -353,11 +353,12 @@ is_valid_pem_file(Path) ->
 %% @doc This is to return SSL file content in management APIs.
 file_content_as_options(undefined) -> undefined;
 file_content_as_options(#{<<"enable">> := false} = SSL) ->
-    maps:without(?SSL_FILE_OPT_NAMES, SSL);
+    {ok, maps:without(?SSL_FILE_OPT_NAMES, SSL)};
 file_content_as_options(#{<<"enable">> := true} = SSL) ->
     file_content_as_options(?SSL_FILE_OPT_NAMES, SSL).
 
-file_content_as_options([], SSL) -> {ok, SSL};
+file_content_as_options([], SSL) ->
+    {ok, SSL};
 file_content_as_options([Key | Keys], SSL) ->
     case maps:get(Key, SSL, undefined) of
         undefined -> file_content_as_options(Keys, SSL);

+ 1 - 1
apps/emqx/src/emqx_zone_schema.erl

@@ -24,7 +24,7 @@ namespace() -> zone.
 %% roots are added only for document generation.
 roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown",
             "conn_congestion", "rate_limit", "quota", "force_gc",
-            "overload_protection"
+            "overload_protection", "latency_stats"
            ].
 
 %% zone schemas are clones from the same name from root level

+ 27 - 1
apps/emqx/test/emqx_proper_types.erl

@@ -100,6 +100,7 @@ clientinfo() ->
 %% See emqx_session:session() type define
 sessioninfo() ->
     ?LET(Session, {session,
+                    clientid(),
                     sessionid(),        % id
                     boolean(),          % is_persistent
                     subscriptions(),    % subscriptions
@@ -112,7 +113,8 @@ sessioninfo() ->
                     awaiting_rel(),     % awaiting_rel
                     non_neg_integer(),  % max_awaiting_rel
                     safty_timeout(),    % await_rel_timeout
-                    timestamp()         % created_at
+                    timestamp(),        % created_at
+                    latency_stats()
                   },
          emqx_session:info(Session)).
 
@@ -336,6 +338,30 @@ normal_topic_filter() ->
              end
          end).
 
+%% Type defined emqx_message_lantency_stats.erl - stats()
+latency_stats() ->
+    Keys = [{threshold, number()},
+            {ema, exp_moving_average()},
+            {last_update_time, non_neg_integer()},
+            {last_access_time, non_neg_integer()},
+            {last_insert_value, non_neg_integer()}
+           ],
+    ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
+         begin
+             maps:merge(maps:from_list(Ks), M)
+         end).
+
+%% Type defined emqx_moving_average.erl - ema()
+exp_moving_average() ->
+    Keys = [{type, exponential},
+            {average, number()},
+            {coefficient, float()}
+           ],
+    ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
+         begin
+             maps:merge(maps:from_list(Ks), M)
+         end).
+
 %%--------------------------------------------------------------------
 %% Basic Types
 %%--------------------------------------------------------------------

+ 8 - 6
apps/emqx/test/emqx_session_SUITE.erl

@@ -24,6 +24,9 @@
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
+-define(NOW, erlang:system_time(millisecond)).
+-record(pubrel_await, {timestamp :: non_neg_integer()}).
+
 %%--------------------------------------------------------------------
 %% CT callbacks
 %%--------------------------------------------------------------------
@@ -181,7 +184,7 @@ t_puback_with_dequeue(_) ->
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
 
 t_puback_error_packet_id_in_use(_) ->
-    Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
         emqx_session:puback(1, session(#{inflight => Inflight})).
 
@@ -193,10 +196,10 @@ t_pubrec(_) ->
     Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
     {ok, Msg, Session1} = emqx_session:pubrec(2, Session),
-    ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
+    ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
 
 t_pubrec_packet_id_in_use_error(_) ->
-    Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
         emqx_session:pubrec(1, session(#{inflight => Inflight})).
 
@@ -212,7 +215,7 @@ t_pubrel_error_packetid_not_found(_) ->
     {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
 
 t_pubcomp(_) ->
-    Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
     {ok, Session1} = emqx_session:pubcomp(1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
@@ -261,7 +264,7 @@ t_deliver_qos0(_) ->
 t_deliver_qos1(_) ->
     ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
     {ok, Session} = emqx_session:subscribe(
-                       clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
+                      clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
     Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
     {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
@@ -399,4 +402,3 @@ ts(second) ->
     erlang:system_time(second);
 ts(millisecond) ->
     erlang:system_time(millisecond).
-

+ 1 - 0
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -59,6 +59,7 @@
         , emqx_psk_schema
         , emqx_limiter_schema
         , emqx_connector_schema
+        , emqx_slow_subs_schema
         ]).
 
 namespace() -> undefined.

+ 17 - 1
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -61,10 +61,18 @@ do_paginate(Qh, Count, Params, {Module, FormatFun}) ->
 
 query_handle(Table) when is_atom(Table) ->
     qlc:q([R || R <- ets:table(Table)]);
+
+query_handle({Table, Opts}) when is_atom(Table) ->
+    qlc:q([R || R <- ets:table(Table, Opts)]);
+
 query_handle([Table]) when is_atom(Table) ->
     qlc:q([R || R <- ets:table(Table)]);
+
+query_handle([{Table, Opts}]) when is_atom(Table) ->
+    qlc:q([R || R <- ets:table(Table, Opts)]);
+
 query_handle(Tables) ->
-    qlc:append([qlc:q([E || E <- ets:table(T)]) || T <- Tables]).
+    qlc:append([query_handle(T) || T <- Tables]). %
 
 query_handle(Table, MatchSpec) when is_atom(Table) ->
     Options = {traverse, {select, MatchSpec}},
@@ -78,8 +86,16 @@ query_handle(Tables, MatchSpec) ->
 
 count(Table) when is_atom(Table) ->
     ets:info(Table, size);
+
+count({Table, _}) when is_atom(Table) ->
+    ets:info(Table, size);
+
 count([Table]) when is_atom(Table) ->
     ets:info(Table, size);
+
+count([{Table, _}]) when is_atom(Table) ->
+    ets:info(Table, size);
+
 count(Tables) ->
     lists:sum([count(T) || T <- Tables]).
 

+ 2 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -118,7 +118,8 @@ fields("user_provided_function") ->
     [ {function, sc(binary(),
         #{ desc => """
 The user provided function. Should be in the format: '{module}:{function}'.<br>
-Where the <module> is the erlang callback module and the {function} is the erlang function.<br>
+Where {module} is the Erlang callback module and {function} is the Erlang function.
+<br>
 To write your own function, checkout the function <code>console</code> and
 <code>republish</code> in the source file:
 <code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.

+ 40 - 0
apps/emqx_slow_subs/etc/emqx_slow_subs.conf

@@ -0,0 +1,40 @@
+##--------------------------------------------------------------------
+## EMQ X Slow Subscribers Statistics
+##--------------------------------------------------------------------
+
+emqx_slow_subs {
+    enable = false
+
+    threshold = 500ms
+    ## The latency threshold for statistics, the minimum value is 100ms
+    ##
+    ## Default: 500ms
+
+    ## The eviction time of the record, which in the statistics record table
+    ##
+    ## Default: 5m
+    expire_interval = 5m
+
+    ## The maximum number of records in the slow subscription statistics record table
+    ##
+    ## Value: 10
+    top_k_num = 10
+
+    ## The interval for pushing statistics table records to the system topic. When set to 0, push is disabled
+    ## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval
+    ## publish is disabled if set to 0s.
+    ##
+    ## Value: 0s
+    expire_interval = 0s
+
+    ## QoS of notification message
+    ##
+    ## Defaut: 0
+    notice_qos = 0
+
+    ## Maximum information number in one notification
+    ##
+    ## Default: 100
+    notice_batch_size = 100
+
+}

+ 28 - 0
apps/emqx_slow_subs/include/emqx_slow_subs.hrl

@@ -0,0 +1,28 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-define(TOPK_TAB, emqx_slow_subs_topk).
+
+-define(INDEX(Latency, ClientId), {Latency, ClientId}).
+
+-record(top_k, { index :: index()
+               , type :: emqx_message_latency_stats:latency_type()
+               , last_update_time :: pos_integer()
+               , extra = []
+               }).
+
+-type top_k() :: #top_k{}.
+-type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()).

+ 12 - 0
apps/emqx_slow_subs/src/emqx_slow_subs.app.src

@@ -0,0 +1,12 @@
+{application, emqx_slow_subs,
+ [{description, "EMQ X Slow Subscribers Statistics"},
+  {vsn, "1.0.0"}, % strict semver, bump manually!
+  {modules, []},
+  {registered, [emqx_slow_subs_sup]},
+  {applications, [kernel,stdlib]},
+  {mod, {emqx_slow_subs_app,[]}},
+  {env, []},
+  {licenses, ["Apache-2.0"]},
+  {maintainers, ["EMQ X Team <contact@emqx.io>"]},
+  {links, []}
+ ]}.

+ 318 - 0
apps/emqx_slow_subs/src/emqx_slow_subs.erl

@@ -0,0 +1,318 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
+
+-export([ start_link/0, on_stats_update/2, update_settings/1
+        , clear_history/0, init_topk_tab/0
+        ]).
+
+%% gen_server callbacks
+-export([ init/1
+        , handle_call/3
+        , handle_cast/2
+        , handle_info/2
+        , terminate/2
+        , code_change/3
+        ]).
+
+-compile(nowarn_unused_type).
+
+-type state() :: #{ enable := boolean()
+                  , last_tick_at := pos_integer()
+                  }.
+
+-type log() :: #{ rank := pos_integer()
+                , clientid := emqx_types:clientid()
+                , latency := non_neg_integer()
+                , type := emqx_message_latency_stats:latency_type()
+                }.
+
+-type window_log() :: #{ last_tick_at := pos_integer()
+                       , logs := [log()]
+                       }.
+
+-type message() :: #message{}.
+
+-type stats_update_args() :: #{ clientid := emqx_types:clientid()
+                              , latency := non_neg_integer()
+                              , type := emqx_message_latency_stats:latency_type()
+                              , last_insert_value := non_neg_integer()
+                              , update_time := timer:time()
+                              }.
+
+-type stats_update_env() :: #{max_size := pos_integer()}.
+
+-ifdef(TEST).
+-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
+-else.
+-define(EXPIRE_CHECK_INTERVAL, timer:seconds(10)).
+-endif.
+
+-define(NOW, erlang:system_time(millisecond)).
+-define(NOTICE_TOPIC_NAME, "slow_subs").
+-define(DEF_CALL_TIMEOUT, timer:seconds(10)).
+
+%% erlang term order
+%% number < atom < reference < fun < port < pid < tuple < list < bit string
+
+%% ets ordered_set is ascending by term order
+
+%%--------------------------------------------------------------------
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+%% @doc Start the st_statistics
+-spec(start_link() -> emqx_types:startlink_ret()).
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% XXX NOTE:pay attention to the performance here
+-spec on_stats_update(stats_update_args(), stats_update_env()) -> true.
+on_stats_update(#{clientid := ClientId,
+                  latency := Latency,
+                  type := Type,
+                  last_insert_value := LIV,
+                  update_time := Ts},
+                #{max_size := MaxSize}) ->
+
+    LastIndex = ?INDEX(LIV, ClientId),
+    Index = ?INDEX(Latency, ClientId),
+
+    %% check whether the client is in the table
+    case ets:lookup(?TOPK_TAB, LastIndex) of
+        [#top_k{index = Index}] ->
+            %% if last value == the new value, update the type and last_update_time
+            %% XXX for clients whose latency are stable for a long time, is it possible to reduce updates?
+            ets:insert(?TOPK_TAB,
+                       #top_k{index = Index, type = Type, last_update_time = Ts});
+        [_] ->
+            %% if Latency > minimum value, we should update it
+            %% if Latency < minimum value, maybe it can replace the minimum value
+            %% so alwyas update at here
+            %% do we need check if Latency == minimum ???
+            ets:insert(?TOPK_TAB,
+                       #top_k{index = Index, type = Type, last_update_time = Ts}),
+            ets:delete(?TOPK_TAB, LastIndex);
+        [] ->
+            %% try to insert
+            try_insert_to_topk(MaxSize, Index, Latency, Type, Ts)
+    end.
+
+clear_history() ->
+    gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).
+
+update_settings(Enable) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, Enable}, ?DEF_CALL_TIMEOUT).
+
+init_topk_tab() ->
+    case ets:whereis(?TOPK_TAB) of
+        undefined ->
+            ?TOPK_TAB = ets:new(?TOPK_TAB,
+                                [ ordered_set, public, named_table
+                                , {keypos, #top_k.index}, {write_concurrency, true}
+                                , {read_concurrency, true}
+                                ]);
+        _ ->
+            ?TOPK_TAB
+    end.
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    Enable = emqx:get_config([emqx_slow_subs, enable]),
+    {ok, check_enable(Enable, #{enable => false})}.
+
+handle_call({update_settings, Enable}, _From, State) ->
+    State2 = check_enable(Enable, State),
+    {reply, ok, State2};
+
+handle_call(clear_history, _, State) ->
+    ets:delete_all_objects(?TOPK_TAB),
+    {reply, ok, State};
+
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info(expire_tick, State) ->
+    expire_tick(),
+    Logs = ets:tab2list(?TOPK_TAB),
+    do_clear(Logs),
+    {noreply, State};
+
+handle_info(notice_tick, State) ->
+    notice_tick(),
+    Logs = ets:tab2list(?TOPK_TAB),
+    do_notification(Logs, State),
+    {noreply, State#{last_tick_at := ?NOW}};
+
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, _) ->
+    unload(),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+expire_tick() ->
+    erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
+
+notice_tick() ->
+    case emqx:get_config([emqx_slow_subs, notice_interval]) of
+        0 -> ok;
+        Interval ->
+            erlang:send_after(Interval, self(), ?FUNCTION_NAME),
+            ok
+    end.
+
+-spec do_notification(list(), state()) -> ok.
+do_notification([], _) ->
+    ok;
+
+do_notification(Logs, #{last_tick_at := LastTickTime}) ->
+    start_publish(Logs, LastTickTime),
+    ok.
+
+start_publish(Logs, TickTime) ->
+    emqx_pool:async_submit({fun do_publish/3, [Logs, erlang:length(Logs), TickTime]}).
+
+do_publish([], _, _) ->
+    ok;
+
+do_publish(Logs, Rank, TickTime) ->
+    BatchSize = emqx:get_config([emqx_slow_subs, notice_batch_size]),
+    do_publish(Logs, BatchSize, Rank, TickTime, []).
+
+do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 ->
+    Cache2 = [convert_to_notice(Rank, Log) | Cache],
+    do_publish(T, Size - 1, Rank - 1, TickTime, Cache2);
+
+do_publish(Logs, Size, Rank, TickTime, Cache) when Size =:= 0 ->
+    publish(TickTime, Cache),
+    do_publish(Logs, Rank, TickTime);
+
+do_publish([], _, _Rank, TickTime, Cache) ->
+    publish(TickTime, Cache),
+    ok.
+
+convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId),
+                               type = Type,
+                               last_update_time = Ts}) ->
+    #{rank => Rank,
+      clientid => ClientId,
+      latency => Latency,
+      type => Type,
+      timestamp => Ts}.
+
+publish(TickTime, Notices) ->
+    WindowLog = #{last_tick_at => TickTime,
+                  logs => lists:reverse(Notices)},
+    Payload = emqx_json:encode(WindowLog),
+    Msg = #message{ id = emqx_guid:gen()
+                  , qos = emqx:get_config([emqx_slow_subs, notice_qos])
+                  , from = ?MODULE
+                  , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
+                  , payload = Payload
+                  , timestamp = ?NOW
+                  },
+    _ = emqx_broker:safe_publish(Msg),
+    ok.
+
+load() ->
+    MaxSize = emqx:get_config([emqx_slow_subs, top_k_num]),
+    _ = emqx:hook('message.slow_subs_stats',
+                  {?MODULE, on_stats_update, [#{max_size => MaxSize}]}
+                 ),
+    ok.
+
+unload() ->
+    emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}).
+
+do_clear(Logs) ->
+    Now = ?NOW,
+    Interval = emqx:get_config([emqx_slow_subs, expire_interval]),
+    Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
+                   case Now - Ts >= Interval of
+                       true ->
+                           ets:delete(?TOPK_TAB, Index);
+                       _ ->
+                           true
+               end
+           end,
+    lists:foreach(Each, Logs).
+
+try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
+    case ets:info(?TOPK_TAB, size) of
+        Size when Size < MaxSize ->
+            %% if the size is under limit, insert it directly
+            ets:insert(?TOPK_TAB,
+                       #top_k{index = Index, type = Type, last_update_time = Ts});
+        _Size ->
+            %% find the minimum value
+            ?INDEX(Min, _) = First =
+                case ets:first(?TOPK_TAB) of
+                    ?INDEX(_, _) = I ->  I;
+                    _ -> ?INDEX(Latency - 1, <<>>)
+                end,
+
+            case Latency =< Min of
+                true -> true;
+                _ ->
+                    ets:insert(?TOPK_TAB,
+                               #top_k{index = Index, type = Type, last_update_time = Ts}),
+
+                    ets:delete(?TOPK_TAB, First)
+            end
+    end.
+
+check_enable(Enable, #{enable := IsEnable} = State) ->
+    update_threshold(),
+    case Enable of
+        IsEnable ->
+            State;
+        true ->
+            notice_tick(),
+            expire_tick(),
+            load(),
+            State#{enable := true, last_tick_at => ?NOW};
+        _ ->
+            unload(),
+            State#{enable := false}
+    end.
+
+update_threshold() ->
+    Threshold = emqx:get_config([emqx_slow_subs, threshold]),
+    emqx_message_latency_stats:update_threshold(Threshold),
+    ok.

+ 108 - 0
apps/emqx_slow_subs/src/emqx_slow_subs_api.erl

@@ -0,0 +1,108 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs_api).
+
+-behaviour(minirest_api).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
+
+-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
+
+-export([slow_subs/2, encode_record/1, settings/2]).
+
+-import(hoconsc, [mk/2, ref/1]).
+-import(emqx_mgmt_util, [bad_request/0]).
+
+-define(FORMAT_FUN, {?MODULE, encode_record}).
+-define(APP, emqx_slow_subs).
+-define(APP_NAME, <<"emqx_slow_subs">>).
+
+namespace() -> "slow_subscribers_statistics".
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE).
+
+paths() -> ["/slow_subscriptions", "/slow_subscriptions/settings"].
+
+schema(("/slow_subscriptions")) ->
+    #{
+      'operationId' => slow_subs,
+      delete => #{tags => [<<"slow subs">>],
+                  description => <<"Clear current data and re count slow topic">>,
+                  parameters => [],
+                  'requestBody' => [],
+                  responses => #{204 => <<"No Content">>}
+                 },
+      get => #{tags => [<<"slow subs">>],
+               description => <<"Get slow topics statistics record data">>,
+               parameters => [ {page, mk(integer(), #{in => query})}
+                             , {limit, mk(integer(), #{in => query})}
+                             ],
+               'requestBody' => [],
+               responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}
+              }
+     };
+
+schema("/slow_subscriptions/settings") ->
+    #{'operationId' => settings,
+      get => #{tags => [<<"slow subs">>],
+               description => <<"Get slow subs settings">>,
+               responses => #{200 => conf_schema()}
+              },
+      put => #{tags => [<<"slow subs">>],
+               description => <<"Update slow subs settings">>,
+               'requestBody' => conf_schema(),
+               responses => #{200 => conf_schema()}
+              }
+     }.
+
+fields(record) ->
+    [
+     {clientid, mk(string(), #{desc => <<"the clientid">>})},
+     {latency, mk(integer(), #{desc => <<"average time for message delivery or time for message expire">>})},
+     {type, mk(string(), #{desc => <<"type of the latency, could be average or expire">>})},
+     {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})}
+    ].
+
+conf_schema() ->
+    Ref = hoconsc:ref(emqx_slow_subs_schema, "emqx_slow_subs"),
+    hoconsc:mk(Ref, #{}).
+
+slow_subs(delete, _) ->
+    ok = emqx_slow_subs:clear_history(),
+    {204};
+
+slow_subs(get, #{query_string := QS}) ->
+    Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN),
+    {200, Data}.
+
+encode_record(#top_k{index = ?INDEX(Latency, ClientId),
+                     type = Type,
+                     last_update_time = Ts}) ->
+    #{clientid => ClientId,
+      latency => Latency,
+      type => Type,
+      last_update_time => Ts}.
+
+settings(get, _) ->
+    {200, emqx:get_raw_config([?APP_NAME], #{})};
+
+settings(put, #{body := Body}) ->
+    {ok, #{config := #{enable := Enable}}} = emqx:update_config([?APP], Body),
+    _ = emqx_slow_subs:update_settings(Enable),
+    {200, emqx:get_raw_config([?APP_NAME], #{})}.

+ 30 - 0
apps/emqx_slow_subs/src/emqx_slow_subs_app.erl

@@ -0,0 +1,30 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs_app).
+
+-behaviour(application).
+
+-export([ start/2
+        , stop/1
+        ]).
+
+start(_Type, _Args) ->
+    {ok, Sup} = emqx_slow_subs_sup:start_link(),
+    {ok, Sup}.
+
+stop(_State) ->
+    ok.

+ 44 - 0
apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl

@@ -0,0 +1,44 @@
+-module(emqx_slow_subs_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-export([roots/0, fields/1]).
+
+roots() -> ["emqx_slow_subs"].
+
+fields("emqx_slow_subs") ->
+    [ {enable, sc(boolean(), false, "switch of this function")}
+    , {threshold,
+       sc(emqx_schema:duration_ms(),
+          "500ms",
+          "The latency threshold for statistics, the minimum value is 100ms")}
+    , {expire_interval,
+       sc(emqx_schema:duration_ms(),
+          "5m",
+          "The eviction time of the record, which in the statistics record table")}
+    , {top_k_num,
+       sc(integer(),
+          10,
+          "The maximum number of records in the slow subscription statistics record table")}
+    , {notice_interval,
+       sc(emqx_schema:duration_ms(),
+          "0s",
+          "The interval for pushing statistics table records to the system topic. When set to 0, push is disabled"
+          "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval"
+          "publish is disabled if set to 0s."
+         )}
+    , {notice_qos,
+       sc(range(0, 2),
+          0,
+          "QoS of notification message in notice topic")}
+    , {notice_batch_size,
+       sc(integer(),
+          0,
+          "Maximum information number in one notification")}
+    ].
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+sc(Type, Default, Desc) ->
+    hoconsc:mk(Type, #{default => Default, desc => Desc}).

+ 36 - 0
apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl

@@ -0,0 +1,36 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    emqx_slow_subs:init_topk_tab(),
+    {ok, {{one_for_one, 10, 3600},
+          [#{id       => st_statistics,
+             start    => {emqx_slow_subs, start_link, []},
+             restart  => permanent,
+             shutdown => 5000,
+             type     => worker,
+             modules  => [emqx_slow_subs]}]}}.

+ 124 - 0
apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl

@@ -0,0 +1,124 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx.hrl").
+
+-define(TOPK_TAB, emqx_slow_subs_topk).
+-define(NOW, erlang:system_time(millisecond)).
+
+-define(BASE_CONF, <<"""
+emqx_slow_subs {
+    enable = true
+	top_k_num = 5,
+    expire_interval = 3000
+    notice_interval = 1500
+    notice_qos = 0
+    notice_batch_size = 3
+}""">>).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    ok = emqx_config:init_load(emqx_slow_subs_schema, ?BASE_CONF),
+    emqx_common_test_helpers:start_apps([emqx_slow_subs]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([emqx_slow_subs]).
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, _) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Test Cases
+%%--------------------------------------------------------------------
+t_log_and_pub(_) ->
+    %% Sub topic first
+    Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
+    Clients = start_client(Subs),
+    emqx:subscribe("$SYS/brokers/+/slow_subs"),
+    timer:sleep(1000),
+    Now = ?NOW,
+    %% publish
+
+    lists:foreach(fun(I) ->
+                          Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
+                          Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
+                          emqx:publish(Msg#message{timestamp = Now - 500})
+                  end,
+                  lists:seq(1, 10)),
+
+    lists:foreach(fun(I) ->
+                          Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
+                          Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
+                          emqx:publish(Msg#message{timestamp = Now - 500})
+                  end,
+                  lists:seq(1, 10)),
+
+    timer:sleep(1000),
+    Size = ets:info(?TOPK_TAB, size),
+    %% some time record maybe delete due to it expired
+    ?assert(Size =< 6 andalso Size >= 4),
+
+    timer:sleep(1500),
+    Recs = try_receive([]),
+    RecSum = lists:sum(Recs),
+    ?assert(RecSum >= 5),
+    ?assert(lists:all(fun(E) -> E =< 3 end, Recs)),
+
+    timer:sleep(2000),
+    ?assert(ets:info(?TOPK_TAB, size) =:= 0),
+    [Client ! stop || Client <- Clients],
+    ok.
+
+start_client(Subs) ->
+    [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
+
+client(I, Subs) ->
+    {ok, C} = emqtt:start_link([{host,      "localhost"},
+                                {clientid,  io_lib:format("slow_subs_~p", [I])},
+                                {username,  <<"plain">>},
+                                {password,  <<"plain">>}]),
+    {ok, _} = emqtt:connect(C),
+
+    Len = erlang:length(Subs),
+    Sub = lists:nth(I rem Len + 1, Subs),
+    _ = emqtt:subscribe(C, Sub),
+
+    receive
+        stop ->
+            ok
+    end.
+
+try_receive(Acc) ->
+    receive
+        {deliver, _, #message{payload = Payload}} ->
+            #{<<"logs">> := Logs} =  emqx_json:decode(Payload, [return_maps]),
+            try_receive([length(Logs) | Acc])
+    after 500 ->
+            Acc
+    end.

+ 174 - 0
apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl

@@ -0,0 +1,174 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx_management/include/emqx_mgmt.hrl").
+-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
+
+-define(HOST, "http://127.0.0.1:18083/").
+
+-define(API_VERSION, "v5").
+
+-define(BASE_PATH, "api").
+-define(NOW, erlang:system_time(millisecond)).
+
+-define(CONF_DEFAULT, <<"""
+emqx_slow_subs
+{
+ enable = true
+ top_k_num = 5,
+ expire_interval = 60000
+ notice_interval = 0
+ notice_qos = 0
+ notice_batch_size = 3
+}""">>).
+
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    ok = emqx_config:init_load(emqx_slow_subs_schema, ?CONF_DEFAULT),
+    emqx_mgmt_api_test_util:init_suite([emqx_slow_subs]),
+    {ok, _} = application:ensure_all_started(emqx_authn),
+    Config.
+
+end_per_suite(Config) ->
+    application:stop(emqx_authn),
+    emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]),
+    Config.
+
+init_per_testcase(_, Config) ->
+    application:ensure_all_started(emqx_slow_subs),
+    timer:sleep(500),
+    Config.
+
+end_per_testcase(_, Config) ->
+    application:stop(emqx_slow_subs),
+    Config.
+
+t_get_history(_) ->
+    Now = ?NOW,
+    Each = fun(I) ->
+                   ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
+                   ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId),
+                                                type = average,
+                                                last_update_time = Now})
+           end,
+
+    lists:foreach(Each, lists:seq(1, 5)),
+
+    {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10",
+                             auth_header_()),
+    #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]),
+
+    RFirst = #{<<"clientid">> => <<"test_5">>,
+               <<"latency">> => 5,
+               <<"type">> => <<"average">>,
+               <<"last_update_time">> => Now},
+
+    ?assertEqual(RFirst, First).
+
+t_clear(_) ->
+    ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>),
+                                 type = average,
+                                 last_update_time = ?NOW}),
+
+    {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],
+                          auth_header_()),
+
+    ?assertEqual(0, ets:info(?TOPK_TAB, size)).
+
+t_settting(_) ->
+    Conf = emqx:get_config([emqx_slow_subs]),
+    Conf2 = Conf#{threshold => 1000},
+    {ok, Data} = request_api(put,
+                             api_path(["slow_subscriptions", "settings"]),
+                             [],
+                             auth_header_(),
+                             Conf2),
+
+    Return = decode_json(Data),
+
+    ?assertEqual(Conf2, Return),
+
+    {ok, GetData} = request_api(get,
+                                api_path(["slow_subscriptions", "settings"]),
+                                [],
+                                auth_header_()
+                            ),
+
+    GetReturn = decode_json(GetData),
+
+    ?assertEqual(Conf2, GetReturn),
+
+    ?assertEqual(1000,
+                 emqx_message_latency_stats:get_threshold()).
+
+decode_json(Data) ->
+    BinJosn = emqx_json:decode(Data, [return_maps]),
+    emqx_map_lib:unsafe_atom_key_map(BinJosn).
+
+request_api(Method, Url, Auth) ->
+    request_api(Method, Url, [], Auth, []).
+
+request_api(Method, Url, QueryParams, Auth) ->
+    request_api(Method, Url, QueryParams, Auth, []).
+
+request_api(Method, Url, QueryParams, Auth, []) ->
+    NewUrl = case QueryParams of
+                 "" -> Url;
+                 _ -> Url ++ "?" ++ QueryParams
+             end,
+    do_request_api(Method, {NewUrl, [Auth]});
+request_api(Method, Url, QueryParams, Auth, Body) ->
+    NewUrl = case QueryParams of
+                 "" -> Url;
+                 _ -> Url ++ "?" ++ QueryParams
+             end,
+    do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
+
+do_request_api(Method, Request)->
+    ct:pal("Method: ~p, Request: ~p", [Method, Request]),
+    case httpc:request(Method, Request, [], [{body_format, binary}]) of
+        {error, socket_closed_remotely} ->
+            {error, socket_closed_remotely};
+        {ok, {{"HTTP/1.1", Code, _}, _, Return} }
+          when Code =:= 200 orelse Code =:= 204 ->
+            {ok, Return};
+        {ok, {Reason, _, _}} ->
+            {error, Reason}
+    end.
+
+auth_header_() ->
+    AppId = <<"admin">>,
+    AppSecret = <<"public">>,
+    auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
+
+auth_header_(User, Pass) ->
+    Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
+    {"Authorization","Basic " ++ Encoded}.
+
+api_path(Parts)->
+    ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).

+ 1 - 0
rebar.config.erl

@@ -305,6 +305,7 @@ relx_apps(ReleaseType, Edition) ->
     , emqx_statsd
     , emqx_prometheus
     , emqx_psk
+
     , emqx_plugins
     ]
     ++ [quicer || is_quicer_supported()]