emqx_resource_buffer_worker.erl 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
%% This module implements async message sending, disk message queuing,
%% and message batching using ReplayQ.
-module(emqx_resource_buffer_worker).

-include("emqx_resource.hrl").
-include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-behaviour(gen_statem).

%% Public API
-export([
    start_link/3,
    sync_query/3,
    async_query/3,
    block/1,
    resume/1,
    flush_worker/1
]).

%% Buffer-bypassing API (no batching, no queuing).
-export([
    simple_sync_query/2,
    simple_async_query/3
]).

%% `gen_statem' callbacks
-export([
    callback_mode/0,
    init/1,
    terminate/2,
    code_change/3
]).

%% State functions
-export([running/3, blocked/3]).

%% `replayq' callbacks
-export([queue_item_marshaller/1, estimate_size/1]).

%% Invoked by async workers to deliver results back to this worker.
-export([handle_async_reply/2, handle_async_batch_reply/2]).

-export([clear_disk_queue_dir/2]).

-elvis([{elvis_style, dont_repeat_yourself, disable}]).

%% Max number of request messages drained from the mailbox in one sweep.
-define(COLLECT_REQ_LIMIT, 1000).
%% Envelope for requests sent to a buffer worker's mailbox.
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
%% A buffered query: reply destination, request payload, has-been-sent
%% flag, and absolute expiration deadline.
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
%% A buffer-bypassing query: no reply destination, never sent, never expires.
-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
%% Layout of an inflight-table entry; the *_IDX macros below are the
%% tuple positions used for atomic ets updates.
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
    {Ref, BatchOrQuery, IsRetriable, WorkerMRef}
).
-define(ITEM_IDX, 2).
-define(RETRY_IDX, 3).
-define(WORKER_MREF_IDX, 4).
%% Runs EXPR (an async-reply handler) and, when the inflight table was
%% full right before it ran, triggers a flush once a slot is freed.
-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
    (fun() ->
        IsFullBefore = is_inflight_full(InflightTID),
        case (EXPR) of
            blocked ->
                ok;
            ok ->
                ok = maybe_flush_after_async_reply(IsFullBefore)
        end
    end)()
).

-type id() :: binary().
-type index() :: pos_integer().
%% Absolute expiration time, or `infinity' for non-expiring queries.
-type expire_at() :: infinity | integer().
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term().
-type request_from() :: undefined | gen_statem:from().
-type state() :: blocked | running.
-type inflight_key() :: integer().
%% `gen_statem' data for one buffer worker.
-type data() :: #{
    id := id(),
    index := index(),
    inflight_tid := ets:tid(),
    async_workers := #{pid() => reference()},
    batch_size := pos_integer(),
    batch_time := timer:time(),
    queue := replayq:q(),
    resume_interval := timer:time(),
    tref := undefined | timer:tref()
}.
%% One callback function per state (`running/3' / `blocked/3'), with
%% state-enter calls enabled.
callback_mode() -> [state_functions, state_enter].
%% @doc Start a buffer worker for resource `Id' registered at pool slot `Index'.
start_link(Id, Index, Opts) ->
    gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
  92. -spec sync_query(id(), request(), query_opts()) -> Result :: term().
  93. sync_query(Id, Request, Opts0) ->
  94. ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}),
  95. Opts1 = ensure_timeout_query_opts(Opts0, sync),
  96. Opts = ensure_expire_at(Opts1),
  97. PickKey = maps:get(pick_key, Opts, self()),
  98. Timeout = maps:get(timeout, Opts),
  99. emqx_resource_metrics:matched_inc(Id),
  100. pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
  101. -spec async_query(id(), request(), query_opts()) -> Result :: term().
  102. async_query(Id, Request, Opts0) ->
  103. ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}),
  104. Opts1 = ensure_timeout_query_opts(Opts0, async),
  105. Opts = ensure_expire_at(Opts1),
  106. PickKey = maps:get(pick_key, Opts, self()),
  107. emqx_resource_metrics:matched_inc(Id),
  108. pick_cast(Id, PickKey, {query, Request, Opts}).
