Просмотр исходного кода

fix(clusterlink): ensure extrouter works on replicants

This is sort of a quick fix to make things safe, but it will likely be
subject to the same drawback as the regular router in high-latency
deployments: reduced throughput.
Andrew Mayorov 1 год назад
Родитель
Коммит
4097585f5d

+ 27 - 6
apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl

@@ -20,6 +20,14 @@
     actor_gc/1
 ]).
 
+%% Internal API
+-export([
+    mnesia_actor_init/3,
+    mnesia_actor_heartbeat/3,
+    mnesia_clean_incarnation/1,
+    apply_actor_operation/5
+]).
+
 %% Strictly monotonically increasing integer.
 -type smint() :: integer().
 
@@ -127,8 +135,8 @@ match_to_route(M) ->
 
 -spec actor_init(actor(), incarnation(), env()) -> {ok, state()}.
 actor_init(Actor, Incarnation, Env = #{timestamp := Now}) ->
-    %% FIXME: Sane transactions.
-    case transaction(fun mnesia_actor_init/3, [Actor, Incarnation, Now]) of
+    %% TODO: Rolling upgrade safety?
+    case transaction(fun ?MODULE:mnesia_actor_init/3, [Actor, Incarnation, Now]) of
         {ok, State} ->
             {ok, State};
         {reincarnate, Rec} ->
@@ -173,17 +181,30 @@ actor_apply_operation(
     State = #state{actor = Actor, incarnation = Incarnation, lane = Lane},
     _Env
 ) ->
-    _ = assert_current_incarnation(Actor, Incarnation),
-    _ = apply_operation(emqx_topic_index:make_key(TopicFilter, ID), OpName, Lane),
+    Entry = emqx_topic_index:make_key(TopicFilter, ID),
+    case mria_config:whoami() of
+        Role when Role /= replicant ->
+            apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane);
+        replicant ->
+            mria:async_dirty(
+                ?EXTROUTE_SHARD,
+                fun ?MODULE:apply_actor_operation/5,
+                [Actor, Incarnation, Entry, OpName, Lane]
+            )
+    end,
     State;
 actor_apply_operation(
     heartbeat,
     State = #state{actor = Actor, incarnation = Incarnation},
     _Env = #{timestamp := Now}
 ) ->
-    ok = transaction(fun mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]),
+    ok = transaction(fun ?MODULE:mnesia_actor_heartbeat/3, [Actor, Incarnation, Now]),
     State.
 
+apply_actor_operation(Actor, Incarnation, Entry, OpName, Lane) ->
+    _ = assert_current_incarnation(Actor, Incarnation),
+    apply_operation(Entry, OpName, Lane).
+
 apply_operation(Entry, OpName, Lane) ->
     %% NOTE
     %% This is safe sequence of operations only on core nodes. On replicants,
@@ -259,7 +280,7 @@ mnesia_actor_heartbeat(Actor, Incarnation, TS) ->
     end.
 
 clean_incarnation(Rec) ->
-    transaction(fun mnesia_clean_incarnation/1, [Rec]).
+    transaction(fun ?MODULE:mnesia_clean_incarnation/1, [Rec]).
 
 mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) ->
     case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of

+ 28 - 9
apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl

@@ -33,6 +33,12 @@ end_per_testcase(TC, Config) ->
 init_db() ->
     mria:wait_for_tables(emqx_cluster_link_extrouter:create_tables()).
 
+init_db_nodes(Nodes) ->
+    ok = lists:foreach(
+        fun(Node) -> ok = erpc:call(Node, ?MODULE, init_db, []) end,
+        Nodes
+    ).
+
 %%
 
 t_consistent_routing_view(_Config) ->
@@ -174,20 +180,15 @@ t_consistent_routing_view_concurrent_updates(_Config) ->
 
 t_consistent_routing_view_concurrent_cluster_updates('init', Config) ->
     Specs = [
-        {emqx_external_router1, #{role => core}},
-        {emqx_external_router2, #{role => core}},
-        {emqx_external_router3, #{role => core}}
+        {emqx_cluster_link_extrouter1, #{role => core}},
+        {emqx_cluster_link_extrouter2, #{role => core}},
+        {emqx_cluster_link_extrouter3, #{role => core}}
     ],
     Cluster = emqx_cth_cluster:start(
         Specs,
         #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
     ),
-    ok = lists:foreach(
-        fun(Node) ->
-            ok = erpc:call(Node, ?MODULE, init_db, [])
-        end,
-        Cluster
-    ),
+    ok = init_db_nodes(Cluster),
     [{cluster, Cluster} | Config];
 t_consistent_routing_view_concurrent_cluster_updates('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(cluster, Config)).
@@ -236,6 +237,24 @@ t_consistent_routing_view_concurrent_cluster_updates(Config) ->
         erpc:call(N1, ?MODULE, topics_sorted, [])
     ).
 
+t_consistent_routing_view_concurrent_cluster_replicant_updates('init', Config) ->
+    Specs = [
+        {emqx_cluster_link_extrouter_repl1, #{role => core}},
+        {emqx_cluster_link_extrouter_repl2, #{role => core}},
+        {emqx_cluster_link_extrouter_repl3, #{role => replicant}}
+    ],
+    Cluster = emqx_cth_cluster:start(
+        Specs,
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    ok = init_db_nodes(Cluster),
+    [{cluster, Cluster} | Config];
+t_consistent_routing_view_concurrent_cluster_replicant_updates('end', Config) ->
+    ok = emqx_cth_cluster:stop(?config(cluster, Config)).
+
+t_consistent_routing_view_concurrent_cluster_replicant_updates(Config) ->
+    t_consistent_routing_view_concurrent_cluster_updates(Config).
+
 run_remote_actor({Node, Run}) ->
     erlang:spawn_monitor(Node, ?MODULE, run_actor, [Run]).