Просмотр исходного кода

feat: configurable server side message_expiry_interval

Shawn 2 лет назад
Родитель
Сommit
12da3c0986

+ 2 - 1
apps/emqx/src/emqx_channel.erl

@@ -930,7 +930,8 @@ handle_deliver(
     Delivers1 = maybe_nack(Delivers),
     Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
     NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
-    {ok, Channel#channel{session = NSession}};
+    %% we need to update stats here, as the stats_timer is canceled after disconnected
+    {ok, {event, updated}, Channel#channel{session = NSession}};
 handle_deliver(Delivers, Channel) ->
     Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
     do_handle_deliver(Delivers1, Channel).

+ 14 - 8
apps/emqx/src/emqx_message.erl

@@ -65,7 +65,7 @@
 ]).
 
 -export([
-    is_expired/1,
+    is_expired/2,
     update_expiry/1,
     timestamp_now/0
 ]).
@@ -273,14 +273,20 @@ remove_header(Hdr, Msg = #message{headers = Headers}) ->
         false -> Msg
     end.
 
--spec is_expired(emqx_types:message()) -> boolean().
-is_expired(#message{
-    headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
-    timestamp = CreatedAt
-}) ->
+-spec is_expired(emqx_types:message(), atom()) -> boolean().
+is_expired(
+    #message{
+        headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
+        timestamp = CreatedAt
+    },
+    _
+) ->
     elapsed(CreatedAt) > timer:seconds(Interval);
-is_expired(_Msg) ->
-    false.
+is_expired(#message{timestamp = CreatedAt}, Zone) ->
+    case emqx_config:get_zone_conf(Zone, [mqtt, message_expiry_interval], infinity) of
+        infinity -> false;
+        Interval -> elapsed(CreatedAt) > Interval
+    end.
 
 -spec update_expiry(emqx_types:message()) -> emqx_types:message().
 update_expiry(

+ 9 - 0
apps/emqx/src/emqx_schema.erl

@@ -3745,6 +3745,15 @@ mqtt_session() ->
                     importance => ?IMPORTANCE_LOW
                 }
             )},
+        {"message_expiry_interval",
+            sc(
+                hoconsc:union([duration(), infinity]),
+                #{
+                    default => infinity,
+                    desc => ?DESC(mqtt_message_expiry_interval),
+                    importance => ?IMPORTANCE_LOW
+                }
+            )},
         {"max_awaiting_rel",
             sc(
                 hoconsc:union([non_neg_integer(), infinity]),

+ 4 - 4
apps/emqx/src/emqx_session_mem.erl

@@ -468,12 +468,12 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
 
 dequeue(_ClientInfo, 0, Msgs, Q) ->
     {lists:reverse(Msgs), Q};
-dequeue(ClientInfo, Cnt, Msgs, Q) ->
+dequeue(ClientInfo = #{zone := Zone}, Cnt, Msgs, Q) ->
     case emqx_mqueue:out(Q) of
         {empty, _Q} ->
             dequeue(ClientInfo, 0, Msgs, Q);
         {{value, Msg}, Q1} ->
-            case emqx_message:is_expired(Msg) of
+            case emqx_message:is_expired(Msg, Zone) of
                 true ->
                     _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
                     dequeue(ClientInfo, Cnt, Msgs, Q1);
@@ -619,14 +619,14 @@ retry_delivery(
     end.
 
 do_retry_delivery(
-    ClientInfo,
+    ClientInfo = #{zone := Zone},
     PacketId,
     #inflight_data{phase = wait_ack, message = Msg} = Data,
     Now,
     Acc,
     Inflight
 ) ->
-    case emqx_message:is_expired(Msg) of
+    case emqx_message:is_expired(Msg, Zone) of
         true ->
             _ = emqx_session_events:handle_event(ClientInfo, {expired, Msg}),
             {Acc, emqx_inflight:delete(PacketId, Inflight)};

+ 1 - 0
apps/emqx/test/emqx_config_SUITE.erl

@@ -446,6 +446,7 @@ zone_global_defaults() ->
                 response_information => [],
                 retain_available => true,
                 retry_interval => 30000,
+                message_expiry_interval => infinity,
                 server_keepalive => disabled,
                 session_expiry_interval => 7200000,
                 shared_subscription => true,

+ 3 - 3
apps/emqx/test/emqx_message_SUITE.erl

@@ -143,12 +143,12 @@ t_undefined_headers(_) ->
 
 t_is_expired(_) ->
     Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
-    ?assertNot(emqx_message:is_expired(Msg)),
+    ?assertNot(emqx_message:is_expired(Msg, ?MODULE)),
     Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg),
     timer:sleep(500),
-    ?assertNot(emqx_message:is_expired(Msg1)),
+    ?assertNot(emqx_message:is_expired(Msg1, ?MODULE)),
     timer:sleep(600),
-    ?assert(emqx_message:is_expired(Msg1)),
+    ?assert(emqx_message:is_expired(Msg1, ?MODULE)),
     timer:sleep(1000),
     Msg = emqx_message:update_expiry(Msg),
     Msg2 = emqx_message:update_expiry(Msg1),

+ 1 - 0
apps/emqx/test/emqx_session_mem_SUITE.erl

@@ -545,6 +545,7 @@ clientinfo() -> clientinfo(#{}).
 clientinfo(Init) ->
     maps:merge(
         #{
+            zone => ?MODULE,
             clientid => <<"clientid">>,
             username => <<"username">>
         },

+ 15 - 0
rel/config/examples/mqtt.conf.example

@@ -82,6 +82,21 @@ mqtt {
     ## Specifies how long the session will expire after the connection is disconnected, only for non-MQTT 5.0 connections
     session_expiry_interval = 2h
 
+    ## The expiry interval of MQTT messages.
+    ##
+    ## For MQTT 5.0 clients, this configuration will only take effect when the
+    ##  Message-Expiry-Interval property is not set in the message; otherwise, the
+    ##  value of the Message-Expiry-Interval property will be used.
+    ## For MQTT versions older than 5.0, this configuration will always take effect.
+    ## Please note that setting message_expiry_interval greater than session_expiry_interval
+    ##  is meaningless, as all messages will be cleared when the session expires.
+    ##
+    ## Type:
+    ##   - infinity :: Never expire
+    ##   - Time Duration :: The expiry interval
+    ## Default: infinity
+    message_expiry_interval = infinity
+
     ## Maximum queue length. Enqueued messages when persistent client disconnected, or inflight window is full
     ## Type: infinity | Integer
     max_mqueue_len = 1000

+ 6 - 0
rel/i18n/emqx_schema.hocon

@@ -191,6 +191,12 @@ mqtt_session_expiry_interval.desc:
 mqtt_session_expiry_interval.label:
 """Session Expiry Interval"""
 
+mqtt_message_expiry_interval.desc:
+"""The expiry interval of MQTT messages. For MQTT 5.0 clients, this configuration will only take effect when the Message-Expiry-Interval property is not set in the message; otherwise, the value of the Message-Expiry-Interval property will be used. For MQTT versions older than 5.0, this configuration will always take effect. Please note that setting message_expiry_interval greater than session_expiry_interval is meaningless, as all messages will be cleared when the session expires."""
+
+mqtt_message_expiry_interval.label:
+"""Message Expiry Interval"""
+
 fields_listener_enabled.desc:
 """Enable listener."""