Просмотр исходного кода

Merge remote-tracking branch 'upstream/release-53' into 1129-sync-r53

Ivan Dyachkov 2 лет назад
Родитель
Сommit
ec10c51073
42 измененных файлов с 1264 добавлено и 421 удалено
  1. 15 5
      .github/workflows/release.yaml
  2. 1 1
      .github/workflows/upload-helm-charts.yaml
  3. 2 2
      Makefile
  4. 1 1
      apps/emqx/include/emqx_release.hrl
  5. 1 1
      apps/emqx/test/emqx_cth_cluster.erl
  6. 1 1
      apps/emqx_bridge/src/emqx_action_info.erl
  7. 1 1
      apps/emqx_bridge/src/emqx_bridge.erl
  8. 8 7
      apps/emqx_bridge/src/emqx_bridge_api.erl
  9. 15 9
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  10. 64 22
      apps/emqx_bridge/src/emqx_bridge_v2.erl
  11. 1 1
      apps/emqx_bridge/src/emqx_bridge_v2_api.erl
  12. 3 3
      apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
  13. 5 4
      apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
  14. 28 30
      apps/emqx_bridge/test/emqx_bridge_SUITE.erl
  15. 34 11
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  16. 5 5
      apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
  17. 1 1
      apps/emqx_bridge/test/emqx_bridge_testlib.erl
  18. 4 2
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
  19. 70 36
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
  20. 1 1
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  21. 102 0
      apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl
  22. 150 5
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  23. 325 92
      apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
  24. 43 21
      apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
  25. 140 0
      apps/emqx_bridge_http/test/emqx_bridge_http_v2_SUITE.erl
  26. 2 12
      apps/emqx_connector/src/emqx_connector_api.erl
  27. 10 14
      apps/emqx_connector/src/emqx_connector_resource.erl
  28. 2 13
      apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
  29. 46 2
      apps/emqx_connector/src/schema/emqx_connector_schema.erl
  30. 10 4
      apps/emqx_license/src/emqx_license_schema.erl
  31. 11 0
      apps/emqx_license/test/emqx_license_http_api_SUITE.erl
  32. 1 0
      apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
  33. 1 0
      apps/emqx_redis/src/emqx_redis.app.src
  34. 3 3
      apps/emqx_resource/test/emqx_resource_schema_tests.erl
  35. 12 4
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  36. 1 1
      apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
  37. 44 102
      apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
  38. 1 0
      changes/ce/fix-12044.en.md
  39. 42 0
      changes/e5.3.2.en.md
  40. 38 0
      changes/v5.3.2.en.md
  41. 2 2
      deploy/charts/emqx-enterprise/Chart.yaml
  42. 17 2
      rel/i18n/emqx_bridge_http_schema.hocon

+ 15 - 5
.github/workflows/release.yaml

@@ -20,7 +20,14 @@ jobs:
   upload:
     runs-on: ubuntu-22.04
     permissions:
+      contents: write
+      checks: write
       packages: write
+      actions: read
+      issues: read
+      pull-requests: read
+      repository-projects: read
+      statuses: read
     strategy:
       fail-fast: false
     steps:
@@ -45,11 +52,13 @@ jobs:
             v*)
               echo "profile=emqx" >> $GITHUB_OUTPUT
               echo "version=$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT
+              echo "ref_name=v$(./pkg-vsn.sh emqx)" >> "$GITHUB_ENV"
               echo "s3dir=emqx-ce" >> $GITHUB_OUTPUT
               ;;
             e*)
               echo "profile=emqx-enterprise" >> $GITHUB_OUTPUT
               echo "version=$(./pkg-vsn.sh emqx-enterprise)" >> $GITHUB_OUTPUT
+              echo "ref_name=e$(./pkg-vsn.sh emqx-enterprise)" >> "$GITHUB_ENV"
               echo "s3dir=emqx-ee" >> $GITHUB_OUTPUT
               ;;
           esac
@@ -57,14 +66,15 @@ jobs:
         run: |
           BUCKET=${{ secrets.AWS_S3_BUCKET }}
           OUTPUT_DIR=${{ steps.profile.outputs.s3dir }}
-          aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ github.ref_name }} packages
-      - uses: alexellis/upload-assets@0.4.0
+          aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ env.ref_name }} packages
+      - uses: emqx/upload-assets@8d2083b4dbe3151b0b735572eaa153b6acb647fe # 0.5.0
         env:
           GITHUB_TOKEN: ${{ github.token }}
         with:
           asset_paths: '["packages/*"]'
+          tag_name: "${{ env.ref_name }}"
       - name: update to emqx.io
-        if: startsWith(github.ref_name, 'v') && ((github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts)
+        if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts)
         run: |
           set -eux
           curl -w %{http_code} \
@@ -72,10 +82,10 @@ jobs:
                -H "Content-Type: application/json" \
                -H "token: ${{ secrets.EMQX_IO_TOKEN }}" \
                -X POST \
-               -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
+               -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \
                ${{ secrets.EMQX_IO_RELEASE_API }}
       - name: Push to packagecloud.io
-        if: (github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts
+        if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts
         env:
           PROFILE: ${{ steps.profile.outputs.profile }}
           VERSION: ${{ steps.profile.outputs.version }}

+ 1 - 1
.github/workflows/upload-helm-charts.yaml

@@ -43,7 +43,7 @@ jobs:
               ;;
           esac
       - uses: emqx/push-helm-action@v1.1
-        if: github.event_name == 'release' && !github.event.prerelease
+        if: github.event_name == 'release' && !github.event.release.prerelease
         with:
           charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
           version: ${{ steps.profile.outputs.version }}

+ 2 - 2
Makefile

@@ -20,8 +20,8 @@ endif
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.5.1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1
+export EMQX_DASHBOARD_VERSION ?= v1.5.2
+export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2
 
 PROFILE ?= emqx
 REL_PROFILES := emqx emqx-enterprise

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.3.2").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.3.2-alpha.1").
+-define(EMQX_RELEASE_EE, "5.3.2").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 1 - 1
apps/emqx/test/emqx_cth_cluster.erl

@@ -50,7 +50,7 @@
 -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
 
 -define(TIMEOUT_NODE_START_MS, 15000).
--define(TIMEOUT_APPS_START_MS, 30000).
+-define(TIMEOUT_APPS_START_MS, 60000).
 -define(TIMEOUT_NODE_STOP_S, 15).
 
 %%

+ 1 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -89,7 +89,7 @@ hard_coded_action_info_modules_ee() ->
 -endif.
 
 hard_coded_action_info_modules_common() ->
-    [].
+    [emqx_bridge_http_action_info].
 
 hard_coded_action_info_modules() ->
     hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -364,7 +364,7 @@ get_metrics(Type, Name) ->
 maybe_upgrade(mqtt, Config) ->
     emqx_bridge_compatible_config:maybe_upgrade(Config);
 maybe_upgrade(webhook, Config) ->
-    emqx_bridge_compatible_config:webhook_maybe_upgrade(Config);
+    emqx_bridge_compatible_config:http_maybe_upgrade(Config);
 maybe_upgrade(_Other, Config) ->
     Config.
 

+ 8 - 7
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -143,7 +143,7 @@ param_path_id() ->
             #{
                 in => path,
                 required => true,
-                example => <<"webhook:webhook_example">>,
+                example => <<"http:http_example">>,
                 desc => ?DESC("desc_param_path_id")
             }
         )}.
@@ -166,9 +166,9 @@ bridge_info_array_example(Method) ->
 bridge_info_examples(Method) ->
     maps:merge(
         #{
-            <<"webhook_example">> => #{
-                summary => <<"WebHook">>,
-                value => info_example(webhook, Method)
+            <<"http_example">> => #{
+                summary => <<"HTTP">>,
+                value => info_example(http, Method)
             },
             <<"mqtt_example">> => #{
                 summary => <<"MQTT Bridge">>,
@@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post ->
 method_example(_Type, put) ->
     #{}.
 
-info_example_basic(webhook) ->
+info_example_basic(http) ->
     #{
         enable => true,
         url => <<"http://localhost:9901/messages/${topic}">>,
@@ -212,7 +212,7 @@ info_example_basic(webhook) ->
         pool_size => 4,
         enable_pipelining => 100,
         ssl => #{enable => false},
-        local_topic => <<"emqx_webhook/#">>,
+        local_topic => <<"emqx_http/#">>,
         method => post,
         body => <<"${payload}">>,
         resource_opts => #{
@@ -650,7 +650,8 @@ create_or_update_bridge(BridgeType0, BridgeName, Conf, HttpStatusCode) ->
 
 get_metrics_from_local_node(BridgeType0, BridgeName) ->
     BridgeType = upgrade_type(BridgeType0),
-    format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)).
+    MetricsResult = emqx_bridge:get_metrics(BridgeType, BridgeName),
+    format_metrics(MetricsResult).
 
 '/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
     ?TRY_PARSE_ID(

+ 15 - 9
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -63,18 +63,23 @@
 ).
 
 -if(?EMQX_RELEASE_EDITION == ee).
-bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
-bridge_to_resource_type(webhook) -> emqx_bridge_http_connector;
-bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
+bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
+    bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
+bridge_to_resource_type(mqtt) ->
+    emqx_bridge_mqtt_connector;
+bridge_to_resource_type(webhook) ->
+    emqx_bridge_http_connector;
+bridge_to_resource_type(BridgeType) ->
+    emqx_bridge_enterprise:resource_type(BridgeType).
 
 bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType).
 -else.
-bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
-bridge_to_resource_type(webhook) -> emqx_bridge_http_connector.
+bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
+    bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
+bridge_to_resource_type(mqtt) ->
+    emqx_bridge_mqtt_connector;
+bridge_to_resource_type(webhook) ->
+    emqx_bridge_http_connector.
 
 bridge_impl_module(_BridgeType) -> undefined.
 -endif.
@@ -309,6 +314,7 @@ remove(Type, Name, _Conf, _Opts) ->
     emqx_resource:remove_local(resource_id(Type, Name)).
 
 %% convert bridge configs to what the connector modules want
+%% TODO: remove it, if the http_bridge already ported to v2
 parse_confs(
     <<"webhook">>,
     _Name,

+ 64 - 22
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -1188,7 +1188,7 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
             %% If the bridge v2 does not exist, it is a valid bridge v1
             PreviousRawConf = undefined,
             split_bridge_v1_config_and_create_helper(
-                BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+                BridgeV1Type, BridgeName, RawConf, PreviousRawConf, fun() -> ok end
             );
         _Conf ->
             case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
@@ -1198,9 +1198,13 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
                     PreviousRawConf = emqx:get_raw_config(
                         [?ROOT_KEY, BridgeV2Type, BridgeName], undefined
                     ),
-                    bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps),
+                    %% To avoid losing configurations. We have to make sure that no crash occurs
+                    %% during deletion and creation of configurations.
+                    PreCreateFun = fun() ->
+                        bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps)
+                    end,
                     split_bridge_v1_config_and_create_helper(
-                        BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+                        BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
                     );
                 false ->
                     %% If the bridge v2 exists, it is not a valid bridge v1
