Просмотр исходного кода

Merge pull request #13198 from zmstone/0606-merge-release-57-to-master

0606 merge `release-57` to `master`
zmstone 1 год назад
Родитель
Сommit
15fbb966a0
64 измененных файлов с 630 добавлено и 140 удалено
  1. 1 1
      .ci/docker-compose-file/docker-compose-kafka.yaml
  2. 1 1
      .ci/docker-compose-file/docker-compose.yaml
  3. 8 8
      .github/workflows/_pr_entrypoint.yaml
  4. 9 10
      .github/workflows/_push-entrypoint.yaml
  5. 6 6
      .github/workflows/build_and_push_docker_images.yaml
  6. 2 2
      .github/workflows/build_packages.yaml
  7. 3 3
      .github/workflows/build_packages_cron.yaml
  8. 6 6
      .github/workflows/build_slim_packages.yaml
  9. 1 1
      .github/workflows/codeql.yaml
  10. 1 1
      .github/workflows/performance_test.yaml
  11. 1 1
      .github/workflows/run_relup_tests.yaml
  12. 1 1
      .tool-versions
  13. 3 3
      Makefile
  14. 13 1
      apps/emqx_auth_http/src/emqx_authz_http.erl
  15. 9 0
      apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl
  16. 1 0
      apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl
  17. 1 0
      apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl
  18. 12 1
      apps/emqx_bridge/src/emqx_bridge_v2.erl
  19. 2 0
      apps/emqx_bridge/src/emqx_bridge_v2_api.erl
  20. 1 1
      apps/emqx_bridge_azure_event_hub/rebar.config
  21. 1 1
      apps/emqx_bridge_confluent/rebar.config
  22. 1 1
      apps/emqx_bridge_kafka/rebar.config
  23. 1 1
      apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src
  24. 8 1
      apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl
  25. 1 1
      apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
  26. 6 2
      apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl
  27. 19 1
      apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl
  28. 52 6
      apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl
  29. 28 9
      apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl
  30. 39 9
      apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl
  31. 1 1
      apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl
  32. 1 1
      apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src
  33. 8 1
      apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl
  34. 12 0
      apps/emqx_connector/src/emqx_connector_api.erl
  35. 8 2
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  36. 1 1
      apps/emqx_plugins/src/emqx_plugins.app.src
  37. 21 6
      apps/emqx_plugins/src/emqx_plugins.erl
  38. 1 1
      apps/emqx_postgresql/src/emqx_postgresql.app.src
  39. 63 22
      apps/emqx_postgresql/src/emqx_postgresql.erl
  40. 4 1
      apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl
  41. 1 0
      apps/emqx_resource/src/emqx_resource.erl
  42. 48 4
      apps/emqx_resource/src/emqx_resource_manager.erl
  43. 13 0
      apps/emqx_resource/test/emqx_connector_demo.erl
  44. 37 0
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  45. 2 2
      apps/emqx_rule_engine/src/emqx_rule_events.erl
  46. 33 0
      apps/emqx_rule_engine/src/emqx_rule_funcs.erl
  47. 21 0
      apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl
  48. 2 1
      apps/emqx_utils/src/emqx_placeholder.erl
  49. 66 0
      apps/emqx_utils/test/emqx_utils_agent.erl
  50. 1 1
      bin/emqx
  51. 6 6
      build
  52. 3 0
      changes/ce/feat-13175.en.md
  53. 1 0
      changes/ce/fix-13148.en.md
  54. 6 0
      changes/ce/fix-13164.en.md
  55. 3 0
      changes/ce/fix-13181.en.md
  56. 5 0
      changes/ee/feat-13172.en.md
  57. 3 0
      changes/ee/fix-13093.en.md
  58. 1 1
      deploy/docker/Dockerfile
  59. 2 3
      mix.exs
  60. 9 0
      rel/i18n/emqx_postgresql.hocon
  61. 2 2
      scripts/buildx.sh
  62. 4 4
      scripts/pr-sanity-checks.sh
  63. 1 1
      scripts/relup-test/start-relup-test-cluster.sh
  64. 2 0
      scripts/spellcheck/dicts/emqx.txt

+ 1 - 1
.ci/docker-compose-file/docker-compose-kafka.yaml

@@ -18,7 +18,7 @@ services:
       - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
   kdc:
     hostname: kdc.emqx.net
-    image:  ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04
+    image:  ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04
     container_name: kdc.emqx.net
     expose:
       - 88 # kdc

+ 1 - 1
.ci/docker-compose-file/docker-compose.yaml

@@ -3,7 +3,7 @@ version: '3.9'
 services:
   erlang:
     container_name: erlang
-    image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04}
+    image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04}
     env_file:
       - credentials.env
       - conf.env

+ 8 - 8
.github/workflows/_pr_entrypoint.yaml

@@ -17,16 +17,16 @@ env:
 jobs:
   sanity-checks:
     runs-on: ubuntu-22.04
-    container: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04"
     outputs:
       ct-matrix: ${{ steps.matrix.outputs.ct-matrix }}
       ct-host: ${{ steps.matrix.outputs.ct-host }}
       ct-docker: ${{ steps.matrix.outputs.ct-docker }}
       version-emqx: ${{ steps.matrix.outputs.version-emqx }}
       version-emqx-enterprise: ${{ steps.matrix.outputs.version-emqx-enterprise }}
-      builder: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04"
-      builder_vsn: "5.3-5"
-      otp_vsn: "26.2.1-2"
+      builder: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04"
+      builder_vsn: "5.3-7"
+      otp_vsn: "26.2.5-1"
       elixir_vsn: "1.15.7"
 
     permissions:
@@ -96,13 +96,13 @@ jobs:
           MATRIX="$(echo "${APPS}" | jq -c '
             [
               (.[] | select(.profile == "emqx") | . + {
-                builder: "5.3-5",
-                otp: "26.2.1-2",
+                builder: "5.3-7",
+                otp: "26.2.5-1",
                 elixir: "1.15.7"
               }),
               (.[] | select(.profile == "emqx-enterprise") | . + {
-                builder: "5.3-5",
-                otp: ["26.2.1-2"][],
+                builder: "5.3-7",
+                otp: ["26.2.5-1"][],
                 elixir: "1.15.7"
               })
             ]

+ 9 - 10
.github/workflows/_push-entrypoint.yaml

@@ -24,7 +24,7 @@ env:
 jobs:
   prepare:
     runs-on: ubuntu-22.04
-    container: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04'
+    container: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04'
     outputs:
       profile: ${{ steps.parse-git-ref.outputs.profile }}
       release: ${{ steps.parse-git-ref.outputs.release }}
@@ -32,9 +32,9 @@ jobs:
       ct-matrix: ${{ steps.matrix.outputs.ct-matrix }}
       ct-host: ${{ steps.matrix.outputs.ct-host }}
       ct-docker: ${{ steps.matrix.outputs.ct-docker }}
-      builder: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04'
-      builder_vsn: '5.3-5'
-      otp_vsn: '26.2.1-2'
+      builder: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04'
+      builder_vsn: '5.3-7'
+      otp_vsn: '26.2.5-1'
       elixir_vsn: '1.15.7'
 
     permissions:
@@ -66,13 +66,13 @@ jobs:
           MATRIX="$(echo "${APPS}" | jq -c '
             [
               (.[] | select(.profile == "emqx") | . + {
-                builder: "5.3-5",
-                otp: "26.2.1-2",
+                builder: "5.3-7",
+                otp: "26.2.5-1",
                 elixir: "1.15.7"
               }),
               (.[] | select(.profile == "emqx-enterprise") | . + {
-                builder: "5.3-5",
-                otp: ["26.2.1-2"][],
+                builder: "5.3-7",
+                otp: ["26.2.5-1"][],
                 elixir: "1.15.7"
               })
             ]
