|
|
@@ -73,9 +73,8 @@
|
|
|
-type id() :: binary().
|
|
|
-type index() :: pos_integer().
|
|
|
-type expire_at() :: infinity | integer().
|
|
|
--type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()).
|
|
|
+-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
|
|
|
-type request() :: term().
|
|
|
--type from() :: pid() | reply_fun() | request_from().
|
|
|
-type request_from() :: undefined | gen_statem:from().
|
|
|
-type state() :: blocked | running.
|
|
|
-type inflight_key() :: integer().
|
|
|
@@ -913,7 +912,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R
|
|
|
?tp(call_batch_query, #{
|
|
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
|
|
}),
|
|
|
- Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
|
|
|
+ Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
|
|
|
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
|
|
|
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
?tp(call_batch_query_async, #{
|
|
|
@@ -925,7 +924,9 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
|
|
|
begin
|
|
|
ReplyFun = fun ?MODULE:handle_async_batch_reply/8,
|
|
|
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
|
|
|
- Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
|
|
|
+ Requests = lists:map(
|
|
|
+ fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
|
|
|
+ ),
|
|
|
IsRetriable = false,
|
|
|
WorkerMRef = undefined,
|
|
|
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
|