Browse Source

add mqtt frame

erylee 13 years ago
parent
commit
fcb5ac6df3
4 changed files with 395 additions and 8 deletions
  1. 127 8
      src/emqtt_client.erl
  2. 4 0
      src/emqtt_networking.erl
  3. 228 0
      src/emqtt_processor.erl
  4. 36 0
      src/emqtt_util.erl

+ 127 - 8
src/emqtt_client.erl

@@ -13,27 +13,76 @@
 
 -include("emqtt.hrl").
 
-go(Pid, Sock) ->
-	gen_server2:call(Pid, {go, Sock}).
+-define(CLIENT_ID_MAXLEN, 23).
+
+-record(state, 	{ socket,
+				  conn_name,
+				  await_recv,
+				  connection_state,
+				  conserve,
+				  parse_state,
+				  proc_state }).
+
+-record(proc_state, { socket,
+                      subscriptions,
+                      consumer_tags,
+                      unacked_pubs,
+                      awaiting_ack,
+                      awaiting_seqno,
+                      message_id,
+                      client_id,
+                      clean_sess,
+                      will_msg,
+                      channels,
+                      connection,
+                      exchange }).
+
 
 start_link() ->
     gen_server2:start_link(?MODULE, [], []).
 
+go(Pid, Sock) ->
+	gen_server2:call(Pid, {go, Sock}).
+
 init([]) ->
     {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
 
-handle_call({go, Sock}, _From, State) ->
+handle_call({go, Sock}, _From, _State) ->
     process_flag(trap_exit, true),
     ok = throw_on_error(
            inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end),
     {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
     error_logger:info_msg("accepting MQTT connection (~s)~n", [ConnStr]),
-	%inet:setopts(Sock, [{active, once}]),
-	{reply, ok, State}.
+    control_throttle(
+       #state{ socket           = Sock,
+               conn_name        = ConnStr,
+               await_recv       = false,
+               connection_state = running,
+               conserve         = false,
+               parse_state      = emqtt_frame:initial_state(),
+               proc_state       = emqtt_processor:initial_state(Sock) }).
 
 handle_cast(Msg, State) ->
 	{stop, {badmsg, Msg}, State}.
 
+handle_info({route, Msg}, State) ->
+	emqtt_processor:send_client(Msg),
+    {noreply, State};
+
+handle_info({inet_reply, _Ref, ok}, State) ->
+    {noreply, State, hibernate};
+
+handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock }=State) ->
+    process_received_bytes(
+      Data, control_throttle(State #state{ await_recv = false }));
+
+handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
+    network_error(Reason, State);
+
+handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
+	error_logger:info_msg("sock error: ~p~n", [Reason]), 
+	{noreply, State};
+
 handle_info(Info, State) ->
 	{stop, {badinfo, Info}, State}.
 
@@ -45,9 +94,79 @@ code_change(_OldVsn, State, _Extra) ->
 	
 throw_on_error(E, Thunk) ->
     case Thunk() of
-        {error, Reason} -> throw({E, Reason});
-        {ok, Res}       -> Res;
-        Res             -> Res
+	{error, Reason} -> throw({E, Reason});
+	{ok, Res}       -> Res;
+	Res             -> Res
+    end.
+
+async_recv(Sock, Length, infinity) when is_port(Sock) ->
+    prim_inet:async_recv(Sock, Length, -1);
+
+async_recv(Sock, Length, Timeout) when is_port(Sock) ->
+    prim_inet:async_recv(Sock, Length, Timeout).
+
+process_received_bytes(<<>>, State) ->
+    {noreply, State};
+process_received_bytes(Bytes,
+                       State = #state{ parse_state = ParseState,
+                                       proc_state  = ProcState,
+                                       conn_name   = ConnStr }) ->
+    case
+        emqtt_frame:parse(Bytes, ParseState) of
+            {more, ParseState1} ->
+                {noreply,
+                 control_throttle( State #state{ parse_state = ParseState1 }),
+                 hibernate};
+            {ok, Frame, Rest} ->
+                case emqtt_processor:process_frame(Frame, ProcState) of
+                    {ok, ProcState1} ->
+                        PS = emqtt_frame:initial_state(),
+                        process_received_bytes(
+                          Rest,
+                          State #state{ parse_state = PS,
+                                        proc_state = ProcState1 });
+                    {err, Reason, ProcState1} ->
+                        error_logger:info_msg("MQTT protocol error ~p for connection ~p~n",
+                                  [Reason, ConnStr]),
+                        stop({shutdown, Reason}, pstate(State, ProcState1));
+                    {stop, ProcState1} ->
+                        stop(normal, pstate(State, ProcState1))
+                end;
+            {error, Error} ->
+                error_logger:erro_msg("MQTT detected framing error ~p for connection ~p~n",
+                           [ConnStr, Error]),
+                stop({shutdown, Error}, State)
+    end.
+
+pstate(State = #state {}, PState = #proc_state{}) ->
+    State #state{ proc_state = PState }.
+
+%%----------------------------------------------------------------------------
+network_error(_Reason,
+              State = #state{ conn_name  = ConnStr,
+                              proc_state = PState }) ->
+    error_logger:info_msg("MQTT detected network error for ~p~n", [ConnStr]),
+    emqtt_processor:send_will(PState),
+    % todo: flush channel after publish
+    stop({shutdown, conn_closed}, State).
+
+run_socket(State = #state{ connection_state = blocked }) ->
+    State;
+run_socket(State = #state{ await_recv = true }) ->
+    State;
+run_socket(State = #state{ socket = Sock }) ->
+    async_recv(Sock, 0, infinity),
+    State#state{ await_recv = true }.
+
+control_throttle(State = #state{ connection_state = Flow,
+                                 conserve         = Conserve }) ->
+    case {Flow, Conserve orelse credit_flow:blocked()} of
+        {running,   true} -> State #state{ connection_state = blocked };
+        {blocked,  false} -> run_socket(State #state{
+                                                connection_state = running });
+        {_,            _} -> run_socket(State)
     end.
 
+stop(Reason, State ) ->
+    {stop, Reason, State}.
 

+ 4 - 0
src/emqtt_networking.erl

@@ -89,6 +89,10 @@ start_client(Sock) ->
     {ok, Client} = supervisor:start_child(emqtt_client_sup, []),
 	ok = gen_tcp:controlling_process(Sock, Client),
 	emqtt_client:go(Client, Sock),
+
+    %% see comment in rabbit_networking:start_client/2
+    gen_event:which_handlers(error_logger),
+
 	Client.
 
 %%--------------------------------------------------------------------

+ 228 - 0
src/emqtt_processor.erl

@@ -0,0 +1,228 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
+%%
+
+%%Don't send amqp message
+
+-module(emqtt_processor).
+
+-export([info/2, initial_state/1,
+         process_frame/2, send_will/1]).
+
+-include("emqtt.hrl").
+-include("emqtt_frame.hrl").
+
+-record(proc_state, { socket,
+                      subscriptions,
+                      consumer_tags,
+                      unacked_pubs,
+                      awaiting_ack,
+                      awaiting_seqno,
+                      message_id,
+                      client_id,
+                      clean_sess,
+                      will_msg,
+                      channels,
+                      connection,
+                      exchange }).
+
+-define(FRAME_TYPE(Frame, Type),
+        Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
+
+initial_state(Socket) ->
+    #proc_state{ unacked_pubs  = gb_trees:empty(),
+                 awaiting_ack  = gb_trees:empty(),
+                 message_id    = 1,
+                 subscriptions = dict:new(),
+                 consumer_tags = {undefined, undefined},
+                 channels      = {undefined, undefined},
+                 exchange      = emqtt_util:env(exchange),
+                 socket        = Socket }.
+
+info(client_id, #proc_state{ client_id = ClientId }) -> ClientId.
+
+process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
+              PState = #proc_state{ connection = undefined } )
+  when Type =/= ?CONNECT ->
+    {err, connect_expected, PState};
+
+process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
+              PState ) ->
+    process_request(Type, Frame, PState).
+
+process_request(?CONNECT,
+                #mqtt_frame{ variable = #mqtt_frame_connect{
+                                          username   = Username,
+                                          password   = Password,
+                                          proto_ver  = ProtoVersion,
+                                          clean_sess = CleanSess,
+                                          client_id  = ClientId } = Var}, PState) ->
+    {ReturnCode, PState1} =
+        case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
+              emqtt_util:valid_client_id(ClientId)} of
+            {false, _} ->
+                {?CONNACK_PROTO_VER, PState};
+            {_, false} ->
+                {?CONNACK_INVALID_ID, PState};
+            _ ->
+                case creds(Username, Password) of
+                    nocreds ->
+                        error_logger:error_msg("MQTT login failed - no credentials~n"),
+                        {?CONNACK_CREDENTIALS, PState};
+                    {UserBin, PassBin} ->
+						{?CONNACK_ACCEPT,
+						 PState #proc_state{ will_msg   = make_will_msg(Var),
+											 client_id  = ClientId }}
+                end
+        end,
+		send_client(#mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?CONNACK},
+								 variable = #mqtt_frame_connack{
+                                         return_code = ReturnCode }}, PState1),
+    {ok, PState1};
+
+process_request(?PUBACK,
+                #mqtt_frame{
+                  variable = #mqtt_frame_publish{ message_id = MessageId }},
+                #proc_state{awaiting_ack = Awaiting } = PState) ->
+    {ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}};
+
+process_request(?PUBLISH,
+                #mqtt_frame{
+                  fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) ->
+    {err, qos2_not_supported, PState};
+
+process_request(?PUBLISH,
+                #mqtt_frame{
+                  fixed = #mqtt_frame_fixed{ qos    = Qos,
+                                             retain = Retain,
+                                             dup    = Dup },
+                  variable = #mqtt_frame_publish{ topic_name = Topic,
+                                                  message_id = MessageId },
+                  payload = Payload }, PState) ->
+	Msg =  #mqtt_msg{ retain     = Retain,
+					 qos        = Qos,
+					 topic      = Topic,
+					 dup        = Dup,
+					 message_id = MessageId,
+					 payload    = Payload },
+	emqtt_router:route(Msg),
+
+	send_client(
+	  #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?PUBACK },
+				   variable = #mqtt_frame_publish{ message_id = MessageId }},
+              PState),
+    {ok, PState};
+
+
+process_request(?SUBSCRIBE,
+                #mqtt_frame{
+                  variable = #mqtt_frame_subscribe{ message_id  = MessageId,
+                                                    topic_table = Topics },
+                  payload = undefined },
+                #proc_state{} = PState0) ->
+    QosResponse =
+	lists:foldl(fun (#mqtt_topic{ name = TopicName,
+								   qos  = Qos }, QosList) ->
+				   SupportedQos = supported_subs_qos(Qos),
+				   [SupportedQos | QosList]
+			   end, [], Topics),
+
+	[emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics],
+
+	[emqtt_router:insert(#subscriber{topic=Name, pid=self()}) 
+				|| #mqtt_topic{name=Name} <- Topics],
+	
+    send_client(#mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?SUBACK },
+                             variable = #mqtt_frame_suback{
+                                         message_id = MessageId,
+                                         qos_table  = QosResponse }}, PState0),
+
+    {ok, PState0};
+
+process_request(?UNSUBSCRIBE,
+                #mqtt_frame{
+                  variable = #mqtt_frame_subscribe{ message_id  = MessageId,
+                                                    topic_table = Topics },
+                  payload = undefined }, #proc_state{ client_id     = ClientId,
+                                                      subscriptions = Subs0} = PState) ->
+
+	
+	[emqtt_router:delete(#subscriber{topic=Name, pid=self()}) 
+		|| #mqtt_topic{name=Name} <- Topics],
+	
+    send_client(#mqtt_frame{ fixed    = #mqtt_frame_fixed { type       = ?UNSUBACK },
+                             variable = #mqtt_frame_suback{ message_id = MessageId }},
+                PState),
+
+    {ok, PState #proc_state{ subscriptions = Subs0 }};
+
+process_request(?PINGREQ, #mqtt_frame{}, PState) ->
+    send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
+                PState),
+    {ok, PState};
+
+process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
+    {stop, PState}.
+
+
+
+next_msg_id(PState = #proc_state{ message_id = 16#ffff }) ->
+    PState #proc_state{ message_id = 1 };
+next_msg_id(PState = #proc_state{ message_id = MsgId }) ->
+    PState #proc_state{ message_id = MsgId + 1 }.
+
+%% decide at which qos level to deliver based on subscription
+%% and the message publish qos level. non-MQTT publishes are
+%% assumed to be qos 1, regardless of delivery_mode.
+delivery_qos(Tag, _Headers,  #proc_state{ consumer_tags = {Tag, _} }) ->
+    {?QOS_0, ?QOS_0};
+delivery_qos(Tag, Headers,   #proc_state{ consumer_tags = {_, Tag} }) ->
+    case emqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
+        {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1};
+        undefined   -> {?QOS_1, ?QOS_1}
+    end.
+
+maybe_clean_sess(false, _Conn, _ClientId) ->
+    % todo: establish subscription to deliver old unacknowledged messages
+    ok.
+
+%%----------------------------------------------------------------------------
+
+make_will_msg(#mqtt_frame_connect{ will_flag   = false }) ->
+    undefined;
+make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
+                                   will_qos    = Qos,
+                                   will_topic  = Topic,
+                                   will_msg    = Msg }) ->
+    #mqtt_msg{ retain  = Retain,
+               qos     = Qos,
+               topic   = Topic,
+               dup     = false,
+               payload = Msg }.
+
+creds(User, Pass) ->
+	{User, Pass}.
+
+supported_subs_qos(?QOS_0) -> ?QOS_0;
+supported_subs_qos(?QOS_1) -> ?QOS_1;
+supported_subs_qos(?QOS_2) -> ?QOS_1.
+
+send_will(PState = #proc_state{ will_msg = WillMsg }) ->
+	error_logger:info_msg("willmsg: ~p~n", [WillMsg]).
+
+
+send_client(Frame, #proc_state{ socket = Sock }) ->
+    erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
+

+ 36 - 0
src/emqtt_util.erl

@@ -0,0 +1,36 @@
+-module(emqtt_util).
+
+-include("emqtt.hrl").
+
+-define(CLIENT_ID_MAXLEN, 23).
+
+-compile(export_all).
+
+subcription_queue_name(ClientId) ->
+    Base = "mqtt-subscription-" ++ ClientId ++ "qos",
+    {list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}.
+
+%% amqp mqtt descr
+%% *    +    match one topic level
+%% #    #    match multiple topic levels
+%% .    /    topic level separator
+mqtt2amqp(Topic) ->
+    erlang:iolist_to_binary(
+      re:replace(re:replace(Topic, "/", ".", [global]),
+                 "[\+]", "*", [global])).
+
+amqp2mqtt(Topic) ->
+    erlang:iolist_to_binary(
+      re:replace(re:replace(Topic, "[\*]", "+", [global]),
+                 "[\.]", "/", [global])).
+
+valid_client_id(ClientId) ->
+    ClientIdLen = length(ClientId),
+    1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
+
+env(Key) ->
+    case application:get_env(emqtt, Key) of
+        {ok, Val} -> Val;
+        undefined -> undefined
+    end.
+