Przeglądaj źródła

chore: add config leader to suggestion

zhongwencool 1 rok temu
rodzic
commit
bdf3fc63a6

+ 11 - 0
apps/emqx_conf/include/emqx_conf.hrl

@@ -35,6 +35,17 @@
     tnx_id :: pos_integer() | '$1'
 }).
 
+-define(SUGGESTION(Node),
+    lists:flatten(
+        io_lib:format(
+            "run `./bin/emqx_ctl conf cluster_sync fix`"
+            " on ~p(config leader) to force sync the configs,"
+            "when this node is lagging for more than 3 minutes,",
+            [Node]
+        )
+    )
+).
+
 -define(READONLY_KEYS, [cluster, rpc, node]).
 
 -endif.

+ 21 - 2
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -28,6 +28,7 @@
     reset/0,
     status/0,
     is_initiator/1,
+    find_leader/0,
     skip_failed_commit/1,
     fast_forward_to_commit/2,
     on_mria_stop/1,
@@ -227,6 +228,17 @@ status() ->
 is_initiator(Opts) ->
     ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE).
 
+find_leader() ->
+    {atomic, Status} = status(),
+    case Status of
+        [#{node := N} | _] ->
+            N;
+        [] ->
+            %% running nodes already sort.
+            [N | _] = emqx:running_nodes(),
+            N
+    end.
+
 %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560.
 on_leave_clean() ->
     on_leave_clean(node()).
@@ -500,10 +512,11 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) ->
     end.
 
 stale_view_of_cluster_msg(Meta, Count) ->
+    Node = find_leader(),
     Reason = Meta#{
         msg => stale_view_of_cluster,
         retry_times => Count,
-        suggested => "run `./bin/emqx_ctl conf cluster_sync fix` when suck for a long time"
+        suggestion => ?SUGGESTION(Node)
     },
     ?SLOG(warning, Reason),
     {error, Reason}.
@@ -537,7 +550,7 @@ transaction(Func, Args) ->
     mria:transaction(?CLUSTER_RPC_SHARD, Func, Args).
 
 trans_status() ->
-    mnesia:foldl(
+    List = mnesia:foldl(
         fun(Rec, Acc) ->
             #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
             case mnesia:read(?CLUSTER_MFA, TnxId) of
@@ -560,6 +573,12 @@ trans_status() ->
         end,
         [],
         ?CLUSTER_COMMIT
+    ),
+    lists:sort(
+        fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) ->
+            {IdA, NA} > {IdB, NB}
+        end,
+        List
     ).
 
 trans_query(TnxId) ->

+ 22 - 6
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -96,15 +96,23 @@ admins(["inspect", TnxId0]) ->
     TnxId = list_to_integer(TnxId0),
     print(emqx_cluster_rpc:query(TnxId));
 admins(["fix"]) ->
+    {atomic, Status} = emqx_cluster_rpc:status(),
     case mria_rlog:role() of
         core ->
-            {atomic, Status} = emqx_cluster_rpc:status(),
             #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(),
             maybe_fix_inconsistent(Status, #{fix => true}),
             StoppedNodes =/= [] andalso
                 emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]);
         Role ->
-            emqx_ctl:print("Run fix command on core node, but current is ~p~n", [Role])
+            Core =
+                case find_highest_node(Status) of
+                    {same_tnx_id, _TnxId} ->
+                        {ok, Node} = mria_status:upstream_node(?CLUSTER_RPC_SHARD),
+                        Node;
+                    {ok, Node} ->
+                        Node
+                end,
+            emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Core, Role])
     end;
 admins(["fast_forward"]) ->
     status(),
@@ -128,7 +136,14 @@ admins(_) ->
     emqx_ctl:usage(usage_sync()).
 
 fix_inconsistent_with_raw(Node, Keys) ->
-    Confs = [#{Key => emqx_conf_proto_v4:get_raw_config(Node, Key)} || Key <- Keys],
+    Confs = lists:foldl(
+        fun(Key, Acc) ->
+            KeyRaw = atom_to_binary(Key),
+            Acc#{KeyRaw => emqx_conf_proto_v4:get_raw_config(Node, [Key])}
+        end,
+        #{},
+        Keys
+    ),
     ok = emqx_cluster_rpc:reset(),
     case load_config_from_raw(Confs, #{mode => replace}) of
         ok -> waiting_for_fix_finish();
@@ -179,7 +194,7 @@ usage_sync() ->
             "WARNING: This results in inconsistent configs among the clustered nodes."},
         {"conf cluster_sync fix",
             "Sync the node with the most comprehensive configuration to other node.\n"
-            "WARNING: typically the one with the highest tnxid."}
+            "WARNING: typically the config leader(with the highest tnxid)."}
     ].
 
 status() ->
@@ -210,7 +225,8 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) ->
             emqx_ctl:print("Reset tnxid to 0 successfully~n");
         inconsistent_tnx_id ->
             print_tnx_id_status(Status),
-            emqx_ctl:print("run `./bin/emqx_ctl conf cluster_sync fix` when stuck for a long time");
+            Leader = emqx_cluster_rpc:find_leader(),
+            emqx_ctl:print(?SUGGESTION(Leader));
         {inconsistent_key, TnxId, InconsistentKeys} ->
             [{Target, _} | _] = AllConfs,
             print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
@@ -223,7 +239,7 @@ maybe_fix_inconsistent(Status, #{fix := Fix}) ->
             ),
             emqx_ctl:warning(
                 "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf"
-                " is allowed but not recommended. "
+                " is allowed but not recommended.~n"
             ),
             Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"),
             ok;

+ 1 - 1
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -148,7 +148,7 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
                 retry_times := 2,
                 cluster_tnx_id := 2,
                 node_tnx_id := 1,
-                suggested := _
+                suggestion := _
             }}},
         Res1
     ),

+ 7 - 7
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -314,15 +314,15 @@ global_zone_configs(get, _Params, _Req) ->
     {200, get_zones()};
 global_zone_configs(put, #{body := Body}, _Req) ->
     PrevZones = get_zones(),
-    Res =
+    {Res, Error} =
         maps:fold(
-            fun(Path, Value, Acc) ->
+            fun(Path, Value, {Acc, Error}) ->
                 PrevValue = maps:get(Path, PrevZones),
                 case Value =/= PrevValue of
                     true ->
                         case emqx_conf:update([Path], Value, ?OPTS) of
                             {ok, #{raw_config := RawConf}} ->
-                                Acc#{Path => RawConf};
+                                {Acc#{Path => RawConf}, Error};
                             {error, Reason} ->
                                 ?SLOG(error, #{
                                     msg => "update_global_zone_failed",
@@ -330,18 +330,18 @@ global_zone_configs(put, #{body := Body}, _Req) ->
                                     path => Path,
                                     value => Value
                                 }),
-                                Acc
+                                {Acc, Error#{Path => Reason}}
                         end;
                     false ->
-                        Acc#{Path => Value}
+                        {Acc#{Path => Value}, Error}
                 end
             end,
-            #{},
+            {#{}, #{}},
             Body
         ),
     case maps:size(Res) =:= maps:size(Body) of
         true -> {200, Res};
-        false -> {400, #{code => 'UPDATE_FAILED'}}
+        false -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Error)}}
     end.
 
 config_reset(post, _Params, Req) ->