|
|
@@ -21,6 +21,7 @@
|
|
|
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
|
|
|
|
|
|
-define(FRESH_SELECT, fresh_select).
|
|
|
+-define(LONG_QUERY_TIMEOUT, 50000).
|
|
|
|
|
|
-export([
|
|
|
paginate/3,
|
|
|
@@ -29,13 +30,34 @@
|
|
|
|
|
|
%% first_next query APIs
|
|
|
-export([
|
|
|
- node_query/5,
|
|
|
- cluster_query/4,
|
|
|
- select_table_with_count/5,
|
|
|
+ node_query/6,
|
|
|
+ cluster_query/5,
|
|
|
b2i/1
|
|
|
]).
|
|
|
|
|
|
--export([do_query/6]).
|
|
|
+-export_type([
|
|
|
+ match_spec_and_filter/0
|
|
|
+]).
|
|
|
+
|
|
|
+-type query_params() :: list() | map().
|
|
|
+
|
|
|
+-type query_schema() :: [
|
|
|
+ {Key :: binary(), Type :: atom | binary | integer | timestamp | ip | ip_port}
|
|
|
+].
|
|
|
+
|
|
|
+-type query_to_match_spec_fun() :: fun((list(), list()) -> match_spec_and_filter()).
|
|
|
+
|
|
|
+-type match_spec_and_filter() :: #{match_spec := ets:match_spec(), fuzzy_fun := fuzzy_filter_fun()}.
|
|
|
+
|
|
|
+-type fuzzy_filter_fun() :: undefined | {fun(), list()}.
|
|
|
+
|
|
|
+-type format_result_fun() ::
|
|
|
+ fun((node(), term()) -> term())
|
|
|
+ | fun((term()) -> term()).
|
|
|
+
|
|
|
+-type query_return() :: #{meta := map(), data := [term()]}.
|
|
|
+
|
|
|
+-export([do_query/2, apply_total_query/1]).
|
|
|
|
|
|
paginate(Tables, Params, {Module, FormatFun}) ->
|
|
|
Qh = query_handle(Tables),
|
|
|
@@ -117,171 +139,289 @@ limit(Params) when is_map(Params) ->
|
|
|
limit(Params) ->
|
|
|
proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
|
|
|
|
|
|
-init_meta(Params) ->
|
|
|
- Limit = b2i(limit(Params)),
|
|
|
- Page = b2i(page(Params)),
|
|
|
- #{
|
|
|
- page => Page,
|
|
|
- limit => Limit,
|
|
|
- count => 0
|
|
|
- }.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Node Query
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-node_query(Node, QString, Tab, QSchema, QueryFun) ->
|
|
|
- {_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
- page_limit_check_query(
|
|
|
- init_meta(QString),
|
|
|
- {fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]}
|
|
|
- ).
|
|
|
+-spec node_query(
|
|
|
+ node(),
|
|
|
+ atom(),
|
|
|
+ query_params(),
|
|
|
+ query_schema(),
|
|
|
+ query_to_match_spec_fun(),
|
|
|
+ format_result_fun()
|
|
|
+) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
|
|
|
+node_query(Node, Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
+ case parse_pager_params(QString) of
|
|
|
+ false ->
|
|
|
+ {error, page_limit_invalid};
|
|
|
+ Meta ->
|
|
|
+ {_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
+ ResultAcc = init_query_result(),
|
|
|
+ QueryState = init_query_state(Tab, NQString, MsFun, Meta),
|
|
|
+ NResultAcc = do_node_query(
|
|
|
+ Node, QueryState, ResultAcc
|
|
|
+ ),
|
|
|
+ format_query_result(FmtFun, Meta, NResultAcc)
|
|
|
+ end.
|
|
|
|
|
|
%% @private
|
|
|
-do_node_query(Node, Tab, QString, QueryFun, Meta) ->
|
|
|
- do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
|
|
|
-
|
|
|
do_node_query(
|
|
|
Node,
|
|
|
- Tab,
|
|
|
- QString,
|
|
|
- QueryFun,
|
|
|
- Continuation,
|
|
|
- Meta = #{limit := Limit},
|
|
|
- Results
|
|
|
+ QueryState,
|
|
|
+ ResultAcc
|
|
|
) ->
|
|
|
- case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
|
|
|
+ case do_query(Node, QueryState) of
|
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {badrpc, R}};
|
|
|
- {Len, Rows, ?FRESH_SELECT} ->
|
|
|
- {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- #{meta => NMeta, data => NResults};
|
|
|
- {Len, Rows, NContinuation} ->
|
|
|
- {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
|
|
|
+ {Rows, NQueryState = #{continuation := ?FRESH_SELECT}} ->
|
|
|
+ {_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc),
|
|
|
+ NResultAcc;
|
|
|
+ {Rows, NQueryState} ->
|
|
|
+ case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
|
|
+ {enough, NResultAcc} ->
|
|
|
+ NResultAcc;
|
|
|
+ {more, NResultAcc} ->
|
|
|
+ do_node_query(Node, NQueryState, NResultAcc)
|
|
|
+ end
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Cluster Query
|
|
|
%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-cluster_query(QString, Tab, QSchema, QueryFun) ->
|
|
|
- {_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
- Nodes = mria_mnesia:running_nodes(),
|
|
|
- page_limit_check_query(
|
|
|
- init_meta(QString),
|
|
|
- {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]}
|
|
|
- ).
|
|
|
+-spec cluster_query(
|
|
|
+ atom(),
|
|
|
+ query_params(),
|
|
|
+ query_schema(),
|
|
|
+ query_to_match_spec_fun(),
|
|
|
+ format_result_fun()
|
|
|
+) -> {error, page_limit_invalid} | {error, atom(), term()} | query_return().
|
|
|
+cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
+ case parse_pager_params(QString) of
|
|
|
+ false ->
|
|
|
+ {error, page_limit_invalid};
|
|
|
+ Meta ->
|
|
|
+ {_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
+ Nodes = mria_mnesia:running_nodes(),
|
|
|
+ ResultAcc = init_query_result(),
|
|
|
+ QueryState = init_query_state(Tab, NQString, MsFun, Meta),
|
|
|
+ NResultAcc = do_cluster_query(
|
|
|
+ Nodes, QueryState, ResultAcc
|
|
|
+ ),
|
|
|
+ format_query_result(FmtFun, Meta, NResultAcc)
|
|
|
+ end.
|
|
|
|
|
|
%% @private
|
|
|
-do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) ->
|
|
|
- do_cluster_query(
|
|
|
- Nodes,
|
|
|
- Tab,
|
|
|
- QString,
|
|
|
- QueryFun,
|
|
|
- _Continuation = ?FRESH_SELECT,
|
|
|
- Meta,
|
|
|
- _Results = []
|
|
|
- ).
|
|
|
-
|
|
|
-do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) ->
|
|
|
- #{meta => Meta, data => Results};
|
|
|
+do_cluster_query([], _QueryState, ResultAcc) ->
|
|
|
+ ResultAcc;
|
|
|
do_cluster_query(
|
|
|
[Node | Tail] = Nodes,
|
|
|
- Tab,
|
|
|
- QString,
|
|
|
- QueryFun,
|
|
|
- Continuation,
|
|
|
- Meta = #{limit := Limit},
|
|
|
- Results
|
|
|
+ QueryState,
|
|
|
+ ResultAcc
|
|
|
) ->
|
|
|
- case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
|
|
|
+ case do_query(Node, QueryState) of
|
|
|
{error, {badrpc, R}} ->
|
|
|
- {error, Node, {bar_rpc, R}};
|
|
|
- {Len, Rows, ?FRESH_SELECT} ->
|
|
|
- {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults);
|
|
|
- {Len, Rows, NContinuation} ->
|
|
|
- {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
|
|
|
+ {error, Node, {badrpc, R}};
|
|
|
+ {Rows, NQueryState} ->
|
|
|
+ case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
|
|
+ {enough, NResultAcc} ->
|
|
|
+ maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc);
|
|
|
+ {more, NResultAcc} ->
|
|
|
+ NextNodes =
|
|
|
+ case NQueryState of
|
|
|
+ #{continuation := ?FRESH_SELECT} -> Tail;
|
|
|
+ _ -> Nodes
|
|
|
+ end,
|
|
|
+ do_cluster_query(NextNodes, NQueryState, NResultAcc)
|
|
|
+ end
|
|
|
+ end.
|
|
|
+
|
|
|
+maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) ->
|
|
|
+ ResultAcc;
|
|
|
+maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) ->
|
|
|
+ case counting_total_fun(QueryState) of
|
|
|
+ false ->
|
|
|
+ ResultAcc;
|
|
|
+ _Fun ->
|
|
|
+ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc)
|
|
|
+ end.
|
|
|
+
|
|
|
+collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) ->
|
|
|
+ %% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
|
|
|
+ case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of
|
|
|
+ {_, [Node | _]} ->
|
|
|
+ {error, Node, {badrpc, badnode}};
|
|
|
+ {ResL0, []} ->
|
|
|
+ ResL = lists:zip(Nodes, ResL0),
|
|
|
+ case lists:filter(fun({_, I}) -> not is_integer(I) end, ResL) of
|
|
|
+ [{Node, {badrpc, Reason}} | _] ->
|
|
|
+ {error, Node, {badrpc, Reason}};
|
|
|
+ [] ->
|
|
|
+ ResultAcc#{total => ResL ++ TotalAcc}
|
|
|
+ end
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Do Query (or rpc query)
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+%% QueryState ::
|
|
|
+%% #{continuation := ets:continuation(),
|
|
|
+%% page := pos_integer(),
|
|
|
+%% limit := pos_integer(),
|
|
|
+%% total := [{node(), non_neg_integer()}],
|
|
|
+%% table := atom(),
|
|
|
+%% qs := {Qs, Fuzzy} %% parsed query params
|
|
|
+%% msfun := query_to_match_spec_fun()
|
|
|
+%% }
|
|
|
+init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
|
|
|
+ #{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
|
|
|
+ %% assert FuzzyFun type
|
|
|
+ _ =
|
|
|
+ case FuzzyFun of
|
|
|
+ undefined ->
|
|
|
+ ok;
|
|
|
+ {NamedFun, Args} ->
|
|
|
+ true = is_list(Args),
|
|
|
+ {type, external} = erlang:fun_info(NamedFun, type)
|
|
|
+ end,
|
|
|
+ #{
|
|
|
+ page => Page,
|
|
|
+ limit => Limit,
|
|
|
+ table => Tab,
|
|
|
+ qs => QString,
|
|
|
+ msfun => MsFun,
|
|
|
+ mactch_spec => Ms,
|
|
|
+ fuzzy_fun => FuzzyFun,
|
|
|
+ total => [],
|
|
|
+ continuation => ?FRESH_SELECT
|
|
|
+ }.
|
|
|
+
|
|
|
%% @private This function is exempt from BPAPI
|
|
|
-do_query(Node, Tab, QString, {M, F}, Continuation, Limit) when Node =:= node() ->
|
|
|
- erlang:apply(M, F, [Tab, QString, Continuation, Limit]);
|
|
|
-do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
|
|
|
+do_query(Node, QueryState) when Node =:= node() ->
|
|
|
+ do_select(Node, QueryState);
|
|
|
+do_query(Node, QueryState) ->
|
|
|
case
|
|
|
rpc:call(
|
|
|
Node,
|
|
|
?MODULE,
|
|
|
do_query,
|
|
|
- [Node, Tab, QString, QueryFun, Continuation, Limit],
|
|
|
- 50000
|
|
|
+ [Node, QueryState],
|
|
|
+ ?LONG_QUERY_TIMEOUT
|
|
|
)
|
|
|
of
|
|
|
{badrpc, _} = R -> {error, R};
|
|
|
Ret -> Ret
|
|
|
end.
|
|
|
|
|
|
-sub_query_result(Len, Rows, Limit, Results, Meta) ->
|
|
|
- {Flag, NMeta} = judge_page_with_counting(Len, Meta),
|
|
|
- NResults =
|
|
|
- case Flag of
|
|
|
- more ->
|
|
|
- [];
|
|
|
- cutrows ->
|
|
|
- {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta),
|
|
|
- ThisRows = lists:sublist(Rows, SubStart, NeedNowNum),
|
|
|
- lists:sublist(lists:append(Results, ThisRows), SubStart, Limit);
|
|
|
- enough ->
|
|
|
- lists:sublist(lists:append(Results, Rows), 1, Limit)
|
|
|
+do_select(
|
|
|
+ Node,
|
|
|
+ QueryState0 = #{
|
|
|
+ table := Tab,
|
|
|
+ mactch_spec := Ms,
|
|
|
+ fuzzy_fun := FuzzyFun,
|
|
|
+ continuation := Continuation,
|
|
|
+ limit := Limit
|
|
|
+ }
|
|
|
+) ->
|
|
|
+ QueryState = maybe_apply_total_query(Node, QueryState0),
|
|
|
+ Result =
|
|
|
+ case Continuation of
|
|
|
+ ?FRESH_SELECT ->
|
|
|
+ ets:select(Tab, Ms, Limit);
|
|
|
+ _ ->
|
|
|
+ %% XXX: Repair is necessary because we pass Continuation back
|
|
|
+ %% and forth through the nodes in the `do_cluster_query`
|
|
|
+ ets:select(ets:repair_continuation(Continuation, Ms))
|
|
|
end,
|
|
|
- {NMeta, NResults}.
|
|
|
+ case Result of
|
|
|
+ '$end_of_table' ->
|
|
|
+ {[], QueryState#{continuation => ?FRESH_SELECT}};
|
|
|
+ {Rows, NContinuation} ->
|
|
|
+ NRows =
|
|
|
+ case FuzzyFun of
|
|
|
+ undefined ->
|
|
|
+ Rows;
|
|
|
+ {FilterFun, Args0} when is_function(FilterFun), is_list(Args0) ->
|
|
|
+ lists:filter(
|
|
|
+ fun(E) -> erlang:apply(FilterFun, [E | Args0]) end,
|
|
|
+ Rows
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ {NRows, QueryState#{continuation => NContinuation}}
|
|
|
+ end.
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Table Select
|
|
|
-%%--------------------------------------------------------------------
|
|
|
+maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) ->
|
|
|
+ case proplists:get_value(Node, TotalAcc, undefined) of
|
|
|
+ undefined ->
|
|
|
+ Total = apply_total_query(QueryState),
|
|
|
+ QueryState#{total := [{Node, Total} | TotalAcc]};
|
|
|
+ _ ->
|
|
|
+ QueryState
|
|
|
+ end.
|
|
|
|
|
|
-select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when
|
|
|
- is_function(FuzzyFilterFun) andalso Limit > 0
|
|
|
-->
|
|
|
- case ets:select(Tab, Ms, Limit) of
|
|
|
- '$end_of_table' ->
|
|
|
- {0, [], ?FRESH_SELECT};
|
|
|
- {RawResult, NContinuation} ->
|
|
|
- Rows = FuzzyFilterFun(RawResult),
|
|
|
- {length(Rows), lists:map(FmtFun, Rows), NContinuation}
|
|
|
- end;
|
|
|
-select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when
|
|
|
- is_function(FuzzyFilterFun)
|
|
|
-->
|
|
|
- case ets:select(ets:repair_continuation(Continuation, Ms)) of
|
|
|
- '$end_of_table' ->
|
|
|
- {0, [], ?FRESH_SELECT};
|
|
|
- {RawResult, NContinuation} ->
|
|
|
- Rows = FuzzyFilterFun(RawResult),
|
|
|
- {length(Rows), lists:map(FmtFun, Rows), NContinuation}
|
|
|
- end;
|
|
|
-select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when
|
|
|
- Limit > 0
|
|
|
-->
|
|
|
- case ets:select(Tab, Ms, Limit) of
|
|
|
- '$end_of_table' ->
|
|
|
- {0, [], ?FRESH_SELECT};
|
|
|
- {RawResult, NContinuation} ->
|
|
|
- {length(RawResult), lists:map(FmtFun, RawResult), NContinuation}
|
|
|
+apply_total_query(QueryState = #{table := Tab}) ->
|
|
|
+ case counting_total_fun(QueryState) of
|
|
|
+ false ->
|
|
|
+ %% return a fake total number if the query have any conditions
|
|
|
+ 0;
|
|
|
+ Fun ->
|
|
|
+ Fun(Tab)
|
|
|
+ end.
|
|
|
+
|
|
|
+counting_total_fun(_QueryState = #{qs := {[], []}}) ->
|
|
|
+ fun(Tab) -> ets:info(Tab, size) end;
|
|
|
+counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) ->
|
|
|
+ %% XXX: Calculating the total number of data that match a certain
|
|
|
+ %% condition under a large table is very expensive because the
|
|
|
+ %% entire ETS table needs to be scanned.
|
|
|
+ %%
|
|
|
+ %% XXX: How to optimize it? i.e, using:
|
|
|
+ [{MatchHead, Conditions, _Return}] = Ms,
|
|
|
+ CountingMs = [{MatchHead, Conditions, [true]}],
|
|
|
+ fun(Tab) ->
|
|
|
+ ets:select_count(Tab, CountingMs)
|
|
|
end;
|
|
|
-select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) ->
|
|
|
- case ets:select(ets:repair_continuation(Continuation, Ms)) of
|
|
|
- '$end_of_table' ->
|
|
|
- {0, [], ?FRESH_SELECT};
|
|
|
- {RawResult, NContinuation} ->
|
|
|
- {length(RawResult), lists:map(FmtFun, RawResult), NContinuation}
|
|
|
+counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
|
|
|
+ %% XXX: Calculating the total number for a fuzzy searching is very very expensive
|
|
|
+ %% so it is not supported now
|
|
|
+ false.
|
|
|
+
|
|
|
+%% ResultAcc :: #{count := integer(),
|
|
|
+%% cursor := integer(),
|
|
|
+%% rows := [{node(), Rows :: list()}],
|
|
|
+%% total := [{node() => integer()}]
|
|
|
+%% }
|
|
|
+init_query_result() ->
|
|
|
+ #{cursor => 0, count => 0, rows => [], total => []}.
|
|
|
+
|
|
|
+accumulate_query_rows(
|
|
|
+ Node,
|
|
|
+ Rows,
|
|
|
+ _QueryState = #{page := Page, limit := Limit, total := TotalAcc},
|
|
|
+ ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
|
|
|
+) ->
|
|
|
+ PageStart = (Page - 1) * Limit + 1,
|
|
|
+ PageEnd = Page * Limit,
|
|
|
+ Len = length(Rows),
|
|
|
+ case Cursor + Len of
|
|
|
+ NCursor when NCursor < PageStart ->
|
|
|
+ {more, ResultAcc#{cursor => NCursor, total => TotalAcc}};
|
|
|
+ NCursor when NCursor < PageEnd ->
|
|
|
+ {more, ResultAcc#{
|
|
|
+ cursor => NCursor,
|
|
|
+ count => Count + length(Rows),
|
|
|
+ total => TotalAcc,
|
|
|
+ rows => [{Node, Rows} | RowsAcc]
|
|
|
+ }};
|
|
|
+ NCursor when NCursor >= PageEnd ->
|
|
|
+ SubRows = lists:sublist(Rows, Limit - Count),
|
|
|
+ {enough, ResultAcc#{
|
|
|
+ cursor => NCursor,
|
|
|
+ count => Count + length(SubRows),
|
|
|
+ total => TotalAcc,
|
|
|
+ rows => [{Node, SubRows} | RowsAcc]
|
|
|
+ }}
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -295,6 +435,7 @@ parse_qstring(QString, QSchema) ->
|
|
|
{length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}.
|
|
|
|
|
|
do_parse_qstring([], _, Acc1, Acc2) ->
|
|
|
+ %% remove fuzzy keys if present in accurate query
|
|
|
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
|
|
|
{lists:reverse(Acc1), lists:reverse(NAcc2)};
|
|
|
do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) ->
|
|
|
@@ -379,40 +520,41 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
|
|
|
is_fuzzy_key(_) ->
|
|
|
false.
|
|
|
|
|
|
-page_start(1, _) -> 1;
|
|
|
-page_start(Page, Limit) -> (Page - 1) * Limit + 1.
|
|
|
+format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) ->
|
|
|
+ Error;
|
|
|
+format_query_result(
|
|
|
+ FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc}
|
|
|
+) ->
|
|
|
+ Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc),
|
|
|
+ #{
|
|
|
+ %% The `count` is used in HTTP API to indicate the total number of
|
|
|
+ %% queries that can be read
|
|
|
+ meta => Meta#{count => Total},
|
|
|
+ data => lists:flatten(
|
|
|
+ lists:foldl(
|
|
|
+ fun({Node, Rows}, Acc) ->
|
|
|
+ [lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc]
|
|
|
+ end,
|
|
|
+ [],
|
|
|
+ RowsAcc
|
|
|
+ )
|
|
|
+ )
|
|
|
+ }.
|
|
|
|
|
|
-judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) ->
|
|
|
- PageStart = page_start(Page, Limit),
|
|
|
- PageEnd = Page * Limit,
|
|
|
- case Count + Len of
|
|
|
- NCount when NCount < PageStart ->
|
|
|
- {more, Meta#{count => NCount}};
|
|
|
- NCount when NCount < PageEnd ->
|
|
|
- {cutrows, Meta#{count => NCount}};
|
|
|
- NCount when NCount >= PageEnd ->
|
|
|
- {enough, Meta#{count => NCount}}
|
|
|
+exec_format_fun(FmtFun, Node, Row) ->
|
|
|
+ case erlang:fun_info(FmtFun, arity) of
|
|
|
+ {arity, 1} -> FmtFun(Row);
|
|
|
+ {arity, 2} -> FmtFun(Node, Row)
|
|
|
end.
|
|
|
|
|
|
-rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) ->
|
|
|
- PageStart = page_start(Page, Limit),
|
|
|
- case (Count - Len) < PageStart of
|
|
|
+parse_pager_params(Params) ->
|
|
|
+ Page = b2i(page(Params)),
|
|
|
+ Limit = b2i(limit(Params)),
|
|
|
+ case Page > 0 andalso Limit > 0 of
|
|
|
true ->
|
|
|
- NeedNowNum = Count - PageStart + 1,
|
|
|
- SubStart = Len - NeedNowNum + 1,
|
|
|
- {SubStart, NeedNowNum};
|
|
|
+ #{page => Page, limit => Limit, count => 0};
|
|
|
false ->
|
|
|
- {_SubStart = 1, _NeedNowNum = Len}
|
|
|
- end.
|
|
|
-
|
|
|
-page_limit_check_query(Meta, {F, A}) ->
|
|
|
- case Meta of
|
|
|
- #{page := Page, limit := Limit} when
|
|
|
- Page < 1; Limit < 1
|
|
|
- ->
|
|
|
- {error, page_limit_invalid};
|
|
|
- _ ->
|
|
|
- erlang:apply(F, A)
|
|
|
+ false
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -458,6 +600,11 @@ to_ip_port(IPAddress) ->
|
|
|
Port = list_to_integer(Port0),
|
|
|
{IP, Port}.
|
|
|
|
|
|
+b2i(Bin) when is_binary(Bin) ->
|
|
|
+ binary_to_integer(Bin);
|
|
|
+b2i(Any) ->
|
|
|
+ Any.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% EUnits
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -502,8 +649,3 @@ params2qs_test() ->
|
|
|
{0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
|
|
|
|
|
|
-endif.
|
|
|
-
|
|
|
-b2i(Bin) when is_binary(Bin) ->
|
|
|
- binary_to_integer(Bin);
|
|
|
-b2i(Any) ->
|
|
|
- Any.
|