Przeglądaj źródła

feat(retainer): add new option to control whether override affects messages that don't expire

Thales Macedo Garitezi 1 rok temu
rodzic
commit
d2813b12f9

+ 9 - 2
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -215,6 +215,7 @@ store_retained(State, Msg = #message{topic = Topic}) ->
 
 clear_expired(_State, Deadline, Limit) ->
     S0 = ets_stream(?TAB_MESSAGE),
+    AllowNeverExpire = emqx_conf:get([retainer, allow_never_expire]),
     FilterFn =
         case emqx_conf:get([retainer, msg_expiry_interval_override]) of
             disabled ->
@@ -228,8 +229,14 @@ clear_expired(_State, Deadline, Limit) ->
                         msg = #message{timestamp = Ts}
                     }
                 ) ->
-                    StoredExpiryTime =/= 0 andalso
-                        min(Ts + OverrideMS, StoredExpiryTime) < Deadline
+                    case StoredExpiryTime of
+                        0 when not AllowNeverExpire ->
+                            Ts + OverrideMS < Deadline;
+                        0 ->
+                            false;
+                        _ ->
+                            min(Ts + OverrideMS, StoredExpiryTime) < Deadline
+                    end
                 end
         end,
     S1 = emqx_utils_stream:filter(FilterFn, S0),

+ 1 - 0
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -58,6 +58,7 @@ fields("retainer") ->
                 msg_expiry_interval_override,
                 disabled
             )},
+        {allow_never_expire, sc(boolean(), allow_never_expire, true)},
         {msg_clear_interval,
             sc(
                 emqx_schema:timeout_duration_ms(),

+ 26 - 1
apps/emqx_retainer/test/emqx_retainer_gc_SUITE.erl

@@ -132,11 +132,23 @@ t_expiry_override(Config) ->
             ]),
 
             ok = set_message_expiry_override(<<"1500ms">>, Config),
+            %% Initially, we don't override messages with infinity expiry time.
+            ok = set_allow_never_expire(true, Config),
             {ok, {ok, _}} = ?wait_async_action(
                 enable_clear_expired(_Interval = 2_000, Config),
                 #{?snk_kind := emqx_retainer_cleared_expired, n_cleared := N} when
                     N > 0
             ),
+            %% Now we override messages with infinity expiry time.
+            {ok, {ok, _}} = ?wait_async_action(
+                begin
+                    ok = set_allow_never_expire(false, Config),
+                    enable_clear_expired(_Interval2 = 500, Config)
+                end,
+                #{?snk_kind := emqx_retainer_cleared_expired}
+            ),
+
+            ok = set_allow_never_expire(true, Config),
             ok = disable_clear_expired(Config),
             ok = unset_message_expiry_override(Config),
             ok
@@ -157,8 +169,14 @@ t_expiry_override(Config) ->
             %% `NumMsgs * 2' because we have one batch that was already going to expire
             %% without the override, and one batch that is overridden.
             ExpectedCleared1 = NumMsgs * 2,
+            %% Second cleared batch contains the messages that would never expire, after
+            %% we set `allow_never_expire = false' so they are affected by GC.
+            ExpectedCleared2 = NumMsgs,
             ?assertMatch(
-                [#{complete := true, n_cleared := ExpectedCleared1}],
+                [
+                    #{complete := true, n_cleared := ExpectedCleared1},
+                    #{complete := true, n_cleared := ExpectedCleared2}
+                ],
                 SubTrace
             ),
             ok
@@ -212,3 +230,10 @@ unset_message_expiry_override(Config) ->
         N1, emqx_retainer:update_config(#{<<"msg_expiry_interval_override">> => <<"disabled">>})
     ),
     ok.
+
+set_allow_never_expire(Bool, Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    {ok, _} = ?ON(
+        N1, emqx_retainer:update_config(#{<<"allow_never_expire">> => Bool})
+    ),
+    ok.

+ 3 - 0
rel/i18n/emqx_retainer_schema.hocon

@@ -63,4 +63,7 @@ delivery_rate.desc:
 msg_expiry_interval_override.desc:
 """If set, this value will take precedence over any `Message-Expiry-Interval` property specified in retained MQTT messages, allowing messages to expire earlier if necessary."""
 
+allow_never_expire.desc:
+"""If true, retained messages set to never expire (i.e., whose `Message-Expiry-Interval = 0`) are not affected by the expiry time override.  This configuration only takes effect when `msg_expiry_interval_override` is set."""
+
 }