瀏覽代碼

fix(resource): create `simple_async_internal_buffer` query mode for bridges with internal buffering

Since authn/authz backends also use simple async/sync queries, we may want to avoid them
calling the connector when it's not connected.
Thales Macedo Garitezi 2 年之前
父節點
當前提交
eebfb44f72

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -34,7 +34,7 @@
 query_mode(#{kafka := #{query_mode := sync}}) ->
     simple_sync_internal_buffer;
 query_mode(_) ->
-    simple_async.
+    simple_async_internal_buffer.
 
 callback_mode() -> async_if_possible.
 

+ 8 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -311,7 +311,15 @@ query(ResId, Request, Opts) ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
+                {simple_async_internal_buffer, _} ->
+                    %% This is for bridges/connectors that have internal buffering, such
+                    %% as Kafka and Pulsar producers.
+                    %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
+                    %% so the buffer worker does not need to lookup the cache again
+                    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
                 {simple_sync_internal_buffer, _} ->
+                    %% This is for bridges/connectors that have internal buffering, such
+                    %% as Kafka and Pulsar producers.
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     emqx_resource_buffer_worker:simple_sync_internal_buffer_query(

+ 1 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1088,7 +1088,7 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
     end.
 
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
-    ResQM =:= simple_async; ResQM =:= simple_sync
+    ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
 ->
     %% The connector supports buffer, send even in disconnected state
     #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,