Bläddra i källkod

test: add cluster_sync cli test

zhongwencool 1 år sedan
förälder
incheckning
457ea93570

+ 14 - 8
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -26,7 +26,7 @@
     conf/1,
     audit/3,
     unload/0,
-    mark_fix_log/1
+    mark_fix_log/2
 ]).
 
 -export([keys/0, get_config/0, get_config/1, load_config/2]).
@@ -103,7 +103,8 @@ admins(["fix"]) ->
             #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
             maybe_fix_lagging(Status, #{fix => true}),
             StoppedNodes =/= [] andalso
-                emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]);
+                emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]),
+            ok;
         Role ->
             Leader = emqx_cluster_rpc:find_leader(),
             emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role])
@@ -153,8 +154,8 @@ mark_fix_begin(Node, TnxId) ->
     MFA = {?MODULE, mark_fix_log, [Status]},
     emqx_cluster_rpc:update_mfa(Node, MFA, TnxId).
 
-mark_fix_log(Status) ->
-    ?SLOG(warning, #{msg => cluster_fix_log, status => Status}),
+mark_fix_log(Status, Opts) ->
+    ?SLOG(warning, #{msg => cluster_fix_log, status => Status, opts => Opts}),
     ok.
 
 audit(Level, From, Log) ->
@@ -226,11 +227,16 @@ maybe_fix_lagging(Status, #{fix := Fix}) ->
         {inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} ->
             emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]),
             print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs);
-        {inconsistent_tnx_id, Target, ToTnxId} when Fix ->
+        {inconsistent_tnx_id, ToTnxId, Target} when Fix ->
             print_tnx_id_status(Status),
-            ok = mark_fix_begin(Target, ToTnxId),
-            emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]);
-        {inconsistent_tnx_id, _Target, _ToTnxId} ->
+            case mark_fix_begin(Target, ToTnxId) of
+                ok ->
+                    waiting_for_fix_finish(),
+                    emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]);
+                Error ->
+                    Error
+            end;
+        {inconsistent_tnx_id, _ToTnxId, _Target} ->
             print_tnx_id_status(Status),
             Leader = emqx_cluster_rpc:find_leader(),
             emqx_ctl:print(?SUGGESTION(Leader));

+ 158 - 0
apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl

@@ -0,0 +1,158 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_conf_cluster_sync_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include("emqx_conf.hrl").
+
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    WorkDir = ?config(priv_dir, Config),
+    Cluster = mk_cluster_spec(#{}),
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
+    [{cluster_nodes, Nodes} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)).
+
+t_fix(Config) ->
+    [Node1, Node2] = ?config(cluster_nodes, Config),
+    ?ON(Node1, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())),
+    ?ON(Node2, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())),
+    ?ON(Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 100}, #{})),
+    ?assertEqual(100, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
+    ?assertEqual(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
+    ?ON(
+        Node1,
+        ?assertMatch(
+            {atomic, [
+                #{node := Node2, tnx_id := 1},
+                #{node := Node1, tnx_id := 1}
+            ]},
+            emqx_cluster_rpc:status()
+        )
+    ),
+    %% fix normal, nothing changed
+    ?ON(Node1, begin
+        ok = emqx_conf_cli:admins(["fix"]),
+        ?assertMatch(
+            {atomic, [
+                #{node := Node2, tnx_id := 1},
+                #{node := Node1, tnx_id := 1}
+            ]},
+            emqx_cluster_rpc:status()
+        )
+    end),
+    %% fix inconsistent_key. tnx_id is the same, so nothing changed.
+    emqx_conf_proto_v4:update(Node1, [<<"mqtt">>], #{<<"max_topic_levels">> => 99}, #{}),
+    ?ON(Node1, begin
+        ok = emqx_conf_cli:admins(["fix"]),
+        ?assertMatch(
+            {atomic, [
+                #{node := Node2, tnx_id := 1},
+                #{node := Node1, tnx_id := 1}
+            ]},
+            emqx_cluster_rpc:status()
+        )
+    end),
+    ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
+    ?assertMatch(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
+
+    %% fix inconsistent_tnx_id_key. tnx_id and key are updated.
+    ?ON(Node1, fake_mfa(2, Node1, {?MODULE, undef, []})),
+    %% 2 -> fake_mfa, 3-> mark_begin_log, 4-> mqtt 5 -> zones
+    ?ON(Node2, begin
+        ok = emqx_conf_cli:admins(["fix"]),
+        ?assertMatch(
+            {atomic, [
+                #{node := Node2, tnx_id := 5},
+                #{node := Node1, tnx_id := 5}
+            ]},
+            emqx_cluster_rpc:status()
+        )
+    end),
+    ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
+    ?assertMatch(99, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
+
+    %% fix inconsistent_tnx_id. tnx_id is updated.
+    {ok, _} = ?ON(
+        Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 98}, #{})
+    ),
+    ?ON(Node2, fake_mfa(7, Node2, {?MODULE, undef1, []})),
+    ?ON(Node1, begin
+        ok = emqx_conf_cli:admins(["fix"]),
+        ?assertMatch(
+            {atomic, [
+                #{node := Node2, tnx_id := 8},
+                #{node := Node1, tnx_id := 8}
+            ]},
+            emqx_cluster_rpc:status()
+        )
+    end),
+    ?assertMatch(98, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])),
+    ?assertMatch(98, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])),
+    %% unchanged
+    ?ON(Node1, begin
+        ok = emqx_conf_cli:admins(["fix"]),
+        ?assertMatch(
+            {atomic, [
+                #{node := Node2, tnx_id := 8},
+                #{node := Node1, tnx_id := 8}
+            ]},
+            emqx_cluster_rpc:status()
+        )
+    end),
+    ok.
+
+fake_mfa(TnxId, Node, MFA) ->
+    Func = fun() ->
+        MFARec = #cluster_rpc_mfa{
+            tnx_id = TnxId,
+            mfa = MFA,
+            initiator = Node,
+            created_at = erlang:localtime()
+        },
+        ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
+        ok = emqx_cluster_rpc:commit(Node, TnxId)
+    end,
+    {atomic, ok} = mria:transaction(?CLUSTER_RPC_SHARD, Func, []),
+    ok.
+
+mk_cluster_spec(Opts) ->
+    Conf = #{
+        listeners => #{
+            tcp => #{default => <<"marked_for_deletion">>},
+            ssl => #{default => <<"marked_for_deletion">>},
+            ws => #{default => <<"marked_for_deletion">>},
+            wss => #{default => <<"marked_for_deletion">>}
+        }
+    },
+    Apps = [
+        {emqx, #{config => Conf}},
+        {emqx_conf, #{config => Conf}}
+    ],
+    [
+        {emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Apps}},
+        {emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Apps}}
+    ].