Просмотр исходного кода

Merge pull request #14004 from keynslug/fix/EMQX-13229/clink-redundant-topics

fix(cluster-link): correctly replicate overlapping route intersections
Andrew Mayorov 1 год назад
Родитель
Commit
8104c2b0f1

+ 57 - 16
apps/emqx/src/emqx_topic.erl

@@ -34,7 +34,9 @@
     systop/1,
     parse/1,
     parse/2,
-    intersection/2
+    intersection/2,
+    is_subset/2,
+    union/1
 ]).
 
 -export([
@@ -83,22 +85,34 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) ->
 match(<<$$, _/binary>>, <<$#, _/binary>>) ->
     false;
 match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
-    match(words(Name), words(Filter));
+    match_words(words(Name), words(Filter));
 match(#share{} = Name, Filter) ->
     match_share(Name, Filter);
 match(Name, #share{} = Filter) ->
     match_share(Name, Filter);
-match([], []) ->
+match(Name, Filter) when is_binary(Name) ->
+    match_words(words(Name), Filter);
+match(Name, Filter) when is_binary(Filter) ->
+    match_words(Name, words(Filter));
+match(Name, Filter) ->
+    match_words(Name, Filter).
+
+match_words([<<$$, _/binary>> | _], [W | _]) when ?IS_WILDCARD(W) ->
+    false;
+match_words(Name, Filter) ->
+    match_tokens(Name, Filter).
+
+match_tokens([], []) ->
     true;
-match([H | T1], [H | T2]) ->
-    match(T1, T2);
-match([_H | T1], ['+' | T2]) ->
-    match(T1, T2);
-match([<<>> | T1], ['' | T2]) ->
-    match(T1, T2);
-match(_, ['#']) ->
+match_tokens([H | T1], [H | T2]) ->
+    match_tokens(T1, T2);
+match_tokens([_H | T1], ['+' | T2]) ->
+    match_tokens(T1, T2);
+match_tokens([<<>> | T1], ['' | T2]) ->
+    match_tokens(T1, T2);
+match_tokens(_, ['#']) ->
     true;
-match(_, _) ->
+match_tokens(_, _) ->
     false.
 
 %% @doc Finds an intersection between two topics, two filters or a topic and a filter.
@@ -108,10 +122,14 @@ match(_, _) ->
 %% The intersection of two filters is either false or a new topic filter that would match only those topics,
 %% that can be matched by both input filters.
 %% For example, the intersection of "t/global/#" and "t/+/1/+" is "t/global/1/+".
--spec intersection(TopicOrFilter, TopicOrFilter) -> TopicOrFilter | false when
-    TopicOrFilter :: emqx_types:topic().
-intersection(Topic1, Topic2) when is_binary(Topic1), is_binary(Topic2) ->
-    case intersect_start(words(Topic1), words(Topic2)) of
+-spec intersection(TopicOrFilter, TopicOrFilter) -> topic() | false when
+    TopicOrFilter :: topic() | words().
+intersection(Topic1, Topic2) when is_binary(Topic1) ->
+    intersection(words(Topic1), Topic2);
+intersection(Topic1, Topic2) when is_binary(Topic2) ->
+    intersection(Topic1, words(Topic2));
+intersection(Topic1, Topic2) ->
+    case intersect_start(Topic1, Topic2) of
         false -> false;
         Intersection -> join(Intersection)
     end.
@@ -150,6 +168,29 @@ intersect_join(W, Words) -> [W | Words].
 wildcard_intersection(W, W) -> W;
 wildcard_intersection(_, _) -> '+'.
 
+%% @doc Tells whether topic / topic filter `Sub` is a subset of topic / topic
+%% filter `Super`.
+%% `Sub` is a subset exactly when intersecting it with `Super` yields `Sub` itself.
+%% `intersection/2` answers with a joined topic (or `false`), so a word list is
+%% joined before the comparison.
+-spec is_subset(topic() | words(), topic() | words()) -> boolean().
+is_subset(Same, Same) ->
+    true;
+is_subset(Sub, Super) ->
+    Common = intersection(Sub, Super),
+    case is_binary(Sub) of
+        true -> Common =:= Sub;
+        false -> Common =:= join(Sub)
+    end.
+
+%% @doc Reduces a list of topics / topic filters by dropping every entry that is
+%% a subset of another entry, so that each input is still covered by (is a subset
+%% of) some element of the result.
+%% The result is not guaranteed to be disjoint: two of the remaining filters may
+%% still have a non-empty intersection.
+-spec union(_Set :: [topic() | words()]) -> [topic() | words()].
+union([]) ->
+    [];
+union([Filter | Filters]) ->
+    %% Entries completely covered by `Filter` are redundant from here on.
+    Remaining = lists:filter(fun(F) -> not is_subset(F, Filter) end, Filters),
+    %% `Filter` itself survives only if none of the remaining entries covers it.
+    case lists:any(fun(F) -> is_subset(Filter, F) end, Remaining) of
+        true -> union(Remaining);
+        false -> [Filter | union(Remaining)]
+    end.
+
 -spec match_share(Name, Filter) -> boolean() when
     Name :: share(),
     Filter :: topic() | share().
@@ -278,7 +319,7 @@ tokens(Topic) ->
     binary:split(Topic, <<"/">>, [global]).
 
 %% @doc Split Topic Path to Words
--spec words(topic()) -> words().
+-spec words(topic() | share()) -> words().
 words(#share{topic = Topic}) when is_binary(Topic) ->
     words(Topic);
 words(Topic) when is_binary(Topic) ->

+ 26 - 0
apps/emqx/test/emqx_topic_SUITE.erl

@@ -186,6 +186,32 @@ t_sys_intersect(_) ->
     false = intersection(<<"$SYS/broker">>, <<"+/+">>),
     false = intersection(<<"$SYS/broker">>, <<"#">>).
 
+t_union(_) ->
+    %% `{Expected, Input}` pairs; the computed union is sorted before comparison
+    %% so the assertions do not depend on the order of the result.
+    Cases = [
+        {[], []},
+        {[<<"t/1/2/+">>], [<<"t/1/2/+">>]},
+        {[<<"t/1/2/#">>], [<<"t/1/2/+">>, <<"t/1/2/#">>]},
+        {[<<"t/+/1/#">>, <<"t/1/+/#">>], [<<"t/+/1/#">>, <<"t/1/+/#">>]},
+        {
+            [<<"g/1">>, <<"t/+/+/#">>, <<"t/1">>],
+            [<<"t/1">>, <<"t/1">>, <<"t/2/+">>, <<"t/+/+/#">>, <<"t/+/+">>, <<"g/1">>]
+        },
+        {
+            [<<"#">>],
+            [<<"t/1">>, <<"t/1">>, <<"t/2/+">>, <<"t/+/+/#">>, <<"#">>, <<"g/1">>]
+        }
+    ],
+    lists:foreach(
+        fun({Expected, Input}) ->
+            ?assertEqual(Expected, lists:sort(emqx_topic:union(Input)))
+        end,
+        Cases
+    ).
+
+t_union_sys(_) ->
+    %% `{Expected, Input}` pairs. The expectations show that `$SYS/...` entries
+    %% are not absorbed by `+/#`, only by other `$SYS/...` filters.
+    Cases = [
+        {
+            [<<"$SYS/cluster">>, <<"+/#">>],
+            [<<"+/#">>, <<"t/1/2/#">>, <<"$SYS/cluster">>]
+        },
+        {
+            [<<"$SYS/+">>, <<"$SYS/cluster/#">>, <<"+/#">>],
+            [<<"+/#">>, <<"t/1/2">>, <<"$SYS/cluster">>, <<"$SYS/cluster/#">>, <<"$SYS/+">>]
+        }
+    ],
+    lists:foreach(
+        fun({Expected, Input}) ->
+            ?assertEqual(Expected, lists:sort(emqx_topic:union(Input)))
+        end,
+        Cases
+    ).
+
 t_validate(_) ->
     true = validate(<<"a/+/#">>),
     true = validate(<<"a/b/c/d">>),

+ 14 - 36
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -59,32 +59,34 @@ unregister_external_broker() ->
 %% There is no need to push Node name as this info can be derived from
 %% agent state on the remote cluster.
 add_route(Topic) ->
-    maybe_push_route_op(add, Topic, Topic).
+    emqx_cluster_link_router:push_update(add, Topic, Topic).
 
 delete_route(Topic) ->
-    maybe_push_route_op(delete, Topic, Topic).
+    emqx_cluster_link_router:push_update(delete, Topic, Topic).
 
 add_shared_route(Topic, Group) ->
-    maybe_push_route_op(add, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
+    RouteID = ?SHARED_ROUTE_ID(Topic, Group),
+    emqx_cluster_link_router:push_update(add, Topic, RouteID).
 
 delete_shared_route(Topic, Group) ->
-    maybe_push_route_op(delete, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
+    RouteID = ?SHARED_ROUTE_ID(Topic, Group),
+    emqx_cluster_link_router:push_update(delete, Topic, RouteID).
 
 add_persistent_route(Topic, ID) ->
-    maybe_push_route_op(add, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
+    RouteID = ?PERSISTENT_ROUTE_ID(Topic, ID),
+    emqx_cluster_link_router:push_update_persistent(add, Topic, RouteID).
 
 delete_persistent_route(Topic, ID) ->
-    maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
+    RouteID = ?PERSISTENT_ROUTE_ID(Topic, ID),
+    emqx_cluster_link_router:push_update_persistent(delete, Topic, RouteID).
 
 add_persistent_shared_route(Topic, Group, ID) ->
-    maybe_push_route_op(
-        add, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
-    ).
+    RouteID = ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID),
+    emqx_cluster_link_router:push_update_persistent(add, Topic, RouteID).
 
 delete_persistent_shared_route(Topic, Group, ID) ->
-    maybe_push_route_op(
-        delete, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
-    ).
+    RouteID = ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID),
+    emqx_cluster_link_router:push_update_persistent(delete, Topic, RouteID).
 
 forward(#delivery{message = #message{extra = #{link_origin := _}}}) ->
     %% Do not forward any external messages to other links.
@@ -197,30 +199,6 @@ handle_route_op_msg(
             error
     end.
 
-maybe_push_route_op(Op, Topic, RouteID) ->
-    maybe_push_route_op(Op, Topic, RouteID, push).
-
-maybe_push_route_op(Op, Topic, RouteID, PushFun) ->
-    lists:foreach(
-        fun(#{name := Cluster, topics := LinkFilters}) ->
-            case topic_intersect_any(Topic, LinkFilters) of
-                false ->
-                    ok;
-                TopicIntersection ->
-                    emqx_cluster_link_router_syncer:PushFun(Cluster, Op, TopicIntersection, RouteID)
-            end
-        end,
-        emqx_cluster_link_config:enabled_links()
-    ).
-
-topic_intersect_any(Topic, [LinkFilter | T]) ->
-    case emqx_topic:intersection(Topic, LinkFilter) of
-        false -> topic_intersect_any(Topic, T);
-        TopicOrFilter -> TopicOrFilter
-    end;
-topic_intersect_any(_Topic, []) ->
-    false.
-
 actor_init(
     ClusterName,
     #{actor := Actor, incarnation := Incr},

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_api.erl

@@ -625,7 +625,7 @@ link_metrics_response_example() ->
     }.
 
 with_link(Name, FoundFn, NotFoundFn) ->
-    case emqx_cluster_link_config:link_raw(Name) of
+    case emqx_cluster_link_config:get_link_raw(Name) of
         undefined ->
             NotFoundFn();
         Link0 = #{} when is_function(FoundFn, 1) ->

+ 3 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_app.erl

@@ -10,10 +10,10 @@
 
 start(_StartType, _StartArgs) ->
     ok = mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()),
-    emqx_cluster_link_config:add_handler(),
-    LinksConf = emqx_cluster_link_config:enabled_links(),
+    ok = emqx_cluster_link_config:load(),
     ok = emqx_cluster_link:register_external_broker(),
     ok = emqx_cluster_link:put_hook(),
+    LinksConf = emqx_cluster_link_config:get_enabled_links(),
     case LinksConf of
         [_ | _] ->
             ok = start_msg_fwd_resources(LinksConf);
@@ -25,7 +25,7 @@ start(_StartType, _StartArgs) ->
     {ok, Sup}.
 
 prep_stop(State) ->
-    emqx_cluster_link_config:remove_handler(),
+    ok = emqx_cluster_link_config:unload(),
     State.
 
 stop(_State) ->

+ 140 - 83
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -6,8 +6,6 @@
 
 -feature(maybe_expr, enable).
 
--behaviour(emqx_config_handler).
-
 -include_lib("emqx/include/logger.hrl").
 
 -define(LINKS_PATH, [cluster, links]).
@@ -29,19 +27,17 @@
 -define(ACTOR_FIELDS, [topics]).
 
 -export([
-    %% General
-    create_link/1,
-    delete_link/1,
-    update_link/1,
-    update/1,
-    cluster/0,
-    enabled_links/0,
+    %% Shortconf
     links/0,
     link/1,
-    link_raw/1,
-    topic_filters/1,
+    enabled_links/0,
+    %% Configuration
+    cluster/0,
+    get_links/0,
+    get_link/1,
+    get_enabled_links/0,
+    get_link_raw/1,
     %% Connections
-    emqtt_options/1,
     mk_emqtt_options/1,
     %% Actor Lifecycle
     actor_ttl/0,
@@ -49,16 +45,114 @@
     actor_heartbeat_interval/0
 ]).
 
+%% Managing configuration:
 -export([
-    add_handler/0,
-    remove_handler/0
+    update/1,
+    create_link/1,
+    delete_link/1,
+    update_link/1
 ]).
 
+%% Application lifecycle:
+-export([
+    load/0,
+    unload/0
+]).
+
+-behaviour(emqx_config_handler).
 -export([
     pre_config_update/3,
     post_config_update/5
 ]).
 
+%% Test exports
+-export([prepare_link/1]).
+
+-export_type([shortconf/0]).
+
+-type shortconf() :: #{
+    name := binary(),
+    enable := boolean(),
+    topics := _Union :: [emqx_types:words()]
+}.
+
+-define(PTERM(K), {?MODULE, K}).
+
+%%
+
+cluster() ->
+    atom_to_binary(emqx_config:get([cluster, name])).
+
+-spec links() -> [shortconf()].
+links() ->
+    get_shortconf(links).
+
+-spec enabled_links() -> [shortconf()].
+enabled_links() ->
+    get_shortconf(enabled).
+
+-spec link(_Name :: binary()) -> shortconf() | undefined.
+link(Name) ->
+    find_link(Name, links()).
+
+-spec get_links() -> [emqx_cluster_link_schema:link()].
+get_links() ->
+    emqx:get_config(?LINKS_PATH, []).
+
+-spec get_enabled_links() -> [emqx_cluster_link_schema:link()].
+get_enabled_links() ->
+    [L || L = #{enable := true} <- get_links()].
+
+-spec get_link(_Name :: binary()) -> emqx_cluster_link_schema:link() | undefined.
+get_link(Name) ->
+    find_link(Name, get_links()).
+
+%% @doc Looks up the raw configuration of the link called `Name`.
+%% Returns `undefined` when the raw links list has no entry with that name.
+-spec get_link_raw(_Name :: binary()) -> emqx_config:raw_config() | undefined.
+get_link_raw(Name) ->
+    find_link(Name, get_links_raw()).
+
+get_links_raw() ->
+    emqx:get_raw_config(?LINKS_PATH, []).
+
+%% Returns the first entry of `Links` whose upstream name equals `Name`,
+%% or `undefined` if there is none.
+find_link(Name, Links) ->
+    HasName = fun(Link) -> upstream_name(Link) =:= Name end,
+    case lists:search(HasName, Links) of
+        {value, LinkConf} -> LinkConf;
+        false -> undefined
+    end.
+
+-spec actor_ttl() -> _Milliseconds :: pos_integer().
+actor_ttl() ->
+    ?DEFAULT_ACTOR_TTL.
+
+-spec actor_gc_interval() -> _Milliseconds :: pos_integer().
+actor_gc_interval() ->
+    actor_ttl().
+
+-spec actor_heartbeat_interval() -> _Milliseconds :: pos_integer().
+actor_heartbeat_interval() ->
+    actor_ttl() div 3.
+
+%%
+
+%% Builds the option map for an MQTT v5 client connection out of a link config.
+%% `username`, `retry_interval` and `max_inflight` are passed through when present;
+%% the client id defaults to the local cluster name; a password, if configured,
+%% is added by `with_password/2`.
+mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
+    ClientId = maps:get(clientid, LinkConf, cluster()),
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
+    Passthrough = maps:with([username, retry_interval, max_inflight], LinkConf),
+    %% The `enable` flag goes into `ssl`; everything else becomes `ssl_opts`.
+    SslOpts = maps:to_list(maps:remove(enable, Ssl)),
+    Connection = #{
+        host => Host,
+        port => Port,
+        clientid => ClientId,
+        proto_ver => v5,
+        ssl => EnableSsl,
+        ssl_opts => SslOpts
+    },
+    with_password(maps:merge(Passthrough, Connection), LinkConf).
+
+%% Adds the unwrapped password to `Opts` when the link config carries one;
+%% otherwise returns `Opts` unchanged.
+with_password(Opts, LinkConf) ->
+    case LinkConf of
+        #{password := Secret} -> Opts#{password => emqx_secret:unwrap(Secret)};
+        _ -> Opts
+    end.
+
 %%
 
 create_link(LinkConfig) ->
@@ -121,76 +215,13 @@ update(Config) ->
             {error, Reason}
     end.
 
-cluster() ->
-    atom_to_binary(emqx_config:get([cluster, name])).
-
-links() ->
-    emqx:get_config(?LINKS_PATH, []).
-
-links_raw() ->
-    emqx:get_raw_config(?LINKS_PATH, []).
-
-enabled_links() ->
-    [L || L = #{enable := true} <- links()].
-
-link(Name) ->
-    find_link(Name, links()).
-
-link_raw(Name) ->
-    find_link(Name, links_raw()).
-
-find_link(Name, Links) ->
-    case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of
-        [LinkConf | _] -> LinkConf;
-        [] -> undefined
-    end.
-
-emqtt_options(LinkName) ->
-    emqx_maybe:apply(fun mk_emqtt_options/1, ?MODULE:link(LinkName)).
-
-topic_filters(LinkName) ->
-    maps:get(topics, ?MODULE:link(LinkName), []).
-
--spec actor_ttl() -> _Milliseconds :: pos_integer().
-actor_ttl() ->
-    ?DEFAULT_ACTOR_TTL.
-
--spec actor_gc_interval() -> _Milliseconds :: pos_integer().
-actor_gc_interval() ->
-    actor_ttl().
-
--spec actor_heartbeat_interval() -> _Milliseconds :: pos_integer().
-actor_heartbeat_interval() ->
-    actor_ttl() div 3.
-
-%%
-
-mk_emqtt_options(#{server := Server, ssl := #{enable := EnableSsl} = Ssl} = LinkConf) ->
-    ClientId = maps:get(clientid, LinkConf, cluster()),
-    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
-    Opts = maps:with([username, retry_interval, max_inflight], LinkConf),
-    Opts1 = Opts#{
-        host => Host,
-        port => Port,
-        clientid => ClientId,
-        proto_ver => v5,
-        ssl => EnableSsl,
-        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
-    },
-    with_password(Opts1, LinkConf).
-
-with_password(Opts, #{password := P} = _LinkConf) ->
-    Opts#{password => emqx_secret:unwrap(P)};
-with_password(Opts, _LinkConf) ->
-    Opts.
-
-%%
-
-add_handler() ->
+load() ->
+    ok = prepare_shortconf(get_links()),
     ok = emqx_config_handler:add_handler(?LINKS_PATH, ?MODULE).
 
-remove_handler() ->
-    ok = emqx_config_handler:remove_handler(?LINKS_PATH).
+unload() ->
+    ok = emqx_config_handler:remove_handler(?LINKS_PATH),
+    ok = cleanup_shortconf().
 
 pre_config_update(?LINKS_PATH, RawConf, RawConf) ->
     {ok, RawConf};
@@ -242,11 +273,37 @@ post_config_update(?LINKS_PATH, _Req, New, Old, _AppEnvs) ->
     IsAllOk = all_ok(RemovedRes) andalso all_ok(AddedRes) andalso all_ok(UpdatedRes),
     case IsAllOk of
         true ->
-            ok;
+            prepare_shortconf(New);
         false ->
             {error, #{added => AddedRes, removed => RemovedRes, updated => UpdatedRes}}
     end.
 
+%%
+
+%% Converts every link of `Config` into its short form and publishes two
+%% persistent terms: all links, and the subset with `enable := true`.
+prepare_shortconf(Config) ->
+    Shortconfs = lists:map(fun prepare_link/1, Config),
+    IsEnabled = fun
+        (#{enable := true}) -> true;
+        (_) -> false
+    end,
+    ok = persistent_term:put(?PTERM(links), Shortconfs),
+    ok = persistent_term:put(?PTERM(enabled), lists:filter(IsEnabled, Shortconfs)).
+
+%% Erases both persistent terms written by `prepare_shortconf/1`.
+cleanup_shortconf() ->
+    lists:foreach(
+        fun(Key) -> persistent_term:erase(?PTERM(Key)) end,
+        [links, enabled]
+    ).
+
+%% Reduces a link config to its short form: only `name`, `enable` and the
+%% prepared `topics` are kept.
+prepare_link(#{name := _, enable := _, topics := Topics} = Link) ->
+    Short = maps:with([name, enable], Link),
+    Short#{topics => prepare_topics(Topics)}.
+
+%% Drops topic filters covered by other filters, then splits each remaining
+%% filter into words.
+prepare_topics(Topics) ->
+    [emqx_topic:words(Filter) || Filter <- emqx_topic:union(Topics)].
+
+%% Reads a list published by `prepare_shortconf/1`.
+%% Falls back to an empty list while nothing is published (before `load/0`, or
+%% after `unload/0` has erased the terms), so callers see "no links" instead of
+%% crashing with `badarg`.
+get_shortconf(K) ->
+    persistent_term:get(?PTERM(K), []).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -101,7 +101,7 @@
 resource_id(ClusterName) ->
     ?MSG_RES_ID(ClusterName).
 
--spec ensure_msg_fwd_resource(map()) ->
+-spec ensure_msg_fwd_resource(emqx_cluster_link_schema:link()) ->
     {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
 ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->
     ResOpts1 = ResOpts#{

+ 59 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_router.erl

@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_router).
+
+-export([
+    push_update/3,
+    push_update_persistent/3,
+    compute_intersections/2
+]).
+
+%% Test exports
+-export([push_update/5]).
+
+%%--------------------------------------------------------------------
+
+%% Pushes a route update to the enabled links through the syncer's `push/4`.
+push_update(Op, Topic, RouteID) ->
+    Push = fun emqx_cluster_link_router_syncer:push/4,
+    push_update(Op, Topic, RouteID, Push).
+
+%% Same as `push_update/3`, but goes through the syncer's `push_persistent_route/4`.
+push_update_persistent(Op, Topic, RouteID) ->
+    Push = fun emqx_cluster_link_router_syncer:push_persistent_route/4,
+    push_update(Op, Topic, RouteID, Push).
+
+%% Applies the update to the currently enabled links.
+push_update(Op, Topic, RouteID, PushFun) ->
+    Links = emqx_cluster_link_config:enabled_links(),
+    push_update(Op, Topic, RouteID, PushFun, Links).
+
+%% For every given link, intersects `Topic` with the link's filters and pushes
+%% each resulting intersection through `PushFun`. Links are handled in list order.
+push_update(Op, Topic, RouteID, PushFun, Links) ->
+    lists:foreach(
+        fun(#{name := Cluster, topics := Filters}) ->
+            Intersections = compute_intersections(Topic, Filters),
+            ok = push_intersections(Cluster, Op, RouteID, PushFun, Intersections)
+        end,
+        Links
+    ).
+
+%% Calls `PushFun` once per intersection, in order; its results are ignored.
+push_intersections(Cluster, Op, RouteID, PushFun, Intersections) ->
+    lists:foreach(
+        fun(Intersection) ->
+            _ = PushFun(Cluster, Op, Intersection, RouteID)
+        end,
+        Intersections
+    ).
+
+%% Intersects `Topic` with each of `Filters` and returns the non-empty
+%% intersections, with those covered by another intersection left out.
+compute_intersections(Topic, Filters) ->
+    compute_intersections(Topic, Filters, []).
+
+compute_intersections(Topic, Filters, Acc0) ->
+    lists:foldl(
+        fun(Filter, Acc) ->
+            case emqx_topic:intersection(Topic, Filter) of
+                false -> Acc;
+                Intersection -> unionify(Intersection, Acc)
+            end
+        end,
+        Acc0,
+        Filters
+    ).
+
+%% Adds `Filter` to `Union` while keeping the result free of entries that are
+%% subsets of other entries.
+%% NOTE: See also `emqx_topic:union/1` implementation.
+unionify(Filter, Union) ->
+    %% Entries completely covered by `Filter` are dropped.
+    Kept = lists:filter(fun(F) -> not emqx_topic:is_subset(F, Filter) end, Union),
+    %% `Filter` is added only if no remaining entry already covers it.
+    case lists:any(fun(F) -> emqx_topic:is_subset(Filter, F) end, Kept) of
+        true -> Kept;
+        false -> [Filter | Kept]
+    end.

+ 20 - 71
apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl

@@ -4,7 +4,6 @@
 -module(emqx_cluster_link_router_bootstrap).
 
 -include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_router.hrl").
 -include_lib("emqx/include/emqx_shared_sub.hrl").
 -include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
 
@@ -19,8 +18,7 @@
 
 -record(bootstrap, {
     target :: _ClusterName :: binary(),
-    wildcards :: [emqx_types:topic()],
-    topics :: [emqx_types:topic()],
+    filters :: [emqx_types:topic()],
     stash :: [{emqx_types:topic(), _RouteID}],
     max_batch_size :: non_neg_integer(),
     is_persistent_route :: boolean()
@@ -29,12 +27,10 @@
 %%
 
 init(TargetCluster, LinkFilters, Options) ->
-    {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
     IsPersistentRoute = maps:get(is_persistent_route, Options, false),
     #bootstrap{
         target = TargetCluster,
-        wildcards = Wildcards,
-        topics = Topics,
+        filters = LinkFilters,
         stash = [],
         max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE),
         is_persistent_route = IsPersistentRoute
@@ -43,13 +39,11 @@ init(TargetCluster, LinkFilters, Options) ->
 next_batch(B = #bootstrap{stash = S0 = [_ | _], max_batch_size = MBS}) ->
     {Batch, Stash} = mk_batch(S0, MBS),
     {Batch, B#bootstrap{stash = Stash}};
-next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = [], is_persistent_route = IsPs}) ->
-    next_batch(B#bootstrap{topics = [], stash = routes_by_topic(Topics, IsPs)});
-next_batch(
-    B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = [], is_persistent_route = IsPs}
-) ->
-    next_batch(B0#bootstrap{wildcards = [], stash = routes_by_wildcards(Wildcards, IsPs)});
-next_batch(#bootstrap{topics = [], wildcards = [], stash = []}) ->
+next_batch(B0 = #bootstrap{filters = Filters = [_ | _], stash = [], is_persistent_route = false}) ->
+    next_batch(B0#bootstrap{filters = [], stash = routes_by_wildcards(Filters)});
+next_batch(B0 = #bootstrap{filters = Filters = [_ | _], stash = [], is_persistent_route = true}) ->
+    next_batch(B0#bootstrap{filters = [], stash = ps_routes_by_wildcards(Filters)});
+next_batch(#bootstrap{filters = [], stash = []}) ->
     done.
 
 mk_batch(Stash, MaxBatchSize) when length(Stash) =< MaxBatchSize ->
@@ -60,33 +54,16 @@ mk_batch(Stash, MaxBatchSize) ->
 
 %%
 
-routes_by_topic(Topics, _IsPersistentRoute = false) ->
-    Routes = select_routes_by_topics(Topics),
-    SharedRoutes = select_shared_sub_routes_by_topics(Topics),
-    Routes ++ SharedRoutes;
-routes_by_topic(Topics, _IsPersistentRoute = true) ->
-    lists:foldl(
-        fun(T, Acc) ->
-            Routes = emqx_persistent_session_ds_router:lookup_routes(T),
-            [encode_route(T, ps_route_id(PSRoute)) || #ps_route{} = PSRoute <- Routes] ++ Acc
-        end,
-        [],
-        Topics
-    ).
-
-routes_by_wildcards(Wildcards, _IsPersistentRoute = false) ->
+routes_by_wildcards(Wildcards) ->
     Routes = select_routes_by_wildcards(Wildcards),
     SharedRoutes = select_shared_sub_routes_by_wildcards(Wildcards),
-    Routes ++ SharedRoutes;
-routes_by_wildcards(Wildcards, _IsPersistentRoute = true) ->
+    Routes ++ SharedRoutes.
+
+ps_routes_by_wildcards(Wildcards) ->
     emqx_persistent_session_ds_router:foldl_routes(
-        fun(#ps_route{topic = T} = PSRoute, Acc) ->
-            case topic_intersect_any(T, Wildcards) of
-                false ->
-                    Acc;
-                Intersec ->
-                    [encode_route(Intersec, ps_route_id(PSRoute)) | Acc]
-            end
+        fun(#ps_route{topic = Topic} = PSRoute, Acc) ->
+            Intersections = emqx_cluster_link_router:compute_intersections(Topic, Wildcards),
+            [encode_route(I, ps_route_id(PSRoute)) || I <- Intersections] ++ Acc
         end,
         []
     ).
@@ -96,52 +73,24 @@ ps_route_id(#ps_route{topic = T, dest = #share_dest{group = Group, session_id =
 ps_route_id(#ps_route{topic = T, dest = SessionId}) ->
     ?PERSISTENT_ROUTE_ID(T, SessionId).
 
-select_routes_by_topics(Topics) ->
-    [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].
-
 select_routes_by_wildcards(Wildcards) ->
     emqx_broker:foldl_topics(
-        fun(Topic, Acc) -> intersecting_route(Topic, Wildcards) ++ Acc end,
+        fun(Topic, Acc) ->
+            Intersections = emqx_cluster_link_router:compute_intersections(Topic, Wildcards),
+            [encode_route(I, Topic) || I <- Intersections] ++ Acc
+        end,
         []
     ).
 
-select_shared_sub_routes_by_topics([T | Topics]) ->
-    select_shared_sub_routes(T) ++ select_shared_sub_routes_by_topics(Topics);
-select_shared_sub_routes_by_topics([]) ->
-    [].
-
 select_shared_sub_routes_by_wildcards(Wildcards) ->
     emqx_utils_ets:keyfoldl(
         fun({Group, Topic}, Acc) ->
-            RouteID = ?SHARED_ROUTE_ID(Topic, Group),
-            intersecting_route(Topic, RouteID, Wildcards) ++ Acc
+            Intersections = emqx_cluster_link_router:compute_intersections(Topic, Wildcards),
+            [encode_route(I, ?SHARED_ROUTE_ID(Topic, Group)) || I <- Intersections] ++ Acc
         end,
         [],
         ?SHARED_SUBSCRIBER
     ).
 
-select_shared_sub_routes(Topic) ->
-    LocalGroups = lists:usort(ets:select(?SHARED_SUBSCRIBER, [{{{'$1', Topic}, '_'}, [], ['$1']}])),
-    [encode_route(Topic, ?SHARED_ROUTE_ID(Topic, G)) || G <- LocalGroups].
-
-intersecting_route(Topic, Wildcards) ->
-    intersecting_route(Topic, Topic, Wildcards).
-
-intersecting_route(Topic, RouteID, Wildcards) ->
-    %% TODO: probably nice to validate cluster link topic filters
-    %% to have no intersections between each other?
-    case topic_intersect_any(Topic, Wildcards) of
-        false -> [];
-        Intersection -> [encode_route(Intersection, RouteID)]
-    end.
-
-topic_intersect_any(Topic, [LinkFilter | T]) ->
-    case emqx_topic:intersection(Topic, LinkFilter) of
-        false -> topic_intersect_any(Topic, T);
-        TopicOrFilter -> TopicOrFilter
-    end;
-topic_intersect_any(_Topic, []) ->
-    false.
-
 encode_route(Topic, RouteID) ->
     emqx_cluster_link_mqtt:encode_field(route, {add, {Topic, RouteID}}).

+ 54 - 62
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -21,7 +21,21 @@
     desc/1
 ]).
 
--import(emqx_schema, [mk_duration/2]).
+-export_type([link/0]).
+
+-type link() :: #{
+    name := binary(),
+    enable := boolean(),
+    server := binary(),
+    topics := [emqx_types:topic()],
+    clientid => binary(),
+    username => binary(),
+    password => emqx_secret:t(binary()),
+    ssl := #{atom() => _},
+    retry_interval := non_neg_integer(),
+    max_inflight := pos_integer(),
+    atom() => _
+}.
 
 -define(MQTT_HOST_OPTS, #{default_port => 1883}).
 
@@ -38,7 +52,9 @@ injected_fields() ->
 
 links_schema(Meta) ->
     ?HOCON(?ARRAY(?R_REF("link")), Meta#{
-        default => [], validator => fun links_validator/1, desc => ?DESC("links")
+        default => [],
+        validator => fun validate_unique_names/1,
+        desc => ?DESC("links")
     }).
 
 link_schema() ->
@@ -65,11 +81,17 @@ fields("link") ->
         }},
         {topics,
             ?HOCON(?ARRAY(binary()), #{
-                desc => ?DESC(topics), required => true, validator => fun topics_validator/1
+                desc => ?DESC(topics),
+                required => true,
+                validator => [
+                    fun validate_topics/1,
+                    fun validate_no_special_topics/1,
+                    fun validate_no_redundancy/1
+                ]
             })},
         {pool_size, ?HOCON(pos_integer(), #{default => 8, desc => ?DESC(pool_size)})},
         {retry_interval,
-            mk_duration(
+            emqx_schema:mk_duration(
                 "MQTT Message retry interval. Delay for the link to retry sending the QoS1/QoS2 "
                 "messages in case of ACK not received.",
                 #{default => <<"15s">>}
@@ -117,67 +139,37 @@ is_hidden_res_opt(Field) ->
 
 %% TODO: check that no link name equals local cluster name,
 %% but this may be tricky since the link config is injected into cluster config (emqx_conf_schema).
-links_validator(Links) ->
-    {_, Dups} = lists:foldl(
-        fun(Link, {Acc, DupAcc}) ->
-            Name = link_name(Link),
-            case Acc of
-                #{Name := _} ->
-                    {Acc, [Name | DupAcc]};
-                _ ->
-                    {Acc#{Name => undefined}, DupAcc}
-            end
-        end,
-        {#{}, []},
-        Links
-    ),
-    check_errors(Dups, duplicated_cluster_links, names).
+%% Validator: `true` when all link names are distinct, otherwise an error
+%% listing the surplus occurrences of repeated names.
+validate_unique_names(Links) ->
+    Names = lists:map(fun link_name/1, Links),
+    case Names -- lists:usort(Names) of
+        [] -> true;
+        Dups -> mk_error(duplicated_cluster_links, duplicates, Dups)
+    end.
 
 link_name(#{name := Name}) -> Name;
 link_name(#{<<"name">> := Name}) -> Name.
 
-topics_validator(Topics) ->
-    Errors0 = lists:foldl(
-        fun(T, ErrAcc) ->
-            try
-                _ = emqx_topic:validate(T),
-                validate_sys_link_topic(T, ErrAcc)
-            catch
-                E:R ->
-                    [{T, {E, R}} | ErrAcc]
-            end
-        end,
-        [],
-        Topics
-    ),
-    Errors = validate_duplicate_topic_filters(Topics),
-    check_errors(Errors0 ++ Errors, invalid_topics, topics).
-
-validate_sys_link_topic(T, ErrAcc) ->
-    case emqx_topic:match(T, ?TOPIC_PREFIX_WILDCARD) of
-        true ->
-            [{T, {error, topic_not_allowed}} | ErrAcc];
-        false ->
-            ErrAcc
+%% Validator: `true` when every topic passes `emqx_topic:validate/1`, otherwise
+%% an error listing each offending topic with the reason it was rejected.
+validate_topics(Topics) ->
+    case lists:flatmap(fun validate_topic/1, Topics) of
+        [] -> true;
+        Errors -> mk_error(invalid_topics, topics, Errors)
+    end.
+
+%% Returns `[]` for a valid topic, or `[{Topic, Reason}]` when validation
+%% raises an `error` exception.
+validate_topic(Topic) ->
+    try emqx_topic:validate(Topic) of
+        _ -> []
+    catch
+        error:Reason -> [{Topic, Reason}]
+    end.
 
-validate_duplicate_topic_filters(TopicFilters) ->
-    {Duplicated, _} =
-        lists:foldl(
-            fun(T, {Acc, Seen}) ->
-                case sets:is_element(T, Seen) of
-                    true ->
-                        {[{T, duplicate_topic_filter} | Acc], Seen};
-                    false ->
-                        {Acc, sets:add_element(T, Seen)}
-                end
-            end,
-            {[], sets:new([{version, 2}])},
-            TopicFilters
-        ),
-    Duplicated.
-
-check_errors([] = _Errors, _Reason, _ValuesField) ->
-    ok;
-check_errors(Errors, Reason, ValuesField) ->
-    {error, #{reason => Reason, ValuesField => Errors}}.
+%% Validator: `true` when no topic matches `?TOPIC_PREFIX_WILDCARD`, otherwise
+%% an error listing the topics that are not allowed.
+validate_no_special_topics(Topics) ->
+    case lists:flatmap(fun validate_sys_link_topic/1, Topics) of
+        [] -> true;
+        Errors -> mk_error(invalid_topics, topics, Errors)
+    end.
+
+%% Returns `[{Topic, topic_not_allowed}]` if `Topic` matches the reserved
+%% prefix wildcard, `[]` otherwise.
+validate_sys_link_topic(Topic) ->
+    case emqx_topic:match(Topic, ?TOPIC_PREFIX_WILDCARD) of
+        true -> [{Topic, topic_not_allowed}];
+        false -> []
+    end.
+
+%% Validator: `true` when `emqx_topic:union/1` keeps every given topic, i.e.
+%% none is a duplicate of or covered by another; otherwise an error listing
+%% the redundant ones.
+validate_no_redundancy(Topics) ->
+    case Topics -- emqx_topic:union(Topics) of
+        [] -> true;
+        Redundant -> mk_error(redundant_topics, topics, Redundant)
+    end.
+
+%% Builds the error term returned by the validators: the reason plus the
+%% offending values stored under `Field`.
+mk_error(Reason, Field, Errors) ->
+    Details = #{reason => Reason},
+    {error, Details#{Field => Errors}}.

+ 13 - 23
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -157,9 +157,9 @@ t_message_forwarding(Config) ->
     [SourceNode1 | _] = nodes_source(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
 
-    SourceC1 = start_client("t_message_forwarding", SourceNode1),
-    TargetC1 = start_client("t_message_forwarding1", TargetNode1),
-    TargetC2 = start_client("t_message_forwarding2", TargetNode2),
+    SourceC1 = emqx_cluster_link_cth:connect_client("t_message_forwarding", SourceNode1),
+    TargetC1 = emqx_cluster_link_cth:connect_client("t_message_forwarding1", TargetNode1),
+    TargetC2 = emqx_cluster_link_cth:connect_client("t_message_forwarding2", TargetNode2),
     IsShared = ?config(is_shared_sub, Config),
 
     {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/+">>), qos1),
@@ -198,16 +198,21 @@ t_target_extrouting_gc('end', Config) ->
 t_target_extrouting_gc(Config) ->
     [SourceNode1 | _] = nodes_source(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
-    SourceC1 = start_client("t_target_extrouting_gc", SourceNode1),
-    TargetC1 = start_client_unlink("t_target_extrouting_gc1", TargetNode1),
-    TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2),
+    SourceC1 = emqx_cluster_link_cth:connect_client("t_target_extrouting_gc", SourceNode1),
+    TargetC1 = emqx_cluster_link_cth:connect_client_unlink("t_target_extrouting_gc1", TargetNode1),
+    TargetC2 = emqx_cluster_link_cth:connect_client_unlink("t_target_extrouting_gc2", TargetNode2),
     IsShared = ?config(is_shared_sub, Config),
 
     TopicFilter1 = <<"t/+">>,
     TopicFilter2 = <<"t/#">>,
     {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, TopicFilter1), qos1),
     {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, TopicFilter2), qos1),
-    {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}),
+    {ok, _} = ?block_until(#{
+        ?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := TargetNode1}
+    }),
+    {ok, _} = ?block_until(#{
+        ?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := TargetNode2}
+    }),
     {ok, _} = emqtt:publish(SourceC1, <<"t/1">>, <<"HELLO1">>, qos1),
     {ok, _} = emqtt:publish(SourceC1, <<"t/2/ext">>, <<"HELLO2">>, qos1),
     {ok, _} = emqtt:publish(SourceC1, <<"t/3/ext">>, <<"HELLO3">>, qos1),
@@ -285,7 +290,7 @@ t_disconnect_on_errors(Config) ->
     ct:timetrap({seconds, 20}),
     [SN1 | _] = nodes_source(Config),
     [TargetNode] = nodes_target(Config),
-    SC1 = start_client("t_disconnect_on_errors", SN1),
+    SC1 = emqx_cluster_link_cth:connect_client("t_disconnect_on_errors", SN1),
     ok = ?ON(SN1, meck:new(emqx_cluster_link, [passthrough, no_link, no_history])),
     ?assertMatch(
         {_, {ok, _}},
@@ -317,20 +322,5 @@ maybe_shared_topic(true = _IsShared, Topic) ->
 maybe_shared_topic(false = _IsShared, Topic) ->
     Topic.
 
-start_client_unlink(ClientId, Node) ->
-    Client = start_client(ClientId, Node),
-    _ = erlang:unlink(Client),
-    Client.
-
-start_client(ClientId, Node) ->
-    Port = tcp_port(Node),
-    {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
-    {ok, _} = emqtt:connect(Client),
-    Client.
-
-tcp_port(Node) ->
-    {_Host, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
-    Port.
-
 fmt(Fmt, Args) ->
     emqx_utils:format(Fmt, Args).

+ 159 - 149
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -14,9 +14,6 @@
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
--define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])).
--define(CONF_PATH, [cluster, links]).
-
 -define(CACERT, <<
     "-----BEGIN CERTIFICATE-----\n"
     "MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n"
@@ -48,15 +45,16 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    AllTCs = emqx_common_test_helpers:all(?MODULE),
-    OtherTCs = AllTCs -- cluster_test_cases(),
+    [{group, cluster}, {group, local}].
+
+groups() ->
     [
-        {group, cluster}
-        | OtherTCs
+        {cluster, cluster_test_cases()},
+        {local, local_test_cases()}
     ].
 
-groups() ->
-    [{cluster, cluster_test_cases()}].
+local_test_cases() ->
+    emqx_common_test_helpers:all(?MODULE) -- cluster_test_cases().
 
 cluster_test_cases() ->
     [
@@ -68,6 +66,12 @@ cluster_test_cases() ->
 init_per_suite(Config) ->
     %% This is called by emqx_machine in EMQX release
     emqx_otel_app:configure_otel_deps(),
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_group(local = _Group, Config) ->
     Apps = emqx_cth_suite:start(
         [
             emqx_conf,
@@ -77,14 +81,8 @@ init_per_suite(Config) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
-    Auth = auth_header(),
-    [{suite_apps, Apps}, {auth, Auth} | Config].
-
-end_per_suite(Config) ->
-    emqx_cth_suite:stop(?config(suite_apps, Config)),
-    emqx_config:delete_override_conf_files(),
-    ok.
-
+    Auth = emqx_mgmt_api_test_util:auth_header_(),
+    [{suite_apps, Apps}, {auth, Auth} | Config];
 init_per_group(cluster = Group, Config) ->
     ok = emqx_cth_suite:stop_apps([emqx_dashboard]),
     SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config),
@@ -105,33 +103,31 @@ init_per_group(cluster = Group, Config) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
     ]),
+    Auth = ?ON(SN1, emqx_mgmt_api_test_util:auth_header_()),
     [
         {source_nodes, SourceNodes},
-        {target_nodes, TargetNodes}
+        {target_nodes, TargetNodes},
+        {auth, Auth}
         | Config
-    ];
-init_per_group(_Group, Config) ->
-    Config.
+    ].
 
+end_per_group(local, Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config));
 end_per_group(cluster, Config) ->
     SourceNodes = ?config(source_nodes, Config),
     TargetNodes = ?config(target_nodes, Config),
     ok = emqx_cth_cluster:stop(SourceNodes),
-    ok = emqx_cth_cluster:stop(TargetNodes),
-    _ = emqx_cth_suite:start_apps(
-        [emqx_mgmt_api_test_util:emqx_dashboard()],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    ok;
-end_per_group(_Group, _Config) ->
-    ok.
+    ok = emqx_cth_cluster:stop(TargetNodes).
 
-auth_header() ->
-    emqx_mgmt_api_test_util:auth_header_().
+init_per_testcase(TC, Config) ->
+    [Group] = [G || {G, TCs} <- groups(), lists:member(TC, TCs)],
+    snabbkaffe:start_trace(),
+    init_per_testcase(TC, Group, Config).
 
-init_per_testcase(_TC, Config) ->
+init_per_testcase(_TC, local, Config) ->
     {ok, _} = emqx_cluster_link_config:update([]),
-    snabbkaffe:start_trace(),
+    Config;
+init_per_testcase(_TC, cluster, Config) ->
     Config.
 
 end_per_testcase(_TC, _Config) ->
@@ -146,47 +142,52 @@ end_per_testcase(_TC, _Config) ->
 api_root() ->
     <<"cluster/links">>.
 
-list() ->
-    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
-    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
+api_path(ReqPath) ->
+    emqx_mgmt_api_test_util:api_path([api_root() | ReqPath]).
+
+api_path(Host, ReqPath) ->
+    emqx_mgmt_api_test_util:api_path(Host, [api_root() | ReqPath]).
+
+api_auth(Config) ->
+    ?config(auth, Config).
 
-get_link(Name) ->
-    get_link(source, Name).
+list(Config) ->
+    Path = api_path([]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "", api_auth(Config)).
 
-get_link(SourceOrTargetCluster, Name) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]),
-    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
+get_link(Name, Config) ->
+    get_link(source, Name, Config).
 
-delete_link(Name) ->
-    Path = emqx_mgmt_api_test_util:api_path([api_root(), "link", Name]),
-    emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "").
+get_link(SourceOrTargetCluster, Name, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "", api_auth(Config)).
 
-update_link(Name, Params) ->
-    update_link(source, Name, Params).
+delete_link(Name, Config) ->
+    Path = api_path(["link", Name]),
+    emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "", api_auth(Config)).
 
-update_link(SourceOrTargetCluster, Name, Params) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]),
-    emqx_mgmt_api_test_util:simple_request(put, Path, Params).
+update_link(Name, Params, Config) ->
+    update_link(source, Name, Params, Config).
 
-create_link(Name, Params0) ->
+update_link(SourceOrTargetCluster, Name, Params, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name]),
+    emqx_mgmt_api_test_util:simple_request(put, Path, Params, api_auth(Config)).
+
+create_link(Name, Params0, Config) ->
+    Path = api_path([]),
     Params = Params0#{<<"name">> => Name},
-    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
-    emqx_mgmt_api_test_util:simple_request(post, Path, Params).
+    emqx_mgmt_api_test_util:simple_request(post, Path, Params, api_auth(Config)).
 
-get_metrics(Name) ->
-    get_metrics(source, Name).
+get_metrics(Name, Config) ->
+    get_metrics(source, Name, Config).
 
-get_metrics(SourceOrTargetCluster, Name) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name, "metrics"]),
-    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = []).
+get_metrics(SourceOrTargetCluster, Name, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name, "metrics"]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = [], api_auth(Config)).
 
-reset_metrics(SourceOrTargetCluster, Name) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name, "metrics", "reset"]),
-    emqx_mgmt_api_test_util:simple_request(put, Path, _Params = []).
+reset_metrics(SourceOrTargetCluster, Name, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name, "metrics", "reset"]),
+    emqx_mgmt_api_test_util:simple_request(put, Path, _Params = [], api_auth(Config)).
 
 host(source) -> "http://127.0.0.1:18083";
 host(target) -> "http://127.0.0.1:28083".
@@ -220,7 +221,7 @@ disable_and_force_gc(TargetOrSource, Name, Params, TCConfig, Opts) ->
             target -> ?config(source_nodes, TCConfig);
             source -> ?config(target_nodes, TCConfig)
         end,
-    {200, _} = update_link(TargetOrSource, Name, Params#{<<"enable">> := false}),
+    {200, _} = update_link(TargetOrSource, Name, Params#{<<"enable">> := false}, TCConfig),
     %% Note that only when the GC runs and collects the stopped actor it'll actually
     %% remove the routes
     NowMS = erlang:system_time(millisecond),
@@ -252,8 +253,8 @@ wait_for_routes([], _ExpectedTopics) ->
 %% Test cases
 %%------------------------------------------------------------------------------
 
-t_put_get_valid(_Config) ->
-    ?assertMatch({200, []}, list()),
+t_put_get_valid(Config) ->
+    ?assertMatch({200, []}, list(Config)),
 
     Name1 = <<"emqcl_1">>,
     Link1 = link_params(#{
@@ -265,42 +266,42 @@ t_put_get_valid(_Config) ->
         <<"server">> => <<"emqxcl_2.nohost:41883">>,
         <<"name">> => Name2
     }),
-    ?assertMatch({201, _}, create_link(Name1, Link1)),
-    ?assertMatch({201, _}, create_link(Name2, Link2)),
-    ?assertMatch({200, [_, _]}, list()),
+    ?assertMatch({201, _}, create_link(Name1, Link1, Config)),
+    ?assertMatch({201, _}, create_link(Name2, Link2, Config)),
+    ?assertMatch({200, [_, _]}, list(Config)),
 
     DisabledLink1 = Link1#{<<"enable">> => false},
-    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1))),
-    ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1)),
-    ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2)),
+    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1), Config)),
+    ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1, Config)),
+    ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2, Config)),
 
     SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
     SSLLink1 = Link1#{<<"ssl">> => SSL},
