Просмотр исходного кода

Merge pull request #9487 from JimMoen/fix-message-dropped-event-v50

Fix message dropped event v50
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
5b3ad54c1b

+ 0 - 9
apps/emqx/src/emqx_channel.erl

@@ -752,15 +752,6 @@ do_publish(
             ok = emqx_metrics:inc('packets.publish.inuse'),
             handle_out(pubrec, {PacketId, RC}, Channel);
         {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
-            ?SLOG(
-                warning,
-                #{
-                    msg => "dropped_qos2_packet",
-                    reason => emqx_reason_codes:name(RC),
-                    packet_id => PacketId
-                },
-                #{topic => Msg#message.topic}
-            ),
             ok = emqx_metrics:inc('packets.publish.dropped'),
             handle_out(disconnect, RC, Channel)
     end.

+ 16 - 2
apps/emqx/src/emqx_session.erl

@@ -390,15 +390,29 @@ publish(
                     AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
                     {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
                 true ->
-                    {error, ?RC_PACKET_IDENTIFIER_IN_USE}
+                    drop_qos2_msg(PacketId, Msg, ?RC_PACKET_IDENTIFIER_IN_USE)
             end;
         true ->
-            {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
+            drop_qos2_msg(PacketId, Msg, ?RC_RECEIVE_MAXIMUM_EXCEEDED)
     end;
 %% Publish QoS0/1 directly
 publish(_ClientInfo, _PacketId, Msg, Session) ->
     {ok, emqx_broker:publish(Msg), Session}.
 
+drop_qos2_msg(PacketId, Msg, RC) ->
+    ?SLOG(
+        warning,
+        #{
+            msg => "dropped_qos2_packet",
+            reason => emqx_reason_codes:name(RC),
+            packet_id => PacketId
+        },
+        #{topic => Msg#message.topic}
+    ),
+    ok = emqx_metrics:inc('messages.dropped'),
+    ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, emqx_reason_codes:name(RC)]),
+    {error, RC}.
+
 is_awaiting_full(#session{max_awaiting_rel = infinity}) ->
     false;
 is_awaiting_full(#session{

+ 39 - 9
apps/emqx/test/emqx_session_SUITE.erl

@@ -165,15 +165,45 @@ t_publish_qos2(_) ->
 
 t_publish_qos2_with_error_return(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
-    Session = session(#{
-        max_awaiting_rel => 2,
-        awaiting_rel => #{1 => ts(millisecond)}
-    }),
-    Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
-    {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session),
-    {ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session),
-    ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
-    {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1).
+    ok = meck:expect(emqx_hooks, run, fun
+        ('message.dropped', [Msg, _By, ReasonName]) ->
+            self() ! {'message.dropped', ReasonName, Msg},
+            ok;
+        (_Hook, _Arg) ->
+            ok
+    end),
+
+    Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{PacketId1 = 1 => ts(millisecond)}}),
+    begin
+        Msg1 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload1">>),
+        {error, RC1 = ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(
+            clientinfo(), PacketId1, Msg1, Session
+        ),
+        receive
+            {'message.dropped', Reason1, RecMsg1} ->
+                ?assertEqual(Reason1, emqx_reason_codes:name(RC1)),
+                ?assertEqual(RecMsg1, Msg1)
+        after 1000 ->
+            ct:fail(?FUNCTION_NAME)
+        end
+    end,
+
+    begin
+        Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>),
+        {ok, [], Session1} = emqx_session:publish(clientinfo(), PacketId2 = 2, Msg2, Session),
+        ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
+        {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(
+            clientinfo(), _PacketId3 = 3, Msg2, Session1
+        ),
+        receive
+            {'message.dropped', Reason2, RecMsg2} ->
+                ?assertEqual(Reason2, emqx_reason_codes:name(RC2)),
+                ?assertEqual(RecMsg2, Msg2)
+        after 1000 ->
+            ct:fail(?FUNCTION_NAME)
+        end
+    end,
+    ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end).
 
 t_is_awaiting_full_false(_) ->
     Session = session(#{max_awaiting_rel => infinity}),

+ 5 - 0
changes/v5.0.10-en.md

@@ -24,6 +24,11 @@
 
 - Binary packages for all platforms are now built on Erlang/OTP version 24.3.4.2 [#9293](https://github.com/emqx/emqx/pull/9293).
 
+- Added more `client.disconnected` events (and counter bumps) [#9327](https://github.com/emqx/emqx/pull/9327).
+  Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
+  performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
+  stale connection had to be 'discarded' (for clean session) or 'takeovered' (for non-clean session) by new connection.
+
 ## Bug fixes
 
 - Fix error log message when `mechanism` is missing in authentication config [#8924](https://github.com/emqx/emqx/pull/8924).

+ 4 - 0
changes/v5.0.10-zh.md

@@ -23,6 +23,10 @@
 
 - 为所有平台的二进制包升级了 Erlang/OTP 到 24.3.4.2 [#9293](https://github.com/emqx/emqx/pull/9293)。
 
+- 增加更多类型的 `client.disconnected` 事件(及计数器触发) [#9327](https://github.com/emqx/emqx/pull/9327)。
+  此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发,
+  但不会在旧 session 被新连接废弃时 (clean_session = true) ,或旧 session 被新连接接管时 (clean_session = false) 被触发。
+
 ## Bug fixes
 
 - 优化认认证配置中 `mechanism` 字段缺失情况下的错误日志 [#8924](https://github.com/emqx/emqx/pull/8924)。

+ 2 - 0
changes/v5.0.13-en.md

@@ -11,3 +11,5 @@
 - Refactor: use `POST` not `PUT` for `/users/{name}/change_pwd` [#9533](https://github.com/emqx/emqx/pull/9533).
 
 ## Bug fixes
+
+- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487).

+ 2 - 0
changes/v5.0.13-zh.md

@@ -11,3 +11,5 @@
 - 重构:  `/users/{name}/change_pwd` 的请求方式从 `PUT` 改为了 `POST` [#9533](https://github.com/emqx/emqx/pull/9533)。
 
 ## 修复
+
+- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。