فهرست منبع

Merge pull request #11320 from id/0721-v5.1.2

Ivan Dyachkov 2 سال پیش
والد
کامیت
e55cd7fe8e
60فایلهای تغییر یافته به همراه719 افزوده شده و 395 حذف شده
  1. 1 1
      Makefile
  2. 1 1
      apps/emqx/include/emqx_release.hrl
  3. 1 0
      apps/emqx/priv/bpapi.versions
  4. 2 2
      apps/emqx/rebar.config
  5. 1 3
      apps/emqx/src/emqx_schema.erl
  6. 0 1
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src
  7. 0 1
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src
  8. 0 1
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src
  9. 0 1
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
  10. 6 0
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
  11. 0 1
      apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src
  12. 7 0
      apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl
  13. 1 1
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  14. 0 1
      apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src
  15. 0 1
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
  16. 0 1
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
  17. 9 2
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
  18. 1 2
      apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src
  19. 0 1
      apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src
  20. 1 2
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
  21. 0 1
      apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src
  22. 0 1
      apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src
  23. 0 1
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src
  24. 20 0
      apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
  25. 1 2
      apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
  26. 0 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  27. 6 0
      apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl
  28. 0 2
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
  29. 0 1
      apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src
  30. 1 1
      apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src
  31. 1 1
      apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src
  32. 0 1
      apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src
  33. 1 1
      apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src
  34. 9 9
      apps/emqx_conf/src/emqx_conf.erl
  35. 2 2
      apps/emqx_conf/src/emqx_conf_app.erl
  36. 68 29
      apps/emqx_conf/src/emqx_conf_cli.erl
  37. 119 0
      apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl
  38. 112 0
      apps/emqx_machine/priv/reboot_lists.eterm
  39. 53 39
      apps/emqx_machine/src/emqx_machine_boot.erl
  40. 7 7
      apps/emqx_management/src/emqx_mgmt.erl
  41. 18 9
      apps/emqx_management/src/emqx_mgmt_api_configs.erl
  42. 1 1
      apps/emqx_management/src/emqx_mgmt_api_listeners.erl
  43. 34 9
      apps/emqx_management/src/emqx_mgmt_cli.erl
  44. 25 14
      apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl
  45. 4 3
      apps/emqx_modules/src/emqx_modules_conf.erl
  46. 1 1
      apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl
  47. 1 1
      apps/emqx_oracle/src/emqx_oracle.app.src
  48. 18 2
      apps/emqx_oracle/src/emqx_oracle.erl
  49. 9 8
      apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl
  50. 1 0
      changes/ce/fix-11294.en.md
  51. 2 0
      changes/ce/fix-11309.en.md
  52. 4 0
      changes/ce/fix-11322.en.md
  53. 1 0
      changes/ee/fix-11307.en.md
  54. 72 0
      changes/v5.1.2.en.md
  55. 2 2
      deploy/charts/emqx/Chart.yaml
  56. 53 108
      mix.exs
  57. 2 2
      rebar.config
  58. 31 103
      rebar.config.erl
  59. 4 4
      rel/i18n/emqx_mgmt_api_configs.hocon
  60. 5 6
      rel/i18n/emqx_mgmt_api_key_schema.hocon

+ 1 - 1
Makefile

@@ -296,7 +296,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt))))
 
 .PHONY: fmt
 fmt: $(REBAR)
-	@$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}'
+	@$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,priv,test}/**/*.{erl,hrl,app.src,eterm}'
 	@$(SCRIPTS)/erlfmt -w 'rebar.config.erl'
 	@mix format
 

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.1.1").
+-define(EMQX_RELEASE_CE, "5.1.2").
 
 %% Enterprise edition
 -define(EMQX_RELEASE_EE, "5.1.1-alpha.2").

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -12,6 +12,7 @@
 {emqx_cm,2}.
 {emqx_conf,1}.
 {emqx_conf,2}.
+{emqx_conf,3}.
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
 {emqx_eviction_agent,1}.

+ 2 - 2
apps/emqx/rebar.config

@@ -28,9 +28,9 @@
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
-    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.6"}}},
+    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.7"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.13"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

+ 1 - 3
apps/emqx/src/emqx_schema.erl

@@ -1844,9 +1844,7 @@ desc("stats") ->
 desc("authorization") ->
     "Settings for client authorization.";
 desc("mqtt") ->
-    "Global MQTT configuration.<br/>"
-    "The configs here work as default values which can be overridden\n"
-    "in <code>zone</code> configs";
+    "Global MQTT configuration.";
 desc("authz_cache") ->
     "Settings for the authorization cache.";
 desc("zone") ->

+ 0 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         ecql
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         clickhouse
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         erlcloud
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge_http,
         ehttpc
     ]},
     {env, []},

+ 6 - 0
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -598,6 +598,7 @@ start_cluster(Cluster) ->
         end,
         Cluster
     ),
+    NumNodes = length(Nodes),
     on_exit(fun() ->
         emqx_utils:pmap(
             fun(N) ->
@@ -607,6 +608,11 @@ start_cluster(Cluster) ->
             Nodes
         )
     end),
+    {ok, _} = snabbkaffe:block_until(
+        %% -1 because only those that join the first node will emit the event.
+        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
+        30_000
+    ),
     Nodes.
 
 wait_for_cluster_rpc(Node) ->

+ 0 - 1
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         hstreamdb_erl
     ]},
     {env, []},

+ 7 - 0
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl

@@ -309,6 +309,13 @@ do_append_records(false, Producer, Record) ->
                 msg => "HStreamDB producer sync append success",
                 record => Record
             });
+        %% the HStream is warming up or buzy, something are not ready yet, retry after a while
+        {error, {unavailable, _} = Reason} ->
+            {error,
+                {recoverable_error, #{
+                    msg => "HStreamDB is warming up or buzy, will retry after a moment",
+                    reason => Reason
+                }}};
         {error, Reason} = Err ->
             ?tp(
                 hstreamdb_connector_query_return,

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -2,7 +2,7 @@
     {description, "EMQX HTTP Bridge and Connector Application"},
     {vsn, "0.1.1"},
     {registered, []},
-    {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, ehttpc]},
+    {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
     {env, []},
     {modules, []},
     {links, []}

+ 0 - 1
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         influxdb
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -11,7 +11,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge_http,
         %% for module emqx_connector_http
         emqx_connector
     ]},

+ 0 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src

@@ -7,7 +7,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         telemetry,
         wolff,
         brod,

+ 9 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -1071,13 +1071,14 @@ cluster(Config) ->
     Cluster = emqx_common_test_helpers:emqx_cluster(
         [core, core],
         [
-            {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]},
+            {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]},
             {listener_ports, []},
             {peer_mod, PeerModule},
             {priv_data_dir, PrivDataDir},
             {load_schema, true},
             {start_autocluster, true},
             {schema_mod, emqx_enterprise_schema},
