|
|
@@ -199,35 +199,31 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
|
|
|
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
|
|
|
ok.
|
|
|
|
|
|
-prepare_batch(
|
|
|
- _ShardId,
|
|
|
- S = #s{trie = Trie, threshold_fun = TFun},
|
|
|
- Operations,
|
|
|
- _Options
|
|
|
-) ->
|
|
|
+prepare_batch(_ShardId, S, Operations, _Options) ->
|
|
|
_ = erase(?lts_persist_ops),
|
|
|
- OperationsCooked = emqx_utils:flattermap(
|
|
|
- fun
|
|
|
- ({Timestamp, Msg = #message{topic = Topic}}) ->
|
|
|
- Tokens = words(Topic),
|
|
|
- {Static, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
|
|
|
- ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
|
|
|
- ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
|
|
|
- case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
|
|
|
- {ok, {Static, Varying}} ->
|
|
|
- ?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete);
|
|
|
- undefined ->
|
|
|
- %% Topic is unknown, nothing to delete.
|
|
|
- []
|
|
|
- end
|
|
|
- end,
|
|
|
- Operations
|
|
|
- ),
|
|
|
+ OperationsCooked = cook(S, Operations, []),
|
|
|
{ok, #{
|
|
|
?cooked_msg_ops => OperationsCooked,
|
|
|
?cooked_lts_ops => pop_lts_persist_ops()
|
|
|
}}.
|
|
|
|
|
|
+cook(_, [], Acc) ->
|
|
|
+ lists:reverse(Acc);
|
|
|
+cook(S, [{Timestamp, Msg = #message{topic = Topic}} | Rest], Acc) ->
|
|
|
+ #s{trie = Trie, threshold_fun = TFun} = S,
|
|
|
+ Tokens = words(Topic),
|
|
|
+ {Static, Varying} = emqx_ds_lts:topic_key(Trie, TFun, Tokens),
|
|
|
+ cook(S, Rest, [?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg)) | Acc]);
|
|
|
+cook(S, [{delete, #message_matcher{topic = Topic, timestamp = Timestamp}} | Rest], Acc) ->
|
|
|
+ #s{trie = Trie} = S,
|
|
|
+ case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
|
|
|
+ {ok, {Static, Varying}} ->
|
|
|
+ cook(S, Rest, [?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete) | Acc]);
|
|
|
+ undefined ->
|
|
|
+ %% Topic is unknown, nothing to delete.
|
|
|
+ cook(S, Rest, Acc)
|
|
|
+ end.
|
|
|
+
|
|
|
commit_batch(
|
|
|
ShardId,
|
|
|
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
|