|
|
@@ -23,8 +23,7 @@
|
|
|
-define(LONG_QUERY_TIMEOUT, 50000).
|
|
|
|
|
|
-export([
|
|
|
- paginate/3,
|
|
|
- paginate/4
|
|
|
+ paginate/3
|
|
|
]).
|
|
|
|
|
|
%% first_next query APIs
|
|
|
@@ -34,6 +33,10 @@
|
|
|
b2i/1
|
|
|
]).
|
|
|
|
|
|
+-ifdef(TEST).
|
|
|
+-export([paginate_test_format/1]).
|
|
|
+-endif.
|
|
|
+
|
|
|
-export_type([
|
|
|
match_spec_and_filter/0
|
|
|
]).
|
|
|
@@ -58,14 +61,14 @@
|
|
|
|
|
|
-export([do_query/2, apply_total_query/1]).
|
|
|
|
|
|
-paginate(Tables, Params, {Module, FormatFun}) ->
|
|
|
- Qh = query_handle(Tables),
|
|
|
- Count = count(Tables),
|
|
|
- do_paginate(Qh, Count, Params, {Module, FormatFun}).
|
|
|
-
|
|
|
-paginate(Tables, MatchSpec, Params, {Module, FormatFun}) ->
|
|
|
- Qh = query_handle(Tables, MatchSpec),
|
|
|
- Count = count(Tables, MatchSpec),
|
|
|
+-spec paginate(atom(), map(), {atom(), atom()}) ->
|
|
|
+ #{
|
|
|
+ meta => #{page => pos_integer(), limit => pos_integer(), count => pos_integer()},
|
|
|
+ data => list(term())
|
|
|
+ }.
|
|
|
+paginate(Table, Params, {Module, FormatFun}) ->
|
|
|
+ Qh = query_handle(Table),
|
|
|
+ Count = count(Table),
|
|
|
do_paginate(Qh, Count, Params, {Module, FormatFun}).
|
|
|
|
|
|
do_paginate(Qh, Count, Params, {Module, FormatFun}) ->
|
|
|
@@ -86,57 +89,17 @@ do_paginate(Qh, Count, Params, {Module, FormatFun}) ->
|
|
|
data => [erlang:apply(Module, FormatFun, [Row]) || Row <- Rows]
|
|
|
}.
|
|
|
|
|
|
-query_handle(Table) when is_atom(Table) ->
|
|
|
- qlc:q([R || R <- ets:table(Table)]);
|
|
|
-query_handle({Table, Opts}) when is_atom(Table) ->
|
|
|
- qlc:q([R || R <- ets:table(Table, Opts)]);
|
|
|
-query_handle([Table]) when is_atom(Table) ->
|
|
|
- qlc:q([R || R <- ets:table(Table)]);
|
|
|
-query_handle([{Table, Opts}]) when is_atom(Table) ->
|
|
|
- qlc:q([R || R <- ets:table(Table, Opts)]);
|
|
|
-query_handle(Tables) ->
|
|
|
- %
|
|
|
- qlc:append([query_handle(T) || T <- Tables]).
|
|
|
-
|
|
|
-query_handle(Table, MatchSpec) when is_atom(Table) ->
|
|
|
- Options = {traverse, {select, MatchSpec}},
|
|
|
- qlc:q([R || R <- ets:table(Table, Options)]);
|
|
|
-query_handle([Table], MatchSpec) when is_atom(Table) ->
|
|
|
- Options = {traverse, {select, MatchSpec}},
|
|
|
- qlc:q([R || R <- ets:table(Table, Options)]);
|
|
|
-query_handle(Tables, MatchSpec) ->
|
|
|
- Options = {traverse, {select, MatchSpec}},
|
|
|
- qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]).
|
|
|
-
|
|
|
-count(Table) when is_atom(Table) ->
|
|
|
- ets:info(Table, size);
|
|
|
-count({Table, _}) when is_atom(Table) ->
|
|
|
- ets:info(Table, size);
|
|
|
-count([Table]) when is_atom(Table) ->
|
|
|
- ets:info(Table, size);
|
|
|
-count([{Table, _}]) when is_atom(Table) ->
|
|
|
- ets:info(Table, size);
|
|
|
-count(Tables) ->
|
|
|
- lists:sum([count(T) || T <- Tables]).
|
|
|
-
|
|
|
-count(Table, MatchSpec) when is_atom(Table) ->
|
|
|
- [{MatchPattern, Where, _Re}] = MatchSpec,
|
|
|
- NMatchSpec = [{MatchPattern, Where, [true]}],
|
|
|
- ets:select_count(Table, NMatchSpec);
|
|
|
-count([Table], MatchSpec) when is_atom(Table) ->
|
|
|
- count(Table, MatchSpec);
|
|
|
-count(Tables, MatchSpec) ->
|
|
|
- lists:sum([count(T, MatchSpec) || T <- Tables]).
|
|
|
-
|
|
|
-page(Params) when is_map(Params) ->
|
|
|
- maps:get(<<"page">>, Params, 1);
|
|
|
+query_handle(Table) ->
|
|
|
+ qlc:q([R || R <- ets:table(Table)]).
|
|
|
+
|
|
|
+count(Table) ->
|
|
|
+ ets:info(Table, size).
|
|
|
+
|
|
|
page(Params) ->
|
|
|
- proplists:get_value(<<"page">>, Params, <<"1">>).
|
|
|
+ maps:get(<<"page">>, Params, 1).
|
|
|
|
|
|
-limit(Params) when is_map(Params) ->
|
|
|
- maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit());
|
|
|
limit(Params) ->
|
|
|
- proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
|
|
|
+ maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Node Query
|
|
|
@@ -210,8 +173,6 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
|
|
|
end.
|
|
|
|
|
|
%% @private
|
|
|
-do_cluster_query([], QueryState, ResultAcc) ->
|
|
|
- finalize_query(ResultAcc, mark_complete(QueryState));
|
|
|
do_cluster_query(
|
|
|
[Node | Tail] = Nodes,
|
|
|
QueryState,
|
|
|
@@ -605,7 +566,7 @@ to_type(V, TargetType) ->
|
|
|
to_type_(V, atom) -> to_atom(V);
|
|
|
to_type_(V, integer) -> to_integer(V);
|
|
|
to_type_(V, timestamp) -> to_timestamp(V);
|
|
|
-to_type_(V, ip) -> aton(V);
|
|
|
+to_type_(V, ip) -> to_ip(V);
|
|
|
to_type_(V, ip_port) -> to_ip_port(V);
|
|
|
to_type_(V, _) -> V.
|
|
|
|
|
|
@@ -624,14 +585,16 @@ to_timestamp(I) when is_integer(I) ->
|
|
|
to_timestamp(B) when is_binary(B) ->
|
|
|
binary_to_integer(B).
|
|
|
|
|
|
-aton(B) when is_binary(B) ->
|
|
|
- list_to_tuple([binary_to_integer(T) || T <- re:split(B, "[.]")]).
|
|
|
+to_ip(IP0) when is_binary(IP0) ->
|
|
|
+ ensure_ok(inet:parse_address(binary_to_list(IP0))).
|
|
|
|
|
|
to_ip_port(IPAddress) ->
|
|
|
- [IP0, Port0] = string:tokens(binary_to_list(IPAddress), ":"),
|
|
|
- {ok, IP} = inet:parse_address(IP0),
|
|
|
- Port = list_to_integer(Port0),
|
|
|
- {IP, Port}.
|
|
|
+ ensure_ok(emqx_schema:to_ip_port(IPAddress)).
|
|
|
+
|
|
|
+ensure_ok({ok, V}) ->
|
|
|
+ V;
|
|
|
+ensure_ok({error, _R} = E) ->
|
|
|
+ throw(E).
|
|
|
|
|
|
b2i(Bin) when is_binary(Bin) ->
|
|
|
binary_to_integer(Bin);
|
|
|
@@ -645,40 +608,115 @@ b2i(Any) ->
|
|
|
-ifdef(TEST).
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
-params2qs_test() ->
|
|
|
+params2qs_test_() ->
|
|
|
QSchema = [
|
|
|
{<<"str">>, binary},
|
|
|
{<<"int">>, integer},
|
|
|
+ {<<"binatom">>, atom},
|
|
|
{<<"atom">>, atom},
|
|
|
{<<"ts">>, timestamp},
|
|
|
{<<"gte_range">>, integer},
|
|
|
{<<"lte_range">>, integer},
|
|
|
{<<"like_fuzzy">>, binary},
|
|
|
- {<<"match_topic">>, binary}
|
|
|
+ {<<"match_topic">>, binary},
|
|
|
+ {<<"ip">>, ip},
|
|
|
+ {<<"ip_port">>, ip_port}
|
|
|
],
|
|
|
QString = [
|
|
|
{<<"str">>, <<"abc">>},
|
|
|
{<<"int">>, <<"123">>},
|
|
|
- {<<"atom">>, <<"connected">>},
|
|
|
+ {<<"binatom">>, <<"connected">>},
|
|
|
+ {<<"atom">>, ok},
|
|
|
{<<"ts">>, <<"156000">>},
|
|
|
{<<"gte_range">>, <<"1">>},
|
|
|
{<<"lte_range">>, <<"5">>},
|
|
|
{<<"like_fuzzy">>, <<"user">>},
|
|
|
- {<<"match_topic">>, <<"t/#">>}
|
|
|
+ {<<"match_topic">>, <<"t/#">>},
|
|
|
+ {<<"ip">>, <<"127.0.0.1">>},
|
|
|
+ {<<"ip_port">>, <<"127.0.0.1:8888">>}
|
|
|
],
|
|
|
ExpectedQs = [
|
|
|
{str, '=:=', <<"abc">>},
|
|
|
{int, '=:=', 123},
|
|
|
- {atom, '=:=', connected},
|
|
|
+ {binatom, '=:=', connected},
|
|
|
+ {atom, '=:=', ok},
|
|
|
{ts, '=:=', 156000},
|
|
|
- {range, '>=', 1, '=<', 5}
|
|
|
+ {range, '>=', 1, '=<', 5},
|
|
|
+ {ip, '=:=', {127, 0, 0, 1}},
|
|
|
+ {ip_port, '=:=', {{127, 0, 0, 1}, 8888}}
|
|
|
],
|
|
|
FuzzyNQString = [
|
|
|
{fuzzy, like, <<"user">>},
|
|
|
{topic, match, <<"t/#">>}
|
|
|
],
|
|
|
- ?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)),
|
|
|
-
|
|
|
- {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema).
|
|
|
|
|
|
+ [
|
|
|
+ ?_assertEqual({10, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)),
|
|
|
+ ?_assertEqual({0, {[], []}}, parse_qstring([{not_a_predefined_params, val}], QSchema)),
|
|
|
+ ?_assertEqual(
|
|
|
+ {1, {[{ip, '=:=', {0, 0, 0, 0, 0, 0, 0, 1}}], []}},
|
|
|
+ parse_qstring([{<<"ip">>, <<"::1">>}], QSchema)
|
|
|
+ ),
|
|
|
+ ?_assertEqual(
|
|
|
+ {1, {[{ip_port, '=:=', {{0, 0, 0, 0, 0, 0, 0, 1}, 8888}}], []}},
|
|
|
+ parse_qstring([{<<"ip_port">>, <<"::1:8888">>}], QSchema)
|
|
|
+ ),
|
|
|
+ ?_assertThrow(
|
|
|
+ {bad_value_type, {<<"ip">>, ip, <<"helloworld">>}},
|
|
|
+ parse_qstring([{<<"ip">>, <<"helloworld">>}], QSchema)
|
|
|
+ ),
|
|
|
+ ?_assertThrow(
|
|
|
+ {bad_value_type, {<<"ip_port">>, ip_port, <<"127.0.0.1">>}},
|
|
|
+ parse_qstring([{<<"ip_port">>, <<"127.0.0.1">>}], QSchema)
|
|
|
+ ),
|
|
|
+ ?_assertThrow(
|
|
|
+ {bad_value_type, {<<"ip_port">>, ip_port, <<"helloworld:abcd">>}},
|
|
|
+ parse_qstring([{<<"ip_port">>, <<"helloworld:abcd">>}], QSchema)
|
|
|
+ )
|
|
|
+ ].
|
|
|
+
|
|
|
+paginate_test_format(Row) ->
|
|
|
+ Row.
|
|
|
+
|
|
|
+paginate_test_() ->
|
|
|
+ _ = ets:new(?MODULE, [named_table]),
|
|
|
+ Size = 1000,
|
|
|
+ MyLimit = 10,
|
|
|
+ ets:insert(?MODULE, [{I, foo} || I <- lists:seq(1, Size)]),
|
|
|
+ DefaultLimit = emqx_mgmt:max_row_limit(),
|
|
|
+ NoParamsResult = paginate(?MODULE, #{}, {?MODULE, paginate_test_format}),
|
|
|
+ PaginateResults = [
|
|
|
+ paginate(
|
|
|
+ ?MODULE, #{<<"page">> => I, <<"limit">> => MyLimit}, {?MODULE, paginate_test_format}
|
|
|
+ )
|
|
|
+ || I <- lists:seq(1, floor(Size / MyLimit))
|
|
|
+ ],
|
|
|
+ [
|
|
|
+ ?_assertMatch(
|
|
|
+ #{meta := #{count := Size, page := 1, limit := DefaultLimit}}, NoParamsResult
|
|
|
+ ),
|
|
|
+ ?_assertEqual(DefaultLimit, length(maps:get(data, NoParamsResult))),
|
|
|
+ ?_assertEqual(
|
|
|
+ #{data => [], meta => #{count => Size, limit => DefaultLimit, page => 100}},
|
|
|
+ paginate(?MODULE, #{<<"page">> => <<"100">>}, {?MODULE, paginate_test_format})
|
|
|
+ )
|
|
|
+ ] ++ assert_paginate_results(PaginateResults, Size, MyLimit).
|
|
|
+
|
|
|
+assert_paginate_results(Results, Size, Limit) ->
|
|
|
+ AllData = lists:flatten([Data || #{data := Data} <- Results]),
|
|
|
+ [
|
|
|
+ begin
|
|
|
+ Result = lists:nth(I, Results),
|
|
|
+ [
|
|
|
+ ?_assertMatch(#{meta := #{count := Size, limit := Limit, page := I}}, Result),
|
|
|
+ ?_assertEqual(Limit, length(maps:get(data, Result)))
|
|
|
+ ]
|
|
|
+ end
|
|
|
+ || I <- lists:seq(1, floor(Size / Limit))
|
|
|
+ ] ++
|
|
|
+ [
|
|
|
+ ?_assertEqual(floor(Size / Limit), length(Results)),
|
|
|
+ ?_assertEqual(Size, length(AllData)),
|
|
|
+ ?_assertEqual(Size, sets:size(sets:from_list(AllData)))
|
|
|
+ ].
|
|
|
-endif.
|