Przeglądaj źródła

refactor(s3): use single pool per profile

Andrew Mayorov 1 rok temu
rodzic
commit
ac0e600db3

+ 19 - 3
apps/emqx_s3/src/emqx_s3_client_http.erl

@@ -16,8 +16,8 @@
 
 -export([
     start_pool/2,
-    stop_pool/1,
-    pool_config/1
+    update_pool/2,
+    stop_pool/1
 ]).
 
 -export_type([t/0]).
@@ -172,7 +172,6 @@ request_timeout(C) ->
 %%
 
 start_pool(PoolName, ProfileConfig) ->
-    %% FIXME
     PoolConfig = pool_config(ProfileConfig),
     ?SLOG(debug, #{msg => "s3_starting_http_pool", pool_name => PoolName, config => PoolConfig}),
     case hackney_pool:start_pool(PoolName, PoolConfig) of
@@ -184,6 +183,23 @@ start_pool(PoolName, ProfileConfig) ->
             Error
     end.
 
+update_pool(PoolName, ProfileConfig) ->
+    PoolConfig = pool_config(ProfileConfig),
+    ?SLOG(debug, #{msg => "s3_updating_http_pool", pool_name => PoolName, config => PoolConfig}),
+    lists:foldl(
+        fun
+            (PoolSetting, ok) -> update_pool_setting(PoolName, PoolSetting);
+            (_, Error) -> Error
+        end,
+        ok,
+        PoolConfig
+    ).
+
+update_pool_setting(PoolName, {max_connections, PoolSize}) ->
+    hackney_pool:set_max_connections(PoolName, PoolSize);
+update_pool_setting(PoolName, {timeout, Keepalive}) ->
+    hackney_pool:set_timeout(PoolName, Keepalive).
+
 stop_pool(PoolName) ->
     case hackney_pool:stop_pool(PoolName) of
         ok ->

+ 11 - 104
apps/emqx_s3/src/emqx_s3_profile_conf.erl

@@ -57,9 +57,6 @@
 
 -define(DEFAULT_CALL_TIMEOUT, 5000).
 
--define(DEFAULT_HTTP_POOL_TIMEOUT, 60000).
--define(DEAFULT_HTTP_POOL_CLEANUP_INTERVAL, 60000).
-
 -define(SAFE_CALL_VIA_GPROC(ProfileId, Message, Timeout),
     ?SAFE_CALL_VIA_GPROC(id(ProfileId), Message, Timeout, profile_not_found)
 ).
@@ -115,7 +112,6 @@ init([ProfileId, ProfileConfig]) ->
     ok = cleanup_profile_pools(ProfileId),
     case start_http_pool(ProfileId, ProfileConfig) of
         {ok, PoolName} ->
-            HttpPoolCleanupInterval = http_pool_cleanup_interval(ProfileConfig),
             {ok, #{
                 profile_id => ProfileId,
                 profile_config => ProfileConfig,
@@ -123,15 +119,7 @@ init([ProfileId, ProfileConfig]) ->
                 upload_options => upload_options(ProfileConfig),
                 client_config => client_config(ProfileConfig, PoolName),
                 uploader_config => uploader_config(ProfileConfig),
-                pool_name => PoolName,
-                pool_clients => emqx_s3_profile_http_pool_clients:create_table(),
-                %% We don't expose these options to users currently, but use in tests
-                http_pool_timeout => http_pool_timeout(ProfileConfig),
-                http_pool_cleanup_interval => HttpPoolCleanupInterval,
-
-                outdated_pool_cleanup_tref => erlang:send_after(
-                    HttpPoolCleanupInterval, self(), cleanup_outdated
-                )
+                pool_name => PoolName
             }};
         {error, Reason} ->
             {stop, Reason}
@@ -165,8 +153,6 @@ handle_call(
                 upload_options => upload_options(NewProfileConfig),
                 client_config => client_config(NewProfileConfig, PoolName),
                 uploader_config => uploader_config(NewProfileConfig),
-                http_pool_timeout => http_pool_timeout(NewProfileConfig),
-                http_pool_cleanup_interval => http_pool_cleanup_interval(NewProfileConfig),
                 pool_name => PoolName
             },
             {reply, ok, NewState};
@@ -179,18 +165,6 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Request, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) ->
-    ok = unregister_client(Pid, State),
-    {noreply, State};
-handle_info(cleanup_outdated, #{http_pool_cleanup_interval := HttpPoolCleanupInterval} = State0) ->
-    %% Maybe cleanup asynchoronously
-    ok = cleanup_outdated_pools(State0),
-    State1 = State0#{
-        outdated_pool_cleanup_tref => erlang:send_after(
-            HttpPoolCleanupInterval, self(), cleanup_outdated
-        )
-    },
-    {noreply, State1};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -248,47 +222,20 @@ start_http_pool(ProfileId, ProfileConfig) ->
             Error
     end.
 
