Quellcode durchsuchen

Add test cases and fix bugs

zhouzb vor 6 Jahren
Ursprung
Commit
c5ad674dd7

+ 5 - 0
src/emqx_broker_helper.erl

@@ -43,6 +43,11 @@
         , code_change/3
         ]).
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 -define(HELPER, ?MODULE).
 -define(SUBID, emqx_subid).
 -define(SUBMON, emqx_submon).

+ 2 - 2
src/emqx_flapping.erl

@@ -149,9 +149,9 @@ handle_cast({detected, Flapping = #flapping{clientid   = ClientId,
             ets:insert(?FLAPPING_TAB, BannedFlapping);
         false ->
             ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
-                 [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval]),
-            ets:delete_object(?FLAPPING_TAB, Flapping)
+                 [ClientId, esockd_net:ntoa(PeerHost), DetectCnt, Interval])
     end,
+    ets:delete_object(?FLAPPING_TAB, Flapping),
     {noreply, State};
 
 handle_cast(Msg, State) ->

+ 1 - 1
src/emqx_metrics.erl

@@ -368,7 +368,7 @@ init([]) ->
                           Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
                           true = ets:insert(?TAB, Metric),
                           ok = counters:put(CRef, Idx, 0)
-                  end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
+                  end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS),
     {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
 
 handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->

+ 5 - 0
src/emqx_vm.erl

@@ -47,6 +47,11 @@
         , get_port_info/1
         ]).
 
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
 -export([cpu_util/0]).
 
 -define(UTIL_ALLOCATORS, [temp_alloc,

+ 163 - 162
test/emqx_broker_SUITE.erl

@@ -29,24 +29,8 @@
 
 all() -> emqx_ct:all(?MODULE).
 
-groups() ->
-    [{pubsub, [sequence],
-      [t_sub_unsub,
-       t_publish,
-       t_pubsub,
-       t_shared_subscribe,
-       t_dispatch_with_no_sub,
-       't_pubsub#',
-       't_pubsub+'
-      ]},
-     {metrics, [sequence],
-      [inc_dec_metric]},
-     {stats, [sequence],
-      [set_get_stat]
-     }].
-
 init_per_suite(Config) ->
-    emqx_ct_helpers:boot_modules([router, broker]),
+    emqx_ct_helpers:boot_modules(all),
     emqx_ct_helpers:start_apps([]),
     Config.
 
@@ -57,156 +41,173 @@ end_per_suite(_Config) ->
 %% PubSub Test
 %%--------------------------------------------------------------------
 
-t_sub_unsub(_) ->
-    ok = emqx_broker:subscribe(<<"topic">>, <<"clientId">>),
-    ok = emqx_broker:subscribe(<<"topic/1">>, <<"clientId">>, #{qos => 1}),
-    ok = emqx_broker:subscribe(<<"topic/2">>, <<"clientId">>, #{qos => 2}),
-    true = emqx_broker:subscribed(<<"clientId">>, <<"topic">>),
-    Topics = emqx_broker:topics(),
-    lists:foreach(fun(Topic) ->
-                      ?assert(lists:member(Topic, Topics))
-                  end, Topics),
-    ok = emqx_broker:unsubscribe(<<"topic">>),
-    ok = emqx_broker:unsubscribe(<<"topic/1">>),
-    ok = emqx_broker:unsubscribe(<<"topic/2">>).
-
-t_publish(_) ->
-    Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
-    ok = emqx_broker:subscribe(<<"test/+">>),
-    timer:sleep(10),
-    emqx_broker:publish(Msg),
-    ?assert(receive {deliver, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 100 -> false end).
-
-t_dispatch_with_no_sub(_) ->
-    Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
-    Delivery = #delivery{sender = self(), message = Msg},
-    ?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}],
-                 emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
-
-t_pubsub(_) ->
-    true = emqx:is_running(node()),
-    Self = self(),
-    Subscriber = <<"clientId">>,
-    ok = emqx_broker:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
-    #{qos := 1} = ets:lookup_element(emqx_suboption, {Self, <<"a/b/c">>}, 2),
-    #{qos := 1} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
-    true = emqx_broker:set_subopts(<<"a/b/c">>, #{qos => 0}),
-    #{qos := 0} = emqx_broker:get_subopts(Subscriber, <<"a/b/c">>),
-    ok = emqx_broker:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
-    %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
-    timer:sleep(10),
-    [Self] = emqx_broker:subscribers(<<"a/b/c">>),
-    emqx_broker:publish(
-      emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(
-        receive {deliver, <<"a/b/c">>, _ } ->
-                true;
-            P ->
-                ct:log("Receive Message: ~p~n",[P])
-        after 100 ->
-            false
-        end),
-    spawn(fun() ->
-            emqx_broker:subscribe(<<"a/b/c">>),
-            emqx_broker:subscribe(<<"c/d/e">>),
-            timer:sleep(10),
-            emqx_broker:unsubscribe(<<"a/b/c">>)
-          end),
-    timer:sleep(20),
-    emqx_broker:unsubscribe(<<"a/b/c">>).
-
-t_shared_subscribe(_) ->
-    emqx_broker:subscribe(<<"$share/group2/topic2">>),
-    emqx_broker:subscribe(<<"$queue/topic3">>),
-    timer:sleep(10),
-    ct:pal("Share subscriptions: ~p",
-           [emqx_broker:subscriptions(self())]),
-    ?assertEqual(2, length(emqx_broker:subscriptions(self()))),
-    emqx_broker:unsubscribe(<<"$share/group2/topic2">>),
-    emqx_broker:unsubscribe(<<"$queue/topic3">>),
-    ?assertEqual(0, length(emqx_broker:subscriptions(self()))).
-
-'t_pubsub#'(_) ->
-    emqx_broker:subscribe(<<"a/#">>),
-    timer:sleep(10),
-    emqx_broker:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {deliver, <<"a/#">>, _} -> true after 100 -> false end),
-    emqx_broker:unsubscribe(<<"a/#">>).
-
-'t_pubsub+'(_) ->
-    emqx_broker:subscribe(<<"a/+/+">>),
-    timer:sleep(10), %% TODO: why sleep?
-    emqx_broker:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {deliver, <<"a/+/+">>, _} -> true after 100 -> false end),
-    emqx_broker:unsubscribe(<<"a/+/+">>).
-
-%%--------------------------------------------------------------------
-%% Metric Group
-%%--------------------------------------------------------------------
-
-inc_dec_metric(_) ->
-    emqx_metrics:inc('messages.retained', 10),
-    emqx_metrics:dec('messages.retained', 10).
-
-%%--------------------------------------------------------------------
-%% Stats Group
-%%--------------------------------------------------------------------
-
-set_get_stat(_) ->
-    emqx_stats:setstat('retained.max', 99),
-    ?assertEqual(99, emqx_stats:getstat('retained.max')).
-
-
-t_dispatch(_) ->
-    error('TODO').
-
-t_subscriber_down(_) ->
-    error('TODO').
-
-t_get_subopts(_) ->
-    error('TODO').
-
-t_set_subopts(_) ->
-    error('TODO').
+t_subscribed(_) ->
+    emqx_broker:subscribe(<<"topic">>),
+    ?assertEqual(false, emqx_broker:subscribed(undefined, <<"topic">>)),
+    ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
+    emqx_broker:unsubscribe(<<"topic">>).
+
+t_subscribed_2(_) ->
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
+    ?assertEqual(true, emqx_broker:subscribed(<<"clientid">>, <<"topic">>)),
+    ?assertEqual(true, emqx_broker:subscribed(self(), <<"topic">>)),
+    emqx_broker:unsubscribe(<<"topic">>).
+
+t_subopts(_) ->
+    ?assertEqual(false, emqx_broker:set_subopts(<<"topic">>, #{qos => 1})),
+    ?assertEqual(undefined, emqx_broker:get_subopts(self(), <<"topic">>)),
+    ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)),
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
+    ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
+    ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)),
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}),
+    ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
+    ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})),
+    ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
+    emqx_broker:unsubscribe(<<"topic">>).
 
 t_topics(_) ->