@@ -1208,16 +1212,49 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
             end
     end.
 
-split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
-    #{
-        connector_type := ConnectorType,
-        connector_name := NewConnectorName,
-        connector_conf := NewConnectorRawConf,
-        bridge_v2_type := BridgeType,
-        bridge_v2_name := BridgeName,
-        bridge_v2_conf := NewBridgeV2RawConf
-    } =
-        split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf),
+split_bridge_v1_config_and_create_helper(
+    BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
+) ->
+    try
+        #{
+            connector_type := ConnectorType,
+            connector_name := NewConnectorName,
+            connector_conf := NewConnectorRawConf,
+            bridge_v2_type := BridgeType,
+            bridge_v2_name := BridgeName,
+            bridge_v2_conf := NewBridgeV2RawConf
+        } = split_and_validate_bridge_v1_config(
+            BridgeV1Type,
+            BridgeName,
+            RawConf,
+            PreviousRawConf
+        ),
+
+        _ = PreCreateFun(),
+
+        do_connector_and_bridge_create(
+            ConnectorType,
+            NewConnectorName,
+            NewConnectorRawConf,
+            BridgeType,
+            BridgeName,
+            NewBridgeV2RawConf,
+            RawConf
+        )
+    catch
+        throw:Reason ->
+            {error, Reason}
+    end.
+
+do_connector_and_bridge_create(
+    ConnectorType,
+    NewConnectorName,
+    NewConnectorRawConf,
+    BridgeType,
+    BridgeName,
+    NewBridgeV2RawConf,
+    RawConf
+) ->
     case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
         {ok, _} ->
             case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
@@ -1335,15 +1372,20 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
     RawConf = maps:without([<<"name">>], RawConfig0),
     TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
     PreviousRawConf = undefined,
-    #{
-        connector_type := _ConnectorType,
-        connector_name := _NewConnectorName,
-        connector_conf := ConnectorRawConf,
-        bridge_v2_type := BridgeV2Type,
-        bridge_v2_name := _BridgeName,
-        bridge_v2_conf := BridgeV2RawConf
-    } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
-    create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
+    try
+        #{
+            connector_type := _ConnectorType,
+            connector_name := _NewConnectorName,
+            connector_conf := ConnectorRawConf,
+            bridge_v2_type := BridgeV2Type,
+            bridge_v2_name := _BridgeName,
+            bridge_v2_conf := BridgeV2RawConf
+        } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
+        create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf)
+    catch
+        throw:Reason ->
+            {error, Reason}
+    end.
 
 %% Only called by test cases (may create broken references)
 bridge_v1_remove(BridgeV1Type, BridgeName) ->

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -117,7 +117,7 @@ param_path_id() ->
             #{
                 in => path,
                 required => true,
-                example => <<"webhook:webhook_example">>,
+                example => <<"http:my_http_action">>,
                 desc => ?DESC("desc_param_path_id")
             }
         )}.

+ 3 - 3
apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl

@@ -21,7 +21,7 @@
 -export([
     upgrade_pre_ee/2,
     maybe_upgrade/1,
-    webhook_maybe_upgrade/1
+    http_maybe_upgrade/1
 ]).
 
 upgrade_pre_ee(undefined, _UpgradeFunc) ->
@@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
 maybe_upgrade(NewVersion) ->
     NewVersion.
 
-webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
+http_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
     Config1 = maps:remove(<<"direction">>, Config0),
     Config1#{<<"resource_opts">> => default_resource_opts()};
-webhook_maybe_upgrade(NewVersion) ->
+http_maybe_upgrade(NewVersion) ->
     NewVersion.
 
 binary_key({K, V}) ->

+ 5 - 4
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
 
 fields(bridges) ->
     [
-        {webhook,
+        {http,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
                 #{
+                    aliases => [webhook],
                     desc => ?DESC("bridges_webhook"),
                     required => false,
-                    converter => fun webhook_bridge_converter/2
+                    converter => fun http_bridge_converter/2
                 }
             )},
         {mqtt,
@@ -243,7 +244,7 @@ status() ->
 node_name() ->
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
 
-webhook_bridge_converter(Conf0, _HoconOpts) ->
+http_bridge_converter(Conf0, _HoconOpts) ->
     emqx_bridge_compatible_config:upgrade_pre_ee(
-        Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+        Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
     ).

+ 28 - 30
apps/emqx_bridge/test/emqx_bridge_SUITE.erl

@@ -30,14 +30,18 @@ init_per_suite(Config) ->
         [
             emqx,
             emqx_conf,
+            emqx_connector,
+            emqx_bridge_http,
             emqx_bridge
         ],
         #{work_dir => ?config(priv_dir, Config)}
     ),
+    emqx_mgmt_api_test_util:init_suite(),
     [{apps, Apps} | Config].
 
 end_per_suite(Config) ->
     Apps = ?config(apps, Config),
+    emqx_mgmt_api_test_util:end_suite(),
     ok = emqx_cth_suite:stop(Apps),
     ok.
 
@@ -58,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
             ok = emqx_bridge:remove(BridgeType, BridgeName)
         end,
         [
+            %% Keep using the old bridge names to avoid breaking the tests
             {webhook, <<"basic_usage_info_webhook">>},
             {webhook, <<"basic_usage_info_webhook_disabled">>},
             {mqtt, <<"basic_usage_info_mqtt">>}
@@ -88,7 +93,7 @@ t_get_basic_usage_info_1(_Config) ->
         #{
             num_bridges => 3,
             count_by_type => #{
-                webhook => 1,
+                http => 1,
                 mqtt => 2
             }
         },
@@ -119,40 +124,33 @@ setup_fake_telemetry_data() ->
     HTTPConfig = #{
         url => <<"http://localhost:9901/messages/${topic}">>,
         enable => true,
-        local_topic => "emqx_webhook/#",
+        local_topic => "emqx_http/#",
         method => post,
         body => <<"${payload}">>,
         headers => #{},
         request_timeout => "15s"
     },
-    Conf =
-        #{
-            <<"bridges">> =>
-                #{
-                    <<"webhook">> =>
-                        #{
-                            <<"basic_usage_info_webhook">> => HTTPConfig,
-                            <<"basic_usage_info_webhook_disabled">> =>
-                                HTTPConfig#{enable => false}
-                        },
-                    <<"mqtt">> =>
-                        #{
-                            <<"basic_usage_info_mqtt">> => MQTTConfig1,
-                            <<"basic_usage_info_mqtt_from_select">> => MQTTConfig2
-                        }
-                }
-        },
-    ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf),
-
-    ok = snabbkaffe:start_trace(),
-    Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end,
-    NEvents = 3,
-    BackInTime = 0,
-    Timeout = 11_000,
-    {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
-    ok = emqx_bridge:load(),
-    {ok, _} = snabbkaffe_collector:receive_events(Sub),
-    ok = snabbkaffe:stop(),
+    %% Keep use the old bridge names to test the backward compatibility
+    {ok, _} = emqx_bridge_testlib:create_bridge_api(
+        <<"webhook">>,
+        <<"basic_usage_info_webhook">>,
+        HTTPConfig
+    ),
+    {ok, _} = emqx_bridge_testlib:create_bridge_api(
+        <<"webhook">>,
+        <<"basic_usage_info_webhook_disabled">>,
+        HTTPConfig#{enable => false}
+    ),
+    {ok, _} = emqx_bridge_testlib:create_bridge_api(
+        <<"mqtt">>,
+        <<"basic_usage_info_mqtt">>,
+        MQTTConfig1
+    ),
+    {ok, _} = emqx_bridge_testlib:create_bridge_api(
+        <<"mqtt">>,
+        <<"basic_usage_info_mqtt_from_select">>,
+        MQTTConfig2
+    ),
     ok.
 
 t_update_ssl_conf(Config) ->

+ 34 - 11
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -78,6 +78,9 @@
     emqx_auth,
     emqx_auth_mnesia,
     emqx_management,
+    emqx_connector,
+    emqx_bridge_http,
+    emqx_bridge,
     {emqx_rule_engine, "rule_engine { rules {} }"},
     {emqx_bridge, "bridges {}"}
 ]).
@@ -108,7 +111,7 @@ groups() ->
     ].
 
 suite() ->
-    [{timetrap, {seconds, 60}}].
+    [{timetrap, {seconds, 120}}].
 
 init_per_suite(Config) ->
     Config.
@@ -407,10 +410,7 @@ t_http_crud_apis(Config) ->
         Config
     ),
     ?assertMatch(
-        #{
-            <<"reason">> := <<"unknown_fields">>,
-            <<"unknown">> := <<"curl">>
-        },
+        #{<<"reason">> := <<"required_field">>},
         json(maps:get(<<"message">>, PutFail2))
     ),
     {ok, 400, _} = request_json(
@@ -419,12 +419,16 @@ t_http_crud_apis(Config) ->
         ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
         Config
     ),
-    {ok, 400, _} = request_json(
+    {ok, 400, PutFail3} = request_json(
         put,
         uri(["bridges", BridgeID]),
         ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
         Config
     ),
+    ?assertMatch(
+        #{<<"kind">> := <<"validation_error">>},
+        json(maps:get(<<"message">>, PutFail3))
+    ),
 
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
@@ -463,7 +467,7 @@ t_http_crud_apis(Config) ->
     ),
 
     %% Create non working bridge
-    BrokenURL = ?URL(Port + 1, "/foo"),
+    BrokenURL = ?URL(Port + 1, "foo"),
     {ok, 201, BrokenBridge} = request(
         post,
         uri(["bridges"]),
@@ -471,6 +475,7 @@ t_http_crud_apis(Config) ->
         fun json/1,
         Config
     ),
