|
|
@@ -61,8 +61,7 @@
|
|
|
|
|
|
%% Export for emqx_channel implementations
|
|
|
-export([
|
|
|
- maybe_nack/1,
|
|
|
- maybe_mark_as_delivered/2
|
|
|
+ maybe_nack/1
|
|
|
]).
|
|
|
|
|
|
%% Exports for CT
|
|
|
@@ -199,11 +198,6 @@ info(timers, #channel{timers = Timers}) ->
|
|
|
set_conn_state(ConnState, Channel) ->
|
|
|
Channel#channel{conn_state = ConnState}.
|
|
|
|
|
|
-set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
|
|
- %% Assume that this is also an updated session. Allow side effect.
|
|
|
- Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
|
|
|
- Channel#channel{session = Session1}.
|
|
|
-
|
|
|
-spec stats(channel()) -> emqx_types:stats().
|
|
|
stats(#channel{session = undefined}) ->
|
|
|
emqx_pd:get_counters(?CHANNEL_METRICS);
|
|
|
@@ -417,10 +411,10 @@ handle_in(
|
|
|
case emqx_session:puback(ClientInfo, PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
- {ok, set_session(NSession, Channel)};
|
|
|
+ {ok, Channel#channel{session = NSession}};
|
|
|
{ok, Msg, Publishes, NSession} ->
|
|
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
- handle_out(publish, Publishes, set_session(NSession, Channel));
|
|
|
+ handle_out(publish, Publishes, Channel#channel{session = NSession});
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
|
|
|
ok = emqx_metrics:inc('packets.puback.inuse'),
|
|
|
@@ -438,7 +432,7 @@ handle_in(
|
|
|
case emqx_session:pubrec(ClientInfo, PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
- NChannel = set_session(NSession, Channel),
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
|
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
|
|
|
@@ -458,7 +452,7 @@ handle_in(
|
|
|
) ->
|
|
|
case emqx_session:pubrel(ClientInfo, PacketId, Session) of
|
|
|
{ok, NSession} ->
|
|
|
- NChannel = set_session(NSession, Channel),
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
|
|
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
|
|
?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
|
|
|
@@ -473,9 +467,9 @@ handle_in(
|
|
|
) ->
|
|
|
case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
|
|
|
{ok, NSession} ->
|
|
|
- {ok, set_session(NSession, Channel)};
|
|
|
+ {ok, Channel#channel{session = NSession}};
|
|
|
{ok, Publishes, NSession} ->
|
|
|
- handle_out(publish, Publishes, set_session(NSession, Channel));
|
|
|
+ handle_out(publish, Publishes, Channel#channel{session = NSession});
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
ok = emqx_metrics:inc('packets.pubcomp.inuse'),
|
|
|
{ok, Channel};
|
|
|
@@ -734,7 +728,7 @@ do_publish(
|
|
|
case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
|
|
|
{ok, PubRes, NSession} ->
|
|
|
RC = pubrec_reason_code(PubRes),
|
|
|
- NChannel0 = set_session(NSession, Channel),
|
|
|
+ NChannel0 = Channel#channel{session = NSession},
|
|
|
NChannel1 = ensure_timer(await_timer, NChannel0),
|
|
|
NChannel2 = ensure_quota(PubRes, NChannel1),
|
|
|
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
|
|
@@ -830,7 +824,7 @@ do_subscribe(
|
|
|
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
|
|
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
|
|
{ok, NSession} ->
|
|
|
- {QoS, set_session(NSession, Channel)};
|
|
|
+ {QoS, Channel#channel{session = NSession}};
|
|
|
{error, RC} ->
|
|
|
?SLOG(
|
|
|
warning,
|
|
|
@@ -869,7 +863,7 @@ do_unsubscribe(
|
|
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
|
|
{ok, NSession} ->
|
|
|
- {?RC_SUCCESS, set_session(NSession, Channel)};
|
|
|
+ {?RC_SUCCESS, Channel#channel{session = NSession}};
|
|
|
{error, RC} ->
|
|
|
{RC, Channel}
|
|
|
end.
|
|
|
@@ -898,7 +892,7 @@ process_disconnect(ReasonCode, Properties, Channel) ->
|
|
|
|
|
|
maybe_update_expiry_interval(
|
|
|
#{'Session-Expiry-Interval' := Interval},
|
|
|
- Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}
|
|
|
+ Channel = #channel{conninfo = ConnInfo}
|
|
|
) ->
|
|
|
EI = timer:seconds(Interval),
|
|
|
OldEI = maps:get(expiry_interval, ConnInfo, 0),
|
|
|
@@ -907,12 +901,11 @@ maybe_update_expiry_interval(
|
|
|
Channel;
|
|
|
false ->
|
|
|
NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}},
|
|
|
- ClientID = maps:get(clientid, ClientInfo, undefined),
|
|
|
%% Check if the client turns off persistence (turning it on is disallowed)
|
|
|
case EI =:= 0 andalso OldEI > 0 of
|
|
|
true ->
|
|
|
- S = emqx_persistent_session:discard(ClientID, NChannel#channel.session),
|
|
|
- set_session(S, NChannel);
|
|
|
+ NSession = emqx_session:unpersist(NChannel#channel.session),
|
|
|
+ NChannel#channel{session = NSession};
|
|
|
false ->
|
|
|
NChannel
|
|
|
end
|
|
|
@@ -956,9 +949,7 @@ handle_deliver(
|
|
|
Delivers1 = maybe_nack(Delivers),
|
|
|
Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
|
|
|
NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
|
|
|
- NChannel = set_session(NSession, Channel),
|
|
|
- %% We consider queued/dropped messages as delivered since they are now in the session state.
|
|
|
- maybe_mark_as_delivered(Session, Delivers),
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
{ok, NChannel};
|
|
|
handle_deliver(
|
|
|
Delivers,
|
|
|
@@ -976,11 +967,10 @@ handle_deliver(
|
|
|
)
|
|
|
of
|
|
|
{ok, Publishes, NSession} ->
|
|
|
- NChannel = set_session(NSession, Channel),
|
|
|
- maybe_mark_as_delivered(NSession, Delivers),
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
|
|
{ok, NSession} ->
|
|
|
- {ok, set_session(NSession, Channel)}
|
|
|
+ {ok, Channel#channel{session = NSession}}
|
|
|
end.
|
|
|
|
|
|
%% Nack delivers from shared subscription
|
|
|
@@ -996,15 +986,6 @@ not_nacked({deliver, _Topic, Msg}) ->
|
|
|
true
|
|
|
end.
|
|
|
|
|
|
-maybe_mark_as_delivered(Session, Delivers) ->
|
|
|
- case emqx_session:info(is_persistent, Session) of
|
|
|
- false ->
|
|
|
- skip;
|
|
|
- true ->
|
|
|
- SessionID = emqx_session:info(id, Session),
|
|
|
- emqx_persistent_session:mark_as_delivered(SessionID, Delivers)
|
|
|
- end.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Handle outgoing packet
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -1096,11 +1077,11 @@ return_connack(AckPacket, Channel) ->
|
|
|
ignore ->
|
|
|
{ok, Replies, Channel};
|
|
|
{ok, Publishes, NSession} ->
|
|
|
- NChannel0 = Channel#channel{
|
|
|
+ NChannel1 = Channel#channel{
|
|
|
resuming = false,
|
|
|
- pendings = []
|
|
|
+ pendings = [],
|
|
|
+ session = NSession
|
|
|
},
|
|
|
- NChannel1 = set_session(NSession, NChannel0),
|
|
|
{Packets, NChannel2} = do_deliver(Publishes, NChannel1),
|
|
|
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
|
|
{ok, Replies ++ Outgoing, NChannel2}
|
|
|
@@ -1345,9 +1326,10 @@ handle_timeout(
|
|
|
) ->
|
|
|
case emqx_session:retry(ClientInfo, Session) of
|
|
|
{ok, NSession} ->
|
|
|
- {ok, clean_timer(retry_timer, set_session(NSession, Channel))};
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
+ {ok, clean_timer(retry_timer, NChannel)};
|
|
|
{ok, Publishes, Timeout, NSession} ->
|
|
|
- NChannel = set_session(NSession, Channel),
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
|
|
end;
|
|
|
handle_timeout(
|
|
|
@@ -1363,9 +1345,11 @@ handle_timeout(
|
|
|
) ->
|
|
|
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
|
|
|
{ok, NSession} ->
|
|
|
- {ok, clean_timer(await_timer, set_session(NSession, Channel))};
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
+ {ok, clean_timer(await_timer, NChannel)};
|
|
|
{ok, Timeout, NSession} ->
|
|
|
- {ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))}
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
+ {ok, reset_timer(await_timer, Timeout, NChannel)}
|
|
|
end;
|
|
|
handle_timeout(_TRef, expire_session, Channel) ->
|
|
|
shutdown(expired, Channel);
|
|
|
@@ -1453,25 +1437,11 @@ terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg
|
|
|
%% if will_msg still exists when the session is terminated, it
|
|
|
%% must be published immediately.
|
|
|
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
|
|
|
- (Reason =:= expired) andalso persist_if_session(Channel),
|
|
|
run_terminate_hook(Reason, Channel).
|
|
|
|
|
|
-persist_if_session(#channel{session = Session} = Channel) ->
|
|
|
- case emqx_session:is_session(Session) of
|
|
|
- true ->
|
|
|
- _ = emqx_persistent_session:persist(
|
|
|
- Channel#channel.clientinfo,
|
|
|
- Channel#channel.conninfo,
|
|
|
- Channel#channel.session
|
|
|
- ),
|
|
|
- ok;
|
|
|
- false ->
|
|
|
- ok
|
|
|
- end.
|
|
|
-
|
|
|
-run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
|
|
|
+run_terminate_hook(_Reason, #channel{session = undefined}) ->
|
|
|
ok;
|
|
|
-run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session} = _Channel) ->
|
|
|
+run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
|
|
|
emqx_session:terminate(ClientInfo, Reason, Session).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -2096,11 +2066,9 @@ maybe_resume_session(#channel{
|
|
|
session = Session,
|
|
|
resuming = true,
|
|
|
pendings = Pendings,
|
|
|
- clientinfo = #{clientid := ClientId} = ClientInfo
|
|
|
+ clientinfo = ClientInfo
|
|
|
}) ->
|
|
|
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
|
|
|
- %% We consider queued/dropped messages as delivered since they are now in the session state.
|
|
|
- emqx_persistent_session:mark_as_delivered(ClientId, Pendings),
|
|
|
case emqx_session:deliver(ClientInfo, Pendings, Session1) of
|
|
|
{ok, Session2} ->
|
|
|
{ok, Publishes, Session2};
|