Просмотр исходного кода

fix(router): wait for tables replicate before choosing schema vsn

Andrew Mayorov 2 года назад
Родитель
Коммит
5024304bf9
3 измененных файла: 117 добавлений и 20 удалений
  1. 2 0
      apps/emqx/src/emqx_router.erl
  2. 5 0
      apps/emqx/src/emqx_trie.erl
  3. 110 20
      apps/emqx/test/emqx_routing_SUITE.erl

+ 2 - 0
apps/emqx/src/emqx_router.erl

@@ -465,6 +465,8 @@ get_schema_vsn() ->
 
 -spec init_schema() -> ok.
 init_schema() ->
+    ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
+    ok = emqx_trie:wait_for_tables(),
     ConfSchema = emqx_config:get([broker, routing, storage_schema]),
     Schema = choose_schema_vsn(ConfSchema),
     ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),

+ 5 - 0
apps/emqx/src/emqx_trie.erl

@@ -21,6 +21,7 @@
 %% Mnesia bootstrap
 -export([
     mnesia/1,
+    wait_for_tables/0,
     create_session_trie/1
 ]).
 
@@ -105,6 +106,10 @@ create_session_trie(Type) ->
         ]
     ).
 
+-spec wait_for_tables() -> ok | {error, _Reason}.
+wait_for_tables() ->
+    mria:wait_for_tables([?TRIE]).
+
 %%--------------------------------------------------------------------
 %% Topics APIs
 %%--------------------------------------------------------------------

+ 110 - 20
apps/emqx/test/emqx_routing_SUITE.erl

@@ -26,11 +26,15 @@
 all() ->
     [
         {group, routing_schema_v1},
-        {group, routing_schema_v2}
+        {group, routing_schema_v2},
+        t_routing_schema_switch_v1,
+        t_routing_schema_switch_v2
     ].
 
 groups() ->
-    TCs = emqx_common_test_helpers:all(?MODULE),
+    TCs = [
+        t_cluster_routing
+    ],
     [
         {routing_schema_v1, [], TCs},
         {routing_schema_v2, [], TCs}
@@ -39,28 +43,38 @@ groups() ->
 init_per_group(GroupName, Config) ->
     WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
     NodeSpecs = [
-        {emqx_routing_SUITE1, #{apps => mk_appspecs(GroupName, 1), role => core}},
-        {emqx_routing_SUITE2, #{apps => mk_appspecs(GroupName, 2), role => core}},
-        {emqx_routing_SUITE3, #{apps => mk_appspecs(GroupName, 3), role => replicant}}
+        {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}},
+        {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}},
+        {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}}
     ],
     Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
-    [{cluster, Nodes}, Config].
+    [{cluster, Nodes} | Config].
 
 end_per_group(_GroupName, Config) ->
     emqx_cth_cluster:stop(?config(cluster, Config)).
 
-mk_appspecs(GroupName, N) ->
-    [
-        {emqx, #{
-            config => mk_config(GroupName, N),
-            after_start => fun() ->
-                % NOTE
-                % This one is actually defined on `emqx_conf_schema` level, but used
-                % in `emqx_broker`. Thus we have to resort to this ugly hack.
-                emqx_config:force_put([rpc, mode], async)
-            end
-        }}
-    ].
+init_per_testcase(TC, Config) ->
+    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]),
+    [{work_dir, WorkDir} | Config].
+
+end_per_testcase(_TC, _Config) ->
+    ok.
+
+mk_emqx_appspec(GroupName, N) ->
+    {emqx, #{
+        config => mk_config(GroupName, N),
+        after_start => fun() ->
+            % NOTE
+            % This one is actually defined on `emqx_conf_schema` level, but used
+            % in `emqx_broker`. Thus we have to resort to this ugly hack.
+            emqx_config:force_put([rpc, mode], async)
+        end
+    }}.
+
+mk_genrpc_appspec() ->
+    {gen_rpc, #{
+        override_env => [{port_discovery, stateless}]
+    }}.
 
 mk_config(GroupName, N) ->
     #{
@@ -68,9 +82,9 @@ mk_config(GroupName, N) ->
         listeners => mk_config_listeners(N)
     }.
 
-mk_config_broker(routing_schema_v1) ->
+mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 ->
     #{routing => #{storage_schema => v1}};
-mk_config_broker(routing_schema_v2) ->
+mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 ->
     #{routing => #{storage_schema => v2}}.
 
 mk_config_listeners(N) ->
@@ -82,6 +96,8 @@ mk_config_listeners(N) ->
         wss => #{default => #{enable => false}}
     }.
 
+%%
+
 t_cluster_routing(Config) ->
     Cluster = ?config(cluster, Config),
     Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster],
@@ -163,6 +179,80 @@ unsubscribe(C, Topic) ->
     {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic),
     ok = timer:sleep(200).
 
+%%
+
+t_routing_schema_switch_v1(Config) ->
+    t_routing_schema_switch(_From = v2, _To = v1, Config).
+
+t_routing_schema_switch_v2(Config) ->
+    t_routing_schema_switch(_From = v1, _To = v2, Config).
+
+t_routing_schema_switch(VFrom, VTo, Config) ->
+    % Start first node with routing schema VTo (e.g. v1)
+    WorkDir = ?config(work_dir, Config),
+    [Node1] = emqx_cth_cluster:start(
+        [
+            {routing_schema_switch1, #{
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)]
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    % Ensure there's at least 1 route on Node1
+    C1 = start_client(Node1),
+    ok = subscribe(C1, <<"a/+/c">>),
+    ok = subscribe(C1, <<"d/e/f/#">>),
+    % Start rest of nodes with routing schema VFrom (e.g. v2)
+    [Node2, Node3] = emqx_cth_cluster:start(
+        [
+            {routing_schema_switch2, #{
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)],
+                base_port => 20000,
+                join_to => Node1
+            }},
+            {routing_schema_switch3, #{
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)],
+                base_port => 20100,
+                join_to => Node1
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    % Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively
+    Nodes = [Node1, Node2, Node3],
+    ?assertEqual(
+        [{ok, VTo}, {ok, VTo}, {ok, VTo}],
+        erpc:multicall(Nodes, emqx_router, get_schema_vsn, [])
+    ),
+    % Wait for all nodes to agree on cluster state
+    ?retry(
+        500,
+        10,
+        ?assertMatch(
+            [{ok, [Node1, Node2, Node3]}],
+            lists:usort(erpc:multicall(Nodes, emqx, running_nodes, []))
+        )
+    ),
+    % Verify that routing works as expected
+    C2 = start_client(Node2),
+    ok = subscribe(C2, <<"a/+/d">>),
+    C3 = start_client(Node3),
+    ok = subscribe(C3, <<"d/e/f/#">>),
+    {ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>),
+    {ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>),
+    {ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>),
+    ?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}),
+    ?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}),
+    ?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
+    ?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
+    ?assertNotReceive(_),
+    ok = emqtt:stop(C1),
+    ok = emqtt:stop(C2),
+    ok = emqtt:stop(C3),
+    ok = emqx_cth_cluster:stop(Nodes).
+
+%%
+
 get_mqtt_tcp_port(Node) ->
     {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
     Port.