Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
dbfacae283
1 измененных файлов с 45 добавлено и 32 удалено
  1. 45 32
      apps/emqx_durable_storage/src/emqx_ds.erl

+ 45 - 32
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -136,20 +136,29 @@ message_stats() ->
 %% the broker.
 -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}.
 session_open(ClientID) ->
-    case mnesia:dirty_read(?SESSION_TAB, ClientID) of
-        [#session{}] ->
-            {false, ClientID};
-        [] ->
-            Session = #session{id = ClientID},
-            mria:dirty_write(?SESSION_TAB, Session),
-            {true, ClientID}
-    end.
+    {atomic, Res} =
+        mria:transaction(?DS_SHARD, fun() ->
+            case mnesia:read(?SESSION_TAB, ClientID, write) of
+                [#session{}] ->
+                    {false, ClientID};
+                [] ->
+                    Session = #session{id = ClientID},
+                    mnesia:write(?SESSION_TAB, Session, write),
+                    {true, ClientID}
+            end
+        end),
+    Res.
 
 %% @doc Called when a client reconnects with `clean session=true' or
 %% during session GC
 -spec session_drop(emqx_types:clientid()) -> ok.
 session_drop(ClientID) ->
-    ok = mria:dirty_delete({?SESSION_TAB, ClientID}),
+    {atomic, ok} = mria:transaction(
+        ?DS_SHARD,
+        fun() ->
+            mnesia:delete({?SESSION_TAB, ClientID})
+        end
+    ),
     ok.
 
 %% @doc Called when a client disconnects. This function terminates all
@@ -164,29 +173,33 @@ session_suspend(_SessionId) ->
     {ok, iterator_id(), time(), _IsNew :: boolean()}.
 session_add_iterator(DSSessionId, TopicFilter) ->
     IteratorRefId = {DSSessionId, TopicFilter},
-    case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of
-        [] ->
-            {IteratorId, StartMS} = new_iterator_id(DSSessionId),
-            IteratorRef = #iterator_ref{
-                ref_id = IteratorRefId,
-                it_id = IteratorId,
-                start_time = StartMS
-            },
-            ok = mria:dirty_write(?ITERATOR_REF_TAB, IteratorRef),
-            ?tp(
-                ds_session_subscription_added,
-                #{iterator_id => IteratorId, session_id => DSSessionId}
-            ),
-            IsNew = true,
-            {ok, IteratorId, StartMS, IsNew};
-        [#iterator_ref{it_id = IteratorId, start_time = StartMS}] ->
-            ?tp(
-                ds_session_subscription_present,
-                #{iterator_id => IteratorId, session_id => DSSessionId}
-            ),
-            IsNew = false,
-            {ok, IteratorId, StartMS, IsNew}
-    end.
+    {atomic, Res} =
+        mria:transaction(?DS_SHARD, fun() ->
+            case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of
+                [] ->
+                    {IteratorId, StartMS} = new_iterator_id(DSSessionId),
+                    IteratorRef = #iterator_ref{
+                        ref_id = IteratorRefId,
+                        it_id = IteratorId,
+                        start_time = StartMS
+                    },
+                    ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write),
+                    ?tp(
+                        ds_session_subscription_added,
+                        #{iterator_id => IteratorId, session_id => DSSessionId}
+                    ),
+                    IsNew = true,
+                    {ok, IteratorId, StartMS, IsNew};
+                [#iterator_ref{it_id = IteratorId, start_time = StartMS}] ->
+                    ?tp(
+                        ds_session_subscription_present,
+                        #{iterator_id => IteratorId, session_id => DSSessionId}
+                    ),
+                    IsNew = false,
+                    {ok, IteratorId, StartMS, IsNew}
+            end
+        end),
+    Res.
 
 %% @doc Called when a client unsubscribes from a topic. Returns `true'
 %% if the session contained the subscription or `false' if it wasn't