|
|
@@ -74,6 +74,8 @@ init_per_testcase(_TestCase, Config) ->
|
|
|
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
|
|
|
%% Meck Channel
|
|
|
ok = meck:new(emqx_channel, [passthrough, no_history]),
|
|
|
+ %% Meck Cm
|
|
|
+ ok = meck:new(emqx_cm, [passthrough, no_history]),
|
|
|
%% Meck Metrics
|
|
|
ok = meck:new(emqx_metrics, [passthrough, no_history]),
|
|
|
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
|
|
@@ -84,6 +86,7 @@ init_per_testcase(_TestCase, Config) ->
|
|
|
end_per_testcase(_TestCase, Config) ->
|
|
|
ok = meck:unload(emqx_transport),
|
|
|
ok = meck:unload(emqx_channel),
|
|
|
+ ok = meck:unload(emqx_cm),
|
|
|
ok = meck:unload(emqx_metrics),
|
|
|
Config.
|
|
|
|
|
|
@@ -135,6 +138,15 @@ t_handle_call_discard(_) ->
|
|
|
ok = emqx_connection:call(CPid, discard),
|
|
|
timer:sleep(100),
|
|
|
ok = trap_exit(CPid, {shutdown, discarded})
|
|
|
+ end, #{trap_exit => true}),
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_channel, handle_call,
|
|
|
+ fun(discard, Channel) ->
|
|
|
+ {shutdown, discarded, ok, ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Channel}
|
|
|
+ end),
|
|
|
+ ok = emqx_connection:call(CPid, discard),
|
|
|
+ timer:sleep(100),
|
|
|
+ ok = trap_exit(CPid, {shutdown, discarded})
|
|
|
end, #{trap_exit => true}).
|
|
|
|
|
|
t_handle_call_takeover(_) ->
|
|
|
@@ -192,6 +204,12 @@ t_handle_incoming_unsubscribe(_) ->
|
|
|
CPid ! {tcp, sock, Frame}
|
|
|
end).
|
|
|
|
|
|
+t_handle_incoming_undefined(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
|
|
+ CPid ! {incoming, undefined}
|
|
|
+ end).
|
|
|
+
|
|
|
t_handle_sock_error(_) ->
|
|
|
with_connection(fun(CPid) ->
|
|
|
ok = meck:expect(emqx_channel, handle_info,
|
|
|
@@ -219,6 +237,15 @@ t_handle_sock_closed(_) ->
|
|
|
CPid ! {tcp_closed, sock},
|
|
|
timer:sleep(100),
|
|
|
trap_exit(CPid, {shutdown, tcp_closed})
|
|
|
+ end, #{trap_exit => true}),
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_channel, handle_info,
|
|
|
+ fun({sock_closed, Reason}, Channel) ->
|
|
|
+ {shutdown, Reason, ?DISCONNECT_PACKET(), Channel}
|
|
|
+ end),
|
|
|
+ CPid ! {tcp_closed, sock},
|
|
|
+ timer:sleep(100),
|
|
|
+ trap_exit(CPid, {shutdown, tcp_closed})
|
|
|
end, #{trap_exit => true}).
|
|
|
|
|
|
t_handle_outgoing(_) ->
|
|
|
@@ -248,6 +275,53 @@ t_conn_pub_limit(_) ->
|
|
|
%%#{sockinfo := #{sockstate := blocked}} = emqx_connection:info(CPid)
|
|
|
end, #{active_n => 1, publish_limit => {1, 2}}).
|
|
|
|
|
|
+t_conn_pingreq(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ CPid ! {incoming, ?PACKET(?PINGREQ)}
|
|
|
+ end).
|
|
|
+
|
|
|
+t_inet_reply(_) ->
|
|
|
+ ok = meck:new(emqx_pd, [passthrough, no_history]),
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
|
|
|
+ CPid ! {inet_reply, for_testing, ok},
|
|
|
+ timer:sleep(100)
|
|
|
+ end, #{active_n => 1, trap_exit => true}),
|
|
|
+ ok = meck:unload(emqx_pd),
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ CPid ! {inet_reply, for_testing, {error, for_testing}},
|
|
|
+ timer:sleep(100),
|
|
|
+ trap_exit(CPid, {shutdown, for_testing})
|
|
|
+ end, #{trap_exit => true}).
|
|
|
+
|
|
|
+t_deliver(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end),
|
|
|
+ CPid ! {deliver, topic, msg}
|
|
|
+ end).
|
|
|
+
|
|
|
+t_event_disconnected(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
|
|
+ ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
|
|
|
+ CPid ! {event, disconnected}
|
|
|
+ end).
|
|
|
+
|
|
|
+t_event_undefined(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
|
|
|
+ ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
|
|
+ ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
|
|
|
+ CPid ! {event, undefined}
|
|
|
+ end).
|
|
|
+
|
|
|
+t_cloes(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ CPid ! {close, normal},
|
|
|
+ timer:sleep(100),
|
|
|
+ trap_exit(CPid, {shutdown, normal})
|
|
|
+ end, #{trap_exit => true}).
|
|
|
+
|
|
|
t_oom_shutdown(_) ->
|
|
|
with_connection(fun(CPid) ->
|
|
|
CPid ! {shutdown, message_queue_too_long},
|
|
|
@@ -263,13 +337,24 @@ t_handle_idle_timeout(_) ->
|
|
|
end, #{zone => external, trap_exit => true}).
|
|
|
|
|
|
t_handle_emit_stats(_) ->
|
|
|
+ ok = emqx_zone:set_env(external, idle_timeout, 1000),
|
|
|
with_connection(fun(CPid) ->
|
|
|
- ok = meck:expect(emqx_channel, handle_timeout,
|
|
|
- fun(_TRef, _TMsg, Channel) ->
|
|
|
- {ok, Channel}
|
|
|
- end),
|
|
|
- CPid ! {timeout, make_ref(), emit_stats}
|
|
|
- end).
|
|
|
+ ok = meck:expect(emqx_channel, stats, fun(_Channel) -> [] end),
|
|
|
+ ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
|
|
|
+ ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> true end),
|
|
|
+ CPid ! {incoming, ?CONNECT_PACKET(#{strict_mode => false,
|
|
|
+ max_size => ?MAX_PACKET_SIZE,
|
|
|
+ version => ?MQTT_PROTO_V4
|
|
|
+ })},
|
|
|
+ timer:sleep(1000)
|
|
|
+ end,#{zone => external, trap_exit => true}).
|
|
|
+
|
|
|
+t_handle_limit_timeout(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ CPid ! {timeout, undefined, limit_timeout},
|
|
|
+ timer:sleep(100),
|
|
|
+ false = erlang:is_process_alive(CPid)
|
|
|
+ end, #{trap_exit => true}).
|
|
|
|
|
|
t_handle_keepalive_timeout(_) ->
|
|
|
with_connection(fun(CPid) ->
|
|
|
@@ -280,6 +365,16 @@ t_handle_keepalive_timeout(_) ->
|
|
|
CPid ! {timeout, make_ref(), keepalive},
|
|
|
timer:sleep(100),
|
|
|
trap_exit(CPid, {shutdown, keepalive_timeout})
|
|
|
+ end, #{trap_exit => true}),
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ ok = meck:expect(emqx_transport, getstat, fun(_Sock, _Options) -> {error, for_testing} end),
|
|
|
+ ok = meck:expect(emqx_channel, handle_timeout,
|
|
|
+ fun(_TRef, _TMsg, Channel) ->
|
|
|
+ {shutdown, keepalive_timeout, Channel}
|
|
|
+ end),
|
|
|
+ CPid ! {timeout, make_ref(), keepalive},
|
|
|
+ timer:sleep(100),
|
|
|
+ false = erlang:is_process_alive(CPid)
|
|
|
end, #{trap_exit => true}).
|
|
|
|
|
|
t_handle_shutdown(_) ->
|
|
|
@@ -289,6 +384,12 @@ t_handle_shutdown(_) ->
|
|
|
trap_exit(CPid, Shutdown)
|
|
|
end, #{trap_exit => true}).
|
|
|
|
|
|
+t_exit_message(_) ->
|
|
|
+ with_connection(fun(CPid) ->
|
|
|
+ CPid ! {'EXIT', CPid, for_testing},
|
|
|
+ timer:sleep(1000)
|
|
|
+ end, #{trap_exit => true}).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Helper functions
|
|
|
%%--------------------------------------------------------------------
|