Просмотр исходного кода

feat(clusterlink): implement actor config handler

Serge Tupchii 1 год назад
Родитель
Сommit
94e81ba812

+ 28 - 7
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -7,6 +7,7 @@
 -behaviour(emqx_external_broker).
 -behaviour(emqx_external_broker).
 
 
 -export([
 -export([
+    is_registered/0,
     register_external_broker/0,
     register_external_broker/0,
     unregister_external_broker/0,
     unregister_external_broker/0,
     add_route/1,
     add_route/1,
@@ -36,8 +37,14 @@
 %% emqx_external_broker API
 %% emqx_external_broker API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+is_registered() ->
+    emqx_external_broker:provider() =:= ?MODULE.
+
 register_external_broker() ->
 register_external_broker() ->
-    emqx_external_broker:register_provider(?MODULE).
+    case is_registered() of
+        true -> ok;
+        false -> emqx_external_broker:register_provider(?MODULE)
+    end.
 
 
 unregister_external_broker() ->
 unregister_external_broker() ->
     emqx_external_broker:unregister_provider(?MODULE).
     emqx_external_broker:unregister_provider(?MODULE).
@@ -94,13 +101,18 @@ on_message_publish(
         {route_updates, #{actor := Actor}, RouteOps} ->
         {route_updates, #{actor := Actor}, RouteOps} ->
             ok = update_routes(ClusterName, Actor, RouteOps);
             ok = update_routes(ClusterName, Actor, RouteOps);
         {heartbeat, #{actor := Actor}} ->
         {heartbeat, #{actor := Actor}} ->
-            ok = actor_heartbeat(ClusterName, Actor)
+            ok = actor_heartbeat(ClusterName, Actor);
+        {error, {unknown_payload, ParsedPayload}} ->
+            ?SLOG(warning, #{
+                msg => "unexpected_cluster_link_route_op_payload",
+                payload => ParsedPayload
+            })
     end,
     end,
     {stop, []};
     {stop, []};
 on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
 on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
     case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of
     case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of
         #message{} = ForwardedMsg ->
         #message{} = ForwardedMsg ->
-            {stop, with_sender_name(ForwardedMsg, ClusterName)};
+            {stop, maybe_filter_incomming_msg(ForwardedMsg, ClusterName)};
         _Err ->
         _Err ->
             %% Just ignore it. It must be already logged by the decoder
             %% Just ignore it. It must be already logged by the decoder
             {stop, []}
             {stop, []}
@@ -163,9 +175,7 @@ actor_init(
             %% which will use safe binary_to_term decoding
             %% which will use safe binary_to_term decoding
             %% TODO: add error details?
             %% TODO: add error details?
             {error, <<"unknown_cluster">>};
             {error, <<"unknown_cluster">>};
-        LinkConf ->
-            %% TODO: may be worth checking resource health and communicate it?
-            _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
+        #{enable := true} = _LinkConf ->
             MyClusterName = emqx_cluster_link_config:cluster(),
             MyClusterName = emqx_cluster_link_config:cluster(),
             case MyClusterName of
             case MyClusterName of
                 TargetCluster ->
                 TargetCluster ->
@@ -187,7 +197,9 @@ actor_init(
                         received_from => ClusterName
                         received_from => ClusterName
                     }),
                     }),
                     {error, <<"bad_remote_cluster_link_name">>}
                     {error, <<"bad_remote_cluster_link_name">>}
-            end
+            end;
+        #{enable := false} ->
+            {error, <<"clster_link_disabled">>}
     end.
     end.
 
 
 actor_init_ack(#{actor := Actor}, Res, MsgIn) ->
 actor_init_ack(#{actor := Actor}, Res, MsgIn) ->
@@ -226,3 +238,12 @@ update_actor_state(ActorSt) ->
 %% that doesn't set extra = #{} by default.
 %% that doesn't set extra = #{} by default.
 with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) ->
 with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) ->
     Msg#message{extra = Extra#{link_origin => ClusterName}}.
     Msg#message{extra = Extra#{link_origin => ClusterName}}.
+
+maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) ->
+    %% Should prevent irrelevant messages from being dispatched in case
+    %% the remote routing state lags behind the local config changes.
+    #{enable := Enable, topics := Topics} = emqx_cluster_link_config:link(ClusterName),
+    case Enable andalso emqx_topic:match_any(T, Topics) of
+        true -> with_sender_name(Msg, ClusterName);
+        false -> []
+    end.

+ 116 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_api.erl

@@ -0,0 +1,116 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_api).
+
+-behaviour(minirest_api).
+
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/http_api.hrl").
+
+-export([
+    api_spec/0,
+    paths/0,
+    schema/1
+]).
+
+-export([config/2]).
+
+-define(CONF_PATH, [cluster, links]).
+-define(TAGS, [<<"Cluster">>]).
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/cluster/links"
+    ].
+
+schema("/cluster/links") ->
+    #{
+        'operationId' => config,
+        get =>
+            #{
+                description => "Get cluster links configuration",
+                tags => ?TAGS,
+                responses =>
+                    #{200 => links_config_schema()}
+            },
+        put =>
+            #{
+                description => "Update cluster links configuration",
+                tags => ?TAGS,
+                'requestBody' => links_config_schema(),
+                responses =>
+                    #{
+                        200 => links_config_schema(),
+                        400 =>
+                            emqx_dashboard_swagger:error_codes(
+                                [?BAD_REQUEST], <<"Update Config Failed">>
+                            )
+                    }
+            }
+    }.
+
+%%--------------------------------------------------------------------
+%% API Handler funcs
+%%--------------------------------------------------------------------
+
+config(get, _Params) ->
+    {200, get_raw()};
+config(put, #{body := Body}) ->
+    case emqx_cluster_link_config:update(Body) of
+        {ok, NewConfig} ->
+            {200, NewConfig};
+        {error, Reason} ->
+            Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
+            {400, ?BAD_REQUEST, Message}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+get_raw() ->
+    #{<<"links">> := Conf} =
+        emqx_config:fill_defaults(
+            #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)},
+            #{obfuscate_sensitive_values => true}
+        ),
+    Conf.
+
+links_config_schema() ->
+    emqx_cluster_link_schema:links_schema(
+        #{
+            examples => #{<<"example">> => links_config_example()}
+        }
+    ).
+
+links_config_example() ->
+    [
+        #{
+            <<"enable">> => true,
+            <<"pool_size">> => 10,
+            <<"server">> => <<"emqxcl_b.host:1883">>,
+            <<"ssl">> => #{<<"enable">> => false},
+            <<"topics">> =>
+                [
+                    <<"t/topic-example">>,
+                    <<"t/topic-filter-example/1/#">>
+                ],
+            <<"upstream">> => <<"emqxcl_b">>
+        },
+        #{
+            <<"enable">> => true,
+            <<"pool_size">> => 10,
+            <<"server">> => <<"emqxcl_c.host:1883">>,
+            <<"ssl">> => #{<<"enable">> => false},
+            <<"topics">> =>
+                [
+                    <<"t/topic-example">>,
+                    <<"t/topic-filter-example/1/#">>
+                ],
+            <<"upstream">> => <<"emqxcl_c">>
+        }
+    ].

