Просмотр исходного кода

Improve test cases, and fix some bugs (#1920)

* Improve emqx_banned, emqx_pqueue, emqx_router test cases

* Improve emqx_broker test case, and fix bug in emqx_broker

* Add emqx_hooks to CT_SUITES
tigercl 7 лет назад
Родитель
Сommit
abb2e5c918

+ 1 - 1
Makefile

@@ -40,7 +40,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
 			emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
 			emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
-		 	emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge
+		 	emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

+ 5 - 0
src/emqx_banned.erl

@@ -102,8 +102,13 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+-ifdef(TEST).
+ensure_expiry_timer(State) ->
+    State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}.
+-else.
 ensure_expiry_timer(State) ->
     State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
+-endif.
 
 expire_banned_items(Now) ->
     mnesia:foldl(fun

+ 4 - 2
src/emqx_broker.erl

@@ -260,9 +260,11 @@ subscription(Topic, Subscriber) ->
 
 -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
 subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
-    length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1;
+    {Match, _} = ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1),
+    length(Match) >= 1;
 subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
-    length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1;
+    {Match, _} = ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1),
+    length(Match) >= 1;
 subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
     ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
 

+ 10 - 6
test/emqx_banned_SUITE.erl

@@ -29,13 +29,17 @@ t_banned_all(_) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     emqx_banned:start_link(),
     TimeNow = erlang:system_time(second),
-    ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>},
-                                 reason = <<"test">>,
-                                 by = <<"banned suite">>,
-                                 desc = <<"test">>,
-                                 until = TimeNow + 10}),
+    Banned = #banned{who    = {client_id, <<"TestClient">>},
+                     reason = <<"test">>,
+                     by     = <<"banned suite">>,
+                     desc   = <<"test">>,
+                     until  = TimeNow + 1},
+    ok = emqx_banned:add(Banned),
     % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed
-    timer:sleep(100),
+    ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    timer:sleep(2500),
+    ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    ok = emqx_banned:add(Banned),
     ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
     emqx_banned:del({client_id, <<"TestClient">>}),
     ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),

+ 11 - 2
test/emqx_broker_SUITE.erl

@@ -60,6 +60,11 @@ subscribe_unsubscribe(_) ->
     ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
     ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
     ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
+    true = emqx:subscribed(<<"topic">>, <<"clientId">>),
+    Topics = emqx:topics(),
+    lists:foreach(fun(Topic) -> 
+                      ?assert(lists:member(Topic, Topics))
+                  end, Topics),
     ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
     ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
     ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
@@ -72,12 +77,16 @@ publish(_) ->
     ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
 
 pubsub(_) ->
+    true = emqx:is_running(node()),
     Self = self(),
     Subscriber = {Self, <<"clientId">>},
     ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }),
-    #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    #{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    #{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
+    true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}),
+    #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
     ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
-    #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    #{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
     %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
     timer:sleep(10),
     [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),

+ 55 - 1
test/emqx_pqueue_SUITE.erl

@@ -22,7 +22,7 @@
 
 -define(PQ, emqx_pqueue).
 
-all() -> [t_priority_queue_plen, t_priority_queue_out2].
+all() -> [t_priority_queue_plen, t_priority_queue_out2, t_priority_queues].
 
 t_priority_queue_plen(_) ->
     Q = ?PQ:new(),
@@ -67,3 +67,57 @@ t_priority_queue_out2(_) ->
     {Val5, Q6} = ?PQ:out(Q5),
     {value, a} = Val5,
     {empty, _Q7} = ?PQ:out(Q6).
+
+t_priority_queues(_) ->
+    Q0 = ?PQ:new(),
+    Q1 = ?PQ:new(),
+    PQueue = {pqueue, [{0, Q0}, {1, Q1}]},
+    ?assert(?PQ:is_queue(PQueue)),
+    [] = ?PQ:to_list(PQueue),
+
+    PQueue1 = ?PQ:in(a, 0, ?PQ:new()),
+    PQueue2 = ?PQ:in(b, 0, PQueue1),
+
+    PQueue3 = ?PQ:in(c, 1, PQueue2),
+    PQueue4 = ?PQ:in(d, 1, PQueue3),
+
+    4 = ?PQ:len(PQueue4),
+
+    [{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4),
+    PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
+    
+    empty = ?PQ:highest(?PQ:new()),
+    0 = ?PQ:highest(PQueue1),
+    1 = ?PQ:highest(PQueue4),
+
+    PQueue5 = ?PQ:in(e, infinity, PQueue4),
+    PQueue6 = ?PQ:in(f, 1, PQueue5),
+
+    {{value, e}, PQueue7} = ?PQ:out(PQueue6),
+    {empty, _} = ?PQ:out(0, ?PQ:new()), 
+
+    {empty, Q0} = ?PQ:out_p(Q0),
+
+    Q2 = ?PQ:in(a, Q0),
+    Q3 = ?PQ:in(b, Q2),
+    Q4 = ?PQ:in(c, Q3),
+
+    {{value, a, 0}, _Q5} = ?PQ:out_p(Q4),
+
+    {{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7),
+
+    Q4 = ?PQ:join(Q4, ?PQ:new()),
+    Q4 = ?PQ:join(?PQ:new(), Q4),
+
+    {queue, [a], [a], 2} = ?PQ:join(Q2, Q2),
+
+    {pqueue,[{-1,{queue,[f],[d],2}},
+             {0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2),
+
+    {pqueue,[{-1,{queue,[f],[d],2}},
+             {0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8),
+
+    {pqueue,[{-1,{queue,[f],[d,f,d],4}},
+             {0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8).
+
+

+ 14 - 6
test/emqx_router_SUITE.erl

@@ -29,7 +29,9 @@ all() ->
 groups() ->
     [{route, [sequence],
       [add_del_route,
-       match_routes]}].
+       match_routes,
+       has_routes,
+       router_add_del]}].
 
 init_per_suite(Config) ->
     emqx_ct_broker_helpers:run_setup_steps(),
@@ -81,6 +83,7 @@ match_routes(_) ->
 has_routes(_) ->
     From = {self(), make_ref()},
     ?R:add_route(From, <<"devices/+/messages">>, node()),
+    timer:sleep(200),
     ?assert(?R:has_routes(<<"devices/+/messages">>)).
 
 clear_tables() ->
@@ -88,28 +91,33 @@ clear_tables() ->
 
 router_add_del(_) ->
     ?R:add_route(<<"#">>),
-    ?R:add_route(<<"a/b/c">>),
+    ?R:add_route(<<"a/b/c">>, node()),
     ?R:add_route(<<"+/#">>),
     Routes = [R1, R2 | _] = [
             #route{topic = <<"#">>,     dest = node()},
             #route{topic = <<"+/#">>,   dest = node()},
             #route{topic = <<"a/b/c">>, dest = node()}],
+    timer:sleep(500),
     ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
 
+    ?R:print_routes(<<"a/b/c">>),
+
     %% Batch Add
     lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
     ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
 
     %% Del
-    ?R:del_route(<<"a/b/c">>),
-    [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)),
+    ?R:del_route(<<"a/b/c">>, node()),
+    timer:sleep(500),
+    [R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)),
     {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
 
     %% Batch Del
     R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'},
     ?R:add_route(R3),
-    ?R:del_route(R1),
+    ?R:del_route(<<"#">>),
     ?R:del_route(R2),
     ?R:del_route(R3),
-    [] = lists:sort(?R:match(<<"a/b/c">>)).
+    timer:sleep(500),
+    [] = lists:sort(?R:match_routes(<<"a/b/c">>)).