+            {load_apps, [emqx_machine]},
             {env_handler, fun
                 (emqx) ->
                     application:set_env(emqx, boot_modules, [broker, router]),
@@ -1901,6 +1902,7 @@ t_cluster_node_down(Config) ->
     ?check_trace(
         begin
             {_N2, Opts2} = lists:nth(2, Cluster),
+            NumNodes = length(Cluster),
             Nodes =
                 [N1, N2 | _] =
                 lists:map(
@@ -1925,6 +1927,11 @@ t_cluster_node_down(Config) ->
                 15_000
             ),
             wait_for_cluster_rpc(N2),
+            {ok, _} = snabbkaffe:block_until(
+                %% -1 because only those that join the first node will emit the event.
+                ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
+                30_000
+            ),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             {ok, _} = snabbkaffe:receive_events(SRef0),
             lists:foreach(
@@ -1980,7 +1987,7 @@ t_cluster_node_down(Config) ->
             ?assertEqual(NPartitions, map_size(Assignments)),
             NumPublished = ets:info(TId, size),
             %% All published messages are eventually received.
-            Published = receive_published(#{n => NumPublished, timeout => 3_000}),
+            Published = receive_published(#{n => NumPublished, timeout => 10_000}),
             ct:pal("published:\n  ~p", [Published]),
             ok
         end

+ 1 - 2
apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src

@@ -5,8 +5,7 @@
     {applications, [
         kernel,
         stdlib,
-        emqx_resource,
-        emqx_bridge
+        emqx_resource
     ]},
     {env, []},
     {modules, []},

+ 0 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src

@@ -7,7 +7,6 @@
         stdlib,
         emqx_connector,
         emqx_resource,
-        emqx_bridge,
         emqx_mongodb
     ]},
     {env, []},

+ 1 - 2
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -1,14 +1,13 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_mqtt, [
     {description, "EMQX MQTT Broker Bridge"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {applications, [
         kernel,
         stdlib,
         emqx,
         emqx_resource,
-        emqx_bridge,
         emqtt
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src

@@ -7,7 +7,6 @@
         stdlib,
         emqx_connector,
         emqx_resource,
-        emqx_bridge,
         emqx_mysql
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         opentsdb
     ]},
     {env, []},

+ 0 - 1
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         emqx_oracle
     ]},
     {env, []},

+ 20 - 0
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -162,6 +162,9 @@ delete_all_bridges() ->
 sql_insert_template_for_bridge() ->
     "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${retain})".
 
+sql_insert_template_with_nested_token_for_bridge() ->
+    "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload.msg}, ${retain})".
+
 sql_create_table() ->
     "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
 
@@ -533,6 +536,23 @@ t_start_stop(Config) ->
     ),
     ok.
 
+t_probe_with_nested_tokens(Config) ->
+    ResourceId = resource_id(Config),
+    reset_table(Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config, #{
+            <<"sql">> => sql_insert_template_with_nested_token_for_bridge()
+        })
+    ),
+    %% Since the connection process is async, we give it some time to
+    %% stabilize and avoid flakiness.
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ).
+
 t_on_get_status(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),

+ 1 - 2
apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src

@@ -5,8 +5,7 @@
     {applications, [
         kernel,
         stdlib,
-        emqx_resource,
-        emqx_bridge
+        emqx_resource
     ]},
     {env, []},
     {modules, []},

+ 0 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         pulsar
     ]},
     {env, []},

+ 6 - 0
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl

@@ -547,6 +547,7 @@ start_cluster(Cluster) ->
             emqx_common_test_helpers:start_slave(Name, Opts)
          || {Name, Opts} <- Cluster
         ],
+    NumNodes = length(Nodes),
     on_exit(fun() ->
         emqx_utils:pmap(
             fun(N) ->
@@ -556,6 +557,11 @@ start_cluster(Cluster) ->
             Nodes
         )
     end),
+    {ok, _} = snabbkaffe:block_until(
+        %% -1 because only those that join the first node will emit the event.
+        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
+        30_000
+    ),
     Nodes.
 
 kill_resource_managers() ->

+ 0 - 2
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src

@@ -6,8 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
-        ecql,
         rabbit_common,
         amqp_client
     ]},

+ 0 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src

@@ -7,7 +7,6 @@
         stdlib,
         emqx_connector,
         emqx_resource,
-        emqx_bridge,
         emqx_redis
     ]},
     {env, []},

+ 1 - 1
apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src

@@ -2,7 +2,7 @@
     {description, "EMQX Enterprise RocketMQ Bridge"},
     {vsn, "0.1.3"},
     {registered, []},
-    {applications, [kernel, stdlib, emqx_resource, emqx_bridge, rocketmq]},
+    {applications, [kernel, stdlib, emqx_resource, rocketmq]},
     {env, []},
     {modules, []},
     {links, []}

+ 1 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src

@@ -2,7 +2,7 @@
     {description, "EMQX Enterprise SQL Server Bridge"},
     {vsn, "0.1.2"},
     {registered, []},
-    {applications, [kernel, stdlib, emqx_resource, emqx_bridge, odbc]},
+    {applications, [kernel, stdlib, emqx_resource, odbc]},
     {env, []},
     {modules, []},
     {links, []}

+ 0 - 1
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src

@@ -6,7 +6,6 @@
         kernel,
         stdlib,
         emqx_resource,
-        emqx_bridge,
         tdengine
     ]},
     {env, []},

+ 1 - 1
apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src

@@ -2,7 +2,7 @@
     {description, "EMQX Enterprise TimescaleDB Bridge"},
     {vsn, "0.1.2"},
     {registered, []},
-    {applications, [kernel, stdlib, emqx_resource, emqx_bridge]},
+    {applications, [kernel, stdlib, emqx_resource]},
     {env, []},
     {modules, []},
     {links, []}

+ 9 - 9
apps/emqx_conf/src/emqx_conf.erl

@@ -71,7 +71,7 @@ get_raw(KeyPath) ->
 %% @doc Returns all values in the cluster.
 -spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}.
 get_all(KeyPath) ->
-    {ResL, []} = emqx_conf_proto_v2:get_all(KeyPath),
+    {ResL, []} = emqx_conf_proto_v3:get_all(KeyPath),
     maps:from_list(ResL).
 
 %% @doc Returns the specified node's KeyPath, or exception if not found
@@ -79,14 +79,14 @@ get_all(KeyPath) ->
 get_by_node(Node, KeyPath) when Node =:= node() ->
     emqx:get_config(KeyPath);
 get_by_node(Node, KeyPath) ->
-    emqx_conf_proto_v2:get_config(Node, KeyPath).
+    emqx_conf_proto_v3:get_config(Node, KeyPath).
 
 %% @doc Returns the specified node's KeyPath, or the default value if not found
 -spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term().
 get_by_node(Node, KeyPath, Default) when Node =:= node() ->
     emqx:get_config(KeyPath, Default);
 get_by_node(Node, KeyPath, Default) ->
-    emqx_conf_proto_v2:get_config(Node, KeyPath, Default).
+    emqx_conf_proto_v3:get_config(Node, KeyPath, Default).
 
 %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
 -spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term().
@@ -101,7 +101,7 @@ get_node_and_config(KeyPath) ->
 ) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 update(KeyPath, UpdateReq, Opts) ->
-    emqx_conf_proto_v2:update(KeyPath, UpdateReq, Opts).
+    emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts).
 
 %% @doc Update the specified node's key path in local-override.conf.
 -spec update(
@@ -114,7 +114,7 @@ update(KeyPath, UpdateReq, Opts) ->
 update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
     emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
 update(Node, KeyPath, UpdateReq, Opts) ->
-    emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts).
+    emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts).
 
 %% @doc Mark the specified key path as tombstone
 tombstone(KeyPath, Opts) ->
@@ -124,7 +124,7 @@ tombstone(KeyPath, Opts) ->
 -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 remove(KeyPath, Opts) ->
-    emqx_conf_proto_v2:remove_config(KeyPath, Opts).
+    emqx_conf_proto_v3:remove_config(KeyPath, Opts).
 
 %% @doc remove the specified node's key path in local-override.conf.
 -spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
@@ -132,13 +132,13 @@ remove(KeyPath, Opts) ->
 remove(Node, KeyPath, Opts) when Node =:= node() ->
     emqx:remove_config(KeyPath, Opts#{override_to => local});
 remove(Node, KeyPath, Opts) ->
-    emqx_conf_proto_v2:remove_config(Node, KeyPath, Opts).
+    emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts).
 
 %% @doc reset all value of key path in cluster-override.conf or local-override.conf.
 -spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 reset(KeyPath, Opts) ->
-    emqx_conf_proto_v2:reset(KeyPath, Opts).
+    emqx_conf_proto_v3:reset(KeyPath, Opts).
 
 %% @doc reset the specified node's key path in local-override.conf.
 -spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
