|
|
@@ -391,22 +391,30 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|
|
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
|
- ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
|
|
|
- TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1),
|
|
|
- TopicFilters3 = run_hooks('client.subscribe',
|
|
|
- [ClientInfo, Properties],
|
|
|
- TopicFilters2
|
|
|
- ),
|
|
|
- {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel),
|
|
|
- case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
|
|
|
- lists:any(fun(ReasonCode) ->
|
|
|
- ReasonCode =:= ?RC_NOT_AUTHORIZED
|
|
|
- end, ReasonCodes) of
|
|
|
- true ->
|
|
|
- handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel);
|
|
|
- false ->
|
|
|
- handle_out(suback, {PacketId, ReasonCodes}, NChannel)
|
|
|
- end;
|
|
|
+ ok ->
|
|
|
+ TopicFilters0 = parse_topic_filters(TopicFilters),
|
|
|
+ TupleTopicFilters0 = check_sub_acls(TopicFilters0, Channel),
|
|
|
+ case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
|
|
|
+ lists:any(fun({_TopicFilter, ReasonCode}) ->
|
|
|
+ ReasonCode =:= ?RC_NOT_AUTHORIZED
|
|
|
+ end, TupleTopicFilters0) of
|
|
|
+ true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
|
|
|
+ false ->
|
|
|
+ Replace = fun
|
|
|
+ _Fun(TupleList, [ Tuple = {Key, _Value} | More]) ->
|
|
|
+ _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More);
|
|
|
+ _Fun(TupleList, []) -> TupleList
|
|
|
+ end,
|
|
|
+ TopicFilters1 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
|
|
|
+ TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1),
|
|
|
+ TopicFilters3 = run_hooks('client.subscribe',
|
|
|
+ [ClientInfo, Properties],
|
|
|
+ TopicFilters2),
|
|
|
+ {TupleTopicFilters1, NChannel} = process_subscribe(TopicFilters3, Properties, Channel),
|
|
|
+ TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1),
|
|
|
+ ReasonCodes2 = [ ReasonCode || {_TopicFilter, ReasonCode} <- TupleTopicFilters2],
|
|
|
+ handle_out(suback, {PacketId, ReasonCodes2}, NChannel)
|
|
|
+ end;
|
|
|
{error, ReasonCode} ->
|
|
|
handle_out(disconnect, ReasonCode, Channel)
|
|
|
end;
|
|
|
@@ -613,15 +621,15 @@ process_subscribe(TopicFilters, SubProps, Channel) ->
|
|
|
process_subscribe([], _SubProps, Channel, Acc) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
-process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) ->
|
|
|
- case check_subscribe(TopicFilter, SubOpts, Channel) of
|
|
|
+process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc) ->
|
|
|
+ case check_sub_caps(TopicFilter, SubOpts, Channel) of
|
|
|
ok ->
|
|
|
- {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel),
|
|
|
- process_subscribe(More, SubProps, NChannel, [RC|Acc]);
|
|
|
- {error, RC} ->
|
|
|
+ {ReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel),
|
|
|
+ process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]);
|
|
|
+ {error, ReasonCode} ->
|
|
|
?LOG(warning, "Cannot subscribe ~s due to ~s.",
|
|
|
- [TopicFilter, emqx_reason_codes:text(RC)]),
|
|
|
- process_subscribe(More, SubProps, Channel, [RC|Acc])
|
|
|
+ [TopicFilter, emqx_reason_codes:text(ReasonCode)]),
|
|
|
+ process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
|
|
|
end.
|
|
|
|
|
|
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
@@ -1390,16 +1398,20 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
|
|
|
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% Check Subscribe
|
|
|
+%% Check Sub ACL
|
|
|
|
|
|
-check_subscribe(TopicFilter, SubOpts, Channel) ->
|
|
|
- case check_sub_acl(TopicFilter, Channel) of
|
|
|
- allow -> check_sub_caps(TopicFilter, SubOpts, Channel);
|
|
|
- deny -> {error, ?RC_NOT_AUTHORIZED}
|
|
|
- end.
|
|
|
+check_sub_acls(TopicFilters, Channel) ->
|
|
|
+ check_sub_acls(TopicFilters, Channel, []).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Check Sub ACL
|
|
|
+check_sub_acls([ TopicFilter = {Topic, _} | More] , Channel, Acc) ->
|
|
|
+ case check_sub_acl(Topic, Channel) of
|
|
|
+ allow ->
|
|
|
+ check_sub_acls(More, Channel, [ {TopicFilter, 0} | Acc]);
|
|
|
+ deny ->
|
|
|
+ check_sub_acls(More, Channel, [ {TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
|
|
|
+ end;
|
|
|
+check_sub_acls([], _Channel, Acc) ->
|
|
|
+ lists:reverse(Acc).
|
|
|
|
|
|
check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
|
|
|
case is_acl_enabled(ClientInfo) andalso
|