|
|
@@ -165,9 +165,8 @@ handle_cast(Msg, Channel) ->
|
|
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
|
|
{noreply, Channel}.
|
|
|
|
|
|
-terminate(Reason, #{conninfo := ConnInfo, clientinfo := ClientInfo, session := Session} = Channel) ->
|
|
|
+terminate(Reason, #{clientinfo := ClientInfo, session := Session} = Channel) ->
|
|
|
ok = cancel_expiry_timer(Channel),
|
|
|
- (Reason =:= expired) andalso emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
|
|
|
emqx_session:terminate(ClientInfo, Reason, Session).
|
|
|
|
|
|
code_change(_OldVsn, Channel, _Extra) ->
|
|
|
@@ -205,10 +204,7 @@ handle_deliver(
|
|
|
Delivers1 = emqx_channel:maybe_nack(Delivers),
|
|
|
Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
|
|
|
NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
|
|
|
- NChannel = persist(NSession, Channel),
|
|
|
- %% We consider queued/dropped messages as delivered since they are now in the session state.
|
|
|
- emqx_channel:maybe_mark_as_delivered(Session, Delivers),
|
|
|
- NChannel.
|
|
|
+ Channel#{session := NSession}.
|
|
|
|
|
|
cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) ->
|
|
|
_ = erlang:cancel_timer(TRef),
|
|
|
@@ -334,10 +330,6 @@ channel(ConnInfo, ClientInfo) ->
|
|
|
pendings => []
|
|
|
}.
|
|
|
|
|
|
-persist(Session, #{clientinfo := ClientInfo, conninfo := ConnInfo} = Channel) ->
|
|
|
- Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
|
|
|
- Channel#{session => Session1}.
|
|
|
-
|
|
|
info(Channel) ->
|
|
|
#{
|
|
|
conninfo => maps:get(conninfo, Channel, undefined),
|