Przeglądaj źródła

fix(mgmt): hide route selection behind router interface

Also introduce a generic _stream_ concept, mostly to deal with
iterating over 2 ETS tables at once with `ets:match_object/3`.
Andrew Mayorov 2 lat temu
rodzic
commit
6812ee9d0f

+ 81 - 1
apps/emqx/src/emqx_router.erl

@@ -52,6 +52,9 @@
     lookup_routes/1
 ]).
 
+%% Topics API
+-export([select/3]).
+
 -export([print_routes/1]).
 
 -export([
@@ -59,7 +62,10 @@
     foldr_routes/2
 ]).
 
--export([topics/0]).
+-export([
+    topics/0,
+    stats/1
+]).
 
 %% Exported for tests
 -export([has_route/2]).
@@ -219,6 +225,19 @@ mria_delete_route(v2, Topic, Dest) ->
 mria_delete_route(v1, Topic, Dest) ->
     mria_delete_route_v1(Topic, Dest).
 
+-spec select(Spec, _Limit :: pos_integer(), Continuation) ->
+    {[emqx_types:route()], Continuation} | '$end_of_table'
+when
+    Spec :: {_TopicPat, _DestPat},
+    Continuation :: term() | '$end_of_table'.
+select(MatchSpec, Limit, Cont) ->
+    select(get_schema_vsn(), MatchSpec, Limit, Cont).
+
+select(v2, MatchSpec, Limit, Cont) ->
+    select_v2(MatchSpec, Limit, Cont);
+select(v1, MatchSpec, Limit, Cont) ->
+    select_v1(MatchSpec, Limit, Cont).
+
 -spec topics() -> list(emqx_types:topic()).
 topics() ->
     topics(get_schema_vsn()).
@@ -228,6 +247,15 @@ topics(v2) ->
 topics(v1) ->
     list_topics_v1().
 
+-spec stats(n_routes) -> non_neg_integer().
+stats(Item) ->
+    stats(get_schema_vsn(), Item).
+
+stats(v2, Item) ->
+    get_stats_v2(Item);
+stats(v1, Item) ->
+    get_stats_v1(Item).
+
 %% @doc Print routes to a topic
 -spec print_routes(emqx_types:topic()) -> ok.
 print_routes(Topic) ->
@@ -345,9 +373,17 @@ cleanup_routes_v1(Node) ->
         ]
     end).
 
+select_v1({MTopic, MDest}, Limit, undefined) ->
+    ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit);
+select_v1(_Spec, _Limit, Cont) ->
+    ets:select(Cont).
+
 list_topics_v1() ->
     list_route_tab_topics().
 
+get_stats_v1(n_routes) ->
+    emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0).
+
 list_route_tab_topics() ->
     mnesia:dirty_all_keys(?ROUTE_TAB).
 
@@ -436,11 +472,52 @@ get_dest_node({_, Node}) ->
 get_dest_node(Node) ->
     Node.
 
+select_v2(Spec, Limit, undefined) ->
+    Stream = mk_route_stream(Spec),
+    select_next(Limit, Stream);
+select_v2(_Spec, Limit, Stream) ->
+    select_next(Limit, Stream).
+
+select_next(N, Stream) ->
+    case emqx_utils_stream:take(N, Stream) of
+        {Routes, SRest} ->
+            {Routes, SRest};
+        Routes ->
+            {Routes, '$end_of_table'}
+    end.
+
+mk_route_stream(Spec) ->
+    emqx_utils_stream:chain(
+        mk_route_stream(route, Spec),
+        mk_route_stream(filter, Spec)
+    ).
+
+mk_route_stream(route, Spec) ->
+    emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end);
+mk_route_stream(filter, {MTopic, MDest}) ->
+    emqx_utils_stream:map(
+        fun routeidx_to_route/1,
+        emqx_utils_stream:ets(
+            fun
+                (undefined) ->
+                    MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)},
+                    ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1);
+                (Cont) ->
+                    ets:match_object(Cont)
+            end
+        )
+    ).
+
 list_topics_v2() ->
     Pat = #routeidx{entry = '$1'},
     Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)],
     list_route_tab_topics() ++ Filters.
 