%% simple query the resource without batching and queuing.
-spec simple_sync_query(id(), request()) -> term().
simple_sync_query(Id, Request) ->
    %% Note: since calling this function implies in bypassing the
    %% buffer workers, and each buffer worker index is used when
    %% collecting gauge metrics, we use this dummy index. If this
    %% call ends up calling buffering functions, that's a bug and
    %% would mess up the metrics anyway. `undefined' is ignored by
    %% `emqx_resource_metrics:*_shift/3'.
    ?tp(simple_sync_query, #{id => Id, request => Request}),
    Index = undefined,
    QueryOpts = simple_query_opts(),
    emqx_resource_metrics:matched_inc(Id),
    Ref = make_request_ref(),
    Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
    %% Result is inspected only for metrics bookkeeping; it is
    %% returned to the caller untouched.
    _ = handle_query_result(Id, Result, _HasBeenSent = false),
    Result.
%% simple async-query the resource without batching and queuing.
-spec simple_async_query(id(), request(), query_opts()) -> term().
simple_async_query(Id, Request, QueryOpts0) ->
    ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
    %% `undefined' index: this call bypasses the buffer workers (see the
    %% note in `simple_sync_query/2').
    Index = undefined,
    %% Caller options override the simple-query defaults.
    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
    emqx_resource_metrics:matched_inc(Id),
    Ref = make_request_ref(),
    Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
    %% Result is inspected only for metrics bookkeeping; it is
    %% returned to the caller untouched.
    _ = handle_query_result(Id, Result, _HasBeenSent = false),
    Result.
  137. simple_query_opts() ->
  138. ensure_expire_at(#{simple_query => true, timeout => infinity}).
  139. -spec block(pid()) -> ok.
  140. block(ServerRef) ->
  141. gen_statem:cast(ServerRef, block).
  142. -spec resume(pid()) -> ok.
  143. resume(ServerRef) ->
  144. gen_statem:cast(ServerRef, resume).
  145. -spec flush_worker(pid()) -> ok.
  146. flush_worker(ServerRef) ->
  147. gen_statem:cast(ServerRef, flush).
-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
init({Id, Index, Opts}) ->
    %% Trap exits so `terminate/2' runs on shutdown and can close the
    %% queue and reset the gauges.
    process_flag(trap_exit, true),
    true = gproc_pool:connect_worker(Id, {Id, Index}),
    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
    SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
    TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
    %% A single segment must not exceed the queue's total byte budget.
    SegBytes = min(SegBytes0, TotalBytes),
    QueueOpts =
        #{
            dir => disk_queue_dir(Id, Index),
            marshaller => fun ?MODULE:queue_item_marshaller/1,
            max_total_bytes => TotalBytes,
            %% we don't want to retain the queue after
            %% resource restarts.
            offload => {true, volatile},
            seg_bytes => SegBytes,
            sizer => fun ?MODULE:estimate_size/1
        },
    Queue = replayq:open(QueueOpts),
    %% Initialize gauges for this worker slot.
    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
    emqx_resource_metrics:inflight_set(Id, Index, 0),
    InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
    InflightTID = inflight_new(InflightWinSize, Id, Index),
    HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
    Data = #{
        id => Id,
        index => Index,
        inflight_tid => InflightTID,
        async_workers => #{},
        batch_size => BatchSize,
        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
        queue => Queue,
        %% retry cadence while blocked; defaults to the health-check interval
        resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
        tref => undefined
    },
    ?tp(buffer_worker_init, #{id => Id, index => Index}),
    {ok, running, Data}.
%% `running' state: requests are collected, batched and flushed eagerly.
running(enter, _, #{tref := _Tref} = Data) ->
    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
    %% According to `gen_statem' laws, we mustn't call `maybe_flush'
    %% directly because it may decide to return `{next_state, blocked, _}',
    %% and that's an invalid response for a state enter call.
    %% Returning a next event from a state enter call is also
    %% prohibited.
    {keep_state, ensure_flush_timer(Data, 0)};
running(cast, resume, _St) ->
    %% already running; nothing to do
    keep_state_and_data;
running(cast, flush, Data) ->
    flush(Data);
running(cast, block, St) ->
    {next_state, blocked, St};
running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
    handle_query_requests(Request0, Data);
running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
    %% flush timer fired and matches the currently armed timer
    flush(St#{tref := undefined});
running(info, {flush, _Ref}, _St) ->
    %% stale timer message (a newer timer has been armed since)
    ?tp(discarded_stale_flush, #{}),
    keep_state_and_data;
running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
    is_map_key(Pid, AsyncWorkers0)
->
    ?SLOG(info, #{msg => async_worker_died, state => running, reason => Reason}),
    handle_async_worker_down(Data0, Pid);
running(info, Info, _St) ->
    ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}),
    keep_state_and_data.
%% `blocked' state: the resource is down/overloaded; requests are only
%% enqueued, and retries are driven by the `state_timeout'.
blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
    ?tp(buffer_worker_enter_blocked, #{}),
    %% discard the old timer, new timer will be started when entering running state again
    St = cancel_flush_timer(St0),
    {keep_state, St, {state_timeout, ResumeT, unblock}};
blocked(cast, block, _St) ->
    %% already blocked
    keep_state_and_data;
blocked(cast, resume, St) ->
    resume_from_blocked(St);
blocked(cast, flush, St) ->
    %% a flush request while blocked doubles as a resume attempt
    resume_from_blocked(St);
blocked(state_timeout, unblock, St) ->
    resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
    %% enqueue only; no flushing while blocked
    Data = collect_and_enqueue_query_requests(Request0, Data0),
    {keep_state, Data};
blocked(info, {flush, _Ref}, _Data) ->
    %% ignore stale timer
    keep_state_and_data;
blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
    is_map_key(Pid, AsyncWorkers0)
->
    ?SLOG(info, #{msg => async_worker_died, state => blocked, reason => Reason}),
    handle_async_worker_down(Data0, Pid);
blocked(info, Info, _Data) ->
    ?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}),
    keep_state_and_data.
%% Close the queue, zero this worker's gauges, and leave the pool.
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
    _ = replayq:close(Q),
    emqx_resource_metrics:inflight_set(Id, Index, 0),
    %% since we want volatile queues, this will be 0 after
    %% termination.
    emqx_resource_metrics:queuing_set(Id, Index, 0),
    gproc_pool:disconnect_worker(Id, {Id, Index}),
    ok.
  250. code_change(_OldVsn, State, _Extra) ->
  251. {ok, State}.
%%==============================================================================

%% Picks a pool worker for KEY and evaluates EXPR with PID bound to it;
%% pool-lookup failures are mapped to `?RESOURCE_ERROR' terms.
-define(PICK(ID, KEY, PID, EXPR),
    try
        case gproc_pool:pick_worker(ID, KEY) of
            PID when is_pid(PID) ->
                EXPR;
            _ ->
                ?RESOURCE_ERROR(worker_not_created, "resource not created")
        end
    catch
        error:badarg ->
            ?RESOURCE_ERROR(worker_not_created, "resource not created");
        error:timeout ->
            ?RESOURCE_ERROR(timeout, "call resource timeout")
    end
).
%% Synchronous request to a pool worker. A monitor alias (with
%% `reply_demonitor') is used as the reply channel so a late reply after
%% a timeout or worker death cannot leak into the caller's mailbox.
pick_call(Id, Key, Query, Timeout) ->
    ?PICK(Id, Key, Pid, begin
        Caller = self(),
        MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
        From = {Caller, MRef},
        ReplyTo = {fun gen_statem:reply/2, [From]},
        erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
        receive
            {MRef, Response} ->
                erlang:demonitor(MRef, [flush]),
                Response;
            {'DOWN', MRef, process, Pid, Reason} ->
                error({worker_down, Reason})
        after Timeout ->
            erlang:demonitor(MRef, [flush]),
            %% drain a reply that may have raced with the timeout
            receive
                {MRef, Response} ->
                    Response
            after 0 ->
                error(timeout)
            end
        end
    end).
  291. pick_cast(Id, Key, Query) ->
  292. ?PICK(Id, Key, Pid, begin
  293. ReplyTo = undefined,
  294. erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
  295. ok
  296. end).
%% Drains retriable/expired entries from the inflight table; transitions
%% back to `running' only once nothing is left to retry and the inflight
%% table has capacity again.
resume_from_blocked(Data) ->
    ?tp(buffer_worker_resume_from_blocked_enter, #{}),
    #{
        id := Id,
        index := Index,
        inflight_tid := InflightTID
    } = Data,
    Now = now_(),
    case inflight_get_first_retriable(InflightTID, Now) of
        none ->
            case is_inflight_full(InflightTID) of
                true ->
                    %% still no capacity; wait for replies/timeouts
                    {keep_state, Data};
                false ->
                    {next_state, running, Data}
            end;
        {expired, Ref, Batch} ->
            %% whole entry expired: ack it away and count the drops
            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
            IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
            ?tp(buffer_worker_retry_expired, #{expired => Batch}),
            resume_from_blocked(Data);
        {single, Ref, Query} ->
            %% We retry msgs in inflight window sync, as if we send them
            %% async, they will be appended to the end of inflight window again.
            retry_inflight_sync(Ref, Query, Data);
        {batch, Ref, NotExpired, []} ->
            retry_inflight_sync(Ref, NotExpired, Data);
        {batch, Ref, NotExpired, Expired} ->
            %% partially expired batch: shrink the stored entry first
            NumExpired = length(Expired),
            ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
            emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
            ?tp(buffer_worker_retry_expired, #{expired => Expired}),
            %% We retry msgs in inflight window sync, as if we send them
            %% async, they will be appended to the end of inflight window again.
            retry_inflight_sync(Ref, NotExpired, Data)
    end.
%% Retries an inflight query or batch synchronously while blocked: on
%% nack, stays blocked and re-arms the state timeout; on ack, keeps
%% draining the inflight table via `resume_from_blocked/1'.
retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
    #{
        id := Id,
        inflight_tid := InflightTID,
        index := Index,
        resume_interval := ResumeT
    } = Data0,
    ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
    QueryOpts = #{simple_query => false},
    Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
    ReplyResult =
        case QueryOrBatch of
            ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
                Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
                reply_caller_defer_metrics(Id, Reply, QueryOpts);
            [?QUERY(_, _, _, _) | _] = Batch ->
                batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
        end,
    case ReplyResult of
        %% Send failed because resource is down
        {nack, PostFn} ->
            PostFn(),
            ?tp(
                buffer_worker_retry_inflight_failed,
                #{
                    ref => Ref,
                    query_or_batch => QueryOrBatch
                }
            ),
            {keep_state, Data0, {state_timeout, ResumeT, unblock}};
        %% Send ok or failed but the resource is working
        {ack, PostFn} ->
            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
            %% we need to defer bumping the counters after
            %% `inflight_drop' to avoid the race condition when an
            %% inflight request might get completed concurrently with
            %% the retry, bumping them twice. Since both inflight
            %% requests (repeated and original) have the safe `Ref',
            %% we bump the counter when removing it from the table.
            IsAcked andalso PostFn(),
            ?tp(
                buffer_worker_retry_inflight_succeeded,
                #{
                    ref => Ref,
                    query_or_batch => QueryOrBatch
                }
            ),
            resume_from_blocked(Data0)
    end.
  382. %% Called during the `running' state only.
  383. -spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
  384. gen_statem:event_handler_result(state(), data()).
  385. handle_query_requests(Request0, Data0) ->
  386. Data = collect_and_enqueue_query_requests(Request0, Data0),
  387. maybe_flush(Data).
%% Drains up to ?COLLECT_REQ_LIMIT pending `?SEND_REQ' messages from the
%% mailbox, wraps them as queue queries and appends them to the replayq,
%% replying `{error, buffer_overflow}' for any entries that did not fit.
collect_and_enqueue_query_requests(Request0, Data0) ->
    #{
        id := Id,
        index := Index,
        queue := Q
    } = Data0,
    Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
    Queries =
        lists:map(
            fun
                (?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) ->
                    %% async request: the reply fun (if any) comes from
                    %% the query options
                    ReplyFun = maps:get(async_reply_fun, Opts, undefined),
                    HasBeenSent = false,
                    ExpireAt = maps:get(expire_at, Opts),
                    ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
                (?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
                    %% sync request: reply to the gen_statem caller
                    HasBeenSent = false,
                    ExpireAt = maps:get(expire_at, Opts),
                    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
            end,
            Requests
        ),
    {Overflown, NewQ} = append_queue(Id, Index, Q, Queries),
    ok = reply_overflown(Overflown),
    Data0#{queue := NewQ}.
  413. reply_overflown([]) ->
  414. ok;
  415. reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
  416. do_reply_caller(ReplyTo, {error, buffer_overflow}),
  417. reply_overflown(More).
  418. do_reply_caller(undefined, _Result) ->
  419. ok;
  420. do_reply_caller({F, Args}, {async_return, Result}) ->
  421. %% this is an early return to async caller, the retry
  422. %% decision has to be made by the caller
  423. do_reply_caller({F, Args}, Result);
  424. do_reply_caller({F, Args}, Result) when is_function(F) ->
  425. _ = erlang:apply(F, Args ++ [Result]),
  426. ok.
  427. maybe_flush(Data0) ->
  428. #{
  429. batch_size := BatchSize,
  430. queue := Q
  431. } = Data0,
  432. QueueCount = queue_count(Q),
  433. case QueueCount >= BatchSize of
  434. true ->
  435. flush(Data0);
  436. false ->
  437. {keep_state, ensure_flush_timer(Data0)}
  438. end.
%% Called during the `running' state only.
%% Pops a batch from the queue, drops already-expired queries, and hands
%% the remainder to `do_flush/2'. Recurses when an entire popped batch
%% was expired.
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
flush(Data0) ->
    #{
        id := Id,
        index := Index,
        batch_size := BatchSize,
        inflight_tid := InflightTID,
        queue := Q0
    } = Data0,
    Data1 = cancel_flush_timer(Data0),
    CurrentCount = queue_count(Q0),
    IsFull = is_inflight_full(InflightTID),
    ?tp(buffer_worker_flush, #{
        queued => CurrentCount,
        is_inflight_full => IsFull,
        inflight => inflight_count(InflightTID)
    }),
    case {CurrentCount, IsFull} of
        {0, _} ->
            %% nothing queued; stay idle
            ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}),
            {keep_state, Data1};
        {_, true} ->
            %% no inflight capacity; re-arm the timer and retry later
            ?tp(buffer_worker_flush_but_inflight_full, #{}),
            Data2 = ensure_flush_timer(Data1),
            {keep_state, Data2};
        {_, false} ->
            ?tp(buffer_worker_flush_before_pop, #{}),
            {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
            Data2 = Data1#{queue := Q1},
            ?tp(buffer_worker_flush_before_sieve_expired, #{}),
            Now = now_(),
            %% if the request has expired, the caller is no longer
            %% waiting for a response.
            case sieve_expired_requests(Batch, Now) of
                {[], _AllExpired} ->
                    %% everything popped was expired: ack, count, try again
                    ok = replayq:ack(Q1, QAckRef),
                    emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
                    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
                    ?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
                    flush(Data2);
                {NotExpired, Expired} ->
                    NumExpired = length(Expired),
                    emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
                    IsBatch = (BatchSize > 1),
                    %% We *must* use the new queue, because we currently can't
                    %% `nack' a `pop'.
                    %% Maybe we could re-open the queue?
                    ?tp(
                        buffer_worker_flush_potentially_partial,
                        #{expired => Expired, not_expired => NotExpired}
                    ),
                    Ref = make_request_ref(),
                    do_flush(Data2, #{
                        is_batch => IsBatch,
                        batch => NotExpired,
                        ref => Ref,
                        ack_ref => QAckRef
                    })
            end
    end.
%% Sends one batch (or a single unwrapped query when not batching) to
%% the resource, replies to the callers, and either acks the popped
%% queue entries or records them in the inflight table for retry,
%% transitioning to `blocked' on nack.
-spec do_flush(data(), #{
    is_batch := boolean(),
    batch := [queue_query()],
    ack_ref := replayq:ack_ref(),
    ref := inflight_key()
}) ->
    gen_statem:event_handler_result(state(), data()).
do_flush(
    #{queue := Q1} = Data0,
    #{
        is_batch := false,
        batch := Batch,
        ref := Ref,
        ack_ref := QAckRef
    }
) ->
    #{
        id := Id,
        index := Index,
        inflight_tid := InflightTID
    } = Data0,
    %% unwrap when not batching (i.e., batch size == 1)
    [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
    QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
    Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
    Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
    case reply_caller(Id, Reply, QueryOpts) of
        %% Failed; remove the request from the queue, as we cannot pop
        %% from it again, but we'll retry it using the inflight table.
        nack ->
            ok = replayq:ack(Q1, QAckRef),
            %% we set it atomically just below; a limitation of having
            %% to use tuples for atomic ets updates
            IsRetriable = true,
            WorkerMRef0 = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0),
            %% we must append again to the table to ensure that the
            %% request will be retried (i.e., it might not have been
            %% inserted during `call_query' if the resource was down
            %% and/or if it was a sync request).
            inflight_append(InflightTID, InflightItem, Id, Index),
            mark_inflight_as_retriable(InflightTID, Ref),
            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
            ?tp(
                buffer_worker_flush_nack,
                #{
                    ref => Ref,
                    is_retriable => IsRetriable,
                    batch_or_query => Request,
                    result => Result
                }
            ),
            {next_state, blocked, Data1};
        %% Success; just ack.
        ack ->
            ok = replayq:ack(Q1, QAckRef),
            %% Async requests are acked later when the async worker
            %% calls the corresponding callback function. Also, we
            %% must ensure the async worker is being monitored for
            %% such requests.
            IsUnrecoverableError = is_unrecoverable_error(Result),
            case is_async_return(Result) of
                true when IsUnrecoverableError ->
                    ack_inflight(InflightTID, Ref, Id, Index);
                true ->
                    ok;
                false ->
                    ack_inflight(InflightTID, Ref, Id, Index)
            end,
            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
            ?tp(
                buffer_worker_flush_ack,
                #{
                    batch_or_query => Request,
                    result => Result
                }
            ),
            %% if more requests are queued, schedule another flush pass
            CurrentCount = queue_count(Q1),
            case CurrentCount > 0 of
                true ->
                    ?tp(buffer_worker_flush_ack_reflush, #{
                        batch_or_query => Request, result => Result, queue_count => CurrentCount
                    }),
                    flush_worker(self());
                false ->
                    ?tp(buffer_worker_queue_drained, #{
                        inflight => inflight_count(InflightTID)
                    }),
                    ok
            end,
            {keep_state, Data1}
    end;