@@ -107,8 +107,7 @@ jobs:
       profile: ${{ needs.prepare.outputs.profile }}
       publish: true
       latest: ${{ needs.prepare.outputs.latest }}
-      # TODO: revert this back to needs.prepare.outputs.otp_vsn when OTP 26 bug is fixed
-      otp_vsn: 25.3.2-2
+      otp_vsn: ${{ needs.prepare.outputs.otp_vsn }}
       elixir_vsn: ${{ needs.prepare.outputs.elixir_vsn }}
       builder_vsn: ${{ needs.prepare.outputs.builder_vsn }}
     secrets: inherit

+ 6 - 6
.github/workflows/build_and_push_docker_images.yaml

@@ -53,7 +53,7 @@ on:
       otp_vsn:
         required: false
         type: string
-        default: '25.3.2-2'
+        default: '26.2.5-1'
       elixir_vsn:
         required: false
         type: string
@@ -61,7 +61,7 @@ on:
       builder_vsn:
         required: false
         type: string
-        default: '5.3-5'
+        default: '5.3-7'
 
 permissions:
   contents: read
@@ -169,8 +169,8 @@ jobs:
           EMQX_DOCKERFILE: 'deploy/docker/Dockerfile'
           PKG_VSN: ${{ needs.build.outputs.PKG_VSN }}
           EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }}
-          EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }}
-          EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }}
+          OTP_VSN: ${{ inputs.otp_vsn }}
+          ELIXIR_VSN: ${{ inputs.elixir_vsn }}
           EMQX_SOURCE_TYPE: tgz
         run: |
           ./build ${PROFILE} docker
@@ -218,8 +218,8 @@ jobs:
           EMQX_DOCKERFILE: 'deploy/docker/Dockerfile'
           PKG_VSN: ${{ needs.build.outputs.PKG_VSN }}
           EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }}
-          EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }}
-          EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }}
+          OTP_VSN: ${{ inputs.otp_vsn }}
+          ELIXIR_VSN: ${{ inputs.elixir_vsn }}
           EMQX_SOURCE_TYPE: tgz
         run: |
           ./build ${PROFILE} docker

+ 2 - 2
.github/workflows/build_packages.yaml

@@ -55,7 +55,7 @@ on:
       otp_vsn:
         required: false
         type: string
-        default: '26.2.1-2'
+        default: '26.2.5-1'
       elixir_vsn:
         required: false
         type: string
@@ -63,7 +63,7 @@ on:
       builder_vsn:
         required: false
         type: string
-        default: '5.3-5'
+        default: '5.3-7'
 
 permissions:
   contents: read

+ 3 - 3
.github/workflows/build_packages_cron.yaml

@@ -23,8 +23,8 @@ jobs:
       fail-fast: false
       matrix:
         profile:
-          - ['emqx', 'master', '5.3-5:1.15.7-26.2.1-2']
-          - ['emqx', 'release-57', '5.3-5:1.15.7-26.2.1-2']
+          - ['emqx', 'master', '5.3-7:1.15.7-26.2.5-1']
+          - ['emqx', 'release-57', '5.3-7:1.15.7-26.2.5-1']
         os:
           - ubuntu22.04
           - amzn2023
@@ -92,7 +92,7 @@ jobs:
         branch:
           - master
         otp:
-          - 26.2.1-2
+          - 26.2.5-1
         os:
           - macos-12-arm64
 

+ 6 - 6
.github/workflows/build_slim_packages.yaml

@@ -27,15 +27,15 @@ on:
       builder:
         required: false
         type: string
-        default: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04'
+        default: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04'
       builder_vsn:
         required: false
         type: string
-        default: '5.3-5'
+        default: '5.3-7'
       otp_vsn:
         required: false
         type: string
-        default: '26.2.1-2'
+        default: '26.2.5-1'
       elixir_vsn:
         required: false
         type: string
@@ -54,9 +54,9 @@ jobs:
       fail-fast: false
       matrix:
         profile:
-          - ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "x64"]
-          - ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "arm64"]
-          - ["emqx-enterprise", "26.2.1-2", "ubuntu22.04", "erlang", "x64"]
+          - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "x64"]
+          - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "arm64"]
+          - ["emqx-enterprise", "26.2.5-1", "ubuntu22.04", "erlang", "x64"]
 
     container: "ghcr.io/emqx/emqx-builder/${{ inputs.builder_vsn }}:${{ inputs.elixir_vsn }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
 

+ 1 - 1
.github/workflows/codeql.yaml

@@ -18,7 +18,7 @@ jobs:
       actions: read
       security-events: write
     container:
-      image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04
+      image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04
 
     strategy:
       fail-fast: false

+ 1 - 1
.github/workflows/performance_test.yaml

@@ -26,7 +26,7 @@ jobs:
   prepare:
     runs-on: ubuntu-latest
     if: github.repository_owner == 'emqx'
-    container: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu20.04
+    container: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu20.04
     outputs:
       BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
       PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}

+ 1 - 1
.github/workflows/run_relup_tests.yaml

@@ -74,7 +74,7 @@ jobs:
     steps:
     - uses: erlef/setup-beam@0a541161e47ec43ccbd9510053c5f336ca76c2a2 # v1.17.6
       with:
-        otp-version: 26.2.1
+        otp-version: 26.2.5
     - uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
       with:
         repository: hawk/lux

+ 1 - 1
.tool-versions

@@ -1,2 +1,2 @@
-erlang 26.2.1-2
+erlang 26.2.5-1
 elixir 1.15.7-otp-26

+ 3 - 3
Makefile

@@ -7,7 +7,7 @@ REBAR = $(CURDIR)/rebar3
 BUILD = $(CURDIR)/build
 SCRIPTS = $(CURDIR)/scripts
 export EMQX_RELUP ?= true
-export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12
+export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12
 export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:12-slim
 export EMQX_REL_FORM ?= tgz
 export QUICER_DOWNLOAD_FROM_RELEASE = 1
@@ -20,8 +20,8 @@ endif
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.9.0
-export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0
+export EMQX_DASHBOARD_VERSION ?= v1.9.1-beta.1
+export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1-beta.1
 
 -include default-profile.mk
 PROFILE ?= emqx

+ 13 - 1
apps/emqx_auth_http/src/emqx_authz_http.erl

@@ -37,6 +37,10 @@
 -compile(nowarn_export_all).
 -endif.
 
+-define(VAR_ACCESS, "access").
+-define(LEGACY_SUBSCRIBE_ACTION, 1).
+-define(LEGACY_PUBLISH_ACTION, 2).
+
 -define(ALLOWED_VARS, [
     ?VAR_USERNAME,
     ?VAR_CLIENTID,
@@ -47,6 +51,7 @@
     ?VAR_ACTION,
     ?VAR_CERT_SUBJECT,
     ?VAR_CERT_CN_NAME,
+    ?VAR_ACCESS,
     ?VAR_NS_CLIENT_ATTRS
 ]).
 
@@ -185,7 +190,14 @@ generate_request(Action, Topic, Client, Config) ->
 
 client_vars(Client, Action, Topic) ->
     Vars = emqx_authz_utils:vars_for_rule_query(Client, Action),
