Browse Source

Merge pull request #12005 from thalesmg/sync-r53-m-20231122

chore: sync `release-53` to `master`
Thales Macedo Garitezi 2 years ago
parent
commit
acaf2784f5
32 changed files with 391 additions and 83 deletions
  1. 1 1
      Makefile
  2. 2 2
      apps/emqx/include/emqx_release.hrl
  3. 4 2
      apps/emqx/src/emqx_channel.erl
  4. 10 10
      apps/emqx/src/emqx_connection.erl
  5. 9 9
      apps/emqx/src/emqx_ws_connection.erl
  6. 2 0
      apps/emqx_bridge/src/emqx_bridge_v2_api.erl
  7. 26 0
      apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl
  8. 1 1
      apps/emqx_bridge/test/emqx_bridge_SUITE.erl
  9. 1 1
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  10. 1 1
      apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl
  11. 20 0
      apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl
  12. 19 0
      apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
  13. 41 0
      apps/emqx_bridge/test/emqx_bridge_v2_tests.erl
  14. 1 1
      apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl
  15. 27 0
      apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl
  16. 1 1
      apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl
  17. 10 10
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
  18. 2 3
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl
  19. 5 5
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
  20. 27 0
      apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl
  21. 4 1
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl
  22. 1 6
      apps/emqx_conf/src/emqx_conf.erl
  23. 1 1
      apps/emqx_connector/src/emqx_connector_api.erl
  24. 20 0
      apps/emqx_connector/src/schema/emqx_connector_schema.erl
  25. 1 1
      apps/emqx_connector/test/emqx_connector_SUITE.erl
  26. 129 14
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  27. 4 5
      apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl
  28. 9 4
      apps/emqx_resource/src/emqx_resource.erl
  29. 5 0
      changes/ce/fix-11975.en.md
  30. 3 0
      changes/ce/fix-11987.en.md
  31. 2 2
      deploy/charts/emqx-enterprise/Chart.yaml
  32. 2 2
      deploy/charts/emqx/Chart.yaml

+ 1 - 1
Makefile

@@ -21,7 +21,7 @@ endif
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
 export EMQX_DASHBOARD_VERSION ?= v1.5.1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1
+export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1
 
 PROFILE ?= emqx
 REL_PROFILES := emqx emqx-enterprise

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,10 +32,10 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.3.1").
+-define(EMQX_RELEASE_CE, "5.3.2").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.3.1").
+-define(EMQX_RELEASE_EE, "5.3.2-alpha.1").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 4 - 2
apps/emqx/src/emqx_channel.erl

@@ -1216,8 +1216,10 @@ handle_info(
         {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
         Shutdown -> Shutdown
     end;
-handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
-    ?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}),
+handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
+    %% This can happen as a race:
+    %% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed'
+    %% is already in process mailbox
     {ok, Channel};
 handle_info(clean_authz_cache, Channel) ->
     ok = emqx_authz_cache:empty_authz_cache(),

+ 10 - 10
apps/emqx/src/emqx_connection.erl

@@ -552,13 +552,13 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
     inc_counter(incoming_bytes, Len),
     ok = emqx_metrics:inc('bytes.received', Len),
     when_bytes_in(Len, Data, State);
-handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
-    case queue:peek(Cache) of
+handle_msg(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
+    case queue:peek(Buffer) of
         empty ->
-            activate_socket(State);
+            handle_info(activate_socket, State);
         {value, #pending_req{need = Needs, data = Data, next = Next}} ->
-            State2 = State#state{limiter_buffer = queue:drop(Cache)},
-            check_limiter(Needs, Data, Next, [check_cache], State2)
+            State2 = State#state{limiter_buffer = queue:drop(Buffer)},
+            check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)
     end;
 handle_msg(
     {incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
@@ -1036,13 +1036,13 @@ check_limiter(
     Data,
     WhenOk,
     _Msgs,
-    #state{limiter_buffer = Cache} = State
+    #state{limiter_buffer = Buffer} = State
 ) ->
     %% if there has a retry timer,
-    %% cache the operation and execute it after the retry is over
-    %% the maximum length of the cache queue is equal to the active_n
+    %% Buffer the operation and execute it after the retry is over
+    %% the maximum length of the buffer queue is equal to the active_n
     New = #pending_req{need = Needs, data = Data, next = WhenOk},
-    {ok, State#state{limiter_buffer = queue:in(New, Cache)}}.
+    {ok, State#state{limiter_buffer = queue:in(New, Buffer)}}.
 
 %% try to perform a retry
 -spec retry_limiter(state()) -> _.
@@ -1053,7 +1053,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
         {ok, Limiter2} ->
             Next(
                 Data,
-                [check_cache],
+                [check_limiter_buffer],
                 State#state{
                     limiter = Limiter2,
                     limiter_timer = undefined

+ 9 - 9
apps/emqx/src/emqx_ws_connection.erl

@@ -94,7 +94,7 @@
     limiter :: container(),
 
     %% cache operation when overload
-    limiter_cache :: queue:queue(cache()),
+    limiter_buffer :: queue:queue(cache()),
 
     %% limiter timers
     limiter_timer :: undefined | reference()
@@ -326,7 +326,7 @@ websocket_init([Req, Opts]) ->
                     zone = Zone,
                     listener = {Type, Listener},
                     limiter_timer = undefined,
-                    limiter_cache = queue:new()
+                    limiter_buffer = queue:new()
                 },
                 hibernate};
         {denny, Reason} ->
@@ -462,13 +462,13 @@ websocket_info(
     State
 ) ->
     return(retry_limiter(State));
-websocket_info(check_cache, #state{limiter_cache = Cache} = State) ->
-    case queue:peek(Cache) of
+websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
+    case queue:peek(Buffer) of
         empty ->
             return(enqueue({active, true}, State#state{sockstate = running}));
         {value, #cache{need = Needs, data = Data, next = Next}} ->
-            State2 = State#state{limiter_cache = queue:drop(Cache)},
-            return(check_limiter(Needs, Data, Next, [check_cache], State2))
+            State2 = State#state{limiter_buffer = queue:drop(Buffer)},
+            return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2))
     end;
 websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
     handle_timeout(TRef, Msg, State);
@@ -630,10 +630,10 @@ check_limiter(
     Data,
     WhenOk,
     _Msgs,
-    #state{limiter_cache = Cache} = State
+    #state{limiter_buffer = Buffer} = State
 ) ->
     New = #cache{need = Needs, data = Data, next = WhenOk},
-    State#state{limiter_cache = queue:in(New, Cache)}.
+    State#state{limiter_buffer = queue:in(New, Buffer)}.
 
 -spec retry_limiter(state()) -> state().
 retry_limiter(#state{limiter = Limiter} = State) ->
@@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
         {ok, Limiter2} ->
             Next(
                 Data,
-                [check_cache],
+                [check_limiter_buffer],
                 State#state{
                     limiter = Limiter2,
                     limiter_timer = undefined

+ 2 - 0
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -791,6 +791,8 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
             PreOrPostConfigUpdate =:= pre_config_update;
             PreOrPostConfigUpdate =:= post_config_update
         ->
+            ?BAD_REQUEST(map_to_json(redact(Reason)));
+        {error, Reason} when is_map(Reason) ->
             ?BAD_REQUEST(map_to_json(redact(Reason)))
     end.
 

+ 26 - 0
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -39,6 +39,7 @@
 ]).
 
 -export([types/0, types_sc/0]).
+-export([resource_opts_fields/0, resource_opts_fields/1]).
 
 -export([
     make_producer_action_schema/1,
@@ -147,6 +148,31 @@ types() ->
 types_sc() ->
     hoconsc:enum(types()).
 
+resource_opts_fields() ->
+    resource_opts_fields(_Overrides = []).
+
+resource_opts_fields(Overrides) ->
+    ActionROFields = [
+        batch_size,
+        batch_time,
+        buffer_mode,
+        buffer_seg_bytes,
+        health_check_interval,
+        inflight_window,
+        max_buffer_bytes,
+        metrics_flush_interval,
+        query_mode,
+        request_ttl,
+        resume_interval,
+        start_after_created,
+        start_timeout,
+        worker_pool_size
+    ],
+    lists:filter(
+        fun({Key, _Sc}) -> lists:member(Key, ActionROFields) end,
+        emqx_resource_schema:create_opts(Overrides)
+    ).
+
 examples(Method) ->
     MergeFun =
         fun(Example, Examples) ->

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_SUITE.erl

@@ -199,7 +199,7 @@ t_create_with_bad_name(_Config) ->
     ?assertMatch(
         {error,
             {pre_config_update, emqx_bridge_app, #{
-                reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>,
+                reason := <<"Invalid name format.", _/binary>>,
                 kind := validation_error
             }}},
         emqx:update_config(Path, Conf)

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -1365,7 +1365,7 @@ t_create_with_bad_name(Config) ->
     ?assertMatch(
         #{
             <<"kind">> := <<"validation_error">>,
-            <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
+            <<"reason">> := <<"Invalid name format.", _/binary>>
         },
         Msg
     ),

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -821,7 +821,7 @@ t_create_with_bad_name(_Config) ->
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> := #{
                 <<"kind">> := <<"validation_error">>,
-                <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
+                <<"reason">> := <<"Invalid name format.", _/binary>>
             }
         }}} = create_bridge_http_api_v1(Opts),
     ok.

+ 20 - 0
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -1021,6 +1021,26 @@ t_action_types(Config) ->
     ?assert(lists:all(fun is_binary/1, Types), #{types => Types}),
     ok.
 
+t_bad_name(Config) ->
+    Name = <<"_bad_name">>,
+    Res = request_json(
+        post,
+        uri([?ROOT]),
+        ?KAFKA_BRIDGE(Name),
+        Config
+    ),
+    ?assertMatch({ok, 400, #{<<"message">> := _}}, Res),
+    {ok, 400, #{<<"message">> := Msg0}} = Res,
+    Msg = emqx_utils_json:decode(Msg0, [return_maps]),
+    ?assertMatch(
+        #{
+            <<"kind">> := <<"validation_error">>,
+            <<"reason">> := <<"Invalid name format.", _/binary>>
+        },
+        Msg
+    ),
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

+ 19 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -312,6 +312,25 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
             Error
     end.
 
+api_spec_schemas(Root) ->
+    Method = get,
+    Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]),
+    Params = [],
+    AuthHeader = [],
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
+        {ok, {{_, 200, _}, _, Res0}} ->
+            #{<<"components">> := #{<<"schemas">> := Schemas}} =
+                emqx_utils_json:decode(Res0, [return_maps]),
+            Schemas
+    end.
+
+bridges_api_spec_schemas() ->
+    api_spec_schemas("bridges").
+
+actions_api_spec_schemas() ->
+    api_spec_schemas("actions").
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------

+ 41 - 0
apps/emqx_bridge/test/emqx_bridge_v2_tests.erl

@@ -0,0 +1,41 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_v2_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+resource_opts_union_connector_actions_test() ->
+    %% The purpose of this test is to ensure we have split `resource_opts' fields
+    %% consciouly between connector and actions, in particular when/if we introduce new
+    %% fields there.
+    AllROFields = non_deprecated_fields(emqx_resource_schema:create_opts([])),
+    ActionROFields = non_deprecated_fields(emqx_bridge_v2_schema:resource_opts_fields()),
+    ConnectorROFields = non_deprecated_fields(emqx_connector_schema:resource_opts_fields()),
+    UnionROFields = lists:usort(ConnectorROFields ++ ActionROFields),
+    ?assertEqual(
+        lists:usort(AllROFields),
+        UnionROFields,
+        #{
+            missing_fields => AllROFields -- UnionROFields,
+            unexpected_fields => UnionROFields -- AllROFields,
+            action_fields => ActionROFields,
+            connector_fields => ConnectorROFields
+        }
+    ),
+    ok.
+
+non_deprecated_fields(Fields) ->
+    [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)].

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -126,7 +126,7 @@ fields(action) ->
 fields(actions) ->
     Fields =
         override(
-            emqx_bridge_kafka:producer_opts(),
+            emqx_bridge_kafka:producer_opts(action),
             bridge_v2_overrides()
         ) ++
             [

+ 27 - 0
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl

@@ -272,6 +272,22 @@ make_message() ->
         timestamp => Time
     }.
 
+bridge_api_spec_props_for_get() ->
+    #{
+        <<"bridge_azure_event_hub.get_producer">> :=
+            #{<<"properties">> := Props}
+    } =
+        emqx_bridge_v2_testlib:bridges_api_spec_schemas(),
+    Props.
+
+action_api_spec_props_for_get() ->
+    #{
+        <<"bridge_azure_event_hub.get_bridge_v2">> :=
+            #{<<"properties">> := Props}
+    } =
+        emqx_bridge_v2_testlib:actions_api_spec_schemas(),
+    Props.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -341,3 +357,14 @@ t_same_name_azure_kafka_bridges(Config) ->
         end
     ),
     ok.
+
+t_parameters_key_api_spec(_Config) ->
+    BridgeProps = bridge_api_spec_props_for_get(),
+    ?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}),
+    ?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}),
+
+    ActionProps = action_api_spec_props_for_get(),
+    ?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}),
+    ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
+
+    ok.

+ 1 - 1
apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl

@@ -113,7 +113,7 @@ fields(action) ->
 fields(actions) ->
     Fields =
         override(
-            emqx_bridge_kafka:producer_opts(),
+            emqx_bridge_kafka:producer_opts(action),
             bridge_v2_overrides()
         ) ++
             [

+ 10 - 10
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -29,7 +29,7 @@
     desc/1,
     host_opts/0,
     ssl_client_opts_fields/0,
-    producer_opts/0
+    producer_opts/1
 ]).
 
 -export([
@@ -261,7 +261,7 @@ fields("config_producer") ->
 fields("config_consumer") ->
     fields(kafka_consumer);
 fields(kafka_producer) ->
-    connector_config_fields() ++ producer_opts();
+    connector_config_fields() ++ producer_opts(v1);
 fields(kafka_producer_action) ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -270,7 +270,7 @@ fields(kafka_producer_action) ->
                 desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
             })},
         {description, emqx_schema:description_schema()}
-    ] ++ producer_opts();
+    ] ++ producer_opts(action);
 fields(kafka_consumer) ->
     connector_config_fields() ++ fields(consumer_opts);
 fields(ssl_client_opts) ->
@@ -523,7 +523,7 @@ fields(consumer_kafka_opts) ->
     ];
 fields(resource_opts) ->
     SupportedFields = [health_check_interval],
-    CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
+    CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(),
     lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts);
 fields(action_field) ->
     {kafka_producer,
@@ -599,25 +599,25 @@ connector_config_fields() ->
         {ssl, mk(ref(ssl_client_opts), #{})}
     ].
 
-producer_opts() ->
+producer_opts(ActionOrBridgeV1) ->
     [
         %% Note: there's an implicit convention in `emqx_bridge' that,
         %% for egress bridges with this config, the published messages
         %% will be forwarded to such bridges.
         {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
-        parameters_field(),
+        parameters_field(ActionOrBridgeV1),
         {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
     ].
 
 %% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
 %% However we need to keep it backward compatible for generated schema json (version 0.1.0)
 %% since schema is data for the 'schemas' API.
-parameters_field() ->
+parameters_field(ActionOrBridgeV1) ->
     {Name, Alias} =
-        case get(emqx_bridge_schema_version) of
-            <<"0.1.0">> ->
+        case ActionOrBridgeV1 of
+            v1 ->
                 {kafka, parameters};
-            _ ->
+            action ->
                 {parameters, kafka}
         end,
     {Name,

+ 2 - 3
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl

@@ -32,9 +32,8 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
     Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
         BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
     ),
-    KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}),
-    Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0),
-    Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}),
+    KafkaMap = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
+    Config2 = emqx_utils_maps:deep_merge(Config0, #{<<"parameters">> => KafkaMap}),
     maps:with(producer_action_field_keys(), Config2).
 
 %%------------------------------------------------------------------------------------------

+ 5 - 5
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

@@ -25,7 +25,7 @@ kafka_producer_test() ->
                     <<"kafka_producer">> :=
                         #{
                             <<"myproducer">> :=
-                                #{<<"parameters">> := #{}}
+                                #{<<"kafka">> := #{}}
                         }
                 }
         },
