Przeglądaj źródła

chore: merge master into release-51

Ivan Dyachkov 2 lat temu
rodzic
commit
b7f0f7b32d
82 zmienionych plików z 2402 dodań i 170 usunięć
  1. 1 0
      .ci/docker-compose-file/.env
  2. 1 1
      .ci/docker-compose-file/docker-compose-kafka.yaml
  3. 12 0
      .ci/docker-compose-file/docker-compose-kinesis.yaml
  4. 2 0
      .ci/docker-compose-file/docker-compose-toxiproxy.yaml
  5. 1 1
      .ci/docker-compose-file/docker-compose.yaml
  6. 6 0
      .ci/docker-compose-file/toxiproxy.json
  7. 1 1
      .github/CODEOWNERS
  8. 2 2
      .github/workflows/build_and_push_docker_images.yaml
  9. 33 33
      .github/workflows/build_packages.yaml
  10. 2 2
      .github/workflows/build_packages_cron.yaml
  11. 2 2
      .github/workflows/build_slim_packages.yaml
  12. 1 1
      .github/workflows/check_deps_integrity.yaml
  13. 1 1
      .github/workflows/code_style_check.yaml
  14. 61 0
      .github/workflows/codeql.yaml
  15. 1 1
      .github/workflows/elixir_apps_check.yaml
  16. 1 1
      .github/workflows/elixir_deps_check.yaml
  17. 1 1
      .github/workflows/elixir_release.yml
  18. 1 1
      .github/workflows/performance_test.yaml
  19. 6 2
      .github/workflows/release.yaml
  20. 1 1
      .github/workflows/run_conf_tests.yaml
  21. 1 1
      .github/workflows/run_emqx_app_tests.yaml
  22. 3 3
      .github/workflows/run_fvt_tests.yaml
  23. 1 1
      .github/workflows/run_relup_tests.yaml
  24. 3 3
      .github/workflows/run_test_cases.yaml
  25. 3 2
      Makefile
  26. 40 0
      SECURITY.md
  27. 2 1
      apps/emqx/rebar.config
  28. 2 1
      apps/emqx/src/emqx.app.src
  29. 1 0
      apps/emqx/src/emqx_app.erl
  30. 1 0
      apps/emqx/src/emqx_broker.erl
  31. 86 0
      apps/emqx/src/emqx_persistent_session_ds.erl
  32. 8 0
      apps/emqx/src/emqx_schema.erl
  33. 116 0
      apps/emqx/test/emqx_persistent_messages_SUITE.erl
  34. 2 1
      apps/emqx_bridge/src/emqx_bridge.erl
  35. 6 1
      apps/emqx_bridge/src/emqx_bridge_api.erl
  36. 2 0
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  37. 20 4
      apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
  38. 1 1
      apps/emqx_bridge_dynamo/rebar.config
  39. 2 0
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl
  40. 21 1
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
  41. 94 0
      apps/emqx_bridge_kinesis/BSL.txt
  42. 22 0
      apps/emqx_bridge_kinesis/README.md
  43. 2 0
      apps/emqx_bridge_kinesis/docker-ct
  44. 11 0
      apps/emqx_bridge_kinesis/rebar.config
  45. 13 0
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src
  46. 167 0
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl
  47. 178 0
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl
  48. 247 0
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl
  49. 817 0
      apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl
  50. 13 0
      apps/emqx_durable_storage/src/emqx_ds.erl
  51. 1 1
      apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl
  52. 23 17
      apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
  53. 9 7
      apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl
  54. 1 1
      apps/emqx_durable_storage/src/emqx_durable_storage.app.src
  55. 2 2
      apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl
  56. 2 1
      apps/emqx_machine/src/emqx_machine_boot.erl
  57. 2 1
      apps/emqx_management/src/emqx_mgmt_data_backup.erl
  58. 1 1
      apps/emqx_modules/src/emqx_modules.app.src
  59. 39 2
      apps/emqx_modules/src/emqx_modules_conf.erl
  60. 38 4
      apps/emqx_modules/test/emqx_modules_conf_SUITE.erl
  61. 0 1
      apps/emqx_resource/src/emqx_resource_manager.erl
  62. 1 1
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  63. 1 1
      apps/emqx_s3/rebar.config
  64. 3 3
      bin/emqx
  65. 1 0
      changes/ce/feat-11124.en.md
  66. 1 0
      changes/ce/feat-11289.en.md
  67. 1 0
      changes/ce/feat-11290.en.md
  68. 1 0
      changes/ce/feat-11291.en.md
  69. 4 0
      changes/ce/fix-11296.en.md
  70. 1 0
      changes/ee/feat-11261.en.md
  71. 1 1
      deploy/docker/Dockerfile
  72. 3 3
      dev
  73. 27 6
      mix.exs
  74. 2 2
      rebar.config
  75. 4 2
      rebar.config.erl
  76. 85 0
      rel/i18n/emqx_bridge_kinesis.hocon
  77. 2 2
      scripts/buildx.sh
  78. 78 0
      scripts/check_missing_reboot_apps.exs
  79. 3 0
      scripts/ct/run.sh
  80. 1 1
      scripts/macos-sign-binaries.sh
  81. 41 38
      scripts/pkg-tests.sh
  82. 1 1
      scripts/relup-test/start-relup-test-cluster.sh

+ 1 - 0
.ci/docker-compose-file/.env

@@ -9,6 +9,7 @@ DYNAMO_TAG=1.21.0
 CASSANDRA_TAG=3.11.6
 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z
 OPENTS_TAG=9aa7f88
+KINESIS_TAG=2.1
 
 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
 SQLSERVER_TAG=2019-CU19-ubuntu-20.04

+ 1 - 1
.ci/docker-compose-file/docker-compose-kafka.yaml

@@ -18,7 +18,7 @@ services:
       - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
   kdc:
     hostname: kdc.emqx.net
-    image:  ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04
+    image:  ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu20.04
     container_name: kdc.emqx.net
     expose:
       - 88 # kdc

+ 12 - 0
.ci/docker-compose-file/docker-compose-kinesis.yaml

@@ -0,0 +1,12 @@
+version: '3.9'
+
+services:
+  kinesis:
+    container_name: kinesis
+    image: localstack/localstack:2.1
+    environment:
+      - KINESIS_ERROR_PROBABILITY=0.0
+      - KINESIS_LATENCY=0
+    restart: always
+    networks:
+      - emqx_bridge

+ 2 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -49,6 +49,8 @@ services:
       - 38080:38080
       # HStreamDB
       - 15670:5670
+      # Kinesis
+      - 4566:4566
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 1 - 1
.ci/docker-compose-file/docker-compose.yaml

@@ -3,7 +3,7 @@ version: '3.9'
 services:
   erlang:
     container_name: erlang
-    image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04}
+    image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu20.04}
     env_file:
       - conf.env
     environment:

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -161,5 +161,11 @@
     "listen": "0.0.0.0:6570",
     "upstream": "hstreamdb:6570",
     "enabled": true
+  },
+  {
+    "name": "kinesis",
+    "listen": "0.0.0.0:4566",
+    "upstream": "kinesis:4566",
+    "enabled": true
   }
 ]

+ 1 - 1
.github/CODEOWNERS

@@ -18,7 +18,7 @@
 /apps/emqx_rule_engine/    @emqx/emqx-review-board @kjellwinblad
 /apps/emqx_slow_subs/      @emqx/emqx-review-board @lafirest
 /apps/emqx_statsd/         @emqx/emqx-review-board @JimMoen
-/apps/emqx_durable_storage/ @ieQu1
+/apps/emqx_durable_storage/ @emqx/emqx-review-board @ieQu1 @keynslug 
 
 ## CI
 /deploy/  @emqx/emqx-review-board @Rory-Z

+ 2 - 2
.github/workflows/build_and_push_docker_images.yaml

@@ -25,7 +25,7 @@ jobs:
   prepare:
     runs-on: ubuntu-22.04
     # prepare source with any OTP version, no need for a matrix
-    container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04"
 
     outputs:
       PROFILE: ${{ steps.get_profile.outputs.PROFILE }}
@@ -120,7 +120,7 @@ jobs:
         # NOTE: 'otp' and 'elixir' are to configure emqx-builder image
         #       only support latest otp and elixir, not a matrix
         builder:
-          - 5.1-0 # update to latest
+          - 5.1-3 # update to latest
         otp:
           - 25.3.2-1
         elixir:

+ 33 - 33
.github/workflows/build_packages.yaml

@@ -21,7 +21,7 @@ on:
 jobs:
   prepare:
     runs-on: ubuntu-22.04
-    container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
+    container: ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04
     outputs:
       BUILD_PROFILE: ${{ steps.get_profile.outputs.BUILD_PROFILE }}
       IS_EXACT_TAG: ${{ steps.get_profile.outputs.IS_EXACT_TAG }}
@@ -181,24 +181,26 @@ jobs:
           - ubuntu22.04
           - ubuntu20.04
           - ubuntu18.04
+          - debian12
           - debian11
           - debian10
           - el9
           - el8
           - el7
           - amzn2
+          - amzn2023
         build_machine:
           - aws-arm64
-          - ubuntu-22.04
+          - aws-amd64
         builder:
-          - 5.1-0
+          - 5.1-3
         elixir:
           - 1.14.5
         with_elixir:
           - 'no'
         exclude:
           - arch: arm64
-            build_machine: ubuntu-22.04
+            build_machine: aws-amd64
           - arch: amd64
             build_machine: aws-arm64
         include:
@@ -206,16 +208,8 @@ jobs:
             otp: 25.3.2-1
             arch: amd64
             os: ubuntu22.04
-            build_machine: ubuntu-22.04
-            builder: 5.1-0
-            elixir: 1.14.5
-            with_elixir: 'yes'
-          - profile: emqx
-            otp: 25.3.2-1
-            arch: amd64
-            os: amzn2
-            build_machine: ubuntu-22.04
-            builder: 5.1-0
+            build_machine: aws-amd64
+            builder: 5.1-3
             elixir: 1.14.5
             with_elixir: 'yes'
 
@@ -225,18 +219,13 @@ jobs:
 
     steps:
     - uses: AutoModality/action-clean@v1
-      if: matrix.build_machine == 'aws-arm64'
 
     - uses: actions/checkout@v3
       with:
         ref: ${{ github.event.inputs.branch_or_tag }}
         fetch-depth: 0
 
-    - name: build emqx packages
-      env:
-        ELIXIR: ${{ matrix.elixir }}
-        PROFILE: ${{ matrix.profile }}
-        ARCH: ${{ matrix.arch }}
+    - name: fix workdir
       run: |
         set -eu
         git config --global --add safe.directory "$GITHUB_WORKSPACE"
@@ -246,22 +235,33 @@ jobs:
           cd /emqx
         fi
         echo "pwd is $PWD"
-        PKGTYPES="tgz pkg"
-        IS_ELIXIR=${{ matrix.with_elixir }}
+
+    - name: build emqx packages
+      env:
+        PROFILE: ${{ matrix.profile }}
+        IS_ELIXIR: ${{ matrix.with_elixir }}
+        ACLOCAL_PATH: "/usr/share/aclocal:/usr/local/share/aclocal"
+      run: |
+        set -eu
         if [ "${IS_ELIXIR:-}" == 'yes' ]; then
-          PKGTYPES="tgz"
+          make "${PROFILE}-elixir-tgz"
+        else
+          make "${PROFILE}-tgz"
+          make "${PROFILE}-pkg"
+        fi
+    - name: test emqx packages
+      env:
+        PROFILE: ${{ matrix.profile }}
+        IS_ELIXIR: ${{ matrix.with_elixir }}
+      run: |
+        set -eu
+        if [ "${IS_ELIXIR:-}" == 'yes' ]; then
+          ./scripts/pkg-tests.sh "${PROFILE}-elixir-tgz"
+        else
+          ./scripts/pkg-tests.sh "${PROFILE}-tgz"
+          ./scripts/pkg-tests.sh "${PROFILE}-pkg"
         fi
-        for PKGTYPE in ${PKGTYPES};
-        do
-          ./scripts/buildx.sh \
-            --profile "${PROFILE}" \
-            --pkgtype "${PKGTYPE}" \
-            --arch "${ARCH}" \
-            --elixir "${IS_ELIXIR}" \
-            --builder "force_host"
-        done
     - uses: actions/upload-artifact@v3
-      if: success()
       with:
         name: ${{ matrix.profile }}
         path: _packages/${{ matrix.profile }}/

+ 2 - 2
.github/workflows/build_packages_cron.yaml

@@ -30,9 +30,9 @@ jobs:
           - amd64
         os:
           - debian10
-          - amzn2
+          - amzn2023
         builder:
-          - 5.1-0
+          - 5.1-3
         elixir:
           - 1.14.5
 

+ 2 - 2
.github/workflows/build_slim_packages.yaml

@@ -32,10 +32,10 @@ jobs:
         profile:
           - ["emqx", "25.3.2-1", "el7", "erlang"]
           - ["emqx", "25.3.2-1", "ubuntu22.04", "elixir"]
-          - ["emqx-enterprise", "25.3.2-1", "amzn2", "erlang"]
+          - ["emqx-enterprise", "25.3.2-1", "amzn2023", "erlang"]
           - ["emqx-enterprise", "25.3.2-1", "ubuntu20.04", "erlang"]
         builder:
-          - 5.1-0
+          - 5.1-3
         elixir:
           - '1.14.5'
 

+ 1 - 1
.github/workflows/check_deps_integrity.yaml

@@ -6,7 +6,7 @@ on:
 jobs:
   check_deps_integrity:
     runs-on: ubuntu-22.04
-    container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
+    container: ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04
 
     steps:
       - uses: actions/checkout@v3

+ 1 - 1
.github/workflows/code_style_check.yaml

@@ -5,7 +5,7 @@ on: [pull_request]
 jobs:
   code_style_check:
     runs-on: ubuntu-22.04
-    container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04"
     steps:
       - uses: actions/checkout@v3
         with:

+ 61 - 0
.github/workflows/codeql.yaml

@@ -0,0 +1,61 @@
+name: "CodeQL"
+
+on:
+  schedule:
+    - cron: '33 14 * * 4'
+  workflow_dispatch:
+    inputs:
+      ref:
+        required: false
+
+jobs:
+  analyze:
+    name: Analyze
+    runs-on: ubuntu-latest
+    timeout-minutes: 360
+    permissions:
+      actions: read
+      contents: read
+      security-events: write
+    container:
+      image: ghcr.io/emqx/emqx-builder/5.1-1:1.14.5-25.3.2-1-ubuntu22.04
+
+    strategy:
+      fail-fast: false
+      matrix:
+        language: [ 'cpp', 'python' ]
+
+    steps:
+    - name: Checkout repository
+      uses: actions/checkout@v3
+      with:
+        ref: ${{ github.event.inputs.ref }}
+
+    - name: Ensure git safe dir
+      run: |
+        git config --global --add safe.directory "$GITHUB_WORKSPACE"
+        make ensure-rebar3
+
+    - name: Initialize CodeQL
+      uses: github/codeql-action/init@v2
+      with:
+        languages: ${{ matrix.language }}
+
+    - name: Build
+      if: matrix.language == 'cpp'
+      env:
+        PROFILE: emqx-enterprise
+      run: |
+        make emqx-enterprise-compile
+
+    - name: Fetch deps
+      if: matrix.language == 'python'
+      env:
+        PROFILE: emqx-enterprise
+      run: |
+        make deps-emqx-enterprise
+
+    - name: Perform CodeQL Analysis
+      uses: github/codeql-action/analyze@v2
+      with:
+        category: "/language:${{matrix.language}}"

