Procházet zdrojové kódy

feat: rm unnecessary transactions, use separate table for iterator references

Thales Macedo Garitezi před 2 roky
rodič
revize
3239f5ac5b

+ 28 - 44
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -141,12 +141,7 @@ session_open(ClientID) ->
 %% during session GC
 -spec session_drop(emqx_types:clientid()) -> ok.
 session_drop(ClientID) ->
-    {atomic, ok} = mria:transaction(
-        ?DS_SHARD,
-        fun() ->
-            mnesia:delete({?SESSION_TAB, ClientID})
-        end
-    ),
+    ok = mria:dirty_delete({?SESSION_TAB, ClientID}),
     ok.
 
 %% @doc Called when a client disconnects. This function terminates all
@@ -158,38 +153,32 @@ session_suspend(_SessionId) ->
 
 %% @doc Called when a client subscribes to a topic. Idempotent.
 -spec session_add_iterator(session_id(), emqx_topic:words()) ->
-    {ok, iterator_id(), time(), _IsNew :: boolean()} | {error, session_not_found}.
+    {ok, iterator_id(), time(), _IsNew :: boolean()}.
 session_add_iterator(DSSessionId, TopicFilter) ->
-    {atomic, Ret} =
-        mria:transaction(
-            ?DS_SHARD,
-            fun() ->
-                case mnesia:wread({?SESSION_TAB, DSSessionId}) of
-                    [] ->
-                        {error, session_not_found};
-                    [#session{iterators = #{TopicFilter := IteratorId}}] ->
-                        StartMS = get_start_ms(IteratorId, DSSessionId),
-                        ?tp(
-                            ds_session_subscription_present,
-                            #{iterator_id => IteratorId, session_id => DSSessionId}
-                        ),
-                        IsNew = false,
-                        {ok, IteratorId, StartMS, IsNew};
-                    [#session{iterators = Iterators0} = Session0] ->
-                        {IteratorId, StartMS} = new_iterator_id(DSSessionId),
-                        Iterators = Iterators0#{TopicFilter => IteratorId},
-                        Session = Session0#session{iterators = Iterators},
-                        mnesia:write(?SESSION_TAB, Session, write),
-                        ?tp(
-                            ds_session_subscription_added,
-                            #{iterator_id => IteratorId, session_id => DSSessionId}
-                        ),
-                        IsNew = true,
-                        {ok, IteratorId, StartMS, IsNew}
-                end
-            end
-        ),
-    Ret.
+    IteratorRefId = {DSSessionId, TopicFilter},
+    case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of
+        [] ->
+            {IteratorId, StartMS} = new_iterator_id(DSSessionId),
+            IteratorRef = #iterator_ref{
+                ref_id = IteratorRefId,
+                it_id = IteratorId,
+                start_time = StartMS
+            },
+            ok = mria:dirty_write(?ITERATOR_REF_TAB, IteratorRef),
+            ?tp(
+                ds_session_subscription_added,
+                #{iterator_id => IteratorId, session_id => DSSessionId}
+            ),
+            IsNew = true,
+            {ok, IteratorId, StartMS, IsNew};
+        [#iterator_ref{it_id = IteratorId, start_time = StartMS}] ->
+            ?tp(
+                ds_session_subscription_present,
+                #{iterator_id => IteratorId, session_id => DSSessionId}
+            ),
+            IsNew = false,
+            {ok, IteratorId, StartMS, IsNew}
+    end.
 
 %% @doc Called when a client unsubscribes from a topic. Returns `true'
 %% if the session contained the subscription or `false' if it wasn't
@@ -231,10 +220,5 @@ iterator_stats() ->
 -spec new_iterator_id(session_id()) -> {iterator_id(), time()}.
 new_iterator_id(DSSessionId) ->
     NowMS = erlang:system_time(microsecond),
-    NowMSBin = integer_to_binary(NowMS),
-    {<<DSSessionId/binary, NowMSBin/binary>>, NowMS}.
-
--spec get_start_ms(iterator_id(), emqx_session:session_id()) -> time().
-get_start_ms(IteratorId, SessionId) ->
-    <<SessionId:(size(SessionId))/binary, StartMSBin/binary>> = IteratorId,
-    binary_to_integer(StartMSBin).
+    IteratorId = <<DSSessionId/binary, (emqx_guid:gen())/binary>>,
+    {IteratorId, NowMS}.

+ 12 - 1
apps/emqx_durable_storage/src/emqx_ds_app.erl

@@ -25,7 +25,18 @@ init_mnesia() ->
             {record_name, session},
             {attributes, record_info(fields, session)}
         ]
-    ).
+    ),
+    ok = mria:create_table(
+        ?ITERATOR_REF_TAB,
+        [
+            {rlog_shard, ?DS_SHARD},
+            {type, ordered_set},
+            {storage, storage()},
+            {record_name, iterator_ref},
+            {attributes, record_info(fields, iterator_ref)}
+        ]
+    ),
+    ok.
 
 storage() ->
     case mria:rocksdb_backend_available() of

+ 7 - 0
apps/emqx_durable_storage/src/emqx_ds_int.hrl

@@ -17,6 +17,7 @@
 -define(EMQX_DS_HRL, true).
 
 -define(SESSION_TAB, emqx_ds_session).
+-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref).
 -define(DS_SHARD, emqx_ds_shard).
 
 -record(session, {
@@ -24,4 +25,10 @@
     iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()}
 }).
 
+-record(iterator_ref, {
+    ref_id :: {emqx_ds:session_id(), emqx_topic:words()},
+    it_id :: emqx_ds:iterator_id(),
+    start_time :: emqx_ds:time()
+}).
+
 -endif.