Просмотр исходного кода

Merge pull request #6942 from terry-xiaoyu/hook_delivery_drop

Hook delivery drop
Shawn 4 лет назад
Родитель
Сommit
e6997dc1ce

+ 25 - 22
apps/emqx/src/emqx_channel.erl

@@ -365,7 +365,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
 
 handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
           = #channel{clientinfo = ClientInfo, session = Session}) ->
-    case emqx_session:puback(PacketId, Session) of
+    case emqx_session:puback(ClientInfo, PacketId, Session) of
         {ok, Msg, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Properties),
             {ok, set_session(NSession, Channel)};
@@ -384,7 +384,7 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
 
 handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
           = #channel{clientinfo = ClientInfo, session = Session}) ->
-    case emqx_session:pubrec(PacketId, Session) of
+    case emqx_session:pubrec(ClientInfo, PacketId, Session) of
         {ok, Msg, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Properties),
             NChannel = set_session(NSession, Channel),
@@ -399,8 +399,9 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
             handle_out(pubrel, {PacketId, RC}, Channel)
     end;
 
-handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
-    case emqx_session:pubrel(PacketId, Session) of
+handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{clientinfo = ClientInfo,
+        session = Session}) ->
+    case emqx_session:pubrel(ClientInfo, PacketId, Session) of
         {ok, NSession} ->
             NChannel = set_session(NSession, Channel),
             handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
@@ -410,8 +411,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
             handle_out(pubcomp, {PacketId, RC}, Channel)
     end;
 
-handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
-    case emqx_session:pubcomp(PacketId, Session) of
+handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{
+        clientinfo = ClientInfo, session = Session}) ->
+    case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
         {ok, NSession} ->
             {ok, set_session(NSession, Channel)};
         {ok, Publishes, NSession} ->
