Browse Source

test(cluster-link): add e2e replication actor GC testcase

Andrew Mayorov 1 year ago
parent
commit
d0df4de2a3

+ 4 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -13,7 +13,11 @@
 
 -define(MQTT_HOST_OPTS, #{default_port => 1883}).
 
+-ifndef(TEST).
 -define(DEFAULT_ACTOR_TTL, 30_000).
+-else.
+-define(DEFAULT_ACTOR_TTL, 3_000).
+-endif.
 
 -export([
     %% General

+ 12 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -4,6 +4,8 @@
 
 -module(emqx_cluster_link_extrouter).
 
+-include_lib("snabbkaffe/include/trace.hrl").
+
 -export([create_tables/0]).
 
 %% Router API
@@ -318,8 +320,16 @@ mnesia_actor_heartbeat(ActorID, Incarnation, TS) ->
             mnesia:abort({nonexistent_actor, ActorID})
     end.
 
-clean_incarnation(Rec) ->
-    transaction(fun ?MODULE:mnesia_clean_incarnation/1, [Rec]).
+clean_incarnation(Rec = #actor{id = {Cluster, Actor}}) ->
+    case transaction(fun ?MODULE:mnesia_clean_incarnation/1, [Rec]) of
+        ok ->
+            ?tp(debug, clink_extrouter_actor_cleaned, #{
+                cluster => Cluster,
+                actor => Actor
+            });
+        Result ->
+            Result
+    end.
 
 mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) ->
     case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of

+ 4 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl

@@ -20,7 +20,11 @@
 
 -define(SERVER, ?MODULE).
 
+-ifndef(TEST).
 -define(REPEAT_GC_INTERVAL, 5_000).
+-else.
+-define(REPEAT_GC_INTERVAL, 1_000).
+-endif.
 
 %%
 

+ 21 - 10
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -8,6 +8,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -behaviour(emqx_resource).
 -behaviour(ecpool_worker).
@@ -123,15 +124,19 @@ on_query(_ResourceId, FwdMsg, #{pool_name := PoolName, topic := LinkTopic} = _St
     is_record(FwdMsg, message)
 ->
     #message{topic = Topic, qos = QoS} = FwdMsg,
-    handle_send_result(
-        ecpool:pick_and_do(
-            {PoolName, Topic},
-            fun(ConnPid) ->
-                emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS)
-            end,
-            no_handover
-        )
-    ).
+    PubResult = ecpool:pick_and_do(
+        {PoolName, Topic},
+        fun(ConnPid) ->
+            emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS)
+        end,
+        no_handover
+    ),
+    ?tp_ignore_side_effects_in_prod(clink_message_forwarded, #{
+        pool => PoolName,
+        message => FwdMsg,
+        pub_result => PubResult
+    }),
+    handle_send_result(PubResult).
 
 on_query_async(
     _ResourceId, FwdMsg, CallbackIn, #{pool_name := PoolName, topic := LinkTopic} = _State
@@ -145,7 +150,13 @@ on_query_async(
             %% #delivery{} record has no valuable data for a remote link...
             Payload = ?ENCODE(FwdMsg),
             %% TODO: check override QOS requirements (if any)
-            emqtt:publish_async(ConnPid, LinkTopic, Payload, QoS, Callback)
+            PubResult = emqtt:publish_async(ConnPid, LinkTopic, Payload, QoS, Callback),
+            ?tp_ignore_side_effects_in_prod(clink_message_forwarded, #{
+                pool => PoolName,
+                message => FwdMsg,
+                pub_result => PubResult
+            }),
+            PubResult
         end,
         no_handover
     ).

+ 109 - 27
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -7,6 +7,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -17,17 +18,10 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    SourceCluster = start_source_cluster(Config),
-    TargetCluster = start_target_cluster(Config),
-    [
-        {source_cluster, SourceCluster},
-        {target_cluster, TargetCluster}
-        | Config
-    ].
+    Config.
 
-end_per_suite(Config) ->
-    ok = emqx_cth_cluster:stop(?config(source_cluster, Config)),
-    ok = emqx_cth_cluster:stop(?config(target_cluster, Config)).
+end_per_suite(_Config) ->
+    ok.
 
 init_per_testcase(TCName, Config) ->
     emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config).
@@ -37,7 +31,7 @@ end_per_testcase(TCName, Config) ->
 
 %%
 
-start_source_cluster(Config) ->
+mk_source_cluster(BaseName, Config) ->
     SourceConf =
         "cluster {"
         "\n name = cl.source"