+ 4 - 10
apps/emqx_cluster_link/src/emqx_cluster_link_app.erl

@@ -13,7 +13,7 @@
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
     ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
     ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
     emqx_cluster_link_config:add_handler(),
     emqx_cluster_link_config:add_handler(),
-    LinksConf = enabled_links(),
+    LinksConf = emqx_cluster_link_config:enabled_links(),
     _ =
     _ =
         case LinksConf of
         case LinksConf of
             [_ | _] ->
             [_ | _] ->
@@ -32,19 +32,13 @@ prep_stop(State) ->
 stop(_State) ->
 stop(_State) ->
     _ = emqx_cluster_link:delete_hook(),
     _ = emqx_cluster_link:delete_hook(),
     _ = emqx_cluster_link:unregister_external_broker(),
     _ = emqx_cluster_link:unregister_external_broker(),
-    _ = stop_msg_fwd_resources(emqx_cluster_link_config:links()),
+    _ = remove_msg_fwd_resources(emqx_cluster_link_config:links()),
     ok.
     ok.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-enabled_links() ->
-    lists:filter(
-        fun(#{enable := IsEnabled}) -> IsEnabled =:= true end,
-        emqx_cluster_link_config:links()
-    ).
-
 start_msg_fwd_resources(LinksConf) ->
 start_msg_fwd_resources(LinksConf) ->
     lists:foreach(
     lists:foreach(
         fun(LinkConf) ->
         fun(LinkConf) ->
@@ -53,10 +47,10 @@ start_msg_fwd_resources(LinksConf) ->
         LinksConf
         LinksConf
     ).
     ).
 
 
-stop_msg_fwd_resources(LinksConf) ->
+remove_msg_fwd_resources(LinksConf) ->
     lists:foreach(
     lists:foreach(
         fun(#{upstream := Name}) ->
         fun(#{upstream := Name}) ->
-            emqx_cluster_link_mqtt:stop_msg_fwd_resource(Name)
+            emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name)
         end,
         end,
         LinksConf
         LinksConf
     ).
     ).

+ 94 - 40
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -21,6 +21,7 @@
 
 
 -export([
 -export([
     %% General
     %% General
+    update/1,
     cluster/0,
     cluster/0,
     enabled_links/0,
     enabled_links/0,
     links/0,
     links/0,
@@ -47,6 +48,20 @@
 
 
 %%
 %%
 
 
+update(Config) ->
+    case
+        emqx_conf:update(
+            ?LINKS_PATH,
+            Config,
+            #{rawconf_with_defaults => true, override_to => cluster}
+        )
+    of
+        {ok, #{raw_config := NewConfigRows}} ->
+            {ok, NewConfigRows};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
 cluster() ->
 cluster() ->
     atom_to_binary(emqx_config:get([cluster, name])).
     atom_to_binary(emqx_config:get([cluster, name])).
 
 
@@ -83,7 +98,7 @@ actor_heartbeat_interval() ->
 %%
 %%
 
 
 mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
 mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
-    ClientId = maps:get(client_id, LinkConf, cluster()),
+    ClientId = maps:get(clientid, LinkConf, cluster()),
     #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
     #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
     Opts = #{
     Opts = #{
         host => Host,
         host => Host,
@@ -115,13 +130,13 @@ remove_handler() ->
 
 
 pre_config_update(?LINKS_PATH, RawConf, RawConf) ->
 pre_config_update(?LINKS_PATH, RawConf, RawConf) ->
     {ok, RawConf};
     {ok, RawConf};
-pre_config_update(?LINKS_PATH, NewRawConf, _RawConf) ->
-    {ok, convert_certs(NewRawConf)}.
+pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) ->
+    {ok, convert_certs(maybe_increment_ps_actor_incr(NewRawConf, OldRawConf))}.
 
 
 post_config_update(?LINKS_PATH, _Req, Old, Old, _AppEnvs) ->
 post_config_update(?LINKS_PATH, _Req, Old, Old, _AppEnvs) ->
     ok;
     ok;
 post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
 post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
-    ok = maybe_toggle_hook_and_provider(New),
+    ok = toggle_hook_and_broker(enabled_links(New), enabled_links(Old)),
     #{
     #{
         removed := Removed,
         removed := Removed,
         added := Added,
         added := Added,
@@ -142,22 +157,17 @@ post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-maybe_toggle_hook_and_provider(LinksConf) ->
-    case is_any_enabled(LinksConf) of
-        true ->
-            ok = emqx_cluster_link:register_external_broker(),
-            ok = emqx_cluster_link:put_hook();
-        false ->
-            _ = emqx_cluster_link:delete_hook(),
-            _ = emqx_cluster_link:unregister_external_broker(),
-            ok
-    end.
+toggle_hook_and_broker([_ | _] = _NewEnabledLinks, [] = _OldEnabledLinks) ->
+    ok = emqx_cluster_link:register_external_broker(),
+    ok = emqx_cluster_link:put_hook();
+toggle_hook_and_broker([] = _NewEnabledLinks, _OldLinks) ->
+    ok = emqx_cluster_link:unregister_external_broker(),
+    ok = emqx_cluster_link:delete_hook();
+toggle_hook_and_broker(_, _) ->
+    ok.
 
 
-is_any_enabled(LinksConf) ->
-    lists:any(
-        fun(#{enable := IsEnabled}) -> IsEnabled =:= true end,
-        LinksConf
-    ).
+enabled_links(LinksConf) ->
+    [L || #{enable := true} = L <- LinksConf].
 
 
 all_ok(Results) ->
 all_ok(Results) ->
     lists:all(
     lists:all(
@@ -172,42 +182,86 @@ all_ok(Results) ->
 add_links(LinksConf) ->
 add_links(LinksConf) ->
     [add_link(Link) || Link <- LinksConf].
     [add_link(Link) || Link <- LinksConf].
 
 
-add_link(#{enabled := true} = LinkConf) ->
-    %% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand...
-    MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
-    %% TODO
-    ActorRes = ok,
-    combine_results(ActorRes, MsgFwdRes);
+add_link(#{enable := true} = LinkConf) ->
+    {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf),
+    {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
+    ok;
 add_link(_DisabledLinkConf) ->
 add_link(_DisabledLinkConf) ->
     ok.
     ok.
 
 
 remove_links(LinksConf) ->
 remove_links(LinksConf) ->
-    [remove_link(Link) || Link <- LinksConf].
+    [remove_link(Name) || #{upstream := Name} <- LinksConf].
 
 
-remove_link(_LinkConf) ->
-    %% TODO
-    ok.
+remove_link(Name) ->
+    _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
+    ensure_actor_stopped(Name).
 
 
 update_links(LinksConf) ->
 update_links(LinksConf) ->
     [update_link(Link) || Link <- LinksConf].
     [update_link(Link) || Link <- LinksConf].
 
 
-update_link(#{enabled := true} = LinkConf) ->
-    _ = remove_link(LinkConf),
-    add_link(LinkConf);
-update_link(#{enabled := false} = LinkConf) ->
-    case remove_link(LinkConf) of
-        {error, not_found} -> ok;
-        Other -> Other
-    end.
+update_link({OldLinkConf, #{enable := true, upstream := Name} = NewLinkConf}) ->
+    _ = ensure_actor_stopped(Name),
+    {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(NewLinkConf),
+    %% TODO: if only msg_fwd resource related config is changed,
+    %% we can skip actor reincarnation/restart.
+    ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf),
+    ok;
+update_link({_OldLinkConf, #{enable := false, upstream := Name} = _NewLinkConf}) ->
+    _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
+    ensure_actor_stopped(Name).
 
 
-combine_results(ok, ok) ->
+update_msg_fwd_resource(#{pool_size := Old}, #{pool_size := Old} = NewConf) ->
+    {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf),
     ok;
     ok;
-combine_results(CoordRes, MsgFwdRes) ->
-    {error, #{coordinator => CoordRes, msg_fwd_resource => MsgFwdRes}}.
+update_msg_fwd_resource(_, #{upstream := Name} = NewConf) ->
+    _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name),
+    {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(NewConf),
+    ok.
+
+ensure_actor_stopped(ClusterName) ->
+    emqx_cluster_link_sup:ensure_actor_stopped(ClusterName).
 
 
 upstream_name(#{upstream := N}) -> N;
 upstream_name(#{upstream := N}) -> N;
 upstream_name(#{<<"upstream">> := N}) -> N.
 upstream_name(#{<<"upstream">> := N}) -> N.
 
 
+maybe_increment_ps_actor_incr(New, Old) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        true ->
+            %% TODO: what if a link was removed and then added again?
+            %% Assume that incarnation was 0 when the link was removed
+            %% and now it's also 0 (a default value for new actor).
+            %% If persistent routing state changed during this link absence
+            %% and remote GC has not started before ps actor restart (with the same incarnation),
+            %% then some old (stale) external ps routes may be never cleaned on the remote side.
+            %% No (permanent) message loss is expected, as new actor incrantaion will re-bootstrap.
+            %% Similarly, irrelevant messages will be filtered out at receiving end, so
+            %% the main risk is having some stale routes unreachable for GC...
+            #{changed := Changed} = emqx_utils:diff_lists(New, Old, fun upstream_name/1),
+            ChangedNames = [upstream_name(C) || {_, C} <- Changed],
+            lists:foldr(
+                fun(LConf, Acc) ->
+                    case lists:member(upstream_name(LConf), ChangedNames) of
+                        true -> [increment_ps_actor_incr(LConf) | Acc];
+                        false -> [LConf | Acc]
+                    end
+                end,
+                [],
+                New
+            );
+        false ->
+            New
+    end.
+
+increment_ps_actor_incr(#{ps_actor_incarnation := Incr} = Conf) ->
+    Conf#{ps_actor_incarnation => Incr + 1};
+increment_ps_actor_incr(#{<<"ps_actor_incarnation">> := Incr} = Conf) ->
+    Conf#{<<"ps_actor_incarnation">> => Incr + 1};
+%% Default value set in schema is 0, so need to set it to 1 during the first update.
+increment_ps_actor_incr(#{<<"upstream">> := _} = Conf) ->
+    Conf#{<<"ps_actor_incarnation">> => 1};
+increment_ps_actor_incr(#{upstream := _} = Conf) ->
+    Conf#{ps_actor_incarnation => 1}.
+
 convert_certs(LinksConf) ->
 convert_certs(LinksConf) ->
     lists:map(
     lists:map(
         fun
         fun

+ 10 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -22,7 +22,8 @@
     actor_apply_operation/2,
     actor_apply_operation/2,
     actor_apply_operation/3,
     actor_apply_operation/3,
     actor_gc/1,
     actor_gc/1,
-    is_present_incarnation/1
+    is_present_incarnation/1,
+    list_actors/1
 ]).
 ]).
 
 
 %% Internal API
 %% Internal API
@@ -167,6 +168,14 @@ is_present_incarnation(#state{extra = #{is_present_incarnation := IsNew}}) ->
 is_present_incarnation(_State) ->
 is_present_incarnation(_State) ->
     false.
     false.
 
 
+-spec list_actors(cluster()) -> [#{actor := actor(), incarnation := incarnation()}].
+list_actors(Cluster) ->
+    Matches = ets:match(
+        emqx_external_router_actor,
+        #actor{id = {Cluster, '$1'}, incarnation = '$2', _ = '_'}
+    ),
+    [#{actor => Actor, incarnation => Incr} || [Actor, Incr] <- Matches].
+
 mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
 mnesia_actor_init(Cluster, Actor, Incarnation, TS) ->
     %% NOTE
     %% NOTE
     %% We perform this heavy-weight transaction only in the case of a new route
     %% We perform this heavy-weight transaction only in the case of a new route

+ 16 - 10
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -28,7 +28,7 @@
 
 
 -export([
 -export([
     ensure_msg_fwd_resource/1,
     ensure_msg_fwd_resource/1,
-    stop_msg_fwd_resource/1,
+    remove_msg_fwd_resource/1,
     decode_route_op/1,
     decode_route_op/1,
     decode_forwarded_msg/1,
     decode_forwarded_msg/1,
     decode_resp/1
     decode_resp/1
@@ -80,6 +80,15 @@
 
 
 -define(PUB_TIMEOUT, 10_000).
 -define(PUB_TIMEOUT, 10_000).
 
 
+-spec ensure_msg_fwd_resource(binary() | map()) ->
+    {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
+ensure_msg_fwd_resource(ClusterName) when is_binary(ClusterName) ->
+    case emqx_cluster_link_config:link(ClusterName) of
+        #{} = Conf ->
+            ensure_msg_fwd_resource(Conf);
+        undefined ->
+            {error, link_config_not_found}
+    end;
 ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) ->
 ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) ->
     ResConf = #{
     ResConf = #{
         query_mode => async,
         query_mode => async,
@@ -91,8 +100,9 @@ ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf
     },
     },
     emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResConf).
     emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResConf).
 
 
-stop_msg_fwd_resource(ClusterName) ->
-    emqx_resource:stop(?MSG_RES_ID(ClusterName)).
+-spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}.
+remove_msg_fwd_resource(ClusterName) ->
+    emqx_resource:remove_local(?MSG_RES_ID(ClusterName)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% emqx_resource callbacks (message forwarding)
 %% emqx_resource callbacks (message forwarding)
@@ -247,9 +257,9 @@ combine_status(Statuses) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 connect(Options) ->
 connect(Options) ->
-    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    WorkerIdBin = integer_to_binary(proplists:get_value(ecpool_worker_id, Options)),
     #{clientid := ClientId} = ClientOpts = proplists:get_value(client_opts, Options),
     #{clientid := ClientId} = ClientOpts = proplists:get_value(client_opts, Options),
-    ClientId1 = emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId),
+    ClientId1 = <<ClientId/binary, ":", WorkerIdBin/binary>>,
     ClientOpts1 = ClientOpts#{clientid => ClientId1},
     ClientOpts1 = ClientOpts#{clientid => ClientId1},
     case emqtt:start_link(ClientOpts1) of
     case emqtt:start_link(ClientOpts1) of
         {ok, Pid} ->
         {ok, Pid} ->
@@ -369,11 +379,7 @@ decode_route_op1(#{
 }) ->
 }) ->
     {heartbeat, #{actor => Actor, incarnation => Incr}};
     {heartbeat, #{actor => Actor, incarnation => Incr}};
 decode_route_op1(Payload) ->
 decode_route_op1(Payload) ->
-    ?SLOG(warning, #{
-        msg => "unexpected_cluster_link_route_op_payload",
-        payload => Payload
-    }),
-    {error, Payload}.
+    {error, {unknown_payload, Payload}}.
 
 
 decode_resp1(#{
 decode_resp1(#{
     ?F_OPERATION := ?OP_ACTOR_INIT_ACK,
     ?F_OPERATION := ?OP_ACTOR_INIT_ACK,

+ 2 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl

@@ -10,7 +10,7 @@
 -include("emqx_cluster_link.hrl").
 -include("emqx_cluster_link.hrl").
 
 
 -export([
 -export([
-    init/2,
+    init/3,
     next_batch/1
     next_batch/1
 ]).
 ]).
 
 
@@ -27,8 +27,7 @@
 
 
 %%
 %%
 
 
-init(TargetCluster, Options) ->
-    LinkFilters = emqx_cluster_link_config:topic_filters(TargetCluster),
+init(TargetCluster, LinkFilters, Options) ->
     {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
     {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
     IsPersistentRoute = maps:get(is_persistent_route, Options, false),
     IsPersistentRoute = maps:get(is_persistent_route, Options, false),
     #bootstrap{
     #bootstrap{

+ 43 - 52
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -47,6 +47,7 @@
 -define(SYNCER_NAME(Cluster), ?NAME(Cluster, syncer)).
 -define(SYNCER_NAME(Cluster), ?NAME(Cluster, syncer)).
 -define(SYNCER_REF(Cluster), {via, gproc, ?SYNCER_NAME(Cluster)}).
 -define(SYNCER_REF(Cluster), {via, gproc, ?SYNCER_NAME(Cluster)}).
 -define(ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, actor)}).
 -define(ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, actor)}).
+-define(ACTOR_NAME(Cluster), ?NAME(Cluster, actor)).
 
 
 -define(MAX_BATCH_SIZE, 4000).
 -define(MAX_BATCH_SIZE, 4000).
 -define(MIN_SYNC_INTERVAL, 10).
 -define(MIN_SYNC_INTERVAL, 10).
@@ -63,8 +64,8 @@
 %% but it must be tolerable, since persistent route destination is a client ID,
 %% but it must be tolerable, since persistent route destination is a client ID,
 %% which is unique cluster-wide.
 %% which is unique cluster-wide.
 -define(PS_ACTOR, <<"ps-routes-v1">>).
 -define(PS_ACTOR, <<"ps-routes-v1">>).
--define(PS_INCARNATION, 0).
 -define(PS_ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, ps_actor)}).
 -define(PS_ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, ps_actor)}).
+-define(PS_ACTOR_NAME(Cluster), ?NAME(Cluster, ps_actor)).
 -define(PS_CLIENT_NAME(Cluster), ?NAME(Cluster, ps_client)).
 -define(PS_CLIENT_NAME(Cluster), ?NAME(Cluster, ps_client)).
 -define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}).
 -define(PS_SYNCER_REF(Cluster), {via, gproc, ?PS_SYNCER_NAME(Cluster)}).
 -define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)).
 -define(PS_SYNCER_NAME(Cluster), ?NAME(Cluster, ps_syncer)).
@@ -102,43 +103,30 @@ do_push(SyncerName, OpName, Topic, ID) ->
 %% 1. Actor + MQTT Client
 %% 1. Actor + MQTT Client
 %% 2. Syncer
 %% 2. Syncer
 
 
-start_link(TargetCluster) ->
-    supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, TargetCluster}).
+start_link(#{upstream := TargetCluster} = LinkConf) ->
+    supervisor:start_link(?REF(TargetCluster), ?MODULE, {sup, LinkConf}).
 
 
 %% Actor
 %% Actor
 
 
-start_link_actor(ActorRef, Actor, Incarnation, TargetCluster) ->
+new_incarnation() ->
+    %% TODO: Subject to clock skew, need something more robust.
+    erlang:system_time(millisecond).
+
+start_link_actor(ActorRef, Actor, Incarnation, LinkConf) ->
     gen_server:start_link(
     gen_server:start_link(
         ActorRef,
         ActorRef,
         ?MODULE,
         ?MODULE,
-        {actor, mk_state(TargetCluster, Actor, Incarnation)},
+        {actor, mk_state(LinkConf, Actor, Incarnation)},
         []
         []
     ).
     ).
 
 
 get_actor_id() ->
 get_actor_id() ->
     atom_to_binary(node()).
     atom_to_binary(node()).
 
 
-get_actor_incarnation() ->
-    persistent_term:get({?MODULE, incarnation}).
-
-set_actor_incarnation(Incarnation) ->
-    ok = persistent_term:put({?MODULE, incarnation}, Incarnation),
-    Incarnation.
-
-ensure_actor_incarnation() ->
-    try
-        get_actor_incarnation()
-    catch
-        error:badarg ->
-            %% TODO: Subject to clock skew, need something more robust.
-            Incarnation = erlang:system_time(millisecond),
-            set_actor_incarnation(Incarnation)
-    end.
-
 %% MQTT Client
 %% MQTT Client
 
 
-start_link_client(TargetCluster, Actor) ->
-    Options = emqx_cluster_link_config:emqtt_options(TargetCluster),
+start_link_client(Actor, LinkConf) ->
+    Options = emqx_cluster_link_config:mk_emqtt_options(LinkConf),
     case emqtt:start_link(refine_client_options(Options, Actor)) of
     case emqtt:start_link(refine_client_options(Options, Actor)) of
         {ok, Pid} ->
         {ok, Pid} ->
             case emqtt:connect(Pid) of
             case emqtt:connect(Pid) of
@@ -245,7 +233,7 @@ batch_get_opname(Op) ->
 
 
 %%
 %%
 
 
-init({sup, TargetCluster}) ->
+init({sup, LinkConf}) ->
     %% FIXME: Intensity.
     %% FIXME: Intensity.
     SupFlags = #{
     SupFlags = #{
         %% TODO: one_for_one?
         %% TODO: one_for_one?
@@ -254,24 +242,24 @@ init({sup, TargetCluster}) ->
         period => 60
         period => 60
     },
     },
     Children = lists:append([
     Children = lists:append([
-        [child_spec(actor, TargetCluster)],
-        [child_spec(ps_actor, TargetCluster) || emqx_persistent_message:is_persistence_enabled()]
+        [child_spec(actor, LinkConf)],
+        [child_spec(ps_actor, LinkConf) || emqx_persistent_message:is_persistence_enabled()]
     ]),
     ]),
     {ok, {SupFlags, Children}};
     {ok, {SupFlags, Children}};
 init({actor, State}) ->
 init({actor, State}) ->
     init_actor(State).
     init_actor(State).
 
 
-child_spec(actor, TargetCluster) ->
+child_spec(actor, #{upstream := TargetCluster} = LinkConf) ->
     %% Actor process.
     %% Actor process.
     %% Wraps MQTT Client process.
     %% Wraps MQTT Client process.
     %% ClientID: `mycluster:emqx1@emqx.local:routesync`
     %% ClientID: `mycluster:emqx1@emqx.local:routesync`
     %% Occasional TCP/MQTT-level disconnects are expected, and should be handled
     %% Occasional TCP/MQTT-level disconnects are expected, and should be handled
     %% gracefully.
     %% gracefully.
     Actor = get_actor_id(),
     Actor = get_actor_id(),
-    Incarnation = ensure_actor_incarnation(),
-    actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, TargetCluster);
-child_spec(ps_actor, TargetCluster) ->
-    actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, ?PS_INCARNATION, TargetCluster).
+    Incarnation = new_incarnation(),
+    actor_spec(actor, ?ACTOR_REF(TargetCluster), Actor, Incarnation, LinkConf);
+child_spec(ps_actor, #{upstream := TargetCluster, ps_actor_incarnation := Incr} = LinkConf) ->
+    actor_spec(ps_actor, ?PS_ACTOR_REF(TargetCluster), ?PS_ACTOR, Incr, LinkConf).
 
 
 child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) ->
 child_spec(syncer, ?PS_ACTOR, Incarnation, TargetCluster) ->
     SyncerRef = ?PS_SYNCER_REF(TargetCluster),
     SyncerRef = ?PS_SYNCER_REF(TargetCluster),
@@ -286,10 +274,10 @@ child_spec(syncer, Actor, Incarnation, TargetCluster) ->
     ClientName = ?CLIENT_NAME(TargetCluster),
     ClientName = ?CLIENT_NAME(TargetCluster),
     syncer_spec(syncer, Actor, Incarnation, SyncerRef, ClientName).
     syncer_spec(syncer, Actor, Incarnation, SyncerRef, ClientName).
 
 
-actor_spec(ChildID, ActorRef, Actor, Incarnation, TargetCluster) ->
+actor_spec(ChildID, ActorRef, Actor, Incarnation, LinkConf) ->
     #{
     #{
         id => ChildID,
         id => ChildID,
-        start => {?MODULE, start_link_actor, [ActorRef, Actor, Incarnation, TargetCluster]},
+        start => {?MODULE, start_link_actor, [ActorRef, Actor, Incarnation, LinkConf]},
         restart => permanent,
         restart => permanent,
         type => worker
         type => worker
     }.
     }.
@@ -308,7 +296,7 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
     target :: binary(),
     target :: binary(),
     actor :: binary(),
     actor :: binary(),
     incarnation :: non_neg_integer(),
     incarnation :: non_neg_integer(),
-    client :: {pid(), reference()},
+    client :: {pid(), reference()} | undefined,
     bootstrapped :: boolean(),
     bootstrapped :: boolean(),
     reconnect_timer :: reference(),
     reconnect_timer :: reference(),
     heartbeat_timer :: reference(),
     heartbeat_timer :: reference(),
@@ -316,30 +304,31 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
     actor_init_timer :: reference(),
     actor_init_timer :: reference(),
     remote_actor_info :: undefined | map(),
     remote_actor_info :: undefined | map(),
     status :: connecting | connected | disconnected,
     status :: connecting | connected | disconnected,
-    error :: undefined | term()
+    error :: undefined | term(),
+    link_conf :: map()
 }).
 }).
 
 
-mk_state(TargetCluster, Actor, Incarnation) ->
+mk_state(#{upstream := TargetCluster} = LinkConf, Actor, Incarnation) ->
     #st{
     #st{
         target = TargetCluster,
         target = TargetCluster,
         actor = Actor,
         actor = Actor,
         incarnation = Incarnation,
         incarnation = Incarnation,
         bootstrapped = false,
         bootstrapped = false,
-        status = connecting
+        status = connecting,
+        link_conf = LinkConf
     }.
     }.
 
 
 init_actor(State = #st{}) ->
 init_actor(State = #st{}) ->
     _ = erlang:process_flag(trap_exit, true),
     _ = erlang:process_flag(trap_exit, true),
     {ok, State, {continue, connect}}.
     {ok, State, {continue, connect}}.
 
 
-handle_continue(connect, State) ->
-    {noreply, process_connect(State)}.
-
-handle_call(_Request, _From, State) ->
-    {reply, ignored, State}.
+handle_continue(connect, St) ->
+    {noreply, process_connect(St)}.
+handle_call(_Request, _From, St) ->
+    {reply, ignored, St}.
 
 
-handle_cast(_Request, State) ->
-    {noreply, State}.
+handle_cast(_Request, St) ->
+    {noreply, St}.
 
 
 handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) ->
 handle_info({'EXIT', ClientPid, Reason}, St = #st{client = ClientPid}) ->
     {noreply, handle_client_down(Reason, St)};
     {noreply, handle_client_down(Reason, St)};
@@ -396,8 +385,8 @@ handle_info(Info, St) ->
 terminate(_Reason, _State) ->
 terminate(_Reason, _State) ->
     ok.
     ok.
 
 
-process_connect(St = #st{target = TargetCluster, actor = Actor}) ->
-    case start_link_client(TargetCluster, Actor) of
+process_connect(St = #st{target = TargetCluster, actor = Actor, link_conf = Conf}) ->
+    case start_link_client(Actor, Conf) of
         {ok, ClientPid} ->
         {ok, ClientPid} ->
             _ = maybe_deactivate_alarm(St),
             _ = maybe_deactivate_alarm(St),
             ok = announce_client(Actor, TargetCluster, ClientPid),
             ok = announce_client(Actor, TargetCluster, ClientPid),
@@ -498,17 +487,17 @@ cancel_heartbeat(St = #st{heartbeat_timer = TRef}) ->
 %% is re-established with a clean session. Once bootstrapping is done, it
 %% is re-established with a clean session. Once bootstrapping is done, it
 %% opens the syncer.
 %% opens the syncer.
 
 
-run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR}) ->
+run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR, link_conf = #{topics := Topics}}) ->
     case mria_config:whoami() of
     case mria_config:whoami() of
         Role when Role /= replicant ->
         Role when Role /= replicant ->
             Opts = #{is_persistent_route => true},
             Opts = #{is_persistent_route => true},
-            Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Opts),
+            Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Topics, Opts),
             run_bootstrap(Bootstrap, St);
             run_bootstrap(Bootstrap, St);
         _ ->
         _ ->
             process_bootstrapped(St)
             process_bootstrapped(St)
     end;
     end;
-run_bootstrap(St = #st{target = TargetCluster}) ->
-    Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, #{}),
+run_bootstrap(St = #st{target = TargetCluster, link_conf = #{topics := Topics}}) ->
+    Bootstrap = emqx_cluster_link_router_bootstrap:init(TargetCluster, Topics, #{}),
     run_bootstrap(Bootstrap, St).
     run_bootstrap(Bootstrap, St).
 
 
 run_bootstrap(Bootstrap, St) ->
 run_bootstrap(Bootstrap, St) ->
@@ -527,7 +516,9 @@ run_bootstrap(Bootstrap, St) ->
             end
             end
     end.
     end.
 
 
-process_bootstrapped(St = #st{target = TargetCluster, actor = Actor}) ->
+process_bootstrapped(
+    St = #st{target = TargetCluster, actor = Actor}
+) ->
     ok = open_syncer(TargetCluster, Actor),
     ok = open_syncer(TargetCluster, Actor),
     St#st{bootstrapped = true}.
     St#st{bootstrapped = true}.
 
 

+ 13 - 10
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -11,6 +11,9 @@
 
 
 -export([injected_fields/0]).
 -export([injected_fields/0]).
 
 
+%% Used in emqx_cluster_link_api
+-export([links_schema/1]).
+
 -export([
 -export([
     roots/0,
     roots/0,
     fields/1,
     fields/1,
@@ -27,14 +30,14 @@ roots() -> [].
 injected_fields() ->
 injected_fields() ->
     #{cluster => fields("cluster_linking")}.
     #{cluster => fields("cluster_linking")}.
 
 
+links_schema(Meta) ->
+    ?HOCON(?ARRAY(?R_REF("link")), Meta#{default => [], validator => fun links_validator/1}).
+
 fields("cluster_linking") ->
 fields("cluster_linking") ->
-    [
-        {links,
-            ?HOCON(?ARRAY(?R_REF("link")), #{default => [], validator => fun links_validator/1})}
-    ];
+    [{links, links_schema(#{})}];
 fields("link") ->
 fields("link") ->
     [
     [
-        {enable, ?HOCON(boolean(), #{default => false})},
+        {enable, ?HOCON(boolean(), #{default => true})},
         {upstream, ?HOCON(binary(), #{required => true})},
         {upstream, ?HOCON(binary(), #{required => true})},
         {server,
         {server,
             emqx_schema:servers_sc(#{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS)},
             emqx_schema:servers_sc(#{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS)},
@@ -46,13 +49,13 @@ fields("link") ->
             default => #{<<"enable">> => false},
             default => #{<<"enable">> => false},
             desc => ?DESC("ssl")
             desc => ?DESC("ssl")
         }},
         }},
-        %% TODO: validate topics:
-        %% - basic topic validation
-        %% - non-overlapping (not intersecting) filters?
-        %%  (this may be not required, depends on config update implementation)
         {topics,
         {topics,
             ?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})},
             ?HOCON(?ARRAY(binary()), #{required => true, validator => fun topics_validator/1})},
-        {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})}
+        {pool_size, ?HOCON(pos_integer(), #{default => emqx_vm:schedulers() * 2})},
+        %% Must not be configured manually. The value is incremented by cluster link config handler
+        %% and is used as a globally synchronized sequence to ensure persistent routes actors have
+        %% the same next incarnation after each config change.
+        {ps_actor_incarnation, ?HOCON(integer(), #{default => 0, importance => ?IMPORTANCE_HIDDEN})}
     ].
     ].
 
 
 desc(_) ->
 desc(_) ->

+ 27 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl

@@ -8,9 +8,15 @@
 
 
 -export([start_link/1]).
 -export([start_link/1]).
 
 
+-export([
+    ensure_actor/1,
+    ensure_actor_stopped/1
+]).
+
 -export([init/1]).
 -export([init/1]).
 
 
 -define(SERVER, ?MODULE).
 -define(SERVER, ?MODULE).
+-define(ACTOR_MODULE, emqx_cluster_link_router_syncer).
 
 
 start_link(LinksConf) ->
 start_link(LinksConf) ->
     supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf).
     supervisor:start_link({local, ?SERVER}, ?SERVER, LinksConf).
@@ -23,8 +29,8 @@ init(LinksConf) ->
     },
     },
     ExtrouterGC = extrouter_gc_spec(),
     ExtrouterGC = extrouter_gc_spec(),
     RouteActors = [
     RouteActors = [
-        sup_spec(Name, emqx_cluster_link_router_syncer, [Name])
-     || #{upstream := Name} <- LinksConf
+        sup_spec(Name, ?ACTOR_MODULE, [LinkConf])
+     || #{upstream := Name} = LinkConf <- LinksConf
     ],
     ],
     {ok, {SupFlags, [ExtrouterGC | RouteActors]}}.
     {ok, {SupFlags, [ExtrouterGC | RouteActors]}}.
 
 
@@ -46,3 +52,22 @@ sup_spec(Id, Mod, Args) ->
         type => supervisor,
         type => supervisor,
         modules => [Mod]
         modules => [Mod]
     }.
     }.
+
+ensure_actor(#{upstream := Name} = LinkConf) ->
+    case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of
+        {ok, Pid} ->
+            {ok, Pid};
+        {error, {already_started, Pid}} ->
+            {ok, Pid};
+        Err ->
+            Err
+    end.
+
+ensure_actor_stopped(ClusterName) ->
+    case supervisor:terminate_child(?MODULE, ClusterName) of
+        ok ->
+            _ = supervisor:delete_child(?MODULE, ClusterName),
+            ok;
+        {error, not_found} ->
+            ok
+    end.