|
|
@@ -4,7 +4,6 @@
|
|
|
-module(emqx_cluster_link_router_bootstrap).
|
|
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
--include_lib("emqx/include/emqx_router.hrl").
|
|
|
-include_lib("emqx/include/emqx_shared_sub.hrl").
|
|
|
-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
|
|
|
|
|
|
@@ -19,8 +18,7 @@
|
|
|
|
|
|
-record(bootstrap, {
|
|
|
target :: _ClusterName :: binary(),
|
|
|
- wildcards :: [emqx_types:topic()],
|
|
|
- topics :: [emqx_types:topic()],
|
|
|
+ filters :: [emqx_types:topic()],
|
|
|
stash :: [{emqx_types:topic(), _RouteID}],
|
|
|
max_batch_size :: non_neg_integer(),
|
|
|
is_persistent_route :: boolean()
|
|
|
@@ -29,12 +27,10 @@
|
|
|
%%
|
|
|
|
|
|
init(TargetCluster, LinkFilters, Options) ->
|
|
|
- {Wildcards, Topics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
|
|
|
IsPersistentRoute = maps:get(is_persistent_route, Options, false),
|
|
|
#bootstrap{
|
|
|
target = TargetCluster,
|
|
|
- wildcards = Wildcards,
|
|
|
- topics = Topics,
|
|
|
+ filters = LinkFilters,
|
|
|
stash = [],
|
|
|
max_batch_size = maps:get(max_batch_size, Options, ?MAX_BATCH_SIZE),
|
|
|
is_persistent_route = IsPersistentRoute
|
|
|
@@ -43,13 +39,11 @@ init(TargetCluster, LinkFilters, Options) ->
|
|
|
next_batch(B = #bootstrap{stash = S0 = [_ | _], max_batch_size = MBS}) ->
|
|
|
{Batch, Stash} = mk_batch(S0, MBS),
|
|
|
{Batch, B#bootstrap{stash = Stash}};
|
|
|
-next_batch(B = #bootstrap{topics = Topics = [_ | _], stash = [], is_persistent_route = IsPs}) ->
|
|
|
- next_batch(B#bootstrap{topics = [], stash = routes_by_topic(Topics, IsPs)});
|
|
|
-next_batch(
|
|
|
- B0 = #bootstrap{wildcards = Wildcards = [_ | _], stash = [], is_persistent_route = IsPs}
|
|
|
-) ->
|
|
|
- next_batch(B0#bootstrap{wildcards = [], stash = routes_by_wildcards(Wildcards, IsPs)});
|
|
|
-next_batch(#bootstrap{topics = [], wildcards = [], stash = []}) ->
|
|
|
+next_batch(B0 = #bootstrap{filters = Filters = [_ | _], stash = [], is_persistent_route = false}) ->
|
|
|
+ next_batch(B0#bootstrap{filters = [], stash = routes_by_wildcards(Filters)});
|
|
|
+next_batch(B0 = #bootstrap{filters = Filters = [_ | _], stash = [], is_persistent_route = true}) ->
|
|
|
+ next_batch(B0#bootstrap{filters = [], stash = ps_routes_by_wildcards(Filters)});
|
|
|
+next_batch(#bootstrap{filters = [], stash = []}) ->
|
|
|
done.
|
|
|
|
|
|
mk_batch(Stash, MaxBatchSize) when length(Stash) =< MaxBatchSize ->
|
|
|
@@ -60,33 +54,16 @@ mk_batch(Stash, MaxBatchSize) ->
|
|
|
|
|
|
%%
|
|
|
|
|
|
-routes_by_topic(Topics, _IsPersistentRoute = false) ->
|
|
|
- Routes = select_routes_by_topics(Topics),
|
|
|
- SharedRoutes = select_shared_sub_routes_by_topics(Topics),
|
|
|
- Routes ++ SharedRoutes;
|
|
|
-routes_by_topic(Topics, _IsPersistentRoute = true) ->
|
|
|
- lists:foldl(
|
|
|
- fun(T, Acc) ->
|
|
|
- Routes = emqx_persistent_session_ds_router:lookup_routes(T),
|
|
|
- [encode_route(T, ps_route_id(PSRoute)) || #ps_route{} = PSRoute <- Routes] ++ Acc
|
|
|
- end,
|
|
|
- [],
|
|
|
- Topics
|
|
|
- ).
|
|
|
-
|
|
|
-routes_by_wildcards(Wildcards, _IsPersistentRoute = false) ->
|
|
|
+routes_by_wildcards(Wildcards) ->
|
|
|
Routes = select_routes_by_wildcards(Wildcards),
|
|
|
SharedRoutes = select_shared_sub_routes_by_wildcards(Wildcards),
|
|
|
- Routes ++ SharedRoutes;
|
|
|
-routes_by_wildcards(Wildcards, _IsPersistentRoute = true) ->
|
|
|
+ Routes ++ SharedRoutes.
|
|
|
+
|
|
|
+ps_routes_by_wildcards(Wildcards) ->
|
|
|
emqx_persistent_session_ds_router:foldl_routes(
|
|
|
- fun(#ps_route{topic = T} = PSRoute, Acc) ->
|
|
|
- case topic_intersect_any(T, Wildcards) of
|
|
|
- false ->
|
|
|
- Acc;
|
|
|
- Intersec ->
|
|
|
- [encode_route(Intersec, ps_route_id(PSRoute)) | Acc]
|
|
|
- end
|
|
|
+ fun(#ps_route{topic = Topic} = PSRoute, Acc) ->
|
|
|
+ Intersections = emqx_cluster_link_router:compute_intersections(Topic, Wildcards),
|
|
|
+ [encode_route(I, ps_route_id(PSRoute)) || I <- Intersections] ++ Acc
|
|
|
end,
|
|
|
[]
|
|
|
).
|
|
|
@@ -96,52 +73,24 @@ ps_route_id(#ps_route{topic = T, dest = #share_dest{group = Group, session_id =
|
|
|
ps_route_id(#ps_route{topic = T, dest = SessionId}) ->
|
|
|
?PERSISTENT_ROUTE_ID(T, SessionId).
|
|
|
|
|
|
-select_routes_by_topics(Topics) ->
|
|
|
- [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].
|
|
|
-
|
|
|
select_routes_by_wildcards(Wildcards) ->
|
|
|
emqx_broker:foldl_topics(
|
|
|
- fun(Topic, Acc) -> intersecting_route(Topic, Wildcards) ++ Acc end,
|
|
|
+ fun(Topic, Acc) ->
|
|
|
+ Intersections = emqx_cluster_link_router:compute_intersections(Topic, Wildcards),
|
|
|
+ [encode_route(I, Topic) || I <- Intersections] ++ Acc
|
|
|
+ end,
|
|
|
[]
|
|
|
).
|
|
|
|
|
|
-select_shared_sub_routes_by_topics([T | Topics]) ->
|
|
|
- select_shared_sub_routes(T) ++ select_shared_sub_routes_by_topics(Topics);
|
|
|
-select_shared_sub_routes_by_topics([]) ->
|
|
|
- [].
|
|
|
-
|
|
|
select_shared_sub_routes_by_wildcards(Wildcards) ->
|
|
|
emqx_utils_ets:keyfoldl(
|
|
|
fun({Group, Topic}, Acc) ->
|
|
|
- RouteID = ?SHARED_ROUTE_ID(Topic, Group),
|
|
|
- intersecting_route(Topic, RouteID, Wildcards) ++ Acc
|
|
|
+ Intersections = emqx_cluster_link_router:compute_intersections(Topic, Wildcards),
|
|
|
+ [encode_route(I, ?SHARED_ROUTE_ID(Topic, Group)) || I <- Intersections] ++ Acc
|
|
|
end,
|
|
|
[],
|
|
|
?SHARED_SUBSCRIBER
|
|
|
).
|
|
|
|
|
|
-select_shared_sub_routes(Topic) ->
|
|
|
- LocalGroups = lists:usort(ets:select(?SHARED_SUBSCRIBER, [{{{'$1', Topic}, '_'}, [], ['$1']}])),
|
|
|
- [encode_route(Topic, ?SHARED_ROUTE_ID(Topic, G)) || G <- LocalGroups].
|
|
|
-
|
|
|
-intersecting_route(Topic, Wildcards) ->
|
|
|
- intersecting_route(Topic, Topic, Wildcards).
|
|
|
-
|
|
|
-intersecting_route(Topic, RouteID, Wildcards) ->
|
|
|
- %% TODO: probably nice to validate cluster link topic filters
|
|
|
- %% to have no intersections between each other?
|
|
|
- case topic_intersect_any(Topic, Wildcards) of
|
|
|
- false -> [];
|
|
|
- Intersection -> [encode_route(Intersection, RouteID)]
|
|
|
- end.
|
|
|
-
|
|
|
-topic_intersect_any(Topic, [LinkFilter | T]) ->
|
|
|
- case emqx_topic:intersection(Topic, LinkFilter) of
|
|
|
- false -> topic_intersect_any(Topic, T);
|
|
|
- TopicOrFilter -> TopicOrFilter
|
|
|
- end;
|
|
|
-topic_intersect_any(_Topic, []) ->
|
|
|
- false.
|
|
|
-
|
|
|
encode_route(Topic, RouteID) ->
|
|
|
emqx_cluster_link_mqtt:encode_field(route, {add, {Topic, RouteID}}).
|