@@ -52,7 +52,7 @@ kafka_producer_test() ->
                         #{
                             <<"myproducer">> :=
                                 #{
-                                    <<"parameters">> := #{},
+                                    <<"kafka">> := #{},
                                     <<"local_topic">> := <<"mqtt/local">>
                                 }
                         }
@@ -68,7 +68,7 @@ kafka_producer_test() ->
                         #{
                             <<"myproducer">> :=
                                 #{
-                                    <<"parameters">> := #{},
+                                    <<"kafka">> := #{},
                                     <<"local_topic">> := <<"mqtt/local">>
                                 }
                         }
@@ -166,7 +166,7 @@ message_key_dispatch_validations_test() ->
     ?assertThrow(
         {_, [
             #{
-                path := "bridges.kafka_producer.myproducer.parameters",
+                path := "bridges.kafka_producer.myproducer.kafka",
                 reason := "Message key cannot be empty when `key_dispatch` strategy is used"
             }
         ]},
@@ -175,7 +175,7 @@ message_key_dispatch_validations_test() ->
     ?assertThrow(
         {_, [
             #{
-                path := "bridges.kafka_producer.myproducer.parameters",
+                path := "bridges.kafka_producer.myproducer.kafka",
                 reason := "Message key cannot be empty when `key_dispatch` strategy is used"
             }
         ]},

