|
|
@@ -638,6 +638,9 @@ try_open_session(PState = #pstate{zone = Zone,
|
|
|
case emqx_sm:open_session(SessAttrs1) of
|
|
|
{ok, SPid} ->
|
|
|
{ok, SPid, false};
|
|
|
+ {ok, SPid, true} ->
|
|
|
+ emqx_delayed_publish:cancel_publish(ClientId),
|
|
|
+ {ok, SPid, true};
|
|
|
Other -> Other
|
|
|
end.
|
|
|
|
|
|
@@ -838,11 +841,9 @@ shutdown(Reason, PState = #pstate{connected = true,
|
|
|
|
|
|
send_willmsg(undefined, _ClientId) ->
|
|
|
ignore;
|
|
|
-send_willmsg(WillMsg = #message{topic = Topic,
|
|
|
- headers = #{'Will-Delay-Interval' := Interval} = Headers}, ClientId)
|
|
|
+send_willmsg(WillMsg = #message{headers = #{'Will-Delay-Interval' := Interval}}, ClientId)
|
|
|
when is_integer(Interval), Interval > 0 ->
|
|
|
- SendAfter = integer_to_binary(Interval),
|
|
|
- emqx_broker:publish(WillMsg#message{topic = emqx_topic:join([<<"$will">>, SendAfter, Topic]), headers = Headers#{client_id => ClientId}});
|
|
|
+ emqx_delayed_publish:delay_publish(WillMsg, ClientId);
|
|
|
send_willmsg(WillMsg, _ClientId) ->
|
|
|
emqx_broker:publish(WillMsg).
|
|
|
|