+ 1 - 1
.github/workflows/elixir_apps_check.yaml

@@ -9,7 +9,7 @@ jobs:
   elixir_apps_check:
     runs-on: ubuntu-22.04
     # just use the latest builder
-    container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04"
 
     strategy:
       fail-fast: false

+ 1 - 1
.github/workflows/elixir_deps_check.yaml

@@ -8,7 +8,7 @@ on:
 jobs:
   elixir_deps_check:
     runs-on: ubuntu-22.04
-    container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
+    container: ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04
 
     steps:
       - name: Checkout

+ 1 - 1
.github/workflows/elixir_release.yml

@@ -17,7 +17,7 @@ jobs:
         profile:
           - emqx
           - emqx-enterprise
-    container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
+    container: ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04
     steps:
       - name: Checkout
         uses: actions/checkout@v3

+ 1 - 1
.github/workflows/performance_test.yaml

@@ -23,7 +23,7 @@ jobs:
   prepare:
     runs-on: ubuntu-latest
     if: github.repository_owner == 'emqx'
-    container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04
+    container: ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu20.04
     outputs:
       BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
       PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}

+ 6 - 2
.github/workflows/release.yaml

@@ -87,20 +87,24 @@ jobs:
           push "debian/buster" "packages/$PROFILE-$VERSION-debian10-arm64.deb"
           push "debian/bullseye" "packages/$PROFILE-$VERSION-debian11-amd64.deb"
           push "debian/bullseye" "packages/$PROFILE-$VERSION-debian11-arm64.deb"
+          push "debian/bookworm" "packages/$PROFILE-$VERSION-debian12-amd64.deb"
+          push "debian/bookworm" "packages/$PROFILE-$VERSION-debian12-arm64.deb"
           push "ubuntu/bionic" "packages/$PROFILE-$VERSION-ubuntu18.04-amd64.deb"
           push "ubuntu/bionic" "packages/$PROFILE-$VERSION-ubuntu18.04-arm64.deb"
           push "ubuntu/focal" "packages/$PROFILE-$VERSION-ubuntu20.04-amd64.deb"
           push "ubuntu/focal" "packages/$PROFILE-$VERSION-ubuntu20.04-arm64.deb"
           push "ubuntu/jammy" "packages/$PROFILE-$VERSION-ubuntu22.04-amd64.deb"
           push "ubuntu/jammy" "packages/$PROFILE-$VERSION-ubuntu22.04-arm64.deb"
-          push "el/6" "packages/$PROFILE-$VERSION-amzn2-amd64.rpm"
-          push "el/6" "packages/$PROFILE-$VERSION-amzn2-arm64.rpm"
           push "el/7" "packages/$PROFILE-$VERSION-el7-amd64.rpm"
           push "el/7" "packages/$PROFILE-$VERSION-el7-arm64.rpm"
           push "el/8" "packages/$PROFILE-$VERSION-el8-amd64.rpm"
           push "el/8" "packages/$PROFILE-$VERSION-el8-arm64.rpm"
           push "el/9" "packages/$PROFILE-$VERSION-el9-amd64.rpm"
           push "el/9" "packages/$PROFILE-$VERSION-el9-arm64.rpm"
+          push "amazon/2" "packages/$PROFILE-$VERSION-amzn2-amd64.rpm"
+          push "amazon/2" "packages/$PROFILE-$VERSION-amzn2-arm64.rpm"
+          push "amazon/2023" "packages/$PROFILE-$VERSION-amzn2023-amd64.rpm"
+          push "amazon/2023" "packages/$PROFILE-$VERSION-amzn2023-arm64.rpm"
 
   rerun-apps-version-check:
     runs-on: ubuntu-22.04

+ 1 - 1
.github/workflows/run_conf_tests.yaml

@@ -26,7 +26,7 @@ jobs:
         profile:
           - emqx
           - emqx-enterprise
-    container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04"
     steps:
       - uses: AutoModality/action-clean@v1
       - uses: actions/checkout@v3

+ 1 - 1
.github/workflows/run_emqx_app_tests.yaml

@@ -12,7 +12,7 @@ jobs:
     strategy:
       matrix:
         builder:
-          - 5.1-0
+          - 5.1-3
         otp:
           - 25.3.2-1
         # no need to use more than 1 version of Elixir, since tests

+ 3 - 3
.github/workflows/run_fvt_tests.yaml

@@ -17,7 +17,7 @@ jobs:
   prepare:
     runs-on: ubuntu-22.04
     # prepare source with any OTP version, no need for a matrix
-    container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11
+    container: ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-debian11
 
     steps:
       - uses: actions/checkout@v3
@@ -50,7 +50,7 @@ jobs:
         os:
           - ["debian11", "debian:11-slim"]
         builder:
-          - 5.1-0
+          - 5.1-3
         otp:
           - 25.3.2-1
         elixir:
@@ -123,7 +123,7 @@ jobs:
         os:
         - ["debian11", "debian:11-slim"]
         builder:
-        - 5.1-0
+        - 5.1-3
         otp:
         - 25.3.2-1
         elixir:

+ 1 - 1
.github/workflows/run_relup_tests.yaml

@@ -15,7 +15,7 @@ concurrency:
 jobs:
   relup_test_plan:
     runs-on: ubuntu-22.04
-    container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04"
     outputs:
       CUR_EE_VSN: ${{ steps.find-versions.outputs.CUR_EE_VSN }}
       OLD_VERSIONS: ${{ steps.find-versions.outputs.OLD_VERSIONS }}

+ 3 - 3
.github/workflows/run_test_cases.yaml

@@ -34,12 +34,12 @@ jobs:
           MATRIX="$(echo "${APPS}" | jq -c '
             [
               (.[] | select(.profile == "emqx") | . + {
-                builder: "5.1-0",
+                builder: "5.1-3",
                 otp: "25.3.2-1",
                 elixir: "1.14.5"
               }),
               (.[] | select(.profile == "emqx-enterprise") | . + {
-                builder: "5.1-0",
+                builder: "5.1-3",
                 otp: ["25.3.2-1"][],
                 elixir: "1.14.5"
               })
@@ -286,7 +286,7 @@ jobs:
       - ct
       - ct_docker
     runs-on: ubuntu-22.04
-    container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
+    container: "ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu22.04"
     steps:
       - uses: AutoModality/action-clean@v1
       - uses: actions/download-artifact@v3

+ 3 - 2
Makefile

@@ -2,7 +2,7 @@ REBAR = $(CURDIR)/rebar3
 BUILD = $(CURDIR)/build
 SCRIPTS = $(CURDIR)/scripts
 export EMQX_RELUP ?= true
-export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11
+export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-debian11
 export EMQX_DEFAULT_RUNNER = debian:11-slim
 export EMQX_REL_FORM ?= tgz
 export QUICER_DOWNLOAD_FROM_RELEASE = 1
@@ -15,7 +15,7 @@ endif
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.3.1
+export EMQX_DASHBOARD_VERSION ?= v1.3.2
 export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.4
 
 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
@@ -99,6 +99,7 @@ static_checks:
 	@$(REBAR) as check do xref, dialyzer
 	@if [ "$${PROFILE}" = 'emqx-enterprise' ]; then $(REBAR) ct --suite apps/emqx/test/emqx_static_checks --readable $(CT_READABLE); fi
 	./scripts/check-i18n-style.sh
+	./scripts/check_missing_reboot_apps.exs
 
 APPS=$(shell $(SCRIPTS)/find-apps.sh)
 

+ 40 - 0
SECURITY.md

@@ -0,0 +1,40 @@
+# Security Policy
+
+## Supported Versions
+
+| Version | Supported          |
+| ------- | ------------------ |
+| 5.1.x   | :white_check_mark: |
+| 5.0.x   | :white_check_mark: |
+| 4.4.x   | :white_check_mark: |
+| < 4.4   | :x:                |
+
+## Qualifying Vulnerabilities
+
+Any design or implementation issue that substantially affects the confidentiality or integrity of user data is likely to be in scope for the program. Common examples including:
+
+* Cross-site scripting
+* Cross-site request forgery
+* Mixed-content scripts
+* Authentication or authorization flaws
+* Server-side code execution bugs
+
+Out of concern for the availability of our services to all users, please do not attempt to carry out DoS attacks, leverage black hat SEO techniques, spam people, brute force authentication, or do other similarly questionable things. We also discourage the use of any vulnerability testing tools that automatically generate very significant volumes of traffic.
+
+## Non-qualifying Vulnerabilities
+
+Depending on their impacts, some of the reported issues may not qualify.
+Although we review them on a case-by-case basis, here are some of the issues that typically do not earn a monetary reward:
+
+* Bugs requiring exceedingly unlikely user interaction Brute forcing
+* User enumeration
+* Non security related bugs
+* Abuse
+
+## Reporting a Vulnerability
+
+1. When investigating a vulnerability, please, only ever target your own accounts. Never attempt to access anyone else's data and do not engage in any activity that would be disruptive or damaging to other users.
+2. In the case the same vulnerability is present on multiple products, please combine and send one report.
+3. If you have found a vulnerability, please contact us at security@emqx.io.
+4. Note that we are only able to answer technical vulnerability reports. Duplicate reports will not be rewarded, first report on the specific vulnerability will be rewarded.
+5. The report should include steps in plain text how to reproduce the vulnerability (not only video or images).

+ 2 - 1
apps/emqx/rebar.config

@@ -23,11 +23,12 @@
 %% `git_subdir` dependency in other projects.
 {deps, [
     {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_durable_storage, {path, "../emqx_durable_storage"}},
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
-    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}},
+    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.6"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.13"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},

+ 2 - 1
apps/emqx/src/emqx.app.src

@@ -16,7 +16,8 @@
         sasl,
         os_mon,
         lc,
-        hocon
+        hocon,
+        emqx_durable_storage
     ]},
     {mod, {emqx_app, []}},
     {env, []},

+ 1 - 0
apps/emqx/src/emqx_app.erl

@@ -39,6 +39,7 @@
 start(_Type, _Args) ->
     ok = maybe_load_config(),
     ok = emqx_persistent_session:init_db_backend(),
+    _ = emqx_persistent_session_ds:init(),
     ok = maybe_start_quicer(),
     ok = emqx_bpapi:start(),
     ok = emqx_alarm_handler:load(),

+ 1 - 0
apps/emqx/src/emqx_broker.erl

@@ -225,6 +225,7 @@ publish(Msg) when is_record(Msg, message) ->
             [];
         Msg1 = #message{topic = Topic} ->
             emqx_persistent_session:persist_message(Msg1),
+            _ = emqx_persistent_session_ds:persist_message(Msg1),
             route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
     end.
 

+ 86 - 0
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -0,0 +1,86 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds).
+
+-export([init/0]).
+
+-export([persist_message/1]).
+
+-export([
+    serialize_message/1,
+    deserialize_message/1
+]).
+
+%% FIXME
+-define(DS_SHARD, <<"local">>).
+
+-define(WHEN_ENABLED(DO),
+    case is_store_enabled() of
+        true -> DO;
+        false -> {skipped, disabled}
+    end
+).
+
+%%
+
+init() ->
+    ?WHEN_ENABLED(
+        ok = emqx_ds:ensure_shard(?DS_SHARD, #{
+            dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
+        })
+    ).
+
+%%
+
+-spec persist_message(emqx_types:message()) ->
+    ok | {skipped, _Reason} | {error, _TODO}.
+persist_message(Msg) ->
+    ?WHEN_ENABLED(
+        case needs_persistence(Msg) andalso find_subscribers(Msg) of
+            [_ | _] ->
+                store_message(Msg);
+            % [] ->
+            %     {skipped, no_subscribers};
+            false ->
+                {skipped, needs_no_persistence}
+        end
+    ).
+
+needs_persistence(Msg) ->
+    not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
+
+store_message(Msg) ->
+    ID = emqx_message:id(Msg),
+    Timestamp = emqx_guid:timestamp(ID),
+    Topic = emqx_topic:words(emqx_message:topic(Msg)),
+    emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
+
+find_subscribers(_Msg) ->
+    [node()].
+
+%%
+
+serialize_message(Msg) ->
+    term_to_binary(emqx_message:to_map(Msg)).
+
+deserialize_message(Bin) ->
+    emqx_message:from_map(binary_to_term(Bin)).
+
+%%
+
+is_store_enabled() ->
+    emqx_config:get([persistent_session_store, ds]).

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -319,6 +319,14 @@ fields("persistent_session_store") ->
                     desc => ?DESC(persistent_session_store_enabled)
                 }
             )},
