emqttd_ws_client.erl 7.6 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_ws_client).
  17. -behaviour(gen_server).
  18. -include("emqttd.hrl").
  19. -include("emqttd_protocol.hrl").
  20. %% API Exports
  21. -export([start_link/4, session/1, info/1, kick/1]).
  22. %% SUB/UNSUB Asynchronously
  23. -export([subscribe/2, unsubscribe/2]).
  24. %% gen_server Function Exports
  25. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  26. terminate/2, code_change/3]).
  27. %% WebSocket Client State
  28. -record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}).
  29. -define(WSLOG(Level, Peer, Format, Args),
  30. lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
  31. %% @doc Start WebSocket Client.
  32. start_link(MqttEnv, WsPid, Req, ReplyChannel) ->
  33. gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []).
  34. session(CPid) ->
  35. gen_server:call(CPid, session, infinity).
  36. info(CPid) ->
  37. gen_server:call(CPid, info, infinity).
  38. kick(CPid) ->
  39. gen_server:call(CPid, kick).
  40. subscribe(CPid, TopicTable) ->
  41. gen_server:cast(CPid, {subscribe, TopicTable}).
  42. unsubscribe(CPid, Topics) ->
  43. gen_server:cast(CPid, {unsubscribe, Topics}).
  44. %%--------------------------------------------------------------------
  45. %% gen_server Callbacks
  46. %%--------------------------------------------------------------------
  47. init([MqttEnv, WsPid, Req, ReplyChannel]) ->
  48. true = link(WsPid),
  49. {ok, Peername} = Req:get(peername),
  50. Headers = mochiweb_headers:to_list(
  51. mochiweb_request:get(headers, Req)),
  52. PktOpts = proplists:get_value(packet, MqttEnv),
  53. SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
  54. ProtoState = emqttd_protocol:init(Peername, SendFun,
  55. [{ws_initial_headers, Headers} | PktOpts]),
  56. {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),
  57. connection = Req:get(connection),
  58. proto_state = ProtoState}, idle_timeout(MqttEnv)}.
  59. idle_timeout(MqttEnv) ->
  60. ClientOpts = proplists:get_value(client, MqttEnv),
  61. timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)).
  62. handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
  63. {reply, emqttd_protocol:session(ProtoState), State};
  64. handle_call(info, _From, State = #wsclient_state{peer = Peer,
  65. proto_state = ProtoState}) ->
  66. ProtoInfo = emqttd_protocol:info(ProtoState),
  67. {reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State};
  68. handle_call(kick, _From, State) ->
  69. {stop, {shutdown, kick}, ok, State};
  70. handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
  71. ?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]),
  72. {reply, {error, unsupported_request}, State}.
  73. handle_cast({subscribe, TopicTable}, State) ->
  74. with_session(fun(SessPid) ->
  75. emqttd_session:subscribe(SessPid, TopicTable)
  76. end, State);
  77. handle_cast({unsubscribe, Topics}, State) ->
  78. with_session(fun(SessPid) ->
  79. emqttd_session:unsubscribe(SessPid, Topics)
  80. end, State);
  81. handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
  82. case emqttd_protocol:received(Packet, ProtoState) of
  83. {ok, ProtoState1} ->
  84. noreply(State#wsclient_state{proto_state = ProtoState1});
  85. {error, Error} ->
  86. ?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
  87. shutdown(Error, State);
  88. {error, Error, ProtoState1} ->
  89. shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
  90. {stop, Reason, ProtoState1} ->
  91. stop(Reason, State#wsclient_state{proto_state = ProtoState1})
  92. end;
  93. handle_cast(Msg, State = #wsclient_state{peer = Peer}) ->
  94. ?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]),
  95. noreply(State).
  96. handle_info(timeout, State) ->
  97. shutdown(idle_timeout, State);
  98. handle_info({suback, PacketId, GrantedQos}, State) ->
  99. with_proto_state(fun(ProtoState) ->
  100. Packet = ?SUBACK_PACKET(PacketId, GrantedQos),
  101. emqttd_protocol:send(Packet, ProtoState)
  102. end, State);
  103. handle_info({deliver, Message}, State) ->
  104. with_proto_state(fun(ProtoState) ->
  105. emqttd_protocol:send(Message, ProtoState)
  106. end, State);
  107. handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
  108. with_proto_state(fun(ProtoState) ->
  109. emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
  110. end, State);
  111. handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) ->
  112. ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
  113. shutdown(conflict, State);
  114. handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) ->
  115. ?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]),
  116. StatFun = fun() ->
  117. case Conn:getstat([recv_oct]) of
  118. {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
  119. {error, Error} -> {error, Error}
  120. end
  121. end,
  122. KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
  123. noreply(State#wsclient_state{keepalive = KeepAlive});
  124. handle_info({keepalive, check}, State = #wsclient_state{peer = Peer,
  125. keepalive = KeepAlive}) ->
  126. case emqttd_keepalive:check(KeepAlive) of
  127. {ok, KeepAlive1} ->
  128. noreply(State#wsclient_state{keepalive = KeepAlive1});
  129. {error, timeout} ->
  130. ?WSLOG(debug, Peer, "Keepalive Timeout!", []),
  131. shutdown(keepalive_timeout, State);
  132. {error, Error} ->
  133. ?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]),
  134. shutdown(keepalive_error, State)
  135. end;
  136. handle_info(Info, State = #wsclient_state{peer = Peer}) ->
  137. ?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]),
  138. noreply(State).
  139. terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
  140. emqttd_keepalive:cancel(KeepAlive),
  141. case Reason of
  142. {shutdown, Error} ->
  143. emqttd_protocol:shutdown(Error, ProtoState);
  144. _ ->
  145. emqttd_protocol:shutdown(Reason, ProtoState)
  146. end.
  147. code_change(_OldVsn, State, _Extra) ->
  148. {ok, State}.
  149. %%--------------------------------------------------------------------
  150. %% Internal functions
  151. %%--------------------------------------------------------------------
  152. with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
  153. {ok, ProtoState1} = Fun(ProtoState),
  154. noreply(State#wsclient_state{proto_state = ProtoState1}).
  155. with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
  156. Fun(emqttd_protocol:session(ProtoState)), noreply(State).
  157. noreply(State) ->
  158. {noreply, State, hibernate}.
  159. shutdown(Reason, State) ->
  160. stop({shutdown, Reason}, State).
  161. stop(Reason, State ) ->
  162. {stop, Reason, State}.