|
|
@@ -23,7 +23,6 @@
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
--include_lib("stdlib/include/qlc.hrl").
|
|
|
|
|
|
%% emqx_retainer callbacks
|
|
|
-export([
|
|
|
@@ -64,7 +63,6 @@
|
|
|
|
|
|
-define(META_KEY, index_meta).
|
|
|
|
|
|
--define(CLEAR_BATCH_SIZE, 1000).
|
|
|
-define(REINDEX_BATCH_SIZE, 1000).
|
|
|
-define(REINDEX_DISPATCH_WAIT, 30000).
|
|
|
-define(REINDEX_RPC_RETRY_INTERVAL, 1000).
|
|
|
@@ -177,15 +175,20 @@ clear_expired(_) ->
|
|
|
|
|
|
clear_expired() ->
|
|
|
NowMs = erlang:system_time(millisecond),
|
|
|
- QH = qlc:q([
|
|
|
- RetainedMsg
|
|
|
- || #retained_message{
|
|
|
- expiry_time = ExpiryTime
|
|
|
- } = RetainedMsg <- ets:table(?TAB_MESSAGE),
|
|
|
- (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
|
|
|
- ]),
|
|
|
- QC = qlc:cursor(QH),
|
|
|
- clear_batch(dirty_indices(write), QC).
|
|
|
+ S0 = ets_stream(?TAB_MESSAGE),
|
|
|
+ S1 = stream_filter(
|
|
|
+ fun(#retained_message{expiry_time = ExpiryTime}) ->
|
|
|
+ ExpiryTime =/= 0 andalso ExpiryTime < NowMs
|
|
|
+ end,
|
|
|
+ S0
|
|
|
+ ),
|
|
|
+ DirtyWriteIndices = dirty_indices(write),
|
|
|
+ stream_foreach(
|
|
|
+ fun(RetainedMsg) ->
|
|
|
+ delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
|
|
|
+ end,
|
|
|
+ S1
|
|
|
+ ).
|
|
|
|
|
|
delete_message(_State, Topic) ->
|
|
|
Tokens = topic_to_tokens(Topic),
|
|
|
@@ -193,13 +196,11 @@ delete_message(_State, Topic) ->
|
|
|
false ->
|
|
|
ok = delete_message_by_topic(Tokens, dirty_indices(write));
|
|
|
true ->
|
|
|
- QH = search_table(Tokens, 0),
|
|
|
- qlc:fold(
|
|
|
- fun(RetainedMsg, _) ->
|
|
|
- ok = delete_message_with_indices(RetainedMsg, dirty_indices(write))
|
|
|
- end,
|
|
|
- undefined,
|
|
|
- QH
|
|
|
+ S = search_stream(Tokens, 0),
|
|
|
+ DirtyWriteIndices = dirty_indices(write),
|
|
|
+ stream_foreach(
|
|
|
+ fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end,
|
|
|
+ S
|
|
|
)
|
|
|
end.
|
|
|
|
|
|
@@ -209,59 +210,52 @@ read_message(_State, Topic) ->
|
|
|
match_messages(State, Topic, undefined) ->
|
|
|
Tokens = topic_to_tokens(Topic),
|
|
|
Now = erlang:system_time(millisecond),
|
|
|
- QH = msg_table(search_table(Tokens, Now)),
|
|
|
+ S = msg_stream(search_stream(Tokens, Now)),
|
|
|
case batch_read_number() of
|
|
|
all_remaining ->
|
|
|
- {ok, qlc:eval(QH), undefined};
|
|
|
+ {ok, emqx_utils_stream:consume(S), undefined};
|
|
|
BatchNum when is_integer(BatchNum) ->
|
|
|
- Cursor = qlc:cursor(QH),
|
|
|
- match_messages(State, Topic, {Cursor, BatchNum})
|
|
|
+ match_messages(undefined, Topic, {S, BatchNum})
|
|
|
end;
|
|
|
-match_messages(_State, _Topic, {Cursor, BatchNum}) ->
|
|
|
- case qlc_next_answers(Cursor, BatchNum) of
|
|
|
- {closed, Rows} ->
|
|
|
- {ok, Rows, undefined};
|
|
|
- {more, Rows} ->
|
|
|
- {ok, Rows, {Cursor, BatchNum}}
|
|
|
+match_messages(_State, _Topic, {S0, BatchNum}) ->
|
|
|
+ case emqx_utils_stream:consume(BatchNum, S0) of
|
|
|
+ {Rows, S1} ->
|
|
|
+ {ok, Rows, {S1, BatchNum}};
|
|
|
+ Rows when is_list(Rows) ->
|
|
|
+ {ok, Rows, undefined}
|
|
|
end.
|
|
|
|
|
|
-delete_cursor(_State, {Cursor, _}) ->
|
|
|
- qlc:delete_cursor(Cursor);
|
|
|
-delete_cursor(_State, undefined) ->
|
|
|
+delete_cursor(_State, _Cursor) ->
|
|
|
ok.
|
|
|
|
|
|
page_read(_State, Topic, Page, Limit) ->
|
|
|
Now = erlang:system_time(millisecond),
|
|
|
- QH =
|
|
|
+ S0 =
|
|
|
case Topic of
|
|
|
undefined ->
|
|
|
- msg_table(search_table(undefined, ['#'], Now));
|
|
|
+ msg_stream(search_stream(undefined, ['#'], Now));
|
|
|
_ ->
|
|
|
Tokens = topic_to_tokens(Topic),
|
|
|
- msg_table(search_table(Tokens, Now))
|
|
|
+ msg_stream(search_stream(Tokens, Now))
|
|
|
end,
|
|
|
- OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
|
|
|
- Cursor = qlc:cursor(OrderedQH),
|
|
|
+ %% This is very inefficient, but we are limited with inherited API
|
|
|
+ S1 = emqx_utils_stream:list(
|
|
|
+ lists:sort(
|
|
|
+ fun compare_message/2,
|
|
|
+ emqx_utils_stream:consume(S0)
|
|
|
+ )
|
|
|
+ ),
|
|
|
NSkip = (Page - 1) * Limit,
|
|
|
- SkipResult =
|
|
|
- case NSkip > 0 of
|
|
|
- true ->
|
|
|
- {Result, _} = qlc_next_answers(Cursor, NSkip),
|
|
|
- Result;
|
|
|
- false ->
|
|
|
- more
|
|
|
- end,
|
|
|
- case SkipResult of
|
|
|
- closed ->
|
|
|
- {ok, false, []};
|
|
|
- more ->
|
|
|
- case qlc_next_answers(Cursor, Limit) of
|
|
|
- {closed, Rows} ->
|
|
|
- {ok, false, Rows};
|
|
|
- {more, Rows} ->
|
|
|
- qlc:delete_cursor(Cursor),
|
|
|
- {ok, true, Rows}
|
|
|
- end
|
|
|
+ case emqx_utils_stream:consume(NSkip, S1) of
|
|
|
+ {_, S2} ->
|
|
|
+ case emqx_utils_stream:consume(Limit, S2) of
|
|
|
+ {Rows, _S3} ->
|
|
|
+ {ok, true, Rows};
|
|
|
+ Rows when is_list(Rows) ->
|
|
|
+ {ok, false, Rows}
|
|
|
+ end;
|
|
|
+ Rows when is_list(Rows) ->
|
|
|
+ {ok, false, []}
|
|
|
end.
|
|
|
|
|
|
clean(_) ->
|
|
|
@@ -333,58 +327,63 @@ do_store_retained_index(Key, ExpiryTime) ->
|
|
|
},
|
|
|
mnesia:write(?TAB_INDEX, RetainedIndex, write).
|
|
|
|
|
|
-msg_table(SearchTable) ->
|
|
|
- qlc:q([
|
|
|
- Msg
|
|
|
- || #retained_message{
|
|
|
- msg = Msg
|
|
|
- } <- SearchTable
|
|
|
- ]).
|
|
|
+msg_stream(SearchStream) ->
|
|
|
+ emqx_utils_stream:map(
|
|
|
+ fun(#retained_message{msg = Msg}) -> Msg end,
|
|
|
+ SearchStream
|
|
|
+ ).
|
|
|
|
|
|
-search_table(Tokens, Now) ->
|
|
|
+search_stream(Tokens, Now) ->
|
|
|
Indices = dirty_indices(read),
|
|
|
Index = emqx_retainer_index:select_index(Tokens, Indices),
|
|
|
- search_table(Index, Tokens, Now).
|
|
|
+ search_stream(Index, Tokens, Now).
|
|
|
|
|
|
-search_table(undefined, Tokens, Now) ->
|
|
|
+search_stream(undefined, Tokens, Now) ->
|
|
|
Ms = make_message_match_spec(Tokens, Now),
|
|
|
- ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
|
|
|
-search_table(Index, FilterTokens, Now) ->
|
|
|
+ emqx_utils_stream:ets(
|
|
|
+ fun
|
|
|
+ (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1);
|
|
|
+ (Cont) -> ets:select(Cont)
|
|
|
+ end
|
|
|
+ );
|
|
|
+search_stream(Index, FilterTokens, Now) ->
|
|
|
{Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
|
|
|
- Topics = [
|
|
|
- emqx_retainer_index:restore_topic(Key)
|
|
|
- || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
|
|
|
- ],
|
|
|
- RetainedMsgQH = qlc:q([
|
|
|
- ets:lookup(?TAB_MESSAGE, TopicTokens)
|
|
|
- || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens)
|
|
|
- ]),
|
|
|
- qlc:q([
|
|
|
- RetainedMsg
|
|
|
- || [
|
|
|
- #retained_message{
|
|
|
- expiry_time = ExpiryTime
|
|
|
- } = RetainedMsg
|
|
|
- ] <- RetainedMsgQH,
|
|
|
- (ExpiryTime == 0) or (ExpiryTime > Now)
|
|
|
- ]).
|
|
|
+ IndexRecordStream = emqx_utils_stream:ets(
|
|
|
+ fun
|
|
|
+ (undefined) -> ets:select(?TAB_INDEX, Ms, 1);
|
|
|
+ (Cont) -> ets:select(Cont)
|
|
|
+ end
|
|
|
+ ),
|
|
|
+ TopicStream = emqx_utils_stream:map(
|
|
|
+ fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end,
|
|
|
+ IndexRecordStream
|
|
|
+ ),
|
|
|
+ MatchingTopicStream = stream_filter(
|
|
|
+ fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end,
|
|
|
+ TopicStream
|
|
|
+ ),
|
|
|
+ LookedUpRetainMsgStream = emqx_utils_stream:map(
|
|
|
+ fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end,
|
|
|
+ MatchingTopicStream
|
|
|
+ ),
|
|
|
+ FoundAndValidLookedUpRetainMsgStream = stream_filter(
|
|
|
+ fun
|
|
|
+ ([#retained_message{expiry_time = ExpiryTime}]) ->
|
|
|
+ ExpiryTime =:= 0 orelse ExpiryTime >= Now;
|
|
|
+ ([]) ->
|
|
|
+ false
|
|
|
+ end,
|
|
|
+ LookedUpRetainMsgStream
|
|
|
+ ),
|
|
|
+ RetainMsgStream = emqx_utils_stream:map(
|
|
|
+ fun([RetainedMsg]) -> RetainedMsg end,
|
|
|
+ FoundAndValidLookedUpRetainMsgStream
|
|
|
+ ),
|
|
|
+ RetainMsgStream.
|
|
|
|
|
|
match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
|
|
|
match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
|
|
|
|
|
|
-clear_batch(Indices, QC) ->
|
|
|
- {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
|
|
|
- lists:foreach(
|
|
|
- fun(RetainedMsg) ->
|
|
|
- delete_message_with_indices(RetainedMsg, Indices)
|
|
|
- end,
|
|
|
- Rows
|
|
|
- ),
|
|
|
- case Result of
|
|
|
- closed -> ok;
|
|
|
- more -> clear_batch(Indices, QC)
|
|
|
- end.
|
|
|
-
|
|
|
delete_message_by_topic(TopicTokens, Indices) ->
|
|
|
case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
|
|
|
[] -> ok;
|
|
|
@@ -424,21 +423,6 @@ read_messages(Topic) ->
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-qlc_next_answers(QC, N) ->
|
|
|
- case qlc:next_answers(QC, N) of
|
|
|
- NextAnswers when
|
|
|
- is_list(NextAnswers) andalso
|
|
|
- length(NextAnswers) < N
|
|
|
- ->
|
|
|
- qlc:delete_cursor(QC),
|
|
|
- {closed, NextAnswers};
|
|
|
- NextAnswers when is_list(NextAnswers) ->
|
|
|
- {more, NextAnswers};
|
|
|
- {error, Module, Reason} ->
|
|
|
- qlc:delete_cursor(QC),
|
|
|
- error({qlc_error, Module, Reason})
|
|
|
- end.
|
|
|
-
|
|
|
make_message_match_spec(Tokens, NowMs) ->
|
|
|
Cond = emqx_retainer_index:condition(Tokens),
|
|
|
MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
|
|
|
@@ -567,9 +551,11 @@ reindex(NewIndices, Force, StatusFun) when
|
|
|
{atomic, ok} = mria:clear_table(?TAB_INDEX),
|
|
|
|
|
|
%% Fill index records in batches.
|
|
|
- QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
|
|
|
-
|
|
|
- ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
|
|
|
+ TopicStream = emqx_utils_stream:map(
|
|
|
+ fun(#retained_message{topic = Topic}) -> Topic end,
|
|
|
+ ets_stream(?TAB_MESSAGE)
|
|
|
+ ),
|
|
|
+ ok = reindex_batch(TopicStream, 0, StatusFun),
|
|
|
|
|
|
%% Enable read indices and unlock reindexing.
|
|
|
finalize_reindex();
|
|
|
@@ -647,12 +633,12 @@ reindex_topic(Indices, Topic) ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
-reindex_batch(QC, Done, StatusFun) ->
|
|
|
- case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of
|
|
|
- {atomic, {more, NewDone}} ->
|
|
|
+reindex_batch(Stream0, Done, StatusFun) ->
|
|
|
+ case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of
|
|
|
+ {atomic, {more, NewDone, Stream1}} ->
|
|
|
_ = StatusFun(NewDone),
|
|
|
- reindex_batch(QC, NewDone, StatusFun);
|
|
|
- {atomic, {closed, NewDone}} ->
|
|
|
+ reindex_batch(Stream1, NewDone, StatusFun);
|
|
|
+ {atomic, {done, NewDone}} ->
|
|
|
_ = StatusFun(NewDone),
|
|
|
ok;
|
|
|
{aborted, Reason} ->
|
|
|
@@ -663,14 +649,26 @@ reindex_batch(QC, Done, StatusFun) ->
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
|
|
|
-do_reindex_batch(QC, Done) ->
|
|
|
+do_reindex_batch(Stream0, Done) ->
|
|
|
Indices = db_indices(write),
|
|
|
- {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
|
|
|
+ Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0),
|
|
|
+ Topics =
|
|
|
+ case Result of
|
|
|
+ {Rows, _Stream1} ->
|
|
|
+ Rows;
|
|
|
+ Rows when is_list(Rows) ->
|
|
|
+ Rows
|
|
|
+ end,
|
|
|
ok = lists:foreach(
|
|
|
fun(Topic) -> reindex_topic(Indices, Topic) end,
|
|
|
Topics
|
|
|
),
|
|
|
- {Status, Done + length(Topics)}.
|
|
|
+ case Result of
|
|
|
+ {_Rows, Stream1} ->
|
|
|
+ {more, Done + length(Topics), Stream1};
|
|
|
+ _Rows ->
|
|
|
+ {done, Done + length(Topics)}
|
|
|
+ end.
|
|
|
|
|
|
wait_dispatch_complete(Timeout) ->
|
|
|
Nodes = mria:running_nodes(),
|
|
|
@@ -706,3 +704,37 @@ are_indices_updated(Indices) ->
|
|
|
_ ->
|
|
|
false
|
|
|
end.
|
|
|
+
|
|
|
+ets_stream(Tab) ->
|
|
|
+ emqx_utils_stream:ets(
|
|
|
+ fun
|
|
|
+ (undefined) -> ets:match_object(Tab, '$1', 1);
|
|
|
+ (Cont) -> ets:match_object(Cont)
|
|
|
+ end
|
|
|
+ ).
|
|
|
+
|
|
|
+stream_foreach(F, S) ->
|
|
|
+ case emqx_utils_stream:next(S) of
|
|
|
+ [X | Rest] ->
|
|
|
+ F(X),
|
|
|
+ stream_foreach(F, Rest);
|
|
|
+ [] ->
|
|
|
+ ok
|
|
|
+ end.
|
|
|
+
|
|
|
+%% TODO: move to emqx_utils_stream
|
|
|
+stream_filter(F, S) ->
|
|
|
+ FilterNext = fun FilterNext(St) ->
|
|
|
+ case emqx_utils_stream:next(St) of
|
|
|
+ [X | Rest] ->
|
|
|
+ case F(X) of
|
|
|
+ true ->
|
|
|
+ [X | stream_filter(F, Rest)];
|
|
|
+ false ->
|
|
|
+ FilterNext(Rest)
|
|
|
+ end;
|
|
|
+ [] ->
|
|
|
+ []
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ fun() -> FilterNext(S) end.
|