emqx_connection_SUITE.erl 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_connection_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx_mqtt.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  22. all() -> emqx_common_test_helpers:all(?MODULE).
  23. %%--------------------------------------------------------------------
  24. %% CT callbacks
  25. %%--------------------------------------------------------------------
  26. init_per_suite(Config) ->
  27. %% Meck Transport
  28. ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]),
  29. ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end),
  30. %% Meck Channel
  31. ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
  32. %% Meck Cm
  33. ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
  34. ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
  35. ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
  36. %% Meck Pd
  37. ok = meck:new(emqx_pd, [passthrough, no_history, no_link]),
  38. %% Meck Metrics
  39. ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
  40. ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
  41. ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
  42. ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end),
  43. ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end),
  44. %% Meck Hooks
  45. ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
  46. ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
  47. ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
  48. ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
  49. ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
  50. ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
  51. ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
  52. Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
  53. [{apps, Apps} | Config].
  54. end_per_suite(Config) ->
  55. ok = meck:unload(emqx_transport),
  56. catch meck:unload(emqx_channel),
  57. ok = meck:unload(emqx_cm),
  58. ok = meck:unload(emqx_pd),
  59. ok = meck:unload(emqx_metrics),
  60. ok = meck:unload(emqx_hooks),
  61. ok = meck:unload(emqx_alarm),
  62. emqx_cth_suite:stop(proplists:get_value(apps, Config)).
  63. init_per_testcase(TestCase, Config) when
  64. TestCase =/= t_ws_pingreq_before_connected
  65. ->
  66. add_bucket(),
  67. ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
  68. ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
  69. ok = meck:expect(
  70. emqx_transport,
  71. ensure_ok_or_exit,
  72. fun
  73. (peername, [sock]) -> {ok, {{127, 0, 0, 1}, 3456}};
  74. (sockname, [sock]) -> {ok, {{127, 0, 0, 1}, 1883}};
  75. (peercert, [sock]) -> undefined
  76. end
  77. ),
  78. ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end),
  79. ok = meck:expect(emqx_transport, getopts, fun(_Sock, Options) ->
  80. {ok, [{K, 0} || K <- Options]}
  81. end),
  82. ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
  83. {ok, [{K, 0} || K <- Options]}
  84. end),
  85. ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
  86. ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
  87. ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
  88. case erlang:function_exported(?MODULE, TestCase, 2) of
  89. true -> ?MODULE:TestCase(init, Config);
  90. _ -> Config
  91. end;
  92. init_per_testcase(_, Config) ->
  93. add_bucket(),
  94. Config.
  95. end_per_testcase(TestCase, Config) ->
  96. del_bucket(),
  97. case erlang:function_exported(?MODULE, TestCase, 2) of
  98. true -> ?MODULE:TestCase('end', Config);
  99. false -> ok
  100. end,
  101. Config.
  102. %%--------------------------------------------------------------------
  103. %% Test cases
  104. %%--------------------------------------------------------------------
  105. t_ws_pingreq_before_connected(_) ->
  106. ?assertMatch(
  107. {ok, [_, {close, protocol_error}], _},
  108. handle_msg({incoming, ?PACKET(?PINGREQ)}, st(#{}, #{conn_state => disconnected}))
  109. ).
  110. t_info(_) ->
  111. CPid = spawn(fun() ->
  112. receive
  113. {'$gen_call', From, info} ->
  114. gen_server:reply(From, emqx_connection:info(st()))
  115. after 100 -> error("error")
  116. end
  117. end),
  118. #{sockinfo := SockInfo} = emqx_connection:info(CPid),
  119. ?assertMatch(
  120. #{
  121. peername := {{127, 0, 0, 1}, 3456},
  122. sockname := {{127, 0, 0, 1}, 1883},
  123. sockstate := idle,
  124. socktype := tcp
  125. },
  126. SockInfo
  127. ).
  128. t_info_limiter(_) ->
  129. Limiter = init_limiter(),
  130. St = st(#{limiter => Limiter}),
  131. ?assertEqual(Limiter, emqx_connection:info(limiter, St)).
  132. t_stats(_) ->
  133. CPid = spawn(fun() ->
  134. receive
  135. {'$gen_call', From, stats} ->
  136. gen_server:reply(From, emqx_connection:stats(st()))
  137. after 100 -> error("error")
  138. end
  139. end),
  140. Stats = emqx_connection:stats(CPid),
  141. ?assertMatch(
  142. [
  143. {recv_oct, 0},
  144. {recv_cnt, 0},
  145. {send_oct, 0},
  146. {send_cnt, 0},
  147. {send_pend, 0}
  148. | _
  149. ],
  150. Stats
  151. ).
  152. t_process_msg(_) ->
  153. with_conn(
  154. fun(CPid) ->
  155. ok = meck:expect(
  156. emqx_channel,
  157. handle_in,
  158. fun(_Packet, Channel) ->
  159. {ok, Channel}
  160. end
  161. ),
  162. CPid ! {incoming, ?PACKET(?PINGREQ)},
  163. CPid ! {incoming, undefined},
  164. CPid ! {tcp_passive, sock},
  165. CPid ! {tcp_closed, sock},
  166. timer:sleep(100),
  167. ok = trap_exit(CPid, {shutdown, tcp_closed})
  168. end,
  169. #{trap_exit => true}
  170. ).
  171. t_ensure_stats_timer(_) ->
  172. NStats = emqx_connection:ensure_stats_timer(100, st()),
  173. Stats_timer = emqx_connection:info(stats_timer, NStats),
  174. ?assert(is_reference(Stats_timer)),
  175. ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
  176. t_cancel_stats_timer(_) ->
  177. NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
  178. Stats_timer = emqx_connection:info(stats_timer, NStats),
  179. ?assertEqual(undefined, Stats_timer),
  180. ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)).
  181. t_append_msg(_) ->
  182. ?assertEqual([msg], emqx_connection:append_msg([], [msg])),
  183. ?assertEqual([msg], emqx_connection:append_msg([], msg)),
  184. ?assertEqual([msg1, msg], emqx_connection:append_msg([msg1], [msg])),
  185. ?assertEqual([msg1, msg], emqx_connection:append_msg([msg1], msg)).
  186. t_handle_msg(_) ->
  187. From = {make_ref(), self()},
  188. ?assertMatch({ok, _St}, handle_msg({'$gen_call', From, for_testing}, st())),
  189. ?assertMatch(
  190. {stop, {shutdown, discarded}, _St}, handle_msg({'$gen_call', From, discard}, st())
  191. ),
  192. ?assertMatch(
  193. {stop, {shutdown, discarded}, _St}, handle_msg({'$gen_call', From, discard}, st())
  194. ),
  195. ?assertMatch({ok, [], _St}, handle_msg({tcp, From, <<"for_testing">>}, st())),
  196. ?assertMatch({ok, _St}, handle_msg(for_testing, st())).
  197. t_handle_msg_incoming(_) ->
  198. ?assertMatch(
  199. {ok, _Out, _St},
  200. handle_msg({incoming, ?CONNECT_PACKET(#mqtt_packet_connect{})}, st())
  201. ),
  202. ok = meck:expect(emqx_channel, handle_in, fun(_Packet, Channel) -> {ok, Channel} end),
  203. ?assertMatch(
  204. {ok, _St},
  205. handle_msg({incoming, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)}, st())
  206. ),
  207. Sub1 = <<?SUBSCRIBE:4, 2:4, 11, 0, 2, 0, 6, 84, 111, 112, 105, 99, 65, 2>>,
  208. ?assertMatch({ok, _St}, handle_msg({incoming, Sub1}, st())),
  209. Sub2 = <<?UNSUBSCRIBE:4, 2:4, 10, 0, 2, 0, 6, 84, 111, 112, 105, 99, 65>>,
  210. ?assertMatch({ok, _St}, handle_msg({incoming, Sub2}, st())),
  211. ?assertMatch({ok, _St}, handle_msg({incoming, undefined}, st())).
  212. t_handle_msg_outgoing(_) ->
  213. ?assertEqual(ok, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
  214. ?assertEqual(ok, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
  215. ?assertEqual(ok, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
  216. t_handle_msg_tcp_error(_) ->
  217. ?assertMatch(
  218. {stop, {shutdown, econnreset}, _St},
  219. handle_msg({tcp_error, sock, econnreset}, st())
  220. ).
  221. t_handle_msg_tcp_closed(_) ->
  222. ?assertMatch({stop, {shutdown, tcp_closed}, _St}, handle_msg({tcp_closed, sock}, st())).
  223. t_handle_msg_passive(_) ->
  224. ?assertMatch({ok, _Event, _St}, handle_msg({tcp_passive, sock}, st())).
  225. t_handle_msg_deliver(_) ->
  226. ok = meck:expect(emqx_channel, handle_deliver, fun(_, Channel) -> {ok, Channel} end),
  227. ?assertMatch({ok, _St}, handle_msg({deliver, topic, msg}, st())).
  228. t_handle_msg_inet_reply(_) ->
  229. ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
  230. emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 0),
  231. ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())),
  232. emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 100),
  233. ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())),
  234. ?assertMatch(
  235. {stop, {shutdown, for_testing}, _St},
  236. handle_msg({inet_reply, for_testing, {error, for_testing}}, st())
  237. ).
  238. t_handle_msg_connack(_) ->
  239. ?assertEqual(ok, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
  240. t_handle_msg_close(_) ->
  241. ?assertMatch({stop, {shutdown, normal}, _St}, handle_msg({close, normal}, st())).
  242. t_handle_msg_event(_) ->
  243. ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
  244. ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
  245. ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
  246. ?assertEqual(ok, handle_msg({event, connected}, st())),
  247. ?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
  248. ?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).
  249. t_handle_msg_timeout(_) ->
  250. ?assertMatch({ok, _St}, handle_msg({timeout, make_ref(), for_testing}, st())).
  251. t_handle_msg_shutdown(_) ->
  252. ?assertMatch({stop, {shutdown, for_testing}, _St}, handle_msg({shutdown, for_testing}, st())).
  253. t_handle_call(_) ->
  254. St = st(#{limiter => init_limiter()}),
  255. ?assertMatch({ok, _St}, handle_msg({event, undefined}, St)),
  256. ?assertMatch({reply, _Info, _NSt}, handle_call(self(), info, St)),
  257. ?assertMatch({reply, _Stats, _NSt}, handle_call(self(), stats, St)),
  258. ?assertEqual({reply, ignored, St}, handle_call(self(), for_testing, St)),
  259. ?assertMatch(
  260. {stop, {shutdown, kicked}, ok, _NSt},
  261. handle_call(self(), kick, St)
  262. ).
  263. t_handle_timeout(_) ->
  264. TRef = make_ref(),
  265. State = st(#{idle_timer => TRef, stats_timer => TRef, limiter => init_limiter()}),
  266. ?assertMatch(
  267. {stop, {shutdown, idle_timeout}, _NState},
  268. emqx_connection:handle_timeout(TRef, idle_timeout, State)
  269. ),
  270. ?assertMatch(
  271. {ok, _NState},
  272. emqx_connection:handle_timeout(TRef, emit_stats, State)
  273. ),
  274. ?assertMatch(
  275. {ok, _NState},
  276. emqx_connection:handle_timeout(TRef, keepalive, State)
  277. ),
  278. ?assertMatch({ok, _NState}, emqx_connection:handle_timeout(TRef, undefined, State)).
  279. t_parse_incoming(_) ->
  280. ?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<>>, [], st())),
  281. ?assertMatch({[], _NState}, emqx_connection:parse_incoming(<<"for_testing">>, [], st())).
  282. t_next_incoming_msgs(_) ->
  283. State = st(#{}),
  284. ?assertEqual(
  285. {ok, [{incoming, packet}], State},
  286. emqx_connection:next_incoming_msgs([packet], [], State)
  287. ),
  288. ?assertEqual(
  289. {ok, [{incoming, packet2}, {incoming, packet1}], State},
  290. emqx_connection:next_incoming_msgs([packet1, packet2], [], State)
  291. ).
  292. t_handle_incoming(_) ->
  293. ?assertMatch(
  294. {ok, _Out, _NState},
  295. emqx_connection:handle_incoming(?CONNECT_PACKET(#mqtt_packet_connect{}), st())
  296. ),
  297. ?assertMatch({ok, _Out, _NState}, emqx_connection:handle_incoming(frame_error, st())).
  298. t_with_channel(_) ->
  299. State = st(),
  300. ok = meck:expect(emqx_channel, handle_in, fun(_, _) -> ok end),
  301. ?assertEqual({ok, State}, emqx_connection:with_channel(handle_in, [for_testing], State)),
  302. ok = meck:expect(emqx_channel, handle_in, fun(_, _) ->
  303. Channel = channel(),
  304. {ok, Channel}
  305. end),
  306. ?assertMatch({ok, _NState}, emqx_connection:with_channel(handle_in, [for_testing], State)),
  307. ok = meck:expect(
  308. emqx_channel,
  309. handle_in,
  310. fun(_, _) ->
  311. Channel = channel(),
  312. {ok, ?DISCONNECT_PACKET(), Channel}
  313. end
  314. ),
  315. ?assertMatch(
  316. {ok, _Out, _NChannel},
  317. emqx_connection:with_channel(handle_in, [for_testing], State)
  318. ),
  319. ok = meck:expect(
  320. emqx_channel,
  321. handle_in,
  322. fun(_, _) ->
  323. Channel = channel(),
  324. {shutdown, [for_testing], Channel}
  325. end
  326. ),
  327. ?assertMatch(
  328. {stop, {shutdown, [for_testing]}, _NState},
  329. emqx_connection:with_channel(handle_in, [for_testing], State)
  330. ),
  331. ok = meck:expect(
  332. emqx_channel,
  333. handle_in,
  334. fun(_, _) ->
  335. Channel = channel(),
  336. {shutdown, [for_testing], ?DISCONNECT_PACKET(), Channel}
  337. end
  338. ),
  339. ?assertMatch(
  340. {stop, {shutdown, [for_testing]}, _NState},
  341. emqx_connection:with_channel(handle_in, [for_testing], State)
  342. ),
  343. meck:unload(emqx_channel).
  344. t_handle_outgoing(_) ->
  345. ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
  346. ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
  347. t_handle_info(_) ->
  348. ?assertMatch(
  349. {ok, {event, running}, _NState},
  350. emqx_connection:handle_info(activate_socket, st())
  351. ),
  352. ?assertMatch(
  353. {stop, {shutdown, for_testing}, _NStats},
  354. emqx_connection:handle_info({sock_error, for_testing}, st())
  355. ),
  356. ?assertMatch({ok, _NState}, emqx_connection:handle_info(for_testing, st())).
  357. t_ensure_rate_limit(_) ->
  358. WhenOk = fun emqx_connection:next_incoming_msgs/3,
  359. {ok, [], State} = emqx_connection:check_limiter(
  360. [],
  361. [],
  362. WhenOk,
  363. [],
  364. st(#{limiter => undefined})
  365. ),
  366. ?assertEqual(undefined, emqx_connection:info(limiter, State)),
  367. Limiter = init_limiter(),
  368. {ok, [], State1} = emqx_connection:check_limiter([], [], WhenOk, [], st(#{limiter => Limiter})),
  369. ?assertEqual(Limiter, emqx_connection:info(limiter, State1)),
  370. ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]),
  371. ok = meck:expect(
  372. emqx_htb_limiter,
  373. make_infinity_limiter,
  374. fun() -> non_infinity end
  375. ),
  376. ok = meck:expect(
  377. emqx_htb_limiter,
  378. check,
  379. fun(_, Client) -> {pause, 3000, undefined, Client} end
  380. ),
  381. {ok, State2} = emqx_connection:check_limiter(
  382. [{1000, bytes}],
  383. [],
  384. WhenOk,
  385. [],
  386. st(#{limiter => init_limiter()})
  387. ),
  388. meck:unload(emqx_htb_limiter),
  389. ?assertNotEqual(undefined, emqx_connection:info(limiter_timer, State2)).
  390. t_activate_socket(_) ->
  391. Limiter = init_limiter(),
  392. State = st(#{limiter => Limiter}),
  393. {ok, NStats} = emqx_connection:activate_socket(State),
  394. ?assertEqual(running, emqx_connection:info(sockstate, NStats)),
  395. State1 = st(#{sockstate => blocked, limiter_timer => any_timer}),
  396. ?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)),
  397. State2 = st(#{sockstate => closed}),
  398. ?assertEqual({ok, State2}, emqx_connection:activate_socket(State2)).
  399. t_close_socket(_) ->
  400. State = emqx_connection:close_socket(st(#{sockstate => closed})),
  401. ?assertEqual(closed, emqx_connection:info(sockstate, State)),
  402. State1 = emqx_connection:close_socket(st()),
  403. ?assertEqual(closed, emqx_connection:info(sockstate, State1)).
  404. t_system_code_change(_) ->
  405. State = st(),
  406. ?assertEqual({ok, State}, emqx_connection:system_code_change(State, [], [], [])).
  407. t_next_msgs(_) ->
  408. ?assertEqual({outgoing, ?CONNECT_PACKET()}, emqx_connection:next_msgs(?CONNECT_PACKET())),
  409. ?assertEqual({}, emqx_connection:next_msgs({})),
  410. ?assertEqual([], emqx_connection:next_msgs([])).
  411. t_start_link_ok(_) ->
  412. with_conn(fun(CPid) -> state = element(1, sys:get_state(CPid)) end).
  413. t_start_link_exit_on_wait(_) ->
  414. ok = exit_on_wait_error(enotconn, normal),
  415. ok = exit_on_wait_error(einval, normal),
  416. ok = exit_on_wait_error(closed, normal),
  417. ok = exit_on_wait_error(timeout, {shutdown, ssl_upgrade_timeout}),
  418. ok = exit_on_wait_error(enetdown, {shutdown, enetdown}).
  419. t_start_link_exit_on_activate(_) ->
  420. ok = exit_on_activate_error(enotconn, normal),
  421. ok = exit_on_activate_error(einval, normal),
  422. ok = exit_on_activate_error(closed, normal),
  423. ok = exit_on_activate_error(econnreset, {shutdown, econnreset}).
  424. t_get_conn_info(_) ->
  425. with_conn(fun(CPid) ->
  426. #{sockinfo := SockInfo} = emqx_connection:info(CPid),
  427. ?assertEqual(
  428. #{
  429. peername => {{127, 0, 0, 1}, 3456},
  430. sockname => {{127, 0, 0, 1}, 1883},
  431. sockstate => running,
  432. socktype => tcp
  433. },
  434. SockInfo
  435. )
  436. end).
  437. t_oom_shutdown(init, Config) ->
  438. ok = snabbkaffe:stop(),
  439. ok = snabbkaffe:start_trace(),
  440. ok = meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]),
  441. meck:expect(
  442. emqx_utils,
  443. check_oom,
  444. fun(_) -> {shutdown, "fake_oom"} end
  445. ),
  446. Config;
  447. t_oom_shutdown('end', _Config) ->
  448. snabbkaffe:stop(),
  449. meck:unload(emqx_utils),
  450. ok.
  451. t_oom_shutdown(_) ->
  452. Opts = #{trap_exit => true},
  453. with_conn(
  454. fun(Pid) ->
  455. Pid ! {tcp_passive, foo},
  456. {ok, _} = ?block_until(#{?snk_kind := check_oom}, 1000),
  457. {ok, _} = ?block_until(#{?snk_kind := terminate}, 100),
  458. Trace = snabbkaffe:collect_trace(),
  459. ?assertEqual(1, length(?of_kind(terminate, Trace))),
  460. receive
  461. {'EXIT', Pid, Reason} ->
  462. ?assertEqual({shutdown, "fake_oom"}, Reason)
  463. after 1000 ->
  464. error(timeout)
  465. end,
  466. ?assertNot(erlang:is_process_alive(Pid))
  467. end,
  468. Opts
  469. ),
  470. ok.
  471. t_cancel_congestion_alarm(_) ->
  472. Opts = #{trap_exit => false},
  473. ok = meck:expect(
  474. emqx_transport,
  475. getstat,
  476. fun
  477. (_Sock, [send_pend]) ->
  478. %% simulate congestion
  479. {ok, [{send_pend, 999}]};
  480. (_Sock, Options) ->
  481. {ok, [{K, 0} || K <- Options]}
  482. end
  483. ),
  484. with_conn(
  485. fun(Pid) ->
  486. #{
  487. channel := Channel,
  488. transport := Transport,
  489. socket := Socket
  490. } = emqx_connection:get_state(Pid),
  491. %% precondition
  492. Zone = emqx_channel:info(zone, Channel),
  493. true = emqx_config:get_zone_conf(Zone, [conn_congestion, enable_alarm]),
  494. %% should not raise errors
  495. ok = emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
  496. %% should not raise errors either
  497. ok = emqx_congestion:cancel_alarms(Socket, Transport, Channel),
  498. ok
  499. end,
  500. Opts
  501. ),
  502. ok.
  503. %%--------------------------------------------------------------------
  504. %% Helper functions
  505. %%--------------------------------------------------------------------
  506. exit_on_wait_error(SockErr, Reason) ->
  507. ok = meck:expect(
  508. emqx_transport,
  509. wait,
  510. fun(_Sock) ->
  511. {error, SockErr}
  512. end
  513. ),
  514. with_conn(
  515. fun(CPid) ->
  516. timer:sleep(100),
  517. trap_exit(CPid, Reason)
  518. end,
  519. #{trap_exit => true}
  520. ).
  521. exit_on_activate_error(SockErr, Reason) ->
  522. ok = meck:expect(
  523. emqx_transport,
  524. setopts,
  525. fun(_Sock, _Opts) ->
  526. {error, SockErr}
  527. end
  528. ),
  529. with_conn(
  530. fun(CPid) ->
  531. timer:sleep(100),
  532. trap_exit(CPid, Reason)
  533. end,
  534. #{trap_exit => true}
  535. ).
  536. with_conn(TestFun) ->
  537. with_conn(TestFun, #{trap_exit => false}).
  538. with_conn(TestFun, Opts) when is_map(Opts) ->
  539. TrapExit = maps:get(trap_exit, Opts, false),
  540. process_flag(trap_exit, TrapExit),
  541. {ok, CPid} = emqx_connection:start_link(
  542. emqx_transport,
  543. sock,
  544. maps:merge(
  545. Opts,
  546. #{
  547. zone => default,
  548. limiter => limiter_cfg(),
  549. listener => {tcp, default}
  550. }
  551. )
  552. ),
  553. TestFun(CPid),
  554. TrapExit orelse emqx_connection:stop(CPid),
  555. ok.
  556. trap_exit(Pid, Reason) ->
  557. receive
  558. {'EXIT', Pid, Reason} -> ok;
  559. {'EXIT', Pid, Other} -> error({unexpect_exit, Other})
  560. after 100 -> error({expect_exit, Reason})
  561. end.
  562. make_frame(Packet) ->
  563. iolist_to_binary(emqx_frame:serialize(Packet)).
  564. payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
  565. st() -> st(#{}, #{}).
  566. st(InitFields) when is_map(InitFields) ->
  567. st(InitFields, #{}).
  568. st(InitFields, ChannelFields) when is_map(InitFields) ->
  569. St = emqx_connection:init_state(emqx_transport, sock, #{
  570. zone => default,
  571. limiter => limiter_cfg(),
  572. listener => {tcp, default}
  573. }),
  574. maps:fold(
  575. fun(N, V, S) -> emqx_connection:set_field(N, V, S) end,
  576. emqx_connection:set_field(channel, channel(ChannelFields), St),
  577. InitFields
  578. ).
  579. channel() -> channel(#{}).
  580. channel(InitFields) ->
  581. ConnInfo = #{
  582. peername => {{127, 0, 0, 1}, 3456},
  583. sockname => {{127, 0, 0, 1}, 18083},
  584. conn_mod => emqx_connection,
  585. proto_name => <<"MQTT">>,
  586. proto_ver => ?MQTT_PROTO_V5,
  587. clean_start => true,
  588. keepalive => 30,
  589. clientid => <<"clientid">>,
  590. username => <<"username">>,
  591. receive_maximum => 100,
  592. expiry_interval => 0
  593. },
  594. ClientInfo = #{
  595. zone => default,
  596. listener => {tcp, default},
  597. protocol => mqtt,
  598. peerhost => {127, 0, 0, 1},
  599. clientid => <<"clientid">>,
  600. username => <<"username">>,
  601. is_superuser => false,
  602. mountpoint => undefined
  603. },
  604. Session = emqx_session:create(
  605. ClientInfo,
  606. #{receive_maximum => 0, expiry_interval => 1000},
  607. _WillMsg = undefined
  608. ),
  609. maps:fold(
  610. fun(Field, Value, Channel) ->
  611. emqx_channel:set_field(Field, Value, Channel)
  612. end,
  613. emqx_channel:init(ConnInfo, #{
  614. zone => default,
  615. limiter => limiter_cfg(),
  616. listener => {tcp, default}
  617. }),
  618. maps:merge(
  619. #{
  620. clientinfo => ClientInfo,
  621. session => Session,
  622. conn_state => connected
  623. },
  624. InitFields
  625. )
  626. ).
  627. handle_msg(Msg, St) -> emqx_connection:handle_msg(Msg, St).
  628. handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St).
  629. -define(LIMITER_ID, 'tcp:default').
  630. init_limiter() ->
  631. emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes, messages], limiter_cfg()).
  632. limiter_cfg() ->
  633. Cfg = bucket_cfg(),
  634. Client = client_cfg(),
  635. #{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}.
  636. bucket_cfg() ->
  637. #{rate => infinity, initial => 0, burst => 0}.
  638. client_cfg() ->
  639. #{
  640. rate => infinity,
  641. initial => 0,
  642. burst => 0,
  643. low_watermark => 1,
  644. divisible => false,
  645. max_retry_time => timer:seconds(5),
  646. failure_strategy => force
  647. }.
  648. add_bucket() ->
  649. Cfg = bucket_cfg(),
  650. emqx_limiter_server:add_bucket(?LIMITER_ID, bytes, Cfg),
  651. emqx_limiter_server:add_bucket(?LIMITER_ID, messages, Cfg).
  652. del_bucket() ->
  653. emqx_limiter_server:del_bucket(?LIMITER_ID, bytes),
  654. emqx_limiter_server:del_bucket(?LIMITER_ID, messages).