Explorar o código

feat: cluster link prototype WIP

Serge Tupchii hai 1 ano
pai
achega
f08342c704

+ 60 - 27
apps/emqx/src/emqx_broker.erl

@@ -244,11 +244,22 @@ publish(Msg) when is_record(Msg, message) ->
                 topic => Topic
             }),
             [];
-        Msg1 = #message{topic = Topic} ->
-            PersistRes = persist_publish(Msg1),
-            route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
+        Msg1 = #message{} ->
+            do_publish(Msg1);
+        Msgs when is_list(Msgs) -> do_publish_many(Msgs)
     end.
 
+%% Publish every message of the list, in order, and concatenate the
+%% per-message route results into a single flat list.
+do_publish_many(Msgs) ->
+    lists:flatmap(fun do_publish/1, Msgs).
+
+%% Persist the message, look up the routes matching its topic and dispatch it.
+%% Routes with an `{external, _}' destination are kept apart by aggre/1 and are
+%% only added back by maybe_add_ext_routes/3.
+do_publish(Msg = #message{topic = Topic}) ->
+    Persisted = persist_publish(Msg),
+    {ClusterRoutes, ExternalRoutes} = aggre(emqx_router:match_routes(Topic)),
+    AllRoutes = maybe_add_ext_routes(ExternalRoutes, ClusterRoutes, Msg),
+    route(AllRoutes, delivery(Msg), Persisted).
+
 persist_publish(Msg) ->
     case emqx_persistent_message:persist(Msg) of
         ok ->
@@ -311,26 +322,40 @@ do_route({To, Node}, Delivery) when Node =:= node() ->
     {Node, To, dispatch(To, Delivery)};
 do_route({To, Node}, Delivery) when is_atom(Node) ->
     {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
+do_route({To, {external, _} = ExtDest}, Delivery) ->
+    {ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)};
 do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
     {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
 
 aggre([]) ->
-    [];
+    {[], []};
 aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
-    [{To, Node}];
+    {[{To, Node}], []};
+aggre([#route{topic = To, dest = {external, _} = ExtDest}]) ->
+    {[], [{To, ExtDest}]};
 aggre([#route{topic = To, dest = {Group, _Node}}]) ->
-    [{To, Group}];
+    {[{To, Group}], []};
 aggre(Routes) ->
-    aggre(Routes, false, []).
-
-aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
-    aggre(Rest, Dedup, [{To, Node} | Acc]);
-aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
-    aggre(Rest, true, [{To, Group} | Acc]);
+    aggre(Routes, false, {[], []}).
+
+aggre([#route{topic = To, dest = Node} | Rest], Dedup, {Acc, ExtAcc}) when is_atom(Node) ->
+    aggre(Rest, Dedup, {[{To, Node} | Acc], ExtAcc});
+aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Dedup, {Acc, ExtAcc}) ->
+    aggre(Rest, Dedup, {Acc, [{To, ExtDest} | ExtAcc]});
+aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, {Acc, ExtAcc}) ->
+    aggre(Rest, true, {[{To, Group} | Acc], ExtAcc});
 aggre([], false, Acc) ->
     Acc;
-aggre([], true, Acc) ->
-    lists:usort(Acc).
+aggre([], true, {Acc, ExtAcc}) ->
+    {lists:usort(Acc), lists:usort(ExtAcc)}.
+
+%% Append the external routes to the regular ones, but only when the external
+%% broker agrees to route this message to external destinations.
+%% The external broker is not consulted at all when there are no external routes.
+maybe_add_ext_routes([], Routes, _Msg) ->
+    Routes;
+maybe_add_ext_routes(ExtRoutes, Routes, Msg) ->
+    ShouldRoute = emqx_external_broker:should_route_to_external_dests(Msg),
+    case ShouldRoute of
+        false -> Routes;
+        true -> Routes ++ ExtRoutes
+    end.
 
 %% @doc Forward message to another node.
 -spec forward(
@@ -643,19 +668,27 @@ maybe_delete_route(Topic) ->
 
 sync_route(Action, Topic, ReplyTo) ->
     EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
-    case EnabledOn of
-        all ->
-            push_sync_route(Action, Topic, ReplyTo);
-        none ->
-            regular_sync_route(Action, Topic);
-        Role ->
-            case Role =:= mria_config:whoami() of
-                true ->
-                    push_sync_route(Action, Topic, ReplyTo);
-                false ->
-                    regular_sync_route(Action, Topic)
-            end
-    end.
+    Res =
+        case EnabledOn of
+            all ->
+                push_sync_route(Action, Topic, ReplyTo);
+            none ->
+                regular_sync_route(Action, Topic);
+            Role ->
+                case Role =:= mria_config:whoami() of
+                    true ->
+                        push_sync_route(Action, Topic, ReplyTo);
+                    false ->
+                        regular_sync_route(Action, Topic)
+                end
+        end,
+    _ = external_sync_route(Action, Topic),
+    Res.
+
+external_sync_route(add, Topic) ->
+    emqx_external_broker:maybe_add_route(Topic);
+external_sync_route(delete, Topic) ->
+    emqx_external_broker:maybe_delete_route(Topic).
 
 push_sync_route(Action, Topic, Opts) ->
     emqx_router_syncer:push(Action, Topic, node(), Opts).

+ 117 - 0
apps/emqx/src/emqx_external_broker.erl

@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_external_broker).
+
+-callback forward(emqx_router:external_dest(), emqx_types:delivery()) ->
+    emqx_types:deliver_result().
+
+-callback should_route_to_external_dests(emqx_types:message()) -> boolean().
+
+-callback maybe_add_route(emqx_types:topic()) -> ok.
+-callback maybe_delete_route(emqx_types:topic()) -> ok.
+
+-export([
+    provider/0,
+    register_provider/1,
+    unregister_provider/1,
+    forward/2,
+    should_route_to_external_dests/1,
+    maybe_add_route/1,
+    maybe_delete_route/1
+]).
+
+-include("logger.hrl").
+
+-define(PROVIDER, {?MODULE, external_broker}).
+
+-define(safe_with_provider(IfRegistered, IfNotRegistered),
+    case persistent_term:get(?PROVIDER, undefined) of
+        undefined ->
+            IfNotRegistered;
+        Provider ->
+            try
+                Provider:IfRegistered
+            catch
+                Err:Reason:St ->
+                    ?SLOG(error, #{
+                        msg => "external_broker_crashed",
+                        provider => Provider,
+                        callback => ?FUNCTION_NAME,
+                        stacktrace => St,
+                        error => Err,
+                        reason => Reason
+                    }),
+                    {error, Reason}
+            end
+    end
+).
+
+%% TODO: provider API copied from emqx_external_traces,
+%% but it can be moved to a common module.
+
+%%--------------------------------------------------------------------
+%% Provider API
+%%--------------------------------------------------------------------
+
+-spec register_provider(module()) -> ok | {error, term()}.
+register_provider(Module) when is_atom(Module) ->
+    case is_valid_provider(Module) of
+        true ->
+            persistent_term:put(?PROVIDER, Module);
+        false ->
+            {error, invalid_provider}
+    end.
+
+%% Remove the registered provider, but only if it is `Module';
+%% any other (or no) registration is left untouched.
+-spec unregister_provider(module()) -> ok | {error, term()}.
+unregister_provider(Module) ->
+    case provider() =:= Module of
+        true ->
+            persistent_term:erase(?PROVIDER),
+            ok;
+        false ->
+            {error, not_registered}
+    end.
+
+-spec provider() -> module() | undefined.
+provider() ->
+    persistent_term:get(?PROVIDER, undefined).
+
+%%--------------------------------------------------------------------
+%% Broker API
+%%--------------------------------------------------------------------
+
+forward(ExternalDest, Delivery) ->
+    ?safe_with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}).
+
+%% Ask the registered provider whether `Message' should be routed to external
+%% destinations. Returns `false' when no provider is registered.
+%% The safe_with_provider macro turns a provider crash into `{error, Reason}',
+%% which a caller expecting a boolean cannot handle, so anything other than
+%% `true' is normalized to `false' here (the crash itself is already logged
+%% by the macro).
+-spec should_route_to_external_dests(emqx_types:message()) -> boolean().
+should_route_to_external_dests(Message) ->
+    case ?safe_with_provider(?FUNCTION_NAME(Message), false) of
+        true -> true;
+        _FalseOrError -> false
+    end.
+
+maybe_add_route(Topic) ->
+    ?safe_with_provider(?FUNCTION_NAME(Topic), ok).
+
+maybe_delete_route(Topic) ->
+    ?safe_with_provider(?FUNCTION_NAME(Topic), ok).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Check that `Module' exports every callback of this behaviour.
+%% erlang:function_exported/3 returns `false' for a module that is not loaded,
+%% so try to load the module first; otherwise a valid provider could be
+%% rejected when code is loaded lazily. A failed load needs no handling here:
+%% the export checks below simply return `false'.
+is_valid_provider(Module) ->
+    _ = code:ensure_loaded(Module),
+    lists:all(
+        fun({F, A}) -> erlang:function_exported(Module, F, A) end,
+        ?MODULE:behaviour_info(callbacks)
+    ).

+ 31 - 21
apps/emqx/src/emqx_router.erl

@@ -91,11 +91,12 @@
     deinit_schema/0
 ]).
 
--export_type([dest/0]).
+-export_type([dest/0, external_dest/0]).
 -export_type([schemavsn/0]).
 
 -type group() :: binary().
--type dest() :: node() | {group(), node()}.
+-type external_dest() :: {external, term()}.
+-type dest() :: node() | {group(), node()} | external_dest().
 -type schemavsn() :: v1 | v2.
 
 %% Operation :: {add, ...} | {delete, ...}.
@@ -107,7 +108,14 @@
     unused = [] :: nil()
 }).
 
--define(node_patterns(Node), [Node, {'_', Node}]).
+-define(dest_patterns(NodeOrExtDest),
+    case is_atom(NodeOrExtDest) of
+        %% node
+        true -> [NodeOrExtDest, {'_', NodeOrExtDest}];
+        %% external destination
+        false -> [NodeOrExtDest]
+    end
+).
 
 -define(UNSUPPORTED, unsupported).
 
@@ -306,14 +314,14 @@ print_routes(Topic) ->
         match_routes(Topic)
     ).
 
--spec cleanup_routes(node()) -> ok.
-cleanup_routes(Node) ->
-    cleanup_routes(get_schema_vsn(), Node).
+-spec cleanup_routes(node() | external_dest()) -> ok.
+cleanup_routes(NodeOrExtDest) ->
+    cleanup_routes(get_schema_vsn(), NodeOrExtDest).
 
-cleanup_routes(v2, Node) ->
-    cleanup_routes_v2(Node);
-cleanup_routes(v1, Node) ->
-    cleanup_routes_v1(Node).
+cleanup_routes(v2, NodeOrExtDest) ->
+    cleanup_routes_v2(NodeOrExtDest);
+cleanup_routes(v1, NodeOrExtDest) ->
+    cleanup_routes_v1(NodeOrExtDest).
 
 -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
 foldl_routes(FoldFun, AccIn) ->
@@ -430,19 +438,19 @@ has_route_v1(Topic, Dest) ->
 has_route_tab_entry(Topic, Dest) ->
     [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}).
 
-cleanup_routes_v1(Node) ->
+cleanup_routes_v1(NodeOrExtDest) ->
     ?with_fallback(
         lists:foreach(
             fun(Pattern) ->
                 throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern)))
             end,
-            ?node_patterns(Node)
+            ?dest_patterns(NodeOrExtDest)
         ),
-        cleanup_routes_v1_fallback(Node)
+        cleanup_routes_v1_fallback(NodeOrExtDest)
     ).
 
-cleanup_routes_v1_fallback(Node) ->
-    Patterns = [make_route_rec_pat(P) || P <- ?node_patterns(Node)],
+cleanup_routes_v1_fallback(NodeOrExtDest) ->
+    Patterns = [make_route_rec_pat(P) || P <- ?dest_patterns(NodeOrExtDest)],
     mria:transaction(?ROUTE_SHARD, fun() ->
         [
             mnesia:delete_object(?ROUTE_TAB, Route, write)
@@ -525,7 +533,7 @@ has_route_v2(Topic, Dest) ->
             has_route_tab_entry(Topic, Dest)
     end.
 
-cleanup_routes_v2(Node) ->
+cleanup_routes_v2(NodeOrExtDest) ->
     ?with_fallback(
         lists:foreach(
             fun(Pattern) ->
@@ -537,18 +545,18 @@ cleanup_routes_v2(Node) ->
                 ),
                 throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern)))
             end,
-            ?node_patterns(Node)
+            ?dest_patterns(NodeOrExtDest)
         ),
-        cleanup_routes_v2_fallback(Node)
+        cleanup_routes_v2_fallback(NodeOrExtDest)
     ).
 
-cleanup_routes_v2_fallback(Node) ->
+cleanup_routes_v2_fallback(NodeOrExtDest) ->
     %% NOTE
     %% No point in transaction here because all the operations on filters table are dirty.
     ok = ets:foldl(
         fun(#routeidx{entry = K}, ok) ->
             case get_dest_node(emqx_topic_index:get_id(K)) of
-                Node ->
+                NodeOrExtDest ->
                     mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
                 _ ->
                     ok
@@ -560,7 +568,7 @@ cleanup_routes_v2_fallback(Node) ->
     ok = ets:foldl(
         fun(#route{dest = Dest} = Route, ok) ->
             case get_dest_node(Dest) of
-                Node ->
+                NodeOrExtDest ->
                     mria:dirty_delete_object(?ROUTE_TAB, Route);
                 _ ->
                     ok
@@ -570,6 +578,8 @@ cleanup_routes_v2_fallback(Node) ->
         ?ROUTE_TAB
     ).
 
+get_dest_node({external, _} = ExtDest) ->
+    ExtDest;
 get_dest_node({_, Node}) ->
     Node;
 get_dest_node(Node) ->

+ 43 - 1
apps/emqx/src/emqx_topic.erl

@@ -33,7 +33,8 @@
     feed_var/3,
     systop/1,
     parse/1,
-    parse/2
+    parse/2,
+    intersection/2
 ]).
 
 -export([
@@ -52,6 +53,8 @@
     ((C =:= '#' orelse C =:= <<"#">>) andalso REST =/= [])
 ).
 
+-define(IS_WILDCARD(W), W =:= '+' orelse W =:= '#').
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -98,6 +101,45 @@ match(_, ['#']) ->
 match(_, _) ->
     false.
 
+%% @doc Computes the intersection of two topics, two topic filters, or a topic and a filter.
+%% The function is commutative: swapping the arguments yields the same result.
+%% Two plain topics intersect only if they are equal; a topic and a filter intersect
+%% in the topic itself or not at all. The intersection of two filters is either `false'
+%% or a new filter matching only the topics that both input filters match.
+%% Example: "t/global/#" and "t/+/1/+" intersect in "t/global/1/+".
+-spec intersection(TopicOrFilter, TopicOrFilter) -> TopicOrFilter | false when
+    TopicOrFilter :: emqx_types:topic().
+intersection(Topic1, Topic2) when is_binary(Topic1), is_binary(Topic2) ->
+    Words1 = words(Topic1),
+    Words2 = words(Topic2),
+    %% intersection/3 builds the result in reverse; an empty list means "no intersection".
+    case intersection(Words1, Words2, []) of
+        [] -> false;
+        RevWords -> join(lists:reverse(RevWords))
+    end.
+
+%% Walks both word lists in lockstep, accumulating the intersection in reverse
+%% order. Returns [] when the inputs do not intersect.
+%% The clause order is significant: the trailing-wildcard clauses must be tried
+%% before the generic head-by-head clauses.
+intersection(WordsA, ['#'], RevAcc) ->
+    %% A trailing '#' accepts whatever remains of the other side (even nothing).
+    lists:reverse(WordsA, RevAcc);
+intersection(['#'], WordsB, RevAcc) ->
+    lists:reverse(WordsB, RevAcc);
+intersection([Last], ['+'], RevAcc) ->
+    %% A trailing '+' accepts exactly one remaining level.
+    [Last | RevAcc];
+intersection(['+'], [Last], RevAcc) ->
+    [Last | RevAcc];
+intersection([A | RestA], [B | RestB], RevAcc) when ?IS_WILDCARD(A), ?IS_WILDCARD(B) ->
+    intersection(RestA, RestB, [wildcard_intersection(A, B) | RevAcc]);
+intersection([Same | RestA], [Same | RestB], RevAcc) ->
+    intersection(RestA, RestB, [Same | RevAcc]);
+intersection([A | RestA], [B | RestB], RevAcc) when ?IS_WILDCARD(A) ->
+    %% Wildcard against a non-wildcard word: keep the more specific word.
+    intersection(RestA, RestB, [B | RevAcc]);
+intersection([A | RestA], [B | RestB], RevAcc) when ?IS_WILDCARD(B) ->
+    intersection(RestA, RestB, [A | RevAcc]);
+intersection([], [], RevAcc) ->
+    RevAcc;
+intersection(_, _, _) ->
+    [].
+
+wildcard_intersection(W, W) -> W;
+wildcard_intersection(_, _) -> '+'.
+
 -spec match_share(Name, Filter) -> boolean() when
     Name :: share(),
     Filter :: topic() | share().

