Explorar o código

test(redis): ensure batch query hit different cluster shards

This will inevitably fail: it's not generally possible to update
different keys through the same cluster connection, one or more
update will fail with `MOVED` status. This testcase should serve
as a regression test later.
Andrew Mayorov %!s(int64=3) %!d(string=hai) anos
pai
achega
903a77b471
Modificáronse 1 ficheiros con 28 adicións e 15 borrados
  1. 28 15
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl

+ 28 - 15
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl

@@ -16,6 +16,9 @@
 %% CT boilerplate
 %% CT boilerplate
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
+-define(KEYSHARDS, 3).
+-define(KEYPREFIX, "MSGS").
+
 -define(REDIS_TOXYPROXY_CONNECT_CONFIG, #{
 -define(REDIS_TOXYPROXY_CONNECT_CONFIG, #{
     <<"server">> => <<"toxiproxy:6379">>,
     <<"server">> => <<"toxiproxy:6379">>,
     <<"redis_type">> => <<"single">>
     <<"redis_type">> => <<"single">>
@@ -23,7 +26,7 @@
 
 
 -define(COMMON_REDIS_OPTS, #{
 -define(COMMON_REDIS_OPTS, #{
     <<"password">> => <<"public">>,
     <<"password">> => <<"public">>,
-    <<"command_template">> => [<<"RPUSH">>, <<"MSGS">>, <<"${payload}">>],
+    <<"command_template">> => [<<"RPUSH">>, <<?KEYPREFIX, "/${topic}">>, <<"${payload}">>],
     <<"local_topic">> => <<"local_topic/#">>
     <<"local_topic">> => <<"local_topic/#">>
 }).
 }).
 
 
@@ -47,7 +50,7 @@
     )
     )
 ).
 ).
 
 
-all() -> [{group, transport_types}, {group, rest}].
+all() -> [{group, transports}, {group, rest}].
 
 
 groups() ->
 groups() ->
     ResourceSpecificTCs = [t_create_delete_bridge],
     ResourceSpecificTCs = [t_create_delete_bridge],
@@ -63,7 +66,7 @@ groups() ->
     ],
     ],
     [
     [
         {rest, TCs},
         {rest, TCs},
-        {transport_types, [
+        {transports, [
             {group, tcp},
             {group, tcp},
             {group, tls}
             {group, tls}
         ]},
         ]},
@@ -79,7 +82,7 @@ groups() ->
 init_per_group(Group, Config) when
 init_per_group(Group, Config) when
     Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster
     Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster
 ->
 ->
-    [{transport_type, Group} | Config];
+    [{connector_type, Group} | Config];
 init_per_group(Group, Config) when
 init_per_group(Group, Config) when
     Group =:= tcp; Group =:= tls
     Group =:= tcp; Group =:= tls
 ->
 ->
@@ -139,7 +142,7 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
 init_per_testcase(_Testcase, Config) ->
     ok = delete_all_rules(),
     ok = delete_all_rules(),
     ok = delete_all_bridges(),
     ok = delete_all_bridges(),
-    case ?config(transport_type, Config) of
+    case ?config(connector_type, Config) of
         undefined ->
         undefined ->
             Config;
             Config;
         RedisType ->
         RedisType ->
@@ -162,7 +165,7 @@ end_per_testcase(_Testcase, Config) ->
 
 
 t_create_delete_bridge(Config) ->
 t_create_delete_bridge(Config) ->
     Name = <<"mybridge">>,
     Name = <<"mybridge">>,
-    Type = ?config(transport_type, Config),
+    Type = ?config(connector_type, Config),
     BridgeConfig = ?config(bridge_config, Config),
     BridgeConfig = ?config(bridge_config, Config),
     IsBatch = ?config(is_batch, Config),
     IsBatch = ?config(is_batch, Config),
     ?assertMatch(
     ?assertMatch(
@@ -350,9 +353,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
         ?wait_async_action(
         ?wait_async_action(
             lists:foreach(
             lists:foreach(
                 fun(I) ->
                 fun(I) ->
-                    IBin = integer_to_binary(I),
-                    Topic = <<BaseTopic/binary, "/", IBin/binary>>,
-                    _ = publish_message(Topic, RandomPayload)
+                    _ = publish_message(format_topic(BaseTopic, I), RandomPayload)
                 end,
                 end,
                 lists:seq(1, N)
                 lists:seq(1, N)
             ),
             ),
@@ -360,7 +361,7 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
             5000
             5000
         ),
         ),
         fun(Trace) ->
         fun(Trace) ->
-            AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)),
+            AddedMsgCount = length(added_msgs(ResourceId, BaseTopic, RandomPayload)),
             case IsBatch of
             case IsBatch of
                 true ->
                 true ->
                     ?assertMatch(
                     ?assertMatch(
@@ -378,11 +379,23 @@ check_resource_queries(ResourceId, BaseTopic, IsBatch) ->
         end
         end
     ).
     ).
 
 
-added_msgs(ResourceId, Payload) ->
-    {ok, Results} = emqx_resource:simple_sync_query(
-        ResourceId, {cmd, [<<"LRANGE">>, <<"MSGS">>, <<"0">>, <<"-1">>]}
-    ),
-    [El || El <- Results, El =:= Payload].
+added_msgs(ResourceId, BaseTopic, Payload) ->
+    lists:flatmap(
+        fun(K) ->
+            {ok, Results} = emqx_resource:simple_sync_query(
+                ResourceId,
+                {cmd, [<<"LRANGE">>, K, <<"0">>, <<"-1">>]}
+            ),
+            [El || El <- Results, El =:= Payload]
+        end,
+        [format_redis_key(BaseTopic, S) || S <- lists:seq(0, ?KEYSHARDS - 1)]
+    ).
+
+format_topic(Base, I) ->
+    iolist_to_binary(io_lib:format("~s/~2..0B", [Base, I rem ?KEYSHARDS])).
+
+format_redis_key(Base, I) ->
+    iolist_to_binary([?KEYPREFIX, "/", format_topic(Base, I)]).
 
 
 conf_schema(StructName) ->
 conf_schema(StructName) ->
     #{
     #{