Browse Source

Merge pull request #13583 from id/20240807-sync-release-branches

sync release branches
Ivan Dyachkov 1 year ago
parent
commit
bcd63344b8
100 changed files with 1188 additions and 369 deletions
  1. 1 1
      .ci/docker-compose-file/docker-compose-ldap.yaml
  2. 2 1
      .github/workflows/build_and_push_docker_images.yaml
  3. 1 0
      .github/workflows/build_packages_cron.yaml
  4. 1 0
      .github/workflows/codeql.yaml
  5. 1 0
      .github/workflows/green_master.yaml
  6. 2 2
      Makefile
  7. 1 0
      apps/emqx/include/emqx_mqtt.hrl
  8. 2 2
      apps/emqx/include/emqx_release.hrl
  9. 1 1
      apps/emqx/include/logger.hrl
  10. 1 1
      apps/emqx/rebar.config
  11. 1 1
      apps/emqx/src/emqx.app.src
  12. 79 30
      apps/emqx/src/emqx_channel.erl
  13. 8 1
      apps/emqx/src/emqx_connection.erl
  14. 74 27
      apps/emqx/src/emqx_frame.erl
  15. 15 6
      apps/emqx/src/emqx_listeners.erl
  16. 1 1
      apps/emqx/src/emqx_logger_jsonfmt.erl
  17. 19 5
      apps/emqx/src/emqx_logger_textfmt.erl
  18. 0 5
      apps/emqx/src/emqx_packet.erl
  19. 4 4
      apps/emqx/src/emqx_quic_connection.erl
  20. 8 0
      apps/emqx/src/emqx_tls_lib.erl
  21. 0 5
      apps/emqx/src/emqx_trace/emqx_trace_formatter.erl
  22. 11 2
      apps/emqx/src/emqx_ws_connection.erl
  23. 21 13
      apps/emqx/test/emqx_channel_SUITE.erl
  24. 105 2
      apps/emqx/test/emqx_crl_cache_SUITE.erl
  25. 44 10
      apps/emqx/test/emqx_frame_SUITE.erl
  26. 45 27
      apps/emqx/test/emqx_packet_SUITE.erl
  27. 1 1
      apps/emqx_auth/src/emqx_auth.app.src
  28. 7 1
      apps/emqx_auth/src/emqx_authz/emqx_authz.erl
  29. 72 0
      apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl
  30. 1 1
      apps/emqx_auth_http/src/emqx_auth_http.app.src
  31. 1 1
      apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src
  32. 1 1
      apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src
  33. 1 1
      apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src
  34. 1 1
      apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src
  35. 1 1
      apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src
  36. 1 1
      apps/emqx_auth_redis/src/emqx_auth_redis.app.src
  37. 1 1
      apps/emqx_bridge/src/emqx_bridge.app.src
  38. 1 1
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  39. 4 1
      apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
  40. 1 1
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
  41. 1 1
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  42. 1 1
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
  43. 3 2
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
  44. 1 1
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
  45. 70 12
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl
  46. 4 3
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl
  47. 32 24
      apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl
  48. 46 0
      apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl
  49. 1 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  50. 27 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl
  51. 17 9
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl
  52. 1 3
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl
  53. 7 2
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl
  54. 113 76
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl
  55. 1 1
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
  56. 1 1
      apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src
  57. 1 1
      apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src
  58. 1 1
      apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src
  59. 1 1
      apps/emqx_conf/src/emqx_conf.app.src
  60. 1 1
      apps/emqx_connector/src/emqx_connector.app.src
  61. 1 1
      apps/emqx_connector/src/emqx_connector_resource.erl
  62. 1 1
      apps/emqx_dashboard/src/emqx_dashboard.app.src
  63. 1 1
      apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src
  64. 1 1
      apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src
  65. 1 1
      apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src
  66. 1 1
      apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src
  67. 1 1
      apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src
  68. 1 1
      apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src
  69. 1 1
      apps/emqx_machine/src/emqx_machine.app.src
  70. 1 1
      apps/emqx_management/src/emqx_management.app.src
  71. 3 7
      apps/emqx_management/src/emqx_mgmt_app.erl
  72. 7 9
      apps/emqx_management/src/emqx_mgmt_auth.erl
  73. 22 2
      apps/emqx_management/src/emqx_mgmt_cli.erl
  74. 4 4
      apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl
  75. 5 0
      apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl
  76. 1 1
      apps/emqx_modules/src/emqx_modules.app.src
  77. 1 1
      apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src
  78. 1 1
      apps/emqx_plugins/src/emqx_plugins.app.src
  79. 27 7
      apps/emqx_plugins/src/emqx_plugins.erl
  80. 1 1
      apps/emqx_prometheus/src/emqx_prometheus.app.src
  81. 1 1
      apps/emqx_resource/src/emqx_resource.app.src
  82. 11 1
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl
  83. 1 1
      apps/emqx_schema_registry/include/emqx_schema_registry.hrl
  84. 11 5
      apps/emqx_schema_registry/src/emqx_schema_registry.erl
  85. 39 10
      apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
  86. 60 0
      apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl
  87. 1 1
      apps/emqx_utils/src/emqx_utils.app.src
  88. 1 0
      changes/ce/feat-13524.en.md
  89. 1 0
      changes/ce/feat-13534.en.md
  90. 4 0
      changes/ce/fix-13357.en.md
  91. 1 0
      changes/ce/fix-13425.en.md
  92. 1 0
      changes/ce/fix-13541.en.md
  93. 8 0
      changes/ce/fix-13552.en.md
  94. 1 0
      changes/ee/feat-13546.en.md
  95. 1 0
      changes/ee/fix-13543.en.md
  96. 87 0
      changes/v5.7.2.en.md
  97. 2 2
      deploy/charts/emqx-enterprise/Chart.yaml
  98. 2 2
      deploy/charts/emqx/Chart.yaml
  99. 1 1
      mix.exs
  100. 0 0
      rebar.config

+ 1 - 1
.ci/docker-compose-file/docker-compose-ldap.yaml

@@ -10,7 +10,7 @@ services:
       nofile: 1024
     image: openldap
     #ports:
-    #  - 389:389
+    #  - "389:389"
     volumes:
       - ./certs/ca.crt:/etc/certs/ca.crt
     restart: always

+ 2 - 1
.github/workflows/build_and_push_docker_images.yaml

