Просмотр исходного кода

Merge pull request #12092 from keynslug/fix/EMQX-11520/ds-empty-levels

fix(ds): pass topics to emqx_topic:words/1 before feeding LTS tree
Andrew Mayorov 2 лет назад
Родитель
Сommit
6a1db5db4a

+ 3 - 5
apps/emqx/src/emqx_topic.erl

@@ -91,13 +91,11 @@ match([H | T1], [H | T2]) ->
     match(T1, T2);
 match([_H | T1], ['+' | T2]) ->
     match(T1, T2);
+match([<<>> | T1], ['' | T2]) ->
+    match(T1, T2);
 match(_, ['#']) ->
     true;
-match([_H1 | _], [_H2 | _]) ->
-    false;
-match([_H1 | _], []) ->
-    false;
-match([], [_H | _T2]) ->
+match(_, _) ->
     false.
 
 -spec match_share(Name, Filter) -> boolean() when

+ 36 - 5
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -282,6 +282,34 @@ t_publish_as_persistent(_Config) ->
         emqtt:stop(Pub)
     end.
 
+t_publish_empty_topic_levels(_Config) ->
+    Sub = connect(<<?MODULE_STRING "1">>, true, 30),
+    Pub = connect(<<?MODULE_STRING "2">>, true, 30),
+    try
+        {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t//+//#">>, qos1),
+        Messages = [
+            {<<"t//1">>, <<"1">>},
+            {<<"t//1/">>, <<"2">>},
+            {<<"t//2//">>, <<"3">>},
+            {<<"t//2//foo">>, <<"4">>},
+            {<<"t//2/foo">>, <<"5">>},
+            {<<"t/3/bar">>, <<"6">>}
+        ],
+        [emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
+        Received = receive_messages(length(Messages), 1_500),
+        ?assertMatch(
+            [
+                #{topic := <<"t//1/">>, payload := <<"2">>},
+                #{topic := <<"t//2//">>, payload := <<"3">>},
+                #{topic := <<"t//2//foo">>, payload := <<"4">>}
+            ],
+            lists:sort(emqx_utils_maps:key_comparer(payload), Received)
+        )
+    after
+        emqtt:stop(Sub),
+        emqtt:stop(Pub)
+    end.
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -322,15 +350,18 @@ consume(It) ->
     end.
 
 receive_messages(Count) ->
-    lists:reverse(receive_messages(Count, [])).
+    receive_messages(Count, 5_000).
+
+receive_messages(Count, Timeout) ->
+    lists:reverse(receive_messages(Count, [], Timeout)).
 
-receive_messages(0, Msgs) ->
+receive_messages(0, Msgs, _Timeout) ->
     Msgs;
-receive_messages(Count, Msgs) ->
+receive_messages(Count, Msgs, Timeout) ->
     receive
         {publish, Msg} ->
-            receive_messages(Count - 1, [Msg | Msgs])
-    after 5_000 ->
+            receive_messages(Count - 1, [Msg | Msgs], Timeout)
+    after Timeout ->
         Msgs
     end.
 

+ 6 - 0
apps/emqx/test/emqx_topic_SUITE.erl

@@ -115,6 +115,12 @@ t_sys_match(_) ->
     true = match(<<"a/b/$c">>, <<"a/b/#">>),
     true = match(<<"a/b/$c">>, <<"a/#">>).
 
+t_match_tokens(_) ->
+    true = match(emqx_topic:tokens(<<"a/b/c">>), words(<<"a/+/c">>)),
+    true = match(emqx_topic:tokens(<<"a//c">>), words(<<"a/+/c">>)),
+    false = match(emqx_topic:tokens(<<"a//c/">>), words(<<"a/+/c">>)),
+    true = match(emqx_topic:tokens(<<"a//c/">>), words(<<"a/+/c/#">>)).
+
 t_match_perf(_) ->
     true = match(<<"a/b/ccc">>, <<"a/#">>),
     Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,

+ 2 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -368,7 +368,7 @@ check_message(
     #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
     #message{timestamp = Timestamp, topic = Topic}
 ) when Timestamp >= StartTime ->
-    emqx_topic:match(emqx_topic:words(Topic), TopicFilter);
+    emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
 check_message(_Cutoff, _It, _Msg) ->
     false.
 
@@ -378,7 +378,7 @@ format_key(KeyMapper, Key) ->
 
 -spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}.
 make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
-    Tokens = emqx_topic:tokens(TopicBin),
+    Tokens = emqx_topic:words(TopicBin),
     {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     KeyMapper = array:get(length(Varying), KeyMappers),

+ 15 - 1
apps/emqx_utils/src/emqx_utils_maps.erl

@@ -35,7 +35,8 @@
     if_only_to_toggle_enable/2,
     update_if_present/3,
     put_if/4,
-    rename/3
+    rename/3,
+    key_comparer/1
 ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -318,3 +319,16 @@ rename(OldKey, NewKey, Map) ->
         error ->
             Map
     end.
+
+-spec key_comparer(K) -> fun((M, M) -> boolean()) when M :: #{K => _V}.
+key_comparer(K) ->
+    fun
+        (#{K := V1}, #{K := V2}) ->
+            V1 < V2;
+        (#{K := _}, _) ->
+            false;
+        (_, #{K := _}) ->
+            true;
+        (M1, M2) ->
+            M1 < M2
+    end.

+ 19 - 0
apps/emqx_utils/test/emqx_utils_maps_tests.erl

@@ -110,3 +110,22 @@ best_effort_recursive_sum_test_() ->
             )
         )
     ].
+
+key_comparer_test() ->
+    Comp = emqx_utils_maps:key_comparer(foo),
+    ?assertEqual(
+        [
+            #{},
+            #{baz => 42},
+            #{foo => 1},
+            #{foo => 42},
+            #{foo => bar, baz => 42}
+        ],
+        lists:sort(Comp, [
+            #{foo => 42},
+            #{baz => 42},
+            #{foo => bar, baz => 42},
+            #{foo => 1},
+            #{}
+        ])
+    ).