فهرست منبع

test(kafka_consumer): add more clusterized tests

Thales Macedo Garitezi 2 سال پیش
والد
کامیت
41b8d47696
1 فایلهای تغییر یافته به همراه 341 افزوده شده و 62 حذف شده
  1. 341 62
      lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

+ 341 - 62
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

@@ -55,6 +55,8 @@ only_once_tests() ->
     [
         t_bridge_rule_action_source,
         t_cluster_group,
+        t_node_joins_existing_cluster,
+        t_cluster_node_down,
         t_multiple_topic_mappings
     ].
 
@@ -924,12 +926,16 @@ action_response(Selected, Envs, Args) ->
     }),
     ok.
 
-wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout) ->
-    do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, #{}).
+%% Waits until the consumer group for `KafkaTopic' is balanced, i.e.
+%% until assignments for `NPartitions' partitions have been observed and
+%% the set of nodes holding them equals `Nodes'.  Starts from an empty
+%% assignment map and returns `{ok, Assignments}' on success or
+%% `{timeout, Assignments}' if no new assignment message arrives within
+%% `Timeout' milliseconds.
+wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout) ->
+    do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout, #{}).
 
-do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, Acc0) ->
-    case map_size(Acc0) =:= NPartitions of
+do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout, Acc0) ->
+    AllPartitionsCovered = map_size(Acc0) =:= NPartitions,
+    PresentNodes = lists:usort([N || {_Partition, {N, _MemberId}} <- maps:to_list(Acc0)]),
+    AllNodesCovered = PresentNodes =:= lists:usort(Nodes),
+    case AllPartitionsCovered andalso AllNodesCovered of
         true ->
+            ct:pal("group balanced: ~p", [Acc0]),
             {ok, Acc0};
         false ->
             receive
@@ -942,7 +948,7 @@ do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, Acc0) ->
                         topic_assignments => TopicAssignments
                     },
                     Acc = reconstruct_assignments_from_events(KafkaTopic, [Event], Acc0),
-                    do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, Acc)
+                    do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout, Acc)
             after Timeout ->
                 {timeout, Acc0}
             end
@@ -974,6 +980,123 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
         Assignments
     ).
 
