Browse Source

Merge pull request #12019 from ieQu1/ds-clean-start

Discard sessions when client connects with clean start = true
ieQu1 2 năm trước cách đây
mục cha
commit
2d4b9a7b9d

+ 51 - 15
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -70,6 +70,8 @@
     do_ensure_all_iterators_closed/1
     do_ensure_all_iterators_closed/1
 ]).
 ]).
 
 
+-export([print_session/1]).
+
 -ifdef(TEST).
 -ifdef(TEST).
 -export([
 -export([
     session_open/1,
     session_open/1,
@@ -142,13 +144,19 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
     %% somehow isolate those idling not-yet-expired sessions into a separate process
     %% somehow isolate those idling not-yet-expired sessions into a separate process
     %% space, and move this call back into `emqx_cm` where it belongs.
     %% space, and move this call back into `emqx_cm` where it belongs.
     ok = emqx_cm:discard_session(ClientID),
     ok = emqx_cm:discard_session(ClientID),
-    case session_open(ClientID) of
-        Session0 = #{} ->
-            ensure_timers(),
-            ReceiveMaximum = receive_maximum(ConnInfo),
-            Session = Session0#{receive_maximum => ReceiveMaximum},
-            {true, Session, []};
+    case maps:get(clean_start, ConnInfo, false) of
         false ->
         false ->
+            case session_open(ClientID) of
+                Session0 = #{} ->
+                    ensure_timers(),
+                    ReceiveMaximum = receive_maximum(ConnInfo),
+                    Session = Session0#{receive_maximum => ReceiveMaximum},
+                    {true, Session, []};
+                false ->
+                    false
+            end;
+        true ->
+            session_drop(ClientID),
             false
             false
     end.
     end.
 
 
@@ -220,6 +228,25 @@ info(await_rel_timeout, #{props := Conf}) ->
 stats(Session) ->
 stats(Session) ->
     info(?STATS_KEYS, Session).
     info(?STATS_KEYS, Session).
 
 
+%% Debug/troubleshooting
+-spec print_session(emqx_types:client_id()) -> map() | undefined.
+print_session(ClientId) ->
+    catch ro_transaction(
+        fun() ->
+            case mnesia:read(?SESSION_TAB, ClientId) of
+                [Session] ->
+                    #{
+                        session => Session,
+                        streams => mnesia:read(?SESSION_STREAM_TAB, ClientId),
+                        pubranges => session_read_pubranges(ClientId),
+                        subscriptions => session_read_subscriptions(ClientId)
+                    };
+                [] ->
+                    undefined
+            end
+        end
+    ).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
 %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -557,7 +584,7 @@ session_drop(DSSessionId) ->
 
 
 -spec session_drop_subscriptions(id()) -> ok.
 -spec session_drop_subscriptions(id()) -> ok.
 session_drop_subscriptions(DSSessionId) ->
 session_drop_subscriptions(DSSessionId) ->
-    Subscriptions = session_read_subscriptions(DSSessionId),
+    Subscriptions = session_read_subscriptions(DSSessionId, write),
     lists:foreach(
     lists:foreach(
         fun(#ds_sub{id = DSSubId} = DSSub) ->
         fun(#ds_sub{id = DSSubId} = DSSub) ->
             TopicFilter = subscription_id_to_topic_filter(DSSubId),
             TopicFilter = subscription_id_to_topic_filter(DSSubId),
@@ -620,13 +647,27 @@ session_del_subscription(DSSessionId, TopicFilter) ->
 session_del_subscription(#ds_sub{id = DSSubId}) ->
 session_del_subscription(#ds_sub{id = DSSubId}) ->
     mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write).
     mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write).
 
 
-session_read_subscriptions(DSSessionId) ->
+session_read_subscriptions(DSSessionID) ->
+    session_read_subscriptions(DSSessionID, read).
+
+session_read_subscriptions(DSSessionId, LockKind) ->
     MS = ets:fun2ms(
     MS = ets:fun2ms(
         fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId ->
         fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId ->
             Sub
             Sub
         end
         end
     ),
     ),
-    mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read).
+    mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, LockKind).
+
+session_read_pubranges(DSSessionID) ->
+    session_read_pubranges(DSSessionID, read).
+
+session_read_pubranges(DSSessionId, LockKind) ->
+    MS = ets:fun2ms(
+        fun(#ds_pubrange{id = {Sess, First}}) when Sess =:= DSSessionId ->
+            {DSSessionId, First}
+        end
+    ),
+    mnesia:select(?SESSION_PUBRANGE_TAB, MS, LockKind).
 
 
 -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}.
 -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}.
 new_subscription_id(DSSessionId, TopicFilter) ->
 new_subscription_id(DSSessionId, TopicFilter) ->
@@ -729,12 +770,7 @@ session_drop_streams(DSSessionId) ->
 %% must be called inside a transaction
 %% must be called inside a transaction
 -spec session_drop_pubranges(id()) -> ok.
 -spec session_drop_pubranges(id()) -> ok.
 session_drop_pubranges(DSSessionId) ->
 session_drop_pubranges(DSSessionId) ->
-    MS = ets:fun2ms(
-        fun(#ds_pubrange{id = {DSSessionId0, First}}) when DSSessionId0 =:= DSSessionId ->
-            {DSSessionId, First}
-        end
-    ),
-    RangeIds = mnesia:select(?SESSION_PUBRANGE_TAB, MS, write),
+    RangeIds = session_read_pubranges(DSSessionId, write),
     lists:foreach(
     lists:foreach(
         fun(RangeId) ->
         fun(RangeId) ->
             mnesia:delete(?SESSION_PUBRANGE_TAB, RangeId, write)
             mnesia:delete(?SESSION_PUBRANGE_TAB, RangeId, write)

+ 3 - 1
apps/emqx/src/emqx_shared_sub.erl

@@ -242,7 +242,9 @@ with_redispatch_to(Msg, Group, Topic) ->
 is_redispatch_needed(#message{qos = ?QOS_0}) ->
 is_redispatch_needed(#message{qos = ?QOS_0}) ->
     false;
     false;
 is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) ->
 is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) ->
-    true.
+    true;
+is_redispatch_needed(#message{}) ->
+    false.
 
 
 %% @doc Redispatch shared deliveries to other members in the group.
 %% @doc Redispatch shared deliveries to other members in the group.
 redispatch(Messages0) ->
 redispatch(Messages0) ->

+ 6 - 8
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -745,9 +745,6 @@ t_publish_while_client_is_gone(Config) ->
 
 
     ok = emqtt:disconnect(Client2).
     ok = emqtt:disconnect(Client2).
 
 
-%% TODO: don't skip after QoS2 support is added to DS.
-t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config);
-t_clean_start_drops_subscriptions('end', _Config) -> ok.
 t_clean_start_drops_subscriptions(Config) ->
 t_clean_start_drops_subscriptions(Config) ->
     %% 1. A persistent session is started and disconnected.
     %% 1. A persistent session is started and disconnected.
     %% 2. While disconnected, a message is published and persisted.
     %% 2. While disconnected, a message is published and persisted.
@@ -773,13 +770,13 @@ t_clean_start_drops_subscriptions(Config) ->
         | Config
         | Config
     ]),
     ]),
     {ok, _} = emqtt:ConnFun(Client1),
     {ok, _} = emqtt:ConnFun(Client1),
-    {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
+    {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
 
 
     ok = emqtt:disconnect(Client1),
     ok = emqtt:disconnect(Client1),
     maybe_kill_connection_process(ClientId, Config),
     maybe_kill_connection_process(ClientId, Config),
 
 
     %% 2.
     %% 2.
-    ok = publish(Topic, Payload1),
+    ok = publish(Topic, Payload1, ?QOS_1),
 
 
     %% 3.
     %% 3.
     {ok, Client2} = emqtt:start_link([
     {ok, Client2} = emqtt:start_link([
@@ -791,9 +788,10 @@ t_clean_start_drops_subscriptions(Config) ->
     ]),
     ]),
     {ok, _} = emqtt:ConnFun(Client2),
     {ok, _} = emqtt:ConnFun(Client2),
     ?assertEqual(0, client_info(session_present, Client2)),
     ?assertEqual(0, client_info(session_present, Client2)),
-    {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
+    {ok, _, [1]} = emqtt:subscribe(Client2, STopic, qos1),
 
 
-    ok = publish(Topic, Payload2),
+    timer:sleep(100),
+    ok = publish(Topic, Payload2, ?QOS_1),
     [Msg1] = receive_messages(1),
     [Msg1] = receive_messages(1),
     ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
     ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
 
 
@@ -810,7 +808,7 @@ t_clean_start_drops_subscriptions(Config) ->
     ]),
     ]),
     {ok, _} = emqtt:ConnFun(Client3),
     {ok, _} = emqtt:ConnFun(Client3),
 
 
-    ok = publish(Topic, Payload3),
+    ok = publish(Topic, Payload3, ?QOS_1),
     [Msg2] = receive_messages(1),
     [Msg2] = receive_messages(1),
     ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
     ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),