Просмотр исходного кода

Merge pull request #12083 from thalesmg/fix-connector-bws-m-20231201

fix(connector): don't start buffer workers for the connector itself
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
d594b38ceb

+ 5 - 1
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -382,9 +382,13 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
 safe_atom(Atom) when is_atom(Atom) -> Atom.
 
 parse_opts(Conf, Opts0) ->
-    override_start_after_created(Conf, Opts0).
+    Opts1 = override_start_after_created(Conf, Opts0),
+    set_no_buffer_workers(Opts1).
 
 override_start_after_created(Config, Opts) ->
     Enabled = maps:get(enable, Config, true),
     StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled),
     Opts#{start_after_created => StartAfterCreated}.
+
+set_no_buffer_workers(Opts) ->
+    Opts#{spawn_buffer_workers => false}.

+ 45 - 5
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -163,11 +163,11 @@ t_remove_fail({'init', Config}) ->
     meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
     meck:expect(?CONNECTOR, on_stop, 2, ok),
     meck:expect(?CONNECTOR, on_get_status, 2, connected),
-    [{mocked_mods, [?CONNECTOR, emqx_connector_ee_schema]} | Config];
-t_remove_fail({'end', Config}) ->
-    MockedMods = ?config(mocked_mods, Config),
-    meck:unload(MockedMods),
+    meck:expect(?CONNECTOR, query_mode, 1, simple_async_internal_buffer),
     Config;
+t_remove_fail({'end', _Config}) ->
+    meck:unload(),
+    ok;
 t_remove_fail(_Config) ->
     ?assertEqual(
         [],
@@ -200,7 +200,20 @@ t_remove_fail(_Config) ->
             {_, {?CONNECTOR, on_add_channel, _}, {ok, connector_state}},
             {_, {?CONNECTOR, on_get_channels, [_]}, _}
         ],
-        meck:history(?CONNECTOR)
+        lists:filter(
+            fun({_, {?CONNECTOR, Fun, _Args}, _}) ->
+                lists:member(
+                    Fun, [
+                        callback_mode,
+                        on_start,
+                        on_get_channels,
+                        on_get_status,
+                        on_add_channel
+                    ]
+                )
+            end,
+            meck:history(?CONNECTOR)
+        )
     ),
     ok.
 
@@ -269,6 +282,33 @@ t_create_with_bad_name_root_path(_Config) ->
     ),
     ok.
 
+t_no_buffer_workers({'init', Config}) ->
+    meck:new(emqx_connector_ee_schema, [passthrough]),
+    meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
+    meck:new(?CONNECTOR, [non_strict]),
+    meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
+    meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
+    meck:expect(?CONNECTOR, on_get_channels, 1, []),
+    meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}),
+    meck:expect(?CONNECTOR, on_stop, 2, ok),
+    meck:expect(?CONNECTOR, on_get_status, 2, connected),
+    meck:expect(?CONNECTOR, query_mode, 1, sync),
+    [
+        {path, [connectors, kafka_producer, no_bws]}
+        | Config
+    ];
+t_no_buffer_workers({'end', Config}) ->
+    Path = ?config(path, Config),
+    {ok, _} = emqx:remove_config(Path),
+    meck:unload(),
+    ok;
+t_no_buffer_workers(Config) ->
+    Path = ?config(path, Config),
+    ConnConfig = connector_config(),
+    ?assertMatch({ok, _}, emqx:update_config(Path, ConnConfig)),
+    ?assertEqual([], supervisor:which_children(emqx_resource_buffer_worker_sup)),
+    ok.
+
 %% helpers
 
 connector_config() ->

+ 2 - 0
apps/emqx_connector/test/emqx_connector_dummy_impl.erl

@@ -17,6 +17,7 @@
 -module(emqx_connector_dummy_impl).
 
 -export([
+    query_mode/1,
     callback_mode/0,
     on_start/2,
     on_stop/2,
@@ -24,6 +25,7 @@
     on_get_channel_status/3
 ]).
 
+query_mode(_) -> error(unexpected).
 callback_mode() -> error(unexpected).
 on_start(_, _) -> error(unexpected).
 on_stop(_, _) -> error(unexpected).

+ 4 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -101,7 +101,10 @@
     max_buffer_bytes => pos_integer(),
     query_mode => query_mode(),
     resume_interval => pos_integer(),
-    inflight_window => pos_integer()
+    inflight_window => pos_integer(),
+    %% Only for `emqx_resource_manager' usage.  If false, prevents spawning buffer
+    %% workers, regardless of resource query mode.
+    spawn_buffer_workers => boolean()
 }.
 -type query_result() ::
     ok

+ 6 - 11
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -136,16 +136,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
     % Create metrics for the resource
     ok = emqx_resource:create_metrics(ResId),
     QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
-    case QueryMode of
-        %% the resource has built-in buffer, so there is no need for resource workers
-        simple_sync_internal_buffer ->
-            ok;
-        simple_async_internal_buffer ->
-            ok;
-        %% The resource is a consumer resource, so there is no need for resource workers
-        no_queries ->
-            ok;
-        _ ->
+    SpawnBufferWorkers = maps:get(spawn_buffer_workers, Opts, true),
+    case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of
+        true ->
             %% start resource workers as the query type requires them
             ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
             case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
@@ -153,7 +146,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
                     wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
                 false ->
                     ok
-            end
+            end;
+        false ->
+            ok
     end.
 
 %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.