Browse Source

Merge branch 'release-54' into sync-r54-m-20231221

Thales Macedo Garitezi 2 years atrás
parent
commit
18b69809da
49 changed files with 1479 additions and 595 deletions
  1. 22 0
      apps/emqx/include/asserts.hrl
  2. 2 2
      apps/emqx/include/emqx_release.hrl
  3. 2 2
      apps/emqx/rebar.config
  4. 1 0
      apps/emqx/src/emqx_config.erl
  5. 162 138
      apps/emqx/src/emqx_listeners.erl
  6. 5 0
      apps/emqx/test/emqx_common_test_helpers.erl
  7. 0 1
      apps/emqx/test/emqx_cth_suite.erl
  8. 339 0
      apps/emqx/test/emqx_cth_tls.erl
  9. 314 204
      apps/emqx/test/emqx_listeners_SUITE.erl
  10. 15 0
      apps/emqx_bridge/src/emqx_action_info.erl
  11. 1 0
      apps/emqx_bridge/src/emqx_bridge_api.erl
  12. 8 1
      apps/emqx_bridge/src/emqx_bridge_v2.erl
  13. 3 0
      apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl
  14. 36 0
      apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
  15. 44 4
      apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl
  16. 1 0
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
  17. 10 7
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
  18. 13 3
      apps/emqx_bridge_redis/src/emqx_bridge_redis.erl
  19. 4 10
      apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl
  20. 5 11
      apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl
  21. 1 1
      apps/emqx_conf/src/emqx_conf.erl
  22. 19 3
      apps/emqx_conf/src/emqx_conf_cli.erl
  23. 1 1
      apps/emqx_conf/src/emqx_conf_schema.erl
  24. 12 3
      apps/emqx_conf/test/emqx_conf_cli_SUITE.erl
  25. 3 2
      apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl
  26. 19 3
      apps/emqx_gateway/src/emqx_gateway_api_clients.erl
  27. 2 0
      apps/emqx_gateway/src/emqx_gateway_http.erl
  28. 1 1
      apps/emqx_gateway/src/emqx_gateway_schema.erl
  29. 37 10
      apps/emqx_gateway/src/emqx_gateway_utils.erl
  30. 4 1
      apps/emqx_gateway/test/emqx_gateway_auth_ct.erl
  31. 25 28
      apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl
  32. 21 32
      apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl
  33. 11 5
      apps/emqx_gateway/test/emqx_gateway_test_utils.erl
  34. 57 57
      apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl
  35. 65 16
      apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl
  36. 12 7
      apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl
  37. 21 21
      apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl
  38. 99 0
      apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl
  39. 36 0
      apps/emqx_management/test/emqx_mgmt_api_configs_SUITE_data/webhook_v1.conf
  40. 17 5
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  41. 10 0
      changes/ce/feat-12201.en.md
  42. 3 1
      changes/ce/fix-12081.en.md
  43. 1 0
      changes/ce/fix-12180.en.md
  44. 2 2
      deploy/charts/emqx-enterprise/Chart.yaml
  45. 2 2
      deploy/charts/emqx/Chart.yaml
  46. 2 2
      mix.exs
  47. 2 2
      rebar.config
  48. 7 0
      rel/i18n/emqx_bridge_redis.hocon
  49. 0 7
      rel/i18n/emqx_bridge_redis_schema.hocon

+ 22 - 0
apps/emqx/include/asserts.hrl

@@ -83,6 +83,28 @@
     end)()
 ).
 
+-define(assertExceptionOneOf(CT1, CT2, EXPR),
+    (fun() ->
+        X__Attrs = [
+            {module, ?MODULE},
+            {line, ?LINE},
+            {expression, (??EXPR)},
+            {pattern, "[ " ++ (??CT1) ++ ", " ++ (??CT2) ++ " ]"}
+        ],
+        X__Exc =
+            try (EXPR) of
+                X__V -> erlang:error({assertException, [{unexpected_success, X__V} | X__Attrs]})
+            catch
+                X__C:X__T:X__S -> {X__C, X__T, X__S}
+            end,
+        case {element(1, X__Exc), element(2, X__Exc)} of
+            CT1 -> ok;
+            CT2 -> ok;
+            _ -> erlang:error({assertException, [{unexpected_exception, X__Exc} | X__Attrs]})
+        end
+    end)()
+).
+
 -define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
     __TEST_CASE = ?FUNCTION_NAME,
     (fun

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,10 +32,10 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.4.0-alpha.2").
+-define(EMQX_RELEASE_CE, "5.4.0-rc.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.4.0-alpha.2").
+-define(EMQX_RELEASE_EE, "5.4.0-rc.1").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 2 - 2
apps/emqx/rebar.config

@@ -27,9 +27,9 @@
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
-    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}},
+    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}},
-    {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}},
+    {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.3"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},

+ 1 - 0
apps/emqx/src/emqx_config.erl

@@ -94,6 +94,7 @@
 
 -export([ensure_atom_conf_path/2]).
 -export([load_config_files/2]).
+-export([upgrade_raw_conf/2]).
 
 -ifdef(TEST).
 -export([erase_all/0, backup_and_write/2]).

+ 162 - 138
apps/emqx/src/emqx_listeners.erl

@@ -55,7 +55,6 @@
 ]).
 
 -export([pre_config_update/3, post_config_update/5]).
--export([create_listener/3, remove_listener/3, update_listener/3]).
 
 -export([format_bind/1]).
 
@@ -66,6 +65,11 @@
 -export_type([listener_id/0]).
 
 -type listener_id() :: atom() | binary().
+-type listener_type() :: tcp | ssl | ws | wss | quic | dtls.
+
+-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)).
+-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)).
+
 -define(ROOT_KEY, listeners).
 -define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
@@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) ->
 
 -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
 is_running(ListenerId) ->
-    case
-        [
-            Running
-         || {Id, #{running := Running}} <- list(),
-            Id =:= ListenerId
-        ]
-    of
-        [] -> {error, not_found};
-        [IsRunning] -> IsRunning
+    case lists:keyfind(ListenerId, 1, list()) of
+        {_Id, #{running := Running}} -> Running;
+        false -> {error, not_found}
     end.
 
 is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
@@ -229,24 +227,26 @@ start() ->
 start_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun start_listener/3).
 
--spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
-start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
-    case do_start_listener(Type, ListenerName, Conf) of
+-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
+start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) ->
+    ListenerId = listener_id(Type, Name),
+    Limiter = limiter(Conf),
+    ok = add_limiter_bucket(ListenerId, Limiter),
+    case do_start_listener(Type, Name, ListenerId, Conf) of
         {ok, {skipped, Reason}} when
-            Reason =:= listener_disabled;
             Reason =:= quic_app_missing
         ->
             ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
             console_print(
                 "Listener ~ts is NOT started due to: ~p.~n",
-                [listener_id(Type, ListenerName), Reason]
+                [ListenerId, Reason]
             ),
             ok;
         {ok, _} ->
             ?tp(listener_started, #{type => Type, bind => Bind}),
             console_print(
                 "Listener ~ts on ~ts started.~n",
-                [listener_id(Type, ListenerName), format_bind(Bind)]
+                [ListenerId, format_bind(Bind)]
             ),
             ok;
         {error, {already_started, Pid}} ->
@@ -255,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
             }),
             {error, {already_started, Pid}};
         {error, Reason} ->
+            ok = del_limiter_bucket(ListenerId, Limiter),
             ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
-            ListenerId = listener_id(Type, ListenerName),
             BindStr = format_bind(Bind),
             ?ELOG(
                 "Failed to start listener ~ts on ~ts: ~0p.~n",
@@ -269,7 +269,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
                 )
             ),
             {error, {failed_to_start, Msg}}
-    end.
+    end;
+start_listener(Type, Name, #{enable := false}) ->
+    console_print(
+        "Listener ~ts is NOT started due to: disabled.~n",
+        [listener_id(Type, Name)]
+    ),
+    ok.
 
 %% @doc Restart all listeners
 -spec restart() -> ok.
@@ -280,16 +286,33 @@ restart() ->
 restart_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun restart_listener/3).
 
--spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}.
-restart_listener(Type, ListenerName, {OldConf, NewConf}) ->
-    restart_listener(Type, ListenerName, OldConf, NewConf);
+-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
 restart_listener(Type, ListenerName, Conf) ->
     restart_listener(Type, ListenerName, Conf, Conf).
 
-restart_listener(Type, ListenerName, OldConf, NewConf) ->
-    case stop_listener(Type, ListenerName, OldConf) of
-        ok -> start_listener(Type, ListenerName, NewConf);
-        {error, Reason} -> {error, Reason}
+update_listener(_Type, _Name, #{enable := false}, #{enable := false}) ->
+    ok;
+update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) ->
+    stop_listener(Type, Name, Conf);
+update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) ->
+    start_listener(Type, Name, Conf);
+update_listener(Type, Name, OldConf, NewConf) ->
+    Id = listener_id(Type, Name),
+    ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)),
+    case do_update_listener(Type, Name, OldConf, NewConf) of
+        ok ->
+            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
+            ok;
+        {error, _Reason} ->
+            restart_listener(Type, Name, OldConf, NewConf)
+    end.
+
+restart_listener(Type, Name, OldConf, NewConf) ->
+    case stop_listener(Type, Name, OldConf) of
+        ok ->
+            start_listener(Type, Name, NewConf);
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 %% @doc Stop all listeners.
@@ -305,9 +328,10 @@ stop() ->
 stop_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun stop_listener/3).
 
-stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
-    Id = listener_id(Type, ListenerName),
-    ok = del_limiter_bucket(Id, Conf),
+stop_listener(Type, Name, #{bind := Bind} = Conf) ->
+    Id = listener_id(Type, Name),
+    ok = del_limiter_bucket(Id, limiter(Conf)),
+    ok = unregister_ocsp_stapling_refresh(Type, Name),
     case do_stop_listener(Type, Id, Conf) of
         ok ->
             console_print(
@@ -325,11 +349,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
             {error, Reason}
     end.
 
--spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
-
-do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
+-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
+do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) ->
     esockd:close(Id, ListenOn);
-do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss ->
+do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
     case cowboy:stop_listener(Id) of
         ok ->
             wait_listener_stopped(ListenOn);
@@ -369,45 +392,25 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
 console_print(_Fmt, _Args) -> ok.
 -endif.
 
-%% Start MQTT/TCP listener
--spec do_start_listener(atom(), atom(), map()) ->
+-spec do_start_listener(listener_type(), atom(), listener_id(), map()) ->
     {ok, pid() | {skipped, atom()}} | {error, term()}.
-do_start_listener(_Type, _ListenerName, #{enable := false}) ->
-    {ok, {skipped, listener_disabled}};
-do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
-    Type == tcp; Type == ssl
-->
-    Id = listener_id(Type, ListenerName),
-    Limiter = limiter(Opts),
-    add_limiter_bucket(Id, Limiter),
+%% Start MQTT/TCP listener
+do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
     esockd:open(
         Id,
         ListenOn,
-        merge_default(esockd_opts(Id, Type, Opts)),
-        {emqx_connection, start_link, [
-            #{
-                listener => {Type, ListenerName},
-                zone => zone(Opts),
-                limiter => Limiter,
-                enable_authn => enable_authn(Opts)
-            }
-        ]}
+        merge_default(esockd_opts(Id, Type, Name, Opts))
     );
 %% Start MQTT/WS listener
-do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
-    Type == ws; Type == wss
-->
-    Id = listener_id(Type, ListenerName),
-    Limiter = limiter(Opts),
-    add_limiter_bucket(Id, Limiter),
-    RanchOpts = ranch_opts(Type, ListenOn, Opts),
-    WsOpts = ws_opts(Type, ListenerName, Opts, Limiter),
+do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
+    RanchOpts = ranch_opts(Type, Opts),
+    WsOpts = ws_opts(Type, Name, Opts),
     case Type of
         ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
         wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
     end;
 %% Start MQTT/QUIC listener
-do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
+do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
     ListenOn =
         case Bind of
             {Addr, Port} when tuple_size(Addr) == 4 ->
@@ -457,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
                 peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
                 peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
                 zone => zone(Opts),
-                listener => {quic, ListenerName},
+                listener => {quic, Name},
                 limiter => Limiter
             },
             StreamOpts = #{
                 stream_callback => emqx_quic_stream,
                 active => 1
             },
-
-            Id = listener_id(quic, ListenerName),
-            add_limiter_bucket(Id, Limiter),
             quicer:spawn_listener(
                 Id,
                 ListenOn,
@@ -476,6 +476,39 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
             {ok, {skipped, quic_app_missing}}
     end.
 
+do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
+    ?ESOCKD_LISTENER(Type)
+->
+    Id = listener_id(Type, Name),
+    case maps:get(bind, OldConf) of
+        ListenOn ->
+            esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
+        _Different ->
+            %% TODO
+            %% Again, we're not strictly required to drop live connections in this case.
+            {error, not_supported}
+    end;
+do_update_listener(Type, Name, OldConf, NewConf) when
+    ?COWBOY_LISTENER(Type)
+->
+    Id = listener_id(Type, Name),
+    RanchOpts = ranch_opts(Type, NewConf),
+    WsOpts = ws_opts(Type, Name, NewConf),
+    case ranch_opts(Type, OldConf) of
+        RanchOpts ->
+            %% Transport options did not change, no need to touch the listener.
+            ok;
+        _Different ->
+            %% Transport options changed, we need to tear down the listener.
+            ok = ranch:suspend_listener(Id),
+            ok = ranch:set_transport_options(Id, RanchOpts)
+    end,
+    ok = ranch:set_protocol_options(Id, WsOpts),
+    %% No-op if the listener was not suspended.
+    ranch:resume_listener(Id);
+do_update_listener(_Type, _Name, _OldConf, _NewConf) ->
+    {error, not_supported}.
+
 %% Update the listeners at runtime
 pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when
     V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
@@ -501,69 +534,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
 post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when
     OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE
 ->
-    create_listener(Type, Name, NewConf);
+    start_listener(Type, Name, NewConf);
 post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
-    update_listener(Type, Name, {OldConf, NewConf});
+    update_listener(Type, Name, OldConf, NewConf);
 post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) ->