+
     ?assertMatch(
         #{
             <<"type">> := ?BRIDGE_TYPE_HTTP,
@@ -1307,6 +1312,7 @@ t_cluster_later_join_metrics(Config) ->
     Name = ?BRIDGE_NAME,
     BridgeParams = ?HTTP_BRIDGE(URL1, Name),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
+
     ?check_trace(
         begin
             %% Create a bridge on only one of the nodes.
@@ -1323,6 +1329,20 @@ t_cluster_later_join_metrics(Config) ->
             ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
             %% Check metrics; shouldn't crash even if the bridge is not
             %% ready on the node that just joined the cluster.
+
+            %% assert: wait for the bridge to be ready on the other node.
+            fun
+                WaitConfSync(0) ->
+                    throw(waiting_config_sync_timeout);
+                WaitConfSync(N) ->
+                    timer:sleep(1000),
+                    case erpc:call(OtherNode, emqx_bridge, list, []) of
+                        [] -> WaitConfSync(N - 1);
+                        [_] -> ok
+                    end
+            end(
+                60
+            ),
             ?assertMatch(
                 {ok, 200, #{
                     <<"metrics">> := #{<<"success">> := _},
@@ -1373,17 +1393,16 @@ t_create_with_bad_name(Config) ->
 
 validate_resource_request_ttl(single, Timeout, Name) ->
     SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
-    ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
+    _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
     ?check_trace(
         begin
             {ok, Res} =
                 ?wait_async_action(
-                    emqx_bridge:send_message(BridgeID, SentData),
+                    do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
                     #{?snk_kind := async_query},
                     1000
                 ),
-            ?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res)
+            ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
         end,
         fun(Trace0) ->
             Trace = ?of_kind(async_query, Trace0),
@@ -1394,6 +1413,10 @@ validate_resource_request_ttl(single, Timeout, Name) ->
 validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
     ignore.
 
+do_send_message(BridgeV1Type, Name, Message) ->
+    Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
+    emqx_bridge_v2:send_message(Type, Name, Message, #{}).
+
 %%
 
 request(Method, URL, Config) ->

+ 5 - 5
apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl

@@ -21,7 +21,7 @@ empty_config_test() ->
     Conf1 = #{<<"bridges">> => #{}},
     Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
     ?assertEqual(Conf1, check(Conf1)),
-    ?assertEqual(Conf2, check(Conf2)),
+    ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)),
     ok.
 
 %% ensure webhook config can be checked
@@ -33,7 +33,7 @@ webhook_config_test() ->
     ?assertMatch(
         #{
             <<"bridges">> := #{
-                <<"webhook">> := #{
+                <<"http">> := #{
                     <<"the_name">> :=
                         #{
                             <<"method">> := get,
@@ -48,7 +48,7 @@ webhook_config_test() ->
     ?assertMatch(
         #{
             <<"bridges">> := #{
-                <<"webhook">> := #{
+                <<"http">> := #{
                     <<"the_name">> :=
                         #{
                             <<"method">> := get,
@@ -61,7 +61,7 @@ webhook_config_test() ->
     ),
     #{
         <<"bridges">> := #{
-            <<"webhook">> := #{
+            <<"http">> := #{
                 <<"the_name">> :=
                     #{
                         <<"method">> := get,
@@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
     Bridges#{<<"mqtt">> := MqttBridges};
 up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
     WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
-        WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+        WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
     ),
     Bridges#{<<"webhook">> := WebhookBridges}.
 

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -92,7 +92,7 @@ end_per_testcase(_Testcase, Config) ->
 delete_all_bridges() ->
     lists:foreach(
         fun(#{name := Name, type := Type}) ->
-            emqx_bridge:remove(Type, Name)
+            ok = emqx_bridge:remove(Type, Name)
         end,
         emqx_bridge:list()
     ).

+ 4 - 2
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -237,7 +237,10 @@ handle_continue(?patch_subscription, State0) ->
             ),
             {noreply, State0};
         error ->
-            %% retry
+            %% retry; add a random delay for the case where multiple workers step on each
+            %% other's toes before retrying.
+            RandomMS = rand:uniform(500),
+            timer:sleep(RandomMS),
             {noreply, State0, {continue, ?patch_subscription}}
     end.
 
@@ -478,7 +481,6 @@ do_pull_async(State0) ->
             Body = body(State0, pull),
             PreparedRequest = {prepared_request, {Method, Path, Body}},
             ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
-            %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers...
             Res = emqx_bridge_gcp_pubsub_client:query_async(
                 PreparedRequest,
                 ReplyFunAndArgs,

+ 70 - 36
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl

@@ -196,7 +196,7 @@ consumer_config(TestCase, Config) ->
             "  connect_timeout = \"5s\"\n"
             "  service_account_json = ~s\n"
             "  consumer {\n"
-            "    ack_deadline = \"60s\"\n"
+            "    ack_deadline = \"10s\"\n"
             "    ack_retry_interval = \"1s\"\n"
             "    pull_max_messages = 10\n"
             "    consumer_workers_per_topic = 1\n"
@@ -208,7 +208,7 @@ consumer_config(TestCase, Config) ->
             "  resource_opts {\n"
             "    health_check_interval = \"1s\"\n"
             %% to fail and retry pulling faster
-            "    request_ttl = \"5s\"\n"
+            "    request_ttl = \"1s\"\n"
             "  }\n"
             "}\n",
             [
@@ -285,7 +285,7 @@ start_control_client() ->
             connect_timeout => 5_000,
             max_retries => 0,
             pool_size => 1,
-            resource_opts => #{request_ttl => 5_000},
+            resource_opts => #{request_ttl => 1_000},
             service_account_json => RawServiceAccount
         },
     PoolName = <<"control_connector">>,
@@ -512,10 +512,23 @@ wait_acked(Opts) ->
     %% no need to check return value; we check the property in
     %% the check phase.  this is just to give it a chance to do
     %% so and avoid flakiness.  should be fast.
-    snabbkaffe:block_until(
+    Res = snabbkaffe:block_until(
         ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
         Timeout
     ),
+    case Res of
+        {ok, _} ->
+            ok;
+        {timeout, Evts} ->
+            %% Fixme: apparently, snabbkaffe may timeout but still return the expected
+            %% events here.
+            case length(Evts) >= N of
+                true ->
+                    ok;
+                false ->
+                    ct:pal("timed out waiting for acks;\n expected: ~b\n received:\n  ~p", [N, Evts])
+            end
+    end,
     ok.
 
 wait_forgotten() ->
@@ -652,25 +665,24 @@ setup_and_start_listeners(Node, NodeOpts) ->
         end
     ).
 
+dedup([]) ->
+    [];
+dedup([X]) ->
+    [X];
+dedup([X | Rest]) ->
+    [X | dedup(X, Rest)].
+
+dedup(X, [X | Rest]) ->
+    dedup(X, Rest);
+dedup(_X, [Y | Rest]) ->
+    [Y | dedup(Y, Rest)];
+dedup(_X, []) ->
+    [].
+
 %%------------------------------------------------------------------------------
 %% Trace properties
 %%------------------------------------------------------------------------------
 
-prop_pulled_only_once() ->
-    {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}.
-prop_pulled_only_once(Trace) ->
-    PulledIds =
-        [
-            MsgId
-         || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
-            #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
-        ],
-    NumPulled = length(PulledIds),
-    UniquePulledIds = sets:from_list(PulledIds, [{version, 2}]),
-    UniqueNumPulled = sets:size(UniquePulledIds),
-    ?assertEqual(UniqueNumPulled, NumPulled, #{pulled_ids => PulledIds}),
-    ok.
-
 prop_handled_only_once() ->
     {"all pulled message are processed only once", fun ?MODULE:prop_handled_only_once/1}.
 prop_handled_only_once(Trace) ->
@@ -1046,7 +1058,6 @@ t_consume_ok(Config) ->
         end,
         [
             prop_all_pulled_are_acked(),
-            prop_pulled_only_once(),
             prop_handled_only_once(),
             prop_acked_ids_eventually_forgotten()
         ]
@@ -1119,7 +1130,6 @@ t_bridge_rule_action_source(Config) ->
             #{payload => Payload0}
         end,
         [
-            prop_pulled_only_once(),
             prop_handled_only_once()
         ]
     ),
@@ -1237,7 +1247,6 @@ t_multiple_topic_mappings(Config) ->
         end,
         [
             prop_all_pulled_are_acked(),
-            prop_pulled_only_once(),
             prop_handled_only_once()
         ]
     ),
@@ -1265,11 +1274,12 @@ t_multiple_pull_workers(Config) ->
                     <<"consumer">> => #{
                         %% reduce flakiness
                         <<"ack_deadline">> => <<"10m">>,
+                        <<"ack_retry_interval">> => <<"1s">>,
                         <<"consumer_workers_per_topic">> => NConsumers
                     },
                     <<"resource_opts">> => #{
                         %% reduce flakiness
-                        <<"request_ttl">> => <<"15s">>
+                        <<"request_ttl">> => <<"20s">>
                     }
                 }
             ),
@@ -1297,7 +1307,6 @@ t_multiple_pull_workers(Config) ->
         end,
         [
             prop_all_pulled_are_acked(),
-            prop_pulled_only_once(),
             prop_handled_only_once(),
             {"message is processed only once", fun(Trace) ->
                 ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
@@ -1531,11 +1540,12 @@ t_async_worker_death_mid_pull(Config) ->
                         ct:pal("published message"),
 
                         AsyncWorkerPids = get_async_worker_pids(Config),
+                        Timeout = 20_000,
                         emqx_utils:pmap(
                             fun(AsyncWorkerPid) ->
                                 Ref = monitor(process, AsyncWorkerPid),
                                 ct:pal("killing pid ~p", [AsyncWorkerPid]),
-                                sys:terminate(AsyncWorkerPid, die, 20_000),
+                                exit(AsyncWorkerPid, kill),
                                 receive
                                     {'DOWN', Ref, process, AsyncWorkerPid, _} ->
                                         ct:pal("killed pid ~p", [AsyncWorkerPid]),
@@ -1544,7 +1554,8 @@ t_async_worker_death_mid_pull(Config) ->
                                 end,
                                 ok
                             end,
-                            AsyncWorkerPids
+                            AsyncWorkerPids,
+                            Timeout + 2_000
                         ),
 
                         ok
@@ -1558,7 +1569,13 @@ t_async_worker_death_mid_pull(Config) ->
                 ?wait_async_action(
                     create_bridge(
                         Config,
-                        #{<<"pool_size">> => 1}
+                        #{
+                            <<"pool_size">> => 1,
+                            <<"consumer">> => #{
+                                <<"ack_deadline">> => <<"10s">>,
+                                <<"ack_retry_interval">> => <<"1s">>
+                            }
+                        }
                     ),
                     #{?snk_kind := gcp_pubsub_consumer_worker_init},
                     10_000
@@ -1590,18 +1607,19 @@ t_async_worker_death_mid_pull(Config) ->
                     ],
                     Trace
                 ),
+                SubTraceEvts = ?projection(?snk_kind, SubTrace),
                 ?assertMatch(
                     [
-                        #{?snk_kind := gcp_pubsub_consumer_worker_handled_async_worker_down},
-                        #{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator}
+                        gcp_pubsub_consumer_worker_handled_async_worker_down,
+                        gcp_pubsub_consumer_worker_reply_delegator
                         | _
                     ],
-                    SubTrace,
+                    dedup(SubTraceEvts),
                     #{sub_trace => projection_optional_span(SubTrace)}
                 ),
                 ?assertMatch(
-                    #{?snk_kind := gcp_pubsub_consumer_worker_pull_response_received},
-                    lists:last(SubTrace)
+                    gcp_pubsub_consumer_worker_pull_response_received,
+                    lists:last(SubTraceEvts)
                 ),
                 ok
             end
