Kaynağa Gözat

feat(ft-conf): preprocess TLS configuration on updates

Specifically, reify keys and certificates into files in the file system
and update the configuration to point to those files.
Andrew Mayorov 2 yıl önce
ebeveyn
işleme
34793c5ed0

+ 68 - 17
apps/emqx_ft/src/emqx_ft_conf.erl

@@ -34,7 +34,9 @@
 %% Load/Unload
 -export([
     load/0,
-    unload/0
+    unload/0,
+    get/0,
+    update/1
 ]).
 
 %% callbacks for emqx_config_handler
@@ -43,6 +45,8 @@
     post_config_update/5
 ]).
 
+-type update_request() :: emqx_config:config().
+
 -type milliseconds() :: non_neg_integer().
 -type seconds() :: non_neg_integer().
 
@@ -95,49 +99,96 @@ load() ->
 
 -spec unload() -> ok.
 unload() ->
-    ok = stop(),
-    emqx_conf:remove_handler([file_transfer]).
+    ok = emqx_conf:remove_handler([file_transfer]),
+    maybe_stop().
+
+-spec get() -> emqx_config:config().
+get() ->
+    emqx_config:get([file_transfer]).
+
+-spec update(emqx_config:config()) -> {ok, emqx_config:update_result()} | {error, term()}.
+update(Config) ->
+    emqx_conf:update([file_transfer], Config, #{override_to => cluster}).
 
 %%--------------------------------------------------------------------
 %% emqx_config_handler callbacks
 %%--------------------------------------------------------------------
 
--spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) ->
+-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
     {ok, emqx_config:update_request()} | {error, term()}.
-pre_config_update(_, Req, _Config) ->
-    {ok, Req}.
+pre_config_update([file_transfer | _], NewConfig, OldConfig) ->
+    propagate_config_update(
+        fun emqx_ft_storage_exporter_s3:pre_config_update/3,
+        [<<"storage">>, <<"local">>, <<"exporter">>, <<"s3">>],
+        NewConfig,
+        OldConfig
+    ).
 
 -spec post_config_update(
     list(atom()),
-    emqx_config:update_request(),
+    update_request(),
     emqx_config:config(),
     emqx_config:config(),
     emqx_config:app_envs()
 ) ->
     ok | {ok, Result :: any()} | {error, Reason :: term()}.
 post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
