Explorar o código

fix(sessds): Delete the routes when the session expires

ieQu1 %!s(int64=2) %!d(string=hai) anos
pai
achega
963df8f941

+ 19 - 13
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -85,6 +85,7 @@ end_per_testcase(TestCase, Config) when
     Nodes = ?config(nodes, Config),
     emqx_common_test_helpers:call_janitor(60_000),
     ok = emqx_cth_cluster:stop(Nodes),
+    snabbkaffe:stop(),
     ok;
 end_per_testcase(_TestCase, _Config) ->
     emqx_common_test_helpers:call_janitor(60_000),
@@ -164,10 +165,19 @@ is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}})
     EI > 0.
 
 list_all_sessions(Node) ->
-    erpc:call(Node, emqx_persistent_session_ds, list_all_sessions, []).
+    erpc:call(Node, emqx_persistent_session_ds_state, list_sessions, []).
 
 list_all_subscriptions(Node) ->
-    erpc:call(Node, emqx_persistent_session_ds, list_all_subscriptions, []).
+    Sessions = list_all_sessions(Node),
+    lists:flatmap(
+        fun(ClientId) ->
+            #{s := #{subscriptions := Subs}} = erpc:call(
+                Node, emqx_persistent_session_ds, print_session, [ClientId]
+            ),
+            maps:to_list(Subs)
+        end,
+        Sessions
+    ).
 
 list_all_pubranges(Node) ->
     erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
@@ -485,7 +495,7 @@ do_t_session_expiration(_Config, Opts) ->
             Client0 = start_client(Params0),
             {ok, _} = emqtt:connect(Client0),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, Topic, ?QOS_2),