-    Vars#{topic => Topic}.
+    add_legacy_access_var(Vars#{topic => Topic}).
+
+add_legacy_access_var(#{action := subscribe} = Vars) ->
+    Vars#{access => ?LEGACY_SUBSCRIBE_ACTION};
+add_legacy_access_var(#{action := publish} = Vars) ->
+    Vars#{access => ?LEGACY_PUBLISH_ACTION};
+add_legacy_access_var(Vars) ->
+    Vars.
 
 allowed_vars() ->
     allowed_vars(emqx_authz:feature_available(rich_actions)).

+ 9 - 0
apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl

@@ -199,6 +199,7 @@ t_query_params(_Config) ->
                 mountpoint := <<"MOUNTPOINT">>,
                 topic := <<"t/1">>,
                 action := <<"publish">>,
+                access := <<"2">>,
                 qos := <<"1">>,
                 retain := <<"false">>
             } = cowboy_req:match_qs(
@@ -210,6 +211,7 @@ t_query_params(_Config) ->
                     mountpoint,
                     topic,
                     action,
+                    access,
                     qos,
                     retain
                 ],
@@ -227,6 +229,7 @@ t_query_params(_Config) ->
                 "mountpoint=${mountpoint}&"
                 "topic=${topic}&"
                 "action=${action}&"
+                "access=${access}&"
                 "qos=${qos}&"
                 "retain=${retain}"
             >>
@@ -261,6 +264,7 @@ t_path(_Config) ->
                     "MOUNTPOINT/"
                     "t%2F1/"
                     "publish/"
+                    "2/"
                     "1/"
                     "false"
                 >>,
@@ -278,6 +282,7 @@ t_path(_Config) ->
                 "${mountpoint}/"
                 "${topic}/"
                 "${action}/"
+                "${access}/"
                 "${qos}/"
                 "${retain}"
             >>
@@ -318,6 +323,7 @@ t_json_body(_Config) ->
                     <<"mountpoint">> := <<"MOUNTPOINT">>,
                     <<"topic">> := <<"t">>,
                     <<"action">> := <<"publish">>,
+                    <<"access">> := <<"2">>,
                     <<"qos">> := <<"1">>,
                     <<"retain">> := <<"false">>
                 },
@@ -335,6 +341,7 @@ t_json_body(_Config) ->
                 <<"mountpoint">> => <<"${mountpoint}">>,
                 <<"topic">> => <<"${topic}">>,
                 <<"action">> => <<"${action}">>,
+                <<"access">> => <<"${access}">>,
                 <<"qos">> => <<"${qos}">>,
                 <<"retain">> => <<"${retain}">>
             }
@@ -413,6 +420,7 @@ t_placeholder_and_body(_Config) ->
                     <<"mountpoint">> := <<"MOUNTPOINT">>,
                     <<"topic">> := <<"t">>,
                     <<"action">> := <<"publish">>,
+                    <<"access">> := <<"2">>,
                     <<"CN">> := ?PH_CERT_CN_NAME,
                     <<"CS">> := ?PH_CERT_SUBJECT
                 },
@@ -430,6 +438,7 @@ t_placeholder_and_body(_Config) ->
                 <<"mountpoint">> => <<"${mountpoint}">>,
                 <<"topic">> => <<"${topic}">>,
                 <<"action">> => <<"${action}">>,
+                <<"access">> => <<"${access}">>,
                 <<"CN">> => ?PH_CERT_CN_NAME,
                 <<"CS">> => ?PH_CERT_SUBJECT
             },

+ 1 - 0
apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl

@@ -606,6 +606,7 @@ pgsql_server() ->
 pgsql_config() ->
     #{
         auto_reconnect => true,
+        disable_prepared_statements => false,
         database => <<"mqtt">>,
         username => <<"root">>,
         password => <<"public">>,

+ 1 - 0
apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl

@@ -426,6 +426,7 @@ setup_config(SpecialParams) ->
 pgsql_config() ->
     #{
         auto_reconnect => true,
+        disable_prepared_statements => false,
         database => <<"mqtt">>,
         username => <<"root">>,
         password => <<"public">>,

+ 12 - 1
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -1151,7 +1151,18 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, undefin
 post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when
     ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES
 ->