-    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1))),
+    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1), Config)),
     ?assertMatch(
         {200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}},
-        get_link(Name1)
+        get_link(Name1, Config)
     ),
     ok.
 
-t_put_invalid(_Config) ->
+t_put_invalid(Config) ->
     Name = <<"l1">>,
-    {201, _} = create_link(Name, link_params()),
+    {201, _} = create_link(Name, link_params(), Config),
     ?assertMatch(
         {400, _},
-        update_link(Name, maps:remove(<<"server">>, link_params()))
+        update_link(Name, maps:remove(<<"server">>, link_params()), Config)
     ).
 
 %% Tests a sequence of CRUD operations and their expected responses, for common use cases
 %% and configuration states.
-t_crud(_Config) ->
+t_crud(Config) ->
     %% No links initially.
-    ?assertMatch({200, []}, list()),
+    ?assertMatch({200, []}, list(Config)),
     NameA = <<"a">>,
-    ?assertMatch({404, _}, get_link(NameA)),
-    ?assertMatch({404, _}, delete_link(NameA)),
-    ?assertMatch({404, _}, update_link(NameA, link_params())),
-    ?assertMatch({404, _}, get_metrics(NameA)),
+    ?assertMatch({404, _}, get_link(NameA, Config)),
+    ?assertMatch({404, _}, delete_link(NameA, Config)),
+    ?assertMatch({404, _}, update_link(NameA, link_params(), Config)),
+    ?assertMatch({404, _}, get_metrics(NameA, Config)),
 
     Params1 = link_params(),
     ?assertMatch(
@@ -310,9 +311,12 @@ t_crud(_Config) ->
             <<"status">> := _,
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
         }},
