| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
%% WebSocket test client built on top of the websocket_client library.
-module(ws_client).

%% Public API.
-export([start_link/0, start_link/1]).
-export([send_binary/2, send_ping/2]).
-export([recv/2, recv/1]).
-export([stop/1]).

%% Handler callbacks invoked by websocket_client.
-export([init/2]).
-export([websocket_handle/3, websocket_info/3, websocket_terminate/3]).
%% Handler state threaded through the websocket_* callbacks.
-record(state, {
    %% Frames received while no process was waiting for one.
    buffer = [] :: list(),
    %% Pid that asked for the next frame via {recv, Pid}, or undefined
    %% when nobody is currently waiting.
    waiting = undefined :: undefined | pid()
}).
%% @doc Start a client connected to the default local endpoint.
%% Delegates to start_link/1 and returns whatever it returns.
start_link() ->
    DefaultUrl = "ws://localhost:8083/mqtt",
    start_link(DefaultUrl).
%% @doc Start a client connected to `Url', using this module as the
%% websocket_client handler and requesting the "mqtt" sub-protocol.
%% Returns the result of websocket_client:start_link/4 unchanged.
start_link(Url) ->
    SubProtocol = {"Sec-Websocket-Protocol", "mqtt"},
    Options = [{extra_headers, [SubProtocol]}],
    websocket_client:start_link(Url, ?MODULE, [], Options).
%% @doc Ask the client process `Pid' to close by sending it the `stop'
%% message (handled in websocket_info/3). Returns the atom `stop',
%% i.e. the message that was sent.
stop(Pid) ->
    erlang:send(Pid, stop).
%% @doc Asynchronously send `Msg' as a binary frame through the client `Pid'.
%% Returns the result of websocket_client:cast/2.
send_binary(Pid, Msg) ->
    Frame = {binary, Msg},
    websocket_client:cast(Pid, Frame).
%% @doc Asynchronously send a ping frame carrying `Msg' through the client `Pid'.
%% Returns the result of websocket_client:cast/2.
send_ping(Pid, Msg) ->
    Frame = {ping, Msg},
    websocket_client:cast(Pid, Frame).
%% @doc Fetch the next frame from the client `Pid', waiting at most
%% 5000 ms. See recv/2 for the return values.
recv(Pid) ->
    DefaultTimeout = 5000,
    recv(Pid, DefaultTimeout).
%% @doc Request the next frame from the client `Pid' and wait up to
%% `Timeout' milliseconds for an answer.
%%
%% Sends `{recv, self()}' to `Pid' and returns the first message that
%% arrives in the caller's mailbox, or the atom `error' on timeout.
%%
%% NOTE(review): the receive matches ANY message, so an unrelated message
%% already queued in the caller's mailbox is returned as if it were a frame.
recv(Pid, Timeout) ->
    erlang:send(Pid, {recv, self()}),
    receive
        Reply ->
            Reply
    after Timeout ->
        error
    end.
%% @doc Handler initialisation callback. Both arguments are ignored;
%% the handler starts with an empty buffer and no waiting receiver.
init(_Args, _WSReq) ->
    InitialState = #state{},
    {ok, InitialState}.
%% @doc Callback invoked for every frame received from the server.
%%
%% If no process is waiting (`waiting = undefined') the frame is queued in
%% the buffer. Otherwise it is sent straight to the waiting process and the
%% waiting slot is cleared. Always returns `{ok, NewState}'.
websocket_handle(Frame, _ConnState, #state{waiting = undefined, buffer = Buffer} = State) ->
    lager:info("Client received frame~p", [Frame]),
    %% Append rather than prepend: websocket_info/3 serves {recv, _} requests
    %% from the head of the buffer, so prepending handed frames back
    %% newest-first. Appending keeps delivery in arrival order.
    {ok, State#state{buffer = Buffer ++ [Frame]}};
websocket_handle(Frame, _ConnState, #state{waiting = From} = State) ->
    lager:info("Client received frame~p", [Frame]),
    From ! Frame,
    {ok, State#state{waiting = undefined}}.
%% @doc Callback invoked for plain Erlang messages sent to the client process.
%%
%% Handled messages:
%%   {send_text, Text} - send `Text' to the server as a text frame.
%%   {recv, From}      - deliver the head of the buffer to `From', or, when
%%                       the buffer is empty, remember `From' as the waiting
%%                       receiver.
%%   stop              - close the connection with an empty payload.
websocket_info({send_text, Text}, WSReq, State) ->
    TextFrame = {text, Text},
    websocket_client:send(TextFrame, WSReq),
    {ok, State};
websocket_info({recv, Receiver}, _ConnState, #state{buffer = []} = State) ->
    %% Nothing buffered yet: park the requester until a frame arrives.
    {ok, State#state{waiting = Receiver}};
websocket_info({recv, Receiver}, _ConnState, #state{buffer = [Next | Remaining]} = State) ->
    Receiver ! Next,
    {ok, State#state{buffer = Remaining}};
websocket_info(stop, _ConnState, State) ->
    {close, <<>>, State}.
%% @doc Callback invoked when the connection terminates. Prints the close
%% frame and the final handler state to standard output and returns `ok'.
websocket_terminate(CloseFrame, _ConnState, FinalState) ->
    Details = [CloseFrame, FinalState],
    io:format("Websocket closed with frame ~p and state ~p", Details),
    ok.
|