Procházet zdrojové kódy

feat(conf): skip/fast_forward tnx_id via cluster_call cli

zhongwencool před 4 roky
rodič
revize
e7a7d64004

+ 10 - 3
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -18,8 +18,9 @@
 
 %% API
 -export([start_link/0, mnesia/1]).
--export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]).
--export([get_node_tnx_id/1]).
+-export([multicall/3, multicall/5, query/1, reset/0, status/0,
+         skip_failed_commit/1, fast_forward_to_commit/2]).
+-export([get_node_tnx_id/1, latest_tnx_id/0]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     handle_continue/2, code_change/3]).
@@ -132,6 +133,11 @@ reset() -> gen_server:call(?MODULE, reset).
 status() ->
     transaction(fun trans_status/0, []).
 
+-spec latest_tnx_id() -> pos_integer().
+latest_tnx_id() ->
+    {atomic, TnxId} = transaction(fun get_latest_id/0, []),
+    TnxId.
+
 -spec get_node_tnx_id(node()) -> integer().
 get_node_tnx_id(Node) ->
     case mnesia:wread({?CLUSTER_COMMIT, Node}) of
@@ -267,7 +273,8 @@ do_catch_up(ToTnxId, Node) ->
                 {false, Error} -> mnesia:abort(Error)
             end;
         [#cluster_rpc_commit{tnx_id = LastAppliedId}] ->
-            Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
+            Reason = lists:flatten(
+                io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
                 [Node, LastAppliedId, ToTnxId])),
             ?SLOG(error, #{
                 msg => "catch up failed!",

+ 92 - 0
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -0,0 +1,92 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_conf_cli).
+-export([ load/0
+        , admins/1
+        , unload/0
+        ]).
+
+-define(CMD, cluster_call).
+
+load() ->
+    emqx_ctl:register_command(?CMD, {?MODULE, admins}, []).
+
+unload() ->
+    emqx_ctl:unregister_command(?CMD).
+
+admins(["status"]) -> status();
+
+admins(["skip"]) ->
+    status(),
+    Nodes = mria_mnesia:running_nodes(),
+    lists:foreach(fun emqx_cluster_rpc:skip_failed_commit/1,  Nodes),
+    status();
+
+admins(["skip", Node0]) ->
+    status(),
+    Node = list_to_existing_atom(Node0),
+    emqx_cluster_rpc:skip_failed_commit(Node),
+    status();
+
+admins(["tnxid", TnxId0]) ->
+    TnxId = list_to_integer(TnxId0),
+    emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
+
+admins(["fast_forward"]) ->
+    status(),
+    Nodes = mria_mnesia:running_nodes(),
+    TnxId = emqx_cluster_rpc:latest_tnx_id(),
+    lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
+    status();
+
+admins(["fast_forward", ToTnxId]) ->
+    status(),
+    Nodes = mria_mnesia:running_nodes(),
+    TnxId = list_to_integer(ToTnxId),
+    lists:foreach(fun(N) -> emqx_cluster_rpc:fast_forward_to_commit(N, TnxId) end, Nodes),
+    status();
+
+admins(["fast_forward", Node0, ToTnxId]) ->
+    status(),
+    TnxId = list_to_integer(ToTnxId),
+    Node = list_to_existing_atom(Node0),
+    emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId),
+    status();
+
+admins(_) ->
+    emqx_ctl:usage(
+      [
+          {"cluster_call status",  "status"},
+          {"cluster_call skip [node]", "increase one commit on specific node"},
+          {"cluster_call tnxid <TnxId>", "get detailed about TnxId"},
+          {"cluster_call  fast_forward [node] [tnx_id]", "fast forwards to tnx_id" }
+      ]).
+
+status() ->
+    emqx_ctl:print("-----------------------------------------------\n"),
+    {atomic, Status} = emqx_cluster_rpc:status(),
+    lists:foreach(fun(S) ->
+        #{
+            node := Node,
+            tnx_id := TnxId,
+            mfa := {M, F, A},
+            created_at := CreatedAt
+        } = S,
+        emqx_ctl:print("~p:[~w] CreatedAt:~p ~p:~p/~w\n",
+            [Node, TnxId, CreatedAt, M, F, length(A)])
+                  end, Status),
+    emqx_ctl:print("-----------------------------------------------\n").

+ 3 - 2
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -205,12 +205,13 @@ t_fast_forward_commit(_Config) ->
     {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
     {ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
     {ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000),
+    {retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000),
     3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000),
     4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000),
-    5 = gen_server:call(?NODE2, {fast_forward_to_commit, 6}, 5000),
+    6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000),
     2 = gen_server:call(?NODE3, {fast_forward_to_commit, 2}, 5000),
     {atomic, List2} = emqx_cluster_rpc:status(),
-    ?assertEqual([{Node, 5}, {{Node, ?NODE2}, 5}, {{Node, ?NODE3}, 2}],
+    ?assertEqual([{Node, 6}, {{Node, ?NODE2}, 6}, {{Node, ?NODE3}, 2}],
         tnx_ids(List2)),
     ok.
 

+ 2 - 0
apps/emqx_modules/src/emqx_modules_app.erl

@@ -36,6 +36,7 @@ maybe_enable_modules() ->
     emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
     emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
     emqx_event_message:enable(),
+    emqx_conf_cli:load(),
     ok = emqx_rewrite:enable(),
     emqx_topic_metrics:enable().
 
@@ -45,4 +46,5 @@ maybe_disable_modules() ->
     emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
     emqx_event_message:disable(),
     emqx_rewrite:disable(),
+    emqx_conf_cli:unload(),
     emqx_topic_metrics:disable().