
Merge pull request #12410 from SergeTupchiy/EMQX-11812-subscribe-performance-degraded

perf(emqx_broker): pick broker pool worker by topic/shard pair to dis…
SergeTupchiy committed 2 years ago
parent
commit
03ff6f2ddf
2 files changed, 88 insertions(+), 52 deletions(-)
  1. apps/emqx/src/emqx_broker.erl (+74 −40)
  2. apps/emqx/src/emqx_router_syncer.erl (+14 −12)

+74 −40
apps/emqx/src/emqx_broker.erl

@@ -85,6 +85,16 @@
 %% Guards
 -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
 
+-define(cast_or_eval(Pid, Msg, Expr),
+    case Pid =:= self() of
+        true ->
+            _ = Expr,
+            ok;
+        false ->
+            cast(Pid, Msg)
+    end
+).
+
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
 start_link(Pool, Id) ->
     ok = create_tabs(),
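
Editor's note: the new `?cast_or_eval` macro short-circuits the case where the picked worker is the current process. Instead of queueing a message in its own mailbox (which would be handled only after the current request), the worker evaluates the expression inline. Below is a minimal standalone sketch of the pattern; the module and function names are illustrative, not part of the PR:

    -module(cast_or_eval_demo).
    -export([notify/2]).

    %% If the designated worker is the calling process itself, run the
    %% side effect inline; otherwise hand it off asynchronously (the target
    %% gen_server is assumed to handle an {eval, Fun} cast).
    notify(Pid, Fun) when is_pid(Pid), is_function(Fun, 0) ->
        case Pid =:= self() of
            true ->
                _ = Fun(),
                ok;
            false ->
                gen_server:cast(Pid, {eval, Fun})
        end.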
@@ -159,15 +169,7 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
     %% https://emqx.atlassian.net/browse/EMQX-10214
     I = emqx_broker_helper:get_sub_shard(SubPid, Topic),
     true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}),
-    %% NOTE
-    %% We are relying on the local state to minimize global routing state changes,
-    %% thus it's important that some operations on ETS tables on the same topic
-    %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
-    %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
-    %% `unsubscribe` codepath. So we have to pick a worker according to the topic,
-    %% but not shard. If there are topics with high number of shards, then the
-    %% load across the pool will be unbalanced.
-    Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}),
+    Sync = call(pick({Topic, I}), {subscribe, Topic, SubPid, I}),
     case Sync of
         ok ->
             ok;
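
Editor's note: `gproc_pool:pick_worker/2` hashes the given term to choose a worker, so keying by the `{Topic, I}` pair spreads one hot topic's shards across the pool instead of pinning them all to a single worker (the imbalance described in the removed comment). A rough illustration of the effect, assuming `erlang:phash2/2`-style hashing; the module is illustrative only:

    -module(pick_spread_demo).
    -export([workers_for/2]).

    %% Distinct worker indices that one topic's shards map to in a pool
    %% of 8 workers. Keyed by Topic alone, every shard would land on the
    %% single index erlang:phash2(Topic, 8).
    workers_for(Topic, NShards) ->
        PoolSize = 8,
        lists:usort([erlang:phash2({Topic, I}, PoolSize) || I <- lists:seq(0, NShards - 1)]).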
@@ -218,15 +220,7 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when
         0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts);
         _ -> ok
     end,
-    %% NOTE
-    %% We are relying on the local state to minimize global routing state changes,
-    %% thus it's important that some operations on ETS tables on the same topic
-    %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
-    %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
-    %% `unsubscribe` codepath. So we have to pick a worker according to the topic,
-    %% but not shard. If there are topics with high number of shards, then the
-    %% load across the pool will be unbalanced.
-    cast(pick(Topic), {unsubscribed, Topic, SubPid, I});
+    cast(pick({Topic, I}), {unsubscribed, Topic, SubPid, I});
 do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
     is_binary(Group), is_binary(Topic), is_pid(SubPid)
 ->
@@ -501,8 +495,8 @@ cast(Broker, Req) ->
     gen_server:cast(Broker, Req).
 
 %% Pick a broker
-pick(Topic) ->
-    gproc_pool:pick_worker(broker_pool, Topic).
+pick(TopicShard) ->
+    gproc_pool:pick_worker(broker_pool, TopicShard).
 
 %%--------------------------------------------------------------------
 %% gen_server callbacks
@@ -514,36 +508,72 @@ init([Pool, Id]) ->
 
 handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) ->
     Existed = ets:member(?SUBSCRIBER, Topic),
-    true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
-    Result = maybe_add_route(Existed, Topic, From),
-    {reply, Result, State};
-handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) ->
-    Existed = ets:member(?SUBSCRIBER, Topic),
-    true = ets:insert(?SUBSCRIBER, [
-        {Topic, {shard, I}},
-        {{shard, Topic, I}, SubPid}
-    ]),
     Result = maybe_add_route(Existed, Topic, From),
+    assert_ok_result(Result),
+    true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
     {reply, Result, State};
+handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
+    Existed = ets:member(?SUBSCRIBER, {shard, Topic, I}),
+    Recs = [{{shard, Topic, I}, SubPid}],
+    Recs1 =
+        case Existed of
+            false ->
+                %% This will attempt to add a route for each new shard.
+                %% The overhead should be negligible, while consistency and
+                %% race-condition safety are expected to be stronger.
+                %% The main purpose is to solve the race when
+                %% `{shard, Topic, N}` (where N > 0)
+                %% is the first subscribe request ever processed for `Topic`:
+                %% its worker inserts `{Topic, {shard, N}}` into the `?SUBSCRIBER` tab.
+                %% After that, another broker worker starts processing the
+                %% `{shard, Topic, 0}` sub and already observes `{Topic, {shard, N}}`,
+                %% i.e. `ets:member(?SUBSCRIBER, Topic)` returns true,
+                %% so it doesn't add the route.
+                %% Even if this happens, this cast is expected to be processed
+                %% eventually and the route should be added (unless the worker restarts...).
+                ?cast_or_eval(
+                    pick({Topic, 0}),
+                    {subscribed, Topic, shard, I},
+                    sync_route(add, Topic, #{})
+                ),
+                [{Topic, {shard, I}} | Recs];
+            true ->
+                Recs
+        end,
+    true = ets:insert(?SUBSCRIBER, Recs1),
+    {reply, ok, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
 
+handle_cast({subscribed, Topic, shard, _I}, State) ->
+    %% No need to 'maybe add' (i.e. to check whether the route exists):
+    %% it was already checked that this shard is newly added.
+    _ = sync_route(add, Topic, #{}),
+    {noreply, State};
+handle_cast({unsubscribed, Topic, shard, _I}, State) ->
+    _ = maybe_delete_route(Topic),
+    {noreply, State};
 handle_cast({unsubscribed, Topic, SubPid, 0}, State) ->
     true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
-    Exists = ets:member(?SUBSCRIBER, Topic),
-    _Result = maybe_delete_route(Exists, Topic),
+    _ = maybe_delete_route(Topic),
     {noreply, State};
 handle_cast({unsubscribed, Topic, SubPid, I}, State) ->
     true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
     case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
         false ->
-            ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}});
+            ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
+            %% Do not attempt to delete any routes here;
+            %% let that be handled only by the same pool worker per topic (shard 0),
+            %% so that all route deletions are serialized.
+            ?cast_or_eval(
+                pick({Topic, 0}),
+                {unsubscribed, Topic, shard, I},
+                maybe_delete_route(Topic)
+            );
         true ->
-            true
+            ok
     end,
-    Exists = ets:member(?SUBSCRIBER, Topic),
-    _Result = maybe_delete_route(Exists, Topic),
     {noreply, State};
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
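
Editor's note: the clauses above keep a single serialization point per topic. Route adds and deletes are either evaluated on, or forwarded to, the worker owning `{Topic, 0}`, so the membership check in `maybe_delete_route/1` cannot interleave with a concurrent route sync for the same topic on another worker. A sketch of the invariant (the helper name is illustrative):

    %% All global routing decisions for a topic funnel through one worker.
    route_owner(Topic) ->
        gproc_pool:pick_worker(broker_pool, {Topic, 0}).
    %% A worker handling {shard, Topic, I} for I > 0 is either this process
    %% already (then ?cast_or_eval runs the route sync inline) or casts to it.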
@@ -582,7 +612,7 @@ do_dispatch(Topic, #delivery{message = Msg}) ->
             {ok, DispN}
     end.
 
-%% Donot dispatch to share subscriber here.
+%% Don't dispatch to share subscriber here.
 %% we do it in `emqx_shared_sub.erl` with configured strategy
 do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
@@ -603,15 +633,19 @@ do_dispatch({shard, I}, Topic, Msg) ->
 
 %%
 
+assert_ok_result(ok) -> ok;
+assert_ok_result(Ref) when is_reference(Ref) -> ok.
+
 maybe_add_route(_Existed = false, Topic, ReplyTo) ->
     sync_route(add, Topic, #{reply => ReplyTo});
 maybe_add_route(_Existed = true, _Topic, _ReplyTo) ->
     ok.
 
-maybe_delete_route(_Exists = false, Topic) ->
-    sync_route(delete, Topic, #{});
-maybe_delete_route(_Exists = true, _Topic) ->
-    ok.
+maybe_delete_route(Topic) ->
+    case ets:member(?SUBSCRIBER, Topic) of
+        true -> ok;
+        false -> sync_route(delete, Topic, #{})
+    end.
 
 sync_route(Action, Topic, ReplyTo) ->
     EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),

+14 −12
apps/emqx/src/emqx_router_syncer.erl

@@ -97,7 +97,7 @@ push(Action, Topic, Dest, Opts) ->
     Context = mk_push_context(Opts),
     _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})),
     case Context of
-        {MRef, _} ->
+        [{MRef, _}] ->
             MRef;
         [] ->
             ok
@@ -128,7 +128,7 @@ designate_prio(delete, #{}) ->
 
 mk_push_context(#{reply := To}) ->
     MRef = erlang:make_ref(),
-    {MRef, To};
+    [{MRef, To}];
 mk_push_context(_) ->
     [].
 
@@ -272,8 +272,8 @@ send_replies(Errors, Batch) ->
 
 replyctx_send(_Result, []) ->
     noreply;
-replyctx_send(Result, {MRef, Pid}) ->
-    _ = erlang:send(Pid, {MRef, Result}),
+replyctx_send(Result, RefsPids) ->
+    _ = lists:foreach(fun({MRef, Pid}) -> erlang:send(Pid, {MRef, Result}) end, RefsPids),
     ok.
 
 %%
@@ -316,10 +316,11 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
             Stash#{Route := RouteOpMerged}
     end.
 
-merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
-    %% NOTE: This should not happen anyway.
-    _ = replyctx_send(ignored, Ctx1),
-    DestOp;
+merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), ?ROUTEOP(Action, Prio2, Ctx2)) ->
+    %% NOTE: This can happen, as topic shards can be processed concurrently
+    %% by different broker workers; see emqx_broker for more details.
+    MergedCtx = Ctx1 ++ Ctx2,
+    ?ROUTEOP(Action, Prio2, MergedCtx);
 merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) ->
     %% NOTE: Latter cancel the former.
     %% Strictly speaking, in ideal conditions we could just cancel both, because they
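
Editor's note: reply contexts become lists precisely so that two same-action ops on the same route can be merged with `++`, and every waiting caller still gets a reply through the `lists:foreach/2` fan-out in `replyctx_send/2`. A minimal shell-style sketch (values illustrative):

    Ctx1 = [{erlang:make_ref(), self()}],  %% context of the first push
    Ctx2 = [{erlang:make_ref(), self()}],  %% same-action push arriving later
    Merged = Ctx1 ++ Ctx2,                 %% merge_route_op keeps both
    ok = lists:foreach(
        fun({MRef, Pid}) -> _ = erlang:send(Pid, {MRef, ok}) end,
        Merged
    ).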
@@ -352,7 +353,7 @@ stash_stats(Stash) ->
 
 batch_test() ->
     Dest = node(),
-    Ctx = fun(N) -> {N, self()} end,
+    Ctx = fun(N) -> [{N, self()}] end,
     Stash = stash_add(
         [
             ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))),
@@ -375,6 +376,7 @@ batch_test() ->
         stash_new()
     ),
     {Batch, StashLeft} = mk_batch(Stash, 5),
+
     ?assertMatch(
         #{
             {<<"t/1">>, Dest} := {add, ?PRIO_LO, _},
@@ -392,16 +394,16 @@ batch_test() ->
         },
         StashLeft
     ),
+
+    %% Replies are only sent to superseded ops:
     ?assertEqual(
         [
-            {2, ignored},
             {1, ok},
             {5, ok},
-            {7, ignored},
             {4, ok},
             {9, ok},
+            {7, ok},
             {8, ok},
-            {13, ignored},
             {11, ok}
         ],
         emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))