-    on_config_update(OldConfig, NewConfig).
+    PropResult = propagate_config_update(
+        fun emqx_ft_storage_exporter_s3:post_config_update/3,
+        [storage, local, exporter, s3],
+        NewConfig,
+        OldConfig
+    ),
+    case PropResult of
+        ok ->
+            on_config_update(OldConfig, NewConfig);
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+propagate_config_update(Fun, ConfKey, NewConfig, OldConfig) ->
+    NewSubConf = emqx_utils_maps:deep_get(ConfKey, NewConfig, undefined),
+    OldSubConf = emqx_utils_maps:deep_get(ConfKey, OldConfig, undefined),
+    case Fun(ConfKey, NewSubConf, OldSubConf) of
+        ok ->
+            ok;
+        {ok, undefined} ->
+            {ok, NewConfig};
+        {ok, NewSubConfUpdate} ->
+            {ok, emqx_utils_maps:deep_put(ConfKey, NewConfig, NewSubConfUpdate)};
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
 on_config_update(#{enable := false}, #{enable := false}) ->
     ok;
 on_config_update(#{enable := true, storage := OldStorage}, #{enable := false}) ->
-    ok = emqx_ft_storage:on_config_update(OldStorage, undefined),
-    ok = emqx_ft:unhook();
+    ok = stop(OldStorage);
 on_config_update(#{enable := false}, #{enable := true, storage := NewStorage}) ->
-    ok = emqx_ft_storage:on_config_update(undefined, NewStorage),
-    ok = emqx_ft:hook();
+    ok = start(NewStorage);
 on_config_update(#{enable := true, storage := OldStorage}, #{enable := true, storage := NewStorage}) ->
-    ok = emqx_ft_storage:on_config_update(OldStorage, NewStorage).
+    ok = emqx_ft_storage:update_config(OldStorage, NewStorage).
 
 maybe_start() ->
     case emqx_config:get([file_transfer]) of
         #{enable := true, storage := Storage} ->
-            ok = emqx_ft_storage:on_config_update(undefined, Storage),
-            ok = emqx_ft:hook();
+            start(Storage);
         _ ->
             ok
     end.
 
-stop() ->
+maybe_stop() ->
+    case emqx_config:get([file_transfer]) of
+        #{enable := true, storage := Storage} ->
+            stop(Storage);
+        _ ->
+            ok
+    end.
+
+start(Storage) ->
+    ok = emqx_ft_storage:update_config(undefined, Storage),
+    ok = emqx_ft:hook().
+
+stop(Storage) ->
     ok = emqx_ft:unhook(),
-    ok = emqx_ft_storage:on_config_update(storage(), undefined).
+    ok = emqx_ft_storage:update_config(Storage, undefined).

+ 13 - 11
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_ft_storage).
 
+-include_lib("emqx/include/types.hrl").
+
 -export(
     [
         store_filemeta/2,
@@ -29,7 +31,7 @@
         with_storage_type/3,
 
         backend/0,
-        on_config_update/2
+        update_config/2
     ]
 ).
 
@@ -94,10 +96,10 @@
 -callback files(storage(), query(Cursor)) ->
     {ok, page(file_info(), Cursor)} | {error, term()}.
 
--callback start(emqx_config:config()) -> any().
--callback stop(emqx_config:config()) -> any().
+-callback start(storage()) -> any().
+-callback stop(storage()) -> any().
 
--callback on_config_update(_OldConfig :: emqx_config:config(), _NewConfig :: emqx_config:config()) ->
+-callback update_config(_OldConfig :: maybe(storage()), _NewConfig :: maybe(storage())) ->
     any().
 
 %%--------------------------------------------------------------------
@@ -157,9 +159,9 @@ with_storage_type(Type, Fun, Args) ->
 backend() ->
     backend(emqx_ft_conf:storage()).
 
--spec on_config_update(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
+-spec update_config(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
     ok.
-on_config_update(ConfigOld, ConfigNew) ->
+update_config(ConfigOld, ConfigNew) ->
     on_backend_update(
         emqx_maybe:apply(fun backend/1, ConfigOld),
         emqx_maybe:apply(fun backend/1, ConfigNew)
@@ -168,13 +170,13 @@ on_config_update(ConfigOld, ConfigNew) ->
 on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
     ok;
 on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
-    ok = (mod(Type)):on_config_update(StorageOld, StorageNew);
+    ok = (mod(Type)):update_config(StorageOld, StorageNew);
 on_backend_update(BackendOld, BackendNew) when
     (BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
         (BackendNew =:= undefined orelse is_tuple(BackendNew))
 ->
-    _ = emqx_maybe:apply(fun on_storage_stop/1, BackendOld),
-    _ = emqx_maybe:apply(fun on_storage_start/1, BackendNew),
+    _ = emqx_maybe:apply(fun stop_backend/1, BackendOld),
+    _ = emqx_maybe:apply(fun start_backend/1, BackendNew),
     ok.
 
 %%--------------------------------------------------------------------
@@ -185,10 +187,10 @@ on_backend_update(BackendOld, BackendNew) when
 backend(Config) ->
     emqx_ft_schema:backend(Config).
 
-on_storage_start({Type, Storage}) ->
+start_backend({Type, Storage}) ->
     (mod(Type)):start(Storage).
 
-on_storage_stop({Type, Storage}) ->
+stop_backend({Type, Storage}) ->
     (mod(Type)):stop(Storage).
 
 mod(local) ->

+ 5 - 5
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -31,7 +31,7 @@
 -export([list/2]).
 
 %% Lifecycle API
--export([on_config_update/2]).
+-export([update_config/2]).
 
 %% Internal API
 -export([exporter/1]).
@@ -81,7 +81,7 @@
 -callback stop(exporter_conf()) ->
     ok.
 
--callback update(exporter_conf(), exporter_conf()) ->
+-callback update_config(exporter_conf(), exporter_conf()) ->
     ok | {error, _Reason}.
 
 %%------------------------------------------------------------------------------
@@ -148,8 +148,8 @@ list(Storage, Query) ->
 
 %% Lifecycle
 
--spec on_config_update(storage(), storage()) -> ok | {error, term()}.
-on_config_update(StorageOld, StorageNew) ->
+-spec update_config(storage(), storage()) -> ok | {error, term()}.
+update_config(StorageOld, StorageNew) ->
     on_exporter_update(
         emqx_maybe:apply(fun exporter/1, StorageOld),
         emqx_maybe:apply(fun exporter/1, StorageNew)
@@ -158,7 +158,7 @@ on_config_update(StorageOld, StorageNew) ->
 on_exporter_update(Config, Config) ->
     ok;
 on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
-    ExporterMod:update(ConfigOld, ConfigNew);
+    ExporterMod:update_config(ConfigOld, ConfigNew);
 on_exporter_update(ExporterOld, ExporterNew) ->
     _ = emqx_maybe:apply(fun stop/1, ExporterOld),
     _ = emqx_maybe:apply(fun start/1, ExporterNew),

+ 3 - 3
apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl

@@ -31,7 +31,7 @@
 -export([
     start/1,
     stop/1,
-    update/2
+    update_config/2
 ]).
 
 %% Internal API for RPC
@@ -161,8 +161,8 @@ start(_Options) -> ok.
 -spec stop(options()) -> ok.
 stop(_Options) -> ok.
 
--spec update(options(), options()) -> ok.
-update(_OldOptions, _NewOptions) -> ok.
+-spec update_config(options(), options()) -> ok.
+update_config(_OldOptions, _NewOptions) -> ok.
 
 %%--------------------------------------------------------------------
 %% Internal API

+ 19 - 4
apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl

@@ -28,7 +28,12 @@
 -export([
     start/1,
     stop/1,
-    update/2
+    update_config/2
+]).
+
+-export([
+    pre_config_update/3,
+    post_config_update/3
 ]).
 
 -type options() :: emqx_s3:profile_config().
@@ -112,12 +117,22 @@ start(Options) ->
 
 -spec stop(options()) -> ok.
 stop(_Options) ->
-    ok = emqx_s3:stop_profile(?S3_PROFILE_ID).
+    emqx_s3:stop_profile(?S3_PROFILE_ID).
 
--spec update(options(), options()) -> ok.
-update(_OldOptions, NewOptions) ->
+-spec update_config(options(), options()) -> ok.
+update_config(_OldOptions, NewOptions) ->
     emqx_s3:update_profile(?S3_PROFILE_ID, NewOptions).
 
+%%--------------------------------------------------------------------
+%% Config update hooks
+%%--------------------------------------------------------------------
+
+pre_config_update(_ConfKey, NewOptions, OldOptions) ->
+    emqx_s3:pre_config_update(?S3_PROFILE_ID, NewOptions, OldOptions).
+
+post_config_update(_ConfKey, NewOptions, OldOptions) ->
+    emqx_s3:post_config_update(?S3_PROFILE_ID, NewOptions, OldOptions).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %% -------------------------------------------------------------------

+ 5 - 5
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -48,9 +48,9 @@
 
 -export([files/2]).
 
--export([on_config_update/2]).
 -export([start/1]).
 -export([stop/1]).
+-export([update_config/2]).
 
 -export_type([storage/0]).
 -export_type([filefrag/1]).
@@ -230,10 +230,10 @@ files(Storage, Query) ->
 
 %%
 
-on_config_update(StorageOld, StorageNew) ->
+update_config(StorageOld, StorageNew) ->
     % NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely
     ok = emqx_ft_storage_fs_gc:reset(StorageNew),
-    emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
+    emqx_ft_storage_exporter:update_config(StorageOld, StorageNew).
 
 start(Storage) ->
     ok = lists:foreach(
@@ -242,11 +242,11 @@ start(Storage) ->
         end,
         child_spec(Storage)
     ),
-    ok = emqx_ft_storage_exporter:on_config_update(undefined, Storage),
+    ok = emqx_ft_storage_exporter:update_config(undefined, Storage),
     ok.
 
 stop(Storage) ->
-    ok = emqx_ft_storage_exporter:on_config_update(Storage, undefined),
+    ok = emqx_ft_storage_exporter:update_config(Storage, undefined),
     ok = lists:foreach(
         fun(#{id := ChildId}) ->
             _ = supervisor:terminate_child(emqx_ft_sup, ChildId),

+ 101 - 29
apps/emqx_ft/test/emqx_ft_conf_SUITE.erl

@@ -53,16 +53,13 @@ end_per_testcase(_Case, Config) ->
 t_update_config(_Config) ->
     ?assertMatch(
         {error, #{kind := validation_error}},
-        emqx_conf:update(
-            [file_transfer],
-            #{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}},
-            #{}
+        emqx_ft_conf:update(
+            #{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}}
         )
     ),
     ?assertMatch(
         {ok, _},
-        emqx_conf:update(
-            [file_transfer],
+        emqx_ft_conf:update(
             #{
                 <<"enable">> => true,
                 <<"storage">> => #{
@@ -81,8 +78,7 @@ t_update_config(_Config) ->
                         }
                     }
                 }
-            },
-            #{}
+            }
         )
     ),
     ?assertEqual(
@@ -101,13 +97,8 @@ t_update_config(_Config) ->
 t_disable_restore_config(Config) ->
     ?assertMatch(
         {ok, _},
-        emqx_conf:update(
-            [file_transfer],
-            #{
-                <<"enable">> => true,
-                <<"storage">> => #{<<"local">> => #{}}
-            },
-            #{}
+        emqx_ft_conf:update(
+            #{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}}
         )
     ),
     ?assertEqual(
@@ -119,11 +110,7 @@ t_disable_restore_config(Config) ->
     % Verify that clearing storage settings reverts config to defaults
     ?assertMatch(
         {ok, _},
-        emqx_conf:update(
-            [file_transfer],
-            #{<<"enable">> => false, <<"storage">> => undefined},
-            #{}
-        )
+        emqx_ft_conf:update(#{<<"enable">> => false, <<"storage">> => undefined})
     ),
     ?assertEqual(
         false,
@@ -155,8 +142,7 @@ t_disable_restore_config(Config) ->
     Root = emqx_ft_test_helpers:root(Config, node(), [segments]),
     ?assertMatch(
         {ok, _},
-        emqx_conf:update(
-            [file_transfer],
+        emqx_ft_conf:update(
             #{
                 <<"enable">> => true,
                 <<"storage">> => #{
@@ -167,8 +153,7 @@ t_disable_restore_config(Config) ->
                         }
                     }
                 }
-            },
-            #{}
+            }
         )
     ),
     % Verify that GC is getting triggered eventually
@@ -192,11 +177,7 @@ t_disable_restore_config(Config) ->
 t_switch_exporter(_Config) ->
     ?assertMatch(
         {ok, _},
-        emqx_conf:update(
-            [file_transfer],
-            #{<<"enable">> => true},
-            #{}
-        )
+        emqx_ft_conf:update(#{<<"enable">> => true})
     ),
     ?assertMatch(
         #{local := #{exporter := #{local := _}}},
@@ -248,5 +229,96 @@ t_switch_exporter(_Config) ->
     % Verify that transfers work
     ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
 
+t_persist_ssl_certfiles(Config) ->
+    ?assertMatch(
+        {ok, _},
+        emqx_ft_conf:update(mk_storage(true))
+    ),
+    ?assertEqual(
+        [],
+        list_ssl_certfiles(Config)
+    ),
+    S3Config = #{
+        <<"bucket">> => <<"emqx">>,
+        <<"host">> => <<"https://localhost">>,
+        <<"port">> => 9000
+    },
+    ?assertMatch(
+        {error, {pre_config_update, _, {bad_ssl_config, #{}}}},
+        emqx_ft_conf:update(
+            mk_storage(true, #{
+                <<"s3">> => S3Config#{
+                    <<"transport_options">> => #{
+                        <<"ssl">> => #{
+                            <<"certfile">> => <<"cert.pem">>,
+                            <<"keyfile">> => <<"key.pem">>
+                        }
+                    }
+                }
+            })
+        )
+    ),
+    ?assertMatch(
+        {ok, _},
+        emqx_ft_conf:update(
+            mk_storage(false, #{
+                <<"s3">> => S3Config#{
+                    <<"transport_options">> => #{
+                        <<"ssl">> => #{
+                            <<"certfile">> => emqx_ft_test_helpers:pem_privkey(),
+                            <<"keyfile">> => emqx_ft_test_helpers:pem_privkey()
+                        }
+                    }
+                }
+            })
+        )
+    ),
+    ?assertMatch(
+        #{
+            local := #{
+                exporter := #{
+                    s3 := #{
+                        transport_options := #{
+                            ssl := #{
+                                certfile := <<"/", _CertFilepath/binary>>,
+                                keyfile := <<"/", _KeyFilepath/binary>>
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        emqx_ft_conf:storage()
+    ),
+    ?assertMatch(
+        [_Certfile, _Keyfile],
+        list_ssl_certfiles(Config)
+    ),
+    ?assertMatch(
+        {ok, _},
+        emqx_ft_conf:update(mk_storage(true))
+    ),
+    ?assertEqual(
+        [],
+        list_ssl_certfiles(Config)
+    ).
+
+mk_storage(Enabled) ->
+    mk_storage(Enabled, #{<<"local">> => #{}}).
+
+mk_storage(Enabled, Exporter) ->
+    #{
+        <<"enable">> => Enabled,
+        <<"storage">> => #{
+            <<"local">> => #{
+                <<"exporter">> => Exporter
+            }
+        }
+    }.
+
 gen_clientid() ->
     emqx_base62:encode(emqx_guid:gen()).
+
+list_ssl_certfiles(_Config) ->
+    CertDir = emqx:mutable_certs_dir(),
+    filelib:fold_files(CertDir, ".*", true, fun(Filepath, Acc) -> [Filepath | Acc] end, []).

+ 10 - 0
apps/emqx_ft/test/emqx_ft_test_helpers.erl

@@ -136,3 +136,13 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
 
 aws_config() ->
     emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT).
+
+pem_privkey() ->
+    <<
+        "\n"
+        "-----BEGIN EC PRIVATE KEY-----\n"
+        "MHQCAQEEICKTbbathzvD8zvgjL7qRHhW4alS0+j0Loo7WeYX9AxaoAcGBSuBBAAK\n"
+        "oUQDQgAEJBdF7MIdam5T4YF3JkEyaPKdG64TVWCHwr/plC0QzNVJ67efXwxlVGTo\n"
+        "ju0VBj6tOX1y6C0U+85VOM0UU5xqvw==\n"
+        "-----END EC PRIVATE KEY-----\n"
+    >>.

+ 37 - 0
apps/emqx_s3/src/emqx_s3.erl

@@ -14,6 +14,11 @@
     with_client/2
 ]).
 
