Parcourir la source

Merge remote-tracking branch 'upstream/release-58' into 20241007-sync-release-58

Ivan Dyachkov il y a 1 an
Parent
commit
cea7a0f3a1
29 fichiers modifiés avec 672 ajouts et 193 suppressions
  1. 39 0
      .github/workflows/build_and_push_docker_images.yaml
  2. 2 2
      apps/emqx/include/emqx_release.hrl
  3. 18 0
      apps/emqx/src/emqx_broker.erl
  4. 6 1
      apps/emqx_bridge/src/emqx_bridge_v2.erl
  5. 1 1
      apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl
  6. 1 1
      apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src
  7. 1 1
      apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl
  8. 272 0
      apps/emqx_bridge_mysql/test/emqx_bridge_v2_mysql_SUITE.erl
  9. 1 11
      apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl
  10. 1 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  11. 6 0
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl
  12. 13 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl
  13. 2 3
      apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl
  14. 10 9
      apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl
  15. 94 126
      apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl
  16. 0 5
      apps/emqx_dashboard/include/emqx_dashboard.hrl
  17. 81 8
      apps/emqx_dashboard/src/emqx_dashboard_monitor.erl
  18. 42 0
      apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl
  19. 1 1
      apps/emqx_mysql/src/emqx_mysql.app.src
  20. 38 15
      apps/emqx_mysql/src/emqx_mysql.erl
  21. 14 1
      apps/emqx_resource/src/emqx_resource.erl
  22. 14 0
      changes/ce/feat-13942.en.md
  23. 5 0
      changes/ee/fix-13902.en.md
  24. 3 0
      changes/ee/fix-13921.en.md
  25. 1 0
      changes/ee/fix-13927.en.md
  26. 2 2
      deploy/charts/emqx-enterprise/Chart.yaml
  27. 2 2
      deploy/charts/emqx/Chart.yaml
  28. 1 1
      mix.exs
  29. 1 1
      rebar.config

+ 39 - 0
.github/workflows/build_and_push_docker_images.yaml

@@ -27,6 +27,12 @@ on:
         required: false
         required: false
       AWS_SECRET_ACCESS_KEY:
       AWS_SECRET_ACCESS_KEY:
         required: false
         required: false
+      AWS_DEFAULT_REGION:
+        required: false
+      AWS_S3_BUCKET:
+        required: false
+      AWS_CLOUDFRONT_ID:
+        required: false
   workflow_dispatch:
   workflow_dispatch:
     inputs:
     inputs:
       ref:
       ref:
@@ -111,6 +117,7 @@ jobs:
       - uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
       - uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0
         with:
         with:
           ref: ${{ github.event.inputs.ref }}
           ref: ${{ github.event.inputs.ref }}
+
       - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
       - uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
         with:
         with:
           pattern: "${{ matrix.profile[0] }}-*.tar.gz"
           pattern: "${{ matrix.profile[0] }}-*.tar.gz"
@@ -222,3 +229,35 @@ jobs:
           export BUILD_FROM="${_EMQX_DOCKER_IMAGE_TAG}"
           export BUILD_FROM="${_EMQX_DOCKER_IMAGE_TAG}"
           export EMQX_IMAGE_TAG="${_EMQX_DOCKER_IMAGE_TAG##docker.io/}-sf"
           export EMQX_IMAGE_TAG="${_EMQX_DOCKER_IMAGE_TAG##docker.io/}-sf"
           ./build ${PROFILE} docker
           ./build ${PROFILE} docker