@@ -1888,7 +1906,10 @@ t_connection_down_during_ack(Config) ->
 
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
-                    create_bridge(Config),
+                    create_bridge(
+                        Config,
+                        #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}}
+                    ),
                     #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
                     10_000
                 ),
@@ -1930,7 +1951,6 @@ t_connection_down_during_ack(Config) ->
         end,
         [
             prop_all_pulled_are_acked(),
-            prop_pulled_only_once(),
             prop_handled_only_once(),
             {"message is processed only once", fun(Trace) ->
                 ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
@@ -1955,7 +1975,15 @@ t_connection_down_during_ack_redeliver(Config) ->
                 ?wait_async_action(
                     create_bridge(
                         Config,
-                        #{<<"consumer">> => #{<<"ack_deadline">> => <<"10s">>}}
+                        #{
+                            <<"consumer">> => #{
+                                <<"ack_deadline">> => <<"12s">>,
+                                <<"ack_retry_interval">> => <<"1s">>
+                            },
+                            <<"resource_opts">> => #{
+                                <<"request_ttl">> => <<"11s">>
+                            }
+                        }
                     ),
                     #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
                     10_000
@@ -2026,7 +2054,13 @@ t_connection_down_during_pull(Config) ->
 
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
-                    create_bridge(Config),
+                    create_bridge(
+                        Config,
+                        #{
+                            <<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>},
+                            <<"resource_opts">> => #{<<"request_ttl">> => <<"11s">>}
+                        }
+                    ),
                     #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
                     10_000
                 ),

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -3,7 +3,7 @@
     {vsn, "0.1.5"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 102 - 0
apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl

@@ -0,0 +1,102 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_http_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0,
+    connector_action_config_to_bridge_v1_config/2,
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1
+]).
+
+-define(REMOVED_KEYS, [<<"direction">>]).
+-define(ACTION_KEYS, [<<"local_topic">>, <<"resource_opts">>]).
+-define(PARAMETER_KEYS, [<<"body">>, <<"max_retries">>, <<"method">>, <<"request_timeout">>]).
+
+bridge_v1_type_name() -> webhook.
+
+action_type_name() -> http.
+
+connector_type_name() -> http.
+
+schema_module() -> emqx_bridge_http_schema.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
+    %% Move parameters to the top level
+    ParametersMap1 = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
+    ParametersMap2 = maps:without([<<"path">>, <<"headers">>], ParametersMap1),
+    BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
+    BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap2),
+    BridgeV1Config4 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config3),
+
+    Url = maps:get(<<"url">>, ConnectorConfig),
+    Path = maps:get(<<"path">>, ParametersMap1, <<>>),
+
+    Headers1 = maps:get(<<"headers">>, ConnectorConfig, #{}),
+    Headers2 = maps:get(<<"headers">>, ParametersMap1, #{}),
+
+    Url1 =
+        case Path of
+            <<>> -> Url;
+            _ -> iolist_to_binary(emqx_bridge_http_connector:join_paths(Url, Path))
+        end,
+
+    BridgeV1Config4#{
+        <<"headers">> => maps:merge(Headers1, Headers2),
+        <<"url">> => Url1
+    }.
+
+bridge_v1_config_to_connector_config(BridgeV1Conf) ->
+    %% To statisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1
+    ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)),
+    maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf).
+
+bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
+    Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf),
+    Parameters1 = Parameters#{<<"path">> => <<>>, <<"headers">> => #{}},
+    CommonKeys = [<<"enable">>, <<"description">>],
+    ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf),
+    ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}.
+
+%%--------------------------------------------------------------------
+%% helpers
+
+validate_webhook_url(undefined) ->
+    throw(#{
+        kind => validation_error,
+        reason => required_field,
+        required_field => <<"url">>
+    });
+validate_webhook_url(Url) ->
+    {BaseUrl, _Path} = emqx_connector_resource:parse_url(Url),
+    case emqx_http_lib:uri_parse(BaseUrl) of
+        {ok, _} ->
+            ok;
+        {error, Reason} ->
+            throw(#{
+                kind => validation_error,
+                reason => invalid_url,
+                url => Url,
+                error => emqx_utils:readable_error_msg(Reason)
+            })
+    end.

+ 150 - 5
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -31,9 +31,14 @@
     on_query/3,
     on_query_async/4,
     on_get_status/2,
-    reply_delegator/3
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
+-export([reply_delegator/3]).
+
 -export([
     roots/0,
     fields/1,
@@ -41,7 +46,7 @@
     namespace/0
 ]).
 
-%% for other webhook-like connectors.
+%% for other http-like connectors.
 -export([redact_request/1]).
 
 -export([validate_method/1, join_paths/2]).
@@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) ->
             Error
     end.
 
+on_add_channel(
+    _InstId,
+    OldState,
+    ActionId,
+    ActionConfig
+) ->
+    InstalledActions = maps:get(installed_actions, OldState, #{}),
+    {ok, ActionState} = do_create_http_action(ActionConfig),
+    NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
+    NewState = maps:put(installed_actions, NewInstalledActions, OldState),
+    {ok, NewState}.
+
+do_create_http_action(_ActionConfig = #{parameters := Params}) ->
+    {ok, preprocess_request(Params)}.
+
 on_stop(InstId, _State) ->
     ?SLOG(info, #{
         msg => "stopping_http_connector",
@@ -260,6 +280,16 @@ on_stop(InstId, _State) ->
     ?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
     Res.
 
+on_remove_channel(
+    _InstId,
+    OldState = #{installed_actions := InstalledActions},
+    ActionId
+) ->
+    NewInstalledActions = maps:remove(ActionId, InstalledActions),
+    NewState = maps:put(installed_actions, NewInstalledActions, OldState),
+    {ok, NewState}.
+
+%% BridgeV1 entrypoint
 on_query(InstId, {send_message, Msg}, State) ->
     case maps:get(request, State, undefined) of
         undefined ->
@@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) ->
                 State
             )
     end;
+%% BridgeV2 entrypoint
+on_query(
+    InstId,
+    {ActionId, Msg},
+    State = #{installed_actions := InstalledActions}
+) when is_binary(ActionId) ->
+    case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
+        {undefined, _} ->
+            ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
+            {error, arg_request_not_found};
+        {_, undefined} ->
+            ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
+            {error, action_not_found};
+        {Request, ActionState} ->
+            #{
+                method := Method,
+                path := Path,
+                body := Body,
+                headers := Headers,
+                request_timeout := Timeout
+            } = process_request_and_action(Request, ActionState, Msg),
+            %% bridge buffer worker has retry, do not let ehttpc retry
+            Retry = 2,
+            ClientId = maps:get(clientid, Msg, undefined),
+            on_query(
+                InstId,
+                {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
+                State
+            )
+    end;
 on_query(InstId, {Method, Request}, State) ->
     %% TODO: Get retry from State
     on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
@@ -343,6 +403,7 @@ on_query(
             Result
     end.
 
+%% BridgeV1 entrypoint
 on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
     case maps:get(request, State, undefined) of
         undefined ->
@@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
                 State
             )
     end;
+%% BridgeV2 entrypoint
+on_query_async(
+    InstId,
+    {ActionId, Msg},
+    ReplyFunAndArgs,
+    State = #{installed_actions := InstalledActions}
+) when is_binary(ActionId) ->
+    case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
+        {undefined, _} ->
+            ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
+            {error, arg_request_not_found};
+        {_, undefined} ->
+            ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
+            {error, action_not_found};
+        {Request, ActionState} ->
+            #{
+                method := Method,
+                path := Path,
+                body := Body,
+                headers := Headers,
+                request_timeout := Timeout
+            } = process_request_and_action(Request, ActionState, Msg),
+            ClientId = maps:get(clientid, Msg, undefined),
+            on_query_async(
+                InstId,
+                {ClientId, Method, {Path, Headers, Body}, Timeout},
+                ReplyFunAndArgs,
+                State
+            )
+    end;
 on_query_async(
     InstId,
     {KeyOrNum, Method, Request, Timeout},
@@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
             ehttpc_pool:pick_worker(PoolName, Key)
     end.
 
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
+
 on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
     case do_get_status(PoolName, Timeout) of
         ok ->
@@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) ->
             {error, timeout}
     end.
 
+on_get_channel_status(
+    InstId,
+    _ChannelId,
+    State
+) ->
+    %% XXX: Reuse the connector status
+    on_get_status(InstId, State).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -466,10 +568,10 @@ preprocess_request(Req) when map_size(Req) == 0 ->
 preprocess_request(
     #{
         method := Method,
-        path := Path,
-        headers := Headers
+        path := Path
     } = Req
 ) ->
+    Headers = maps:get(headers, Req, []),
     #{
         method => parse_template(to_bin(Method)),
         path => parse_template(Path),
@@ -529,6 +631,49 @@ maybe_parse_template(Key, Conf) ->
 parse_template(String) ->
     emqx_template:parse(String).
 