+-export([
+    pre_config_update/3,
+    post_config_update/3
+]).
+
 -export_type([
     profile_id/0,
     profile_config/0,
@@ -94,3 +99,35 @@ with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(Prof
         {error, _} = Error ->
             Error
     end.
+
+%%
+
+-spec pre_config_update(
+    profile_id(), maybe(emqx_config:raw_config()), maybe(emqx_config:raw_config())
+) ->
+    {ok, maybe(profile_config())} | {error, term()}.
+pre_config_update(ProfileId, NewConfig = #{<<"transport_options">> := TransportOpts}, _OldConfig) ->
+    case emqx_connector_ssl:convert_certs(mk_certs_dir(ProfileId), TransportOpts) of
+        {ok, TransportOptsConv} ->
+            {ok, NewConfig#{<<"transport_options">> := TransportOptsConv}};
+        {error, Reason} ->
+            {error, Reason}
+    end;
+pre_config_update(_ProfileId, NewConfig, _OldConfig) ->
+    {ok, NewConfig}.
+
+-spec post_config_update(
+    profile_id(),
+    maybe(emqx_config:config()),
+    maybe(emqx_config:config())
+) ->
+    ok.
+post_config_update(ProfileId, NewConfig, OldConfig) ->
+    emqx_connector_ssl:try_clear_certs(
+        mk_certs_dir(ProfileId),
+        maps:get(transport_options, emqx_maybe:define(NewConfig, #{}), undefined),
+        maps:get(transport_options, emqx_maybe:define(OldConfig, #{}), undefined)
+    ).
+
+mk_certs_dir(ProfileId) ->
+    filename:join([s3, profiles, ProfileId]).