-        create_link(NameA, Params1)
+        create_link(NameA, Params1, Config)
+    ),
+    ?assertMatch(
+        {400, #{<<"code">> := <<"ALREADY_EXISTS">>}},
+        create_link(NameA, Params1, Config)
     ),
-    ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)),
     ?assertMatch(
         {200, [
             #{
@@ -322,7 +326,7 @@ t_crud(_Config) ->
                 <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
             }
         ]},
-        list()
+        list(Config)
     ),
     ?assertMatch(
         {200, #{
@@ -331,9 +335,9 @@ t_crud(_Config) ->
             <<"status">> := _,
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
         }},
-        get_link(NameA)
+        get_link(NameA, Config)
     ),
-    ?assertMatch({200, _}, get_metrics(NameA)),
+    ?assertMatch({200, _}, get_metrics(NameA, Config)),
 
     Params2 = Params1#{<<"pool_size">> := 2},
     ?assertMatch(
@@ -343,32 +347,30 @@ t_crud(_Config) ->
             <<"status">> := _,
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
         }},
-        update_link(NameA, Params2)
+        update_link(NameA, Params2, Config)
     ),
 
-    ?assertMatch({204, _}, delete_link(NameA)),
-    ?assertMatch({404, _}, delete_link(NameA)),
-    ?assertMatch({404, _}, get_link(NameA)),
-    ?assertMatch({404, _}, update_link(NameA, Params1)),
-    ?assertMatch({404, _}, get_metrics(NameA)),
-    ?assertMatch({200, []}, list()),
+    ?assertMatch({204, _}, delete_link(NameA, Config)),
+    ?assertMatch({404, _}, delete_link(NameA, Config)),
+    ?assertMatch({404, _}, get_link(NameA, Config)),
+    ?assertMatch({404, _}, update_link(NameA, Params1, Config)),
+    ?assertMatch({404, _}, get_metrics(NameA, Config)),
+    ?assertMatch({200, []}, list(Config)),
 
     ok.
 