-update_http_pool(
-    ProfileId,
-    ProfileConfig,
-    #{pool_name := OldPoolName, profile_config := OldProfileConfig} = State
-) ->
-    HttpConfig = emqx_s3_client_http:pool_config(ProfileConfig),
-    OldHttpConfig = emqx_s3_client_http:pool_config(OldProfileConfig),
-    case OldHttpConfig =:= HttpConfig of
-        true ->
-            {ok, OldPoolName};
-        false ->
-            PoolName = pool_name(ProfileId),
-            case emqx_s3_client_http:start_pool(PoolName, ProfileConfig) of
-                ok ->
-                    ok = set_old_pool_outdated(State),
-                    ok = emqx_s3_profile_http_pools:register(ProfileId, PoolName),
-                    {ok, PoolName};
-                {error, _} = Error ->
-                    Error
-            end
+update_http_pool(_ProfileId, ProfileConfig, #{pool_name := PoolName}) ->
+    case emqx_s3_client_http:update_pool(PoolName, ProfileConfig) of
+        ok ->
+            {ok, PoolName};
+        {error, _} = Error ->
+            Error
     end.
 
 pool_name(ProfileId) ->
-    iolist_to_binary([
-        <<"s3-http-">>,
-        profile_id_to_bin(ProfileId),
-        <<"-">>,
-        integer_to_binary(erlang:system_time(millisecond)),
-        <<"-">>,
-        integer_to_binary(erlang:unique_integer([positive]))
-    ]).
+    iolist_to_binary([<<"s3-http-">>, profile_id_to_bin(ProfileId)]).
 
 profile_id_to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
 profile_id_to_bin(Bin) when is_binary(Bin) -> Bin.
 
-set_old_pool_outdated(#{
-    profile_id := ProfileId, pool_name := PoolName, http_pool_timeout := HttpPoolTimeout
-}) ->
-    _ = emqx_s3_profile_http_pools:set_outdated(ProfileId, PoolName, HttpPoolTimeout),
-    ok.
-
 cleanup_profile_pools(ProfileId) ->
     lists:foreach(
         fun(PoolName) ->
@@ -297,58 +244,18 @@ cleanup_profile_pools(ProfileId) ->
         emqx_s3_profile_http_pools:all(ProfileId)
     ).
 
