Просмотр исходного кода

Update test cases for emqx_connection (#3116)

张奇怪 6 лет назад
Родитель
Сommit
2cf3af12ef
2 измененных файлов с 277 добавлено и 256 удалено
  1. 2 0
      src/emqx_connection.erl
  2. 275 256
      test/emqx_connection_SUITE.erl

+ 2 - 0
src/emqx_connection.erl

@@ -136,6 +136,8 @@ info(sockstate, #state{sockstate = SockSt}) ->
     SockSt;
     SockSt;
 info(active_n, #state{active_n = ActiveN}) ->
 info(active_n, #state{active_n = ActiveN}) ->
     ActiveN;
     ActiveN;
+info(stats_timer, #state{stats_timer = Stats_timer}) ->
+    Stats_timer;
 info(limiter, #state{limiter = Limiter}) ->
 info(limiter, #state{limiter = Limiter}) ->
     maybe_apply(fun emqx_limiter:info/1, Limiter).
     maybe_apply(fun emqx_limiter:info/1, Limiter).
 
 

+ 275 - 256
test/emqx_connection_SUITE.erl

@@ -35,6 +35,10 @@ init_per_suite(Config) ->
     ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
     ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
     %% Meck Cm
     %% Meck Cm
     ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
     ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
+    %% Meck Limiter
+    ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
+    %% Meck Pd
+    ok = meck:new(emqx_pd, [passthrough, no_history, no_link]),
     %% Meck Metrics
     %% Meck Metrics
     ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
     ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
     ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
@@ -46,6 +50,8 @@ end_per_suite(_Config) ->
     ok = meck:unload(emqx_transport),
     ok = meck:unload(emqx_transport),
     ok = meck:unload(emqx_channel),
     ok = meck:unload(emqx_channel),
     ok = meck:unload(emqx_cm),
     ok = meck:unload(emqx_cm),
+    ok = meck:unload(emqx_limiter),
+    ok = meck:unload(emqx_pd),
     ok = meck:unload(emqx_metrics),
     ok = meck:unload(emqx_metrics),
     ok.
     ok.
 
 
@@ -72,6 +78,233 @@ end_per_testcase(_TestCase, Config) ->
 %% Test cases
 %% Test cases
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+t_info(_) ->
+    CPid = spawn(fun() ->
+                    receive 
+                        {'$gen_call', From, info} ->
+                            gen_server:reply(From, emqx_connection:info(st()))
+                    after
+                        0 -> error("error")
+                    end
+                end),
+    #{sockinfo := SockInfo} = emqx_connection:info(CPid),
+    ?assertMatch(#{active_n := 100,
+                    peername := {{127,0,0,1},3456},
+                    sockname := {{127,0,0,1},1883},
+                    sockstate := idle,
+                    socktype := tcp}, SockInfo).
+
+t_info_limiter(_) ->
+    St = st(#{limiter => emqx_limiter:init([])}),
+    ?assertEqual(undefined, emqx_connection:info(limiter, St)).
+
+t_stats(_) ->
+    CPid = spawn(fun() ->
+                        receive 
+                            {'$gen_call', From, stats} ->
+                                gen_server:reply(From, emqx_connection:stats(st()))
+                        after
+                            0 -> error("error")
+                        end
+                    end),
+    Stats = emqx_connection:stats(CPid),
+    ?assertMatch([{recv_oct,0},
+                    {recv_cnt,0},
+                    {send_oct,0},
+                    {send_cnt,0},
+                    {send_pend,0}| _] , Stats).
+
+t_process_msg(_) ->
+    with_conn(fun(CPid) -> 
+                        ok = meck:expect(emqx_channel, handle_in, 
+                                        fun(_Packet, Channel) -> 
+                                                {ok, Channel} 
+                                        end),
+                        CPid ! {incoming, ?PACKET(?PINGREQ)},
+                        CPid ! {incoming, undefined},
+                        CPid ! {tcp_passive, sock},
+                        CPid ! {tcp_closed, sock},
+                        timer:sleep(100),
+                        ok = trap_exit(CPid, {shutdown, tcp_closed})
+                end, #{trap_exit => true}).
+
+t_ensure_stats_timer(_) ->
+    NStats = emqx_connection:ensure_stats_timer(100, st()),
+    Stats_timer = emqx_connection:info(stats_timer, NStats),
+    ?assert(is_reference(Stats_timer)),
+    ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
+
+t_cancel_stats_timer(_) ->
+    NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
+    Stats_timer = emqx_connection:info(stats_timer, NStats),
+    ?assertEqual(undefined, Stats_timer),
+    ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)).
+
+t_append_msg(_) ->
+    ?assertEqual([msg], emqx_connection:append_msg([], [msg])),
+    ?assertEqual([msg], emqx_connection:append_msg([], msg)),
+    ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], [msg])),
+    ?assertEqual([msg1,msg], emqx_connection:append_msg([msg1], msg)).
+
+t_handle_msg(_) ->
+    From = {make_ref(), self()},
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({'$gen_call', From, for_testing}, st())),
+    ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())),
+    ?assertMatch({stop, {shutdown,discarded}, _St}, emqx_connection:handle_msg({'$gen_call', From, discard}, st())),
+    ?assertMatch({ok, [], _St}, emqx_connection:handle_msg({tcp, From, <<"for_testing">>}, st())),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg(for_testing, st())).
+
+t_handle_msg_incoming(_) ->
+    ?assertMatch({ok, _Out, _St}, emqx_connection:handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())),
+    ?assertEqual(ok, emqx_connection:handle_msg({incoming, ?PACKET(?PINGREQ)}, st())),
+    ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>}, st())),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>}, st())),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({incoming, undefined}, st())).
+
+t_handle_msg_outgoing(_) ->
+    ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
+    ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
+    ?assertEqual(ok, emqx_connection:handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
+
+t_handle_msg_tcp_error(_) ->
+    ?assertMatch({stop, {shutdown, econnreset}, _St}, emqx_connection:handle_msg({tcp_error, sock, econnreset}, st())).
+
+t_handle_msg_tcp_closed(_) ->
+    ?assertMatch({stop, {shutdown, tcp_closed}, _St}, emqx_connection:handle_msg({tcp_closed, sock}, st())).
+
+t_handle_msg_passive(_) ->
+    ?assertMatch({ok, _Event, _St}, emqx_connection:handle_msg({tcp_passive, sock}, st())).
+    
+t_handle_msg_deliver(_) ->
+    ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({deliver, topic, msg}, st())).
+    
+t_handle_msg_inet_reply(_) ->
+    ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
+    ?assertEqual(ok, emqx_connection:handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
+    ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
+
+t_handle_msg_connack(_) ->
+    ?assertEqual(ok, emqx_connection:handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
+
+t_handle_msg_close(_) ->
+    ?assertMatch({stop, {shutdown, normal}, _St}, emqx_connection:handle_msg({close, normal}, st())).
+    
+t_handle_msg_event(_) ->
+    ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
+    ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
+    ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
+    ?assertEqual(ok, emqx_connection:handle_msg({event, connected}, st())),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, disconnected}, st())),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, st())).
+    
+t_handle_msg_timeout(_) ->
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({timeout, make_ref(), for_testing}, st())).
+
+t_handle_msg_shutdown(_) ->
+    ?assertMatch({stop, {shutdown, for_testing}, _St}, emqx_connection:handle_msg({shutdown, for_testing}, st())).
+
+t_handle_call(_) ->
+    St = st(),
+    ?assertMatch({ok, _St}, emqx_connection:handle_msg({event, undefined}, St)),
+    ?assertMatch({reply, _Info, _NSt}, emqx_connection:handle_call(self(), info, St)),
+    ?assertMatch({reply, _Stats, _NSt }, emqx_connection:handle_call(self(), stats, St)),
+    ?assertEqual({reply, ignored, St}, emqx_connection:handle_call(self(), for_testing, St)),
+    ?assertEqual({stop, {shutdown,kicked}, ok, St}, emqx_connection:handle_call(self(), kick, St)).
+
+t_handle_timeout(_) ->
+    TRef = make_ref(),
+    State = st(#{idle_timer => TRef, limit_timer => TRef, stats_timer => TRef}),
+    ?assertMatch({stop, {shutdown,idle_timeout}, _NState}, emqx_connection:handle_timeout(TRef, idle_timeout, State)),
+    ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_timeout(TRef, limit_timeout, State)),
+    ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, emit_stats, State)),
+    ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)),
+
+    ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
+    ?assertMatch({stop, {shutdown,for_testing}, _NState}, emqx_connection:handle_timeout(TRef, keepalive, State)),
+    ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)).
+
+t_parse_incoming(_) ->
+    ?assertMatch({ok, [], _NState}, emqx_connection:parse_incoming(<<>>, st())),
+    ?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<"for_testing">>, [], st())).
+
+t_next_incoming_msgs(_) ->
+    ?assertEqual({incoming, packet}, emqx_connection:next_incoming_msgs([packet])),
+    ?assertEqual([{incoming, packet2}, {incoming, packet1}], emqx_connection:next_incoming_msgs([packet1, packet2])).
+
+t_handle_incoming(_) ->
+    ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())),
+    ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())).
+
+t_with_channel(_) ->
+    State = st(),
+    
+    ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end),
+    ?assertEqual({ok, State}, emqx_connection:with_channel(handle_in, [for_testing], State)),
+
+    ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, Channel} end),
+    ?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
+
+    ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {ok, ?DISCONNECT_PACKET(),Channel} end),
+    ?assertMatch({ok, _Out, _NChannel}, emqx_connection:with_channel(handle_in, [for_testing], State)),
+
+    ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], Channel} end),
+    ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
+
+    ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> Channel = channel(), {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel} end),
+    ?assertMatch({stop, {shutdown,[for_testing]}, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)).
+
+t_handle_outgoing(_) ->
+    ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
+    ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
+    
+t_handle_info(_) ->
+    ?assertMatch({ok, {event,running}, _NState}, emqx_connection:handle_info(activate_socket, st())),
+    ?assertMatch({stop, {shutdown, for_testing}, _NStats}, emqx_connection:handle_info({sock_error, for_testing}, st())),
+    ?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())).
+
+t_ensure_rate_limit(_) ->
+    State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})),
+    ?assertEqual(undefined, emqx_connection:info(limiter, State)),
+
+    ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end),
+    State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
+    ?assertEqual(undefined, emqx_connection:info(limiter, State1)),
+
+    ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end),
+    State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
+    ?assertEqual(undefined, emqx_connection:info(limiter, State2)),
+    ?assertEqual(blocked, emqx_connection:info(sockstate, State2)).
+
+t_activate_socket(_) ->
+    State = st(),
+    {ok, NStats} = emqx_connection:activate_socket(State),
+    ?assertEqual(running, emqx_connection:info(sockstate, NStats)),
+ 
+    State1 = st(#{sockstate => blocked}),
+    ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)),
+
+    State2 = st(#{sockstate => closed}),
+    ?assertEqual({ok, State2}, emqx_connection:activate_socket(State2)).
+
+t_close_socket(_) ->
+    State = emqx_connection:close_socket(st(#{sockstate => closed})),
+    ?assertEqual(closed, emqx_connection:info(sockstate, State)),
+    State1 = emqx_connection:close_socket(st()),
+    ?assertEqual(closed, emqx_connection:info(sockstate, State1)).
+
+t_system_code_change(_) ->
+    State = st(),
+    ?assertEqual({ok, State}, emqx_connection:system_code_change(State, [], [], [])).
+
+t_next_msgs(_) ->
+    ?assertEqual({outgoing, ?CONNECT_PACKET()}, emqx_connection:next_msgs(?CONNECT_PACKET())),
+    ?assertEqual({}, emqx_connection:next_msgs({})),
+    ?assertEqual([], emqx_connection:next_msgs([])).
+
 t_start_link_ok(_) ->
 t_start_link_ok(_) ->
     with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end).
     with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end).
 
 
@@ -99,262 +332,6 @@ t_get_conn_info(_) ->
                                     }, SockInfo)
                                     }, SockInfo)
               end).
               end).
 
 
