Parcourir la source

fix(clusterlink): communicate bootstrap requirements via actor handshake

The `session_present` flag is not a reliable way to decide whether bootstrap is needed when durable sessions are enabled.
In that case, the client session may survive a cluster restart while all external routes are lost, because routes are not persistent.
Serge Tupchii il y a 1 an
Parent
commit
21711c6e0d

+ 1 - 4
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -170,10 +170,7 @@ actor_init(
                 case MyClusterName of
                     TargetCluster ->
                         Env = #{timestamp => erlang:system_time(millisecond)},
-                        {ok, _} = emqx_cluster_link_extrouter:actor_init(
-                            ClusterName, Actor, Incr, Env
-                        ),
-                        ok;
+                        emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incr, Env);
                     _ ->
                         %% The remote cluster uses a different name to refer to this cluster
                         ?SLOG(error, #{

+ 13 - 5
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -19,7 +19,8 @@
     actor_state/3,
     actor_apply_operation/2,
     actor_apply_operation/3,
-    actor_gc/1
+    actor_gc/1,
+    is_present_incarnation/1
 ]).
 
 %% Internal API
@@ -140,7 +141,8 @@ match_to_route(M) ->
     cluster :: cluster(),
     actor :: actor(),
     incarnation :: incarnation(),
-    lane :: lane() | undefined
+    lane :: lane() | undefined,
+    extra :: map()
 }).
 
 -type state() :: #state{}.
