Просмотр исходного кода

Merge pull request #7694 from terry-xiaoyu/mqtt_bridge_issues

feat: save ssl cert files for data bridges
Xinyu Liu 3 лет назад
Родитель
Сommit
0986a1c8c4

+ 3 - 3
apps/emqx/src/emqx_schema.erl

@@ -1954,7 +1954,7 @@ common_ssl_opts_schema(Defaults) ->
             )},
             )},
         {"cacertfile",
         {"cacertfile",
             sc(
             sc(
-                string(),
+                binary(),
                 #{
                 #{
                     default => D("cacertfile"),
                     default => D("cacertfile"),
                     required => false,
                     required => false,
@@ -1970,7 +1970,7 @@ common_ssl_opts_schema(Defaults) ->
             )},
             )},
         {"certfile",
         {"certfile",
             sc(
             sc(
-                string(),
+                binary(),
                 #{
                 #{
                     default => D("certfile"),
                     default => D("certfile"),
                     required => false,
                     required => false,
@@ -1985,7 +1985,7 @@ common_ssl_opts_schema(Defaults) ->
             )},
             )},
         {"keyfile",
         {"keyfile",
             sc(
             sc(
-                string(),
+                binary(),
                 #{
                 #{
                     default => D("keyfile"),
                     default => D("keyfile"),
                     required => false,
                     required => false,

+ 49 - 18
apps/emqx/src/emqx_tls_lib.erl

@@ -32,7 +32,8 @@
     ensure_ssl_files/2,
     ensure_ssl_files/2,
     delete_ssl_files/3,
     delete_ssl_files/3,
     drop_invalid_certs/1,
     drop_invalid_certs/1,
-    is_valid_pem_file/1
+    is_valid_pem_file/1,
+    is_pem/1
 ]).
 ]).
 
 
 -export([
 -export([
@@ -281,8 +282,10 @@ ensure_ssl_files(_Dir, undefined, _DryRun) ->
     {ok, undefined};
     {ok, undefined};
 ensure_ssl_files(_Dir, #{<<"enable">> := False} = Opts, _DryRun) when ?IS_FALSE(False) ->
 ensure_ssl_files(_Dir, #{<<"enable">> := False} = Opts, _DryRun) when ?IS_FALSE(False) ->
     {ok, Opts};
     {ok, Opts};
+ensure_ssl_files(_Dir, #{enable := False} = Opts, _DryRun) when ?IS_FALSE(False) ->
+    {ok, Opts};
 ensure_ssl_files(Dir, Opts, DryRun) ->
 ensure_ssl_files(Dir, Opts, DryRun) ->
-    ensure_ssl_files(Dir, Opts, ?SSL_FILE_OPT_NAMES, DryRun).
+    ensure_ssl_files(Dir, Opts, ?SSL_FILE_OPT_NAMES ++ ?SSL_FILE_OPT_NAMES_A, DryRun).
 
 
 ensure_ssl_files(_Dir, Opts, [], _DryRun) ->
 ensure_ssl_files(_Dir, Opts, [], _DryRun) ->
     {ok, Opts};
     {ok, Opts};
@@ -306,18 +309,20 @@ delete_ssl_files(Dir, NewOpts0, OldOpts0) ->
     end,
     end,
     lists:foreach(
     lists:foreach(
         fun(Key) -> delete_old_file(Get(Key, NewOpts), Get(Key, OldOpts)) end,
         fun(Key) -> delete_old_file(Get(Key, NewOpts), Get(Key, OldOpts)) end,
-        ?SSL_FILE_OPT_NAMES
-    ).
+        ?SSL_FILE_OPT_NAMES ++ ?SSL_FILE_OPT_NAMES_A
+    ),
+    %% try to delete the dir if it is empty
+    _ = file:del_dir(pem_dir(Dir)),
+    ok.
 
 
 delete_old_file(New, Old) when New =:= Old -> ok;
 delete_old_file(New, Old) when New =:= Old -> ok;
 delete_old_file(_New, _Old = undefined) ->
 delete_old_file(_New, _Old = undefined) ->
     ok;
     ok;
 delete_old_file(_New, Old) ->
 delete_old_file(_New, Old) ->
-    case filelib:is_regular(Old) andalso file:delete(Old) of
+    case is_generated_file(Old) andalso filelib:is_regular(Old) andalso file:delete(Old) of
         ok ->
         ok ->
             ok;
             ok;
-        %% already deleted
-        false ->
+        false -> %% the file is not generated by us, or it is already deleted
             ok;
             ok;
         {error, Reason} ->
         {error, Reason} ->
             ?SLOG(error, #{msg => "failed_to_delete_ssl_file", file_path => Old, reason => Reason})
             ?SLOG(error, #{msg => "failed_to_delete_ssl_file", file_path => Old, reason => Reason})
@@ -391,11 +396,33 @@ save_pem_file(Dir, Key, Pem, DryRun) ->
 %% the filename is prefixed by the option name without the 'file' part
 %% the filename is prefixed by the option name without the 'file' part
 %% and suffixed with the first 8 byets the PEM content's md5 checksum.
 %% and suffixed with the first 8 byets the PEM content's md5 checksum.
 %% e.g. key-1234567890abcdef, cert-1234567890abcdef, and cacert-1234567890abcdef
 %% e.g. key-1234567890abcdef, cert-1234567890abcdef, and cacert-1234567890abcdef
+is_generated_file(Filename) ->
+    case string:split(filename:basename(Filename), "-") of
+        [_Name, Suffix] -> is_hex_str(Suffix);
+        _ -> false
+    end.
+
 pem_file_name(Dir, Key, Pem) ->
 pem_file_name(Dir, Key, Pem) ->
     <<CK:8/binary, _/binary>> = crypto:hash(md5, Pem),
     <<CK:8/binary, _/binary>> = crypto:hash(md5, Pem),
     Suffix = hex_str(CK),
     Suffix = hex_str(CK),
-    FileName = binary:replace(Key, <<"file">>, <<"-", Suffix/binary>>),
-    filename:join([emqx:mutable_certs_dir(), Dir, FileName]).
+    FileName = binary:replace(ensure_bin(Key), <<"file">>, <<"-", Suffix/binary>>),
+    filename:join([pem_dir(Dir), FileName]).
+
+pem_dir(Dir) ->
+    filename:join([emqx:mutable_certs_dir(), Dir]).
+
+is_hex_str(HexStr) ->
+    try is_hex_str2(ensure_str(HexStr))
+    catch throw: not_hex -> false
+    end.
+
+is_hex_str2(HexStr) ->
+    _ = [case S of
+            S when S >= $0, S =< $9 -> S;
+            S when S >= $a, S =< $f -> S;
+            _ -> throw(not_hex)
+         end || S <- HexStr],
+    true.
 
 
 hex_str(Bin) ->
 hex_str(Bin) ->
     iolist_to_binary([io_lib:format("~2.16.0b", [X]) || <<X:8>> <= Bin]).
     iolist_to_binary([io_lib:format("~2.16.0b", [X]) || <<X:8>> <= Bin]).
@@ -417,20 +444,21 @@ drop_invalid_certs(#{enable := False} = SSL) when ?IS_FALSE(False) ->
 drop_invalid_certs(#{<<"enable">> := False} = SSL) when ?IS_FALSE(False) ->
 drop_invalid_certs(#{<<"enable">> := False} = SSL) when ?IS_FALSE(False) ->
     maps:without(?SSL_FILE_OPT_NAMES, SSL);
     maps:without(?SSL_FILE_OPT_NAMES, SSL);
 drop_invalid_certs(#{enable := True} = SSL) when ?IS_TRUE(True) ->
 drop_invalid_certs(#{enable := True} = SSL) when ?IS_TRUE(True) ->
-    drop_invalid_certs(?SSL_FILE_OPT_NAMES_A, SSL);
+    do_drop_invalid_certs(?SSL_FILE_OPT_NAMES_A, SSL);
 drop_invalid_certs(#{<<"enable">> := True} = SSL) when ?IS_TRUE(True) ->
 drop_invalid_certs(#{<<"enable">> := True} = SSL) when ?IS_TRUE(True) ->
-    drop_invalid_certs(?SSL_FILE_OPT_NAMES, SSL).
+    do_drop_invalid_certs(?SSL_FILE_OPT_NAMES, SSL).
 
 
-drop_invalid_certs([], SSL) ->
+do_drop_invalid_certs([], SSL) ->
     SSL;
     SSL;
-drop_invalid_certs([Key | Keys], SSL) ->
+do_drop_invalid_certs([Key | Keys], SSL) ->
     case maps:get(Key, SSL, undefined) of
     case maps:get(Key, SSL, undefined) of
         undefined ->
         undefined ->
-            drop_invalid_certs(Keys, SSL);
-        Path ->
-            case is_valid_pem_file(Path) of
-                true -> SSL;
-                {error, _} -> maps:without([Key], SSL)
+            do_drop_invalid_certs(Keys, SSL);
+        PemOrPath ->
+            case is_pem(PemOrPath) orelse is_valid_pem_file(PemOrPath) of
+                true -> do_drop_invalid_certs(Keys, SSL);
+                {error, _} ->
+                    do_drop_invalid_certs(Keys, maps:without([Key], SSL))
             end
             end
     end.
     end.
 
 
@@ -476,6 +504,9 @@ ensure_str(undefined) -> undefined;
 ensure_str(L) when is_list(L) -> L;
 ensure_str(L) when is_list(L) -> L;
 ensure_str(B) when is_binary(B) -> unicode:characters_to_list(B, utf8).
 ensure_str(B) when is_binary(B) -> unicode:characters_to_list(B, utf8).
 
 
+ensure_bin(B) when is_binary(B) -> B;
+ensure_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
+
 -if(?OTP_RELEASE > 22).
 -if(?OTP_RELEASE > 22).
 -ifdef(TEST).
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").

+ 16 - 0
apps/emqx/test/emqx_tls_lib_tests.erl

@@ -143,6 +143,22 @@ ssl_files_save_delete_test() ->
     ok = emqx_tls_lib:delete_ssl_files(Dir, undefined, SSL),
     ok = emqx_tls_lib:delete_ssl_files(Dir, undefined, SSL),
     ok.
     ok.
 
 
+ssl_files_handle_non_generated_file_test() ->
+    TmpKeyFile = <<"my-key-file.pem">>,
+    KeyFileContent = bin(test_key()),
+    ok = file:write_file(TmpKeyFile, KeyFileContent),
+    ?assert(filelib:is_regular(TmpKeyFile)),
+    SSL0 = #{<<"keyfile">> => TmpKeyFile},
+    Dir = filename:join(["/tmp", "ssl-test-dir-00"]),
+    {ok, SSL2} = emqx_tls_lib:ensure_ssl_files(Dir, SSL0),
+    File1 = maps:get(<<"keyfile">>, SSL2),
+    %% verify the filename and path is not changed by the emqx_tls_lib
+    ?assertEqual(TmpKeyFile, File1),
+    ok = emqx_tls_lib:delete_ssl_files(Dir, undefined, SSL2),
+    %% verify the file is not delete and not changed, because it is not generated by
+    %% emqx_tls_lib
+    ?assertEqual({ok, KeyFileContent}, file:read_file(TmpKeyFile)).
+
 ssl_file_replace_test() ->
 ssl_file_replace_test() ->
     SSL0 = #{<<"keyfile">> => bin(test_key())},
     SSL0 = #{<<"keyfile">> => bin(test_key())},
     SSL1 = #{<<"keyfile">> => bin(test_key2())},
     SSL1 = #{<<"keyfile">> => bin(test_key2())},

+ 17 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -183,6 +183,7 @@ maybe_with_metrics_example(TypeNameExamp, _) ->
 
 
 info_example_basic(http, _) ->
 info_example_basic(http, _) ->
     #{
     #{
+        enable => true,
         url => <<"http://localhost:9901/messages/${topic}">>,
         url => <<"http://localhost:9901/messages/${topic}">>,
         request_timeout => <<"15s">>,
         request_timeout => <<"15s">>,
         connect_timeout => <<"15s">>,
         connect_timeout => <<"15s">>,
@@ -198,6 +199,7 @@ info_example_basic(http, _) ->
     };
     };
 info_example_basic(mqtt, ingress) ->
 info_example_basic(mqtt, ingress) ->
     #{
     #{
+        enable => true,
         connector => <<"mqtt:my_mqtt_connector">>,
         connector => <<"mqtt:my_mqtt_connector">>,
         direction => ingress,
         direction => ingress,
         remote_topic => <<"aws/#">>,
         remote_topic => <<"aws/#">>,
@@ -209,6 +211,7 @@ info_example_basic(mqtt, ingress) ->
     };
     };
 info_example_basic(mqtt, egress) ->
 info_example_basic(mqtt, egress) ->
     #{
     #{
+        enable => true,
         connector => <<"mqtt:my_mqtt_connector">>,
         connector => <<"mqtt:my_mqtt_connector">>,
         direction => egress,
         direction => egress,
         local_topic => <<"emqx/#">>,
         local_topic => <<"emqx/#">>,
@@ -512,7 +515,8 @@ aggregate_metrics(AllMetrics) ->
 
 
 format_resp(#{type := Type, name := BridgeName, raw_config := RawConf,
 format_resp(#{type := Type, name := BridgeName, raw_config := RawConf,
               resource_data := #{status := Status, metrics := Metrics}}) ->
               resource_data := #{status := Status, metrics := Metrics}}) ->
-    RawConf#{
+    RawConfFull = fill_defaults(Type, RawConf),
+    RawConfFull#{
         type => Type,
         type => Type,
         name => maps:get(<<"name">>, RawConf, BridgeName),
         name => maps:get(<<"name">>, RawConf, BridgeName),
         node => node(),
         node => node(),
@@ -527,6 +531,18 @@ format_metrics(#{
         } }) ->
         } }) ->
     ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
     ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
 
 
+fill_defaults(Type, RawConf) ->
+    PackedConf = pack_bridge_conf(Type, RawConf),
+    FullConf = emqx_config:fill_defaults(emqx_bridge_schema, PackedConf),
+    unpack_bridge_conf(Type, FullConf).
+
+pack_bridge_conf(Type, RawConf) ->
+    #{<<"bridges">> => #{Type => #{<<"foo">> => RawConf}}}.
+
+unpack_bridge_conf(Type, PackedConf) ->
+    #{<<"bridges">> := Bridges} = PackedConf,
+    #{<<"foo">> := RawConf} = maps:get(Type, Bridges),
+    RawConf.
 
 
 is_ok(ResL) ->
 is_ok(ResL) ->
     case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of
     case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of

+ 24 - 9
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -193,21 +193,35 @@ do_wait_for_resource_ready(InstId, Retry) ->
 
 
 do_create(InstId, Group, ResourceType, Config, Opts) ->
 do_create(InstId, Group, ResourceType, Config, Opts) ->
     case lookup(InstId) of
     case lookup(InstId) of
-        {ok,_, _} ->
+        {ok, _, _} ->
             {ok, already_created};
             {ok, already_created};
         {error, not_found} ->
         {error, not_found} ->
-            case do_start(InstId, Group, ResourceType, Config, Opts) of
-                ok ->
-                    ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
-                            [matched, success, failed, exception], [matched]),
-                    {ok, force_lookup(InstId)};
-                Error ->
-                    Error
+            case emqx_resource_ssl:convert_certs(InstId, Config) of
+                {error, Reason} ->
+                    {error, Reason};
+                {ok, Config1} ->
+                    do_create2(InstId, Group, ResourceType, Config1, Opts)
             end
             end
     end.
     end.
 
 
+do_create2(InstId, Group, ResourceType, Config, Opts) ->
+    ok = do_start(InstId, Group, ResourceType, Config, Opts),
+    ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
+            [matched, success, failed, exception], [matched]),
+    {ok, force_lookup(InstId)}.
+
 do_create_dry_run(ResourceType, Config) ->
 do_create_dry_run(ResourceType, Config) ->
     InstId = make_test_id(),
     InstId = make_test_id(),
+    case emqx_resource_ssl:convert_certs(InstId, Config) of
+        {error, Reason} ->
+            {error, Reason};
+        {ok, Config1} ->
+            Result = do_create_dry_run2(InstId, ResourceType, Config1),
+            _ = emqx_resource_ssl:clear_certs(InstId, Config1),
+            Result
+    end.
+
+do_create_dry_run2(InstId, ResourceType, Config) ->
     case emqx_resource:call_start(InstId, ResourceType, Config) of
     case emqx_resource:call_start(InstId, ResourceType, Config) of
         {ok, ResourceState} ->
         {ok, ResourceState} ->
             case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
             case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
@@ -231,8 +245,9 @@ do_remove(Instance) ->
 do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
 do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
     do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]).
     do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]).
 
 
-do_remove(Group, #{id := InstId} = Data, ClearMetrics) ->
+do_remove(Group, #{id := InstId, config := Config} = Data, ClearMetrics) ->
     _ = do_stop(Group, Data),
     _ = do_stop(Group, Data),
+    _ = emqx_resource_ssl:clear_certs(InstId, Config),
     ets:delete(emqx_resource_instance, InstId),
     ets:delete(emqx_resource_instance, InstId),
     case ClearMetrics of
     case ClearMetrics of
         true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
         true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);

+ 51 - 0
apps/emqx_resource/src/emqx_resource_ssl.erl

@@ -0,0 +1,51 @@
+
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_ssl).
+
+-export([ convert_certs/2
+        , convert_certs/3
+        , clear_certs/2
+        ]).
+
+convert_certs(ResId, NewConfig) ->
+    convert_certs(ResId, NewConfig, #{}).
+
+convert_certs(ResId, NewConfig, OldConfig) ->
+    OldSSL = drop_invalid_certs(maps:get(ssl, OldConfig, undefined)),
+    NewSSL = drop_invalid_certs(maps:get(ssl, NewConfig, undefined)),
+    CertsDir = cert_dir(ResId),
+    case emqx_tls_lib:ensure_ssl_files(CertsDir, NewSSL) of
+        {ok, NewSSL1} ->
+            ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL),
+            {ok, new_ssl_config(NewConfig, NewSSL1)};
+        {error, Reason} ->
+            {error, {bad_ssl_config, Reason}}
+    end.
+
+clear_certs(ResId, Config) ->
+    OldSSL = drop_invalid_certs(maps:get(ssl, Config, undefined)),
+    ok = emqx_tls_lib:delete_ssl_files(cert_dir(ResId), undefined, OldSSL).
+
+cert_dir(ResId) ->
+    filename:join(["resources", ResId]).
+
+new_ssl_config(Config, undefined) -> Config;
+new_ssl_config(Config, SSL) -> Config#{ssl => SSL}.
+
+drop_invalid_certs(undefined) -> undefined;
+drop_invalid_certs(SSL) -> emqx_tls_lib:drop_invalid_certs(SSL).