rfc6455_client.erl 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. %% The contents of this file are subject to the Mozilla Public License
  2. %% Version 1.1 (the "License"); you may not use this file except in
  3. %% compliance with the License. You may obtain a copy of the License at
  4. %% http://www.mozilla.org/MPL/
  5. %%
  6. %% Software distributed under the License is distributed on an "AS IS"
  7. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
  8. %% License for the specific language governing rights and limitations
  9. %% under the License.
  10. %%
  11. %% The Original Code is RabbitMQ Management Console.
  12. %%
  13. %% The Initial Developer of the Original Code is GoPivotal, Inc.
  14. %% Copyright (c) 2012-2016 Pivotal Software, Inc. All rights reserved.
  15. %%
  16. -module(rfc6455_client).
  17. -export([new/2, open/1, recv/1, send/2, send_binary/2, close/1, close/2]).
  %% Connection state threaded through the client process.
  -record(state, {
      host,    %% host part of the ws:// URL, handed to gen_tcp:connect/3
      port,    %% TCP port (80 when the URL does not name one)
      addr,    %% "host[:port]" as written in the URL; sent as the Host header
      path,    %% request path, always starting with "/"
      ppid,    %% pid that receives the {rfc6455, ...} notifications
      socket,  %% gen_tcp socket, set once the connection is established
      data,    %% bytes received but not yet consumed
      phase    %% opening | open | closing
  }).
  19. %% --------------------------------------------------------------------------
  %% Parse a "ws://host[:port][/path]" URL and spawn a client process for it.
  %%
  %% WsUrl - URL string; must start with "ws://" (badmatch otherwise).
  %% PPid  - pid that will receive the {rfc6455, ...} notifications.
  %%
  %% Returns the pid of the spawned (unlinked) client process. The port
  %% defaults to 80; a port that is not a plain integer fails with badmatch.
  new(WsUrl, PPid) ->
      crypto:start(),
      "ws://" ++ HostAndPath = WsUrl,
      [Addr, Path] = split("/", HostAndPath, 1),
      [Host, PortStr] = split(":", Addr, 1, empty),
      Port =
          if
              PortStr =:= empty ->
                  80;
              true ->
                  {Parsed, ""} = string:to_integer(PortStr),
                  Parsed
          end,
      Conn = #state{ppid = PPid,
                    host = Host,
                    port = Port,
                    addr = Addr,
                    path = [$/ | Path]},
      spawn(fun() -> start_conn(Conn) end).
  %% Block until the client process WS reports the outcome of the handshake.
  %%
  %% Returns {ok, Opts} when an open notification arrives, or {close, Reason}
  %% when a close notification arrives first. There is no timeout.
  open(WS) ->
      receive
          {rfc6455, Event, WS, Info} when Event =:= open; Event =:= close ->
              case Event of
                  open  -> {ok, Info};
                  close -> {close, Info}
              end
      end.
  %% Block until the client process WS delivers the next notification.
  %%
  %% Returns {ok, Payload} for a text frame, {binary, Payload} for a binary
  %% frame, or {close, Reason} once the connection is closed. There is no
  %% timeout.
  recv(WS) ->
      receive
          {rfc6455, Kind, WS, Body}
            when Kind =:= recv; Kind =:= recv_binary; Kind =:= close ->
              Tag = case Kind of
                        recv        -> ok;
                        recv_binary -> binary;
                        close       -> close
                    end,
              {Tag, Body}
      end.
  %% Ask the client process WS to send IoData as a text frame.
  %% Asynchronous: always returns ok without waiting for the write.
  send(WS, IoData) ->
      Request = {send, IoData},
      WS ! Request,
      ok.
  %% Ask the client process WS to send IoData as a binary frame.
  %% Asynchronous: always returns ok without waiting for the write.
  send_binary(WS, IoData) ->
      Request = {send_binary, IoData},
      WS ! Request,
      ok.
  %% Close the connection with status 1000 and an empty reason text.
  close(WS) ->
      close(WS, {1000, ""}).

  %% Ask the client process WS to start the closing handshake with
  %% WsReason ({Code, ReasonText}) and block until WS reports the close.
  %%
  %% Returns {close, Reason} as reported by WS. There is no timeout.
  close(WS, WsReason) ->
      WS ! {close, WsReason},
      receive
          {rfc6455, close, WS, Reported} ->
              {close, Reported}
      end.
  67. %% --------------------------------------------------------------------------
  %% Connect to the server, send the HTTP upgrade request and enter the
  %% receive loop in the `opening' phase.
  %%
  %% If the TCP connection cannot be established, the owner is sent a
  %% {rfc6455, close, self(), {1006, _}} notification before this process
  %% exits. Without it the owner would wait forever in open/1, because this
  %% process is not linked to it.
  start_conn(State = #state{host = Host, port = Port, addr = Addr,
                            path = Path, ppid = PPid}) ->
      case gen_tcp:connect(Host, Port, [binary, {packet, 0}]) of
          {ok, Socket} ->
              Key = base64:encode_to_string(crypto:strong_rand_bytes(16)),
              Handshake =
                  ["GET ", Path, " HTTP/1.1\r\n",
                   "Host: ", Addr, "\r\n",
                   "Upgrade: websocket\r\n",
                   "Connection: Upgrade\r\n",
                   "Sec-WebSocket-Key: ", Key, "\r\n",
                   "Origin: null\r\n",
                   "Sec-WebSocket-Protocol: mqtt\r\n",
                   "Sec-WebSocket-Version: 13\r\n\r\n"],
              %% The result is not checked here: a failed write is followed
              %% by {tcp_closed, Socket}, which loop/1 handles.
              gen_tcp:send(Socket, Handshake),
              loop(State#state{socket = Socket,
                               data = <<>>,
                               phase = opening});
          {error, Reason} ->
              PPid ! {rfc6455, close, self(), {1006, "Connection failed"}},
              exit({shutdown, {connect_failed, Reason}})
      end.
  %% Consume as much of the buffered input (State#state.data) as possible.
  %%
  %% opening phase: waits for the blank line ending the HTTP response,
  %% reports {rfc6455, open, self(), [{http_response, Http}]} to the owner,
  %% then goes on to decode any frame bytes that arrived behind the response.
  %%
  %% open/closing phase: decodes every complete frame in the buffer, not just
  %% the first one. Frames that arrive together in one TCP segment would
  %% otherwise stay buffered until unrelated data showed up.
  %%
  %% Header layout matched below: FIN:1, RSV:3, opcode:4, MASK:1, length:7,
  %% followed by a 16-bit (length = 126) or 64-bit (length = 127) extended
  %% length.
  %%
  %% Returns the state with `data' holding only the unconsumed bytes. It does
  %% not return when a frame causes die/4 to be called.
  do_recv(State = #state{phase = opening, ppid = PPid, data = Data}) ->
      case split("\r\n\r\n", binary_to_list(Data), 1, empty) of
          [_Http, empty] ->
              %% Response headers are not complete yet.
              State;
          [Http, Leftover] ->
              %% TODO: don't ignore http response data, verify key
              PPid ! {rfc6455, open, self(), [{http_response, Http}]},
              do_recv(State#state{phase = open,
                                  data = list_to_binary(Leftover)})
      end;
  do_recv(State = #state{phase = Phase, data = Data, socket = Socket, ppid = PPid})
    when Phase =:= open orelse Phase =:= closing ->
      case Data of
          <<F:1, _:3, O:4, 0:1, L:7, Payload:L/binary, Rest/binary>>
            when L < 126 ->
              do_recv(do_recv2(State, {F, O, Payload, Rest}));
          <<F:1, _:3, O:4, 0:1, 126:7, L2:16, Payload:L2/binary, Rest/binary>> ->
              do_recv(do_recv2(State, {F, O, Payload, Rest}));
          <<F:1, _:3, O:4, 0:1, 127:7, L2:64, Payload:L2/binary, Rest/binary>> ->
              do_recv(do_recv2(State, {F, O, Payload, Rest}));
          <<_:1, _:3, _:4, 1:1, _/binary>> ->
              %% According to rfc6455 5.1 the server must not mask any frames.
              die(Socket, PPid, {1006, "Protocol error"}, normal);
          _ ->
              %% No complete frame buffered; wait for more data.
              State
      end.
  %% Act on one decoded frame {Fin, Opcode, Payload, Rest}.
  %%
  %% Final text (opcode 1) and binary (opcode 2) frames are forwarded to the
  %% owner and the state is returned with `data' set to Rest. A final close
  %% frame (opcode 8) is echoed when the phase is still `open', and the
  %% process then terminates through die/4. Any other frame terminates the
  %% process with status 1006.
  do_recv2(State = #state{ppid = PPid}, {1, 1, Payload, Rest}) ->
      PPid ! {rfc6455, recv, self(), Payload},
      State#state{data = Rest};
  do_recv2(State = #state{ppid = PPid}, {1, 2, Payload, Rest}) ->
      PPid ! {rfc6455, recv_binary, self(), Payload},
      State#state{data = Rest};
  do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid},
           {1, 8, Payload, _Rest}) ->
      WsReason = case Payload of
                     <<Code:16, Text/binary>> -> {Code, Text};
                     <<>>                     -> {1005, "No status received"}
                 end,
      case Phase of
          open ->
              %% The peer started the close: echo it back, then drop the socket.
              do_close(State, WsReason),
              gen_tcp:close(Socket);
          closing ->
              ok
      end,
      die(Socket, PPid, WsReason, normal);
  do_recv2(#state{socket = Socket, ppid = PPid}, {_, _, _, _Rest}) ->
      io:format("Unknown frame type~n"),
      die(Socket, PPid, {1006, "Unknown frame type"}, normal).
  %% Build one masked frame as a binary.
  %%
  %% F       - FIN bit (0 or 1).
  %% O       - 4-bit opcode.
  %% Payload - iodata; masked with a fresh random 4-byte key.
  %%
  %% The length is written in 7 bits, or as 126 plus 16 bits, or as 127 plus
  %% 64 bits, depending on the payload size.
  encode_frame(F, O, Payload) ->
      Mask = crypto:strong_rand_bytes(4),
      Masked = apply_mask(Mask, iolist_to_binary(Payload)),
      Header =
          case byte_size(Masked) of
              Len when Len < 126 ->
                  <<F:1, 0:3, O:4, 1:1, Len:7>>;
              Len when Len < 65536 ->
                  <<F:1, 0:3, O:4, 1:1, 126:7, Len:16>>;
              Len ->
                  <<F:1, 0:3, O:4, 1:1, 127:7, Len:64>>
          end,
      <<Header/binary, Mask/binary, Masked/binary>>.
  %% Write Payload to the socket as a final text frame (opcode 1).
  %% The result of the write is ignored; the state is returned unchanged.
  do_send(#state{socket = Sock} = State, Payload) ->
      Frame = encode_frame(1, 1, Payload),
      gen_tcp:send(Sock, Frame),
      State.
  %% Write Payload to the socket as a final binary frame (opcode 2).
  %% The result of the write is ignored; the state is returned unchanged.
  do_send_binary(#state{socket = Sock} = State, Payload) ->
      Frame = encode_frame(1, 2, Payload),
      gen_tcp:send(Sock, Frame),
      State.
  %% Send a close frame (opcode 8) whose body is the 16-bit status Code
  %% followed by the Reason text, and move the state to the `closing' phase.
  do_close(#state{socket = Sock} = State, {Code, Reason}) ->
      Body = <<Code:16, (iolist_to_binary(Reason))/binary>>,
      gen_tcp:send(Sock, encode_frame(1, 8, Body)),
      State#state{phase = closing}.
  %% Main loop of the client process.
  %%
  %% Socket data is appended to the buffer and handed to do_recv/1. Requests
  %% from the owner (send, send_binary, close) are only picked up while the
  %% phase is `open'; in any other phase they stay in the mailbox. A closed
  %% socket ends the process through die/4 with status 1006.
  loop(State = #state{socket = Sock, ppid = Owner, data = Buffered,
                      phase = Phase}) ->
      receive
          {tcp, Sock, Chunk} ->
              Appended = iolist_to_binary([Buffered, Chunk]),
              loop(do_recv(State#state{data = Appended}));
          {tcp_closed, Sock} ->
              die(Sock, Owner, {1006, "Connection closed abnormally"}, normal);
          {send, Payload} when Phase =:= open ->
              loop(do_send(State, Payload));
          {send_binary, Payload} when Phase =:= open ->
              loop(do_send_binary(State, Payload));
          {close, WsReason} when Phase =:= open ->
              loop(do_close(State, WsReason))
      end.
  %% Shut the socket down in both directions, tell the owner the connection
  %% closed with WsReason, and terminate this process with exit reason Reason.
  %% Never returns.
  die(Socket, PPid, WsReason, Reason) ->
      gen_tcp:shutdown(Socket, read_write),
      Notice = {rfc6455, close, self(), WsReason},
      PPid ! Notice,
      exit(Reason).
  182. %% --------------------------------------------------------------------------
  %% Split Str on the first Limit occurrences of SubStr; a missing piece is "".
  split(SubStr, Str, Limit) ->
      split(SubStr, Str, Limit, "").

  %% Split Str on the first Limit occurrences of SubStr.
  %%
  %% Returns Limit + 1 pieces in order. When SubStr is not found, the piece
  %% that would have followed it is Default.
  split(SubStr, Str, Limit, Default) ->
      lists:reverse(split(SubStr, Str, Limit, [], Default)).

  %% Worker: Pieces collects the parts in reverse order.
  split(_SubStr, Remaining, 0, Pieces, _Default) ->
      [Remaining | Pieces];
  split(SubStr, Remaining, Limit, Pieces, Default) ->
      case string:str(Remaining, SubStr) of
          0 ->
              %% Separator absent: everything left is one piece and Default
              %% stands in for what would have followed.
              split(SubStr, Default, Limit - 1, [Remaining | Pieces], Default);
          Pos ->
              {Head, FromSep} = lists:split(Pos - 1, Remaining),
              Tail = lists:nthtail(length(SubStr), FromSep),
              split(SubStr, Tail, Limit - 1, [Head | Pieces], Default)
      end.
  %% XOR Data with the 4-byte masking key Mask, repeating the key as needed.
  %%
  %% Mask may be a 4-byte binary or a number, which is encoded as a 32-bit
  %% big-endian key. An all-zero key returns Data untouched.
  apply_mask(Mask, Data) when is_number(Mask) ->
      apply_mask(<<Mask:32>>, Data);
  apply_mask(<<0:32>>, Data) ->
      Data;
  apply_mask(Mask, Data) ->
      apply_mask2(Mask, Data, <<>>).

  %% Worker: XORs 32 bits at a time, appending to Done. The last 1-3 bytes
  %% are XORed with the leading bytes of the key.
  apply_mask2(<<Key:32>> = Mask, <<Word:32, Tail/binary>>, Done) ->
      apply_mask2(Mask, Tail, <<Done/binary, (Word bxor Key):32>>);
  apply_mask2(<<Key:24, _:8>>, <<Last:24>>, Done) ->
      <<Done/binary, (Last bxor Key):24>>;
  apply_mask2(<<Key:16, _:16>>, <<Last:16>>, Done) ->
      <<Done/binary, (Last bxor Key):16>>;
  apply_mask2(<<Key:8, _:24>>, <<Last:8>>, Done) ->
      <<Done/binary, (Last bxor Key):8>>;
  apply_mask2(_Mask, <<>>, Done) ->
      Done.