Преглед изворни кода

Merge pull request #9838 from keynslug/fix/redis-cluster-batching

feat(redis): disable batching in redis_cluster bridges
Andrew Mayorov пре 3 година
родитељ
комит
d35e46b2d5

+ 5 - 1
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -222,6 +222,8 @@ is_unrecoverable_error(Results) when is_list(Results) ->
     lists:any(fun is_unrecoverable_error/1, Results);
 is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) ->
     true;
+is_unrecoverable_error({error, invalid_cluster_command}) ->
+    true;
 is_unrecoverable_error(_) ->
     false.
 
@@ -267,7 +269,9 @@ do_cmd(PoolName, cluster, {cmd, Command}) ->
 do_cmd(Conn, _Type, {cmd, Command}) ->
     eredis:q(Conn, Command);
 do_cmd(PoolName, cluster, {cmds, Commands}) ->
-    wrap_qp_result(eredis_cluster:qp(PoolName, Commands));
+    % TODO
+    % Cluster mode is currently incompatible with batching.
+    wrap_qp_result([eredis_cluster:q(PoolName, Command) || Command <- Commands]);
 do_cmd(Conn, _Type, {cmds, Commands}) ->
     wrap_qp_result(eredis:qp(Conn, Commands)).
 

+ 44 - 12
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -7,7 +7,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
 
 -export([
     conn_bridge_examples/1
@@ -80,13 +80,20 @@ values(common, RedisType, SpecificOpts) ->
         pool_size => 8,
         password => <<"secret">>,
         command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
-        resource_opts => #{
+        resource_opts => values(resource_opts, RedisType, #{}),
+        ssl => #{enable => false}
+    },
+    maps:merge(Config, SpecificOpts);
+values(resource_opts, "cluster", SpecificOpts) ->
+    SpecificOpts;
+values(resource_opts, _RedisType, SpecificOpts) ->
+    maps:merge(
+        #{
             batch_size => 1,
             batch_time => <<"20ms">>
         },
-        ssl => #{enable => false}
-    },
-    maps:merge(Config, SpecificOpts).
+        SpecificOpts
+    ).
 
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions
@@ -115,29 +122,31 @@ fields("get_cluster") ->
 fields(Type) when
     Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster
 ->
-    redis_bridge_common_fields() ++
-        connector_fields(Type).
+    redis_bridge_common_fields(Type) ++
+        connector_fields(Type);
+fields("creation_opts_" ++ Type) ->
+    resource_creation_fields(Type).
 
 method_fileds(post, ConnectorType) ->
-    redis_bridge_common_fields() ++
+    redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType);
 method_fileds(get, ConnectorType) ->
-    redis_bridge_common_fields() ++
+    redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
         emqx_bridge_schema:status_fields();
 method_fileds(put, ConnectorType) ->
-    redis_bridge_common_fields() ++
+    redis_bridge_common_fields(ConnectorType) ++
         connector_fields(ConnectorType).
 
-redis_bridge_common_fields() ->
+redis_bridge_common_fields(Type) ->
     emqx_bridge_schema:common_bridge_fields() ++
         [
             {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
             {command_template, fun command_template/1}
         ] ++
-        emqx_resource_schema:fields("resource_opts").
+        resource_fields(Type).
 
 connector_fields(Type) ->
     RedisType = bridge_type_to_redis_conn_type(Type),
@@ -156,6 +165,27 @@ type_name_fields(Type) ->
         {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
     ].
 
+resource_fields(Type) ->
+    [
+        {resource_opts,
+            mk(
+                ref("creation_opts_" ++ atom_to_list(Type)),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ].
+
+resource_creation_fields("redis_cluster") ->
+    % TODO
+    % Cluster bridge is currently incompatible with batching.
+    Fields = emqx_resource_schema:fields("creation_opts"),
+    lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]);
+resource_creation_fields(_) ->
+    emqx_resource_schema:fields("creation_opts").
+
 desc("config") ->
     ?DESC("desc_config");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
@@ -166,6 +196,8 @@ desc(redis_sentinel) ->
     ?DESC(emqx_connector_redis, "sentinel");
 desc(redis_cluster) ->
     ?DESC(emqx_connector_redis, "cluster");
+desc("creation_opts_" ++ _Type) ->
+    ?DESC(emqx_resource_schema, "creation_opts");
 desc(_) ->
     undefined.
 

+ 32 - 20
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl

@@ -16,6 +16,9 @@
 %% CT boilerplate
 %%------------------------------------------------------------------------------
 
+-define(KEYSHARDS, 3).
+-define(KEYPREFIX, "MSGS").
+
 -define(REDIS_TOXYPROXY_CONNECT_CONFIG, #{
     <<"server">> => <<"toxiproxy:6379">>,
     <<"redis_type">> => <<"single">>
@@ -23,7 +26,7 @@
 
 -define(COMMON_REDIS_OPTS, #{
     <<"password">> => <<"public">>,
-    <<"command_template">> => [<<"RPUSH">>, <<"MSGS">>, <<"${payload}">>],
+    <<"command_template">> => [<<"RPUSH">>, <<?KEYPREFIX, "/${topic}">>, <<"${payload}">>],
     <<"local_topic">> => <<"local_topic/#">>
 }).
 
@@ -47,7 +50,7 @@
     )
 ).
 
