|
|
@@ -38,11 +38,9 @@
|
|
|
|
|
|
%% Internal exports (RPC)
|
|
|
-export([
|
|
|
- do_store_retained/1,
|
|
|
- do_clear_expired/0,
|
|
|
- do_delete_message/1,
|
|
|
do_populate_index_meta/1,
|
|
|
- do_reindex_batch/2
|
|
|
+ do_reindex_batch/2,
|
|
|
+ active_indices/0
|
|
|
]).
|
|
|
|
|
|
%% Management API:
|
|
|
@@ -66,6 +64,8 @@
|
|
|
-define(CLEAR_BATCH_SIZE, 1000).
|
|
|
-define(REINDEX_BATCH_SIZE, 1000).
|
|
|
-define(REINDEX_DISPATCH_WAIT, 30000).
|
|
|
+-define(REINDEX_RPC_RETRY_INTERVAL, 1000).
|
|
|
+-define(REINDEX_INDEX_UPDATE_WAIT, 30000).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Management API
|
|
|
@@ -136,64 +136,41 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
|
|
|
end.
|
|
|
|
|
|
store_retained(_, Msg = #message{topic = Topic}) ->
|
|
|
- case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of
|
|
|
- {atomic, ok} ->
|
|
|
- ?tp(debug, message_retained, #{topic => Topic}),
|
|
|
- ok;
|
|
|
- {aborted, Reason} ->
|
|
|
+ ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
|
|
+ Tokens = topic_to_tokens(Topic),
|
|
|
+ case is_table_full() andalso is_new_topic(Tokens) of
|
|
|
+ true ->
|
|
|
?SLOG(error, #{
|
|
|
msg => "failed_to_retain_message",
|
|
|
topic => Topic,
|
|
|
- reason => Reason
|
|
|
- })
|
|
|
- end.
|
|
|
-
|
|
|
-do_store_retained(#message{topic = Topic} = Msg) ->
|
|
|
- ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
|
|
- Tokens = topic_to_tokens(Topic),
|
|
|
- case is_table_full() of
|
|
|
+ reason => table_is_full
|
|
|
+ });
|
|
|
false ->
|
|
|
- store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
|
|
|
- _ ->
|
|
|
- case mnesia:read(?TAB_MESSAGE, Tokens, write) of
|
|
|
- [_] ->
|
|
|
- store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
|
|
|
- [] ->
|
|
|
- mnesia:abort(table_is_full)
|
|
|
- end
|
|
|
+ do_store_retained(Msg, Tokens, ExpiryTime)
|
|
|
end.
|
|
|
|
|
|
clear_expired(_) ->
|
|
|
- {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
|
|
|
- ok.
|
|
|
-
|
|
|
-do_clear_expired() ->
|
|
|
NowMs = erlang:system_time(millisecond),
|
|
|
QH = qlc:q([
|
|
|
- TopicTokens
|
|
|
+ RetainedMsg
|
|
|
|| #retained_message{
|
|
|
- topic = TopicTokens,
|
|
|
expiry_time = ExpiryTime
|
|
|
- } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
|
|
|
+ } = RetainedMsg <- ets:table(?TAB_MESSAGE),
|
|
|
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
|
|
|
]),
|
|
|
QC = qlc:cursor(QH),
|
|
|
- clear_batch(db_indices(write), QC).
|
|
|
+ clear_batch(dirty_indices(write), QC).
|
|
|
|
|
|
delete_message(_, Topic) ->
|
|
|
- {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]),
|
|
|
- ok.
|
|
|
-
|
|
|
-do_delete_message(Topic) ->
|
|
|
Tokens = topic_to_tokens(Topic),
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
false ->
|
|
|
- ok = delete_message_by_topic(Tokens, db_indices(write));
|
|
|
+ ok = delete_message_by_topic(Tokens, dirty_indices(write));
|
|
|
true ->
|
|
|
- QH = topic_search_table(Tokens),
|
|
|
+ QH = search_table(Tokens, 0),
|
|
|
qlc:fold(
|
|
|
- fun(TopicTokens, _) ->
|
|
|
- ok = delete_message_by_topic(TopicTokens, db_indices(write))
|
|
|
+ fun(RetainedMsg, _) ->
|
|
|
+ ok = delete_message_with_indices(RetainedMsg, dirty_indices(write))
|
|
|
end,
|
|
|
undefined,
|
|
|
QH
|
|
|
@@ -206,7 +183,7 @@ read_message(_, Topic) ->
|
|
|
match_messages(_, Topic, undefined) ->
|
|
|
Tokens = topic_to_tokens(Topic),
|
|
|
Now = erlang:system_time(millisecond),
|
|
|
- QH = search_table(Tokens, Now),
|
|
|
+ QH = msg_table(search_table(Tokens, Now)),
|
|
|
case batch_read_number() of
|
|
|
all_remaining ->
|
|
|
{ok, qlc:eval(QH), undefined};
|
|
|
@@ -227,10 +204,10 @@ page_read(_, Topic, Page, Limit) ->
|
|
|
QH =
|
|
|
case Topic of
|
|
|
undefined ->
|
|
|
- search_table(undefined, ['#'], Now);
|
|
|
+ msg_table(search_table(undefined, ['#'], Now));
|
|
|
_ ->
|
|
|
Tokens = topic_to_tokens(Topic),
|
|
|
- search_table(Tokens, Now)
|
|
|
+ msg_table(search_table(Tokens, Now))
|
|
|
end,
|
|
|
OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
|
|
|
Cursor = qlc:cursor(OrderedQH),
|
|
|
@@ -281,49 +258,49 @@ reindex_status() ->
|
|
|
%% Internal functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-store_retained(Indices, Msg, Tokens, ExpiryTime) ->
|
|
|
- ok = store_retained_message(Msg, Tokens, ExpiryTime),
|
|
|
- ok = emqx_retainer_index:foreach_index_key(
|
|
|
- fun(Key) -> store_retained_index(Key, ExpiryTime) end,
|
|
|
- Indices,
|
|
|
- Tokens
|
|
|
- ).
|
|
|
-
|
|
|
-store_retained_message(Msg, Tokens, ExpiryTime) ->
|
|
|
+do_store_retained(Msg, TopicTokens, ExpiryTime) ->
|
|
|
+ %% Retained message is stored syncronously on all core nodes
|
|
|
+ ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime),
|
|
|
+ %% Since retained message was stored syncronously on all core nodes,
|
|
|
+ %% now we are sure that
|
|
|
+ %% * either we will write correct indices
|
|
|
+ %% * or if we a replicant with outdated write indices due to reindexing,
|
|
|
+ %% the correct indices will be added by reindexing
|
|
|
+ ok = do_store_retained_indices(TopicTokens, ExpiryTime).
|
|
|
+
|
|
|
+do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
|
|
|
RetainedMessage = #retained_message{
|
|
|
- topic = Tokens,
|
|
|
+ topic = TopicTokens,
|
|
|
msg = Msg,
|
|
|
expiry_time = ExpiryTime
|
|
|
},
|
|
|
- mnesia:write(?TAB_MESSAGE, RetainedMessage, write).
|
|
|
+ ok = mria:dirty_write_sync(?TAB_MESSAGE, RetainedMessage).
|
|
|
|
|
|
-store_retained_index(Key, ExpiryTime) ->
|
|
|
+do_store_retained_indices(TopicTokens, ExpiryTime) ->
|
|
|
+ Indices = dirty_indices(write),
|
|
|
+ ok = emqx_retainer_index:foreach_index_key(
|
|
|
+ fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
|
|
+ Indices,
|
|
|
+ TopicTokens
|
|
|
+ ).
|
|
|
+
|
|
|
+do_store_retained_index(Key, ExpiryTime) ->
|
|
|
RetainedIndex = #retained_index{
|
|
|
key = Key,
|
|
|
expiry_time = ExpiryTime
|
|
|
},
|
|
|
- mnesia:write(?TAB_INDEX, RetainedIndex, write).
|
|
|
-
|
|
|
-topic_search_table(Tokens) ->
|
|
|
- Index = emqx_retainer_index:select_index(Tokens, db_indices(read)),
|
|
|
- topic_search_table(Index, Tokens).
|
|
|
+ mria:dirty_write(?TAB_INDEX, RetainedIndex).
|
|
|
|
|
|
-topic_search_table(undefined, Tokens) ->
|
|
|
- Cond = emqx_retainer_index:condition(Tokens),
|
|
|
- Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}],
|
|
|
- MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]),
|
|
|
- qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]);
|
|
|
-topic_search_table(Index, Tokens) ->
|
|
|
- Cond = emqx_retainer_index:condition(Index, Tokens),
|
|
|
- Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}],
|
|
|
- IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]),
|
|
|
+msg_table(SearchTable) ->
|
|
|
qlc:q([
|
|
|
- emqx_retainer_index:restore_topic(Key)
|
|
|
- || #retained_index{key = Key} <- IndexQH
|
|
|
+ Msg
|
|
|
+ || #retained_message{
|
|
|
+ msg = Msg
|
|
|
+ } <- SearchTable
|
|
|
]).
|
|
|
|
|
|
search_table(Tokens, Now) ->
|
|
|
- Indices = dirty_read_indices(),
|
|
|
+ Indices = dirty_indices(read),
|
|
|
Index = emqx_retainer_index:select_index(Tokens, Indices),
|
|
|
search_table(Index, Tokens, Now).
|
|
|
|
|
|
@@ -341,26 +318,21 @@ search_table(Index, Tokens, Now) ->
|
|
|
|| TopicTokens <- Topics
|
|
|
]),
|
|
|
qlc:q([
|
|
|
- Msg
|
|
|
+ RetainedMsg
|
|
|
|| [
|
|
|
#retained_message{
|
|
|
- msg = Msg,
|
|
|
expiry_time = ExpiryTime
|
|
|
- }
|
|
|
+ } = RetainedMsg
|
|
|
] <- RetainedMsgQH,
|
|
|
(ExpiryTime == 0) or (ExpiryTime > Now)
|
|
|
]).
|
|
|
|
|
|
-dirty_read_indices() ->
|
|
|
- case ets:lookup(?TAB_INDEX_META, ?META_KEY) of
|
|
|
- [#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices;
|
|
|
- [] -> []
|
|
|
- end.
|
|
|
-
|
|
|
clear_batch(Indices, QC) ->
|
|
|
{Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
|
|
|
lists:foreach(
|
|
|
- fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end,
|
|
|
+ fun(RetainedMsg) ->
|
|
|
+ delete_message_with_indices(RetainedMsg, Indices)
|
|
|
+ end,
|
|
|
Rows
|
|
|
),
|
|
|
case Result of
|
|
|
@@ -369,14 +341,23 @@ clear_batch(Indices, QC) ->
|
|
|
end.
|
|
|
|
|
|
delete_message_by_topic(TopicTokens, Indices) ->
|
|
|
+ case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
|
|
|
+ [] -> ok;
|
|
|
+ [RetainedMsg] -> delete_message_with_indices(RetainedMsg, Indices)
|
|
|
+ end.
|
|
|
+
|
|
|
+delete_message_with_indices(RetainedMsg, Indices) ->
|
|
|
+ #retained_message{topic = TopicTokens, expiry_time = ExpiryTime} = RetainedMsg,
|
|
|
ok = emqx_retainer_index:foreach_index_key(
|
|
|
fun(Key) ->
|
|
|
- mnesia:delete({?TAB_INDEX, Key})
|
|
|
+ mria:dirty_delete_object(?TAB_INDEX, #retained_index{
|
|
|
+ key = Key, expiry_time = ExpiryTime
|
|
|
+ })
|
|
|
end,
|
|
|
Indices,
|
|
|
TopicTokens
|
|
|
),
|
|
|
- ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}).
|
|
|
+ ok = mria:dirty_delete_object(?TAB_MESSAGE, RetainedMsg).
|
|
|
|
|
|
compare_message(M1, M2) ->
|
|
|
M1#message.timestamp =< M2#message.timestamp.
|
|
|
@@ -415,20 +396,26 @@ qlc_next_answers(QC, N) ->
|
|
|
|
|
|
make_message_match_spec(Tokens, NowMs) ->
|
|
|
Cond = emqx_retainer_index:condition(Tokens),
|
|
|
- MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'},
|
|
|
- [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}].
|
|
|
+ MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
|
|
|
+ [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
|
|
|
|
|
|
make_index_match_spec(Index, Tokens, NowMs) ->
|
|
|
Cond = emqx_retainer_index:condition(Index, Tokens),
|
|
|
MsHd = #retained_index{key = Cond, expiry_time = '$3'},
|
|
|
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
|
|
|
|
|
|
--spec is_table_full() -> boolean().
|
|
|
is_table_full() ->
|
|
|
Limit = emqx:get_config([retainer, backend, max_retained_messages]),
|
|
|
Limit > 0 andalso (table_size() >= Limit).
|
|
|
|
|
|
--spec table_size() -> non_neg_integer().
|
|
|
+is_new_topic(Tokens) ->
|
|
|
+ case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
|
|
|
+ [_] ->
|
|
|
+ false;
|
|
|
+ [] ->
|
|
|
+ true
|
|
|
+ end.
|
|
|
+
|
|
|
table_size() ->
|
|
|
mnesia:table_info(?TAB_MESSAGE, size).
|
|
|
|
|
|
@@ -486,8 +473,14 @@ do_populate_index_meta(ConfigIndices) ->
|
|
|
)
|
|
|
end.
|
|
|
|
|
|
+dirty_indices(Type) ->
|
|
|
+ indices(ets:lookup(?TAB_INDEX_META, ?META_KEY), Type).
|
|
|
+
|
|
|
db_indices(Type) ->
|
|
|
- case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
|
|
|
+ indices(mnesia:read(?TAB_INDEX_META, ?META_KEY), Type).
|
|
|
+
|
|
|
+indices(IndexRecords, Type) ->
|
|
|
+ case IndexRecords of
|
|
|
[#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
|
|
|
case Type of
|
|
|
read -> ReadIndices;
|
|
|
@@ -506,10 +499,15 @@ batch_read_number() ->
|
|
|
reindex(NewIndices, Force, StatusFun) when
|
|
|
is_boolean(Force) andalso is_function(StatusFun, 1)
|
|
|
->
|
|
|
+ %% Do not run on replicants
|
|
|
+ core = mria_rlog:role(),
|
|
|
%% Disable read indices and update write indices so that new records are written
|
|
|
%% with correct indices. Also block parallel reindexing.
|
|
|
case try_start_reindex(NewIndices, Force) of
|
|
|
{atomic, ok} ->
|
|
|
+ %% Wait for all nodes to have new indices, including rlog nodes
|
|
|
+ true = wait_indices_updated({[], NewIndices}, ?REINDEX_INDEX_UPDATE_WAIT),
|
|
|
+
|
|
|
%% Wait for all dispatch operations to be completed to avoid
|
|
|
%% inconsistent results.
|
|
|
true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
|
|
|
@@ -592,7 +590,7 @@ reindex_topic(Indices, Topic) ->
|
|
|
case mnesia:read(?TAB_MESSAGE, Topic, read) of
|
|
|
[#retained_message{expiry_time = ExpiryTime}] ->
|
|
|
ok = emqx_retainer_index:foreach_index_key(
|
|
|
- fun(Key) -> store_retained_index(Key, ExpiryTime) end,
|
|
|
+ fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
|
|
Indices,
|
|
|
Topic
|
|
|
);
|
|
|
@@ -627,8 +625,35 @@ do_reindex_batch(QC, Done) ->
|
|
|
|
|
|
wait_dispatch_complete(Timeout) ->
|
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
|
- {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),
|
|
|
+ {Results, []} = emqx_retainer_proto_v2:wait_dispatch_complete(Nodes, Timeout),
|
|
|
lists:all(
|
|
|
fun(Result) -> Result =:= ok end,
|
|
|
Results
|
|
|
).
|
|
|
+
|
|
|
+wait_indices_updated(_Indices, TimeLeft) when TimeLeft < 0 -> false;
|
|
|
+wait_indices_updated(Indices, TimeLeft) ->
|
|
|
+ case timer:tc(fun() -> are_indices_updated(Indices) end) of
|
|
|
+ {_, true} ->
|
|
|
+ true;
|
|
|
+ {TimePassed, false} ->
|
|
|
+ timer:sleep(?REINDEX_RPC_RETRY_INTERVAL),
|
|
|
+ wait_indices_updated(
|
|
|
+ Indices, TimeLeft - ?REINDEX_RPC_RETRY_INTERVAL - TimePassed / 1000
|
|
|
+ )
|
|
|
+ end.
|
|
|
+
|
|
|
+active_indices() ->
|
|
|
+ {dirty_indices(read), dirty_indices(write)}.
|
|
|
+
|
|
|
+are_indices_updated(Indices) ->
|
|
|
+ Nodes = mria_mnesia:running_nodes(),
|
|
|
+ case emqx_retainer_proto_v2:active_mnesia_indices(Nodes) of
|
|
|
+ {Results, []} ->
|
|
|
+ lists:all(
|
|
|
+ fun(NodeIndices) -> NodeIndices =:= Indices end,
|
|
|
+ Results
|
|
|
+ );
|
|
|
+ _ ->
|
|
|
+ false
|
|
|
+ end.
|