Kaynağa Gözat

Merge branch 'release-57' into merge/william/sync-rel57-to-master

William Yang 1 yıl önce
ebeveyn
işleme
d39b8caff9
41 değiştirilmiş dosya ile 2599 ekleme ve 663 silme
  1. 258 0
      apps/emqx/include/emqx_metrics.hrl
  2. 2 0
      apps/emqx/include/emqx_placeholder.hrl
  3. 5 0
      apps/emqx/include/emqx_schema.hrl
  4. 1 0
      apps/emqx/src/emqx_access_control.erl
  5. 18 4
      apps/emqx/src/emqx_channel.erl
  6. 30 50
      apps/emqx/src/emqx_connection.erl
  7. 20 3
      apps/emqx/src/emqx_listeners.erl
  8. 6 207
      apps/emqx/src/emqx_metrics.erl
  9. 2 2
      apps/emqx/src/emqx_quic_stream.erl
  10. 3 1
      apps/emqx/src/emqx_schema.erl
  11. 15 0
      apps/emqx/src/emqx_tls_lib.erl
  12. 9 13
      apps/emqx/test/emqx_connection_SUITE.erl
  13. 307 0
      apps/emqx/test/emqx_test_tls_certs_helper.erl
  14. 20 0
      apps/emqx_auth_ext/.gitignore
  15. 94 0
      apps/emqx_auth_ext/BSL.txt
  16. 7 0
      apps/emqx_auth_ext/README.md
  17. 2 0
      apps/emqx_auth_ext/rebar.config
  18. 21 0
      apps/emqx_auth_ext/src/emqx_auth_ext.app.src
  19. 28 0
      apps/emqx_auth_ext/src/emqx_auth_ext.erl
  20. 42 0
      apps/emqx_auth_ext/src/emqx_auth_ext_schema.erl
  21. 111 0
      apps/emqx_auth_ext/src/emqx_auth_ext_tls_const_v1.erl
  22. 66 0
      apps/emqx_auth_ext/src/emqx_auth_ext_tls_lib.erl
  23. 247 0
      apps/emqx_auth_ext/test/emqx_auth_ext_listener_tls_verify_chain_SUITE.erl
  24. 362 0
      apps/emqx_auth_ext/test/emqx_auth_ext_listener_tls_verify_keyusage_SUITE.erl
  25. 709 0
      apps/emqx_auth_ext/test/emqx_auth_ext_listener_tls_verify_partial_chain_SUITE.erl
  26. 66 0
      apps/emqx_auth_ext/test/emqx_auth_ext_schema_SUITE.erl
  27. 1 0
      apps/emqx_auth_http/src/emqx_authz_http.erl
  28. 9 3
      apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl
  29. 2 1
      apps/emqx_conf/include/emqx_conf.hrl
  30. 29 27
      apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
  31. 8 0
      apps/emqx_gateway/src/emqx_gateway_utils.erl
  32. 2 1
      apps/emqx_machine/priv/reboot_lists.eterm
  33. 20 347
      apps/emqx_management/src/emqx_mgmt_api_metrics.erl
  34. 1 1
      apps/emqx_s3/src/emqx_s3.app.src
  35. 2 2
      apps/emqx_s3/src/emqx_s3_schema.erl
  36. 1 0
      changes/ce/feat-13180.en.md
  37. 22 0
      changes/ee/feat-13211.en.md
  38. 3 1
      mix.exs
  39. 1 0
      rebar.config.erl
  40. 46 0
      rel/i18n/emqx_auth_ext_schema.hocon
  41. 1 0
      scripts/spellcheck/dicts/emqx.txt

+ 258 - 0
apps/emqx/include/emqx_metrics.hrl

@@ -0,0 +1,258 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_METRICS_HRL).
+-define(EMQX_METRICS_HRL, true).
+
+%% Bytes sent and received
+-define(BYTES_METRICS, [
+    {counter, 'bytes.received', <<"Number of bytes received ">>},
+    {counter, 'bytes.sent', <<"Number of bytes sent on this connection">>}
+]).
+
+%% Packets sent and received
+-define(PACKET_METRICS, [
+    {counter, 'packets.received', <<"Number of received packet">>},
+    {counter, 'packets.sent', <<"Number of sent packet">>},
+    {counter, 'packets.connect.received', <<"Number of received CONNECT packet">>},
+    {counter, 'packets.connack.sent', <<"Number of sent CONNACK packet">>},
+    {counter, 'packets.connack.error',
+        <<"Number of received CONNECT packet with unsuccessful connections">>},
+    {counter, 'packets.connack.auth_error',
+        <<"Number of received CONNECT packet with failed Authentication">>},
+    {counter, 'packets.publish.received', <<"Number of received PUBLISH packet">>},
+    %% PUBLISH packets sent
+    {counter, 'packets.publish.sent', <<"Number of sent PUBLISH packet">>},
+    %% PUBLISH packet_id inuse
+    {counter, 'packets.publish.inuse',
+        <<"Number of received PUBLISH packet with occupied identifiers">>},
+    %% PUBLISH failed for error
+    {counter, 'packets.publish.error',
+        <<"Number of received PUBLISH packet that cannot be published">>},
+    %% PUBLISH failed for auth error
+    {counter, 'packets.publish.auth_error',
+        <<"Number of received PUBLISH packets with failed the Authorization check">>},
+    %% PUBLISH(QoS2) packets dropped
+    {counter, 'packets.publish.dropped',
+        <<"Number of messages discarded due to the receiving limit">>},
+    %% PUBACK packets received
+    {counter, 'packets.puback.received', <<"Number of received PUBACK packet">>},
+    %% PUBACK packets sent
+    {counter, 'packets.puback.sent', <<"Number of sent PUBACK packet">>},
+    %% PUBACK packet_id inuse
+    {counter, 'packets.puback.inuse',
+        <<"Number of received PUBACK packet with occupied identifiers">>},
+    %% PUBACK packets missed
+    {counter, 'packets.puback.missed', <<"Number of received packet with identifiers.">>},
+    %% PUBREC packets received
+    {counter, 'packets.pubrec.received', <<"Number of received PUBREC packet">>},
+    %% PUBREC packets sent
+    {counter, 'packets.pubrec.sent', <<"Number of sent PUBREC packet">>},
+    %% PUBREC packet_id inuse
+    {counter, 'packets.pubrec.inuse',
+        <<"Number of received PUBREC packet with occupied identifiers">>},
+    %% PUBREC packets missed
+    {counter, 'packets.pubrec.missed',
+        <<"Number of received PUBREC packet with unknown identifiers">>},
+    %% PUBREL packets received
+    {counter, 'packets.pubrel.received', <<"Number of received PUBREL packet">>},
+    %% PUBREL packets sent
+    {counter, 'packets.pubrel.sent', <<"Number of sent PUBREL packet">>},
+    %% PUBREL packets missed
+    {counter, 'packets.pubrel.missed',
+        <<"Number of received PUBREC packet with unknown identifiers">>},
+    %% PUBCOMP packets received
+    {counter, 'packets.pubcomp.received', <<"Number of received PUBCOMP packet">>},
+    %% PUBCOMP packets sent
+    {counter, 'packets.pubcomp.sent', <<"Number of sent PUBCOMP packet">>},
+    %% PUBCOMP packet_id inuse
+    {counter, 'packets.pubcomp.inuse',
+        <<"Number of received PUBCOMP packet with occupied identifiers">>},
+    %% PUBCOMP packets missed
+    {counter, 'packets.pubcomp.missed', <<"Number of missed PUBCOMP packet">>},
+    %% SUBSCRIBE Packets received
+    {counter, 'packets.subscribe.received', <<"Number of received SUBSCRIBE packet">>},
+    %% SUBSCRIBE error
+    {counter, 'packets.subscribe.error',
+        <<"Number of received SUBSCRIBE packet with failed subscriptions">>},
+    %% SUBSCRIBE failed for not auth
+    {counter, 'packets.subscribe.auth_error',
+        <<"Number of received SUBACK packet with failed Authorization check">>},
+    %% SUBACK packets sent
+    {counter, 'packets.suback.sent', <<"Number of sent SUBACK packet">>},
+    %% UNSUBSCRIBE Packets received
+    {counter, 'packets.unsubscribe.received', <<"Number of received UNSUBSCRIBE packet">>},
+    %% UNSUBSCRIBE error
+    {counter, 'packets.unsubscribe.error',
+        <<"Number of received UNSUBSCRIBE packet with failed unsubscriptions">>},
+    %% UNSUBACK Packets sent
+    {counter, 'packets.unsuback.sent', <<"Number of sent UNSUBACK packet">>},
+    %% PINGREQ packets received
+    {counter, 'packets.pingreq.received', <<"Number of received PINGREQ packet">>},
+    %% PINGRESP Packets sent
+    {counter, 'packets.pingresp.sent', <<"Number of sent PUBRESP packet">>},
+    %% DISCONNECT Packets received
+    {counter, 'packets.disconnect.received', <<"Number of received DISCONNECT packet">>},
+    %% DISCONNECT Packets sent
+    {counter, 'packets.disconnect.sent', <<"Number of sent DISCONNECT packet">>},
+    %% Auth Packets received
+    {counter, 'packets.auth.received', <<"Number of received AUTH packet">>},
+    %% Auth Packets sent
+    {counter, 'packets.auth.sent', <<"Number of sent AUTH packet">>}
+]).
+
+%% Messages sent/received and pubsub
+-define(MESSAGE_METRICS, [
+    %% All Messages received
+    {counter, 'messages.received', <<
+        "Number of messages received from the client, equal to the sum of "
+        "messages.qos0.received\fmessages.qos1.received and messages.qos2.received"
+    >>},
+    %% All Messages sent
+    {counter, 'messages.sent', <<
+        "Number of messages sent to the client, equal to the sum of "
+        "messages.qos0.sent\fmessages.qos1.sent and messages.qos2.sent"
+    >>},
+    %% QoS0 Messages received
+    {counter, 'messages.qos0.received', <<"Number of QoS 0 messages received from clients">>},
+    %% QoS0 Messages sent
+    {counter, 'messages.qos0.sent', <<"Number of QoS 0 messages sent to clients">>},
+    %% QoS1 Messages received
+    {counter, 'messages.qos1.received', <<"Number of QoS 1 messages received from clients">>},
+    %% QoS1 Messages sent
+    {counter, 'messages.qos1.sent', <<"Number of QoS 1 messages sent to clients">>},
+    %% QoS2 Messages received
+    {counter, 'messages.qos2.received', <<"Number of QoS 2 messages received from clients">>},
+    %% QoS2 Messages sent
+    {counter, 'messages.qos2.sent', <<"Number of QoS 2 messages sent to clients">>},
+    %% PubSub Metrics
+
+    %% Messages Publish
+    {counter, 'messages.publish',
+        <<"Number of messages published in addition to system messages">>},
+    %% Messages dropped due to no subscribers
+    {counter, 'messages.dropped',
+        <<"Number of messages dropped before forwarding to the subscription process">>},
+    %% Messages that failed validations
+    {counter, 'messages.validation_failed', <<"Number of message validation failed">>},
+    %% Messages that passed validations
+    {counter, 'messages.validation_succeeded', <<"Number of message validation successful">>},
+    %% % Messages that failed transformations
+    {counter, 'messages.transformation_failed', <<"Number fo message transformation failed">>},
+    %% % Messages that passed transformations
+    {counter, 'messages.transformation_succeeded',
+        <<"Number fo message transformation succeeded">>},
+    %% QoS2 Messages expired
+    {counter, 'messages.dropped.await_pubrel_timeout',
+        <<"Number of messages dropped due to waiting PUBREL timeout">>},
+    %% Messages dropped
+    {counter, 'messages.dropped.no_subscribers',
+        <<"Number of messages dropped due to no subscribers">>},
+    %% Messages forward
+    {counter, 'messages.forward', <<"Number of messages forwarded to other nodes">>},
+    %% Messages delayed
+    {counter, 'messages.delayed', <<"Number of delay-published messages">>},
+    %% Messages delivered
+    {counter, 'messages.delivered',
+        <<"Number of messages forwarded to the subscription process internally">>},
+    %% Messages acked
+    {counter, 'messages.acked', <<"Number of received PUBACK and PUBREC packet">>},
+    %% Messages persistently stored
+    {counter, 'messages.persisted', <<"Number of message persisted">>}
+]).
+
+%% Delivery metrics
+-define(DELIVERY_METRICS, [
+    %% All Dropped during delivery
+    {counter, 'delivery.dropped', <<"Total number of discarded messages when sending">>},
+    %% Dropped due to no_local
+    {counter, 'delivery.dropped.no_local', <<
+        "Number of messages that were dropped due to the No Local subscription "
+        "option when sending"
+    >>},
+    %% Dropped due to message too large
+    {counter, 'delivery.dropped.too_large', <<
+        "The number of messages that were dropped because the length exceeded "
+        "the limit when sending"
+    >>},
+    %% Dropped qos0 message
+    {counter, 'delivery.dropped.qos0_msg', <<
+        "Number of messages with QoS 0 that were dropped because the message "
+        "queue was full when sending"
+    >>},
+    %% Dropped due to queue full
+    {counter, 'delivery.dropped.queue_full', <<
+        "Number of messages with a non-zero QoS that were dropped because the "
+        "message queue was full when sending"
+    >>},
+    %% Dropped due to expired
+    {counter, 'delivery.dropped.expired',
+        <<"Number of messages dropped due to message expiration on sending">>}
+]).
+
+%% Client Lifecircle metrics
+-define(CLIENT_METRICS, [
+    {counter, 'client.connect', <<"Number of client connections">>},
+    {counter, 'client.connack', <<"Number of CONNACK packet sent">>},
+    {counter, 'client.connected', <<"Number of successful client connected">>},
+    {counter, 'client.authenticate', <<"Number of client Authentication">>},
+    {counter, 'client.auth.anonymous', <<"Number of clients who log in anonymously">>},
+    {counter, 'client.authorize', <<"Number of Authorization rule checks">>},
+    {counter, 'client.subscribe', <<"Number of client subscriptions">>},
+    {counter, 'client.unsubscribe', <<"Number of client unsubscriptions">>},
+    {counter, 'client.disconnected', <<"Number of client disconnects">>}
+]).
+
+%% Session Lifecircle metrics
+-define(SESSION_METRICS, [
+    {counter, 'session.created', <<"Number of sessions created">>},
+    {counter, 'session.resumed',
+        <<"Number of sessions resumed because Clean Session or Clean Start is false">>},
+    {counter, 'session.takenover',
+        <<"Number of sessions takenover because Clean Session or Clean Start is false">>},
+    {counter, 'session.discarded',
+        <<"Number of sessions dropped because Clean Session or Clean Start is true">>},
+    {counter, 'session.terminated', <<"Number of terminated sessions">>}
+]).
+
+%% Statistic metrics for ACL checking
+-define(STASTS_ACL_METRICS, [
+    {counter, 'authorization.allow', <<"Number of Authorization allow">>},
+    {counter, 'authorization.deny', <<"Number of Authorization deny">>},
+    {counter, 'authorization.cache_hit', <<"Number of Authorization hits the cache">>},
+    {counter, 'authorization.cache_miss', <<"Number of Authorization cache missing">>}
+]).
+
+%% Statistic metrics for auth checking
+-define(STASTS_AUTHN_METRICS, [
+    {counter, 'authentication.success', <<"Number of successful client Authentication">>},
+    {counter, 'authentication.success.anonymous',
+        <<"Number of successful client Authentication due to anonymous">>},
+    {counter, 'authentication.failure', <<"Number of failed client Authentication">>}
+]).
+
+%% Overload protection counters
+-define(OLP_METRICS, [
+    {counter, 'overload_protection.delay.ok', <<"Number of overload protection delayed">>},
+    {counter, 'overload_protection.delay.timeout',
+        <<"Number of overload protection delay timeout">>},
+    {counter, 'overload_protection.hibernation', <<"Number of overload protection hibernation">>},
+    {counter, 'overload_protection.gc', <<"Number of overload protection garbage collection">>},
+    {counter, 'overload_protection.new_conn',
+        <<"Number of overload protection close new incoming connection">>}
+]).
+
+-endif.