-    error('TODO').
+    Topics = [<<"topic">>, <<"topic/1">>, <<"topic/2">>],
+    ok = emqx_broker:subscribe(lists:nth(1, Topics), <<"clientId">>),
+    ok = emqx_broker:subscribe(lists:nth(2, Topics), <<"clientId">>),
+    ok = emqx_broker:subscribe(lists:nth(3, Topics), <<"clientId">>),
+    Topics1 = emqx_broker:topics(),
+    ?assertEqual(true, lists:foldl(fun(Topic, Acc) ->
+                                       case lists:member(Topic, Topics1) of
+                                           true -> Acc;
+                                           false -> false
+                                       end
+                                   end, true, Topics)),
+    emqx_broker:unsubscribe(lists:nth(1, Topics)),
+    emqx_broker:unsubscribe(lists:nth(2, Topics)),
+    emqx_broker:unsubscribe(lists:nth(3, Topics)).
 
-t_stats_fun(_) ->
-    error('TODO').
-
-t_init(_) ->
-    error('TODO').
-
-t_handle_call(_) ->
-    error('TODO').
-
-t_handle_cast(_) ->
-    error('TODO').
-
-t_handle_info(_) ->
-    error('TODO').
-
-t_terminate(_) ->
-    error('TODO').
-
-t_code_change(_) ->
-    error('TODO').
-
-t_safe_publish(_) ->
-    error('TODO').
-
-t_subscribed(_) ->
-    error('TODO').
+t_subscribers(_) ->
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
+    ?assertEqual([self()], emqx_broker:subscribers(<<"topic">>)),
+    emqx_broker:unsubscribe(<<"topic">>).
 
 t_subscriptions(_) ->