do_flush(#{queue := Q1} = Data0, #{
    is_batch := true,
    batch := Batch,
    ref := Ref,
    ack_ref := QAckRef
}) ->
    #{
        id := Id,
        index := Index,
        batch_size := BatchSize,
        inflight_tid := InflightTID
    } = Data0,
    QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
    Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
    case batch_reply_caller(Id, Result, Batch, QueryOpts) of
        %% Failed; remove the request from the queue, as we cannot pop
        %% from it again, but we'll retry it using the inflight table.
        nack ->
            ok = replayq:ack(Q1, QAckRef),
            %% we set it atomically just below; a limitation of having
            %% to use tuples for atomic ets updates
            IsRetriable = true,
            WorkerMRef0 = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0),
            %% we must append again to the table to ensure that the
            %% request will be retried (i.e., it might not have been
            %% inserted during `call_query' if the resource was down
            %% and/or if it was a sync request).
            inflight_append(InflightTID, InflightItem, Id, Index),
            mark_inflight_as_retriable(InflightTID, Ref),
            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
            ?tp(
                buffer_worker_flush_nack,
                #{
                    ref => Ref,
                    is_retriable => IsRetriable,
                    batch_or_query => Batch,
                    result => Result
                }
            ),
            {next_state, blocked, Data1};
        %% Success; just ack.
        ack ->
            ok = replayq:ack(Q1, QAckRef),
            %% Async requests are acked later when the async worker
            %% calls the corresponding callback function. Also, we
            %% must ensure the async worker is being monitored for
            %% such requests.
            IsUnrecoverableError = is_unrecoverable_error(Result),
            case is_async_return(Result) of
                true when IsUnrecoverableError ->
                    ack_inflight(InflightTID, Ref, Id, Index);
                true ->
                    ok;
                false ->
                    ack_inflight(InflightTID, Ref, Id, Index)
            end,
            {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
            store_async_worker_reference(InflightTID, Ref, WorkerMRef),
            emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
            CurrentCount = queue_count(Q1),
            ?tp(
                buffer_worker_flush_ack,
                #{
                    batch_or_query => Batch,
                    result => Result,
                    queue_count => CurrentCount
                }
            ),
            %% drained: idle; full batch ready: flush again; otherwise
            %% wait for the batch timer
            Data2 =
                case {CurrentCount > 0, CurrentCount >= BatchSize} of
                    {false, _} ->
                        ?tp(buffer_worker_queue_drained, #{
                            inflight => inflight_count(InflightTID)
                        }),
                        Data1;
                    {true, true} ->
                        ?tp(buffer_worker_flush_ack_reflush, #{
                            batch_or_query => Batch,
                            result => Result,
                            queue_count => CurrentCount,
                            batch_size => BatchSize
                        }),
                        flush_worker(self()),
                        Data1;
                    {true, false} ->
                        ensure_flush_timer(Data1)
                end,
            {keep_state, Data2}
    end.
  688. batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
  689. {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
  690. PostFn(),
  691. ShouldBlock.
%% Expand the single result of `Mod:on_batch_query/3' into one reply per
%% query in the batch, process each reply, and collect the deferred
%% metric bookkeeping into a single fun.
%% Returns {ShouldAck, PostFn} where ShouldAck is `ack' | `nack' (same
%% verdict for every reply in the batch) and PostFn runs the deferred
%% side effects in the original reply order.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
    %% the `Mod:on_batch_query/3` returns a single result for a batch,
    %% so we need to expand
    Replies = lists:map(
        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
            ?REPLY(FROM, SENT, BatchResult)
        end,
        Batch
    ),
    {ShouldAck, PostFns} =
        lists:foldl(
            fun(Reply, {_ShouldAck, PostFns}) ->
                %% _ShouldAck should be the same as ShouldAck starting from the second reply
                {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
                {ShouldAck, [PostFn | PostFns]}
            end,
            {ack, []},
            Replies
        ),
    %% PostFns were accumulated in reverse; restore order before running.
    PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
    {ShouldAck, PostFn}.
  713. reply_caller(Id, Reply, QueryOpts) ->
  714. {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
  715. PostFn(),
  716. ShouldAck.
%% Should only reply to the caller when the decision is final (not
%% retriable). See comment on `handle_query_result_pure'.
%%
%% Returns {ShouldAck, PostFn}. A reply is sent here only for:
%%   - `ack' on an unrecoverable `{async_return, _}' error (final failure),
%%   - any result of a simple (unbuffered) query, and
%%   - any other `ack' result.
%% `nack' results will be retried, and recoverable async returns are
%% replied to later by the async callback, so neither is replied here.
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
    %% no caller to reply to; just compute the verdict and metrics
    handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
    IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
    IsUnrecoverableError = is_unrecoverable_error(Result),
    {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
    case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
        {ack, {async_return, _}, true, _} ->
            ok = do_reply_caller(ReplyTo, Result);
        {ack, {async_return, _}, false, _} ->
            ok;
        {_, _, _, true} ->
            ok = do_reply_caller(ReplyTo, Result);
        {nack, _, _, _} ->
            ok;
        {ack, _, _, _} ->
            ok = do_reply_caller(ReplyTo, Result)
    end,
    {ShouldAck, PostFn}.
  738. handle_query_result(Id, Result, HasBeenSent) ->
  739. {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
  740. PostFn(),
  741. ShouldBlock.
%% We should always retry (nack), except when:
%% * resource is not found
%% * resource is stopped
%% * the result is a success (or at least a delayed result)
%% We also retry even sync requests. In that case, we shouldn't reply
%% the caller until one of those final results above happen.
%%
%% Returns {ack | nack, PostFn}. "Pure" means no side effects happen
%% until PostFn (logging + metric updates) is called by the caller.
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{msg => resource_exception, info => Msg}),
        ok
    end,
    {nack, PostFn};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
    NotWorking == not_connected; NotWorking == blocked
->
    %% transient resource states: retry silently
    {nack, fun() -> ok end};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
        emqx_resource_metrics:dropped_resource_not_found_inc(Id),
        ok
    end,
    {ack, PostFn};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
        emqx_resource_metrics:dropped_resource_stopped_inc(Id),
        ok
    end,
    {ack, PostFn};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
    PostFn = fun() ->
        ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
        ok
    end,
    {nack, PostFn};
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
    case is_unrecoverable_error(Error) of
        true ->
            PostFn =
                fun() ->
                    ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
                    inc_sent_failed(Id, HasBeenSent),
                    ok
                end,
            {ack, PostFn};
        false ->
            PostFn =
                fun() ->
                    ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
                    ok
                end,
            {nack, PostFn}
    end;
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
    handle_query_async_result_pure(Id, Result, HasBeenSent);
handle_query_result_pure(Id, Result, HasBeenSent) ->
    %% anything else must be a success result; assert_ok_result crashes
    %% otherwise.
    PostFn = fun() ->
        assert_ok_result(Result),
        inc_sent_success(Id, HasBeenSent),
        ok
    end,
    {ack, PostFn}.
%% Classify the inner result of an `{async_return, _}' wrapper.
%% Same {ack | nack, PostFn} contract as `handle_query_result_pure/3'.
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
    case is_unrecoverable_error(Error) of
        true ->
            PostFn =
                fun() ->
                    ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
                    inc_sent_failed(Id, HasBeenSent),
                    ok
                end,
            {ack, PostFn};
        false ->
            PostFn = fun() ->
                ?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}),
                ok
            end,
            {nack, PostFn}
    end;
%% `{ok, Pid}' means the request was handed to an async worker process.
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
    {ack, fun() -> ok end};
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
    {ack, fun() -> ok end}.
  826. handle_async_worker_down(Data0, Pid) ->
  827. #{async_workers := AsyncWorkers0} = Data0,
  828. {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
  829. Data = Data0#{async_workers := AsyncWorkers},
  830. mark_inflight_items_as_retriable(Data, WorkerMRef),
  831. {keep_state, Data}.
%% Look up the resource state and dispatch the query to the connector.
%% A query mode of `configured' means "use the mode stored in the
%% resource state". Returns the callback result or a ?RESOURCE_ERROR.
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
    ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}),
    case emqx_resource_manager:ets_lookup(Id) of
        {ok, _Group, #{status := stopped}} ->
            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
        {ok, _Group, Resource} ->
            QM =
                case QM0 =:= configured of
                    true -> maps:get(query_mode, Resource);
                    false -> QM0
                end,
            do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
        {error, not_found} ->
            ?RESOURCE_ERROR(not_found, "resource not found")
    end.
%% Dispatch to the connector callback. A connector that does its own
%% buffering is called regardless of connection status; otherwise the
%% resource must be `connected', else we nack with `not_connected'.
do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) ->
    %% The connector supports buffer, send even in disconnected state
    #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
    CallMode = call_mode(QM, CBM),
    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
    %% when calling from the buffer worker or other simple queries,
    %% only apply the query fun when it's at connected status
    #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
    CallMode = call_mode(QM, CBM),
    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
    ?RESOURCE_ERROR(not_connected, "resource not connected").
