Explorar el Código

fix: run `message.dropped` hook, inc `messages.dropped` metrics

- when awaiting_rel full
- packet identifier in use (QoS2 packet resend)
JimMoen hace 3 años
padre
commit
b88398c3c6
Se han modificado 4 ficheros con 20 adiciones y 11 borrados
  1. 0 9
      apps/emqx/src/emqx_channel.erl
  2. 16 2
      apps/emqx/src/emqx_session.erl
  3. 2 0
      changes/v5.0.13-en.md
  4. 2 0
      changes/v5.0.13-zh.md

+ 0 - 9
apps/emqx/src/emqx_channel.erl

@@ -752,15 +752,6 @@ do_publish(
             ok = emqx_metrics:inc('packets.publish.inuse'),
             handle_out(pubrec, {PacketId, RC}, Channel);
         {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
-            ?SLOG(
-                warning,
-                #{
-                    msg => "dropped_qos2_packet",
-                    reason => emqx_reason_codes:name(RC),
-                    packet_id => PacketId
-                },
-                #{topic => Msg#message.topic}
-            ),
             ok = emqx_metrics:inc('packets.publish.dropped'),
             handle_out(disconnect, RC, Channel)
     end.

+ 16 - 2
apps/emqx/src/emqx_session.erl

@@ -390,15 +390,29 @@ publish(
                     AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
                     {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
                 true ->
-                    {error, ?RC_PACKET_IDENTIFIER_IN_USE}
+                    drop_qos2_msg(PacketId, Msg, ?RC_PACKET_IDENTIFIER_IN_USE)
             end;
         true ->
-            {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
+            drop_qos2_msg(PacketId, Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED)
     end;
 %% Publish QoS0/1 directly
 publish(_ClientInfo, _PacketId, Msg, Session) ->
     {ok, emqx_broker:publish(Msg), Session}.
 
+drop_qos2_msg(PacketId, Msg, RC) ->
+    ?SLOG(
+        warning,
+        #{
+            msg => "dropped_qos2_packet",
+            reason => emqx_reason_codes:name(RC),
+            packet_id => PacketId
+        },
+        #{topic => Msg#message.topic}
+    ),
+    ok = emqx_metrics:inc('messages.dropped'),
+    ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(RC)]),
+    {error, RC}.
+
 is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
     false;
 is_awaiting_full(#session{

+ 2 - 0
changes/v5.0.13-en.md

@@ -11,3 +11,5 @@
 - Refactor: use `POST` not `PUT` for `/users/{name}/change_pwd` [#9533](https://github.com/emqx/emqx/pull/9533).
 
 ## Bug fixes
+
+- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487).

+ 2 - 0
changes/v5.0.13-zh.md

@@ -11,3 +11,5 @@
 - 重构:  `/users/{name}/change_pwd` 的请求方式从 `PUT` 改为了 `POST` [#9533](https://github.com/emqx/emqx/pull/9533)。
 
 ## 修复
+
+- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。