+%% Installs a meck spy on `brod_group_subscriber_v2' in `Node'.
+%%
+%% Every call to `assignments_received/4' on that node emits a
+%% `kafka_assignment' trace point, forwards the assignment to the
+%% process that called this function as
+%% `{kafka_assignment, Node, {Pid, MemberId, GenerationId, TopicAssignments}}',
+%% and then passes through to the original implementation.
+setup_group_subscriber_spy(Node) ->
+    Observer = self(),
+    Spy = fun(Pid, MemberId, GenerationId, TopicAssignments) ->
+        ?tp(
+            kafka_assignment,
+            #{
+                node => node(),
+                pid => Pid,
+                member_id => MemberId,
+                generation_id => GenerationId,
+                topic_assignments => TopicAssignments
+            }
+        ),
+        Observer ! {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
+        meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
+    end,
+    InstallSpy = fun() ->
+        MeckOpts = [passthrough, no_link, no_history, non_strict],
+        ok = meck:new(brod_group_subscriber_v2, MeckOpts),
+        ok = meck:expect(brod_group_subscriber_v2, assignments_received, Spy),
+        ok
+    end,
+    ok = erpc:call(Node, InstallSpy).
+
+%% Blocks until `emqx_config_handler' is registered on `Node'.
+%%
+%% The config handler restarts while the node joins the cluster, so
+%% cluster RPCs issued before it is back would fail; retry the check
+%% (100 / 50 are passed to `?retry' as sleep and attempt count).
+wait_for_cluster_rpc(Node) ->
+    ?retry(
+        _Interval = 100,
+        _MaxAttempts = 50,
+        begin
+            HandlerPid = erpc:call(Node, erlang, whereis, [emqx_config_handler]),
+            true = is_pid(HandlerPid)
+        end
+    ).
+
+%% On `Node', points the default tcp/ssl/ws/wss listeners at 127.0.0.1
+%% using the ports derived from `NodeOpts' (writing both the checked
+%% and the raw configuration), then starts the listeners.
+setup_and_start_listeners(Node, NodeOpts) ->
+    BindListener = fun(ListenerType) ->
+        ConfPath = [listeners, ListenerType, default, bind],
+        Port = emqx_common_test_helpers:listener_port(NodeOpts, ListenerType),
+        ok = emqx_config:put(ConfPath, {{127, 0, 0, 1}, Port}),
+        RawBind = iolist_to_binary([<<"127.0.0.1:">>, integer_to_binary(Port)]),
+        ok = emqx_config:put_raw(ConfPath, RawBind),
+        ok
+    end,
+    %% Everything below runs on the remote node, including `BindListener'.
+    erpc:call(
+        Node,
+        fun() ->
+            lists:foreach(BindListener, [tcp, ssl, ws, wss]),
+            ok = emqx_listeners:start(),
+            ok
+        end
+    ).
+
+%% Builds (and logs) the spec of a two-core-node cluster via
+%% `emqx_common_test_helpers:emqx_cluster/2'.  The testcases in this
+%% suite consume the result as a list of `{Name, Opts}' pairs that are
+%% handed to `emqx_common_test_helpers:start_slave/2'.
+cluster(Config) ->
+    %% Only the `emqx' application needs extra environment: it is booted
+    %% with the broker and router modules.
+    EnvHandler = fun
+        (emqx) ->
+            application:set_env(emqx, boot_modules, [broker, router]),
+            ok;
+        (_OtherApp) ->
+            ok
+    end,
+    ClusterOpts = [
+        {apps, [emqx_conf, emqx_bridge, emqx_rule_engine]},
+        {listener_ports, []},
+        {peer_mod, slave},
+        {priv_data_dir, ?config(priv_dir, Config)},
+        {load_schema, true},
+        {start_autocluster, true},
+        {schema_mod, emqx_ee_conf_schema},
+        {env_handler, EnvHandler}
+    ],
+    Cluster = emqx_common_test_helpers:emqx_cluster([core, core], ClusterOpts),
+    ct:pal("cluster: ~p", [Cluster]),
+    Cluster.
+
+%% Spawns a linked process that publishes a fresh GUID-based message
+%% (the same value is used as key and value) to `KafkaTopic', with a
+%% 400 ms pause between publications.  Each payload is recorded in a
+%% public ETS table once it has been published.
+%%
+%% Returns `{TId, Pid}': the table holding the published payloads and
+%% the publisher pid, which is stopped with `stop_async_publisher/1'.
+start_async_publisher(Config, KafkaTopic) ->
+    TId = ets:new(kafka_payloads, [public, ordered_set]),
+    %% The pause between publications is the `after' timeout of the
+    %% `receive' rather than a `timer:sleep/1', so that a `stop' request
+    %% is honoured immediately instead of after the sleep has elapsed.
+    Loop = fun Go(Wait) ->
+        receive
+            stop -> ok
+        after Wait ->
+            Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+            publish(Config, KafkaTopic, [#{key => Payload, value => Payload}]),
+            ets:insert(TId, {Payload}),
+            Go(400)
+        end
+    end,
+    %% The first message is published without any initial delay.
+    Pid = spawn_link(fun() -> Loop(0) end),
+    {TId, Pid}.
+
+%% Asks the publisher process `Pid' to stop and waits for it to
+%% terminate.  Returns `ok' once the process is down; fails the
+%% testcase if it is still alive after one second.
+stop_async_publisher(Pid) ->
+    MonitorRef = erlang:monitor(process, Pid),
+    Pid ! stop,
+    receive
+        {'DOWN', MonitorRef, process, Pid, _Reason} -> ok
+    after 1_000 ->
+        ct:fail("publisher didn't die")
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -1500,36 +1623,17 @@ t_bridge_rule_action_source(Config) ->
     ),
     ok.
 
+%% checks that an existing cluster can be configured with a kafka
+%% consumer bridge and that the consumers will distribute over the two
+%% nodes.
 t_cluster_group(Config) ->
-    ct:timetrap({seconds, 180}),
-    TestPid = self(),
+    ct:timetrap({seconds, 150}),
     NPartitions = ?config(num_partitions, Config),
     KafkaTopic = ?config(kafka_topic, Config),
     KafkaName = ?config(kafka_name, Config),
     ResourceId = resource_id(Config),
     BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
-    PrivDataDir = ?config(priv_dir, Config),
-    Cluster = emqx_common_test_helpers:emqx_cluster(
-        [core, core],
-        [
-            {apps, [emqx_conf, emqx_bridge, emqx_rule_engine]},
-            {listener_ports, []},
-            {peer_mod, slave},
-            {priv_data_dir, PrivDataDir},
-            {load_schema, true},
-            {schema_mod, emqx_ee_conf_schema},
-            {env_handler, fun
-                (emqx) ->
-                    application:set_env(emqx, boot_modules, []),
-                    ok;
-                (emqx_conf) ->
-                    ok;
-                (_) ->
-                    ok
-            end}
-        ]
-    ),
-    ct:pal("cluster: ~p", [Cluster]),
+    Cluster = cluster(Config),
     ?check_trace(
         begin
             Nodes =
@@ -1540,47 +1644,19 @@ t_cluster_group(Config) ->
             on_exit(fun() ->
                 lists:foreach(
                     fun(N) ->
+                        ct:pal("stopping ~p", [N]),
                         ok = emqx_common_test_helpers:stop_slave(N)
                     end,
                     Nodes
                 )
             end),
-            lists:foreach(
-                fun(N) ->
-                    erpc:call(N, fun() ->
-                        ok = meck:new(brod_group_subscriber_v2, [
-                            passthrough, no_link, no_history, non_strict
-                        ]),
-                        ok = meck:expect(
-                            brod_group_subscriber_v2,
-                            assignments_received,
-                            fun(Pid, MemberId, GenerationId, TopicAssignments) ->
-                                TestPid !
-                                    {kafka_assignment, node(),
-                                        {Pid, MemberId, GenerationId, TopicAssignments}},
-                                ?tp(
-                                    kafka_assignment,
-                                    #{
-                                        node => node(),
-                                        pid => Pid,
-                                        member_id => MemberId,
-                                        generation_id => GenerationId,
-                                        topic_assignments => TopicAssignments
-                                    }
-                                ),
-                                meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
-                            end
-                        ),
-                        ok
-                    end)
-                end,
-                Nodes
-            ),
+            lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),
                 15_000
             ),