-register_client(Pid, #{profile_id := ProfileId, pool_clients := PoolClients, pool_name := PoolName}) ->
-    MRef = monitor(process, Pid),
-    ok = emqx_s3_profile_http_pool_clients:register(PoolClients, Pid, MRef, PoolName),
+register_client(_Pid, #{profile_id := ProfileId, pool_name := PoolName}) ->
     _ = emqx_s3_profile_http_pools:register_client(ProfileId, PoolName),
     ok.
 
-unregister_client(
-    Pid,
-    #{
-        profile_id := ProfileId, pool_clients := PoolClients, pool_name := PoolName
-    }
-) ->
-    case emqx_s3_profile_http_pool_clients:unregister(PoolClients, Pid) of
-        undefined ->
-            ok;
-        {MRef, PoolName} ->
-            true = erlang:demonitor(MRef, [flush]),
-            _ = emqx_s3_profile_http_pools:unregister_client(ProfileId, PoolName),
-            ok;
-        {MRef, OutdatedPoolName} ->
-            true = erlang:demonitor(MRef, [flush]),
-            ClientNum = emqx_s3_profile_http_pools:unregister_client(ProfileId, OutdatedPoolName),
-            maybe_stop_outdated_pool(ProfileId, OutdatedPoolName, ClientNum)
-    end.
-
-maybe_stop_outdated_pool(ProfileId, OutdatedPoolName, 0) ->
-    ok = stop_http_pool(ProfileId, OutdatedPoolName);
-maybe_stop_outdated_pool(_ProfileId, _OutdatedPoolName, _ClientNum) ->
+unregister_client(_Pid, #{profile_id := ProfileId, pool_name := PoolName}) ->
+    _ = emqx_s3_profile_http_pools:unregister_client(ProfileId, PoolName),
     ok.
 
-cleanup_outdated_pools(#{profile_id := ProfileId}) ->
-    lists:foreach(
-        fun(PoolName) ->
-            ok = stop_http_pool(ProfileId, PoolName)
-        end,
-        emqx_s3_profile_http_pools:outdated(ProfileId)
-    ).
-
 %%--------------------------------------------------------------------
 %% HTTP Pool implementation dependent functions
 %%--------------------------------------------------------------------
 
-http_pool_cleanup_interval(ProfileConfig) ->
-    maps:get(
-        http_pool_cleanup_interval, ProfileConfig, ?DEAFULT_HTTP_POOL_CLEANUP_INTERVAL
-    ).
-
-http_pool_timeout(ProfileConfig) ->
-    maps:get(
-        http_pool_timeout, ProfileConfig, ?DEFAULT_HTTP_POOL_TIMEOUT
-    ).
-
 stop_http_pool(ProfileId, PoolName) ->
     ok = emqx_s3_client_http:stop_pool(PoolName),
     ok = emqx_s3_profile_http_pools:unregister(ProfileId, PoolName),

+ 0 - 36
apps/emqx_s3/src/emqx_s3_profile_http_pool_clients.erl

@@ -1,36 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_s3_profile_http_pool_clients).
-
--export([
-    create_table/0,
-
-    register/4,
-    unregister/2
-]).
-
--define(TAB, ?MODULE).
-
--spec create_table() -> ok.
-create_table() ->
-    ets:new(?TAB, [
-        private,
-        set
-    ]).
-
--spec register(ets:tid(), pid(), reference(), emqx_s3_profile_http_pools:pool_name()) -> ok.
-register(Tab, Pid, MRef, PoolName) ->
-    true = ets:insert(Tab, {Pid, {MRef, PoolName}}),
-    ok.
-
--spec unregister(ets:tid(), pid()) ->
-    {reference(), emqx_s3_profile_http_pools:pool_name()} | undefined.
-unregister(Tab, Pid) ->
-    case ets:take(Tab, Pid) of
-        [{Pid, {MRef, PoolName}}] ->
-            {MRef, PoolName};
-        [] ->
-            undefined
-    end.

+ 0 - 25
apps/emqx_s3/src/emqx_s3_profile_http_pools.erl

@@ -15,9 +15,6 @@
     register_client/2,
     unregister_client/2,
 
-    set_outdated/3,
-
-    outdated/1,
     all/1
 ]).
 
@@ -84,28 +81,6 @@ unregister_client(ProfileId, PoolName) ->
             undefined
     end.
 