+ 27 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -182,6 +182,22 @@ create_action(Name, Config) ->
     on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
     Res.
 
+bridge_api_spec_props_for_get() ->
+    #{
+        <<"bridge_kafka.get_producer">> :=
+            #{<<"properties">> := Props}
+    } =
+        emqx_bridge_v2_testlib:bridges_api_spec_schemas(),
+    Props.
+
+action_api_spec_props_for_get() ->
+    #{
+        <<"bridge_kafka.get_bridge_v2">> :=
+            #{<<"properties">> := Props}
+    } =
+        emqx_bridge_v2_testlib:actions_api_spec_schemas(),
+    Props.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -342,3 +358,14 @@ t_bad_url(_Config) ->
     ),
     ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
     ok.
+
+t_parameters_key_api_spec(_Config) ->
+    BridgeProps = bridge_api_spec_props_for_get(),
+    ?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}),
+    ?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}),
+
+    ActionProps = action_api_spec_props_for_get(),
+    ?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}),
+    ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
+
+    ok.

+ 4 - 1
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl

@@ -10,8 +10,11 @@
 %% Test cases
 %%===========================================================================
 
+atoms() ->
+    [my_producer].
+
 pulsar_producer_validations_test() ->
-    Name = list_to_atom("my_producer"),
+    Name = hd(atoms()),
     Conf0 = pulsar_producer_hocon(),
     Conf1 =
         Conf0 ++

