Explorar o código

Merge remote-tracking branch 'origin/master' into release-51

Zaiming (Stone) Shi %!s(int64=2) %!d(string=hai) anos
pai
achega
28c564d15b
Modificáronse 95 ficheiros con 1245 adicións e 428 borrados
  1. 1 2
      .ci/docker-compose-file/docker-compose-python.yaml
  2. 1 0
      .ci/docker-compose-file/docker-compose-rocketmq.yaml
  3. 4 4
      .ci/docker-compose-file/python/pytest.sh
  4. 2 0
      .ci/docker-compose-file/rocketmq/conf/broker.conf
  5. 11 0
      .ci/docker-compose-file/rocketmq/conf/plain_acl.yml
  6. 12 2
      .github/workflows/build_slim_packages.yaml
  7. 3 3
      .github/workflows/run_fvt_tests.yaml
  8. 1 0
      apps/emqx/include/emqx_authentication.hrl
  9. 1 0
      apps/emqx/priv/bpapi.versions
  10. 22 6
      apps/emqx/src/emqx.erl
  11. 35 15
      apps/emqx/src/emqx_authentication.erl
  12. 81 15
      apps/emqx/src/emqx_authentication_config.erl
  13. 3 1
      apps/emqx/src/emqx_cm.erl
  14. 9 6
      apps/emqx/src/emqx_config.erl
  15. 11 27
      apps/emqx/src/emqx_config_handler.erl
  16. 2 2
      apps/emqx/src/emqx_maybe.erl
  17. 15 3
      apps/emqx/test/emqx_authentication_SUITE.erl
  18. 2 2
      apps/emqx/test/emqx_common_test_helpers.erl
  19. 2 52
      apps/emqx/test/emqx_config_handler_SUITE.erl
  20. 1 1
      apps/emqx/test/emqx_crl_cache_SUITE.erl
  21. 0 2
      apps/emqx_authn/include/emqx_authn.hrl
  22. 6 5
      apps/emqx_authn/src/emqx_authn_api.erl
  23. 121 0
      apps/emqx_authn/test/emqx_authn_SUITE.erl
  24. 1 0
      apps/emqx_authz/include/emqx_authz.hrl
  25. 1 1
      apps/emqx_authz/src/emqx_authz.app.src
  26. 76 39
      apps/emqx_authz/src/emqx_authz.erl
  27. 71 0
      apps/emqx_authz/test/emqx_authz_SUITE.erl
  28. 10 5
      apps/emqx_authz/test/emqx_authz_test_lib.erl
  29. 6 1
      apps/emqx_bridge/src/emqx_bridge.erl
  30. 25 3
      apps/emqx_bridge/src/emqx_bridge_app.erl
  31. 1 1
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src
  32. 2 0
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl
  33. 1 1
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src
  34. 6 1
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl
  35. 1 1
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src
  36. 6 1
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl
  37. 0 8
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl
  38. 1 1
      apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src
  39. 6 1
      apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl
  40. 1 1
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src
  41. 11 1
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl
  42. 12 0
      apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
  43. 11 4
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
  44. 41 3
      apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl
  45. 1 1
      apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src
  46. 7 1
      apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl
  47. 6 1
      apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl
  48. 55 4
      apps/emqx_conf/src/emqx_conf_cli.erl
  49. 20 3
      apps/emqx_conf/test/emqx_global_gc_SUITE.erl
  50. 9 4
      apps/emqx_connector/src/emqx_connector_ldap.erl
  51. 8 1
      apps/emqx_connector/src/emqx_connector_mongo.erl
  52. 6 1
      apps/emqx_connector/src/emqx_connector_pgsql.erl
  53. 2 0
      apps/emqx_connector/src/emqx_connector_redis.erl
  54. 1 1
      apps/emqx_ctl/src/emqx_ctl.app.src
  55. 30 17
      apps/emqx_ctl/src/emqx_ctl.erl
  56. 2 2
      apps/emqx_ctl/test/emqx_ctl_SUITE.erl
  57. 2 3
      apps/emqx_dashboard/src/emqx_dashboard_schema.erl
  58. 3 2
      apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl
  59. 17 11
      apps/emqx_ft/src/emqx_ft.erl
  60. 8 6
      apps/emqx_ft/src/emqx_ft_assembler.erl
  61. 3 3
      apps/emqx_ft/src/emqx_ft_assembler_sup.erl
  62. 5 5
      apps/emqx_ft/src/emqx_ft_storage.erl
  63. 15 8
      apps/emqx_ft/src/emqx_ft_storage_exporter.erl
  64. 6 6
      apps/emqx_ft/src/emqx_ft_storage_fs.erl
  65. 15 2
      apps/emqx_ft/test/emqx_ft_SUITE.erl
  66. 1 1
      apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
  67. 6 2
      apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl
  68. 1 1
      apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
  69. 13 6
      apps/emqx_management/src/emqx_mgmt_api_plugins.erl
  70. 2 0
      apps/emqx_management/src/emqx_mgmt_app.erl
  71. 14 7
      apps/emqx_management/src/emqx_mgmt_auth.erl
  72. 5 0
      apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl
  73. 52 0
      apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl
  74. 16 23
      apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl
  75. 2 11
      apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl
  76. 1 1
      apps/emqx_oracle/src/emqx_oracle.app.src
  77. 6 1
      apps/emqx_oracle/src/emqx_oracle.erl
  78. 82 1
      apps/emqx_plugins/test/emqx_plugins_SUITE.erl
  79. 3 0
      apps/emqx_rule_engine/include/rule_engine.hrl
  80. 13 11
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  81. 5 10
      apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
  82. 6 2
      apps/emqx_rule_engine/src/emqx_rule_engine_app.erl
  83. 10 12
      apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
  84. 4 0
      changes/ce/fix-10923.en.md
  85. 1 0
      changes/ee/feat-10892.en.md
  86. 1 0
      changes/ee/fix-10913.en.md
  87. 1 0
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
  88. 1 1
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src
  89. 18 24
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl
  90. 2 2
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl
  91. 13 9
      lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl
  92. 5 0
      scripts/spellcheck/spellcheck.sh
  93. 15 0
      scripts/ui-tests/conftest.py
  94. 73 0
      scripts/ui-tests/dashboard_test.py
  95. 16 0
      scripts/ui-tests/docker-compose.yaml

+ 1 - 2
.ci/docker-compose-file/docker-compose-python.yaml

@@ -3,7 +3,7 @@ version: '3.9'
 services:
   python:
     container_name: python
-    image: python:3.7.2-alpine3.9
+    image: python:3.9.16-alpine3.18
     depends_on:
       - emqx1
       - emqx2
@@ -12,4 +12,3 @@ services:
         emqx_bridge:
     volumes:
       - ./python:/scripts
-

+ 1 - 0
.ci/docker-compose-file/docker-compose-rocketmq.yaml

@@ -23,6 +23,7 @@ services:
       - ./rocketmq/logs:/opt/logs
       - ./rocketmq/store:/opt/store
       - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
+      - ./rocketmq/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
     environment:
         NAMESRV_ADDR: "rocketmq_namesrv:9876"
         JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"

+ 4 - 4
.ci/docker-compose-file/python/pytest.sh

@@ -18,13 +18,13 @@ else
 fi
 
 apk update && apk add git curl
-git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git /paho.mqtt.testing
-pip install pytest==6.2.5
+git clone -b develop-5.0 https://github.com/emqx/paho.mqtt.testing.git /paho.mqtt.testing
+pip install pytest==7.1.2 pytest-retry
 
-pytest -v /paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "$TARGET_HOST"
+pytest --retries 3 -v /paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "$TARGET_HOST"
 RESULT=$?
 
-pytest -v /paho.mqtt.testing/interoperability/test_client --host "$TARGET_HOST"
+pytest --retries 3 -v /paho.mqtt.testing/interoperability/test_client --host "$TARGET_HOST"
 RESULT=$(( RESULT + $? ))
 
 # pytest -v /paho.mqtt.testing/interoperability/test_cluster --host1 "node1.emqx.io" --host2 "node2.emqx.io"

+ 2 - 0
.ci/docker-compose-file/rocketmq/conf/broker.conf

@@ -20,3 +20,5 @@ maxMessageSize=65536
 brokerRole=ASYNC_MASTER
 
 flushDiskType=ASYNC_FLUSH
+
+aclEnable=true

+ 11 - 0
.ci/docker-compose-file/rocketmq/conf/plain_acl.yml

@@ -0,0 +1,11 @@
+globalWhiteRemoteAddresses:
+
+accounts:
+  - accessKey: RocketMQ
+    secretKey: 12345678
+    whiteRemoteAddress:
+    admin: false
+    defaultTopicPerm: DENY
+    defaultGroupPerm: PUB|SUB
+    topicPerms:
+      - TopicTest=PUB|SUB

+ 12 - 2
.github/workflows/build_slim_packages.yaml

@@ -165,7 +165,7 @@ jobs:
         path: _packages/**/*
 
   docker:
-    runs-on: ubuntu-22.04
+    runs-on: aws-amd64
 
     strategy:
       fail-fast: false
@@ -196,12 +196,17 @@ jobs:
         tags: ${{ env.EMQX_IMAGE_TAG }}
         build-args: |
           EMQX_NAME=${{ env.EMQX_NAME }}
-    - name: test docker image
+    - name: smoke test
       run: |
         CID=$(docker run -d --rm -P $EMQX_IMAGE_TAG)
         HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID)
         ./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT
         docker stop $CID
+    - name: dashboard tests
+      working-directory: ./scripts/ui-tests
+      run: |
+        set -eu
+        docker compose up --abort-on-container-exit --exit-code-from selenium
     - name: test two nodes cluster with proto_dist=inet_tls in docker
       run: |
         ./scripts/test/start-two-nodes-in-docker.sh -P $EMQX_IMAGE_TAG $EMQX_IMAGE_OLD_VERSION_TAG
@@ -216,6 +221,11 @@ jobs:
       with:
         name: "${{ matrix.profile[0] }}-docker"
         path: "${{ env.EMQX_NAME }}-${{ env.PKG_VSN }}.tar.gz"
+    - name: cleanup
+      if: always()
+      working-directory: ./scripts/ui-tests
+      run: |
+        docker compose rm -fs
 
   spellcheck:
     needs: linux

+ 3 - 3
.github/workflows/run_fvt_tests.yaml

@@ -228,11 +228,11 @@ jobs:
     - uses: actions/checkout@v3
       with:
         repository: emqx/paho.mqtt.testing
-        ref: develop-4.0
+        ref: develop-5.0
         path: paho.mqtt.testing
     - name: install pytest
       run: |
-        pip install pytest
+        pip install pytest==7.1.2 pytest-retry
         echo "$HOME/.local/bin" >> $GITHUB_PATH
     - name: run paho test
       timeout-minutes: 10
@@ -250,6 +250,6 @@ jobs:
           sleep 10
         done
 
-        pytest -v paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "127.0.0.1"
+        pytest --retries 3 -v paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "127.0.0.1"
     - if: failure()
       run: kubectl logs -l "app.kubernetes.io/instance=${{ matrix.profile }}" -c emqx --tail=1000

+ 1 - 0
apps/emqx/include/emqx_authentication.hrl

@@ -20,6 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 
 -define(AUTHN_TRACE_TAG, "AUTHN").
+-define(GLOBAL, 'mqtt:global').
 
 -define(TRACE_AUTHN_PROVIDER(Msg), ?TRACE_AUTHN_PROVIDER(Msg, #{})).
 -define(TRACE_AUTHN_PROVIDER(Msg, Meta), ?TRACE_AUTHN_PROVIDER(debug, Msg, Meta)).

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -29,6 +29,7 @@
 {emqx_management,3}.
 {emqx_management,4}.
 {emqx_mgmt_api_plugins,1}.
+{emqx_mgmt_api_plugins,2}.
 {emqx_mgmt_cluster,1}.
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,2}.

+ 22 - 6
apps/emqx/src/emqx.erl

@@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) ->
 
 -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
-reset_config([RootName | _] = KeyPath, Opts) ->
+reset_config([RootName | SubKeys] = KeyPath, Opts) ->
     case emqx_config:get_default_value(KeyPath) of
         {ok, Default} ->
-            emqx_config_handler:update_config(
-                emqx_config:get_schema_mod(RootName),
-                KeyPath,
-                {{update, Default}, Opts}
-            );
+            Mod = emqx_config:get_schema_mod(RootName),
+            case SubKeys =:= [] of
+                true ->
+                    emqx_config_handler:update_config(
+                        Mod,
+                        KeyPath,
+                        {{update, Default}, Opts}
+                    );
+                false ->
+                    NewConf =
+                        emqx_utils_maps:deep_put(
+                            SubKeys,
+                            emqx_config:get_raw([RootName], #{}),
+                            Default
+                        ),
+                    emqx_config_handler:update_config(
+                        Mod,
+                        [RootName],
+                        {{update, NewConf}, Opts}
+                    )
+            end;
         {error, _} = Error ->
             Error
     end.

+ 35 - 15
apps/emqx/src/emqx_authentication.erl

@@ -60,7 +60,8 @@
     update_authenticator/3,
     lookup_authenticator/2,
     list_authenticators/1,
-    move_authenticator/3
+    move_authenticator/3,
+    reorder_authenticator/2
 ]).
 
 %% APIs for observer built_in_database
@@ -86,12 +87,6 @@
 %% utility functions
 -export([authenticator_id/1, metrics_id/2]).
 
