Kaynağa Gözat

fix(route-sync): handle batch sync errors gracefully

Andrew Mayorov 2 yıl önce
ebeveyn
işleme
2ac6cddf19

+ 84 - 19
apps/emqx/src/emqx_router_syncer.erl

@@ -16,6 +16,7 @@
 
 -module(emqx_router_syncer).
 
+-include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 -behaviour(gen_server).
@@ -25,6 +26,8 @@
 -export([push/4]).
 -export([wait/1]).
 
+-export([stats/0]).
+
 -export([
     init/1,
     handle_call/3,
@@ -40,6 +43,16 @@
 -define(MAX_BATCH_SIZE, 1000).
 -define(MIN_SYNC_INTERVAL, 1).
 
+%% How long (ms) to idle after observing a batch sync error?
+%% Should help to avoid excessive retries in situations when errors are caused by
+%% conditions that take some time to resolve (e.g. restarting an upstream core node).
+-define(ERROR_DELAY, 10).
+
+%% How soon (ms) to retry last failed batch sync attempt?
+%% Only matter in absence of new operations, otherwise batch sync is triggered as
+%% soon as `?ERROR_DELAY` is over.
+-define(ERROR_RETRY_INTERVAL, 500).
+
 -define(PRIO_HI, 1).
 -define(PRIO_LO, 2).
 -define(PRIO_BG, 3).
@@ -117,16 +130,36 @@ mk_push_context(_) ->
 
 %%
 
+-type stats() :: #{
+    size := non_neg_integer(),
+    n_add := non_neg_integer(),
+    n_delete := non_neg_integer(),
+    prio_highest := non_neg_integer() | undefined,
+    prio_lowest := non_neg_integer() | undefined
+}.
+
+-spec stats() -> [stats()].
+stats() ->
+    Workers = gproc_pool:active_workers(?POOL),
+    [gen_server:call(Pid, stats, infinity) || {_Name, Pid} <- Workers].
+
+%%
+
 init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
     {ok, #{stash => stash_new()}}.
 
+handle_call(stats, _From, State = #{stash := Stash}) ->
+    {reply, stash_stats(Stash), State};
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+handle_info({timeout, _TRef, retry}, State) ->
+    NState = run_batch_loop([], maps:remove(retry_timer, State)),
+    {noreply, NState};
 handle_info(Push = ?PUSH(_, _), State) ->
     %% NOTE: Wait a bit to collect potentially overlapping operations.
     ok = timer:sleep(?MIN_SYNC_INTERVAL),
@@ -142,26 +175,41 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) ->
     Stash1 = stash_add(Incoming, Stash0),
     Stash2 = stash_drain(Stash1),
     {Batch, Stash3} = mk_batch(Stash2),
-    ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, #{
-        size => maps:size(Batch),
-        stashed => maps:size(Stash3),
-        n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Batch)),
-        n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Batch)),
-        prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Batch),
-        prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Batch)
-    }),
-    %% TODO: retry if error?
-    Errors = run_batch(Batch),
-    ok = send_replies(Errors, Batch),
-    NState = State#{stash := Stash3},
-    %% TODO: postpone if only ?PRIO_BG operations left?
-    case is_stash_empty(Stash3) of
-        true ->
-            NState;
-        false ->
-            run_batch_loop([], NState)
+    ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)),
+    case run_batch(Batch) of
+        Status = #{} ->
+            ok = send_replies(Status, Batch),
+            NState = cancel_retry_timer(State#{stash := Stash3}),
+            %% TODO: postpone if only ?PRIO_BG operations left?
+            case is_stash_empty(Stash3) of
+                true ->
+                    NState;
+                false ->
+                    run_batch_loop([], NState)
+            end;
+        BatchError ->
+            ?SLOG(warning, #{
+                msg => "router_batch_sync_failed",
+                reason => BatchError,
+                batch => batch_stats(Batch, Stash3)
+            }),
+            NState = State#{stash := Stash2},
+            ok = timer:sleep(?ERROR_DELAY),
+            ensure_retry_timer(NState)
     end.
 
+ensure_retry_timer(State = #{retry_timer := _TRef}) ->
+    State;
+ensure_retry_timer(State) ->
+    TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry),
+    State#{retry_timer => TRef}.
+
+cancel_retry_timer(State = #{retry_timer := TRef}) ->
+    ok = emqx_utils:cancel_timer(TRef),
+    maps:remove(retry_timer, State);
+cancel_retry_timer(State) ->
+    State.
+
 %%
 
 mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE ->
@@ -222,7 +270,7 @@ replyctx_send(Result, {MRef, Pid}) ->
 %%
 
 run_batch(Batch) when map_size(Batch) > 0 ->
-    emqx_router:do_batch(Batch);
+    catch emqx_router:do_batch(Batch);
 run_batch(_Empty) ->
     #{}.
 
@@ -275,6 +323,23 @@ merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2
 
 %%
 
+batch_stats(Batch, Stash) ->
+    BatchStats = stash_stats(Batch),
+    BatchStats#{
+        stashed => maps:size(Stash)
+    }.
+
+stash_stats(Stash) ->
+    #{
+        size => maps:size(Stash),
+        n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Stash)),
+        n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Stash)),
+        prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Stash),
+        prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Stash)
+    }.
+
+%%
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 

+ 85 - 31
apps/emqx/test/emqx_routing_SUITE.erl

@@ -39,21 +39,21 @@ groups() ->
         {group, batch_sync_replicants},
         {group, batch_sync_off}
     ],
