Просмотр исходного кода

Merge remote-tracking branch 'origin/master' into release-58

zmstone 1 год назад
Родитель
Сommit
aebc9ed5fd
100 измененных файлов с 1574 добавлено и 375 удалено
  1. 30 0
      .ci/docker-compose-file/docker-compose-iotdb.yaml
  2. 12 0
      .ci/docker-compose-file/toxiproxy.json
  3. 4 2
      .github/workflows/bump-dashboard-version.yaml
  4. 2 4
      .github/workflows/green_master.yaml
  5. 8 7
      .github/workflows/sync-release-branch.yaml
  6. 0 1
      apps/emqx/include/emqx_shared_sub.hrl
  7. 1 3
      apps/emqx/src/emqx_base62.erl
  8. 12 5
      apps/emqx/src/emqx_broker_sup.erl
  9. 2 2
      apps/emqx/src/emqx_channel.erl
  10. 118 23
      apps/emqx/src/emqx_exclusive_subscription.erl
  11. 2 11
      apps/emqx/src/emqx_guid.erl
  12. 4 14
      apps/emqx/src/emqx_shared_sub.erl
  13. 37 11
      apps/emqx/src/emqx_tls_lib.erl
  14. 3 3
      apps/emqx/src/emqx_ws_connection.erl
  15. 0 4
      apps/emqx/test/emqx_guid_SUITE.erl
  16. 1 1
      apps/emqx/test/emqx_listeners_update_SUITE.erl
  17. 1 1
      apps/emqx/test/emqx_ocsp_cache_SUITE.erl
  18. 5 0
      apps/emqx/test/emqx_proper_types.erl
  19. 33 5
      apps/emqx/test/emqx_tls_lib_tests.erl
  20. 1 1
      apps/emqx/test/props/prop_emqx_base62.erl
  21. 1 1
      apps/emqx_auth/src/emqx_auth.app.src
  22. 1 1
      apps/emqx_auth/src/emqx_authn/emqx_authn_api.erl
  23. 1 1
      apps/emqx_auth_kerberos/rebar.config
  24. 7 4
      apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
  25. 20 3
      apps/emqx_bridge/test/emqx_bridge_v2_tests.erl
  26. 1 1
      apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage.app.src
  27. 10 5
      apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl
  28. 11 0
      apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector_schema.erl
  29. 60 0
      apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl
  30. 5 7
      apps/emqx_bridge_azure_event_hub/mix.exs
  31. 3 3
      apps/emqx_bridge_azure_event_hub/rebar.config
  32. 5 7
      apps/emqx_bridge_confluent/mix.exs
  33. 3 3
      apps/emqx_bridge_confluent/rebar.config
  34. 1 1
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  35. 17 2
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  36. 12 0
      apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
  37. 3 3
      apps/emqx_bridge_iotdb/mix.exs
  38. 2 1
      apps/emqx_bridge_iotdb/rebar.config
  39. 3 2
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src
  40. 1 1
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl
  41. 286 56
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl
  42. 39 4
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector_info.erl
  43. 93 30
      apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl
  44. 5 7
      apps/emqx_bridge_kafka/mix.exs
  45. 4 5
      apps/emqx_bridge_kafka/rebar.config
  46. 2 4
      apps/emqx_bridge_pulsar/mix.exs
  47. 1 1
      apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src
  48. 1 1
      apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl
  49. 1 1
      apps/emqx_cluster_link/src/emqx_cluster_link.app.src
  50. 19 2
      apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl
  51. 23 0
      apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl
  52. 64 0
      apps/emqx_cluster_link/test/emqx_cluster_link_tests.erl
  53. 1 1
      apps/emqx_connector/src/emqx_connector.app.src
  54. 10 6
      apps/emqx_connector/src/emqx_connector_ssl.erl
  55. 65 11
      apps/emqx_connector/src/schema/emqx_connector_schema.erl
  56. 1 1
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  57. 1 0
      apps/emqx_connector_aggregator/mix.exs
  58. 3 2
      apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src
  59. 23 7
      apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl
  60. 98 0
      apps/emqx_connector_aggregator/test/emqx_connector_aggregator_SUITE.erl
  61. 1 1
      apps/emqx_dashboard/src/emqx_dashboard.app.src
  62. 1 1
      apps/emqx_dashboard/src/emqx_dashboard_listener.erl
  63. 1 1
      apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
  64. 3 3
      apps/emqx_ft/test/props/prop_emqx_ft_assembly.erl
  65. 1 1
      apps/emqx_gateway/src/emqx_gateway.app.src
  66. 2 7
      apps/emqx_gateway/src/emqx_gateway_http.erl
  67. 1 1
      apps/emqx_gateway/src/emqx_gateway_insta_sup.erl
  68. 2 1
      apps/emqx_gateway_coap/src/emqx_coap_channel.erl
  69. 1 1
      apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src
  70. 1 1
      apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src
  71. 1 1
      apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl
  72. 3 3
      apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl
  73. 1 1
      apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src
  74. 3 1
      apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl
  75. 1 1
      apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src
  76. 2 1
      apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl
  77. 1 1
      apps/emqx_license/src/emqx_license.app.src
  78. 2 1
      apps/emqx_license/src/emqx_license_cli.erl
  79. 1 1
      apps/emqx_management/src/emqx_mgmt_api_status.erl
  80. 1 1
      apps/emqx_management/test/emqx_mgmt_api_test_util.erl
  81. 1 1
      apps/emqx_message_transformation/src/emqx_message_transformation.app.src
  82. 22 3
      apps/emqx_message_transformation/src/emqx_message_transformation.erl
  83. 41 0
      apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl
  84. 6 6
      apps/emqx_mix_utils/lib/mix/tasks/emqx.ct.ex
  85. 1 1
      apps/emqx_mix_utils/lib/mix/tasks/emqx.eunit.ex
  86. 1 1
      apps/emqx_mix_utils/lib/mix/tasks/emqx.proper.ex
  87. 1 1
      apps/emqx_resource/src/emqx_resource.app.src
  88. 16 2
      apps/emqx_resource/src/emqx_resource.erl
  89. 11 1
      apps/emqx_resource/src/emqx_resource_manager.erl
  90. 1 1
      apps/emqx_retainer/src/emqx_retainer.app.src
  91. 65 18
      apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
  92. 167 26
      apps/emqx_retainer/test/emqx_retainer_SUITE.erl
  93. 1 1
      apps/emqx_utils/src/emqx_utils.app.src
  94. 21 0
      apps/emqx_utils/src/emqx_utils.erl
  95. 2 0
      changes/ce/fix-13702.en.md
  96. 1 0
      changes/ce/fix-13708.en.md
  97. 1 0
      changes/ce/fix-13733.en.md
  98. 1 0
      changes/ce/fix-13742.en.md
  99. 1 0
      changes/ce/fix-13754.en.md
  100. 0 0
      changes/ce/fix-13756.en.md

+ 30 - 0
.ci/docker-compose-file/docker-compose-iotdb.yaml

@@ -88,3 +88,33 @@ services:
     #     - "18080:18080"
     networks:
       - emqx_bridge
+
+  iotdb-thrift:
+    container_name: iotdb-thrift
+    hostname: iotdb-thrift
+    image: apache/iotdb:1.3.0-standalone
+    restart: always
+    environment:
+      - enable_rest_service=true
+      - cn_internal_address=iotdb-thrift
+      - cn_internal_port=10710
+      - cn_consensus_port=10720
+      - cn_seed_config_node=iotdb-thrift:10710
+      - dn_rpc_address=iotdb-thrift
+      - dn_internal_address=iotdb-thrift
+      - dn_rpc_port=6667
+      - dn_mpp_data_exchange_port=10740
+      - dn_schema_region_consensus_port=10750
+      - dn_data_region_consensus_port=10760
+      - dn_seed_config_node=iotdb-thrift:10710
+    # volumes:
+    #     - ./data:/iotdb/data
+    #     - ./logs:/iotdb/logs
+    expose:
+      - "18080"
+      - "6667"
+    # IoTDB's REST interface, uncomment for local testing
+    # ports:
+    #     - "18080:18080"
+    networks:
+      - emqx_bridge

+ 12 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -156,6 +156,18 @@
     "upstream": "iotdb013:18080",
     "enabled": true
   },
+  {
+    "name": "iotdb_thrift",
+    "listen": "0.0.0.0:46667",
+    "upstream": "iotdb-thrift:6667",
+    "enabled": true
+  },
+  {
+    "name": "iotdb_thrift_rest",
+    "listen": "0.0.0.0:48080",
+    "upstream": "iotdb-thrift:18080",
+    "enabled": true
+  },
   {
     "name": "minio_tcp",
     "listen": "0.0.0.0:19000",

+ 4 - 2
.github/workflows/bump-dashboard-version.yaml

@@ -36,6 +36,8 @@ jobs:
       - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
 
       - name: Create PR to update dashboard version in Makefile
+        env:
+          GH_TOKEN: ${{ github.token }}
         run: |
           set -euxo pipefail
           git config --global user.name "${GITHUB_ACTOR}"
@@ -47,10 +49,10 @@ jobs:
 
           case "${EMQX_NAME}" in
             emqx)
-              sed -i "s|EMQX_DASHBOARD_VERSION=.*|EMQX_DASHBOARD_VERSION=${DASHBOARD_VERSION}|" Makefile
+              sed -i "s|EMQX_DASHBOARD_VERSION ?= .*|EMQX_DASHBOARD_VERSION ?= ${DASHBOARD_VERSION}|" Makefile
               ;;
             emqx-enterprise)
-              sed -i "s|EMQX_EE_DASHBOARD_VERSION=.*|EMQX_EE_DASHBOARD_VERSION=${DASHBOARD_VERSION}|" Makefile
+              sed -i "s|EMQX_EE_DASHBOARD_VERSION ?= .*|EMQX_EE_DASHBOARD_VERSION ?= ${DASHBOARD_VERSION}|" Makefile
               ;;
           esac
           git add Makefile

+ 2 - 4
.github/workflows/green_master.yaml

@@ -35,8 +35,6 @@ jobs:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
           GITHUB_REPO: ${{ github.repository }}
         run: |
-          gh api --method GET -f head_sha=$(git rev-parse HEAD) -f status=completed -f exclude_pull_requests=true /repos/${GITHUB_REPO}/actions/runs > runs.json
-          for id in $(jq -r '.workflow_runs[] | select((."conclusion" == "failure") and (."name" != "Keep master green") and .run_attempt < 3) | .id' runs.json); do
-            echo "rerun https://github.com/${GITHUB_REPO}/actions/runs/$id"
-            gh api --method POST /repos/${GITHUB_REPO}/actions/runs/$id/rerun-failed-jobs || true
+          for id in $(gh run list --branch ${{ matrix.ref }} --workflow "Push Entrypoint" --commit $(git rev-parse HEAD) --status failure --json databaseId,attempt --jq '.[] | select(.attempt < 3) | .databaseId'); do
+            gh run rerun "$id" --failed
           done

+ 8 - 7
.github/workflows/sync-release-branch.yaml

@@ -38,22 +38,23 @@ jobs:
         with:
           fetch-depth: 0
 
-      - name: create new branch
+      - name: merge and create PR
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         run: |
           set -euxo pipefail
+          DIFF=$(git rev-list --count HEAD...origin/${SYNC_BRANCH})
+          if [ $DIFF -eq 0 ]; then
+            echo "No changes in ${SYNC_BRANCH}"
+            exit 0
+          fi
           NEW_BRANCH_NAME=sync-${SYNC_BRANCH}-$(date +"%Y%m%d-%H%M%S")
-          echo "NEW_BRANCH_NAME=${NEW_BRANCH_NAME}" >> $GITHUB_ENV
           git config --global user.name "${GITHUB_ACTOR}"
           git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com"
           git checkout -b ${NEW_BRANCH_NAME}
           git merge origin/${SYNC_BRANCH} 2>&1 | tee merge.log
           git push origin ${NEW_BRANCH_NAME}:${NEW_BRANCH_NAME}
 
-      - name: create pull request
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        run: |
-          set -euxo pipefail
           for pr in $(gh pr list --state open --base master --label sync-release-branch --search "Sync ${SYNC_BRANCH} in:title" --repo ${{ github.repository }} --json number --jq '.[] | .number'); do
             gh pr close $pr --repo ${{ github.repository }} --delete-branch || true
           done

+ 0 - 1
apps/emqx/include/emqx_shared_sub.hrl

@@ -22,7 +22,6 @@
 
 %% ETS tables for Shared PubSub
 -define(SHARED_SUBSCRIBER, emqx_shared_subscriber).
--define(ALIVE_SHARED_SUBSCRIBERS, emqx_alive_shared_subscribers).
 -define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter).
 
 -endif.

+ 1 - 3
apps/emqx/src/emqx_base62.erl

@@ -27,9 +27,7 @@
 %%--------------------------------------------------------------------
 
 %% @doc Encode any data to base62 binary
--spec encode(string() | integer() | binary()) -> binary().
-encode(I) when is_integer(I) ->
-    encode(integer_to_binary(I));
+-spec encode(string() | binary()) -> binary().
 encode(S) when is_list(S) ->
     encode(unicode:characters_to_binary(S));
 encode(B) when is_binary(B) ->

+ 12 - 5
apps/emqx/src/emqx_broker_sup.erl

@@ -23,10 +23,7 @@
 -export([init/1]).
 
 start_link() ->
-    ok = mria:wait_for_tables(
-        emqx_shared_sub:create_tables() ++
-            emqx_exclusive_subscription:create_tables()
-    ),
+    ok = mria:wait_for_tables(emqx_shared_sub:create_tables()),
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 %%--------------------------------------------------------------------
@@ -70,4 +67,14 @@ init([]) ->
         modules => [emqx_broker_helper]
     },
 
-    {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper]}}.
+    %% exclusive subscription
+    ExclusiveSub = #{
+        id => exclusive_subscription,
+        start => {emqx_exclusive_subscription, start_link, []},
+        restart => permanent,
+        shutdown => 2000,
+        type => worker,
+        modules => [emqx_exclusive_subscription]
+    },
+
+    {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper, ExclusiveSub]}}.

+ 2 - 2
apps/emqx/src/emqx_channel.erl

@@ -155,6 +155,7 @@
 
 -define(LIMITER_ROUTING, message_routing).
 -define(chan_terminating, chan_terminating).
+-define(RAND_CLIENTID_BYTES, 16).
 
 -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
 
@@ -1704,8 +1705,7 @@ maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId}) when
 ->
     {ok, ClientInfo};
 maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
-    %% Generate a rand clientId
-    {ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};
+    {ok, ClientInfo#{clientid => emqx_utils:rand_id(?RAND_CLIENTID_BYTES)}};
 maybe_assign_clientid(#mqtt_packet_connect{clientid = ClientId}, ClientInfo) ->
     {ok, ClientInfo#{clientid => ClientId}}.
 

+ 118 - 23
apps/emqx/src/emqx_exclusive_subscription.erl

@@ -16,16 +16,24 @@
 
 -module(emqx_exclusive_subscription).
 
+-behaviour(gen_server).
+
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
--logger_header("[exclusive]").
-
-%% Mnesia bootstrap
--export([create_tables/0]).
+%% API
+-export([start_link/0]).
 
-%% For upgrade
--export([on_add_module/0, on_delete_module/0]).
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
 
 -export([
     check_subscribe/2,
@@ -36,12 +44,15 @@
 
 %% Internal exports (RPC)
 -export([
-    try_subscribe/2
+    try_subscribe/3,
+    do_cleanup_subscriptions/1
 ]).
 
 -record(exclusive_subscription, {
     topic :: emqx_types:topic(),
-    clientid :: emqx_types:clientid()
+    clientid :: emqx_types:clientid(),
+    node :: node(),
+    extra = #{} :: map()
 }).
 
 -define(TAB, emqx_exclusive_subscription).
@@ -51,40 +62,60 @@
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
 
-create_tables() ->
+init_tables() ->
     StoreProps = [
         {ets, [
             {read_concurrency, true},
             {write_concurrency, true}
         ]}
     ],
+
+    Fields = record_info(fields, exclusive_subscription),
+
     ok = mria:create_table(?TAB, [
         {rlog_shard, ?EXCLUSIVE_SHARD},
         {type, set},
         {storage, ram_copies},
         {record_name, exclusive_subscription},
-        {attributes, record_info(fields, exclusive_subscription)},
+        {attributes, Fields},
         {storage_properties, StoreProps}
     ]),
-    [?TAB].
+    ok = mria:wait_for_tables([?TAB]),
+    try_upgrade_table(Fields).
+
+try_upgrade_table(Fields) ->
+    case mnesia:table_info(?TAB, attributes) =:= Fields of
+        true ->
+            ok;
+        false ->
+            TransFun = fun
+                ({exclusive_subscription, Topic, ClientId}) ->
+                    #exclusive_subscription{
+                        topic = Topic,
+                        clientid = ClientId,
+                        node = undefined,
+                        extra = #{}
+                    };
+                (Data = #exclusive_subscription{}) ->
+                    Data
+            end,
+            {atomic, ok} = mnesia:transform_table(?TAB, TransFun, Fields),
+            ok
+    end.
 
 %%--------------------------------------------------------------------
-%% Upgrade
+%% APIs
 %%--------------------------------------------------------------------
 
-on_add_module() ->
-    mria:wait_for_tables(create_tables()).
-
-on_delete_module() ->
-    clear().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-%%--------------------------------------------------------------------
-%% APIs
-%%--------------------------------------------------------------------
 -spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
     allow | deny.
 check_subscribe(#{clientid := ClientId}, Topic) ->
-    case mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/2, [ClientId, Topic]) of
+    case
+        mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/3, [ClientId, Topic, node()])
+    of
         {atomic, Res} ->
             Res;
         {aborted, Reason} ->
@@ -111,17 +142,60 @@ dirty_lookup_clientid(Topic) ->
 clear() ->
     mria:clear_table(?TAB).
 
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    process_flag(trap_exit, true),
+    init_tables(),
+    ok = ekka:monitor(membership),
+    {ok, #{}}.
+
+handle_call(Request, From, State) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_call",
+        call => Request,
+        from => From
+    }),
+    {reply, ok, State}.
+
+handle_cast(Request, State) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_cast",
+        cast => Request
+    }),
+    {noreply, State}.
+
+handle_info({membership, {mnesia, down, Node}}, State) ->
+    cleanup_subscriptions(Node),
+    {noreply, State};
+handle_info({membership, {node, down, Node}}, State) ->
+    cleanup_subscriptions(Node),
+    {noreply, State};
+handle_info(Info, State) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_info",
+        info => Info
+    }),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok = ekka:unmonitor(membership).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
-try_subscribe(ClientId, Topic) ->
+
+try_subscribe(ClientId, Topic, Node) ->
     case mnesia:wread({?TAB, Topic}) of
         [] ->
             mnesia:write(
                 ?TAB,
                 #exclusive_subscription{
                     clientid = ClientId,
-                    topic = Topic
+                    topic = Topic,
+                    node = Node
                 },
                 write
             ),
@@ -136,3 +210,24 @@ try_subscribe(ClientId, Topic) ->
         [_] ->
             deny
     end.
+
+cleanup_subscriptions(Node) ->
+    global:trans(
+        {{?MODULE, ?FUNCTION_NAME}, self()},
+        fun() ->
+            mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:do_cleanup_subscriptions/1, [Node])
+        end
+    ).
+
+do_cleanup_subscriptions(Node0) ->
+    Spec = ets:fun2ms(fun(#exclusive_subscription{node = Node} = Data) when
+        Node0 =:= Node
+    ->
+        Data
+    end),
+    lists:foreach(
+        fun(Obj) ->
+            mnesia:delete_object(?TAB, Obj, write)
+        end,
+        mnesia:select(?TAB, Spec, write)
+    ).

+ 2 - 11
apps/emqx/src/emqx_guid.erl

@@ -35,9 +35,7 @@
     new/0,
     timestamp/1,
     to_hexstr/1,
-    from_hexstr/1,
-    to_base62/1,
-    from_base62/1
+    from_hexstr/1
 ]).
 
 -export_type([guid/0]).
@@ -83,7 +81,7 @@ npid() ->
     <<NodeD01, NodeD02, NodeD03, NodeD04, NodeD05, NodeD06, NodeD07, NodeD08, NodeD09, NodeD10,
         NodeD11, NodeD12, NodeD13, NodeD14, NodeD15, NodeD16, NodeD17, NodeD18, NodeD19,
         NodeD20>> =
-        crypto:hash(sha, erlang:list_to_binary(erlang:atom_to_list(node()))),
+        crypto:hash(sha, erlang:atom_to_binary(node())),
 
     PidBin =
         case erlang:term_to_binary(self()) of
@@ -149,10 +147,3 @@ to_hexstr(I) when byte_size(I) =:= 16 ->
 
 from_hexstr(S) when byte_size(S) =:= 32 ->
     emqx_utils:hexstr_to_bin(S).
-
-to_base62(<<I:128>>) ->
-    emqx_base62:encode(I).
-
-from_base62(S) ->
-    I = binary_to_integer(emqx_base62:decode(S)),
-    <<I:128>>.

+ 4 - 14
apps/emqx/src/emqx_shared_sub.erl

@@ -96,7 +96,7 @@
 -define(SERVER, ?MODULE).
 
 -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
--define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
+-define(IS_REMOTE_PID(Pid), (is_pid(Pid) andalso node(Pid) =/= node())).
 -define(ACK, shared_sub_ack).
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
@@ -427,7 +427,6 @@ init([]) ->
     {ok, _} = mnesia:subscribe({table, ?SHARED_SUBSCRIPTION, simple}),
     {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
     ok = emqx_utils_ets:new(?SHARED_SUBSCRIBER, [protected, bag]),
-    ok = emqx_utils_ets:new(?ALIVE_SHARED_SUBSCRIBERS, [protected, set, {read_concurrency, true}]),
     ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [
         public, set, {write_concurrency, true}
     ]),
@@ -452,7 +451,6 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
             _ = emqx_external_broker:add_shared_route(Topic, Group),
             ok
     end,
