Przeglądaj źródła

refactor(buffer worker): split default query opts for sync and async modes

Thales Macedo Garitezi 1 rok temu
rodzic
commit
0a5e5b742d

+ 14 - 8
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -162,7 +162,7 @@ simple_sync_query(Id, Request, QueryOpts0) ->
     %% would mess up the metrics anyway.  `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     ?tp(simple_sync_query, #{id => Id, request => Request}),
-    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
+    QueryOpts = maps:merge(simple_sync_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
     TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
@@ -176,7 +176,7 @@ simple_sync_query(Id, Request, QueryOpts0) ->
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 simple_async_query(Id, Request, QueryOpts0) ->
     ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
-    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
+    QueryOpts = maps:merge(simple_async_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
     TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
@@ -204,7 +204,7 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         QueryOpts1 = QueryOpts0#{
             reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
         },
-        QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
+        QueryOpts = #{timeout := Timeout} = maps:merge(simple_sync_query_opts(), QueryOpts1),
         case simple_async_query(Id, Request, QueryOpts) of
             {error, _} = Error ->
                 ?tp("resource_simple_sync_internal_buffer_query_error", #{
@@ -235,8 +235,16 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
         _ = unalias(ReplyAlias)
     end.
 
-simple_query_opts() ->
-    ensure_expire_at(#{simple_query => true, timeout => infinity}).
+simple_sync_query_opts() ->
+    %% Default `resource_opts.resource_ttl' is 45 seconds, so we use the same default
+    %% value here to match that.  No need to set `expire_at', since there's no queuing
+    %% involved.
+    #{simple_query => true, timeout => 45_000}.
+
+simple_async_query_opts() ->
+    %% No need to define `expire_at' nor `timeout', since there's no queuing involved in
+    %% simple queries.
+    #{simple_query => true}.
 
 -spec block(pid()) -> ok.
 block(ServerRef) ->
@@ -2342,9 +2350,7 @@ buffer_worker(_Tid) ->
     self().
 
 is_simple_query(#{simple_query := Bool}) ->
-    Bool;
-is_simple_query(_) ->
-    false.
+    Bool.
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").