+ 1 - 6
apps/emqx_conf/src/emqx_conf.erl

@@ -193,12 +193,7 @@ hotconf_schema_json() ->
 bridge_schema_json() ->
     Version = <<"0.1.0">>,
     SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version},
-    put(emqx_bridge_schema_version, Version),
-    try
-        gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo)
-    after
-        erase(emqx_bridge_schema_version)
-    end.
+    gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo).
 
 %% TODO: remove it and also remove hocon_md.erl and friends.
 %% markdown generation from schema is a failure and we are moving to an interactive

+ 1 - 1
apps/emqx_connector/src/emqx_connector_api.erl

@@ -372,7 +372,7 @@ schema("/connectors_probe") ->
                 case emqx_connector:remove(ConnectorType, ConnectorName) of
                     ok ->
                         ?NO_CONTENT;
-                    {error, {active_channels, Channels}} ->
+                    {error, {post_config_update, _HandlerMod, {active_channels, Channels}}} ->
                         ?BAD_REQUEST(
                             {<<"Cannot delete connector while there are active channels defined for this connector">>,
                                 Channels}

+ 20 - 0
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -35,6 +35,8 @@
 -export([connector_type_to_bridge_types/1]).
 -export([common_fields/0]).
 
+-export([resource_opts_fields/0, resource_opts_fields/1]).
+
 -if(?EMQX_RELEASE_EDITION == ee).
 enterprise_api_schemas(Method) ->
     %% We *must* do this to ensure the module is really loaded, especially when we use
@@ -364,6 +366,24 @@ common_fields() ->
         {description, emqx_schema:description_schema()}
     ].
 
