Преглед изворни кода

fix(redis): disable batching in `redis_cluster` bridges

Through configuration subsystem.
Andrew Mayorov пре 3 година
родитељ
комит
26fcaecad7

+ 44 - 12
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -7,7 +7,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
 
 -export([
     conn_bridge_examples/1
@@ -80,13 +80,20 @@ values(common, RedisType, SpecificOpts) ->
         pool_size => 8,
         password => <<"secret">>,
         command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
-        resource_opts => #{
+        resource_opts => values(resource_opts, RedisType, #{}),
+        ssl => #{enable => false}
+    },
+    maps:merge(Config, SpecificOpts);
+values(resource_opts, "cluster", SpecificOpts) ->
+    SpecificOpts;
+values(resource_opts, _RedisType, SpecificOpts) ->
+    maps:merge(
+        #{
             batch_size => 1,
             batch_time => <<"20ms">>
         },
-        ssl => #{enable => false}
-    },
-    maps:merge(Config, SpecificOpts).
+        SpecificOpts
+    ).
 
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions
@@ -115,29 +122,31 @@ fields("get_cluster") ->
 fields(Type) when
     Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster
 ->
-    redis_bridge_common_fields() ++
-        connector_fields(Type).
+    redis_bridge_common_fields(Type) ++
+        connector_fields(Type);
+fields("creation_opts_" ++ Type) ->
+    resource_creation_fields(Type).
 
 method_fileds(post, ConnectorType) ->
-    redis_bridge_common_fields() ++
+    redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType);
 method_fileds(get, ConnectorType) ->
-    redis_bridge_common_fields() ++
+    redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
         emqx_bridge_schema:status_fields();
 method_fileds(put, ConnectorType) ->
-    redis_bridge_common_fields() ++
+    redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType).
 
-redis_bridge_common_fields() ->
+redis_bridge_common_fields(Type) ->
     emqx_bridge_schema:common_bridge_fields() ++
         [
             {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
             {command_template, fun command_template/1}
         ] ++
-        emqx_resource_schema:fields("resource_opts").
+        resource_fields(Type).
 
 connector_fields(Type) ->
     RedisType = bridge_type_to_redis_conn_type(Type),
@@ -156,6 +165,27 @@ type_name_fields(Type) ->
         {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
     ].
 
+resource_fields(Type) ->
+    [
+        {resource_opts,
+            mk(
+                ref("creation_opts_" ++ atom_to_list(Type)),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ].
+
+resource_creation_fields("redis_cluster") ->
+    % TODO
+    % Cluster bridge is currently incompatible with batching.
+    Fields = emqx_resource_schema:fields("creation_opts"),
+    lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]);
+resource_creation_fields(_) ->
+    emqx_resource_schema:fields("creation_opts").
+
 desc("config") ->
     ?DESC("desc_config");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
@@ -166,6 +196,8 @@ desc(redis_sentinel) ->
     ?DESC(emqx_connector_redis, "sentinel");
 desc(redis_cluster) ->
     ?DESC(emqx_connector_redis, "cluster");
+desc("creation_opts_" ++ _Type) ->
+    ?DESC(emqx_resource_schema, "creation_opts");
 desc(_) ->
     undefined.
 

+ 5 - 6
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl

@@ -142,12 +142,13 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
     ok = delete_all_rules(),
     ok = delete_all_bridges(),
-    case ?config(connector_type, Config) of
-        undefined ->
+    case {?config(connector_type, Config), ?config(batch_mode, Config)} of
+        {undefined, _} ->
             Config;
-        RedisType ->
+        {redis_cluster, batch_on} ->
+            {skip, "Batching is not supported by 'redis_cluster' bridge type"};
+        {RedisType, BatchMode} ->
             Transport = ?config(transport, Config),
-            BatchMode = ?config(batch_mode, Config),
             #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(),
             #{BatchMode := ResourceConfig} = resource_configs(),
             IsBatch = (BatchMode =:= batch_on),
@@ -522,7 +523,6 @@ invalid_command_bridge_config() ->
     Conf1#{
         <<"resource_opts">> => #{
             <<"query_mode">> => <<"sync">>,
-            <<"batch_size">> => <<"1">>,
             <<"worker_pool_size">> => <<"1">>,
             <<"start_timeout">> => <<"15s">>
         },
@@ -533,7 +533,6 @@ resource_configs() ->
     #{
         batch_off => #{
             <<"query_mode">> => <<"sync">>,
-            <<"batch_size">> => <<"1">>,
             <<"start_timeout">> => <<"15s">>
         },
         batch_on => #{