-    error('TODO').
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}),
+    ?assertEqual(#{qos => 1, subid => <<"clientid">>},
+                 proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))),
+    ?assertEqual(#{qos => 1, subid => <<"clientid">>},
+                 proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))),
+    emqx_broker:unsubscribe(<<"topic">>).
+
+t_sub_pub(_) ->
+    ok = emqx_broker:subscribe(<<"topic">>),
+    ct:sleep(10),
+    emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
+    ?assert(
+        receive
+            {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
+                true;
+            _ ->
+                false
+        after 100 ->
+            false
+        end).
 
-t_subscribers(_) ->
-    error('TODO').
+t_nosub_pub(_) ->
+    ?assertEqual(0, emqx_metrics:val('messages.dropped')),
+    emqx_broker:publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
+    ?assertEqual(1, emqx_metrics:val('messages.dropped')).
 
-t_unsubscribe(_) ->
-    error('TODO').
+t_shared_subscribe(_) ->
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{share => <<"group">>}),
+    ct:sleep(10),
+    emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
+    ?assert(receive
+                {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
+                    true;
+                Msg ->
+                    ct:pal("Msg: ~p", [Msg]),
+                    false
+            after 100 ->
+                false
+            end),
+    emqx_broker:unsubscribe(<<"$share/group/topic">>).
+
+t_shared_subscribe_2(_) ->
+    {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    {ok, _, [0]} = emqtt:subscribe(ConnPid, <<"$share/group/topic">>, 0),
+
+    {ok, ConnPid2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
+    {ok, _} = emqtt:connect(ConnPid2),
+    {ok, _, [0]} = emqtt:subscribe(ConnPid2, <<"$share/group2/topic">>, 0),
+
+    ct:sleep(10),
+    ok = emqtt:publish(ConnPid, <<"topic">>, <<"hello">>, 0),
+    Msgs = recv_msgs(2),
+    ?assertEqual(2, length(Msgs)),
+    ?assertEqual(true, lists:foldl(fun(#{payload := <<"hello">>, topic := <<"topic">>}, Acc) ->
+                                       Acc;
+                                      (_, _) ->
+                                       false
+                                   end, true, Msgs)),
+    emqtt:disconnect(ConnPid),
+    emqtt:disconnect(ConnPid2).
+
+t_shared_subscribe_3(_) ->
+    {ok, ConnPid} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid">>}]),
+    {ok, _} = emqtt:connect(ConnPid),
+    {ok, _, [0]} = emqtt:subscribe(ConnPid, <<"$share/group/topic">>, 0),
+
+    {ok, ConnPid2} = emqtt:start_link([{clean_start, true}, {clientid, <<"clientid2">>}]),
+    {ok, _} = emqtt:connect(ConnPid2),
+    {ok, _, [0]} = emqtt:subscribe(ConnPid2, <<"$share/group/topic">>, 0),
+
+    ct:sleep(10),
+    ok = emqtt:publish(ConnPid, <<"topic">>, <<"hello">>, 0),
+    Msgs = recv_msgs(2),
+    ?assertEqual(1, length(Msgs)),
+    emqtt:disconnect(ConnPid),
+    emqtt:disconnect(ConnPid2).
+
+t_shard(_) ->
+    ok = meck:new(emqx_broker_helper, [passthrough, no_history]),
+    ok = meck:expect(emqx_broker_helper, get_sub_shard, fun(_, _) -> 1 end),
+    emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
+    ct:sleep(10),
+    emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
+    ?assert(
+        receive
+            {deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
+                true;
+            _ ->
+                false
+        after 100 ->
+            false
+        end),
+    ok = meck:unload(emqx_broker_helper).
 
-t_subscribe(_) ->
-    error('TODO').
+t_stats_fun(_) ->
+    ?assertEqual(0, emqx_stats:getstat('subscribers.count')),
+    ?assertEqual(0, emqx_stats:getstat('subscriptions.count')),
+    ?assertEqual(0, emqx_stats:getstat('suboptions.count')),
+    ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
+    ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
+    emqx_broker:stats_fun(),
+    ct:sleep(10),
+    ?assertEqual(2, emqx_stats:getstat('subscribers.count')),
+    ?assertEqual(2, emqx_stats:getstat('subscribers.max')),
+    ?assertEqual(2, emqx_stats:getstat('subscriptions.count')),
+    ?assertEqual(2, emqx_stats:getstat('subscriptions.max')),
+    ?assertEqual(2, emqx_stats:getstat('suboptions.count')),
+    ?assertEqual(2, emqx_stats:getstat('suboptions.max')).
+
+recv_msgs(Count) ->
+    recv_msgs(Count, []).
+
+recv_msgs(0, Msgs) ->
+    Msgs;
+recv_msgs(Count, Msgs) ->
+    receive
+        {publish, Msg} ->
+            recv_msgs(Count-1, [Msg|Msgs]);
+        _Other -> recv_msgs(Count, Msgs)
+    after 100 ->
+        Msgs
+    end.

+ 39 - 35
test/emqx_broker_helper_SUITE.erl

@@ -23,48 +23,52 @@
 
 all() -> emqx_ct:all(?MODULE).
 
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
 init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(_TestCase, Config) ->
     Config.
 
-t_start_link(_) ->
-    error('TODO').
-
 t_lookup_subid(_) ->
-    error('TODO').
-
-t_create_seq(_) ->
-    error('TODO').
-
-t_init(_) ->
-    error('TODO').
-
-t_handle_call(_) ->
-    error('TODO').
-
-t_handle_cast(_) ->
-    error('TODO').
-
-t_handle_info(_) ->
-    error('TODO').
-
-t_terminate(_) ->
-    error('TODO').
-
-t_code_change(_) ->
-    error('TODO').
+    ?assertEqual(undefined, emqx_broker_helper:lookup_subid(self())),
+    emqx_broker_helper:register_sub(self(), <<"clientid">>),
+    ct:sleep(10),
+    ?assertEqual(<<"clientid">>, emqx_broker_helper:lookup_subid(self())).
 
 t_lookup_subpid(_) ->
-    error('TODO').
-
-t_reclaim_seq(_) ->
-    error('TODO').
-
-t_get_sub_shard(_) ->
-    error('TODO').
-
+    ?assertEqual(undefined, emqx_broker_helper:lookup_subpid(<<"clientid">>)),
+    emqx_broker_helper:register_sub(self(), <<"clientid">>),
+    ct:sleep(10),
+    ?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)).
+    
 t_register_sub(_) ->
-    error('TODO').
-
+    ok = emqx_broker_helper:register_sub(self(), <<"clientid">>),
+    ct:sleep(10),
+    ok = emqx_broker_helper:register_sub(self(), <<"clientid">>),
+    try emqx_broker_helper:register_sub(self(), <<"clientid2">>) of
+        _ -> ct:fail(should_throw_error)
+    catch error:Reason ->
+        ?assertEqual(Reason, subid_conflict)
+    end,
+    ?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)).
+
+t_shard_seq(_) ->
+    ?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)),
+    emqx_broker_helper:create_seq(<<"topic">>),
+    ?assertEqual([{<<"topic">>, 1}], ets:lookup(emqx_subseq, <<"topic">>)),
+    emqx_broker_helper:reclaim_seq(<<"topic">>),
+    ?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)).
+
+t_shards_num(_) ->
+    ?assertEqual(emqx_vm:schedulers() * 32, emqx_broker_helper:shards_num()).
+    
+t_get_sub_shard(_) ->
+    ?assertEqual(0, emqx_broker_helper:get_sub_shard(self(), <<"topic">>)).

