Просмотр исходного кода

fix(shared-sub): validate share topic error send DISCONNECT

JimMoen 2 лет назад
Родитель
Сommit
b0e18f5e75
3 измененных файлов с 61 добавлено и 4 удалено
  1. 5 0
      apps/emqx/include/emqx_mqtt.hrl
  2. 32 2
      apps/emqx/src/emqx_topic.erl
  3. 24 2
      apps/emqx/test/emqx_topic_SUITE.erl

+ 5 - 0
apps/emqx/include/emqx_mqtt.hrl

@@ -669,6 +669,11 @@ end).
     end
 ).
 
+-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty).
+-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty).
+-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter').
+-define(SHARE_NAME_INVALID_CHAR, share_subscription_group_name_cannot_include_wildcard).
+
 -define(FRAME_PARSE_ERROR, frame_parse_error).
 -define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
 -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).

+ 32 - 2
apps/emqx/src/emqx_topic.erl

@@ -90,11 +90,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
 
 -spec validate(name | filter, topic()) -> true.
 validate(_, <<>>) ->
+    %% MQTT-5.0 [MQTT-4.7.3-1]
     error(empty_topic);
 validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
+    %% MQTT-5.0 [MQTT-4.7.3-3]
     error(topic_too_long);
-validate(filter, Topic) when is_binary(Topic) ->
-    validate2(words(Topic));
+validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) ->
+    validate_share(SharedFilter);
+validate(filter, Filter) when is_binary(Filter) ->
+    validate2(words(Filter));
 validate(name, Topic) when is_binary(Topic) ->
     Words = words(Topic),
     validate2(Words) andalso
@@ -122,6 +126,32 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
 validate3(<<_/utf8, Rest/binary>>) ->
     validate3(Rest).
 
+validate_share(<<"$share/", Rest/binary>>) when
+    Rest =:= <<>> orelse Rest =:= <<"/">>
+->
+    %% MQTT-5.0 [MQTT-4.8.2-1]
+    error(?SHARE_EMPTY_FILTER);
+validate_share(<<"$share/", Rest/binary>>) ->
+    case binary:split(Rest, <<"/">>) of
+        %% MQTT-5.0 [MQTT-4.8.2-1]
+        [<<>>, _] ->
+            error(?SHARE_EMPTY_GROUP);
+        %% MQTT-5.0 [MQTT-4.7.3-1]
+        [_, <<>>] ->
+            error(?SHARE_EMPTY_FILTER);
+        [ShareName, Filter] ->
+            validate_share(ShareName, Filter)
+    end.
+
+validate_share(_, <<"$share/", _Rest/binary>>) ->
+    error(?SHARE_RECURSIVELY);
+validate_share(ShareName, Filter) ->
+    case binary:match(ShareName, [<<"+">>, <<"#">>]) of
+        %% MQTT-5.0 [MQTT-4.8.2-2]
+        nomatch -> validate2(words(Filter));
+        _ -> error(?SHARE_NAME_INVALID_CHAR)
+    end.
+
 %% @doc Prepend a topic prefix.
 %% Ensured to have only one / between prefix and suffix.
 prepend(undefined, W) ->

+ 24 - 2
apps/emqx/test/emqx_topic_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
 
 -import(
@@ -130,14 +131,35 @@ t_validate(_) ->
     true = validate({filter, <<"x">>}),
     true = validate({name, <<"x//y">>}),
     true = validate({filter, <<"sport/tennis/#">>}),
+    %% MQTT-5.0 [MQTT-4.7.3-1]
     ?assertError(empty_topic, validate({name, <<>>})),
+    ?assertError(empty_topic, validate({filter, <<>>})),
     ?assertError(topic_name_error, validate({name, <<"abc/#">>})),
     ?assertError(topic_too_long, validate({name, long_topic()})),
-    ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})),
     ?assertError(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})),
     ?assertError(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})),
     ?assertError(topic_invalid_char, validate({filter, <<"sport/tennis#">>})),
-    ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})).
+    %% MQTT-5.0 [MQTT-4.7.1-1]
+    ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})),
+    ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})),
+    %% MQTT-5.0 [MQTT-4.8.2-1]
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/">>})),
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share//">>})),
+    ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//t">>})),
+    ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//test">>})),
+    %% MQTT-5.0 [MQTT-4.7.3-1] for shared-sub
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g/">>})),
+    ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g2/">>})),
+    %% MQTT-5.0 [MQTT-4.8.2-2]
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/p+q/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/m+/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/+n/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#y/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#/1">>})),
+    ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/#y/1">>})),
+    %% share recursively
+    ?assertError(?SHARE_RECURSIVELY, validate({filter, <<"$share/g1/$share/t">>})),
+    true = validate({filter, <<"$share/g1/topic/$share">>}).
 
 t_sigle_level_validate(_) ->
     true = validate({filter, <<"+">>}),