|
|
@@ -263,12 +263,22 @@ reindex_status() ->
|
|
|
|
|
|
do_store_retained(Msg, TopicTokens, ExpiryTime) ->
|
|
|
%% Retained message is stored syncronously on all core nodes
|
|
|
+ %%
|
|
|
+ %% No transaction, meaning that concurrent writes in the cluster may
|
|
|
+ %% lead to inconsistent replicas. This could manifest in two clients
|
|
|
+ %% getting different retained messages for the same topic, depending
|
|
|
+ %% on which node they are connected to. We tolerate that.
|
|
|
ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime),
|
|
|
%% Since retained message was stored syncronously on all core nodes,
|
|
|
%% now we are sure that
|
|
|
%% * either we will write correct indices
|
|
|
%% * or if we a replicant with outdated write indices due to reindexing,
|
|
|
%% the correct indices will be added by reindexing
|
|
|
+ %%
|
|
|
+ %% No transacation as well, meaning that concurrent writes in the cluster
|
|
|
+ %% may lead to inconsistent index replicas. This essentially allows for
|
|
|
+ %% inconsistent query results, where index entry has different expiry time
|
|
|
+ %% than the message it points to.
|
|
|
ok = do_store_retained_indices(TopicTokens, ExpiryTime).
|
|
|
|
|
|
do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
|
|
|
@@ -281,18 +291,20 @@ do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
|
|
|
|
|
|
do_store_retained_indices(TopicTokens, ExpiryTime) ->
|
|
|
Indices = dirty_indices(write),
|
|
|
- ok = emqx_retainer_index:foreach_index_key(
|
|
|
- fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
|
|
- Indices,
|
|
|
- TopicTokens
|
|
|
- ).
|
|
|
+ ok = mria:async_dirty(?RETAINER_SHARD, fun() ->
|
|
|
+ emqx_retainer_index:foreach_index_key(
|
|
|
+ fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
|
|
+ Indices,
|
|
|
+ TopicTokens
|
|
|
+ )
|
|
|
+ end).
|
|
|
|
|
|
do_store_retained_index(Key, ExpiryTime) ->
|
|
|
RetainedIndex = #retained_index{
|
|
|
key = Key,
|
|
|
expiry_time = ExpiryTime
|
|
|
},
|
|
|
- mria:dirty_write(?TAB_INDEX, RetainedIndex).
|
|
|
+ mnesia:write(?TAB_INDEX, RetainedIndex, write).
|
|
|
|
|
|
msg_table(SearchTable) ->
|
|
|
qlc:q([
|