--spec set_outdated(emqx_s3:profile_id(), pool_name(), integer()) ->
-    ok.
-set_outdated(ProfileId, PoolName, Timeout) ->
-    Key = key(ProfileId, PoolName),
-    Now = erlang:monotonic_time(millisecond),
-    _ = ets:update_element(?TAB, Key, {#pool.deadline, Now + Timeout}),
-    ok.
-
--spec outdated(emqx_s3:profile_id()) ->
-    [pool_name()].
-outdated(ProfileId) ->
-    Now = erlang:monotonic_time(millisecond),
-    MS = ets:fun2ms(
-        fun(#pool{key = {CurProfileId, CurPoolName}, deadline = CurDeadline}) when
-            CurProfileId =:= ProfileId andalso
-                CurDeadline =/= undefined andalso CurDeadline < Now
-        ->
-            CurPoolName
-        end
-    ),
-    ets:select(?TAB, MS).
-
 -spec all(emqx_s3:profile_id()) ->
     [pool_name()].
 all(ProfileId) ->

+ 2 - 81
apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl

@@ -11,7 +11,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/asserts.hrl").
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
 
 suite() -> [{timetrap, {minutes, 1}}].
 
@@ -43,79 +44,6 @@ end_per_testcase(_TestCase, _Config) ->
 %% Test cases
 %%--------------------------------------------------------------------
 
-t_regular_outdated_pool_cleanup(Config) ->
-    _ = process_flag(trap_exit, true),
-    Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
-
-    [OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
-
-    ProfileBaseConfig = ?config(profile_config, Config),
-    ProfileConfig = emqx_utils_maps:deep_put(
-        [transport_options, pool_size], ProfileBaseConfig, 16
-    ),
-    ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
-
-    ?assertEqual(
-        2,
-        length(emqx_s3_profile_http_pools:all(profile_id()))
-    ),
-
-    ?assertWaitEvent(
-        ok = emqx_s3_uploader:abort(Pid),
-        #{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
-        1000
-    ),
-
-    [NewPool] = emqx_s3_profile_http_pools:all(profile_id()),
-
-    ?assertWaitEvent(
-        ok = emqx_s3:stop_profile(profile_id()),
-        #{?snk_kind := "s3_stop_http_pool", pool_name := NewPool},
-        1000
-    ),
-
-    ?assertEqual(
-        0,
-        length(emqx_s3_profile_http_pools:all(profile_id()))
-    ).
-
-t_timeout_pool_cleanup(Config) ->
-    _ = process_flag(trap_exit, true),
-
-    %% We restart the profile to set `http_pool_timeout` value suitable for test
-    ok = emqx_s3:stop_profile(profile_id()),
-    ProfileBaseConfig = ?config(profile_config, Config),
-    ProfileConfig = ProfileBaseConfig#{
-        http_pool_timeout => 500,
-        http_pool_cleanup_interval => 100
-    },
-    ok = emqx_s3:start_profile(profile_id(), ProfileConfig),
-
-    %% Start uploader
-    Key = emqx_s3_test_helpers:unique_key(),
-    {ok, Pid} = emqx_s3:start_uploader(profile_id(), Key, #{}),
-    ok = emqx_s3_uploader:write(Pid, <<"data">>),
-
-    [OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
-
-    NewProfileConfig = emqx_utils_maps:deep_put(
-        [transport_options, pool_size], ProfileConfig, 16
-    ),
-
-    %% We update profile to create new pool and wait for the old one to be stopped by timeout
-    ?assertWaitEvent(
-        ok = emqx_s3:update_profile(profile_id(), NewProfileConfig),
-        #{?snk_kind := "s3_stop_http_pool", pool_name := OldPool},
-        1000
-    ),
-
-    %% The uploader now has no valid pool and should fail
-    ?assertMatch(
-        {error, _},
-        emqx_s3_uploader:complete(Pid)
-    ).
-
 t_checkout_no_profile(_Config) ->
     ?assertEqual(
         {error, profile_not_found},
@@ -231,13 +159,6 @@ t_checkout_client(Config) ->
     ),
     ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
 
-    %% We should have two pools now, because the old one is still in use
-    %% by the spawned process
-    ?assertEqual(
-        2,
-        length(emqx_s3_profile_http_pools:all(ProfileId))
-    ),
-
     %% Ask spawned process to list objects
     Pid ! list_objects,
     receive