瀏覽代碼

refactor: refacor query_mode detection code

This commit refactor the query_mode resource detection code according to
a suggestion from @zmstone. This commit should not contain any
functional change except for a change of the Kafka producer bridge
config.
Kjell Winblad 2 年之前
父節點
當前提交
ed9e29e769

+ 0 - 3
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -17,7 +17,6 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -64,8 +63,6 @@ fields(config) ->
 
 callback_mode() -> always_sync.
 
-is_buffer_supported() -> false.
-
 on_start(
     InstanceId,
     #{

+ 1 - 4
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl

@@ -22,8 +22,7 @@
     on_query_async/4,
     on_batch_query/3,
     on_batch_query_async/4,
-    on_get_status/2,
-    is_buffer_supported/0
+    on_get_status/2
 ]).
 -export([reply_delegator/3]).
 
@@ -56,8 +55,6 @@
 %% emqx_resource API
 %%-------------------------------------------------------------------------------------------------
 
-is_buffer_supported() -> false.
-
 callback_mode() -> async_if_possible.
 
 -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.

+ 10 - 25
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -147,7 +147,7 @@ fields("config_producer") ->
 fields("config_consumer") ->
     fields(kafka_consumer);
 fields(kafka_producer) ->
-    fields("config") ++ fields(resource_opts) ++ fields(producer_opts);
+    fields("config") ++ fields(producer_opts);
 fields(kafka_consumer) ->
     fields("config") ++ fields(consumer_opts);
 fields("config") ->
@@ -295,28 +295,6 @@ fields(producer_kafka_opts) ->
                 required => false,
                 desc => ?DESC(producer_buffer)
             })},
-        {query_mode_sync_timeout,
-            mk(
-                emqx_schema:duration_ms(),
-                #{
-                    default => <<"5s">>,
-                    desc => ?DESC(query_mode_sync_timeout)
-                }
-            )}
-    ];
-fields(resource_opts) ->
-    [
-        {resource_opts,
-            mk(
-                ref(?MODULE, resource_opts_fields),
-                #{
-                    required => false,
-                    desc => ?DESC(resource_opts)
-                }
-            )}
-    ];
-fields(resource_opts_fields) ->
-    [
         {query_mode,
             mk(
                 enum([async, sync]),
@@ -324,6 +302,14 @@ fields(resource_opts_fields) ->
                     default => async,
                     desc => ?DESC(query_mode)
                 }
+            )},
+        {query_mode_sync_timeout,
+            mk(
+                emqx_schema:duration_ms(),
+                #{
+                    default => <<"5s">>,
+                    desc => ?DESC(query_mode_sync_timeout)
+                }
             )}
     ];
 fields(kafka_message) ->
@@ -440,8 +426,7 @@ struct_names() ->
         producer_opts,
         consumer_opts,
         consumer_kafka_opts,
-        consumer_topic_mapping,
-        resource_opts_fields
+        consumer_topic_mapping
     ].
 
 %% -------------------------------------------------------------------------------------------------

+ 4 - 6
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -8,7 +8,7 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
+    query_mode/1,
     on_start/2,
     on_stop/2,
     on_get_status/2
@@ -112,11 +112,9 @@
 callback_mode() ->
     async_if_possible.
 
-%% there are no queries to be made to this bridge, so we say that
-%% buffer is supported so we don't spawn unused resource buffer
-%% workers.
-is_buffer_supported() ->
-    true.
+%% consumer bridges don't need resource workers
+query_mode(_Config) ->
+    no_queries.
 
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 on_start(ResourceId, Config) ->

