|
@@ -179,6 +179,7 @@ process_received_bytes(<<>>, State) ->
|
|
|
process_received_bytes(Bytes,
|
|
process_received_bytes(Bytes,
|
|
|
State = #state{ parse_state = ParseState,
|
|
State = #state{ parse_state = ParseState,
|
|
|
conn_name = ConnStr }) ->
|
|
conn_name = ConnStr }) ->
|
|
|
|
|
+ ?INFO("~p~n", [Bytes]),
|
|
|
case emqtt_frame:parse(Bytes, ParseState) of
|
|
case emqtt_frame:parse(Bytes, ParseState) of
|
|
|
{more, ParseState1} ->
|
|
{more, ParseState1} ->
|
|
|
{noreply,
|
|
{noreply,
|
|
@@ -434,12 +435,17 @@ validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name =
|
|
|
false -> {error, badtopic}
|
|
false -> {error, badtopic}
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
-validate_frame(?UNSUBSCRIBE, Frame) ->
|
|
|
|
|
- validate_frame(?SUBSCRIBE, Frame);
|
|
|
|
|
|
|
+validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
|
|
|
|
+ ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
|
|
|
|
+ not emqtt_topic:validate({subscribe, Topic})],
|
|
|
|
|
+ case ErrTopics of
|
|
|
|
|
+ [] -> ok;
|
|
|
|
|
+ _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
|
|
|
|
|
+ end;
|
|
|
|
|
|
|
|
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
|
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
|
|
- ErrTopics = [Topic || #mqtt_topic{name=Topic} <- Topics,
|
|
|
|
|
- not emqtt_topic:validate({subscribe, Topic})],
|
|
|
|
|
|
|
+ ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
|
|
|
|
+ not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))],
|
|
|
case ErrTopics of
|
|
case ErrTopics of
|
|
|
[] -> ok;
|
|
[] -> ok;
|
|
|
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
|
|
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
|