Explorar o código

fix(clusterlink): improve actor error handling

Add status and error reason to the actor state, report alarms.
Serge Tupchii hai 1 ano
pai
achega
faa4420e1f
Modificáronse 1 ficheiros con 77 adicións e 26 borrados
  1. 77 26
      apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

+ 77 - 26
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -296,7 +296,9 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
     reconnect_timer :: reference(),
     actor_init_req_id :: binary(),
     actor_init_timer :: reference(),
-    remote_actor_info :: undefined | map()
+    remote_actor_info :: undefined | map(),
+    status :: connecting | connected | disconnected,
+    error :: undefined | term()
 }).
 
 mk_state(TargetCluster, Actor, Incarnation) ->
@@ -304,7 +306,8 @@ mk_state(TargetCluster, Actor, Incarnation) ->
         target = TargetCluster,
         actor = Actor,
         incarnation = Incarnation,
-        bootstrapped = false
+        bootstrapped = false,
+        status = connecting
     }.
 
 init_actor(State = #st{}) ->
@@ -334,18 +337,20 @@ handle_info(
     },
     case Res of
         ok ->
-            {noreply, post_actor_init(St1, NeedBootstrap)};
+            _ = maybe_deactivate_alarm(St),
+            {noreply,
+                post_actor_init(St1#st{error = undefined, status = connected}, NeedBootstrap)};
         Error ->
+            Reason = error_reason(Error),
             ?SLOG(error, #{
                 msg => "failed_to_init_link",
-                reason => Error,
+                reason => Reason,
                 target_cluster => St#st.target,
                 actor => St#st.actor,
                 remote_link_proto_ver => maps:get(proto_ver, AckInfoMap, undefined)
             }),
-            %% TODO: It doesn't fit permanent workers/one_for_all restart/strategy.
-            %% The actor may be kept alive with some error status instead (waiting for a user intervention to fix it)?
-            {stop, {shutdown, Error}, St1}
+            _ = maybe_alarm(Reason, St1),
+            {noreply, St1#st{error = Reason, status = disconnected}}
     end;
 handle_info({publish, #{}}, St) ->
     {noreply, St};
@@ -357,7 +362,10 @@ handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
         target_cluster => St#st.target,
         actor => St#st.actor
     }),
-    {noreply, init_remote_actor(St#st{reconnect_timer = undefined})};
+    Reason = init_timeout,
+    _ = maybe_alarm(Reason, St),
+    {noreply,
+        init_remote_actor(St#st{reconnect_timer = undefined, status = disconnected, error = Reason})};
 %% Stale timeout.
 handle_info({timeout, _, _}, St) ->
     {noreply, St};
@@ -371,6 +379,7 @@ terminate(_Reason, _State) ->
 process_connect(St = #st{target = TargetCluster, actor = Actor}) ->
     case start_link_client(TargetCluster, Actor) of
         {ok, ClientPid} ->
+            _ = maybe_deactivate_alarm(St),
             ok = announce_client(Actor, TargetCluster, ClientPid),
             %% TODO: handle subscribe errors
             {ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1),
@@ -389,19 +398,22 @@ init_remote_actor(
         ),
         ClientPid
     ),
-    case Res of
-        ok ->
-            ok;
-        {error, Reason} ->
-            ?SLOG(error, #{
-                msg => "failed_to_init_remote_actor",
-                reason => Reason,
-                target_cluster => TargetCluster,
-                actor => Actor
-            })
-    end,
+    St1 =
+        case Res of
+            ok ->
+                St#st{status = connecting};
+            {error, Reason} ->
+                ?SLOG(error, #{
+                    msg => "cluster_link_init_failed",
+                    reason => Reason,
+                    target_cluster => TargetCluster,
+                    actor => Actor
+                }),
+                _ = maybe_alarm(Reason, St),
+                St#st{error = Reason, status = disconnected}
+        end,
     TRef = erlang:start_timer(?ACTOR_REINIT_TIMEOUT, self(), actor_reinit),
-    St#st{actor_init_req_id = ReqId, actor_init_timer = TRef}.
+    St1#st{actor_init_req_id = ReqId, actor_init_timer = TRef}.
 
 post_actor_init(
     St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr},
@@ -410,16 +422,28 @@ post_actor_init(
     ok = start_syncer(TargetCluster, Actor, Incr),
     process_bootstrap(St#st{client = ClientPid}, NeedBootstrap).
 
-handle_connect_error(_Reason, St) ->
-    %% TODO: logs
+handle_connect_error(Reason, St) ->
+    ?SLOG(error, #{
+        msg => "cluster_link_connection_failed",
+        reason => Reason,
+        target_cluster => St#st.target,
+        actor => St#st.actor
+    }),
     TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect),
-    St#st{reconnect_timer = TRef}.
+    _ = maybe_alarm(Reason, St),
+    St#st{reconnect_timer = TRef, error = Reason, status = disconnected}.
 
-handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) ->
-    %% TODO: logs
+handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) ->
+    ?SLOG(error, #{
+        msg => "cluster_link_connection_failed",
+        reason => Reason,
+        target_cluster => St#st.target,
+        actor => St#st.actor
+    }),
     %% TODO: syncer may be already down due to one_for_all strategy
     ok = close_syncer(TargetCluster, Actor),
-    process_connect(St#st{client = undefined}).
+    _ = maybe_alarm(Reason, St),
+    process_connect(St#st{client = undefined, error = Reason, status = connecting}).
 
 process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) ->
     run_bootstrap(St);
@@ -471,3 +495,30 @@ process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) ->
 
 process_bootstrap_batch(Batch, #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) ->
     publish_routes(ClientPid, Actor, Incarnation, Batch).
+
+error_reason({error, Reason}) ->
+    Reason;
+error_reason(OtherErr) ->
+    OtherErr.
+
+%% Assume that alarm is already active
+maybe_alarm(Error, #st{error = Error}) ->
+    ok;
+maybe_alarm(Error, St) ->
+    HrError = emqx_utils:readable_error_msg(error_reason(Error)),
+    Name = link_name(St),
+    emqx_alarm:safe_activate(
+        Name,
+        #{custer_link => Name, reason => cluster_link_down},
+        <<"cluster link down: ", HrError/binary>>
+    ).
+
+maybe_deactivate_alarm(#st{error = undefined}) ->
+    ok;
+maybe_deactivate_alarm(St) ->
+    emqx_alarm:safe_deactivate(link_name(St)).
+
+link_name(#st{actor = ?PS_ACTOR = Actor, target = Target}) ->
+    <<"cluster_link:", Target/binary, ":", (get_actor_id())/binary, ":", Actor/binary>>;
+link_name(#st{actor = Actor, target = Target}) ->
+    <<"cluster_link:", Target/binary, ":", Actor/binary>>.