Jelajahi Sumber

fix: only override messages with larger expiry time

Thales Macedo Garitezi 1 tahun lalu
induk
melakukan
fdbc3c0fe6

+ 1 - 2
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -228,9 +228,8 @@ clear_expired(_State, Deadline, Limit) ->
                         msg = #message{timestamp = Ts}
                     }
                 ) ->
-                    ExpiryTime = Ts + OverrideMS,
                     StoredExpiryTime =/= 0 andalso
-                        ExpiryTime < Deadline
+                        min(Ts + OverrideMS, StoredExpiryTime) < Deadline
                 end
         end,
     S1 = emqx_utils_stream:filter(FilterFn, S0),

+ 33 - 7
apps/emqx_retainer/test/emqx_retainer_gc_SUITE.erl

@@ -120,21 +120,46 @@ t_limited_gc_runtime(Config) ->
 t_expiry_override(Config) ->
     NumMsgs = 10,
     ?check_trace(
+        #{timetrap => 10_000},
         begin
-            ok = store_retained(NumMsgs, [{expiry_interval_s, 1000} | Config]),
-            ok = set_message_expiry_override(<<"500ms">>, Config),
+            %% Longer than override
+            ok = store_retained(NumMsgs, [{expiry_interval_s, 1000}, {start_n, 1} | Config]),
+            %% Shorter than override
+            ok = store_retained(NumMsgs, [{expiry_interval_s, 1}, {start_n, NumMsgs + 1} | Config]),
+            %% Would never expire.
+            ok = store_retained(NumMsgs, [
+                {expiry_interval_s, 0}, {start_n, 2 * NumMsgs + 1} | Config
+            ]),
+
+            ok = set_message_expiry_override(<<"1500ms">>, Config),
             {ok, {ok, _}} = ?wait_async_action(
-                enable_clear_expired(_Interval = 1000, Config),
-                #{?snk_kind := emqx_retainer_cleared_expired}
+                enable_clear_expired(_Interval = 2_000, Config),
+                #{?snk_kind := emqx_retainer_cleared_expired, n_cleared := N} when
+                    N > 0
             ),
             ok = disable_clear_expired(Config),
             ok = unset_message_expiry_override(Config),
             ok
         end,
         fun(Trace) ->
+            SubTrace =
+                lists:filter(
+                    fun
+                        (#{?snk_kind := emqx_retainer_cleared_expired, n_cleared := N}) when
+                            N > 0
+                        ->
+                            true;
+                        (_) ->
+                            false
+                    end,
+                    Trace
+                ),
+            %% `NumMsgs * 2' because we have one batch that was already going to expire
+            %% without the override, and one batch that is overridden.
+            ExpectedCleared1 = NumMsgs * 2,
             ?assertMatch(
-                [#{complete := true, n_cleared := NumMsgs}],
-                ?of_kind(emqx_retainer_cleared_expired, Trace)
+                [#{complete := true, n_cleared := ExpectedCleared1}],
+                SubTrace
             ),
             ok
         end
@@ -143,6 +168,7 @@ t_expiry_override(Config) ->
 
 store_retained(NMessages, Config) ->
     [N1 | _] = ?config(cluster, Config),
+    StartN = proplists:get_value(start_n, Config, 1),
     ExpiryInterval = proplists:get_value(expiry_interval_s, Config, 1),
     ?ON(
         N1,
@@ -159,7 +185,7 @@ store_retained(NMessages, Config) ->
                 ),
                 emqx:publish(Msg)
             end,
-            lists:seq(1, NMessages)
+            lists:seq(StartN, StartN + NMessages - 1)
         )
     ).