@@ -146,7 +146,7 @@ reset(KeyPath, Opts) ->
 reset(Node, KeyPath, Opts) when Node =:= node() ->
     emqx:reset_config(KeyPath, Opts#{override_to => local});
 reset(Node, KeyPath, Opts) ->
-    emqx_conf_proto_v2:reset(Node, KeyPath, Opts).
+    emqx_conf_proto_v3:reset(Node, KeyPath, Opts).
 
 %% @doc Called from build script.
 %% TODO: move to a external escript after all refactoring is done

+ 2 - 2
apps/emqx_conf/src/emqx_conf_app.erl

@@ -137,7 +137,7 @@ sync_cluster_conf() ->
 
 %% @private Some core nodes are running, try to sync the cluster config from them.
 sync_cluster_conf2(Nodes) ->
-    {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
+    {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
     {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
     NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
     case (Failed =/= [] orelse NotReady =/= []) of
@@ -284,7 +284,7 @@ conf_sort({ok, _}, {ok, _}) ->
     false.
 
 sync_data_from_node(Node) ->
-    case emqx_conf_proto_v2:sync_data_from_node(Node) of
+    case emqx_conf_proto_v3:sync_data_from_node(Node) of
         {ok, DataBin} ->
             case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
                 {ok, []} ->

+ 68 - 29
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -213,10 +213,20 @@ load_config(Bin, ReplaceOrMerge) when is_binary(Bin) ->
 load_config_from_raw(RawConf, ReplaceOrMerge) ->
     case check_config(RawConf) of
         ok ->
-            lists:foreach(
-                fun({K, V}) -> update_config_cluster(K, V, ReplaceOrMerge) end,
-                to_sorted_list(RawConf)
-            );
+            Error =
+                lists:filtermap(
+                    fun({K, V}) ->
+                        case update_config_cluster(K, V, ReplaceOrMerge) of
+                            ok -> false;
+                            {error, Msg} -> {true, Msg}
+                        end
+                    end,
+                    to_sorted_list(RawConf)
+                ),
+            case iolist_to_binary(Error) of
+                <<"">> -> ok;
+                ErrorBin -> {error, ErrorBin}
+            end;
         {error, ?UPDATE_READONLY_KEYS_PROHIBITED = Reason} ->
             emqx_ctl:warning("load config failed~n~ts~n", [Reason]),
             emqx_ctl:warning(
@@ -234,34 +244,63 @@ load_config_from_raw(RawConf, ReplaceOrMerge) ->
             {error, Errors}
     end.
 
-update_config_cluster(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
-    check_res(Key, emqx_authz:merge(Conf));
-update_config_cluster(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
-    check_res(Key, emqx_authn:merge_config(Conf));
-update_config_cluster(Key, NewConf, merge) ->
+update_config_cluster(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) ->
+    check_res(Key, emqx_authz:merge(Conf), Conf, Mode);
+update_config_cluster(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) ->
+    check_res(Key, emqx_authn:merge_config(Conf), Conf, Mode);
+update_config_cluster(Key, NewConf, merge = Mode) ->
     Merged = merge_conf(Key, NewConf),
-    check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS));
-update_config_cluster(Key, Value, replace) ->
-    check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS)).
+    check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Mode);
+update_config_cluster(Key, Value, replace = Mode) ->
+    check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Mode).
 
 -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
-update_config_local(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
-    check_res(node(), Key, emqx_authz:merge_local(Conf, ?LOCAL_OPTIONS));
-update_config_local(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
-    check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS));
-update_config_local(Key, NewConf, merge) ->
+update_config_local(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) ->
+    check_res(node(), Key, emqx_authz:merge_local(Conf, ?LOCAL_OPTIONS), Conf, Mode);
+update_config_local(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) ->
+    check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Mode);
+update_config_local(Key, NewConf, merge = Mode) ->
     Merged = merge_conf(Key, NewConf),