-    remove_listener(Type, Name, OldConf);
+    stop_listener(Type, Name, OldConf);
 post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
-    #{enable := NewEnabled} = NewConf,
-    #{enable := OldEnabled} = OldConf,
-    case {NewEnabled, OldEnabled} of
-        {true, true} ->
-            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-            restart_listener(Type, Name, {OldConf, NewConf});
-        {true, false} ->
-            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-            start_listener(Type, Name, NewConf);
-        {false, true} ->
-            ok = unregister_ocsp_stapling_refresh(Type, Name),
-            stop_listener(Type, Name, OldConf);
-        {false, false} ->
-            ok = unregister_ocsp_stapling_refresh(Type, Name),
-            stop_listener(Type, Name, OldConf)
-    end;
+    update_listener(Type, Name, OldConf, NewConf);
 post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) ->
     ok;
 post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf),
-    Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed),
-    perform_listener_changes([
-        {fun ?MODULE:remove_listener/3, Removed},
-        {fun ?MODULE:update_listener/3, Updated},
-        {fun ?MODULE:create_listener/3, Added}
-    ]);
+    %% TODO
+    %% This currently lacks transactional semantics. If one of the changes fails,
+    %% previous changes will not be rolled back.
+    perform_listener_changes(
+        [{update, L} || L <- Changed] ++
+            [{stop, L} || L <- Removed] ++
+            [{start, L} || L <- Added]
+    );
 post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
     ok.
 
-create_listener(Type, Name, NewConf) ->
-    start_listener(Type, Name, NewConf).
-
-remove_listener(Type, Name, OldConf) ->
-    ok = unregister_ocsp_stapling_refresh(Type, Name),
-    stop_listener(Type, Name, OldConf).
-
-update_listener(Type, Name, {OldConf, NewConf}) ->
-    ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-    restart_listener(Type, Name, {OldConf, NewConf}).
-
 perform_listener_changes([]) ->
     ok;
-perform_listener_changes([{Action, ConfL} | Tasks]) ->
-    case perform_listener_changes(Action, ConfL) of
-        ok -> perform_listener_changes(Tasks);
+perform_listener_changes([{Action, Listener} | Rest]) ->
+    case perform_listener_change(Action, Listener) of
+        ok -> perform_listener_changes(Rest);
         {error, Reason} -> {error, Reason}
     end.
 
-perform_listener_changes(_Action, []) ->
-    ok;
-perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) ->
-    case Action(Type, Name, Diff) of
-        ok -> perform_listener_changes(Action, MapConf);
-        {error, Reason} -> {error, Reason}
-    end.
+perform_listener_change(start, {Type, Name, Conf}) ->
+    start_listener(Type, Name, Conf);
+perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
+    update_listener(Type, Name, ConfOld, ConfNew);
+perform_listener_change(stop, {Type, Name, Conf}) ->
+    stop_listener(Type, Name, Conf).
 
-esockd_opts(ListenerId, Type, Opts0) ->
+esockd_opts(ListenerId, Type, Name, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Limiter = limiter(Opts0),
     Opts2 =
@@ -579,7 +587,16 @@ esockd_opts(ListenerId, Type, Opts0) ->
         end,
     Opts3 = Opts2#{
         access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])),
-        tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
+        tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]},
+        connection_mfargs =>
+            {emqx_connection, start_link, [
+                #{
+                    listener => {Type, Name},
+                    zone => zone(Opts0),
+                    limiter => Limiter,
+                    enable_authn => enable_authn(Opts0)
+                }
+            ]}
     },
     maps:to_list(
         case Type of
@@ -593,20 +610,21 @@ esockd_opts(ListenerId, Type, Opts0) ->
         end
     ).
 
-ws_opts(Type, ListenerName, Opts, Limiter) ->
-    WsPaths = [
-        {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
+ws_opts(Type, ListenerName, Opts) ->
+    WsPath = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"),
+    WsRoutes = [
+        {WsPath, emqx_ws_connection, #{
             zone => zone(Opts),
             listener => {Type, ListenerName},
-            limiter => Limiter,
+            limiter => limiter(Opts),
             enable_authn => enable_authn(Opts)
         }}
     ],
-    Dispatch = cowboy_router:compile([{'_', WsPaths}]),
+    Dispatch = cowboy_router:compile([{'_', WsRoutes}]),
     ProxyProto = maps:get(proxy_protocol, Opts, false),
     #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
 
-ranch_opts(Type, ListenOn, Opts) ->
+ranch_opts(Type, Opts = #{bind := ListenOn}) ->
     NumAcceptors = maps:get(acceptors, Opts, 4),
     MaxConnections = maps:get(max_connections, Opts, 1024),
     SocketOpts =
@@ -725,41 +743,47 @@ add_limiter_bucket(Id, Limiter) ->
         maps:without([client], Limiter)
     ).
 
-del_limiter_bucket(Id, Conf) ->
-    case limiter(Conf) of
-        undefined ->
-            ok;
-        Limiter ->
-            lists:foreach(
-                fun(Type) ->
-                    emqx_limiter_server:del_bucket(Id, Type)
-                end,
-                maps:keys(Limiter)
-            )
-    end.
+del_limiter_bucket(_Id, undefined) ->
+    ok;
+del_limiter_bucket(Id, Limiter) ->
+    maps:foreach(
+        fun(Type, _) ->
+            emqx_limiter_server:del_bucket(Id, Type)
+        end,
+        Limiter
+    ).
+
+update_limiter_bucket(Id, Limiter, undefined) ->
+    del_limiter_bucket(Id, Limiter);
+update_limiter_bucket(Id, undefined, Limiter) ->
+    add_limiter_bucket(Id, Limiter);
+update_limiter_bucket(Id, OldLimiter, NewLimiter) ->
+    ok = add_limiter_bucket(Id, NewLimiter),
+    Outdated = maps:without(maps:keys(NewLimiter), OldLimiter),
+    del_limiter_bucket(Id, Outdated).
 
 diff_confs(NewConfs, OldConfs) ->
     emqx_utils:diff_lists(
         flatten_confs(NewConfs),
         flatten_confs(OldConfs),
-        fun({Key, _}) -> Key end
+        fun({Type, Name, _}) -> {Type, Name} end
     ).
 
-flatten_confs(Conf0) ->
+flatten_confs(Confs) ->
     lists:flatmap(
-        fun({Type, Conf}) ->
-            do_flatten_confs(Type, Conf)
+        fun({Type, Listeners}) ->
+            do_flatten_confs(Type, Listeners)
         end,
-        maps:to_list(Conf0)
+        maps:to_list(Confs)
     ).
 
-do_flatten_confs(Type, Conf0) ->
+do_flatten_confs(Type, Listeners) ->
     FilterFun =
         fun
             ({_Name, ?TOMBSTONE_TYPE}) -> false;
-            ({Name, Conf}) -> {true, {{Type, Name}, Conf}}
+            ({Name, Conf}) -> {true, {Type, Name, Conf}}
         end,
-    lists:filtermap(FilterFun, maps:to_list(Conf0)).
+    lists:filtermap(FilterFun, maps:to_list(Listeners)).
 
 enable_authn(Opts) ->
     maps:get(enable_authn, Opts, true).

+ 5 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -67,6 +67,11 @@
     select_free_port/1
 ]).
 
+-export([
+    ssl_verify_fun_allow_any_host/0,
+    ssl_verify_fun_allow_any_host_impl/3
+]).
+
 -export([
     emqx_cluster/1,
     emqx_cluster/2,

+ 0 - 1
apps/emqx/test/emqx_cth_suite.erl

@@ -58,7 +58,6 @@
 -module(emqx_cth_suite).
 
 -include_lib("common_test/include/ct.hrl").
--include_lib("emqx/include/emqx_access_control.hrl").
 
 -export([start/2]).
 -export([stop/1]).

+ 339 - 0
apps/emqx/test/emqx_cth_tls.erl

@@ -0,0 +1,339 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cth_tls).
+
+-include_lib("public_key/include/public_key.hrl").
+
+-export([gen_cert/1]).
+-export([write_cert/2]).
+-export([write_cert/3]).
+-export([write_pem/2]).
+
+%% -------------------------------------------------------------------
+%% Certificate Issuing
+%% Heavily inspired by: ${ERL_SRC}/lib/public_key/test/erl_make_certs.erl
+%% -------------------------------------------------------------------
+
+-type pem_entry() :: public_key:pem_entry().
+-type certificate() :: pem_entry().
+-type private_key() :: pem_entry().
+
+-type cert_subject() :: #{
+    name => string(),
+    email => string(),
+    city => string(),
+    state => string(),
+    org => string(),
+    org_unit => string(),
+    country => string(),
+    serial => string(),
+    title => string(),
+    dnQualifer => string()
+}.
+
+-type cert_validity() ::
+    {_From :: calendar:date(), _To :: calendar:date()}.
+
+-type cert_extensions() :: #{
+    basic_constraints => false | ca | _PathLenContraint :: pos_integer(),
+    key_usage => false | certsign
+}.
+
+%% @doc Generate a certificate and a private key.
+%% If you need root (CA) certificate, use `root` as `issuer` option. By default, the
+%% generated certificate will have according extensions (constraints, key usage, etc).
+%% Once root certificate + private key pair is generated, you can use the result
+%% as `issuer` option to generate other certificates signed by this root.
+-spec gen_cert(Opts) -> {certificate(), private_key()} when
+    Opts :: #{
+        key := ec | rsa | PrivKeyIn,
+        issuer := root | {CertificateIn, PrivKeyIn},
+        subject => cert_subject(),
+        validity => cert_validity(),
+        extensions => cert_extensions() | false
+    },
+    CertificateIn :: certificate() | public_key:der_encoded() | #'OTPCertificate'{},
+    PrivKeyIn :: private_key() | _PEM :: binary().
+gen_cert(Opts) ->
+    SubjectPrivateKey = get_privkey(Opts),
+    {TBSCert, IssuerKey} = make_tbs(SubjectPrivateKey, Opts),
+    Cert = public_key:pkix_sign(TBSCert, IssuerKey),
+    true = verify_signature(Cert, IssuerKey),
+    {encode_cert(Cert), encode_privkey(SubjectPrivateKey)}.
+
+get_privkey(#{key := Algo}) when is_atom(Algo) ->
+    gen_privkey(Algo);
+get_privkey(#{key := Key}) ->
+    decode_privkey(Key).
+
+make_tbs(SubjectKey, Opts) ->
+    {Issuer, IssuerKey} = issuer(Opts, SubjectKey),
+    Subject =
+        case Opts of
+            #{issuer := root} ->
+                Issuer;
+            #{} ->
+                subject(Opts)
+        end,
+    {
+        #'OTPTBSCertificate'{
+            version = v3,
+            serialNumber = rand:uniform(1000000000000),
+            signature = sign_algorithm(IssuerKey, Opts),
+            issuer = Issuer,
+            validity = validity(Opts),
+            subject = Subject,
+            subjectPublicKeyInfo = publickey(SubjectKey),
+            extensions = extensions(Opts)
+        },
+        IssuerKey
+    }.
+
+issuer(Opts = #{issuer := root}, SubjectKey) ->
+    %% Self signed
+    {subject(Opts), SubjectKey};
+issuer(#{issuer := {Issuer, IssuerKey}}, _SubjectKey) ->
+    {issuer_subject(Issuer), decode_privkey(IssuerKey)}.
+
+issuer_subject({'Certificate', IssuerDer, _}) when is_binary(IssuerDer) ->
+    issuer_subject(IssuerDer);
+issuer_subject(IssuerDer) when is_binary(IssuerDer) ->
+    issuer_subject(public_key:pkix_decode_cert(IssuerDer, otp));
+issuer_subject(#'OTPCertificate'{tbsCertificate = #'OTPTBSCertificate'{subject = Subject}}) ->
+    Subject.
+
+subject(Opts = #{}) ->
+    Subject = maps:get(subject, Opts, #{}),
+    Entries = maps:map(
+        fun(N, V) -> [subject_entry(N, V)] end,
+        maps:merge(default_subject(Opts), Subject)
+    ),
+    {rdnSequence, maps:values(Entries)}.
+
+subject_entry(name, Name) ->
+    typed_attr(?'id-at-commonName', {printableString, Name});
+subject_entry(email, Email) ->
+    typed_attr(?'id-emailAddress', Email);
+subject_entry(city, City) ->
+    typed_attr(?'id-at-localityName', {printableString, City});
+subject_entry(state, State) ->
+    typed_attr(?'id-at-stateOrProvinceName', {printableString, State});
+subject_entry(org, Org) ->
+    typed_attr(?'id-at-organizationName', {printableString, Org});
+subject_entry(org_unit, OrgUnit) ->
+    typed_attr(?'id-at-organizationalUnitName', {printableString, OrgUnit});
+subject_entry(country, Country) ->
+    typed_attr(?'id-at-countryName', Country);
+subject_entry(serial, Serial) ->
+    typed_attr(?'id-at-serialNumber', Serial);
+subject_entry(title, Title) ->
+    typed_attr(?'id-at-title', {printableString, Title});
+subject_entry(dnQualifer, DnQ) ->
+    typed_attr(?'id-at-dnQualifier', DnQ).
+
+subject_info(Info, Subject, Default) ->
+    case subject_info(Info, Subject) of
+        undefined -> Default;
+        Value -> Value
+    end.
+
+subject_info(Info, {rdnSequence, Entries}) ->
+    subject_info(Info, Entries);
+subject_info(name, Entries) when is_list(Entries) ->
+    get_string(find_subject_entry(?'id-at-commonName', Entries));
+subject_info(org, Entries) when is_list(Entries) ->
+    get_string(find_subject_entry(?'id-at-organizationName', Entries));
+subject_info(org_unit, Entries) when is_list(Entries) ->
+    get_string(find_subject_entry(?'id-at-organizationalUnitName', Entries));
+subject_info(country, Entries) when is_list(Entries) ->
+    find_subject_entry(?'id-at-countryName', Entries).
+
+find_subject_entry(Oid, Entries) ->
+    emqx_maybe:from_list([
+        Value
+     || Attrs <- Entries,
+        #'AttributeTypeAndValue'{type = T, value = Value} <- Attrs,
+        T =:= Oid
+    ]).
+
+get_string({printableString, String}) ->
+    String;
+get_string(undefined) ->
+    undefined.
+
+typed_attr(Type, Value) ->
+    #'AttributeTypeAndValue'{type = Type, value = Value}.
+
+sign_algorithm(#'ECPrivateKey'{parameters = Parms}, _Opts) ->
+    #'SignatureAlgorithm'{
+        algorithm = ?'ecdsa-with-SHA256',
+        parameters = Parms
+    }.
+
+validity(Opts) ->
+    {From, To} = maps:get(validity, Opts, default_validity()),
+    #'Validity'{
+        notBefore = {generalTime, format_date(From)},
+        notAfter = {generalTime, format_date(To)}
+    }.
+
+publickey(#'ECPrivateKey'{parameters = Params, publicKey = PubKey}) ->
+    #'OTPSubjectPublicKeyInfo'{
+        algorithm = #'PublicKeyAlgorithm'{
+            algorithm = ?'id-ecPublicKey',
+            parameters = Params
+        },
+        subjectPublicKey = #'ECPoint'{point = PubKey}
+    }.
+
+extensions(#{extensions := false}) ->
+    asn1_NOVALUE;
+extensions(Opts) ->
+    Exts = maps:get(extensions, Opts, #{}),
+    Default = default_extensions(Opts),
+    maps:fold(
+        fun(Name, Data, Acc) -> Acc ++ extension(Name, Data) end,
+        [],
+        maps:merge(Default, Exts)
+    ).
+
+extension(basic_constraints, false) ->
+    [];
+extension(basic_constraints, ca) ->
+    [
+        #'Extension'{
+            extnID = ?'id-ce-basicConstraints',
+            extnValue = #'BasicConstraints'{cA = true},
+            critical = true
+        }
+    ];
+extension(basic_constraints, Len) when is_integer(Len) ->
+    [
+        #'Extension'{
+            extnID = ?'id-ce-basicConstraints',
+            extnValue = #'BasicConstraints'{cA = true, pathLenConstraint = Len},
+            critical = true
+        }
+    ];
+extension(key_usage, false) ->
+    [];
+extension(key_usage, certsign) ->
+    [
+        #'Extension'{
+            extnID = ?'id-ce-keyUsage',
+            extnValue = [keyCertSign],
+            critical = true
+        }
+    ].
+
+default_validity() ->
+    {shift_date(date(), -1), shift_date(date(), +7)}.
+
+default_subject(#{issuer := root}) ->
+    #{
+        name => "RootCA",
+        org => "EMQ",
+        org_unit => "EMQX",
+        country => "CN"
+    };
+default_subject(#{}) ->
+    #{
+        name => "Server",
+        org => "EMQ",
+        org_unit => "EMQX",
+        country => "CN"
+    }.
+
+default_extensions(#{issuer := root}) ->
+    #{
+        basic_constraints => ca,
+        key_usage => certsign
+    };
+default_extensions(#{}) ->
+    #{}.
+
+%% -------------------------------------------------------------------
+
+verify_signature(CertDer, #'ECPrivateKey'{parameters = Params, publicKey = PubKey}) ->
+    public_key:pkix_verify(CertDer, {#'ECPoint'{point = PubKey}, Params});
+verify_signature(CertDer, KeyPem) ->
+    verify_signature(CertDer, decode_privkey(KeyPem)).
+
+%% -------------------------------------------------------------------
+
+gen_privkey(ec) ->
+    public_key:generate_key({namedCurve, secp256k1});
+gen_privkey(rsa) ->
+    public_key:generate_key({rsa, 2048, 17}).
+
+decode_privkey(#'ECPrivateKey'{} = Key) ->
+    Key;
+decode_privkey(#'RSAPrivateKey'{} = Key) ->
+    Key;
+decode_privkey(PemEntry = {_, _, _}) ->
+    public_key:pem_entry_decode(PemEntry);
+decode_privkey(PemBinary) when is_binary(PemBinary) ->
+    [KeyInfo] = public_key:pem_decode(PemBinary),
+    decode_privkey(KeyInfo).
+
+-spec encode_privkey(#'ECPrivateKey'{} | #'RSAPrivateKey'{}) -> private_key().
+encode_privkey(Key = #'ECPrivateKey'{}) ->
+    {ok, Der} = 'OTP-PUB-KEY':encode('ECPrivateKey', Key),
+    {'ECPrivateKey', Der, not_encrypted};
+encode_privkey(Key = #'RSAPrivateKey'{}) ->
+    {ok, Der} = 'OTP-PUB-KEY':encode('RSAPrivateKey', Key),
+    {'RSAPrivateKey', Der, not_encrypted}.
+
+-spec encode_cert(public_key:der_encoded()) -> certificate().
+encode_cert(Der) ->
+    {'Certificate', Der, not_encrypted}.
+
+%% -------------------------------------------------------------------
+
+shift_date(Date, Offset) ->
+    calendar:gregorian_days_to_date(calendar:date_to_gregorian_days(Date) + Offset).
+
+format_date({Y, M, D}) ->
+    lists:flatten(io_lib:format("~w~2..0w~2..0w000000Z", [Y, M, D])).
+
+%% -------------------------------------------------------------------
+
+%% @doc Write certificate + private key pair to respective files.
+%% Files are created in the given directory. The filenames are derived
+%% from the subject information in the certificate.
+-spec write_cert(_Dir :: file:name(), {certificate(), private_key()}) ->
+    {file:name(), file:name()}.
+write_cert(Dir, {Cert, Key}) ->
+    Subject = issuer_subject(Cert),
+    Filename = subject_info(org, Subject, "ORG") ++ "." ++ subject_info(name, Subject, "XXX"),
+    write_cert(Dir, Filename, {Cert, Key}).
+
+-spec write_cert(_Dir :: file:name(), _Prefix :: string(), {certificate(), private_key()}) ->
+    {file:name(), file:name()}.
+write_cert(Dir, Filename, {Cert, Key}) ->
+    Certfile = filename:join(Dir, Filename ++ ".crt"),
+    Keyfile = filename:join(Dir, Filename ++ ".key"),
+    ok = write_pem(Certfile, Cert),
+    ok = write_pem(Keyfile, Key),
+    {Certfile, Keyfile}.
+
+-spec write_pem(file:name(), pem_entry() | [pem_entry()]) ->
+    ok | {error, file:posix()}.
+write_pem(Name, Entries = [_ | _]) ->
+    file:write_file(Name, public_key:pem_encode(Entries));
+write_pem(Name, Entry) ->
+    write_pem(Name, [Entry]).

+ 314 - 204
apps/emqx/test/emqx_listeners_SUITE.erl

@@ -20,122 +20,46 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_schema.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--define(CERTS_PATH(CertName), filename:join(["../../lib/emqx/etc/certs/", CertName])).
-
 -define(SERVER_KEY_PASSWORD, "sErve7r8Key$!").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    NewConfig = generate_config(),
-    application:ensure_all_started(esockd),
-    application:ensure_all_started(quicer),
-    application:ensure_all_started(cowboy),
     generate_tls_certs(Config),
-    lists:foreach(fun set_app_env/1, NewConfig),
-    Config.
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    Apps = emqx_cth_suite:start([quicer, emqx], #{work_dir => WorkDir}),
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    application:stop(esockd),
-    application:stop(cowboy).
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
 
 init_per_testcase(Case, Config) when
-    Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp
+    Case =:= t_start_stop_listeners;
+    Case =:= t_restart_listeners;
+    Case =:= t_restart_listeners_with_hibernate_after_disabled
 ->
-    catch emqx_config_handler:stop(),
-    Port = emqx_common_test_helpers:select_free_port(tcp),
-    {ok, _} = emqx_config_handler:start_link(),
-    PrevListeners = emqx_config:get([listeners], #{}),
-    PureListeners = remove_default_limiter(PrevListeners),
-    PureListeners2 = PureListeners#{
-        tcp => #{
-            listener_test => #{
-                bind => {"127.0.0.1", Port},
-                max_connections => 4321,
-                limiter => #{}
-            }
-        }
-    },
-    emqx_config:put([listeners], PureListeners2),
-
+    ok = emqx_listeners:stop(),
+    Config;
+init_per_testcase(_, Config) ->
     ok = emqx_listeners:start(),
-    [
-        {prev_listener_conf, PrevListeners},
-        {tcp_port, Port}
-        | Config
-    ];
-init_per_testcase(t_wss_conn, Config) ->
-    catch emqx_config_handler:stop(),
-    Port = emqx_common_test_helpers:select_free_port(ssl),
-    {ok, _} = emqx_config_handler:start_link(),
-    PrevListeners = emqx_config:get([listeners], #{}),
-    PureListeners = remove_default_limiter(PrevListeners),
-    PureListeners2 = PureListeners#{
-        wss => #{
-            listener_test => #{
-                bind => {{127, 0, 0, 1}, Port},
-                limiter => #{},
-                ssl_options => #{
-                    cacertfile => ?CERTS_PATH("cacert.pem"),
-                    certfile => ?CERTS_PATH("cert.pem"),
-                    keyfile => ?CERTS_PATH("key.pem")
-                }
-            }
-        }
-    },
-    emqx_config:put([listeners], PureListeners2),
+    Config.
 
-    ok = emqx_listeners:start(),
-    [
-        {prev_listener_conf, PrevListeners},
-        {wss_port, Port}
-        | Config
-    ];
-init_per_testcase(_, Config) ->
-    catch emqx_config_handler:stop(),
-    {ok, _} = emqx_config_handler:start_link(),
-    PrevListeners = emqx_config:get([listeners], #{}),
-    PureListeners = remove_default_limiter(PrevListeners),
-    emqx_config:put([listeners], PureListeners),
-    [
-        {prev_listener_conf, PrevListeners}
-        | Config
-    ].
-
-end_per_testcase(Case, Config) when
-    Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp
-->
-    PrevListener = ?config(prev_listener_conf, Config),
-    emqx_listeners:stop(),
-    emqx_config:put([listeners], PrevListener),
-    _ = emqx_config_handler:stop(),
-    ok;
-end_per_testcase(t_wss_conn, Config) ->
-    PrevListener = ?config(prev_listener_conf, Config),
-    emqx_listeners:stop(),
-    emqx_config:put([listeners], PrevListener),
-    _ = emqx_config_handler:stop(),
-    ok;
-end_per_testcase(_, Config) ->
-    PrevListener = ?config(prev_listener_conf, Config),
-    emqx_config:put([listeners], PrevListener),
-    _ = emqx_config_handler:stop(),
+end_per_testcase(_, _Config) ->
     ok.
 
 t_start_stop_listeners(_) ->
     ok = emqx_listeners:start(),
-    ?assertException(error, _, emqx_listeners:start_listener({ws, {"127.0.0.1", 8083}, []})),
+    ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})),
     ok = emqx_listeners:stop().
 
 t_restart_listeners(_) ->
     ok = emqx_listeners:start(),
     ok = emqx_listeners:stop(),
-    %% flakyness: eaddrinuse
-    timer:sleep(timer:seconds(2)),
     ok = emqx_listeners:restart(),
     ok = emqx_listeners:stop().
 
@@ -168,77 +92,315 @@ t_restart_listeners_with_hibernate_after_disabled(_Config) ->
     ),
     ok = emqx_listeners:start(),
     ok = emqx_listeners:stop(),
-    %% flakyness: eaddrinuse
-    timer:sleep(timer:seconds(2)),
     ok = emqx_listeners:restart(),
     ok = emqx_listeners:stop(),
     emqx_config:put([listeners], OldLConf).
 
-t_max_conns_tcp(Config) ->
+t_max_conns_tcp(_Config) ->
     %% Note: Using a string representation for the bind address like
     %% "127.0.0.1" does not work
-    ?assertEqual(
-        4321,
-        emqx_listeners:max_conns('tcp:listener_test', {{127, 0, 0, 1}, ?config(tcp_port, Config)})
-    ).
+    Port = emqx_common_test_helpers:select_free_port(tcp),
+    Conf = #{
+        <<"bind">> => format_bind({"127.0.0.1", Port}),
+        <<"max_connections">> => 4321,
+        <<"limiter">> => #{}
+    },
+    with_listener(tcp, maxconns, Conf, fun() ->
+        ?assertEqual(
+            4321,
+            emqx_listeners:max_conns('tcp:maxconns', {{127, 0, 0, 1}, Port})
+        )
+    end).
 
-t_current_conns_tcp(Config) ->
-    ?assertEqual(
-        0,
-        emqx_listeners:current_conns('tcp:listener_test', {
-            {127, 0, 0, 1}, ?config(tcp_port, Config)
-        })
-    ).
+t_current_conns_tcp(_Config) ->
+    Port = emqx_common_test_helpers:select_free_port(tcp),
+    Conf = #{
+        <<"bind">> => format_bind({"127.0.0.1", Port}),
+        <<"max_connections">> => 42,
+        <<"limiter">> => #{}
+    },
+    with_listener(tcp, curconns, Conf, fun() ->
+        ?assertEqual(
+            0,
+            emqx_listeners:current_conns('tcp:curconns', {{127, 0, 0, 1}, Port})
+        )
+    end).
 
 t_wss_conn(Config) ->
-    {ok, Socket} = ssl:connect(
-        {127, 0, 0, 1}, ?config(wss_port, Config), [{verify, verify_none}], 1000
-    ),
-    ok = ssl:close(Socket).
+    PrivDir = ?config(priv_dir, Config),
+    Port = emqx_common_test_helpers:select_free_port(ssl),
+    Conf = #{
+        <<"bind">> => format_bind({"127.0.0.1", Port}),
+        <<"limiter">> => #{},
+        <<"ssl_options">> => #{
+            <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+            <<"certfile">> => filename:join(PrivDir, "server.pem"),
+            <<"keyfile">> => filename:join(PrivDir, "server.key")
+        }
+    },
+    with_listener(wss, wssconn, Conf, fun() ->
+        {ok, Socket} = ssl:connect({127, 0, 0, 1}, Port, [{verify, verify_none}], 1000),
+        ok = ssl:close(Socket)
+    end).
 
 t_quic_conn(Config) ->
+    PrivDir = ?config(priv_dir, Config),
     Port = emqx_common_test_helpers:select_free_port(quic),
-    DataDir = ?config(data_dir, Config),
-    SSLOpts = #{
-        password => ?SERVER_KEY_PASSWORD,
-        certfile => filename:join(DataDir, "server-password.pem"),
-        cacertfile => filename:join(DataDir, "ca.pem"),
-        keyfile => filename:join(DataDir, "server-password.key")
+    Conf = #{
+        <<"bind">> => format_bind({"127.0.0.1", Port}),
+        <<"ssl_options">> => #{
+            <<"password">> => ?SERVER_KEY_PASSWORD,
+            <<"certfile">> => filename:join(PrivDir, "server-password.pem"),
+            <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+            <<"keyfile">> => filename:join(PrivDir, "server-password.key")
+        }
     },
