Преглед на файлове

feat(retainer): add option to override message expiry interval for gc

Fixes https://emqx.atlassian.net/browse/EMQX-12739
Thales Macedo Garitezi преди 1 година
родител
ревизия
a7d1be87a5

+ 18 - 5
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -215,12 +215,25 @@ store_retained(State, Msg = #message{topic = Topic}) ->
 
 clear_expired(_State, Deadline, Limit) ->
     S0 = ets_stream(?TAB_MESSAGE),
-    S1 = emqx_utils_stream:filter(
-        fun(#retained_message{expiry_time = ExpiryTime}) ->
-            ExpiryTime =/= 0 andalso ExpiryTime < Deadline
+    FilterFn =
+        case emqx_conf:get([retainer, msg_expiry_interval_override]) of
+            disabled ->
+                fun(#retained_message{expiry_time = ExpiryTime}) ->
+                    ExpiryTime =/= 0 andalso ExpiryTime < Deadline
+                end;
+            OverrideMS ->
+                fun(
+                    #retained_message{
+                        expiry_time = StoredExpiryTime,
+                        msg = #message{timestamp = Ts}
+                    }
+                ) ->
+                    ExpiryTime = Ts + OverrideMS,
+                    StoredExpiryTime =/= 0 andalso
+                        ExpiryTime < Deadline
+                end
         end,
-        S0
-    ),
+    S1 = emqx_utils_stream:filter(FilterFn, S0),
     DirtyWriteIndices = dirty_indices(write),
     S2 = emqx_utils_stream:map(
         fun(RetainedMsg) ->

+ 7 - 0
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -51,6 +51,13 @@ fields("retainer") ->
                 msg_expiry_interval,
                 <<"0s">>
             )},
+        {msg_expiry_interval_override,
+            sc(
+                %% not used in a `receive ... after' block, just timestamp comparison
+                hoconsc:union([disabled, emqx_schema:duration_ms()]),
+                msg_expiry_interval_override,
+                disabled
+            )},
         {msg_clear_interval,
             sc(
                 emqx_schema:timeout_duration_ms(),

+ 42 - 1
apps/emqx_retainer/test/emqx_retainer_gc_SUITE.erl

@@ -115,8 +115,35 @@ t_limited_gc_runtime(Config) ->
         end
     ).
 
+%% Verifies that, even if a retained message has a long expiry time, if we set a maximum
+%% expiry time override, the latter wins.
+t_expiry_override(Config) ->
+    NumMsgs = 10,
+    ?check_trace(
+        begin
+            ok = store_retained(NumMsgs, [{expiry_interval_s, 1000} | Config]),
+            ok = set_message_expiry_override(<<"500ms">>, Config),
+            {ok, {ok, _}} = ?wait_async_action(
+                enable_clear_expired(_Interval = 1000, Config),
+                #{?snk_kind := emqx_retainer_cleared_expired}
+            ),
+            ok = disable_clear_expired(Config),
+            ok = unset_message_expiry_override(Config),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{complete := true, n_cleared := NumMsgs}],
+                ?of_kind(emqx_retainer_cleared_expired, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 store_retained(NMessages, Config) ->
     [N1 | _] = ?config(cluster, Config),
+    ExpiryInterval = proplists:get_value(expiry_interval_s, Config, 1),
     ?ON(
         N1,
         lists:foreach(
@@ -128,7 +155,7 @@ store_retained(NMessages, Config) ->
                     <<"retained/", Num/binary>>,
                     <<"payload">>,
                     #{retain => true},
-                    #{properties => #{'Message-Expiry-Interval' => 1}}
+                    #{properties => #{'Message-Expiry-Interval' => ExpiryInterval}}
                 ),
                 emqx:publish(Msg)
             end,
@@ -145,3 +172,17 @@ disable_clear_expired(Config) ->
     [N1 | _] = ?config(cluster, Config),
     {ok, _} = ?ON(N1, emqx_retainer:update_config(#{<<"msg_clear_interval">> => 0})),
     ok.
+
+set_message_expiry_override(Override, Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    {ok, _} = ?ON(
+        N1, emqx_retainer:update_config(#{<<"msg_expiry_interval_override">> => Override})
+    ),
+    ok.
+
+unset_message_expiry_override(Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    {ok, _} = ?ON(
+        N1, emqx_retainer:update_config(#{<<"msg_expiry_interval_override">> => <<"disabled">>})
+    ),
+    ok.

+ 1 - 0
changes/ce/feat-14059.en.md

@@ -0,0 +1 @@
+Added a new configuration to retainer that allows overriding message expiry intervals defined for each retained message.  This allows garbage collection to remove messages earlier in case of depleted storage.

+ 4 - 0
rel/i18n/emqx_retainer_schema.hocon

@@ -59,4 +59,8 @@ http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
 
 delivery_rate.desc:
 """The maximum rate of delivering retained messages"""
+
+msg_expiry_interval_override.desc:
+"""If set, this value will take precedence over any `Message-Expiry-Interval` property specified in retained MQTT messages, allowing messages to expire earlier if necessary."""
+
 }