Просмотр исходного кода

Merge pull request #7323 from savonarola/retainer-indexes

feat(retainer): add topic indexing
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
282766db32

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -23,3 +23,4 @@
 {emqx_topic_metrics,1}.
 {emqx_delayed,1}.
 {emqx_mgmt_cluster,1}.
+{emqx_retainer,1}.

+ 4 - 2
apps/emqx/src/emqx_rpc.erl

@@ -51,11 +51,13 @@
 
 -type badrpc() :: {badrpc, term()} | {badtcp, term()}.
 
--type call_result() :: term() | badrpc().
+-type call_result(Result) :: Result | badrpc().
+
+-type call_result() :: call_result(term()).
 
 -type cast_result() :: true.
 
--type multicall_result(Result) :: {[Result], _BadNodes :: [node()]}.
+-type multicall_result(Result) :: {[call_result(Result)], _BadNodes :: [node()]}.
 
 -type multicall_result() :: multicall_result(term()).
 

+ 5 - 4
apps/emqx/test/emqx_common_test_helpers.erl

@@ -488,8 +488,9 @@ is_tcp_server_available(Host, Port, Timeout) ->
 start_ekka() ->
     try mnesia_hook:module_info() of
         _ -> ekka:start()
-    catch _:_ ->
-        %% Falling back to using Mnesia DB backend.
-        application:set_env(mria, db_backend, mnesia),
-        ekka:start()
+    catch
+        _:_ ->
+            %% Falling back to using Mnesia DB backend.
+            application:set_env(mria, db_backend, mnesia),
+            ekka:start()
     end.

+ 3 - 1
apps/emqx/test/emqx_config_handler_SUITE.erl

@@ -223,7 +223,9 @@ t_callback_crash(_Config) ->
     Opts = #{rawconf_with_defaults => true},
     ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
     Old = emqx:get_raw_config(CrashPath),
-    ?assertMatch({error, {config_update_crashed, _}}, emqx:update_config(CrashPath, <<"89%">>, Opts)),
+    ?assertMatch(
+        {error, {config_update_crashed, _}}, emqx:update_config(CrashPath, <<"89%">>, Opts)
+    ),
     New = emqx:get_raw_config(CrashPath),
     ?assertEqual(Old, New),
     ok = emqx_config_handler:remove_handler(CrashPath),

+ 3 - 1
apps/emqx_retainer/include/emqx_retainer.hrl

@@ -17,7 +17,9 @@
 -include_lib("emqx/include/emqx.hrl").
 
 -define(APP, emqx_retainer).
--define(TAB, ?APP).
+-define(TAB_MESSAGE, emqx_retainer_message).
+-define(TAB_INDEX, emqx_retainer_index).
+-define(TAB_INDEX_META, emqx_retainer_index_meta).
 -define(RETAINER_SHARD, emqx_retainer_shard).
 
 -type topic() :: binary().

+ 2 - 0
apps/emqx_retainer/src/emqx_retainer_app.erl

@@ -24,7 +24,9 @@
 ]).
 
 start(_Type, _Args) ->
+    ok = emqx_retainer_mnesia_cli:load(),
     emqx_retainer_sup:start_link().
 
 stop(_State) ->
+    ok = emqx_retainer_mnesia_cli:unload(),
     ok.

+ 12 - 0
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -26,6 +26,7 @@
     start_link/2,
     dispatch/2,
     refresh_limiter/0,
+    wait_dispatch_complete/1,
     worker/0
 ]).
 
@@ -61,6 +62,15 @@ refresh_limiter() ->
         Workers
     ).
 
+wait_dispatch_complete(Timeout) ->
+    Workers = gproc_pool:active_workers(?POOL),
+    lists:foreach(
+        fun({_, Pid}) ->
+            ok = gen_server:call(Pid, ?FUNCTION_NAME, Timeout)
+        end,
+        Workers
+    ).
+
 worker() ->
     gproc_pool:pick_worker(?POOL, self()).
 
@@ -120,6 +130,8 @@ init([Pool, Id]) ->
     | {noreply, NewState :: term(), hibernate}
     | {stop, Reason :: term(), Reply :: term(), NewState :: term()}
     | {stop, Reason :: term(), NewState :: term()}.