+
+      - uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
+        if: inputs.publish || github.repository_owner != 'emqx'
+        with:
+          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          aws-region: ${{ secrets.AWS_DEFAULT_REGION }}
+
+      - name: upload to aws s3
+        if: inputs.publish || github.repository_owner != 'emqx'
+        env:
+          AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
+          AWS_CLOUDFRONT_ID: ${{ secrets.AWS_CLOUDFRONT_ID }}
+          ORIG_PROFILE: ${{ inputs.profile }}
+        run: |
+          set -xeuo pipefail
+          if [ $ORIG_PROFILE = 'emqx' ]; then
+              s3dir="emqx-ce/v$PKG_VSN"
+          elif [ $ORIG_PROFILE = 'emqx-enterprise' ]; then
+              s3dir="emqx-ee/e$PKG_VSN"
+          else
+              echo "unknown profile $ORIG_PROFILE"
+              exit 1
+          fi
+          docker pull --platform linux/amd64 "${_EMQX_DOCKER_IMAGE_TAG}"
+          docker save "${_EMQX_DOCKER_IMAGE_TAG}" | gzip > "$PROFILE-$PKG_VSN-docker-amd64.tar.gz"
+          docker pull --platform linux/arm64 "${_EMQX_DOCKER_IMAGE_TAG}"
+          docker save "${_EMQX_DOCKER_IMAGE_TAG}" | gzip > "$PROFILE-$PKG_VSN-docker-arm64.tar.gz"
+          ls -lh
+          aws s3 cp "$PROFILE-$PKG_VSN-docker-amd64.tar.gz" "s3://$AWS_S3_BUCKET/$s3dir/"
+          aws s3 cp "$PROFILE-$PKG_VSN-docker-arm64.tar.gz" "s3://$AWS_S3_BUCKET/$s3dir/"
+          aws cloudfront create-invalidation --distribution-id "$AWS_CLOUDFRONT_ID" --paths "/$s3dir/*docker*"

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 %% `apps/emqx/src/bpapi/README.md'
 
 
 %% Opensource edition
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.8.1-beta.2").
+-define(EMQX_RELEASE_CE, "5.8.1-rc.1").
 
 
 %% Enterprise edition
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.8.1-beta.2").
+-define(EMQX_RELEASE_EE, "5.8.1-rc.1").

+ 18 - 0
apps/emqx/src/emqx_broker.erl

@@ -55,6 +55,11 @@
     subscribed/2
     subscribed/2
 ]).
 ]).
 
 
+%% Folds
+-export([
+    foldl_topics/2
+]).
+
 -export([
 -export([
     get_subopts/2,
     get_subopts/2,
     set_subopts/2
     set_subopts/2
@@ -120,6 +125,7 @@ create_tabs() ->
     TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
     TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
 
 
     %% SubOption: {TopicFilter, SubPid} -> SubOption
     %% SubOption: {TopicFilter, SubPid} -> SubOption
+    %% NOTE: `foldl_topics/2` relies on it being ordered.
     ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]),
     ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]),
 
 
     %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ...
     %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ...
@@ -518,6 +524,18 @@ set_subopts(SubPid, Topic, NewOpts) ->
             false
             false
     end.
     end.
 
 
+-spec foldl_topics(fun((emqx_types:topic() | emqx_types:share(), Acc) -> Acc), Acc) ->
+    Acc.
+foldl_topics(FoldFun, Acc) ->
+    First = ets:first(?SUBOPTION),
+    foldl_topics(FoldFun, Acc, First).
+
+foldl_topics(FoldFun, Acc, {Topic, _SubPid}) ->
+    Next = ets:next(?SUBOPTION, {Topic, _GreaterThanAnyPid = []}),
+    foldl_topics(FoldFun, FoldFun(Topic, Acc), Next);
+foldl_topics(_FoldFun, Acc, '$end_of_table') ->
+    Acc.
+
 -spec topics() -> [emqx_types:topic() | emqx_types:share()].
 -spec topics() -> [emqx_types:topic() | emqx_types:share()].
 topics() ->
 topics() ->
     emqx_router:topics().
     emqx_router:topics().

+ 6 - 1
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -702,7 +702,7 @@ do_query_with_enabled_config(
     ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeType),
     ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeType),
     ConnectorResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
     ConnectorResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
     QueryOpts = maps:merge(
     QueryOpts = maps:merge(
-        emqx_bridge:query_opts(Config),
+        query_opts(BridgeType, Config),
         QueryOpts0#{
         QueryOpts0#{
             connector_resource_id => ConnectorResId,
             connector_resource_id => ConnectorResId,
             query_mode => QueryMode
             query_mode => QueryMode
@@ -721,6 +721,11 @@ do_query_with_enabled_config(
 send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
 send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
     query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
     query(BridgeType, BridgeName, {send_message, Message}, QueryOpts0).
 
 
+query_opts(ActionOrSourceType, Config) ->
+    ConnectorType = connector_type(ActionOrSourceType),
+    Mod = emqx_connector_resource:connector_to_resource_type(ConnectorType),
+    emqx_resource:get_query_opts(Mod, Config).
+
 -spec health_check(BridgeType :: term(), BridgeName :: term()) ->
 -spec health_check(BridgeType :: term(), BridgeName :: term()) ->
     #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
     #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
 health_check(BridgeType, BridgeName) ->
 health_check(BridgeType, BridgeName) ->

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl

@@ -1511,7 +1511,7 @@ t_unrecoverable_error(Config) ->
             ),
             ),
         fun(Trace) ->
         fun(Trace) ->
             ?assertMatch(
             ?assertMatch(
-                [#{reason := killed}],
+                [#{reason := killed} | _],
                 ?of_kind(gcp_pubsub_request_failed, Trace)
                 ?of_kind(gcp_pubsub_request_failed, Trace)
             ),
             ),
             ok
             ok

+ 1 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mysql, [
 {application, emqx_bridge_mysql, [
     {description, "EMQX Enterprise MySQL Bridge"},
     {description, "EMQX Enterprise MySQL Bridge"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 1 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl

@@ -141,7 +141,7 @@ on_remove_channel(
     _InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId
     _InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId
 ) when is_map_key(ChannelId, Channels) ->
 ) when is_map_key(ChannelId, Channels) ->
     ChannelConfig = maps:get(ChannelId, Channels),
     ChannelConfig = maps:get(ChannelId, Channels),
-    emqx_mysql:unprepare_sql(maps:merge(ChannelConfig, ConnectorState)),
+    emqx_mysql:unprepare_sql(ChannelId, maps:merge(ChannelConfig, ConnectorState)),
     NewState = State#{channels => maps:remove(ChannelId, Channels)},
     NewState = State#{channels => maps:remove(ChannelId, Channels)},
     {ok, NewState};
     {ok, NewState};
 on_remove_channel(_InstanceId, State, _ChannelId) ->
 on_remove_channel(_InstanceId, State, _ChannelId) ->

+ 272 - 0
apps/emqx_bridge_mysql/test/emqx_bridge_v2_mysql_SUITE.erl

@@ -0,0 +1,272 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_v2_mysql_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(BRIDGE_TYPE, mysql).
+-define(BRIDGE_TYPE_BIN, <<"mysql">>).
+-define(CONNECTOR_TYPE, mysql).
+-define(CONNECTOR_TYPE_BIN, <<"mysql">>).
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+-import(emqx_utils_conv, [bin/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    MysqlHost = os:getenv("MYSQL_TCP_HOST", "toxiproxy"),
+    MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
+    case emqx_common_test_helpers:is_tcp_server_available(MysqlHost, MysqlPort) of
+        true ->
+            Apps = emqx_cth_suite:start(
+                [
+                    emqx,
+                    emqx_conf,
+                    emqx_connector,
+                    emqx_bridge,
+                    emqx_bridge_mysql,
+                    emqx_rule_engine,
+                    emqx_management,
+                    emqx_mgmt_api_test_util:emqx_dashboard()
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Config)}
+            ),
+            NConfig = [
+                {apps, Apps},
+                {mysql_host, MysqlHost},
+                {mysql_port, MysqlPort},
+                {enable_tls, false},
+                {mysql_host, MysqlHost},
+                {mysql_port, MysqlPort}
+                | Config
+            ],
+            emqx_bridge_mysql_SUITE:connect_and_drop_table(NConfig),
+            NConfig;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_mysql);
+                _ ->
+                    {skip, no_mysql}
+            end
+    end.
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(_Group, _Config) ->
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    ct:timetrap(timer:seconds(60)),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    emqx_config:delete_override_conf_files(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
+    Username = <<"root">>,
+    Password = <<"public">>,
+    Passfile = filename:join(?config(priv_dir, Config), "passfile"),
+    ok = file:write_file(Passfile, Password),
+    NConfig = [
+        {mysql_username, Username},
+        {mysql_password, Password},
+        {mysql_passfile, Passfile}
+        | Config
+    ],
+    emqx_bridge_mysql_SUITE:connect_and_create_table(NConfig),
+    ConnectorConfig = connector_config(Name, NConfig),
+    BridgeConfig = bridge_config(Name, Name),
+    ok = snabbkaffe:start_trace(),
+    [
+        {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig},
+        {action_type, proplists:get_value(action_type, Config, ?BRIDGE_TYPE)},
+        {action_name, Name},
+        {bridge_config, BridgeConfig}
+        | NConfig
+    ].
+
+end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ok;
+        false ->
+            emqx_bridge_mysql_SUITE:connect_and_drop_table(Config),
+            emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop(),
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config(Name, Config) ->
+    MysqlHost = ?config(mysql_host, Config),
+    MysqlPort = ?config(mysql_port, Config),
+    Username = ?config(mysql_username, Config),
+    PassFile = ?config(mysql_passfile, Config),
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"database">> => <<"mqtt">>,
+            <<"server">> => iolist_to_binary([MysqlHost, ":", integer_to_binary(MysqlPort)]),
+            <<"pool_size">> => 8,
+            <<"username">> => Username,
+            <<"password">> => iolist_to_binary(["file://", PassFile]),
+            <<"resource_opts">> => #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"start_after_created">> => true,
+                <<"start_timeout">> => <<"5s">>
+            }
+        },
+    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap).
+
+default_sql() ->
+    <<
+        "INSERT INTO mqtt_test(payload, arrived) "
+        "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))"
+    >>.
+
+bad_sql() ->
+    <<
+        "INSERT INTO mqtt_test(payload, arrivedx) "
+        "VALUES (${payload}, FROM_UNIXTIME(${timestamp}/1000))"
+    >>.
+
+bridge_config(Name, ConnectorId) ->
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => ConnectorId,
+            <<"parameters">> =>
+                #{<<"sql">> => default_sql()},
+            <<"local_topic">> => <<"t/mysql">>,
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1,
+                <<"batch_time">> => <<"0ms">>,
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"worker_pool_size">> => <<"1">>
+            }
+        },
+    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    parse_and_check_bridge_config(InnerConfigMap, Name).
+
+%% check it serializes correctly
+serde_roundtrip(InnerConfigMap0) ->
+    IOList = hocon_pp:do(InnerConfigMap0, #{}),
+    {ok, InnerConfigMap} = hocon:binary(IOList),
+    InnerConfigMap.
+
+parse_and_check_bridge_config(InnerConfigMap, Name) ->
+    emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
+
+make_message() ->
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    #{
+        clientid => ClientId,
+        payload => Payload,
+        timestamp => 1668602148000
+    }.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_create_via_http(Config) ->
+    emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.
+
+t_on_get_status(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    ok.
+
+t_start_action_or_source_with_disabled_connector(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
+    ok.
+
+t_update_with_invalid_prepare(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    BridgeName = ?config(action_name, Config),
+    {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config),
+    %% arrivedx is a bad column name
+    BadSQL = bad_sql(),
+    Override = #{<<"parameters">> => #{<<"sql">> => BadSQL}},
+    {ok, {{_, 200, "OK"}, _Headers1, Body1}} =
+        emqx_bridge_v2_testlib:update_bridge_api(Config, Override),
+    ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1),
+    Error1 = maps:get(<<"error">>, Body1),
+    case re:run(Error1, <<"Unknown column">>, [{capture, none}]) of
+        match ->
+            ok;
+        nomatch ->
+            ct:fail(#{
+                expected_pattern => "undefined_column",
+                got => Error1
+            })
+    end,
+    %% assert that although there was an error returned, the invliad SQL is actually put
+    C1 = [{action_name, BridgeName}, {action_type, mysql} | Config],
+    {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1),
+    #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action,
+    ?assertEqual(FetchedSQL, BadSQL),
+
+    %% update again with the original sql
+    {ok, {{_, 200, "OK"}, _Headers2, Body2}} =
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{}),
+    %% the error should be gone now, and status should be 'connected'
+    ?assertMatch(#{<<"error">> := <<>>, <<"status">> := <<"connected">>}, Body2),
+    %% finally check if ecpool worker should have exactly one of reconnect callback
+    ConnectorResId = <<"connector:mysql:", ConnectorName/binary>>,
+    Workers = ecpool:workers(ConnectorResId),
+    [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers),
+    lists:foreach(
+        fun(Pid) ->
+            [{emqx_mysql, prepare_sql_to_conn, Args}] =
+                ecpool_worker:get_reconnect_callbacks(Pid),
+            Sig = emqx_mysql:get_reconnect_callback_signature(Args),
+            BridgeResId = <<"action:mysql:", BridgeName/binary, $:, ConnectorResId/binary>>,
+            ?assertEqual(BridgeResId, Sig)
+        end,
+        WorkerPids
+    ),
+    ok.

+ 1 - 11
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -186,17 +186,7 @@ connector_config(Name, Config) ->
             }
             }
         },
         },
     InnerConfigMap = serde_roundtrip(InnerConfigMap0),
     InnerConfigMap = serde_roundtrip(InnerConfigMap0),
-    parse_and_check_connector_config(InnerConfigMap, Name).
-
-parse_and_check_connector_config(InnerConfigMap, Name) ->
-    TypeBin = ?CONNECTOR_TYPE_BIN,
-    RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
-    #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
-        hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
-            required => false, atom_key => false
-        }),
-    ct:pal("parsed config: ~p", [Config]),
-    InnerConfigMap.
+    emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap).
 
 
 bridge_config(Name, ConnectorId) ->
 bridge_config(Name, ConnectorId) ->
     InnerConfigMap0 =
     InnerConfigMap0 =

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 6 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -13,6 +13,7 @@
     resource_type/0,
     resource_type/0,
     callback_mode/0,
     callback_mode/0,
     query_mode/1,
     query_mode/1,
+    query_opts/1,
     on_start/2,
     on_start/2,
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
@@ -65,6 +66,11 @@ query_mode(#{resource_opts := #{query_mode := sync}}) ->
 query_mode(_Config) ->
 query_mode(_Config) ->
     simple_async_internal_buffer.
     simple_async_internal_buffer.
 
 
+query_opts(#{resource_opts := #{query_mode := sync}, parameters := #{sync_timeout := Timeout}}) ->
+    #{timeout => Timeout};
+query_opts(_) ->
+    #{}.
+
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 on_start(InstanceId, Config) ->
 on_start(InstanceId, Config) ->
     #{servers := Servers0, ssl := SSL} = Config,
     #{servers := Servers0, ssl := SSL} = Config,

+ 13 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl

@@ -69,9 +69,21 @@ fields(action_resource_opts) ->
         inflight_window,
         inflight_window,
         max_buffer_bytes
         max_buffer_bytes
     ],
     ],
-    lists:filter(
+    Fields = lists:filter(
         fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
         fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
         emqx_bridge_v2_schema:action_resource_opts_fields()
         emqx_bridge_v2_schema:action_resource_opts_fields()
+    ),
+    Overrides = #{request_ttl => #{deprecated => {since, "5.8.1"}}},
+    lists:map(
+        fun({K, Sc}) ->
+            case maps:find(K, Overrides) of
+                {ok, Override} ->
+                    {K, hocon_schema:override(Sc, Override)};
+                error ->
+                    {K, Sc}
+            end
+        end,
+        Fields
     );
     );
 fields(Field) when
 fields(Field) when
     Field == "get_bridge_v2";
     Field == "get_bridge_v2";

+ 2 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl

@@ -100,10 +100,9 @@ select_routes_by_topics(Topics) ->
     [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].
     [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].
 
 
 select_routes_by_wildcards(Wildcards) ->
 select_routes_by_wildcards(Wildcards) ->
-    emqx_utils_ets:keyfoldl(
+    emqx_broker:foldl_topics(
         fun(Topic, Acc) -> intersecting_route(Topic, Wildcards) ++ Acc end,
         fun(Topic, Acc) -> intersecting_route(Topic, Wildcards) ++ Acc end,
-        [],
-        ?SUBSCRIBER
+        []
     ).
     ).
 
 
 select_shared_sub_routes_by_topics([T | Topics]) ->
 select_shared_sub_routes_by_topics([T | Topics]) ->

+ 10 - 9
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -373,9 +373,7 @@ handle_info(
         #{
         #{
             result := Res,
             result := Res,
             need_bootstrap := NeedBootstrap
             need_bootstrap := NeedBootstrap
-        } = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(
-        Payload
-    ),
+        } = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload),
     St1 = St#st{
     St1 = St#st{
         actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap
         actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap
     },
     },
@@ -402,8 +400,6 @@ handle_info(
             St2 = ensure_reconnect_timer(St1#st{error = Reason, status = disconnected}),
             St2 = ensure_reconnect_timer(St1#st{error = Reason, status = disconnected}),
             {noreply, St2}
             {noreply, St2}
     end;
     end;
-handle_info({publish, #{}}, St) ->
-    {noreply, St};
 handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) ->
 handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) ->
     {noreply, process_connect(St#st{reconnect_timer = undefined})};
     {noreply, process_connect(St#st{reconnect_timer = undefined})};
 handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
 handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
@@ -493,14 +489,19 @@ handle_connect_error(Reason, St) ->
     _ = maybe_alarm(Reason, St),
     _ = maybe_alarm(Reason, St),
     ensure_reconnect_timer(St#st{error = Reason, status = disconnected}).
     ensure_reconnect_timer(St#st{error = Reason, status = disconnected}).
 
 
-handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) ->
-    ?tp(error, "cluster_link_connection_failed", #{
+handle_client_down(
+    Reason,
+    St = #st{target = TargetCluster, actor = Actor, bootstrapped = Bootstrapped}
+) ->
+    ?SLOG(error, #{
+        msg => "cluster_link_connection_failed",
         reason => Reason,
         reason => Reason,
         target_cluster => St#st.target,
         target_cluster => St#st.target,
         actor => St#st.actor
         actor => St#st.actor
     }),
     }),
-    %% TODO: syncer may be already down due to one_for_all strategy
-    ok = suspend_syncer(TargetCluster, Actor),
+    %% NOTE: There's no syncer yet if bootstrap haven't finished.
+    %% TODO: Syncer may be already down due to one_for_all strategy.
+    _ = Bootstrapped andalso suspend_syncer(TargetCluster, Actor),
     _ = maybe_alarm(Reason, St),
     _ = maybe_alarm(Reason, St),
     NSt = cancel_heartbeat(St),
     NSt = cancel_heartbeat(St),
     process_connect(NSt#st{client = undefined, error = Reason, status = connecting}).
     process_connect(NSt#st{client = undefined, error = Reason, status = connecting}).

+ 94 - 126
apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl

@@ -12,6 +12,9 @@
 -compile(export_all).
 -compile(export_all).
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
 
 
+-define(BASE_CLINK_MQTT_PORT, 1883).
+-define(BASE_CLUSTER_NODE_PORT, 10000).
+
 all() ->
 all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
@@ -22,56 +25,41 @@ end_per_suite(_Config) ->
     ok.
     ok.
 
 
 init_per_testcase(TCName, Config) ->
 init_per_testcase(TCName, Config) ->
-    emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config).
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, [{tc_name, TCName} | Config]).
 
 
 end_per_testcase(TCName, Config) ->
 end_per_testcase(TCName, Config) ->
-    %% @NOTE: Clean work_dir for this TC to avoid running out of disk space
-    %% causing other test run flaky. Uncomment it if you need to preserve the
-    %% work_dir for troubleshooting
-    t_config_update_ds =:= TCName andalso
-        emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
     emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
 
 
-mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) ->
-    AppsA = [{emqx_conf, ConfA}, emqx_cluster_link],
-    AppsA1 = [
-        {emqx_conf, combine([ConfA, conf_mqtt_listener(PortA)])},
-        emqx_cluster_link
-    ],
-    AppsB = [{emqx_conf, ConfB}, emqx_cluster_link],
-    AppsB1 = [
-        {emqx_conf, combine([ConfB, conf_mqtt_listener(PortB)])},
-        emqx_cluster_link
+mk_cluster(N, ClusterName, BaseSpecs, ExtraConf, CTConfig) when is_list(BaseSpecs) ->
+    Specs = [
+        mk_cluster_nodespec(N, ClusterName, S, I, ExtraConf)
+     || {I, S} <- lists:enumerate(BaseSpecs)
     ],
     ],
+    emqx_cth_cluster:mk_nodespecs(
+        Specs,
+        #{work_dir => emqx_cth_suite:work_dir(CTConfig)}
+    );
+mk_cluster(N, ClusterName, Size, ExtraConf, CTConfig) when is_integer(Size) ->
+    mk_cluster(N, ClusterName, lists:duplicate(Size, #{}), ExtraConf, CTConfig).
+
+mk_cluster_nodespec(N, ClusterName, BaseSpec, NodeI, ExtraConf) ->
+    Conf = mk_emqx_conf(N, ClusterName, NodeI, ExtraConf),
+    Spec = BaseSpec#{
+        apps => [{emqx_conf, Conf}, emqx_cluster_link],
+        base_port => N * ?BASE_CLUSTER_NODE_PORT + NodeI * 100
+    },
+    {mk_nodename(ClusterName, NodeI), Spec}.
 
 
-    NodesA = emqx_cth_cluster:mk_nodespecs(
-        [
-            {mk_nodename(NameA, 1), #{apps => AppsA}},
-            {mk_nodename(NameA, 2), #{apps => AppsA}},
-            {mk_nodename(NameA, 3), #{apps => AppsA1, role => replicant}}
-        ],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    NodesB = emqx_cth_cluster:mk_nodespecs(
-        [
-            {mk_nodename(NameB, 1), #{apps => AppsB, base_port => 20100}},
-            {mk_nodename(NameB, 2), #{apps => AppsB1, base_port => 20200}}
-        ],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    {NodesA, NodesB}.
+mk_emqx_conf(N, ClusterName, _NodeI = 1, ExtraConf) ->
+    MQTTPort = ?BASE_CLINK_MQTT_PORT + N * 10000,
+    ListenerConf = conf_mqtt_listener(MQTTPort),
+    combine([conf_cluster(ClusterName), ListenerConf, ExtraConf]);
+mk_emqx_conf(_, ClusterName, _NodeI, ExtraConf) ->
+    combine([conf_cluster(ClusterName), ExtraConf]).
 
 
 t_config_update_cli('init', Config0) ->
 t_config_update_cli('init', Config0) ->
-    Config1 =
-        [
-            {name_prefix, ?FUNCTION_NAME}
-            | Config0
-        ],
-    Config2 = t_config_update('init', Config1),
-    [
-        {update_from, cli}
-        | lists:keydelete(update_from, 1, Config2)
-    ];
+    Config = t_config_update('init', Config0),
+    lists:keystore(update_from, 1, Config, {update_from, cli});
 t_config_update_cli('end', Config) ->
 t_config_update_cli('end', Config) ->
     t_config_update('end', Config).
     t_config_update('end', Config).
 
 
@@ -79,26 +67,17 @@ t_config_update_cli(Config) ->
     t_config_update(Config).
     t_config_update(Config).
 
 
 t_config_update('init', Config) ->
 t_config_update('init', Config) ->
-    NamePrefix =
-        case ?config(name_prefix, Config) of
-            undefined -> ?FUNCTION_NAME;
-            Name -> Name
-        end,
+    NamePrefix = ?config(tc_name, Config),
     NameA = fmt("~s_~s", [NamePrefix, "a"]),
     NameA = fmt("~s_~s", [NamePrefix, "a"]),
     NameB = fmt("~s_~s", [NamePrefix, "b"]),
     NameB = fmt("~s_~s", [NamePrefix, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log()]),
-    ConfB = combine([conf_cluster(NameB), conf_log()]),
-    {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
+    NodesA = mk_cluster(1, NameA, 2, conf_log(), Config),
+    NodesB = mk_cluster(2, NameB, 2, conf_log(), Config),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [
     [
         {cluster_a, ClusterA},
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_a, NameA},
         {name_b, NameB},
         {name_b, NameB},
         {update_from, api}
         {update_from, api}
@@ -110,10 +89,10 @@ t_config_update('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
 
 
 t_config_update(Config) ->
 t_config_update(Config) ->
-    [NodeA1, _, _] = ?config(cluster_a, Config),
-    [NodeB1, _] = ?config(cluster_b, Config),
-    LPortA = ?config(lport_a, Config),
-    LPortB = ?config(lport_b, Config),
+    ClusterA = [NodeA1 | _] = ?config(cluster_a, Config),
+    ClusterB = [NodeB1 | _] = ?config(cluster_b, Config),
+    LPortA = tcp_port(NodeA1, clink),
+    LPortB = tcp_port(NodeB1, clink),
     NameA = ?config(name_a, Config),
     NameA = ?config(name_a, Config),
     NameB = ?config(name_b, Config),
     NameB = ?config(name_b, Config),
 
 
@@ -127,35 +106,29 @@ t_config_update(Config) ->
     LinkConfA = #{
     LinkConfA = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameB
         <<"name">> => NameB
     },
     },
     LinkConfB = #{
     LinkConfB = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortA]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameA
         <<"name">> => NameA
     },
     },
 
 
     {ok, SubRef} = snabbkaffe:subscribe(
     {ok, SubRef} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 5 nodes = 5 actors (durable storage is disabled)
-        5,
+        %% Num nodes = num actors (durable storage is disabled)
+        length(ClusterA) + length(ClusterB),
         30_000
         30_000
     ),
     ),
     ?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)),
     ?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)),
     ?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)),
     ?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)),
 
 
     ?assertMatch(
     ?assertMatch(
-        {ok, [
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete}
-        ]},
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
         snabbkaffe:receive_events(SubRef)
         snabbkaffe:receive_events(SubRef)
     ),
     ),
 
 
@@ -179,8 +152,7 @@ t_config_update(Config) ->
 
 
     {ok, SubRef1} = snabbkaffe:subscribe(
     {ok, SubRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 3 nodes in cluster a
-        3,
+        length(ClusterA),
         30_000
         30_000
     ),
     ),
 
 
@@ -189,11 +161,7 @@ t_config_update(Config) ->
     ?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)),
     ?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)),
 
 
     ?assertMatch(
     ?assertMatch(
-        {ok, [
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete}
-        ]},
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
         snabbkaffe:receive_events(SubRef1)
         snabbkaffe:receive_events(SubRef1)
     ),
     ),
 
 
@@ -237,20 +205,14 @@ t_config_update(Config) ->
 t_config_validations('init', Config) ->
 t_config_validations('init', Config) ->
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log()]),
-    ConfB = combine([conf_cluster(NameB), conf_log()]),
-    %% Single node clusters are enough for a basic validation test
-    {[NodeA, _, _], [NodeB, _]} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
-    ClusterA = emqx_cth_cluster:start([NodeA]),
-    ClusterB = emqx_cth_cluster:start([NodeB]),
+    NodesA = mk_cluster(1, NameA, 1, conf_log(), Config),
+    NodesB = mk_cluster(2, NameB, 1, conf_log(), Config),
+    ClusterA = emqx_cth_cluster:start(NodesA),
+    ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [
     [
         {cluster_a, ClusterA},
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_a, NameA},
         {name_b, NameB}
         {name_b, NameB}
         | Config
         | Config
@@ -262,14 +224,15 @@ t_config_validations('end', Config) ->
 
 
 t_config_validations(Config) ->
 t_config_validations(Config) ->
     [NodeA] = ?config(cluster_a, Config),
     [NodeA] = ?config(cluster_a, Config),
-    LPortB = ?config(lport_b, Config),
-
+    [NodeB] = ?config(cluster_b, Config),
     NameB = ?config(name_b, Config),
     NameB = ?config(name_b, Config),
 
 
+    LPortB = tcp_port(NodeB, clink),
+
     LinkConfA = #{
     LinkConfA = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameB
         <<"name">> => NameB
     },
     },
@@ -352,21 +315,17 @@ t_config_validations(Config) ->
     ).
     ).
 
 
 t_config_update_ds('init', Config) ->
 t_config_update_ds('init', Config) ->
+    Conf = combine([conf_log(), conf_ds()]),
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log(), conf_ds()]),
-    ConfB = combine([conf_cluster(NameB), conf_log(), conf_ds()]),
-    {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
+    NodesA = mk_cluster(1, NameA, [#{role => replicant}, #{role => core}], Conf, Config),
+    NodesB = mk_cluster(2, NameB, [#{role => replicant}, #{role => core}], Conf, Config),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [
     [
         {cluster_a, ClusterA},
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_a, NameA},
         {name_b, NameB}
         {name_b, NameB}
         | Config
         | Config
@@ -374,15 +333,19 @@ t_config_update_ds('init', Config) ->
 t_config_update_ds('end', Config) ->
 t_config_update_ds('end', Config) ->
     ok = snabbkaffe:stop(),
     ok = snabbkaffe:stop(),
     ok = emqx_cth_cluster:stop(?config(cluster_a, Config)),
     ok = emqx_cth_cluster:stop(?config(cluster_a, Config)),
-    ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
+    ok = emqx_cth_cluster:stop(?config(cluster_b, Config)),
+    %% @NOTE: Clean work_dir for this TC to avoid running out of disk space
+    %% causing other test run flaky. Uncomment it if you need to preserve the
+    %% work_dir for troubleshooting
+    emqx_cth_suite:clean_work_dir(?config(work_dir, Config)).
 
 
 t_config_update_ds(Config) ->
 t_config_update_ds(Config) ->
     %% @NOTE: for troubleshooting this TC,
     %% @NOTE: for troubleshooting this TC,
     %% take a look in end_per_testcase/2 to preserve the work dir
     %% take a look in end_per_testcase/2 to preserve the work dir
-    [NodeA1, _, _] = ?config(cluster_a, Config),
-    [NodeB1, _] = ?config(cluster_b, Config),
-    LPortA = ?config(lport_a, Config),
-    LPortB = ?config(lport_b, Config),
+    [NodeA1 | _] = ?config(cluster_a, Config),
+    [NodeB1 | _] = ?config(cluster_b, Config),
+    LPortA = tcp_port(NodeA1, clink),
+    LPortB = tcp_port(NodeB1, clink),
     NameA = ?config(name_a, Config),
     NameA = ?config(name_a, Config),
     NameB = ?config(name_b, Config),
     NameB = ?config(name_b, Config),
 
 
@@ -394,23 +357,22 @@ t_config_update_ds(Config) ->
     LinkConfA = #{
     LinkConfA = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameB
         <<"name">> => NameB
     },
     },
     LinkConfB = #{
     LinkConfB = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortA]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameA
         <<"name">> => NameA
     },
     },
 
 
     {ok, SubRef} = snabbkaffe:subscribe(
     {ok, SubRef} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 5 nodes = 9 actors (durable storage is enabled,
-        %% 1 replicant node is not doing ds bootstrap)
-        9,
+        %% 2 cores = 4 actors (durable storage enabled) + 2 replicants = 2 more actors
+        6,
         30_000
         30_000
     ),
     ),
     ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
     ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
@@ -423,8 +385,10 @@ t_config_update_ds(Config) ->
         [#{ps_actor_incarnation := 0}], erpc:call(NodeB1, emqx, get_config, [[cluster, links]])
         [#{ps_actor_incarnation := 0}], erpc:call(NodeB1, emqx, get_config, [[cluster, links]])
     ),
     ),
 
 
-    {ok, Events} = snabbkaffe:receive_events(SubRef),
-    ?assertEqual(9, length(Events)),
+    ?assertMatch(
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
+        snabbkaffe:receive_events(SubRef)
+    ),
 
 
     {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1),
     {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1),
     {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1),
     {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1),
@@ -445,8 +409,8 @@ t_config_update_ds(Config) ->
     ?assertNotReceive({publish, _Message = #{}}),
     ?assertNotReceive({publish, _Message = #{}}),
     {ok, SubRef1} = snabbkaffe:subscribe(
     {ok, SubRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 3 nodes (1 replicant) in cluster a (5 actors including ds)
-        5,
+        %% 2 nodes (1 replicant) in cluster a (3 actors including ds)
+        3,
         30_000
         30_000
     ),
     ),
 
 
@@ -455,8 +419,10 @@ t_config_update_ds(Config) ->
     LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
     LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
     ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
     ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
 
 
-    {ok, Events1} = snabbkaffe:receive_events(SubRef1),
-    ?assertEqual(5, length(Events1)),
+    ?assertMatch(
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
+        snabbkaffe:receive_events(SubRef1)
+    ),
 
 
     %% wait for route sync on ClientA node
     %% wait for route sync on ClientA node
     {{ok, _, _}, {ok, _}} = ?wait_async_action(
     {{ok, _, _}, {ok, _}} = ?wait_async_action(
@@ -490,19 +456,14 @@ t_config_update_ds(Config) ->
 t_misconfigured_links('init', Config) ->
 t_misconfigured_links('init', Config) ->
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log()]),
-    ConfB = combine([conf_cluster(NameB), conf_log()]),
-    {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
+    NodesA = mk_cluster(1, NameA, [#{role => replicant}, #{role => core}], conf_log(), Config),
+    NodesB = mk_cluster(2, NameB, [#{role => core}], conf_log(), Config),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     ok = snabbkaffe:start_trace(),
     [
     [
         {cluster_a, ClusterA},
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_a, NameA},
         {name_b, NameB}
         {name_b, NameB}
         | Config
         | Config
@@ -513,10 +474,10 @@ t_misconfigured_links('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
 
 
 t_misconfigured_links(Config) ->
 t_misconfigured_links(Config) ->
-    [NodeA1, _, _] = ?config(cluster_a, Config),
-    [NodeB1, _] = ?config(cluster_b, Config),
-    LPortA = ?config(lport_a, Config),
-    LPortB = ?config(lport_b, Config),
+    [NodeA1 | _] = ?config(cluster_a, Config),
+    [NodeB1 | _] = ?config(cluster_b, Config),
+    LPortA = tcp_port(NodeA1, clink),
+    LPortB = tcp_port(NodeB1, clink),
     NameA = ?config(name_a, Config),
     NameA = ?config(name_a, Config),
     NameB = ?config(name_b, Config),
     NameB = ?config(name_b, Config),
 
 
@@ -529,14 +490,14 @@ t_misconfigured_links(Config) ->
     LinkConfA = #{
     LinkConfA = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => <<"bad-b-name">>
         <<"name">> => <<"bad-b-name">>
     },
     },
     LinkConfB = #{
     LinkConfB = #{
         <<"enable">> => true,
         <<"enable">> => true,
         <<"pool_size">> => 1,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortA]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameA
         <<"name">> => NameA
     },
     },
@@ -651,7 +612,14 @@ start_client(ClientId, Node, CleanStart) ->
     Client.
     Client.
 
 
 tcp_port(Node) ->
 tcp_port(Node) ->
-    {_Host, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    tcp_port(Node, default).
+
+tcp_port(Node, Listener) ->
+    get_bind_port(erpc:call(Node, emqx_config, get, [[listeners, tcp, Listener, bind]])).
+
+get_bind_port({_Host, Port}) ->
+    Port;
+get_bind_port(Port) when is_integer(Port) ->
     Port.
     Port.
 
 
 combine([Entry | Rest]) ->
 combine([Entry | Rest]) ->

+ 0 - 5
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -62,9 +62,7 @@
 
 
 -define(DELTA_SAMPLER_LIST, [
 -define(DELTA_SAMPLER_LIST, [
     received,
     received,
-    %, received_bytes
     sent,
     sent,
-    %, sent_bytes
     validation_succeeded,
     validation_succeeded,
     validation_failed,
     validation_failed,
     transformation_succeeded,
     transformation_succeeded,
@@ -86,9 +84,6 @@
 
 
 -define(DELTA_SAMPLER_RATE_MAP, #{
 -define(DELTA_SAMPLER_RATE_MAP, #{
     received => received_msg_rate,
     received => received_msg_rate,
-    %% In 5.0.0, temporarily comment it to suppress bytes rate
-    %received_bytes  => received_bytes_rate,
-    %sent_bytes      => sent_bytes_rate,
     sent => sent_msg_rate,
     sent => sent_msg_rate,
     validation_succeeded => validation_succeeded_rate,
     validation_succeeded => validation_succeeded_rate,
     validation_failed => validation_failed_rate,
     validation_failed => validation_failed_rate,

+ 81 - 8
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -57,7 +57,8 @@
     randomize/2,
     randomize/2,
     randomize/3,
     randomize/3,
     sample_fill_gap/2,
     sample_fill_gap/2,
-    fill_gaps/2
+    fill_gaps/2,
+    all_data/0
 ]).
 ]).
 
 
 -define(TAB, ?MODULE).
 -define(TAB, ?MODULE).
@@ -191,6 +192,7 @@ handle_info({sample, Time}, State = #state{last = Last}) ->
 handle_info(clean_expired, #state{clean_timer = TrefOld} = State) ->
 handle_info(clean_expired, #state{clean_timer = TrefOld} = State) ->
     ok = maybe_cancel_timer(TrefOld),
     ok = maybe_cancel_timer(TrefOld),
     clean(),
     clean(),
+    inplace_downsample(),
     TrefNew = clean_timer(),
     TrefNew = clean_timer(),
     {noreply, State#state{clean_timer = TrefNew}};
     {noreply, State#state{clean_timer = TrefNew}};
 handle_info(_Info, State = #state{}) ->
 handle_info(_Info, State = #state{}) ->
@@ -205,6 +207,68 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
 %% -------------------------------------------------------------------------------------------------
 %% -------------------------------------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 
 
+all_data() ->
+    Fn = fun(#emqx_monit{time = Time, data = Data}, Acc) -> [{Time, Data} | Acc] end,
+    lists:keysort(1, ets:foldl(Fn, [], ?TAB)).
+
+inplace_downsample() ->
+    All = all_data(),
+    Now = erlang:system_time(millisecond),
+    Compacted = compact(Now, All, []),
+    {Deletes, Writes} = compare(All, Compacted, [], []),
+    {atomic, ok} = mria:transaction(
+        mria:local_content_shard(),
+        fun() ->
+            lists:foreach(
+                fun(T) ->
+                    mnesia:delete(?TAB, T, write)
+                end,
+                Deletes
+            ),
+            lists:foreach(
+                fun({T, D}) ->
+                    mnesia:write(?TAB, #emqx_monit{time = T, data = D}, write)
+                end,
+                Writes
+            )
+        end
+    ),
+    ok.
+
+%% compare the original data points with the compacted data points
+%% return the timestamps to be deleted and the new data points to be written
+compare(Remain, [], Deletes, Writes) ->
+    %% all compacted buckets have been processed, remaining datapoints should all be deleted
+    RemainTsList = lists:map(fun({T, _Data}) -> T end, Remain),
+    {Deletes ++ RemainTsList, Writes};
+compare([{T, Data} | All], [{T, Data} | Compacted], Deletes, Writes) ->
+    %% no change, do nothing
+    compare(All, Compacted, Deletes, Writes);
+compare([{T, _} | All], [{T, Data} | Compacted], Deletes, Writes) ->
+    %% this timetamp has been compacted away, but overwrite it with new data
+    compare(All, Compacted, Deletes, [{T, Data} | Writes]);
+compare([{T0, _} | All], [{T1, _} | _] = Compacted, Deletes, Writes) when T0 < T1 ->
+    %% this timstamp has been compacted away, delete it
+    compare(All, Compacted, [T0 | Deletes], Writes);
+compare([{T0, _} | _] = All, [{T1, Data1} | Compacted], Deletes, Writes) when T0 > T1 ->
+    %% compare with the next compacted bucket timestamp
+    compare(All, Compacted, Deletes, [{T1, Data1} | Writes]).
+
+%% compact the data points to a smaller set of buckets
+compact(_Now, [], Acc) ->
+    lists:reverse(Acc);
+compact(Now, [{Time, Data} | Rest], Acc) ->
+    Interval = sample_interval(Now - Time),
+    Bucket = round_down(Time, Interval),
+    NewAcc = merge_to_bucket(Bucket, Data, Acc),
+    compact(Now, Rest, NewAcc).
+
+merge_to_bucket(Bucket, Data, [{Bucket, Data0} | Acc]) ->
+    NewData = merge_sampler_maps(Data, Data0),
+    [{Bucket, NewData} | Acc];
+merge_to_bucket(Bucket, Data, Acc) ->
+    [{Bucket, Data} | Acc].
+
 %% for testing
 %% for testing
 randomize(Count, Data) when is_map(Data) ->
 randomize(Count, Data) when is_map(Data) ->
     MaxAge = 7 * ?DAYS,
     MaxAge = 7 * ?DAYS,
@@ -212,12 +276,10 @@ randomize(Count, Data) when is_map(Data) ->
 
 
 randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
 randomize(Count, Data, Age) when is_map(Data) andalso is_integer(Age) ->
     Now = erlang:system_time(millisecond) - 1,
     Now = erlang:system_time(millisecond) - 1,
-    Interval = sample_interval(Age),
-    NowBase = Now - (Now rem Interval),
-    StartTs = NowBase - Age,
+    StartTs = Now - Age,
     lists:foreach(
     lists:foreach(
         fun(_) ->
         fun(_) ->
-            Ts = StartTs + rand:uniform(Now - StartTs),
+            Ts = round_down(StartTs + rand:uniform(Age), timer:seconds(10)),
             Record = #emqx_monit{time = Ts, data = Data},
             Record = #emqx_monit{time = Ts, data = Data},
             case ets:lookup(?TAB, Ts) of
             case ets:lookup(?TAB, Ts) of
                 [] ->
                 [] ->
@@ -417,7 +479,6 @@ cal_rate_(Key, {Now, Last, TDelta, Res}) ->
 %% < 3d: sample every 5m: 864 data points
 %% < 3d: sample every 5m: 864 data points
 %% < 7d: sample every 10m: 1008 data points
 %% < 7d: sample every 10m: 1008 data points
 sample_interval(Age) when Age =< 60 * ?SECONDS ->
 sample_interval(Age) when Age =< 60 * ?SECONDS ->
-    %% so far this can happen only during tests
     ?ONE_SECOND;
     ?ONE_SECOND;
 sample_interval(Age) when Age =< ?ONE_HOUR ->
 sample_interval(Age) when Age =< ?ONE_HOUR ->
     10 * ?SECONDS;
     10 * ?SECONDS;
@@ -550,8 +611,20 @@ clean() ->
 
 
 clean(Retention) ->
 clean(Retention) ->
     Now = erlang:system_time(millisecond),
     Now = erlang:system_time(millisecond),
-    MS = ets:fun2ms(fun(#emqx_monit{time = T}) -> Now - T > Retention end),
-    _ = ets:select_delete(?TAB, MS),
+    MS = ets:fun2ms(fun(#emqx_monit{time = T}) when Now - T > Retention -> T end),
+    TsList = ets:select(?TAB, MS),
+    {atomic, ok} =
+        mria:transaction(
+            mria:local_content_shard(),
+            fun() ->
+                lists:foreach(
+                    fun(T) ->
+                        mnesia:delete(?TAB, T, write)
+                    end,
+                    TsList
+                )
+            end
+        ),
     ok.
     ok.
 
 
 %% This data structure should not be changed because it's a RPC contract.
 %% This data structure should not be changed because it's a RPC contract.

+ 42 - 0
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -183,6 +183,48 @@ t_pmap_nodes(_Config) ->
     ok = check_sample_intervals(Interval, hd(Data), tl(Data)),
     ok = check_sample_intervals(Interval, hd(Data), tl(Data)),
     ?assertEqual(DataPoints * length(Nodes), sum_value(Data, sent)).
     ?assertEqual(DataPoints * length(Nodes), sum_value(Data, sent)).
 
 
+t_inplace_downsample(_Config) ->
+    ok = emqx_dashboard_monitor:clean(0),
+    %% -20s to ensure the oldest data point will not expire during the test
+    SinceT = 7 * timer:hours(24) - timer:seconds(20),
+    Total = 10000,
+    emqx_dashboard_monitor:randomize(Total, #{sent => 1}, SinceT),
+    %% assert original data (before downsample)
+    All0 = emqx_dashboard_monitor:all_data(),
+    AllSent0 = lists:map(fun({_, #{sent := S}}) -> S end, All0),
+    ?assertEqual(Total, lists:sum(AllSent0)),
+    emqx_dashboard_monitor ! clean_expired,
+    %% ensure downsample happened
+    ok = gen_server:call(emqx_dashboard_monitor, dummy, infinity),
+    All1 = emqx_dashboard_monitor:all_data(),
+    All = drop_dummy_data_points(All1),
+    AllSent = lists:map(fun({_, #{sent := S}}) -> S end, All),
+    ?assertEqual(Total, lists:sum(AllSent)),
+    %% check timestamps are not random after downsample
+    ExpectedIntervals = [timer:minutes(10), timer:minutes(5), timer:minutes(1), timer:seconds(10)],
+    ok = check_intervals(ExpectedIntervals, All),
+    ok.
+
+%% there might be some data points added while downsample is running
+%% because the sampling interval during test is 1s, so they do not perfectly
+%% match the expected intervals
+%% this function is to dorp those dummy data points
+drop_dummy_data_points(All) ->
+    IsZeroValues = fun(Map) -> lists:all(fun(Value) -> Value =:= 0 end, maps:values(Map)) end,
+    lists:filter(fun({_, Map}) -> not IsZeroValues(Map) end, All).
+
+check_intervals(_, []) ->
+    ok;
+check_intervals([], All) ->
+    throw({bad_intervals, All});
+check_intervals([Interval | Rest], [{Ts, _} | RestData] = All) ->
+    case (Ts rem Interval) =:= 0 of
+        true ->
+            check_intervals([Interval | Rest], RestData);
+        false ->
+            check_intervals(Rest, All)
+    end.
+
 t_randomize(_Config) ->
 t_randomize(_Config) ->
     ok = emqx_dashboard_monitor:clean(0),
     ok = emqx_dashboard_monitor:clean(0),
     emqx_dashboard_monitor:randomize(1, #{sent => 100}),
     emqx_dashboard_monitor:randomize(1, #{sent => 100}),

+ 1 - 1
apps/emqx_mysql/src/emqx_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_mysql, [
 {application, emqx_mysql, [
     {description, "EMQX MySQL Database Connector"},
     {description, "EMQX MySQL Database Connector"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 38 - 15
apps/emqx_mysql/src/emqx_mysql.erl

@@ -36,14 +36,14 @@
 ]).
 ]).
 
 
 %% ecpool connect & reconnect
 %% ecpool connect & reconnect
--export([connect/1, prepare_sql_to_conn/2]).
+-export([connect/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
 
 
 -export([
 -export([
     init_prepare/1,
     init_prepare/1,
     prepare_sql/2,
     prepare_sql/2,
     parse_prepare_sql/1,
     parse_prepare_sql/1,
     parse_prepare_sql/2,
     parse_prepare_sql/2,
-    unprepare_sql/1
+    unprepare_sql/2
 ]).
 ]).
 
 
 -export([roots/0, fields/1, namespace/0]).
 -export([roots/0, fields/1, namespace/0]).
@@ -356,13 +356,16 @@ do_prepare_sql(Templates, PoolName) ->
     prepare_sql_to_conn_list(Conns, Templates).
     prepare_sql_to_conn_list(Conns, Templates).
 
 
 get_connections_from_pool(PoolName) ->
 get_connections_from_pool(PoolName) ->
-    [
-        begin
+    lists:map(
+        fun(Worker) ->
             {ok, Conn} = ecpool_worker:client(Worker),
             {ok, Conn} = ecpool_worker:client(Worker),
             Conn
             Conn
-        end
-     || {_Name, Worker} <- ecpool:workers(PoolName)
-    ].
+        end,
+        pool_workers(PoolName)
+    ).
+
+pool_workers(PoolName) ->
+    lists:map(fun({_Name, Worker}) -> Worker end, ecpool:workers(PoolName)).
 
 
 prepare_sql_to_conn_list([], _Templates) ->
 prepare_sql_to_conn_list([], _Templates) ->
     ok;
     ok;
@@ -376,6 +379,21 @@ prepare_sql_to_conn_list([Conn | ConnList], Templates) ->
             {error, R}
             {error, R}
     end.
     end.
 
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([Templates]) ->
+    [{{ChannelID, _}, _}] = lists:filter(
+        fun
+            ({{_, prepstmt}, _}) ->
+                true;
+            (_) ->
+                false
+        end,
+        Templates
+    ),
+    ChannelID.
+
 prepare_sql_to_conn(_Conn, []) ->
 prepare_sql_to_conn(_Conn, []) ->
     ok;
     ok;
 prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
 prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
@@ -400,16 +418,21 @@ prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
 prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
 prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
     prepare_sql_to_conn(Conn, Rest).
     prepare_sql_to_conn(Conn, Rest).
 
 
-unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
-    ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}),
+unprepare_sql(ChannelID, #{query_templates := Templates, pool_name := PoolName}) ->
     lists:foreach(
     lists:foreach(
-        fun(Conn) ->
-            lists:foreach(
-                fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
-                maps:to_list(Templates)
-            )
+        fun(Worker) ->
+            ok = ecpool_worker:remove_reconnect_callback_by_signature(Worker, ChannelID),
+            case ecpool_worker:client(Worker) of
+                {ok, Conn} ->
+                    lists:foreach(
+                        fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
+                        maps:to_list(Templates)
+                    );
+                _ ->
+                    ok
+            end
         end,
         end,
-        get_connections_from_pool(PoolName)
+        pool_workers(PoolName)
     ).
     ).
 
 
 unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->
 unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->

+ 14 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -98,6 +98,7 @@
     get_callback_mode/1,
     get_callback_mode/1,
     get_resource_type/1,
     get_resource_type/1,
     get_callback_mode/2,
     get_callback_mode/2,
+    get_query_opts/2,
     %% start the instance
     %% start the instance
     call_start/3,
     call_start/3,
     %% verify if the resource is working normally
     %% verify if the resource is working normally
@@ -163,7 +164,8 @@
     on_get_channels/1,
     on_get_channels/1,
     query_mode/1,
     query_mode/1,
     on_format_query_result/1,
     on_format_query_result/1,
-    callback_mode/1
+    callback_mode/1,
+    query_opts/1
 ]).
 ]).
 
 
 %% when calling emqx_resource:start/1
 %% when calling emqx_resource:start/1
@@ -215,6 +217,8 @@
 
 
 -callback query_mode(Config :: term()) -> query_mode().
 -callback query_mode(Config :: term()) -> query_mode().
 
 
+-callback query_opts(Config :: term()) -> #{timeout => timeout()}.
+
 %% This callback handles the installation of a specified channel.
 %% This callback handles the installation of a specified channel.
 %%
 %%
 %% If the channel cannot be successfully installed, the callback shall
 %% If the channel cannot be successfully installed, the callback shall
@@ -516,6 +520,15 @@ get_callback_mode(Mod, State) ->
             undefined
             undefined
     end.
     end.
 
 
+-spec get_query_opts(module(), map()) -> #{timeout => timeout()}.
+get_query_opts(Mod, ActionOrSourceConfig) ->
+    case erlang:function_exported(Mod, query_opts, 1) of
+        true ->
+            Mod:query_opts(ActionOrSourceConfig);
+        false ->
+            emqx_bridge:query_opts(ActionOrSourceConfig)
+    end.
+
 -spec call_start(resource_id(), module(), resource_config()) ->
 -spec call_start(resource_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(ResId, Mod, Config) ->
 call_start(ResId, Mod, Config) ->

+ 14 - 0
changes/ce/feat-13942.en.md

@@ -0,0 +1,14 @@
+The HTTP client now automatically reconnects if no activity is detected for 10 seconds after the latest request has expired.
+Previously, it would wait indefinitely for a server response, causing timeouts if the server dropped requests.
+
+This change impacts below components.
+
+- HTTP authentication
+- HTTP authorization
+- Webhook (HTTP connector)
+- GCP PubSub connector (Enterprise edition)
+- S3 connector (Enterprise edition)
+- InfluxDB connector (Enterprise edition)
+- Couchbase connector (Enterprise edition)
+- IoTDB connector (Enterprise edition)
+- Snowflake connector (Enterprise edition)

+ 5 - 0
changes/ee/fix-13902.en.md

@@ -0,0 +1,5 @@
+Fix prepared statements for MySQL integration.
+
+Prior to this fix, when updating a MySQL integration action,
+if an invalid prepared-statements is used, for example reference to an unknown table column name,
+it may cause the action to apply the oldest version prepared-statement from the past.

+ 3 - 0
changes/ee/fix-13921.en.md

@@ -0,0 +1,3 @@
+Fixed an issue where changing the `sync_timeout` parameter for Pulsar producer action would not have the expected effect on request timeout.
+
+Deprecated `resource_opts.request_ttl` configuration for Pulsar producer action.  It did not affect the request TTL as expected (`retention_period` does) and thus could confuse users.

+ 1 - 0
changes/ee/fix-13927.en.md

@@ -0,0 +1 @@
+Fixed an issue where Cluster Link bootstrap process could have crashed if the source cluster had one or more very crowded topics.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 
 # This is the chart version. This version number should be incremented each time you make changes
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # to the chart and its templates, including the app version.
-version: 5.8.1-beta.2
+version: 5.8.1-rc.1
 
 
 # This is the version number of the application being deployed. This version number should be
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
 # incremented each time you make changes to the application.
-appVersion: 5.8.1-beta.2
+appVersion: 5.8.1-rc.1

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 
 # This is the chart version. This version number should be incremented each time you make changes
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # to the chart and its templates, including the app version.
-version: 5.8.1-beta.2
+version: 5.8.1-rc.1
 
 
 # This is the version number of the application being deployed. This version number should be
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
 # incremented each time you make changes to the application.
-appVersion: 5.8.1-beta.2
+appVersion: 5.8.1-rc.1

+ 1 - 1
mix.exs

@@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:ranch), do: {:ranch, github: "emqx/ranch", tag: "1.8.1-emqx", override: true}
   def common_dep(:ranch), do: {:ranch, github: "emqx/ranch", tag: "1.8.1-emqx", override: true}
 
 
   def common_dep(:ehttpc),
   def common_dep(:ehttpc),
-    do: {:ehttpc, github: "emqx/ehttpc", tag: "0.5.0", override: true}
+    do: {:ehttpc, github: "emqx/ehttpc", tag: "0.6.0", override: true}
 
 
   def common_dep(:jiffy), do: {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}
   def common_dep(:jiffy), do: {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}
 
 

+ 1 - 1
rebar.config

@@ -77,7 +77,7 @@
     {gpb, "4.19.9"},
     {gpb, "4.19.9"},
     {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}},
     {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}},
     {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.11"}}},
     {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.11"}}},
-    {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.5.0"}}},
+    {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.6.0"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},