@@ -617,8 +619,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
     handle_out(puback, {PacketId, RC}, NChannel);
 
 do_publish(PacketId, Msg = #message{qos = ?QOS_2},
-           Channel = #channel{session = Session}) ->
-    case emqx_session:publish(PacketId, Msg, Session) of
+           Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
+    case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
         {ok, PubRes, NSession} ->
             RC = puback_reason_code(PubRes),
             NChannel0 = set_session(NSession, Channel),
@@ -779,22 +781,22 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 handle_deliver(Delivers, Channel = #channel{takeover = true,
                                             pendings = Pendings,
                                             session = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
+                                            clientinfo = #{clientid := ClientId} = ClientInfo}) ->
     %% NOTE: Order is important here. While the takeover is in
     %% progress, the session cannot enqueue messages, since it already
     %% passed on the queue to the new connection in the session state.
     NPendings = lists:append(
                   Pendings,
-                  emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
+                  emqx_session:ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
     {ok, Channel#channel{pendings = NPendings}};
 
 handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
                                             takeover   = false,
                                             session    = Session,
-                                            clientinfo = #{clientid := ClientId}}) ->
+                                            clientinfo = #{clientid := ClientId} = ClientInfo}) ->
     Delivers1 = maybe_nack(Delivers),
-    Delivers2 = emqx_session:ignore_local(Delivers1, ClientId, Session),
-    NSession = emqx_session:enqueue(Delivers2, Session),
+    Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
+    NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
     NChannel = set_session(NSession, Channel),
     %% We consider queued/dropped messages as delivered since they are now in the session state.
     maybe_mark_as_delivered(Session, Delivers),
@@ -802,9 +804,10 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
 
 handle_deliver(Delivers, Channel = #channel{session = Session,
                                             takeover   = false,
-                                            clientinfo = #{clientid := ClientId}
+                                            clientinfo = #{clientid := ClientId} = ClientInfo
                                            }) ->
-    case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of
+    case emqx_session:deliver(ClientInfo,
+            emqx_session:ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of
         {ok, Publishes, NSession} ->
             NChannel = set_session(NSession, Channel),
             maybe_mark_as_delivered(NSession, Delivers),
@@ -1111,8 +1114,8 @@ handle_timeout(_TRef, retry_delivery,
                Channel = #channel{conn_state = disconnected}) ->
     {ok, Channel};
 handle_timeout(_TRef, retry_delivery,
-               Channel = #channel{session = Session}) ->
-    case emqx_session:retry(Session) of
+               Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:retry(ClientInfo, Session) of
         {ok, NSession} ->
             {ok, clean_timer(retry_timer, set_session(NSession, Channel))};
         {ok, Publishes, Timeout, NSession} ->
@@ -1124,8 +1127,8 @@ handle_timeout(_TRef, expire_awaiting_rel,
                Channel = #channel{conn_state = disconnected}) ->
     {ok, Channel};
 handle_timeout(_TRef, expire_awaiting_rel,
-               Channel = #channel{session = Session}) ->
-    case emqx_session:expire(awaiting_rel, Session) of
+               Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
         {ok, NSession} ->
             {ok, clean_timer(await_timer, set_session(NSession, Channel))};
         {ok, Timeout, NSession} ->
@@ -1690,11 +1693,11 @@ maybe_resume_session(#channel{resuming = false}) ->
 maybe_resume_session(#channel{session  = Session,
                               resuming = true,
                               pendings = Pendings,
-                              clientinfo = #{clientid := ClientId}}) ->
-    {ok, Publishes, Session1} = emqx_session:replay(Session),
+                              clientinfo = #{clientid := ClientId} = ClientInfo}) ->
+    {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
     %% We consider queued/dropped messages as delivered since they are now in the session state.
     emqx_persistent_session:mark_as_delivered(ClientId, Pendings),
-    case emqx_session:deliver(Pendings, Session1) of
+    case emqx_session:deliver(ClientInfo, Pendings, Session1) of
         {ok, Session2} ->
             {ok, Publishes, Session2};
         {ok, More, Session2} ->

+ 33 - 30
apps/emqx/src/emqx_connection.erl

@@ -124,36 +124,39 @@
 
 -define(ACTIVE_N, 100).
 
--define(INFO_KEYS,  [ socktype
-                    , peername
-                    , sockname
-                    , sockstate
-                    ]).
-
--define(CONN_STATS, [ recv_pkt
-                    , recv_msg
-                    , 'recv_msg.qos0'
-                    , 'recv_msg.qos1'
-                    , 'recv_msg.qos2'
-                    , 'recv_msg.dropped'
-                    , 'recv_msg.dropped.expired'
-                    , send_pkt
-                    , send_msg
-                    , 'send_msg.qos0'
-                    , 'send_msg.qos1'
-                    , 'send_msg.qos2'
-                    , 'send_msg.dropped'
-                    , 'send_msg.dropped.expired'
-                    , 'send_msg.dropped.queue_full'
-                    , 'send_msg.dropped.too_large'
-                    ]).
-
--define(SOCK_STATS, [ recv_oct
-                    , recv_cnt
-                    , send_oct
-                    , send_cnt
-                    , send_pend
-                    ]).
+-define(INFO_KEYS,
+    [ socktype
+    , peername
+    , sockname
+    , sockstate
+    ]).
+
+-define(CONN_STATS,
+    [ recv_pkt
+    , recv_msg
+    , 'recv_msg.qos0'
+    , 'recv_msg.qos1'
+    , 'recv_msg.qos2'
+    , 'recv_msg.dropped'
+    , 'recv_msg.dropped.await_pubrel_timeout'
+    , send_pkt
+    , send_msg
+    , 'send_msg.qos0'
+    , 'send_msg.qos1'
+    , 'send_msg.qos2'
+    , 'send_msg.dropped'
+    , 'send_msg.dropped.expired'
+    , 'send_msg.dropped.queue_full'
+    , 'send_msg.dropped.too_large'
+    ]).
+
+-define(SOCK_STATS,
+    [ recv_oct
+    , recv_cnt
+    , send_oct
+    , send_cnt
+    , send_pend
+    ]).
 
 -define(ENABLED(X), (X =/= undefined)).
 

+ 2 - 2
apps/emqx/src/emqx_metrics.erl

@@ -143,7 +143,7 @@
          %% PubSub Metrics
          {counter, 'messages.publish'},       % Messages Publish
          {counter, 'messages.dropped'},       % Messages dropped due to no subscribers
-         {counter, 'messages.dropped.expired'},  % QoS2 Messages expired
+         {counter, 'messages.dropped.await_pubrel_timeout'},  % QoS2 Messages expired
          {counter, 'messages.dropped.no_subscribers'},  % Messages dropped
          {counter, 'messages.forward'},       % Messages forward
          {counter, 'messages.delayed'},       % Messages delayed
@@ -552,7 +552,7 @@ reserved_idx('messages.qos2.received')       -> 106;
 reserved_idx('messages.qos2.sent')           -> 107;
 reserved_idx('messages.publish')             -> 108;
 reserved_idx('messages.dropped')             -> 109;
-reserved_idx('messages.dropped.expired')     -> 110;
+reserved_idx('messages.dropped.await_pubrel_timeout')     -> 110;
 reserved_idx('messages.dropped.no_subscribers') -> 111;
 reserved_idx('messages.forward')             -> 112;
 %%reserved_idx('messages.retained')            -> 113; %% keep the index, new metrics can use this

+ 3 - 3
apps/emqx/src/emqx_persistent_session.erl

@@ -266,14 +266,14 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
     %% 1. Get pending messages from DB.
     ?tp(ps_initial_pendings, #{sid => SessionID}),
     Pendings1 = pending(SessionID),
-    Pendings2 = emqx_session:ignore_local(Pendings1, ClientID, Session),
+    Pendings2 = emqx_session:ignore_local(ClientInfo, Pendings1, ClientID, Session),
     ?tp(ps_got_initial_pendings, #{ sid => SessionID
                                   , msgs => Pendings1}),
 
     %% 2. Enqueue messages to mimic that the process was alive
     %%    when the messages were delivered.
     ?tp(ps_persist_pendings, #{sid => SessionID}),
-    Session1 = emqx_session:enqueue(Pendings2, Session),
+    Session1 = emqx_session:enqueue(ClientInfo, Pendings2, Session),
     Session2 = persist(ClientInfo, ConnInfo, Session1),
     mark_as_delivered(SessionID, Pendings2),
     ?tp(ps_persist_pendings_msgs, #{ msgs => Pendings2
@@ -294,7 +294,7 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
     ?tp(ps_marker_pendings, #{sid => SessionID}),
     MarkerIDs = [Marker || {_, Marker} <- NodeMarkers],
     Pendings3 = pending(SessionID, MarkerIDs),
-    Pendings4 = emqx_session:ignore_local(Pendings3, ClientID, Session),
+    Pendings4 = emqx_session:ignore_local(ClientInfo, Pendings3, ClientID, Session),
     ?tp(ps_marker_pendings_msgs, #{ sid => SessionID
                                   , msgs => Pendings4}),
 

+ 132 - 139
apps/emqx/src/emqx_session.erl

@@ -66,27 +66,27 @@
         , unsubscribe/4
         ]).
 
--export([ publish/3
-        , puback/2
-        , pubrec/2
-        , pubrel/2
-        , pubcomp/2
+-export([ publish/4
+        , puback/3
+        , pubrec/3
+        , pubrel/3
+        , pubcomp/3
         ]).
 
--export([ deliver/2
-        , enqueue/2
-        , dequeue/1
-        , ignore_local/3
-        , retry/1
+-export([ deliver/3
+        , enqueue/3
+        , dequeue/2
+        , ignore_local/4
+        , retry/2
         , terminate/3
         ]).
 
 -export([ takeover/1
         , resume/2
-        , replay/1
+        , replay/2
         ]).
 
--export([expire/2]).
+-export([expire/3]).
 
 %% Export for CT
 -export([set_field/3]).
@@ -147,27 +147,29 @@
 
 -type(replies() :: list(publish() | pubrel())).
 
--define(INFO_KEYS, [id,
-                    is_persistent,
-                    subscriptions,
-                    upgrade_qos,
-                    retry_interval,
-                    await_rel_timeout,
-                    created_at
-                   ]).
-
--define(STATS_KEYS, [subscriptions_cnt,
-                     subscriptions_max,
-                     inflight_cnt,
-                     inflight_max,
-                     mqueue_len,
-                     mqueue_max,
-                     mqueue_dropped,
-                     next_pkt_id,
-                     awaiting_rel_cnt,
-                     awaiting_rel_max,
-                     latency_stats
-                    ]).
+-define(INFO_KEYS,
+    [ id
+    , is_persistent
+    , subscriptions
+    , upgrade_qos
+    , retry_interval
+    , await_rel_timeout
+    , created_at
+    ]).
+
+-define(STATS_KEYS,
+    [ subscriptions_cnt
+    , subscriptions_max
+    , inflight_cnt
+    , inflight_max
+    , mqueue_len
+    , mqueue_max
+    , mqueue_dropped
+    , next_pkt_id
+    , awaiting_rel_cnt
+    , awaiting_rel_max
+    , latency_stats
+    ]).
 
 -define(DEFAULT_BATCH_N, 1000).
 
@@ -277,18 +279,19 @@ stats(Session) -> info(?STATS_KEYS, Session).
 %% Ignore local messages
 %%--------------------------------------------------------------------
 
-ignore_local(Delivers, Subscriber, Session) ->
+ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
     Subs = info(subscriptions, Session),
-    lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
-                        case maps:find(Topic, Subs) of
-                            {ok, #{nl := 1}} when Subscriber =:= Publisher ->
-                                ok = emqx_metrics:inc('delivery.dropped'),
-                                ok = emqx_metrics:inc('delivery.dropped.no_local'),
-                                true;
-                            _ ->
-                                false
-                        end
-                    end, Delivers).
+    lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
+        case maps:find(Topic, Subs) of
+            {ok, #{nl := 1}} when Subscriber =:= Publisher ->
+                ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
+                ok = emqx_metrics:inc('delivery.dropped'),
+                ok = emqx_metrics:inc('delivery.dropped.no_local'),
+                true;
+            _ ->
+                false
+        end
+    end, Delivers).
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: SUBSCRIBE
@@ -310,7 +313,6 @@ subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
         true -> {error, ?RC_QUOTA_EXCEEDED}
     end.
 
--compile({inline, [is_subscriptions_full/1]}).
 is_subscriptions_full(#session{max_subscriptions = infinity}) ->
     false;
 is_subscriptions_full(#session{subscriptions = Subs,
@@ -340,10 +342,10 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts,
 %% Client -> Broker: PUBLISH
 %%--------------------------------------------------------------------
 
--spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
+-spec(publish(emqx_types:clientinfo(), emqx_types:packet_id(), emqx_types:message(), session())
       -> {ok, emqx_types:publish_result(), session()}
        | {error, emqx_types:reason_code()}).
-publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
+publish(_ClientInfo, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
         Session = #session{awaiting_rel = AwaitingRel}) ->
     case is_awaiting_full(Session) of
         false ->
@@ -359,10 +361,9 @@ publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
     end;
 
 %% Publish QoS0/1 directly
-publish(_PacketId, Msg, Session) ->
+publish(_ClientInfo, _PacketId, Msg, Session) ->
     {ok, emqx_broker:publish(Msg), Session}.
 
--compile({inline, [is_awaiting_full/1]}).
 is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
     false;
 is_awaiting_full(#session{awaiting_rel = AwaitingRel,
@@ -373,23 +374,22 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
 %% Client -> Broker: PUBACK
 %%--------------------------------------------------------------------
 
--spec(puback(emqx_types:packet_id(), session())
+-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session())
       -> {ok, emqx_types:message(), session()}
        | {ok, emqx_types:message(), replies(), session()}
        | {error, emqx_types:reason_code()}).
-puback(PacketId, Session = #session{inflight = Inflight}) ->
+puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
             Session2 = update_latency(Msg, Session),
-            return_with(Msg, dequeue(Session2#session{inflight = Inflight1}));
+            return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1}));
         {value, {_Pubrel, _Ts}} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
     end.
 
--compile({inline, [return_with/2]}).
 return_with(Msg, {ok, Session}) ->
     {ok, Msg, Session};
 return_with(Msg, {ok, Publishes, Session}) ->
@@ -399,10 +399,10 @@ return_with(Msg, {ok, Publishes, Session}) ->
 %% Client -> Broker: PUBREC
 %%--------------------------------------------------------------------
 
--spec(pubrec(emqx_types:packet_id(), session())
+-spec(pubrec(emqx_types:clientinfo(), emqx_types:packet_id(), session())
       -> {ok, emqx_types:message(), session()}
        | {error, emqx_types:reason_code()}).
-pubrec(PacketId, Session = #session{inflight = Inflight}) ->
+pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Msg, _Ts}} when is_record(Msg, message) ->
             Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}),
@@ -418,9 +418,9 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) ->
 %% Client -> Broker: PUBREL
 %%--------------------------------------------------------------------
 
--spec(pubrel(emqx_types:packet_id(), session())
+-spec(pubrel(emqx_types:clientinfo(), emqx_types:packet_id(), session())
       -> {ok, session()} | {error, emqx_types:reason_code()}).
-pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
+pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:take(PacketId, AwaitingRel) of
         {_Ts, AwaitingRel1} ->
             {ok, Session#session{awaiting_rel = AwaitingRel1}};
@@ -432,15 +432,15 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
 %% Client -> Broker: PUBCOMP
 %%--------------------------------------------------------------------
 
--spec(pubcomp(emqx_types:packet_id(), session())
+-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session())
       -> {ok, session()} | {ok, replies(), session()}
        | {error, emqx_types:reason_code()}).
-pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
+pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) ->
             Session2 = update_latency(Pubrel, Session),
             Inflight1 = emqx_inflight:delete(PacketId, Inflight),
-            dequeue(Session2#session{inflight = Inflight1});
+            dequeue(ClientInfo, Session2#session{inflight = Inflight1});
         {value, _Other} ->
             {error, ?RC_PACKET_IDENTIFIER_IN_USE};
         none ->
@@ -451,29 +451,30 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
 %% Dequeue Msgs
 %%--------------------------------------------------------------------
 
-dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
+dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
     case emqx_mqueue:is_empty(Q) of
         true  -> {ok, Session};
         false ->
-            {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
-            deliver(Msgs, [], Session#session{mqueue = Q1})
+            {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
+            do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
     end.
 
-dequeue(0, Msgs, Q) ->
+dequeue(_ClientInfo, 0, Msgs, Q) ->
     {lists:reverse(Msgs), Q};
 
-dequeue(Cnt, Msgs, Q) ->
+dequeue(ClientInfo, Cnt, Msgs, Q) ->
     case emqx_mqueue:out(Q) of
-        {empty, _Q} -> dequeue(0, Msgs, Q);
+        {empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q);
         {{value, Msg}, Q1} ->
             case emqx_message:is_expired(Msg) of
-                true  -> ok = inc_expired_cnt(delivery),
-                         dequeue(Cnt, Msgs, Q1);
-                false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
+                true ->
+                    ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
+                    ok = inc_delivery_expired_cnt(),
+                    dequeue(ClientInfo, Cnt, Msgs, Q1);
+                false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
             end
     end.
 
--compile({inline, [acc_cnt/2]}).
 acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
 acc_cnt(_Msg, Cnt) -> Cnt - 1.
 
@@ -481,38 +482,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1.
 %% Broker -> Client: Deliver
 %%--------------------------------------------------------------------
 
--spec(deliver(list(emqx_types:deliver()), session())
+-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session())
       -> {ok, session()} | {ok, replies(), session()}).
-deliver([Deliver], Session) -> %% Optimize
-    Enrich = enrich_fun(Session),
-    deliver_msg(Enrich(Deliver), Session);
+deliver(ClientInfo, [Deliver], Session) -> %% Optimize
+    Msg = enrich_deliver(Deliver, Session),
+    deliver_msg(ClientInfo, Msg, Session);
 
-deliver(Delivers, Session) ->
-    Msgs = lists:map(enrich_fun(Session), Delivers),
-    deliver(Msgs, [], Session).
+deliver(ClientInfo, Delivers, Session) ->
+    Msgs = [enrich_deliver(D, Session) || D <- Delivers],
+    do_deliver(ClientInfo, Msgs, [], Session).
 
-deliver([], Publishes, Session) ->
+do_deliver(_ClientInfo, [], Publishes, Session) ->
     {ok, lists:reverse(Publishes), Session};
 
-deliver([Msg | More], Acc, Session) ->
-    case deliver_msg(Msg, Session) of
+do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
+    case deliver_msg(ClientInfo, Msg, Session) of
         {ok, Session1} ->
-            deliver(More, Acc, Session1);
+            do_deliver(ClientInfo, More, Acc, Session1);
         {ok, [Publish], Session1} ->
-            deliver(More, [Publish|Acc], Session1)
+            do_deliver(ClientInfo, More, [Publish|Acc], Session1)
     end.
 
-deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
+deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
     {ok, [{undefined, maybe_ack(Msg)}], Session};
 
-deliver_msg(Msg = #message{qos = QoS}, Session =
+deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
             #session{next_pkt_id = PacketId, inflight = Inflight})
     when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
     case emqx_inflight:is_full(Inflight) of
         true ->
             Session1 = case maybe_nack(Msg) of
                            true  -> Session;
-                           false -> enqueue(Msg, Session)
+                           false -> enqueue(ClientInfo, Msg, Session)
                        end,
             {ok, Session1};
         false ->
@@ -521,32 +522,34 @@ deliver_msg(Msg = #message{qos = QoS}, Session =
             {ok, [Publish], next_pkt_id(Session1)}
     end.
 
--spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
+-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(),
               session()) -> session()).
-enqueue([Deliver], Session) -> %% Optimize
-    Enrich = enrich_fun(Session),
-    enqueue(Enrich(Deliver), Session);
-
-enqueue(Delivers, Session) when is_list(Delivers) ->
-    Msgs = lists:map(enrich_fun(Session), Delivers),
-    lists:foldl(fun enqueue/2, Session, Msgs);
+enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
+    lists:foldl(fun(Deliver, Session0) ->
+            Msg = enrich_deliver(Deliver, Session),
+            enqueue(ClientInfo, Msg, Session0)
+        end, Session, Delivers);
 
-enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
+enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
     {Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
-    (Dropped =/= undefined) andalso log_dropped(Dropped, Session),
+    (Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session),
     Session#session{mqueue = NewQ}.
 
-log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
+handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
     Payload = emqx_message:to_log_map(Msg),
      #{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q),
     case (QoS == ?QOS_0) andalso (not StoreQos0) of
         true  ->
+            ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
+            ok = emqx_metrics:inc('delivery.dropped'),
             ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
             ok = inc_pd('send_msg.dropped'),
             ?SLOG(warning, #{msg => "dropped_qos0_msg",
                              queue => QueueInfo,
                              payload => Payload}, #{topic => Topic});
         false ->
+            ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
+            ok = emqx_metrics:inc('delivery.dropped'),
             ok = emqx_metrics:inc('delivery.dropped.queue_full'),
             ok = inc_pd('send_msg.dropped'),
             ok = inc_pd('send_msg.dropped.queue_full'),
@@ -555,10 +558,8 @@ log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
                              payload => Payload}, #{topic => Topic})
     end.
 
-enrich_fun(Session = #session{subscriptions = Subs}) ->
-    fun({deliver, Topic, Msg}) ->
-            enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
-    end.
+enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
+    enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
 
 maybe_ack(Msg) ->
     case emqx_shared_sub:is_ack_required(Msg) of
@@ -613,53 +614,54 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
 %% Retry Delivery
 %%--------------------------------------------------------------------
 
--spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
-retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
+-spec(retry(emqx_types:clientinfo(), session()) ->
+    {ok, session()} | {ok, replies(), timeout(), session()}).
+retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
     case emqx_inflight:is_empty(Inflight) of
         true  -> {ok, Session};
         false ->
             Now = erlang:system_time(millisecond),
             Session2 = check_expire_latency(Now, RetryInterval, Session),
-            retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
-                           [],
-                           Now,
-                           Session2)
+            retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now,
+                           Session2, ClientInfo)
     end.
 
-retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
+retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
     {ok, lists:reverse(Acc), Interval, Session};
 
 retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
-               #session{retry_interval = Interval, inflight = Inflight}) ->
+               #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) ->
     case (Age = age(Now, Ts)) >= Interval of
         true ->
-            {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
-            retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
+            {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo),
+            retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
         false ->
             {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
     end.
 
-retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
+do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) ->
+    Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
+    {[{pubrel, PacketId}|Acc], Inflight1};
+
+do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) ->
     case emqx_message:is_expired(Msg) of
         true ->
-            ok = inc_expired_cnt(delivery),
+            ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
+            ok = inc_delivery_expired_cnt(),
             {Acc, emqx_inflight:delete(PacketId, Inflight)};
         false ->
             Msg1 = emqx_message:set_flag(dup, true, Msg),
             Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
             {[{PacketId, Msg1}|Acc], Inflight1}
-    end;
-
-retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) ->
-    Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight),
-    {[{pubrel, PacketId}|Acc], Inflight1}.
+    end.
 
 %%--------------------------------------------------------------------
 %% Expire Awaiting Rel
 %%--------------------------------------------------------------------
 
--spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}).
-expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
+-spec(expire(emqx_types:clientinfo(), awaiting_rel, session()) ->
+    {ok, session()} | {ok, timeout(), session()}).
+expire(_ClientInfo, awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:size(AwaitingRel) of
         0 -> {ok, Session};
         _ -> expire_awaiting_rel(erlang:system_time(millisecond), Session)
@@ -670,7 +672,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
     NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
     AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
     ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
-    (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt),
+    (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt),
     NSession = Session#session{awaiting_rel = AwaitingRel1},
     case maps:size(AwaitingRel1) of
         0 -> {ok, NSession};
@@ -693,14 +695,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
     ok = emqx_metrics:inc('session.resumed'),
     emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
 
--spec(replay(session()) -> {ok, replies(), session()}).
-replay(Session = #session{inflight = Inflight}) ->
+-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
+replay(ClientInfo, Session = #session{inflight = Inflight}) ->
     Pubs = lists:map(fun({PacketId, {Pubrel,  _Ts}}) when is_record(Pubrel, pubrel_await) ->
                              {pubrel, PacketId};
                         ({PacketId, {Msg, _Ts}}) ->
                              {PacketId, emqx_message:set_flag(dup, true, Msg)}
                      end, emqx_inflight:to_list(Inflight)),
-    case dequeue(Session) of
+    case dequeue(ClientInfo, Session) of
         {ok, NSession} -> {ok, Pubs, NSession};
         {ok, More, NSession} ->
             {ok, lists:append(Pubs, More), NSession}
@@ -714,29 +716,26 @@ terminate(ClientInfo, takenover, Session) ->
 terminate(ClientInfo, Reason, Session) ->
     run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
 
--compile({inline, [run_hook/2]}).
 run_hook(Name, Args) ->
     ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
 
 %%--------------------------------------------------------------------
 %% Inc message/delivery expired counter
 %%--------------------------------------------------------------------
+inc_delivery_expired_cnt() ->
+    inc_delivery_expired_cnt(1).
 
--compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
-
-inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
-
-inc_expired_cnt(delivery, N) ->
+inc_delivery_expired_cnt(N) ->
     ok = inc_pd('send_msg.dropped', N),
     ok = inc_pd('send_msg.dropped.expired', N),
     ok = emqx_metrics:inc('delivery.dropped', N),
-    emqx_metrics:inc('delivery.dropped.expired', N);
+    emqx_metrics:inc('delivery.dropped.expired', N).
 
-inc_expired_cnt(message, N) ->
+inc_await_pubrel_timeout(N) ->
     ok = inc_pd('recv_msg.dropped', N),
-    ok = inc_pd('recv_msg.dropped.expired', N),
+    ok = inc_pd('recv_msg.dropped.await_pubrel_timeout', N),
     ok = emqx_metrics:inc('messages.dropped', N),
-    emqx_metrics:inc('messages.dropped.expired', N).
+    emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
 
 inc_pd(Key) ->
     inc_pd(Key, 1).
@@ -747,9 +746,6 @@ inc_pd(Key, Inc) ->
 %%--------------------------------------------------------------------
 %% Next Packet Id
 %%--------------------------------------------------------------------
-
--compile({inline, [next_pkt_id/1]}).
-
 next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
     Session#session{next_pkt_id = 1};
 
@@ -788,9 +784,6 @@ get_birth_timestamp(_, _) ->
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
-
--compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}).
-
 sort_fun() ->
     fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end.
 

+ 71 - 20
apps/emqx/test/emqx_channel_SUITE.erl

@@ -115,6 +115,56 @@ listeners_conf() ->
       ws => #{default => listener_mqtt_ws_conf()}
     }.
 
+limiter_conf() ->
+    #{bytes_in =>
+        #{bucket =>
+                #{default =>
+                    #{aggregated =>
+                            #{capacity => infinity,initial => 0,rate => infinity},
+                        per_client =>
+                            #{capacity => infinity,divisible => false,
+                            failure_strategy => force,initial => 0,low_water_mark => 0,
+                            max_retry_time => 5000,rate => infinity},
+                        zone => default}},
+            global => #{burst => 0,rate => infinity},
+            zone => #{default => #{burst => 0,rate => infinity}}},
+      connection =>
+        #{bucket =>
+                #{default =>
+                    #{aggregated =>
+                            #{capacity => infinity,initial => 0,rate => infinity},
+                        per_client =>
+                            #{capacity => infinity,divisible => false,
+                            failure_strategy => force,initial => 0,low_water_mark => 0,
+                            max_retry_time => 5000,rate => infinity},
+                        zone => default}},
+            global => #{burst => 0,rate => infinity},
+            zone => #{default => #{burst => 0,rate => infinity}}},
+      message_in =>
+        #{bucket =>
+                #{default =>
+                    #{aggregated =>
+                            #{capacity => infinity,initial => 0,rate => infinity},
+                        per_client =>
+                            #{capacity => infinity,divisible => false,
+                            failure_strategy => force,initial => 0,low_water_mark => 0,
+                            max_retry_time => 5000,rate => infinity},
+                        zone => default}},
+            global => #{burst => 0,rate => infinity},
+            zone => #{default => #{burst => 0,rate => infinity}}},
+      message_routing =>
+        #{bucket =>
+                #{default =>
+                    #{aggregated =>
+                            #{capacity => infinity,initial => 0,rate => infinity},
+                        per_client =>
+                            #{capacity => infinity,divisible => false,
+                            failure_strategy => force,initial => 0,low_water_mark => 0,
+                            max_retry_time => 5000,rate => infinity},
+                        zone => default}},
+            global => #{burst => 0,rate => infinity},
+            zone => #{default => #{burst => 0,rate => infinity}}}}.
+
 stats_conf() ->
     #{enable => true}.
 
@@ -130,11 +180,11 @@ basic_conf() ->
       stats => stats_conf(),
       listeners => listeners_conf(),
       zones => zone_conf(),
-      limiter => emqx:get_config([limiter])
+      limiter => limiter_conf()
     }.
 
 set_test_listener_confs() ->
-    Conf = emqx_config:get([]),
+    Conf = emqx_config:get([], #{}),
     emqx_config:put(basic_conf()),
     Conf.
 
@@ -180,10 +230,10 @@ end_per_suite(_Config) ->
                 ]).
 
 init_per_testcase(TestCase, Config) ->
