Просмотр исходного кода

fix: bad tnx-id when rejoin cluster

zhongwencool 2 лет назад
Родитель
Сommit
221f6eba06

+ 1 - 13
apps/emqx/src/emqx_app.erl

@@ -25,9 +25,7 @@
     get_description/0,
     get_description/0,
     get_release/0,
     get_release/0,
     set_config_loader/1,
     set_config_loader/1,
-    get_config_loader/0,
-    set_init_tnx_id/1,
-    get_init_tnx_id/0
+    get_config_loader/0
 ]).
 ]).
 
 
 -include("logger.hrl").
 -include("logger.hrl").
@@ -65,16 +63,6 @@ set_config_loader(Module) when is_atom(Module) ->
 get_config_loader() ->
 get_config_loader() ->
     application:get_env(emqx, config_loader, emqx).
     application:get_env(emqx, config_loader, emqx).
 
 
-%% @doc Set the transaction id from which this node should start applying after boot.
-%% The transaction ID is received from the core node which we just copied the latest
-%% config from.
-set_init_tnx_id(TnxId) ->
-    application:set_env(emqx, cluster_rpc_init_tnx_id, TnxId).
-
-%% @doc Get the transaction id from which this node should start applying after boot.
-get_init_tnx_id() ->
-    application:get_env(emqx, cluster_rpc_init_tnx_id, -1).
-
 maybe_load_config() ->
 maybe_load_config() ->
     case get_config_loader() of
     case get_config_loader() of
         emqx ->
         emqx ->

+ 44 - 16
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -17,7 +17,7 @@
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
 %% API
 %% API
--export([start_link/0, mnesia/1]).
+-export([start_link/1, mnesia/1]).
 
 
 %% Note: multicall functions are statically checked by
 %% Note: multicall functions are statically checked by
 %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
 %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
@@ -29,7 +29,8 @@
     status/0,
     status/0,
     skip_failed_commit/1,
     skip_failed_commit/1,
     fast_forward_to_commit/2,
     fast_forward_to_commit/2,
-    on_mria_stop/1
+    on_mria_stop/1,
+    wait_for_cluster_rpc/0
 ]).
 ]).
 -export([
 -export([
     commit/2,
     commit/2,
@@ -62,6 +63,10 @@
 -ifdef(TEST).
 -ifdef(TEST).
 -compile(export_all).
 -compile(export_all).
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
+
+start_link() ->
+    start_link(-1).
+
 -endif.
 -endif.
 
 
 -boot_mnesia({mnesia, [boot]}).
 -boot_mnesia({mnesia, [boot]}).
@@ -107,11 +112,11 @@ mnesia(boot) ->
         {attributes, record_info(fields, cluster_rpc_commit)}
         {attributes, record_info(fields, cluster_rpc_commit)}
     ]).
     ]).
 
 
-start_link() ->
-    start_link(node(), ?MODULE, get_retry_ms()).
+start_link(TnxId) ->
+    start_link(TnxId, node(), ?MODULE, get_retry_ms()).
 
 
-start_link(Node, Name, RetryMs) ->
-    case gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []) of
+start_link(TnxId, Node, Name, RetryMs) ->
+    case gen_server:start_link({local, Name}, ?MODULE, [TnxId, Node, RetryMs], []) of
         {ok, Pid} ->
         {ok, Pid} ->
             {ok, Pid};
             {ok, Pid};
         {error, {already_started, Pid}} ->
         {error, {already_started, Pid}} ->
@@ -276,29 +281,46 @@ on_mria_stop(leave) ->
 on_mria_stop(_) ->
 on_mria_stop(_) ->
     ok.
     ok.
 
 
