Browse Source

Merge pull request #11722 from thalesmg/fix-bridge-buffer-supported-m-20231006

fix(kafka_producer): send messages to wolff producer to buffer even when connector is in `connecting` state (sync query mode)
Thales Macedo Garitezi 2 years atrás
parent
commit
2fe6e8e431

+ 2 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_bridge_kafka_impl_producer).
 
+-behaviour(emqx_resource).
+
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.23"},
+    {vsn, "0.1.24"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 1 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -306,8 +306,7 @@ query(ResId, Request, Opts) ->
                 {simple_async, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
-                    Opts1 = Opts#{is_buffer_supported => true},
-                    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
+                    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
                 {simple_sync, _} ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again

+ 4 - 2
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1048,7 +1048,9 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
             ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
-do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) ->
+do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
+    ResQM =:= simple_async; ResQM =:= simple_sync
+->
     %% The connector supports buffer, send even in disconnected state
     #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
     CallMode = call_mode(QM, CBM),
@@ -1059,7 +1061,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Res
     #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
     CallMode = call_mode(QM, CBM),
     apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
-do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
+do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Resource) ->
     ?RESOURCE_ERROR(not_connected, "resource not connected").
 
 -define(APPLY_RESOURCE(NAME, EXPR, REQ),

+ 1 - 0
changes/ee/fix-11722.en.md

@@ -0,0 +1 @@
+Fixed an issue where a Kafka Producer bridge with `sync` query mode would not buffer messages when in the `connecting` state.