+get_stats_v2(n_routes) ->
+    NTopics = emqx_maybe:define(ets:info(?ROUTE_TAB, size), 0),
+    NWildcards = emqx_maybe:define(ets:info(?ROUTE_TAB_FILTERS, size), 0),
+    NTopics + NWildcards.
+
 fold_routes_v2(FunName, FoldFun, AccIn) ->
     FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
     Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB),
@@ -449,6 +526,9 @@ fold_routes_v2(FunName, FoldFun, AccIn) ->
 mk_filtertab_fold_fun(FoldFun) ->
     fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
 
+routeidx_to_route(#routeidx{entry = M}) ->
+    match_to_route(M).
+
 match_to_route(M) ->
     #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
 

+ 1 - 6
apps/emqx/src/emqx_router_helper.erl

@@ -190,12 +190,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 stats_fun() ->
-    case ets:info(?ROUTE_TAB, size) of
-        undefined ->
-            ok;
-        Size ->
-            emqx_stats:setstat('topics.count', 'topics.max', Size)
-    end.
+    emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)).
 
 cleanup_routes(Node) ->
     emqx_router:cleanup_routes(Node).

+ 7 - 1
apps/emqx/src/emqx_trie_search.erl

@@ -98,7 +98,7 @@
 
 -module(emqx_trie_search).
 
--export([make_key/2, filter/1]).
+-export([make_key/2, make_pat/2, filter/1]).
 -export([match/2, matches/3, get_id/1, get_topic/1]).
 -export_type([key/1, word/0, words/0, nextf/0, opts/0]).
 
@@ -127,6 +127,12 @@ make_key(Topic, ID) when is_binary(Topic) ->
 make_key(Words, ID) when is_list(Words) ->
     {Words, {ID}}.
 
+-spec make_pat(emqx_types:topic() | words() | '_', _ID | '_') -> _Pat.
+make_pat(Pattern = '_', ID) ->
+    {Pattern, {ID}};
+make_pat(Topic, ID) ->
+    make_key(Topic, ID).
+
 %% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter.
 -spec filter(emqx_types:topic()) -> words() | false.
 filter(Topic) ->

+ 9 - 0
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -35,6 +35,13 @@
     b2i/1
 ]).
 
+-export([
+    parse_pager_params/1,
+    parse_qstring/2,
+    init_query_result/0,
+    accumulate_query_rows/4
+]).
+
 -ifdef(TEST).
 -export([paginate_test_format/1]).
 -endif.
@@ -444,6 +451,8 @@ accumulate_query_rows(
                 count => Count + length(SubRows),
                 rows => [{Node, SubRows} | RowsAcc]
             }};
+        NCursor when NCursor >= PageEnd + Limit ->
+            {enough, ResultAcc#{cursor => NCursor}};
         NCursor when NCursor >= PageEnd ->
             SubRows = lists:sublist(Rows, Limit - Count),
             {enough, ResultAcc#{

+ 64 - 38
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -18,7 +18,6 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("emqx/include/emqx_router.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
@@ -37,8 +36,6 @@
     topic/2
 ]).
 
--export([qs2ms/2, format/1]).
-
 -define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
 
 -define(TOPICS_QUERY_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]).
