|
@@ -250,6 +250,7 @@ retry_first_from_queue(Q, Id, St) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) ->
|
|
retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) ->
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
|
|
|
Result = call_query(sync, Id, FirstQuery, #{}),
|
|
Result = call_query(sync, Id, FirstQuery, #{}),
|
|
|
case handle_query_result(Id, Result, false) of
|
|
case handle_query_result(Id, Result, false) of
|
|
|
%% Send failed because resource down
|
|
%% Send failed because resource down
|
|
@@ -376,49 +377,48 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
|
|
|
|
|
|
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) ->
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) ->
|
|
|
?SLOG(error, #{msg => resource_exception, info => Msg}),
|
|
?SLOG(error, #{msg => resource_exception, info => Msg}),
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
|
|
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
|
|
|
NotWorking == not_connected; NotWorking == blocked
|
|
NotWorking == not_connected; NotWorking == blocked
|
|
|
->
|
|
->
|
|
|
true;
|
|
true;
|
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) ->
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) ->
|
|
|
- ?SLOG(error, #{msg => resource_not_found, info => Msg}),
|
|
|
|
|
|
|
+ ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) ->
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) ->
|
|
|
- ?SLOG(error, #{msg => resource_stopped, info => Msg}),
|
|
|
|
|
|
|
+ ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
|
|
handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
|
|
|
- ?SLOG(error, #{msg => other_resource_error, reason => Reason}),
|
|
|
|
|
|
|
+ ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) ->
|
|
handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) ->
|
|
|
%% the message will be queued in replayq or inflight window,
|
|
%% the message will be queued in replayq or inflight window,
|
|
|
- %% i.e. the counter 'queuing' will increase, so we pretend that we have not
|
|
|
|
|
|
|
+ %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
|
|
|
%% sent this message.
|
|
%% sent this message.
|
|
|
- ?SLOG(warning, #{msg => recoverable_error, reason => Reason}),
|
|
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
|
|
|
|
|
|
|
+ ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
|
|
|
true;
|
|
true;
|
|
|
handle_query_result(Id, {error, Reason}, BlockWorker) ->
|
|
handle_query_result(Id, {error, Reason}, BlockWorker) ->
|
|
|
- ?SLOG(error, #{msg => send_error, reason => Reason}),
|
|
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
|
|
|
|
+ ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
|
|
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
|
|
|
true;
|
|
true;
|
|
|
handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) ->
|
|
handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) ->
|
|
|
- ?SLOG(error, #{msg => async_send_error, info => Msg}),
|
|
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
|
|
|
|
+ ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'failed'),
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
|
|
handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
|
|
|
BlockWorker;
|
|
BlockWorker;
|
|
|
handle_query_result(Id, Result, BlockWorker) ->
|
|
handle_query_result(Id, Result, BlockWorker) ->
|
|
|
assert_ok_result(Result),
|
|
assert_ok_result(Result),
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'),
|
|
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'success'),
|
|
|
BlockWorker.
|
|
BlockWorker.
|
|
|
|
|
|
|
|
call_query(QM0, Id, Query, QueryOpts) ->
|
|
call_query(QM0, Id, Query, QueryOpts) ->
|
|
@@ -459,11 +459,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
|
|
|
|
|
|
|
|
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
|
|
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
|
|
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
|
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
|
|
|
|
|
- Result = ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
|
|
|
|
|
- Result;
|
|
|
|
|
|
|
+ ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
|
|
|
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
|
|
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
|
|
|
Name = maps:get(inflight_name, QueryOpts, undefined),
|
|
Name = maps:get(inflight_name, QueryOpts, undefined),
|
|
@@ -475,8 +471,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
|
|
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
|
|
|
{async_return, inflight_full};
|
|
{async_return, inflight_full};
|
|
|
false ->
|
|
false ->
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
|
|
|
|
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight'),
|
|
|
ReplyFun = fun ?MODULE:reply_after_query/6,
|
|
ReplyFun = fun ?MODULE:reply_after_query/6,
|
|
|
Ref = make_message_ref(),
|
|
Ref = make_message_ref(),
|
|
|
Args = [self(), Id, Name, Ref, Query],
|
|
Args = [self(), Id, Name, Ref, Query],
|
|
@@ -489,12 +484,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
|
|
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
|
|
|
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
|
|
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
|
|
|
Requests = [Request || ?QUERY(_From, Request) <- Batch],
|
|
Requests = [Request || ?QUERY(_From, Request) <- Batch],
|
|
|
- BatchLen = length(Batch),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
|
|
|
|
|
- Result = ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
|
|
|
|
|
- Result;
|
|
|
|
|
|
|
+ ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
|
|
|
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
|
|
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
|
|
|
Name = maps:get(inflight_name, QueryOpts, undefined),
|
|
Name = maps:get(inflight_name, QueryOpts, undefined),
|
|
@@ -507,8 +497,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
{async_return, inflight_full};
|
|
{async_return, inflight_full};
|
|
|
false ->
|
|
false ->
|
|
|
BatchLen = length(Batch),
|
|
BatchLen = length(Batch),
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
|
|
|
|
|
- ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
|
|
|
|
|
|
|
+ ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', BatchLen),
|
|
|
ReplyFun = fun ?MODULE:batch_reply_after_query/6,
|
|
ReplyFun = fun ?MODULE:batch_reply_after_query/6,
|
|
|
Ref = make_message_ref(),
|
|
Ref = make_message_ref(),
|
|
|
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
|
|
Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
|
|
@@ -521,12 +510,13 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
).
|
|
).
|
|
|
|
|
|
|
|
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
|
|
reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
|
|
|
- %% NOTE: 'sent.inflight' is message count that sent but no ACK received,
|
|
|
|
|
|
|
+ %% NOTE: 'inflight' is message count that sent async but no ACK received,
|
|
|
%% NOT the message number ququed in the inflight window.
|
|
%% NOT the message number ququed in the inflight window.
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
|
|
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -1),
|
|
|
case reply_caller(Id, ?REPLY(From, Request, Result)) of
|
|
case reply_caller(Id, ?REPLY(From, Request, Result)) of
|
|
|
true ->
|
|
true ->
|
|
|
- %% we marked these messages are 'queuing' although they are in inflight window
|
|
|
|
|
|
|
+ %% we marked these messages are 'queuing' although they are actually
|
|
|
|
|
+ %% keeped in inflight window, not replayq
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
|
|
|
?MODULE:block(Pid);
|
|
?MODULE:block(Pid);
|
|
|
false ->
|
|
false ->
|
|
@@ -534,13 +524,14 @@ reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
|
|
batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
|
|
|
- %% NOTE: 'sent.inflight' is message count that sent but no ACK received,
|
|
|
|
|
|
|
+ %% NOTE: 'inflight' is message count that sent async but no ACK received,
|
|
|
%% NOT the message number ququed in the inflight window.
|
|
%% NOT the message number ququed in the inflight window.
|
|
|
BatchLen = length(Batch),
|
|
BatchLen = length(Batch),
|
|
|
- emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
|
|
|
|
|
|
|
+ emqx_metrics_worker:inc(?RES_METRICS, Id, 'inflight', -BatchLen),
|
|
|
case batch_reply_caller(Id, Result, Batch) of
|
|
case batch_reply_caller(Id, Result, Batch) of
|
|
|
true ->
|
|
true ->
|
|
|
- %% we marked these messages are 'queuing' although they are in inflight window
|
|
|
|
|
|
|
+ %% we marked these messages are 'queuing' although they are actually
|
|
|
|
|
+ %% keeped in inflight window, not replayq
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', BatchLen),
|
|
|
?MODULE:block(Pid);
|
|
?MODULE:block(Pid);
|
|
|
false ->
|
|
false ->
|