-    emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, Port, #{ssl_options => SSLOpts}),
-    ct:pal("~p", [emqx_listeners:list()]),
-    {ok, Conn} = quicer:connect(
-        {127, 0, 0, 1},
-        Port,
-        [
-            {verify, verify_none},
-            {alpn, ["mqtt"]}
-        ],
-        1000
-    ),
-    ok = quicer:close_connection(Conn),
-    emqx_listeners:stop_listener(quic, ?FUNCTION_NAME, #{bind => Port}).
+    with_listener(quic, ?FUNCTION_NAME, Conf, fun() ->
+        {ok, Conn} = quicer:connect(
+            {127, 0, 0, 1},
+            Port,
+            [
+                {verify, verify_none},
+                {alpn, ["mqtt"]}
+            ],
+            1000
+        ),
+        ok = quicer:close_connection(Conn)
+    end).
 
 t_ssl_password_cert(Config) ->
+    PrivDir = ?config(priv_dir, Config),
     Port = emqx_common_test_helpers:select_free_port(ssl),
-    DataDir = ?config(data_dir, Config),
     SSLOptsPWD = #{
-        password => ?SERVER_KEY_PASSWORD,
-        certfile => filename:join(DataDir, "server-password.pem"),
-        cacertfile => filename:join(DataDir, "ca.pem"),
-        keyfile => filename:join(DataDir, "server-password.key")
+        <<"password">> => ?SERVER_KEY_PASSWORD,
+        <<"certfile">> => filename:join(PrivDir, "server-password.pem"),
+        <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+        <<"keyfile">> => filename:join(PrivDir, "server-password.key")
     },
     LConf = #{
-        enable => true,
-        bind => {{127, 0, 0, 1}, Port},
-        mountpoint => <<>>,
-        zone => default,
-        ssl_options => SSLOptsPWD
+        <<"enable">> => true,
+        <<"bind">> => format_bind({{127, 0, 0, 1}, Port}),
+        <<"ssl_options">> => SSLOptsPWD
     },
-    ok = emqx_listeners:start_listener(ssl, ?FUNCTION_NAME, LConf),
-    {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]),
-    ssl:close(SSLSocket),
-    emqx_listeners:stop_listener(ssl, ?FUNCTION_NAME, LConf).
+    with_listener(ssl, ?FUNCTION_NAME, LConf, fun() ->
+        {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]),
+        ssl:close(SSLSocket)
+    end).
+
+t_ssl_update_opts(Config) ->
+    PrivDir = ?config(priv_dir, Config),
+    Host = "127.0.0.1",
+    Port = emqx_common_test_helpers:select_free_port(ssl),
+    Conf = #{
+        <<"enable">> => true,
+        <<"bind">> => format_bind({Host, Port}),
+        <<"ssl_options">> => #{
+            <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+            <<"password">> => ?SERVER_KEY_PASSWORD,
+            <<"certfile">> => filename:join(PrivDir, "server-password.pem"),
+            <<"keyfile">> => filename:join(PrivDir, "server-password.key"),
+            <<"verify">> => verify_none
+        }
+    },
+    ClientSSLOpts = [
+        {verify, verify_peer},
+        {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
+    ],
+    with_listener(ssl, updated, Conf, fun() ->
+        %% Client connects successfully.
+        C1 = emqtt_connect_ssl(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+        ]),
+
+        %% Change the listener SSL configuration: another set of cert/key files.
+        {ok, _} = emqx:update_config(
+            [listeners, ssl, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
+                    <<"certfile">> => filename:join(PrivDir, "server.pem"),
+                    <<"keyfile">> => filename:join(PrivDir, "server.key")
+                }
+            }}
+        ),
+
+        %% Unable to connect with old SSL options, server's cert is signed by another CA.
+        ?assertError(
+            {tls_alert, {unknown_ca, _}},
+            emqtt_connect_ssl(Host, Port, [
+                {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
+            ])
+        ),
+
+        C2 = emqtt_connect_ssl(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
+        ]),
+
+        %% Change the listener SSL configuration: require peer certificate.
+        {ok, _} = emqx:update_config(
+            [listeners, ssl, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"verify">> => verify_peer,
+                    <<"fail_if_no_peer_cert">> => true
+                }
+            }}
+        ),
+
+        %% Unable to connect with old SSL options, certificate is now required.
+        ?assertExceptionOneOf(
+            {error, {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}},
+            {error, closed},
+            emqtt_connect_ssl(Host, Port, [
+                {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
+            ])
+        ),
+
+        C3 = emqtt_connect_ssl(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")},
+            {certfile, filename:join(PrivDir, "client.pem")},
+            {keyfile, filename:join(PrivDir, "client.key")}
+            | ClientSSLOpts
+        ]),
+
+        %% Both pre- and post-update clients should be alive.
+        ?assertEqual(pong, emqtt:ping(C1)),
+        ?assertEqual(pong, emqtt:ping(C2)),
+        ?assertEqual(pong, emqtt:ping(C3)),
+
+        ok = emqtt:stop(C1),
+        ok = emqtt:stop(C2),
+        ok = emqtt:stop(C3)
+    end).
+
+t_wss_update_opts(Config) ->
+    PrivDir = ?config(priv_dir, Config),
+    Host = "127.0.0.1",
+    Port = emqx_common_test_helpers:select_free_port(ssl),
+    Conf = #{
+        <<"enable">> => true,
+        <<"bind">> => format_bind({Host, Port}),
+        <<"ssl_options">> => #{
+            <<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
+            <<"certfile">> => filename:join(PrivDir, "server-password.pem"),
+            <<"keyfile">> => filename:join(PrivDir, "server-password.key"),
+            <<"password">> => ?SERVER_KEY_PASSWORD,
+            <<"verify">> => verify_none
+        }
+    },
+    ClientSSLOpts = [
+        {verify, verify_peer},
+        {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
+    ],
+    with_listener(wss, updated, Conf, fun() ->
+        %% Start a client.
+        C1 = emqtt_connect_wss(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca.pem")}
+            | ClientSSLOpts
+        ]),
+
+        %% Change the listener SSL configuration.
+        %% 1. Another set of (password protected) cert/key files.
+        %% 2. Require peer certificate.
+        {ok, _} = emqx:update_config(
+            [listeners, wss, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
+                    <<"certfile">> => filename:join(PrivDir, "server.pem"),
+                    <<"keyfile">> => filename:join(PrivDir, "server.key")
+                }
+            }}
+        ),
+
+        %% Unable to connect with old SSL options, server's cert is signed by another CA.
+        %% Due to a bug `emqtt` exits with `badmatch` in this case.
+        ?assertExit(
+            _Badmatch,
+            emqtt_connect_wss(Host, Port, ClientSSLOpts)
+        ),
+
+        C2 = emqtt_connect_wss(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")}
+            | ClientSSLOpts
+        ]),
+
+        %% Change the listener SSL configuration: require peer certificate.
+        {ok, _} = emqx:update_config(
+            [listeners, wss, updated],
+            {update, #{
+                <<"ssl_options">> => #{
+                    <<"verify">> => verify_peer,
+                    <<"fail_if_no_peer_cert">> => true
+                }
+            }}
+        ),
+
+        %% Unable to connect with old SSL options, certificate is now required.
+        %% Due to a bug `emqtt` does not instantly report that socket was closed.
+        ?assertError(
+            timeout,
+            emqtt_connect_wss(Host, Port, [
+                {cacertfile, filename:join(PrivDir, "ca-next.pem")}
+                | ClientSSLOpts
+            ])
+        ),
+
+        C3 = emqtt_connect_wss(Host, Port, [
+            {cacertfile, filename:join(PrivDir, "ca-next.pem")},
+            {certfile, filename:join(PrivDir, "client.pem")},
+            {keyfile, filename:join(PrivDir, "client.key")}
+            | ClientSSLOpts
+        ]),
+
+        %% Both pre- and post-update clients should be alive.
+        ?assertEqual(pong, emqtt:ping(C1)),
+        ?assertEqual(pong, emqtt:ping(C2)),
+        ?assertEqual(pong, emqtt:ping(C3)),
+
+        ok = emqtt:stop(C1),
+        ok = emqtt:stop(C2),
+        ok = emqtt:stop(C3)
+    end).
+
+with_listener(Type, Name, Config, Then) ->
+    {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}),
+    try
+        Then()
+    after
+        emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ)
+    end.
+
+emqtt_connect_ssl(Host, Port, SSLOpts) ->
+    emqtt_connect(fun emqtt:connect/1, #{
+        hosts => [{Host, Port}],
+        connect_timeout => 1,
+        ssl => true,
+        ssl_opts => SSLOpts
+    }).
+
+emqtt_connect_wss(Host, Port, SSLOpts) ->
+    emqtt_connect(fun emqtt:ws_connect/1, #{
+        hosts => [{Host, Port}],
+        connect_timeout => 1,
+        ws_transport_options => [
+            {protocols, [http]},
+            {transport, tls},
+            {tls_opts, SSLOpts}
+        ]
+    }).
+
+emqtt_connect(Connect, Opts) ->
+    case emqtt:start_link(Opts) of
+        {ok, Client} ->
+            true = erlang:unlink(Client),
+            case Connect(Client) of
+                {ok, _} -> Client;
+                {error, Reason} -> error(Reason, [Opts])
+            end;
+        {error, Reason} ->
+            error(Reason, [Opts])
+    end.
 
 t_format_bind(_) ->
     ?assertEqual(
@@ -266,67 +428,15 @@ t_format_bind(_) ->
         lists:flatten(emqx_listeners:format_bind(":1883"))
     ).
 
