فهرست منبع

Merge remote-tracking branch 'origin/release-58' into sync-release-58-20240826-021952

id 1 سال پیش
والد
کامیت
f36108ce06

+ 23 - 9
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -606,20 +606,34 @@ trans_status() ->
         [],
         ?CLUSTER_COMMIT
     ),
-    Nodes = mria:running_nodes(),
-    IndexNodes = lists:zip(Nodes, lists:seq(1, length(Nodes))),
+    Cores = lists:sort(mria:cluster_nodes(cores)),
+    RunningNodes = mria:running_nodes(),
+    %% Make sure cores is ahead of replicants
+    Replicants = lists:subtract(RunningNodes, Cores),
+    Nodes = lists:append(Cores, Replicants),
+    {NodeIndices, _} = lists:foldl(
+        fun(N, {Acc, Seq}) ->
+            {maps:put(N, Seq, Acc), Seq + 1}
+        end,
+        {#{}, 1},
+        Nodes
+    ),
     lists:sort(
-        fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) ->
-            {IdA, index_nodes(NA, IndexNodes)} > {IdB, index_nodes(NB, IndexNodes)}
+        fun(A, B) ->
+            compare_tnx_id_and_node(A, B, NodeIndices)
         end,
         List
     ).
 
-index_nodes(Node, IndexNodes) ->
-    case lists:keyfind(Node, 1, IndexNodes) of
-        false -> 0;
-        {_, Index} -> Index
-    end.
+compare_tnx_id_and_node(
+    #{tnx_id := Id, node := NA},
+    #{tnx_id := Id, node := NB},
+    NodeIndices
+    %% The smaller the seq, the higher the priority level.
+) ->
+    maps:get(NA, NodeIndices, undefined) < maps:get(NB, NodeIndices, undefined);
+compare_tnx_id_and_node(#{tnx_id := IdA}, #{tnx_id := IdB}, _NodeIndices) ->
+    IdA > IdB.
 
 trans_query(TnxId) ->
     case mnesia:read(?CLUSTER_MFA, TnxId) of

+ 27 - 15
apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl

@@ -85,17 +85,31 @@ t_base_test(_Config) ->
     ?assertEqual(ok, receive_msg(3, Msg)),
     ?assertEqual({ok, 2, ok}, multicall(M, F, A)),
     {atomic, Status} = emqx_cluster_rpc:status(),
+    Node = node(),
     case length(Status) =:= 3 of
         true ->
-            ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status));
+            ?assertMatch(
+                [
+                    #{node := Node, tnx_id := 2},
+                    #{node := {Node, ?NODE2}, tnx_id := 2},
+                    #{node := {Node, ?NODE3}, tnx_id := 2}
+                ],
+                Status
+            );
         false ->
             %% wait for mnesia to write in.
             ct:sleep(42),
             {atomic, Status1} = emqx_cluster_rpc:status(),
             ct:pal("status: ~p", Status),
             ct:pal("status1: ~p", Status1),
-            ?assertEqual(3, length(Status1)),
-            ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status))
+            ?assertMatch(
+                [
+                    #{node := Node, tnx_id := 2},
+                    #{node := {Node, ?NODE2}, tnx_id := 2},
+                    #{node := {Node, ?NODE3}, tnx_id := 2}
+                ],
+                Status1
+            )
     end,
     ok.
 
@@ -129,13 +143,13 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) ->
     {ok, _, ok} = multicall(M, F, A, 1, 1000),
     {atomic, AllStatus} = emqx_cluster_rpc:status(),
     Node = node(),
