Просмотр исходного кода

Merge remote-tracking branch 'upstream/release-583' into release-58

Ivan Dyachkov 1 год назад
Родитель
Commit
553a5c06d8
73 измененных файлов с 1445 добавлено и 595 удалено
  1. 3 0
      apps/emqx/etc/vm.args.cloud
  2. 3 5
      apps/emqx/src/emqx_ws_connection.erl
  3. 0 106
      apps/emqx/test/emqx_common_test_helpers.erl
  4. 3 2
      apps/emqx/test/emqx_ws_connection_SUITE.erl
  5. 2 2
      apps/emqx_bridge_azure_event_hub/rebar.config
  6. 2 2
      apps/emqx_bridge_confluent/rebar.config
  7. 15 64
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
  8. 2 2
      apps/emqx_bridge_kafka/rebar.config
  9. 38 97
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
  10. 52 0
      apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
  11. 1 1
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
  12. 36 4
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
  13. 2 2
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl
  14. 12 4
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl
  15. 87 16
      apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl
  16. 2 6
      apps/emqx_dashboard/etc/emqx_dashboard.conf
  17. 2 0
      apps/emqx_dashboard/src/emqx_dashboard_api.erl
  18. 10 2
      apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl
  19. 18 2
      apps/emqx_durable_storage/src/emqx_ds_lts.erl
  20. 19 23
      apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl
  21. 384 0
      apps/emqx_durable_storage/src/emqx_dsx.erl
  22. 47 2
      apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl
  23. 1 1
      apps/emqx_ldap/src/emqx_ldap.app.src
  24. 29 25
      apps/emqx_ldap/src/emqx_ldap_filter_parser.yrl
  25. 11 0
      apps/emqx_ldap/test/emqx_ldap_filter_SUITE.erl
  26. 141 69
      apps/emqx_machine/src/emqx_machine_boot.erl
  27. 9 3
      apps/emqx_machine/src/emqx_machine_sup.erl
  28. 1 1
      apps/emqx_machine/src/emqx_machine_terminator.erl
  29. 24 20
      apps/emqx_machine/src/user_default.erl
  30. 8 0
      apps/emqx_machine/test/emqx_machine_SUITE.erl
  31. 7 1
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  32. 1 1
      apps/emqx_message_transformation/src/emqx_message_transformation.app.src
  33. 2 2
      apps/emqx_message_transformation/src/emqx_message_transformation_app.erl
  34. 1 1
      apps/emqx_oracle/src/emqx_oracle.app.src
  35. 7 1
      apps/emqx_oracle/src/emqx_oracle.erl
  36. 4 0
      apps/emqx_plugins/test/emqx_plugins_SUITE.erl
  37. 87 18
      apps/emqx_resource/src/emqx_resource_cache_cleaner.erl
  38. 4 1
      apps/emqx_resource/src/emqx_resource_manager.erl
  39. 9 3
      apps/emqx_resource/test/emqx_connector_demo.erl
  40. 82 0
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  41. 9 6
      apps/emqx_retainer/src/emqx_retainer.erl
  42. 36 19
      apps/emqx_retainer/test/emqx_retainer_SUITE.erl
  43. 2 2
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  44. 159 0
      apps/emqx_rule_engine/test/emqx_rule_engine_api_cluster_SUITE.erl
  45. 15 55
      apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl
  46. 1 0
      apps/emqx_utils/src/emqx_variform.erl
  47. 2 10
      apps/emqx_utils/src/emqx_variform_bif.erl
  48. 12 0
      apps/emqx_utils/test/emqx_variform_tests.erl
  49. 3 0
      changes/ce/feat-14147.en.md
  50. 1 0
      changes/ce/fix-14113.en.md
  51. 2 0
      changes/ce/fix-14117.en.md
  52. 1 0
      changes/ce/fix-14160.en.md
  53. 1 0
      changes/ce/fix-14172.en.md
  54. 1 0
      changes/ce/fix-14178.en.md
  55. 1 0
      changes/ce/fix-14180.en.md
  56. 1 0
      changes/ce/fix-14201.en.md
  57. 2 0
      changes/ce/fix-14215.en.md
  58. 4 0
      changes/ee/feat-14166.en.md
  59. 5 0
      changes/ee/feat-14176.en.md
  60. 6 0
      changes/ee/fix-14126.en.md
  61. 4 0
      changes/ee/fix-14181.en.md
  62. 1 0
      dev
  63. 3 3
      mix.exs
  64. 1 1
      rebar.config
  65. 1 1
      rel/config/examples/dashboard-with-http.conf.example
  66. 1 1
      rel/config/examples/dashboard-with-https.conf.example
  67. 0 1
      rel/config/examples/listeners.quic.conf.example
  68. 0 1
      rel/config/examples/listeners.ssl.conf.example
  69. 0 1
      rel/config/examples/listeners.tcp.conf.example
  70. 0 1
      rel/config/examples/listeners.ws.conf.example
  71. 0 1
      rel/config/examples/listeners.wss.conf.example
  72. 2 2
      rel/i18n/emqx_bridge_rabbitmq_connector_schema.hocon
  73. 0 1
      rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon

+ 3 - 0
apps/emqx/etc/vm.args.cloud

@@ -119,6 +119,9 @@
 ## See: http://erlang.org/doc/man/erl.html
 -shutdown_time 30000
 
+## Disable the code path caching feature to allow adding dynamic patch path using `-pa`.
+-cache_boot_paths false
+
 ## patches dir
 -pa "{{ platform_data_dir }}/patches"
 

+ 3 - 5
apps/emqx/src/emqx_ws_connection.erl

@@ -440,11 +440,9 @@ websocket_handle({Frame, _}, State) ->
 websocket_info({call, From, Req}, State) ->
     handle_call(From, Req, State);
 websocket_info({cast, rate_limit}, State) ->
-    Stats = #{
-        cnt => emqx_pd:reset_counter(incoming_pubs),
-        oct => emqx_pd:reset_counter(incoming_bytes)
-    },
-    return(postpone({check_gc, Stats}, State));
+    Cnt = emqx_pd:reset_counter(incoming_pubs),
+    Oct = emqx_pd:reset_counter(incoming_bytes),
+    return(postpone({check_gc, Cnt, Oct}, State));
 websocket_info({cast, Msg}, State) ->
     handle_info(Msg, State);
 websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->

+ 0 - 106
apps/emqx/test/emqx_common_test_helpers.erl

@@ -72,8 +72,6 @@
 ]).
 
 -export([
-    emqx_cluster/1,
-    emqx_cluster/2,
     start_ekka/0,
     start_epmd/0,
     start_peer/2,
@@ -685,9 +683,6 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
 %% Clusterisation and multi-node testing
 %%
 
--type cluster_spec() :: [node_spec()].
--type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}.
--type role() :: core | replicant.
 -type shortname() :: atom().
 -type nodename() :: atom().
 -type node_opts() :: #{
@@ -722,58 +717,6 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
     listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
 }.
 
--spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
-emqx_cluster(Specs) ->
-    emqx_cluster(Specs, #{}).
-
--spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}].
-emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
-    emqx_cluster(Specs, maps:from_list(CommonOpts));
-emqx_cluster(Specs0, CommonOpts) ->
-    Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
-    Specs = expand_node_specs(Specs1, CommonOpts),
-    %% Assign grpc ports
-    GenRpcPorts = maps:from_list([
-        {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
-     || {{_, Name, _}, Num} <- Specs
-    ]),
-    %% Set the default node of the cluster:
-    CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
-    JoinTo =
-        case CoreNodes of
-            [First | _] -> First;
-            _ -> undefined
-        end,
-    NodeOpts = fun(Number) ->
-        #{
-            base_port => base_port(Number),
-            env => [
-                {mria, core_nodes, CoreNodes},
-                {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
-            ]
-        }
-    end,
-    RoleOpts = fun
-        (core) ->
-            #{
-                join_to => JoinTo,
-                env => [
-                    {mria, node_role, core}
-                ]
-            };
-        (replicant) ->
-            #{
-                env => [
-                    {mria, node_role, replicant},
-                    {ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}}
-                ]
-            }
-    end,
-    [
-        {Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)}
-     || {{Role, Name, Opts}, Number} <- Specs
-    ].
-
 %% Lower level starting API
 
 -spec start_peer(shortname(), node_opts()) -> nodename().
@@ -999,26 +942,10 @@ node_name(Name) ->
             list_to_atom(atom_to_list(Name) ++ "@" ++ host())
     end.
 
-gen_node_name(Num) ->
-    list_to_atom("autocluster_node" ++ integer_to_list(Num)).
-
 host() ->
     [_, Host] = string:tokens(atom_to_list(node()), "@"),
     Host.
 
-merge_opts(Opts1, Opts2) ->
-    maps:merge_with(
-        fun
-            (env, Env1, Env2) -> lists:usort(Env2 ++ Env1);
-            (conf, Conf1, Conf2) -> lists:usort(Conf2 ++ Conf1);
-            (apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
-            (load_apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
-            (_Option, _Old, Value) -> Value
-        end,
-        Opts1,
-        Opts2
-    ).
-
 set_envs(Node, Env) ->
     lists:foreach(
         fun({Application, Key, Value}) ->
@@ -1040,9 +967,6 @@ is_lib(Path) ->
 
 %% Ports
 
-base_port(Number) ->
-    10000 + Number * 100.
-
 gen_rpc_port(BasePort) ->
     BasePort - 1.
 
@@ -1060,36 +984,6 @@ listener_port(BasePort, ws) ->
 listener_port(BasePort, wss) ->
     BasePort + 4.
 
-%% Autocluster helpers
-
-expand_node_specs(Specs, CommonOpts) ->
-    lists:map(
-        fun({Spec, Num}) ->
-            {
-                case Spec of
-                    core ->
-                        {core, gen_node_name(Num), CommonOpts};
-                    replicant ->
-                        {replicant, gen_node_name(Num), CommonOpts};
-                    {Role, Name} when is_atom(Name) ->
-                        {Role, Name, CommonOpts};
-                    {Role, Opts} when is_list(Opts) ->
-                        Opts1 = maps:from_list(Opts),
-                        {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts1)};
-                    {Role, Name, Opts} when is_list(Opts) ->
-                        Opts1 = maps:from_list(Opts),
-                        {Role, Name, merge_opts(CommonOpts, Opts1)};
-                    {Role, Opts} ->
-                        {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts)};
-                    {Role, Name, Opts} ->
-                        {Role, Name, merge_opts(CommonOpts, Opts)}
-                end,
-                Num
-            }
-        end,
-        Specs
-    ).
-
 %% Useful when iterating on the tests in a loop, to get rid of all the garbage printed
 %% before the test itself begins.
 %% Only actually does anything if the environment variable `CLEAR_SCREEN' is set to `true'

+ 3 - 2
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -378,8 +378,9 @@ t_websocket_info_rate_limit(_) ->
     {ok, _} = websocket_info({cast, rate_limit}, st()),
     ok = timer:sleep(1),
     receive
-        {check_gc, Stats} ->
-            ?assertEqual(#{cnt => 0, oct => 0}, Stats)
+        {check_gc, Cnt, Oct} ->
+            ?assertEqual(0, Cnt),
+            ?assertEqual(0, Oct)
     after 0 -> error(expect_check_gc)
     end.
 

+ 2 - 2
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,8 +2,8 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.3"},
-    {kafka_protocol, "4.1.9"},
+    {wolff, "4.0.4"},
+    {kafka_protocol, "4.1.10"},
     {brod_gssapi, "0.1.3"},
     {brod, "4.3.1"},
     {snappyer, "1.2.10"},

+ 2 - 2
apps/emqx_bridge_confluent/rebar.config

@@ -2,8 +2,8 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.3"},
-    {kafka_protocol, "4.1.9"},
+    {wolff, "4.0.4"},
+    {kafka_protocol, "4.1.10"},
     {brod_gssapi, "0.1.3"},
     {brod, "4.3.1"},
     {snappyer, "1.2.10"},

+ 15 - 64
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -589,65 +589,6 @@ projection_optional_span(Trace) ->
      || #{?snk_kind := K} = Evt <- Trace
     ].
 
-cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
-        [
-            {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_gcp_pubsub, emqx_bridge]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
-    ),
-    ct:pal("cluster: ~p", [Cluster]),
-    Cluster.
-
-start_cluster(Cluster) ->
-    Nodes = lists:map(
-        fun({Name, Opts}) ->
-            ct:pal("starting ~p", [Name]),
-            emqx_common_test_helpers:start_peer(Name, Opts)
-        end,
-        Cluster
-    ),
-    NumNodes = length(Nodes),
-    on_exit(fun() ->
-        emqx_utils:pmap(
-            fun(N) ->
-                ct:pal("stopping ~p", [N]),
-                emqx_common_test_helpers:stop_peer(N)
-            end,
-            Nodes
-        )
-    end),
-    {ok, _} = snabbkaffe:block_until(
-        %% -1 because only those that join the first node will emit the event.
-        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-        30_000
-    ),
-    Nodes.
-
-wait_for_cluster_rpc(Node) ->
-    %% need to wait until the config handler is ready after
-    %% restarting during the cluster join.
-    ?retry(
-        _Sleep0 = 100,
-        _Attempts0 = 50,
-        true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
-    ).
-
 setup_and_start_listeners(Node, NodeOpts) ->
     erpc:call(
         Node,
@@ -686,6 +627,10 @@ dedup(_X, [Y | Rest]) ->
 dedup(_X, []) ->
     [].
 
+get_mqtt_port(Node) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.
+
 %%------------------------------------------------------------------------------
 %% Trace properties
 %%------------------------------------------------------------------------------
@@ -2278,12 +2223,19 @@ t_cluster_subscription(Config) ->
         }
     ] = ?config(topic_mapping, Config),
     BridgeId = bridge_id(Config),
