Przeglądaj źródła

feat(cluster-link): implement replication actor heartbeats

Andrew Mayorov 1 rok temu
rodzic
commit
d4b449c6e1

+ 4 - 0
.gitignore

@@ -79,3 +79,7 @@ rebar-git-cache.tar
 apps/emqx_utils/src/emqx_variform_parser.erl
 apps/emqx_utils/src/emqx_variform_scan.erl
 default-profile.mk
+# local
+/_compat
+/scratch
+SCRATCH

+ 8 - 1
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -91,7 +91,9 @@ on_message_publish(
             {actor_init, InitInfoMap} ->
                 actor_init(ClusterName, emqx_message:get_header(properties, Msg), InitInfoMap);
             {route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} ->
-                update_routes(ClusterName, Actor, Incr, RouteOps)
+                update_routes(ClusterName, Actor, Incr, RouteOps);
+            {heartbeat, #{actor := Actor, incarnation := Incr}} ->
+                actor_heartbeat(ClusterName, Actor, Incr)
         end,
     {stop, []};
 on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
@@ -201,6 +203,11 @@ update_routes(ClusterName, Actor, Incarnation, RouteOps) ->
         RouteOps
     ).
 
+actor_heartbeat(ClusterName, Actor, Incarnation) -> % applies a decoded `heartbeat` op to the actor's state
+    Env = #{timestamp => erlang:system_time(millisecond)}, % current system time, in milliseconds
+    ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation),
+    _State = emqx_cluster_link_extrouter:actor_apply_operation(heartbeat, ActorState, Env). % result is only bound to `_State`
+
 %% let it crash if extra is not a map,
 %% we don't expect the message to be forwarded from an older EMQX release,
 %% that doesn't set extra = #{} by default.

+ 0 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -244,7 +244,6 @@ apply_operation(Entry, OpName, Lane) ->
     %% This is a safe sequence of operations only on core nodes. On replicants,
     %% `mria:dirty_update_counter/3` will be replicated asynchronously, which
     %% means this read can be stale.
-    % MCounter = ets:lookup_element(Tab, Entry, 2, 0),
     case mnesia:dirty_read(?EXTROUTE_TAB, Entry) of
         [#extroute{mcounter = MCounter}] ->
             apply_operation(Entry, MCounter, OpName, Lane);

+ 18 - 4
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -37,6 +37,7 @@
     publish_actor_init_sync/6,
     actor_init_ack_resp_msg/4,
     publish_route_sync/4,
+    publish_heartbeat/3,
     encode_field/2
 ]).
 
@@ -62,6 +63,7 @@
 
 -define(F_OPERATION, '$op').
 -define(OP_ROUTE, <<"route">>).
+-define(OP_HEARTBEAT, <<"heartbeat">>).
 -define(OP_ACTOR_INIT, <<"actor_init">>).
 -define(OP_ACTOR_INIT_ACK, <<"actor_init_ack">>).
 
@@ -262,7 +264,6 @@ connect(Options) ->
 %%% New leader-less Syncer/Actor implementation
 
 publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incarnation) ->
-    PubTopic = ?ROUTE_TOPIC,
     Payload = #{
         ?F_OPERATION => ?OP_ACTOR_INIT,
         ?F_PROTO_VER => ?PROTO_VER,
@@ -274,7 +275,7 @@ publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incar
         'Response-Topic' => RespTopic,
         'Correlation-Data' => ReqId
     },
-    emqtt:publish(ClientPid, PubTopic, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]).
+    emqtt:publish(ClientPid, ?ROUTE_TOPIC, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]).
 
 actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) ->
     Payload = #{
@@ -304,14 +305,21 @@ with_res_and_bootstrap(Payload, Error) ->
     }.
 
 publish_route_sync(ClientPid, Actor, Incarnation, Updates) -> % publishes a `route` op carrying Updates
-    PubTopic = ?ROUTE_TOPIC,
     Payload = #{
         ?F_OPERATION => ?OP_ROUTE,
         ?F_ACTOR => Actor,
         ?F_INCARNATION => Incarnation,
         ?F_ROUTES => Updates
     },
