|
|
@@ -1,5 +1,5 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
%%
|
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
%% you may not use this file except in compliance with the License.
|
|
|
@@ -356,7 +356,6 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
|
|
NChannel = Channel#channel{session = Session},
|
|
|
handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
|
|
|
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
|
|
- %%TODO: improve later.
|
|
|
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
|
|
NChannel = Channel#channel{session = Session,
|
|
|
resuming = true,
|
|
|
@@ -558,19 +557,20 @@ handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = Co
|
|
|
fun enrich_server_keepalive/2,
|
|
|
fun enrich_assigned_clientid/2
|
|
|
], #{}, Channel),
|
|
|
- AckPacket = run_hooks('client.connack', [ConnInfo],
|
|
|
- ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)),
|
|
|
- return_connack(AckPacket,
|
|
|
- ensure_keepalive(AckProps,
|
|
|
+ NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
|
|
|
+
|
|
|
+ return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
|
|
|
+ ensure_keepalive(NAckProps,
|
|
|
ensure_connected(ConnPkt, Channel)));
|
|
|
|
|
|
handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+ Reason = emqx_reason_codes:name(ReasonCode),
|
|
|
+ AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
|
|
|
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
|
|
|
?MQTT_PROTO_V5 -> ReasonCode;
|
|
|
_ -> emqx_reason_codes:compat(connack, ReasonCode)
|
|
|
- end),
|
|
|
- AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket),
|
|
|
- shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel);
|
|
|
+ end, sp(false), AckProps),
|
|
|
+ shutdown(Reason, AckPacket, Channel);
|
|
|
|
|
|
%% Optimize?
|
|
|
handle_out(publish, [], Channel) ->
|
|
|
@@ -699,7 +699,8 @@ return_unsuback(Packet, Channel) ->
|
|
|
-> {reply, Reply :: term(), channel()}
|
|
|
| {shutdown, Reason :: term(), Reply :: term(), channel()}).
|
|
|
handle_call(kick, Channel) ->
|
|
|
- shutdown(kicked, ok, Channel);
|
|
|
+ Channel1 = ensure_disconnected(kicked, Channel),
|
|
|
+ shutdown(kicked, ok, Channel1);
|
|
|
|
|
|
handle_call(discard, Channel = #channel{conn_state = connected}) ->
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
|
|
@@ -715,7 +716,7 @@ handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
|
|
|
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
|
|
|
pendings = Pendings}) ->
|
|
|
ok = emqx_session:takeover(Session),
|
|
|
- %% TODO: Should not drain deliver here
|
|
|
+ %% TODO: Should not drain deliver here (side effect)
|
|
|
Delivers = emqx_misc:drain_deliver(),
|
|
|
AllPendings = lists:append(Delivers, Pendings),
|
|
|
shutdown(takeovered, AllPendings, Channel);
|
|
|
@@ -943,9 +944,10 @@ receive_maximum(#{zone := Zone}, ConnProps) ->
|
|
|
%% Run Connect Hooks
|
|
|
|
|
|
run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
- case run_hooks('client.connect', [ConnInfo], ConnPkt) of
|
|
|
+ ConnProps = emqx_packet:info(properties, ConnPkt),
|
|
|
+ case run_hooks('client.connect', [ConnInfo], ConnProps) of
|
|
|
Error = {error, _Reason} -> Error;
|
|
|
- NConnPkt -> {ok, NConnPkt, Channel}
|
|
|
+ NConnProps -> {ok, emqx_packet:set_props(NConnProps, ConnPkt), Channel}
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|