Jelajahi Sumber

Merge pull request #6952 from lafirest/port/slow_subs

feat: port slow subs from v4.4
JianBo He 4 tahun lalu
induk
melakukan
5d616acf77

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -16,3 +16,4 @@
 {emqx_statsd,1}.
 {emqx_telemetry,1}.
 {emqx_topic_metrics,1}.
+{emqx_slow_subs,1}.

+ 1 - 2
apps/emqx/src/emqx_cm.erl

@@ -322,8 +322,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{receive_maximum := Ma
       %% TODO: Add conf for allowing/disallowing persistent sessions.
       %% Note that the connection info is already enriched to have
       %% default config values for session expiry.
-      is_persistent => EI > 0,
-      latency_stats => emqx_config:get_zone_conf(Zone, [latency_stats])
+      is_persistent => EI > 0
      }.
 
 mqueue_confs(Zone) ->

+ 0 - 7
apps/emqx/src/emqx_schema.erl

@@ -182,9 +182,6 @@ this number of messages or bytes have passed through."""
    , {"persistent_session_store",
        sc(ref("persistent_session_store"),
           #{})}
-    , {"latency_stats",
-       sc(ref("latency_stats"),
-          #{})}
     , {"trace",
        sc(ref("trace"),
           #{desc => """
@@ -1105,10 +1102,6 @@ when deactivated, but after the retention time.
       }
     ];
 
-fields("latency_stats") ->
-    [ {"samples", sc(integer(), #{default => 10,
-                                  desc => "the number of samples for calculate the average latency of delivery"})}
-    ];
 fields("trace") ->
     [ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{
         default => text,

+ 56 - 68
apps/emqx/src/emqx_session.erl

@@ -129,15 +129,16 @@
           %% Awaiting PUBREL Timeout (Unit: millisecond)
           await_rel_timeout :: timeout(),
           %% Created at
-          created_at :: pos_integer(),
+          created_at :: pos_integer()
           %% Message deliver latency stats
-          latency_stats :: emqx_message_latency_stats:stats()
          }).
 
-%% in the previous code, we will replace the message record with the pubrel atom
-%% in the pubrec function, this will lose the creation time of the message,
-%% but now we need this time to calculate latency, so now pubrel atom is changed to this record
--record(pubrel_await, {timestamp :: non_neg_integer()}).
+
+-type inflight_data_phase() :: wait_ack | wait_comp.
+
+-record(inflight_data, { phase :: inflight_data_phase()
+                       , message :: emqx_types:message()
+                       , timestamp :: non_neg_integer()}).
 
 -type(session() :: #session{}).
 
@@ -168,7 +169,6 @@
     , next_pkt_id
     , awaiting_rel_cnt
     , awaiting_rel_max
-    , latency_stats
     ]).
 
 -define(DEFAULT_BATCH_N, 1000).
@@ -182,7 +182,6 @@
                     , mqueue => emqx_mqueue:options()
                     , is_persistent => boolean()
                     , clientid => emqx_types:clientid()
-                    , latency_stats => emqx_message_latency_stats:create_options()
                     }.
 
 %%--------------------------------------------------------------------
@@ -210,8 +209,7 @@ init(Opts) ->
        awaiting_rel      = #{},
        max_awaiting_rel  = maps:get(max_awaiting_rel, Opts, 100),
        await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
-       created_at        = erlang:system_time(millisecond),
-       latency_stats     = emqx_message_latency_stats:new(maps:get(latency_stats, Opts, #{}))
+       created_at        = erlang:system_time(millisecond)
       }.
 
 %%--------------------------------------------------------------------
@@ -267,9 +265,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
 info(created_at, #session{created_at = CreatedAt}) ->
-    CreatedAt;
-info(latency_stats, #session{latency_stats = Stats}) ->
-    emqx_message_latency_stats:latency(Stats).
+    CreatedAt.
 
 %% @doc Get stats of the session.
 -spec(stats(session()) -> emqx_types:stats()).
@@ -380,11 +376,11 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
        | {error, emqx_types:reason_code()}).
 puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
-        {value, {Msg, _Ts}} when is_record(Msg, message) ->
+        {value, #inflight_data{phase = wait_ack, message = Msg}} ->
+            on_delivery_completed(Msg, Session),
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
-            Session2 = update_latency(Msg, Session),
-            return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1}));
-        {value, {_Pubrel, _Ts}} ->
+            return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
+        {value, _} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@@ -404,11 +400,11 @@ return_with(Msg, {ok, Publishes, Session}) ->
        | {error, emqx_types:reason_code()}).
 pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
-        {value, {Msg, _Ts}} when is_record(Msg, message) ->
-            Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}),
+        {value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
+            Update = Data#inflight_data{phase = wait_comp},
             Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
             {ok, Msg, Session#session{inflight = Inflight1}};
-        {value, {_Pubrel, _Ts}} ->
+        {value, _} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@@ -437,10 +433,10 @@ pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
        | {error, emqx_types:reason_code()}).
 pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
-        {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) ->
-            Session2 = update_latency(Pubrel, Session),
+        {value, #inflight_data{phase = wait_comp, message = Msg}} ->
+            on_delivery_completed(Msg, Session),
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
-            dequeue(ClientInfo, Session2#session{inflight = Inflight1});
+            dequeue(ClientInfo, Session#session{inflight = Inflight1});
         {value, _Other} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -504,6 +500,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
     end.
 
 deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
+    on_delivery_completed(Msg, Session),        %
     {ok, [{undefined, maybe_ack(Msg)}], Session};
 
 deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
@@ -518,7 +515,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
             {ok, Session1};
         false ->
             Publish = {PacketId, maybe_ack(Msg)},
-            Session1 = await(PacketId, Msg, Session),
+            Msg2 = mark_begin_deliver(Msg),
+            Session1 = await(PacketId, Msg2, Session),
             {ok, [Publish], next_pkt_id(Session1)}
     end.
 
@@ -616,34 +614,30 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
 
 -spec(retry(emqx_types:clientinfo(), session()) ->
     {ok, session()} | {ok, replies(), timeout(), session()}).
-retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
+retry(ClientInfo, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:is_empty(Inflight) of
         true  -> {ok, Session};
         false ->
             Now = erlang:system_time(millisecond),
-            Session2 = check_expire_latency(Now, RetryInterval, Session),
-            retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now,
-                           Session2, ClientInfo)
+            retry_delivery(emqx_inflight:to_list(fun sort_fun/2, Inflight),
+                           [], Now, Session, ClientInfo)
     end.
 
 retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
     {ok, lists:reverse(Acc), Interval, Session};
 
-retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
-               #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) ->
+retry_delivery([{PacketId, #inflight_data{timestamp = Ts} = Data} | More],
+               Acc, Now, Session = #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) ->
     case (Age = age(Now, Ts)) >= Interval of
         true ->
-            {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo),
+            {Acc1, Inflight1} = do_retry_delivery(PacketId, Data, Now, Acc, Inflight, ClientInfo),
             retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
         false ->
             {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
     end.
 
-do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) ->
-    Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
-    {[{pubrel, PacketId}|Acc], Inflight1};
-
-do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) ->
+do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data,
+                  Now, Acc, Inflight, ClientInfo) ->
     case emqx_message:is_expired(Msg) of
         true ->
             ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
@@ -651,9 +645,15 @@ do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) ->
             {Acc, emqx_inflight:delete(PacketId, Inflight)};
         false ->
             Msg1 = emqx_message:set_flag(dup, true, Msg),
-            Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
-            {[{PacketId, Msg1}|Acc], Inflight1}
-    end.
+            Update = Data#inflight_data{message = Msg1, timestamp = Now},
+            Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
+            {[{PacketId, Msg1} | Acc], Inflight1}
+    end;
+
+do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) ->
+    Update = Data#inflight_data{timestamp = Now},
+    Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
+    {[{pubrel, PacketId} | Acc], Inflight1}.
 
 %%--------------------------------------------------------------------
 %% Expire Awaiting Rel
@@ -697,9 +697,9 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
 
 -spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
 replay(ClientInfo, Session = #session{inflight = Inflight}) ->
-    Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) ->
+    Pubs = lists:map(fun({PacketId, #inflight_data{phase = wait_comp}}) ->
                              {pubrel, PacketId};
-                        ({PacketId, {Msg, _Ts}}) ->
+                        ({PacketId, #inflight_data{message = Msg}}) ->
                              {PacketId, emqx_message:set_flag(dup, true, Msg)}
                      end, emqx_inflight:to_list(Inflight)),
     case dequeue(ClientInfo, Session) of
@@ -755,37 +755,23 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
 %%--------------------------------------------------------------------
 %% Message Latency Stats
 %%--------------------------------------------------------------------
-update_latency(Msg,
-               #session{clientid = ClientId,
-                        latency_stats = Stats,
-                        created_at = CreateAt} = S) ->
-    case get_birth_timestamp(Msg, CreateAt) of
-        0 -> S;
-        Ts ->
-            Latency = erlang:system_time(millisecond) - Ts,
-            Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats),
-            S#session{latency_stats = Stats2}
-    end.
-
-check_expire_latency(Now, Interval,
-                     #session{clientid = ClientId, latency_stats = Stats} = S) ->
-    Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats),
-    S#session{latency_stats = Stats2}.
-
-get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
-    Ts;
+on_delivery_completed(Msg,
+                      #session{created_at = CreateAt, clientid = ClientId}) ->
+    emqx:run_hook('delivery.completed',
+                  [Msg,
+                   #{session_birth_time => CreateAt, clientid => ClientId}]).
 
-get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
-    Ts;
-
-get_birth_timestamp(_, _) ->
-    0.
+mark_begin_deliver(Msg) ->
+    emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg).
 
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
-sort_fun() ->
-    fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end.
+
+-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
+
+sort_fun({_, A}, {_, B}) ->
+    A#inflight_data.timestamp =< B#inflight_data.timestamp.
 
 batch_n(Inflight) ->
     case emqx_inflight:max_size(Inflight) of
@@ -794,7 +780,9 @@ batch_n(Inflight) ->
     end.
 
 with_ts(Msg) ->
-    {Msg, erlang:system_time(millisecond)}.
+    #inflight_data{phase = wait_ack,
+                   message = Msg,
+                   timestamp = erlang:system_time(millisecond)}.
 
 age(Now, Ts) -> Now - Ts.
 

+ 0 - 120
apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl

@@ -1,120 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_message_latency_stats).
-
-%% API
--export([new/1, update/3, check_expire/4, latency/1]).
-
--export([get_threshold/0, update_threshold/1]).
-
--define(NOW, erlang:system_time(millisecond)).
--define(MINIMUM_INSERT_INTERVAL, 1000).
--define(MINIMUM_THRESHOLD, 100).
--define(DEFAULT_THRESHOLD, 500).
--define(DEFAULT_SAMPLES, 10).
--define(THRESHOLD_KEY, {?MODULE, threshold}).
-
--opaque stats() :: #{ ema := emqx_moving_average:ema()
-                    , last_update_time := timestamp()
-                    , last_access_time := timestamp()  %% timestamp of last access top-k
-                    , last_insert_value := non_neg_integer()
-                    }.
-
--type timestamp() :: non_neg_integer().
--type timespan() :: number().
-
--type latency_type() :: average
-                      | expire.
-
--type create_options() :: #{samples => pos_integer()}.
-
--export_type([stats/0, latency_type/0, create_options/0]).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
--spec new(non_neg_integer() | create_options()) -> stats().
-new(SamplesT) when is_integer(SamplesT) ->
-    Samples = erlang:max(1, SamplesT),
-    #{ ema => emqx_moving_average:new(exponential, #{period => Samples})
-     , last_update_time => 0
-     , last_access_time => 0
-     , last_insert_value => 0
-     };
-
-new(OptsT) ->
-    Opts = maps:merge(#{samples => ?DEFAULT_SAMPLES}, OptsT),
-    #{samples := Samples} = Opts,
-    new(Samples).
-
--spec update(emqx_types:clientid(), number(), stats()) -> stats().
-update(ClientId, Val, #{ema := EMA} = Stats) ->
-    Now = ?NOW,
-    #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA),
-    Stats2 = call_hook(ClientId, Now, average, Latency, Stats),
-    Stats2#{ ema := EMA2
-           , last_update_time := ?NOW}.
-
--spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats().
-check_expire(_, Now, Interval, #{last_update_time := LUT} = S)
-  when LUT >= Now - Interval ->
-    S;
-
-check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
-    Latency = Now - LUT,
-    call_hook(ClientId, Now, expire, Latency, S).
-
--spec latency(stats()) -> number().
-latency(#{ema := #{average := Average}}) ->
-    Average.
-
--spec update_threshold(pos_integer()) -> pos_integer().
-update_threshold(Threshold) ->
-    Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD),
-    persistent_term:put(?THRESHOLD_KEY, Val),
-    Val.
-
-get_threshold() ->
-    persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD).
-
-%%--------------------------------------------------------------------
-%%  Internal functions
-%%--------------------------------------------------------------------
--spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
-call_hook(_, _, _, Latency, S)
-  when Latency =< ?MINIMUM_THRESHOLD ->
-    S;
-
-call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
-  when Now =< LIT + ?MINIMUM_INSERT_INTERVAL ->
-    S;
-
-call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
-    case Latency =< get_threshold() of
-        true ->
-            Stats#{last_access_time := Now};
-        _ ->
-            ToInsert = erlang:floor(Latency),
-            Arg = #{clientid => ClientId,
-                    latency => ToInsert,
-                    type => Type,
-                    last_insert_value => LIV,
-                    update_time => Now},
-            emqx:run_hook('message.slow_subs_stats', [Arg]),
-            Stats#{last_insert_value := ToInsert,
-                   last_access_time := Now}
-    end.

+ 0 - 90
apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl

@@ -1,90 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @see https://en.wikipedia.org/wiki/Moving_average
-
--module(emqx_moving_average).
-
-%% API
--export([new/0, new/1, new/2, update/2]).
-
--type type() :: cumulative
-              | exponential.
-
--type ema() :: #{ type := exponential
-                , average := 0 | float()
-                , coefficient := float()
-                }.
-
--type cma() :: #{ type := cumulative
-                , average := 0 | float()
-                , count := non_neg_integer()
-                }.
-
--type moving_average() :: ema()
-                        | cma().
-
--define(DEF_EMA_ARG, #{period => 10}).
--define(DEF_AVG_TYPE, exponential).
-
--export_type([type/0, moving_average/0, ema/0, cma/0]).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
--spec new() -> moving_average().
-new() ->
-    new(?DEF_AVG_TYPE, #{}).
-
--spec new(type()) -> moving_average().
-new(Type) ->
-    new(Type, #{}).
-
--spec new(type(), Args :: map()) -> moving_average().
-new(cumulative, _) ->
-    #{ type => cumulative
-     , average => 0
-     , count => 0
-     };
-
-new(exponential, Arg) ->
-    #{period := Period} = maps:merge(?DEF_EMA_ARG, Arg),
-    #{ type => exponential
-     , average => 0
-       %% coefficient = 2/(N+1) is a common convention, see the wiki link for details
-     , coefficient => 2 / (Period + 1)
-     }.
-
--spec update(number(), moving_average()) -> moving_average().
-
-update(Val, #{average := 0} = Avg) ->
-    Avg#{average := Val};
-
-update(Val, #{ type := cumulative
-             , average := Average
-             , count := Count} = CMA) ->
-    NewCount = Count + 1,
-    CMA#{average := (Count * Average + Val) / NewCount,
-         count := NewCount};
-
-update(Val, #{ type := exponential
-             , average := Average
-             , coefficient := Coefficient} = EMA) ->
-    EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}.
-
-%%--------------------------------------------------------------------
-%%  Internal functions
-%%--------------------------------------------------------------------

+ 1 - 1
apps/emqx/src/emqx_zone_schema.erl

@@ -24,7 +24,7 @@ namespace() -> zone.
 %% roots are added only for document generation.
 roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown",
             "conn_congestion", "rate_limit", "quota", "force_gc",
-            "overload_protection", "latency_stats"
+            "overload_protection"
            ].
 
 %% zone schemas are clones from the same name from root level

+ 1 - 26
apps/emqx/test/emqx_proper_types.erl

@@ -113,8 +113,7 @@ sessioninfo() ->
                     awaiting_rel(),     % awaiting_rel
                     non_neg_integer(),  % max_awaiting_rel
                     safty_timeout(),    % await_rel_timeout
-                    timestamp(),        % created_at
-                    latency_stats()
+                    timestamp()        % created_at
                   },
          emqx_session:info(Session)).
 
@@ -338,30 +337,6 @@ normal_topic_filter() ->
              end
          end).
 
-%% Type defined emqx_message_lantency_stats.erl - stats()
-latency_stats() ->
-    Keys = [{threshold, number()},
-            {ema, exp_moving_average()},
-            {last_update_time, non_neg_integer()},
-            {last_access_time, non_neg_integer()},
-            {last_insert_value, non_neg_integer()}
-           ],
-    ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
-         begin
-             maps:merge(maps:from_list(Ks), M)
-         end).
-
-%% Type defined emqx_moving_average.erl - ema()
-exp_moving_average() ->
-    Keys = [{type, exponential},
-            {average, number()},
-            {coefficient, float()}
-           ],
-    ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
-         begin
-             maps:merge(maps:from_list(Ks), M)
-         end).
-
 %%--------------------------------------------------------------------
 %% Basic Types
 %%--------------------------------------------------------------------

+ 50 - 13
apps/emqx/test/emqx_session_SUITE.erl

@@ -25,7 +25,12 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 -define(NOW, erlang:system_time(millisecond)).
--record(pubrel_await, {timestamp :: non_neg_integer()}).
+
+-type inflight_data_phase() :: wait_ack | wait_comp.
+
+-record(inflight_data, { phase :: inflight_data_phase()
+                       , message :: emqx_types:message()
+                       , timestamp :: non_neg_integer()}).
 
 %%--------------------------------------------------------------------
 %% CT callbacks
@@ -167,14 +172,14 @@ t_is_awaiting_full_true(_) ->
 
 t_puback(_) ->
     Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
-    Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg), emqx_inflight:new()),
     Session = session(#{inflight => Inflight, mqueue => mqueue()}),
     {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
 
 t_puback_with_dequeue(_) ->
     Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>),
-    Inflight = emqx_inflight:insert(1, {Msg1, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg1), emqx_inflight:new()),
     Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
     {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
     Session = session(#{inflight => Inflight, mqueue => Q}),
@@ -184,7 +189,7 @@ t_puback_with_dequeue(_) ->
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
 
 t_puback_error_packet_id_in_use(_) ->
-    Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
         emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
 
@@ -193,13 +198,13 @@ t_puback_error_packet_id_not_found(_) ->
 
 t_pubrec(_) ->
     Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
-    Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(2, with_ts(wait_ack, Msg), emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
     {ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session),
-    ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
+    ?assertMatch([#inflight_data{phase = wait_comp}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
 
 t_pubrec_packet_id_in_use_error(_) ->
-    Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
         emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})).
 
@@ -215,7 +220,7 @@ t_pubrel_error_packetid_not_found(_) ->
     {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()).
 
 t_pubcomp(_) ->
-    Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
+    Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
     {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
@@ -272,9 +277,11 @@ t_deliver_qos1(_) ->
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
-    {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
+    {ok, Msg1T, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
+    ?assertEqual(Msg1, remove_deliver_flag(Msg1T)),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
-    {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
+    {ok, Msg2T, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
+    ?assertEqual(Msg2, remove_deliver_flag(Msg2T)),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
 
 t_deliver_qos2(_) ->
@@ -319,8 +326,9 @@ t_retry(_) ->
     {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ElapseMs = 200, %% 0.2s
     ok = timer:sleep(ElapseMs),
-    Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
-    {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
+    Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs],
+    {ok, Msgs1T, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
+    ?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
 
 %%--------------------------------------------------------------------
@@ -344,7 +352,7 @@ t_replay(_) ->
     Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
     Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
     {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
-    ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
+    ?assertEqual(Pubs1 ++ [{3, Msg}], remove_deliver_flag(ReplayPubs)),
     ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
 
 t_expire_awaiting_rel(_) ->
@@ -404,3 +412,32 @@ ts(second) ->
     erlang:system_time(second);
 ts(millisecond) ->
     erlang:system_time(millisecond).
+
+with_ts(Phase, Msg) ->
+    with_ts(Phase, Msg, erlang:system_time(millisecond)).
+
+with_ts(Phase, Msg, Ts) ->
+    #inflight_data{phase = Phase,
+                   message = Msg,
+                   timestamp = Ts}.
+
+remove_deliver_flag({Id, Data}) ->
+    {Id, remove_deliver_flag(Data)};
+
+remove_deliver_flag(#inflight_data{message = Msg} = Data) ->
+    Data#inflight_data{message = remove_deliver_flag(Msg)};
+
+remove_deliver_flag(List) when is_list(List) ->
+    lists:map(fun remove_deliver_flag/1, List);
+
+remove_deliver_flag(Msg) ->
+    emqx_message:remove_header(deliver_begin_at, Msg).
+
+inflight_data_to_msg({Id, Data}) ->
+    {Id, inflight_data_to_msg(Data)};
+
+inflight_data_to_msg(#inflight_data{message = Msg}) ->
+    Msg;
+
+inflight_data_to_msg(List) when is_list(List) ->
+    lists:map(fun inflight_data_to_msg/1, List).

+ 9 - 14
apps/emqx_slow_subs/etc/emqx_slow_subs.conf

@@ -20,21 +20,16 @@ slow_subs {
     ## Value: 10
     top_k_num = 10
 
-    ## The interval for pushing statistics table records to the system topic. When set to 0, push is disabled
-    ## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval
-    ## publish is disabled if set to 0s.
+    ## The ways to calculate the latency are as follows:
     ##
-    ## Value: 0s
-    notice_interval = 0s
-
-    ## QoS of notification message
+    ## 1. whole
+    ## From the time the message arrives at EMQX until the message completes transmission
     ##
-    ## Default: 0
-    notice_qos = 0
-
-    ## Maximum information number in one notification
+    ## 2.internal
+    ## From when the message arrives at EMQX until when EMQX starts delivering the message
     ##
-    ## Default: 100
-    notice_batch_size = 100
-
+    ## 3.response
+    ## From the time EMQX starts delivering the message, until the message completes transmission
+    ## Default: whole
+    stats_type = whole
 }

+ 13 - 5
apps/emqx_slow_subs/include/emqx_slow_subs.hrl

@@ -15,16 +15,24 @@
 %%--------------------------------------------------------------------
 
 -define(TOPK_TAB, emqx_slow_subs_topk).
+-define(INDEX_TAB, emqx_slow_subs_index).
 
--define(INDEX(Latency, ClientId), {Latency, ClientId}).
+-define(ID(ClientId, Topic), {ClientId, Topic}).
+-define(INDEX(TimeSpan, Id), {Id, TimeSpan}).
+-define(TOPK_INDEX(TimeSpan, Id), {TimeSpan, Id}).
 
--define(MAX_TAB_SIZE, 1000).
+-define(MAX_SIZE, 1000).
 
--record(top_k, { index :: index()
-               , type :: emqx_message_latency_stats:latency_type()
+-record(top_k, { index :: topk_index()
                , last_update_time :: pos_integer()
                , extra = []
                }).
 
+-record(index_tab, { index :: index()}).
+
 -type top_k() :: #top_k{}.
--type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()).
+-type index_tab() :: #index_tab{}.
+
+-type id() :: {emqx_types:clientid(), emqx_types:topic()}.
+-type index() :: ?INDEX(non_neg_integer(), id()).
+-type topk_index() :: ?TOPK_INDEX(non_neg_integer(), id()).

+ 147 - 174
apps/emqx_slow_subs/src/emqx_slow_subs.erl

@@ -22,8 +22,8 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
 
--export([ start_link/0, on_stats_update/2, update_settings/1
-        , clear_history/0, init_topk_tab/0, post_config_update/5
+-export([ start_link/0, on_delivery_completed/3, update_settings/1
+        , clear_history/0, init_tab/0, post_config_update/5
         ]).
 
 %% gen_server callbacks
@@ -40,29 +40,19 @@
 -type state() :: #{ enable := boolean()
                   , last_tick_at := pos_integer()
                   , expire_timer := undefined | reference()
-                  , notice_timer := undefined | reference()
                   }.
 
--type log() :: #{ rank := pos_integer()
-                , clientid := emqx_types:clientid()
-                , latency := non_neg_integer()
-                , type := emqx_message_latency_stats:latency_type()
-                }.
-
--type window_log() :: #{ last_tick_at := pos_integer()
-                       , logs := [log()]
-                       }.
-
 -type message() :: #message{}.
 
--type stats_update_args() :: #{ clientid := emqx_types:clientid()
-                              , latency := non_neg_integer()
-                              , type := emqx_message_latency_stats:latency_type()
-                              , last_insert_value := non_neg_integer()
-                              , update_time := timer:time()
-                              }.
+-type stats_type() :: whole         %% whole = internal + response
+                    | internal      %% timespan from message in to deliver
+                    | response.     %% timespan from delivery to client response
 
--type stats_update_env() :: #{max_size := pos_integer()}.
+-type stats_update_args() :: #{session_birth_time := pos_integer()}.
+
+-type stats_update_env() :: #{ threshold := non_neg_integer()
+                             , stats_type := stats_type()
+                             , max_size := pos_integer()}.
 
 -ifdef(TEST).
 -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
@@ -71,7 +61,6 @@
 -endif.
 
 -define(NOW, erlang:system_time(millisecond)).
--define(NOTICE_TOPIC_NAME, "slow_subs").
 -define(DEF_CALL_TIMEOUT, timer:seconds(10)).
 
 %% erlang term order
@@ -88,37 +77,30 @@
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-%% XXX NOTE:pay attention to the performance here
--spec on_stats_update(stats_update_args(), stats_update_env()) -> true.
-on_stats_update(#{clientid := ClientId,
-                  latency := Latency,
-                  type := Type,
-                  last_insert_value := LIV,
-                  update_time := Ts},
-                #{max_size := MaxSize}) ->
-
-    LastIndex = ?INDEX(LIV, ClientId),
-    Index = ?INDEX(Latency, ClientId),
-
-    %% check whether the client is in the table
-    case ets:lookup(?TOPK_TAB, LastIndex) of
-        [#top_k{index = Index}] ->
-            %% if last value == the new value, update the type and last_update_time
-            %% XXX for clients whose latency are stable for a long time, is it
-            %% possible to reduce updates?
-            ets:insert(?TOPK_TAB,
-                       #top_k{index = Index, type = Type, last_update_time = Ts});
-        [_] ->
-            %% if Latency > minimum value, we should update it
-            %% if Latency < minimum value, maybe it can replace the minimum value
-            %% so always update at here
-            %% do we need check if Latency == minimum ???
-            ets:insert(?TOPK_TAB,
-                       #top_k{index = Index, type = Type, last_update_time = Ts}),
-            ets:delete(?TOPK_TAB, LastIndex);
-        [] ->
-            %% try to insert
-            try_insert_to_topk(MaxSize, Index, Latency, Type, Ts)
+on_delivery_completed(#message{timestamp = Ts},
+                      #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime ->
+    ok;
+
+on_delivery_completed(Msg, Env, Cfg) ->
+    on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
+
+on_delivery_completed(#message{topic = Topic} = Msg,
+                      #{clientid := ClientId},
+                      Now,
+                      #{threshold := Threshold,
+                        stats_type := StatsType,
+                        max_size := MaxSize}) ->
+    TimeSpan = calc_timespan(StatsType, Msg, Now),
+    case TimeSpan =< Threshold of
+        true -> ok;
+        _ ->
+            Id = ?ID(ClientId, Topic),
+            LastUpdateValue = find_last_update_value(Id),
+            case TimeSpan =< LastUpdateValue of
+                true -> ok;
+                _ ->
+                    try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
+            end
     end.
 
 clear_history() ->
@@ -127,21 +109,20 @@ clear_history() ->
 update_settings(Conf) ->
     emqx_conf:update([slow_subs], Conf, #{override_to => cluster}).
 
-init_topk_tab() ->
-    case ets:whereis(?TOPK_TAB) of
-        undefined ->
-            ?TOPK_TAB = ets:new(?TOPK_TAB,
-                                [ ordered_set, public, named_table
-                                , {keypos, #top_k.index}, {write_concurrency, true}
-                                , {read_concurrency, true}
-                                ]);
-        _ ->
-            ?TOPK_TAB
-    end.
-
 post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
     gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
 
+init_tab() ->
+    safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table
+                               , {keypos, #top_k.index}, {write_concurrency, true}
+                               , {read_concurrency, true}
+                               ]),
+
+    safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table
+                                , {keypos, #index_tab.index}, {write_concurrency, true}
+                                , {read_concurrency, true}
+                                ]).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
@@ -151,8 +132,7 @@ init([]) ->
 
     InitState = #{enable => false,
                   last_tick_at => 0,
-                  expire_timer => undefined,
-                  notice_timer => undefined
+                  expire_timer => undefined
                  },
 
     Enable = emqx:get_config([slow_subs, enable]),
@@ -164,7 +144,7 @@ handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
     {reply, ok, State2};
 
 handle_call(clear_history, _, State) ->
-    ets:delete_all_objects(?TOPK_TAB),
+    do_clear_history(),
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->
@@ -181,12 +161,6 @@ handle_info(expire_tick, State) ->
     State1 = start_timer(expire_timer, fun expire_tick/0, State),
     {noreply, State1};
 
-handle_info(notice_tick, State) ->
-    Logs = ets:tab2list(?TOPK_TAB),
-    do_notification(Logs, State),
-    State1 = start_timer(notice_timer, fun notice_tick/0, State),
-    {noreply, State1#{last_tick_at := ?NOW}};
-
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
@@ -204,123 +178,127 @@ code_change(_OldVsn, State, _Extra) ->
 expire_tick() ->
     erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
 
-notice_tick() ->
-    case emqx:get_config([slow_subs, notice_interval]) of
-        0 -> undefined;
-        Interval ->
-            erlang:send_after(Interval, self(), ?FUNCTION_NAME)
-    end.
-
--spec do_notification(list(), state()) -> ok.
-do_notification([], _) ->
-    ok;
-
-do_notification(Logs, #{last_tick_at := LastTickTime}) ->
-    start_publish(Logs, LastTickTime),
-    ok.
-
-start_publish(Logs, TickTime) ->
-    emqx_pool:async_submit({fun do_publish/3, [Logs, erlang:length(Logs), TickTime]}).
-
-do_publish([], _, _) ->
-    ok;
-
-do_publish(Logs, Rank, TickTime) ->
-    BatchSize = emqx:get_config([slow_subs, notice_batch_size]),
-    do_publish(Logs, BatchSize, Rank, TickTime, []).
-
-do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 ->
-    Cache2 = [convert_to_notice(Rank, Log) | Cache],
-    do_publish(T, Size - 1, Rank - 1, TickTime, Cache2);
-
-do_publish(Logs, Size, Rank, TickTime, Cache) when Size =:= 0 ->
-    publish(TickTime, Cache),
-    do_publish(Logs, Rank, TickTime);
-
-do_publish([], _, _Rank, TickTime, Cache) ->
-    publish(TickTime, Cache),
-    ok.
-
-convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId),
-                               type = Type,
-                               last_update_time = Ts}) ->
-    #{rank => Rank,
-      clientid => ClientId,
-      latency => Latency,
-      type => Type,
-      timestamp => Ts}.
-
-publish(TickTime, Notices) ->
-    WindowLog = #{last_tick_at => TickTime,
-                  logs => lists:reverse(Notices)},
-    Payload = emqx_json:encode(WindowLog),
-    Msg = #message{ id = emqx_guid:gen()
-                  , qos = emqx:get_config([slow_subs, notice_qos])
-                  , from = ?MODULE
-                  , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
-                  , payload = Payload
-                  , timestamp = ?NOW
-                  },
-    _ = emqx_broker:safe_publish(Msg),
-    ok.
-
 load(State) ->
-    MaxSizeT = emqx:get_config([slow_subs, top_k_num]),
-    MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
-    _ = emqx:hook('message.slow_subs_stats',
-                  {?MODULE, on_stats_update, [#{max_size => MaxSize}]}
-                 ),
-
-    State1 = start_timer(notice_timer, fun notice_tick/0, State),
-    State2 = start_timer(expire_timer, fun expire_tick/0, State1),
-    State2#{enable := true, last_tick_at => ?NOW}.
+    #{top_k_num := MaxSizeT,
+      stats_type := StatsType,
+      threshold := Threshold} = emqx:get_config([slow_subs]),
+    MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
+    _ = emqx:hook('delivery.completed',
+                  {?MODULE, on_delivery_completed,
+                   [#{max_size => MaxSize,
+                      stats_type => StatsType,
+                      threshold => Threshold
+                     }]}),
 
+    State1 = start_timer(expire_timer, fun expire_tick/0, State),
+    State1#{enable := true, last_tick_at => ?NOW}.
 
-unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) ->
-    emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}),
-    State#{notice_timer := cancel_timer(NoticeTimer),
-           expire_timer := cancel_timer(ExpireTimer)
-          }.
+unload(#{expire_timer := ExpireTimer} = State) ->
+    emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
+    State#{expire_timer := cancel_timer(ExpireTimer)}.
 
 do_clear(Logs) ->
     Now = ?NOW,
     Interval = emqx:get_config([slow_subs, expire_interval]),
-    Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
+    Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) ->
                    case Now - Ts >= Interval of
                        true ->
-                           ets:delete(?TOPK_TAB, Index);
+                           delete_with_index(TimeSpan, Id);
                        _ ->
                            true
-               end
+                   end
            end,
     lists:foreach(Each, Logs).
 
-try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
+-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer().
+calc_timespan(whole, #message{timestamp = Ts}, Now) ->
+    Now - Ts;
+
+calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
+    End = emqx_message:get_header(deliver_begin_at, Msg, Now),
+    End - Ts;
+
+calc_timespan(response, Msg, Now) ->
+    Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
+    Now - Begin.
+
+%% update_topk is safe, because each process has a unique clientid
+%% insert or delete are bind to this clientid, so there is no race condition
+%%
+%% but, the delete_with_index in L249 may have a race condition
+%% because the data belong to other clientid will be deleted here
+%% (deleted the data written by other processes).
+%% so it may appear that:
+%%   when deleting a record, the other process is performing an update operation on this recrod
+%% in order to solve this race condition problem, the index table also uses the ordered_set type,
+%% so that even if the above situation occurs, it will only cause the old data to be deleted twice
+%% and the correctness of the data will not be affected
+
+try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
     case ets:info(?TOPK_TAB, size) of
         Size when Size < MaxSize ->
-            %% if the size is under limit, insert it directly
-            ets:insert(?TOPK_TAB,
-                       #top_k{index = Index, type = Type, last_update_time = Ts});
+            update_topk(Now, LastUpdateValue, TimeSpan, Id);
         _Size ->
-            %% find the minimum value
-            ?INDEX(Min, _) = First =
-                case ets:first(?TOPK_TAB) of
-                    ?INDEX(_, _) = I ->  I;
-                    _ -> ?INDEX(Latency - 1, <<>>)
-                end,
-
-            case Latency =< Min of
-                true -> true;
-                _ ->
-                    ets:insert(?TOPK_TAB,
-                               #top_k{index = Index, type = Type, last_update_time = Ts}),
-
-                    ets:delete(?TOPK_TAB, First)
+            case ets:first(?TOPK_TAB) of
+                '$end_of_table' ->
+                    update_topk(Now, LastUpdateValue, TimeSpan, Id);
+                ?TOPK_INDEX(_, Id) ->
+                    update_topk(Now, LastUpdateValue, TimeSpan, Id);
+                ?TOPK_INDEX(Min, MinId) ->
+                    case TimeSpan =< Min of
+                        true -> false;
+                        _ ->
+                            update_topk(Now, LastUpdateValue, TimeSpan, Id),
+                            delete_with_index(Min, MinId)
+                    end
             end
     end.
 
+
+-spec find_last_update_value(id()) -> non_neg_integer().
+find_last_update_value(Id) ->
+    case ets:next(?INDEX_TAB,  ?INDEX(0, Id)) of
+        ?INDEX(LastUpdateValue, Id) ->
+            LastUpdateValue;
+        _ ->
+            0
+    end.
+
+-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
+update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
+    %% update record
+    ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id),
+                                 last_update_time = Now,
+                                 extra = []
+                                }),
+
+    %% update index
+    ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}),
+
+    %% delete the old record & index
+    delete_with_index(LastUpdateValue, Id).
+
+-spec delete_with_index(non_neg_integer(), id()) -> true.
+delete_with_index(0, _) ->
+    true;
+
+delete_with_index(TimeSpan, Id) ->
+    ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
+    ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).
+
+safe_create_tab(Name, Opts) ->
+    case ets:whereis(Name) of
+        undefined ->
+            Name = ets:new(Name, Opts);
+        _ ->
+            Name
+    end.
+
+do_clear_history() ->
+    ets:delete_all_objects(?INDEX_TAB),
+    ets:delete_all_objects(?TOPK_TAB).
+
 check_enable(Enable, #{enable := IsEnable} = State) ->
-    update_threshold(),
     case Enable of
         IsEnable ->
             State;
@@ -330,11 +308,6 @@ check_enable(Enable, #{enable := IsEnable} = State) ->
             unload(State)
     end.
 
-update_threshold() ->
-    Threshold = emqx:get_config([slow_subs, threshold]),
-    emqx_message_latency_stats:update_threshold(Threshold),
-    ok.
-
 start_timer(Name, Fun, State) ->
     _ = cancel_timer(maps:get(Name, State)),
     State#{Name := Fun()}.

+ 44 - 23
apps/emqx_slow_subs/src/emqx_slow_subs_api.erl

@@ -23,14 +23,14 @@
 
 -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
 
--export([slow_subs/2, encode_record/1, settings/2]).
+-export([slow_subs/2, get_history/0, settings/2]).
 
 -import(hoconsc, [mk/2, ref/1]).
 -import(emqx_mgmt_util, [bad_request/0]).
 
--define(FORMAT_FUN, {?MODULE, encode_record}).
 -define(APP, emqx_slow_subs).
 -define(APP_NAME, <<"emqx_slow_subs">>).
+-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)).
 
 namespace() -> "slow_subscribers_statistics".
 
@@ -74,12 +74,13 @@ schema("/slow_subscriptions/settings") ->
 fields(record) ->
     [ {clientid,
        mk(string(), #{desc => <<"the clientid">>})},
-      {latency,
+      {node,
+       mk(string(), #{desc => <<"the node">>})},
+      {topic,
+       mk(string(), #{desc => <<"the topic">>})},
+      {timespan,
        mk(integer(),
-          #{desc => <<"average time for message delivery or time for message expire">>})},
-      {type,
-       mk(string(),
-          #{desc => <<"type of the latency, could be average or expire">>})},
+          #{desc => <<"timespan for message transmission">>})},
       {last_update_time,
        mk(integer(), #{desc => <<"the timestamp of last update">>})}
     ].
@@ -89,24 +90,40 @@ conf_schema() ->
     hoconsc:mk(Ref, #{}).
 
 slow_subs(delete, _) ->
-    ok = emqx_slow_subs:clear_history(),
+    _ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end),
     {204};
 
-slow_subs(get, #{query_string := QST}) ->
-    LimitT = maps:get(<<"limit">>, QST, ?MAX_TAB_SIZE),
-    Limit = erlang:min(?MAX_TAB_SIZE, emqx_mgmt_api:b2i(LimitT)),
-    Page = maps:get(<<"page">>, QST, 1),
-    QS = QST#{<<"limit">> => Limit, <<"page">> => Page},
-    Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN),
-    {200, Data}.
-
-encode_record(#top_k{index = ?INDEX(Latency, ClientId),
-                     type = Type,
-                     last_update_time = Ts}) ->
-    #{clientid => ClientId,
-      latency => Latency,
-      type => Type,
-      last_update_time => Ts}.
+slow_subs(get, _) ->
+    NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end),
+    Fun = fun({ok, L}, Acc) -> L ++ Acc;
+             (_, Acc) -> Acc
+          end,
+    RankL = lists:foldl(Fun, [], NodeRankL),
+
+    SortFun = fun(#{timespan := A}, #{timespan := B}) ->
+                      A > B
+              end,
+
+    SortedL = lists:sort(SortFun, RankL),
+    SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
+
+    {200, SortedL2}.
+
+get_history() ->
+    Node = node(),
+    RankL = ets:tab2list(?TOPK_TAB),
+    ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
+                         last_update_time = LastUpdateTime
+                        }) ->
+                      #{ clientid => ClientId
+                       , node => Node
+                       , topic => Topic
+                       , timespan => TimeSpan
+                       , last_update_time => LastUpdateTime
+                       }
+              end,
+
+    lists:map(ConvFun, RankL).
 
 settings(get, _) ->
     {200, emqx:get_raw_config([slow_subs], #{})};
@@ -114,3 +131,7 @@ settings(get, _) ->
 settings(put, #{body := Body}) ->
     _ = emqx_slow_subs:update_settings(Body),
     {200, emqx:get_raw_config([slow_subs], #{})}.
+
+rpc_call(Fun) ->
+    Nodes = mria_mnesia:running_nodes(),
+    Fun(Nodes).

+ 4 - 15
apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl

@@ -22,21 +22,10 @@ fields("slow_subs") ->
        sc(integer(),
           10,
           "The maximum number of records in the slow subscription statistics record table")}
-    , {notice_interval,
-       sc(emqx_schema:duration_ms(),
-          "0s",
-          "The interval for pushing statistics table records to the system topic. "
-          "publish top-k list to $SYS/brokers/${node}/slow_subs per notice_interval. "
-          "publish is disabled if set to 0s."
-         )}
-    , {notice_qos,
-       sc(emqx_schema:qos(),
-          0,
-          "QoS of notification message in notice topic")}
-    , {notice_batch_size,
-       sc(integer(),
-          100,
-          "Maximum information number in one notification")}
+    , {stats_type,
+       sc(hoconsc:union([whole, internal, response]),
+          whole,
+          "The method to calculate the latency")}
     ].
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl

@@ -26,7 +26,7 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    emqx_slow_subs:init_topk_tab(),
+    emqx_slow_subs:init_tab(),
     {ok, {{one_for_one, 10, 3600},
           [#{id       => st_statistics,
              start    => {emqx_slow_subs, start_link, []},

+ 36 - 0
apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl

@@ -0,0 +1,36 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_slow_subs_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([introduced_in/0]).
+
+-export([clear_history/1, get_history/1]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec clear_history([node()]) -> emqx_rpc:erpc_multicall(map()).
+clear_history(Nodes) ->
+    erpc:multicall(Nodes, emqx_slow_subs, clear_history, []).
+
+-spec get_history([node()]) -> emqx_rpc:erpc_multicall(map()).
+get_history(Nodes) ->
+    erpc:multicall(Nodes, emqx_slow_subs_api, get_history, []).

+ 4 - 22
apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl

@@ -31,10 +31,8 @@ slow_subs {
     enable = true
 	top_k_num = 5,
     expire_interval = 3000
-    notice_interval = 1500
-    notice_qos = 0
-    notice_batch_size = 3
-}""">>).
+    stats_type = whole
+    }""">>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -60,7 +58,6 @@ t_log_and_pub(_) ->
     %% Sub topic first
     Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
     Clients = start_client(Subs),
-    emqx:subscribe("$SYS/brokers/+/slow_subs"),
     timer:sleep(1000),
     Now = ?NOW,
     %% publish
@@ -82,15 +79,9 @@ t_log_and_pub(_) ->
     timer:sleep(1000),
     Size = ets:info(?TOPK_TAB, size),
     %% some time record maybe delete due to it expired
-    ?assert(Size =< 6 andalso Size >= 4),
+    ?assert(Size =< 6 andalso Size > 3),
 
-    timer:sleep(1500),
-    Recs = try_receive([]),
-    RecSum = lists:sum(Recs),
-    ?assert(RecSum >= 5),
-    ?assert(lists:all(fun(E) -> E =< 3 end, Recs)),
-
-    timer:sleep(3000),
+    timer:sleep(4000),
     ?assert(ets:info(?TOPK_TAB, size) =:= 0),
     [Client ! stop || Client <- Clients],
     ok.
@@ -113,12 +104,3 @@ client(I, Subs) ->
         stop ->
             ok
     end.
-
-try_receive(Acc) ->
-    receive
-        {deliver, _, #message{payload = Payload}} ->
-            #{<<"logs">> := Logs} =  emqx_json:decode(Payload, [return_maps]),
-            try_receive([length(Logs) | Acc])
-    after 500 ->
-            Acc
-    end.

+ 13 - 21
apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl

@@ -40,9 +40,7 @@ slow_subs
  enable = true
  top_k_num = 5,
  expire_interval = 60000
- notice_interval = 0
- notice_qos = 0
- notice_batch_size = 3
+ stats_type = whole
 }""">>).
 
 
@@ -92,8 +90,7 @@ t_get_history(_) ->
     Now = ?NOW,
     Each = fun(I) ->
                    ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
-                   ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId),
-                                                type = average,
+                   ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
                                                 last_update_time = Now})
            end,
 
@@ -101,18 +98,16 @@ t_get_history(_) ->
 
     {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10",
                              auth_header_()),
-    #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]),
+    [First | _] = emqx_json:decode(Data, [return_maps]),
 
-    RFirst = #{<<"clientid">> => <<"test_5">>,
-               <<"latency">> => 5,
-               <<"type">> => <<"average">>,
-               <<"last_update_time">> => Now},
-
-    ?assertEqual(RFirst, First).
+    ?assertMatch(#{<<"clientid">> := <<"test_5">>,
+                   <<"topic">> := <<"topic">>,
+                   <<"last_update_time">> := Now,
+                   <<"node">> := _,
+                   <<"timespan">> := _}, First).
 
 t_clear(_) ->
-    ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>),
-                                 type = average,
+    ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)),
                                  last_update_time = ?NOW}),
 
     {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],
@@ -122,7 +117,7 @@ t_clear(_) ->
 
 t_settting(_) ->
     Conf = emqx:get_config([slow_subs]),
-    Conf2 = Conf#{threshold => 1000},
+    Conf2 = Conf#{stats_type => internal},
     {ok, Data} = request_api(put,
                              api_path(["slow_subscriptions", "settings"]),
                              [],
@@ -131,22 +126,19 @@ t_settting(_) ->
 
     Return = decode_json(Data),
 
-    ?assertEqual(Conf2, Return),
+    ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return),
 
     {ok, GetData} = request_api(get,
                                 api_path(["slow_subscriptions", "settings"]),
                                 [],
                                 auth_header_()
-                            ),
+                               ),
 
     timer:sleep(1000),
 
     GetReturn = decode_json(GetData),
 
-    ?assertEqual(Conf2, GetReturn),
-
-    ?assertEqual(1000,
-                 emqx_message_latency_stats:get_threshold()).
+    ?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn).
 
 decode_json(Data) ->
     BinJosn = emqx_json:decode(Data, [return_maps]),