+            wait_for_cluster_rpc(N2),
             erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
             {ok, _} = snabbkaffe:receive_events(SRef0),
             lists:foreach(
@@ -1598,8 +1674,7 @@ t_cluster_group(Config) ->
             %% wait until the two nodes have had time to distribute the
             %% subscribers, rather than just one node containing all
             %% of them.
-            ct:sleep(10_000),
-            {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, 30_000),
+            {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
             lists:foreach(
                 fun(N) ->
                     ?assertEqual(
@@ -1630,3 +1705,207 @@ t_cluster_group(Config) ->
         end
     ),
     ok.
+
+%% Tests that the kafka consumer group rebalances correctly if a bridge
+%% already exists when a new EMQX node joins the cluster.
+%%
+%% Run stage: start the first node, create the bridge on it and wait
+%% until it owns every partition; then start the second node, wait until
+%% the partitions are spread over both nodes and publish one message per
+%% partition.  Check stage: verify from the trace that both nodes hold
+%% assignments and that both handled messages.
+t_node_joins_existing_cluster(Config) ->
+    ct:timetrap({seconds, 150}),
+    TopicMapping = ?config(topic_mapping, Config),
+    %% this test requires exactly one topic mapping in the config.
+    [MQTTTopic] = [MQTTTopic || #{mqtt_topic := MQTTTopic} <- TopicMapping],
+    NPartitions = ?config(num_partitions, Config),
+    KafkaTopic = ?config(kafka_topic, Config),
+    KafkaName = ?config(kafka_name, Config),
+    ResourceId = resource_id(Config),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
+    Cluster = cluster(Config),
+    ?check_trace(
+        begin
+            %% Only the first node is started at this point; the second
+            %% one is started later, once the bridge already exists.
+            [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
+            N1 = emqx_common_test_helpers:start_slave(Name1, Opts1),
+            on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(N1) end),
+            setup_group_subscriber_spy(N1),
+            %% Create the bridge on the first node and wait for its
+            %% consumer subscriber to start.
+            {{ok, _}, {ok, _}} =
+                ?wait_async_action(
+                    erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end),
+                    #{?snk_kind := kafka_consumer_subscriber_started},
+                    15_000
+                ),
+            ?assertMatch({ok, _}, erpc:call(N1, emqx_bridge, lookup, [BridgeId])),
+            %% With a single node, all partitions must end up assigned to it.
+            {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, [N1], 30_000),
+            ?assertEqual(
+                {ok, connected},
+                erpc:call(N1, emqx_resource_manager, health_check, [ResourceId])
+            ),
+
+            %% Now, we start the second node and have it join the cluster.
+            %% Before that, connect an MQTT v5 client to the first node
+            %% and subscribe it (QoS 2) to the bridge's MQTT topic.
+            setup_and_start_listeners(N1, Opts1),
+            TCPPort1 = emqx_common_test_helpers:listener_port(Opts1, tcp),
+            {ok, C1} = emqtt:start_link([{port, TCPPort1}, {proto_ver, v5}]),
+            on_exit(fun() -> catch emqtt:stop(C1) end),
+            {ok, _} = emqtt:connect(C1),
+            {ok, _, [2]} = emqtt:subscribe(C1, MQTTTopic, 2),
+
+            %% Subscribe to the trace event before starting the node, so
+            %% that the subscriber start on the new node is not missed.
+            {ok, SRef0} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
+                1,
+                30_000
+            ),
+            N2 = emqx_common_test_helpers:start_slave(Name2, Opts2),
+            on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(N2) end),
+            setup_group_subscriber_spy(N2),
+            Nodes = [N1, N2],
+            wait_for_cluster_rpc(N2),
+
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+            ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])),
+            %% Give some time for the consumers in both nodes to
+            %% rebalance.
+            {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
+            %% Publish some messages so we can check they came from each node.
+            %% First wait until the second node has routes for the MQTT
+            %% topic.
+            ?retry(
+                _Sleep1 = 100,
+                _Attempts1 = 50,
+                true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
+            ),
+            %% Expect one completed `kafka_consumer_handle_message' span
+            %% per message published below.
+            {ok, SRef1} =
+                snabbkaffe:subscribe(
+                    ?match_event(#{
+                        ?snk_kind := kafka_consumer_handle_message,
+                        ?snk_span := {complete, _}
+                    }),
+                    NPartitions,
+                    10_000
+                ),
+            %% Publish `NPartitions' messages with keys k1..kN and
+            %% values v1..vN.
+            lists:foreach(
+                fun(N) ->
+                    Key = <<"k", (integer_to_binary(N))/binary>>,
+                    Val = <<"v", (integer_to_binary(N))/binary>>,
+                    publish(Config, KafkaTopic, [#{key => Key, value => Val}])
+                end,
+                lists:seq(1, NPartitions)
+            ),
+            {ok, _} = snabbkaffe:receive_events(SRef1),
+
+            #{nodes => Nodes}
+        end,
+        fun(Res, Trace0) ->
+            #{nodes := Nodes} = Res,
+            %% Rebuild the final partition assignments from the
+            %% `kafka_assignment' events emitted by the spies.
+            Trace1 = ?of_kind(kafka_assignment, Trace0),
+            Assignments = reconstruct_assignments_from_events(KafkaTopic, Trace1),
+            NodeAssignments = lists:usort([
+                N
+             || {_Partition, {N, _MemberId}} <-
+                    maps:to_list(Assignments)
+            ]),
+            %% Both nodes hold assignments and every partition is covered.
+            ?assertEqual(lists:usort(Nodes), NodeAssignments),
+            ?assertEqual(NPartitions, map_size(Assignments)),
+            Published = receive_published(#{n => NPartitions, timeout => 3_000}),
+            ct:pal("published:\n  ~p", [Published]),
+            %% Nodes on which message handling started, taken from the
+            %% trace metadata; both nodes must have handled messages.
+            PublishingNodesFromTrace =
+                [
+                    N
+                 || #{
+                        ?snk_kind := kafka_consumer_handle_message,
+                        ?snk_span := start,
+                        ?snk_meta := #{node := N}
+                    } <- Trace0
+                ],
+            ?assertEqual(lists:usort(Nodes), lists:usort(PublishingNodesFromTrace)),
+            ok
+        end
+    ),
+    ok.
+
+%% Checks that the consumers get rebalanced after an EMQX node goes
+%% down.
+%%
+%% Run stage: start both nodes, create the bridge and wait until the
+%% partitions are spread over both nodes; then stop the first node while
+%% a background publisher keeps producing messages, and wait until the
+%% surviving node owns every partition.  Check stage: verify the final
+%% assignments from the trace and collect the published messages.
+t_cluster_node_down(Config) ->
+    ct:timetrap({seconds, 150}),
+    TopicMapping = ?config(topic_mapping, Config),
+    %% this test requires exactly one topic mapping in the config.
+    [MQTTTopic] = [MQTTTopic || #{mqtt_topic := MQTTTopic} <- TopicMapping],
+    NPartitions = ?config(num_partitions, Config),
+    KafkaTopic = ?config(kafka_topic, Config),
+    KafkaName = ?config(kafka_name, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
+    Cluster = cluster(Config),
+    ?check_trace(
+        begin
+            %% Only the options of the second node are needed here; they
+            %% are used below to start its listeners.
+            {_N2, Opts2} = lists:nth(2, Cluster),
+            Nodes =
+                [N1, N2 | _] =
+                lists:map(
+                    fun({Name, Opts}) -> emqx_common_test_helpers:start_slave(Name, Opts) end,
+                    Cluster
+                ),
+            on_exit(fun() ->
+                lists:foreach(
+                    fun(N) ->
+                        ct:pal("stopping ~p", [N]),
+                        ok = emqx_common_test_helpers:stop_slave(N)
+                    end,
+                    Nodes
+                )
+            end),
+            lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
+            %% Subscribe before creating the bridge: one subscriber start
+            %% event is expected per node.
+            {ok, SRef0} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
+                length(Nodes),
+                15_000
+            ),
+            wait_for_cluster_rpc(N2),
+            erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end),
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+            %% The bridge created on the second node must be visible on
+            %% every node.
+            lists:foreach(
+                fun(N) ->
+                    ?assertMatch(
+                        {ok, _},
+                        erpc:call(N, emqx_bridge, lookup, [BridgeId]),
+                        #{node => N}
+                    )
+                end,
+                Nodes
+            ),
+            {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
+
+            %% Now, we stop one of the nodes and watch the group
+            %% rebalance.  Before that, connect an MQTT v5 client to the
+            %% node that will survive, subscribe it (QoS 2) to the
+            %% bridge's MQTT topic and start the background publisher.
+            setup_and_start_listeners(N2, Opts2),
+            TCPPort = emqx_common_test_helpers:listener_port(Opts2, tcp),
+            {ok, C} = emqtt:start_link([{port, TCPPort}, {proto_ver, v5}]),
+            on_exit(fun() -> catch emqtt:stop(C) end),
+            {ok, _} = emqtt:connect(C),
+            {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2),
+            {TId, Pid} = start_async_publisher(Config, KafkaTopic),
+
+            ct:pal("stopping node ~p", [N1]),
+            ok = emqx_common_test_helpers:stop_slave(N1),
+
+            %% Give some time for the consumers in remaining node to
+            %% rebalance.
+            {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, [N2], 60_000),
+
+            ok = stop_async_publisher(Pid),
+
+            #{nodes => Nodes, payloads_tid => TId}
+        end,
+        fun(Res, Trace0) ->
+            #{nodes := Nodes, payloads_tid := TId} = Res,
+            [_N1, N2 | _] = Nodes,
+            %% Rebuild the final partition assignments from the
+            %% `kafka_assignment' events emitted by the spies.
+            Trace1 = ?of_kind(kafka_assignment, Trace0),
+            Assignments = reconstruct_assignments_from_events(KafkaTopic, Trace1),
+            NodeAssignments = lists:usort([
+                N
+             || {_Partition, {N, _MemberId}} <-
+                    maps:to_list(Assignments)
+            ]),
+            %% The surviving node has all the partitions assigned to
+            %% it.
+            ?assertEqual([N2], NodeAssignments),
+            ?assertEqual(NPartitions, map_size(Assignments)),
+            %% The ETS table holds one entry per message the background
+            %% publisher produced.
+            NumPublished = ets:info(TId, size),
+            %% All published messages are eventually received.
+            Published = receive_published(#{n => NumPublished, timeout => 3_000}),
+            ct:pal("published:\n  ~p", [Published]),
+            ok
+        end
+    ),
+    ok.