Просмотр исходного кода

fix(sessds): Apply review remarks

ieQu1 2 лет назад
Родитель
Сommit
f5b9bd30aa

+ 7 - 6
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -373,7 +373,7 @@ publish(_PacketId, Msg, Session) ->
     {ok, emqx_types:message(), replies(), session()}
     | {error, emqx_types:reason_code()}.
 puback(_ClientInfo, PacketId, Session0) ->
-    case commit_seqno(puback, PacketId, Session0) of
+    case update_seqno(puback, PacketId, Session0) of
         {ok, Msg, Session} ->
             {ok, Msg, [], inc_send_quota(Session)};
         Error ->
@@ -388,7 +388,7 @@ puback(_ClientInfo, PacketId, Session0) ->
     {ok, emqx_types:message(), session()}
     | {error, emqx_types:reason_code()}.
 pubrec(PacketId, Session0) ->
-    case commit_seqno(pubrec, PacketId, Session0) of
+    case update_seqno(pubrec, PacketId, Session0) of
         {ok, Msg, Session} ->
             {ok, Msg, Session};
         Error = {error, _} ->
@@ -413,7 +413,7 @@ pubrel(_PacketId, Session = #{}) ->
     {ok, emqx_types:message(), replies(), session()}
     | {error, emqx_types:reason_code()}.
 pubcomp(_ClientInfo, PacketId, Session0) ->
-    case commit_seqno(pubcomp, PacketId, Session0) of
+    case update_seqno(pubcomp, PacketId, Session0) of
         {ok, Msg, Session} ->
             {ok, Msg, [], inc_send_quota(Session)};
         Error = {error, _} ->
@@ -540,6 +540,7 @@ sync(ClientId) ->
                 {'DOWN', Ref, process, _Pid, Reason} ->
                     {error, Reason};
                 Ref ->
+                    demonitor(Ref, [flush]),
                     ok
             end;
         [] ->
@@ -767,7 +768,7 @@ process_batch(
                     SeqNoQos2 = inc_seqno(?QOS_2, SeqNoQos20)
             end,
             {
-                case Msg#message.qos of
+                case Qos of
                     ?QOS_0 when IsReplay ->
                         %% We ignore QoS 0 messages during replay:
                         Acc;
@@ -895,9 +896,9 @@ bump_interval() ->
 %% SeqNo tracking
 %% --------------------------------------------------------------------
 
--spec commit_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
+-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
     {ok, emqx_types:message(), session()} | {error, _}.
-commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
+update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
     SeqNo = packet_id_to_seqno(PacketId, S),
     case Track of
         puback ->

+ 9 - 6
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -31,7 +31,7 @@
 %%   -----|----------|-----|-----|------> seqno
 %%        |          |     |     |
 %%   committed      dup   rec   next
-%                        (Qos2)
+%%                       (Qos2)
 
 %% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP
 %% for QoS2.
@@ -41,23 +41,26 @@
 %% committed..dup range are retransmitted with DUP flag.
 %%
 -define(dup(QOS), (10 + QOS)).
+%% Rec flag is specific for the QoS2. It contains seqno of the last
+%% PUBREC received from the client. When the session reconnects,
+%% PUBREL packages for the dup..rec range are retransmitted.
 -define(rec, 22).
-%% Last seqno assigned to a message.
+%% Last seqno assigned to a message (it may not be sent yet).
 -define(next(QOS), (30 + QOS)).
 
 %%%%% State of the stream:
 -record(ifs, {
     rank_x :: emqx_ds:rank_x(),
     rank_y :: emqx_ds:rank_y(),
-    %% Iterator at the beginning and end of the last batch:
+    %% Iterators at the beginning and the end of the last batch:
     it_begin :: emqx_ds:iterator() | undefined,
     it_end :: emqx_ds:iterator() | end_of_stream,
-    %% Key that points at the beginning of the batch:
+    %% Size of the last batch:
     batch_size = 0 :: non_neg_integer(),
-    %% Session sequence number at the time when the batch was fetched:
+    %% Session sequence numbers at the time when the batch was fetched:
     first_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),
     first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
-    %% Number of messages collected in the last batch:
+    %% Sequence numbers that have to be committed for the batch:
     last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),
     last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno()
 }).

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl

@@ -126,7 +126,7 @@ gc_loop(MinLastAlive, It0) ->
 
 do_gc(SessionId, MinLastAlive, LastAliveAt, EI) when LastAliveAt + EI < MinLastAlive ->
     emqx_persistent_session_ds:destroy_session(SessionId),
-    ?tp(error, ds_session_gc_cleaned, #{
+    ?tp(debug, ds_session_gc_cleaned, #{
         session_id => SessionId,
         last_alive_at => LastAliveAt,
         expiry_interval => EI,

+ 0 - 3
apps/emqx/src/emqx_persistent_session_ds_inflight.erl

@@ -18,9 +18,6 @@
 %% API:
 -export([new/1, push/2, pop/1, n_buffered/2, n_inflight/1, inc_send_quota/1, receive_maximum/1]).
 
-%% behavior callbacks:
--export([]).
-
 %% internal exports:
 -export([]).
 

+ 6 - 5
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -68,9 +68,10 @@
 %% It should be possible to make frequent changes to the pmap without
 %% stressing Mria.
 %%
-%% It's implemented as three maps: `clean', `dirty' and `tombstones'.
-%% Updates are made to the `dirty' area. `pmap_commit' function saves
-%% the updated entries to Mnesia and moves them to the `clean' area.
+%% It's implemented as two maps: `cache', and `dirty'. `cache' stores
+%% the data, and `dirty' contains information about dirty and deleted
+%% keys. When `commit/1' is called, dirty keys are dumped to the
+%% tables, and deleted keys are removed from the tables.
 -record(pmap, {table, cache, dirty}).
 
 -type pmap(K, V) ::
@@ -530,9 +531,9 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) ->
     mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write).
 
 kv_pmap_restore(Table, SessionId) ->
-    MS = [{#kv{k = {SessionId, '_'}, _ = '_'}, [], ['$_']}],
+    MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}],
     Objs = mnesia:select(Table, MS, read),
-    [{K, encoder(decode, Table, V)} || #kv{k = {_, K}, v = V} <- Objs].
+    [{K, encoder(decode, Table, V)} || {K, V} <- Objs].
 
 kv_pmap_delete(Table, SessionId) ->
     MS = [{#kv{k = {SessionId, '$1'}, _ = '_'}, [], ['$1']}],

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -142,7 +142,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
         undefined ->
             ?SLOG(debug, #{
-                '$msg' => new_stream, key => Key, stream => Stream
+                msg => new_stream, key => Key, stream => Stream
             }),
             {ok, Iterator} = emqx_ds:make_iterator(
                 ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime

+ 1 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -554,7 +554,7 @@ t_process_dies_session_expires(Config) ->
 
     ok = publish(Topic, Payload),
 
-    timer:sleep(1100),
+    timer:sleep(2000),
 
     {ok, Client2} = emqtt:start_link([
         {proto_ver, v5},

+ 7 - 0
changes/ce/feat-12251.en.md

@@ -0,0 +1,7 @@
+Optimize performance of the RocksDB-based persistent session.
+Reduce RAM usage and frequency of database requests.
+
+- Introduce dirty session state to avoid frequent mria transactions
+- Introduce an intermediate buffer for the persistent messages
+- Use separate tracks of PacketIds for QoS1 and QoS2 messages
+- Limit the number of continuous ranges of infligtht messages to one per stream