%% Wrap a connector callback invocation, converting any exception into a
%% ?RESOURCE_ERROR(exception, ...) term instead of crashing the worker.
%% NOTE: the macro body references `Id', which is captured from the
%% caller's scope (all call sites are inside `apply_query_fun').
-define(APPLY_RESOURCE(NAME, EXPR, REQ),
    try
        %% if the callback module (connector) wants to return an error that
        %% makes the current resource goes into the `blocked` state, it should
        %% return `{error, {recoverable_error, Reason}}`
        EXPR
    catch
        ERR:REASON:STACKTRACE ->
            ?RESOURCE_ERROR(exception, #{
                name => NAME,
                id => Id,
                request => REQ,
                error => {ERR, REASON},
                stacktrace => STACKTRACE
            })
    end
).
%% Invoke the connector callback for a single query or a batch, in sync
%% or async mode. For async calls the request is first appended to the
%% inflight table (non-retriable until proven otherwise) and the
%% connector is handed a reply callback + context; the raw result is
%% wrapped in `{async_return, _}'.
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
    ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
    ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
    ?tp(call_query_async, #{
        id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
    }),
    InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
    ?APPLY_RESOURCE(
        call_query_async,
        begin
            ReplyFun = fun ?MODULE:handle_async_reply/2,
            ReplyContext = #{
                buffer_worker => self(),
                resource_id => Id,
                worker_index => Index,
                inflight_tid => InflightTID,
                request_ref => Ref,
                query_opts => QueryOpts,
                %% strip the request payload; the reply handler doesn't need it
                min_query => minimize(Query)
            },
            IsRetriable = false,
            WorkerMRef = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
            %% must be in the table before the connector may call back
            ok = inflight_append(InflightTID, InflightItem, Id, Index),
            Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt),
            {async_return, Result}
        end,
        Request
    );
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
    ?tp(call_batch_query, #{
        id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
    }),
    Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
    ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
    ?tp(call_batch_query_async, #{
        id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
    }),
    InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
    ?APPLY_RESOURCE(
        call_batch_query_async,
        begin
            ReplyFun = fun ?MODULE:handle_async_batch_reply/2,
            ReplyContext = #{
                buffer_worker => self(),
                resource_id => Id,
                worker_index => Index,
                inflight_tid => InflightTID,
                request_ref => Ref,
                query_opts => QueryOpts,
                min_batch => minimize(Batch)
            },
            Requests = lists:map(
                fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
            ),
            IsRetriable = false,
            WorkerMRef = undefined,
            InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
            ok = inflight_append(InflightTID, InflightItem, Id, Index),
            Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt),
            {async_return, Result}
        end,
        Batch
    ).
%% Entry point invoked by the connector when an async single-query
%% request completes. Discards replies for requests no longer in the
%% inflight table (duplicate callbacks), otherwise processes the reply
%% and possibly triggers a flush.
handle_async_reply(
    #{
        request_ref := Ref,
        inflight_tid := InflightTID,
        query_opts := Opts
    } = ReplyContext,
    Result
) ->
    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
        discard ->
            ok;
        continue ->
            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
    end.
%% Second stage of async single-query reply handling: drop the request
%% if its deadline already passed (counting it as a late reply),
%% otherwise hand over to `do_handle_async_reply/2'.
handle_async_reply1(
    #{
        request_ref := Ref,
        inflight_tid := InflightTID,
        resource_id := Id,
        worker_index := Index,
        min_query := ?QUERY(_, _, _, ExpireAt) = _Query
    } = ReplyContext,
    Result
) ->
    ?tp(
        handle_async_reply_enter,
        #{batch_or_query => [_Query], ref => Ref, result => Result}
    ),
    Now = now_(),
    case is_expired(ExpireAt, Now) of
        true ->
            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
            %% only count the late reply if the entry was still present
            IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
            ?tp(handle_async_reply_expired, #{expired => [_Query]}),
            ok;
        false ->
            do_handle_async_reply(ReplyContext, Result)
    end.
%% Final stage of async single-query reply handling: reply to the caller
%% (when appropriate) and either ack the inflight entry or mark it
%% retriable and block the buffer worker.
do_handle_async_reply(
    #{
        query_opts := QueryOpts,
        resource_id := Id,
        request_ref := Ref,
        worker_index := Index,
        buffer_worker := Pid,
        inflight_tid := InflightTID,
        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
    },
    Result
) ->
    %% NOTE: 'inflight' is the count of messages that were sent async
    %% but received no ACK, NOT the number of messages queued in the
    %% inflight window.
    {Action, PostFn} = reply_caller_defer_metrics(
        Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
    ),
    ?tp(handle_async_reply, #{
        action => Action,
        batch_or_query => [_Query],
        ref => Ref,
        result => Result
    }),
    case Action of
        nack ->
            %% Keep retrying.
            ok = mark_inflight_as_retriable(InflightTID, Ref),
            ok = ?MODULE:block(Pid),
            blocked;
        ack ->
            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
    end.