-render_config_file() ->
-    Path = local_path(["etc", "emqx.conf"]),
-    {ok, Temp} = file:read_file(Path),
-    Vars0 = mustache_vars(),
-    Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0],
-    Targ = bbmustache:render(Temp, Vars),
-    NewName = Path ++ ".rendered",
-    ok = file:write_file(NewName, Targ),
-    NewName.
-
-mustache_vars() ->
-    [
-        {platform_data_dir, local_path(["data"])},
-        {platform_etc_dir, local_path(["etc"])}
-    ].
-
-generate_config() ->
-    ConfFile = render_config_file(),
-    {ok, Conf} = hocon:load(ConfFile, #{format => richmap}),
-    hocon_tconf:generate(emqx_schema, Conf).
-
-set_app_env({App, Lists}) ->
-    lists:foreach(
-        fun
-            ({authz_file, _Var}) ->
-                application:set_env(App, authz_file, local_path(["etc", "authz.conf"]));
-            ({Par, Var}) ->
-                application:set_env(App, Par, Var)
-        end,
-        Lists
-    ).
-
-local_path(Components, Module) ->
-    filename:join([get_base_dir(Module) | Components]).
-
-local_path(Components) ->
-    local_path(Components, ?MODULE).
-
-get_base_dir(Module) ->
-    {file, Here} = code:is_loaded(Module),
-    filename:dirname(filename:dirname(Here)).
-
-get_base_dir() ->
-    get_base_dir(?MODULE).
-
-remove_default_limiter(Listeners) ->
-    maps:map(
-        fun(_, X) ->
-            maps:map(
-                fun(_, E) ->
-                    maps:remove(limiter, E)
-                end,
-                X
-            )
-        end,
-        Listeners
-    ).
-
 generate_tls_certs(Config) ->
-    DataDir = ?config(data_dir, Config),
-    emqx_common_test_helpers:gen_ca(DataDir, "ca"),
-    emqx_common_test_helpers:gen_host_cert("server-password", "ca", DataDir, #{
+    PrivDir = ?config(priv_dir, Config),
+    emqx_common_test_helpers:gen_ca(PrivDir, "ca"),
+    emqx_common_test_helpers:gen_ca(PrivDir, "ca-next"),
+    emqx_common_test_helpers:gen_host_cert("server", "ca-next", PrivDir, #{}),
+    emqx_common_test_helpers:gen_host_cert("client", "ca-next", PrivDir, #{}),
+    emqx_common_test_helpers:gen_host_cert("server-password", "ca", PrivDir, #{
         password => ?SERVER_KEY_PASSWORD
     }).
+
+format_bind(Bind) ->
+    iolist_to_binary(emqx_listeners:format_bind(Bind)).

+ 15 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -24,6 +24,7 @@
     action_type_to_connector_type/1,
     action_type_to_bridge_v1_type/2,
     bridge_v1_type_to_action_type/1,
+    bridge_v1_type_name/1,
     is_action_type/1,
     registered_schema_modules/0,
     connector_action_config_to_bridge_v1_config/2,
@@ -144,6 +145,20 @@ get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
 get_confs(_, _) ->
     undefined.
 
+%% We need this hack because of the bugs introduced by associating v2/action/source types
+%% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or
+%% `confluent_producer', which has no v1 equivalent....
+bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) ->
+    bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin));
+bridge_v1_type_name(ActionType) ->
+    Module = get_action_info_module(ActionType),
+    case erlang:function_exported(Module, bridge_v1_type_name, 0) of
+        true ->
+            {ok, Module:bridge_v1_type_name()};
+        false ->
+            {error, no_v1_equivalent}
+    end.
+
 %% This function should return true for all inputs that are bridge V1 types for
 %% bridges that have been refactored to bridge V2s, and for all all bridge V2
 %% types. For everything else the function should return false.

+ 1 - 0
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -621,6 +621,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
     end.
 
 lookup_from_local_node(ActionType, ActionName) ->
+    %% TODO: BUG: shouldn't accept an action type here, only V1 types....
     case emqx_bridge:lookup(ActionType, ActionName) of
         {ok, Res} -> {ok, format_resource(Res, node())};
         Error -> Error

+ 8 - 1
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -1086,7 +1086,8 @@ bridge_v1_lookup_and_transform(ActionType, Name) ->
     case lookup(ActionType, Name) of
         {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
             BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
-            case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
+            HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType),
+            case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
                 true ->
                     ConnectorType = connector_type(ActionType),
                     case emqx_connector:lookup(ConnectorType, ConnectorName) of
@@ -1112,6 +1113,12 @@ bridge_v1_lookup_and_transform(ActionType, Name) ->
 not_bridge_v1_compatible_error() ->
     {error, not_bridge_v1_compatible}.
 
+has_bridge_v1_equivalent(ActionType) ->
+    case emqx_action_info:bridge_v1_type_name(ActionType) of
+        {ok, _} -> true;
+        {error, no_v1_equivalent} -> false
+    end.
+
 connector_raw_config(Connector, ConnectorType) ->
     get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema).
 

+ 3 - 0
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -136,6 +136,9 @@ setup_mocks() ->
         end
     ),
 
+    catch meck:new(emqx_action_info, MeckOpts),
+    meck:expect(emqx_action_info, bridge_v1_type_name, 1, {ok, bridge_type()}),
+
     ok.
 
 con_mod() ->

+ 36 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -343,6 +343,42 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
     ct:pal("bridge probe result: ~p", [Res]),
     Res.
 
+list_bridges_http_api_v1() ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    ct:pal("list bridges (http v1)"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list bridges (http v1) result:\n  ~p", [Res]),
+    Res.
+
+list_actions_http_api() ->
+    Path = emqx_mgmt_api_test_util:api_path(["actions"]),
+    ct:pal("list actions (http v2)"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list actions (http v2) result:\n  ~p", [Res]),
+    Res.
+
+list_connectors_http_api() ->
+    Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
+    ct:pal("list connectors"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list connectors result:\n  ~p", [Res]),
+    Res.
+
+update_rule_http(RuleId, Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
+    ct:pal("update rule ~p:\n  ~p", [RuleId, Params]),
+    Res = request(put, Path, Params),
+    ct:pal("update rule ~p result:\n  ~p", [RuleId, Res]),
+    Res.
+
+enable_rule_http(RuleId) ->
+    Params = #{<<"enable">> => true},
+    update_rule_http(RuleId, Params).
+
+is_rule_enabled(RuleId) ->
+    {ok, #{enable := Enable}} = emqx_rule_engine:get_rule(RuleId),
+    Enable.
+
 try_decode_error(Body0) ->
     case emqx_utils_json:safe_decode(Body0, [return_maps]) of
         {ok, #{<<"message">> := Msg0} = Body1} ->

+ 44 - 4
apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl

@@ -10,8 +10,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(BRIDGE_TYPE, confluent_producer).
--define(BRIDGE_TYPE_BIN, <<"confluent_producer">>).
+-define(ACTION_TYPE, confluent_producer).
+-define(ACTION_TYPE_BIN, <<"confluent_producer">>).
 -define(CONNECTOR_TYPE, confluent_producer).
 -define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
 -define(KAFKA_BRIDGE_TYPE, kafka_producer).
@@ -93,7 +93,7 @@ common_init_per_testcase(TestCase, Config) ->
             {connector_type, ?CONNECTOR_TYPE},
             {connector_name, Name},
             {connector_config, ConnectorConfig},
-            {bridge_type, ?BRIDGE_TYPE},
+            {bridge_type, ?ACTION_TYPE},
             {bridge_name, Name},
             {bridge_config, BridgeConfig}
             | Config
@@ -212,7 +212,7 @@ serde_roundtrip(InnerConfigMap0) ->
     InnerConfigMap.
 
 parse_and_check_bridge_config(InnerConfigMap, Name) ->
-    TypeBin = ?BRIDGE_TYPE_BIN,
+    TypeBin = ?ACTION_TYPE_BIN,
     RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
     hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
     InnerConfigMap.
@@ -341,3 +341,43 @@ t_same_name_confluent_kafka_bridges(Config) ->
         end
     ),
     ok.
+
+t_list_v1_bridges(Config) ->
+    ?check_trace(
+        begin
+            {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config),
+
+            ?assertMatch(
+                {error, no_v1_equivalent},
+                emqx_action_info:bridge_v1_type_name(confluent_producer)
+            ),
+
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, []}}, emqx_bridge_v2_testlib:list_bridges_http_api_v1()
+            ),
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_actions_http_api()
+            ),
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_connectors_http_api()
+            ),
+
+            RuleTopic = <<"t/c">>,
+            {ok, #{<<"id">> := RuleId0}} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(
+                    ?ACTION_TYPE_BIN,
+                    RuleTopic,
+                    Config,
+                    #{overrides => #{enable => true}}
+                ),
+            ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)),
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, _}}, emqx_bridge_v2_testlib:enable_rule_http(RuleId0)
+            ),
+            ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -566,6 +566,7 @@ do_acknowledge(State0) ->
     Path = path(State1, ack),
     Body = body(State1, ack, #{ack_ids => AckIds}),
     PreparedRequest = {prepared_request, {Method, Path, Body}},
+    ?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}),
     Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
     case Res of
         {error, Reason} ->

+ 10 - 7
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -706,7 +706,9 @@ prop_all_pulled_are_acked(Trace) ->
          || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
             #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
         ],
-    AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)),
+    %% we just need to check that it _tries_ to ack each id; the result itself doesn't
+    %% matter, as it might timeout.
+    AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_will_acknowledge, Trace)),
     AckedMsgIds1 = [
         MsgId
      || PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks)
@@ -1172,7 +1174,12 @@ t_multiple_topic_mappings(Config) ->
             ?assertMatch(
                 {{ok, _}, {ok, _}},
                 ?wait_async_action(
-                    create_bridge(Config),
+                    create_bridge(
+                        Config,
+                        #{
+                            <<"consumer">> => #{<<"ack_deadline">> => <<"10m">>}
+                        }
+                    ),
                     #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
                     40_000
                 )
@@ -1233,7 +1240,7 @@ t_multiple_topic_mappings(Config) ->
                 ],
                 Published
             ),
-            wait_acked(#{n => 2}),
+            ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 20_000),
             ?retry(
                 _Interval = 200,
                 _NAttempts = 20,
@@ -1275,10 +1282,6 @@ t_multiple_pull_workers(Config) ->
                         <<"ack_deadline">> => <<"10m">>,
                         <<"ack_retry_interval">> => <<"1s">>,
                         <<"consumer_workers_per_topic">> => NConsumers
-                    },
-                    <<"resource_opts">> => #{
-                        %% reduce flakiness
-                        <<"request_ttl">> => <<"20s">>
                     }
                 }
             ),

+ 13 - 3
apps/emqx_bridge_redis/src/emqx_bridge_redis.erl

@@ -101,7 +101,14 @@ namespace() -> "bridge_redis".
 roots() -> [].
 
 fields(action_parameters) ->
-    [{command_template, fun command_template/1}];
+    [
+        command_template(),
+        {redis_type,
+            ?HOCON(
+                ?ENUM([single, sentinel, cluster]),
+                #{required => true, desc => ?DESC(redis_type)}
+            )}
+    ];
 fields("post_single") ->
     method_fields(post, redis_single);
 fields("post_sentinel") ->
@@ -147,8 +154,8 @@ method_fields(put, ConnectorType) ->
 redis_bridge_common_fields(Type) ->
     emqx_bridge_schema:common_bridge_fields() ++
         [
-            {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})}
-            | fields(action_parameters)
+            {local_topic, mk(binary(), #{required => false, desc => ?DESC("desc_local_topic")})},
+            command_template()
         ] ++
         v1_resource_fields(Type).
 
@@ -222,3 +229,6 @@ is_command_template_valid(CommandSegments) ->
                 "the value of the field 'command_template' should be a nonempty "
                 "list of strings (templates for Redis command and arguments)"}
     end.
+
+command_template() ->
+    {command_template, fun command_template/1}.

+ 4 - 10
apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl

@@ -76,13 +76,7 @@ fields(redis_action) ->
             )
         ),
     [ResOpts] = emqx_connector_schema:resource_opts_ref(?MODULE, action_resource_opts),
-    RedisType =
-        {redis_type,
-            ?HOCON(
-                ?ENUM([single, sentinel, cluster]),
-                #{required => true, desc => ?DESC(redis_type)}
-            )},
-    [RedisType | lists:keyreplace(resource_opts, 1, Schema, ResOpts)];
+    lists:keyreplace(resource_opts, 1, Schema, ResOpts);
 fields(action_resource_opts) ->
     emqx_bridge_v2_schema:resource_opts_fields([
         {batch_size, #{desc => ?DESC(batch_size)}},
@@ -130,7 +124,7 @@ resource_opts_converter(Conf, _Opts) ->
     maps:map(
         fun(_Name, SubConf) ->
             case SubConf of
-                #{<<"redis_type">> := <<"cluster">>} ->
+                #{<<"parameters">> := #{<<"redis_type">> := <<"cluster">>}} ->
                     ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}),
                     %% cluster don't support batch
                     SubConf#{
@@ -218,12 +212,12 @@ action_example(RedisType, get) ->
     );
 action_example(RedisType, put) ->
     #{
-        redis_type => RedisType,
         enable => true,
         connector => <<"my_connector_name">>,
         description => <<"My action">>,
         parameters => #{
-            command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
+            command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
+            redis_type => RedisType
         },
         resource_opts => #{batch_size => 1}
     }.

+ 5 - 11
apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl

@@ -229,7 +229,10 @@ action_config(Name, Path, ConnectorId) ->
             <<"enable">> => true,
             <<"connector">> => ConnectorId,
             <<"parameters">> =>
-                #{<<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]},
+                #{
+                    <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>],
+                    <<"redis_type">> => atom_to_binary(RedisType)
+                },
             <<"local_topic">> => <<"t/redis">>,
             <<"resource_opts">> => #{
                 <<"batch_size">> => 1,
@@ -246,18 +249,9 @@ action_config(Name, Path, ConnectorId) ->
                 <<"worker_pool_size">> => <<"1">>
             }
         },
-    PerTypeCfg = per_type_action_config(RedisType),
-    InnerConfigMap0 = emqx_utils_maps:deep_merge(CommonCfg, PerTypeCfg),
-    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    InnerConfigMap = serde_roundtrip(CommonCfg),
     parse_and_check_bridge_config(InnerConfigMap, Name).
 
-per_type_action_config(single) ->
-    #{<<"redis_type">> => <<"single">>};
-per_type_action_config(sentinel) ->
-    #{<<"redis_type">> => <<"sentinel">>};
-per_type_action_config(cluster) ->
-    #{<<"redis_type">> => <<"cluster">>}.
-
 %% check it serializes correctly
 serde_roundtrip(InnerConfigMap0) ->
     IOList = hocon_pp:do(InnerConfigMap0, #{}),

+ 1 - 1
apps/emqx_conf/src/emqx_conf.erl

@@ -188,7 +188,7 @@ gen_schema_json(Dir, SchemaModule, Lang) ->
 gen_preformat_md_json_files(Dir, StructsJsonArray, Lang) ->
     NestedStruct = reformat_schema_dump(StructsJsonArray),
     %% write to files
-    NestedJsonFile = filename:join([Dir, "schmea-v2-" ++ Lang ++ ".json"]),
+    NestedJsonFile = filename:join([Dir, "schema-v2-" ++ Lang ++ ".json"]),
     io:format(user, "===< Generating: ~s~n", [NestedJsonFile]),
     ok = file:write_file(
         NestedJsonFile, emqx_utils_json:encode(NestedStruct, [pretty, force_utf8])

+ 19 - 3
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -233,7 +233,10 @@ load_config(Bin, Opts) when is_binary(Bin) ->
             {error, Reason}
     end.
 
-load_config_from_raw(RawConf, Opts) ->
+load_config_from_raw(RawConf0, Opts) ->
+    SchemaMod = emqx_conf:schema_module(),
+    RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
+    RawConf = emqx_config:fill_defaults(RawConf1),
     case check_config(RawConf) of
         ok ->
             Error =
@@ -452,8 +455,21 @@ sorted_fold(Func, Conf) ->
         Error -> {error, Error}
     end.
 
-to_sorted_list(Conf) ->
-    lists:keysort(1, maps:to_list(Conf)).
+to_sorted_list(Conf0) ->
+    %% connectors > actions/bridges > rule_engine
+    Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
+    {HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []),
+    HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)).
+
+split_high_priority_conf([], Conf0, Acc) ->
+    {lists:reverse(Acc), Conf0};
+split_high_priority_conf([Key | Keys], Conf0, Acc) ->
+    case maps:take(Key, Conf0) of
+        error ->
+            split_high_priority_conf(Keys, Conf0, Acc);
+        {Value, Conf1} ->
+            split_high_priority_conf(Keys, Conf1, [{Key, Value} | Acc])
+    end.
 
 merge_conf(Key, NewConf) ->
     OldConf = emqx_conf:get_raw([Key]),

+ 1 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -596,7 +596,7 @@ fields("node") ->
                 #{
                     mapping => "mria.shard_transport",
                     importance => ?IMPORTANCE_HIDDEN,
-                    default => gen_rpc,
+                    default => distr,
                     desc => ?DESC(db_default_shard_transport)
                 }
             )},

+ 12 - 3
apps/emqx_conf/test/emqx_conf_cli_SUITE.erl

@@ -40,7 +40,7 @@ t_load_config(Config) ->
     ConfBin = hocon_pp:do(#{<<"authorization">> => #{<<"sources">> => []}}, #{}),
     ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
     ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
-    ?assertEqual(#{<<"sources">> => []}, emqx_conf:get_raw([Authz])),
+    ?assertMatch(#{<<"sources">> := []}, emqx_conf:get_raw([Authz])),
 
     ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
     ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
@@ -73,6 +73,10 @@ t_conflict_mix_conf(Config) ->
             AuthNInit = emqx_conf:get_raw([authentication]),
             Redis = #{
                 <<"backend">> => <<"redis">>,
+                <<"database">> => 0,
+                <<"password_hash_algorithm">> =>
+                    #{<<"name">> => <<"sha256">>, <<"salt_position">> => <<"prefix">>},
+                <<"pool_size">> => 8,
                 <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt">>,
                 <<"enable">> => false,
                 <<"mechanism">> => <<"password_based">>,
@@ -85,10 +89,15 @@ t_conflict_mix_conf(Config) ->
             ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
             %% init with redis sources
             ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
-            ?assertMatch([Redis], emqx_conf:get_raw([authentication])),
+            [RedisRaw] = emqx_conf:get_raw([authentication]),
+            ?assertEqual(
+                maps:to_list(Redis),
+                maps:to_list(maps:remove(<<"ssl">>, RedisRaw)),
+                {Redis, RedisRaw}
+            ),
             %% change redis type from single to cluster
             %% the server field will become servers field
-            RedisCluster = maps:remove(<<"server">>, Redis#{
+            RedisCluster = maps:without([<<"server">>, <<"database">>], Redis#{
                 <<"redis_type">> => cluster,
                 <<"servers">> => [<<"127.0.0.1:6379">>]
             }),

+ 3 - 2
apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl

@@ -315,8 +315,6 @@ t_none_ref(_Config) ->
     ),
     ok.
 
-namespace() -> undefined.
-
 t_sub_fields(_Config) ->
     Spec = #{
         post => #{
@@ -815,6 +813,9 @@ to_schema(Body) ->
         post => #{requestBody => Body, responses => #{200 => <<"ok">>}}
     }.
 
+%% Don't warning hocon callback namespace/0 undef.
+namespace() -> atom_to_list(?MODULE).
+
 fields(good_ref) ->
     [
         {'webhook-host', mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:80">>})},

+ 19 - 3
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -197,6 +197,10 @@ subscriptions(get, #{
         case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
             {error, not_found} ->
                 return_http_error(404, "client process not found");
+            {error, ignored} ->
+                return_http_error(
+                    400, "get subscriptions failed: unsupported"
+                );
             {error, Reason} ->
                 return_http_error(400, Reason);
             {ok, Subs} ->
@@ -222,7 +226,13 @@ subscriptions(post, #{
                     )
                 of
                     {error, not_found} ->
-                        return_http_error(404, "client process not found");
+                        return_http_error(
+                            404, "client process not found"
+                        );
+                    {error, ignored} ->
+                        return_http_error(
+                            400, "subscribe failed: unsupported"
+                        );
                     {error, Reason} ->
                         return_http_error(400, Reason);
                     {ok, {NTopic, NSubOpts}} ->
@@ -241,8 +251,14 @@ subscriptions(delete, #{
     with_gateway(Name0, fun(GwName, _) ->
         case lookup_topic(GwName, ClientId, Topic) of
             {ok, _} ->
-                _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
-                {204};
+                case emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic) of
+                    {error, ignored} ->
+                        return_http_error(
+                            400, "unsubscribe failed: unsupported"
+                        );
+                    _ ->
+                        {204}
+                end;
             {error, not_found} ->
                 return_http_error(404, "Resource not found")
         end

+ 2 - 0
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -378,6 +378,8 @@ client_call(GwName, ClientId, Req) ->
     of
         undefined ->
             {error, not_found};
+        ignored ->
+            {error, ignored};
         Res ->
             Res
     catch

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -174,7 +174,7 @@ fields(dtls_opts) ->
             reuse_sessions => true,
             versions => dtls_all_available
         },
-        false
+        _IsRanchListener = false
     ).
 
 desc(gateway) ->

+ 37 - 10
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -273,7 +273,7 @@ merge_default(Udp, Options) ->
             udp ->
                 {udp_options, default_udp_options()};
             dtls ->
-                {udp_options, default_udp_options()};
+                {dtls_options, default_udp_options()};
             tcp ->
                 {tcp_options, default_tcp_options()};
             ssl ->
@@ -525,9 +525,11 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) ->
             udp ->
                 Opts2#{udp_options => sock_opts(udp_options, Opts0)};
             dtls ->
+                UDPOpts = sock_opts(udp_options, Opts0),
+                DTLSOpts = ssl_opts(dtls_options, Opts0),
                 Opts2#{
-                    udp_options => sock_opts(udp_options, Opts0),
-                    dtls_options => ssl_opts(dtls_options, Opts0)
+                    udp_options => UDPOpts,
+                    dtls_options => DTLSOpts
                 }
         end
     ).
@@ -541,12 +543,37 @@ sock_opts(Name, Opts) ->
     ).
 
 ssl_opts(Name, Opts) ->
-    Type =
-        case Name of
-            ssl_options -> tls;
-            dtls_options -> dtls
-        end,
-    emqx_tls_lib:to_server_opts(Type, maps:get(Name, Opts, #{})).
+    SSLOpts = maps:get(Name, Opts, #{}),
+    emqx_utils:run_fold(
+        [
+            fun ssl_opts_crl_config/2,
+            fun ssl_opts_drop_unsupported/2,
+            fun ssl_server_opts/2
+        ],
+        SSLOpts,
+        Name
+    ).
+
+ssl_opts_crl_config(#{enable_crl_check := true} = SSLOpts, _Name) ->
+    HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
+    NSSLOpts = maps:remove(enable_crl_check, SSLOpts),
+    NSSLOpts#{
+        %% `crl_check => true' doesn't work
+        crl_check => peer,
+        crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
+    };
+ssl_opts_crl_config(SSLOpts, _Name) ->
+    %% NOTE: Removing this because DTLS doesn't like any unknown options.
+    maps:remove(enable_crl_check, SSLOpts).
+
+ssl_opts_drop_unsupported(SSLOpts, _Name) ->
+    %% TODO: Support OCSP stapling
+    maps:without([ocsp], SSLOpts).
+
+ssl_server_opts(SSLOpts, ssl_options) ->
+    emqx_tls_lib:to_server_opts(tls, SSLOpts);
+ssl_server_opts(SSLOpts, dtls_options) ->
+    emqx_tls_lib:to_server_opts(dtls, SSLOpts).
 
 ranch_opts(Type, ListenOn, Opts) ->
     NumAcceptors = maps:get(acceptors, Opts, 4),
@@ -635,7 +662,7 @@ default_tcp_options() ->
     ].
 
 default_udp_options() ->
-    [binary].
+    [].
 
 default_subopts() ->
     %% Retain Handling

+ 4 - 1
apps/emqx_gateway/test/emqx_gateway_auth_ct.erl

@@ -238,9 +238,12 @@ http_authz_config() ->
 init_gateway_conf() ->
     ok = emqx_common_test_helpers:load_config(
         emqx_gateway_schema,
-        merge_conf([X:default_config() || X <- ?CONFS], [])
+        merge_conf(list_gateway_conf(), [])
     ).
 
+list_gateway_conf() ->
+    [X:default_config() || X <- ?CONFS].
+
 merge_conf([Conf | T], Acc) ->
     case re:run(Conf, "\s*gateway\\.(.*)", [global, {capture, all_but_first, list}, dotall]) of
         {match, [[Content]]} ->

+ 25 - 28
apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl

@@ -22,7 +22,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]).
+-import(emqx_gateway_auth_ct, [with_resource/3]).
 
 -define(checkMatch(Guard),
     (fun(Expr) ->
@@ -54,40 +54,37 @@ groups() ->
     emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS).
 
 init_per_group(AuthName, Conf) ->
-    ct:pal("on group start:~p~n", [AuthName]),
-    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    emqx_gateway_auth_ct:start_auth(AuthName),
-    timer:sleep(500),
-    Conf.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            emqx_auth,
+            emqx_auth_http,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
+            {emqx_gateway, emqx_gateway_auth_ct:list_gateway_conf()}
+            | emqx_gateway_test_utils:all_gateway_apps()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Conf)}
+    ),
+    _ = emqx_common_test_http:create_default_app(),
+    ok = emqx_gateway_auth_ct:start_auth(AuthName),
+    [{group_apps, Apps} | Conf].
 
 end_per_group(AuthName, Conf) ->