-    ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf),
+    case uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf) of
+        ok ->
+            ok;
+        {error, timeout} ->
+            throw(<<
+                "Timed out trying to remove action or source.  Please try again and,"
+                " if the error persists, try disabling the connector before retrying."
+            >>);
+        {error, not_found} ->
+            %% Should not happen, unless config is inconsistent.
+            throw(<<"Referenced connector not found">>)
+    end,
     ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf),
     Bridges = emqx_utils_maps:deep_put(
         [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf

+ 2 - 0
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -879,6 +879,8 @@ handle_disable_enable(ConfRootKey, Id, Enable) ->
                 ?SERVICE_UNAVAILABLE(<<"request timeout">>);
             {error, timeout} ->
                 ?SERVICE_UNAVAILABLE(<<"request timeout">>);
+            {error, Reason} when is_binary(Reason) ->
+                ?BAD_REQUEST(Reason);
             {error, Reason} ->
                 ?INTERNAL_ERROR(Reason)
         end

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -5,7 +5,7 @@
     {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
-    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
+    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
     {snappyer, "1.2.9"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -5,7 +5,7 @@
     {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
-    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
+    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
     {snappyer, "1.2.9"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -5,7 +5,7 @@
     {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
-    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
+    {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
     {snappyer, "1.2.9"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},

+ 1 - 1
apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_matrix, [
     {description, "EMQX Enterprise MatrixDB Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [
         kernel,

+ 8 - 1
apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl

@@ -10,7 +10,8 @@
     bridge_v1_type_name/0,
     action_type_name/0,
     connector_type_name/0,
-    schema_module/0
+    schema_module/0,
+    connector_action_config_to_bridge_v1_config/2
 ]).
 
 bridge_v1_type_name() -> matrix.
@@ -20,3 +21,9 @@ action_type_name() -> matrix.
 connector_type_name() -> matrix.
 
 schema_module() -> emqx_bridge_matrix.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorConfig,
+        ActionConfig
+    ).

+ 1 - 1
apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pgsql, [
     {description, "EMQX Enterprise PostgreSQL Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 6 - 2
apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl

@@ -82,6 +82,7 @@ fields("get_bridge_v2") ->
 fields("post_bridge_v2") ->
     fields("post", pgsql, pgsql_action);
 fields("config") ->
+    %% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale)
     [
         {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
         {sql,
@@ -95,8 +96,11 @@ fields("config") ->
                 #{desc => ?DESC("local_topic"), default => undefined}
             )}
     ] ++ emqx_resource_schema:fields("resource_opts") ++
-        (emqx_postgresql:fields(config) --
-            emqx_connector_schema_lib:prepare_statement_fields());
+        proplists:delete(
+            disable_prepared_statements,
+            emqx_postgresql:fields(config) --
+                emqx_connector_schema_lib:prepare_statement_fields()
+        );
 fields("post") ->
     fields("post", ?ACTION_TYPE, "config");
 fields("put") ->

+ 19 - 1
apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl

@@ -10,7 +10,8 @@
     bridge_v1_type_name/0,
     action_type_name/0,
     connector_type_name/0,
-    schema_module/0
+    schema_module/0,
+    connector_action_config_to_bridge_v1_config/2
 ]).
 
 bridge_v1_type_name() -> pgsql.
@@ -20,3 +21,20 @@ action_type_name() -> pgsql.
 connector_type_name() -> pgsql.
 
 schema_module() -> emqx_bridge_pgsql.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorConfig,
+        ActionConfig
+    ),
+    maps:with(bridge_v1_fields(), Config0).
+
+%%------------------------------------------------------------------------------------------
+%% Internal helper functions
+%%------------------------------------------------------------------------------------------
+
+bridge_v1_fields() ->
+    [
+        emqx_utils_conv:bin(K)
+     || {K, _V} <- emqx_bridge_pgsql:fields("config")
+    ].

+ 52 - 6
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -19,6 +19,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(BRIDGE_TYPE, pgsql).
 -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
@@ -33,7 +34,18 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    All0 = emqx_common_test_helpers:all(?MODULE),
+    All = All0 -- matrix_cases(),
+    Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
+    Groups ++ All.
+
+matrix_cases() ->
+    [
+        t_disable_prepared_statements
+    ].
+
+groups() ->
+    emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
 
 init_per_suite(Config) ->
     PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
@@ -80,10 +92,26 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(Apps),
     ok.
 
-init_per_testcase(TestCase, Config) ->
-    common_init_per_testcase(TestCase, Config).
+init_per_group(Group, Config) when
+    Group =:= postgres;
+    Group =:= timescale;
+    Group =:= matrix
+->
+    [
+        {bridge_type, group_to_type(Group)},
+        {connector_type, group_to_type(Group)}
+        | Config
+    ];
+init_per_group(_Group, Config) ->
+    Config.
+
+group_to_type(postgres) -> pgsql;
+group_to_type(Group) -> Group.
 
-common_init_per_testcase(TestCase, Config) ->
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(TestCase, Config) ->
     ct:timetrap(timer:seconds(60)),
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     emqx_config:delete_override_conf_files(),
@@ -103,10 +131,10 @@ common_init_per_testcase(TestCase, Config) ->
     BridgeConfig = bridge_config(Name, Name),
     ok = snabbkaffe:start_trace(),
     [
-        {connector_type, ?CONNECTOR_TYPE},
+        {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
         {connector_name, Name},
         {connector_config, ConnectorConfig},
-        {bridge_type, ?BRIDGE_TYPE},
+        {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
         {bridge_name, Name},
         {bridge_config, BridgeConfig}
         | NConfig
@@ -232,3 +260,21 @@ t_sync_query(Config) ->
 t_start_action_or_source_with_disabled_connector(Config) ->
     ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
     ok.
+
+t_disable_prepared_statements(matrix) ->
+    [[postgres], [timescale], [matrix]];
+t_disable_prepared_statements(Config0) ->
+    ConnectorConfig0 = ?config(connector_config, Config0),
+    ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
+    Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config,
+        fun make_message/0,
+        fun(Res) -> ?assertMatch({ok, _}, Res) end,
+        postgres_bridge_connector_on_query_return
+    ),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.

+ 28 - 9
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -128,8 +128,8 @@ on_query(
                 #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
             ),
             Result;
-        Error ->
-            Error
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 on_batch_query(
@@ -165,8 +165,8 @@ on_batch_query(
                 }
             ),
             Result;
-        Error ->
-            Error
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 trace_format_commands(Commands0) ->
@@ -204,11 +204,15 @@ query(InstId, Query, RedisConnSt) ->
     end.
 
 proc_command_template(CommandTemplate, Msg) ->
-    lists:map(
-        fun(ArgTks) ->
-            emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary})
-        end,
-        CommandTemplate
+    lists:reverse(
+        lists:foldl(
+            fun(ArgTks, Acc) ->
+                New = proc_tmpl(ArgTks, Msg),
+                lists:reverse(New, Acc)
+            end,
+            [],
+            CommandTemplate
+        )
     ).
 
 preproc_command_template(CommandTemplate) ->
@@ -216,3 +220,18 @@ preproc_command_template(CommandTemplate) ->
         fun emqx_placeholder:preproc_tmpl/1,
         CommandTemplate
     ).
+
+%% This function mimics emqx_placeholder:proc_tmpl/3 but with an
+%% injected special handling of map_to_redis_hset_args result
+%% which is a list of redis command args (all in binary string format)
+proc_tmpl([{var, Phld}], Data) ->
+    case emqx_placeholder:lookup_var(Phld, Data) of
+        [map_to_redis_hset_args | L] ->
+            L;
+        Other ->
+            [emqx_utils_conv:bin(Other)]
+    end;
+proc_tmpl(Tokens, Data) ->
+    %% more than just a var ref, but a string, or a concatenation of string and a var
+    %% this is must be a single arg, format it into a binary
+    [emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary})].

+ 39 - 9
apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl

@@ -46,7 +46,8 @@ matrix_testcases() ->
         t_start_stop,
         t_create_via_http,
         t_on_get_status,
-        t_sync_query
+        t_sync_query,
+        t_map_to_redis_hset_args
     ].
 
 init_per_suite(Config) ->
@@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) ->
     Path = group_path(Config),
     ct:comment(Path),
     ConnectorConfig = connector_config(Name, Path, NConfig),
-    BridgeConfig = action_config(Name, Path, Name),
+    BridgeConfig = action_config(Name, Path, Name, TestCase),
     ok = snabbkaffe:start_trace(),
     [
         {connector_type, ?CONNECTOR_TYPE},
@@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
     ct:pal("parsed config: ~p", [Config]),
     InnerConfigMap.
 
-action_config(Name, Path, ConnectorId) ->
+action_config(Name, Path, ConnectorId, TestCase) ->
+    Template =
+        try
+            ?MODULE:TestCase(command_template)
+        catch
+            _:_ ->
+                [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]
+        end,
     [RedisType, _Transport | _] = Path,
     CommonCfg =
         #{
@@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) ->
             <<"connector">> => ConnectorId,
             <<"parameters">> =>
                 #{
-                    <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>],
+                    <<"command_template">> => Template,
                     <<"redis_type">> => atom_to_binary(RedisType)
                 },
             <<"local_topic">> => <<"t/redis">>,
@@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) ->
     emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
 
 make_message() ->
-    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
     Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    make_message_with_payload(Payload).
+
+make_message_with_payload(Payload) ->
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
     #{
         clientid => ClientId,
         payload => Payload,
@@ -290,7 +301,7 @@ t_start_stop(matrix) ->
         [sentinel, tcp],
         [cluster, tcp]
     ]};
-t_start_stop(Config) ->
+t_start_stop(Config) when is_list(Config) ->
     emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped),
     ok.
 
@@ -300,7 +311,7 @@ t_create_via_http(matrix) ->
         [sentinel, tcp],
         [cluster, tcp]
     ]};
-t_create_via_http(Config) ->
+t_create_via_http(Config) when is_list(Config) ->
     emqx_bridge_v2_testlib:t_create_via_http(Config),
     ok.
 
@@ -310,7 +321,7 @@ t_on_get_status(matrix) ->
         [sentinel, tcp],
         [cluster, tcp]
     ]};
-t_on_get_status(Config) ->
+t_on_get_status(Config) when is_list(Config) ->
     emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
     ok.
 
@@ -320,7 +331,7 @@ t_sync_query(matrix) ->
         [sentinel, tcp],
         [cluster, tcp]
     ]};
-t_sync_query(Config) ->
+t_sync_query(Config) when is_list(Config) ->
     ok = emqx_bridge_v2_testlib:t_sync_query(
         Config,
         fun make_message/0,
@@ -328,3 +339,22 @@ t_sync_query(Config) ->
         redis_bridge_connector_send_done
     ),
     ok.