-t_create_invalid(_Config) ->
+t_create_invalid(Config) ->
     Params = link_params(),
     EmptyName = <<>>,
-    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message1}} = create_link(
-        EmptyName, Params
-    ),
+    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message1}} =
+        create_link(EmptyName, Params, Config),
     ?assertMatch(
         #{<<"kind">> := <<"validation_error">>, <<"reason">> := <<"Name cannot be empty string">>},
         Message1
     ),
     LongName = binary:copy(<<$a>>, 256),
-    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message2}} = create_link(
-        LongName, Params
-    ),
+    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message2}} =
+        create_link(LongName, Params, Config),
     ?assertMatch(
         #{
             <<"kind">> := <<"validation_error">>,
@@ -377,9 +379,8 @@ t_create_invalid(_Config) ->
         Message2
     ),
     BadName = <<"~!@#$%^&*()_+{}:'<>?|">>,
-    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message3}} = create_link(
-        BadName, Params
-    ),
+    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message3}} =
+        create_link(BadName, Params, Config),
     ?assertMatch(
         #{
             <<"kind">> := <<"validation_error">>,
@@ -412,7 +413,7 @@ t_status(Config) ->
                     ]
                 }
             ]},
-            list()
+            list(Config)
         )
     ),
     ?assertMatch(
@@ -429,7 +430,7 @@ t_status(Config) ->
                 }
             ]
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
 
     %% If one of the nodes reports a different status, the cluster is inconsistent.
