emqx_request_response_SUITE.erl 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. %% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  2. %%
  3. %% Licensed under the Apache License, Version 2.0 (the "License");
  4. %% you may not use this file except in compliance with the License.
  5. %% You may obtain a copy of the License at
  6. %%
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. -module(emqx_request_response_SUITE).
  15. -compile(export_all).
  16. -compile(nowarn_export_all).
  17. -include("emqx_mqtt.hrl").
  18. -include_lib("eunit/include/eunit.hrl").
  19. -include_lib("common_test/include/ct.hrl").
  %% Common Test suite callback: prepends `{log_level, error}' to the incoming
  %% Config and hands the result to emqx_ct_broker_helpers:run_setup_steps/1,
  %% whose return value becomes the return value of this callback.
  init_per_suite(Config) ->
      SetupConfig = [{log_level, error} | Config],
      emqx_ct_broker_helpers:run_setup_steps(SetupConfig).
  %% Common Test suite callback: ignores its argument and delegates to
  %% emqx_ct_broker_helpers:run_teardown_steps/0.
  end_per_suite(_) ->
      emqx_ct_broker_helpers:run_teardown_steps().
  %% Lists the test cases of this suite; only `request_response' is declared.
  all() -> [request_response].
  %% Test case: runs the request/response round trip once per QoS level,
  %% in the order QoS 0, QoS 1, QoS 2. Any failure in a round trip crashes
  %% the test case; the Config argument is not used.
  request_response(_Config) ->
      QoSLevels = [?QOS_0, ?QOS_1, ?QOS_2],
      lists:foreach(fun request_response_per_qos/1, QoSLevels).
  %% Performs a single request/response round trip at the given QoS.
  %%
  %% Starts a requester (emqx_request_sender) bound to the response topic and
  %% a responder (emqx_request_handler) bound to the request topic, sends the
  %% payload <<"2">> with correlation data <<"corr-1">>, and expects the
  %% message {response, <<"corr-1">>, <<"4">>} to arrive in this process.
  %%
  %% Raises error({unexpected, Msg}) if any other message is received first,
  %% and error(timeout) if nothing arrives within 100 ms. On success both
  %% helper processes are stopped and `ok' is returned.
  request_response_per_qos(QoS) ->
      RequestTopic = <<"request_topic">>,
      ResponseTopic = <<"response_topic">>,
      Correlation = <<"corr-1">>,
      RequesterOpts = [{proto_ver, v5},
                       {client_id, <<"requester">>},
                       {properties, #{'Request-Response-Information' => 1}}],
      {ok, Requester} =
          emqx_request_sender:start_link(ResponseTopic, QoS, RequesterOpts),
      %% The service replies with the square of the integer carried in the
      %% request payload; the correlation data is not needed to compute it.
      SquareService = fun(_Correlation, Payload) ->
                              N = b2i(Payload),
                              i2b(N * N)
                      end,
      ResponderOpts = [{proto_ver, v5},
                       {client_id, <<"responser">>}],
      {ok, Responder} =
          emqx_request_handler:start_link(RequestTopic, QoS, SquareService,
                                          ResponderOpts),
      ok = emqx_request_sender:send(Requester, RequestTopic, ResponseTopic,
                                    Correlation, <<"2">>, QoS),
      receive
          {response, Correlation, <<"4">>} ->
              ok;
          Unexpected ->
              %% Any other message at the head of the mailbox fails the test.
              erlang:error({unexpected, Unexpected})
      after 100 ->
              erlang:error(timeout)
      end,
      ok = emqx_request_sender:stop(Requester),
      ok = emqx_request_handler:stop(Responder).
  %% Shorthand for binary_to_integer/1: parses a binary holding the text
  %% representation of an integer, e.g. <<"2">> -> 2.
  b2i(Bin) ->
      erlang:binary_to_integer(Bin).
  %% Shorthand for integer_to_binary/1: renders an integer as a binary holding
  %% its text representation, e.g. 4 -> <<"4">>.
  i2b(Int) ->
      erlang:integer_to_binary(Int).