+
+t_map_to_redis_hset_args(matrix) ->
+    {map_to_redis_hset_args, [
+        [single, tcp],
+        [sentinel, tcp],
+        [cluster, tcp]
+    ]};
+t_map_to_redis_hset_args(command_template) ->
+    [<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>];
+t_map_to_redis_hset_args(Config) when is_list(Config) ->
+    Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}),
+    MsgFn = fun() -> make_message_with_payload(Payload) end,
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config,
+        MsgFn,
+        fun(Res) -> ?assertMatch({ok, _}, Res) end,
+        redis_bridge_connector_send_done
+    ),
+    ok.

+ 1 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -78,7 +78,7 @@
 %% https://www.erlang.org/doc/man/odbc.html
 
 %% as returned by connect/2
--type connection_reference() :: pid().
+-type connection_reference() :: odbc:connection_reference().
 -type time_out() :: milliseconds() | infinity.
 -type sql() :: string() | binary().
 -type milliseconds() :: pos_integer().

+ 1 - 1
apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_timescale, [
     {description, "EMQX Enterprise TimescaleDB Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource]},
     {env, [

+ 8 - 1
apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl

@@ -10,7 +10,8 @@
     bridge_v1_type_name/0,
     action_type_name/0,
     connector_type_name/0,
-    schema_module/0
+    schema_module/0,
+    connector_action_config_to_bridge_v1_config/2
 ]).
 
 bridge_v1_type_name() -> timescale.
@@ -20,3 +21,9 @@ action_type_name() -> timescale.
 connector_type_name() -> timescale.
 
 schema_module() -> emqx_bridge_timescale.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
+        ConnectorConfig,
+        ActionConfig
+    ).

+ 12 - 0
apps/emqx_connector/src/emqx_connector_api.erl

@@ -685,6 +685,10 @@ is_ok(OkResult = {ok, _}) ->
     OkResult;
 is_ok(Error = {error, _}) ->
     Error;
+is_ok(timeout) ->
+    %% Returned by `emqx_resource_manager:start' when the connector fails to reach either
+    %% `?status_connected' or `?status_disconnected' within `start_timeout'.
+    timeout;
 is_ok(ResL) ->
     case
         lists:filter(
@@ -723,6 +727,14 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName
     case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
         Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
             ?NO_CONTENT;
+        timeout ->
+            %% Returned by `emqx_resource_manager:start' when the connector fails to reach
+            %% either `?status_connected' or `?status_disconnected' within
+            %% `start_timeout'.
+            ?BAD_REQUEST(<<
+                "Timeout while waiting for connector to reach connected status."
+                " Please try again."
+            >>);
         {error, not_implemented} ->
             ?NOT_IMPLEMENTED;
         {error, timeout} ->

+ 8 - 2
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -536,14 +536,20 @@ do_start_connector(TestType, Config) ->
         request_json(
             post,
             uri(["connectors"]),
-            ?KAFKA_CONNECTOR(BadName, BadServer),
+            (?KAFKA_CONNECTOR(BadName, BadServer))#{
+                <<"resource_opts">> => #{
+                    <<"start_timeout">> => <<"10ms">>
+                }
+            },
             Config
         )
     ),
     BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName),
+    %% Checks that an `emqx_resource_manager:start' timeout when waiting for the resource to
+    %% be connected doesn't return a 500 error.
     ?assertMatch(
         %% request from product: return 400 on such errors
-        {ok, SC, _} when SC == 500 orelse SC == 400,
+        {ok, 400, _},
         request(post, {operation, TestType, start, BadConnectorID}, Config)
     ),
     ok = gen_tcp:close(Sock),

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx, erlavro]},

+ 21 - 6
apps/emqx_plugins/src/emqx_plugins.erl

@@ -136,6 +136,19 @@ parse_name_vsn(NameVsn) when is_list(NameVsn) ->
 make_name_vsn_string(Name, Vsn) ->
     binary_to_list(iolist_to_binary([Name, "-", Vsn])).
 
+app_dir(AppName, Apps) ->
+    case
+        lists:filter(
+            fun(AppNameVsn) -> nomatch =/= string:prefix(AppNameVsn, AppName) end,
+            Apps
+        )
+    of
+        [AppNameVsn] ->
+            {ok, AppNameVsn};
+        _ ->
+            {error, not_found}
+    end.
+
 %%--------------------------------------------------------------------
 %% Package operations
 
@@ -1372,12 +1385,14 @@ plugin_dir(NameVsn) ->
 
 -spec plugin_priv_dir(name_vsn()) -> string().
 plugin_priv_dir(NameVsn) ->
-    case read_plugin_info(NameVsn, #{fill_readme => false}) of
-        {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} ->
-            AppDir = make_name_vsn_string(Name, Vsn),
-            wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
-        _ ->
-            wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
+    maybe
+        {ok, #{<<"name">> := Name, <<"rel_apps">> := Apps}} ?=
+            read_plugin_info(NameVsn, #{fill_readme => false}),
+        {ok, AppDir} ?= app_dir(Name, Apps),
+        wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]))
+    else
+        %% Otherwise assume the priv directory is under the plugin root directory
+        _ -> wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
     end.
 
 -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.

+ 1 - 1
apps/emqx_postgresql/src/emqx_postgresql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_postgresql, [
     {description, "EMQX PostgreSQL Database Connector"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [
         kernel,

+ 63 - 22
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -50,6 +50,8 @@
     execute_batch/3
 ]).
 
+-export([disable_prepared_statements/0]).
+
 %% for ecpool workers usage
 -export([do_get_status/1, prepare_sql_to_conn/2]).
 
@@ -62,7 +64,7 @@
     #{
         pool_name := binary(),
         query_templates := #{binary() => template()},
-        prepares := #{binary() => epgsql:statement()} | {error, _}
+        prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
     }.
 
 %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
@@ -78,7 +80,10 @@ roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 fields(config) ->
-    [{server, server()}] ++
+    [
+        {server, server()},
+        disable_prepared_statements()
+    ] ++
         adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
         emqx_connector_schema_lib:ssl_fields() ++
         emqx_connector_schema_lib:prepare_statement_fields().
@@ -87,6 +92,17 @@ server() ->
     Meta = #{desc => ?DESC("server")},
     emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
 