+resource_opts_fields() ->
+    resource_opts_fields(_Overrides = []).
+
+resource_opts_fields(Overrides) ->
+    %% Note: these don't include buffer-related configurations because buffer workers are
+    %% tied to the action.
+    ConnectorROFields = [
+        health_check_interval,
+        query_mode,
+        request_ttl,
+        start_after_created,
+        start_timeout
+    ],
+    lists:filter(
+        fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end,
+        emqx_resource_schema:create_opts(Overrides)
+    ).
+
 %%======================================================================================
 %% Helper Functions
 %%======================================================================================

+ 1 - 1
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -229,7 +229,7 @@ t_create_with_bad_name_direct_path(_Config) ->
         {error,
             {pre_config_update, _ConfigHandlerMod, #{
                 kind := validation_error,
-                reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
+                reason := <<"Invalid name format.", _/binary>>
             }}},
         emqx:update_config(Path, ConnConfig)
     ),

+ 129 - 14
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -25,7 +25,7 @@
 -include_lib("snabbkaffe/include/test_macros.hrl").
 
 -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
--define(CONNECTOR(NAME, TYPE), #{
+-define(RESOURCE(NAME, TYPE), #{
     %<<"ssl">> => #{<<"enable">> => false},
     <<"type">> => TYPE,
     <<"name">> => NAME
@@ -52,12 +52,57 @@
 -define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)).
 -define(KAFKA_CONNECTOR(Name, BootstrapHosts),
     maps:merge(
-        ?CONNECTOR(Name, ?CONNECTOR_TYPE),
+        ?RESOURCE(Name, ?CONNECTOR_TYPE),
         ?KAFKA_CONNECTOR_BASE(BootstrapHosts)
     )
 ).
 -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
 