@@ -465,7 +466,7 @@ t_status(Config) ->
                 ]
             }
         ]},
-        list()
+        list(Config)
     ),
     ?assertMatch(
         {200, #{
@@ -481,7 +482,7 @@ t_status(Config) ->
                 }
             ]
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
 
     %% Simulating erpc failures
@@ -512,7 +513,7 @@ t_status(Config) ->
                 ]
             }
         ]},
-        list()
+        list(Config)
     ),
     ?assertMatch(
         {200, #{
@@ -529,7 +530,7 @@ t_status(Config) ->
                 }
             ]
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
     %% Simulate another inconsistency
     ?ON(SN1, begin
@@ -552,7 +553,7 @@ t_status(Config) ->
                 }
             ]
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
 
     ok.
@@ -627,7 +628,7 @@ t_metrics(Config) ->
                 }
             ]
         }},
-        get_metrics(source, SourceName)
+        get_metrics(source, SourceName, Config)
     ),
     ?assertMatch(
         {200, #{
@@ -643,11 +644,11 @@ t_metrics(Config) ->
                 }
             ]
         }},
-        get_metrics(target, TargetName)
+        get_metrics(target, TargetName, Config)
     ),
 
-    SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
-    SourceC2 = emqx_cluster_link_SUITE:start_client(<<"sc2">>, SN2),
+    SourceC1 = emqx_cluster_link_cth:connect_client(<<"sc1">>, SN1),
+    SourceC2 = emqx_cluster_link_cth:connect_client(<<"sc2">>, SN2),
     {ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>),
     {ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>),
 