+disable_prepared_statements() ->
+    {disable_prepared_statements,
+        hoconsc:mk(
+            boolean(),
+            #{
+                default => false,
+                required => false,
+                desc => ?DESC("disable_prepared_statements")
+            }
+        )}.
+
 adjust_fields(Fields) ->
     lists:map(
         fun
@@ -108,6 +124,7 @@ on_start(
     InstId,
     #{
         server := Server,
+        disable_prepared_statements := DisablePreparedStatements,
         database := DB,
         username := User,
         pool_size := PoolSize,
@@ -143,11 +160,16 @@ on_start(
         {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
         {pool_size, PoolSize}
     ],
-    State1 = parse_prepare_sql(Config, <<"send_message">>),
+    State1 = parse_sql_template(Config, <<"send_message">>),
     State2 = State1#{installed_channels => #{}},
     case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
         ok ->
-            {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})};
+            Prepares =
+                case DisablePreparedStatements of
+                    true -> disabled;
+                    false -> #{}
+                end,
+            {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
         {error, Reason} ->
             ?tp(
                 pgsql_connector_start_failed,
@@ -209,13 +231,17 @@ on_add_channel(
 
 create_channel_state(
     ChannelId,
-    #{pool_name := PoolName} = _ConnectorState,
+    #{
+        pool_name := PoolName,
+        prepares := Prepares
+    } = _ConnectorState,
     #{parameters := Parameters} = _ChannelConfig
 ) ->
-    State1 = parse_prepare_sql(Parameters, ChannelId),
+    State1 = parse_sql_template(Parameters, ChannelId),
     {ok,
         init_prepare(State1#{
             pool_name => PoolName,
+            prepares => Prepares,
             prepare_statement => #{}
         })}.
 
@@ -233,6 +259,8 @@ on_remove_channel(
     NewState = OldState#{installed_channels => NewInstalledChannels},
     {ok, NewState}.
 
+close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
+    ok;
 close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
     WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
     close_prepared_statement(WorkerPids, ChannelId, State),
@@ -243,7 +271,7 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
     %% prepared statement doesn't exist.
     try ecpool_worker:client(WorkerPid) of
         {ok, Conn} ->
-            Statement = get_prepared_statement(ChannelId, State),
+            Statement = get_templated_statement(ChannelId, State),
             _ = epgsql:close(Conn, Statement),
             close_prepared_statement(Rest, ChannelId, State);
         _ ->
@@ -303,21 +331,23 @@ on_query(
         sql => NameOrSQL,
         state => State
     }),
-    Type = pgsql_query_type(TypeOrKey),
+    Type = pgsql_query_type(TypeOrKey, State),
     {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
     Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
     ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
     handle_result(Res).
 
-pgsql_query_type(sql) ->
+pgsql_query_type(_TypeOrTag, #{prepares := disabled}) ->
     query;
-pgsql_query_type(query) ->
+pgsql_query_type(sql, _ConnectorState) ->
     query;
-pgsql_query_type(prepared_query) ->
+pgsql_query_type(query, _ConnectorState) ->
+    query;
+pgsql_query_type(prepared_query, _ConnectorState) ->
     prepared_query;
 %% for bridge
-pgsql_query_type(_) ->
-    pgsql_query_type(prepared_query).
+pgsql_query_type(_, ConnectorState) ->
+    pgsql_query_type(prepared_query, ConnectorState).
 
 on_batch_query(
     InstId,
@@ -336,9 +366,9 @@ on_batch_query(
             ?SLOG(error, Log),
             {error, {unrecoverable_error, batch_prepare_not_implemented}};
         {_Statement, RowTemplate} ->
-            PrepStatement = get_prepared_statement(BinKey, State),
+            StatementTemplate = get_templated_statement(BinKey, State),
             Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
-            case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of
+            case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of
                 {error, _Error} = Result ->
                     handle_result(Result);
                 {_Column, Results} ->
@@ -359,12 +389,19 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
 proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
     {SQLOrKey, Params};
 proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
+    DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled,
     BinKey = to_bin(TypeOrKey),
     case get_template(BinKey, State) of
         undefined ->
             {SQLOrData, Params};
-        {_Statement, RowTemplate} ->
-            {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)}
+        {Statement, RowTemplate} ->
+            Rendered = render_prepare_sql_row(RowTemplate, SQLOrData),
+            case DisablePreparedStatements of
+                true ->
+                    {Statement, Rendered};
+                false ->
+                    {BinKey, Rendered}
+            end
     end.
 
 get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
@@ -376,14 +413,14 @@ get_template(Key, #{query_templates := Templates}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, Templates, undefined).
 
-get_prepared_statement(Key, #{installed_channels := Channels} = _State) when
+get_templated_statement(Key, #{installed_channels := Channels} = _State) when
     is_map_key(Key, Channels)
 ->
     BinKey = to_bin(Key),
     ChannelState = maps:get(BinKey, Channels),
     ChannelPreparedStatements = maps:get(prepares, ChannelState),
     maps:get(BinKey, ChannelPreparedStatements);
-get_prepared_statement(Key, #{prepares := PrepStatements}) ->
+get_templated_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, PrepStatements).
 
@@ -480,6 +517,8 @@ do_check_prepares(
         {error, Reason} ->
             {error, Reason}
     end;
+do_check_prepares(#{prepares := disabled}) ->
+    ok;
 do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
     ok;
 do_check_prepares(#{prepares := {error, _}} = State) ->
@@ -579,7 +618,7 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
 conn_opts([_Opt | Opts], Acc) ->
     conn_opts(Opts, Acc).
 
-parse_prepare_sql(Config, SQLID) ->
+parse_sql_template(Config, SQLID) ->
     Queries =
         case Config of
             #{prepare_statement := Qs} ->
@@ -589,10 +628,10 @@ parse_prepare_sql(Config, SQLID) ->
             #{} ->
                 #{}
         end,
-    Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries),
+    Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
     #{query_templates => Templates}.
 
-parse_prepare_sql(Key, Query, Acc) ->
+parse_sql_template(Key, Query, Acc) ->
     Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
     Acc#{Key => Template}.
 
@@ -601,6 +640,8 @@ render_prepare_sql_row(RowTemplate, Data) ->
     {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
     Row.
 
+init_prepare(State = #{prepares := disabled}) ->
+    State;
 init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
     State;
 init_prepare(State = #{}) ->

+ 4 - 1
apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl

@@ -47,7 +47,10 @@ roots() ->
     [].
 
 fields("connection_fields") ->
-    [{server, server()}] ++
+    [
+        {server, server()},
+        emqx_postgresql:disable_prepared_statements()
+    ] ++
         adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
         emqx_connector_schema_lib:ssl_fields();
 fields("config_connector") ->

+ 1 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -85,6 +85,7 @@
     get_allocated_resources_list/1,
     forget_allocated_resources/1,
     deallocate_resource/2,
+    clean_allocated_resources/2,
     %% Get channel config from resource
     call_get_channel_config/3,
     % Call the format query result function

+ 48 - 4
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -63,6 +63,10 @@
 %% Internal exports.
 -export([worker_resource_health_check/1, worker_channel_health_check/2]).
 
+-ifdef(TEST).
+-export([stop/2]).
+-endif.
+
 % State record
 -record(data, {
     id,
@@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) ->
 -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
 remove(ResId, ClearMetrics) when is_binary(ResId) ->
     try
-        safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
+        case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
+            {error, timeout} ->
+                ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
+                    action => remove,
+                    resource_id => ResId
+                }),
+                force_kill(ResId),
+                ok;
+            Res ->
+                Res
+        end
     after
         %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
         %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition.
@@ -274,7 +288,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
     end.
 
 %% @doc Start the resource
--spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
+-spec start(resource_id(), creation_opts()) -> ok | timeout | {error, Reason :: term()}.
 start(ResId, Opts) ->
     StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION),
     case safe_call(ResId, start, StartTimeout) of
@@ -287,9 +301,20 @@ start(ResId, Opts) ->
 %% @doc Stop the resource
 -spec stop(resource_id()) -> ok | {error, Reason :: term()}.
 stop(ResId) ->
-    case safe_call(ResId, stop, ?T_OPERATION) of
+    stop(ResId, ?T_OPERATION).
+
+-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}.
+stop(ResId, Timeout) ->
+    case safe_call(ResId, stop, Timeout) of
         ok ->
             ok;
+        {error, timeout} ->
+            ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
+                action => stop,
+                resource_id => ResId
+            }),
+            force_kill(ResId),
+            ok;
         {error, _Reason} = Error ->
             Error
     end.
@@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
 get_error(_ResId, #{error := Error}) ->
     Error.
 
+force_kill(ResId) ->
+    case gproc:whereis_name(?NAME(ResId)) of
+        undefined ->
+            ok;
+        Pid when is_pid(Pid) ->
+            exit(Pid, kill),
+            try_clean_allocated_resources(ResId),
+            ok
+    end.
+
+try_clean_allocated_resources(ResId) ->
+    case try_read_cache(ResId) of
+        #data{mod = Mod} ->
+            catch emqx_resource:clean_allocated_resources(ResId, Mod),
+            ok;
+        _ ->
+            ok
+    end.
+
 %% Server start/stop callbacks
 
 %% @doc Function called from the supervisor to actually start the server
@@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) ->
     Data.
 
 stop_resource(#data{state = ResState, id = ResId} = Data) ->
-    %% We don't care the return value of the Mod:on_stop/2.
+    %% We don't care about the return value of `Mod:on_stop/2'.
     %% The callback mod should make sure the resource is stopped after on_stop/2
     %% is returned.
     HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),

+ 13 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -71,6 +71,16 @@ set_callback_mode(Mode) ->
 on_start(_InstId, #{create_error := true}) ->
     ?tp(connector_demo_start_error, #{}),
     error("some error");
+on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) ->
+    ?tp(connector_demo_start_delay, #{}),
+    case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of
+        not_called ->
+            emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep),
+            timer:sleep(Delay),
+            on_start(InstId, maps:remove(create_error, Opts));
+        called ->
+            on_start(InstId, maps:remove(create_error, Opts))
+    end;
 on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
     StopError = maps:get(stop_error, Opts, false),
@@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) ->
         pid => spawn_counter_process(Name, Register)
     }}.
 
+on_stop(_InstId, undefined) ->
+    ?tp(connector_demo_free_resources_without_state, #{}),
+    ok;
 on_stop(_InstId, #{stop_error := true}) ->
     {error, stop_error};
 on_stop(InstId, #{pid := Pid}) ->

+ 37 - 0
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) ->
     ),
     ok.
 
+%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync
+%% call such as `on_start', and that the claimed resources, if any, are freed.
+t_force_stop(_Config) ->
+    ?check_trace(
+        begin
+            {ok, Agent} = emqx_utils_agent:start_link(not_called),
+            {ok, _} =
+                create(
+                    ?ID,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{
+                        name => test_resource,
+                        create_error => {delay, 30_000, Agent}
+                    },
+                    #{
+                        health_check_interval => 100,
+                        start_timeout => 100
+                    }
+                ),
+            ?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)),
+            ok
+        end,
+        [
+            log_consistency_prop(),
+            fun(Trace) ->
+                ?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)),
+                ?assertMatch(
+                    [_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace)
+                ),
+                ?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)),
+                ok
+            end
+        ]
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -733,8 +733,8 @@ event_info() ->
 event_info_schema_validation_failed() ->
     event_info_common(
         'schema.validation_failed',
-        {<<"schema validation failed">>, <<"TODO"/utf8>>},
-        {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>},
+        {<<"schema validation failed">>, <<"schema 验证失败"/utf8>>},
+        {<<"messages that do not pass configured validations">>, <<"未通过验证的消息"/utf8>>},
         <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">>
     ).
 ee_event_info() ->

+ 33 - 0
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -160,6 +160,7 @@
     find/3,
     join_to_string/1,
     join_to_string/2,
+    map_to_redis_hset_args/1,
     join_to_sql_values_string/1,
     jq/2,
     jq/3,
@@ -814,6 +815,38 @@ join_to_string(Str) -> emqx_variform_bif:join_to_string(Str).
 
 join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List).
 
+%% @doc Format map key-value pairs as redis HSET (or HMSET) command fields.
+%% Notes:
+%% - Non-string keys in the input map are dropped
+%% - Keys are not quoted
+%% - String values are always quoted
+%% - No escape sequence for keys and values
+%% - Float point values are formatted with fixed (6) decimal point compact-formatting
+map_to_redis_hset_args(Map) when erlang:is_map(Map) ->
+    [map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)].
+
+redis_hset_acc(K, V, IoData) ->
+    try
+        [redis_field_name(K), redis_field_value(V) | IoData]
+    catch
+        _:_ ->
+            IoData
+    end.
+
+redis_field_name(K) when erlang:is_binary(K) ->
+    K;
+redis_field_name(K) ->
+    throw({bad_redis_field_name, K}).
+
+redis_field_value(V) when erlang:is_binary(V) ->
+    V;
+redis_field_value(V) when erlang:is_integer(V) ->
+    integer_to_binary(V);
+redis_field_value(V) when erlang:is_float(V) ->
+    float2str(V, 6);
+redis_field_value(V) when erlang:is_boolean(V) ->
+    atom_to_binary(V).
+
 join_to_sql_values_string(List) ->
     QuotedList =
         [

+ 21 - 0
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -1376,6 +1376,27 @@ t_parse_date_errors(_) ->
 
     ok.
 
+t_map_to_redis_hset_args(_Config) ->
+    Do = fun(Map) -> tl(emqx_rule_funcs:map_to_redis_hset_args(Map)) end,
+    ?assertEqual([], Do(#{})),
+    ?assertEqual([], Do(#{1 => 2})),
+    ?assertEqual([<<"a">>, <<"1">>], Do(#{<<"a">> => 1, 3 => 4})),
+    ?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})),
+    ?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})),
+    ?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})),
+    ?assertEqual([<<"a">>, <<"">>], Do(#{<<"a">> => <<"">>})),
+    ?assertEqual([<<"a">>, <<"i j">>], Do(#{<<"a">> => <<"i j">>})),
+    %% no determined ordering
+    ?assert(
+        case Do(#{<<"a">> => 1, <<"b">> => 2}) of
+            [<<"a">>, <<"1">>, <<"b">>, <<"2">>] ->
+                true;
+            [<<"b">>, <<"2">>, <<"a">>, <<"1">>] ->
+                true
+        end
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Utility functions
 %%------------------------------------------------------------------------------

+ 2 - 1
apps/emqx_utils/src/emqx_placeholder.erl

@@ -37,7 +37,8 @@
     proc_tmpl_deep/3,
 
     bin/1,
-    sql_data/1
+    sql_data/1,
+    lookup_var/2
 ]).
 
 -export([

+ 66 - 0
apps/emqx_utils/test/emqx_utils_agent.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html).
+
+-module(emqx_utils_agent).
+
+%% API
+-export([start_link/1, get/1, get_and_update/2]).
+
+%% `gen_server' API
+-export([init/1, handle_call/3]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-type state() :: term().
+
+-type get_and_update_fn() :: fun((state()) -> {term(), state()}).
+
+-record(get_and_update, {fn :: get_and_update_fn()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link(state()) -> gen_server:start_ret().
+start_link(InitState) ->
+    gen_server:start_link(?MODULE, InitState, []).
+
+-spec get(gen_server:server_ref()) -> term().
+get(ServerRef) ->
+    Fn = fun(St) -> {St, St} end,
+    gen_server:call(ServerRef, #get_and_update{fn = Fn}).
+
+-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term().
+get_and_update(ServerRef, Fn) ->
+    gen_server:call(ServerRef, #get_and_update{fn = Fn}).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(InitState) ->
+    {ok, InitState}.
+
+handle_call(#get_and_update{fn = Fn}, _From, State0) ->
+    {Reply, State} = Fn(State0),
+    {reply, Reply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------

+ 1 - 1
bin/emqx

@@ -1204,7 +1204,7 @@ case "${COMMAND}" in
         esac
         case "$COMMAND" in
             foreground)
-                FOREGROUNDOPTIONS="-enable-feature maybe_expr -noshell -noinput +Bd"
+                FOREGROUNDOPTIONS="-enable-feature maybe_expr -noinput -noshell +Bd"
                 ;;
             *)
                 FOREGROUNDOPTIONS='-enable-feature maybe_expr'

+ 6 - 6
build

@@ -397,11 +397,11 @@ function is_ecr_and_enterprise() {
 
 ## Build the default docker image based on debian 12.
 make_docker() {
-    local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-5}"
+    local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-7}"
     local EMQX_BUILDER_PLATFORM="${EMQX_BUILDER_PLATFORM:-debian12}"
-    local EMQX_BUILDER_OTP="${EMQX_BUILDER_OTP:-25.3.2-2}"
-    local EMQX_BUILDER_ELIXIR="${EMQX_BUILDER_ELIXIR:-1.15.7}"
-    local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}}
+    local OTP_VSN="${OTP_VSN:-26.2.5-1}"
+    local ELIXIR_VSN="${ELIXIR_VSN:-1.15.7}"
+    local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}}
     local EMQX_RUNNER="${EMQX_RUNNER:-${EMQX_DEFAULT_RUNNER}}"
     local EMQX_DOCKERFILE="${EMQX_DOCKERFILE:-deploy/docker/Dockerfile}"
     local EMQX_SOURCE_TYPE="${EMQX_SOURCE_TYPE:-src}"
@@ -465,7 +465,7 @@ make_docker() {
        --label org.opencontainers.image.description="${PRODUCT_DESCRIPTION}" \
        --label org.opencontainers.image.documentation="${DOCUMENTATION_URL}" \
        --label org.opencontainers.image.licenses="${LICENSE}" \
-       --label org.opencontainers.image.otp.version="${EMQX_BUILDER_OTP}" \
+       --label org.opencontainers.image.otp.version="${OTP_VSN}" \
        --pull
     )
     :> ./.emqx_docker_image_tags
@@ -477,7 +477,7 @@ make_docker() {
         DOCKER_BUILDX_ARGS+=(--no-cache)
     fi
     if [ "${SUFFIX}" = '-elixir' ]; then
-        DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${EMQX_BUILDER_ELIXIR}")
+        DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${ELIXIR_VSN}")
     fi
     if [ "${DOCKER_LATEST:-false}" = true ]; then
         for r in "${DOCKER_REGISTRIES[@]}"; do

+ 3 - 0
changes/ce/feat-13175.en.md

@@ -0,0 +1,3 @@
+Added the `disable_prepared_statements` option for Postgres-based connectors.
+
+This option is to be used with endpoints that do not support the prepared statements session feature, such as PGBouncer and Supabase in Transaction mode.

+ 1 - 0
changes/ce/fix-13148.en.md

@@ -0,0 +1 @@
+Fixed an issue where a 500 HTTP status code could be returned by `/connectors/:connector-id/start` when there is a timeout waiting for the resource to be connected.

+ 6 - 0
changes/ce/fix-13164.en.md

@@ -0,0 +1,6 @@
+Fix HTTP authorization request body encoding.
+
+Prior to this fix, the HTTP authorization request body encoding format was taken from the `accept` header.
+The fix is to respect the `content-type` header instead.
+Also added `access` templating variable for v4 compatibility.
+The access code of SUBSCRIBE action is `1` and SUBSCRIBE action is `2`.

+ 3 - 0
changes/ce/fix-13181.en.md

@@ -0,0 +1,3 @@
+Now, when attempting to stop a connector, if such operation times out, we forcefully shut down the connector process.
+
+Error messages when attempting to disable an action/source when its underlying connector is stuck were also improved.

+ 5 - 0
changes/ee/feat-13172.en.md

@@ -0,0 +1,5 @@
+Added a rule function `map_to_redis_hset_args` to help preparing redis HSET (or HMSET) multi-fields values.
+
+For example, if `payload.value` is a map of multiple data fields,
+this rule `SELECT  map_to_redis_hset_args(payload.value) as hset_fields FROM  "t/#"` can prepare `hset_fields`
+for redis action to render the command template like `HMSET name1 ${hset_fields}`.

+ 3 - 0
changes/ee/fix-13093.en.md

@@ -0,0 +1,3 @@
+Improve Kafka consumer group stability.
+
+Prior to this change, Kafka consumer group sometimes may need to rebalance twice after Kafka group coordinator restart.

+ 1 - 1
deploy/docker/Dockerfile

@@ -1,4 +1,4 @@
-ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12
+ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12
 ARG RUN_FROM=public.ecr.aws/debian/debian:12-slim
 ARG SOURCE_TYPE=src # tgz
 

+ 2 - 3
mix.exs

@@ -102,8 +102,7 @@ defmodule EMQXUmbrella.MixProject do
       {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
       {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
       {:ra, "2.7.3", override: true},
-      {:mimerl, "1.2.0", override: true},
-      {:supervisor3, "1.1.12", override: true}
+      {:mimerl, "1.2.0", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
       enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep()
@@ -215,7 +214,7 @@ defmodule EMQXUmbrella.MixProject do
       {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
+      {:brod, github: "kafka4beam/brod", tag: "3.18.0"},
       {:snappyer, "1.2.9", override: true},
       {:crc32cer, "0.1.8", override: true},
       {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},

+ 9 - 0
rel/i18n/emqx_postgresql.hocon

@@ -14,4 +14,13 @@ config_connector.desc:
 config_connector.label:
 """PostgreSQL Connector Config"""
 
+disable_prepared_statements.label:
+"""Disable Prepared Statements"""
+disable_prepared_statements.desc:
+"""~
+Disables the usage of prepared statements in the connections.
+Some endpoints, like PGBouncer or Supabase in Transaction mode, do not
+support session features such as prepared statements.  For such connections,
+this option should be enabled.~"""
+
 }

+ 2 - 2
scripts/buildx.sh

@@ -9,7 +9,7 @@
 
 ## example:
 ## ./scripts/buildx.sh --profile emqx --pkgtype tgz --arch arm64 \
-##     --builder ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12
+##     --builder ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12
 
 set -euo pipefail
 
@@ -24,7 +24,7 @@ help() {
     echo "--arch amd64|arm64:        Target arch to build the EMQX package for"
     echo "--src_dir <SRC_DIR>:       EMQX source code in this dir, default to PWD"
     echo "--builder <BUILDER>:       Builder image to pull"
-    echo "                           E.g. ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12"
+    echo "                           E.g. ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12"
 }
 
 die() {

+ 4 - 4
scripts/pr-sanity-checks.sh

@@ -12,11 +12,11 @@ if ! type "yq" > /dev/null; then
     exit 1
 fi
 
-EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-5}
-EMQX_BUILDER_OTP=${EMQX_BUILDER_OTP:-26.2.1-2}
-EMQX_BUILDER_ELIXIR=${EMQX_BUILDER_ELIXIR:-1.15.7}
+EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-7}
+OTP_VSN=${OTP_VSN:-26.2.5-1}
+ELIXIR_VSN=${ELIXIR_VSN:-1.15.7}
 EMQX_BUILDER_PLATFORM=${EMQX_BUILDER_PLATFORM:-ubuntu22.04}
-EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}}
+EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}}
 
 commands=$(yq ".jobs.sanity-checks.steps[].run" .github/workflows/_pr_entrypoint.yaml | grep -v null)
 

+ 1 - 1
scripts/relup-test/start-relup-test-cluster.sh

@@ -22,7 +22,7 @@ WEBHOOK="webhook.$NET"
 BENCH="bench.$NET"
 COOKIE='this-is-a-secret'
 ## Erlang image is needed to run webhook server and emqtt-bench
-ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04"
+ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04"
 # builder has emqtt-bench installed
 BENCH_IMAGE="$ERLANG_IMAGE"
 

+ 2 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -49,6 +49,7 @@ NIF
 OCSP
 OTP
 PEM
+PGBouncer
 PINGREQ
 PSK
 PSK
@@ -65,6 +66,7 @@ Riak
 SHA
 SMS
 Struct
+Supabase
 TCP
 TLS
 TTL