|
|
@@ -457,7 +457,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|
|
case QoS of
|
|
|
?QOS_0 -> {ok, NChannel};
|
|
|
_ ->
|
|
|
- handle_out(puback, {PacketId, ReasonCode}, NChannel)
|
|
|
+ handle_out(puback, {PacketId, ReasonCode}, NChannel)
|
|
|
end;
|
|
|
disconnect ->
|
|
|
handle_out(disconnect, ReasonCode, NChannel)
|
|
|
@@ -535,37 +535,25 @@ process_subscribe([], _SubProps, Channel, Acc) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) ->
|
|
|
- {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel),
|
|
|
- process_subscribe(More, SubProps, NChannel, [RC|Acc]).
|
|
|
-
|
|
|
-do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
- #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
- session = Session}) ->
|
|
|
case check_subscribe(TopicFilter, SubOpts, Channel) of
|
|
|
- ok -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
|
|
- case emqx_session:subscribe(ClientInfo, TopicFilter1, SubOpts1, Session) of
|
|
|
- {ok, NSession} ->
|
|
|
- {QoS, Channel#channel{session = NSession}};
|
|
|
- {error, RC} -> {RC, Channel}
|
|
|
- end;
|
|
|
- {error, RC} -> {RC, Channel}
|
|
|
+ ok ->
|
|
|
+ {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel),
|
|
|
+ process_subscribe(More, SubProps, NChannel, [RC|Acc]);
|
|
|
+ {error, RC} ->
|
|
|
+ process_subscribe(More, SubProps, Channel, [RC|Acc])
|
|
|
end.
|
|
|
|
|
|
--compile({inline, [process_force_subscribe/2]}).
|
|
|
-process_force_subscribe(Subscriptions, Channel =
|
|
|
+do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
|
- lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
|
|
|
- NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
|
|
|
- case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
|
|
- {ok, NSession} ->
|
|
|
- {ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
|
|
|
- {error, ReasonCode} ->
|
|
|
- {ReasonCodes ++ [ReasonCode], ChannelAcc}
|
|
|
- end
|
|
|
- end, {[], Channel}, Subscriptions).
|
|
|
+ NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
+ NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
|
|
+ case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ {QoS, Channel#channel{session = NSession}};
|
|
|
+ {error, RC} ->
|
|
|
+ {RC, Channel}
|
|
|
+ end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Unsubscribe
|
|
|
@@ -591,21 +579,6 @@ do_unsubscribe(TopicFilter, SubOpts, Channel =
|
|
|
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
|
|
{error, RC} -> {RC, Channel}
|
|
|
end.
|
|
|
-
|
|
|
--compile({inline, [process_force_unsubscribe/2]}).
|
|
|
-process_force_unsubscribe(Subscriptions, Channel =
|
|
|
- #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
|
|
- session = Session}) ->
|
|
|
- lists:foldl(fun({TopicFilter, SubOpts}, {ReasonCodes, ChannelAcc}) ->
|
|
|
- NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
- case emqx_session:unsubscribe(ClientInfo, NTopicFilter, SubOpts, Session) of
|
|
|
- {ok, NSession} ->
|
|
|
- {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
|
|
|
- {error, ReasonCode} ->
|
|
|
- {ReasonCodes ++ [ReasonCode], ChannelAcc}
|
|
|
- end
|
|
|
- end, {[], Channel}, Subscriptions).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Disconnect
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -855,28 +828,16 @@ handle_call(Req, Channel) ->
|
|
|
|
|
|
-spec(handle_info(Info :: term(), channel())
|
|
|
-> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
|
|
|
-handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
|
|
- TopicFilters1 = run_hooks('client.subscribe',
|
|
|
- [ClientInfo, #{'Internal' => true}],
|
|
|
- parse_topic_filters(TopicFilters)
|
|
|
- ),
|
|
|
- {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, #{}, Channel),
|
|
|
- {ok, NChannel};
|
|
|
-
|
|
|
-handle_info({force_subscribe, TopicFilters}, Channel) ->
|
|
|
- {_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
|
|
|
- {ok, NChannel};
|
|
|
|
|
|
-handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
|
|
- TopicFilters1 = run_hooks('client.unsubscribe',
|
|
|
- [ClientInfo, #{'Internal' => true}],
|
|
|
- parse_topic_filters(TopicFilters)
|
|
|
- ),
|
|
|
- {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, #{}, Channel),
|
|
|
+handle_info({subscribe, TopicFilters}, Channel ) ->
|
|
|
+ {_, NChannel} = lists:foldl(
|
|
|
+ fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
|
|
|
+ do_subscribe(TopicFilter, SubOpts, ChannelAcc)
|
|
|
+ end, {[], Channel}, parse_topic_filters(TopicFilters)),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
-handle_info({force_unsubscribe, TopicFilters}, Channel) ->
|
|
|
- {_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
|
|
|
+handle_info({unsubscribe, TopicFilters}, Channel) ->
|
|
|
+ {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
|
|
@@ -1253,7 +1214,7 @@ packing_alias(Packet = #mqtt_packet{
|
|
|
},
|
|
|
Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) ->
|
|
|
case find_alias(outbound, Topic, TopicAliases) of
|
|
|
- {ok, AliasId} ->
|
|
|
+ {ok, AliasId} ->
|
|
|
NPublish = Publish#mqtt_packet_publish{
|
|
|
topic_name = <<>>,
|
|
|
properties = #{'Topic-Alias' => AliasId}
|