-%% proxy callback
--export([
-    pre_config_update/3,
-    post_config_update/5
-]).
-
 -export_type([
     authenticator_id/0,
     position/0,
@@ -275,12 +270,6 @@ get_enabled(Authenticators) ->
 %% APIs
 %%------------------------------------------------------------------------------
 
-pre_config_update(Path, UpdateReq, OldConfig) ->
-    emqx_authentication_config:pre_config_update(Path, UpdateReq, OldConfig).
-
-post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
-    emqx_authentication_config:post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs).
-
 %% @doc Get all registered authentication providers.
 get_providers() ->
     call(get_providers).
@@ -413,6 +402,12 @@ list_authenticators(ChainName) ->
 move_authenticator(ChainName, AuthenticatorID, Position) ->
     call({move_authenticator, ChainName, AuthenticatorID, Position}).
 
+-spec reorder_authenticator(chain_name(), [authenticator_id()]) -> ok.
+reorder_authenticator(_ChainName, []) ->
+    ok;
+reorder_authenticator(ChainName, AuthenticatorIDs) ->
+    call({reorder_authenticator, ChainName, AuthenticatorIDs}).
+
 -spec import_users(chain_name(), authenticator_id(), {binary(), binary()}) ->
     ok | {error, term()}.
 import_users(ChainName, AuthenticatorID, Filename) ->
@@ -447,8 +442,9 @@ list_users(ChainName, AuthenticatorID, FuzzyParams) ->
 
 init(_Opts) ->
     process_flag(trap_exit, true),
-    ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
-    ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], ?MODULE),
+    Module = emqx_authentication_config,
+    ok = emqx_config_handler:add_handler([?CONF_ROOT], Module),
+    ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module),
     {ok, #{hooked => false, providers => #{}}}.
 
 handle_call(get_providers, _From, #{providers := Providers} = State) ->
@@ -504,6 +500,12 @@ handle_call({move_authenticator, ChainName, AuthenticatorID, Position}, _From, S
     end,
     Reply = with_chain(ChainName, UpdateFun),
     reply(Reply, State);
+handle_call({reorder_authenticator, ChainName, AuthenticatorIDs}, _From, State) ->
+    UpdateFun = fun(Chain) ->
+        handle_reorder_authenticator(Chain, AuthenticatorIDs)
+    end,
+    Reply = with_chain(ChainName, UpdateFun),
+    reply(Reply, State);
 handle_call({import_users, ChainName, AuthenticatorID, Filename}, _From, State) ->
     Reply = call_authenticator(ChainName, AuthenticatorID, import_users, [Filename]),
     reply(Reply, State);
@@ -609,6 +611,24 @@ handle_move_authenticator(Chain, AuthenticatorID, Position) ->
             {error, Reason}
     end.
 
+handle_reorder_authenticator(Chain, AuthenticatorIDs) ->
+    #chain{authenticators = Authenticators} = Chain,
+    NAuthenticators =
+        lists:filtermap(
+            fun(ID) ->
+                case lists:keyfind(ID, #authenticator.id, Authenticators) of
+                    false ->
+                        ?SLOG(error, #{msg => "authenticator_not_found", id => ID}),
+                        false;
+                    Authenticator ->
+                        {true, Authenticator}
+                end
+            end,
+            AuthenticatorIDs
+        ),
+    NewChain = Chain#chain{authenticators = NAuthenticators},
+    {ok, ok, NewChain}.
+
 handle_create_authenticator(Chain, Config, Providers) ->
     #chain{name = Name, authenticators = Authenticators} = Chain,
     AuthenticatorID = authenticator_id(Config),

+ 81 - 15
apps/emqx/src/emqx_authentication_config.erl

@@ -65,8 +65,8 @@
 
 -spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
     {ok, map() | list()} | {error, term()}.
-pre_config_update(_, UpdateReq, OldConfig) ->
-    try do_pre_config_update(UpdateReq, to_list(OldConfig)) of
+pre_config_update(Paths, UpdateReq, OldConfig) ->
+    try do_pre_config_update(Paths, UpdateReq, to_list(OldConfig)) of
         {error, Reason} -> {error, Reason};
         {ok, NewConfig} -> {ok, NewConfig}
     catch
@@ -74,9 +74,9 @@ pre_config_update(_, UpdateReq, OldConfig) ->
             {error, Reason}
     end.
 
-do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) ->
+do_pre_config_update(_, {create_authenticator, ChainName, Config}, OldConfig) ->
     NewId = authenticator_id(Config),
-    case lists:filter(fun(OldConfig0) -> authenticator_id(OldConfig0) =:= NewId end, OldConfig) of
+    case filter_authenticator(NewId, OldConfig) of
         [] ->
             CertsDir = certs_dir(ChainName, Config),
             NConfig = convert_certs(CertsDir, Config),
@@ -84,7 +84,7 @@ do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) ->
         [_] ->
             {error, {already_exists, {authenticator, NewId}}}
     end;
-do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) ->
+do_pre_config_update(_, {delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) ->
     NewConfig = lists:filter(
         fun(OldConfig0) ->
             AuthenticatorID =/= authenticator_id(OldConfig0)
@@ -92,7 +92,7 @@ do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldCon
         OldConfig
     ),
     {ok, NewConfig};
-do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) ->
+do_pre_config_update(_, {update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) ->
     CertsDir = certs_dir(ChainName, AuthenticatorID),
     NewConfig = lists:map(
         fun(OldConfig0) ->
@@ -104,7 +104,7 @@ do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config},
         OldConfig
     ),
     {ok, NewConfig};
-do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) ->
+do_pre_config_update(_, {move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) ->
     case split_by_id(AuthenticatorID, OldConfig) of
         {error, Reason} ->
             {error, Reason};
@@ -129,7 +129,18 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
                             {ok, BeforeNFound ++ [FoundRelated, Found | AfterNFound]}
                     end
             end
-    end.
+    end;
+do_pre_config_update(_, OldConfig, OldConfig) ->
+    {ok, OldConfig};
+do_pre_config_update(Paths, NewConfig, _OldConfig) ->
+    ChainName = chain_name(Paths),
+    {ok, [
+        begin
+            CertsDir = certs_dir(ChainName, New),
+            convert_certs(CertsDir, New)
+        end
+     || New <- to_list(NewConfig)
+    ]}.
 
 -spec post_config_update(
     list(atom()),
@@ -139,13 +150,16 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
     emqx_config:app_envs()
 ) ->
     ok | {ok, map()} | {error, term()}.
-post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
-    do_post_config_update(UpdateReq, to_list(NewConfig), OldConfig, AppEnvs).
+post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
+    do_post_config_update(Paths, UpdateReq, to_list(NewConfig), OldConfig, AppEnvs).
 
-do_post_config_update({create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs) ->
+do_post_config_update(
+    _, {create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs
+) ->
     NConfig = get_authenticator_config(authenticator_id(Config), NewConfig),
     emqx_authentication:create_authenticator(ChainName, NConfig);
 do_post_config_update(
+    _,
     {delete_authenticator, ChainName, AuthenticatorID},
     _NewConfig,
     OldConfig,
@@ -160,6 +174,7 @@ do_post_config_update(
             {error, Reason}
     end;
 do_post_config_update(
+    _,
     {update_authenticator, ChainName, AuthenticatorID, Config},
     NewConfig,
     _OldConfig,
@@ -172,12 +187,57 @@ do_post_config_update(
             emqx_authentication:update_authenticator(ChainName, AuthenticatorID, NConfig)
     end;
 do_post_config_update(
+    _,
     {move_authenticator, ChainName, AuthenticatorID, Position},
     _NewConfig,
     _OldConfig,
     _AppEnvs
 ) ->
-    emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position).
+    emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position);
+do_post_config_update(_, _UpdateReq, OldConfig, OldConfig, _AppEnvs) ->
+    ok;
+do_post_config_update(Paths, _UpdateReq, NewConfig0, OldConfig0, _AppEnvs) ->
+    ChainName = chain_name(Paths),
+    OldConfig = to_list(OldConfig0),
+    NewConfig = to_list(NewConfig0),
+    OldIds = lists:map(fun authenticator_id/1, OldConfig),
+    NewIds = lists:map(fun authenticator_id/1, NewConfig),
+    ok = delete_authenticators(NewIds, ChainName, OldConfig),
+    ok = create_or_update_authenticators(OldIds, ChainName, NewConfig),
+    ok = emqx_authentication:reorder_authenticator(ChainName, NewIds),
+    ok.
+
+%% create new authenticators and update existing ones
+create_or_update_authenticators(OldIds, ChainName, NewConfig) ->
+    lists:foreach(
+        fun(Conf) ->
+            Id = authenticator_id(Conf),
+            case lists:member(Id, OldIds) of
+                true ->
+                    emqx_authentication:update_authenticator(ChainName, Id, Conf);
+                false ->
+                    emqx_authentication:create_authenticator(ChainName, Conf)
+            end
+        end,
+        NewConfig
+    ).
+
+%% delete authenticators that are not in the new config
+delete_authenticators(NewIds, ChainName, OldConfig) ->
+    lists:foreach(
+        fun(Conf) ->
+            Id = authenticator_id(Conf),
+            case lists:member(Id, NewIds) of
+                true ->
+                    ok;
+                false ->
+                    _ = emqx_authentication:delete_authenticator(ChainName, Id),
+                    CertsDir = certs_dir(ChainName, Conf),
+                    ok = clear_certs(CertsDir, Conf)
+            end
+        end,
+        OldConfig
+    ).
 
 to_list(undefined) -> [];
 to_list(M) when M =:= #{} -> [];
@@ -213,14 +273,15 @@ clear_certs(CertsDir, Config) ->
     ok = emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).
 
 get_authenticator_config(AuthenticatorID, AuthenticatorsConfig) ->
-    case
-        lists:filter(fun(C) -> AuthenticatorID =:= authenticator_id(C) end, AuthenticatorsConfig)
-    of
+    case filter_authenticator(AuthenticatorID, AuthenticatorsConfig) of
         [C] -> C;
         [] -> {error, not_found};
         _ -> error({duplicated_authenticator_id, AuthenticatorsConfig})
     end.
 
+filter_authenticator(ID, Authenticators) ->
+    lists:filter(fun(A) -> ID =:= authenticator_id(A) end, Authenticators).
+
 split_by_id(ID, AuthenticatorsConfig) ->
     case
         lists:foldl(
@@ -287,3 +348,8 @@ dir(ChainName, ID) when is_binary(ID) ->
     emqx_utils:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID]));
 dir(ChainName, Config) when is_map(Config) ->
     dir(ChainName, authenticator_id(Config)).
+
+chain_name([authentication]) ->
+    ?GLOBAL;
+chain_name([listeners, Type, Name, authentication]) ->
+    binary_to_existing_atom(<<(atom_to_binary(Type))/binary, ":", (atom_to_binary(Name))/binary>>).

+ 3 - 1
apps/emqx/src/emqx_cm.erl

@@ -176,11 +176,13 @@ insert_channel_info(ClientId, Info, Stats) ->
 %% Note that: It should be called on a lock transaction
 register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
     Chan = {ClientId, ChanPid},
+    %% cast (for process monitor) before inserting ets tables
+    cast({registered, Chan}),
     true = ets:insert(?CHAN_TAB, Chan),
     true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
     ok = emqx_cm_registry:register_channel(Chan),
     mark_channel_connected(ChanPid),
-    cast({registered, Chan}).
+    ok.
 
 %% @doc Unregister a channel.
 -spec unregister_channel(emqx_types:clientid()) -> ok.

+ 9 - 6
apps/emqx/src/emqx_config.erl

@@ -279,25 +279,28 @@ get_default_value([RootName | _] = KeyPath) ->
     end.
 
 -spec get_raw(emqx_utils_maps:config_key_path()) -> term().
-get_raw([Root | T]) when is_atom(Root) -> get_raw([bin(Root) | T]);
-get_raw(KeyPath) -> do_get_raw(KeyPath).
+get_raw([Root | _] = KeyPath) when is_binary(Root) -> do_get_raw(KeyPath);
+get_raw([Root | T]) -> get_raw([bin(Root) | T]);
+get_raw([]) -> do_get_raw([]).
 
 -spec get_raw(emqx_utils_maps:config_key_path(), term()) -> term().
-get_raw([Root | T], Default) when is_atom(Root) -> get_raw([bin(Root) | T], Default);
-get_raw(KeyPath, Default) -> do_get_raw(KeyPath, Default).
+get_raw([Root | _] = KeyPath, Default) when is_binary(Root) -> do_get_raw(KeyPath, Default);
+get_raw([Root | T], Default) -> get_raw([bin(Root) | T], Default);
+get_raw([], Default) -> do_get_raw([], Default).
 
 -spec put_raw(map()) -> ok.
 put_raw(Config) ->
     maps:fold(
         fun(RootName, RootV, _) ->
-            ?MODULE:put_raw([RootName], RootV)
+            ?MODULE:put_raw([bin(RootName)], RootV)
         end,
         ok,
         hocon_maps:ensure_plain(Config)
     ).
 
 -spec put_raw(emqx_utils_maps:config_key_path(), term()) -> ok.
-put_raw(KeyPath, Config) ->
+put_raw(KeyPath0, Config) ->
+    KeyPath = [bin(K) || K <- KeyPath0],
     Putter = fun(Path, Map, Value) ->
         emqx_utils_maps:deep_force_put(Path, Map, Value)
     end,

+ 11 - 27
apps/emqx/src/emqx_config_handler.erl

@@ -31,8 +31,7 @@
     remove_handler/1,
     update_config/3,
     get_raw_cluster_override_conf/0,
-    info/0,
-    merge_to_old_config/2
+    info/0
 ]).
 
 %% gen_server callbacks
@@ -332,31 +331,16 @@ do_post_config_update(
     SubOldConf = get_sub_config(ConfKey, OldConf),
     SubNewConf = get_sub_config(ConfKey, NewConf),
     SubHandlers = get_sub_handlers(ConfKey, Handlers),
-    case
-        do_post_config_update(
-            SubConfKeyPath,
-            SubHandlers,
-            SubOldConf,
-            SubNewConf,
-            AppEnvs,
-            UpdateArgs,
-            Result,
-            ConfKeyPath
-        )
-    of
-        {ok, Result1} ->
-            call_post_config_update(
-                Handlers,
-                OldConf,
-                NewConf,
-                AppEnvs,
-                up_req(UpdateArgs),
-                Result1,
-                ConfKeyPath
-            );
-        Error ->
-            Error
-    end.
+    do_post_config_update(
+        SubConfKeyPath,
+        SubHandlers,
+        SubOldConf,
+        SubNewConf,
+        AppEnvs,
+        UpdateArgs,
+        Result,
+        ConfKeyPath
+    ).
 
 get_sub_handlers(ConfKey, Handlers) ->
     case maps:find(ConfKey, Handlers) of

+ 2 - 2
apps/emqx/src/emqx_maybe.erl

@@ -45,8 +45,8 @@ define(Term, _) ->
     Term.
 
 %% @doc Apply a function to a maybe argument.
--spec apply(fun((A) -> maybe(A)), maybe(A)) ->
-    maybe(A).
+-spec apply(fun((A) -> B), maybe(A)) ->
+    maybe(B).
 apply(_Fun, undefined) ->
     undefined;
 apply(Fun, Term) when is_function(Fun) ->

+ 15 - 3
apps/emqx/test/emqx_authentication_SUITE.erl

@@ -174,7 +174,7 @@ t_authenticator(Config) when is_list(Config) ->
     register_provider(AuthNType1, ?MODULE),
     ID1 = <<"password_based:built_in_database">>,
 
-    % CRUD of authencaticator
+    % CRUD of authenticator
     ?assertMatch(
         {ok, #{id := ID1, state := #{mark := 1}}},
         ?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)
@@ -296,7 +296,10 @@ t_update_config({init, Config}) ->
         | Config
     ];
 t_update_config(Config) when is_list(Config) ->
-    emqx_config_handler:add_handler([?CONF_ROOT], emqx_authentication),
+    emqx_config_handler:add_handler([?CONF_ROOT], emqx_authentication_config),
+    ok = emqx_config_handler:add_handler(
+        [listeners, '?', '?', ?CONF_ROOT], emqx_authentication_config
+    ),
     ok = register_provider(?config("auth1"), ?MODULE),
     ok = register_provider(?config("auth2"), ?MODULE),
     Global = ?config(global),
@@ -355,6 +358,10 @@ t_update_config(Config) when is_list(Config) ->
 
     ?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(Global)),
 
+    [Raw2, Raw1] = emqx:get_raw_config([?CONF_ROOT]),
+    ?assertMatch({ok, _}, update_config([?CONF_ROOT], [Raw1, Raw2])),
+    ?assertMatch({ok, [#{id := ID1}, #{id := ID2}]}, ?AUTHN:list_authenticators(Global)),
+
     ?assertMatch({ok, _}, update_config([?CONF_ROOT], {delete_authenticator, Global, ID1})),
     ?assertEqual(
         {error, {not_found, {authenticator, ID1}}},
@@ -417,11 +424,16 @@ t_update_config(Config) when is_list(Config) ->
         {ok, _},
         update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, ?CMD_MOVE_FRONT})
     ),
-
     ?assertMatch(
         {ok, [#{id := ID2}, #{id := ID1}]},
         ?AUTHN:list_authenticators(ListenerID)
     ),
+    [LRaw2, LRaw1] = emqx:get_raw_config(ConfKeyPath),
+    ?assertMatch({ok, _}, update_config(ConfKeyPath, [LRaw1, LRaw2])),
+    ?assertMatch(
+        {ok, [#{id := ID1}, #{id := ID2}]},
+        ?AUTHN:list_authenticators(ListenerID)
+    ),
 
     ?assertMatch(
         {ok, _},

+ 2 - 2
apps/emqx/test/emqx_common_test_helpers.erl

@@ -277,7 +277,7 @@ wait_for_app_processes(_) ->
 %% and stop others, and then the `application:start/2' callback is
 %% never called again for this application.
 perform_sanity_checks(emqx_rule_engine) ->
-    ensure_config_handler(emqx_rule_engine, [rule_engine, rules]),
+    ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']),
     ok;
 perform_sanity_checks(emqx_bridge) ->
     ensure_config_handler(emqx_bridge, [bridges]),
@@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) ->
     #{handlers := Handlers} = sys:get_state(emqx_config_handler),
     case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
         #{{mod} := Module} -> ok;
-        _NotFound -> error({config_handler_missing, ConfigPath, Module})
+        NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
     end,
     ok.
 

+ 2 - 52
apps/emqx/test/emqx_config_handler_SUITE.erl

@@ -130,7 +130,7 @@ t_root_key_update(_Config) ->
     ?assertEqual(
         {ok, #{
             config => 0.81,
-            post_config_update => #{?MODULE => ok},
+            post_config_update => #{},
             raw_config => <<"81%">>
         }},
         emqx:update_config(SubKey, "81%", Opts)
@@ -174,7 +174,7 @@ t_sub_key_update_remove(_Config) ->
 
     %% remove
     ?assertEqual(
-        {ok, #{post_config_update => #{emqx_config_handler_SUITE => ok}}},
+        {ok, #{post_config_update => #{?MODULE => ok}}},
         emqx:remove_config(KeyPath)
     ),
     ?assertError(
@@ -184,18 +184,6 @@ t_sub_key_update_remove(_Config) ->
     ?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)),
     ?assert(length(OSKey) > 0),
 
-    ?assertEqual(
-        {ok, #{
-            config => 60000,
-            post_config_update => #{?MODULE => ok},
-            raw_config => <<"60s">>
-        }},
-        emqx:reset_config(KeyPath, Opts)
-    ),
-    OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])),
-    ?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)),
-    ?assert(length(OSKey1) > 1),
-
     ok = emqx_config_handler:remove_handler(KeyPath),
     ok = emqx_config_handler:remove_handler(KeyPath2),
     ok.
@@ -292,44 +280,6 @@ t_get_raw_cluster_override_conf(_Config) ->
     ?assertEqual(OldInfo, NewInfo),
     ok.
 
-t_save_config_failed(_Config) ->
-    ok.
-
-t_update_sub(_Config) ->
-    PathKey = [sysmon],
-    Opts = #{rawconf_with_defaults => true},
-    ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
-    %% update sub key
-    #{<<"os">> := OS1} = emqx:get_raw_config(PathKey),
-    {ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts),
-    ?assertMatch(
-        #{
-            config := 120000,
-            post_config_update := #{?MODULE := ok},
-            raw_config := <<"120s">>
-        },
-        Res
-    ),
-    ?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)),
-    #{<<"os">> := OS2} = emqx:get_raw_config(PathKey),
-    ?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))),
-
-    %% update sub key
-    SubKey = PathKey ++ [os, cpu_high_watermark],
-    ?assertEqual(
-        {ok, #{
-            config => 0.81,
-            post_config_update => #{?MODULE => ok},
-            raw_config => <<"81%">>
-        }},
-        emqx:update_config(SubKey, "81%", Opts)
-    ),
-    ?assertEqual(0.81, emqx:get_config(SubKey)),
-    ?assertEqual("81%", emqx:get_raw_config(SubKey)),
-
-    ok = emqx_config_handler:remove_handler(PathKey),
-    ok.
-
 pre_config_update([sysmon], UpdateReq, _RawConf) ->
     {ok, UpdateReq};
 pre_config_update([sysmon, os], UpdateReq, _RawConf) ->

+ 1 - 1
apps/emqx/test/emqx_crl_cache_SUITE.erl

@@ -279,7 +279,7 @@ does_module_exist(Mod) ->
 
 clear_listeners() ->
     emqx_config:put([listeners], #{}),
-    emqx_config:put_raw([listeners], #{}),
+    emqx_config:put_raw([<<"listeners">>], #{}),
     ok.
 
 assert_http_get(URL) ->

+ 0 - 2
apps/emqx_authn/include/emqx_authn.hrl

@@ -23,8 +23,6 @@
 
 -define(AUTHN, emqx_authentication).
 
--define(GLOBAL, 'mqtt:global').
-
 -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}").
 
 -define(AUTH_SHARD, emqx_authn_shard).

+ 6 - 5
apps/emqx_authn/src/emqx_authn_api.erl

@@ -31,6 +31,7 @@
 -define(NOT_FOUND, 'NOT_FOUND').
 -define(ALREADY_EXISTS, 'ALREADY_EXISTS').
 -define(INTERNAL_ERROR, 'INTERNAL_ERROR').
+-define(CONFIG, emqx_authentication_config).
 
 % Swagger
 
@@ -833,12 +834,12 @@ with_chain(ListenerID, Fun) ->
 create_authenticator(ConfKeyPath, ChainName, Config) ->
     case update_config(ConfKeyPath, {create_authenticator, ChainName, Config}) of
         {ok, #{
-            post_config_update := #{emqx_authentication := #{id := ID}},
+            post_config_update := #{?CONFIG := #{id := ID}},
             raw_config := AuthenticatorsConfig
         }} ->
             {ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig),
             {200, maps:put(id, ID, convert_certs(fill_defaults(AuthenticatorConfig)))};
-        {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
+        {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
             serialize_error(Reason);
         {error, Reason} ->
             serialize_error(Reason)
@@ -1017,7 +1018,7 @@ update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) ->
     of
         {ok, _} ->
             {204};
-        {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
+        {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
             serialize_error(Reason);
         {error, Reason} ->
             serialize_error(Reason)
@@ -1027,7 +1028,7 @@ delete_authenticator(ConfKeyPath, ChainName, AuthenticatorID) ->
     case update_config(ConfKeyPath, {delete_authenticator, ChainName, AuthenticatorID}) of
         {ok, _} ->
             {204};
-        {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
+        {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
             serialize_error(Reason);
         {error, Reason} ->
             serialize_error(Reason)
@@ -1044,7 +1045,7 @@ move_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Position) ->
             of
                 {ok, _} ->
                     {204};
-                {error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
+                {error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
                     serialize_error(Reason);
                 {error, Reason} ->
                     serialize_error(Reason)

+ 121 - 0
apps/emqx_authn/test/emqx_authn_SUITE.erl

@@ -200,6 +200,127 @@ t_union_selector_errors(Config) when is_list(Config) ->
     ),
     ok.
 
+t_update_conf({init, Config}) ->
+    emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]),
+    {ok, _} = emqx:update_config([authentication], []),
+    Config;
+t_update_conf({'end', _Config}) ->
+    {ok, _} = emqx:update_config([authentication], []),
+    emqx_common_test_helpers:stop_apps([emqx_authn, emqx_conf]),
+    ok;
+t_update_conf(Config) when is_list(Config) ->
+    Authn1 = #{
+        <<"mechanism">> => <<"password_based">>,
+        <<"backend">> => <<"built_in_database">>,
+        <<"user_id_type">> => <<"clientid">>,
+        <<"enable">> => true
+    },
+    Authn2 = #{
+        <<"mechanism">> => <<"password_based">>,
+        <<"backend">> => <<"http">>,
+        <<"method">> => <<"post">>,
+        <<"url">> => <<"http://127.0.0.1:18083">>,
+        <<"headers">> => #{
+            <<"content-type">> => <<"application/json">>
+        },
+        <<"enable">> => true
+    },
+    Authn3 = #{
+        <<"mechanism">> => <<"jwt">>,
+        <<"use_jwks">> => false,
+        <<"algorithm">> => <<"hmac-based">>,
+        <<"secret">> => <<"mysecret">>,
+        <<"secret_base64_encoded">> => false,
+        <<"verify_claims">> => #{<<"username">> => <<"${username}">>},
+        <<"enable">> => true
+    },
+    Chain = 'mqtt:global',
+    {ok, _} = emqx:update_config([authentication], [Authn1]),
+    ?assertMatch(
+        {ok, #{
+            authenticators := [
+                #{
+                    enable := true,
+                    id := <<"password_based:built_in_database">>,
+                    provider := emqx_authn_mnesia
+                }
+            ]
+        }},
+        emqx_authentication:lookup_chain(Chain)
+    ),
+
+    {ok, _} = emqx:update_config([authentication], [Authn1, Authn2, Authn3]),
+    ?assertMatch(
+        {ok, #{
+            authenticators := [
+                #{
+                    enable := true,
+                    id := <<"password_based:built_in_database">>,
+                    provider := emqx_authn_mnesia
+                },
+                #{
+                    enable := true,
+                    id := <<"password_based:http">>,
+                    provider := emqx_authn_http
+                },
+                #{
+                    enable := true,
+                    id := <<"jwt">>,
+                    provider := emqx_authn_jwt
+                }
+            ]
+        }},
+        emqx_authentication:lookup_chain(Chain)
+    ),
+    {ok, _} = emqx:update_config([authentication], [Authn2, Authn1]),
+    ?assertMatch(
+        {ok, #{
+            authenticators := [
+                #{
+                    enable := true,
+                    id := <<"password_based:http">>,
+                    provider := emqx_authn_http
+                },
+                #{
+                    enable := true,
+                    id := <<"password_based:built_in_database">>,
+                    provider := emqx_authn_mnesia
+                }
+            ]
+        }},
+        emqx_authentication:lookup_chain(Chain)
+    ),
+
+    {ok, _} = emqx:update_config([authentication], [Authn3, Authn2, Authn1]),
+    ?assertMatch(
+        {ok, #{
+            authenticators := [
+                #{
+                    enable := true,
+                    id := <<"jwt">>,
+                    provider := emqx_authn_jwt
+                },
+                #{
+                    enable := true,
+                    id := <<"password_based:http">>,
+                    provider := emqx_authn_http
+                },
+                #{
+                    enable := true,
+                    id := <<"password_based:built_in_database">>,
+                    provider := emqx_authn_mnesia
+                }
+            ]
+        }},
+        emqx_authentication:lookup_chain(Chain)
+    ),
+    {ok, _} = emqx:update_config([authentication], []),
+    ?assertMatch(
+        {error, {not_found, {chain, Chain}}},
+        emqx_authentication:lookup_chain(Chain)
+    ),
+    ok.
+
 parse(Bytes) ->
     {ok, Frame, <<>>, {none, _}} = emqx_frame:parse(Bytes),
     Frame.

+ 1 - 0
apps/emqx_authz/include/emqx_authz.hrl

@@ -43,6 +43,7 @@
 -define(CMD_MOVE_BEFORE(Before), {before, Before}).
 -define(CMD_MOVE_AFTER(After), {'after', After}).
 
+-define(ROOT_KEY, [authorization]).
 -define(CONF_KEY_PATH, [authorization, sources]).
 
 -define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}").

+ 1 - 1
apps/emqx_authz/src/emqx_authz.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authz, [
     {description, "An OTP application"},
-    {vsn, "0.1.21"},
+    {vsn, "0.1.22"},
     {registered, []},
     {mod, {emqx_authz_app, []}},
     {applications, [

+ 76 - 39
apps/emqx_authz/src/emqx_authz.erl

@@ -101,6 +101,7 @@ init() ->
     ok = register_metrics(),
     ok = init_metrics(client_info_source()),
     emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE),
+    emqx_conf:add_handler(?ROOT_KEY, ?MODULE),
     Sources = emqx_conf:get(?CONF_KEY_PATH, []),
     ok = check_dup_types(Sources),
     NSources = create_sources(Sources),
@@ -109,6 +110,7 @@ init() ->
 deinit() ->
     ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
     emqx_conf:remove_handler(?CONF_KEY_PATH),
+    emqx_conf:remove_handler(?ROOT_KEY),
     emqx_authz_utils:cleanup_resources().
 
 lookup() ->
@@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) ->
 update(Cmd, Sources) ->
     emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
 
-pre_config_update(_, Cmd, Sources) ->
-    try do_pre_config_update(Cmd, Sources) of
+pre_config_update(Path, Cmd, Sources) ->
+    try do_pre_config_update(Path, Cmd, Sources) of
         {error, Reason} -> {error, Reason};
         NSources -> {ok, NSources}
     catch
         _:Reason -> {error, Reason}
     end.
 
+do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) ->
+    do_pre_config_update(Cmd, Sources);
+do_pre_config_update(?ROOT_KEY, NewConf, OldConf) ->
+    do_pre_config_replace(NewConf, OldConf).
+
+%% override the entire config when updating the root key
+%% emqx_conf:update(?ROOT_KEY, Conf);
+do_pre_config_replace(Conf, Conf) ->
+    Conf;
+do_pre_config_replace(NewConf, OldConf) ->
+    #{<<"sources">> := NewSources} = NewConf,
+    #{<<"sources">> := OldSources} = OldConf,
+    NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
+    NewConf#{<<"sources">> := NewSources1}.
+
 do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) ->
     do_move(Cmd, Sources);
 do_pre_config_update({?CMD_PREPEND, Source}, Sources) ->
@@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) ->
 
 post_config_update(_, _, undefined, _OldSource, _AppEnvs) ->
     ok;
-post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
-    Actions = do_post_config_update(Cmd, NewSources),
+post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) ->
+    Actions = do_post_config_update(Path, Cmd, NewSources),
     ok = update_authz_chain(Actions),
     ok = emqx_authz_cache:drain_cache().
 
-do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
-    InitedSources = lookup(),
-    do_move(Cmd, InitedSources);
-do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
-    InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
-    %% create metrics
+do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
+    do_move(Cmd, lookup());
+do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) ->
     TypeName = type(RawNewSource),
-    ok = emqx_metrics_worker:create_metrics(
-        authz_metrics,
-        TypeName,
-        [total, allow, deny, nomatch],
-        [total]
-    ),
-    [InitedNewSource] ++ lookup();
-do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) ->
-    InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
-    lookup() ++ [InitedNewSource];
-do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
+    NewSources = create_sources([get_source_by_type(TypeName, Sources)]),
+    NewSources ++ lookup();
+do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) ->
+    NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]),
+    lookup() ++ NewSources;
+do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
     OldSources = lookup(),
     {OldSource, Front, Rear} = take(Type, OldSources),
     NewSource = get_source_by_type(type(RawNewSource), Sources),
     InitedSources = update_source(type(RawNewSource), OldSource, NewSource),
     Front ++ [InitedSources] ++ Rear;
-do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
+do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
     OldInitedSources = lookup(),
     {OldSource, Front, Rear} = take(Type, OldInitedSources),
-    %% delete metrics
-    ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type),
-    ok = ensure_resource_deleted(OldSource),
-    clear_certs(OldSource),
+    ok = ensure_deleted(OldSource, #{clear_metric => true}),
     Front ++ Rear;
-do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) ->
-    %% overwrite the entire config!
-    OldInitedSources = lookup(),
-    lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
-    lists:foreach(fun clear_certs/1, OldInitedSources),
+do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) ->
+    overwrite_entire_sources(Sources);
+do_post_config_update(?ROOT_KEY, Conf, Conf) ->
+    #{sources := Sources} = Conf,
+    Sources;
+do_post_config_update(?ROOT_KEY, _Conf, NewConf) ->
+    #{sources := NewSources} = NewConf,
+    overwrite_entire_sources(NewSources).
+
+overwrite_entire_sources(Sources) ->
+    PrevSources = lookup(),
+    NewSourcesTypes = lists:map(fun type/1, Sources),
+    EnsureDelete = fun(S) ->
+        TypeName = type(S),
+        Opts =
+            case lists:member(TypeName, NewSourcesTypes) of
+                true -> #{clear_metric => false};
+                false -> #{clear_metric => true}
+            end,
+        ensure_deleted(S, Opts)
+    end,
+    lists:foreach(EnsureDelete, PrevSources),
     create_sources(Sources).
 
 %% @doc do source move
@@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) ->
     {S2, Front2, Rear2} = take(After, Front1 ++ Rear1),
     Front2 ++ [S2, S1] ++ Rear2.
 
-ensure_resource_deleted(#{enable := false}) ->
+ensure_deleted(#{enable := false}, _) ->
     ok;
+ensure_deleted(Source, #{clear_metric := ClearMetric}) ->
+    TypeName = type(Source),
+    ensure_resource_deleted(Source),
+    clear_certs(Source),
+    ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName).
+
 ensure_resource_deleted(#{type := Type} = Source) ->
     Module = authz_module(Type),
     Module:destroy(Source).
@@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) ->
 
 init_metrics(Source) ->
     TypeName = type(Source),
-    emqx_metrics_worker:create_metrics(
-        authz_metrics,
-        TypeName,
-        [total, allow, deny, nomatch],
-        [total]
-    ).
+    case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of
+        %% Don't reset the metrics if it already exists
+        true ->
+            ok;
+        false ->
+            emqx_metrics_worker:create_metrics(
+                authz_metrics,
+                TypeName,
+                [total, allow, deny, nomatch],
+                [total]
+            )
+    end.
 
 %%--------------------------------------------------------------------
 %% AuthZ callbacks
@@ -487,7 +522,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) ->
     ok = check_acl_file_rules(AclPath, Rules),
     ok = write_file(AclPath, Rules),
     Source1 = maps:remove(<<"rules">>, Source0),
-    maps:put(<<"path">>, AclPath, Source1).
+    maps:put(<<"path">>, AclPath, Source1);
+write_acl_file(Source) ->
+    Source.
 
 %% @doc where the acl.conf file is stored.
 acl_conf_file() ->

+ 71 - 0
apps/emqx_authz/test/emqx_authz_SUITE.erl

@@ -272,9 +272,80 @@ t_update_source(_) ->
         ],
         emqx_conf:get([authorization, sources], [])
     ),
+    ?assertMatch(
+        [
+            #{type := http, enable := false},
+            #{type := mongodb, enable := false},
+            #{type := mysql, enable := false},
+            #{type := postgresql, enable := false},
+            #{type := redis, enable := false},
+            #{type := file, enable := false}
+        ],
+        emqx_authz:lookup()
+    ),
 
     {ok, _} = emqx_authz:update(?CMD_REPLACE, []).
 
+t_replace_all(_) ->
+    RootKey = [<<"authorization">>],
+    Conf = emqx:get_raw_config(RootKey),
+    emqx_authz_utils:update_config(RootKey, Conf#{
+        <<"sources">> => [
+            ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1
+        ]
+    }),
+    %% config
+    ?assertMatch(
+        [
+            #{type := file, enable := true},
+            #{type := redis, enable := true},
+            #{type := postgresql, enable := true},
+            #{type := mysql, enable := true},
+            #{type := mongodb, enable := true},
+            #{type := http, enable := true}
+        ],
+        emqx_conf:get([authorization, sources], [])
+    ),
+    %% hooks status
+    ?assertMatch(
+        [
+            #{type := file, enable := true},
+            #{type := redis, enable := true},
+            #{type := postgresql, enable := true},
+            #{type := mysql, enable := true},
+            #{type := mongodb, enable := true},
+            #{type := http, enable := true}
+        ],
+        emqx_authz:lookup()
+    ),
+    Ids = [http, mongodb, mysql, postgresql, redis, file],
+    %% metrics
+    lists:foreach(
+        fun(Id) ->
+            ?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
+        end,
+        Ids
+    ),
+
+    ?assertMatch(
+        {ok, _},
+        emqx_authz_utils:update_config(
+            RootKey,
+            Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]}
+        )
+    ),
+    %% hooks status
+    ?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()),
+    %% metrics
+    ?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)),
+    lists:foreach(
+        fun(Id) ->
+            ?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
+        end,
+        Ids -- [http]
+    ),
+    ok.
+
 t_delete_source(_) ->
     {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]),
 

+ 10 - 5
apps/emqx_authz/test/emqx_authz_test_lib.erl

@@ -25,21 +25,26 @@
 -define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
 
 reset_authorizers() ->
-    reset_authorizers(deny, false).
+    reset_authorizers(deny, false, []).
 
 restore_authorizers() ->
-    reset_authorizers(allow, true).
+    reset_authorizers(allow, true, []).
 
-reset_authorizers(Nomatch, ChacheEnabled) ->
+reset_authorizers(Nomatch, CacheEnabled, Source) ->
     {ok, _} = emqx:update_config(
         [authorization],
         #{
             <<"no_match">> => atom_to_binary(Nomatch),
-            <<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)},
-            <<"sources">> => []
+            <<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)},
+            <<"sources">> => Source
         }
     ),
     ok.
+%% Don't reset sources
+reset_authorizers(Nomatch, CacheEnabled) ->
+    {ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch),
+    {ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled),
+    ok.
 
 setup_config(BaseConfig, SpecialParams) ->
     Config = maps:merge(BaseConfig, SpecialParams),

+ 6 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -39,7 +39,8 @@
     disable_enable/3,
     remove/2,
     check_deps_and_remove/3,
-    list/0
+    list/0,
+    reload_hook/1
 ]).
 
 -export([
@@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) ->
             })
     end.
 
+reload_hook(Bridges) ->
+    ok = unload_hook(),
+    ok = load_hook(Bridges).
+
 load_hook() ->
     Bridges = emqx:get_config([bridges], #{}),
     load_hook(Bridges).

+ 25 - 3
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
             {ok, ConfNew}
     end.
 
-post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
-    _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
-post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) ->
+post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) ->
+    _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf),
+    ok = emqx_bridge_resource:remove(BridgeType, BridgeName),
+    Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])),
+    emqx_bridge:reload_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
+    ok;
+post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) ->
+    _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined),
+    ResOpts = emqx_resource:fetch_creation_opts(NewConf),
+    ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts),
+    Bridges = emqx_utils_maps:deep_put(
+        [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
+    ),
+    emqx_bridge:reload_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
+    ok;
+post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) ->
     _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf),
+    ResOpts = emqx_resource:fetch_creation_opts(NewConf),
+    ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts),
+    Bridges = emqx_utils_maps:deep_put(
+        [BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
+    ),
+    emqx_bridge:reload_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
     ok.
 
 %% internal functions

+ 1 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, ecql]},
     {env, []},

+ 2 - 0
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -480,6 +480,8 @@ prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co
 
 handle_result({error, disconnected}) ->
     {error, {recoverable_error, disconnected}};
+handle_result({error, ecpool_empty}) ->
+    {error, {recoverable_error, ecpool_empty}};
 handle_result({error, Error}) ->
     {error, {unrecoverable_error, Error}};
 handle_result(Res) ->

+ 1 - 1
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_clickhouse, [
     {description, "EMQX Enterprise ClickHouse Bridge"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [kernel, stdlib, clickhouse, emqx_resource]},
     {env, []},

+ 6 - 1
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -464,7 +464,12 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
         sql => SQL,
         reason => ClickhouseErrorResult
     }),
-    {error, ClickhouseErrorResult}.
+    case ClickhouseErrorResult of
+        {error, ecpool_empty} ->
+            {error, {recoverable_error, ecpool_empty}};
+        _ ->
+            {error, ClickhouseErrorResult}
+    end.
 
 snabbkaffe_log_return(_Result) ->
     ?tp(

+ 1 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_dynamo, [
     {description, "EMQX Enterprise Dynamo Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, erlcloud]},
     {env, []},

+ 6 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -170,7 +170,12 @@ do_query(
                 query => Query,
                 reason => Reason
             }),
-            Result;
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         _ ->
             ?tp(
                 dynamo_connector_query_return,

+ 0 - 8
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -115,14 +115,6 @@ end_per_testcase(_TestCase, _Config) ->
     delete_all_bridges(),
     ok.
 
-set_special_configs(emqx_management) ->
-    Listeners = #{http => #{port => 8081}},
-    Config = #{
-        listeners => Listeners,
-        applications => [#{id => "admin", secret => "public"}]
-    },
-    emqx_config:put([emqx_management], Config),
-    ok;
 set_special_configs(emqx_dashboard) ->
     emqx_dashboard_api_test_helpers:set_default_config(),
     ok;

+ 1 - 1
apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_opents, [
     {description, "EMQX Enterprise OpenTSDB Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [
         kernel,

+ 6 - 1
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -142,7 +142,12 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
                 query => Query,
                 reason => Reason
             }),
-            Result;
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         _ ->
             ?tp(
                 opents_connector_query_return,

+ 1 - 1
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_oracle, [
     {description, "EMQX Enterprise Oracle Database Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 11 - 1
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl

@@ -16,7 +16,8 @@
     namespace/0,
     roots/0,
     fields/1,
-    desc/1
+    desc/1,
+    config_validator/1
 ]).
 
 -define(DEFAULT_SQL, <<
@@ -107,3 +108,12 @@ type_field(Type) ->
 
 name_field() ->
     {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
+
+config_validator(#{<<"server">> := Server} = Config) when
+    not is_map(Server) andalso
+        not is_map_key(<<"sid">>, Config) andalso
+        not is_map_key(<<"service_name">>, Config)
+->
+    {error, "neither SID nor Service Name was set"};
+config_validator(_) ->
+    ok.

+ 12 - 0
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -517,3 +517,15 @@ t_on_get_status(Config) ->
         ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
     ),
     ok.
+
+t_no_sid_nor_service_name(Config0) ->
+    OracleConfig0 = ?config(oracle_config, Config0),
+    OracleConfig1 = maps:remove(<<"sid">>, OracleConfig0),
+    OracleConfig = maps:remove(<<"service_name">>, OracleConfig1),
+    NewOracleConfig = {oracle_config, OracleConfig},
+    Config = lists:keyreplace(oracle_config, 1, Config0, NewOracleConfig),
+    ?assertMatch(
+        {error, #{kind := validation_error}},
+        create_bridge(Config)
+    ),
+    ok.

+ 11 - 4
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -431,11 +431,12 @@ on_query(
         state => emqx_utils:redact(State)
     }),
     MessageData = format_data(PayloadTemplate, Data),
-    ecpool:pick_and_do(
+    Res = ecpool:pick_and_do(
         PoolName,
         {?MODULE, publish_messages, [Config, [MessageData]]},
         no_handover
-    ).
+    ),
+    handle_result(Res).
 
 %% emqx_resource callback that is called when a batch query is received
 
@@ -467,11 +468,12 @@ on_batch_query(
      || Data <- MessagesToInsert
     ],
     %% Publish the messages
-    ecpool:pick_and_do(
+    Res = ecpool:pick_and_do(
         PoolName,
         {?MODULE, publish_messages, [Config, FormattedMessages]},
         no_handover
-    ).
+    ),
+    handle_result(Res).
 
 publish_messages(
     {_Connection, Channel},
@@ -543,3 +545,8 @@ format_data([], Msg) ->
     emqx_utils_json:encode(Msg);
 format_data(Tokens, Msg) ->
     emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
+
+handle_result({error, ecpool_empty}) ->
+    {error, {recoverable_error, ecpool_empty}};
+handle_result(Res) ->
+    Res.

+ 41 - 3
apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl

@@ -13,6 +13,9 @@
 
 % Bridge defaults
 -define(TOPIC, "TopicTest").
+-define(DENY_TOPIC, "DENY_TOPIC").
+-define(ACCESS_KEY, "RocketMQ").
+-define(SECRET_KEY, "12345678").
 -define(BATCH_SIZE, 10).
 -define(PAYLOAD, <<"HELLO">>).
 
@@ -25,17 +28,19 @@
 all() ->
     [
         {group, async},
-        {group, sync}
+        {group, sync},
+        {group, acl}
     ].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
+    TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny],
     BatchingGroups = [{group, with_batch}, {group, without_batch}],
     [
         {async, BatchingGroups},
         {sync, BatchingGroups},
         {with_batch, TCs},
-        {without_batch, TCs}
+        {without_batch, TCs},
+        {acl, [t_acl_deny]}
     ].
 
 init_per_group(async, Config) ->
@@ -48,6 +53,9 @@ init_per_group(with_batch, Config0) ->
 init_per_group(without_batch, Config0) ->
     Config = [{batch_size, 1} | Config0],
     common_init(Config);
+init_per_group(acl, Config0) ->
+    Config = [{batch_size, 1}, {query_mode, sync} | Config0],
+    common_init(Config);
 init_per_group(_Group, Config) ->
     Config.
 
@@ -137,6 +145,8 @@ rocketmq_config(BridgeType, Config) ->
             "bridges.~s.~s {\n"
             "  enable = true\n"
             "  servers = ~p\n"
+            "  access_key = ~p\n"
+            "  secret_key = ~p\n"
             "  topic = ~p\n"
             "  resource_opts = {\n"
             "    request_timeout = 1500ms\n"
@@ -148,6 +158,8 @@ rocketmq_config(BridgeType, Config) ->
                 BridgeType,
                 Name,
                 Server,
+                ?ACCESS_KEY,
+                ?SECRET_KEY,
                 ?TOPIC,
                 BatchSize,
                 QueryMode
@@ -271,3 +283,29 @@ t_simple_query(Config) ->
     Result = query_resource(Config, Request),
     ?assertEqual(ok, Result),
     ok.
+
+t_acl_deny(Config0) ->
+    RocketCfg = ?GET_CONFIG(rocketmq_config, Config0),
+    RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC},
+    Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    SentData = #{payload => ?PAYLOAD},
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)),
+                #{?snk_kind := rocketmq_connector_query_return},
+                10_000
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
+            ?assertMatch([#{error := #{<<"code">> := 1}}], Trace),
+            ok
+        end
+    ),
+    ok.

+ 1 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_sqlserver, [
     {description, "EMQX Enterprise SQL Server Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [kernel, stdlib, odbc]},
     {env, []},

+ 7 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -336,6 +336,7 @@ conn_str([{_, _} | Opts], Acc) ->
 ) ->
     {ok, list()}
     | {error, {recoverable_error, term()}}
+    | {error, {unrecoverable_error, term()}}
     | {error, term()}.
 do_query(
     ResourceId,
@@ -374,7 +375,12 @@ do_query(
                 query => Query,
                 reason => Reason
             }),
-            Result;
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         _ ->
             ?tp(
                 sqlserver_connector_query_return,

+ 6 - 1
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -200,7 +200,12 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
                 job => Job,
                 reason => Reason
             }),
-            Result;
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         _ ->
             ?tp(
                 tdengine_connector_query_return,

+ 55 - 4
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -18,16 +18,40 @@
 -export([
     load/0,
     admins/1,
+    conf/1,
     unload/0
 ]).
 
--define(CMD, cluster_call).
+-define(CLUSTER_CALL, cluster_call).
+-define(CONF, conf).
 
 load() ->
-    emqx_ctl:register_command(?CMD, {?MODULE, admins}, []).
+    emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, []),
+    emqx_ctl:register_command(?CONF, {?MODULE, conf}, []).
 
 unload() ->
-    emqx_ctl:unregister_command(?CMD).
+    emqx_ctl:unregister_command(?CLUSTER_CALL),
+    emqx_ctl:unregister_command(?CONF).
+
+conf(["show", "--keys-only"]) ->
+    print(emqx_config:get_root_names());
+conf(["show"]) ->
+    print_hocon(get_config());
+conf(["show", Key]) ->
+    print_hocon(get_config(Key));
+conf(["load", Path]) ->
+    load_config(Path);
+conf(_) ->
+    emqx_ctl:usage(
+        [
+            %% TODO add reload
+            %{"conf reload", "reload etc/emqx.conf on local node"},
+            {"conf show --keys-only", "print all keys"},
+            {"conf show", "print all running configures"},
+            {"conf show <key>", "print a specific configuration"},
+            {"conf load <path>", "load a hocon file to all nodes"}
+        ]
+    ).
 
 admins(["status"]) ->
     status();
@@ -43,7 +67,7 @@ admins(["skip", Node0]) ->
     status();
 admins(["tnxid", TnxId0]) ->
     TnxId = list_to_integer(TnxId0),
-    emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
+    print(emqx_cluster_rpc:query(TnxId));
 admins(["fast_forward"]) ->
     status(),
     Nodes = mria:running_nodes(),
@@ -91,3 +115,30 @@ status() ->
         Status
     ),
     emqx_ctl:print("-----------------------------------------------\n").
+
+print(Json) ->
+    emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]).
+
+print_hocon(Hocon) ->
+    emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]).
+
+get_config() -> emqx_config:fill_defaults(emqx:get_raw_config([])).
+get_config(Key) -> emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}).
+
+-define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}).
+load_config(Path) ->
+    case hocon:files([Path]) of
+        {ok, Conf} ->
+            maps:foreach(
+                fun(Key, Value) ->
+                    case emqx_conf:update([Key], Value, ?OPTIONS) of
+                        {ok, _} -> emqx_ctl:print("load ~ts ok~n", [Key]);
+                        {error, Reason} -> emqx_ctl:print("load ~ts failed: ~p~n", [Key, Reason])
+                    end
+                end,
+                Conf
+            );
+        {error, Reason} ->
+            emqx_ctl:print("load ~ts failed~n~p~n", [Path, Reason]),
+            {error, bad_hocon_file}
+    end.

+ 20 - 3
apps/emqx_conf/test/emqx_global_gc_SUITE.erl

@@ -23,14 +23,31 @@
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_config:erase_all(),
+    ok.
+
 t_run_gc(_) ->
-    ok = emqx_config:put([node, global_gc_interval], 1000),
+    Conf0 = #{
+        node => #{
+            cookie => <<"cookie">>,
+            data_dir => <<"data">>,
+            global_gc_interval => 1000
+        }
+    },
+    emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0),
+    ?assertEqual({ok, 1000}, application:get_env(emqx_machine, global_gc_interval)),
     {ok, _} = emqx_global_gc:start_link(),
     ok = timer:sleep(1500),
     {ok, MilliSecs} = emqx_global_gc:run(),
-    ct:print("Global GC: ~w(ms)~n", [MilliSecs]),
+    ct:pal("Global GC: ~w(ms)~n", [MilliSecs]),
     emqx_global_gc:stop(),
-    ok = emqx_config:put([node, global_gc_interval], disabled),
+    Conf1 = emqx_utils_maps:deep_put([node, global_gc_interval], Conf0, disabled),
+    emqx_common_test_helpers:load_config(emqx_conf_schema, Conf1),
     {ok, Pid} = emqx_global_gc:start_link(),
     ?assertMatch(#{timer := undefined}, sys:get_state(Pid)),
+    ?assertEqual({ok, disabled}, application:get_env(emqx_machine, global_gc_interval)),
     ok.

+ 9 - 4
apps/emqx_connector/src/emqx_connector_ldap.erl

@@ -135,11 +135,16 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} =
                 request => Request,
                 connector => InstId,
                 reason => Reason
-            });
+            }),
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         _ ->
-            ok
-    end,
-    Result.
+            Result
+    end.
 
 on_get_status(_InstId, _State) -> connected.
 

+ 8 - 1
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -233,6 +233,8 @@ on_query(
                 connector => InstId
             }),
             {error, Reason};
+        {error, ecpool_empty} ->
+            {error, {recoverable_error, ecpool_empty}};
         {{true, _Info}, _Document} ->
             ok
     end;
@@ -261,7 +263,12 @@ on_query(
                 reason => Reason,
                 connector => InstId
             }),
-            {error, Reason};
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    {error, Reason}
+            end;
         {ok, Cursor} when is_pid(Cursor) ->
             {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
         Result ->

+ 6 - 1
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -241,7 +241,12 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
                 sql => NameOrSQL,
                 reason => Reason
             }),
-            Result;
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         Result ->
             ?tp(
                 pgsql_connector_query_return,

+ 2 - 0
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -220,6 +220,8 @@ do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) ->
             case is_unrecoverable_error(Reason) of
                 true ->
                     {error, {unrecoverable_error, Reason}};
+                false when Reason =:= ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
                 false ->
                     Result
             end;

+ 1 - 1
apps/emqx_ctl/src/emqx_ctl.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ctl, [
     {description, "Backend for emqx_ctl script"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {mod, {emqx_ctl_app, []}},
     {applications, [

+ 30 - 17
apps/emqx_ctl/src/emqx_ctl.erl

@@ -128,16 +128,21 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
                     }),
                     {error, Reason}
             end;
-        [] ->
+        Error ->
             help(),
-            {error, cmd_not_found}
+            Error
     end.
 
 -spec lookup_command(cmd()) -> [{module(), atom()}].
 lookup_command(Cmd) when is_atom(Cmd) ->
-    case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of
-        [El] -> El;
-        [] -> []
+    case is_initialized() of
+        true ->
+            case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of
+                [El] -> El;
+                [] -> {error, cmd_not_found}
+            end;
+        false ->
+            {error, cmd_is_initializing}
     end.
 
 -spec get_commands() -> list({cmd(), module(), atom()}).
@@ -145,18 +150,23 @@ get_commands() ->
     [{Cmd, M, F} || {{_Seq, Cmd}, {M, F}, _Opts} <- ets:tab2list(?CMD_TAB)].
 
 help() ->
-    case ets:tab2list(?CMD_TAB) of
-        [] ->
-            print("No commands available.~n");
-        Cmds ->
-            print("Usage: ~ts~n", ["emqx ctl"]),
-            lists:foreach(
-                fun({_, {Mod, Cmd}, _}) ->
-                    print("~110..-s~n", [""]),
-                    apply(Mod, Cmd, [usage])
-                end,
-                Cmds
-            )
+    case is_initialized() of
+        true ->
+            case ets:tab2list(?CMD_TAB) of
+                [] ->
+                    print("No commands available.~n");
+                Cmds ->
+                    print("Usage: ~ts~n", ["emqx ctl"]),
+                    lists:foreach(
+                        fun({_, {Mod, Cmd}, _}) ->
+                            print("~110..-s~n", [""]),
+                            apply(Mod, Cmd, [usage])
+                        end,
+                        Cmds
+                    )
+            end;
+        false ->
+            print("Command table is initializing.~n")
     end.
 
 -spec print(io:format()) -> ok.
@@ -279,3 +289,6 @@ safe_to_existing_atom(Str) ->
         _:badarg ->
             undefined
     end.
+
+is_initialized() ->
+    ets:info(?CMD_TAB) =/= undefined.

+ 2 - 2
apps/emqx_ctl/test/emqx_ctl_SUITE.erl

@@ -49,8 +49,8 @@ t_reg_unreg_command(_) ->
             emqx_ctl:unregister_command(cmd1),
             emqx_ctl:unregister_command(cmd2),
             ct:sleep(100),
-            ?assertEqual([], emqx_ctl:lookup_command(cmd1)),
-            ?assertEqual([], emqx_ctl:lookup_command(cmd2)),
+            ?assertEqual({error, cmd_not_found}, emqx_ctl:lookup_command(cmd1)),
+            ?assertEqual({error, cmd_not_found}, emqx_ctl:lookup_command(cmd2)),
             ?assertEqual([], emqx_ctl:get_commands())
         end
     ).

+ 2 - 3
apps/emqx_dashboard/src/emqx_dashboard_schema.erl

@@ -62,9 +62,8 @@ fields("dashboard") ->
                 #{
                     desc => ?DESC(bootstrap_users_file),
                     required => false,
-                    importance => ?IMPORTANCE_HIDDEN,
-                    default => <<>>
-                    %% deprecated => {since, "5.1.0"}
+                    default => <<>>,
+                    deprecated => {since, "5.1.0"}
                 }
             )}
     ];

+ 3 - 2
apps/emqx_dashboard/test/emqx_swagger_requestBody_SUITE.erl

@@ -33,8 +33,9 @@ init_per_suite(Config) ->
     _ = emqx_mgmt_api_test_util:init_suite([emqx_conf]),
     Config.
 
-end_per_suite(Config) ->
-    emqx_mgmt_api_test_util:end_suite([emqx_conf]).
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite([emqx_conf]),
+    ok.
 
 t_object(_Config) ->
     Spec = #{

+ 17 - 11
apps/emqx_ft/src/emqx_ft.erl

@@ -45,7 +45,8 @@
     offset/0,
     filemeta/0,
     segment/0,
-    checksum/0
+    checksum/0,
+    finopts/0
 ]).
 
 %% Number of bytes
@@ -80,6 +81,10 @@
 
 -type segment() :: {offset(), _Content :: binary()}.
 
+-type finopts() :: #{
+    checksum => checksum()
+}.
+
 %%--------------------------------------------------------------------
 %% API for app
 %%--------------------------------------------------------------------
@@ -170,8 +175,8 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
             ChecksumBin = emqx_maybe:from_list(MaybeChecksum),
             validate(
                 [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}],
-                fun([FinalSize, Checksum]) ->
-                    on_fin(PacketId, Msg, Transfer, FinalSize, Checksum)
+                fun([FinalSize, FinalChecksum]) ->
+                    on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum)
                 end
             );
         [<<"abort">>] ->
@@ -251,13 +256,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
         end
     end).
 
-on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
+on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
     ?tp(info, "file_transfer_fin", #{
         mqtt_msg => Msg,
         packet_id => PacketId,
         transfer => Transfer,
         final_size => FinalSize,
-        checksum => Checksum
+        checksum => FinalChecksum
     }),
     %% TODO: handle checksum? Do we need it?
     FinPacketKey = {self(), PacketId},
@@ -265,7 +270,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
         ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
     end,
     with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
-        case assemble(Transfer, FinalSize) of
+        case assemble(Transfer, FinalSize, FinalChecksum) of
             %% Assembling completed, ack through the responder right away
             ok ->
                 emqx_ft_responder:ack(FinPacketKey, ok);
@@ -314,9 +319,10 @@ store_segment(Transfer, Segment) ->
             {error, {internal_error, E}}
     end.
 
-assemble(Transfer, FinalSize) ->
+assemble(Transfer, FinalSize, FinalChecksum) ->
     try
-        emqx_ft_storage:assemble(Transfer, FinalSize)
+        FinOpts = [{checksum, FinalChecksum} || FinalChecksum /= undefined],
+        emqx_ft_storage:assemble(Transfer, FinalSize, maps:from_list(FinOpts))
     catch
         C:E:S ->
             ?tp(error, "start_assemble_failed", #{
@@ -397,8 +403,8 @@ do_validate([{checksum, Checksum} | Rest], Parsed) ->
         {error, _Reason} ->
             {error, {invalid_checksum, Checksum}}
     end;
-do_validate([{integrity, Payload, Checksum} | Rest], Parsed) ->
-    case crypto:hash(sha256, Payload) of
+do_validate([{integrity, Payload, {Algo, Checksum}} | Rest], Parsed) ->
+    case crypto:hash(Algo, Payload) of
         Checksum ->
             do_validate(Rest, [Payload | Parsed]);
         Mismatch ->
@@ -411,7 +417,7 @@ do_validate([{{maybe, T}, Value} | Rest], Parsed) ->
 
 parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 ->
     try
-        {ok, binary:decode_hex(Checksum)}
+        {ok, {sha256, binary:decode_hex(Checksum)}}
     catch
         error:badarg ->
             {error, invalid_checksum}

+ 8 - 6
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_ft_assembler).
 
--export([start_link/3]).
+-export([start_link/4]).
 
 -behaviour(gen_statem).
 -export([callback_mode/0]).
@@ -29,6 +29,7 @@
 -type stdata() :: #{
     storage := emqx_ft_storage_fs:storage(),
     transfer := emqx_ft:transfer(),
+    finopts := emqx_ft:finopts(),
     assembly := emqx_ft_assembly:t(),
     export => emqx_ft_storage_exporter:export()
 }.
@@ -38,8 +39,8 @@
 
 %%
 
-start_link(Storage, Transfer, Size) ->
-    gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []).
+start_link(Storage, Transfer, Size, Opts) ->
+    gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []).
 
 where(Transfer) ->
     gproc:where(?NAME(Transfer)).
@@ -60,11 +61,12 @@ callback_mode() ->
     handle_event_function.
 
 -spec init(_Args) -> {ok, state(), stdata()}.
-init({Storage, Transfer, Size}) ->
+init({Storage, Transfer, Size, Opts}) ->
     _ = erlang:process_flag(trap_exit, true),
     St = #{
         storage => Storage,
         transfer => Transfer,
+        finopts => Opts,
         assembly => emqx_ft_assembly:new(Size)
     },
     {ok, idle, St}.
@@ -164,8 +166,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export :=
     end;
 handle_event(internal, _, {assemble, []}, St = #{}) ->
     {next_state, complete, St, ?internal([])};
-handle_event(internal, _, complete, St = #{export := Export}) ->
-    Result = emqx_ft_storage_exporter:complete(Export),
+handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) ->
+    Result = emqx_ft_storage_exporter:complete(Export, Opts),
     _ = maybe_garbage_collect(Result, St),
     {stop, {shutdown, Result}, maps:remove(export, St)}.
 

+ 3 - 3
apps/emqx_ft/src/emqx_ft_assembler_sup.erl

@@ -17,7 +17,7 @@
 -module(emqx_ft_assembler_sup).
 
 -export([start_link/0]).
--export([ensure_child/3]).
+-export([ensure_child/4]).
 
 -behaviour(supervisor).
 -export([init/1]).
@@ -25,10 +25,10 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
-ensure_child(Storage, Transfer, Size) ->
+ensure_child(Storage, Transfer, Size, Opts) ->
     Childspec = #{
         id => Transfer,
-        start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]},
+        start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size, Opts]},
         restart => temporary
     },
     case supervisor:start_child(?MODULE, Childspec) of

+ 5 - 5
apps/emqx_ft/src/emqx_ft_storage.erl

@@ -20,7 +20,7 @@
     [
         store_filemeta/2,
         store_segment/2,
-        assemble/2,
+        assemble/3,
 
         files/0,
         files/1,
@@ -88,7 +88,7 @@
     ok | {async, pid()} | {error, term()}.
 -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
     ok | {async, pid()} | {error, term()}.
--callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
+-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) ->
     ok | {async, pid()} | {error, term()}.
 
 -callback files(storage(), query(Cursor)) ->
@@ -114,10 +114,10 @@ store_filemeta(Transfer, FileMeta) ->
 store_segment(Transfer, Segment) ->
     dispatch(store_segment, [Transfer, Segment]).
 
--spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
+-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
     ok | {async, pid()} | {error, term()}.
-assemble(Transfer, Size) ->
-    dispatch(assemble, [Transfer, Size]).
+assemble(Transfer, Size, FinOpts) ->
+    dispatch(assemble, [Transfer, Size, FinOpts]).
 
 -spec files() ->
     {ok, page(file_info(), _)} | {error, term()}.

+ 15 - 8
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -24,7 +24,7 @@
 %% Export API
 -export([start_export/3]).
 -export([write/2]).
--export([complete/1]).
+-export([complete/2]).
 -export([discard/1]).
 
 %% Listing API
@@ -117,12 +117,19 @@ write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) ->
             Error
     end.
 
--spec complete(export()) ->
+-spec complete(export(), emqx_ft:finopts()) ->
     ok | {error, _Reason}.
-complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) ->
-    case verify_checksum(Hash, Filemeta) of
-        {ok, Checksum} ->
-            ExporterMod:complete(ExportSt, Checksum);
+complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) ->
+    Checksum = emqx_maybe:define(
+        % NOTE
+        % Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec.
+        % We do not care if they differ.
+        maps:get(checksum, Opts, undefined),
+        maps:get(checksum, Filemeta, undefined)
+    ),
+    case verify_checksum(Hash, Checksum) of
+        {ok, ExportChecksum} ->
+            ExporterMod:complete(ExportSt, ExportChecksum);
         {error, _} = Error ->
             _ = ExporterMod:discard(ExportSt),
             Error
@@ -183,13 +190,13 @@ init_checksum(#{}) ->
 update_checksum(Ctx, IoData) ->
     crypto:hash_update(Ctx, IoData).
 
-verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) ->
+verify_checksum(Ctx, {Algo, Digest} = Checksum) ->
     case crypto:hash_final(Ctx) of
         Digest ->
             {ok, Checksum};
         Mismatch ->
             {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
     end;
-verify_checksum(Ctx, #{}) ->
+verify_checksum(Ctx, undefined) ->
     Digest = crypto:hash_final(Ctx),
     {ok, {sha256, Digest}}.

+ 6 - 6
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -36,7 +36,7 @@
 -export([list/3]).
 -export([pread/5]).
 -export([lookup_local_assembler/1]).
--export([assemble/3]).
+-export([assemble/4]).
 
 -export([transfers/1]).
 
@@ -211,14 +211,14 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
             {error, Reason}
     end.
 
--spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
+-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
     {async, _Assembler :: pid()} | ok | {error, _TODO}.
-assemble(Storage, Transfer, Size) ->
+assemble(Storage, Transfer, Size, Opts) ->
     LookupSources = [
         fun() -> lookup_local_assembler(Transfer) end,
         fun() -> lookup_remote_assembler(Transfer) end,
         fun() -> check_if_already_exported(Storage, Transfer) end,
-        fun() -> ensure_local_assembler(Storage, Transfer, Size) end
+        fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
     ],
     lookup_assembler(LookupSources).
 
@@ -295,8 +295,8 @@ lookup_remote_assembler(Transfer) ->
         _ -> {error, not_found}
     end.
 
-ensure_local_assembler(Storage, Transfer, Size) ->
-    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
+ensure_local_assembler(Storage, Transfer, Size, Opts) ->
+    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
     {async, Pid}.
 
 -spec transfers(storage()) ->

+ 15 - 2
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -159,6 +159,10 @@ t_invalid_topic_format(Config) ->
         unspecified_error,
         emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1)
     ),
+    ?assertRCName(
+        unspecified_error,
+        emqtt:publish(C, <<"$file/fileid/fin/42/xyz">>, <<>>, 1)
+    ),
     ?assertRCName(
         unspecified_error,
         emqtt:publish(C, <<"$file/">>, <<>>, 1)
@@ -390,9 +394,18 @@ t_invalid_checksum(Config) ->
         with_offsets(Data)
     ),
 
+    % Send `fin` w/o checksum, should fail since filemeta checksum is invalid
+    FinTopic = mk_fin_topic(FileId, Filesize),
     ?assertRCName(
         unspecified_error,
-        emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ),
+
+    % Send `fin` with the correct checksum
+    Checksum = binary:encode_hex(sha256(Data)),
+    ?assertRCName(
+        success,
+        emqtt:publish(C, <<FinTopic/binary, "/", Checksum/binary>>, <<>>, 1)
     ).
 
 t_corrupted_segment_retry(Config) ->
@@ -507,7 +520,7 @@ t_assemble_crash(Config) ->
     C = ?config(client, Config),
 
     meck:new(emqx_ft_storage_fs),
-    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
+    meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end),
 
     ?assertRCName(
         unspecified_error,

+ 1 - 1
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -178,7 +178,7 @@ complete_assemble(Storage, Transfer, Size) ->
     complete_assemble(Storage, Transfer, Size, 1000).
 
 complete_assemble(Storage, Transfer, Size, Timeout) ->
-    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
+    {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
     MRef = erlang:monitor(process, Pid),
     Pid ! kickoff,
     receive

+ 6 - 2
apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl

@@ -110,9 +110,13 @@ t_upload_error(Config) ->
     Name = "cool_name",
     Data = <<"data"/utf8>>,
 
-    {ok, _} = emqx_conf:update(
-        [file_transfer, storage, local, exporter, s3, bucket], <<"invalid-bucket">>, #{}
+    Conf = emqx_conf:get_raw([file_transfer], #{}),
+    Conf1 = emqx_utils_maps:deep_put(
+        [<<"storage">>, <<"local">>, <<"exporter">>, <<"s3">>, <<"bucket">>],
+        Conf,
+        <<"invalid-bucket">>
     ),
+    {ok, _} = emqx_conf:update([file_transfer], Conf1, #{}),
 
     ?assertEqual(
         {error, unspecified_error},

+ 1 - 1
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -381,7 +381,7 @@ complete_transfer(Storage, Transfer, Size) ->
     complete_transfer(Storage, Transfer, Size, 100).
 
 complete_transfer(Storage, Transfer, Size, Timeout) ->
-    case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of
+    case emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}) of
         ok ->
             ok;
         {async, Pid} ->

+ 13 - 6
apps/emqx_management/src/emqx_mgmt_api_plugins.erl

@@ -322,7 +322,8 @@ validate_name(Name) ->
 
 %% API CallBack Begin
 list_plugins(get, _) ->
-    {Plugins, []} = emqx_mgmt_api_plugins_proto_v1:get_plugins(),
+    Nodes = emqx:running_nodes(),
+    {Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes),
     {200, format_plugins(Plugins)}.
 
 get_plugins() ->
@@ -373,7 +374,8 @@ upload_install(post, #{}) ->
 
 do_install_package(FileName, Bin) ->
     %% TODO: handle bad nodes
-    {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
+    Nodes = emqx:running_nodes(),
+    {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin),
     case lists:filter(fun(R) -> R =/= ok end, Res) of
         [] ->
             {200};
@@ -386,7 +388,11 @@ do_install_package(FileName, Bin) ->
                 end,
                 Filtered
             ),
-            {error, #{error := Reason}} = hd(Filtered),
+            Reason =
+                case hd(Filtered) of
+                    {error, #{error := Reason0}} -> Reason0;
+                    {error, #{reason := Reason0}} -> Reason0
+                end,
             {400, #{
                 code => 'BAD_PLUGIN_INFO',
                 message => iolist_to_binary([Reason, ":", FileName])
@@ -394,17 +400,18 @@ do_install_package(FileName, Bin) ->
     end.
 
 plugin(get, #{bindings := #{name := Name}}) ->
-    {Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
+    Nodes = emqx:running_nodes(),
+    {Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name),
     case format_plugins(Plugins) of
         [Plugin] -> {200, Plugin};
         [] -> {404, #{code => 'NOT_FOUND', message => Name}}
     end;
 plugin(delete, #{bindings := #{name := Name}}) ->
-    Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
+    Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name),
     return(204, Res).
 
 update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
-    Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
+    Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action),
     return(204, Res).
 
 update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->

+ 2 - 0
apps/emqx_management/src/emqx_mgmt_app.erl

@@ -31,10 +31,12 @@ start(_Type, _Args) ->
     ok = mria_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity),
     case emqx_mgmt_auth:init_bootstrap_file() of
         ok ->
+            emqx_conf:add_handler([api_key], emqx_mgmt_auth),
             emqx_mgmt_sup:start_link();
         {error, Reason} ->
             {error, Reason}
     end.
 
 stop(_State) ->
+    emqx_conf:remove_handler([api_key]),
     ok.

+ 14 - 7
apps/emqx_management/src/emqx_mgmt_auth.erl

@@ -20,6 +20,7 @@
 %% API
 -export([mnesia/1]).
 -boot_mnesia({mnesia, [boot]}).
+-behaviour(emqx_config_handler).
 
 -export([
     create/4,
@@ -31,6 +32,7 @@
 ]).
 
 -export([authorize/3]).
+-export([post_config_update/5]).
 
 %% Internal exports (RPC)
 -export([
@@ -65,6 +67,17 @@ mnesia(boot) ->
         {attributes, record_info(fields, ?APP)}
     ]).
 
+post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) ->
+    #{bootstrap_file := File} = NewConf,
+    case init_bootstrap_file(File) of
+        ok ->
+            ?SLOG(debug, #{msg => "init_bootstrap_api_keys_from_file_ok", file => File});
+        {error, Reason} ->
+            Msg = "init_bootstrap_api_keys_from_file_failed",
+            ?SLOG(error, #{msg => Msg, reason => Reason, file => File})
+    end,
+    ok.
+
 -spec init_bootstrap_file() -> ok | {error, _}.
 init_bootstrap_file() ->
     File = bootstrap_file(),
@@ -230,13 +243,7 @@ generate_api_secret() ->
     emqx_base62:encode(Random).
 
 bootstrap_file() ->
-    case emqx:get_config([api_key, bootstrap_file], <<>>) of
-        %% For compatible remove until 5.1.0
-        <<>> ->
-            emqx:get_config([dashboard, bootstrap_users_file], <<>>);
-        File ->
-            File
-    end.
+    emqx:get_config([api_key, bootstrap_file], <<>>).
 
 init_bootstrap_file(<<>>) ->
     ok;

+ 5 - 0
apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl

@@ -19,6 +19,8 @@
 
 -export([
     introduced_in/0,
+    deprecated_since/0,
+
     get_plugins/0,
     install_package/2,
     describe_package/1,
@@ -31,6 +33,9 @@
 introduced_in() ->
     "5.0.0".
 
+deprecated_since() ->
+    "5.1.0".
+
 -spec get_plugins() -> emqx_rpc:multicall_result().
 get_plugins() ->
     rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000).

+ 52 - 0
apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v2.erl

@@ -0,0 +1,52 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_plugins_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    get_plugins/1,
+    install_package/3,
+    describe_package/2,
+    delete_package/1,
+    ensure_action/2
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.1.0".
+
+-spec get_plugins([node()]) -> emqx_rpc:multicall_result().
+get_plugins(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000).
+
+-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result().
+install_package(Nodes, Filename, Bin) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000).
+
+-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result().
+describe_package(Nodes, Name) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000).
+
+-spec delete_package(binary() | string()) -> ok | {error, any()}.
+delete_package(Name) ->
+    emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
+
+-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
+ensure_action(Name, Action) ->
+    emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).

+ 16 - 23
apps/emqx_management/test/emqx_mgmt_api_api_keys_SUITE.erl

@@ -29,19 +29,18 @@ groups() ->
     ].
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite([emqx_conf]),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
     Config.
 
 end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite([emqx_conf]).
+    emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_management]).
 
 t_bootstrap_file(_) ->
     TestPath = <<"/api/v5/status">>,
     Bin = <<"test-1:secret-1\ntest-2:secret-2">>,
     File = "./bootstrap_api_keys.txt",
     ok = file:write_file(File, Bin),
-    emqx:update_config([api_key, bootstrap_file], File),
-    ok = emqx_mgmt_auth:init_bootstrap_file(),
+    update_file(File),
     ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-1">>)),
     ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-2">>)),
     ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-1">>)),
@@ -49,39 +48,33 @@ t_bootstrap_file(_) ->
     %% relaunch to check if the table is changed.
     Bin1 = <<"test-1:new-secret-1\ntest-2:new-secret-2">>,
     ok = file:write_file(File, Bin1),
-    ok = emqx_mgmt_auth:init_bootstrap_file(),
+    update_file(File),
     ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-1">>)),
     ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-2">>)),
     ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"new-secret-1">>)),
     ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"new-secret-2">>)),
 
-    %% Compatibility
-    Bin2 = <<"test-3:new-secret-3\ntest-4:new-secret-4">>,
-    ok = file:write_file(File, Bin2),
-    emqx:update_config([api_key, bootstrap_file], <<>>),
-    emqx:update_config([dashboard, bootstrap_users_file], File),
-    ok = emqx_mgmt_auth:init_bootstrap_file(),
-    ?assertMatch(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"new-secret-1">>)),
-    ?assertMatch(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"new-secret-2">>)),
-    ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-3">>, <<"new-secret-3">>)),
-    ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-4">>, <<"new-secret-4">>)),
-
-    %% not found
-    NotFoundFile = "./bootstrap_apps_not_exist.txt",
-    emqx:update_config([api_key, bootstrap_file], NotFoundFile),
-    ?assertMatch({error, "No such file or directory"}, emqx_mgmt_auth:init_bootstrap_file()),
+    %% not error when bootstrap_file is empty
+    update_file(<<>>),
+    update_file("./bootstrap_apps_not_exist.txt"),
+    ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-1">>)),
+    ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-2">>)),
+    ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"new-secret-1">>)),
+    ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"new-secret-2">>)),
 
     %% bad format
     BadBin = <<"test-1:secret-11\ntest-2 secret-12">>,
     ok = file:write_file(File, BadBin),
-    emqx:update_config([api_key, bootstrap_file], File),
+    update_file(File),
     ?assertMatch({error, #{reason := "invalid_format"}}, emqx_mgmt_auth:init_bootstrap_file()),
     ?assertEqual(ok, emqx_mgmt_auth:authorize(TestPath, <<"test-1">>, <<"secret-11">>)),
     ?assertMatch({error, _}, emqx_mgmt_auth:authorize(TestPath, <<"test-2">>, <<"secret-12">>)),
-    emqx:update_config([api_key, bootstrap_file], <<>>),
-    emqx:update_config([dashboard, bootstrap_users_file], <<>>),
+    update_file(<<>>),
     ok.
 
+update_file(File) ->
+    ?assertMatch({ok, _}, emqx:update_config([<<"api_key">>], #{<<"bootstrap_file">> => File})).
+
 t_create(_Config) ->
     Name = <<"EMQX-API-KEY-1">>,
     {ok, Create} = create_app(Name),

+ 2 - 11
apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl

@@ -24,20 +24,11 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    mria:start(),
-    ok = emqx_common_test_helpers:start_apps([emqx_management]),
-    emqx_common_test_helpers:start_apps([] ++ [emqx_dashboard], fun set_special_configs/1),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
     Config.
 
 end_per_suite(_) ->
-    emqx_common_test_helpers:stop_apps([emqx_management] ++ [emqx_dashboard]),
-    emqx_config:delete_override_conf_files(),
-    ok.
-
-set_special_configs(emqx_dashboard) ->
-    emqx_dashboard_api_test_helpers:set_default_config();
-set_special_configs(_App) ->
-    ok.
+    emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
 
 t_status(_Config) ->
     emqx_ctl:run_command([]),

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 6 - 1
apps/emqx_oracle/src/emqx_oracle.erl

@@ -222,7 +222,12 @@ on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
                 sql => NameOrSQL,
                 reason => Reason
             }),
-            Result;
+            case Reason of
+                ecpool_empty ->
+                    {error, {recoverable_error, Reason}};
+                _ ->
+                    Result
+            end;
         Result ->
             ?tp(
                 oracle_connector_query_return,

+ 82 - 1
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -47,7 +47,8 @@ groups() ->
     [
         {copy_plugin, [sequence], [
             group_t_copy_plugin_to_a_new_node,
-            group_t_copy_plugin_to_a_new_node_single_node
+            group_t_copy_plugin_to_a_new_node_single_node,
+            group_t_cluster_leave
         ]},
         {create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]}
     ].
@@ -676,6 +677,86 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
     ),
     ok.
 
+group_t_cluster_leave({init, Config}) ->
+    PrivDataDir = ?config(priv_dir, Config),
+    ToInstallDir = filename:join(PrivDataDir, "plugins_copy_to"),
+    file:del_dir_r(ToInstallDir),
+    ok = filelib:ensure_path(ToInstallDir),
+    #{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
+    NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
+    Cluster =
+        emqx_common_test_helpers:emqx_cluster(
+            [core, core],
+            #{
+                apps => [emqx_conf, emqx_plugins],
+                env => [
+                    {emqx, init_config_load_done, false},
+                    {emqx, boot_modules, []}
+                ],
+                env_handler => fun
+                    (emqx_plugins) ->
+                        ok = emqx_plugins:put_config(install_dir, ToInstallDir),
+                        %% this is to simulate an user setting the state
+                        %% via environment variables before starting the node
+                        ok = emqx_plugins:put_config(
+                            states,
+                            [#{name_vsn => NameVsn, enable => true}]
+                        ),
+                        ok;
+                    (_) ->
+                        ok
+                end,
+                priv_data_dir => PrivDataDir,
+                schema_mod => emqx_conf_schema,
+                peer_mod => slave,
+                load_schema => true
+            }
+        ),
+    Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
+    [
+        {to_install_dir, ToInstallDir},
+        {cluster, Cluster},
+        {nodes, Nodes},
+        {name_vsn, NameVsn},
+        {plugin_name, PluginName}
+        | Config
+    ];
+group_t_cluster_leave({'end', Config}) ->
+    Nodes = proplists:get_value(nodes, Config),
+    [ok = emqx_common_test_helpers:stop_slave(N) || N <- Nodes],
+    ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
+    ok;
+group_t_cluster_leave(Config) ->
+    [N1, N2] = ?config(nodes, Config),
+    NameVsn = proplists:get_value(name_vsn, Config),
+    ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]),
+    ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]),
+    ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]),
+    Params = unused,
+    %% 2 nodes running
+    ?assertMatch(
+        {200, [#{running_status := [#{status := running}, #{status := running}]}]},
+        erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
+    ),
+    ?assertMatch(
+        {200, [#{running_status := [#{status := running}, #{status := running}]}]},
+        erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
+    ),
+
+    %% Now, one node leaves the cluster.
+    ok = erpc:call(N2, ekka, leave, []),
+
+    %% Each node will no longer ask the plugin status to the other.
+    ?assertMatch(
+        {200, [#{running_status := [#{node := N1, status := running}]}]},
+        erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
+    ),
+    ?assertMatch(
+        {200, [#{running_status := [#{node := N2, status := running}]}]},
+        erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
+    ),
+    ok.
+
 make_tar(Cwd, NameWithVsn) ->
     make_tar(Cwd, NameWithVsn, NameWithVsn).
 

+ 3 - 0
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -121,3 +121,6 @@
             false
     end)
 ).
+
+-define(KEY_PATH, [rule_engine, rules]).
+-define(RULE_PATH(RULE), [rule_engine, rules, RULE]).

+ 13 - 11
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -26,8 +26,7 @@
 -export([start_link/0]).
 
 -export([
-    post_config_update/5,
-    config_key_path/0
+    post_config_update/5
 ]).
 
 %% Rule Management
@@ -102,9 +101,6 @@
 
 -type action_name() :: binary() | #{function := binary()}.
 
-config_key_path() ->
-    [rule_engine, rules].
-
 -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}.
 start_link() ->
     gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
@@ -112,7 +108,13 @@ start_link() ->
 %%------------------------------------------------------------------------------
 %% The config handler for emqx_rule_engine
 %%------------------------------------------------------------------------------
-post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
+post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
+    create_rule(NewRule#{id => bin(RuleId)});
+post_config_update(?RULE_PATH(RuleId), '$remove', undefined, _OldRule, _AppEnvs) ->
+    delete_rule(bin(RuleId));
+post_config_update(?RULE_PATH(RuleId), _Req, NewRule, _OldRule, _AppEnvs) ->
+    update_rule(NewRule#{id => bin(RuleId)});
+post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRules}, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Updated} =
         emqx_utils_maps:diff_maps(NewRules, OldRules),
     try
@@ -134,7 +136,7 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
             end,
             Added
         ),
-        {ok, get_rules()}
+        ok
     catch
         throw:#{kind := _} = Error ->
             {error, Error}
@@ -247,11 +249,11 @@ ensure_action_removed(RuleId, ActionName) ->
     case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
         not_found ->
             ok;
-        #{<<"actions">> := Acts} ->
+        #{<<"actions">> := Acts} = Conf ->
             NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
             {ok, _} = emqx_conf:update(
-                emqx_rule_engine:config_key_path() ++ [RuleId, actions],
-                NewActs,
+                ?RULE_PATH(RuleId),
+                Conf#{<<"actions">> => NewActs},
                 #{override_to => cluster}
             ),
             ok
@@ -372,7 +374,7 @@ init([]) ->
         {write_concurrency, true},
         {read_concurrency, true}
     ]),
-    ok = emqx_config_handler:add_handler(
+    ok = emqx_conf:add_handler(
         [rule_engine, jq_implementation_module],
         emqx_rule_engine_schema
     ),

+ 5 - 10
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -351,14 +351,13 @@ param_path_id() ->
             {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
         Id ->
             Params = filter_out_request_body(add_metadata(Params0)),
-            ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
             case emqx_rule_engine:get_rule(Id) of
                 {ok, _Rule} ->
                     {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
                 not_found ->
+                    ConfPath = ?RULE_PATH(Id),
                     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
-                        {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
-                            [Rule] = get_one_rule(AllRules, Id),
+                        {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
                             {201, format_rule_info_resp(Rule)};
                         {error, Reason} ->
                             ?SLOG(error, #{
@@ -396,10 +395,9 @@ param_path_id() ->
     end;
 '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
     Params = filter_out_request_body(Params0),
-    ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+    ConfPath = ?RULE_PATH(Id),
     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
-        {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
-            [Rule] = get_one_rule(AllRules, Id),
+        {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
             {200, format_rule_info_resp(Rule)};
         {error, Reason} ->
             ?SLOG(error, #{
@@ -412,7 +410,7 @@ param_path_id() ->
 '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
     case emqx_rule_engine:get_rule(Id) of
         {ok, _Rule} ->
-            ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+            ConfPath = ?RULE_PATH(Id),
             case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
                 {ok, _} ->
                     {204};
@@ -655,9 +653,6 @@ aggregate_metrics(AllMetrics) ->
         AllMetrics
     ).
 
-get_one_rule(AllRules, Id) ->
-    [R || R = #{id := Id0} <- AllRules, Id0 == Id].
-
 add_metadata(Params) ->
     Params#{
         <<"metadata">> => #{

+ 6 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -29,11 +29,15 @@ start(_Type, _Args) ->
     ok = emqx_rule_events:reload(),
     SupRet = emqx_rule_engine_sup:start_link(),
     ok = emqx_rule_engine:load_rules(),
-    emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine),
+    RulePath = [RuleEngine | _] = ?KEY_PATH,
+    emqx_conf:add_handler(RulePath ++ ['?'], emqx_rule_engine),
+    emqx_conf:add_handler([RuleEngine], emqx_rule_engine),
     emqx_rule_engine_cli:load(),
     SupRet.
 
 stop(_State) ->
     emqx_rule_engine_cli:unload(),
-    emqx_conf:remove_handler(emqx_rule_engine:config_key_path()),
+    RulePath = [RuleEngine | _] = ?KEY_PATH,
+    emqx_conf:remove_handler(RulePath ++ ['?']),
+    emqx_conf:remove_handler([RuleEngine]),
     ok = emqx_rule_events:unload().

+ 10 - 12
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -472,19 +472,17 @@ t_ensure_action_removed(_) ->
     Id = <<"t_ensure_action_removed">>,
     GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
     emqx:update_config(
-        [rule_engine, rules],
+        [rule_engine, rules, Id],
         #{
-            Id => #{
-                <<"actions">> => [
-                    #{<<"function">> => GetSelectedData},
-                    #{<<"function">> => <<"console">>},
-                    #{<<"function">> => <<"republish">>},
-                    <<"mysql:foo">>,
-                    <<"mqtt:bar">>
-                ],
-                <<"description">> => <<"">>,
-                <<"sql">> => <<"SELECT * FROM \"t/#\"">>
-            }
+            <<"actions">> => [
+                #{<<"function">> => GetSelectedData},
+                #{<<"function">> => <<"console">>},
+                #{<<"function">> => <<"republish">>},
+                <<"mysql:foo">>,
+                <<"mqtt:bar">>
+            ],
+            <<"description">> => <<"">>,
+            <<"sql">> => <<"SELECT * FROM \"t/#\"">>
         }
     ),
     ?assertMatch(

+ 4 - 0
changes/ce/fix-10923.en.md

@@ -0,0 +1,4 @@
+Fix a race-condition in channel info registration.
+
+Prior to this fix, when system is under heavy load, it might happen that a client is disconnected (or has its session expired) but still can be found in the clients page in dashboard.
+One of the possible reasons is a race condition fixed in this PR: the connection is killed in the middle of channel data registration.

+ 1 - 0
changes/ee/feat-10892.en.md

@@ -0,0 +1 @@
+Require that SID or Service Name is set on Oracle Database bridge creation.

+ 1 - 0
changes/ee/fix-10913.en.md

@@ -0,0 +1 @@
+Fixed an issue where a node that left the cluster would still report plugin status from other nodes.

+ 1 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -186,6 +186,7 @@ fields(bridges) ->
                 hoconsc:map(name, ref(emqx_bridge_oracle, "config")),
                 #{
                     desc => <<"Oracle Bridge Config">>,
+                    validator => fun emqx_bridge_oracle:config_validator/1,
                     required => false
                 }
             )},

+ 1 - 1
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_schema_registry, [
     {description, "EMQX Schema Registry"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, [emqx_ee_schema_registry_sup]},
     {mod, {emqx_ee_schema_registry_app, []}},
     {applications, [

+ 18 - 24
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl

@@ -104,38 +104,32 @@ list_schemas() ->
 %%-------------------------------------------------------------------------------------------------
 %% `emqx_config_handler' API
 %%-------------------------------------------------------------------------------------------------
-
+%% remove
 post_config_update(
-    [?CONF_KEY_ROOT, schemas] = _Path,
+    [?CONF_KEY_ROOT, schemas, Name],
+    '$remove',
+    _NewSchemas,
+    _OldSchemas,
+    _AppEnvs
+) ->
+    async_delete_serdes([Name]),
+    ok;
+%% add or update
+post_config_update(
+    [?CONF_KEY_ROOT, schemas, NewName],
     _Cmd,
-    NewConf = #{schemas := NewSchemas},
-    OldConf = #{},
+    NewSchemas,
+    %% undefined or OldSchemas
+    _,
     _AppEnvs
 ) ->
-    OldSchemas = maps:get(schemas, OldConf, #{}),
-    #{
-        added := Added,
-        changed := Changed0,
-        removed := Removed
-    } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
-    Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
-    RemovedNames = maps:keys(Removed),
-    case RemovedNames of
-        [] ->
-            ok;
-        _ ->
-            async_delete_serdes(RemovedNames)
-    end,
-    SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
-    case build_serdes(SchemasToBuild) of
+    case build_serdes([{NewName, NewSchemas}]) of
         ok ->
-            {ok, NewConf};
+            {ok, #{NewName => NewSchemas}};
         {error, Reason, SerdesToRollback} ->
             lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
             {error, Reason}
-    end;
-post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
-    {ok, NewConf}.
+    end.
 
 %%-------------------------------------------------------------------------------------------------
 %% `gen_server' API

+ 2 - 2
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl

@@ -11,9 +11,9 @@
 
 start(_StartType, _StartArgs) ->
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
-    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
+    emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
     emqx_ee_schema_registry_sup:start_link().
 
 stop(_State) ->
-    emqx_conf:remove_handler(?CONF_KEY_PATH),
+    emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),
     ok.

+ 13 - 9
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -607,21 +607,25 @@ t_fail_rollback(Config) ->
     SerdeType = ?config(serde_type, Config),
     OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
     BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
-    %% hopefully, for this small map, the key order is used.
-    Serdes = #{
-        <<"a">> => OkSchema,
-        <<"z">> => BrokenSchema
-    },
+
+    ?assertMatch(
+        {ok, _},
+        emqx_conf:update(
+            [?CONF_KEY_ROOT, schemas, <<"a">>],
+            OkSchema,
+            #{}
+        )
+    ),
     ?assertMatch(
         {error, _},
         emqx_conf:update(
-            [?CONF_KEY_ROOT, schemas],
-            Serdes,
+            [?CONF_KEY_ROOT, schemas, <<"z">>],
+            BrokenSchema,
             #{}
         )
     ),
-    %% no serdes should be in the table
-    ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)),
+    ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)),
+    %% no z serdes should be in the table
     ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
     ok.
 

+ 5 - 0
scripts/spellcheck/spellcheck.sh

@@ -12,6 +12,11 @@ else
     SCHEMA="$(realpath "$1")"
 fi
 
+if ! [ -f "$SCHEMA" ]; then
+  echo "Schema file $SCHEMA does not exist; did you forget to run 'make emqx{,-enterprise}' ?"
+  exit 1
+fi
+
 set +e
 docker run --rm -i --name spellcheck \
     -v "${PROJ_ROOT}"/scripts/spellcheck/dicts:/dicts \

+ 15 - 0
scripts/ui-tests/conftest.py

@@ -0,0 +1,15 @@
+import pytest
+from selenium import webdriver
+
+def pytest_addoption(parser):
+    parser.addoption("--dashboard-host", action="store", default="localhost", help="Dashboard host")
+    parser.addoption("--dashboard-port", action="store", default="18083", help="Dashboard port")
+
+@pytest.fixture
+def dashboard_host(request):
+    return request.config.getoption("--dashboard-host")
+
+@pytest.fixture
+def dashboard_port(request):
+    return request.config.getoption("--dashboard-port")
+

+ 73 - 0
scripts/ui-tests/dashboard_test.py

@@ -0,0 +1,73 @@
+import time
+import unittest
+import pytest
+from urllib.parse import urljoin
+from selenium import webdriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.common.keys import Keys
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.support.wait import WebDriverWait
+from selenium.webdriver.common import utils
+
+@pytest.fixture
+def driver():
+    options = Options()
+    options.add_argument("--headless")
+    options.add_argument("--no-sandbox")
+    _driver = webdriver.Chrome(options=options)
+    yield _driver
+    _driver.quit()
+
+@pytest.fixture(autouse=True)
+def dashboard_url(dashboard_host, dashboard_port):
+    count = 0
+    while utils.is_connectable(port=dashboard_port, host=dashboard_host) is False:
+        if count == 30:
+            raise Exception("Dashboard is not ready")
+        count += 1
+        time.sleep(1)
+    return f"http://{dashboard_host}:{dashboard_port}"
+
+@pytest.fixture
+def login(driver, dashboard_url):
+    driver.get(dashboard_url)
+    assert "EMQX Dashboard" == driver.title
+    assert f"{dashboard_url}/#/login?to=/dashboard/overview" == driver.current_url
+    driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='text']").send_keys("admin")
+    driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='password']").send_keys("admin")
+    driver.find_element(By.XPATH, "//div[@class='login']//form[1]//button[1]").click()
+    dest_url = urljoin(dashboard_url, "/#/dashboard/overview")
+    driver.get(dest_url)
+    ensure_current_url(driver, dest_url)
+
+def ensure_current_url(driver, url):
+    count = 0
+    while url != driver.current_url:
+        if count == 10:
+            raise Exception(f"Failed to load {url}")
+        count += 1
+        time.sleep(1)
+
+def wait_title(driver):
+    return WebDriverWait(driver, 10).until(lambda x: x.find_element("xpath", "//div[@id='app']//h1[@class='header-title']")) 
+
+def test_basic(driver, login, dashboard_url):
+    driver.get(dashboard_url)
+    title = wait_title(driver)
+    assert "Cluster Overview" == title.text
+
+def test_log(driver, login, dashboard_url):
+    dest_url = urljoin(dashboard_url, "/#/log")
+    driver.get(dest_url)
+    ensure_current_url(driver, dest_url)
+    title = wait_title(driver)
+    assert "Logging" == title.text
+    label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Enable Log Handler']]")
+    assert driver.find_elements(By.ID, label.get_attribute("for"))
+    label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Log Level']]")
+    assert driver.find_elements(By.ID, label.get_attribute("for"))
+    label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Log Formatter']]")
+    assert driver.find_elements(By.ID, label.get_attribute("for"))
+    label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[./label/span[text()='Time Offset']]")
+    assert driver.find_elements(By.ID, label.get_attribute("for"))
+

+ 16 - 0
scripts/ui-tests/docker-compose.yaml

@@ -0,0 +1,16 @@
+version: '3.9'
+
+services:
+  emqx:
+    image: ${EMQX_IMAGE_TAG:-emqx/emqx:latest}
+    environment:
+      EMQX_DASHBOARD__DEFAULT_PASSWORD: admin
+
+  selenium:
+    shm_size: '2gb'
+    image: ghcr.io/emqx/selenium-chrome:latest
+    volumes:
+      - ./:/app
+    depends_on:
+      - emqx
+    command: python3 -m pytest --dashboard-host emqx --dashboard-port 18083