|
@@ -35,8 +35,7 @@
|
|
|
b2i/1
|
|
b2i/1
|
|
|
]).
|
|
]).
|
|
|
|
|
|
|
|
--export([do_query/5]).
|
|
|
|
|
--export([parse_qstring/2]).
|
|
|
|
|
|
|
+-export([do_query/2, apply_total_query/1]).
|
|
|
|
|
|
|
|
paginate(Tables, Params, {Module, FormatFun}) ->
|
|
paginate(Tables, Params, {Module, FormatFun}) ->
|
|
|
Qh = query_handle(Tables),
|
|
Qh = query_handle(Tables),
|
|
@@ -152,9 +151,9 @@ node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
Meta ->
|
|
Meta ->
|
|
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
ResultAcc = init_query_result(),
|
|
ResultAcc = init_query_result(),
|
|
|
- QueryState = init_query_state(Meta),
|
|
|
|
|
|
|
+ QueryState = init_query_state(Tab, NQString, MsFun, Meta),
|
|
|
NResultAcc = do_node_query(
|
|
NResultAcc = do_node_query(
|
|
|
- Node, Tab, NQString, MsFun, QueryState, ResultAcc
|
|
|
|
|
|
|
+ Node, QueryState, ResultAcc
|
|
|
),
|
|
),
|
|
|
format_query_result(FmtFun, Meta, NResultAcc)
|
|
format_query_result(FmtFun, Meta, NResultAcc)
|
|
|
end.
|
|
end.
|
|
@@ -162,13 +161,10 @@ node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
%% @private
|
|
%% @private
|
|
|
do_node_query(
|
|
do_node_query(
|
|
|
Node,
|
|
Node,
|
|
|
- Tab,
|
|
|
|
|
- QString,
|
|
|
|
|
- MsFun,
|
|
|
|
|
QueryState,
|
|
QueryState,
|
|
|
ResultAcc
|
|
ResultAcc
|
|
|
) ->
|
|
) ->
|
|
|
- case do_query(Node, Tab, QString, MsFun, QueryState) of
|
|
|
|
|
|
|
+ case do_query(Node, QueryState) of
|
|
|
{error, {badrpc, R}} ->
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {badrpc, R}};
|
|
{error, Node, {badrpc, R}};
|
|
|
{Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
|
|
{Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
|
|
@@ -179,7 +175,7 @@ do_node_query(
|
|
|
{enough, NResultAcc} ->
|
|
{enough, NResultAcc} ->
|
|
|
NResultAcc;
|
|
NResultAcc;
|
|
|
{more, NResultAcc} ->
|
|
{more, NResultAcc} ->
|
|
|
- do_node_query(Node, Tab, QString, MsFun, NQueryState, NResultAcc)
|
|
|
|
|
|
|
+ do_node_query(Node, NQueryState, NResultAcc)
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
@@ -201,57 +197,51 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
{_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
|
ResultAcc = init_query_result(),
|
|
ResultAcc = init_query_result(),
|
|
|
- QueryState = init_query_state(Meta),
|
|
|
|
|
|
|
+ QueryState = init_query_state(Tab, NQString, MsFun, Meta),
|
|
|
NResultAcc = do_cluster_query(
|
|
NResultAcc = do_cluster_query(
|
|
|
- Nodes, Tab, NQString, MsFun, QueryState, ResultAcc
|
|
|
|
|
|
|
+ Nodes, QueryState, ResultAcc
|
|
|
),
|
|
),
|
|
|
format_query_result(FmtFun, Meta, NResultAcc)
|
|
format_query_result(FmtFun, Meta, NResultAcc)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
%% @private
|
|
%% @private
|
|
|
-do_cluster_query([], _Tab, _QString, _MsFun, _QueryState, ResultAcc) ->
|
|
|
|
|
|
|
+do_cluster_query([], _QueryState, ResultAcc) ->
|
|
|
ResultAcc;
|
|
ResultAcc;
|
|
|
do_cluster_query(
|
|
do_cluster_query(
|
|
|
[Node | Tail] = Nodes,
|
|
[Node | Tail] = Nodes,
|
|
|
- Tab,
|
|
|
|
|
- QString,
|
|
|
|
|
- MsFun,
|
|
|
|
|
QueryState,
|
|
QueryState,
|
|
|
ResultAcc
|
|
ResultAcc
|
|
|
) ->
|
|
) ->
|
|
|
- case do_query(Node, Tab, QString, MsFun, QueryState) of
|
|
|
|
|
|
|
+ case do_query(Node, QueryState) of
|
|
|
{error, {badrpc, R}} ->
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {badrpc, R}};
|
|
{error, Node, {badrpc, R}};
|
|
|
{Rows, NQueryState} ->
|
|
{Rows, NQueryState} ->
|
|
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
|
|
{enough, NResultAcc} ->
|
|
{enough, NResultAcc} ->
|
|
|
- maybe_collect_total_from_tail_nodes(Tail, Tab, QString, MsFun, NResultAcc);
|
|
|
|
|
|
|
+ maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc);
|
|
|
{more, NResultAcc} ->
|
|
{more, NResultAcc} ->
|
|
|
NextNodes =
|
|
NextNodes =
|
|
|
case NQueryState of
|
|
case NQueryState of
|
|
|
#{continuation := ?FRESH_SELECT} -> Tail;
|
|
#{continuation := ?FRESH_SELECT} -> Tail;
|
|
|
_ -> Nodes
|
|
_ -> Nodes
|
|
|
end,
|
|
end,
|
|
|
- do_cluster_query(NextNodes, Tab, QString, MsFun, NQueryState, NResultAcc)
|
|
|
|
|
|
|
+ do_cluster_query(NextNodes, NQueryState, NResultAcc)
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-maybe_collect_total_from_tail_nodes([], _Tab, _QString, _MsFun, ResultAcc) ->
|
|
|
|
|
|
|
+maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) ->
|
|
|
ResultAcc;
|
|
ResultAcc;
|
|
|
-maybe_collect_total_from_tail_nodes(Nodes, Tab, QString, MsFun, ResultAcc) ->
|
|
|
|
|
- {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
|
|
|
|
- case counting_total_fun(Ms, FuzzyFun) of
|
|
|
|
|
|
|
+maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) ->
|
|
|
|
|
+ case counting_total_fun(QueryState) of
|
|
|
false ->
|
|
false ->
|
|
|
ResultAcc;
|
|
ResultAcc;
|
|
|
_Fun ->
|
|
_Fun ->
|
|
|
- collect_total_from_tail_nodes(Nodes, Tab, Ms, FuzzyFun, ResultAcc)
|
|
|
|
|
|
|
+ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-collect_total_from_tail_nodes(Nodes, Tab, Ms, FuzzyFun, ResultAcc = #{total := TotalAcc}) ->
|
|
|
|
|
|
|
+collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) ->
|
|
|
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
|
|
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
|
|
|
- case
|
|
|
|
|
- rpc:multicall(Nodes, ?MODULE, apply_total_query, [Tab, Ms, FuzzyFun], ?LONG_QUERY_TIMEOUT)
|
|
|
|
|
- of
|
|
|
|
|
|
|
+ case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of
|
|
|
{_, [Node | _]} ->
|
|
{_, [Node | _]} ->
|
|
|
{error, Node, {badrpc, badnode}};
|
|
{error, Node, {badrpc, badnode}};
|
|
|
{ResL0, []} ->
|
|
{ResL0, []} ->
|
|
@@ -272,27 +262,35 @@ collect_total_from_tail_nodes(Nodes, Tab, Ms, FuzzyFun, ResultAcc = #{total := T
|
|
|
%% #{continuation := ets:continuation(),
|
|
%% #{continuation := ets:continuation(),
|
|
|
%% page := pos_integer(),
|
|
%% page := pos_integer(),
|
|
|
%% limit := pos_integer(),
|
|
%% limit := pos_integer(),
|
|
|
-%% total := [{node(), non_neg_integer()}]
|
|
|
|
|
|
|
+%% total := [{node(), non_neg_integer()}],
|
|
|
|
|
+%% table := atom(),
|
|
|
|
|
+%% qs := {Qs, Fuzzy} %% parsed query params
|
|
|
|
|
+%% msfun := query_to_match_spec_fun()
|
|
|
%% }
|
|
%% }
|
|
|
-init_query_state(_Meta = #{page := Page, limit := Limit}) ->
|
|
|
|
|
|
|
+init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
|
|
|
|
|
+ {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
|
|
#{
|
|
#{
|
|
|
- continuation => ?FRESH_SELECT,
|
|
|
|
|
page => Page,
|
|
page => Page,
|
|
|
limit => Limit,
|
|
limit => Limit,
|
|
|
- total => []
|
|
|
|
|
|
|
+ table => Tab,
|
|
|
|
|
+ qs => QString,
|
|
|
|
|
+ msfun => MsFun,
|
|
|
|
|
+ mactch_spec => Ms,
|
|
|
|
|
+ fuzzy_fun => FuzzyFun,
|
|
|
|
|
+ total => [],
|
|
|
|
|
+ continuation => ?FRESH_SELECT
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
%% @private This function is exempt from BPAPI
|
|
%% @private This function is exempt from BPAPI
|
|
|
-do_query(Node, Tab, QString, MsFun, QueryState) when Node =:= node(), is_function(MsFun) ->
|
|
|
|
|
- {Ms, FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
|
|
|
|
- do_select(Node, Tab, Ms, FuzzyFun, QueryState);
|
|
|
|
|
-do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) ->
|
|
|
|
|
|
|
+do_query(Node, QueryState) when Node =:= node() ->
|
|
|
|
|
+ do_select(Node, QueryState);
|
|
|
|
|
+do_query(Node, QueryState) ->
|
|
|
case
|
|
case
|
|
|
rpc:call(
|
|
rpc:call(
|
|
|
Node,
|
|
Node,
|
|
|
?MODULE,
|
|
?MODULE,
|
|
|
do_query,
|
|
do_query,
|
|
|
- [Node, Tab, QString, MsFun, QueryState],
|
|
|
|
|
|
|
+ [Node, QueryState],
|
|
|
?LONG_QUERY_TIMEOUT
|
|
?LONG_QUERY_TIMEOUT
|
|
|
)
|
|
)
|
|
|
of
|
|
of
|
|
@@ -302,18 +300,21 @@ do_query(Node, Tab, QString, MsFun, QueryState) when is_function(MsFun) ->
|
|
|
|
|
|
|
|
do_select(
|
|
do_select(
|
|
|
Node,
|
|
Node,
|
|
|
- Tab,
|
|
|
|
|
- Ms,
|
|
|
|
|
- FuzzyFun,
|
|
|
|
|
- QueryState0 = #{continuation := Continuation, limit := Limit}
|
|
|
|
|
|
|
+ QueryState0 = #{
|
|
|
|
|
+ table := Tab,
|
|
|
|
|
+ mactch_spec := Ms,
|
|
|
|
|
+ fuzzy_fun := FuzzyFun,
|
|
|
|
|
+ continuation := Continuation,
|
|
|
|
|
+ limit := Limit
|
|
|
|
|
+ }
|
|
|
) ->
|
|
) ->
|
|
|
- QueryState = maybe_apply_total_query(Node, Tab, Ms, FuzzyFun, QueryState0),
|
|
|
|
|
|
|
+ QueryState = maybe_apply_total_query(Node, QueryState0),
|
|
|
Result =
|
|
Result =
|
|
|
case Continuation of
|
|
case Continuation of
|
|
|
?FRESH_SELECT ->
|
|
?FRESH_SELECT ->
|
|
|
ets:select(Tab, Ms, Limit);
|
|
ets:select(Tab, Ms, Limit);
|
|
|
_ ->
|
|
_ ->
|
|
|
- ets:select(ets:repair_continuation(Continuation, Ms))
|
|
|
|
|
|
|
+ ets:select(Continuation)
|
|
|
end,
|
|
end,
|
|
|
case Result of
|
|
case Result of
|
|
|
'$end_of_table' ->
|
|
'$end_of_table' ->
|
|
@@ -327,17 +328,17 @@ do_select(
|
|
|
{NRows, QueryState#{continuation => NContinuation}}
|
|
{NRows, QueryState#{continuation => NContinuation}}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-maybe_apply_total_query(Node, Tab, Ms, FuzzyFun, QueryState = #{total := TotalAcc}) ->
|
|
|
|
|
|
|
+maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) ->
|
|
|
case proplists:get_value(Node, TotalAcc, undefined) of
|
|
case proplists:get_value(Node, TotalAcc, undefined) of
|
|
|
undefined ->
|
|
undefined ->
|
|
|
- Total = apply_total_query(Tab, Ms, FuzzyFun),
|
|
|
|
|
|
|
+ Total = apply_total_query(QueryState),
|
|
|
QueryState#{total := [{Node, Total} | TotalAcc]};
|
|
QueryState#{total := [{Node, Total} | TotalAcc]};
|
|
|
_ ->
|
|
_ ->
|
|
|
QueryState
|
|
QueryState
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-apply_total_query(Tab, Ms, FuzzyFun) ->
|
|
|
|
|
- case counting_total_fun(Ms, FuzzyFun) of
|
|
|
|
|
|
|
+apply_total_query(QueryState = #{table := Tab}) ->
|
|
|
|
|
+ case counting_total_fun(QueryState) of
|
|
|
false ->
|
|
false ->
|
|
|
%% return a fake total number if the query have any conditions
|
|
%% return a fake total number if the query have any conditions
|
|
|
0;
|
|
0;
|
|
@@ -345,19 +346,20 @@ apply_total_query(Tab, Ms, FuzzyFun) ->
|
|
|
Fun(Tab)
|
|
Fun(Tab)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-counting_total_fun(Ms, undefined) ->
|
|
|
|
|
|
|
+counting_total_fun(_QueryState = #{qs := {[], []}}) ->
|
|
|
|
|
+ fun(Tab) -> ets:info(Tab, size) end;
|
|
|
|
|
+counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) ->
|
|
|
%% XXX: Calculating the total number of data that match a certain
|
|
%% XXX: Calculating the total number of data that match a certain
|
|
|
%% condition under a large table is very expensive because the
|
|
%% condition under a large table is very expensive because the
|
|
|
%% entire ETS table needs to be scanned.
|
|
%% entire ETS table needs to be scanned.
|
|
|
%%
|
|
%%
|
|
|
%% XXX: How to optimize it? i.e, using:
|
|
%% XXX: How to optimize it? i.e, using:
|
|
|
- %% `fun(Tab) -> ets:info(Tab, size) end`
|
|
|
|
|
[{MatchHead, Conditions, _Return}] = Ms,
|
|
[{MatchHead, Conditions, _Return}] = Ms,
|
|
|
CountingMs = [{MatchHead, Conditions, [true]}],
|
|
CountingMs = [{MatchHead, Conditions, [true]}],
|
|
|
fun(Tab) ->
|
|
fun(Tab) ->
|
|
|
ets:select_count(Tab, CountingMs)
|
|
ets:select_count(Tab, CountingMs)
|
|
|
end;
|
|
end;
|
|
|
-counting_total_fun(_Ms, FuzzyFun) when is_function(FuzzyFun) ->
|
|
|
|
|
|
|
+counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when is_function(FuzzyFun) ->
|
|
|
%% XXX: Calculating the total number for a fuzzy searching is very very expensive
|
|
%% XXX: Calculating the total number for a fuzzy searching is very very expensive
|
|
|
%% so it is not supported now
|
|
%% so it is not supported now
|
|
|
false.
|
|
false.
|