Explorar o código

Merge pull request #14155 from thalesmg/20241105-r58-clink-actor-init

fix(cluster link): restart connection if actor init fails
Thales Macedo Garitezi hai 1 ano
pai
achega
92b31d1a65

+ 1 - 0
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -35,6 +35,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 %%--------------------------------------------------------------------
 %% emqx_external_broker API

+ 30 - 26
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -61,7 +61,7 @@
 -define(ERROR_DELAY, 200).
 
 -define(RECONNECT_TIMEOUT, 5_000).
--define(ACTOR_REINIT_TIMEOUT, 7000).
+-define(ACTOR_INIT_TIMEOUT, 7000).
 
 -define(CLIENT_SUFFIX, ":routesync:").
 -define(PS_CLIENT_SUFFIX, ":routesync-ps:").
@@ -402,16 +402,16 @@ handle_info(
     end;
 handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) ->
     {noreply, process_connect(St#st{reconnect_timer = undefined})};
-handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
-    ?SLOG(error, #{
-        msg => "remote_actor_init_timeout",
-        target_cluster => St#st.target,
-        actor => St#st.actor
+handle_info({timeout, TRef, actor_init_timeout}, St0 = #st{actor_init_timer = TRef}) ->
+    ?tp(error, "remote_actor_init_timeout", #{
+        target_cluster => St0#st.target,
+        actor => St0#st.actor
     }),
     Reason = init_timeout,
-    _ = maybe_alarm(Reason, St),
-    {noreply,
-        init_remote_actor(St#st{actor_init_timer = undefined, status = disconnected, error = Reason})};
+    _ = maybe_alarm(Reason, St0),
+    St1 = St0#st{actor_init_timer = undefined, status = disconnected, error = Reason},
+    St = stop_link_client(St1),
+    {noreply, ensure_reconnect_timer(St)};
 handle_info({timeout, TRef, _Heartbeat}, St = #st{heartbeat_timer = TRef}) ->
     {noreply, process_heartbeat(St#st{heartbeat_timer = undefined})};
 %% Stale timeout.
@@ -436,7 +436,7 @@ process_connect(St = #st{target = TargetCluster, actor = Actor, link_conf = Conf
     end.
 
 init_remote_actor(
-    St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr}
+    St0 = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr}
 ) ->
     ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
     %% TODO: handle subscribe errors
@@ -447,22 +447,20 @@ init_remote_actor(
         ),
         ClientPid
     ),
-    St1 =
-        case Res of
-            ok ->
-                St#st{status = connecting};
-            {error, Reason} ->
-                ?SLOG(error, #{
-                    msg => "cluster_link_init_failed",
-                    reason => Reason,
-                    target_cluster => TargetCluster,
-                    actor => Actor
-                }),
-                _ = maybe_alarm(Reason, St),
-                St#st{error = Reason, status = disconnected}
-        end,
-    TRef = erlang:start_timer(?ACTOR_REINIT_TIMEOUT, self(), actor_reinit),
-    St1#st{actor_init_req_id = ReqId, actor_init_timer = TRef}.
+    case Res of
+        ok ->
+            TRef = erlang:start_timer(?ACTOR_INIT_TIMEOUT, self(), actor_init_timeout),
+            St0#st{status = connecting, actor_init_req_id = ReqId, actor_init_timer = TRef};
+        {error, Reason} ->
+            ?tp(error, "cluster_link_init_failed", #{
+                reason => Reason,
+                target_cluster => TargetCluster,
+                actor => Actor
+            }),
+            _ = maybe_alarm(Reason, St0),
+            St = stop_link_client(St0#st{error = Reason, status = disconnected}),
+            ensure_reconnect_timer(St)
+    end.
 
 post_actor_init(
     St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr},
@@ -505,6 +503,12 @@ handle_client_down(
     NSt = cancel_heartbeat(St),
     process_connect(NSt#st{client = undefined, error = Reason, status = connecting}).
 
+stop_link_client(#st{client = ClientPid} = St0) ->
+    ?tp("clink_stop_link_client", #{}),
+    ok = emqtt:stop(ClientPid),
+    flush_link_signal(ClientPid),
+    St0#st{client = undefined}.
+
 process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) ->
     run_bootstrap(St);
 process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) ->

+ 110 - 5
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -12,6 +12,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(ON(NODE, DO), erpc:call(NODE, fun() -> DO end)).
 
 %%
@@ -63,8 +65,13 @@ mk_source_cluster(BaseName, Config) ->
         "\n     topics = []"
         "\n   }"
         "\n ]}",
-    SourceApps1 = [{emqx_conf, combine([conf_log(), SourceConf])}],
-    SourceApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(41883), SourceConf])}],
+    ExtraApps = proplists:get_value(extra_apps, Config, []),
+    SourceApps1 = [{emqx_conf, combine([conf_log(), SourceConf])}, emqx | ExtraApps],
+    SourceApps2 = [
+        {emqx_conf, combine([conf_log(), SourceConf])},
+        {emqx, conf_mqtt_listener(41883)}
+        | ExtraApps
+    ],
     emqx_cth_cluster:mk_nodespecs(
         [
             {mk_nodename(BaseName, s1), #{apps => SourceApps1}},
@@ -85,7 +92,10 @@ mk_target_cluster(BaseName, Config) ->
         "\n     topics = [\"#\"]"
         "\n   }"
         "\n ]}",
-    TargetApps1 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(31883), TargetConf])}],
+    TargetApps1 = [
+        {emqx_conf, combine([conf_log(), TargetConf])},
+        {emqx, conf_mqtt_listener(31883)}
+    ],
     TargetApps2 = [{emqx_conf, combine([conf_log(), TargetConf])}],
     emqx_cth_cluster:mk_nodespecs(
         [
@@ -110,13 +120,13 @@ combine([Entry | Rest]) ->
     lists:foldl(fun emqx_cth_suite:merge_config/2, Entry, Rest).
 
 start_cluster_link(Nodes, Config) ->
-    [{ok, Apps}] = lists:usort(
+    Results = lists:usort(
         erpc:multicall(Nodes, emqx_cth_suite, start_apps, [
             [emqx_cluster_link],
             #{work_dir => emqx_cth_suite:work_dir(Config)}
         ])
     ),
-    Apps.
+    lists:flatmap(fun({ok, Apps}) -> Apps end, Results).
 
 stop_cluster_link(Config) ->
     Apps = ?config(tc_apps, Config),
@@ -315,6 +325,101 @@ t_disconnect_on_errors(Config) ->
     ok = emqtt:stop(SC1),
     ok.
 
+%% Checks that if a timeout occurs during actor state initialization, we close the
+%% (potentially unhealthy) connection and start anew.
+t_restart_connection_on_actor_init_timeout('init', Config0) ->
+    ExtraApps = [{emqx_auth, "authorization.no_match = deny"}],
+    SourceNodesSpec = mk_source_cluster(?FUNCTION_NAME, [{extra_apps, ExtraApps} | Config0]),
+    TargetNodesSpec = mk_target_cluster(?FUNCTION_NAME, Config0),
+    ok = snabbkaffe:start_trace(),
+    [
+        {source_nodes_spec, SourceNodesSpec},
+        {target_nodes_spec, TargetNodesSpec}
+        | Config0
+    ];
+t_restart_connection_on_actor_init_timeout('end', _Config) ->
+    ok = snabbkaffe:stop(),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+t_restart_connection_on_actor_init_timeout(Config) ->
+    SourceNodesSpec = ?config(source_nodes_spec, Config),
+    TargetNodesSpec = ?config(target_nodes_spec, Config),
+    SourceNodes = [SN | _] = emqx_cth_cluster:start(SourceNodesSpec),
+    on_exit(fun() -> ok = emqx_cth_cluster:stop(SourceNodes) end),
+
+    %% Simulate a poorly configured node that'll reject the actor init ack
+    %% message, making the initialization time out.
+    ok = ?ON(
+        SN,
+        emqx_authz_test_lib:setup_config(
+            #{
+                <<"type">> => <<"file">>,
+                <<"enable">> => true,
+                <<"rules">> =>
+                    <<
+                        "{deny, all, subscribe, [\"#\"]}.\n"
+                        "{allow, all, publish, [\"$LINK/#\", \"#\"]}."
+                    >>
+            },
+            #{}
+        )
+    ),
+    %% For some reason, it's fruitless to try to set this config in the app specs....
+    {ok, _} = ?ON(
+        SN,
+        emqx_conf:update([authorization, no_match], deny, #{override_to => cluster})
+    ),
+
+    TargetNodes = emqx_cth_cluster:start(TargetNodesSpec),
+    on_exit(fun() -> ok = emqx_cth_cluster:stop(TargetNodes) end),
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            ct:pal("starting cluster link"),
+            ?wait_async_action(
+                start_cluster_link(SourceNodes ++ TargetNodes, Config),
+                #{?snk_kind := "remote_actor_init_timeout"}
+            ),
+
+            %% Fix the authorization config, it should reconnect.
+            ct:pal("fixing config"),
+            ?wait_async_action(
+                begin
+                    ok = ?ON(
+                        SN,
+                        emqx_authz_test_lib:setup_config(
+                            #{
+                                <<"type">> => <<"file">>,
+                                <<"enable">> => true,
+                                <<"rules">> => <<"{allow, all}.">>
+                            },
+                            #{}
+                        )
+                    ),
+                    {ok, _} = ?ON(
+                        SN,
+                        emqx_conf:update([authorization, no_match], allow, #{override_to => cluster})
+                    )
+                end,
+                #{?snk_kind := clink_route_bootstrap_complete}
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assert(
+                ?strict_causality(
+                    #{?snk_kind := "remote_actor_init_timeout", ?snk_meta := #{node := _N1}},
+                    #{?snk_kind := "clink_stop_link_client", ?snk_meta := #{node := _N2}},
+                    _N1 =:= _N2,
+                    Trace
+                )
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %%
 
 maybe_shared_topic(true = _IsShared, Topic) ->