-    ct:pal("on group stop:~p~n", [AuthName]),
-    emqx_gateway_auth_ct:stop_auth(AuthName),
+    ok = emqx_gateway_auth_ct:stop_auth(AuthName),
+    _ = emqx_common_test_http:delete_default_app(),
+    ok = emqx_cth_suite:stop(?config(group_apps, Conf)),
     Conf.
 
 init_per_suite(Config) ->
-    emqx_gateway_test_utils:load_all_gateway_apps(),
-    emqx_config:erase(gateway),
-    init_gateway_conf(),
-    emqx_mgmt_api_test_util:init_suite([grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway]),
-    application:ensure_all_started(cowboy),
-    emqx_gateway_auth_ct:start(),
-    timer:sleep(500),
-    Config.
+    {ok, Apps1} = application:ensure_all_started(grpc),
+    {ok, Apps2} = application:ensure_all_started(cowboy),
+    {ok, _} = emqx_gateway_auth_ct:start(),
+    [{suite_apps, Apps1 ++ Apps2} | Config].
 
 end_per_suite(Config) ->
-    emqx_gateway_auth_ct:stop(),
-    emqx_config:erase(gateway),
-    emqx_mgmt_api_test_util:end_suite([
-        cowboy, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway, grpc
-    ]),
-    Config.
-
-init_per_testcase(_Case, Config) ->
-    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    Config.
-
-end_per_testcase(_Case, Config) ->
+    ok = emqx_gateway_auth_ct:stop(),
+    ok = emqx_cth_suite:stop_apps(?config(suite_apps, Config)),
     Config.
 
 %%------------------------------------------------------------------------------

+ 21 - 32
apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl

@@ -22,7 +22,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]).
+-import(emqx_gateway_auth_ct, [with_resource/3]).
 
 -define(checkMatch(Guard),
     (fun(Expr) ->
@@ -54,44 +54,33 @@ groups() ->
     emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS).
 
 init_per_group(AuthName, Conf) ->
-    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    ok = emqx_authz_test_lib:reset_authorizers(),
-    emqx_gateway_auth_ct:start_auth(AuthName),
-    timer:sleep(500),
-    Conf.
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_conf, "authorization { no_match = deny, cache { enable = false } }"},
+            emqx_auth,
+            emqx_auth_http,
+            {emqx_gateway, emqx_gateway_auth_ct:list_gateway_conf()}
+            | emqx_gateway_test_utils:all_gateway_apps()
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Conf)}
+    ),
+    ok = emqx_gateway_auth_ct:start_auth(AuthName),
+    [{group_apps, Apps} | Conf].
 
 end_per_group(AuthName, Conf) ->
