Prechádzať zdrojové kódy

feat(sessds): Store awaiting rel

ieQu1 1 rok pred
rodič
commit
3e0c649e8e

+ 86 - 25
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -184,7 +184,9 @@
     seqno_q2_dup,
     seqno_q2_rec,
     seqno_q2_next,
-    n_streams
+    n_streams,
+    awaiting_rel_cnt,
+    awaiting_rel_max
 ]).
 
 %%
@@ -206,7 +208,8 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
     ok = emqx_cm:takeover_kick(ClientID),
     case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
         Session0 = #{} ->
-            Session = Session0#{props => Conf},
+            Session1 = Session0#{props => Conf},
+            Session = do_expire(ClientInfo, Session1),
             {true, ensure_timers(Session), []};
         false ->
             false
@@ -262,21 +265,21 @@ info(inflight_max, #{inflight := Inflight}) ->
     emqx_persistent_session_ds_inflight:receive_maximum(Inflight);
 info(retry_interval, #{props := Conf}) ->
     maps:get(retry_interval, Conf);
-% info(mqueue, #sessmem{mqueue = MQueue}) ->
-%     MQueue;
 info(mqueue_len, #{inflight := Inflight}) ->
     emqx_persistent_session_ds_inflight:n_buffered(all, Inflight);
-% info(mqueue_max, #sessmem{mqueue = MQueue}) ->
-%     emqx_mqueue:max_len(MQueue);
 info(mqueue_dropped, _Session) ->
     0;
 %% info(next_pkt_id, #{s := S}) ->
 %%     {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S),
 %%     PacketId;
-% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) ->
-%     AwaitingRel;
-%% info(awaiting_rel_cnt, #{s := S}) ->
-%%     seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
+info(awaiting_rel, #{s := S}) ->
+    emqx_persistent_session_ds_state:fold_awaiting_rel(fun maps:put/3, #{}, S);
+info(awaiting_rel_max, #{props := Conf}) ->
+    maps:get(max_awaiting_rel, Conf);
+info(awaiting_rel_cnt, #{s := S}) ->
+    emqx_persistent_session_ds_state:n_awaiting_rel(S);
+info(await_rel_timeout, #{props := Conf}) ->
+    maps:get(await_rel_timeout, Conf);
 info(seqno_q1_comm, #{s := S}) ->
     emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S);
 info(seqno_q1_dup, #{s := S}) ->
@@ -292,17 +295,7 @@ info(seqno_q2_rec, #{s := S}) ->
 info(seqno_q2_next, #{s := S}) ->
     emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S);
 info(n_streams, #{s := S}) ->
-    emqx_persistent_session_ds_state:fold_streams(
-        fun(_, _, Acc) -> Acc + 1 end,
-        0,
-        S
-    );
-info(awaiting_rel_max, #{props := Conf}) ->
-    maps:get(max_awaiting_rel, Conf);
-info(await_rel_timeout, #{props := _Conf}) ->
-    %% TODO: currently this setting is ignored:
-    %% maps:get(await_rel_timeout, Conf).
-    0;
+    emqx_persistent_session_ds_state:n_streams(S);
 info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs ->
     {error, not_implemented}.
 
@@ -446,11 +439,72 @@ get_subscription(TopicFilter, #{s := S}) ->
 -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) ->
     {ok, emqx_types:publish_result(), session()}
     | {error, emqx_types:reason_code()}.
+publish(
+    PacketId,
+    Msg = #message{qos = ?QOS_2, timestamp = Ts},
+    Session = #{s := S0}
+) ->
+    case is_awaiting_full(Session) of
+        false ->
+            case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of
+                undefined ->
+                    Results = emqx_broker:publish(Msg),
+                    S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0),
+                    {ok, Results, Session#{s => S}};
+                _Ts ->
+                    {error, ?RC_PACKET_IDENTIFIER_IN_USE}
+            end;
+        true ->
+            {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
+    end;
 publish(_PacketId, Msg, Session) ->
-    %% TODO: QoS2
     Result = emqx_broker:publish(Msg),
     {ok, Result, Session}.
 
+is_awaiting_full(#{s := S, props := Props}) ->
+    emqx_persistent_session_ds_state:n_awaiting_rel(S) >=
+        maps:get(max_awaiting_rel, Props, infinity).
+
+-spec expire(emqx_types:clientinfo(), session()) ->
+    {ok, [], timeout(), session()} | {ok, [], session()}.
+expire(ClientInfo, Session0 = #{props := Props}) ->
+    Session = #{s := S} = do_expire(ClientInfo, Session0),
+    case emqx_persistent_session_ds_state:n_awaiting_rel(S) of
+        0 ->
+            {ok, [], Session};
+        _ ->
+            AwaitRelTimeout = maps:get(await_rel_timeout, Props),
+            {ok, [], AwaitRelTimeout, Session}
+    end.
+
+do_expire(ClientInfo, Session = #{s := S0, props := Props}) ->
+    %% 1. Find expired packet IDs:
+    Now = erlang:system_time(millisecond),
+    AwaitRelTimeout = maps:get(await_rel_timeout, Props),
+    ExpiredPacketIds =
+        emqx_persistent_session_ds_state:fold_awaiting_rel(
+            fun(PacketId, Ts, Acc) ->
+                Age = Now - Ts,
+                case Age > AwaitRelTimeout of
+                    true ->
+                        [PacketId | Acc];
+                    false ->
+                        Acc
+                end
+            end,
+            [],
+            S0
+        ),
+    %% 2. Perform side effects:
+    _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, length(ExpiredPacketIds)}),
+    %% 3. Update state:
+    S = lists:foldl(
+        fun emqx_persistent_session_ds_state:del_awaiting_rel/2,
+        S0,
+        ExpiredPacketIds
+    ),
+    Session#{s => S}.
+
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBACK
 %%--------------------------------------------------------------------
@@ -487,9 +541,14 @@ pubrec(PacketId, Session0) ->
 
 -spec pubrel(emqx_types:packet_id(), session()) ->
     {ok, session()} | {error, emqx_types:reason_code()}.
-pubrel(_PacketId, Session = #{}) ->
-    % TODO: stub
-    {ok, Session}.
+pubrel(PacketId, Session = #{s := S0}) ->
+    case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of
+        undefined ->
+            {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
+        _TS ->
+            S = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0),
+            {ok, Session#{s => S}}
+    end.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBCOMP
@@ -562,6 +621,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s :=
     S = emqx_persistent_session_ds_state:commit(S0),
     From ! Ref,
     {ok, [], Session#{s => S}};
+handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
+    expire(ClientInfo, Session);
 handle_timeout(_ClientInfo, Timeout, Session) ->
     ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
     {ok, [], Session}.

+ 57 - 6
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -34,10 +34,17 @@
 -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
 -export([get_peername/1, set_peername/2]).
 -export([new_id/1]).
--export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]).
+-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]).
 -export([get_seqno/2, put_seqno/3]).
 -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
 -export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
+-export([
+    get_awaiting_rel/2,
+    put_awaiting_rel/3,
+    del_awaiting_rel/2,
+    fold_awaiting_rel/3,
+    n_awaiting_rel/1
+]).
 
 -export([make_session_iterator/0, session_iterator_next/2]).
 
@@ -117,7 +124,8 @@
     subscriptions := subscriptions(),
     seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()),
     streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()),
-    ranks := pmap(term(), integer())
+    ranks := pmap(term(), integer()),
+    awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer())
 }.
 
 -define(session_tab, emqx_ds_session_tab).
@@ -125,7 +133,8 @@
 -define(stream_tab, emqx_ds_session_streams).
 -define(seqno_tab, emqx_ds_session_seqnos).
 -define(rank_tab, emqx_ds_session_ranks).
--define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]).
+-define(awaiting_rel_tab, emqx_ds_session_awaiting_rel).
+-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab, ?awaiting_rel_tab]).
 
 %% Enable this flag if you suspect some code breaks the sequence:
 -ifndef(CHECK_SEQNO).
@@ -167,6 +176,7 @@ open(SessionId) ->
                     streams => pmap_open(?stream_tab, SessionId),
                     seqnos => pmap_open(?seqno_tab, SessionId),
                     ranks => pmap_open(?rank_tab, SessionId),
+                    awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId),
                     ?unset_dirty
                 },
                 {ok, Rec};
@@ -190,7 +200,8 @@ format(#{
     subscriptions := SubsGBT,
     streams := Streams,
     seqnos := Seqnos,
-    ranks := Ranks
+    ranks := Ranks,
+    awaiting_rel := AwaitingRel
 }) ->
     Subs = emqx_topic_gbt:fold(
         fun(Key, Sub, Acc) ->
@@ -204,7 +215,8 @@ format(#{
         subscriptions => Subs,
         streams => pmap_format(Streams),
         seqnos => pmap_format(Seqnos),
-        ranks => pmap_format(Ranks)
+        ranks => pmap_format(Ranks),
+        awaiting_rel => pmap_format(AwaitingRel)
     }.
 
 -spec list_sessions() -> [emqx_persistent_session_ds:id()].
@@ -229,7 +241,8 @@ commit(
         metadata := Metadata,
         streams := Streams,
         seqnos := SeqNos,
-        ranks := Ranks
+        ranks := Ranks,
+        awaiting_rel := AwaitingRel
     }
 ) ->
     check_sequence(Rec),
@@ -239,6 +252,7 @@ commit(
             streams => pmap_commit(SessionId, Streams),
             seqnos => pmap_commit(SessionId, SeqNos),
             ranks => pmap_commit(SessionId, Ranks),
+            awaiting_rel => pmap_commit(SessionId, AwaitingRel),
             ?unset_dirty
         }
     end).
@@ -254,6 +268,7 @@ create_new(SessionId) ->
             streams => pmap_open(?stream_tab, SessionId),
             seqnos => pmap_open(?seqno_tab, SessionId),
             ranks => pmap_open(?rank_tab, SessionId),
+            awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId),
             ?set_dirty
         }
     end).
@@ -382,6 +397,10 @@ del_stream(Key, Rec) ->
 fold_streams(Fun, Acc, Rec) ->
     gen_fold(streams, Fun, Acc, Rec).
 
+-spec n_streams(t()) -> non_neg_integer().
+n_streams(Rec) ->
+    gen_size(streams, Rec).
+
 %%
 
 -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined.
@@ -412,6 +431,30 @@ del_rank(Key, Rec) ->
 fold_ranks(Fun, Acc, Rec) ->
     gen_fold(ranks, Fun, Acc, Rec).
 
+%%
+
+-spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined.
+get_awaiting_rel(Key, Rec) ->
+    gen_get(awaiting_rel, Key, Rec).
+
+-spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t().
+put_awaiting_rel(Key, Val, Rec) ->
+    gen_put(awaiting_rel, Key, Val, Rec).
+
+-spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t().
+del_awaiting_rel(Key, Rec) ->
+    gen_del(awaiting_rel, Key, Rec).
+
+-spec fold_awaiting_rel(fun(), Acc, t()) -> Acc.
+fold_awaiting_rel(Fun, Acc, Rec) ->
+    gen_fold(awaiting_rel, Fun, Acc, Rec).
+
+-spec n_awaiting_rel(t()) -> non_neg_integer().
+n_awaiting_rel(Rec) ->
+    gen_size(awaiting_rel, Rec).
+
+%%
+
 -spec make_session_iterator() -> session_iterator().
 make_session_iterator() ->
     mnesia:dirty_first(?session_tab).
@@ -475,6 +518,10 @@ gen_del(Field, Key, Rec) ->
         Rec#{?set_dirty}
     ).
 
+gen_size(Field, Rec) ->
+    check_sequence(Rec),
+    pmap_size(maps:get(Field, Rec)).
+
 %%
 
 read_subscriptions(SessionId) ->
@@ -547,6 +594,10 @@ pmap_commit(
 pmap_format(#pmap{cache = Cache}) ->
     Cache.
 
+-spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
+pmap_size(#pmap{cache = Cache}) ->
+    maps:size(Cache).
+
 %% Functions dealing with set tables:
 
 kv_persist(Tab, SessionId, Val0) ->

+ 9 - 1
apps/emqx/src/emqx_persistent_session_ds_subs.erl

@@ -24,7 +24,15 @@
 -module(emqx_persistent_session_ds_subs).
 
 %% API:
--export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]).
+-export([
+    on_subscribe/3,
+    on_unsubscribe/3,
+    gc/1,
+    lookup/2,
+    to_map/1,
+    fold/3,
+    fold_all/3
+]).
 
 -export_type([]).