+handle_call(wait_dispatch_complete, _From, State) ->
+    {reply, ok, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.

+ 194 - 0
apps/emqx_retainer/src/emqx_retainer_index.erl

@@ -0,0 +1,194 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_index).
+
+-export([
+    foreach_index_key/3,
+    to_index_key/2,
+    index_score/2,
+    select_index/2,
+    condition/1,
+    condition/2,
+    restore_topic/1
+]).
+
+-export_type([index/0]).
+
+-type index() :: list(pos_integer()).
+
+%% @doc Index key is a term that can be effectively searched in the index table.
+-type index_key() :: {index(), {emqx_topic:words(), emqx_topic:words()}}.
+
+-type match_pattern_part() :: term().
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% @doc Given words of a concrete topic (`Tokens') and a list of `Indices',
+%% constructs index keys for the topic and each of the indices.
+%% `Fun' is called with each of these keys.
+-spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_topic:words()) -> ok.
+foreach_index_key(_Fun, [], _Tokens) ->
+    ok;
+foreach_index_key(Fun, [Index | Indices], Tokens) ->
+    Key = to_index_key(Index, Tokens),
+    _ = Fun(Key),
+    foreach_index_key(Fun, Indices, Tokens).
+
+%% @doc Given a concrete topic and an index
+%% returns the corresponding index key.
+%%
+%% In an index key words from indexed and unindexed positions are split.
+%%
+%% E.g given `[2, 3]' index and `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic,
+%% returns `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' term.
+%%
+%% @see foreach_index_key/3
+-spec to_index_key(index(), emqx_topic:words()) -> index_key().
+to_index_key(Index, Tokens) ->
+    {Index, split_index_tokens(Index, Tokens, 1, [], [])}.
+
+%% @doc Given an index and a wildcard topic
+%% returns the length of the constant prefix of the
+%% according index key.
+%%
+%% E.g. for `[2,3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic
+%% the score is `1', because the according index key pattern is
+%% <code>{[<<"b">>, '_'], ['_', <<"d">>]}</code>.
+%%
+%% @see foreach_index_key/3
+%% @see to_index_key/2
+-spec index_score(index(), emqx_topic:words()) -> non_neg_integer().
+index_score(Index, Tokens) ->
+    index_score(Index, Tokens, 1, 0).
+
+%% @doc Given a list of indices and a wildcard topic
+%% returns index with the best score.
+%%
+%% Returns `undefined' if there are no indices with score `> 0'.
+%%
+%% @see index_score/2
+-spec select_index(emqx:words(), list(index())) -> index() | undefined.
+select_index(Tokens, Indices) ->
+    select_index(Tokens, Indices, 0, undefined).
+
+%% @doc For an index and a wildcard topic
+%% returns a matchspec pattern for the corresponding index key.
+%%
+%% E.g. for `[2, 3]' index and <code>['+', <<"b">>, '+', <<"d">>]</code> wildcard topic
+%% returns <code>{[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}</code> pattern.
+-spec condition(index(), emqx_topic:words()) -> match_pattern_part().
+condition(Index, Tokens) ->
+    {Index, condition(Index, Tokens, 1, [], [])}.
+
+%% @doc Returns a matchspec pattern for a wildcard topic.
+%%
+%% E.g. for <code>['+', <<"b">>, '+', <<"d">>, '#']</code> wildcard topic
+%% returns <code>['_', <<"b">>, '_', <<"d">> | '_']</code> pattern.
+-spec condition(emqx_topic:words()) -> match_pattern_part().
+condition(Tokens) ->
+    Tokens1 = [
+        case W =:= '+' of
+            true -> '_';
+            _ -> W
+        end
+     || W <- Tokens
+    ],
+    case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of
+        false -> Tokens1;
+        _ -> (Tokens1 -- ['#']) ++ '_'
+    end.
+
+%% @doc Restores concrete topic from its index key representation.
+%%
+%% E.g given `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' index key
+%% returns `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic.
+-spec restore_topic(index_key()) -> emqx_topic:words().
+restore_topic({Index, {IndexTokens, OtherTokens}}) ->
+    restore_topic(Index, IndexTokens, OtherTokens, 1, []).
+
+%%--------------------------------------------------------------------
+%% Private
+%%--------------------------------------------------------------------
+
+split_index_tokens([NIndex | OtherIndex], [Token | Tokens], N, IndexTokens, OtherTokens) when
+    NIndex == N
+->
+    split_index_tokens(OtherIndex, Tokens, N + 1, [Token | IndexTokens], OtherTokens);
+split_index_tokens([_NIndex | _] = Index, [Token | Tokens], N, IndexTokens, OtherTokens) ->
+    split_index_tokens(Index, Tokens, N + 1, IndexTokens, [Token | OtherTokens]);
+split_index_tokens([], Tokens, _N, IndexTokens, OtherTokens) ->
+    {lists:reverse(IndexTokens), lists:reverse(OtherTokens) ++ Tokens};
+split_index_tokens(_Index, [], _N, IndexTokens, OtherTokens) ->
+    {lists:reverse(IndexTokens), lists:reverse(OtherTokens)}.
+
+index_score([N | _Index], [Ph | _Tokens], N, Score) when
+    Ph =:= '+'; Ph =:= '#'
+->
+    Score;
+index_score([N | Index], [_Word | Tokens], N, Score) ->
+    index_score(Index, Tokens, N + 1, Score + 1);
+index_score(Index, [_Word | Tokens], N, Score) ->
+    index_score(Index, Tokens, N + 1, Score);
+index_score([], _Tokens, _N, Score) ->
+    Score;
+index_score(_Index, [], _N, Score) ->
+    Score.
+
+select_index(_Tokens, [], _MaxScore, SelectedIndex) ->
+    SelectedIndex;
+select_index(Tokens, [Index | Indices], MaxScore, SelectedIndex) ->
+    Score = index_score(Index, Tokens),
+    case Score > MaxScore of
+        true ->
+            select_index(Tokens, Indices, Score, Index);
+        false ->
+            select_index(Tokens, Indices, MaxScore, SelectedIndex)
+    end.
+
+condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
+    {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'};
+condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
+    {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'};
+condition([], Tokens, _N, IndexMatch, OtherMatch) ->
+    {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)};
+condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) ->
+    {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch)};
+condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when
+    NIndex =:= N
+->
+    condition(OtherIndex, OtherTokens, N + 1, ['_' | IndexMatch], OtherMatch);
+condition(Index, ['+' | OtherTokens], N, IndexMatch, OtherMatch) ->
+    condition(Index, OtherTokens, N + 1, IndexMatch, ['_' | OtherMatch]);
+condition([NIndex | OtherIndex], [Token | OtherTokens], N, IndexMatch, OtherMatch) when
+    NIndex =:= N, is_binary(Token)
+->
+    condition(OtherIndex, OtherTokens, N + 1, [Token | IndexMatch], OtherMatch);
+condition(Index, [Token | OtherTokens], N, IndexMatch, OtherMatch) when
+    is_binary(Token)
+->
+    condition(Index, OtherTokens, N + 1, IndexMatch, [Token | OtherMatch]).
+
+restore_topic(_Index, [], OtherTokens, _N, Tokens) ->
+    lists:reverse(Tokens) ++ OtherTokens;
+restore_topic([NIndex | OtherIndex], [IndexToken | OtherIndexTokens], OtherTokens, N, Tokens) when
+    NIndex =:= N
+->
+    restore_topic(OtherIndex, OtherIndexTokens, OtherTokens, N + 1, [IndexToken | Tokens]);
+restore_topic(OtherIndex, IndexTokens, [Token | OtherTokens], N, Tokens) ->
+    restore_topic(OtherIndex, IndexTokens, OtherTokens, N + 1, [Token | Tokens]).

+ 499 - 162
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -20,6 +20,7 @@
 
 -include("emqx_retainer.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 
@@ -36,15 +37,52 @@
 
 -export([create_resource/1]).
 
--record(retained, {topic, msg, expiry_time}).
+-export([reindex/2, reindex_status/0]).
 
--type batch_read_result() ::
-    {ok, list(emqx:message()), cursor()}.
+-ifdef(TEST).
+-export([populate_index_meta/0]).
+-export([reindex/3]).
+-endif.
+
+-record(retained_message, {topic, msg, expiry_time}).
+-record(retained_index, {key, expiry_time}).
+-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
+
+-define(META_KEY, index_meta).
+
+-define(CLEAR_BATCH_SIZE, 1000).
+-define(REINDEX_BATCH_SIZE, 1000).
+-define(REINDEX_DISPATCH_WAIT, 30000).
 
 %%--------------------------------------------------------------------
-%% emqx_retainer_storage callbacks
+%% emqx_retainer callbacks
 %%--------------------------------------------------------------------
+
 create_resource(#{storage_type := StorageType}) ->
+    ok = create_table(
+        ?TAB_INDEX_META,
+        retained_index_meta,
+        record_info(fields, retained_index_meta),
+        set,
+        StorageType
+    ),
+    ok = populate_index_meta(),
+    ok = create_table(
+        ?TAB_MESSAGE,
+        retained_message,
+        record_info(fields, retained_message),
+        ordered_set,
+        StorageType
+    ),
+    ok = create_table(
+        ?TAB_INDEX,
+        retained_index,
+        record_info(fields, retained_index),
+        ordered_set,
+        StorageType
+    ).
+
+create_table(Table, RecordName, Attributes, Type, StorageType) ->
     Copies =
         case StorageType of
             ram -> ram_copies;
@@ -60,221 +98,321 @@ create_resource(#{storage_type := StorageType}) ->
         {dets, [{auto_save, 1000}]}
     ],
 
-    ok = mria:create_table(?TAB, [
-        {type, ordered_set},
+    ok = mria:create_table(Table, [
+        {type, Type},
         {rlog_shard, ?RETAINER_SHARD},
         {storage, Copies},
-        {record_name, retained},
-        {attributes, record_info(fields, retained)},
+        {record_name, RecordName},
+        {attributes, Attributes},
         {storage_properties, StoreProps}
     ]),
     ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
-    case mnesia:table_info(?TAB, storage_type) of
+    case mnesia:table_info(Table, storage_type) of
         Copies ->
             ok;
         _Other ->
-            {atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies),
+            {atomic, ok} = mnesia:change_table_copy_type(Table, node(), Copies),
             ok
     end.
 
-store_retained(_, Msg = #message{topic = Topic}) ->
+store_retained(_, #message{topic = Topic} = Msg) ->
     ExpiryTime = emqx_retainer:get_expiry_time(Msg),
-    case is_table_full() of
-        false ->
-            mria:dirty_write(
-                ?TAB,
-                #retained{
-                    topic = topic2tokens(Topic),
-                    msg = Msg,
-                    expiry_time = ExpiryTime
-                }
-            );
-        _ ->
-            Tokens = topic2tokens(Topic),
-            Fun = fun() ->
-                case mnesia:read(?TAB, Tokens) of
-                    [_] ->
-                        mnesia:write(
-                            ?TAB,
-                            #retained{
-                                topic = Tokens,
-                                msg = Msg,
-                                expiry_time = ExpiryTime
-                            },
-                            write
-                        );
-                    [] ->
-                        mnesia:abort(table_is_full)
+    Tokens = topic_to_tokens(Topic),
+    Fun =
+        case is_table_full() of
+            false ->
+                fun() ->
+                    store_retained(db_indices(write), Msg, Tokens, ExpiryTime)
+                end;
+            _ ->
+                fun() ->
+                    case mnesia:read(?TAB_MESSAGE, Tokens, write) of
+                        [_] ->
+                            store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
+                        [] ->
+                            mnesia:abort(table_is_full)
+                    end
                 end
-            end,
-            case mria:transaction(?RETAINER_SHARD, Fun) of
-                {atomic, ok} ->
-                    ok;
-                {aborted, Reason} ->
-                    ?SLOG(error, #{
-                        msg => "failed_to_retain_message",
-                        topic => Topic,
-                        reason => Reason
-                    })
-            end
+        end,
+    case mria:transaction(?RETAINER_SHARD, Fun) of
+        {atomic, ok} ->
+            ?tp(debug, message_retained, #{topic => Topic}),
+            ok;
+        {aborted, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_retain_message",
+                topic => Topic,
+                reason => Reason
+            })
     end.
 
 clear_expired(_) ->
     NowMs = erlang:system_time(millisecond),
-    MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'},
-    Ms = [{MsHd, [{'=/=', '$3', 0}, {'<', '$3', NowMs}], ['$1']}],
+    QH = qlc:q([
+        TopicTokens
+     || #retained_message{
+            topic = TopicTokens,
+            expiry_time = ExpiryTime
+        } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
+        (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
+    ]),
     Fun = fun() ->
-        Keys = mnesia:select(?TAB, Ms, write),
-        lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
+        QC = qlc:cursor(QH),
+        clear_batch(db_indices(write), QC)
     end,
     {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun),
     ok.
 
 delete_message(_, Topic) ->
-    case emqx_topic:wildcard(Topic) of
-        true ->
-            match_delete_messages(Topic);
-        false ->
-            Tokens = topic2tokens(Topic),
-            Fun = fun() ->
-                mnesia:delete({?TAB, Tokens})
-            end,
-            _ = mria:transaction(?RETAINER_SHARD, Fun),
-            ok
-    end,
+    Tokens = topic_to_tokens(Topic),
+    DeleteFun =
+        case emqx_topic:wildcard(Topic) of
+            false ->
+                fun() ->
+                    ok = delete_message_by_topic(Tokens, db_indices(write))
+                end;
+            true ->
+                fun() ->
+                    QH = topic_search_table(Tokens),
+                    qlc:fold(
+                        fun(TopicTokens, _) ->
+                            ok = delete_message_by_topic(TopicTokens, db_indices(write))
+                        end,
+                        undefined,
+                        QH
+                    )
+                end
+        end,
+    {atomic, _} = mria:transaction(?RETAINER_SHARD, DeleteFun),
     ok.
 
 read_message(_, Topic) ->
     {ok, read_messages(Topic)}.
 
-page_read(_, Topic, Page, Limit) ->
-    Cursor = make_cursor(Topic),
-    case Page > 1 of
-        true ->
-            _ = qlc:next_answers(Cursor, (Page - 1) * Limit),
-            ok;
-        _ ->
-            ok
-    end,
-    Rows = qlc:next_answers(Cursor, Limit),
-    qlc:delete_cursor(Cursor),
-    {ok, Rows}.
-
-match_messages(_, Topic, Cursor) ->
-    BatchReadNum = emqx:get_config([retainer, flow_control, batch_read_number]),
-    case Cursor of
-        undefined ->
-            case BatchReadNum of
-                0 ->
-                    {ok, sort_retained(match_messages(Topic)), undefined};
-                _ ->
-                    start_batch_read(Topic, BatchReadNum)
-            end;
-        _ ->
-            batch_read_messages(Cursor, BatchReadNum)
+match_messages(_, Topic, undefined) ->
+    Tokens = topic_to_tokens(Topic),
+    Now = erlang:system_time(millisecond),
+    QH = search_table(Tokens, Now),
+    case batch_read_number() of
+        all_remaining ->
+            {ok, qlc:eval(QH), undefined};
+        BatchNum when is_integer(BatchNum) ->
+            Cursor = qlc:cursor(QH),
+            match_messages(undefined, Topic, {Cursor, BatchNum})
+    end;
+match_messages(_, _Topic, {Cursor, BatchNum}) ->
+    case qlc_next_answers(Cursor, BatchNum) of
+        {closed, Rows} ->
+            {ok, Rows, undefined};
+        {more, Rows} ->
+            {ok, Rows, {Cursor, BatchNum}}
     end.
 
+page_read(_, Topic, Page, Limit) ->
+    Now = erlang:system_time(millisecond),
+    QH =
+        case Topic of
+            undefined ->
+                search_table(undefined, ['#'], Now);
+            _ ->
+                Tokens = topic_to_tokens(Topic),
+                search_table(Tokens, Now)
+        end,
+    OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
+    Cursor = qlc:cursor(OrderedQH),
+    NSkip = (Page - 1) * Limit,
+    SkipResult =
+        case NSkip > 0 of
+            true ->
+                {Result, _} = qlc_next_answers(Cursor, NSkip),
+                Result;
+            false ->
+                more
+        end,
+    PageRows =
+        case SkipResult of
+            closed ->
+                [];
+            more ->
+                case qlc_next_answers(Cursor, Limit) of
+                    {closed, Rows} ->
+                        Rows;
+                    {more, Rows} ->
+                        qlc:delete_cursor(Cursor),
+                        Rows
+                end
+        end,
+    {ok, PageRows}.
+
 clean(_) ->
-    _ = mria:clear_table(?TAB),
+    _ = mria:clear_table(?TAB_MESSAGE),
+    _ = mria:clear_table(?TAB_INDEX),
     ok.
 
 size(_) ->
     table_size().
 
+reindex(Force, StatusFun) ->
+    reindex(config_indices(), Force, StatusFun).
+
+reindex_status() ->
+    Fun = fun() ->
+        mnesia:read(?TAB_INDEX_META, ?META_KEY)
+    end,
+    case mria:transaction(?RETAINER_SHARD, Fun) of
+        {atomic, [#retained_index_meta{reindexing = true}]} ->
+            true;
+        {atomic, _} ->
+            false;
+        {aborted, Reason} ->
+            {error, Reason}
+    end.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
-sort_retained([]) -> [];
-sort_retained([Msg]) -> [Msg];
-sort_retained(Msgs) -> lists:sort(fun compare_message/2, Msgs).
+
+store_retained(Indices, Msg, Tokens, ExpiryTime) ->
+    ok = store_retained_message(Msg, Tokens, ExpiryTime),
+    ok = emqx_retainer_index:foreach_index_key(
+        fun(Key) -> store_retained_index(Key, ExpiryTime) end,
+        Indices,
+        Tokens
+    ).
+
+store_retained_message(Msg, Tokens, ExpiryTime) ->
+    RetainedMessage = #retained_message{
+        topic = Tokens,
+        msg = Msg,
+        expiry_time = ExpiryTime
+    },
+    mnesia:write(?TAB_MESSAGE, RetainedMessage, write).
+
+store_retained_index(Key, ExpiryTime) ->
+    RetainedIndex = #retained_index{
+        key = Key,
+        expiry_time = ExpiryTime
+    },
+    mnesia:write(?TAB_INDEX, RetainedIndex, write).
+
+topic_search_table(Tokens) ->
+    Index = emqx_retainer_index:select_index(Tokens, db_indices(read)),
+    topic_search_table(Index, Tokens).
+
+topic_search_table(undefined, Tokens) ->
+    Cond = emqx_retainer_index:condition(Tokens),
+    Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}],
+    MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]),
+    qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]);
+topic_search_table(Index, Tokens) ->
+    Cond = emqx_retainer_index:condition(Index, Tokens),
+    Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}],
+    IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]),
+    qlc:q([
+        emqx_retainer_index:restore_topic(Key)
+     || #retained_index{key = Key} <- IndexQH
+    ]).
+
+search_table(Tokens, Now) ->
+    Indices = dirty_read_indices(),
+    Index = emqx_retainer_index:select_index(Tokens, Indices),
+    search_table(Index, Tokens, Now).
+
+search_table(undefined, Tokens, Now) ->
+    Ms = make_message_match_spec(Tokens, Now),
+    ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
+search_table(Index, Tokens, Now) ->
+    Ms = make_index_match_spec(Index, Tokens, Now),
+    Topics = [
+        emqx_retainer_index:restore_topic(Key)
+     || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
+    ],
+    RetainedMsgQH = qlc:q([
+        ets:lookup(?TAB_MESSAGE, TopicTokens)
+     || TopicTokens <- Topics
+    ]),
+    qlc:q([
+        Msg
+     || [
+            #retained_message{
+                msg = Msg,
+                expiry_time = ExpiryTime
+            }
+        ] <- RetainedMsgQH,
+        (ExpiryTime == 0) or (ExpiryTime > Now)
+    ]).
+
+dirty_read_indices() ->
+    case ets:lookup(?TAB_INDEX_META, ?META_KEY) of
+        [#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices;
+        [] -> []
+    end.
+
+clear_batch(Indices, QC) ->
+    {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
+    lists:foreach(
+        fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end,
+        Rows
+    ),
+    case Result of
+        closed -> ok;
+        more -> clear_batch(Indices, QC)
+    end.
+
+delete_message_by_topic(TopicTokens, Indices) ->
+    ok = emqx_retainer_index:foreach_index_key(
+        fun(Key) ->
+            mnesia:delete({?TAB_INDEX, Key})
+        end,
+        Indices,
+        TopicTokens
+    ),
+    ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}).
 
 compare_message(M1, M2) ->
     M1#message.timestamp =< M2#message.timestamp.
 
-topic2tokens(Topic) ->
+topic_to_tokens(Topic) ->
     emqx_topic:words(Topic).
 
--spec start_batch_read(topic(), pos_integer()) -> batch_read_result().
-start_batch_read(Topic, MaxReadNum) ->
-    Cursor = make_cursor(Topic),
-    batch_read_messages(Cursor, MaxReadNum).
-
--spec batch_read_messages(emqx_retainer_storage:cursor(), pos_integer()) -> batch_read_result().
-batch_read_messages(Cursor, MaxReadNum) ->
-    Answers = qlc:next_answers(Cursor, MaxReadNum),
-    case erlang:length(Answers) < MaxReadNum of
-        true ->
-            qlc:delete_cursor(Cursor),
-            {ok, Answers, undefined};
-        _ ->
-            {ok, Answers, Cursor}
-    end.
-
 -spec read_messages(emqx_types:topic()) ->
     [emqx_types:message()].
 read_messages(Topic) ->
-    Tokens = topic2tokens(Topic),
-    case mnesia:dirty_read(?TAB, Tokens) of
+    Tokens = topic_to_tokens(Topic),
+    case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
         [] ->
             [];
-        [#retained{msg = Msg, expiry_time = Et}] ->
+        [#retained_message{msg = Msg, expiry_time = Et}] ->
             case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of
                 true -> [Msg];
                 false -> []
             end
     end.
 
--spec match_messages(emqx_types:topic()) ->
-    [emqx_types:message()].
-match_messages(Filter) ->
-    Ms = make_match_spec(Filter),
-    mnesia:dirty_select(?TAB, Ms).
-
--spec match_delete_messages(emqx_types:topic()) -> ok.
-match_delete_messages(Filter) ->
-    Cond = condition(emqx_topic:words(Filter)),
-    MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
-    Ms = [{MsHd, [], ['$_']}],
-    Rs = mnesia:dirty_select(?TAB, Ms),
-    lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs).
-
-%% @private
-condition(Ws) ->
-    Ws1 = [
-        case W =:= '+' of
-            true -> '_';
-            _ -> W
-        end
-     || W <- Ws
-    ],
-    case lists:last(Ws1) =:= '#' of
-        false -> Ws1;
-        _ -> (Ws1 -- ['#']) ++ '_'
+qlc_next_answers(QC, N) ->
+    case qlc:next_answers(QC, N) of
+        NextAnswers when
+            is_list(NextAnswers) andalso
+                length(NextAnswers) < N
+        ->
+            qlc:delete_cursor(QC),
+            {closed, NextAnswers};
+        NextAnswers when is_list(NextAnswers) ->
+            {more, NextAnswers};
+        {error, Module, Reason} ->
+            qlc:delete_cursor(QC),
+            error({qlc_error, Module, Reason})
     end.
 
--spec make_match_spec(undefined | topic()) -> ets:match_spec().
-make_match_spec(Topic) ->
-    NowMs = erlang:system_time(millisecond),
-    Cond =
-        case Topic of
-            undefined ->
-                '_';
-            _ ->
-                condition(emqx_topic:words(Topic))
-        end,
-    MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'},
-    [
-        {MsHd, [{'=:=', '$3', 0}], ['$2']},
-        {MsHd, [{'>', '$3', NowMs}], ['$2']}
-    ].
-
--spec make_cursor(undefined | topic()) -> qlc:query_cursor().
-make_cursor(Topic) ->
-    Ms = make_match_spec(Topic),
-    TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]),
-    QH = qlc:q([E || E <- TabQH]),
-    QH2 = qlc:sort(QH, {order, fun compare_message/2}),
-    qlc:cursor(QH2).
+make_message_match_spec(Tokens, NowMs) ->
+    Cond = emqx_retainer_index:condition(Tokens),
+    MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'},
+    [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}].
+
+make_index_match_spec(Index, Tokens, NowMs) ->
+    Cond = emqx_retainer_index:condition(Index, Tokens),
+    MsHd = #retained_index{key = Cond, expiry_time = '$3'},
+    [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
 
 -spec is_table_full() -> boolean().
 is_table_full() ->
@@ -283,4 +421,203 @@ is_table_full() ->
 
 -spec table_size() -> non_neg_integer().
 table_size() ->
-    mnesia:table_info(?TAB, size).
+    mnesia:table_info(?TAB_MESSAGE, size).
+
+config_indices() ->
+    lists:sort(emqx_config:get([retainer, backend, index_specs])).
+
+populate_index_meta() ->
+    ConfigIndices = config_indices(),
+    Fun = fun() ->
+        case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+            [
+                #retained_index_meta{
+                    read_indices = ReadIndices,
+                    write_indices = WriteIndices,
+                    reindexing = Reindexing
+                }
+            ] ->
+                case {ReadIndices, WriteIndices, Reindexing} of
+                    {_, _, true} ->
+                        ok;
+                    {ConfigIndices, ConfigIndices, false} ->
+                        ok;
+                    {DBWriteIndices, DBReadIndices, false} ->
+                        {error, DBWriteIndices, DBReadIndices}
+                end;
+            [] ->
+                mnesia:write(
+                    ?TAB_INDEX_META,
+                    #retained_index_meta{
+                        key = ?META_KEY,
+                        read_indices = ConfigIndices,
+                        write_indices = ConfigIndices,
+                        reindexing = false
+                    },
+                    write
+                )
+        end
+    end,
+    case mria:transaction(?RETAINER_SHARD, Fun) of
+        {atomic, ok} ->
+            ok;
+        {atomic, {error, DBWriteIndices, DBReadIndices}} ->
+            ?SLOG(warning, #{
+                msg => "emqx_retainer_outdated_indices",
+                config_indices => ConfigIndices,
+                db_write_indices => DBWriteIndices,
+                db_read_indices => DBReadIndices
+            }),
+            ok;
+        {aborted, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_populate_emqx_retainer_indices",
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
+
+db_indices(Type) ->
+    case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
+        [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
+            case Type of
+                read -> ReadIndices;
+                write -> WriteIndices
+            end;
+        [] ->
+            []
+    end.
+
+batch_read_number() ->
+    case emqx:get_config([retainer, flow_control, batch_read_number]) of
+        0 -> all_remaining;
+        BatchNum when is_integer(BatchNum) -> BatchNum
+    end.
+
+reindex(NewIndices, Force, StatusFun) when
+    is_boolean(Force) andalso is_function(StatusFun, 1)
+->
+    %% Disable read indices and update write indices so that new records are written
+    %% with correct indices. Also block parallel reindexing.
+    case try_start_reindex(NewIndices, Force) of
+        {atomic, ok} ->
+            %% Wait for all dispatch operations to be completed to avoid
+            %% inconsistent results.
+            true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
+
+            %% All new dispatch operations will see
+            %% indices disabled, so we feel free to clear index table.
+
+            %% Clear old index records.
+            {atomic, ok} = mria:clear_table(?TAB_INDEX),
+
+            %% Fill index records in batches.
+            QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
+            ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
+
+            %% Enable read indices and unlock reindexing.
+            finalize_reindex();
+        {atomic, Reason} ->
+            Reason
+    end.
+
+try_start_reindex(NewIndices, true) ->
+    mria:transaction(
+        ?RETAINER_SHARD,
+        fun() -> start_reindex(NewIndices) end
+    );
+try_start_reindex(NewIndices, false) ->
+    mria:transaction(
+        ?RETAINER_SHARD,
+        fun() ->
+            case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+                [#retained_index_meta{reindexing = false}] ->
+                    start_reindex(NewIndices);
+                _ ->
+                    {error, already_started}
+            end
+        end
+    ).
+
+start_reindex(NewIndices) ->
+    ?tp(warning, retainer_message_reindexing_started, #{
+        hint => <<"during reindexing, subscription performance may degrade">>
+    }),
+    mnesia:write(
+        ?TAB_INDEX_META,
+        #retained_index_meta{
+            key = ?META_KEY,
+            read_indices = [],
+            write_indices = NewIndices,
+            reindexing = true
+        },
+        write
+    ).
+
+finalize_reindex() ->
+    {atomic, ok} = mria:transaction(
+        ?RETAINER_SHARD,
+        fun() ->
+            case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+                [#retained_index_meta{write_indices = WriteIndices} = Meta] ->
+                    mnesia:write(
+                        ?TAB_INDEX_META,
+                        Meta#retained_index_meta{
+                            key = ?META_KEY,
+                            read_indices = WriteIndices,
+                            reindexing = false
+                        },
+                        write
+                    );
+                [] ->
+                    ok
+            end
+        end
+    ),
+    ?tp(warning, retainer_message_reindexing_finished, #{}),
+    ok.
+
+reindex_topic(Indices, Topic) ->
+    case mnesia:read(?TAB_MESSAGE, Topic, read) of
+        [#retained_message{expiry_time = ExpiryTime}] ->
+            ok = emqx_retainer_index:foreach_index_key(
+                fun(Key) -> store_retained_index(Key, ExpiryTime) end,
+                Indices,
+                Topic
+            );
+        [] ->
+            ok
+    end.
+
+reindex_batch(QC, Done, StatusFun) ->
+    Fun = fun() ->
+        Indices = db_indices(write),
+        {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
+        ok = lists:foreach(
+            fun(Topic) -> reindex_topic(Indices, Topic) end,
+            Topics
+        ),
+        {Status, Done + length(Topics)}
+    end,
+    case mria:transaction(?RETAINER_SHARD, Fun) of
+        {atomic, {more, NewDone}} ->
+            _ = StatusFun(NewDone),
+            reindex_batch(QC, NewDone, StatusFun);
+        {atomic, {closed, NewDone}} ->
+            _ = StatusFun(NewDone),
+            ok;
+        {aborted, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_reindex_retained_messages",
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
+
+wait_dispatch_complete(Timeout) ->
+    Nodes = mria_mnesia:running_nodes(),
+    {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),
+    lists:all(
+        fun(Result) -> Result =:= ok end,
+        Results
+    ).

+ 77 - 0
apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl

@@ -0,0 +1,77 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_mnesia_cli).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([load/0, retainer/1, unload/0]).
+
+-define(PRINT_MSG(Msg), io:format(Msg)).
+
+-define(PRINT(Format, Args), io:format(Format, Args)).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+load() ->
+    ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
+
+retainer(["reindex", "status"]) ->
+    case emqx_retainer_mnesia:reindex_status() of
+        true ->
+            ?PRINT_MSG("Reindexing is in progress~n");
+        false ->
+            ?PRINT_MSG("Reindexing is not running~n");
+        {error, Reason} ->
+            ?PRINT("Can't get reindex status: ~p~n", [Reason])
+    end;
+retainer(["reindex", "start"]) ->
+    retainer(["reindex", "start", "false"]);
+retainer(["reindex", "start", ForceParam]) ->
+    Force =
+        case ForceParam of
+            "true" -> true;
+            _ -> false
+        end,
+    ?PRINT_MSG("Starting reindexing~n"),
+    emqx_retainer_mnesia:reindex(
+        Force,
+        fun(Done) ->
+            ?SLOG(
+                info,
+                #{
+                    msg => "retainer_message_record_reindexing_progress",
+                    done => Done
+                }
+            ),
+            ?PRINT("Reindexed ~p messages~n", [Done])
+        end
+    ),
+    ?PRINT_MSG("Reindexing finished~n");
+retainer(_) ->
+    emqx_ctl:usage(
+        [
+            {"retainer reindex status", "Show reindex status"},
+            {"retainer reindex start [force]",
+                "Generate new retainer topic indices config settings.\n"
+                "Pass true as <Force> to ignore previously started reindexing"}
+        ]
+    ).
+
+unload() ->
+    ok = emqx_ctl:unregister_command(retainer).

+ 42 - 1
apps/emqx_retainer/src/emqx_retainer_schema.erl

@@ -5,6 +5,13 @@
 
 -export([roots/0, fields/1, desc/1, namespace/0]).
 
+-define(DEFAULT_INDICES, [
+    [1, 2, 3],
+    [1, 3],
+    [2, 3],
+    [3]
+]).
+
 namespace() -> "retainer".
 
 roots() -> ["retainer"].
@@ -54,7 +61,8 @@ fields(mnesia_config) ->
                 max_retained_messages,
                 0,
                 fun is_pos_integer/1
-            )}
+            )},
+        {index_specs, fun retainer_indices/1}
     ];
 fields(flow_control) ->
     [
@@ -113,3 +121,36 @@ backend_config() ->
         backend,
         mnesia_config
     ).
+
+retainer_indices(type) ->
+    list(list(integer()));
+retainer_indices(desc) ->
+    "Retainer index specifications: list of arrays of positive ascending integers. "
+    "Each array specifies an index. Numbers in an index specification are 1-based "
+    "word positions in topics. Words from specified positions will be used for indexing.<br>"
+    "For example, it is good to have <code>[2, 4]</code> index to optimize "
+    "<code>+/X/+/Y/...</code> topic wildcard subscriptions.";
+retainer_indices(example) ->
+    [[2, 4], [1, 3]];
+retainer_indices(default) ->
+    ?DEFAULT_INDICES;
+retainer_indices(validator) ->
+    fun is_valid_index_specs/1;
+retainer_indices(_) ->
+    undefined.
+
+is_valid_index_specs(IndexSpecs) ->
+    case lists:all(fun is_valid_index_spec/1, IndexSpecs) of
+        true ->
+            case length(IndexSpecs) =:= ordsets:size(ordsets:from_list(IndexSpecs)) of
+                true -> ok;
+                false -> {error, duplicate_index_specs}
+            end;
+        false ->
+            {error, invalid_index_spec}
+    end.
+
+is_valid_index_spec(IndexSpec) ->
+    length(IndexSpec) > 0 andalso
+        lists:all(fun(Idx) -> Idx > 0 end, IndexSpec) andalso
+        IndexSpec =:= ordsets:to_list(ordsets:from_list(IndexSpec)).

+ 33 - 0
apps/emqx_retainer/src/proto/emqx_retainer_proto_v1.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-export([
+    introduced_in/0,
+    wait_dispatch_complete/2
+]).
+
+introduced_in() ->
+    "5.0.0".
+
+-spec wait_dispatch_complete(list(node()), timeout()) -> emqx_rpc:multicall_result(ok).
+wait_dispatch_complete(Nodes, Timeout) ->
+    rpc:multicall(Nodes, emqx_retainer_dispatcher, wait_dispatch_complete, [Timeout]).

+ 155 - 23
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -19,13 +19,30 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--define(APP, emqx_retainer).
 -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
+-include("emqx_retainer.hrl").
+
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+all() ->
+    [
+        {group, mnesia_without_indices},
+        {group, mnesia_with_indices},
+        {group, mnesia_reindex}
+    ].
+
+groups() ->
+    [
+        {mnesia_without_indices, [sequence], common_tests()},
+        {mnesia_with_indices, [sequence], common_tests()},
+        {mnesia_reindex, [sequence], [t_reindex]}
+    ].
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+common_tests() ->
+    emqx_common_test_helpers:all(?MODULE) -- [t_reindex].
 
 -define(BASE_CONF, <<
     ""
@@ -54,13 +71,9 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 %%--------------------------------------------------------------------
 
 init_per_suite(Config) ->
-    application:load(emqx_conf),
-    ok = ekka:start(),
-    ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
-
     load_base_conf(),
     emqx_ratelimiter_SUITE:base_conf(),
-    emqx_common_test_helpers:start_apps([emqx_retainer]),
+    emqx_common_test_helpers:start_apps([emqx_conf, ?APP]),
     Config.
 
 end_per_suite(_Config) ->
@@ -68,11 +81,26 @@ end_per_suite(_Config) ->
     mria:stop(),
     mria_mnesia:delete_schema(),
 
-    emqx_common_test_helpers:stop_apps([emqx_retainer]).
+    emqx_common_test_helpers:stop_apps([?APP, emqx_conf]).
+
+init_per_group(mnesia_without_indices, Config) ->
+    mnesia:clear_table(?TAB_INDEX_META),
+    mnesia:clear_table(?TAB_INDEX),
+    mnesia:clear_table(?TAB_MESSAGE),
+    Config;
+init_per_group(mnesia_reindex, Config) ->
+    emqx_retainer_mnesia:populate_index_meta(),
+    mnesia:clear_table(?TAB_INDEX),
+    mnesia:clear_table(?TAB_MESSAGE),
+    Config;
+init_per_group(_, Config) ->
+    emqx_retainer_mnesia:populate_index_meta(),
+    mnesia:clear_table(?TAB_INDEX),
+    mnesia:clear_table(?TAB_MESSAGE),
+    Config.
 
-init_per_testcase(_, Config) ->
-    {ok, _} = emqx_cluster_rpc:start_link(),
-    timer:sleep(200),
+end_per_group(_Group, Config) ->
+    emqx_retainer_mnesia:populate_index_meta(),
     Config.
 
 load_base_conf() ->
@@ -116,6 +144,8 @@ t_retain_handling(_) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
 
+    ok = emqx_retainer:clean(),
+
     %% rh = 0, no wildcard, and with empty retained message
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
     ?assertEqual(0, length(receive_messages(1))),
@@ -247,21 +277,26 @@ t_message_expiry(_) ->
     ok = emqtt:disconnect(C1).
 
 t_message_expiry_2(_) ->
-    emqx_retainer:update_config(#{<<"msg_expiry_interval">> => <<"2s">>}),
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-    emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
+    ConfMod = fun(Conf) ->
+        Conf#{<<"msg_expiry_interval">> := <<"2s">>}
+    end,
+    Case = fun() ->
+        {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+        {ok, _} = emqtt:connect(C1),
+        emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
 
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
-    ?assertEqual(1, length(receive_messages(1))),
-    timer:sleep(4000),
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
-    ?assertEqual(0, length(receive_messages(1))),
-    {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
+        ?assertEqual(1, length(receive_messages(1))),
+        timer:sleep(4000),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
+        ?assertEqual(0, length(receive_messages(1))),
+        {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
 
-    emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
 
-    ok = emqtt:disconnect(C1).
+        ok = emqtt:disconnect(C1)
+    end,
+    with_conf(ConfMod, Case).
 
 t_clean(_) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@@ -488,10 +523,107 @@ t_only_for_coverage(_) ->
     true = erlang:exit(Dispatcher, normal),
     ok.
 
+t_reindex(_) ->
+    {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+
+    ok = emqx_retainer:clean(),
+    ok = emqx_retainer_mnesia:reindex([[1, 3]], false, fun(_Done) -> ok end),
+
+    %% Prepare retained messages for "retained/N1/N2" topics
+    ?check_trace(
+        ?wait_async_action(
+            lists:foreach(
+                fun(N1) ->
+                    lists:foreach(
+                        fun(N2) ->
+                            emqtt:publish(
+                                C,
+                                erlang:iolist_to_binary([
+                                    <<"retained/">>,
+                                    io_lib:format("~5..0w", [N1]),
+                                    <<"/">>,
+                                    io_lib:format("~5..0w", [N2])
+                                ]),
+                                <<"this is a retained message">>,
+                                [{qos, 0}, {retain, true}]
+                            )
+                        end,
+                        lists:seq(1, 10)
+                    )
+                end,
+                lists:seq(1, 1000)
+            ),
+            #{?snk_kind := message_retained, topic := <<"retained/01000/00010">>},
+            5000
+        ),
+        []
+    ),
+
+    ?check_trace(
+        ?wait_async_action(
+            begin
+                %% Spawn reindexing in the background
+                spawn_link(
+                    fun() ->
+                        timer:sleep(1000),
+                        emqx_retainer_mnesia:reindex(
+                            [[1, 4]],
+                            false,
+                            fun(Done) ->
+                                ?tp(
+                                    info,
+                                    reindexing_progress,
+                                    #{done => Done}
+                                )
+                            end
+                        )
+                    end
+                ),
+
+                %% Subscribe to "retained/N/+" for some time, while reindexing is in progress
+                T = erlang:monotonic_time(millisecond),
+                ok = test_retain_while_reindexing(C, T + 3000)
+            end,
+            #{?snk_kind := reindexing_progress, done := 10000},
+            10000
+        ),
+        fun(Trace) ->
+            ?assertMatch(
+                [_ | _],
+                lists:filter(
+                    fun
+                        (#{done := 10000}) -> true;
+                        (_) -> false
+                    end,
+                    ?of_kind(reindexing_progress, Trace)
+                )
+            )
+        end
+    ).
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
 
+test_retain_while_reindexing(C, Deadline) ->
+    case erlang:monotonic_time(millisecond) > Deadline of
+        true ->
+            ok;
+        false ->
+            N = rand:uniform(1000),
+            Topic = iolist_to_binary([
+                <<"retained/">>,
+                io_lib:format("~5..0w", [N]),
+                <<"/+">>
+            ]),
+            {ok, #{}, [0]} = emqtt:subscribe(C, Topic, [{qos, 0}, {rh, 0}]),
+            Messages = receive_messages(10),
+            ?assertEqual(10, length(Messages)),
+            {ok, #{}, [0]} = emqtt:unsubscribe(C, Topic),
+            test_retain_while_reindexing(C, Deadline)
+    end.
+
 receive_messages(Count) ->
     receive_messages(Count, []).
 receive_messages(0, Msgs) ->

+ 9 - 3
apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

@@ -21,6 +21,7 @@
 -include("emqx_retainer.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
@@ -89,7 +90,6 @@ t_messages(_) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqx_retainer:clean(),
-    timer:sleep(500),
 
     Each = fun(I) ->
         emqtt:publish(
@@ -100,8 +100,14 @@ t_messages(_) ->
         )
     end,
 
-    lists:foreach(Each, lists:seq(1, 5)),
-    timer:sleep(500),
+    ?check_trace(
+        ?wait_async_action(
+            lists:foreach(Each, lists:seq(1, 5)),
+            #{?snk_kind := message_retained, topic := <<"retained/A">>},
+            500
+        ),
+        []
+    ),
 
     {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
     Msgs = decode_json(MsgsJson),

+ 42 - 9
apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl

@@ -19,21 +19,54 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include("emqx_retainer.hrl").
+
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+    emqx_retainer_SUITE:load_base_conf(),
+    %% Start Apps
+    emqx_common_test_helpers:start_apps([emqx_retainer]),
     Config.
 
-end_per_testcase(_TestCase, Config) ->
-    Config.
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([emqx_retainer]).
+
+t_reindex_status(_Config) ->
+    ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]).
+
+t_reindex(_Config) ->
+    {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+
+    ok = emqx_retainer:clean(),
 
-% t_cmd(_) ->
-%     error('TODO').
+    ?check_trace(
+        ?wait_async_action(
+            lists:foreach(
+                fun(N) ->
+                    emqtt:publish(
+                        C,
+                        erlang:iolist_to_binary([
+                            <<"retained/">>,
+                            io_lib:format("~5..0w", [N])
+                        ]),
+                        <<"this is a retained message">>,
+                        [{qos, 0}, {retain, true}]
+                    )
+                end,
+                lists:seq(1, 1000)
+            ),
+            #{?snk_kind := message_retained, topic := <<"retained/01000">>},
+            1000
+        ),
+        []
+    ),
 
-% t_unload(_) ->
-%     error('TODO').
+    emqx_config:put([retainer, backend, index_specs], [[4, 5]]),
+    ok = emqx_retainer_mnesia_cli:retainer(["reindex", "start"]),
 
-% t_load(_) ->
-%     error('TODO').
+    ?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)).

+ 218 - 0
apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl

@@ -0,0 +1,218 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_index_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, Config) ->
+    Config.
+
+t_foreach_index_key(_Config) ->
+    put(index_key, undefined),
+    ok = emqx_retainer_index:foreach_index_key(
+        fun(IndexKey) -> put(index_key, IndexKey) end,
+        [[1, 3]],
+        [<<"a">>, <<"b">>, <<"c">>]
+    ),
+
+    ?assertEqual(
+        {[1, 3], {[<<"a">>, <<"c">>], [<<"b">>]}},
+        get(index_key)
+    ).
+
+t_to_index_key(_Config) ->
+    ?assertEqual(
+        {[1, 3], {[<<"a">>, <<"c">>], [<<"b">>]}},
+        emqx_retainer_index:to_index_key(
+            [1, 3],
+            [<<"a">>, <<"b">>, <<"c">>]
+        )
+    ),
+
+    ?assertEqual(
+        {[1, 4], {[<<"a">>], [<<"b">>, <<"c">>]}},
+        emqx_retainer_index:to_index_key(
+            [1, 4],
+            [<<"a">>, <<"b">>, <<"c">>]
+        )
+    ).
+
+t_index_score(_Config) ->
+    ?assertEqual(
+        0,
+        emqx_retainer_index:index_score(
+            [1, 4],
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        0,
+        emqx_retainer_index:index_score(
+            [1, 2],
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        2,
+        emqx_retainer_index:index_score(
+            [1, 2],
+            [<<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        1,
+        emqx_retainer_index:index_score(
+            [1, 2],
+            [<<"a">>]
+        )
+    ),
+
+    ?assertEqual(
+        1,
+        emqx_retainer_index:index_score(
+            [2, 3, 4, 5],
+            ['+', <<"a">>, '#']
+        )
+    ),
+
+    ?assertEqual(
+        2,
+        emqx_retainer_index:index_score(
+            [2, 3, 4, 5],
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ).
+
+t_select_index(_Config) ->
+    ?assertEqual(
+        [2, 3, 4, 5],
+        emqx_retainer_index:select_index(
+            ['+', <<"a">>, <<"b">>, '+'],
+            [
+                [1, 4],
+                [2, 3, 4, 5],
+                [1, 2]
+            ]
+        )
+    ),
+
+    ?assertEqual(
+        undefined,
+        emqx_retainer_index:select_index(
+            ['+', <<"a">>, <<"b">>, '+'],
+            [
+                [1, 4]
+            ]
+        )
+    ).
+
+t_condition(_Config) ->
+    ?assertEqual(
+        ['_', <<"a">>, <<"b">>, '_'],
+        emqx_retainer_index:condition(
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        ['_', <<"a">> | '_'],
+        emqx_retainer_index:condition(
+            ['+', <<"a">>, '#']
+        )
+    ).
+
+t_condition_index(_Config) ->
+    ?assertEqual(
+        {[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}},
+        emqx_retainer_index:condition(
+            [2, 3],
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        {[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}},
+        emqx_retainer_index:condition(
+            [3, 4],
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        {[3, 5], {[<<"b">> | '_'], ['_', <<"a">>, '_']}},
+        emqx_retainer_index:condition(
+            [3, 5],
+            ['+', <<"a">>, <<"b">>, '+']
+        )
+    ),
+
+    ?assertEqual(
+        {[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}},
+        emqx_retainer_index:condition(
+            [3, 5],
+            ['+', <<"a">>, <<"b">>, '#']
+        )
+    ),
+
+    ?assertEqual(
+        {[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}},
+        emqx_retainer_index:condition(
+            [3, 4],
+            ['+', <<"a">>, <<"b">>, '#']
+        )
+    ),
+
+    ?assertEqual(
+        {[1], {[<<"a">>], '_'}},
+        emqx_retainer_index:condition(
+            [1],
+            [<<"a">>, '#']
+        )
+    ).
+
+t_restore_topic(_Config) ->
+    ?assertEqual(
+        [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
+        emqx_retainer_index:restore_topic(
+            {[2, 3], {[<<"a">>, <<"b">>], [<<"x">>, <<"y">>]}}
+        )
+    ),
+
+    ?assertEqual(
+        [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
+        emqx_retainer_index:restore_topic(
+            {[3, 4], {[<<"b">>, <<"y">>], [<<"x">>, <<"a">>]}}
+        )
+    ),
+
+    ?assertEqual(
+        [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
+        emqx_retainer_index:restore_topic(
+            {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}}
+        )
+    ).