+process_request_and_action(Request, ActionState, Msg) ->
+    MethodTemplate = maps:get(method, ActionState),
+    Method = make_method(render_template_string(MethodTemplate, Msg)),
+    BodyTemplate = maps:get(body, ActionState),
+    Body = render_request_body(BodyTemplate, Msg),
+
+    PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
+    PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
+
+    Path =
+        case PathSuffix of
+            "" -> PathPrefix;
+            _ -> join_paths(PathPrefix, PathSuffix)
+        end,
+
+    HeadersTemplate1 = maps:get(headers, Request),
+    HeadersTemplate2 = maps:get(headers, ActionState),
+    Headers = merge_proplist(
+        render_headers(HeadersTemplate1, Msg),
+        render_headers(HeadersTemplate2, Msg)
+    ),
+    #{
+        method => Method,
+        path => Path,
+        body => Body,
+        headers => Headers,
+        request_timeout => maps:get(request_timeout, ActionState)
+    }.
+
+merge_proplist(Proplist1, Proplist2) ->
+    lists:foldl(
+        fun({K, V}, Acc) ->
+            case lists:keyfind(K, 1, Acc) of
+                false ->
+                    [{K, V} | Acc];
+                {K, _} = {K, V1} ->
+                    [{K, V1} | Acc]
+            end
+        end,
+        Proplist2,
+        Proplist1
+    ).
+
 process_request(
     #{
         method := MethodTemplate,
@@ -691,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
             true -> Context;
             false -> Context#{attempt := Attempt + 1}
         end,
-    ?tp(webhook_will_retry_async, #{}),
+    ?tp(http_will_retry_async, #{}),
     Worker = resolve_pool_worker(State, KeyOrNum),
     ok = ehttpc:request_async(
         Worker,

+ 325 - 92
apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl

@@ -18,69 +18,162 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
 
 -export([roots/0, fields/1, namespace/0, desc/1]).
 
+-export([
+    bridge_v2_examples/1,
+    %%conn_bridge_examples/1,
+    connector_examples/1
+]).
+
 %%======================================================================================
 %% Hocon Schema Definitions
 namespace() -> "bridge_http".
 
 roots() -> [].
 
-fields("config") ->
-    basic_config() ++ request_config();
+%%--------------------------------------------------------------------
+%% v1 bridges http api
+%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
 fields("post") ->
     [
-        type_field(),
+        old_type_field(),
         name_field()
     ] ++ fields("config");
 fields("put") ->
     fields("config");
 fields("get") ->
     emqx_bridge_schema:status_fields() ++ fields("post");
-fields("creation_opts") ->
+%%--- v1 bridges config file
+%% see: emqx_bridge_schema:fields(bridges)
+fields("config") ->
+    basic_config() ++ request_config();
+%%--------------------------------------------------------------------
+%% v2: configuration
+fields(action) ->
+    {http,
+        mk(
+            hoconsc:map(name, ref(?MODULE, "http_action")),
+            #{
+                aliases => [webhook],
+                desc => <<"HTTP Action Config">>,
+                required => false
+            }
+        )};
+fields("http_action") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable_bridge"), default => true})},
+        {connector,
+            mk(binary(), #{
+                desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
+            })},
+        {description, emqx_schema:description_schema()},
+        %% Note: there's an implicit convention in `emqx_bridge' that,
+        %% for egress bridges with this config, the published messages
+        %% will be forwarded to such bridges.
+        {local_topic,
+            mk(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_local_topic"),
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        %% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions.
+        %% some fields are moved to connector, some fields are moved to actions and composed into the
+        %% `parameters` field.
+        {parameters,
+            mk(ref("parameters_opts"), #{
+                required => true,
+                desc => ?DESC("config_parameters_opts")
+            })}
+    ] ++ http_resource_opts();
+fields("parameters_opts") ->
+    [
+        {path,
+            mk(
+                binary(),
+                #{
+                    desc => ?DESC("config_path"),
+                    required => false
+                }
+            )},
+        method_field(),
+        headers_field(),
+        body_field(),
+        max_retries_field(),
+        request_timeout_field()
+    ];
+%% v2: api schema
+%% The parameter equls to
+%%   `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
+%%   `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
+fields("post_" ++ Type) ->
+    [type_field(), name_field() | fields("config_" ++ Type)];
+fields("put_" ++ Type) ->
+    fields("config_" ++ Type);
+fields("get_" ++ Type) ->
+    emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
+fields("config_bridge_v2") ->
+    fields("http_action");
+fields("config_connector") ->
+    [
+        {enable,
+            mk(
+                boolean(),
+                #{
+                    desc => <<"Enable or disable this connector">>,
+                    default => true
+                }
+            )},
+        {description, emqx_schema:description_schema()}
+    ] ++ connector_url_headers() ++ connector_opts();
+%%--------------------------------------------------------------------
+%% v1/v2
+fields("resource_opts") ->
+    UnsupportedOpts = [enable_batch, batch_size, batch_time],
     lists:filter(
-        fun({K, _V}) ->
-            not lists:member(K, unsupported_opts())
-        end,
+        fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
         emqx_resource_schema:fields("creation_opts")
     ).
 
 desc("config") ->
     ?DESC("desc_config");
-desc("creation_opts") ->
+desc("resource_opts") ->
     ?DESC(emqx_resource_schema, "creation_opts");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
+desc("config_connector") ->
+    ?DESC("desc_config");
+desc("http_action") ->
+    ?DESC("desc_config");
+desc("parameters_opts") ->
+    ?DESC("config_parameters_opts");
 desc(_) ->
     undefined.
 
+%%--------------------------------------------------------------------
+%% helpers for v1 only
+
 basic_config() ->
     [
         {enable,
             mk(
                 boolean(),
                 #{
-                    desc => ?DESC("config_enable"),
+                    desc => ?DESC("config_enable_bridge"),
                     default => true
                 }
-            )}
-    ] ++ webhook_creation_opts() ++
-        proplists:delete(
-            max_retries, emqx_bridge_http_connector:fields(config)
-        ).
+            )},
+        {description, emqx_schema:description_schema()}
+    ] ++ http_resource_opts() ++ connector_opts().
 
 request_config() ->
     [
-        {url,
-            mk(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("config_url")
-                }
-            )},
+        url_field(),
         {direction,
             mk(
                 egress,
@@ -98,81 +191,37 @@ request_config() ->
                     required => false
                 }
             )},
-        {method,
-            mk(
-                method(),
-                #{
-                    default => post,
-                    desc => ?DESC("config_method")
-                }
-            )},
-        {headers,
-            mk(
-                map(),
-                #{
-                    default => #{
-                        <<"accept">> => <<"application/json">>,
-                        <<"cache-control">> => <<"no-cache">>,
-                        <<"connection">> => <<"keep-alive">>,
-                        <<"content-type">> => <<"application/json">>,
-                        <<"keep-alive">> => <<"timeout=5">>
-                    },
-                    desc => ?DESC("config_headers")
-                }
-            )},
-        {body,
-            mk(
-                binary(),
-                #{
-                    default => undefined,
-                    desc => ?DESC("config_body")
-                }
-            )},
-        {max_retries,
-            mk(
-                non_neg_integer(),
-                #{
-                    default => 2,
-                    desc => ?DESC("config_max_retries")
-                }
-            )},
-        {request_timeout,
-            mk(
-                emqx_schema:duration_ms(),
-                #{
-                    default => <<"15s">>,
-                    deprecated => {since, "v5.0.26"},
-                    desc => ?DESC("config_request_timeout")
-                }
-            )}
+        method_field(),
+        headers_field(),
+        body_field(),
+        max_retries_field(),
+        request_timeout_field()
     ].
 
-webhook_creation_opts() ->
-    [
-        {resource_opts,
-            mk(
-                ref(?MODULE, "creation_opts"),
-                #{
-                    required => false,
-                    default => #{},
-                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
-                }
-            )}
-    ].
+%%--------------------------------------------------------------------
+%% helpers for v2 only
 
-unsupported_opts() ->
-    [
-        enable_batch,
-        batch_size,
-        batch_time
-    ].
+connector_url_headers() ->
+    [url_field(), headers_field()].
 
-%%======================================================================================
+%%--------------------------------------------------------------------
+%% common funcs
+
+%% `webhook` is kept for backward compatibility.
+old_type_field() ->
+    {type,
+        mk(
+            enum([webhook, http]),
+            #{
+                required => true,
+                desc => ?DESC("desc_type")
+            }
+        )}.
 
 type_field() ->
     {type,
         mk(
-            webhook,
+            http,
             #{
                 required => true,
                 desc => ?DESC("desc_type")
@@ -189,5 +238,189 @@ name_field() ->
             }
         )}.
 
-method() ->
-    enum([post, put, get, delete]).
+url_field() ->
+    {url,
+        mk(
+            binary(),
+            #{
+                required => true,
+                desc => ?DESC("config_url")
+            }
+        )}.
+
+headers_field() ->
+    {headers,
+        mk(
+            map(),
+            #{
+                default => #{
+                    <<"accept">> => <<"application/json">>,
+                    <<"cache-control">> => <<"no-cache">>,
+                    <<"connection">> => <<"keep-alive">>,
+                    <<"content-type">> => <<"application/json">>,
+                    <<"keep-alive">> => <<"timeout=5">>
+                },
+                desc => ?DESC("config_headers")
+            }
+        )}.
+
+method_field() ->
+    {method,
+        mk(
+            enum([post, put, get, delete]),
+            #{
+                default => post,
+                desc => ?DESC("config_method")
+            }
+        )}.
+
+body_field() ->
+    {body,
+        mk(
+            binary(),
+            #{
+                default => undefined,
+                desc => ?DESC("config_body")
+            }
+        )}.
+
+max_retries_field() ->
+    {max_retries,
+        mk(
+            non_neg_integer(),
+            #{
+                default => 2,
+                desc => ?DESC("config_max_retries")
+            }
+        )}.
+
+request_timeout_field() ->
+    {request_timeout,
+        mk(
+            emqx_schema:duration_ms(),
+            #{
+                default => <<"15s">>,
+                deprecated => {since, "v5.0.26"},
+                desc => ?DESC("config_request_timeout")
+            }
+        )}.
+
+http_resource_opts() ->
+    [
+        {resource_opts,
+            mk(
+                ref(?MODULE, "resource_opts"),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ].
+
+connector_opts() ->
+    mark_request_field_deperecated(
+        proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
+    ).
+
+mark_request_field_deperecated(Fields) ->
+    lists:map(
+        fun({K, V}) ->
+            case K of
+                request ->
+                    {K, V#{
+                        %% Note: if we want to deprecate a reference type, we have to change
+                        %% it to a direct type first.
+                        type => typerefl:map(),
+                        deprecated => {since, "5.3.2"},
+                        desc => <<"This field is never used, so we deprecated it since 5.3.2.">>
+                    }};
+                _ ->
+                    {K, V}
+            end
+        end,
+        Fields
+    ).
+
+%%--------------------------------------------------------------------
+%% Examples
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"http">> => #{
+                summary => <<"HTTP Action">>,
+                value => values({Method, bridge_v2})
+            }
+        }
+    ].
+
+connector_examples(Method) ->
+    [
+        #{
+            <<"http">> => #{
+                summary => <<"HTTP Connector">>,
+                value => values({Method, connector})
+            }
+        }
+    ].
+
+values({get, Type}) ->
+    maps:merge(
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        },
+        values({post, Type})
+    );
+values({post, bridge_v2}) ->
+    maps:merge(
+        #{
+            name => <<"my_http_action">>,
+            type => <<"http">>
+        },
+        values({put, bridge_v2})
+    );
+values({post, connector}) ->
+    maps:merge(
+        #{
+            name => <<"my_http_connector">>,
+            type => <<"http">>
+        },
+        values({put, connector})
+    );
+values({put, bridge_v2}) ->
+    values(bridge_v2);
+values({put, connector}) ->
+    values(connector);
+values(bridge_v2) ->
+    #{
+        enable => true,
+        connector => <<"my_http_connector">>,
+        parameters => #{
+            path => <<"/room/${room_no}">>,
+            method => <<"post">>,
+            headers => #{},
+            body => <<"${.}">>
+        },
+        resource_opts => #{
+            worker_pool_size => 16,
+            health_check_interval => <<"15s">>,
+            query_mode => <<"async">>
+        }
+    };
+values(connector) ->
+    #{
+        enable => true,
+        url => <<"http://localhost:8080/api/v1">>,
+        headers => #{<<"content-type">> => <<"application/json">>},
+        connect_timeout => <<"15s">>,
+        pool_type => <<"hash">>,
+        pool_size => 1,
+        enable_pipelining => 100
+    }.

