|
|
@@ -77,23 +77,27 @@ start_link(Id, Index, Opts) ->
|
|
|
sync_query(Id, Request, Opts) ->
|
|
|
PickKey = maps:get(pick_key, Opts, self()),
|
|
|
Timeout = maps:get(timeout, Opts, infinity),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
|
|
|
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
|
|
|
|
|
|
-spec async_query(id(), request(), query_opts()) -> Result :: term().
|
|
|
async_query(Id, Request, Opts) ->
|
|
|
PickKey = maps:get(pick_key, Opts, self()),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
|
|
|
pick_cast(Id, PickKey, {query, Request, Opts}).
|
|
|
|
|
|
%% simple query the resource without batching and queuing messages.
|
|
|
-spec simple_sync_query(id(), request()) -> Result :: term().
|
|
|
simple_sync_query(Id, Request) ->
|
|
|
Result = call_query(sync, Id, ?QUERY(self(), Request), #{}),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
|
|
|
_ = handle_query_result(Id, Result, false),
|
|
|
Result.
|
|
|
|
|
|
-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
|
|
|
simple_async_query(Id, Request, ReplyFun) ->
|
|
|
Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
|
|
|
_ = handle_query_result(Id, Result, false),
|
|
|
Result.
|
|
|
|
|
|
@@ -119,13 +123,15 @@ init({Id, Index, Opts}) ->
|
|
|
true ->
|
|
|
replayq:open(#{
|
|
|
dir => disk_queue_dir(Id, Index),
|
|
|
- seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
|
|
+ seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
|
|
|
+ max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
|
|
sizer => fun ?MODULE:estimate_size/1,
|
|
|
marshaller => fun ?MODULE:queue_item_marshaller/1
|
|
|
});
|
|
|
false ->
|
|
|
undefined
|
|
|
end,
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)),
|
|
|
ok = inflight_new(Name),
|
|
|
St = #{
|
|
|
id => Id,
|
|
|
@@ -149,8 +155,10 @@ running(cast, resume, _St) ->
|
|
|
keep_state_and_data;
|
|
|
running(cast, block, St) ->
|
|
|
{next_state, block, St};
|
|
|
-running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
|
|
|
- Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
|
|
|
+running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
|
|
|
+ is_list(Batch)
|
|
|
+->
|
|
|
+ Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
|
|
|
{next_state, block, St#{queue := Q1}};
|
|
|
running({call, From}, {query, Request, _Opts}, St) ->
|
|
|
query_or_acc(From, Request, St);
|
|
|
@@ -169,8 +177,10 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
|
|
|
{keep_state_and_data, {state_timeout, ResumeT, resume}};
|
|
|
blocked(cast, block, _St) ->
|
|
|
keep_state_and_data;
|
|
|
-blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
|
|
|
- Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
|
|
|
+blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
|
|
|
+ is_list(Batch)
|
|
|
+->
|
|
|
+ Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
|
|
|
{keep_state, St#{queue := Q1}};
|
|
|
blocked(cast, resume, St) ->
|
|
|
do_resume(St);
|
|
|
@@ -179,12 +189,12 @@ blocked(state_timeout, resume, St) ->
|
|
|
blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
|
|
|
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
|
|
|
_ = reply_caller(Id, ?REPLY(From, Request, Error)),
|
|
|
- {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}};
|
|
|
+ {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}};
|
|
|
blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
|
|
|
ReplayFun = maps:get(async_reply_fun, Opts, undefined),
|
|
|
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
|
|
|
_ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)),
|
|
|
- {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
|
|
|
+ {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
|
|
|
|
|
|
terminate(_Reason, #{id := Id, index := Index}) ->
|
|
|
gproc_pool:disconnect_worker(Id, {Id, Index}).
|
|
|
@@ -206,10 +216,10 @@ estimate_size(QItem) ->
|
|
|
Pid when is_pid(Pid) ->
|
|
|
EXPR;
|
|
|
_ ->
|
|
|
- ?RESOURCE_ERROR(not_created, "resource not created")
|
|
|
+ ?RESOURCE_ERROR(worker_not_created, "resource not created")
|
|
|
catch
|
|
|
error:badarg ->
|
|
|
- ?RESOURCE_ERROR(not_created, "resource not created");
|
|
|
+ ?RESOURCE_ERROR(worker_not_created, "resource not created");
|
|
|
exit:{timeout, _} ->
|
|
|
?RESOURCE_ERROR(timeout, "call resource timeout")
|
|
|
end
|
|
|
@@ -255,18 +265,20 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
|
|
|
inflight_drop(Name, Ref),
|
|
|
St0;
|
|
|
_ ->
|
|
|
- St0#{queue => drop_head(Q)}
|
|
|
+ St0#{queue => drop_head(Id, Q)}
|
|
|
end,
|
|
|
{keep_state, St, {state_timeout, 0, resume}}
|
|
|
end.
|
|
|
|
|
|
-drop_head(Q) ->
|
|
|
+drop_head(Id, Q) ->
|
|
|
{Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
|
|
|
ok = replayq:ack(Q1, AckRef),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1),
|
|
|
Q1.
|
|
|
|
|
|
-query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) ->
|
|
|
+query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
|
|
|
Acc1 = [?QUERY(From, Request) | Acc],
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'),
|
|
|
St = St0#{acc := Acc1, acc_left := Left - 1},
|
|
|
case Left =< 1 of
|
|
|
true -> flush(St);
|
|
|
@@ -277,18 +289,15 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St)
|
|
|
inflight_name => maps:get(name, St),
|
|
|
inflight_window => maps:get(async_inflight_window, St)
|
|
|
},
|
|
|
- case send_query(From, Request, Id, QueryOpts) of
|
|
|
+ Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
|
|
|
+ case reply_caller(Id, ?REPLY(From, Request, Result)) of
|
|
|
true ->
|
|
|
Query = ?QUERY(From, Request),
|
|
|
- {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}};
|
|
|
+ {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
|
|
|
false ->
|
|
|
{keep_state, St}
|
|
|
end.
|
|
|
|
|
|
-send_query(From, Request, Id, QueryOpts) ->
|
|
|
- Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
|
|
|
- reply_caller(Id, ?REPLY(From, Request, Result)).
|
|
|
-
|
|
|
flush(#{acc := []} = St) ->
|
|
|
{keep_state, St};
|
|
|
flush(
|
|
|
@@ -303,18 +312,39 @@ flush(
|
|
|
inflight_name => maps:get(name, St),
|
|
|
inflight_window => maps:get(async_inflight_window, St)
|
|
|
},
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)),
|
|
|
Result = call_query(configured, Id, Batch, QueryOpts),
|
|
|
St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
|
|
|
case batch_reply_caller(Id, Result, Batch) of
|
|
|
true ->
|
|
|
- Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]),
|
|
|
+ Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]),
|
|
|
{next_state, blocked, St1#{queue := Q1}};
|
|
|
false ->
|
|
|
{keep_state, St1}
|
|
|
end.
|
|
|
|
|
|
-maybe_append_queue(undefined, _Items) -> undefined;
|
|
|
-maybe_append_queue(Q, Items) -> replayq:append(Q, Items).
|
|
|
+maybe_append_queue(Id, undefined, _Items) ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
|
|
|
+ undefined;
|
|
|
+maybe_append_queue(Id, Q, Items) ->
|
|
|
+ Q2 =
|
|
|
+ case replayq:overflow(Q) of
|
|
|
+ Overflow when Overflow =< 0 ->
|
|
|
+ Q;
|
|
|
+ Overflow ->
|
|
|
+ PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
|
|
|
+ {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
|
|
|
+ ok = replayq:ack(Q1, QAckRef),
|
|
|
+ Dropped = length(Items2),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
|
|
|
+ ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
|
|
|
+ Q1
|
|
|
+ end,
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'),
|
|
|
+ replayq:append(Q2, Items).
|
|
|
|
|
|
batch_reply_caller(Id, BatchResult, Batch) ->
|
|
|
lists:foldl(
|
|
|
@@ -344,29 +374,41 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
|
|
|
handle_query_result(Id, Result, BlockWorker).
|
|
|
|
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'),
|
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
|
|
|
NotWorking == not_connected; NotWorking == blocked
|
|
|
->
|
|
|
true;
|
|
|
-handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) ->
|
|
|
+handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
|
|
|
+ BlockWorker;
|
|
|
+handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
|
|
|
+ BlockWorker;
|
|
|
+handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
|
|
|
+ ?SLOG(error, #{msg => other_resource_error, reason => Reason}),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
|
|
|
BlockWorker;
|
|
|
handle_query_result(Id, {error, _}, BlockWorker) ->
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
BlockWorker;
|
|
|
-handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
|
|
|
+handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
|
|
|
true;
|
|
|
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
|
|
|
true;
|
|
|
-handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) ->
|
|
|
- true;
|
|
|
+handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
+ BlockWorker;
|
|
|
handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
|
|
|
BlockWorker;
|
|
|
handle_query_result(Id, Result, BlockWorker) ->
|
|
|
assert_ok_result(Result),
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, success),
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'),
|
|
|
BlockWorker.
|
|
|
|
|
|
call_query(QM0, Id, Query, QueryOpts) ->
|
|
|
@@ -390,8 +432,8 @@ call_query(QM0, Id, Query, QueryOpts) ->
|
|
|
-define(APPLY_RESOURCE(EXPR, REQ),
|
|
|
try
|
|
|
%% if the callback module (connector) wants to return an error that
|
|
|
- %% makes the current resource goes into the `error` state, it should
|
|
|
- %% return `{resource_down, Reason}`
|
|
|
+ %% makes the current resource goes into the `blocked` state, it should
|
|
|
+ %% return `{recoverable_error, Reason}`
|
|
|
EXPR
|
|
|
catch
|
|
|
ERR:REASON:STACKTRACE ->
|
|
|
@@ -406,7 +448,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
|
|
|
|
|
|
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
|
|
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
|
|
|
?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
|
|
|
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
|
|
|
@@ -418,7 +460,8 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
?tp(inflight_full, #{id => Id, wind_size => WinSize}),
|
|
|
{async_return, inflight_full};
|
|
|
false ->
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
|
|
|
ReplyFun = fun ?MODULE:reply_after_query/6,
|
|
|
Ref = make_message_ref(),
|
|
|
Args = [self(), Id, Name, Ref, Query],
|
|
|
@@ -431,7 +474,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
|
|
|
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
|
|
|
Requests = [Request || ?QUERY(_From, Request) <- Batch],
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
|
|
|
?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
|
|
|
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
|
|
|
@@ -443,7 +486,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
?tp(inflight_full, #{id => Id, wind_size => WinSize}),
|
|
|
{async_return, inflight_full};
|
|
|
false ->
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
|
|
|
ReplyFun = fun ?MODULE:batch_reply_after_query/6,
|
|
|
Ref = make_message_ref(),
|
|
|
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
|
|
|
@@ -457,14 +501,20 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
|
|
|
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
|
|
|
case reply_caller(Id, ?REPLY(From, Request, Result)) of
|
|
|
- true -> ?MODULE:block(Pid);
|
|
|
- false -> inflight_drop(Name, Ref)
|
|
|
+ true ->
|
|
|
+ ?MODULE:block(Pid);
|
|
|
+ false ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
|
|
|
+ inflight_drop(Name, Ref)
|
|
|
end.
|
|
|
|
|
|
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
|
|
|
case batch_reply_caller(Id, Result, Batch) of
|
|
|
- true -> ?MODULE:block(Pid);
|
|
|
- false -> inflight_drop(Name, Ref)
|
|
|
+ true ->
|
|
|
+ ?MODULE:block(Pid);
|
|
|
+ false ->
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)),
|
|
|
+ inflight_drop(Name, Ref)
|
|
|
end.
|
|
|
%%==============================================================================
|
|
|
%% the inflight queue for async query
|
|
|
@@ -518,6 +568,11 @@ assert_ok_result(R) when is_tuple(R) ->
|
|
|
assert_ok_result(R) ->
|
|
|
error({not_ok_result, R}).
|
|
|
|
|
|
+queue_count(undefined) ->
|
|
|
+ 0;
|
|
|
+queue_count(Q) ->
|
|
|
+ replayq:count(Q).
|
|
|
+
|
|
|
-spec name(id(), integer()) -> atom().
|
|
|
name(Id, Index) ->
|
|
|
Mod = atom_to_list(?MODULE),
|