|
|
@@ -38,7 +38,8 @@
|
|
|
]).
|
|
|
|
|
|
-export([
|
|
|
- simple_sync_query/2
|
|
|
+ simple_sync_query/2,
|
|
|
+ simple_async_query/2
|
|
|
]).
|
|
|
|
|
|
-export([
|
|
|
@@ -60,21 +61,24 @@
|
|
|
|
|
|
-define(COLLECT_REQ_LIMIT, 1000).
|
|
|
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
|
|
|
--define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}).
|
|
|
+-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
|
|
|
+-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
|
|
|
-define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
|
|
|
-define(EXPAND(RESULT, BATCH), [
|
|
|
?REPLY(FROM, REQUEST, SENT, RESULT)
|
|
|
- || ?QUERY(FROM, REQUEST, SENT) <- BATCH
|
|
|
+ || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH
|
|
|
]).
|
|
|
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
|
|
|
{Ref, BatchOrQuery, IsRetriable, WorkerMRef}
|
|
|
).
|
|
|
+-define(ITEM_IDX, 2).
|
|
|
-define(RETRY_IDX, 3).
|
|
|
-define(WORKER_MREF_IDX, 4).
|
|
|
|
|
|
-type id() :: binary().
|
|
|
-type index() :: pos_integer().
|
|
|
--type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()).
|
|
|
+-type expire_at() :: infinity | integer().
|
|
|
+-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()).
|
|
|
-type request() :: term().
|
|
|
-type from() :: pid() | reply_fun() | request_from().
|
|
|
-type request_from() :: undefined | gen_statem:from().
|
|
|
@@ -98,20 +102,24 @@ start_link(Id, Index, Opts) ->
|
|
|
gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
|
|
|
|
|
|
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
|
|
|
-sync_query(Id, Request, Opts) ->
|
|
|
+sync_query(Id, Request, Opts0) ->
|
|
|
+ Opts1 = ensure_timeout_query_opts(Opts0, sync),
|
|
|
+ Opts = ensure_expire_at(Opts1),
|
|
|
PickKey = maps:get(pick_key, Opts, self()),
|
|
|
- Timeout = maps:get(timeout, Opts, timer:seconds(15)),
|
|
|
+ Timeout = maps:get(timeout, Opts),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
|
|
|
|
|
|
-spec async_query(id(), request(), query_opts()) -> Result :: term().
|
|
|
-async_query(Id, Request, Opts) ->
|
|
|
+async_query(Id, Request, Opts0) ->
|
|
|
+ Opts1 = ensure_timeout_query_opts(Opts0, async),
|
|
|
+ Opts = ensure_expire_at(Opts1),
|
|
|
PickKey = maps:get(pick_key, Opts, self()),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
pick_cast(Id, PickKey, {query, Request, Opts}).
|
|
|
|
|
|
-%% simple query the resource without batching and queuing messages.
|
|
|
--spec simple_sync_query(id(), request()) -> Result :: term().
|
|
|
+%% simple query the resource without batching and queuing.
|
|
|
+-spec simple_sync_query(id(), request()) -> term().
|
|
|
simple_sync_query(Id, Request) ->
|
|
|
%% Note: since calling this function implies in bypassing the
|
|
|
%% buffer workers, and each buffer worker index is used when
|
|
|
@@ -120,14 +128,27 @@ simple_sync_query(Id, Request) ->
|
|
|
%% would mess up the metrics anyway. `undefined' is ignored by
|
|
|
%% `emqx_resource_metrics:*_shift/3'.
|
|
|
Index = undefined,
|
|
|
- QueryOpts = #{simple_query => true},
|
|
|
+ QueryOpts = simple_query_opts(),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
Ref = make_message_ref(),
|
|
|
- Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
|
|
|
- HasBeenSent = false,
|
|
|
- _ = handle_query_result(Id, Result, HasBeenSent),
|
|
|
+ Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
|
|
|
+ _ = handle_query_result(Id, Result, _HasBeenSent = false),
|
|
|
Result.
|
|
|
|
|
|
+%% simple async-query the resource without batching and queuing.
|
|
|
+-spec simple_async_query(id(), request()) -> term().
|
|
|
+simple_async_query(Id, Request) ->
|
|
|
+ Index = undefined,
|
|
|
+ QueryOpts = simple_query_opts(),
|
|
|
+ emqx_resource_metrics:matched_inc(Id),
|
|
|
+ Ref = make_message_ref(),
|
|
|
+ Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
|
|
|
+ _ = handle_query_result(Id, Result, _HasBeenSent = false),
|
|
|
+ Result.
|
|
|
+
|
|
|
+simple_query_opts() ->
|
|
|
+ ensure_expire_at(#{simple_query => true, timeout => infinity}).
|
|
|
+
|
|
|
-spec block(pid()) -> ok.
|
|
|
block(ServerRef) ->
|
|
|
gen_statem:cast(ServerRef, block).
|
|
|
@@ -179,9 +200,14 @@ init({Id, Index, Opts}) ->
|
|
|
?tp(buffer_worker_init, #{id => Id, index => Index}),
|
|
|
{ok, running, Data}.
|
|
|
|
|
|
-running(enter, _, St) ->
|
|
|
+running(enter, _, Data) ->
|
|
|
?tp(buffer_worker_enter_running, #{}),
|
|
|
- maybe_flush(St);
|
|
|
+ %% According to `gen_statem' laws, we mustn't call `maybe_flush'
|
|
|
+ %% directly because it may decide to return `{next_state, blocked, _}',
|
|
|
+ %% and that's an invalid response for a state enter call.
|
|
|
+ %% Returning a next event from a state enter call is also
|
|
|
+ %% prohibited.
|
|
|
+ {keep_state, ensure_flush_timer(Data, 0)};
|
|
|
running(cast, resume, _St) ->
|
|
|
keep_state_and_data;
|
|
|
running(cast, flush, Data) ->
|
|
|
@@ -243,9 +269,9 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
%%==============================================================================
|
|
|
--define(PICK(ID, KEY, EXPR),
|
|
|
+-define(PICK(ID, KEY, PID, EXPR),
|
|
|
try gproc_pool:pick_worker(ID, KEY) of
|
|
|
- Pid when is_pid(Pid) ->
|
|
|
+ PID when is_pid(PID) ->
|
|
|
EXPR;
|
|
|
_ ->
|
|
|
?RESOURCE_ERROR(worker_not_created, "resource not created")
|
|
|
@@ -258,7 +284,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
).
|
|
|
|
|
|
pick_call(Id, Key, Query, Timeout) ->
|
|
|
- ?PICK(Id, Key, begin
|
|
|
+ ?PICK(Id, Key, Pid, begin
|
|
|
Caller = self(),
|
|
|
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
|
|
|
From = {Caller, MRef},
|
|
|
@@ -281,15 +307,21 @@ pick_call(Id, Key, Query, Timeout) ->
|
|
|
end).
|
|
|
|
|
|
pick_cast(Id, Key, Query) ->
|
|
|
- ?PICK(Id, Key, begin
|
|
|
+ ?PICK(Id, Key, Pid, begin
|
|
|
From = undefined,
|
|
|
erlang:send(Pid, ?SEND_REQ(From, Query)),
|
|
|
ok
|
|
|
end).
|
|
|
|
|
|
resume_from_blocked(Data) ->
|
|
|
- #{inflight_tid := InflightTID} = Data,
|
|
|
- case inflight_get_first_retriable(InflightTID) of
|
|
|
+ ?tp(buffer_worker_resume_from_blocked_enter, #{}),
|
|
|
+ #{
|
|
|
+ id := Id,
|
|
|
+ index := Index,
|
|
|
+ inflight_tid := InflightTID
|
|
|
+ } = Data,
|
|
|
+ Now = now_(),
|
|
|
+ case inflight_get_first_retriable(InflightTID, Now) of
|
|
|
none ->
|
|
|
case is_inflight_full(InflightTID) of
|
|
|
true ->
|
|
|
@@ -297,14 +329,32 @@ resume_from_blocked(Data) ->
|
|
|
false ->
|
|
|
{next_state, running, Data}
|
|
|
end;
|
|
|
- {Ref, FirstQuery} ->
|
|
|
+ {expired, Ref, Batch} ->
|
|
|
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
|
|
+ IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
|
|
|
+ ?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
|
|
+ resume_from_blocked(Data);
|
|
|
+ {single, Ref, Query} ->
|
|
|
+ %% We retry msgs in inflight window sync, as if we send them
|
|
|
+ %% async, they will be appended to the end of inflight window again.
|
|
|
+ case is_inflight_full(InflightTID) of
|
|
|
+ true ->
|
|
|
+ {keep_state, Data};
|
|
|
+ false ->
|
|
|
+ retry_inflight_sync(Ref, Query, Data)
|
|
|
+ end;
|
|
|
+ {batch, Ref, NotExpired, Expired} ->
|
|
|
+ update_inflight_item(InflightTID, Ref, NotExpired),
|
|
|
+ NumExpired = length(Expired),
|
|
|
+ emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
|
|
|
+ NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
|
|
|
%% We retry msgs in inflight window sync, as if we send them
|
|
|
%% async, they will be appended to the end of inflight window again.
|
|
|
case is_inflight_full(InflightTID) of
|
|
|
true ->
|
|
|
{keep_state, Data};
|
|
|
false ->
|
|
|
- retry_inflight_sync(Ref, FirstQuery, Data)
|
|
|
+ retry_inflight_sync(Ref, NotExpired, Data)
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
@@ -320,10 +370,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
|
|
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
|
|
|
ReplyResult =
|
|
|
case QueryOrBatch of
|
|
|
- ?QUERY(From, CoreReq, HasBeenSent) ->
|
|
|
+ ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) ->
|
|
|
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
|
|
|
reply_caller_defer_metrics(Id, Reply, QueryOpts);
|
|
|
- [?QUERY(_, _, _) | _] = Batch ->
|
|
|
+ [?QUERY(_, _, _, _) | _] = Batch ->
|
|
|
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
|
|
|
end,
|
|
|
case ReplyResult of
|
|
|
@@ -378,10 +428,12 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
|
|
|
(?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
|
|
|
ReplyFun = maps:get(async_reply_fun, Opts, undefined),
|
|
|
HasBeenSent = false,
|
|
|
- ?QUERY(ReplyFun, Req, HasBeenSent);
|
|
|
- (?SEND_REQ(From, {query, Req, _Opts})) ->
|
|
|
+ ExpireAt = maps:get(expire_at, Opts),
|
|
|
+ ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
|
|
|
+ (?SEND_REQ(From, {query, Req, Opts})) ->
|
|
|
HasBeenSent = false,
|
|
|
- ?QUERY(From, Req, HasBeenSent)
|
|
|
+ ExpireAt = maps:get(expire_at, Opts),
|
|
|
+ ?QUERY(From, Req, HasBeenSent, ExpireAt)
|
|
|
end,
|
|
|
Requests
|
|
|
),
|
|
|
@@ -406,6 +458,8 @@ maybe_flush(Data0) ->
|
|
|
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
|
|
|
flush(Data0) ->
|
|
|
#{
|
|
|
+ id := Id,
|
|
|
+ index := Index,
|
|
|
batch_size := BatchSize,
|
|
|
inflight_tid := InflightTID,
|
|
|
queue := Q0
|
|
|
@@ -419,25 +473,45 @@ flush(Data0) ->
|
|
|
Data2 = ensure_flush_timer(Data1),
|
|
|
{keep_state, Data2};
|
|
|
{_, false} ->
|
|
|
+ ?tp(buffer_worker_flush_before_pop, #{}),
|
|
|
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
|
|
|
- IsBatch = BatchSize =/= 1,
|
|
|
- %% We *must* use the new queue, because we currently can't
|
|
|
- %% `nack' a `pop'.
|
|
|
- %% Maybe we could re-open the queue?
|
|
|
Data2 = Data1#{queue := Q1},
|
|
|
- Ref = make_message_ref(),
|
|
|
- do_flush(Data2, #{
|
|
|
- new_queue => Q1,
|
|
|
- is_batch => IsBatch,
|
|
|
- batch => Batch,
|
|
|
- ref => Ref,
|
|
|
- ack_ref => QAckRef
|
|
|
- })
|
|
|
+ ?tp(buffer_worker_flush_before_sieve_expired, #{}),
|
|
|
+ Now = now_(),
|
|
|
+ %% if the request has expired, the caller is no longer
|
|
|
+ %% waiting for a response.
|
|
|
+ case sieve_expired_requests(Batch, Now) of
|
|
|
+ all_expired ->
|
|
|
+ ok = replayq:ack(Q1, QAckRef),
|
|
|
+ emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
|
|
|
+ emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
|
|
+ ?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
|
|
|
+ flush(Data2);
|
|
|
+ {NotExpired, Expired} ->
|
|
|
+ NumExpired = length(Expired),
|
|
|
+ emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
|
|
|
+ IsBatch = BatchSize =/= 1,
|
|
|
+ %% We *must* use the new queue, because we currently can't
|
|
|
+ %% `nack' a `pop'.
|
|
|
+ %% Maybe we could re-open the queue?
|
|
|
+ ?tp(
|
|
|
+ buffer_worker_flush_potentially_partial,
|
|
|
+ #{expired => Expired, not_expired => NotExpired}
|
|
|
+ ),
|
|
|
+ Ref = make_message_ref(),
|
|
|
+ do_flush(Data2, #{
|
|
|
+ new_queue => Q1,
|
|
|
+ is_batch => IsBatch,
|
|
|
+ batch => NotExpired,
|
|
|
+ ref => Ref,
|
|
|
+ ack_ref => QAckRef
|
|
|
+ })
|
|
|
+ end
|
|
|
end.
|
|
|
|
|
|
-spec do_flush(data(), #{
|
|
|
is_batch := boolean(),
|
|
|
- batch := [?QUERY(from(), request(), boolean())],
|
|
|
+ batch := [queue_query()],
|
|
|
ack_ref := replayq:ack_ref(),
|
|
|
ref := inflight_key(),
|
|
|
new_queue := replayq:q()
|
|
|
@@ -459,7 +533,7 @@ do_flush(
|
|
|
inflight_tid := InflightTID
|
|
|
} = Data0,
|
|
|
%% unwrap when not batching (i.e., batch size == 1)
|
|
|
- [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
|
|
|
+ [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch,
|
|
|
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
|
|
|
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
|
|
|
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
|
|
|
@@ -520,10 +594,11 @@ do_flush(
|
|
|
),
|
|
|
case queue_count(Q1) > 0 of
|
|
|
true ->
|
|
|
- {keep_state, Data1, [{next_event, internal, flush}]};
|
|
|
+ flush_worker(self());
|
|
|
false ->
|
|
|
- {keep_state, Data1}
|
|
|
- end
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ {keep_state, Data1}
|
|
|
end;
|
|
|
do_flush(Data0, #{
|
|
|
is_batch := true,
|
|
|
@@ -596,15 +671,17 @@ do_flush(Data0, #{
|
|
|
}
|
|
|
),
|
|
|
CurrentCount = queue_count(Q1),
|
|
|
- case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
|
|
- {false, _} ->
|
|
|
- {keep_state, Data1};
|
|
|
- {true, true} ->
|
|
|
- {keep_state, Data1, [{next_event, internal, flush}]};
|
|
|
- {true, false} ->
|
|
|
- Data2 = ensure_flush_timer(Data1),
|
|
|
- {keep_state, Data2}
|
|
|
- end
|
|
|
+ Data2 =
|
|
|
+ case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
|
|
+ {false, _} ->
|
|
|
+ Data1;
|
|
|
+ {true, true} ->
|
|
|
+ flush_worker(self()),
|
|
|
+ Data1;
|
|
|
+ {true, false} ->
|
|
|
+ ensure_flush_timer(Data1)
|
|
|
+ end,
|
|
|
+ {keep_state, Data2}
|
|
|
end.
|
|
|
|
|
|
batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
|
|
|
@@ -720,45 +797,26 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
|
|
|
ok
|
|
|
end,
|
|
|
{nack, PostFn};
|
|
|
-handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) ->
|
|
|
- PostFn = fun() ->
|
|
|
- ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
|
|
|
- inc_sent_failed(Id, HasBeenSent),
|
|
|
- ok
|
|
|
- end,
|
|
|
- {ack, PostFn};
|
|
|
-handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ->
|
|
|
- %% the message will be queued in replayq or inflight window,
|
|
|
- %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
|
|
|
- %% sent this message.
|
|
|
- PostFn = fun() ->
|
|
|
- ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
|
|
|
- ok
|
|
|
- end,
|
|
|
- {nack, PostFn};
|
|
|
-handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) ->
|
|
|
- PostFn = fun() ->
|
|
|
- ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
|
|
|
- ok
|
|
|
- end,
|
|
|
- {nack, PostFn};
|
|
|
-handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) ->
|
|
|
- PostFn = fun() ->
|
|
|
- ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
|
|
|
- inc_sent_failed(Id, HasBeenSent),
|
|
|
- ok
|
|
|
- end,
|
|
|
- {ack, PostFn};
|
|
|
-handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
|
|
|
- PostFn = fun() ->
|
|
|
- ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
|
|
|
- ok
|
|
|
- end,
|
|
|
- {nack, PostFn};
|
|
|
-handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
|
|
|
- {ack, fun() -> ok end};
|
|
|
-handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
|
|
|
- {ack, fun() -> ok end};
|
|
|
+handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
|
|
|
+ case is_unrecoverable_error(Error) of
|
|
|
+ true ->
|
|
|
+ PostFn =
|
|
|
+ fun() ->
|
|
|
+ ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
|
|
|
+ inc_sent_failed(Id, HasBeenSent),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ {ack, PostFn};
|
|
|
+ false ->
|
|
|
+ PostFn =
|
|
|
+ fun() ->
|
|
|
+ ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ {nack, PostFn}
|
|
|
+ end;
|
|
|
+handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
|
|
|
+ handle_query_async_result_pure(Id, Result, HasBeenSent);
|
|
|
handle_query_result_pure(Id, Result, HasBeenSent) ->
|
|
|
PostFn = fun() ->
|
|
|
assert_ok_result(Result),
|
|
|
@@ -767,6 +825,28 @@ handle_query_result_pure(Id, Result, HasBeenSent) ->
|
|
|
end,
|
|
|
{ack, PostFn}.
|
|
|
|
|
|
+handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
|
|
|
+ case is_unrecoverable_error(Error) of
|
|
|
+ true ->
|
|
|
+ PostFn =
|
|
|
+ fun() ->
|
|
|
+ ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
|
|
|
+ inc_sent_failed(Id, HasBeenSent),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ {ack, PostFn};
|
|
|
+ false ->
|
|
|
+ PostFn = fun() ->
|
|
|
+ ?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ {nack, PostFn}
|
|
|
+ end;
|
|
|
+handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
|
|
|
+ {ack, fun() -> ok end};
|
|
|
+handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
|
|
|
+ {ack, fun() -> ok end}.
|
|
|
+
|
|
|
handle_async_worker_down(Data0, Pid) ->
|
|
|
#{async_workers := AsyncWorkers0} = Data0,
|
|
|
{WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
|
|
|
@@ -779,9 +859,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
|
|
|
case emqx_resource_manager:ets_lookup(Id) of
|
|
|
{ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
|
|
|
QM =
|
|
|
- case QM0 of
|
|
|
- configured -> maps:get(query_mode, Data);
|
|
|
- _ -> QM0
|
|
|
+ case QM0 =:= configured of
|
|
|
+ true -> maps:get(query_mode, Data);
|
|
|
+ false -> QM0
|
|
|
end,
|
|
|
CBM = maps:get(callback_mode, Data),
|
|
|
CallMode = call_mode(QM, CBM),
|
|
|
@@ -812,10 +892,10 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
|
|
|
end
|
|
|
).
|
|
|
|
|
|
-apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
|
|
|
+apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
|
|
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
|
|
|
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
|
|
|
-apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
|
|
|
+apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
|
|
|
?tp(call_query_async, #{
|
|
|
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
|
|
}),
|
|
|
@@ -834,13 +914,13 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
|
|
|
end,
|
|
|
Request
|
|
|
);
|
|
|
-apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
|
|
|
+apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
|
|
|
?tp(call_batch_query, #{
|
|
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
|
|
}),
|
|
|
- Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
|
|
|
+ Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
|
|
|
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
|
|
|
-apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
+apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
?tp(call_batch_query_async, #{
|
|
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
|
|
|
}),
|
|
|
@@ -850,7 +930,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
|
|
|
begin
|
|
|
ReplyFun = fun ?MODULE:batch_reply_after_query/8,
|
|
|
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
|
|
|
- Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
|
|
|
+ Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
|
|
|
IsRetriable = false,
|
|
|
WorkerMRef = undefined,
|
|
|
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
|
|
|
@@ -862,7 +942,41 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
|
|
|
).
|
|
|
|
|
|
reply_after_query(
|
|
|
- Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result
|
|
|
+ Pid,
|
|
|
+ Id,
|
|
|
+ Index,
|
|
|
+ InflightTID,
|
|
|
+ Ref,
|
|
|
+ ?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query,
|
|
|
+ QueryOpts,
|
|
|
+ Result
|
|
|
+) ->
|
|
|
+ ?tp(
|
|
|
+ buffer_worker_reply_after_query_enter,
|
|
|
+ #{batch_or_query => [Query], ref => Ref}
|
|
|
+ ),
|
|
|
+ Now = now_(),
|
|
|
+ case is_expired(ExpireAt, Now) of
|
|
|
+ true ->
|
|
|
+ IsFullBefore = is_inflight_full(InflightTID),
|
|
|
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
|
|
+ IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
|
|
+ IsFullBefore andalso ?MODULE:flush_worker(Pid),
|
|
|
+ ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}),
|
|
|
+ ok;
|
|
|
+ false ->
|
|
|
+ do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result)
|
|
|
+ end.
|
|
|
+
|
|
|
+do_reply_after_query(
|
|
|
+ Pid,
|
|
|
+ Id,
|
|
|
+ Index,
|
|
|
+ InflightTID,
|
|
|
+ Ref,
|
|
|
+ ?QUERY(From, Request, HasBeenSent, _ExpireAt),
|
|
|
+ QueryOpts,
|
|
|
+ Result
|
|
|
) ->
|
|
|
%% NOTE: 'inflight' is the count of messages that were sent async
|
|
|
%% but received no ACK, NOT the number of messages queued in the
|
|
|
@@ -875,7 +989,7 @@ reply_after_query(
|
|
|
%% Keep retrying.
|
|
|
?tp(buffer_worker_reply_after_query, #{
|
|
|
action => Action,
|
|
|
- batch_or_query => ?QUERY(From, Request, HasBeenSent),
|
|
|
+ batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
|
|
|
ref => Ref,
|
|
|
result => Result
|
|
|
}),
|
|
|
@@ -884,18 +998,42 @@ reply_after_query(
|
|
|
ack ->
|
|
|
?tp(buffer_worker_reply_after_query, #{
|
|
|
action => Action,
|
|
|
- batch_or_query => ?QUERY(From, Request, HasBeenSent),
|
|
|
+ batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
|
|
|
ref => Ref,
|
|
|
result => Result
|
|
|
}),
|
|
|
+ do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
|
|
|
+ end.
|
|
|
+
|
|
|
+batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
|
|
|
+ ?tp(
|
|
|
+ buffer_worker_reply_after_query_enter,
|
|
|
+ #{batch_or_query => Batch, ref => Ref}
|
|
|
+ ),
|
|
|
+ Now = now_(),
|
|
|
+ case sieve_expired_requests(Batch, Now) of
|
|
|
+ all_expired ->
|
|
|
IsFullBefore = is_inflight_full(InflightTID),
|
|
|
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
|
|
- IsAcked andalso PostFn(),
|
|
|
+ IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
|
|
IsFullBefore andalso ?MODULE:flush_worker(Pid),
|
|
|
- ok
|
|
|
+ ?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}),
|
|
|
+ ok;
|
|
|
+ {NotExpired, Expired} ->
|
|
|
+ NumExpired = length(Expired),
|
|
|
+ emqx_resource_metrics:late_reply_inc(Id, NumExpired),
|
|
|
+ NumExpired > 0 andalso
|
|
|
+ ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}),
|
|
|
+ do_batch_reply_after_query(
|
|
|
+ Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result
|
|
|
+ )
|
|
|
end.
|
|
|
|
|
|
-batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
|
|
|
+do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
|
|
|
+ ?tp(
|
|
|
+ buffer_worker_reply_after_query_enter,
|
|
|
+ #{batch_or_query => Batch, ref => Ref}
|
|
|
+ ),
|
|
|
%% NOTE: 'inflight' is the count of messages that were sent async
|
|
|
%% but received no ACK, NOT the number of messages queued in the
|
|
|
%% inflight window.
|
|
|
@@ -918,13 +1056,23 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Resu
|
|
|
ref => Ref,
|
|
|
result => Result
|
|
|
}),
|
|
|
- IsFullBefore = is_inflight_full(InflightTID),
|
|
|
- IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
|
|
- IsAcked andalso PostFn(),
|
|
|
- IsFullBefore andalso ?MODULE:flush_worker(Pid),
|
|
|
- ok
|
|
|
+ do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts)
|
|
|
end.
|
|
|
|
|
|
+do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) ->
|
|
|
+ IsFullBefore = is_inflight_full(InflightTID),
|
|
|
+ IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
|
|
|
+ case maps:get(simple_query, QueryOpts, false) of
|
|
|
+ true ->
|
|
|
+ PostFn();
|
|
|
+ false when IsKnownRef ->
|
|
|
+ PostFn();
|
|
|
+ false ->
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ IsFullBefore andalso ?MODULE:flush_worker(WorkerPid),
|
|
|
+ ok.
|
|
|
+
|
|
|
%%==============================================================================
|
|
|
%% operations for queue
|
|
|
queue_item_marshaller(Bin) when is_binary(Bin) ->
|
|
|
@@ -982,13 +1130,16 @@ inflight_new(InfltWinSZ, Id, Index) ->
|
|
|
inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
|
|
|
inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
|
|
|
inflight_append(
|
|
|
- TableId, {?INITIAL_MONOTONIC_TIME_REF, erlang:monotonic_time(nanosecond)}, Id, Index
|
|
|
+ TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index
|
|
|
),
|
|
|
TableId.
|
|
|
|
|
|
--spec inflight_get_first_retriable(ets:tid()) ->
|
|
|
- none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}.
|
|
|
-inflight_get_first_retriable(InflightTID) ->
|
|
|
+-spec inflight_get_first_retriable(ets:tid(), integer()) ->
|
|
|
+ none
|
|
|
+ | {expired, inflight_key(), [queue_query()]}
|
|
|
+ | {single, inflight_key(), queue_query()}
|
|
|
+ | {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
|
|
|
+inflight_get_first_retriable(InflightTID, Now) ->
|
|
|
MatchSpec =
|
|
|
ets:fun2ms(
|
|
|
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
|
|
|
@@ -1000,8 +1151,22 @@ inflight_get_first_retriable(InflightTID) ->
|
|
|
case ets:select(InflightTID, MatchSpec, _Limit = 1) of
|
|
|
'$end_of_table' ->
|
|
|
none;
|
|
|
- {[{Ref, BatchOrQuery}], _Continuation} ->
|
|
|
- {Ref, BatchOrQuery}
|
|
|
+ {[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
|
|
|
+ case is_expired(ExpireAt, Now) of
|
|
|
+ true ->
|
|
|
+ {expired, Ref, [Query]};
|
|
|
+ false ->
|
|
|
+ {single, Ref, Query}
|
|
|
+ end;
|
|
|
+ {[{Ref, Batch = [_ | _]}], _Continuation} ->
|
|
|
+ %% batch is non-empty because we check that in
|
|
|
+ %% `sieve_expired_requests'.
|
|
|
+ case sieve_expired_requests(Batch, Now) of
|
|
|
+ all_expired ->
|
|
|
+ {expired, Ref, Batch};
|
|
|
+ {NotExpired, Expired} ->
|
|
|
+ {batch, Ref, NotExpired, Expired}
|
|
|
+ end
|
|
|
end.
|
|
|
|
|
|
is_inflight_full(undefined) ->
|
|
|
@@ -1030,7 +1195,7 @@ inflight_append(undefined, _InflightItem, _Id, _Index) ->
|
|
|
ok;
|
|
|
inflight_append(
|
|
|
InflightTID,
|
|
|
- ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
|
|
|
+ ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
|
|
|
Id,
|
|
|
Index
|
|
|
) ->
|
|
|
@@ -1044,7 +1209,9 @@ inflight_append(
|
|
|
ok;
|
|
|
inflight_append(
|
|
|
InflightTID,
|
|
|
- ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef),
|
|
|
+ ?INFLIGHT_ITEM(
|
|
|
+ Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
|
|
|
+ ),
|
|
|
Id,
|
|
|
Index
|
|
|
) ->
|
|
|
@@ -1106,9 +1273,9 @@ ack_inflight(undefined, _Ref, _Id, _Index) ->
|
|
|
ack_inflight(InflightTID, Ref, Id, Index) ->
|
|
|
Count =
|
|
|
case ets:take(InflightTID, Ref) of
|
|
|
- [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] ->
|
|
|
+ [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
|
|
|
1;
|
|
|
- [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
|
|
|
+ [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
|
|
|
length(Batch);
|
|
|
_ ->
|
|
|
0
|
|
|
@@ -1133,6 +1300,12 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
|
|
|
?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
|
|
|
ok.
|
|
|
|
|
|
+%% used to update a batch after dropping expired individual queries.
|
|
|
+update_inflight_item(InflightTID, Ref, NewBatch) ->
|
|
|
+ _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
|
|
|
+ ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}),
|
|
|
+ ok.
|
|
|
+
|
|
|
%%==============================================================================
|
|
|
|
|
|
inc_sent_failed(Id, _HasBeenSent = true) ->
|
|
|
@@ -1180,11 +1353,14 @@ clear_disk_queue_dir(Id, Index) ->
|
|
|
Res
|
|
|
end.
|
|
|
|
|
|
-ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
|
|
|
+ensure_flush_timer(Data = #{batch_time := T}) ->
|
|
|
+ ensure_flush_timer(Data, T).
|
|
|
+
|
|
|
+ensure_flush_timer(Data = #{tref := undefined}, T) ->
|
|
|
Ref = make_ref(),
|
|
|
TRef = erlang:send_after(T, self(), {flush, Ref}),
|
|
|
Data#{tref => {TRef, Ref}};
|
|
|
-ensure_flush_timer(Data) ->
|
|
|
+ensure_flush_timer(Data, _T) ->
|
|
|
Data.
|
|
|
|
|
|
cancel_flush_timer(St = #{tref := undefined}) ->
|
|
|
@@ -1195,7 +1371,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
|
|
|
|
|
|
-spec make_message_ref() -> inflight_key().
|
|
|
make_message_ref() ->
|
|
|
- erlang:monotonic_time(nanosecond).
|
|
|
+ now_().
|
|
|
|
|
|
collect_requests(Acc, Limit) ->
|
|
|
Count = length(Acc),
|
|
|
@@ -1213,9 +1389,9 @@ do_collect_requests(Acc, Count, Limit) ->
|
|
|
|
|
|
mark_as_sent(Batch) when is_list(Batch) ->
|
|
|
lists:map(fun mark_as_sent/1, Batch);
|
|
|
-mark_as_sent(?QUERY(From, Req, _)) ->
|
|
|
+mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) ->
|
|
|
HasBeenSent = true,
|
|
|
- ?QUERY(From, Req, HasBeenSent).
|
|
|
+ ?QUERY(From, Req, HasBeenSent, ExpireAt).
|
|
|
|
|
|
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
|
|
|
true;
|
|
|
@@ -1235,3 +1411,48 @@ is_async_return({async_return, _}) ->
|
|
|
true;
|
|
|
is_async_return(_) ->
|
|
|
false.
|
|
|
+
|
|
|
+sieve_expired_requests(Batch, Now) ->
|
|
|
+ {Expired, NotExpired} =
|
|
|
+ lists:partition(
|
|
|
+ fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) ->
|
|
|
+ is_expired(ExpireAt, Now)
|
|
|
+ end,
|
|
|
+ Batch
|
|
|
+ ),
|
|
|
+ case {NotExpired, Expired} of
|
|
|
+ {[], []} ->
|
|
|
+ %% Should be impossible for batch_size >= 1.
|
|
|
+ all_expired;
|
|
|
+ {[], [_ | _]} ->
|
|
|
+ all_expired;
|
|
|
+ {[_ | _], _} ->
|
|
|
+ {NotExpired, Expired}
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec is_expired(infinity | integer(), integer()) -> boolean().
|
|
|
+is_expired(infinity = _ExpireAt, _Now) ->
|
|
|
+ false;
|
|
|
+is_expired(ExpireAt, Now) ->
|
|
|
+ Now > ExpireAt.
|
|
|
+
|
|
|
+now_() ->
|
|
|
+ erlang:monotonic_time(nanosecond).
|
|
|
+
|
|
|
+-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts().
|
|
|
+ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
|
|
|
+ Opts;
|
|
|
+ensure_timeout_query_opts(#{} = Opts0, sync) ->
|
|
|
+ Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT};
|
|
|
+ensure_timeout_query_opts(#{} = Opts0, async) ->
|
|
|
+ Opts0#{timeout => infinity}.
|
|
|
+
|
|
|
+-spec ensure_expire_at(query_opts()) -> query_opts().
|
|
|
+ensure_expire_at(#{expire_at := _} = Opts) ->
|
|
|
+ Opts;
|
|
|
+ensure_expire_at(#{timeout := infinity} = Opts) ->
|
|
|
+ Opts#{expire_at => infinity};
|
|
|
+ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
|
|
|
+ TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
|
|
|
+ ExpireAt = now_() + TimeoutNS,
|
|
|
+ Opts#{expire_at => ExpireAt}.
|