+        {"ds",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {"on_disc",
             sc(
                 boolean(),

+ 116 - 0
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -0,0 +1,116 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_messages_SUITE).
+
+-include_lib("stdlib/include/assert.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(NOW,
+    (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
+).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_durable_storage),
+    ok = emqx_common_test_helpers:start_apps([], fun
+        (emqx) ->
+            emqx_common_test_helpers:boot_modules(all),
+            emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>),
+            emqx_app:set_config_loader(?MODULE);
+        (_) ->
+            ok
+    end),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([]),
+    application:stop(emqx_durable_storage),
+    ok.
+
+t_messages_persisted(_Config) ->
+    C1 = connect(<<?MODULE_STRING "1">>, true, 30),
+    C2 = connect(<<?MODULE_STRING "2">>, false, 60),
+    C3 = connect(<<?MODULE_STRING "3">>, false, undefined),
+    C4 = connect(<<?MODULE_STRING "4">>, false, 0),
+
+    CP = connect(<<?MODULE_STRING "-pub">>, true, undefined),
+
+    {ok, _, [1]} = emqtt:subscribe(C1, <<"client/+/topic">>, qos1),
+    {ok, _, [0]} = emqtt:subscribe(C2, <<"client/+/topic">>, qos0),
+    {ok, _, [1]} = emqtt:subscribe(C2, <<"random/+">>, qos1),
+    {ok, _, [2]} = emqtt:subscribe(C3, <<"client/#">>, qos2),
+    {ok, _, [0]} = emqtt:subscribe(C4, <<"random/#">>, qos0),
+
+    Messages = [
+        M1 = {<<"client/1/topic">>, <<"1">>},
+        M2 = {<<"client/2/topic">>, <<"2">>},
+        M3 = {<<"client/3/topic/sub">>, <<"3">>},
+        M4 = {<<"client/4">>, <<"4">>},
+        M5 = {<<"random/5">>, <<"5">>},
+        M6 = {<<"random/6/topic">>, <<"6">>},
+        M7 = {<<"client/7/topic">>, <<"7">>},
+        M8 = {<<"client/8/topic/sub">>, <<"8">>},
+        M9 = {<<"random/9">>, <<"9">>},
+        M10 = {<<"random/10">>, <<"10">>}
+    ],
+
+    Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages],
+
+    ct:pal("Results = ~p", [Results]),
+
+    Persisted = consume(<<"local">>, {['#'], 0}),
+
+    ct:pal("Persisted = ~p", [Persisted]),
+
+    ?assertEqual(
+        % [M1, M2, M5, M7, M9, M10],
+        [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10],
+        [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
+    ),
+
+    ok.
+
+%%
+
+connect(ClientId, CleanStart, EI) ->
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {clean_start, CleanStart},
+        {properties,
+            maps:from_list(
+                [{'Session-Expiry-Interval', EI} || is_integer(EI)]
+            )}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+consume(Shard, Replay) ->
+    {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
+    consume(It).
+
+consume(It) ->
+    case emqx_ds_storage_layer:next(It) of
+        {value, Msg, NIt} ->
+            [emqx_persistent_session_ds:deserialize_message(Msg) | consume(NIt)];
+        none ->
+            []
+    end.

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -88,7 +88,8 @@
     T == sqlserver;
     T == pulsar_producer;
     T == oracle;
-    T == iotdb
+    T == iotdb;
+    T == kinesis_producer
 ).
 
 -define(ROOT_KEY, bridges).

+ 6 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -546,7 +546,12 @@ schema("/bridges_probe") ->
                     ?NO_CONTENT;
                 {error, #{kind := validation_error} = Reason} ->
                     ?BAD_REQUEST('TEST_FAILED', map_to_json(Reason));
-                {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
+                {error, Reason0} when not is_tuple(Reason0); element(1, Reason0) =/= 'exit' ->
+                    Reason =
+                        case Reason0 of
+                            {unhealthy_target, Message} -> Message;
+                            _ -> Reason0
+                        end,
                     ?BAD_REQUEST('TEST_FAILED', Reason)
             end;
         BadRequest ->

+ 2 - 0
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -374,6 +374,8 @@ parse_confs(<<"kafka">> = _Type, Name, Conf) ->
     Conf#{bridge_name => Name};
 parse_confs(<<"pulsar_producer">> = _Type, Name, Conf) ->
     Conf#{bridge_name => Name};
+parse_confs(<<"kinesis_producer">> = _Type, Name, Conf) ->
+    Conf#{bridge_name => Name};
 parse_confs(_Type, _Name, Conf) ->
     Conf.
 

+ 20 - 4
apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl

@@ -49,7 +49,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_pulsar, <<"pulsar_producer">>, Method ++ "_producer"),
         api_ref(emqx_bridge_oracle, <<"oracle">>, Method),
         api_ref(emqx_bridge_iotdb, <<"iotdb">>, Method),
-        api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method)
+        api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method),
+        api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer")
     ].
 
 schema_modules() ->
@@ -74,7 +75,8 @@ schema_modules() ->
         emqx_bridge_pulsar,
         emqx_bridge_oracle,
         emqx_bridge_iotdb,
-        emqx_bridge_rabbitmq
+        emqx_bridge_rabbitmq,
+        emqx_bridge_kinesis
     ].
 
 examples(Method) ->
@@ -119,7 +121,8 @@ resource_type(opents) -> emqx_bridge_opents_connector;
 resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
 resource_type(oracle) -> emqx_oracle;
 resource_type(iotdb) -> emqx_bridge_iotdb_impl;
-resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector.
+resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
+resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer.
 
 fields(bridges) ->
     [
@@ -199,7 +202,8 @@ fields(bridges) ->
     ] ++ kafka_structs() ++ pulsar_structs() ++ gcp_pubsub_structs() ++ mongodb_structs() ++
         influxdb_structs() ++
         redis_structs() ++
-        pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs().
+        pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++
+        kinesis_structs().
 
 mongodb_structs() ->
     [
@@ -365,6 +369,18 @@ rabbitmq_structs() ->
             )}
     ].
 
+kinesis_structs() ->
+    [
+        {kinesis_producer,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_kinesis, "config_producer")),
+                #{
+                    desc => <<"Amazon Kinesis Producer Bridge Config">>,
+                    required => false
+                }
+            )}
+    ].
+
 api_ref(Module, Type, Method) ->
     {Type, ref(Module, Method)}.
 

+ 1 - 1
apps/emqx_bridge_dynamo/rebar.config

@@ -1,6 +1,6 @@
 %% -*- mode: erlang; -*-
 {erl_opts, [debug_info]}.
-{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}}
+{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 2 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -249,6 +249,8 @@ check_workers(InstanceId, Client) ->
             #{return_values => true}
         )
     of
+        {ok, []} ->
+            connecting;
         {ok, Values} ->
             AllOk = lists:all(fun(S) -> S =:= subscription_ok end, Values),
             case AllOk of

+ 21 - 1
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -1004,7 +1004,27 @@ t_bridge_rule_action_source(Config) ->
     ok.
 
 t_on_get_status(Config) ->
+    ResourceId = resource_id(Config),
     emqx_bridge_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    %% no workers alive
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    WorkerPids = get_pull_worker_pids(Config),
+    emqx_utils:pmap(
+        fun(Pid) ->
+            Ref = monitor(process, Pid),
+            exit(Pid, kill),
+            receive
+                {'DOWN', Ref, process, Pid, killed} ->
+                    ok
+            end
+        end,
+        WorkerPids
+    ),
+    ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)),
     ok.
 
 t_create_via_http_api(_Config) ->