+ 45 - 32
test/emqx_cm_registry_SUITE.erl

@@ -21,44 +21,57 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
 all() -> emqx_ct:all(?MODULE).
 
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([]).
+
 init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(_TestCase, Config) ->
     Config.
 
-t_start_link(_) ->
-    error('TODO').
-
-t_init(_) ->
-    error('TODO').
-
-t_handle_call(_) ->
-    error('TODO').
-
-t_handle_cast(_) ->
-    error('TODO').
-
-t_handle_info(_) ->
-    error('TODO').
-
-t_terminate(_) ->
-    error('TODO').
-
-t_code_change(_) ->
-    error('TODO').
-
-t_lookup_channels(_) ->
-    error('TODO').
-
 t_is_enabled(_) ->
-    error('TODO').
-
-t_unregister_channel(_) ->
-    error('TODO').
-
-t_register_channel(_) ->
-    error('TODO').
-
+    application:set_env(emqx, enable_channel_registry, false),
+    ?assertEqual(false, emqx_cm_registry:is_enabled()),
+    application:set_env(emqx, enable_channel_registry, true),
+    ?assertEqual(true, emqx_cm_registry:is_enabled()).
+
+t_register_unregister_channel(_) ->
+    ClientId = <<"clientid">>,
+    application:set_env(emqx, enable_channel_registry, false),
+    emqx_cm_registry:register_channel(ClientId),
+    ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
+
+    application:set_env(emqx, enable_channel_registry, true),
+    emqx_cm_registry:register_channel(ClientId),
+    ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
+
+    application:set_env(emqx, enable_channel_registry, false),
+    emqx_cm_registry:unregister_channel(ClientId),
+    ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
+
+    application:set_env(emqx, enable_channel_registry, true),
+    emqx_cm_registry:unregister_channel(ClientId),
+    ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)).
+
+t_cleanup_channels(_) ->
+    ClientId = <<"clientid">>,
+    ClientId2 = <<"clientid2">>,
+    emqx_cm_registry:register_channel(ClientId),
+    emqx_cm_registry:register_channel(ClientId2),
+    ?assertEqual([self()], emqx_cm_registry:lookup_channels(ClientId)),
+    emqx_cm_registry ! {membership, {mnesia, down, node()}},
+    ct:sleep(100),
+    ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
+    ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).

+ 59 - 19
test/emqx_metrics_SUITE.erl

@@ -24,24 +24,10 @@
 
 all() -> emqx_ct:all(?MODULE).
 
-t_val(_) ->
-    error('TODO').
-
-t_dec(_) ->
-    error('TODO').
-
-t_set(_) ->
-    error('TODO').
-
-t_commit(_) ->
-    error('TODO').
-
-t_inc(_) ->
-    error('TODO').
-
 t_new(_) ->
     with_metrics_server(
       fun() ->
+          ok = emqx_metrics:new('metrics.test'),
           ok = emqx_metrics:new('metrics.test'),
           0 = emqx_metrics:val('metrics.test'),
           ok = emqx_metrics:inc('metrics.test'),
@@ -86,16 +72,70 @@ t_inc_recv(_) ->
     with_metrics_server(
       fun() ->
         ok = emqx_metrics:inc_recv(?PACKET(?CONNECT)),
-        ?assertEqual(1, emqx_metrics:val('packets.received')),
-        ?assertEqual(1, emqx_metrics:val('packets.connect.received'))
+        ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(0, 0)),
+        ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(1, 0)),
+        ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(2, 0)),
+        ok = emqx_metrics:inc_recv(?PUBLISH_PACKET(3, 0)),
+        ok = emqx_metrics:inc_recv(?PACKET(?PUBACK)),
+        ok = emqx_metrics:inc_recv(?PACKET(?PUBREC)),
+        ok = emqx_metrics:inc_recv(?PACKET(?PUBREL)),
+        ok = emqx_metrics:inc_recv(?PACKET(?PUBCOMP)),
+        ok = emqx_metrics:inc_recv(?PACKET(?SUBSCRIBE)),
+        ok = emqx_metrics:inc_recv(?PACKET(?UNSUBSCRIBE)),
+        ok = emqx_metrics:inc_recv(?PACKET(?PINGREQ)),
+        ok = emqx_metrics:inc_recv(?PACKET(?DISCONNECT)),
+        ok = emqx_metrics:inc_recv(?PACKET(?AUTH)),
+        ignore = emqx_metrics:inc_recv(?PACKET(?RESERVED)),
+        ?assertEqual(15, emqx_metrics:val('packets.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.connect.received')),
+        ?assertEqual(4, emqx_metrics:val('messages.received')),
+        ?assertEqual(1, emqx_metrics:val('messages.qos0.received')),
+        ?assertEqual(1, emqx_metrics:val('messages.qos1.received')),
+        ?assertEqual(1, emqx_metrics:val('messages.qos2.received')),
+        ?assertEqual(4, emqx_metrics:val('packets.publish.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.puback.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.pubrec.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.pubrel.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.pubcomp.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.subscribe.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.unsubscribe.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.pingreq.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.disconnect.received')),
+        ?assertEqual(1, emqx_metrics:val('packets.auth.received'))
       end).
 
 t_inc_sent(_) ->
     with_metrics_server(
       fun() ->
         ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)),
