HuangDan пре 7 година
родитељ
комит
18eee0f1b0
5 измењених фајлова са 136 додато и 82 уклоњено
  1. 4 4
      etc/emqx.conf
  2. 1 2
      priv/emqx.schema
  3. 95 70
      test/emqx_SUITE.erl
  4. 5 2
      test/emqx_client_SUITE.erl
  5. 31 4
      test/emqx_ct_broker_helpers.erl

Разлика између датотеке није приказан због своје велике величине
+ 4 - 4
etc/emqx.conf


+ 1 - 2
priv/emqx.schema

@@ -860,8 +860,7 @@ end}.
 ]}.
 
 {mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [
-  {default, false},
-  {datatype, {enum, [true, false]}}
+  {datatype, {enum, [cn, dn]}}
 ]}.
 
 {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [

+ 95 - 70
test/emqx_SUITE.erl

@@ -25,21 +25,45 @@
 
 -include_lib("common_test/include/ct.hrl").
 
+-include("emqx_mqtt.hrl").
+
+-record(ssl_socket, {tcp, ssl}).
+
+-type(socket() :: inet:socket() | #ssl_socket{}).
+
 -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
                                 client_id = <<"mqtt_client">>,
                                 username  = <<"admin">>,
                                 password  = <<"public">>})).
+
+-define(CLIENT2, ?CONNECT_PACKET(#mqtt_packet_connect{
+                                username  = <<"admin">>,
+                                clean_start = false,
+                                password  = <<"public">>})).
+
+-define(SUBCODE, [0]).
+
+-define(PACKETID, 1).
+
+-define(PUBQOS, 1).
+
+-define(SUBPACKET, ?SUBSCRIBE_PACKET(?PACKETID, [{<<"sub/topic">>, ?DEFAULT_SUBOPTS}])).
+
+-define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)).
+
 all() ->
