Просмотр исходного кода

Merge pull request #13928 from thalesmg/20241003-m-cluster-link-disable-enable

fix(cluster link): don't remove message publish hook if disabling the last link
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
f21757da37

+ 2 - 4
apps/emqx_cluster_link/src/emqx_cluster_link_app.erl

@@ -8,16 +8,14 @@
 
 -export([start/2, prep_stop/1, stop/1]).
 
--define(BROKER_MOD, emqx_cluster_link).
-
 start(_StartType, _StartArgs) ->
     ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
     emqx_cluster_link_config:add_handler(),
     LinksConf = emqx_cluster_link_config:enabled_links(),
+    ok = emqx_cluster_link:register_external_broker(),
+    ok = emqx_cluster_link:put_hook(),
     case LinksConf of
         [_ | _] ->
-            ok = emqx_cluster_link:register_external_broker(),
-            ok = emqx_cluster_link:put_hook(),
             ok = start_msg_fwd_resources(LinksConf);
         _ ->
             ok

+ 0 - 13
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -231,7 +231,6 @@ pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) ->
 post_config_update(?LINKS_PATH, _Req, Old, Old, _AppEnvs) ->
     ok;
 post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
-    ok = toggle_hook_and_broker(enabled_links(New), enabled_links(Old)),
     #{
         removed := Removed,
         added := Added,
