Browse Source

Merge pull request #1889 from emqx/improve_connect

Improve will message
turtleDeng 7 years ago
parent
commit
41b79e4f99
7 changed files with 249 additions and 82 deletions
  1. 24 24
      src/emqx_protocol.erl
  2. 62 33
      src/emqx_session.erl
  3. 10 15
      src/emqx_sm.erl
  4. 16 0
      test/emqx_SUITE.erl
  5. 9 8
      test/emqx_mock_client.erl
  6. 119 0
      test/emqx_protocol_SUITE.erl
  7. 9 2
      test/emqx_sm_SUITE.erl

+ 24 - 24
src/emqx_protocol.erl

@@ -144,7 +144,6 @@ attrs(#pstate{zone         = Zone,
               proto_ver    = ProtoVer,
               proto_ver    = ProtoVer,
               proto_name   = ProtoName,
               proto_name   = ProtoName,
               keepalive    = Keepalive,
               keepalive    = Keepalive,
-              will_topic   = WillTopic,
               mountpoint   = Mountpoint,
               mountpoint   = Mountpoint,
               is_super     = IsSuper,
               is_super     = IsSuper,
               is_bridge    = IsBridge,
               is_bridge    = IsBridge,
@@ -158,7 +157,6 @@ attrs(#pstate{zone         = Zone,
      {proto_name, ProtoName},
      {proto_name, ProtoName},
      {clean_start, CleanStart},
      {clean_start, CleanStart},
      {keepalive, Keepalive},
      {keepalive, Keepalive},
-     {will_topic, WillTopic},
      {mountpoint, Mountpoint},
      {mountpoint, Mountpoint},
      {is_super, IsSuper},
      {is_super, IsSuper},
      {is_bridge, IsBridge},
      {is_bridge, IsBridge},
@@ -286,14 +284,13 @@ process_packet(?CONNECT_PACKET(
                                        clean_start = CleanStart,
                                        clean_start = CleanStart,
                                        keepalive   = Keepalive,
                                        keepalive   = Keepalive,
                                        properties  = ConnProps,
                                        properties  = ConnProps,
-                                       will_topic  = WillTopic,
                                        client_id   = ClientId,
                                        client_id   = ClientId,
                                        username    = Username,
                                        username    = Username,
                                        password    = Password} = Connect), PState) ->
                                        password    = Password} = Connect), PState) ->
 
 
     %% TODO: Mountpoint...
     %% TODO: Mountpoint...
     %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
     %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
-    WillMsg = emqx_packet:will_msg(Connect),
+    WillMsg = make_will_msg(Connect),
 
 
     PState1 = set_username(Username,
     PState1 = set_username(Username,
                            PState#pstate{client_id    = ClientId,
                            PState#pstate{client_id    = ClientId,
@@ -302,7 +299,6 @@ process_packet(?CONNECT_PACKET(
                                          clean_start  = CleanStart,
                                          clean_start  = CleanStart,
                                          keepalive    = Keepalive,
                                          keepalive    = Keepalive,
                                          conn_props   = ConnProps,
                                          conn_props   = ConnProps,
-                                         will_topic   = WillTopic,
                                          will_msg     = WillMsg,
                                          will_msg     = WillMsg,
                                          is_bridge    = IsBridge,
                                          is_bridge    = IsBridge,
                                          connected_at = os:timestamp()}),
                                          connected_at = os:timestamp()}),
@@ -527,7 +523,6 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
               'Wildcard-Subscription-Available' => flag(Wildcard),
               'Wildcard-Subscription-Available' => flag(Wildcard),
               'Subscription-Identifier-Available' => 1,
               'Subscription-Identifier-Available' => 1,
               'Response-Information' => ResponseInformation,
               'Response-Information' => ResponseInformation,
-
               'Shared-Subscription-Available' => flag(Shared)},
               'Shared-Subscription-Available' => flag(Shared)},
 
 
     Props1 = if
     Props1 = if
@@ -616,14 +611,16 @@ try_open_session(PState = #pstate{zone        = Zone,
                                   client_id   = ClientId,
                                   client_id   = ClientId,
                                   conn_pid    = ConnPid,
                                   conn_pid    = ConnPid,
                                   username    = Username,
                                   username    = Username,
-                                  clean_start = CleanStart}) ->
+                                  clean_start = CleanStart,
+                                  will_msg    = WillMsg}) ->
 
 
     SessAttrs = #{
     SessAttrs = #{
         zone        => Zone,
         zone        => Zone,
         client_id   => ClientId,
         client_id   => ClientId,
         conn_pid    => ConnPid,
         conn_pid    => ConnPid,
         username    => Username,
         username    => Username,
-        clean_start => CleanStart
+        clean_start => CleanStart,
+        will_msg    => WillMsg
     },
     },
 
 
     SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
     SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
@@ -636,14 +633,14 @@ try_open_session(PState = #pstate{zone        = Zone,
 set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
 set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
     maps:put(max_inflight, if
     maps:put(max_inflight, if
                                ProtoVer =:= ?MQTT_PROTO_V5 ->
                                ProtoVer =:= ?MQTT_PROTO_V5 ->
-                                   maps:get('Receive-Maximum', ConnProps, 65535);
+                                   get_property('Receive-Maximum', ConnProps, 65535);
                                true ->
                                true ->
                                    emqx_zone:get_env(Zone, max_inflight, 65535)
                                    emqx_zone:get_env(Zone, max_inflight, 65535)
                            end, SessAttrs);
                            end, SessAttrs);
 set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
 set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
     maps:put(expiry_interval, if
     maps:put(expiry_interval, if
                                ProtoVer =:= ?MQTT_PROTO_V5 ->
                                ProtoVer =:= ?MQTT_PROTO_V5 ->
-                                   maps:get('Session-Expiry-Interval', ConnProps, 0);
+                                   get_property('Session-Expiry-Interval', ConnProps, 0);
                                true ->
                                true ->
                                    case CleanStart of
                                    case CleanStart of
                                        true -> 0;
                                        true -> 0;
@@ -654,7 +651,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c
 set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
 set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
     maps:put(topic_alias_maximum, if
     maps:put(topic_alias_maximum, if
                                     ProtoVer =:= ?MQTT_PROTO_V5 ->
                                     ProtoVer =:= ?MQTT_PROTO_V5 ->
-                                        maps:get('Topic-Alias-Maximum', ConnProps, 0);
+                                        get_property('Topic-Alias-Maximum', ConnProps, 0);
                                     true ->
                                     true ->
                                         emqx_zone:get_env(Zone, max_topic_alias, 0)
                                         emqx_zone:get_env(Zone, max_topic_alias, 0)
                                   end, SessAttrs);
                                   end, SessAttrs);
@@ -678,6 +675,21 @@ set_property(Name, Value, ?NO_PROPS) ->
 set_property(Name, Value, Props) ->
 set_property(Name, Value, Props) ->
     Props#{Name => Value}.
     Props#{Name => Value}.
 
 
+get_property(_Name, undefined, Default) ->
+    Default;
+get_property(Name, Props, Default) ->
+    maps:get(Name, Props, Default).
+
+make_will_msg(#mqtt_packet_connect{proto_ver   = ProtoVer,
+                                   will_props  = WillProps} = Connect) -> 
+    emqx_packet:will_msg(if 
+                             ProtoVer =:= ?MQTT_PROTO_V5 ->
+                                 WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
+                                 Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
+                             true -> 
+                                 Connect
+                         end).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Check Packet
 %% Check Packet
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -816,23 +828,11 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
                                                      Reason =:= discard ->
                                                      Reason =:= discard ->
     emqx_cm:unregister_connection(ClientId);
     emqx_cm:unregister_connection(ClientId);
 shutdown(Reason, PState = #pstate{connected = true,
 shutdown(Reason, PState = #pstate{connected = true,
-                                  client_id = ClientId,
-                                  will_msg  = WillMsg}) ->
+                                  client_id = ClientId}) ->
     ?LOG(info, "Shutdown for ~p", [Reason], PState),
     ?LOG(info, "Shutdown for ~p", [Reason], PState),
-    _ = send_willmsg(WillMsg),
     emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
     emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
     emqx_cm:unregister_connection(ClientId).
     emqx_cm:unregister_connection(ClientId).
 
 
-send_willmsg(undefined) ->
-    ignore;
-send_willmsg(WillMsg = #message{topic = Topic,
-                                headers = #{'Will-Delay-Interval' := Interval}})
-            when is_integer(Interval), Interval > 0 ->
-    SendAfter = integer_to_binary(Interval),
-    emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
-send_willmsg(WillMsg) ->
-    emqx_broker:publish(WillMsg).
-
 start_keepalive(0, _PState) ->
 start_keepalive(0, _PState) ->
     ignore;
     ignore;
 start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
 start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->

+ 62 - 33
src/emqx_session.erl

@@ -47,7 +47,7 @@
 -export([info/1, attrs/1]).
 -export([info/1, attrs/1]).
 -export([stats/1]).
 -export([stats/1]).
 -export([resume/2, discard/2]).
 -export([resume/2, discard/2]).
--export([update_expiry_interval/2, update_misc/2]).
+-export([update_expiry_interval/2]).
 -export([subscribe/2, subscribe/4]).
 -export([subscribe/2, subscribe/4]).
 -export([publish/3]).
 -export([publish/3]).
 -export([puback/2, puback/3]).
 -export([puback/2, puback/3]).
@@ -147,7 +147,11 @@
           %% Created at
           %% Created at
           created_at :: erlang:timestamp(),
           created_at :: erlang:timestamp(),
 
 
-          topic_alias_maximum :: pos_integer()
+          topic_alias_maximum :: pos_integer(),
+
+          will_msg :: emqx:message(),
+
+          will_delay_timer :: reference() | undefined
          }).
          }).
 
 
 -type(spid() :: pid()).
 -type(spid() :: pid()).
@@ -307,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
     UnsubReq = {PacketId, Properties, TopicFilters},
     UnsubReq = {PacketId, Properties, TopicFilters},
     gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
     gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
 
 
--spec(resume(spid(), pid()) -> ok).
-resume(SPid, ConnPid) ->
-    gen_server:cast(SPid, {resume, ConnPid}).
+-spec(resume(spid(), map()) -> ok).
+resume(SPid, SessAttrs) ->
+    gen_server:cast(SPid, {resume, SessAttrs}).
 
 
 %% @doc Discard the session
 %% @doc Discard the session
 -spec(discard(spid(), ByPid :: pid()) -> ok).
 -spec(discard(spid(), ByPid :: pid()) -> ok).
@@ -320,9 +324,6 @@ discard(SPid, ByPid) ->
 update_expiry_interval(SPid, Interval) ->
 update_expiry_interval(SPid, Interval) ->
     gen_server:cast(SPid, {expiry_interval, Interval}).
     gen_server:cast(SPid, {expiry_interval, Interval}).
 
 
-update_misc(SPid, Misc) ->
-    gen_server:cast(SPid, {update_misc, Misc}).
-
 -spec(close(spid()) -> ok).
 -spec(close(spid()) -> ok).
 close(SPid) ->
 close(SPid) ->
     gen_server:call(SPid, close, infinity).
     gen_server:call(SPid, close, infinity).
@@ -338,7 +339,8 @@ init([Parent, #{zone                := Zone,
                 clean_start         := CleanStart,
                 clean_start         := CleanStart,
                 expiry_interval     := ExpiryInterval,
                 expiry_interval     := ExpiryInterval,
                 max_inflight        := MaxInflight,
                 max_inflight        := MaxInflight,
-                topic_alias_maximum := TopicAliasMaximum}]) ->
+                topic_alias_maximum := TopicAliasMaximum,
+                will_msg            := WillMsg}]) ->
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
     true = link(ConnPid),
     true = link(ConnPid),
     IdleTimout = get_env(Zone, idle_timeout, 30000),
     IdleTimout = get_env(Zone, idle_timeout, 30000),
@@ -362,7 +364,8 @@ init([Parent, #{zone                := Zone,
                    deliver_stats       = 0,
                    deliver_stats       = 0,
                    enqueue_stats       = 0,
                    enqueue_stats       = 0,
                    created_at          = os:timestamp(),
                    created_at          = os:timestamp(),
-                   topic_alias_maximum = TopicAliasMaximum
+                   topic_alias_maximum = TopicAliasMaximum,
+                   will_msg            = WillMsg
                   },
                   },
     emqx_sm:register_session(ClientId, attrs(State)),
     emqx_sm:register_session(ClientId, attrs(State)),
     emqx_sm:set_session_stats(ClientId, stats(State)),
     emqx_sm:set_session_stats(ClientId, stats(State)),
@@ -511,17 +514,22 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
     end;
     end;
 
 
 %% RESUME:
 %% RESUME:
-handle_cast({resume, ConnPid}, State = #state{client_id       = ClientId,
-                                              conn_pid        = OldConnPid,
-                                              clean_start     = CleanStart,
-                                              retry_timer     = RetryTimer,
-                                              await_rel_timer = AwaitTimer,
-                                              expiry_timer    = ExpireTimer}) ->
+handle_cast({resume, #{conn_pid            := ConnPid,
+                       will_msg            := WillMsg,
+                       expiry_interval     := SessionExpiryInterval,
+                       max_inflight        := MaxInflight, 
+                       topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id        = ClientId,
+                                                                                  conn_pid         = OldConnPid,
+                                                                                  clean_start      = CleanStart,
+                                                                                  retry_timer      = RetryTimer,
+                                                                                  await_rel_timer  = AwaitTimer,
+                                                                                  expiry_timer     = ExpireTimer,
+                                                                                  will_delay_timer = WillDelayTimer}) ->
 
 
     ?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
     ?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
 
 
     %% Cancel Timers
     %% Cancel Timers
-    lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
+    lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
 
 
     case kick(ClientId, OldConnPid, ConnPid) of
     case kick(ClientId, OldConnPid, ConnPid) of
         ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
         ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
@@ -530,14 +538,19 @@ handle_cast({resume, ConnPid}, State = #state{client_id       = ClientId,
 
 
     true = link(ConnPid),
     true = link(ConnPid),
 
 
-    State1 = State#state{conn_pid        = ConnPid,
-                         binding         = binding(ConnPid),
-                         old_conn_pid    = OldConnPid,
-                         clean_start     = false,
-                         retry_timer     = undefined,
-                         awaiting_rel    = #{},
-                         await_rel_timer = undefined,
-                         expiry_timer    = undefined},
+    State1 = State#state{conn_pid            = ConnPid,
+                         binding             = binding(ConnPid),
+                         old_conn_pid        = OldConnPid,
+                         clean_start         = false,
+                         retry_timer         = undefined,
+                         awaiting_rel        = #{},
+                         await_rel_timer     = undefined,
+                         expiry_timer        = undefined,
+                         expiry_interval     = SessionExpiryInterval,
+                         inflight            = emqx_inflight:update_size(MaxInflight, State#state.inflight), 
+                         topic_alias_maximum = TopicAliasMaximum,
+                         will_delay_timer    = undefined,
+                         will_msg            = WillMsg},
 
 
     %% Clean Session: true -> false???
     %% Clean Session: true -> false???
     CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
     CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
@@ -550,10 +563,6 @@ handle_cast({resume, ConnPid}, State = #state{client_id       = ClientId,
 handle_cast({expiry_interval, Interval}, State) ->
 handle_cast({expiry_interval, Interval}, State) ->
     {noreply, State#state{expiry_interval = Interval}};
     {noreply, State#state{expiry_interval = Interval}};
 
 
-handle_cast({update_misc, #{max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State) ->
-    {noreply, State#state{inflight            = emqx_inflight:update_size(MaxInflight, State#state.inflight),
-                          topic_alias_maximum = TopicAliasMaximum}};
-
 handle_cast(Msg, State) ->
 handle_cast(Msg, State) ->
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
     {noreply, State}.
     {noreply, State}.
@@ -612,11 +621,17 @@ handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
     ?LOG(info, "expired, shutdown now:(", [], State),
     ?LOG(info, "expired, shutdown now:(", [], State),
     shutdown(expired, State);
     shutdown(expired, State);
 
 
-handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
-    {stop, Reason, State#state{conn_pid = undefined}};
+handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
+    send_willmsg(WillMsg),
+    {noreply, State#state{will_msg = undefined}};
+
+handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
+    send_willmsg(WillMsg),
+    {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
 
 
 handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
 handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
-    {noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
+    State1 = ensure_will_delay_timer(State),
+    {noreply, ensure_expire_timer(State1#state{conn_pid = undefined})};
 
 
 handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
 handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
     %% ignore
     %% ignore
@@ -631,8 +646,9 @@ handle_info(Info, State) ->
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     emqx_logger:error("[Session] unexpected info: ~p", [Info]),
     {noreply, State}.
     {noreply, State}.
 
 
-terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) ->
+terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) ->
     emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
     emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
+    send_willmsg(WillMsg),
     %% Ensure to shutdown the connection
     %% Ensure to shutdown the connection
     if
     if
         ConnPid =/= undefined ->
         ConnPid =/= undefined ->
@@ -714,6 +730,14 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
             ensure_retry_timer(Interval - max(0, Age), State)
             ensure_retry_timer(Interval - max(0, Age), State)
     end.
     end.
 
 
+%%------------------------------------------------------------------------------
+%% Send Will Message
+%%------------------------------------------------------------------------------
+send_willmsg(undefined) ->
+    ignore;
+send_willmsg(WillMsg) ->
+    emqx_broker:publish(WillMsg).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Expire Awaiting Rel
 %% Expire Awaiting Rel
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -899,6 +923,11 @@ ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval >
 ensure_expire_timer(State) ->
 ensure_expire_timer(State) ->
     State.
     State.
 
 
+ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) ->
+    State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)};
+ensure_will_delay_timer(State) ->
+    State.
+
 ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
 ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
                                   idle_timeout = IdleTimeout}) ->
                                   idle_timeout = IdleTimeout}) ->
     State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
     State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};

+ 10 - 15
src/emqx_sm.erl

@@ -22,7 +22,7 @@
 
 
 -export([open_session/1, close_session/1]).
 -export([open_session/1, close_session/1]).
 -export([lookup_session/1, lookup_session_pid/1]).
 -export([lookup_session/1, lookup_session_pid/1]).
--export([resume_session/1, resume_session/2]).
+-export([resume_session/2]).
 -export([discard_session/1, discard_session/2]).
 -export([discard_session/1, discard_session/2]).
 -export([register_session/2, unregister_session/1]).
 -export([register_session/2, unregister_session/1]).
 -export([get_session_attrs/1, set_session_attrs/2]).
 -export([get_session_attrs/1, set_session_attrs/2]).
@@ -59,15 +59,13 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
                  end,
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
     emqx_sm_locker:trans(ClientId, CleanStart);
 
 
-open_session(SessAttrs = #{clean_start := false, 
-                           client_id := ClientId, 
-                           conn_pid := ConnPid, 
-                           max_inflight := MaxInflight,
-                           topic_alias_maximum := TopicAliasMaximum}) ->
+open_session(SessAttrs = #{clean_start          := false, 
+                           client_id            := ClientId,
+                           max_inflight         := MaxInflight,
+                           topic_alias_maximum  := TopicAliasMaximum}) ->
     ResumeStart = fun(_) ->
     ResumeStart = fun(_) ->
-                      case resume_session(ClientId, ConnPid) of
+                      case resume_session(ClientId, SessAttrs) of
                           {ok, SPid} ->
                           {ok, SPid} ->
-                              emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}),
                               {ok, SPid, true};
                               {ok, SPid, true};
                           {error, not_found} ->
                           {error, not_found} ->
                               emqx_session_sup:start_session(SessAttrs)
                               emqx_session_sup:start_session(SessAttrs)
@@ -90,15 +88,12 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
                   end, lookup_session(ClientId)).
                   end, lookup_session(ClientId)).
 
 
 %% @doc Try to resume a session.
 %% @doc Try to resume a session.
--spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}).
-resume_session(ClientId) ->
-    resume_session(ClientId, self()).
-
-resume_session(ClientId, ConnPid) ->
+-spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
+resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
     case lookup_session(ClientId) of
     case lookup_session(ClientId) of
         [] -> {error, not_found};
         [] -> {error, not_found};
         [{_ClientId, SPid}] ->
         [{_ClientId, SPid}] ->
-            ok = emqx_session:resume(SPid, ConnPid),
+            ok = emqx_session:resume(SPid, SessAttrs),
             {ok, SPid};
             {ok, SPid};
         Sessions ->
         Sessions ->
             [{_, SPid}|StaleSessions] = lists:reverse(Sessions),
             [{_, SPid}|StaleSessions] = lists:reverse(Sessions),
@@ -106,7 +101,7 @@ resume_session(ClientId, ConnPid) ->
             lists:foreach(fun({_, StalePid}) ->
             lists:foreach(fun({_, StalePid}) ->
                               catch emqx_session:discard(StalePid, ConnPid)
                               catch emqx_session:discard(StalePid, ConnPid)
                           end, StaleSessions),
                           end, StaleSessions),
-            ok = emqx_session:resume(SPid, ConnPid),
+            ok = emqx_session:resume(SPid, SessAttrs),
             {ok, SPid}
             {ok, SPid}
     end.
     end.
 
 

+ 16 - 0
test/emqx_SUITE.erl

@@ -38,6 +38,13 @@
                                 clean_start = false,
                                 clean_start = false,
                                 password  = <<"public">>})).
                                 password  = <<"public">>})).
 
 
