Преглед изворни кода

Merge pull request #6350 from zhongwencool/emqx-conf

feat: support eqmx_ctl cluster_call command.
zhongwencool пре 4 година
родитељ
комит
85a6f0f1e8

+ 1 - 0
apps/emqx/test/emqx_trace_handler_SUITE.erl

@@ -199,6 +199,7 @@ t_trace_ip_address(_Config) ->
     ?assertEqual([], emqx_trace_handler:running()).
 
 filesync(Name, Type) ->
+    ct:sleep(50),
     filesync(Name, Type, 3).
 
 %% sometime the handler process is not started yet.

+ 55 - 11
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -18,8 +18,9 @@
 
 %% API
 -export([start_link/0, mnesia/1]).
--export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]).
--export([get_node_tnx_id/1]).
+-export([multicall/3, multicall/5, query/1, reset/0, status/0,
+         skip_failed_commit/1, fast_forward_to_commit/2]).
+-export([get_node_tnx_id/1, latest_tnx_id/0]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     handle_continue/2, code_change/3]).
@@ -60,21 +61,28 @@ start_link() ->
 start_link(Node, Name, RetryMs) ->
     gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
 
--spec multicall(Module, Function, Args) -> {ok, TnxId, term()} | {error, Reason} when
+%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
+%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}.
+%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
+-spec multicall(Module, Function, Args) ->
+    {ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when
     Module :: module(),
     Function :: atom(),
     Args :: [term()],
+    MFARes :: term(),
     TnxId :: pos_integer(),
     Reason :: string().
 multicall(M, F, A) ->
     multicall(M, F, A, all, timer:minutes(2)).
 
--spec multicall(Module, Function, Args, SucceedNum, Timeout) -> {ok, TnxId, term()} |{error, Reason} when
+-spec multicall(Module, Function, Args, SucceedNum, Timeout) ->
+    {ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when
     Module :: module(),
     Function :: atom(),
     Args :: [term()],
     SucceedNum :: pos_integer() | all,
     TnxId :: pos_integer(),
+    MFARes :: term(),
     Timeout :: timeout(),
     Reason :: string().
 multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
@@ -108,7 +116,10 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
         end,
     case OkOrFailed of
         ok -> InitRes;
-        _ -> OkOrFailed
+        {error, Error0} -> {error, Error0};
+        {retry, Node0} ->
+            {ok, TnxId0, MFARes} = InitRes,
+            {retry, TnxId0, MFARes, Node0}
     end.
 
 -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@@ -122,6 +133,11 @@ reset() -> gen_server:call(?MODULE, reset).
 status() ->
     transaction(fun trans_status/0, []).
 
+-spec latest_tnx_id() -> pos_integer().
+latest_tnx_id() ->
+    {atomic, TnxId} = transaction(fun get_latest_id/0, []),
+    TnxId.
+
 -spec get_node_tnx_id(node()) -> integer().
 get_node_tnx_id(Node) ->
     case mnesia:wread({?CLUSTER_COMMIT, Node}) of
@@ -136,6 +152,13 @@ get_node_tnx_id(Node) ->
 skip_failed_commit(Node) ->
     gen_server:call({?MODULE, Node}, skip_failed_commit).
 
+%% Regardless of what MFA is returned, consider it a success),
+%% then skip the specified TnxId.
+%% If CurrTnxId >= TnxId, nothing happened.
+%% If CurrTnxId < TnxId, the CurrTnxId will skip to TnxId.
+-spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer().
+fast_forward_to_commit(Node, ToTnxId) ->
+    gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}).
 %%%===================================================================
 %%% gen_server callbacks
 %%%===================================================================
@@ -165,8 +188,13 @@ handle_call({initiate, MFA}, _From, State = #{node := Node}) ->
         {aborted, Reason} ->
             {reply, {error, Reason}, State, {continue, ?CATCH_UP}}
     end;
-handle_call(skip_failed_commit, _From, State) ->
-    {reply, ok, State, catch_up(State, true)};
+handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
+    Timeout = catch_up(State, true),
+    {atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]),
+    {reply, LatestId, State, Timeout};
+handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
+    NodeId = do_fast_forward_to_commit(ToTnxId, State),
+    {reply, NodeId, State, catch_up(State)};
 handle_call(_, _From, State) ->
     {reply, ok, State, catch_up(State)}.
 