@@ -1191,7 +1211,7 @@ t_nonexistent_topic(Config) ->
                 emqx_resource_manager:health_check(ResourceId)
             ),
             ?assertMatch(
-                {ok, _Group, #{error := "GCP PubSub topics are invalid" ++ _}},
+                {ok, _Group, #{error := {unhealthy_target, "GCP PubSub topics are invalid" ++ _}}},
                 emqx_resource_manager:lookup_cached(ResourceId)
             ),
             %% now create the topic and restart the bridge

+ 94 - 0
apps/emqx_bridge_kinesis/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 22 - 0
apps/emqx_bridge_kinesis/README.md

@@ -0,0 +1,22 @@
+# Amazon Kinesis Data Integration Bridge
+
+This application houses the Amazon Kinesis Producer data
+integration bridge for EMQX Enterprise Edition. It provides the means to
+connect to Amazon Kinesis Data Streams and publish messages to it.
+
+# Documentation links
+
+For more information about Amazon Kinesis Data Streams, please see its
+[official site](https://aws.amazon.com/kinesis/data-streams/).
+
+# Configurations
+
+Please see [Ingest Data into Kinesis](https://docs.emqx.com/en/enterprise/v5.1/data-integration/data-bridge-kinesis.html) for more detailed information.
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 2 - 0
apps/emqx_bridge_kinesis/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+kinesis

+ 11 - 0
apps/emqx_bridge_kinesis/rebar.config

@@ -0,0 +1,11 @@
+%% -*- mode: erlang; -*-
+{erl_opts, [debug_info]}.
+{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
+
+{shell, [
+    {apps, [emqx_bridge_kinesis]}
+]}.

+ 13 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src

@@ -0,0 +1,13 @@
+{application, emqx_bridge_kinesis, [
+    {description, "EMQX Enterprise Amazon Kinesis Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        erlcloud
+    ]},
+    {env, []},
+    {modules, []},
+    {links, []}
+]}.

+ 167 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl

@@ -0,0 +1,167 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis).
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% hocon_schema API
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-export([
+    conn_bridge_examples/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() ->
+    "bridge_kinesis".
+
+roots() ->
+    [].
+
+fields("config_producer") ->
+    emqx_bridge_schema:common_bridge_fields() ++
+        emqx_resource_schema:fields("resource_opts") ++
+        fields(connector_config) ++ fields(producer);
+fields(connector_config) ->
+    [
+        {aws_access_key_id,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("aws_access_key_id")
+                }
+            )},
+        {aws_secret_access_key,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("aws_secret_access_key"),
+                    sensitive => true
+                }
+            )},
+        {endpoint,
+            mk(
+                binary(),
+                #{
+                    default => <<"https://kinesis.us-east-1.amazonaws.com">>,
+                    desc => ?DESC("endpoint")
+                }
+            )},
+        {max_retries,
+            mk(
+                non_neg_integer(),
+                #{
+                    required => false,
+                    default => 2,
+                    desc => ?DESC("max_retries")
+                }
+            )},
+        {pool_size,
+            sc(
+                pos_integer(),
+                #{
+                    default => 8,
+                    desc => ?DESC("pool_size")
+                }
+            )}
+    ];
+fields(producer) ->
+    [
+        {payload_template,
+            sc(
+                binary(),
+                #{
+                    default => <<>>,
+                    desc => ?DESC("payload_template")
+                }
+            )},
+        {local_topic,
+            sc(
+                binary(),
+                #{
+                    desc => ?DESC("local_topic")
+                }
+            )},
+        {stream_name,
+            sc(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("stream_name")
+                }
+            )},
+        {partition_key,
+            sc(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("partition_key")
+                }
+            )}
+    ];
+fields("get_producer") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_producer");
+fields("post_producer") ->
+    [type_field_producer(), name_field() | fields("config_producer")];
+fields("put_producer") ->
+    fields("config_producer").
+
+desc("config_producer") ->
+    ?DESC("desc_config");
+desc(_) ->
+    undefined.
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"kinesis_producer">> => #{
+                summary => <<"Amazon Kinesis Producer Bridge">>,
+                value => values(producer, Method)
+            }
+        }
+    ].
+
+values(producer, _Method) ->
+    #{
+        aws_access_key_id => <<"aws_access_key_id">>,
+        aws_secret_access_key => <<"******">>,
+        endpoint => <<"https://kinesis.us-east-1.amazonaws.com">>,
+        max_retries => 3,
+        stream_name => <<"stream_name">>,
+        partition_key => <<"key">>,
+        resource_opts => #{
+            worker_pool_size => 1,
+            health_check_interval => 15000,
+            query_mode => async,
+            inflight_window => 100,
+            max_buffer_bytes => 100 * 1024 * 1024
+        }
+    }.
+
+%%-------------------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------------------
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+enum(OfSymbols) -> hoconsc:enum(OfSymbols).
+
+type_field_producer() ->
+    {type, mk(enum([kinesis_producer]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 178 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_connector_client.erl

@@ -0,0 +1,178 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_connector_client).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("erlcloud/include/erlcloud_aws.hrl").
+
+-behaviour(gen_server).
+
+-type state() :: #{
+    instance_id := resource_id(),
+    partition_key := binary(),
+    stream_name := binary()
+}.
+-type record() :: {Data :: binary(), PartitionKey :: binary()}.
+
+-define(DEFAULT_PORT, 443).
+
+%% API
+-export([
+    start_link/1,
+    connection_status/1,
+    query/2
+]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-ifdef(TEST).
+-export([execute/2]).
+-endif.
+
+%% The default timeout for Kinesis API calls is 10 seconds, whereas the
+%% default `gen_server:call` timeout is only 5 seconds, so we use a longer
+%% timeout for the `gen_server:call` that wraps the health check.
+-define(HEALTH_CHECK_TIMEOUT, 15000).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+connection_status(Pid) ->
+    try
+        gen_server:call(Pid, connection_status, ?HEALTH_CHECK_TIMEOUT)
+    catch
+        _:_ ->
+            {error, timeout}
+    end.
+
+query(Pid, Records) ->
+    gen_server:call(Pid, {query, Records}, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts Bridge which communicates to Amazon Kinesis Data Streams
+%% @end
+%%--------------------------------------------------------------------
+start_link(Options) ->
+    gen_server:start_link(?MODULE, Options, []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% Initialize kinesis connector
+-spec init(emqx_bridge_kinesis_impl_producer:config()) -> {ok, state()}.
+init(#{
+    aws_access_key_id := AwsAccessKey,
+    aws_secret_access_key := AwsSecretAccessKey,
+    endpoint := Endpoint,
+    partition_key := PartitionKey,
+    stream_name := StreamName,
+    max_retries := MaxRetries,
+    instance_id := InstanceId
+}) ->
+    process_flag(trap_exit, true),
+
+    #{scheme := Scheme, hostname := Host, port := Port} =
+        emqx_schema:parse_server(
+            Endpoint,
+            #{
+                default_port => ?DEFAULT_PORT,
+                supported_schemes => ["http", "https"]
+            }
+        ),
+    State = #{
+        instance_id => InstanceId,
+        partition_key => PartitionKey,
+        stream_name => StreamName
+    },
+    New =
+        fun(AccessKeyID, SecretAccessKey, HostAddr, HostPort, ConnectionScheme) ->
+            Config0 = erlcloud_kinesis:new(
+                AccessKeyID,
+                SecretAccessKey,
+                HostAddr,
+                HostPort,
+                ConnectionScheme ++ "://"
+            ),
+            Config0#aws_config{retry_num = MaxRetries}
+        end,
+    erlcloud_config:configure(
+        to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New
+    ),
+    {ok, State}.
+
+handle_call(connection_status, _From, #{stream_name := StreamName} = State) ->
+    Status =
+        case erlcloud_kinesis:describe_stream(StreamName) of
+            {ok, _} ->
+                {ok, connected};
+            {error, {<<"ResourceNotFoundException">>, _}} ->
+                {error, unhealthy_target};
+            Error ->
+                {error, Error}
+        end,
+    {reply, Status, State};
+handle_call({query, Records}, _From, #{stream_name := StreamName} = State) ->
+    Result = do_query(StreamName, Records),
+    {reply, Result, State};
+handle_call(_Request, _From, State) ->
+    {reply, {error, unknown_call}, State}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(Reason, #{instance_id := InstanceId} = _State) ->
+    ?tp(kinesis_stop, #{instance_id => InstanceId, reason => Reason}),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+-spec do_query(binary(), [record()]) ->
+    {ok, jsx:json_term() | binary()}
+    | {error, {unrecoverable_error, term()}}
+    | {error, term()}.
+do_query(StreamName, Records) ->
+    try
+        execute(put_record, {StreamName, Records})
+    catch
+        _Type:Reason ->
+            {error, {unrecoverable_error, {invalid_request, Reason}}}
+    end.
+
+-spec execute(put_record, {binary(), [record()]}) ->
+    {ok, jsx:json_term() | binary()}
+    | {error, term()}.
+execute(put_record, {StreamName, [{Data, PartitionKey}] = Record}) ->
+    Result = erlcloud_kinesis:put_record(StreamName, PartitionKey, Data),
+    ?tp(kinesis_put_record, #{records => Record, result => Result}),
+    Result;
+execute(put_record, {StreamName, Items}) when is_list(Items) ->
+    Result = erlcloud_kinesis:put_records(StreamName, Items),
+    ?tp(kinesis_put_record, #{records => Items, result => Result}),
+    Result.
+
+-spec to_str(list() | binary()) -> list().
+to_str(List) when is_list(List) ->
+    List;
+to_str(Bin) when is_binary(Bin) ->
+    erlang:binary_to_list(Bin).

+ 247 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl

@@ -0,0 +1,247 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_impl_producer).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(HEALTH_CHECK_TIMEOUT, 15000).
+-define(TOPIC_MESSAGE,
+    "Kinesis stream is invalid. Please check if the stream exist in Kinesis account."
+).
+
+-type config() :: #{
+    aws_access_key_id := binary(),
+    aws_secret_access_key := binary(),
+    endpoint := binary(),
+    stream_name := binary(),
+    partition_key := binary(),
+    payload_template := binary(),
+    max_retries := non_neg_integer(),
+    pool_size := non_neg_integer(),
+    instance_id => resource_id(),
+    any() => term()
+}.
+-type templates() :: #{
+    partition_key := list(),
+    send_message := list()
+}.
+-type state() :: #{
+    pool_name := resource_id(),
+    templates := templates()
+}.
+-export_type([config/0]).
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+-export([
+    connect/1
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------------------
+
+callback_mode() -> always_sync.
+
+-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
+on_start(
+    InstanceId,
+    #{
+        pool_size := PoolSize
+    } = Config0
+) ->
+    ?SLOG(info, #{
+        msg => "starting_kinesis_bridge",
+        connector => InstanceId,
+        config => redact(Config0)
+    }),
+    Config = Config0#{instance_id => InstanceId},
+    Options = [
+        {config, Config},
+        {pool_size, PoolSize}
+    ],
+    Templates = parse_template(Config),
+    State = #{
+        pool_name => InstanceId,
+        templates => Templates
+    },
+
+    case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
+        ok ->
+            ?tp(emqx_bridge_kinesis_impl_producer_start_ok, #{config => Config}),
+            {ok, State};
+        Error ->
+            ?tp(emqx_bridge_kinesis_impl_producer_start_failed, #{config => Config}),
+            Error
+    end.
+
+-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, _State) ->
+    emqx_resource_pool:stop(InstanceId).
+
+-spec on_get_status(resource_id(), state()) ->
+    connected | disconnected | {disconnected, state(), {unhealthy_target, string()}}.
+on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
+    case
+        emqx_resource_pool:health_check_workers(
+            Pool,
+            {emqx_bridge_kinesis_connector_client, connection_status, []},
+            ?HEALTH_CHECK_TIMEOUT,
+            #{return_values => true}
+        )
+    of
+        {ok, Values} ->
+            AllOk = lists:all(fun(S) -> S =:= {ok, connected} end, Values),
+            case AllOk of
+                true ->
+                    connected;
+                false ->
+                    Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values),
+                    case Unhealthy of
+                        true -> {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
+                        false -> disconnected
+                    end
+            end;
+        {error, _} ->
+            disconnected
+    end.
+
+-spec on_query(
+    resource_id(),
+    {send_message, map()},
+    state()
+) ->
+    {ok, map()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+on_query(ResourceId, {send_message, Message}, State) ->
+    Requests = [{send_message, Message}],
+    ?tp(emqx_bridge_kinesis_impl_producer_sync_query, #{message => Message}),
+    do_send_requests_sync(ResourceId, Requests, State).
+
+-spec on_batch_query(
+    resource_id(),
+    [{send_message, map()}],
+    state()
+) ->
+    {ok, map()}
+    | {error, {recoverable_error, term()}}
+    | {error, term()}.
+%% We only support batches of `send_message` requests (batch insert).
+on_batch_query(ResourceId, [{send_message, _} | _] = Requests, State) ->
+    ?tp(emqx_bridge_kinesis_impl_producer_sync_batch_query, #{requests => Requests}),
+    do_send_requests_sync(ResourceId, Requests, State).
+
+connect(Opts) ->
+    Options = proplists:get_value(config, Opts),
+    emqx_bridge_kinesis_connector_client:start_link(Options).
+
+%%-------------------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------------------
+
+-spec do_send_requests_sync(
+    resource_id(),
+    [{send_message, map()}],
+    state()
+) ->
+    {ok, jsx:json_term() | binary()}
+    | {error, {recoverable_error, term()}}
+    | {error, {unrecoverable_error, {invalid_request, term()}}}
+    | {error, {unrecoverable_error, {unhealthy_target, string()}}}
+    | {error, {unrecoverable_error, term()}}
+    | {error, term()}.
+do_send_requests_sync(
+    InstanceId,
+    Requests,
+    #{pool_name := PoolName, templates := Templates}
+) ->
+    Records = render_records(Requests, Templates),
+    Result = ecpool:pick_and_do(
+        PoolName,
+        {emqx_bridge_kinesis_connector_client, query, [Records]},
+        no_handover
+    ),
+    handle_result(Result, Requests, InstanceId).
+
+handle_result({ok, _} = Result, _Requests, _InstanceId) ->
+    Result;
+handle_result({error, {<<"ResourceNotFoundException">>, _} = Reason}, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {unrecoverable_error, {unhealthy_target, ?TOPIC_MESSAGE}}};
+handle_result(
+    {error, {<<"ProvisionedThroughputExceededException">>, _} = Reason}, Requests, InstanceId
+) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {recoverable_error, Reason}};
+handle_result({error, {<<"InvalidArgumentException">>, _} = Reason}, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {unrecoverable_error, Reason}};
+handle_result({error, {econnrefused = Reason, _}}, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    {error, {recoverable_error, Reason}};
+handle_result({error, Reason} = Error, Requests, InstanceId) ->
+    ?SLOG(error, #{
+        msg => "kinesis_error_response",
+        request => Requests,
+        connector => InstanceId,
+        reason => Reason
+    }),
+    Error.
+
+parse_template(Config) ->
+    #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config,
+    Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate},
+    maps:map(fun(_K, V) -> emqx_placeholder:preproc_tmpl(V) end, Templates).
+
+render_records(Items, Templates) ->
+    PartitionKeyTemplate = maps:get(partition_key, Templates),
+    MsgTemplate = maps:get(send_message, Templates),
+    render_messages(Items, {MsgTemplate, PartitionKeyTemplate}, []).
+
+render_messages([], _Templates, RenderedMsgs) ->
+    RenderedMsgs;
+render_messages(
+    [{send_message, Msg} | Others],
+    {MsgTemplate, PartitionKeyTemplate} = Templates,
+    RenderedMsgs
+) ->
+    Data = emqx_placeholder:proc_tmpl(MsgTemplate, Msg),
+    PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTemplate, Msg),
+    RenderedMsg = {Data, PartitionKey},
+    render_messages(Others, Templates, [RenderedMsg | RenderedMsgs]).
+
+redact(Config) ->
+    emqx_utils:redact(Config, fun(Any) -> Any =:= aws_secret_access_key end).

+ 817 - 0
apps/emqx_bridge_kinesis/test/emqx_bridge_kinesis_impl_producer_SUITE.erl

@@ -0,0 +1,817 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_kinesis_impl_producer_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(PRODUCER, emqx_bridge_kinesis_impl_producer).
+-define(BRIDGE_TYPE, kinesis_producer).
+-define(BRIDGE_TYPE_BIN, <<"kinesis_producer">>).
+-define(KINESIS_PORT, 4566).
+-define(TOPIC, <<"t/topic">>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
%% Run every testcase once per batching mode.
all() ->
    [{group, with_batch}, {group, without_batch}].
+
%% Both groups contain the full testcase list; only batch_size differs
%% (set in init_per_group/2).
groups() ->
    AllTCs = emqx_common_test_helpers:all(?MODULE),
    [{with_batch, AllTCs}, {without_batch, AllTCs}].
+
%% Suite-wide setup: resolve the toxiproxy endpoint from the environment,
%% boot the EMQX applications the bridge depends on, and initialize the
%% management-API helpers used by the HTTP-based testcases.
init_per_suite(Config) ->
    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy.emqx.net"),
    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
    ProxyName = "kinesis",
    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
    ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
    {ok, _} = application:ensure_all_started(emqx_connector),
    emqx_mgmt_api_test_util:init_suite(),
    [
        {proxy_host, ProxyHost},
        {proxy_port, ProxyPort},
        {kinesis_port, ?KINESIS_PORT},
        {proxy_name, ProxyName}
        | Config
    ].
+
%% Suite teardown: stop the apps started in init_per_suite/1 (roughly in
%% reverse order; emqx_connector stop result is deliberately ignored).
end_per_suite(_Config) ->
    emqx_mgmt_api_test_util:end_suite(),
    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
    ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
    _ = application:stop(emqx_connector),
    ok.
+
%% Inject the batch_size that kinesis_config/1 later reads: 100 enables
%% real batching, 1 effectively disables it.
init_per_group(Group, Config) ->
    case Group of
        with_batch -> [{batch_size, 100} | Config];
        without_batch -> [{batch_size, 1} | Config];
        _ -> Config
    end.
+
%% Nothing group-specific to clean up.
end_per_group(_Group, _Config) ->
    ok.
+
%% Per-testcase setup: fresh snabbkaffe trace, clean toxiproxy state, a
%% telemetry capture table, and a newly created Kinesis stream.
%% t_wrong_server gets a longer timetrap because it waits on a 30s
%% connect timeout before asserting.
init_per_testcase(TestCase, Config0) ->
    ok = snabbkaffe:start_trace(),
    ProxyHost = ?config(proxy_host, Config0),
    ProxyPort = ?config(proxy_port, Config0),
    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
    TimeTrap =
        case TestCase of
            t_wrong_server -> 60;
            _ -> 30
        end,
    ct:timetrap({seconds, TimeTrap}),
    delete_all_bridges(),
    Tid = install_telemetry_handler(TestCase),
    %% Also stored in the process dictionary so assert_metrics/2 can reach
    %% the table without threading Config through every helper.
    put(telemetry_table, Tid),
    Config = generate_config(Config0),
    create_stream(Config),
    [{telemetry_table, Tid} | Config].
+
%% Per-testcase teardown: stop tracing, drop bridges and the Kinesis
%% stream, then run any on_exit callbacks registered during the test.
end_per_testcase(_TestCase, Config) ->
    ok = snabbkaffe:stop(),
    delete_all_bridges(),
    delete_stream(Config),
    emqx_common_test_helpers:call_janitor(),
    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
%% Build the per-testcase config entries: a unique bridge name, the HOCON
%% config string, the parsed bridge config, derived resource/bridge ids,
%% and an erlcloud client pointed at the same endpoint the bridge uses
%% (dummy credentials — the backend is localstack via toxiproxy).
generate_config(Config0) ->
    #{
        name := Name,
        config_string := ConfigString,
        kinesis_config := KinesisConfig
    } = kinesis_config(Config0),
    Endpoint = map_get(<<"endpoint">>, KinesisConfig),
    #{scheme := Scheme, hostname := Host, port := Port} =
        emqx_schema:parse_server(
            Endpoint,
            #{
                default_port => 443,
                supported_schemes => ["http", "https"]
            }
        ),
    ErlcloudConfig = erlcloud_kinesis:new("access_key", "secret", Host, Port, Scheme ++ "://"),
    ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name),
    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name),
    [
        {kinesis_name, Name},
        {connection_scheme, Scheme},
        {kinesis_config, KinesisConfig},
        {kinesis_config_string, ConfigString},
        {resource_id, ResourceId},
        {bridge_id, BridgeId},
        {erlcloud_config, ErlcloudConfig}
        | Config0
    ].
+
%% Render the bridge's HOCON config string from per-group/per-test
%% proplist overrides. The GUID suffix makes the bridge name unique per
%% testcase so runs cannot collide. Returns the raw string plus its
%% parsed-and-checked map form.
kinesis_config(Config) ->
    QueryMode = proplists:get_value(query_mode, Config, async),
    Scheme = proplists:get_value(connection_scheme, Config, "http"),
    ProxyHost = proplists:get_value(proxy_host, Config),
    KinesisPort = proplists:get_value(kinesis_port, Config),
    BatchSize = proplists:get_value(batch_size, Config, 100),
    BatchTime = proplists:get_value(batch_time, Config, <<"500ms">>),
    PayloadTemplate = proplists:get_value(payload_template, Config, "${payload}"),
    StreamName = proplists:get_value(stream_name, Config, <<"mystream">>),
    PartitionKey = proplists:get_value(partition_key, Config, <<"key">>),
    MaxRetries = proplists:get_value(max_retries, Config, 3),
    GUID = emqx_guid:to_hexstr(emqx_guid:gen()),
    Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>,
    %% The endpoint points at toxiproxy so tests can inject failures.
    ConfigString =
        io_lib:format(
            "bridges.kinesis_producer.~s {\n"
            "  enable = true\n"
            "  aws_access_key_id = \"aws_access_key_id\"\n"
            "  aws_secret_access_key = \"aws_secret_access_key\"\n"
            "  endpoint = \"~s://~s:~b\"\n"
            "  stream_name = \"~s\"\n"
            "  partition_key = \"~s\"\n"
            "  payload_template = \"~s\"\n"
            "  max_retries = ~b\n"
            "  pool_size = 1\n"
            "  resource_opts = {\n"
            "    health_check_interval = \"3s\"\n"
            "    request_ttl = 30s\n"
            "    resume_interval = 1s\n"
            "    metrics_flush_interval = \"700ms\"\n"
            "    worker_pool_size = 1\n"
            "    query_mode = ~s\n"
            "    batch_size = ~b\n"
            "    batch_time = \"~s\"\n"
            "  }\n"
            "}\n",
            [
                Name,
                Scheme,
                ProxyHost,
                KinesisPort,
                StreamName,
                PartitionKey,
                PayloadTemplate,
                MaxRetries,
                QueryMode,
                BatchSize,
                BatchTime
            ]
        ),
    #{
        name => Name,
        config_string => ConfigString,
        kinesis_config => parse_and_check(ConfigString, Name)
    }.
+
%% Parse the HOCON config string, validate it against the bridge schema
%% (check_plain throws on schema violations; its return is not needed),
%% and extract this bridge's raw config map.
parse_and_check(ConfigString, Name) ->
    TypeBin = <<"kinesis_producer">>,
    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
    #{<<"bridges">> := #{TypeBin := #{Name := BridgeConf}}} = RawConf,
    BridgeConf.
+
%% Remove every configured bridge, regardless of type.
delete_all_bridges() ->
    ct:pal("deleting all bridges"),
    _ = [
        emqx_bridge:remove(Type, Name)
     || #{name := Name, type := Type} <- emqx_bridge:list()
    ],
    ok.
+
%% Remove the bridge created for this testcase.
delete_bridge(Config) ->
    Name = ?config(kinesis_name, Config),
    ct:pal("deleting bridge ~p", [{?BRIDGE_TYPE, Name}]),
    emqx_bridge:remove(?BRIDGE_TYPE, Name).