-        ?assertEqual(1, emqx_metrics:val('packets.sent')),
-        ?assertEqual(1, emqx_metrics:val('packets.connack.sent'))
+        ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(0, 0)),
+        ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(1, 0)),
+        ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(2, 0)),
+        ok = emqx_metrics:inc_sent(?PUBACK_PACKET(0, 0)),
+        ok = emqx_metrics:inc_sent(?PUBREC_PACKET(3, 0)),
+        ok = emqx_metrics:inc_sent(?PACKET(?PUBREL)),
+        ok = emqx_metrics:inc_sent(?PACKET(?PUBCOMP)),
+        ok = emqx_metrics:inc_sent(?PACKET(?SUBACK)),
+        ok = emqx_metrics:inc_sent(?PACKET(?UNSUBACK)),
+        ok = emqx_metrics:inc_sent(?PACKET(?PINGRESP)),
+        ok = emqx_metrics:inc_sent(?PACKET(?DISCONNECT)),
+        ok = emqx_metrics:inc_sent(?PACKET(?AUTH)),
+        ?assertEqual(13, emqx_metrics:val('packets.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.connack.sent')),
+        ?assertEqual(3, emqx_metrics:val('messages.sent')),
+        ?assertEqual(1, emqx_metrics:val('messages.qos0.sent')),
+        ?assertEqual(1, emqx_metrics:val('messages.qos1.sent')),
+        ?assertEqual(1, emqx_metrics:val('messages.qos2.sent')),
+        ?assertEqual(3, emqx_metrics:val('packets.publish.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.puback.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.pubrec.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.pubrel.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.pubcomp.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.suback.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.unsuback.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.pingresp.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.disconnect.sent')),
+        ?assertEqual(1, emqx_metrics:val('packets.auth.sent'))
       end).
 
 t_trans(_) ->

+ 21 - 4
test/emqx_misc_SUITE.erl

@@ -64,7 +64,10 @@ t_pipeline(_) ->
             fun(_I, St)  -> {ok, St+1} end,
             fun(I, St)   -> {ok, I+1, St+1} end,
             fun(I, St)   -> {ok, I*2, St*2} end],
-    ?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)).
+    ?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)),
+    ?assertEqual({error, undefined, 1}, emqx_misc:pipeline([fun(_I) -> {error, undefined} end], 1, 1)),
+    ?assertEqual({error, undefined, 2}, emqx_misc:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)),
+    ?assertEqual({error, undefined, 1}, emqx_misc:pipeline([fun(_I, _St) -> erlang:error(undefined) end], 1, 1)).
 
 t_start_timer(_) ->
     TRef = emqx_misc:start_timer(1, tmsg),
@@ -75,7 +78,8 @@ t_start_timer(_) ->
 t_cancel_timer(_) ->
     Timer = emqx_misc:start_timer(0, foo),
     ok = emqx_misc:cancel_timer(Timer),
-    ?assertEqual([], drain()).
+    ?assertEqual([], drain()),
+    ok = emqx_misc:cancel_timer(undefined).
 
 t_proc_name(_) ->
     ?assertEqual(emqx_pool_1, emqx_misc:proc_name(emqx_pool, 1)).
@@ -84,7 +88,11 @@ t_proc_stats(_) ->
     Pid1 = spawn(fun() -> exit(normal) end),
     timer:sleep(10),
     ?assertEqual([], emqx_misc:proc_stats(Pid1)),
-    Pid2 = spawn(fun() -> timer:sleep(100) end),
+    Pid2 = spawn(fun() ->
+                     ?assertMatch([{mailbox_len, 0}|_], emqx_misc:proc_stats()),
+                     timer:sleep(200)
+                 end),
+    timer:sleep(10),
     Pid2 ! msg,
     timer:sleep(10),
     ?assertMatch([{mailbox_len, 1}|_], emqx_misc:proc_stats(Pid2)).
@@ -100,7 +108,16 @@ t_drain_down(_) ->
     {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
     {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
     timer:sleep(100),
-    ?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)).
+    ?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)),
+    ?assertEqual([], emqx_misc:drain_down(1)).
+
+t_index_of(_) ->
+    try emqx_misc:index_of(a, []) of
+        _ -> ct:fail(should_throw_error)
+    catch error:Reason ->
+        ?assertEqual(badarg, Reason)
+    end,
+    ?assertEqual(3, emqx_misc:index_of(a, [b, c, a, e, f])).
 
 drain() ->
     drain([]).

+ 128 - 118
test/emqx_pqueue_SUITE.erl

@@ -26,142 +26,152 @@
 
 all() -> emqx_ct:all(?SUITE).
 
-
 t_is_queue(_) ->
-    error('TODO').
+    Q = ?PQ:new(),
+    ?assertEqual(true, ?PQ:is_queue(Q)),
+    Q1 = ?PQ:in(a, 1, Q),
+    ?assertEqual(true, ?PQ:is_queue(Q1)),
+    ?assertEqual(false, ?PQ:is_queue(bad_queue)).
 
 t_is_empty(_) ->
