Просмотр исходного кода

chore(auth): add config propagation tests for authn

Ilya Averyanov 2 лет назад
Родитель
Сommit
8a3e8ee528

+ 25 - 14
apps/emqx/src/emqx_config_handler.erl

@@ -63,7 +63,7 @@
 -callback propagated_pre_config_update(
 -callback propagated_pre_config_update(
     [atom()], emqx_config:update_request(), emqx_config:raw_config()
     [atom()], emqx_config:update_request(), emqx_config:raw_config()
 ) ->
 ) ->
-    ok | {error, term()}.
+    ok | {ok, emqx_config:update_request()} | {error, term()}.
 
 
 -callback post_config_update(
 -callback post_config_update(
     [atom()],
     [atom()],
@@ -401,14 +401,14 @@ get_sub_config(_, _Conf) ->
 
 
 call_pre_config_update(Ctx) ->
 call_pre_config_update(Ctx) ->
     case call_proper_pre_config_update(Ctx) of
     case call_proper_pre_config_update(Ctx) of
-        {ok, NewUpdateReq} ->
+        {ok, NewUpdateReq0} ->
             case
             case
                 propagate_pre_config_updates_to_subconf(Ctx#{
                 propagate_pre_config_updates_to_subconf(Ctx#{
-                    update_req => NewUpdateReq
+                    update_req => NewUpdateReq0
                 })
                 })
             of
             of
