|
|
@@ -47,6 +47,8 @@
|
|
|
-include("logger.hrl").
|
|
|
-include("types.hrl").
|
|
|
|
|
|
+-logger_header("[Session]").
|
|
|
+
|
|
|
-export([start_link/1]).
|
|
|
|
|
|
-export([ info/1
|
|
|
@@ -399,11 +401,11 @@ handle_call(stats, _From, State) ->
|
|
|
reply(stats(State), State);
|
|
|
|
|
|
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
|
|
|
- ?LOG(warning, "[Session] Discarded by ~p", [ByPid]),
|
|
|
+ ?LOG(warning, "Discarded by ~p", [ByPid]),
|
|
|
{stop, {shutdown, discarded}, ok, State};
|
|
|
|
|
|
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
|
|
- ?LOG(warning, "[Session] Conn ~p is discarded by ~p", [ConnPid, ByPid]),
|
|
|
+ ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
|
|
|
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
|
|
|
{stop, {shutdown, discarded}, ok, State};
|
|
|
|
|
|
@@ -423,7 +425,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
|
|
|
{ok, ensure_stats_timer(ensure_await_rel_timer(State1))}
|
|
|
end;
|
|
|
true ->
|
|
|
- ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
|
|
|
+ ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
|
|
|
ok = emqx_metrics:inc('messages.qos2.dropped'),
|
|
|
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
|
|
end);
|
|
|
@@ -435,7 +437,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
|
|
|
true ->
|
|
|
{ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
|
|
|
false ->
|
|
|
- ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]),
|
|
|
+ ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]),
|
|
|
ok = emqx_metrics:inc('packets.pubrec.missed'),
|
|
|
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
|
|
end);
|
|
|
@@ -447,7 +449,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
|
|
|
{_Ts, AwaitingRel1} ->
|
|
|
{ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
|
|
|
error ->
|
|
|
- ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]),
|
|
|
+ ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
|
|
|
ok = emqx_metrics:inc('packets.pubrel.missed'),
|
|
|
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
|
|
end);
|
|
|
@@ -456,7 +458,7 @@ handle_call(close, _From, State) ->
|
|
|
{stop, normal, ok, State};
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
- ?LOG(error, "[Session] Unexpected call: ~p", [Req]),
|
|
|
+ ?LOG(error, "Unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|
|
|
|
|
|
%% SUBSCRIBE:
|
|
|
@@ -497,7 +499,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
|
|
|
true ->
|
|
|
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
|
|
|
false ->
|
|
|
- ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]),
|
|
|
+ ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
|
|
ok = emqx_metrics:inc('packets.puback.missed'),
|
|
|
State
|
|
|
end);
|
|
|
@@ -509,7 +511,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
|
|
|
true ->
|
|
|
ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
|
|
|
false ->
|
|
|
- ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]),
|
|
|
+ ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
|
|
|
ok = emqx_metrics:inc('packets.pubcomp.missed'),
|
|
|
State
|
|
|
end);
|
|
|
@@ -527,14 +529,14 @@ handle_cast({resume, #{conn_pid := ConnPid,
|
|
|
expiry_timer = ExpireTimer,
|
|
|
will_delay_timer = WillDelayTimer}) ->
|
|
|
|
|
|
- ?LOG(info, "[Session] Resumed by connection ~p ", [ConnPid]),
|
|
|
+ ?LOG(info, "Resumed by connection ~p ", [ConnPid]),
|
|
|
|
|
|
%% Cancel Timers
|
|
|
lists:foreach(fun emqx_misc:cancel_timer/1,
|
|
|
[RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
|
|
|
|
|
|
case kick(ClientId, OldConnPid, ConnPid) of
|
|
|
- ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]);
|
|
|
+ ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]);
|
|
|
ignore -> ok
|
|
|
end,
|
|
|
|
|
|
@@ -565,7 +567,7 @@ handle_cast({update_expiry_interval, Interval}, State) ->
|
|
|
{noreply, State#state{expiry_interval = Interval}};
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
|
- ?LOG(error, "[Session] Unexpected cast: ~p", [Msg]),
|
|
|
+ ?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
|
|
|
@@ -600,12 +602,12 @@ handle_info({timeout, Timer, emit_stats},
|
|
|
GcState1 = emqx_gc:reset(GcState),
|
|
|
{noreply, NewState#state{gc_state = GcState1}, hibernate};
|
|
|
{shutdown, Reason} ->
|
|
|
- ?LOG(warning, "[Session] Shutdown exceptionally due to ~p", [Reason]),
|
|
|
+ ?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]),
|
|
|
shutdown(Reason, NewState)
|
|
|
end;
|
|
|
|
|
|
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
|
|
- ?LOG(info, "[Session] Expired, shutdown now.", []),
|
|
|
+ ?LOG(info, "Expired, shutdown now.", []),
|
|
|
shutdown(expired, State);
|
|
|
|
|
|
handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
|
|
|
@@ -640,12 +642,12 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
|
|
|
{noreply, State#state{old_conn_pid = undefined}};
|
|
|
|
|
|
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
|
|
- ?LOG(error, "[Session] Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
|
|
|
+ ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
|
|
|
[ConnPid, Pid, Reason]),
|
|
|
{noreply, State};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
- ?LOG(error, "[Session] Unexpected info: ~p", [Info]),
|
|
|
+ ?LOG(error, "Unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(Reason, #state{will_msg = WillMsg,
|
|
|
@@ -771,7 +773,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
|
|
|
case (timer:now_diff(Now, Ts) div 1000) of
|
|
|
Age when Age >= Timeout ->
|
|
|
ok = emqx_metrics:inc('messages.qos2.expired'),
|
|
|
- ?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
|
|
|
+ ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
|
|
|
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
|
|
Age ->
|
|
|
ensure_await_rel_timer(Timeout - max(0, Age), State)
|
|
|
@@ -981,7 +983,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username
|
|
|
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
|
|
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
|
|
none ->
|
|
|
- ?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]),
|
|
|
+ ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
|
|
|
State
|
|
|
end;
|
|
|
|
|
|
@@ -991,10 +993,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username
|
|
|
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
|
|
|
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
|
|
|
{value, {pubrel, PacketId, _Ts}} ->
|
|
|
- ?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]),
|
|
|
+ ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
|
|
|
State;
|
|
|
none ->
|
|
|
- ?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]),
|
|
|
+ ?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]),
|
|
|
State
|
|
|
end;
|
|
|
|