|
|
@@ -132,19 +132,20 @@ init_meta(Params) ->
|
|
|
%% Node Query
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-node_query(Node, Params, Tab, QsSchema, QueryFun) ->
|
|
|
- {_CodCnt, Qs} = params2qs(Params, QsSchema),
|
|
|
- page_limit_check_query(init_meta(Params),
|
|
|
- {fun do_node_query/5, [Node, Tab, Qs, QueryFun, init_meta(Params)]}).
|
|
|
+node_query(Node, QString, Tab, QSchema, QueryFun) ->
|
|
|
+ {_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
+ page_limit_check_query( init_meta(QString)
|
|
|
+ , { fun do_node_query/5
|
|
|
+ , [Node, Tab, NQString, QueryFun, init_meta(QString)]}).
|
|
|
|
|
|
%% @private
|
|
|
-do_node_query(Node, Tab, Qs, QueryFun, Meta) ->
|
|
|
- do_node_query(Node, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
|
|
|
+do_node_query(Node, Tab, QString, QueryFun, Meta) ->
|
|
|
+ do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
|
|
|
|
|
|
-do_node_query( Node, Tab, Qs, QueryFun, Continuation
|
|
|
+do_node_query( Node, Tab, QString, QueryFun, Continuation
|
|
|
, Meta = #{limit := Limit}
|
|
|
, Results) ->
|
|
|
- case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of
|
|
|
+ case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
|
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {badrpc, R}};
|
|
|
{Len, Rows, ?FRESH_SELECT} ->
|
|
|
@@ -152,36 +153,38 @@ do_node_query( Node, Tab, Qs, QueryFun, Continuation
|
|
|
#{meta => NMeta, data => NResults};
|
|
|
{Len, Rows, NContinuation} ->
|
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
|
|
|
+ do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Cluster Query
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-cluster_query(Params, Tab, QsSchema, QueryFun) ->
|
|
|
- {_CodCnt, Qs} = params2qs(Params, QsSchema),
|
|
|
+cluster_query(QString, Tab, QSchema, QueryFun) ->
|
|
|
+ {_CodCnt, NQString} = parse_qstring(QString, QSchema),
|
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
|
- page_limit_check_query(init_meta(Params),
|
|
|
- {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, init_meta(Params)]}).
|
|
|
+ page_limit_check_query(
|
|
|
+ init_meta(QString)
|
|
|
+ , {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]}).
|
|
|
|
|
|
%% @private
|
|
|
-do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) ->
|
|
|
- do_cluster_query(Nodes, Tab, Qs, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []).
|
|
|
+do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) ->
|
|
|
+ do_cluster_query( Nodes, Tab, QString, QueryFun
|
|
|
+ , _Continuation = ?FRESH_SELECT, Meta, _Results = []).
|
|
|
|
|
|
-do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) ->
|
|
|
+do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) ->
|
|
|
#{meta => Meta, data => Results};
|
|
|
-do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
|
|
|
+do_cluster_query([Node | Tail] = Nodes, Tab, QString, QueryFun, Continuation,
|
|
|
Meta = #{limit := Limit}, Results) ->
|
|
|
- case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of
|
|
|
+ case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of
|
|
|
{error, {badrpc, R}} ->
|
|
|
{error, Node, {bar_rpc, R}};
|
|
|
{Len, Rows, ?FRESH_SELECT} ->
|
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- do_cluster_query(Tail, Tab, Qs, QueryFun, ?FRESH_SELECT, NMeta, NResults);
|
|
|
+ do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults);
|
|
|
{Len, Rows, NContinuation} ->
|
|
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
|
|
- do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
|
|
|
+ do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults)
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -189,11 +192,11 @@ do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
%% @private This function is exempt from BPAPI
|
|
|
-do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() ->
|
|
|
- erlang:apply(M, F, [Tab, Qs, Continuation, Limit]);
|
|
|
-do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) ->
|
|
|
+do_query(Node, Tab, QString, {M,F}, Continuation, Limit) when Node =:= node() ->
|
|
|
+ erlang:apply(M, F, [Tab, QString, Continuation, Limit]);
|
|
|
+do_query(Node, Tab, QString, QueryFun, Continuation, Limit) ->
|
|
|
case rpc:call(Node, ?MODULE, do_query,
|
|
|
- [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000) of
|
|
|
+ [Node, Tab, QString, QueryFun, Continuation, Limit], 50000) of
|
|
|
{badrpc, _} = R -> {error, R};
|
|
|
Ret -> Ret
|
|
|
end.
|
|
|
@@ -255,19 +258,19 @@ select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) ->
|
|
|
%% Internal Functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-params2qs(Params, QsSchema) when is_map(Params) ->
|
|
|
- params2qs(maps:to_list(Params), QsSchema);
|
|
|
-params2qs(Params, QsSchema) ->
|
|
|
- {Qs, Fuzzy} = pick_params_to_qs(Params, QsSchema, [], []),
|
|
|
- {length(Qs) + length(Fuzzy), {Qs, Fuzzy}}.
|
|
|
+parse_qstring(QString, QSchema) when is_map(QString) ->
|
|
|
+ parse_qstring(maps:to_list(QString), QSchema);
|
|
|
+parse_qstring(QString, QSchema) ->
|
|
|
+ {NQString, FuzzyQString} = do_parse_qstring(QString, QSchema, [], []),
|
|
|
+ {length(NQString) + length(FuzzyQString), {NQString, FuzzyQString}}.
|
|
|
|
|
|
-pick_params_to_qs([], _, Acc1, Acc2) ->
|
|
|
+do_parse_qstring([], _, Acc1, Acc2) ->
|
|
|
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
|
|
|
{lists:reverse(Acc1), lists:reverse(NAcc2)};
|
|
|
|
|
|
-pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) ->
|
|
|
- case proplists:get_value(Key, QsSchema) of
|
|
|
- undefined -> pick_params_to_qs(Params, QsSchema, Acc1, Acc2);
|
|
|
+do_parse_qstring([{Key, Value} | RestQString], QSchema, Acc1, Acc2) ->
|
|
|
+ case proplists:get_value(Key, QSchema) of
|
|
|
+ undefined -> do_parse_qstring(RestQString, QSchema, Acc1, Acc2);
|
|
|
Type ->
|
|
|
case Key of
|
|
|
<<Prefix:4/binary, NKey/binary>>
|
|
|
@@ -277,22 +280,22 @@ pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) ->
|
|
|
<<"gte_">> -> <<"lte_", NKey/binary>>;
|
|
|
<<"lte_">> -> <<"gte_", NKey/binary>>
|
|
|
end,
|
|
|
- case lists:keytake(OpposeKey, 1, Params) of
|
|
|
+ case lists:keytake(OpposeKey, 1, RestQString) of
|
|
|
false ->
|
|
|
- pick_params_to_qs(Params, QsSchema,
|
|
|
- [qs(Key, Value, Type) | Acc1], Acc2);
|
|
|
+ do_parse_qstring( RestQString, QSchema
|
|
|
+ , [qs(Key, Value, Type) | Acc1], Acc2);
|
|
|
{value, {K2, V2}, NParams} ->
|
|
|
- pick_params_to_qs(NParams, QsSchema,
|
|
|
- [qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
|
|
|
+ do_parse_qstring( NParams, QSchema
|
|
|
+ , [qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
|
|
|
end;
|
|
|
_ ->
|
|
|
case is_fuzzy_key(Key) of
|
|
|
true ->
|
|
|
- pick_params_to_qs(Params, QsSchema, Acc1,
|
|
|
- [qs(Key, Value, Type) | Acc2]);
|
|
|
+ do_parse_qstring( RestQString, QSchema
|
|
|
+ , Acc1, [qs(Key, Value, Type) | Acc2]);
|
|
|
_ ->
|
|
|
- pick_params_to_qs(Params, QsSchema,
|
|
|
- [qs(Key, Value, Type) | Acc1], Acc2)
|
|
|
+ do_parse_qstring( RestQString, QSchema
|
|
|
+ , [qs(Key, Value, Type) | Acc1], Acc2)
|
|
|
|
|
|
end
|
|
|
end
|
|
|
@@ -397,7 +400,6 @@ to_integer(B) when is_binary(B) ->
|
|
|
to_timestamp(I) when is_integer(I) ->
|
|
|
I;
|
|
|
to_timestamp(B) when is_binary(B) ->
|
|
|
-
|
|
|
binary_to_integer(B).
|
|
|
|
|
|
aton(B) when is_binary(B) ->
|
|
|
@@ -417,7 +419,7 @@ to_ip_port(IPAddress) ->
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
params2qs_test() ->
|
|
|
- Schema = [{<<"str">>, binary},
|
|
|
+ QSchema = [{<<"str">>, binary},
|
|
|
{<<"int">>, integer},
|
|
|
{<<"atom">>, atom},
|
|
|
{<<"ts">>, timestamp},
|
|
|
@@ -425,7 +427,7 @@ params2qs_test() ->
|
|
|
{<<"lte_range">>, integer},
|
|
|
{<<"like_fuzzy">>, binary},
|
|
|
{<<"match_topic">>, binary}],
|
|
|
- Params = [{<<"str">>, <<"abc">>},
|
|
|
+ QString = [{<<"str">>, <<"abc">>},
|
|
|
{<<"int">>, <<"123">>},
|
|
|
{<<"atom">>, <<"connected">>},
|
|
|
{<<"ts">>, <<"156000">>},
|
|
|
@@ -439,11 +441,11 @@ params2qs_test() ->
|
|
|
{ts, '=:=', 156000},
|
|
|
{range, '>=', 1, '=<', 5}
|
|
|
],
|
|
|
- FuzzyQs = [{fuzzy, like, <<"user">>},
|
|
|
- {topic, match, <<"t/#">>}],
|
|
|
- ?assertEqual({7, {ExpectedQs, FuzzyQs}}, params2qs(Params, Schema)),
|
|
|
+ FuzzyNQString = [{fuzzy, like, <<"user">>},
|
|
|
+ {topic, match, <<"t/#">>}],
|
|
|
+ ?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)),
|
|
|
|
|
|
- {0, {[], []}} = params2qs([{not_a_predefined_params, val}], Schema).
|
|
|
+ {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
|
|
|
|
|
|
-endif.
|
|
|
|