|
|
@@ -281,14 +281,14 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
|
|
handle_out(disconnect, ReasonCode, Channel)
|
|
|
end;
|
|
|
|
|
|
-handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel
|
|
|
+handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
|
|
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
|
|
case emqx_session:puback(PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
|
- ok = after_message_acked(ClientInfo, Msg),
|
|
|
+ ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
{ok, Channel#channel{session = NSession}};
|
|
|
{ok, Msg, Publishes, NSession} ->
|
|
|
- ok = after_message_acked(ClientInfo, Msg),
|
|
|
+ ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
|
|
@@ -300,11 +300,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel
|
|
|
{ok, Channel}
|
|
|
end;
|
|
|
|
|
|
-handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel
|
|
|
+handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
|
|
|
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
|
|
case emqx_session:pubrec(PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
|
- ok = after_message_acked(ClientInfo, Msg),
|
|
|
+ ok = after_message_acked(ClientInfo, Msg, Properties),
|
|
|
NChannel = Channel#channel{session = NSession},
|
|
|
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
|
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
@@ -347,12 +347,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
|
ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
|
|
|
- TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
|
|
+ TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1),
|
|
|
TopicFilters3 = run_hooks('client.subscribe',
|
|
|
[ClientInfo, Properties],
|
|
|
TopicFilters2
|
|
|
),
|
|
|
- {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
|
|
|
+ {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel),
|
|
|
case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
|
|
|
lists:any(fun(ReasonCode) ->
|
|
|
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
|
|
@@ -373,7 +373,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
[ClientInfo, Properties],
|
|
|
parse_topic_filters(TopicFilters)
|
|
|
),
|
|
|
- {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
+ {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
|
|
|
handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
|
|
|
{error, ReasonCode} ->
|
|
|
handle_out(disconnect, ReasonCode, Channel)
|
|
|
@@ -382,8 +382,8 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
handle_in(?PACKET(?PINGREQ), Channel) ->
|
|
|
{ok, ?PACKET(?PINGRESP), Channel};
|
|
|
|
|
|
-handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel) ->
|
|
|
- NChannel = maybe_clean_will_msg(ReasonCode, Channel),
|
|
|
+handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+ NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = ConnInfo#{disconn_props => Properties}}),
|
|
|
process_disconnect(ReasonCode, Properties, NChannel);
|
|
|
|
|
|
handle_in(?AUTH_PACKET(), Channel) ->
|
|
|
@@ -437,7 +437,7 @@ process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanSt
|
|
|
%% Process Publish
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
+process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
|
|
case pipeline([fun process_alias/2,
|
|
|
fun check_pub_alias/2,
|
|
|
@@ -466,12 +466,23 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
handle_out(disconnect, ReasonCode, NChannel)
|
|
|
end.
|
|
|
|
|
|
-packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
|
|
- clientinfo = ClientInfo =
|
|
|
- #{mountpoint := MountPoint}}) ->
|
|
|
- emqx_mountpoint:mount(
|
|
|
- MountPoint, emqx_packet:to_message(
|
|
|
- ClientInfo, #{proto_ver => ProtoVer}, Packet)).
|
|
|
+packet_to_message(Packet, #channel{
|
|
|
+ conninfo = #{proto_ver := ProtoVer},
|
|
|
+ clientinfo = #{
|
|
|
+ protocol := Protocol,
|
|
|
+ clientid := ClientId,
|
|
|
+ username := Username,
|
|
|
+ peerhost := PeerHost,
|
|
|
+ mountpoint := MountPoint
|
|
|
+ }
|
|
|
+ }) ->
|
|
|
+ emqx_mountpoint:mount(MountPoint,
|
|
|
+ emqx_packet:to_message(
|
|
|
+ Packet, ClientId,
|
|
|
+ #{proto_ver => ProtoVer,
|
|
|
+ protocol => Protocol,
|
|
|
+ username => Username,
|
|
|
+ peerhost => PeerHost})).
|
|
|
|
|
|
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
|
|
|
_ = emqx_broker:publish(Msg),
|
|
|
@@ -504,25 +515,26 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|
|
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
|
|
puback_reason_code([_|_]) -> ?RC_SUCCESS.
|
|
|
|
|
|
--compile({inline, [after_message_acked/2]}).
|
|
|
-after_message_acked(ClientInfo, Msg) ->
|
|
|
+-compile({inline, [after_message_acked/3]}).
|
|
|
+after_message_acked(ClientInfo, Msg, PubAckProps) ->
|
|
|
ok = emqx_metrics:inc('messages.acked'),
|
|
|
- emqx_hooks:run('message.acked', [ClientInfo, Msg]).
|
|
|
+ emqx_hooks:run('message.acked', [ClientInfo,
|
|
|
+ emqx_message:set_header(puback_props, PubAckProps, Msg)]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Subscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
--compile({inline, [process_subscribe/2]}).
|
|
|
-process_subscribe(TopicFilters, Channel) ->
|
|
|
- process_subscribe(TopicFilters, [], Channel).
|
|
|
+-compile({inline, [process_subscribe/3]}).
|
|
|
+process_subscribe(TopicFilters, SubProps, Channel) ->
|
|
|
+ process_subscribe(TopicFilters, SubProps, Channel, []).
|
|
|
|
|
|
-process_subscribe([], Acc, Channel) ->
|
|
|
+process_subscribe([], _SubProps, Channel, Acc) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
-process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
- {RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
|
|
|
- process_subscribe(More, [RC|Acc], NChannel).
|
|
|
+process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) ->
|
|
|
+ {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel),
|
|
|
+ process_subscribe(More, SubProps, NChannel, [RC|Acc]).
|
|
|
|
|
|
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
@@ -557,22 +569,22 @@ process_force_subscribe(Subscriptions, Channel =
|
|
|
%% Process Unsubscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
--compile({inline, [process_unsubscribe/2]}).
|
|
|
-process_unsubscribe(TopicFilters, Channel) ->
|
|
|
- process_unsubscribe(TopicFilters, [], Channel).
|
|
|
+-compile({inline, [process_unsubscribe/3]}).
|
|
|
+process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
|
|
|
+ process_unsubscribe(TopicFilters, UnSubProps, Channel, []).
|
|
|
|
|
|
-process_unsubscribe([], Acc, Channel) ->
|
|
|
+process_unsubscribe([], _UnSubProps, Channel, Acc) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
-process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
- {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts, Channel),
|
|
|
- process_unsubscribe(More, [RC|Acc], NChannel).
|
|
|
+process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) ->
|
|
|
+ {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel),
|
|
|
+ process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]).
|
|
|
|
|
|
-do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
+do_unsubscribe(TopicFilter, SubOpts, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of
|
|
|
+ case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
|
|
{ok, NSession} ->
|
|
|
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
|
|
{error, RC} -> {RC, Channel}
|
|
|
@@ -582,9 +594,9 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
process_force_unsubscribe(Subscriptions, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
|
- lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
|
|
+ lists:foldl(fun({TopicFilter, SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
|
|
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
|
|
|
+ case emqx_session:unsubscribe(ClientInfo, NTopicFilter, SubOpts, Session) of
|
|
|
{ok, NSession} ->
|
|
|
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
|
|
|
{error, ReasonCode} ->
|
|
|
@@ -844,7 +856,7 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
|
|
|
[ClientInfo, #{'Internal' => true}],
|
|
|
parse_topic_filters(TopicFilters)
|
|
|
),
|
|
|
- {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
|
|
+ {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, #{}, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
handle_info({force_subscribe, TopicFilters}, Channel) ->
|
|
|
@@ -856,7 +868,7 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
|
|
|
[ClientInfo, #{'Internal' => true}],
|
|
|
parse_topic_filters(TopicFilters)
|
|
|
),
|
|
|
- {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
+ {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, #{}, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
|
|
|
@@ -1329,9 +1341,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Enrich SubId
|
|
|
|
|
|
-enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
|
|
+put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
|
|
[{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters];
|
|
|
-enrich_subid(_Properties, TopicFilters) -> TopicFilters.
|
|
|
+put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Enrich SubOpts
|
|
|
@@ -1490,7 +1502,7 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
|
|
end.
|
|
|
|
|
|
will_delay_interval(WillMsg) ->
|
|
|
- emqx_message:get_header('Will-Delay-Interval', WillMsg, 0).
|
|
|
+ maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
|
|
|
|
|
|
publish_will_msg(Msg) -> emqx_broker:publish(Msg).
|
|
|
|