Przeglądaj źródła

Merge pull request #14059 from thalesmg/20241022-r58-retainer-expiry

feat(retainer): add option to override message expiry interval for gc and throttle `table_is_full` logs
Thales Macedo Garitezi 1 rok temu
rodzic
commit
e33a3cee2a

+ 3 - 2
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -80,10 +80,11 @@
     cannot_publish_to_topic_due_to_not_authorized,
     cannot_publish_to_topic_due_to_quota_exceeded,
     connection_rejected_due_to_license_limit_reached,
-    dropped_msg_due_to_mqueue_is_full,
-    socket_receive_paused_by_rate_limit,
     data_bridge_buffer_overflow,
+    dropped_msg_due_to_mqueue_is_full,
     external_broker_crashed,
+    failed_to_retain_message,
+    socket_receive_paused_by_rate_limit,
     unrecoverable_resource_error
 ]).
 

+ 32 - 10
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -199,11 +199,14 @@ store_retained(State, Msg = #message{topic = Topic}) ->
     Tokens = topic_to_tokens(Topic),
     case is_table_full(State) andalso is_new_topic(Tokens) of
         true ->
-            ?SLOG(error, #{
-                msg => "failed_to_retain_message",
-                topic => Topic,
-                reason => table_is_full
-            });
+            ?SLOG_THROTTLE(
+                error,
+                #{
+                    msg => failed_to_retain_message,
+                    reason => table_is_full
+                },
+                #{topic => Topic}
+            );
         false ->
             do_store_retained(Msg, Tokens, ExpiryTime),
             ?tp(message_retained, #{topic => Topic}),
@@ -212,12 +215,31 @@ store_retained(State, Msg = #message{topic = Topic}) ->
 
 clear_expired(_State, Deadline, Limit) ->
     S0 = ets_stream(?TAB_MESSAGE),
-    S1 = emqx_utils_stream:filter(
-        fun(#retained_message{expiry_time = ExpiryTime}) ->
-            ExpiryTime =/= 0 andalso ExpiryTime < Deadline
+    AllowNeverExpire = emqx_conf:get([retainer, allow_never_expire]),
+    FilterFn =
+        case emqx_conf:get([retainer, msg_expiry_interval_override]) of
+            disabled ->
+                fun(#retained_message{expiry_time = ExpiryTime}) ->
+                    ExpiryTime =/= 0 andalso ExpiryTime < Deadline
+                end;
+            OverrideMS ->
+                fun(
+                    #retained_message{
+                        expiry_time = StoredExpiryTime,
+                        msg = #message{timestamp = Ts}
+                    }
+                ) ->
+                    case StoredExpiryTime of
+                        0 when not AllowNeverExpire ->
+                            Ts + OverrideMS < Deadline;
+                        0 ->
+                            false;
+                        _ ->
+                            min(Ts + OverrideMS, StoredExpiryTime) < Deadline
+                    end
+                end
         end,
-        S0
-    ),
+    S1 = emqx_utils_stream:filter(FilterFn, S0),
     DirtyWriteIndices = dirty_indices(write),
     S2 = emqx_utils_stream:map(
         fun(RetainedMsg) ->

+ 8 - 0
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -51,6 +51,14 @@ fields("retainer") ->
                 msg_expiry_interval,
                 <<"0s">>
             )},
+        {msg_expiry_interval_override,
+            sc(
+                %% not used in a `receive ... after' block, just timestamp comparison
+                hoconsc:union([disabled, emqx_schema:duration_ms()]),
+                msg_expiry_interval_override,
+                disabled
+            )},
+        {allow_never_expire, sc(boolean(), allow_never_expire, true)},
         {msg_clear_interval,
             sc(
                 emqx_schema:timeout_duration_ms(),

+ 94 - 2
apps/emqx_retainer/test/emqx_retainer_gc_SUITE.erl

@@ -115,8 +115,79 @@ t_limited_gc_runtime(Config) ->
         end
     ).
 
+%% Verifies that, even if a retained message has a long expiry time, if we set a maximum
+%% expiry time override, the latter wins.
+t_expiry_override(Config) ->
+    NumMsgs = 10,
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            %% Longer than override
+            ok = store_retained(NumMsgs, [{expiry_interval_s, 1000}, {start_n, 1} | Config]),
+            %% Shorter than override
+            ok = store_retained(NumMsgs, [{expiry_interval_s, 1}, {start_n, NumMsgs + 1} | Config]),
+            %% Would never expire.
+            ok = store_retained(NumMsgs, [
+                {expiry_interval_s, 0}, {start_n, 2 * NumMsgs + 1} | Config
+            ]),
+
+            ok = set_message_expiry_override(<<"1500ms">>, Config),
+            %% Initially, we don't override messages with infinity expiry time.
+            ok = set_allow_never_expire(true, Config),
+            {ok, {ok, _}} = ?wait_async_action(
+                enable_clear_expired(_Interval = 2_000, Config),
+                #{?snk_kind := emqx_retainer_cleared_expired, n_cleared := N} when
+                    N > 0
+            ),
+            %% Now we override messages with infinity expiry time.
+            {ok, {ok, _}} = ?wait_async_action(
+                begin
+                    ok = set_allow_never_expire(false, Config),
+                    enable_clear_expired(_Interval2 = 500, Config)
+                end,
+                #{?snk_kind := emqx_retainer_cleared_expired}
+            ),
+
+            ok = set_allow_never_expire(true, Config),
+            ok = disable_clear_expired(Config),
+            ok = unset_message_expiry_override(Config),
+            ok
+        end,
+        fun(Trace) ->
+            SubTrace =
+                lists:filter(
+                    fun
+                        (#{?snk_kind := emqx_retainer_cleared_expired, n_cleared := N}) when
+                            N > 0
+                        ->
+                            true;
+                        (_) ->
+                            false
+                    end,
+                    Trace
+                ),
+            %% `NumMsgs * 2' because we have one batch that was already going to expire
+            %% without the override, and one batch that is overridden.
+            ExpectedCleared1 = NumMsgs * 2,
+            %% Second cleared batch contains the messages that would never expire, after
+            %% we set `allow_never_expire = false' so they are affected by GC.
+            ExpectedCleared2 = NumMsgs,
+            ?assertMatch(
+                [
+                    #{complete := true, n_cleared := ExpectedCleared1},
+                    #{complete := true, n_cleared := ExpectedCleared2}
+                ],
+                SubTrace
+            ),
+            ok
+        end
+    ),
+    ok.
+
 store_retained(NMessages, Config) ->
     [N1 | _] = ?config(cluster, Config),
+    StartN = proplists:get_value(start_n, Config, 1),
+    ExpiryInterval = proplists:get_value(expiry_interval_s, Config, 1),
     ?ON(
         N1,
         lists:foreach(
@@ -128,11 +199,11 @@ store_retained(NMessages, Config) ->
                     <<"retained/", Num/binary>>,
                     <<"payload">>,
                     #{retain => true},
-                    #{properties => #{'Message-Expiry-Interval' => 1}}
+                    #{properties => #{'Message-Expiry-Interval' => ExpiryInterval}}
                 ),
                 emqx:publish(Msg)
             end,
-            lists:seq(1, NMessages)
+            lists:seq(StartN, StartN + NMessages - 1)
         )
     ).
 
@@ -145,3 +216,24 @@ disable_clear_expired(Config) ->
     [N1 | _] = ?config(cluster, Config),
     {ok, _} = ?ON(N1, emqx_retainer:update_config(#{<<"msg_clear_interval">> => 0})),
     ok.
+
+set_message_expiry_override(Override, Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    {ok, _} = ?ON(
+        N1, emqx_retainer:update_config(#{<<"msg_expiry_interval_override">> => Override})
+    ),
+    ok.
+
+unset_message_expiry_override(Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    {ok, _} = ?ON(
+        N1, emqx_retainer:update_config(#{<<"msg_expiry_interval_override">> => <<"disabled">>})
+    ),
+    ok.
+
+set_allow_never_expire(Bool, Config) ->
+    [N1 | _] = ?config(cluster, Config),
+    {ok, _} = ?ON(
+        N1, emqx_retainer:update_config(#{<<"allow_never_expire">> => Bool})
+    ),
+    ok.

+ 1 - 0
changes/ce/feat-14059.en.md

@@ -0,0 +1 @@
+Added a new configuration to retainer that allows capping message expiry intervals defined for retained messages.  This allows garbage collection to remove messages earlier in case of depleted storage.

+ 7 - 0
rel/i18n/emqx_retainer_schema.hocon

@@ -59,4 +59,11 @@ http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038
 
 delivery_rate.desc:
 """The maximum rate of delivering retained messages"""
+
+msg_expiry_interval_override.desc:
+"""If set, this value will take precedence over any `Message-Expiry-Interval` property specified in retained MQTT messages, allowing messages to expire earlier if necessary."""
+
+allow_never_expire.desc:
+"""If true, retained messages set to never expire (i.e., whose `Message-Expiry-Interval = 0`) are not affected by the expiry time override.  This configuration only takes effect when `msg_expiry_interval_override` is set."""
+
 }