emqx_ft_request_SUITE.erl 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_request_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("common_test/include/ct.hrl").
  20. -include_lib("stdlib/include/assert.hrl").
  %% Common Test callback listing the test cases of this suite.
  %% Discovery is delegated to `emqx_common_test_helpers:all/1`
  %% (presumably it collects the exported `t_*` functions -- confirm in the helper).
  all() ->
      emqx_common_test_helpers:all(?MODULE).
  %% Suite setup: starts `emqx_ft` through `emqx_cth_suite:start/2` with file
  %% transfer enabled and a 1s assemble timeout, using the suite's `priv_dir`
  %% as the work directory. The value returned by the start call is stored in
  %% Config under `suite_apps` so that `end_per_suite/1` can hand it back to
  %% `emqx_cth_suite:stop/1`.
  init_per_suite(Config) ->
      WorkDir = ?config(priv_dir, Config),
      AppSpecs = [
          {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s}"}
      ],
      StartedApps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
      [{suite_apps, StartedApps} | Config].
  %% Suite teardown: stops whatever `init_per_suite/1` stored under
  %% `suite_apps`. Crashes with `badmatch` if the stop call does not return
  %% `ok`; otherwise returns `ok`.
  end_per_suite(Config) ->
      StartedApps = ?config(suite_apps, Config),
      ok = emqx_cth_suite:stop(StartedApps).
  %% Per-testcase setup: nothing to prepare, Config is passed through unchanged.
  init_per_testcase(_TestCase, Config) ->
      Config.
  %% Per-testcase teardown: nothing to clean up, always returns `ok`.
  end_per_testcase(_TestCase, _TestConfig) ->
      ok.
  37. %%-------------------------------------------------------------------
  38. %% Tests
  39. %%-------------------------------------------------------------------
  %% Uploads a small file through the `$file/...` topics using the
  %% request/response helper `request/3`, and checks that every step is
  %% answered with `reason_code` 0 and echoes the topic that was requested:
  %%   1. `$file/<FileId>/init`       -- JSON-encoded file metadata
  %%   2. `$file/<FileId>/0`          -- the single data segment at offset 0
  %%   3. `$file/<FileId>/fin/<Size>` -- finalization with the total size
  t_upload_via_requests(_Config) ->
      C = emqx_ft_test_helpers:start_client(<<"client">>),
      try
          FileId = <<"f1">>,
          Data = <<"hello world">>,
          Size = byte_size(Data),
          Meta = #{
              name => "test.txt",
              %% Expire one hour from now so the transfer cannot time out mid-test.
              expire_at => erlang:system_time(second) + 3600,
              size => Size
          },
          MetaPayload = emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)),
          MetaTopic = <<"$file/", FileId/binary, "/init">>,
          ?assertMatch(
              {ok, #{<<"reason_code">> := 0, <<"topic">> := MetaTopic}},
              request(C, MetaTopic, MetaPayload)
          ),
          SegmentTopic = <<"$file/", FileId/binary, "/0">>,
          ?assertMatch(
              {ok, #{<<"reason_code">> := 0, <<"topic">> := SegmentTopic}},
              request(C, SegmentTopic, Data)
          ),
          FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
          ?assertMatch(
              {ok, #{<<"reason_code">> := 0, <<"topic">> := FinTopic}},
              request(C, FinTopic, <<>>)
          )
      after
          %% Always stop the client so it does not outlive this test case,
          %% whether the assertions above pass or fail.
          ok = emqtt:stop(C)
      end.
  66. %%--------------------------------------------------------------------
  67. %% Helper functions
  68. %%--------------------------------------------------------------------
  %% Performs one MQTT request/response round trip on client `C`.
  %%
  %% Subscribes (QoS 1) to a freshly generated response topic, publishes
  %% `Request` to `Topic` (QoS 1) with that topic as `Response-Topic` and a
  %% freshly generated `Correlation-Data`, then waits up to 1000 ms for a
  %% `{publish, Msg}` message in the calling process whose topic and
  %% correlation data both match.
  %%
  %% Returns `{ok, Decoded}`, where `Decoded` is the response payload passed
  %% through `emqx_utils_json:decode/1`, or `{error, timeout}` when no matching
  %% response arrives in time. Crashes with `badmatch` if subscribing or
  %% publishing fails.
  request(C, Topic, Request) ->
      CorrelationData = emqx_ft_test_helpers:unique_binary_string(),
      ResponseTopic = emqx_ft_test_helpers:unique_binary_string(),
      Properties = #{
          'Correlation-Data' => CorrelationData,
          'Response-Topic' => ResponseTopic
      },
      Opts = [{qos, 1}],
      {ok, _, _} = emqtt:subscribe(C, ResponseTopic, 1),
      try
          %% Publish inside the `try` so that the subscription made above is
          %% removed even when publishing fails.
          {ok, _} = emqtt:publish(C, Topic, Properties, Request, Opts),
          receive
              %% ResponseTopic and CorrelationData are already bound, so only
              %% the response to this particular request is accepted.
              {publish, #{
                  topic := ResponseTopic,
                  payload := Payload,
                  properties := #{'Correlation-Data' := CorrelationData}
              }} ->
                  {ok, emqx_utils_json:decode(Payload)}
          after 1000 ->
              {error, timeout}
          end
      after
          emqtt:unsubscribe(C, ResponseTopic)
      end.