Преглед изворни кода

feat(fs-gc): wire gc up with emqx config

Andrew Mayorov пре 3 година
родитељ
комит
e1dc48fa2b

+ 35 - 0
apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf

@@ -33,4 +33,39 @@ emqx_ft_schema {
         }
         }
     }
     }
 
 
+    local_storage_gc {
+        desc {
+            en: "Garbage collection settings for the intermediate and temporary files in the local file system."
+            zh: ""
+        }
+        label: {
+            en: "Local Storage GC"
+            zh: ""
+        }
+    }
+
+    storage_gc_interval {
+        desc {
+            en: "Interval of periodic garbage collection."
+            zh: ""
+        }
+        label: {
+            en: "GC Interval"
+            zh: ""
+        }
+    }
+
+    storage_gc_max_segments_ttl {
+        desc {
+            en: "Maximum TTL of a segment kept in the local file system.<br/>"
+                "This is a hard limit: no segment will outlive this TTL, even if some file transfer specifies a "
+                "TTL more than that."
+            zh: ""
+        }
+        label: {
+            en: "GC Interval"
+            zh: ""
+        }
+    }
+
 }
 }

+ 13 - 5
apps/emqx_ft/src/emqx_ft_conf.erl

@@ -50,17 +50,25 @@ storage() ->
 
 
 -spec gc_interval(_Storage) -> milliseconds().
 -spec gc_interval(_Storage) -> milliseconds().
 gc_interval(_Storage) ->
 gc_interval(_Storage) ->
-    % TODO: config wiring
-    application:get_env(emqx_ft, gc_interval, timer:minutes(10)).
+    Conf = assert_storage(local),
+    emqx_map_lib:deep_get([gc, interval], Conf).
 
 
 -spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
 -spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
 segments_ttl(_Storage) ->
 segments_ttl(_Storage) ->
-    % TODO: config wiring
+    Conf = assert_storage(local),
     {
     {
-        application:get_env(emqx_ft, min_segments_ttl, 60),
-        application:get_env(emqx_ft, max_segments_ttl, 72 * 3600)
+        emqx_map_lib:deep_get([gc, minimum_segments_ttl], Conf),
+        emqx_map_lib:deep_get([gc, maximum_segments_ttl], Conf)
     }.
     }.
 
 
+assert_storage(Type) ->
+    case storage() of
+        Conf = #{type := Type} ->
+            Conf;
+        Conf ->
+            error({inapplicable, Conf})
+    end.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% API
 %% API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 32 - 1
apps/emqx_ft/src/emqx_ft_schema.erl

@@ -65,13 +65,44 @@ fields(local_storage) ->
             type => binary(),
             type => binary(),
             desc => ?DESC("local_storage_root"),
             desc => ?DESC("local_storage_root"),
             required => false
             required => false
+        }},
+        {gc, #{
+            type => hoconsc:ref(?MODULE, local_storage_gc),
+            desc => ?DESC("local_storage_gc"),
+            required => false
+        }}
+    ];
+fields(local_storage_gc) ->
+    [
+        {interval, #{
+            type => emqx_schema:duration_ms(),
+            desc => ?DESC("storage_gc_interval"),
+            required => false,
+            default => "1h"
+        }},
+        {maximum_segments_ttl, #{
+            type => emqx_schema:duration_s(),
+            desc => ?DESC("storage_gc_max_segments_ttl"),
+            required => false,
+            default => "24h"
+        }},
+        {minimum_segments_ttl, #{
+            type => emqx_schema:duration_s(),
+            % desc => ?DESC("storage_gc_min_segments_ttl"),
+            required => false,
+            default => "5m",
+            % NOTE
+            % This setting does not seem to be useful to an end-user.
+            hidden => true
         }}
         }}
     ].
     ].
 
 
 desc(file_transfer) ->
 desc(file_transfer) ->
     "File transfer settings";
     "File transfer settings";
 desc(local_storage) ->
 desc(local_storage) ->
