|
@@ -191,18 +191,27 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
|
|
|
end,
|
|
end,
|
|
|
{ok, NewState};
|
|
{ok, NewState};
|
|
|
|
|
|
|
|
|
|
+%% protect from empty topic list
|
|
|
|
|
+handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
|
|
+ send(?SUBACK_PACKET(PacketId, []), State);
|
|
|
|
|
+
|
|
|
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) ->
|
|
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) ->
|
|
|
AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
|
|
AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
|
|
|
case lists:member(deny, AllowDenies) of
|
|
case lists:member(deny, AllowDenies) of
|
|
|
true ->
|
|
true ->
|
|
|
- %%TODO: return 128 QoS when deny...
|
|
|
|
|
|
|
+ %%TODO: return 128 QoS when deny... no need to SUBACK?
|
|
|
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
|
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
|
|
{ok, State};
|
|
{ok, State};
|
|
|
false ->
|
|
false ->
|
|
|
|
|
+ %%TODO: GrantedQos should be renamed.
|
|
|
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
|
|
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
|
|
|
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
|
|
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
|
|
+%% protect from empty topic list
|
|
|
|
|
+handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
|
|
+ send(?UNSUBACK_PACKET(PacketId), State);
|
|
|
|
|
+
|
|
|
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
|
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
|
|
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics),
|
|
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics),
|
|
|
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
|
|
send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession});
|