|
|
@@ -14,6 +14,37 @@
|
|
|
%% limitations under the License.
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+%% @doc This module implements an MQTT session that can survive
|
|
|
+%% restart of EMQX node by backing up its state on disk. It consumes
|
|
|
+%% messages from a shared durable storage. This is in contrast to the
|
|
|
+%% regular "mem" sessions that store all received messages in their
|
|
|
+%% own memory queues.
|
|
|
+%%
|
|
|
+%% The main challenge of durable session is to replay sent, but
|
|
|
+%% unacked, messages in case of the client reconnect. This
|
|
|
+%% implementation approaches this problem by storing iterators, batch
|
|
|
+%% sizes and sequence numbers of MQTT packets for the consumed
|
|
|
+%% messages as an array of "stream replay state" records (`#srs'), in
|
|
|
+%% such a way, that messages and their corresponding packet IDs can be
|
|
|
+%% reconstructed by "replaying" the stored SRSes.
|
|
|
+%%
|
|
|
+%% The session logic is implemented as two mostly separate loops
|
|
|
+%% ("circuits") that operate on a transient message queue, serving as
|
|
|
+%% a buffer.
|
|
|
+%%
|
|
|
+%% - *Push circuit* polls durable storage, and pushes messages to the
|
|
|
+%% queue. It's assisted by the `stream_scheduler' module that decides
|
|
|
+%% which streams are eligible for pull. Push circuit is responsible
|
|
|
+%% for maintaining a size of the queue at the configured limit.
|
|
|
+%%
|
|
|
+%% - *Pull circuit* consumes messages from the buffer and publishes
|
|
|
+%% them to the client connection. It's responsible for maintaining the
|
|
|
+%% number of inflight packets as close to the negotiated
|
|
|
+%% `Receive-Maximum' as possible to maximize the throughput.
|
|
|
+%%
|
|
|
+%% These circuits interact simply by notifying each other via
|
|
|
+%% `pull_now' or `push_now' functions.
|
|
|
+
|
|
|
-module(emqx_persistent_session_ds).
|
|
|
|
|
|
-behaviour(emqx_session).
|
|
|
@@ -167,22 +198,23 @@
|
|
|
-type shared_sub_state() :: term().
|
|
|
|
|
|
-define(TIMER_PULL, timer_pull).
|
|
|
+-define(TIMER_PUSH, timer_push).
|
|
|
-define(TIMER_GET_STREAMS, timer_get_streams).
|
|
|
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
|
|
|
-define(TIMER_RETRY_REPLAY, timer_retry_replay).
|
|
|
|
|
|
--type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT | ?TIMER_RETRY_REPLAY.
|
|
|
+-type timer() ::
|
|
|
+ ?TIMER_PULL
|
|
|
+ | ?TIMER_PUSH
|
|
|
+ | ?TIMER_GET_STREAMS
|
|
|
+ | ?TIMER_BUMP_LAST_ALIVE_AT
|
|
|
+ | ?TIMER_RETRY_REPLAY.
|
|
|
+
|
|
|
+-type timer_state() :: reference() | undefined.
|
|
|
|
|
|
%% TODO: Needs configuration?
|
|
|
-define(TIMEOUT_RETRY_REPLAY, 1000).
|
|
|
|
|
|
--record(pending_next, {
|
|
|
- ref :: reference(),
|
|
|
- stream_key :: emqx_persistent_session_ds_state:stream_key(),
|
|
|
- it_begin :: emqx_ds:iterator(),
|
|
|
- is_replay :: boolean()
|
|
|
-}).
|
|
|
-
|
|
|
-type session() :: #{
|
|
|
%% Client ID
|
|
|
id := id(),
|
|
|
@@ -193,25 +225,27 @@
|
|
|
%% Shared subscription state:
|
|
|
shared_sub_s := shared_sub_state(),
|
|
|
%% Buffer:
|
|
|
- inflight := emqx_persistent_session_ds_inflight:t(),
|
|
|
- %% Last fetched stream:
|
|
|
- %% Used as a continuation point for fair stream scheduling.
|
|
|
- last_fetched_stream => emqx_persistent_session_ds_state:stream_key(),
|
|
|
+ inflight := emqx_persistent_session_ds_buffer:t(),
|
|
|
+ stream_scheduler_s := emqx_persistent_session_ds_stream_scheduler:t(),
|
|
|
%% In-progress replay:
|
|
|
%% List of stream replay states to be added to the inflight buffer.
|
|
|
- replay => [{_StreamKey, stream_state()}, ...],
|
|
|
+ replay := [{_StreamKey, stream_state()}, ...] | undefined,
|
|
|
%% Timers:
|
|
|
- timer() => reference()
|
|
|
+ ?TIMER_PULL := timer_state(),
|
|
|
+ ?TIMER_PUSH := timer_state(),
|
|
|
+ ?TIMER_GET_STREAMS := timer_state(),
|
|
|
+ ?TIMER_BUMP_LAST_ALIVE_AT := timer_state(),
|
|
|
+ ?TIMER_RETRY_REPLAY := timer_state()
|
|
|
}.
|
|
|
|
|
|
--define(IS_REPLAY_ONGOING(SESS), is_map_key(replay, SESS)).
|
|
|
+-define(IS_REPLAY_ONGOING(REPLAY), is_list(REPLAY)).
|
|
|
|
|
|
-record(req_sync, {
|
|
|
from :: pid(),
|
|
|
ref :: reference()
|
|
|
}).
|
|
|
|
|
|
--type stream_state() :: #srs{}.
|
|
|
+-type stream_state() :: emqx_persistent_session_ds_stream_scheduler:srs().
|
|
|
|
|
|
-type message() :: emqx_types:message().
|
|
|
-type timestamp() :: emqx_utils_calendar:epoch_millisecond().
|
|
|
@@ -259,7 +293,7 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
|
|
|
ok = emqx_cm:takeover_kick(ClientID),
|
|
|
case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
|
|
|
Session0 = #{} ->
|
|
|
- Session1 = Session0#{props => Conf},
|
|
|
+ Session1 = Session0#{props := Conf},
|
|
|
Session = do_expire(ClientInfo, Session1),
|
|
|
{true, ensure_timers(Session), []};
|
|
|
false ->
|
|
|
@@ -314,13 +348,13 @@ info(upgrade_qos, #{props := Conf}) ->
|
|
|
info(inflight, #{inflight := Inflight}) ->
|
|
|
Inflight;
|
|
|
info(inflight_cnt, #{inflight := Inflight}) ->
|
|
|
- emqx_persistent_session_ds_inflight:n_inflight(Inflight);
|
|
|
+ emqx_persistent_session_ds_buffer:n_inflight(Inflight);
|
|
|
info(inflight_max, #{inflight := Inflight}) ->
|
|
|
- emqx_persistent_session_ds_inflight:receive_maximum(Inflight);
|
|
|
+ emqx_persistent_session_ds_buffer:receive_maximum(Inflight);
|
|
|
info(retry_interval, #{props := Conf}) ->
|
|
|
maps:get(retry_interval, Conf);
|
|
|
info(mqueue_len, #{inflight := Inflight}) ->
|
|
|
- emqx_persistent_session_ds_inflight:n_buffered(all, Inflight);
|
|
|
+ emqx_persistent_session_ds_buffer:n_buffered(all, Inflight);
|
|
|
info(mqueue_dropped, _Session) ->
|
|
|
0;
|
|
|
%% info(next_pkt_id, #{s := S}) ->
|
|
|
@@ -395,7 +429,7 @@ subscribe(
|
|
|
case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of
|
|
|
{ok, S0, SharedSubS} ->
|
|
|
S = emqx_persistent_session_ds_state:commit(S0),
|
|
|
- {ok, Session#{s => S, shared_sub_s => SharedSubS}};
|
|
|
+ {ok, Session#{s := S, shared_sub_s := SharedSubS}};
|
|
|
Error = {error, _} ->
|
|
|
Error
|
|
|
end;
|
|
|
@@ -407,7 +441,7 @@ subscribe(
|
|
|
case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of
|
|
|
{ok, S1} ->
|
|
|
S = emqx_persistent_session_ds_state:commit(S1),
|
|
|
- {ok, Session#{s => S}};
|
|
|
+ {ok, Session#{s := S}};
|
|
|
Error = {error, _} ->
|
|
|
Error
|
|
|
end.
|
|
|
@@ -419,7 +453,9 @@ subscribe(
|
|
|
{ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
|
|
|
unsubscribe(
|
|
|
#share{} = TopicFilter,
|
|
|
- Session = #{id := SessionId, s := S0, shared_sub_s := SharedSubS0}
|
|
|
+ Session0 = #{
|
|
|
+ id := SessionId, s := S0, shared_sub_s := SharedSubS0, stream_scheduler_s := SchedS0
|
|
|
+ }
|
|
|
) ->
|
|
|
case
|
|
|
emqx_persistent_session_ds_shared_subs:on_unsubscribe(
|
|
|
@@ -427,21 +463,27 @@ unsubscribe(
|
|
|
)
|
|
|
of
|
|
|
{ok, S1, SharedSubS1, #{id := SubId, subopts := SubOpts}} ->
|
|
|
- S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
|
|
|
+ {S2, SchedS} = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
|
|
|
+ SubId, S1, SchedS0
|
|
|
+ ),
|
|
|
S = emqx_persistent_session_ds_state:commit(S2),
|
|
|
- {ok, Session#{s => S, shared_sub_s => SharedSubS1}, SubOpts};
|
|
|
+ Session = Session0#{s := S, shared_sub_s := SharedSubS1, stream_scheduler_s := SchedS},
|
|
|
+ {ok, Session, SubOpts};
|
|
|
Error = {error, _} ->
|
|
|
Error
|
|
|
end;
|
|
|
unsubscribe(
|
|
|
TopicFilter,
|
|
|
- Session = #{id := SessionId, s := S0}
|
|
|
+ Session0 = #{id := SessionId, s := S0, stream_scheduler_s := SchedS0}
|
|
|
) ->
|
|
|
case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
|
|
|
{ok, S1, #{id := SubId, subopts := SubOpts}} ->
|
|
|
- S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
|
|
|
+ {S2, SchedS} = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(
|
|
|
+ SubId, S1, SchedS0
|
|
|
+ ),
|
|
|
S = emqx_persistent_session_ds_state:commit(S2),
|
|
|
- {ok, Session#{s => S}, SubOpts};
|
|
|
+ Session = Session0#{s := S, stream_scheduler_s := SchedS},
|
|
|
+ {ok, Session, SubOpts};
|
|
|
Error = {error, _} ->
|
|
|
Error
|
|
|
end.
|
|
|
@@ -477,7 +519,7 @@ publish(
|
|
|
undefined ->
|
|
|
Results = emqx_broker:publish(Msg),
|
|
|
S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0),
|
|
|
- {ok, Results, Session#{s => S}};
|
|
|
+ {ok, Results, Session#{s := S}};
|
|
|
_Ts ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
|
|
end;
|
|
|
@@ -530,7 +572,7 @@ do_expire(ClientInfo, Session = #{s := S0, props := Props}) ->
|
|
|
S0,
|
|
|
ExpiredPacketIds
|
|
|
),
|
|
|
- Session#{s => S}.
|
|
|
+ Session#{s := S}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Client -> Broker: PUBACK
|
|
|
@@ -574,7 +616,7 @@ pubrel(PacketId, Session = #{s := S0}) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
|
|
|
_TS ->
|
|
|
S = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0),
|
|
|
- {ok, Session#{s => S}}
|
|
|
+ {ok, Session#{s := S}}
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -612,53 +654,63 @@ deliver(ClientInfo, Delivers, Session0) ->
|
|
|
|
|
|
-spec handle_timeout(clientinfo(), _Timeout, session()) ->
|
|
|
{ok, replies(), session()} | {ok, replies(), timeout(), session()}.
|
|
|
-handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
|
|
|
- {Publishes, Session1} =
|
|
|
- case ?IS_REPLAY_ONGOING(Session0) of
|
|
|
- false ->
|
|
|
- drain_buffer(fetch_new_messages(Session0, ClientInfo));
|
|
|
- true ->
|
|
|
- {[], Session0}
|
|
|
- end,
|
|
|
- Timeout =
|
|
|
- case Publishes of
|
|
|
- [] ->
|
|
|
- get_config(ClientInfo, [idle_poll_interval]);
|
|
|
- [_ | _] ->
|
|
|
- 0
|
|
|
- end,
|
|
|
- Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
|
|
- {ok, Publishes, Session};
|
|
|
+handle_timeout(_ClientInfo, ?TIMER_PULL, Session0) ->
|
|
|
+ %% Pull circuit loop:
|
|
|
+ ?tp(debug, sessds_pull, #{}),
|
|
|
+ Session1 = Session0#{?TIMER_PULL := undefined},
|
|
|
+ {Publishes, Session} = drain_buffer(Session1),
|
|
|
+ {ok, Publishes, push_now(Session)};
|
|
|
+handle_timeout(ClientInfo, ?TIMER_PUSH, Session0) ->
|
|
|
+ %% Push circuit loop:
|
|
|
+ ?tp(debug, sessds_push, #{}),
|
|
|
+ Session1 = Session0#{?TIMER_PUSH := undefined},
|
|
|
+ #{s := S, stream_scheduler_s := SchedS0, inflight := Inflight, replay := Replay} = Session0,
|
|
|
+ BatchSize = get_config(ClientInfo, [batch_size]),
|
|
|
+ IsFull = emqx_persistent_session_ds_buffer:n_buffered(all, Inflight) >= BatchSize,
|
|
|
+ case ?IS_REPLAY_ONGOING(Replay) orelse IsFull of
|
|
|
+ true ->
|
|
|
+ {ok, [], Session1};
|
|
|
+ false ->
|
|
|
+ Timeout = get_config(ClientInfo, [idle_poll_interval]),
|
|
|
+ PollOpts = #{timeout => Timeout},
|
|
|
+ SchedS = emqx_persistent_session_ds_stream_scheduler:poll(PollOpts, SchedS0, S),
|
|
|
+ {ok, [], Session1#{stream_scheduler_s := SchedS}}
|
|
|
+ end;
|
|
|
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
|
|
|
Session = replay_streams(Session0, ClientInfo),
|
|
|
{ok, [], Session};
|
|
|
-handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
|
|
|
+handle_timeout(
|
|
|
+ ClientInfo,
|
|
|
+ ?TIMER_GET_STREAMS,
|
|
|
+ Session0 = #{s := S0, shared_sub_s := SharedSubS0, stream_scheduler_s := SchedS0}
|
|
|
+) ->
|
|
|
+ ?tp(debug, sessds_renew_streams, #{}),
|
|
|
%% `gc` and `renew_streams` methods may drop unsubscribed streams.
|
|
|
%% Shared subscription handler must have a chance to see unsubscribed streams
|
|
|
%% in the fully replayed state.
|
|
|
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
|
|
|
S2 = emqx_persistent_session_ds_subs:gc(S1),
|
|
|
- S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
|
|
|
+ {S3, SchedS} = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2, SchedS0),
|
|
|
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
|
|
|
Interval = get_config(ClientInfo, [renew_streams_interval]),
|
|
|
- Session = emqx_session:ensure_timer(
|
|
|
+ Session = set_timer(
|
|
|
?TIMER_GET_STREAMS,
|
|
|
Interval,
|
|
|
- Session0#{s => S, shared_sub_s => SharedSubS}
|
|
|
+ Session0#{s := S, shared_sub_s := SharedSubS, stream_scheduler_s := SchedS}
|
|
|
),
|
|
|
- {ok, [], Session};
|
|
|
+ {ok, [], push_now(Session)};
|
|
|
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
|
|
|
S = emqx_persistent_session_ds_state:commit(bump_last_alive(S0)),
|
|
|
- Session = emqx_session:ensure_timer(
|
|
|
+ Session = set_timer(
|
|
|
?TIMER_BUMP_LAST_ALIVE_AT,
|
|
|
bump_interval(),
|
|
|
- Session0#{s => S}
|
|
|
+ Session0#{s := S}
|
|
|
),
|
|
|
{ok, [], Session};
|
|
|
handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S0}) ->
|
|
|
S = emqx_persistent_session_ds_state:commit(S0),
|
|
|
From ! Ref,
|
|
|
- {ok, [], Session#{s => S}};
|
|
|
+ {ok, [], Session#{s := S}};
|
|
|
handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
|
|
|
expire(ClientInfo, Session);
|
|
|
handle_timeout(_ClientInfo, Timeout, Session) ->
|
|
|
@@ -674,7 +726,9 @@ handle_info(
|
|
|
?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}, _ClientInfo
|
|
|
) ->
|
|
|
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_info(S0, SharedSubS0, Msg),
|
|
|
- Session#{s => S, shared_sub_s => SharedSubS};
|
|
|
+ Session#{s := S, shared_sub_s := SharedSubS};
|
|
|
+handle_info(AsyncReply = #poll_reply{}, Session, ClientInfo) ->
|
|
|
+ push_now(handle_ds_reply(AsyncReply, Session, ClientInfo));
|
|
|
handle_info(Msg, Session, _ClientInfo) ->
|
|
|
?SLOG(warning, #{msg => emqx_session_ds_unknown_message, message => Msg}),
|
|
|
Session.
|
|
|
@@ -698,7 +752,7 @@ bump_last_alive(S0) ->
|
|
|
{ok, replies(), session()}.
|
|
|
replay(ClientInfo, [], Session0 = #{s := S0}) ->
|
|
|
Streams = emqx_persistent_session_ds_stream_scheduler:find_replay_streams(S0),
|
|
|
- Session = replay_streams(Session0#{replay => Streams}, ClientInfo),
|
|
|
+ Session = replay_streams(Session0#{replay := Streams}, ClientInfo),
|
|
|
{ok, [], Session}.
|
|
|
|
|
|
replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
|
|
|
@@ -714,22 +768,24 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -
|
|
|
class => recoverable,
|
|
|
retry_in_ms => RetryTimeout
|
|
|
}),
|
|
|
- emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0);
|
|
|
+ set_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0);
|
|
|
{error, unrecoverable, Reason} ->
|
|
|
Session1 = skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason),
|
|
|
replay_streams(Session1#{replay := Rest}, ClientInfo)
|
|
|
end;
|
|
|
-replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
|
|
|
- Session = maps:remove(replay, Session0),
|
|
|
+replay_streams(Session = #{replay := []}, _ClientInfo) ->
|
|
|
%% Note: we filled the buffer with the historical messages, and
|
|
|
%% from now on we'll rely on the normal inflight/flow control
|
|
|
%% mechanisms to replay them:
|
|
|
- pull_now(Session).
|
|
|
+ pull_now(Session#{replay := undefined}).
|
|
|
|
|
|
-spec replay_batch(
|
|
|
- emqx_persistent_session_ds_state:stream_key(), stream_state(), session(), clientinfo()
|
|
|
+ emqx_persistent_session_ds_stream_scheduler:stream_key(),
|
|
|
+ stream_state(),
|
|
|
+ session(),
|
|
|
+ clientinfo()
|
|
|
) ->
|
|
|
- session() | emqx_ds:error(_).
|
|
|
+ {ok, stream_state(), session()} | emqx_ds:error(_).
|
|
|
replay_batch(StreamKey, Srs0, Session0, ClientInfo) ->
|
|
|
#srs{it_begin = ItBegin, batch_size = BatchSize} = Srs0,
|
|
|
FetchResult = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize),
|
|
|
@@ -794,7 +850,7 @@ disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo
|
|
|
end,
|
|
|
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
|
|
|
S = emqx_persistent_session_ds_state:commit(S4),
|
|
|
- {shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
|
|
|
+ {shutdown, Session#{s := S, shared_sub_s := SharedSubS}}.
|
|
|
|
|
|
-spec terminate(Reason :: term(), session()) -> ok.
|
|
|
terminate(_Reason, Session = #{id := Id, s := S}) ->
|
|
|
@@ -913,15 +969,23 @@ session_open(
|
|
|
S6, shared_sub_opts(SessionId)
|
|
|
),
|
|
|
S = emqx_persistent_session_ds_state:commit(S7),
|
|
|
- Inflight = emqx_persistent_session_ds_inflight:new(
|
|
|
+ Inflight = emqx_persistent_session_ds_buffer:new(
|
|
|
receive_maximum(NewConnInfo)
|
|
|
),
|
|
|
+ SSS = emqx_persistent_session_ds_stream_scheduler:init(S),
|
|
|
#{
|
|
|
id => SessionId,
|
|
|
s => S,
|
|
|
shared_sub_s => SharedSubS,
|
|
|
inflight => Inflight,
|
|
|
- props => #{}
|
|
|
+ props => #{},
|
|
|
+ stream_scheduler_s => SSS,
|
|
|
+ replay => undefined,
|
|
|
+ ?TIMER_PULL => undefined,
|
|
|
+ ?TIMER_PUSH => undefined,
|
|
|
+ ?TIMER_GET_STREAMS => undefined,
|
|
|
+ ?TIMER_BUMP_LAST_ALIVE_AT => undefined,
|
|
|
+ ?TIMER_RETRY_REPLAY => undefined
|
|
|
}
|
|
|
end;
|
|
|
undefined ->
|
|
|
@@ -969,7 +1033,14 @@ session_ensure_new(
|
|
|
props => Conf,
|
|
|
s => S,
|
|
|
shared_sub_s => emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)),
|
|
|
- inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo))
|
|
|
+ inflight => emqx_persistent_session_ds_buffer:new(receive_maximum(ConnInfo)),
|
|
|
+ stream_scheduler_s => emqx_persistent_session_ds_stream_scheduler:init(S),
|
|
|
+ replay => undefined,
|
|
|
+ ?TIMER_PULL => undefined,
|
|
|
+ ?TIMER_PUSH => undefined,
|
|
|
+ ?TIMER_GET_STREAMS => undefined,
|
|
|
+ ?TIMER_BUMP_LAST_ALIVE_AT => undefined,
|
|
|
+ ?TIMER_RETRY_REPLAY => undefined
|
|
|
}.
|
|
|
|
|
|
%% @doc Called when a client reconnects with `clean session=true' or
|
|
|
@@ -1017,102 +1088,58 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
|
|
|
%% Normal replay:
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
|
|
|
- {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
|
|
|
- Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
|
|
|
- LFS = maps:get(last_fetched_stream, Session1, beginning),
|
|
|
- ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
|
|
|
- BatchSize = get_config(ClientInfo, [batch_size]),
|
|
|
- Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
|
|
|
- Session2#{shared_sub_s => SharedSubS1}.
|
|
|
-
|
|
|
-fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
|
|
|
- #{inflight := Inflight} = Session0,
|
|
|
- case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
|
|
|
- true ->
|
|
|
- %% Buffer is full:
|
|
|
- Session0;
|
|
|
- false ->
|
|
|
- case emqx_persistent_session_ds_stream_scheduler:next_stream(ItStream0) of
|
|
|
- {StreamKey, Srs, ItStream} ->
|
|
|
- Session1 = new_batch(StreamKey, Srs, BatchSize, Session0, ClientInfo),
|
|
|
- Session = Session1#{last_fetched_stream => StreamKey},
|
|
|
- fetch_new_messages(ItStream, BatchSize, Session, ClientInfo);
|
|
|
- none ->
|
|
|
- Session0
|
|
|
+push_now(Session) ->
|
|
|
+ ensure_timer(?TIMER_PUSH, 0, Session).
|
|
|
+
|
|
|
+handle_ds_reply(AsyncReply, Session0 = #{s := S0, stream_scheduler_s := SchedS0}, ClientInfo) ->
|
|
|
+ case emqx_persistent_session_ds_stream_scheduler:on_ds_reply(AsyncReply, S0, SchedS0) of
|
|
|
+ {undefined, SchedS} ->
|
|
|
+ Session0#{stream_scheduler_s := SchedS};
|
|
|
+ {{StreamKey, ItBegin, FetchResult}, SchedS} ->
|
|
|
+ Session1 = Session0#{stream_scheduler_s := SchedS},
|
|
|
+ case enqueue_batch(false, Session1, ClientInfo, StreamKey, ItBegin, FetchResult) of
|
|
|
+ {ignore, _, Session} ->
|
|
|
+ Session;
|
|
|
+ {ok, Srs, Session = #{s := S1}} ->
|
|
|
+ S2 = emqx_persistent_session_ds_state:put_seqno(
|
|
|
+ ?next(?QOS_1),
|
|
|
+ Srs#srs.last_seqno_qos1,
|
|
|
+ S1
|
|
|
+ ),
|
|
|
+ S3 = emqx_persistent_session_ds_state:put_seqno(
|
|
|
+ ?next(?QOS_2),
|
|
|
+ Srs#srs.last_seqno_qos2,
|
|
|
+ S2
|
|
|
+ ),
|
|
|
+ S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S3),
|
|
|
+ pull_now(Session#{s := S});
|
|
|
+ {{error, recoverable, Reason}, _Srs, Session} ->
|
|
|
+ ?SLOG(debug, #{
|
|
|
+ msg => "failed_to_fetch_batch",
|
|
|
+ stream => StreamKey,
|
|
|
+ reason => Reason,
|
|
|
+ class => recoverable
|
|
|
+ }),
|
|
|
+ Session;
|
|
|
+ {{error, unrecoverable, Reason}, Srs, Session} ->
|
|
|
+ skip_batch(StreamKey, Srs, Session, ClientInfo, Reason)
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
|
|
|
- Pending = fetch(false, StreamKey, Srs0, BatchSize),
|
|
|
- case enqueue_batch(Session0, ClientInfo, Pending, receive_pending(Pending)) of
|
|
|
- {ok, Srs, Session} ->
|
|
|
- S1 = emqx_persistent_session_ds_state:put_seqno(
|
|
|
- ?next(?QOS_1),
|
|
|
- Srs#srs.last_seqno_qos1,
|
|
|
- S0
|
|
|
- ),
|
|
|
- S2 = emqx_persistent_session_ds_state:put_seqno(
|
|
|
- ?next(?QOS_2),
|
|
|
- Srs#srs.last_seqno_qos2,
|
|
|
- S1
|
|
|
- ),
|
|
|
- S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
|
|
|
- Session#{s => S};
|
|
|
- {error, recoverable, Reason} ->
|
|
|
- ?SLOG(debug, #{
|
|
|
- msg => "failed_to_fetch_batch",
|
|
|
- stream => StreamKey,
|
|
|
- reason => Reason,
|
|
|
- class => recoverable
|
|
|
- }),
|
|
|
- Session0;
|
|
|
- {error, unrecoverable, Reason} ->
|
|
|
- skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason)
|
|
|
- end.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Generic functions for fetching messages (during replay or normal
|
|
|
%% operation):
|
|
|
%% --------------------------------------------------------------------
|
|
|
|
|
|
-fetch(IsReplay, StreamKey, Srs0, DefaultBatchSize) ->
|
|
|
- case IsReplay of
|
|
|
- true ->
|
|
|
- %% When we do replay we must use the same starting point
|
|
|
- %% and batch size as initially:
|
|
|
- BatchSize = Srs0#srs.batch_size,
|
|
|
- ItBegin = Srs0#srs.it_begin;
|
|
|
- false ->
|
|
|
- BatchSize = DefaultBatchSize,
|
|
|
- ItBegin = Srs0#srs.it_end
|
|
|
- end,
|
|
|
- {ok, Ref} = emqx_ds:anext(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize),
|
|
|
- #pending_next{
|
|
|
- ref = Ref,
|
|
|
- is_replay = IsReplay,
|
|
|
- it_begin = ItBegin,
|
|
|
- stream_key = StreamKey
|
|
|
- }.
|
|
|
-
|
|
|
-receive_pending(#pending_next{ref = Ref}) ->
|
|
|
- receive
|
|
|
- #ds_async_result{ref = Ref, data = Data} -> Data
|
|
|
- end.
|
|
|
-
|
|
|
-enqueue_batch(Session, ClientInfo, Pending, FetchResult) ->
|
|
|
- #pending_next{is_replay = IsReplay, stream_key = StreamKey, it_begin = ItBegin} = Pending,
|
|
|
- enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, ItBegin, FetchResult).
|
|
|
-
|
|
|
-spec enqueue_batch(
|
|
|
boolean(),
|
|
|
session(),
|
|
|
clientinfo(),
|
|
|
- emqx_persistent_session_ds_state:stream_key(),
|
|
|
+ emqx_persistent_session_ds_stream_scheduler:stream_key(),
|
|
|
emqx_ds:iterator(),
|
|
|
emqx_ds:next_result()
|
|
|
) ->
|
|
|
- {ok | emqx_ds:error(), #srs{}, session()}
|
|
|
+ {ok | emqx_ds:error(_), stream_state(), session()}
|
|
|
| {ignore, undefined, session()}.
|
|
|
enqueue_batch(IsReplay, Session = #{s := S}, ClientInfo, StreamKey, ItBegin, FetchResult) ->
|
|
|
case emqx_persistent_session_ds_state:get_stream(StreamKey, S) of
|
|
|
@@ -1132,38 +1159,45 @@ enqueue_batch(IsReplay, Session = #{s := S}, ClientInfo, StreamKey, ItBegin, Fet
|
|
|
}),
|
|
|
{ignore, undefined, Session};
|
|
|
Srs ->
|
|
|
- do_enqueue_batch(IsReplay, Session, ClientInfo, Srs, ItBegin, FetchResult)
|
|
|
+ do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs, ItBegin, FetchResult)
|
|
|
end.
|
|
|
|
|
|
-do_enqueue_batch(IsReplay, Session, ClientInfo, Srs0, ItBegin, FetchResult) ->
|
|
|
- #{s := S0, inflight := Inflight0} = Session,
|
|
|
+do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs0, ItBegin, FetchResult) ->
|
|
|
+ #{s := S0, inflight := Inflight0, stream_scheduler_s := SchedS0} = Session,
|
|
|
#srs{sub_state_id = SubStateId} = Srs0,
|
|
|
+ case IsReplay of
|
|
|
+ false ->
|
|
|
+ %% Normally we assign a new set of sequence
|
|
|
+ %% numbers to messages in the batch:
|
|
|
+ FirstSeqnoQos1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
|
|
|
+ FirstSeqnoQos2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0);
|
|
|
+ true ->
|
|
|
+ %% During replay we reuse the original sequence
|
|
|
+ %% numbers:
|
|
|
+ #srs{
|
|
|
+ first_seqno_qos1 = FirstSeqnoQos1,
|
|
|
+ first_seqno_qos2 = FirstSeqnoQos2
|
|
|
+ } = Srs0
|
|
|
+ end,
|
|
|
case FetchResult of
|
|
|
{error, _, _} = Error ->
|
|
|
{Error, Srs0, Session};
|
|
|
{ok, end_of_stream} ->
|
|
|
%% No new messages; just update the end iterator:
|
|
|
Srs = Srs0#srs{
|
|
|
+ first_seqno_qos1 = FirstSeqnoQos1,
|
|
|
+ first_seqno_qos2 = FirstSeqnoQos2,
|
|
|
+ last_seqno_qos1 = FirstSeqnoQos1,
|
|
|
+ last_seqno_qos2 = FirstSeqnoQos2,
|
|
|
it_begin = ItBegin,
|
|
|
it_end = end_of_stream,
|
|
|
batch_size = 0
|
|
|
},
|
|
|
- {ok, Srs, Session};
|
|
|
+ SchedS = emqx_persistent_session_ds_stream_scheduler:on_enqueue(
|
|
|
+ IsReplay, StreamKey, Srs, S0, SchedS0
|
|
|
+ ),
|
|
|
+ {ok, Srs, Session#{stream_scheduler_s := SchedS}};
|
|
|
{ok, ItEnd, Messages} ->
|
|
|
- case IsReplay of
|
|
|
- false ->
|
|
|
- %% Normally we assign a new set of sequence
|
|
|
- %% numbers to messages in the batch:
|
|
|
- FirstSeqnoQos1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
|
|
|
- FirstSeqnoQos2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0);
|
|
|
- true ->
|
|
|
- %% During replay we reuse the original sequence
|
|
|
- %% numbers:
|
|
|
- #srs{
|
|
|
- first_seqno_qos1 = FirstSeqnoQos1,
|
|
|
- first_seqno_qos2 = FirstSeqnoQos2
|
|
|
- } = Srs0
|
|
|
- end,
|
|
|
SubState = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S0),
|
|
|
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
|
|
|
IsReplay,
|
|
|
@@ -1184,7 +1218,10 @@ do_enqueue_batch(IsReplay, Session, ClientInfo, Srs0, ItBegin, FetchResult) ->
|
|
|
last_seqno_qos1 = LastSeqnoQos1,
|
|
|
last_seqno_qos2 = LastSeqnoQos2
|
|
|
},
|
|
|
- {ok, Srs, Session#{inflight := Inflight}}
|
|
|
+ SchedS = emqx_persistent_session_ds_stream_scheduler:on_enqueue(
|
|
|
+ IsReplay, StreamKey, Srs, S0, SchedS0
|
|
|
+ ),
|
|
|
+ {ok, Srs, Session#{inflight := Inflight, stream_scheduler_s := SchedS}}
|
|
|
end.
|
|
|
|
|
|
%% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
|
|
|
@@ -1235,7 +1272,7 @@ process_batch(
|
|
|
%% We ignore QoS 0 messages during replay:
|
|
|
Acc;
|
|
|
?QOS_0 ->
|
|
|
- emqx_persistent_session_ds_inflight:push({undefined, Msg}, Acc);
|
|
|
+ emqx_persistent_session_ds_buffer:push({undefined, Msg}, Acc);
|
|
|
?QOS_1 when SeqNoQos1 =< Comm1 ->
|
|
|
%% QoS1 message has been acked by the client, ignore:
|
|
|
Acc;
|
|
|
@@ -1243,15 +1280,15 @@ process_batch(
|
|
|
%% QoS1 message has been sent but not
|
|
|
%% acked. Retransmit:
|
|
|
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
|
|
- emqx_persistent_session_ds_inflight:push({SeqNoQos1, Msg1}, Acc);
|
|
|
+ emqx_persistent_session_ds_buffer:push({SeqNoQos1, Msg1}, Acc);
|
|
|
?QOS_1 ->
|
|
|
- emqx_persistent_session_ds_inflight:push({SeqNoQos1, Msg}, Acc);
|
|
|
+ emqx_persistent_session_ds_buffer:push({SeqNoQos1, Msg}, Acc);
|
|
|
?QOS_2 when SeqNoQos2 =< Comm2 ->
|
|
|
%% QoS2 message has been PUBCOMP'ed by the client, ignore:
|
|
|
Acc;
|
|
|
?QOS_2 when SeqNoQos2 =< Rec ->
|
|
|
%% QoS2 message has been PUBREC'ed by the client, resend PUBREL:
|
|
|
- emqx_persistent_session_ds_inflight:push({pubrel, SeqNoQos2}, Acc);
|
|
|
+ emqx_persistent_session_ds_buffer:push({pubrel, SeqNoQos2}, Acc);
|
|
|
?QOS_2 when SeqNoQos2 =< Dup2 ->
|
|
|
%% QoS2 message has been sent, but we haven't received PUBREC.
|
|
|
%%
|
|
|
@@ -1259,9 +1296,9 @@ process_batch(
|
|
|
%% DUP flag is never set for QoS2 messages? We
|
|
|
%% do so for mem sessions, though.
|
|
|
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
|
|
- emqx_persistent_session_ds_inflight:push({SeqNoQos2, Msg1}, Acc);
|
|
|
+ emqx_persistent_session_ds_buffer:push({SeqNoQos2, Msg1}, Acc);
|
|
|
?QOS_2 ->
|
|
|
- emqx_persistent_session_ds_inflight:push({SeqNoQos2, Msg}, Acc)
|
|
|
+ emqx_persistent_session_ds_buffer:push({SeqNoQos2, Msg}, Acc)
|
|
|
end,
|
|
|
SeqNoQos1,
|
|
|
SeqNoQos2
|
|
|
@@ -1293,17 +1330,17 @@ enqueue_transient(
|
|
|
case Qos of
|
|
|
?QOS_0 ->
|
|
|
S = S0,
|
|
|
- Inflight = emqx_persistent_session_ds_inflight:push({undefined, Msg}, Inflight0);
|
|
|
+ Inflight = emqx_persistent_session_ds_buffer:push({undefined, Msg}, Inflight0);
|
|
|
QoS when QoS =:= ?QOS_1; QoS =:= ?QOS_2 ->
|
|
|
SeqNo = inc_seqno(
|
|
|
QoS, emqx_persistent_session_ds_state:get_seqno(?next(QoS), S0)
|
|
|
),
|
|
|
S = emqx_persistent_session_ds_state:put_seqno(?next(QoS), SeqNo, S0),
|
|
|
- Inflight = emqx_persistent_session_ds_inflight:push({SeqNo, Msg}, Inflight0)
|
|
|
+ Inflight = emqx_persistent_session_ds_buffer:push({SeqNo, Msg}, Inflight0)
|
|
|
end,
|
|
|
Session#{
|
|
|
- inflight => Inflight,
|
|
|
- s => S
|
|
|
+ inflight := Inflight,
|
|
|
+ s := S
|
|
|
}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -1312,10 +1349,10 @@ enqueue_transient(
|
|
|
|
|
|
drain_buffer(Session = #{inflight := Inflight0, s := S0}) ->
|
|
|
{Publishes, Inflight, S} = do_drain_buffer(Inflight0, S0, []),
|
|
|
- {Publishes, Session#{inflight => Inflight, s := S}}.
|
|
|
+ {Publishes, Session#{inflight := Inflight, s := S}}.
|
|
|
|
|
|
do_drain_buffer(Inflight0, S0, Acc) ->
|
|
|
- case emqx_persistent_session_ds_inflight:pop(Inflight0) of
|
|
|
+ case emqx_persistent_session_ds_buffer:pop(Inflight0) of
|
|
|
undefined ->
|
|
|
{lists:reverse(Acc), Inflight0, S0};
|
|
|
{{pubrel, SeqNo}, Inflight} ->
|
|
|
@@ -1338,13 +1375,20 @@ do_drain_buffer(Inflight0, S0, Acc) ->
|
|
|
%% effects. Add `CBM:init' callback to the session behavior?
|
|
|
-spec ensure_timers(session()) -> session().
|
|
|
ensure_timers(Session0) ->
|
|
|
- Session1 = emqx_session:ensure_timer(?TIMER_PULL, 100, Session0),
|
|
|
- Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
|
|
|
- emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
|
|
|
+ Session1 = set_timer(?TIMER_GET_STREAMS, 100, Session0),
|
|
|
+ set_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session1).
|
|
|
|
|
|
+%% This function triggers sending buffered packets to the client
|
|
|
+%% (provided there is something to send and the number of in-flight
|
|
|
+%% packets is less than `Receive-Maximum'). Normally, pull is
|
|
|
+%% triggered when:
|
|
|
+%%
|
|
|
+%% - New messages (durable or transient) are enqueued
|
|
|
+%%
|
|
|
+%% - When the client releases a packet ID (via PUBACK or PUBCOMP)
|
|
|
-spec pull_now(session()) -> session().
|
|
|
pull_now(Session) ->
|
|
|
- emqx_session:reset_timer(?TIMER_PULL, 0, Session).
|
|
|
+ ensure_timer(?TIMER_PULL, 0, Session).
|
|
|
|
|
|
-spec receive_maximum(conninfo()) -> pos_integer().
|
|
|
receive_maximum(ConnInfo) ->
|
|
|
@@ -1411,26 +1455,44 @@ maybe_set_offline_info(S, Id) ->
|
|
|
|
|
|
-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
|
|
|
{ok, emqx_types:message(), session()} | {error, _}.
|
|
|
-update_seqno(Track, PacketId, Session = #{id := SessionId, s := S, inflight := Inflight0}) ->
|
|
|
+update_seqno(
|
|
|
+ Track,
|
|
|
+ PacketId,
|
|
|
+ Session = #{id := SessionId, s := S, inflight := Inflight0, stream_scheduler_s := SchedS0}
|
|
|
+) ->
|
|
|
SeqNo = packet_id_to_seqno(PacketId, S),
|
|
|
case Track of
|
|
|
puback ->
|
|
|
SeqNoKey = ?committed(?QOS_1),
|
|
|
- Result = emqx_persistent_session_ds_inflight:puback(SeqNo, Inflight0);
|
|
|
+ Result = emqx_persistent_session_ds_buffer:puback(SeqNo, Inflight0);
|
|
|
pubrec ->
|
|
|
SeqNoKey = ?rec,
|
|
|
- Result = emqx_persistent_session_ds_inflight:pubrec(SeqNo, Inflight0);
|
|
|
+ Result = emqx_persistent_session_ds_buffer:pubrec(SeqNo, Inflight0);
|
|
|
pubcomp ->
|
|
|
SeqNoKey = ?committed(?QOS_2),
|
|
|
- Result = emqx_persistent_session_ds_inflight:pubcomp(SeqNo, Inflight0)
|
|
|
+ Result = emqx_persistent_session_ds_buffer:pubcomp(SeqNo, Inflight0)
|
|
|
end,
|
|
|
case Result of
|
|
|
{ok, Inflight} ->
|
|
|
%% TODO: we pass a bogus message into the hook:
|
|
|
Msg = emqx_message:make(SessionId, <<>>, <<>>),
|
|
|
+ SchedS =
|
|
|
+ case Track of
|
|
|
+ puback ->
|
|
|
+ emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
|
|
|
+ ?QOS_1, SeqNo, SchedS0
|
|
|
+ );
|
|
|
+ pubcomp ->
|
|
|
+ emqx_persistent_session_ds_stream_scheduler:on_seqno_release(
|
|
|
+ ?QOS_2, SeqNo, SchedS0
|
|
|
+ );
|
|
|
+ _ ->
|
|
|
+ SchedS0
|
|
|
+ end,
|
|
|
{ok, Msg, Session#{
|
|
|
- s => emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S),
|
|
|
- inflight => Inflight
|
|
|
+ s := emqx_persistent_session_ds_state:put_seqno(SeqNoKey, SeqNo, S),
|
|
|
+ inflight := Inflight,
|
|
|
+ stream_scheduler_s := SchedS
|
|
|
}};
|
|
|
{error, Expected} ->
|
|
|
?SLOG(warning, #{
|
|
|
@@ -1539,6 +1601,20 @@ maybe_set_will_message_timer(#{id := SessionId, s := S}) ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
+-spec ensure_timer(timer(), non_neg_integer(), session()) -> session().
|
|
|
+ensure_timer(Timer, Time, Session) ->
|
|
|
+ case Session of
|
|
|
+ #{Timer := undefined} ->
|
|
|
+ set_timer(Timer, Time, Session);
|
|
|
+ #{Timer := TRef} when is_reference(TRef) ->
|
|
|
+ Session
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec set_timer(timer(), non_neg_integer(), session()) -> session().
|
|
|
+set_timer(Timer, Time, Session) ->
|
|
|
+ TRef = emqx_utils:start_timer(Time, {emqx_session, Timer}),
|
|
|
+ Session#{Timer := TRef}.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Tests
|
|
|
%%--------------------------------------------------------------------
|