@@ -667,7 +668,7 @@ t_metrics(Config) ->
                 }
             ]
         }},
-        get_metrics(source, SourceName)
+        get_metrics(source, SourceName, Config)
     ),
     ?assertMatch(
         {200, #{
@@ -683,11 +684,11 @@ t_metrics(Config) ->
                 }
             ]
         }},
-        get_metrics(target, TargetName)
+        get_metrics(target, TargetName, Config)
     ),
 
-    TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
-    TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
+    TargetC1 = emqx_cluster_link_cth:connect_client(<<"tc1">>, TN1),
+    TargetC2 = emqx_cluster_link_cth:connect_client(<<"tc2">>, TN2),
     {_, {ok, _}} =
         ?wait_async_action(
             begin
@@ -711,7 +712,7 @@ t_metrics(Config) ->
                     #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}
                 ]
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
     ),
     ?assertMatch(
@@ -719,7 +720,7 @@ t_metrics(Config) ->
             <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
             <<"node_metrics">> := _
         }},
-        get_metrics(target, TargetName)
+        get_metrics(target, TargetName, Config)
     ),
 
     %% Unsubscribe and remove route.
@@ -743,13 +744,13 @@ t_metrics(Config) ->
                     #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}
                 ]
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
     ),
 
     %% Disabling the link should remove the routes.
     ct:pal("disabling"),
-    {200, TargetLink0} = get_link(target, TargetName),
+    {200, TargetLink0} = get_link(target, TargetName, Config),
     TargetLink1 = remove_api_virtual_fields(TargetLink0),
     ok = disable_and_force_gc(target, TargetName, TargetLink1, Config, #{
         expected_num_route_deletions => 1
@@ -763,7 +764,7 @@ t_metrics(Config) ->
                 <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
                 <<"node_metrics">> := _
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
     ),
 
@@ -772,7 +773,7 @@ t_metrics(Config) ->
     {_, {ok, _}} =
         ?wait_async_action(
             begin
-                {200, _} = update_link(target, TargetName, TargetLink2)
+                {200, _} = update_link(target, TargetName, TargetLink2, Config)
             end,
             #{?snk_kind := "cluster_link_extrouter_route_added"}
         ),
@@ -785,12 +786,12 @@ t_metrics(Config) ->
                 <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
                 <<"node_metrics">> := _
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
     ),
 
     %% Reset metrics
-    ?assertMatch({204, _}, reset_metrics(source, SourceName)),
+    ?assertMatch({204, _}, reset_metrics(source, SourceName, Config)),
     ?assertMatch(
         {200, #{
             <<"metrics">> := #{
@@ -814,26 +815,29 @@ t_metrics(Config) ->
                 }
             ]
         }},
-        get_metrics(source, SourceName)
+        get_metrics(source, SourceName, Config)
     ),
 