-                {ok, _} ->
-                    {ok, NewUpdateReq};
+                {ok, #{update_req := NewUpdateReq1}} ->
+                    {ok, NewUpdateReq1};
                 {error, _} = Error ->
                 {error, _} = Error ->
                     Error
                     Error
             end;
             end;
@@ -471,12 +471,12 @@ propagate_pre_config_updates_to_subconf_wkey(
     Keys = propagate_keys(UpdateReq, OldRawConf),
     Keys = propagate_keys(UpdateReq, OldRawConf),
     propagate_pre_config_updates_to_subconf_keys(Keys, Ctx).
     propagate_pre_config_updates_to_subconf_keys(Keys, Ctx).
 
 
-propagate_pre_config_updates_to_subconf_keys([], #{update_req := UpdateReq}) ->
-    {ok, UpdateReq};
-propagate_pre_config_updates_to_subconf_keys([Key | Keys], Ctx) ->
-    case propagate_pre_config_updates_to_subconf_key(Key, Ctx) of
-        ok ->
-            propagate_pre_config_updates_to_subconf_keys(Keys, Ctx);
+propagate_pre_config_updates_to_subconf_keys([], Ctx) ->
+    {ok, Ctx};
+propagate_pre_config_updates_to_subconf_keys([Key | Keys], Ctx0) ->
+    case propagate_pre_config_updates_to_subconf_key(Key, Ctx0) of
+        {ok, Ctx1} ->
+            propagate_pre_config_updates_to_subconf_keys(Keys, Ctx1);
         {error, _} = Error ->
         {error, _} = Error ->
             Error
             Error
     end.
     end.
@@ -497,8 +497,10 @@ propagate_pre_config_updates_to_subconf_key(
     SubOldConf = get_sub_config(BinKey, OldRawConf),
     SubOldConf = get_sub_config(BinKey, OldRawConf),
     SubConfKeyPath = ConfKeyPath ++ [AtomKey],
     SubConfKeyPath = ConfKeyPath ++ [AtomKey],
     case {SubOldConf, SubUpdateReq} of
     case {SubOldConf, SubUpdateReq} of
+        %% we have handler, but no relevant keys in both configs (new and old),
+        %% so we don't need to go further
         {undefined, undefined} ->
         {undefined, undefined} ->
-            ok;
+            {ok, Ctx};
         {_, _} ->
         {_, _} ->
             case
             case
                 call_pre_config_update(Ctx#{
                 call_pre_config_update(Ctx#{
@@ -509,8 +511,17 @@ propagate_pre_config_updates_to_subconf_key(
                     callback := propagated_pre_config_update
                     callback := propagated_pre_config_update
                 })
                 })
             of
             of
-                {ok, _SubNewConf1} ->
-                    ok;
+                {ok, SubNewConf1} ->
+                    %% we update only if the new config is not to be removed
+                    %% i.e. SubUpdateReq is not undefined
+                    case SubUpdateReq of
+                        undefined ->
+                            {ok, Ctx};
+                        _ ->
+                            {ok, Ctx#{
+                                update_req := maps:put(BinKey, SubNewConf1, UpdateReq)
+                            }}
+                    end;
                 {error, _} = Error ->
                 {error, _} = Error ->
                     Error
                     Error
             end
             end

+ 48 - 22
apps/emqx/test/emqx_config_handler_SUITE.erl

@@ -224,8 +224,8 @@ t_callback_crash(_Config) ->
     ok = emqx_config_handler:remove_handler(CrashPath),
     ok = emqx_config_handler:remove_handler(CrashPath),
     ok.
     ok.
 
 
-t_pre_callback_error(_Config) ->
-    callback_error(
+t_pre_assert_update_result(_Config) ->
+    assert_update_result(
         [sysmon, os, mem_check_interval],
         [sysmon, os, mem_check_interval],
         <<"100s">>,
         <<"100s">>,
         {error, {pre_config_update, ?MODULE, pre_config_update_error}}
         {error, {pre_config_update, ?MODULE, pre_config_update_error}}
@@ -233,7 +233,7 @@ t_pre_callback_error(_Config) ->
     ok.
     ok.
 
 
 t_post_update_error(_Config) ->
 t_post_update_error(_Config) ->
-    callback_error(
+    assert_update_result(
         [sysmon, os, sysmem_high_watermark],
         [sysmon, os, sysmem_high_watermark],
         <<"60%">>,
         <<"60%">>,
         {error, {post_config_update, ?MODULE, post_config_update_error}}
         {error, {post_config_update, ?MODULE, post_config_update_error}}
@@ -243,7 +243,7 @@ t_post_update_error(_Config) ->
 t_post_update_propagate_error_wkey(_Config) ->
 t_post_update_propagate_error_wkey(_Config) ->
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"sysmem_high_watermark">>], Conf0, <<"60%">>),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"sysmem_high_watermark">>], Conf0, <<"60%">>),
-    callback_error(
+    assert_update_result(
         [
         [
             [sysmon, '?', sysmem_high_watermark],
             [sysmon, '?', sysmem_high_watermark],
             [sysmon]
             [sysmon]
@@ -257,7 +257,7 @@ t_post_update_propagate_error_wkey(_Config) ->
 t_post_update_propagate_error_key(_Config) ->
 t_post_update_propagate_error_key(_Config) ->
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"sysmem_high_watermark">>], Conf0, <<"60%">>),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"sysmem_high_watermark">>], Conf0, <<"60%">>),
-    callback_error(
+    assert_update_result(
         [
         [
             [sysmon, os, sysmem_high_watermark],
             [sysmon, os, sysmem_high_watermark],
             [sysmon]
             [sysmon]
@@ -271,7 +271,7 @@ t_post_update_propagate_error_key(_Config) ->
 t_pre_update_propagate_error_wkey(_Config) ->
 t_pre_update_propagate_error_wkey(_Config) ->
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"mem_check_interval">>], Conf0, <<"70s">>),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"mem_check_interval">>], Conf0, <<"70s">>),
-    callback_error(
+    assert_update_result(
         [
         [
             [sysmon, '?', mem_check_interval],
             [sysmon, '?', mem_check_interval],
             [sysmon]
             [sysmon]
@@ -285,7 +285,7 @@ t_pre_update_propagate_error_wkey(_Config) ->
 t_pre_update_propagate_error_key(_Config) ->
 t_pre_update_propagate_error_key(_Config) ->
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf0 = emqx_config:get_raw([sysmon]),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"mem_check_interval">>], Conf0, <<"70s">>),
     Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"mem_check_interval">>], Conf0, <<"70s">>),
-    callback_error(
+    assert_update_result(
         [
         [
             [sysmon, os, mem_check_interval],
             [sysmon, os, mem_check_interval],
             [sysmon]
             [sysmon]
@@ -296,6 +296,25 @@ t_pre_update_propagate_error_key(_Config) ->
     ),
     ),
     ok.
     ok.
 
 
+t_pre_update_propagate_key_rewrite(_Config) ->
+    Conf0 = emqx_config:get_raw([sysmon]),
+    Conf1 = emqx_utils_maps:deep_put([<<"os">>, <<"cpu_check_interval">>], Conf0, <<"333s">>),
+    with_update_result(
+        [
+            [sysmon, '?', cpu_check_interval],
+            [sysmon]
+        ],
+        [sysmon],
+        Conf1,
+        fun(_, Result) ->
+            ?assertMatch(
+                {ok, #{config := #{os := #{cpu_check_interval := 444000}}}},
+                Result
+            )
+        end
+    ),
+    ok.
+
 t_handler_root() ->
 t_handler_root() ->
     %% Don't rely on default emqx_config_handler's merge behaviour.
     %% Don't rely on default emqx_config_handler's merge behaviour.
     RootKey = [],
     RootKey = [],
@@ -352,6 +371,8 @@ pre_config_update([sysmon, os, sysmem_high_watermark], UpdateReq, _RawConf) ->
 pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
 pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
     {error, pre_config_update_error}.
     {error, pre_config_update_error}.
 
 
+propagated_pre_config_update([sysmon, os, cpu_check_interval], <<"333s">>, _RawConf) ->
+    {ok, <<"444s">>};
 propagated_pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
 propagated_pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
     {error, pre_config_update_error};
     {error, pre_config_update_error};
 propagated_pre_config_update(_ConfKeyPath, _UpdateReq, _RawConf) ->
 propagated_pre_config_update(_ConfKeyPath, _UpdateReq, _RawConf) ->
@@ -386,27 +407,32 @@ wait_for_new_pid() ->
             Pid
             Pid
     end.
     end.
 
 
-callback_error(FailedPath, Update, ExpectError) ->
-    callback_error([FailedPath], FailedPath, Update, ExpectError).
-
-callback_error(Paths, UpdatePath, Update, ExpectError) ->
+assert_update_result(FailedPath, Update, Expect) ->
+    assert_update_result([FailedPath], FailedPath, Update, Expect).
+
+assert_update_result(Paths, UpdatePath, Update, Expect) ->
+    with_update_result(Paths, UpdatePath, Update, fun(Old, Result) ->
+        case Expect of
+            {error, {post_config_update, ?MODULE, post_config_update_error}} ->
+                ?assertMatch(
+                    {error, {post_config_update, ?MODULE, {post_config_update_error, _}}}, Result
+                );
+            _ ->
+                ?assertEqual(Expect, Result)
+        end,
+        New = emqx:get_raw_config(UpdatePath, undefined),
+        ?assertEqual(Old, New)
+    end).
+
+with_update_result(Paths, UpdatePath, Update, Fun) ->
     ok = lists:foreach(
     ok = lists:foreach(
         fun(Path) -> emqx_config_handler:add_handler(Path, ?MODULE) end,
         fun(Path) -> emqx_config_handler:add_handler(Path, ?MODULE) end,
         Paths
         Paths
     ),
     ),
     Opts = #{rawconf_with_defaults => true},
     Opts = #{rawconf_with_defaults => true},
     Old = emqx:get_raw_config(UpdatePath, undefined),
     Old = emqx:get_raw_config(UpdatePath, undefined),
-    Error = emqx:update_config(UpdatePath, Update, Opts),
-    case ExpectError of
-        {error, {post_config_update, ?MODULE, post_config_update_error}} ->
-            ?assertMatch(
-                {error, {post_config_update, ?MODULE, {post_config_update_error, _}}}, Error
-            );
-        _ ->
-            ?assertEqual(ExpectError, Error)
-    end,
-    New = emqx:get_raw_config(UpdatePath, undefined),
-    ?assertEqual(Old, New),
+    Result = emqx:update_config(UpdatePath, Update, Opts),
+    _ = Fun(Old, Result),
     ok = lists:foreach(
     ok = lists:foreach(
         fun(Path) -> emqx_config_handler:remove_handler(Path) end,
         fun(Path) -> emqx_config_handler:remove_handler(Path) end,
         Paths
         Paths

+ 2 - 10
apps/emqx_authn/src/emqx_authentication_config.erl

@@ -150,13 +150,12 @@ do_pre_config_update(Paths, NewConfig, _OldConfig) ->
 -spec propagated_pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
 -spec propagated_pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
     ok | {error, term()}.
     ok | {error, term()}.
 propagated_pre_config_update(Paths, NewConfig, OldConfig) ->
 propagated_pre_config_update(Paths, NewConfig, OldConfig) ->
-    {ok, _} = do_pre_config_update(Paths, NewConfig, OldConfig),
-    ok.
+    do_pre_config_update(Paths, NewConfig, OldConfig).
 
 
 -spec post_config_update(
 -spec post_config_update(
     list(atom()),
     list(atom()),
     update_request(),
     update_request(),
-    map() | list(),
+    map() | list() | undefined,
     emqx_config:raw_config(),
     emqx_config:raw_config(),
     emqx_config:app_envs()
     emqx_config:app_envs()
 ) ->
 ) ->
@@ -222,13 +221,6 @@ do_post_config_update(Paths, _UpdateReq, NewConfig0, OldConfig0, _AppEnvs) ->
 
 
 propagated_post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
 propagated_post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
     ok = post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs),
     ok = post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs),
-    ChainName = chain_name(Paths),
-    ok = maybe_delete_chain(ChainName, NewConfig),
-    ok.
-
-maybe_delete_chain(ChainName, undefined) ->
-    ok = emqx_authentication:delete_chain(ChainName);
-maybe_delete_chain(_ChainName, _NewConfig) ->
     ok.
     ok.
 
 
 %% create new authenticators and update existing ones
 %% create new authenticators and update existing ones

+ 242 - 0
apps/emqx_authn/test/emqx_authn_listeners_SUITE.erl

@@ -0,0 +1,242 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authn_listeners_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include("emqx_authn.hrl").
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_authn], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    Port = emqx_common_test_helpers:select_free_port(tcp),
+    [{port, Port} | Config].
+
+end_per_testcase(_Case, _Config) ->
+    ok.
+
+t_create_update_delete(Config) ->
+    ListenerConf = listener_mqtt_tcp_conf(Config),
+    AuthnConfig0 = #{
+        <<"mechanism">> => <<"password_based">>,
+        <<"backend">> => <<"built_in_database">>,
+        <<"user_id_type">> => <<"clientid">>
+    },
+    %% Create
+    {ok, _} = emqx_conf:update(
+        [listeners],
+        #{
+            <<"tcp">> => #{
+                <<"listener0">> => ListenerConf#{
+                    ?CONF_NS_BINARY => AuthnConfig0
+                }
+            }
+        },
+        #{}
+    ),
+    ?assertMatch(
+        {ok, [
+            #{
+                authenticators := [
+                    #{
+                        id := <<"password_based:built_in_database">>,
+                        state := #{
+                            user_id_type := clientid
+                        }
+                    }
+                ],
+                name := 'tcp:listener0'
+            }
+        ]},
+        emqx_authentication:list_chains()
+    ),
+
+    %% Drop old, create new
+    {ok, _} = emqx_conf:update(
+        [listeners],
+        #{
+            <<"tcp">> => #{
+                <<"listener1">> => ListenerConf#{
+                    ?CONF_NS_BINARY => AuthnConfig0
+                }
+            }
+        },
+        #{}
+    ),
+    ?assertMatch(
+        {ok, [
+            #{
+                authenticators := [
+                    #{
+                        id := <<"password_based:built_in_database">>,
+                        state := #{
+                            user_id_type := clientid
+                        }
+                    }
+                ],
+                name := 'tcp:listener1'
+            }
+        ]},
+        emqx_authentication:list_chains()
+    ),
+
+    %% Update
+    {ok, _} = emqx_conf:update(
+        [listeners],
+        #{
+            <<"tcp">> => #{
+                <<"listener1">> => ListenerConf#{
+                    ?CONF_NS_BINARY => AuthnConfig0#{<<"user_id_type">> => <<"username">>}
+                }
+            }
+        },
+        #{}
+    ),
+    ?assertMatch(
+        {ok, [
+            #{
+                authenticators := [
+                    #{
+                        id := <<"password_based:built_in_database">>,
+                        state := #{
+                            user_id_type := username
+                        }
+                    }
+                ],
+                name := 'tcp:listener1'
+            }
+        ]},
+        emqx_authentication:list_chains()
+    ),
+
+    %% Update by listener path
+    {ok, _} = emqx_conf:update(
+        [listeners, tcp, listener1],
+        {update, ListenerConf#{
+            ?CONF_NS_BINARY => AuthnConfig0#{<<"user_id_type">> => <<"clientid">>}
+        }},
+        #{}
+    ),
+    ?assertMatch(
+        {ok, [
+            #{
+                authenticators := [
+                    #{
+                        id := <<"password_based:built_in_database">>,
+                        state := #{
+                            user_id_type := clientid
+                        }
+                    }
+                ],
+                name := 'tcp:listener1'
+            }
+        ]},
+        emqx_authentication:list_chains()
+    ),
+
+    %% Delete
+    {ok, _} = emqx_conf:tombstone(
+        [listeners, tcp, listener1],
+        #{}
+    ),
+    ?assertMatch(
+        {ok, []},
+        emqx_authentication:list_chains()
+    ).
+
+t_convert_certs(Config) ->
+    ListenerConf = listener_mqtt_tcp_conf(Config),
+    AuthnConfig0 = #{
+        <<"mechanism">> => <<"password_based">>,
+        <<"password_hash_algorithm">> => #{
+            <<"name">> => <<"plain">>,
+            <<"salt_position">> => <<"suffix">>
+        },
+        <<"enable">> => <<"true">>,
+
+        <<"backend">> => <<"redis">>,
+        <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt is_superuser">>,
+        <<"database">> => <<"1">>,
+        <<"password">> => <<"public">>,
+        <<"server">> => <<"127.0.0.1:55555">>,
+        <<"redis_type">> => <<"single">>,
+        <<"ssl">> => #{
+            <<"enable">> => true,
+            <<"cacertfile">> => some_pem(),
+            <<"certfile">> => some_pem(),
+            <<"keyfile">> => some_pem()
+        }
+    },
+    {ok, _} = emqx_conf:update(
+        [listeners],
+        #{
+            <<"tcp">> => #{
+                <<"listener0">> => ListenerConf#{
+                    ?CONF_NS_BINARY => AuthnConfig0
+                }
+            }
+        },
+        #{}
+    ),
+    lists:foreach(
+        fun(Key) ->
+            [#{ssl := #{Key := FilePath}}] = emqx_config:get([
+                listeners, tcp, listener0, authentication
+            ]),
+            ?assert(filelib:is_regular(FilePath))
+        end,
+        [cacertfile, certfile, keyfile]
+    ).
+
+%%--------------------------------------------------------------------
+%% Helper Functions
+%%--------------------------------------------------------------------
+
+listener_mqtt_tcp_conf(Config) ->
+    Port = ?config(port, Config),
+    PortS = integer_to_binary(Port),
+    #{
+        <<"acceptors">> => 16,
+        <<"access_rules">> => [<<"allow all">>],
+        <<"bind">> => <<"0.0.0.0:", PortS/binary>>,
+        <<"max_connections">> => 1024000,
+        <<"mountpoint">> => <<>>,
+        <<"proxy_protocol">> => false,
+        <<"proxy_protocol_timeout">> => <<"3s">>,
+        <<"enable_authn">> => true
+    }.
+
+some_pem() ->
+    Dir = code:lib_dir(emqx_authn, test),
+    Path = filename:join([Dir, "data", "private_key.pem"]),
+    {ok, Pem} = file:read_file(Path),
+    Pem.