%% Entry point invoked by the connector when an async batch request
%% completes. Mirrors `handle_async_reply/2' for batches: discard
%% duplicate callbacks, otherwise process and possibly flush.
handle_async_batch_reply(
    #{
        inflight_tid := InflightTID,
        request_ref := Ref,
        query_opts := Opts
    } = ReplyContext,
    Result
) ->
    case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of
        discard ->
            ok;
        continue ->
            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
    end.
%% Second stage of async batch reply handling: when nothing in the batch
%% has expired, take the fast path; otherwise fall back to looking up
%% the full inflight entry to reconcile expired items.
handle_async_batch_reply1(
    #{
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch
    } = ReplyContext,
    Result
) ->
    ?tp(
        handle_async_reply_enter,
        #{batch_or_query => Batch, ref => Ref, result => Result}
    ),
    Now = now_(),
    case sieve_expired_requests(Batch, Now) of
        {_NotExpired, []} ->
            %% this is the critical code path,
            %% we try not to do ets:lookup in this case
            %% because the batch can be quite big
            do_handle_async_batch_reply(ReplyContext, Result);
        {_NotExpired, _Expired} ->
            %% at least one is expired
            %% the batch from reply context is minimized, so it cannot be used
            %% to update the inflight items, hence discard Batch and lookup the RealBatch
            ?tp(handle_async_reply_expired, #{expired => _Expired}),
            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
    end.
%% Reconcile a partially-expired batch against the full inflight entry:
%% count expired items as late replies, keep the not-yet-expired ones in
%% the inflight table (or ack the whole entry when all expired).
handle_async_batch_reply2([], _, _, _) ->
    %% this usually should never happen unless the async callback is being evaluated concurrently
    ok;
handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
    ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight,
    #{
        resource_id := Id,
        worker_index := Index,
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch
    } = ReplyContext,
    %% All batch items share the same HasBeenSent flag
    %% So we just take the original flag from the ReplyContext batch
    %% and put it back to the batch found in inflight table
    %% which must have already been set to `false`
    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
    RealNotExpired =
        lists:map(
            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
            end,
            RealNotExpired0
        ),
    NumExpired = length(RealExpired),
    emqx_resource_metrics:late_reply_inc(Id, NumExpired),
    case RealNotExpired of
        [] ->
            %% all expired, no need to update back the inflight batch
            _ = ack_inflight(InflightTID, Ref, Id, Index),
            ok;
        _ ->
            %% some queries are not expired, put them back to the inflight batch
            %% so it can be either acked now or retried later
            ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
    end.
%% Final stage of async batch reply handling: reply to callers (when
%% appropriate) and either ack the inflight entry or mark it retriable
%% and block the buffer worker. Mirrors `do_handle_async_reply/2'.
do_handle_async_batch_reply(
    #{
        buffer_worker := Pid,
        resource_id := Id,
        worker_index := Index,
        inflight_tid := InflightTID,
        request_ref := Ref,
        min_batch := Batch,
        query_opts := QueryOpts
    },
    Result
) ->
    {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
    ?tp(handle_async_reply, #{
        action => Action,
        batch_or_query => Batch,
        ref => Ref,
        result => Result
    }),
    case Action of
        nack ->
            %% Keep retrying.
            ok = mark_inflight_as_retriable(InflightTID, Ref),
            ok = ?MODULE:block(Pid),
            blocked;
        ack ->
            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
    end.
  1120. do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
  1121. IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
  1122. case maps:get(simple_query, QueryOpts, false) of
  1123. true ->
  1124. PostFn();
  1125. false when IsKnownRef ->
  1126. PostFn();
  1127. false ->
  1128. ok
  1129. end,
  1130. ok.