@@ -252,18 +251,6 @@ post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-toggle_hook_and_broker([_ | _] = _NewEnabledLinks, [] = _OldEnabledLinks) ->
-    ok = emqx_cluster_link:register_external_broker(),
-    ok = emqx_cluster_link:put_hook();
-toggle_hook_and_broker([] = _NewEnabledLinks, _OldLinks) ->
-    _ = emqx_cluster_link:unregister_external_broker(),
-    ok = emqx_cluster_link:delete_hook();
-toggle_hook_and_broker(_, _) ->
-    ok.
-
-enabled_links(LinksConf) ->
-    [L || #{enable := true} = L <- LinksConf].
-
 all_ok(Results) ->
     lists:all(
         fun

+ 2 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl

@@ -5,6 +5,7 @@
 -module(emqx_cluster_link_extrouter_gc).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -export([start_link/0]).
 
@@ -56,6 +57,7 @@ handle_cast(Cast, State) ->
 
 handle_info({timeout, TRef, _GC}, St = #st{gc_timer = TRef}) ->
     Result = run_gc_exclusive(),
+    ?tp("clink_extrouter_gc_ran", #{result => Result}),
     Timeout = choose_timeout(Result),
     {noreply, schedule_gc(Timeout, St#st{gc_timer = undefined})};
 handle_info(Info, St) ->

+ 10 - 4
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -399,8 +399,8 @@ handle_info(
                 clink_handshake_error,
                 #{actor => {St1#st.actor, St1#st.incarnation}, reason => Reason}
             ),
-            %% TODO: retry after a timeout?
-            {noreply, St1#st{error = Reason, status = disconnected}}
+            St2 = ensure_reconnect_timer(St1#st{error = Reason, status = disconnected}),
+            {noreply, St2}
     end;
 handle_info({publish, #{}}, St) ->
     {noreply, St};
@@ -476,6 +476,13 @@ post_actor_init(
     NSt = schedule_heartbeat(St#st{client = ClientPid}),
     process_bootstrap(NSt, NeedBootstrap).
 
+ensure_reconnect_timer(#st{reconnect_timer = undefined} = St) ->
+    TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect),
+    St#st{reconnect_timer = TRef};
+ensure_reconnect_timer(#st{reconnect_timer = TRef} = St) ->
+    _ = erlang:cancel_timer(TRef),
+    ensure_reconnect_timer(St#st{reconnect_timer = undefined}).
+
 handle_connect_error(Reason, St) ->
     ?SLOG(error, #{
         msg => "cluster_link_connection_failed",
@@ -483,9 +490,8 @@ handle_connect_error(Reason, St) ->
         target_cluster => St#st.target,
         actor => St#st.actor
     }),
-    TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect),
     _ = maybe_alarm(Reason, St),
-    St#st{reconnect_timer = TRef, error = Reason, status = disconnected}.
+    ensure_reconnect_timer(St#st{error = Reason, status = disconnected}).
 
 handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) ->
     ?SLOG(error, #{

+ 119 - 23
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -10,6 +10,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -60,7 +61,8 @@ groups() ->
 cluster_test_cases() ->
     [
         t_status,
-        t_metrics
+        t_metrics,
+        t_disable_reenable
     ].
 
 init_per_suite(Config) ->
@@ -202,6 +204,55 @@ link_params(Overrides) ->
     },
     emqx_utils_maps:deep_merge(Default, Overrides).
 
+remove_api_virtual_fields(Response) ->
+    maps:without([<<"name">>, <<"node_status">>, <<"status">>], Response).
+
+%% Node
+disable_and_force_gc(TargetOrSource, Name, Params, TCConfig, Opts) ->
+    NExpectedDeletions = maps:get(expected_num_route_deletions, Opts),
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := "cluster_link_extrouter_route_deleted"}),
+        NExpectedDeletions,
+        infinity
+    ),
+    Nodes =
+        case TargetOrSource of
+            target -> ?config(source_nodes, TCConfig);
+            source -> ?config(target_nodes, TCConfig)
+        end,
+    {200, _} = update_link(TargetOrSource, Name, Params#{<<"enable">> := false}),
+    %% Note that only when the GC runs and collects the stopped actor it'll actually
+    %% remove the routes
+    NowMS = erlang:system_time(millisecond),
+    TTL = emqx_cluster_link_config:actor_ttl(),
+    ct:pal("gc"),
+    Timestamp = NowMS + TTL * 3,
+    lists:foreach(fun(N) -> ok = do_actor_gc(N, Timestamp) end, Nodes),
+    {ok, _} = snabbkaffe:receive_events(SRef),
+    ct:pal("gc done"),
+    ok.
+
+do_actor_gc(Node, Timestamp) ->
+    %% 2 Actors: one for normal routes, one for PS routes
+    case ?ON(Node, emqx_cluster_link_extrouter:actor_gc(#{timestamp => Timestamp})) of
+        1 ->
+            do_actor_gc(Node, Timestamp);
+        0 ->
+            ok
+    end.
+
+wait_for_routes([Node | Nodes], ExpectedTopics) ->
+    Topics = ?ON(Node, emqx_cluster_link_extrouter:topics()),
+    case lists:sort(ExpectedTopics) == lists:sort(Topics) of
+        true ->
+            wait_for_routes(Nodes, ExpectedTopics);
+        false ->
+            timer:sleep(100),
+            wait_for_routes([Node | Nodes], ExpectedTopics)
+    end;
+wait_for_routes([], _ExpectedTopics) ->
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Test cases
 %%------------------------------------------------------------------------------
@@ -704,25 +755,10 @@ t_metrics(Config) ->
     %% Disabling the link should remove the routes.
     ct:pal("disabling"),
     {200, TargetLink0} = get_link(target, TargetName),
-    TargetLink1 = maps:without([<<"status">>, <<"node_status">>], TargetLink0),
-    TargetLink2 = TargetLink1#{<<"enable">> := false},
-    {_, {ok, _}} =
-        ?wait_async_action(
-            begin
-                {200, _} = update_link(target, TargetName, TargetLink2),
-                %% Note that only when the GC runs and collects the stopped actor it'll actually
-                %% remove the routes
-                NowMS = erlang:system_time(millisecond),
-                TTL = emqx_cluster_link_config:actor_ttl(),
-                ct:pal("gc"),
-                %% 2 Actors: one for normal routes, one for PS routes
-                1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})),
-                1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})),
-                ct:pal("gc done"),
-                ok
-            end,
-            #{?snk_kind := "cluster_link_extrouter_route_deleted"}
-        ),
+    TargetLink1 = remove_api_virtual_fields(TargetLink0),
+    ok = disable_and_force_gc(target, TargetName, TargetLink1, Config, #{
+        expected_num_route_deletions => 1
+    }),
 
     ?retry(
         300,
@@ -737,11 +773,11 @@ t_metrics(Config) ->
     ),
 
     %% Enabling again
-    TargetLink3 = TargetLink2#{<<"enable">> := true},
+    TargetLink2 = TargetLink1#{<<"enable">> := true},
     {_, {ok, _}} =
         ?wait_async_action(
             begin
-                {200, _} = update_link(target, TargetName, TargetLink3)
+                {200, _} = update_link(target, TargetName, TargetLink2)
             end,
             #{?snk_kind := "cluster_link_extrouter_route_added"}
         ),
@@ -799,7 +835,7 @@ t_update_password(_Config) ->
             {201, Response1} = create_link(Name, Params1),
             [#{name := Name, password := WrappedPassword0}] = emqx_config:get([cluster, links]),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword0)),
-            Params2A = maps:without([<<"name">>, <<"node_status">>, <<"status">>], Response1),
+            Params2A = remove_api_virtual_fields(Response1),
             Params2 = Params2A#{<<"pool_size">> := 2},
             ?assertEqual(?REDACTED, maps:get(<<"password">>, Params2)),
             ?assertMatch({200, _}, update_link(Name, Params2)),
@@ -842,3 +878,63 @@ t_optional_fields_update(_Config) ->
     {201, _} = create_link(Name, Params0),
     ?assertMatch({200, _}, update_link(Name, Params0)),
     ok.
+
+%% Verifies that, if we disable a link and then re-enable it, it should keep working.
+t_disable_reenable(Config) ->
+    ct:timetrap({seconds, 20}),
+    [SN1, _SN2] = SourceNodes = ?config(source_nodes, Config),
+    [TN1, TN2] = ?config(target_nodes, Config),
+    SourceName = <<"cl.target">>,
+    SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
+    TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
+    TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
+    Topic1 = <<"t/tc1">>,
+    Topic2 = <<"t/tc2">>,
+    {ok, _, _} = emqtt:subscribe(TargetC1, Topic1),
+    {ok, _, _} = emqtt:subscribe(TargetC2, Topic2),
+    %% fixme: use snabbkaffe subscription
+    ?block_until(#{?snk_kind := clink_route_sync_complete}),
+    {ok, _} = emqtt:publish(SourceC1, Topic1, <<"1">>, [{qos, 1}]),
+    {ok, _} = emqtt:publish(SourceC1, Topic2, <<"2">>, [{qos, 1}]),
+    %% Sanity check: link is working, initially.
+    ?assertReceive({publish, #{topic := Topic1, payload := <<"1">>}}),
+    ?assertReceive({publish, #{topic := Topic2, payload := <<"2">>}}),
+
+    %% Now we just disable and re-enable it in the link in the source cluster.
+    {200, #{<<"enable">> := true} = SourceLink0} = get_link(source, SourceName),
+    SourceLink1 = remove_api_virtual_fields(SourceLink0),
+    %% We force GC to simulate that we left the link disable for enough time that the GC
+    %% kicks in.
+    ?assertMatch(
+        {200, #{<<"enable">> := false}},
+        update_link(source, SourceName, SourceLink1#{<<"enable">> := false})
+    ),
+    %% In the original issue, GC deleted the state of target cluster's agent in source
+    %% cluster.  After the fix, there's no longer GC, so we ignore timeouts here.
+    _ = ?block_until(
+        #{?snk_kind := "clink_extrouter_gc_ran", result := NumDeleted} when
+            NumDeleted > 0,
+        emqx_cluster_link_config:actor_ttl() + 1_000
+    ),
+    ?assertMatch(
+        {200, #{<<"enable">> := true}},
+        update_link(source, SourceName, SourceLink1)
+    ),
+
+    Topic3 = <<"t/tc3">>,
+    Topic4 = <<"t/tc4">>,
+    {ok, _, _} = emqtt:subscribe(TargetC1, Topic3),
+    {ok, _, _} = emqtt:subscribe(TargetC2, Topic4),
+    ct:pal("waiting for routes to be synced..."),
+    ExpectedTopics = [Topic1, Topic2, Topic3, Topic4],
+    wait_for_routes(SourceNodes, ExpectedTopics),
+
+    {ok, _} = emqtt:publish(SourceC1, Topic1, <<"3">>, [{qos, 1}]),
+    {ok, _} = emqtt:publish(SourceC1, Topic2, <<"4">>, [{qos, 1}]),
+    {ok, _} = emqtt:publish(SourceC1, Topic3, <<"5">>, [{qos, 1}]),
+    {ok, _} = emqtt:publish(SourceC1, Topic4, <<"6">>, [{qos, 1}]),
+    ?assertReceive({publish, #{topic := Topic1, payload := <<"3">>}}),
+    ?assertReceive({publish, #{topic := Topic2, payload := <<"4">>}}),
+    ?assertReceive({publish, #{topic := Topic3, payload := <<"5">>}}),
+    ?assertReceive({publish, #{topic := Topic4, payload := <<"6">>}}),
+    ok.

+ 1 - 0
changes/ee/fix-13928.en.md

@@ -0,0 +1 @@
+Fixed an issue where a bidirectional cluster link could become stuck and stop working if one of the sides disabled the link for a long period of time before re-enabling it.