Browse Source

feat(buffer_worker): refactor buffer/resource workers to always use queue

This makes the buffer/resource workers always use `replayq` for
queuing, along with collecting multiple requests in a single call.
This is done to avoid long message queues for the buffer workers and
rely on `replayq`'s capabilities of offloading to disk and detecting
overflow.

Also, this deprecates the `enable_batch` and `enable_queue` resource
creation options, as: i) queuing is now always enabled; ii) batch_size
> 1 <=> batch_enabled.  The corresponding metric
`dropped.queue_not_enabled` is dropped, along with `batching`.  The
`batching` gauge is too ephemeral, especially considering a default batch
time of 20 ms, and is not shown in the dashboard, so it was removed.
Thales Macedo Garitezi 3 years ago
parent
commit
fd360ac6c0
28 changed files with 1008 additions and 504 deletions
  1. 1 0
      Makefile
  2. 5 2
      apps/emqx/test/emqx_common_test_helpers.erl
  3. 1 22
      apps/emqx_bridge/i18n/emqx_bridge_schema.conf
  4. 1 9
      apps/emqx_bridge/include/emqx_bridge.hrl
  5. 3 11
      apps/emqx_bridge/src/emqx_bridge_api.erl
  6. 0 1
      apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl
  7. 1 4
      apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
  8. 0 1
      apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
  9. 2 2
      apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
  10. 1 3
      apps/emqx_resource/include/emqx_resource.hrl
  11. 0 1
      apps/emqx_resource/src/emqx_resource_manager.erl
  12. 0 58
      apps/emqx_resource/src/emqx_resource_metrics.erl
  13. 401 218
      apps/emqx_resource/src/emqx_resource_worker.erl
  14. 2 0
      apps/emqx_resource/src/schema/emqx_resource_schema.erl
  15. 38 17
      apps/emqx_resource/test/emqx_connector_demo.erl
  16. 484 55
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  17. 1 0
      changes/v5.0.14/feat-9642.en.md
  18. 1 0
      changes/v5.0.14/feat-9642.zh.md
  19. 3 0
      changes/v5.0.14/fix-9642.en.md
  20. 3 0
      changes/v5.0.14/fix-9642.zh.md
  21. 0 2
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl
  22. 1 2
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl
  23. 0 4
      lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
  24. 23 60
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
  25. 16 15
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl
  26. 17 9
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
  27. 2 8
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl
  28. 1 0
      scripts/ct/run.sh

+ 1 - 0
Makefile

@@ -88,6 +88,7 @@ define gen-app-ct-target
 $1-ct: $(REBAR)
 	@$(SCRIPTS)/pre-compile.sh $(PROFILE)
 	@ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \
+	        --readable=$(CT_READABLE) \
 		--name $(CT_NODE_NAME) \
 		--cover_export_name $(CT_COVER_EXPORT_PREFIX)-$(subst /,-,$1) \
 		--suite $(shell $(SCRIPTS)/find-suites.sh $1)

+ 5 - 2
apps/emqx/test/emqx_common_test_helpers.erl

@@ -447,8 +447,11 @@ is_all_tcp_servers_available(Servers) ->
             is_tcp_server_available(Host, Port)
         end,
     case lists:partition(Fun, Servers) of
-        {_, []} -> true;
-        {_, Unavail} -> ct:print("Unavailable servers: ~p", [Unavail])
+        {_, []} ->
+            true;
+        {_, Unavail} ->
+            ct:print("Unavailable servers: ~p", [Unavail]),
+            false
     end.
 
 -spec is_tcp_server_available(

+ 1 - 22
apps/emqx_bridge/i18n/emqx_bridge_schema.conf

@@ -78,17 +78,6 @@ emqx_bridge_schema {
                           }
                   }
 
-    metric_batching {
-                   desc {
-                         en: """Count of messages that are currently accumulated in memory waiting for sending in one batch."""
-                         zh: """当前积压在内存里,等待批量发送的消息个数"""
-                        }
-                   label: {
-                           en: "Batched"
-                           zh: "等待批量发送"
-                          }
-                  }
-
     metric_dropped {
                    desc {
                          en: """Count of messages dropped."""
@@ -120,16 +109,6 @@ emqx_bridge_schema {
                            zh: "队列已满被丢弃"
                           }
                   }
-    metric_dropped_queue_not_enabled {
-                   desc {
-                         en: """Count of messages dropped due to the queue is not enabled."""
-                         zh: """因为队列未启用被丢弃的消息个数。"""
-                        }
-                   label: {
-                           en: "Dropped Queue Disabled"
-                           zh: "队列未启用被丢弃"
-                          }
-                  }
     metric_dropped_resource_not_found {
                    desc {
                          en: """Count of messages dropped due to the resource is not found."""
@@ -193,7 +172,7 @@ emqx_bridge_schema {
                           }
                   }
 
-    metric_sent_inflight {
+    metric_inflight {
                    desc {
                          en: """Count of messages that were sent asynchronously but ACKs are not yet received."""
                          zh: """已异步地发送但没有收到 ACK 的消息个数。"""

+ 1 - 9
apps/emqx_bridge/include/emqx_bridge.hrl

@@ -16,16 +16,14 @@
 
 -define(EMPTY_METRICS,
     ?METRICS(
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
     )
 ).
 
 -define(METRICS(
-    Batched,
     Dropped,
     DroppedOther,
     DroppedQueueFull,
-    DroppedQueueNotEnabled,
     DroppedResourceNotFound,
     DroppedResourceStopped,
     Matched,
@@ -40,11 +38,9 @@
     Rcvd
 ),
     #{
-        'batching' => Batched,
         'dropped' => Dropped,
         'dropped.other' => DroppedOther,
         'dropped.queue_full' => DroppedQueueFull,
-        'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
         'dropped.resource_not_found' => DroppedResourceNotFound,
         'dropped.resource_stopped' => DroppedResourceStopped,
         'matched' => Matched,
@@ -61,11 +57,9 @@
 ).
 
 -define(metrics(
-    Batched,
     Dropped,
     DroppedOther,
     DroppedQueueFull,
-    DroppedQueueNotEnabled,
     DroppedResourceNotFound,
     DroppedResourceStopped,
     Matched,
@@ -80,11 +74,9 @@
     Rcvd
 ),
     #{
-        'batching' := Batched,
         'dropped' := Dropped,
         'dropped.other' := DroppedOther,
         'dropped.queue_full' := DroppedQueueFull,
-        'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
         'dropped.resource_not_found' := DroppedResourceNotFound,
         'dropped.resource_stopped' := DroppedResourceStopped,
         'matched' := Matched,

+ 3 - 11
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -207,7 +207,6 @@ info_example_basic(webhook) ->
             auto_restart_interval => 15000,
             query_mode => async,
             async_inflight_window => 100,
-            enable_queue => false,
             max_queue_bytes => 100 * 1024 * 1024
         }
     };
@@ -233,7 +232,6 @@ mqtt_main_example() ->
             health_check_interval => <<"15s">>,
             auto_restart_interval => <<"60s">>,
             query_mode => sync,
-            enable_queue => false,
             max_queue_bytes => 100 * 1024 * 1024
         },
         ssl => #{
@@ -634,11 +632,11 @@ aggregate_metrics(AllMetrics) ->
         fun(
             #{
                 metrics := ?metrics(
-                    M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
+                    M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15
                 )
             },
             ?metrics(
-                N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
+                N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15
             )
         ) ->
             ?METRICS(
@@ -656,9 +654,7 @@ aggregate_metrics(AllMetrics) ->
                 M12 + N12,
                 M13 + N13,
                 M14 + N14,
-                M15 + N15,
-                M16 + N16,
-                M17 + N17
+                M15 + N15
             )
         end,
         InitMetrics,
@@ -691,7 +687,6 @@ format_metrics(#{
         'dropped' := Dropped,
         'dropped.other' := DroppedOther,
         'dropped.queue_full' := DroppedQueueFull,
-        'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
         'dropped.resource_not_found' := DroppedResourceNotFound,
         'dropped.resource_stopped' := DroppedResourceStopped,
         'matched' := Matched,
@@ -705,15 +700,12 @@ format_metrics(#{
         matched := #{current := Rate, last5m := Rate5m, max := RateMax}
     }
 }) ->
-    Batched = maps:get('batching', Gauges, 0),
     Queued = maps:get('queuing', Gauges, 0),
     SentInflight = maps:get('inflight', Gauges, 0),
     ?METRICS(
-        Batched,
         Dropped,
         DroppedOther,
         DroppedQueueFull,
-        DroppedQueueNotEnabled,
         DroppedResourceNotFound,
         DroppedResourceStopped,
         Matched,

+ 0 - 1
apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl

@@ -82,7 +82,6 @@ default_resource_opts() ->
     #{
         <<"async_inflight_window">> => 100,
         <<"auto_restart_interval">> => <<"60s">>,
-        <<"enable_queue">> => false,
         <<"health_check_interval">> => <<"15s">>,
         <<"max_queue_bytes">> => <<"1GB">>,
         <<"query_mode">> => <<"sync">>,

+ 1 - 4
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -128,12 +128,9 @@ fields(bridges) ->
     ] ++ ee_fields_bridges();
 fields("metrics") ->
     [
-        {"batching", mk(integer(), #{desc => ?DESC("metric_batching")})},
         {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
         {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
         {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
-        {"dropped.queue_not_enabled",
-            mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})},
         {"dropped.resource_not_found",
             mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})},
         {"dropped.resource_stopped",
@@ -142,7 +139,7 @@ fields("metrics") ->
         {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
         {"retried", mk(integer(), #{desc => ?DESC("metric_retried")})},
         {"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
-        {"inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
+        {"inflight", mk(integer(), #{desc => ?DESC("metric_inflight")})},
         {"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
         {"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
         {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},

+ 0 - 1
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -662,7 +662,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
             <<"reconnect_interval">> => <<"1s">>,
             <<"resource_opts">> => #{
                 <<"worker_pool_size">> => 2,
-                <<"enable_queue">> => true,
                 <<"query_mode">> => <<"sync">>,
                 %% to make it check the healthy quickly
                 <<"health_check_interval">> => <<"0.5s">>

+ 2 - 2
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -126,8 +126,8 @@ When disabled the messages are buffered in RAM only."""
 
   batch_size {
     desc {
-      en: """Maximum batch count."""
-      zh: """批量请求大小。"""
+      en: """Maximum batch count. If equal to 1, there's effectively no batching."""
+      zh: """批量请求大小。如果等于1,实际上就没有批处理。"""
     }
     label {
       en: """Batch size"""

+ 1 - 3
apps/emqx_resource/include/emqx_resource.hrl

@@ -64,10 +64,8 @@
     %% If the resource disconnected, we can set to retry starting the resource
     %% periodically.
     auto_restart_interval => pos_integer(),
-    enable_batch => boolean(),
     batch_size => pos_integer(),
     batch_time => pos_integer(),
-    enable_queue => boolean(),
     max_queue_bytes => pos_integer(),
     query_mode => query_mode(),
     resume_interval => pos_integer(),
@@ -90,7 +88,7 @@
 -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).
 
 %% count
--define(DEFAULT_BATCH_SIZE, 100).
+-define(DEFAULT_BATCH_SIZE, 1).
 
 %% milliseconds
 -define(DEFAULT_BATCH_TIME, 20).

+ 0 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -136,7 +136,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
             'success',
             'failed',
             'dropped',
-            'dropped.queue_not_enabled',
             'dropped.queue_full',
             'dropped.resource_not_found',
             'dropped.resource_stopped',

+ 0 - 58
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -24,9 +24,6 @@
 ]).
 
 -export([
-    batching_set/3,
-    batching_shift/3,
-    batching_get/1,
     inflight_set/3,
     inflight_get/1,
     queuing_set/3,
@@ -40,9 +37,6 @@
     dropped_queue_full_inc/1,
     dropped_queue_full_inc/2,
     dropped_queue_full_get/1,
-    dropped_queue_not_enabled_inc/1,
-    dropped_queue_not_enabled_inc/2,
-    dropped_queue_not_enabled_get/1,
     dropped_resource_not_found_inc/1,
     dropped_resource_not_found_inc/2,
     dropped_resource_not_found_get/1,
@@ -80,10 +74,8 @@ events() ->
     [
         [?TELEMETRY_PREFIX, Event]
      || Event <- [
-            batching,
             dropped_other,
             dropped_queue_full,
-            dropped_queue_not_enabled,
             dropped_resource_not_found,
             dropped_resource_stopped,
             failed,
@@ -125,9 +117,6 @@ handle_telemetry_event(
         dropped_queue_full ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
-        dropped_queue_not_enabled ->
-            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
-            emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val);
         dropped_resource_not_found ->
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
             emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val);
@@ -160,8 +149,6 @@ handle_telemetry_event(
     _HandlerConfig
 ) ->
     case Event of
-        batching ->
-            emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
         inflight ->
             emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
         queuing ->
@@ -169,45 +156,12 @@ handle_telemetry_event(
         _ ->
             ok
     end;
-handle_telemetry_event(
-    [?TELEMETRY_PREFIX, Event],
-    _Measurements = #{gauge_shift := Val},
-    _Metadata = #{resource_id := ID, worker_id := WorkerID},
-    _HandlerConfig
-) ->
-    case Event of
-        batching ->
-            emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val);
-        _ ->
-            ok
-    end;
 handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) ->
     ok.
 
 %% Gauges (value can go both up and down):
 %% --------------------------------------
 
-%% @doc Count of messages that are currently accumulated in memory waiting for
-%% being sent in one batch
-batching_set(ID, WorkerID, Val) ->
-    telemetry:execute(
-        [?TELEMETRY_PREFIX, batching],
-        #{gauge_set => Val},
-        #{resource_id => ID, worker_id => WorkerID}
-    ).
-
-batching_shift(_ID, _WorkerID = undefined, _Val) ->
-    ok;
-batching_shift(ID, WorkerID, Val) ->
-    telemetry:execute(
-        [?TELEMETRY_PREFIX, batching],
-        #{gauge_shift => Val},
-        #{resource_id => ID, worker_id => WorkerID}
-    ).
-
-batching_get(ID) ->
-    emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching').
-
 %% @doc Count of batches of messages that are currently
 %% queuing. [Gauge]
 queuing_set(ID, WorkerID, Val) ->
@@ -269,18 +223,6 @@ dropped_queue_full_inc(ID, Val) ->
 dropped_queue_full_get(ID) ->
     emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full').
 
-%% @doc Count of messages dropped because the queue was not enabled
-dropped_queue_not_enabled_inc(ID) ->
-    dropped_queue_not_enabled_inc(ID, 1).
-
-dropped_queue_not_enabled_inc(ID, Val) ->
-    telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{
-        resource_id => ID
-    }).
-
-dropped_queue_not_enabled_get(ID) ->
-    emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled').
-
 %% @doc Count of messages dropped because the resource was not found
 dropped_resource_not_found_inc(ID) ->
     dropped_resource_not_found_inc(ID, 1).

+ 401 - 218
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -54,8 +54,12 @@
 
 -export([reply_after_query/7, batch_reply_after_query/7]).
 
+-elvis([{elvis_style, dont_repeat_yourself, disable}]).
+
 -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
 
+-define(COLLECT_REQ_LIMIT, 1000).
+-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
 -define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}).
 -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
 -define(EXPAND(RESULT, BATCH), [
@@ -64,12 +68,23 @@
 ]).
 
 -type id() :: binary().
--type query() :: {query, from(), request()}.
+-type index() :: pos_integer().
+-type query() :: {query, request(), query_opts()}.
+-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()).
 -type request() :: term().
--type from() :: pid() | reply_fun().
-
--callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
-    {{from(), result()}, NewCbState :: term()}.
+-type from() :: pid() | reply_fun() | request_from().
+-type request_from() :: undefined | gen_statem:from().
+-type state() :: blocked | running.
+-type data() :: #{
+    id => id(),
+    index => index(),
+    name => atom(),
+    batch_size => pos_integer(),
+    batch_time => timer:time(),
+    queue => replayq:q(),
+    resume_interval => timer:time(),
+    tref => undefined | timer:tref()
+}.
 
 callback_mode() -> [state_functions, state_enter].
 
@@ -80,11 +95,13 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
+    emqx_resource_metrics:matched_inc(Id),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
+    emqx_resource_metrics:matched_inc(Id),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
@@ -97,7 +114,9 @@ simple_sync_query(Id, Request) ->
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     Index = undefined,
-    Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}),
+    QueryOpts = #{},
+    emqx_resource_metrics:matched_inc(Id),
+    Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), QueryOpts),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
@@ -110,7 +129,9 @@ simple_async_query(Id, Request, ReplyFun) ->
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     Index = undefined,
-    Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}),
+    QueryOpts = #{},
+    emqx_resource_metrics:matched_inc(Id),
+    Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), QueryOpts),
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
@@ -126,6 +147,7 @@ block(ServerRef, Query) ->
 resume(ServerRef) ->
     gen_statem:cast(ServerRef, resume).
 
+-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
 init({Id, Index, Opts}) ->
     process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Id, {Id, Index}),
@@ -134,24 +156,19 @@ init({Id, Index, Opts}) ->
     SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
     TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
     SegBytes = min(SegBytes0, TotalBytes),
-    Queue =
-        case maps:get(enable_queue, Opts, false) of
-            true ->
-                replayq:open(#{
-                    dir => disk_queue_dir(Id, Index),
-                    marshaller => fun ?MODULE:queue_item_marshaller/1,
-                    max_total_bytes => TotalBytes,
-                    %% we don't want to retain the queue after
-                    %% resource restarts.
-                    offload => true,
-                    seg_bytes => SegBytes,
-                    sizer => fun ?MODULE:estimate_size/1
-                });
-            false ->
-                undefined
-        end,
+    QueueOpts =
+        #{
+            dir => disk_queue_dir(Id, Index),
+            marshaller => fun ?MODULE:queue_item_marshaller/1,
+            max_total_bytes => TotalBytes,
+            %% we don't want to retain the queue after
+            %% resource restarts.
+            offload => true,
+            seg_bytes => SegBytes,
+            sizer => fun ?MODULE:estimate_size/1
+        },
+    Queue = replayq:open(QueueOpts),
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
-    emqx_resource_metrics:batching_set(Id, Index, 0),
     emqx_resource_metrics:inflight_set(Id, Index, 0),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     ok = inflight_new(Name, InfltWinSZ, Id, Index),
@@ -160,19 +177,17 @@ init({Id, Index, Opts}) ->
         id => Id,
         index => Index,
         name => Name,
-        enable_batch => maps:get(enable_batch, Opts, false),
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
         resume_interval => maps:get(resume_interval, Opts, HCItvl),
-        acc => [],
-        acc_left => BatchSize,
         tref => undefined
     },
     {ok, blocked, St, {next_event, cast, resume}}.
 
-running(enter, _, _St) ->
-    keep_state_and_data;
+running(enter, _, St) ->
+    ?tp(resource_worker_enter_running, #{}),
+    maybe_flush(St);
 running(cast, resume, _St) ->
     keep_state_and_data;
 running(cast, block, St) ->
@@ -182,22 +197,22 @@ running(
 ) when
     is_list(Batch)
 ->
-    Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]),
+    Q1 = append_queue(Id, Index, Q, Batch),
     {next_state, blocked, St#{queue := Q1}};
-running({call, From}, {query, Request, _Opts}, St) ->
-    query_or_acc(From, Request, St);
-running(cast, {query, Request, Opts}, St) ->
-    ReplyFun = maps:get(async_reply_fun, Opts, undefined),
-    query_or_acc(ReplyFun, Request, St);
+running(info, ?SEND_REQ(_From, _Req) = Request0, Data) ->
+    handle_query_requests(Request0, Data);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
+running(internal, flush, St) ->
+    flush(St);
 running(info, {flush, _Ref}, _St) ->
     keep_state_and_data;
 running(info, Info, _St) ->
-    ?SLOG(error, #{msg => unexpected_msg, info => Info}),
+    ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}),
     keep_state_and_data.
 
 blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
+    ?tp(resource_worker_enter_blocked, #{}),
     {keep_state_and_data, {state_timeout, ResumeT, resume}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
@@ -206,33 +221,37 @@ blocked(
 ) when
     is_list(Batch)
 ->
-    Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]),
+    Q1 = append_queue(Id, Index, Q, Batch),
     {keep_state, St#{queue := Q1}};
 blocked(cast, resume, St) ->
     do_resume(St);
 blocked(state_timeout, resume, St) ->
     do_resume(St);
-blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) ->
-    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
-    _ = reply_caller(Id, ?REPLY(From, Request, false, Error)),
-    {keep_state, St#{
-        queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))])
-    }};
-blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) ->
-    ReplyFun = maps:get(async_reply_fun, Opts, undefined),
+blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) ->
+    #{
+        id := Id,
+        index := Index,
+        queue := Q
+    } = Data0,
+    From =
+        case ReqFrom of
+            undefined -> maps:get(async_reply_fun, Opts, undefined);
+            From1 -> From1
+        end,
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
-    _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)),
-    {keep_state, St#{
-        queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
-    }}.
+    HasBeenSent = false,
+    _ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)),
+    NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]),
+    Data = Data0#{queue := NewQ},
+    {keep_state, Data};
+blocked(info, {flush, _Ref}, _Data) ->
+    keep_state_and_data;
+blocked(info, Info, _Data) ->
+    ?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}),
+    keep_state_and_data.
 
 terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
-    GaugeFns =
-        [
-            fun emqx_resource_metrics:batching_set/3,
-            fun emqx_resource_metrics:inflight_set/3
-        ],
-    lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns),
+    emqx_resource_metrics:inflight_set(Id, Index, 0),
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
     gproc_pool:disconnect_worker(Id, {Id, Index}).
 
@@ -255,43 +274,71 @@ code_change(_OldVsn, State, _Extra) ->
 ).
 
 pick_call(Id, Key, Query, Timeout) ->
-    ?PICK(Id, Key, gen_statem:call(Pid, Query, {clean_timeout, Timeout})).
+    ?PICK(Id, Key, begin
+        Caller = self(),
+        MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
+        From = {Caller, MRef},
+        erlang:send(Pid, ?SEND_REQ(From, Query)),
+        receive
+            {MRef, Response} ->
+                erlang:demonitor(MRef, [flush]),
+                Response;
+            {'DOWN', MRef, process, Pid, Reason} ->
+                error({worker_down, Reason})
+        after Timeout ->
+            erlang:demonitor(MRef, [flush]),
+            receive
+                {MRef, Response} ->
+                    Response
+            after 0 ->
+                error(timeout)
+            end
+        end
+    end).
 
 pick_cast(Id, Key, Query) ->
-    ?PICK(Id, Key, gen_statem:cast(Pid, Query)).
+    ?PICK(Id, Key, begin
+        From = undefined,
+        erlang:send(Pid, ?SEND_REQ(From, Query)),
+        ok
+    end).
 
-do_resume(#{id := Id, name := Name} = St) ->
+do_resume(#{id := Id, name := Name} = Data) ->
     case inflight_get_first(Name) of
         empty ->
-            retry_queue(St);
+            retry_queue(Data);
         {Ref, FirstQuery} ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
-            retry_inflight_sync(Id, Ref, FirstQuery, Name, St)
+            retry_inflight_sync(Id, Ref, FirstQuery, Name, Data)
     end.
 
-retry_queue(#{queue := undefined} = St) ->
-    {next_state, running, St};
 retry_queue(
     #{
-        queue := Q,
+        queue := Q0,
         id := Id,
         index := Index,
-        enable_batch := false,
+        batch_size := 1,
+        name := Name,
         resume_interval := ResumeT
-    } = St
+    } = Data0
 ) ->
-    case get_first_n_from_queue(Q, 1) of
-        [] ->
-            {next_state, running, St};
-        [?QUERY(_, Request, HasSent) = Query] ->
-            QueryOpts = #{inflight_name => maps:get(name, St)},
+    %% no batching
+    case get_first_n_from_queue(Q0, 1) of
+        empty ->
+            {next_state, running, Data0};
+        {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
+            QueryOpts = #{inflight_name => Name},
             Result = call_query(configured, Id, Index, Query, QueryOpts),
-            case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of
+            Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
+            case reply_caller(Id, Reply) of
                 true ->
-                    {keep_state, St, {state_timeout, ResumeT, resume}};
+                    {keep_state, Data0, {state_timeout, ResumeT, resume}};
                 false ->
-                    retry_queue(St#{queue := drop_head(Q, Id, Index)})
+                    ok = replayq:ack(Q1, QAckRef),
+                    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+                    Data = Data0#{queue := Q1},
+                    retry_queue(Data)
             end
     end;
 retry_queue(
@@ -299,101 +346,202 @@ retry_queue(
         queue := Q,
         id := Id,
         index := Index,
-        enable_batch := true,
         batch_size := BatchSize,
+        name := Name,
         resume_interval := ResumeT
-    } = St
+    } = Data0
 ) ->
+    %% batching
     case get_first_n_from_queue(Q, BatchSize) of
-        [] ->
-            {next_state, running, St};
-        Batch0 ->
-            QueryOpts = #{inflight_name => maps:get(name, St)},
+        empty ->
+            {next_state, running, Data0};
+        {Q1, QAckRef, Batch0} ->
+            QueryOpts = #{inflight_name => Name},
             Result = call_query(configured, Id, Index, Batch0, QueryOpts),
             %% The caller was already replied to with ?RESOURCE_ERROR(blocked, _) before the query was queued,
             %% so we now change the 'from' field to 'undefined' to avoid replying to the caller again.
-            Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0],
+            Batch = [
+                ?QUERY(undefined, Request, HasBeenSent0)
+             || ?QUERY(_, Request, HasBeenSent0) <- Batch0
+            ],
             case batch_reply_caller(Id, Result, Batch) of
                 true ->
-                    {keep_state, St, {state_timeout, ResumeT, resume}};
+                    ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}),
+                    {keep_state, Data0, {state_timeout, ResumeT, resume}};
                 false ->
-                    retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)})
+                    ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}),
+                    ok = replayq:ack(Q1, QAckRef),
+                    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+                    Data = Data0#{queue := Q1},
+                    retry_queue(Data)
             end
     end.
 
 retry_inflight_sync(
     Id,
     Ref,
-    ?QUERY(_, _, HasSent) = Query,
+    QueryOrBatch,
     Name,
-    #{index := Index, resume_interval := ResumeT} = St0
+    #{index := Index, resume_interval := ResumeT} = Data0
 ) ->
-    Result = call_query(sync, Id, Index, Query, #{}),
-    case handle_query_result(Id, Result, HasSent, false) of
-        %% Send failed because resource down
+    QueryOpts = #{},
+    %% if we are retrying an inflight query, it has been sent
+    HasBeenSent = true,
+    Result = call_query(sync, Id, Index, QueryOrBatch, QueryOpts),
+    BlockWorker = false,
+    case handle_query_result(Id, Result, HasBeenSent, BlockWorker) of
+        %% Send failed because resource is down
         true ->
-            {keep_state, St0, {state_timeout, ResumeT, resume}};
+            {keep_state, Data0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
             inflight_drop(Name, Ref, Id, Index),
-            do_resume(St0)
+            do_resume(Data0)
     end.
 
-query_or_acc(
-    From,
-    Request,
+%% Called during the `running' state only.
+%% Collects the pending requests, appends them to the queue and then either
+%% flushes or arms the flush timer, so the result is a `gen_statem' handler
+%% result (not bare data).
+-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
+    gen_statem:event_handler_result(state(), data()).
+handle_query_requests(Request0, Data0) ->
     #{
-        enable_batch := true,
-        acc := Acc,
-        acc_left := Left,
+        id := Id,
         index := Index,
-        id := Id
-    } = St0
-) ->
-    Acc1 = [?QUERY(From, Request, false) | Acc],
-    emqx_resource_metrics:batching_shift(Id, Index, 1),
-    St = St0#{acc := Acc1, acc_left := Left - 1},
-    case Left =< 1 of
-        true -> flush(St);
-        false -> {keep_state, ensure_flush_timer(St)}
-    end;
-query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) ->
-    QueryOpts = #{
-        inflight_name => maps:get(name, St)
-    },
-    Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts),
-    case reply_caller(Id, ?REPLY(From, Request, false, Result)) of
+        queue := Q
+    } = Data0,
+    Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
+    QueueItems =
+        lists:map(
+            fun
+                (?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
+                    ReplyFun = maps:get(async_reply_fun, Opts, undefined),
+                    HasBeenSent = false,
+                    ?QUERY(ReplyFun, Req, HasBeenSent);
+                (?SEND_REQ(From, {query, Req, _Opts})) ->
+                    HasBeenSent = false,
+                    ?QUERY(From, Req, HasBeenSent)
+            end,
+            Requests
+        ),
+    NewQ = append_queue(Id, Index, Q, QueueItems),
+    Data = Data0#{queue := NewQ},
+    maybe_flush(Data).
+
+maybe_flush(Data) ->
+    #{
+        batch_size := BatchSize,
+        queue := Q
+    } = Data,
+    QueueCount = queue_count(Q),
+    case QueueCount >= BatchSize of
         true ->
-            Query = ?QUERY(From, Request, false),
-            {next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}};
+            flush(Data);
         false ->
-            {keep_state, St}
+            {keep_state, ensure_flush_timer(Data)}
+    end.
+
+%% Called during the `running' state only.
+%% Pops up to `batch_size' queries from the queue and hands them over to
+%% `do_flush/2'.  When the queue is empty, only the flush timer is cancelled.
+-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
+flush(Data0) ->
+    #{
+        batch_size := BatchSize,
+        queue := Q0
+    } = Data0,
+    %% use the same pop-and-unwrap helper as `retry_queue/1' rather than
+    %% duplicating the count/pop/unwrap sequence here.
+    case get_first_n_from_queue(Q0, BatchSize) of
+        empty ->
+            Data = cancel_flush_timer(Data0),
+            {keep_state, Data};
+        {Q1, QAckRef, Batch} ->
+            %% a batch size of exactly 1 means "no batching"
+            IsBatch = BatchSize =/= 1,
+            do_flush(Data0, #{
+                new_queue => Q1,
+                is_batch => IsBatch,
+                batch => Batch,
+                ack_ref => QAckRef
+            })
     end.
 
-flush(#{acc := []} = St) ->
-    {keep_state, St};
-flush(
+%% Sends one popped batch to the resource.  `new_queue' is the queue returned
+%% by the pop in `flush/1'; it replaces the queue in the data only in the
+%% branches that ack the batch.
+-spec do_flush(data(), #{
+    is_batch := boolean(),
+    batch := [?QUERY(from(), request(), boolean())],
+    ack_ref := replayq:ack_ref(),
+    new_queue := replayq:q()
+}) ->
+    gen_statem:event_handler_result(state(), data()).
+do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
     #{
         id := Id,
         index := Index,
-        acc := Batch0,
-        batch_size := Size,
-        queue := Q0
-    } = St
-) ->
-    Batch = lists:reverse(Batch0),
-    QueryOpts = #{
-        inflight_name => maps:get(name, St)
-    },
-    emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)),
+        name := Name
+    } = Data0,
+    %% unwrap when not batching (i.e., batch size == 1)
+    [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
+    QueryOpts = #{inflight_name => Name},
+    Result = call_query(configured, Id, Index, Request, QueryOpts),
+    IsAsync = is_async(Id),
+    Data1 = cancel_flush_timer(Data0),
+    Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+    case {reply_caller(Id, Reply), IsAsync} of
+        %% failed and is not async; keep the request in the queue to
+        %% be retried
+        {true, false} ->
+            {next_state, blocked, Data1};
+        %% failed and is async; remove the request from the queue, as
+        %% it is already in inflight table
+        {true, true} ->
+            ok = replayq:ack(Q1, QAckRef),
+            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+            Data = Data1#{queue := Q1},
+            {next_state, blocked, Data};
+        %% success; just ack
+        {false, _} ->
+            ok = replayq:ack(Q1, QAckRef),
+            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+            Data2 = Data1#{queue := Q1},
+            case replayq:count(Q1) > 0 of
+                true ->
+                    {keep_state, Data2, [{next_event, internal, flush}]};
+                false ->
+                    {keep_state, Data2}
+            end
+    end;
+do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
+    #{
+        id := Id,
+        index := Index,
+        batch_size := BatchSize,
+        name := Name
+    } = Data0,
+    QueryOpts = #{inflight_name => Name},
     Result = call_query(configured, Id, Index, Batch, QueryOpts),
-    St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
-    case batch_reply_caller(Id, Result, Batch) of
-        true ->
-            Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]),
-            {next_state, blocked, St1#{queue := Q1}};
-        false ->
-            {keep_state, St1}
+    IsAsync = is_async(Id),
+    Data1 = cancel_flush_timer(Data0),
+    case {batch_reply_caller(Id, Result, Batch), IsAsync} of
+        %% failed and is not async; keep the request in the queue to
+        %% be retried
+        {true, false} ->
+            {next_state, blocked, Data1};
+        %% failed and is async; remove the request from the queue, as
+        %% it is already in inflight table
+        {true, true} ->
+            ok = replayq:ack(Q1, QAckRef),
+            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+            Data = Data1#{queue := Q1},
+            {next_state, blocked, Data};
+        %% success; just ack
+        {false, _} ->
+            ok = replayq:ack(Q1, QAckRef),
+            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+            CurrentCount = replayq:count(Q1),
+            Data2 = Data1#{queue := Q1},
+            case {CurrentCount > 0, CurrentCount >= BatchSize} of
+                {false, _} ->
+                    {keep_state, Data2};
+                {true, true} ->
+                    {keep_state, Data2, [{next_event, internal, flush}]};
+                {true, false} ->
+                    Data3 = ensure_flush_timer(Data2),
+                    {keep_state, Data3}
+            end
     end.
 
 batch_reply_caller(Id, BatchResult, Batch) ->
@@ -408,11 +556,12 @@ batch_reply_caller(Id, BatchResult, Batch) ->
     ).
 
 reply_caller(Id, Reply) ->
-    reply_caller(Id, Reply, false).
+    BlockWorker = false,
+    reply_caller(Id, Reply, BlockWorker).
 
-reply_caller(Id, ?REPLY(undefined, _, HasSent, Result), BlockWorker) ->
-    handle_query_result(Id, Result, HasSent, BlockWorker);
-reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasSent, Result), BlockWorker) when
+reply_caller(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) ->
+    handle_query_result(Id, Result, HasBeenSent, BlockWorker);
+reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when
     is_function(ReplyFun)
 ->
     _ =
@@ -420,52 +569,52 @@ reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasSent, Result), BlockWorker) when
             {async_return, _} -> no_reply_for_now;
             _ -> apply(ReplyFun, Args ++ [Result])
         end,
-    handle_query_result(Id, Result, HasSent, BlockWorker);
-reply_caller(Id, ?REPLY(From, _, HasSent, Result), BlockWorker) ->
+    handle_query_result(Id, Result, HasBeenSent, BlockWorker);
+reply_caller(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) ->
     gen_statem:reply(From, Result),
-    handle_query_result(Id, Result, HasSent, BlockWorker).
+    handle_query_result(Id, Result, HasBeenSent, BlockWorker).
 
-handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasSent, BlockWorker) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) ->
     ?SLOG(error, #{msg => resource_exception, info => Msg}),
-    inc_sent_failed(Id, HasSent),
+    inc_sent_failed(Id, HasBeenSent),
     BlockWorker;
-handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when
+handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when
     NotWorking == not_connected; NotWorking == blocked
 ->
     true;
-handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
     emqx_resource_metrics:dropped_resource_not_found_inc(Id),
     BlockWorker;
-handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
     emqx_resource_metrics:dropped_resource_stopped_inc(Id),
     BlockWorker;
-handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
     emqx_resource_metrics:dropped_other_inc(Id),
     BlockWorker;
-handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) ->
+handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) ->
     %% the message will be queued in replayq or inflight window,
     %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
     %% sent this message.
     ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
     true;
-handle_query_result(Id, {error, Reason}, HasSent, BlockWorker) ->
+handle_query_result(Id, {error, Reason}, HasBeenSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
-    inc_sent_failed(Id, HasSent),
+    inc_sent_failed(Id, HasBeenSent),
     BlockWorker;
-handle_query_result(_Id, {async_return, inflight_full}, _HasSent, _BlockWorker) ->
+handle_query_result(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) ->
     true;
-handle_query_result(Id, {async_return, {error, Msg}}, HasSent, BlockWorker) ->
+handle_query_result(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) ->
     ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
-    inc_sent_failed(Id, HasSent),
+    inc_sent_failed(Id, HasBeenSent),
     BlockWorker;
-handle_query_result(_Id, {async_return, ok}, _HasSent, BlockWorker) ->
+handle_query_result(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) ->
     BlockWorker;
-handle_query_result(Id, Result, HasSent, BlockWorker) ->
+handle_query_result(Id, Result, HasBeenSent, BlockWorker) ->
     assert_ok_result(Result),
-    inc_sent_success(Id, HasSent),
+    inc_sent_success(Id, HasBeenSent),
     BlockWorker.
 
 call_query(QM0, Id, Index, Query, QueryOpts) ->
@@ -478,13 +627,10 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
                     _ -> QM0
                 end,
             CM = maps:get(callback_mode, Data),
-            emqx_resource_metrics:matched_inc(Id),
             apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
-            emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
-            emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(not_connected, "resource not connected");
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
@@ -516,7 +662,7 @@ apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, Que
     Name = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
         call_query_async,
-        case inflight_is_full(Name) of
+        case is_inflight_full(Name) of
             true ->
                 {async_return, inflight_full};
             false ->
@@ -538,26 +684,26 @@ apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, Que
     Name = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
         call_batch_query_async,
-        case inflight_is_full(Name) of
+        case is_inflight_full(Name) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:batch_reply_after_query/7,
                 Ref = make_message_ref(),
-                Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
+                ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
                 Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
                 ok = inflight_append(Name, Ref, Batch, Id, Index),
-                Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt),
+                Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
                 {async_return, Result}
         end,
         Batch
     ).
 
-reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) ->
+reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), Result) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
-    case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of
+    case reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
         true ->
             ?MODULE:block(Pid);
         false ->
@@ -576,7 +722,7 @@ batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
     end.
 
 drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
-    case inflight_is_full(Name) of
+    case is_inflight_full(Name) of
         true ->
             inflight_drop(Name, Ref, Id, Index),
             ?MODULE:resume(Pid);
@@ -594,10 +740,8 @@ queue_item_marshaller(Bin) when is_binary(Bin) ->
 estimate_size(QItem) ->
     size(queue_item_marshaller(QItem)).
 
-maybe_append_queue(Id, _Index, undefined, _Items) ->
-    emqx_resource_metrics:dropped_queue_not_enabled_inc(Id),
-    undefined;
-maybe_append_queue(Id, Index, Q, Items) ->
+-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q().
+append_queue(Id, Index, Q, Queries) ->
     Q2 =
         case replayq:overflow(Q) of
             Overflow when Overflow =< 0 ->
@@ -611,42 +755,38 @@ maybe_append_queue(Id, Index, Q, Items) ->
                 ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
                 Q1
         end,
+    Items = [?Q_ITEM(X) || X <- Queries],
     Q3 = replayq:append(Q2, Items),
     emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)),
+    ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
     Q3.
 
+%% Returns `empty' for an empty queue; otherwise pops at most N items and
+%% returns the new queue, the ack reference and the queries with their
+%% `?Q_ITEM' wrapper already removed.
+-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
+    empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
 get_first_n_from_queue(Q, N) ->
-    get_first_n_from_queue(Q, N, []).
-
-get_first_n_from_queue(_Q, 0, Acc) ->
-    lists:reverse(Acc);
-get_first_n_from_queue(Q, N, Acc) when N > 0 ->
-    case replayq:peek(Q) of
-        empty -> Acc;
-        ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc])
+    case replayq:count(Q) of
+        0 ->
+            empty;
+        _ ->
+            {NewQ, QAckRef, Items} = replayq:pop(Q, #{count_limit => N}),
+            Queries = [X || ?Q_ITEM(X) <- Items],
+            {NewQ, QAckRef, Queries}
     end.
 
-drop_first_n_from_queue(Q, 0, _Id, _Index) ->
-    Q;
-drop_first_n_from_queue(Q, N, Id, Index) when N > 0 ->
-    drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index).
-
-drop_head(Q, Id, Index) ->
-    {NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
-    ok = replayq:ack(NewQ, AckRef),
-    emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)),
-    NewQ.
-
 %%==============================================================================
 %% the inflight queue for async query
--define(SIZE_REF, -1).
+-define(MAX_SIZE_REF, -1).
+-define(SIZE_REF, -2).
 inflight_new(Name, InfltWinSZ, Id, Index) ->
     _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
-    inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
+    inflight_append(Name, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
+    %% we use this counter because we might deal with batches as
+    %% elements.
+    inflight_append(Name, ?SIZE_REF, 0, Id, Index),
     ok.
 
 inflight_get_first(Name) ->
-    case ets:next(Name, ?SIZE_REF) of
+    case ets:next(Name, ?MAX_SIZE_REF) of
         '$end_of_table' ->
             empty;
         Ref ->
@@ -659,31 +799,42 @@ inflight_get_first(Name) ->
             end
     end.
 
-inflight_is_full(undefined) ->
+is_inflight_full(undefined) ->
     false;
-inflight_is_full(Name) ->
-    [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
-    Size = inflight_size(Name),
+is_inflight_full(Name) ->
+    [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?MAX_SIZE_REF),
+    %% we consider number of batches rather than number of messages
+    %% because one batch request may hold several messages.
+    Size = inflight_num_batches(Name),
     Size >= MaxSize.
 
-inflight_size(Name) ->
-    %% Note: we subtract 1 because there's a metadata row that hold
-    %% the maximum size value.
-    MetadataRowCount = 1,
+inflight_num_batches(Name) ->
+    %% Note: we subtract 2 because there are 2 metadata rows that hold
+    %% the maximum size value and the number of messages.
+    MetadataRowCount = 2,
     case ets:info(Name, size) of
         undefined -> 0;
         Size -> max(0, Size - MetadataRowCount)
     end.
 
+%% Reads the message counter stored in the row keyed by ?SIZE_REF.
+inflight_num_msgs(Name) ->
+    [{_Key, NumMsgs}] = ets:lookup(Name, ?SIZE_REF),
+    NumMsgs.
+
 inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
     ok;
-inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) ->
-    ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
+inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
+    Batch = mark_as_sent(Batch0),
+    ets:insert(Name, {Ref, Batch}),
+    BatchSize = length(Batch),
+    ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
     ok;
-inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) ->
-    ets:insert(Name, {Ref, ?QUERY(From, Req, true)}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
+inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
+    Query = mark_as_sent(Query0),
+    ets:insert(Name, {Ref, Query}),
+    ets:update_counter(Name, ?SIZE_REF, {2, 1}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
     ok;
 inflight_append(Name, Ref, Data, _Id, _Index) ->
     ets:insert(Name, {Ref, Data}),
@@ -694,20 +845,26 @@ inflight_append(Name, Ref, Data, _Id, _Index) ->
 inflight_drop(undefined, _, _Id, _Index) ->
     ok;
 inflight_drop(Name, Ref, Id, Index) ->
-    ets:delete(Name, Ref),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)),
+    Count =
+        case ets:take(Name, Ref) of
+            [{Ref, ?QUERY(_, _, _)}] -> 1;
+            [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
+            _ -> 0
+        end,
+    Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
     ok.
 
 %%==============================================================================
 
-inc_sent_failed(Id, _HasSent = true) ->
+inc_sent_failed(Id, _HasBeenSent = true) ->
     emqx_resource_metrics:retried_failed_inc(Id);
-inc_sent_failed(Id, _HasSent) ->
+inc_sent_failed(Id, _HasBeenSent) ->
     emqx_resource_metrics:failed_inc(Id).
 
-inc_sent_success(Id, _HasSent = true) ->
+inc_sent_success(Id, _HasBeenSent = true) ->
     emqx_resource_metrics:retried_success_inc(Id);
-inc_sent_success(Id, _HasSent) ->
+inc_sent_success(Id, _HasBeenSent) ->
     emqx_resource_metrics:success_inc(Id).
 
 call_mode(sync, _) -> sync;
@@ -728,8 +885,6 @@ assert_ok_result(R) when is_tuple(R) ->
 assert_ok_result(R) ->
     error({not_ok_result, R}).
 
-queue_count(undefined) ->
-    0;
 queue_count(Q) ->
     replayq:count(Q).
 
@@ -744,12 +899,12 @@ disk_queue_dir(Id, Index) ->
     QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
     filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
 
-ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
+ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
     Ref = make_ref(),
     TRef = erlang:send_after(T, self(), {flush, Ref}),
-    St#{tref => {TRef, Ref}};
-ensure_flush_timer(St) ->
-    St.
+    Data#{tref => {TRef, Ref}};
+ensure_flush_timer(Data) ->
+    Data.
 
 cancel_flush_timer(St = #{tref := undefined}) ->
     St;
@@ -759,3 +914,31 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
 
 make_message_ref() ->
     erlang:unique_integer([monotonic, positive]).
+
+%% Starts collecting with the requests already in `Acc' (newest first) and
+%% stops once `Limit' requests have been gathered or the mailbox has no more.
+collect_requests(Acc, Limit) ->
+    do_collect_requests(Acc, length(Acc), Limit).
+
+%% Takes `?SEND_REQ' messages from the mailbox without waiting (`after 0')
+%% until `Limit' is reached; the result is in arrival order.
+do_collect_requests(Collected, N, Limit) when N < Limit ->
+    receive
+        ?SEND_REQ(_From, _Req) = Msg ->
+            do_collect_requests([Msg | Collected], N + 1, Limit)
+    after 0 ->
+        lists:reverse(Collected)
+    end;
+do_collect_requests(Collected, _N, _Limit) ->
+    lists:reverse(Collected).
+
+%% Sets the "has been sent" flag to `true' on a single query, or on every
+%% query of a batch.
+mark_as_sent(Batch) when is_list(Batch) ->
+    [mark_as_sent(Query) || Query <- Batch];
+mark_as_sent(?QUERY(From, Req, _WasSent)) ->
+    ?QUERY(From, Req, true).
+
+%% True when the looked-up resource's query mode and callback mode resolve to
+%% `async' via `call_mode/2'; any other lookup result counts as not async.
+is_async(ResourceId) ->
+    case emqx_resource_manager:ets_lookup(ResourceId) of
+        {ok, _Group, #{query_mode := QueryMode, callback_mode := CallbackMode}} ->
+            async =:= call_mode(QueryMode, CallbackMode);
+        _Other ->
+            false
+    end.

+ 2 - 0
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -83,12 +83,14 @@ query_mode(_) -> undefined.
 enable_batch(type) -> boolean();
 enable_batch(required) -> false;
 enable_batch(default) -> true;
+enable_batch(deprecated) -> {since, "v5.0.14"};
 enable_batch(desc) -> ?DESC("enable_batch");
 enable_batch(_) -> undefined.
 
 enable_queue(type) -> boolean();
 enable_queue(required) -> false;
 enable_queue(default) -> false;
+enable_queue(deprecated) -> {since, "v5.0.14"};
 enable_queue(desc) -> ?DESC("enable_queue");
 enable_queue(_) -> undefined.
 

+ 38 - 17
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -17,6 +17,7 @@
 -module(emqx_connector_demo).
 
 -include_lib("typerefl/include/types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -behaviour(emqx_resource).
 
@@ -28,6 +29,7 @@
     on_query/3,
     on_query_async/4,
     on_batch_query/3,
+    on_batch_query_async/4,
     on_get_status/2
 ]).
 
@@ -36,6 +38,8 @@
 %% callbacks for emqx_resource config schema
 -export([roots/0]).
 
+-define(CM_KEY, {?MODULE, callback_mode}).
+
 roots() ->
     [
         {name, fun name/1},
@@ -51,7 +55,6 @@ register(required) -> true;
 register(default) -> false;
 register(_) -> undefined.
 
--define(CM_KEY, {?MODULE, callback_mode}).
 callback_mode() ->
     persistent_term:get(?CM_KEY).
 
@@ -60,17 +63,12 @@ set_callback_mode(Mode) ->
 
 on_start(_InstId, #{create_error := true}) ->
     error("some error");
-on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
-    Register = maps:get(register, Opts, false),
-    {ok, Opts#{
-        id => InstId,
-        stop_error => true,
-        pid => spawn_counter_process(Name, Register)
-    }};
 on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
+    StopError = maps:get(stop_error, Opts, false),
     {ok, Opts#{
         id => InstId,
+        stop_error => StopError,
         pid => spawn_counter_process(Name, Register)
     }}.
 
@@ -95,8 +93,11 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
     From = {self(), ReqRef},
     Pid ! {From, {inc, N}},
     receive
-        {ReqRef, ok} -> ok;
-        {ReqRef, incorrect_status} -> {error, {recoverable_error, incorrect_status}}
+        {ReqRef, ok} ->
+            ?tp(connector_demo_inc_counter, #{n => N}),
+            ok;
+        {ReqRef, incorrect_status} ->
+            {error, {recoverable_error, incorrect_status}}
     after 1000 ->
         {error, timeout}
     end;
@@ -127,18 +128,30 @@ on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
     ok.
 
 on_batch_query(InstId, BatchReq, State) ->
-    %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed.
+    %% Requests can be either 'get_counter' or 'inc_counter', but
+    %% cannot be mixed.
     case hd(BatchReq) of
         {inc_counter, _} ->
-            batch_inc_counter(InstId, BatchReq, State);
+            batch_inc_counter(sync, InstId, BatchReq, State);
         get_counter ->
-            batch_get_counter(InstId, State)
+            batch_get_counter(sync, InstId, State)
     end.
 
-batch_inc_counter(InstId, BatchReq, State) ->
+on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
+    %% A batch holds either 'get_counter' or 'inc_counter' requests, never a
+    %% mix of both, so the first request decides how the batch is handled.
+    CallMode = {async, ReplyFunAndArgs},
+    case hd(BatchReq) of
+        {inc_counter, _} ->
+            batch_inc_counter(CallMode, InstId, BatchReq, State);
+        get_counter ->
+            batch_get_counter(CallMode, InstId, State)
+    end.
+
+batch_inc_counter(CallMode, InstId, BatchReq, State) ->
     TotalN = lists:foldl(
         fun
             ({inc_counter, N}, Total) ->
+                ?tp(connector_demo_batch_inc_individual, #{n => N}),
                 Total + N;
             (Req, _Total) ->
                 error({mixed_requests_not_allowed, {inc_counter, Req}})
@@ -146,10 +159,17 @@ batch_inc_counter(InstId, BatchReq, State) ->
         0,
         BatchReq
     ),
-    on_query(InstId, {inc_counter, TotalN}, State).
+    case CallMode of
+        sync ->
+            on_query(InstId, {inc_counter, TotalN}, State);
+        {async, ReplyFunAndArgs} ->
+            on_query_async(InstId, {inc_counter, TotalN}, ReplyFunAndArgs, State)
+    end.
 
-batch_get_counter(InstId, State) ->
-    on_query(InstId, get_counter, State).
+batch_get_counter(sync, InstId, State) ->
+    on_query(InstId, get_counter, State);
+batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
+    on_query_async(InstId, get_counter, ReplyFunAndArgs, State).
 
 on_get_status(_InstId, #{health_check_error := true}) ->
     disconnected;
@@ -187,6 +207,7 @@ counter_loop(
             {inc, N, ReplyFun} when Status == running ->
                 %ct:pal("async counter recv: ~p", [{inc, N}]),
                 apply_reply(ReplyFun, ok),
+                ?tp(connector_demo_inc_counter_async, #{n => N}),
                 State#{counter => Num + N};
             {{FromPid, ReqRef}, {inc, N}} when Status == running ->
                 %ct:pal("sync counter recv: ~p", [{inc, N}]),

+ 484 - 55
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -30,6 +30,8 @@
 -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
 -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -37,11 +39,15 @@ groups() ->
     [].
 
 init_per_testcase(_, Config) ->
+    ct:timetrap({seconds, 30}),
     emqx_connector_demo:set_callback_mode(always_sync),
     Config.
 
 end_per_testcase(_, _Config) ->
-    _ = emqx_resource:remove(?ID).
+    snabbkaffe:stop(),
+    _ = emqx_resource:remove(?ID),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
 
 init_per_suite(Config) ->
     code:ensure_loaded(?TEST_RESOURCE),
@@ -140,6 +146,7 @@ t_create_remove_local(_) ->
     ?assertNot(is_process_alive(Pid)).
 
 t_do_not_start_after_created(_) ->
+    ct:pal("creating resource"),
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
@@ -159,16 +166,19 @@ t_do_not_start_after_created(_) ->
     ),
 
     %% start the resource manually..
+    ct:pal("starting resource manually"),
     ok = emqx_resource:start(?ID),
     {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     ?assert(is_process_alive(Pid)),
 
     %% restart the resource
+    ct:pal("restarting resource"),
     ok = emqx_resource:restart(?ID),
     ?assertNot(is_process_alive(Pid)),
     {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
     ?assert(is_process_alive(Pid2)),
 
+    ct:pal("removing resource"),
     ok = emqx_resource:remove_local(?ID),
 
     ?assertNot(is_process_alive(Pid2)).
@@ -207,12 +217,13 @@ t_query_counter(_) ->
     ok = emqx_resource:remove_local(?ID).
 
 t_batch_query_counter(_) ->
+    BatchSize = 100,
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{enable_batch => true, query_mode => sync}
+        #{batch_size => BatchSize, query_mode => sync}
     ),
 
     ?check_trace(
@@ -225,15 +236,26 @@ t_batch_query_counter(_) ->
         end
     ),
 
+    NMsgs = 1_000,
     ?check_trace(
         ?TRACE_OPTS,
-        inc_counter_in_parallel(1000),
+        begin
+            NEvents = round(math:ceil(NMsgs / BatchSize)),
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := connector_demo_inc_counter}),
+                NEvents,
+                _Timeout = 10_000
+            ),
+            inc_counter_in_parallel(NMsgs),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
         fun(Trace) ->
             QueryTrace = ?of_kind(call_batch_query, Trace),
             ?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace)
         end
     ),
-    {ok, 1000} = emqx_resource:query(?ID, get_counter),
+    {ok, NMsgs} = emqx_resource:query(?ID, get_counter),
 
     ok = emqx_resource:remove_local(?ID).
 
@@ -243,20 +265,28 @@ t_query_counter_async_query(_) ->
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{query_mode => async, enable_batch => false}
+        #{query_mode => async, batch_size => 1}
     ),
     ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+    NMsgs = 1_000,
     ?check_trace(
         ?TRACE_OPTS,
-        inc_counter_in_parallel(1000),
+        begin
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := connector_demo_inc_counter}),
+                NMsgs,
+                _Timeout = 60_000
+            ),
+            inc_counter_in_parallel(NMsgs),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
         fun(Trace) ->
-            %% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
+            %% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
             QueryTrace = ?of_kind(call_query, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
         end
     ),
-    %% wait for 1s to make sure all the aysnc query is sent to the resource.
-    timer:sleep(1000),
     %% simple query ignores the query_mode and batching settings in the resource_worker
     ?check_trace(
         ?TRACE_OPTS,
@@ -285,20 +315,32 @@ t_query_counter_async_callback(_) ->
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
         #{name => test_resource, register => true},
-        #{query_mode => async, enable_batch => false, async_inflight_window => 1000000}
+        #{
+            query_mode => async,
+            batch_size => 1,
+            async_inflight_window => 1000000
+        }
     ),
     ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+    NMsgs = 1_000,
     ?check_trace(
         ?TRACE_OPTS,
-        inc_counter_in_parallel(1000, ReqOpts),
+        begin
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+                NMsgs,
+                _Timeout = 60_000
+            ),
+            inc_counter_in_parallel(NMsgs, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
         end
     ),
 
-    %% wait for 1s to make sure all the aysnc query is sent to the resource.
-    timer:sleep(1000),
     %% simple query ignores the query_mode and batching settings in the resource_worker
     ?check_trace(
         ?TRACE_OPTS,
@@ -325,12 +367,29 @@ t_query_counter_async_callback(_) ->
 
 t_query_counter_async_inflight(_) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
+    MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
+    ok = telemetry:attach_many(
+        ?FUNCTION_NAME,
+        emqx_resource_metrics:events(),
+        fun(Event, Measurements, Meta, _Config) ->
+            ets:insert(
+                MetricsTab,
+                {erlang:monotonic_time(), #{
+                    event => Event, measurements => Measurements, metadata => Meta
+                }}
+            ),
+            ok
+        end,
+        unused_config
+    ),
+    on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),
 
     Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
-    Insert0 = fun(Tab, Result) ->
-        ets:insert(Tab, {make_ref(), Result})
+    Insert0 = fun(Tab, Ref, Result) ->
+        ct:pal("inserting ~p", [{Ref, Result}]),
+        ets:insert(Tab, {Ref, Result})
     end,
-    ReqOpts = #{async_reply_fun => {Insert0, [Tab0]}},
+    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
     WindowSize = 15,
     {ok, _} = emqx_resource:create_local(
         ?ID,
@@ -339,11 +398,10 @@ t_query_counter_async_inflight(_) ->
         #{name => test_resource, register => true},
         #{
             query_mode => async,
-            enable_batch => false,
+            batch_size => 1,
             async_inflight_window => WindowSize,
             worker_pool_size => 1,
-            resume_interval => 300,
-            enable_queue => false
+            resume_interval => 300
         }
     ),
     ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
@@ -360,40 +418,76 @@ t_query_counter_async_inflight(_) ->
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
         end
     ),
+    tap_metrics(?LINE),
 
     %% this will block the resource_worker as the inflight window is full now
-    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqx_resource:query(?ID, {inc_counter, 2}),
+            #{?snk_kind := resource_worker_enter_blocked},
+            1_000
+        ),
     ?assertMatch(0, ets:info(Tab0, size)),
-    %% sleep to make the resource_worker resume some times
-    timer:sleep(2000),
 
+    tap_metrics(?LINE),
     %% send query now will fail because the resource is blocked.
     Insert = fun(Tab, Ref, Result) ->
-        ets:insert(Tab, {Ref, Result})
+        ct:pal("inserting ~p", [{Ref, Result}]),
+        ets:insert(Tab, {Ref, Result}),
+        ?tp(tmp_query_inserted, #{})
     end,
-    ok = emqx_resource:query(?ID, {inc_counter, 1}, #{
-        async_reply_fun => {Insert, [Tab0, tmp_query]}
-    }),
-    timer:sleep(100),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqx_resource:query(?ID, {inc_counter, 3}, #{
+                async_reply_fun => {Insert, [Tab0, tmp_query]}
+            }),
+            #{?snk_kind := tmp_query_inserted},
+            1_000
+        ),
+    %% since this counts as a failure, it'll be enqueued and retried
+    %% later, when the resource is unblocked.
     ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
-
-    %% all response should be received after the resource is resumed.
+    tap_metrics(?LINE),
+
+    %% all responses should be received after the resource is resumed.
+    {ok, SRef0} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+        %% +1 because the tmp_query above will be retried and succeed
+        %% this time.
+        WindowSize + 1,
+        _Timeout = 60_000
+    ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
-    timer:sleep(1000),
+    tap_metrics(?LINE),
+    {ok, _} = snabbkaffe:receive_events(SRef0),
+    %% since the previous tmp_query was enqueued to be retried, we
+    %% take it again from the table; this time, it should have
+    %% succeeded.
+    ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
     ?assertEqual(WindowSize, ets:info(Tab0, size)),
+    tap_metrics(?LINE),
 
     %% send async query, this time everything should be ok.
     Num = 10,
     ?check_trace(
         ?TRACE_OPTS,
-        inc_counter_in_parallel(Num, ReqOpts),
+        begin
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+                Num,
+                _Timeout = 60_000
+            ),
+            inc_counter_in_parallel(Num, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
+            ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace)
         end
     ),
-    timer:sleep(1000),
-    ?assertEqual(WindowSize + Num, ets:info(Tab0, size)),
+    ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+    tap_metrics(?LINE),
 
     %% block the resource
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
@@ -411,27 +505,253 @@ t_query_counter_async_inflight(_) ->
     ok = emqx_resource:query(?ID, {inc_counter, 1}),
 
     Sent = WindowSize + Num + WindowSize,
+    {ok, SRef1} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+        WindowSize,
+        _Timeout = 60_000
+    ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
-    timer:sleep(1000),
+    {ok, _} = snabbkaffe:receive_events(SRef1),
     ?assertEqual(Sent, ets:info(Tab0, size)),
 
     {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
     ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
     ?assert(Sent =< Counter),
 
-    {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
-    ct:pal("metrics: ~p", [C]),
-    {ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count),
-    %% The `simple_sync_query' we just did also increases the matched
-    %% count, hence the + 1.
-    ExtraSimpleCallCount = IncorrectStatusCount + 1,
+    %% give the metrics some time to stabilize.
+    ct:sleep(1000),
+    #{counters := C, gauges := G} = tap_metrics(?LINE),
     ?assertMatch(
-        #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when
-            M == Ss + Dp - Rs + ExtraSimpleCallCount,
-        C,
         #{
-            metrics => C,
-            extra_simple_call_count => ExtraSimpleCallCount
+            counters :=
+                #{matched := M, success := Ss, dropped := Dp},
+            gauges := #{queuing := Qing, inflight := Infl}
+        } when
+            M == Ss + Dp + Qing + Infl,
+        #{counters => C, gauges => G},
+        #{
+            metrics => #{counters => C, gauges => G},
+            results => ets:tab2list(Tab0),
+            metrics_trace => ets:tab2list(MetricsTab)
+        }
+    ),
+    ?assert(
+        lists:all(
+            fun
+                ({_, ok}) -> true;
+                (_) -> false
+            end,
+            ets:tab2list(Tab0)
+        )
+    ),
+    ok = emqx_resource:remove_local(?ID).
+
+t_query_counter_async_inflight_batch(_) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    MetricsTab = ets:new(metrics_tab, [ordered_set, public]),
+    ok = telemetry:attach_many(
+        ?FUNCTION_NAME,
+        emqx_resource_metrics:events(),
+        fun(Event, Measurements, Meta, _Config) ->
+            ets:insert(
+                MetricsTab,
+                {erlang:monotonic_time(), #{
+                    event => Event, measurements => Measurements, metadata => Meta
+                }}
+            ),
+            ok
+        end,
+        unused_config
+    ),
+    on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end),
+
+    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+    Insert0 = fun(Tab, Ref, Result) ->
+        ct:pal("inserting ~p", [{Ref, Result}]),
+        ets:insert(Tab, {Ref, Result})
+    end,
+    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
+    BatchSize = 2,
+    WindowSize = 3,
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, register => true},
+        #{
+            query_mode => async,
+            batch_size => BatchSize,
+            async_inflight_window => WindowSize,
+            worker_pool_size => 1,
+            resume_interval => 300
+        }
+    ),
+    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+
+    %% block the resource
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+
+    %% send async query to make the inflight window full
+    NumMsgs = BatchSize * WindowSize,
+    ?check_trace(
+        begin
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := call_batch_query_async}),
+                WindowSize,
+                _Timeout = 60_000
+            ),
+            inc_counter_in_parallel(NumMsgs, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_batch_query_async, Trace),
+            ?assertMatch(
+                [
+                    #{
+                        batch := [
+                            {query, _, {inc_counter, 1}, _},
+                            {query, _, {inc_counter, 1}, _}
+                        ]
+                    }
+                    | _
+                ],
+                QueryTrace
+            )
+        end
+    ),
+    tap_metrics(?LINE),
+
+    ?check_trace(
+        begin
+            %% this will block the resource_worker as the inflight window is full now
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_resource:query(?ID, {inc_counter, 2}),
+                    #{?snk_kind := resource_worker_enter_blocked},
+                    5_000
+                ),
+            ?assertMatch(0, ets:info(Tab0, size)),
+            ok
+        end,
+        []
+    ),
+
+    tap_metrics(?LINE),
+    %% send query now will fail because the resource is blocked.
+    Insert = fun(Tab, Ref, Result) ->
+        ct:pal("inserting ~p", [{Ref, Result}]),
+        ets:insert(Tab, {Ref, Result}),
+        ?tp(tmp_query_inserted, #{})
+    end,
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqx_resource:query(?ID, {inc_counter, 3}, #{
+                async_reply_fun => {Insert, [Tab0, tmp_query]}
+            }),
+            #{?snk_kind := tmp_query_inserted},
+            1_000
+        ),
+    %% since this counts as a failure, it'll be enqueued and retried
+    %% later, when the resource is unblocked.
+    ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
+    tap_metrics(?LINE),
+
+    %% all responses should be received after the resource is resumed.
+    {ok, SRef0} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+        %% +1 because the tmp_query above will be retried and succeed
+        %% this time.
+        WindowSize + 1,
+        _Timeout = 60_000
+    ),
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
+    tap_metrics(?LINE),
+    {ok, _} = snabbkaffe:receive_events(SRef0),
+    %% since the previous tmp_query was enqueued to be retried, we
+    %% take it again from the table; this time, it should have
+    %% succeeded.
+    ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
+    ?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+    tap_metrics(?LINE),
+
+    %% send async query, this time everything should be ok.
+    NumBatches1 = 3,
+    NumMsgs1 = BatchSize * NumBatches1,
+    ?check_trace(
+        ?TRACE_OPTS,
+        begin
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+                NumBatches1,
+                _Timeout = 60_000
+            ),
+            inc_counter_in_parallel(NumMsgs1, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_batch_query_async, Trace),
+            ?assertMatch(
+                [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _],
+                QueryTrace
+            )
+        end
+    ),
+    ?assertEqual(
+        NumMsgs + NumMsgs1,
+        ets:info(Tab0, size),
+        #{tab => ets:tab2list(Tab0)}
+    ),
+    tap_metrics(?LINE),
+
+    %% block the resource
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+    %% again, send async query to make the inflight window full
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(WindowSize, ReqOpts),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_batch_query_async, Trace),
+            ?assertMatch(
+                [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _],
+                QueryTrace
+            )
+        end
+    ),
+
+    %% this will block the resource_worker
+    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+
+    Sent = NumMsgs + NumMsgs1 + WindowSize,
+    {ok, SRef1} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
+        WindowSize,
+        _Timeout = 60_000
+    ),
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
+    {ok, _} = snabbkaffe:receive_events(SRef1),
+    ?assertEqual(Sent, ets:info(Tab0, size)),
+
+    {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
+    ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
+    ?assert(Sent =< Counter),
+
+    %% give the metrics some time to stabilize.
+    ct:sleep(1000),
+    #{counters := C, gauges := G} = tap_metrics(?LINE),
+    ?assertMatch(
+        #{
+            counters :=
+                #{matched := M, success := Ss, dropped := Dp},
+            gauges := #{queuing := Qing, inflight := Infl}
+        } when
+            M == Ss + Dp + Qing + Infl,
+        #{counters => C, gauges => G},
+        #{
+            metrics => #{counters => C, gauges => G},
+            results => ets:tab2list(Tab0),
+            metrics_trace => ets:tab2list(MetricsTab)
         }
     ),
     ?assert(
@@ -506,9 +826,9 @@ t_stop_start(_) ->
     %% add some metrics to test their persistence
     WorkerID0 = <<"worker:0">>,
     WorkerID1 = <<"worker:1">>,
-    emqx_resource_metrics:batching_set(?ID, WorkerID0, 2),
-    emqx_resource_metrics:batching_set(?ID, WorkerID1, 3),
-    ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
+    emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2),
+    emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3),
+    ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
 
     {ok, _} = emqx_resource:check_and_recreate(
         ?ID,
@@ -522,7 +842,7 @@ t_stop_start(_) ->
     ?assert(is_process_alive(Pid0)),
 
     %% metrics are reset when recreating
-    ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),
+    ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
 
     ok = emqx_resource:stop(?ID),
 
@@ -541,11 +861,11 @@ t_stop_start(_) ->
     ?assert(is_process_alive(Pid1)),
 
     %% now stop while resetting the metrics
-    emqx_resource_metrics:batching_set(?ID, WorkerID0, 1),
-    emqx_resource_metrics:batching_set(?ID, WorkerID1, 4),
-    ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)),
+    emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
+    emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
+    ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
     ok = emqx_resource:stop(?ID),
-    ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)),
+    ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
 
     ok.
 
@@ -641,18 +961,22 @@ create_dry_run_local_succ() ->
     ?assertEqual(undefined, whereis(test_resource)).
 
 t_create_dry_run_local_failed(_) ->
+    ct:timetrap({seconds, 120}),
+    ct:pal("creating with creation error"),
     Res1 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{create_error => true}
     ),
     ?assertMatch({error, _}, Res1),
 
+    ct:pal("creating with health check error"),
     Res2 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, health_check_error => true}
     ),
     ?assertMatch({error, _}, Res2),
 
+    ct:pal("creating with stop error"),
     Res3 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, stop_error => true}
@@ -689,16 +1013,116 @@ t_auto_retry(_) ->
     ),
     ?assertEqual(ok, Res).
 
+t_retry_batch(_Config) ->
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 5,
+            worker_pool_size => 1,
+            resume_interval => 1_000
+        }
+    ),
+
+    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+    Matched0 = emqx_resource_metrics:matched_get(?ID),
+    ?assertEqual(1, Matched0),
+
+    %% these requests will batch together and fail; the buffer worker
+    %% will enter the `blocked' state and they'll be retried later,
+    %% after it unblocks.
+    Payloads = lists:seq(1, 5),
+    NumPayloads = length(Payloads),
+    ExpectedCount = 15,
+
+    ?check_trace(
+        begin
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    lists:foreach(
+                        fun(N) ->
+                            ok = emqx_resource:query(?ID, {inc_counter, N})
+                        end,
+                        Payloads
+                    ),
+                    #{?snk_kind := resource_worker_enter_blocked},
+                    5_000
+                ),
+            %% now the individual messages should have been counted
+            Matched1 = emqx_resource_metrics:matched_get(?ID),
+            ?assertEqual(Matched0 + NumPayloads, Matched1),
+
+            %% wait for two more retries while the failure is enabled; the
+            %% batch shall remain enqueued.
+            {ok, _} =
+                snabbkaffe:block_until(
+                    ?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}),
+                    5_000
+                ),
+            %% should not have increased the matched count with the retries
+            Matched2 = emqx_resource_metrics:matched_get(?ID),
+            ?assertEqual(Matched1, Matched2),
+
+            %% now unblock the buffer worker so it may retry the batch;
+            %% this time the retry should succeed
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    ok = emqx_resource:simple_sync_query(?ID, resume),
+                    #{?snk_kind := resource_worker_retry_queue_batch_succeeded},
+                    5_000
+                ),
+            %% 1 more because of the `resume' call
+            Matched3 = emqx_resource_metrics:matched_get(?ID),
+            ?assertEqual(Matched2 + 1, Matched3),
+
+            {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
+            {Counter, Matched3}
+        end,
+        fun({Counter, Matched3}, Trace) ->
+            %% 1 original attempt + 2 failed retries + final
+            %% successful attempt.
+            %% each time should be the original batch (no duplicate
+            %% elements or reordering).
+            ExpectedSeenPayloads = lists:flatten(lists:duplicate(4, Payloads)),
+            ?assertEqual(
+                ExpectedSeenPayloads,
+                ?projection(n, ?of_kind(connector_demo_batch_inc_individual, Trace))
+            ),
+            ?assertMatch(
+                [#{n := ExpectedCount}],
+                ?of_kind(connector_demo_inc_counter, Trace)
+            ),
+            ?assertEqual(ExpectedCount, Counter),
+            %% matched should count only the original requests, and not retries
+            %% + 1 for `resume' call
+            %% + 1 for `block' call
+            %% + 1 for `get_counter' call
+            %% and the message count (1 time)
+            Matched4 = emqx_resource_metrics:matched_get(?ID),
+            ?assertEqual(Matched3 + 1, Matched4),
+            ok
+        end
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
 inc_counter_in_parallel(N) ->
     inc_counter_in_parallel(N, #{}).
 
-inc_counter_in_parallel(N, Opts) ->
+inc_counter_in_parallel(N, Opts0) ->
     Parent = self(),
     Pids = [
         erlang:spawn(fun() ->
+            Opts =
+                case is_function(Opts0) of
+                    true -> Opts0();
+                    false -> Opts0
+                end,
             emqx_resource:query(?ID, {inc_counter, 1}, Opts),
             Parent ! {complete, self()}
         end)
@@ -719,3 +1143,8 @@ bin_config() ->
 config() ->
     {ok, Config} = hocon:binary(bin_config()),
     Config.
+
+tap_metrics(Line) ->
+    {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID),
+    ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
+    #{counters => C, gauges => G}.

+ 1 - 0
changes/v5.0.14/feat-9642.en.md

@@ -0,0 +1 @@
+Deprecates `enable_batch` and `enable_queue` options for bridges/resources.  After this change, queuing is always enabled for bridges, and batching is controlled by the `batch_size` option: `batch_size > 1` means batching will be enabled.

+ 1 - 0
changes/v5.0.14/feat-9642.zh.md

@@ -0,0 +1 @@
+废弃了桥梁/资源的`enable_batch`和`enable_queue`选项。在这一改变之后,队列总是被启用,而批处理由`batch_size`选项控制:`batch_size > 1`意味着批处理将被启用。

+ 3 - 0
changes/v5.0.14/fix-9642.en.md

@@ -0,0 +1,3 @@
+Fix some issues that could lead to wrong bridge metrics.
+Fix an issue that could lead to message loss and wrong metrics with Kafka Producer bridge when Kafka or the connection to it is down.
+Fix some issues that could lead to the same message being delivered more than once when using batching for bridges and when the batch was retried.

+ 3 - 0
changes/v5.0.14/fix-9642.zh.md

@@ -0,0 +1,3 @@
+修复一些可能导致错误桥接指标的问题。
+修复当Kafka或其连接中断时,可能导致Kafka Producer桥的消息丢失和错误指标的问题。
+修复一些问题,这些问题可能导致在为桥接使用批处理且批处理被重试时,同一消息被多次传递。

+ 0 - 2
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -58,11 +58,9 @@ values(post) ->
             worker_pool_size => 1,
             health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
             auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
-            enable_batch => true,
             batch_size => ?DEFAULT_BATCH_SIZE,
             batch_time => ?DEFAULT_BATCH_TIME,
             query_mode => async,
-            enable_queue => false,
             max_queue_bytes => ?DEFAULT_QUEUE_SIZE
         }
     };

+ 1 - 2
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -79,8 +79,7 @@ values(common, RedisType, SpecificOpts) ->
         auto_reconnect => true,
         command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
         resource_opts => #{
-            enable_batch => false,
-            batch_size => 100,
+            batch_size => 1,
             batch_time => <<"20ms">>
         },
         ssl => #{enable => false}

+ 0 - 4
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -287,11 +287,9 @@ kafka_bridge_rest_api_helper(Config) ->
     ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
-    ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
-    ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
@@ -314,11 +312,9 @@ kafka_bridge_rest_api_helper(Config) ->
     ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
-    ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
-    ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),

+ 23 - 60
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -38,18 +38,12 @@ groups() ->
         {group, sync_query},
         {group, async_query}
     ],
-    QueueGroups = [
-        {group, queue_enabled},
-        {group, queue_disabled}
-    ],
     ResourceGroups = [{group, gcp_pubsub}],
     [
         {with_batch, SynchronyGroups},
         {without_batch, SynchronyGroups},
-        {sync_query, QueueGroups},
-        {async_query, QueueGroups},
-        {queue_enabled, ResourceGroups},
-        {queue_disabled, ResourceGroups},
+        {sync_query, ResourceGroups},
+        {async_query, ResourceGroups},
         {gcp_pubsub, MatrixTCs}
     ].
 
@@ -99,13 +93,9 @@ init_per_group(sync_query, Config) ->
 init_per_group(async_query, Config) ->
     [{query_mode, async} | Config];
 init_per_group(with_batch, Config) ->
-    [{enable_batch, true} | Config];
+    [{batch_size, 100} | Config];
 init_per_group(without_batch, Config) ->
-    [{enable_batch, false} | Config];
-init_per_group(queue_enabled, Config) ->
-    [{enable_queue, true} | Config];
-init_per_group(queue_disabled, Config) ->
-    [{enable_queue, false} | Config];
+    [{batch_size, 1} | Config];
 init_per_group(_Group, Config) ->
     Config.
 
@@ -118,16 +108,16 @@ end_per_group(_Group, _Config) ->
 init_per_testcase(TestCase, Config0) when
     TestCase =:= t_publish_success_batch
 ->
-    case ?config(enable_batch, Config0) of
-        true ->
+    case ?config(batch_size, Config0) of
+        1 ->
+            {skip, no_batching};
+        _ ->
             {ok, _} = start_echo_http_server(),
             delete_all_bridges(),
             Tid = install_telemetry_handler(TestCase),
             Config = generate_config(Config0),
             put(telemetry_table, Tid),
-            [{telemetry_table, Tid} | Config];
-        false ->
-            {skip, no_batching}
+            [{telemetry_table, Tid} | Config]
     end;
 init_per_testcase(TestCase, Config0) ->
     {ok, _} = start_echo_http_server(),
@@ -271,9 +261,7 @@ certs() ->
     ].
 
 gcp_pubsub_config(Config) ->
-    EnableBatch = proplists:get_value(enable_batch, Config, true),
     QueryMode = proplists:get_value(query_mode, Config, sync),
-    EnableQueue = proplists:get_value(enable_queue, Config, false),
     BatchSize = proplists:get_value(batch_size, Config, 100),
     BatchTime = proplists:get_value(batch_time, Config, <<"20ms">>),
     PayloadTemplate = proplists:get_value(payload_template, Config, ""),
@@ -296,9 +284,7 @@ gcp_pubsub_config(Config) ->
             "  pipelining = ~b\n"
             "  resource_opts = {\n"
             "    worker_pool_size = 1\n"
-            "    enable_batch = ~p\n"
             "    query_mode = ~s\n"
-            "    enable_queue = ~p\n"
             "    batch_size = ~b\n"
             "    batch_time = \"~s\"\n"
             "  }\n"
@@ -309,9 +295,7 @@ gcp_pubsub_config(Config) ->
                 PayloadTemplate,
                 PubSubTopic,
                 PipelineSize,
-                EnableBatch,
                 QueryMode,
-                EnableQueue,
                 BatchSize,
                 BatchTime
             ]
@@ -358,11 +342,9 @@ service_account_json(PrivateKeyPEM) ->
 
 metrics_mapping() ->
     #{
-        batching => fun emqx_resource_metrics:batching_get/1,
         dropped => fun emqx_resource_metrics:dropped_get/1,
         dropped_other => fun emqx_resource_metrics:dropped_other_get/1,
         dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1,
-        dropped_queue_not_enabled => fun emqx_resource_metrics:dropped_queue_not_enabled_get/1,
         dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1,
         dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1,
         failed => fun emqx_resource_metrics:failed_get/1,
@@ -625,7 +607,6 @@ t_publish_success(Config) ->
     ),
     assert_metrics(
         #{
-            batching => 0,
             dropped => 0,
             failed => 0,
             inflight => 0,
@@ -674,7 +655,6 @@ t_publish_success_local_topic(Config) ->
     ),
     assert_metrics(
         #{
-            batching => 0,
             dropped => 0,
             failed => 0,
             inflight => 0,
@@ -761,7 +741,6 @@ t_publish_templated(Config) ->
     ),
     assert_metrics(
         #{
-            batching => 0,
             dropped => 0,
             failed => 0,
             inflight => 0,
@@ -830,11 +809,10 @@ t_publish_success_batch(Config) ->
     wait_until_gauge_is(inflight, 0, _Timeout = 400),
     assert_metrics(
         #{
-            batching => 0,
             dropped => 0,
             failed => 0,
             inflight => 0,
-            matched => NumMessages div BatchSize,
+            matched => NumMessages,
             queuing => 0,
             retried => 0,
             success => NumMessages
@@ -1013,7 +991,6 @@ t_publish_timeout(Config) ->
     do_econnrefused_or_timeout_test(Config, timeout).
 
 do_econnrefused_or_timeout_test(Config, Error) ->
-    EnableQueue = ?config(enable_queue, Config),
     QueryMode = ?config(query_mode, Config),
     ResourceId = ?config(resource_id, Config),
     TelemetryTable = ?config(telemetry_table, Config),
@@ -1089,39 +1066,22 @@ do_econnrefused_or_timeout_test(Config, Error) ->
         end
     ),
 
-    case {Error, QueryMode, EnableQueue} of
-        {_, sync, false} ->
-            wait_telemetry_event(TelemetryTable, dropped_queue_not_enabled, ResourceId, #{
-                timeout => 10_000,
-                n_events => 1
-            }),
-            assert_metrics(
-                #{
-                    batching => 0,
-                    dropped => 1,
-                    dropped_queue_not_enabled => 1,
-                    failed => 0,
-                    inflight => 0,
-                    matched => 1,
-                    queuing => 0,
-                    retried => 0,
-                    success => 0
-                },
-                ResourceId
-            );
+    case {Error, QueryMode} of
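         %% NOTE(review): this explanation was written when the queue
         %% could still be disabled; queuing is now always on and the
         %% case no longer matches on it, so re-check whether it holds.
         %% apparently, async with disabled queue doesn't mark the
         %% message as dropped; and since it never considers the
         %% response expired, this succeeds.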
-        {econnrefused, async, _} ->
+        {econnrefused, async} ->
             wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
                 timeout => 10_000, n_events => 1
             }),
+            %% even after waiting for the telemetry event, the metrics
+            %% may not have settled yet; rather than chase the flakiness,
+            %% simply sleep briefly until they stabilize.
+            ct:sleep(200),
             CurrentMetrics = current_metrics(ResourceId),
             RecordedEvents = ets:tab2list(TelemetryTable),
             ct:pal("telemetry events: ~p", [RecordedEvents]),
             ?assertMatch(
                 #{
-                    batching := 0,
                     dropped := Dropped,
                     failed := 0,
                     inflight := Inflight,
@@ -1132,7 +1092,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 } when Matched >= 1 andalso Inflight + Queueing + Dropped =< 2,
                 CurrentMetrics
             );
-        {timeout, async, _} ->
+        {timeout, async} ->
             wait_telemetry_event(TelemetryTable, success, ResourceId, #{
                 timeout => 10_000, n_events => 2
             }),
@@ -1140,7 +1100,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
             wait_until_gauge_is(queuing, 0, _Timeout = 400),
             assert_metrics(
                 #{
-                    batching => 0,
                     dropped => 0,
                     failed => 0,
                     inflight => 0,
@@ -1151,13 +1110,15 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 },
                 ResourceId
             );
-        {_, sync, true} ->
+        {_, sync} ->
             wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
                 timeout => 10_000, n_events => 2
             }),
+            %% even after waiting for the telemetry events, the metrics
+            %% may not have settled yet; rather than chase the flakiness,
+            %% simply sleep briefly until they stabilize.
+            ct:sleep(200),
             assert_metrics(
                 #{
-                    batching => 0,
                     dropped => 0,
                     failed => 0,
                     inflight => 0,
@@ -1364,9 +1325,11 @@ t_unrecoverable_error(Config) ->
         ResourceId,
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
     ),
+    %% even after waiting for the expected events, the metrics may
+    %% not have settled yet; rather than chase the flakiness, simply
+    %% sleep briefly until they stabilize.
+    ct:sleep(200),
     assert_metrics(
         #{
-            batching => 0,
             dropped => 0,
             failed => 1,
             inflight => 0,

+ 16 - 15
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -204,9 +204,9 @@ init_per_group(sync_query, Config) ->
 init_per_group(async_query, Config) ->
     [{query_mode, async} | Config];
 init_per_group(with_batch, Config) ->
-    [{enable_batch, true} | Config];
+    [{batch_size, 100} | Config];
 init_per_group(without_batch, Config) ->
-    [{enable_batch, false} | Config];
+    [{batch_size, 1} | Config];
 init_per_group(_Group, Config) ->
     Config.
 
@@ -261,7 +261,6 @@ example_write_syntax() ->
         "${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>.
 
 influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
-    EnableBatch = proplists:get_value(enable_batch, Config, true),
     BatchSize = proplists:get_value(batch_size, Config, 100),
     QueryMode = proplists:get_value(query_mode, Config, sync),
     UseTLS = proplists:get_value(use_tls, Config, false),
@@ -278,7 +277,6 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
             "  precision = ns\n"
             "  write_syntax = \"~s\"\n"
             "  resource_opts = {\n"
-            "    enable_batch = ~p\n"
             "    query_mode = ~s\n"
             "    batch_size = ~b\n"
             "  }\n"
@@ -292,7 +290,6 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
                 InfluxDBHost,
                 InfluxDBPort,
                 WriteSyntax,
-                EnableBatch,
                 QueryMode,
                 BatchSize,
                 UseTLS
@@ -300,7 +297,6 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
         ),
     {Name, ConfigString, parse_and_check(ConfigString, Type, Name)};
 influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
-    EnableBatch = proplists:get_value(enable_batch, Config, true),
     BatchSize = proplists:get_value(batch_size, Config, 100),
     QueryMode = proplists:get_value(query_mode, Config, sync),
     UseTLS = proplists:get_value(use_tls, Config, false),
@@ -317,7 +313,6 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
             "  precision = ns\n"
             "  write_syntax = \"~s\"\n"
             "  resource_opts = {\n"
-            "    enable_batch = ~p\n"
             "    query_mode = ~s\n"
             "    batch_size = ~b\n"
             "  }\n"
@@ -331,7 +326,6 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
                 InfluxDBHost,
                 InfluxDBPort,
                 WriteSyntax,
-                EnableBatch,
                 QueryMode,
                 BatchSize,
                 UseTLS
@@ -723,7 +717,7 @@ t_bad_timestamp(Config) ->
     InfluxDBType = ?config(influxdb_type, Config),
     InfluxDBName = ?config(influxdb_name, Config),
     QueryMode = ?config(query_mode, Config),
-    EnableBatch = ?config(enable_batch, Config),
+    BatchSize = ?config(batch_size, Config),
     InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
     InfluxDBTypeCfg =
         case InfluxDBType of
@@ -774,7 +768,8 @@ t_bad_timestamp(Config) ->
         fun(Result, Trace) ->
             ?assertMatch({_, {ok, _}}, Result),
             {Return, {ok, _}} = Result,
-            case {QueryMode, EnableBatch} of
+            IsBatch = BatchSize > 1,
+            case {QueryMode, IsBatch} of
                 {async, true} ->
                     ?assertEqual(ok, Return),
                     ?assertMatch(
@@ -921,12 +916,13 @@ t_write_failure(Config) ->
 
 t_missing_field(Config) ->
     QueryMode = ?config(query_mode, Config),
-    EnableBatch = ?config(enable_batch, Config),
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
     {ok, _} =
         create_bridge(
             Config,
             #{
-                <<"resource_opts">> => #{<<"batch_size">> => 1},
+                <<"resource_opts">> => #{<<"worker_pool_size">> => 1},
                 <<"write_syntax">> => <<"${clientid} foo=${foo}i">>
             }
         ),
@@ -943,9 +939,14 @@ t_missing_field(Config) ->
         begin
             emqx:publish(Msg0),
             emqx:publish(Msg1),
+            NEvents =
+                case IsBatch of
+                    true -> 1;
+                    false -> 2
+                end,
             {ok, _} =
                 snabbkaffe:block_until(
-                    ?match_n_events(2, #{
+                    ?match_n_events(NEvents, #{
                         ?snk_kind := influxdb_connector_send_query_error,
                         mode := QueryMode
                     }),
@@ -956,10 +957,10 @@ t_missing_field(Config) ->
         fun(Trace) ->
             PersistedData0 = query_by_clientid(ClientId0, Config),
             PersistedData1 = query_by_clientid(ClientId1, Config),
-            case EnableBatch of
+            case IsBatch of
                 true ->
                     ?assertMatch(
-                        [#{error := points_trans_failed}, #{error := points_trans_failed} | _],
+                        [#{error := points_trans_failed} | _],
                         ?of_kind(influxdb_connector_send_query_error, Trace)
                     );
                 false ->

+ 17 - 9
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

@@ -78,10 +78,10 @@ init_per_group(tls, Config) ->
         | Config
     ];
 init_per_group(with_batch, Config0) ->
-    Config = [{enable_batch, true} | Config0],
+    Config = [{batch_size, 100} | Config0],
     common_init(Config);
 init_per_group(without_batch, Config0) ->
-    Config = [{enable_batch, false} | Config0],
+    Config = [{batch_size, 1} | Config0],
     common_init(Config);
 init_per_group(_Group, Config) ->
     Config.
@@ -157,7 +157,7 @@ mysql_config(BridgeType, Config) ->
     MysqlPort = integer_to_list(?config(mysql_port, Config)),
     Server = ?config(mysql_host, Config) ++ ":" ++ MysqlPort,
     Name = atom_to_binary(?MODULE),
-    EnableBatch = ?config(enable_batch, Config),
+    BatchSize = ?config(batch_size, Config),
     QueryMode = ?config(query_mode, Config),
     TlsEnabled = ?config(enable_tls, Config),
     ConfigString =
@@ -170,7 +170,7 @@ mysql_config(BridgeType, Config) ->
             "  password = ~p\n"
             "  sql = ~p\n"
             "  resource_opts = {\n"
-            "    enable_batch = ~p\n"
+            "    batch_size = ~b\n"
             "    query_mode = ~s\n"
             "  }\n"
             "  ssl = {\n"
@@ -185,7 +185,7 @@ mysql_config(BridgeType, Config) ->
                 ?MYSQL_USERNAME,
                 ?MYSQL_PASSWORD,
                 ?SQL_BRIDGE,
-                EnableBatch,
+                BatchSize,
                 QueryMode,
                 TlsEnabled
             ]
@@ -440,7 +440,9 @@ t_simple_sql_query(Config) ->
     ),
     Request = {sql, <<"SELECT count(1) AS T">>},
     Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
+    case IsBatch of
         true -> ?assertEqual({error, batch_select_not_implemented}, Result);
         false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
     end,
@@ -452,7 +454,9 @@ t_missing_data(Config) ->
         create_bridge(Config)
     ),
     Result = send_message(Config, #{}),
-    case ?config(enable_batch, Config) of
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
+    case IsBatch of
         true ->
             ?assertMatch(
                 {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
@@ -469,7 +473,9 @@ t_bad_sql_parameter(Config) ->
     ),
     Request = {sql, <<"">>, [bad_parameter]},
     Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
+    case IsBatch of
         true -> ?assertEqual({error, invalid_request}, Result);
         false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
     end,
@@ -482,7 +488,9 @@ t_unprepared_statement_query(Config) ->
     ),
     Request = {prepared_query, unprepared_query, []},
     Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
+    case IsBatch of
         true -> ?assertEqual({error, invalid_request}, Result);
         false -> ?assertEqual({error, prepared_statement_invalid}, Result)
     end,

+ 2 - 8
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl

@@ -451,8 +451,6 @@ toxiproxy_redis_bridge_config() ->
     Conf0 = ?REDIS_TOXYPROXY_CONNECT_CONFIG#{
         <<"resource_opts">> => #{
             <<"query_mode">> => <<"async">>,
-            <<"enable_batch">> => <<"true">>,
-            <<"enable_queue">> => <<"true">>,
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
             <<"health_check_interval">> => <<"1s">>
@@ -465,8 +463,7 @@ invalid_command_bridge_config() ->
     Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS),
     Conf1#{
         <<"resource_opts">> => #{
-            <<"enable_batch">> => <<"false">>,
-            <<"enable_queue">> => <<"false">>,
+            <<"batch_size">> => <<"1">>,
             <<"worker_pool_size">> => <<"1">>
         },
         <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>]
@@ -476,13 +473,10 @@ resource_configs() ->
     #{
         batch_off => #{
             <<"query_mode">> => <<"sync">>,
-            <<"enable_batch">> => <<"false">>,
-            <<"enable_queue">> => <<"false">>
+            <<"batch_size">> => <<"1">>
         },
         batch_on => #{
             <<"query_mode">> => <<"async">>,
-            <<"enable_batch">> => <<"true">>,
-            <<"enable_queue">> => <<"true">>,
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE)
         }

+ 1 - 0
scripts/ct/run.sh

@@ -69,6 +69,7 @@ done
 
 if [ "${WHICH_APP}" = 'novalue' ]; then
     echo "must provide --app arg"
+    help
     exit 1
 fi