|
|
@@ -20,7 +20,6 @@
|
|
|
|
|
|
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
|
|
|
|
|
|
--define(FRESH_SELECT, fresh_select).
|
|
|
-define(LONG_QUERY_TIMEOUT, 50000).
|
|
|
|
|
|
-export([
|
|
|
@@ -174,13 +173,12 @@ do_node_query(
|
|
|
case do_query(Node, QueryState) of
|
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {badrpc, R}};
|
|
|
- {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
|
|
|
- {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc),
|
|
|
- NResultAcc;
|
|
|
- {Rows, NQueryState} ->
|
|
|
+ {Rows, NQueryState = #{complete := Complete}} ->
|
|
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
|
|
{enough, NResultAcc} ->
|
|
|
- NResultAcc;
|
|
|
+ finalize_query(NResultAcc, NQueryState);
|
|
|
+ {_, NResultAcc} when Complete ->
|
|
|
+ finalize_query(NResultAcc, NQueryState);
|
|
|
{more, NResultAcc} ->
|
|
|
do_node_query(Node, NQueryState, NResultAcc)
|
|
|
end
|
|
|
@@ -212,8 +210,8 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
end.
|
|
|
|
|
|
%% @private
|
|
|
-do_cluster_query([], _QueryState, ResultAcc) ->
|
|
|
- ResultAcc;
|
|
|
+do_cluster_query([], QueryState, ResultAcc) ->
|
|
|
+ finalize_query(ResultAcc, mark_complete(QueryState));
|
|
|
do_cluster_query(
|
|
|
[Node | Tail] = Nodes,
|
|
|
QueryState,
|
|
|
@@ -222,31 +220,29 @@ do_cluster_query(
|
|
|
case do_query(Node, QueryState) of
|
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {badrpc, R}};
|
|
|
- {Rows, NQueryState} ->
|
|
|
+ {Rows, NQueryState = #{complete := Complete}} ->
|
|
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
|
|
{enough, NResultAcc} ->
|
|
|
- maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc);
|
|
|
+ FQueryState = maybe_collect_total_from_tail_nodes(Tail, NQueryState),
|
|
|
+ FComplete = Complete andalso Tail =:= [],
|
|
|
+ finalize_query(NResultAcc, mark_complete(FQueryState, FComplete));
|
|
|
+ {more, NResultAcc} when not Complete ->
|
|
|
+ do_cluster_query(Nodes, NQueryState, NResultAcc);
|
|
|
+ {more, NResultAcc} when Tail =/= [] ->
|
|
|
+ do_cluster_query(Tail, reset_query_state(NQueryState), NResultAcc);
|
|
|
{more, NResultAcc} ->
|
|
|
- NextNodes =
|
|
|
- case NQueryState of
|
|
|
- #{continuation := ?FRESH_SELECT} -> Tail;
|
|
|
- _ -> Nodes
|
|
|
- end,
|
|
|
- do_cluster_query(NextNodes, NQueryState, NResultAcc)
|
|
|
+ finalize_query(NResultAcc, NQueryState)
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) ->
|
|
|
- ResultAcc;
|
|
|
-maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) ->
|
|
|
- case counting_total_fun(QueryState) of
|
|
|
- false ->
|
|
|
- ResultAcc;
|
|
|
- _Fun ->
|
|
|
- collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc)
|
|
|
- end.
|
|
|
+maybe_collect_total_from_tail_nodes([], QueryState) ->
|
|
|
+ QueryState;
|
|
|
+maybe_collect_total_from_tail_nodes(Nodes, QueryState = #{total := _}) ->
|
|
|
+ collect_total_from_tail_nodes(Nodes, QueryState);
|
|
|
+maybe_collect_total_from_tail_nodes(_Nodes, QueryState) ->
|
|
|
+ QueryState.
|
|
|
|
|
|
-collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) ->
|
|
|
+collect_total_from_tail_nodes(Nodes, QueryState = #{total := TotalAcc}) ->
|
|
|
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
|
|
|
case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of
|
|
|
{_, [Node | _]} ->
|
|
|
@@ -257,7 +253,8 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc
|
|
|
[{Node, {badrpc, Reason}} | _] ->
|
|
|
{error, Node, {badrpc, Reason}};
|
|
|
[] ->
|
|
|
- ResultAcc#{total => ResL ++ TotalAcc}
|
|
|
+ NTotalAcc = maps:merge(TotalAcc, maps:from_list(ResL)),
|
|
|
+ QueryState#{total := NTotalAcc}
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
@@ -266,13 +263,14 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
%% QueryState ::
|
|
|
-%% #{continuation := ets:continuation(),
|
|
|
+%% #{continuation => ets:continuation(),
|
|
|
%% page := pos_integer(),
|
|
|
%% limit := pos_integer(),
|
|
|
-%% total := [{node(), non_neg_integer()}],
|
|
|
+%% total => #{node() => non_neg_integer()},
|
|
|
%% table := atom(),
|
|
|
-%% qs := {Qs, Fuzzy} %% parsed query params
|
|
|
-%% msfun := query_to_match_spec_fun()
|
|
|
+%% qs := {Qs, Fuzzy}, %% parsed query params
|
|
|
+%% msfun := query_to_match_spec_fun(),
|
|
|
+%% complete := boolean()
|
|
|
%% }
|
|
|
init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
|
|
|
#{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
|
|
@@ -285,17 +283,31 @@ init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -
|
|
|
true = is_list(Args),
|
|
|
{type, external} = erlang:fun_info(NamedFun, type)
|
|
|
end,
|
|
|
- #{
|
|
|
+ QueryState = #{
|
|
|
page => Page,
|
|
|
limit => Limit,
|
|
|
table => Tab,
|
|
|
qs => QString,
|
|
|
msfun => MsFun,
|
|
|
- mactch_spec => Ms,
|
|
|
+ match_spec => Ms,
|
|
|
fuzzy_fun => FuzzyFun,
|
|
|
- total => [],
|
|
|
- continuation => ?FRESH_SELECT
|
|
|
- }.
|
|
|
+ complete => false
|
|
|
+ },
|
|
|
+ case counting_total_fun(QueryState) of
|
|
|
+ false ->
|
|
|
+ QueryState;
|
|
|
+ Fun when is_function(Fun) ->
|
|
|
+ QueryState#{total => #{}}
|
|
|
+ end.
|
|
|
+
|
|
|
+reset_query_state(QueryState) ->
|
|
|
+ maps:remove(continuation, mark_complete(QueryState, false)).
|
|
|
+
|
|
|
+mark_complete(QueryState) ->
|
|
|
+ mark_complete(QueryState, true).
|
|
|
+
|
|
|
+mark_complete(QueryState, Complete) ->
|
|
|
+ QueryState#{complete => Complete}.
|
|
|
|
|
|
%% @private This function is exempt from BPAPI
|
|
|
do_query(Node, QueryState) when Node =:= node() ->
|
|
|
@@ -318,47 +330,50 @@ do_select(
|
|
|
Node,
|
|
|
QueryState0 = #{
|
|
|
table := Tab,
|
|
|
- mactch_spec := Ms,
|
|
|
- fuzzy_fun := FuzzyFun,
|
|
|
- continuation := Continuation,
|
|
|
- limit := Limit
|
|
|
+ match_spec := Ms,
|
|
|
+ limit := Limit,
|
|
|
+ complete := false
|
|
|
}
|
|
|
) ->
|
|
|
QueryState = maybe_apply_total_query(Node, QueryState0),
|
|
|
Result =
|
|
|
- case Continuation of
|
|
|
- ?FRESH_SELECT ->
|
|
|
+ case maps:get(continuation, QueryState, undefined) of
|
|
|
+ undefined ->
|
|
|
ets:select(Tab, Ms, Limit);
|
|
|
- _ ->
|
|
|
+ Continuation ->
|
|
|
%% XXX: Repair is necessary because we pass Continuation back
|
|
|
%% and forth through the nodes in the `do_cluster_query`
|
|
|
ets:select(ets:repair_continuation(Continuation, Ms))
|
|
|
end,
|
|
|
case Result of
|
|
|
- '$end_of_table' ->
|
|
|
- {[], QueryState#{continuation => ?FRESH_SELECT}};
|
|
|
+ {Rows, '$end_of_table'} ->
|
|
|
+ NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
|
|
|
+ {NRows, mark_complete(QueryState)};
|
|
|
{Rows, NContinuation} ->
|
|
|
- NRows =
|
|
|
- case FuzzyFun of
|
|
|
- undefined ->
|
|
|
- Rows;
|
|
|
- {FilterFun, Args0} when is_function(FilterFun), is_list(Args0) ->
|
|
|
- lists:filter(
|
|
|
- fun(E) -> erlang:apply(FilterFun, [E | Args0]) end,
|
|
|
- Rows
|
|
|
- )
|
|
|
- end,
|
|
|
- {NRows, QueryState#{continuation => NContinuation}}
|
|
|
+ NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
|
|
|
+ {NRows, QueryState#{continuation => NContinuation}};
|
|
|
+ '$end_of_table' ->
|
|
|
+ {[], mark_complete(QueryState)}
|
|
|
end.
|
|
|
|
|
|
-maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) ->
|
|
|
- case proplists:get_value(Node, TotalAcc, undefined) of
|
|
|
- undefined ->
|
|
|
- Total = apply_total_query(QueryState),
|
|
|
- QueryState#{total := [{Node, Total} | TotalAcc]};
|
|
|
- _ ->
|
|
|
- QueryState
|
|
|
- end.
|
|
|
+maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := undefined}) ->
|
|
|
+ Rows;
|
|
|
+maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := {FilterFun, Args}}) ->
|
|
|
+ lists:filter(
|
|
|
+ fun(E) -> erlang:apply(FilterFun, [E | Args]) end,
|
|
|
+ Rows
|
|
|
+ ).
|
|
|
+
|
|
|
+maybe_apply_total_query(Node, QueryState = #{total := Acc}) ->
|
|
|
+ case Acc of
|
|
|
+ #{Node := _} ->
|
|
|
+ QueryState;
|
|
|
+ #{} ->
|
|
|
+ NodeTotal = apply_total_query(QueryState),
|
|
|
+ QueryState#{total := Acc#{Node => NodeTotal}}
|
|
|
+ end;
|
|
|
+maybe_apply_total_query(_Node, QueryState = #{}) ->
|
|
|
+ QueryState.
|
|
|
|
|
|
apply_total_query(QueryState = #{table := Tab}) ->
|
|
|
case counting_total_fun(QueryState) of
|
|
|
@@ -371,7 +386,7 @@ apply_total_query(QueryState = #{table := Tab}) ->
|
|
|
|
|
|
counting_total_fun(_QueryState = #{qs := {[], []}}) ->
|
|
|
fun(Tab) -> ets:info(Tab, size) end;
|
|
|
-counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) ->
|
|
|
+counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
|
|
|
%% XXX: Calculating the total number of data that match a certain
|
|
|
%% condition under a large table is very expensive because the
|
|
|
%% entire ETS table needs to be scanned.
|
|
|
@@ -390,15 +405,16 @@ counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= und
|
|
|
%% ResultAcc :: #{count := integer(),
|
|
|
%% cursor := integer(),
|
|
|
%% rows := [{node(), Rows :: list()}],
|
|
|
-%% total := [{node() => integer()}]
|
|
|
+%% partial := boolean(),
|
|
|
+%% hasnext => boolean()
|
|
|
%% }
|
|
|
init_query_result() ->
|
|
|
- #{cursor => 0, count => 0, rows => [], total => []}.
|
|
|
+ #{cursor => 0, count => 0, rows => [], partial => false}.
|
|
|
|
|
|
accumulate_query_rows(
|
|
|
Node,
|
|
|
Rows,
|
|
|
- _QueryState = #{page := Page, limit := Limit, total := TotalAcc},
|
|
|
+ _QueryState = #{page := Page, limit := Limit},
|
|
|
ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
|
|
|
) ->
|
|
|
PageStart = (Page - 1) * Limit + 1,
|
|
|
@@ -406,24 +422,34 @@ accumulate_query_rows(
|
|
|
Len = length(Rows),
|
|
|
case Cursor + Len of
|
|
|
NCursor when NCursor < PageStart ->
|
|
|
- {more, ResultAcc#{cursor => NCursor, total => TotalAcc}};
|
|
|
+ {more, ResultAcc#{cursor => NCursor}};
|
|
|
NCursor when NCursor < PageEnd ->
|
|
|
+ SubRows = lists:nthtail(max(0, PageStart - Cursor - 1), Rows),
|
|
|
{more, ResultAcc#{
|
|
|
cursor => NCursor,
|
|
|
- count => Count + length(Rows),
|
|
|
- total => TotalAcc,
|
|
|
- rows => [{Node, Rows} | RowsAcc]
|
|
|
+ count => Count + length(SubRows),
|
|
|
+ rows => [{Node, SubRows} | RowsAcc]
|
|
|
}};
|
|
|
NCursor when NCursor >= PageEnd ->
|
|
|
SubRows = lists:sublist(Rows, Limit - Count),
|
|
|
{enough, ResultAcc#{
|
|
|
cursor => NCursor,
|
|
|
count => Count + length(SubRows),
|
|
|
- total => TotalAcc,
|
|
|
- rows => [{Node, SubRows} | RowsAcc]
|
|
|
+ rows => [{Node, SubRows} | RowsAcc],
|
|
|
+ partial => (Limit - Count) < Len
|
|
|
}}
|
|
|
end.
|
|
|
|
|
|
+finalize_query(Result = #{partial := Partial}, QueryState = #{complete := Complete}) ->
|
|
|
+ HasNext = Partial orelse not Complete,
|
|
|
+ maybe_accumulate_totals(Result#{hasnext => HasNext}, QueryState).
|
|
|
+
|
|
|
+maybe_accumulate_totals(Result, #{total := TotalAcc}) ->
|
|
|
+ QueryTotal = maps:fold(fun(_Node, T, N) -> N + T end, 0, TotalAcc),
|
|
|
+ Result#{total => QueryTotal};
|
|
|
+maybe_accumulate_totals(Result, _QueryState) ->
|
|
|
+ Result.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal Functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -520,16 +546,22 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
|
|
|
is_fuzzy_key(_) ->
|
|
|
false.
|
|
|
|
|
|
-format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
|
|
|
+format_query_result(_FmtFun, _MetaIn, Error = {error, _Node, _Reason}) ->
|
|
|
Error;
|
|
|
format_query_result(
|
|
|
- FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc}
|
|
|
+ FmtFun, MetaIn, ResultAcc = #{hasnext := HasNext, rows := RowsAcc}
|
|
|
) ->
|
|
|
- Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc),
|
|
|
+ Meta =
|
|
|
+ case ResultAcc of
|
|
|
+ #{total := QueryTotal} ->
|
|
|
+ %% The `count` is used in HTTP API to indicate the total number of
|
|
|
+ %% queries that can be read
|
|
|
+ MetaIn#{hasnext => HasNext, count => QueryTotal};
|
|
|
+ #{} ->
|
|
|
+ MetaIn#{hasnext => HasNext}
|
|
|
+ end,
|
|
|
#{
|
|
|
- %% The `count` is used in HTTP API to indicate the total number of
|
|
|
- %% queries that can be read
|
|
|
- meta => Meta#{count => Total},
|
|
|
+ meta => Meta,
|
|
|
data => lists:flatten(
|
|
|
lists:foldl(
|
|
|
fun({Node, Rows}, Acc) ->
|
|
|
@@ -552,7 +584,7 @@ parse_pager_params(Params) ->
|
|
|
Limit = b2i(limit(Params)),
|
|
|
case Page > 0 andalso Limit > 0 of
|
|
|
true ->
|
|
|
- #{page => Page, limit => Limit, count => 0};
|
|
|
+ #{page => Page, limit => Limit};
|
|
|
false ->
|
|
|
false
|
|
|
end.
|