|
|
@@ -120,6 +120,9 @@
|
|
|
-define(NO_INDEX, []).
|
|
|
-define(NO_REQ_REF, []).
|
|
|
-define(NO_BUFFER_WORKER, []).
|
|
|
+-define(IS_BYPASS(Mode),
|
|
|
+ (Mode =:= simple_sync_internal_buffer orelse Mode =:= simple_async_internal_buffer)
|
|
|
+).
|
|
|
|
|
|
callback_mode() -> [state_functions, state_enter].
|
|
|
|
|
|
@@ -161,11 +164,10 @@ simple_sync_query(Id, Request, QueryOpts0) ->
|
|
|
?tp(simple_sync_query, #{id => Id, request => Request}),
|
|
|
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
- Ref = make_request_ref(),
|
|
|
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
|
|
TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
|
|
|
Result = call_query(
|
|
|
- force_sync, Id, ?NO_INDEX, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
+ force_sync, Id, ?NO_INDEX, ?NO_REQ_REF, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
),
|
|
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
|
|
Result.
|
|
|
@@ -173,16 +175,18 @@ simple_sync_query(Id, Request, QueryOpts0) ->
|
|
|
%% simple async-query the resource without batching and queuing.
|
|
|
-spec simple_async_query(id(), request(), query_opts()) -> term().
|
|
|
simple_async_query(Id, Request, QueryOpts0) ->
|
|
|
- simple_async_query(Id, Request, QueryOpts0, make_request_ref()).
|
|
|
-
|
|
|
-simple_async_query(Id, Request, QueryOpts0, Ref) ->
|
|
|
?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
|
|
|
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
|
|
emqx_resource_metrics:matched_inc(Id),
|
|
|
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
|
|
TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
|
|
|
Result = call_query(
|
|
|
- async_if_possible, Id, ?NO_INDEX, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
|
|
+ async_if_possible,
|
|
|
+ Id,
|
|
|
+ ?NO_INDEX,
|
|
|
+ ?NO_REQ_REF,
|
|
|
+ ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx),
|
|
|
+ QueryOpts
|
|
|
),
|
|
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
|
|
Result.
|
|
|
@@ -201,7 +205,7 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
|
|
|
reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
|
|
|
},
|
|
|
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
|
|
|
- case simple_async_query(Id, Request, QueryOpts, _Ref = ?NO_REQ_REF) of
|
|
|
+ case simple_async_query(Id, Request, QueryOpts) of
|
|
|
{error, _} = Error ->
|
|
|
?tp("resource_simple_sync_internal_buffer_query_error", #{
|
|
|
id => Id, request => Request
|
|
|
@@ -1331,7 +1335,7 @@ error_if_channel_is_not_installed(Id, QueryOpts) ->
|
|
|
end.
|
|
|
|
|
|
do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
|
|
|
- ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer
|
|
|
+ ?IS_BYPASS(ReqQM)
|
|
|
->
|
|
|
%% The query overrides the query mode of the resource, send even in disconnected state
|
|
|
?tp(simple_query_override, #{query_mode => ReqQM}),
|
|
|
@@ -1340,7 +1344,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res
|
|
|
?tp(simple_query_enter, #{}),
|
|
|
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
|
|
|
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
|
|
|
- ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
|
|
|
+ ?IS_BYPASS(ResQM)
|
|
|
->
|
|
|
%% The connector supports buffer, send even in disconnected state
|
|
|
#{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
|