Просмотр исходного кода

Merge pull request #12232 from zhongwencool/force-leave-clean-rpc-commit

fix: remove rpc commit info when force_leave cluster
JianBo He 2 лет назад
Родитель
Сommit
4ebf776f65

+ 12 - 1
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -30,6 +30,7 @@
     skip_failed_commit/1,
     fast_forward_to_commit/2,
     on_mria_stop/1,
+    force_leave_clean/1,
     wait_for_cluster_rpc/0,
     maybe_init_tnx_id/2
 ]).
@@ -44,6 +45,7 @@
     read_next_mfa/1,
     trans_query/1,
     trans_status/0,
+    on_leave_clean/1,
     on_leave_clean/0,
     get_commit_lag/0,
     get_commit_lag/1
@@ -220,7 +222,10 @@ status() ->
     transaction(fun ?MODULE:trans_status/0, []).
 
 on_leave_clean() ->
-    mnesia:delete({?CLUSTER_COMMIT, node()}).
+    on_leave_clean(node()).
+
+on_leave_clean(Node) ->
+    mnesia:delete({?CLUSTER_COMMIT, Node}).
 
 -spec latest_tnx_id() -> pos_integer().
 latest_tnx_id() ->
@@ -301,6 +306,12 @@ on_mria_stop(leave) ->
 on_mria_stop(_) ->
     ok.
 
+force_leave_clean(Node) ->
+    case transaction(fun ?MODULE:on_leave_clean/1, [Node]) of
+        {atomic, ok} -> ok;
+        {aborted, Reason} -> {error, Reason}
+    end.
+
 wait_for_cluster_rpc() ->
     %% Workaround for https://github.com/emqx/mria/issues/94:
     Msg1 = #{msg => "wait_for_cluster_rpc_shard"},

+ 1 - 1
apps/emqx_conf/src/emqx_conf.app.src

@@ -1,6 +1,6 @@
 {application, emqx_conf, [
     {description, "EMQX configuration management"},
-    {vsn, "0.1.32"},
+    {vsn, "0.1.33"},
     {registered, []},
     {mod, {emqx_conf_app, []}},
     {applications, [kernel, stdlib, emqx_ctl]},

+ 12 - 3
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -115,10 +115,19 @@ cluster(["leave"]) ->
             emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
     end;
 cluster(["force-leave", SNode]) ->
-    case mria:force_leave(ekka_node:parse_name(SNode)) of
+    Node = ekka_node:parse_name(SNode),
+    case mria:force_leave(Node) of
         ok ->
-            emqx_ctl:print("Remove the node from cluster successfully.~n"),
-            cluster(["status"]);
+            case emqx_cluster_rpc:force_leave_clean(Node) of
+                ok ->
+                    emqx_ctl:print("Remove the node from cluster successfully.~n"),
+                    cluster(["status"]);
+                {error, Reason} ->
+                    emqx_ctl:print(
+                        "Failed to remove the node from cluster_rpc.~n~p~n",
+                        [Reason]
+                    )
+            end;
         ignore ->
             emqx_ctl:print("Ignore.~n");
         {error, Error} ->

+ 46 - 0
apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl

@@ -46,11 +46,57 @@ t_broker(_Config) ->
     ok.
 
 t_cluster(_Config) ->
+    SelfNode = node(),
+    FakeNode = 'fake@127.0.0.1',
+    MFA = {io, format, [""]},
+    meck:new(mria_mnesia, [non_strict, passthrough, no_link]),
+    meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]),
+    {atomic, {ok, TnxId, _}} =
+        mria:transaction(
+            emqx_cluster_rpc_shard,
+            fun emqx_cluster_rpc:init_mfa/2,
+            [SelfNode, MFA]
+        ),
+    emqx_cluster_rpc:maybe_init_tnx_id(FakeNode, TnxId),
+    ?assertMatch(
+        {atomic, [
+            #{
+                node := SelfNode,
+                mfa := MFA,
+                created_at := _,
+                tnx_id := TnxId,
+                initiator := SelfNode
+            },
+            #{
+                node := FakeNode,
+                mfa := MFA,
+                created_at := _,
+                tnx_id := TnxId,
+                initiator := SelfNode
+            }
+        ]},
+        emqx_cluster_rpc:status()
+    ),
     %% cluster join <Node>        # Join the cluster
     %% cluster leave              # Leave the cluster
     %% cluster force-leave <Node> # Force the node leave from cluster
     %% cluster status             # Cluster status
     emqx_ctl:run_command(["cluster", "status"]),
+
+    emqx_ctl:run_command(["cluster", "force-leave", atom_to_list(FakeNode)]),
+    ?assertMatch(
+        {atomic, [
+            #{
+                node := SelfNode,
+                mfa := MFA,
+                created_at := _,
+                tnx_id := TnxId,
+                initiator := SelfNode
+            }
+        ]},
+        emqx_cluster_rpc:status()
+    ),
+    meck:unload(mria_mnesia),
     ok.
 
 t_clients(_Config) ->

+ 1 - 0
changes/ce/fix-12232.en.md

@@ -0,0 +1 @@
+Fixed issue where commit log info was incorrectly retained after forcefully leaving a cluster, removing unnecessary commit log data improves leave operation cleanup.