浏览代码

Merge pull request #13970 from thalesmg/20241009-r58-kafka-sync-query-opts

fix(kafka producer action): set query timeout using non-conventional parameter schema
zmstone 1 年之前
父节点
当前提交
d673b9ade3

+ 6 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -14,6 +14,7 @@
 -export([
     resource_type/0,
     query_mode/1,
+    query_opts/1,
     callback_mode/0,
     on_start/2,
     on_stop/2,
@@ -45,6 +46,11 @@ query_mode(#{parameters := #{query_mode := sync}}) ->
 query_mode(_) ->
     simple_async_internal_buffer.
 
+query_opts(#{parameters := #{query_mode := sync, sync_query_timeout := Timeout}}) ->
+    #{timeout => Timeout};
+query_opts(_) ->
+    #{}.
+
 callback_mode() -> async_if_possible.
 
 check_config(Key, Config) when is_map_key(Key, Config) ->

+ 14 - 8
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -162,7 +162,7 @@ simple_sync_query(Id, Request, QueryOpts0) ->
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     ?tp(simple_sync_query, #{id => Id, request => Request}),
-    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
+    QueryOpts = maps:merge(simple_sync_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
     TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
@@ -176,7 +176,7 @@ simple_sync_query(Id, Request, QueryOpts0) ->
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 simple_async_query(Id, Request, QueryOpts0) ->
     ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
-    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
+    QueryOpts = maps:merge(simple_async_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
     TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
@@ -204,7 +204,7 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         QueryOpts1 = QueryOpts0#{
             reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
         },
-        QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
+        QueryOpts = #{timeout := Timeout} = maps:merge(simple_sync_query_opts(), QueryOpts1),
         case simple_async_query(Id, Request, QueryOpts) of
             {error, _} = Error ->
                 ?tp("resource_simple_sync_internal_buffer_query_error", #{
@@ -235,8 +235,16 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         _ = unalias(ReplyAlias)
     end.
 
-simple_query_opts() ->
-    ensure_expire_at(#{simple_query => true, timeout => infinity}).
+simple_sync_query_opts() ->
+    %% Default `resource_opts.resource_ttl' is 45 seconds, so we use the same default
+    %% value here to match that.  No need to set `expire_at', since there's no queuing
+    %% involved.
+    #{simple_query => true, timeout => 45_000}.
+
+simple_async_query_opts() ->
+    %% No need to define `expire_at' nor `timeout', since there's no queuing involved in
+    %% simple queries.
+    #{simple_query => true}.
 
 -spec block(pid()) -> ok.
 block(ServerRef) ->
@@ -2342,9 +2350,7 @@ buffer_worker(_Tid) ->
     self().
 
 is_simple_query(#{simple_query := Bool}) ->
-    Bool;
-is_simple_query(_) ->
-    false.
+    Bool.
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").