emqx_authn_mqtt_test_client.erl 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_authn_mqtt_test_client).
  17. -behaviour(gen_server).
  18. -include_lib("emqx/include/emqx_mqtt.hrl").
  19. %% API
  20. -export([
  21. start_link/2,
  22. stop/1
  23. ]).
  24. -export([send/2]).
  25. %% gen_server callbacks
  26. -export([
  27. init/1,
  28. handle_call/3,
  29. handle_cast/2,
  30. handle_info/2,
  31. terminate/2
  32. ]).
  33. -define(TIMEOUT, 1000).
  34. -define(TCP_OPTIONS, [
  35. binary,
  36. {packet, raw},
  37. {active, once},
  38. {nodelay, true}
  39. ]).
  40. -define(PARSE_OPTIONS, #{
  41. strict_mode => false,
  42. max_size => ?MAX_PACKET_SIZE,
  43. version => ?MQTT_PROTO_V5
  44. }).
  45. %%--------------------------------------------------------------------
  46. %% API
  47. %%--------------------------------------------------------------------
  48. start_link(Host, Port) ->
  49. gen_server:start_link(?MODULE, [Host, Port, self()], []).
  50. stop(Pid) ->
  51. gen_server:call(Pid, stop).
  52. send(Pid, Packet) ->
  53. gen_server:call(Pid, {send, Packet}).
  54. %%--------------------------------------------------------------------
  55. %% gen_server callbacks
  56. %%--------------------------------------------------------------------
  57. init([Host, Port, Owner]) ->
  58. {ok, Socket} = gen_tcp:connect(Host, Port, ?TCP_OPTIONS, ?TIMEOUT),
  59. {ok, #{
  60. owner => Owner,
  61. socket => Socket,
  62. parse_state => emqx_frame:initial_parse_state(?PARSE_OPTIONS)
  63. }}.
  64. handle_info(
  65. {tcp, _Sock, Data},
  66. #{
  67. parse_state := PSt,
  68. owner := Owner,
  69. socket := Socket
  70. } = St
  71. ) ->
  72. {NewPSt, Packets} = process_incoming(PSt, Data, []),
  73. ok = deliver(Owner, Packets),
  74. ok = run_sock(Socket),
  75. {noreply, St#{parse_state => NewPSt}};
  76. handle_info({tcp_closed, _Sock}, St) ->
  77. {stop, normal, St}.
  78. handle_call({send, Packet}, _From, #{socket := Socket} = St) ->
  79. ok = gen_tcp:send(Socket, emqx_frame:serialize(Packet, ?MQTT_PROTO_V5)),
  80. {reply, ok, St};
  81. handle_call(stop, _From, #{socket := Socket} = St) ->
  82. ok = gen_tcp:close(Socket),
  83. {stop, normal, ok, St}.
  84. handle_cast(_, St) ->
  85. {noreply, St}.
  86. terminate(_Reason, _St) ->
  87. ok.
  88. %%--------------------------------------------------------------------
  89. %% internal functions
  90. %%--------------------------------------------------------------------
  91. process_incoming(PSt, Data, Packets) ->
  92. case emqx_frame:parse(Data, PSt) of
  93. {more, NewPSt} ->
  94. {NewPSt, lists:reverse(Packets)};
  95. {ok, Packet, Rest, NewPSt} ->
  96. process_incoming(NewPSt, Rest, [Packet | Packets])
  97. end.
  98. deliver(_Owner, []) ->
  99. ok;
  100. deliver(Owner, [Packet | Packets]) ->
  101. Owner ! {packet, Packet},
  102. deliver(Owner, Packets).
  103. run_sock(Socket) ->
  104. inet:setopts(Socket, [{active, once}]).