+ 7 - 0
apps/emqx/src/emqx_topic_index.erl

@@ -23,6 +23,7 @@
 -export([delete/3]).
 -export([match/2]).
 -export([matches/3]).
+-export([matches_filter/3]).
 
 -export([make_key/2]).
 
@@ -72,6 +73,12 @@ match(Topic, Tab) ->
 matches(Topic, Tab, Opts) ->
     emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).
 
+%% @doc Match given topic filter against the index and return _all_ matches.
+%% If `unique` option is given, return only unique matches by record ID.
+-spec matches_filter(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [match(_ID)].
+matches_filter(TopicFilter, Tab, Opts) ->
+    emqx_trie_search:matches_filter(TopicFilter, make_nextf(Tab), Opts).
+
 %% @doc Extract record ID from the match.
 -spec get_id(match(ID)) -> ID.
 get_id(Key) ->

+ 27 - 3
apps/emqx/src/emqx_trie_search.erl

@@ -99,7 +99,7 @@
 -module(emqx_trie_search).
 
 -export([make_key/2, make_pat/2, filter/1]).
--export([match/2, matches/3, get_id/1, get_topic/1]).
+-export([match/2, matches/3, get_id/1, get_topic/1, matches_filter/3]).
 -export_type([key/1, word/0, words/0, nextf/0, opts/0]).
 
 -define(END, '$end_of_table').
@@ -183,9 +183,20 @@ match(Topic, NextF) ->
 matches(Topic, NextF, Opts) ->
     search(Topic, NextF, Opts).
 
+%% @doc Match given topic filter against the index and return _all_ matches.
+-spec matches_filter(emqx_types:topic(), nextf(), opts()) -> [key(_)].
+matches_filter(TopicFilter, NextF, Opts) ->
+    search(TopicFilter, NextF, [topic_filter | Opts]).
+
 %% @doc Entrypoint of the search for a given topic.
 search(Topic, NextF, Opts) ->
-    Words = topic_words(Topic),
+    %% A private opt
+    IsFilter = proplists:get_bool(topic_filter, Opts),
+    Words =
+        case IsFilter of
+            true -> filter_words(Topic);
+            false -> topic_words(Topic)
+        end,
     Base = base_init(Words),
     ORetFirst = proplists:get_bool(return_first, Opts),
     OUnique = proplists:get_bool(unique, Opts),
@@ -200,8 +211,10 @@ search(Topic, NextF, Opts) ->
         end,
     Matches =
         case search_new(Words, Base, NextF, Acc0) of
-            {Cursor, Acc} ->
+            {Cursor, Acc} when not IsFilter ->
                 match_topics(Topic, Cursor, NextF, Acc);
+            {_Cursor, Acc} ->
+                Acc;
             Acc ->
                 Acc
         end,
@@ -275,6 +288,17 @@ compare(['#'], _Words, _) ->
     % Closest possible next entries that we must not miss:
     % * a/+/+/d/# (same topic but a different ID)
     match_full;
+%% Filter search %%
+compare(_Filter, ['#'], _) ->
+    match_full;
+compare([_ | TF], ['+' | TW], Pos) ->
+    case compare(TF, TW, Pos + 1) of
+        lower ->
+            lower;
+        Other ->
+            Other
+    end;
+%% Filter search end %%
 compare(['+' | TF], [HW | TW], Pos) ->
     case compare(TF, TW, Pos + 1) of
         lower ->

+ 1 - 0
apps/emqx/src/emqx_types.erl

@@ -267,6 +267,7 @@
     [
         {node(), topic(), deliver_result()}
         | {share, topic(), deliver_result()}
+        | {emqx_router:external_dest(), topic(), deliver_result()}
         | persisted
     ]
     | disconnect.

+ 94 - 0
apps/emqx_cluster_link/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2024
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2028-04-17
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 10 - 0
apps/emqx_cluster_link/include/emqx_cluster_link.hrl

@@ -0,0 +1,10 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-define(TOPIC_PREFIX, "$LINK/cluster/").
+-define(CTRL_TOPIC_PREFIX, ?TOPIC_PREFIX "ctrl/").
+-define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/").
+-define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/").
+
+-define(DEST(FromClusterName), {external, {link, FromClusterName}}).

+ 24 - 0
apps/emqx_cluster_link/src/emqx_cluster_link.app.src

@@ -0,0 +1,24 @@
+%% -*- mode: erlang -*-
+{application, emqx_cluster_link, [
+    {description, "EMQX Cluster Linking"},
+    % strict semver, bump manually!
+    {vsn, "0.1.0"},
+    {modules, []},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        emqtt,
+        ecpool,
+        emqx,
+        emqx_resource
+    ]},
+    {mod, {emqx_cluster_link_app, []}},
+    {env, []},
+    {licenses, ["Business Source License 1.1"]},
+    {maintainers, ["EMQX Team <contact@emqx.io>"]},
+    {links, [
+        {"Homepage", "https://emqx.io/"},
+        {"Github", "https://github.com/emqx/emqx"}
+    ]}
+]}.

