Просмотр исходного кода

Merge pull request #8774 from ieQu1/master

Refactor transactions
ieQu1 3 лет назад
Родитель
Сommit
97d574f62c

+ 6 - 1
apps/emqx/src/emqx_cm_registry.erl

@@ -44,6 +44,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    do_cleanup_channels/1
+]).
+
 -define(REGISTRY, ?MODULE).
 -define(TAB, emqx_channel_registry).
 -define(LOCK, {?MODULE, cleanup_down}).
@@ -155,7 +160,7 @@ cleanup_channels(Node) ->
     global:trans(
         {?LOCK, self()},
         fun() ->
-            mria:transaction(?CM_SHARD, fun do_cleanup_channels/1, [Node])
+            mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node])
         end
     ).
 

+ 67 - 65
apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl

@@ -52,6 +52,14 @@
     group_match_spec/1
 ]).
 
+%% Internal exports (RPC)
+-export([
+    do_destroy/1,
+    do_add_user/2,
+    do_delete_user/2,
+    do_update_user/3
+]).
+
 -define(TAB, ?MODULE).
 -define(AUTHN_QSCHEMA, [
     {<<"like_user_id">>, binary},
@@ -170,83 +178,79 @@ authenticate(_Credential, _State) ->
     ignore.
 
 destroy(#{user_group := UserGroup}) ->
+    trans(fun ?MODULE:do_destroy/1, [UserGroup]).
+
+do_destroy(UserGroup) ->
     MatchSpec = group_match_spec(UserGroup),
-    trans(
-        fun() ->
-            ok = lists:foreach(
-                fun(UserInfo) ->
-                    mnesia:delete_object(?TAB, UserInfo, write)
-                end,
-                mnesia:select(?TAB, MatchSpec, write)
-            )
-        end
+    ok = lists:foreach(
+        fun(UserInfo) ->
+            mnesia:delete_object(?TAB, UserInfo, write)
+        end,
+        mnesia:select(?TAB, MatchSpec, write)
     ).
 
-add_user(
+add_user(UserInfo, State) ->
+    trans(fun ?MODULE:do_add_user/2, [UserInfo, State]).
+
+do_add_user(
     #{
         user_id := UserID,
         password := Password
     } = UserInfo,
     #{user_group := UserGroup} = State
 ) ->
-    trans(
-        fun() ->
-            case mnesia:read(?TAB, {UserGroup, UserID}, write) of
-                [] ->
-                    IsSuperuser = maps:get(is_superuser, UserInfo, false),
-                    add_user(UserGroup, UserID, Password, IsSuperuser, State),
-                    {ok, #{user_id => UserID, is_superuser => IsSuperuser}};
-                [_] ->
-                    {error, already_exist}
-            end
-        end
-    ).
+    case mnesia:read(?TAB, {UserGroup, UserID}, write) of
+        [] ->
+            IsSuperuser = maps:get(is_superuser, UserInfo, false),
+            add_user(UserGroup, UserID, Password, IsSuperuser, State),
+            {ok, #{user_id => UserID, is_superuser => IsSuperuser}};
+        [_] ->
+            {error, already_exist}
+    end.
 
-delete_user(UserID, #{user_group := UserGroup}) ->
-    trans(
-        fun() ->
-            case mnesia:read(?TAB, {UserGroup, UserID}, write) of
-                [] ->
-                    {error, not_found};
-                [_] ->
-                    mnesia:delete(?TAB, {UserGroup, UserID}, write)
-            end
-        end
-    ).
+delete_user(UserID, State) ->
+    trans(fun ?MODULE:do_delete_user/2, [UserID, State]).
+
+do_delete_user(UserID, #{user_group := UserGroup}) ->
+    case mnesia:read(?TAB, {UserGroup, UserID}, write) of
+        [] ->
+            {error, not_found};
+        [_] ->
+            mnesia:delete(?TAB, {UserGroup, UserID}, write)
+    end.
+
+update_user(UserID, User, State) ->
+    trans(fun ?MODULE:do_update_user/3, [UserID, User, State]).
 
-update_user(
+do_update_user(
     UserID,
     User,
     #{user_group := UserGroup} = State
 ) ->
-    trans(
-        fun() ->
-            case mnesia:read(?TAB, {UserGroup, UserID}, write) of
-                [] ->
-                    {error, not_found};
-                [#user_info{is_superuser = IsSuperuser} = UserInfo] ->
-                    UserInfo1 = UserInfo#user_info{
-                        is_superuser = maps:get(is_superuser, User, IsSuperuser)
-                    },
-                    UserInfo2 =
-                        case maps:get(password, User, undefined) of
-                            undefined ->
-                                UserInfo1;
-                            Password ->
-                                {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
-                                    Password, State
-                                ),
-                                UserInfo1#user_info{
-                                    stored_key = StoredKey,
-                                    server_key = ServerKey,
-                                    salt = Salt
-                                }
-                        end,
-                    mnesia:write(?TAB, UserInfo2, write),
-                    {ok, format_user_info(UserInfo2)}
-            end
-        end
-    ).
+    case mnesia:read(?TAB, {UserGroup, UserID}, write) of
+        [] ->
+            {error, not_found};
+        [#user_info{is_superuser = IsSuperuser} = UserInfo] ->
+            UserInfo1 = UserInfo#user_info{
+                is_superuser = maps:get(is_superuser, User, IsSuperuser)
+            },
+            UserInfo2 =
+                case maps:get(password, User, undefined) of
+                    undefined ->
+                        UserInfo1;
+                    Password ->
+                        {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
+                            Password, State
+                        ),
+                        UserInfo1#user_info{
+                            stored_key = StoredKey,
+                            server_key = ServerKey,
+                            salt = Salt
+                        }
+                end,
+            mnesia:write(?TAB, UserInfo2, write),
+            {ok, format_user_info(UserInfo2)}
+    end.
 
 lookup_user(UserID, #{user_group := UserGroup}) ->
     case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of
@@ -386,12 +390,10 @@ retrieve(UserID, #{user_group := UserGroup}) ->
     end.
 
 %% TODO: Move to emqx_authn_utils.erl
-trans(Fun) ->
-    trans(Fun, []).
-
 trans(Fun, Args) ->
     case mria:transaction(?AUTH_SHARD, Fun, Args) of
         {atomic, Res} -> Res;
+        {aborted, {function_clause, Stack}} -> erlang:raise(error, function_clause, Stack);
         {aborted, Reason} -> {error, Reason}
     end.
 

+ 70 - 67
apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl

@@ -54,6 +54,16 @@
     group_match_spec/1
 ]).
 
+%% Internal exports (RPC)
+-export([
+    do_destroy/1,
+    do_add_user/2,
+    do_delete_user/2,
+    do_update_user/3,
+    import/2,
+    import_csv/3
+]).
+
 -type user_group() :: binary().
 -type user_id() :: binary().
 
@@ -175,15 +185,14 @@ authenticate(
     end.
 
 destroy(#{user_group := UserGroup}) ->
-    trans(
-        fun() ->
-            ok = lists:foreach(
-                fun(User) ->
-                    mnesia:delete_object(?TAB, User, write)
-                end,
-                mnesia:select(?TAB, group_match_spec(UserGroup), write)
-            )
-        end
+    trans(fun ?MODULE:do_destroy/1, [UserGroup]).
+
+do_destroy(UserGroup) ->
+    ok = lists:foreach(
+        fun(User) ->
+            mnesia:delete_object(?TAB, User, write)
+        end,
+        mnesia:select(?TAB, group_match_spec(UserGroup), write)
     ).
 
 import_users({Filename0, FileData}, State) ->
@@ -200,7 +209,10 @@ import_users({Filename0, FileData}, State) ->
             {error, {unsupported_file_format, Extension}}
     end.
 
-add_user(
+add_user(UserInfo, State) ->
+    trans(fun ?MODULE:do_add_user/2, [UserInfo, State]).
+
+do_add_user(
     #{
         user_id := UserID,
         password := Password
@@ -210,33 +222,31 @@ add_user(
         password_hash_algorithm := Algorithm
     }
 ) ->
-    trans(
-        fun() ->
-            case mnesia:read(?TAB, {UserGroup, UserID}, write) of
-                [] ->
-                    {PasswordHash, Salt} = emqx_authn_password_hashing:hash(Algorithm, Password),
-                    IsSuperuser = maps:get(is_superuser, UserInfo, false),
-                    insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
-                    {ok, #{user_id => UserID, is_superuser => IsSuperuser}};
-                [_] ->
-                    {error, already_exist}
-            end
-        end
-    ).
+    case mnesia:read(?TAB, {UserGroup, UserID}, write) of
+        [] ->
+            {PasswordHash, Salt} = emqx_authn_password_hashing:hash(Algorithm, Password),
+            IsSuperuser = maps:get(is_superuser, UserInfo, false),
+            insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
+            {ok, #{user_id => UserID, is_superuser => IsSuperuser}};
+        [_] ->
+            {error, already_exist}
+    end.
 
-delete_user(UserID, #{user_group := UserGroup}) ->
-    trans(
-        fun() ->
-            case mnesia:read(?TAB, {UserGroup, UserID}, write) of
-                [] ->
-                    {error, not_found};
-                [_] ->
-                    mnesia:delete(?TAB, {UserGroup, UserID}, write)
-            end
-        end
-    ).
+delete_user(UserID, State) ->
+    trans(fun ?MODULE:do_delete_user/2, [UserID, State]).
+
+do_delete_user(UserID, #{user_group := UserGroup}) ->
+    case mnesia:read(?TAB, {UserGroup, UserID}, write) of
+        [] ->
+            {error, not_found};
+        [_] ->
+            mnesia:delete(?TAB, {UserGroup, UserID}, write)
+    end.
+
+update_user(UserID, UserInfo, State) ->
+    trans(fun ?MODULE:do_update_user/3, [UserID, UserInfo, State]).
 
-update_user(
+do_update_user(
     UserID,
     UserInfo,
     #{
@@ -244,33 +254,29 @@ update_user(
         password_hash_algorithm := Algorithm
     }
 ) ->
-    trans(
-        fun() ->
-            case mnesia:read(?TAB, {UserGroup, UserID}, write) of
-                [] ->
-                    {error, not_found};
-                [
-                    #user_info{
-                        password_hash = PasswordHash,
-                        salt = Salt,
-                        is_superuser = IsSuperuser
-                    }
-                ] ->
-                    NSuperuser = maps:get(is_superuser, UserInfo, IsSuperuser),
-                    {NPasswordHash, NSalt} =
-                        case UserInfo of
-                            #{password := Password} ->
-                                emqx_authn_password_hashing:hash(
-                                    Algorithm, Password
-                                );
-                            #{} ->
-                                {PasswordHash, Salt}
-                        end,
-                    insert_user(UserGroup, UserID, NPasswordHash, NSalt, NSuperuser),
-                    {ok, #{user_id => UserID, is_superuser => NSuperuser}}
-            end
-        end
-    ).
+    case mnesia:read(?TAB, {UserGroup, UserID}, write) of
+        [] ->
+            {error, not_found};
+        [
+            #user_info{
+                password_hash = PasswordHash,
+                salt = Salt,
+                is_superuser = IsSuperuser
+            }
+        ] ->
+            NSuperuser = maps:get(is_superuser, UserInfo, IsSuperuser),
+            {NPasswordHash, NSalt} =
+                case UserInfo of
+                    #{password := Password} ->
+                        emqx_authn_password_hashing:hash(
+                            Algorithm, Password
+                        );
+                    #{} ->
+                        {PasswordHash, Salt}
+                end,
+            insert_user(UserGroup, UserID, NPasswordHash, NSalt, NSuperuser),
+            {ok, #{user_id => UserID, is_superuser => NSuperuser}}
+    end.
 
 lookup_user(UserID, #{user_group := UserGroup}) ->
     case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of
@@ -335,7 +341,7 @@ run_fuzzy_filter(
 import_users_from_json(Bin, #{user_group := UserGroup}) ->
     case emqx_json:safe_decode(Bin, [return_maps]) of
         {ok, List} ->
-            trans(fun import/2, [UserGroup, List]);
+            trans(fun ?MODULE:import/2, [UserGroup, List]);
         {error, Reason} ->
             {error, Reason}
     end.
@@ -344,7 +350,7 @@ import_users_from_json(Bin, #{user_group := UserGroup}) ->
 import_users_from_csv(CSV, #{user_group := UserGroup}) ->
     case get_csv_header(CSV) of
         {ok, Seq, NewCSV} ->
-            trans(fun import_csv/3, [UserGroup, NewCSV, Seq]);
+            trans(fun ?MODULE:import_csv/3, [UserGroup, NewCSV, Seq]);
         {error, Reason} ->
             {error, Reason}
     end.
@@ -435,9 +441,6 @@ get_user_identity(#{clientid := ClientID}, clientid) ->
 get_user_identity(_, Type) ->
     {error, {bad_user_identity_type, Type}}.
 
-trans(Fun) ->
-    trans(Fun, []).
-
 trans(Fun, Args) ->
     case mria:transaction(?AUTH_SHARD, Fun, Args) of
         {atomic, Res} -> Res;

+ 20 - 14
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -31,10 +31,16 @@
     fast_forward_to_commit/2
 ]).
 -export([
-    get_node_tnx_id/1,
+    commit/2,
+    commit_status_trans/2,
     get_cluster_tnx_id/0,
+    get_node_tnx_id/1,
+    init_mfa/2,
     latest_tnx_id/0,
-    make_initiate_call_req/3
+    make_initiate_call_req/3,
+    read_next_mfa/1,
+    trans_query/1,
+    trans_status/0
 ]).
 
 -export([
@@ -194,18 +200,18 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
 
 -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
 query(TnxId) ->
-    transaction(fun trans_query/1, [TnxId]).
+    transaction(fun ?MODULE:trans_query/1, [TnxId]).
 
 -spec reset() -> reset.
 reset() -> gen_server:call(?MODULE, reset).
 
 -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}.
 status() ->
-    transaction(fun trans_status/0, []).
+    transaction(fun ?MODULE:trans_status/0, []).
 
 -spec latest_tnx_id() -> pos_integer().
 latest_tnx_id() ->
-    {atomic, TnxId} = transaction(fun get_cluster_tnx_id/0, []),
+    {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
     TnxId.
 
 -spec make_initiate_call_req(module(), atom(), list()) -> init_call_req().
@@ -280,7 +286,7 @@ handle_call(reset, _From, State) ->
     _ = mria:clear_table(?CLUSTER_MFA),
     {reply, ok, State, {continue, ?CATCH_UP}};
 handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
-    case transaction(fun init_mfa/2, [Node, MFA]) of
+    case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of
         {atomic, {ok, TnxId, Result}} ->
             {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}};
         {aborted, Error} ->
@@ -288,7 +294,7 @@ handle_call(?INITIATE(MFA), _From, State = #{node := Node}) ->
     end;
 handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
     Timeout = catch_up(State, true),
-    {atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]),
+    {atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]),
     {reply, LatestId, State, Timeout};
 handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
     NodeId = do_fast_forward_to_commit(ToTnxId, State),
@@ -316,14 +322,14 @@ code_change(_OldVsn, State, _Extra) ->
 catch_up(State) -> catch_up(State, false).
 
 catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
-    case transaction(fun read_next_mfa/1, [Node]) of
+    case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of
         {atomic, caught_up} ->
             ?TIMEOUT;
         {atomic, {still_lagging, NextId, MFA}} ->
             {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),
             case Succeed orelse SkipResult of
                 true ->
-                    case transaction(fun commit/2, [Node, NextId]) of
+                    case transaction(fun ?MODULE:commit/2, [Node, NextId]) of
                         {atomic, ok} ->
                             catch_up(State, false);
                         Error ->
@@ -367,12 +373,12 @@ commit(Node, TnxId) ->
     ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write).
 
 do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) ->
-    {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]),
+    {atomic, NodeId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]),
     case NodeId >= ToTnxId of
         true ->
             NodeId;
         false ->
-            {atomic, LatestId} = transaction(fun get_cluster_tnx_id/0, []),
+            {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
             case LatestId =< NodeId of
                 true ->
                     NodeId;
@@ -529,11 +535,11 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
     end.
 
 lagging_node(TnxId) ->
-    {atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]),
+    {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
     Nodes.
 
 synced_nodes(TnxId) ->
-    {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]),
+    {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['>=', TnxId]),
     Nodes.
 
 commit_status_trans(Operator, TnxId) ->
@@ -547,5 +553,5 @@ get_retry_ms() ->
 
 maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
 maybe_init_tnx_id(Node, TnxId) ->
-    {atomic, _} = transaction(fun commit/2, [Node, TnxId]),
+    {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
     ok.

+ 6 - 1
apps/emqx_conf/src/emqx_cluster_rpc_handler.erl

@@ -30,6 +30,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    del_stale_mfa/1
+]).
+
 start_link() ->
     MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
     CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
@@ -56,7 +61,7 @@ handle_cast(Msg, State) ->
     {noreply, State}.
 
 handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
-    case mria:transaction(?CLUSTER_RPC_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
+    case mria:transaction(?CLUSTER_RPC_SHARD, fun ?MODULE:del_stale_mfa/1, [MaxHistory]) of
         {atomic, ok} -> ok;
         Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error})
     end,

+ 1 - 1
apps/emqx_conf/src/emqx_conf.app.src

@@ -1,6 +1,6 @@
 {application, emqx_conf, [
     {description, "EMQX configuration management"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {mod, {emqx_conf_app, []}},
     {applications, [kernel, stdlib]},

+ 6 - 1
apps/emqx_gateway/src/emqx_gateway_cm_registry.erl

@@ -42,6 +42,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    do_cleanup_channels/2
+]).
+
 -define(CM_SHARD, emqx_gateway_cm_shard).
 -define(LOCK, {?MODULE, cleanup_down}).
 
@@ -148,7 +153,7 @@ cleanup_channels(Node, Name) ->
     global:trans(
         {?LOCK, self()},
         fun() ->
-            mria:transaction(?CM_SHARD, fun do_cleanup_channels/2, [Node, Tab])
+            mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/2, [Node, Tab])
         end
     ).
 

+ 36 - 27
apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl

@@ -46,6 +46,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    do_register/4
+]).
+
 -export([lookup_name/1]).
 
 -define(SN_SHARD, emqx_sn_shard).
@@ -173,33 +178,11 @@ handle_call(
                 TopicId when TopicId >= 16#FFFF ->
                     {reply, {error, too_large}, State};
                 TopicId ->
-                    Fun = fun() ->
-                        mnesia:write(
-                            Tab,
-                            #emqx_sn_registry{
-                                key = {ClientId, next_topic_id},
-                                value = TopicId + 1
-                            },
-                            write
-                        ),
-                        mnesia:write(
-                            Tab,
-                            #emqx_sn_registry{
-                                key = {ClientId, TopicName},
-                                value = TopicId
-                            },
-                            write
-                        ),
-                        mnesia:write(
-                            Tab,
-                            #emqx_sn_registry{
-                                key = {ClientId, TopicId},
-                                value = TopicName
-                            },
-                            write
-                        )
-                    end,
-                    case mria:transaction(?SN_SHARD, Fun) of
+                    case
+                        mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [
+                            Tab, ClientId, TopicId, TopicName
+                        ])
+                    of
                         {atomic, ok} ->
                             {reply, TopicId, State};
                         {aborted, Error} ->
@@ -248,6 +231,32 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+do_register(Tab, ClientId, TopicId, TopicName) ->
+    mnesia:write(
+        Tab,
+        #emqx_sn_registry{
+            key = {ClientId, next_topic_id},
+            value = TopicId + 1
+        },
+        write
+    ),
+    mnesia:write(
+        Tab,
+        #emqx_sn_registry{
+            key = {ClientId, TopicName},
+            value = TopicId
+        },
+        write
+    ),
+    mnesia:write(
+        Tab,
+        #emqx_sn_registry{
+            key = {ClientId, TopicId},
+            value = TopicName
+        },
+        write
+    ).
+
 %%-----------------------------------------------------------------------------
 
 next_topic_id(Tab, PredefId, ClientId) ->

+ 51 - 44
apps/emqx_modules/src/emqx_telemetry.erl

@@ -54,6 +54,11 @@
 
 -export([official_version/1]).
 
+%% Internal exports (RPC)
+-export([
+    do_ensure_uuids/0
+]).
+
 %% internal export
 -export([read_raw_build_info/0]).
 
@@ -530,54 +535,56 @@ bin(B) when is_binary(B) ->
     B.
 
 ensure_uuids() ->
-    Txn = fun() ->
-        NodeUUID =
-            case mnesia:wread({?TELEMETRY, node()}) of
-                [] ->
-                    NodeUUID0 =
-                        case get_uuid_from_file(node) of
-                            {ok, NUUID} -> NUUID;
-                            undefined -> generate_uuid()
-                        end,
-                    mnesia:write(
-                        ?TELEMETRY,
-                        #telemetry{
-                            id = node(),
-                            uuid = NodeUUID0
-                        },
-                        write
-                    ),
-                    NodeUUID0;
-                [#telemetry{uuid = NodeUUID0}] ->
-                    NodeUUID0
-            end,
-        ClusterUUID =
-            case mnesia:wread({?TELEMETRY, ?CLUSTER_UUID_KEY}) of
-                [] ->
-                    ClusterUUID0 =
-                        case get_uuid_from_file(cluster) of
-                            {ok, CUUID} -> CUUID;
-                            undefined -> generate_uuid()
-                        end,
-                    mnesia:write(
-                        ?TELEMETRY,
-                        #telemetry{
-                            id = ?CLUSTER_UUID_KEY,
-                            uuid = ClusterUUID0
-                        },
-                        write
-                    ),
-                    ClusterUUID0;
-                [#telemetry{uuid = ClusterUUID0}] ->
-                    ClusterUUID0
-            end,
-        {NodeUUID, ClusterUUID}
-    end,
-    {atomic, {NodeUUID, ClusterUUID}} = mria:transaction(?TELEMETRY_SHARD, Txn),
+    {atomic, {NodeUUID, ClusterUUID}} = mria:transaction(
+        ?TELEMETRY_SHARD, fun ?MODULE:do_ensure_uuids/0
+    ),
     save_uuid_to_file(NodeUUID, node),
     save_uuid_to_file(ClusterUUID, cluster),
     {NodeUUID, ClusterUUID}.
 
+do_ensure_uuids() ->
+    NodeUUID =
+        case mnesia:wread({?TELEMETRY, node()}) of
+            [] ->
+                NodeUUID0 =
+                    case get_uuid_from_file(node) of
+                        {ok, NUUID} -> NUUID;
+                        undefined -> generate_uuid()
+                    end,
+                mnesia:write(
+                    ?TELEMETRY,
+                    #telemetry{
+                        id = node(),
+                        uuid = NodeUUID0
+                    },
+                    write
+                ),
+                NodeUUID0;
+            [#telemetry{uuid = NodeUUID0}] ->
+                NodeUUID0
+        end,
+    ClusterUUID =
+        case mnesia:wread({?TELEMETRY, ?CLUSTER_UUID_KEY}) of
+            [] ->
+                ClusterUUID0 =
+                    case get_uuid_from_file(cluster) of
+                        {ok, CUUID} -> CUUID;
+                        undefined -> generate_uuid()
+                    end,
+                mnesia:write(
+                    ?TELEMETRY,
+                    #telemetry{
+                        id = ?CLUSTER_UUID_KEY,
+                        uuid = ClusterUUID0
+                    },
+                    write
+                ),
+                ClusterUUID0;
+            [#telemetry{uuid = ClusterUUID0}] ->
+                ClusterUUID0
+        end,
+    {NodeUUID, ClusterUUID}.
+
 get_uuid_from_file(Type) ->
     Path = uuid_file_path(Type),
     case file:read_file(Path) of

+ 1 - 1
apps/emqx_psk/src/emqx_psk.app.src

@@ -2,7 +2,7 @@
 {application, emqx_psk, [
     {description, "EMQX PSK"},
     % strict semver, bump manually!
-    {vsn, "5.0.0"},
+    {vsn, "5.0.1"},
     {modules, []},
     {registered, [emqx_psk_sup]},
     {applications, [kernel, stdlib]},

+ 7 - 2
apps/emqx_psk/src/emqx_psk.erl

@@ -43,6 +43,11 @@
     code_change/3
 ]).
 
+%% Internal exports (RPC)
+-export([
+    insert_psks/1
+]).
+
 -record(psk_entry, {
     psk_id :: binary(),
     shared_secret :: binary(),
@@ -199,10 +204,10 @@ import_psks(SrcFile) ->
 import_psks(Io, Delimiter, ChunkSize, NChunk) ->
     case get_psks(Io, Delimiter, ChunkSize) of
         {ok, Entries} ->
-            _ = trans(fun insert_psks/1, [Entries]),
+            _ = trans(fun ?MODULE:insert_psks/1, [Entries]),
             import_psks(Io, Delimiter, ChunkSize, NChunk + 1);
         {eof, Entries} ->
-            _ = trans(fun insert_psks/1, [Entries]),
+            _ = trans(fun ?MODULE:insert_psks/1, [Entries]),
             ok;
         {error, {bad_format, {line, N}}} ->
             {error, {bad_format, {line, NChunk * ChunkSize + N}}};

+ 96 - 96
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -36,6 +36,15 @@
     size/1
 ]).
 
+%% Internal exports (RPC)
+-export([
+    do_store_retained/1,
+    do_clear_expired/0,
+    do_delete_message/1,
+    do_populate_index_meta/1,
+    do_reindex_batch/2
+]).
+
 %% Management API:
 -export([topics/0]).
 
@@ -126,26 +135,8 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
             ok
     end.
 
-store_retained(_, #message{topic = Topic} = Msg) ->
-    ExpiryTime = emqx_retainer:get_expiry_time(Msg),
-    Tokens = topic_to_tokens(Topic),
-    Fun =
-        case is_table_full() of
-            false ->
-                fun() ->
-                    store_retained(db_indices(write), Msg, Tokens, ExpiryTime)
-                end;
-            _ ->
-                fun() ->
-                    case mnesia:read(?TAB_MESSAGE, Tokens, write) of
-                        [_] ->
-                            store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
-                        [] ->
-                            mnesia:abort(table_is_full)
-                    end
-                end
-        end,
-    case mria:transaction(?RETAINER_SHARD, Fun) of
+store_retained(_, Msg = #message{topic = Topic}) ->
+    case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of
         {atomic, ok} ->
             ?tp(debug, message_retained, #{topic => Topic}),
             ok;
@@ -157,7 +148,26 @@ store_retained(_, #message{topic = Topic} = Msg) ->
             })
     end.
 
+do_store_retained(#message{topic = Topic} = Msg) ->
+    ExpiryTime = emqx_retainer:get_expiry_time(Msg),
+    Tokens = topic_to_tokens(Topic),
+    case is_table_full() of
+        false ->
+            store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
+        _ ->
+            case mnesia:read(?TAB_MESSAGE, Tokens, write) of
+                [_] ->
+                    store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
+                [] ->
+                    mnesia:abort(table_is_full)
+            end
+    end.
+
 clear_expired(_) ->
+    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
+    ok.
+
+do_clear_expired() ->
     NowMs = erlang:system_time(millisecond),
     QH = qlc:q([
         TopicTokens
@@ -167,36 +177,29 @@ clear_expired(_) ->
         } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
         (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
     ]),
-    Fun = fun() ->
-        QC = qlc:cursor(QH),
-        clear_batch(db_indices(write), QC)
-    end,
-    {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun),
-    ok.
+    QC = qlc:cursor(QH),
+    clear_batch(db_indices(write), QC).
 
 delete_message(_, Topic) ->
-    Tokens = topic_to_tokens(Topic),
-    DeleteFun =
-        case emqx_topic:wildcard(Topic) of
-            false ->
-                fun() ->
-                    ok = delete_message_by_topic(Tokens, db_indices(write))
-                end;
-            true ->
-                fun() ->
-                    QH = topic_search_table(Tokens),
-                    qlc:fold(
-                        fun(TopicTokens, _) ->
-                            ok = delete_message_by_topic(TopicTokens, db_indices(write))
-                        end,
-                        undefined,
-                        QH
-                    )
-                end
-        end,
-    {atomic, _} = mria:transaction(?RETAINER_SHARD, DeleteFun),
+    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]),
     ok.
 
+do_delete_message(Topic) ->
+    Tokens = topic_to_tokens(Topic),
+    case emqx_topic:wildcard(Topic) of
+        false ->
+            ok = delete_message_by_topic(Tokens, db_indices(write));
+        true ->
+            QH = topic_search_table(Tokens),
+            qlc:fold(
+                fun(TopicTokens, _) ->
+                    ok = delete_message_by_topic(TopicTokens, db_indices(write))
+                end,
+                undefined,
+                QH
+            )
+    end.
+
 read_message(_, Topic) ->
     {ok, read_messages(Topic)}.
 
@@ -267,16 +270,11 @@ reindex(Force, StatusFun) ->
     reindex(config_indices(), Force, StatusFun).
 
 reindex_status() ->
-    Fun = fun() ->
-        mnesia:read(?TAB_INDEX_META, ?META_KEY)
-    end,
-    case mria:transaction(?RETAINER_SHARD, Fun) of
-        {atomic, [#retained_index_meta{reindexing = true}]} ->
+    case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of
+        [#retained_index_meta{reindexing = true}] ->
             true;
-        {atomic, _} ->
-            false;
-        {aborted, Reason} ->
-            {error, Reason}
+        _ ->
+            false
     end.
 
 %%--------------------------------------------------------------------
@@ -439,37 +437,7 @@ config_indices() ->
 
 populate_index_meta() ->
     ConfigIndices = config_indices(),
-    Fun = fun() ->
-        case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
-            [
-                #retained_index_meta{
-                    read_indices = ReadIndices,
-                    write_indices = WriteIndices,
-                    reindexing = Reindexing
-                }
-            ] ->
-                case {ReadIndices, WriteIndices, Reindexing} of
-                    {_, _, true} ->
-                        ok;
-                    {ConfigIndices, ConfigIndices, false} ->
-                        ok;
-                    {DBWriteIndices, DBReadIndices, false} ->
-                        {error, DBWriteIndices, DBReadIndices}
-                end;
-            [] ->
-                mnesia:write(
-                    ?TAB_INDEX_META,
-                    #retained_index_meta{
-                        key = ?META_KEY,
-                        read_indices = ConfigIndices,
-                        write_indices = ConfigIndices,
-                        reindexing = false
-                    },
-                    write
-                )
-        end
-    end,
-    case mria:transaction(?RETAINER_SHARD, Fun) of
+    case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of
         {atomic, ok} ->
             ok;
         {atomic, {error, DBWriteIndices, DBReadIndices}} ->
@@ -488,6 +456,36 @@ populate_index_meta() ->
             {error, Reason}
     end.
 
+do_populate_index_meta(ConfigIndices) ->
+    case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+        [
+            #retained_index_meta{
+                read_indices = ReadIndices,
+                write_indices = WriteIndices,
+                reindexing = Reindexing
+            }
+        ] ->
+            case {ReadIndices, WriteIndices, Reindexing} of
+                {_, _, true} ->
+                    ok;
+                {ConfigIndices, ConfigIndices, false} ->
+                    ok;
+                {DBWriteIndices, DBReadIndices, false} ->
+                    {error, DBWriteIndices, DBReadIndices}
+            end;
+        [] ->
+            mnesia:write(
+                ?TAB_INDEX_META,
+                #retained_index_meta{
+                    key = ?META_KEY,
+                    read_indices = ConfigIndices,
+                    write_indices = ConfigIndices,
+                    reindexing = false
+                },
+                write
+            )
+    end.
+
 db_indices(Type) ->
     case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
         [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
@@ -533,6 +531,7 @@ reindex(NewIndices, Force, StatusFun) when
     end.
 
 try_start_reindex(NewIndices, true) ->
+    %% Note: we don't expect reindexing during upgrade, so this function is internal
     mria:transaction(
         ?RETAINER_SHARD,
         fun() -> start_reindex(NewIndices) end
@@ -566,6 +565,7 @@ start_reindex(NewIndices) ->
     ).
 
 finalize_reindex() ->
+    %% Note: we don't expect reindexing during upgrade, so this function is internal
     {atomic, ok} = mria:transaction(
         ?RETAINER_SHARD,
         fun() ->
@@ -601,16 +601,7 @@ reindex_topic(Indices, Topic) ->
     end.
 
 reindex_batch(QC, Done, StatusFun) ->
-    Fun = fun() ->
-        Indices = db_indices(write),
-        {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
-        ok = lists:foreach(
-            fun(Topic) -> reindex_topic(Indices, Topic) end,
-            Topics
-        ),
-        {Status, Done + length(Topics)}
-    end,
-    case mria:transaction(?RETAINER_SHARD, Fun) of
+    case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of
         {atomic, {more, NewDone}} ->
             _ = StatusFun(NewDone),
             reindex_batch(QC, NewDone, StatusFun);
@@ -625,6 +616,15 @@ reindex_batch(QC, Done, StatusFun) ->
             {error, Reason}
     end.
 
+do_reindex_batch(QC, Done) ->
+    Indices = db_indices(write),
+    {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
+    ok = lists:foreach(
+        fun(Topic) -> reindex_topic(Indices, Topic) end,
+        Topics
+    ),
+    {Status, Done + length(Topics)}.
+
 wait_dispatch_complete(Timeout) ->
     Nodes = mria_mnesia:running_nodes(),
     {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),

+ 1 - 3
apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl

@@ -45,9 +45,7 @@ retainer(["reindex", "status"]) ->
         true ->
             ?PRINT_MSG("Reindexing is in progress~n");
         false ->
-            ?PRINT_MSG("Reindexing is not running~n");
-        {error, Reason} ->
-            ?PRINT("Can't get reindex status: ~p~n", [Reason])
+            ?PRINT_MSG("Reindexing is not running~n")
     end;
 retainer(["reindex", "start"]) ->
     retainer(["reindex", "start", "false"]);