-    emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1).
+    emqtt:publish(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_1). % topic macro used directly; passes ?QOS_1
+
+publish_heartbeat(ClientPid, Actor, Incarnation) -> % publishes a `heartbeat` op for Actor/Incarnation
+    Payload = #{
+        ?F_OPERATION => ?OP_HEARTBEAT,
+        ?F_ACTOR => Actor,
+        ?F_INCARNATION => Incarnation
+    },
+    emqtt:publish_async(ClientPid, ?ROUTE_TOPIC, ?ENCODE(Payload), ?QOS_0, undefined). % presumably `undefined` = no result callback; confirm
 
 decode_route_op(Payload) ->
     decode_route_op1(?DECODE(Payload)).
@@ -340,6 +348,12 @@ decode_route_op1(#{
 }) ->
     RouteOps1 = lists:map(fun(Op) -> decode_field(route, Op) end, RouteOps),
     {route_updates, #{actor => Actor, incarnation => Incr}, RouteOps1};
+decode_route_op1(#{
+    ?F_OPERATION := ?OP_HEARTBEAT,
+    ?F_ACTOR := Actor,
+    ?F_INCARNATION := Incr
+}) ->
+    {heartbeat, #{actor => Actor, incarnation => Incr}};
 decode_route_op1(Payload) ->
     ?SLOG(warning, #{
         msg => "unexpected_cluster_link_route_op_payload",

+ 19 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -53,6 +53,7 @@
 
 -define(RECONNECT_TIMEOUT, 5_000).
 -define(ACTOR_REINIT_TIMEOUT, 7000).
+-define(HEARTBEAT_INTERVAL, 10_000).
 
 -define(CLIENT_SUFFIX, ":routesync:").
 -define(PS_CLIENT_SUFFIX, ":routesync-ps:").
@@ -180,6 +181,10 @@ publish_routes(ClientPid, Actor, Incarnation, Updates) ->
         #{}
     ).
 
+publish_heartbeat(ClientPid, Actor, Incarnation) -> % delegates to emqx_cluster_link_mqtt:publish_heartbeat/3
+    %% NOTE: Fully asynchronous, no need for error handling.
+    emqx_cluster_link_mqtt:publish_heartbeat(ClientPid, Actor, Incarnation).
+
 %% Route syncer
 
 start_syncer(TargetCluster, Actor, Incr) ->
@@ -294,6 +299,7 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
     client :: {pid(), reference()},
     bootstrapped :: boolean(),
     reconnect_timer :: reference(),
+    heartbeat_timer :: reference(),
     actor_init_req_id :: binary(),
     actor_init_timer :: reference(),
     remote_actor_info :: undefined | map(),
@@ -366,6 +372,8 @@ handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
     _ = maybe_alarm(Reason, St),
     {noreply,
         init_remote_actor(St#st{reconnect_timer = undefined, status = disconnected, error = Reason})};
+handle_info({timeout, TRef, _Heartbeat}, St = #st{heartbeat_timer = TRef}) ->
+    {noreply, process_heartbeat(St#st{heartbeat_timer = undefined})};
 %% Stale timeout.
 handle_info({timeout, _, _}, St) ->
     {noreply, St};
@@ -420,7 +428,9 @@ post_actor_init(
     NeedBootstrap
 ) ->
     ok = start_syncer(TargetCluster, Actor, Incr),
-    process_bootstrap(St#st{client = ClientPid}, NeedBootstrap).
+    %% TODO: Heartbeats are currently blocked by bootstrapping.
+    NSt = schedule_heartbeat(St#st{client = ClientPid}),
+    process_bootstrap(NSt, NeedBootstrap).
 
 handle_connect_error(Reason, St) ->
     ?SLOG(error, #{
@@ -455,6 +465,14 @@ process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) ->
             process_bootstrapped(St)
     end.
 
+process_heartbeat(St = #st{client = ClientPid, actor = Actor, incarnation = Incarnation}) -> % sends one heartbeat, then re-arms the timer
+    ok = publish_heartbeat(ClientPid, Actor, Incarnation), % asserts `ok`; any other result fails with badmatch
+    schedule_heartbeat(St).
+
+schedule_heartbeat(St = #st{heartbeat_timer = undefined}) -> % matches only when no timer is recorded; otherwise function_clause
+    TRef = erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat), % delivers {timeout, TRef, heartbeat} to this process
+    St#st{heartbeat_timer = TRef}. % NOTE(review): confirm heartbeat_timer is reset before this is called again on reconnect
+
 %% Bootstrapping.
 %% Responsible for transferring local routing table snapshot to the target
 %% cluster. Does so either during the initial startup or when MQTT connection