-    Cluster = [{_N1, Opts1} | _] = cluster(Config),
+    AppSpecs = [emqx_conf, emqx_rule_engine, emqx_bridge_gcp_pubsub, emqx_bridge],
     ?check_trace(
         begin
-            Nodes = [N1, N2] = start_cluster(Cluster),
+            Nodes =
+                [N1, N2] = emqx_cth_cluster:start(
+                    [
+                        {gcp_pubsub_consumer_subscription1, #{apps => AppSpecs}},
+                        {gcp_pubsub_consumer_subscription2, #{apps => AppSpecs}}
+                    ],
+                    #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+                ),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             NumNodes = length(Nodes),
-            lists:foreach(fun wait_for_cluster_rpc/1, Nodes),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             lists:foreach(
                 fun(N) ->
@@ -2303,8 +2255,7 @@ t_cluster_subscription(Config) ->
                 10_000
             ),
 
-            setup_and_start_listeners(N1, Opts1),
-            TCPPort1 = emqx_common_test_helpers:listener_port(Opts1, tcp),
+            TCPPort1 = get_mqtt_port(N1),
             {ok, C1} = emqtt:start_link([{port, TCPPort1}, {proto_ver, v5}]),
             on_exit(fun() -> catch emqtt:stop(C1) end),
             {ok, _} = emqtt:connect(C1),

+ 2 - 2
apps/emqx_bridge_kafka/rebar.config

@@ -2,8 +2,8 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.3"},
-    {kafka_protocol, "4.1.9"},
+    {wolff, "4.0.4"},
+    {kafka_protocol, "4.1.10"},
     {brod_gssapi, "0.1.3"},
     {brod, "4.3.1"},
     {snappyer, "1.2.10"},

+ 38 - 97
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -1009,7 +1009,7 @@ setup_group_subscriber_spy_fn() ->
 
 setup_group_subscriber_spy(TestPid) ->
     ok = meck:new(brod_group_subscriber_v2, [
-        passthrough, no_link, no_history, non_strict
+        passthrough, no_link, no_history
     ]),
     ok = meck:expect(
         brod_group_subscriber_v2,
@@ -1035,15 +1035,6 @@ setup_group_subscriber_spy(TestPid) ->
 kill_group_subscriber_spy() ->
     meck:unload(brod_group_subscriber_v2).
 
-wait_for_cluster_rpc(Node) ->
-    %% need to wait until the config handler is ready after
-    %% restarting during the cluster join.
-    ?retry(
-        _Sleep0 = 100,
-        _Attempts0 = 50,
-        true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
-    ).
-
 setup_and_start_listeners(Node, NodeOpts) ->
     erpc:call(
         Node,
@@ -1068,39 +1059,25 @@ setup_and_start_listeners(Node, NodeOpts) ->
         end
     ).
 
-cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
+cluster(TestCase, Config) ->
+    AppSpecs = [
+        emqx_conf,
+        emqx_rule_engine,
+        {emqx_bridge_kafka, #{after_start => setup_group_subscriber_spy_fn()}},
+        emqx_bridge
+    ],
+    NodeSpecs = emqx_cth_cluster:mk_nodespecs(
         [
-            {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {load_apps, [emqx_machine]},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ExtraEnvHandlerHook(),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
+            {node_name(TestCase, 1), #{apps => AppSpecs}},
+            {node_name(TestCase, 2), #{apps => AppSpecs}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
-    ct:pal("cluster: ~p", [Cluster]),
-    Cluster.
+    ct:pal("cluster: ~p", [NodeSpecs]),
+    NodeSpecs.
 
-start_peer(Name, Opts) ->
-    Node = emqx_common_test_helpers:start_peer(Name, Opts),
-    % Make it possible to call `ct:pal` and friends (if running under rebar3)
-    _ = emqx_cth_cluster:share_load_module(Node, cthr),
-    Node.
+node_name(TestCase, N) ->
+    binary_to_atom(iolist_to_binary(io_lib:format("~s_~b", [TestCase, N]))).
 
 start_async_publisher(Config, KafkaTopic) ->
     TId = ets:new(kafka_payloads, [public, ordered_set]),
@@ -1156,6 +1133,10 @@ health_check(Node, Config) ->
         {ok, Status}
     end).
 
+get_mqtt_port(Node) ->
+    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1763,29 +1744,16 @@ t_cluster_group(Config) ->
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    Cluster = cluster(Config),
+    Cluster = cluster(?FUNCTION_NAME, Config),
     ?check_trace(
         begin
-            Nodes =
-                [_N1, N2 | _] = [
-                    start_peer(Name, Opts)
-                 || {Name, Opts} <- Cluster
-                ],
-            on_exit(fun() ->
-                emqx_utils:pmap(
-                    fun(N) ->
-                        ct:pal("stopping ~p", [N]),
-                        ok = emqx_common_test_helpers:stop_peer(N)
-                    end,
-                    Nodes
-                )
-            end),
+            Nodes = [_N1, N2 | _] = emqx_cth_cluster:start(Cluster),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),
                 15_000
             ),
-            wait_for_cluster_rpc(N2),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             {ok, _} = snabbkaffe:receive_events(SRef0),
             lists:foreach(
@@ -1845,15 +1813,15 @@ t_node_joins_existing_cluster(Config) ->
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    Cluster = cluster(Config),
+    Cluster = cluster(?FUNCTION_NAME, Config),
     ?check_trace(
         begin
-            [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
-            ct:pal("starting ~p", [Name1]),
-            N1 = start_peer(Name1, Opts1),
+            [NodeSpec1, NodeSpec2 | _] = Cluster,
+            ct:pal("starting ~p", [NodeSpec1]),
+            [N1] = emqx_cth_cluster:start([NodeSpec1]),
             on_exit(fun() ->
                 ct:pal("stopping ~p", [N1]),
-                ok = emqx_common_test_helpers:stop_peer(N1)
+                ok = emqx_cth_cluster:stop([N1])
             end),
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
@@ -1880,8 +1848,7 @@ t_node_joins_existing_cluster(Config) ->
             ),
 
             %% Now, we start the second node and have it join the cluster.
-            setup_and_start_listeners(N1, Opts1),
-            TCPPort1 = emqx_common_test_helpers:listener_port(Opts1, tcp),
+            TCPPort1 = get_mqtt_port(N1),
             {ok, C1} = emqtt:start_link([{port, TCPPort1}, {proto_ver, v5}]),
             on_exit(fun() -> catch emqtt:stop(C1) end),
             {ok, _} = emqtt:connect(C1),
@@ -1892,14 +1859,13 @@ t_node_joins_existing_cluster(Config) ->
                 1,
                 30_000
             ),
-            ct:pal("starting ~p", [Name2]),
-            N2 = start_peer(Name2, Opts2),
+            ct:pal("starting ~p", [NodeSpec2]),
+            [N2] = emqx_cth_cluster:start([NodeSpec2]),
             on_exit(fun() ->
                 ct:pal("stopping ~p", [N2]),
-                ok = emqx_common_test_helpers:stop_peer(N2)
+                ok = emqx_cth_cluster:stop([N2])
             end),
             Nodes = [N1, N2],
-            wait_for_cluster_rpc(N2),
 
             {ok, _} = snabbkaffe:receive_events(SRef0),
             ?retry(
@@ -1977,40 +1943,16 @@ t_cluster_node_down(Config) ->
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    Cluster = cluster(Config),
+    Cluster = cluster(?FUNCTION_NAME, Config),
     ?check_trace(
         begin
-            {_N2, Opts2} = lists:nth(2, Cluster),
-            NumNodes = length(Cluster),
-            Nodes =
-                [N1, N2 | _] =
-                lists:map(
-                    fun({Name, Opts}) ->
-                        ct:pal("starting ~p", [Name]),
-                        start_peer(Name, Opts)
-                    end,
-                    Cluster
-                ),
-            on_exit(fun() ->
-                emqx_utils:pmap(
-                    fun(N) ->
-                        ct:pal("stopping ~p", [N]),
-                        ok = emqx_common_test_helpers:stop_peer(N)
-                    end,
-                    Nodes
-                )
-            end),
+            Nodes = [N1, N2 | _] = emqx_cth_cluster:start(Cluster),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),
                 15_000
             ),
-            wait_for_cluster_rpc(N2),
-            {ok, _} = snabbkaffe:block_until(
-                %% -1 because only those that join the first node will emit the event.
-                ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-                30_000
-            ),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             {ok, _} = snabbkaffe:receive_events(SRef0),
             lists:foreach(
@@ -2031,8 +1973,7 @@ t_cluster_node_down(Config) ->
 
             %% Now, we stop one of the nodes and watch the group
             %% rebalance.
-            setup_and_start_listeners(N2, Opts2),
-            TCPPort = emqx_common_test_helpers:listener_port(Opts2, tcp),
+            TCPPort = get_mqtt_port(N2),
             {ok, C} = emqtt:start_link([{port, TCPPort}, {proto_ver, v5}]),
             on_exit(fun() -> catch emqtt:stop(C) end),
             {ok, _} = emqtt:connect(C),
@@ -2040,7 +1981,7 @@ t_cluster_node_down(Config) ->
             {TId, Pid} = start_async_publisher(Config, KafkaTopic),
 
             ct:pal("stopping node ~p", [N1]),
-            ok = emqx_common_test_helpers:stop_peer(N1),
+            ok = emqx_cth_cluster:stop([N1]),
 
             %% Give some time for the consumers in remaining node to
             %% rebalance.

+ 52 - 0
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -795,3 +795,55 @@ t_table_removed(Config) ->
         []
     ),
     ok.
+
+t_update_with_invalid_prepare(Config) ->
+    reset_table(Config),
+
+    {ok, _} = create_bridge_api(Config),
+
+    %% retainx is a bad column name
+    BadSQL =
+        <<"INSERT INTO mqtt_test(topic, msgid, payload, retainx) VALUES (${topic}, ${id}, ${payload}, ${retain})">>,
+
+    Override = #{<<"sql">> => BadSQL},
+    {ok, Body1} =
+        update_bridge_api(Config, Override),
+
+    ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1),
+    Error1 = maps:get(<<"status_reason">>, Body1),
+    case re:run(Error1, <<"unhealthy_target">>, [{capture, none}]) of
+        match ->
+            ok;
+        nomatch ->
+            ct:fail(#{
+                expected_pattern => "undefined_column",
+                got => Error1
+            })
+    end,
+
+    %% assert that although there was an error returned, the invalid SQL is actually put
+    BridgeName = ?config(oracle_name, Config),
+    C1 = [{action_name, BridgeName}, {action_type, oracle} | Config],
+    {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1),
+    #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action,
+    ?assertEqual(FetchedSQL, BadSQL),
+
+    %% update again with the original sql
+    {ok, Body2} = update_bridge_api(Config),
+    %% the error should be gone now, and status should be 'connected'
+    ?assertMatch(#{<<"status">> := <<"connected">>}, Body2),
+    %% finally check if ecpool worker should have exactly one of reconnect callback
+    ConnectorResId = <<"connector:oracle:", BridgeName/binary>>,
+    Workers = ecpool:workers(ConnectorResId),
+    [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers),
+    lists:foreach(
+        fun(Pid) ->
+            [{emqx_oracle, prepare_sql_to_conn, Args}] =
+                ecpool_worker:get_reconnect_callbacks(Pid),
+            Sig = emqx_postgresql:get_reconnect_callback_signature(Args),
+            BridgeResId = <<"action:oracle:", BridgeName/binary, $:, ConnectorResId/binary>>,
+            ?assertEqual(BridgeResId, Sig)
+        end,
+        WorkerPids
+    ),
+    ok.

+ 1 - 1
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_rabbitmq, [
     {description, "EMQX Enterprise RabbitMQ Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {mod, {emqx_bridge_rabbitmq_app, []}},
     {applications, [

+ 36 - 4
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -314,8 +314,8 @@ publish_messages(
     Conn,
     RabbitMQ,
     DeliveryMode,
-    Exchange,
-    RoutingKey,
+    Exchange0,
+    RoutingKey0,
     PayloadTmpl,
     Messages,
     WaitForPublishConfirmations,
@@ -328,14 +328,19 @@ publish_messages(
                 headers = [],
                 delivery_mode = DeliveryMode
             },
+
+            Exchange = render_template(Exchange0, Messages),
+            RoutingKey = render_template(RoutingKey0, Messages),
             Method = #'basic.publish'{
                 exchange = Exchange,
                 routing_key = RoutingKey
             },
+
             FormattedMsgs = [
                 format_data(PayloadTmpl, M)
              || {_, M} <- Messages
             ],
+
             emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
                 messages => FormattedMsgs,
                 properties => #{
@@ -390,6 +395,20 @@ format_data([], Msg) ->
 format_data(Tokens, Msg) ->
     emqx_placeholder:proc_tmpl(Tokens, Msg).
 
+%% Dynamic `exchange` and `routing_key` are restricted in batch mode,
+%% we assume these two values are the same in a batch.
+render_template({fixed, Data}, _) ->
+    Data;
+render_template(Template, [Req | _]) ->
+    render_template(Template, Req);
+render_template({dynamic, Template}, {_, Message}) ->
+    try
+        erlang:iolist_to_binary(emqx_template:render_strict(Template, {emqx_jsonish, Message}))
+    catch
+        error:_Errors ->
+            erlang:throw(bad_template)
+    end.
+
 handle_result({error, ecpool_empty}) ->
     {error, {recoverable_error, ecpool_empty}};
 handle_result(Res) ->
@@ -444,16 +463,29 @@ init_secret() ->
 preproc_parameter(#{config_root := actions, parameters := Parameter}) ->
     #{
         payload_template := PayloadTemplate,
-        delivery_mode := InitialDeliveryMode
+        delivery_mode := InitialDeliveryMode,
+        exchange := Exchange,
+        routing_key := RoutingKey
     } = Parameter,
     Parameter#{
         delivery_mode => delivery_mode(InitialDeliveryMode),
         payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
-        config_root => actions
+        config_root => actions,
+        exchange := preproc_template(Exchange),
+        routing_key := preproc_template(RoutingKey)
     };
 preproc_parameter(#{config_root := sources, parameters := Parameter, hookpoints := Hooks}) ->
     Parameter#{hookpoints => Hooks, config_root => sources}.
 
+preproc_template(Template0) ->
+    Template = emqx_template:parse(Template0),
+    case emqx_template:placeholders(Template) of
+        [] ->
+            {fixed, emqx_utils_conv:bin(Template0)};
+        [_ | _] ->
+            {dynamic, Template}
+    end.
+
 delivery_mode(non_persistent) -> 1;
 delivery_mode(persistent) -> 2.
 

+ 2 - 2
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_pubsub_schema.erl

@@ -75,7 +75,7 @@ fields(action_parameters) ->
             )},
         {exchange,
             hoconsc:mk(
-                typerefl:binary(),
+                emqx_schema:template(),
                 #{
                     required => true,
                     desc => ?DESC(?CONNECTOR_SCHEMA, "exchange")
@@ -83,7 +83,7 @@ fields(action_parameters) ->
             )},
         {routing_key,
             hoconsc:mk(
-                typerefl:binary(),
+                emqx_schema:template(),
                 #{
                     required => true,
                     desc => ?DESC(?CONNECTOR_SCHEMA, "routing_key")

+ 12 - 4
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_source_worker.erl

@@ -35,13 +35,13 @@ handle_cast(_Request, State) ->
     {noreply, State}.
 
 handle_info(
-    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{
+    {#'basic.deliver'{delivery_tag = Tag} = BasicDeliver, #amqp_msg{
         payload = Payload,
         props = PBasic
     }},
     {Channel, InstanceId, Params} = State
 ) ->
-    Message = to_map(PBasic, Payload),
+    Message = to_map(BasicDeliver, PBasic, Params, Payload),
     #{hookpoints := Hooks, no_ack := NoAck} = Params,
     lists:foreach(fun(Hook) -> emqx_hooks:run(Hook, [Message]) end, Hooks),
     (NoAck =:= false) andalso
@@ -53,7 +53,9 @@ handle_info(#'basic.cancel_ok'{}, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
 
-to_map(PBasic, Payload) ->
+to_map(BasicDeliver, PBasic, Params, Payload) ->
+    #'basic.deliver'{exchange = Exchange, routing_key = RoutingKey} = BasicDeliver,
+
     #'P_basic'{
         content_type = ContentType,
         content_encoding = ContentEncoding,
@@ -70,6 +72,9 @@ to_map(PBasic, Payload) ->
         app_id = AppId,
         cluster_id = ClusterId
     } = PBasic,
+
+    #{queue := Queue} = Params,
+
     Message = #{
         <<"payload">> => make_payload(Payload),
         <<"content_type">> => ContentType,
@@ -85,7 +90,10 @@ to_map(PBasic, Payload) ->
         <<"type">> => Type,
         <<"user_id">> => UserId,
         <<"app_id">> => AppId,
-        <<"cluster_id">> => ClusterId
+        <<"cluster_id">> => ClusterId,
+        <<"exchange">> => Exchange,
+        <<"routing_key">> => RoutingKey,
+        <<"queue">> => Queue
     },
     maps:filtermap(fun(_K, V) -> V =/= undefined andalso V =/= <<"undefined">> end, Message).
 

+ 87 - 16
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -95,10 +95,10 @@ rabbitmq_source() ->
     },
     parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name).
 
-rabbitmq_action() ->
-    rabbitmq_action(rabbit_mq_exchange()).
+rabbitmq_action(TestCase) ->
+    rabbitmq_action(TestCase, rabbit_mq_exchange(TestCase)).
 
-rabbitmq_action(Exchange) ->
+rabbitmq_action(TestCase, Exchange) ->
     Name = atom_to_binary(?MODULE),
     Action = #{
         <<"actions">> => #{
@@ -109,7 +109,7 @@ rabbitmq_action(Exchange) ->
                     <<"parameters">> => #{
                         <<"exchange">> => Exchange,
                         <<"payload_template">> => <<"${.payload}">>,
-                        <<"routing_key">> => rabbit_mq_routing_key(),
+                        <<"routing_key">> => rabbit_mq_routing_key(TestCase),
                         <<"delivery_mode">> => <<"non_persistent">>,
                         <<"publish_confirmation_timeout">> => <<"30s">>,
                         <<"wait_for_publish_confirmations">> => true
@@ -138,11 +138,11 @@ create_source(Name) ->
 delete_source(Name) ->
     ok = emqx_bridge_v2:remove(sources, ?TYPE, Name).
 
-create_action(Name) ->
-    create_action(Name, rabbit_mq_exchange()).
+create_action(TestCase, Name) ->
+    create_action(TestCase, Name, rabbit_mq_exchange(TestCase)).
 
-create_action(Name, Exchange) ->
-    Action = rabbitmq_action(Exchange),
+create_action(TestCase, Name, Exchange) ->
+    Action = rabbitmq_action(TestCase, Exchange),
     {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action).
 
 delete_action(Name) ->
@@ -163,7 +163,13 @@ t_source(Config) ->
     Topic = <<"tesldkafd">>,
     {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
         #{
-            sql => <<"select * from \"$bridges/rabbitmq:", Name/binary, "\"">>,
+            sql =>
+                <<
+                    "select *, queue as payload.queue, exchange as payload.exchange,"
+                    "routing_key as payload.routing_key from \"$bridges/rabbitmq:",
+                    Name/binary,
+                    "\""
+                >>,
             id => atom_to_binary(?FUNCTION_NAME),
             actions => [
                 #{
@@ -187,7 +193,8 @@ t_source(Config) ->
     {ok, _} = emqtt:connect(C1),
     {ok, #{}, [0]} = emqtt:subscribe(C1, Topic, [{qos, 0}, {rh, 0}]),
     send_test_message_to_rabbitmq(Config),
-    PayloadBin = emqx_utils_json:encode(payload()),
+
+    Received = receive_messages(1),
     ?assertMatch(
         [
             #{
@@ -195,12 +202,21 @@ t_source(Config) ->
                 properties := undefined,
                 topic := Topic,
                 qos := 0,
-                payload := PayloadBin,
+                payload := _,
                 retain := false
             }
         ],
-        receive_messages(1)
+        Received
     ),
+    [#{payload := ReceivedPayload}] = Received,
+    Meta = #{
+        <<"exchange">> => rabbit_mq_exchange(),
+        <<"routing_key">> => rabbit_mq_routing_key(),
+        <<"queue">> => rabbit_mq_queue()
+    },
+    ExpectedPayload = maps:merge(payload(), Meta),
+    ?assertMatch(ExpectedPayload, emqx_utils_json:decode(ReceivedPayload)),
+
     ok = emqtt:disconnect(C1),
     InstanceId = instance_id(sources, Name),
     #{counters := Counters} = emqx_resource:get_metrics(InstanceId),
@@ -228,14 +244,14 @@ t_source_probe(_Config) ->
 
 t_action_probe(_Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    Action = rabbitmq_action(),
+    Action = rabbitmq_action(?FUNCTION_NAME),
     {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
     ?assertMatch({{_, 204, _}, _, _}, Res0),
     ok.
 
 t_action(Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name),
+    create_action(?FUNCTION_NAME, Name),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
     ?assert(lists:any(Any, Actions), Actions),
@@ -276,7 +292,7 @@ t_action(Config) ->
 
 t_action_not_exist_exchange(_Config) ->
     Name = atom_to_binary(?FUNCTION_NAME),
-    create_action(Name, <<"not_exist_exchange">>),
+    create_action(?FUNCTION_NAME, Name, <<"not_exist_exchange">>),
     Actions = emqx_bridge_v2:list(actions),
     Any = fun(#{name := BName}) -> BName =:= Name end,
     ?assert(lists:any(Any, Actions), Actions),
@@ -336,7 +352,7 @@ t_action_not_exist_exchange(_Config) ->
     ).
 
 t_replace_action_source(Config) ->
-    Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action()}},
+    Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action(?FUNCTION_NAME)}},
     Source = #{<<"rabbitmq">> => #{<<"my_source">> => rabbitmq_source()}},
     ConnectorName = atom_to_binary(?MODULE),
     Connector = #{<<"rabbitmq">> => #{ConnectorName => rabbitmq_connector(get_rabbitmq(Config))}},
@@ -389,6 +405,47 @@ t_replace_action_source(Config) ->
     ),
     ok.
 
+%% Verify an action whose exchange and routing key are dynamic templates
+%% (`${payload.e}' / `${payload.r}', see rabbit_mq_exchange/1 and
+%% rabbit_mq_routing_key/1): the message must still reach RabbitMQ, with
+%% the template values supplied inside the published payload.
+t_action_dynamic(Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    create_action(?FUNCTION_NAME, Name),
+    Actions = emqx_bridge_v2:list(actions),
+    Any = fun(#{name := BName}) -> BName =:= Name end,
+    ?assert(lists:any(Any, Actions), Actions),
+    Topic = <<"rabbitdynaction">>,
+    {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
+        #{
+            sql => <<"select * from \"", Topic/binary, "\"">>,
+            id => atom_to_binary(?FUNCTION_NAME),
+            actions => [<<"rabbitmq:", Name/binary>>],
+            description => <<"bridge_v2 send msg to rabbitmq action">>
+        }
+    ),
+    on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
+    {ok, C1} = emqtt:start_link([{clean_start, true}]),
+    {ok, _} = emqtt:connect(C1),
+    %% payload/1 embeds the "e"/"r" keys the templates resolve from:
+    Payload = payload(?FUNCTION_NAME),
+    PayloadBin = emqx_utils_json:encode(Payload),
+    {ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]),
+    Msg = receive_message_from_rabbitmq(Config),
+    %% Payload is bound, so this is an equality assertion:
+    ?assertMatch(Payload, Msg),
+    ok = emqtt:disconnect(C1),
+    InstanceId = instance_id(actions, Name),
+    #{counters := Counters} = emqx_resource:get_metrics(InstanceId),
+    ok = delete_action(Name),
+    ActionsAfterDelete = emqx_bridge_v2:list(actions),
+    ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
+    %% NOTE(review): `success := 0' is asserted even though the message
+    %% was delivered above — presumably metric accounting is async here;
+    %% confirm this is the intended expectation.
+    ?assertMatch(
+        #{
+            dropped := 0,
+            success := 0,
+            matched := 1,
+            failed := 0,
+            received := 0
+        },
+        Counters
+    ),
+    ok.
+
 waiting_for_disconnected_alarms(InstanceId) ->
     waiting_for_disconnected_alarms(InstanceId, 0).
 
@@ -452,6 +509,10 @@ receive_messages(Count, Msgs) ->
 payload() ->
     #{<<"key">> => 42, <<"data">> => <<"RabbitMQ">>, <<"timestamp">> => 10000}.
 
+%% Per-testcase payload: t_action_dynamic extends the default payload
+%% with the exchange ("e") and routing key ("r") values that the
+%% action's templates extract at publish time.
+payload(t_action_dynamic) ->
+    Payload = payload(),
+    Payload#{<<"e">> => rabbit_mq_exchange(), <<"r">> => rabbit_mq_routing_key()}.
+
 send_test_message_to_rabbitmq(Config) ->
     #{channel := Channel} = get_channel_connection(Config),
     MessageProperties = #'P_basic'{
@@ -481,3 +542,13 @@ instance_id(Type, Name) ->
             actions -> <<"action:">>
         end,
     <<TypeBin/binary, BridgeId/binary, ":", ConnectorId/binary>>.
+
+%% Exchange per testcase: t_action_dynamic uses a template resolved
+%% from the message payload; all other cases use the static default.
+rabbit_mq_exchange(t_action_dynamic) ->
+    <<"${payload.e}">>;
+rabbit_mq_exchange(_) ->
+    rabbit_mq_exchange().
+
+%% Routing key per testcase, same scheme as rabbit_mq_exchange/1.
+rabbit_mq_routing_key(t_action_dynamic) ->
+    <<"${payload.r}">>;
+rabbit_mq_routing_key(_) ->
+    rabbit_mq_routing_key().

+ 2 - 6
apps/emqx_dashboard/etc/emqx_dashboard.conf

@@ -1,12 +1,8 @@
 dashboard {
     listeners {
-        http {
-            ## Comment out 'bind' (or set bind=0) to disable listener.
-            bind = 18083
-        }
+        http.bind = 18083
+        # https.bind = 18084
         https {
-            ## Uncomment to enable
-            # bind = 18084
             ssl_options {
                 certfile = "${EMQX_ETC_DIR}/certs/cert.pem"
                 keyfile = "${EMQX_ETC_DIR}/certs/key.pem"

+ 2 - 0
apps/emqx_dashboard/src/emqx_dashboard_api.erl

@@ -104,6 +104,7 @@ schema("/users") ->
         get => #{
             tags => [<<"dashboard">>],
             desc => ?DESC(list_users_api),
+            security => [#{'bearerAuth' => []}],
             responses => #{
                 200 => mk(
                     array(hoconsc:ref(user)),
@@ -114,6 +115,7 @@ schema("/users") ->
         post => #{
             tags => [<<"dashboard">>],
             desc => ?DESC(create_user_api),
+            security => [#{'bearerAuth' => []}],
             'requestBody' => fields([username, password, role, description]),
             responses => #{
                 200 => fields([username, role, description, backend])

+ 10 - 2
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -122,9 +122,17 @@ t_basic_crud(_Config) ->
     ),
 
     {ok, 201, #{<<"id">> := QueueID2}} = Resp2,
+    Resp3 = api_get(["durable_queues"]),
     ?assertMatch(
-        {ok, #{<<"data">> := [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}]}},
-        api_get(["durable_queues"])
+        {ok, #{<<"data">> := [#{<<"id">> := _}, #{<<"id">> := _}]}},
+        Resp3
+    ),
+    ?assertMatch(
+        [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}],
+        begin
+            {ok, #{<<"data">> := Queues}} = Resp3,
+            lists:sort(emqx_utils_maps:key_comparer(<<"id">>), Queues)
+        end
     ),
 
     ?assertMatch(

+ 18 - 2
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -48,6 +48,7 @@
     static_key/0,
     varying/0,
     trie/0,
+    dump/0,
     msg_storage_key/0,
     learned_structure/0,
     threshold_spec/0,
@@ -78,13 +79,14 @@
 %% Fixed size binary or integer, depending on the options:
 -type static_key() :: non_neg_integer() | binary().
 
-%% Trie root:
+%% Trie roots:
 -define(PREFIX, prefix).
+-define(PREFIX_SPECIAL, special).
 %% Special prefix root for reverse lookups:
 -define(rlookup, rlookup).
 -define(rlookup(STATIC), {?rlookup, STATIC}).
 
--type state() :: static_key() | ?PREFIX.
+-type state() :: static_key() | ?PREFIX | ?PREFIX_SPECIAL.
 
 -type varying() :: [level() | ?PLUS].
 
@@ -231,6 +233,13 @@ trie_copy_learned_paths(OldTrie, NewTrie) ->
 
 %% @doc Lookup the topic key. Create a new one, if not found.
 -spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
+topic_key(Trie, ThresholdFun, [<<"$", _/bytes>> | _] = Tokens) ->
+    %% [MQTT-4.7.2-1]
+    %% Put any topic starting with `$` into a separate _special_ root.
+    %% Using a special root only when the topic and the filter start with $<X>
+    %% prevents special topics from matching with + or # pattern, but not with
+    %% $<X>/+ or $<X>/# pattern. See also `match_topics/2`.
+    do_topic_key(Trie, ThresholdFun, 0, ?PREFIX_SPECIAL, Tokens, [], []);
 topic_key(Trie, ThresholdFun, Tokens) ->
     do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []).
 
@@ -242,6 +251,13 @@ lookup_topic_key(Trie, Tokens) ->
 %% @doc Return list of keys of topics that match a given topic filter
 -spec match_topics(trie(), [level() | '+' | '#']) ->
     [msg_storage_key()].
+match_topics(Trie, [<<"$", _/bytes>> | _] = TopicFilter) ->
+    %% [MQTT-4.7.2-1]
+    %% Any topics starting with `$` should belong to a separate _special_ root.
+    %% Using a special root only when the topic and the filter start with $<X>
+    %% prevents special topics from matching with + or # pattern, but not with
+    %% $<X>/+ or $<X>/# pattern.
+    do_match_topics(Trie, ?PREFIX_SPECIAL, [], TopicFilter);
 match_topics(Trie, TopicFilter) ->
     do_match_topics(Trie, ?PREFIX, [], TopicFilter).
 

+ 19 - 23
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -199,35 +199,31 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
     ok = rocksdb:drop_column_family(DBHandle, TrieCF),
     ok.
 
-prepare_batch(
-    _ShardId,
-    S = #s{trie = Trie, threshold_fun = TFun},
-    Operations,
-    _Options
-) ->
+prepare_batch(_ShardId, S, Operations, _Options) ->
     _ = erase(?lts_persist_ops),
-    OperationsCooked = emqx_utils:flattermap(
-        fun
-            ({Timestamp, Msg = #message{topic = Topic}}) ->
-                Tokens = words(Topic),
-                {Static, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
-                ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
-            ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
-                case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
-                    {ok, {Static, Varying}} ->
-                        ?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete);
-                    undefined ->
-                        %% Topic is unknown, nothing to delete.
-                        []
-                end
-        end,
-        Operations
-    ),
+    OperationsCooked = cook(S, Operations, []),
     {ok, #{
         ?cooked_msg_ops => OperationsCooked,
         ?cooked_lts_ops => pop_lts_persist_ops()
     }}.
 
+%% Turn a list of store/delete operations into "cooked" per-message
+%% ops, preserving input order (accumulator reversed at the end).
+cook(_, [], Acc) ->
+    lists:reverse(Acc);
+cook(S, [{Timestamp, Msg = #message{topic = Topic}} | Rest], Acc) ->
+    #s{trie = Trie, threshold_fun = TFun} = S,
+    Tokens = words(Topic),
+    %% May learn new LTS paths; those side effects are collected
+    %% separately (see prepare_batch/4's pop_lts_persist_ops()).
+    {Static, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
+    cook(S, Rest, [?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg)) | Acc]);
+cook(S, [{delete, #message_matcher{topic = Topic, timestamp = Timestamp}} | Rest], Acc) ->
+    #s{trie = Trie} = S,
+    case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
+        {ok, {Static, Varying}} ->
+            cook(S, Rest, [?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete) | Acc]);
+        undefined ->
+            %% Topic is unknown, nothing to delete.
+            cook(S, Rest, Acc)
+    end.
+
 commit_batch(
     ShardId,
     #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},

+ 384 - 0
apps/emqx_durable_storage/src/emqx_dsx.erl

@@ -0,0 +1,384 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc EMQX DS eXplorer. A simple DS client process that can be used
+%% to inspect durable messages in the EMQX console or for
+%% benchmarking.
+-module(emqx_dsx).
+
+-behaviour(gen_server).
+
+%% API:
+-export([start_link/1, ls/0, stop/1, more/1]).
+%% Convenience API:
+-export([c/0, c/1, stop/0, more/0]).
+%% Callback definitions:
+-export([print/0, null/0, stats/1, create_stats_worker/1]).
+
+%% behaviour callbacks:
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+
+%% internal exports:
+-export([]).
+
+-export_type([]).
+
+-include("emqx_ds.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
+-include_lib("kernel/include/logger.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(name(REC), {n, l, {?MODULE, REC}}).
+-define(via(REC), {via, gproc, ?name(REC)}).
+
+-record(more_req, {}).
+
+-type callback() :: fun(
+    (emqx_ds:stream(), [{emqx_ds:message_key(), emqx_types:message()}], State | undefined) -> State
+).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+%%%% Print callback
+%% Pretty-print every received message to the user console. The `ok'
+%% returned by lists:foreach/2 becomes the new callback state, which
+%% this callback never reads.
+-spec print() -> callback().
+print() ->
+    fun(Stream, Msgs, _) ->
+        lists:foreach(
+            fun({Key, Msg}) ->
+                io:format(user, "~w/~w:~n   ~p~n", [Stream, Key, emqx_message:to_map(Msg)])
+            end,
+            Msgs
+        )
+    end.
+
+%%%% Null callback
+%% Discard all messages; useful as a no-op sink (e.g. raw throughput
+%% measurements without per-message work).
+-spec null() -> callback().
+null() ->
+    fun(_, _, _) ->
+        undefined
+    end.
+
+%%%% Stats callback
+%%
+%% @doc How to use this module in benchmark mode:
+%%
+%% 1. Start the statistics server: `create_stats_worker(foo)'
+%%
+%% 2. Create clients with the stats callback, passing the worker name:
+%% `emqx_dsx:start_link(#{topic => ..., callback => stats(foo)})'
+%%
+%% 3. Get the statistics by calling
+%% `emqx_metrics_worker:get_metrics(foo, foo)'.
+create_stats_worker(Name) ->
+    {ok, _} = emqx_metrics_worker:start_link(Name),
+    Metrics = [{counter, n_batches}, {counter, n_msgs}, {counter, n_bytes}, {slide, lag}],
+    ok = emqx_metrics_worker:create_metrics(Name, Name, Metrics).
+
+%% Per-batch callback: counts batches and folds every message into the
+%% worker's counters via update_stats/2. Callback state is unused.
+-spec stats(emqx_metrics_worker:handler_name()) -> callback().
+stats(Worker) ->
+    fun(_Stream, Msgs, _) ->
+        emqx_metrics_worker:inc(Worker, Worker, n_batches),
+        lists:foreach(fun(Msg) -> update_stats(Worker, Msg) end, Msgs)
+    end.
+
+%% Count one message: bump message/byte counters and record the lag
+%% (now minus the message timestamp; presumably milliseconds — confirm
+%% against emqx_message:timestamp_now/0).
+update_stats(Worker, {_Key, Msg}) ->
+    emqx_metrics_worker:inc(Worker, Worker, n_msgs),
+    emqx_metrics_worker:inc(Worker, Worker, n_bytes, emqx_message:estimate_size(Msg)),
+    Lag = emqx_message:timestamp_now() - Msg#message.timestamp,
+    emqx_metrics_worker:observe(Worker, Worker, lag, Lag).
+
+%%%%%
+%% @doc Start an explorer linked to the caller and register it (via
+%% gproc) under `name', which defaults to the caller's pid. The topic
+%% filter may be a binary, a charlist, or an already-tokenized word
+%% list; it is normalized to a word list here. Also remembers the name
+%% as the caller's "current" explorer (see `c/1').
+-spec start_link(#{
+    topic := string() | [emqx_types:word()],
+    name => term(),
+    db => emqx_ds:db(),
+    callback => callback(),
+    active => true | pos_integer(),
+    start => integer(),
+    renew_streams_interval => pos_integer(),
+    poll_timeout => pos_integer()
+}) -> {ok, pid()}.
+start_link(User0) ->
+    User = maps:update_with(
+        topic,
+        fun(Topic) ->
+            case Topic of
+                _ when is_binary(Topic) ->
+                    emqx_topic:words(Topic);
+                [A | _] when is_atom(A); is_binary(A) ->
+                    %% Already a word list; keep as-is:
+                    Topic;
+                [A | _] when is_integer(A) ->
+                    %% Charlist, e.g. "t/#":
+                    emqx_topic:words(list_to_binary(Topic))
+            end
+        end,
+        User0
+    ),
+    %% `active => true' polls continuously; a positive integer is a poll
+    %% budget that is refilled with more/0 (see grab_more/1).
+    Defaults = #{
+        name => self(),
+        db => messages,
+        callback => print(),
+        active => true,
+        start => erlang:system_time(millisecond),
+        renew_streams_interval => 5_000,
+        poll_timeout => 30_000
+    },
+    Conf = #{name := Name} = maps:merge(Defaults, User),
+    c(Name),
+    gen_server:start_link(?via(Name), ?MODULE, Conf, []).
+
+%% @doc Stop the explorer registered under `Name'.
+%% Use gen_server:stop/1 rather than exit/2: the server traps exits
+%% (see init/1), so `exit(Pid, normal)' would only be delivered as an
+%% info message — and silently ignored by handle_info/2 — unless the
+%% caller happens to be the process that spawned it.
+stop(Name) ->
+    gen_server:stop(?via(Name)).
+
+%% @doc Stop the caller's "current" explorer (see c/0).
+stop() ->
+    stop(c()).
+
+%% @doc Return the caller's "current" explorer name, as remembered by
+%% `c/1' (set automatically by start_link/1). Crashes with `noproc'
+%% when no explorer has been started from this process.
+-spec c() -> _Name.
+c() ->
+    %% Names default to the starter's pid but may be any term
+    %% (`name => term()' in start_link/1), and c/1 stores any term —
+    %% so accept any remembered value instead of pids only (matching
+    %% only `is_pid/1' made non-pid names crash with case_clause).
+    case get(?MODULE) of
+        undefined ->
+            error(noproc);
+        Name ->
+            Name
+    end.
+
+%% @doc Remember `Name' as the caller's current explorer. Per put/2
+%% semantics this returns the previously remembered name, or
+%% `undefined' on first use.
+-spec c(Name) -> Name | undefined.
+c(Name) ->
+    put(?MODULE, Name).
+
+%% @doc List all registered explorers as `{Name, Pid}' pairs.
+-spec ls() -> [{_Name, pid()}].
+ls() ->
+    MS = {{?name('$1'), '$2', '_'}, [], [{{'$1', '$2'}}]},
+    gproc:select({local, names}, [MS]).
+
+%% @doc Reset the poll budget of explorer `Name' and trigger another
+%% round of polling (relevant when `active' is a finite number).
+more(Name) ->
+    gen_server:call(?via(Name), #more_req{}).
+
+more() ->
+    more(c()).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+-record(s, {
+    callback :: callback(),
+    callback_state :: _,
+    db :: emqx_ds:db(),
+    poll_timeout :: pos_integer(),
+    topic :: emqx_ds:topic_filter(),
+    start :: integer(),
+    %% `true' = poll indefinitely; an integer = poll budget, refilled
+    %% by `more()' (see grab_more/1):
+    active :: true | pos_integer(),
+    its = #{} :: #{emqx_ds:stream() => emqx_ds:iterator()},
+    %% Streams that have been fully replayed and should be ignored:
+    eos = #{} :: #{emqx_ds:stream() => _},
+    renew_streams_interval :: pos_integer(),
+    %%   Poll-related fields:
+    %% In-progress poll requests:
+    polls = #{} :: #{emqx_ds:stream() => reference()},
+    %% Queue of pollable streams:
+    pq :: queue:queue(emqx_ds:stream()),
+    %% Number of streams with currently inflight poll requests:
+    inflight = 0 :: non_neg_integer(),
+    %% Number of poll requests sent to the DS so far, or since the
+    %% last `more()' call (the #more_req{} handler resets it):
+    poll_count = 0 :: non_neg_integer()
+}).
+
+%% @doc Initialize the explorer: discover the initial set of streams
+%% and kick off the first poll round immediately.
+init(
+    #{
+        callback := Callback,
+        active := Active,
+        topic := TF,
+        start := Start,
+        db := DB,
+        renew_streams_interval := RSI,
+        poll_timeout := PollTimeout
+    }
+) ->
+    %% Trap exits so the server can shut down gracefully:
+    process_flag(trap_exit, true),
+    S = #s{
+        callback = Callback,
+        db = DB,
+        topic = TF,
+        start = Start,
+        active = Active,
+        renew_streams_interval = RSI,
+        poll_timeout = PollTimeout,
+        pq = queue:new()
+    },
+    {ok, poll(renew_streams(S))}.
+
+%% `more/1': reset the poll budget and resume polling.
+handle_call(#more_req{}, _From, S0) ->
+    S = S0#s{poll_count = 0},
+    {reply, ok, poll(S)};
+handle_call(_Call, _From, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_Cast, S) ->
+    {noreply, S}.
+
+%% Periodic stream discovery, rescheduled by renew_streams/1 itself:
+handle_info(renew_streams, S) ->
+    {noreply, renew_streams(S)};
+%% A whole batched poll request timed out: drop the reply alias and
+%% requeue every stream that was waiting on that reference.
+handle_info(#poll_reply{ref = Ref, payload = poll_timeout}, S0 = #s{polls = Polls}) ->
+    unalias(Ref),
+    S = maps:fold(
+        fun(Stream, R, S1) ->
+            case R of
+                Ref ->
+                    handle_poll_timeout(Stream, S1);
+                _ ->
+                    S1
+            end
+        end,
+        S0,
+        Polls
+    ),
+    {noreply, poll(S)};
+%% Per-stream poll reply; only accept it if the reference still matches
+%% the request we remember for that stream.
+handle_info(
+    #poll_reply{ref = Ref, userdata = Stream, payload = Payload},
+    #s{inflight = Inflight, polls = Polls0} = S0
+) ->
+    case maps:take(Stream, Polls0) of
+        {Ref, Polls} ->
+            S = handle_poll_reply(Stream, Payload, S0#s{inflight = Inflight - 1, polls = Polls}),
+            {noreply, poll(S)};
+        _ ->
+            %% Late/duplicate reply for a request we no longer track:
+            logger:error("Stray poll reply ~p", [
+                #{ref => Ref, stream => Stream, payload => Payload}
+            ]),
+            {noreply, S0}
+    end;
+handle_info(_Info, S) ->
+    {noreply, S}.
+
+terminate(_Reason, _S) ->
+    ok.
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% Forget the timed-out request for Stream and make it pollable again.
+handle_poll_timeout(Stream, S = #s{polls = Polls, inflight = Inflight, pq = PQ}) ->
+    S#s{
+        polls = maps:remove(Stream, Polls),
+        inflight = Inflight - 1,
+        pq = queue:in(Stream, PQ)
+    }.
+
+%% Successful batch: hand messages to the callback, advance the
+%% iterator and requeue the stream for further polling.
+handle_poll_reply(
+    Stream,
+    {ok, Iterator, Messages},
+    S = #s{its = Its0, pq = PQ0, callback = Callback, callback_state = CS0}
+) ->
+    CS = Callback(Stream, Messages, CS0),
+    S#s{
+        its = Its0#{Stream => Iterator},
+        pq = queue:in(Stream, PQ0),
+        callback_state = CS
+    };
+%% Recoverable error: just requeue and retry later.
+handle_poll_reply(Stream, {error, recoverable, _Err}, S = #s{pq = PQ0}) ->
+    S#s{pq = queue:in(Stream, PQ0)};
+%% End-of-stream or unrecoverable error: mark the stream as done so it
+%% is never polled (or re-added by renew_streams/1) again.
+handle_poll_reply(Stream, EOSEvent, S = #s{eos = EOS, db = DB}) ->
+    case EOSEvent of
+        {ok, end_of_stream} ->
+            ok;
+        {error, unrecoverable, Err} ->
+            ?LOG_ERROR(#{
+                msg => "Unrecoverable stream poll error", stream => Stream, error => Err, db => DB
+            }),
+            ok
+    end,
+    S#s{eos = EOS#{Stream => true}}.
+
+%% Discover new streams matching the topic filter, create iterators for
+%% them and enqueue them for polling; reschedules itself via a timer.
+renew_streams(
+    S = #s{
+        db = DB,
+        topic = TF,
+        start = Start,
+        its = Its0,
+        eos = EOS,
+        renew_streams_interval = RSI,
+        pq = PQ0
+    }
+) ->
+    Streams = emqx_ds:get_streams(DB, TF, Start),
+    {Its, PQ} = lists:foldl(
+        fun({_Rank, Stream}, Acc = {Its1, PQ1}) ->
+            case Its1 of
+                #{Stream := _} ->
+                    %% Stream already exists:
+                    Acc;
+                #{} ->
+                    %% `true' (stream already fully replayed) falls
+                    %% through to the `_ -> Acc' clause below; iterator
+                    %% errors are likewise skipped until next renewal.
+                    case
+                        maps:is_key(Stream, EOS) orelse emqx_ds:make_iterator(DB, Stream, TF, Start)
+                    of
+                        {ok, It} ->
+                            %% This is a new stream:
+                            {Its1#{Stream => It}, queue:in(Stream, PQ1)};
+                        _ ->
+                            Acc
+                    end
+            end
+        end,
+        {Its0, PQ0},
+        Streams
+    ),
+    erlang:send_after(RSI, self(), renew_streams),
+    S#s{its = Its, pq = PQ}.
+
+%% Drain the pollable queue (subject to the poll budget) and issue a
+%% single batched poll request covering all grabbed streams.
+poll(S0) ->
+    poll(S0, []).
+
+poll(S0 = #s{inflight = Inflight, poll_count = PC}, PollStreams) ->
+    case grab_more(S0) of
+        {{value, Stream}, PQ} ->
+            S = S0#s{pq = PQ, inflight = Inflight + 1, poll_count = PC + 1},
+            poll(S, [Stream | PollStreams]);
+        _ ->
+            case PollStreams of
+                [] ->
+                    %% Nothing to poll:
+                    S0;
+                _ ->
+                    do_poll(S0, PollStreams)
+            end
+    end.
+
+%% Take the next pollable stream, unless a finite `active' budget has
+%% been exhausted (`more()' resets poll_count to refill it).
+grab_more(#s{active = Active, pq = PQ, poll_count = PC}) ->
+    case Active of
+        true ->
+            queue:out(PQ);
+        N when PC < N ->
+            queue:out(PQ);
+        _ ->
+            false
+    end.
+
+%% Issue one emqx_ds:poll/3 request for all streams; every stream in
+%% the batch shares the same reply reference.
+do_poll(S = #s{db = DB, poll_timeout = Timeout, polls = Polls0, its = Its}, PollStreams) ->
+    Req = [{Stream, maps:get(Stream, Its)} || Stream <- PollStreams],
+    {ok, Ref} = emqx_ds:poll(DB, Req, #{timeout => Timeout}),
+    NewPolls = maps:from_keys(PollStreams, Ref),
+    Polls = maps:merge(Polls0, NewPolls),
+    S#s{polls = Polls}.

+ 47 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl

@@ -31,6 +31,8 @@
 
 -define(SHARD, shard(?FUNCTION_NAME)).
 
+-define(LTS_THRESHOLD, {simple, {20, 10}}).
+
 -define(DB_CONFIG(CONFIG), #{
     backend => builtin_local,
     storage => ?config(layout, CONFIG),
@@ -47,9 +49,14 @@ init_per_group(Group, Config) ->
     LayoutConf =
         case Group of
             reference ->
-                {emqx_ds_storage_reference, #{}};
+                {emqx_ds_storage_reference, #{
+                    lts_threshold_spec => ?LTS_THRESHOLD
+                }};
             skipstream_lts ->
-                {emqx_ds_storage_skipstream_lts, #{with_guid => true}};
+                {emqx_ds_storage_skipstream_lts, #{
+                    with_guid => true,
+                    lts_threshold_spec => ?LTS_THRESHOLD
+                }};
             bitfield_lts ->
                 {emqx_ds_storage_bitfield_lts, #{}}
         end,
@@ -292,6 +299,44 @@ t_replay(Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.
 
+t_replay_special_topics(_Config) ->
+    %% Verify that topic matching rules respect [MQTT-4.7.2-1]:
+    %% The Server MUST NOT match Topic Filters starting with a wildcard character (# or +)
+    %% with Topic Names beginning with a $ character.
+    {Values1, Values2} = lists:split(5, lists:seq(0, 1000, 100)),
+    STopic1 = <<"$SPECIAL/test/1/2">>,
+    ELTopic = <<"/test/">>,
+    Topics1 = [<<"g/test/1">>, <<"g/test/2">>, <<"/test/">>],
+    SBatch1 = [make_message(V, STopic1, bin(V)) || V <- Values1],
+    Batch1 = [make_message(V, Topic, bin(V)) || Topic <- Topics1, V <- Values1],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch1 ++ Batch1),
+    %% Expect special topic messages to show up only in `$SPECIAL/test/#` subscription:
+    ?assert(check(?SHARD, <<"$SPECIAL/test/#">>, 0, SBatch1)),
+    %% ...But not in otherwise fitting wildcard subscriptions:
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1)),
+    %% NOTE(review): the return values of the bare check/4 calls below
+    %% are discarded — should they be wrapped in ?assert(...) like the
+    %% calls above? Confirm this is intentional.
+    check(?SHARD, <<"+/test/+/+">>, 0, []),
+    %% ...And not in different special roots:
+    check(?SHARD, <<"$SYS/test/#">>, 0, []),
+    %% Publish through a lot of similarly structured topics to let LTS "learn":
+    STopic2 = <<"$SPECIAL/test/3/4">>,
+    Topics2 = [emqx_utils:format("~p/test/~p", [I, I]) || I <- lists:seq(1, 40)],
+    Batch2 = [make_message(V, Topic, bin(V)) || Topic <- Topics2 ++ [ELTopic], V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
+    SBatch2 = [make_message(V, STopic2, bin(V)) || V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch2),
+    %% ...Then verify the same things:
+    ?assert(check(?SHARD, <<"$SPECIAL/test/#">>, 0, SBatch1 ++ SBatch2)),
+    ?assert(check(?SHARD, <<"$SPECIAL/test/+/4">>, 0, SBatch2)),
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1 ++ Batch2)),
+    %% NOTE(review): the analogous pre-learning check above expects []
+    %% for <<"+/test/+/+">>, but this one expects SBatch2 (which starts
+    %% with `$` and should not match a `+` filter) — TODO confirm.
+    check(?SHARD, <<"+/test/+/+">>, 0, SBatch2),
+    %% Also verify that having a lot of different $-roots does not break things:
+    STopics = [emqx_utils:format("$T~p/test/~p", [I, I]) || I <- lists:seq(1, 40)],
+    SBatch3 = [make_message(V, T, bin(V)) || T <- STopics, V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch3),
+    ?assert(check(?SHARD, <<"$T1/test/#">>, 0, SBatch3)),
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1 ++ Batch2)),
+    check(?SHARD, <<"$SYS/test/#">>, 0, []).
+
 %% This testcase verifies poll functionality that doesn't involve events:
 t_poll(Config) ->
     ?check_trace(

+ 1 - 1
apps/emqx_ldap/src/emqx_ldap.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ldap, [
     {description, "EMQX LDAP Connector"},
-    {vsn, "0.1.10"},
+    {vsn, "0.1.11"},
     {registered, []},
     {applications, [
         kernel,

+ 29 - 25
apps/emqx_ldap/src/emqx_ldap_filter_parser.yrl

@@ -1,21 +1,22 @@
-Header "%%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the \"License\");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an \"AS IS\" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------".
+Header
+"%%--------------------------------------------------------------------\n"
+"%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.\n"
+"%%\n"
+"%% Licensed under the Apache License, Version 2.0 (the \"License\");\n"
+"%% you may not use this file except in compliance with the License.\n"
+"%% You may obtain a copy of the License at\n"
+"%%\n"
+"%%     http://www.apache.org/licenses/LICENSE-2.0\n"
+"%%\n"
+"%% Unless required by applicable law or agreed to in writing, software\n"
+"%% distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+"%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+"%% See the License for the specific language governing permissions and\n"
+"%% limitations under the License.\n"
+"%%--------------------------------------------------------------------".
 
 Nonterminals
-filter filtercomp filterlist item simple present substring initial any final extensible attr value type dnattrs matchingrule dnvalue.
+filter filtercomp filterlist item simple present substring initial any final extensible attr value type dnattrs matchingrule dnvalue complexValue.
 
 Terminals
 lparen rparen 'and' 'or' 'not' equal approx greaterOrEqual lessOrEqual asterisk colon dn string comma.
@@ -51,9 +52,7 @@ item->
     extensible: '$1'.
 
 simple ->
-    attr equal value: equal('$1', '$3').
-simple ->
-    attr equal dnvalue: equal('$1', '$3').
+    attr equal complexValue: equal('$1', '$3').
 simple ->
     attr approx value: approx('$1', '$3').
 simple ->
@@ -83,18 +82,18 @@ any -> any value asterisk: 'any'('$1', '$2').
 any -> '$empty': [].
 
 extensible ->
-    type dnattrs matchingrule colon equal value : extensible('$6', ['$1', '$2', '$3']).
+    type dnattrs matchingrule colon equal complexValue : extensible('$6', ['$1', '$2', '$3']).
 extensible ->
-    type dnattrs colon equal value: extensible('$5', ['$1', '$2']).
+    type dnattrs colon equal complexValue: extensible('$5', ['$1', '$2']).
 extensible ->
-    type matchingrule colon equal value: extensible('$5', ['$1', '$2']).
+    type matchingrule colon equal complexValue: extensible('$5', ['$1', '$2']).
 extensible ->
-    type colon equal value: extensible('$4', ['$1']).
+    type colon equal complexValue: extensible('$4', ['$1']).
 
 extensible ->
-    dnattrs matchingrule colon equal value: extensible('$5', ['$1', '$2']).
+    dnattrs matchingrule colon equal complexValue: extensible('$5', ['$1', '$2']).
 extensible ->
-    matchingrule colon equal value: extensible('$4', ['$1']).
+    matchingrule colon equal complexValue: extensible('$4', ['$1']).
 
 attr ->
     string: get_value('$1').
@@ -107,6 +106,11 @@ dnvalue ->
 dnvalue ->
     string equal string: make_dn_value('$1', '$3').
 
+complexValue ->
+    value: '$1'.
+complexValue ->
+    dnvalue: '$1'.
+
 type ->
     value: {type, '$1'}.
 

+ 11 - 0
apps/emqx_ldap/test/emqx_ldap_filter_SUITE.erl

@@ -264,6 +264,17 @@ t_member_of(_Config) ->
         parse("(&(a=b)(memberOf=CN=GroupName,OU=emqx,DC=WL,DC=com))")
     ).
 
+t_extensible_member_of(_Config) ->
+    ?assertEqual(
+        'and'([
+            equalityMatch("a", "b"),
+            extensibleMatch("CN=GroupName,OU=emqx,DC=WL,DC=com", [
+                {type, "memberOf"}, {matchingRule, "1.2.840.113556.1.4.1941"}
+            ])
+        ]),
+        parse("(&(a=b)(memberOf:1.2.840.113556.1.4.1941:=CN=GroupName,OU=emqx,DC=WL,DC=com))")
+    ).
+
 % %%------------------------------------------------------------------------------
 % %% Helpers
 % %%------------------------------------------------------------------------------

+ 141 - 69
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -18,19 +18,37 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--export([post_boot/0]).
--export([stop_apps/0, ensure_apps_started/0]).
--export([sorted_reboot_apps/0]).
--export([start_autocluster/0]).
--export([stop_port_apps/0]).
--export([read_apps/0]).
+%% API
+-export([
+    start_link/0,
+
+    stop_apps/0,
+    ensure_apps_started/0,
+    sorted_reboot_apps/0,
+    stop_port_apps/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% Internal exports (for `emqx_machine_terminator' only)
+-export([do_stop_apps/0]).
 
 -dialyzer({no_match, [basic_reboot_apps/0]}).
 
 -ifdef(TEST).
--export([sorted_reboot_apps/1, reboot_apps/0]).
+-export([read_apps/0, sorted_reboot_apps/1, reboot_apps/0, start_autocluster/0]).
 -endif.
 
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
 %% These apps are always (re)started by emqx_machine:
 -define(BASIC_REBOOT_APPS, [gproc, esockd, ranch, cowboy, emqx_durable_storage, emqx]).
 
@@ -41,34 +59,19 @@
 %% release, depending on the build flags:
 -define(OPTIONAL_APPS, [bcrypt, observer]).
 
-post_boot() ->
-    ok = ensure_apps_started(),
-    ok = print_vsn(),
-    ok = start_autocluster(),
-    ignore.
+%% calls/casts/infos
+-record(start_apps, {}).
+-record(stop_apps, {}).
 
--ifdef(TEST).
-print_vsn() -> ok.
-% TEST
--else.
-print_vsn() ->
-    ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-% TEST
--endif.
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
 
-start_autocluster() ->
-    ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
-    ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0),
-    %% returns 'ok' or a pid or 'any()' as in spec
-    _ = ekka:autocluster(emqx),
-    ok.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 stop_apps() ->
-    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
-    _ = emqx_alarm_handler:unload(),
-    ok = emqx_conf_app:unset_config_loaded(),
-    ok = emqx_plugins:ensure_stopped(),
-    lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
+    gen_server:call(?MODULE, #stop_apps{}, infinity).
 
 %% Those port apps are terminated after the main apps
 %% Don't need to stop when reboot.
@@ -84,34 +87,60 @@ stop_port_apps() ->
         [os_mon, jq]
     ).
 
-stop_one_app(App) ->
-    ?SLOG(debug, #{msg => "stopping_app", app => App}),
-    try
-        _ = application:stop(App)
-    catch
-        C:E ->
-            ?SLOG(error, #{
-                msg => "failed_to_stop_app",
-                app => App,
-                exception => C,
-                reason => E
-            })
-    end.
-
 ensure_apps_started() ->
-    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
-    lists:foreach(fun start_one_app/1, sorted_reboot_apps()),
-    ?tp(emqx_machine_boot_apps_started, #{}).
+    gen_server:call(?MODULE, #start_apps{}, infinity).
 
-start_one_app(App) ->
-    ?SLOG(debug, #{msg => "starting_app", app => App}),
-    case application:ensure_all_started(App, restart_type(App)) of
-        {ok, Apps} ->
-            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
-        {error, Reason} ->
-            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
-            error({failed_to_start_app, App, Reason})
-    end.
+sorted_reboot_apps() ->
+    RebootApps = reboot_apps(),
+    Apps0 = [{App, app_deps(App, RebootApps)} || App <- RebootApps],
+    Apps = emqx_machine_boot_runtime_deps:inject(Apps0, runtime_deps()),
+    sorted_reboot_apps(Apps).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_Opts) ->
+    %% Ensure that the stop callback is set, so that join requests concurrent to the
+    %% startup are serialized here.
+    %% It would still be problematic if a join request arrives before this process is
+    %% started, though.
+    ekka:callback(stop, fun ?MODULE:stop_apps/0),
+    do_start_apps(),
+    ok = print_vsn(),
+    ok = start_autocluster(),
+    State = #{},
+    {ok, State}.
+
+handle_call(#start_apps{}, _From, State) ->
+    do_start_apps(),
+    {reply, ok, State};
+handle_call(#stop_apps{}, _From, State) ->
+    do_stop_apps(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+%% Callers who wish to stop applications should not call this directly, and instead use
+%% `stop_apps/0`.  The only exception is `emqx_machine_terminator': it should call this
+%% block directly in its try-catch block, without the possibility of crashing this
+%% process.
+do_stop_apps() ->
+    ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
+    _ = emqx_alarm_handler:unload(),
+    ok = emqx_conf_app:unset_config_loaded(),
+    ok = emqx_plugins:ensure_stopped(),
+    lists:foreach(fun stop_one_app/1, lists:reverse(?MODULE:sorted_reboot_apps())).
 
 restart_type(App) ->
     PermanentApps =
@@ -123,6 +152,26 @@ restart_type(App) ->
             temporary
     end.
 
+start_autocluster() ->
+    ekka:callback(stop, fun ?MODULE:stop_apps/0),
+    ekka:callback(start, fun ?MODULE:ensure_apps_started/0),
+    %% returns 'ok' or a pid or 'any()' as in spec
+    _ = ekka:autocluster(emqx),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+-ifdef(TEST).
+print_vsn() -> ok.
+% TEST
+-else.
+print_vsn() ->
+    ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
+% TEST
+-endif.
+
 %% list of app names which should be rebooted when:
 %% 1. due to static config change
 %% 2. after join a cluster
@@ -149,13 +198,6 @@ basic_reboot_apps() ->
     BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
     ?BASIC_REBOOT_APPS ++ (BusinessApps -- excluded_apps()).
 
-%% @doc Read business apps belonging to the current profile/edition.
-read_apps() ->
-    PrivDir = code:priv_dir(emqx_machine),
-    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
-    {ok, [Apps]} = file:consult(RebootListPath),
-    Apps.
-
 excluded_apps() ->
     %% Optional apps _should_ be (re)started automatically, but only
     %% when they are found in the release:
@@ -168,11 +210,34 @@ is_app(Name) ->
         _ -> false
     end.
 
-sorted_reboot_apps() ->
-    RebootApps = reboot_apps(),
-    Apps0 = [{App, app_deps(App, RebootApps)} || App <- RebootApps],
-    Apps = emqx_machine_boot_runtime_deps:inject(Apps0, runtime_deps()),
-    sorted_reboot_apps(Apps).
+do_start_apps() ->
+    ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
+    lists:foreach(fun start_one_app/1, ?MODULE:sorted_reboot_apps()),
+    ?tp(emqx_machine_boot_apps_started, #{}).
+
+start_one_app(App) ->
+    ?SLOG(debug, #{msg => "starting_app", app => App}),
+    case application:ensure_all_started(App, restart_type(App)) of
+        {ok, Apps} ->
+            ?SLOG(debug, #{msg => "started_apps", apps => Apps});
+        {error, Reason} ->
+            ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
+            error({failed_to_start_app, App, Reason})
+    end.
+
+stop_one_app(App) ->
+    ?SLOG(debug, #{msg => "stopping_app", app => App}),
+    try
+        _ = application:stop(App)
+    catch
+        C:E ->
+            ?SLOG(error, #{
+                msg => "failed_to_stop_app",
+                app => App,
+                exception => C,
+                reason => E
+            })
+    end.
 
 app_deps(App, RebootApps) ->
     case application:get_key(App, applications) of
@@ -180,6 +245,13 @@ app_deps(App, RebootApps) ->
         {ok, List} -> lists:filter(fun(A) -> lists:member(A, RebootApps) end, List)
     end.
 
+%% @doc Read business apps belonging to the current profile/edition.
+read_apps() ->
+    PrivDir = code:priv_dir(emqx_machine),
+    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
+    {ok, [Apps]} = file:consult(RebootListPath),
+    Apps.
+
 runtime_deps() ->
     [
         %% `emqx_bridge' is special in that it needs all the bridges apps to

+ 9 - 3
apps/emqx_machine/src/emqx_machine_sup.erl

@@ -29,10 +29,16 @@ start_link() ->
 
 init([]) ->
     Terminator = child_worker(emqx_machine_terminator, [], transient),
-    BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary),
-    GlobalGC = child_worker(emqx_global_gc, [], permanent),
+    %% Must start before `emqx_machine_boot'.
     ReplicantHealthProbe = child_worker(emqx_machine_replicant_health_probe, [], transient),
-    Children = [Terminator, ReplicantHealthProbe, BootApps, GlobalGC],
+    Booter = child_worker(emqx_machine_boot, [], permanent),
+    GlobalGC = child_worker(emqx_global_gc, [], permanent),
+    Children = [
+        Terminator,
+        ReplicantHealthProbe,
+        Booter,
+        GlobalGC
+    ],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 100,

+ 1 - 1
apps/emqx_machine/src/emqx_machine_terminator.erl

@@ -96,7 +96,7 @@ handle_call(?DO_IT, _From, State) ->
     try
         %% stop port apps before stopping other apps.
         emqx_machine_boot:stop_port_apps(),
-        emqx_machine_boot:stop_apps()
+        emqx_machine_boot:do_stop_apps()
     catch
         C:E:St ->
             Apps = [element(1, A) || A <- application:which_applications()],

+ 24 - 20
apps/emqx_machine/src/user_default.erl

@@ -31,6 +31,10 @@
 -define(RED, <<"\e[31m">>).
 -define(RESET, <<"\e[0m">>).
 
+%% print to default group-leader but not user
+%% so it should work in remote shell
+-define(PRINT(FMT, ARGS), io:format(FMT, ARGS)).
+
 %% API
 -export([lock/0, unlock/0]).
 -export([trace/0, t/0, t/1, t/2, t_msg/0, t_msg/1, t_stop/0]).
@@ -42,26 +46,26 @@ lock() -> emqx_restricted_shell:lock().
 unlock() -> emqx_restricted_shell:unlock().
 
 trace() ->
-    ?ULOG("Trace Usage:~n", []),
-    ?ULOG("  --------------------------------------------------~n", []),
-    ?ULOG("  t(Mod, Func) -> trace a specify function.~n", []),
-    ?ULOG("  t(RTPs) -> trace in Redbug Trace Patterns.~n", []),
-    ?ULOG("       eg1: t(\"emqx_hooks:run\").~n", []),
-    ?ULOG("       eg2: t(\"emqx_hooks:run/2\").~n", []),
-    ?ULOG("       eg3: t(\"emqx_hooks:run/2 -> return\").~n", []),
-    ?ULOG(
+    ?PRINT("Trace Usage:~n", []),
+    ?PRINT("  --------------------------------------------------~n", []),
+    ?PRINT("  t(Mod, Func) -> trace a specify function.~n", []),
+    ?PRINT("  t(RTPs) -> trace in Redbug Trace Patterns.~n", []),
+    ?PRINT("       eg1: t(\"emqx_hooks:run\").~n", []),
+    ?PRINT("       eg2: t(\"emqx_hooks:run/2\").~n", []),
+    ?PRINT("       eg3: t(\"emqx_hooks:run/2 -> return\").~n", []),
+    ?PRINT(
         "       eg4: t(\"emqx_hooks:run('message.dropped',[_, #{node := N}, _])"
         "when N =:= 'emqx@127.0.0.1' -> stack,return\"~n",
         []
     ),
-    ?ULOG("  t() ->   when you forget the RTPs.~n", []),
-    ?ULOG("  --------------------------------------------------~n", []),
-    ?ULOG("  t_msg(PidorRegName) -> trace a pid/registed name's messages.~n", []),
-    ?ULOG("  t_msg([Pid,RegName]) -> trace a list pids's messages.~n", []),
-    ?ULOG("  t_msg() ->  when you forget the pids.~n", []),
-    ?ULOG("  --------------------------------------------------~n", []),
-    ?ULOG("  t_stop() -> stop running trace.~n", []),
-    ?ULOG("  --------------------------------------------------~n", []),
+    ?PRINT("  t() ->   when you forget the RTPs.~n", []),
+    ?PRINT("  --------------------------------------------------~n", []),
+    ?PRINT("  t_msg(PidorRegName) -> trace a pid/registed name's messages.~n", []),
+    ?PRINT("  t_msg([Pid,RegName]) -> trace a list pids's messages.~n", []),
+    ?PRINT("  t_msg() ->  when you forget the pids.~n", []),
+    ?PRINT("  --------------------------------------------------~n", []),
+    ?PRINT("  t_stop() -> stop running trace.~n", []),
+    ?PRINT("  --------------------------------------------------~n", []),
     ok.
 
 t_stop() ->
@@ -82,7 +86,7 @@ t(M, F) ->
     start_trace(RTP, Options, Pids).
 
 t_msg() ->
-    ?ULOG("Tracing on specific pids's send/receive message: ~n", []),
+    ?PRINT("Tracing on specific pids's send/receive message: ~n", []),
     Pids = get_pids(),
     t_msg(Pids).
 
@@ -131,7 +135,7 @@ get_rtp_fun() ->
     end.
 
 get_function() ->
-    ?ULOG("Function(func|func/3|func('_', atom, X) when is_integer(X)) :~n", []),
+    ?PRINT("Function(func|func/3|func('_', atom, X) when is_integer(X)) :~n", []),
     F0 = io:get_line(""),
     string:trim(F0, both, " \n").
 
@@ -196,8 +200,8 @@ parse_pid(NameStr) ->
             throw({not_registered, NameStr})
     end.
 
-warning(Fmt, Args) -> ?ELOG("~s" ++ Fmt ++ ".~s~n", [?RED] ++ Args ++ [?RESET]).
-info(Fmt, Args) -> ?ELOG("~s" ++ Fmt ++ ".~s~n", [?GREEN] ++ Args ++ [?RESET]).
+warning(Fmt, Args) -> ?PRINT("~s" ++ Fmt ++ ".~s~n", [?RED] ++ Args ++ [?RESET]).
+info(Fmt, Args) -> ?PRINT("~s" ++ Fmt ++ ".~s~n", [?GREEN] ++ Args ++ [?RESET]).
 
 ensure_redbug_stop() ->
     case redbug:stop() of

+ 8 - 0
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -56,6 +56,7 @@ end_per_suite(Config) ->
 
 app_specs() ->
     [
+        emqx,
         emqx_conf,
         emqx_prometheus,
         emqx_modules,
@@ -113,8 +114,15 @@ t_shutdown_reboot(Config) ->
         [{machine_reboot_SUITE1, #{role => core, apps => app_specs()}}],
         #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
     ),
+    SortedApps = app_specs(),
     try
         erpc:call(Node, fun() ->
+            %% Since `emqx_cth_*' starts applications without going through
+            %% `emqx_machine', we need to start this manually.
+            {ok, _} = emqx_machine_boot:start_link(),
+            ok = meck:new(emqx_machine_boot, [passthrough]),
+            ok = meck:expect(emqx_machine_boot, sorted_reboot_apps, 0, SortedApps),
+
             true = emqx:is_running(node()),
             emqx_machine_boot:stop_apps(),
             false = emqx:is_running(node()),

+ 7 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -1872,7 +1872,13 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
         )
     );
 format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
-    format_persistent_session_info(ClientId, PSInfo0).
+    format_persistent_session_info(ClientId, PSInfo0);
+format_channel_info(undefined, {ClientId, undefined = _PSInfo}, _Opts) ->
+    %% Durable session missing its metadata: possibly a race condition, such as the client
+    %% being kicked while the API is enumerating clients.  There's nothing much to do, we
+    %% just return an almost empty map to avoid crashing this function.  The client may
+    %% just retry listing in such cases.
+    #{clientid => ClientId}.
 
 format_persistent_session_info(
     _ClientId,

+ 1 - 1
apps/emqx_message_transformation/src/emqx_message_transformation.app.src

@@ -1,6 +1,6 @@
 {application, emqx_message_transformation, [
     {description, "EMQX Message Transformation"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
     {mod, {emqx_message_transformation_app, []}},
     {applications, [

+ 2 - 2
apps/emqx_message_transformation/src/emqx_message_transformation_app.erl

@@ -21,14 +21,14 @@ start(_Type, _Args) ->
     {ok, Sup} = emqx_message_transformation_sup:start_link(),
     ok = emqx_variform:inject_allowed_module(emqx_message_transformation_bif),
     ok = emqx_message_transformation_config:add_handler(),
-    ok = emqx_message_transformation:register_hooks(),
     ok = emqx_message_transformation_config:load(),
+    ok = emqx_message_transformation:register_hooks(),
     {ok, Sup}.
 
 -spec stop(term()) -> ok.
 stop(_State) ->
-    ok = emqx_message_transformation_config:unload(),
     ok = emqx_message_transformation:unregister_hooks(),
+    ok = emqx_message_transformation_config:unload(),
     ok = emqx_message_transformation_config:remove_handler(),
     ok = emqx_variform:erase_allowed_module(emqx_message_transformation_bif),
     ok.

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 1
apps/emqx_oracle/src/emqx_oracle.erl

@@ -35,7 +35,7 @@
 ]).
 
 %% callbacks for ecpool
--export([connect/1, prepare_sql_to_conn/3]).
+-export([connect/1, prepare_sql_to_conn/3, get_reconnect_callback_signature/1]).
 
 %% Internal exports used to execute code with ecpool worker
 -export([
@@ -496,6 +496,12 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) whe
             Error
     end.
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
+    ChannelId.
+
 check_if_table_exists(Conn, SQL, Tokens0) ->
     % Discard nested tokens for checking if table exist. As payload here is defined as
     % a single string, it would fail if Token is, for instance, ${payload.msg}, causing

+ 4 - 0
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -914,6 +914,10 @@ t_start_node_with_plugin_enabled({init, Config}) ->
         emqx_conf,
         emqx_ctl,
         {emqx_plugins, #{
+            after_start => fun() ->
+                {ok, Pid} = emqx_machine_boot:start_link(),
+                unlink(Pid)
+            end,
             config =>
                 #{
                     plugins =>

+ 87 - 18
apps/emqx_resource/src/emqx_resource_cache_cleaner.erl

@@ -16,7 +16,19 @@
 
 -module(emqx_resource_cache_cleaner).
 
--export([start_link/0]).
+-behaviour(gen_server).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+
+%% API
+-export([
+    start_link/0,
+
+    add_cache/2,
+    add_dry_run/2
+]).
+
+%% `gen_server' API
 -export([
     init/1,
     handle_call/3,
@@ -24,51 +36,108 @@
     handle_info/2,
     terminate/2
 ]).
--export([add/2]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
 
 -define(SERVER, ?MODULE).
 
+%% calls/casts/infos
+-record(add_cache, {id :: emqx_resource:resource_id(), pid :: pid()}).
+-record(add_dry_run, {id :: emqx_resource:resource_id(), pid :: pid()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
-add(ID, Pid) ->
-    gen_server:call(?SERVER, {add, ID, Pid}, infinity).
+add_cache(ID, Pid) ->
+    gen_server:call(?SERVER, #add_cache{id = ID, pid = Pid}, infinity).
+
+add_dry_run(ID, Pid) ->
+    gen_server:cast(?SERVER, #add_dry_run{id = ID, pid = Pid}).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
 
 init(_) ->
     process_flag(trap_exit, true),
-    {ok, #{pmon => emqx_pmon:new()}}.
+    State = #{
+        cache_pmon => emqx_pmon:new(),
+        dry_run_pmon => emqx_pmon:new()
+    },
+    {ok, State}.
 
-handle_call({add, ID, Pid}, _From, #{pmon := Pmon} = State) ->
+handle_call(#add_cache{id = ID, pid = Pid}, _From, #{cache_pmon := Pmon} = State) ->
     NewPmon = emqx_pmon:monitor(Pid, ID, Pmon),
-    {reply, ok, State#{pmon => NewPmon}};
+    {reply, ok, State#{cache_pmon := NewPmon}};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
+handle_cast(#add_dry_run{id = ID, pid = Pid}, #{dry_run_pmon := Pmon0} = State0) ->
+    Pmon = emqx_pmon:monitor(Pid, ID, Pmon0),
+    State = State0#{dry_run_pmon := Pmon},
+    {noreply, State};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, #{pmon := Pmon} = State) ->
-    NewPmon =
-        case emqx_pmon:find(Pid, Pmon) of
-            {ok, ID} ->
-                maybe_erase_cache(Pid, ID),
-                emqx_pmon:erase(Pid, Pmon);
-            error ->
-                Pmon
-        end,
-    {noreply, State#{pmon => NewPmon}};
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State0) ->
+    State = handle_down(Pid, State0),
+    {noreply, State};
 handle_info(_Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, _State) ->
     ok.
 
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+handle_down(Pid, State0) ->
+    #{
+        cache_pmon := CachePmon,
+        dry_run_pmon := DryrunPmon
+    } = State0,
+    case emqx_pmon:find(Pid, CachePmon) of
+        {ok, ID} ->
+            handle_down_cache(ID, Pid, State0);
+        error ->
+            case emqx_pmon:find(Pid, DryrunPmon) of
+                {ok, ID} ->
+                    handle_down_dry_run(ID, Pid, State0);
+                error ->
+                    State0
+            end
+    end.
+
+handle_down_cache(ID, Pid, State0) ->
+    #{cache_pmon := Pmon0} = State0,
+    maybe_erase_cache(Pid, ID),
+    Pmon = emqx_pmon:erase(Pid, Pmon0),
+    State0#{cache_pmon := Pmon}.
+
+handle_down_dry_run(ID, Pid, State0) ->
+    #{dry_run_pmon := Pmon0} = State0,
+    %% No need to wait here: since it's a dry run resource, it won't be recreated,
+    %% assuming the ID is random enough.
+    spawn(fun() ->
+        emqx_resource_manager_sup:delete_child(ID),
+        ?tp("resource_cache_cleaner_deleted_child", #{id => ID})
+    end),
+    Pmon = emqx_pmon:erase(Pid, Pmon0),
+    State0#{dry_run_pmon := Pmon}.
+
 maybe_erase_cache(DownManager, ID) ->
     case emqx_resource_cache:read_manager_pid(ID) =:= DownManager of
         true ->
             emqx_resource_cache:erase(ID);
         false ->
             %% already erased, or already replaced by another manager due to quick
-            %% retart by supervisor
+            %% restart by supervisor
             ok
     end.

+ 4 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -249,6 +249,9 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
             true -> maps:get(resource_opts, Config, #{});
             false -> #{}
         end,
+    %% Ensure that the dry run resource is terminated, even if this process is forcefully
+    %% killed (e.g.: cowboy / HTTP API request times out).
+    emqx_resource_cache_cleaner:add_dry_run(ResId, self()),
     ok = emqx_resource_manager_sup:ensure_child(
         ResId, <<"dry_run">>, ResourceType, Config, Opts
     ),
@@ -547,11 +550,11 @@ start_link(ResId, Group, ResourceType, Config, Opts) ->
 init({DataIn, Opts}) ->
     process_flag(trap_exit, true),
     Data = DataIn#data{pid = self()},
+    emqx_resource_cache_cleaner:add_cache(Data#data.id, self()),
     case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
         true ->
             %% init the cache so that lookup/1 will always return something
             UpdatedData = update_state(Data#data{status = ?status_connecting}),
-            emqx_resource_cache_cleaner:add(Data#data.id, self()),
             {ok, ?state_connecting, UpdatedData, {next_event, internal, start_resource}};
         false ->
             %% init the cache so that lookup/1 will always return something

+ 9 - 3
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -74,15 +74,16 @@ set_callback_mode(Mode) ->
 on_start(_InstId, #{create_error := true}) ->
     ?tp(connector_demo_start_error, #{}),
     error("some error");
-on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) ->
+on_start(InstId, #{create_error := {delay, Delay, Agent}} = State0) ->
     ?tp(connector_demo_start_delay, #{}),
+    State = maps:remove(create_error, State0),
     case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of
         not_called ->
             emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep),
             timer:sleep(Delay),
-            on_start(InstId, maps:remove(create_error, Opts));
+            on_start(InstId, State);
         called ->
-            on_start(InstId, maps:remove(create_error, Opts))
+            on_start(InstId, State)
     end;
 on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
@@ -99,6 +100,11 @@ on_stop(_InstId, undefined) ->
     ok;
 on_stop(_InstId, #{stop_error := true}) ->
     {error, stop_error};
+on_stop(InstId, #{stop_error := {ask, HowToStop}} = State) ->
+    case HowToStop() of
+        continue ->
+            on_stop(InstId, maps:remove(stop_error, State))
+    end;
 on_stop(InstId, #{pid := Pid}) ->
     persistent_term:erase(?PT_CHAN_KEY(InstId)),
     stop_counter_process(Pid).

+ 82 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -3360,6 +3360,83 @@ t_resource_and_channel_health_check_race(_Config) ->
     ),
     ok.
 
+%% Simulates the race condition where a dry run request takes too long, and the HTTP API
+%% request process is then forcefully killed before it has the chance to properly cleanup
+%% and remove the dry run/probe resource.
+t_dryrun_timeout_then_force_kill_during_stop(_Config) ->
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            ?force_ordering(
+                #{?snk_kind := connector_demo_on_stop_will_delay},
+                #{?snk_kind := will_kill_request}
+            ),
+
+            %% Simulates a cowboy request process.
+            {ok, StartAgent} = emqx_utils_agent:start_link(not_called),
+            {ok, StopAgent} = emqx_utils_agent:start_link({delay, 1_000}),
+            HowToStop = fun() ->
+                %% Delay only the first time, so test cleanup is faster.
+                Action = emqx_utils_agent:get_and_update(StopAgent, fun
+                    (continue) ->
+                        {continue, continue};
+                    ({delay, _} = Delay) ->
+                        {Delay, continue}
+                end),
+                case Action of
+                    {delay, Delay} ->
+                        ?tp(connector_demo_on_stop_will_delay, #{}),
+                        timer:sleep(Delay),
+                        continue;
+                    continue ->
+                        continue
+                end
+            end,
+            {Pid, MRef} = spawn_monitor(fun() ->
+                Res = dryrun(
+                    ?ID,
+                    ?TEST_RESOURCE,
+                    #{
+                        name => test_resource,
+                        create_error => {delay, 1_000, StartAgent},
+                        stop_error => {ask, HowToStop},
+                        resource_opts => #{
+                            health_check_interval => 100,
+                            start_timeout => 100
+                        }
+                    }
+                ),
+                exit(Res)
+            end),
+            on_exit(fun() -> exit(Pid, kill) end),
+
+            %% Simulates cowboy forcefully killing the request after it takes too long and the caller
+            %% has already closed the connection.
+            spawn_link(fun() ->
+                ?tp(will_kill_request, #{}),
+                exit(Pid, kill)
+            end),
+
+            receive
+                {'DOWN', MRef, process, Pid, Reason} ->
+                    ct:pal("request ~p died: ~p", [Pid, Reason]),
+                    ?assertEqual(killed, Reason),
+                    ok
+            end,
+
+            ?block_until(#{?snk_kind := "resource_cache_cleaner_deleted_child"}),
+
+            %% No children should be lingering
+            ?assertEqual([], supervisor:which_children(emqx_resource_manager_sup)),
+            %% Cache should be clean too
+            ?assertEqual([], emqx_resource:list_instances()),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3618,6 +3695,11 @@ create(Id, Group, Type, Config, Opts) ->
     on_exit(fun() -> emqx_resource:remove_local(Id) end),
     Res.
 
+dryrun(Id, Type, Config) ->
+    TestPid = self(),
+    OnReady = fun(ResId) -> TestPid ! {resource_ready, ResId} end,
+    emqx_resource:create_dry_run_local(Id, Type, Config, OnReady).
+
 log_consistency_prop() ->
     {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}.
 log_consistency_prop(Trace) ->

+ 9 - 6
apps/emqx_retainer/src/emqx_retainer.erl

@@ -83,10 +83,12 @@
 
 -type backend_state() :: term().
 
--type context() :: #{
-    module := module(),
-    state := backend_state()
-}.
+-type context() ::
+    #{
+        module := module(),
+        state := backend_state()
+    }
+    | undefined.
 
 -type topic() :: emqx_types:topic().
 -type message() :: emqx_types:message().
@@ -506,8 +508,9 @@ config_backend_module(Config) ->
         #{module := Module} -> Module
     end.
 
--spec backend_module(context()) -> module().
-backend_module(#{module := Module}) -> Module.
+-spec backend_module(context()) -> module() | undefined.
+backend_module(#{module := Module}) -> Module;
+backend_module(undefined) -> undefined.
 
 -spec backend_state(context()) -> backend_state().
 backend_state(#{state := State}) -> State.

+ 36 - 19
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -31,7 +31,8 @@ all() ->
         {group, mnesia_without_indices},
         {group, mnesia_with_indices},
         {group, mnesia_reindex},
-        {group, test_disable_then_start}
+        {group, test_disable_then_start},
+        {group, disabled}
     ].
 
 groups() ->
@@ -39,7 +40,8 @@ groups() ->
         {mnesia_without_indices, [sequence], common_tests()},
         {mnesia_with_indices, [sequence], common_tests()},
         {mnesia_reindex, [sequence], [t_reindex]},
-        {test_disable_then_start, [sequence], [test_disable_then_start]}
+        {test_disable_then_start, [sequence], [test_disable_then_start]},
+        {disabled, [test_disabled]}
     ].
 
 common_tests() ->
@@ -65,29 +67,27 @@ retainer {
 }
 ">>).
 
+%% erlfmt-ignore
+-define(DISABLED_CONF, <<"
+retainer {
+  enable = false
+}
+">>).
+
 %%--------------------------------------------------------------------
 %% Setups
 %%--------------------------------------------------------------------
 
-init_per_suite(Config) ->
-    Apps = emqx_cth_suite:start(
-        [emqx, emqx_conf, app_spec()],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    [{suite_apps, Apps} | Config].
-
-end_per_suite(Config) ->
-    emqx_cth_suite:stop(?config(suite_apps, Config)).
-
-init_per_group(mnesia_without_indices, Config) ->
-    [{index, false} | Config];
-init_per_group(mnesia_reindex, Config) ->
-    Config;
-init_per_group(_, Config) ->
-    Config.
+init_per_group(mnesia_without_indices = Group, Config) ->
+    start_apps(Group, [{index, false} | Config]);
+init_per_group(mnesia_reindex = Group, Config) ->
+    start_apps(Group, Config);
+init_per_group(Group, Config) ->
+    start_apps(Group, Config).
 
 end_per_group(_Group, Config) ->
     emqx_retainer_mnesia:populate_index_meta(),
+    stop_apps(Config),
     Config.
 
 init_per_testcase(_TestCase, Config) ->
@@ -107,9 +107,21 @@ end_per_testcase(t_cursor_cleanup, _Config) ->
 end_per_testcase(_TestCase, _Config) ->
     ok.
 
-app_spec() ->
+app_spec(disabled) ->
+    {emqx_retainer, ?DISABLED_CONF};
+app_spec(_) ->
     {emqx_retainer, ?BASE_CONF}.
 
+start_apps(Group, Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx, emqx_conf, app_spec(Group)],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{suite_apps, Apps} | Config].
+
+stop_apps(Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)).
+
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------
@@ -768,6 +780,11 @@ test_disable_then_start(_Config) ->
     ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
     ok.
 
+test_disabled(_Config) ->
+    ?assertEqual(false, emqx_retainer:enabled()),
+    ?assertEqual(ok, emqx_retainer:clean()),
+    ?assertEqual({ok, false, []}, emqx_retainer:page_read(undefined, 1, 100)).
+
 t_deliver_when_banned(_) ->
     Client1 = <<"c1">>,
     Client2 = <<"c2">>,

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -121,10 +121,10 @@ start_link() ->
 %%----------------------------------------------------------------------------------------
 %% The config handler for emqx_rule_engine
 %%------------------------------------------------------------------------------
-post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
-    create_rule(NewRule#{id => bin(RuleId)});
 post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
     delete_rule(bin(RuleId));
+post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
+    create_rule(NewRule#{id => bin(RuleId)});
 post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
     update_rule(NewRule#{id => bin(RuleId)});
 post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->

+ 159 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_cluster_SUITE.erl

@@ -0,0 +1,159 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_api_cluster_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
+
+-define(APPSPECS, [
+    emqx,
+    emqx_conf,
+    emqx_management,
+    {emqx_rule_engine, "rule_engine { rules {} }"}
+]).
+
+-define(APPSPEC_DASHBOARD,
+    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+).
+
+-define(SIMPLE_RULE(NAME_SUFFIX), #{
+    <<"description">> => <<"A simple rule">>,
+    <<"enable">> => true,
+    <<"actions">> => [#{<<"function">> => <<"console">>}],
+    <<"sql">> => <<"SELECT * from \"t/1\"">>,
+    <<"name">> => <<"test_rule", NAME_SUFFIX/binary>>
+}).
+
+%%------------------------------------------------------------------------------
+%% Setup
+%%------------------------------------------------------------------------------
+
+all() ->
+    [{group, cluster}].
+
+groups() ->
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    [{cluster, [], AllTCs}].
+
+suite() ->
+    [{timetrap, {seconds, 120}}].
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_group(cluster = Name, Config) ->
+    Nodes = [NodePrimary | _] = mk_cluster(Config),
+    init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]).
+
+init_api(Config) ->
+    APINode = ?config(node, Config),
+    {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []),
+    [{api, App} | Config].
+
+mk_cluster(Config) ->
+    mk_cluster(Config, #{}).
+
+mk_cluster(Config, Opts) ->
+    Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
+    Node2Apps = ?APPSPECS ++ [],
+    emqx_cth_cluster:start(
+        [
+            {emqx_rule_engine_api_cluster_SUITE_1, Opts#{role => core, apps => Node1Apps}},
+            {emqx_rule_engine_api_cluster_SUITE_2, Opts#{role => core, apps => Node2Apps}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ).
+
+end_per_group(Group, Config) when Group =:= cluster ->
+    ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
+end_per_group(_, Config) ->
+    emqx_cth_suite:stop(?config(group_apps, Config)),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_double_delete_on_diff_node(Config) ->
+    [Node1, Node2] = ?config(cluster_nodes, Config),
+
+    CreateFun = fun() ->
+        {201, Rule} = create_rule(Node2, ?SIMPLE_RULE(<<"test_rule1">>)),
+        RuleId = maps:get(id, Rule),
+
+        Parent = self(),
+
+        erlang:spawn(fun() ->
+            R = delete_rule(Node1, RuleId),
+            Parent ! {delete_result, Node1, R}
+        end),
+
+        erlang:spawn(fun() ->
+            R = delete_rule(Node2, RuleId),
+            Parent ! {delete_result, Node2, R}
+        end),
+
+        receive
+            {delete_result, Node1, R1} ->
+                receive
+                    {delete_result, Node2, R2} ->
+                        assert_return_204_or_404(R1),
+                        assert_return_204_or_404(R2),
+                        ?assertEqual(true, lists:member({204}, [R1, R2]))
+                after 5000 ->
+                    error({wait_timeout, Node2})
+                end
+        after 5000 ->
+            error({wait_timeout, Node1})
+        end
+    end,
+
+    lists:foreach(fun(_) -> CreateFun() end, lists:seq(1, 10)),
+
+    TxId1 = cluster_conf_tx_id(Node1),
+    TxId2 = cluster_conf_tx_id(Node2),
+    %% confirm all config updates are applied
+    ?assertEqual(TxId1, TxId2).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+create_rule(Node, Params) when is_map(Params) ->
+    rpc:call(Node, emqx_rule_engine_api, '/rules', [post, #{body => Params}]).
+
+delete_rule(Node, RuleId) when is_binary(RuleId) ->
+    rpc:call(
+        Node,
+        emqx_rule_engine_api,
+        '/rules/:id',
+        [delete, #{bindings => #{id => RuleId}}]
+    ).
+
+cluster_conf_tx_id(Node) ->
+    rpc:call(Node, emqx_cluster_rpc, get_node_tnx_id, [Node]).
+
+assert_return_204_or_404({204}) -> ok;
+assert_return_204_or_404({404, _}) -> ok;
+assert_return_204_or_404(R) -> error({unexpected_result, R}).

+ 15 - 55
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -388,58 +388,6 @@ receive_published(Line) ->
         ct:fail("publish not received, line ~b", [Line])
     end.
 
-cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
-        [
-            {apps, [
-                emqx_conf,
-                emqx_rule_engine,
-                emqx_schema_registry
-            ]},
-            {listener_ports, []},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {start_autocluster, true},
-            {schema_mod, emqx_enterprise_schema},
-            {load_apps, [emqx_machine]},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, [broker]),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
-    ),
-    ct:pal("cluster:\n  ~p", [Cluster]),
-    Cluster.
-
-start_cluster(Cluster) ->
-    Nodes = [
-        emqx_common_test_helpers:start_peer(Name, Opts)
-     || {Name, Opts} <- Cluster
-    ],
-    NumNodes = length(Nodes),
-    on_exit(fun() ->
-        emqx_utils:pmap(
-            fun(N) ->
-                ct:pal("stopping ~p", [N]),
-                ok = emqx_common_test_helpers:stop_peer(N)
-            end,
-            Nodes
-        )
-    end),
-    {ok, _} = snabbkaffe:block_until(
-        %% -1 because only those that join the first node will emit the event.
-        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
-        30_000
-    ),
-    Nodes.
-
 wait_for_cluster_rpc(Node) ->
     %% need to wait until the config handler is ready after
     %% restarting during the cluster join.
@@ -703,7 +651,15 @@ t_fail_rollback(Config) ->
 
 t_cluster_serde_build(Config) ->
     SerdeType = ?config(serde_type, Config),
-    Cluster = cluster(Config),
+    AppSpecs = [
+        emqx_conf,
+        emqx_rule_engine,
+        emqx_schema_registry
+    ],
+    ClusterSpec = [
+        {cluster_serde_build1, #{apps => AppSpecs}},
+        {cluster_serde_build2, #{apps => AppSpecs}}
+    ],
     SerdeName = my_serde,
     Schema = schema_params(SerdeType),
     #{
@@ -712,9 +668,13 @@ t_cluster_serde_build(Config) ->
     } = test_params_for(SerdeType, encode_decode1),
     ?check_trace(
         begin
-            Nodes = [N1, N2 | _] = start_cluster(Cluster),
+            Nodes =
+                [N1, N2 | _] = emqx_cth_cluster:start(
+                    ClusterSpec,
+                    #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+                ),
+            on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
             NumNodes = length(Nodes),
-            wait_for_cluster_rpc(N2),
             ?assertMatch(
                 ok,
                 erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])

+ 1 - 0
apps/emqx_utils/src/emqx_variform.erl

@@ -96,6 +96,7 @@ eval_render(Expr, Bindings, Opts) ->
     end.
 
 %% Force the expression to return binary string (in most cases).
+return_str(X) when ?IS_EMPTY(X) -> <<"">>;
 return_str(Str) when is_binary(Str) -> Str;
 return_str(Num) when is_integer(Num) -> integer_to_binary(Num);
 return_str(Num) when is_float(Num) -> float_to_binary(Num, [{decimals, 10}, compact]);

+ 2 - 10
apps/emqx_utils/src/emqx_variform_bif.erl

@@ -421,11 +421,9 @@ any_to_str(Data) ->
 %% Random functions
 %%------------------------------------------------------------------------------
 
-%% @doc Make a random string with urlsafe-base64 charset.
+%% @doc Make a random string with urlsafe-base62 charset.
 rand_str(Length) when is_integer(Length) andalso Length > 0 ->
-    RawBytes = erlang:ceil((Length * 3) / 4),
-    RandomData = rand:bytes(RawBytes),
-    urlsafe(binary:part(base64_encode(RandomData), 0, Length));
+    emqx_utils:rand_id(Length);
 rand_str(_) ->
     throw(#{reason => badarg, function => ?FUNCTION_NAME}).
 
@@ -435,12 +433,6 @@ rand_int(N) when is_integer(N) andalso N >= 1 ->
 rand_int(N) ->
     throw(#{reason => badarg, function => ?FUNCTION_NAME, expected => "positive integer", got => N}).
 
-%% TODO: call base64:encode(Bin, #{mode => urlsafe, padding => false})
-%% when oldest OTP to support is 26 or newer.
-urlsafe(Str0) ->
-    Str = replace(Str0, <<"+">>, <<"-">>),
-    replace(Str, <<"/">>, <<"_">>).
-
 %%------------------------------------------------------------------------------
 %% Data encoding
 %%------------------------------------------------------------------------------

+ 12 - 0
apps/emqx_utils/test/emqx_variform_tests.erl

@@ -26,6 +26,18 @@
 render_test_() ->
     [
         {"direct var reference", fun() -> ?assertEqual({ok, <<"1">>}, render("a", #{a => 1})) end},
+        {"direct var reference missing", fun() ->
+            ?assertMatch({error, #{reason := var_unbound}}, render("a", #{}))
+        end},
+        {"direct var reference undefined", fun() ->
+            ?assertEqual({ok, <<"">>}, render("a", #{a => undefined}))
+        end},
+        {"direct var reference null", fun() ->
+            ?assertEqual({ok, <<"">>}, render("a", #{a => null}))
+        end},
+        {"direct var reference emptry str", fun() ->
+            ?assertEqual({ok, <<"">>}, render("a", #{a => <<>>}))
+        end},
         {"concat strings", fun() ->
             ?assertEqual({ok, <<"a,b">>}, render("concat(['a',',','b'])", #{}))
         end},

+ 3 - 0
changes/ce/feat-14147.en.md

@@ -0,0 +1,3 @@
+Added support for using `memberOf` syntax in LDAP extensible match filter, for example:
+
+`(&(objectClass=class)(memberOf:1.2.840.113556.1.4.1941:=CN=GroupName,OU=emqx,DC=WL,DC=com))`

+ 1 - 0
changes/ce/fix-14113.en.md

@@ -0,0 +1 @@
+Fixed a potential race condition where a node that is still starting could crash if it attempted to join a cluster.

+ 2 - 0
changes/ce/fix-14117.en.md

@@ -0,0 +1,2 @@
+Fixed an issue with the REST API documentation that incorrectly showed that the `Users` endpoint supported `Basic` Authentication.
+

+ 1 - 0
changes/ce/fix-14160.en.md

@@ -0,0 +1 @@
+For durable session subscriptions, respect topic matching rules for durable topics starting with `$` symbol according to the MQTT specification.

+ 1 - 0
changes/ce/fix-14172.en.md

@@ -0,0 +1 @@
+Fixed a potential race condition where testing a connector using the HTTP API could leave lingering resources if the HTTP request timed out.

+ 1 - 0
changes/ce/fix-14178.en.md

@@ -0,0 +1 @@
+Fixed the issue where configuration synchronization was stuck on a particular node due to rules being deleted simultaneously across different nodes in the cluster.

+ 1 - 0
changes/ce/fix-14180.en.md

@@ -0,0 +1 @@
+Fixed variform expressions to return an empty string instead of 'undefined' when a variable is bound to the value 'undefined' or 'null'.

+ 1 - 0
changes/ce/fix-14201.en.md

@@ -0,0 +1 @@
+Stop displaying a check_gc warning when the WebSocket connection encounters a rate limit.

+ 2 - 0
changes/ce/fix-14215.en.md

@@ -0,0 +1,2 @@
+Fixed an issue where calls to the retainer (via REST or CLI) would throw an exception if it was disabled.
+

+ 4 - 0
changes/ee/feat-14166.en.md

@@ -0,0 +1,4 @@
+The `exchange` and `routing_key` in RabbitMQ producer can be configured as template values.
+For example, to extract the routing key from the payload, we could set "routing_key" to "${payload.akey}".
+
+Note: the templated `exchange` and `routing_key` are restricted in batch mode — we always assume that their values are the same for every message in a batch.

+ 5 - 0
changes/ee/feat-14176.en.md

@@ -0,0 +1,5 @@
+Some metadata was exposed to the rule engine for RabbitMQ source actions, including `queue`, `exchange` and the `routing_key`.
+
+Here is an example:
+`select *, queue as payload.queue, exchange as payload.exchange, routing_key as payload.routing_key from "$bridges/rabbitmq:test"`
+

+ 6 - 0
changes/ee/fix-14126.en.md

@@ -0,0 +1,6 @@
+Fix prepared statements for Oracle integration.
+
+Prior to this fix, when updating an Oracle integration action,
+if an invalid prepared statement was used, for example a reference to an unknown table column name,
+it could cause the action to apply an outdated version of the prepared statement.
+

+ 4 - 0
changes/ee/fix-14181.en.md

@@ -0,0 +1,4 @@
+Made Kafka and Pulsar producers tolerate corrupted COMMIT file.
+
+For disk mode buffers, if the COMMIT file is corrupted, it will be ignored.
+This means the producer may replay some already sent messages, but should not crash.

+ 1 - 0
dev

@@ -348,6 +348,7 @@ $ERL_NAME_ARG $EMQX_NODE_NAME
 +SDio 8
 -shutdown_time 30000
 -pa '$EMQX_DATA_DIR/patches'
+-cache_boot_paths false
 -mnesia dump_log_write_threshold 5000
 -mnesia dump_log_time_threshold 60000
 -os_mon start_disksup false

+ 3 - 3
mix.exs

@@ -207,7 +207,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:cowboy), do: {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}
   def common_dep(:jsone), do: {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true}
   def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.10", override: true}
-  def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.9", override: true}
+  def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.10", override: true}
   def common_dep(:jsx), do: {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}
   # in conflict by emqtt and hocon
   def common_dep(:getopt), do: {:getopt, "1.0.2", override: true}
@@ -276,11 +276,11 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:influxdb),
     do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}
 
-  def common_dep(:wolff), do: {:wolff, "4.0.3"}
+  def common_dep(:wolff), do: {:wolff, "4.0.4"}
   def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"}
 
   def common_dep(:kafka_protocol),
-    do: {:kafka_protocol, "4.1.9", override: true}
+    do: {:kafka_protocol, "4.1.10", override: true}
 
   def common_dep(:brod), do: {:brod, "4.3.1"}
   ## TODO: remove `mix.exs` from `wolff` and remove this override

+ 1 - 1
rebar.config

@@ -88,7 +88,7 @@
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.4"}}},
     {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.10"}}},
-    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.9"}}},
+    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.10"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}},
     {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.1"}}},

+ 1 - 1
rel/config/examples/dashboard-with-http.conf.example

@@ -10,7 +10,7 @@ dashboard {
     cors = false
 
     listeners.http {
-        ## Port or Address to listen on, 0 means disable
+        # bind = 0 to disable this listener
         bind = "0.0.0.0:18083" ## or just a port number, e.g. 18083
 
         ## Socket acceptor pool size for TCP protocols

+ 1 - 1
rel/config/examples/dashboard-with-https.conf.example

@@ -10,7 +10,7 @@ dashboard {
     cors = false
 
     listeners.https {
-        ## Port or Address to listen on, 0 means disable
+        # bind = 0 to disable this listener
         bind = "0.0.0.0:18084" ## or just a port number, e.g. 18084
 
         ssl_options {

+ 0 - 1
rel/config/examples/listeners.quic.conf.example

@@ -1,7 +1,6 @@
 ## MQTT over QUIC Listener
 
 listeners.quic.my_quick_listener_name {
-    ## Port or Address to listen on, 0 means disable
     bind = 14567 ## or with an IP, e.g. "127.0.0.1:14567"
 
     ## When publishing or subscribing, prefix all topics with a mountpoint string

+ 0 - 1
rel/config/examples/listeners.ssl.conf.example

@@ -1,7 +1,6 @@
 ## MQTT over TLS(SSL) Listener
 
 listeners.ssl.my_ssl_listener_name {
-    ## Port or Address to listen on, 0 means disable
     bind = 8883 ## or with an IP e.g. "127.0.0.1:8883"
     acceptors = 16
     enable_authn = true

+ 0 - 1
rel/config/examples/listeners.tcp.conf.example

@@ -1,7 +1,6 @@
 ## MQTT over TCP Listener
 
 listeners.tcp.my_tcp_listener_name {
-    ## Port or Address to listen on, 0 means disable
     bind = 1883 ## or with an IP e.g. "127.0.0.1:1883"
 
     ## Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed behind HAProxy or Nginx

+ 0 - 1
rel/config/examples/listeners.ws.conf.example

@@ -1,7 +1,6 @@
 ## MQTT over WebSocket (HTTP) Listener
 
 listeners.ws.my_ws_listener_name {
-    ## Port or Address to listen on, 0 means disable
     bind = "0.0.0.0:8083" # or just a port number, e.g. 8083
     enable_authn = true
     max_connections = infinity

+ 0 - 1
rel/config/examples/listeners.wss.conf.example

@@ -1,7 +1,6 @@
 ## MQTT over Secured Websocket (HTTPS) Listener
 
 listeners.wss.my_wss_listener_name = {
-    ## Port or Address to listen on, 0 means disable
     bind = 8084 ## or with an IP, e.g. "127.0.0.1:8084"
     enable_authn = true
     max_connections = infinity

+ 2 - 2
rel/i18n/emqx_bridge_rabbitmq_connector_schema.hocon

@@ -56,7 +56,7 @@ auto_reconnect.label:
 """Auto Reconnect"""
 
 exchange.desc:
-"""The name of the RabbitMQ exchange where the messages will be sent."""
+"""The name of the RabbitMQ exchange where the messages will be sent. Supports templates (e.g.: `e-${payload.e}`)."""
 
 exchange.label:
 """Exchange"""
@@ -68,7 +68,7 @@ exchange_type.label:
 """Exchange Type"""
 
 routing_key.desc:
-"""The routing key used to route messages to the correct queue in the RabbitMQ exchange."""
+"""The routing key used to route messages to the correct queue in the RabbitMQ exchange. Supports templates (e.g.: `k-${payload.r}`)."""
 
 routing_key.label:
 """Routing Key"""

+ 0 - 1
rel/i18n/emqx_bridge_rabbitmq_pubsub_schema.hocon

@@ -10,7 +10,6 @@ subscriber_source.desc:
 subscriber_source.label:
 """Source"""
 
-
 action_parameters.desc:
 """The action config defines how this bridge send messages to the remote RabbitMQ broker"""
 action_parameters.label: