Jelajahi Sumber

Merge pull request #12303 from savonarola/0111-fix-indexing

fix(retainer): fix topic search by index
Ilya Averyanov 2 tahun lalu
induk
melakukan
4c66a1135b

+ 16 - 11
apps/emqx_retainer/src/emqx_retainer_index.erl

@@ -88,13 +88,16 @@ select_index(Tokens, Indices) ->
     select_index(Tokens, Indices, 0, undefined).
 
 %% @doc For an index and a wildcard topic
-%% returns a matchspec pattern for the corresponding index key.
+%% returns a tuple of:
+%% * matchspec pattern for the corresponding index key
+%% * boolean flag indicating whether the pattern is exact
 %%
 %% E.g. for `[2, 3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic
 %% returns <code>{[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}</code> pattern.
--spec condition(index(), emqx_types:words()) -> match_pattern_part().
+-spec condition(index(), emqx_types:words()) -> {match_pattern_part(), boolean()}.
 condition(Index, Tokens) ->
-    {Index, condition(Index, Tokens, 1, [], [])}.
+    {Condition, IsExact} = condition(Index, Tokens, 1, [], []),
+    {{Index, Condition}, IsExact}.
 
 %% @doc Returns a matchspec pattern for a wildcard topic.
 %%
@@ -103,15 +106,17 @@ condition(Index, Tokens) ->
 -spec condition(emqx_types:words()) -> match_pattern_part().
 condition(Tokens) ->
     Tokens1 = [
-        case W =:= '+' of
-            true -> '_';
+        case W of
+            '+' -> '_';
             _ -> W
         end
      || W <- Tokens
     ],
     case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of
-        false -> Tokens1;
-        _ -> (Tokens1 -- ['#']) ++ '_'
+        false ->
+            Tokens1;
+        _ ->
+            (Tokens1 -- ['#']) ++ '_'
     end.
 
 %% @doc Restores concrete topic from its index key representation.
@@ -162,13 +167,13 @@ select_index(Tokens, [Index | Indices], MaxScore, SelectedIndex) ->
     end.
 
 condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
-    {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'};
+    {{lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'}, false};
 condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
-    {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'};
+    {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'}, true};
 condition([], Tokens, _N, IndexMatch, OtherMatch) ->
-    {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)};
+    {{lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)}, true};
 condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) ->
-    {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch)};
+    {{lists:reverse(IndexMatch), lists:reverse(OtherMatch)}, true};
 condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when
     NIndex =:= N
 ->

+ 8 - 5
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -328,15 +328,15 @@ search_table(Tokens, Now) ->
 search_table(undefined, Tokens, Now) ->
     Ms = make_message_match_spec(Tokens, Now),
     ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
-search_table(Index, Tokens, Now) ->
-    Ms = make_index_match_spec(Index, Tokens, Now),
+search_table(Index, FilterTokens, Now) ->
+    {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
     Topics = [
         emqx_retainer_index:restore_topic(Key)
      || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
     ],
     RetainedMsgQH = qlc:q([
         ets:lookup(?TAB_MESSAGE, TopicTokens)
-     || TopicTokens <- Topics
+     || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens)
     ]),
     qlc:q([
         RetainedMsg
@@ -348,6 +348,9 @@ search_table(Index, Tokens, Now) ->
         (ExpiryTime == 0) or (ExpiryTime > Now)
     ]).
 
+match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
+match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
+
 clear_batch(Indices, QC) ->
     {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
     lists:foreach(
@@ -421,9 +424,9 @@ make_message_match_spec(Tokens, NowMs) ->
     [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
 
 make_index_match_spec(Index, Tokens, NowMs) ->
-    Cond = emqx_retainer_index:condition(Index, Tokens),
+    {Cond, IsExact} = emqx_retainer_index:condition(Index, Tokens),
     MsHd = #retained_index{key = Cond, expiry_time = '$3'},
-    [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
+    {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}.
 
 is_table_full() ->
     Limit = emqx:get_config([retainer, backend, max_retained_messages]),

+ 148 - 67
apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

@@ -22,8 +22,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
--define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
+-include_lib("emqx/include/asserts.hrl").
 
 -import(emqx_mgmt_api_test_util, [
     request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
@@ -33,25 +32,38 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    application:load(emqx_conf),
-    ok = ekka:start(),
-    ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
-    emqx_retainer_SUITE:load_conf(),
-    emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx_conf,
+            emqx,
+            emqx_retainer,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{
+            work_dir => emqx_cth_suite:work_dir(Config)
+        }
+    ),
+    _ = emqx_common_test_http:create_default_app(),
     %% make sure no "$SYS/#" topics
-    emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
-    Config.
+    _ = emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
+    [{apps, Apps} | Config].
 
 end_per_suite(Config) ->
-    ekka:stop(),
-    mria:stop(),
-    mria_mnesia:delete_schema(),
-    emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]),
-    Config.
+    emqx_common_test_http:delete_default_app(),
+    emqx_cth_suite:stop(?config(apps, Config)).
 
 init_per_testcase(_, Config) ->
-    {ok, _} = emqx_cluster_rpc:start_link(),
-    Config.
+    snabbkaffe:start_trace(),
+    emqx_retainer:clean(),
+    {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+    [{client, C} | Config].
+
+end_per_testcase(_, Config) ->
+    ok = emqtt:disconnect(?config(client, Config)),
+    snabbkaffe:stop(),
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Test Cases
@@ -65,7 +77,6 @@ t_config(_Config) ->
         #{
             backend := _,
             enable := _,
-            flow_control := _,
             max_payload_size := _,
             msg_clear_interval := _,
             msg_expiry_interval := _
@@ -90,28 +101,22 @@ t_config(_Config) ->
     UpdateConf(false),
     UpdateConf(true).
 
-t_messages(_) ->
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-    emqx_retainer:clean(),
+t_messages1(Config) ->
+    C = ?config(client, Config),
 
     Each = fun(I) ->
         emqtt:publish(
-            C1,
+            C,
             <<"retained/", (I + 60)>>,
             <<"retained">>,
             [{qos, 0}, {retain, true}]
         )
     end,
 
-    ?check_trace(
-        {ok, {ok, _}} =
-            ?wait_async_action(
-                lists:foreach(Each, lists:seq(1, 5)),
-                #{?snk_kind := message_retained, topic := <<"retained/A">>},
-                500
-            ),
-        []
+    ?assertWaitEvent(
+        lists:foreach(Each, lists:seq(1, 5)),
+        #{?snk_kind := message_retained, topic := <<"retained/A">>},
+        500
     ),
 
     {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
@@ -133,32 +138,120 @@ t_messages(_) ->
             from_username := _
         },
         First
+    ).
+
+t_messages2(Config) ->
+    C = ?config(client, Config),
+
+    ok = lists:foreach(
+        fun(Topic) ->
+            ?assertWaitEvent(
+                emqtt:publish(C, Topic, <<"retained">>, [{qos, 0}, {retain, true}]),
+                #{?snk_kind := message_retained, topic := Topic},
+                500
+            )
+        end,
+        [<<"c">>, <<"c/1">>, <<"c/1/1">>]
     ),
 
-    ok = emqtt:disconnect(C1).
+    {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages?topic=c"])),
 
-t_messages_page(_) ->
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-    emqx_retainer:clean(),
+    #{data := Msgs, meta := _} = decode_json(MsgsJson),
+
+    ?assertEqual(1, length(Msgs)).
+
+t_message1(Config) ->
+    C = ?config(client, Config),
+
+    ?assertWaitEvent(
+        emqtt:publish(C, <<"c/1">>, <<"retained">>, [{qos, 0}, {retain, true}]),
+        #{?snk_kind := message_retained, topic := <<"c/1">>},
+        500
+    ),
+
+    ?assertMatch(
+        {error, {_, 404, _}},
+        request_api(
+            get,
+            api_path(["mqtt", "retainer", "message", "c"])
+        )
+    ),
+
+    {ok, Json} =
+        request_api(
+            get,
+            api_path(["mqtt", "retainer", "message", "c%2F1"])
+        ),
+
+    ?assertMatch(
+        #{
+            topic := <<"c/1">>,
+            payload := <<"cmV0YWluZWQ=">>
+        },
+        decode_json(Json)
+    ).
+
+t_message2(Config) ->
+    C = ?config(client, Config),
+
+    ?assertWaitEvent(
+        emqtt:publish(C, <<"c">>, <<"retained">>, [{qos, 0}, {retain, true}]),
+        #{?snk_kind := message_retained, topic := <<"c">>},
+        500
+    ),
+
+    ?assertMatch(
+        {error, {_, 404, _}},
+        request_api(
+            get,
+            api_path(["mqtt", "retainer", "message", "c%2F%2B"])
+        )
+    ),
+
+    {ok, Json0} =
+        request_api(
+            get,
+            api_path(["mqtt", "retainer", "message", "c"])
+        ),
+
+    ?assertMatch(
+        #{
+            topic := <<"c">>,
+            payload := <<"cmV0YWluZWQ=">>
+        },
+        decode_json(Json0)
+    ),
+
+    {ok, Json1} =
+        request_api(
+            get,
+            api_path(["mqtt", "retainer", "message", "c%2F%23"])
+        ),
+
+    ?assertMatch(
+        #{
+            topic := <<"c">>,
+            payload := <<"cmV0YWluZWQ=">>
+        },
+        decode_json(Json1)
+    ).
+
+t_messages_page(Config) ->
+    C = ?config(client, Config),
 
     Each = fun(I) ->
         emqtt:publish(
-            C1,
+            C,
             <<"retained/", (I + 60)>>,
             <<"retained">>,
             [{qos, 0}, {retain, true}]
         )
     end,
 
-    ?check_trace(
-        {ok, {ok, _}} =
-            ?wait_async_action(
-                lists:foreach(Each, lists:seq(1, 5)),
-                #{?snk_kind := message_retained, topic := <<"retained/A">>},
-                500
-            ),
-        []
+    ?assertWaitEvent(
+        lists:foreach(Each, lists:seq(1, 5)),
+        #{?snk_kind := message_retained, topic := <<"retained/A">>},
+        500
     ),
     Page = 4,
 
@@ -187,17 +280,13 @@ t_messages_page(_) ->
             from_username := _
         },
         OnlyOne
-    ),
+    ).
 
-    ok = emqtt:disconnect(C1).
-
-t_lookup_and_delete(_) ->
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-    emqx_retainer:clean(),
+t_lookup_and_delete(Config) ->
+    C = ?config(client, Config),
     timer:sleep(300),
 
-    emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
+    emqtt:publish(C, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
     timer:sleep(300),
 
     API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
@@ -220,9 +309,7 @@ t_lookup_and_delete(_) ->
     {ok, []} = request_api(delete, API),
 
     {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
-    {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API),
-
-    ok = emqtt:disconnect(C1).
+    {error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API).
 
 t_change_storage_type(_Config) ->
     Path = api_path(["mqtt", "retainer"]),
@@ -310,14 +397,12 @@ t_change_storage_type(_Config) ->
 
     ok.
 
-t_match_and_clean(_) ->
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-    emqx_retainer:clean(),
+t_match_and_clean(Config) ->
+    C = ?config(client, Config),
     timer:sleep(300),
 
     _ = [
-        emqtt:publish(C1, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}])
+        emqtt:publish(C, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}])
      || P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>]
     ],
 
@@ -337,20 +422,16 @@ t_match_and_clean(_) ->
     {ok, []} = request_api(delete, CleanAPI),
 
     {ok, LookupJson2} = request_api(get, API),
-    ?assertMatch(#{data := []}, decode_json(LookupJson2)),
-
-    ok = emqtt:disconnect(C1).
+    ?assertMatch(#{data := []}, decode_json(LookupJson2)).
 
 %%--------------------------------------------------------------------
-%% HTTP Request
+%% Internal funcs
 %%--------------------------------------------------------------------
+
 decode_json(Data) ->
     BinJson = emqx_utils_json:decode(Data, [return_maps]),
     emqx_utils_maps:unsafe_atom_key_map(BinJson).
 
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
 raw_systopic_conf() ->
     #{
         <<"sys_event_messages">> =>

+ 64 - 7
apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl

@@ -57,6 +57,30 @@ t_to_index_key(_Config) ->
             [1, 4],
             [<<"a">>, <<"b">>, <<"c">>]
         )
+    ),
+
+    ?assertEqual(
+        {[1, 2, 3], {[<<"a">>], []}},
+        emqx_retainer_index:to_index_key(
+            [1, 2, 3],
+            [<<"a">>]
+        )
+    ),
+
+    ?assertEqual(
+        {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}},
+        emqx_retainer_index:to_index_key(
+            [3, 5],
+            [<<"x">>, <<"a">>, <<"b">>, <<"y">>]
+        )
+    ),
+
+    ?assertEqual(
+        {[3, 5], {[<<"b">>, <<"z">>], [<<"x">>, <<"a">>, <<"y">>]}},
+        emqx_retainer_index:to_index_key(
+            [3, 5],
+            [<<"x">>, <<"a">>, <<"b">>, <<"y">>, <<"z">>]
+        )
     ).
 
 t_index_score(_Config) ->
@@ -148,7 +172,7 @@ t_condition(_Config) ->
 
 t_condition_index(_Config) ->
     ?assertEqual(
-        {[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}},
+        {{[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}}, true},
         emqx_retainer_index:condition(
             [2, 3],
             ['+', <<"a">>, <<"b">>, '+']
@@ -156,7 +180,7 @@ t_condition_index(_Config) ->
     ),
 
     ?assertEqual(
-        {[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}},
+        {{[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}}, true},
         emqx_retainer_index:condition(
             [3, 4],
             ['+', <<"a">>, <<"b">>, '+']
@@ -164,7 +188,7 @@ t_condition_index(_Config) ->
     ),
 
     ?assertEqual(
-        {[3, 5], {[<<"b">> | '_'], ['_', <<"a">>, '_']}},
+        {{[3, 5], {[<<"b">>], ['_', <<"a">>, '_']}}, true},
         emqx_retainer_index:condition(
             [3, 5],
             ['+', <<"a">>, <<"b">>, '+']
@@ -172,7 +196,7 @@ t_condition_index(_Config) ->
     ),
 
     ?assertEqual(
-        {[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}},
+        {{[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false},
         emqx_retainer_index:condition(
             [3, 5],
             ['+', <<"a">>, <<"b">>, '#']
@@ -180,7 +204,7 @@ t_condition_index(_Config) ->
     ),
 
     ?assertEqual(
-        {[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}},
+        {{[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}}, false},
         emqx_retainer_index:condition(
             [3, 4],
             ['+', <<"a">>, <<"b">>, '#']
@@ -188,7 +212,7 @@ t_condition_index(_Config) ->
     ),
 
     ?assertEqual(
-        {[1], {[<<"a">>], '_'}},
+        {{[1], {[<<"a">>], '_'}}, true},
         emqx_retainer_index:condition(
             [1],
             [<<"a">>, '#']
@@ -196,13 +220,39 @@ t_condition_index(_Config) ->
     ),
 
     ?assertEqual(
-        {[1, 2, 3], {['', <<"saya">>, '_'], []}},
+        {{[1, 2, 3], {['', <<"saya">>, '_'], []}}, true},
         emqx_retainer_index:condition(
             [1, 2, 3],
             ['', <<"saya">>, '+']
         )
+    ),
+
+    ?assertEqual(
+        {{[1, 2, 3], {[<<"c">>], []}}, true},
+        emqx_retainer_index:condition(
+            [1, 2, 3],
+            [<<"c">>]
+        )
+    ),
+
+    ?assertEqual(
+        {{[1, 2, 3], {[<<"c">> | '_'], '_'}}, false},
+        emqx_retainer_index:condition(
+            [1, 2, 3],
+            [<<"c">>, '#']
+        )
+    ),
+
+    ?assertEqual(
+        {{[1], {['_'], '_'}}, true},
+        emqx_retainer_index:condition(
+            [1],
+            ['+', '#']
+        )
     ).
 
+% {[2],[[<<48>>,<<48>>]],['+','+','#']}
+
 t_restore_topic(_Config) ->
     ?assertEqual(
         [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
@@ -223,4 +273,11 @@ t_restore_topic(_Config) ->
         emqx_retainer_index:restore_topic(
             {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}}
         )
+    ),
+
+    ?assertEqual(
+        [<<"a">>],
+        emqx_retainer_index:restore_topic(
+            {[1, 2, 3], {[<<"a">>], []}}
+        )
     ).

+ 106 - 0
apps/emqx_retainer/test/props/prop_emqx_retainer_index.erl

@@ -0,0 +1,106 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(prop_emqx_retainer_index).
+
+-include_lib("proper/include/proper.hrl").
+
+-define(CHARS, 6).
+-define(MAX_TOPIC_LEN, 12).
+-define(MAX_INDEX_LEN, 4).
+-define(MAX_FILTER_LEN, 6).
+
+%%--------------------------------------------------------------------
+%% Properties
+%%--------------------------------------------------------------------
+
+prop_index() ->
+    ?FORALL(
+        {Index, Topics0, Filter},
+        {index_t(), list(topic_t()), filter_t()},
+        begin
+            Topics = lists:usort(Topics0),
+
+            MatchedTopicsDirectly = lists:filter(
+                fun(Topic) ->
+                    emqx_topic:match(Topic, Filter)
+                end,
+                Topics
+            ),
+
+            Tab = ets:new(?MODULE, [set]),
+            ok = lists:foreach(
+                fun(Topic) ->
+                    Key = emqx_retainer_index:to_index_key(Index, Topic),
+                    ets:insert(Tab, {Key, true})
+                end,
+                Topics
+            ),
+
+            {IndexMs, IsExact} = emqx_retainer_index:condition(Index, Filter),
+            Ms = [{{IndexMs, '_'}, [], ['$_']}],
+            MatchedTopixByIndex0 = [
+                emqx_retainer_index:restore_topic(Key)
+             || {Key, _} <- ets:select(Tab, Ms)
+            ],
+            MatchedTopixByIndex =
+                case IsExact of
+                    true ->
+                        MatchedTopixByIndex0;
+                    false ->
+                        lists:filter(
+                            fun(Topic) ->
+                                emqx_topic:match(Topic, Filter)
+                            end,
+                            MatchedTopixByIndex0
+                        )
+                end,
+
+            lists:sort(MatchedTopicsDirectly) =:= lists:sort(MatchedTopixByIndex)
+        end
+    ).
+
+index_t() ->
+    ?LET(
+        {Ints, Len},
+        {non_empty(list(integer(1, ?MAX_TOPIC_LEN))), integer(1, ?MAX_INDEX_LEN)},
+        lists:usort(lists:sublist(Ints, Len))
+    ).
+
+topic_t() ->
+    ?LET(
+        {Topic, Len},
+        {non_empty(list(topic_segment_t())), integer(1, ?MAX_TOPIC_LEN)},
+        lists:sublist(Topic, Len)
+    ).
+
+filter_t() ->
+    ?LET(
+        {TopicFilter, Len, MLWildcard},
+        {
+            non_empty(list(oneof([topic_segment_t(), '+']))),
+            integer(1, ?MAX_FILTER_LEN),
+            oneof([[], ['#']])
+        },
+        lists:sublist(TopicFilter, Len) ++ MLWildcard
+    ).
+
+topic_segment_t() ->
+    ?LET(
+        I,
+        integer(0, ?CHARS - 1),
+        <<($0 + I)>>
+    ).

+ 1 - 0
changes/ce/fix-12303.en.md

@@ -0,0 +1 @@
+Fix message indexing in retainer. Previously, clients with wildcard subscriptions could receive excess retained messages, not belonging to the topics matching the subscription.