
Merge pull request #13997 from id/20241015-sync-release-58

sync release-58
Ivan Dyachkov, 1 year ago
parent
commit
e1b6b7208d
81 changed files with 1250 additions and 570 deletions
  1. .github/workflows/_push-entrypoint.yaml (+9 -0)
  2. .github/workflows/performance_test.yaml (+179 -296)
  3. Makefile (+2 -2)
  4. apps/emqx/include/emqx_release.hrl (+2 -2)
  5. apps/emqx/src/emqx.app.src (+1 -1)
  6. apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl (+4 -5)
  7. apps/emqx_audit/src/emqx_audit.app.src (+1 -1)
  8. apps/emqx_audit/src/emqx_audit.erl (+18 -18)
  9. apps/emqx_auth/src/emqx_auth.app.src (+1 -1)
  10. apps/emqx_bridge/src/emqx_bridge.app.src (+1 -1)
  11. apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl (+12 -1)
  12. apps/emqx_bridge_azure_event_hub/rebar.config (+1 -1)
  13. apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src (+1 -1)
  14. apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl (+1 -0)
  15. apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl (+27 -12)
  16. apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl (+20 -1)
  17. apps/emqx_bridge_confluent/rebar.config (+1 -1)
  18. apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src (+1 -1)
  19. apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl (+2 -1)
  20. apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl (+5 -4)
  21. apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl (+48 -38)
  22. apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl (+43 -3)
  23. apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl (+6 -2)
  24. apps/emqx_bridge_kafka/rebar.config (+1 -1)
  25. apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl (+6 -0)
  26. apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl (+2 -1)
  27. apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl (+3 -2)
  28. apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl (+27 -0)
  29. apps/emqx_bridge_snowflake/docs/dev-quick-ref.md (+140 -0)
  30. apps/emqx_bridge_snowflake/docs/user-guide.md (+21 -0)
  31. apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl (+117 -15)
  32. apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl (+44 -15)
  33. apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl (+4 -2)
  34. apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl (+29 -15)
  35. apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl (+27 -0)
  36. apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src (+1 -1)
  37. apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl (+3 -1)
  38. apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl (+18 -11)
  39. apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl (+56 -14)
  40. apps/emqx_cluster_link/src/emqx_cluster_link.app.src (+1 -1)
  41. apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl (+6 -1)
  42. apps/emqx_dashboard/src/emqx_dashboard.app.src (+1 -1)
  43. apps/emqx_dashboard/src/emqx_dashboard.erl (+1 -1)
  44. apps/emqx_dashboard/src/emqx_dashboard_api.erl (+1 -0)
  45. apps/emqx_dashboard/src/emqx_dashboard_audit.erl (+67 -32)
  46. apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src (+1 -1)
  47. apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl (+2 -0)
  48. apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl (+4 -1)
  49. apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl (+1 -1)
  50. apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml_api.erl (+3 -1)
  51. apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.app.src (+1 -1)
  52. apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft.app.src (+1 -1)
  53. apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.app.src (+1 -1)
  54. apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl (+31 -9)
  55. apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl (+19 -3)
  56. apps/emqx_durable_storage/src/emqx_durable_storage.app.src (+1 -1)
  57. apps/emqx_gateway/src/emqx_gateway.app.src (+1 -1)
  58. apps/emqx_machine/src/emqx_machine.app.src (+1 -1)
  59. apps/emqx_management/src/emqx_management.app.src (+1 -1)
  60. apps/emqx_management/src/emqx_mgmt_api_clients.erl (+10 -5)
  61. apps/emqx_management/src/emqx_mgmt_api_publish.erl (+4 -2)
  62. apps/emqx_mysql/src/emqx_mysql.erl (+17 -11)
  63. apps/emqx_resource/src/emqx_resource_buffer_worker.erl (+14 -8)
  64. apps/emqx_retainer/src/emqx_retainer.app.src (+1 -1)
  65. apps/emqx_rule_engine/src/emqx_rule_engine.app.src (+1 -1)
  66. apps/emqx_rule_engine/src/emqx_rule_funcs.erl (+2 -2)
  67. apps/emqx_utils/src/emqx_placeholder.erl (+31 -2)
  68. apps/emqx_utils/src/emqx_utils.app.src (+1 -1)
  69. changes/ee/feat-13861.en.md (+10 -0)
  70. changes/ee/fix-13881.md (+0 -0)
  71. changes/ee/fix-13963.en.md (+7 -0)
  72. changes/ee/fix-13964.md (+1 -0)
  73. changes/ee/fix-13965.en.md (+2 -0)
  74. changes/ee/fix-13971.en.md (+3 -0)
  75. changes/ee/fix-13973.en.md (+1 -0)
  76. changes/v5.8.1.en.md (+100 -0)
  77. deploy/charts/emqx-enterprise/Chart.yaml (+2 -2)
  78. deploy/charts/emqx/Chart.yaml (+2 -2)
  79. mix.exs (+2 -2)
  80. rebar.config (+1 -1)
  81. rel/i18n/emqx_bridge_v2_schema.hocon (+8 -0)

+ 9 - 0
.github/workflows/_push-entrypoint.yaml

@@ -105,6 +105,15 @@ jobs:
       builder_vsn: ${{ needs.init.outputs.BUILDER_VSN }}
     secrets: inherit
 
+  performance_test:
+    if: needs.prepare.outputs.release == 'true'
+    needs:
+      - init
+      - prepare
+      - build_packages
+    uses: ./.github/workflows/performance_test.yaml
+    secrets: inherit
+
   build_and_push_docker_images:
     if: needs.prepare.outputs.release == 'true'
     needs:

+ 179 - 296
.github/workflows/performance_test.yaml

@@ -1,68 +1,40 @@
-name: Performance Test Suite
+name: Performance Test
 
 on:
-  push:
-    branches:
-      - 'perf/**'
-  schedule:
-    - cron:  '0 1 * * MON-FRI'
+  workflow_call:
+    secrets:
+      AWS_ACCESS_KEY_PERF_TEST:
+        required: true
+      AWS_SECRET_ACCESS_KEY_PERF_TEST:
+        required: true
+      AWS_DEFAULT_REGION_PERF_TEST:
+        required: true
+      SLACK_BOT_TOKEN:
+        required: true
+      SLACK_PERFTEST_CHANNEL_ID:
+        required: true
+      EMQX_ENTERPRISE_LICENSE:
+        required: true
   workflow_dispatch:
     inputs:
-      ref:
+      emqx_version:
         required: false
-
-env:
-  TF_AWS_REGION: eu-west-1
-  TF_VAR_s3_bucket_name: tf-emqx-performance-test2
-  TF_VAR_test_duration: 1800
-  TF_VAR_prometheus_remote_write_region: eu-west-1
-  TF_VAR_prometheus_remote_write_url: ${{ secrets.TF_EMQX_PERF_TEST_PROMETHEUS_REMOTE_WRITE_URL }}
-  SLACK_WEBHOOK_URL: ${{ secrets.TF_EMQX_PERF_TEST_SLACK_URL }}
+        default: '5.8.0'
 
 permissions:
   contents: read
 
 jobs:
-  prepare:
+  perftest:
     runs-on: ubuntu-latest
-    if: github.repository_owner == 'emqx'
-    container: ghcr.io/emqx/emqx-builder/5.3-13:1.15.7-26.2.5-1-ubuntu20.04
-    outputs:
-      BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
-      PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}
-
-    steps:
-    - uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
-      with:
-        fetch-depth: 0
-        ref: ${{ github.event.inputs.ref }}
-    - name: Work around https://github.com/actions/checkout/issues/766
-      run: |
-        git config --global --add safe.directory "$GITHUB_WORKSPACE"
-    - id: prepare
-      run: |
-        echo "EMQX_NAME=emqx" >> $GITHUB_ENV
-        echo "CODE_PATH=$GITHUB_WORKSPACE" >> $GITHUB_ENV
-        echo "BENCH_ID=$(date --utc +%F)/emqx-$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT
-    - name: Build deb package
-      run: |
-        make ${EMQX_NAME}-pkg
-        ./scripts/pkg-tests.sh ${EMQX_NAME}-pkg
-    - name: Get package file name
-      id: package_file
-      run: |
-        echo "PACKAGE_FILE=$(find _packages/emqx -name 'emqx-*.deb' | head -n 1 | xargs basename)" >> $GITHUB_OUTPUT
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
-      with:
-        name: emqx-ubuntu20.04
-        path: _packages/emqx/${{ steps.package_file.outputs.PACKAGE_FILE }}
-
-  scenario_1on1:
-    runs-on: ubuntu-latest
-    needs:
-      - prepare
-    env:
-      TF_VAR_package_file: ${{ needs.prepare.outputs.PACKAGE_FILE }}
+    strategy:
+      max-parallel: 1
+      matrix:
+        scenario:
+          - tests/ci/pubsub-2x2c4g-10k-20k-tps
+    defaults:
+      run:
+        shell: bash
 
     steps:
     - name: Configure AWS Credentials
@@ -70,276 +42,187 @@ jobs:
       with:
         aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_PERF_TEST }}
         aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PERF_TEST }}
-        aws-region: eu-west-1
+        aws-region: ${{ secrets.AWS_DEFAULT_REGION_PERF_TEST }}
     - name: Checkout tf-emqx-performance-test
       uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
       with:
         repository: emqx/tf-emqx-performance-test
-        path: tf-emqx-performance-test
-        ref: v0.2.3
-    - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
-      with:
-        name: emqx-ubuntu20.04
-        path: tf-emqx-performance-test/
+        ref: v0.3.2
     - name: Setup Terraform
       uses: hashicorp/setup-terraform@b9cd54a3c349d3f38e8881555d616ced269862dd # v3.1.2
       with:
+        terraform_version: 1.6.4
         terraform_wrapper: false
-    - name: run scenario
-      working-directory: ./tf-emqx-performance-test
-      timeout-minutes: 60
-      env:
-        TF_VAR_bench_id: "${{ needs.prepare.outputs.BENCH_ID }}/1on1"
-        TF_VAR_use_emqttb: 1
-        TF_VAR_use_emqtt_bench: 0
-        TF_VAR_emqttb_instance_count: 1
-        TF_VAR_emqttb_instance_type: c5.2xlarge
-        TF_VAR_emqttb_scenario: '@pubsub_fwd -n 50_000 --pub-qos 1 --sub-qos 1'
-        TF_VAR_emqx_instance_type: c5.2xlarge
-        TF_VAR_emqx_instance_count: 3
-      run: |
-        terraform init
-        terraform apply -auto-approve
-        ./wait-emqttb.sh
-        ./fetch-metrics.sh
-        terraform destroy -auto-approve
-        aws s3 sync --exclude '*' --include '*.tar.gz' s3://$TF_VAR_s3_bucket_name/$TF_VAR_bench_id .
-    - name: Send notification to Slack
-      uses: slackapi/slack-github-action@37ebaef184d7626c5f204ab8d3baff4262dd30f0 # v1.27.0
-      with:
-        payload-file-path: "./tf-emqx-performance-test/slack-payload.json"
-    - name: terraform destroy
-      if: always()
-      working-directory: ./tf-emqx-performance-test
-      run: |
-        terraform destroy -auto-approve
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
-      if: success()
-      with:
-        name: metrics
-        path: |
-          "./tf-emqx-performance-test/*.tar.gz"
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
-      if: failure()
-      with:
-        name: terraform
-        path: |
-          ./tf-emqx-performance-test/.terraform
-          ./tf-emqx-performance-test/*.tfstate
-
-  scenario_fanout:
-    runs-on: ubuntu-latest
-    needs:
-      - prepare
-      - scenario_1on1
-    env:
-      TF_VAR_package_file: ${{ needs.prepare.outputs.PACKAGE_FILE }}
-
-    steps:
-    - name: Configure AWS Credentials
-      uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
+    - uses: actions/setup-python@v5
       with:
-        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_PERF_TEST }}
-        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PERF_TEST }}
-        aws-region: eu-west-1
-    - name: Checkout tf-emqx-performance-test
-      uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
-      with:
-        repository: emqx/tf-emqx-performance-test
-        path: tf-emqx-performance-test
-        ref: v0.2.3
+        python-version: '3.11'
+    - run: pip install -r requirements.txt
     - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
+      if: github.event_name != 'workflow_dispatch'
       with:
-        name: emqx-ubuntu20.04
-        path: tf-emqx-performance-test/
-    - name: Setup Terraform
-      uses: hashicorp/setup-terraform@b9cd54a3c349d3f38e8881555d616ced269862dd # v3.1.2
-      with:
-        terraform_wrapper: false
-    - name: run scenario
-      working-directory: ./tf-emqx-performance-test
-      timeout-minutes: 60
-      env:
-        TF_VAR_bench_id: "${{ needs.prepare.outputs.BENCH_ID }}/fan-out"
-        TF_VAR_use_emqttb: 1
-        TF_VAR_use_emqtt_bench: 0
-        TF_VAR_emqttb_instance_count: 1
-        TF_VAR_emqttb_instance_type: c5.2xlarge
-        TF_VAR_emqttb_scenario: '@pub --topic "t/%n" --conninterval 10ms --pubinterval 20ms --num-clients 5 --size 16 @sub --topic "t/#" --conninterval 10ms --num-clients 1000'
-        TF_VAR_emqx_instance_type: c5.large
-        TF_VAR_emqx_instance_count: 3
+        pattern: "emqx-enterprise-ubuntu22.04-amd64-*"
+    - name: Download emqx package
+      if: github.event_name == 'workflow_dispatch'
       run: |
-        terraform init
-        terraform apply -auto-approve
-        ./wait-emqttb.sh
-        ./fetch-metrics.sh
-        terraform destroy -auto-approve
-        aws s3 sync --exclude '*' --include '*.tar.gz' s3://$TF_VAR_s3_bucket_name/$TF_VAR_bench_id .
-    - name: Send notification to Slack
-      uses: slackapi/slack-github-action@37ebaef184d7626c5f204ab8d3baff4262dd30f0 # v1.27.0
-      with:
-        payload-file-path: "./tf-emqx-performance-test/slack-payload.json"
-    - name: terraform destroy
-      if: always()
-      working-directory: ./tf-emqx-performance-test
+        version=${{ github.event.inputs.emqx_version }}
+        wget https://www.emqx.com/en/downloads/enterprise/${version}/emqx-enterprise-${version}-ubuntu22.04-amd64.deb
+
+    - name: Create infrastructure
+      id: infra
+      timeout-minutes: 30
       run: |
-        terraform destroy -auto-approve
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
-      if: success()
-      with:
-        name: metrics
-        path: |
-          "./tf-emqx-performance-test/*.tar.gz"
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
-      if: failure()
-      with:
-        name: terraform
-        path: |
-          ./tf-emqx-performance-test/.terraform
-          ./tf-emqx-performance-test/*.tfstate
+        mv emqx-enterprise-*.deb emqx-enterprise-ubuntu22.04-amd64.deb
+        ls -lh *.deb
 
-  scenario_fanin:
-    runs-on: ubuntu-latest
-    needs:
-      - prepare
-      - scenario_1on1
-      - scenario_fanout
-    env:
-      TF_VAR_package_file: ${{ needs.prepare.outputs.PACKAGE_FILE }}
+        echo "${{ secrets.EMQX_ENTERPRISE_LICENSE }}" > emqx5.lic
+        cat ${{ matrix.scenario }}.env >> "$GITHUB_ENV"
+        echo '{}' > slack-payload.json
 
-    steps:
-    - name: Configure AWS Credentials
-      uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
-      with:
-        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_PERF_TEST }}
-        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PERF_TEST }}
-        aws-region: eu-west-1
-    - name: Checkout tf-emqx-performance-test
-      uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
-      with:
-        repository: emqx/tf-emqx-performance-test
-        path: tf-emqx-performance-test
-        ref: v0.2.3
-    - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
-      with:
-        name: emqx-ubuntu20.04
-        path: tf-emqx-performance-test/
-    - name: Setup Terraform
-      uses: hashicorp/setup-terraform@b9cd54a3c349d3f38e8881555d616ced269862dd # v3.1.2
-      with:
-        terraform_wrapper: false
-    - name: run scenario
-      working-directory: ./tf-emqx-performance-test
-      timeout-minutes: 60
-      env:
-        TF_VAR_bench_id: "${{ needs.prepare.outputs.BENCH_ID }}/fan-in"
-        TF_VAR_use_emqttb: 1
-        TF_VAR_use_emqtt_bench: 0
-        TF_VAR_emqttb_instance_count: 2
-        TF_VAR_emqttb_start_n_multiplier: 25000
-        TF_VAR_emqttb_instance_type: c5.xlarge
-        TF_VAR_emqttb_scenario: '@pub --topic t/%n --conninterval 10ms --pubinterval 1s --num-clients 25_000 --start-n $START_N --size 16 @sub --topic \$share/perf/t/# --conninterval 10ms --num-clients 250'
-        TF_VAR_emqx_instance_type: c5.2xlarge
-        TF_VAR_emqx_instance_count: 3
-      run: |
         terraform init
-        terraform apply -auto-approve
-        ./wait-emqttb.sh
-        ./fetch-metrics.sh
-        terraform destroy -auto-approve
-        aws s3 sync --exclude '*' --include '*.tar.gz' s3://$TF_VAR_s3_bucket_name/$TF_VAR_bench_id .
-    - name: Send notification to Slack
-      uses: slackapi/slack-github-action@37ebaef184d7626c5f204ab8d3baff4262dd30f0 # v1.27.0
-      with:
-        payload-file-path: "./tf-emqx-performance-test/slack-payload.json"
-    - name: terraform destroy
-      if: always()
-      working-directory: ./tf-emqx-performance-test
-      run: |
-        terraform destroy -auto-approve
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
+        set +e
+        terraform apply -var spec_file=${{ matrix.scenario }}.yaml -auto-approve -lock=false
+        # retry once
+        if [ $? != 0 ]; then
+          echo "Retrying once"
+          set -e
+          terraform apply -var spec_file=${{ matrix.scenario }}.yaml -auto-approve -lock=false
+        fi
+        set -e
+        echo "ssh_key_path=$(terraform output -raw ssh_key_path)" >> $GITHUB_OUTPUT
+
+    - uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
       if: success()
       with:
-        name: metrics
+        name: ssh_private_key
         path: |
-          "./tf-emqx-performance-test/*.tar.gz"
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
-      if: failure()
-      with:
-        name: terraform
-        path: |
-          ./tf-emqx-performance-test/.terraform
-          ./tf-emqx-performance-test/*.tfstate
+          ${{ steps.infra.outputs.ssh_key_path }}
 
-  scenario_1m_conns:
-    runs-on: ubuntu-latest
-    needs:
-      - prepare
-      - scenario_fanin
-      - scenario_fanout
-      - scenario_1on1
-    env:
-      TF_VAR_package_file: ${{ needs.prepare.outputs.PACKAGE_FILE }}
+    - name: Report failure
+      if: failure()
+      run: |
+        jq -n '[{"color": "#ff0000", "fields": [{"title": "Infrastructure creation failed", "short": false}]}]' > attachments.json
+        jq -n --argjson attachments "$(<attachments.json)" '{"attachments": $attachments}' > slack-payload.json
 
-    steps:
-    - name: Configure AWS Credentials
-      uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
-      with:
-        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_PERF_TEST }}
-        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PERF_TEST }}
-        aws-region: eu-west-1
-    - name: Checkout tf-emqx-performance-test
-      uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
-      with:
-        repository: emqx/tf-emqx-performance-test
-        path: tf-emqx-performance-test
-        ref: v0.2.3
-    - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
-      with:
-        name: emqx-ubuntu20.04
-        path: tf-emqx-performance-test/
-    - name: Setup Terraform
-      uses: hashicorp/setup-terraform@b9cd54a3c349d3f38e8881555d616ced269862dd # v3.1.2
-      with:
-        terraform_wrapper: false
-    - name: run scenario
-      working-directory: ./tf-emqx-performance-test
+    - name: Run benchmark
+      if: success()
+      id: benchmark
       timeout-minutes: 60
-      env:
-        TF_VAR_bench_id: "${{ needs.prepare.outputs.BENCH_ID }}/1m-connections"
-        TF_VAR_use_emqttb: 1
-        TF_VAR_use_emqtt_bench: 0
-        TF_VAR_emqttb_instance_count: 5
-        TF_VAR_emqttb_instance_type: c5.2xlarge
-        TF_VAR_emqttb_scenario: '@conn -N 200_000 --conninterval 1ms'
-        TF_VAR_emqx_instance_type: c5.2xlarge
-        TF_VAR_emqx_instance_count: 3
       run: |
-        terraform init
-        terraform apply -auto-approve
-        ./wait-emqttb.sh
-        ./fetch-metrics.sh
-        terraform destroy -auto-approve
-        aws s3 sync --exclude '*' --include '*.tar.gz' s3://$TF_VAR_s3_bucket_name/$TF_VAR_bench_id .
-    - name: Send notification to Slack
-      uses: slackapi/slack-github-action@37ebaef184d7626c5f204ab8d3baff4262dd30f0 # v1.27.0
-      with:
-        payload-file-path: "./tf-emqx-performance-test/slack-payload.json"
-    - name: terraform destroy
+        success=0
+
+        export TMPDIR=$(mktemp -d)
+        echo "TMPDIR=$TMPDIR" >> $GITHUB_ENV
+        echo '[]' > attachments.json
+
+        PERIOD=1m scripts/summary.sh
+
+        MEM_CORE_1=$(jq -r '.[] | select(.host == "emqx-core-1") | .mem' $TMPDIR/mem.json)
+        MEM_CORE_2=$(jq -r '.[] | select(.host == "emqx-core-2") | .mem' $TMPDIR/mem.json)
+
+        if [ $(echo "$MEM_CORE_1 > $INITIAL_RAM_BASELINE * (1 + $ALLOWED_DEVIATION_CPU_RAM)" | bc -l) -eq 1 ] \
+        || [ $(echo "$MEM_CORE_2 > $INITIAL_RAM_BASELINE * (1 + $ALLOWED_DEVIATION_CPU_RAM)" | bc -l) -eq 1 ]; then
+          success=1
+          jq --arg mem1 "$MEM_CORE_1" --arg mem2 "$MEM_CORE_2" '. += [{"color": "#ff0000", "fields": [{"title": "Initial RAM usage is too high", "short": false, "value": "Core 1: \($mem1)%\nCore 2: \($mem2)%"}]}]' \
+            attachments.json 1<> attachments.json
+        fi
+
+        EMQX_API_URL=$(terraform output -raw emqx_dashboard_url)
+        ansible loadgen -m command -a 'systemctl start loadgen' --become --limit 'loadgen-emqtt_bench-1.*'
+        echo "Waiting for subscribers to connect"
+        subs=0
+        while [ $subs -lt 10000 ]; do
+          curl -s -u perftest:perftest "$EMQX_API_URL/api/v5/monitor_current" > "$TMPDIR/monitor_current.json"
+          subs=$(jq -r '.subscriptions' "$TMPDIR/monitor_current.json")
+          sleep 1
+        done
+        ansible loadgen -m command -a 'systemctl start loadgen' --become --limit 'loadgen-emqtt_bench-2.*'
+        echo "Waiting for publishers to connect"
+        conns=$(jq -r '.live_connections' "$TMPDIR/monitor_current.json")
+        while [ $conns -lt 20000 ]; do
+          curl -s -u perftest:perftest "$EMQX_API_URL/api/v5/monitor_current" > "$TMPDIR/monitor_current.json"
+          conns=$(jq -r '.live_connections' "$TMPDIR/monitor_current.json")
+          sleep 1
+        done
+        echo "All clients connected, sleep for $DURATION seconds"
+        sleep $DURATION
+        PERIOD="${DURATION}s" scripts/summary.sh | tee -a $GITHUB_STEP_SUMMARY
+
+        echo "success=$success" >> $GITHUB_OUTPUT
+
+    - name: Cleanup infrastructure
       if: always()
-      working-directory: ./tf-emqx-performance-test
       run: |
-        terraform destroy -auto-approve
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
+        terraform destroy -var spec_file=${{ matrix.scenario }}.yaml -auto-approve
+
+    - name: Analyze results
       if: success()
+      run: |
+        success=${{ steps.benchmark.outputs.success }}
+
+        echo "## Test results analysis" >> $GITHUB_STEP_SUMMARY
+        echo '' >> $GITHUB_STEP_SUMMARY
+
+        CPU_CORE_1=$(jq -r '.[] | select(.host == "emqx-core-1") | .cpu' $TMPDIR/cpu.json)
+        CPU_CORE_2=$(jq -r '.[] | select(.host == "emqx-core-2") | .cpu' $TMPDIR/cpu.json)
+
+        if [ $(echo "$CPU_CORE_1 > $CPU_BASELINE * (1 + $ALLOWED_DEVIATION_CPU_RAM)" | bc -l) -eq 1 ] \
+        || [ $(echo "$CPU_CORE_2 > $CPU_BASELINE * (1 + $ALLOWED_DEVIATION_CPU_RAM)" | bc -l) -eq 1 ]; then
+          success=1
+          jq --arg cpu1 "$CPU_CORE_1" --arg cpu2 "$CPU_CORE_2" '. += [{"color": "#ff0000", "fields": [{"title": "CPU utilization was too high", "short": false, "value": "Core 1: \($cpu1)%\nCore 2: \($cpu2)%"}]}]' \
+            attachments.json 1<> attachments.json
+          echo "* CPU utilization was too high: Core 1: $CPU_CORE_1%, Core 2: $CPU_CORE_2%" >> $GITHUB_STEP_SUMMARY
+        fi
+
+        MEM_CORE_1=$(jq -r '.[] | select(.host == "emqx-core-1") | .mem' $TMPDIR/mem.json)
+        MEM_CORE_2=$(jq -r '.[] | select(.host == "emqx-core-2") | .mem' $TMPDIR/mem.json)
+
+        if [ $(echo "$MEM_CORE_1 > $RAM_BASELINE * (1 + $ALLOWED_DEVIATION_CPU_RAM)" | bc -l) -eq 1 ] \
+        || [ $(echo "$MEM_CORE_2 > $RAM_BASELINE * (1 + $ALLOWED_DEVIATION_CPU_RAM)" | bc -l) -eq 1 ]; then
+          success=1
+          jq --arg mem1 "$MEM_CORE_1" --arg mem2 "$MEM_CORE_2" '. += [{"color": "#ff0000", "fields": [{"title": "RAM usage was too high", "short": false, "value": "Core 1: \($mem1)%\nCore 2: \($mem2)%"}]}]' \
+            attachments.json 1<> attachments.json
+          echo "* RAM usage was too high: Core 1: $MEM_CORE_1%, Core 2: $MEM_CORE_2%" >> $GITHUB_STEP_SUMMARY
+        fi
+
+        RECEIVED_MSG_RATE=$(jq -r '.received_msg_rate' $TMPDIR/emqx_metrics.json)
+        SENT_MSG_RATE=$(jq -r '.sent_msg_rate' $TMPDIR/emqx_metrics.json)
+
+        if [ $(echo "$RECEIVED_MSG_RATE < $RECEIVED_MSG_RATE_BASELINE * (1 - $ALLOWED_DEVIATION_MSG_RATE)" | bc -l) -eq 1 ] \
+        || [ $(echo "$SENT_MSG_RATE < $SENT_MSG_RATE_BASELINE * (1 - $ALLOWED_DEVIATION_MSG_RATE)" | bc -l) -eq 1 ]; then
+          success=1
+          jq --arg received_msg_rate "$RECEIVED_MSG_RATE" --arg sent_msg_rate "$SENT_MSG_RATE" \
+            '. += [{"color": "#ff0000", "fields": [{"title": "Message rate was too low", "short": false, "value": "Received message rate: \($received_msg_rate)\nSent message rate: \($sent_msg_rate)"}]}]' \
+            attachments.json 1<> attachments.json
+          echo "* Message rate was too low: Received message rate: $RECEIVED_MSG_RATE, Sent message rate: $SENT_MSG_RATE" >> $GITHUB_STEP_SUMMARY
+        fi
+
+        MESSAGES_DROPPED=$(jq -r '.messages_dropped' $TMPDIR/emqx_metrics.json)
+        if [ $(echo "$MESSAGES_DROPPED > 100" | bc) -eq 1 ]; then
+          success=1
+          jq --arg dropped "$MESSAGES_DROPPED" '. += [{"color": "#ff0000", "fields": [{"title": "Too many dropped messages", "short": false, "value": "Dropped: \($dropped)"}]}]' \
+            attachments.json 1<> attachments.json
+          echo "* Too many dropped messages: $MESSAGES_DROPPED" >> $GITHUB_STEP_SUMMARY
+        fi
+
+        jq -n --argjson attachments "$(<attachments.json)" '{"attachments": $attachments}' > slack-payload.json
+
+        exit $success
+
+    - name: Post to Slack
+      if: failure()
+      uses: slackapi/slack-github-action@v1.27.0
+      env:
+        SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
       with:
-        name: metrics
-        path: |
-          "./tf-emqx-performance-test/*.tar.gz"
-    - uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
+        channel-id: ${{ secrets.SLACK_PERFTEST_CHANNEL_ID }}
+        slack-message: "EMQX performance test ${{ matrix.scenario }} failed. <${{ github.event.repository.html_url }}/actions/runs/${{ github.run_id }}|Workflow Run>"
+        payload-file-path: slack-payload.json
+        payload-file-path-parsed: false
+
+    - uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
       if: failure()
       with:
         name: terraform
         path: |
-          ./tf-emqx-performance-test/.terraform
-          ./tf-emqx-performance-test/*.tfstate
+          .terraform
+          *.tfstate

+ 2 - 2
Makefile

@@ -10,8 +10,8 @@ include env.sh
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.10.0
-export EMQX_EE_DASHBOARD_VERSION ?= e1.8.1-beta.5
+export EMQX_DASHBOARD_VERSION ?= v1.10.1-1
+export EMQX_EE_DASHBOARD_VERSION ?= e1.8.1-1
 
 export EMQX_RELUP ?= true
 export EMQX_REL_FORM ?= tgz

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.8.1-rc.1").
+-define(EMQX_RELEASE_CE, "5.8.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.8.1-rc.1").
+-define(EMQX_RELEASE_EE, "5.8.1").

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -2,7 +2,7 @@
 {application, emqx, [
     {id, "emqx"},
     {description, "EMQX Core"},
-    {vsn, "5.4.1"},
+    {vsn, "5.4.2"},
     {modules, []},
     {registered, []},
     {applications, [

+ 4 - 5
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -521,8 +521,6 @@ is_fully_acked(Srs, S) ->
 ) -> state().
 derive_state(_, _, #srs{unsubscribed = true}) ->
     u;
-derive_state(_, _, #srs{it_end = end_of_stream}) ->
-    u;
 derive_state(Comm1, Comm2, SRS) ->
     case {is_track_acked(?QOS_1, Comm1, SRS), is_track_acked(?QOS_2, Comm2, SRS)} of
         {true, true} -> r;
@@ -537,8 +535,9 @@ derive_state(Comm1, Comm2, SRS) ->
 %% Transfer the stream either to R state if it's pollable or to U
 %% state if it's not.
 -spec to_RU(stream_key(), srs(), t()) -> t().
-to_RU(Key, Srs = #srs{it_end = end_of_stream}, S) ->
-    to_U(Key, Srs, S);
+to_RU(_Key, #srs{it_end = end_of_stream}, S) ->
+    %% Just don't add to ready
+    S;
 to_RU(Key, Srs = #srs{unsubscribed = true}, S) ->
     to_U(Key, Srs, S);
 to_RU(Key, _Srs, S = #s{ready = R}) ->
@@ -766,7 +765,7 @@ remove_fully_replayed_streams(S0) ->
                     Acc;
                 MinRankY when RankY =< MinRankY ->
                     ?SLOG(debug, #{
-                        msg => del_fully_preplayed_stream,
+                        msg => del_fully_replayed_stream,
                         key => Key,
                         rank => {RankX, RankY},
                         min => MinRankY

+ 1 - 1
apps/emqx_audit/src/emqx_audit.app.src

@@ -1,6 +1,6 @@
 {application, emqx_audit, [
     {description, "Audit log for EMQX"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {mod, {emqx_audit_app, []}},
     {applications, [kernel, stdlib, emqx]},

+ 18 - 18
apps/emqx_audit/src/emqx_audit.erl

@@ -50,7 +50,24 @@ to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
         http_method = <<"">>,
         http_request = <<"">>
     };
-to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api ->
+to_audit(#{from := erlang_console, function := F, args := Args}) ->
+    #?AUDIT{
+        from = erlang_console,
+        source = <<"">>,
+        source_ip = <<"">>,
+        %% operation info
+        operation_id = <<"">>,
+        operation_type = <<"">>,
+        operation_result = <<"">>,
+        failure = <<"">>,
+        %% request detail
+        http_status_code = <<"">>,
+        http_method = <<"">>,
+        http_request = <<"">>,
+        duration_ms = 0,
+        args = iolist_to_binary(io_lib:format("~p: ~ts", [F, Args]))
+    };
+to_audit(#{from := From} = Log) when is_atom(From) ->
     #{
         source := Source,
         source_ip := SourceIp,
@@ -79,23 +96,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api
         http_request = Request,
         duration_ms = DurationMs,
         args = <<"">>
-    };
-to_audit(#{from := erlang_console, function := F, args := Args}) ->
-    #?AUDIT{
-        from = erlang_console,
-        source = <<"">>,
-        source_ip = <<"">>,
-        %% operation info
-        operation_id = <<"">>,
-        operation_type = <<"">>,
-        operation_result = <<"">>,
-        failure = <<"">>,
-        %% request detail
-        http_status_code = <<"">>,
-        http_method = <<"">>,
-        http_request = <<"">>,
-        duration_ms = 0,
-        args = iolist_to_binary(io_lib:format("~p: ~ts", [F, Args]))
     }.
 
 log(_Level, undefined, _Handler) ->
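
Note: the clause reorder above is load-bearing. Since `from` is now matched by the generic `is_atom(From)` guard, the `erlang_console` clause has to come first: console events carry only `function`/`args`, so the generic clause's map match on `source`, `operation_id`, etc. would fail for them. A toy sketch of the same ordering rule (hypothetical `handle/1`, not EMQX code):

    %% specific clause before the atom catch-all, or it can never match
    handle(#{from := erlang_console, args := Args}) -> {console, Args};
    handle(#{from := From, source := Src}) when is_atom(From) -> {api, From, Src}.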

+ 1 - 1
apps/emqx_auth/src/emqx_auth.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth, [
     {description, "EMQX Authentication and authorization"},
-    {vsn, "0.4.1"},
+    {vsn, "0.4.2"},
     {modules, []},
     {registered, [emqx_auth_sup]},
     {applications, [

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.2.5"},
+    {vsn, "0.2.6"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 12 - 1
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -59,7 +59,8 @@
 -export([source_resource_opts_fields/0, source_resource_opts_fields/1]).
 
 -export([
-    api_fields/3
+    api_fields/3,
+    undefined_as_null_field/0
 ]).
 
 -export([
@@ -330,6 +331,16 @@ api_fields("post_source", Type, Fields) ->
 api_fields("put_source", _Type, Fields) ->
     Fields.
 
+undefined_as_null_field() ->
+    {undefined_vars_as_null,
+        ?HOCON(
+            boolean(),
+            #{
+                default => false,
+                desc => ?DESC("undefined_vars_as_null")
+            }
+        )}.
+
 %%======================================================================================
 %% HOCON Schema Callbacks
 %%======================================================================================
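
The exported helper above is what the per-database action schemas splice into their parameter lists further down in this diff (ClickHouse below; Dynamo, MySQL, and per the file list also SQL Server and TDengine). Roughly, an action opts in like this (`sql_field/0` stands for whatever fields the action already defines):

    fields(action_parameters) ->
        [
            sql_field(),
            %% boolean(), defaults to false; i18n text lives in rel/i18n/emqx_bridge_v2_schema.hocon
            emqx_bridge_v2_schema:undefined_as_null_field()
        ].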

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.0"},
+    {wolff, "4.0.2"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_clickhouse, [
     {description, "EMQX Enterprise ClickHouse Bridge"},
-    {vsn, "0.4.2"},
+    {vsn, "0.4.3"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 0
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl

@@ -129,6 +129,7 @@ fields(clickhouse_action) ->
 fields(action_parameters) ->
     [
         sql_field(),
+        emqx_bridge_v2_schema:undefined_as_null_field(),
         batch_value_separator_field()
     ];
 fields(connector_resource_opts) ->

+ 27 - 12
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -294,8 +294,10 @@ on_stop(InstanceID, _State) ->
 %% channel related emqx_resouce callbacks
 %% -------------------------------------------------------------------
 on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
-    #{parameters := ParamConf} = ChannConf0,
-    NewChanns = Channs#{ChannId => #{templates => prepare_sql_templates(ParamConf)}},
+    #{parameters := ChannelConf} = ChannConf0,
+    NewChanns = Channs#{
+        ChannId => #{templates => prepare_sql_templates(ChannelConf), channel_conf => ChannelConf}
+    },
     {ok, OldState#{channels => NewChanns}}.
 
 on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
@@ -387,22 +389,28 @@ on_query(
     }),
     %% Have we got a query or data to fit into an SQL template?
     SimplifiedRequestType = query_type(RequestType),
+    ChannelState = get_channel_state(RequestType, State),
     Templates = get_templates(RequestType, State),
-    SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL),
+    SQL = get_sql(
+        SimplifiedRequestType, Templates, DataOrSQL, maps:get(channel_conf, ChannelState, #{})
+    ),
     ClickhouseResult = execute_sql_in_clickhouse_server(RequestType, PoolName, SQL),
     transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
 
 get_templates(ChannId, State) ->
+    maps:get(templates, get_channel_state(ChannId, State), #{}).
+
+get_channel_state(ChannId, State) ->
     case maps:find(channels, State) of
         {ok, Channels} ->
-            maps:get(templates, maps:get(ChannId, Channels, #{}), #{});
+            maps:get(ChannId, Channels, #{});
         error ->
             #{}
     end.
 
-get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) ->
-    emqx_placeholder:proc_tmpl(PreparedSQL, Data, #{return => full_binary});
-get_sql(_, _, SQL) ->
+get_sql(channel_message, #{send_message_template := PreparedSQL}, Data, ChannelConf) ->
+    proc_nullable_tmpl(PreparedSQL, Data, ChannelConf);
+get_sql(_, _, SQL, _) ->
     SQL.
 
 query_type(sql) ->
@@ -425,8 +433,9 @@ on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) ->
     {[ChannId | _] = Keys, ObjectsToInsert} = lists:unzip(BatchReq),
     ensure_channel_messages(Keys),
     Templates = get_templates(ChannId, State),
+    ChannelState = get_channel_state(ChannId, State),
     %% Create batch insert SQL statement
-    SQL = objects_to_sql(ObjectsToInsert, Templates),
+    SQL = objects_to_sql(ObjectsToInsert, Templates, maps:get(channel_conf, ChannelState, #{})),
     %% Do the actual query in the database
     ResultFromClickhouse = execute_sql_in_clickhouse_server(ChannId, PoolName, SQL),
     %% Transform the result to a better format
@@ -447,20 +456,26 @@ objects_to_sql(
     #{
         send_message_template := InsertTemplate,
         extend_send_message_template := BulkExtendInsertTemplate
-    }
+    },
+    ChannelConf
 ) ->
     %% Prepare INSERT-statement and the first row after VALUES
-    InsertStatementHead = emqx_placeholder:proc_tmpl(InsertTemplate, FirstObject),
+    InsertStatementHead = proc_nullable_tmpl(InsertTemplate, FirstObject, ChannelConf),
     FormatObjectDataFunction =
         fun(Object) ->
-            emqx_placeholder:proc_tmpl(BulkExtendInsertTemplate, Object)
+            proc_nullable_tmpl(BulkExtendInsertTemplate, Object, ChannelConf)
         end,
     InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects),
     CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]),
     CompleteStatement;
-objects_to_sql(_, _) ->
+objects_to_sql(_, _, _) ->
     erlang:error(<<"Templates for bulk insert missing.">>).
 
+proc_nullable_tmpl(Template, Data, #{undefined_vars_as_null := true}) ->
+    emqx_placeholder:proc_nullable_tmpl(Template, Data);
+proc_nullable_tmpl(Template, Data, _) ->
+    emqx_placeholder:proc_tmpl(Template, Data).
+
 %% -------------------------------------------------------------------
 %% Helper functions that are used by both on_query/3 and on_batch_query/3
 %% -------------------------------------------------------------------
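
`proc_nullable_tmpl/3` above is the per-channel switch: only when the stored `channel_conf` has `undefined_vars_as_null = true` does rendering go through the new `emqx_placeholder:proc_nullable_tmpl/2` (added in apps/emqx_utils/src/emqx_placeholder.erl, file 67 of this PR, not shown here), which is expected to render absent or undefined placeholders as NULL rather than a literal string. A minimal sketch of the call, assuming a row where `data` is missing:

    Template = emqx_placeholder:preproc_tmpl(
        <<"INSERT INTO mqtt.mqtt_test(key, data) VALUES (${key}, ${data})">>
    ),
    %% with the flag on, ${data} should end up as NULL; the SUITE below checks
    %% this by selecting the column back and asserting <<"null">>
    SQL = emqx_placeholder:proc_nullable_tmpl(Template, #{key => 42}).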

+ 20 - 1
apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl

@@ -9,6 +9,7 @@
 
 -define(CLICKHOUSE_HOST, "clickhouse").
 -define(CLICKHOUSE_PORT, "8123").
+-include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 
@@ -177,9 +178,12 @@ parse_and_check(ConfigString, BridgeType, Name) ->
     RetConfig.
 
 make_bridge(Config) ->
+    make_bridge(Config, #{}).
+
+make_bridge(Config, Overrides) ->
     Type = <<"clickhouse">>,
     Name = atom_to_binary(?MODULE),
-    BridgeConfig = clickhouse_config(Config),
+    BridgeConfig = maps:merge(clickhouse_config(Config), Overrides),
     {ok, _} = emqx_bridge:create(
         Type,
         Name,
@@ -252,6 +256,21 @@ t_send_message_query(Config) ->
     delete_bridge(),
     ok.
 
+t_undefined_vars_as_null(Config) ->
+    BridgeID = make_bridge(#{enable_batch => false}, #{<<"undefined_vars_as_null">> => true}),
+    Key = 42,
+    Payload = #{key => Key, data => undefined, timestamp => 10000},
+    %% This will use the SQL template included in the bridge
+    emqx_bridge:send_message(BridgeID, Payload),
+    %% Check that the data got to the database
+    check_key_in_clickhouse(Key, Config),
+    ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
+    SQL = io_lib:format("SELECT data FROM mqtt.mqtt_test WHERE key = ~p", [Key]),
+    {ok, 200, ResultString} = clickhouse:query(ClickhouseConnection, SQL, []),
+    ?assertMatch(<<"null">>, iolist_to_binary(string:trim(ResultString))),
+    delete_bridge(),
+    ok.
+
 t_send_simple_batch(Config) ->
     send_simple_batch_helper(Config, #{}).
 

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.0"},
+    {wolff, "4.0.2"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_dynamo, [
     {description, "EMQX Enterprise Dynamo Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl

@@ -200,7 +200,8 @@ fields("config_connector") ->
         end,
         Config,
         [
-            table
+            table,
+            undefined_vars_as_null
         ]
     );
 fields(connector_resource_opts) ->

+ 5 - 4
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -63,7 +63,8 @@ fields(config) ->
                 }
             )},
         {pool_size, fun emqx_connector_schema_lib:pool_size/1},
-        {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
+        {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1},
+        emqx_bridge_v2_schema:undefined_as_null_field()
     ].
 
 %%========================================================================================
@@ -253,7 +254,7 @@ do_query(
                 ecpool:pick_and_do(
                     PoolName,
                     {emqx_bridge_dynamo_connector_client, query, [
-                        Table, QueryTuple, Templates, TraceRenderedCTX
+                        Table, QueryTuple, Templates, TraceRenderedCTX, ChannelState
                     ]},
                     no_handover
                 );
@@ -310,8 +311,8 @@ get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
         {unrecoverable_error,
             {invalid_request, <<"The only query type that supports batching is insert.">>}}
     );
-get_query_tuple([InsertQuery | _]) ->
-    get_query_tuple(InsertQuery).
+get_query_tuple([_InsertQuery | _] = Reqs) ->
+    lists:map(fun get_query_tuple/1, Reqs).
 
 ensuare_dynamo_keys({_, Data} = Query, State) when is_map(Data) ->
     ensuare_dynamo_keys([Query], State);

+ 48 - 38
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl

@@ -10,7 +10,7 @@
 -export([
     start_link/1,
     is_connected/2,
-    query/5
+    query/6
 ]).
 
 %% gen_server callbacks
@@ -24,7 +24,7 @@
 ]).
 
 -ifdef(TEST).
--export([execute/2]).
+-export([execute/3]).
 -endif.
 
 -include_lib("emqx/include/emqx_trace.hrl").
@@ -42,8 +42,10 @@ is_connected(Pid, Timeout) ->
             {false, Error}
     end.
 
-query(Pid, Table, Query, Templates, TraceRenderedCTX) ->
-    gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedCTX}, infinity).
+query(Pid, Table, Query, Templates, TraceRenderedCTX, ChannelState) ->
+    gen_server:call(
+        Pid, {query, Table, Query, Templates, TraceRenderedCTX, ChannelState}, infinity
+    ).
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -79,14 +81,14 @@ handle_call(is_connected, _From, State) ->
                 {false, Error}
         end,
     {reply, IsConnected, State};
-handle_call({query, Table, Query, Templates, TraceRenderedCTX}, _From, State) ->
-    Result = do_query(Table, Query, Templates, TraceRenderedCTX),
+handle_call({query, Table, Query, Templates, TraceRenderedCTX, ChannelState}, _From, State) ->
+    Result = do_query(Table, Query, Templates, TraceRenderedCTX, ChannelState),
     {reply, Result, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
-handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) ->
-    Result = do_query(Table, Query, Templates, {fun(_, _) -> ok end, none}),
+handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}, ChannelState}, State) ->
+    Result = do_query(Table, Query, Templates, {fun(_, _) -> ok end, none}, ChannelState),
     ReplyFun(Context, Result),
     {noreply, State};
 handle_cast(_Request, State) ->
@@ -104,9 +106,9 @@ code_change(_OldVsn, State, _Extra) ->
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-do_query(Table, Query0, Templates, TraceRenderedCTX) ->
+do_query(Table, Query0, Templates, TraceRenderedCTX, ChannelState) ->
     try
-        Query = apply_template(Query0, Templates),
+        Query = apply_template(Query0, Templates, ChannelState),
         emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
             table => Table,
             query => #emqx_trace_format_func_data{
@@ -114,12 +116,12 @@ do_query(Table, Query0, Templates, TraceRenderedCTX) ->
                 data = Query
             }
         }),
-        execute(Query, Table)
+        execute(Query, Table, ChannelState)
     catch
         error:{unrecoverable_error, Reason} ->
             {error, {unrecoverable_error, Reason}};
-        _Type:Reason ->
-            {error, {unrecoverable_error, {invalid_request, Reason}}}
+        Err:Reason:ST ->
+            {error, {unrecoverable_error, {invalid_request, {Err, Reason, ST}}}}
     end.
 
 trace_format_query({Type, Data}) ->
@@ -131,68 +133,76 @@ trace_format_query(Query) ->
     Query.
 
 %% some simple query commands for authn/authz or test
-execute({insert_item, Msg}, Table) ->
-    Item = convert_to_item(Msg),
+execute({insert_item, Msg}, Table, ChannelState) ->
+    Item = convert_to_item(Msg, ChannelState),
     erlcloud_ddb2:put_item(Table, Item);
-execute({delete_item, Key}, Table) ->
+execute({delete_item, Key}, Table, _) ->
     erlcloud_ddb2:delete_item(Table, Key);
-execute({get_item, Key}, Table) ->
+execute({get_item, Key}, Table, _) ->
     erlcloud_ddb2:get_item(Table, Key);
 %% commands for data bridge query or batch query
-execute({send_message, Msg}, Table) ->
-    Item = convert_to_item(Msg),
+execute({send_message, Msg}, Table, ChannelState) ->
+    Item = convert_to_item(Msg, ChannelState),
     erlcloud_ddb2:put_item(Table, Item);
-execute([{put, _} | _] = Msgs, Table) ->
+execute([{put, _} | _] = Msgs, Table, _) ->
     %% type of batch_write_item argument :: batch_write_item_request_items()
     %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
     %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
     %% batch_write_item_request() :: {put, item()} | {delete, key()}
     erlcloud_ddb2:batch_write_item({Table, Msgs}).
 
-apply_template({Key, Msg} = Req, Templates) ->
-    case maps:get(Key, Templates, undefined) of
-        undefined ->
-            Req;
-        Template ->
-            {Key, emqx_placeholder:proc_tmpl(Template, Msg)}
+apply_template({Key, Msg} = Req, Templates, _) ->
+    case maps:find(Key, Templates) of
+        error -> Req;
+        {ok, Template} -> {Key, emqx_placeholder:proc_tmpl(Template, Msg)}
     end;
 %% now there is no batch delete, so
 %% 1. we can simply replace the `send_message` to `put`
 %% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`,
 %%    so we can reduce some list map cost
-apply_template([{_, _Msg} | _] = Msgs, Templates) ->
+apply_template([{_, _Msg} | _] = Msgs, Templates, ChannelState) ->
     lists:map(
         fun(Req) ->
-            {_, Msg} = apply_template(Req, Templates),
-            {put, convert_to_item(Msg)}
+            {_, Msg} = apply_template(Req, Templates, ChannelState),
+            {put, convert_to_item(Msg, ChannelState)}
         end,
         Msgs
     ).
 
-convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 ->
+convert_to_item(Msg, ChannelState) when is_map(Msg), map_size(Msg) > 0 ->
     maps:fold(
         fun
             (_K, <<>>, AccIn) ->
                 AccIn;
             (K, V, AccIn) ->
-                [{convert2binary(K), convert2binary(V)} | AccIn]
+                [{to_bin(K), val_to_bin(V, ChannelState)} | AccIn]
         end,
         [],
         Msg
     );
-convert_to_item(MsgBin) when is_binary(MsgBin) ->
+convert_to_item(MsgBin, ChannelState) when is_binary(MsgBin) ->
     Msg = emqx_utils_json:decode(MsgBin),
-    convert_to_item(Msg);
-convert_to_item(Item) ->
+    convert_to_item(Msg, ChannelState);
+convert_to_item(Item, _) ->
     erlang:throw({invalid_item, Item}).
 
-convert2binary(Value) when is_atom(Value) ->
+val_to_bin(Null, #{undefined_vars_as_null := true}) when
+    Null =:= <<"undefined">>;
+    Null =:= <<"null">>;
+    Null =:= undefined;
+    Null =:= null
+->
+    {null, true};
+val_to_bin(Val, _) ->
+    to_bin(Val).
+
+to_bin(Value) when is_atom(Value) ->
     erlang:atom_to_binary(Value, utf8);
-convert2binary(Value) when is_binary(Value); is_number(Value) ->
+to_bin(Value) when is_binary(Value); is_number(Value) ->
     Value;
-convert2binary(Value) when is_list(Value) ->
+to_bin(Value) when is_list(Value) ->
     unicode:characters_to_binary(Value);
-convert2binary(Value) when is_map(Value) ->
+to_bin(Value) when is_map(Value) ->
     emqx_utils_json:encode(Value).
 
 to_str(List) when is_list(List) ->
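
With `undefined_vars_as_null` enabled, `val_to_bin/2` above turns the four null-ish renderings (the `undefined`/`null` atoms and their binary forms) into `{null, true}`, which erlcloud_ddb2 writes as a DynamoDB NULL attribute; everything else keeps the old stringification via `to_bin/1`. Roughly, for an internal call like the following (attribute order from `maps:fold` is not guaranteed):

    convert_to_item(
        #{<<"id">> => <<"abc">>, <<"payload">> => <<"undefined">>},
        #{undefined_vars_as_null => true}
    )
    %% => [{<<"payload">>, {null, true}}, {<<"id">>, <<"abc">>}]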

+ 43 - 3
apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl

@@ -348,12 +348,15 @@ directly_setup_dynamo() ->
 
 directly_query(Query) ->
     directly_setup_dynamo(),
-    emqx_bridge_dynamo_connector_client:execute(Query, ?TABLE_BIN).
+    emqx_bridge_dynamo_connector_client:execute(Query, ?TABLE_BIN, #{}).
 
 directly_get_payload(Key) ->
+    directly_get_field(Key, <<"payload">>).
+
+directly_get_field(Key, Field) ->
     case directly_query({get_item, {<<"id">>, Key}}) of
         {ok, Values} ->
-            proplists:get_value(<<"payload">>, Values, {error, {invalid_item, Values}});
+            proplists:get_value(Field, Values, {error, {invalid_item, Values}});
         Error ->
             Error
     end.
@@ -370,7 +373,7 @@ t_setup_via_config_and_publish(Config) ->
         create_bridge(Config)
     ),
     MsgId = emqx_utils:gen_id(),
-    SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD},
+    SentData = #{clientid => <<"clientid">>, id => MsgId, payload => ?PAYLOAD, foo => undefined},
     ?check_trace(
         begin
             ?wait_async_action(
@@ -384,6 +387,43 @@ t_setup_via_config_and_publish(Config) ->
                 ?PAYLOAD,
                 directly_get_payload(MsgId)
             ),
+            ?assertMatch(
+                %% the old behavior without undefined_vars_as_null
+                <<"undefined">>,
+                directly_get_field(MsgId, <<"foo">>)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(dynamo_connector_query_return, Trace0),
+            ?assertMatch([#{result := {ok, _}}], Trace),
+            ok
+        end
+    ),
+    ok.
+
+t_undefined_vars_as_null(Config) ->
+    ?assertNotEqual(undefined, get(aws_config)),
+    create_table(Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config, #{<<"undefined_vars_as_null">> => true})
+    ),
+    MsgId = emqx_utils:gen_id(),
+    SentData = #{clientid => <<"clientid">>, id => MsgId, payload => undefined},
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertMatch(
+                    {ok, _}, send_message(Config, SentData)
+                ),
+                #{?snk_kind := dynamo_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                undefined,
+                directly_get_payload(MsgId)
+            ),
             ok
         end,
         fun(Trace0) ->

+ 6 - 2
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -576,10 +576,14 @@ on_batch_query_async(
     }),
     {error, not_support}.
 
-%% todo
+%% TODO:
+%% Currently, the batch mode is not really `batch` for the Rest API and Thrift drivers.
+%% 1. For Rest API we need to upgrade from v1 to v2 which has a batch endpoint `insertRecords`,
+%%    and we should take care to ensure this is not a breaking change
+%% 2. For the Thrift, we can use the `tSInsertTabletsReq` or `tSInsertRecordsReq` protocol
 on_batch_query(
     InstId,
-    [{ChannelId, _Message}] = Requests,
+    [{ChannelId, _Message} | _] = Requests,
     #{iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_batch_query, #{instance_id => InstId}),

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, "4.0.0"},
+    {wolff, "4.0.2"},
     {kafka_protocol, "4.1.8"},
     {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 6 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -14,6 +14,7 @@
 -export([
     resource_type/0,
     query_mode/1,
+    query_opts/1,
     callback_mode/0,
     on_start/2,
     on_stop/2,
@@ -45,6 +46,11 @@ query_mode(#{parameters := #{query_mode := sync}}) ->
 query_mode(_) ->
     simple_async_internal_buffer.
 
+query_opts(#{parameters := #{query_mode := sync, sync_query_timeout := Timeout}}) ->
+    #{timeout => Timeout};
+query_opts(_) ->
+    #{}.
+
 callback_mode() -> async_if_possible.
 
 check_config(Key, Config) when is_map_key(Key, Config) ->
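
The new `query_opts/1` callback lets the resource expose per-query options: for a sync-mode Kafka action it surfaces the configured `sync_query_timeout` as the call timeout (presumably consumed by emqx_resource_buffer_worker, which this PR also touches), and returns an empty map otherwise. For example:

    #{timeout := 5000} =
        emqx_bridge_kafka_impl_producer:query_opts(
            #{parameters => #{query_mode => sync, sync_query_timeout => 5000}}
        ),
    #{} = emqx_bridge_kafka_impl_producer:query_opts(#{parameters => #{}}).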

+ 2 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl

@@ -148,7 +148,8 @@ fields(action_parameters) ->
             mk(
                 emqx_schema:template(),
                 #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
-            )}
+            )},
+        emqx_bridge_v2_schema:undefined_as_null_field()
     ];
 fields("config_connector") ->
     emqx_connector_schema:common_fields() ++

+ 3 - 2
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl

@@ -130,12 +130,13 @@ on_batch_query(
     Result = emqx_mysql:on_batch_query(
         InstanceId,
         BatchRequest,
-        MergedState1
+        MergedState1,
+        ChannelConfig
     ),
     ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}),
     Result;
 on_batch_query(InstanceId, BatchRequest, _State = #{connector_state := ConnectorState}) ->
-    emqx_mysql:on_batch_query(InstanceId, BatchRequest, ConnectorState).
+    emqx_mysql:on_batch_query(InstanceId, BatchRequest, ConnectorState, #{}).
 
 on_remove_channel(
     _InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId

+ 27 - 0
apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl

@@ -426,6 +426,33 @@ t_setup_via_config_and_publish(Config) ->
     ),
     ok.
 
+t_undefined_vars_as_null(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config, #{<<"undefined_vars_as_null">> => true})
+    ),
+    SentData = #{payload => undefined, timestamp => 1668602148000},
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := mysql_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                {ok, [<<"payload">>], [[null]]},
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(mysql_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            ok
+        end
+    ),
+    ok.
+
 t_setup_via_http_api_and_publish(Config) ->
     BridgeType = ?config(mysql_bridge_type, Config),
     Name = ?config(mysql_name, Config),

+ 140 - 0
apps/emqx_bridge_snowflake/docs/dev-quick-ref.md

@@ -39,6 +39,124 @@ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out snowflake_rsa_key.pr
 openssl rsa -in snowflake_rsa_key.private.pem -pubout -out snowflake_rsa_key.public.pem
 ```
 
+## SQL setup cheat sheet
+
+```sql
+CREATE USER IF NOT EXISTS testuser
+    PASSWORD = 'TestUser99'
+    MUST_CHANGE_PASSWORD = FALSE;
+
+-- Set the RSA public key for 'testuser'
+-- Note: Remove the '-----BEGIN PUBLIC KEY-----' and '-----END PUBLIC KEY-----' lines from your PEM file,
+-- and include the remaining content below, preserving line breaks.
+
+ALTER USER testuser SET RSA_PUBLIC_KEY = '
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_1>
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_2>
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_3>
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_4>
+';
+
+
+create or replace role testrole;
+
+create warehouse testwarehouse;
+
+CREATE OR REPLACE TABLE testdatabase.public.test0 (
+    clientid STRING,
+    topic STRING,
+    payload BINARY,
+    publish_received_at TIMESTAMP_LTZ
+);
+
+CREATE STAGE IF NOT EXISTS testdatabase.public.teststage0
+FILE_FORMAT = (TYPE = CSV PARSE_HEADER = TRUE FIELD_OPTIONALLY_ENCLOSED_BY = '"')
+COPY_OPTIONS = (ON_ERROR = CONTINUE PURGE = TRUE);
+
+CREATE PIPE IF NOT EXISTS testdatabase.public.testpipe0 AS
+COPY INTO testdatabase.public.test0
+FROM @testdatabase.public.teststage0
+MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
+
+
+-- Grant the USAGE privilege on the database and schema that contain the pipe object.
+grant usage on database testdatabase to role testrole;
+grant usage on schema testdatabase.public to role testrole;
+-- Grant the USAGE privilege on the warehouse (only needed for test account)
+grant usage on warehouse testwarehouse to role testrole;
+-- Grant the INSERT, SELECT, TRUNCATE and DELETE privileges on the target table
+-- for cleaning up after tests
+grant insert, select, truncate, delete on testdatabase.public.test0 to role testrole;
+-- Grant the READ and WRITE privilege on the internal stage.
+grant read, write on stage testdatabase.public.teststage0 to role testrole;
+-- Grant the OPERATE and MONITOR privileges on the pipe object.
+grant operate, monitor on pipe testdatabase.public.testpipe0 to role testrole;
+-- Grant the role to a user
+grant role testrole to user testuser;
+-- Set the role as the default role for the user
+alter user testuser set default_role = testrole;
+
+-- Create a role for the Snowpipe privileges.
+create or replace role snowpipe;
+-- Grant the USAGE privilege on the database and schema that contain the pipe object.
+grant usage on database testdatabase to role snowpipe;
+grant usage on schema testdatabase.public to role snowpipe;
+-- Grant the INSERT and SELECT privileges on the target table.
+grant insert, select on testdatabase.public.test0 to role snowpipe;
+-- Grant the READ and WRITE privilege on the internal stage.
+grant read, write on stage testdatabase.public.teststage0 to role snowpipe;
+-- Grant the OPERATE and MONITOR privileges on the pipe object.
+grant operate, monitor on pipe testdatabase.public.testpipe0 to role snowpipe;
+-- Grant the role to a user
+grant role snowpipe to user snowpipeuser;
+-- Set the role as the default role for the user
+alter user snowpipeuser set default_role = snowpipe;
+
+---- OPTIONAL
+-- not required, but helps gather JWT failure reasons like skewed time
+grant monitor on account to role snowpipe;
+
+
+-- Create a user and role with read-only Snowpipe privileges (no write
+-- permission on the stage), so the health check can pass but staging cannot.
+
+CREATE USER IF NOT EXISTS snowpipe_ro_user
+    PASSWORD = 'TestUser99'
+    MUST_CHANGE_PASSWORD = FALSE;
+
+-- Set the RSA public key for 'snowpipe_ro_user'
+-- Note: Remove the '-----BEGIN PUBLIC KEY-----' and '-----END PUBLIC KEY-----' lines from your PEM file,
+-- and include the remaining content below, preserving line breaks.
+
+ALTER USER snowpipe_ro_user SET RSA_PUBLIC_KEY = '
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_1>
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_2>
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_3>
+<YOUR_PUBLIC_KEY_CONTENTS_LINE_4>
+';
+
+
+create or replace role snowpipe_ro;
+-- Grant the USAGE privilege on the database and schema that contain the pipe object.
+grant usage on database testdatabase to role snowpipe_ro;
+grant usage on schema testdatabase.public to role snowpipe_ro;
+-- Grant the SELECT privileges on the target table.
+grant select on testdatabase.public.test0 to role snowpipe_ro;
+-- Grant the READ privilege on the internal stage.
+grant read on stage testdatabase.public.teststage0 to role snowpipe_ro;
+-- Grant the MONITOR privileges on the pipe object.
+grant monitor on pipe testdatabase.public.testpipe0 to role snowpipe_ro;
+-- Grant the role to a user
+grant role snowpipe_ro to user snowpipe_ro_user;
+-- Set the role as the default role for the user
+alter user snowpipe_ro_user set default_role = snowpipe_ro;
+
+---- OPTIONAL
+-- not required, but helps gather JWT failure reasons like skewed time
+grant monitor on account to role snowpipe_ro;
+
+```
+
 ## Basic helper functions
 
 ### Elixir
@@ -60,6 +178,13 @@ dsn = "snowflake"
 query = fn conn, sql -> :odbc.sql_query(conn, sql |> to_charlist()) end
 ```
 
+Or, if you have already set up a connector:
+
+```elixir
+conn_res_id = "connector:snowflake:name"
+query = fn sql -> conn_res_id |> :ecpool.pick_and_do(fn conn -> :odbc.sql_query(conn, sql |> to_charlist()) end, :handover) end
+```
+
 ### Erlang
 
 ```erlang
@@ -74,6 +199,13 @@ DSN = "snowflake".
 Query = fun(Conn, Sql) -> odbc:sql_query(Conn, Sql) end.
 ```
 
+Or, if you have already set up a connector:
+
+```erlang
+ConnResId = <<"connector:snowflake:name">>.
+Query = fun(Sql) -> ecpool:pick_and_do(ConnResId, fun(Conn) -> odbc:sql_query(Conn, Sql) end, handover) end.
+```
+
 ## Initialize Database and user accounts
 
 ### Elixir
@@ -148,6 +280,10 @@ query.(conn, "grant operate, monitor on pipe #{fqn_pipe} to role #{snowpipe_role
 query.(conn, "grant role #{snowpipe_role} to user #{snowpipe_user}")
 # Set the role as the default role for the user
 query.(conn, "alter user #{snowpipe_user} set default_role = #{snowpipe_role}")
+
+## OPTIONAL
+# not required, but helps gather JWT failure reasons like skewed time
+query.(conn, "grant monitor on account to role #{snowpipe_role}")
 ```
 
 ### Erlang
@@ -236,4 +372,8 @@ Query(Conn, ["grant role ", SnowpipeRole, " to user ", SnowpipeUser]).
 
 % Set the role as the default role for the user
 Query(Conn, ["alter user ", SnowpipeUser, " set default_role = ", SnowpipeRole]).
+
+%%  OPTIONAL
+% not required, but helps gather JWT failure reasons like skewed time
+Query(Conn, ["grant monitor on account to role ", SnowpipeRole]).
 ```

+ 21 - 0
apps/emqx_bridge_snowflake/docs/user-guide.md

@@ -115,3 +115,24 @@ SELECT
 FROM
   "t/#"
 ```
+
+## Debugging invalid JWT failures
+
+If an error like the following appears in the logs:
+
+```
+JWT token is invalid. [eaa17004-5830-4b84-b357-2a981d28606f]
+```
+
+Copy the UUID from that message (`eaa17004-5830-4b84-b357-2a981d28606f` in this example) and, in a Snowflake worksheet, run the following as a user with admin privileges on the account (at least `MONITOR` on the account):
+
+```sql
+select SYSTEM$GET_LOGIN_FAILURE_DETAILS('eaa17004-5830-4b84-b357-2a981d28606f');
+```
+
+The output can contain additional hints on why Snowflake considers the JWT
+invalid, for example:
+
+```json
+{"clientIP":"xxx","clientType":"OTHER","clientVersion":"","username":null,"errorCode":"JWT_TOKEN_INVALID_ISSUE_TIME","timestamp":1728418411}
+```
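The same lookup can also be run from a remote console through an existing Snowflake connector's ODBC pool; a minimal sketch, assuming a hypothetical connector resource id and request id (see also docs/dev-quick-ref.md):

```erlang
%% Sketch only: the connector resource id and request id are placeholders.
ConnResId = <<"connector:snowflake:name">>,
SQL = "select SYSTEM$GET_LOGIN_FAILURE_DETAILS('eaa17004-5830-4b84-b357-2a981d28606f')",
ecpool:pick_and_do(ConnResId, fun(Conn) -> odbc:sql_query(Conn, SQL) end, handover).
```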

+ 117 - 15
apps/emqx_bridge_snowflake/src/emqx_bridge_snowflake_connector.erl

@@ -42,7 +42,8 @@
     connect/1,
     disconnect/1,
     do_health_check_connector/1,
-    do_stage_file/6
+    do_stage_file/6,
+    do_get_login_failure_details/2
 ]).
 
 %% `emqx_connector_aggreg_delivery' API
@@ -267,7 +268,8 @@ on_remove_channel(
     destroy_action(ActionResId, ActionState),
     ConnState = ConnState0#{installed_actions := InstalledActions},
     {ok, ConnState};
-on_remove_channel(_ConnResId, ConnState, _ActionResId) ->
+on_remove_channel(_ConnResId, ConnState, ActionResId) ->
+    ensure_common_action_destroyed(ActionResId),
     {ok, ConnState}.
 
 -spec on_get_channels(connector_resource_id()) ->
@@ -282,12 +284,12 @@ on_get_channels(ConnResId) ->
 ) ->
     ?status_connected | ?status_disconnected.
 on_get_channel_status(
-    _ConnResId,
+    ConnResId,
     ActionResId,
     _ConnState = #{installed_actions := InstalledActions}
 ) when is_map_key(ActionResId, InstalledActions) ->
     ActionState = maps:get(ActionResId, InstalledActions),
-    action_status(ActionResId, ActionState);
+    action_status(ConnResId, ActionResId, ActionState);
 on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
     ?status_disconnected.
 
@@ -619,7 +621,7 @@ process_complete(TransferState0) ->
             Res ->
                 ?tp("snowflake_insert_files_request_failed", #{response => Res}),
                 %% TODO: retry?
-                exit({insert_failed, Res})
+                exit({upload_failed, Res})
         end
     end.
 
@@ -636,7 +638,7 @@ create_action(
 ) ->
     maybe
         {ok, ActionState0} ?= start_http_pool(ActionResId, ActionConfig, ConnState),
-        _ = check_snowpipe_user_permission(ActionResId, ActionState0),
+        _ = check_snowpipe_user_permission(ActionResId, ConnResId, ActionState0),
         start_aggregator(ConnResId, ActionResId, ActionConfig, ActionState0)
     end.
 
@@ -798,6 +800,10 @@ destroy_action(ActionResId, ActionState) ->
         _ ->
             ok
     end,
+    ok = ensure_common_action_destroyed(ActionResId),
+    ok.
+
+ensure_common_action_destroyed(ActionResId) ->
     ok = ehttpc_sup:stop_pool(ActionResId),
     ok = emqx_connector_jwt:delete_jwt(?JWT_TABLE, ActionResId),
     ok.
@@ -907,13 +913,19 @@ insert_report_request(HTTPPool, Opts, HTTPClientConfig) ->
     JWTToken = emqx_connector_jwt:ensure_jwt(JWTConfig),
     AuthnHeader = [<<"BEARER ">>, JWTToken],
     Headers = http_headers(AuthnHeader),
+    QString = insert_report_query_string(Opts),
     InsertReportPath =
-        case Opts of
-            #{begin_mark := BeginMark} when is_binary(BeginMark) ->
-                <<InsertReportPath0/binary, "?beginMark=", BeginMark/binary>>;
+        case QString of
+            <<>> ->
+                InsertReportPath0;
             _ ->
-                InsertReportPath0
+                <<InsertReportPath0/binary, "?", QString/binary>>
         end,
+    ?SLOG(debug, #{
+        msg => "snowflake_insert_report_request",
+        path => InsertReportPath,
+        pool => HTTPPool
+    }),
     Req = {InsertReportPath, Headers},
     Response = ?MODULE:do_insert_report_request(HTTPPool, Req, RequestTTL, MaxRetries),
     case Response of
@@ -924,6 +936,13 @@ insert_report_request(HTTPPool, Opts, HTTPClientConfig) ->
             {error, Response}
     end.
 
+insert_report_query_string(Opts0) ->
+    Opts1 = maps:with([begin_mark, request_id], Opts0),
+    Opts2 = maps:filter(fun(_K, V) -> is_binary(V) end, Opts1),
+    Opts3 = emqx_utils_maps:rename(begin_mark, <<"beginMark">>, Opts2),
+    Opts = emqx_utils_maps:rename(request_id, <<"requestId">>, Opts3),
+    emqx_utils_conv:bin(uri_string:compose_query(maps:to_list(Opts))).
+
 %% Internal export only for mocking
 do_insert_report_request(HTTPPool, Req, RequestTTL, MaxRetries) ->
     ehttpc:request(HTTPPool, get, Req, RequestTTL, MaxRetries).
@@ -941,7 +960,7 @@ row_to_map(Row0, Headers) ->
     Row = lists:zip(Headers, Row2),
     maps:from_list(Row).
 
-action_status(ActionResId, #{mode := aggregated} = ActionState) ->
+action_status(ConnResId, ActionResId, #{mode := aggregated} = ActionState) ->
     #{
         aggreg_id := AggregId,
         http := #{connect_timeout := ConnectTimeout}
@@ -950,9 +969,9 @@ action_status(ActionResId, #{mode := aggregated} = ActionState) ->
     Timestamp = erlang:system_time(second),
     ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
     ok = check_aggreg_upload_errors(AggregId),
-    ok = check_snowpipe_user_permission(ActionResId, ActionState),
     case http_pool_workers_healthy(ActionResId, ConnectTimeout) of
         true ->
+            ok = check_snowpipe_user_permission(ActionResId, ConnResId, ActionState),
             ?status_connected;
         false ->
             ?status_disconnected
@@ -1027,9 +1046,10 @@ check_aggreg_upload_errors(AggregId) ->
             ok
     end.
 
-check_snowpipe_user_permission(HTTPPool, ActionState) ->
+check_snowpipe_user_permission(HTTPPool, ODBCPool, ActionState) ->
     #{http := HTTPClientConfig} = ActionState,
-    Opts = #{},
+    RequestId = list_to_binary(uuid:uuid_to_string(uuid:get_v4())),
+    Opts = #{request_id => RequestId},
     case insert_report_request(HTTPPool, Opts, HTTPClientConfig) of
         {ok, _} ->
             ok;
@@ -1039,7 +1059,10 @@ check_snowpipe_user_permission(HTTPPool, ActionState) ->
                     {ok, JSON} -> JSON;
                     {error, _} -> Body0
                 end,
-            ?SLOG(debug, #{
+            FailureDetails = try_get_jwt_failure_details(ODBCPool, HTTPPool, Body),
+            ?SLOG(warning, FailureDetails#{
+                pool => HTTPPool,
+                request_id => RequestId,
                 msg => "snowflake_check_snowpipe_user_permission_error",
                 body => Body
             }),
@@ -1068,6 +1091,85 @@ check_snowpipe_user_permission(HTTPPool, ActionState) ->
             throw(Msg)
     end.
 
+try_get_jwt_failure_details(ODBCPool, ActionResId, RespBody) ->
+    maybe
+        #{<<"message">> := Msg} ?= RespBody,
+        {ok, RequestId} ?= get_jwt_error_request_id(Msg),
+        {selected, [_ColHeader], [{Val}]} ?= get_login_failure_details(ODBCPool, RequestId),
+        true ?= is_list(Val) orelse {error, {not_string, Val}},
+        {ok, Data} ?= emqx_utils_json:safe_decode(Val, [return_maps]),
+        #{failure_details => Data}
+    else
+        Err ->
+            ?SLOG(debug, #{
+                msg => "snowflake_action_get_jwt_failure_details_err",
+                action_res_id => ActionResId,
+                reason => Err
+            }),
+            %% When the role doesn't have MONITOR on the account, the command returns:
+            %% SQL compilation error:\nUnknown function SYSTEM$GET_LOGIN_FAILURE_DETAILS
+            %% SQLSTATE IS: 42601
+            Hint = <<
+                "To get more details about the login failure, log into your",
+                " Snowflake account with an admin role that has the MONITOR privilege",
+                " on the account, and check the output of",
+                " SYSTEM$GET_LOGIN_FAILURE_DETAILS on the logged request id."
+            >>,
+            #{failure_details => undefined, hint => Hint}
+    end.
+
+%% Even if we provide a request id for the HTTP call, Snowflake decides to use its own
+%% request id when returning JWT errors...
+get_jwt_error_request_id(Msg) when is_binary(Msg) ->
+    %% ece3379e-6715-4d48-adeb-d5507d05e3e2
+    HexChar = <<"[0-9a-fA-F]">>,
+    UUIDRE = iolist_to_binary([
+        HexChar,
+        <<"{8}-">>,
+        HexChar,
+        <<"{4}-">>,
+        HexChar,
+        <<"{4}-">>,
+        HexChar,
+        <<"{4}-">>,
+        HexChar,
+        <<"{12}">>
+    ]),
+    RE = <<"\\[(", UUIDRE/binary, ")\\]">>,
+    case re:run(Msg, RE, [{capture, all_but_first, binary}]) of
+        {match, [UUID]} ->
+            {ok, UUID};
+        _ ->
+            {error, <<"couldn't obtain jwt request id from error message">>}
+    end;
+get_jwt_error_request_id(_) ->
+    {error, <<"couldn't obtain jwt request id from error message">>}.
+
+get_login_failure_details(ODBCPool, RequestId) ->
+    try
+        ecpool:pick_and_do(
+            ODBCPool,
+            fun(ConnPid) ->
+                ?MODULE:do_get_login_failure_details(ConnPid, RequestId)
+            end,
+            %% Must be executed by the ecpool worker, which owns the ODBC connection.
+            handover
+        )
+    catch
+        K:E:Stacktrace ->
+            {error, #{kind => K, reason => E, stacktrace => Stacktrace}}
+    end.
+
+do_get_login_failure_details(ConnPid, RequestId) ->
+    SQL0 = iolist_to_binary([
+        <<"select SYSTEM$GET_LOGIN_FAILURE_DETAILS('">>,
+        RequestId,
+        <<"')">>
+    ]),
+    SQL = binary_to_list(SQL0),
+    Timeout = 5_000,
+    odbc:sql_query(ConnPid, SQL, Timeout).
+
 %%------------------------------------------------------------------------------
 %% Tests
 %%------------------------------------------------------------------------------
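For reference, a minimal sketch of the request-id extraction that `get_jwt_error_request_id/1` above performs on the Snowflake error message (the message below is an example):

```erlang
%% Sketch of the UUID extraction; the error message is an example.
Msg = <<"JWT token is invalid. [eaa17004-5830-4b84-b357-2a981d28606f]">>,
RE = <<"\\[([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\\]">>,
{match, [RequestId]} = re:run(Msg, RE, [{capture, all_but_first, binary}]),
%% RequestId =:= <<"eaa17004-5830-4b84-b357-2a981d28606f">>
```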

+ 44 - 15
apps/emqx_bridge_snowflake/test/emqx_bridge_snowflake_SUITE.erl

@@ -31,7 +31,9 @@
 -define(STAGE, <<"teststage0">>).
 -define(TABLE, <<"test0">>).
 -define(WAREHOUSE, <<"testwarehouse">>).
+-define(PIPE, <<"testpipe0">>).
 -define(PIPE_USER, <<"snowpipeuser">>).
+-define(PIPE_USER_RO, <<"snowpipe_ro_user">>).
 
 -define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])).
 -define(CONF_COLUMN_ORDER(T), [
@@ -146,7 +148,7 @@ end_per_testcase(_Testcase, Config) ->
 timetrap(Config) ->
     case ?config(mock, Config) of
         true ->
-            {seconds, 10};
+            {seconds, 20};
         false ->
             {seconds, 150}
     end.
@@ -294,7 +296,7 @@ aggregated_action_config(Overrides0) ->
                     <<"private_key">> => private_key(),
                     <<"database">> => ?DATABASE,
                     <<"schema">> => ?SCHEMA,
-                    <<"pipe">> => <<"testpipe0">>,
+                    <<"pipe">> => ?PIPE,
                     <<"stage">> => ?STAGE,
                     <<"pipe_user">> => ?PIPE_USER,
                     <<"connect_timeout">> => <<"5s">>,
@@ -1049,28 +1051,42 @@ t_aggreg_invalid_column_values(Config0) ->
     ),
     ok.
 
-t_aggreg_inexistent_database(init, Config) when is_list(Config) ->
-    t_aggreg_inexistent_database(init, maps:from_list(Config));
-t_aggreg_inexistent_database(init, #{mock := true} = Config) ->
+%% Checks that we enqueue aggregated buffer errors if the delivery fails, and
+%% that this is reflected in the action status.
+t_aggreg_failed_delivery(init, Config) when is_list(Config) ->
+    t_aggreg_failed_delivery(init, maps:from_list(Config));
+t_aggreg_failed_delivery(init, #{mock := true} = Config) ->
     Mod = ?CONN_MOD,
-    meck:expect(Mod, do_stage_file, fun(
-        _ConnPid, _Filename, _Database, _Schema, _Stage, _ActionName
+    meck:expect(Mod, do_insert_files_request, fun(
+        _HTTPPool, _Req, _RequestTTL, _MaxRetries
     ) ->
-        Msg =
-            "SQL compilation error:, Database 'INEXISTENT' does not"
-            " exist or not authorized. SQLSTATE IS: 02000",
-        {error, Msg}
+        Headers = [
+            {<<"content-type">>, <<"application/json">>},
+            {<<"date">>, <<"Wed, 09 Oct 2024 13:12:55 GMT">>},
+            {<<"strict-transport-security">>, <<"max-age=31536000">>},
+            {<<"x-content-type-options">>, <<"nosniff">>},
+            {<<"x-frame-options">>, <<"deny">>},
+            {<<"content-length">>, <<"175">>},
+            {<<"connection">>, <<"keep-alive">>}
+        ],
+        Body = <<
+            "{\n  \"data\" : null,\n  \"code\" : \"390403\",\n  "
+            "\"message\" : \"Not authorized to manage the specified object. "
+            "Pipe access permission denied\",\n  \"success\" : false,\n  "
+            "\"headers\" : null\n}"
+        >>,
+        {ok, 403, Headers, Body}
     end),
     maps:to_list(Config);
-t_aggreg_inexistent_database(init, #{} = Config) ->
+t_aggreg_failed_delivery(init, #{} = Config) ->
     maps:to_list(Config).
-t_aggreg_inexistent_database(Config) ->
+t_aggreg_failed_delivery(Config) ->
     ?check_trace(
         emqx_bridge_v2_testlib:snk_timetrap(),
         begin
             {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(
                 Config,
-                #{<<"parameters">> => #{<<"database">> => <<"inexistent">>}}
+                #{<<"parameters">> => #{<<"pipe_user">> => ?PIPE_USER_RO}}
             ),
             ActionResId = emqx_bridge_v2_testlib:bridge_id(Config),
             %% BeginMark = get_begin_mark(Config, ActionResId),
@@ -1136,6 +1152,20 @@ t_wrong_snowpipe_user(init, #{mock := true} = Config) ->
         Body = emqx_utils_json:encode(InsertReportResponse),
         {ok, 401, Headers, Body}
     end),
+    meck:expect(Mod, do_get_login_failure_details, fun(_Connpid, _RequestId) ->
+        Details = #{
+            <<"clientIP">> => <<"127.0.0.1">>,
+            <<"clientType">> => <<"OTHER">>,
+            <<"clientVersion">> => <<"">>,
+            <<"errorCode">> => <<"JWT_TOKEN_INVALID_ISSUE_TIME">>,
+            <<"timestamp">> => 1728418411,
+            <<"username">> => null
+        },
+        Col = binary_to_list(emqx_utils_json:encode(Details)),
+        {selected, ["SYSTEM$GET_LOGIN_FAILURE_DETAILS('92D86B2E-D652-4D2D-9780-A6ED28B38356')"], [
+            {Col}
+        ]}
+    end),
     maps:to_list(Config);
 t_wrong_snowpipe_user(init, #{} = Config) ->
     maps:to_list(Config).
@@ -1171,4 +1201,3 @@ t_wrong_snowpipe_user(Config) ->
 %%       + Not supported when using pipes: `ABORT_STATEMENT'
 %%    - Missing data for a required column
 %% * Transient failure when staging file
-%% * Transient failure when calling `insertFiles'

+ 4 - 2
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl

@@ -171,7 +171,8 @@ fields("config") ->
                     default => #{},
                     desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
                 }
-            )}
+            )},
+        emqx_bridge_v2_schema:undefined_as_null_field()
     ] ++ driver_fields() ++
         (emqx_bridge_sqlserver_connector:fields(config) --
             emqx_connector_schema_lib:prepare_statement_fields());
@@ -194,7 +195,8 @@ fields(action_parameters) ->
             mk(
                 emqx_schema:template(),
                 #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
-            )}
+            )},
+        emqx_bridge_v2_schema:undefined_as_null_field()
     ];
 fields("creation_opts") ->
     emqx_resource_schema:fields("creation_opts");

+ 29 - 15
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -44,8 +44,11 @@
     on_format_query_result/1
 ]).
 
-%% callbacks for ecpool
--export([connect/1]).
+%% `ecpool_worker' API
+-export([
+    connect/1,
+    disconnect/1
+]).
 
 %% Internal exports used to execute code with ecpool worker
 -export([do_get_status/1, worker_do_insert/3]).
@@ -213,7 +216,8 @@ on_start(
         {password, maps:get(password, Config, emqx_secret:wrap(""))},
         {driver, Driver},
         {database, Database},
-        {pool_size, PoolSize}
+        {pool_size, PoolSize},
+        {on_disconnect, {?MODULE, disconnect, []}}
     ],
 
     State = #{
@@ -248,9 +252,9 @@ on_add_channel(
     {ok, NewState}.
 
 create_channel_state(
-    #{parameters := Conf} = _ChannelConfig
+    #{parameters := ChannelConf}
 ) ->
-    State = #{sql_templates => parse_sql_template(Conf)},
+    State = #{sql_templates => parse_sql_template(ChannelConf), channel_conf => ChannelConf},
     {ok, State}.
 
 on_remove_channel(
@@ -350,6 +354,10 @@ connect(Options) ->
     Opts = proplists:get_value(options, Options, []),
     odbc:connect(ConnectStr, Opts).
 
+-spec disconnect(connection_reference()) -> ok | {error, term()}.
+disconnect(ConnectionPid) ->
+    odbc:disconnect(ConnectionPid).
+
 -spec do_get_status(connection_reference()) -> Result :: boolean().
 do_get_status(Conn) ->
     case execute(Conn, <<"SELECT 1">>) of
@@ -414,10 +422,10 @@ do_query(
 
     ChannelId = get_channel_id(Query),
     QueryTuple = get_query_tuple(Query),
-    #{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels),
-
+    #{sql_templates := Templates} = ChannelState = maps:get(ChannelId, Channels),
+    ChannelConf = maps:get(channel_conf, ChannelState, #{}),
     %% only insert sql statement for single query and batch query
-    case apply_template(QueryTuple, Templates) of
+    case apply_template(QueryTuple, Templates, ChannelConf) of
         {?ACTION_SEND_MESSAGE, SQL} ->
             emqx_trace:rendered_action_template(ChannelId, #{
                 sql => SQL
@@ -560,36 +568,42 @@ parse_sql_template([], BatchInsertTks) ->
 
 %% single insert
 apply_template(
-    {?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates
+    {?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates, ChannelConf
 ) ->
     %% TODO: fix emqx_placeholder:proc_tmpl/2
     %% it won't add single quotes for string
-    apply_template([Query], Templates);
+    apply_template([Query], Templates, ChannelConf);
 %% batch inserts
 apply_template(
     [{?ACTION_SEND_MESSAGE = Key, _Msg} | _T] = BatchReqs,
-    #{?BATCH_INSERT_TEMP := BatchInsertsTks} = _Templates
+    #{?BATCH_INSERT_TEMP := BatchInsertsTks} = _Templates,
+    ChannelConf
 ) ->
     case maps:get(Key, BatchInsertsTks, undefined) of
         undefined ->
             BatchReqs;
         #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
-            SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
+            SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks, ChannelConf),
             {Key, SQL}
     end;
-apply_template(Query, Templates) ->
+apply_template(Query, Templates, _) ->
     %% TODO: more detail information
     ?SLOG(error, #{msg => "apply_sql_template_failed", query => Query, templates => Templates}),
     {error, failed_to_apply_sql_template}.
 
-proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
+proc_batch_sql(BatchReqs, BatchInserts, Tokens, ChannelConf) ->
     Values = erlang:iolist_to_binary(
         lists:join($,, [
-            emqx_placeholder:proc_sql_param_str(Tokens, Msg)
+            proc_msg(Tokens, Msg, ChannelConf)
          || {_, Msg} <- BatchReqs
         ])
     ),
     <<BatchInserts/binary, " values ", Values/binary>>.
 
+proc_msg(Tokens, Msg, #{undefined_vars_as_null := true}) ->
+    emqx_placeholder:proc_sql_param_str2(Tokens, Msg);
+proc_msg(Tokens, Msg, _) ->
+    emqx_placeholder:proc_sql_param_str(Tokens, Msg).
+
 to_bin(List) when is_list(List) ->
     unicode:characters_to_binary(List, utf8).

+ 27 - 0
apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl

@@ -189,6 +189,33 @@ t_setup_via_config_and_publish(Config) ->
     ),
     ok.
 
+t_undefined_vars_as_null(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config, #{<<"undefined_vars_as_null">> => true})
+    ),
+    SentData = maps:put(payload, undefined, sent_data("tmp")),
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := sqlserver_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                [{null}],
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(sqlserver_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            ok
+        end
+    ),
+    ok.
+
 t_setup_via_http_api_and_publish(Config) ->
     BridgeType = ?config(sqlserver_bridge_type, Config),
     Name = ?config(sqlserver_name, Config),

+ 1 - 1
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_tdengine, [
     {description, "EMQX Enterprise TDEngine Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {applications, [
         kernel,

+ 3 - 1
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl

@@ -90,6 +90,7 @@ fields("config") ->
                     format => <<"sql">>
                 }
             )},
+        emqx_bridge_v2_schema:undefined_as_null_field(),
         {local_topic, mk(binary(), #{desc => ?DESC("local_topic"), default => undefined})}
     ] ++
         emqx_resource_schema:fields("resource_opts") ++
@@ -131,7 +132,8 @@ fields(action_parameters) ->
                     default => ?DEFAULT_SQL,
                     format => <<"sql">>
                 }
-            )}
+            )},
+        emqx_bridge_v2_schema:undefined_as_null_field()
     ];
 fields("post_bridge_v2") ->
     emqx_bridge_schema:type_and_name_fields(enum([tdengine])) ++ fields(action_config);

+ 18 - 11
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -35,7 +35,7 @@
 
 -export([connector_examples/1]).
 
--export([connect/1, do_get_status/1, execute/3, do_batch_insert/5]).
+-export([connect/1, do_get_status/1, execute/3, do_batch_insert/6]).
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
@@ -188,8 +188,8 @@ on_stop(InstanceId, _State) ->
 
 on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) ->
     case maps:find(ChannelId, Channels) of
-        {ok, #{insert := Tokens, opts := Opts}} ->
-            Query = emqx_placeholder:proc_tmpl(Tokens, Data),
+        {ok, #{insert := Tokens, opts := Opts} = ChannelState} ->
+            Query = proc_nullable_tmpl(Tokens, Data, maps:get(channel_conf, ChannelState, #{})),
             emqx_trace:rendered_action_template(ChannelId, #{query => Query}),
             do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State);
         _ ->
@@ -203,11 +203,12 @@ on_batch_query(
     #{channels := Channels} = State
 ) ->
     case maps:find(ChannelId, Channels) of
-        {ok, #{batch := Tokens, opts := Opts}} ->
+        {ok, #{batch := Tokens, opts := Opts} = ChannelState} ->
             TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
+            ChannelConf = maps:get(channel_conf, ChannelState, #{}),
             do_query_job(
                 InstanceId,
-                {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX]},
+                {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX, ChannelConf]},
                 State
             );
         _ ->
@@ -273,7 +274,7 @@ on_add_channel(
     #{channels := Channels} = OldState,
     ChannelId,
     #{
-        parameters := #{database := Database, sql := SQL}
+        parameters := #{database := Database, sql := SQL} = ChannelConf
     }
 ) ->
     case maps:is_key(ChannelId, Channels) of
@@ -283,7 +284,8 @@ on_add_channel(
             case parse_prepare_sql(SQL) of
                 {ok, Result} ->
                     Opts = [{db_name, Database}],
-                    Channels2 = Channels#{ChannelId => Result#{opts => Opts}},
+                    Channel = Result#{opts => Opts, channel_conf => ChannelConf},
+                    Channels2 = Channels#{ChannelId => Channel},
                     {ok, OldState#{channels := Channels2}};
                 Error ->
                     Error
@@ -349,8 +351,8 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
 execute(Conn, Query, Opts) ->
     tdengine:insert(Conn, Query, Opts).
 
-do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) ->
-    SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
+do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX, ChannelConf) ->
+    SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>, ChannelConf),
     try
         emqx_trace:rendered_action_template_with_ctx(
             TraceRenderedCTX,
@@ -362,16 +364,21 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) ->
             {error, Reason}
     end.
 
-aggregate_query(BatchTks, BatchReqs, Acc) ->
+aggregate_query(BatchTks, BatchReqs, Acc, ChannelConf) ->
     lists:foldl(
         fun({_, Data}, InAcc) ->
-            InsertPart = emqx_placeholder:proc_tmpl(BatchTks, Data),
+            InsertPart = proc_nullable_tmpl(BatchTks, Data, ChannelConf),
             <<InAcc/binary, " ", InsertPart/binary>>
         end,
         Acc,
         BatchReqs
     ).
 
+proc_nullable_tmpl(Template, Data, #{undefined_vars_as_null := true}) ->
+    emqx_placeholder:proc_nullable_tmpl(Template, Data);
+proc_nullable_tmpl(Template, Data, _) ->
+    emqx_placeholder:proc_tmpl(Template, Data).
+
 connect(Opts) ->
     %% TODO: teach `tdengine` to accept 0-arity closures as passwords.
     {value, {password, Secret}, OptsRest} = lists:keytake(password, 1, Opts),

+ 56 - 14
apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl

@@ -27,7 +27,6 @@
 -define(SQL_DROP_TABLE, "DROP TABLE t_mqtt_msg").
 -define(SQL_DROP_STABLE, "DROP STABLE s_tab").
 -define(SQL_DELETE, "DELETE FROM t_mqtt_msg").
--define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg").
 
 -define(AUTO_CREATE_BRIDGE,
     "insert into ${clientid} USING s_tab TAGS ('${clientid}') values (${timestamp}, '${payload}')"
@@ -77,23 +76,23 @@ groups() ->
         {without_batch, TCs -- MustBatchCases}
     ].
 
+-define(APPS, [
+    emqx,
+    emqx_conf,
+    emqx_bridge_tdengine,
+    emqx_connector,
+    emqx_bridge,
+    emqx_rule_engine,
+    emqx_management
+]).
+
 init_per_suite(Config) ->
     emqx_bridge_v2_testlib:init_per_suite(
-        Config,
-        [
-            emqx,
-            emqx_conf,
-            emqx_bridge_tdengine,
-            emqx_connector,
-            emqx_bridge,
-            emqx_rule_engine,
-            emqx_management,
-            emqx_mgmt_api_test_util:emqx_dashboard()
-        ]
+        Config, ?APPS ++ [emqx_mgmt_api_test_util:emqx_dashboard()]
     ).
 
 end_per_suite(Config) ->
-    emqx_bridge_v2_testlib:end_per_suite(Config).
+    emqx_bridge_v2_testlib:end_per_suite([{apps, ?APPS} | Config]).
 
 init_per_group(async, Config) ->
     [{query_mode, async} | Config];
@@ -304,8 +303,11 @@ connect_and_clear_table(Config) ->
     ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)).
 
 connect_and_get_payload(Config) ->
+    connect_and_get_column(Config, "SELECT payload FROM t_mqtt_msg").
+
+connect_and_get_column(Config, Select) ->
     ?WITH_CON(
-        {ok, #{<<"code">> := 0, <<"data">> := Result}} = directly_query(Con, ?SQL_SELECT)
+        {ok, #{<<"code">> := 0, <<"data">> := Result}} = directly_query(Con, Select)
     ),
     Result.
 
@@ -376,6 +378,41 @@ t_simple_insert(Config) ->
         connect_and_get_payload(Config)
     ).
 
+t_simple_insert_undefined(Config) ->
+    connect_and_clear_table(Config),
+
+    MakeMessageFun = fun() ->
+        #{payload => undefined, timestamp => 1668602148000, second_ts => 1668602148010}
+    end,
+
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, tdengine_connector_query_return
+    ),
+
+    ?assertMatch(
+        %% the old behavior without undefined_vars_as_null
+        [[<<"undefined">>], [<<"undefined">>]],
+        connect_and_get_payload(Config)
+    ).
+
+t_undefined_vars_as_null(Config0) ->
+    Config = patch_bridge_config(Config0, #{
+        <<"parameters">> => #{<<"undefined_vars_as_null">> => true}
+    }),
+    connect_and_clear_table(Config),
+
+    MakeMessageFun = fun() ->
+        #{payload => undefined, timestamp => 1668602148000, second_ts => 1668602148010}
+    end,
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, tdengine_connector_query_return
+    ),
+
+    ?assertMatch(
+        [[<<"null">>], [<<"null">>]],
+        connect_and_get_payload(Config)
+    ).
+
 t_batch_insert(Config) ->
     connect_and_clear_table(Config),
     ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -503,3 +540,8 @@ t_auto_create_batch_insert(Config) ->
         end,
         [ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2]
     ).
+
+patch_bridge_config(Config, Overrides) ->
+    BridgeConfig0 = ?config(bridge_config, Config),
+    BridgeConfig1 = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+    [{bridge_config, BridgeConfig1} | proplists:delete(bridge_config, Config)].

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link.app.src

@@ -2,7 +2,7 @@
 {application, emqx_cluster_link, [
     {description, "EMQX Cluster Linking"},
     % strict semver, bump manually!
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {modules, []},
     {registered, []},
     {applications, [

+ 6 - 1
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl

@@ -428,7 +428,12 @@ enqueue_status_error({upload_failed, Error}, St = #st{errors = QErrors}) ->
     %% TODO
     %% This code feels too specific, errors probably need classification.
     St#st{errors = queue:in(Error, QErrors)};
-enqueue_status_error(_AnotherError, St) ->
+enqueue_status_error(_AnotherError, St = #st{name = Name}) ->
+    ?SLOG(debug, #{
+        msg => "aggregated_buffer_error_not_enqueued",
+        error => _AnotherError,
+        action => Name
+    }),
     St.
 
 handle_take_error(St = #st{errors = QErrors0}) ->

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -2,7 +2,7 @@
 {application, emqx_dashboard, [
     {description, "EMQX Web Dashboard"},
     % strict semver, bump manually!
-    {vsn, "5.1.5"},
+    {vsn, "5.1.6"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {applications, [

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.erl

@@ -274,7 +274,7 @@ listener_name(Protocol) ->
 
 audit_log_fun() ->
     case emqx_release:edition() of
-        ee -> fun emqx_dashboard_audit:log/2;
+        ee -> emqx_dashboard_audit:log_fun();
         ce -> undefined
     end.
 

+ 1 - 0
apps/emqx_dashboard/src/emqx_dashboard_api.erl

@@ -219,6 +219,7 @@ field(backend) ->
 login(post, #{body := Params}) ->
     Username = maps:get(<<"username">>, Params),
     Password = maps:get(<<"password">>, Params),
+    minirest_handler:update_log_meta(#{log_from => dashboard, log_source => Username}),
     case emqx_dashboard_admin:sign_token(Username, Password) of
         {ok, Role, Token} ->
             ?SLOG(info, #{msg => "dashboard_login_successful", username => Username}),

+ 67 - 32
apps/emqx_dashboard/src/emqx_dashboard_audit.erl

@@ -19,30 +19,47 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/http_api.hrl").
 %% API
--export([log/2]).
-
-%% filter high frequency events
--define(HIGH_FREQUENCY_REQUESTS, [
-    <<"/publish">>,
-    <<"/clients/:clientid/subscribe">>,
-    <<"/clients/:clientid/unsubscribe">>,
-    <<"/publish/bulk">>,
-    <<"/clients/:clientid/unsubscribe/bulk">>,
-    <<"/clients/:clientid/subscribe/bulk">>,
-    <<"/clients/kickout/bulk">>
-]).
-
-log(#{code := Code, method := Method} = Meta, Req) ->
+-export([log/2, log_fun/0, importance/1]).
+
+%% In previous versions, this module used the request method to decide
+%% whether a request should be logged, but there are exceptions:
+%% 1. the OIDC callback uses the `GET` method, yet it is important
+%% 2. some high-frequency endpoints use the `POST` method,
+%%    yet most of the time we do not want to log them
+%% So an auxiliary `importance` metadata field was introduced.
+%%
+%% The strategy is:
+%% 1. Use `high` to mark important `GET` endpoints
+%% 2. Use `low` to mark high-frequency endpoints
+%% 3. `medium` is the default importance and is set automatically
+
+-define(AUDIT_IMPORTANCE_HIGH, 100).
+-define(AUDIT_IMPORTANCE_MEDIUM, 60).
+-define(AUDIT_IMPORTANCE_LOW, 30).
+
+-define(CODE_METHOD_NOT_ALLOWED, 405).
+
+log_fun() ->
+    {emqx_dashboard_audit, log, #{importance => medium}}.
+
+importance(Level) when
+    Level =:= high;
+    Level =:= medium;
+    Level =:= low
+->
+    #{importance => Level}.
+
+log(#{code := Code, method := Method, importance := Importance} = Meta, Req) ->
     %% Keep level/2 and log_meta/1 inside of this ?AUDIT macro
-    ?AUDIT(level(Method, Code), log_meta(Meta, Req)).
-
-log_meta(Meta, Req) ->
-    #{operation_id := OperationId, method := Method} = Meta,
-    case
-        Method =:= get orelse
-            (lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso
-                ignore_high_frequency_request())
-    of
+    ImportanceNum = importance_to_num(Code, Importance),
+    ?AUDIT(level(ImportanceNum, Method, Code), log_meta(ImportanceNum, Meta, Req)).
+
+log_meta(Importance, #{method := get} = _Meta, _Req) when Importance =< ?AUDIT_IMPORTANCE_MEDIUM ->
+    undefined;
+log_meta(Importance, Meta, Req) ->
+    #{method := Method} = Meta,
+    case (Importance =< ?AUDIT_IMPORTANCE_LOW) andalso ignore_high_frequency_request() of
         true ->
             undefined;
         false ->
@@ -72,18 +89,20 @@ from(#{auth_type := jwt_token}) ->
     dashboard;
 from(#{auth_type := api_key}) ->
     rest_api;
-from(#{operation_id := <<"/login">>}) ->
-    dashboard;
+from(#{log_from := From}) ->
+    From;
 from(#{code := Code} = Meta) when Code =:= 401 orelse Code =:= 403 ->
     case maps:find(failure, Meta) of
         {ok, #{code := 'BAD_API_KEY_OR_SECRET'}} -> rest_api;
         {ok, #{code := 'UNAUTHORIZED_ROLE', message := ?API_KEY_NOT_ALLOW_MSG}} -> rest_api;
         %% 'TOKEN_TIME_OUT' 'BAD_TOKEN' is dashboard code.
         _ -> dashboard
-    end.
+    end;
+from(_) ->
+    unknown.
 
 source(#{source := Source}) -> Source;
-source(#{operation_id := <<"/login">>, body := #{<<"username">> := Username}}) -> Username;
+source(#{log_source := Source}) -> Source;
 source(_Meta) -> <<"">>.
 
 source_ip(Req) ->
@@ -106,15 +125,31 @@ operation_type(Meta) ->
 http_request(Meta) ->
     maps:with([method, headers, bindings, body], Meta).
 
+operation_result(302, _) -> success;
 operation_result(Code, _) when Code >= 300 -> failure;
 operation_result(_, #{failure := _}) -> failure;
 operation_result(_, _) -> success.
 
-level(get, _Code) -> debug;
-level(_, Code) when Code >= 200 andalso Code < 300 -> info;
-level(_, Code) when Code >= 300 andalso Code < 400 -> warning;
-level(_, Code) when Code >= 400 andalso Code < 500 -> error;
-level(_, _) -> critical.
+%%
+level(?AUDIT_IMPORTANCE_HIGH, _, _) -> warning;
+level(_, get, _Code) -> debug;
+level(_, _, Code) when Code >= 200 andalso Code < 300 -> info;
+level(_, _, Code) when Code >= 300 andalso Code < 400 -> warning;
+level(_, _, Code) when Code >= 400 andalso Code < 500 -> error;
+level(_, _, _) -> critical.
 
 ignore_high_frequency_request() ->
     emqx_conf:get([log, audit, ignore_high_frequency_request], true).
+
+%% Special case: an illegal request (e.g. a `GET` request to a `POST`-only
+%% endpoint) carries no schema metadata, so its `importance` stays at the
+%% default; we manually raise it to make sure such requests are recorded.
+importance_to_num(?CODE_METHOD_NOT_ALLOWED, _) ->
+    ?AUDIT_IMPORTANCE_HIGH;
+importance_to_num(_, high) ->
+    ?AUDIT_IMPORTANCE_HIGH;
+importance_to_num(_, medium) ->
+    ?AUDIT_IMPORTANCE_MEDIUM;
+importance_to_num(_, low) ->
+    ?AUDIT_IMPORTANCE_LOW.
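Endpoints opt into a non-default importance via their schema metadata, as the `emqx_mgmt_api_*` diffs below do; a minimal sketch of the resolution:

```erlang
%% Sketch: the schema attaches the metadata, the audit logger maps it to a
%% numeric importance; a 405 response is always escalated to high importance.
LogMeta = emqx_dashboard_audit:importance(low),
%% LogMeta =:= #{importance => low}
```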

+ 1 - 1
apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src

@@ -1,6 +1,6 @@
 {application, emqx_dashboard_sso, [
     {description, "EMQX Dashboard Single Sign-On"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, [emqx_dashboard_sso_sup]},
     {applications, [
         kernel,

+ 2 - 0
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl

@@ -157,6 +157,7 @@ running(get, _Request) ->
     {200, emqx_dashboard_sso_manager:running()}.
 
 login(post, #{bindings := #{backend := Backend}, body := Body} = Request) ->
+    minirest_handler:update_log_meta(#{log_from => dashboard, log_source => Backend}),
     case emqx_dashboard_sso_manager:lookup_state(Backend) of
         undefined ->
             {404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}};
@@ -168,6 +169,7 @@ login(post, #{bindings := #{backend := Backend}, body := Body} = Request) ->
                         request => emqx_utils:redact(Request)
                     }),
                     Username = maps:get(<<"username">>, Body),
+                    minirest_handler:update_log_meta(#{log_source => Username}),
                     {200, login_meta(Username, Role, Token, Backend)};
                 {redirect, Redirect} ->
                     ?SLOG(info, #{

+ 4 - 1
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl

@@ -67,7 +67,8 @@ schema("/sso/oidc/callback") ->
                 401 => response_schema(401),
                 404 => response_schema(404)
             },
-            security => []
+            security => [],
+            log_meta => emqx_dashboard_audit:importance(high)
         }
     }.
 
@@ -75,6 +76,7 @@ schema("/sso/oidc/callback") ->
 %% API
 %%--------------------------------------------------------------------
 code_callback(get, #{query_string := QS}) ->
+    minirest_handler:update_log_meta(#{log_from => oidc}),
     case ensure_sso_state(QS) of
         {ok, Target} ->
             ?SLOG(info, #{
@@ -185,6 +187,7 @@ retrieve_userinfo(
                 user_info => UserInfo
             }),
             Username = emqx_placeholder:proc_tmpl(NameTks, UserInfo),
+            minirest_handler:update_log_meta(#{log_source => Username}),
             ensure_user_exists(Cfg, Username);
         {error, _Reason} = Error ->
             Error

+ 1 - 1
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml.erl

@@ -240,7 +240,7 @@ gen_redirect_response(DashboardAddr, Username) ->
     case ensure_user_exists(Username) of
         {ok, Role, Token} ->
             Target = login_redirect_target(DashboardAddr, Username, Role, Token),
-            {redirect, {302, ?RESPHEADERS#{<<"location">> => Target}, ?REDIRECT_BODY}};
+            {redirect, Username, {302, ?RESPHEADERS#{<<"location">> => Target}, ?REDIRECT_BODY}};
         {error, Reason} ->
             {error, Reason}
     end.

+ 3 - 1
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_saml_api.erl

@@ -98,14 +98,16 @@ sp_saml_metadata(get, _Req) ->
     end.
 
 sp_saml_callback(post, Req) ->
+    minirest_handler:update_log_meta(#{log_from => saml}),
     case emqx_dashboard_sso_manager:lookup_state(saml) of
         State = #{enable := true} ->
             case (provider(saml)):callback(Req, State) of
-                {redirect, Redirect} ->
+                {redirect, Username, Redirect} ->
                     ?SLOG(info, #{
                         msg => "dashboard_saml_sso_login_successful",
                         redirect => "SAML login successful. Redirecting with LoginMeta."
                     }),
+                    minirest_handler:update_log_meta(#{log_source => Username}),
                     Redirect;
                 {error, Reason} ->
                     ?SLOG(info, #{

+ 1 - 1
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.app.src

@@ -2,7 +2,7 @@
 {application, emqx_ds_builtin_local, [
     {description, "A DS backend that stores all data locally and thus doesn't support clustering."},
     % strict semver, bump manually!
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, gproc, mria, rocksdb, emqx_durable_storage, emqx_utils]},

+ 1 - 1
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft.app.src

@@ -2,7 +2,7 @@
 {application, emqx_ds_builtin_raft, [
     {description, "Raft replication layer for the durable storage"},
     % strict semver, bump manually!
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, gproc, mria, ra, emqx_durable_storage]},

+ 1 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ds_shared_sub, [
     {description, "EMQX DS Shared Subscriptions"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, [emqx_ds_shared_sub_sup]},
     {mod, {emqx_ds_shared_sub_app, []}},
     {applications, [

+ 31 - 9
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -16,8 +16,10 @@
 ).
 
 -define(DESC_NOT_FOUND, <<"Queue not found">>).
--define(RESP_NOT_FOUND,
-    {404, #{code => <<"NOT_FOUND">>, message => ?DESC_NOT_FOUND}}
+-define(DESC_DISABLED, <<"Feature is disabled">>).
+-define(RESP_NOT_FOUND, ?RESP_NOT_FOUND(?DESC_NOT_FOUND)).
+-define(RESP_NOT_FOUND(MSG),
+    {404, #{code => <<"NOT_FOUND">>, message => MSG}}
 ).
 
 -define(DESC_CREATE_CONFICT, <<"Queue with given group name and topic filter already exists">>).
@@ -55,13 +57,21 @@
     '/durable_queues/:id'/2
 ]).
 
+%% Internal exports
+-export([
+    check_enabled/2
+]).
+
 -import(hoconsc, [mk/2, ref/1, ref/2]).
 -import(emqx_dashboard_swagger, [error_codes/2]).
 
 namespace() -> "durable_queues".
 
 api_spec() ->
-    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+    emqx_dashboard_swagger:spec(?MODULE, #{
+        check_schema => true,
+        filter => fun ?MODULE:check_enabled/2
+    }).
 
 paths() ->
     [
@@ -82,7 +92,8 @@ schema("/durable_queues") ->
             ],
             responses => #{
                 200 => resp_list_durable_queues(),
-                400 => error_codes(['BAD_REQUEST'], ?DESC_BAD_REQUEST)
+                400 => error_codes(['BAD_REQUEST'], ?DESC_BAD_REQUEST),
+                404 => error_codes(['NOT_FOUND'], ?DESC_DISABLED)
             }
         },
         post => #{
@@ -92,7 +103,8 @@ schema("/durable_queues") ->
             'requestBody' => durable_queue_post(),
             responses => #{
                 201 => resp_create_durable_queue(),
-                409 => error_codes(['CONFLICT'], ?DESC_CREATE_CONFICT)
+                409 => error_codes(['CONFLICT'], ?DESC_CREATE_CONFICT),
+                404 => error_codes(['NOT_FOUND'], ?DESC_DISABLED)
             }
         }
     };
@@ -125,6 +137,12 @@ schema("/durable_queues/:id") ->
         }
     }.
 
+check_enabled(Request, _ReqMeta) ->
+    case emqx_ds_shared_sub_config:enabled() of
+        true -> {ok, Request};
+        false -> ?RESP_NOT_FOUND(<<"Durable queues are disabled">>)
+    end.
+
 '/durable_queues'(get, #{query_string := QString}) ->
     Cursor = maps:get(<<"cursor">>, QString, undefined),
     Limit = maps:get(<<"limit">>, QString),
@@ -233,7 +251,7 @@ durable_queue_get() ->
     ref(durable_queue).
 
 durable_queue_post() ->
-    map().
+    ref(durable_queue_args).
 
 roots() -> [].
 
@@ -246,9 +264,13 @@ fields(durable_queue) ->
         {created_at,
             mk(emqx_utils_calendar:epoch_millisecond(), #{
                 desc => <<"Queue creation time">>
-            })},
-        {group, mk(binary(), #{})},
-        {topic, mk(binary(), #{})},
+            })}
+        | fields(durable_queue_args)
+    ];
+fields(durable_queue_args) ->
+    [
+        {group, mk(binary(), #{required => true})},
+        {topic, mk(binary(), #{required => true})},
         {start_time, mk(emqx_utils_calendar:epoch_millisecond(), #{})}
     ];
 fields(resp_list_durable_queues) ->

+ 19 - 3
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -55,11 +55,12 @@ end_per_suite(Config) ->
     ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
 
-init_per_testcase(_TC, Config) ->
+init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
-    Config.
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
 
-end_per_testcase(_TC, _Config) ->
+end_per_testcase(TC, Config) ->
+    _ = emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config),
     ok = snabbkaffe:stop(),
     ok = emqx_ds_shared_sub_registry:purge(),
     ok = destroy_queues(),
@@ -244,6 +245,21 @@ t_duplicate_queue(_Config) ->
         })
     ).
 
+t_404_when_disable('init', Config) ->
+    {ok, _} = emqx_conf:update([durable_queues], #{<<"enable">> => false}, #{}),
+    Config;
+t_404_when_disable('end', _Config) ->
+    {ok, _} = emqx_conf:update([durable_queues], #{<<"enable">> => true}, #{}).
+
+t_404_when_disable(_Config) ->
+    ?assertMatch(
+        {ok, 404, #{}},
+        api(post, ["durable_queues"], #{
+            <<"group">> => <<"disabled">>,
+            <<"topic">> => <<"#">>
+        })
+    ).
+
 %%--------------------------------------------------------------------
 
 destroy_queues() ->

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.4.0"},
+    {vsn, "0.4.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils, gen_rpc]},

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.3.5"},
+    {vsn, "0.3.6"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl, redbug]},

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -2,7 +2,7 @@
 {application, emqx_management, [
     {description, "EMQX Management API and CLI"},
     % strict semver, bump manually!
-    {vsn, "5.3.1"},
+    {vsn, "5.3.2"},
     {modules, []},
     {registered, [emqx_management_sup]},
     {applications, [

+ 10 - 5
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -198,7 +198,8 @@ schema("/clients/kickout/bulk") ->
             ),
             responses => #{
                 204 => <<"Kick out clients successfully">>
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     };
 schema("/clients/:clientid") ->
@@ -288,7 +289,8 @@ schema("/clients/:clientid/subscribe") ->
                 404 => emqx_dashboard_swagger:error_codes(
                     ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
                 )
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     };
 schema("/clients/:clientid/subscribe/bulk") ->
@@ -304,7 +306,8 @@ schema("/clients/:clientid/subscribe/bulk") ->
                 404 => emqx_dashboard_swagger:error_codes(
                     ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
                 )
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     };
 schema("/clients/:clientid/unsubscribe") ->
@@ -320,7 +323,8 @@ schema("/clients/:clientid/unsubscribe") ->
                 404 => emqx_dashboard_swagger:error_codes(
                     ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
                 )
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     };
 schema("/clients/:clientid/unsubscribe/bulk") ->
@@ -336,7 +340,8 @@ schema("/clients/:clientid/unsubscribe/bulk") ->
                 404 => emqx_dashboard_swagger:error_codes(
                     ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
                 )
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     };
 schema("/clients/:clientid/keepalive") ->

+ 4 - 2
apps/emqx_management/src/emqx_mgmt_api_publish.erl

@@ -62,7 +62,8 @@ schema("/publish") ->
                 ?PARTIALLY_OK => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)),
                 ?BAD_REQUEST => hoconsc:mk(hoconsc:ref(?MODULE, bad_request)),
                 ?DISPATCH_ERROR => hoconsc:mk(hoconsc:ref(?MODULE, publish_error))
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     };
 schema("/publish/bulk") ->
@@ -82,7 +83,8 @@ schema("/publish/bulk") ->
                 ?DISPATCH_ERROR => hoconsc:mk(
                     hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{}
                 )
-            }
+            },
+            log_meta => emqx_dashboard_audit:importance(low)
         }
     }.
 

+ 17 - 11
apps/emqx_mysql/src/emqx_mysql.erl

@@ -30,7 +30,7 @@
     on_start/2,
     on_stop/2,
     on_query/3,
-    on_batch_query/3,
+    on_batch_query/4,
     on_get_status/2,
     on_format_query_result/1
 ]).
@@ -197,18 +197,20 @@ on_query(
 on_batch_query(
     InstId,
     BatchReq = [{Key, _} | _],
-    #{query_templates := Templates} = State
+    #{query_templates := Templates} = State,
+    ChannelConfig
 ) ->
     case maps:get({Key, batch}, Templates, undefined) of
         undefined ->
             {error, {unrecoverable_error, batch_select_not_implemented}};
         Template ->
-            on_batch_insert(InstId, BatchReq, Template, State)
+            on_batch_insert(InstId, BatchReq, Template, State, ChannelConfig)
     end;
 on_batch_query(
     InstId,
     BatchReq,
-    State
+    State,
+    _
 ) ->
     ?SLOG(error, #{
         msg => "invalid request",
@@ -509,16 +511,20 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -
 proc_sql_params(_TypeOrKey, SQLOrData, Params, _State) ->
     {SQLOrData, Params}.
 
-on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
-    Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs],
+on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State, ChannelConfig) ->
+    Rows = [render_row(RowTemplate, Msg, ChannelConfig) || {_, Msg} <- BatchReqs],
     Query = [InsertPart, <<" values ">> | lists:join($,, Rows)],
     on_sql_query(InstId, query, Query, no_params, default_timeout, State).
 
-render_row(RowTemplate, Data) ->
-    % NOTE
-    % Ignoring errors here, missing variables are set to "'undefined'" due to backward
-    % compatibility requirements.
-    RenderOpts = #{escaping => mysql, undefined => <<"undefined">>},
+render_row(RowTemplate, Data, ChannelConfig) ->
+    RenderOpts =
+        case maps:get(undefined_vars_as_null, ChannelConfig, false) of
+            % NOTE:
+            %  Ignoring errors here, missing variables are set to "'undefined'" due to backward
+            %  compatibility requirements.
+            false -> #{escaping => mysql, undefined => <<"undefined">>};
+            true -> #{escaping => mysql}
+        end,
     {Row, _Errors} = emqx_template_sql:render(RowTemplate, {emqx_jsonish, Data}, RenderOpts),
     Row.
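
For orientation, here is a minimal sketch of how the two `RenderOpts` variants above behave; the row template, bindings, and the `emqx_template_sql:parse/1` call are assumptions made for illustration, not code from this changeset.

```erlang
%% Hypothetical row template with one variable (`val') missing from the data.
RowTemplate = emqx_template_sql:parse(<<"(${id}, ${val})">>),
Bindings = {emqx_jsonish, #{<<"id">> => 1}},

%% undefined_vars_as_null = false (legacy behaviour): missing variables are
%% rendered as the literal 'undefined'.
{RowLegacy, _Errors1} =
    emqx_template_sql:render(
        RowTemplate, Bindings, #{escaping => mysql, undefined => <<"undefined">>}
    ),

%% undefined_vars_as_null = true: no `undefined' override is passed, so the
%% library's default handling applies (expected to yield SQL NULL).
{RowNull, _Errors2} =
    emqx_template_sql:render(RowTemplate, Bindings, #{escaping => mysql}).
```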
 

+ 14 - 8
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -162,7 +162,7 @@ simple_sync_query(Id, Request, QueryOpts0) ->
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     ?tp(simple_sync_query, #{id => Id, request => Request}),
-    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
+    QueryOpts = maps:merge(simple_sync_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
     TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
@@ -176,7 +176,7 @@ simple_sync_query(Id, Request, QueryOpts0) ->
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 simple_async_query(Id, Request, QueryOpts0) ->
     ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
-    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
+    QueryOpts = maps:merge(simple_async_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
     TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
@@ -204,7 +204,7 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         QueryOpts1 = QueryOpts0#{
             reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
         },
-        QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
+        QueryOpts = #{timeout := Timeout} = maps:merge(simple_sync_query_opts(), QueryOpts1),
         case simple_async_query(Id, Request, QueryOpts) of
             {error, _} = Error ->
                 ?tp("resource_simple_sync_internal_buffer_query_error", #{
@@ -235,8 +235,16 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         _ = unalias(ReplyAlias)
     end.
 
-simple_query_opts() ->
-    ensure_expire_at(#{simple_query => true, timeout => infinity}).
+simple_sync_query_opts() ->
+    %% Default `resource_opts.resource_ttl' is 45 seconds, so we use the same default
+    %% value here to match that.  No need to set `expire_at', since there's no queuing
+    %% involved.
+    #{simple_query => true, timeout => 45_000}.
+
+simple_async_query_opts() ->
+    %% No need to define `expire_at' nor `timeout', since there's no queuing involved in
+    %% simple queries.
+    #{simple_query => true}.
 
 -spec block(pid()) -> ok.
 block(ServerRef) ->
@@ -2342,9 +2350,7 @@ buffer_worker(_Tid) ->
     self().
 
 is_simple_query(#{simple_query := Bool}) ->
-    Bool;
-is_simple_query(_) ->
-    false.
+    Bool.
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
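
Note that a caller-supplied timeout still wins over the new 45-second default, because the defaults are merged first. A trivial illustration of the `maps:merge/2` precedence involved (not code from this module):

```erlang
Defaults = #{simple_query => true, timeout => 45_000},
%% Caller-provided options are merged on top of the defaults, so an explicit
%% timeout overrides the 45-second fallback.
maps:merge(Defaults, #{timeout => 10_000}).
%% => #{simple_query => true, timeout => 10_000}
```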

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.27"},
+    {vsn, "5.0.28"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -2,7 +2,7 @@
 {application, emqx_rule_engine, [
     {description, "EMQX Rule Engine"},
     % strict semver, bump manually!
-    {vsn, "5.2.1"},
+    {vsn, "5.2.2"},
     {modules, []},
     {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
     {applications, [

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -902,9 +902,9 @@ join_to_sql_values_string(List) ->
         [
             case is_list(Item) of
                 true ->
-                    emqx_placeholder:quote_sql(emqx_utils_json:encode(Item));
+                    emqx_placeholder:quote_sql2(emqx_utils_json:encode(Item));
                 false ->
-                    emqx_placeholder:quote_sql(Item)
+                    emqx_placeholder:quote_sql2(Item)
             end
          || Item <- List
         ],

+ 31 - 2
apps/emqx_utils/src/emqx_placeholder.erl

@@ -20,6 +20,7 @@
 -export([
     preproc_tmpl/1,
     preproc_tmpl/2,
+    proc_nullable_tmpl/2,
     proc_tmpl/2,
     proc_tmpl/3,
     preproc_cmd/1,
@@ -29,22 +30,26 @@
     preproc_sql/2,
     proc_sql/2,
     proc_sql_param_str/2,
+    proc_sql_param_str2/2,
     proc_cql_param_str/2,
     proc_param_str/3,
     preproc_tmpl_deep/1,
     preproc_tmpl_deep/2,
     proc_tmpl_deep/2,
     proc_tmpl_deep/3,
-
     bin/1,
+    nullable_bin/1,
     sql_data/1,
     lookup_var/2
 ]).
 
 -export([
     quote_sql/1,
+    quote_sql2/1,
     quote_cql/1,
-    quote_mysql/1
+    quote_cql2/1,
+    quote_mysql/1,
+    quote_mysql2/1
 ]).
 
 -export_type([tmpl_token/0]).
@@ -111,6 +116,10 @@ preproc_tmpl(Str, Opts) ->
     Tokens = re:split(Str, RE, [{return, binary}, group, trim]),
     do_preproc_tmpl(Opts, Tokens, []).
 
+proc_nullable_tmpl(Tokens, Data) ->
+    Opts = #{return => full_binary, var_trans => fun nullable_bin/1},
+    proc_tmpl(Tokens, Data, Opts).
+
 -spec proc_tmpl(tmpl_token(), map()) -> binary().
 proc_tmpl(Tokens, Data) ->
     proc_tmpl(Tokens, Data, #{return => full_binary}).
@@ -182,6 +191,10 @@ proc_sql_param_str(Tokens, Data) ->
     % https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
     proc_param_str(Tokens, Data, fun quote_sql/1).
 
+-spec proc_sql_param_str2(tmpl_token(), map()) -> binary().
+proc_sql_param_str2(Tokens, Data) ->
+    proc_param_str(Tokens, Data, fun quote_sql2/1).
+
 -spec proc_cql_param_str(tmpl_token(), map()) -> binary().
 proc_cql_param_str(Tokens, Data) ->
     proc_param_str(Tokens, Data, fun quote_cql/1).
@@ -238,6 +251,10 @@ proc_tmpl_deep({tmpl, Tokens}, Data, Opts) ->
 proc_tmpl_deep({tuple, Elements}, Data, Opts) ->
     list_to_tuple([proc_tmpl_deep(El, Data, Opts) || El <- Elements]).
 
+nullable_bin(undefined) -> <<"null">>;
+nullable_bin(null) -> <<"null">>;
+nullable_bin(Var) -> bin(Var).
+
 -spec sql_data(term()) -> term().
 sql_data(undefined) -> null;
 sql_data(List) when is_list(List) -> List;
@@ -254,14 +271,26 @@ bin(Val) -> emqx_utils_conv:bin(Val).
 quote_sql(Str) ->
     emqx_utils_sql:to_sql_string(Str, #{escaping => sql, undefined => <<"undefined">>}).
 
+-spec quote_sql2(_Value) -> iolist().
+quote_sql2(Str) ->
+    emqx_utils_sql:to_sql_string(Str, #{escaping => sql}).
+
 -spec quote_cql(_Value) -> iolist().
 quote_cql(Str) ->
     emqx_utils_sql:to_sql_string(Str, #{escaping => cql, undefined => <<"undefined">>}).
 
+-spec quote_cql2(_Value) -> iolist().
+quote_cql2(Str) ->
+    emqx_utils_sql:to_sql_string(Str, #{escaping => cql}).
+
 -spec quote_mysql(_Value) -> iolist().
 quote_mysql(Str) ->
     emqx_utils_sql:to_sql_string(Str, #{escaping => mysql, undefined => <<"undefined">>}).
 
+-spec quote_mysql2(_Value) -> iolist().
+quote_mysql2(Str) ->
+    emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}).
+
 lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
     Value;
 lookup_var([Prop | Rest], Data0) ->
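
A minimal sketch of the new nullable rendering path follows; the template literal and empty bindings map are made up for illustration, and it assumes a variable absent from the data resolves to `undefined` before `nullable_bin/1` is applied.

```erlang
%% Hypothetical template with a variable that is missing from the bindings.
Tokens = emqx_placeholder:preproc_tmpl(<<"insert into t values (${payload.val})">>),
%% nullable_bin/1 maps the undefined value to <<"null">>, so the rendered
%% statement should read "insert into t values (null)".
emqx_placeholder:proc_nullable_tmpl(Tokens, #{}).
```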

+ 1 - 1
apps/emqx_utils/src/emqx_utils.app.src

@@ -2,7 +2,7 @@
 {application, emqx_utils, [
     {description, "Miscellaneous utilities for EMQX apps"},
     % strict semver, bump manually!
-    {vsn, "5.4.0"},
+    {vsn, "5.4.1"},
     {modules, [
         emqx_utils,
         emqx_utils_api,

+ 10 - 0
changes/ee/feat-13861.en.md

@@ -0,0 +1,10 @@
+A new configuration item, `undefined_vars_as_null`, has been added to several database actions to ensure that undefined variables are written as NULL.
+
+The following actions are affected by this configuration item:
+
+Actions:
+- MySQL
+- ClickHouse
+- SQLServer
+- TDengine
+- DynamoDB

changes/ce/fix-13876.md → changes/ee/fix-13881.md


+ 7 - 0
changes/ee/fix-13963.en.md

@@ -0,0 +1,7 @@
+Fixed the following bugs in the Audit Log feature:
+
+- The Audit Log feature could not be used together with the SSO feature.
+
+  Previously, the Audit Log feature could not handle SSO events, and each such event would throw an exception.
+
+- Illegal access attempts (for example, a `GET` request to a `POST`-only endpoint) were not logged.

+ 1 - 0
changes/ee/fix-13964.md

@@ -0,0 +1 @@
+Fixed an issue where data could be lost when batch writing was enabled in the DynamoDB Sink.

+ 2 - 0
changes/ee/fix-13965.en.md

@@ -0,0 +1,2 @@
+Fixed a function clause error for IoTDB in batch mode.
+

+ 3 - 0
changes/ee/fix-13971.en.md

@@ -0,0 +1,3 @@
+Fixed a Kafka producer bug introduced in EMQX Enterprise 5.8.0.
+
+The Kafka producer could crash if it failed to fetch metadata during the initialization stage.

+ 1 - 0
changes/ee/fix-13973.en.md

@@ -0,0 +1 @@
+Fixed an issue with the MS SQL Server integration where EMQX would print several errors and warnings in the log each time the connection to the server was stopped.

+ 100 - 0
changes/v5.8.1.en.md

@@ -0,0 +1,100 @@
+# 5.8.1
+
+*Release Date: 2024-10-14*
+
+Make sure to check the breaking changes and known issues before upgrading to EMQX 5.8.1.
+
+## Important Changes
+
+- [#13956](https://github.com/emqx/emqx/pull/13956) Updated the `gen_rpc` library to version 3.4.1, which includes a fix for a node crash issue.
+  Previously, if a node was forcefully shut down while RPC channels were being established, it could cause a cluster peer node to crash.
+
+## Enhancements
+
+### Core MQTT Functionalities
+
+- [#13525](https://github.com/emqx/emqx/pull/13525) Added new configuration item `shared_subscription_initial_sticky_pick` to specify the strategy for making the initial pick when `shared_subscription_strategy` is set to `sticky`.
+
+- [#13942](https://github.com/emqx/emqx/pull/13942) The HTTP client now automatically reconnects if no activity is detected for 10 seconds after the latest request has expired.
+  Previously, it would wait indefinitely for a server response, causing timeouts if the server dropped requests.
+
+  This change impacts the following components:
+
+  - HTTP authentication
+  - HTTP authorization
+  - Webhook (HTTP connector)
+
+### Authentication and Authorization
+
+- [#13863](https://github.com/emqx/emqx/pull/13863) EMQX now supports the `${cert_common_name}` placeholder in topic name templates for raw ACL rules.
+
+- [#13792](https://github.com/emqx/emqx/pull/13792) The banned-clients API `GET /banned` supports querying the rules using filters in the query string.
+
+  The available filters are:
+
+  - clientid
+  - username
+  - peerhost
+  - like_clientid
+  - like_username
+  - like_peerhost
+  - like_peerhost_net
+
+  When adding a new banned client entry, the default expiration time for entries created without the `until` parameter has been changed from 1 year to `infinity`.
+
+### Rule Engine
+
+- [#13773](https://github.com/emqx/emqx/pull/13773) Disabled rule actions no longer trigger `out_of_service` warnings.
+
+  Previously, if an action was disabled, a warning log with `msg: out_of_service` was emitted,
+  and the `actions.failed` counter was incremented for the rule.
+
+  After this enhancement, a disabled action results in a `debug`-level log with `msg: discarded`,
+  and the newly introduced `actions.discarded` counter is incremented.
+
+### MQTT over QUIC
+
+- [#13814](https://github.com/emqx/emqx/pull/13814) Connection Scope Keepalive for MQTT over QUIC Multi-Stream:
+
+  This update introduces a new feature to maintain MQTT connections over QUIC multi-streams, even when the control stream is idle but other data streams are active.
+
+  Previously, clients had to send `MQTT.PINGREQ` on idle control streams to keep the connection alive. Now, a shared state is maintained for each connection, monitoring activity across all streams. This shared state helps determine if the connection is still active, reducing the risk of keepalive timeouts caused by Head-of-Line (HOL) blocking and improving overall connection stability.
+
+## Bug Fixes
+
+### Core MQTT Functions
+
+- [#13702](https://github.com/emqx/emqx/pull/13702) Clean up the corresponding exclusive subscriptions when a node goes down.
+
+- [#13708](https://github.com/emqx/emqx/pull/13708) Fixed an issue which may cause shared subscription 'sticky' strategy to degrade to 'random'.
+
+- [#13733](https://github.com/emqx/emqx/pull/13733) Made `cacertfile` optional when configuring an HTTPS listener via the `emqx ctl conf load` command.
+
+- [#13742](https://github.com/emqx/emqx/pull/13742) Fixed an issue where, when subscribing with `+` as the first topic level or with the `#` wildcard, retained messages with topics starting with `$` were incorrectly received.
+
+- [#13754](https://github.com/emqx/emqx/pull/13754) Fixed an issue where the WebSocket connection would consistently break on its own.
+
+- [#13756](https://github.com/emqx/emqx/pull/13756) Introduced more randomness to broker-assigned client IDs.
+
+- [#13790](https://github.com/emqx/emqx/pull/13790) The default heartbeat interval for the MQTT connector has been reduced from 300 seconds to 160 seconds.
+
+  This change helps maintain the underlying TCP connection by preventing timeouts due to the idle limits
+  imposed by load balancers or firewalls, which typically range from 3 to 5 minutes depending on the cloud provider.
+
+
+- [#13832](https://github.com/emqx/emqx/pull/13832) Fixed an issue where the `Publish` endpoint would return a 500 error when persistent sessions were enabled.
+
+- [#13842](https://github.com/emqx/emqx/pull/13842) Fixed a UTF-8 string validation exception.
+
+### Upgrade and Migration
+
+- [#13731](https://github.com/emqx/emqx/pull/13731) Resolved an issue that prevented clusters running on EMQX 5.4.0 from upgrading to EMQX 5.8.0. This fix introduces a migration procedure to update specific internal database tables created in version 5.4.0 to align with the new schema.
+
+## Breaking Changes
+
+- [#13792](https://github.com/emqx/emqx/pull/13792) The default expiration time for a banned item created without an `until` value is now `infinity` (previously capped at a 1-year limit).
+
+- [#13742](https://github.com/emqx/emqx/pull/13742) Fixed an issue where a client would receive retained messages for topics starting with `$` when it subscribed to the `#` or `+` topic.
+
+  This fix satisfies the requirement of [MQTT-4.7.2-1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901246).
+

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.8.1-rc.1
+version: 5.8.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.8.1-rc.1
+appVersion: 5.8.1

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.8.1-rc.1
+version: 5.8.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.8.1-rc.1
+appVersion: 5.8.1

+ 2 - 2
mix.exs

@@ -238,7 +238,7 @@ defmodule EMQXUmbrella.MixProject do
     do: {:bcrypt, github: "emqx/erlang-bcrypt", tag: "0.6.2", override: true}
 
   def common_dep(:minirest),
-    do: {:minirest, github: "emqx/minirest", tag: "1.4.3", override: true}
+    do: {:minirest, github: "emqx/minirest", tag: "1.4.4", override: true}
 
   # maybe forbid to fetch quicer
   def common_dep(:emqtt),
@@ -274,7 +274,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:influxdb),
     do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}
 
-  def common_dep(:wolff), do: {:wolff, "4.0.0"}
+  def common_dep(:wolff), do: {:wolff, "4.0.2"}
   def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"}
 
   def common_dep(:kafka_protocol),

+ 1 - 1
rebar.config

@@ -86,7 +86,7 @@
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.6"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.4.1"}}},
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
-    {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.3"}}},
+    {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.4"}}},
     {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.10"}}},
     {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},

+ 8 - 0
rel/i18n/emqx_bridge_v2_schema.hocon

@@ -22,4 +22,12 @@ config_enable.desc:
 config_enable.label:
 """Enable or Disable"""
 
+undefined_vars_as_null.desc:
+"""When writing to databases, treat undefined variables as NULL.
+When this option is enabled, undefined variables (such as ${var}) used in templates are written as "NULL" instead of the string "undefined". When this option is disabled (the default), the string "undefined" may be inserted.
+This option should be set to `true` whenever possible; the default value `false` exists only for backward compatibility."""
+
+undefined_vars_as_null.label:
+"""Undefined Vars as Null"""
+
 }