|
@@ -32,6 +32,7 @@
|
|
|
]).
|
|
]).
|
|
|
-export([
|
|
-export([
|
|
|
get_node_tnx_id/1,
|
|
get_node_tnx_id/1,
|
|
|
|
|
+ get_cluster_tnx_id/0,
|
|
|
latest_tnx_id/0,
|
|
latest_tnx_id/0,
|
|
|
make_initiate_call_req/3
|
|
make_initiate_call_req/3
|
|
|
]).
|
|
]).
|
|
@@ -60,9 +61,11 @@
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include("emqx_conf.hrl").
|
|
-include("emqx_conf.hrl").
|
|
|
|
|
|
|
|
--define(INITIATE(MFA, LatestIdLastSeen), {initiate, MFA, LatestIdLastSeen}).
|
|
|
|
|
|
|
+-define(INITIATE(MFA), {initiate, MFA}).
|
|
|
-define(CATCH_UP, catch_up).
|
|
-define(CATCH_UP, catch_up).
|
|
|
-define(TIMEOUT, timer:minutes(1)).
|
|
-define(TIMEOUT, timer:minutes(1)).
|
|
|
|
|
+-define(APPLY_KIND_REPLICATE, replicate).
|
|
|
|
|
+-define(APPLY_KIND_INITIATE, initiate).
|
|
|
|
|
|
|
|
-type tnx_id() :: pos_integer().
|
|
-type tnx_id() :: pos_integer().
|
|
|
|
|
|
|
@@ -74,7 +77,7 @@
|
|
|
| {peers_lagging, tnx_id(), Result, [node()]}.
|
|
| {peers_lagging, tnx_id(), Result, [node()]}.
|
|
|
|
|
|
|
|
-type multicall_return() :: multicall_return(_).
|
|
-type multicall_return() :: multicall_return(_).
|
|
|
--type init_call_req() :: ?INITIATE({module(), atom(), list()}, tnx_id()).
|
|
|
|
|
|
|
+-type init_call_req() :: ?INITIATE({module(), atom(), list()}).
|
|
|
|
|
|
|
|
%%%===================================================================
|
|
%%%===================================================================
|
|
|
%%% API
|
|
%%% API
|
|
@@ -146,25 +149,11 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req
|
|
|
%% return {init_failure, Error} when the initial MFA result is no ok or {ok, term()}.
|
|
%% return {init_failure, Error} when the initial MFA result is no ok or {ok, term()}.
|
|
|
%% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
|
|
%% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
|
|
|
-spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
|
|
-spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
|
|
|
-do_multicall(M, F, A, RequiredSyncs, Timeout) when
|
|
|
|
|
- RequiredSyncs =:= all orelse RequiredSyncs >= 1
|
|
|
|
|
-->
|
|
|
|
|
- %% Idealy 'LatestId' should be provided by the multicall originator,
|
|
|
|
|
- %% which is the viewer of the state e.g.
|
|
|
|
|
- %% * Sysadmin who issues CLI-commands or REST-API calls to make config changes
|
|
|
|
|
- %% * Dashboard viewer who is making decision based on what they can see from the UI
|
|
|
|
|
- %% To reach the ideal state, it would require adding transaction ID to each and
|
|
|
|
|
- %% every view/GET requests and also provide the ID as a part of the view/GET responses.
|
|
|
|
|
- %%
|
|
|
|
|
- %% To keep things simple, we try to get the 'old' view when a multicall request
|
|
|
|
|
- %% is received as early as possible.
|
|
|
|
|
- %%
|
|
|
|
|
- %% Reason to do this:
|
|
|
|
|
- %% The 'initiate' call handler tries to take a table lock (cluster-wide) before
|
|
|
|
|
- %% bumping the transaction ID. While waiting for the lock, the ID might have been
|
|
|
|
|
- %% bumpped by another node in the cluster.
|
|
|
|
|
- InitReq = make_initiate_call_req(M, F, A),
|
|
|
|
|
|
|
+do_multicall(M, F, A, RequiredSyncs, Timeout) ->
|
|
|
|
|
+ %% assert
|
|
|
|
|
+ true = (RequiredSyncs =:= all orelse RequiredSyncs >= 1),
|
|
|
Begin = erlang:monotonic_time(),
|
|
Begin = erlang:monotonic_time(),
|
|
|
|
|
+ InitReq = make_initiate_call_req(M, F, A),
|
|
|
InitRes =
|
|
InitRes =
|
|
|
case mria_rlog:role() of
|
|
case mria_rlog:role() of
|
|
|
core ->
|
|
core ->
|
|
@@ -216,13 +205,12 @@ status() ->
|
|
|
|
|
|
|
|
-spec latest_tnx_id() -> pos_integer().
|
|
-spec latest_tnx_id() -> pos_integer().
|
|
|
latest_tnx_id() ->
|
|
latest_tnx_id() ->
|
|
|
- {atomic, TnxId} = transaction(fun get_latest_id/0, []),
|
|
|
|
|
|
|
+ {atomic, TnxId} = transaction(fun get_cluster_tnx_id/0, []),
|
|
|
TnxId.
|
|
TnxId.
|
|
|
|
|
|
|
|
-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req().
|
|
-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req().
|
|
|
make_initiate_call_req(M, F, A) ->
|
|
make_initiate_call_req(M, F, A) ->
|
|
|
- TnxId = get_latest_id(dirty),
|
|
|
|
|
- ?INITIATE({M, F, A}, TnxId).
|
|
|
|
|
|
|
+ ?INITIATE({M, F, A}).
|
|
|
|
|
|
|
|
-spec get_node_tnx_id(node()) -> integer().
|
|
-spec get_node_tnx_id(node()) -> integer().
|
|
|
get_node_tnx_id(Node) ->
|
|
get_node_tnx_id(Node) ->
|
|
@@ -289,8 +277,8 @@ handle_call(reset, _From, State) ->
|
|
|
_ = mria:clear_table(?CLUSTER_COMMIT),
|
|
_ = mria:clear_table(?CLUSTER_COMMIT),
|
|
|
_ = mria:clear_table(?CLUSTER_MFA),
|
|
_ = mria:clear_table(?CLUSTER_MFA),
|
|
|
{reply, ok, State, {continue, ?CATCH_UP}};
|
|
{reply, ok, State, {continue, ?CATCH_UP}};
|
|
|
-handle_call(?INITIATE(MFA, LatestId), _From, State = #{node := Node}) ->
|
|
|
|
|
- case transaction(fun init_mfa/3, [Node, MFA, LatestId]) of
|
|
|
|
|
|
|
+handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
|
|
|
|
|
+ case transaction(fun init_mfa/2, [Node, MFA]) of
|
|
|
{atomic, {ok, TnxId, Result}} ->
|
|
{atomic, {ok, TnxId, Result}} ->
|
|
|
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
|
|
{reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
|
|
|
{aborted, Error} ->
|
|
{aborted, Error} ->
|
|
@@ -330,7 +318,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
|
|
|
{atomic, caught_up} ->
|
|
{atomic, caught_up} ->
|
|
|
?TIMEOUT;
|
|
?TIMEOUT;
|
|
|
{atomic, {still_lagging, NextId, MFA}} ->
|
|
{atomic, {still_lagging, NextId, MFA}} ->
|
|
|
- {Succeed, _} = apply_mfa(NextId, MFA, catch_up),
|
|
|
|
|
|
|
+ {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),
|
|
|
case Succeed orelse SkipResult of
|
|
case Succeed orelse SkipResult of
|
|
|
true ->
|
|
true ->
|
|
|
case transaction(fun commit/2, [Node, NextId]) of
|
|
case transaction(fun commit/2, [Node, NextId]) of
|
|
@@ -356,7 +344,7 @@ read_next_mfa(Node) ->
|
|
|
NextId =
|
|
NextId =
|
|
|
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
|
|
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
|
|
|
[] ->
|
|
[] ->
|
|
|
- LatestId = get_latest_id(),
|
|
|
|
|
|
|
+ LatestId = get_cluster_tnx_id(),
|
|
|
TnxId = max(LatestId - 1, 0),
|
|
TnxId = max(LatestId - 1, 0),
|
|
|
commit(Node, TnxId),
|
|
commit(Node, TnxId),
|
|
|
?SLOG(notice, #{
|
|
?SLOG(notice, #{
|
|
@@ -382,7 +370,7 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
|
|
|
true ->
|
|
true ->
|
|
|
NodeId;
|
|
NodeId;
|
|
|
false ->
|
|
false ->
|
|
|
- {atomic, LatestId} = transaction(fun get_latest_id/0, []),
|
|
|
|
|
|
|
+ {atomic, LatestId} = transaction(fun get_cluster_tnx_id/0, []),
|
|
|
case LatestId =< NodeId of
|
|
case LatestId =< NodeId of
|
|
|
true ->
|
|
true ->
|
|
|
NodeId;
|
|
NodeId;
|
|
@@ -392,24 +380,17 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-get_latest_id() ->
|
|
|
|
|
- get_latest_id(tnx).
|
|
|
|
|
-
|
|
|
|
|
-get_latest_id(IsolationLevel) ->
|
|
|
|
|
- F =
|
|
|
|
|
- case IsolationLevel of
|
|
|
|
|
- tnx -> fun mnesia:last/1;
|
|
|
|
|
- dirty -> fun mnesia:dirty_last/1
|
|
|
|
|
- end,
|
|
|
|
|
- case F(?CLUSTER_MFA) of
|
|
|
|
|
|
|
+get_cluster_tnx_id() ->
|
|
|
|
|
+ case mnesia:last(?CLUSTER_MFA) of
|
|
|
'$end_of_table' -> 0;
|
|
'$end_of_table' -> 0;
|
|
|
Id -> Id
|
|
Id -> Id
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-init_mfa(Node, MFA, LatestIdLastSeen) ->
|
|
|
|
|
|
|
+init_mfa(Node, MFA) ->
|
|
|
mnesia:write_lock_table(?CLUSTER_MFA),
|
|
mnesia:write_lock_table(?CLUSTER_MFA),
|
|
|
- LatestId = get_latest_id(),
|
|
|
|
|
- case LatestIdLastSeen =:= LatestId of
|
|
|
|
|
|
|
+ LatestId = get_cluster_tnx_id(),
|
|
|
|
|
+ MyTnxId = get_node_tnx_id(node()),
|
|
|
|
|
+ case MyTnxId =:= LatestId of
|
|
|
true ->
|
|
true ->
|
|
|
TnxId = LatestId + 1,
|
|
TnxId = LatestId + 1,
|
|
|
MFARec = #cluster_rpc_mfa{
|
|
MFARec = #cluster_rpc_mfa{
|
|
@@ -420,17 +401,21 @@ init_mfa(Node, MFA, LatestIdLastSeen) ->
|
|
|
},
|
|
},
|
|
|
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
|
|
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
|
|
|
ok = commit(Node, TnxId),
|
|
ok = commit(Node, TnxId),
|
|
|
- case apply_mfa(TnxId, MFA, init) of
|
|
|
|
|
|
|
+ case apply_mfa(TnxId, MFA, ?APPLY_KIND_INITIATE) of
|
|
|
{true, Result} -> {ok, TnxId, Result};
|
|
{true, Result} -> {ok, TnxId, Result};
|
|
|
{false, Error} -> mnesia:abort(Error)
|
|
{false, Error} -> mnesia:abort(Error)
|
|
|
end;
|
|
end;
|
|
|
false ->
|
|
false ->
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
|
|
+ %% refuse to initiate cluster call from this node
|
|
|
|
|
+ %% because it's likely that the caller is based on
|
|
|
|
|
+ %% a stale view.
|
|
|
|
|
+ Reason = #{
|
|
|
msg => stale_view_of_cluster_state,
|
|
msg => stale_view_of_cluster_state,
|
|
|
- tnx_id => LatestId,
|
|
|
|
|
- last_seen_tnx_id => LatestIdLastSeen
|
|
|
|
|
- }),
|
|
|
|
|
- mnesia:abort({error, stale_view_of_cluster_state})
|
|
|
|
|
|
|
+ cluster_tnx_id => LatestId,
|
|
|
|
|
+ node_tnx_id => MyTnxId
|
|
|
|
|
+ },
|
|
|
|
|
+ ?SLOG(warning, Reason),
|
|
|
|
|
+ mnesia:abort({error, Reason})
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
transaction(Func, Args) ->
|
|
transaction(Func, Args) ->
|
|
@@ -495,6 +480,15 @@ is_success(ok) -> true;
|
|
|
is_success({ok, _}) -> true;
|
|
is_success({ok, _}) -> true;
|
|
|
is_success(_) -> false.
|
|
is_success(_) -> false.
|
|
|
|
|
|
|
|
|
|
+log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) ->
|
|
|
|
|
+ %% no alarm or error log in case of failure at originating a new cluster-call
|
|
|
|
|
+ %% because nothing is committed
|
|
|
|
|
+ case IsSuccess of
|
|
|
|
|
+ true ->
|
|
|
|
|
+ ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_result", result => Res});
|
|
|
|
|
+ false ->
|
|
|
|
|
+ ?SLOG(warning, Meta#{msg => "cluster_rpc_apply_result", result => Res})
|
|
|
|
|
+ end;
|
|
|
log_and_alarm(true, Res, Meta) ->
|
|
log_and_alarm(true, Res, Meta) ->
|
|
|
?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => Res}),
|
|
?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => Res}),
|
|
|
do_alarm(deactivate, Res, Meta);
|
|
do_alarm(deactivate, Res, Meta);
|