Browse Source

Merge pull request #13927 from keynslug/fix/clink/bootstrap-crash

fix(cluster-link): provide `emqx_broker` API to fold over topics
Andrew Mayorov 1 year ago
parent
commit
5f3d7b7441

+ 18 - 0
apps/emqx/src/emqx_broker.erl

@@ -55,6 +55,11 @@
     subscribed/2
 ]).
 
+%% Folds
+-export([
+    foldl_topics/2
+]).
+
 -export([
     get_subopts/2,
     set_subopts/2
@@ -120,6 +125,7 @@ create_tabs() ->
     TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
 
     %% SubOption: {TopicFilter, SubPid} -> SubOption
+    %% NOTE: `foldl_topics/2` relies on it being ordered.
     ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]),
 
     %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ...
@@ -518,6 +524,18 @@ set_subopts(SubPid, Topic, NewOpts) ->
             false
     end.
 
+-spec foldl_topics(fun((emqx_types:topic() | emqx_types:share(), Acc) -> Acc), Acc) ->
+    Acc.
+foldl_topics(FoldFun, Acc) ->
+    First = ets:first(?SUBOPTION),
+    foldl_topics(FoldFun, Acc, First).
+
+foldl_topics(FoldFun, Acc, {Topic, _SubPid}) ->
+    Next = ets:next(?SUBOPTION, {Topic, _GreaterThanAnyPid = []}),
+    foldl_topics(FoldFun, FoldFun(Topic, Acc), Next);
+foldl_topics(_FoldFun, Acc, '$end_of_table') ->
+    Acc.
+
 -spec topics() -> [emqx_types:topic() | emqx_types:share()].
 topics() ->
     emqx_router:topics().

+ 2 - 3
apps/emqx_cluster_link/src/emqx_cluster_link_router_bootstrap.erl

@@ -100,10 +100,9 @@ select_routes_by_topics(Topics) ->
     [encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].
 
 select_routes_by_wildcards(Wildcards) ->
-    emqx_utils_ets:keyfoldl(
+    emqx_broker:foldl_topics(
         fun(Topic, Acc) -> intersecting_route(Topic, Wildcards) ++ Acc end,
-        [],
-        ?SUBSCRIBER
+        []
     ).
 
 select_shared_sub_routes_by_topics([T | Topics]) ->

+ 8 - 8
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -373,9 +373,7 @@ handle_info(
         #{
             result := Res,
             need_bootstrap := NeedBootstrap
-        } = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(
-        Payload
-    ),
+        } = AckInfoMap} = emqx_cluster_link_mqtt:decode_resp(Payload),
     St1 = St#st{
         actor_init_req_id = undefined, actor_init_timer = undefined, remote_actor_info = AckInfoMap
     },
@@ -402,8 +400,6 @@ handle_info(
             %% TODO: retry after a timeout?
             {noreply, St1#st{error = Reason, status = disconnected}}
     end;
-handle_info({publish, #{}}, St) ->
-    {noreply, St};
 handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) ->
     {noreply, process_connect(St#st{reconnect_timer = undefined})};
 handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
@@ -487,15 +483,19 @@ handle_connect_error(Reason, St) ->
     _ = maybe_alarm(Reason, St),
     St#st{reconnect_timer = TRef, error = Reason, status = disconnected}.
 
-handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) ->
+handle_client_down(
+    Reason,
+    St = #st{target = TargetCluster, actor = Actor, bootstrapped = Bootstrapped}
+) ->
     ?SLOG(error, #{
         msg => "cluster_link_connection_failed",
         reason => Reason,
         target_cluster => St#st.target,
         actor => St#st.actor
     }),
-    %% TODO: syncer may be already down due to one_for_all strategy
-    ok = suspend_syncer(TargetCluster, Actor),
+    %% NOTE: There's no syncer yet if bootstrap hasn't finished.
+    %% TODO: Syncer may be already down due to one_for_all strategy.
+    _ = Bootstrapped andalso suspend_syncer(TargetCluster, Actor),
     _ = maybe_alarm(Reason, St),
     NSt = cancel_heartbeat(St),
     process_connect(NSt#st{client = undefined, error = Reason, status = connecting}).

+ 1 - 0
changes/ee/fix-13927.en.md

@@ -0,0 +1 @@
+Fixed an issue where the Cluster Link bootstrap process could crash if the source cluster had one or more very crowded topics.