|
@@ -228,7 +228,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
|
|
|
{atomic, ok} -> catch_up(State, false);
|
|
{atomic, ok} -> catch_up(State, false);
|
|
|
Error ->
|
|
Error ->
|
|
|
?SLOG(error, #{
|
|
?SLOG(error, #{
|
|
|
- msg => "failed to commit applied call",
|
|
|
|
|
|
|
+ msg => "failed_to_commit_applied_call",
|
|
|
applied_id => NextId,
|
|
applied_id => NextId,
|
|
|
error => Error}),
|
|
error => Error}),
|
|
|
RetryMs
|
|
RetryMs
|
|
@@ -359,28 +359,34 @@ apply_mfa(TnxId, {M, F, A}) ->
|
|
|
Res =
|
|
Res =
|
|
|
try erlang:apply(M, F, A)
|
|
try erlang:apply(M, F, A)
|
|
|
catch
|
|
catch
|
|
|
- Class:Reason:Stacktrace ->
|
|
|
|
|
|
|
+ throw : Reason ->
|
|
|
|
|
+ {error, #{reason => Reason}};
|
|
|
|
|
+ Class : Reason : Stacktrace ->
|
|
|
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
|
|
{error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}}
|
|
|
end,
|
|
end,
|
|
|
- Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)},
|
|
|
|
|
|
|
+ %% Do not log args as it might be sensitive information
|
|
|
|
|
+ Meta = #{tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))},
|
|
|
IsSuccess = is_success(Res),
|
|
IsSuccess = is_success(Res),
|
|
|
- log_and_alarm(IsSuccess, Res, Meta, TnxId),
|
|
|
|
|
|
|
+ log_and_alarm(IsSuccess, Res, Meta),
|
|
|
{IsSuccess, Res}.
|
|
{IsSuccess, Res}.
|
|
|
|
|
|
|
|
|
|
+format_mfa(M, F, A) ->
|
|
|
|
|
+ iolist_to_binary([atom_to_list(M), ":", atom_to_list(F), "/", integer_to_list(A)]).
|
|
|
|
|
+
|
|
|
is_success(ok) -> true;
|
|
is_success(ok) -> true;
|
|
|
is_success({ok, _}) -> true;
|
|
is_success({ok, _}) -> true;
|
|
|
is_success(_) -> false.
|
|
is_success(_) -> false.
|
|
|
|
|
|
|
|
-log_and_alarm(true, Res, Meta, TnxId) ->
|
|
|
|
|
- OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res},
|
|
|
|
|
- ?SLOG(debug, OkMeta),
|
|
|
|
|
- Message = ["cluster_rpc_apply_failed:", integer_to_binary(TnxId)],
|
|
|
|
|
- emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)}, Message);
|
|
|
|
|
-log_and_alarm(false, Res, Meta, TnxId) ->
|
|
|
|
|
- NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res},
|
|
|
|
|
- ?SLOG(error, NotOkMeta),
|
|
|
|
|
- Message = ["cluster_rpc_apply_failed:", integer_to_binary(TnxId)],
|
|
|
|
|
- emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}, Message).
|
|
|
|
|
|
|
+log_and_alarm(true, Res, Meta) ->
|
|
|
|
|
+ ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => Res}),
|
|
|
|
|
+ do_alarm(deactivate, Res, Meta);
|
|
|
|
|
+log_and_alarm(false, Res, Meta) ->
|
|
|
|
|
+ ?SLOG(error, Meta#{msg => "cluster_rpc_apply_failed", result => Res}),
|
|
|
|
|
+ do_alarm(activate, Res, Meta).
|
|
|
|
|
+
|
|
|
|
|
+do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
|
|
|
|
|
+ AlarmMsg = ["cluster_rpc_apply_failed=", integer_to_list(Id)],
|
|
|
|
|
+ emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).
|
|
|
|
|
|
|
|
wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
|
|
wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
|
|
|
case lagging_node(TnxId) of
|
|
case lagging_node(TnxId) of
|