|
|
@@ -29,7 +29,9 @@
|
|
|
status/0,
|
|
|
skip_failed_commit/1,
|
|
|
fast_forward_to_commit/2,
|
|
|
- on_mria_stop/1
|
|
|
+ on_mria_stop/1,
|
|
|
+ wait_for_cluster_rpc/0,
|
|
|
+ maybe_init_tnx_id/2
|
|
|
]).
|
|
|
-export([
|
|
|
commit/2,
|
|
|
@@ -59,16 +61,17 @@
|
|
|
|
|
|
-export_type([tnx_id/0, succeed_num/0]).
|
|
|
|
|
|
--ifdef(TEST).
|
|
|
--compile(export_all).
|
|
|
--compile(nowarn_export_all).
|
|
|
--endif.
|
|
|
-
|
|
|
-boot_mnesia({mnesia, [boot]}).
|
|
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include("emqx_conf.hrl").
|
|
|
|
|
|
+-ifdef(TEST).
|
|
|
+-compile(export_all).
|
|
|
+-compile(nowarn_export_all).
|
|
|
+
|
|
|
+-endif.
|
|
|
+
|
|
|
-define(INITIATE(MFA), {initiate, MFA}).
|
|
|
-define(CATCH_UP, catch_up).
|
|
|
-define(TIMEOUT, timer:minutes(1)).
|
|
|
@@ -276,6 +279,20 @@ on_mria_stop(leave) ->
|
|
|
on_mria_stop(_) ->
|
|
|
ok.
|
|
|
|
|
|
%% @doc Wait for the cluster RPC shard and then for the cluster RPC tables.
%%
%% Workaround for https://github.com/emqx/mria/issues/94
%%
%% Each wait is reported through ?SLOG: `info' when the call returns `ok',
%% `error' (carrying the returned term under `result') for any other return
%% value. A non-`ok' return value does not abort the caller: the function
%% still proceeds to the next step and returns `ok'.
wait_for_cluster_rpc() ->
    %% Step 1: the shard, with a timeout argument of 1500.
    ShardLog = #{msg => "wait_for_cluster_rpc_shard"},
    ShardResult = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1500),
    case ShardResult of
        ok -> ?SLOG(info, ShardLog#{result => ok});
        _ -> ?SLOG(error, ShardLog#{result => ShardResult})
    end,
    %% Step 2: the MFA and commit tables.
    TablesLog = #{msg => "wait_for_cluster_rpc_tables"},
    TablesResult = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
    case TablesResult of
        ok -> ?SLOG(info, TablesLog#{result => ok});
        _ -> ?SLOG(error, TablesLog#{result => TablesResult})
    end,
    ok.
|
|
|
+
|
|
|
%%%===================================================================
|
|
|
%%% gen_server callbacks
|
|
|
%%%===================================================================
|
|
|
@@ -285,20 +302,19 @@ init([Node, RetryMs]) ->
|
|
|
register_mria_stop_cb(fun ?MODULE:on_mria_stop/1),
|
|
|
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
|
|
|
State = #{node => Node, retry_interval => RetryMs, is_leaving => false},
|
|
|
- %% The init transaction ID is set in emqx_conf_app after
|
|
|
- %% it has fetched the latest config from one of the core nodes
|
|
|
- TnxId = emqx_app:get_init_tnx_id(),
|
|
|
- ok = maybe_init_tnx_id(Node, TnxId),
|
|
|
%% Now continue with the normal catch-up process
|
|
|
%% That is: apply the missing transactions after the config
|
|
|
%% was copied until now.
|
|
|
- {ok, State, {continue, ?CATCH_UP}}.
|
|
|
+ {ok, State, {continue, {?CATCH_UP, init}}}.
|
|
|
|
|
|
%% @private
|
|
|
handle_continue({?CATCH_UP, init}, State) ->
    %% First catch-up, requested from init/1.
    %% The emqx app must be started before trying to catch up the
    %% rpc commit logs; the cluster RPC shard and tables are waited
    %% for as well. Both waits must return `ok' or this clause crashes.
    ok = wait_for_emqx_ready(),
    ok = wait_for_cluster_rpc(),
    %% From here on the initial catch-up is the same as a regular one.
    handle_continue(?CATCH_UP, State);
handle_continue(?CATCH_UP, State) ->
    %% The value returned by catch_up/1 is used as the third element of
    %% the gen_server return tuple; the state itself is left unchanged.
    Next = catch_up(State),
    {noreply, State, Next}.
|
|
|
|
|
|
handle_call(reset, _From, State) ->
|
|
|
@@ -388,7 +404,8 @@ read_next_mfa(Node) ->
|
|
|
}),
|
|
|
TnxId;
|
|
|
[#cluster_rpc_commit{tnx_id = LastAppliedID}] ->
|
|
|
- LastAppliedID + 1
|
|
|
+ OldestId = get_oldest_mfa_id(),
|
|
|
+ max(LastAppliedID + 1, OldestId)
|
|
|
end,
|
|
|
case mnesia:read(?CLUSTER_MFA, NextId) of
|
|
|
[] -> caught_up;
|
|
|
@@ -404,8 +421,7 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
|
|
|
true ->
|
|
|
NodeId;
|
|
|
false ->
|
|
|
- {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
|
|
|
- case LatestId =< NodeId of
|
|
|
+ case latest_tnx_id() =< NodeId of
|
|
|
true ->
|
|
|
NodeId;
|
|
|
false ->
|
|
|
@@ -420,6 +436,12 @@ get_cluster_tnx_id() ->
|
|
|
Id -> Id
|
|
|
end.
|
|
|
|
|
|
%% Returns the first key of the ?CLUSTER_MFA table, or 0 when the table
%% is empty.
%% NOTE(review): "oldest" presumably relies on ?CLUSTER_MFA being an
%% ordered_set keyed by transaction id, so that the first key is the
%% smallest one -- confirm against the table definition.
%% NOTE(review): mnesia:first/1 is expected to run inside a mnesia
%% activity; verify that every caller provides one.
get_oldest_mfa_id() ->
    FirstKey = mnesia:first(?CLUSTER_MFA),
    case FirstKey =:= '$end_of_table' of
        true -> 0;
        false -> FirstKey
    end.
|
|
|
+
|
|
|
%% The entry point of a config change transaction.
|
|
|
init_mfa(Node, MFA) ->
|
|
|
mnesia:write_lock_table(?CLUSTER_MFA),
|