|
|
@@ -62,9 +62,9 @@
|
|
|
%% MQTT ClientInfo
|
|
|
clientinfo :: emqx_types:clientinfo(),
|
|
|
%% MQTT Session
|
|
|
- session :: emqx_session:session(),
|
|
|
+ session :: maybe(emqx_session:session()),
|
|
|
%% Keepalive
|
|
|
- keepalive :: emqx_keepalive:keepalive(),
|
|
|
+ keepalive :: maybe(emqx_keepalive:keepalive()),
|
|
|
%% MQTT Will Msg
|
|
|
will_msg :: maybe(emqx_types:message()),
|
|
|
%% MQTT Topic Aliases
|
|
|
@@ -108,6 +108,8 @@
|
|
|
|
|
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
|
|
|
|
|
+-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Info, Attrs and Caps
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -281,14 +283,14 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
|
|
handle_out(disconnect, ReasonCode, Channel)
|
|
|
end;
|
|
|
|
|
|
-handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel
|
|
|
+handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
|
|
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
|
|
case emqx_session:puback(PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
|
- ok = after_message_acked(ClientInfo, Msg),
|
|
|
+ ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
{ok, Channel#channel{session = NSession}};
|
|
|
{ok, Msg, Publishes, NSession} ->
|
|
|
- ok = after_message_acked(ClientInfo, Msg),
|
|
|
+ ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
|
|
@@ -300,11 +302,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel
|
|
|
{ok, Channel}
|
|
|
end;
|
|
|
|
|
|
-handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel
|
|
|
+handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
|
|
|
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
|
|
case emqx_session:pubrec(PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
|
- ok = after_message_acked(ClientInfo, Msg),
|
|
|
+ ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
|
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
@@ -347,12 +349,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
|
ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
|
|
|
- TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
|
|
+ TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1),
|
|
|
TopicFilters3 = run_hooks('client.subscribe',
|
|
|
[ClientInfo, Properties],
|
|
|
TopicFilters2
|
|
|
),
|
|
|
- {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
|
|
|
+ {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel),
|
|
|
case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
|
|
|
lists:any(fun(ReasonCode) ->
|
|
|
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
|
|
@@ -373,7 +375,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
[ClientInfo, Properties],
|
|
|
parse_topic_filters(TopicFilters)
|
|
|
),
|
|
|
- {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
+ {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
|
|
|
handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
|
|
|
{error, ReasonCode} ->
|
|
|
handle_out(disconnect, ReasonCode, Channel)
|
|
|
@@ -382,8 +384,8 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
handle_in(?PACKET(?PINGREQ), Channel) ->
|
|
|
{ok, ?PACKET(?PINGRESP), Channel};
|
|
|
|
|
|
-handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel) ->
|
|
|
- NChannel = maybe_clean_will_msg(ReasonCode, Channel),
|
|
|
+handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+ NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = ConnInfo#{disconn_props => Properties}}),
|
|
|
process_disconnect(ReasonCode, Properties, NChannel);
|
|
|
|
|
|
handle_in(?AUTH_PACKET(), Channel) ->
|
|
|
@@ -437,7 +439,7 @@ process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanSt
|
|
|
%% Process Publish
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
+process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
|
|
case pipeline([fun process_alias/2,
|
|
|
fun check_pub_alias/2,
|
|
|
@@ -466,12 +468,23 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
handle_out(disconnect, ReasonCode, NChannel)
|
|
|
end.
|
|
|
|
|
|
-packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
|
|
- clientinfo = ClientInfo =
|
|
|
- #{mountpoint := MountPoint}}) ->
|
|
|
- emqx_mountpoint:mount(
|
|
|
- MountPoint, emqx_packet:to_message(
|
|
|
- ClientInfo, #{proto_ver => ProtoVer}, Packet)).
|
|
|
+packet_to_message(Packet, #channel{
|
|
|
+ conninfo = #{proto_ver := ProtoVer},
|
|
|
+ clientinfo = #{
|
|
|
+ protocol := Protocol,
|
|
|
+ clientid := ClientId,
|
|
|
+ username := Username,
|
|
|
+ peerhost := PeerHost,
|
|
|
+ mountpoint := MountPoint
|
|
|
+ }
|
|
|
+ }) ->
|
|
|
+ emqx_mountpoint:mount(MountPoint,
|
|
|
+ emqx_packet:to_message(
|
|
|
+ Packet, ClientId,
|
|
|
+ #{proto_ver => ProtoVer,
|
|
|
+ protocol => Protocol,
|
|
|
+ username => Username,
|
|
|
+ peerhost => PeerHost})).
|
|
|
|
|
|
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
|
|
|
_ = emqx_broker:publish(Msg),
|
|
|
@@ -504,25 +517,26 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|
|
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
|
|
puback_reason_code([_|_]) -> ?RC_SUCCESS.
|
|
|
|
|
|
--compile({inline, [after_message_acked/2]}).
|
|
|
-after_message_acked(ClientInfo, Msg) ->
|
|
|
+-compile({inline, [after_message_acked/3]}).
|
|
|
+after_message_acked(ClientInfo, Msg, PubAckProps) ->
|
|
|
ok = emqx_metrics:inc('messages.acked'),
|
|
|
- emqx_hooks:run('message.acked', [ClientInfo, Msg]).
|
|
|
+ emqx_hooks:run('message.acked', [ClientInfo,
|
|
|
+ emqx_message:set_header(puback_props, PubAckProps, Msg)]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Subscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
--compile({inline, [process_subscribe/2]}).
|
|
|
-process_subscribe(TopicFilters, Channel) ->
|
|
|
- process_subscribe(TopicFilters, [], Channel).
|
|
|
+-compile({inline, [process_subscribe/3]}).
|
|
|
+process_subscribe(TopicFilters, SubProps, Channel) ->
|
|
|
+ process_subscribe(TopicFilters, SubProps, Channel, []).
|
|
|
|
|
|
-process_subscribe([], Acc, Channel) ->
|
|
|
+process_subscribe([], _SubProps, Channel, Acc) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
-process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
- {RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
|
|
|
- process_subscribe(More, [RC|Acc], NChannel).
|
|
|
+process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) ->
|
|
|
+ {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel),
|
|
|
+ process_subscribe(More, SubProps, NChannel, [RC|Acc]).
|
|
|
|
|
|
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
@@ -557,22 +571,22 @@ process_force_subscribe(Subscriptions, Channel =
|
|
|
%% Process Unsubscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
--compile({inline, [process_unsubscribe/2]}).
|
|
|
-process_unsubscribe(TopicFilters, Channel) ->
|
|
|
- process_unsubscribe(TopicFilters, [], Channel).
|
|
|
+-compile({inline, [process_unsubscribe/3]}).
|
|
|
+process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
|
|
|
+ process_unsubscribe(TopicFilters, UnSubProps, Channel, []).
|
|
|
|
|
|
-process_unsubscribe([], Acc, Channel) ->
|
|
|
+process_unsubscribe([], _UnSubProps, Channel, Acc) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
-process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
- {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts, Channel),
|
|
|
- process_unsubscribe(More, [RC|Acc], NChannel).
|
|
|
+process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) ->
|
|
|
+ {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel),
|
|
|
+ process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]).
|
|
|
|
|
|
-do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
+do_unsubscribe(TopicFilter, SubOpts, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of
|
|
|
+ case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
|
|
{ok, NSession} ->
|
|
|
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
|
|
{error, RC} -> {RC, Channel}
|
|
|
@@ -582,9 +596,9 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
process_force_unsubscribe(Subscriptions, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
|
- lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
|
|
+ lists:foldl(fun({TopicFilter, SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
|
|
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
|
|
|
+ case emqx_session:unsubscribe(ClientInfo, NTopicFilter, SubOpts, Session) of
|
|
|
{ok, NSession} ->
|
|
|
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
|
|
|
{error, ReasonCode} ->
|
|
|
@@ -662,6 +676,7 @@ not_nacked({deliver, _Topic, Msg}) ->
|
|
|
handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
AckProps = run_fold([fun enrich_connack_caps/2,
|
|
|
fun enrich_server_keepalive/2,
|
|
|
+ fun enrich_response_information/2,
|
|
|
fun enrich_assigned_clientid/2
|
|
|
], Props, Channel),
|
|
|
NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
|
|
|
@@ -806,7 +821,8 @@ return_unsuback(Packet, Channel) ->
|
|
|
|
|
|
-spec(handle_call(Req :: term(), channel())
|
|
|
-> {reply, Reply :: term(), channel()}
|
|
|
- | {shutdown, Reason :: term(), Reply :: term(), channel()}).
|
|
|
+ | {shutdown, Reason :: term(), Reply :: term(), channel()}
|
|
|
+ | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
|
|
|
handle_call(kick, Channel) ->
|
|
|
Channel1 = ensure_disconnected(kicked, Channel),
|
|
|
disconnect_and_shutdown(kicked, ok, Channel1);
|
|
|
@@ -844,7 +860,7 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
|
|
|
[ClientInfo, #{'Internal' => true}],
|
|
|
parse_topic_filters(TopicFilters)
|
|
|
),
|
|
|
- {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
|
|
+ {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, #{}, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
handle_info({force_subscribe, TopicFilters}, Channel) ->
|
|
|
@@ -856,7 +872,7 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
|
|
|
[ClientInfo, #{'Internal' => true}],
|
|
|
parse_topic_filters(TopicFilters)
|
|
|
),
|
|
|
- {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
+ {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, #{}, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
|
|
|
@@ -924,9 +940,6 @@ handle_timeout(_TRef, retry_delivery,
|
|
|
case emqx_session:retry(Session) of
|
|
|
{ok, NSession} ->
|
|
|
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
|
|
- {ok, Publishes, NSession} ->
|
|
|
- NChannel = Channel#channel{session = NSession},
|
|
|
- handle_out(publish, Publishes, reset_timer(retry_timer, NChannel));
|
|
|
{ok, Publishes, Timeout, NSession} ->
|
|
|
NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
|
|
@@ -1001,6 +1014,7 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
|
|
|
%% Terminate
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+-spec(terminate(any(), channel()) -> ok).
|
|
|
terminate(_, #channel{conn_state = idle}) -> ok;
|
|
|
terminate(normal, Channel) ->
|
|
|
run_terminate_hook(normal, Channel);
|
|
|
@@ -1176,7 +1190,7 @@ enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{
|
|
|
end;
|
|
|
|
|
|
enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
- AuthMethod = maps:get('Authentication-Method', maps:get(conn_props, ConnInfo), undefined),
|
|
|
+ AuthMethod = emqx_mqtt_props:get('Authentication-Method', emqx_mqtt_props:get(conn_props, ConnInfo, #{}), undefined),
|
|
|
NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
|
|
|
AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
|
|
|
case NAuthMethod =:= undefined orelse NAuthMethod =/= AuthMethod of
|
|
|
@@ -1329,9 +1343,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Enrich SubId
|
|
|
|
|
|
-enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
|
|
+put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
|
|
[{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
|
|
|
-enrich_subid(_Properties, TopicFilters) -> TopicFilters.
|
|
|
+put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Enrich SubOpts
|
|
|
@@ -1380,6 +1394,16 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
|
|
|
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
|
|
end.
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Enrich response information
|
|
|
+
|
|
|
+enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps},
|
|
|
+ clientinfo = #{zone := Zone}}) ->
|
|
|
+ case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of
|
|
|
+ 0 -> AckProps;
|
|
|
+ 1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)}
|
|
|
+ end.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Enrich Assigned ClientId
|
|
|
|
|
|
@@ -1490,7 +1514,7 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
|
|
end.
|
|
|
|
|
|
will_delay_interval(WillMsg) ->
|
|
|
- emqx_message:get_header('Will-Delay-Interval', WillMsg, 0).
|
|
|
+ maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
|
|
|
|
|
|
publish_will_msg(Msg) -> emqx_broker:publish(Msg).
|
|
|
|