-    emqx_gateway_auth_ct:stop_auth(AuthName),
+    ok = emqx_gateway_auth_ct:stop_auth(AuthName),
+    ok = emqx_cth_suite:stop(?config(group_apps, Conf)),
     Conf.
 
 init_per_suite(Config) ->
-    emqx_config:erase(gateway),
-    emqx_gateway_test_utils:load_all_gateway_apps(),
-    init_gateway_conf(),
-    emqx_mgmt_api_test_util:init_suite([
-        grpc, emqx_conf, emqx_auth, emqx_auth_http, emqx_gateway
-    ]),
-    meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_authz_file, create, fun(S) -> S end),
-    application:ensure_all_started(cowboy),
-    emqx_gateway_auth_ct:start(),
-    Config.
+    {ok, Apps1} = application:ensure_all_started(grpc),
+    {ok, Apps2} = application:ensure_all_started(cowboy),
+    {ok, _} = emqx_gateway_auth_ct:start(),
+    [{suite_apps, Apps1 ++ Apps2} | Config].
 
 end_per_suite(Config) ->
-    meck:unload(emqx_authz_file),
-    emqx_gateway_auth_ct:stop(),
-    ok = emqx_authz_test_lib:restore_authorizers(),
-    emqx_config:erase(gateway),
-    emqx_mgmt_api_test_util:end_suite([
-        emqx_gateway, emqx_auth_http, emqx_auth, emqx_conf, grpc
-    ]),
-    Config.
-
-init_per_testcase(_Case, Config) ->
-    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    Config.
-
-end_per_testcase(_Case, Config) ->
+    ok = emqx_gateway_auth_ct:stop(),
+    ok = emqx_cth_suite:stop_apps(?config(suite_apps, Config)),
     Config.
 
 %%------------------------------------------------------------------------------

+ 11 - 5
apps/emqx_gateway/test/emqx_gateway_test_utils.erl

@@ -103,12 +103,18 @@ assert_fields_exist(Ks, Map) ->
         end,
         Ks
     ).
+
 load_all_gateway_apps() ->
-    application:load(emqx_gateway_stomp),
-    application:load(emqx_gateway_mqttsn),
-    application:load(emqx_gateway_coap),
-    application:load(emqx_gateway_lwm2m),
-    application:load(emqx_gateway_exproto).
+    emqx_cth_suite:load_apps(all_gateway_apps()).
+
+all_gateway_apps() ->
+    [
+        emqx_gateway_stomp,
+        emqx_gateway_mqttsn,
+        emqx_gateway_coap,
+        emqx_gateway_lwm2m,
+        emqx_gateway_exproto
+    ].
 
 %%--------------------------------------------------------------------
 %% http

+ 57 - 57
apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl

@@ -20,7 +20,6 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
--include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -44,14 +43,6 @@
 -define(TCPOPTS, [binary, {active, false}]).
 -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
 
--define(PORT, 7993).
-
--define(DEFAULT_CLIENT, #{
-    proto_name => <<"demo">>,
-    proto_ver => <<"v0.1">>,
-    clientid => <<"test_client_1">>
-}).
-
 %%--------------------------------------------------------------------
 -define(CONF_DEFAULT, <<
     "\n"
@@ -126,15 +117,33 @@ init_per_group(_, Cfg) ->
 
 init_per_group(LisType, ServiceName, Scheme, Cfg) ->
     Svrs = emqx_exproto_echo_svr:start(Scheme),
-    application:load(emqx_gateway_exproto),
-    emqx_common_test_helpers:start_apps(
-        [emqx_conf, emqx_auth, emqx_gateway],
-        fun(App) ->
-            set_special_cfg(App, LisType, ServiceName, Scheme)
-        end
+    Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
+    GWConfig = #{
+        server => #{bind => 9100},
+        idle_timeout => 5000,
+        mountpoint => <<"ct/">>,
+        handler => #{
+            address => Addrs,
+            service_name => ServiceName,
+            ssl_options => #{enable => Scheme == https}
+        },
+        listeners => listener_confs(LisType)
+    },
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            emqx_auth,
+            {emqx_gateway, #{
+                config =>
+                    #{gateway => #{exproto => GWConfig}}
+            }},
+            emqx_gateway_exproto
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Cfg)}
     ),
     [
         {servers, Svrs},
+        {apps, Apps},
         {listener_type, LisType},
         {service_name, ServiceName},
         {grpc_client_scheme, Scheme}
@@ -142,8 +151,7 @@ init_per_group(LisType, ServiceName, Scheme, Cfg) ->
     ].
 
 end_per_group(_, Cfg) ->
-    emqx_config:erase(gateway),
-    emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_auth, emqx_conf]),
+    ok = emqx_cth_suite:stop(proplists:get_value(apps, Cfg)),
     emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
 
 init_per_testcase(TestCase, Cfg) when
@@ -159,28 +167,13 @@ init_per_testcase(_TestCase, Cfg) ->
 end_per_testcase(_TestCase, _Cfg) ->
     ok.
 
-set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) ->
-    Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
-    emqx_config:put(
-        [gateway, exproto],
-        #{
-            server => #{bind => 9100},
-            idle_timeout => 5000,
-            mountpoint => <<"ct/">>,
-            handler => #{
-                address => Addrs,
-                service_name => ServiceName,
-                ssl_options => #{enable => Scheme == https}
-            },
-            listeners => listener_confs(LisType)
-        }
-    );
-set_special_cfg(_, _, _, _) ->
-    ok.
-
 listener_confs(Type) ->
-    Default = #{bind => 7993, acceptors => 8},
-    #{Type => #{'default' => maps:merge(Default, server_socketopts(Type))}}.
+    Default = #{
+        bind => 7993,
+        max_connections => 64,
+        access_rules => ["allow all"]
+    },
+    #{Type => #{'default' => maps:merge(Default, socketopts(Type))}}.
 
 default_config() ->
     ?CONF_DEFAULT.
@@ -635,24 +628,29 @@ close({dtls, Sock}) ->
 %%--------------------------------------------------------------------
 %% Server-Opts
 
-server_socketopts(tcp) ->
-    #{tcp_options => server_tcp_opts()};
-server_socketopts(ssl) ->
+socketopts(tcp) ->
+    #{
+        acceptors => 8,
+        tcp_options => tcp_opts()
+    };
+socketopts(ssl) ->
     #{
-        tcp_options => server_tcp_opts(),
-        ssl_options => server_ssl_opts()
+        acceptors => 8,
+        tcp_options => tcp_opts(),
+        ssl_options => ssl_opts()
     };
-server_socketopts(udp) ->
-    #{udp_options => server_udp_opts()};
-server_socketopts(dtls) ->
+socketopts(udp) ->
+    #{udp_options => udp_opts()};
+socketopts(dtls) ->
     #{
-        udp_options => server_udp_opts(),
-        dtls_options => server_dtls_opts()
+        acceptors => 8,
+        udp_options => udp_opts(),
+        dtls_options => dtls_opts()
     }.
 
-server_tcp_opts() ->
+tcp_opts() ->
     maps:merge(
-        server_udp_opts(),
+        udp_opts(),
         #{
             send_timeout => 15000,
             send_timeout_close => true,
@@ -661,15 +659,17 @@ server_tcp_opts() ->
         }
     ).
 
-server_udp_opts() ->
+udp_opts() ->
     #{
-        recbuf => 1024,
-        sndbuf => 1024,
-        buffer => 1024,
+        %% NOTE
+        %% Making those too small will lead to inability to accept connections.
+        recbuf => 2048,
+        sndbuf => 2048,
+        buffer => 2048,
         reuseaddr => true
     }.
 
-server_ssl_opts() ->
+ssl_opts() ->
     Certs = certs("key.pem", "cert.pem", "cacert.pem"),
     maps:merge(
         Certs,
@@ -684,8 +684,8 @@ server_ssl_opts() ->
         }
     ).
 
-server_dtls_opts() ->
-    maps:merge(server_ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}).
+dtls_opts() ->
+    maps:merge(ssl_opts(), #{versions => ['dtlsv1.2', 'dtlsv1']}).
 
 %%--------------------------------------------------------------------
 %% Client-Opts

+ 65 - 16
apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl

@@ -66,7 +66,6 @@
 -elvis([{elvis_style, dont_repeat_yourself, disable}]).
 
 -define(CONF_DEFAULT, <<
-    "\n"
     "gateway.mqttsn {\n"
     "  gateway_id = 1\n"
     "  broadcast = true\n"
@@ -89,6 +88,20 @@
     "}\n"
 >>).
 
+-define(CONF_DTLS, <<
+    "\n"
+    "gateway.mqttsn {"
+    "  listeners.dtls.default {\n"
+    "    bind = 1885\n"
+    "    dtls_options {\n"
+    "      cacertfile = \"${cacertfile}\"\n"
+    "      certfile = \"${certfile}\"\n"
+    "      keyfile = \"${keyfile}\"\n"
+    "    }\n"
+    "  }\n"
+    "}\n"
+>>).
+
 %%--------------------------------------------------------------------
 %% Setups
 %%--------------------------------------------------------------------
@@ -97,9 +110,22 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    PrivDir = ?config(priv_dir, Config),
+    Root = emqx_cth_tls:gen_cert(#{key => ec, issuer => root}),
+    Server = emqx_cth_tls:gen_cert(#{key => ec, issuer => Root}),
+    {CACertfile, _} = emqx_cth_tls:write_cert(PrivDir, Root),
+    {Certfile, Keyfile} = emqx_cth_tls:write_cert(PrivDir, Server),
+    Conf = emqx_template:render_strict(
+        emqx_template:parse([?CONF_DEFAULT, ?CONF_DTLS]),
+        #{
+            cacertfile => CACertfile,
+            certfile => Certfile,
+            keyfile => Keyfile
+        }
+    ),
     Apps = emqx_cth_suite:start(
         [
-            {emqx_conf, ?CONF_DEFAULT},
+            {emqx_conf, Conf},
             emqx_gateway,
             emqx_auth,
             emqx_management,
@@ -108,7 +134,7 @@ init_per_suite(Config) ->
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     emqx_common_test_http:create_default_app(),
-    [{suite_apps, Apps} | Config].
+    [{suite_apps, Apps}, {cacertfile, CACertfile} | Config].
 
 end_per_suite(Config) ->
     {ok, _} = emqx:remove_config([gateway, mqttsn]),
@@ -191,6 +217,25 @@ t_first_disconnect(_) ->
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     gen_udp:close(Socket).
 
+t_connect_dtls(Config) ->
+    SockName = {'mqttsn:dtls:default', 1885},
+    ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
+
+    ClientOpts = [
+        binary,
+        {active, false},
+        {protocol, dtls},
+        {cacertfile, ?config(cacertfile, Config)}
+        | emqx_common_test_helpers:ssl_verify_fun_allow_any_host()
+    ],
+    {ok, Socket} = ssl:connect(?HOST, 1885, ClientOpts, 1000),
+    ok = ssl:send(Socket, make_connect_msg(<<"client_id_test1">>, 1)),
+    ?assertEqual({ok, <<3, ?SN_CONNACK, 0>>}, ssl:recv(Socket, 0, 1000)),
+
+    ok = ssl:send(Socket, make_disconnect_msg(undefined)),
+    ?assertEqual({ok, <<2, ?SN_DISCONNECT>>}, ssl:recv(Socket, 0, 1000)),
+    ssl:close(Socket).
+
 t_subscribe(_) ->
     Dup = 0,
     QoS = 0,
@@ -2444,10 +2489,7 @@ send_searchgw_msg(Socket) ->
     Radius = 0,
     ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
 
-send_connect_msg(Socket, ClientId) ->
-    send_connect_msg(Socket, ClientId, 1).
-
-send_connect_msg(Socket, ClientId, CleanSession) when
+make_connect_msg(ClientId, CleanSession) when
     CleanSession == 0;
     CleanSession == 1
 ->
@@ -2460,9 +2502,14 @@ send_connect_msg(Socket, ClientId, CleanSession) when
     TopicIdType = 0,
     ProtocolId = 1,
     Duration = 10,
-    Packet =
-        <<Length:8, MsgType:8, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
-            ProtocolId:8, Duration:16, ClientId/binary>>,
+    <<Length:8, MsgType:8, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
+        ProtocolId:8, Duration:16, ClientId/binary>>.
+
+send_connect_msg(Socket, ClientId) ->
+    send_connect_msg(Socket, ClientId, 1).
+
+send_connect_msg(Socket, ClientId, CleanSession) ->
+    Packet = make_connect_msg(ClientId, CleanSession),
     ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
 
 send_connect_msg_with_will(Socket, Duration, ClientId) ->
@@ -2724,15 +2771,17 @@ send_pingreq_msg(Socket, ClientId) ->
     ?LOG("send_pingreq_msg ClientId=~p", [ClientId]),
     ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket).
 
-send_disconnect_msg(Socket, Duration) ->
+make_disconnect_msg(Duration) ->
     Length = 2,
     Length2 = 4,
     MsgType = ?SN_DISCONNECT,
-    DisConnectPacket =
-        case Duration of
-            undefined -> <<Length:8, MsgType:8>>;
-            Other -> <<Length2:8, MsgType:8, Other:16>>
-        end,
+    case Duration of
+        undefined -> <<Length:8, MsgType:8>>;
+        Other -> <<Length2:8, MsgType:8, Other:16>>
+    end.
+
+send_disconnect_msg(Socket, Duration) ->
+    DisConnectPacket = make_disconnect_msg(Duration),
     ?LOG("send_disconnect_msg Duration=~p", [Duration]),
     ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket).
 

+ 12 - 7
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -186,10 +186,10 @@ info(timers, #channel{timers = Timers}) ->
 
 -spec stats(channel()) -> emqx_types:stats().
 stats(#channel{mqueue = MQueue}) ->
-    %% XXX:
+    %% XXX: A fake stats for managed by emqx_management
     SessionStats = [
-        {subscriptions_cnt, 0},
-        {subscriptions_max, 0},
+        {subscriptions_cnt, 1},
+        {subscriptions_max, 1},
         {inflight_cnt, 0},
         {inflight_max, 0},
         {mqueue_len, queue:len(MQueue)},
@@ -524,9 +524,13 @@ handle_out(Type, Data, Channel) ->
 %%--------------------------------------------------------------------
 
 apply_frame(Frames, Channel) when is_list(Frames) ->
-    {Outgoings, NChannel} = lists:foldl(fun apply_frame/2, {[], Channel}, Frames),
+    {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames),
     {lists:reverse(Outgoings), NChannel};
-apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
+apply_frame(Frames, Channel) ->
+    ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}),
+    Channel.
+
+do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
     case maps:get(<<"status">>, Payload) of
         <<"Accepted">> ->
             Intv = maps:get(<<"interval">>, Payload),
@@ -535,8 +539,9 @@ apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
         _ ->
             {Outgoings, Channel}
     end;
-apply_frame(_, Channel) ->
-    Channel.
+do_apply_frame(Frame, Acc = {_Outgoings, Channel}) ->
+    ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}),
+    Acc.
 
 %%--------------------------------------------------------------------
 %% Handle call

+ 21 - 21
apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl

@@ -33,27 +33,27 @@
 
 -define(HEARTBEAT, <<$\n>>).
 
--define(CONF_DEFAULT, <<
-    "\n"
-    "gateway.ocpp {\n"
-    "  mountpoint = \"ocpp/\"\n"
-    "  default_heartbeat_interval = \"60s\"\n"
-    "  heartbeat_checking_times_backoff = 1\n"
-    "  message_format_checking = disable\n"
-    "  upstream {\n"
-    "    topic = \"cp/${clientid}\"\n"
-    "    reply_topic = \"cp/${clientid}/Reply\"\n"
-    "    error_topic = \"cp/${clientid}/Reply\"\n"
-    "  }\n"
-    "  dnstream {\n"
-    "    topic = \"cs/${clientid}\"\n"
-    "  }\n"
-    "  listeners.ws.default {\n"
-    "      bind = \"0.0.0.0:33033\"\n"
-    "      websocket.path = \"/ocpp\"\n"
-    "  }\n"
-    "}\n"
->>).
+%% erlfmt-ignore
+-define(CONF_DEFAULT, <<"
+    gateway.ocpp {
+      mountpoint = \"ocpp/\"
+      default_heartbeat_interval = \"60s\"
+      heartbeat_checking_times_backoff = 1
+      message_format_checking = disable
+      upstream {
+        topic = \"cp/${clientid}\"
+        reply_topic = \"cp/${clientid}/Reply\"
+        error_topic = \"cp/${clientid}/Reply\"
+      }
+      dnstream {
+        topic = \"cs/${clientid}\"
+      }
+      listeners.ws.default {
+          bind = \"0.0.0.0:33033\"
+          websocket.path = \"/ocpp\"
+      }
+    }
+">>).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 

+ 99 - 0
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -32,11 +33,15 @@ end_per_suite(_) ->
 
 init_per_testcase(TestCase = t_configs_node, Config) ->
     ?MODULE:TestCase({'init', Config});
+init_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) ->
+    ?MODULE:TestCase({'init', Config});
 init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(TestCase = t_configs_node, Config) ->
     ?MODULE:TestCase({'end', Config});
+end_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) ->
+    ?MODULE:TestCase({'end', Config});
 end_per_testcase(_TestCase, Config) ->
     Config.
 
