Explorar el Código

test(routesync): verify that syncer preserves consistency

Under a highly concurrent load. Be aware that this testcase is not
deterministic.
Andrew Mayorov hace 2 años
padre
commit
a1ccf85c66
Se han modificado 1 ficheros con 137 adiciones y 1 borrados
  1. 137 1
      apps/emqx/test/emqx_routing_SUITE.erl

+ 137 - 1
apps/emqx/test/emqx_routing_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_router.hrl").
 
@@ -29,7 +30,8 @@ all() ->
         {group, routing_schema_v1},
         {group, routing_schema_v2},
         t_routing_schema_switch_v1,
-        t_routing_schema_switch_v2
+        t_routing_schema_switch_v2,
+        t_concurrent_routing_updates
     ].
 
 groups() ->
@@ -182,6 +184,140 @@ unsubscribe(C, Topic) ->
 
 %%
 
+-define(SUBSCRIBE_TOPICS, [
+    <<"t/#">>,
+    <<"t/fixed">>,
+    <<"t/1/+">>,
+    <<"t/2/+">>,
+    <<"t/42/+/+">>,
+    <<"client/${i}/+">>,
+    <<"client/${i}/fixed">>,
+    <<"client/${i}/#">>,
+    <<"rand/${r}/+">>,
+    <<"rand/${r}/fixed">>
+]).
+
+t_concurrent_routing_updates(init, Config) ->
+    WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                config => #{broker => #{routing => #{storage_schema => v2}}},
+                before_start => fun() ->
+                    % NOTE
+                    % This one is actually defined on `emqx_conf_schema` level, but used
+                    % in `emqx_broker`. Thus we have to resort to this ugly hack.
+                    emqx_config:force_put([node, broker_pool_size], 2)
+                end
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    ok = snabbkaffe:start_trace(),
+    [{tc_apps, Apps} | Config];
+t_concurrent_routing_updates('end', Config) ->
+    ok = snabbkaffe:stop(),
+    ok = emqx_cth_suite:stop(?config(tc_apps, Config)).
+
+t_concurrent_routing_updates(_Config) ->
+    NClients = 400,
+    NRTopics = 250,
+    MCommands = 8,
+    Port = get_mqtt_tcp_port(node()),
+    Clients = [
+        spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics])
+     || I <- lists:seq(1, NClients)
+    ],
+    ok = lists:foreach(fun ping_concurrent_client/1, Clients),
+    ok = timer:sleep(200),
+    Subscribers = ets:tab2list(?SUBSCRIBER),
+    Topics = maps:keys(maps:from_list(Subscribers)),
+    ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())),
+    ok = lists:foreach(fun stop_concurrent_client/1, Clients),
+    ok = timer:sleep(1000),
+    ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]),
+    ?assertEqual([], ets:tab2list(?SUBSCRIBER)),
+    ?assertEqual([], emqx_router:topics()).
+
+run_concurrent_client(I, Port, MCommands, NRTopics) ->
+    % _ = rand:seed(default, I),
+    Ctx = #{
+        i => I,
+        r => rand:uniform(NRTopics)
+    },
+    {ok, C} = emqtt:start_link(#{port => Port, clientid => render("client:${i}", Ctx)}),
+    {ok, _Props} = emqtt:connect(C),
+    NCommands = rand:uniform(MCommands),
+    Commands = gen_concurrent_client_plan(NCommands, Ctx),
+    ok = subscribe_concurrent_client(C, Commands),
+    run_concurrent_client_loop(C).
+
+gen_concurrent_client_plan(N, Ctx) ->
+    lists:foldl(
+        fun(_, Acc) -> mixin(pick_random_command(Ctx), Acc) end,
+        [],
+        lists:seq(1, N)
+    ).
+
+subscribe_concurrent_client(C, Commands) ->
+    lists:foreach(
+        fun
+            ({subscribe, Topic}) ->
+                {ok, _Props, [0]} = emqtt:subscribe(C, Topic);
+            ({unsubscribe, Topic}) ->
+                {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic)
+        end,
+        Commands
+    ).
+
+pick_random_command(Ctx) ->
+    Topic = render(randpick(?SUBSCRIBE_TOPICS), Ctx),
+    randpick([
+        [{subscribe, Topic}],
+        [{subscribe, Topic}, {unsubscribe, Topic}]
+    ]).
+
+render(Template, Ctx) ->
+    iolist_to_binary(emqx_template:render_strict(emqx_template:parse(Template), Ctx)).
+
+run_concurrent_client_loop(C) ->
+    receive
+        {From, Ref, F} ->
+            Reply = F(C),
+            From ! {Ref, Reply},
+            run_concurrent_client_loop(C)
+    end.
+
+ping_concurrent_client(Pid) ->
+    Ref = make_ref(),
+    Pid ! {self(), Ref, fun emqtt:ping/1},
+    receive
+        {Ref, Reply} -> Reply
+    after 5000 ->
+        error(timeout)
+    end.
+
+stop_concurrent_client(Pid) ->
+    MRef = erlang:monitor(process, Pid),
+    true = erlang:unlink(Pid),
+    true = erlang:exit(Pid, shutdown),
+    receive
+        {'DOWN', MRef, process, Pid, Reason} -> Reason
+    end.
+
+randpick(List) ->
+    lists:nth(rand:uniform(length(List)), List).
+
+mixin(L = [H | T], Into = [HInto | TInto]) ->
+    case rand:uniform(length(Into) + 1) of
+        1 -> [H | mixin(T, Into)];
+        _ -> [HInto | mixin(L, TInto)]
+    end;
+mixin(L, Into) ->
+    L ++ Into.
+
+%%
+
 t_routing_schema_switch_v1(Config) ->
     WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
     t_routing_schema_switch(_From = v2, _To = v1, WorkDir).