-    check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS));
-update_config_local(Key, Value, replace) ->
-    check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS)).
-
-check_res(Key, Res) -> check_res(cluster, Key, Res).
-check_res(Mode, Key, {ok, _} = Res) ->
-    emqx_ctl:print("load ~ts in ~p ok~n", [Key, Mode]),
-    Res;
-check_res(_Mode, Key, {error, Reason} = Res) ->
-    emqx_ctl:warning("load ~ts failed~n~p~n", [Key, Reason]),
-    Res.
+    check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Mode);
+update_config_local(Key, Value, replace = Mode) ->
+    check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Mode).
+
+check_res(Key, Res, Conf, Mode) -> check_res(cluster, Key, Res, Conf, Mode).
+check_res(Node, Key, {ok, _}, _Conf, _Mode) ->
+    emqx_ctl:print("load ~ts on ~p ok~n", [Key, Node]),
+    ok;
+check_res(_Node, Key, {error, Reason}, Conf, Mode) ->
+    Warning =
+        "Can't ~ts the new configurations!~n"
+        "Root key: ~ts~n"
+        "Reason: ~p~n",
+    emqx_ctl:warning(Warning, [Mode, Key, Reason]),
+    ActiveMsg0 =
+        "The effective configurations:~n"
+        "```~n"
+        "~ts```~n~n",
+    ActiveMsg = io_lib:format(ActiveMsg0, [hocon_pp:do(#{Key => emqx_conf:get_raw([Key])}, #{})]),
+    FailedMsg0 =
+        "Try to ~ts with:~n"
+        "```~n"
+        "~ts```~n",
+    FailedMsg = io_lib:format(FailedMsg0, [Mode, hocon_pp:do(#{Key => Conf}, #{})]),
+    SuggestMsg = suggest_msg(Mode),
+    Msg = iolist_to_binary([ActiveMsg, FailedMsg, SuggestMsg]),
+    emqx_ctl:print("~ts", [Msg]),
+    {error, iolist_to_binary([Warning, Msg])}.
+
+suggest_msg(Mode) when Mode == merge orelse Mode == replace ->
+    RetryMode =
+        case Mode of
+            merge -> "replace";
+            replace -> "merge"
+        end,
+    io_lib:format(
+        "Tips: There may be some conflicts in the new configuration under `~ts` mode,~n"
+        "Please retry with the `~ts` mode.~n",
+        [Mode, RetryMode]
+    ).
 
 check_config(Conf) ->
     case check_keys_is_not_readonly(Conf) of
@@ -349,7 +388,7 @@ filter_readonly_config(Raw) ->
 reload_config(AllConf, ReplaceOrMerge) ->
     Fold = fun({Key, Conf}, Acc) ->
         case update_config_local(Key, Conf, ReplaceOrMerge) of
-            {ok, _} ->
+            ok ->
                 Acc;
             Error ->
                 ?SLOG(error, #{

+ 119 - 0
apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl

@@ -0,0 +1,119 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_conf_proto_v3).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    sync_data_from_node/1,
+    get_config/2,
+    get_config/3,
+    get_all/1,
+
+    update/3,
+    update/4,
+    remove_config/2,
+    remove_config/3,
+
+    reset/2,
+    reset/3,
+
+    get_override_config_file/1
+]).
+
+-export([get_hocon_config/1, get_hocon_config/2]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.1.1".
+
+-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
+sync_data_from_node(Node) ->
+    rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).
+-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...].
+
+-spec get_config(node(), emqx_utils_maps:config_key_path()) ->
+    term() | emqx_rpc:badrpc().
+get_config(Node, KeyPath) ->
+    rpc:call(Node, emqx, get_config, [KeyPath]).
+
+-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) ->
+    term() | emqx_rpc:badrpc().
+get_config(Node, KeyPath, Default) ->
+    rpc:call(Node, emqx, get_config, [KeyPath, Default]).
+
+-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result().
+get_all(KeyPath) ->
+    rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
+
+-spec update(
+    update_config_key_path(),
+    emqx_config:update_request(),
+    emqx_config:update_opts()
+) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
+update(KeyPath, UpdateReq, Opts) ->
+    emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
+
+-spec update(
+    node(),
+    update_config_key_path(),
+    emqx_config:update_request(),
+    emqx_config:update_opts()
+) ->
+    {ok, emqx_config:update_result()}
+    | {error, emqx_config:update_error()}
+    | emqx_rpc:badrpc().
+update(Node, KeyPath, UpdateReq, Opts) ->
+    rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
+
+-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
+    {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
+remove_config(KeyPath, Opts) ->
+    emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
+
+-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
+    {ok, emqx_config:update_result()}
+    | {error, emqx_config:update_error()}
+    | emqx_rpc:badrpc().
+remove_config(Node, KeyPath, Opts) ->
+    rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
+
+-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
+    {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
+reset(KeyPath, Opts) ->
+    emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
+
+-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
+    {ok, emqx_config:update_result()}
+    | {error, emqx_config:update_error()}
+    | emqx_rpc:badrpc().
+reset(Node, KeyPath, Opts) ->
+    rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).
+
+-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result().
+get_override_config_file(Nodes) ->
+    rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000).
+
+-spec get_hocon_config(node()) -> map() | {badrpc, _}.
+get_hocon_config(Node) ->
+    rpc:call(Node, emqx_conf_cli, get_config, []).
+
+-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
+get_hocon_config(Node, Key) ->
+    rpc:call(Node, emqx_conf_cli, get_config, [Key]).

+ 112 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -0,0 +1,112 @@
+%% -*- mode: erlang; -*-
+#{
+    %% must always be of type `load'
+    db_apps =>
+        [
+            mnesia_rocksdb,
+            mnesia,
+            mria,
+            ekka
+        ],
+    system_apps =>
+        [
+            kernel,
+            sasl,
+            crypto,
+            public_key,
+            asn1,
+            syntax_tools,
+            ssl,
+            os_mon,
+            inets,
+            compiler,
+            runtime_tools,
+            redbug,
+            xmerl,
+            {hocon, load},
+            telemetry
+        ],
+    %% must always be of type `load'
+    common_business_apps =>
+        [
+            emqx,
+            emqx_conf,
+
+            esasl,
+            observer_cli,
+            tools,
+            covertool,
+            %% started by emqx_machine
+            system_monitor,
+            emqx_utils,
+            emqx_durable_storage,
+            emqx_http_lib,
+            emqx_resource,
+            emqx_connector,
+            emqx_authn,
+            emqx_authz,
+            emqx_auto_subscribe,
+            emqx_gateway,
+            emqx_gateway_stomp,
+            emqx_gateway_mqttsn,
+            emqx_gateway_coap,
+            emqx_gateway_lwm2m,
+            emqx_gateway_exproto,
+            emqx_exhook,
+            emqx_bridge,
+            emqx_bridge_mqtt,
+            emqx_bridge_http,
+            emqx_rule_engine,
+            emqx_modules,
+            emqx_management,
+            emqx_dashboard,
+            emqx_retainer,
+            emqx_prometheus,
+            emqx_psk,
+            emqx_slow_subs,
+            emqx_mongodb,
+            emqx_redis,
+            emqx_mysql,
+            emqx_plugins,
+            quicer,
+            bcrypt,
+            jq,
+            observer
+        ],
+    %% must always be of type `load'
+    ee_business_apps =>
+        [
+            emqx_license,
+            emqx_enterprise,
+            emqx_bridge_kafka,
+            emqx_bridge_pulsar,
+            emqx_bridge_gcp_pubsub,
+            emqx_bridge_cassandra,
+            emqx_bridge_opents,
+            emqx_bridge_clickhouse,
+            emqx_bridge_dynamo,
+            emqx_bridge_hstreamdb,
+            emqx_bridge_influxdb,
+            emqx_bridge_iotdb,
+            emqx_bridge_matrix,
+            emqx_bridge_mongodb,
+            emqx_bridge_mysql,
+            emqx_bridge_pgsql,
+            emqx_bridge_redis,
+            emqx_bridge_rocketmq,
+            emqx_bridge_tdengine,
+            emqx_bridge_timescale,
+            emqx_bridge_sqlserver,
+            emqx_bridge_kinesis,
+            emqx_oracle,
+            emqx_bridge_oracle,
+            emqx_bridge_rabbitmq,
+            emqx_schema_registry,
+            emqx_eviction_agent,
+            emqx_node_rebalance,
+            emqx_ft
+        ],
+    %% must always be of type `load'
+    ce_business_apps =>
+        [emqx_telemetry]
+}.

+ 53 - 39
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -16,6 +16,7 @@
 -module(emqx_machine_boot).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -export([post_boot/0]).
 -export([stop_apps/0, ensure_apps_started/0]).
@@ -24,7 +25,6 @@
 -export([stop_port_apps/0]).
 
 -dialyzer({no_match, [basic_reboot_apps/0]}).
--dialyzer({no_match, [basic_reboot_apps_edition/1]}).
 
 -ifdef(TEST).
 -export([sorted_reboot_apps/1, reboot_apps/0]).
@@ -94,7 +94,8 @@ stop_one_app(App) ->
 
 ensure_apps_started() ->
     ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
-    lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
+    lists:foreach(fun start_one_app/1, sorted_reboot_apps()),
+    ?tp(emqx_machine_boot_apps_started, #{}).
 
 start_one_app(App) ->
     ?SLOG(debug, #{msg => "starting_app", app => App}),
@@ -128,45 +129,39 @@ reboot_apps() ->
     BaseRebootApps ++ ConfigApps.
 
 basic_reboot_apps() ->
-    ?BASIC_REBOOT_APPS ++
-        [
-            emqx_prometheus,
-            emqx_modules,
-            emqx_dashboard,
-            emqx_connector,
-            emqx_gateway,
-            emqx_resource,
-            emqx_rule_engine,
-            emqx_bridge,
-            emqx_management,
-            emqx_retainer,
-            emqx_exhook,
-            emqx_authn,
-            emqx_authz,
-            emqx_slow_subs,
-            emqx_auto_subscribe,
-            emqx_plugins,
-            emqx_psk,
-            emqx_durable_storage
-        ] ++ basic_reboot_apps_edition(emqx_release:edition()).
-
-basic_reboot_apps_edition(ce) ->
-    [emqx_telemetry];
-basic_reboot_apps_edition(ee) ->
-    [
-        emqx_license,
-        emqx_s3,
-        emqx_ft,
-        emqx_eviction_agent,
-        emqx_node_rebalance,
-        emqx_schema_registry
-    ];
-%% unexcepted edition, should not happen
-basic_reboot_apps_edition(_) ->
-    [].
+    PrivDir = code:priv_dir(emqx_machine),
+    RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]),
+    {ok, [
+        #{
+            common_business_apps := CommonBusinessApps,
+            ee_business_apps := EEBusinessApps,
+            ce_business_apps := CEBusinessApps
+        }
+    ]} = file:consult(RebootListPath),
+    EditionSpecificApps =
+        case emqx_release:edition() of
+            ee -> EEBusinessApps;
+            ce -> CEBusinessApps;
+            _ -> []
+        end,
+    BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
+    ?BASIC_REBOOT_APPS ++ (BusinessApps -- excluded_apps()).
+
+excluded_apps() ->
+    OptionalApps = [bcrypt, jq, observer],
+    [system_monitor, observer_cli] ++
+        [App || App <- OptionalApps, not is_app(App)].
+
+is_app(Name) ->
+    case application:load(Name) of
+        ok -> true;
+        {error, {already_loaded, _}} -> true;
+        _ -> false
+    end.
 
 sorted_reboot_apps() ->
-    Apps = [{App, app_deps(App)} || App <- reboot_apps()],
+    Apps0 = [{App, app_deps(App)} || App <- reboot_apps()],
+    Apps = inject_bridge_deps(Apps0),
     sorted_reboot_apps(Apps).
 
 app_deps(App) ->