%% Decide whether an async reply should trigger an immediate flush: only
%% needed when the inflight table was full before the reply freed a slot.
maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
    %% inflight was not full before async reply is handled,
    %% after it is handled, the inflight table must be even smaller
    %% hence we can rely on the buffer worker's flush timer to trigger
    %% the next flush
    ?tp(skip_flushing_worker, #{}),
    ok;
maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
    %% the inflight table was full before handling async reply
    ?tp(do_flushing_worker, #{}),
    ok = ?MODULE:flush_worker(self()).
%% check if the async reply is valid.
%% e.g. if a connector evaluates the callback more than once:
%% 1. If the request was previously deleted from inflight table due to
%% either succeeded previously or expired, this function logs a
%% warning message and returns 'discard' instruction.
%% 2. If the request was previously failed and now pending on a retry,
%% then this function will return 'continue' as there is no way to
%% tell if this reply is stale or not.
maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) ->
    %% simple queries have no inflight table; always process the reply
    continue;
maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) ->
    try ets:member(InflightTID, Ref) of
        true ->
            continue;
        false ->
            ?tp(
                warning,
                unknown_async_reply_discarded,
                #{inflight_key => Ref}
            ),
            discard
    catch
        error:badarg ->
            %% table is gone, e.g. the worker is shutting down
            discard
    end.
  1168. %%==============================================================================
  1169. %% operations for queue
  1170. queue_item_marshaller(Bin) when is_binary(Bin) ->
  1171. binary_to_term(Bin);
  1172. queue_item_marshaller(Item) ->
  1173. term_to_binary(Item).
  1174. estimate_size(QItem) ->
  1175. erlang:external_size(QItem).
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) ->
    {[queue_query()], replayq:q()}.