+ 6 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -4,10 +4,11 @@
 -module(emqx_bridge_kafka_impl_producer).
 
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 %% callbacks of behaviour emqx_resource
 -export([
-    is_buffer_supported/0,
+    query_mode/1,
     callback_mode/0,
     on_start/2,
     on_stop/2,
@@ -32,7 +33,10 @@
 %% to hocon; keeping this as just `kafka' for backwards compatibility.
 -define(BRIDGE_TYPE, kafka).
 
-is_buffer_supported() -> true.
+query_mode(#{kafka := #{query_mode := sync}}) ->
+    simple_sync;
+query_mode(_) ->
+    simple_async.
 
 callback_mode() -> async_if_possible.
 

+ 0 - 3
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -17,7 +17,6 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -49,8 +48,6 @@ fields(config) ->
 
 callback_mode() -> always_sync.
 
-is_buffer_supported() -> false.
-
 on_start(
     InstanceId,
     #{

+ 4 - 5
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -11,7 +11,7 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
+    query_mode/1,
     on_start/2,
     on_stop/2,
     on_get_status/2,
@@ -70,10 +70,9 @@
 
 callback_mode() -> async_if_possible.
 
-%% there are no queries to be made to this bridge, so we say that
-%% buffer is supported so we don't spawn unused resource buffer
-%% workers.
-is_buffer_supported() -> true.
+%% consumer bridges don't need resource workers
+query_mode(_Config) ->
+    no_queries.
 
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 on_start(InstanceId, Config) ->

+ 0 - 6
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -34,7 +34,6 @@
     %% Optional callbacks
     on_get_status/2,
     on_query/3,
-    is_buffer_supported/0,
     on_batch_query/3
 ]).
 
@@ -187,11 +186,6 @@ callback_mode() -> always_sync.
 
 %% emqx_resource callback
 
--spec is_buffer_supported() -> boolean().
-is_buffer_supported() ->
-    %% We want to make use of EMQX's buffer mechanism
-    false.
-
 %% emqx_resource callback called when the resource is started
 
 -spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}.

+ 0 - 3
apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl

@@ -17,7 +17,6 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -86,8 +85,6 @@ servers() ->
 
 callback_mode() -> always_sync.
 
-is_buffer_supported() -> false.
-
 on_start(
     InstanceId,
     #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config

+ 0 - 3
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -30,7 +30,6 @@
 %% callbacks for behaviour emqx_resource
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -169,8 +168,6 @@ server() ->
 
 callback_mode() -> always_sync.
 
-is_buffer_supported() -> false.
-
 on_start(
     ResourceId = PoolName,
     #{

+ 0 - 3
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -17,7 +17,6 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -79,8 +78,6 @@ server() ->
 
 callback_mode() -> always_sync.
 
-is_buffer_supported() -> false.
-
 on_start(
     InstanceId,
     #{

+ 0 - 3
apps/emqx_oracle/src/emqx_oracle.erl

@@ -18,7 +18,6 @@
 %% callbacks for behaviour emqx_resource
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -68,8 +67,6 @@
 % be sync for now.
 callback_mode() -> always_sync.
 
-is_buffer_supported() -> false.
-
 -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
 on_start(
     InstId,

+ 28 - 25
apps/emqx_resource/src/emqx_resource.erl

@@ -100,7 +100,8 @@
     call_health_check/3,
     %% stop the instance
     call_stop/3,
-    is_buffer_supported/1
+    %% get the query mode of the resource
+    query_mode/3
 ]).
 
 %% list all the instances, id only.
@@ -132,7 +133,7 @@
     on_query_async/4,
     on_batch_query_async/4,
     on_get_status/2,
-    is_buffer_supported/0
+    query_mode/1
 ]).
 
 %% when calling emqx_resource:start/1
@@ -173,7 +174,8 @@
     | {resource_status(), resource_state()}
     | {resource_status(), resource_state(), term()}.
 
--callback is_buffer_supported() -> boolean().
+-callback query_mode(Config :: term()) ->
+    simple_sync | simple_async | sync | async | no_queries.
 
 -spec list_types() -> [module()].
 list_types() ->
@@ -276,27 +278,26 @@ query(ResId, Request) ->
     Result :: term().
 query(ResId, Request, Opts) ->
     case emqx_resource_manager:lookup_cached(ResId) of
-        {ok, _Group, #{query_mode := QM, mod := Module} = Config} ->
-            IsBufferSupported = is_buffer_supported(Module),
-            case {IsBufferSupported, QM} of
-                {true, _} ->
-                    %% only Kafka producer so far
+        {ok, _Group, #{query_mode := QM}} ->
+            case QM of
+                simple_async ->
+                    %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
+                    %% so the buffer worker does not need to lookup the cache again
                     Opts1 = Opts#{is_buffer_supported => true},
-                    do_query_built_in_buffer(QM, ResId, Request, Opts1);
-                {false, sync} ->
+                    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
+                simple_sync ->
+                    %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
+                    %% so the buffer worker does not need to lookup the cache again
+                    emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
+                sync ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
-                {false, async} ->
+                async ->
                     emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
             end;
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
-do_query_built_in_buffer(async, ResId, Request, Opts1) ->
-    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
-do_query_built_in_buffer(sync, ResId, Request, _Opts1) ->
-    emqx_resource_buffer_worker:simple_sync_query(ResId, Request).
-
 -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
 simple_sync_query(ResId, Request) ->
     emqx_resource_buffer_worker:simple_sync_query(ResId, Request).
@@ -372,15 +373,6 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
 get_callback_mode(Mod) ->
     Mod:callback_mode().
 
--spec is_buffer_supported(module()) -> boolean().
-is_buffer_supported(Module) ->
-    try
-        Module:is_buffer_supported()
-    catch
-        _:_ ->
-            false
-    end.
-
 -spec call_start(resource_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(ResId, Mod, Config) ->
@@ -417,6 +409,17 @@ call_stop(ResId, Mod, ResourceState) ->
         Res
     end).
 
+-spec query_mode(module(), term(), creation_opts()) ->
+    simple_sync | simple_async | sync | async | no_queries.
+
+query_mode(Mod, Config, Opts) ->
+    case erlang:function_exported(Mod, query_mode, 1) of
+        true ->
+            Mod:query_mode(Config);
+        false ->
+            maps:get(query_mode, Opts, sync)
+    end.
+
 -spec check_config(resource_type(), raw_resource_config()) ->
     {ok, resource_config()} | {error, term()}.
 check_config(ResourceType, Conf) ->

+ 20 - 10
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -144,12 +144,18 @@ create(ResId, Group, ResourceType, Config, Opts) ->
         ],
         [matched]
     ),
-    case emqx_resource:is_buffer_supported(ResourceType) of
-        true ->
-            %% the resource it self supports
-            %% buffer, so there is no need for resource workers
+    QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
+    case QueryMode of
+        %% the resource has built-in buffer, so there is no need for resource workers
+        simple_sync ->
             ok;
-        false ->
+        simple_async ->
+            ok;
+        %% The resource is a consumer resource, so there is no need for resource workers
+        no_queries ->
+            ok;
+        _ ->
+            %% start resource workers as the query type requires them
             ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
             case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
                 true ->
@@ -288,16 +294,20 @@ health_check(ResId) ->
 
 %% @doc Function called from the supervisor to actually start the server
 start_link(ResId, Group, ResourceType, Config, Opts) ->
+    QueryMode =
+        case erlang:function_exported(ResourceType, query_mode, 1) of
+            true ->
+                ResourceType:query_mode(Config);
+            false ->
+                maps:get(query_mode, Opts, sync)
+        end,
+
     Data = #data{
         id = ResId,
         group = Group,
         mod = ResourceType,
         callback_mode = emqx_resource:get_callback_mode(ResourceType),
-        %% query_mode = dynamic | sync | async
-        %% TODO:
-        %%  dynamic mode is async mode when things are going well, but becomes sync mode
-        %%  if the resource worker is overloaded
-        query_mode = maps:get(query_mode, Opts, sync),
+        query_mode = QueryMode,
         config = Config,
         opts = Opts,
         state = undefined,

+ 1 - 1
lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_connector, [
     {description, "EMQX Enterprise connectors"},
-    {vsn, "0.1.13"},
+    {vsn, "0.1.14"},
     {registered, []},
     {applications, [
         kernel,

+ 0 - 3
lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl

@@ -15,7 +15,6 @@
 %% `emqx_resource' API
 -export([
     callback_mode/0,
-    is_buffer_supported/0,
     on_start/2,
     on_stop/2,
     on_query/3,
@@ -28,8 +27,6 @@
 
 callback_mode() -> emqx_connector_mongo:callback_mode().
 
-is_buffer_supported() -> false.
-
 on_start(InstanceId, Config) ->
     case emqx_connector_mongo:on_start(InstanceId, Config) of
         {ok, ConnectorState} ->