-t_handle_call_discard(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_call,
-                                       fun(discard, Channel) ->
-                                               {shutdown, discarded, ok, Channel}
-                                       end),
-                      ok = emqx_connection:call(CPid, discard),
-                      timer:sleep(100),
-                      ok = trap_exit(CPid, {shutdown, discarded})
-              end, #{trap_exit => true}),
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_call,
-                                       fun(discard, Channel) ->
-                                               {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel}
-                                       end),
-                      ok = emqx_connection:call(CPid, discard),
-                      timer:sleep(100),
-                      ok = trap_exit(CPid, {shutdown, discarded})
-              end, #{trap_exit => true}).
-
-t_handle_call_takeover(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_call,
-                                       fun({takeover, 'begin'}, Channel) ->
-                                               {reply, session, Channel};
-                                          ({takeover, 'end'}, Channel) ->
-                                               {shutdown, takeovered, [], Channel}
-                                       end),
-                      session = emqx_connection:call(CPid, {takeover, 'begin'}),
-                      [] = emqx_connection:call(CPid, {takeover, 'end'}),
-                      timer:sleep(100),
-                      ok = trap_exit(CPid, {shutdown, takeovered})
-              end, #{trap_exit => true}).
-
-t_handle_call_any(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_call,
-                                       fun(_Req, Channel) -> {reply, ok, Channel} end),
-                      ok = emqx_connection:call(CPid, req)
-              end).
-
-t_handle_incoming_connect(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
-                      ConnPkt = #mqtt_packet_connect{proto_ver   = ?MQTT_PROTO_V5,
-                                                     proto_name  = <<"MQTT">>,
-                                                     clientid    = <<>>,
-                                                     clean_start = true,
-                                                     keepalive   = 60
-                                                    },
-                      Frame = make_frame(?CONNECT_PACKET(ConnPkt)),
-                      CPid ! {tcp, sock, Frame}
-              end).
-
-t_handle_incoming_publish(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
-                      Frame = make_frame(?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)),
-                      CPid ! {tcp, sock, Frame}
-              end).
-
-t_handle_incoming_subscribe(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
-                      Frame = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
-                      CPid ! {tcp, sock, Frame}
-              end).
-
-t_handle_incoming_unsubscribe(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
-                      Frame = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
-                      CPid ! {tcp, sock, Frame}
-              end).
-
-t_handle_incoming_undefined(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
-                      CPid ! {incoming, undefined}
-              end).
-
-t_handle_sock_error(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_info,
-                                       fun({_, Reason}, Channel) ->
-                                               {shutdown, Reason, Channel}
-                                       end),
-                            %% TODO: fixme later
-                            CPid ! {tcp_error, sock, econnreset},
-                            timer:sleep(100),
-                            trap_exit(CPid, {shutdown, econnreset})
-              end, #{trap_exit => true}).
-
-t_handle_sock_activate(_) ->
-    with_conn(fun(CPid) -> CPid ! activate_socket end).
-
-t_handle_sock_closed(_) ->
-    with_conn(fun(CPid) ->
-                            ok = meck:expect(emqx_channel, handle_info,
-                                             fun({sock_closed, Reason}, Channel) ->
-                                                     {shutdown, Reason, Channel}
-                                             end),
-                            CPid ! {tcp_closed, sock},
-                            timer:sleep(100),
-                            trap_exit(CPid, {shutdown, tcp_closed})
-                    end, #{trap_exit => true}),
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_info,
-                                       fun({sock_closed, Reason}, Channel) ->
-                                               {shutdown, Reason, ?DISCONNECT_PACKET(), Channel}
-                                       end),
-                      CPid ! {tcp_closed, sock},
-                      timer:sleep(100),
-                      trap_exit(CPid, {shutdown, tcp_closed})
-              end, #{trap_exit => true}).
-
-t_handle_outgoing(_) ->
-    with_conn(fun(CPid) ->
-                      Publish = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
-                      CPid ! {outgoing, Publish},
-                      CPid ! {outgoing, ?PUBREL_PACKET(1)},
-                      CPid ! {outgoing, [?PUBCOMP_PACKET(1)]}
-              end).
-
-t_conn_rate_limit(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
-                      lists:foreach(fun(I) ->
-                                            Publish = ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, payload(2000)),
-                                            CPid ! {tcp, sock, make_frame(Publish)}
-                                    end, [1, 2])
-              end, #{active_n => 1, rate_limit => {1, 1024}}).
-
-t_conn_pub_limit(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_in, fun(_, Channel) -> {ok, Channel} end),
-                      ok = lists:foreach(fun(I) ->
-                                                 CPid ! {incoming, ?PUBLISH_PACKET(?QOS_0, <<"Topic">>, I, <<>>)}
-                                         end, lists:seq(1, 3))
-                      %%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
-              end, #{active_n => 1, publish_limit => {1, 2}}).
-
-t_conn_pingreq(_) ->
-    with_conn(fun(CPid) -> CPid ! {incoming, ?PACKET(?PINGREQ)} end).
-
-t_inet_reply(_) ->
-    ok = meck:new(emqx_pd, [passthrough, no_history]),
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
-                      CPid ! {inet_reply, for_testing, ok},
-                      timer:sleep(100)
-              end, #{active_n => 1, trap_exit => true}),
-    ok = meck:unload(emqx_pd),
-    with_conn(fun(CPid) ->
-                      CPid ! {inet_reply, for_testing, {error, for_testing}},
-                      timer:sleep(100),
-                      trap_exit(CPid, {shutdown, for_testing})
-              end, #{trap_exit => true}).
-
-t_deliver(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_deliver,
-                                       fun(_, Channel) -> {ok, Channel} end),
-                      CPid ! {deliver, topic, msg}
-              end).
-
-t_event_disconnected(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
-                      ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
-                      CPid ! {event, disconnected}
-              end).
-
-t_event_undefined(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
-                      ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
-                      ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
-                      CPid ! {event, undefined}
-              end).
-
-t_cloes(_) ->
-    with_conn(fun(CPid) ->
-                      CPid ! {close, normal},
-                      timer:sleep(100),
-                      trap_exit(CPid, {shutdown, normal})
-              end, #{trap_exit => true}).
-
-t_oom_shutdown(_) ->
-    with_conn(fun(CPid) ->
-                      CPid ! {shutdown, message_queue_too_long},
-                      timer:sleep(100),
-                      trap_exit(CPid, {shutdown, message_queue_too_long})
-              end, #{trap_exit => true}).
-
-t_handle_idle_timeout(_) ->
-    ok = emqx_zone:set_env(external, idle_timeout, 10),
-    with_conn(fun(CPid) ->
-                      timer:sleep(100),
-                      trap_exit(CPid, {shutdown, idle_timeout})
-              end, #{zone => external, trap_exit => true}).
-
-t_handle_emit_stats(_) ->
-    ok = emqx_zone:set_env(external, idle_timeout, 1000),
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
-                      ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
-                      ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
-                      CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false,
-                                                          max_size    => ?MAX_PACKET_SIZE,
-                                                          version     => ?MQTT_PROTO_V4
-                                                         })},
-                      timer:sleep(1000)
-              end,#{zone => external, trap_exit => true}).
-
-t_handle_limit_timeout(_) ->
-    with_conn(fun(CPid) ->
-                      CPid ! {timeout, undefined, limit_timeout},
-                      timer:sleep(100),
-                      true = erlang:is_process_alive(CPid)
-              end).
-
-t_handle_keepalive_timeout(_) ->
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_channel, handle_timeout,
-                                       fun(_TRef, _TMsg, Channel) ->
-                                               {shutdown, keepalive_timeout, Channel}
-                                       end),
-                      CPid ! {timeout, make_ref(), keepalive},
-                      timer:sleep(100),
-                      trap_exit(CPid, {shutdown, keepalive_timeout})
-              end, #{trap_exit => true}),
-    with_conn(fun(CPid) ->
-                      ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
-                      ok = meck:expect(emqx_channel, handle_timeout,
-                                       fun(_TRef, _TMsg, Channel) ->
-                                               {shutdown, keepalive_timeout, Channel}
-                                       end),
-                      CPid ! {timeout, make_ref(), keepalive},
-                      timer:sleep(100),
-                      false = erlang:is_process_alive(CPid)
-              end, #{trap_exit => true}).
-
-t_handle_shutdown(_) ->
-    with_conn(fun(CPid) ->
-                      CPid ! Shutdown = {shutdown, reason},
-                      timer:sleep(100),
-                      trap_exit(CPid, Shutdown)
-              end, #{trap_exit => true}).
-
-t_exit_message(_) ->
-    with_conn(fun(CPid) ->
-                      CPid ! {'EXIT', CPid, for_testing},
-                      timer:sleep(1000)
-              end, #{trap_exit => true}).
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Helper functions
 %% Helper functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -406,3 +383,45 @@ make_frame(Packet) ->
 
 
 payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
 payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
 
 
+st() -> st(#{}).
+st(InitFields) when is_map(InitFields) ->
+    St = emqx_connection:init_state(emqx_transport, sock, [#{zone => external}]),
+    maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end,
+              emqx_connection:set_field(channel, channel(), St),
+              InitFields
+             ).
+
+channel() -> channel(#{}).
+channel(InitFields) ->
+    ConnInfo = #{peername => {{127,0,0,1}, 3456},
+                 sockname => {{127,0,0,1}, 18083},
+                 conn_mod => emqx_connection,
+                 proto_name => <<"MQTT">>,
+                 proto_ver => ?MQTT_PROTO_V5,
+                 clean_start => true,
+                 keepalive => 30,
+                 clientid => <<"clientid">>,
+                 username => <<"username">>,
+                 receive_maximum => 100,
+                 expiry_interval => 0
+                },
+    ClientInfo = #{zone       => zone,
+                   protocol   => mqtt,
+                   peerhost   => {127,0,0,1},
+                   clientid   => <<"clientid">>,
+                   username   => <<"username">>,
+                   is_superuser => false,
+                   peercert   => undefined,
+                   mountpoint => undefined
+                  },
+    Session = emqx_session:init(#{zone => external},
+                                #{receive_maximum => 0}
+                               ),
+    maps:fold(fun(Field, Value, Channel) ->
+                      emqx_channel:set_field(Field, Value, Channel)
+              end,
+              emqx_channel:init(ConnInfo, [{zone, zone}]),
+              maps:merge(#{clientinfo => ClientInfo,
+                           session    => Session,
+                           conn_state => connected
+                          }, InitFields)).