@@ -110,23 +107,15 @@ topic(get, #{bindings := Bindings}) ->
 %%%==============================================================================================
 %% api apply
 do_list(Params) ->
-    case
-        emqx_mgmt_api:node_query(
-            node(),
-            ?ROUTE_TAB,
-            Params,
-            ?TOPICS_QUERY_SCHEMA,
-            fun ?MODULE:qs2ms/2,
-            fun ?MODULE:format/1
-        )
-    of
-        {error, page_limit_invalid} ->
-            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, Error} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
-            {500, #{code => <<"NODE_DOWN">>, message => Message}};
-        Response ->
-            {200, Response}
+    try
+        Pager = parse_pager_params(Params),
+        {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
+        QState = Pager#{continuation => undefined},
+        QResult = eval_topic_query(qs2ms(Query), QState),
+        {200, format_list_response(Pager, QResult)}
+    catch
+        throw:{error, page_limit_invalid} ->
+            {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}
     end.
 
 lookup(#{topic := Topic}) ->
@@ -140,27 +129,64 @@ lookup(#{topic := Topic}) ->
 
 %%%==============================================================================================
 %% internal
--spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
-qs2ms(_Tab, {Qs, _}) ->
+
+parse_pager_params(Params) ->
+    try emqx_mgmt_api:parse_pager_params(Params) of
+        Pager = #{} ->
+            Pager;
+        false ->
+            throw({error, page_limit_invalid})
+    catch
+        error:badarg ->
+            throw({error, page_limit_invalid})
+    end.
+
+-spec qs2ms({list(), list()}) -> tuple().
+qs2ms({Qs, _}) ->
+    lists:foldl(fun gen_match_spec/2, {'_', '_'}, Qs).
+
+gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) ->
+    case emqx_topic:parse(QTopic) of
+        {#share{group = Group, topic = Topic}, _SubOpts} ->
+            {Topic, {Group, MNode}};
+        {Topic, _SubOpts} ->
+            {Topic, MNode}
+    end;
+gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) ->
+    {MTopic, QNode}.
+
+eval_topic_query(MS, QState) ->
+    finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())).
+
+eval_topic_query(MS, QState, QResult) ->
+    QPage = eval_topic_query_page(MS, QState),
+    case QPage of
+        {Rows, '$end_of_table'} ->
+            {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
+            NQResult#{complete => true};
+        {Rows, NCont} ->
+            {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
+            eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
+        '$end_of_table' ->
+            QResult#{complete => true}
+    end.
+
+eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) ->
+    emqx_router:select(MS, Limit, Cont).
+
+finalize_query(QResult = #{overflow := Overflow, complete := Complete}) ->
+    HasNext = Overflow orelse not Complete,
+    QResult#{hasnext => HasNext}.
+
+format_list_response(Meta, _QResult = #{hasnext := HasNext, rows := RowsAcc, cursor := Cursor}) ->
     #{
-        match_spec => gen_match_spec(Qs, [{{route, '_', '_'}, [], ['$_']}]),
-        fuzzy_fun => undefined
+        meta => Meta#{hasnext => HasNext, count => Cursor},
+        data => lists:flatmap(
+            fun({_Node, Rows}) -> [format(R) || R <- Rows] end,
+            RowsAcc
+        )
     }.
 
-gen_match_spec([], Res) ->
-    Res;
-gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) ->
-    {T, D} =
-        case emqx_topic:parse(T0) of
-            {#share{group = Group, topic = Topic}, _SubOpts} ->
-                {Topic, {Group, Node}};
-            {T1, _SubOpts} ->
-                {T1, Node}
-        end,
-    gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]);
-gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
-    gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
-
 format(#route{topic = Topic, dest = {Group, Node}}) ->
     #{topic => ?SHARE(Group, Topic), node => Node};
 format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->

+ 131 - 0
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -0,0 +1,131 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_utils_stream).
+
+%% Constructors / Combinators
+-export([
+    empty/0,
+    list/1,
+    map/2,
+    chain/2
+]).
+
+%% Evaluating
+-export([
+    next/1,
+    take/2,
+    consume/1
+]).
+
+%% Streams from ETS tables
+-export([
+    ets/1
+]).
+
+-export_type([stream/1]).
+
+%% @doc A stream is essentially a lazy list.
+-type stream(T) :: fun(() -> next(T) | []).
+-type next(T) :: nonempty_improper_list(T, stream(T)).
+
+-dialyzer(no_improper_lists).
+
+%%
+
+-spec empty() -> stream(none()).
+empty() ->
+    fun() -> [] end.
+
+-spec list([T]) -> stream(T).
+list([]) ->
+    empty();
+list([X | Rest]) ->
+    fun() -> [X | list(Rest)] end.
+
+-spec map(fun((X) -> Y), stream(X)) -> stream(Y).
+map(F, S) ->
+    fun() ->
+        case next(S) of
+            [X | Rest] ->
+                [F(X) | map(F, Rest)];
+            [] ->
+                []
+        end
+    end.
+
+-spec chain(stream(X), stream(Y)) -> stream(X | Y).
+chain(SFirst, SThen) ->
+    fun() ->
+        case next(SFirst) of
+            [X | SRest] ->
+                [X | chain(SRest, SThen)];
+            [] ->
+                next(SThen)
+        end
+    end.
+
+%%
+
+-spec next(stream(T)) -> next(T) | [].
+next(S) ->
+    S().
+
+-spec take(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T].
+take(N, S) ->
+    take(N, S, []).
+
+take(0, S, Acc) ->
+    {lists:reverse(Acc), S};
+take(N, S, Acc) ->
+    case next(S) of
+        [X | SRest] ->
+            take(N - 1, SRest, [X | Acc]);
+        [] ->
+            lists:reverse(Acc)
+    end.
+
+-spec consume(stream(T)) -> [T].
+consume(S) ->
+    case next(S) of
+        [X | SRest] ->
+            [X | consume(SRest)];
+        [] ->
+            []
+    end.
+
+%%
+
+-type select_result(Record, Cont) ::
+    {[Record], Cont}
+    | {[Record], '$end_of_table'}
+    | '$end_of_table'.
+
+-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
+ets(ContF) ->
+    ets(undefined, ContF).
+
+ets(Cont, ContF) ->
+    fun() ->
+        case ContF(Cont) of
+            {Records, '$end_of_table'} ->
+                next(list(Records));
+            {Records, NCont} ->
+                next(chain(list(Records), ets(NCont, ContF)));
+            '$end_of_table' ->
+                []
+        end
+    end.

+ 75 - 0
apps/emqx_utils/test/emqx_utils_stream_tests.erl

@@ -0,0 +1,75 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_utils_stream_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+empty_test() ->
+    S = emqx_utils_stream:empty(),
+    ?assertEqual([], emqx_utils_stream:next(S)).
+
+empty_consume_test() ->
+    S = emqx_utils_stream:empty(),
+    ?assertEqual([], emqx_utils_stream:consume(S)).
+
+chain_empties_test() ->
+    S = emqx_utils_stream:chain(
+        emqx_utils_stream:empty(),
+        emqx_utils_stream:empty()
+    ),
+    ?assertEqual([], emqx_utils_stream:next(S)).
+
+chain_list_test() ->
+    S = emqx_utils_stream:chain(
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6])
+    ),
+    ?assertEqual(
+        [1, 2, 3, 4, 5, 6],
+        emqx_utils_stream:consume(S)
+    ).
+
+chain_take_test() ->
+    S = emqx_utils_stream:chain(
+        emqx_utils_stream:list([1, 2, 3]),
+        emqx_utils_stream:list([4, 5, 6, 7, 8])
+    ),
+    ?assertMatch(
+        {[1, 2, 3, 4, 5], _SRest},
+        emqx_utils_stream:take(5, S)
+    ),
+    {_, SRest} = emqx_utils_stream:take(5, S),
+    ?assertEqual(
+        [6, 7, 8],
+        emqx_utils_stream:take(5, SRest)
+    ).
+
+chain_list_map_test() ->
+    S = emqx_utils_stream:map(
+        fun integer_to_list/1,
+        emqx_utils_stream:chain(
+            emqx_utils_stream:list([1, 2, 3]),
+            emqx_utils_stream:chain(
+                emqx_utils_stream:empty(),
+                emqx_utils_stream:list([4, 5, 6])
+            )
+        )
+    ),
+    ?assertEqual(
+        ["1", "2", "3", "4", "5", "6"],
+        emqx_utils_stream:consume(S)
+    ).