Ery Lee 11 лет назад
Родитель
Сommit
dccbee2905

+ 16 - 1
apps/emqtt/src/emqtt_packet.erl

@@ -31,7 +31,7 @@
 
 -export([parse/2, serialise/1]).
 
--export([dump/1]).
+-export([validate/2, dump/1]).
 
 -define(MAX_LEN, 16#fffffff).
 -define(HIGHBIT, 2#10000000).
@@ -259,6 +259,21 @@ opt(X) when is_integer(X) -> X.
 protocol_name_approved(Ver, Name) ->
     lists:member({Ver, Name}, ?PROTOCOL_NAMES).
 
+validate(protocol, {Ver, Name}) ->
+    protocol_name_approved(Ver, Name);
+
+validate(clientid, {_, ClientId}) when ( size(ClientId) >= 1 ) 
+    andalso ( size(ClientId) >= ?MAX_CLIENTID_LEN ) ->
+    true;
+
+%% MQTT3.1.1 allow null clientId.
+validate(clientid, {?MQTT_PROTO_V311, ClientId}) 
+    when size(ClientId) =:= 0 ->
+    true;
+
+validate(clientid, {_, _}) -> 
+    false.
+
 dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) when
      Payload =:= undefined orelse Payload =:= <<>>  ->
     dump_header(Header, dump_variable(Variable));

+ 42 - 28
apps/emqtt/src/emqtt_protocol.erl

@@ -109,35 +109,31 @@ handle_packet(?CONNECT, Packet = #mqtt_packet {
                                     variable = #mqtt_packet_connect { 
                                          username   = Username, 
                                          password   = Password, 
-                                         proto_ver  = ProtoVersion, 
                                          clean_sess = CleanSess, 
                                          keep_alive = KeepAlive, 
                                          client_id  = ClientId } = Var }, 
-              State = #proto_state{ peer_name = PeerName} ) ->
+              State = #proto_state{ peer_name = PeerName } ) ->
     lager:info("RECV from ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
-    {ReturnCode, State1} =
-        case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
-              valid_client_id(ClientId)} of
-            {false, _} ->
-                {?CONNACK_PROTO_VER, State#proto_state{client_id = ClientId}};
-            {_, false} ->
-                {?CONNACK_INVALID_ID, State#proto_state{client_id = ClientId}};
-            _ ->
-                case emqtt_auth:check(Username, Password) of
-                    false ->
-                        lager:error("MQTT login failed - no credentials"),
-                        {?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}};
-                    true ->
-                        start_keepalive(KeepAlive),
-						emqtt_cm:register(ClientId, self()),
-						{?CONNACK_ACCEPT,
-						 State #proto_state{ will_msg   = make_will_msg(Var),
-											 client_id  = ClientId }}
-                end
-        end,
-		send_packet( #mqtt_packet { 
-                        header = #mqtt_packet_header { type = ?CONNACK }, 
-                        variable = #mqtt_packet_connack{ return_code = ReturnCode }}, State1 ),
+    {ReturnCode1, State1} =
+    case validate_connect(Var) of
+        ?CONNACK_ACCEPT ->
+            case emqtt_auth:check(Username, Password) of
+                true ->
+                    ClientId1 = clientid(ClientId, State), 
+                    start_keepalive(KeepAlive),
+                    emqtt_cm:register(ClientId1, self()),
+                    {?CONNACK_ACCEPT,
+                        State#proto_state{ will_msg = make_will_msg(Var), client_id  = ClientId1 }};
+                false ->
+                    lager:error("~s@~s: username '~s' login failed - no credentials", [ClientId, PeerName, Username]),
+                    {?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}}
+            end;
+        ReturnCode ->
+            {ReturnCode, State#proto_state{client_id = ClientId}}
+    end,
+    send_packet( #mqtt_packet {
+                    header = #mqtt_packet_header { type = ?CONNACK }, 
+                    variable = #mqtt_packet_connack{ return_code = ReturnCode1 }}, State1 ),
     {ok, State1};
 
 handle_packet(?PUBLISH, Packet = #mqtt_packet {
@@ -320,9 +316,22 @@ next_packet_id(State = #proto_state{ packet_id = 16#ffff }) ->
 next_packet_id(State = #proto_state{ packet_id = PacketId }) ->
     State #proto_state{ packet_id = PacketId + 1 }.
 
-valid_client_id(ClientId) ->
-    ClientIdLen = size(ClientId),
-    1 =< ClientIdLen andalso ClientIdLen =< ?MAX_CLIENTID_LEN.
+
+validate_connect( #mqtt_packet_connect { 
+                        proto_ver  = Ver, 
+                        proto_name = Name, 
+                        client_id = ClientId } ) -> 
+    case emqtt_packet:validate(protocol, {Ver, Name}) of
+        true -> 
+            case emqtt_packet:validate(clientid, {Ver, ClientId}) of
+                true -> 
+                    ?CONNACK_ACCEPT;
+                false -> 
+                    ?CONNACK_INVALID_ID
+            end;
+        false -> 
+            ?CONNACK_PROTO_VER
+    end.
 
 validate_packet(?PUBLISH, #mqtt_packet {
                             variable = #mqtt_packet_publish{
@@ -353,6 +362,11 @@ validate_packet(?SUBSCRIBE, #mqtt_packet{variable = #mqtt_packet_subscribe{topic
 validate_packet(_Type, _Frame) ->
 	ok.
 
+clientid(<<>>, #proto_state{peer_name = PeerName}) ->
+    <<"eMQTT/", (base64:encode(PeerName))/binary>>;
+
+clientid(ClientId, _State) -> ClientId.
+
 maybe_clean_sess(false, _Conn, _ClientId) ->
     % todo: establish subscription to deliver old unacknowledged messages
     ok.

+ 12 - 0
apps/emqtt/test/emqtt_packet_tests.erl

@@ -56,6 +56,18 @@ parse_connect_test() ->
                                                            clean_sess = true, 
                                                            keep_alive = 60 } }, <<>>}, parse(V311ConnBin, State)),
 
+    %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
+    V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
+    ?assertMatch({ok, #mqtt_packet{ 
+                         header = #mqtt_packet_header { type = ?CONNECT, 
+                                                        dup = false, 
+                                                        qos = 0, 
+                                                        retain = false}, 
+                         variable = #mqtt_packet_connect { proto_ver = 4, 
+                                                           proto_name = <<"MQTT">>, 
+                                                           client_id = <<>>,
+                                                           clean_sess = true, 
+                                                           keep_alive = 60 } }, <<>>}, parse(V311ConnWithoutClientId, State)),
     %%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
     ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
     ?assertMatch({ok, #mqtt_packet{