|
|
@@ -100,7 +100,7 @@
|
|
|
retried_success => non_neg_integer(),
|
|
|
retried_failed => non_neg_integer()
|
|
|
}.
|
|
|
--type inflight_table() :: ets:tid() | atom() | reference().
|
|
|
+-type inflight_table() :: ets:tid().
|
|
|
-type data() :: #{
|
|
|
id := id(),
|
|
|
index := index(),
|
|
|
@@ -116,6 +116,11 @@
|
|
|
metrics_tref := undefined | {reference(), reference()}
|
|
|
}.
|
|
|
|
|
|
+-define(NO_INFLIGHT, []).
|
|
|
+-define(NO_INDEX, []).
|
|
|
+-define(NO_REQ_REF, []).
|
|
|
+-define(NO_BUFFER_WORKER, []).
|
|
|
+
|
|
|
callback_mode() -> [state_functions, state_enter].
|
|
|
|
|
|
start_link(Id, Index, Opts) ->
|
|
|
@@ -154,14 +159,13 @@ simple_sync_query(Id, Request, QueryOpts0) ->
|
|
|
%% would mess up the metrics anyway. `undefined' is ignored by
|
|
|
%% `emqx_resource_metrics:*_shift/3'.
|
|
|
?tp(simple_sync_query, #{id => Id, request => Request}),
|
|
|
- Index = undefined,
|
|
|
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
Ref = make_request_ref(),
|
|
|
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
|
|
TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
|
|
|
Result = call_query(
|
|
|
- force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
+ force_sync, Id, ?NO_INDEX, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
),
|
|
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
|
|
Result.
|
|
|
@@ -173,13 +177,12 @@ simple_async_query(Id, Request, QueryOpts0) ->
|
|
|
|
|
|
simple_async_query(Id, Request, QueryOpts0, Ref) ->
|
|
|
?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
|
|
|
- Index = undefined,
|
|
|
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
|
|
TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
|
|
|
Result = call_query(
|
|
|
- async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
+ async_if_possible, Id, ?NO_INDEX, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
),
|
|
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
|
|
Result.
|
|
|
@@ -198,7 +201,7 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
|
|
|
reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
|
|
|
},
|
|
|
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
|
|
|
- case simple_async_query(Id, Request, QueryOpts, _Ref = []) of
|
|
|
+ case simple_async_query(Id, Request, QueryOpts, _Ref = ?NO_REQ_REF) of
|
|
|
{error, _} = Error ->
|
|
|
?tp("resource_simple_sync_internal_buffer_query_error", #{
|
|
|
id => Id, request => Request
|
|
|
@@ -1420,7 +1423,7 @@ apply_query_fun(
|
|
|
?tp(call_query_async, #{
|
|
|
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
|
|
}),
|
|
|
- InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
|
|
|
+ InflightTID = maps:get(inflight_tid, QueryOpts, ?NO_INFLIGHT),
|
|
|
?APPLY_RESOURCE(
|
|
|
call_query_async,
|
|
|
begin
|
|
|
@@ -1506,7 +1509,7 @@ apply_query_fun(
|
|
|
?tp(call_batch_query_async, #{
|
|
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
|
|
|
}),
|
|
|
- InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
|
|
|
+ InflightTID = maps:get(inflight_tid, QueryOpts),
|
|
|
?APPLY_RESOURCE(
|
|
|
call_batch_query_async,
|
|
|
begin
|
|
|
@@ -1784,7 +1787,7 @@ do_async_ack(InflightTID, Ref, Id, PostFn, BufferWorkerPid, DeltaCounters, IsSim
|
|
|
%% 2. If the request was previously failed and now pending on a retry,
|
|
|
%% then this function will return 'continue' as there is no way to
|
|
|
%% tell if this reply is stae or not.
|
|
|
-maybe_handle_unknown_async_reply(undefined, _Ref, true) ->
|
|
|
+maybe_handle_unknown_async_reply(?NO_INFLIGHT, _Ref, true) ->
|
|
|
continue;
|
|
|
maybe_handle_unknown_async_reply(InflightTID, Ref, _) ->
|
|
|
try ets:member(InflightTID, Ref) of
|
|
|
@@ -1911,7 +1914,7 @@ inflight_get_first_retriable(InflightTID, Now) ->
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-is_inflight_full(undefined) ->
|
|
|
+is_inflight_full(?NO_INFLIGHT) ->
|
|
|
false;
|
|
|
is_inflight_full(InflightTID) ->
|
|
|
[{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
|
|
|
@@ -1927,7 +1930,7 @@ inflight_num_msgs(InflightTID) ->
|
|
|
[{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
|
|
|
Size.
|
|
|
|
|
|
-inflight_append(undefined, _InflightItem) ->
|
|
|
+inflight_append(?NO_INFLIGHT, _InflightItem) ->
|
|
|
ok;
|
|
|
inflight_append(
|
|
|
InflightTID,
|
|
|
@@ -1963,7 +1966,7 @@ inflight_append(InflightTID, {Ref, Data}) ->
|
|
|
|
|
|
%% a request was already appended and originally not retriable, but an
|
|
|
%% error occurred and it is now retriable.
|
|
|
-mark_inflight_as_retriable(undefined, _Ref) ->
|
|
|
+mark_inflight_as_retriable(?NO_INFLIGHT, _Ref) ->
|
|
|
ok;
|
|
|
mark_inflight_as_retriable(InflightTID, Ref) ->
|
|
|
_ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
|
|
|
@@ -1991,21 +1994,18 @@ ensure_async_worker_monitored(
|
|
|
ensure_async_worker_monitored(Data0, _Result) ->
|
|
|
{Data0, undefined}.
|
|
|
|
|
|
--spec store_async_worker_reference(undefined | ets:tid(), inflight_key(), undefined | reference()) ->
|
|
|
+-spec store_async_worker_reference(inflight_table(), inflight_key(), undefined | reference()) ->
|
|
|
ok.
|
|
|
-store_async_worker_reference(undefined = _InflightTID, _Ref, _AsyncWorkerMRef) ->
|
|
|
- ok;
|
|
|
-store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
|
|
|
+store_async_worker_reference(_InflightTID, _Ref, undefined) ->
|
|
|
ok;
|
|
|
-store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) when
|
|
|
- is_reference(AsyncWorkerMRef)
|
|
|
-->
|
|
|
+store_async_worker_reference(InflightTID, Ref, AsyncWorkerMRef) ->
|
|
|
+ is_integer(Ref) orelse error(#{unexpected_ref => Ref}),
|
|
|
_ = ets:update_element(
|
|
|
InflightTID, Ref, {?WORKER_MREF_IDX, AsyncWorkerMRef}
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
-ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
|
|
|
+ack_inflight(?NO_INFLIGHT, _Ref, _BufferWorkerPid) ->
|
|
|
false;
|
|
|
ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
|
|
|
{Count, Removed} =
|
|
|
@@ -2054,7 +2054,7 @@ inc_inflight(InflightTID, Count) ->
|
|
|
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
|
|
|
ok.
|
|
|
|
|
|
--spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
|
|
|
+-spec dec_inflight_remove(?NO_INFLIGHT | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
|
|
|
no_flush | flush.
|
|
|
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
|
|
|
no_flush;
|
|
|
@@ -2343,9 +2343,9 @@ reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
|
|
|
do_reply_caller(MaybeReplyTo, Response).
|
|
|
|
|
|
%% record buffer worker only when it's a request initiated from buffer worker.
|
|
|
-buffer_worker(undefined) ->
|
|
|
+buffer_worker(?NO_INFLIGHT) ->
|
|
|
%% simple_sync_internal_buffer_query
|
|
|
- [];
|
|
|
+ ?NO_BUFFER_WORKER;
|
|
|
buffer_worker(_Tid) ->
|
|
|
self().
|
|
|
|