+ 43 - 21
apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl

@@ -39,18 +39,33 @@ all() ->
 groups() ->
     [].
 
-init_per_suite(_Config) ->
-    emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
-    ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]),
-    ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
-    {ok, _} = application:ensure_all_started(emqx_connector),
-    [].
+init_per_suite(Config0) ->
+    Config =
+        case os:getenv("DEBUG_CASE") of
+            [_ | _] = DebugCase ->
+                CaseName = list_to_atom(DebugCase),
+                [{debug_case, CaseName} | Config0];
+            _ ->
+                Config0
+        end,
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_connector,
+            emqx_bridge_http,
+            emqx_bridge,
+            emqx_rule_engine
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    emqx_mgmt_api_test_util:init_suite(),
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]),
-    ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
-    _ = application:stop(emqx_connector),
-    _ = application:stop(emqx_bridge),
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_cth_suite:stop(Apps),
     ok.
 
 suite() ->
@@ -115,7 +130,8 @@ end_per_testcase(TestCase, _Config) when
 ->
     ok = emqx_bridge_http_connector_test_server:stop(),
     persistent_term:erase({?MODULE, times_called}),
-    emqx_bridge_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_connectors(),
     emqx_common_test_helpers:call_janitor(),
     ok;
 end_per_testcase(_TestCase, Config) ->
@@ -123,7 +139,8 @@ end_per_testcase(_TestCase, Config) ->
         undefined -> ok;
         Server -> stop_http_server(Server)
     end,
-    emqx_bridge_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_connectors(),
     emqx_common_test_helpers:call_janitor(),
     ok.
 
@@ -420,7 +437,7 @@ t_send_async_connection_timeout(Config) ->
     ),
     NumberOfMessagesToSend = 10,
     [
-        emqx_bridge:send_message(BridgeID, #{<<"id">> => Id})
+        do_send_message(#{<<"id">> => Id})
      || Id <- lists:seq(1, NumberOfMessagesToSend)
     ],
     %% Make sure server receives all messages
@@ -431,7 +448,7 @@ t_send_async_connection_timeout(Config) ->
 
 t_async_free_retries(Config) ->
     #{port := Port} = ?config(http_server, Config),
-    BridgeID = make_bridge(#{
+    _BridgeID = make_bridge(#{
         port => Port,
         pool_size => 1,
         query_mode => "sync",
@@ -445,7 +462,7 @@ t_async_free_retries(Config) ->
     Fn = fun(Get, Error) ->
         ?assertMatch(
             {ok, 200, _, _},
-            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            do_send_message(#{<<"hello">> => <<"world">>}),
             #{error => Error}
         ),
         ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -456,7 +473,7 @@ t_async_free_retries(Config) ->
 
 t_async_common_retries(Config) ->
     #{port := Port} = ?config(http_server, Config),
-    BridgeID = make_bridge(#{
+    _BridgeID = make_bridge(#{
         port => Port,
         pool_size => 1,
         query_mode => "sync",
@@ -471,7 +488,7 @@ t_async_common_retries(Config) ->
     FnSucceed = fun(Get, Error) ->
         ?assertMatch(
             {ok, 200, _, _},
-            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            do_send_message(#{<<"hello">> => <<"world">>}),
             #{error => Error, attempts => Get()}
         ),
         ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -479,7 +496,7 @@ t_async_common_retries(Config) ->
     FnFail = fun(Get, Error) ->
         ?assertMatch(
             Error,
-            emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+            do_send_message(#{<<"hello">> => <<"world">>}),
             #{error => Error, attempts => Get()}
         ),
         ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -559,7 +576,7 @@ t_path_not_found(Config) ->
             ok
         end,
         fun(Trace) ->
-            ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)),
+            ?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
             ok
         end
     ),
@@ -600,7 +617,7 @@ t_too_many_requests(Config) ->
             ok
         end,
         fun(Trace) ->
-            ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)),
+            ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)),
             ok
         end
     ),
@@ -711,6 +728,11 @@ t_bridge_probes_header_atoms(Config) ->
     ok.
 
 %% helpers
+
+do_send_message(Message) ->
+    Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE),
+    emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}).
+
 do_t_async_retries(TestCase, TestContext, Error, Fn) ->
     #{error_attempts := ErrorAttempts} = TestContext,
     PTKey = {?MODULE, TestCase, attempts},

+ 140 - 0
apps/emqx_bridge_http/test/emqx_bridge_http_v2_SUITE.erl

@@ -0,0 +1,140 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_http_v2_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-import(emqx_mgmt_api_test_util, [request/3]).
+-import(emqx_common_test_helpers, [on_exit/1]).
+-import(emqx_bridge_http_SUITE, [start_http_server/1, stop_http_server/1]).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+-define(BRIDGE_TYPE, <<"http">>).
+-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
+-define(CONNECTOR_NAME, atom_to_binary(?MODULE)).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [].
+
+init_per_suite(Config0) ->
+    Config =
+        case os:getenv("DEBUG_CASE") of
+            [_ | _] = DebugCase ->
+                CaseName = list_to_atom(DebugCase),
+                [{debug_case, CaseName} | Config0];
+            _ ->
+                Config0
+        end,
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_connector,
+            emqx_bridge_http,
+            emqx_bridge,
+            emqx_rule_engine
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    emqx_mgmt_api_test_util:init_suite(),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
+
+suite() ->
+    [{timetrap, {seconds, 60}}].
+
+init_per_testcase(_TestCase, Config) ->
+    Server = start_http_server(#{response_delay_ms => 0}),
+    [{http_server, Server} | Config].
+
+end_per_testcase(_TestCase, Config) ->
+    case ?config(http_server, Config) of
+        undefined -> ok;
+        Server -> stop_http_server(Server)
+    end,
+    emqx_bridge_v2_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_connectors(),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
+%%--------------------------------------------------------------------
+%% tests
+%%--------------------------------------------------------------------
+
+t_compose_connector_url_and_action_path(Config) ->
+    Path = <<"/foo/bar">>,
+    ConnectorCfg = make_connector_config(Config),
+    ActionCfg = make_action_config([{path, Path} | Config]),
+    CreateConfig = [
+        {bridge_type, ?BRIDGE_TYPE},
+        {bridge_name, ?BRIDGE_NAME},
+        {bridge_config, ActionCfg},
+        {connector_type, ?BRIDGE_TYPE},
+        {connector_name, ?CONNECTOR_NAME},
+        {connector_config, ConnectorCfg}
+    ],
+    {ok, _} = emqx_bridge_v2_testlib:create_bridge(CreateConfig),
+
+    %% assert: the url returned v1 api is composed by the url of the connector and the
+    %% path of the action
+    #{port := Port} = ?config(http_server, Config),
+    ExpectedUrl = iolist_to_binary(io_lib:format("http://localhost:~p/foo/bar", [Port])),
+    {ok, {_, _, [Bridge]}} = emqx_bridge_testlib:list_bridges_api(),
+    ?assertMatch(
+        #{<<"url">> := ExpectedUrl},
+        Bridge
+    ),
+    ok.
+
+%%--------------------------------------------------------------------
+%% helpers
+%%--------------------------------------------------------------------
+
+make_connector_config(Config) ->
+    #{port := Port} = ?config(http_server, Config),
+    #{
+        <<"enable">> => true,
+        <<"url">> => iolist_to_binary(io_lib:format("http://localhost:~p", [Port])),
+        <<"headers">> => #{},
+        <<"pool_type">> => <<"hash">>,
+        <<"pool_size">> => 1
+    }.
+
+make_action_config(Config) ->
+    Path = ?config(path, Config),
+    #{
+        <<"enable">> => true,
+        <<"connector">> => ?CONNECTOR_NAME,
+        <<"parameters">> => #{
+            <<"path">> => Path,
+            <<"method">> => <<"post">>,
+            <<"headers">> => #{},
+            <<"body">> => <<"${.}">>
+        }
+    }.

+ 2 - 12
apps/emqx_connector/src/emqx_connector_api.erl

@@ -137,7 +137,7 @@ param_path_id() ->
             #{
                 in => path,
                 required => true,
-                example => <<"webhook:webhook_example">>,
+                example => <<"http:my_http_connector">>,
                 desc => ?DESC("desc_param_path_id")
             }
         )}.
@@ -158,17 +158,7 @@ connector_info_array_example(Method) ->
     lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))).
 
 connector_info_examples(Method) ->
-    maps:merge(
-        #{},
-        emqx_enterprise_connector_examples(Method)
-    ).
-
--if(?EMQX_RELEASE_EDITION == ee).
-emqx_enterprise_connector_examples(Method) ->
-    emqx_connector_ee_schema:examples(Method).
--else.
-emqx_enterprise_connector_examples(_Method) -> #{}.
--endif.
+    emqx_connector_schema:examples(Method).
 
 schema("/connectors") ->
     #{

+ 10 - 14
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -49,6 +49,8 @@
     get_channels/2
 ]).
 
+-export([parse_url/1]).
+
 -callback connector_config(ParsedConfig) ->
     ParsedConfig
 when
@@ -77,8 +79,10 @@ connector_impl_module(_ConnectorType) ->
 
 -endif.
 
-connector_to_resource_type_ce(_ConnectorType) ->
-    no_bridge_v2_for_c2_so_far.
+connector_to_resource_type_ce(http) ->
+    emqx_bridge_http_connector;
+connector_to_resource_type_ce(ConnectorType) ->
+    error({no_bridge_v2, ConnectorType}).
 
 resource_id(ConnectorId) when is_binary(ConnectorId) ->
     <<"connector:", ConnectorId/binary>>.
@@ -271,13 +275,11 @@ remove(Type, Name, _Conf, _Opts) ->
 
 %% convert connector configs to what the connector modules want
 parse_confs(
-    <<"webhook">>,
+    <<"http">>,
     _Name,
     #{
         url := Url,
-        method := Method,
-        headers := Headers,
-        max_retries := Retry
+        headers := Headers
     } = Conf
 ) ->
     Url1 = bin(Url),
@@ -290,20 +292,14 @@ parse_confs(
                 Reason1 = emqx_utils:readable_error_msg(Reason),
                 invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
         end,
-    RequestTTL = emqx_utils_maps:deep_get(
-        [resource_opts, request_ttl],
-        Conf
-    ),
     Conf#{
         base_url => BaseUrl1,
         request =>
             #{
                 path => Path,
-                method => Method,
-                body => maps:get(body, Conf, undefined),
                 headers => Headers,
-                request_ttl => RequestTTL,
-                max_retries => Retry
+                body => undefined,
+                method => undefined
             }
     };
 parse_confs(<<"iotdb">>, Name, Conf) ->

