Bläddra i källkod

chore: transaction without nnonymous function

zhongwencool 4 år sedan
förälder
incheckning
d55ba6b2e8
2 ändrade filer med 44 tillägg och 50 borttagningar
  1. 42 49
      apps/emqx/src/emqx_cluster_rpc.erl
  2. 2 1
      apps/emqx/test/emqx_cluster_rpc_SUITE.erl

+ 42 - 49
apps/emqx/src/emqx_cluster_rpc.erl

@@ -92,37 +92,14 @@ multicall(M, F, A, Timeout) ->
 
 -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
 query(TnxId) ->
-    transaction(fun do_query/1, [TnxId]).
-
-do_query(TnxId) ->
-    case mnesia:read(?CLUSTER_MFA, TnxId) of
-        [] -> mnesia:abort(not_found);
-        [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] ->
-            #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
-    end.
+    transaction(fun trans_query/1, [TnxId]).
 
 -spec reset() -> reset.
 reset() -> gen_statem:call(?MODULE, reset).
 
 -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}.
 status() ->
-    Fun = fun() ->
-        mnesia:foldl(fun(Rec, Acc) ->
-            #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
-            case mnesia:read(?CLUSTER_MFA, TnxId) of
-                [MFARec] ->
-                    #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec,
-                    [#{
-                        node => Node,
-                        tnx_id => TnxId,
-                        initiator => InitNode,
-                        mfa => MFA,
-                        created_at => CreatedAt
-                    } | Acc];
-                [] -> Acc
-            end end, [], ?CLUSTER_COMMIT)
-          end,
-    transaction(Fun).
+    transaction(fun trans_status/0, []).
 
 %%%===================================================================
 %%% gen_statem callbacks
@@ -158,7 +135,7 @@ handle_event({call, From}, reset, _State, _Data) ->
     {keep_state_and_data, [{reply, From, ok}, ?CATCH_UP_AFTER(0)]};
 
 handle_event({call, From}, {initiate, MFA}, ?REALTIME, Data = #{node := Node}) ->
-    case transaction(fun() -> init_mfa(Node, MFA) end) of
+    case transaction(fun init_mfa/2, [Node, MFA]) of
         {atomic, {ok, TnxId}} ->
             {keep_state, Data, [{reply, From, {ok, TnxId}}]};
         {aborted, Reason} ->
@@ -188,12 +165,12 @@ code_change(_OldVsn, StateName, Data, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 catch_up(#{node := Node, retry_interval := RetryMs} = Data) ->
-    case get_next_mfa(Node) of
+    case transaction(fun get_next_mfa/1, [Node]) of
         {atomic, caught_up} -> {next_state, ?REALTIME, Data};
         {atomic, {still_lagging, NextId, MFA}} ->
             case apply_mfa(NextId, MFA) of
                 ok ->
-                    case transaction(fun() -> commit(Node, NextId) end) of
+                    case transaction(fun commit/2, [Node, NextId]) of
                         {atomic, ok} -> catch_up(Data);
                         _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
                     end;
@@ -203,24 +180,20 @@ catch_up(#{node := Node, retry_interval := RetryMs} = Data) ->
     end.
 
 get_next_mfa(Node) ->
-    Fun =
-        fun() ->
-            NextId =
-                case mnesia:wread({?CLUSTER_COMMIT, Node}) of
-                    [] ->
-                        LatestId = get_latest_id(),
-                        TnxId = max(LatestId - 1, 0),
-                        commit(Node, TnxId),
-                        ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]),
-                        TnxId;
-                    [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
-                end,
-            case mnesia:read(?CLUSTER_MFA, NextId) of
-                [] -> caught_up;
-                [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA}
-            end
+    NextId =
+        case mnesia:wread({?CLUSTER_COMMIT, Node}) of
+            [] ->
+                LatestId = get_latest_id(),
+                TnxId = max(LatestId - 1, 0),
+                commit(Node, TnxId),
+                ?LOG(notice, "New node(~p) first catch up and start commit at ~p", [Node, TnxId]),
+                TnxId;
+            [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
         end,
-    transaction(Fun).
+    case mnesia:read(?CLUSTER_MFA, NextId) of
+        [] -> caught_up;
+        [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA}
+    end.
 
 do_catch_up(ToTnxId, Node) ->
     case mnesia:wread({?CLUSTER_COMMIT, Node}) of
@@ -255,11 +228,11 @@ get_latest_id() ->
 
 handle_mfa_write_event(#cluster_rpc_mfa{tnx_id = EventId, mfa = MFA}, Data) ->
     #{node := Node, retry_interval := RetryMs} = Data,
-    {atomic, LastAppliedId} = transaction(fun() -> get_last_applied_id(Node, EventId - 1) end),
+    {atomic, LastAppliedId} = transaction(fun get_last_applied_id/2, [Node, EventId - 1]),
     if LastAppliedId + 1 =:= EventId ->
         case apply_mfa(EventId, MFA) of
             ok ->
-                case transaction(fun() -> commit(Node, EventId) end) of
+                case transaction(fun commit/2, [Node, EventId]) of
                     {atomic, ok} ->
                         {next_state, ?REALTIME, Data};
                     _ -> {next_state, ?CATCH_UP, Data, [?CATCH_UP_AFTER(RetryMs)]}
@@ -306,8 +279,28 @@ do_catch_up_in_one_trans(LatestId, Node) ->
 transaction(Func, Args) ->
     ekka_mnesia:transaction(?COMMON_SHARD, Func, Args).
 
-transaction(Func) ->
-    ekka_mnesia:transaction(?COMMON_SHARD, Func).
+trans_status() ->
+    mnesia:foldl(fun(Rec, Acc) ->
+        #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec,
+        case mnesia:read(?CLUSTER_MFA, TnxId) of
+            [MFARec] ->
+                #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec,
+                [#{
+                    node => Node,
+                    tnx_id => TnxId,
+                    initiator => InitNode,
+                    mfa => MFA,
+                    created_at => CreatedAt
+                } | Acc];
+            [] -> Acc
+        end end, [], ?CLUSTER_COMMIT).
+
+trans_query(TnxId) ->
+    case mnesia:read(?CLUSTER_MFA, TnxId) of
+        [] -> mnesia:abort(not_found);
+        [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] ->
+            #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt}
+    end.
 
 apply_mfa(TnxId, {M, F, A} = MFA) ->
     try

+ 2 - 1
apps/emqx/test/emqx_cluster_rpc_SUITE.erl

@@ -136,7 +136,8 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
     {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]},
     {ok, _} = emqx_cluster_rpc:multicall(M, F, A),
     {ok, _} = emqx_cluster_rpc:multicall(io, format, ["test"]),
-    {atomic, [Status]} = emqx_cluster_rpc:status(),
+    {atomic, [Status|L]} = emqx_cluster_rpc:status(),
+    ?assertEqual([], L),
     ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)),
     ?assertEqual(node(), maps:get(node, Status)),
     ?assertEqual(realtime, element(1, sys:get_state(?NODE1))),