@@ -122,9 +122,10 @@ jobs:
         run: |
           ls -lR _packages/$PROFILE
           mv _packages/$PROFILE/*.tar.gz ./
+
       - name: Enable containerd image store on Docker Engine
         run: |
-          echo "$(jq '. += {"features": {"containerd-snapshotter": true}}' /etc/docker/daemon.json)" > daemon.json
+          echo "$(sudo cat /etc/docker/daemon.json | jq '. += {"features": {"containerd-snapshotter": true}}')" > daemon.json
           sudo mv daemon.json /etc/docker/daemon.json
           sudo systemctl restart docker
 

+ 1 - 0
.github/workflows/build_packages_cron.yaml

@@ -23,6 +23,7 @@ jobs:
         profile:
           - ['emqx', 'master']
           - ['emqx', 'release-57']
+          - ['emqx', 'release-58']
         os:
           - ubuntu22.04
           - amzn2023

+ 1 - 0
.github/workflows/codeql.yaml

@@ -24,6 +24,7 @@ jobs:
         branch:
           - master
           - release-57
+          - release-58
         language:
           - cpp
           - python

+ 1 - 0
.github/workflows/green_master.yaml

@@ -24,6 +24,7 @@ jobs:
         ref:
           - master
           - release-57
+          - release-58
     steps:
       - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
         with:

+ 2 - 2
Makefile

@@ -10,8 +10,8 @@ include env.sh
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.9.1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1
+export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1
+export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1
 
 export EMQX_RELUP ?= true
 export EMQX_REL_FORM ?= tgz

+ 1 - 0
apps/emqx/include/emqx_mqtt.hrl

@@ -683,6 +683,7 @@ end).
 
 -define(FRAME_PARSE_ERROR, frame_parse_error).
 -define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
+
 -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
 -define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).
 

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.7.1").
+-define(EMQX_RELEASE_CE, "5.8.0-alpha.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.7.1").
+-define(EMQX_RELEASE_EE, "5.8.0-alpha.1").

+ 1 - 1
apps/emqx/include/logger.hrl

@@ -91,7 +91,7 @@
     ?_DO_TRACE(Tag, Msg, Meta),
     ?SLOG(
         Level,
-        (emqx_trace_formatter:format_meta_map(Meta))#{msg => Msg, tag => Tag},
+        (Meta)#{msg => Msg, tag => Tag},
         #{is_trace => false}
     )
 end).

+ 1 - 1
apps/emqx/rebar.config

@@ -28,7 +28,7 @@
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
-    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
+    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -2,7 +2,7 @@
 {application, emqx, [
     {id, "emqx"},
     {description, "EMQX Core"},
-    {vsn, "5.3.3"},
+    {vsn, "5.3.4"},
     {modules, []},
     {registered, []},
     {applications, [

+ 79 - 30
apps/emqx/src/emqx_channel.erl

@@ -146,7 +146,9 @@
 -type replies() :: emqx_types:packet() | reply() | [reply()].
 
 -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-
+-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState),
+    ((ConnState == connected) orelse (ConnState == reauthenticating))
+).
 -define(IS_COMMON_SESSION_TIMER(N),
     ((N == retry_delivery) orelse (N == expire_awaiting_rel))
 ).
@@ -337,7 +339,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
     | {shutdown, Reason :: term(), channel()}
     | {shutdown, Reason :: term(), replies(), channel()}.
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
+    ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
 ->
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
@@ -567,29 +569,8 @@ handle_in(
     process_disconnect(ReasonCode, Properties, NChannel);
 handle_in(?AUTH_PACKET(), Channel) ->
     handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
-    shutdown(shutdown_count(frame_error, Reason), Channel);
-handle_in(
-    {frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
-) ->
-    shutdown(
-        shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
-    );
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
-    shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
-handle_in(
-    {frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
-) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
-->
-    handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
-->
-    handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
-handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
-    ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
-    {ok, Channel};
+handle_in({frame_error, Reason}, Channel) ->
+    handle_frame_error(Reason, Channel);
 handle_in(Packet, Channel) ->
     ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
@@ -1021,6 +1002,68 @@ not_nacked({deliver, _Topic, Msg}) ->
             true
     end.
 
+%%--------------------------------------------------------------------
+%% Handle Frame Error
+%%--------------------------------------------------------------------
+
+handle_frame_error(
+    Reason = #{cause := frame_too_large},
+    Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
+) when
+    ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
+->
+    ShutdownCount = shutdown_count(frame_error, Reason),
+    case proto_ver(Reason, ConnInfo) of
+        ?MQTT_PROTO_V5 ->
+            handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
+        _ ->
+            shutdown(ShutdownCount, Channel)
+    end;
+%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting,
+%% otherwise DONOT send any CONNACK or DISCONNECT packet.
+handle_frame_error(
+    Reason,
+    Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
+) when
+    is_map(Reason) andalso
+        (ConnState == idle orelse ConnState == connecting)
+->
+    ShutdownCount = shutdown_count(frame_error, Reason),
+    ProtoVer = proto_ver(Reason, ConnInfo),
+    NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}},
+    case ProtoVer of
+        ?MQTT_PROTO_V5 ->
+            shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel);
+        _ ->
+            shutdown(ShutdownCount, NChannel)
+    end;
+handle_frame_error(
+    Reason,
+    Channel = #channel{conn_state = connecting}
+) ->
+    shutdown(
+        shutdown_count(frame_error, Reason),
+        ?CONNACK_PACKET(?RC_MALFORMED_PACKET),
+        Channel
+    );
+handle_frame_error(
+    Reason,
+    Channel = #channel{conn_state = ConnState}
+) when
+    ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
+->
+    handle_out(
+        disconnect,
+        {?RC_MALFORMED_PACKET, Reason},
+        Channel
+    );
+handle_frame_error(
+    Reason,
+    Channel = #channel{conn_state = disconnected}
+) ->
+    ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
+    {ok, Channel}.
+
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
 %%--------------------------------------------------------------------
@@ -1289,7 +1332,7 @@ handle_info(
             session = Session
         }
 ) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
+    ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
 ->
     {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
     Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
@@ -2636,8 +2679,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
     NAliases = maps:put(Topic, AliasId, Aliases),
     TopicAliases#{outbound => NAliases}.
 
--compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
-
+-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
 reply(Reply, Channel) ->
     {reply, Reply, Channel}.
 
@@ -2673,13 +2715,13 @@ disconnect_and_shutdown(
         ?IS_MQTT_V5 =
         #channel{conn_state = ConnState}
 ) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
+    ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
 ->
     NChannel = ensure_disconnected(Reason, Channel),
     shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
 %% mqtt v3/v4 connected sessions
 disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
-    ConnState =:= connected orelse ConnState =:= reauthenticating
+    ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
 ->
     NChannel = ensure_disconnected(Reason, Channel),
     shutdown(Reason, Reply, NChannel);
@@ -2722,6 +2764,13 @@ is_durable_session(#channel{session = Session}) ->
             false
     end.
 
+proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) ->
+    ProtoVer;
+proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
+    ProtoVer;
+proto_ver(_, _) ->
+    ?MQTT_PROTO_V4.
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 8 - 1
apps/emqx/src/emqx_connection.erl

@@ -783,7 +783,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
                 input_bytes => Data,
                 parsed_packets => Packets
             }),
-            {[{frame_error, Reason} | Packets], State};
+            NState = enrich_state(Reason, State),
+            {[{frame_error, Reason} | Packets], NState};
         error:Reason:Stacktrace ->
             ?LOG(error, #{
                 at_state => emqx_frame:describe_state(ParseState),
@@ -1227,6 +1228,12 @@ inc_counter(Key, Inc) ->
     _ = emqx_pd:inc_counter(Key, Inc),
     ok.
 
+enrich_state(#{parse_state := NParseState}, State) ->
+    Serialize = emqx_frame:serialize_opts(NParseState),
+    State#state{parse_state = NParseState, serialize = Serialize};
+enrich_state(_, State) ->
+    State.
+
 set_tcp_keepalive({quic, _Listener}) ->
     ok;
 set_tcp_keepalive({Type, Id}) ->

+ 74 - 27
apps/emqx/src/emqx_frame.erl

@@ -267,28 +267,50 @@ packet(Header, Variable) ->
 packet(Header, Variable, Payload) ->
     #mqtt_packet{header = Header, variable = Variable, payload = Payload}.
 
-parse_connect(FrameBin, StrictMode) ->
-    {ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
-    case ProtoName of
-        <<"MQTT">> ->
-            ok;
-        <<"MQIsdp">> ->
-            ok;
-        _ ->
-            %% from spec: the server MAY send disconnect with reason code 0x84
-            %% we chose to close socket because the client is likely not talking MQTT anyway
-            ?PARSE_ERR(#{
-                cause => invalid_proto_name,
-                expected => <<"'MQTT' or 'MQIsdp'">>,
-                received => ProtoName
-            })
-    end,
-    parse_connect2(ProtoName, Rest, StrictMode).
+parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) ->
+    {ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
+    %% No need to parse and check proto_ver if proto_name is invalid, check it first
+    %% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2`
+    _ = validate_proto_name(ProtoName),
+    {IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0),
+    NOptions = Options#{version => ProtoVer},
+    try
+        do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode)
+    catch
+        throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) ->
+            ?PARSE_ERR(
+                ReasonM#{
+                    proto_ver => ProtoVer,
+                    proto_name => ProtoName,
+                    parse_state => ?NONE(NOptions)
+                }
+            );
+        throw:{?FRAME_PARSE_ERROR, Reason} ->
+            ?PARSE_ERR(
+                #{
+                    cause => Reason,
+                    proto_ver => ProtoVer,
+                    proto_name => ProtoName,
+                    parse_state => ?NONE(NOptions)
+                }
+            )
+    end.
 
-parse_connect2(
+do_parse_connect(
     ProtoName,
-    <<BridgeTag:4, ProtoVer:4, UsernameFlagB:1, PasswordFlagB:1, WillRetainB:1, WillQoS:2,
-        WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
+    IsBridge,
+    ProtoVer,
+    <<
+        UsernameFlagB:1,
+        PasswordFlagB:1,
+        WillRetainB:1,
+        WillQoS:2,
+        WillFlagB:1,
+        CleanStart:1,
+        Reserved:1,
+        KeepAlive:16/big,
+        Rest/binary
+    >>,
     StrictMode
 ) ->
     _ = validate_connect_reserved(Reserved),
@@ -303,14 +325,14 @@ parse_connect2(
         UsernameFlag = bool(UsernameFlagB),
         PasswordFlag = bool(PasswordFlagB)
     ),
-    {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
+    {Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode),
     {ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
     ConnPacket = #mqtt_packet_connect{
         proto_name = ProtoName,
         proto_ver = ProtoVer,
         %% For bridge mode, non-standard implementation
         %% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
-        is_bridge = (BridgeTag =:= 8),
+        is_bridge = IsBridge,
         clean_start = bool(CleanStart),
         will_flag = WillFlag,
         will_qos = WillQoS,
@@ -343,16 +365,16 @@ parse_connect2(
                 unexpected_trailing_bytes => size(Rest7)
             })
     end;
-parse_connect2(_ProtoName, Bin, _StrictMode) ->
-    %% sent less than 32 bytes
+do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) ->
+    %% sent less than 24 bytes
     ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
 
 parse_packet(
     #mqtt_packet_header{type = ?CONNECT},
     FrameBin,
-    #{strict_mode := StrictMode}
+    Options
 ) ->
-    parse_connect(FrameBin, StrictMode);
+    parse_connect(FrameBin, Options);
 parse_packet(
     #mqtt_packet_header{type = ?CONNACK},
     <<AckFlags:8, ReasonCode:8, Rest/binary>>,
@@ -516,6 +538,12 @@ parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
 parse_packet_id(_) ->
     ?PARSE_ERR(invalid_packet_id).
 
+parse_connect_proto_ver(<<BridgeTag:4, ProtoVer:4, Rest/binary>>) ->
+    {_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest};
+parse_connect_proto_ver(Bin) ->
+    %% sent less than 1 bytes or empty
+    ?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
+
 parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
     {#{}, Bin};
 %% TODO: version mess?
@@ -739,6 +767,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode})
 initial_serialize_opts(Opts) ->
     maps:merge(?DEFAULT_OPTIONS, Opts).
 
+serialize_opts(?NONE(Options)) ->
+    maps:merge(?DEFAULT_OPTIONS, Options);
 serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
     MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
     #{version => ProtoVer, max_size => MaxSize, strict_mode => false}.
@@ -1157,18 +1187,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos);
 validate_subqos([_ | T]) -> validate_subqos(T);
 validate_subqos([]) -> ok.
 
+%% from spec: the server MAY send disconnect with reason code 0x84
+%% we chose to close socket because the client is likely not talking MQTT anyway
+validate_proto_name(<<"MQTT">>) ->
+    ok;
+validate_proto_name(<<"MQIsdp">>) ->
+    ok;
+validate_proto_name(ProtoName) ->
+    ?PARSE_ERR(#{
+        cause => invalid_proto_name,
+        expected => <<"'MQTT' or 'MQIsdp'">>,
+        received => ProtoName
+    }).
+
 %% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3]
+-compile({inline, [validate_connect_reserved/1]}).
 validate_connect_reserved(0) -> ok;
 validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
 
+-compile({inline, [validate_connect_will/3]}).
 %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
-validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos);
+validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
 %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
 validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
 %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
 validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain);
 validate_connect_will(_, _, _) -> ok.
 
+-compile({inline, [validate_connect_password_flag/4]}).
 %% MQTT-v3.1
 %% Username flag and password flag are not strongly related
 %% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
@@ -1183,6 +1229,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) ->
 validate_connect_password_flag(_, _, _, _) ->
     ok.
 
+-compile({inline, [bool/1]}).
 bool(0) -> false;
 bool(1) -> true.
 

+ 15 - 6
apps/emqx/src/emqx_listeners.erl

@@ -432,7 +432,7 @@ do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTE
     esockd:open(
         Id,
         ListenOn,
-        merge_default(esockd_opts(Id, Type, Name, Opts))
+        merge_default(esockd_opts(Id, Type, Name, Opts, _OldOpts = undefined))
     );
 %% Start MQTT/WS listener
 do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
@@ -476,7 +476,7 @@ do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
     Id = listener_id(Type, Name),
     case maps:get(bind, OldConf) of
         ListenOn ->
-            esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
+            esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf, OldConf));
         _Different ->
             %% TODO
             %% Again, we're not strictly required to drop live connections in this case.
@@ -588,7 +588,7 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
 perform_listener_change(stop, {Type, Name, Conf}) ->
     stop_listener(Type, Name, Conf).
 
-esockd_opts(ListenerId, Type, Name, Opts0) ->
+esockd_opts(ListenerId, Type, Name, Opts0, OldOpts) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Limiter = limiter(Opts0),
     Opts2 =
@@ -620,7 +620,7 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
             tcp ->
                 Opts3#{tcp_options => tcp_opts(Opts0)};
             ssl ->
-                OptsWithCRL = inject_crl_config(Opts0),
+                OptsWithCRL = inject_crl_config(Opts0, OldOpts),
                 OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL),
                 OptsWithRootFun = inject_root_fun(OptsWithSNI),
                 OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun),
@@ -996,7 +996,7 @@ inject_sni_fun(_ListenerId, Conf) ->
     Conf.
 
 inject_crl_config(
-    Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}
+    Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}, _OldOpts
 ) ->
     HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
     Conf#{
@@ -1006,7 +1006,16 @@ inject_crl_config(
             crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
         }
     };
-inject_crl_config(Conf) ->
+inject_crl_config(#{ssl_options := SSLOpts0} = Conf0, #{} = OldOpts) ->
+    %% Note: we must set crl options to `undefined' to unset them.  Otherwise,
+    %% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
+    %% options were previously enabled.
+    WasEnabled = emqx_utils_maps:deep_get([ssl_options, enable_crl_check], OldOpts, false),
+    Undefine = fun(Acc, K) -> emqx_utils_maps:put_if(Acc, K, undefined, WasEnabled) end,
+    SSLOpts1 = Undefine(SSLOpts0, crl_check),
+    SSLOpts = Undefine(SSLOpts1, crl_cache),
+    Conf0#{ssl_options := SSLOpts};
+inject_crl_config(Conf, undefined = _OldOpts) ->
     Conf.
 
 maybe_unregister_ocsp_stapling_refresh(

+ 1 - 1
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -105,7 +105,7 @@ format(Msg, Meta, Config) ->
 maybe_format_msg(undefined, _Meta, _Config) ->
     #{};
 maybe_format_msg({report, Report0} = Msg, #{report_cb := Cb} = Meta, Config) ->
-    Report = emqx_logger_textfmt:try_encode_payload(Report0, Config),
+    Report = emqx_logger_textfmt:try_encode_meta(Report0, Config),
     case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
         true ->
             %% reporting a map without a customised format function

+ 19 - 5
apps/emqx/src/emqx_logger_textfmt.erl

@@ -20,7 +20,7 @@
 
 -export([format/2]).
 -export([check_config/1]).
--export([try_format_unicode/1, try_encode_payload/2]).
+-export([try_format_unicode/1, try_encode_meta/2]).
 %% Used in the other log formatters
 -export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]).
 
@@ -111,7 +111,7 @@ is_list_report_acceptable(_) ->
 enrich_report(ReportRaw0, Meta, Config) ->
     %% clientid and peername always in emqx_conn's process metadata.
     %% topic and username can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
-    ReportRaw = try_encode_payload(ReportRaw0, Config),
+    ReportRaw = try_encode_meta(ReportRaw0, Config),
     Topic =
         case maps:get(topic, Meta, undefined) of
             undefined -> maps:get(topic, ReportRaw, undefined);
@@ -180,9 +180,22 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
 enrich_topic(Msg, _) ->
     Msg.
 
-try_encode_payload(#{payload := Payload} = Report, #{payload_encode := Encode}) ->
+try_encode_meta(Report, Config) ->
+    lists:foldl(
+        fun(Meta, Acc) ->
+            try_encode_meta(Meta, Acc, Config)
+        end,
+        Report,
+        [payload, packet]
+    ).
+
+try_encode_meta(payload, #{payload := Payload} = Report, #{payload_encode := Encode}) ->
     Report#{payload := encode_payload(Payload, Encode)};
-try_encode_payload(Report, _Config) ->
+try_encode_meta(packet, #{packet := Packet} = Report, #{payload_encode := Encode}) when
+    is_tuple(Packet)
+->
+    Report#{packet := emqx_packet:format(Packet, Encode)};
+try_encode_meta(_, Report, _Config) ->
     Report.
 
 encode_payload(Payload, text) ->
@@ -190,4 +203,5 @@ encode_payload(Payload, text) ->
 encode_payload(_Payload, hidden) ->
     "******";
 encode_payload(Payload, hex) ->
-    binary:encode_hex(Payload).
+    Bin = emqx_utils_conv:bin(Payload),
+    binary:encode_hex(Bin).

+ 0 - 5
apps/emqx/src/emqx_packet.erl

@@ -51,7 +51,6 @@
 ]).
 
 -export([
-    format/1,
     format/2
 ]).
 
@@ -481,10 +480,6 @@ will_msg(#mqtt_packet_connect{
         headers = #{username => Username, properties => Props}
     }.
 
-%% @doc Format packet
--spec format(emqx_types:packet()) -> iolist().
-format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
-
 %% @doc Format packet
 -spec format(emqx_types:packet(), hex | text | hidden) -> iolist().
 format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->

+ 4 - 4
apps/emqx/src/emqx_quic_connection.erl

@@ -62,7 +62,7 @@
     streams := [{pid(), quicer:stream_handle()}],
     %% New stream opts
     stream_opts := map(),
-    %% If conneciton is resumed from session ticket
+    %% If connection is resumed from session ticket
     is_resumed => boolean(),
     %% mqtt message serializer config
     serialize => undefined,
@@ -70,8 +70,8 @@
 }.
 -type cb_ret() :: quicer_lib:cb_ret().
 
-%% @doc  Data streams initializions are started in parallel with control streams, data streams are blocked
-%%       for the activation from control stream after it is accepted as a legit conneciton.
+%% @doc  Data streams initializations are started in parallel with control streams, data streams are blocked
+%%       for the activation from control stream after it is accepted as a legit connection.
 %%       For security, the initial number of allowed data streams from client should be limited by
 %%       'peer_bidi_stream_count` & 'peer_unidi_stream_count`
 -spec activate_data_streams(pid(), {
@@ -80,7 +80,7 @@
 activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
     gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
 
-%% @doc conneciton owner init callback
+%% @doc connection owner init callback
 -spec init(map()) -> {ok, cb_state()}.
 init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
     init(S#{stream_opts := maps:from_list(SOpts)});

+ 8 - 0
apps/emqx/src/emqx_tls_lib.erl

@@ -589,6 +589,14 @@ ensure_valid_options(Options, Versions) ->
 
 ensure_valid_options([], _, Acc) ->
     lists:reverse(Acc);
+ensure_valid_options([{K, undefined} | T], Versions, Acc) when
+    K =:= crl_check;
+    K =:= crl_cache
+->
+    %% Note: we must set crl options to `undefined' to unset them.  Otherwise,
+    %% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
+    %% options were previously enabled.
+    ensure_valid_options(T, Versions, [{K, undefined} | Acc]);
 ensure_valid_options([{_, undefined} | T], Versions, Acc) ->
     ensure_valid_options(T, Versions, Acc);
 ensure_valid_options([{_, ""} | T], Versions, Acc) ->

+ 0 - 5
apps/emqx/src/emqx_trace/emqx_trace_formatter.erl

@@ -17,7 +17,6 @@
 -include("emqx_mqtt.hrl").
 
 -export([format/2]).
--export([format_meta_map/1]).
 
 %% logger_formatter:config/0 is not exported.
 -type config() :: map().
@@ -43,10 +42,6 @@ format(
 format(Event, Config) ->
     emqx_logger_textfmt:format(Event, Config).
 
-format_meta_map(Meta) ->
-    Encode = emqx_trace_handler:payload_encode(),
-    format_meta_map(Meta, Encode).
-
 format_meta_map(Meta, Encode) ->
     format_meta_map(Meta, Encode, [
         {packet, fun format_packet/2},

+ 11 - 2
apps/emqx/src/emqx_ws_connection.erl

@@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) ->
     %% TODO: should not close the ws connection
     ?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
     shutdown(unexpected_ws_frame, State).
+
 websocket_info({call, From, Req}, State) ->
     handle_call(From, Req, State);
 websocket_info({cast, rate_limit}, State) ->
@@ -737,7 +738,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
                 input_bytes => Data
             }),
             FrameError = {frame_error, Reason},
-            {[{incoming, FrameError} | Packets], State};
+            NState = enrich_state(Reason, State),
+            {[{incoming, FrameError} | Packets], NState};
         error:Reason:Stacktrace ->
             ?LOG(error, #{
                 at_state => emqx_frame:describe_state(ParseState),
@@ -830,7 +832,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
                 ?LOG(warning, #{
                     msg => "packet_discarded",
                     reason => "frame_too_large",
-                    packet => emqx_packet:format(Packet)
+                    packet => Packet
                 }),
                 ok = emqx_metrics:inc('delivery.dropped.too_large'),
                 ok = emqx_metrics:inc('delivery.dropped'),
@@ -1069,6 +1071,13 @@ check_max_connection(Type, Listener) ->
                     {denny, Reason}
             end
     end.
+
+enrich_state(#{parse_state := NParseState}, State) ->
+    Serialize = emqx_frame:serialize_opts(NParseState),
+    State#state{parse_state = NParseState, serialize = Serialize};
+enrich_state(_, State) ->
+    State.
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 21 - 13
apps/emqx/test/emqx_channel_SUITE.erl

@@ -414,24 +414,32 @@ t_handle_in_auth(_) ->
         emqx_channel:handle_in(?AUTH_PACKET(), Channel).
 
 t_handle_in_frame_error(_) ->
-    IdleChannel = channel(#{conn_state => idle}),
-    {shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} =
-        emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel),
+    IdleChannelV5 = channel(#{conn_state => idle}),
+    %% no CONNACK packet for v4
+    ?assertMatch(
+        {shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan},
+        emqx_channel:handle_in(
+            {frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5)
+        )
+    ),
+
     ConnectingChan = channel(#{conn_state => connecting}),
     ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
-    {shutdown,
-        #{
-            shutdown_count := frame_too_large,
-            cause := frame_too_large,
-            limit := 100,
-            received := 101
-        },
-        ConnackPacket,
-        _} =
+    ?assertMatch(
+        {shutdown,
+            #{
+                shutdown_count := frame_too_large,
+                cause := frame_too_large,
+                limit := 100,
+                received := 101
+            },
+            ConnackPacket, _},
         emqx_channel:handle_in(
             {frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
             ConnectingChan
-        ),
+        )
+    ),
+
     DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
     ConnectedChan = channel(#{conn_state => connected}),
     ?assertMatch(

+ 105 - 2
apps/emqx/test/emqx_crl_cache_SUITE.erl

@@ -138,13 +138,14 @@ init_per_testcase(t_refresh_config = TestCase, Config) ->
     ];
 init_per_testcase(TestCase, Config) when
     TestCase =:= t_update_listener;
+    TestCase =:= t_update_listener_enable_disable;
     TestCase =:= t_validations
 ->
     ct:timetrap({seconds, 30}),
     ok = snabbkaffe:start_trace(),
     %% when running emqx standalone tests, we can't use those
     %% features.
-    case does_module_exist(emqx_management) of
+    case does_module_exist(emqx_mgmt) of
         true ->
             DataDir = ?config(data_dir, Config),
             CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
@@ -165,7 +166,7 @@ init_per_testcase(TestCase, Config) when
                     {emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}},
                     emqx,
                     emqx_management,
-                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                    emqx_mgmt_api_test_util:emqx_dashboard()
                 ],
                 #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
             ),
@@ -206,6 +207,7 @@ read_crl(Filename) ->
 
 end_per_testcase(TestCase, Config) when
     TestCase =:= t_update_listener;
+    TestCase =:= t_update_listener_enable_disable;
     TestCase =:= t_validations
 ->
     Skip = proplists:get_bool(skip_does_not_apply, Config),
@@ -1057,3 +1059,104 @@ do_t_validations(_Config) ->
     ),
 
     ok.
+
+%% Checks that if CRL is ever enabled and then disabled, clients can connect, even if they
+%% would otherwise not have their corresponding CRLs cached and fail with `{bad_crls,
+%% no_relevant_crls}`.
+t_update_listener_enable_disable(Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ct:pal("skipping as this test does not apply in this profile"),
+            ok;
+        false ->
+            do_t_update_listener_enable_disable(Config)
+    end.
+
+do_t_update_listener_enable_disable(Config) ->
+    DataDir = ?config(data_dir, Config),
+    Keyfile = filename:join([DataDir, "server.key.pem"]),
+    Certfile = filename:join([DataDir, "server.cert.pem"]),
+    Cacertfile = filename:join([DataDir, "ca-chain.cert.pem"]),
+    ClientCert = filename:join(DataDir, "client.cert.pem"),
+    ClientKey = filename:join(DataDir, "client.key.pem"),
+
+    ListenerId = "ssl:default",
+    %% Enable CRL
+    {ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId),
+    CRLConfig0 =
+        #{
+            <<"ssl_options">> =>
+                #{
+                    <<"keyfile">> => Keyfile,
+                    <<"certfile">> => Certfile,
+                    <<"cacertfile">> => Cacertfile,
+                    <<"enable_crl_check">> => true,
+                    <<"fail_if_no_peer_cert">> => true
+                }
+        },
+    ListenerData1 = emqx_utils_maps:deep_merge(ListenerData0, CRLConfig0),
+    {ok, {_, _, ListenerData2}} = update_listener_via_api(ListenerId, ListenerData1),
+    ?assertMatch(
+        #{
+            <<"ssl_options">> :=
+                #{
+                    <<"enable_crl_check">> := true,
+                    <<"verify">> := <<"verify_peer">>,
+                    <<"fail_if_no_peer_cert">> := true
+                }
+        },
+        ListenerData2
+    ),
+
+    %% Disable CRL
+    CRLConfig1 =
+        #{
+            <<"ssl_options">> =>
+                #{
+                    <<"keyfile">> => Keyfile,
+                    <<"certfile">> => Certfile,
+                    <<"cacertfile">> => Cacertfile,
+                    <<"enable_crl_check">> => false,
+                    <<"fail_if_no_peer_cert">> => true
+                }
+        },
+    ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, CRLConfig1),
+    redbug:start(
+        [
+            "esockd_server:get_listener_prop -> return",
+            "esockd_server:set_listener_prop -> return",
+            "esockd:merge_opts -> return",
+            "esockd_listener_sup:set_options -> return",
+            "emqx_listeners:inject_crl_config -> return"
+        ],
+        [{msgs, 100}]
+    ),
+    {ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3),
+    ?assertMatch(
+        #{
+            <<"ssl_options">> :=
+                #{
+                    <<"enable_crl_check">> := false,
+                    <<"verify">> := <<"verify_peer">>,
+                    <<"fail_if_no_peer_cert">> := true
+                }
+        },
+        ListenerData4
+    ),
+
+    %% Now the client that would be blocked tries to connect and should now be allowed.
+    {ok, C} = emqtt:start_link([
+        {ssl, true},
+        {ssl_opts, [
+            {certfile, ClientCert},
+            {keyfile, ClientKey},
+            {verify, verify_none}
+        ]},
+        {port, 8883}
+    ]),
+    ?assertMatch({ok, _}, emqtt:connect(C)),
+    emqtt:stop(C),
+
+    ?assertNotReceive({http_get, _}),
+
+    ok.

+ 44 - 10
apps/emqx/test/emqx_frame_SUITE.erl

@@ -63,6 +63,7 @@ groups() ->
             t_parse_malformed_properties,
             t_malformed_connect_header,
             t_malformed_connect_data,
+            t_malformed_connect_data_proto_ver,
             t_reserved_connect_flag,
             t_invalid_clientid,
             t_undefined_password,
@@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) ->
     ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
     ?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
 
+%% TODO: parse v3 with 0 length clientid
+
 t_serialize_parse_v3_connect(_) ->
     Bin =
         <<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117,
@@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) ->
         header = #mqtt_packet_header{type = ?CONNECT},
         variable = #mqtt_packet_connect{
             clientid = <<"C_00:0C:29:2B:77:52">>,
-            proto_ver = 16#03,
+            proto_ver = ?MQTT_PROTO_V3,
             proto_name = <<"MQIsdp">>,
             is_bridge = true,
             will_retain = true,
@@ -686,15 +689,36 @@ t_malformed_connect_header(_) ->
     ).
 
 t_malformed_connect_data(_) ->
+    ProtoNameWithLen = <<0, 6, "MQIsdp">>,
+    ConnectFlags = <<2#00000000>>,
+    ClientIdwithLen = <<0, 1, "a">>,
+    UnexpectedRestBin = <<0, 1, 2>>,
+    ?ASSERT_FRAME_THROW(
+        #{cause := malformed_connect, unexpected_trailing_bytes := 3},
+        emqx_frame:parse(
+            <<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0,
+                ClientIdwithLen/binary, UnexpectedRestBin/binary>>
+        )
+    ).
+
+t_malformed_connect_data_proto_ver(_) ->
+    Proto3NameWithLen = <<0, 6, "MQIsdp">>,
+    ?ASSERT_FRAME_THROW(
+        #{cause := malformed_connect, header_bytes := <<>>},
+        emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>)
+    ),
+    ProtoNameWithLen = <<0, 4, "MQTT">>,
     ?ASSERT_FRAME_THROW(
-        #{cause := malformed_connect, unexpected_trailing_bytes := _},
-        emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>)
+        #{cause := malformed_connect, header_bytes := <<>>},
+        emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>)
     ).
 
 t_reserved_connect_flag(_) ->
     ?assertException(
         throw,
-        {frame_parse_error, reserved_connect_flag},
+        {frame_parse_error, #{
+            cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">>
+        }},
         emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>)
     ).
 
@@ -726,7 +750,7 @@ t_undefined_password(_) ->
             },
             variable = #mqtt_packet_connect{
                 proto_name = <<"MQTT">>,
-                proto_ver = 4,
+                proto_ver = ?MQTT_PROTO_V4,
                 is_bridge = false,
                 clean_start = true,
                 will_flag = false,
@@ -774,7 +798,9 @@ t_invalid_will_retain(_) ->
             54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
     ?assertException(
         throw,
-        {frame_parse_error, invalid_will_retain},
+        {frame_parse_error, #{
+            cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
+        }},
         emqx_frame:parse(ConnectBin)
     ),
     ok.
@@ -796,22 +822,30 @@ t_invalid_will_qos(_) ->
     ),
     ?assertException(
         throw,
-        {frame_parse_error, invalid_will_qos},
+        {frame_parse_error, #{
+            cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
+        }},
         emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
     ),
     ?assertException(
         throw,
-        {frame_parse_error, invalid_will_qos},
+        {frame_parse_error, #{
+            cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
+        }},
         emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
     ),
     ?assertException(
         throw,
-        {frame_parse_error, invalid_will_qos},
+        {frame_parse_error, #{
+            cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
+        }},
         emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
     ),
     ?assertException(
         throw,
-        {frame_parse_error, invalid_will_qos},
+        {frame_parse_error, #{
+            cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
+        }},
         emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
     ),
     ok.

+ 45 - 27
apps/emqx/test/emqx_packet_SUITE.erl

@@ -377,42 +377,60 @@ t_will_msg(_) ->
 
 t_format(_) ->
     io:format("~ts", [
-        emqx_packet:format(#mqtt_packet{
-            header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
-            variable = undefined
-        })
-    ]),
-    io:format("~ts", [
-        emqx_packet:format(#mqtt_packet{
-            header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>
-        })
+        emqx_packet:format(
+            #mqtt_packet{
+                header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
+                variable = undefined
+            },
+            text
+        )
     ]),
+    io:format(
+        "~ts",
+        [
+            emqx_packet:format(
+                #mqtt_packet{
+                    header = #mqtt_packet_header{type = ?CONNACK},
+                    variable = 1,
+                    payload = <<"payload">>
+                },
+                text
+            )
+        ]
+    ),
     io:format("~ts", [
         emqx_packet:format(
-            ?CONNECT_PACKET(#mqtt_packet_connect{
-                will_flag = true,
-                will_retain = true,
-                will_qos = ?QOS_2,
-                will_topic = <<"topic">>,
-                will_payload = <<"payload">>
-            })
+            ?CONNECT_PACKET(
+                #mqtt_packet_connect{
+                    will_flag = true,
+                    will_retain = true,
+                    will_qos = ?QOS_2,
+                    will_topic = <<"topic">>,
+                    will_payload = <<"payload">>
+                }
+            ),
+            text
         )
     ]),
     io:format("~ts", [
-        emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))
+        emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}), text)
+    ]),
+    io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER), text)]),
+    io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1), text)]),
+    io:format("~ts", [
+        emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>), text)
     ]),
-    io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
-    io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
-    io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
-    io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
-    io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
+    io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98), text)]),
+    io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99), text)]),
     io:format("~ts", [
-        emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))
+        emqx_packet:format(
+            ?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]), text
+        )
     ]),
-    io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
-    io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
-    io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
-    io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
+    io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]), text)]),
+    io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]), text)]),
+    io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90), text)]),
+    io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128), text)]).
 
 t_parse_empty_publish(_) ->
     %% 52: 0011(type=PUBLISH) 0100 (QoS=2)

+ 1 - 1
apps/emqx_auth/src/emqx_auth.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth, [
     {description, "EMQX Authentication and authorization"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {modules, []},
     {registered, [emqx_auth_sup]},
     {applications, [

+ 7 - 1
apps/emqx_auth/src/emqx_authz/emqx_authz.erl

@@ -477,9 +477,15 @@ authorize_deny(
     sources()
 ) ->
     authz_result().
-authorize(Client, PubSub, Topic, _DefaultResult, Sources) ->
+authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) ->
     case maps:get(is_superuser, Client, false) of
         true ->
+            ?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}),
+            ?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{
+                username => Username,
+                topic => Topic,
+                action => emqx_access_control:format_action(PubSub)
+            }),
             emqx_metrics:inc(?METRIC_SUPERUSER),
             {stop, #{result => allow, from => superuser}};
         false ->

+ 72 - 0
apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl

@@ -674,5 +674,77 @@ t_publish_last_will_testament_banned_client_connecting(_Config) ->
 
     ok.
 
+t_sikpped_as_superuser(_Config) ->
+    ClientInfo = #{
+        clientid => <<"clientid">>,
+        username => <<"username">>,
+        peerhost => {127, 0, 0, 1},
+        zone => default,
+        listener => {tcp, default},
+        is_superuser => true
+    },
+    ?check_trace(
+        begin
+            ?assertEqual(
+                allow,
+                emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_0), <<"p/t/0">>)
+            ),
+            ?assertEqual(
+                allow,
+                emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_1), <<"p/t/1">>)
+            ),
+            ?assertEqual(
+                allow,
+                emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_2), <<"p/t/2">>)
+            ),
+            ?assertEqual(
+                allow,
+                emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_0), <<"s/t/0">>)
+            ),
+            ?assertEqual(
+                allow,
+                emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_1), <<"s/t/1">>)
+            ),
+            ?assertEqual(
+                allow,
+                emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_2), <<"s/t/2">>)
+            )
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [
+                    #{
+                        reason := client_is_superuser,
+                        action := #{qos := ?QOS_0, action_type := publish}
+                    },
+                    #{
+                        reason := client_is_superuser,
+                        action := #{qos := ?QOS_1, action_type := publish}
+                    },
+                    #{
+                        reason := client_is_superuser,
+                        action := #{qos := ?QOS_2, action_type := publish}
+                    },
+                    #{
+                        reason := client_is_superuser,
+                        action := #{qos := ?QOS_0, action_type := subscribe}
+                    },
+                    #{
+                        reason := client_is_superuser,
+                        action := #{qos := ?QOS_1, action_type := subscribe}
+                    },
+                    #{
+                        reason := client_is_superuser,
+                        action := #{qos := ?QOS_2, action_type := subscribe}
+                    }
+                ],
+                ?of_kind(authz_skipped, Trace)
+            ),
+            ok
+        end
+    ),
+
+    ok = snabbkaffe:stop().
+
 stop_apps(Apps) ->
     lists:foreach(fun application:stop/1, Apps).

+ 1 - 1
apps/emqx_auth_http/src/emqx_auth_http.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_http, [
     {description, "EMQX External HTTP API Authentication and Authorization"},
-    {vsn, "0.3.0"},
+    {vsn, "0.3.1"},
     {registered, []},
     {mod, {emqx_auth_http_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_jwt, [
     {description, "EMQX JWT Authentication and Authorization"},
-    {vsn, "0.3.2"},
+    {vsn, "0.3.3"},
     {registered, []},
     {mod, {emqx_auth_jwt_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_mnesia, [
     {description, "EMQX Buitl-in Database Authentication and Authorization"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {mod, {emqx_auth_mnesia_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_mongodb, [
     {description, "EMQX MongoDB Authentication and Authorization"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {mod, {emqx_auth_mongodb_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_mysql, [
     {description, "EMQX MySQL Authentication and Authorization"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {mod, {emqx_auth_mysql_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_postgresql, [
     {description, "EMQX PostgreSQL Authentication and Authorization"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {mod, {emqx_auth_postgresql_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_auth_redis/src/emqx_auth_redis.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_redis, [
     {description, "EMQX Redis Authentication and Authorization"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {mod, {emqx_auth_redis_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -1154,7 +1154,7 @@ t_bridges_probe(Config) ->
     ?assertMatch(
         {ok, 400, #{
             <<"code">> := <<"TEST_FAILED">>,
-            <<"message">> := <<"Connection refused">>
+            <<"message">> := <<"Connection refused", _/binary>>
         }},
         request_json(
             post,

+ 4 - 1
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) ->
             ),
 
             ?force_ordering(
-                #{?snk_kind := call_query},
+                #{?snk_kind := SNKKind} when
+                    SNKKind =:= call_query orelse SNKKind =:= simple_query_enter,
                 #{?snk_kind := cut_connection, ?snk_span := start}
             ),
             %% Note: order of arguments here is reversed compared to `?force_ordering'.
@@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) ->
                     emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
                 )
             end),
+            ?tp("publishing_message", #{}),
             try
                 {_, {ok, _}} =
                     snabbkaffe:wait_async_action(
@@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) ->
                         infinity
                     )
             after
+                ?tp("healing_failure", #{}),
                 emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
             end,
             {ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_gcp_pubsub, [
     {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
-    {vsn, "0.3.2"},
+    {vsn, "0.3.3"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, ehttpc]},
     {env, [

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_kafka, [
     {description, "EMQX Enterprise Kafka Bridge"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {registered, [emqx_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 3 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -1918,13 +1918,14 @@ t_node_joins_existing_cluster(Config) ->
                 _Attempts2 = 50,
                 [] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic])
             ),
+            NumMsgs = 50 * NPartitions,
             {ok, SRef1} =
                 snabbkaffe:subscribe(
                     ?match_event(#{
                         ?snk_kind := kafka_consumer_handle_message,
                         ?snk_span := {complete, _}
                     }),
-                    NPartitions,
+                    NumMsgs,
                     20_000
                 ),
             lists:foreach(
@@ -1933,7 +1934,7 @@ t_node_joins_existing_cluster(Config) ->
                     Val = <<"v", (integer_to_binary(N))/binary>>,
                     publish(Config, KafkaTopic, [#{key => Key, value => Val}])
                 end,
-                lists:seq(1, 10 * NPartitions)
+                lists:seq(1, NumMsgs)
             ),
             {ok, _} = snabbkaffe:receive_events(SRef1),
 

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_mqtt, [
     {description, "EMQX MQTT Broker Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [
         kernel,

+ 70 - 12
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -98,7 +98,7 @@ on_start(ResourceId, #{server := Server} = Conf) ->
                 server => Server
             }};
         {error, Reason} ->
-            {error, Reason}
+            {error, emqx_maybe:define(explain_error(Reason), Reason)}
     end.
 
 on_add_channel(
@@ -200,7 +200,7 @@ on_get_channel_status(
     } = _State
 ) when is_map_key(ChannelId, Channels) ->
     %% The channel should be ok as long as the MQTT client is ok
-    connected.
+    ?status_connected.
 
 on_get_channels(ResId) ->
     emqx_bridge_v2:get_channels_for_connector(ResId).
@@ -356,10 +356,15 @@ on_get_status(_ResourceId, State) ->
     Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
     try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
         Statuses ->
-            combine_status(Statuses)
+            case combine_status(Statuses) of
+                {Status, Msg} ->
+                    {Status, State, Msg};
+                Status ->
+                    Status
+            end
     catch
         exit:timeout ->
-            connecting
+            ?status_connecting
     end.
 
 get_status({_Pool, Worker}) ->
@@ -367,7 +372,7 @@ get_status({_Pool, Worker}) ->
         {ok, Client} ->
             emqx_bridge_mqtt_ingress:status(Client);
         {error, _} ->
-            disconnected
+            ?status_disconnected
     end.
 
 combine_status(Statuses) ->
@@ -375,11 +380,25 @@ combine_status(Statuses) ->
     %% Natural order of statuses: [connected, connecting, disconnected]
     %% * `disconnected` wins over any other status
     %% * `connecting` wins over `connected`
-    case lists:reverse(lists:usort(Statuses)) of
+    ToStatus = fun
+        ({S, _Reason}) -> S;
+        (S) when is_atom(S) -> S
+    end,
+    CompareFn = fun(S1A, S2A) ->
+        S1 = ToStatus(S1A),
+        S2 = ToStatus(S2A),
+        S1 > S2
+    end,
+    case lists:usort(CompareFn, Statuses) of
+        [{Status, Reason} | _] ->
+            case explain_error(Reason) of
+                undefined -> Status;
+                Msg -> {Status, Msg}
+            end;
         [Status | _] ->
             Status;
         [] ->
-            disconnected
+            ?status_disconnected
     end.
 
 mk_ingress_config(
@@ -514,15 +533,54 @@ connect(Pid, Name) ->
             {ok, Pid};
         {error, Reason} = Error ->
             IsDryRun = emqx_resource:is_dry_run(Name),
-            ?SLOG(?LOG_LEVEL(IsDryRun), #{
-                msg => "ingress_client_connect_failed",
-                reason => Reason,
-                resource_id => Name
-            }),
+            log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name),
             _ = catch emqtt:stop(Pid),
             Error
     end.
 
+log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) ->
+    ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}),
+    ?SLOG(Level, #{
+        msg => "ingress_client_connect_failed",
+        reason => Reason,
+        name => Name,
+        explain => explain_error(Reason)
+    });
+log_connect_error_reason(Level, econnrefused = Reason, Name) ->
+    ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}),
+    ?SLOG(Level, #{
+        msg => "ingress_client_connect_failed",
+        reason => Reason,
+        name => Name,
+        explain => explain_error(Reason)
+    });
+log_connect_error_reason(Level, Reason, Name) ->
+    ?SLOG(Level, #{
+        msg => "ingress_client_connect_failed",
+        reason => Reason,
+        name => Name
+    }).
+
+explain_error(econnrefused) ->
+    <<
+        "Connection refused. "
+        "This error indicates that your connection attempt to the MQTT server was rejected. "
+        "In simpler terms, the server you tried to connect to refused your request. "
+        "There can be multiple reasons for this. "
+        "For example, the MQTT server you're trying to connect to might be down or not "
+        "running at all or you might have provided the wrong address "
+        "or port number for the server."
+    >>;
+explain_error({tcp_closed, _}) ->
+    <<
+        "Your MQTT connection attempt was unsuccessful. "
+        "It might be at its maximum capacity for handling new connections. "
+        "To diagnose the issue further, you can check the server logs for "
+        "any specific messages related to the unavailability or connection limits."
+    >>;
+explain_error(_Reason) ->
+    undefined.
+
 handle_disconnect(_Reason) ->
     ok.
 

+ 4 - 3
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl

@@ -19,6 +19,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 %% management APIs
 -export([
@@ -234,13 +235,13 @@ status(Pid) ->
     try
         case proplists:get_value(socket, info(Pid)) of
             Socket when Socket /= undefined ->
-                connected;
+                ?status_connected;
             undefined ->
-                connecting
+                ?status_connecting
         end
     catch
         exit:{noproc, _} ->
-            disconnected
+            ?status_disconnected
     end.
 
 %%

+ 32 - 24
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl

@@ -1025,31 +1025,39 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
     ct:sleep(1000),
 
     %% stop the listener 1883 to make the bridge disconnected
-    ok = emqx_listeners:stop_listener('tcp:default'),
-    ct:sleep(1500),
-    ?assertMatch(
-        #{<<"status">> := Status} when
-            Status == <<"connecting">> orelse Status == <<"disconnected">>,
-        request_bridge(BridgeIDEgress)
-    ),
-
-    %% start the listener 1883 to make the bridge reconnected
-    ok = emqx_listeners:start_listener('tcp:default'),
-    timer:sleep(1500),
-    ?assertMatch(
-        #{<<"status">> := <<"connected">>},
-        request_bridge(BridgeIDEgress)
+    ?check_trace(
+        begin
+            ok = emqx_listeners:stop_listener('tcp:default'),
+            ct:sleep(1500),
+            ?assertMatch(
+                #{<<"status">> := Status} when
+                    Status == <<"connecting">> orelse Status == <<"disconnected">>,
+                request_bridge(BridgeIDEgress)
+            ),
+
+            %% start the listener 1883 to make the bridge reconnected
+            ok = emqx_listeners:start_listener('tcp:default'),
+            timer:sleep(1500),
+            ?assertMatch(
+                #{<<"status">> := <<"connected">>},
+                request_bridge(BridgeIDEgress)
+            ),
+
+            N = stop_publisher(Publisher),
+
+            %% all those messages should eventually be delivered
+            [
+                assert_mqtt_msg_received(RemoteTopic, Payload)
+             || I <- lists:seq(1, N),
+                Payload <- [integer_to_binary(I)]
+            ],
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)),
+            ok
+        end
     ),
-
-    N = stop_publisher(Publisher),
-
-    %% all those messages should eventually be delivered
-    [
-        assert_mqtt_msg_received(RemoteTopic, Payload)
-     || I <- lists:seq(1, N),
-        Payload <- [integer_to_binary(I)]
-    ],
-
     ok.
 
 start_publisher(Topic, Interval, CtrlPid) ->

+ 46 - 0
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl

@@ -131,6 +131,9 @@ hookpoint(Config) ->
     BridgeId = bridge_id(Config),
     emqx_bridge_resource:bridge_hookpoint(BridgeId).
 
+simplify_result(Res) ->
+    emqx_bridge_v2_testlib:simplify_result(Res).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -246,3 +249,46 @@ t_receive_via_rule(Config) ->
         end
     ),
     ok.
+
+t_connect_with_more_clients_than_the_broker_accepts(Config) ->
+    Name = ?config(connector_name, Config),
+    OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default),
+    on_exit(fun() ->
+        emqx_mgmt_listeners_conf:update(tcp, default, OrgConf)
+    end),
+    NewConf = OrgConf#{<<"max_connections">> => 3},
+    {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            ?assertMatch(
+                {201, #{
+                    <<"status">> := <<"disconnected">>,
+                    <<"status_reason">> :=
+                        <<"Your MQTT connection attempt was unsuccessful", _/binary>>
+                }},
+                simplify_result(
+                    emqx_bridge_v2_testlib:create_connector_api(
+                        Config,
+                        #{<<"pool_size">> => 100}
+                    )
+                )
+            ),
+            ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}),
+            ?assertMatch(
+                {200, #{
+                    <<"status">> := <<"disconnected">>,
+                    <<"status_reason">> :=
+                        <<"Your MQTT connection attempt was unsuccessful", _/binary>>
+                }},
+                simplify_result(emqx_bridge_v2_testlib:get_connector_api(mqtt, Name))
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)),
+            ok
+        end
+    ),
+
+    ok.

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [
         kernel,

+ 27 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl

@@ -11,7 +11,8 @@
     action_type_name/0,
     connector_type_name/0,
     schema_module/0,
-    is_action/1
+    is_action/1,
+    connector_action_config_to_bridge_v1_config/2
 ]).
 
 is_action(_) -> true.
@@ -23,3 +24,28 @@ action_type_name() -> pulsar.
 connector_type_name() -> pulsar.
 
 schema_module() -> emqx_bridge_pulsar_pubsub_schema.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    BridgeV1Config1 = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorConfig, ActionConfig
+    ),
+    BridgeV1Config = maps:with(v1_fields(pulsar_producer), BridgeV1Config1),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun(RO) -> maps:with(v1_fields(producer_resource_opts), RO) end,
+        BridgeV1Config
+    ).
+
+%%------------------------------------------------------------------------------------------
+%% Internal helper functions
+%%------------------------------------------------------------------------------------------
+
+v1_fields(Struct) ->
+    [
+        to_bin(K)
+     || {K, _} <- emqx_bridge_pulsar:fields(Struct)
+    ].
+
+to_bin(B) when is_binary(B) -> B;
+to_bin(L) when is_list(L) -> list_to_binary(L);
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

+ 17 - 9
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -60,6 +60,8 @@ resource_type() -> pulsar.
 
 callback_mode() -> async_if_possible.
 
+query_mode(#{resource_opts := #{query_mode := sync}}) ->
+    simple_sync_internal_buffer;
 query_mode(_Config) ->
     simple_async_internal_buffer.
 
@@ -204,12 +206,17 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
                 sync_timeout => SyncTimeout,
                 is_async => false
             }),
-            try
-                pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
-            catch
-                error:timeout ->
-                    {error, timeout}
-            end
+            ?tp_span(
+                "pulsar_producer_query_enter",
+                #{instance_id => _InstanceId, message => Message, mode => sync},
+                try
+                    ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}),
+                    pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
+                catch
+                    error:timeout ->
+                        {error, timeout}
+                end
+            )
     end.
 
 -spec on_query_async(
@@ -220,11 +227,11 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
     #{channels := Channels} = State,
     case maps:find(ChannelId, Channels) of
         error ->
-            {error, channel_not_found};
+            {error, {unrecoverable_error, channel_not_found}};
         {ok, #{message := MessageTmpl, producers := Producers}} ->
             ?tp_span(
-                pulsar_producer_on_query_async,
-                #{instance_id => _InstanceId, message => Message},
+                "pulsar_producer_query_enter",
+                #{instance_id => _InstanceId, message => Message, mode => async},
                 on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn)
             )
     end.
@@ -235,6 +242,7 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
         message => PulsarMessage,
         is_async => true
     }),
+    ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}),
     pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
 
 on_format_query_result({ok, Info}) ->

+ 1 - 3
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl

@@ -66,10 +66,8 @@ fields(action_resource_opts) ->
         batch_size,
         batch_time,
         worker_pool_size,
-        request_ttl,
         inflight_window,
-        max_buffer_bytes,
-        query_mode
+        max_buffer_bytes
     ],
     lists:filter(
         fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,

+ 7 - 2
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl

@@ -843,7 +843,8 @@ do_t_send_with_failure(Config, FailureType) ->
                         ?wait_async_action(
                             emqx:publish(Message0),
                             #{
-                                ?snk_kind := pulsar_producer_on_query_async,
+                                ?snk_kind := "pulsar_producer_query_enter",
+                                mode := async,
                                 ?snk_span := {complete, _}
                             },
                             5_000
@@ -970,7 +971,11 @@ t_producer_process_crash(Config) ->
             {_, {ok, _}} =
                 ?wait_async_action(
                     emqx:publish(Message0),
-                    #{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}},
+                    #{
+                        ?snk_kind := "pulsar_producer_query_enter",
+                        mode := async,
+                        ?snk_span := {complete, _}
+                    },
                     5_000
                 ),
             Data0 = receive_consumed(20_000),

+ 113 - 76
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl

@@ -23,31 +23,25 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    [
-        {group, plain},
-        {group, tls}
-    ].
+    All0 = emqx_common_test_helpers:all(?MODULE),
+    All = All0 -- matrix_cases(),
+    Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
+    Groups ++ All.
 
 groups() ->
-    AllTCs = emqx_common_test_helpers:all(?MODULE),
-    [
-        {plain, AllTCs},
-        {tls, AllTCs}
-    ].
+    emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
+
+matrix_cases() ->
+    emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    %% Ensure enterprise bridge module is loaded
-    _ = emqx_bridge_enterprise:module_info(),
-    {ok, Cwd} = file:get_cwd(),
-    PrivDir = ?config(priv_dir, Config),
-    WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
     Apps = emqx_cth_suite:start(
         lists:flatten([
             ?APPS,
             emqx_management,
             emqx_mgmt_api_test_util:emqx_dashboard()
         ]),
-        #{work_dir => WorkDir}
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [{suite_apps, Apps} | Config].
 
@@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) ->
     case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
         true ->
             Config1 = common_init_per_group(),
+            ConnectorName = ?MODULE,
             NewConfig =
                 [
                     {proxy_name, ProxyName},
@@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) ->
                     {use_tls, false}
                     | Config1 ++ Config
                 ],
-            create_connector(?MODULE, NewConfig),
+            create_connector(ConnectorName, NewConfig),
             NewConfig;
         false ->
             maybe_skip_without_ci()
@@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) ->
     case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of
         true ->
             Config1 = common_init_per_group(),
+            ConnectorName = ?MODULE,
             NewConfig =
                 [
                     {proxy_name, ProxyName},
@@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) ->
                     {use_tls, true}
                     | Config1 ++ Config
                 ],
-            create_connector(?MODULE, NewConfig),
+            create_connector(ConnectorName, NewConfig),
             NewConfig;
         false ->
             maybe_skip_without_ci()
-    end.
+    end;
+init_per_group(_Group, Config) ->
+    Config.
 
 end_per_group(Group, Config) when
     Group =:= plain;
     Group =:= tls
 ->
     common_end_per_group(Config),
+    ok;
+end_per_group(_Group, _Config) ->
     ok.
 
 common_init_per_group() ->
@@ -189,66 +189,49 @@ pulsar_connector(Config) ->
         ":",
         integer_to_binary(PulsarPort)
     ]),
-    Connector = #{
-        <<"connectors">> => #{
-            <<"pulsar">> => #{
-                Name => #{
-                    <<"enable">> => true,
-                    <<"ssl">> => #{
-                        <<"enable">> => UseTLS,
-                        <<"verify">> => <<"verify_none">>,
-                        <<"server_name_indication">> => <<"auto">>
-                    },
-                    <<"authentication">> => <<"none">>,
-                    <<"servers">> => ServerURL
-                }
-            }
-        }
+    InnerConfigMap = #{
+        <<"enable">> => true,
+        <<"ssl">> => #{
+            <<"enable">> => UseTLS,
+            <<"verify">> => <<"verify_none">>,
+            <<"server_name_indication">> => <<"auto">>
+        },
+        <<"authentication">> => <<"none">>,
+        <<"servers">> => ServerURL
     },
-    parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name).
+    emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap).
 
 pulsar_action(Config) ->
+    QueryMode = proplists:get_value(query_mode, Config, <<"sync">>),
     Name = atom_to_binary(?MODULE),
-    Action = #{
-        <<"actions">> => #{
-            <<"pulsar">> => #{
-                Name => #{
-                    <<"connector">> => Name,
-                    <<"enable">> => true,
-                    <<"parameters">> => #{
-                        <<"retention_period">> => <<"infinity">>,
-                        <<"max_batch_bytes">> => <<"1MB">>,
-                        <<"batch_size">> => 100,
-                        <<"strategy">> => <<"random">>,
-                        <<"buffer">> => #{
-                            <<"mode">> => <<"memory">>,
-                            <<"per_partition_limit">> => <<"10MB">>,
-                            <<"segment_bytes">> => <<"5MB">>,
-                            <<"memory_overload_protection">> => true
-                        },
-                        <<"message">> => #{
-                            <<"key">> => <<"${.clientid}">>,
-                            <<"value">> => <<"${.}">>
-                        },
-                        <<"pulsar_topic">> => ?config(pulsar_topic, Config)
-                    },
-                    <<"resource_opts">> => #{
-                        <<"health_check_interval">> => <<"1s">>,
-                        <<"metrics_flush_interval">> => <<"300ms">>
-                    }
-                }
-            }
+    InnerConfigMap = #{
+        <<"connector">> => Name,
+        <<"enable">> => true,
+        <<"parameters">> => #{
+            <<"retention_period">> => <<"infinity">>,
+            <<"max_batch_bytes">> => <<"1MB">>,
+            <<"batch_size">> => 100,
+            <<"strategy">> => <<"random">>,
+            <<"buffer">> => #{
+                <<"mode">> => <<"memory">>,
+                <<"per_partition_limit">> => <<"10MB">>,
+                <<"segment_bytes">> => <<"5MB">>,
+                <<"memory_overload_protection">> => true
+            },
+            <<"message">> => #{
+                <<"key">> => <<"${.clientid}">>,
+                <<"value">> => <<"${.}">>
+            },
+            <<"pulsar_topic">> => ?config(pulsar_topic, Config)
+        },
+        <<"resource_opts">> => #{
+            <<"query_mode">> => QueryMode,
+            <<"request_ttl">> => <<"1s">>,
+            <<"health_check_interval">> => <<"1s">>,
+            <<"metrics_flush_interval">> => <<"300ms">>
         }
     },
-    parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name).
-
-parse_and_check(Key, Mod, Conf, Name) ->
-    ConfStr = hocon_pp:do(Conf, #{}),
-    ct:pal(ConfStr),
-    {ok, RawConf} = hocon:binary(ConfStr, #{format => map}),
-    hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}),
-    #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf,
-    RetConf.
+    emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap).
 
 instance_id(Type, Name) ->
     ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
@@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) ->
     ).
 -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
 
+proplists_with(Keys, PList) ->
+    lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList).
+
+group_path(Config) ->
+    case emqx_common_test_helpers:group_path(Config) of
+        [] ->
+            undefined;
+        Path ->
+            Path
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_action_probe(Config) ->
+t_action_probe(matrix) ->
+    [[plain], [tls]];
+t_action_probe(Config) when is_list(Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
     Action = pulsar_action(Config),
     {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
     ?assertMatch({{_, 204, _}, _, _}, Res0),
     ok.
 
-t_action(Config) ->
+t_action(matrix) ->
+    [
+        [plain, async],
+        [plain, sync],
+        [tls, async]
+    ];
+t_action(Config) when is_list(Config) ->
+    QueryMode =
+        case group_path(Config) of
+            [_, QM | _] -> atom_to_binary(QM);
+            _ -> <<"async">>
+        end,
     Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name, Config),
+    create_action(Name, [{query_mode, QueryMode} | Config]),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
     ?assert(lists:any(Any, Actions), Actions),
@@ -465,7 +472,9 @@ t_action(Config) ->
 
 %% Tests that deleting/disabling an action that share the same Pulsar topic with other
 %% actions do not disturb the latter.
-t_multiple_actions_sharing_topic(Config) ->
+t_multiple_actions_sharing_topic(matrix) ->
+    [[plain], [tls]];
+t_multiple_actions_sharing_topic(Config) when is_list(Config) ->
     Type = ?TYPE,
     ConnectorName = <<"c">>,
     ConnectorConfig = pulsar_connector(Config),
@@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) ->
         []
     ),
     ok.
+
+t_sync_query_down(matrix) ->
+    [[plain]];
+t_sync_query_down(Config0) when is_list(Config0) ->
+    ct:timetrap({seconds, 15}),
+    Payload = #{<<"x">> => <<"some data">>},
+    PayloadBin = emqx_utils_json:encode(Payload),
+    ClientId = <<"some_client">>,
+    Opts = #{
+        make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end,
+        enter_tp_filter =>
+            ?match_event(#{?snk_kind := "pulsar_producer_send"}),
+        error_tp_filter =>
+            ?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}),
+        success_tp_filter =>
+            ?match_event(#{?snk_kind := pulsar_echo_consumer_message})
+    },
+    Config = [
+        {connector_type, ?TYPE},
+        {connector_name, ?FUNCTION_NAME},
+        {connector_config, pulsar_connector(Config0)},
+        {action_type, ?TYPE},
+        {action_name, ?FUNCTION_NAME},
+        {action_config, pulsar_action(Config0)}
+        | proplists_with([proxy_name, proxy_host, proxy_port], Config0)
+    ],
+    emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
+    ok.

+ 1 - 1
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_rabbitmq, [
     {description, "EMQX Enterprise RabbitMQ Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {mod, {emqx_bridge_rabbitmq_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_s3, [
     {description, "EMQX Enterprise S3 Bridge"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_sqlserver, [
     {description, "EMQX Enterprise SQL Server Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, odbc]},
     {env, [

+ 1 - 1
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_syskeeper, [
     {description, "EMQX Enterprise Data bridge for Syskeeper"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_conf/src/emqx_conf.app.src

@@ -1,6 +1,6 @@
 {application, emqx_conf, [
     {description, "EMQX configuration management"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {mod, {emqx_conf_app, []}},
     {applications, [kernel, stdlib]},

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "EMQX Data Integration Connectors"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -125,6 +125,7 @@ create(Type, Name, Conf0, Opts) ->
     TypeBin = bin(Type),
     ResourceId = resource_id(Type, Name),
     Conf = Conf0#{connector_type => TypeBin, connector_name => Name},
+    _ = emqx_alarm:ensure_deactivated(ResourceId),
     {ok, _Data} = emqx_resource:create_local(
         ResourceId,
         ?CONNECTOR_RESOURCE_GROUP,
@@ -132,7 +133,6 @@ create(Type, Name, Conf0, Opts) ->
         parse_confs(TypeBin, Name, Conf),
         parse_opts(Conf, Opts)
     ),
-    _ = emqx_alarm:ensure_deactivated(ResourceId),
     ok.
 
 update(ConnectorId, {OldConf, Conf}) ->

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -2,7 +2,7 @@
 {application, emqx_dashboard, [
     {description, "EMQX Web Dashboard"},
     % strict semver, bump manually!
-    {vsn, "5.1.3"},
+    {vsn, "5.1.4"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {applications, [

+ 1 - 1
apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src

@@ -1,6 +1,6 @@
 {application, emqx_dashboard_sso, [
     {description, "EMQX Dashboard Single Sign-On"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, [emqx_dashboard_sso_sup]},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_coap, [
     {description, "CoAP Gateway"},
-    {vsn, "0.1.9"},
+    {vsn, "0.1.10"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_exproto, [
     {description, "ExProto Gateway"},
-    {vsn, "0.1.12"},
+    {vsn, "0.1.13"},
     {registered, []},
     {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_gbt32960, [
     {description, "GBT32960 Gateway"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_jt808, [
     {description, "JT/T 808 Gateway"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_mqttsn, [
     {description, "MQTT-SN Gateway"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl, redbug]},

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -2,7 +2,7 @@
 {application, emqx_management, [
     {description, "EMQX Management API and CLI"},
     % strict semver, bump manually!
-    {vsn, "5.2.3"},
+    {vsn, "5.2.4"},
     {modules, []},
     {registered, [emqx_management_sup]},
     {applications, [

+ 3 - 7
apps/emqx_management/src/emqx_mgmt_app.erl

@@ -29,13 +29,9 @@
 
 start(_Type, _Args) ->
     ok = mria:wait_for_tables(emqx_mgmt_auth:create_tables()),
-    case emqx_mgmt_auth:init_bootstrap_file() of
-        ok ->
-            emqx_conf:add_handler([api_key], emqx_mgmt_auth),
-            emqx_mgmt_sup:start_link();
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    emqx_mgmt_auth:try_init_bootstrap_file(),
+    emqx_conf:add_handler([api_key], emqx_mgmt_auth),
+    emqx_mgmt_sup:start_link().
 
 stop(_State) ->
     emqx_conf:remove_handler([api_key]),

+ 7 - 9
apps/emqx_management/src/emqx_mgmt_auth.erl

@@ -32,7 +32,7 @@
     update/5,
     delete/1,
     list/0,
-    init_bootstrap_file/0,
+    try_init_bootstrap_file/0,
     format/1
 ]).
 
@@ -52,6 +52,7 @@
 -ifdef(TEST).
 -export([create/7]).
 -export([trans/2, force_create_app/1]).
+-export([init_bootstrap_file/1]).
 -endif.
 
 -define(APP, emqx_app).
@@ -114,11 +115,12 @@ post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) ->
     end,
     ok.
 
--spec init_bootstrap_file() -> ok | {error, _}.
-init_bootstrap_file() ->
+-spec try_init_bootstrap_file() -> ok | {error, _}.
+try_init_bootstrap_file() ->
     File = bootstrap_file(),
     ?SLOG(debug, #{msg => "init_bootstrap_api_keys_from_file", file => File}),
-    init_bootstrap_file(File).
+    _ = init_bootstrap_file(File),
+    ok.
 
 create(Name, Enable, ExpiredAt, Desc, Role) ->
     ApiKey = generate_unique_api_key(Name),
@@ -357,10 +359,6 @@ init_bootstrap_file(File) ->
             init_bootstrap_file(File, Dev, MP);
         {error, Reason0} ->
             Reason = emqx_utils:explain_posix(Reason0),
-            FmtReason = emqx_utils:format(
-                "load API bootstrap file failed, file:~ts, reason:~ts",
-                [File, Reason]
-            ),
 
             ?SLOG(
                 error,
@@ -371,7 +369,7 @@ init_bootstrap_file(File) ->
                 }
             ),
 
-            {error, FmtReason}
+            {error, Reason}
     end.
 
 init_bootstrap_file(File, Dev, MP) ->

+ 22 - 2
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -23,6 +23,7 @@
 -include_lib("emqx/include/logger.hrl").
 
 -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
+-define(EXCLUSIVE_TAB, emqx_exclusive_subscription).
 
 -export([load/0]).
 
@@ -45,7 +46,8 @@
     olp/1,
     data/1,
     ds/1,
-    cluster_info/0
+    cluster_info/0,
+    exclusive/1
 ]).
 
 -spec load() -> ok.
@@ -1024,7 +1026,9 @@ print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
     NL = maps:get(nl, Options, 0),
     RH = maps:get(rh, Options, 0),
     RAP = maps:get(rap, Options, 0),
-    emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]).
+    emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]);
+print({exclusive, {exclusive_subscription, Topic, ClientId}}) ->
+    emqx_ctl:print("topic:~ts -> ClientId:~ts~n", [Topic, ClientId]).
 
 format(_, undefined) ->
     undefined;
@@ -1085,3 +1089,19 @@ safe_call_mria(Fun, Args, OnFail) ->
             }),
             OnFail
     end.
+%%--------------------------------------------------------------------
+%% @doc Exclusive topics
+exclusive(["list"]) ->
+    case ets:info(?EXCLUSIVE_TAB, size) of
+        0 -> emqx_ctl:print("No topics.~n");
+        _ -> dump(?EXCLUSIVE_TAB, exclusive)
+    end;
+exclusive(["delete", Topic0]) ->
+    Topic = erlang:iolist_to_binary(Topic0),
+    emqx_exclusive_subscription:unsubscribe(Topic, #{is_exclusive => true}),
+    emqx_ctl:print("ok~n");
+exclusive(_) ->
+    emqx_ctl:usage([
+        {"exclusive list", "List all exclusive topics"},
+        {"exclusive delete <Topic>", "Delete an exclusive topic"}
+    ]).

+ 4 - 4
apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl

@@ -100,7 +100,7 @@ t_bootstrap_file(_) ->
     BadBin = <<"test-1:secret-11\ntest-2 secret-12">>,
     ok = file:write_file(File, BadBin),
     update_file(File),
-    ?assertMatch({error, #{reason := "invalid_format"}}, emqx_mgmt_auth:init_bootstrap_file()),
+    ?assertMatch({error, #{reason := "invalid_format"}}, emqx_mgmt_auth:init_bootstrap_file(File)),
     ?assertEqual(ok, auth_authorize(TestPath, <<"test-1">>, <<"secret-11">>)),
     ?assertMatch({error, _}, auth_authorize(TestPath, <<"test-2">>, <<"secret-12">>)),
     update_file(<<>>),
@@ -123,7 +123,7 @@ t_bootstrap_file_override(_) ->
     ok = file:write_file(File, Bin),
     update_file(File),
 
-    ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file()),
+    ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file(File)),
 
     MatchFun = fun(ApiKey) -> mnesia:match_object(#?APP{api_key = ApiKey, _ = '_'}) end,
     ?assertMatch(
@@ -156,7 +156,7 @@ t_bootstrap_file_dup_override(_) ->
     File = "./bootstrap_api_keys.txt",
     ok = file:write_file(File, Bin),
     update_file(File),
-    ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file()),
+    ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file(File)),
 
     SameAppWithDiffName = #?APP{
         name = <<"name-1">>,
@@ -190,7 +190,7 @@ t_bootstrap_file_dup_override(_) ->
 
     %% Similar to loading bootstrap file at node startup
     %% the duplicated apikey in mnesia will be cleaned up
-    ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file()),
+    ?assertEqual(ok, emqx_mgmt_auth:init_bootstrap_file(File)),
     ?assertMatch(
         {ok, [
             #?APP{

+ 5 - 0
apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl

@@ -360,4 +360,9 @@ t_autocluster_leave(Config) ->
         )
     ).
 
+t_exclusive(_Config) ->
+    emqx_ctl:run_command(["exclusive", "list"]),
+    emqx_ctl:run_command(["exclusive", "delete", "t/1"]),
+    ok.
+
 format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]).

+ 1 - 1
apps/emqx_modules/src/emqx_modules.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_modules, [
     {description, "EMQX Modules"},
-    {vsn, "5.0.27"},
+    {vsn, "5.0.28"},
     {modules, []},
     {applications, [kernel, stdlib, emqx, emqx_ctl, observer_cli]},
     {mod, {emqx_modules_app, []}},

+ 1 - 1
apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src

@@ -1,6 +1,6 @@
 {application, emqx_node_rebalance, [
     {description, "EMQX Node Rebalance"},
-    {vsn, "5.0.9"},
+    {vsn, "5.0.10"},
     {registered, [
         emqx_node_rebalance_sup,
         emqx_node_rebalance,

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx, erlavro]},

+ 27 - 7
apps/emqx_plugins/src/emqx_plugins.erl

@@ -1049,19 +1049,22 @@ do_load_plugin_app(AppName, Ebin) ->
     end.
 
 start_app(App) ->
-    case application:ensure_all_started(App) of
-        {ok, Started} ->
+    case run_with_timeout(application, ensure_all_started, [App], 10_000) of
+        {ok, {ok, Started}} ->
             case Started =/= [] of
                 true -> ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
                 false -> ok
-            end,
-            ?SLOG(debug, #{msg => "started_plugin_app", app => App}),
-            ok;
-        {error, {ErrApp, Reason}} ->
+            end;
+        {ok, {error, Reason}} ->
+            throw(#{
+                msg => "failed_to_start_app",
+                app => App,
+                reason => Reason
+            });
+        {error, Reason} ->
             throw(#{
                 msg => "failed_to_start_plugin_app",
                 app => App,
-                err_app => ErrApp,
                 reason => Reason
             })
     end.
@@ -1586,3 +1589,20 @@ bin(B) when is_binary(B) -> B.
 
 wrap_to_list(Path) ->
     binary_to_list(iolist_to_binary(Path)).
+
+run_with_timeout(Module, Function, Args, Timeout) ->
+    Self = self(),
+    Fun = fun() ->
+        Result = apply(Module, Function, Args),
+        Self ! {self(), Result}
+    end,
+    Pid = spawn(Fun),
+    TimerRef = erlang:send_after(Timeout, self(), {timeout, Pid}),
+    receive
+        {Pid, Result} ->
+            _ = erlang:cancel_timer(TimerRef),
+            {ok, Result};
+        {timeout, Pid} ->
+            exit(Pid, kill),
+            {error, timeout}
+    end.

+ 1 - 1
apps/emqx_prometheus/src/emqx_prometheus.app.src

@@ -2,7 +2,7 @@
 {application, emqx_prometheus, [
     {description, "Prometheus for EMQX"},
     % strict semver, bump manually!
-    {vsn, "5.2.3"},
+    {vsn, "5.2.4"},
     {modules, []},
     {registered, [emqx_prometheus_sup]},
     {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.32"},
+    {vsn, "0.1.33"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 11 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -198,6 +198,9 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
         case simple_async_query(Id, Request, QueryOpts) of
             {error, _} = Error ->
+                ?tp("resource_simple_sync_internal_buffer_query_error", #{
+                    id => Id, request => Request
+                }),
                 Error;
             {async_return, {error, _} = Error} ->
                 Error;
@@ -210,7 +213,11 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
                     receive
                         {ReplyAlias, Response} ->
                             Response
-                    after 0 -> {error, timeout}
+                    after 0 ->
+                        ?tp("resource_simple_sync_internal_buffer_query_timeout", #{
+                            id => Id, request => Request
+                        }),
+                        {error, timeout}
                     end
                 end
         end
@@ -1324,6 +1331,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res
     ?tp(simple_query_override, #{query_mode => ReqQM}),
     #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
     CallMode = call_mode(QM, CBM),
+    ?tp(simple_query_enter, #{}),
     apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
     ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
@@ -1331,6 +1339,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Res
     %% The connector supports buffer, send even in disconnected state
     #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
     CallMode = call_mode(QM, CBM),
+    ?tp(simple_query_enter, #{}),
     apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
     %% when calling from the buffer worker or other simple queries,
@@ -2327,6 +2336,7 @@ reply_call(Alias, Response) ->
 %% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
 %% callbacks.
 reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
+    ?tp("reply_call_internal_buffer", #{}),
     ?MODULE:reply_call(ReplyAlias, Response),
     do_reply_caller(MaybeReplyTo, Response).
 

+ 1 - 1
apps/emqx_schema_registry/include/emqx_schema_registry.hrl

@@ -34,7 +34,7 @@
     type :: serde_type(),
     eval_context :: term(),
     %% for future use
-    extra = []
+    extra = #{}
 }).
 -type serde() :: #serde{}.
 

+ 11 - 5
apps/emqx_schema_registry/src/emqx_schema_registry.erl

@@ -148,14 +148,19 @@ post_config_update(
 post_config_update(
     [?CONF_KEY_ROOT, schemas, NewName],
     _Cmd,
-    NewSchemas,
-    %% undefined or OldSchemas
-    _,
+    NewSchema,
+    OldSchema,
     _AppEnvs
 ) ->
-    case build_serdes([{NewName, NewSchemas}]) of
+    case OldSchema of
+        undefined ->
+            ok;
+        _ ->
+            ensure_serde_absent(NewName)
+    end,
+    case build_serdes([{NewName, NewSchema}]) of
         ok ->
-            {ok, #{NewName => NewSchemas}};
+            {ok, #{NewName => NewSchema}};
         {error, Reason, SerdesToRollback} ->
             lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
             {error, Reason}
@@ -176,6 +181,7 @@ post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, Old
             async_delete_serdes(RemovedNames)
     end,
     SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
+    ok = lists:foreach(fun ensure_serde_absent/1, [N || {N, _} <- SchemasToBuild]),
     case build_serdes(SchemasToBuild) of
         ok ->
             {ok, NewConf};

+ 39 - 10
apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

@@ -48,6 +48,10 @@
 
 -type eval_context() :: term().
 
+-type fingerprint() :: binary().
+
+-type protobuf_cache_key() :: {schema_name(), fingerprint()}.
+
 -export_type([serde_type/0]).
 
 %%------------------------------------------------------------------------------
@@ -175,11 +179,12 @@ make_serde(avro, Name, Source) ->
         eval_context = Store
     };
 make_serde(protobuf, Name, Source) ->
-    SerdeMod = make_protobuf_serde_mod(Name, Source),
+    {CacheKey, SerdeMod} = make_protobuf_serde_mod(Name, Source),
     #serde{
         name = Name,
         type = protobuf,
-        eval_context = SerdeMod
+        eval_context = SerdeMod,
+        extra = #{cache_key => CacheKey}
     };
 make_serde(json, Name, Source) ->
     case json_decode(Source) of
@@ -254,8 +259,9 @@ eval_encode(#serde{type = json, name = Name}, [Map]) ->
 destroy(#serde{type = avro, name = _Name}) ->
     ?tp(serde_destroyed, #{type => avro, name => _Name}),
     ok;
-destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
+destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod} = Serde) ->
     unload_code(SerdeMod),
+    destroy_protobuf_code(Serde),
     ?tp(serde_destroyed, #{type => protobuf, name => _Name}),
     ok;
 destroy(#serde{type = json, name = Name}) ->
@@ -282,13 +288,14 @@ jesse_validate(Name, Map) ->
 jesse_name(Str) ->
     unicode:characters_to_list(Str).
 
--spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
+-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> {protobuf_cache_key(), module()}.
 make_protobuf_serde_mod(Name, Source) ->
     {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
     case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of
         {ok, SerdeMod, ModBinary} ->
             load_code(SerdeMod, SerdeModFileName, ModBinary),
-            SerdeMod;
+            CacheKey = protobuf_cache_key(Name, Source),
+            {CacheKey, SerdeMod};
         {error, #{error := Error, warnings := Warnings}} ->
             ?SLOG(
                 warning,
@@ -310,6 +317,13 @@ protobuf_serde_mod_name(Name) ->
     SerdeModFileName = SerdeModName ++ ".memory",
     {SerdeMod, SerdeModFileName}.
 
+%% Fixme: we cannot uncomment the following typespec because Dialyzer complains that
+%% `Source' should be `string()' due to `gpb_compile:string/3', but it does work fine with
+%% binaries...
+%% -spec protobuf_cache_key(schema_name(), schema_source()) -> {schema_name(), fingerprint()}.
+protobuf_cache_key(Name, Source) ->
+    {Name, erlang:md5(Source)}.
+
 -spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) ->
     {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
 lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
@@ -326,9 +340,9 @@ lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
 -spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) ->
     {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
 lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
-    Fingerprint = erlang:md5(Source),
-    _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
-    case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
+    CacheKey = protobuf_cache_key(Name, Source),
+    _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, CacheKey}, write),
+    case mnesia:read(?PROTOBUF_CACHE_TAB, CacheKey) of
         [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
             ?tp(schema_registry_protobuf_cache_hit, #{name => Name}),
             {ok, SerdeMod, ModBinary};
@@ -337,7 +351,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
             case generate_protobuf_code(SerdeMod0, Source) of
                 {ok, SerdeMod, ModBinary} ->
                     CacheEntry = #protobuf_cache{
-                        fingerprint = Fingerprint,
+                        fingerprint = CacheKey,
                         module = SerdeMod,
                         module_binary = ModBinary
                     },
@@ -345,7 +359,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
                     {ok, SerdeMod, ModBinary};
                 {ok, SerdeMod, ModBinary, _Warnings} ->
                     CacheEntry = #protobuf_cache{
-                        fingerprint = Fingerprint,
+                        fingerprint = CacheKey,
                         module = SerdeMod,
                         module_binary = ModBinary
                     },
@@ -390,6 +404,21 @@ unload_code(SerdeMod) ->
     _ = code:delete(SerdeMod),
     ok.
 
+-spec destroy_protobuf_code(serde()) -> ok.
+destroy_protobuf_code(Serde) ->
+    #serde{extra = #{cache_key := CacheKey}} = Serde,
+    {atomic, Res} = mria:transaction(
+        ?SCHEMA_REGISTRY_SHARD,
+        fun destroy_protobuf_code_trans/1,
+        [CacheKey]
+    ),
+    ?tp("schema_registry_protobuf_cache_destroyed", #{name => Serde#serde.name}),
+    Res.
+
+-spec destroy_protobuf_code_trans({schema_name(), fingerprint()}) -> ok.
+destroy_protobuf_code_trans(CacheKey) ->
+    mnesia:delete(?PROTOBUF_CACHE_TAB, CacheKey, write).
+
 -spec has_inner_type(serde_type(), eval_context(), [binary()]) ->
     boolean().
 has_inner_type(protobuf, _SerdeMod, [_, _ | _]) ->

+ 60 - 0
apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl

@@ -207,6 +207,66 @@ t_protobuf_invalid_schema(_Config) ->
     ),
     ok.
 
+%% Checks that we unload code and clear code generation cache after destroying a protobuf
+%% serde.
+t_destroy_protobuf(_Config) ->
+    SerdeName = ?FUNCTION_NAME,
+    SerdeNameBin = atom_to_binary(SerdeName),
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            Params = schema_params(protobuf),
+            ok = emqx_schema_registry:add_schema(SerdeName, Params),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_schema_registry:delete_schema(SerdeName),
+                    #{?snk_kind := serde_destroyed, name := SerdeNameBin}
+                ),
+            %% Create again to check we don't hit the cache.
+            ok = emqx_schema_registry:add_schema(SerdeName, Params),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_schema_registry:delete_schema(SerdeName),
+                    #{?snk_kind := serde_destroyed, name := SerdeNameBin}
+                ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)),
+            ?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)),
+            ok
+        end
+    ),
+    ok.
+
+%% Checks that we don't leave entries lingering in the protobuf code cache table when
+%% updating the source of a serde.
+t_update_protobuf_cache(_Config) ->
+    SerdeName = ?FUNCTION_NAME,
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            #{source := Source0} = Params0 = schema_params(protobuf),
+            ok = emqx_schema_registry:add_schema(SerdeName, Params0),
+            %% Now we touch the source so protobuf needs to be recompiled.
+            Source1 = <<Source0/binary, "\n\n">>,
+            Params1 = Params0#{source := Source1},
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_schema_registry:add_schema(SerdeName, Params1),
+                    #{?snk_kind := "schema_registry_protobuf_cache_destroyed"}
+                ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)),
+            ?assertMatch([_, _ | _], ?of_kind(schema_registry_protobuf_cache_miss, Trace)),
+            ?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)),
+            ok
+        end
+    ),
+    ok.
+
 t_json_invalid_schema(_Config) ->
     SerdeName = invalid_json,
     Params = schema_params(json),

+ 1 - 1
apps/emqx_utils/src/emqx_utils.app.src

@@ -2,7 +2,7 @@
 {application, emqx_utils, [
     {description, "Miscellaneous utilities for EMQX apps"},
     % strict semver, bump manually!
-    {vsn, "5.2.3"},
+    {vsn, "5.2.4"},
     {modules, [
         emqx_utils,
         emqx_utils_api,

+ 1 - 0
changes/ce/feat-13524.en.md

@@ -0,0 +1 @@
+Added CLI interface `emqx ctl exclusive` for the feature exclusive topics.

+ 1 - 0
changes/ce/feat-13534.en.md

@@ -0,0 +1 @@
+Add trace logging when superuser skipped authz check.

+ 4 - 0
changes/ce/fix-13357.en.md

@@ -0,0 +1,4 @@
+Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets.
+
+- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected.
+- Otherwise **DONOT** send any CONNACK or DISCONNECT packet.

+ 1 - 0
changes/ce/fix-13425.en.md

@@ -0,0 +1 @@
+The MQTT connector error log messages have been improved to provide clearer and more detailed information.

+ 1 - 0
changes/ce/fix-13541.en.md

@@ -0,0 +1 @@
+Previously, if CRL checks were ever enabled for a listener, later disabling them via the configuration would not actually disable them until the listener restarted.  This has been fixed.

+ 8 - 0
changes/ce/fix-13552.en.md

@@ -0,0 +1,8 @@
+Add a startup timeout limit for the plug-in application. Currently the timeout is 10 seconds.
+
+Starting a bad plugin while EMQX is running will result in a thrown runtime error.
+When EMQX is closed and restarted, the main starting process may hang due to the the plugin application to start failures.
+
+Maybe restarting with modified:
+- Modifed config file: make the bad plugin enabled.
+- Add a plugin with bad plugin config.

+ 1 - 0
changes/ee/feat-13546.en.md

@@ -0,0 +1 @@
+Added the option to configure the query mode for Pulsar Producer action.

+ 1 - 0
changes/ee/fix-13543.en.md

@@ -0,0 +1 @@
+Fixed an issue where the internal cache for Protobuf schemas in Schema Registry was not properly cleaned up after deleting or updating a schema.

+ 87 - 0
changes/v5.7.2.en.md

@@ -0,0 +1,87 @@
+## 5.7.2
+
+*Release Date: 2024-08-06*
+
+### Enhancements
+
+- [#13317](https://github.com/emqx/emqx/pull/13317) Added a new per-authorization source metric type: `ignore`.  This metric increments when an authorization source attempts to authorize a request but encounters scenarios where the authorization is not applicable or encounters an error, resulting in an undecidable outcome.
+
+- [#13336](https://github.com/emqx/emqx/pull/13336) Added functionality to initialize authentication data in the built-in database of an empty EMQX node or cluster using a bootstrap file in CSV or JSON format. This feature introduces new configuration entries, `bootstrap_file` and `bootstrap_type`.
+
+- [#13348](https://github.com/emqx/emqx/pull/13348) Added a new field `payload_encode` in the log configuration to determine the format of the payload in the log data. 
+
+- [#13436](https://github.com/emqx/emqx/pull/13436) Added the option to add custom request headers to JWKS requests.
+
+- [#13507](https://github.com/emqx/emqx/pull/13507) Introduced a new built-in function `getenv` in the rule engine and variform expression to facilitate access to environment variables. This function adheres to the following constraints:
+
+  - Prefix `EMQXVAR_` is added before reading from OS environment variables. For example, `getenv('FOO_BAR')` is to read `EMQXVAR_FOO_BAR`.
+  - These values are immutable once loaded from the OS environment.
+
+- [#13521](https://github.com/emqx/emqx/pull/13521) Resolved an issue where LDAP query timeouts could cause the underlying connection to become unusable, potentially causing subsequent queries to return outdated results. The fix ensures the system reconnects automatically in case of a timeout.
+
+- [#13528](https://github.com/emqx/emqx/pull/13528) Applied log throttling for the event of unrecoverable errors in data integrations.
+
+- [#13548](https://github.com/emqx/emqx/pull/13548) EMQX now can optionally invoke the `on_config_changed/2` callback function when the plugin configuration is updated via the REST API. This callback function is assumed to be exported by the `<PluginName>_app` module.
+  For example, if the plugin name and version are `my_plugin-1.0.0`, then the callback function is assumed to be `my_plugin_app:on_config_changed/2`.
+
+- [#13386](https://github.com/emqx/emqx/pull/13386) Added support for initializing a list of banned clients on an empty EMQX node or cluster with a bootstrap file in CSV format. The corresponding config entry to specify the file path is `banned.bootstrap_file`. This file is a CSV file with `,` as its delimiter. The first line of this file must be a header line. All valid headers are listed here:
+
+  - as :: required
+  - who :: required
+  - by  :: optional
+  - reason :: optional
+  - at :: optional
+  - until :: optional
+
+  See the [Configuration Manual](https://docs.emqx.com/en/enterprise/v@EE_VERSION@/hocon/) for details on each field.
+
+  Each row in the rest of this file must contain the same number of columns as the header line, and the column can be omitted then its value is `undefined`.
+
+### Bug Fixes
+
+- [#13222](https://github.com/emqx/emqx/pull/13222) Resolved issues with flags checking and error handling associated with the Will message in the `CONNECT` packet.
+  For detailed specifications, refer to:
+  
+  - MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
+  - MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
+  - MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
+  
+- [#13307](https://github.com/emqx/emqx/pull/13307) Updated `ekka` library to version 0.19.5. This version of `ekka` utilizes `mria` 0.8.8, enhancing auto-heal functionality. Previously, the auto-heal worked only when all core nodes were reachable. This update allows to apply auto-heal once the majority of core nodes are alive. For details, refer to the [Mria PR](https://github.com/emqx/mria/pull/180).
+
+- [#13334](https://github.com/emqx/emqx/pull/13334) Implemented strict mode checking for the `PasswordFlag` in the MQTT v3.1.1 CONNECT packet to align with protocol specifications.
+
+  Note: To ensure bug-to-bug compatibility, this check is performed only in strict mode.
+
+- [#13344](https://github.com/emqx/emqx/pull/13344) Resolved an issue where the `POST /clients/:clientid/subscribe/bulk` API would not function correctly if the node receiving the API request did not maintain the connection to the specified `clientid`.
+
+- [#13358](https://github.com/emqx/emqx/pull/13358) Fixed an issue when the `reason` in the `authn_complete_event` event was incorrectly displayed.
+- [#13375](https://github.com/emqx/emqx/pull/13375) The value `infinity` has been added as default value to the listener configuration fields `max_conn_rate`, `messages_rate`, and `bytes_rate`.
+
+- [#13382](https://github.com/emqx/emqx/pull/13382) Updated the `emqtt` library to version 0.4.14, which resolves an issue preventing `emqtt_pool`s from reusing pools that are in an inconsistent state.
+
+- [#13389](https://github.com/emqx/emqx/pull/13389) Fixed an issue where the `Derived Key Length` for `pbkdf2` could be set to a negative integer.
+
+- [#13389](https://github.com/emqx/emqx/pull/13389) Fixed an issue where topics in the authorization rules might be parsed incorrectly.
+
+- [#13393](https://github.com/emqx/emqx/pull/13393) Fixed an issue where plugin applications failed to restart after a node joined a cluster, resulting in hooks not being properly installed and causing inconsistent states.
+
+- [#13398](https://github.com/emqx/emqx/pull/13398) Fixed an issue where ACL rules were incorrectly cleared when reloading the built-in database for authorization using the command line.
+
+- [#13403](https://github.com/emqx/emqx/pull/13403) Addressed a security issue where environment variable configuration overrides were inadvertently logging passwords. This fix ensures that passwords present in environment variables are not logged.
+
+- [#13408](https://github.com/emqx/emqx/pull/13408) Resolved a `function_clause` crash triggered by authentication attempts with invalid salt or password types. This fix enhances error handling to better manage authentication failures involving incorrect salt or password types.
+
+- [#13419](https://github.com/emqx/emqx/pull/13419) Resolved an issue where crash log messages from the `/configs` API were displaying garbled hints. This fix ensures that log messages related to API calls are clear and understandable.
+
+- [#13422](https://github.com/emqx/emqx/pull/13422) Fixed an issue where the option `force_shutdown.max_heap_size` could not be set to 0 to disable this tuning.
+
+- [#13442](https://github.com/emqx/emqx/pull/13442) Fixed an issue where the health check interval configuration for actions/sources was not being respected. Previously, EMQX ignored the specified health check interval for actions and used the connector's interval instead. The fix ensures that EMQX now correctly uses the health check interval configured for actions/sources, allowing for independent and accurate health monitoring frequencies.
+
+- [#13503](https://github.com/emqx/emqx/pull/13503) Fixed an issue where connectors did not adhere to the configured health check interval upon initial startup, requiring an update or restart to apply the correct interval.
+
+- [#13515](https://github.com/emqx/emqx/pull/13515) Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason.
+
+- [#13527](https://github.com/emqx/emqx/pull/13527) Fixed an issue in the Rule Engine where executing a SQL test for the Message Publish event would consistently return no results when a `$bridges/...` source was included in the `FROM` clause.
+
+- [#13541](https://github.com/emqx/emqx/pull/13541) Fixed an issue where disabling CRL checks for a listener required a listener restart to take effect.
+- [#13552](https://github.com/emqx/emqx/pull/13552) Added a startup timeout limit for EMQX plugins with a default timeout of 10 seconds. Before this update, problematic plugins could cause runtime errors during startup, leading to potential issues where the main startup process might hang when EMQX is stopped and restarted.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.7.1
+version: 5.8.0-alpha.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.7.1
+appVersion: 5.8.0-alpha.1

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.7.1
+version: 5.8.0-alpha.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.7.1
+appVersion: 5.8.0-alpha.1

+ 1 - 1
mix.exs

@@ -182,7 +182,7 @@ defmodule EMQXUmbrella.MixProject do
   end
 
   def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
-  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true}
+  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true}
   def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
   def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.2", override: true}
   def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}

+ 0 - 0
rebar.config


Some files were not shown because too many files changed in this diff