-    NewConf = set_test_listener_confs(),
+    OldConf = set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
-    modify_limiter(TestCase, NewConf),
-    [{config, NewConf}|Config].
+    modify_limiter(TestCase, OldConf),
+    [{config, OldConf}|Config].
 
 end_per_testcase(_TestCase, Config) ->
     emqx_config:put(?config(config, Config)),
@@ -232,15 +282,16 @@ t_chan_info(_) ->
     ?assertEqual(clientinfo(), ClientInfo).
 
 t_chan_caps(_) ->
-     #{max_clientid_len := 65535,
+    ?assertMatch(#{
+       max_clientid_len := 65535,
        max_qos_allowed := 2,
        max_topic_alias := 65535,
-       max_topic_levels := 128,
+       max_topic_levels := Level,
        retain_available := true,
        shared_subscription := true,
        subscription_identifiers := true,
        wildcard_subscription := true
-      } = emqx_channel:caps(channel()).
+    } when is_integer(Level), emqx_channel:caps(channel())).
 
 %%--------------------------------------------------------------------
 %% Test cases for channel handle_in
@@ -377,14 +428,14 @@ t_handle_in_qos2_publish_with_error_return(_) ->
 t_handle_in_puback_ok(_) ->
     Msg = emqx_message:make(<<"t">>, <<"payload">>),
     ok = meck:expect(emqx_session, puback,
-                     fun(_PacketId, Session) -> {ok, Msg, Session} end),
+                     fun(_, _PacketId, Session) -> {ok, Msg, Session} end),
     Channel = channel(#{conn_state => connected}),
     {ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel).
     % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
 
 t_handle_in_puback_id_in_use(_) ->
     ok = meck:expect(emqx_session, puback,
-                     fun(_, _Session) ->
+                     fun(_, _, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_IN_USE}
                      end),
     {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
@@ -392,7 +443,7 @@ t_handle_in_puback_id_in_use(_) ->
 
 t_handle_in_puback_id_not_found(_) ->
     ok = meck:expect(emqx_session, puback,
-                     fun(_, _Session) ->
+                     fun(_, _, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
     {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
@@ -430,14 +481,14 @@ t_override_client_receive_maximum(_) ->
 
 t_handle_in_pubrec_ok(_) ->
     Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
-    ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
+    ok = meck:expect(emqx_session, pubrec, fun(_, _, Session) -> {ok, Msg, Session} end),
     Channel = channel(#{conn_state => connected}),
     {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} =
         emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
 
 t_handle_in_pubrec_id_in_use(_) ->
     ok = meck:expect(emqx_session, pubrec,
-                     fun(_, _Session) ->
+                     fun(_, _, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_IN_USE}
                      end),
     {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} =
@@ -445,34 +496,34 @@ t_handle_in_pubrec_id_in_use(_) ->
 
 t_handle_in_pubrec_id_not_found(_) ->
     ok = meck:expect(emqx_session, pubrec,
-                     fun(_, _Session) ->
+                     fun(_, _, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
     {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
         emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
 
 t_handle_in_pubrel_ok(_) ->
-    ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
+    ok = meck:expect(emqx_session, pubrel, fun(_, _, Session) -> {ok, Session} end),
     Channel = channel(#{conn_state => connected}),
     {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} =
         emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
 
 t_handle_in_pubrel_not_found_error(_) ->
     ok = meck:expect(emqx_session, pubrel,
-                     fun(_PacketId, _Session) ->
+                     fun(_, _PacketId, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
     {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
         emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
 
 t_handle_in_pubcomp_ok(_) ->
-    ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
+    ok = meck:expect(emqx_session, pubcomp, fun(_, _, Session) -> {ok, Session} end),
     {ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()).
     % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
 
 t_handle_in_pubcomp_not_found_error(_) ->
     ok = meck:expect(emqx_session, pubcomp,
-                     fun(_PacketId, _Session) ->
+                     fun(_, _PacketId, _Session) ->
                              {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
                      end),
     Channel = channel(#{conn_state => connected}),
@@ -795,13 +846,13 @@ t_handle_timeout_keepalive(_) ->
 
 t_handle_timeout_retry_delivery(_) ->
     TRef = make_ref(),
-    ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
+    ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end),
     Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
     {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
 
 t_handle_timeout_expire_awaiting_rel(_) ->
     TRef = make_ref(),
-    ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
+    ok = meck:expect(emqx_session, expire, fun(_, _, Session) -> {ok, Session} end),
     Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
     {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).
 

+ 45 - 43
apps/emqx/test/emqx_session_SUITE.erl

@@ -126,23 +126,23 @@ t_unsubscribe(_) ->
 t_publish_qos0(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>),
-    {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()),
-    {ok, [], Session} = emqx_session:publish(undefined, Msg, Session).
+    {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
+    {ok, [], Session} = emqx_session:publish(clientinfo(), undefined, Msg, Session).
 
 t_publish_qos1(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>),
-    {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()),
-    {ok, [], Session} = emqx_session:publish(2, Msg, Session).
+    {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
+    {ok, [], Session} = emqx_session:publish(clientinfo(), 2, Msg, Session).
 
 t_publish_qos2(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
-    {ok, [], Session} = emqx_session:publish(1, Msg, session()),
+    {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, session()),
     ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)),
-    {ok, Session1} = emqx_session:pubrel(1, Session),
+    {ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)),
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, Session1).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, Session1).
 
 t_publish_qos2_with_error_return(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
@@ -150,10 +150,10 @@ t_publish_qos2_with_error_return(_) ->
                         awaiting_rel => #{1 => ts(millisecond)}
                        }),
     Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
-    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session),
-    {ok, [], Session1} = emqx_session:publish(2, Msg, Session),
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session),
+    {ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session),
     ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
-    {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1).
+    {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1).
 
 t_is_awaiting_full_false(_) ->
     Session = session(#{max_awaiting_rel => infinity}),
@@ -169,7 +169,7 @@ t_puback(_) ->
     Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
     Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight, mqueue => mqueue()}),
-    {ok, Msg, Session1} = emqx_session:puback(1, Session),
+    {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
 
 t_puback_with_dequeue(_) ->
@@ -178,7 +178,7 @@ t_puback_with_dequeue(_) ->
     Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
     {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
     Session = session(#{inflight => Inflight, mqueue => Q}),
-    {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session),
+    {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(0, emqx_session:info(mqueue_len, Session1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
@@ -186,48 +186,48 @@ t_puback_with_dequeue(_) ->
 t_puback_error_packet_id_in_use(_) ->
     Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
-        emqx_session:puback(1, session(#{inflight => Inflight})).
+        emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
 
 t_puback_error_packet_id_not_found(_) ->
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()).
 
 t_pubrec(_) ->
     Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
     Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
-    {ok, Msg, Session1} = emqx_session:pubrec(2, Session),
+    {ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session),
     ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
 
 t_pubrec_packet_id_in_use_error(_) ->
     Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
     {error, ?RC_PACKET_IDENTIFIER_IN_USE} =
-        emqx_session:pubrec(1, session(#{inflight => Inflight})).
+        emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})).
 
 t_pubrec_packet_id_not_found_error(_) ->
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(clientinfo(), 1, session()).
 
 t_pubrel(_) ->
     Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}),
-    {ok, Session1} = emqx_session:pubrel(1, Session),
+    {ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session),
     ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)).
 
 t_pubrel_error_packetid_not_found(_) ->
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()).
 
 t_pubcomp(_) ->
     Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
-    {ok, Session1} = emqx_session:pubcomp(1, Session),
+    {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
 
 t_pubcomp_error_packetid_in_use(_) ->
     Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
     Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
     Session = session(#{inflight => Inflight}),
-    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session).
+    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session).
 
 t_pubcomp_error_packetid_not_found(_) ->
-    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()).
+    {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()).
 
 %%--------------------------------------------------------------------
 %% Test cases for deliver/retry
@@ -235,14 +235,16 @@ t_pubcomp_error_packetid_not_found(_) ->
 
 t_dequeue(_) ->
     Q = mqueue(#{store_qos0 => true}),
-    {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})),
+    {ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})),
     Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
             emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
             emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
            ],
-    Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs),
+    Session1 = lists:foldl(fun(Msg, S) ->
+        emqx_session:enqueue(clientinfo(), Msg, S)
+    end, Session, Msgs),
     {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
-        emqx_session:dequeue(Session1),
+        emqx_session:dequeue(clientinfo(), Session1),
     ?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
@@ -257,7 +259,7 @@ t_deliver_qos0(_) ->
                        clientinfo(), <<"t1">>, subopts(), Session),
     Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]],
     {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
-        emqx_session:deliver(Deliveries, Session1),
+        emqx_session:deliver(clientinfo(), Deliveries, Session1),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
 
@@ -266,38 +268,38 @@ t_deliver_qos1(_) ->
     {ok, Session} = emqx_session:subscribe(
                       clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
     Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
-    {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
+    {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
-    {ok, Msg1, Session2} = emqx_session:puback(1, Session1),
+    {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
-    {ok, Msg2, Session3} = emqx_session:puback(2, Session2),
+    {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
     ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
 
 t_deliver_qos2(_) ->
     ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
     Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)],
     {ok, [{1, Msg1}, {2, Msg2}], Session} =
-        emqx_session:deliver(Delivers, session()),
+        emqx_session:deliver(clientinfo(), Delivers, session()),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session)),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
 
 t_deliver_one_msg(_) ->
     {ok, [{1, Msg}], Session} =
-        emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()),
+        emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
 
 t_deliver_when_inflight_is_full(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
     Session = session(#{inflight => emqx_inflight:new(1)}),
-    {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session),
+    {ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ?assertEqual(1, length(Publishes)),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
     ?assertEqual(1, emqx_session:info(mqueue_len, Session1)),
-    {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1),
+    {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1),
     ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
     ?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
@@ -305,8 +307,8 @@ t_deliver_when_inflight_is_full(_) ->
 
 t_enqueue(_) ->
     %% store_qos0 = true
-    Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()),
-    Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>),
+    Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()),
+    Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>),
                                      delivery(?QOS_2, <<"t2">>)], Session),
     ?assertEqual(3, emqx_session:info(mqueue_len, Session1)).
 