@@ -372,6 +377,100 @@ t_get_configs_in_different_accept(_Config) ->
     %% returns error if it set to other type
     ?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)).
 
+t_create_webhook_v1_bridges_api({'init', Config}) ->
+    application:ensure_all_started(emqx_connector),
+    application:ensure_all_started(emqx_bridge),
+    Config;
+t_create_webhook_v1_bridges_api({'end', _}) ->
+    application:stop(emqx_bridge),
+    application:stop(emqx_connector),
+    ok;
+t_create_webhook_v1_bridges_api(Config) ->
+    WebHookFile = filename:join(?config(data_dir, Config), "webhook_v1.conf"),
+    ?assertMatch({ok, _}, hocon:files([WebHookFile])),
+    {ok, WebHookBin} = file:read_file(WebHookFile),
+    ?assertEqual([], update_configs_with_binary(WebHookBin)),
+    Actions =
+        #{
+            <<"http">> =>
+                #{
+                    <<"webhook_name">> =>
+                        #{
+                            <<"connector">> => <<"connector_webhook_name">>,
+                            <<"description">> => <<>>,
+                            <<"enable">> => true,
+                            <<"parameters">> =>
+                                #{
+                                    <<"body">> => <<"{\"value\": \"${value}\"}">>,
+                                    <<"headers">> => #{},
+                                    <<"max_retries">> => 3,
+                                    <<"method">> => <<"post">>,
+                                    <<"path">> => <<>>
+                                },
+                            <<"resource_opts">> =>
+                                #{
+                                    <<"health_check_interval">> => <<"15s">>,
+                                    <<"inflight_window">> => 100,
+                                    <<"max_buffer_bytes">> => <<"256MB">>,
+                                    <<"query_mode">> => <<"async">>,
+                                    <<"request_ttl">> => <<"45s">>,
+                                    <<"worker_pool_size">> => 4
+                                }
+                        }
+                }
+        },
+    ?assertEqual(Actions, emqx_conf:get_raw([<<"actions">>])),
+    Connectors =
+        #{
+            <<"http">> =>
+                #{
+                    <<"connector_webhook_name">> =>
+                        #{
+                            <<"connect_timeout">> => <<"15s">>,
+                            <<"description">> => <<>>,
+                            <<"enable">> => true,
+                            <<"enable_pipelining">> => 100,
+                            <<"headers">> =>
+                                #{
+                                    <<"Authorization">> => <<"Bearer redacted">>,
+                                    <<"content-type">> => <<"application/json">>
+                                },
+                            <<"pool_size">> => 4,
+                            <<"pool_type">> => <<"random">>,
+                            <<"resource_opts">> =>
+                                #{
+                                    <<"health_check_interval">> => <<"15s">>,
+                                    <<"start_after_created">> => true,
+                                    <<"start_timeout">> => <<"5s">>
+                                },
+                            <<"ssl">> =>
+                                #{
+                                    <<"ciphers">> => [],
+                                    <<"depth">> => 10,
+                                    <<"enable">> => true,
+                                    <<"hibernate_after">> => <<"5s">>,
+                                    <<"log_level">> => <<"notice">>,
+                                    <<"reuse_sessions">> => true,
+                                    <<"secure_renegotiate">> => true,
+                                    <<"user_lookup_fun">> =>
+                                        <<"emqx_tls_psk:lookup">>,
+                                    <<"verify">> => <<"verify_none">>,
+                                    <<"versions">> =>
+                                        [
+                                            <<"tlsv1.3">>,
+                                            <<"tlsv1.2">>,
+                                            <<"tlsv1.1">>,
+                                            <<"tlsv1">>
+                                        ]
+                                },
+                            <<"url">> => <<"https://127.0.0.1:18083">>
+                        }
+                }
+        },
+    ?assertEqual(Connectors, emqx_conf:get_raw([<<"connectors">>])),
+    ?assertEqual(#{<<"webhook">> => #{}}, emqx_conf:get_raw([<<"bridges">>])),
+    ok.
+
 %% Helpers
 
 get_config(Name) ->

+ 36 - 0
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE_data/webhook_v1.conf

@@ -0,0 +1,36 @@
+bridges {
+  webhook {
+    webhook_name {
+        body = "{\"value\": \"${value}\"}"
+        connect_timeout = "15s"
+        enable = true
+        enable_pipelining = 100
+        headers {Authorization = "Bearer redacted", "content-type" = "application/json"}
+        max_retries = 3
+        method = "post"
+        pool_size = 4
+        pool_type = "random"
+        request_timeout = "15s"
+        resource_opts {
+            async_inflight_window = 100
+            auto_restart_interval = "60s"
+            enable_queue = false
+            health_check_interval = "15s"
+            max_queue_bytes = "1GB"
+            query_mode = "async"
+            worker_pool_size = 4
+        }
+        ssl {
+            ciphers = []
+            depth = 10
+            enable = true
+            reuse_sessions = true
+            secure_renegotiate = true
+            user_lookup_fun = "emqx_tls_psk:lookup"
+            verify = "verify_none"
+            versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
+        }
+        url = "https://127.0.0.1:18083"
+    }
+}
+}

+ 17 - 5
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -621,24 +621,36 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul
     BridgeIDs0 =
         lists:map(
             fun(BridgeID) ->
-                emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false})
+                %% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it
+                %% returns v1 types when attempting to "upgrade".....
+                {Type, Name} =
+                    emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
+                case emqx_action_info:is_action_type(Type) of
+                    true -> {action, Type, Name};
+                    false -> {bridge_v1, Type, Name}
+                end
             end,
             get_referenced_hookpoints(Froms)
         ),
     BridgeIDs1 =
         lists:filtermap(
             fun
-                ({bridge_v2, Type, Name}) -> {true, {Type, Name}};
-                ({bridge, Type, Name, _ResId}) -> {true, {Type, Name}};
+                ({bridge_v2, Type, Name}) -> {true, {action, Type, Name}};
+                ({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}};
                 (_) -> false
             end,
             Actions
         ),
     NonExistentBridgeIDs =
         lists:filter(
-            fun({Type, Name}) ->
+            fun({Kind, Type, Name}) ->
+                LookupFn =
+                    case Kind of
+                        action -> fun emqx_bridge_v2:lookup/2;
+                        bridge_v1 -> fun emqx_bridge:lookup/2
+                    end,
                 try
-                    case emqx_bridge:lookup(Type, Name) of
+                    case LookupFn(Type, Name) of
                         {ok, _} -> false;
                         {error, _} -> true
                     end

+ 10 - 0
changes/ce/feat-12201.en.md

@@ -0,0 +1,10 @@
+Support hot update of TCP/SSL/WS/WSS MQTT listeners configuration, which allows changing most of the configuration parameters without restarting the listener and disconnecting the clients.
+
+In case of TCP/SSL listeners, changes to the following parameters still require full listener restart:
+ * `bind`
+ * `tcp_options.backlog`
+
+In case of WS/WSS listeners, any parameter can be freely changed without losing the connected clients. However, changing transport related parameters will cause listening socket to be re-opened, namely:
+ * `bind`
+ * `tcp_options.*`
+ * `ssl_options.*`

+ 3 - 1
changes/ce/fix-12081.en.md

@@ -1,7 +1,9 @@
-Updated `gen_rpc` library to version 3.3.0. The new version includes
+Updated `gen_rpc` library to version 3.3.1. The new version includes
 several performance improvements:
 
 - Avoid allocating extra memory for the packets before they are sent
   to the wire in some cases
 
 - Bypass network for the local calls
+
+- Avoid senstive data leaking in debug logs [#12202](https://github.com/emqx/emqx/pull/12202)

+ 1 - 0
changes/ce/fix-12180.en.md

@@ -0,0 +1 @@
+Fix an issue where DTLS enabled MQTT-SN gateways could not be started, caused by incompatibility of default listener configuration with the DTLS implementation.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.4.0-alpha.2
+version: 5.4.0-rc.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.4.0-alpha.2
+appVersion: 5.4.0-rc.1

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.4.0-alpha.2
+version: 5.4.0-rc.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.4.0-alpha.2
+appVersion: 5.4.0-rc.1

+ 2 - 2
mix.exs

@@ -53,10 +53,10 @@ defmodule EMQXUmbrella.MixProject do
       {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
-      {:esockd, github: "emqx/esockd", tag: "5.9.9", override: true},
+      {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true},
       {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.17.0", override: true},
-      {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true},
+      {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.15", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true},

+ 2 - 2
rebar.config

@@ -69,10 +69,10 @@
     , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
-    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}}
+    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}
     , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}}
-    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}
+    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}

+ 7 - 0
rel/i18n/emqx_bridge_redis.hocon

@@ -1,5 +1,12 @@
 emqx_bridge_redis {
 
+redis_type.label:
+"""Redis Type"""
+redis_type.desc:
+"""Single mode. Must be set to 'single' when Redis server is running in single mode.
+Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode.
+Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode."""
+
 command_template.desc:
 """Redis command template used to export messages. Each list element stands for a command name or its argument.
 For example, to push payloads in a Redis list by key `msgs`, the elements should be the following:

+ 0 - 7
rel/i18n/emqx_bridge_redis_schema.hocon

@@ -10,13 +10,6 @@ producer_action.desc:
 producer_action.label:
 """Action Parameters"""
 
-redis_type.label:
-"""Redis Type"""
-redis_type.desc:
-"""Single mode. Must be set to 'single' when Redis server is running in single mode.
-Sentinel mode. Must be set to 'sentinel' when Redis server is running in sentinel mode.
-Cluster mode. Must be set to 'cluster' when Redis server is running in clustered mode."""
-
 batch_size.label:
 """Batch Size"""
 batch_size.desc: