|
|
@@ -31,20 +31,53 @@
|
|
|
lager:Level("WsClient(~s): " ++ Format,
|
|
|
[esockd_net:format(State#wsocket_state.peername) | Args])).
|
|
|
|
|
|
+
|
|
|
+handle_request(Req) ->
|
|
|
+ handle_request(Req:get(method), Req:get(path), Req).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% Handle WebSocket Request
|
|
|
+%% MQTT Over WebSocket
|
|
|
%%--------------------------------------------------------------------
|
|
|
+handle_request('GET', "/mqtt", Req) ->
|
|
|
+ lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
|
|
|
+ Upgrade = Req:get_header_value("Upgrade"),
|
|
|
+ Proto = check_protocol_header(Req),
|
|
|
+ case {is_websocket(Upgrade), Proto} of
|
|
|
+ {true, "mqtt" ++ _Vsn} ->
|
|
|
+ {ok, ProtoEnv} = emqttd:env(protocol),
|
|
|
+ PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
|
|
|
+ Parser = emqttd_parser:initial_state(PacketSize),
|
|
|
+ %% Upgrade WebSocket.
|
|
|
+ {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
|
|
+ {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
|
|
|
+ ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
|
|
|
+ max_packet_size = PacketSize, client_pid = ClientPid});
|
|
|
+ {false, _} ->
|
|
|
+ lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
|
|
|
+ Req:respond({400, [], <<"Bad Request">>});
|
|
|
+ {_, Proto} ->
|
|
|
+ lager:error("WebSocket with error Protocol: ~s", [Proto]),
|
|
|
+ Req:respond({400, [], <<"Bad WebSocket Protocol">>})
|
|
|
+ end;
|
|
|
|
|
|
-%% @doc Handle WebSocket Request.
|
|
|
-handle_request(Req) ->
|
|
|
- {ok, ProtoEnv} = emqttd:env(protocol),
|
|
|
- PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
|
|
|
- Parser = emqttd_parser:initial_state(PacketSize),
|
|
|
- %% Upgrade WebSocket.
|
|
|
- {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
|
|
- {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
|
|
|
- ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
|
|
|
- max_packet_size = PacketSize, client_pid = ClientPid}).
|
|
|
+handle_request(Method, Path, Req) ->
|
|
|
+ lager:error("Unexpected WS Request: ~s ~s", [Method, Path]),
|
|
|
+ Req:not_found().
|
|
|
+
|
|
|
+is_websocket(Upgrade) ->
|
|
|
+ Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
|
|
|
+
|
|
|
+check_protocol_header(Req) ->
|
|
|
+ case emqttd:env(websocket_protocol_header, false) of
|
|
|
+ true -> get_protocol_header(Req);
|
|
|
+ false -> "mqtt-v3.1.1"
|
|
|
+ end.
|
|
|
+
|
|
|
+get_protocol_header(Req) ->
|
|
|
+ case Req:get_header_value("EMQ-WebSocket-Protocol") of
|
|
|
+ undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
|
|
|
+ Proto -> Proto
|
|
|
+ end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Receive Loop
|