ws_client.erl 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. -module(ws_client).
  2. -export([
  3. start_link/0,
  4. start_link/1,
  5. send_binary/2,
  6. send_ping/2,
  7. recv/2,
  8. recv/1,
  9. stop/1
  10. ]).
  11. -export([
  12. init/2,
  13. websocket_handle/3,
  14. websocket_info/3,
  15. websocket_terminate/3
  16. ]).
  17. -record(state, {
  18. buffer = [] :: list(),
  19. waiting = undefined :: undefined | pid()
  20. }).
  21. start_link() ->
  22. start_link("ws://localhost:8083/mqtt").
  23. start_link(Url) ->
  24. websocket_client:start_link(Url, ?MODULE, [], [{extra_headers, [{"Sec-Websocket-Protocol", "mqtt"}]}]).
  25. stop(Pid) ->
  26. Pid ! stop.
  27. send_binary(Pid, Msg) ->
  28. websocket_client:cast(Pid, {binary, Msg}).
  29. send_ping(Pid, Msg) ->
  30. websocket_client:cast(Pid, {ping, Msg}).
  31. recv(Pid) ->
  32. recv(Pid, 5000).
  33. recv(Pid, Timeout) ->
  34. Pid ! {recv, self()},
  35. receive
  36. M -> M
  37. after
  38. Timeout -> error
  39. end.
  40. init(_, _WSReq) ->
  41. {ok, #state{}}.
  42. websocket_handle(Frame, _, State = #state{waiting = undefined, buffer = Buffer}) ->
  43. lager:info("Client received frame~p", [Frame]),
  44. {ok, State#state{buffer = [Frame|Buffer]}};
  45. websocket_handle(Frame, _, State = #state{waiting = From}) ->
  46. lager:info("Client received frame~p", [Frame]),
  47. From ! Frame,
  48. {ok, State#state{waiting = undefined}}.
  49. websocket_info({send_text, Text}, WSReq, State) ->
  50. websocket_client:send({text, Text}, WSReq),
  51. {ok, State};
  52. websocket_info({recv, From}, _, State = #state{buffer = []}) ->
  53. {ok, State#state{waiting = From}};
  54. websocket_info({recv, From}, _, State = #state{buffer = [Top|Rest]}) ->
  55. From ! Top,
  56. {ok, State#state{buffer = Rest}};
  57. websocket_info(stop, _, State) ->
  58. {close, <<>>, State}.
  59. websocket_terminate(Close, _, State) ->
  60. io:format("Websocket closed with frame ~p and state ~p", [Close, State]),
  61. ok.