|
|
@@ -117,6 +117,7 @@
|
|
|
-type clientinfo() :: emqx_types:clientinfo().
|
|
|
-type conninfo() :: emqx_session:conninfo().
|
|
|
-type replies() :: emqx_session:replies().
|
|
|
+-type timer() :: pull | get_streams | bump_last_alive_at.
|
|
|
|
|
|
-define(STATS_KEYS, [
|
|
|
subscriptions_cnt,
|
|
|
@@ -396,6 +397,11 @@ handle_timeout(
|
|
|
handle_timeout(_ClientInfo, get_streams, Session) ->
|
|
|
renew_streams(Session),
|
|
|
ensure_timer(get_streams),
|
|
|
+ {ok, [], Session};
|
|
|
+handle_timeout(_ClientInfo, bump_last_alive_at, Session0) ->
|
|
|
+ NowMS = now_ms(),
|
|
|
+ Session = session_set_last_alive_at_trans(Session0, NowMS),
|
|
|
+ ensure_timer(bump_last_alive_at),
|
|
|
{ok, [], Session}.
|
|
|
|
|
|
-spec replay(clientinfo(), [], session()) ->
|
|
|
@@ -553,7 +559,7 @@ session_open(SessionId, NewConnInfo) ->
|
|
|
false ->
|
|
|
%% new connection being established
|
|
|
Record1 = Record0#session{conninfo = NewConnInfo},
|
|
|
- Record = session_set_last_alive_at(Record1, never),
|
|
|
+ Record = session_set_last_alive_at(Record1, NowMS),
|
|
|
Session = export_session(Record),
|
|
|
DSSubs = session_read_subscriptions(SessionId),
|
|
|
Subscriptions = export_subscriptions(DSSubs),
|
|
|
@@ -592,6 +598,10 @@ session_create(SessionId, ConnInfo, Props) ->
|
|
|
ok = mnesia:write(?SESSION_TAB, Session, write),
|
|
|
Session.
|
|
|
|
|
|
+session_set_last_alive_at_trans(Session, LastAliveAt) ->
|
|
|
+ #{conninfo := ConnInfo} = Session,
|
|
|
+ session_set_last_alive_at_trans(Session, ConnInfo, LastAliveAt).
|
|
|
+
|
|
|
session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) ->
|
|
|
#{id := SessionId} = Session,
|
|
|
transaction(fun() ->
|
|
|
@@ -863,13 +873,17 @@ export_record(_, _, [], Acc) ->
|
|
|
%% effects. Add `CBM:init' callback to the session behavior?
|
|
|
ensure_timers() ->
|
|
|
ensure_timer(pull),
|
|
|
- ensure_timer(get_streams).
|
|
|
+ ensure_timer(get_streams),
|
|
|
+ ensure_timer(bump_last_alive_at).
|
|
|
|
|
|
--spec ensure_timer(pull | get_streams) -> ok.
|
|
|
+-spec ensure_timer(timer()) -> ok.
|
|
|
+ensure_timer(bump_last_alive_at = Type) ->
|
|
|
+ BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
|
|
|
+ ensure_timer(Type, BumpInterval);
|
|
|
ensure_timer(Type) ->
|
|
|
ensure_timer(Type, 100).
|
|
|
|
|
|
--spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok.
|
|
|
+-spec ensure_timer(timer(), non_neg_integer()) -> ok.
|
|
|
ensure_timer(Type, Timeout) ->
|
|
|
_ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
|
|
|
ok.
|