Kaynağa Gözat

Ensure session expiration (#2825)

Ensure session expiration
Feng Lee 6 yıl önce
ebeveyn
işleme
c69a2b1b48

+ 2 - 0
include/emqx_mqtt.hrl

@@ -17,6 +17,8 @@
 -ifndef(EMQ_X_MQTT_HRL).
 -ifndef(EMQ_X_MQTT_HRL).
 -define(EMQ_X_MQTT_HRL, true).
 -define(EMQ_X_MQTT_HRL, true).
 
 
+-define(UINT_MAX, 16#FFFFFFFF).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% MQTT SockOpts
 %% MQTT SockOpts
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 41 - 11
src/emqx_channel.erl

@@ -74,6 +74,7 @@
           %% Connected
           %% Connected
           connected :: boolean(),
           connected :: boolean(),
           connected_at :: erlang:timestamp(),
           connected_at :: erlang:timestamp(),
+          disconnected_at :: erlang:timestamp(),
           %% Takeover/Resume
           %% Takeover/Resume
           resuming :: boolean(),
           resuming :: boolean(),
           pendings :: list()
           pendings :: list()
@@ -169,7 +170,9 @@ info(oom_policy, #channel{oom_policy = Policy}) ->
 info(connected, #channel{connected = Connected}) ->
 info(connected, #channel{connected = Connected}) ->
     Connected;
     Connected;
 info(connected_at, #channel{connected_at = ConnectedAt}) ->
 info(connected_at, #channel{connected_at = ConnectedAt}) ->
-    ConnectedAt.
+    ConnectedAt;
+info(disconnected_at, #channel{disconnected_at = DisconnectedAt}) ->
+    DisconnectedAt.
 
 
 -spec(attrs(channel()) -> emqx_types:attrs()).
 -spec(attrs(channel()) -> emqx_types:attrs()).
 attrs(#channel{client       = Client,
 attrs(#channel{client       = Client,
@@ -240,11 +243,12 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{pro
             ProtoVer = emqx_protocol:info(proto_ver, Protocol),
             ProtoVer = emqx_protocol:info(proto_ver, Protocol),
             ?LOG(warning, "Cannot publish message to ~s due to ~s",
             ?LOG(warning, "Cannot publish message to ~s due to ~s",
                  [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
                  [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
-            case QoS of
-                ?QOS_0 -> handle_out({puberr, ReasonCode}, NChannel);
-                ?QOS_1 -> handle_out({puback, PacketId, ReasonCode}, NChannel);
-                ?QOS_2 -> handle_out({pubrec, PacketId, ReasonCode}, NChannel)
-            end
+            handle_out({disconnect, ReasonCode}, NChannel)
+            % case QoS of
+            %     ?QOS_0 -> handle_out({puberr, ReasonCode}, NChannel);
+            %     ?QOS_1 -> handle_out({puback, PacketId, ReasonCode}, NChannel);
+            %     ?QOS_2 -> handle_out({pubrec, PacketId, ReasonCode}, NChannel)
+            % end
     end;
     end;
 
 
 %%TODO: How to handle the ReasonCode?
 %%TODO: How to handle the ReasonCode?
@@ -589,6 +593,18 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
     {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
     {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
     {ok, NChannel};
     {ok, NChannel};
 
 
+handle_info(sock_closed, Channel = #channel{connected = false}) ->
+    shutdown(closed, Channel);
+handle_info(sock_closed, Channel = #channel{session  = Session}) ->
+    Interval = emqx_session:info(expiry_interval, Session),
+    case Interval of
+        ?UINT_MAX ->
+            {ok, ensure_disconnected(Channel)};
+        Int when Int > 0 ->
+            {ok, ensure_timer(expire_timer, ensure_disconnected(Channel))};
+        _Other -> shutdown(closed, Channel)
+    end;
+
 handle_info(Info, Channel) ->
 handle_info(Info, Channel) ->
     ?LOG(error, "Unexpected info: ~p~n", [Info]),
     ?LOG(error, "Unexpected info: ~p~n", [Info]),
     {ok, Channel}.
     {ok, Channel}.
@@ -643,7 +659,7 @@ timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session,
     end;
     end;
 
 
 timeout(_TRef, expire_session, Channel) ->
 timeout(_TRef, expire_session, Channel) ->
-    {ok, Channel};
+    shutdown(expired, Channel);
 
 
 timeout(_TRef, Msg, Channel) ->
 timeout(_TRef, Msg, Channel) ->
     ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
     ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
@@ -685,7 +701,7 @@ interval(retry_timer, #channel{session = Session}) ->
 interval(await_timer, #channel{session = Session}) ->
 interval(await_timer, #channel{session = Session}) ->
     emqx_session:info(await_rel_timeout, Session);
     emqx_session:info(await_rel_timeout, Session);
 interval(expire_timer, #channel{session = Session}) ->
 interval(expire_timer, #channel{session = Session}) ->
-    emqx_session:info(expiry_interval, Session).
+    timer:seconds(emqx_session:info(expiry_interval, Session)).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Terminate
 %% Terminate
@@ -886,11 +902,19 @@ auth_connect(#mqtt_packet_connect{client_id = ClientId,
 
 
 open_session(#mqtt_packet_connect{clean_start = CleanStart,
 open_session(#mqtt_packet_connect{clean_start = CleanStart,
                                   properties  = ConnProps},
                                   properties  = ConnProps},
-             #channel{client = Client = #{zone := Zone}}) ->
+             #channel{client = Client = #{zone := Zone}, protocol = Protocol}) ->
     MaxInflight = get_property('Receive-Maximum', ConnProps,
     MaxInflight = get_property('Receive-Maximum', ConnProps,
                                emqx_zone:get_env(Zone, max_inflight, 65535)),
                                emqx_zone:get_env(Zone, max_inflight, 65535)),
-    Interval = get_property('Session-Expiry-Interval', ConnProps,
-                            emqx_zone:get_env(Zone, session_expiry_interval, 0)),
+    
+    Interval = 
+        case emqx_protocol:info(proto_ver, Protocol) of
+            ?MQTT_PROTO_V5 -> get_property('Session-Expiry-Interval', ConnProps, 0);
+            _ ->
+                case CleanStart of
+                    true -> 0;
+                    false -> emqx_zone:get_env(Zone, session_expiry_interval, 0)
+                end
+        end,
     emqx_cm:open_session(CleanStart, Client, #{max_inflight    => MaxInflight,
     emqx_cm:open_session(CleanStart, Client, #{max_inflight    => MaxInflight,
                                                expiry_interval => Interval
                                                expiry_interval => Interval
                                               }).
                                               }).
@@ -1034,6 +1058,9 @@ enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId},
 ensure_connected(Channel) ->
 ensure_connected(Channel) ->
     Channel#channel{connected = true, connected_at = os:timestamp()}.
     Channel#channel{connected = true, connected_at = os:timestamp()}.
 
 
+ensure_disconnected(Channel) ->
+    Channel#channel{connected = false, disconnected_at = os:timestamp()}.
+
 ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
 ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
     ensure_keepalive_timer(Interval, Channel);
     ensure_keepalive_timer(Interval, Channel);
 ensure_keepalive(_AckProp, Channel = #channel{protocol = Protocol}) ->
 ensure_keepalive(_AckProp, Channel = #channel{protocol = Protocol}) ->
@@ -1111,3 +1138,6 @@ sp(false) -> 0.
 flag(true)  -> 1;
 flag(true)  -> 1;
 flag(false) -> 0.
 flag(false) -> 0.
 
 
+shutdown(Reason, Channel) ->
+    {stop, {shutdown, Reason}, Channel}.
+

+ 5 - 7
src/emqx_connection.erl

@@ -341,15 +341,13 @@ handle(info, {Error, _Sock, Reason}, State)
   when Error == tcp_error; Error == ssl_error ->
   when Error == tcp_error; Error == ssl_error ->
     shutdown(Reason, State);
     shutdown(Reason, State);
 
 
-%%TODO: fixme later.
 handle(info, {Closed, _Sock}, State = #state{chan_state = ChanState})
 handle(info, {Closed, _Sock}, State = #state{chan_state = ChanState})
   when Closed == tcp_closed; Closed == ssl_closed ->
   when Closed == tcp_closed; Closed == ssl_closed ->
-    case emqx_channel:info(protocol, ChanState) of
-        undefined -> shutdown(closed, State);
-        #{clean_start := true} ->
-            shutdown(closed, State);
-        #{clean_start := false} ->
-            {next_state, disconnected, State}
+    case emqx_channel:handle_info(sock_closed, ChanState) of
+        {ok, NChanState} ->
+            {next_state, disconnected, State#state{chan_state = NChanState}};
+        {stop, Reason, NChanState} ->
+            stop(Reason, State#state{chan_state = NChanState})
     end;
     end;
 
 
 handle(info, {Passive, _Sock}, State) when Passive == tcp_passive;
 handle(info, {Passive, _Sock}, State) when Passive == tcp_passive;

+ 1 - 1
src/emqx_session.erl

@@ -214,7 +214,7 @@ info(max_awaiting_rel, #session{max_awaiting_rel = MaxAwaitingRel}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
 info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
     Timeout;
     Timeout;
 info(expiry_interval, #session{expiry_interval = Interval}) ->
 info(expiry_interval, #session{expiry_interval = Interval}) ->
-    Interval div 1000;
+    Interval;
 info(created_at, #session{created_at = CreatedAt}) ->
 info(created_at, #session{created_at = CreatedAt}) ->
     CreatedAt.
     CreatedAt.
 
 

+ 4 - 20
test/emqx_session_SUITE.erl

@@ -73,12 +73,12 @@ apply_ops(Session, [Op | Rest]) ->
 apply_op(Session, info) ->
 apply_op(Session, info) ->
     Info = emqx_session:info(Session),
     Info = emqx_session:info(Session),
     ?assert(is_map(Info)),
     ?assert(is_map(Info)),
-    ?assertEqual(16, maps:size(Info)),
+    ?assertEqual(15, maps:size(Info)),
     Session;
     Session;
 apply_op(Session, attrs) ->
 apply_op(Session, attrs) ->
     Attrs = emqx_session:attrs(Session),
     Attrs = emqx_session:attrs(Session),
     ?assert(is_map(Attrs)),
     ?assert(is_map(Attrs)),
-    ?assertEqual(3, maps:size(Attrs)),
+    ?assertEqual(2, maps:size(Attrs)),
     Session;
     Session;
 apply_op(Session, stats) ->
 apply_op(Session, stats) ->
     Stats = emqx_session:stats(Session),
     Stats = emqx_session:stats(Session),
@@ -145,14 +145,7 @@ apply_op(Session, {pubcomp, PacketId}) ->
     end;
     end;
 apply_op(Session, {deliver, Delivers}) ->
 apply_op(Session, {deliver, Delivers}) ->
     {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session),
     {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session),
-    NSession;
-apply_op(Session, {timeout, {TRef, TimeoutMsg}}) ->
-    case emqx_session:timeout(TRef, TimeoutMsg, Session) of
-        {ok, NSession} ->
-            NSession;
-        {ok, _Msg, NSession} ->
-            NSession
-    end.
+    NSession.
 
 
 %%%%%%%%%%%%%%%%%%
 %%%%%%%%%%%%%%%%%%
 %%% Generators %%%
 %%% Generators %%%
@@ -169,17 +162,13 @@ session_op_list() ->
              {pubrec, pubrec_args()},
              {pubrec, pubrec_args()},
              {pubrel, pubrel_args()},
              {pubrel, pubrel_args()},
              {pubcomp, pubcomp_args()},
              {pubcomp, pubcomp_args()},
-             {deliver, deliver_args()},
-             {timeout, timeout_args()}
+             {deliver, deliver_args()}
             ],
             ],
     list(?LAZY(oneof(Union))).
     list(?LAZY(oneof(Union))).
 
 
 deliver_args() ->
 deliver_args() ->
     list({deliver, topic(), message()}).
     list({deliver, topic(), message()}).
 
 
-timeout_args() ->
-    {tref(), timeout_msg()}.
-
 info_args() ->
 info_args() ->
     oneof([subscriptions,
     oneof([subscriptions,
            max_subscriptions,
            max_subscriptions,
@@ -225,11 +214,6 @@ pubrel_args() ->
 pubcomp_args() ->
 pubcomp_args() ->
     packetid().
     packetid().
 
 
-timeout_msg() ->
-    oneof([retry_delivery, check_awaiting_rel]).
-
-tref() -> oneof([tref, undefined]).
-
 sub_opts() ->
 sub_opts() ->
     ?LET({RH, RAP, NL, QOS, SHARE, SUBID},
     ?LET({RH, RAP, NL, QOS, SHARE, SUBID},
          {rh(), rap(), nl(), qos(), share(), subid()}
          {rh(), rap(), nl(), qos(), share(), subid()}