-    [{group, connect},
-     {group, cleanSession}].
+    [{group, connect}%,
+    % {group, cleanSession}
+    ].
 
 groups() ->
     [{connect, [non_parallel_tests],
-      [mqtt_connect,
-%       mqtt_connect_with_tcp,
-       mqtt_connect_with_ssl_oneway,
-       mqtt_connect_with_ssl_twoway%,
-     %  mqtt_connect_with_ws
+      [
+      mqtt_connect,
+      mqtt_connect_with_tcp,
+      mqtt_connect_with_ssl_oneway,
+      mqtt_connect_with_ssl_twoway,
+      mqtt_connect_with_ws
       ]},
      {cleanSession, [sequence],
       [cleanSession_validate]
@@ -48,7 +72,6 @@ groups() ->
 
 init_per_suite(Config) ->
     emqx_ct_broker_helpers:run_setup_steps(),
-   % ct:log("Apps:~p", [Apps]),
     Config.
 
 end_per_suite(_Config) ->
@@ -65,78 +88,79 @@ mqtt_connect(_) ->
     ?assertEqual(<<32,2,0,0>>, connect_broker_(<<16,12,0,4,77,81,84,84,4,2,0,90,0,0>>, 4)).
 
 connect_broker_(Packet, RecvSize) ->
-    {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]),
-    gen_tcp:send(Sock, Packet),
+    {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
+    emqx_client_sock:send(Sock, Packet),
     {ok, Data} = gen_tcp:recv(Sock, RecvSize, 3000),
-    gen_tcp:close(Sock),
+    emqx_client_sock:close(Sock),
     Data.
 
-
-%% mqtt_connect_with_tcp(_) ->
-%%     %% Issue #599
-%%     %% Empty clientId and clean_session = false
-%%     {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]),
-%%     Packet = raw_send_serialise(?CLIENT),
-%%     gen_tcp:send(Sock, Packet),
-%%     {ok, Data} = gen_tcp:recv(Sock, 0),
-%% %    {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
-%%     gen_tcp:close(Sock).
+mqtt_connect_with_tcp(_) ->
+    %% Issue #599
+    %% Empty clientId and clean_session = false
+    {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
+    Packet = raw_send_serialise(?CLIENT2),
+    emqx_client_sock:send(Sock, Packet),
+    {ok, Data} = gen_tcp:recv(Sock, 0),
+    {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
+    emqx_client_sock:close(Sock).
 
 mqtt_connect_with_ssl_oneway(_) ->
-    emqx:stop(),
+    emqx:shutdown(),
     emqx_ct_broker_helpers:change_opts(ssl_oneway),
     emqx:start(),
-    timer:sleep(5000),
-    {ok, SslOneWay} = emqttc:start_link([{host, "localhost"},
-                                         {port, 8883},
-                                         {logger, debug},
-                                         {client_id, <<"ssloneway">>}, ssl]),
-    timer:sleep(100),
-    emqttc:subscribe(SslOneWay, <<"topic">>, qos1),
-    {ok, Pub} = emqttc:start_link([{host, "localhost"},
-                                   {client_id, <<"pub">>}]),
-    emqttc:publish(Pub, <<"topic">>, <<"SSL oneWay test">>, [{qos, 1}]),
-    timer:sleep(100),
-    receive {publish, _Topic, RM} ->
-        ?assertEqual(<<"SSL oneWay test">>, RM)
-    after 1000 -> false
-    end,
-    timer:sleep(100),
-    emqttc:disconnect(SslOneWay),
-    emqttc:disconnect(Pub).
+    ClientSsl = emqx_ct_broker_helpers:client_ssl(),
+    {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}
+    = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
+%%     Packet = raw_send_serialise(?CLIENT),
+%%     ssl:send(SslSock, Packet),
+%%     receive Data  ->
+%%         ct:log("Data:~p~n", [Data])
+%%     after 30000 ->
+%%               ok
+%%     end,
+    ssl:close(SslSock).
 
 mqtt_connect_with_ssl_twoway(_Config) ->
-    emqx:stop(),
+    emqx:shutdown(),
     emqx_ct_broker_helpers:change_opts(ssl_twoway),
     emqx:start(),
-    timer:sleep(3000),
-    ClientSSl = emqx_ct_broker_helpers:client_ssl(),
-    {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"},
-                                         {port, 8883},
-                                         {client_id, <<"ssltwoway">>},
-                                         {ssl, ClientSSl}]),
-    {ok, Sub} = emqttc:start_link([{host, "localhost"},
-                                   {client_id, <<"sub">>}]),
-    emqttc:subscribe(Sub, <<"topic">>, qos1),
-    emqttc:publish(SslTwoWay, <<"topic">>, <<"ssl client pub message">>, [{qos, 1}]),
-    timer:sleep(10),
-    receive {publish, _Topic, RM} ->
-        ?assertEqual(<<"ssl client pub message">>, RM)
-    after 1000 -> false
+    ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(),
+    {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
+    = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
+    Packet = raw_send_serialise(?CLIENT),
+    emqx_client_sock:setopts(Sock, [{active, once}]),
+    emqx_client_sock:send(Sock, Packet),
+    timer:sleep(500),
+    receive {ssl, _, Data}->
+        {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data)
+    after 1000 ->
+        ok
     end,
-    emqttc:disconnect(SslTwoWay),
-    emqttc:disconnect(Sub).
-
-
-%% mqtt_connect_with_ws(_Config) ->
-%%     WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
-%%     {ok, _} = rfc6455_client:open(WS),
-%%     Packet = raw_send_serialise(?CLIENT),
-%%     ok = rfc6455_client:send_binary(WS, Packet),
-%%     {binary, P} = rfc6455_client:recv(WS),
-%% %    {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P),
-%%     {close, _} = rfc6455_client:close(WS),
-%%     ok.
+    emqx_client_sock:close(Sock).
+
+mqtt_connect_with_ws(_Config) ->
+    WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
+    {ok, _} = rfc6455_client:open(WS),
+
+    %% Connect Packet
+    Packet = raw_send_serialise(?CLIENT),
+    ok = rfc6455_client:send_binary(WS, Packet),
+    {binary, CONACK} = rfc6455_client:recv(WS),
+    {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK),
+
+    %% Sub Packet
+    SubPacket = raw_send_serialise(?SUBPACKET),
+    rfc6455_client:send_binary(WS, SubPacket),
+    {binary, SubAck} = rfc6455_client:recv(WS),
+    {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck),
+
+    %% Pub Packet QoS 1
+    PubPacket = raw_send_serialise(?PUBPACKET),
+    rfc6455_client:send_binary(WS, PubPacket),
+    {binary, PubAck} = rfc6455_client:recv(WS),
+    {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck),
+    {close, _} = rfc6455_client:close(WS),
+    ok.
 
 cleanSession_validate(_) ->
     {ok, C1} = emqttc:start_link([{host, "localhost"},
@@ -165,8 +189,9 @@ cleanSession_validate(_) ->
     emqttc:disconnect(C11).
 
 raw_send_serialise(Packet) ->
-    emqttc_serialiser:serialise(Packet).
+    emqx_frame:serialize(Packet).
 
 raw_recv_pase(P) ->
-    emqttc_parser:parse(P, emqttc_parser:new()).
+    emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
+                                 version         => ?MQTT_PROTO_V4} }).
 

+ 5 - 2
test/emqx_client_SUITE.erl

@@ -21,9 +21,9 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
-all() -> [].
+all() -> [{group, connect}].
 
-groups() -> [].
+groups() -> [{connect, [start]}].
 
 init_per_suite(Config) ->
     Config.
@@ -37,3 +37,6 @@ init_per_group(_Group, Config) ->
 end_per_group(_Group, _Config) ->
 	ok.
 
+start(_Config) ->
+    {ok, ClientPid, _} = emqx_client:start_link().
+

+ 31 - 4
test/emqx_ct_broker_helpers.erl

@@ -27,6 +27,31 @@
                           {cacertfile, "certs/cacert.pem"},
                           {certfile, "certs/client-cert.pem"}]).
 
+-define(CIPHERS,    [{ciphers,
+                        ["ECDHE-ECDSA-AES256-GCM-SHA384",
+                         "ECDHE-RSA-AES256-GCM-SHA384",
+                         "ECDHE-ECDSA-AES256-SHA384",
+                         "ECDHE-RSA-AES256-SHA384","ECDHE-ECDSA-DES-CBC3-SHA",
+                         "ECDH-ECDSA-AES256-GCM-SHA384",
+                         "ECDH-RSA-AES256-GCM-SHA384",
+                         "ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384",
+                         "DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256",
+                         "AES256-GCM-SHA384","AES256-SHA256",
+                         "ECDHE-ECDSA-AES128-GCM-SHA256",
+                         "ECDHE-RSA-AES128-GCM-SHA256",
+                         "ECDHE-ECDSA-AES128-SHA256",
+                         "ECDHE-RSA-AES128-SHA256",
+                         "ECDH-ECDSA-AES128-GCM-SHA256",
+                         "ECDH-RSA-AES128-GCM-SHA256",
+                         "ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256",
+                         "DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256",
+                         "AES128-GCM-SHA256","AES128-SHA256",
+                         "ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA",
+                         "DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA",
+                         "ECDH-RSA-AES256-SHA","AES256-SHA",
+                         "ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA",
+                         "DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA",
+                         "ECDH-RSA-AES128-SHA","AES128-SHA"]}]).
 
 run_setup_steps() ->
     NewConfig = generate_config(),
@@ -69,7 +94,7 @@ change_opts(SslType) ->
     lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
     case Protocol of
     ssl ->
-            SslOpts = proplists:get_value(sslopts, Opts),
+            SslOpts = proplists:get_value(ssl_options, Opts),
             Keyfile = local_path(["etc/certs", "key.pem"]),
             Certfile = local_path(["etc/certs", "cert.pem"]),
             TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
@@ -87,13 +112,15 @@ change_opts(SslType) ->
                                  (_) -> true
                              end, TupleList2)
             end,
-            [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc];
+            [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc];
         _ ->
             [Listener | Acc]
     end
     end, [], Listeners),
     application:set_env(?APP, listeners, NewListeners).
 
-client_ssl() ->
-    [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT].
+client_ssl_twoway() ->
+    [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT] ++ ?CIPHERS.
 
+client_ssl() ->
+    ?CIPHERS ++ [{reuse_sessions, true}].