+
%% Create the bridge over HTTP with no config overrides.
create_bridge_http(Config) ->
    create_bridge_http(Config, #{}).
+
%% Create the bridge through the management HTTP API. Before creating,
%% hit the probe endpoint with the same params and assert that probe and
%% create agree on success/failure (both ok or both error).
create_bridge_http(Config, KinesisConfigOverrides) ->
    TypeBin = ?BRIDGE_TYPE_BIN,
    Name = ?config(kinesis_name, Config),
    KinesisConfig0 = ?config(kinesis_config, Config),
    KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides),
    Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
    ProbeResult = emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params),
    ct:pal("creating bridge (via http): ~p", [Params]),
    ct:pal("probe result: ~p", [ProbeResult]),
    Res =
        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
            {ok, Res0} -> {ok, emqx_utils_json:decode(Res0, [return_maps])};
            Error -> Error
        end,
    ct:pal("bridge creation result: ~p", [Res]),
    %% Probe and create must both succeed or both fail.
    ?assertEqual(element(1, ProbeResult), element(1, Res)),
    Res.
+
%% Create the bridge directly (no HTTP) with no config overrides.
create_bridge(Config) ->
    create_bridge(Config, #{}).
+
%% Create the bridge via the emqx_bridge API, applying overrides on top
%% of the testcase's base config.
create_bridge(Config, KinesisConfigOverrides) ->
    Name = ?config(kinesis_name, Config),
    BaseConfig = ?config(kinesis_config, Config),
    BridgeConfig = emqx_utils_maps:deep_merge(BaseConfig, KinesisConfigOverrides),
    ct:pal("creating bridge: ~p", [BridgeConfig]),
    Result = emqx_bridge:create(?BRIDGE_TYPE_BIN, Name, BridgeConfig),
    ct:pal("bridge creation result: ~p", [Result]),
    Result.
+
%% Create a rule (via the management HTTP API) that routes every message
%% published on ?TOPIC to this testcase's bridge; returns the decoded
%% response map on success.
create_rule_and_action_http(Config) ->
    RuleParams = #{
        enable => true,
        sql => <<"SELECT * FROM \"", ?TOPIC/binary, "\"">>,
        actions => [?config(bridge_id, Config)]
    },
    RulesPath = emqx_mgmt_api_test_util:api_path(["rules"]),
    Auth = emqx_mgmt_api_test_util:auth_header_(),
    Result = emqx_mgmt_api_test_util:request_api(post, RulesPath, "", Auth, RuleParams),
    case Result of
        {ok, Body} -> {ok, emqx_utils_json:decode(Body, [return_maps])};
        Error -> Error
    end.
+
%% (Re)create the test stream with a single shard, deleting any leftover
%% stream of the same name first, and poll until it reports ACTIVE
%% (up to 10 attempts, 100 ms apart) so tests never write to a stream
%% that is still CREATING.
create_stream(Config) ->
    KinesisConfig = ?config(kinesis_config, Config),
    ErlcloudConfig = ?config(erlcloud_config, Config),
    StreamName = map_get(<<"stream_name">>, KinesisConfig),
    {ok, _} = application:ensure_all_started(erlcloud),
    delete_stream(StreamName, ErlcloudConfig),
    {ok, _} = erlcloud_kinesis:create_stream(StreamName, 1, ErlcloudConfig),
    ?retry(
        _Sleep = 100,
        _Attempts = 10,
        begin
            {ok, [{<<"StreamDescription">>, StreamInfo}]} =
                erlcloud_kinesis:describe_stream(StreamName, ErlcloudConfig),
            ?assertEqual(
                <<"ACTIVE">>,
                proplists:get_value(<<"StreamStatus">>, StreamInfo)
            )
        end
    ),
    ok.
+
%% Delete this testcase's stream (best effort; see delete_stream/2).
delete_stream(Config) ->
    {ok, _} = application:ensure_all_started(erlcloud),
    KinesisConfig = ?config(kinesis_config, Config),
    StreamName = map_get(<<"stream_name">>, KinesisConfig),
    delete_stream(StreamName, ?config(erlcloud_config, Config)),
    ok.
+
%% Best-effort stream deletion: if the delete call succeeds, poll until
%% describe_stream reports ResourceNotFoundException so the name can be
%% reused immediately. Any other delete result (e.g. stream never
%% existed) is deliberately ignored.
delete_stream(StreamName, ErlcloudConfig) ->
    case erlcloud_kinesis:delete_stream(StreamName, ErlcloudConfig) of
        {ok, _} ->
            ?retry(
                _Sleep = 100,
                _Attempts = 10,
                ?assertMatch(
                    {error, {<<"ResourceNotFoundException">>, _}},
                    erlcloud_kinesis:describe_stream(StreamName, ErlcloudConfig)
                )
            );
        _ ->
            ok
    end,
    ok.
+
%% Wait for exactly one record on the shard and return it.
%% NOTE: the {ok, _} match inside wait_records/5 is what raises the
%% badmatch that t_publish_unhealthy asserts on when the stream is gone.
wait_record(Config, ShardIt, Timeout, Attempts) ->
    [Record] = wait_records(Config, ShardIt, 1, Timeout, Attempts),
    Record.
+
%% Poll get_records on the shard iterator until exactly Count records are
%% returned. `Count = length(Records)` is a match assertion: a mismatch
%% fails the attempt and ?retry polls again, up to Attempts times with
%% Timeout ms between tries.
wait_records(Config, ShardIt, Count, Timeout, Attempts) ->
    ErlcloudConfig = ?config(erlcloud_config, Config),
    ?retry(
        Timeout,
        Attempts,
        begin
            {ok, Ret} = erlcloud_kinesis:get_records(ShardIt, ErlcloudConfig),
            Records = proplists:get_value(<<"Records">>, Ret),
            Count = length(Records),
            Records
        end
    ).
+
%% Shard iterator for the first (and, with one shard, only) shard.
get_shard_iterator(Config) ->
    get_shard_iterator(Config, _Index = 1).
+
%% Obtain a LATEST shard iterator for the Index-th shard (1-based, shards
%% sorted for a deterministic order), i.e. only records published after
%% this call will be visible through the iterator.
get_shard_iterator(Config, Index) ->
    KinesisConfig = ?config(kinesis_config, Config),
    ErlcloudConfig = ?config(erlcloud_config, Config),
    StreamName = map_get(<<"stream_name">>, KinesisConfig),
    {ok, [{<<"Shards">>, Shards}]} = erlcloud_kinesis:list_shards(StreamName, ErlcloudConfig),
    Shard = lists:nth(Index, lists:sort(Shards)),
    ShardId = proplists:get_value(<<"ShardId">>, Shard),
    {ok, [{<<"ShardIterator">>, ShardIt}]} =
        erlcloud_kinesis:get_shard_iterator(StreamName, ShardId, <<"LATEST">>, ErlcloudConfig),
    ShardIt.
+
%% Attach a telemetry handler for all resource-metric events. Each event
%% is both stored in a public ETS table (keyed by monotonic time, for
%% post-mortem dumps) and forwarded to the test process as a
%% {telemetry, Data} message (consumed by wait_n_events/5 and
%% receive_all_events/5). Cleanup is registered via on_exit.
install_telemetry_handler(TestCase) ->
    Tid = ets:new(TestCase, [ordered_set, public]),
    HandlerId = TestCase,
    TestPid = self(),
    _ = telemetry:attach_many(
        HandlerId,
        emqx_resource_metrics:events(),
        fun(EventName, Measurements, Metadata, _Config) ->
            Data = #{
                name => EventName,
                measurements => Measurements,
                metadata => Metadata
            },
            ets:insert(Tid, {erlang:monotonic_time(), Data}),
            TestPid ! {telemetry, Data},
            ok
        end,
        unused_config
    ),
    emqx_common_test_helpers:on_exit(fun() ->
        telemetry:detach(HandlerId),
        ets:delete(Tid)
    end),
    Tid.
+
%% Snapshot every resource metric for ResourceId as #{metric => value}.
current_metrics(ResourceId) ->
    maps:map(fun(_Metric, Getter) -> Getter(ResourceId) end, metrics_mapping()).
+
%% Metric name -> getter fun (each takes a resource id). Single source of
%% truth for which metrics assert_metrics/2 and current_metrics/1 know.
metrics_mapping() ->
    #{
        dropped => fun emqx_resource_metrics:dropped_get/1,
        dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1,
        dropped_other => fun emqx_resource_metrics:dropped_other_get/1,
        dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1,
        dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1,
        dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1,
        late_reply => fun emqx_resource_metrics:late_reply_get/1,
        failed => fun emqx_resource_metrics:failed_get/1,
        inflight => fun emqx_resource_metrics:inflight_get/1,
        matched => fun emqx_resource_metrics:matched_get/1,
        queuing => fun emqx_resource_metrics:queuing_get/1,
        retried => fun emqx_resource_metrics:retried_get/1,
        retried_failed => fun emqx_resource_metrics:retried_failed_get/1,
        retried_success => fun emqx_resource_metrics:retried_success_get/1,
        success => fun emqx_resource_metrics:success_get/1
    }.
+
%% Assert that the metrics named in ExpectedMetrics have exactly the
%% expected values (only those keys are checked). On failure the full
%% metric snapshot and the raw recorded telemetry events are attached to
%% the assertion for debugging.
assert_metrics(ExpectedMetrics, ResourceId) ->
    Mapping = metrics_mapping(),
    Metrics =
        lists:foldl(
            fun(Metric, Acc) ->
                #{Metric := Fun} = Mapping,
                Value = Fun(ResourceId),
                Acc#{Metric => Value}
            end,
            #{},
            maps:keys(ExpectedMetrics)
        ),
    CurrentMetrics = current_metrics(ResourceId),
    %% Table id stashed by init_per_testcase/2 in the process dictionary.
    TelemetryTable = get(telemetry_table),
    RecordedEvents = ets:tab2list(TelemetryTable),
    ?assertEqual(ExpectedMetrics, Metrics, #{
        current_metrics => CurrentMetrics, recorded_events => RecordedEvents
    }),
    ok.
+
%% Assert that every known resource metric is zero for ResourceId.
assert_empty_metrics(ResourceId) ->
    AllZero = maps:map(fun(_Metric, _Getter) -> 0 end, metrics_mapping()),
    assert_metrics(AllZero, ResourceId).
+
%% Wait for one EventName telemetry event with a 5 s default timeout.
wait_telemetry_event(TelemetryTable, EventName, ResourceId) ->
    DefaultOpts = #{timeout => 5_000, n_events => 1},
    wait_telemetry_event(TelemetryTable, EventName, ResourceId, DefaultOpts).
+
%% Wait until NEvents occurrences of EventName have been observed (see
%% wait_n_events/5); Opts must contain timeout and n_events.
wait_telemetry_event(TelemetryTable, EventName, ResourceId, Opts) ->
    #{timeout := Timeout, n_events := NEvents} = Opts,
    wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName).
+
%% Consume {telemetry, ...} messages for EventName, decrementing the
%% remaining count by each event's counter_inc (so one batched event may
%% satisfy several expected occurrences). Times out per-event: failure
%% dumps the recorded events and metric snapshot, then errors.
wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when NEvents =< 0 ->
    ok;
wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
    receive
        {telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} ->
            ct:pal("telemetry event: ~p", [Event]),
            wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName)
    after Timeout ->
        RecordedEvents = ets:tab2list(TelemetryTable),
        CurrentMetrics = current_metrics(ResourceId),
        ct:pal("recorded events: ~p", [RecordedEvents]),
        ct:pal("current metrics: ~p", [CurrentMetrics]),
        error({timeout_waiting_for_telemetry, EventName})
    end.
+
%% Drain gauge events for GaugeName (up to Timeout between events) and
%% check that the LAST observed value equals ExpectedValue; an
%% intermediate non-matching value is fine.
%% NOTE(review): when no events arrive at all this only logs and returns
%% ok — presumably deliberate to reduce flakiness; confirm.
wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
    Events = receive_all_events(GaugeName, Timeout),
    case length(Events) > 0 andalso lists:last(Events) of
        #{measurements := #{gauge_set := ExpectedValue}} ->
            ok;
        #{measurements := #{gauge_set := Value}} ->
            ct:pal("events: ~p", [Events]),
            ct:fail(
                "gauge ~p didn't reach expected value ~p; last value: ~p",
                [GaugeName, ExpectedValue, Value]
            );
        false ->
            ct:pal("no ~p gauge events received!", [GaugeName])
    end.
+
%% Collect up to 10 EventName telemetry events (default cap).
receive_all_events(EventName, Timeout) ->
    MaxEvents = 10,
    receive_all_events(EventName, Timeout, MaxEvents, _Count = 0, _Acc = []).
+
%% Accumulate matching {telemetry, ...} messages until MaxEvents are
%% collected or Timeout ms pass with no further event; returns events in
%% arrival order.
receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
    lists:reverse(Acc);
receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
    receive
        {telemetry, #{name := [_, _, EventName]} = Event} ->
            receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
    after Timeout ->
        lists:reverse(Acc)
    end.
+
%% Coerce a string/binary/integer to a charlist.
to_str(Value) when is_list(Value) ->
    Value;
to_str(Value) when is_binary(Value) ->
    binary_to_list(Value);
to_str(Value) when is_integer(Value) ->
    integer_to_list(Value).
+
%% Coerce a charlist to a binary.
to_bin(Str) when is_list(Str) ->
    list_to_binary(Str).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
%% Smoke test: creating the bridge over the management HTTP API succeeds.
t_create_via_http(Config) ->
    ?assertMatch({ok, _}, create_bridge_http(Config)),
    ok.
+
%% Create the bridge while the toxiproxy connection is down (start must
%% emit the start_failed trace event), then verify the resource
%% self-heals to connected once connectivity returns.
t_start_failed_then_fix(Config) ->
    ProxyPort = ?config(proxy_port, Config),
    ProxyHost = ?config(proxy_host, Config),
    ProxyName = ?config(proxy_name, Config),
    ResourceId = ?config(resource_id, Config),
    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
        ct:sleep(1000),
        ?wait_async_action(
            create_bridge(Config),
            #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_failed},
            20_000
        )
    end),
    %% Proxy is back up here; poll until the health check reports connected.
    ?retry(
        _Sleep1 = 1_000,
        _Attempts1 = 30,
        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
    ),
    ok.