@@ -51,15 +45,15 @@ start_source_cluster(Config) ->
         "\n ]}",
     SourceApps1 = [{emqx_conf, combine([conf_log(), SourceConf])}],
     SourceApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(41883), SourceConf])}],
-    emqx_cth_cluster:start(
+    emqx_cth_cluster:mk_nodespecs(
         [
-            {emqx_clink_msgfwd_source1, #{apps => SourceApps1}},
-            {emqx_clink_msgfwd_source2, #{apps => SourceApps2}}
+            {mk_nodename(BaseName, s1), #{apps => SourceApps1}},
+            {mk_nodename(BaseName, s2), #{apps => SourceApps2}}
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ).
 
-start_target_cluster(Config) ->
+mk_target_cluster(BaseName, Config) ->
     TargetConf =
         "cluster {"
         "\n name = cl.target"
@@ -73,14 +67,17 @@ start_target_cluster(Config) ->
         "\n ]}",
     TargetApps1 = [{emqx_conf, combine([conf_log(), TargetConf])}],
     TargetApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(31883), TargetConf])}],
-    emqx_cth_cluster:start(
+    emqx_cth_cluster:mk_nodespecs(
         [
-            {emqx_clink_msgfwd_target1, #{apps => TargetApps1, base_port => 20100}},
-            {emqx_clink_msgfwd_target2, #{apps => TargetApps2, base_port => 20200}}
+            {mk_nodename(BaseName, t1), #{apps => TargetApps1, base_port => 20100}},
+            {mk_nodename(BaseName, t2), #{apps => TargetApps2, base_port => 20200}}
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ).
 
+mk_nodename(BaseName, Suffix) ->
+    binary_to_atom(fmt("emqx_clink_~s_~s", [BaseName, Suffix])).
+
 conf_mqtt_listener(LPort) when is_integer(LPort) ->
     fmt("listeners.tcp.clink { bind = ~p }", [LPort]);
 conf_mqtt_listener(_) ->
@@ -92,8 +89,7 @@ conf_log() ->
 combine([Entry | Rest]) ->
     lists:foldl(fun emqx_cth_suite:merge_config/2, Entry, Rest).
 
-start_cluster_link(Config) ->
-    Nodes = nodes_all(Config),
+start_cluster_link(Nodes, Config) ->
     [{ok, Apps}] = lists:usort(
         erpc:multicall(Nodes, emqx_cth_suite, start_apps, [
             [emqx_cluster_link],
@@ -115,20 +111,27 @@ nodes_all(Config) ->
     nodes_source(Config) ++ nodes_target(Config).
 
 nodes_source(Config) ->
-    ?config(source_cluster, Config).
+    ?config(source_nodes, Config).
 
 nodes_target(Config) ->
-    ?config(target_cluster, Config).
+    ?config(target_nodes, Config).
 
 %%
 
 t_message_forwarding('init', Config) ->
-    Apps = start_cluster_link(Config),
+    SourceNodes = emqx_cth_cluster:start(mk_source_cluster(?FUNCTION_NAME, Config)),
+    TargetNodes = emqx_cth_cluster:start(mk_target_cluster(?FUNCTION_NAME, Config)),
+    _Apps = start_cluster_link(SourceNodes ++ TargetNodes, Config),
     ok = snabbkaffe:start_trace(),
-    [{tc_apps, Apps} | Config];
+    [
+        {source_nodes, SourceNodes},
+        {target_nodes, TargetNodes}
+        | Config
+    ];
 t_message_forwarding('end', Config) ->
     ok = snabbkaffe:stop(),
-    stop_cluster_link(Config).
+    ok = emqx_cth_cluster:stop(?config(source_nodes, Config)),
+    ok = emqx_cth_cluster:stop(?config(target_nodes, Config)).
 
 t_message_forwarding(Config) ->
     [SourceNode1 | _] = nodes_source(Config),
@@ -150,11 +153,90 @@ t_message_forwarding(Config) ->
     ok = emqtt:stop(SourceC1),
     ok = emqtt:stop(TargetC1),
     ok = emqtt:stop(TargetC2).
+
+t_target_extrouting_gc('init', Config) ->
+    SourceCluster = mk_source_cluster(?FUNCTION_NAME, Config),
+    SourceNodes = emqx_cth_cluster:start(SourceCluster),
+    TargetCluster = mk_target_cluster(?FUNCTION_NAME, Config),
+    TargetNodes = emqx_cth_cluster:start(TargetCluster),
+    _Apps = start_cluster_link(SourceNodes ++ TargetNodes, Config),
+    ok = snabbkaffe:start_trace(),
+    [
+        {source_cluster, SourceCluster},
+        {source_nodes, SourceNodes},
+        {target_cluster, TargetCluster},
+        {target_nodes, TargetNodes}
+        | Config
+    ];
+t_target_extrouting_gc('end', Config) ->
+    ok = snabbkaffe:stop(),
+    ok = emqx_cth_cluster:stop(?config(source_nodes, Config)).
+
+t_target_extrouting_gc(Config) ->
+    [SourceNode1 | _] = nodes_source(Config),
+    [TargetNode1, TargetNode2 | _] = nodes_target(Config),
+    SourceC1 = start_client("t_target_extrouting_gc", SourceNode1),
+    TargetC1 = start_client_unlink("t_target_extrouting_gc1", TargetNode1),
+    TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2),
+    {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/#">>, qos1),
+    {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/+">>, qos1),
+    {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}),
+    {ok, _} = emqtt:publish(SourceC1, <<"t/1">>, <<"HELLO1">>, qos1),
+    {ok, _} = emqtt:publish(SourceC1, <<"t/2/ext">>, <<"HELLO2">>, qos1),
+    {ok, _} = emqtt:publish(SourceC1, <<"t/3/ext">>, <<"HELLO3">>, qos1),
+    Pubs1 = [M || {publish, M} <- ?drainMailbox(1_000)],
+    {ok, _} = ?wait_async_action(
+        emqx_cth_cluster:stop_node(TargetNode1),
+        #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>}
+    ),
+    {ok, _} = emqtt:publish(SourceC1, <<"t/4/ext">>, <<"HELLO4">>, qos1),
+    {ok, _} = emqtt:publish(SourceC1, <<"t/5">>, <<"HELLO5">>, qos1),
+    Pubs2 = [M || {publish, M} <- ?drainMailbox(1_000)],
+    {ok, _} = ?wait_async_action(
+        emqx_cth_cluster:stop_node(TargetNode2),
+        #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>}
+    ),
     ok = emqtt:stop(SourceC1),
-    ok = emqtt:stop(TargetC1).
+    %% Verify that extrouter table eventually becomes empty.
+    ?assertEqual(
+        [],
+        erpc:call(SourceNode1, emqx_cluster_link_extrouter, topics, []),
+        {
+            erpc:call(SourceNode1, ets, tab2list, [emqx_external_router_actor]),
+            erpc:call(SourceNode1, ets, tab2list, [emqx_external_router_route])
+        }
+    ),
+    %% Verify all relevant messages were forwarded.
+    ?assertMatch(
+        [
+            #{topic := <<"t/1">>, payload := <<"HELLO1">>, client_pid := _C1},
+            #{topic := <<"t/1">>, payload := <<"HELLO1">>, client_pid := _C2},
+            #{topic := <<"t/2/ext">>, payload := <<"HELLO2">>},
+            #{topic := <<"t/3/ext">>, payload := <<"HELLO3">>},
+            #{topic := <<"t/5">>, payload := <<"HELLO5">>}
+        ],
+        lists:sort(emqx_utils_maps:key_comparer(topic), Pubs1 ++ Pubs2)
+    ),
+    %% Verify there was no unnecessary forwarding.
+    Trace = snabbkaffe:collect_trace(),
+    ?assertMatch(
+        [
+            #{message := #message{topic = <<"t/1">>, payload = <<"HELLO1">>}},
+            #{message := #message{topic = <<"t/2/ext">>, payload = <<"HELLO2">>}},
+            #{message := #message{topic = <<"t/3/ext">>, payload = <<"HELLO3">>}},
+            #{message := #message{topic = <<"t/5">>, payload = <<"HELLO5">>}}
+        ],
+        ?of_kind(clink_message_forwarded, Trace),
+        Trace
+    ).
 
 %%
 
+start_client_unlink(ClientId, Node) ->
+    Client = start_client(ClientId, Node),
+    _ = erlang:unlink(Client),
+    Client.
+
 start_client(ClientId, Node) ->
     Port = tcp_port(Node),
     {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
@@ -166,4 +248,4 @@ tcp_port(Node) ->
     Port.
 
 fmt(Fmt, Args) ->
-    io_lib:format(Fmt, Args).
+    emqx_utils:format(Fmt, Args).