@@ -245,7 +273,8 @@ do_catch_up(ToTnxId, Node) ->
                 {false, Error} -> mnesia:abort(Error)
             end;
         [#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
-            Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
+            Reason = lists:flatten(
+                io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
                 [Node, LastAppliedId, ToTnxId])),
             ?SLOG(error, #{
                 msg => "catch up failed!",
@@ -258,6 +287,20 @@ do_catch_up(ToTnxId, Node) ->
 commit(Node, TnxId) ->
     ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
 
+do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
+    {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]),
+    case NodeId >= ToTnxId of
+        true -> NodeId;
+        false ->
+            {atomic, LatestId} = transaction(fun get_latest_id/0, []),
+            case LatestId =< NodeId of
+                true -> NodeId;
+                false ->
+                    catch_up(State, true),
+                    do_fast_forward_to_commit(ToTnxId, State)
+            end
+    end.
+
 get_latest_id() ->
     case mnesia:last(?CLUSTER_MFA) of
         '$end_of_table' -> 0;
@@ -269,7 +312,8 @@ init_mfa(Node, MFA) ->
     LatestId = get_latest_id(),
     ok = do_catch_up_in_one_trans(LatestId, Node),
     TnxId = LatestId + 1,
-    MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()},
+    MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA,
+        initiator = Node, created_at = erlang:localtime()},
     ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
     ok = commit(Node, TnxId),
     case apply_mfa(TnxId, MFA) of
@@ -344,7 +388,7 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
             ok = timer:sleep(Delay),
             wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
         [] -> ok;
-        Nodes -> {error, Nodes}
+        Nodes -> {retry, Nodes}
     end.
 
 wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
@@ -356,7 +400,7 @@ wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) ->
         false ->
             case lagging_node(TnxId) of
                 [] -> ok; %% All commit but The succeedNum > length(nodes()).
-                Nodes -> {error, Nodes}
+                Nodes -> {retry, Nodes}
             end
     end.
 

+ 10 - 4
apps/emqx_conf/src/emqx_conf.erl

@@ -16,6 +16,7 @@
 -module(emqx_conf).
 
 -compile({no_auto_import, [get/1, get/2]}).
+-include_lib("emqx/include/logger.hrl").
 
 -export([add_handler/2, remove_handler/1]).
 -export([get/1, get/2, get_raw/2, get_all/1]).
@@ -123,13 +124,18 @@ reset(Node, KeyPath, Opts) ->
     rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]).
 
 %%--------------------------------------------------------------------
-%% Internal funcs
+%% Internal functions
 %%--------------------------------------------------------------------
 
 multicall(M, F, Args) ->
     case emqx_cluster_rpc:multicall(M, F, Args) of
-        {ok, _TnxId, Res} ->
+        {ok, _TnxId, Res} -> Res;
+        {retry, TnxId, Res, Nodes} ->
+            %% The init MFA return ok, but other nodes failed.
+            %% We return ok and alert an alarm.
+            ?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes,
+                tnx_id => TnxId, mfa => {M, F, Args}}),
             Res;
-        {error, Reason} ->
-            {error, Reason}
+        {error, Error} -> %% all MFA return not ok or {ok, term()}.
+            Error
     end.

+ 92 - 0
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -0,0 +1,92 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_conf_cli).
+-export([ load/0
+        , admins/1
+        , unload/0
+        ]).
+
+-define(CMD, cluster_call).
+
+load() ->
+    emqx_ctl:register_command(?CMD, {?MODULE, admins}, []).
+
+unload() ->
+    emqx_ctl:unregister_command(?CMD).
+
+admins(["status"]) -> status();
+
+admins(["skip"]) ->
+    status(),
+    Nodes = mria_mnesia:running_nodes(),
+    lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1,  Nodes),
+    status();
+
+admins(["skip", Node0]) ->
+    status(),
+    Node = list_to_existing_atom(Node0),
+    emqx_cluster_rpc:skip_failed_commit(Node),
+    status();
+
+admins(["tnxid", TnxId0]) ->
+    TnxId = list_to_integer(TnxId0),
+    emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
+
+admins(["fast_forward"]) ->
+    status(),
+    Nodes = mria_mnesia:running_nodes(),
+    TnxId = emqx_cluster_rpc:latest_tnx_id(),
+    lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
+    status();
+
+admins(["fast_forward", ToTnxId]) ->
+    status(),
+    Nodes = mria_mnesia:running_nodes(),
+    TnxId = list_to_integer(ToTnxId),
+    lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
+    status();
+
+admins(["fast_forward", Node0, ToTnxId]) ->
+    status(),
+    TnxId = list_to_integer(ToTnxId),
+    Node = list_to_existing_atom(Node0),
+    emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId),
+    status();
+
+admins(_) ->
+    emqx_ctl:usage(
+      [
+          {"cluster_call status",  "status"},
+          {"cluster_call skip [node]", "increase one commit on specific node"},
+          {"cluster_call tnxid <TnxId>", "get detailed about TnxId"},
+          {"cluster_call  fast_forward [node] [tnx_id]", "fast forwards to tnx_id" }
+      ]).
+
+status() ->
+    emqx_ctl:print("-----------------------------------------------\n"),
+    {atomic, Status} = emqx_cluster_rpc:status(),
+    lists:foreach(fun(S) ->
+        #{
+            node := Node,
+            tnx_id := TnxId,
+            mfa := {M, F, A},
+            created_at := CreatedAt
+        } = S,
+        emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n",
+            [Node, TnxId, CreatedAt, M, F, length(A)])
+                  end, Status),
+    emqx_ctl:print("-----------------------------------------------\n").