+
%% Stopping the bridge resource emits exactly one kinesis_stop trace
%% event (i.e. the connector's on_stop runs once).
t_stop(Config) ->
    Name = ?config(kinesis_name, Config),
    {ok, _} = create_bridge(Config),
    ?check_trace(
        ?wait_async_action(
            emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
            #{?snk_kind := kinesis_stop},
            5_000
        ),
        fun(Trace) ->
            ?assertMatch([_], ?of_kind(kinesis_stop, Trace)),
            ok
        end
    ),
    ok.
+
%% With the stream present, a freshly created bridge reports connected.
t_get_status_ok(Config) ->
    ResourceId = ?config(resource_id, Config),
    {ok, _} = create_bridge(Config),
    ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
    ok.
+
%% Creating the bridge when the target stream does not exist yields a
%% disconnected resource with an unhealthy_target error cached.
t_create_unhealthy(Config) ->
    delete_stream(Config),
    ResourceId = ?config(resource_id, Config),
    {ok, _} = create_bridge(Config),
    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
    ?assertMatch(
        {ok, _, #{error := {unhealthy_target, _}}},
        emqx_resource_manager:lookup_cached(ResourceId)
    ),
    ok.
+
%% Health check reports disconnected/unhealthy_target for a missing
%% stream.
%% NOTE(review): currently byte-identical to t_create_unhealthy — was one
%% of the two meant to delete the stream AFTER creating the bridge?
t_get_status_unhealthy(Config) ->
    delete_stream(Config),
    ResourceId = ?config(resource_id, Config),
    {ok, _} = create_bridge(Config),
    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
    ?assertMatch(
        {ok, _, #{error := {unhealthy_target, _}}},
        emqx_resource_manager:lookup_cached(ResourceId)
    ),
    ok.
+
%% Happy path: an MQTT publish routed through a rule to the bridge lands
%% in Kinesis, and the resource metrics show exactly one matched/success.
t_publish_success(Config) ->
    ResourceId = ?config(resource_id, Config),
    TelemetryTable = ?config(telemetry_table, Config),
    ?assertMatch({ok, _}, create_bridge(Config)),
    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
    assert_empty_metrics(ResourceId),
    %% LATEST iterator taken before publishing, so only this message shows up.
    ShardIt = get_shard_iterator(Config),
    Payload = <<"payload">>,
    Message = emqx_message:make(?TOPIC, Payload),
    emqx:publish(Message),
    %% to avoid test flakiness
    wait_telemetry_event(TelemetryTable, success, ResourceId),
    wait_until_gauge_is(queuing, 0, 500),
    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
            failed => 0,
            inflight => 0,
            matched => 1,
            queuing => 0,
            retried => 0,
            success => 1
        },
        ResourceId
    ),
    Record = wait_record(Config, ShardIt, 100, 10),
    ?assertEqual(Payload, proplists:get_value(<<"Data">>, Record)),
    ok.
+
%% Same happy path, but with payload/partition-key templates extracting
%% fields from a JSON payload; the record's Data must be the rendered
%% ${payload.data} value, not the raw payload.
t_publish_success_with_template(Config) ->
    ResourceId = ?config(resource_id, Config),
    TelemetryTable = ?config(telemetry_table, Config),
    Overrides =
        #{
            <<"payload_template">> => <<"${payload.data}">>,
            <<"partition_key">> => <<"${payload.key}">>
        },
    ?assertMatch({ok, _}, create_bridge(Config, Overrides)),
    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
    assert_empty_metrics(ResourceId),
    ShardIt = get_shard_iterator(Config),
    Payload = <<"{\"key\":\"my_key\", \"data\":\"my_data\"}">>,
    Message = emqx_message:make(?TOPIC, Payload),
    emqx:publish(Message),
    %% to avoid test flakiness
    wait_telemetry_event(TelemetryTable, success, ResourceId),
    wait_until_gauge_is(queuing, 0, 500),
    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
            failed => 0,
            inflight => 0,
            matched => 1,
            queuing => 0,
            retried => 0,
            success => 1
        },
        ResourceId
    ),
    Record = wait_record(Config, ShardIt, 100, 10),
    ?assertEqual(<<"my_data">>, proplists:get_value(<<"Data">>, Record)),
    ok.
+
%% Publish 10 distinct messages and verify all 10 reach Kinesis.
%% Membership is checked rather than order, since batch delivery order
%% is not asserted here.
t_publish_multiple_msgs_success(Config) ->
    ResourceId = ?config(resource_id, Config),
    TelemetryTable = ?config(telemetry_table, Config),
    ?assertMatch({ok, _}, create_bridge(Config)),
    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
    assert_empty_metrics(ResourceId),
    ShardIt = get_shard_iterator(Config),
    lists:foreach(
        fun(I) ->
            Payload = "payload_" ++ to_str(I),
            Message = emqx_message:make(?TOPIC, Payload),
            emqx:publish(Message)
        end,
        lists:seq(1, 10)
    ),
    Records = wait_records(Config, ShardIt, 10, 100, 10),
    ReceivedPayloads =
        lists:map(fun(Record) -> proplists:get_value(<<"Data">>, Record) end, Records),
    lists:foreach(
        fun(I) ->
            ExpectedPayload = to_bin("payload_" ++ to_str(I)),
            %% Pair the payload with the membership result so a failure
            %% message names the missing payload.
            ?assertEqual(
                {ExpectedPayload, true},
                {ExpectedPayload, lists:member(ExpectedPayload, ReceivedPayloads)}
            )
        end,
        lists:seq(1, 10)
    ),
    %% to avoid test flakiness
    wait_telemetry_event(TelemetryTable, success, ResourceId),
    wait_until_gauge_is(queuing, 0, 500),
    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
            failed => 0,
            inflight => 0,
            matched => 10,
            queuing => 0,
            retried => 0,
            success => 10
        },
        ResourceId
    ),
    ok.
+
%% Delete the stream after the bridge is up, then publish: the publish
%% must be counted as failed (not dropped/retried) and the resource must
%% transition to disconnected with an unhealthy_target error.
t_publish_unhealthy(Config) ->
    ResourceId = ?config(resource_id, Config),
    TelemetryTable = ?config(telemetry_table, Config),
    ?assertMatch({ok, _}, create_bridge(Config)),
    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
    assert_empty_metrics(ResourceId),
    ShardIt = get_shard_iterator(Config),
    Payload = <<"payload">>,
    Message = emqx_message:make(?TOPIC, Payload),
    delete_stream(Config),
    emqx:publish(Message),
    %% Reading from the deleted stream raises the badmatch inside
    %% wait_records/5 with ResourceNotFoundException.
    ?assertError(
        {badmatch, {error, {<<"ResourceNotFoundException">>, _}}},
        wait_record(Config, ShardIt, 100, 10)
    ),
    %% to avoid test flakiness
    wait_telemetry_event(TelemetryTable, failed, ResourceId),
    wait_until_gauge_is(queuing, 0, 500),
    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
            failed => 1,
            inflight => 0,
            matched => 1,
            queuing => 0,
            retried => 0,
            success => 0
        },
        ResourceId
    ),
    ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
    ?assertMatch(
        {ok, _, #{error := {unhealthy_target, _}}},
        emqx_resource_manager:lookup_cached(ResourceId)
    ),
    ok.
+
%% A payload one byte over Kinesis' 1 MB record limit must be rejected
%% and counted as failed, with nothing queued or in flight afterwards.
t_publish_big_msg(Config) ->
    ResourceId = ?config(resource_id, Config),
    TelemetryTable = ?config(telemetry_table, Config),
    ?assertMatch({ok, _}, create_bridge(Config)),
    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
    assert_empty_metrics(ResourceId),
    % Maximum size is 1MB. Using 1MB + 1 here.
    Payload = binary:copy(<<"a">>, 1 * 1024 * 1024 + 1),
    Message = emqx_message:make(?TOPIC, Payload),
    emqx:publish(Message),
    %% to avoid test flakiness
    wait_telemetry_event(TelemetryTable, failed, ResourceId),
    wait_until_gauge_is(queuing, 0, 500),
    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
            failed => 1,
            inflight => 0,
            matched => 1,
            queuing => 0,
            retried => 0,
            success => 0
        },
        ResourceId
    ),
    ok.
+
%% Publish while the toxiproxy link is down: the query must fail, be
%% retried after the proxy heals, and ultimately succeed (retried_success
%% = 1), with the record visible in Kinesis. max_retries is lowered to 2
%% so the retry path is exercised quickly.
t_publish_connection_down(Config0) ->
    Config = generate_config([{max_retries, 2} | Config0]),
    ProxyPort = ?config(proxy_port, Config),
    ProxyHost = ?config(proxy_host, Config),
    ProxyName = ?config(proxy_name, Config),
    ResourceId = ?config(resource_id, Config),
    TelemetryTable = ?config(telemetry_table, Config),
    ?assertMatch({ok, _}, create_bridge(Config)),
    {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
    ?retry(
        _Sleep1 = 1_000,
        _Attempts1 = 30,
        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
    ),
    emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
    assert_empty_metrics(ResourceId),
    ShardIt = get_shard_iterator(Config),
    Payload = <<"payload">>,
    Message = emqx_message:make(?TOPIC, Payload),
    %% The trace event to wait for depends on whether batching is on.
    Kind =
        case proplists:get_value(batch_size, Config) of
            1 -> emqx_bridge_kinesis_impl_producer_sync_query;
            _ -> emqx_bridge_kinesis_impl_producer_sync_batch_query
        end,
    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
        ct:sleep(1000),
        ?wait_async_action(
            emqx:publish(Message),
            #{?snk_kind := Kind},
            5_000
        ),
        ct:sleep(1000)
    end),
    % Wait for reconnection.
    ?retry(
        _Sleep3 = 1_000,
        _Attempts3 = 20,
        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
    ),
    Record = wait_record(Config, ShardIt, 2000, 10),
    %% to avoid test flakiness
    wait_telemetry_event(TelemetryTable, retried_success, ResourceId),
    wait_until_gauge_is(queuing, 0, 500),
    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
            failed => 0,
            inflight => 0,
            matched => 1,
            queuing => 0,
            retried => 1,
            success => 1,
            retried_success => 1
        },
        ResourceId
    ),
    Data = proplists:get_value(<<"Data">>, Record),
    ?assertEqual(Payload, Data),
    ok.
+
%% Point the bridge at an unreachable endpoint with retries disabled:
%% startup itself still succeeds (start_ok event), but the health check
%% then times out. health_check_interval is raised to 60s so the resource
%% manager does not interfere during the assertion window.
t_wrong_server(Config) ->
    Name = ?config(kinesis_name, Config),
    ResourceId = ?config(resource_id, Config),
    Overrides =
        #{
            <<"max_retries">> => 0,
            <<"endpoint">> => <<"https://wrong_server:12345">>,
            <<"resource_opts">> => #{
                <<"health_check_interval">> => <<"60s">>
            }
        },
    ?wait_async_action(
        create_bridge(Config, Overrides),
        #{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok},
        30_000
    ),
    ?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)),
    emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
    emqx_bridge_resource:remove(?BRIDGE_TYPE, Name),
    ok.

+ 13 - 0
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -16,6 +16,7 @@
 -module(emqx_ds).
 
 %% API:
+-export([ensure_shard/2]).
 %%   Messages:
 -export([message_store/2, message_store/1, message_stats/0]).
 %%   Iterator:
@@ -79,6 +80,18 @@
 %% API funcions
 %%================================================================================
 
%% @doc Idempotently start a storage shard: an already-running shard is
%% treated the same as a successful fresh start; any other supervisor
%% failure is surfaced to the caller.
-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
    ok | {error, _Reason}.
ensure_shard(Shard, Options) ->
    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok;
        {error, Reason} -> {error, Reason}
    end.
+
 %%--------------------------------------------------------------------------------
 %% Message
 %%--------------------------------------------------------------------------------

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -175,7 +175,7 @@
     cf :: rocksdb:cf_handle(),
     keymapper :: keymapper(),
     write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
-    read_options = [] :: emqx_ds_storage_layer:db_write_options()
+    read_options = [] :: emqx_ds_storage_layer:db_read_options()
 }).
 
 -record(it, {

+ 23 - 17
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -6,7 +6,7 @@
 -behaviour(gen_server).
 
 %% API:
--export([start_link/1]).
+-export([start_link/2]).
 -export([create_generation/3]).
 
 -export([store/5]).
@@ -18,7 +18,8 @@
 %% behaviour callbacks:
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
--export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]).
+-export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
+-export_type([db_options/0, db_write_options/0, db_read_options/0]).
 
 -compile({inline, [meta_lookup/2]}).
 
@@ -26,10 +27,16 @@
 %% Type declarations
 %%================================================================================
 
-%% see rocksdb:db_options()
-% -type options() :: proplists:proplist().
+-type options() :: #{
+    dir => file:filename()
+}.
 
+%% see rocksdb:db_options()
+-type db_options() :: proplists:proplist().
+%% see rocksdb:write_options()
 -type db_write_options() :: proplists:proplist().
+%% see rocksdb:read_options()
+-type db_read_options() :: proplists:proplist().
 
 -type cf_refs() :: [{string(), rocksdb:cf_handle()}].
 
@@ -110,18 +117,16 @@
 %% API funcions
 %%================================================================================
 
--spec start_link(emqx_ds:shard()) -> {ok, pid()}.
-start_link(Shard) ->
-    gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
+-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}.
+start_link(Shard, Options) ->
+    gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
 
 -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
     {ok, gen_id()} | {error, nonmonotonic}.
 create_generation(Shard, Since, Config = {_Module, _Options}) ->
     gen_server:call(?REF(Shard), {create_generation, Since, Config}).
 
--spec store(
-    emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()
-) ->
+-spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
     ok | {error, _}.
 store(Shard, GUID, Time, Topic, Msg) ->
     {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
@@ -181,9 +186,9 @@ discard_iterator(Shard, ReplayID) ->
 %% behaviour callbacks
 %%================================================================================
 
-init([Shard]) ->
+init({Shard, Options}) ->
     process_flag(trap_exit, true),
-    {ok, S0} = open_db(Shard),
+    {ok, S0} = open_db(Shard, Options),
     S = ensure_current_generation(S0),
     ok = populate_metadata(S),
     {ok, S}.
@@ -265,16 +270,17 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
     },
     {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
 
--spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}.
-open_db(Shard) ->
-    Filename = binary_to_list(Shard),
+-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
+open_db(Shard, Options) ->
+    DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)),
     DBOptions = [
         {create_if_missing, true},
         {create_missing_column_families, true}
         | emqx_ds_conf:db_options()
     ],
+    _ = filelib:ensure_dir(DBDir),
     ExistingCFs =
-        case rocksdb:list_column_families(Filename, DBOptions) of
+        case rocksdb:list_column_families(DBDir, DBOptions) of
             {ok, CFs} ->
                 [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
             % DB is not present. First start
@@ -286,7 +292,7 @@ open_db(Shard) ->
         {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
         | ExistingCFs
     ],
-    case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
+    case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
         {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
             {CFNames, _} = lists:unzip(ExistingCFs),
             {ok, #s{

+ 9 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -6,7 +6,7 @@
 -behaviour(supervisor).
 
 %% API:
--export([start_link/0, start_shard/1, stop_shard/1]).
+-export([start_link/0, start_shard/2, stop_shard/1]).
 
 %% behaviour callbacks:
 -export([init/1]).
@@ -25,9 +25,10 @@
 start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
 
--spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret().
-start_shard(Shard) ->
-    supervisor:start_child(?SUP, shard_child_spec(Shard)).
+-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+    supervisor:startchild_ret().
+start_shard(Shard, Options) ->
+    supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
 
 -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
 stop_shard(Shard) ->
@@ -51,11 +52,12 @@ init([]) ->
 %% Internal functions
 %%================================================================================
 
--spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec().
-shard_child_spec(Shard) ->
+-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+    supervisor:child_spec().
+shard_child_spec(Shard, Options) ->
     #{
         id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [Shard]},
+        start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
         shutdown => 5_000,
         restart => permanent,
         type => worker

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria]},

