Преглед изворни кода

refactor(sessds): Simplify representation of QoS tracks

ieQu1 пре 2 година
родитељ
комит
978a3bfef3

+ 7 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -290,7 +290,7 @@ subscribe(
             %% router and iterator information can be reconstructed
             %% router and iterator information can be reconstructed
             %% from this table, if needed.
             %% from this table, if needed.
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
-            {SubId, S1} = emqx_persistent_session_ds_state:new_subid(S0),
+            {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
             Subscription = #{
             Subscription = #{
                 start_time => now_ms(),
                 start_time => now_ms(),
                 props => SubOpts,
                 props => SubOpts,
@@ -314,10 +314,10 @@ unsubscribe(
     TopicFilter,
     TopicFilter,
     Session = #{id := ID, s := S0}
     Session = #{id := ID, s := S0}
 ) ->
 ) ->
-    %% TODO: drop streams and messages from the buffer
     case subs_lookup(TopicFilter, S0) of
     case subs_lookup(TopicFilter, S0) of
-        #{props := SubOpts, id := _SubId} ->
-            S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
+        #{props := SubOpts, id := SubId} ->
+            S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
+            S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1),
             ?tp_span(
             ?tp_span(
                 persistent_session_ds_subscription_route_delete,
                 persistent_session_ds_subscription_route_delete,
                 #{session_id => ID},
                 #{session_id => ID},
@@ -662,11 +662,13 @@ enqueue_batch(IsReplay, BatchSize, Ifs0, Session = #{inflight := Inflight0}, Cli
     case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of
     case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It0, BatchSize) of
         {ok, It, []} ->
         {ok, It, []} ->
             %% No new messages; just update the end iterator:
             %% No new messages; just update the end iterator:
+            logger:warning(#{msg => "batch_empty"}),
             {Ifs0#ifs{it_end = It}, Inflight0};
             {Ifs0#ifs{it_end = It}, Inflight0};
         {ok, end_of_stream} ->
         {ok, end_of_stream} ->
             %% No new messages; just update the end iterator:
             %% No new messages; just update the end iterator:
             {Ifs0#ifs{it_end = end_of_stream}, Inflight0};
             {Ifs0#ifs{it_end = end_of_stream}, Inflight0};
-        {ok, It, Messages} ->
+        {ok, It, [{K, _} | _] = Messages} ->
+            logger:warning(#{msg => "batch", it => K, msgs => length(Messages)}),
             {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
             {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
                 IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
                 IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0
             ),
             ),

+ 11 - 7
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -34,15 +34,19 @@
 
 
 %% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP
 %% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP
 %% for QoS2.
 %% for QoS2.
--define(committed(QOS), {0, QOS}).
+-define(committed(QOS), QOS).
 %% Seqno becomes dup:
 %% Seqno becomes dup:
 %%
 %%
-%% 1. After broker sends QoS1 message to the client
-%% 2. After it receives PUBREC from the client for the QoS2 message
--define(dup(QOS), {1, QOS}).
-%% Last seqno assigned to some message (that may reside in the
-%% mqueue):
--define(next(QOS), {2, QOS}).
+%% 1. After broker sends QoS1 message to the client. Upon session
+%% reconnect, QoS1 messages with seqno in the committed..dup range are
+%% retransmitted with DUP flag.
+%%
+%% 2. After it receives PUBREC from the client for the QoS2 message.
+%% Upon session reconnect, PUBREL for QoS2 messages with seqno in
+%% committed..dup are retransmitted.
+-define(dup(QOS), (10 + QOS)).
+%% Last seqno assigned to a message.
+-define(next(QOS), (20 + QOS)).
 
 
 %%%%% State of the stream:
 %%%%% State of the stream:
 -record(ifs, {
 -record(ifs, {

+ 38 - 40
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -30,7 +30,7 @@
 -export([get_created_at/1, set_created_at/2]).
 -export([get_created_at/1, set_created_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
 -export([get_last_alive_at/1, set_last_alive_at/2]).
 -export([get_conninfo/1, set_conninfo/2]).
 -export([get_conninfo/1, set_conninfo/2]).
--export([new_subid/1]).
+-export([new_id/1]).
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
 -export([get_seqno/2, put_seqno/3]).
 -export([get_seqno/2, put_seqno/3]).
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
@@ -43,6 +43,7 @@
 
 
 -include("emqx_mqtt.hrl").
 -include("emqx_mqtt.hrl").
 -include("emqx_persistent_session_ds.hrl").
 -include("emqx_persistent_session_ds.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
@@ -79,14 +80,15 @@
 -define(created_at, created_at).
 -define(created_at, created_at).
 -define(last_alive_at, last_alive_at).
 -define(last_alive_at, last_alive_at).
 -define(conninfo, conninfo).
 -define(conninfo, conninfo).
--define(last_subid, last_subid).
+%% Unique integer used to create unique identities
+-define(last_id, last_id).
 
 
 -type metadata() ::
 -type metadata() ::
     #{
     #{
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
         ?conninfo => emqx_types:conninfo(),
         ?conninfo => emqx_types:conninfo(),
-        ?last_subid => integer()
+        ?last_id => integer()
     }.
     }.
 
 
 -type seqno_type() ::
 -type seqno_type() ::
@@ -112,7 +114,7 @@
 -define(stream_tab, emqx_ds_session_streams).
 -define(stream_tab, emqx_ds_session_streams).
 -define(seqno_tab, emqx_ds_session_seqnos).
 -define(seqno_tab, emqx_ds_session_seqnos).
 -define(rank_tab, emqx_ds_session_ranks).
 -define(rank_tab, emqx_ds_session_ranks).
--define(bag_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]).
+-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]).
 
 
 %%================================================================================
 %%================================================================================
 %% API funcions
 %% API funcions
@@ -130,8 +132,8 @@ create_tables() ->
             {attributes, record_info(fields, kv)}
             {attributes, record_info(fields, kv)}
         ]
         ]
     ),
     ),
-    [create_kv_bag_table(Table) || Table <- ?bag_tables],
-    mria:wait_for_tables([?session_tab | ?bag_tables]).
+    [create_kv_pmap_table(Table) || Table <- ?pmap_tables],
+    mria:wait_for_tables([?session_tab | ?pmap_tables]).
 
 
 -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
 -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
 open(SessionId) ->
 open(SessionId) ->
@@ -191,7 +193,7 @@ list_sessions() ->
 delete(Id) ->
 delete(Id) ->
     transaction(
     transaction(
         fun() ->
         fun() ->
-            [kv_delete(Table, Id) || Table <- ?bag_tables],
+            [kv_pmap_delete(Table, Id) || Table <- ?pmap_tables],
             mnesia:delete(?session_tab, Id, write)
             mnesia:delete(?session_tab, Id, write)
         end
         end
     ).
     ).
@@ -259,14 +261,14 @@ get_conninfo(Rec) ->
 set_conninfo(Val, Rec) ->
 set_conninfo(Val, Rec) ->
     set_meta(?conninfo, Val, Rec).
     set_meta(?conninfo, Val, Rec).
 
 
--spec new_subid(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
-new_subid(Rec) ->
-    LastSubId =
-        case get_meta(?last_subid, Rec) of
+-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
+new_id(Rec) ->
+    LastId =
+        case get_meta(?last_id, Rec) of
             undefined -> 0;
             undefined -> 0;
             N when is_integer(N) -> N
             N when is_integer(N) -> N
         end,
         end,
-    {LastSubId, set_meta(?last_subid, LastSubId + 1, Rec)}.
+    {LastId, set_meta(?last_id, LastId + 1, Rec)}.
 
 
 %%
 %%
 
 
@@ -283,7 +285,7 @@ get_subscriptions(#{subscriptions := Subs}) ->
 put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) ->
 put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) ->
     %% Note: currently changes to the subscriptions are persisted immediately.
     %% Note: currently changes to the subscriptions are persisted immediately.
     Key = {TopicFilter, SubId},
     Key = {TopicFilter, SubId},
-    transaction(fun() -> kv_bag_persist(?subscription_tab, Id, Key, Subscription) end),
+    transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end),
     Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
     Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
     Rec#{subscriptions => Subs}.
     Rec#{subscriptions => Subs}.
 
 
@@ -291,13 +293,13 @@ put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptio
 del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) ->
 del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) ->
     %% Note: currently the subscriptions are persisted immediately.
     %% Note: currently the subscriptions are persisted immediately.
     Key = {TopicFilter, SubId},
     Key = {TopicFilter, SubId},
-    transaction(fun() -> kv_bag_delete(?subscription_tab, Id, Key) end),
+    transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end),
     Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
     Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
     Rec#{subscriptions => Subs}.
     Rec#{subscriptions => Subs}.
 
 
 %%
 %%
 
 
--type stream_key() :: {emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}.
+-type stream_key() :: {emqx_persistent_session_ds:subscription_id(), binary()}.
 
 
 -spec get_stream(stream_key(), t()) ->
 -spec get_stream(stream_key(), t()) ->
     emqx_persistent_session_ds:stream_state() | undefined.
     emqx_persistent_session_ds:stream_state() | undefined.
@@ -390,7 +392,7 @@ gen_del(Field, Key, Rec) ->
 %%
 %%
 
 
 read_subscriptions(SessionId) ->
 read_subscriptions(SessionId) ->
-    Records = kv_bag_restore(?subscription_tab, SessionId),
+    Records = kv_pmap_restore(?subscription_tab, SessionId),
     lists:foldl(
     lists:foldl(
         fun({{TopicFilter, SubId}, Subscription}, Acc) ->
         fun({{TopicFilter, SubId}, Subscription}, Acc) ->
             emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc)
             emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc)
@@ -405,7 +407,7 @@ read_subscriptions(SessionId) ->
 %% This functtion should be ran in a transaction.
 %% This functtion should be ran in a transaction.
 -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
 -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
 pmap_open(Table, SessionId) ->
 pmap_open(Table, SessionId) ->
-    Clean = maps:from_list(kv_bag_restore(Table, SessionId)),
+    Clean = maps:from_list(kv_pmap_restore(Table, SessionId)),
     #pmap{
     #pmap{
         table = Table,
         table = Table,
         cache = Clean,
         cache = Clean,
@@ -444,10 +446,10 @@ pmap_commit(
     maps:foreach(
     maps:foreach(
         fun
         fun
             (K, del) ->
             (K, del) ->
-                kv_bag_delete(Tab, SessionId, K);
+                kv_pmap_delete(Tab, SessionId, K);
             (K, dirty) ->
             (K, dirty) ->
                 V = maps:get(K, Cache),
                 V = maps:get(K, Cache),
-                kv_bag_persist(Tab, SessionId, K, V)
+                kv_pmap_persist(Tab, SessionId, K, V)
         end,
         end,
         Dirty
         Dirty
     ),
     ),
@@ -465,47 +467,43 @@ kv_persist(Tab, SessionId, Val0) ->
     Val = encoder(encode, Tab, Val0),
     Val = encoder(encode, Tab, Val0),
     mnesia:write(Tab, #kv{k = SessionId, v = Val}, write).
     mnesia:write(Tab, #kv{k = SessionId, v = Val}, write).
 
 
-kv_delete(Table, Namespace) ->
-    mnesia:delete({Table, Namespace}).
-
 kv_restore(Tab, SessionId) ->
 kv_restore(Tab, SessionId) ->
     [encoder(decode, Tab, V) || #kv{v = V} <- mnesia:read(Tab, SessionId)].
     [encoder(decode, Tab, V) || #kv{v = V} <- mnesia:read(Tab, SessionId)].
 
 
 %% Functions dealing with bags:
 %% Functions dealing with bags:
 
 
 %% @doc Create a mnesia table for the PMAP:
 %% @doc Create a mnesia table for the PMAP:
--spec create_kv_bag_table(atom()) -> ok.
-create_kv_bag_table(Table) ->
+-spec create_kv_pmap_table(atom()) -> ok.
+create_kv_pmap_table(Table) ->
     mria:create_table(Table, [
     mria:create_table(Table, [
-        {type, bag},
+        {type, ordered_set},
         {rlog_shard, ?DS_MRIA_SHARD},
         {rlog_shard, ?DS_MRIA_SHARD},
         {storage, rocksdb_copies},
         {storage, rocksdb_copies},
         {record_name, kv},
         {record_name, kv},
         {attributes, record_info(fields, kv)}
         {attributes, record_info(fields, kv)}
     ]).
     ]).
 
 
-kv_bag_persist(Tab, SessionId, Key, Val0) ->
-    %% Remove the previous entry corresponding to the key:
-    kv_bag_delete(Tab, SessionId, Key),
+kv_pmap_persist(Tab, SessionId, Key, Val0) ->
     %% Write data to mnesia:
     %% Write data to mnesia:
     Val = encoder(encode, Tab, Val0),
     Val = encoder(encode, Tab, Val0),
-    mnesia:write(Tab, #kv{k = SessionId, v = {Key, Val}}, write).
+    mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write).
+
+kv_pmap_restore(Table, SessionId) ->
+    MS = [{#kv{k = {SessionId, '_'}, _ = '_'}, [], ['$_']}],
+    Objs = mnesia:select(Table, MS, read),
+    [{K, encoder(decode, Table, V)} || #kv{k = {_, K}, v = V} <- Objs].
 
 
-kv_bag_restore(Tab, SessionId) ->
-    [{K, encoder(decode, Tab, V)} || #kv{v = {K, V}} <- mnesia:read(Tab, SessionId)].
+kv_pmap_delete(Table, SessionId) ->
+    MS = [{#kv{k = {SessionId, '$1'}, _ = '_'}, [], ['$1']}],
+    Keys = mnesia:select(Table, MS, read),
+    [mnesia:delete(Table, {SessionId, K}, write) || K <- Keys],
+    ok.
 
 
-kv_bag_delete(Table, SessionId, Key) ->
+kv_pmap_delete(Table, SessionId, Key) ->
     %% Note: this match spec uses a fixed primary key, so it doesn't
     %% Note: this match spec uses a fixed primary key, so it doesn't
     %% require a table scan, and the transaction doesn't grab the
     %% require a table scan, and the transaction doesn't grab the
     %% whole table lock:
     %% whole table lock:
-    MS = [{#kv{k = SessionId, v = {Key, '_'}}, [], ['$_']}],
-    Objs = mnesia:select(Table, MS, write),
-    lists:foreach(
-        fun(Obj) ->
-            mnesia:delete_object(Table, Obj, write)
-        end,
-        Objs
-    ).
+    mnesia:delete(Table, {SessionId, Key}, write).
 
 
 %%
 %%
 
 

+ 22 - 2
apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl

@@ -17,7 +17,7 @@
 
 
 %% API:
 %% API:
 -export([find_new_streams/1, find_replay_streams/1]).
 -export([find_new_streams/1, find_replay_streams/1]).
--export([renew_streams/1]).
+-export([renew_streams/1, del_subscription/2]).
 
 
 %% behavior callbacks:
 %% behavior callbacks:
 -export([]).
 -export([]).
@@ -113,12 +113,31 @@ renew_streams(S0) ->
         emqx_persistent_session_ds_state:get_subscriptions(S1)
         emqx_persistent_session_ds_state:get_subscriptions(S1)
     ).
     ).
 
 
+-spec del_subscription(
+    emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
+) ->
+    emqx_persistent_session_ds_state:t().
+del_subscription(SubId, S0) ->
+    emqx_persistent_session_ds_state:fold_streams(
+        fun(Key, _, Acc) ->
+            case Key of
+                {SubId, _Stream} ->
+                    emqx_persistent_session_ds_state:del_stream(Key, Acc);
+                _ ->
+                    Acc
+            end
+        end,
+        S0,
+        S0
+    ).
+
 %%================================================================================
 %%================================================================================
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================
 
 
 ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
 ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
-    Key = {SubId, Stream},
+    %% TODO: use next_id to enumerate streams
+    Key = {SubId, term_to_binary(Stream)},
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
         undefined ->
         undefined ->
             {ok, Iterator} = emqx_ds:make_iterator(
             {ok, Iterator} = emqx_ds:make_iterator(
@@ -127,6 +146,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
             NewStreamState = #ifs{
             NewStreamState = #ifs{
                 rank_x = RankX,
                 rank_x = RankX,
                 rank_y = RankY,
                 rank_y = RankY,
+                it_begin = Iterator,
                 it_end = Iterator
                 it_end = Iterator
             },
             },
             emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
             emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);

+ 9 - 4
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -54,7 +54,8 @@ all() ->
 groups() ->
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     TCs = emqx_common_test_helpers:all(?MODULE),
     TCsNonGeneric = [t_choose_impl],
     TCsNonGeneric = [t_choose_impl],
-    TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
+    % {group, quic}, {group, ws}],
+    TCGroups = [{group, tcp}],
     [
     [
         %% {persistence_disabled, TCGroups},
         %% {persistence_disabled, TCGroups},
         {persistence_enabled, TCGroups},
         {persistence_enabled, TCGroups},
@@ -694,6 +695,9 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
     ok = publish_many(Pubs2),
     ok = publish_many(Pubs2),
     NPubs2 = length(Pubs2),
     NPubs2 = length(Pubs2),
 
 
+    _ = receive_messages(NPubs1, 2000),
+    [] = receive_messages(NPubs1, 2000),
+    debug_info(ClientId),
     {ok, Client2} = emqtt:start_link([
     {ok, Client2} = emqtt:start_link([
         {proto_ver, v5},
         {proto_ver, v5},
         {clientid, ClientId},
         {clientid, ClientId},
@@ -702,12 +706,14 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
         {auto_ack, false}
         {auto_ack, false}
         | Config
         | Config
     ]),
     ]),
+
     {ok, _} = emqtt:ConnFun(Client2),
     {ok, _} = emqtt:ConnFun(Client2),
 
 
     %% Try to receive _at most_ `NPubs` messages.
     %% Try to receive _at most_ `NPubs` messages.
     %% There shouldn't be that much unacked messages in the replay anyway,
     %% There shouldn't be that much unacked messages in the replay anyway,
     %% but it's an easy number to pick.
     %% but it's an easy number to pick.
     NPubs = NPubs1 + NPubs2,
     NPubs = NPubs1 + NPubs2,
+
     Msgs2 = receive_messages(NPubs, _Timeout = 2000),
     Msgs2 = receive_messages(NPubs, _Timeout = 2000),
     NMsgs2 = length(Msgs2),
     NMsgs2 = length(Msgs2),
 
 
@@ -1086,7 +1092,6 @@ skip_ds_tc(Config) ->
             Config
             Config
     end.
     end.
 
 
-throw_with_debug_info(Error, ClientId) ->
+debug_info(ClientId) ->
     Info = emqx_persistent_session_ds:print_session(ClientId),
     Info = emqx_persistent_session_ds:print_session(ClientId),
-    ct:pal("!!! Assertion failed: ~p~nState:~n~p", [Error, Info]),
-    exit(Error).
+    ct:pal("*** State:~n~p", [Info]).