Bläddra i källkod

Merge pull request #10825 from fix/EMQX-9991/bugs

fix(ft): avoid leaking secrets
Andrew Mayorov 2 år sedan
förälder
incheckning
45b128946a

+ 4 - 0
apps/emqx/src/emqx_secret.erl

@@ -21,6 +21,10 @@
 %% API:
 -export([wrap/1, unwrap/1]).
 
+-export_type([t/1]).
+
+-opaque t(T) :: T | fun(() -> t(T)).
+
 %%================================================================================
 %% API funcions
 %%================================================================================

+ 4 - 3
apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

@@ -79,7 +79,7 @@ start_export(_Options, Transfer, Filemeta) ->
 -spec write(export_st(), iodata()) ->
     {ok, export_st()} | {error, term()}.
 write(#{pid := Pid} = ExportSt, IoData) ->
-    case emqx_s3_uploader:write(Pid, IoData) of
+    case emqx_s3_uploader:write(Pid, IoData, emqx_ft_conf:store_segment_timeout()) of
         ok ->
             {ok, ExportSt};
         {error, _Reason} = Error ->
@@ -89,12 +89,13 @@ write(#{pid := Pid} = ExportSt, IoData) ->
 -spec complete(export_st(), emqx_ft:checksum()) ->
     ok | {error, term()}.
 complete(#{pid := Pid} = _ExportSt, _Checksum) ->
-    emqx_s3_uploader:complete(Pid).
+    emqx_s3_uploader:complete(Pid, emqx_ft_conf:assemble_timeout()).
 
 -spec discard(export_st()) ->
     ok.
 discard(#{pid := Pid} = _ExportSt) ->
-    emqx_s3_uploader:abort(Pid).
+    % NOTE: will abort upload asynchronously if needed
+    emqx_s3_uploader:shutdown(Pid).
 
 -spec list(options(), query()) ->
     {ok, page(exportinfo())} | {error, term()}.

+ 6 - 2
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -267,8 +267,12 @@ lookup_assembler([Source | Sources]) ->
 
 check_if_already_exported(Storage, Transfer) ->
     case files(Storage, #{transfer => Transfer}) of
-        {ok, #{items := [_ | _]}} -> ok;
-        _ -> {error, not_found}
+        {ok, #{items := [_ | _]}} ->
+            % NOTE: we don't know coverage here, let's just clean up locally.
+            _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]),
+            ok;
+        _ ->
+            {error, not_found}
     end.
 
 lookup_local_assembler(Transfer) ->

+ 44 - 8
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -266,6 +266,38 @@ t_gc_incomplete_transfers(_Config) ->
         0
     ).
 
+t_gc_repeated_transfer(_Config) ->
+    {local, Storage} = emqx_ft_storage:backend(),
+    Transfer = {
+        TID = {<<"clientclient">>, mk_file_id()},
+        #{name => "repeat.please", segments_ttl => 10},
+        emqx_ft_content_gen:new({?LINE, Size = 42}, 16)
+    },
+    Size = start_transfer(Storage, Transfer),
+    {ok, {ok, #{stats := Stats1}}} = ?wait_async_action(
+        ?assertEqual(ok, complete_transfer(Storage, TID, Size)),
+        #{?snk_kind := garbage_collection},
+        1000
+    ),
+    Size = start_transfer(Storage, Transfer),
+    {ok, {ok, #{stats := Stats2}}} = ?wait_async_action(
+        ?assertEqual(ok, complete_transfer(Storage, TID, Size)),
+        #{?snk_kind := garbage_collection},
+        1000
+    ),
+    ?assertMatch(
+        #gcstats{files = 4, directories = 2},
+        Stats1
+    ),
+    ?assertMatch(
+        #gcstats{files = 4, directories = 2},
+        Stats2
+    ),
+    ?assertEqual(
+        {ok, []},
+        emqx_ft_storage_fs:list(Storage, TID, fragment)
+    ).
+
 t_gc_handling_errors(_Config) ->
     ok = set_gc_config(minimum_segments_ttl, 0),
     ok = set_gc_config(maximum_segments_ttl, 0),
@@ -349,14 +381,18 @@ complete_transfer(Storage, Transfer, Size) ->
     complete_transfer(Storage, Transfer, Size, 100).
 
 complete_transfer(Storage, Transfer, Size, Timeout) ->
-    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
-    MRef = erlang:monitor(process, Pid),
-    Pid ! kickoff,
-    receive
-        {'DOWN', MRef, process, Pid, {shutdown, Result}} ->
-            Result
-    after Timeout ->
-        ct:fail("Assembler did not finish in time")
+    case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of
+        ok ->
+            ok;
+        {async, Pid} ->
+            MRef = erlang:monitor(process, Pid),
+            Pid ! kickoff,
+            receive
+                {'DOWN', MRef, process, Pid, {shutdown, Result}} ->
+                    Result
+            after Timeout ->
+                ct:fail("Assembler did not finish in time")
+            end
     end.
 
 mk_file_id() ->

+ 1 - 1
apps/emqx_s3/src/emqx_s3.erl

@@ -44,7 +44,7 @@
 -type profile_config() :: #{
     bucket := string(),
     access_key_id => string(),
-    secret_access_key => string(),
+    secret_access_key => emqx_secret:t(string()),
     host := string(),
     port := pos_integer(),
     url_expire_time := pos_integer(),

+ 2 - 2
apps/emqx_s3/src/emqx_s3_client.erl

@@ -60,7 +60,7 @@
     acl := emqx_s3:acl() | undefined,
     url_expire_time := pos_integer(),
     access_key_id := string() | undefined,
-    secret_access_key := string() | undefined,
+    secret_access_key := emqx_secret:t(string()) | undefined,
     http_pool := http_pool(),
     pool_type := pool_type(),
     request_timeout := timeout() | undefined,
@@ -230,7 +230,7 @@ aws_config(#{
         s3_bucket_after_host = true,
 
         access_key_id = AccessKeyId,
-        secret_access_key = SecretAccessKey,
+        secret_access_key = emqx_secret:unwrap(SecretAccessKey),
 
         http_client = request_fun(
             HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)

+ 3 - 3
apps/emqx_s3/src/emqx_s3_profile_conf.erl

@@ -11,7 +11,7 @@
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--include("src/emqx_s3.hrl").
+-include("emqx_s3.hrl").
 
 -export([
     start_link/2,
@@ -377,10 +377,10 @@ stop_http_pool(ProfileId, PoolName) ->
     ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}).
 
 do_start_http_pool(PoolName, HttpConfig) ->
-    ?SLOG(warning, #{msg => "s3_start_http_pool", pool_name => PoolName, config => HttpConfig}),
+    ?SLOG(debug, #{msg => "s3_starting_http_pool", pool_name => PoolName, config => HttpConfig}),
     case ehttpc_sup:start_pool(PoolName, HttpConfig) of
         {ok, _} ->
-            ?SLOG(warning, #{msg => "s3_start_http_pool_success", pool_name => PoolName}),
+            ?SLOG(info, #{msg => "s3_start_http_pool_success", pool_name => PoolName}),
             ok;
         {error, _} = Error ->
             ?SLOG(error, #{msg => "s3_start_http_pool_fail", pool_name => PoolName, error => Error}),

+ 12 - 3
apps/emqx_s3/src/emqx_s3_schema.erl

@@ -34,11 +34,12 @@ fields(s3) ->
             )},
         {secret_access_key,
             mk(
-                string(),
+                hoconsc:union([string(), function()]),
                 #{
                     desc => ?DESC("secret_access_key"),
                     required => false,
-                    sensitive => true
+                    sensitive => true,
+                    converter => fun secret/2
                 }
             )},
         {bucket,
@@ -124,7 +125,7 @@ fields(transport_options) ->
             mk(
                 boolean(),
                 #{
-                    default => true,
+                    default => false,
                     desc => ?DESC("ipv6_probe"),
                     required => false
                 }
@@ -142,6 +143,14 @@ desc(s3) ->
 desc(transport_options) ->
     "Options for the HTTP transport layer used by the S3 client".
 
+secret(undefined, #{}) ->
+    undefined;
+secret(Secret, #{make_serializable := true}) ->
+    unicode:characters_to_binary(emqx_secret:unwrap(Secret));
+secret(Secret, #{}) ->
+    _ = is_binary(Secret) orelse throw({expected_type, string}),
+    emqx_secret:wrap(unicode:characters_to_list(Secret)).
+
 translate(Conf) ->
     translate(Conf, #{}).
 

+ 8 - 1
apps/emqx_s3/src/emqx_s3_uploader.erl

@@ -18,7 +18,9 @@
     complete/2,
 
     abort/1,
-    abort/2
+    abort/2,
+
+    shutdown/1
 ]).
 
 -export([
@@ -87,6 +89,11 @@ abort(Pid) ->
 abort(Pid, Timeout) ->
     gen_statem:call(Pid, abort, Timeout).
 
+-spec shutdown(pid()) -> ok.
+shutdown(Pid) ->
+    _ = erlang:exit(Pid, shutdown),
+    ok.
+
 %%--------------------------------------------------------------------
 %% gen_statem callbacks
 %%--------------------------------------------------------------------

+ 22 - 2
apps/emqx_s3/test/emqx_s3_schema_SUITE.erl

@@ -49,7 +49,7 @@ t_full_config(_Config) ->
             host := "s3.us-east-1.endpoint.com",
             min_part_size := 10485760,
             port := 443,
-            secret_access_key := "secret_access_key",
+            secret_access_key := Secret,
             transport_options :=
                 #{
                     connect_timeout := 30000,
@@ -74,7 +74,7 @@ t_full_config(_Config) ->
                             versions := ['tlsv1.2']
                         }
                 }
-        },
+        } when is_function(Secret),
         emqx_s3_schema:translate(#{
             <<"access_key_id">> => <<"access_key_id">>,
             <<"secret_access_key">> => <<"secret_access_key">>,
@@ -126,6 +126,26 @@ t_sensitive_config_hidden(_Config) ->
         )
     ).
 
+t_sensitive_config_no_leak(_Config) ->
+    ?assertThrow(
+        {emqx_s3_schema, [
+            Error = #{
+                kind := validation_error,
+                path := "s3.secret_access_key",
+                reason := {expected_type, string}
+            }
+        ]} when map_size(Error) == 3,
+        emqx_s3_schema:translate(
+            #{
+                <<"bucket">> => <<"bucket">>,
+                <<"host">> => <<"s3.us-east-1.endpoint.com">>,
+                <<"port">> => 443,
+                <<"access_key_id">> => <<"access_key_id">>,
+                <<"secret_access_key">> => #{<<"1">> => <<"secret_access_key">>}
+            }
+        )
+    ).
+
 t_invalid_limits(_Config) ->
     ?assertException(
         throw,