Explorar o código

feat(clusterlink): implement actor init handshake

Serge Tupchii hai 1 ano
pai
achega
b1aeb35370

+ 6 - 0
apps/emqx_cluster_link/include/emqx_cluster_link.hrl

@@ -5,6 +5,12 @@
 %% Topic name prefixes used by cluster linking; all of them live under "$LINK/cluster/".
 -define(TOPIC_PREFIX, "$LINK/cluster/").
 -define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/").
 -define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/").
+-define(RESP_TOPIC_PREFIX, ?TOPIC_PREFIX "resp/").
+
 %% NOTE: ?MY_CLUSTER_NAME expands to a function call, so every macro below that embeds it
 %% is evaluated at each use site; none of them is a compile-time constant.
+-define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()).
 %% Route and message-forwarding topics: "<prefix><local cluster name>".
+-define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
+-define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
 %% Per-actor response topic: "$LINK/cluster/resp/<local cluster name>/<Actor>".
 %% `Actor' must be a binary.
+-define(RESP_TOPIC(Actor), <<?RESP_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary, "/", Actor/binary>>).
 
 %% Fairly compact text encoding.
 %% Builds "$s/<Group>/<Topic>"; both arguments must be binaries.
 -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>).

+ 60 - 6
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -83,11 +83,13 @@ should_route_to_external_dests(_Msg) ->
 %% EMQX Hooks
 %%--------------------------------------------------------------------
 
-on_message_publish(#message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
+on_message_publish(
+    #message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload} = Msg
+) ->
     _ =
         case emqx_cluster_link_mqtt:decode_route_op(Payload) of
-            {actor_init, #{actor := Actor, incarnation := Incr}} ->
-                actor_init(ClusterName, Actor, Incr);
+            {actor_init, InitInfoMap} ->
+                actor_init(ClusterName, emqx_message:get_header(properties, Msg), InitInfoMap);
             {route_updates, #{actor := Actor, incarnation := Incr}, RouteOps} ->
                 update_routes(ClusterName, Actor, Incr, RouteOps)
         end,
@@ -137,9 +139,61 @@ topic_intersect_any(Topic, [LinkFilter | T]) ->
 topic_intersect_any(_Topic, []) ->
     false.
 
-actor_init(ClusterName, Actor, Incarnation) ->
-    Env = #{timestamp => erlang:system_time(millisecond)},
-    {ok, _} = emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incarnation, Env).
+%% Handles an `actor_init' handshake request published by a linked cluster.
+%%
+%% `ClusterName' is the cluster name taken from the route topic the request arrived on.
+%% The second argument is the MQTT properties map of the request and must carry both
+%% 'Correlation-Data' and 'Response-Topic'. The third argument is the decoded request.
+%% The outcome (`ok' or `{error, BinaryReason}') is published back via actor_init_ack/4.
+actor_init(
+    ClusterName,
+    #{'Correlation-Data' := ReqId, 'Response-Topic' := RespTopic},
+    #{actor := Actor, incarnation := Incr, cluster := TargetCluster, proto_ver := _}
+) ->
+    LinkConf = emqx_cluster_link_config:link(ClusterName),
+    Res = init_link_actor(LinkConf, ClusterName, TargetCluster, Actor, Incr),
+    _ = actor_init_ack(Actor, Res, ReqId, RespTopic),
+    {stop, []}.
+
+%% Validates the request against the local link configuration and, when it is
+%% acceptable, registers the remote actor in the external router.
+init_link_actor(undefined, ClusterName, _TargetCluster, _Actor, _Incr) ->
+    %% No link is configured under the name the remote cluster uses for itself.
+    ?SLOG(
+        error,
+        #{
+            msg => "init_link_request_from_unknown_cluster",
+            link_name => ClusterName
+        }
+    ),
+    %% Error reasons are binaries rather than atoms: they are sent to the remote
+    %% cluster, which decodes them with safe binary_to_term.
+    %% TODO: add error details?
+    {error, <<"unknown_cluster">>};
+init_link_actor(LinkConf, ClusterName, TargetCluster, Actor, Incr) ->
+    %% TODO: may be worth checking resource health and communicate it?
+    _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
+    case emqx_cluster_link_config:cluster() of
+        TargetCluster ->
+            %% The remote side addresses this cluster by the name it gives itself.
+            Env = #{timestamp => erlang:system_time(millisecond)},
+            {ok, _} = emqx_cluster_link_extrouter:actor_init(
+                ClusterName, Actor, Incr, Env
+            ),
+            ok;
+        MyClusterName ->
+            %% The remote cluster uses a different name to refer to this cluster
+            ?SLOG(error, #{
+                msg => "misconfigured_cluster_link_name",
+                %% How this cluster names itself
+                local_name => MyClusterName,
+                %% How the remote cluster names this local cluster
+                remote_name => TargetCluster,
+                %% How the remote cluster names itself
+                received_from => ClusterName
+            }),
+            {error, <<"bad_remote_cluster_link_name">>}
+    end.
+
+%% Builds the handshake acknowledgement for `Actor' and publishes it through the
+%% local broker; returns whatever emqx_broker:publish/1 returns.
+actor_init_ack(Actor, Res, ReqId, RespTopic) ->
+    emqx_broker:publish(
+        emqx_cluster_link_mqtt:actor_init_ack_resp_msg(Actor, Res, ReqId, RespTopic)
+    ).
 
 update_routes(ClusterName, Actor, Incarnation, RouteOps) ->
     ActorState = emqx_cluster_link_extrouter:actor_state(ClusterName, Actor, Incarnation),

+ 52 - 23
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -29,11 +29,13 @@
     ensure_msg_fwd_resource/1,
     stop_msg_fwd_resource/1,
     decode_route_op/1,
-    decode_forwarded_msg/1
+    decode_forwarded_msg/1,
+    decode_resp/1
 ]).
 
 -export([
-    publish_actor_init_sync/3,
+    publish_actor_init_sync/6,
+    actor_init_ack_resp_msg/4,
     publish_route_sync/4,
     encode_field/2
 ]).
@@ -45,11 +47,6 @@
 -define(MSG_CLIENTID_SUFFIX, ":msg:").
 
 -define(MQTT_HOST_OPTS, #{default_port => 1883}).
--define(MY_CLUSTER_NAME, emqx_cluster_link_config:cluster()).
-
--define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
--define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
-%%-define(CTRL_TOPIC(ClusterName), <<?CTRL_TOPIC_PREFIX, (ClusterName)/binary>>).
 
 -define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:").
 -define(RES_NAME(Prefix, ClusterName), <<Prefix, ClusterName/binary>>).
@@ -58,8 +55,7 @@
 -define(HEALTH_CHECK_TIMEOUT, 1000).
 -define(RES_GROUP, <<"emqx_cluster_link">>).
 
-%% Protocol
-%% -define(PROTO_VER, <<"1.0">>).
+-define(PROTO_VER, 1).
 
 -define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])).
 -define(ENCODE(Payload), erlang:term_to_binary(Payload)).
@@ -67,10 +63,14 @@
 -define(F_OPERATION, '$op').
 -define(OP_ROUTE, <<"route">>).
 -define(OP_ACTOR_INIT, <<"actor_init">>).
+-define(OP_ACTOR_INIT_ACK, <<"actor_init_ack">>).
 
 -define(F_ACTOR, 10).
 -define(F_INCARNATION, 11).
 -define(F_ROUTES, 12).
+-define(F_TARGET_CLUSTER, 13).
+-define(F_PROTO_VER, 14).
+-define(F_RESULT, 15).
 
 -define(ROUTE_DELETE, 100).
 
@@ -128,16 +128,6 @@ on_query(_ResourceId, FwdMsg, #{pool_name := PoolName, topic := LinkTopic} = _St
             end,
             no_handover
         )
-    );
-on_query(_ResourceId, {Topic, Props, Payload, QoS}, #{pool_name := PoolName} = _State) ->
-    handle_send_result(
-        ecpool:pick_and_do(
-            {PoolName, Topic},
-            fun(ConnPid) ->
-                emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}])
-            end,
-            no_handover
-        )
     ).
 
 on_query_async(
@@ -270,15 +260,36 @@ connect(Options) ->
 
 %%% New leader-less Syncer/Actor implementation
 
 %% Publishes an `actor_init' handshake request at QoS 1 on this cluster's route topic
 %% (?ROUTE_TOPIC) through the emqtt client `ClientPid'.
 %% `ReqId' is sent as the 'Correlation-Data' property and `RespTopic' as 'Response-Topic'.
 %% The encoded payload carries the protocol version, `TargetCluster', `Actor' and
 %% `Incarnation'. Returns the result of emqtt:publish/5.
-publish_actor_init_sync(ClientPid, Actor, Incarnation) ->
-    %% TODO: handshake (request / response) to make sure the link is established
+publish_actor_init_sync(ClientPid, ReqId, RespTopic, TargetCluster, Actor, Incarnation) ->
     PubTopic = ?ROUTE_TOPIC,
     Payload = #{
         ?F_OPERATION => ?OP_ACTOR_INIT,
+        ?F_PROTO_VER => ?PROTO_VER,
+        ?F_TARGET_CLUSTER => TargetCluster,
         ?F_ACTOR => Actor,
         ?F_INCARNATION => Incarnation
     },
-    emqtt:publish(ClientPid, PubTopic, ?ENCODE(Payload), ?QOS_1).
+    Properties = #{
+        'Response-Topic' => RespTopic,
+        'Correlation-Data' => ReqId
+    },
+    emqtt:publish(ClientPid, PubTopic, Properties, ?ENCODE(Payload), [{qos, ?QOS_1}]).
+
+%% Builds (without publishing) the QoS 1 `actor_init_ack' message addressed to `RespTopic'.
+%% The encoded payload carries the protocol version, `Actor' and the handshake result
+%% `InitRes'; `ReqId' is echoed back in the 'Correlation-Data' property.
+actor_init_ack_resp_msg(Actor, InitRes, ReqId, RespTopic) ->
+    Ack = #{
+        ?F_OPERATION => ?OP_ACTOR_INIT_ACK,
+        ?F_PROTO_VER => ?PROTO_VER,
+        ?F_ACTOR => Actor,
+        ?F_RESULT => InitRes
+    },
+    Props = #{'Correlation-Data' => ReqId},
+    emqx_message:make(undefined, ?QOS_1, RespTopic, ?ENCODE(Ack), #{}, #{properties => Props}).
 
 publish_route_sync(ClientPid, Actor, Incarnation, Updates) ->
     PubTopic = ?ROUTE_TOPIC,
@@ -293,12 +304,22 @@ publish_route_sync(ClientPid, Actor, Incarnation, Updates) ->
 %% Decodes a route operation payload: the binary is turned back into a term with
 %% ?DECODE (binary_to_term in `safe' mode) and then classified by decode_route_op1/1.
 decode_route_op(Payload) ->
     decode_route_op1(?DECODE(Payload)).
 
+%% Decodes a response payload: the binary is turned back into a term with ?DECODE
+%% and then interpreted by decode_resp1/1.
+decode_resp(Payload) ->
+    Decoded = ?DECODE(Payload),
+    decode_resp1(Decoded).
+
 decode_route_op1(#{
     ?F_OPERATION := ?OP_ACTOR_INIT,
+    ?F_PROTO_VER := ProtoVer,
+    ?F_TARGET_CLUSTER := TargetCluster,
     ?F_ACTOR := Actor,
     ?F_INCARNATION := Incr
 }) ->
-    {actor_init, #{actor => Actor, incarnation => Incr}};
+    {actor_init, #{
+        actor => Actor,
+        incarnation => Incr,
+        cluster => TargetCluster,
+        proto_ver => ProtoVer
+    }};
 decode_route_op1(#{
     ?F_OPERATION := ?OP_ROUTE,
     ?F_ACTOR := Actor,
@@ -314,6 +335,14 @@ decode_route_op1(Payload) ->
     }),
     {error, Payload}.
 
+%% Converts a decoded `actor_init_ack' map into `{actor_init_ack, Info}', where `Info'
+%% holds the `actor', `result' and `proto_ver' fields. Any other term has no matching
+%% clause and crashes the caller.
+decode_resp1(#{
+    ?F_OPERATION := ?OP_ACTOR_INIT_ACK,
+    ?F_PROTO_VER := Ver,
+    ?F_ACTOR := ActorName,
+    ?F_RESULT := Res
+}) ->
+    AckInfo = #{actor => ActorName, result => Res, proto_ver => Ver},
+    {actor_init_ack, AckInfo}.
+
 decode_forwarded_msg(Payload) ->
     case ?DECODE(Payload) of
         #message{} = Msg ->

+ 103 - 22
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -4,6 +4,8 @@
 -module(emqx_cluster_link_router_syncer).
 
 -include_lib("emqtt/include/emqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_cluster_link.hrl").
 
 %% API
 -export([start_link/1]).
@@ -50,9 +52,10 @@
 -define(ERROR_DELAY, 200).
 
 -define(RECONNECT_TIMEOUT, 5_000).
+-define(ACTOR_REINIT_TIMEOUT, 7000).
 
--define(CLIENT_SUFFIX, ":routesync").
--define(PS_CLIENT_SUFFIX, ":routesync-ps").
+-define(CLIENT_SUFFIX, ":routesync:").
+-define(PS_CLIENT_SUFFIX, ":routesync-ps:").
 
 %% Special actor for persistent routes that has the same actor name on all nodes.
 %% Node actors with the same name may race with each other (e.g. during bootstrap),
@@ -65,6 +68,21 @@
 -define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}).
 -define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)).
 
 %% Evaluates the MQTT publish expression `Expr' and normalizes its outcome:
 %%   * `{ok, #{reason_code := RC}}' with RC below ?RC_UNSPECIFIED_ERROR -> `OnSuccess'
 %%     (`ok' in the two-argument form);
 %%   * any other `{ok, #{reason_code_name := Name}}' -> `{error, {mqtt, Name}}';
 %%   * `{error, Reason}' -> returned as is;
 %%   * an `exit' raised by `Expr' -> `{error, {client, ClientPid, Reason}}'.
 %% Only exceptions of class `exit' are caught; `error' and `throw' propagate.
 %% NOTE(review): ?RC_UNSPECIFIED_ERROR is presumably provided by emqtt.hrl - confirm.
+-define(SAFE_MQTT_PUB(Expr, ClientPid), ?SAFE_MQTT_PUB(Expr, ClientPid, ok)).
+-define(SAFE_MQTT_PUB(Expr, ClientPid, OnSuccess),
+    try Expr of
+        {ok, #{reason_code := __RC}} when __RC < ?RC_UNSPECIFIED_ERROR ->
+            OnSuccess;
+        {ok, #{reason_code_name := __RCN}} ->
+            {error, {mqtt, __RCN}};
+        {error, __Reason} ->
+            {error, __Reason}
+    catch
+        exit:__Reason ->
+            {error, {client, ClientPid, __Reason}}
+    end
+).
+
 %% Passes `OpName', `Topic' and `ID' to do_push/4, addressed to the name that
 %% ?SYNCER_NAME yields for `TargetCluster'.
 push(TargetCluster, OpName, Topic, ID) ->
     do_push(?SYNCER_NAME(TargetCluster), OpName, Topic, ID).
 
@@ -164,17 +182,11 @@ announce_client(Actor, TargetCluster, Pid) ->
     ok.
 
 %% Publishes `Updates' for `Actor'/`Incarnation' through the MQTT client `ClientPid'.
 %% Returns `#{}' when the publish is acknowledged with a non-error reason code,
 %% `{error, {mqtt, ReasonCodeName}}' when the ack carries an error code,
 %% `{error, Reason}' when the publish call itself returns an error, and
 %% `{error, {client, ClientPid, Reason}}' when the call exits.
 publish_routes(ClientPid, Actor, Incarnation, Updates) ->
-    try emqx_cluster_link_mqtt:publish_route_sync(ClientPid, Actor, Incarnation, Updates) of
-        {ok, #{reason_code := RC}} when RC < ?RC_UNSPECIFIED_ERROR ->
-            #{};
-        {ok, #{reason_code_name := RCN}} ->
-            {error, {mqtt, RCN}};
-        {error, Reason} ->
-            {error, Reason}
-    catch
-        exit:Reason ->
-            {error, {client, ClientPid, Reason}}
-    end.
+    ?SAFE_MQTT_PUB(
+        emqx_cluster_link_mqtt:publish_route_sync(ClientPid, Actor, Incarnation, Updates),
+        ClientPid,
+        #{}
+    ).
 
 %% Route syncer
 
@@ -227,6 +239,7 @@ batch_get_opname(Op) ->
 init({sup, TargetCluster}) ->
     %% FIXME: Intensity.
     SupFlags = #{
+        %% TODO: one_for_one?
         strategy => one_for_all,
         intensity => 10,
         period => 60
@@ -288,7 +301,10 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
     incarnation :: non_neg_integer(),
     client :: {pid(), reference()},
     bootstrapped :: boolean(),
-    reconnect_timer :: reference()
+    reconnect_timer :: reference(),
+    actor_init_req_id :: binary(),
+    actor_init_timer :: reference(),
+    remote_actor_info :: undefined | map()
 }).
 
 mk_state(TargetCluster, Actor, Incarnation) ->
@@ -314,27 +330,91 @@ handle_cast(_Request, State) ->
 
 handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) ->
     {noreply, handle_client_down(Reason, St)};
-handle_info({timeout, TRef, _Reconnect}, St = #st{reconnect_timer = TRef}) ->
+handle_info(
+    {publish, #{payload := Payload, properties := #{'Correlation-Data' := ReqId}}},
+    St = #st{actor_init_req_id = ReqId}
+) ->
+    {actor_init_ack, #{result := Res} = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload),
+    St1 = St#st{
+        actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap
+    },
+    case Res of
+        ok ->
+            {noreply, post_actor_init(St1)};
+        Error ->
+            ?SLOG(error, #{
+                msg => "failed_to_init_link",
+                reason => Error,
+                target_cluster => St#st.target,
+                actor => St#st.actor,
+                remote_link_proto_ver => maps:get(proto_ver, AckInfoMap, undefined)
+            }),
+            %% TODO: It doesn't fit permanent workers/one_for_all restart/strategy.
+            %% The actor may be kept alive with some error status instead (waiting for a user intervention to fix it)?
+            {stop, {shutdown, Error}, St1}
+    end;
+handle_info({publish, #{}}, St) ->
+    {noreply, St};
+handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) ->
     {noreply, process_connect(St#st{reconnect_timer = undefined})};
-handle_info(_Info, St) ->
-    %% TODO: log?
+handle_info({timeout, TRef, actor_reinit}, St = #st{reconnect_timer = TRef}) ->
+    ?SLOG(error, #{
+        msg => "remote_actor_init_timeout",
+        target_cluster => St#st.target,
+        actor => St#st.actor
+    }),
+    {noreply, init_remote_actor(St#st{reconnect_timer = undefined})};
+%% Stale timeout.
+handle_info({timeout, _, _}, St) ->
+    {noreply, St};
+handle_info(Info, St) ->
+    ?SLOG(warning, #{msg => "unexpected_info", info => Info}),
     {noreply, St}.
 
 %% Termination callback: ignores both arguments and performs no cleanup.
 terminate(_Reason, _State) ->
     ok.
 
 %% Starts a linked MQTT client towards the target cluster.
 %% On success the client is announced, subscribed (QoS 1) to this actor's response
 %% topic, and the actor-init handshake is started via init_remote_actor/1. Starting the
 %% syncer and bootstrapping are deferred until the handshake is acknowledged.
 %% A failed subscribe crashes on the `{ok, _, _}' match.
 %% On connection failure the decision is delegated to handle_connect_error/2.
-process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) ->
+process_connect(St = #st{target = TargetCluster, actor = Actor}) ->
     case start_link_client(TargetCluster, Actor) of
         {ok, ClientPid} ->
-            %% TODO: error handling, handshake
-            {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr),
-            ok = start_syncer(TargetCluster, Actor, Incr),
             ok = announce_client(Actor, TargetCluster, ClientPid),
-            process_bootstrap(St#st{client = ClientPid});
+            %% TODO: handle subscribe errors
+            {ok, _, _} = emqtt:subscribe(ClientPid, ?RESP_TOPIC(Actor), ?QOS_1),
+            init_remote_actor(St#st{client = ClientPid});
         {error, Reason} ->
             handle_connect_error(Reason, St)
     end.
 
+%% Sends (or re-sends) the actor-init handshake request to the target cluster.
+%% A fresh request id is generated for every attempt. A failed publish is only logged,
+%% because the `actor_reinit' timer started here is armed regardless of the outcome.
+%% Returns the state updated with the new request id and timer reference.
+init_remote_actor(
+    St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr}
+) ->
+    ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
+    PubResult = ?SAFE_MQTT_PUB(
+        emqx_cluster_link_mqtt:publish_actor_init_sync(
+            ClientPid, ReqId, ?RESP_TOPIC(Actor), TargetCluster, Actor, Incr
+        ),
+        ClientPid
+    ),
+    _ =
+        case PubResult of
+            {error, Reason} ->
+                ?SLOG(error, #{
+                    msg => "failed_to_init_remote_actor",
+                    reason => Reason,
+                    target_cluster => TargetCluster,
+                    actor => Actor
+                });
+            ok ->
+                ok
+        end,
+    St#st{
+        actor_init_req_id = ReqId,
+        actor_init_timer = erlang:start_timer(?ACTOR_REINIT_TIMEOUT, self(), actor_reinit)
+    }.
+
+%% Runs once the remote cluster has acknowledged the handshake with `ok':
+%% starts the route syncer for this actor and proceeds to bootstrapping.
+%% The state is passed on unchanged; it already holds the client.
+post_actor_init(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) ->
+    ok = start_syncer(TargetCluster, Actor, Incr),
+    process_bootstrap(St).
+
 handle_connect_error(_Reason, St) ->
     %% TODO: logs
     TRef = erlang:start_timer(?RECONNECT_TIMEOUT, self(), reconnect),
@@ -342,6 +422,7 @@ handle_connect_error(_Reason, St) ->
 
 %% Reacts to the exit of the linked MQTT client: closes the syncer for this actor,
 %% clears the client from the state and immediately tries to connect again.
 %% The exit reason is currently ignored.
 handle_client_down(_Reason, St = #st{target = TargetCluster, actor = Actor}) ->
     %% TODO: logs
+    %% TODO: syncer may be already down due to one_for_all strategy
     ok = close_syncer(TargetCluster, Actor),
     process_connect(St#st{client = undefined}).