+ 2 - 0
apps/emqx/include/emqx_placeholder.hrl

@@ -28,8 +28,10 @@
 %% cert
 -define(VAR_CERT_SUBJECT, "cert_subject").
 -define(VAR_CERT_CN_NAME, "cert_common_name").
+-define(VAR_CERT_PEM, "cert_pem").
 -define(PH_CERT_SUBJECT, ?PH(?VAR_CERT_SUBJECT)).
 -define(PH_CERT_CN_NAME, ?PH(?VAR_CERT_CN_NAME)).
+-define(PH_CERT_PEM, ?PH(?VAR_CERT_PEM)).
 
 %% MQTT/Gateway
 -define(VAR_PASSWORD, "password").

+ 5 - 0
apps/emqx/include/emqx_schema.hrl

@@ -21,4 +21,9 @@
 -define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion).
 -define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
 
+%%--------------------------------------------------------------------
+%% EE injections
+%%--------------------------------------------------------------------
+-define(EMQX_SSL_FUN_MFA(Name), {emqx_ssl_fun_mfa, Name}).
+
 -endif.

+ 1 - 0
apps/emqx/src/emqx_access_control.erl

@@ -238,6 +238,7 @@ inc_authn_metrics(error) ->
 inc_authn_metrics(ok) ->
     emqx_metrics:inc('authentication.success');
 inc_authn_metrics(anonymous) ->
+    emqx_metrics:inc('client.auth.anonymous'),
     emqx_metrics:inc('authentication.success.anonymous'),
     emqx_metrics:inc('authentication.success').
 

+ 18 - 4
apps/emqx/src/emqx_channel.erl