-all() -> [{group, transport_types}, {group, rest}].
+all() -> [{group, transports}, {group, rest}].
 
 groups() ->
     ResourceSpecificTCs = [t_create_delete_bridge],
@@ -63,7 +66,7 @@ groups() ->
     ],
     [
         {rest, TCs},
-        {transport_types, [
+        {transports, [
             {group, tcp},
             {group, tls}
         ]},
@@ -79,7 +82,7 @@ groups() ->
 init_per_group(Group, Config) when
     Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster
 ->
-    [{transport_type, Group} | Config];
+    [{connector_type, Group} | Config];
 init_per_group(Group, Config) when
     Group =:= tcp; Group =:= tls
 ->
@@ -139,12 +142,13 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
     ok = delete_all_rules(),
     ok = delete_all_bridges(),
-    case ?config(transport_type, Config) of
-        undefined ->
+    case {?config(connector_type, Config), ?config(batch_mode, Config)} of
+        {undefined, _} ->
             Config;
-        RedisType ->
+        {redis_cluster, batch_on} ->
+            {skip, "Batching is not supported by 'redis_cluster' bridge type"};
+        {RedisType, BatchMode} ->
             Transport = ?config(transport, Config),
-            BatchMode = ?config(batch_mode, Config),
             #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(),
             #{BatchMode := ResourceConfig} = resource_configs(),
             IsBatch = (BatchMode =:= batch_on),
@@ -162,7 +166,7 @@ end_per_testcase(_Testcase, Config) ->
 
 t_create_delete_bridge(Config) ->
     Name = <<"mybridge">>,
-    Type = ?config(transport_type, Config),
+    Type = ?config(connector_type, Config),
     BridgeConfig = ?config(bridge_config, Config),
     IsBatch = ?config(is_batch, Config),
     ?assertMatch(
@@ -350,9 +354,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
         ?wait_async_action(
             lists:foreach(
                 fun(I) ->
-                    IBin = integer_to_binary(I),
-                    Topic = <<BaseTopic/binary, "/", IBin/binary>>,
-                    _ = publish_message(Topic, RandomPayload)
+                    _ = publish_message(format_topic(BaseTopic, I), RandomPayload)
                 end,
                 lists:seq(1, N)
             ),
@@ -360,7 +362,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
             5000
         ),
         fun(Trace) ->
-            AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)),
+            AddedMsgCount = length(added_msgs(ResourceId, BaseTopic, RandomPayload)),
             case IsBatch of
                 true ->
                     ?assertMatch(
@@ -378,11 +380,23 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
         end
     ).
 
-added_msgs(ResourceId, Payload) ->
-    {ok, Results} = emqx_resource:simple_sync_query(
-        ResourceId, {cmd, [<<"LRANGE">>, <<"MSGS">>, <<"0">>, <<"-1">>]}
-    ),
-    [El || El <- Results, El =:= Payload].
+added_msgs(ResourceId, BaseTopic, Payload) ->
+    lists:flatmap(
+        fun(K) ->
+            {ok, Results} = emqx_resource:simple_sync_query(
+                ResourceId,
+                {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}
+            ),
+            [El || El <- Results, El =:= Payload]
+        end,
+        [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)]
+    ).
+
+format_topic(Base, I) ->
+    iolist_to_binary(io_lib:format("~s/~2..0B", [Base, I rem ?KEYSHARDS])).
+
+format_redis_key(Base, I) ->
+    iolist_to_binary([?KEYPREFIX, "/", format_topic(Base, I)]).
 
 conf_schema(StructName) ->
     #{
@@ -509,7 +523,6 @@ invalid_command_bridge_config() ->
     Conf1#{
         <<"resource_opts">> => #{
             <<"query_mode">> => <<"sync">>,
-            <<"batch_size">> => <<"1">>,
             <<"worker_pool_size">> => <<"1">>,
             <<"start_timeout">> => <<"15s">>
         },
@@ -520,7 +533,6 @@ resource_configs() ->
     #{
         batch_off => #{
             <<"query_mode">> => <<"sync">>,
-            <<"batch_size">> => <<"1">>,
             <<"start_timeout">> => <<"15s">>
         },
         batch_on => #{