-    GroupBase = [
-        {group, cluster},
-        t_concurrent_routing_updates
-    ],
     ClusterTCs = [
         t_cluster_routing,
         t_slow_rlog_routing_consistency
     ],
+    SingleTCs = [t_concurrent_routing_updates],
+    BatchSyncTCs = lists:duplicate(5, t_concurrent_routing_updates_with_errors),
     [
         {routing_schema_v1, [], GroupVsn},
         {routing_schema_v2, [], GroupVsn},
-        {batch_sync_on, [], GroupBase},
-        {batch_sync_replicants, [], GroupBase},
-        {batch_sync_off, [], GroupBase},
-        {cluster, [], ClusterTCs}
+        {batch_sync_on, [], [{group, cluster}, {group, single_batch_on}]},
+        {batch_sync_replicants, [], [{group, cluster}, {group, single}]},
+        {batch_sync_off, [], [{group, cluster}, {group, single}]},
+        {cluster, [], ClusterTCs},
+        {single_batch_on, [], SingleTCs ++ BatchSyncTCs},
+        {single, [], SingleTCs}
     ].
 
 init_per_group(routing_schema_v1, Config) ->
@@ -74,10 +74,38 @@ init_per_group(cluster, Config) ->
         {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
     ],
     Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
-    [{cluster, Nodes} | Config].
+    [{cluster, Nodes} | Config];
+init_per_group(GroupName, Config) when
+    GroupName =:= single_batch_on;
+    GroupName =:= single
+->
+    WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                config => mk_config_broker(Config),
+                %% NOTE
+                %% Artificially increasing pool workers contention by forcing small pool size.
+                before_start => fun() ->
+                    % NOTE
+                    % This one is actually defined on `emqx_conf_schema` level, but used
+                    % in `emqx_broker`. Thus we have to resort to this ugly hack.
+                    emqx_config:force_put([node, broker_pool_size], 2),
+                    emqx_app:set_config_loader(?MODULE)
+                end
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    [{group_apps, Apps} | Config].
 
 end_per_group(cluster, Config) ->
     emqx_cth_cluster:stop(?config(cluster, Config));
+end_per_group(GroupName, Config) when
+    GroupName =:= single_batch_on;
+    GroupName =:= single
+->
+    emqx_cth_suite:stop(?config(group_apps, Config));
 end_per_group(_, _Config) ->
     ok.
 
@@ -226,29 +254,10 @@ unsubscribe(C, Topic) ->
 ]).
 
 t_concurrent_routing_updates(init, Config) ->
-    WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
-    Apps = emqx_cth_suite:start(
-        [
-            {emqx, #{
-                config => mk_config_broker(Config),
-                %% NOTE
-                %% Artificially increasing pool workers contention by forcing small pool size.
-                before_start => fun() ->
-                    % NOTE
-                    % This one is actually defined on `emqx_conf_schema` level, but used
-                    % in `emqx_broker`. Thus we have to resort to this ugly hack.
-                    emqx_config:force_put([node, broker_pool_size], 2),
-                    emqx_app:set_config_loader(?MODULE)
-                end
-            }}
-        ],
-        #{work_dir => WorkDir}
-    ),
     ok = snabbkaffe:start_trace(),
-    [{tc_apps, Apps} | Config];
-t_concurrent_routing_updates('end', Config) ->
-    ok = snabbkaffe:stop(),
-    ok = emqx_cth_suite:stop(?config(tc_apps, Config)).
+    Config;
+t_concurrent_routing_updates('end', _Config) ->
+    ok = snabbkaffe:stop().
 
 t_concurrent_routing_updates(_Config) ->
     NClients = 400,
@@ -270,6 +279,51 @@ t_concurrent_routing_updates(_Config) ->
     ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
     ?assertEqual([], emqx_router:topics()).
 
+t_concurrent_routing_updates_with_errors(init, Config) ->
+    ok = snabbkaffe:start_trace(),
+    ok = meck:new(emqx_router, [passthrough, no_history]),
+    Config;
+t_concurrent_routing_updates_with_errors('end', _Config) ->
+    ok = meck:unload(emqx_router),
+    ok = snabbkaffe:stop().
+
+t_concurrent_routing_updates_with_errors(_Config) ->
+    NClients = 100,
+    NRTopics = 80,
+    MCommands = 6,
+    PSyncError = 0.1,
+    Port = get_mqtt_tcp_port(node()),
+    %% Crash the batch sync operation with some small probability.
+    ok = meck:expect(emqx_router, mria_batch_run, fun(Vsn, Batch) ->
+        case rand:uniform() < PSyncError of
+            false -> meck:passthrough([Vsn, Batch]);
+            true -> error(overload)
+        end
+    end),
+    Clients = [
+        spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics])
+     || I <- lists:seq(1, NClients)
+    ],
+    ok = lists:foreach(fun ping_concurrent_client/1, Clients),
+    0 = ?retry(
+        _Interval = 500,
+        _NTimes = 10,
+        0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()])
+    ),
+    Subscribers = ets:tab2list(?SUBSCRIBER),
+    Topics = maps:keys(maps:from_list(Subscribers)),
+    ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())),
+    ok = lists:foreach(fun stop_concurrent_client/1, Clients),
+    ok = timer:sleep(100),
+    0 = ?retry(
+        500,
+        10,
+        0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()])
+    ),
+    ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]),
+    ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
+    ?assertEqual([], emqx_router:topics()).
+
 run_concurrent_client(I, Port, MCommands, NRTopics) ->
     % _ = rand:seed(default, I),
     Ctx = #{