@@ -159,6 +161,12 @@ actor_init(Cluster, Actor, Incarnation, Env = #{timestamp := Now}) ->
             actor_init(Cluster, Actor, Incarnation, Env)
     end.
 
+-spec is_present_incarnation(state()) -> boolean().
+is_present_incarnation(#state{extra = #{is_present_incarnation := IsNew}}) ->
+    IsNew;
+is_present_incarnation(_State) ->
+    false.
+
 mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
     %% NOTE
     %% We perform this heavy-weight transaction only in the case of a new route
@@ -173,7 +181,7 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
     case mnesia:read(?EXTROUTE_ACTOR_TAB, ActorID, write) of
         [#actor{incarnation = Incarnation, lane = Lane} = Rec] ->
             ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec#actor{until = bump_actor_ttl(TS)}, write),
-            {ok, State#state{lane = Lane}};
+            {ok, State#state{lane = Lane, extra = #{is_present_incarnation => true}}};
         [] ->
             Lane = mnesia_assign_lane(Cluster),
             Rec = #actor{
@@ -183,7 +191,7 @@ mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
                 until = bump_actor_ttl(TS)
             },
             ok = mnesia:write(?EXTROUTE_ACTOR_TAB, Rec, write),
-            {ok, State#state{lane = Lane}};
+            {ok, State#state{lane = Lane, extra = #{is_present_incarnation => false}}};
         [#actor{incarnation = Outdated} = Rec] when Incarnation > Outdated ->
             {reincarnate, Rec};
         [#actor{incarnation = Newer}] ->
@@ -321,7 +329,7 @@ mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = La
 clean_lane(Lane) ->
     ets:foldl(
         fun(#extroute{entry = Entry, mcounter = MCounter}, _) ->
-            apply_operation(Entry, MCounter, del, Lane)
+            apply_operation(Entry, MCounter, delete, Lane)
         end,
         0,
         ?EXTROUTE_TAB

+ 20 - 5
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -71,6 +71,7 @@
 -define(F_TARGET_CLUSTER, 13).
 -define(F_PROTO_VER, 14).
 -define(F_RESULT, 15).
+-define(F_NEED_BOOTSTRAP, 16).
 
 -define(ROUTE_DELETE, 100).
 
@@ -279,18 +280,29 @@ actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) ->
     Payload = #{
         ?F_OPERATION => ?OP_ACTOR_INIT_ACK,
         ?F_PROTO_VER => ?PROTO_VER,
-        ?F_ACTOR => Actor,
-        ?F_RESULT => InitRes
+        ?F_ACTOR => Actor
     },
+    Payload1 = with_res_and_bootstrap(Payload, InitRes),
     emqx_message:make(
         undefined,
         ?QOS_1,
         RespTopic,
-        ?ENCODE(Payload),
+        ?ENCODE(Payload1),
         #{},
         #{properties => #{'Correlation-Data' => ReqId}}
     ).
 
+with_res_and_bootstrap(Payload, {ok, ActorState}) ->
+    Payload#{
+        ?F_RESULT => ok,
+        ?F_NEED_BOOTSTRAP => not emqx_cluster_link_extrouter:is_present_incarnation(ActorState)
+    };
+with_res_and_bootstrap(Payload, Error) ->
+    Payload#{
+        ?F_RESULT => Error,
+        ?F_NEED_BOOTSTRAP => false
+    }.
+
 publish_route_sync(ClientPid, Actor, Incarnation, Updates) ->
     PubTopic = ?ROUTE_TOPIC,
     Payload = #{
@@ -339,9 +351,12 @@ decode_resp1(#{
     ?F_OPERATION := ?OP_ACTOR_INIT_ACK,
     ?F_ACTOR := Actor,
     ?F_PROTO_VER := ProtoVer,
-    ?F_RESULT := InitResult
+    ?F_RESULT := InitResult,
+    ?F_NEED_BOOTSTRAP := NeedBootstrap
 }) ->
-    {actor_init_ack, #{actor => Actor, result => InitResult, proto_ver => ProtoVer}}.
+    {actor_init_ack, #{
+        actor => Actor, result => InitResult, proto_ver => ProtoVer, need_bootstrap => NeedBootstrap
+    }}.
 
 decode_forwarded_msg(Payload) ->
     case ?DECODE(Payload) of

+ 12 - 17
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -164,14 +164,6 @@ refine_client_options(Options = #{clientid := ClientID}, Actor) ->
         retry_interval => 0
     }.
 
-client_session_present(ClientPid) ->
-    Info = emqtt:info(ClientPid),
-    %% FIXME: waitnig for emqtt release that fixes session_present type (must be a boolean)
-    case proplists:get_value(session_present, Info, 0) of
-        0 -> false;
-        1 -> true
-    end.
-
 announce_client(Actor, TargetCluster, Pid) ->
     Name =
         case Actor of
@@ -334,13 +326,15 @@ handle_info(
     {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}},
     St = #st{actor_init_req_id = ReqId}
 ) ->
-    {actor_init_ack, #{result := Res} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload),
+    {actor_init_ack, #{result := Res, need_bootstrap := NeedBootstrap} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(
+        Payload
+    ),
     St1 = St#st{
         actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap
     },
     case Res of
         ok ->
-            {noreply, post_actor_init(St1)};
+            {noreply, post_actor_init(St1, NeedBootstrap)};
         Error ->
             ?SLOG(error, #{
                 msg => "failed_to_init_link",
@@ -410,10 +404,11 @@ init_remote_actor(
     St#st{actor_init_req_id = ReqId, actor_init_timer = TRef}.
 
 post_actor_init(
-    St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr}
+    St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr},
+    NeedBootstrap
 ) ->
     ok = start_syncer(TargetCluster, Actor, Incr),
-    process_bootstrap(St#st{client = ClientPid}).
+    process_bootstrap(St#st{client = ClientPid}, NeedBootstrap).
 
 handle_connect_error(_Reason, St) ->
     %% TODO: logs
@@ -426,14 +421,14 @@ handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) ->
     ok = close_syncer(TargetCluster, Actor),
     process_connect(St#st{client = undefined}).
 
-process_bootstrap(St = #st{bootstrapped = false}) ->
+process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) ->
     run_bootstrap(St);
-process_bootstrap(St = #st{client = ClientPid, bootstrapped = true}) ->
-    case client_session_present(ClientPid) of
+process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) ->
+    case NeedBootstrap of
         true ->
-            process_bootstrapped(St);
+            run_bootstrap(St);
         false ->
-            run_bootstrap(St)
+            process_bootstrapped(St)
     end.
 
 %% Bootstrapping.