Przeglądaj źródła

test(mgmt): cover emqx_mgmt_api:cluster_query

JianBo He 3 lat temu
rodzic
commit
6d9e1e0d7a

+ 35 - 2
apps/emqx/test/emqx_common_test_helpers.erl

@@ -519,21 +519,51 @@ ensure_quic_listener(Name, UdpPort) ->
 %% Clusterisation and multi-node testing
 %%
 
+-type cluster_spec() :: [node_spec()].
+-type node_spec() :: role() | {role(), shortname()} | {role(), shortname(), node_opts()}.
+-type role() :: core | replicant.
+-type shortname() :: atom().
+-type nodename() :: atom().
+-type node_opts() :: #{
+    %% Need to loaded apps. These apps will be loaded once the node started
+    load_apps => list(),
+    %% Need to started apps. It is the first arg passed to emqx_common_test_helpers:start_apps/2
+    apps => list(),
+    %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2
+    env_handler => fun((AppName :: atom()) -> term()),
+    %% Application env preset before calling `emqx_common_test_helpers:start_apps/2`
+    env => {AppName :: atom(), Key :: atom(), Val :: term()},
+    %% Whether to execute `emqx_config:init_load(SchemaMod)`
+    %% default: true
+    load_schema => boolean(),
+    %% Eval by emqx_config:put/2
+    conf => [{KeyPath :: list(), Val :: term()}],
+    %% Fast option to config listener port
+    %% default rule:
+    %% - tcp: base_port
+    %% - ssl: base_port + 1
+    %% - ws : base_port + 3
+    %% - wss: base_port + 4
+    listener_ports => [{Type :: tcp | ssl | ws | wss, inet:port_number()}]
+}.
+
+-spec emqx_cluster(cluster_spec()) -> [{shortname(), node_opts()}].
 emqx_cluster(Specs) ->
     emqx_cluster(Specs, #{}).
 
+-spec emqx_cluster(cluster_spec(), node_opts()) -> [{shortname(), node_opts()}].
 emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
     emqx_cluster(Specs, maps:from_list(CommonOpts));
 emqx_cluster(Specs0, CommonOpts) ->
     Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
     Specs = expand_node_specs(Specs1, CommonOpts),
-    CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
-    %% Assign grpc ports:
+    %% Assign grpc ports
     GenRpcPorts = maps:from_list([
         {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
      || {{_, Name, _}, Num} <- Specs
     ]),
     %% Set the default node of the cluster:
+    CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
     JoinTo =
         case CoreNodes of
             [First | _] -> First;
@@ -554,6 +584,8 @@ emqx_cluster(Specs0, CommonOpts) ->
     ].
 
 %% Lower level starting API
+
+-spec start_slave(shortname(), node_opts()) -> nodename().
 start_slave(Name, Opts) ->
     {ok, Node} = ct_slave:start(
         list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
@@ -590,6 +622,7 @@ epmd_path() ->
 
 %% Node initialization
 
+-spec setup_node(nodename(), node_opts()) -> ok.
 setup_node(Node, Opts) when is_list(Opts) ->
     setup_node(Node, maps:from_list(Opts));
 setup_node(Node, Opts) when is_map(Opts) ->

+ 4 - 2
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -314,7 +314,9 @@ do_select(
             ?FRESH_SELECT ->
                 ets:select(Tab, Ms, Limit);
             _ ->
-                ets:select(Continuation)
+                %% XXX: Repair is necessary because we pass Continuation back
+                %% and forth through the nodes in the `do_cluster_query`
+                ets:select(ets:repair_continuation(Continuation, Ms))
         end,
     case Result of
         '$end_of_table' ->
@@ -508,7 +510,7 @@ format_query_result(
         %% queries that can be read
         meta => Meta#{count => Total},
         data => lists:flatten(
-            lists:foldr(
+            lists:foldl(
                 fun({Node, Rows}, Acc) ->
                     [lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc]
                 end,

+ 103 - 2
apps/emqx_management/test/emqx_mgmt_api_SUITE.erl

@@ -28,15 +28,116 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_mgmt_api_test_util:init_suite(),
     Config.
 
 end_per_suite(_) ->
-    emqx_mgmt_api_test_util:end_suite().
+    ok.
 
 %%--------------------------------------------------------------------
 %% cases
 %%--------------------------------------------------------------------
 
 t_cluster_query(_Config) ->
+    net_kernel:start(['master@127.0.0.1', longnames]),
+    ct:timetrap({seconds, 120}),
+    snabbkaffe:fix_ct_logging(),
+    [{Name, Opts}, {Name1, Opts1}] = cluster_specs(),
+    Node1 = emqx_common_test_helpers:start_slave(Name, Opts),
+    Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1),
+    try
+        process_flag(trap_exit, true),
+        ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)],
+        ClientLs2 = [start_emqtt_client(Node2, I, 3883) || I <- lists:seq(1, 10)],
+
+        %% returned list should be the same regardless of which node is requested
+        {200, ClientsAll} = query_clients(Node1, #{}),
+        ?assertEqual({200, ClientsAll}, query_clients(Node2, #{})),
+        ?assertMatch(
+            #{page := 1, limit := 100, count := 20},
+            maps:get(meta, ClientsAll)
+        ),
+        ?assertMatch(20, length(maps:get(data, ClientsAll))),
+        %% query the first page, counting in entire cluster
+        {200, ClientsPage1} = query_clients(Node1, #{<<"limit">> => 5}),
+        ?assertMatch(
+            #{page := 1, limit := 5, count := 20},
+            maps:get(meta, ClientsPage1)
+        ),
+        ?assertMatch(5, length(maps:get(data, ClientsPage1))),
+
+        %% assert: AllPage = Page1 + Page2 + Page3 + Page4
+        %% !!!Note: this equation requires that the queried tables must be ordered_set
+        {200, ClientsPage2} = query_clients(Node1, #{<<"page">> => 2, <<"limit">> => 5}),
+        {200, ClientsPage3} = query_clients(Node2, #{<<"page">> => 3, <<"limit">> => 5}),
+        {200, ClientsPage4} = query_clients(Node1, #{<<"page">> => 4, <<"limit">> => 5}),
+        GetClientIds = fun(L) -> lists:map(fun(#{clientid := Id}) -> Id end, L) end,
+        ?assertEqual(
+            GetClientIds(maps:get(data, ClientsAll)),
+            GetClientIds(
+                maps:get(data, ClientsPage1) ++ maps:get(data, ClientsPage2) ++
+                    maps:get(data, ClientsPage3) ++ maps:get(data, ClientsPage4)
+            )
+        ),
+
+        %% exact match can return non-zero total
+        {200, ClientsNode1} = query_clients(Node2, #{<<"username">> => <<"corenode1@127.0.0.1">>}),
+        ?assertMatch(
+            #{count := 10},
+            maps:get(meta, ClientsNode1)
+        ),
+
+        %% fuzzy searching can't return total
+        {200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}),
+        ?assertMatch(
+            #{count := 0},
+            maps:get(meta, ClientsNode2)
+        ),
+        ?assertMatch(10, length(maps:get(data, ClientsNode2))),
+
+        _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),
+        _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2)
+    after
+        emqx_common_test_helpers:stop_slave(Node1),
+        emqx_common_test_helpers:stop_slave(Node2)
+    end,
     ok.
+
+%%--------------------------------------------------------------------
+%% helpers
+%%--------------------------------------------------------------------
+
+cluster_specs() ->
+    Specs =
+        %% default listeners port
+        [
+            {core, corenode1, #{listener_ports => [{tcp, 2883}]}},
+            {core, corenode2, #{listener_ports => [{tcp, 3883}]}}
+        ],
+    CommOpts =
+        [
+            {env, [{emqx, boot_modules, all}]},
+            {apps, []},
+            {conf, [
+                {[listeners, ssl, default, enabled], false},
+                {[listeners, ws, default, enabled], false},
+                {[listeners, wss, default, enabled], false}
+            ]}
+        ],
+    emqx_common_test_helpers:emqx_cluster(
+        Specs,
+        CommOpts
+    ).
+
+start_emqtt_client(Node0, N, Port) ->
+    Node = atom_to_binary(Node0),
+    ClientId = iolist_to_binary([Node, "-", integer_to_binary(N)]),
+    {ok, C} = emqtt:start_link([{clientid, ClientId}, {username, Node}, {port, Port}]),
+    {ok, _} = emqtt:connect(C),
+    C.
+
+query_clients(Node, Qs0) ->
+    Qs = maps:merge(
+        #{<<"page">> => 1, <<"limit">> => 100},
+        Qs0
+    ),
+    rpc:call(Node, emqx_mgmt_api_clients, clients, [get, #{query_string => Qs}]).