+-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
+-define(BRIDGE_TYPE_STR, "kafka_producer").
+-define(BRIDGE_TYPE, <<?BRIDGE_TYPE_STR>>).
+-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{
+    <<"enable">> => true,
+    <<"connector">> => Connector,
+    <<"kafka">> => #{
+        <<"buffer">> => #{
+            <<"memory_overload_protection">> => true,
+            <<"mode">> => <<"hybrid">>,
+            <<"per_partition_limit">> => <<"2GB">>,
+            <<"segment_bytes">> => <<"100MB">>
+        },
+        <<"compression">> => <<"no_compression">>,
+        <<"kafka_ext_headers">> => [
+            #{
+                <<"kafka_ext_header_key">> => <<"clientid">>,
+                <<"kafka_ext_header_value">> => <<"${clientid}">>
+            },
+            #{
+                <<"kafka_ext_header_key">> => <<"topic">>,
+                <<"kafka_ext_header_value">> => <<"${topic}">>
+            }
+        ],
+        <<"kafka_header_value_encode_mode">> => <<"none">>,
+        <<"kafka_headers">> => <<"${pub_props}">>,
+        <<"max_batch_bytes">> => <<"896KB">>,
+        <<"max_inflight">> => 10,
+        <<"message">> => #{
+            <<"key">> => <<"${.clientid}">>,
+            <<"timestamp">> => <<"${.timestamp}">>,
+            <<"value">> => <<"${.}">>
+        },
+        <<"partition_count_refresh_interval">> => <<"60s">>,
+        <<"partition_strategy">> => <<"random">>,
+        <<"required_acks">> => <<"all_isr">>,
+        <<"topic">> => <<"kafka-topic">>
+    },
+    <<"local_topic">> => <<"mqtt/local/topic">>,
+    <<"resource_opts">> => #{
+        <<"health_check_interval">> => <<"32s">>
+    }
+}).
+-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)).
+
 %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
 %% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{
 %%     <<"server">> => SERVER,
@@ -105,7 +150,8 @@
     emqx,
     emqx_auth,
     emqx_management,
-    {emqx_connector, "connectors {}"}
+    {emqx_connector, "connectors {}"},
+    {emqx_bridge, "actions {}"}
 ]).
 
 -define(APPSPEC_DASHBOARD,
@@ -128,7 +174,8 @@ all() ->
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     SingleOnlyTests = [
-        t_connectors_probe
+        t_connectors_probe,
+        t_fail_delete_with_action
     ],
     ClusterLaterJoinOnlyTCs = [
         % t_cluster_later_join_metrics
@@ -187,29 +234,38 @@ end_per_group(_, Config) ->
     emqx_cth_suite:stop(?config(group_apps, Config)),
     ok.
 
-init_per_testcase(_TestCase, Config) ->
+init_per_testcase(TestCase, Config) ->
     case ?config(cluster_nodes, Config) of
         undefined ->
-            init_mocks();
+            init_mocks(TestCase);
         Nodes ->
-            [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
+            [erpc:call(Node, ?MODULE, init_mocks, [TestCase]) || Node <- Nodes]
     end,
     Config.
 
-end_per_testcase(_TestCase, Config) ->
+end_per_testcase(TestCase, Config) ->
+    Node = ?config(node, Config),
+    ok = erpc:call(Node, ?MODULE, clear_resources, [TestCase]),
     case ?config(cluster_nodes, Config) of
         undefined ->
             meck:unload();
         Nodes ->
-            [erpc:call(Node, meck, unload, []) || Node <- Nodes]
+            [erpc:call(N, meck, unload, []) || N <- Nodes]
     end,
-    Node = ?config(node, Config),
     ok = emqx_common_test_helpers:call_janitor(),
-    ok = erpc:call(Node, fun clear_resources/0),
     ok.
 
 -define(CONNECTOR_IMPL, dummy_connector_impl).
-init_mocks() ->
+init_mocks(t_fail_delete_with_action) ->
+    init_mocks(common),
+    meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
+    meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
+    meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
+    ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
+        emqx_bridge_v2:get_channels_for_connector(ResId)
+    end),
+    ok;
+init_mocks(_TestCase) ->
     meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
     meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
     meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
@@ -235,7 +291,15 @@ init_mocks() ->
     ),
     [?CONNECTOR_IMPL, emqx_connector_ee_schema].
 
-clear_resources() ->
+clear_resources(t_fail_delete_with_action) ->
+    lists:foreach(
+        fun(#{type := Type, name := Name}) ->
+            ok = emqx_bridge_v2:remove(Type, Name)
+        end,
+        emqx_bridge_v2:list()
+    ),
+    clear_resources(common);
+clear_resources(_) ->
     lists:foreach(
         fun(#{type := Type, name := Name}) ->
             ok = emqx_connector:remove(Type, Name)
@@ -646,7 +710,7 @@ t_connectors_probe(Config) ->
         request_json(
             post,
             uri(["connectors_probe"]),
-            ?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>),
+            ?RESOURCE(<<"broken_connector">>, <<"unknown_type">>),
             Config
         )
     ),
@@ -674,6 +738,57 @@ t_create_with_bad_name(Config) ->
     ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
     ok.
 
+t_fail_delete_with_action(Config) ->
+    Name = ?CONNECTOR_NAME,
+    ?assertMatch(
+        {ok, 201, #{
+            <<"type">> := ?CONNECTOR_TYPE,
+            <<"name">> := Name,
+            <<"enable">> := true,
+            <<"status">> := <<"connected">>,
+            <<"node_status">> := [_ | _]
+        }},
+        request_json(
+            post,
+            uri(["connectors"]),
+            ?KAFKA_CONNECTOR(Name),
+            Config
+        )
+    ),
+    ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
+    BridgeName = ?BRIDGE_NAME,
+    ?assertMatch(
+        {ok, 201, #{
+            <<"type">> := ?BRIDGE_TYPE,
+            <<"name">> := BridgeName,
+            <<"enable">> := true,
+            <<"status">> := <<"connected">>,
+            <<"node_status">> := [_ | _],
+            <<"connector">> := Name,
+            <<"kafka">> := #{},
+            <<"local_topic">> := _,
+            <<"resource_opts">> := _
+        }},
+        request_json(
+            post,
+            uri(["actions"]),
+            ?KAFKA_BRIDGE(?BRIDGE_NAME),
+            Config
+        )
+    ),
+
+    %% delete the connector
+    ?assertMatch(
+        {ok, 400, #{
+            <<"code">> := <<"BAD_REQUEST">>,
+            <<"message">> :=
+                <<"{<<\"Cannot delete connector while there are active channels",
+                    " defined for this connector\">>,", _/binary>>
+        }},
+        request_json(delete, uri(["connectors", ConnectorID]), Config)
+    ),
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