-    ok.
+    ok = lists:foreach(
+        fun emqx_cluster_link_cth:disconnect_client/1,
+        [SourceC1, SourceC2, TargetC1, TargetC2]
+    ).
 
 %% Checks that we can update a link via the API in the same fashion as the frontend does,
 %% by sending secrets as `******', and the secret is not mangled.
-t_update_password(_Config) ->
+t_update_password(Config) ->
     ?check_trace(
         begin
             Name = atom_to_binary(?FUNCTION_NAME),
             Password = <<"my secret password">>,
             Params1 = link_params(#{<<"password">> => Password}),
-            {201, Response1} = create_link(Name, Params1),
+            {201, Response1} = create_link(Name, Params1, Config),
             [#{name := Name, password := WrappedPassword0}] = emqx_config:get([cluster, links]),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword0)),
             Params2A = remove_api_virtual_fields(Response1),
             Params2 = Params2A#{<<"pool_size">> := 2},
             ?assertEqual(?REDACTED, maps:get(<<"password">>, Params2)),
-            ?assertMatch({200, _}, update_link(Name, Params2)),
+            ?assertMatch({200, _}, update_link(Name, Params2, Config)),
             [#{name := Name, password := WrappedPassword}] = emqx_config:get([cluster, links]),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword)),
             ok
@@ -843,7 +847,7 @@ t_update_password(_Config) ->
     ok.
 
 %% Checks that we forbid duplicate topic filters.
-t_duplicate_topic_filters(_Config) ->
+t_duplicate_topic_filters(Config) ->
     ?check_trace(
         begin
             Name = atom_to_binary(?FUNCTION_NAME),
@@ -852,12 +856,12 @@ t_duplicate_topic_filters(_Config) ->
                 {400, #{
                     <<"message">> := #{
                         <<"reason">> := #{
-                            <<"reason">> := <<"invalid_topics">>,
-                            <<"topics">> := #{<<"t">> := <<"duplicate_topic_filter">>}
+                            <<"reason">> := <<"redundant_topics">>,
+                            <<"topics">> := [<<"t">>]
                         }
                     }
                 }},
-                create_link(Name, Params1)
+                create_link(Name, Params1, Config)
             ),
             ok
         end,
@@ -867,11 +871,11 @@ t_duplicate_topic_filters(_Config) ->
 
 %% Verifies that some fields are not required when updating a link, such as:
 %%  - clientid
-t_optional_fields_update(_Config) ->
+t_optional_fields_update(Config) ->
     Name = <<"mylink">>,
     Params0 = maps:without([<<"clientid">>], link_params()),
-    {201, _} = create_link(Name, Params0),
-    ?assertMatch({200, _}, update_link(Name, Params0)),
+    {201, _} = create_link(Name, Params0, Config),
+    ?assertMatch({200, _}, update_link(Name, Params0, Config)),
     ok.
 
 %% Verifies that, if we disable a link and then re-enable it, it should keep working.
@@ -880,15 +884,17 @@ t_disable_reenable(Config) ->
     [SN1, _SN2] = SourceNodes = ?config(source_nodes, Config),
     [TN1, TN2] = ?config(target_nodes, Config),
     SourceName = <<"cl.target">>,
-    SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
-    TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
-    TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
+
+    SourceC1 = emqx_cluster_link_cth:connect_client(<<"sc1">>, SN1),
+    TargetC1 = emqx_cluster_link_cth:connect_client(<<"tc1">>, TN1),
+    TargetC2 = emqx_cluster_link_cth:connect_client(<<"tc2">>, TN2),
     Topic1 = <<"t/tc1">>,
     Topic2 = <<"t/tc2">>,
     {ok, _, _} = emqtt:subscribe(TargetC1, Topic1),
     {ok, _, _} = emqtt:subscribe(TargetC2, Topic2),
     %% fixme: use snabbkaffe subscription
-    ?block_until(#{?snk_kind := clink_route_sync_complete}),
+    ?block_until(#{?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := TN1}}),
+    ?block_until(#{?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := TN2}}),
     {ok, _} = emqtt:publish(SourceC1, Topic1, <<"1">>, [{qos, 1}]),
     {ok, _} = emqtt:publish(SourceC1, Topic2, <<"2">>, [{qos, 1}]),
     %% Sanity check: link is working, initially.
@@ -896,13 +902,13 @@ t_disable_reenable(Config) ->
     ?assertReceive({publish, #{topic := Topic2, payload := <<"2">>}}),
 
     %% Now we just disable and re-enable it in the link in the source cluster.
-    {200, #{<<"enable">> := true} = SourceLink0} = get_link(source, SourceName),
+    {200, #{<<"enable">> := true} = SourceLink0} = get_link(source, SourceName, Config),
     SourceLink1 = remove_api_virtual_fields(SourceLink0),
     %% We force GC to simulate that we left the link disable for enough time that the GC
     %% kicks in.
     ?assertMatch(
         {200, #{<<"enable">> := false}},
-        update_link(source, SourceName, SourceLink1#{<<"enable">> := false})
+        update_link(source, SourceName, SourceLink1#{<<"enable">> := false}, Config)
     ),
     %% In the original issue, GC deleted the state of target cluster's agent in source
     %% cluster.  After the fix, there's no longer GC, so we ignore timeouts here.
@@ -913,7 +919,7 @@ t_disable_reenable(Config) ->
     ),
     ?assertMatch(
         {200, #{<<"enable">> := true}},
-        update_link(source, SourceName, SourceLink1)
+        update_link(source, SourceName, SourceLink1, Config)
     ),
 
     Topic3 = <<"t/tc3">>,
@@ -932,4 +938,8 @@ t_disable_reenable(Config) ->
     ?assertReceive({publish, #{topic := Topic2, payload := <<"4">>}}),
     ?assertReceive({publish, #{topic := Topic3, payload := <<"5">>}}),
     ?assertReceive({publish, #{topic := Topic4, payload := <<"6">>}}),
-    ok.
+
+    ok = lists:foreach(
+        fun emqx_cluster_link_cth:disconnect_client/1,
+        [SourceC1, TargetC1, TargetC2]
+    ).

+ 1 - 1
apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl

@@ -238,7 +238,7 @@ t_config_validations(Config) ->
     },
     DuplicatedLinks = [LinkConfA, LinkConfA#{<<"enable">> => false, <<"pool_size">> => 2}],
     ?assertMatch(
-        {error, #{reason := #{reason := duplicated_cluster_links, names := _}}},
+        {error, #{reason := #{reason := duplicated_cluster_links, duplicates := _}}},
         erpc:call(NodeA, emqx_cluster_link_config, update, [DuplicatedLinks])
     ),
 

+ 31 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_cth.erl

@@ -0,0 +1,31 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_cth).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%
+
+%% Connects an MQTT client to Node exactly like connect_client/2, then unlinks
+%% it from the calling process and returns it.
+%% The client id is forwarded to emqtt unchanged; test suites pass both binaries
+%% and plain strings, hence `iodata()' rather than `binary()'.
+-spec connect_client_unlink(_ClientID :: iodata(), node()) -> _Pid :: emqtt:client().
+connect_client_unlink(ClientId, Node) ->
+    Client = connect_client(ClientId, Node),
+    _ = erlang:unlink(Client),
+    Client.
+
+%% Starts an MQTT v5 client linked to the caller and connects it to the port
+%% that tcp_port/1 reports for Node. Returns the connected client.
+%% Fails with `badmatch' if either the start or the connect does not succeed.
+%% The client id is forwarded to emqtt unchanged; test suites pass both binaries
+%% and plain strings, hence `iodata()' rather than `binary()'.
+-spec connect_client(_ClientID :: iodata(), node()) -> _Pid :: emqtt:client().
+connect_client(ClientId, Node) ->
+    Port = tcp_port(Node),
+    {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+    Client.
+
+%% Disconnects the given emqtt client by delegating to emqtt:disconnect/1.
+-spec disconnect_client(emqtt:client()) -> ok.
+disconnect_client(Client) ->
+    emqtt:disconnect(Client).
+
+%% Reads the `listeners.tcp.default.bind' config on Node (via erpc) and returns
+%% its port component.
+tcp_port(Node) ->
+    BindPath = [listeners, tcp, default, bind],
+    {_Addr, ListenPort} = erpc:call(Node, emqx_config, get, [BindPath]),
+    ListenPort.

+ 124 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_router_SUITE.erl

@@ -0,0 +1,124 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_router_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%
+
+%% Test cases of this suite, as reported by emqx_common_test_helpers:all/1.
+all() ->
+    Suite = ?MODULE,
+    emqx_common_test_helpers:all(Suite).
+
+%% Starts the suite through emqx_cth_suite with an empty app list and records
+%% the returned value under `apps' for end_per_suite/1.
+init_per_suite(Config) ->
+    WorkDir = emqx_cth_suite:work_dir(Config),
+    Started = emqx_cth_suite:start([], #{work_dir => WorkDir}),
+    [{apps, Started} | Config].
+
+%% Stops whatever init_per_suite/1 stored under `apps'.
+end_per_suite(Config) ->
+    Started = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Started).
+
+%%
+
+t_no_redundant_filters(_Config) ->
+    %% Redundant filters (e.g. ones supplied through a config file) must be
+    %% effectively ignored: each route op below is expected to be pushed once.
+    Filters = [
+        <<"t/1">>, <<"t/2">>, <<"t/3">>, <<"t/+">>, <<"t/#">>, <<"t/+/+">>, <<"t/+/+/+">>
+    ],
+    Links = [prepare_link(<<"L1">>, Filters)],
+    ok = add_route(<<"t/6">>, Links),
+    ok = add_route(<<"t/1">>, Links),
+    ok = delete_route(<<"t/6">>, Links),
+    ok = delete_route(<<"t/1">>, Links),
+    Pushed = ?drainMailbox(),
+    ?assertEqual(
+        [
+            {<<"L1">>, add, <<"t/6">>, <<"t/6">>},
+            {<<"L1">>, add, <<"t/1">>, <<"t/1">>},
+            {<<"L1">>, delete, <<"t/6">>, <<"t/6">>},
+            {<<"L1">>, delete, <<"t/1">>, <<"t/1">>}
+        ],
+        Pushed
+    ).
+
+t_overlapping_filters(_Config) ->
+    %% Partially overlapping filters must all be honoured.
+    %% When their intersections with a route only partially overlap each other,
+    %% _multiple_ route ops are expected for that route.
+    Filters = [<<"t/+/1/#">>, <<"t/z">>, <<"t/1/+">>],
+    Links = [prepare_link(<<"L2">>, Filters)],
+    ok = add_route(<<"t/+">>, Links),
+    ok = add_route(<<"t/+/+">>, Links),
+    Pushed = ?drainMailbox(),
+    ?assertEqual(
+        [
+            {<<"L2">>, add, <<"t/z">>, <<"t/+">>},
+            {<<"L2">>, add, <<"t/1/+">>, <<"t/+/+">>},
+            {<<"L2">>, add, <<"t/+/1">>, <<"t/+/+">>}
+        ],
+        Pushed
+    ).
+
+t_overlapping_filters_many(_Config) ->
+    %% _Every_ partially overlapping filter must be honoured.
+    %% Phase 1: three disjoint filters, one route op expected per filter.
+    DisjointLinks = [prepare_link(<<"LO1">>, [<<"t/1">>, <<"t/2">>, <<"t/3">>])],
+    ok = add_route(<<"t/+/#">>, DisjointLinks),
+    PushedDisjoint = ?drainMailbox(),
+    ?assertEqual(
+        [
+            {<<"LO1">>, add, <<"t/3">>, <<"t/+/#">>},
+            {<<"LO1">>, add, <<"t/2">>, <<"t/+/#">>},
+            {<<"LO1">>, add, <<"t/1">>, <<"t/+/#">>}
+        ],
+        PushedDisjoint
+    ),
+    %% Phase 2: the same filters plus `t/+'; a single op for `t/+' is expected.
+    CoveredLinks = [prepare_link(<<"LO2">>, [<<"t/1">>, <<"t/2">>, <<"t/3">>, <<"t/+">>])],
+    ok = add_route(<<"t/+/#">>, CoveredLinks),
+    PushedCovered = ?drainMailbox(),
+    ?assertEqual(
+        [
+            {<<"LO2">>, add, <<"t/+">>, <<"t/+/#">>}
+        ],
+        PushedCovered
+    ).
+
+t_overlapping_filters_subset(_Config) ->
+    %% Partially overlapping filters must be honoured.
+    %% However, when one resulting intersection is a subset of another, only a
+    %% _single_ route op is expected for that route.
+    Filters = [<<"t/1/+/3/#">>, <<"t/z">>, <<"t/1/2/+">>],
+    Links = [prepare_link(<<"L3">>, Filters)],
+    ok = add_route(<<"t/+">>, Links),
+    ok = add_route(<<"t/+/2/3/#">>, Links),
+    Pushed = ?drainMailbox(),
+    ?assertEqual(
+        [
+            {<<"L3">>, add, <<"t/z">>, <<"t/+">>},
+            {<<"L3">>, add, <<"t/1/2/3/#">>, <<"t/+/2/3/#">>}
+        ],
+        Pushed
+    ).
+
+%% Pushes an `add' update for Topic to Links through push_route/4.
+%% Topic is passed twice to push_update/5 (presumably as both topic and route ID).
+add_route(Topic, Links) ->
+    Push = fun push_route/4,
+    emqx_cluster_link_router:push_update(add, Topic, Topic, Push, Links).
+
+%% Pushes a `delete' update for Topic to Links through push_route/4.
+%% Topic is passed twice to push_update/5 (presumably as both topic and route ID).
+delete_route(Topic, Links) ->
+    Push = fun push_route/4,
+    emqx_cluster_link_router:push_update(delete, Topic, Topic, Push, Links).
+
+%% Push callback used by the test cases: sends the op to the calling process as
+%% a `{Cluster, OpName, Topic, RouteID}' message and returns `ok'.
+push_route(Cluster, OpName, Topic, RouteID) ->
+    Op = {Cluster, OpName, Topic, RouteID},
+    _ = erlang:send(self(), Op),
+    ok.
+
+%% Builds an enabled link config named Name with the given topic filters and
+%% hands it to emqx_cluster_link_config:prepare_link/1.
+prepare_link(Name, Topics) ->
+    LinkConf = #{name => Name, enable => true, topics => Topics},
+    emqx_cluster_link_config:prepare_link(LinkConf).

+ 99 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_schema_tests.erl

@@ -0,0 +1,99 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_schema_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+parse_and_check(InnerConfigs) -> %% Check raw link configs against the links schema.
+    RootBin = <<"links">>,
+    RawConf = #{RootBin => InnerConfigs},
+    #{RootBin := Checked} = hocon_tconf:check_plain( %% the tests below expect a throw on validation errors
+        #{roots => [{links, emqx_cluster_link_schema:links_schema(#{})}]},
+        RawConf,
+        #{
+            required => false,
+            atom_key => false,
+            make_serializable => false
+        }
+    ),
+    Checked. %% the checked value found under the <<"links">> root
+
+link(Name) -> %% Raw link config for Name using only the defaults of link/2.
+    link(Name, _Overrides = #{}).
+
+link(Name, Overrides) -> %% Raw (binary-keyed) link config: defaults deep-merged with Overrides.
+    Default = #{
+        <<"name">> => Name,
+        <<"clientid">> => <<"linkclientid">>,
+        <<"password">> => <<"my secret password">>,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"emqxcl_2.nohost:31883">>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>]
+    },
+    emqx_utils_maps:deep_merge(Default, Overrides). %% values in Overrides are expected to win
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+invalid_topics_test_() -> %% Schema check must throw and list every invalid filter with its reason.
+    {"Invalid topics are rejected",
+        ?_assertThrow(
+            {_Schema, [
+                #{
+                    kind := validation_error,
+                    reason := #{
+                        reason := invalid_topics,
+                        topics := [{<<>>, empty_topic}, {<<"$SYS/#/+">>, 'topic_invalid_#'}]
+                    }
+                }
+            ]},
+            parse_and_check([
+                link(<<"link1">>, #{
+                    <<"topics">> => [<<"t/+">>, <<>>, <<"$SYS/#/+">>] %% `t/+` is the only filter not reported
+                })
+            ])
+        )}.
+
+redundant_topics_test_() -> %% Schema check must throw and list the redundant filters.
+    {"Redundant topics is an error",
+        ?_assertThrow(
+            {_Schema, [
+                #{
+                    kind := validation_error,
+                    reason := #{
+                        reason := redundant_topics,
+                        topics := [<<"t/+">>, <<"t/1">>] %% presumably both covered by `t/+/#`
+                    }
+                }
+            ]},
+            parse_and_check([
+                link(<<"link1">>, #{
+                    <<"topics">> => [<<"t/+">>, <<"g/2">>, <<"t/1">>, <<"t/+/#">>, <<"t">>]
+                })
+            ])
+        )}.
+
+special_topics_test_() -> %% Schema check must throw for filters under the $LINK namespace.
+    {"$LINK topics are not allowed",
+        ?_assertThrow(
+            {_Schema, [
+                #{
+                    kind := validation_error,
+                    reason := #{
+                        reason := invalid_topics, %% reported as invalid_topics even though `t/+` / `t/1` overlap
+                        topics := [{<<"$LINK/cluster/+/name">>, topic_not_allowed}]
+                    }
+                }
+            ]},
+            parse_and_check([
+                link(<<"link1">>, #{
+                    <<"topics">> => [<<"t/+">>, <<"t/1">>, <<"$LINK/cluster/+/name">>]
+                })
+            ])
+        )}.

+ 0 - 64
apps/emqx_cluster_link/test/emqx_cluster_link_tests.erl