-    ok = maybe_insert_alive_tab(SubPid),
     ok = maybe_insert_round_robin_count({Group, Topic}),
     true = ets:insert(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
     {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
@@ -474,7 +472,6 @@ handle_info(
     {mnesia_table_event, {write, #?SHARED_SUBSCRIPTION{subpid = SubPid}, _}},
     State = #state{pmon = PMon}
 ) ->
-    ok = maybe_insert_alive_tab(SubPid),
     {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
 %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
 %% it `unsubscribed` the last topic.
@@ -533,14 +530,7 @@ if_no_more_subscribers(GroupTopic, Fn) ->
     end,
     ok.
 
-%% keep track of alive remote pids
-maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
-maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
-    ets:insert(?ALIVE_SHARED_SUBSCRIBERS, {Pid}),
-    ok.
-
 cleanup_down(SubPid) ->
-    ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SHARED_SUBSCRIBERS, SubPid),
     lists:foreach(
         fun(Record = #?SHARED_SUBSCRIPTION{topic = Topic, group = Group}) ->
             ok = mria:dirty_delete_object(?SHARED_SUBSCRIPTION, Record),
@@ -566,10 +556,10 @@ is_active_sub(Pid, FailedSubs, All) ->
         is_alive_sub(Pid).
 
 %% erlang:is_process_alive/1 does not work with remote pid.
-is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
-    erlang:is_process_alive(Pid);
 is_alive_sub(Pid) ->
-    [] =/= ets:lookup(?ALIVE_SHARED_SUBSCRIBERS, Pid).
+    %% When process is not local, the best guess is it's alive.
+    %% The race is when the pid is actually down cleanup_down is not evaluated yet
+    ?IS_REMOTE_PID(Pid) orelse erlang:is_process_alive(Pid).
 
 delete_route_if_needed({Group, Topic} = GroupTopic) ->
     if_no_more_subscribers(GroupTopic, fun() ->

+ 37 - 11
apps/emqx/src/emqx_tls_lib.erl

@@ -353,7 +353,7 @@ ensure_ssl_files_per_key(Dir, SSL, [KeyPath | KeyPaths], Opts) ->
         {ok, NewSSL} ->
             ensure_ssl_files_per_key(Dir, NewSSL, KeyPaths, Opts);
         {error, Reason} ->
-            {error, Reason#{which_options => [KeyPath]}}
+            {error, Reason#{which_option => format_key_path(KeyPath)}}
     end.
 
 ensure_ssl_file(_Dir, _KeyPath, SSL, undefined, _Opts) ->
@@ -392,13 +392,10 @@ do_ensure_ssl_file(Dir, RawDir, KeyPath, SSL, MaybePem, DryRun) ->
             case is_valid_pem_file(MaybePem) of
                 true ->
                     {ok, SSL};
-                {error, enoent} when DryRun ->
+                {error, #{pem_check := enoent}} when DryRun ->
                     {ok, SSL};
                 {error, Reason} ->
-                    {error, #{
-                        pem_check => invalid_pem,
-                        file_read => Reason
-                    }}
+                    {error, Reason}
             end
     end.
 
@@ -476,11 +473,29 @@ is_hex_str(Str) ->
 %% @doc Returns 'true' when the file is a valid pem, otherwise {error, Reason}.
 is_valid_pem_file(Path0) ->
     Path = resolve_cert_path_for_read(Path0),
-    case file:read_file(Path) of
-        {ok, Pem} -> is_pem(Pem) orelse {error, not_pem};
-        {error, Reason} -> {error, Reason}
+    case is_valid_filename(Path) of
+        true ->
+            case file:read_file(Path) of
+                {ok, Pem} ->
+                    case is_pem(Pem) of
+                        true ->
+                            true;
+                        false ->
+                            {error, #{pem_check => not_pem, file_path => Path}}
+                    end;
+                {error, Reason} ->
+                    {error, #{pem_check => Reason, file_path => Path}}
+            end;
+        false ->
+            %% do not report path because the content can be huge
+            {error, #{pem_check => not_pem, file_path => not_file_path}}
     end.
 
+%% no controle chars 0-31
+%% the input is always string for this function
+is_valid_filename(Path) ->
+    lists:all(fun(C) -> C >= 32 end, Path).
+
 %% @doc Input and output are both HOCON-checked maps, with invalid SSL
 %% file options dropped.
 %% This is to give a feedback to the front-end or management API caller
@@ -711,10 +726,21 @@ ensure_ssl_file_key(SSL, RequiredKeyPaths) ->
         end
     end,
     case lists:filter(Filter, RequiredKeyPaths) of
-        [] -> ok;
-        Miss -> {error, #{reason => ssl_file_option_not_found, which_options => Miss}}
+        [] ->
+            ok;
+        MissingL ->
+            {error, #{
+                reason => ssl_file_option_not_found,
+                missing_options => format_key_paths(MissingL)
+            }}
     end.
 
+format_key_paths(Paths) ->
+    lists:map(fun format_key_path/1, Paths).
+
+format_key_path(Path) ->
+    iolist_to_binary(lists:join(".", [ensure_bin(S) || S <- Path])).
+
 -spec maybe_inject_ssl_fun(root_fun | verify_fun, map()) -> map().
 maybe_inject_ssl_fun(FunName, SslOpts) ->
     case persistent_term:get(?EMQX_SSL_FUN_MFA(FunName), undefined) of

+ 3 - 3
apps/emqx/src/emqx_ws_connection.erl

@@ -810,9 +810,9 @@ handle_outgoing(
                 get_active_n(Type, Listener)
         of
             true ->
-                Cnt = emqx_pd:reset_counter(outgoing_pubs),
-                Oct = emqx_pd:reset_counter(outgoing_bytes),
-                postpone({check_gc, Cnt, Oct}, State);
+                CntPubs = emqx_pd:reset_counter(outgoing_pubs),
+                CntBytes = emqx_pd:reset_counter(outgoing_bytes),
+                postpone({check_gc, CntPubs, CntBytes}, State);
             false ->
                 State
         end,

+ 0 - 4
apps/emqx/test/emqx_guid_SUITE.erl

@@ -35,7 +35,3 @@ t_guid_gen(_) ->
 t_guid_hexstr(_) ->
     Guid = emqx_guid:gen(),
     ?assertEqual(Guid, emqx_guid:from_hexstr(emqx_guid:to_hexstr(Guid))).
-
-t_guid_base62(_) ->
-    Guid = emqx_guid:gen(),
-    ?assertEqual(Guid, emqx_guid:from_base62(emqx_guid:to_base62(Guid))).

+ 1 - 1
apps/emqx/test/emqx_listeners_update_SUITE.erl

@@ -337,7 +337,7 @@ t_update_empty_ssl_options_conf(_Conf) ->
         {error,
             {bad_ssl_config, #{
                 reason := pem_file_path_or_string_is_required,
-                which_options := [[<<"keyfile">>]]
+                which_option := <<"keyfile">>
             }}},
         emqx:update_config(?LISTENERS, BadRaw)
     ),

+ 1 - 1
apps/emqx/test/emqx_ocsp_cache_SUITE.erl

@@ -916,7 +916,7 @@ do_t_validations(_Config) ->
         emqx_utils_json:decode(ResRaw3, [return_maps]),
     %% we can't remove certfile now, because it has default value.
     ?assertMatch({match, _}, re:run(MsgRaw3, <<"enoent">>)),
-    ?assertMatch({match, _}, re:run(MsgRaw3, <<"invalid_pem">>)),
+    ?assertMatch({match, _}, re:run(MsgRaw3, <<"ocsp\\.issuer_pem">>)),
     ok.
 
 t_unknown_error_fetching_ocsp_response(_Config) ->

+ 5 - 0
apps/emqx/test/emqx_proper_types.erl

@@ -57,6 +57,7 @@
 %% Generic Types
 -export([
     scaled/2,
+    logscaled/2,
     fixedmap/1
 ]).
 
@@ -691,6 +692,10 @@ limited_list(N, T) ->
 scaled(F, T) when F > 0 ->
     ?SIZED(S, resize(round(S * F), T)).
 
+-spec logscaled(number(), proptype()) -> proptype().
+logscaled(F, T) when F > 0 ->
+    ?SIZED(S, resize(round(math:log(S + 1) * F), T)).
+
 -spec fixedmap(#{_Key => proptype()}) -> proptype().
 fixedmap(M) ->
     ?LET(PList, maps:to_list(M), maps:from_list(PList)).

+ 33 - 5
apps/emqx/test/emqx_tls_lib_tests.erl

@@ -100,12 +100,40 @@ ssl_files_failure_test_() ->
                 emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("dir", Disabled)
             )
         end},
+        {"mandatory_keys", fun() ->
+            ?assertMatch(
+                {error, #{missing_options := [<<"keyfile">>]}},
+                emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("dir", #{}, #{
+                    required_keys => [[<<"keyfile">>]]
+                })
+            ),
+            ?assertMatch(
+                {error, #{missing_options := [<<"keyfile">>]}},
+                emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("dir", #{}, #{
+                    required_keys => [[keyfile]]
+                })
+            )
+        end},
+        {"invalid_file_path", fun() ->
+            ?assertMatch(
+                {error, #{
+                    pem_check := not_pem,
+                    file_path := not_file_path,
+                    which_option := <<"keyfile">>
+                }},
+                emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("/tmp", #{
+                    keyfile => <<"abc", $\n, "123">>,
+                    certfile => test_key(),
+                    cacertfile => test_key()
+                })
+            )
+        end},
         {"enoent_key_file", fun() ->
             NonExistingFile = filename:join(
                 "/tmp", integer_to_list(erlang:system_time(microsecond))
             ),
             ?assertMatch(
-                {error, #{file_read := enoent, pem_check := invalid_pem}},
+                {error, #{pem_check := enoent, file_path := _}},
                 emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("/tmp", #{
                     <<"keyfile">> => NonExistingFile,
                     <<"certfile">> => test_key(),
@@ -128,7 +156,7 @@ ssl_files_failure_test_() ->
             ?assertMatch(
                 {error, #{
                     reason := pem_file_path_or_string_is_required,
-                    which_options := [[<<"keyfile">>]]
+                    which_option := <<"keyfile">>
                 }},
                 emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("/tmp", #{
                     <<"keyfile">> => <<>>,
@@ -139,7 +167,7 @@ ssl_files_failure_test_() ->
             %% not valid unicode
             ?assertMatch(
                 {error, #{
-                    reason := invalid_file_path_or_pem_string, which_options := [[<<"keyfile">>]]
+                    reason := invalid_file_path_or_pem_string, which_option := <<"keyfile">>
                 }},
                 emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("/tmp", #{
                     <<"keyfile">> => <<255, 255>>,
@@ -150,7 +178,7 @@ ssl_files_failure_test_() ->
             ?assertMatch(
                 {error, #{
                     reason := invalid_file_path_or_pem_string,
-                    which_options := [[<<"ocsp">>, <<"issuer_pem">>]]
+                    which_option := <<"ocsp.issuer_pem">>
                 }},
                 emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir("/tmp", #{
                     <<"keyfile">> => test_key(),
@@ -172,7 +200,7 @@ ssl_files_failure_test_() ->
             try
                 ok = file:write_file(TmpFile, <<"not a valid pem">>),
                 ?assertMatch(
-                    {error, #{file_read := not_pem}},
+                    {error, #{pem_check := not_pem}},
                     emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir(
                         "/tmp",
                         #{

+ 1 - 1
apps/emqx/test/props/prop_emqx_base62.erl

@@ -75,4 +75,4 @@ base62_size(Data, Encoded) ->
 %%--------------------------------------------------------------------
 
 raw_data() ->
-    oneof([integer(), string(), binary()]).
+    oneof([string(), binary()]).

+ 1 - 1
apps/emqx_auth/src/emqx_auth.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth, [
     {description, "EMQX Authentication and authorization"},
-    {vsn, "0.4.0"},
+    {vsn, "0.4.1"},
     {modules, []},
     {registered, [emqx_auth_sup]},
     {applications, [

+ 1 - 1
apps/emqx_auth/src/emqx_authn/emqx_authn_api.erl

@@ -1296,7 +1296,7 @@ serialize_error(unsupported_operation) ->
 serialize_error({bad_ssl_config, Details}) ->
     {400, #{
         code => <<"BAD_REQUEST">>,
-        message => binfmt("bad_ssl_config ~p", [Details])
+        message => binfmt("bad_ssl_config ~0p", [Details])
     }};
 serialize_error({missing_parameter, Detail}) ->
     {400, #{

+ 1 - 1
apps/emqx_auth_kerberos/rebar.config

@@ -3,5 +3,5 @@
 {deps, [
     {emqx, {path, "../emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
-    {sasl_auth, "2.3.0"}
+    {sasl_auth, "2.3.1"}
 ]}.

+ 7 - 4
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -988,6 +988,8 @@ t_consume(Config, Opts) ->
     ok.
 
 t_create_via_http(Config) ->
+    t_create_via_http(Config, false).
+t_create_via_http(Config, IsOnlyV2) ->
     ?check_trace(
         begin
             ?assertMatch({ok, _}, create_bridge_api(Config)),
@@ -1000,10 +1002,11 @@ t_create_via_http(Config) ->
             ),
 
             %% check that v1 list API is fine
-            ?assertMatch(
-                {ok, {{_, 200, _}, _, _}},
-                list_bridges_http_api_v1()
-            ),
+            (not IsOnlyV2) andalso
+                ?assertMatch(
+                    {ok, {{_, 200, _}, _, _}},
+                    list_bridges_http_api_v1()
+                ),
 
             ok
         end,

+ 20 - 3
apps/emqx_bridge/test/emqx_bridge_v2_tests.erl

@@ -75,10 +75,27 @@ connector_resource_opts_test() ->
         start_timeout
     ],
     ConnectorSchemasRefs =
-        lists:map(
-            fun({Type, #{type := ?MAP(_, ?R_REF(SchemaMod, FieldName))}}) ->
-                {Type, find_resource_opts_fields(SchemaMod, FieldName)}
+        lists:foldl(
+            fun
+                ({Type, #{type := ?MAP(_, ?R_REF(SchemaMod, FieldName))}}, Acc) ->
+                    [{Type, find_resource_opts_fields(SchemaMod, FieldName)} | Acc];
+                ({Type, #{type := ?MAP(_, ?UNION(UnionType))}}, Acc) ->
+                    Types =
+                        case UnionType of
+                            List when is_list(List) ->
+                                List;
+                            Func when is_function(Func, 1) ->
+                                Func(all_union_members)
+                        end,
+                    lists:foldl(
+                        fun(?R_REF(SchemaMod, FieldName), InAcc) ->
+                            [{Type, find_resource_opts_fields(SchemaMod, FieldName)} | InAcc]
+                        end,
+                        Acc,
+                        Types
+                    )
             end,
+            [],
             emqx_connector_schema:fields(connectors)
         ),
     ConnectorsMissingRO = [Type || {Type, undefined} <- ConnectorSchemasRefs],

+ 1 - 1
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_blob_storage, [
     {description, "EMQX Enterprise Azure Blob Storage Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, [emqx_bridge_azure_blob_storage_sup]},
     {applications, [
         kernel,

+ 10 - 5
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl

@@ -178,9 +178,14 @@ on_stop(_ConnResId, _ConnState) ->
     ok.
 
 -spec on_get_status(connector_resource_id(), connector_state()) ->
-    ?status_connected | ?status_disconnected.
-on_get_status(_ConnResId, _ConnState = #{driver_state := DriverState}) ->
-    health_check(DriverState).
+    ?status_connected | ?status_disconnected | {?status_disconnected, connector_state(), term()}.
+on_get_status(_ConnResId, ConnState = #{driver_state := DriverState}) ->
+    case health_check(DriverState) of
+        {Status, Message} ->
+            {Status, ConnState, Message};
+        Status when is_atom(Status) ->
+            Status
+    end.
 
 -spec on_add_channel(
     connector_resource_id(),
@@ -646,8 +651,8 @@ channel_status(#{mode := aggregated} = ActionState, ConnState) ->
 
 health_check(DriverState) ->
     case erlazure:list_containers(DriverState, []) of
-        {error, _} ->
-            ?status_disconnected;
+        {error, Reason} ->
+            {?status_disconnected, Reason};
         {L, _} when is_list(L) ->
             ?status_connected
     end.

+ 11 - 0
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector_schema.erl

@@ -60,6 +60,8 @@ fields(connector_config) ->
         {account_key,
             emqx_schema_secret:mk(
                 #{
+                    required => true,
+                    validator => fun account_key_validator/1,
                     desc => ?DESC("account_key")
                 }
             )},
@@ -140,3 +142,12 @@ connector_example(put) ->
 %%------------------------------------------------------------------------------
 
 mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+account_key_validator(Val) ->
+    try
+        _ = base64:decode(emqx_secret:unwrap(Val)),
+        ok
+    catch
+        _:_ ->
+            {error, <<"bad account key">>}
+    end.

+ 60 - 0
apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl

@@ -702,3 +702,63 @@ t_max_block_size_direct_transfer(Config) ->
         emqx_resource:simple_sync_query(ResourceId, Message)
     ),
     ok.
+
+%% Checks that account keys that are not base64 encoded return a friendly error.
+t_bad_account_key(Config) ->
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {400, #{
+                    <<"message">> := #{
+                        <<"kind">> := <<"validation_error">>,
+                        <<"reason">> := <<"bad account key">>
+                    }
+                }},
+                emqx_bridge_v2_testlib:simplify_result(
+                    emqx_bridge_v2_testlib:create_connector_api(
+                        Config,
+                        #{<<"account_key">> => <<"aaa">>}
+                    )
+                )
+            ),
+            ?assertMatch(
+                {400, #{
+                    <<"message">> := #{
+                        <<"kind">> := <<"validation_error">>,
+                        <<"reason">> := <<"bad account key">>
+                    }
+                }},
+                emqx_bridge_v2_testlib:simplify_result(
+                    emqx_bridge_v2_testlib:probe_connector_api(
+                        Config,
+                        #{<<"account_key">> => <<"aaa">>}
+                    )
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+%% Checks that account names that are non-existent return a friendly error.
+t_bad_account_name(Config) ->
+    ConnectorConfig0 = ?config(connector_config, Config),
+    ConnectorConfig = maps:remove(<<"endpoint">>, ConnectorConfig0),
+    ?check_trace(
+        begin
+            Res0 = emqx_bridge_v2_testlib:simplify_result(
+                emqx_bridge_v2_testlib:probe_connector_api(
+                    [{connector_config, ConnectorConfig} | Config],
+                    #{<<"account_name">> => <<"idontexistzzzzzaa">>}
+                )
+            ),
+            ?assertMatch({400, #{<<"message">> := _}}, Res0),
+            {400, #{<<"message">> := Msg}} = Res0,
+            ?assertEqual(match, re:run(Msg, <<"failed_connect">>, [{capture, none}])),
+            ?assertEqual(match, re:run(Msg, <<"nxdomain">>, [{capture, none}])),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 5 - 7
apps/emqx_bridge_azure_event_hub/mix.exs

@@ -23,13 +23,11 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
 
   def deps() do
     [
-      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
-      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
-      {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.18.0"},
-      ## TODO: remove `mix.exs` from `wolff` and remove this override
-      ## TODO: remove `mix.exs` from `pulsar` and remove this override
-      {:snappyer, "1.2.9", override: true},
+      UMP.common_dep(:wolff),
+      UMP.common_dep(:kafka_protocol),
+      UMP.common_dep(:brod_gssapi),
+      UMP.common_dep(:brod),
+      UMP.common_dep(:snappyer),
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 3 - 3
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,9 +2,9 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
-    {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
-    {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
+    {wolff, "3.0.4"},
+    {kafka_protocol, "4.1.8"},
+    {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
     {snappyer, "1.2.9"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},

+ 5 - 7
apps/emqx_bridge_confluent/mix.exs

@@ -23,13 +23,11 @@ defmodule EMQXBridgeConfluent.MixProject do
 
   def deps() do
     [
-      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
-      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
-      {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.18.0"},
-      ## TODO: remove `mix.exs` from `wolff` and remove this override
-      ## TODO: remove `mix.exs` from `pulsar` and remove this override
-      {:snappyer, "1.2.9", override: true},
+      UMP.common_dep(:wolff),
+      UMP.common_dep(:kafka_protocol),
+      UMP.common_dep(:brod_gssapi),
+      UMP.common_dep(:brod),
+      UMP.common_dep(:snappyer),
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 3 - 3
apps/emqx_bridge_confluent/rebar.config

@@ -2,9 +2,9 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
-    {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
-    {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
+    {wolff, "3.0.4"},
+    {kafka_protocol, "4.1.8"},
+    {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
     {snappyer, "1.2.9"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.3.4"},
+    {vsn, "0.3.5"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, ehttpc]},
     {env, [

+ 17 - 2
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -851,7 +851,19 @@ formalize_request(_Method, BasePath, {Path, Headers}) ->
 %%
 %% See also: `join_paths_test_/0`
 join_paths(Path1, Path2) ->
-    [without_trailing_slash(Path1), $/, without_starting_slash(Path2)].
+    case is_start_with_question_mark(Path2) of
+        true ->
+            [Path1, Path2];
+        false ->
+            [without_trailing_slash(Path1), $/, without_starting_slash(Path2)]
+    end.
+
+is_start_with_question_mark([$? | _]) ->
+    true;
+is_start_with_question_mark(<<$?, _/binary>>) ->
+    true;
+is_start_with_question_mark(_) ->
+    false.
 
 without_starting_slash(Path) ->
     case do_without_starting_slash(Path) of
@@ -1080,7 +1092,10 @@ join_paths_test_() ->
         ?_assert(iolists_equal("/cde", join_paths("/", "cde"))),
         ?_assert(iolists_equal("/cde", join_paths("/", "/cde"))),
         ?_assert(iolists_equal("//cde/", join_paths("/", "//cde/"))),
-        ?_assert(iolists_equal("abc///cde/", join_paths("abc//", "//cde/")))
+        ?_assert(iolists_equal("abc///cde/", join_paths("abc//", "//cde/"))),
+        ?_assert(iolists_equal("abc?v=1", join_paths("abc", "?v=1"))),
+        ?_assert(iolists_equal("abc?v=1", join_paths("abc", <<"?v=1">>))),
+        ?_assert(iolists_equal("abc/?v=1", join_paths("abc/", <<"?v=1">>)))
     ].
 
 -endif.

+ 12 - 0
apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl

@@ -11,4 +11,16 @@
 -define(VSN_1_0_X, 'v1.0.x').
 -define(VSN_0_13_X, 'v0.13.x').
 
+-define(THRIFT_HOST_OPTIONS, #{
+    default_port => 6667
+}).
+
+-define(PROTOCOL_V1, 'protocol_v1').
+-define(PROTOCOL_V2, 'protocol_v2').
+-define(PROTOCOL_V3, 'protocol_v3').
+
+-define(THRIFT_NOT_SUPPORT_ASYNC_MSG, <<"The Thrift backend does not support asynchronous calls">>).
+
+-type driver() :: resetapi | thrift.
+
 -endif.

+ 3 - 3
apps/emqx_bridge_iotdb/mix.exs

@@ -5,7 +5,7 @@ defmodule EMQXBridgeIotdb.MixProject do
   def project do
     [
       app: :emqx_bridge_iotdb,
-      version: "0.1.0",
+      version: "0.2.3",
       build_path: "../../_build",
       erlc_options: UMP.erlc_options(),
       erlc_paths: UMP.erlc_paths(),
@@ -23,11 +23,11 @@ defmodule EMQXBridgeIotdb.MixProject do
 
   def deps() do
     [
-      {:emqx, in_umbrella: true},
+      {:iotdb, github: "emqx/iotdb-client-erl", tag: "0.1.5"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false},
-      {:emqx_bridge_http, in_umbrella: true}
+      {:emqx_bridge_http, in_umbrella: true, runtime: false}
     ]
   end
 end

+ 2 - 1
apps/emqx_bridge_iotdb/rebar.config

@@ -9,7 +9,8 @@
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}},
-    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}},
+    {iotdb, {git, "https://github.com/emqx/iotdb-client-erl.git", {tag, "0.1.5"}}}
 ]}.
 {plugins, [rebar3_path_deps]}.
 {project_plugins, [erlfmt]}.

+ 3 - 2
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_iotdb, [
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {modules, [
         emqx_bridge_iotdb,
         emqx_bridge_iotdb_connector
@@ -10,7 +10,8 @@
     {applications, [
         kernel,
         stdlib,
-        emqx_resource
+        emqx_resource,
+        iotdb
     ]},
     {env, [
         {emqx_action_info_modules, [emqx_bridge_iotdb_action_info]},

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl

@@ -53,7 +53,7 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ).
 
 bridge_v1_config_to_connector_config(BridgeV1Config) ->
-    ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, config),
+    ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, "config_restapi"),
     emqx_utils_maps:update_if_present(
         <<"resource_opts">>,
         fun emqx_connector_schema:project_to_connector_resource_opts/1,

+ 286 - 56
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -17,6 +17,7 @@
 -export([
     resource_type/0,
     callback_mode/0,
+    callback_mode/1,
     on_start/2,
     on_stop/2,
     on_get_status/2,
@@ -31,6 +32,8 @@
     on_format_query_result/1
 ]).
 
+-export([connect/1, do_get_status/1]).
+
 -export([
     namespace/0,
     roots/0,
@@ -45,7 +48,8 @@
 
 -type config() ::
     #{
-        request_base := #{
+        driver := driver(),
+        request_base => #{
             scheme := http | https,
             host := iolist(),
             port := inet:port_number()
@@ -53,18 +57,17 @@
         connect_timeout := pos_integer(),
         pool_type := random | hash,
         pool_size := pos_integer(),
-        iotdb_version := atom(),
+        iotdb_version => atom(),
+        protocol_version => atom(),
         request => undefined | map(),
         atom() => _
     }.
 
 -type state() ::
     #{
-        connect_timeout := pos_integer(),
-        pool_type := random | hash,
+        driver := driver(),
         channels := map(),
         iotdb_version := atom(),
-        request => undefined | map(),
         atom() => _
     }.
 
@@ -115,13 +118,13 @@ connector_example_values() ->
 namespace() -> "iotdb".
 
 roots() ->
-    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+    [].
 
-fields(config) ->
+fields("config_restapi") ->
     proplists_without(
         [url, request, retry_interval, headers],
         emqx_bridge_http_schema:fields("config_connector")
-    ) ++
+    ) ++ common_fields(restapi) ++
         fields("connection_fields");
 fields("connection_fields") ->
     [
@@ -143,38 +146,88 @@ fields("connection_fields") ->
             )},
         {authentication,
             mk(
-                hoconsc:union([ref(?MODULE, auth_basic)]),
+                hoconsc:union([ref(?MODULE, authentication)]),
                 #{
                     default => auth_basic, desc => ?DESC("config_authentication")
                 }
             )}
     ];
-fields(auth_basic) ->
+fields(authentication) ->
     [
-        {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
+        {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_username")})},
         {password,
             emqx_schema_secret:mk(#{
                 required => true,
-                desc => ?DESC("config_auth_basic_password")
+                desc => ?DESC("config_auth_password")
             })}
     ];
-fields("post") ->
-    emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config);
-fields("put") ->
-    fields(config);
-fields("get") ->
-    emqx_bridge_schema:status_fields() ++ fields("post").
-
-desc(config) ->
-    ?DESC("desc_config");
-desc(auth_basic) ->
-    "Basic Authentication";
-desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
-    ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+fields("config_thrift") ->
+    Meta = #{desc => ?DESC("server")},
+    emqx_connector_schema:common_fields() ++
+        common_fields(thrift) ++
+        [
+            {server, emqx_schema:servers_sc(Meta, ?THRIFT_HOST_OPTIONS)},
+            {protocol_version,
+                mk(
+                    hoconsc:enum([?PROTOCOL_V1, ?PROTOCOL_V2, ?PROTOCOL_V3]),
+                    #{
+                        desc => ?DESC("config_protocol_version"),
+                        default => ?PROTOCOL_V3
+                    }
+                )},
+            {'zoneId',
+                mk(
+                    binary(),
+                    #{default => <<"Asia/Shanghai">>, desc => ?DESC("config_zoneId")}
+                )},
+            {pool_size,
+                mk(
+                    pos_integer(),
+                    #{
+                        default => 8,
+                        desc => ?DESC("pool_size")
+                    }
+                )}
+        ] ++ fields(authentication) ++ emqx_connector_schema_lib:ssl_fields() ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields("post_" ++ Driver) ->
+    emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields("config_" ++ Driver);
+fields("put_" ++ Driver) ->
+    fields("config_" ++ Driver);
+fields("get_" ++ Driver) ->
+    emqx_bridge_schema:status_fields() ++ fields("post_" ++ Driver).
+
+common_fields(Driver) ->
+    [
+        {driver,
+            mk(
+                hoconsc:enum([Driver]),
+                #{
+                    desc => ?DESC("config_driver"),
+                    default => <<"restapi">>
+                }
+            )}
+    ].
+
+desc(authentication) ->
+    ?DESC("config_authentication");
+desc(connector_resource_opts) ->
+    "Connector resource options";
+desc(Struct) when is_list(Struct) ->
+    case string:split(Struct, "_") of
+        ["config", _] ->
+            ?DESC("desc_config");
+        [Method, _] when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+            ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
+        _ ->
+            undefined
+    end;
 desc(_) ->
     undefined.
 
-connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
+connector_config(#{driver := restapi} = Conf, #{name := Name, parse_confs := ParseConfs}) ->
     #{
         base_url := BaseUrl,
         authentication :=
@@ -199,7 +252,9 @@ connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
         <<"http">>,
         Name,
         WebhookConfig
-    ).
+    );
+connector_config(Conf, _) ->
+    Conf.
 
 proplists_without(Keys, List) ->
     [El || El = {K, _} <- List, not lists:member(K, Keys)].
@@ -211,8 +266,13 @@ resource_type() -> iotdb.
 
 callback_mode() -> async_if_possible.
 
+callback_mode(#{driver := restapi}) ->
+    async_if_possible;
+callback_mode(#{driver := thrift}) ->
+    always_sync.
+
 -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
-on_start(InstanceId, #{iotdb_version := Version} = Config) ->
+on_start(InstanceId, #{driver := restapi, iotdb_version := Version} = Config) ->
     %% [FIXME] The configuration passed in here is pre-processed and transformed
     %% in emqx_bridge_resource:parse_confs/2.
     case emqx_bridge_http_connector:on_start(InstanceId, Config) of
@@ -222,8 +282,78 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) ->
                 instance_id => InstanceId,
                 request => emqx_utils:redact(maps:get(request, State, <<>>))
             }),
-            ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
-            {ok, State#{iotdb_version => Version, channels => #{}}};
+            ?tp(iotdb_bridge_started, #{driver => restapi, instance_id => InstanceId}),
+            {ok, State#{driver => restapi, iotdb_version => Version, channels => #{}}};
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_iotdb_bridge",
+                instance_id => InstanceId,
+                request => emqx_utils:redact(maps:get(request, Config, <<>>)),
+                reason => Reason
+            }),
+            throw(failed_to_start_iotdb_bridge)
+    end;
+on_start(
+    InstanceId,
+    #{
+        driver := thrift,
+        protocol_version := ProtocolVsn,
+        server := Server,
+        pool_size := PoolSize,
+        ssl := SSL
+    } = Config
+) ->
+    IoTDBOpts0 = maps:with(['zoneId', username, password], Config),
+
+    Version =
+        case ProtocolVsn of
+            ?PROTOCOL_V1 ->
+                0;
+            ?PROTOCOL_V2 ->
+                1;
+            ?PROTOCOL_V3 ->
+                2
+        end,
+
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?THRIFT_HOST_OPTIONS),
+
+    TransportOpts =
+        case maps:get(enable, SSL) of
+            true ->
+                #{
+                    ssltransport => true,
+                    ssloptions => emqx_tls_lib:to_client_opts(SSL)
+                };
+            false ->
+                #{}
+        end,
+
+    IoTDBOpts = IoTDBOpts0#{
+        version => Version,
+        host => Host,
+        port => Port,
+        options => TransportOpts
+    },
+
+    Options = [
+        {pool_size, PoolSize},
+        {iotdb_options, IoTDBOpts}
+    ],
+
+    case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
+        ok ->
+            ?SLOG(info, #{
+                msg => "iotdb_bridge_started",
+                instance_id => InstanceId
+            }),
+
+            ?tp(iotdb_bridge_started, #{driver => thrift, instance_id => InstanceId}),
+
+            {ok, #{
+                driver => thrift,
+                iotdb_version => ProtocolVsn,
+                channels => #{}
+            }};
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
@@ -235,18 +365,26 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) ->
     end.
 
 -spec on_stop(manager_id(), state()) -> ok | {error, term()}.
-on_stop(InstanceId, State) ->
+on_stop(InstanceId, #{driver := restapi} = State) ->
     ?SLOG(info, #{
         msg => "stopping_iotdb_bridge",
         connector => InstanceId
     }),
     Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
     ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
-    Res.
+    Res;
+on_stop(InstanceId, #{driver := thrift} = _State) ->
+    ?SLOG(info, #{
+        msg => "stopping_iotdb_bridge",
+        connector => InstanceId
+    }),
+
+    ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
+    emqx_resource_pool:stop(InstanceId).
 
 -spec on_get_status(manager_id(), state()) ->
     connected | connecting | {disconnected, state(), term()}.
-on_get_status(InstanceId, State) ->
+on_get_status(InstanceId, #{driver := restapi} = State) ->
     Func = fun(Worker, Timeout) ->
         Request = {?IOTDB_PING_PATH, [], undefined},
         NRequest = emqx_bridge_http_connector:formalize_request(get, Request),
@@ -265,7 +403,27 @@ on_get_status(InstanceId, State) ->
                 {error, {unexpected_ping_result, Result}}
         end
     end,
-    emqx_bridge_http_connector:on_get_status(InstanceId, State, Func).
+    emqx_bridge_http_connector:on_get_status(InstanceId, State, Func);
+on_get_status(InstanceId, #{driver := thrift} = _State) ->
+    case emqx_resource_pool:health_check_workers(InstanceId, fun ?MODULE:do_get_status/1) of
+        true ->
+            ?status_connected;
+        false ->
+            ?status_disconnected
+    end.
+
+do_get_status(Conn) ->
+    case iotdb:ping(Conn) of
+        {ok, _} ->
+            true;
+        {error, _} ->
+            false
+    end.
+
+connect(Opts) ->
+    {iotdb_options, #{password := Password} = IoTDBOpts0} = lists:keyfind(iotdb_options, 1, Opts),
+    IoTDBOpts = IoTDBOpts0#{password := emqx_secret:unwrap(Password)},
+    iotdb:start_link(IoTDBOpts).
 
 -spec on_query(manager_id(), {send_message, map()}, state()) ->
     {ok, pos_integer(), [term()], term()}
@@ -287,8 +445,8 @@ on_query(
     case try_render_messages([Req], IoTDBVsn, Channels) of
         {ok, [IoTDBPayload]} ->
             handle_response(
-                emqx_bridge_http_connector:on_query(
-                    InstanceId, {ChannelId, IoTDBPayload}, State
+                do_on_query(
+                    InstanceId, ChannelId, IoTDBPayload, State
                 )
             );
         Error ->
@@ -301,7 +459,7 @@ on_query_async(
     InstanceId,
     {ChannelId, _Message} = Req,
     ReplyFunAndArgs0,
-    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+    #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -325,13 +483,27 @@ on_query_async(
             );
         Error ->
             Error
-    end.
+    end;
+on_query_async(
+    InstanceId,
+    Req,
+    _ReplyFunAndArgs0,
+    #{driver := thrift} = State
+) ->
+    ?SLOG(error, #{
+        msg => "iotdb_bridge_async_query_failed",
+        instance_id => InstanceId,
+        send_message => Req,
+        reason => ?THRIFT_NOT_SUPPORT_ASYNC_MSG,
+        state => emqx_utils:redact(State)
+    }),
+    {error, not_support}.
 
 on_batch_query_async(
     InstId,
     Requests,
     Callback,
-    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+    #{driver := restapi, iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_batch_query_async, #{instance_id => InstId}),
     [{ChannelId, _Message} | _] = Requests,
@@ -361,8 +533,23 @@ on_batch_query_async(
             );
         Error ->
             Error
-    end.
+    end;
+on_batch_query_async(
+    InstanceId,
+    Req,
+    _ReplyFunAndArgs0,
+    #{driver := thrift} = State
+) ->
+    ?SLOG(error, #{
+        msg => "iotdb_bridge_async_query_failed",
+        instance_id => InstanceId,
+        send_message => Req,
+        reason => ?THRIFT_NOT_SUPPORT_ASYNC_MSG,
+        state => emqx_utils:redact(State)
+    }),
+    {error, not_support}.
 
+%% todo
 on_batch_query(
     InstId,
     [{ChannelId, _Message}] = Requests,
@@ -381,8 +568,8 @@ on_batch_query(
             lists:map(
                 fun(IoTDBPayload) ->
                     handle_response(
-                        emqx_bridge_http_connector:on_query(
-                            InstId, {ChannelId, IoTDBPayload}, State
+                        do_on_query(
+                            InstId, ChannelId, IoTDBPayload, State
                         )
                     )
                 end,
@@ -397,7 +584,7 @@ on_format_query_result(Result) ->
 
 on_add_channel(
     InstanceId,
-    #{iotdb_version := Version, channels := Channels} = OldState0,
+    #{driver := restapi, iotdb_version := Version, channels := Channels} = OldState0,
     ChannelId,
     #{
         parameters := #{data := Data} = Parameter
@@ -429,6 +616,27 @@ on_add_channel(
                 InstanceId, OldState0, ChannelId, HTTPReq
             ),
 
+            %% update IoTDB channel
+            DeviceId = maps:get(device_id, Parameter, <<>>),
+            Channel = Parameter#{
+                device_id => emqx_placeholder:preproc_tmpl(DeviceId),
+                data := preproc_data_template(Data)
+            },
+            Channels2 = Channels#{ChannelId => Channel},
+            {ok, OldState#{channels := Channels2}}
+    end;
+on_add_channel(
+    _InstanceId,
+    #{driver := thrift, channels := Channels} = OldState,
+    ChannelId,
+    #{
+        parameters := #{data := Data} = Parameter
+    }
+) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        _ ->
             %% update IoTDB channel
             DeviceId = maps:get(device_id, Parameter, <<>>),
             Channel = Parameter#{
@@ -439,8 +647,11 @@ on_add_channel(
             {ok, OldState#{channels := Channels2}}
     end.
 
-on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
+on_remove_channel(InstanceId, #{driver := restapi, channels := Channels} = OldState0, ChannelId) ->
     {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
+    Channels2 = maps:remove(ChannelId, Channels),
+    {ok, OldState#{channels => Channels2}};
+on_remove_channel(_InstanceId, #{driver := thrift, channels := Channels} = OldState, ChannelId) ->
     Channels2 = maps:remove(ChannelId, Channels),
     {ok, OldState#{channels => Channels2}}.
 
@@ -536,16 +747,16 @@ proc_data(
     ],
     Msg,
     Nows,
-    IotDbVsn,
+    IoTDbVsn,
     Acc
 ) ->
     DataType = list_to_binary(
         string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
     ),
     try
-        proc_data(T, Msg, Nows, IotDbVsn, [
+        proc_data(T, Msg, Nows, IoTDbVsn, [
             #{
-                timestamp => iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows),
+                timestamp => iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows),
                 measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
                 data_type => DataType,
                 value => proc_value(DataType, ValueTkn, Msg)
@@ -559,28 +770,28 @@ proc_data(
             ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
             {error, invalid_data}
     end;
-proc_data([], _Msg, _Nows, _IotDbVsn, Acc) ->
+proc_data([], _Msg, _Nows, _IoTDbVsn, Acc) ->
     {ok, lists:reverse(Acc)}.
 
-iot_timestamp(_IotDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
+iot_timestamp(_IoTDbVsn, Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
-iot_timestamp(IotDbVsn, TimestampTkn, Msg, Nows) ->
-    iot_timestamp(IotDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
+iot_timestamp(IoTDbVsn, TimestampTkn, Msg, Nows) ->
+    iot_timestamp(IoTDbVsn, emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
 
 %% > v1.3.0 don't allow write nanoseconds nor microseconds
 iot_timestamp(?VSN_1_3_X, <<"now_us">>, #{now_ms := NowMs}) ->
     NowMs;
 iot_timestamp(?VSN_1_3_X, <<"now_ns">>, #{now_ms := NowMs}) ->
     NowMs;
-iot_timestamp(_IotDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
+iot_timestamp(_IoTDbVsn, <<"now_us">>, #{now_us := NowUs}) ->
     NowUs;
-iot_timestamp(_IotDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
+iot_timestamp(_IoTDbVsn, <<"now_ns">>, #{now_ns := NowNs}) ->
     NowNs;
-iot_timestamp(_IotDbVsn, Timestamp, #{now_ms := NowMs}) when
+iot_timestamp(_IoTDbVsn, Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
     NowMs;
-iot_timestamp(_IotDbVsn, Timestamp, _) when is_binary(Timestamp) ->
+iot_timestamp(_IoTDbVsn, Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
 proc_value(<<"TEXT">>, ValueTkn, Msg) ->
@@ -719,6 +930,10 @@ iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
     <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
     <<"isAligned">>;
+iotdb_field_key(is_aligned, Vsn) when
+    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
+->
+    'isAligned';
 iotdb_field_key(device_id, ?VSN_1_3_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_1_1_X) ->
@@ -727,6 +942,10 @@ iotdb_field_key(device_id, ?VSN_1_0_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_0_13_X) ->
     <<"deviceId">>;
+iotdb_field_key(device_id, Vsn) when
+    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
+->
+    'deviceId';
 iotdb_field_key(data_types, ?VSN_1_3_X) ->
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_1_1_X) ->
@@ -734,7 +953,11 @@ iotdb_field_key(data_types, ?VSN_1_1_X) ->
 iotdb_field_key(data_types, ?VSN_1_0_X) ->
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
-    <<"dataTypes">>.
+    <<"dataTypes">>;
+iotdb_field_key(data_types, Vsn) when
+    Vsn == ?PROTOCOL_V1; Vsn == ?PROTOCOL_V2; Vsn == ?PROTOCOL_V3
+->
+    dtypes.
 
 to_list(List) when is_list(List) -> List;
 to_list(Data) -> [Data].
@@ -756,6 +979,8 @@ handle_response({ok, Code, _Headers, Body}) ->
     {error, #{code => Code, body => Body}};
 handle_response({ok, Code, Body}) ->
     {error, #{code => Code, body => Body}};
+handle_response({ok, _} = Resp) ->
+    Resp;
 handle_response({error, _} = Error) ->
     Error.
 
@@ -849,3 +1074,8 @@ get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
 %% This is a self-describing message
 get_data_template(#{data := []}, Payloads) ->
     preproc_data_list(Payloads).
+
+do_on_query(InstanceId, ChannelId, Data, #{driver := restapi} = State) ->
+    emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
+do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
+    ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).

+ 39 - 4
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector_info.erl

@@ -16,6 +16,10 @@
     api_schema/1
 ]).
 
+-define(CONNECTOR, emqx_bridge_iotdb_connector).
+-define(DRIVER_REST, "restapi").
+-define(DRIVER_THRIFT, "thrift").
+
 type_name() ->
     iotdb.
 
@@ -31,7 +35,7 @@ config_transform_module() ->
 config_schema() ->
     {iotdb,
         hoconsc:mk(
-            hoconsc:map(name, hoconsc:ref(emqx_bridge_iotdb_connector, config)),
+            hoconsc:map(name, hoconsc:union(fun driver_union_selector/1)),
             #{
                 desc => <<"IoTDB Connector Config">>,
                 required => false
@@ -42,6 +46,37 @@ schema_module() ->
     emqx_bridge_iotdb_connector.
 
 api_schema(Method) ->
-    emqx_connector_schema:api_ref(
-        emqx_bridge_iotdb_connector, <<"iotdb">>, Method
-    ).
+    {<<"iotdb">>, hoconsc:union(mk_api_union_selector(Method))}.
+
+driver_union_selector(all_union_members) ->
+    [
+        ref(?DRIVER_REST, "config"),
+        ref(?DRIVER_THRIFT, "config")
+    ];
+driver_union_selector({value, Value}) ->
+    case Value of
+        #{<<"driver">> := <<"thrift">>} ->
+            [ref(?DRIVER_THRIFT, "config")];
+        _ ->
+            [ref(?DRIVER_REST, "config")]
+    end.
+
+mk_api_union_selector(Method) ->
+    fun
+        (all_union_members) ->
+            [
+                ref(?DRIVER_REST, Method),
+                ref(?DRIVER_THRIFT, Method)
+            ];
+        ({value, Value}) ->
+            case Value of
+                #{<<"driver">> := <<"thrift">>} ->
+                    [ref(?DRIVER_THRIFT, Method)];
+                _ ->
+                    [ref(?DRIVER_REST, Method)]
+            end
+    end.
+
+ref(Driver, Field) ->
+    Name = Field ++ "_" ++ Driver,
+    hoconsc:ref(?CONNECTOR, Name).

+ 93 - 30
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -21,15 +21,25 @@ all() ->
     [
         {group, iotdb110},
         {group, iotdb130},
-        {group, legacy}
+        {group, legacy},
+        {group, thrift}
     ].
 
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
+    Async = [
+        t_async_device_id_missing,
+        t_async_invalid_template,
+        t_async_query,
+        t_extract_device_id_from_rule_engine_message,
+        %%todo
+        t_sync_query_aggregated
+    ],
     [
         {iotdb110, AllTCs},
         {iotdb130, AllTCs},
-        {legacy, AllTCs}
+        {legacy, AllTCs},
+        {thrift, AllTCs -- Async}
     ].
 
 init_per_suite(Config) ->
@@ -65,6 +75,7 @@ init_per_group(Type, Config0) when Type =:= iotdb110 orelse Type =:= iotdb130 ->
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
+                {rest_port, Port},
                 {proxy_name, ProxyName},
                 {iotdb_version, IotDbVersion},
                 {iotdb_rest_prefix, <<"/rest/v2/">>}
@@ -88,6 +99,7 @@ init_per_group(legacy = Type, Config0) ->
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
+                {rest_port, Port},
                 {proxy_name, ProxyName},
                 {iotdb_version, ?VSN_0_13_X},
                 {iotdb_rest_prefix, <<"/rest/v1/">>}
@@ -101,6 +113,30 @@ init_per_group(legacy = Type, Config0) ->
                     {skip, no_iotdb}
             end
     end;
+init_per_group(thrift = Type, Config0) ->
+    Host = os:getenv("IOTDB_THRIFT_HOST", "toxiproxy.emqx.net"),
+    Port = list_to_integer(os:getenv("IOTDB_THRIFT_PORT", "46667")),
+    ProxyName = "iotdb_thrift",
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
+            [
+                {bridge_host, Host},
+                {bridge_port, Port},
+                {rest_port, 48080},
+                {proxy_name, ProxyName},
+                {iotdb_version, ?PROTOCOL_V3},
+                {iotdb_rest_prefix, <<"/rest/v2/">>}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_iotdb);
+                _ ->
+                    {skip, no_iotdb}
+            end
+    end;
 init_per_group(_Group, Config) ->
     Config.
 
@@ -121,6 +157,7 @@ init_per_testcase(TestCase, Config0) ->
         (atom_to_binary(TestCase))/binary, UniqueNum/binary
     >>,
     {_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
+
     {_, ActionConfig} = action_config(Name, Config0),
     Config = [
         {connector_type, Type},
@@ -229,16 +266,14 @@ iotdb_request(Config, Path, Body) ->
     iotdb_request(Config, Path, Body, #{}).
 
 iotdb_request(Config, Path, Body, Opts) ->
-    _BridgeConfig =
-        #{
-            <<"base_url">> := BaseURL,
-            <<"authentication">> := #{
-                <<"username">> := Username,
-                <<"password">> := Password
-            }
-        } =
-        ?config(connector_config, Config),
-    ct:pal("bridge config: ~p", [_BridgeConfig]),
+    BridgeConfig = ?config(connector_config, Config),
+    Host = ?config(bridge_host, Config),
+    Port = ?config(rest_port, Config),
+    Username = <<"root">>,
+    Password = <<"root">>,
+    BaseURL = iotdb_server_url(Host, Port),
+
+    ct:pal("bridge config: ~p", [BridgeConfig]),
     URL = <<BaseURL/binary, Path/binary>>,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
     Headers = [
@@ -265,6 +300,8 @@ iotdb_query(Config, Query) ->
 
 is_success_check({ok, 200, _, Body}) ->
     ?assert(is_code(200, emqx_utils_json:decode(Body)));
+is_success_check({ok, _}) ->
+    ok;
 is_success_check(Other) ->
     throw(Other).
 
@@ -303,23 +340,46 @@ connector_config(Name, Config) ->
     Version = ?config(iotdb_version, Config),
     ServerURL = iotdb_server_url(Host, Port),
     ConfigString =
-        io_lib:format(
-            "connectors.~s.~s {\n"
-            "  enable = true\n"
-            "  base_url = \"~s\"\n"
-            "  iotdb_version = \"~s\"\n"
-            "  authentication = {\n"
-            "     username = \"root\"\n"
-            "     password = \"root\"\n"
-            "  }\n"
-            "}\n",
-            [
-                Type,
-                Name,
-                ServerURL,
-                Version
-            ]
-        ),
+        case ?config(test_group, Config) of
+            thrift ->
+                io_lib:format(
+                    "connectors.~s.~s {\n"
+                    "  enable = true\n"
+                    "  driver = \"thrift\"\n"
+                    "  server = \"~s:~p\"\n"
+                    "  protocol_version = \"~p\"\n"
+                    "  username = \"root\"\n"
+                    "  password = \"root\"\n"
+                    "  zoneId = \"Asia/Shanghai\"\n"
+                    "  ssl.enable = false\n"
+                    "}\n",
+                    [
+                        Type,
+                        Name,
+                        Host,
+                        Port,
+                        Version
+                    ]
+                );
+            _ ->
+                io_lib:format(
+                    "connectors.~s.~s {\n"
+                    "  enable = true\n"
+                    "  base_url = \"~s\"\n"
+                    "  iotdb_version = \"~s\"\n"
+                    "  authentication = {\n"
+                    "     username = \"root\"\n"
+                    "     password = \"root\"\n"
+                    "  }\n"
+                    "}\n",
+                    [
+                        Type,
+                        Name,
+                        ServerURL,
+                        Version
+                    ]
+                )
+        end,
     ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
     {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
 
@@ -556,7 +616,10 @@ t_async_invalid_template(Config) ->
     ).
 
 t_create_via_http(Config) ->
-    emqx_bridge_v2_testlib:t_create_via_http(Config).
+    emqx_bridge_v2_testlib:t_create_via_http(
+        Config,
+        thrift =:= ?config(test_group, Config)
+    ).
 
 t_start_stop(Config) ->
     emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped).

+ 5 - 7
apps/emqx_bridge_kafka/mix.exs

@@ -23,13 +23,11 @@ defmodule EMQXBridgeKafka.MixProject do
 
   def deps() do
     [
-      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
-      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
-      {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
-      {:brod, github: "kafka4beam/brod", tag: "3.18.0"},
-      ## TODO: remove `mix.exs` from `wolff` and remove this override
-      ## TODO: remove `mix.exs` from `pulsar` and remove this override
-      {:snappyer, "1.2.9", override: true},
+      UMP.common_dep(:wolff),
+      UMP.common_dep(:kafka_protocol),
+      UMP.common_dep(:brod_gssapi),
+      UMP.common_dep(:brod),
+      UMP.common_dep(:snappyer),
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 4 - 5
apps/emqx_bridge_kafka/rebar.config

@@ -2,15 +2,14 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
-    {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
-    {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
+    {wolff, "3.0.4"},
+    {kafka_protocol, "4.1.8"},
+    {brod_gssapi, "0.1.3"},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
     {snappyer, "1.2.9"},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
-    {emqx_bridge, {path, "../../apps/emqx_bridge"}},
-    {sasl_auth, "2.3.0"}
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}}
 ]}.
 
 {shell, [

+ 2 - 4
apps/emqx_bridge_pulsar/mix.exs

@@ -23,10 +23,8 @@ defmodule EMQXBridgePulsar.MixProject do
 
   def deps() do
     [
-      {:crc32cer, git: "https://github.com/zmstone/crc32cer", tag: "0.1.8", override: true},
-      ## TODO: remove `mix.exs` from `pulsar` and remove this override
-      ## TODO: remove `mix.exs` from `pulsar` and remove this override
-      {:snappyer, "1.2.9", override: true},
+      UMP.common_dep(:crc32cer),
+      UMP.common_dep(:snappyer),
       {:pulsar, github: "emqx/pulsar-client-erl", tag: "0.8.3"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},

+ 1 - 1
apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_rocketmq, [
     {description, "EMQX Enterprise RocketMQ Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, rocketmq]},
     {env, [

+ 1 - 1
apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl

@@ -360,7 +360,7 @@ parse_dispatch_strategy(Template) ->
                         %% better distribute the load, effectively making it `random'
                         %% dispatch if the key is absent and we are using `key_dispatch'.
                         %% Otherwise, it'll be deterministic.
-                        emqx_guid:to_base62(emqx_guid:gen());
+                        emqx_utils:rand_id(8);
                     Key ->
                         Key
                 end

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link.app.src

@@ -2,7 +2,7 @@
 {application, emqx_cluster_link, [
     {description, "EMQX Cluster Linking"},
     % strict semver, bump manually!
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {modules, []},
     {registered, []},
     {applications, [

+ 19 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -137,7 +137,7 @@ link_name(#{name := Name}) -> Name;
 link_name(#{<<"name">> := Name}) -> Name.
 
 topics_validator(Topics) ->
-    Errors = lists:foldl(
+    Errors0 = lists:foldl(
         fun(T, ErrAcc) ->
             try
                 _ = emqx_topic:validate(T),
@@ -150,7 +150,8 @@ topics_validator(Topics) ->
         [],
         Topics
     ),
-    check_errors(Errors, invalid_topics, topics).
+    Errors = validate_duplicate_topic_filters(Topics),
+    check_errors(Errors0 ++ Errors, invalid_topics, topics).
 
 validate_sys_link_topic(T, ErrAcc) ->
     case emqx_topic:match(T, ?TOPIC_PREFIX_WILDCARD) of
@@ -160,6 +161,22 @@ validate_sys_link_topic(T, ErrAcc) ->
             ErrAcc
     end.
 
+validate_duplicate_topic_filters(TopicFilters) ->
+    {Duplicated, _} =
+        lists:foldl(
+            fun(T, {Acc, Seen}) ->
+                case sets:is_element(T, Seen) of
+                    true ->
+                        {[{T, duplicate_topic_filter} | Acc], Seen};
+                    false ->
+                        {Acc, sets:add_element(T, Seen)}
+                end
+            end,
+            {[], sets:new([{version, 2}])},
+            TopicFilters
+        ),
+    Duplicated.
+
 check_errors([] = _Errors, _Reason, _ValuesField) ->
     ok;
 check_errors(Errors, Reason, ValuesField) ->

+ 23 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -779,3 +779,26 @@ t_update_password(_Config) ->
         []
     ),
     ok.
+
+%% Checks that we forbid duplicate topic filters.
+t_duplicate_topic_filters(_Config) ->
+    ?check_trace(
+        begin
+            Name = atom_to_binary(?FUNCTION_NAME),
+            Params1 = link_params(#{<<"topics">> => [<<"t">>, <<"t">>]}),
+            ?assertMatch(
+                {400, #{
+                    <<"message">> := #{
+                        <<"reason">> := #{
+                            <<"reason">> := <<"invalid_topics">>,
+                            <<"topics">> := #{<<"t">> := <<"duplicate_topic_filter">>}
+                        }
+                    }
+                }},
+                create_link(Name, Params1)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 64 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_tests.erl

@@ -0,0 +1,64 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+bin(X) -> emqx_utils_conv:bin(X).
+
+parse_and_check(InnerConfigs) ->
+    RootBin = <<"links">>,
+    RawConf = #{RootBin => InnerConfigs},
+    #{RootBin := Checked} = hocon_tconf:check_plain(
+        #{roots => [{links, emqx_cluster_link_schema:links_schema(#{})}]},
+        RawConf,
+        #{
+            required => false,
+            atom_key => false,
+            make_serializable => false
+        }
+    ),
+    Checked.
+
+link_params(Name) ->
+    link_params(Name, _Overrides = #{}).
+
+link_params(Name, Overrides) ->
+    Default = #{
+        <<"name">> => Name,
+        <<"clientid">> => <<"linkclientid">>,
+        <<"password">> => <<"my secret password">>,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"emqxcl_2.nohost:31883">>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>]
+    },
+    emqx_utils_maps:deep_merge(Default, Overrides).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+schema_test_() ->
+    [
+        {"topic filters must be unique",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := #{
+                            reason := invalid_topics,
+                            topics := [{<<"t">>, duplicate_topic_filter}]
+                        },
+                        value := [_, _],
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    link_params(<<"l1">>, #{<<"topics">> => [<<"t">>, <<"t">>]})
+                ])
+            )}
+    ].

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "EMQX Data Integration Connectors"},
-    {vsn, "0.3.4"},
+    {vsn, "0.3.5"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 10 - 6
apps/emqx_connector/src/emqx_connector_ssl.erl

@@ -49,22 +49,26 @@ new_ssl_config(#{<<"ssl">> := _} = Config, NewSSL) ->
 new_ssl_config(Config, _NewSSL) ->
     Config.
 
-map_bad_ssl_error(#{pem_check := invalid_pem} = TLSLibError) ->
-    #{which_options := Paths, file_read := Reason} = TLSLibError,
+map_bad_ssl_error(#{
+    pem_check := NotPem,
+    file_path := FilePath,
+    which_option := Field
+}) ->
     #{
         kind => validation_error,
         reason => <<"bad_ssl_config">>,
-        bad_fields => Paths,
+        bad_field => Field,
+        file_path => FilePath,
         details => emqx_utils:format(
             "Failed to access certificate / key file: ~s",
-            [emqx_utils:explain_posix(Reason)]
+            [emqx_utils:explain_posix(NotPem)]
         )
     };
-map_bad_ssl_error(#{which_options := Paths, reason := Reason}) ->
+map_bad_ssl_error(#{which_option := Field, reason := Reason}) ->
     #{
         kind => validation_error,
         reason => <<"bad_ssl_config">>,
-        bad_fields => Paths,
+        bad_field => Field,
         details => Reason
     };
 map_bad_ssl_error(TLSLibError) ->

+ 65 - 11
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -283,6 +283,26 @@ generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
             ConnectorName
     end.
 
+transform_old_style_bridges_to_connector_and_actions_of_type(
+    {ConnectorType, #{type := ?MAP(_Name, ?UNION(UnionFun))}},
+    RawConfig
+) when is_function(UnionFun, 1) ->
+    AllMembers = UnionFun(all_union_members),
+    lists:foldl(
+        fun(
+            ?R_REF(ConnectorConfSchemaMod, ConnectorConfSchemaName),
+            RawConfigSoFar
+        ) ->
+            transform_old_style_bridges_to_connector_and_actions_of_type(
+                {ConnectorType, #{
+                    type => ?MAP(name, ?R_REF(ConnectorConfSchemaMod, ConnectorConfSchemaName))
+                }},
+                RawConfigSoFar
+            )
+        end,
+        RawConfig,
+        AllMembers
+    );
 transform_old_style_bridges_to_connector_and_actions_of_type(
     {ConnectorType, #{type := ?MAP(_Name, ?R_REF(ConnectorConfSchemaMod, ConnectorConfSchemaName))}},
     RawConfig
@@ -352,7 +372,12 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
         end,
         RawConfig,
         ActionConnectorTuples
-    ).
+    );
+transform_old_style_bridges_to_connector_and_actions_of_type(
+    {_ConnectorType, #{type := ?MAP(_Name, _)}},
+    RawConfig
+) ->
+    RawConfig.
 
 transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) ->
     ConnectorFields = ?MODULE:fields(connectors),
@@ -636,10 +661,11 @@ status() ->
 -include_lib("hocon/include/hocon_types.hrl").
 schema_homogeneous_test() ->
     case
-        lists:filtermap(
-            fun({_Name, Schema}) ->
-                is_bad_schema(Schema)
+        lists:foldl(
+            fun({_Name, Schema}, Bads) ->
+                is_bad_schema(Schema, Bads)
             end,
+            [],
             fields(connectors)
         )
     of
@@ -649,7 +675,32 @@ schema_homogeneous_test() ->
             throw(List)
     end.
 
-is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
+is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}, Bads) ->
+    is_bad_schema_type(Module, TypeName, Bads);
+is_bad_schema(#{type := ?MAP(_, ?UNION(Types))}, Bads) when is_list(Types) ->
+    is_bad_schema_types(Types, Bads);
+is_bad_schema(#{type := ?MAP(_, ?UNION(Func))}, Bads) when is_function(Func, 1) ->
+    Types = Func(all_union_members),
+    is_bad_schema_types(Types, Bads).
+
+is_bad_schema_types(Types, Bads) ->
+    lists:foldl(
+        fun
+            (?R_REF(Module, TypeName), Acc) ->
+                is_bad_schema_type(Module, TypeName, Acc);
+            (Type, Acc) ->
+                [
+                    #{
+                        type => Type
+                    }
+                    | Acc
+                ]
+        end,
+        Bads,
+        Types
+    ).
+
+is_bad_schema_type(Module, TypeName, Bads) ->
     Fields = Module:fields(TypeName),
     ExpectedFieldNames = common_field_names(),
     MissingFields = lists:filter(
@@ -657,13 +708,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
     ),
     case MissingFields of
         [] ->
-            false;
+            Bads;
         _ ->
-            {true, #{
-                schema_module => Module,
-                type_name => TypeName,
-                missing_fields => MissingFields
-            }}
+            [
+                #{
+                    schema_module => Module,
+                    type_name => TypeName,
+                    missing_fields => MissingFields
+                }
+                | Bads
+            ]
     end.
 
 common_field_names() ->

+ 1 - 1
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -742,7 +742,7 @@ t_create_with_bad_tls_files(Config) ->
                     <<"reason">> := <<"bad_ssl_config">>,
                     <<"details">> :=
                         <<"Failed to access certificate / key file: No such file or directory">>,
-                    <<"bad_fields">> := [[<<"cacertfile">>]]
+                    <<"bad_field">> := <<"cacertfile">>
                 },
                 json(Msg0)
             ),

+ 1 - 0
apps/emqx_connector_aggregator/mix.exs

@@ -24,6 +24,7 @@ defmodule EMQXConnectorAggregator.MixProject do
   def deps() do
     [
       {:emqx, in_umbrella: true},
+      UMP.common_dep(:gproc),
       {:erl_csv, "0.2.0"}
     ]
   end

+ 3 - 2
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src

@@ -1,10 +1,11 @@
 {application, emqx_connector_aggregator, [
     {description, "EMQX Enterprise Connector Data Aggregator"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {applications, [
         kernel,
-        stdlib
+        stdlib,
+        gproc
     ]},
     {env, []},
     {modules, []},

+ 23 - 7
apps/emqx_connector_aggregator/src/emqx_connector_aggregator.erl

@@ -85,28 +85,34 @@ buffer_to_map(#buffer{} = Buffer) ->
 %%
 
 write_records_limited(Name, Buffer = #buffer{max_records = undefined}, Records) ->
-    write_records(Name, Buffer, Records);
+    write_records(Name, Buffer, Records, _NumWritten = undefined);
 write_records_limited(Name, Buffer = #buffer{max_records = MaxRecords}, Records) ->
     NR = length(Records),
     case inc_num_records(Buffer, NR) of
         NR ->
             %% NOTE: Allow unconditionally if it's the first write.
-            write_records(Name, Buffer, Records);
+            write_records(Name, Buffer, Records, NR);
         NWritten when NWritten > MaxRecords ->
             NextBuffer = rotate_buffer(Name, Buffer),
             write_records_limited(Name, NextBuffer, Records);
-        _ ->
-            write_records(Name, Buffer, Records)
+        NWritten ->
+            write_records(Name, Buffer, Records, NWritten)
     end.
 
-write_records(Name, Buffer = #buffer{fd = Writer}, Records) ->
+write_records(Name, Buffer = #buffer{fd = Writer, max_records = MaxRecords}, Records, NumWritten) ->
     case emqx_connector_aggreg_buffer:write(Records, Writer) of
         ok ->
             ?tp(connector_aggreg_records_written, #{action => Name, records => Records}),
+            case is_number(NumWritten) andalso NumWritten >= MaxRecords of
+                true ->
+                    rotate_buffer_async(Name, Buffer);
+                false ->
+                    ok
+            end,
             ok;
         {error, terminated} ->
             BufferNext = rotate_buffer(Name, Buffer),
-            write_records(Name, BufferNext, Records);
+            write_records_limited(Name, BufferNext, Records);
         {error, _} = Error ->
             Error
     end.
@@ -120,6 +126,9 @@ next_buffer(Name, Timestamp) ->
 rotate_buffer(Name, #buffer{fd = FD}) ->
     gen_server:call(?SRVREF(Name), {rotate_buffer, FD}).
 
+rotate_buffer_async(Name, #buffer{fd = FD}) ->
+    gen_server:cast(?SRVREF(Name), {rotate_buffer, FD}).
+
 send_close_buffer(Name, Timestamp) ->
     gen_server:cast(?SRVREF(Name), {close_buffer, Timestamp}).
 
@@ -184,6 +193,9 @@ handle_call(take_error, _From, St0) ->
 
 handle_cast({close_buffer, Timestamp}, St) ->
     {noreply, handle_close_buffer(Timestamp, St)};
+handle_cast({rotate_buffer, FD}, St0) ->
+    St = handle_rotate_buffer(FD, St0),
+    {noreply, St, 0};
 handle_cast(_Cast, St) ->
     {noreply, St}.
 
@@ -469,6 +481,8 @@ parse_filename(Filename) ->
 
 %%
 
+-define(COUNTER_POS, 2).
+
 add_counter({Tab, Counter}) ->
     add_counter({Tab, Counter}, 0).
 
@@ -476,11 +490,13 @@ add_counter({Tab, Counter}, N) ->
     ets:insert(Tab, {Counter, N}).
 
 inc_counter({Tab, Counter}, Size) ->
-    ets:update_counter(Tab, Counter, {2, Size}).
+    ets:update_counter(Tab, Counter, {?COUNTER_POS, Size}).
 
 del_counter({Tab, Counter}) ->
     ets:delete(Tab, Counter).
 
+-undef(COUNTER_POS).
+
 %%
 
 create_tab(Name) ->

+ 98 - 0
apps/emqx_connector_aggregator/test/emqx_connector_aggregator_SUITE.erl

@@ -0,0 +1,98 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_connector_aggregator_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [emqx_connector_aggregator],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% `emqx_connector_aggreg_delivery' API
+%%------------------------------------------------------------------------------
+
+init_transfer_state(_Buffer, Opts) ->
+    #{opts => Opts}.
+
+process_append(_IOData, State) ->
+    State.
+
+process_write(State) ->
+    {ok, State}.
+
+process_complete(_State) ->
+    {ok, done}.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+now_ms() ->
+    erlang:system_time(millisecond).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+%% Verifies that a new delivery is triggered when we reach the configured maximum number
+%% of records.
+t_trigger_max_records(Config) ->
+    ?check_trace(
+        #{timetrap => 3_000},
+        begin
+            AggregId = ?FUNCTION_NAME,
+            MaxRecords = 3,
+            AggregOpts = #{
+                max_records => MaxRecords,
+                time_interval => 120_000,
+                work_dir => emqx_cth_suite:work_dir(Config)
+            },
+            ContainerOpts = #{
+                type => csv,
+                column_order => []
+            },
+            DeliveryOpts = #{
+                callback_module => ?MODULE,
+                container => ContainerOpts,
+                upload_options => #{}
+            },
+            {ok, _Sup} = emqx_connector_aggreg_upload_sup:start_link(
+                AggregId, AggregOpts, DeliveryOpts
+            ),
+            Timestamp = now_ms(),
+            Records = lists:duplicate(MaxRecords, #{}),
+            %% Should immediately trigger a delivery process to be kicked off.
+            ?assertMatch(
+                {ok, {ok, _}},
+                ?wait_async_action(
+                    emqx_connector_aggregator:push_records(AggregId, Timestamp, Records),
+                    #{?snk_kind := connector_aggreg_delivery_completed}
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -2,7 +2,7 @@
 {application, emqx_dashboard, [
     {description, "EMQX Web Dashboard"},
     % strict semver, bump manually!
-    {vsn, "5.1.4"},
+    {vsn, "5.1.5"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {applications, [

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_listener.erl

@@ -188,7 +188,7 @@ ensure_ssl_cert(#{<<"listeners">> := #{<<"https">> := #{<<"bind">> := Bind} = Ht
     Https1 = emqx_dashboard_schema:https_converter(Https0, #{}),
     Conf1 = emqx_utils_maps:deep_put([<<"listeners">>, <<"https">>], Conf0, Https1),
     Ssl = maps:get(<<"ssl_options">>, Https1, undefined),
-    Opts = #{required_keys => [[<<"keyfile">>], [<<"certfile">>], [<<"cacertfile">>]]},
+    Opts = #{required_keys => [[<<"keyfile">>], [<<"certfile">>]]},
     case emqx_tls_lib:ensure_ssl_files_in_mutable_certs_dir(?DIR, Ssl, Opts) of
         {ok, undefined} ->
             {error, <<"ssl_cert_not_found">>};

+ 1 - 1
apps/emqx_ft/test/emqx_ft_conf_SUITE.erl

@@ -349,7 +349,7 @@ mk_s3_config(S3Config) ->
     maps:merge(BaseS3Config, S3Config).
 
 gen_clientid() ->
-    emqx_base62:encode(emqx_guid:gen()).
+    emqx_utils:rand_id(16).
 
 list_ssl_certfiles(_Config) ->
     CertDir = emqx:mutable_certs_dir(),

+ 3 - 3
apps/emqx_ft/test/props/prop_emqx_ft_assembly.erl

@@ -18,7 +18,7 @@
 
 -include_lib("proper/include/proper.hrl").
 
--import(emqx_proper_types, [scaled/2, fixedmap/1, typegen/0, generate/2]).
+-import(emqx_proper_types, [scaled/2, logscaled/2, fixedmap/1, typegen/0, generate/2]).
 
 -define(COVERAGE_TIMEOUT, 10000).
 
@@ -177,10 +177,10 @@ nsegs(Filesize, [BaseSegsize | _]) ->
     Filesize div max(1, BaseSegsize) + 1.
 
 segments_t(Filesize, Segsizes) ->
-    scaled(nsegs(Filesize, Segsizes), list({node_t(), segment_t(Filesize, Segsizes)})).
+    logscaled(nsegs(Filesize, Segsizes), list({node_t(), segment_t(Filesize, Segsizes)})).
 
 segments_t(Filesize, Segsizes, Hole) ->
-    scaled(nsegs(Filesize, Segsizes), list({node_t(), segment_t(Filesize, Segsizes, Hole)})).
+    logscaled(nsegs(Filesize, Segsizes), list({node_t(), segment_t(Filesize, Segsizes, Hole)})).
 
 segment_t(Filesize, Segsizes, Hole) ->
     ?SUCHTHATMAYBE(

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [

+ 2 - 7
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -490,13 +490,8 @@ reason2msg(
         "The authentication already exist on ~s",
         [listener_id(GwName, LType, LName)]
     );
-reason2msg(
-    {bad_ssl_config, #{
-        reason := Reason,
-        which_options := Options
-    }}
-) ->
-    fmtstr("Bad TLS configuration for ~p, reason: ~s", [Options, Reason]);
+reason2msg({bad_ssl_config, Reason}) ->
+    fmtstr("Bad TLS configuration: ~0p", [Reason]);
 reason2msg(
     {#{roots := [{gateway, _}]}, [_ | _]} = Error
 ) ->

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_insta_sup.erl

@@ -209,7 +209,7 @@ handle_info(
     end;
 handle_info(Info, State) ->
     ?SLOG(warning, #{
-        msg => "unexcepted_info",
+        msg => "unexpected_info",
         info => Info
     }),
     {noreply, State}.

+ 2 - 1
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -86,6 +86,7 @@
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
 
 -define(DEF_IDLE_SECONDS, 30).
+-define(RAND_CLIENTID_BYTES, 16).
 
 -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
 
@@ -142,7 +143,7 @@ init(
             peerhost => PeerHost,
             peername => PeerName,
             sockport => SockPort,
-            clientid => emqx_guid:to_base62(emqx_guid:gen()),
+            clientid => emqx_utils:rand_id(?RAND_CLIENTID_BYTES),
             username => undefined,
             is_bridge => false,
             is_superuser => false,

+ 1 - 1
apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_coap, [
     {description, "CoAP Gateway"},
-    {vsn, "0.1.10"},
+    {vsn, "0.1.11"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_gbt32960, [
     {description, "GBT32960 Gateway"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -271,7 +271,7 @@ handle_in(Frame = #frame{cmd = Cmd}, Channel = #channel{inflight = Inflight}) ->
     _ = upstreaming(Frame, NChannel),
     {ok, [{outgoing, Outgoings}], NChannel};
 handle_in(Frame, Channel) ->
-    log(warning, #{msg => "unexcepted_frame", frame => Frame}, Channel),
+    log(warning, #{msg => "unexpected_frame", frame => Frame}, Channel),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------

+ 3 - 3
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl

@@ -572,7 +572,7 @@ parse_params_(<<16#0F, Val:?WORD, Rest/binary>>, Acc) ->
 parse_params_(<<16#10, Val:?BYTE, Rest/binary>>, Acc) ->
     parse_params_(Rest, [#{<<"0x10">> => Val} | Acc]);
 parse_params_(Cmd, Acc) ->
-    ?SLOG(error, #{msg => "unexcepted_param_identifier", cmd => Cmd}),
+    ?SLOG(error, #{msg => "unexpected_param_identifier", cmd => Cmd}),
     lists:reverse(Acc).
 
 parse_ctrl_param(16#01, Param) ->
@@ -590,7 +590,7 @@ parse_ctrl_param(16#06, <<Level:?BYTE, Msg/binary>>) ->
 parse_ctrl_param(16#07, _) ->
     <<>>;
 parse_ctrl_param(Cmd, Param) ->
-    ?SLOG(error, #{msg => "unexcepted_param", param => Param, cmd => Cmd}),
+    ?SLOG(error, #{msg => "unexpected_param", param => Param, cmd => Cmd}),
     <<>>.
 
 parse_upgrade_feild(Param) ->
@@ -745,7 +745,7 @@ tune_ctrl_param(16#06, #{<<"Level">> := Level, <<"Message">> := Msg}) ->
 tune_ctrl_param(16#07, _) ->
     <<>>;
 tune_ctrl_param(Cmd, Param) ->
-    ?SLOG(error, #{msg => "unexcepted_cmd", cmd => Cmd, param => Param}),
+    ?SLOG(error, #{msg => "unexpected_cmd", cmd => Cmd, param => Param}),
     <<>>.
 
 tune_upgrade_feild(Param) ->

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_mqttsn, [
     {description, "MQTT-SN Gateway"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 3 - 1
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -123,6 +123,8 @@
 %% 2h
 -define(DEFAULT_SESSION_EXPIRY, 7200000).
 
+-define(RAND_CLIENTID_BYTES, 16).
+
 %%--------------------------------------------------------------------
 %% Init the channel
 %%--------------------------------------------------------------------
@@ -307,7 +309,7 @@ maybe_assign_clientid(_Packet, ClientInfo = #{clientid := ClientId}) when
     ClientId == undefined;
     ClientId == <<>>
 ->
-    {ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};
+    {ok, ClientInfo#{clientid => emqx_utils:rand_id(?RAND_CLIENTID_BYTES)}};
 maybe_assign_clientid(_Packet, ClientInfo) ->
     {ok, ClientInfo}.
 

+ 1 - 1
apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_stomp, [
     {description, "Stomp Gateway"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 2 - 1
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -109,6 +109,7 @@
 ).
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
+-define(RAND_CLIENTID_BYETS, 16).
 
 %%--------------------------------------------------------------------
 %% Init the channel
@@ -303,7 +304,7 @@ maybe_assign_clientid(_Packet, ClientInfo = #{clientid := ClientId}) when
     ClientId == undefined;
     ClientId == <<>>
 ->
-    {ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};
+    {ok, ClientInfo#{clientid => emqx_utils:rand_id(?RAND_CLIENTID_BYETS)}};
 maybe_assign_clientid(_Packet, ClientInfo) ->
     {ok, ClientInfo}.
 

+ 1 - 1
apps/emqx_license/src/emqx_license.app.src

@@ -1,6 +1,6 @@
 {application, emqx_license, [
     {description, "EMQX License"},
-    {vsn, "5.1.0"},
+    {vsn, "5.1.1"},
     {modules, []},
     {registered, [emqx_license_sup]},
     {applications, [kernel, stdlib, emqx_ctl, emqx_gateway]},

+ 2 - 1
apps/emqx_license/src/emqx_license_cli.erl

@@ -41,7 +41,8 @@ license(_) ->
     emqx_ctl:usage(
         [
             {"license info", "Show license info"},
-            {"license update <License>", "Update license given as a string"}
+            {"license update '<License>'|'file:///tmp/emqx.lic'",
+                "Update license given as a string\nor referenced by a file path via 'file://' prefix"}
         ]
     ).
 

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_status.erl

@@ -114,7 +114,7 @@ running_status(Format) ->
                 end,
             ContentType =
                 case Format of
-                    <<"json">> -> <<"applicatin/json">>;
+                    <<"json">> -> <<"application/json">>;
                     _ -> <<"text/plain">>
                 end,
             Headers = #{

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -212,7 +212,7 @@ upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -
     Method = post,
     Filename = filename:basename(FilePath),
     {ok, Data} = file:read_file(FilePath),
-    Boundary = emqx_guid:to_base62(emqx_guid:gen()),
+    Boundary = emqx_utils:rand_id(32),
     RequestBody = format_multipart_formdata(
         Data,
         RequestData,

+ 1 - 1
apps/emqx_message_transformation/src/emqx_message_transformation.app.src

@@ -1,6 +1,6 @@
 {application, emqx_message_transformation, [
     {description, "EMQX Message Transformation"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
     {mod, {emqx_message_transformation_app, []}},
     {applications, [

+ 22 - 3
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -446,7 +446,7 @@ take_from_context(Context, Message) ->
 
 decode(Payload, #{type := none}, _Transformation) ->
     {ok, Payload};
-decode(Payload, #{type := json}, Transformation) ->
+decode(Payload, #{type := json}, Transformation) when is_binary(Payload) ->
     case emqx_utils_json:safe_decode(Payload, [return_maps]) of
         {ok, JSON} ->
             {ok, JSON};
@@ -461,7 +461,7 @@ decode(Payload, #{type := json}, Transformation) ->
             },
             {error, TraceFailureContext}
     end;
-decode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
+decode(Payload, #{type := avro, schema := SerdeName}, Transformation) when is_binary(Payload) ->
     try
         {ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
     catch
@@ -531,7 +531,26 @@ decode(
                 }
             },
             {error, TraceFailureContext}
-    end.
+    end;
+decode(NotABinary, #{} = Decoder, Transformation) ->
+    DecoderContext0 = maps:with([type, name, message_type], Decoder),
+    DecoderContext1 = emqx_utils_maps:rename(name, schema_name, DecoderContext0),
+    DecoderContext = emqx_utils_maps:rename(type, decoder, DecoderContext1),
+    Context =
+        maps:merge(
+            DecoderContext,
+            #{
+                reason => <<"payload must be a binary">>,
+                hint => <<"check the transformation(s) before this one for inconsistencies">>,
+                bad_payload => NotABinary
+            }
+        ),
+    TraceFailureContext = #trace_failure_context{
+        transformation = Transformation,
+        tag = "payload_decode_failed",
+        context = Context
+    },
+    {error, TraceFailureContext}.
 
 encode(Payload, #{type := none}, _Transformation) ->
     {ok, Payload};

+ 41 - 0
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -2000,3 +2000,44 @@ t_dryrun_transformation(_Config) ->
         []
     ),
     ok.
+
+%% Verifies that if a transformation's decoder is fed a non-binary input (e.g.:
+%% `undefined'), it returns a friendly message.
+t_non_binary_input_for_decoder(_Config) ->
+    ?check_trace(
+        begin
+            %% The transformations set up here lead to an invalid input payload for the
+            %% second transformation: `payload = undefined' (an atom) after the first
+            %% transformation.
+            Name1 = <<"foo">>,
+            Operations1 = [operation(payload, <<"flags.dup">>)],
+            NoSerde = #{<<"type">> => <<"none">>},
+            Transformation1 = transformation(Name1, Operations1, #{
+                <<"payload_decoder">> => NoSerde,
+                <<"payload_encoder">> => NoSerde
+            }),
+            Name2 = <<"bar">>,
+            Operations2 = [],
+            JSONSerde = #{<<"type">> => <<"json">>},
+            Transformation2 = transformation(Name2, Operations2, #{
+                <<"payload_decoder">> => JSONSerde,
+                <<"payload_encoder">> => JSONSerde
+            }),
+            {201, _} = insert(Transformation1),
+            {201, _} = insert(Transformation2),
+
+            C = connect(<<"c1">>),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+            ok = publish(C, <<"t/1">>, #{x => 1, y => true}),
+            ?assertNotReceive({publish, _}),
+
+            ok
+        end,
+        fun(Trace) ->
+            SubTrace = ?of_kind(message_transformation_failed, Trace),
+            ?assertMatch([], [E || E = #{reason := function_clause} <- SubTrace]),
+            ?assertMatch([#{reason := <<"payload must be a binary">>} | _], SubTrace),
+            ok
+        end
+    ),
+    ok.

+ 6 - 6
apps/emqx_mix_utils/lib/mix/tasks/emqx.ct.ex

@@ -20,12 +20,12 @@ defmodule Mix.Tasks.Emqx.Ct do
     #   |> IO.inspect(label: app_to_debug)
     # end)
 
-    ensure_whole_emqx_project_is_loaded!()
+    ensure_whole_emqx_project_is_loaded()
     unload_emqx_applications!()
     load_common_helpers!()
     hack_test_data_dirs!(opts.suites)
 
-    # ensure_suites_are_loaded!(opts.suites)
+    # ensure_suites_are_loaded(opts.suites)
 
     {_, 0} = System.cmd("epmd", ["-daemon"])
     node_name = :"test@127.0.0.1"
@@ -76,7 +76,7 @@ defmodule Mix.Tasks.Emqx.Ct do
   This needs to run before we unload the applications, otherwise we lose their `:modules`
   attribute.
   """
-  def ensure_whole_emqx_project_is_loaded!() do
+  def ensure_whole_emqx_project_is_loaded() do
     apps_path =
       Mix.Project.project_file()
       |> Path.dirname()
@@ -100,7 +100,7 @@ defmodule Mix.Tasks.Emqx.Ct do
           []
       end
     end)
-    |> Code.ensure_all_loaded!()
+    |> Code.ensure_all_loaded()
   end
 
   @doc """
@@ -170,7 +170,7 @@ defmodule Mix.Tasks.Emqx.Ct do
     end
   end
 
-  defp ensure_suites_are_loaded!(suites) do
+  defp ensure_suites_are_loaded(suites) do
     suites
     |> Enum.map(fn suite_path ->
       ["apps", app_name | _] = Path.split(suite_path)
@@ -181,7 +181,7 @@ defmodule Mix.Tasks.Emqx.Ct do
       {:ok, mods} = :application.get_key(app, :modules)
       mods
     end)
-    |> Code.ensure_all_loaded!()
+    |> Code.ensure_all_loaded()
   end
 
   def add_to_path_and_cache(lib_name) do

+ 1 - 1
apps/emqx_mix_utils/lib/mix/tasks/emqx.eunit.ex

@@ -14,7 +14,7 @@ defmodule Mix.Tasks.Emqx.Eunit do
 
     Enum.each([:common_test, :eunit, :mnesia], &ECt.add_to_path_and_cache/1)
 
-    ECt.ensure_whole_emqx_project_is_loaded!()
+    ECt.ensure_whole_emqx_project_is_loaded()
     ECt.unload_emqx_applications!()
 
     {_, 0} = System.cmd("epmd", ["-daemon"])

+ 1 - 1
apps/emqx_mix_utils/lib/mix/tasks/emqx.proper.ex

@@ -14,7 +14,7 @@ defmodule Mix.Tasks.Emqx.Proper do
 
     Enum.each([:common_test, :eunit, :mnesia], &ECt.add_to_path_and_cache/1)
 
-    ECt.ensure_whole_emqx_project_is_loaded!()
+    ECt.ensure_whole_emqx_project_is_loaded()
     ECt.unload_emqx_applications!()
 
     {_, 0} = System.cmd("epmd", ["-daemon"])

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.33"},
+    {vsn, "0.1.34"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 16 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -97,6 +97,7 @@
     %% get the callback mode of a specific module
     get_callback_mode/1,
     get_resource_type/1,
+    get_callback_mode/2,
     %% start the instance
     call_start/3,
     %% verify if the resource is working normally
@@ -160,7 +161,8 @@
     on_remove_channel/3,
     on_get_channels/1,
     query_mode/1,
-    on_format_query_result/1
+    on_format_query_result/1,
+    callback_mode/1
 ]).
 
 %% when calling emqx_resource:start/1
@@ -171,7 +173,10 @@
 -callback on_stop(resource_id(), resource_state()) -> term().
 
 %% when calling emqx_resource:get_callback_mode/1
--callback callback_mode() -> callback_mode().
+-callback callback_mode() -> callback_mode() | undefined.
+
+%% when calling emqx_resource:get_callback_mode/1
+-callback callback_mode(resource_state()) -> callback_mode().
 
 %% when calling emqx_resource:query/3
 -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
@@ -495,6 +500,15 @@ get_callback_mode(Mod) ->
 get_resource_type(Mod) ->
     Mod:resource_type().
 
+-spec get_callback_mode(module(), resource_state()) -> callback_mode() | undefined.
+get_callback_mode(Mod, State) ->
+    case erlang:function_exported(Mod, callback_mode, 1) of
+        true ->
+            Mod:callback_mode(State);
+        _ ->
+            undefined
+    end.
+
 -spec call_start(resource_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(ResId, Mod, Config) ->

+ 11 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -795,8 +795,10 @@ start_resource(Data, From) ->
             UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
             %% Perform an initial health_check immediately before transitioning into a connected state
             UpdatedData2 = add_channels(UpdatedData1),
+            UpdatedData3 = maybe_update_callback_mode(UpdatedData2),
+
             Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
-            {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions};
+            {next_state, ?state_connecting, update_state(UpdatedData3, Data), Actions};
         {error, Reason} = Err ->
             IsDryRun = emqx_resource:is_dry_run(ResId),
             ?SLOG(
@@ -836,6 +838,14 @@ add_channels(Data) ->
     ),
     Data#data{added_channels = NewChannels}.
 
+maybe_update_callback_mode(Data = #data{mod = ResourceType, state = ResourceState}) ->
+    case emqx_resource:get_callback_mode(ResourceType, ResourceState) of
+        undefined ->
+            Data;
+        CallMode ->
+            Data#data{callback_mode = CallMode}
+    end.
+
 add_channels_in_list([], Data) ->
     Data;
 add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.26"},
+    {vsn, "5.0.27"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},

+ 65 - 18
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -308,12 +308,13 @@ deliver(Messages, Pid, Topic, Limiter) ->
             no_receiver;
         _ ->
             BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
+            NMessages = filter_delivery(Messages, Topic),
             case BatchSize of
                 0 ->
-                    deliver_to_client(Messages, Pid, Topic),
+                    deliver_to_client(NMessages, Pid, Topic),
                     {ok, Limiter};
                 _ ->
-                    deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
+                    deliver_in_batches(NMessages, BatchSize, Pid, Topic, Limiter)
             end
     end.
 
@@ -334,25 +335,71 @@ deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
             Drop
     end.
 
-deliver_to_client([Msg | T], Pid, Topic) ->
-    _ =
-        case emqx_banned:check_clientid(Msg#message.from) of
-            false ->
-                Pid ! {deliver, Topic, Msg};
-            true ->
-                ?tp(
-                    notice,
-                    ignore_retained_message_deliver,
-                    #{
-                        reason => "client is banned",
-                        clientid => Msg#message.from
-                    }
-                )
-        end,
-    deliver_to_client(T, Pid, Topic);
+deliver_to_client([Msg | Rest], Pid, Topic) ->
+    Pid ! {deliver, Topic, Msg},
+    deliver_to_client(Rest, Pid, Topic);
 deliver_to_client([], _, _) ->
     ok.
 
+-define(DELIVER_ALLOWED, true).
+-define(DELIVER_NOT_ALLOWED, false).
+-define(publisher_client_banned, publisher_client_banned).
+-define(msg_topic_not_match, msg_topic_not_match).
+
+filter_delivery(Messages, Topic) ->
+    FilterFun =
+        fun(Msg) ->
+            Pipe = emqx_utils:pipeline(
+                [
+                    fun check_clientid_banned/2,
+                    fun 'check_prefixed_$_with_wildcard'/2
+                ],
+                {Msg, Topic},
+                ?DELIVER_NOT_ALLOWED
+            ),
+            _ =
+                case Pipe of
+                    {ok, _, ?DELIVER_ALLOWED} ->
+                        true;
+                    {error, _, _} ->
+                        false
+                end
+        end,
+    lists:filter(FilterFun, Messages).
+
+check_clientid_banned({Msg, _Topic} = Input, _) ->
+    case emqx_banned:check_clientid(Msg#message.from) of
+        false ->
+            {ok, Input, ?DELIVER_ALLOWED};
+        true ->
+            ?tp(
+                debug,
+                ignore_retained_message_due_to_banned,
+                #{
+                    reason => ?publisher_client_banned,
+                    clientid => Msg#message.from
+                }
+            ),
+            {error, ?publisher_client_banned, ?DELIVER_NOT_ALLOWED}
+    end.
+
+%% [MQTT-4.7.2-1]
+'check_prefixed_$_with_wildcard'({Msg, Topic} = Input, _) ->
+    case emqx_topic:match(Msg#message.topic, Topic) of
+        false ->
+            ?tp(
+                ignore_retained_message_due_to_topic_not_match,
+                #{
+                    reason => ?msg_topic_not_match,
+                    msg_topic => Msg#message.topic,
+                    subscribed_topic => Topic
+                }
+            ),
+            {error, ?msg_topic_not_match, ?DELIVER_NOT_ALLOWED};
+        true ->
+            {ok, Input, ?DELIVER_ALLOWED}
+    end.
+
 take(N, List) ->
     take(N, List, 0, []).
 

+ 167 - 26
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -45,23 +45,25 @@ groups() ->
 common_tests() ->
     emqx_common_test_helpers:all(?MODULE) -- [t_reindex].
 
--define(BASE_CONF, <<
-    "retainer {\n"
-    "    enable = true\n"
-    "    msg_clear_interval = 0s\n"
-    "    msg_expiry_interval = 0s\n"
-    "    max_payload_size = 1MB\n"
-    "    flow_control {\n"
-    "        batch_read_number = 0\n"
-    "        batch_deliver_number = 0\n"
-    "     }\n"
-    "   backend {\n"
-    "        type = built_in_database\n"
-    "        storage_type = ram\n"
-    "        max_retained_messages = 0\n"
-    "     }\n"
-    "}"
->>).
+%% erlfmt-ignore
+-define(BASE_CONF, <<"
+retainer {
+  enable = true
+  msg_clear_interval = 0s
+  msg_expiry_interval = 0s
+  max_payload_size = 1MB
+  delivery_rate = \"1000/s\"
+  flow_control {
+    batch_read_number = 0
+    batch_deliver_number = 0
+  }
+  backend {
+    type = built_in_database
+    storage_type = ram
+    max_retained_messages = 0
+  }
+}
+">>).
 
 %%--------------------------------------------------------------------
 %% Setups
@@ -99,9 +101,9 @@ init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(t_flow_control, _Config) ->
-    restore_delivery();
+    reset_delivery_rate_to_default();
 end_per_testcase(t_cursor_cleanup, _Config) ->
-    restore_delivery();
+    reset_delivery_rate_to_default();
 end_per_testcase(_TestCase, _Config) ->
     ok.
 
@@ -248,6 +250,141 @@ t_wildcard_subscription(Config) ->
     emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]),
     ok = emqtt:disconnect(C1).
 
+'t_wildcard_no_$_prefix'(Config) ->
+    {ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C0),
+    emqtt:publish(
+        C0,
+        <<"$test/t/0">>,
+        <<"this is a retained message with $ prefix in topic">>,
+        [{qos, 0}, {retain, true}]
+    ),
+    emqtt:publish(
+        C0,
+        <<"$test/test/1">>,
+        <<"this is another retained message with $ prefix in topic">>,
+        [{qos, 0}, {retain, true}]
+    ),
+
+    emqtt:publish(
+        C0,
+        <<"t/1">>,
+        <<"this is a retained message 1">>,
+        [{qos, 0}, {retain, true}]
+    ),
+    emqtt:publish(
+        C0,
+        <<"t/2">>,
+        <<"this is a retained message 2">>,
+        [{qos, 0}, {retain, true}]
+    ),
+    publish(
+        C0,
+        <<"/t/3">>,
+        <<"this is a retained message 3">>,
+        [{qos, 0}, {retain, true}],
+        Config
+    ),
+
+    snabbkaffe:start_trace(),
+    SnabbkaffeSubFun = fun(NEvents) ->
+        snabbkaffe:subscribe(
+            ?match_event(#{?snk_kind := ignore_retained_message_due_to_topic_not_match}),
+            NEvents,
+            _Timeout = 10000,
+            0
+        )
+    end,
+    SnabbkaffeReceiveAndAssert = fun(SubRef, NEvents) ->
+        {ok, Trace} = snabbkaffe:receive_events(SubRef),
+        ?assertEqual(
+            NEvents, length(?of_kind(ignore_retained_message_due_to_topic_not_match, Trace))
+        )
+    end,
+
+    %%%%%%%%%%
+    %% C1 subscribes to `#'
+    {ok, SubRef1} = SnabbkaffeSubFun(2),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"#">>, 0),
+    %% Matched 5 msgs but only receive 3 msgs, 2 ignored
+    %% (`$test/t/0` and `$test/test/1` with `$` prefix in topic are ignored)
+    SnabbkaffeReceiveAndAssert(SubRef1, 2),
+    Msgs1 = receive_messages(3),
+    ?assertMatch(
+        %% The order in which messages are received is not always the same as the order in which they are published.
+        %% The received order follows the order in which the indexes match.
+        %% i.e.
+        %%   The first level of the topic `/t/3` is empty.
+        %%   So it will be the first message that be matched and be sent.
+        [
+            #{topic := <<"/t/3">>},
+            #{topic := <<"t/1">>},
+            #{topic := <<"t/2">>}
+        ],
+        Msgs1,
+        #{msgs => Msgs1}
+    ),
+    ok = emqtt:disconnect(C1),
+
+    %%%%%%%%%%
+    %% C2 subscribes to `$test/#'
+    {ok, C2} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C2),
+    {ok, #{}, [0]} = emqtt:subscribe(C2, <<"$test/#">>, 0),
+    %% Matched 2 msgs and receive them all, no ignored
+    Msgs2 = receive_messages(2),
+    ?assertMatch(
+        [
+            #{topic := <<"$test/t/0">>},
+            #{topic := <<"$test/test/1">>}
+        ],
+        Msgs2,
+        #{msgs => Msgs2}
+    ),
+    ok = emqtt:disconnect(C2),
+
+    %%%%%%%%%%
+    %% C3 subscribes to `+/+'
+    {ok, C3} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C3),
+    {ok, #{}, [0]} = emqtt:subscribe(C3, <<"+/+">>, 0),
+    %% Matched 2 msgs and receive them all, no ignored
+    Msgs3 = receive_messages(2),
+    ?assertMatch(
+        [
+            #{topic := <<"t/1">>},
+            #{topic := <<"t/2">>}
+        ],
+        Msgs3,
+        #{msgs => Msgs3}
+    ),
+    ok = emqtt:disconnect(C3),
+
+    %%%%%%%%%%
+    %% C4 subscribes to `+/t/#'
+    {ok, SubRef4} = SnabbkaffeSubFun(1),
+    {ok, C4} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C4),
+    {ok, #{}, [0]} = emqtt:subscribe(C4, <<"+/t/#">>, 0),
+    %% Matched 2 msgs but only receive 1 msgs, 1 ignored
+    %% (`$test/t/0` with `$` prefix in topic are ignored)
+    SnabbkaffeReceiveAndAssert(SubRef4, 1),
+    Msgs4 = receive_messages(1),
+    ?assertMatch(
+        [
+            #{topic := <<"/t/3">>}
+        ],
+        Msgs4,
+        #{msgs => Msgs4}
+    ),
+    ok = emqtt:disconnect(C4),
+
+    snabbkaffe:stop(),
+
+    ok.
+
 t_message_expiry(Config) ->
     ConfMod = fun(Conf) ->
         Conf#{<<"delivery_rate">> := <<"infinity">>}
@@ -689,7 +826,7 @@ t_deliver_when_banned(_) ->
     snabbkaffe:start_trace(),
     {ok, SubRef} =
         snabbkaffe:subscribe(
-            ?match_event(#{?snk_kind := ignore_retained_message_deliver}),
+            ?match_event(#{?snk_kind := ignore_retained_message_due_to_banned}),
             _NEvents = 3,
             _Timeout = 10000,
             0
@@ -698,7 +835,7 @@ t_deliver_when_banned(_) ->
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
 
     {ok, Trace} = snabbkaffe:receive_events(SubRef),
-    ?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
+    ?assertEqual(3, length(?of_kind(ignore_retained_message_due_to_banned, Trace))),
     snabbkaffe:stop(),
     emqx_banned:delete(Who),
     {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
@@ -802,7 +939,10 @@ test_retain_while_reindexing(C, Deadline) ->
     end.
 
 receive_messages(Count) ->
-    receive_messages(Count, []).
+    lists:reverse(
+        receive_messages(Count, [])
+    ).
+
 receive_messages(0, Msgs) ->
     Msgs;
 receive_messages(Count, Msgs) ->
@@ -811,7 +951,7 @@ receive_messages(Count, Msgs) ->
             ct:log("Msg: ~p ~n", [Msg]),
             receive_messages(Count - 1, [Msg | Msgs]);
         Other ->
-            ct:log("Other Msg: ~p~n", [Other]),
+            ct:print("Other Msg: ~p~n", [Other]),
             receive_messages(Count, Msgs)
     after 2000 ->
         Msgs
@@ -914,13 +1054,14 @@ setup_slow_delivery() ->
             }
     }).
 
-restore_delivery() ->
+reset_delivery_rate_to_default() ->
     emqx_limiter_server:del_bucket(emqx_retainer, internal),
     emqx_retainer:update_config(#{
+        <<"delivery_rate">> => <<"1000/s">>,
         <<"flow_control">> =>
             #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1
+                <<"batch_read_number">> => 0,
+                <<"batch_deliver_number">> => 0
             }
     }).
 

+ 1 - 1
apps/emqx_utils/src/emqx_utils.app.src

@@ -2,7 +2,7 @@
 {application, emqx_utils, [
     {description, "Miscellaneous utilities for EMQX apps"},
     % strict semver, bump manually!
-    {vsn, "5.3.0"},
+    {vsn, "5.4.0"},
     {modules, [
         emqx_utils,
         emqx_utils_api,

+ 21 - 0
apps/emqx_utils/src/emqx_utils.erl

@@ -43,6 +43,7 @@
     proc_stats/0,
     proc_stats/1,
     rand_seed/0,
+    rand_id/1,
     now_to_secs/1,
     now_to_ms/1,
     index_of/2,
@@ -901,6 +902,26 @@ is_restricted_str(String) ->
     RE = <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>,
     match =:= re:run(String, RE, [{capture, none}]).
 
+%% @doc Generate random, printable bytes as an ID.
+%% The first byte is ensured to be a-z or A-Z.
+rand_id(Len) when Len > 0 ->
+    iolist_to_binary([rand_first_char(), rand_chars(Len - 1)]).
+
+rand_first_char() ->
+    base62(rand:uniform(52) - 1).
+
+rand_chars(0) ->
+    [];
+rand_chars(N) ->
+    [rand_char() | rand_chars(N - 1)].
+
+rand_char() ->
+    base62(rand:uniform(62) - 1).
+
+base62(I) when I < 26 -> $A + I;
+base62(I) when I < 52 -> $a + I - 26;
+base62(I) -> $0 + I - 52.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 

+ 2 - 0
changes/ce/fix-13702.en.md

@@ -0,0 +1,2 @@
+Clean up the corresponding exclusive subscriptions when a node goes down.
+

+ 1 - 0
changes/ce/fix-13708.en.md

@@ -0,0 +1 @@
+Fixed an issue which may cause shared subscription 'sticky' strategy to degrade to 'random'.

+ 1 - 0
changes/ce/fix-13733.en.md

@@ -0,0 +1 @@
+Made `cacertfile` optional when configuring https listener from `emqx ctl conf load` command.

+ 1 - 0
changes/ce/fix-13742.en.md

@@ -0,0 +1 @@
+Fixed when subscribing with `+` as the first level, or `#` as a wildcard, retained messages with topics starting with `$` are incorrectly received.

+ 1 - 0
changes/ce/fix-13754.en.md

@@ -0,0 +1 @@
+Fixed an issue when websocket connection would break consistently on its own.

+ 0 - 0
changes/ce/fix-13756.en.md


Некоторые файлы не были показаны из-за большого количества измененных файлов