%% Append queries to the replayq queue; when the queue's byte limit is
%% exceeded, pop-and-drop the oldest items and count them as dropped.
%% Returns {OverflownItems, NewQueue}.
append_queue(Id, Index, Q, Queries) ->
    %% this assertion is to ensure that we never append a raw binary
    %% because the marshaller will get lost.
    false = is_binary(hd(Queries)),
    Q0 = replayq:append(Q, Queries),
    {Overflown, Q2} =
        case replayq:overflow(Q0) of
            OverflownBytes when OverflownBytes =< 0 ->
                {[], Q0};
            OverflownBytes ->
                %% pop at least OverflownBytes worth of the oldest items
                PopOpts = #{bytes_limit => OverflownBytes, count_limit => 999999999},
                {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
                ok = replayq:ack(Q1, QAckRef),
                Dropped = length(Items2),
                emqx_resource_metrics:dropped_queue_full_inc(Id, Dropped),
                ?SLOG(info, #{
                    msg => buffer_worker_overflow,
                    resource_id => Id,
                    worker_index => Index,
                    dropped => Dropped
                }),
                {Items2, Q1}
        end,
    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
    ?tp(
        buffer_worker_appended_to_queue,
        #{
            id => Id,
            items => Queries,
            queue_count => queue_count(Q2),
            overflown => length(Overflown)
        }
    ),
    {Overflown, Q2}.
  1212. %%==============================================================================
  1213. %% the inflight queue for async query
  1214. -define(MAX_SIZE_REF, max_size).
  1215. -define(SIZE_REF, size).
  1216. -define(INITIAL_TIME_REF, initial_time).
  1217. -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
  1218. %% NOTE
  1219. %% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝
  1220. -define(INFLIGHT_META_ROWS, 4).