+ 155 - 0
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -0,0 +1,155 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link).
+
+-behaviour(emqx_external_broker).
+
+-export([
+    register_external_broker/0,
+    unregister_external_broker/0,
+    maybe_add_route/1,
+    maybe_delete_route/1,
+    forward/2,
+    should_route_to_external_dests/1
+]).
+
+%% emqx hooks
+-export([
+    put_hook/0,
+    delete_hook/0,
+    on_message_publish/1
+]).
+
+-include("emqx_cluster_link.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%%--------------------------------------------------------------------
+%% emqx_external_broker API
+%%--------------------------------------------------------------------
+
+register_external_broker() ->
+    emqx_external_broker:register_provider(?MODULE).
+
+unregister_external_broker() ->
+    emqx_external_broker:unregister_provider(?MODULE).
+
+maybe_add_route(Topic) ->
+    emqx_cluster_link_coordinator:route_op(<<"add">>, Topic).
+
+maybe_delete_route(_Topic) ->
+    %% Not implemented yet
+    %% emqx_cluster_link_coordinator:route_op(<<"delete">>, Topic).
+    ok.
+
+forward(ExternalDest, Delivery) ->
+    emqx_cluster_link_mqtt:forward(ExternalDest, Delivery).
+
+%% Do not forward any external messages to other links.
+%% Only forward locally originated messages to all the relevant links, i.e. no gossip message forwarding.
+should_route_to_external_dests(#message{extra = #{link_origin := _}}) ->
+    false;
+should_route_to_external_dests(_Msg) ->
+    true.
+
+%%--------------------------------------------------------------------
+%% EMQX Hooks
+%%--------------------------------------------------------------------
+
+%% 'message.publish' hook callback. Messages published under the cluster link
+%% topic prefixes are consumed here; `{stop, Acc}' hands `Acc' back in place of
+%% the original message (an empty list when nothing is to be published, or the
+%% decoded forwarded message). Any other message is passed through with `ok'.
+
+%% Route replication operations sent by the linked cluster.
+on_message_publish(#message{topic = <<?ROUTE_TOPIC_PREFIX, FromCluster/binary>>, payload = Bin}) ->
+    case emqx_cluster_link_mqtt:decode_route_op(Bin) of
+        {add, Topics} when is_list(Topics) ->
+            add_routes(Topics, FromCluster);
+        {add, Topic} ->
+            emqx_router_syncer:push(add, Topic, ?DEST(FromCluster), #{});
+        {delete, _} ->
+            %% Not implemented yet
+            ok;
+        cleanup_routes ->
+            cleanup_routes(FromCluster)
+    end,
+    {stop, []};
+%% A message forwarded by the linked cluster: unwrap it and tag it with its origin.
+on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, FromCluster/binary>>, payload = Bin}) ->
+    case emqx_cluster_link_mqtt:decode_forwarded_msg(Bin) of
+        #message{} = Forwarded ->
+            {stop, with_sender_name(Forwarded, FromCluster)};
+        _DecodeError ->
+            %% Just ignore it. It must be already logged by the decoder
+            {stop, []}
+    end;
+%% Link control messages: link initialization, its acknowledgement, and unlink.
+on_message_publish(
+    #message{topic = <<?CTRL_TOPIC_PREFIX, FromCluster/binary>>, payload = Bin} = CtrlMsg
+) ->
+    case emqx_cluster_link_mqtt:decode_ctrl_msg(Bin, FromCluster) of
+        {init_link, InitRes} ->
+            on_init(InitRes, FromCluster, CtrlMsg);
+        {ack_link, AckRes} ->
+            on_init_ack(AckRes, FromCluster, CtrlMsg);
+        unlink ->
+            %% Stop pushing messages to the cluster that requested unlink,
+            %% It brings the link to a half-closed (unidirectional) state,
+            %% as this cluster may still replicate routes and receive messages from ClusterName.
+            emqx_cluster_link_mqtt:stop_msg_fwd_resource(FromCluster),
+            cleanup_routes(FromCluster)
+    end,
+    {stop, []};
+on_message_publish(_Msg) ->
+    ok.
+
+put_hook() ->
+    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_SYS_MSGS).
+
+delete_hook() ->
+    emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+cleanup_routes(ClusterName) ->
+    emqx_router:cleanup_routes(?DEST(ClusterName)).
+
+%% Find the configured link whose `upstream' equals `ClusterName'.
+%% Returns `{value, LinkConf}' or `false' (the lists:search/2 result).
+lookup_link_conf(ClusterName) ->
+    IsUpstream = fun(#{upstream := Upstream}) -> Upstream =:= ClusterName end,
+    Links = emqx:get_config([cluster, links], []),
+    lists:search(IsUpstream, Links).
+
+on_init(Res, ClusterName, Msg) ->
+    #{
+        'Correlation-Data' := ReqId,
+        'Response-Topic' := RespTopic
+    } = emqx_message:get_header(properties, Msg),
+    case lookup_link_conf(ClusterName) of
+        {value, LinkConf} ->
+            _ = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
+            emqx_cluster_link_mqtt:ack_link(ClusterName, Res, RespTopic, ReqId);
+        false ->
+            ?SLOG(error, #{
+                msg => "init_link_request_from_unknown_cluster",
+                link_name => ClusterName
+            }),
+            %% Cannot ack/reply since we don't know how to reach the link cluster.
+            %% The cluster that tried to initiate this link is expected to eventually fail with a timeout.
+            ok
+    end.
+
+on_init_ack(Res, ClusterName, Msg) ->
+    #{'Correlation-Data' := ReqId} = emqx_message:get_header(properties, Msg),
+    emqx_cluster_link_coordinator:on_link_ack(ClusterName, ReqId, Res).
+
+%% Push an `add' route operation for every topic, with the linked cluster as
+%% the (external) route destination.
+add_routes(Topics, ClusterName) ->
+    PushRoute = fun(Topic) ->
+        emqx_router_syncer:push(add, Topic, ?DEST(ClusterName), #{})
+    end,
+    lists:foreach(PushRoute, Topics).
+
+%% let it crash if extra is not a map,
+%% we don't expect the message to be forwarded from an older EMQX release,
+%% that doesn't set extra = #{} by default.
+with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) ->
+    Msg#message{extra = Extra#{link_origin => ClusterName}}.

+ 61 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_app.erl

@@ -0,0 +1,61 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_app).
+
+-behaviour(application).
+
+-export([start/2, prep_stop/1, stop/1]).
+
+-define(BROKER_MOD, emqx_cluster_link).
+
+%% Application start callback: installs the config handler, activates the
+%% external broker provider, the publish hook and the message forwarding
+%% resources when at least one link is enabled, then starts the supervisor.
+start(_StartType, _StartArgs) ->
+    emqx_cluster_link_config:add_handler(),
+    EnabledLinks = enabled_links(),
+    _ = maybe_activate(EnabledLinks),
+    emqx_cluster_link_sup:start_link(EnabledLinks).
+
+%% Nothing is registered or started when there are no enabled links.
+maybe_activate([_ | _] = EnabledLinks) ->
+    ok = emqx_cluster_link:register_external_broker(),
+    ok = emqx_cluster_link:put_hook(),
+    ok = start_msg_fwd_resources(EnabledLinks);
+maybe_activate(_NoLinks) ->
+    ok.
+
+prep_stop(State) ->
+    emqx_cluster_link_config:remove_handler(),
+    State.
+
+stop(_State) ->
+    _ = emqx_cluster_link:delete_hook(),
+    _ = emqx_cluster_link:unregister_external_broker(),
+    _ = stop_msg_fwd_resources(emqx:get_config([cluster, links], [])),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Links from the [cluster, links] config whose `enable' flag is exactly `true'.
+%% Every link entry must carry an `enable' key.
+enabled_links() ->
+    IsEnabled = fun
+        (#{enable := true}) -> true;
+        (#{enable := _}) -> false
+    end,
+    AllLinks = emqx:get_config([cluster, links], []),
+    lists:filter(IsEnabled, AllLinks).
+
+start_msg_fwd_resources(LinksConf) ->
+    lists:foreach(
+        fun(LinkConf) ->
+            {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf)
+        end,
+        LinksConf
+    ).
+
+stop_msg_fwd_resources(LinksConf) ->
+    lists:foreach(
+        fun(#{upstream := Name}) ->
+            emqx_cluster_link_mqtt:stop_msg_fwd_resource(Name)
+        end,
+        LinksConf
+    ).

+ 162 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_config).
+
+-behaviour(emqx_config_handler).
+
+-include_lib("emqx/include/logger.hrl").
+
+-define(LINKS_PATH, [cluster, links]).
+-define(CERTS_PATH(LinkName), filename:join(["cluster", "links", LinkName])).
+
+-export([
+    add_handler/0,
+    remove_handler/0
+]).
+
+-export([
+    pre_config_update/3,
+    post_config_update/5
+]).
+
+%% @doc Registers this module as the config handler for the `[cluster, links]' path.
+%% Crashes (badmatch) if the registration does not return `ok'.
+add_handler() ->
+    ok = emqx_config_handler:add_handler(?LINKS_PATH, ?MODULE).
+
+%% @doc Removes the config handler for the `[cluster, links]' path.
+%% Crashes (badmatch) if the removal does not return `ok'.
+remove_handler() ->
+    ok = emqx_config_handler:remove_handler(?LINKS_PATH).
+
+%% @doc Config handler callback run before the update is applied.
+%% An unchanged raw config is passed through as is; otherwise the SSL
+%% options of every link are run through convert_certs/1.
+pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) ->
+    case NewRawConf =:= OldRawConf of
+        true ->
+            {ok, NewRawConf};
+        false ->
+            {ok, convert_certs(NewRawConf)}
+    end.
+
+%% @doc Config handler callback run after the update is applied.
+%% Does nothing when the config is unchanged. Otherwise toggles the hook and
+%% external broker registration, diffs the new and old link lists by upstream
+%% name and applies removals, additions and updates (in that order).
+%% Returns `ok' only if every per-link result is `ok' or `{ok, _}'.
+post_config_update(?LINKS_PATH, _Req, Conf, Conf, _AppEnvs) ->
+    ok;
+post_config_update(?LINKS_PATH, _Req, NewConf, OldConf, _AppEnvs) ->
+    ok = maybe_toggle_hook_and_provider(NewConf),
+    Diff = emqx_utils:diff_lists(NewConf, OldConf, fun upstream_name/1),
+    #{removed := Removed, added := Added, changed := Changed} = Diff,
+    RemovedRes = remove_links(Removed),
+    AddedRes = add_links(Added),
+    UpdatedRes = update_links(Changed),
+    case lists:all(fun all_ok/1, [RemovedRes, AddedRes, UpdatedRes]) of
+        true ->
+            ok;
+        false ->
+            {error, #{added => AddedRes, removed => RemovedRes, updated => UpdatedRes}}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Registers the external broker and puts the hook when any link is enabled;
+%% otherwise deletes the hook and unregisters the external broker.
+maybe_toggle_hook_and_provider(LinksConf) ->
+    case is_any_enabled(LinksConf) of
+        false ->
+            _ = emqx_cluster_link:delete_hook(),
+            _ = emqx_cluster_link:unregister_external_broker(),
+            ok;
+        true ->
+            ok = emqx_cluster_link:register_external_broker(),
+            ok = emqx_cluster_link:put_hook()
+    end.
+
+%% True if at least one link config has its `enable' flag set to exactly `true'.
+is_any_enabled(LinksConf) ->
+    IsEnabled = fun(#{enable := Flag}) -> Flag =:= true end,
+    lists:any(IsEnabled, LinksConf).
+
+%% True if every result is either `ok' or an `{ok, _}' tuple
+%% (and therefore also true for an empty list).
+all_ok(Results) ->
+    IsOk = fun(Result) ->
+        case Result of
+            ok -> true;
+            {ok, _} -> true;
+            _ -> false
+        end
+    end,
+    lists:all(IsOk, Results).
+
+%% Adds every given link, returning one result per link.
+add_links(LinksConf) ->
+    [add_link(Link) || Link <- LinksConf].
+
+%% Starts the message forwarding resource and the coordinator of an enabled
+%% link; a link that is not enabled is a no-op.
+%% NOTE: the flag key is `enable', the key the other readers of the link
+%% config use (e.g. is_any_enabled/1).
+add_link(#{enable := true} = LinkConf) ->
+    %% NOTE: this can be started later during init_link phase, but it looks not harmful to start it beforehand...
+    MsgFwdRes = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf),
+    CoordRes = ensure_coordinator(LinkConf),
+    combine_results(CoordRes, MsgFwdRes);
+add_link(_DisabledLinkConf) ->
+    ok.
+
+%% Removes every given link, returning one result per link.
+remove_links(LinksConf) ->
+    [remove_link(Link) || Link <- LinksConf].
+
+%% Stops (and deletes) the coordinator child of the link.
+remove_link(LinkConf) ->
+    emqx_cluster_link_coord_sup:stop_coordinator(LinkConf).
+
+%% Applies the update of every changed link, returning one result per link.
+%% NOTE(review): `emqx_utils:diff_lists/3' is understood to report changed
+%% entries as `{OldConf, NewConf}' pairs - confirm against its spec.
+%% Both that shape and a bare link config map are accepted here.
+update_links(Changes) ->
+    [update_link(changed_link_conf(Change)) || Change <- Changes].
+
+%% Extracts the new link config from a "changed" diff entry.
+changed_link_conf({_OldConf, NewConf}) ->
+    NewConf;
+changed_link_conf(LinkConf) when is_map(LinkConf) ->
+    LinkConf.
+
+%% TODO: do some updates without restart (at least without coordinator restart and re-election)
+%% An enabled link is restarted (removed, then added again); a disabled one
+%% is only removed, and a missing coordinator is not treated as an error.
+update_link(#{enable := true} = LinkConf) ->
+    _ = remove_link(LinkConf),
+    add_link(LinkConf);
+update_link(#{enable := false} = LinkConf) ->
+    case remove_link(LinkConf) of
+        {error, not_found} -> ok;
+        Other -> Other
+    end.
+
+%% Makes sure the coordinator child of the link is running.
+%% An already running child counts as success, an already present (stopped)
+%% child spec is restarted, and any other result of the start attempt
+%% (including the successful `{ok, Pid}') is returned as is.
+ensure_coordinator(LinkConf) ->
+    case emqx_cluster_link_coord_sup:start_coordinator(LinkConf) of
+        {error, {already_started, Pid}} ->
+            {ok, Pid};
+        {error, already_present} ->
+            emqx_cluster_link_coord_sup:restart_coordinator(LinkConf);
+        StartRes ->
+            StartRes
+    end.
+
+%% Folds the coordinator and forwarding resource results into one:
+%% `ok' when both are `ok' or `{ok, _}', otherwise an error carrying both.
+combine_results(CoordRes, MsgFwdRes) ->
+    case all_ok([CoordRes, MsgFwdRes]) of
+        true ->
+            ok;
+        false ->
+            {error, #{coordinator => CoordRes, msg_fwd_resource => MsgFwdRes}}
+    end.
+
+%% Returns the upstream name of a link config, reading either the
+%% atom key `upstream' or the binary key `<<"upstream">>'.
+%% The atom key takes precedence when both are present.
+upstream_name(#{upstream := N}) -> N;
+upstream_name(#{<<"upstream">> := N}) -> N.
+
+%% Runs the SSL options of every link config through do_convert_certs/2,
+%% leaving link configs without SSL options untouched.
+convert_certs(LinksConf) ->
+    lists:map(fun convert_link_certs/1, LinksConf).
+
+%% Converts the SSL options of a single link config (atom or binary `ssl' key).
+convert_link_certs(#{ssl := SSLOpts} = LinkConf) ->
+    Converted = do_convert_certs(upstream_name(LinkConf), SSLOpts),
+    LinkConf#{ssl => Converted};
+convert_link_certs(#{<<"ssl">> := SSLOpts} = LinkConf) ->
+    Converted = do_convert_certs(upstream_name(LinkConf), SSLOpts),
+    LinkConf#{<<"ssl">> => Converted};
+convert_link_certs(LinkConf) ->
+    LinkConf.
+
+%% Passes the SSL options to emqx_tls_lib:ensure_ssl_files/2 under the
+%% link-specific certs path and returns the resulting options
+%% (the original ones when the call yields `undefined').
+%% Logs and throws `{bad_ssl_config, Reason}' on error.
+do_convert_certs(LinkName, SSLOpts) ->
+    CertsDir = ?CERTS_PATH(LinkName),
+    case emqx_tls_lib:ensure_ssl_files(CertsDir, SSLOpts) of
+        {error, Reason} ->
+            LogData = #{
+                msg => "bad_ssl_config",
+                config_path => ?LINKS_PATH,
+                name => LinkName,
+                reason => Reason
+            },
+            ?SLOG(error, LogData),
+            throw({bad_ssl_config, Reason});
+        {ok, undefined} ->
+            SSLOpts;
+        {ok, ConvertedOpts} ->
+            ConvertedOpts
+    end.

+ 57 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_coord_sup.erl

@@ -0,0 +1,57 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_coord_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/1]).
+-export([init/1]).
+
+-export([
+    start_coordinator/1,
+    restart_coordinator/1,
+    stop_coordinator/1
+]).
+
+-define(SERVER, ?MODULE).
+-define(COORDINATOR_MOD, emqx_cluster_link_coordinator).
+
+%% @doc Starts the coordinator supervisor, locally registered under the module name.
+start_link(LinksConf) ->
+    SupName = {local, ?SERVER},
+    supervisor:start_link(SupName, ?SERVER, LinksConf).
+
+%% @doc Supervisor callback: one_for_one supervision (at most 10 restarts
+%% within a period of 5) over one coordinator child per enabled link.
+init(LinksConf) ->
+    ChildSpecs = children(LinksConf),
+    SupFlags = #{strategy => one_for_one, intensity => 10, period => 5},
+    {ok, {SupFlags, ChildSpecs}}.
+
+%% @doc Starts a coordinator child for the link; the child id is the upstream name.
+%% Returns the result of supervisor:start_child/2.
+start_coordinator(#{upstream := Name} = LinkConf) ->
+    supervisor:start_child(?SERVER, worker_spec(Name, LinkConf)).
+
+%% @doc Restarts the coordinator child identified by the link's upstream name.
+%% Returns the result of supervisor:restart_child/2.
+restart_coordinator(#{upstream := Name} = _LinkConf) ->
+    supervisor:restart_child(?SERVER, Name).
+
+%% @doc Terminates the coordinator child of the link and, if that succeeds,
+%% deletes its child spec. A failed termination is returned as is.
+stop_coordinator(#{upstream := Name} = _LinkConf) ->
+    TerminateRes = supervisor:terminate_child(?SERVER, Name),
+    if
+        TerminateRes =:= ok ->
+            supervisor:delete_child(?SERVER, Name);
+        true ->
+            TerminateRes
+    end.
+
+%% Child spec of a permanent coordinator worker started with the link config.
+worker_spec(Id, LinkConf) ->
+    StartMFA = {?COORDINATOR_MOD, start_link, [LinkConf]},
+    #{
+        id => Id,
+        type => worker,
+        restart => permanent,
+        shutdown => 5000,
+        start => StartMFA,
+        modules => [?COORDINATOR_MOD]
+    }.
+
+%% Child specs for the links that have an upstream name and `enable := true';
+%% any other element is skipped.
+children(LinksConf) ->
+    ToSpec = fun
+        (#{upstream := Name, enable := true} = Conf) -> {true, worker_spec(Name, Conf)};
+        (_Other) -> false
+    end,
+    lists:filtermap(ToSpec, LinksConf).

+ 454 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_coordinator.erl

@@ -0,0 +1,454 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc experimental prototype implementation.
+%% The idea is to add a sync point for all cluster route operations,
+%% so that routes can be batched/shrunk (via using emqx_route_syncer) before pushing them to linked clusters.
+%% The expected result is reduced communication between linked clusters:
+%% each node communicates with other clusters through the coordinator.
+%% The drawbacks are numerous though:
+%%   - complexity/leader elections,
+%%   - routes removal seems hard to implement unless remote cluster routes are stored per node,
+%%     in that case a global coordinator per cluster is not needed any more. - TBD
+-module(emqx_cluster_link_coordinator).
+
+-behaviour(gen_statem).
+
+%% API
+-export([
+    route_op/2,
+    on_link_ack/3
+]).
+
+-export([start_link/1]).
+
+%% gen_statem
+-export([
+    callback_mode/0,
+    init/1,
+    terminate/3
+]).
+
+%% gen_statem state functions
+-export([
+    wait_for_coordinator/3,
+    connecting/3,
+    init_linking/3,
+    bootstrapping/3,
+    coordinating/3,
+    following/3
+]).
+
+-export([select_routes/1]).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-define(COORDINATOR(UpstreamName), {?MODULE, UpstreamName}).
+-define(SERVER, ?MODULE).
+-define(WAIT_COORD_RETRY_INTERVAL, 100).
+-define(CONN_RETRY_INTERVAL, 5000).
+-define(INIT_LINK_RESP_TIMEOUT, 15_000).
+-define(INIT_LINK_RETRIES, 5).
+-define(UPSTREAM_DEST, {external, {link, _}}).
+-define(IS_ROUTE_OP(Op), Op =:= <<"add">>; Op =:= <<"delete">>).
+
+%% @doc Starts a coordinator state machine for the given link config.
+%% The process is linked to the caller and not registered locally.
+start_link(Conf) ->
+    gen_statem:start_link(?MODULE, Conf, []).
+
+%% @doc Notifies the coordinator of every configured link about a route
+%% operation, provided the topic intersects one of the link's topic filters.
+%% What gets cast is the intersection, not necessarily the original topic.
+route_op(Op, Topic) ->
+    NotifyLink = fun(#{upstream := UpstreamName, topics := LinkFilters}) ->
+        case topic_intersect_any(Topic, LinkFilters) of
+            false ->
+                ok;
+            TopicOrFilter ->
+                maybe_cast(UpstreamName, {Op, TopicOrFilter})
+        end
+    end,
+    lists:foreach(NotifyLink, emqx:get_config([cluster, links])).
+
+%% @doc Forwards a link acknowledgement (request id and result) as an
+%% `ack_link' cast to the coordinator registered for `ClusterName', if any.
+on_link_ack(ClusterName, ReqId, Res) ->
+    maybe_cast(ClusterName, {ack_link, ClusterName, ReqId, Res}).
+
+%% @doc gen_statem callback: one function per state, with state enter calls enabled.
+callback_mode() ->
+    [state_functions, state_enter].
+
+%% @doc gen_statem callback: traps exits and starts in `wait_for_coordinator'
+%% as a non-coordinator.
+init(LinkConf) ->
+    process_flag(trap_exit, true),
+    %% Syncing `global` first helps to avoid unnecessary global name conflicts
+    %% (and, as a result, coordinator re-election), e.g. when a down node comes back.
+    %% TODO: need to better understand `global` behaviour
+    _ = global:sync(),
+    InitData = #{is_coordinator => false, link_conf => LinkConf},
+    {ok, wait_for_coordinator, InitData}.
+
+%% State `wait_for_coordinator': find out who the coordinator of this link is.
+%% On enter, an immediate state timeout triggers the lookup of the globally
+%% registered coordinator name:
+%%   - no coordinator registered: try to register self; on success move to
+%%     `connecting' as the coordinator, otherwise retry after
+%%     ?WAIT_COORD_RETRY_INTERVAL ms;
+%%   - another process is registered: monitor it and move to `following'.
+%% Route op casts are dropped in this state; anything else goes to handle_event_/4.
+wait_for_coordinator(enter, _OldState, _Data) ->
+    {keep_state_and_data, [{state_timeout, 0, do_wait_for_coordinator}]};
+wait_for_coordinator(_, do_wait_for_coordinator, Data) ->
+    #{link_conf := #{upstream := Name}} = Data,
+    case global:whereis_name(?COORDINATOR(Name)) of
+        undefined ->
+            case register_coordinator(Name) of
+                yes ->
+                    {next_state, connecting, Data#{is_coordinator => true}};
+                no ->
+                    %% TODO: this should not happen forever, if it does, we need to detect it
+                    {keep_state_and_data, [
+                        {state_timeout, ?WAIT_COORD_RETRY_INTERVAL, do_wait_for_coordinator}
+                    ]}
+            end;
+        %% Can be a prev stale pid?
+        %% Let it crash with case_clause if it happens...
+        Pid when is_pid(Pid) andalso Pid =/= self() ->
+            Data1 = Data#{coordinator_mon => erlang:monitor(process, Pid), coordinator_pid => Pid},
+            {next_state, following, Data1}
+    end;
+wait_for_coordinator(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
+    %% Ignore any route op, until bootstrapping is started.
+    %% All ignored route ops are expected to be caught up during the bootstrap.
+    keep_state_and_data;
+wait_for_coordinator(EventType, Event, Data) ->
+    handle_event_(?FUNCTION_NAME, EventType, Event, Data).
+
+%% State `connecting': (re)start the routing connection pool.
+%% On enter, an immediate state timeout fires `reconnect', which calls
+%% ensure_conn_pool/2: on success the machine moves to `init_linking',
+%% on failure `reconnect' is rescheduled after ?CONN_RETRY_INTERVAL ms.
+%% Route op casts are dropped in this state; anything else goes to handle_event_/4.
+connecting(enter, _OldState, _Data) ->
+    {keep_state_and_data, [{state_timeout, 0, reconnect}]};
+connecting(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
+    %% Ignore any route op, until bootstrapping is started.
+    %% All ignored route ops are expected to be caught up during the bootstrap.
+    keep_state_and_data;
+connecting(_EventType, reconnect, Data) ->
+    ensure_conn_pool(init_linking, Data);
+connecting(EventType, Event, Data) ->
+    handle_event_(?FUNCTION_NAME, EventType, Event, Data).
+
+%% State `init_linking': send the init-link request and wait for its ack.
+%% On enter, the retry budget is reset to ?INIT_LINK_RETRIES and an immediate
+%% state timeout fires `init_link'. Each `init_link' event sends a request,
+%% remembers its request id and re-arms the state timeout with
+%% ?INIT_LINK_RESP_TIMEOUT, so an unanswered request is retried until the
+%% budget is exhausted (then an error is logged and the state is kept).
+%% Only an `ack_link' cast carrying the currently stored request id is handled
+%% here: a successful ack moves on to `bootstrapping'; a `bad_upstream_name'
+%% error cancels the state timeout and stays in this state. Any other ack
+%% result crashes with case_clause.
+%% Route op casts are dropped in this state; anything else goes to handle_event_/4.
+init_linking(enter, _OldState, Data) ->
+    {keep_state, Data#{link_retries => ?INIT_LINK_RETRIES}, [{state_timeout, 0, init_link}]};
+init_linking(cast, {ack_link, _ClusterName, ReqId, Res}, #{link_req_id := ReqId} = Data) ->
+    case Res of
+        %% This state machine is not suitable to bootstrap the upstream cluster conditionally,
+        %% since it ignores any route ops received before bootstrapping...
+        {ok, #{proto_ver := _, need_bootstrap := _}} ->
+            {next_state, bootstrapping, maps:without([link_req_id, link_retries], Data)};
+        {error, <<"bad_upstream_name">>} ->
+            %% unrecoverable error that needs a user intervention,
+            %% TODO: maybe need to transition to some error state
+            {keep_state, maps:without([link_req_id, link_retries], Data), [{state_timeout, cancel}]}
+    end;
+init_linking(_, init_link, #{link_conf := #{upstream := Name}, link_retries := Retries} = Data) ->
+    case Retries > 0 of
+        true ->
+            {ReqId, {ok, _}} = emqx_cluster_link_mqtt:init_link(Name),
+            Data1 = Data#{link_req_id => ReqId, link_retries => Retries - 1},
+            {keep_state, Data1, [{state_timeout, ?INIT_LINK_RESP_TIMEOUT, init_link}]};
+        false ->
+            ?SLOG(error, #{
+                msg => "no_link_ack_response_received",
+                link_name => Name
+            }),
+            %% unrecoverable error that needs a user intervention,
+            %% TODO: maybe need to transition to some error state
+            keep_state_and_data
+    end;
+init_linking(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
+    %% Ignore any route op, until bootstrapping is started.
+    %% All ignored route ops are expected to be caught up during the bootstrap.
+    keep_state_and_data;
+init_linking(EventType, Event, Data) ->
+    handle_event_(?FUNCTION_NAME, EventType, Event, Data).
+
+%% State `bootstrapping': publish the currently known matching routes.
+%% On enter, a monitored helper process is spawned to run bootstrap/2 and its
+%% pid and monitor reference are stored in the state data. When that process
+%% goes down with reason `normal' the machine moves to `coordinating'; any
+%% other exit reason crashes this process (badmatch on `normal = Reason').
+%% Route op casts are postponed, so they are re-delivered after the next
+%% state change. Anything else goes to handle_event_/4.
+bootstrapping(enter, _OldState, #{link_conf := LinkConf} = Data) ->
+    #{topics := LinkFilters, upstream := ClusterName} = LinkConf,
+    %% TODO add timeout?
+    {Pid, Ref} = erlang:spawn_monitor(fun() -> bootstrap(ClusterName, LinkFilters) end),
+    {keep_state, Data#{bootstrap_pid => Pid, bootstrap_ref => Ref}};
+bootstrapping(info, {'DOWN', Ref, process, _Pid, Reason}, #{bootstrap_ref := Ref} = Data) ->
+    %% TODO: think about the best way to proceed if bootstrapping failed,
+    %% perhaps just transition back to connecting state?
+    normal = Reason,
+    Data1 = maps:without([bootstrap_ref, bootstrap_pid], Data),
+    {next_state, coordinating, Data1};
+%% Accumulate new route ops, since there is no guarantee
+%% they will be included in the bootstrapped data
+bootstrapping(cast, {Op, _Topic}, _Data) when ?IS_ROUTE_OP(Op) ->
+    {keep_state_and_data, [postpone]};
+bootstrapping(EventType, Event, Data) ->
+    handle_event_(?FUNCTION_NAME, EventType, Event, Data).
+
+%% State `coordinating': replicate route ops to the linked cluster.
+%%   - A route op cast is published asynchronously; an `{error, _}' return
+%%     stops the connection pool and goes back to `connecting'.
+%%   - A `global_name_conflict' message is logged (with the currently
+%%     registered pid, if any); the pool is stopped and the machine steps
+%%     down to `wait_for_coordinator' with `is_coordinator' reset to false.
+%%   - An error `pub_result' is logged; the pool is stopped and the machine
+%%     goes back to `connecting'.
+%% Anything else goes to handle_event_/4.
+coordinating(enter, _OldState, _Data) ->
+    keep_state_and_data;
+coordinating(cast, {Op, Topic}, Data) when ?IS_ROUTE_OP(Op) ->
+    #{link_conf := #{upstream := ClusterName}} = Data,
+    %% TODO: batching
+    case emqx_cluster_link_mqtt:publish_route_op(async, ClusterName, Op, Topic) of
+        {error, _} ->
+            %% Conn pool error, reconnect.
+            {next_state, connecting, stop_conn_pool(Data)};
+        _Ref ->
+            keep_state_and_data
+    end;
+%% TODO: this can also be received in other states, move to generic handler?
+coordinating(info, {global_name_conflict, CoordName}, Data) ->
+    LogData = #{
+        msg => "emqx_cluster_link_coordinator_name_conflict",
+        coordinator_name => CoordName
+    },
+    LogData1 =
+        %% TODO: this can be a previous (self) coordinator?
+        case global:whereis_name(CoordName) of
+            undefined -> LogData;
+            Pid -> LogData#{new_coordinator => Pid, coordinator_node => node(Pid)}
+        end,
+    ?SLOG(warning, LogData1),
+    Data1 = stop_conn_pool(Data),
+    {next_state, wait_for_coordinator, Data1#{is_coordinator => false}};
+%% only errors results are expected
+%% TODO: a single error causes reconnection and re-bootstrapping,
+%% it's worth considering some optimizations.
+coordinating(info, {pub_result, _Ref, {error, Reason}}, #{link_conf := #{upstream := Name}} = Data) ->
+    ?SLOG(error, #{
+        msg => "failed_to_replicate_route_op_to_linked_cluster",
+        link_name => Name,
+        reason => Reason
+    }),
+    %% TODO: check errors, some may be not possible to correct by re-connecting
+    Data1 = stop_conn_pool(Data),
+    {next_state, connecting, Data1};
+coordinating(EventType, Event, Data) ->
+    handle_event_(?FUNCTION_NAME, EventType, Event, Data).
+
+%% State `following': another process is the coordinator and is monitored.
+%% When the monitored coordinator goes down, the monitor data is dropped and
+%% the machine returns to `wait_for_coordinator'.
+%% Anything else (including route op casts) goes to handle_event_/4.
+following(enter, _OldState, _Data) ->
+    keep_state_and_data;
+following(info, {'DOWN', MRef, process, _Pid, _Info}, #{coordinator_mon := MRef} = Data) ->
+    {next_state, wait_for_coordinator, maps:without([coordinator_mon, coordinator_pid], Data)};
+following(EventType, Event, Data) ->
+    handle_event_(?FUNCTION_NAME, EventType, Event, Data).
+
+%% Fallback handler for events the state functions do not handle themselves.
+%% A 'DOWN' message whose reference belongs to a monitored pool worker, while
+%% this process is the coordinator, stops the pool and moves to `connecting'.
+%% Any other 'DOWN' message is ignored.
+%% Every other event is logged as unexpected (including the full state data)
+%% and ignored.
+handle_event_(_State, info, {'DOWN', Ref, process, _Pid, Reason}, Data) ->
+    case Data of
+        #{conn_pool_mons := #{Ref := WorkerName}, is_coordinator := true} ->
+            ?SLOG(warning, #{
+                msg => "cluster_link_route_connection_is_down",
+                reason => Reason,
+                worker => WorkerName
+            }),
+            {next_state, connecting, stop_conn_pool(Data)};
+        _ ->
+            %% Must be a stale 'DOWN' msg (e.g., from the next worker) which is already handled.
+            keep_state_and_data
+    end;
+handle_event_(State, EventType, Event, Data) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_event",
+        event => Event,
+        event_type => EventType,
+        state => State,
+        data => Data
+    }),
+    keep_state_and_data.
+
+%% @doc gen_statem callback. When terminating with reason `shutdown' as the
+%% coordinator, asks the linked cluster to remove the link first; in every
+%% case the connection pool is stopped (if it was started).
+terminate(Reason, _State, #{link_conf := #{upstream := ClusterName}} = Data) ->
+    %% TODO unregister coordinator?
+    IsCoordinator = maps:get(is_coordinator, Data, false),
+    case Reason of
+        shutdown when IsCoordinator ->
+            %% must be sync, since we are going to stop the pool
+            %% NOTE: there is no guarantee that unlink op will arrive the last one
+            %% (since there may be other route op sent over another pool worker)
+            %% and clear everything, but it must be good enough to GC most of the routes.
+            _ = emqx_cluster_link_mqtt:remove_link(ClusterName);
+        _ ->
+            ok
+    end,
+    _ = stop_conn_pool(Data),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Returns the intersection of `Topic' with the first link filter it
+%% intersects, or `false' when it intersects none of them.
+topic_intersect_any(_Topic, []) ->
+    false;
+topic_intersect_any(Topic, [LinkFilter | Rest]) ->
+    case emqx_topic:intersection(Topic, LinkFilter) of
+        false ->
+            topic_intersect_any(Topic, Rest);
+        Intersection ->
+            Intersection
+    end.
+
+%% Selects the local routes matching the link filters and publishes them
+%% synchronously to the linked cluster; crashes unless the result is `{ok, _}'.
+bootstrap(ClusterName, LinkFilters) ->
+    %% TODO: do this in chunks
+    RouteTopics = select_routes(LinkFilters),
+    {ok, _} = emqx_cluster_link_mqtt:publish_routes(sync, ClusterName, RouteTopics).
+
+%% Selects the topics/filters to replicate for the given link filters.
+%%
+%% TODO: if a local route matches a link filter exactly,
+%% is it enough to select only this matching filter itself and skip any other routes?
+%% E.g., local routes: "t/global/#", "t/global/1/+", cluster link topics = ["t/global/#"],
+%% it's enough to replicate only "t/global/#" to the linked cluster.
+%% What to do when the "t/global/#" subscriber unsubscribes
+%% and we start to get forwarded messages (e.g. "t/global/2/3") matching no subscribers?
+%% How can we efficiently replace the "t/global/#" route with "t/global/1/+"
+%% (intersection of "t/global/#" and "t/global/1/+")?
+%% So maybe better not to do it at all and replicate both "t/global/1/+" and "t/global/#" ?
+select_routes(LinkFilters) ->
+    {WildcardFilters, PlainTopics} = lists:partition(fun emqx_topic:wildcard/1, LinkFilters),
+    TopicRoutes = select_routes_by_topics(PlainTopics),
+    WildcardRoutes = intersecting_routes(WildcardFilters),
+    Combined = TopicRoutes ++ WildcardRoutes,
+    case emqx_router:get_schema_vsn() of
+        %% v2 stores filters (wildcard subscription routes) in a separate index,
+        %% so WildcardRoutes contains only non-wildcard routes matching wildcard link filters.
+        %% Thus, wildcard routes need to be selected additionally.
+        v2 -> intersecting_routes_v2(WildcardFilters) ++ Combined;
+        v1 -> Combined
+    end.
+
+%% Keeps the (non-wildcard) link topics that have at least one matching
+%% route which does not point to an upstream link.
+%% The actual routes do not matter as long as something matches: it is the
+%% link topic itself that gets replicated to the linked cluster.
+select_routes_by_topics(Topics) ->
+    HasLocalRoute = fun(Topic) ->
+        filter_out_upstream_routes(emqx_router:match_routes(Topic)) =/= []
+    end,
+    lists:filter(HasLocalRoute, Topics).
+
+%% Drops the routes whose destination is an external link (?UPSTREAM_DEST).
+filter_out_upstream_routes(Routes) ->
+    IsLocal = fun(Route) ->
+        case Route of
+            #route{dest = ?UPSTREAM_DEST} -> false;
+            _ -> true
+        end
+    end,
+    lists:filter(IsLocal, Routes).
+
+%% Folds over the whole route table and collects, without duplicates, the
+%% intersections of route topics with the given wildcard link filters.
+%% Routes pointing to an upstream link are skipped.
+%% This can only be done as a linear search over all routes.
+intersecting_routes([]) ->
+    [];
+intersecting_routes(Wildcards) ->
+    Collect = fun
+        (#route{dest = ?UPSTREAM_DEST}, Seen) ->
+            Seen;
+        (#route{topic = RouteTopic}, Seen) ->
+            %% TODO: probably nice to validate cluster link topic filters
+            %% to have no intersections between each other?
+            case topic_intersect_any(RouteTopic, Wildcards) of
+                false -> Seen;
+                Intersection -> maps:put(Intersection, undefined, Seen)
+            end
+    end,
+    maps:keys(ets:foldl(Collect, #{}, ?ROUTE_TAB)).
+
+%% For every wildcard link filter, accumulates its intersections with the
+%% route filters it matches in the filters index.
+intersecting_routes_v2([]) ->
+    [];
+intersecting_routes_v2(Wildcards) ->
+    AddIntersections = fun(Wildcard, Acc) ->
+        all_intersections(Wildcard, matched_filters_v2(Wildcard), Acc)
+    end,
+    lists:foldl(AddIntersections, [], Wildcards).
+
+%% Returns the distinct topics of the filter index entries matched by
+%% `Wildcard', skipping entries whose id is an upstream link destination.
+matched_filters_v2(Wildcard) ->
+    Matches = emqx_topic_index:matches_filter(Wildcard, ?ROUTE_TAB_FILTERS, []),
+    Collect = fun(Match, Seen) ->
+        case emqx_topic_index:get_id(Match) of
+            ?UPSTREAM_DEST ->
+                Seen;
+            _ ->
+                maps:put(emqx_topic_index:get_topic(Match), undefined, Seen)
+        end
+    end,
+    maps:keys(lists:foldl(Collect, #{}, Matches)).
+
+%% Adds the intersection of `Wildcard' with each of the given filters to the
+%% accumulator and returns the result sorted and de-duplicated.
+all_intersections(_Wildcard, [], Acc) ->
+    lists:usort(Acc);
+all_intersections(Wildcard, [Filter | Rest], Acc) ->
+    case emqx_topic:intersection(Wildcard, Filter) of
+        false ->
+            all_intersections(Wildcard, Rest, Acc);
+        Intersection ->
+            all_intersections(Wildcard, Rest, [Intersection | Acc])
+    end.
+
+%% Casts `Msg' to the globally registered coordinator of the upstream, if any.
+maybe_cast(UpstreamName, Msg) ->
+    case global:whereis_name(?COORDINATOR(UpstreamName)) of
+        undefined ->
+            %% Ignore and rely on coordinator bootstrapping once it's elected
+            ok;
+        CoordPid when is_pid(CoordPid) ->
+            gen_statem:cast(CoordPid, Msg)
+    end.
+
+%% Tries to register the calling process as the global coordinator of the
+%% upstream. Only attempted when the node role is `core'; otherwise `no'.
+%% Name clashes are resolved with global:random_notify_name/3.
+register_coordinator(UpstreamName) ->
+    case mria_config:role() =:= core of
+        true ->
+            CoordName = ?COORDINATOR(UpstreamName),
+            global:register_name(CoordName, self(), fun global:random_notify_name/3);
+        false ->
+            no
+    end.
+
+%% Helper of the `connecting' state: starts the connection pool and stores
+%% the outcome under `conn_pool'. On success the pool workers are monitored
+%% and the machine moves to `NextState'; otherwise a `reconnect' state
+%% timeout is scheduled after ?CONN_RETRY_INTERVAL ms.
+ensure_conn_pool(NextState, #{link_conf := LinkConf} = Data) ->
+    case start_conn_pool(LinkConf) of
+        {ok, _} = Started ->
+            Monitors = mon_pool_workers(LinkConf),
+            {next_state, NextState, Data#{conn_pool => Started, conn_pool_mons => Monitors}};
+        Failed ->
+            RetryAction = {state_timeout, ?CONN_RETRY_INTERVAL, reconnect},
+            {keep_state, Data#{conn_pool => Failed}, [RetryAction]}
+    end.
+
+%% Starts the routing pool of the link, logging an error on failure.
+%% The start result is returned unchanged.
+start_conn_pool(LinkConf) ->
+    StartRes = emqx_cluster_link_mqtt:start_routing_pool(LinkConf),
+    case StartRes of
+        {ok, _Pid} ->
+            StartRes;
+        {error, Reason} ->
+            #{upstream := Name} = LinkConf,
+            ?SLOG(error, #{
+                msg => "failed_to_connect_to_linked_cluster",
+                cluster_name => Name,
+                reason => Reason
+            }),
+            StartRes
+    end.
+
+%% Stops the routing pool if it is recorded as started (`conn_pool := {ok, _}'):
+%% worker monitors are removed and `conn_pool' is replaced with
+%% `{stopped, StopResult}'. Otherwise the data is returned unchanged.
+stop_conn_pool(#{link_conf := #{upstream := Name}, conn_pool := {ok, _}} = Data) ->
+    Unmonitored = maybe_unmointor_workers(Data),
+    StopRes = emqx_cluster_link_mqtt:stop_routing_pool(Name),
+    Unmonitored#{conn_pool => {stopped, StopRes}};
+stop_conn_pool(#{link_conf := #{upstream := _Name}} = Data) ->
+    Data.
+
+%% Demonitors every pool worker recorded under `conn_pool_mons' and removes
+%% that key; data without the key is returned unchanged.
+maybe_unmointor_workers(#{conn_pool_mons := MonitorsMap} = Data) ->
+    MonitorRefs = maps:keys(MonitorsMap),
+    lists:foreach(fun(Mref) -> erlang:demonitor(Mref) end, MonitorRefs),
+    maps:remove(conn_pool_mons, Data);
+maybe_unmointor_workers(Data) ->
+    Data.
+
+%% Monitors every routing pool worker of the link and returns a map of
+%% monitor reference => worker name.
+mon_pool_workers(LinkConf) ->
+    Workers = emqx_cluster_link_mqtt:routing_pool_workers(LinkConf),
+    Monitored = [
+        {erlang:monitor(process, WorkerPid), WorkerName}
+     || {WorkerName, WorkerPid} <- Workers
+    ],
+    maps:from_list(Monitored).

+ 547 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -0,0 +1,547 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_mqtt).
+
+-include("emqx_cluster_link.hrl").
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%-include_lib("emqtt/include/emqtt.hrl").
+
+-behaviour(emqx_resource).
+-behaviour(ecpool_worker).
+
+%% ecpool
+-export([connect/1]).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_query_async/4,
+    on_get_status/2
+]).
+
+-export([
+    ensure_msg_fwd_resource/1,
+    stop_msg_fwd_resource/1,
+    start_routing_pool/1,
+    stop_routing_pool/1,
+    routing_pool_workers/1,
+    init_link/1,
+    ack_link/4,
+    remove_link/1,
+    publish_route_op/4,
+    publish_routes/3,
+    cleanup_routes/1,
+    decode_ctrl_msg/2,
+    decode_route_op/1,
+    decode_forwarded_msg/1
+]).
+
+-export([
+    forward/2
+]).
+
+-define(ROUTE_CLIENTID_SUFFIX, ":route:").
+-define(MSG_CLIENTID_SUFFIX, ":msg:").
+-define(CLIENTID(Base, Suffix), emqx_bridge_mqtt_lib:clientid_base([Base, Suffix])).
+
+-define(MQTT_HOST_OPTS, #{default_port => 1883}).
+-define(MY_CLUSTER_NAME, atom_to_binary(emqx_config:get([cluster, name]))).
+
+-define(ROUTE_TOPIC, <<?ROUTE_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
+-define(MSG_FWD_TOPIC, <<?MSG_TOPIC_PREFIX, (?MY_CLUSTER_NAME)/binary>>).
+-define(CTRL_TOPIC(ClusterName), <<?CTRL_TOPIC_PREFIX, (ClusterName)/binary>>).
+
+%% ecpool and emqx_resource names
+-define(ROUTE_POOL_PREFIX, "emqx_cluster_link_mqtt:route:").
+-define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:").
+-define(RES_NAME(Prefix, ClusterName), <<Prefix, ClusterName/binary>>).
+-define(ROUTE_POOL_NAME(ClusterName), ?RES_NAME(?ROUTE_POOL_PREFIX, ClusterName)).
+-define(MSG_RES_ID(ClusterName), ?RES_NAME(?MSG_POOL_PREFIX, ClusterName)).
+-define(HEALTH_CHECK_TIMEOUT, 1000).
+-define(RES_GROUP, <<"emqx_cluster_link">>).
+-define(DEFAULT_POOL_KEY, <<"default">>).
+
+%% Protocol
+-define(PROTO_VER, <<"1.0">>).
+-define(INIT_LINK_OP, <<"init_link">>).
+-define(ACK_LINK_OP, <<"ack_link">>).
+-define(UNLINK_OP, <<"unlink">>).
+-define(BATCH_ROUTES_OP, <<"add_routes">>).
+-define(CLEANUP_ROUTES_OP, <<"cleanup_routes">>).
+%% It's worth optimizing non-batch op payload size,
+%% thus it's encoded as a plain binary
+-define(TOPIC_WITH_OP(Op, Topic), <<Op/binary, "_", Topic/binary>>).
+-define(DECODE(Payload), erlang:binary_to_term(Payload, [safe])).
+-define(ENCODE(Payload), erlang:term_to_binary(Payload)).
+
+-define(PUB_TIMEOUT, 10_000).
+
+%% @doc Creates the local message forwarding resource of a link.
+%% The resource id is derived from the upstream name, and the resource
+%% worker pool size is taken from the link's `pool_size'.
+ensure_msg_fwd_resource(#{upstream := Name, pool_size := PoolSize} = ClusterConf) ->
+    ResOpts = #{
+        query_mode => async,
+        start_after_created => true,
+        start_timeout => 5000,
+        health_check_interval => 5000,
+        %% TODO: configure res_buf_worker pool separately?
+        worker_pool_size => PoolSize
+    },
+    ResId = ?MSG_RES_ID(Name),
+    emqx_resource:create_local(ResId, ?RES_GROUP, ?MODULE, ClusterConf, ResOpts).
+
+%% @doc Stops the message forwarding resource of the given cluster
+%% (the resource id is derived from the cluster name).
+stop_msg_fwd_resource(ClusterName) ->
+    emqx_resource:stop(?MSG_RES_ID(ClusterName)).
+
+%%--------------------------------------------------------------------
+%% emqx_resource callbacks (message forwarding)
+%%--------------------------------------------------------------------
+
+%% emqx_resource callback: the resource reports `async_if_possible' as its callback mode.
+callback_mode() -> async_if_possible.
+
+%% emqx_resource callback: starts a hash-type connection pool named after the
+%% resource id. The pool name is recorded as an allocated resource before the
+%% pool is started. On success, returns the state with the pool name and the
+%% message forwarding topic.
+on_start(ResourceId, #{pool_size := PoolSize} = ClusterConf) ->
+    PoolName = ResourceId,
+    ClientOpts = emqtt_client_opts(?MSG_CLIENTID_SUFFIX, ClusterConf),
+    PoolOpts = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {pool_type, hash},
+        {client_opts, ClientOpts}
+    ],
+    ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
+    case emqx_resource_pool:start(PoolName, ?MODULE, PoolOpts) of
+        {error, {start_pool_failed, _, Reason}} ->
+            {error, Reason};
+        ok ->
+            {ok, #{pool_name => PoolName, topic => ?MSG_FWD_TOPIC}}
+    end.
+
+%% emqx_resource callback: stops the pool recorded in the allocated resources.
+on_stop(ResourceId, _State) ->
+    Allocated = emqx_resource:get_allocated_resources(ResourceId),
+    #{pool_name := PoolName} = Allocated,
+    emqx_resource_pool:stop(PoolName).
+
+%% emqx_resource callback for synchronous queries.
+%% A `#message{}' is encoded as a whole and published to the link topic with
+%% the message's own QoS. A `{Topic, Props, Payload, QoS}' tuple is published
+%% to `Topic' with an encoded payload.
+%% In both cases the pool worker is picked by topic and the publish result is
+%% normalized by handle_send_result/1.
+on_query(_ResourceId, FwdMsg, #{pool_name := PoolName, topic := LinkTopic} = _State) when
+    is_record(FwdMsg, message)
+->
+    #message{topic = Topic, qos = QoS} = FwdMsg,
+    Publish = fun(ConnPid) ->
+        emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS)
+    end,
+    SendRes = ecpool:pick_and_do({PoolName, Topic}, Publish, no_handover),
+    handle_send_result(SendRes);
+on_query(_ResourceId, {Topic, Props, Payload, QoS}, #{pool_name := PoolName} = _State) ->
+    Publish = fun(ConnPid) ->
+        emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}])
+    end,
+    SendRes = ecpool:pick_and_do({PoolName, Topic}, Publish, no_handover),
+    handle_send_result(SendRes).
+
+%% emqx_resource callback for asynchronous queries: encodes the whole message
+%% and publishes it asynchronously to the link topic with the message's own
+%% QoS. The pool worker is picked by the message topic; the caller's callback
+%% is invoked through on_async_result/2.
+on_query_async(
+    _ResourceId, FwdMsg, CallbackIn, #{pool_name := PoolName, topic := LinkTopic} = _State
+) ->
+    ResultCallback = {fun on_async_result/2, [CallbackIn]},
+    #message{topic = Topic, qos = QoS} = FwdMsg,
+    PublishAsync = fun(ConnPid) ->
+        %% #delivery{} record has no valuable data for a remote link...
+        Encoded = ?ENCODE(FwdMsg),
+        %% TODO: check override QOS requirements (if any)
+        emqtt:publish_async(ConnPid, LinkTopic, Encoded, QoS, ResultCallback)
+    end,
+    %% TODO check message ordering, pick by topic,client pair?
+    ecpool:pick_and_do({PoolName, Topic}, PublishAsync, no_handover).
+
+%% copied from emqx_bridge_mqtt_connector
+
+%% Normalizes an async publish result with handle_send_result/1 and passes
+%% it to the given callback.
+on_async_result(Callback, Result) ->
+    apply_callback_function(Callback, handle_send_result(Result)).
+
+%% Invokes a callback with `Result' appended as the last argument.
+%% Supported shapes: a fun, `{Fun, Args}' and `{Module, Function, Args}'.
+apply_callback_function(Fun, Result) when is_function(Fun) ->
+    erlang:apply(Fun, [Result]);
+apply_callback_function({Fun, Args}, Result) when is_function(Fun), is_list(Args) ->
+    erlang:apply(Fun, Args ++ [Result]);
+apply_callback_function({Mod, Fun, Args}, Result) when
+    is_atom(Mod), is_atom(Fun), is_list(Args)
+->
+    erlang:apply(Mod, Fun, Args ++ [Result]).
+
+%% Normalizes a publish result: `ok' and replies with a success or
+%% "no matching subscribers" reason code become `ok'; any other reply or
+%% error is returned as a classified `{error, _}'.
+handle_send_result(ok) ->
+    ok;
+handle_send_result({ok, #{reason_code := RC}}) when
+    RC =:= ?RC_SUCCESS; RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
+->
+    ok;
+handle_send_result({ok, Reply}) ->
+    {error, classify_reply(Reply)};
+handle_send_result({error, Reason}) ->
+    {error, classify_error(Reason)}.
+
+%% A reply carrying a reason code is classified as an unrecoverable error.
+classify_reply(Reply = #{reason_code := _}) ->
+    {unrecoverable_error, Reply}.
+
+%% Classifies a publish error reason. Disconnect and shutdown reasons are
+%% recoverable (`ecpool_empty' is reported as `disconnected'); everything
+%% else is unrecoverable.
+classify_error(ecpool_empty) ->
+    {recoverable_error, disconnected};
+classify_error(Reason) when Reason =:= disconnected; Reason =:= shutdown ->
+    {recoverable_error, Reason};
+classify_error({disconnected, _RC, _} = Reason) ->
+    {recoverable_error, Reason};
+classify_error({shutdown, _} = Reason) ->
+    {recoverable_error, Reason};
+classify_error(Reason) ->
+    {unrecoverable_error, Reason}.
+
+%% copied from emqx_bridge_mqtt_connector
+%% Health check: asks every worker of the pool for its status through
+%% emqx_utils:pmap/3 and merges the answers with combine_status/1.
+%% An `exit:timeout' raised by the check is reported as `connecting'.
+on_get_status(_ResourceId, #{pool_name := Pool} = _State) ->
+    WorkerPids = [Pid || {_WorkerName, Pid} <- ecpool:workers(Pool)],
+    try emqx_utils:pmap(fun get_status/1, WorkerPids, ?HEALTH_CHECK_TIMEOUT) of
+        WorkerStatuses ->
+            combine_status(WorkerStatuses)
+    catch
+        exit:timeout ->
+            connecting
+    end.
+
+%% Status of a single pool worker: `disconnected' when the worker has no
+%% client, otherwise whatever status/1 reports for that client.
+get_status(WorkerPid) ->
+    case ecpool_worker:client(WorkerPid) of
+        {error, _} ->
+            disconnected;
+        {ok, ClientPid} ->
+            status(ClientPid)
+    end.
+
+%% Derives the connection status of an emqtt client from its info proplist:
+%% `connected' when a socket is present, `connecting' when it is `undefined',
+%% and `disconnected' when the client process is gone (exit `{noproc, _}').
+status(ClientPid) ->
+    try emqtt:info(ClientPid) of
+        Info ->
+            case proplists:get_value(socket, Info) of
+                undefined ->
+                    connecting;
+                _Socket ->
+                    connected
+            end
+    catch
+        exit:{noproc, _} ->
+            disconnected
+    end.
+
+%% Reduces the per-worker statuses to a single one; an empty list yields
+%% `disconnected'.
+%% NOTE
+%% Natural (term) order of statuses: [connected, connecting, disconnected]
+%% * `disconnected` wins over any other status
+%% * `connecting` wins over `connected`
+%% so the combined status is simply the greatest element.
+combine_status([]) ->
+    disconnected;
+combine_status(Statuses) ->
+    lists:max(Statuses).
+
+%%--------------------------------------------------------------------
+%% ecpool
+%%--------------------------------------------------------------------
+
+%% Starts and connects one emqtt client for a pool worker.
+%% `Options' is a proplist carrying `ecpool_worker_id' and `client_opts'; the
+%% configured client id is combined with the worker id through
+%% emqx_bridge_mqtt_lib:bytes23/2 before the client is started.
+%% Returns `{ok, Pid}' once connected, otherwise the error returned by emqtt.
+%% A failure to start the client is logged with the redacted client options.
+connect(Options) ->
+    WorkerId = proplists:get_value(ecpool_worker_id, Options),
+    #{clientid := BaseClientId} = BaseOpts = proplists:get_value(client_opts, Options),
+    WorkerClientId = emqx_bridge_mqtt_lib:bytes23([BaseClientId], WorkerId),
+    case emqtt:start_link(BaseOpts#{clientid => WorkerClientId}) of
+        {error, Reason} = StartError ->
+            ?SLOG(error, #{
+                msg => "client_start_failed",
+                config => emqx_utils:redact(BaseOpts),
+                reason => Reason
+            }),
+            StartError;
+        {ok, ClientPid} ->
+            case emqtt:connect(ClientPid) of
+                {ok, _Props} ->
+                    {ok, ClientPid};
+                ConnectError ->
+                    ConnectError
+            end
+    end.
+
+%%--------------------------------------------------------------------
+%% Routing
+%%--------------------------------------------------------------------
+
+%% Lists the ecpool workers of the routing pool that belongs to the link
+%% identified by the `upstream' name in the given link config.
+routing_pool_workers(#{upstream := Upstream} = _ClusterConf) ->
+    RoutePool = ?ROUTE_POOL_NAME(Upstream),
+    ecpool:workers(RoutePool).
+
+%% Starts the routing client pool for the link described by `ClusterConf';
+%% the pool is named after the link's `upstream' name.
+start_routing_pool(#{upstream := Upstream} = ClusterConf) ->
+    RoutePool = ?ROUTE_POOL_NAME(Upstream),
+    start_pool(RoutePool, ?ROUTE_CLIENTID_SUFFIX, ClusterConf).
+
+%% Stops the routing client pool of the given linked cluster.
+stop_routing_pool(ClusterName) ->
+    RoutePool = ?ROUTE_POOL_NAME(ClusterName),
+    ecpool:stop_sup_pool(RoutePool).
+
+%% Publishes (synchronously, QoS 1) the link initialization request for
+%% `ClusterName' on the control topic of the local cluster.
+%% The request carries a freshly generated correlation id and names the control
+%% topic of `ClusterName' as the response topic.
+%% Returns `{ReqId, PublishResult}'.
+init_link(ClusterName) ->
+    InitReq = #{
+        <<"op">> => ?INIT_LINK_OP,
+        <<"proto_ver">> => ?PROTO_VER,
+        <<"upstream">> => ClusterName,
+        %% TODO: may no need to reserve it as it is a map?
+        <<"extra">> => #{}
+    },
+    CorrelationId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
+    PubProps = #{
+        'Response-Topic' => ?CTRL_TOPIC(ClusterName),
+        'Correlation-Data' => CorrelationId
+    },
+    CtrlTopic = ?CTRL_TOPIC(?MY_CLUSTER_NAME),
+    PubResult = publish(
+        sync, ClusterName, ?DEFAULT_POOL_KEY, InitReq, PubProps, CtrlTopic, ?QOS_1
+    ),
+    {CorrelationId, PubResult}.
+
+%% Sends the acknowledgement of a link initialization request back to the
+%% linked cluster.
+%%   * `Result'    - `{ok, _}' or `{error, Reason}', the outcome of the init request;
+%%   * `RespTopic' - topic the acknowledgement is published to;
+%%   * `ReqId'     - correlation data echoed back to the requester.
+%% The acknowledgement is sent with a synchronous query to the message
+%% forwarding resource of `ClusterName'; the query result is returned.
+ack_link(ClusterName, Result, RespTopic, ReqId) ->
+    Payload = #{
+        <<"op">> => ?ACK_LINK_OP,
+        %% The links may compare and downgrade/adjust protocol in future
+        <<"proto_ver">> => ?PROTO_VER,
+        %% May be used in future to avoid re-bootstrapping all the routes,
+        %% for example, if the connection was interrupted for a while but the cluster
+        %% was healthy and didn't lose any routes. In that case, retrying lost route
+        %% updates would be sufficient.
+        %% For now, it's always true for simplicity reasons.
+        <<"need_bootstrap">> => true,
+        <<"extra">> => #{}
+    },
+    Payload1 =
+        case Result of
+            {ok, _} ->
+                Payload#{<<"result">> => <<"ok">>};
+            {error, Reason} ->
+                %% The key must be a binary like every other payload key:
+                %% decode_ctrl_msg1/2 matches the error ack on <<"reason">>.
+                Payload#{<<"result">> => <<"error">>, <<"reason">> => Reason}
+        end,
+    Props = #{'Correlation-Data' => ReqId},
+    Query = {RespTopic, Props, Payload1, ?QOS_1},
+    %% Using msg forwarding resource to send the response back.
+    %% TODO: maybe async query?
+    emqx_resource:query(?MSG_RES_ID(ClusterName), Query, #{
+        query_mode => simple_sync, pick_key => RespTopic
+    }).
+
+%% Publishes (synchronously, QoS 0) the unlink operation for `ClusterName' on
+%% the control topic of the local cluster.
+remove_link(ClusterName) ->
+    CtrlTopic = ?CTRL_TOPIC(?MY_CLUSTER_NAME),
+    UnlinkMsg = #{<<"op">> => ?UNLINK_OP},
+    publish(sync, ClusterName, ?DEFAULT_POOL_KEY, UnlinkMsg, #{}, CtrlTopic, ?QOS_0).
+
+%% Publishes a batch of route topics using the default pool key.
+publish_routes(QueryType, ClusterName, Topics) ->
+    %% Picks the same pool worker consistently.
+    %% Although, as writes are idempotent we can pick it randomly - TBD.
+    PoolKey = ?DEFAULT_POOL_KEY,
+    publish_routes(QueryType, ClusterName, PoolKey, Topics).
+
+%% Publishes a batch-routes operation carrying `Topics'; the pool worker is
+%% selected by `PoolKey'.
+publish_routes(QueryType, ClusterName, PoolKey, Topics) ->
+    BatchMsg = #{
+        <<"op">> => ?BATCH_ROUTES_OP,
+        <<"topics">> => Topics
+    },
+    publish(QueryType, ClusterName, PoolKey, BatchMsg).
+
+%% Publishes (synchronously, QoS 0) the cleanup-routes operation to the route
+%% topic through the routing pool of `ClusterName'.
+cleanup_routes(ClusterName) ->
+    CleanupMsg = #{<<"op">> => ?CLEANUP_ROUTES_OP},
+    NoProps = #{},
+    publish(sync, ClusterName, ?DEFAULT_POOL_KEY, CleanupMsg, NoProps, ?ROUTE_TOPIC, ?QOS_0).
+
+%% Publishes a single route operation (`<<"add">>' or `<<"delete">>') for
+%% `Topic'. The topic itself is used as the pool key.
+publish_route_op(QueryType, ClusterName, Op, Topic) when
+    Op =:= <<"add">> orelse Op =:= <<"delete">>
+->
+    RouteOp = ?TOPIC_WITH_OP(Op, Topic),
+    publish(QueryType, ClusterName, Topic, RouteOp).
+
+%% Same as publish/5 with no MQTT properties.
+publish(QueryType, ClusterName, PoolKey, Payload) ->
+    NoProps = #{},
+    publish(QueryType, ClusterName, PoolKey, Payload, NoProps).
+
+%% Publishes `Payload' to the route topic with QoS 1.
+publish(QueryType, ClusterName, PoolKey, Payload, Props) ->
+    %% Deletes are not implemented for now, writes are idempotent, so QOS_1 is fine.
+    Topic = ?ROUTE_TOPIC,
+    publish(QueryType, ClusterName, PoolKey, Payload, Props, Topic, ?QOS_1).
+
+%% Publishes the encoded `Payload' to `Topic' through a routing pool worker of
+%% `ClusterName' picked by `PoolKey'.
+%%   * `async' - returns a fresh reference; publish_result/3 is registered as the
+%%     completion callback together with the calling pid and that reference.
+%%   * `sync'  - returns the result of emqtt:publish/5.
+publish(async, ClusterName, PoolKey, Payload, Props, Topic, QoS) ->
+    PublishAsync = fun(ConnPid) ->
+        Ref = erlang:make_ref(),
+        ResultCb = {fun publish_result/3, [self(), Ref]},
+        emqtt:publish_async(
+            ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}], ?PUB_TIMEOUT, ResultCb
+        ),
+        Ref
+    end,
+    ecpool:pick_and_do({?ROUTE_POOL_NAME(ClusterName), PoolKey}, PublishAsync, no_handover);
+publish(sync, ClusterName, PoolKey, Payload, Props, Topic, QoS) ->
+    PublishSync = fun(ConnPid) ->
+        emqtt:publish(ConnPid, Topic, Props, ?ENCODE(Payload), [{qos, QoS}])
+    end,
+    ecpool:pick_and_do({?ROUTE_POOL_NAME(ClusterName), PoolKey}, PublishSync, no_handover).
+
+%% Completion callback of an async route publish. Only failures are reported:
+%% the caller receives `{pub_result, Ref, {error, _}}'; successes are dropped.
+publish_result(Caller, Ref, Result) ->
+    case handle_send_result(Result) of
+        ok ->
+            %% avoid extra message passing, we only care about errors for now
+            ok;
+        Failure ->
+            Caller ! {pub_result, Ref, Failure}
+    end.
+
+%%--------------------------------------------------------------------
+%% Protocol
+%%--------------------------------------------------------------------
+
+%% Decodes a control message payload received from the linked cluster
+%% `ClusterName' and interprets it with decode_ctrl_msg1/2.
+decode_ctrl_msg(Payload, ClusterName) ->
+    CtrlMsg = ?DECODE(Payload),
+    decode_ctrl_msg1(CtrlMsg, ClusterName).
+
+%% Interprets a decoded control message. Possible results:
+%%   * `{init_link, {ok, #{proto_ver := _}} | {error, <<"bad_upstream_name">>}}'
+%%   * `{ack_link, {ok, #{proto_ver := _, need_bootstrap := _}} | {error, Reason}}'
+%%   * `unlink'
+decode_ctrl_msg1(
+    #{
+        <<"op">> := ?INIT_LINK_OP,
+        <<"proto_ver">> := RemoteProtoVer,
+        <<"upstream">> := UpstreamName
+    },
+    ClusterName
+) ->
+    DecodedVer = decode_proto_ver(RemoteProtoVer, ClusterName),
+    %% UpstreamName is the name the remote linked cluster refers to this cluster,
+    %% so it must equal to the local cluster name, more clear naming is desired...
+    LocalName = ?MY_CLUSTER_NAME,
+    case UpstreamName =:= LocalName of
+        true ->
+            {init_link, {ok, #{proto_ver => DecodedVer}}};
+        false ->
+            ?SLOG(error, #{
+                msg => "misconfigured_cluster_link_name",
+                %% How this cluster names itself
+                local_name => LocalName,
+                %% How the remote cluster names itself
+                link_name => ClusterName,
+                %% How the remote cluster names this local cluster
+                upstream_name => UpstreamName
+            }),
+            {init_link, {error, <<"bad_upstream_name">>}}
+    end;
+decode_ctrl_msg1(
+    #{
+        <<"op">> := ?ACK_LINK_OP,
+        <<"result">> := <<"ok">>,
+        <<"proto_ver">> := RemoteProtoVer,
+        <<"need_bootstrap">> := NeedBootstrap
+    },
+    ClusterName
+) ->
+    DecodedVer = decode_proto_ver(RemoteProtoVer, ClusterName),
+    AckInfo = #{proto_ver => DecodedVer, need_bootstrap => NeedBootstrap},
+    {ack_link, {ok, AckInfo}};
+decode_ctrl_msg1(
+    #{
+        <<"op">> := ?ACK_LINK_OP,
+        <<"result">> := <<"error">>,
+        <<"reason">> := Reason
+    },
+    _ClusterName
+) ->
+    {ack_link, {error, Reason}};
+decode_ctrl_msg1(#{<<"op">> := ?UNLINK_OP}, _ClusterName) ->
+    unlink.
+
+%% Decodes a route operation payload and interprets it with decode_route_op1/1.
+decode_route_op(Payload) ->
+    RouteOp = ?DECODE(Payload),
+    decode_route_op1(RouteOp).
+
+%% Interprets a decoded route operation:
+%%   * `<<"add_", Topic/binary>>'    -> `{add, Topic}'
+%%   * `<<"delete_", Topic/binary>>' -> `{delete, Topic}'
+%%   * a batch-routes map            -> `{add, Topics}' (a list of topics)
+%%   * a cleanup-routes map          -> `cleanup_routes'
+%% Anything else is logged as a warning and returned as `{error, Payload}'.
+decode_route_op1(<<"add_", AddedTopic/binary>>) ->
+    {add, AddedTopic};
+decode_route_op1(<<"delete_", DeletedTopic/binary>>) ->
+    {delete, DeletedTopic};
+decode_route_op1(#{<<"op">> := ?BATCH_ROUTES_OP, <<"topics">> := TopicList}) when
+    is_list(TopicList)
+->
+    {add, TopicList};
+decode_route_op1(#{<<"op">> := ?CLEANUP_ROUTES_OP}) ->
+    cleanup_routes;
+decode_route_op1(Unexpected) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_cluster_link_route_op_payload",
+        payload => Unexpected
+    }),
+    {error, Unexpected}.
+
+%% Decodes a message forwarded over the link. Returns the `#message{}' record
+%% when the payload decodes to one; otherwise logs a warning and returns
+%% `{error, Payload}' with the original (still encoded) payload.
+decode_forwarded_msg(Payload) ->
+    Decoded = ?DECODE(Payload),
+    case is_record(Decoded, message) of
+        true ->
+            Decoded;
+        false ->
+            ?SLOG(warning, #{
+                msg => "unexpected_cluster_link_forwarded_msg_payload",
+                payload => Payload
+            }),
+            {error, Payload}
+    end.
+
+%% Parses the protocol version announced by the linked cluster `ClusterName'
+%% into `{Major, Minor}' and always returns it. A notice is logged when the
+%% link's major or minor component is greater than the local one.
+%% NOTE(review): no notice is logged when the link version is lower in both
+%% components, although the log message speaks of "different" versions -
+%% confirm this is intended.
+decode_proto_ver(ProtoVer, ClusterName) ->
+    {MyMajor, MyMinor} = decode_proto_ver1(?PROTO_VER),
+    {Major, Minor} = LinkVer = decode_proto_ver1(ProtoVer),
+    case Major > MyMajor orelse Minor > MyMinor of
+        true ->
+            ?SLOG(notice, #{
+                msg => "different_cluster_link_protocol_versions",
+                protocol_version => ?PROTO_VER,
+                link_protocol_version => ProtoVer,
+                link_name => ClusterName
+            }),
+            LinkVer;
+        false ->
+            LinkVer
+    end.
+
+%% Splits a `<<"Major.Minor">>' version binary at the first dot and converts
+%% both parts to integers.
+decode_proto_ver1(ProtoVer) ->
+    %% Let it fail (for now), we don't expect invalid data to pass through the linking protocol..
+    [MajorBin, MinorBin] = binary:split(ProtoVer, <<".">>),
+    Major = emqx_utils_conv:int(MajorBin),
+    Minor = emqx_utils_conv:int(MinorBin),
+    {Major, Minor}.
+
+%%--------------------------------------------------------------------
+%% emqx_external_broker
+%%--------------------------------------------------------------------
+
+%% Forwards the message of a delivery to the linked cluster `ClusterName' by
+%% querying its message forwarding resource; the message topic is the pick key.
+forward({external, {link, ClusterName}}, #delivery{message = #message{topic = Topic} = Msg}) ->
+    ResId = ?MSG_RES_ID(ClusterName),
+    emqx_resource:query(ResId, Msg, #{pick_key => Topic}).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Builds the emqtt client options for a link from its config.
+%%   * `ClientIdSuffix' - suffix combined with the base client id by ?CLIENTID;
+%%   * `ClusterConf'    - link config; `server' and `ssl' are required,
+%%     `clientid', `username' and `password' are optional.
+%% The base client id defaults to the local cluster name.
+emqtt_client_opts(
+    ClientIdSuffix, #{server := Server, ssl := #{enable := EnableSsl} = Ssl} = ClusterConf
+) ->
+    %% The link schema names this option `clientid' (not `client_id'); reading
+    %% any other key would silently ignore the configured value.
+    BaseClientId = maps:get(clientid, ClusterConf, ?MY_CLUSTER_NAME),
+    ClientId = ?CLIENTID(BaseClientId, ClientIdSuffix),
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MQTT_HOST_OPTS),
+    Opts = #{
+        host => Host,
+        port => Port,
+        clientid => ClientId,
+        proto_ver => v5,
+        ssl => EnableSsl,
+        %% `enable' is passed separately as `ssl', the rest goes to the TLS options
+        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
+    },
+    with_password(with_user(Opts, ClusterConf), ClusterConf).
+
+%% Adds `username' to the client options when the link config defines one.
+with_user(Opts, ClusterConf) ->
+    case ClusterConf of
+        #{username := Username} ->
+            Opts#{username => Username};
+        _ ->
+            Opts
+    end.
+
+%% Adds the unwrapped `password' secret to the client options when the link
+%% config defines one.
+with_password(Opts, ClusterConf) ->
+    case ClusterConf of
+        #{password := Secret} ->
+            Opts#{password => emqx_secret:unwrap(Secret)};
+        _ ->
+            Opts
+    end.
+
+%% Starts a supervised, hash-dispatched ecpool named `PoolName' with
+%% `pool_size' workers; this module is passed as the pool's worker module and
+%% the emqtt client options built from `ClusterConf' as `client_opts'.
+start_pool(PoolName, ClientIdSuffix, #{pool_size := PoolSize} = ClusterConf) ->
+    PoolOpts = [
+        {name, PoolName},
+        {pool_size, PoolSize},
+        {pool_type, hash},
+        {client_opts, emqtt_client_opts(ClientIdSuffix, ClusterConf)}
+    ],
+    ecpool:start_sup_pool(PoolName, ?MODULE, PoolOpts).

+ 56 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_schema).
+
+-behaviour(emqx_schema_hooks).
+
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([injected_fields/0]).
+
+-export([
+    roots/0,
+    fields/1,
+    namespace/0,
+    desc/1
+]).
+
+-define(MQTT_HOST_OPTS, #{default_port => 1883}).
+
+%% Schema namespace of the cluster linking fields.
+namespace() ->
+    "cluster_linking".
+
+%% This schema defines no roots of its own; its fields are contributed through
+%% injected_fields/0.
+roots() ->
+    [].
+
+%% emqx_schema_hooks callback: contributes the cluster linking fields under the
+%% `cluster' key.
+injected_fields() ->
+    LinkingFields = fields("cluster_linking"),
+    #{cluster => LinkingFields}.
+
+%% Field definitions:
+%%   * "cluster_linking" - the `links' array, empty by default;
+%%   * "link"            - one link: upstream name, server, credentials, TLS
+%%     options, topics and pool size (defaults to twice the scheduler count).
+fields("cluster_linking") ->
+    LinksType = ?ARRAY(?R_REF("link")),
+    [
+        %% TODO: validate and ensure upstream names are unique!
+        {links, ?HOCON(LinksType, #{default => []})}
+    ];
+fields("link") ->
+    ServerSc = emqx_schema:servers_sc(
+        #{required => true, desc => ?DESC("server")}, ?MQTT_HOST_OPTS
+    ),
+    SslSc = #{
+        type => ?R_REF(emqx_schema, "ssl_client_opts"),
+        default => #{<<"enable">> => false},
+        desc => ?DESC("ssl")
+    },
+    DefaultPoolSize = emqx_vm:schedulers() * 2,
+    [
+        {enable, ?HOCON(boolean(), #{default => false})},
+        {upstream, ?HOCON(binary(), #{required => true})},
+        {server, ServerSc},
+        {clientid, ?HOCON(binary(), #{desc => ?DESC("clientid")})},
+        {username, ?HOCON(binary(), #{desc => ?DESC("username")})},
+        {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})},
+        {ssl, SslSc},
+        %% TODO: validate topics:
+        %% - basic topic validation
+        %% - non-overlapping (not intersecting) filters ?
+        {topics, ?HOCON(?ARRAY(binary()), #{required => true})},
+        {pool_size, ?HOCON(pos_integer(), #{default => DefaultPoolSize})}
+    ].
+
+%% Placeholder description, identical for every struct name.
+desc(_StructName) ->
+    "todo".

+ 36 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl

@@ -0,0 +1,36 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/1]).
+
+-export([init/1]).
+
+-define(COORD_SUP, emqx_cluster_link_coord_sup).
+-define(SERVER, ?MODULE).
+
+%% Starts the cluster link supervisor, locally registered under the module
+%% name; `LinksConf' is handed to init/1.
+start_link(LinksConf) ->
+    RegName = {local, ?SERVER},
+    supervisor:start_link(RegName, ?SERVER, LinksConf).
+
+%% supervisor callback: a `one_for_one' supervisor (at most 10 restarts within
+%% 5 seconds) whose only child is the coordinator supervisor, started with
+%% `LinksConf'.
+init(LinksConf) ->
+    CoordSup = sup_spec(?COORD_SUP, ?COORD_SUP, LinksConf),
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 5
+    },
+    {ok, {Flags, [CoordSup]}}.
+
+%% Child spec of a permanent child supervisor implemented by `Module' and
+%% started with `Module:start_link(Arg)'.
+sup_spec(ChildId, Module, Arg) ->
+    StartMFA = {Module, start_link, [Arg]},
+    #{
+        id => ChildId,
+        start => StartMFA,
+        type => supervisor,
+        restart => permanent,
+        shutdown => infinity,
+        modules => [Module]
+    }.

+ 3 - 1
apps/emqx_conf/include/emqx_conf.hrl

@@ -74,7 +74,9 @@
     (?CE_AUTHN_PROVIDER_SCHEMA_MODS ++ ?EE_AUTHN_PROVIDER_SCHEMA_MODS)
 ).
 
--define(OTHER_INJECTING_CONFIGS, ?AUTH_EXT_SCHEMA_MODS).
+%% Schema modules injecting the cluster linking config.
+-define(CLUSTER_LINKING_SCHEMA_MODS, [emqx_cluster_link_schema]).
+
+%% Parenthesized so that the `++' expression expands as a single term wherever
+%% the macro is used inside a larger expression.
+-define(OTHER_INJECTING_CONFIGS, (?AUTH_EXT_SCHEMA_MODS ++ ?CLUSTER_LINKING_SCHEMA_MODS)).
 
 -else.
 

+ 1 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -255,7 +255,7 @@ fields("cluster") ->
                     importance => ?IMPORTANCE_HIDDEN
                 }
             )}
-    ];
+    ] ++ emqx_schema_hooks:injection_point(cluster);
 fields(cluster_static) ->
     [
         {"seeds",

+ 2 - 1
apps/emqx_machine/priv/reboot_lists.eterm

@@ -133,7 +133,8 @@
             emqx_bridge_syskeeper,
             emqx_bridge_confluent,
             emqx_ds_shared_sub,
-            emqx_auth_ext
+            emqx_auth_ext,
+            emqx_cluster_link
         ],
     %% must always be of type `load'
     ce_business_apps =>