@@ -1,64 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
--module(emqx_cluster_link_tests).
-
--include_lib("eunit/include/eunit.hrl").
-
-%%------------------------------------------------------------------------------
-%% Helper fns
-%%------------------------------------------------------------------------------
-
-bin(X) -> emqx_utils_conv:bin(X).
-
-parse_and_check(InnerConfigs) ->
-    RootBin = <<"links">>,
-    RawConf = #{RootBin => InnerConfigs},
-    #{RootBin := Checked} = hocon_tconf:check_plain(
-        #{roots => [{links, emqx_cluster_link_schema:links_schema(#{})}]},
-        RawConf,
-        #{
-            required => false,
-            atom_key => false,
-            make_serializable => false
-        }
-    ),
-    Checked.
-
-link_params(Name) ->
-    link_params(Name, _Overrides = #{}).
-
-link_params(Name, Overrides) ->
-    Default = #{
-        <<"name">> => Name,
-        <<"clientid">> => <<"linkclientid">>,
-        <<"password">> => <<"my secret password">>,
-        <<"pool_size">> => 1,
-        <<"server">> => <<"emqxcl_2.nohost:31883">>,
-        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>]
-    },
-    emqx_utils_maps:deep_merge(Default, Overrides).
-
-%%------------------------------------------------------------------------------
-%% Test cases
-%%------------------------------------------------------------------------------
-
-schema_test_() ->
-    [
-        {"topic filters must be unique",
-            ?_assertThrow(
-                {_Schema, [
-                    #{
-                        reason := #{
-                            reason := invalid_topics,
-                            topics := [{<<"t">>, duplicate_topic_filter}]
-                        },
-                        value := [_, _],
-                        kind := validation_error
-                    }
-                ]},
-                parse_and_check([
-                    link_params(<<"l1">>, #{<<"topics">> => [<<"t">>, <<"t">>]})
-                ])
-            )}
-    ].

+ 3 - 0
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -309,6 +309,9 @@ maybe_json_decode(X) ->
 
 simple_request(Method, Path, Params) -> %% simple_request/4 with the header returned by auth_header_()
     AuthHeader = auth_header_(),
+    simple_request(Method, Path, Params, AuthHeader).
+
+simple_request(Method, Path, Params, AuthHeader) ->
     Opts = #{return_all => true},
     case request_api(Method, Path, "", AuthHeader, Params, Opts) of
         {ok, {{_, Status, _}, _Headers, Body0}} ->

+ 9 - 53
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -341,63 +341,19 @@ deliver_to_client([Msg | Rest], Pid, Topic) ->
 deliver_to_client([], _, _) ->
     ok.
 
--define(DELIVER_ALLOWED, true).
--define(DELIVER_NOT_ALLOWED, false).
--define(publisher_client_banned, publisher_client_banned).
--define(msg_topic_not_match, msg_topic_not_match).
-
-filter_delivery(Messages, Topic) ->
-    FilterFun =
-        fun(Msg) ->
-            Pipe = emqx_utils:pipeline(
-                [
-                    fun check_clientid_banned/2,
-                    fun 'check_prefixed_$_with_wildcard'/2
-                ],
-                {Msg, Topic},
-                ?DELIVER_NOT_ALLOWED
-            ),
-            _ =
-                case Pipe of
-                    {ok, _, ?DELIVER_ALLOWED} ->
-                        true;
-                    {error, _, _} ->
-                        false
-                end
-        end,
-    lists:filter(FilterFun, Messages).
+filter_delivery(Messages, _Topic) -> %% Keep messages accepted by check_clientid_banned/1; the topic is ignored.
+    lists:filter(fun check_clientid_banned/1, Messages).
 
-check_clientid_banned({Msg, _Topic} = Input, _) ->
+check_clientid_banned(Msg) ->
     case emqx_banned:check_clientid(Msg#message.from) of
         false ->
-            {ok, Input, ?DELIVER_ALLOWED};
-        true ->
-            ?tp(
-                debug,
-                ignore_retained_message_due_to_banned,
-                #{
-                    reason => ?publisher_client_banned,
-                    clientid => Msg#message.from
-                }
-            ),
-            {error, ?publisher_client_banned, ?DELIVER_NOT_ALLOWED}
-    end.
-
-%% [MQTT-4.7.2-1]
-'check_prefixed_$_with_wildcard'({Msg, Topic} = Input, _) ->
-    case emqx_topic:match(Msg#message.topic, Topic) of
-        false ->
-            ?tp(
-                ignore_retained_message_due_to_topic_not_match,
-                #{
-                    reason => ?msg_topic_not_match,
-                    msg_topic => Msg#message.topic,
-                    subscribed_topic => Topic
-                }
-            ),
-            {error, ?msg_topic_not_match, ?DELIVER_NOT_ALLOWED};
+            true;
         true ->
-            {ok, Input, ?DELIVER_ALLOWED}
+            ?tp(debug, ignore_retained_message_due_to_banned, #{
+                reason => publisher_client_banned,
+                clientid => Msg#message.from
+            }),
+            false
     end.
 
 take(N, List) ->

+ 19 - 4
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -381,12 +381,24 @@ search_stream(Tokens, Now) ->
 
 search_stream(undefined, Tokens, Now) ->
     Ms = make_message_match_spec(Tokens, Now),
-    emqx_utils_stream:ets(
+    MsgStream = emqx_utils_stream:ets(
         fun
             (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1);
             (Cont) -> ets:select(Cont)
         end
-    );
+    ),
+    case Tokens of
+        %% NOTE: First-level wildcards must not match `$`-prefixed topics [MQTT-4.7.2-1].
+        [T | _] when T == '+' orelse T == '#' ->
+            emqx_utils_stream:filter(
+                fun(#retained_message{topic = [TopicToken | _]}) ->
+                    emqx_topic:match([TopicToken], [T])
+                end,
+                MsgStream
+            );
+        _ ->
+            MsgStream
+    end;
 search_stream(Index, FilterTokens, Now) ->
     {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
     IndexRecordStream = emqx_utils_stream:ets(
@@ -415,8 +427,11 @@ search_stream(Index, FilterTokens, Now) ->
     ),
     ValidRetainMsgStream.
 
-match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
-match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
+match(_IsExactMs = true, [TopicToken | _], [FilterToken | _]) ->
+    %% NOTE: First-level wildcards must not match `$`-prefixed topics [MQTT-4.7.2-1].
+    emqx_topic:match([TopicToken], [FilterToken]); %% only the first level is compared in this clause
+match(_IsExactMs = false, TopicTokens, FilterTokens) ->
+    emqx_topic:match(TopicTokens, FilterTokens). %% full token-by-token match
 
 delete_message_by_topic(TopicTokens, Indices) ->
     case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of

+ 13 - 38
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -250,68 +250,49 @@ t_wildcard_subscription(Config) ->
     emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]),
     ok = emqtt:disconnect(C1).
 
-'t_wildcard_no_$_prefix'(Config) ->
+'t_wildcard_no_$_prefix'(_Config) ->
     {ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C0),
     emqtt:publish(
         C0,
         <<"$test/t/0">>,
         <<"this is a retained message with $ prefix in topic">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 1}, {retain, true}]
     ),
     emqtt:publish(
         C0,
         <<"$test/test/1">>,
         <<"this is another retained message with $ prefix in topic">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 1}, {retain, true}]
     ),
 
     emqtt:publish(
         C0,
         <<"t/1">>,
         <<"this is a retained message 1">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 1}, {retain, true}]
     ),
     emqtt:publish(
         C0,
         <<"t/2">>,
         <<"this is a retained message 2">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 1}, {retain, true}]
     ),
-    publish(
+    emqtt:publish(
         C0,
         <<"/t/3">>,
         <<"this is a retained message 3">>,
-        [{qos, 0}, {retain, true}],
-        Config
+        [{qos, 1}, {retain, true}]
     ),
 
-    snabbkaffe:start_trace(),
-    SnabbkaffeSubFun = fun(NEvents) ->
-        snabbkaffe:subscribe(
-            ?match_event(#{?snk_kind := ignore_retained_message_due_to_topic_not_match}),
-            NEvents,
-            _Timeout = 10000,
-            0
-        )
-    end,
-    SnabbkaffeReceiveAndAssert = fun(SubRef, NEvents) ->
-        {ok, Trace} = snabbkaffe:receive_events(SubRef),
-        ?assertEqual(
-            NEvents, length(?of_kind(ignore_retained_message_due_to_topic_not_match, Trace))
-        )
-    end,
-
     %%%%%%%%%%
     %% C1 subscribes to `#'
-    {ok, SubRef1} = SnabbkaffeSubFun(2),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"#">>, 0),
     %% Matched 5 msgs but only receive 3 msgs, 2 ignored
     %% (`$test/t/0` and `$test/test/1` with `$` prefix in topic are ignored)
-    SnabbkaffeReceiveAndAssert(SubRef1, 2),
-    Msgs1 = receive_messages(3),
+    Msgs1 = receive_messages(5),
     ?assertMatch(
         %% The order in which messages are received is not always the same as the order in which they are published.
         %% The received order follows the order in which the indexes match.
@@ -323,8 +304,7 @@ t_wildcard_subscription(Config) ->
             #{topic := <<"t/1">>},
             #{topic := <<"t/2">>}
         ],
-        Msgs1,
-        #{msgs => Msgs1}
+        Msgs1
     ),
     ok = emqtt:disconnect(C1),
 
@@ -340,8 +320,7 @@ t_wildcard_subscription(Config) ->
             #{topic := <<"$test/t/0">>},
             #{topic := <<"$test/test/1">>}
         ],
-        Msgs2,
-        #{msgs => Msgs2}
+        Msgs2
     ),
     ok = emqtt:disconnect(C2),
 
@@ -357,31 +336,27 @@ t_wildcard_subscription(Config) ->
             #{topic := <<"t/1">>},
             #{topic := <<"t/2">>}
         ],
-        Msgs3,
-        #{msgs => Msgs3}
+        Msgs3
     ),
     ok = emqtt:disconnect(C3),
 
     %%%%%%%%%%
     %% C4 subscribes to `+/t/#'
-    {ok, SubRef4} = SnabbkaffeSubFun(1),
     {ok, C4} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C4),
     {ok, #{}, [0]} = emqtt:subscribe(C4, <<"+/t/#">>, 0),
     %% Matched 2 msgs but only receive 1 msgs, 1 ignored
     %% (`$test/t/0` with `$` prefix in topic are ignored)
-    SnabbkaffeReceiveAndAssert(SubRef4, 1),
     Msgs4 = receive_messages(1),
     ?assertMatch(
         [
             #{topic := <<"/t/3">>}
         ],
-        Msgs4,
-        #{msgs => Msgs4}
+        Msgs4
     ),
     ok = emqtt:disconnect(C4),
 
-    snabbkaffe:stop(),
+    ?assertNotReceive(_),
 
     ok.
 

+ 5 - 0
changes/ee/fix-14004.en.md

@@ -0,0 +1,5 @@
+Fix an issue with Cluster Linking where overlapping topic filters in the `topics` configuration would lead to inconsistent and incomplete cross-cluster message routing. Each topic filter is now respected, but overlapping filters may increase cross-cluster routing complexity.
+
+## Breaking changes
+
+Redundant topic filters (e.g. `t/1` and `t/+`) in the `topics` configuration are now considered invalid. The link will fail to start if such a configuration is detected.