+ 2 - 13
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -15,7 +15,8 @@
 -export([
     api_schemas/1,
     fields/1,
-    examples/1
+    %%examples/1
+    schema_modules/0
 ]).
 
 resource_type(Type) when is_binary(Type) ->
@@ -141,18 +142,6 @@ connector_structs() ->
             )}
     ].
 
-examples(Method) ->
-    MergeFun =
-        fun(Example, Examples) ->
-            maps:merge(Examples, Example)
-        end,
-    Fun =
-        fun(Module, Examples) ->
-            ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
-            lists:foldl(MergeFun, Examples, ConnectorExamples)
-        end,
-    lists:foldl(Fun, #{}, schema_modules()).
-
 schema_modules() ->
     [
         emqx_bridge_azure_event_hub,

+ 46 - 2
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -42,6 +42,8 @@
 
 -export([resource_opts_fields/0, resource_opts_fields/1]).
 
+-export([examples/1]).
+
 -if(?EMQX_RELEASE_EDITION == ee).
 enterprise_api_schemas(Method) ->
     %% We *must* do this to ensure the module is really loaded, especially when we use
@@ -71,9 +73,40 @@ enterprise_fields_connectors() -> [].
 
 -endif.
 
+api_schemas(Method) ->
+    [
+        %% We need to map the `type' field of a request (binary) to a
+        %% connector schema module.
+        api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
+    ].
+
+api_ref(Module, Type, Method) ->
+    {Type, ref(Module, Method)}.
+
+examples(Method) ->
+    MergeFun =
+        fun(Example, Examples) ->
+            maps:merge(Examples, Example)
+        end,
+    Fun =
+        fun(Module, Examples) ->
+            ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
+            lists:foldl(MergeFun, Examples, ConnectorExamples)
+        end,
+    lists:foldl(Fun, #{}, schema_modules()).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+schema_modules() ->
+    [emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
+-else.
+schema_modules() ->
+    [emqx_bridge_http_schema].
+-endif.
+
 connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
 connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
 connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
+connector_type_to_bridge_types(http) -> [http, webhook];
 connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
 connector_type_to_bridge_types(matrix) -> [matrix];
 connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
@@ -311,8 +344,9 @@ post_request() ->
     api_schema("post").
 
 api_schema(Method) ->
+    CE = api_schemas(Method),
     EE = enterprise_api_schemas(Method),
-    hoconsc:union(connector_api_union(EE)).
+    hoconsc:union(connector_api_union(CE ++ EE)).
 
 connector_api_union(Refs) ->
     Index = maps:from_list(Refs),
@@ -357,7 +391,17 @@ roots() ->
     end.
 
 fields(connectors) ->
-    [] ++ enterprise_fields_connectors();
+    [
+        {http,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
+                #{
+                    alias => [webhook],
+                    desc => <<"HTTP Connector Config">>,
+                    required => false
+                }
+            )}
+    ] ++ enterprise_fields_connectors();
 fields("node_status") ->
     [
         node_name(),

+ 10 - 4
apps/emqx_license/src/emqx_license_schema.erl

@@ -72,10 +72,16 @@ check_license_watermark(Conf) ->
         undefined ->
             true;
         Low ->
-            High = hocon_maps:get("license.connection_high_watermark", Conf),
-            case High =/= undefined andalso High > Low of
-                true -> true;
-                false -> {bad_license_watermark, #{high => High, low => Low}}
+            case hocon_maps:get("license.connection_high_watermark", Conf) of
+                undefined ->
+                    {bad_license_watermark, #{high => undefined, low => Low}};
+                High ->
+                    {ok, HighFloat} = emqx_schema:to_percent(High),
+                    {ok, LowFloat} = emqx_schema:to_percent(Low),
+                    case HighFloat > LowFloat of
+                        true -> true;
+                        false -> {bad_license_watermark, #{high => High, low => Low}}
+                    end
             end
     end.
 

+ 11 - 0
apps/emqx_license/test/emqx_license_http_api_SUITE.erl

@@ -204,6 +204,17 @@ t_license_setting(_Config) ->
     ?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
     ?assertEqual(0.55, emqx_config:get([license, connection_high_watermark])),
 
+    %% update
+    Low1 = <<"50%">>,
+    High1 = <<"100%">>,
+    UpdateRes1 = request(put, uri(["license", "setting"]), #{
+        <<"connection_low_watermark">> => Low1,
+        <<"connection_high_watermark">> => High1
+    }),
+    validate_setting(UpdateRes1, Low1, High1),
+    ?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
+    ?assertEqual(1.0, emqx_config:get([license, connection_high_watermark])),
+
     %% update bad setting low >= high
     ?assertMatch(
         {ok, 400, _},

+ 1 - 0
apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl

@@ -464,6 +464,7 @@ apps_to_start() ->
         emqx_modules,
         emqx_gateway,
         emqx_exhook,
+        emqx_bridge_http,
         emqx_bridge,
         emqx_auto_subscribe,
 

+ 1 - 0
apps/emqx_redis/src/emqx_redis.app.src

@@ -5,6 +5,7 @@
     {applications, [
         kernel,
         stdlib,
+        eredis,
         eredis_cluster,
         emqx_connector,
         emqx_resource

+ 3 - 3
apps/emqx_resource/test/emqx_resource_schema_tests.erl

@@ -80,7 +80,7 @@ worker_pool_size_test_() ->
         Conf = emqx_utils_maps:deep_put(
             [
                 <<"bridges">>,
-                <<"webhook">>,
+                <<"http">>,
                 <<"simple">>,
                 <<"resource_opts">>,
                 <<"worker_pool_size">>
@@ -88,7 +88,7 @@ worker_pool_size_test_() ->
             BaseConf,
             WorkerPoolSize
         ),
-        #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
+        #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
         #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
         WPS
     end,
@@ -117,7 +117,7 @@ worker_pool_size_test_() ->
 %%===========================================================================
 
 parse_and_check_webhook_bridge(Hocon) ->
-    #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
+    #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
     Conf.
 
 parse(Hocon) ->

+ 12 - 4
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -583,10 +583,18 @@ get_referenced_hookpoints(Froms) ->
     ].
 
 get_egress_bridges(Actions) ->
-    [
-        emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
-     || {bridge, BridgeType, BridgeName, _ResId} <- Actions
-    ].
+    lists:foldr(
+        fun
+            ({bridge, BridgeType, BridgeName, _ResId}, Acc) ->
+                [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
+            ({bridge_v2, BridgeType, BridgeName}, Acc) ->
+                [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
+            (_, Acc) ->
+                Acc
+        end,
+        [],
+        Actions
+    ).
 
 %% For allowing an external application to add extra "built-in" functions to the
 %% rule engine SQL like language. The module set by

+ 1 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -3468,7 +3468,7 @@ t_get_basic_usage_info_1(_Config) ->
             referenced_bridges =>
                 #{
                     mqtt => 1,
-                    webhook => 3
+                    http => 3
                 }
         },
         emqx_rule_engine:get_basic_usage_info()

+ 44 - 102
apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl

@@ -41,44 +41,32 @@ suite() ->
 apps() ->
     [
         emqx_conf,
-        emqx_management,
+        emqx_connector,
         emqx_retainer,
         emqx_auth,
         emqx_auth_redis,
         emqx_auth_mnesia,
         emqx_auth_postgresql,
         emqx_modules,
-        emqx_telemetry
+        emqx_telemetry,
+        emqx_bridge_http,
+        emqx_bridge,
+        emqx_rule_engine,
+        emqx_management
     ].
 
 init_per_suite(Config) ->
-    net_kernel:start(['master@127.0.0.1', longnames]),
-    ok = meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(
-        emqx_authz_file,
-        acl_conf_file,
-        fun() ->
-            emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
-        end
-    ),
-    ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
-    emqx_gateway_test_utils:load_all_gateway_apps(),
-    start_apps(),
-    Config.
+    WorkDir = ?config(priv_dir, Config),
+    Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}),
+    emqx_mgmt_api_test_util:init_suite(),
+    [{apps, Apps}, {work_dir, WorkDir} | Config].
 
-end_per_suite(_Config) ->
-    {ok, _} = emqx:update_config(
-        [authorization],
-        #{
-            <<"no_match">> => <<"allow">>,
-            <<"cache">> => #{<<"enable">> => <<"true">>},
-            <<"sources">> => []
-        }
-    ),
+end_per_suite(Config) ->
     mnesia:clear_table(cluster_rpc_commit),
     mnesia:clear_table(cluster_rpc_mfa),
-    stop_apps(),
-    meck:unload(emqx_authz_file),
+    Apps = ?config(apps, Config),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_cth_suite:stop(Apps),
     ok.
 
 init_per_testcase(t_get_telemetry_without_memsup, Config) ->
@@ -123,7 +111,6 @@ init_per_testcase(t_advanced_mqtt_features, Config) ->
     mock_advanced_mqtt_features(),
     Config;
 init_per_testcase(t_authn_authz_info, Config) ->
-    mock_httpc(),
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     create_authn('mqtt:global', built_in_database),
     create_authn('tcp:default', redis),
@@ -141,14 +128,11 @@ init_per_testcase(t_send_after_enable, Config) ->
     mock_httpc(),
     Config;
 init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
-    mock_httpc(),
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]),
     ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
     ok = setup_fake_rule_engine_data(),
     Config;
 init_per_testcase(t_exhook_info, Config) ->
-    mock_httpc(),
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     ExhookConf =
         #{
@@ -173,31 +157,8 @@ init_per_testcase(t_cluster_uuid, Config) ->
     Node = start_slave(n1),
     [{n1, Node} | Config];
 init_per_testcase(t_uuid_restored_from_file, Config) ->
-    mock_httpc(),
-    NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
-    ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
-    DataDir = emqx:data_dir(),
-    NodeUUIDFile = filename:join(DataDir, "node.uuid"),
-    ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
-    file:delete(NodeUUIDFile),
-    file:delete(ClusterUUIDFile),
-    ok = file:write_file(NodeUUIDFile, NodeUUID),
-    ok = file:write_file(ClusterUUIDFile, ClusterUUID),
-
-    %% clear the UUIDs in the DB
-    {atomic, ok} = mria:clear_table(emqx_telemetry),
-    stop_apps(),
-    ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
-    start_apps(),
-    Node = start_slave(n1),
-    [
-        {n1, Node},
-        {node_uuid, NodeUUID},
-        {cluster_uuid, ClusterUUID}
-        | Config
-    ];
+    Config;
 init_per_testcase(t_uuid_saved_to_file, Config) ->
-    mock_httpc(),
     DataDir = emqx:data_dir(),
     NodeUUIDFile = filename:join(DataDir, "node.uuid"),
     ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@@ -205,7 +166,6 @@ init_per_testcase(t_uuid_saved_to_file, Config) ->
     file:delete(ClusterUUIDFile),
     Config;
 init_per_testcase(t_num_clients, Config) ->
-    mock_httpc(),
     ok = snabbkaffe:start_trace(),
     Config;
 init_per_testcase(_Testcase, Config) ->
@@ -227,7 +187,6 @@ end_per_testcase(t_advanced_mqtt_features, _Config) ->
     {atomic, ok} = mria:clear_table(emqx_delayed),
     ok;
 end_per_testcase(t_authn_authz_info, _Config) ->
-    meck:unload([httpc]),
     emqx_authz:update({delete, postgresql}, #{}),
     lists:foreach(
         fun(ChainName) ->
@@ -244,19 +203,8 @@ end_per_testcase(t_enable, _Config) ->
 end_per_testcase(t_send_after_enable, _Config) ->
     meck:unload([httpc, emqx_telemetry_config]);
 end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
-    meck:unload(httpc),
-    lists:foreach(
-        fun(App) ->
-            ok = application:stop(App)
-        end,
-        [
-            emqx_bridge,
-            emqx_rule_engine
-        ]
-    ),
     ok;
 end_per_testcase(t_exhook_info, _Config) ->
-    meck:unload(httpc),
     emqx_exhook_demo_svr:stop(),
     application:stop(emqx_exhook),
     ok;
@@ -264,21 +212,12 @@ end_per_testcase(t_cluster_uuid, Config) ->
     Node = proplists:get_value(n1, Config),
     ok = stop_slave(Node);
 end_per_testcase(t_num_clients, Config) ->
-    meck:unload([httpc]),
     ok = snabbkaffe:stop(),
     Config;
-end_per_testcase(t_uuid_restored_from_file, Config) ->
-    Node = ?config(n1, Config),
-    DataDir = emqx:data_dir(),
-    NodeUUIDFile = filename:join(DataDir, "node.uuid"),
-    ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
-    ok = file:delete(NodeUUIDFile),
-    ok = file:delete(ClusterUUIDFile),
-    meck:unload([httpc]),
-    ok = stop_slave(Node),
-    ok;
 end_per_testcase(_Testcase, _Config) ->
-    meck:unload([httpc]),
+    case catch meck:unload([httpc]) of
+        _ -> ok
+    end,
     ok.
 
 %%------------------------------------------------------------------------------
@@ -315,19 +254,34 @@ t_cluster_uuid(Config) ->
 %% should attempt read UUID from file in data dir to keep UUIDs
 %% unique, in the event of a database purge.
 t_uuid_restored_from_file(Config) ->
-    ExpectedNodeUUID = ?config(node_uuid, Config),
-    ExpectedClusterUUID = ?config(cluster_uuid, Config),
+    %% Stop the emqx_telemetry application first
+    {atomic, ok} = mria:clear_table(emqx_telemetry),
+    application:stop(emqx_telemetry),
+
+    %% Rewrite the the uuid files
+    NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
+    ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
+    DataDir = ?config(work_dir, Config),
+    NodeUUIDFile = filename:join(DataDir, "node.uuid"),
+    ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
+    ok = file:write_file(NodeUUIDFile, NodeUUID),
+    ok = file:write_file(ClusterUUIDFile, ClusterUUID),
+
+    %% Start the emqx_telemetry application again
+    application:start(emqx_telemetry),
+
+    %% Check the UUIDs
     ?assertEqual(
-        {ok, ExpectedNodeUUID},
+        {ok, NodeUUID},
         emqx_telemetry:get_node_uuid()
     ),
     ?assertEqual(
-        {ok, ExpectedClusterUUID},
+        {ok, ClusterUUID},
         emqx_telemetry:get_cluster_uuid()
     ),
     ok.
 
-t_uuid_saved_to_file(_Config) ->
+t_uuid_saved_to_file(Config) ->
     DataDir = emqx:data_dir(),
     NodeUUIDFile = filename:join(DataDir, "node.uuid"),
     ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@@ -337,9 +291,10 @@ t_uuid_saved_to_file(_Config) ->
 
     %% clear the UUIDs in the DB
     {atomic, ok} = mria:clear_table(emqx_telemetry),
-    stop_apps(),
-    ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
-    start_apps(),
+    application:stop(emqx_telemetry),
+
+    application:start(emqx_telemetry),
+
     {ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
     {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
     ?assertEqual(
@@ -578,6 +533,7 @@ t_mqtt_runtime_insights(_) ->
 
 t_rule_engine_and_data_bridge_info(_Config) ->
     {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
+    ct:pal("telemetry data: ~p~n", [TelemetryData]),
     RuleInfo = get_value(rule_engine, TelemetryData),
     BridgeInfo = get_value(bridge, TelemetryData),
     ?assertEqual(
@@ -588,7 +544,7 @@ t_rule_engine_and_data_bridge_info(_Config) ->
         #{
             data_bridge =>
                 #{
-                    webhook => #{num => 1, num_linked_by_rules => 3},
+                    http => #{num => 1, num_linked_by_rules => 3},
                     mqtt => #{num => 2, num_linked_by_rules => 2}
                 },
             num_data_bridges => 3
@@ -811,14 +767,6 @@ setup_fake_rule_engine_data() ->
         ),
     ok.
 
-set_special_configs(emqx_auth) ->
-    {ok, _} = emqx:update_config([authorization, cache, enable], false),
-    {ok, _} = emqx:update_config([authorization, no_match], deny),
-    {ok, _} = emqx:update_config([authorization, sources], []),
-    ok;
-set_special_configs(_App) ->
-    ok.
-
 %% for some unknown reason, gen_rpc running locally or in CI might
 %% start with different `port_discovery' modes, which means that'll
 %% either be listening at the port in the config (`tcp_server_port',
@@ -887,9 +835,3 @@ leave_cluster() ->
 
 is_official_version(V) ->
     emqx_telemetry_config:is_official_version(V).
-
-start_apps() ->
-    emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1).
-
-stop_apps() ->
-    emqx_common_test_helpers:stop_apps(lists:reverse(apps())).

+ 1 - 0
changes/ce/fix-12044.en.md

@@ -0,0 +1 @@
+Fix Redis authorization, authentication, and bridges. Previously connections to Redis servers could not be established because driver was not properly loaded.

Разница между файлами не показана из-за своего большого размера
+ 42 - 0
changes/e5.3.2.en.md


+ 38 - 0
changes/v5.3.2.en.md

@@ -0,0 +1,38 @@
+# v5.3.2
+
+## Enhancements
+
+- [#11725](https://github.com/emqx/emqx/pull/11725) Introduced the LDAP as a new authentication and authorization backend.
+
+- [#11752](https://github.com/emqx/emqx/pull/11752) Changed default RPC driver from `gen_rpc` to `rpc` for core-replica database synchronization.
+
+  This improves core-replica data replication latency.
+
+- [#11785](https://github.com/emqx/emqx/pull/11785) Allowed users with the "Viewer" role to change their own passwords. However, those with the "Viewer" role do not have permission to change the passwords of other users.
+
+- [#11787](https://github.com/emqx/emqx/pull/11787) Improved the performance of the `emqx` command.
+
+- [#11790](https://github.com/emqx/emqx/pull/11790) Added validation to Redis commands in Redis authorization source.
+  Additionally, this improvement refines the parsing of Redis commands during authentication and authorization processes.  The parsing now aligns with `redis-cli` compatibility standards and supports quoted arguments.
+
+## Bug Fixes
+
+- [#11757](https://github.com/emqx/emqx/pull/11757) Fixed the error response code when downloading non-existent trace files. Now the response returns `404` instead of `500`.
+
+- [#11762](https://github.com/emqx/emqx/pull/11762) Fixed an issue in EMQX's `built_in_database` authorization source. With this update, all Access Control List (ACL) records are completely removed when an authorization source is deleted. This resolves the issue of residual records remaining in the database when re-creating authorization sources.
+
+- [#11771](https://github.com/emqx/emqx/pull/11771) Fixed validation of Bcrypt salt rounds in authentication management through the API/Dashboard.
+
+- [#11780](https://github.com/emqx/emqx/pull/11780) Fixed validation of the `iterations` field of the `pbkdf2` password hashing algorithm. Now, `iterations` must be strictly positive. Previously, it could be set to 0, which led to a nonfunctional authenticator.
+
+- [#11791](https://github.com/emqx/emqx/pull/11791) Fixed an issue in the EMQX CoAP Gateway where heartbeats were not effectively maintaining the connection's active status. This fix ensures that the heartbeat mechanism properly sustains the liveliness of CoAP Gateway connections.
+
+- [#11797](https://github.com/emqx/emqx/pull/11797) Modified HTTP API behavior for APIs managing the `built_in_database` authorization source. They will now return a `404` status code if `built_in_database` is not set as the authorization source, replacing the former `20X` response.
+
+- [#11965](https://github.com/emqx/emqx/pull/11965) Improved the termination of EMQX services to ensure a graceful stop even in the presence of an unavailable MongoDB resource. 
+
+- [#11975](https://github.com/emqx/emqx/pull/11975) This fix addresses an issue where redundant error logs were generated due to a race condition during simultaneous socket closure by a peer and the server. Previously, concurrent socket close events triggered by the operating system and EMQX resulted in unnecessary error logging. The implemented fix improves event handling to eliminate unnecessary error messages.
+
+- [#11987](https://github.com/emqx/emqx/pull/11987) Fixed a bug where attempting to set the `active_n` option on a TCP/SSL socket could lead to a connection crash. 
+
+  The problem occurred if the socket had already been closed by the time the connection process attempted to apply the `active_n` setting, resulting in a `case_clause` crash.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.3.2-alpha.1
+version: 5.3.2
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.3.2-alpha.1
+appVersion: 5.3.2

+ 17 - 2
rel/i18n/emqx_bridge_http_schema.hocon

@@ -18,10 +18,10 @@ config_direction.desc:
 config_direction.label:
 """Bridge Direction"""
 
-config_enable.desc:
+config_enable_bridge.desc:
 """Enable or disable this bridge"""
 
-config_enable.label:
+config_enable_bridge.label:
 """Enable Or Disable Bridge"""
 
 config_headers.desc:
@@ -71,6 +71,21 @@ is not allowed."""
 config_url.label:
 """HTTP Bridge"""
 
+config_path.desc:
+"""The URL path for this Action.<br/>
+This path will be appended to the Connector's <code>url</code> configuration to form the full
+URL address.
+Template with variables is allowed in this option. For example, <code>/room/{$room_no}</code>"""
+
+config_path.label:
+"""URL Path"""
+
+config_parameters_opts.desc:
+"""The parameters for HTTP action."""
+
+config_parameters_opts.label:
+"""Parameters"""
+
 desc_config.desc:
 """Configuration for an HTTP bridge."""