+ 4 - 5
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -963,13 +963,12 @@ handle_info(
     NChannel = ensure_disconnected(Reason, Channel),
     shutdown(Reason, NChannel);
 handle_info(
-    {sock_closed, Reason},
+    {sock_closed, _Reason},
     Channel = #channel{conn_state = disconnected}
 ) ->
-    ?SLOG(error, #{
-        msg => "unexpected_sock_closed",
-        reason => Reason
-    }),
+    %% This can happen as a race:
+    %% EMQX closes socket and marks 'disconnected' but 'tcp_closed' or 'ssl_closed'
+    %% is already in process mailbox
     {ok, Channel};
 handle_info(clean_authz_cache, Channel) ->
     ok = emqx_authz_cache:empty_authz_cache(),

+ 9 - 4
apps/emqx_resource/src/emqx_resource.erl

@@ -812,11 +812,11 @@ validate_name(Name) ->
     ok.
 
 validate_name(<<>>, _Opts) ->
-    invalid_data("name cannot be empty string");
+    invalid_data("Name cannot be empty string");
 validate_name(Name, _Opts) when size(Name) >= 255 ->
-    invalid_data("name length must be less than 255");
+    invalid_data("Name length must be less than 255");
 validate_name(Name, Opts) ->
-    case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of
+    case re:run(Name, <<"^[0-9a-zA-Z][-0-9a-zA-Z_]*$">>, [{capture, none}]) of
         match ->
             case maps:get(atom_name, Opts, true) of
                 %% NOTE
@@ -827,7 +827,12 @@ validate_name(Name, Opts) ->
             end;
         nomatch ->
             invalid_data(
-                <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>>
+                <<
+                    "Invalid name format. The name must begin with a letter or number "
+                    "(0-9, a-z, A-Z) and can only include underscores and hyphens as "
+                    "non-initial characters. Got: ",
+                    Name/binary
+                >>
             )
     end.
 

+ 5 - 0
changes/ce/fix-11975.en.md

@@ -0,0 +1,5 @@
+Resolve redundant error logging on socket closure
+
+Addressed a race condition causing duplicate error logs when a socket is closed by both a peer and the server.
+Dual socket close events from the OS and EMQX previously led to excessive error logging.
+The fix improves event handling to avoid redundant error-level logging.

+ 3 - 0
changes/ce/fix-11987.en.md

@@ -0,0 +1,3 @@
+Fix connection crash when trying to set TCP/SSL socket `active_n` option.
+
+Prior to this fix, if a socket is already closed when connection process tries to set `active_n` option, it causes a `case_clause` crash.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.3.1
+version: 5.3.2-alpha.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.3.1
+appVersion: 5.3.2-alpha.1

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.3.1
+version: 5.3.2
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.3.1
+appVersion: 5.3.2