%% Create the inflight ETS table for async requests and seed it with its
%% metadata rows (window size, message counter, creation timestamps).
%% `ordered_set' so requests iterate in creation (key) order.
inflight_new(InfltWinSZ, Id, Index) ->
    TableId = ets:new(
        emqx_resource_buffer_worker_inflight_tab,
        [ordered_set, public, {write_concurrency, true}]
    ),
    inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
    %% we use this counter because we might deal with batches as
    %% elements.
    inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
    inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
    inflight_append(
        TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index
    ),
    TableId.
-spec inflight_get_first_retriable(ets:tid(), integer()) ->
    none
    | {expired, inflight_key(), [queue_query()]}
    | {single, inflight_key(), queue_query()}
    | {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
%% Find the first inflight entry flagged retriable and classify it by
%% expiry: fully expired, a single live query, or a batch split into
%% live/expired parts. `Now' is a monotonic-ns timestamp.
inflight_get_first_retriable(InflightTID, Now) ->
    MatchSpec =
        ets:fun2ms(
            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
                IsRetriable =:= true
            ->
                {Ref, BatchOrQuery}
            end
        ),
    %% limit 1: we only ever retry the oldest retriable entry at a time
    case ets:select(InflightTID, MatchSpec, _Limit = 1) of
        '$end_of_table' ->
            none;
        {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
            case is_expired(ExpireAt, Now) of
                true ->
                    {expired, Ref, [Query]};
                false ->
                    {single, Ref, Query}
            end;
        {[{Ref, Batch = [_ | _]}], _Continuation} ->
            case sieve_expired_requests(Batch, Now) of
                {[], _AllExpired} ->
                    {expired, Ref, Batch};
                {NotExpired, Expired} ->
                    {batch, Ref, NotExpired, Expired}
            end
    end.
  1267. is_inflight_full(undefined) ->
  1268. false;
  1269. is_inflight_full(InflightTID) ->
  1270. [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
  1271. %% we consider number of batches rather than number of messages
  1272. %% because one batch request may hold several messages.
  1273. Size = inflight_count(InflightTID),
  1274. Size >= MaxSize.
%% Number of inflight entries (batches or single queries), excluding the
%% fixed metadata rows. Returns 0 when the table no longer exists.
inflight_count(InflightTID) ->
    case ets:info(InflightTID, size) of
        undefined -> 0;
        Size -> max(0, Size - ?INFLIGHT_META_ROWS)
    end.
%% Number of individual inflight messages (a batch entry counts as many
%% messages as it carries); read from the ?SIZE_REF metadata counter.
inflight_num_msgs(InflightTID) ->
    [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
    Size.
%% Insert an inflight entry (batch, single query, or metadata row).
%% Queries are flagged as sent on insertion; the message counter and
%% inflight gauge are only bumped for genuinely new entries
%% (`insert_new' makes re-insertion on retry idempotent).
inflight_append(undefined, _InflightItem, _Id, _Index) ->
    %% no inflight table (e.g. simple queries): nothing to track
    ok;
inflight_append(
    InflightTID,
    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
    Id,
    Index
) ->
    Batch = mark_as_sent(Batch0),
    InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
    IsNew = ets:insert_new(InflightTID, InflightItem),
    BatchSize = length(Batch),
    IsNew andalso inc_inflight(InflightTID, BatchSize),
    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
    ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
    ok;
inflight_append(
    InflightTID,
    ?INFLIGHT_ITEM(
        Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
    ),
    Id,
    Index
) ->
    Query = mark_as_sent(Query0),
    InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
    IsNew = ets:insert_new(InflightTID, InflightItem),
    IsNew andalso inc_inflight(InflightTID, 1),
    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
    ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
    ok;
inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
    ets:insert(InflightTID, {Ref, Data}),
    %% this is a metadata row being inserted; therefore, we don't bump
    %% the inflight metric.
    ok.
%% a request was already appended and originally not retriable, but an
%% error occurred and it is now retriable.
mark_inflight_as_retriable(undefined, _Ref) ->
    ok;
mark_inflight_as_retriable(InflightTID, Ref) ->
    _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
    %% the old worker's DOWN should not affect this inflight any more
    _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}),
    ok.
%% Track each worker pid only once.
%% Returns {Data, WorkerMRef}; WorkerMRef is `undefined' unless the
%% result was `{async_return, {ok, Pid}}' carrying an async worker pid.
ensure_async_worker_monitored(
    Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result
) when
    is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers)
->
    %% already monitored: reuse the stored monitor reference
    WorkerMRef = maps:get(WorkerPid, AsyncWorkers),
    {Data0, WorkerMRef};
ensure_async_worker_monitored(
    Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}}
) when
    is_pid(WorkerPid)
->
    WorkerMRef = monitor(process, WorkerPid),
    AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef},
    Data = Data0#{async_workers := AsyncWorkers},
    {Data, WorkerMRef};
ensure_async_worker_monitored(Data0, _Result) ->
    {Data0, undefined}.
  1347. store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) ->
  1348. ok;
  1349. store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
  1350. ok;
  1351. store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
  1352. is_reference(WorkerMRef)
  1353. ->
  1354. _ = ets:update_element(
  1355. InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef}
  1356. ),
  1357. ok.
%% Remove an inflight entry and decrement the message counter by the
%% number of messages it carried. Returns whether the ref was still
%% present; `false' means it was already acked (duplicate callback) or
%% never tracked, in which case no metrics are touched.
ack_inflight(undefined, _Ref, _Id, _Index) ->
    false;
ack_inflight(InflightTID, Ref, Id, Index) ->
    Count =
        case ets:take(InflightTID, Ref) of
            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
                1;
            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
                length(Batch);
            [] ->
                0
        end,
    ok = dec_inflight(InflightTID, Count),
    IsKnownRef = (Count > 0),
    case IsKnownRef of
        true ->
            emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID));
        false ->
            ok
    end,
    IsKnownRef.
%% After an async worker dies, atomically flag every inflight entry
%% owned by that worker (matching its monitor ref) as retriable via a
%% single `select_replace' pass.
mark_inflight_items_as_retriable(Data, WorkerMRef) ->
    #{inflight_tid := InflightTID} = Data,
    IsRetriable = true,
    MatchSpec =
        ets:fun2ms(
            fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when
                WorkerMRef =:= WorkerMRef0
            ->
                ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0)
            end
        ),
    _NumAffected = ets:select_replace(InflightTID, MatchSpec),
    ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
    ok.
%% used to update a batch after dropping expired individual queries.
%% The dropped queries are subtracted from the inflight message counter.
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
    _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
    ok = dec_inflight(InflightTID, NumExpired).
%% Bump the inflight message counter (the ?SIZE_REF metadata row).
inc_inflight(InflightTID, Count) ->
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
    ok.
%% Decrement the inflight message counter, clamped at zero via the
%% `{Pos, Incr, Threshold, SetValue}' form of `ets:update_counter'.
dec_inflight(_InflightTID, 0) ->
    ok;
dec_inflight(InflightTID, Count) when Count > 0 ->
    _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
    ok.
  1405. %%==============================================================================
  1406. inc_sent_failed(Id, _HasBeenSent = true) ->
  1407. emqx_resource_metrics:retried_failed_inc(Id);
  1408. inc_sent_failed(Id, _HasBeenSent) ->
  1409. emqx_resource_metrics:failed_inc(Id).
  1410. inc_sent_success(Id, _HasBeenSent = true) ->
  1411. emqx_resource_metrics:retried_success_inc(Id);
  1412. inc_sent_success(Id, _HasBeenSent) ->
  1413. emqx_resource_metrics:success_inc(Id).
  1414. call_mode(sync, _) -> sync;
  1415. call_mode(async, always_sync) -> sync;
  1416. call_mode(async, async_if_possible) -> async.
  1417. assert_ok_result(ok) ->
  1418. true;
  1419. assert_ok_result({async_return, R}) ->
  1420. assert_ok_result(R);
  1421. assert_ok_result(R) when is_tuple(R) ->
  1422. try
  1423. ok = erlang:element(1, R)
  1424. catch
  1425. error:{badmatch, _} ->
  1426. error({not_ok_result, R})
  1427. end;
  1428. assert_ok_result(R) ->
  1429. error({not_ok_result, R}).
%% Number of items currently stored in the replayq queue.
queue_count(Q) ->
    replayq:count(Q).
%% Per-worker replayq directory: <data_dir>/bufs/<node>/<Id>:<Index>,
%% sanitized for the local file system.
disk_queue_dir(Id, Index) ->
    QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
    QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
    emqx_misc:safe_filename(QDir).
  1436. clear_disk_queue_dir(Id, Index) ->
  1437. ReplayQDir = disk_queue_dir(Id, Index),
  1438. case file:del_dir_r(ReplayQDir) of
  1439. {error, enoent} ->
  1440. ok;
  1441. Res ->
  1442. Res
  1443. end.
  1444. ensure_flush_timer(Data = #{batch_time := T}) ->
  1445. ensure_flush_timer(Data, T).
  1446. ensure_flush_timer(Data = #{tref := undefined}, T) ->
  1447. Ref = make_ref(),
  1448. TRef = erlang:send_after(T, self(), {flush, Ref}),
  1449. Data#{tref => {TRef, Ref}};
  1450. ensure_flush_timer(Data, _T) ->
  1451. Data.
  1452. cancel_flush_timer(St = #{tref := undefined}) ->
  1453. St;
  1454. cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
  1455. _ = erlang:cancel_timer(TRef),
  1456. St#{tref => undefined}.
-spec make_request_ref() -> inflight_key().
%% Inflight keys are monotonic-ns timestamps, so the `ordered_set'
%% inflight table iterates entries in creation order.
make_request_ref() ->
    now_().
  1460. collect_requests(Acc, Limit) ->
  1461. Count = length(Acc),
  1462. do_collect_requests(Acc, Count, Limit).
%% Pull ?SEND_REQ messages out of the mailbox without blocking
%% (`after 0'), stopping at `Limit'; returns requests in arrival order.
do_collect_requests(Acc, Count, Limit) when Count >= Limit ->
    lists:reverse(Acc);
do_collect_requests(Acc, Count, Limit) ->
    receive
        ?SEND_REQ(_ReplyTo, _Req) = Request ->
            do_collect_requests([Request | Acc], Count + 1, Limit)
    after 0 ->
        lists:reverse(Acc)
    end.
%% Flag a query (or every query in a batch) as having been attempted at
%% least once; used to select retry metrics over first-attempt metrics.
mark_as_sent(Batch) when is_list(Batch) ->
    lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
    HasBeenSent = true,
    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).
  1477. is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
  1478. true;
  1479. is_unrecoverable_error({error, {recoverable_error, _}}) ->
  1480. false;
  1481. is_unrecoverable_error({async_return, Result}) ->
  1482. is_unrecoverable_error(Result);
  1483. is_unrecoverable_error({error, _}) ->
  1484. %% TODO: delete this clause.
  1485. %% Ideally all errors except for 'unrecoverable_error' should be
  1486. %% retried, including DB schema errors.
  1487. true;
  1488. is_unrecoverable_error(_) ->
  1489. false.
  1490. is_async_return({async_return, _}) ->
  1491. true;
  1492. is_async_return(_) ->
  1493. false.
  1494. sieve_expired_requests(Batch, Now) ->
  1495. lists:partition(
  1496. fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
  1497. not is_expired(ExpireAt, Now)
  1498. end,
  1499. Batch
  1500. ).
  1501. -spec is_expired(infinity | integer(), integer()) -> boolean().
  1502. is_expired(infinity = _ExpireAt, _Now) ->
  1503. false;
  1504. is_expired(ExpireAt, Now) ->
  1505. Now > ExpireAt.
%% Monotonic clock in nanoseconds: immune to wall-clock adjustments, so
%% it is safe for expiry deadlines and ordered inflight keys.
now_() ->
erlang:monotonic_time(nanosecond).
  1508. -spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts().
  1509. ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
  1510. Opts;
  1511. ensure_timeout_query_opts(#{} = Opts0, sync) ->
  1512. Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT};
  1513. ensure_timeout_query_opts(#{} = Opts0, async) ->
  1514. Opts0#{timeout => infinity}.
  1515. -spec ensure_expire_at(query_opts()) -> query_opts().
  1516. ensure_expire_at(#{expire_at := _} = Opts) ->
  1517. Opts;
  1518. ensure_expire_at(#{timeout := infinity} = Opts) ->
  1519. Opts#{expire_at => infinity};
  1520. ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
  1521. TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
  1522. ExpireAt = now_() + TimeoutNS,
  1523. Opts#{expire_at => ExpireAt}.
  1524. %% no need to keep the request for async reply handler
  1525. minimize(?QUERY(_, _, _, _) = Q) ->
  1526. do_minimize(Q);
  1527. minimize(L) when is_list(L) ->
  1528. lists:map(fun do_minimize/1, L).
-ifdef(TEST).
%% Test builds keep the full request so assertions can inspect it.
do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
-else.
%% Production builds replace the request payload with [] to save memory.
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
-endif.