Просмотр исходного кода

feat(clusterlink): integrate node local routes replication and message forwarding

Serge Tupchii 1 год назад
Родитель
Сommit
f036b641eb

+ 1 - 1
apps/emqx/include/emqx.hrl

@@ -67,7 +67,7 @@
 
 -record(route, {
     topic :: binary(),
-    dest :: node() | {binary(), node()} | emqx_session:session_id()
+    dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
 }).
 
 %%--------------------------------------------------------------------

+ 29 - 25
apps/emqx/src/emqx_broker.erl

@@ -256,9 +256,10 @@ do_publish_many([Msg | T]) ->
 
 do_publish(#message{topic = Topic} = Msg) ->
     PersistRes = persist_publish(Msg),
-    {Routes, ExtRoutes} = aggre(emqx_router:match_routes(Topic)),
-    Routes1 = maybe_add_ext_routes(ExtRoutes, Routes, Msg),
-    route(Routes1, delivery(Msg), PersistRes).
+    Routes = aggre(emqx_router:match_routes(Topic)),
+    Delivery = delivery(Msg),
+    RouteRes = route(Routes, Delivery, PersistRes),
+    ext_route(ext_routes(Topic, Msg), Delivery, RouteRes).
 
 persist_publish(Msg) ->
     case emqx_persistent_message:persist(Msg) of
@@ -322,41 +323,44 @@ do_route({To, Node}, Delivery) when Node =:= node() ->
     {Node, To, dispatch(To, Delivery)};
 do_route({To, Node}, Delivery) when is_atom(Node) ->
     {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
-do_route({To, {external, _} = ExtDest}, Delivery) ->
-    {ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)};
 do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
     {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
 
 aggre([]) ->
-    {[], []};
+    [];
 aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
-    {[{To, Node}], []};
-aggre([#route{topic = To, dest = {external, _} = ExtDest}]) ->
-    {[], [{To, ExtDest}]};
+    [{To, Node}];
 aggre([#route{topic = To, dest = {Group, _Node}}]) ->
-    {[{To, Group}], []};
+    [{To, Group}];
 aggre(Routes) ->
-    aggre(Routes, false, {[], []}).
-
-aggre([#route{topic = To, dest = Node} | Rest], Dedup, {Acc, ExtAcc}) when is_atom(Node) ->
-    aggre(Rest, Dedup, {[{To, Node} | Acc], ExtAcc});
-aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Dedup, {Acc, ExtAcc}) ->
-    aggre(Rest, Dedup, {Acc, [{To, ExtDest} | ExtAcc]});
-aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, {Acc, ExtAcc}) ->
-    aggre(Rest, true, {[{To, Group} | Acc], ExtAcc});
+    aggre(Routes, false, []).
+
+aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
+    aggre(Rest, Dedup, [{To, Node} | Acc]);
+aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
+    aggre(Rest, true, [{To, Group} | Acc]);
 aggre([], false, Acc) ->
     Acc;
-aggre([], true, {Acc, ExtAcc}) ->
-    {lists:usort(Acc), lists:usort(ExtAcc)}.
+aggre([], true, Acc) ->
+    lists:usort(Acc).
 
-maybe_add_ext_routes([] = _ExtRoutes, Routes, _Msg) ->
-    Routes;
-maybe_add_ext_routes(ExtRoutes, Routes, Msg) ->
+ext_routes(Topic, Msg) ->
     case emqx_external_broker:should_route_to_external_dests(Msg) of
-        true -> Routes ++ ExtRoutes;
-        false -> Routes
+        true -> emqx_external_broker:match_routes(Topic);
+        false -> []
     end.
 
+ext_route([], _Delivery, RouteRes) ->
+    RouteRes;
+ext_route(ExtRoutes, Delivery, RouteRes) ->
+    lists:foldl(
+        fun(#route{topic = To, dest = ExtDest}, Acc) ->
+            [{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)} | Acc]
+        end,
+        RouteRes,
+        ExtRoutes
+    ).
+
 %% @doc Forward message to another node.
 -spec forward(
     node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async

+ 11 - 1
apps/emqx/src/emqx_external_broker.erl

@@ -24,6 +24,10 @@
 -callback maybe_add_route(emqx_types:topic()) -> ok.
 -callback maybe_delete_route(emqx_types:topic()) -> ok.
 
+-callback match_routes(emqx_types:topic()) -> [emqx_types:route()].
+
+-type dest() :: term().
+
 -export([
     provider/0,
     register_provider/1,
@@ -31,9 +35,12 @@
     forward/2,
     should_route_to_external_dests/1,
     maybe_add_route/1,
-    maybe_delete_route/1
+    maybe_delete_route/1,
+    match_routes/1
 ]).
 
+-export_type([dest/0]).
+
 -include("logger.hrl").
 
 -define(PROVIDER, {?MODULE, external_broker}).
@@ -106,6 +113,9 @@ maybe_add_route(Topic) ->
 maybe_delete_route(Topic) ->
     ?safe_with_provider(?FUNCTION_NAME(Topic), ok).
 
+match_routes(Topic) ->
+    ?safe_with_provider(?FUNCTION_NAME(Topic), ok).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 1 - 2
apps/emqx/src/emqx_router.erl

@@ -95,8 +95,7 @@
 -export_type([schemavsn/0]).
 
 -type group() :: binary().
--type external_dest() :: {external, term()}.
--type dest() :: node() | {group(), node()} | external_dest().
+-type dest() :: node() | {group(), node()}.
 -type schemavsn() :: v1 | v2.
 
 %% Operation :: {add, ...} | {delete, ...}.

+ 1 - 1
apps/emqx/src/emqx_types.erl

@@ -267,7 +267,7 @@
     [
         {node(), topic(), deliver_result()}
         | {share, topic(), deliver_result()}
-        | {emqx_router:external_dest(), topic(), deliver_result()}
+        | {emqx_external_broker:dest(), topic(), deliver_result()}
         | persisted
     ]
     | disconnect.

+ 31 - 7
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -11,6 +11,7 @@
     unregister_external_broker/0,
     maybe_add_route/1,
     maybe_delete_route/1,
+    match_routes/1,
     forward/2,
     should_route_to_external_dests/1
 ]).
@@ -38,15 +39,16 @@ unregister_external_broker() ->
     emqx_external_broker:unregister_provider(?MODULE).
 
 maybe_add_route(Topic) ->
-    emqx_cluster_link_coordinator:route_op(<<"add">>, Topic).
+    maybe_push_route_op(add, Topic).
 
-maybe_delete_route(_Topic) ->
-    %% Not implemented yet
-    %% emqx_cluster_link_coordinator:route_op(<<"delete">>, Topic).
-    ok.
+maybe_delete_route(Topic) ->
+    maybe_push_route_op(delete, Topic).
+
+forward(DestCluster, Delivery) ->
+    emqx_cluster_link_mqtt:forward(DestCluster, Delivery).
 
-forward(ExternalDest, Delivery) ->
-    emqx_cluster_link_mqtt:forward(ExternalDest, Delivery).
+match_routes(Topic) ->
+    emqx_cluster_link_extrouter:match_routes(Topic).
 
 %% Do not forward any external messages to other links.
 %% Only forward locally originated messages to all the relevant links, i.e. no gossip message forwarding.
@@ -105,6 +107,28 @@ delete_hook() ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+maybe_push_route_op(Op, Topic) ->
+    lists:foreach(
+        fun(#{upstream := Cluster, topics := LinkFilters}) ->
+            case topic_intersect_any(Topic, LinkFilters) of
+                false ->
+                    ok;
+                TopicIntersection ->
+                    ID = Topic,
+                    emqx_cluster_link_router_syncer:push(Cluster, Op, TopicIntersection, ID)
+            end
+        end,
+        emqx_cluster_link_config:enabled_links()
+    ).
+
+topic_intersect_any(Topic, [LinkFilter | T]) ->
+    case emqx_topic:intersection(Topic, LinkFilter) of
+        false -> topic_intersect_any(Topic, T);
+        TopicOrFilter -> TopicOrFilter
+    end;
+topic_intersect_any(_Topic, []) ->
+    false.
+
 actor_init(ClusterName, Actor, Incarnation) ->
     Env = #{timestamp => erlang:system_time(millisecond)},
     {ok, _} = emqx_cluster_link_extrouter:actor_init(ClusterName, Actor, Incarnation, Env).

+ 4 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_config.erl

@@ -16,6 +16,7 @@
 -export([
     %% General
     cluster/0,
+    enabled_links/0,
     links/0,
     link/1,
     topic_filters/1,
@@ -41,6 +42,9 @@ cluster() ->
 links() ->
     emqx:get_config(?LINKS_PATH, []).
 
+enabled_links() ->
+    [L || L = #{enable := true} <- links()].
+
 link(Name) ->
     case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of
         [LinkConf | _] -> LinkConf;

+ 6 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -64,6 +64,8 @@
 %%    Op4 | n2@ds delete client/42/# → MCounter -= 1 bsl 1 = 0 → route deleted
 -type lane() :: non_neg_integer().
 
+-include_lib("emqx/include/emqx.hrl").
+
 -define(DEFAULT_ACTOR_TTL_MS, 30_000).
 
 -define(EXTROUTE_SHARD, ?MODULE).
@@ -117,7 +119,8 @@ create_tables() ->
 
 match_routes(Topic) ->
     Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]),
-    [match_to_route(M) || M <- Matches].
+    %% `unique` opt is not enough, since we keep the original Topic as a part of RouteID
+    lists:usort([match_to_route(M) || M <- Matches]).
 
 lookup_routes(Topic) ->
     Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'},
@@ -128,7 +131,8 @@ topics() ->
     [emqx_topic_index:get_topic(K) || [K] <- ets:match(?EXTROUTE_TAB, Pat)].
 
 match_to_route(M) ->
-    emqx_topic_index:get_topic(M).
+    ?ROUTE_ID(Cluster, _) = emqx_topic_index:get_id(M),
+    #route{topic = emqx_topic_index:get_topic(M), dest = Cluster}.
 
 %%
 

+ 3 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -556,8 +556,8 @@ encode_field(route, {add, Route = {_Topic, _ID}}) ->
 encode_field(route, {delete, {Topic, ID}}) ->
     {?ROUTE_DELETE, Topic, ID}.
 
-decode_field(route, {?ROUTE_DELETE, Route = {_Topic, _ID}}) ->
-    {delete, Route};
+decode_field(route, {?ROUTE_DELETE, Topic, ID}) ->
+    {delete, {Topic, ID}};
 decode_field(route, Route = {_Topic, _ID}) ->
     {add, Route}.
 
@@ -565,7 +565,7 @@ decode_field(route, Route = {_Topic, _ID}) ->
 %% emqx_external_broker
 %%--------------------------------------------------------------------
 
-forward({external, {link, ClusterName}}, #delivery{message = #message{topic = Topic} = Msg}) ->
+forward(ClusterName, #delivery{message = #message{topic = Topic} = Msg}) ->
     QueryOpts = #{pick_key => Topic},
     emqx_resource:query(?MSG_RES_ID(ClusterName), Msg, QueryOpts).
 

+ 7 - 4
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -123,7 +123,11 @@ refine_client_options(Options = #{clientid := ClientID}) ->
 
 client_session_present(ClientPid) ->
     Info = emqtt:info(ClientPid),
-    proplists:get_value(session_present, Info, false).
+    %% FIXME: waitnig for emqtt release that fixes session_present type (must be a boolean)
+    case proplists:get_value(session_present, Info, 0) of
+        0 -> false;
+        1 -> true
+    end.
 
 announce_client(TargetCluster, Pid) ->
     true = gproc:reg_other(?CLIENT_NAME(TargetCluster), Pid),
@@ -272,11 +276,10 @@ terminate(_Reason, _State) ->
 process_connect(St = #st{target = TargetCluster, actor = Actor, incarnation = Incr}) ->
     case start_link_client(TargetCluster) of
         {ok, ClientPid} ->
-            ok = start_syncer(TargetCluster),
-            ok = announce_client(TargetCluster, ClientPid),
             %% TODO: error handling, handshake
-
             {ok, _} = emqx_cluster_link_mqtt:publish_actor_init_sync(ClientPid, Actor, Incr),
+            ok = start_syncer(TargetCluster),
+            ok = announce_client(TargetCluster, ClientPid),
             process_bootstrap(St#st{client = ClientPid});
         {error, Reason} ->
             handle_connect_error(Reason, St)