Преглед изворни кода

fix: fix simple_sync_query

Stefan Strigler пре 2 година
родитељ
комит
1363108678

+ 1 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -290,7 +290,7 @@ query(ResId, Request, Opts) ->
                 {simple_sync, _} ->
                 {simple_sync, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     %% so the buffer worker does not need to lookup the cache again
-                    emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
+                    emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
                 {sync, _} ->
                 {sync, _} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
                 {async, _} ->
                 {async, _} ->

+ 7 - 2
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -38,6 +38,7 @@
 
 
 -export([
 -export([
     simple_sync_query/2,
     simple_sync_query/2,
+    simple_sync_query/3,
     simple_async_query/3
     simple_async_query/3
 ]).
 ]).
 
 
@@ -133,6 +134,10 @@ async_query(Id, Request, Opts0) ->
 %% simple query the resource without batching and queuing.
 %% simple query the resource without batching and queuing.
 -spec simple_sync_query(id(), request()) -> term().
 -spec simple_sync_query(id(), request()) -> term().
 simple_sync_query(Id, Request) ->
 simple_sync_query(Id, Request) ->
+    simple_sync_query(Id, Request, #{}).
+
+-spec simple_sync_query(id(), request(), query_opts()) -> term().
+simple_sync_query(Id, Request, QueryOpts0) ->
     %% Note: since calling this function implies in bypassing the
     %% Note: since calling this function implies in bypassing the
     %% buffer workers, and each buffer worker index is used when
     %% buffer workers, and each buffer worker index is used when
     %% collecting gauge metrics, we use this dummy index.  If this
     %% collecting gauge metrics, we use this dummy index.  If this
@@ -141,12 +146,12 @@ simple_sync_query(Id, Request) ->
     %% `emqx_resource_metrics:*_shift/3'.
     %% `emqx_resource_metrics:*_shift/3'.
     ?tp(simple_sync_query, #{id => Id, request => Request}),
     ?tp(simple_sync_query, #{id => Id, request => Request}),
     Index = undefined,
     Index = undefined,
-    QueryOpts = simple_query_opts(),
+    QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
     Ref = make_request_ref(),
     Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
-    Result.
+    maybe_reply_to(Result, QueryOpts).
 
 
 %% simple async-query the resource without batching and queuing.
 %% simple async-query the resource without batching and queuing.
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 -spec simple_async_query(id(), request(), query_opts()) -> term().