-    ?assertEqual(
+    ?assertMatch(
         [
-            {1, {Node, emqx_cluster_rpc2}},
-            {1, {Node, emqx_cluster_rpc3}},
-            {2, Node}
+            #{tnx_id := 2, node := Node},
+            #{tnx_id := 1, node := {Node, emqx_cluster_rpc2}},
+            #{tnx_id := 1, node := {Node, emqx_cluster_rpc3}}
         ],
-        lists:sort([{T, N} || #{tnx_id := T, node := N} <- AllStatus])
+        AllStatus
     ),
     erlang:send(?NODE2, test),
     Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A),
@@ -361,13 +375,11 @@ t_cleaner_unexpected_msg(_Config) ->
     ok.
 
 tnx_ids(Status) ->
-    lists:sort(
-        lists:map(
-            fun(#{tnx_id := TnxId, node := Node}) ->
-                {Node, TnxId}
-            end,
-            Status
-        )
+    lists:map(
+        fun(#{tnx_id := TnxId, node := Node}) ->
+            {Node, TnxId}
+        end,
+        Status
     ).
 
 start() ->

+ 12 - 12
apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl

@@ -47,8 +47,8 @@ t_fix(Config) ->
         Node1,
         ?assertMatch(
             {atomic, [
-                #{node := Node2, tnx_id := 1},
-                #{node := Node1, tnx_id := 1}
+                #{node := Node1, tnx_id := 1},
+                #{node := Node2, tnx_id := 1}
             ]},
             emqx_cluster_rpc:status()
         )
@@ -58,8 +58,8 @@ t_fix(Config) ->
         ok = emqx_conf_cli:admins(["fix"]),
         ?assertMatch(
             {atomic, [
-                #{node := Node2, tnx_id := 1},
-                #{node := Node1, tnx_id := 1}
+                #{node := Node1, tnx_id := 1},
+                #{node := Node2, tnx_id := 1}
             ]},
             emqx_cluster_rpc:status()
         )
@@ -70,8 +70,8 @@ t_fix(Config) ->
         ok = emqx_conf_cli:admins(["fix"]),
         ?assertMatch(
             {atomic, [
-                #{node := Node2, tnx_id := 1},
-                #{node := Node1, tnx_id := 1}
+                #{node := Node1, tnx_id := 1},
+                #{node := Node2, tnx_id := 1}
             ]},
             emqx_cluster_rpc:status()
         )
@@ -86,8 +86,8 @@ t_fix(Config) ->
         ok = emqx_conf_cli:admins(["fix"]),
         ?assertMatch(
             {atomic, [
-                #{node := Node2, tnx_id := 5},
-                #{node := Node1, tnx_id := 5}
+                #{node := Node1, tnx_id := 5},
+                #{node := Node2, tnx_id := 5}
             ]},
             emqx_cluster_rpc:status()
         )
@@ -104,8 +104,8 @@ t_fix(Config) ->
         ok = emqx_conf_cli:admins(["fix"]),
         ?assertMatch(
             {atomic, [
-                #{node := Node2, tnx_id := 8},
-                #{node := Node1, tnx_id := 8}
+                #{node := Node1, tnx_id := 8},
+                #{node := Node2, tnx_id := 8}
             ]},
             emqx_cluster_rpc:status()
         )
@@ -117,8 +117,8 @@ t_fix(Config) ->
         ok = emqx_conf_cli:admins(["fix"]),
         ?assertMatch(
             {atomic, [
-                #{node := Node2, tnx_id := 8},
-                #{node := Node1, tnx_id := 8}
+                #{node := Node1, tnx_id := 8},
+                #{node := Node2, tnx_id := 8}
             ]},
             emqx_cluster_rpc:status()
         )

+ 1 - 1
mix.exs

@@ -219,7 +219,7 @@ defmodule EMQXUmbrella.MixProject do
 
   def common_dep(:uuid), do: {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true}
   def common_dep(:redbug), do: {:redbug, github: "emqx/redbug", tag: "2.0.10"}
-  def common_dep(:observer_cli), do: {:observer_cli, "1.7.1"}
+  def common_dep(:observer_cli), do: {:observer_cli, "1.7.5"}
 
   def common_dep(:jose),
     do: {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2", override: true}

+ 1 - 1
rebar.config

@@ -93,7 +93,7 @@
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}},
     {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.1"}}},
     % NOTE: depends on recon 2.5.x
-    {observer_cli, "1.7.1"},
+    {observer_cli, "1.7.5"},
     {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}},
     {getopt, "1.0.2"},
     {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},