-    error('TODO').
-
-t_to_list(_) ->
-    error('TODO').
-
-t_from_list(_) ->
-    error('TODO').
-
-t_in(_) ->
-    error('TODO').
-
-t_out_p(_) ->
-    error('TODO').
-
-t_join(_) ->
-    error('TODO').
-
-t_filter(_) ->
-    error('TODO').
+    Q = ?PQ:new(),
+    ?assertEqual(true, ?PQ:is_empty(Q)),
+    ?assertEqual(false, ?PQ:is_empty(?PQ:in(a, Q))).
 
-t_fold(_) ->
-    error('TODO').
+t_len(_) ->
+    Q = ?PQ:new(),
+    Q1 = ?PQ:in(a, Q),
+    ?assertEqual(1, ?PQ:len(Q1)),
+    Q2 = ?PQ:in(b, 1, Q1),
+    ?assertEqual(2, ?PQ:len(Q2)).
 
-t_highest(_) ->
-    error('TODO').
+t_plen(_) ->
+    Q = ?PQ:new(),
+    Q1 = ?PQ:in(a, Q),
+    ?assertEqual(1, ?PQ:plen(0, Q1)),
+    ?assertEqual(0, ?PQ:plen(1, Q1)),
+    Q2 = ?PQ:in(b, 1, Q1),
+    Q3 = ?PQ:in(c, 1, Q2),
+    ?assertEqual(2, ?PQ:plen(1, Q3)),
+    ?assertEqual(1, ?PQ:plen(0, Q3)),
+    ?assertEqual(0, ?PQ:plen(0, {pqueue, []})).
 
-t_out(_) ->
-    error('TODO').
+t_to_list(_) ->
+    Q = ?PQ:new(),
+    ?assertEqual([], ?PQ:to_list(Q)),
 
-t_len(_) ->
-    error('TODO').
+    Q1 = ?PQ:in(a, Q),
+    L1 = ?PQ:to_list(Q1),
+    ?assertEqual([{0, a}], L1),
 
-t_plen(_) ->
-    error('TODO').
+    Q2 = ?PQ:in(b, 1, Q1),
+    L2 = ?PQ:to_list(Q2),
+    ?assertEqual([{1, b}, {0, a}], L2).
 
-t_new(_) ->
-    error('TODO').
+t_from_list(_) ->
+    Q = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
+    ?assertEqual({pqueue, [{-1, {queue, [d], [c], 2}}, {0, {queue, [b], [a], 2}}]}, Q),
+    ?assertEqual(true, ?PQ:is_queue(Q)),
+    ?assertEqual(4, ?PQ:len(Q)).
 
-t_priority_queue_plen(_) ->
+t_in(_) ->
     Q = ?PQ:new(),
