emqx_ft_async_reply_SUITE.erl 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_async_reply_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("stdlib/include/assert.hrl").
  21. -include_lib("emqx/include/asserts.hrl").
  22. all() -> emqx_common_test_helpers:all(?MODULE).
  23. init_per_suite(Config) ->
  24. Apps = emqx_cth_suite:start(
  25. [
  26. {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
  27. {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"}
  28. ],
  29. #{work_dir => ?config(priv_dir, Config)}
  30. ),
  31. [{suite_apps, Apps} | Config].
  32. end_per_suite(Config) ->
  33. ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
  34. ok.
  35. init_per_testcase(_Case, Config) ->
  36. ok = snabbkaffe:start_trace(),
  37. Config.
  38. end_per_testcase(_Case, _Config) ->
  39. ok = snabbkaffe:stop(),
  40. ok.
  41. %%--------------------------------------------------------------------
  42. %% Tests
  43. %%--------------------------------------------------------------------
  44. t_register(_Config) ->
  45. PacketId = 1,
  46. MRef = make_ref(),
  47. TRef = make_ref(),
  48. ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
  49. ?assertEqual(
  50. undefined,
  51. emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
  52. ),
  53. ?assertEqual(
  54. ok,
  55. emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
  56. ),
  57. ?assertEqual(
  58. {ok, PacketId, TRef, somedata},
  59. emqx_ft_async_reply:take_by_mref(MRef)
  60. ).
  61. t_process_independence(_Config) ->
  62. PacketId = 1,
  63. MRef = make_ref(),
  64. TRef = make_ref(),
  65. ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
  66. Self = self(),
  67. spawn_link(fun() ->
  68. Self ! emqx_ft_async_reply:take_by_mref(MRef)
  69. end),
  70. Res1 =
  71. receive
  72. Msg1 -> Msg1
  73. end,
  74. ?assertEqual(
  75. not_found,
  76. Res1
  77. ),
  78. spawn_link(fun() ->
  79. Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
  80. end),
  81. Res2 =
  82. receive
  83. Msg2 -> Msg2
  84. end,
  85. ?assertEqual(
  86. ok,
  87. Res2
  88. ).
  89. t_take(_Config) ->
  90. PacketId = 1,
  91. MRef = make_ref(),
  92. TRef = make_ref(),
  93. ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata),
  94. ?assertEqual(
  95. {ok, PacketId, TRef, somedata},
  96. emqx_ft_async_reply:take_by_mref(MRef)
  97. ),
  98. ?assertEqual(
  99. not_found,
  100. emqx_ft_async_reply:take_by_mref(MRef)
  101. ),
  102. ?assertEqual(
  103. ok,
  104. emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
  105. ).
  106. t_cleanup(_Config) ->
  107. PacketId = 1,
  108. MRef0 = make_ref(),
  109. TRef0 = make_ref(),
  110. MRef1 = make_ref(),
  111. TRef1 = make_ref(),
  112. ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0, somedata0),
  113. Self = self(),
  114. Pid = spawn_link(fun() ->
  115. ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1, somedata1),
  116. receive
  117. kickoff ->
  118. ?assertEqual(
  119. undefined,
  120. emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
  121. ),
  122. ?assertEqual(
  123. {ok, PacketId, TRef1, somedata1},
  124. emqx_ft_async_reply:take_by_mref(MRef1)
  125. ),
  126. Self ! done
  127. end
  128. end),
  129. ?assertEqual(
  130. undefined,
  131. emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
  132. ),
  133. ok = emqx_ft_async_reply:deregister_all(Self),
  134. ?assertEqual(
  135. ok,
  136. emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
  137. ),
  138. Pid ! kickoff,
  139. receive
  140. done -> ok
  141. end.
  142. t_reply_by_tiemout(_Config) ->
  143. process_flag(trap_exit, true),
  144. ClientId = atom_to_binary(?FUNCTION_NAME),
  145. C = emqx_ft_test_helpers:start_client(ClientId, node()),
  146. SleepForever = fun() ->
  147. Ref = make_ref(),
  148. receive
  149. Ref -> ok
  150. end
  151. end,
  152. ok = meck:new(emqx_ft_storage, [passthrough]),
  153. meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end),
  154. FinTopic = <<"$file/fakeid/fin/999999">>,
  155. ?assertMatch(
  156. {ok, #{reason_code_name := unspecified_error}},
  157. emqtt:publish(C, FinTopic, <<>>, 1)
  158. ),
  159. meck:unload(emqx_ft_storage),
  160. emqtt:stop(C).
  161. t_cleanup_by_cm(_Config) ->
  162. process_flag(trap_exit, true),
  163. ClientId = atom_to_binary(?FUNCTION_NAME),
  164. C = emqx_ft_test_helpers:start_client(ClientId, node()),
  165. ok = meck:new(emqx_ft_storage, [passthrough]),
  166. meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end),
  167. FinTopic = <<"$file/fakeid/fin/999999">>,
  168. [ClientPid] = emqx_cm:lookup_channels(ClientId),
  169. ?assertWaitEvent(
  170. begin
  171. emqtt:publish(C, FinTopic, <<>>, 1),
  172. exit(ClientPid, kill)
  173. end,
  174. #{?snk_kind := emqx_cm_clean_down, client_id := ClientId},
  175. 1000
  176. ),
  177. ?assertEqual(
  178. {0, 0},
  179. emqx_ft_async_reply:info()
  180. ),
  181. meck:unload(emqx_ft_storage).
  182. t_unrelated_events(_Config) ->
  183. process_flag(trap_exit, true),
  184. ClientId = atom_to_binary(?FUNCTION_NAME),
  185. C = emqx_ft_test_helpers:start_client(ClientId, node()),
  186. [ClientPid] = emqx_cm:lookup_channels(ClientId),
  187. erlang:monitor(process, ClientPid),
  188. ClientPid ! {'DOWN', make_ref(), process, self(), normal},
  189. ClientPid ! {timeout, make_ref(), unknown_timer_event},
  190. ?assertNotReceive(
  191. {'DOWN', _Ref, process, ClientPid, _Reason},
  192. 500
  193. ),
  194. emqtt:stop(C).