Просмотр исходного кода

Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240729-021938

id 1 год назад
Родитель
Сommit
c1e2801f41

+ 61 - 0
.ci/docker-compose-file/openldap/README.md

@@ -0,0 +1,61 @@
+# LDAP authentication
+
+To run manual tests with the default docker-compose files.
+
+Expose openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml `
+
+To start openldap:
+
+```
+docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -docker
+```
+
+## LDAP database
+
+LDAP database is populated from below files:
+```
+apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
+apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
+```
+
+## Minimal EMQX config
+
+```
+authentication = [
+  {
+    backend = ldap
+    base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
+    filter = "(& (objectClass=mqttUser) (uid=${username}))"
+    mechanism = password_based
+    method {
+      is_superuser_attribute = isSuperuser
+      password_attribute = userPassword
+      type = hash
+    }
+    password = public
+    pool_size = 8
+    query_timeout = "5s"
+    request_timeout = "10s"
+    server = "localhost:1389"
+    username = "cn=root,dc=emqx,dc=io"
+  }
+]
+```
+
+## Example ldapsearch command
+
+```
+ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))"
+```
+
+## Example mqttx command
+
+The client password hashes are generated from their username.
+
+```
+# disabled user
+mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006
+
+# enabled super-user
+mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007
+```

+ 7 - 0
apps/emqx/src/emqx_exclusive_subscription.erl

@@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
                 write
             ),
             allow;
+        [#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
+            %% Fixed the issue-13476
+            %% In this feature, the user must manually call `unsubscribe` to release the lock,
+            %% but sometimes the node may go down for some reason,
+            %% then the client will reconnect to this node and resubscribe.
+            %% We need to allow resubscription, otherwise the lock will never be released.
+            allow;
         [_] ->
             deny
     end.

+ 2 - 0
apps/emqx/test/emqx_exclusive_sub_SUITE.erl

@@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
     {ok, _} = emqtt:connect(C1),
     ?CHECK_SUB(C1, 0),
 
+    ?CHECK_SUB(C1, 0),
+
     {ok, C2} = emqtt:start_link([
         {clientid, <<"client2">>},
         {clean_start, false},

+ 82 - 11
apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl

@@ -21,6 +21,7 @@
 -include_lib("emqx_auth/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(LDAP_HOST, "ldap").
 -define(LDAP_DEFAULT_PORT, 389).
@@ -46,13 +47,6 @@ init_per_suite(Config) ->
             Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_ldap], #{
                 work_dir => ?config(priv_dir, Config)
             }),
-            {ok, _} = emqx_resource:create_local(
-                ?LDAP_RESOURCE,
-                ?AUTHN_RESOURCE_GROUP,
-                emqx_ldap,
-                ldap_config(),
-                #{}
-            ),
             [{apps, Apps} | Config];
         false ->
             {skip, no_ldap}
@@ -63,7 +57,6 @@ end_per_suite(Config) ->
         [authentication],
         ?GLOBAL
     ),
-    ok = emqx_resource:remove_local(?LDAP_RESOURCE),
     ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 %%------------------------------------------------------------------------------
@@ -128,6 +121,87 @@ t_create_invalid(_Config) ->
         InvalidConfigs
     ).
 
+t_authenticate_timeout_cause_reconnect(_Config) ->
+    TestPid = self(),
+    meck:new(eldap, [non_strict, no_link, passthrough]),
+    try
+        %% cause eldap process to be killed
+        meck:expect(
+            eldap,
+            search,
+            fun
+                (Pid, [{base, <<"uid=mqttuser0007", _/binary>>} | _]) ->
+                    TestPid ! {eldap_pid, Pid},
+                    {error, {gen_tcp_error, timeout}};
+                (Pid, Args) ->
+                    meck:passthrough([Pid, Args])
+            end
+        ),
+
+        Credentials = fun(Username) ->
+            #{
+                username => Username,
+                password => Username,
+                listener => 'tcp:default',
+                protocol => mqtt
+            }
+        end,
+
+        SpecificConfigParams = #{},
+        Result = {ok, #{is_superuser => true}},
+
+        Timeout = 1000,
+        Config0 = raw_ldap_auth_config(),
+        Config = Config0#{
+            <<"pool_size">> => 1,
+            <<"request_timeout">> => Timeout
+        },
+        AuthConfig = maps:merge(Config, SpecificConfigParams),
+        {ok, _} = emqx:update_config(
+            ?PATH,
+            {create_authenticator, ?GLOBAL, AuthConfig}
+        ),
+
+        %% 0006 is a disabled user
+        ?assertEqual(
+            {error, user_disabled},
+            emqx_access_control:authenticate(Credentials(<<"mqttuser0006">>))
+        ),
+        ?assertEqual(
+            {error, not_authorized},
+            emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))
+        ),
+        ok = wait_for_ldap_pid(1000),
+        [#{id := ResourceID}] = emqx_resource_manager:list_all(),
+        ?retry(1_000, 10, {ok, connected} = emqx_resource_manager:health_check(ResourceID)),
+        %% turn back to normal
+        meck:expect(
+            eldap,
+            search,
+            2,
+            fun(Pid2, Query) ->
+                meck:passthrough([Pid2, Query])
+            end
+        ),
+        %% expect eldap process to be restarted
+        ?assertEqual(Result, emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))),
+        emqx_authn_test_lib:delete_authenticators(
+            [authentication],
+            ?GLOBAL
+        )
+    after
+        meck:unload(eldap)
+    end.
+
+wait_for_ldap_pid(After) ->
+    receive
+        {eldap_pid, Pid} ->
+            ?assertNot(is_process_alive(Pid)),
+            ok
+    after After ->
+        error(timeout)
+    end.
+
 t_authenticate(_Config) ->
     ok = lists:foreach(
         fun(Sample) ->
@@ -300,6 +374,3 @@ user_seeds() ->
 
 ldap_server() ->
     iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
-
-ldap_config() ->
-    emqx_ldap_SUITE:ldap_config([]).

+ 0 - 14
apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl

@@ -44,7 +44,6 @@ init_per_suite(Config) ->
                 ],
                 #{work_dir => emqx_cth_suite:work_dir(Config)}
             ),
-            ok = create_ldap_resource(),
             [{apps, Apps} | Config];
         false ->
             {skip, no_ldap}
@@ -167,21 +166,8 @@ setup_config(SpecialParams) ->
 ldap_server() ->
     iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
 
-ldap_config() ->
-    emqx_ldap_SUITE:ldap_config([]).
-
 start_apps(Apps) ->
     lists:foreach(fun application:ensure_all_started/1, Apps).
 
 stop_apps(Apps) ->
     lists:foreach(fun application:stop/1, Apps).
-
-create_ldap_resource() ->
-    {ok, _} = emqx_resource:create_local(
-        ?LDAP_RESOURCE,
-        ?AUTHZ_RESOURCE_GROUP,
-        emqx_ldap,
-        ldap_config(),
-        #{}
-    ),
-    ok.

+ 1 - 1
apps/emqx_connector/src/emqx_connector_jwt.erl

@@ -141,5 +141,5 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
 is_about_to_expire(JWT) ->
     #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
     Now = erlang:system_time(seconds),
-    GraceExp = Exp - timer:seconds(5),
+    GraceExp = Exp - 5,
     Now >= GraceExp.

+ 4 - 2
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -352,8 +352,10 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
 safe_atom(Atom) when is_atom(Atom) -> Atom.
 
 parse_opts(Conf, Opts0) ->
-    Opts1 = override_start_after_created(Conf, Opts0),
-    set_no_buffer_workers(Opts1).
+    Opts1 = emqx_resource:fetch_creation_opts(Conf),
+    Opts2 = maps:merge(Opts1, Opts0),
+    Opts = override_start_after_created(Conf, Opts2),
+    set_no_buffer_workers(Opts).
 
 override_start_after_created(Config, Opts) ->
     Enabled = maps:get(enable, Config, true),

+ 1 - 1
apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl

@@ -125,7 +125,7 @@ t_ensure_jwt(_Config) ->
             JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig),
             ?assertNot(is_expired(JWT0)),
             %% should refresh 5 s before expiration
-            ct:sleep(Expiration - 5500),
+            ct:sleep(Expiration - 3000),
             JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig),
             ?assertNot(is_expired(JWT1)),
             %% fully expired

+ 21 - 6
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl

@@ -260,7 +260,15 @@ convert_certs(_Dir, Conf) ->
 %%------------------------------------------------------------------------------
 
 save_jwks_file(Dir, Content) ->
-    Path = filename:join([emqx_tls_lib:pem_dir(Dir), "client_jwks"]),
+    case filelib:is_file(Content) of
+        true ->
+            {ok, Content};
+        _ ->
+            Path = filename:join([emqx_tls_lib:pem_dir(Dir), "client_jwks"]),
+            write_jwks_file(Path, Content)
+    end.
+
+write_jwks_file(Path, Content) ->
     case filelib:ensure_dir(Path) of
         ok ->
             case file:write_file(Path, Content) of
@@ -288,11 +296,18 @@ maybe_require_pkce(true, Opts) ->
     }.
 
 init_client_jwks(#{client_jwks := #{type := file, file := File}}) ->
-    case jose_jwk:from_file(File) of
-        {error, _} ->
-            none;
-        Jwks ->
-            Jwks
+    try
+        case jose_jwk:from_file(File) of
+            {error, Reason} ->
+                ?SLOG(error, #{msg => "failed_to_initialize_jwks", reason => Reason}),
+                none;
+            Jwks ->
+                Jwks
+        end
+    catch
+        _:CReason ->
+            ?SLOG(error, #{msg => "failed_to_initialize_jwks", reason => CReason}),
+            none
     end;
 init_client_jwks(_) ->
     none.

+ 13 - 5
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl

@@ -28,6 +28,7 @@
 
 -export([code_callback/2, make_callback_url/1]).
 
+-define(BAD_REQUEST, 'BAD_REQUEST').
 -define(BAD_USERNAME_OR_PWD, 'BAD_USERNAME_OR_PWD').
 -define(BACKEND_NOT_FOUND, 'BACKEND_NOT_FOUND').
 
@@ -62,6 +63,7 @@ schema("/sso/oidc/callback") ->
             desc => ?DESC(code_callback),
             responses => #{
                 200 => emqx_dashboard_api:fields([token, version, license]),
+                400 => response_schema(400),
                 401 => response_schema(401),
                 404 => response_schema(404)
             },
@@ -78,8 +80,9 @@ code_callback(get, #{query_string := QS}) ->
             ?SLOG(info, #{
                 msg => "dashboard_sso_login_successful"
             }),
-
             {302, ?RESPHEADERS#{<<"location">> => Target}, ?REDIRECT_BODY};
+        {error, invalid_query_string_param} ->
+            {400, #{code => ?BAD_REQUEST, message => <<"Invalid query string">>}};
         {error, invalid_backend} ->
             {404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}};
         {error, Reason} ->
@@ -93,11 +96,14 @@ code_callback(get, #{query_string := QS}) ->
 %%--------------------------------------------------------------------
 %% internal
 %%--------------------------------------------------------------------
-
+response_schema(400) ->
+    emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>);
 response_schema(401) ->
-    emqx_dashboard_swagger:error_codes([?BAD_USERNAME_OR_PWD], ?DESC(login_failed401));
+    emqx_dashboard_swagger:error_codes(
+        [?BAD_USERNAME_OR_PWD], ?DESC(emqx_dashboard_api, login_failed401)
+    );
 response_schema(404) ->
-    emqx_dashboard_swagger:error_codes([?BACKEND_NOT_FOUND], ?DESC(backend_not_found)).
+    emqx_dashboard_swagger:error_codes([?BACKEND_NOT_FOUND], <<"Backend not found">>).
 
 reason_to_message(Bin) when is_binary(Bin) ->
     Bin;
@@ -119,7 +125,9 @@ ensure_oidc_state(#{<<"state">> := State} = QS, Cfg) ->
             retrieve_token(QS, Cfg, Data);
         _ ->
             {error, session_not_exists}
-    end.
+    end;
+ensure_oidc_state(_, _Cfg) ->
+    {error, invalid_query_string_param}.
 
 retrieve_token(
     #{<<"code">> := Code},

+ 19 - 2
apps/emqx_ldap/src/emqx_ldap.erl

@@ -41,6 +41,7 @@
 -export([namespace/0, roots/0, fields/1, desc/1]).
 
 -export([do_get_status/1, get_status_with_poolname/1]).
+-export([search/2]).
 
 -define(LDAP_HOST_OPTIONS, #{
     default_port => 389
@@ -273,6 +274,22 @@ on_query(
             Error
     end.
 
+search(Pid, SearchOptions) ->
+    case eldap:search(Pid, SearchOptions) of
+        {error, ldap_closed} ->
+            %% ldap server closing the socket does not result in
+            %% process restart, so we need to kill it to trigger a quick reconnect
+            %% instead of waiting for the next health-check
+            _ = exit(Pid, kill),
+            {error, ldap_closed};
+        {error, {gen_tcp_error, _} = Reason} ->
+            %% kill the process to trigger reconnect
+            _ = exit(Pid, kill),
+            {error, Reason};
+        Result ->
+            Result
+    end.
+
 do_ldap_query(
     InstId,
     SearchOptions,
@@ -283,7 +300,7 @@ do_ldap_query(
     case
         ecpool:pick_and_do(
             PoolName,
-            {eldap, search, [SearchOptions]},
+            {?MODULE, search, [SearchOptions]},
             handover
         )
     of
@@ -319,7 +336,7 @@ do_ldap_query(
             ?SLOG(
                 error,
                 LogMeta#{
-                    msg => "ldap_connector_do_query_failed",
+                    msg => "ldap_connector_query_failed",
                     reason => emqx_utils:redact(Reason)
                 }
             ),

+ 9 - 0
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -252,6 +252,9 @@
     timezone_to_offset_seconds/1
 ]).
 
+%% System functions
+-export([getenv/1]).
+
 %% See extra_functions_module/0 and set_extra_functions_module/1 in the
 %% emqx_rule_engine module
 -callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
@@ -1262,3 +1265,9 @@ convert_timestamp(MillisecondsTimestamp) ->
 
 uuid_str(UUID, DisplayOpt) ->
     uuid:uuid_to_string(UUID, DisplayOpt).
+
+%%------------------------------------------------------------------------------
+%% System Funcs
+%%------------------------------------------------------------------------------
+getenv(Env) ->
+    emqx_variform_bif:getenv(Env).

+ 27 - 0
apps/emqx_utils/src/emqx_variform_bif.erl

@@ -79,6 +79,12 @@
 %% Number compare functions
 -export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]).
 
+%% System
+-export([getenv/1]).
+
+-define(CACHE(Key), {?MODULE, Key}).
+-define(ENV_CACHE(Env), ?CACHE({env, Env})).
+
 %%------------------------------------------------------------------------------
 %% String Funcs
 %%------------------------------------------------------------------------------
@@ -569,3 +575,24 @@ num_lte(A, B) ->
 num_gte(A, B) ->
     R = num_comp(A, B),
     R =:= gt orelse R =:= eq.
+
+%%------------------------------------------------------------------------------
+%% System
+%%------------------------------------------------------------------------------
+getenv(Bin) when is_binary(Bin) ->
+    EnvKey = ?ENV_CACHE(Bin),
+    case persistent_term:get(EnvKey, undefined) of
+        undefined ->
+            Name = erlang:binary_to_list(Bin),
+            Result =
+                case os:getenv(Name) of
+                    false ->
+                        <<>>;
+                    Value ->
+                        erlang:list_to_binary(Value)
+                end,
+            persistent_term:put(EnvKey, Result),
+            Result;
+        Result ->
+            Result
+    end.

+ 7 - 0
apps/emqx_utils/test/emqx_variform_bif_tests.erl

@@ -72,3 +72,10 @@ base64_encode_decode_test() ->
     RandBytes = crypto:strong_rand_bytes(100),
     Encoded = emqx_variform_bif:base64_encode(RandBytes),
     ?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)).
+
+system_test() ->
+    EnvName = erlang:atom_to_list(?MODULE),
+    EnvVal = erlang:atom_to_list(?FUNCTION_NAME),
+    EnvNameBin = erlang:list_to_binary(EnvName),
+    os:putenv(EnvName, EnvVal),
+    ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)).

+ 2 - 0
changes/ce/feat-13507.en.md

@@ -0,0 +1,2 @@
+Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables.
+Note this value is immutable once loaded from the environment.

+ 4 - 0
changes/ce/feat-13521.en.md

@@ -0,0 +1,4 @@
+Fix LDAP query timeout issue.
+
+Previously, LDAP query timeout may cause the underlying connection to be unusable.
+Fixed to always reconnect if timeout happens.

+ 1 - 0
changes/ce/fix-13503.en.md

@@ -0,0 +1 @@
+Fixed an issue where a connector wouldn't respect the configured health check interval when first starting up, and would need an update/restart for the correct value to take effect.

+ 1 - 0
changes/ce/fix-13515.en.md

@@ -0,0 +1 @@
+Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason.

+ 19 - 0
rel/config/ee-examples/ldap-authn.conf

@@ -0,0 +1,19 @@
+authentication = [
+  {
+    backend = ldap
+    base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
+    filter = "(& (objectClass=mqttUser) (uid=${username}))"
+    mechanism = password_based
+    method {
+      is_superuser_attribute = isSuperuser
+      password_attribute = userPassword
+      type = hash
+    }
+    password = public
+    pool_size = 8
+    query_timeout = "5s"
+    request_timeout = "10s"
+    server = "localhost:1389"
+    username = "cn=root,dc=emqx,dc=io"
+  }
+]