@@ -1733,6 +1733,16 @@ count_flapping_event(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
 %%--------------------------------------------------------------------
 %% Authenticate
 
+%% If peercert exists, add it as `cert_pem` credential field.
+maybe_add_cert(Map, #channel{conninfo = ConnInfo}) ->
+    maybe_add_cert(Map, ConnInfo);
+maybe_add_cert(Map, #{peercert := PeerCert}) when is_binary(PeerCert) ->
+    %% NOTE: it's raw binary at this point,
+    %% encoding to PEM (base64) is done lazy in emqx_authn_utils:render_var
+    Map#{cert_pem => PeerCert};
+maybe_add_cert(Map, _) ->
+    Map.
+
 authenticate(
     ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -1745,20 +1755,23 @@ authenticate(
         auth_cache = AuthCache
     } = Channel
 ) ->
+    %% Auth with CONNECT packet for MQTT v5
     AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
-    do_authenticate(
+    Credential0 =
         ClientInfo#{
             auth_method => AuthMethod,
             auth_data => AuthData,
             auth_cache => AuthCache
         },
-        Channel
-    );
+    Credential = maybe_add_cert(Credential0, Channel),
+    do_authenticate(Credential, Channel);
 authenticate(
     ?CONNECT_PACKET(#mqtt_packet_connect{password = Password}),
     #channel{clientinfo = ClientInfo} = Channel
 ) ->
-    do_authenticate(ClientInfo#{password => Password}, Channel);
+    %% Auth with CONNECT packet for MQTT v3
+    Credential = maybe_add_cert(ClientInfo#{password => Password}, Channel),
+    do_authenticate(Credential, Channel);
 authenticate(
     ?AUTH_PACKET(_, #{'Authentication-Method' := AuthMethod} = Properties),
     #channel{
@@ -1767,6 +1780,7 @@ authenticate(
         auth_cache = AuthCache
     } = Channel
 ) ->
+    %% Enhanced auth
     case emqx_mqtt_props:get('Authentication-Method', ConnProps, undefined) of
         AuthMethod ->
             AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),

+ 30 - 50
apps/emqx/src/emqx_connection.erl

@@ -158,31 +158,6 @@
 
 -define(ENABLED(X), (X =/= undefined)).
 
--define(ALARM_TCP_CONGEST(Channel),
-    list_to_binary(
-        io_lib:format(
-            "mqtt_conn/congested/~ts/~ts",
-            [
-                emqx_channel:info(clientid, Channel),
-                emqx_channel:info(username, Channel)
-            ]
-        )
-    )
-).
-
--define(ALARM_CONN_INFO_KEYS, [
-    socktype,
-    sockname,
-    peername,
-    clientid,
-    username,
-    proto_name,
-    proto_ver,
-    connected_at
-]).
--define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
--define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-
 -define(LIMITER_BYTES_IN, bytes).
 -define(LIMITER_MESSAGE_IN, messages).
 
@@ -603,17 +578,6 @@ handle_msg(
     ActiveN = get_active_n(Type, Listener),
     Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
     with_channel(handle_deliver, [Delivers], State);
-%% Something sent
-handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) ->
-    case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of
-        true ->
-            Pubs = emqx_pd:reset_counter(outgoing_pubs),
-            Bytes = emqx_pd:reset_counter(outgoing_bytes),
-            OutStats = #{cnt => Pubs, oct => Bytes},
-            {ok, check_oom(run_gc(OutStats, State))};
-        false ->
-            ok
-    end;
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
 handle_msg({connack, ConnAck}, State) ->
@@ -729,9 +693,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
             shutdown(Reason, Reply, State#state{channel = NChannel});
         {shutdown, Reason, Reply, OutPacket, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(OutPacket, NState),
-            NState2 = graceful_shutdown_transport(Reason, NState),
-            shutdown(Reason, Reply, NState2)
+            {ok, NState2} = handle_outgoing(OutPacket, NState),
+            NState3 = graceful_shutdown_transport(Reason, NState2),
+            shutdown(Reason, Reply, NState3)
     end.
 
 %%--------------------------------------------------------------------
@@ -854,8 +818,8 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
             shutdown(Reason, State#state{channel = NChannel});
         {shutdown, Reason, Packet, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(Packet, NState),
-            shutdown(Reason, NState)
+            {ok, NState2} = handle_outgoing(Packet, NState),
+            shutdown(Reason, NState2)
     end.
 
 %%--------------------------------------------------------------------
@@ -909,20 +873,36 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 %%--------------------------------------------------------------------
 %% Send data
 
--spec send(iodata(), state()) -> ok.
-send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
+-spec send(iodata(), state()) -> {ok, state()}.
+send(IoData, #state{transport = Transport, socket = Socket} = State) ->
     Oct = iolist_size(IoData),
-    ok = emqx_metrics:inc('bytes.sent', Oct),
+    emqx_metrics:inc('bytes.sent', Oct),
     inc_counter(outgoing_bytes, Oct),
-    emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
-    case Transport:async_send(Socket, IoData, []) of
+    case Transport:send(Socket, IoData) of
         ok ->
-            ok;
+            %% NOTE: for Transport=emqx_quic_stream, it's actually an
+            %% async_send, sent/1 should technically be called when
+            %% {quic, send_complete, _Stream, true | false} is received,
+            %% but it is handled early for simplicity
+            sent(State);
         Error = {error, _Reason} ->
-            %% Send an inet_reply to postpone handling the error
-            %% @FIXME: why not just return error?
+            %% Defer error handling
+            %% so it's handled the same as tcp_closed or ssl_closed
             self() ! {inet_reply, Socket, Error},
-            ok
+            {ok, State}
+    end.
+
+%% Some bytes sent
+sent(#state{listener = {Type, Listener}} = State) ->
+    %% Run GC and check OOM after certain amount of messages or bytes sent.
+    case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of
+        true ->
+            Pubs = emqx_pd:reset_counter(outgoing_pubs),
+            Bytes = emqx_pd:reset_counter(outgoing_bytes),
+            OutStats = #{cnt => Pubs, oct => Bytes},
+            {ok, check_oom(run_gc(OutStats, State))};
+        false ->
+            {ok, State}
     end.
 
 %%--------------------------------------------------------------------

+ 20 - 3
apps/emqx/src/emqx_listeners.erl

@@ -611,7 +611,9 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
             ssl ->
                 OptsWithCRL = inject_crl_config(Opts0),
                 OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL),
-                SSLOpts = ssl_opts(OptsWithSNI),
+                OptsWithRootFun = inject_root_fun(OptsWithSNI),
+                OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun),
+                SSLOpts = ssl_opts(OptsWithVerifyFun),
                 Opts3#{ssl_options => SSLOpts, tcp_options => tcp_opts(Opts0)}
         end
     ).
@@ -635,8 +637,18 @@ ranch_opts(Type, Opts = #{bind := ListenOn}) ->
     MaxConnections = maps:get(max_connections, Opts, 1024),
     SocketOpts =
         case Type of
-            wss -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts));
-            ws -> tcp_opts(Opts)
+            wss ->
+                tcp_opts(Opts) ++
+                    lists:filter(
+                        fun
+                            ({partial_chain, _}) -> false;
+                            ({handshake_timeout, _}) -> false;
+                            (_) -> true
+                        end,
+                        ssl_opts(Opts)
+                    );
+            ws ->
+                tcp_opts(Opts)
         end,
     #{
         num_acceptors => NumAcceptors,
@@ -962,6 +974,11 @@ quic_listener_optional_settings() ->
         stateless_operation_expiration_ms
     ].
 
+inject_root_fun(#{ssl_options := SSLOpts} = Opts) ->
+    Opts#{ssl_options := emqx_tls_lib:maybe_inject_ssl_fun(root_fun, SSLOpts)}.
+inject_verify_fun(#{ssl_options := SSLOpts} = Opts) ->
+    Opts#{ssl_options := emqx_tls_lib:maybe_inject_ssl_fun(verify_fun, SSLOpts)}.
+
 inject_sni_fun(ListenerId, Conf = #{ssl_options := #{ocsp := #{enable_ocsp_stapling := true}}}) ->
     emqx_ocsp_cache:inject_sni_fun(ListenerId, Conf);
 inject_sni_fun(_ListenerId, Conf) ->

+ 6 - 207
apps/emqx/src/emqx_metrics.erl

@@ -22,6 +22,7 @@
 -include("logger.hrl").
 -include("types.hrl").
 -include("emqx_mqtt.hrl").
+-include("emqx_metrics.hrl").
 
 -export([
     start_link/0,
@@ -86,210 +87,8 @@
 -define(TAB, ?MODULE).
 -define(SERVER, ?MODULE).
 
-%% Bytes sent and received
--define(BYTES_METRICS,
-    % Total bytes received
-    [
-        {counter, 'bytes.received'},
-        % Total bytes sent
-        {counter, 'bytes.sent'}
-    ]
-).
-
-%% Packets sent and received
--define(PACKET_METRICS,
-    % All Packets received
-    [
-        {counter, 'packets.received'},
-        % All Packets sent
-        {counter, 'packets.sent'},
-        % CONNECT Packets received
-        {counter, 'packets.connect.received'},
-        % CONNACK Packets sent
-        {counter, 'packets.connack.sent'},
-        % CONNACK error sent
-        {counter, 'packets.connack.error'},
-        % CONNACK auth_error sent
-        {counter, 'packets.connack.auth_error'},
-        % PUBLISH packets received
-        {counter, 'packets.publish.received'},
-        % PUBLISH packets sent
-        {counter, 'packets.publish.sent'},
-        % PUBLISH packet_id inuse
-        {counter, 'packets.publish.inuse'},
-        % PUBLISH failed for error
-        {counter, 'packets.publish.error'},
-        % PUBLISH failed for auth error
-        {counter, 'packets.publish.auth_error'},
-        % PUBLISH(QoS2) packets dropped
-        {counter, 'packets.publish.dropped'},
-        % PUBACK packets received
-        {counter, 'packets.puback.received'},
-        % PUBACK packets sent
-        {counter, 'packets.puback.sent'},
-        % PUBACK packet_id inuse
-        {counter, 'packets.puback.inuse'},
-        % PUBACK packets missed
-        {counter, 'packets.puback.missed'},
-        % PUBREC packets received
-        {counter, 'packets.pubrec.received'},
-        % PUBREC packets sent
-        {counter, 'packets.pubrec.sent'},
-        % PUBREC packet_id inuse
-        {counter, 'packets.pubrec.inuse'},
-        % PUBREC packets missed
-        {counter, 'packets.pubrec.missed'},
-        % PUBREL packets received
-        {counter, 'packets.pubrel.received'},
-        % PUBREL packets sent
-        {counter, 'packets.pubrel.sent'},
-        % PUBREL packets missed
-        {counter, 'packets.pubrel.missed'},
-        % PUBCOMP packets received
-        {counter, 'packets.pubcomp.received'},
-        % PUBCOMP packets sent
-        {counter, 'packets.pubcomp.sent'},
-        % PUBCOMP packet_id inuse
-        {counter, 'packets.pubcomp.inuse'},
-        % PUBCOMP packets missed
-        {counter, 'packets.pubcomp.missed'},
-        % SUBSCRIBE Packets received
-        {counter, 'packets.subscribe.received'},
-        % SUBSCRIBE error
-        {counter, 'packets.subscribe.error'},
-        % SUBSCRIBE failed for not auth
-        {counter, 'packets.subscribe.auth_error'},
-        % SUBACK packets sent
-        {counter, 'packets.suback.sent'},
-        % UNSUBSCRIBE Packets received
-        {counter, 'packets.unsubscribe.received'},
-        % UNSUBSCRIBE error
-        {counter, 'packets.unsubscribe.error'},
-        % UNSUBACK Packets sent
-        {counter, 'packets.unsuback.sent'},
-        % PINGREQ packets received
-        {counter, 'packets.pingreq.received'},
-        % PINGRESP Packets sent
-        {counter, 'packets.pingresp.sent'},
-        % DISCONNECT Packets received
-        {counter, 'packets.disconnect.received'},
-        % DISCONNECT Packets sent
-        {counter, 'packets.disconnect.sent'},
-        % Auth Packets received
-        {counter, 'packets.auth.received'},
-        % Auth Packets sent
-        {counter, 'packets.auth.sent'}
-    ]
-).
-
-%% Messages sent/received and pubsub
--define(MESSAGE_METRICS,
-    % All Messages received
-    [
-        {counter, 'messages.received'},
-        % All Messages sent
-        {counter, 'messages.sent'},
-        % QoS0 Messages received
-        {counter, 'messages.qos0.received'},
-        % QoS0 Messages sent
-        {counter, 'messages.qos0.sent'},
-        % QoS1 Messages received
-        {counter, 'messages.qos1.received'},
-        % QoS1 Messages sent
-        {counter, 'messages.qos1.sent'},
-        % QoS2 Messages received
-        {counter, 'messages.qos2.received'},
-        % QoS2 Messages sent
-        {counter, 'messages.qos2.sent'},
-        %% PubSub Metrics
-
-        % Messages Publish
-        {counter, 'messages.publish'},
-        % Messages dropped due to no subscribers
-        {counter, 'messages.dropped'},
-        %% % Messages that failed validations
-        {counter, 'messages.validation_failed'},
-        %% % Messages that passed validations
-        {counter, 'messages.validation_succeeded'},
-        %% % Messages that failed transformations
-        {counter, 'messages.transformation_failed'},
-        %% % Messages that passed transformations
-        {counter, 'messages.transformation_succeeded'},
-        % QoS2 Messages expired
-        {counter, 'messages.dropped.await_pubrel_timeout'},
-        % Messages dropped
-        {counter, 'messages.dropped.no_subscribers'},
-        % Messages forward
-        {counter, 'messages.forward'},
-        % Messages delayed
-        {counter, 'messages.delayed'},
-        % Messages delivered
-        {counter, 'messages.delivered'},
-        % Messages acked
-        {counter, 'messages.acked'},
-        % Messages persistently stored
-        {counter, 'messages.persisted'}
-    ]
-).
-
-%% Delivery metrics
--define(DELIVERY_METRICS, [
-    {counter, 'delivery.dropped'},
-    {counter, 'delivery.dropped.no_local'},
-    {counter, 'delivery.dropped.too_large'},
-    {counter, 'delivery.dropped.qos0_msg'},
-    {counter, 'delivery.dropped.queue_full'},
-    {counter, 'delivery.dropped.expired'}
-]).
-
-%% Client Lifecircle metrics
--define(CLIENT_METRICS, [
-    {counter, 'client.connect'},
-    {counter, 'client.connack'},
-    {counter, 'client.connected'},
-    {counter, 'client.authenticate'},
-    {counter, 'client.auth.anonymous'},
-    {counter, 'client.authorize'},
-    {counter, 'client.subscribe'},
-    {counter, 'client.unsubscribe'},
-    {counter, 'client.disconnected'}
-]).
-
-%% Session Lifecircle metrics
--define(SESSION_METRICS, [
-    {counter, 'session.created'},
-    {counter, 'session.resumed'},
-    {counter, 'session.takenover'},
-    {counter, 'session.discarded'},
-    {counter, 'session.terminated'}
-]).
-
-%% Statistic metrics for ACL checking
--define(STASTS_ACL_METRICS, [
-    {counter, 'authorization.allow'},
-    {counter, 'authorization.deny'},
-    {counter, 'authorization.cache_hit'},
-    {counter, 'authorization.cache_miss'}
-]).
-
-%% Statistic metrics for auth checking
--define(STASTS_AUTHN_METRICS, [
-    {counter, 'authentication.success'},
-    {counter, 'authentication.success.anonymous'},
-    {counter, 'authentication.failure'}
-]).
-
-%% Overload protection counters
--define(OLP_METRICS, [
-    {counter, 'overload_protection.delay.ok'},
-    {counter, 'overload_protection.delay.timeout'},
-    {counter, 'overload_protection.hibernation'},
-    {counter, 'overload_protection.gc'},
-    {counter, 'overload_protection.new_conn'}
-]).
-
 olp_metrics() ->
-    lists:map(fun({_, Metric}) -> Metric end, ?OLP_METRICS).
+    lists:map(fun({_, Metric, _}) -> Metric end, ?OLP_METRICS).
 
 -record(state, {next_idx = 1}).
 
@@ -570,7 +369,7 @@ init([]) ->
     ]),
     % Store reserved indices
     ok = lists:foreach(
-        fun({Type, Name}) ->
+        fun({Type, Name, _Desc}) ->
             Idx = reserved_idx(Name),
             Metric = #metric{name = Name, type = Type, idx = Idx},
             true = ets:insert(?TAB, Metric),
@@ -684,11 +483,11 @@ reserved_idx('messages.dropped') -> 109;
 reserved_idx('messages.dropped.await_pubrel_timeout') -> 110;
 reserved_idx('messages.dropped.no_subscribers') -> 111;
 reserved_idx('messages.forward') -> 112;
-%%reserved_idx('messages.retained')            -> 113; %% keep the index, new metrics can use this
+%% reserved_idx('messages.retained') -> 113; %% keep the index, new metrics can use this
 reserved_idx('messages.delayed') -> 114;
 reserved_idx('messages.delivered') -> 115;
 reserved_idx('messages.acked') -> 116;
-reserved_idx('delivery.expired') -> 117;
+%% reserved_idx('delivery.expired') -> 117; %% have never used
 reserved_idx('delivery.dropped') -> 118;
 reserved_idx('delivery.dropped.no_local') -> 119;
 reserved_idx('delivery.dropped.too_large') -> 120;
@@ -699,7 +498,7 @@ reserved_idx('client.connect') -> 200;
 reserved_idx('client.connack') -> 201;
 reserved_idx('client.connected') -> 202;
 reserved_idx('client.authenticate') -> 203;
-reserved_idx('client.enhanced_authenticate') -> 204;
+%% reserved_idx('client.enhanced_authenticate') -> 204; %% have never used
 reserved_idx('client.auth.anonymous') -> 205;
 reserved_idx('client.authorize') -> 206;
 reserved_idx('client.subscribe') -> 207;

+ 2 - 2
apps/emqx/src/emqx_quic_stream.erl

@@ -34,7 +34,7 @@
     fast_close/1,
     shutdown/2,
     ensure_ok_or_exit/2,
-    async_send/3,
+    send/2,
     setopts/2,
     getopts/2,
     peername/1,
@@ -165,7 +165,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
             Result
     end.
 
-async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
+send({quic, _Conn, Stream, _Info}, Data) ->
     case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of
         {ok, _Len} -> ok;
         {error, X, Y} -> {error, {X, Y}};

+ 3 - 1
apps/emqx/src/emqx_schema.erl

@@ -191,6 +191,8 @@
 -define(DEFAULT_MULTIPLIER, 1.5).
 -define(DEFAULT_BACKOFF, 0.75).
 
+-define(INJECTING_CONFIGS, [?AUTH_EXT_SCHEMA_MODS]).
+
 namespace() -> emqx.
 
 tags() ->
@@ -2247,7 +2249,7 @@ common_ssl_opts_schema(Defaults, Type) ->
                     desc => ?DESC(common_ssl_opts_schema_hibernate_after)
                 }
             )}
-    ].
+    ] ++ emqx_schema_hooks:injection_point('common_ssl_opts_schema').
 
 %% @doc Make schema for SSL listener options.
 -spec server_ssl_opts_schema(map(), boolean()) -> hocon_schema:field_schema().

+ 15 - 0
apps/emqx/src/emqx_tls_lib.erl

@@ -15,6 +15,7 @@
 %%--------------------------------------------------------------------
 
 -module(emqx_tls_lib).
+-elvis([{elvis_style, atom_naming_convention, #{regex => "^([a-z][a-z0-9A-Z]*_?)*(_SUITE)?$"}}]).
 
 %% version & cipher suites
 -export([
@@ -44,10 +45,13 @@
     to_client_opts/2
 ]).
 
+-export([maybe_inject_ssl_fun/2]).
+
 %% ssl:tls_version/0 is not exported.
 -type tls_version() :: tlsv1 | 'tlsv1.1' | 'tlsv1.2' | 'tlsv1.3'.
 
 -include("logger.hrl").
+-include("emqx_schema.hrl").
 
 -define(IS_TRUE(Val), ((Val =:= true) orelse (Val =:= <<"true">>))).
 -define(IS_FALSE(Val), ((Val =:= false) orelse (Val =:= <<"false">>))).
@@ -685,3 +689,14 @@ ensure_ssl_file_key(SSL, RequiredKeyPaths) ->
         [] -> ok;
         Miss -> {error, #{reason => ssl_file_option_not_found, which_options => Miss}}
     end.
+
+-spec maybe_inject_ssl_fun(root_fun | verify_fun, map()) -> map().
+maybe_inject_ssl_fun(FunName, SslOpts) ->
+    case persistent_term:get(?EMQX_SSL_FUN_MFA(FunName), undefined) of
+        undefined ->
+            SslOpts;
+        {M, F, A} ->
+            %% We should have one entry not a list of {M,F,A},
+            %% as ordering matters in validations
+            erlang:apply(M, F, [SslOpts | A])
+    end.

+ 9 - 13
apps/emqx/test/emqx_connection_SUITE.erl

@@ -94,8 +94,7 @@ init_per_testcase(TestCase, Config) when
     ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
         {ok, [{K, 0} || K <- Options]}
     end),
-    ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
-    ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
+    ok = meck:expect(emqx_transport, send, fun(_Sock, _Data) -> ok end),
     ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
     case erlang:function_exported(?MODULE, TestCase, 2) of
         true -> ?MODULE:TestCase(init, Config);
@@ -234,9 +233,11 @@ t_handle_msg_incoming(_) ->
     ?assertMatch({ok, _St}, handle_msg({incoming, undefined}, st())).
 
 t_handle_msg_outgoing(_) ->
-    ?assertEqual(ok, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
-    ?assertEqual(ok, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
-    ?assertEqual(ok, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
+    ?assertMatch(
+        {ok, _}, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())
+    ),
+    ?assertMatch({ok, _}, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
+    ?assertMatch({ok, _}, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
 
 t_handle_msg_tcp_error(_) ->
     ?assertMatch(
@@ -255,18 +256,13 @@ t_handle_msg_deliver(_) ->
     ?assertMatch({ok, _St}, handle_msg({deliver, topic, msg}, st())).
 
 t_handle_msg_inet_reply(_) ->
-    ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
-    emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 0),
-    ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())),
-    emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 100),
-    ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())),
     ?assertMatch(
         {stop, {shutdown, for_testing}, _St},
         handle_msg({inet_reply, for_testing, {error, for_testing}}, st())
     ).
 
 t_handle_msg_connack(_) ->
-    ?assertEqual(ok, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
+    ?assertMatch({ok, _}, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
 
 t_handle_msg_close(_) ->
     ?assertMatch({stop, {shutdown, normal}, _St}, handle_msg({close, normal}, st())).
@@ -399,8 +395,8 @@ t_with_channel(_) ->
     meck:unload(emqx_channel).
 
 t_handle_outgoing(_) ->
-    ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
-    ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
+    ?assertMatch({ok, _}, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
+    ?assertMatch({ok, _}, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
 
 t_handle_info(_) ->
     ?assertMatch(

+ 307 - 0
apps/emqx/test/emqx_test_tls_certs_helper.erl

@@ -0,0 +1,307 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_test_tls_certs_helper).
+-export([
+    gen_ca/2,
+    gen_host_cert/3,
+    gen_host_cert/4,
+
+    select_free_port/1,
+    generate_tls_certs/1,
+
+    fail_when_ssl_error/1,
+    fail_when_ssl_error/2,
+    fail_when_no_ssl_alert/2,
+    fail_when_no_ssl_alert/3,
+
+    emqx_start_listener/4
+]).
+
+-include_lib("common_test/include/ct.hrl").
+
+%%-------------------------------------------------------------------------------
+%% Start Listener
+%%-------------------------------------------------------------------------------
+emqx_start_listener(Name, Type, Port, Opts) when is_list(Opts) ->
+    emqx_start_listener(Name, Type, Port, maps:from_list(Opts));
+emqx_start_listener(Name, ssl, Port, #{ssl_options := SslOptions} = Opts0) ->
+    Opts = Opts0#{
+        enable => true,
+        bind => {{127, 0, 0, 1}, Port},
+        mountpoint => <<>>,
+        zone => default,
+        ssl_options => maps:from_list(SslOptions)
+    },
+    ct:pal("start listener with ~p ~p", [Name, Opts]),
+    emqx_listeners:start_listener(ssl, Name, Opts).
+
+%%-------------------------------------------------------------------------------
+%% TLS certs
+%%-------------------------------------------------------------------------------
+gen_ca(Path, Name) ->
+    %% Generate ca.pem and ca.key which will be used to generate certs
+    %% for hosts server and clients
+    ECKeyFile = eckey_name(Path),
+    filelib:ensure_dir(ECKeyFile),
+    os:cmd("openssl ecparam -name secp256r1 > " ++ ECKeyFile),
+    Cmd = lists:flatten(
+        io_lib:format(
+            "openssl req -new -x509 -nodes "
+            "-newkey ec:~s "
+            "-keyout ~s -out ~s -days 3650 "
+            "-addext basicConstraints=CA:TRUE "
+            "-subj \"/C=SE/O=TEST CA\"",
+            [
+                ECKeyFile,
+                ca_key_name(Path, Name),
+                ca_cert_name(Path, Name)
+            ]
+        )
+    ),
+    os:cmd(Cmd).
+
+ca_cert_name(Path, Name) ->
+    filename(Path, "~s.pem", [Name]).
+ca_key_name(Path, Name) ->
+    filename(Path, "~s.key", [Name]).
+
+eckey_name(Path) ->
+    filename(Path, "ec.key", []).
+
+gen_host_cert(H, CaName, Path) ->
+    gen_host_cert(H, CaName, Path, #{}).
+
+gen_host_cert(H, CaName, Path, Opts) ->
+    ECKeyFile = eckey_name(Path),
+    CN = str(H),
+    HKey = filename(Path, "~s.key", [H]),
+    HCSR = filename(Path, "~s.csr", [H]),
+    HCSR2 = filename(Path, "~s.csr", [H]),
+    HPEM = filename(Path, "~s.pem", [H]),
+    HPEM2 = filename(Path, "~s_renewed.pem", [H]),
+    HEXT = filename(Path, "~s.extfile", [H]),
+    PasswordArg =
+        case maps:get(password, Opts, undefined) of
+            undefined ->
+                " -nodes ";
+            Password ->
+                io_lib:format(" -passout pass:'~s' ", [Password])
+        end,
+
+    create_file(
+        HEXT,
+        "keyUsage=digitalSignature,keyAgreement,keyCertSign\n"
+        "basicConstraints=CA:TRUE \n"
+        "~s \n"
+        "subjectAltName=DNS:~s\n",
+        [maps:get(ext, Opts, ""), CN]
+    ),
+
+    CSR_Cmd = csr_cmd(PasswordArg, ECKeyFile, HKey, HCSR, CN),
+    CSR_Cmd2 = csr_cmd(PasswordArg, ECKeyFile, HKey, HCSR2, CN),
+
+    CERT_Cmd = cert_sign_cmd(
+        HEXT, HCSR, ca_cert_name(Path, CaName), ca_key_name(Path, CaName), HPEM
+    ),
+    %% 2nd cert for testing renewed cert.
+    CERT_Cmd2 = cert_sign_cmd(
+        HEXT, HCSR2, ca_cert_name(Path, CaName), ca_key_name(Path, CaName), HPEM2
+    ),
+    ct:pal(os:cmd(CSR_Cmd)),
+    ct:pal(os:cmd(CSR_Cmd2)),
+    ct:pal(os:cmd(CERT_Cmd)),
+    ct:pal(os:cmd(CERT_Cmd2)),
+    file:delete(HEXT).
+
+cert_sign_cmd(ExtFile, CSRFile, CACert, CAKey, OutputCert) ->
+    lists:flatten(
+        io_lib:format(
+            "openssl x509 -req "
+            "-extfile ~s "
+            "-in ~s -CA ~s -CAkey ~s -CAcreateserial "
+            "-out ~s -days 500",
+            [
+                ExtFile,
+                CSRFile,
+                CACert,
+                CAKey,
+                OutputCert
+            ]
+        )
+    ).
+
+csr_cmd(PasswordArg, ECKeyFile, HKey, HCSR, CN) ->
+    lists:flatten(
+        io_lib:format(
+            "openssl req -new ~s -newkey ec:~s "
+            "-keyout ~s -out ~s "
+            "-addext \"subjectAltName=DNS:~s\" "
+            "-addext basicConstraints=CA:TRUE "
+            "-addext keyUsage=digitalSignature,keyAgreement,keyCertSign "
+            "-subj \"/C=SE/O=TEST/CN=~s\"",
+            [PasswordArg, ECKeyFile, HKey, HCSR, CN, CN]
+        )
+    ).
+
+filename(Path, F, A) ->
+    filename:join(Path, str(io_lib:format(F, A))).
+
+str(Arg) ->
+    binary_to_list(iolist_to_binary(Arg)).
+
+create_file(Filename, Fmt, Args) ->
+    filelib:ensure_dir(Filename),
+    {ok, F} = file:open(Filename, [write]),
+    try
+        io:format(F, Fmt, Args)
+    after
+        file:close(F)
+    end,
+    ok.
+
+%% @doc get unused port from OS
+-spec select_free_port(tcp | udp | ssl | quic) -> inets:port_number().
+select_free_port(tcp) ->
+    select_free_port(gen_tcp, listen);
+select_free_port(udp) ->
+    select_free_port(gen_udp, open);
+select_free_port(ssl) ->
+    select_free_port(tcp);
+select_free_port(quic) ->
+    select_free_port(udp).
+
+select_free_port(GenModule, Fun) when
+    GenModule == gen_tcp orelse
+        GenModule == gen_udp
+->
+    {ok, S} = GenModule:Fun(0, [{reuseaddr, true}]),
+    {ok, Port} = inet:port(S),
+    ok = GenModule:close(S),
+    case os:type() of
+        {unix, darwin} ->
+            %% in MacOS, still get address_in_use after close port
+            timer:sleep(500);
+        _ ->
+            skip
+    end,
+    ct:pal("Select free OS port: ~p", [Port]),
+    Port.
+
+%% @doc fail the test if ssl_error recvd
+%%      post check for success conn establishment
+fail_when_ssl_error(Socket) ->
+    fail_when_ssl_error(Socket, 1000).
+fail_when_ssl_error(Socket, Timeout) ->
+    receive
+        {ssl_error, Socket, _} ->
+            ct:fail("Handshake failed!")
+    after Timeout ->
+        ok
+    end.
+
+%% @doc fail the test if no ssl_error
+fail_when_no_ssl_alert(Res, Alert) ->
+    fail_when_no_ssl_alert(Res, Alert, 1000).
+
+fail_when_no_ssl_alert({error, {tls_alert, {Alert, _}}}, Alert, _Timeout) ->
+    ok;
+fail_when_no_ssl_alert({error, _} = Other, Alert, _Timeout) ->
+    ct:fail("returned unexpected ssl_error: ~p, expected ~n", [Other, Alert]);
+fail_when_no_ssl_alert({ok, Socket}, Alert, Timeout) ->
+    fail_when_no_ssl_alert(Socket, Alert, Timeout);
+fail_when_no_ssl_alert(Socket, Alert, Timeout) ->
+    receive
+        {ssl_error, Socket, {tls_alert, {Alert, AlertInfo}}} ->
+            ct:pal("alert info: ~p~n", [AlertInfo]);
+        {ssl_error, Socket, Other} ->
+            ct:fail("recv unexpected ssl_error: ~p~n", [Other])
+    after Timeout ->
+        ct:fail("No expected alert: ~p from Socket: ~p ", [Alert, Socket])
+    end.
+
+%% @doc Generate TLS cert chain for tests
+generate_tls_certs(Config) ->
+    DataDir = ?config(data_dir, Config),
+    gen_ca(DataDir, "root"),
+    gen_host_cert("intermediate1", "root", DataDir),
+    gen_host_cert("intermediate2", "root", DataDir),
+    gen_host_cert("server1", "intermediate1", DataDir),
+    gen_host_cert("client1", "intermediate1", DataDir),
+    gen_host_cert("server2", "intermediate2", DataDir),
+    gen_host_cert("client2", "intermediate2", DataDir),
+
+    %% Build bundles below
+    os:cmd(
+        io_lib:format("cat ~p ~p ~p > ~p", [
+            filename:join(DataDir, "client2.pem"),
+            filename:join(DataDir, "intermediate2.pem"),
+            filename:join(DataDir, "root.pem"),
+            filename:join(DataDir, "client2-complete-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "client2.pem"),
+            filename:join(DataDir, "intermediate2.pem"),
+            filename:join(DataDir, "client2-intermediate2-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "client2.pem"),
+            filename:join(DataDir, "root.pem"),
+            filename:join(DataDir, "client2-root-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "server1.pem"),
+            filename:join(DataDir, "intermediate1.pem"),
+            filename:join(DataDir, "server1-intermediate1-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "intermediate1.pem"),
+            filename:join(DataDir, "server1.pem"),
+            filename:join(DataDir, "intermediate1-server1-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "intermediate1_renewed.pem"),
+            filename:join(DataDir, "root.pem"),
+            filename:join(DataDir, "intermediate1_renewed-root-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "intermediate2.pem"),
+            filename:join(DataDir, "intermediate2_renewed.pem"),
+            filename:join(DataDir, "intermediate2_renewed_old-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "intermediate1.pem"),
+            filename:join(DataDir, "root.pem"),
+            filename:join(DataDir, "intermediate1-root-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p ~p > ~p", [
+            filename:join(DataDir, "root.pem"),
+            filename:join(DataDir, "intermediate2.pem"),
+            filename:join(DataDir, "intermediate1.pem"),
+            filename:join(DataDir, "all-CAcerts-bundle.pem")
+        ])
+    ),
+    os:cmd(
+        io_lib:format("cat ~p ~p > ~p", [
+            filename:join(DataDir, "intermediate2.pem"),
+            filename:join(DataDir, "intermediate1.pem"),
+            filename:join(DataDir, "two-intermediates-bundle.pem")
+        ])
+    ).

+ 20 - 0
apps/emqx_auth_ext/.gitignore

@@ -0,0 +1,20 @@
+.rebar3
+_build
+_checkouts
+_vendor
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+log
+erl_crash.dump
+.rebar
+logs
+.idea
+*.iml
+rebar3.crashdump
+*~

+ 94 - 0
apps/emqx_auth_ext/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2028-01-26
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 7 - 0
apps/emqx_auth_ext/README.md

@@ -0,0 +1,7 @@
+# EMQX Extended Auth Library 
+
+Library that extends EMQX authentication capbility for enterprise.
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 2 - 0
apps/emqx_auth_ext/rebar.config

@@ -0,0 +1,2 @@
+{erl_opts, [debug_info]}.
+{deps, [{emqx, {path, "../emqx"}}]}.

+ 21 - 0
apps/emqx_auth_ext/src/emqx_auth_ext.app.src

@@ -0,0 +1,21 @@
+{application, emqx_auth_ext, [
+    {description, "EMQX Extended Auth Library"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        ssl,
+        emqx
+    ]},
+    {env, []},
+    {modules, [
+        emqx_auth_ext,
+        emqx_auth_ext_schema,
+        emqx_auth_ext_tls_lib,
+        emqx_auth_ext_tls_const_v1
+    ]},
+
+    {licenses, ["Apache-2.0"]},
+    {links, []}
+]}.

+ 28 - 0
apps/emqx_auth_ext/src/emqx_auth_ext.erl

@@ -0,0 +1,28 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_auth_ext).
+
+-include_lib("emqx/include/emqx_schema.hrl").
+
+-on_load(on_load/0).
+
+-export([]).
+
+-spec on_load() -> ok.
+on_load() ->
+    init_ssl_fun_cb().
+
+init_ssl_fun_cb() ->
+    lists:foreach(
+        fun({FunName, {_, _, _} = MFA}) ->
+            persistent_term:put(
+                ?EMQX_SSL_FUN_MFA(FunName),
+                MFA
+            )
+        end,
+        [
+            {root_fun, {emqx_auth_ext_tls_lib, opt_partial_chain, []}},
+            {verify_fun, {emqx_auth_ext_tls_lib, opt_verify_fun, []}}
+        ]
+    ).

+ 42 - 0
apps/emqx_auth_ext/src/emqx_auth_ext_schema.erl

@@ -0,0 +1,42 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_auth_ext_schema).
+-behaviour(emqx_schema_hooks).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%%------------------------------------------------------------------------------
+%% emqx_schema_hooks callbacks
+%%------------------------------------------------------------------------------
+-export([injected_fields/0]).
+
+-spec injected_fields() -> #{emqx_schema_hooks:hookpoint() => [hocon_schema:field()]}.
+injected_fields() ->
+    #{
+        'common_ssl_opts_schema' => fields(auth_ext)
+    }.
+
+fields(auth_ext) ->
+    [
+        {"partial_chain",
+            sc(
+                hoconsc:enum([true, false, two_cacerts_from_cacertfile, cacert_from_cacertfile]),
+                #{
+                    required => false,
+                    desc => ?DESC(common_ssl_opts_schema_partial_chain)
+                }
+            )},
+        {"verify_peer_ext_key_usage",
+            sc(
+                string(),
+                #{
+                    required => false,
+                    desc => ?DESC(common_ssl_opts_verify_peer_ext_key_usage)
+                }
+            )}
+    ].
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).

+ 111 - 0
apps/emqx_auth_ext/src/emqx_auth_ext_tls_const_v1.erl

@@ -0,0 +1,111 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_auth_ext_tls_const_v1).
+-elvis([{elvis_style, atom_naming_convention, #{regex => "^([a-z][a-z0-9A-Z]*_?)*(_SUITE)?$"}}]).
+
+-export([
+    make_tls_root_fun/2,
+    make_tls_verify_fun/2
+]).
+
+-include_lib("public_key/include/public_key.hrl").
+%% @doc Build a root fun for verify TLS partial_chain.
+%% The `InputChain' is composed by OTP SSL with local cert store
+%% AND the cert (chain if any) from the client.
+%% @end
+make_tls_root_fun(cacert_from_cacertfile, [Trusted]) ->
+    %% Allow only one trusted ca cert, and just return the defined trusted CA cert,
+    fun(_InputChain) ->
+        %% Note, returing `trusted_ca` doesn't really mean it accepts the connection
+        %% OTP SSL app will do the path validation, signature validation subsequently.
+        {trusted_ca, Trusted}
+    end;
+make_tls_root_fun(cacert_from_cacertfile, [TrustedOne, TrustedTwo]) ->
+    %% Allow two trusted CA certs in case of CA cert renewal
+    %% This is a little expensive call as it compares the binaries.
+    fun(InputChain) ->
+        case lists:member(TrustedOne, InputChain) of
+            true ->
+                {trusted_ca, TrustedOne};
+            false ->
+                {trusted_ca, TrustedTwo}
+        end
+    end.
+
+make_tls_verify_fun(verify_cert_extKeyUsage, KeyUsages) ->
+    RequiredKeyUsages = ext_key_opts(KeyUsages),
+    {fun verify_fun_peer_extKeyUsage/3, RequiredKeyUsages}.
+
+verify_fun_peer_extKeyUsage(_, {bad_cert, invalid_ext_key_usage}, UserState) ->
+    %% !! Override OTP verify peer default
+    %% OTP SSL is unhappy with the ext_key_usage but we will check on our own.
+    {unknown, UserState};
+verify_fun_peer_extKeyUsage(_, {bad_cert, _} = Reason, _UserState) ->
+    %% OTP verify_peer default
+    {fail, Reason};
+verify_fun_peer_extKeyUsage(_, {extension, _}, UserState) ->
+    %% OTP verify_peer default
+    {unknown, UserState};
+verify_fun_peer_extKeyUsage(_, valid, UserState) ->
+    %% OTP verify_peer default
+    {valid, UserState};
+verify_fun_peer_extKeyUsage(
+    #'OTPCertificate'{tbsCertificate = #'OTPTBSCertificate'{extensions = ExtL}},
+    %% valid peer cert
+    valid_peer,
+    RequiredKeyUsages
+) ->
+    %% override OTP verify_peer default
+    %% must have id-ce-extKeyUsage
+    case lists:keyfind(?'id-ce-extKeyUsage', 2, ExtL) of
+        #'Extension'{extnID = ?'id-ce-extKeyUsage', extnValue = VL} ->
+            case do_verify_ext_key_usage(VL, RequiredKeyUsages) of
+                true ->
+                    %% pass the check,
+                    %% fallback to OTP verify_peer default
+                    {valid, RequiredKeyUsages};
+                false ->
+                    {fail, extKeyUsage_unmatched}
+            end;
+        _ ->
+            {fail, extKeyUsage_not_set}
+    end.
+
+%% @doc check required extkeyUsages are presented in the cert
+do_verify_ext_key_usage(_, []) ->
+    %% Verify finished
+    true;
+do_verify_ext_key_usage(CertExtL, [Usage | T] = _Required) ->
+    case lists:member(Usage, CertExtL) of
+        true ->
+            do_verify_ext_key_usage(CertExtL, T);
+        false ->
+            false
+    end.
+
+%% @doc Helper tls cert extension
+-spec ext_key_opts(string()) -> [OidString :: string() | public_key:oid()].
+ext_key_opts(Str) ->
+    Usages = string:tokens(Str, ","),
+    lists:map(
+        fun
+            ("clientAuth") ->
+                ?'id-kp-clientAuth';
+            ("serverAuth") ->
+                ?'id-kp-serverAuth';
+            ("codeSigning") ->
+                ?'id-kp-codeSigning';
+            ("emailProtection") ->
+                ?'id-kp-emailProtection';
+            ("timeStamping") ->
+                ?'id-kp-timeStamping';
+            ("ocspSigning") ->
+                ?'id-kp-OCSPSigning';
+            ("OID:" ++ OidStr) ->
+                OidList = string:tokens(OidStr, "."),
+                list_to_tuple(lists:map(fun list_to_integer/1, OidList))
+        end,
+        Usages
+    ).

+ 66 - 0
apps/emqx_auth_ext/src/emqx_auth_ext_tls_lib.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_auth_ext_tls_lib).
+-elvis([{elvis_style, atom_naming_convention, #{regex => "^([a-z][a-z0-9A-Z]*_?)*(_SUITE)?$"}}]).
+
+-export([
+    opt_partial_chain/1,
+    opt_verify_fun/1
+]).
+
+-include_lib("emqx/include/logger.hrl").
+
+-define(CONST_MOD_V1, emqx_auth_ext_tls_const_v1).
+%% @doc enable TLS partial_chain validation if set.
+-spec opt_partial_chain(SslOpts :: map()) -> NewSslOpts :: map().
+opt_partial_chain(#{partial_chain := false} = SslOpts) ->
+    maps:remove(partial_chain, SslOpts);
+opt_partial_chain(#{partial_chain := true} = SslOpts) ->
+    SslOpts#{partial_chain := rootfun_trusted_ca_from_cacertfile(1, SslOpts)};
+opt_partial_chain(#{partial_chain := cacert_from_cacertfile} = SslOpts) ->
+    SslOpts#{partial_chain := rootfun_trusted_ca_from_cacertfile(1, SslOpts)};
+opt_partial_chain(#{partial_chain := two_cacerts_from_cacertfile} = SslOpts) ->
+    SslOpts#{partial_chain := rootfun_trusted_ca_from_cacertfile(2, SslOpts)};
+opt_partial_chain(SslOpts) ->
+    SslOpts.
+
+%% @doc make verify_fun if set.
+-spec opt_verify_fun(SslOpts :: map()) -> NewSslOpts :: map().
+opt_verify_fun(#{verify_peer_ext_key_usage := V} = SslOpts) when V =/= undefined ->
+    SslOpts#{verify_fun => ?CONST_MOD_V1:make_tls_verify_fun(verify_cert_extKeyUsage, V)};
+opt_verify_fun(SslOpts) ->
+    SslOpts.
+
+%% @doc Helper, make TLS root_fun
+rootfun_trusted_ca_from_cacertfile(NumOfCerts, #{cacertfile := Cacertfile}) ->
+    case file:read_file(emqx_schema:naive_env_interpolation(Cacertfile)) of
+        {ok, PemBin} ->
+            try
+                do_rootfun_trusted_ca_from_cacertfile(NumOfCerts, PemBin)
+            catch
+                _Error:_Info:ST ->
+                    %% The cacertfile will be checked by OTP SSL as well and OTP choice to be silent on this.
+                    %% We are touching security sutffs, don't leak extra info..
+                    ?SLOG(error, #{
+                        msg => "trusted_cacert_not_found_in_cacertfile", stacktrace => ST
+                    }),
+                    throw({error, ?FUNCTION_NAME})
+            end;
+        {error, Reason} ->
+            throw({error, {read_cacertfile_error, Cacertfile, Reason}})
+    end;
+rootfun_trusted_ca_from_cacertfile(_NumOfCerts, _SslOpts) ->
+    throw({error, cacertfile_unset}).
+
+do_rootfun_trusted_ca_from_cacertfile(NumOfCerts, PemBin) ->
+    %% The last one or two should be the top parent in the chain if it is a chain
+    Certs = public_key:pem_decode(PemBin),
+    Pos = length(Certs) - NumOfCerts + 1,
+    Trusted = [
+        CADer
+     || {'Certificate', CADer, _} <-
+            lists:sublist(public_key:pem_decode(PemBin), Pos, NumOfCerts)
+    ],
+    ?CONST_MOD_V1:make_tls_root_fun(cacert_from_cacertfile, Trusted).

+ 247 - 0
apps/emqx_auth_ext/test/emqx_auth_ext_listener_tls_verify_chain_SUITE.erl

@@ -0,0 +1,247 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_auth_ext_listener_tls_verify_chain_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(
+    emqx_test_tls_certs_helper,
+    [
+        emqx_start_listener/4,
+        fail_when_ssl_error/1,
+        fail_when_no_ssl_alert/2,
+        generate_tls_certs/1,
+        select_free_port/1
+    ]
+).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    generate_tls_certs(Config),
+    application:ensure_all_started(esockd),
+    [{ssl_config, ssl_config_verify_peer()} | Config].
+
+end_per_suite(_Config) ->
+    application:stop(esockd).
+
+t_conn_fail_with_intermediate_ca_cert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+            {certfile, filename:join(DataDir, "server1.pem")},
+            {keyfile, filename:join(DataDir, "server1.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+
+    fail_when_no_ssl_alert(Socket, unknown_ca),
+    ok = ssl:close(Socket).
+
+t_conn_fail_with_other_intermediate_ca_cert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+            {certfile, filename:join(DataDir, "server1.pem")},
+            {keyfile, filename:join(DataDir, "server1.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+
+    fail_when_no_ssl_alert(Socket, unknown_ca),
+    ok = ssl:close(Socket).
+
+t_conn_success_with_server_client_composed_complete_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Server has root ca cert
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "root.pem")},
+            {certfile, filename:join(DataDir, "server2.pem")},
+            {keyfile, filename:join(DataDir, "server2.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    %% Client has complete chain
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_with_other_signed_client_composed_complete_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Server has root ca cert
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "root.pem")},
+            {certfile, filename:join(DataDir, "server1.pem")},
+            {keyfile, filename:join(DataDir, "server1.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    %% Client has partial_chain
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_with_renewed_intermediate_root_bundle(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Server has root ca cert
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "intermediate1_renewed-root-bundle.pem")},
+            {certfile, filename:join(DataDir, "server1.pem")},
+            {keyfile, filename:join(DataDir, "server1.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_with_client_complete_cert_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "root.pem")},
+            {certfile, filename:join(DataDir, "server2.pem")},
+            {keyfile, filename:join(DataDir, "server2.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_fail_with_server_partial_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% imcomplete at server side
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "intermediate2.pem")},
+            {certfile, filename:join(DataDir, "server2.pem")},
+            {keyfile, filename:join(DataDir, "server2.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")},
+            {versions, ['tlsv1.2']},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_without_root_cacert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options, [
+            {cacertfile, filename:join(DataDir, "intermediate2.pem")},
+            {certfile, filename:join(DataDir, "server2.pem")},
+            {keyfile, filename:join(DataDir, "server2.key")}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")},
+            %% stick to tlsv1.2 for consistent error message
+            {versions, ['tlsv1.2']},
+            {cacertfile, filename:join(DataDir, "intermediate2.pem")}
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+ssl_config_verify_peer() ->
+    [
+        {verify, verify_peer},
+        {fail_if_no_peer_cert, true}
+    ].

+ 362 - 0
apps/emqx_auth_ext/test/emqx_auth_ext_listener_tls_verify_keyusage_SUITE.erl

@@ -0,0 +1,362 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_auth_ext_listener_tls_verify_keyusage_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(
+    emqx_test_tls_certs_helper,
+    [
+        fail_when_ssl_error/1,
+        fail_when_no_ssl_alert/2,
+        generate_tls_certs/1,
+        gen_host_cert/4,
+        emqx_start_listener/4,
+        select_free_port/1
+    ]
+).
+
+all() ->
+    [
+        {group, full_chain},
+        {group, partial_chain}
+    ].
+
+all_tc() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [
+        {partial_chain, [], all_tc()},
+        {full_chain, [], all_tc()}
+    ].
+
+init_per_suite(Config) ->
+    generate_tls_certs(Config),
+    application:ensure_all_started(esockd),
+    Config.
+
+end_per_suite(_Config) ->
+    application:stop(esockd).
+
+init_per_group(full_chain, Config) ->
+    [{ssl_config, ssl_config_verify_peer_full_chain(Config)} | Config];
+init_per_group(partial_chain, Config) ->
+    [{ssl_config, ssl_config_verify_peer_partial_chain(Config)} | Config];
+init_per_group(_, Config) ->
+    Config.
+
+end_per_group(_, Config) ->
+    Config.
+
+t_conn_success_verify_peer_ext_key_usage_unset(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Given listener keyusage unset
+    Options = [{ssl_options, ?config(ssl_config, Config)}],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% when client connect with cert without keyusage ext
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection success
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_verify_peer_ext_key_usage_undefined(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Give listener keyusage is set to undefined
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, undefined}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% when client connect with cert without keyusages ext
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection success
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_verify_peer_ext_key_usage_matched_predefined(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Give listener keyusage is set to clientAuth
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, "clientAuth"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+
+    %% When client cert has clientAuth that is matched
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "clientAuth"),
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, client_key_file(DataDir, ?FUNCTION_NAME)},
+            {certfile, client_pem_file(DataDir, ?FUNCTION_NAME)},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection success
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_verify_peer_ext_key_usage_matched_raw_oid(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Give listener keyusage is set to raw OID
+
+    %% from OTP-PUB-KEY.hrl
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, "OID:1.3.6.1.5.5.7.3.2"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% When client cert has keyusage and matched.
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "clientAuth"),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, client_key_file(DataDir, ?FUNCTION_NAME)},
+            {certfile, client_pem_file(DataDir, ?FUNCTION_NAME)},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection success
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_verify_peer_ext_key_usage_matched_ordered_list(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+
+    %% Give listener keyusage is clientAuth,serverAuth
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, "clientAuth,serverAuth"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% When client cert has the same keyusage ext list
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "clientAuth,serverAuth"),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, client_key_file(DataDir, ?FUNCTION_NAME)},
+            {certfile, client_pem_file(DataDir, ?FUNCTION_NAME)},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection success
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_verify_peer_ext_key_usage_matched_unordered_list(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Give listener keyusage is clientAuth,serverAuth
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, "serverAuth,clientAuth"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% When client cert has the same keyusage ext list but different order
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "clientAuth,serverAuth"),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, client_key_file(DataDir, ?FUNCTION_NAME)},
+            {certfile, client_pem_file(DataDir, ?FUNCTION_NAME)},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection success
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_fail_verify_peer_ext_key_usage_unmatched_raw_oid(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Give listener keyusage is using OID
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, "OID:1.3.6.1.5.5.7.3.1"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+
+    %% When client cert has the keyusage but not matching OID
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "clientAuth"),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, client_key_file(DataDir, ?FUNCTION_NAME)},
+            {certfile, client_pem_file(DataDir, ?FUNCTION_NAME)},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+
+    %% Then connecion should fail.
+    fail_when_no_ssl_alert(Socket, handshake_failure),
+    ok = ssl:close(Socket).
+
+t_conn_fail_verify_peer_ext_key_usage_empty_str(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, ""}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    %% Give listener keyusage is empty string
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% When client connect with cert without keyusage
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connecion should fail.
+    fail_when_no_ssl_alert(Socket, handshake_failure),
+    ok = ssl:close(Socket).
+
+t_conn_fail_client_keyusage_unmatch(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+
+    %% Give listener keyusage is clientAuth
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage, "clientAuth"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% When client connect with mismatch cert keyusage = codeSigning
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "codeSigning"),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, client_key_file(DataDir, ?FUNCTION_NAME)},
+            {certfile, client_pem_file(DataDir, ?FUNCTION_NAME)},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connecion should fail.
+    fail_when_no_ssl_alert(Socket, handshake_failure),
+    ok = ssl:close(Socket).
+
+t_conn_fail_client_keyusage_incomplete(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% Give listener keyusage is codeSigning,clientAuth
+    Options = [
+        {ssl_options, [
+            {verify_peer_ext_key_usage,
+                "serverAuth,clientAuth,codeSigning,emailProtection,timeStamping,ocspSigning"}
+            | ?config(ssl_config, Config)
+        ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    %% When client connect with cert keyusage = clientAuth
+    gen_client_cert_ext_keyusage(?FUNCTION_NAME, "intermediate1", DataDir, "codeSigning"),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")},
+            {verify, verify_none}
+        ],
+        1000
+    ),
+    %% Then connection should fail
+    fail_when_no_ssl_alert(Socket, handshake_failure),
+    ok = ssl:close(Socket).
+
+%%%
+%%% Helpers
+%%%
+gen_client_cert_ext_keyusage(Name, CA, DataDir, Usage) when is_atom(Name) ->
+    gen_client_cert_ext_keyusage(atom_to_list(Name), CA, DataDir, Usage);
+gen_client_cert_ext_keyusage(Name, CA, DataDir, Usage) ->
+    gen_host_cert(Name, CA, DataDir, #{ext => "extendedKeyUsage=" ++ Usage}).
+
+client_key_file(DataDir, Name) ->
+    filename:join(DataDir, Name) ++ ".key".
+
+client_pem_file(DataDir, Name) ->
+    filename:join(DataDir, Name) ++ ".pem".
+
+ssl_config_verify_peer_full_chain(Config) ->
+    [
+        {cacertfile, filename:join(?config(data_dir, Config), "intermediate1-root-bundle.pem")}
+        | ssl_config_verify_peer(Config)
+    ].
+ssl_config_verify_peer_partial_chain(Config) ->
+    [
+        {cacertfile, filename:join(?config(data_dir, Config), "intermediate1.pem")},
+        {partial_chain, true}
+        | ssl_config_verify_peer(Config)
+    ].
+
+ssl_config_verify_peer(Config) ->
+    DataDir = ?config(data_dir, Config),
+    [
+        {verify, verify_peer},
+        {fail_if_no_peer_cert, true},
+        {keyfile, filename:join(DataDir, "server1.key")},
+        {certfile, filename:join(DataDir, "server1.pem")}
+        %% , {log_level, debug}
+    ].

+ 709 - 0
apps/emqx_auth_ext/test/emqx_auth_ext_listener_tls_verify_partial_chain_SUITE.erl

@@ -0,0 +1,709 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_auth_ext_listener_tls_verify_partial_chain_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(
+    emqx_test_tls_certs_helper,
+    [
+        emqx_start_listener/4,
+        fail_when_ssl_error/1,
+        fail_when_no_ssl_alert/2,
+        generate_tls_certs/1,
+        select_free_port/1
+    ]
+).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    generate_tls_certs(Config),
+    application:ensure_all_started(esockd),
+    [{ssl_config, ssl_config_verify_partial_chain()} | Config].
+
+end_per_suite(_Config) ->
+    application:stop(esockd).
+
+t_conn_success_with_server_intermediate_cacert_and_client_cert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+t_conn_success_with_intermediate_cacert_bundle(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "server1-intermediate1-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+t_conn_success_with_renewed_intermediate_cacert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1_renewed.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+t_conn_fail_with_renewed_intermediate_cacert_and_client_using_old_complete_bundle(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_with_renewed_intermediate_cacert_and_client_using_old_bundle(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_success_with_old_and_renewed_intermediate_cacert_and_client_provides_renewed_client_cert(
+    Config
+) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed_old-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")},
+                    {partial_chain, two_cacerts_from_cacertfile}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2_renewed.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+%% Note, this is good to have for usecase coverage
+t_conn_success_with_new_intermediate_cacert_and_client_provides_renewed_client_cert_signed_by_old_intermediate(
+    Config
+) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2_renewed.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+%% @doc server should build a partial_chain with old version of ca cert.
+t_conn_success_with_old_and_renewed_intermediate_cacert_and_client_provides_client_cert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed_old-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")},
+                    {partial_chain, two_cacerts_from_cacertfile}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+%% @doc verify when config does not allow two versions of certs from same trusted CA.
+t_conn_fail_with_renewed_and_old_intermediate_cacert_and_client_using_old_bundle(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed_old-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+%% @doc verify when config (two_cacerts_from_cacertfile) allows two versions of certs from same trusted CA.
+t_001_conn_success_with_old_and_renewed_intermediate_cacert_bundle_and_client_using_old_bundle(
+    Config
+) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed_old-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")},
+                    {partial_chain, two_cacerts_from_cacertfile}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ssl:close(Socket).
+
+%% @doc: verify even if listener has old/new intermediate2 certs,
+%%       client1 should not able to connect with old intermediate2 cert.
+%%  In this case, listener verify_fun returns {trusted_ca, Oldintermediate2Cert} but
+%%  OTP should still fail the validation since the client1 cert is not signed by
+%%  Oldintermediate2Cert (trusted CA cert).
+%% @end
+t_conn_fail_with_old_and_renewed_intermediate_cacert_bundle_and_client_using_all_CAcerts(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2_renewed_old-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")},
+                    {partial_chain, two_cacerts_from_cacertfile}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "all-CAcerts-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_with_renewed_intermediate_cacert_other_client(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1_renewed.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_with_intermediate_cacert_bundle_but_incorrect_order(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1-server1-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_when_singed_by_other_intermediate_ca(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_success_with_complete_chain_that_server_root_cacert_and_client_complete_cert_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "root.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_fail_with_other_client_complete_cert_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_with_server_intermediate_and_other_client_complete_cert_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1-root-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_success_with_server_intermediate_cacert_and_client_complete_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-complete-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_fail_with_server_intermediate_chain_and_client_other_incomplete_cert_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-intermediate2-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_with_server_intermediate_and_other_client_root_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-root-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_success_with_server_intermediate_and_client_root_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate2.pem")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-root-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+%% @doc once rootCA cert present in cacertfile, sibling CA signed Client cert could connect.
+t_conn_success_with_server_all_CA_bundle_and_client_root_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "all-CAcerts-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    {ok, Socket} = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-root-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_ssl_error(Socket),
+    ok = ssl:close(Socket).
+
+t_conn_fail_with_server_two_IA_bundle_and_client_root_chain(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "two-intermediates-bundle.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client2.key")},
+            {certfile, filename:join(DataDir, "client2-root-bundle.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_conn_fail_with_server_partial_chain_false_intermediate_cacert_and_client_cert(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "intermediate1.pem")},
+                    {certfile, filename:join(DataDir, "server1.pem")},
+                    {keyfile, filename:join(DataDir, "server1.key")},
+                    {partial_chain, false}
+                ]}
+    ],
+    emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options),
+    Res = ssl:connect(
+        {127, 0, 0, 1},
+        Port,
+        [
+            {keyfile, filename:join(DataDir, "client1.key")},
+            {certfile, filename:join(DataDir, "client1.pem")}
+            | client_default_tls_opts()
+        ],
+        1000
+    ),
+    fail_when_no_ssl_alert(Res, unknown_ca).
+
+t_error_handling_invalid_cacertfile(Config) ->
+    Port = select_free_port(ssl),
+    DataDir = ?config(data_dir, Config),
+    %% trigger error
+    Options = [
+        {ssl_options,
+            ?config(ssl_config, Config) ++
+                [
+                    {cacertfile, filename:join(DataDir, "server2.key")},
+                    {certfile, filename:join(DataDir, "server2.pem")},
+                    {keyfile, filename:join(DataDir, "server2.key")}
+                ]}
+    ],
+    ?assertException(
+        throw,
+        {error, rootfun_trusted_ca_from_cacertfile},
+        emqx_start_listener(?FUNCTION_NAME, ssl, Port, Options)
+    ).
+
+ssl_config_verify_partial_chain() ->
+    [
+        {verify, verify_peer},
+        {fail_if_no_peer_cert, true},
+        {partial_chain, true}
+    ].
+
+client_default_tls_opts() ->
+    [
+        {versions, ['tlsv1.2']},
+        {verify, verify_none}
+    ].

+ 66 - 0
apps/emqx_auth_ext/test/emqx_auth_ext_schema_SUITE.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_auth_ext_schema_SUITE).
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(BASE_CONF,
+    "\n"
+    "    listeners.ssl.auth_ext.bind = 28883\n"
+    "    listeners.ssl.auth_ext.enable = true\n"
+    "    listeners.ssl.auth_ext.ssl_options.partial_chain = true\n"
+    "    listeners.ssl.auth_ext.ssl_options.verify = verify_peer\n"
+    "    listeners.ssl.auth_ext.ssl_options.verify_peer_ext_key_usage = \"clientAuth\"\n"
+    "    "
+).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    %% injection happens when module is loaded.
+    code:load_file(emqx_auth_ext),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            {emqx_conf, ?BASE_CONF}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    emqx_listeners:restart(),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    code:delete(emqx_auth_ext),
+    code:purge(emqx_auth_ext),
+    ok.
+
+t_conf_check_default(_Config) ->
+    Opts = esockd:get_options({'ssl:default', {{0, 0, 0, 0}, 8883}}),
+    SSLOpts = proplists:get_value(ssl_options, Opts),
+    ?assertEqual(none, proplists:lookup(partial_chain, SSLOpts)),
+    ?assertEqual(none, proplists:lookup(verify_fun, SSLOpts)).
+
+t_conf_check_auth_ext(_Config) ->
+    Opts = esockd:get_options({'ssl:auth_ext', 28883}),
+    SSLOpts = proplists:get_value(ssl_options, Opts),
+    ?assertMatch(Fun when is_function(Fun), proplists:get_value(partial_chain, SSLOpts)),
+    ?assertMatch({Fun, _} when is_function(Fun), proplists:get_value(verify_fun, SSLOpts)).

+ 1 - 0
apps/emqx_auth_http/src/emqx_authz_http.erl

@@ -51,6 +51,7 @@
     ?VAR_ACTION,
     ?VAR_CERT_SUBJECT,
     ?VAR_CERT_CN_NAME,
+    ?VAR_CERT_PEM,
     ?VAR_ACCESS,
     ?VAR_NS_CLIENT_ATTRS
 ]).

+ 9 - 3
apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl

@@ -37,6 +37,7 @@
     protocol => mqtt,
     cert_subject => <<"cert_subject_data">>,
     cert_common_name => <<"cert_common_name_data">>,
+    cert_pem => <<"fake_raw_cert_to_be_base64_encoded">>,
     client_attrs => #{<<"group">> => <<"g1">>}
 }).
 
@@ -230,7 +231,8 @@ t_no_value_for_placeholder(_Config) ->
         {ok, RawBody, Req1} = cowboy_req:read_body(Req0),
         #{
             <<"cert_subject">> := <<"">>,
-            <<"cert_common_name">> := <<"">>
+            <<"cert_common_name">> := <<"">>,
+            <<"cert_pem">> := <<"">>
         } = emqx_utils_json:decode(RawBody, [return_maps]),
         Req = cowboy_req:reply(
             200,
@@ -246,7 +248,8 @@ t_no_value_for_placeholder(_Config) ->
         <<"headers">> => #{<<"content-type">> => <<"application/json">>},
         <<"body">> => #{
             <<"cert_subject">> => ?PH_CERT_SUBJECT,
-            <<"cert_common_name">> => ?PH_CERT_CN_NAME
+            <<"cert_common_name">> => ?PH_CERT_CN_NAME,
+            <<"cert_pem">> => ?PH_CERT_PEM
         }
     },
 
@@ -259,7 +262,7 @@ t_no_value_for_placeholder(_Config) ->
 
     ok = emqx_authn_http_test_server:set_handler(Handler),
 
-    Credentials = maps:without([cert_subject, cert_common_name], ?CREDENTIALS),
+    Credentials = maps:without([cert_subject, cert_common_name, cert_pem], ?CREDENTIALS),
 
     ?assertMatch({ok, _}, emqx_access_control:authenticate(Credentials)),
 
@@ -713,8 +716,10 @@ samples() ->
                     <<"peerhost">> := <<"127.0.0.1">>,
                     <<"cert_subject">> := <<"cert_subject_data">>,
                     <<"cert_common_name">> := <<"cert_common_name_data">>,
+                    <<"cert_pem">> := CertPem,
                     <<"the_group">> := <<"g1">>
                 } = emqx_utils_json:decode(RawBody, [return_maps]),
+                <<"fake_raw_cert_to_be_base64_encoded">> = base64:decode(CertPem),
                 Req = cowboy_req:reply(
                     200,
                     #{<<"content-type">> => <<"application/json">>},
@@ -733,6 +738,7 @@ samples() ->
                     <<"peerhost">> => ?PH_PEERHOST,
                     <<"cert_subject">> => ?PH_CERT_SUBJECT,
                     <<"cert_common_name">> => ?PH_CERT_CN_NAME,
+                    <<"cert_pem">> => ?PH_CERT_PEM,
                     <<"the_group">> => <<"${client_attrs.group}">>
                 }
             },

+ 2 - 1
apps/emqx_conf/include/emqx_conf.hrl

@@ -68,12 +68,13 @@
 
 -if(?EMQX_RELEASE_EDITION == ee).
 
+-define(AUTH_EXT_SCHEMA_MODS, [emqx_auth_ext_schema]).
 -define(AUTHZ_SOURCE_SCHEMA_MODS, ?CE_AUTHZ_SOURCE_SCHEMA_MODS ++ ?EE_AUTHZ_SOURCE_SCHEMA_MODS).
 -define(AUTHN_PROVIDER_SCHEMA_MODS,
     (?CE_AUTHN_PROVIDER_SCHEMA_MODS ++ ?EE_AUTHN_PROVIDER_SCHEMA_MODS)
 ).
 
--define(OTHER_INJECTING_CONFIGS, []).
+-define(OTHER_INJECTING_CONFIGS, ?AUTH_EXT_SCHEMA_MODS).
 
 -else.
 

+ 29 - 27
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -242,7 +242,7 @@ esockd_send(Data, #state{
 }) ->
     gen_udp:send(Sock, Ip, Port, Data);
 esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
-    esockd_transport:async_send(Sock, Data).
+    esockd_transport:send(Sock, Data).
 
 keepalive_stats(recv) ->
     emqx_pd:get_counter(recv_pkt);
@@ -503,18 +503,6 @@ handle_msg(
 ) ->
     Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
     with_channel(handle_deliver, [Delivers], State);
-%% Something sent
-%% TODO: Who will deliver this message?
-handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
-    case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
-        true ->
-            Pubs = emqx_pd:reset_counter(outgoing_pkt),
-            Bytes = emqx_pd:reset_counter(outgoing_bytes),
-            OutStats = #{cnt => Pubs, oct => Bytes},
-            {ok, check_oom(run_gc(OutStats, State))};
-        false ->
-            ok
-    end;
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
 handle_msg({close, Reason}, State) ->
@@ -630,8 +618,8 @@ handle_call(
             shutdown(Reason, Reply, State#state{channel = NChannel});
         {shutdown, Reason, Reply, Packet, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(Packet, NState),
-            shutdown(Reason, Reply, NState)
+            {ok, NState1} = handle_outgoing(Packet, NState),
+            shutdown(Reason, Reply, NState1)
     end.
 
 %%--------------------------------------------------------------------
@@ -772,15 +760,15 @@ with_channel(
             shutdown(Reason, State#state{channel = NChannel});
         {shutdown, Reason, Packet, NChannel} ->
             NState = State#state{channel = NChannel},
-            ok = handle_outgoing(Packet, NState),
-            shutdown(Reason, NState)
+            {ok, NState1} = handle_outgoing(Packet, NState),
+            shutdown(Reason, NState1)
     end.
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 
-handle_outgoing(_Packets = [], _State) ->
-    ok;
+handle_outgoing(_Packets = [], State) ->
+    {ok, State};
 handle_outgoing(
     Packets,
     State = #state{socket = Socket}
@@ -792,12 +780,15 @@ handle_outgoing(
                 State
             );
         _ ->
-            lists:foreach(
-                fun(Packet) ->
-                    handle_outgoing(Packet, State)
+            NState = lists:foldl(
+                fun(Packet, State0) ->
+                    {ok, State1} = handle_outgoing(Packet, State0),
+                    State1
                 end,
+                State,
                 Packets
-            )
+            ),
+            {ok, NState}
     end;
 handle_outgoing(Packet, State) ->
     send((serialize_and_inc_stats_fun(State))(Packet), State).
@@ -842,7 +833,7 @@ serialize_and_inc_stats_fun(#state{
 %%--------------------------------------------------------------------
 %% Send data
 
--spec send(iodata(), state()) -> ok.
+-spec send(iodata(), state()) -> {ok, state()}.
 send(
     IoData,
     State = #state{
@@ -858,11 +849,22 @@ send(
     inc_counter(outgoing_bytes, Oct),
     case esockd_send(IoData, State) of
         ok ->
-            ok;
+            sent(State);
         Error = {error, _Reason} ->
-            %% Send an inet_reply to postpone handling the error
+            %% Send an inet_reply to defer handling the error
             self() ! {inet_reply, Socket, Error},
-            ok
+            {ok, State}
+    end.
+
+sent(#state{active_n = ActiveN} = State) ->
+    case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
+        true ->
+            Pubs = emqx_pd:reset_counter(outgoing_pkt),
+            Bytes = emqx_pd:reset_counter(outgoing_bytes),
+            OutStats = #{cnt => Pubs, oct => Bytes},
+            {ok, check_oom(run_gc(OutStats, State))};
+        false ->
+            {ok, State}
     end.
 
 %%--------------------------------------------------------------------

+ 8 - 0
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -559,6 +559,8 @@ ssl_opts(Name, Opts) ->
         [
             fun ssl_opts_crl_config/2,
             fun ssl_opts_drop_unsupported/2,
+            fun ssl_partial_chain/2,
+            fun ssl_verify_fun/2,
             fun ssl_server_opts/2
         ],
         SSLOpts,
@@ -586,6 +588,12 @@ ssl_server_opts(SSLOpts, ssl_options) ->
 ssl_server_opts(SSLOpts, dtls_options) ->
     emqx_tls_lib:to_server_opts(dtls, SSLOpts).
 
+ssl_partial_chain(SSLOpts, _Options) ->
+    emqx_tls_lib:maybe_inject_ssl_fun(root_fun, SSLOpts).
+
+ssl_verify_fun(SSLOpts, _Options) ->
+    emqx_tls_lib:maybe_inject_ssl_fun(verify_fun, SSLOpts).
+
 ranch_opts(Type, ListenOn, Opts) ->
     NumAcceptors = maps:get(acceptors, Opts, 4),
     MaxConnections = maps:get(max_connections, Opts, 1024),

+ 2 - 1
apps/emqx_machine/priv/reboot_lists.eterm

@@ -132,7 +132,8 @@
             emqx_gateway_jt808,
             emqx_bridge_syskeeper,
             emqx_bridge_confluent,
-            emqx_ds_shared_sub
+            emqx_ds_shared_sub,
+            emqx_auth_ext
         ],
     %% must always be of type `load'
     ce_business_apps =>

+ 20 - 347
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -20,6 +20,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hocon_types.hrl").
+-include_lib("emqx/include/emqx_metrics.hrl").
 
 -import(hoconsc, [mk/2, ref/2]).
 
@@ -112,354 +113,26 @@ fields(node_metrics) ->
     [{node, mk(binary(), #{desc => <<"Node name">>})}] ++ properties().
 
 properties() ->
-    [
-        m(
-            'actions.failure',
-            <<"Number of failure executions of the rule engine action">>
-        ),
-        m(
-            'actions.success',
-            <<"Number of successful executions of the rule engine action">>
-        ),
-        m(
-            'bytes.received',
-            <<"Number of bytes received ">>
-        ),
-        m(
-            'bytes.sent',
-            <<"Number of bytes sent on this connection">>
-        ),
-        m(
-            'client.auth.anonymous',
-            <<"Number of clients who log in anonymously">>
-        ),
-        m(
-            'client.authenticate',
-            <<"Number of client authentications">>
-        ),
-        m(
-            'client.check_authz',
-            <<"Number of Authorization rule checks">>
-        ),
-        m(
-            'client.connack',
-            <<"Number of CONNACK packet sent">>
-        ),
-        m(
-            'client.connect',
-            <<"Number of client connections">>
-        ),
-        m(
-            'client.connected',
-            <<"Number of successful client connections">>
-        ),
-        m(
-            'client.disconnected',
-            <<"Number of client disconnects">>
-        ),
-        m(
-            'client.subscribe',
-            <<"Number of client subscriptions">>
-        ),
-        m(
-            'client.unsubscribe',
-            <<"Number of client unsubscriptions">>
-        ),
-        m(
-            'delivery.dropped',
-            <<"Total number of discarded messages when sending">>
-        ),
-        m(
-            'delivery.dropped.expired',
-            <<"Number of messages dropped due to message expiration on sending">>
-        ),
-        m(
-            'delivery.dropped.no_local',
-            <<
-                "Number of messages that were dropped due to the No Local subscription "
-                "option when sending"
-            >>
-        ),
-        m(
-            'delivery.dropped.qos0_msg',
-            <<
-                "Number of messages with QoS 0 that were dropped because the message "
-                "queue was full when sending"
-            >>
-        ),
-        m(
-            'delivery.dropped.queue_full',
-            <<
-                "Number of messages with a non-zero QoS that were dropped because the "
-                "message queue was full when sending"
-            >>
-        ),
-        m(
-            'delivery.dropped.too_large',
-            <<
-                "The number of messages that were dropped because the length exceeded "
-                "the limit when sending"
-            >>
-        ),
-        m(
-            'messages.acked',
-            <<"Number of received PUBACK and PUBREC packet">>
-        ),
-        m(
-            'messages.delayed',
-            <<"Number of delay-published messages">>
-        ),
-        m(
-            'messages.delivered',
-            <<"Number of messages forwarded to the subscription process internally">>
-        ),
-        m(
-            'messages.dropped',
-            <<"Total number of messages dropped before forwarding to the subscription process">>
-        ),
-        m(
-            'messages.dropped.await_pubrel_timeout',
-            <<"Number of messages dropped due to waiting PUBREL timeout">>
-        ),
-        m(
-            'messages.dropped.no_subscribers',
-            <<"Number of messages dropped due to no subscribers">>
-        ),
-        m(
-            'messages.forward',
-            <<"Number of messages forwarded to other nodes">>
-        ),
-        m(
-            'messages.publish',
-            <<"Number of messages published in addition to system messages">>
-        ),
-        m(
-            'messages.qos0.received',
-            <<"Number of QoS 0 messages received from clients">>
-        ),
-        m(
-            'messages.qos0.sent',
-            <<"Number of QoS 0 messages sent to clients">>
-        ),
-        m(
-            'messages.qos1.received',
-            <<"Number of QoS 1 messages received from clients">>
-        ),
-        m(
-            'messages.qos1.sent',
-            <<"Number of QoS 1 messages sent to clients">>
-        ),
-        m(
-            'messages.qos2.received',
-            <<"Number of QoS 2 messages received from clients">>
-        ),
-        m(
-            'messages.qos2.sent',
-            <<"Number of QoS 2 messages sent to clients">>
-        ),
-        m(
-            'messages.received',
-            <<
-                "Number of messages received from the client, equal to the sum of "
-                "messages.qos0.received\fmessages.qos1.received and messages.qos2.received"
-            >>
-        ),
-        %% m(
-        %%     'messages.retained',
-        %%     <<"Number of retained messages">>
-        %% ),
-        m(
-            'messages.sent',
-            <<
-                "Number of messages sent to the client, equal to the sum of "
-                "messages.qos0.sent\fmessages.qos1.sent and messages.qos2.sent"
-            >>
-        ),
-        m(
-            'packets.auth.received',
-            <<"Number of received AUTH packet">>
-        ),
-        m(
-            'packets.auth.sent',
-            <<"Number of sent AUTH packet">>
-        ),
-        m(
-            'packets.connack.auth_error',
-            <<"Number of received CONNECT packet with failed authentication">>
-        ),
-        m(
-            'packets.connack.error',
-            <<"Number of received CONNECT packet with unsuccessful connections">>
-        ),
-        m(
-            'packets.connack.sent',
-            <<"Number of sent CONNACK packet">>
-        ),
-        m(
-            'packets.connect.received',
-            <<"Number of received CONNECT packet">>
-        ),
-        m(
-            'packets.disconnect.received',
-            <<"Number of received DISCONNECT packet">>
-        ),
-        m(
-            'packets.disconnect.sent',
-            <<"Number of sent DISCONNECT packet">>
-        ),
-        m(
-            'packets.pingreq.received',
-            <<"Number of received PINGREQ packet">>
-        ),
-        m(
-            'packets.pingresp.sent',
-            <<"Number of sent PUBRESP packet">>
-        ),
-        m(
-            'packets.puback.inuse',
-            <<"Number of received PUBACK packet with occupied identifiers">>
-        ),
-        m(
-            'packets.puback.missed',
-            <<"Number of received packet with identifiers.">>
-        ),
-        m(
-            'packets.puback.received',
-            <<"Number of received PUBACK packet">>
-        ),
-        m(
-            'packets.puback.sent',
-            <<"Number of sent PUBACK packet">>
-        ),
-        m(
-            'packets.pubcomp.inuse',
-            <<"Number of received PUBCOMP packet with occupied identifiers">>
-        ),
-        m(
-            'packets.pubcomp.missed',
-            <<"Number of missed PUBCOMP packet">>
-        ),
-        m(
-            'packets.pubcomp.received',
-            <<"Number of received PUBCOMP packet">>
-        ),
-        m(
-            'packets.pubcomp.sent',
-            <<"Number of sent PUBCOMP packet">>
-        ),
-        m(
-            'packets.publish.auth_error',
-            <<"Number of received PUBLISH packets with failed the Authorization check">>
-        ),
-        m(
-            'packets.publish.dropped',
-            <<"Number of messages discarded due to the receiving limit">>
-        ),
-        m(
-            'packets.publish.error',
-            <<"Number of received PUBLISH packet that cannot be published">>
-        ),
-        m(
-            'packets.publish.inuse',
-            <<"Number of received PUBLISH packet with occupied identifiers">>
-        ),
-        m(
-            'packets.publish.received',
-            <<"Number of received PUBLISH packet">>
-        ),
-        m(
-            'packets.publish.sent',
-            <<"Number of sent PUBLISH packet">>
-        ),
-        m(
-            'packets.pubrec.inuse',
-            <<"Number of received PUBREC packet with occupied identifiers">>
-        ),
-        m(
-            'packets.pubrec.missed',
-            <<"Number of received PUBREC packet with unknown identifiers">>
-        ),
-        m(
-            'packets.pubrec.received',
-            <<"Number of received PUBREC packet">>
-        ),
-        m(
-            'packets.pubrec.sent',
-            <<"Number of sent PUBREC packet">>
-        ),
-        m(
-            'packets.pubrel.missed',
-            <<"Number of received PUBREC packet with unknown identifiers">>
-        ),
-        m(
-            'packets.pubrel.received',
-            <<"Number of received PUBREL packet">>
-        ),
-        m(
-            'packets.pubrel.sent',
-            <<"Number of sent PUBREL packet">>
-        ),
-        m(
-            'packets.received',
-            <<"Number of received packet">>
-        ),
-        m(
-            'packets.sent',
-            <<"Number of sent packet">>
-        ),
-        m(
-            'packets.suback.sent',
-            <<"Number of sent SUBACK packet">>
-        ),
-        m(
-            'packets.subscribe.auth_error',
-            <<"Number of received SUBACK packet with failed Authorization check">>
-        ),
-        m(
-            'packets.subscribe.error',
-            <<"Number of received SUBSCRIBE packet with failed subscriptions">>
-        ),
-        m(
-            'packets.subscribe.received',
-            <<"Number of received SUBSCRIBE packet">>
-        ),
-        m(
-            'packets.unsuback.sent',
-            <<"Number of sent UNSUBACK packet">>
-        ),
-        m(
-            'packets.unsubscribe.error',
-            <<"Number of received UNSUBSCRIBE packet with failed unsubscriptions">>
-        ),
-        m(
-            'packets.unsubscribe.received',
-            <<"Number of received UNSUBSCRIBE packet">>
-        ),
-        m(
-            'rules.matched',
-            <<"Number of rule matched">>
-        ),
-        m(
-            'session.created',
-            <<"Number of sessions created">>
-        ),
-        m(
-            'session.discarded',
-            <<"Number of sessions dropped because Clean Session or Clean Start is true">>
-        ),
-        m(
-            'session.resumed',
-            <<"Number of sessions resumed because Clean Session or Clean Start is false">>
-        ),
-        m(
-            'session.takenover',
-            <<"Number of sessions takenover because Clean Session or Clean Start is false">>
-        ),
-        m(
-            'session.terminated',
-            <<"Number of terminated sessions">>
+    Metrics = lists:append([
+        ?BYTES_METRICS,
+        ?PACKET_METRICS,
+        ?MESSAGE_METRICS,
+        ?DELIVERY_METRICS,
+        ?CLIENT_METRICS,
+        ?SESSION_METRICS,
+        ?STASTS_ACL_METRICS,
+        ?STASTS_AUTHN_METRICS,
+        ?OLP_METRICS
+    ]),
+    lists:reverse(
+        lists:foldl(
+            fun({_Type, MetricName, Desc}, Acc) ->
+                [m(MetricName, Desc) | Acc]
+            end,
+            [],
+            Metrics
         )
-    ].
+    ).
 
 m(K, Desc) ->
     {K, mk(non_neg_integer(), #{desc => Desc})}.

+ 1 - 1
apps/emqx_s3/src/emqx_s3.app.src

@@ -1,6 +1,6 @@
 {application, emqx_s3, [
     {description, "EMQX S3"},
-    {vsn, "5.1.0"},
+    {vsn, "5.1.1"},
     {modules, []},
     {registered, [emqx_s3_sup]},
     {applications, [

+ 2 - 2
apps/emqx_s3/src/emqx_s3_schema.erl

@@ -121,7 +121,7 @@ fields(s3_uploader) ->
                 #{
                     default => <<"5mb">>,
                     desc => ?DESC("min_part_size"),
-                    required => true,
+                    required => false,
                     validator => fun part_size_validator/1
                 }
             )},
@@ -131,7 +131,7 @@ fields(s3_uploader) ->
                 #{
                     default => <<"5gb">>,
                     desc => ?DESC("max_part_size"),
-                    required => true,
+                    required => false,
                     validator => fun part_size_validator/1
                 }
             )}

+ 1 - 0
changes/ce/feat-13180.en.md

@@ -0,0 +1 @@
+Improve client message handling performance when running on OTP 26.

+ 22 - 0
changes/ee/feat-13211.en.md

@@ -0,0 +1,22 @@
+Enhance TLS listener to support more flexible TLS verifications.
+
+- partial_chain support
+
+  If the option `partial_chain` is set to `true`, allow connections with incomplete certificate chains.
+  
+  Check the configuration manual document for more details.
+  
+- Certificate KeyUsage Validation
+
+  Added support for required Extended Key Usage defined in 
+  [rfc5280](https://www.rfc-editor.org/rfc/rfc5280#section-4.2.1.12).
+
+  Introduced a new option (`verify_peer_ext_key_usage`) to require specific key usages (like "serverAuth") 
+  in peer certificates during the TLS handshake.
+  This strengthens security by ensuring certificates are used for their intended purposes.
+
+  example:
+     "serverAuth,OID:1.3.6.1.5.5.7.3.2"
+    
+  Check the configuration manual document for more details.
+     

+ 3 - 1
mix.exs

@@ -101,6 +101,7 @@ defmodule EMQXUmbrella.MixProject do
       {:bcrypt, github: "emqx/erlang-bcrypt", tag: "0.6.2", override: true},
       {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
       {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
+      {:mimerl, "1.2.0", override: true},
       {:ra, "2.7.3", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
@@ -202,7 +203,8 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_gateway_ocpp,
       :emqx_gateway_jt808,
       :emqx_bridge_syskeeper,
-      :emqx_ds_shared_sub
+      :emqx_ds_shared_sub,
+      :emqx_auth_ext
     ])
   end
 

+ 1 - 0
rebar.config.erl

@@ -122,6 +122,7 @@ is_community_umbrella_app("apps/emqx_message_transformation") -> false;
 is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
 is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
 is_community_umbrella_app("apps/emqx_ds_shared_sub") -> false;
+is_community_umbrella_app("apps/emqx_auth_ext") -> false;
 is_community_umbrella_app(_) -> true.
 
 %% BUILD_WITHOUT_JQ

+ 46 - 0
rel/i18n/emqx_auth_ext_schema.hocon

@@ -0,0 +1,46 @@
+emqx_auth_ext_schema {
+
+common_ssl_opts_schema_partial_chain.desc:
+"""Enable or disable peer verification with partial_chain.
+When local verifies a peer certificate during the x509 path validation
+process, it constructs a certificate chain that starts with the peer
+certificate and ends with a trust anchor.
+By default, if it is set to `false`, the trust anchor is the
+Root CA, and the certificate chain must be complete.
+However, if the setting is set to `true` or `cacert_from_cacertfile`,
+the last certificate in `cacertfile` will be used as the trust anchor
+certificate (intermediate CA). This creates a partial chain
+in the path validation.
+Alternatively, if it is configured with `two_cacerts_from_cacertfile`,
+one of the last two certificates in `cacertfile` will be used as the
+trust anchor certificate, forming a partial chain. This option is
+particularly useful for intermediate CA certificate rotation.
+However, please note that it incurs some additional overhead, so it
+should only be used for certificate rotation purposes."""
+
+common_ssl_opts_schema_partial_chain.label:
+"""Partial chain"""
+
+common_ssl_opts_verify_peer_ext_key_usage.desc:
+"""Verify extended key usage in peer's certificate
+For additional peer certificate validation, the value defined here must present in the
+'Extended Key Usage' of peer certificate defined in
+[rfc5280](https://www.rfc-editor.org/rfc/rfc5280#section-4.2.1.12).
+
+Allowed values are
+- `clientAuth`
+- `serverAuth`
+- `codeSigning`
+- `emailProtection`
+- `timeStamping`
+- `ocspSigning`
+- raw OID, for example: "OID:1.3.6.1.5.5.7.3.2" means `id-pk 2` which is equivalent to `clientAuth`
+
+Comma-separated string is also supported for validating more than one key usages.
+
+For example, `"serverAuth,OID:1.3.6.1.5.5.7.3.2"`"""
+
+common_ssl_opts_verify_peer_ext_key_usage.label:
+"""Verify KeyUsage in cert"""
+
+}

+ 1 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -310,3 +310,4 @@ ElasticSearch
 doc_as_upsert
 upsert
 aliyun
+OID