Преглед изворни кода

feat(hook): new emqx hook 'delivery.dropped'

Shawn пре 4 година
родитељ
комит
9434c0fa6c
3 измењених фајлова са 18 додато и 15 уклоњено
  1. 2 1
      src/emqx_channel.erl
  2. 3 2
      src/emqx_metrics.erl
  3. 13 12
      src/emqx_session.erl

+ 2 - 1
src/emqx_channel.erl

@@ -750,9 +750,10 @@ handle_deliver(Delivers, Channel = #channel{session = Session,
 
 ignore_local(Delivers, Subscriber, Session) ->
     Subs = emqx_session:info(subscriptions, Session),
-    lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
+    lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
                         case maps:find(Topic, Subs) of
                             {ok, #{nl := 1}} when Subscriber =:= Publisher ->
+                                ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, no_local]),
                                 ok = emqx_metrics:inc('delivery.dropped'),
                                 ok = emqx_metrics:inc('delivery.dropped.no_local'),
                                 true;

+ 3 - 2
src/emqx_metrics.erl

@@ -146,7 +146,7 @@
          %% PubSub Metrics
          {counter, 'messages.publish'},       % Messages Publish
          {counter, 'messages.dropped'},       % Messages dropped due to no subscribers
-         {counter, 'messages.dropped.expired'},  % QoS2 Messages expired
+         {counter, 'messages.dropped.await_pubrel_timeout'},  % QoS2 await PUBREL timeout
          {counter, 'messages.dropped.no_subscribers'},  % Messages dropped
          {counter, 'messages.forward'},       % Messages forward
          {counter, 'messages.retained'},      % Messages retained
@@ -542,7 +542,8 @@ reserved_idx('messages.qos2.received')       -> 106;
 reserved_idx('messages.qos2.sent')           -> 107;
 reserved_idx('messages.publish')             -> 108;
 reserved_idx('messages.dropped')             -> 109;
-reserved_idx('messages.dropped.expired')     -> 110;
+reserved_idx('messages.dropped.expired')     -> 110; %% To be removed in 5.0
+reserved_idx('messages.dropped.await_pubrel_timeout') -> 110;
 reserved_idx('messages.dropped.no_subscribers') -> 111;
 reserved_idx('messages.forward')             -> 112;
 reserved_idx('messages.retained')            -> 113;

+ 13 - 12
src/emqx_session.erl

@@ -403,8 +403,10 @@ dequeue(Cnt, Msgs, Q) ->
         {empty, _Q} -> dequeue(0, Msgs, Q);
         {{value, Msg}, Q1} ->
             case emqx_message:is_expired(Msg) of
-                true  -> ok = inc_expired_cnt(delivery),
-                         dequeue(Cnt, Msgs, Q1);
+                true  ->
+                    ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]),
+                    ok = inc_delivery_expired_cnt(),
+                    dequeue(Cnt, Msgs, Q1);
                 false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
             end
     end.
@@ -565,7 +567,8 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
 retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
     case emqx_message:is_expired(Msg) of
         true ->
-            ok = inc_expired_cnt(delivery),
+            ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]),
+            ok = inc_delivery_expired_cnt(),
             {Acc, emqx_inflight:delete(PacketId, Inflight)};
         false ->
             Msg1 = emqx_message:set_flag(dup, true, Msg),
@@ -593,7 +596,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
     NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
     AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
     ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
-    (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt),
+    (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt),
     NSession = Session#session{awaiting_rel = AwaitingRel1},
     case maps:size(AwaitingRel1) of
         0 -> {ok, NSession};
@@ -644,18 +647,16 @@ run_hook(Name, Args) ->
 %%--------------------------------------------------------------------
 %% Inc message/delivery expired counter
 %%--------------------------------------------------------------------
+inc_delivery_expired_cnt() ->
+    inc_delivery_expired_cnt(1).
 
--compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
-
-inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
-
-inc_expired_cnt(delivery, N) ->
+inc_delivery_expired_cnt(N) ->
     ok = emqx_metrics:inc('delivery.dropped', N),
-    emqx_metrics:inc('delivery.dropped.expired', N);
+    emqx_metrics:inc('delivery.dropped.expired', N).
 
-inc_expired_cnt(message, N) ->
+inc_await_pubrel_timeout(N) ->
     ok = emqx_metrics:inc('messages.dropped', N),
-    emqx_metrics:inc('messages.dropped.expired', N).
+    emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
 
 %%--------------------------------------------------------------------
 %% Next Packet Id