+wait_for_cluster_rpc() ->
+    %% Workaround for https://github.com/emqx/mria/issues/94:
+    Msg1 = #{msg => "wait_for_cluster_rpc_shard"},
+    case mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1500) of
+        ok -> ?SLOG(info, Msg1#{result => ok});
+        Error0 -> ?SLOG(error, Msg1#{result => Error0})
+    end,
+    Msg2 = #{msg => "wait_for_cluster_rpc_tables"},
+    case mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]) of
+        ok -> ?SLOG(info, Msg2#{result => ok});
+        Error1 -> ?SLOG(error, Msg2#{result => Error1})
+    end,
+    ok.
+
 %%%===================================================================
 %%%===================================================================
 %%% gen_server callbacks
 %%% gen_server callbacks
 %%%===================================================================
 %%%===================================================================
 
 
 %% @private
 %% @private
-init([Node, RetryMs]) ->
+init([TnxId, Node, RetryMs]) ->
     register_mria_stop_cb(fun ?MODULE:on_mria_stop/1),
     register_mria_stop_cb(fun ?MODULE:on_mria_stop/1),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     State = #{node => Node, retry_interval => RetryMs, is_leaving => false},
     State = #{node => Node, retry_interval => RetryMs, is_leaving => false},
-    %% The init transaction ID is set in emqx_conf_app after
-    %% it has fetched the latest config from one of the core nodes
-    TnxId = emqx_app:get_init_tnx_id(),
-    ok = maybe_init_tnx_id(Node, TnxId),
     %% Now continue with the normal catch-up process
     %% Now continue with the normal catch-up process
     %% That is: apply the missing transactions after the config
     %% That is: apply the missing transactions after the config
     %% was copied until now.
     %% was copied until now.
-    {ok, State, {continue, ?CATCH_UP}}.
+    {ok, State, {continue, {?CATCH_UP, TnxId}}}.
 
 
 %% @private
 %% @private
-handle_continue(?CATCH_UP, State) ->
+handle_continue({?CATCH_UP, TnxId}, State = #{node := Node}) ->
     %% emqx app must be started before
     %% emqx app must be started before
     %% trying to catch up the rpc commit logs
     %% trying to catch up the rpc commit logs
     ok = wait_for_emqx_ready(),
     ok = wait_for_emqx_ready(),
+    ok = wait_for_cluster_rpc(),
+    %% The init transaction ID is set in emqx_conf_app after
+    %% it has fetched the latest config from one of the core nodes
+    ok = maybe_init_tnx_id(Node, TnxId),
+    {noreply, State, catch_up(State)};
+%% @private
+handle_continue(?CATCH_UP, State) ->
     {noreply, State, catch_up(State)}.
     {noreply, State, catch_up(State)}.
 
 
 handle_call(reset, _From, State) ->
 handle_call(reset, _From, State) ->
@@ -388,7 +410,8 @@ read_next_mfa(Node) ->
                 }),
                 }),
                 TnxId;
                 TnxId;
             [#cluster_rpc_commit{tnx_id = LastAppliedID}] ->
             [#cluster_rpc_commit{tnx_id = LastAppliedID}] ->
-                LastAppliedID + 1
+                OldestId = get_oldest_mfa_id(),
+                max(LastAppliedID + 1, OldestId)
         end,
         end,
     case mnesia:read(?CLUSTER_MFA, NextId) of
     case mnesia:read(?CLUSTER_MFA, NextId) of
         [] -> caught_up;
         [] -> caught_up;
@@ -404,8 +427,7 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
         true ->
         true ->
             NodeId;
             NodeId;
         false ->
         false ->
-            {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
-            case LatestId =< NodeId of
+            case latest_tnx_id() =< NodeId of
                 true ->
                 true ->
                     NodeId;
                     NodeId;
                 false ->
                 false ->
@@ -420,6 +442,12 @@ get_cluster_tnx_id() ->
         Id -> Id
         Id -> Id
     end.
     end.
 
 
+get_oldest_mfa_id() ->
+    case mnesia:first(?CLUSTER_MFA) of
+        '$end_of_table' -> 0;
+        Id -> Id
+    end.
+
 %% The entry point of a config change transaction.
 %% The entry point of a config change transaction.
 init_mfa(Node, MFA) ->
 init_mfa(Node, MFA) ->
     mnesia:write_lock_table(?CLUSTER_MFA),
     mnesia:write_lock_table(?CLUSTER_MFA),

+ 1 - 1
apps/emqx_conf/src/emqx_conf.app.src

@@ -1,6 +1,6 @@
 {application, emqx_conf, [
 {application, emqx_conf, [
     {description, "EMQX configuration management"},
     {description, "EMQX configuration management"},
-    {vsn, "0.1.23"},
+    {vsn, "0.1.24"},
     {registered, []},
     {registered, []},
     {mod, {emqx_conf_app, []}},
     {mod, {emqx_conf_app, []}},
     {applications, [kernel, stdlib, emqx_ctl]},
     {applications, [kernel, stdlib, emqx_ctl]},

+ 13 - 14
apps/emqx_conf/src/emqx_conf_app.erl

@@ -31,16 +31,17 @@
 -define(DEFAULT_INIT_TXN_ID, -1).
 -define(DEFAULT_INIT_TXN_ID, -1).
 
 
 start(_StartType, _StartArgs) ->
 start(_StartType, _StartArgs) ->
-    try
-        ok = init_conf()
-    catch
-        C:E:St ->
-            %% logger is not quite ready.
-            io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]),
-            init:stop(1)
-    end,
+    {ok, TnxId} =
+        try
+            {ok, _} = init_conf()
+        catch
+            C:E:St ->
+                %% logger is not quite ready.
+                io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]),
+                init:stop(1)
+        end,
     ok = emqx_config_logger:refresh_config(),
     ok = emqx_config_logger:refresh_config(),
-    emqx_conf_sup:start_link().
+    emqx_conf_sup:start_link(TnxId).
 
 
 stop(_State) ->
 stop(_State) ->
     ok.
     ok.
@@ -112,12 +113,10 @@ init_load_done() ->
     emqx_app:get_config_loader() =/= emqx.
     emqx_app:get_config_loader() =/= emqx.
 
 
 init_conf() ->
 init_conf() ->
-    %% Workaround for https://github.com/emqx/mria/issues/94:
-    _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
-    _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
+    emqx_cluster_rpc:wait_for_cluster_rpc(),
     {ok, TnxId} = sync_cluster_conf(),
     {ok, TnxId} = sync_cluster_conf(),
-    _ = emqx_app:set_init_tnx_id(TnxId),
-    ok = init_load().
+    ok = init_load(),
+    {ok, TnxId}.
 
 
 cluster_nodes() ->
 cluster_nodes() ->
     mria:cluster_nodes(cores) -- [node()].
     mria:cluster_nodes(cores) -- [node()].

+ 3 - 3
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -684,10 +684,10 @@ fields("cluster_call") ->
             )},
             )},
         {"max_history",
         {"max_history",
             sc(
             sc(
-                range(1, 500),
+                range(100, 10240),
                 #{
                 #{
                     desc => ?DESC(cluster_call_max_history),
                     desc => ?DESC(cluster_call_max_history),
-                    default => 100
+                    default => 1024
                 }
                 }
             )},
             )},
         {"cleanup_interval",
         {"cleanup_interval",
@@ -695,7 +695,7 @@ fields("cluster_call") ->
                 emqx_schema:duration(),
                 emqx_schema:duration(),
                 #{
                 #{
                     desc => ?DESC(cluster_call_cleanup_interval),
                     desc => ?DESC(cluster_call_cleanup_interval),
-                    default => <<"5m">>
+                    default => <<"24h">>
                 }
                 }
             )}
             )}
     ];
     ];

+ 5 - 5
apps/emqx_conf/src/emqx_conf_sup.erl

@@ -18,16 +18,16 @@
 
 
 -behaviour(supervisor).
 -behaviour(supervisor).
 
 
--export([start_link/0]).
+-export([start_link/1]).
 
 
 -export([init/1]).
 -export([init/1]).
 
 
 -define(SERVER, ?MODULE).
 -define(SERVER, ?MODULE).
 
 
-start_link() ->
-    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(TnxId) ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, [TnxId]).
 
 
-init([]) ->
+init([TnxId]) ->
     SupFlags = #{
     SupFlags = #{
         strategy => one_for_all,
         strategy => one_for_all,
         intensity => 10,
         intensity => 10,
@@ -35,7 +35,7 @@ init([]) ->
     },
     },
     ChildSpecs =
     ChildSpecs =
         [
         [
-            child_spec(emqx_cluster_rpc, []),
+            child_spec(emqx_cluster_rpc, [TnxId]),
             child_spec(emqx_cluster_rpc_cleaner, [])
             child_spec(emqx_cluster_rpc_cleaner, [])
         ],
         ],
     {ok, {SupFlags, ChildSpecs}}.
     {ok, {SupFlags, ChildSpecs}}.