+ 28 - 3
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -33,7 +33,8 @@ all() -> [
     t_commit_ok_but_apply_fail_on_other_node,
     t_commit_ok_apply_fail_on_other_node_then_recover,
     t_del_stale_mfa,
-    t_skip_failed_commit
+    t_skip_failed_commit,
+    t_fast_forward_commit
 ].
 suite() -> [{timetrap, {minutes, 3}}].
 groups() -> [].
@@ -183,13 +184,37 @@ t_skip_failed_commit(_Config) ->
     ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
         tnx_ids(List1)),
     {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
-    {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
-    ok = gen_server:call(?NODE2, skip_failed_commit, 5000),
+    {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
+    2 = gen_server:call(?NODE2, skip_failed_commit, 5000),
     {atomic, List2} = emqx_cluster_rpc:status(),
     ?assertEqual([{Node, 2}, {{Node, ?NODE2}, 2}, {{Node, ?NODE3}, 1}],
         tnx_ids(List2)),
     ok.
 
+t_fast_forward_commit(_Config) ->
+    emqx_cluster_rpc:reset(),
+    {atomic, []} = emqx_cluster_rpc:status(),
+    {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
+    ct:sleep(180),
+    {atomic, List1} = emqx_cluster_rpc:status(),
+    Node = node(),
+    ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
+        tnx_ids(List1)),
+    {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]},
+    {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
+    {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
+    {ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
+    {ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
+    {retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000),
+    3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000),
+    4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000),
+    6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000),
+    2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000),
+    {atomic, List2} = emqx_cluster_rpc:status(),
+    ?assertEqual([{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}],
+        tnx_ids(List2)),
+    ok.
+
 tnx_ids(Status) ->
     lists:sort(lists:map(fun(#{tnx_id := TnxId, node := Node}) ->
         {Node, TnxId} end, Status)).

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_conf.erl

@@ -232,7 +232,7 @@ update(Req) ->
     res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
 
 res({ok, _Result}) -> ok;
-res({error, {error, {pre_config_update,emqx_gateway_conf,Reason}}}) -> {error, Reason};
+res({error, {pre_config_update, emqx_gateway_conf, Reason}}) -> {error, Reason};
 res({error, Reason}) -> {error, Reason}.
 
 bin({LType, LName}) ->

+ 2 - 0
apps/emqx_modules/src/emqx_modules_app.erl

@@ -36,6 +36,7 @@ maybe_enable_modules() ->
     emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
     emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
     emqx_event_message:enable(),
+    emqx_conf_cli:load(),
     ok = emqx_rewrite:enable(),
     emqx_topic_metrics:enable().
 
@@ -45,4 +46,5 @@ maybe_disable_modules() ->
     emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
     emqx_event_message:disable(),
     emqx_rewrite:disable(),
+    emqx_conf_cli:unload(),
     emqx_topic_metrics:disable().

+ 5 - 6
apps/emqx_modules/test/emqx_rewrite_SUITE.erl

@@ -168,12 +168,11 @@ t_update_re_failed(_Config) ->
     }],
     Error = {badmatch,
         {error,
-            {error,
-                {emqx_modules_schema,
-                    [{validation_error,
-                        #{array_index => 1,path => "rewrite.re",
-                            reason => {<<"*^test/*">>,{"nothing to repeat",0}},
-                            value => <<"*^test/*">>}}]}}}},
+            {emqx_modules_schema,
+                [{validation_error,
+                    #{array_index => 1,path => "rewrite.re",
+                        reason => {<<"*^test/*">>,{"nothing to repeat",0}},
+                        value => <<"*^test/*">>}}]}}},
     ?assertError(Error, emqx_rewrite:update(Rules)),
     ok.