+ 2 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -33,7 +33,7 @@
 %% Smoke test for opening and reopening the database
 t_open(_Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD).
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
 
 %% Smoke test of store function
 t_store(_Config) ->
@@ -263,7 +263,7 @@ end_per_suite(_Config) ->
 
 init_per_testcase(TC, Config) ->
     ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
     Config.
 
 end_per_testcase(TC, _Config) ->

+ 2 - 1
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -146,7 +146,8 @@ basic_reboot_apps() ->
             emqx_slow_subs,
             emqx_auto_subscribe,
             emqx_plugins,
-            emqx_psk
+            emqx_psk,
+            emqx_durable_storage
         ] ++ basic_reboot_apps_edition(emqx_release:edition()).
 
 basic_reboot_apps_edition(ce) ->

+ 2 - 1
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -57,7 +57,8 @@
     <<"flapping_detect">>,
     <<"broker">>,
     <<"force_gc">>,
-    <<"zones">>
+    <<"zones">>,
+    <<"slow_subs">>
 ]).
 
 -define(DEFAULT_OPTS, #{}).

+ 1 - 1
apps/emqx_modules/src/emqx_modules.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_modules, [
     {description, "EMQX Modules"},
-    {vsn, "5.0.17"},
+    {vsn, "5.0.18"},
     {modules, []},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},
     {mod, {emqx_modules_app, []}},

+ 39 - 2
apps/emqx_modules/src/emqx_modules_conf.erl

@@ -18,6 +18,7 @@
 -module(emqx_modules_conf).
 
 -behaviour(emqx_config_handler).
+-behaviour(emqx_config_backup).
 
 %% Load/Unload
 -export([
@@ -37,6 +38,11 @@
     post_config_update/5
 ]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 %%--------------------------------------------------------------------
 %% Load/Unload
 %%--------------------------------------------------------------------
@@ -78,6 +84,20 @@ remove_topic_metrics(Topic) ->
         {error, Reason} -> {error, Reason}
     end.
 
+%%--------------------------------------------------------------------
+%% Data backup (Topic-Metrics)
+%%--------------------------------------------------------------------
+
+import_config(#{<<"topic_metrics">> := Topics}) ->
+    case emqx_conf:update([topic_metrics], {merge_topics, Topics}, #{override_to => cluster}) of
+        {ok, _} ->
+            {ok, #{root_key => topic_metrics, changed => []}};
+        Error ->
+            {error, #{root_key => topic_metrics, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => topic_metrics, changed => []}}.
+
 %%--------------------------------------------------------------------
 %%  Config Handler
 %%--------------------------------------------------------------------
@@ -103,7 +123,13 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
             {ok, RawConf -- [Topic]};
         _ ->
             {error, not_found}
-    end.
+    end;
+pre_config_update(_, {merge_topics, NewConf}, OldConf) ->
+    KeyFun = fun(#{<<"topic">> := T}) -> T end,
+    MergedConf = emqx_utils:merge_lists(OldConf, NewConf, KeyFun),
+    {ok, MergedConf};
+pre_config_update(_, NewConf, _OldConf) ->
+    {ok, NewConf}.
 
 -spec post_config_update(
     list(atom()),
@@ -113,7 +139,6 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
     emqx_config:app_envs()
 ) ->
     ok | {ok, Result :: any()} | {error, Reason :: term()}.
-
 post_config_update(
     _,
     {add_topic_metrics, Topic},
@@ -135,6 +160,18 @@ post_config_update(
     case emqx_topic_metrics:deregister(Topic) of
         ok -> ok;
         {error, Reason} -> {error, Reason}
+    end;
+post_config_update(_, _UpdateReq, NewConfig, OldConfig, _AppEnvs) ->
+    #{
+        removed := Removed,
+        added := Added
+    } = emqx_utils:diff_lists(NewConfig, OldConfig, fun(#{topic := T}) -> T end),
+    Deregistered = [emqx_topic_metrics:deregister(T) || #{topic := T} <- Removed],
+    Registered = [emqx_topic_metrics:register(T) || #{topic := T} <- Added],
+    Errs = [Res || Res <- Registered ++ Deregistered, Res =/= ok],
+    case Errs of
+        [] -> ok;
+        _ -> {error, Errs}
     end.
 
 %%--------------------------------------------------------------------

+ 38 - 4
apps/emqx_modules/test/emqx_modules_conf_SUITE.erl

@@ -39,12 +39,46 @@ end_per_suite(_Conf) ->
 init_per_testcase(_CaseName, Conf) ->
     Conf.
 
+end_per_testcase(_CaseName, _Conf) ->
+    [emqx_modules_conf:remove_topic_metrics(T) || T <- emqx_modules_conf:topic_metrics()],
+    ok.
+
 %%--------------------------------------------------------------------
 %% Cases
 %%--------------------------------------------------------------------
 
-t_topic_metrics_list(_) ->
-    ok.
-
 t_topic_metrics_add_remove(_) ->
-    ok.
+    ?assertEqual([], emqx_modules_conf:topic_metrics()),
+    ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic">>)),
+    ?assertEqual([<<"test-topic">>], emqx_modules_conf:topic_metrics()),
+    ?assertEqual(ok, emqx_modules_conf:remove_topic_metrics(<<"test-topic">>)),
+    ?assertEqual([], emqx_modules_conf:topic_metrics()),
+    ?assertMatch({error, _}, emqx_modules_conf:remove_topic_metrics(<<"test-topic">>)).
+
+t_topic_metrics_merge_update(_) ->
+    ?assertEqual([], emqx_modules_conf:topic_metrics()),
+    ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-import1">>)),
+    ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-import2">>)),
+    ImportConf = #{
+        <<"topic_metrics">> =>
+            [
+                #{<<"topic">> => <<"imported_topic1">>},
+                #{<<"topic">> => <<"imported_topic2">>}
+            ]
+    },
+    ?assertMatch({ok, _}, emqx_modules_conf:import_config(ImportConf)),
+    ExpTopics = [
+        <<"test-topic-before-import1">>,
+        <<"test-topic-before-import2">>,
+        <<"imported_topic1">>,
+        <<"imported_topic2">>
+    ],
+    ?assertEqual(ExpTopics, emqx_modules_conf:topic_metrics()).
+
+t_topic_metrics_update(_) ->
+    ?assertEqual([], emqx_modules_conf:topic_metrics()),
+    ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-update1">>)),
+    ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-update2">>)),
+    UpdConf = [#{<<"topic">> => <<"new_topic1">>}, #{<<"topic">> => <<"new_topic2">>}],
+    ?assertMatch({ok, _}, emqx_conf:update([topic_metrics], UpdConf, #{override_to => cluster})),
+    ?assertEqual([<<"new_topic1">>, <<"new_topic2">>], emqx_modules_conf:topic_metrics()).

+ 0 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -642,7 +642,6 @@ status_to_error(_) ->
     {error, undefined}.
 
 %% Compatibility
-external_error({error, {unhealthy_target, Message}}) -> Message;
 external_error({error, Reason}) -> Reason;
 external_error(Other) -> Other.
 

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -18,7 +18,7 @@
 
 -behaviour(gen_server).
 -behaviour(emqx_config_handler).
--behaiour(emqx_config_backup).
+-behaviour(emqx_config_backup).
 
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").

+ 1 - 1
apps/emqx_s3/rebar.config

@@ -1,6 +1,6 @@
 {deps, [
     {emqx, {path, "../../apps/emqx"}},
-    {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}},
+    {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}},
     {emqx_bridge_http, {path, "../emqx_bridge_http"}}
 ]}.
 

+ 3 - 3
bin/emqx

@@ -811,8 +811,8 @@ is_down() {
     PID="$1"
     if ps -p "$PID" >/dev/null; then
         # still around
-        # shellcheck disable=SC2009 # this grep pattern is not a part of the progra names
-        if ps -p "$PID" | $GREP -q 'defunct'; then
+        # shellcheck disable=SC2009 # this grep pattern is not a part of the program names
+        if ps -efp "$PID" | $GREP -q 'defunct'; then
             # zombie state, print parent pid
             parent="$(ps -o ppid= -p "$PID" | tr -d ' ')"
             logwarn "$PID is marked <defunct>, parent: $(ps -p "$parent")"
@@ -974,7 +974,7 @@ maybe_warn_default_cookie() {
 ## using Mnesia DB backend.
 if [[ "$IS_BOOT_COMMAND" == 'yes' && "$(get_boot_config 'node.db_backend')" == "rlog" ]]; then
     if ! (echo -e "$COMPATIBILITY_INFO" | $GREP -q 'MNESIA_OK'); then
-      logerr "DB Backend is RLOG, but an incompatible OTP version has been detected. Falling back to using Mnesia DB backend."
+      logwarn "DB Backend is RLOG, but an incompatible OTP version has been detected. Falling back to using Mnesia DB backend."
       export EMQX_NODE__DB_BACKEND=mnesia
       export EMQX_NODE__DB_ROLE=core
     fi

+ 1 - 0
changes/ce/feat-11124.en.md

@@ -0,0 +1 @@
+Release packages for Amazon Linux 2023

+ 1 - 0
changes/ce/feat-11289.en.md

@@ -0,0 +1 @@
+Release packages for Debian 12.

+ 1 - 0
changes/ce/feat-11290.en.md

@@ -0,0 +1 @@
+Updated `jq` dependency to version 0.3.10 which includes `oniguruma` library update to version 6.9.8 with few minor security fixes.

+ 1 - 0
changes/ce/feat-11291.en.md

@@ -0,0 +1 @@
+Updated RocksDB version to 1.8.0-emqx-1 via ekka update to 0.15.6.

+ 4 - 0
changes/ce/fix-11296.en.md

@@ -0,0 +1,4 @@
+Import additional configurations from EMQX backup file (`emqx ctl import` command):
+ - rule_engine (previously not imported due to the bug)
+ - topic_metrics (previously not implemented)
+ - slow_subs (previously not implemented).

+ 1 - 0
changes/ee/feat-11261.en.md

@@ -0,0 +1 @@
+Implemented Amazon Kinesis Data Streams producer data integration bridge.

+ 1 - 1
deploy/docker/Dockerfile

@@ -1,4 +1,4 @@
-ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11
+ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-debian11
 ARG RUN_FROM=debian:11-slim
 FROM ${BUILD_FROM} AS builder
 

+ 3 - 3
dev

@@ -373,15 +373,15 @@ boot() {
           {:ok, _} = Application.ensure_all_started(:emqx_machine)
         '
         if [ -n "${EPMD_ARGS:-}" ]; then
-            EPMD_ARGS_ELIXIR="--erl $EPMD_ARGS"
+            EPMD_ARGS_ELIXIR="$EPMD_ARGS"
         else
-            EPMD_ARGS_ELIXIR=""
+            EPMD_ARGS_ELIXIR="-no_op true"
         fi
 
         # shellcheck disable=SC2086
         env APPS="$APPS" iex \
           --name "$EMQX_NODE_NAME" \
-          $EPMD_ARGS_ELIXIR \
+          --erl "$EPMD_ARGS_ELIXIR" \
           --erl '-user Elixir.IEx.CLI' \
           --erl '-proto_dist ekka' \
           --vm-args "$ARGS_FILE" \

+ 27 - 6
mix.exs

@@ -54,8 +54,8 @@ defmodule EMQXUmbrella.MixProject do
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
-      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
-      {:ekka, github: "emqx/ekka", tag: "0.15.5", override: true},
+      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true},
+      {:ekka, github: "emqx/ekka", tag: "0.15.6", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true},
@@ -191,7 +191,8 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_ft,
       :emqx_s3,
       :emqx_schema_registry,
-      :emqx_enterprise
+      :emqx_enterprise,
+      :emqx_bridge_kinesis
     ])
   end
 
@@ -423,7 +424,8 @@ defmodule EMQXUmbrella.MixProject do
           emqx_schema_registry: :permanent,
           emqx_eviction_agent: :permanent,
           emqx_node_rebalance: :permanent,
-          emqx_ft: :permanent
+          emqx_ft: :permanent,
+          emqx_bridge_kinesis: :permanent
         ],
         else: [
           emqx_telemetry: :permanent
@@ -446,13 +448,32 @@ defmodule EMQXUmbrella.MixProject do
 
   def check_profile!() do
     valid_envs = [
-      :dev,
       :emqx,
       :"emqx-pkg",
       :"emqx-enterprise",
       :"emqx-enterprise-pkg"
     ]
 
+    if Mix.env() == :dev do
+      env_profile = System.get_env("PROFILE")
+
+      if env_profile do
+        # copy from PROFILE env var
+        System.get_env("PROFILE")
+        |> String.to_atom()
+        |> Mix.env()
+      else
+        IO.puts(
+          IO.ANSI.format([
+            :yellow,
+            "Warning: env var PROFILE is unset; defaulting to emqx"
+          ])
+        )
+
+        Mix.env(:emqx)
+      end
+    end
+
     if Mix.env() not in valid_envs do
       formatted_envs =
         valid_envs
@@ -823,7 +844,7 @@ defmodule EMQXUmbrella.MixProject do
 
   defp jq_dep() do
     if enable_jq?(),
-      do: [{:jq, github: "emqx/jq", tag: "v0.3.9", override: true}],
+      do: [{:jq, github: "emqx/jq", tag: "v0.3.10", override: true}],
       else: []
   end
 

+ 2 - 2
rebar.config

@@ -61,8 +61,8 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
-    , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}}
+    , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.6"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}}

+ 4 - 2
rebar.config.erl

@@ -42,7 +42,7 @@ quicer() ->
     {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.114"}}}.
 
 jq() ->
-    {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.9"}}}.
+    {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.10"}}}.
 
 deps(Config) ->
     {deps, OldDeps} = lists:keyfind(deps, 1, Config),
@@ -104,6 +104,7 @@ is_community_umbrella_app("apps/emqx_ft") -> false;
 is_community_umbrella_app("apps/emqx_s3") -> false;
 is_community_umbrella_app("apps/emqx_schema_registry") -> false;
 is_community_umbrella_app("apps/emqx_enterprise") -> false;
+is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;
 is_community_umbrella_app(_) -> true.
 
 is_jq_supported() ->
@@ -491,7 +492,8 @@ relx_apps_per_edition(ee) ->
         emqx_schema_registry,
         emqx_eviction_agent,
         emqx_node_rebalance,
-        emqx_ft
+        emqx_ft,
+        emqx_bridge_kinesis
     ];
 relx_apps_per_edition(ce) ->
     [emqx_telemetry].

+ 85 - 0
rel/i18n/emqx_bridge_kinesis.hocon

@@ -0,0 +1,85 @@
+emqx_bridge_kinesis {
+
+config_enable.desc:
+"""Enable or disable this bridge"""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+desc_config.desc:
+"""Configuration for an Amazon Kinesis bridge."""
+
+desc_config.label:
+"""Amazon Kinesis Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name."""
+
+desc_name.label:
+"""Bridge Name"""
+
+desc_type.desc:
+"""The Bridge Type"""
+
+desc_type.label:
+"""Bridge Type"""
+
+pool_size.desc:
+"""The pool size."""
+
+pool_size.label:
+"""Pool Size"""
+
+local_topic.desc:
+"""The MQTT topic filter to be forwarded to Amazon Kinesis. All MQTT `PUBLISH` messages with the topic
+matching the `local_topic` will be forwarded.<br/>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also `local_topic` is
+configured, then both the data got from the rule and the MQTT messages that match `local_topic`
+will be forwarded."""
+
+local_topic.label:
+"""Local Topic"""
+
+payload_template.desc:
+"""The template for formatting the outgoing messages.  If undefined, will send all the available context in JSON format."""
+
+payload_template.label:
+"""Payload template"""
+
+aws_access_key_id.desc:
+"""Access Key ID for connecting to Amazon Kinesis."""
+
+aws_access_key_id.label:
+"""AWS Access Key ID"""
+
+aws_secret_access_key.desc:
+"""AWS Secret Access Key for connecting to Amazon Kinesis."""
+
+aws_secret_access_key.label:
+"""AWS Secret Access Key"""
+
+endpoint.desc:
+"""The url of Amazon Kinesis endpoint."""
+
+endpoint.label:
+"""Amazon Kinesis Endpoint"""
+
+stream_name.desc:
+"""The Amazon Kinesis Stream to publish messages to."""
+
+stream_name.label:
+"""Amazon Kinesis Stream"""
+
+partition_key.desc:
+"""The Amazon Kinesis Partition Key associated to published message. Placeholders in format of ${var} are supported."""
+
+partition_key.label:
+"""Partition key"""
+
+max_retries.desc:
+"""Max retry times if an error occurs when sending a request."""
+
+max_retries.label:
+"""Max Retries"""
+
+}

+ 2 - 2
scripts/buildx.sh

@@ -9,7 +9,7 @@
 
 ## example:
 ## ./scripts/buildx.sh --profile emqx --pkgtype tgz --arch arm64 \
-##     --builder ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11
+##     --builder ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-debian11
 
 set -euo pipefail
 
@@ -24,7 +24,7 @@ help() {
     echo "--arch amd64|arm64:        Target arch to build the EMQX package for"
     echo "--src_dir <SRC_DIR>:       EMQX source code in this dir, default to PWD"
     echo "--builder <BUILDER>:       Builder image to pull"
-    echo "                           E.g. ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11"
+    echo "                           E.g. ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-debian11"
 }
 
 die() {

+ 78 - 0
scripts/check_missing_reboot_apps.exs

@@ -0,0 +1,78 @@
+#!/usr/bin/env elixir
+
+alias EMQXUmbrella.MixProject
+
+{:ok, _} = Application.ensure_all_started(:mix)
+# note: run from the project root
+File.cwd!()
+|> Path.join("mix.exs")
+|> Code.compile_file()
+
+inputs = MixProject.check_profile!()
+profile = Mix.env()
+
+# need to use this information because we might have compiled all
+# applications in the test profile, and thus filter what's in the
+# release lib directory.
+rel_apps = MixProject.applications(inputs.edition_type)
+
+apps =
+  rel_apps
+  |> Keyword.keys()
+  |> Enum.filter(&(to_string(&1) =~ "emqx"))
+  |> Enum.reject(&(&1 in [:emqx_mix]))
+
+:xref.start(:xref)
+:xref.set_default(:xref, warnings: false)
+rel_dir = '_build/#{profile}/lib/'
+:xref.add_release(:xref, rel_dir)
+
+{:ok, calls} = :xref.q(:xref, '(App) (XC | [#{Enum.join(apps, ",")}] || mria:create_table/_)')
+
+emqx_calls =
+  calls
+  |> Enum.map(&elem(&1, 0))
+  |> Enum.filter(&(to_string(&1) =~ "emqx_"))
+  |> MapSet.new()
+
+Path.wildcard(rel_dir ++ "*/ebin")
+|> Enum.each(fn dir ->
+  dir
+  |> to_charlist()
+  |> :code.add_pathz()
+end)
+
+Path.wildcard(rel_dir ++ "*")
+|> Enum.map(fn dir ->
+  dir
+  |> Path.basename()
+  |> String.to_atom()
+  |> Application.load()
+end)
+
+reboot_apps = :emqx_machine_boot.sorted_reboot_apps() |> MapSet.new()
+
+missing_reboot_apps = MapSet.difference(emqx_calls, reboot_apps)
+
+if MapSet.size(missing_reboot_apps) != 0 do
+  IO.puts(
+    :stderr,
+    IO.ANSI.format([
+      :red,
+      "Some applications are missing from `emqx_machine_boot:sorted_reboot_apps/0`!\n",
+      "Missing applications:\n",
+      Enum.map(missing_reboot_apps, fn app ->
+        "  * #{app}\n"
+      end),
+      "\n",
+      :green,
+      "Hint: maybe add them to `emqx_machine_boot:basic_reboot_apps_edition/1`\n",
+      "\n",
+      :yellow,
+      "Applications that call `mria:create_table` need to be added to that list;\n",
+      " otherwise, when a node joins a cluster, it might lose tables.\n"
+    ])
+  )
+
+  System.halt(1)
+end

+ 3 - 0
scripts/ct/run.sh

@@ -219,6 +219,9 @@ for dep in ${CT_DEPS}; do
         hstreamdb)
             FILES+=( '.ci/docker-compose-file/docker-compose-hstreamdb.yaml' )
             ;;
+        kinesis)
+            FILES+=( '.ci/docker-compose-file/docker-compose-kinesis.yaml' )
+            ;;
         *)
             echo "unknown_ct_dependency $dep"
             exit 1

+ 1 - 1
scripts/macos-sign-binaries.sh

@@ -57,7 +57,7 @@ codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=r
 codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime \
          "${REL_DIR}"/lib/os_mon-*/priv/bin/{cpu_sup,memsup}
 codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime \
-         "${REL_DIR}"/lib/jq-*/priv/{jq_nif1.so,libjq.1.dylib,libonig.4.dylib,erlang_jq_port}
+         "${REL_DIR}"/lib/jq-*/priv/{jq_nif1.so,libjq.1.dylib,libonig.5.dylib,erlang_jq_port}
 # other files from runtime and dependencies
 for f in \
         asn1rt_nif.so \

+ 41 - 38
scripts/pkg-tests.sh

@@ -46,6 +46,7 @@ export SCRIPTS="${CODE_PATH}/scripts"
 export EMQX_NAME
 export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}"
 export RELUP_PACKAGE_PATH="${CODE_PATH}/_upgrade_base"
+export PAHO_MQTT_TESTING_PATH="${PAHO_MQTT_TESTING_PATH:-/paho-mqtt-testing}"
 
 SYSTEM="$("$SCRIPTS"/get-distro.sh)"
 
@@ -64,7 +65,7 @@ fi
 PACKAGE_VERSION="$("$CODE_PATH"/pkg-vsn.sh "${EMQX_NAME}")"
 PACKAGE_VERSION_LONG="$("$CODE_PATH"/pkg-vsn.sh "${EMQX_NAME}" --long --elixir "${IS_ELIXIR}")"
 PACKAGE_NAME="${EMQX_NAME}-${PACKAGE_VERSION_LONG}"
-PACKAGE_FILE_NAME="${PACKAGE_NAME}.${PKG_SUFFIX}"
+PACKAGE_FILE_NAME="${PACKAGE_FILE_NAME:-${PACKAGE_NAME}.${PKG_SUFFIX}}"
 
 PACKAGE_FILE="${PACKAGE_PATH}/${PACKAGE_FILE_NAME}"
 if ! [ -f "$PACKAGE_FILE" ]; then
@@ -75,9 +76,21 @@ fi
 emqx_prepare(){
     mkdir -p "${PACKAGE_PATH}"
 
-    if [ ! -d "/paho-mqtt-testing" ]; then
-        git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git /paho-mqtt-testing
+    if [ ! -d "${PAHO_MQTT_TESTING_PATH}" ]; then
+        git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git "${PAHO_MQTT_TESTING_PATH}"
     fi
+    # Debian 12 complains if we don't use venv
+    case "${SYSTEM:-}" in
+        debian12)
+            apt-get update -y && apt-get install -y virtualenv
+            virtualenv venv
+            # https://www.shellcheck.net/wiki/SC1091
+            # shellcheck source=/dev/null
+            source ./venv/bin/activate
+            ;;
+        *)
+            ;;
+    esac
     pip3 install pytest
 }
 
@@ -97,36 +110,22 @@ emqx_test(){
             # fi
             # sed -i '/emqx_telemetry/d' "${PACKAGE_PATH}"/emqx/data/loaded_plugins
 
-            echo "running ${packagename} start"
-            if ! "${PACKAGE_PATH}"/emqx/bin/emqx start; then
-                cat "${PACKAGE_PATH}"/emqx/log/erlang.log.1 || true
-                cat "${PACKAGE_PATH}"/emqx/log/emqx.log.1 || true
-                exit 1
-            fi
-            "$SCRIPTS/test/emqx-smoke-test.sh" 127.0.0.1 18083
-            pytest -v /paho-mqtt-testing/interoperability/test_client/V5/test_connect.py::test_basic
-            if ! "${PACKAGE_PATH}"/emqx/bin/emqx stop; then
-                cat "${PACKAGE_PATH}"/emqx/log/erlang.log.1 || true
-                cat "${PACKAGE_PATH}"/emqx/log/emqx.log.1 || true
-                exit 1
-            fi
-            echo "running ${packagename} stop"
+            run_test "${PACKAGE_PATH}/emqx/bin" "${PACKAGE_PATH}/emqx/log" "${PACKAGE_PATH}/emqx/releases/emqx_vars"
+
             rm -rf "${PACKAGE_PATH}"/emqx
         ;;
         "deb")
             dpkg -i "${PACKAGE_PATH}/${packagename}"
-            if [ "$(dpkg -l |grep emqx |awk '{print $1}')" != "ii" ]
+            if [ "$(dpkg -l | grep ${EMQX_NAME} | awk '{print $1}')" != "ii" ]
             then
                 echo "package install error"
                 exit 1
             fi
 
-            echo "running ${packagename} start"
-            run_test
-            echo "running ${packagename} stop"
+            run_test "/usr/bin" "/var/log/emqx" "$(dpkg -L ${EMQX_NAME} | grep emqx_vars)"
 
             dpkg -r "${EMQX_NAME}"
-            if [ "$(dpkg -l |grep emqx |awk '{print $1}')" != "rc" ]
+            if [ "$(dpkg -l | grep ${EMQX_NAME} | awk '{print $1}')" != "rc" ]
             then
                 echo "package remove error"
                 exit 1
@@ -146,6 +145,10 @@ emqx_test(){
                     # el8 is fine with python3
                     true
                     ;;
+                "el9")
+                    # el9 is fine with python3
+                    true
+                    ;;
                 *)
                     alternatives --list | grep python && alternatives --set python /usr/bin/python2
                     ;;
@@ -161,12 +164,10 @@ emqx_test(){
                 exit 1
             fi
 
-            echo "running ${packagename} start"
-            run_test
-            echo "running ${packagename} stop"
+            run_test "/usr/bin" "/var/log/emqx" "$(rpm -ql ${EMQX_NAME} | grep emqx_vars)"
 
             rpm -e "${EMQX_NAME}"
-            if [ "$(rpm -q emqx)" != "package emqx is not installed" ];then
+            if [ "$(rpm -q ${EMQX_NAME})" != "package ${EMQX_NAME} is not installed" ];then
                 echo "package uninstall error"
                 exit 1
             fi
@@ -175,8 +176,10 @@ emqx_test(){
 }
 
 run_test(){
+    local bin_dir="$1"
+    local log_dir="$2"
+    local emqx_env_vars="$3"
     # sed -i '/emqx_telemetry/d' /var/lib/emqx/loaded_plugins
-    emqx_env_vars=$(dirname "$(readlink "$(command -v emqx)")")/../releases/emqx_vars
 
     if [ -f "$emqx_env_vars" ];
     then
@@ -194,21 +197,21 @@ EOF
         echo "Error: cannot locate emqx_vars"
         exit 1
     fi
-    if ! emqx 'start'; then
-        cat /var/log/emqx/erlang.log.1 || true
-        cat /var/log/emqx/emqx.log.1 || true
+    echo "running ${packagename} start"
+    if ! "${bin_dir}/emqx" 'start'; then
+        echo "ERROR: failed_to_start_emqx"
+        cat "${log_dir}/erlang.log.1" || true
+        cat "${log_dir}/emqx.log.1" || true
         exit 1
     fi
     "$SCRIPTS/test/emqx-smoke-test.sh" 127.0.0.1 18083
-    pytest -v /paho-mqtt-testing/interoperability/test_client/V5/test_connect.py::test_basic
-    # shellcheck disable=SC2009 # pgrep does not support Extended Regular Expressions
-    ps -ef | grep -E '\-progname\s.+emqx\s'
-    if ! emqx 'stop'; then
-        # shellcheck disable=SC2009 # pgrep does not support Extended Regular Expressions
-        ps -ef | grep -E '\-progname\s.+emqx\s'
+    pytest -v "${PAHO_MQTT_TESTING_PATH}"/interoperability/test_client/V5/test_connect.py::test_basic
+    "${bin_dir}/emqx" ping
+    echo "running ${packagename} stop"
+    if ! "${bin_dir}/emqx" 'stop'; then
         echo "ERROR: failed_to_stop_emqx_with_the_stop_command"
-        cat /var/log/emqx/erlang.log.1 || true
-        cat /var/log/emqx/emqx.log.1 || true
+        cat "${log_dir}/erlang.log.1" || true
+        cat "${log_dir}/emqx.log.1" || true
         exit 1
     fi
 }

+ 1 - 1
scripts/relup-test/start-relup-test-cluster.sh

@@ -22,7 +22,7 @@ WEBHOOK="webhook.$NET"
 BENCH="bench.$NET"
 COOKIE='this-is-a-secret'
 ## Erlang image is needed to run webhook server and emqtt-bench
-ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04"
+ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.1-3:1.14.5-25.3.2-1-ubuntu20.04"
 # builder has emqtt-bench installed
 BENCH_IMAGE="$ERLANG_IMAGE"