-            #{subscriptions := Subs0} = emqx_persistent_session_ds:print_session(ClientId),
+            #{s := #{subscriptions := Subs0}} = emqx_persistent_session_ds:print_session(ClientId),
             ?assertEqual(1, map_size(Subs0), #{subs => Subs0}),
             Info0 = maps:from_list(emqtt:info(Client0)),
             ?assertEqual(0, maps:get(session_present, Info0), #{info => Info0}),
@@ -512,7 +522,8 @@ do_t_session_expiration(_Config, Opts) ->
             emqtt:publish(Client2, Topic, <<"payload">>),
             ?assertNotReceive({publish, #{topic := Topic}}),
             %% ensure subscriptions are absent from table.
-            ?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()),
+            #{s := #{subscriptions := Subs3}} = emqx_persistent_session_ds:print_session(ClientId),
+            ?assertEqual([], maps:to_list(Subs3)),
             emqtt:disconnect(Client2, ?RC_NORMAL_DISCONNECTION, ThirdDisconn),
 
             ok
@@ -580,10 +591,8 @@ t_session_gc(Config) ->
             ),
             ?assertMatch({ok, _}, Res0),
             {ok, #{?snk_meta := #{time := T0}}} = Res0,
-            Sessions0 = list_all_sessions(Node1),
-            Subs0 = list_all_subscriptions(Node1),
-            ?assertEqual(3, map_size(Sessions0), #{sessions => Sessions0}),
-            ?assertEqual(3, map_size(Subs0), #{subs => Subs0}),
+            ?assertMatch([_, _, _], list_all_sessions(Node1), sessions),
+            ?assertMatch([_, _, _], list_all_subscriptions(Node1), subscriptions),
 
             %% Now we disconnect 2 of them; only those should be GC'ed.
             ?assertMatch(
@@ -628,11 +637,8 @@ t_session_gc(Config) ->
                     4 * GCInterval + 1_000
                 )
             ),
-            Sessions1 = list_all_sessions(Node1),
-            Subs1 = list_all_subscriptions(Node1),
-            ?assertEqual(1, map_size(Sessions1), #{sessions => Sessions1}),
-            ?assertEqual(1, map_size(Subs1), #{subs => Subs1}),
-
+            ?assertMatch([_], list_all_sessions(Node1), sessions),
+            ?assertMatch([_], list_all_subscriptions(Node1), subscriptions),
             ok
         end,
         [

+ 41 - 24
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -20,7 +20,7 @@
 
 -include("emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
 
 -include("emqx_mqtt.hrl").
@@ -188,7 +188,7 @@ destroy(#{clientid := ClientID}) ->
     destroy_session(ClientID).
 
 destroy_session(ClientID) ->
-    session_drop(ClientID).
+    session_drop(ClientID, destroy).
 
 %%--------------------------------------------------------------------
 %% Info, Stats
@@ -321,19 +321,28 @@ unsubscribe(
     Session = #{id := ID, s := S0}
 ) ->
     case subs_lookup(TopicFilter, S0) of
-        #{props := SubOpts, id := SubId} ->
-            S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
-            S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1),
-            ?tp_span(
-                persistent_session_ds_subscription_route_delete,
-                #{session_id => ID},
-                ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, ID)
-            ),
-            {ok, Session#{s => S}, SubOpts};
         undefined ->
-            {error, ?RC_NO_SUBSCRIPTION_EXISTED}
+            {error, ?RC_NO_SUBSCRIPTION_EXISTED};
+        Subscription = #{props := SubOpts} ->
+            S = do_unsubscribe(ID, TopicFilter, Subscription, S0),
+            {ok, Session#{s => S}, SubOpts}
     end.
 
+-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
+    emqx_persistent_session_ds_state:t().
+do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
+    ?tp(persistent_session_ds_subscription_delete, #{
+        session_id => SessionId, topic_filter => TopicFilter
+    }),
+    S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
+    S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1),
+    ?tp_span(
+        persistent_session_ds_subscription_route_delete,
+        #{session_id => SessionId, topic_filter => TopicFilter},
+        ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
+    ),
+    S.
+
 -spec get_subscription(topic_filter(), session()) ->
     emqx_types:subopts() | undefined.
 get_subscription(TopicFilter, #{s := S}) ->
@@ -534,12 +543,6 @@ sync(ClientId) ->
             {error, noproc}
     end.
 
--define(IS_EXPIRED(NOW_MS, LAST_ALIVE_AT, EI),
-    (is_number(LAST_ALIVE_AT) andalso
-        is_number(EI) andalso
-        (NOW_MS >= LAST_ALIVE_AT + EI))
-).
-
 %% @doc Called when a client connects. This function looks up a
 %% session or returns `false` if previous one couldn't be found.
 %%
@@ -553,11 +556,12 @@ session_open(SessionId, NewConnInfo) ->
         {ok, S0} ->
             EI = expiry_interval(emqx_persistent_session_ds_state:get_conninfo(S0)),
             LastAliveAt = emqx_persistent_session_ds_state:get_last_alive_at(S0),
-            case ?IS_EXPIRED(NowMS, LastAliveAt, EI) of
+            case NowMS >= LastAliveAt + EI of
                 true ->
-                    emqx_persistent_session_ds_state:delete(SessionId),
+                    session_drop(SessionId, expired),
                     false;
                 false ->
+                    ?tp(open_session, #{ei => EI, now => NowMS, laa => LastAliveAt}),
                     %% New connection being established
                     S1 = emqx_persistent_session_ds_state:set_conninfo(NewConnInfo, S0),
                     S2 = emqx_persistent_session_ds_state:set_last_alive_at(NowMS, S1),
@@ -608,9 +612,22 @@ session_ensure_new(Id, ConnInfo, Conf) ->
 
 %% @doc Called when a client reconnects with `clean session=true' or
 %% during session GC
--spec session_drop(id()) -> ok.
-session_drop(ID) ->
-    emqx_persistent_session_ds_state:delete(ID).
+-spec session_drop(id(), _Reason) -> ok.
+session_drop(ID, Reason) ->
+    case emqx_persistent_session_ds_state:open(ID) of
+        {ok, S0} ->
+            ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
+            _S = subs_fold(
+                fun(TopicFilter, Subscription, S) ->
+                    do_unsubscribe(ID, TopicFilter, Subscription, S)
+                end,
+                S0,
+                S0
+            ),
+            emqx_persistent_session_ds_state:delete(ID);
+        undefined ->
+            ok
+    end.
 
 now_ms() ->
     erlang:system_time(millisecond).
@@ -1083,7 +1100,7 @@ seqno_proper_test_() ->
             end,
             [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}}.
 
-apply_n_times(0, Fun, A) ->
+apply_n_times(0, _Fun, A) ->
     A;
 apply_n_times(N, Fun, A) when N > 0 ->
     apply_n_times(N - 1, Fun, Fun(A)).

+ 11 - 12
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -159,7 +159,7 @@ open(SessionId) ->
                     streams => pmap_open(?stream_tab, SessionId),
                     seqnos => pmap_open(?seqno_tab, SessionId),
                     ranks => pmap_open(?rank_tab, SessionId),
-                    dirty => false
+                    ?unset_dirty
                 },
                 {ok, Rec};
             [] ->
@@ -222,17 +222,16 @@ commit(
         ranks := Ranks
     }
 ) ->
-    check_sequence(
-        transaction(fun() ->
-            kv_persist(?session_tab, SessionId, Metadata),
-            Rec#{
-                streams => pmap_commit(SessionId, Streams),
-                seqnos => pmap_commit(SessionId, SeqNos),
-                ranks => pmap_commit(SessionId, Ranks),
-                ?unset_dirty
-            }
-        end)
-    ).
+    check_sequence(Rec),
+    transaction(fun() ->
+        kv_persist(?session_tab, SessionId, Metadata),
+        Rec#{
+            streams => pmap_commit(SessionId, Streams),
+            seqnos => pmap_commit(SessionId, SeqNos),
+            ranks => pmap_commit(SessionId, Ranks),
+            ?unset_dirty
+        }
+    end).
 
 -spec create_new(emqx_persistent_session_ds:id()) -> t().
 create_new(SessionId) ->

+ 1 - 25
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -216,31 +216,7 @@ t_session_subscription_iterators(Config) ->
                 messages => [Message1, Message2, Message3, Message4]
             }
         end,
-        fun(Trace) ->
-            ct:pal("trace:\n  ~p", [Trace]),
-            case ?of_kind(ds_session_subscription_added, Trace) of
-                [] ->
-                    %% Since `emqx_durable_storage' is a dependency of `emqx', it gets
-                    %% compiled in "prod" mode when running emqx standalone tests.
-                    ok;
-                [_ | _] ->
-                    ?assertMatch(
-                        [
-                            #{?snk_kind := ds_session_subscription_added},
-                            #{?snk_kind := ds_session_subscription_present}
-                        ],
-                        ?of_kind(
-                            [
-                                ds_session_subscription_added,
-                                ds_session_subscription_present
-                            ],
-                            Trace
-                        )
-                    ),
-                    ok
-            end,
-            ok
-        end
+        []
     ),
     ok.
 

+ 2 - 2
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -594,7 +594,7 @@ fields("node") ->
             sc(
                 hoconsc:enum([gen_rpc, distr]),
                 #{
-                    mapping => "mria.shardp_transport",
+                    mapping => "mria.shard_transport",
                     importance => ?IMPORTANCE_HIDDEN,
                     default => distr,
                     desc => ?DESC(db_default_shard_transport)