|
|
@@ -374,18 +374,21 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
|
|
|
gen_statem:reply(From, Result),
|
|
|
handle_query_result(Id, Result, BlockWorker).
|
|
|
|
|
|
-handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
|
|
|
+handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), BlockWorker) ->
|
|
|
+ ?SLOG(error, #{msg => resource_exception, info => Msg}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
|
|
|
NotWorking == not_connected; NotWorking == blocked
|
|
|
->
|
|
|
true;
|
|
|
-handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) ->
|
|
|
+handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), BlockWorker) ->
|
|
|
+ ?SLOG(error, #{msg => resource_not_found, info => Msg}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
|
|
|
BlockWorker;
|
|
|
-handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) ->
|
|
|
+handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), BlockWorker) ->
|
|
|
+ ?SLOG(error, #{msg => resource_stopped, info => Msg}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
|
|
|
BlockWorker;
|
|
|
@@ -394,18 +397,21 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
|
|
|
BlockWorker;
|
|
|
-handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) ->
|
|
|
+handle_query_result(Id, {error, {recoverable_error, Reason}}, _BlockWorker) ->
|
|
|
%% the message will be queued in replayq or inflight window,
|
|
|
%% i.e. the counter 'queuing' will increase, so we pretend that we have not
|
|
|
%% sent this message.
|
|
|
+ ?SLOG(warning, #{msg => recoverable_error, reason => Reason}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
|
|
|
true;
|
|
|
-handle_query_result(Id, {error, _}, BlockWorker) ->
|
|
|
+handle_query_result(Id, {error, Reason}, BlockWorker) ->
|
|
|
+ ?SLOG(error, #{msg => send_error, reason => Reason}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
|
|
|
true;
|
|
|
-handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
|
|
|
+handle_query_result(Id, {async_return, {error, Msg}}, BlockWorker) ->
|
|
|
+ ?SLOG(error, #{msg => async_send_error, info => Msg}),
|
|
|
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
|
|
|
BlockWorker;
|
|
|
handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
|
|
|
@@ -433,7 +439,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
|
|
|
?RESOURCE_ERROR(not_found, "resource not found")
|
|
|
end.
|
|
|
|
|
|
--define(APPLY_RESOURCE(EXPR, REQ),
|
|
|
+-define(APPLY_RESOURCE(NAME, EXPR, REQ),
|
|
|
try
|
|
|
%% if the callback module (connector) wants to return an error that
|
|
|
%% makes the current resource goes into the `blocked` state, it should
|
|
|
@@ -441,12 +447,13 @@ call_query(QM0, Id, Query, QueryOpts) ->
|
|
|
EXPR
|
|
|
catch
|
|
|
ERR:REASON:STACKTRACE ->
|
|
|
- MSG = io_lib:format(
|
|
|
- "call query failed, func: ~s, id: ~s, error: ~0p, Request: ~0p",
|
|
|
- [??EXPR, Id, {ERR, REASON, STACKTRACE}, REQ],
|
|
|
- [{chars_limit, 1024}]
|
|
|
- ),
|
|
|
- ?RESOURCE_ERROR(exception, MSG)
|
|
|
+ ?RESOURCE_ERROR(exception, #{
|
|
|
+ name => NAME,
|
|
|
+ id => Id,
|
|
|
+ request => REQ,
|
|
|
+ error => {ERR, REASON},
|
|
|
+ stacktrace => STACKTRACE
|
|
|
+ })
|
|
|
end
|
|
|
).
|
|
|
|
|
|
@@ -454,7 +461,7 @@ apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -
|
|
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
|
|
|
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
|
|
|
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
|
|
|
- Result = ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request),
|
|
|
+ Result = ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
|
|
|
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
|
|
|
Result;
|
|
|
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
@@ -462,6 +469,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
|
|
|
Name = maps:get(inflight_name, QueryOpts, undefined),
|
|
|
WinSize = maps:get(inflight_window, QueryOpts, undefined),
|
|
|
?APPLY_RESOURCE(
|
|
|
+ call_query_async,
|
|
|
case inflight_is_full(Name, WinSize) of
|
|
|
true ->
|
|
|
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
|
|
|
@@ -484,7 +492,7 @@ apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
|
|
|
BatchLen = length(Batch),
|
|
|
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', BatchLen),
|
|
|
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', BatchLen),
|
|
|
- Result = ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch),
|
|
|
+ Result = ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
|
|
|
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -BatchLen),
|
|
|
Result;
|
|
|
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
@@ -492,6 +500,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
|
|
|
Name = maps:get(inflight_name, QueryOpts, undefined),
|
|
|
WinSize = maps:get(inflight_window, QueryOpts, undefined),
|
|
|
?APPLY_RESOURCE(
|
|
|
+ call_batch_query_async,
|
|
|
case inflight_is_full(Name, WinSize) of
|
|
|
true ->
|
|
|
?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}),
|