-    0 = ?PQ:plen(0, Q),
-    Q0 = ?PQ:in(z, Q),
-    1 = ?PQ:plen(0, Q0),
-    Q1 = ?PQ:in(x, 1, Q0),
-    1 = ?PQ:plen(1, Q1),
-    Q2 = ?PQ:in(y, 2, Q1),
-    1 = ?PQ:plen(2, Q2),
-    Q3 = ?PQ:in(z, 2, Q2),
-    2 = ?PQ:plen(2, Q3),
-    {_, Q4} = ?PQ:out(1, Q3),
-    0 = ?PQ:plen(1, Q4),
-    {_, Q5} = ?PQ:out(Q4),
-    1 = ?PQ:plen(2, Q5),
-    {_, Q6} = ?PQ:out(Q5),
-    0 = ?PQ:plen(2, Q6),
-    1 = ?PQ:len(Q6),
-    {_, Q7} = ?PQ:out(Q6),
-    0 = ?PQ:len(Q7).
-
-t_priority_queue_out2(_) ->
-    Els = [a, {b, 1}, {c, 1}, {d, 2}, {e, 2}, {f, 2}],
-    Q  = ?PQ:new(),
-    Q0 = lists:foldl(
+    Els = [a, b, {c, 1}, {d, 1}, {e, infinity}, {f, 2}],
+    Q1 = lists:foldl(
             fun({El, P}, Acc) ->
                     ?PQ:in(El, P, Acc);
                 (El, Acc) ->
                     ?PQ:in(El, Acc)
             end, Q, Els),
-    {Val, Q1} = ?PQ:out(Q0),
-    {value, d} = Val,
-    {Val1, Q2} = ?PQ:out(2, Q1),
-    {value, e} = Val1,
-    {Val2, Q3} = ?PQ:out(1, Q2),
-    {value, b} = Val2,
-    {Val3, Q4} = ?PQ:out(Q3),
-    {value, f} = Val3,
-    {Val4, Q5} = ?PQ:out(Q4),
-    {value, c} = Val4,
-    {Val5, Q6} = ?PQ:out(Q5),
-    {value, a} = Val5,
-    {empty, _Q7} = ?PQ:out(Q6).
-
-t_priority_queues(_) ->
-    Q0 = ?PQ:new(),
-    Q1 = ?PQ:new(),
-    PQueue = {pqueue, [{0, Q0}, {1, Q1}]},
-    ?assert(?PQ:is_queue(PQueue)),
-    [] = ?PQ:to_list(PQueue),
-
-    PQueue1 = ?PQ:in(a, 0, ?PQ:new()),
-    PQueue2 = ?PQ:in(b, 0, PQueue1),
-
-    PQueue3 = ?PQ:in(c, 1, PQueue2),
-    PQueue4 = ?PQ:in(d, 1, PQueue3),
-
-    4 = ?PQ:len(PQueue4),
-
-    [{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4),
-    PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
-
-    empty = ?PQ:highest(?PQ:new()),
-    0 = ?PQ:highest(PQueue1),
-    1 = ?PQ:highest(PQueue4),
-
-    PQueue5 = ?PQ:in(e, infinity, PQueue4),
-    PQueue6 = ?PQ:in(f, 1, PQueue5),
-
-    {{value, e}, PQueue7} = ?PQ:out(PQueue6),
-    {empty, _} = ?PQ:out(0, ?PQ:new()),
-
-    {empty, Q0} = ?PQ:out_p(Q0),
-
-    Q2 = ?PQ:in(a, Q0),
-    Q3 = ?PQ:in(b, Q2),
-    Q4 = ?PQ:in(c, Q3),
+    ?assertEqual({pqueue, [{infinity, {queue, [e], [], 1}},
+                           {-2, {queue, [f], [], 1}},
+                           {-1, {queue, [d], [c], 2}},
+                           {0, {queue, [b], [a], 2}}]}, Q1).
 
-    {{value, a, 0}, _Q5} = ?PQ:out_p(Q4),
-
-    {{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7),
-
-    Q4 = ?PQ:join(Q4, ?PQ:new()),
-    Q4 = ?PQ:join(?PQ:new(), Q4),
-
-    {queue, [a], [a], 2} = ?PQ:join(Q2, Q2),
+t_out(_) ->
+    Q = ?PQ:new(),
+    {empty, Q} = ?PQ:out(Q),
+    {empty, Q} = ?PQ:out(0, Q),
+    try ?PQ:out(1, Q) of
+        _ -> ct:fail(should_throw_error)
+    catch error:Reason ->
+        ?assertEqual(Reason, badarg)
+    end,
+    {{value, a}, Q} = ?PQ:out(?PQ:from_list([{0, a}])),
+    {{value, a}, {queue, [], [b], 1}} = ?PQ:out(?PQ:from_list([{0, a}, {0, b}])),
+    {{value, a}, {queue, [], [], 0}} = ?PQ:out({queue, [], [a], 1}),
+    {{value, a}, {queue, [c], [b], 2}} = ?PQ:out({queue, [c, b], [a], 3}),
+    {{value, a}, {queue, [e, d], [b, c], 4}} = ?PQ:out({queue, [e, d, c, b], [a], 5}),
+    {{value, a}, {queue, [c], [b], 2}} = ?PQ:out({queue, [c, b, a], [], 3}),
+    {{value, a}, {queue, [d, c], [b], 3}} = ?PQ:out({queue, [d, c], [a, b], 4}),
+    {{value, a}, {queue, [], [], 0}} = ?PQ:out(?PQ:from_list([{1, a}])),
+    {{value, a}, {queue, [c], [b], 2}} = ?PQ:out(?PQ:from_list([{1, a}, {0, b}, {0, c}])),
+    {{value, a}, {pqueue, [{-1, {queue, [b], [], 1}}]}} = ?PQ:out(?PQ:from_list([{1, b}, {2, a}])),
+    {{value, a}, {pqueue, [{-1, {queue, [], [b], 1}}]}} = ?PQ:out(?PQ:from_list([{1, a}, {1, b}])).
+
+t_out_2(_) ->
+    {empty, {pqueue, [{-1, {queue, [a], [], 1}}]}} = ?PQ:out(0, ?PQ:from_list([{1, a}])),
+    {{value, a}, {queue, [], [], 0}} = ?PQ:out(1, ?PQ:from_list([{1, a}])),
+    {{value, a}, {pqueue, [{-1, {queue, [], [b], 1}}]}} = ?PQ:out(1, ?PQ:from_list([{1, a}, {1, b}])),
+    {{value, a}, {queue, [b], [], 1}} = ?PQ:out(1, ?PQ:from_list([{1, a}, {0, b}])).
 
-    {pqueue,[{-1,{queue,[f],[d],2}},
-             {0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2),
+t_out_p(_) ->
+    {empty, {queue, [], [], 0}} = ?PQ:out_p(?PQ:new()),
+    {{value, a, 1}, {queue, [b], [], 1}} = ?PQ:out_p(?PQ:from_list([{1, a}, {0, b}])).
 
-    {pqueue,[{-1,{queue,[f],[d],2}},
-             {0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8),
+t_join(_) ->
+    Q = ?PQ:in(a, ?PQ:new()),
+    Q = ?PQ:join(Q, ?PQ:new()),
+    Q = ?PQ:join(?PQ:new(), Q),
+
+    Q1 = ?PQ:in(a, ?PQ:new()),
+    Q2 = ?PQ:in(b, Q1),
+    Q3 = ?PQ:in(c, Q2),
+    {queue,[c,b],[a],3} = Q3,
+
+    Q4 = ?PQ:in(x, ?PQ:new()),
+    Q5 = ?PQ:in(y, Q4),
+    Q6 = ?PQ:in(z, Q5),
+    {queue,[z,y],[x],3} = Q6,
+
+    {queue,[z,y],[a,b,c,x],6} = ?PQ:join(Q3, Q6),
+
+    PQueue1 = ?PQ:from_list([{1, c}, {1, d}]),
+    PQueue2 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
+    PQueue3 = ?PQ:from_list([{1, c}, {1, d}, {-1, a}, {-1, b}]),
+
+    {pqueue,[{-1,{queue,[d],[c],2}},
+             {0,{queue,[z,y],[x],3}}]} = ?PQ:join(PQueue1, Q6),
+    {pqueue,[{-1,{queue,[d],[c],2}},
+             {0,{queue,[z,y],[x],3}}]} = ?PQ:join(Q6, PQueue1),
+
+    {pqueue,[{-1,{queue,[d],[c],2}},
+             {0,{queue,[z,y],[a,b,x],5}}]} = ?PQ:join(PQueue2, Q6),
+    {pqueue,[{-1,{queue,[d],[c],2}},
+             {0,{queue,[b],[x,y,z,a],5}}]} = ?PQ:join(Q6, PQueue2),
+
+    {pqueue,[{-1,{queue,[d],[c],2}},
+             {0,{queue,[z,y],[x],3}},
+             {1,{queue,[b],[a],2}}]} = ?PQ:join(PQueue3, Q6),
+    {pqueue,[{-1,{queue,[d],[c],2}},
+             {0,{queue,[z,y],[x],3}},
+             {1,{queue,[b],[a],2}}]} = ?PQ:join(Q6, PQueue3),
+
+    PQueue4 = ?PQ:from_list([{1, c}, {1, d}]),
+    PQueue5 = ?PQ:from_list([{2, a}, {2, b}]),
+    {pqueue,[{-2,{queue,[b],[a],2}},
+             {-1,{queue,[d],[c],2}}]} = ?PQ:join(PQueue4, PQueue5).
 
-    {pqueue,[{-1,{queue,[f],[d,f,d],4}},
-             {0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8).
+t_filter(_) ->
+    {pqueue, [{-2, {queue, [10], [4], 2}},
+              {-1, {queue, [2], [], 1}}]} = 
+        ?PQ:filter(fun(V) when V rem 2 =:= 0 ->
+                       true;
+                      (_) ->
+                       false
+                   end, ?PQ:from_list([{0, 1}, {0, 3}, {1, 2}, {2, 4}, {2, 10}])).
 
+t_highest(_) ->
+    empty = ?PQ:highest(?PQ:new()),
+    0 = ?PQ:highest(?PQ:from_list([{0, a}, {0, b}])),
+    2 = ?PQ:highest(?PQ:from_list([{0, a}, {0, b}, {1, c}, {2, d}, {2, e}])).

+ 44 - 3
test/emqx_vm_SUITE.erl

@@ -104,7 +104,8 @@ t_load(_Config) ->
 
 t_systeminfo(_Config) ->
    Keys =  [Key || {Key, _} <- emqx_vm:get_system_info()],
-   ?SYSTEM_INFO = Keys.
+   ?SYSTEM_INFO = Keys,
+   ?assertEqual(undefined, emqx_vm:get_system_info(undefined)).
 
 t_mem_info(_Config) ->
     application:ensure_all_started(os_mon),
@@ -139,10 +140,19 @@ t_get_ets_info(_Config) ->
     ets:new(test, [named_table]),
     [] = emqx_vm:get_ets_info(test1),
     EtsInfo = emqx_vm:get_ets_info(test),
-    test = proplists:get_value(name, EtsInfo).
+    test = proplists:get_value(name, EtsInfo),
+    Tid = proplists:get_value(id, EtsInfo),
+    EtsInfos = emqx_vm:get_ets_info(),
+    ?assertEqual(true, lists:foldl(fun(Info, Acc) ->
+                           case proplists:get_value(id, Info) of
+                               Tid -> true;
+                               _ -> Acc
+                           end
+                       end, false, EtsInfos)).
 
 t_get_ets_object(_Config) ->
     ets:new(test, [named_table]),
+    [] = emqx_vm:get_ets_object(test),
     ets:insert(test, {k, v}),
     [{k, v}] = emqx_vm:get_ets_object(test).
 
@@ -150,7 +160,20 @@ t_get_port_types(_Config) ->
     emqx_vm:get_port_types().
 
 t_get_port_info(_Config) ->
-    emqx_vm:get_port_info().
+    emqx_vm:get_port_info(),
+    spawn(fun easy_server/0),
+    ct:sleep(100),
+    {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
+    emqx_vm:get_port_info(),
+    ok = gen_tcp:close(Sock),
+    [Port | _] = erlang:ports(),
+    [{connected, _}, {name, _}] = emqx_vm:port_info(Port, [connected, name]).
+
+t_transform_port(_Config) ->
+    [Port | _] = erlang:ports(),
+    ?assertEqual(Port, emqx_vm:transform_port(Port)),
+    <<131, 102, 100, NameLen:2/unit:8, _Name:NameLen/binary, N:4/unit:8, _Vsn:8>> = erlang:term_to_binary(Port),
+    ?assertEqual(Port, emqx_vm:transform_port("#Port<0." ++ integer_to_list(N) ++ ">")).
 
 t_scheduler_usage(_Config) ->
     emqx_vm:scheduler_usage(5000).
@@ -170,3 +193,21 @@ t_get_process_group_leader_info(_Config) ->
 t_get_process_limit(_Config) ->
     emqx_vm:get_process_limit().
 
+t_cpu_util(_Config) ->
+    ?assertEqual(0, emqx_vm:cpu_util()).
+
+easy_server() ->
+    {ok, LSock} = gen_tcp:listen(5678, [binary, {packet, 0}, {active, false}]),
+    {ok, Sock} = gen_tcp:accept(LSock),
+    ok = do_recv(Sock),
+    ok = gen_tcp:close(Sock),
+    ok = gen_tcp:close(LSock).
+
+do_recv(Sock) ->
+    case gen_tcp:recv(Sock, 0) of
+        {ok, _} ->
+            do_recv(Sock);
+        {error, closed} ->
+            ok
+    end.
+