Просмотр исходного кода

fix(cluster-link): correctly replicate overlapping route intersections

Andrew Mayorov 1 год назад
Родитель
Сommit
28824aa74b

+ 14 - 36
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -59,32 +59,34 @@ unregister_external_broker() ->
 %% There is no need to push Node name as this info can be derived from
 %% agent state on the remote cluster.
 add_route(Topic) ->
-    maybe_push_route_op(add, Topic, Topic).
+    emqx_cluster_link_router:push_update(add, Topic, Topic).
 
 delete_route(Topic) ->
-    maybe_push_route_op(delete, Topic, Topic).
+    emqx_cluster_link_router:push_update(delete, Topic, Topic).
 
 add_shared_route(Topic, Group) ->
-    maybe_push_route_op(add, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
+    RouteID = ?SHARED_ROUTE_ID(Topic, Group),
+    emqx_cluster_link_router:push_update(add, Topic, RouteID).
 
 delete_shared_route(Topic, Group) ->
-    maybe_push_route_op(delete, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
+    RouteID = ?SHARED_ROUTE_ID(Topic, Group),
+    emqx_cluster_link_router:push_update(delete, Topic, RouteID).
 
 add_persistent_route(Topic, ID) ->
-    maybe_push_route_op(add, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
+    RouteID = ?PERSISTENT_ROUTE_ID(Topic, ID),
+    emqx_cluster_link_router:push_update_persistent(add, Topic, RouteID).
 
 delete_persistent_route(Topic, ID) ->
-    maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
+    RouteID = ?PERSISTENT_ROUTE_ID(Topic, ID),
+    emqx_cluster_link_router:push_update_persistent(delete, Topic, RouteID).
 
 add_persistent_shared_route(Topic, Group, ID) ->
-    maybe_push_route_op(
-        add, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
-    ).
+    RouteID = ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID),
+    emqx_cluster_link_router:push_update_persistent(add, Topic, RouteID).
 
 delete_persistent_shared_route(Topic, Group, ID) ->
-    maybe_push_route_op(
-        delete, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
-    ).
+    RouteID = ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID),
+    emqx_cluster_link_router:push_update_persistent(delete, Topic, RouteID).
 
 forward(#delivery{message = #message{extra = #{link_origin := _}}}) ->
     %% Do not forward any external messages to other links.
@@ -197,30 +199,6 @@ handle_route_op_msg(
             error
     end.
 
-maybe_push_route_op(Op, Topic, RouteID) ->
-    maybe_push_route_op(Op, Topic, RouteID, push).
-
-maybe_push_route_op(Op, Topic, RouteID, PushFun) ->
-    lists:foreach(
-        fun(#{name := Cluster, topics := LinkFilters}) ->
-            case topic_intersect_any(Topic, LinkFilters) of
-                false ->
-                    ok;
-                TopicIntersection ->
-                    emqx_cluster_link_router_syncer:PushFun(Cluster, Op, TopicIntersection, RouteID)
-            end
-        end,
-        emqx_cluster_link_config:enabled_links()
-    ).
-
-topic_intersect_any(Topic, [LinkFilter | T]) ->
-    case emqx_topic:intersection(Topic, LinkFilter) of
-        false -> topic_intersect_any(Topic, T);
-        TopicOrFilter -> TopicOrFilter
-    end;
-topic_intersect_any(_Topic, []) ->
-    false.
-
 actor_init(
     ClusterName,
     #{actor := Actor, incarnation := Incr},

+ 59 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_router.erl

@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_router).
+
+-export([
+    push_update/3,
+    push_update_persistent/3,
+    compute_intersections/2
+]).
+
+%% Test exports
+-export([push_update/5]).
+
+%%--------------------------------------------------------------------
+
+push_update(Op, Topic, RouteID) ->
+    push_update(Op, Topic, RouteID, fun emqx_cluster_link_router_syncer:push/4).
+
+push_update_persistent(Op, Topic, RouteID) ->
+    push_update(Op, Topic, RouteID, fun emqx_cluster_link_router_syncer:push_persistent_route/4).
+
+push_update(Op, Topic, RouteID, PushFun) ->
+    push_update(Op, Topic, RouteID, PushFun, emqx_cluster_link_config:enabled_links()).
+
+push_update(Op, Topic, RouteID, PushFun, [#{name := Cluster, topics := Filters} | Rest]) ->
+    Intersections = compute_intersections(Topic, Filters),
+    ok = push_intersections(Cluster, Op, RouteID, PushFun, Intersections),
+    push_update(Op, Topic, RouteID, PushFun, Rest);
+push_update(_Op, _Topic, _RouteID, _PushFun, []) ->
+    ok.
+
+push_intersections(Cluster, Op, RouteID, PushFun, [Intersection | Rest]) ->
+    _ = PushFun(Cluster, Op, Intersection, RouteID),
+    push_intersections(Cluster, Op, RouteID, PushFun, Rest);
+push_intersections(_Cluster, _Op, _RouteID, _PushFun, []) ->
+    ok.
+
+compute_intersections(Topic, Filters) ->
+    compute_intersections(Topic, Filters, []).
+
+compute_intersections(Topic, [Filter | Rest], Acc) ->
+    case emqx_topic:intersection(Topic, Filter) of
+        false ->
+            compute_intersections(Topic, Rest, Acc);
+        Intersection ->
+            compute_intersections(Topic, Rest, unionify(Intersection, Acc))
+    end;
+compute_intersections(_Topic, [], Acc) ->
+    Acc.
+
+unionify(Filter, Union) ->
+    %% NOTE: See also `emqx_topic:union/1` implementation.
+    %% Drop filters completely covered by `Filter`.
+    Disjoint = [F || F <- Union, not emqx_topic:subset(F, Filter)],
+    %% Drop `Filter` if completely covered by another filter.
+    Head = [Filter || not lists:any(fun(F) -> emqx_topic:subset(Filter, F) end, Disjoint)],
+    Head ++ Disjoint.

+ 97 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_router_SUITE.erl

@@ -0,0 +1,97 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_router_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start([], #{work_dir => emqx_cth_suite:work_dir(Config)}),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)).
+
+%%
+
+t_no_redundant_filters(_Config) ->
+    %% Verify that redundant filters are effectively ignored.
+    %% I.e. when supplied through a config file.
+    Link = prepare_link(
+        <<"L1">>,
+        [<<"t/1">>, <<"t/2">>, <<"t/3">>, <<"t/+">>, <<"t/#">>, <<"t/+/+">>, <<"t/+/+/+">>]
+    ),
+    ok = add_route(<<"t/6">>, [Link]),
+    ok = add_route(<<"t/1">>, [Link]),
+    ok = delete_route(<<"t/6">>, [Link]),
+    ok = delete_route(<<"t/1">>, [Link]),
+    ?assertEqual(
+        [
+            {<<"L1">>, add, <<"t/6">>, <<"t/6">>},
+            {<<"L1">>, add, <<"t/1">>, <<"t/1">>},
+            {<<"L1">>, delete, <<"t/6">>, <<"t/6">>},
+            {<<"L1">>, delete, <<"t/1">>, <<"t/1">>}
+        ],
+        ?drainMailbox()
+    ).
+
+t_overlapping_filters(_Config) ->
+    %% Verify that partially overlapping filters are respected.
+    %% If those filters produce partially overlapping intersections, this will result
+    %% in _multiple_ route ops.
+    Link = prepare_link(
+        <<"L2">>,
+        [<<"t/+/1/#">>, <<"t/z">>, <<"t/1/+">>]
+    ),
+    ok = add_route(<<"t/+">>, [Link]),
+    ok = add_route(<<"t/+/+">>, [Link]),
+    ?assertEqual(
+        [
+            {<<"L2">>, add, <<"t/z">>, <<"t/+">>},
+            {<<"L2">>, add, <<"t/1/+">>, <<"t/+/+">>},
+            {<<"L2">>, add, <<"t/+/1">>, <<"t/+/+">>}
+        ],
+        ?drainMailbox()
+    ).
+
+t_overlapping_filters_subset(_Config) ->
+    %% Verify that partially overlapping filters are respected.
+    %% But if those filters produce such intersections that one is a subset of another,
+    %% this will result in a _single_ route op.
+    Link = prepare_link(
+        <<"L3">>,
+        [<<"t/1/+/3/#">>, <<"t/z">>, <<"t/1/2/+">>]
+    ),
+    ok = add_route(<<"t/+">>, [Link]),
+    ok = add_route(<<"t/+/2/3/#">>, [Link]),
+    ?assertEqual(
+        [
+            {<<"L3">>, add, <<"t/z">>, <<"t/+">>},
+            {<<"L3">>, add, <<"t/1/2/3/#">>, <<"t/+/2/3/#">>}
+        ],
+        ?drainMailbox()
+    ).
+
+add_route(Topic, Links) ->
+    emqx_cluster_link_router:push_update(add, Topic, Topic, fun push_route/4, Links).
+
+delete_route(Topic, Links) ->
+    emqx_cluster_link_router:push_update(delete, Topic, Topic, fun push_route/4, Links).
+
+push_route(Cluster, OpName, Topic, RouteID) ->
+    self() ! {Cluster, OpName, Topic, RouteID},
+    ok.
+
+prepare_link(Name, Topics) ->
+    emqx_cluster_link_config:prepare_link(#{name => Name, enable => true, topics => Topics}).