@@ -175,6 +170,25 @@ app_deps(App) ->
         {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
     end.
 
+%% `emqx_bridge' is special in that it needs all the bridges apps to
+%% be started before it, so that, when it loads the bridges from
+%% configuration, the bridge app and its dependencies need to be up.
+inject_bridge_deps(RebootAppDeps) ->
+    BridgeApps = [
+        App
+     || {App, _Deps} <- RebootAppDeps,
+        lists:prefix("emqx_bridge_", atom_to_list(App))
+    ],
+    lists:map(
+        fun
+            ({emqx_bridge, Deps0}) when is_list(Deps0) ->
+                {emqx_bridge, Deps0 ++ BridgeApps};
+            (App) ->
+                App
+        end,
+        RebootAppDeps
+    ).
+
 sorted_reboot_apps(Apps) ->
     G = digraph:new(),
     try

+ 7 - 7
apps/emqx_management/src/emqx_mgmt.erl

@@ -179,7 +179,7 @@ get_sys_memory() ->
     end.
 
 node_info(Nodes) ->
-    emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)).
+    emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)).
 
 stopped_node_info(Node) ->
     {Node, #{node => Node, node_status => 'stopped', role => core}}.
@@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) ->
     M#{K => iolist_to_binary(V)}.
 
 broker_info(Nodes) ->
-    emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)).
+    emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)).
 
 %%--------------------------------------------------------------------
 %% Metrics and Stats
@@ -446,7 +446,7 @@ do_call_client(ClientId, Req) ->
 
 %% @private
 call_client(Node, ClientId, Req) ->
-    unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)).
+    unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)).
 
 %%--------------------------------------------------------------------
 %% Subscriptions
@@ -459,7 +459,7 @@ do_list_subscriptions() ->
     throw(not_implemented).
 
 list_subscriptions(Node) ->
-    unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)).
+    unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)).
 
 list_subscriptions_via_topic(Topic, FormatFun) ->
     lists:append([
@@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) ->
     subscribe(emqx:running_nodes(), ClientId, TopicTables).
 
 subscribe([Node | Nodes], ClientId, TopicTables) ->
-    case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of
+    case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of
         {error, _} -> subscribe(Nodes, ClientId, TopicTables);
         {subscribe, Res} -> {subscribe, Res, Node}
     end;
@@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) ->
 -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
     {unsubscribe, _} | {error, channel_not_found}.
 unsubscribe([Node | Nodes], ClientId, Topic) ->
-    case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of
+    case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of
         {error, _} -> unsubscribe(Nodes, ClientId, Topic);
         Re -> Re
     end;
@@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) ->
 -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
     {unsubscribe_batch, _} | {error, channel_not_found}.
 unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
-    case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of
+    case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of
         {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
         Re -> Re
     end;

+ 18 - 9
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -346,11 +346,10 @@ configs(get, #{query_string := QueryStr, headers := Headers}, _Req) ->
 configs(put, #{body := Conf, query_string := #{<<"mode">> := Mode}}, _Req) ->
     case emqx_conf_cli:load_config(Conf, Mode) of
         ok -> {200};
-        {error, [{_, Reason}]} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}};
-        {error, Errors} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Errors)}}
+        {error, Msg} -> {400, #{<<"content-type">> => <<"text/plain">>}, Msg}
     end.
 
-find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Perferences) > 0 ->
+find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Preferences) > 0 ->
     AcceptVal = maps:get(<<"accept">>, Headers, <<"*/*">>),
     %% Multiple types, weighted with the quality value syntax:
     %% Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8
@@ -363,20 +362,27 @@ find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Per
     ),
     case lists:member(<<"*/*">>, Accepts) of
         true ->
-            {ok, lists:nth(1, Perferences)};
+            {ok, lists:nth(1, Preferences)};
         false ->
-            Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Perferences),
+            Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Preferences),
             case Found of
-                [] -> {error, no_suitalbe_accept};
+                [] -> {error, no_suitable_accept};
                 _ -> {ok, lists:nth(1, Found)}
             end
     end.
 
+%% To return a JSON formatted configuration file, which is used to be compatible with the already
+%% implemented `GET /configs` in the old versions 5.0 and 5.1.
+%%
+%% In e5.1.1, we support to return a hocon configuration file by `get_configs_v2/1`. It's more
+%% useful for the user to read or reload the configuration file via HTTP API.
+%%
+%% The `get_configs_v1/1` should be deprecated since 5.2.0.
 get_configs_v1(QueryStr) ->
     Node = maps:get(<<"node">>, QueryStr, node()),
     case
         lists:member(Node, emqx:running_nodes()) andalso
-            emqx_management_proto_v2:get_full_config(Node)
+            emqx_management_proto_v4:get_full_config(Node)
     of
         false ->
             Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
@@ -389,10 +395,13 @@ get_configs_v1(QueryStr) ->
     end.
 
 get_configs_v2(QueryStr) ->
+    Node = maps:get(<<"node">>, QueryStr, node()),
     Conf =
         case maps:find(<<"key">>, QueryStr) of
-            error -> emqx_conf_cli:get_config();
-            {ok, Key} -> emqx_conf_cli:get_config(atom_to_binary(Key))
+            error ->
+                emqx_conf_proto_v3:get_hocon_config(Node);
+            {ok, Key} ->
+                emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key))
         end,
     {
         200,

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -515,7 +515,7 @@ list_listeners() ->
     lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
 
 list_listeners(Node) ->
-    wrap_rpc(emqx_management_proto_v2:list_listeners(Node)).
+    wrap_rpc(emqx_management_proto_v4:list_listeners(Node)).
 
 listener_status_by_id(NodeL) ->
     Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),

+ 34 - 9
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -110,17 +110,21 @@ broker(_) ->
 %% @doc Cluster with other nodes
 
 cluster(["join", SNode]) ->
-    case ekka:join(ekka_node:parse_name(SNode)) of
+    case mria:join(ekka_node:parse_name(SNode)) of
         ok ->
             emqx_ctl:print("Join the cluster successfully.~n"),
-            cluster(["status"]);
+            %% FIXME: running status on the replicant immediately
+            %% after join produces stale output
+            mria_rlog:role() =:= core andalso
+                cluster(["status"]),
+            ok;
         ignore ->
             emqx_ctl:print("Ignore.~n");
         {error, Error} ->
             emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
     end;
 cluster(["leave"]) ->
-    case ekka:leave() of
+    case mria:leave() of
         ok ->
             emqx_ctl:print("Leave the cluster successfully.~n"),
             cluster(["status"]);
@@ -128,7 +132,7 @@ cluster(["leave"]) ->
             emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
     end;
 cluster(["force-leave", SNode]) ->
-    case ekka:force_leave(ekka_node:parse_name(SNode)) of
+    case mria:force_leave(ekka_node:parse_name(SNode)) of
         ok ->
             emqx_ctl:print("Remove the node from cluster successfully.~n"),
             cluster(["status"]);
@@ -138,9 +142,9 @@ cluster(["force-leave", SNode]) ->
             emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
     end;
 cluster(["status"]) ->
-    emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
+    emqx_ctl:print("Cluster status: ~p~n", [cluster_info()]);
 cluster(["status", "--json"]) ->
-    Info = sort_map_list_fields(ekka_cluster:info()),
+    Info = sort_map_list_fields(cluster_info()),
     emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
 cluster(_) ->
     emqx_ctl:usage([
@@ -158,9 +162,7 @@ sort_map_list_fields(Map) when is_map(Map) ->
         end,
         Map,
         maps:keys(Map)
-    );
-sort_map_list_fields(NotMap) ->
-    NotMap.
+    ).
 
 sort_map_list_field(Field, Map) ->
     case maps:get(Field, Map) of
@@ -925,3 +927,26 @@ with_log(Fun, Msg) ->
         {error, Reason} ->
             emqx_ctl:print("~s FAILED~n~p~n", [Msg, Reason])
     end.
+
+cluster_info() ->
+    RunningNodes = safe_call_mria(running_nodes, [], []),
+    StoppedNodes = safe_call_mria(cluster_nodes, [stopped], []),
+    #{
+        running_nodes => RunningNodes,
+        stopped_nodes => StoppedNodes
+    }.
+
+%% CLI starts before mria, so we should handle errors gracefully:
+safe_call_mria(Fun, Args, OnFail) ->
+    try
+        apply(mria, Fun, Args)
+    catch
+        EC:Err:Stack ->
+            ?SLOG(warning, #{
+                msg => "Call to mria failed",
+                call => {mria, Fun, Args},
+                EC => Err,
+                stacktrace => Stack
+            }),
+            OnFail
+    end.

+ 25 - 14
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -272,18 +272,22 @@ t_dashboard(_Config) ->
 t_configs_node({'init', Config}) ->
     Node = node(),
     meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end),
-    meck:expect(
-        emqx_management_proto_v2,
-        get_full_config,
-        fun
-            (Node0) when Node0 =:= Node -> <<"\"self\"">>;
-            (other_node) -> <<"\"other\"">>;
-            (bad_node) -> {badrpc, bad}
-        end
-    ),
+    F = fun
+        (Node0) when Node0 =:= Node -> <<"\"self\"">>;
+        (other_node) -> <<"\"other\"">>;
+        (bad_node) -> {badrpc, bad}
+    end,
+    F2 = fun
+        (Node0, _) when Node0 =:= Node -> <<"log=1">>;
+        (other_node, _) -> <<"log=2">>;
+        (bad_node, _) -> {badrpc, bad}
+    end,
+    meck:expect(emqx_management_proto_v4, get_full_config, F),
+    meck:expect(emqx_conf_proto_v3, get_hocon_config, F2),
+    meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end),
     Config;
 t_configs_node({'end', _}) ->
-    meck:unload([emqx, emqx_management_proto_v2]);
+    meck:unload([emqx, emqx_management_proto_v4, emqx_conf_proto_v3, hocon_pp]);
 t_configs_node(_) ->
     Node = atom_to_list(node()),
 
@@ -296,7 +300,10 @@ t_configs_node(_) ->
     {_, _, Body} = ExpRes,
     ?assertMatch(#{<<"code">> := <<"NOT_FOUND">>}, emqx_utils_json:decode(Body, [return_maps])),
 
-    ?assertMatch({error, {_, 500, _}}, get_configs_with_json("bad_node")).
+    ?assertMatch({error, {_, 500, _}}, get_configs_with_json("bad_node")),
+
+    ?assertEqual({ok, #{<<"log">> => 1}}, get_configs_with_binary("log", Node)),
+    ?assertEqual({ok, #{<<"log">> => 2}}, get_configs_with_binary("log", "other_node")).
 
 %% v2 version binary
 t_configs_key(_Config) ->
@@ -386,12 +393,16 @@ get_configs_with_json(Node, Opts) ->
     end.
 
 get_configs_with_binary(Key) ->
+    get_configs_with_binary(Key, atom_to_list(node())).
+
+get_configs_with_binary(Key, Node) ->
+    Path0 = "configs?node=" ++ Node,
     Path =
         case Key of
-            undefined -> ["configs"];
-            _ -> ["configs?key=" ++ Key]
+            undefined -> Path0;
+            _ -> Path0 ++ "&key=" ++ Key
         end,
-    URI = emqx_mgmt_api_test_util:api_path(Path),
+    URI = emqx_mgmt_api_test_util:api_path([Path]),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
     Headers = [{"accept", "text/plain"}, Auth],
     case emqx_mgmt_api_test_util:request_api(get, URI, [], Headers, [], #{return_all => true}) of

+ 4 - 3
apps/emqx_modules/src/emqx_modules_conf.erl

@@ -168,10 +168,11 @@ post_config_update(_, _UpdateReq, NewConfig, OldConfig, _AppEnvs) ->
     } = emqx_utils:diff_lists(NewConfig, OldConfig, fun(#{topic := T}) -> T end),
     Deregistered = [emqx_topic_metrics:deregister(T) || #{topic := T} <- Removed],
     Registered = [emqx_topic_metrics:register(T) || #{topic := T} <- Added],
-    Errs = [Res || Res <- Registered ++ Deregistered, Res =/= ok],
-    case Errs of
+    DeregisteredErrs = [Res || Res <- Deregistered, Res =/= ok, Res =/= {error, topic_not_found}],
+    RegisteredErrs = [Res || Res <- Registered, Res =/= ok, Res =/= {error, already_existed}],
+    case DeregisteredErrs ++ RegisteredErrs of
         [] -> ok;
-        _ -> {error, Errs}
+        Errs -> {error, Errs}
     end.
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl

@@ -506,7 +506,7 @@ fields(local_status_enabled) ->
             )},
         {"process",
             mk(
-                hoconsc:union([rebalance, evacuation]),
+                hoconsc:enum([rebalance, evacuation]),
                 #{
                     desc => ?DESC(local_status_process),
                     required => true

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [
         kernel,

+ 18 - 2
apps/emqx_oracle/src/emqx_oracle.erl

@@ -9,6 +9,10 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-define(UNHEALTHY_TARGET_MSG,
+    "Oracle table is invalid. Please check if the table exists in Oracle Database."
+).
+
 %%====================================================================
 %% Exports
 %%====================================================================
@@ -239,7 +243,7 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
                     {connected, NState};
                 {error, {undefined_table, NState}} ->
                     %% return new state indicating that we are connected but the target table is not created
-                    {disconnected, NState, unhealthy_target};
+                    {disconnected, NState, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}};
                 {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
                     connecting
@@ -408,7 +412,19 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) whe
             Error
     end.
 
-check_if_table_exists(Conn, SQL, Tokens) ->
+check_if_table_exists(Conn, SQL, Tokens0) ->
+    % Discard nested tokens for checking if table exist. As payload here is defined as
+    % a single string, it would fail if Token is, for instance, ${payload.msg}, causing
+    % bridge probe to fail.
+    Tokens = lists:map(
+        fun
+            ({var, [Token | _DiscardedDeepTokens]}) ->
+                {var, [Token]};
+            (Token) ->
+                Token
+        end,
+        Tokens0
+    ),
     {Event, _Headers} = emqx_rule_events:eventmsg_publish(
         emqx_message:make(<<"t/opic">>, "test query")
     ),

+ 9 - 8
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -364,11 +364,7 @@ cluster(Config) ->
             {load_schema, true},
             {start_autocluster, true},
             {schema_mod, emqx_enterprise_schema},
-            %% need to restart schema registry app in the tests so
-            %% that it re-registers the config handler that is lost
-            %% when emqx_conf restarts during join.
-            {env, [{emqx_machine, applications, [emqx_schema_registry]}]},
-            {load_apps, [emqx_machine | ?APPS]},
+            {load_apps, [emqx_machine]},
             {env_handler, fun
                 (emqx) ->
                     application:set_env(emqx, boot_modules, [broker, router]),
@@ -388,6 +384,7 @@ start_cluster(Cluster) ->
         emqx_common_test_helpers:start_slave(Name, Opts)
      || {Name, Opts} <- Cluster
     ],
+    NumNodes = length(Nodes),
     on_exit(fun() ->
         emqx_utils:pmap(
             fun(N) ->
@@ -397,7 +394,11 @@ start_cluster(Cluster) ->
             Nodes
         )
     end),
-    erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]),
+    {ok, _} = snabbkaffe:block_until(
+        %% -1 because only those that join the first node will emit the event.
+        ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
+        30_000
+    ),
     Nodes.
 
 wait_for_cluster_rpc(Node) ->
@@ -658,7 +659,7 @@ t_cluster_serde_build(Config) ->
             Nodes = [N1, N2 | _] = start_cluster(Cluster),
             NumNodes = length(Nodes),
             wait_for_cluster_rpc(N2),
-            ?assertEqual(
+            ?assertMatch(
                 ok,
                 erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])
             ),
@@ -687,7 +688,7 @@ t_cluster_serde_build(Config) ->
             {ok, SRef1} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := schema_registry_serdes_deleted}),
                 NumNodes,
-                5_000
+                10_000
             ),
             ?assertEqual(
                 ok,

+ 1 - 0
changes/ce/fix-11294.en.md

@@ -0,0 +1 @@
+Fix `emqx_ctl cluster join`, `leave`, and `status` commands.

+ 2 - 0
changes/ce/fix-11309.en.md

@@ -0,0 +1,2 @@
+Improve startup order of EMQX applications.
+Simplify build scripts and improve code reuse.

+ 4 - 0
changes/ce/fix-11322.en.md

@@ -0,0 +1,4 @@
+Import additional configurations from EMQX backup file (`emqx ctl import` command):
+ - rule_engine (previously not imported due to the bug)
+ - topic_metrics (previously not implemented)
+ - slow_subs (previously not implemented).

+ 1 - 0
changes/ee/fix-11307.en.md

@@ -0,0 +1 @@
+Fixed check for table existence to return a more friendly message in the Oracle bridge.

+ 72 - 0
changes/v5.1.2.en.md

@@ -0,0 +1,72 @@
+# v5.1.2
+
+## Enhancements
+
+- [#11124](https://github.com/emqx/emqx/pull/11124) Release packages for Amazon Linux 2023
+
+- [#11226](https://github.com/emqx/emqx/pull/11226) Unify the listener switch to `enable`, while being compatible with the previous `enabled`.
+
+- [#11249](https://github.com/emqx/emqx/pull/11249) Support HTTP API for setting alarm watermark of license.
+
+- [#11251](https://github.com/emqx/emqx/pull/11251) Add `/cluster/topology` HTTP API endpoint
+
+  `GET` request to the endpoint returns the cluster topology: connections between RLOG core and replicant nodes.
+
+- [#11253](https://github.com/emqx/emqx/pull/11253) The Webhook/HTTP bridge has been refactored to its own Erlang application. This allows for more flexibility in the future, and also allows for the bridge to be run as a standalone application.
+
+- [#11289](https://github.com/emqx/emqx/pull/11289) Release packages for Debian 12.
+
+- [#11290](https://github.com/emqx/emqx/pull/11290) Updated `jq` dependency to version 0.3.10 which includes `oniguruma` library update to version 6.9.8 with few minor security fixes.
+
+- [#11291](https://github.com/emqx/emqx/pull/11291) Updated RocksDB version to 1.8.0-emqx-1 via ekka update to 0.15.6.
+
+- [#11236](https://github.com/emqx/emqx/pull/11236) Improve the speed of clients querying in HTTP API `/clients` endpoint with default parameters
+
+## Bug Fixes
+
+- [#11065](https://github.com/emqx/emqx/pull/11065) Avoid logging irrelevant error messages during EMQX shutdown.
+
+- [#11077](https://github.com/emqx/emqx/pull/11077) Fixes crash when updating binding with a non-integer port.
+
+- [#11184](https://github.com/emqx/emqx/pull/11184) Config value for `max_packet_size` has a max value of 256MB defined by protocol. This is now enforced and any configuration with a value greater than that will break.
+
+- [#11192](https://github.com/emqx/emqx/pull/11192) Fix produces valid HOCON file when atom type is used.
+  Remove unnecessary `"` from HOCON file.
+
+- [#11195](https://github.com/emqx/emqx/pull/11195) Avoid to create duplicated subscription by HTTP API or client in Stomp gateway
+
+- [#11206](https://github.com/emqx/emqx/pull/11206) Make the username and password params of CoAP client to optional in connection mode.
+
+- [#11208](https://github.com/emqx/emqx/pull/11208) Fix the issue of abnormal data statistics for LwM2M client.
+
+- [#11211](https://github.com/emqx/emqx/pull/11211) Consistently return `404` for `DELETE` operations on non-existent resources.
+
+- [#11214](https://github.com/emqx/emqx/pull/11214) Fix a bug where node configuration may fail to synchronize correctly when joining the cluster.
+
+- [#11229](https://github.com/emqx/emqx/pull/11229) Fixed an issue preventing plugins from starting/stopping after changing configuration via `emqx ctl conf load`.
+
+- [#11237](https://github.com/emqx/emqx/pull/11237) The `headers` default value in /prometheus API should be a map instead of a list.
+
+- [#11250](https://github.com/emqx/emqx/pull/11250) Fix while a WebSocket packet contains more than one MQTT packet, the order of MQTT packets will be reversed.
+
+
+- [#11271](https://github.com/emqx/emqx/pull/11271) Ensure that the range of percentage type is from 0% to 100%.
+
+- [#11272](https://github.com/emqx/emqx/pull/11272) Fix a typo in the log, when EMQX received an abnormal `PUBREL` packet, the `pubrel` was mistakenly typo as `pubrec`.
+
+- [#11281](https://github.com/emqx/emqx/pull/11281) Restored support for the special `$queue/` shared subscription.
+
+- [#11294](https://github.com/emqx/emqx/pull/11294) Fix `emqx_ctl cluster join`, `leave`, and `status` commands.
+
+- [#11296](https://github.com/emqx/emqx/pull/11296) Import additional configurations from EMQX backup file (`emqx ctl import` command):
+  - rule_engine (previously not imported due to the bug)
+  - topic_metrics (previously not implemented)
+  - slow_subs (previously not implemented).
+
+- [#11309](https://github.com/emqx/emqx/pull/11309) Improve startup order of EMQX applications.
+  Simplify build scripts and improve code reuse.
+
+- [#11322](https://github.com/emqx/emqx/pull/11322) Import additional configurations from EMQX backup file (`emqx ctl import` command):
+  - rule_engine (previously not imported due to the bug)
+  - topic_metrics (previously not implemented)
+  - slow_subs (previously not implemented).

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.1.1
+version: 5.1.2
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.1.1
+appVersion: 5.1.2

+ 53 - 108
mix.exs

@@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
       {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true},
-      {:ekka, github: "emqx/ekka", tag: "0.15.6", override: true},
+      {:ekka, github: "emqx/ekka", tag: "0.15.7", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true},
@@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
-      {:hocon, github: "emqx/hocon", tag: "0.39.13", override: true},
+      {:hocon, github: "emqx/hocon", tag: "0.39.14", override: true},
       {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
@@ -298,6 +298,7 @@ defmodule EMQXUmbrella.MixProject do
         [
           applications: applications(edition_type),
           skip_mode_validation_for: [
+            :emqx_mix,
             :emqx_gateway,
             :emqx_gateway_stomp,
             :emqx_gateway_mqttsn,
@@ -317,7 +318,10 @@ defmodule EMQXUmbrella.MixProject do
             :emqx_auto_subscribe,
             :emqx_slow_subs,
             :emqx_plugins,
-            :emqx_ft
+            :emqx_ft,
+            :emqx_s3,
+            :emqx_durable_storage,
+            :rabbit_common
           ],
           steps: steps,
           strip_beams: false
@@ -327,113 +331,54 @@ defmodule EMQXUmbrella.MixProject do
   end
 
   def applications(edition_type) do
-    [
-      crypto: :permanent,
-      public_key: :permanent,
-      asn1: :permanent,
-      syntax_tools: :permanent,
-      ssl: :permanent,
-      os_mon: :permanent,
-      inets: :permanent,
-      compiler: :permanent,
-      runtime_tools: :permanent,
-      redbug: :permanent,
-      xmerl: :permanent,
-      hocon: :load,
-      telemetry: :permanent,
-      emqx: :load,
-      emqx_conf: :load,
-      emqx_machine: :permanent
-    ] ++
-      if(enable_rocksdb?(),
-        do: [mnesia_rocksdb: :load],
-        else: []
-      ) ++
-      [
-        mnesia: :load,
-        ekka: :load,
-        esasl: :load,
-        observer_cli: :permanent,
-        tools: :permanent,
-        covertool: :load,
-        system_monitor: :load,
-        emqx_utils: :load,
-        emqx_http_lib: :permanent,
-        emqx_resource: :permanent,
-        emqx_connector: :permanent,
-        emqx_authn: :permanent,
-        emqx_authz: :permanent,
-        emqx_auto_subscribe: :permanent,
-        emqx_gateway: :permanent,
-        emqx_gateway_stomp: :permanent,
-        emqx_gateway_mqttsn: :permanent,
-        emqx_gateway_coap: :permanent,
-        emqx_gateway_lwm2m: :permanent,
-        emqx_gateway_exproto: :permanent,
-        emqx_exhook: :permanent,
-        emqx_bridge: :permanent,
-        emqx_bridge_mqtt: :permanent,
-        emqx_bridge_http: :permanent,
-        emqx_rule_engine: :permanent,
-        emqx_modules: :permanent,
-        emqx_management: :permanent,
-        emqx_dashboard: :permanent,
-        emqx_retainer: :permanent,
-        emqx_prometheus: :permanent,
-        emqx_psk: :permanent,
-        emqx_slow_subs: :permanent,
-        emqx_mongodb: :permanent,
-        emqx_redis: :permanent,
-        emqx_mysql: :permanent,
-        emqx_plugins: :permanent,
-        emqx_mix: :none
-      ] ++
-      if(enable_quicer?(), do: [quicer: :permanent], else: []) ++
-      if(enable_bcrypt?(), do: [bcrypt: :permanent], else: []) ++
-      if(enable_jq?(), do: [jq: :load], else: []) ++
-      if(is_app(:observer),
-        do: [observer: :load],
-        else: []
-      ) ++
-      if(edition_type == :enterprise,
-        do: [
-          emqx_license: :permanent,
-          emqx_enterprise: :load,
-          emqx_bridge_kafka: :permanent,
-          emqx_bridge_pulsar: :permanent,
-          emqx_bridge_gcp_pubsub: :permanent,
-          emqx_bridge_cassandra: :permanent,
-          emqx_bridge_opents: :permanent,
-          emqx_bridge_clickhouse: :permanent,
-          emqx_bridge_dynamo: :permanent,
-          emqx_bridge_hstreamdb: :permanent,
-          emqx_bridge_influxdb: :permanent,
-          emqx_bridge_iotdb: :permanent,
-          emqx_bridge_matrix: :permanent,
-          emqx_bridge_mongodb: :permanent,
-          emqx_bridge_mysql: :permanent,
-          emqx_bridge_pgsql: :permanent,
-          emqx_bridge_redis: :permanent,
-          emqx_bridge_rocketmq: :permanent,
-          emqx_bridge_tdengine: :permanent,
-          emqx_bridge_timescale: :permanent,
-          emqx_bridge_sqlserver: :permanent,
-          emqx_oracle: :permanent,
-          emqx_bridge_oracle: :permanent,
-          emqx_bridge_rabbitmq: :permanent,
-          emqx_schema_registry: :permanent,
-          emqx_eviction_agent: :permanent,
-          emqx_node_rebalance: :permanent,
-          emqx_ft: :permanent,
-          emqx_bridge_kinesis: :permanent
-        ],
-        else: [
-          emqx_telemetry: :permanent
-        ]
-      )
+    {:ok,
+     [
+       %{
+         db_apps: db_apps,
+         system_apps: system_apps,
+         common_business_apps: common_business_apps,
+         ee_business_apps: ee_business_apps,
+         ce_business_apps: ce_business_apps
+       }
+     ]} = :file.consult("apps/emqx_machine/priv/reboot_lists.eterm")
+
+    edition_specific_apps =
+      if edition_type == :enterprise do
+        ee_business_apps
+      else
+        ce_business_apps
+      end
+
+    business_apps = common_business_apps ++ edition_specific_apps
+
+    excluded_apps = excluded_apps()
+
+    system_apps =
+      Enum.map(system_apps, fn app ->
+        if is_atom(app), do: {app, :permanent}, else: app
+      end)
+
+    db_apps = Enum.map(db_apps, &{&1, :load})
+    business_apps = Enum.map(business_apps, &{&1, :load})
+
+    [system_apps, db_apps, [emqx_machine: :permanent], business_apps]
+    |> List.flatten()
+    |> Keyword.reject(fn {app, _type} -> app in excluded_apps end)
+  end
+
+  defp excluded_apps() do
+    %{
+      mnesia_rocksdb: enable_rocksdb?(),
+      quicer: enable_quicer?(),
+      bcrypt: enable_bcrypt?(),
+      jq: enable_jq?(),
+      observer: is_app?(:observer)
+    }
+    |> Enum.reject(&elem(&1, 1))
+    |> Enum.map(&elem(&1, 0))
   end
 
-  defp is_app(name) do
+  defp is_app?(name) do
     case Application.load(name) do
       :ok ->
         true

+ 2 - 2
rebar.config

@@ -62,7 +62,7 @@
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
     , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.6"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.7"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}}
@@ -75,7 +75,7 @@
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.13"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

+ 31 - 103
rebar.config.erl

@@ -387,74 +387,37 @@ overlay_vars_pkg(pkg) ->
     ].
 
 relx_apps(ReleaseType, Edition) ->
-    [
-        kernel,
-        sasl,
-        crypto,
-        public_key,
-        asn1,
-        syntax_tools,
-        ssl,
-        os_mon,
-        inets,
-        compiler,
-        runtime_tools,
-        redbug,
-        xmerl,
-        {hocon, load},
-        telemetry,
-        % started by emqx_machine
-        {emqx, load},
-        {emqx_conf, load},
-        emqx_machine
-    ] ++
-        [{mnesia_rocksdb, load} || is_rocksdb_supported()] ++
-        [
-            {mnesia, load},
-            {ekka, load},
-            {esasl, load},
-            observer_cli,
-            tools,
-            {covertool, load},
-            % started by emqx_machine
-            {system_monitor, load},
-            {emqx_utils, load},
-            emqx_http_lib,
-            emqx_resource,
-            emqx_connector,
-            emqx_authn,
-            emqx_authz,
-            emqx_auto_subscribe,
-            emqx_gateway,
-            emqx_gateway_stomp,
-            emqx_gateway_mqttsn,
-            emqx_gateway_coap,
-            emqx_gateway_lwm2m,
-            emqx_gateway_exproto,
-            emqx_exhook,
-            emqx_bridge,
-            emqx_bridge_mqtt,
-            emqx_bridge_http,
-            emqx_rule_engine,
-            emqx_modules,
-            emqx_management,
-            emqx_dashboard,
-            emqx_retainer,
-            emqx_prometheus,
-            emqx_psk,
-            emqx_slow_subs,
-            emqx_mongodb,
-            emqx_redis,
-            emqx_mysql,
-            emqx_plugins
-        ] ++
-        [quicer || is_quicer_supported()] ++
-        [bcrypt || provide_bcrypt_release(ReleaseType)] ++
-        %% Started automatically when needed (only needs to be started when the
-        %% port implementation is used)
-        [{jq, load} || is_jq_supported()] ++
-        [{observer, load} || is_app(observer)] ++
-        relx_apps_per_edition(Edition).
+    {ok, [
+        #{
+            db_apps := DBApps,
+            system_apps := SystemApps,
+            common_business_apps := CommonBusinessApps,
+            ee_business_apps := EEBusinessApps,
+            ce_business_apps := CEBusinessApps
+        }
+    ]} = file:consult("apps/emqx_machine/priv/reboot_lists.eterm"),
+    EditionSpecificApps =
+        case Edition of
+            ee -> EEBusinessApps;
+            ce -> CEBusinessApps
+        end,
+    BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
+    ExcludedApps = excluded_apps(ReleaseType),
+    SystemApps ++
+        %% EMQX starts the DB and the business applications:
+        [{App, load} || App <- (DBApps -- ExcludedApps)] ++
+        [emqx_machine] ++
+        [{App, load} || App <- (BusinessApps -- ExcludedApps)].
+
+excluded_apps(ReleaseType) ->
+    OptionalApps = [
+        {quicer, is_quicer_supported()},
+        {bcrypt, provide_bcrypt_release(ReleaseType)},
+        {jq, is_jq_supported()},
+        {observer, is_app(observer)},
+        {mnesia_rocksdb, is_rocksdb_supported()}
+    ],
+    [App || {App, false} <- OptionalApps].
 
 is_app(Name) ->
     case application:load(Name) of
@@ -463,41 +426,6 @@ is_app(Name) ->
         _ -> false
     end.
 
-relx_apps_per_edition(ee) ->
-    [
-        emqx_license,
-        {emqx_enterprise, load},
-        emqx_bridge_kafka,
-        emqx_bridge_pulsar,
-        emqx_bridge_gcp_pubsub,
-        emqx_bridge_cassandra,
-        emqx_bridge_opents,
-        emqx_bridge_clickhouse,
-        emqx_bridge_dynamo,
-        emqx_bridge_hstreamdb,
-        emqx_bridge_influxdb,
-        emqx_bridge_iotdb,
-        emqx_bridge_matrix,
-        emqx_bridge_mongodb,
-        emqx_bridge_mysql,
-        emqx_bridge_pgsql,
-        emqx_bridge_redis,
-        emqx_bridge_rocketmq,
-        emqx_bridge_tdengine,
-        emqx_bridge_timescale,
-        emqx_bridge_sqlserver,
-        emqx_oracle,
-        emqx_bridge_oracle,
-        emqx_bridge_rabbitmq,
-        emqx_schema_registry,
-        emqx_eviction_agent,
-        emqx_node_rebalance,
-        emqx_ft,
-        emqx_bridge_kinesis
-    ];
-relx_apps_per_edition(ce) ->
-    [emqx_telemetry].
-
 relx_overlay(ReleaseType, Edition) ->
     [
         {mkdir, "log/"},

+ 4 - 4
rel/i18n/emqx_mgmt_api_configs.hocon

@@ -23,14 +23,14 @@ rest_conf_query.label:
 """Reset the config entry with query"""
 
 get_global_zone_configs.desc:
-"""Get the global zone configs"""
+"""Get the MQTT-related configuration"""
 get_global_zone_configs.label:
-"""Get the global zone configs"""
+"""Get the MQTT-related configuration"""
 
 update_global_zone_configs.desc:
-"""Update global zone configs"""
+"""Update MQTT-related configuration"""
 update_global_zone_configs.label:
-"""Update global zone configs"""
+"""Update MQTT-related configuration"""
 
 get_node_level_limiter_configs.desc:
 """Get the node-level limiter configs"""

+ 5 - 6
rel/i18n/emqx_mgmt_api_key_schema.hocon

@@ -7,12 +7,11 @@ api_key.label:
 """API Key"""
 
 bootstrap_file.desc:
-"""Bootstrap file is used to add an api_key when emqx is launched,
-      the format is:
-       ```
-       7e729ae70d23144b:2QILI9AcQ9BYlVqLDHQNWN2saIjBV4egr1CZneTNKr9CpK
-       ec3907f865805db0:Ee3taYltUKtoBVD9C3XjQl9C6NXheip8Z9B69BpUv5JxVHL
-       ```"""
+"""The bootstrap file provides API keys for EMQX.
+EMQX will load these keys on startup to authorize API requests.
+It contains key-value pairs in the format:`api_key:api_secret`.
+Each line specifies an API key and its associated secret.
+"""
 
 bootstrap_file.label:
 """Initialize api_key file."""