@@ -314,11 +316,11 @@ t_retry(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
     RetryIntervalMs = 100, %% 0.1s
     Session = session(#{retry_interval => RetryIntervalMs}),
-    {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
+    {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
     ElapseMs = 200, %% 0.2s
     ok = timer:sleep(ElapseMs),
     Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
-    {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
+    {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
     ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
 
 %%--------------------------------------------------------------------
@@ -337,24 +339,24 @@ t_resume(_) ->
 
 t_replay(_) ->
     Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
-    {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()),
+    {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()),
     Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
-    Session2 = emqx_session:enqueue(Msg, Session1),
+    Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
     Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
-    {ok, ReplayPubs, Session3} = emqx_session:replay(Session2),
+    {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
     ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
     ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
 
 t_expire_awaiting_rel(_) ->
-    {ok, Session} = emqx_session:expire(awaiting_rel, session()),
+    {ok, Session} = emqx_session:expire(clientinfo(), awaiting_rel, session()),
     Timeout = emqx_session:info(await_rel_timeout, Session),
     Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
-    {ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1),
+    {ok, Timeout, Session2} = emqx_session:expire(clientinfo(), awaiting_rel, Session1),
     ?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)).
 
 t_expire_awaiting_rel_all(_) ->
     Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}),
-    {ok, Session1} = emqx_session:expire(awaiting_rel, Session),
+    {ok, Session1} = emqx_session:expire(clientinfo(), awaiting_rel, Session),
     ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)).
 
 %%--------------------------------------------------------------------

+ 22 - 22
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -527,7 +527,7 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
                        clientinfo = ClientInfo = #{clientid := ClientId}}) ->
     case ReturnCode of
         ?SN_RC_ACCEPTED ->
-            case emqx_session:puback(MsgId, Session) of
+            case emqx_session:puback(ClientInfo, MsgId, Session) of
                 {ok, Msg, NSession} ->
                     ok = after_message_acked(ClientInfo, Msg, Channel),
                     {ok, Channel#channel{session = NSession}};
@@ -574,7 +574,7 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
           Channel = #channel{ctx = Ctx,
                              session = Session,
                              clientinfo = ClientInfo}) ->
-    case emqx_session:pubrec(MsgId, Session) of
+    case emqx_session:pubrec(ClientInfo, MsgId, Session) of
         {ok, Msg, NSession} ->
             ok = after_message_acked(ClientInfo, Msg, Channel),
             NChannel = Channel#channel{session = NSession},
@@ -596,8 +596,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
     end;
 
 handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
-          Channel = #channel{ctx = Ctx, session = Session}) ->
-    case emqx_session:pubrel(MsgId, Session) of
+          Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:pubrel(ClientInfo, MsgId, Session) of
         {ok, NSession} ->
             NChannel = Channel#channel{session = NSession},
             handle_out(pubcomp, MsgId, NChannel);
@@ -611,8 +611,8 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
     end;
 
 handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
-          Channel = #channel{ctx = Ctx, session = Session}) ->
-    case emqx_session:pubcomp(MsgId, Session) of
+          Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:pubcomp(ClientInfo, MsgId, Session) of
         {ok, NSession} ->
             {ok, Channel#channel{session = NSession}};
         {ok, Publishes, NSession} ->
@@ -823,8 +823,8 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) ->
     handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel);
 
 do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
-           Channel = #channel{ctx = Ctx, session = Session}) ->
-    case emqx_session:publish(MsgId, Msg, Session) of
+           Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of
         {ok, _PubRes, NSession} ->
             NChannel1 = ensure_timer(await_timer,
                                      Channel#channel{session = NSession}
@@ -1012,9 +1012,9 @@ do_unsubscribe(TopicFilters,
 %%--------------------------------------------------------------------
 %% Awake & Asleep
 
-awake(Channel = #channel{session = Session}) ->
-    {ok, Publishes, Session1} = emqx_session:replay(Session),
-    {NPublishes, NSession} = case emqx_session:deliver([], Session1) of
+awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+    {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
+    {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of
         {ok, Session2} ->
             {Publishes, Session2};
         {ok, More, Session2} ->
@@ -1109,9 +1109,9 @@ maybe_resume_session(#channel{resuming = false}) ->
     ignore;
 maybe_resume_session(#channel{session  = Session,
                               resuming = true,
-                              pendings = Pendings}) ->
-    {ok, Publishes, Session1} = emqx_session:replay(Session),
-    case emqx_session:deliver(Pendings, Session1) of
+                              pendings = Pendings, clientinfo = ClientInfo}) ->
+    {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
+    case emqx_session:deliver(ClientInfo, Pendings, Session1) of
         {ok, Session2} ->
             {ok, Publishes, Session2};
         {ok, More, Session2} ->
@@ -1335,10 +1335,10 @@ handle_deliver(Delivers, Channel = #channel{
                                       ctx = Ctx,
                                       conn_state = ConnState,
                                       session    = Session,
-                                      clientinfo = #{clientid := ClientId}})
+                                      clientinfo = ClientInfo = #{clientid := ClientId}})
   when ConnState =:= disconnected;
        ConnState =:= asleep ->
-    NSession = emqx_session:enqueue(
+    NSession = emqx_session:enqueue(ClientInfo,
                  ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
                  Session
                 ),
@@ -1359,8 +1359,8 @@ handle_deliver(Delivers, Channel = #channel{
 handle_deliver(Delivers, Channel = #channel{
                                       ctx = Ctx,
                                       session = Session,
-                                      clientinfo = #{clientid := ClientId}}) ->
-    case emqx_session:deliver(
+                                      clientinfo = ClientInfo = #{clientid := ClientId}}) ->
+    case emqx_session:deliver(ClientInfo,
            ignore_local(Delivers, ClientId, Session, Ctx),
            Session
           ) of
@@ -1427,8 +1427,8 @@ handle_timeout(_TRef, retry_delivery,
                Channel = #channel{conn_state = asleep}) ->
     {ok, reset_timer(retry_timer, Channel)};
 handle_timeout(_TRef, retry_delivery,
-               Channel = #channel{session = Session}) ->
-    case emqx_session:retry(Session) of
+               Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:retry(ClientInfo, Session) of
         {ok, NSession} ->
             {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
         {ok, Publishes, Timeout, NSession} ->
@@ -1443,8 +1443,8 @@ handle_timeout(_TRef, expire_awaiting_rel,
                Channel = #channel{conn_state = asleep}) ->
     {ok, reset_timer(await_timer, Channel)};
 handle_timeout(_TRef, expire_awaiting_rel,
-               Channel = #channel{session = Session}) ->
-    case emqx_session:expire(awaiting_rel, Session) of
+               Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+    case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
         {ok, NSession} ->
             {ok, clean_timer(await_timer, Channel#channel{session = NSession})};
         {ok, Timeout, NSession} ->

+ 11 - 11
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -153,9 +153,9 @@ properties(client) ->
     {'recv_msg.qos2',   integer,
      <<"Number of PUBLISH QoS2 packets received">>},
     {'recv_msg.dropped', integer,
-     <<"Number of dropped PUBLISH packets">>},
-    {'recv_msg.dropped.expired', integer,
-     <<"Number of dropped PUBLISH packets due to expired">>},
+     <<"Number of dropped PUBLISH messages">>},
+    {'recv_msg.dropped.await_pubrel_timeout', integer,
+     <<"Number of dropped PUBLISH messages due to waiting PUBREL timeout">>},
     {recv_oct,          integer,
      <<"Number of bytes received by EMQ X Broker (the same below)">>},
     {recv_pkt,          integer,
@@ -165,21 +165,21 @@ properties(client) ->
     {send_cnt,          integer,
      <<"Number of TCP packets sent">>},
     {send_msg,          integer,
-     <<"Number of PUBLISH packets sent">>},
+     <<"Number of PUBLISH messages sent">>},
     {'send_msg.qos0',   integer,
-     <<"Number of PUBLISH QoS0 packets sent">>},
+     <<"Number of PUBLISH QoS0 messages sent">>},
     {'send_msg.qos1',   integer,
-     <<"Number of PUBLISH QoS1 packets sent">>},
+     <<"Number of PUBLISH QoS1 messages sent">>},
     {'send_msg.qos2',   integer,
-     <<"Number of PUBLISH QoS2 packets sent">>},
+     <<"Number of PUBLISH QoS2 messages sent">>},
     {'send_msg.dropped', integer,
-     <<"Number of dropped PUBLISH packets">>},
+     <<"Number of dropped PUBLISH messages">>},
     {'send_msg.dropped.expired', integer,
-     <<"Number of dropped PUBLISH packets due to expired">>},
+     <<"Number of dropped PUBLISH messages due to expired">>},
     {'send_msg.dropped.queue_full', integer,
-     <<"Number of dropped PUBLISH packets due to queue full">>},
+     <<"Number of dropped PUBLISH messages due to queue full">>},
     {'send_msg.dropped.too_large', integer,
-     <<"Number of dropped PUBLISH packets due to packet length too large">>},
+     <<"Number of dropped PUBLISH messages due to packet length too large">>},
     {send_oct,          integer,
      <<"Number of bytes sent">>},
     {send_pkt,          integer,

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -69,7 +69,7 @@ properties() ->
         {'messages.delayed',                integer, <<"Number of delay- published messages stored by EMQ X Broker">>},
         {'messages.delivered',              integer, <<"Number of messages forwarded to the subscription process internally by EMQ X Broker">>},
         {'messages.dropped',                integer, <<"Total number of messages dropped by EMQ X Broker before forwarding to the subscription process">>},
-        {'messages.dropped.expired',        integer, <<"Number of messages dropped due to message expiration when receiving">>},
+        {'messages.dropped.await_pubrel_timeout', integer, <<"Number of messages dropped due to waiting PUBREL timeout">>},
         {'messages.dropped.no_subscribers', integer, <<"Number of messages dropped due to no subscribers">>},
         {'messages.forward',                integer, <<"Number of messages forwarded to other nodes">>},
         {'messages.publish',                integer, <<"Number of messages published in addition to system messages">>},

+ 1 - 1
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -403,7 +403,7 @@ emqx_collect(emqx_messages_dropped, Metrics) ->
     counter_metric(?C('messages.dropped', Metrics));
 
 emqx_collect(emqx_messages_dropped_expired, Metrics) ->
-    counter_metric(?C('messages.dropped.expired', Metrics));
+    counter_metric(?C('messages.dropped.await_pubrel_timeout', Metrics));
 
 emqx_collect(emqx_messages_dropped_no_subscribers, Metrics) ->
     counter_metric(?C('messages.dropped.no_subscribers', Metrics));

+ 72 - 3
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -39,6 +39,7 @@
         , on_message_dropped/4
         , on_message_delivered/3
         , on_message_acked/3
+        , on_delivery_dropped/4
         , on_bridge_message_received/2
         ]).
 
@@ -63,6 +64,7 @@ event_names() ->
     , 'message.delivered'
     , 'message.acked'
     , 'message.dropped'
+    , 'delivery.dropped'
     ].
 
 reload() ->
@@ -153,6 +155,15 @@ on_message_acked(ClientInfo, Message, Env) ->
     end,
     {ok, Message}.
 
+on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
+    case ignore_sys_message(Message) of
+        true -> ok;
+        false ->
+            apply_event('delivery.dropped',
+                fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env)
+    end,
+    {ok, Message}.
+
 %%--------------------------------------------------------------------
 %% Event Messages
 %%--------------------------------------------------------------------
@@ -311,6 +322,32 @@ eventmsg_acked(_ClientInfo = #{
           publish_received_at => Timestamp
         }).
 
+eventmsg_delivery_dropped(_ClientInfo = #{
+            peerhost := PeerHost,
+            clientid := ReceiverCId,
+            username := ReceiverUsername
+        },
+        Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic,
+            headers = Headers, payload = Payload, timestamp = Timestamp},
+        Reason) ->
+    with_basic_columns('delivery.dropped',
+        #{id => emqx_guid:to_hexstr(Id),
+        reason => Reason,
+        from_clientid => ClientId,
+        from_username => emqx_message:get_header(username, Message, undefined),
+        clientid => ReceiverCId,
+        username => ReceiverUsername,
+        payload => Payload,
+        peerhost => ntoa(PeerHost),
+        topic => Topic,
+        qos => QoS,
+        flags => Flags,
+        %% the column 'headers' will be removed in the next major release
+        headers => printable_maps(Headers),
+        pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
+        publish_received_at => Timestamp
+        }).
+
 sub_unsub_prop_key('session.subscribed') -> sub_props;
 sub_unsub_prop_key('session.unsubscribed') -> unsub_props.
 
@@ -345,6 +382,7 @@ event_info() ->
     , event_info_client_disconnected()
     , event_info_session_subscribed()
     , event_info_session_unsubscribed()
+    , event_info_delivery_dropped()
     ].
 
 event_info_message_publish() ->
@@ -371,10 +409,18 @@ event_info_message_acked() ->
 event_info_message_dropped() ->
     event_info_common(
         'message.dropped',
-        {<<"message dropped">>, <<"消息丢弃"/utf8>>},
-        {<<"message dropped">>, <<"消息丢弃"/utf8>>},
+        {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>},
+        {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
         <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
     ).
+event_info_delivery_dropped() ->
+    event_info_common(
+        'delivery.dropped',
+        {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>},
+        {<<"messages are discarded during delivery, i.e. because the message queue is full">>,
+        <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
+        <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">>
+    ).
 event_info_client_connected() ->
     event_info_common(
         'client.connected',
@@ -414,7 +460,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
     }.
 
 test_columns('message.dropped') ->
-    test_columns('message.publish');
+    [ {<<"reason">>, <<"no_subscribers">>}
+    ] ++ test_columns('message.publish');
 test_columns('message.publish') ->
     [ {<<"clientid">>, <<"c_emqx">>}
     , {<<"username">>, <<"u_emqx">>}
@@ -422,6 +469,9 @@ test_columns('message.publish') ->
     , {<<"qos">>, 1}
     , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
     ];
+test_columns('delivery.dropped') ->
+    [ {<<"reason">>, <<"queue_full">>}
+    ] ++ test_columns('message.delivered');
 test_columns('message.acked') ->
     test_columns('message.delivered');
 test_columns('message.delivered') ->
@@ -486,6 +536,23 @@ columns_with_exam('message.dropped') ->
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ];
+columns_with_exam('delivery.dropped') ->
+    [ {<<"event">>, 'delivery.dropped'}
+    , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
+    , {<<"reason">>, queue_full}
+    , {<<"from_clientid">>, <<"c_emqx_1">>}
+    , {<<"from_username">>, <<"u_emqx_1">>}
+    , {<<"clientid">>, <<"c_emqx_2">>}
+    , {<<"username">>, <<"u_emqx_2">>}
+    , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
+    , {<<"peerhost">>, <<"192.168.0.10">>}
+    , {<<"topic">>, <<"t/a">>}
+    , {<<"qos">>, 1}
+    , {<<"flags">>, #{}}
+    , {<<"publish_received_at">>, erlang:system_time(millisecond)}
+    , {<<"timestamp">>, erlang:system_time(millisecond)}
+    , {<<"node">>, node()}
+    ];
 columns_with_exam('client.connected') ->
     [ {<<"event">>, 'client.connected'}
     , {<<"clientid">>, <<"c_emqx">>}
@@ -578,6 +645,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) ->
 event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
 event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
 event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
+event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
 event_name(_) -> 'message.publish'.
 
 event_topic('client.connected') -> <<"$events/client_connected">>;
@@ -587,6 +655,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
 event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
+event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
 event_topic('message.publish') -> <<"$events/message_publish">>.
 
 printable_maps(undefined) -> #{};

+ 53 - 2
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -144,6 +144,7 @@ init_per_testcase(t_events, Config) ->
                         "\"$events/message_acked\", "
                         "\"$events/message_delivered\", "
                         "\"$events/message_dropped\", "
+                        "\"$events/delivery_dropped\", "
                         "\"t1\"",
     {ok, Rule} = emqx_rule_engine:create_rule(
         #{id => <<"rule:t_events">>,
@@ -322,18 +323,20 @@ t_events(_Config) ->
         ]),
     ct:pal("====== verify $events/client_connected"),
     client_connected(Client, Client2),
+    ct:pal("====== verify $events/message_dropped"),
+    message_dropped(Client),
     ct:pal("====== verify $events/session_subscribed"),
     session_subscribed(Client2),
     ct:pal("====== verify t1"),
     message_publish(Client),
+    ct:pal("====== verify $events/delivery_dropped"),
+    delivery_dropped(Client),
     ct:pal("====== verify $events/message_delivered"),
     message_delivered(Client),
     ct:pal("====== verify $events/message_acked"),
     message_acked(Client),
     ct:pal("====== verify $events/session_unsubscribed"),
     session_unsubscribed(Client2),
-    ct:pal("====== verify $events/message_dropped"),
-    message_dropped(Client),
     ct:pal("====== verify $events/client_disconnected"),
     client_disconnected(Client, Client2),
     ok.
@@ -365,6 +368,15 @@ session_unsubscribed(Client2) ->
 message_delivered(_Client) ->
     verify_event('message.delivered'),
     ok.
+delivery_dropped(Client) ->
+    %% subscribe "t1" and then publish to "t1", the message will not be received by itself
+    %% because we have set the subscribe flag 'nl' = true
+    {ok, _, _} = emqtt:subscribe(Client, #{}, <<"t1">>, [{nl, true}, {qos, 1}]),
+    ct:sleep(50),
+    message_publish(Client),
+    ct:pal("--- current emqx hooks: ~p", [ets:tab2list(emqx_hooks)]),
+    verify_event('delivery.dropped'),
+    ok.
 message_dropped(Client) ->
     message_publish(Client),
     verify_event('message.dropped'),
@@ -1490,6 +1502,45 @@ verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed'
                  maps:get(PropKey, Fields)),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000);
 
+verify_event_fields('delivery.dropped', Fields) ->
+    #{event := 'delivery.dropped',
+      id := ID,
+      metadata := #{rule_id := RuleId},
+      reason := Reason,
+      clientid := ClientId,
+      username := Username,
+      from_clientid := FromClientId,
+      from_username := FromUsername,
+      node := Node,
+      payload := Payload,
+      peerhost := PeerHost,
+      pub_props := Properties,
+      publish_received_at := EventAt,
+      qos := QoS,
+      flags := Flags,
+      timestamp := Timestamp,
+      topic := Topic} = Fields,
+    Now = erlang:system_time(millisecond),
+    TimestampElapse = Now - Timestamp,
+    RcvdAtElapse = Now - EventAt,
+    ?assert(is_binary(ID)),
+    ?assertEqual(<<"rule:t_events">>, RuleId),
+    ?assertEqual(no_local, Reason),
+    ?assertEqual(node(), Node),
+    ?assertEqual(<<"c_event">>, ClientId),
+    ?assertEqual(<<"u_event">>, Username),
+    ?assertEqual(<<"c_event">>, FromClientId),
+    ?assertEqual(<<"u_event">>, FromUsername),
+    ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
+    verify_ipaddr(PeerHost),
+    ?assertEqual(<<"t1">>, Topic),
+    ?assertEqual(1, QoS),
+    ?assert(is_map(Flags)),
+    ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
+    ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
+    ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
+    ?assert(EventAt =< Timestamp);
+
 verify_event_fields('message.dropped', Fields) ->
     #{id := ID,
       reason := Reason,