-    "File transfer local storage settings".
+    "File transfer local storage settings";
+desc(local_storage_gc) ->
+    "Garbage collection settings for the File transfer local storage backend".
 
 
 schema(filemeta) ->
 schema(filemeta) ->
     #{
     #{

+ 19 - 13
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -37,16 +37,22 @@ end_per_suite(_Config) ->
     ok.
     ok.
 
 
 init_per_testcase(TC, Config) ->
 init_per_testcase(TC, Config) ->
-    _ = application:unset_env(emqx_ft, gc_interval),
-    _ = application:unset_env(emqx_ft, min_segments_ttl),
-    _ = application:unset_env(emqx_ft, max_segments_ttl),
     ok = emqx_common_test_helpers:start_app(
     ok = emqx_common_test_helpers:start_app(
         emqx_ft,
         emqx_ft,
         fun(emqx_ft) ->
         fun(emqx_ft) ->
-            ok = emqx_config:put([file_transfer, storage], #{
-                type => local,
-                root => mk_root(TC, Config)
-            })
+            emqx_common_test_helpers:load_config(
+                emqx_ft_schema,
+                iolist_to_binary([
+                    "file_transfer {"
+                    "  storage = {"
+                    "    type = \"local\","
+                    "    root = \"",
+                    mk_root(TC, Config),
+                    "\""
+                    "  }"
+                    "}"
+                ])
+            )
         end
         end
     ),
     ),
     Config.
     Config.
@@ -64,7 +70,7 @@ mk_root(TC, Config) ->
 
 
 t_gc_triggers_periodically(_Config) ->
 t_gc_triggers_periodically(_Config) ->
     Interval = 500,
     Interval = 500,
-    ok = application:set_env(emqx_ft, gc_interval, Interval),
+    ok = emqx_config:put([file_transfer, storage, gc, interval], Interval),
     ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
     ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
     ?check_trace(
     ?check_trace(
         timer:sleep(Interval * 3),
         timer:sleep(Interval * 3),
@@ -165,8 +171,8 @@ t_gc_complete_transfers(_Config) ->
     ).
     ).
 
 
 t_gc_incomplete_transfers(_Config) ->
 t_gc_incomplete_transfers(_Config) ->
-    _ = application:set_env(emqx_ft, min_segments_ttl, 0),
-    _ = application:set_env(emqx_ft, max_segments_ttl, 4),
+    ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
+    ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 4),
     Storage = emqx_ft_conf:storage(),
     Storage = emqx_ft_conf:storage(),
     Transfers = [
     Transfers = [
         {
         {
@@ -195,7 +201,7 @@ t_gc_incomplete_transfers(_Config) ->
     ?check_trace(
     ?check_trace(
         begin
         begin
             % 2. Enable periodic GC every 0.5 seconds.
             % 2. Enable periodic GC every 0.5 seconds.
-            ok = application:set_env(emqx_ft, gc_interval, 500),
+            ok = emqx_config:put([file_transfer, storage, gc, interval], 500),
             ok = emqx_ft_storage_fs_gc:reset(Storage),
             ok = emqx_ft_storage_fs_gc:reset(Storage),
             % 3. First we need the first transfer to be collected.
             % 3. First we need the first transfer to be collected.
             {ok, _} = ?block_until(
             {ok, _} = ?block_until(
@@ -241,8 +247,8 @@ t_gc_incomplete_transfers(_Config) ->
     ).
     ).
 
 
 t_gc_handling_errors(_Config) ->
 t_gc_handling_errors(_Config) ->
-    _ = application:set_env(emqx_ft, min_segments_ttl, 0),
-    _ = application:set_env(emqx_ft, max_segments_ttl, 0),
+    ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
+    ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 0),
     Storage = emqx_ft_conf:storage(),
     Storage = emqx_ft_conf:storage(),
     Transfer1 = {<<"client1">>, mk_file_id()},
     Transfer1 = {<<"client1">>, mk_file_id()},
     Transfer2 = {<<"client2">>, mk_file_id()},
     Transfer2 = {<<"client2">>, mk_file_id()},