+-define(CLIENT3, ?CONNECT_PACKET(#mqtt_packet_connect{
+                                username  = <<"admin">>,
+                                proto_ver = ?MQTT_PROTO_V5,
+                                clean_start = false,
+                                password  = <<"public">>,
+                                will_props = #{'Will-Delay-Interval' => 2}})).
+
 -define(SUBCODE, [0]).
 -define(SUBCODE, [0]).
 
 
 -define(PACKETID, 1).
 -define(PACKETID, 1).
@@ -66,6 +73,7 @@ groups() ->
       [
       [
       mqtt_connect,
       mqtt_connect,
       mqtt_connect_with_tcp,
       mqtt_connect_with_tcp,
+      mqtt_connect_with_will_props,
       mqtt_connect_with_ssl_oneway,
       mqtt_connect_with_ssl_oneway,
       mqtt_connect_with_ssl_twoway,
       mqtt_connect_with_ssl_twoway,
       mqtt_connect_with_ws
       mqtt_connect_with_ws
@@ -109,6 +117,14 @@ mqtt_connect_with_tcp(_) ->
     {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
     {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
     emqx_client_sock:close(Sock).
     emqx_client_sock:close(Sock).
 
 
+mqtt_connect_with_will_props(_) ->
+    %% Issue #599
+    %% Empty clientId and clean_session = false
+    {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
+    Packet = raw_send_serialize(?CLIENT3),
+    emqx_client_sock:send(Sock, Packet),
+    emqx_client_sock:close(Sock).
+
 mqtt_connect_with_ssl_oneway(_) ->
 mqtt_connect_with_ssl_oneway(_) ->
     emqx:shutdown(),
     emqx:shutdown(),
     emqx_ct_broker_helpers:change_opts(ssl_oneway),
     emqx_ct_broker_helpers:change_opts(ssl_oneway),

+ 9 - 8
test/emqx_mock_client.erl

@@ -46,14 +46,15 @@ init([ClientId]) ->
     }.
     }.
 
 
 handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
 handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
-    Attrs = #{ zone             => Zone,
-               client_id        => ClientId,
-               conn_pid         => ClientPid,
-               clean_start      => true,
-               username         => undefined,
-               expiry_interval  => 0,
-               max_inflight     => 0,
-               topic_alias_maximum => 0
+    Attrs = #{ zone                 => Zone,
+               client_id            => ClientId,
+               conn_pid             => ClientPid,
+               clean_start          => true,
+               username             => undefined,
+               expiry_interval      => 0,
+               max_inflight         => 0,
+               topic_alias_maximum  => 0,
+               will_msg             => undefined
              },
              },
     {ok, SessPid} = emqx_sm:open_session(Attrs),
     {ok, SessPid} = emqx_sm:open_session(Attrs),
     {reply, {ok, SessPid},
     {reply, {ok, SessPid},

+ 119 - 0
test/emqx_protocol_SUITE.erl

@@ -131,6 +131,125 @@ connect_v5(_) ->
                                                  #{'Response-Information' := _RespInfo}), _} =
                                                  #{'Response-Information' := _RespInfo}), _} =
                                 raw_recv_parse(Data, ?MQTT_PROTO_V5)
                                 raw_recv_parse(Data, ?MQTT_PROTO_V5)
                     end),
                     end),
+    
+    % test clean start 
+    with_connection(fun(Sock) ->
+                            emqx_client_sock:send(Sock,
+                                                    raw_send_serialize(
+                                                        ?CONNECT_PACKET(
+                                                            #mqtt_packet_connect{
+                                                                proto_ver   = ?MQTT_PROTO_V5,
+                                                                clean_start = true,
+                                                                client_id   = <<"myclient">>,
+                                                                properties  =
+                                                                    #{'Session-Expiry-Interval' => 10}})
+                                                  )),
+                            {ok, Data} = gen_tcp:recv(Sock, 0),
+                            {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
+                            emqx_client_sock:send(Sock, raw_send_serialize(
+                                                            ?DISCONNECT_PACKET(?RC_SUCCESS)
+                                                  ))
+                    end),
+
+    timer:sleep(1000),
+
+    with_connection(fun(Sock) ->
+                            emqx_client_sock:send(Sock,
+                                                    raw_send_serialize(
+                                                        ?CONNECT_PACKET(
+                                                            #mqtt_packet_connect{
+                                                                proto_ver   = ?MQTT_PROTO_V5,
+                                                                clean_start = false,
+                                                                client_id   = <<"myclient">>,
+                                                                properties  =
+                                                                    #{'Session-Expiry-Interval' => 10}})
+                                                  )),
+                            {ok, Data} = gen_tcp:recv(Sock, 0),
+                            {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
+                    end),
+
+    % test will message publish and cancel
+    with_connection(fun(Sock) ->
+                            emqx_client_sock:send(Sock,
+                                                    raw_send_serialize(
+                                                        ?CONNECT_PACKET(
+                                                            #mqtt_packet_connect{
+                                                                proto_ver    = ?MQTT_PROTO_V5,
+                                                                clean_start  = true,
+                                                                client_id    = <<"myclient">>,
+                                                                will_flag    = true,
+                                                                will_qos     = ?QOS_1,
+                                                                will_retain  = false,
+                                                                will_props   = #{'Will-Delay-Interval' => 5},
+                                                                will_topic   = <<"TopicA">>,
+                                                                will_payload = <<"will message">>,
+                                                                properties   = #{'Session-Expiry-Interval' => 3}
+                                                            }
+                                                        )
+                                                    )
+                            ),
+                            {ok, Data} = gen_tcp:recv(Sock, 0),
+                            {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
+
+                            {ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
+                                                                   [binary, {packet, raw},
+                                                                    {active, false}], 3000),
+
+                            do_connect(Sock2, ?MQTT_PROTO_V5),
+
+                            emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh  => 1,
+                                                                                                                   qos => ?QOS_2,
+                                                                                                                   rap => 0,
+                                                                                                                   nl  => 0,
+                                                                                                                   rc  => 0}}]),
+                                                                            #{version => ?MQTT_PROTO_V5})),
+
+                            {ok, SubData} = gen_tcp:recv(Sock2, 0),
+                            {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
+
+                            emqx_client_sock:send(Sock, raw_send_serialize(
+                                                            ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
+                                                        )
+                            ),
+
+                            {error, timeout} = gen_tcp:recv(Sock2, 0, 1000),
+
+                            % session resumed
+                            {ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
+                                                                   [binary, {packet, raw},
+                                                                    {active, false}], 3000),
+
+                            emqx_client_sock:send(Sock3,
+                                                    raw_send_serialize(
+                                                        ?CONNECT_PACKET(
+                                                            #mqtt_packet_connect{
+                                                                proto_ver    = ?MQTT_PROTO_V5,
+                                                                clean_start  = false,
+                                                                client_id    = <<"myclient">>,
+                                                                will_flag    = true,
+                                                                will_qos     = ?QOS_1,
+                                                                will_retain  = false,
+                                                                will_props   = #{'Will-Delay-Interval' => 5},
+                                                                will_topic   = <<"TopicA">>,
+                                                                will_payload = <<"will message 2">>,
+                                                                properties   = #{'Session-Expiry-Interval' => 3}
+                                                            }
+                                                        )
+                                                    )
+                            ),
+                            {ok, Data3} = gen_tcp:recv(Sock3, 0),
+                            {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
+
+                            emqx_client_sock:send(Sock3, raw_send_serialize(
+                                                            ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
+                                                        )
+                            ),
+                            
+                            {ok, WillData} = gen_tcp:recv(Sock2, 0),
+                            {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
+
+                            emqx_client_sock:close(Sock2)
+                    end),
     ok.
     ok.
 
 
 do_connect(Sock, ProtoVer) ->
 do_connect(Sock, ProtoVer) ->

+ 9 - 2
test/emqx_sm_SUITE.erl

@@ -24,8 +24,15 @@ all() -> [t_open_close_session].
 t_open_close_session(_) ->
 t_open_close_session(_) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     emqx_ct_broker_helpers:run_setup_steps(),
     {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
     {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
-    Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
-              zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0, topic_alias_maximum => 0},
+    Attrs = #{clean_start         => true, 
+              client_id           => <<"client">>, 
+              conn_pid            => ClientPid,
+              zone                => internal, 
+              username            => <<"zhou">>, 
+              expiry_interval     => 0, 
+              max_inflight        => 0, 
+              topic_alias_maximum => 0,
+              will_msg            => undefined},
     {ok, SPid} = emqx_sm:open_session(Attrs),
     {ok, SPid} = emqx_sm:open_session(Attrs),
     [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
     [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
     SPid = emqx_sm:lookup_session_pid(<<"client">>),
     SPid = emqx_sm:lookup_session_pid(<<"client">>),