Просмотр исходного кода

Merge the latest enterprise branch

Feng Lee 8 лет назад
Родитель
Сommit
fcb2ec8427

+ 1 - 0
.gitignore

@@ -31,3 +31,4 @@ _build
 rebar3.crashdump
 .DS_Store
 rebar.config
+emqx.iml

+ 6 - 5
Makefile

@@ -1,17 +1,19 @@
+.PHONY: plugins tests
+
 PROJECT = emqx
 PROJECT_DESCRIPTION = EMQ X Broker
 PROJECT_VERSION = 3.0
 
-NO_AUTOPATCH = cuttlefish
+NO_AUTOPATCH = gen_rpc cuttlefish
 
 DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
 
 dep_goldrush     = git https://github.com/basho/goldrush 0.1.9
 dep_gproc        = git https://github.com/uwiger/gproc
+dep_jsx          = git https://github.com/talentdeficit/jsx
 dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
 dep_lager        = git https://github.com/basho/lager master
 dep_lager_syslog = git https://github.com/basho/lager_syslog
-dep_jsx          = git https://github.com/talentdeficit/jsx
 dep_esockd       = git https://github.com/emqtt/esockd v5.2.1
 dep_ekka         = git https://github.com/emqtt/ekka v0.2.2
 dep_mochiweb     = git https://github.com/emqtt/mochiweb v4.2.2
@@ -25,16 +27,15 @@ ERLC_OPTS += +'{parse_transform, lager_transform}'
 BUILD_DEPS = cuttlefish
 dep_cuttlefish = git https://github.com/emqtt/cuttlefish
 
-TEST_DEPS = emqttc emq_dashboard
+TEST_DEPS = emqttc
 dep_emqttc = git https://github.com/emqtt/emqttc
-dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop
 
 TEST_ERLC_OPTS += +debug_info
 TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
 
 EUNIT_OPTS = verbose
 
-CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \
+CT_SUITES = emqx emqx_broker emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \
 			emqx_vm emqx_net emqx_protocol emqx_access emqx_router
 
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1

+ 1 - 1
README.md

@@ -1,5 +1,5 @@
 
-# *EMQ* - Erlang MQTT Broker
+# *EMQ X* - EMQ X Broker
 
 [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd)
 

+ 11 - 2
etc/emqx.conf

@@ -400,6 +400,11 @@ mqtt.max_packet_size = 64KB
 ## Value: on | off
 mqtt.websocket_protocol_header = on
 
+## Check Websocket Upgrade Header.
+##
+## Value: on | off
+mqtt.websocket_check_upgrade_header = on
+
 ## The backoff for MQTT keepalive timeout.
 ## EMQ will kick a MQTT connection out until 'Keepalive * backoff * 2' timeout.
 ##
@@ -578,6 +583,9 @@ mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/
 ## Value: File
 mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins
 
+## File to store loaded plugin names.
+mqtt.plugins.expand_plugins_dir = {{ platform_plugins_dir }}/
+
 ##--------------------------------------------------------------------
 ## MQTT Listeners
 ##--------------------------------------------------------------------
@@ -611,6 +619,7 @@ listener.tcp.external.max_clients = 102400
 ## Mountpoint of the MQTT/TCP Listener. All the topics of this
 ## listener will be prefixed with the mount point if this option
 ## is enabled.
+## Notice that EMQ X supports wildcard mount:%c clientid, %u username
 ##
 ## Value: String
 ## listener.tcp.external.mountpoint = external/
@@ -830,7 +839,7 @@ listener.ssl.external.acceptors = 16
 ## Maximum number of concurrent MQTT/SSL connections.
 ##
 ## Value: Number
-listener.ssl.external.max_clients = 1024
+listener.ssl.external.max_clients = 102400
 
 ## TODO: Zone of the external MQTT/SSL listener belonged to.
 ##
@@ -1314,7 +1323,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 
 ## TCP backlog for the WebSocket/SSL connection.
 ##
-## See listener.tcp.<name>.backlog
+## See: listener.tcp.<name>.backlog
 ##
 ## Value: Number >= 0
 listener.wss.external.backlog = 1024

+ 3 - 1
include/emqx.hrl

@@ -85,7 +85,9 @@
           will_topic    :: undefined | binary(),
           ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
           mountpoint    :: undefined | binary(),
-          connected_at  :: erlang:timestamp()
+          connected_at  :: erlang:timestamp(),
+          %%TODO: Headers
+          headers = []  :: list()
         }).
 
 -type(mqtt_client() :: #mqtt_client{}).

+ 9 - 0
priv/emqx.schema

@@ -538,6 +538,11 @@ end}.
   {datatype, flag}
 ]}.
 
+{mapping, "mqtt.websocket_check_upgrade_header", "emqx.websocket_check_upgrade_header", [
+  {default, on},
+  {datatype, flag}
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT Connection
 %%--------------------------------------------------------------------
@@ -760,6 +765,10 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "mqtt.plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [
+  {datatype, string}
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT Listeners
 %%--------------------------------------------------------------------

+ 64 - 1
src/emqx_plugins.erl

@@ -28,6 +28,8 @@
 
 -export([list/0]).
 
+-export([load_expand_plugin/1]).
+
 %% @doc Init plugins' config
 -spec(init() -> ok).
 init() ->
@@ -49,6 +51,7 @@ init_config(CfgFile) ->
 %% @doc Load all plugins when the broker started.
 -spec(load() -> list() | {error, term()}).
 load() ->
+    load_expand_plugins(),
     case emqx:env(plugins_loaded_file) of
         {ok, File} ->
             ensure_file(File),
@@ -58,6 +61,66 @@ load() ->
             ignore
     end.
 
+load_expand_plugins() ->
+    case emqx:env(expand_plugins_dir) of
+        {ok, Dir} ->
+            PluginsDir = filelib:wildcard("*", Dir),
+            lists:foreach(fun(PluginDir) ->
+                case filelib:is_dir(Dir ++ PluginDir) of
+                    true  -> load_expand_plugin(Dir ++ PluginDir);
+                    false -> ok
+                end
+            end, PluginsDir);
+        _ -> ok
+    end.
+
+load_expand_plugin(PluginDir) ->
+    init_expand_plugin_config(PluginDir),
+    Ebin = PluginDir ++ "/ebin",
+    code:add_patha(Ebin),
+    Modules = filelib:wildcard(Ebin ++ "/*.beam"),
+    lists:foreach(fun(Mod) ->
+        Module = list_to_atom(filename:basename(Mod, ".beam")),
+        code:load_file(Module)
+    end, Modules),
+    case filelib:wildcard(Ebin ++ "/*.app") of
+        [App|_] -> application:load(list_to_atom(filename:basename(App, ".app")));
+        _ -> lager:error("load application fail"), {error, load_app_fail}
+    end.
+
+init_expand_plugin_config(PluginDir) ->
+    Priv = PluginDir ++ "/priv",
+    Etc  = PluginDir ++ "/etc",
+    Schema = filelib:wildcard(Priv ++ "/*.schema"),
+    Conf = case filelib:wildcard(Etc ++ "/*.conf") of
+        [] -> [];
+        [Conf1] -> cuttlefish_conf:file(Conf1)
+    end,
+    AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
+    lists:foreach(fun({AppName, Envs}) ->
+        [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
+    end, AppsEnv).
+
+get_expand_plugin_config() ->
+    case emqx:env(expand_plugins_dir) of
+        {ok, Dir} ->
+            PluginsDir = filelib:wildcard("*", Dir),
+            lists:foldl(fun(PluginDir, Acc) ->
+                case filelib:is_dir(Dir ++ PluginDir) of
+                    true  ->
+                        Etc  = Dir ++ PluginDir ++ "/etc",
+                        case filelib:wildcard("*.{conf,config}", Etc) of
+                            [] -> Acc;
+                            [Conf] -> [Conf | Acc]
+                        end;
+                    false ->
+                        Acc
+                end
+            end, [], PluginsDir);
+        _ -> ok
+    end.
+
+
 ensure_file(File) ->
     case filelib:is_file(File) of false -> write_loaded([]); true -> ok end.
 
@@ -98,7 +161,7 @@ stop_plugins(Names) ->
 list() ->
     case emqx:env(plugins_etc_dir) of
         {ok, PluginsEtc} ->
-            CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc),
+            CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(),
             Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
             StartedApps = names(started_app),
             lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->

+ 26 - 10
src/emqx_protocol.erl

@@ -35,6 +35,10 @@
 
 -export([process/2]).
 
+-ifdef(TEST).
+-compile(export_all).
+-endif.
+
 -record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0,
                       send_pkt = 0, send_msg = 0}).
 
@@ -289,7 +293,7 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable),
         false ->
             case emqx_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
                 {ok, TopicTable1} ->
-                    emqx_session:subscribe(Session, PacketId, mount(MountPoint, TopicTable1)),
+                    emqx_session:subscribe(Session, PacketId, mount(replvar(MountPoint, State), TopicTable1)),
                     {ok, State};
                 {stop, _} ->
                     {ok, State}
@@ -307,7 +311,7 @@ process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics),
                              session    = Session}) ->
     case emqx_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
         {ok, TopicTable} ->
-            emqx_session:unsubscribe(Session, mount(MountPoint, TopicTable));
+            emqx_session:unsubscribe(Session, mount(replvar(MountPoint, State), TopicTable));
         {stop, _} ->
             ok
     end,
@@ -321,12 +325,12 @@ process(?PACKET(?DISCONNECT), State) ->
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
-        #proto_state{client_id  = ClientId,
-                     username   = Username,
-                     mountpoint = MountPoint,
-                     session    = Session}) ->
+        State = #proto_state{client_id  = ClientId,
+                             username   = Username,
+                             mountpoint = MountPoint,
+                             session    = Session}) ->
     Msg = emqx_message:from_packet(Username, ClientId, Packet),
-    emqx_session:publish(Session, mount(MountPoint, Msg));
+    emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg));
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
     with_puback(?PUBACK, Packet, State);
@@ -340,7 +344,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
                                  mountpoint = MountPoint,
                                  session    = Session}) ->
     Msg = emqx_message:from_packet(Username, ClientId, Packet),
-    case emqx_session:publish(Session, mount(MountPoint, Msg)) of
+    case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)) of
         ok ->
             send(?PUBACK_PACKET(Type, PacketId), State);
         {error, Error} ->
@@ -415,10 +419,10 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
     %% emqx_cm:unreg(ClientId).
     ok.
 
-willmsg(Packet, #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) ->
+willmsg(Packet, State = #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) ->
     case emqx_message:from_packet(Packet) of
         undefined -> undefined;
-        Msg -> mount(MountPoint, Msg)
+        Msg -> mount(replvar(MountPoint, State), Msg)
     end.
 
 %% Generate a client if if nulll
@@ -577,6 +581,18 @@ clean_retain(_IsBridge, Msg) ->
 %% Mount Point
 %%--------------------------------------------------------------------
 
+replvar(undefined, _State) ->
+    undefined;
+replvar(MountPoint, #proto_state{client_id = ClientId, username = Username}) ->
+    lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
+
+feed_var({<<"%c">>, ClientId}, MountPoint) ->
+    emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
+feed_var({<<"%u">>, undefined}, MountPoint) ->
+    MountPoint;
+feed_var({<<"%u">>, Username}, MountPoint) ->
+    emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
+
 mount(undefined, Any) ->
     Any;
 mount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->

+ 1 - 1
src/emqx_session.erl

@@ -667,7 +667,7 @@ expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
     case (timer:now_diff(Now, TS) div 1000) of
         Diff when Diff >= Timeout ->
             ?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State),
-            emqttd_metrics:inc('messages/qos2/dropped'),
+            emqx_metrics:inc('messages/qos2/dropped'),
             expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
         Diff ->
             State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)}

+ 2 - 1
src/emqx_ws.erl

@@ -74,7 +74,8 @@ handle_request(Method, Path, Req) ->
     Req:not_found().
 
 is_websocket(Upgrade) ->
-    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+    (not emqx:env(websocket_check_upgrade_header, true)) orelse
+        (Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket").
 
 check_protocol_header(Req) ->
     case emqx:env(websocket_protocol_header, false) of

+ 47 - 623
test/emqx_SUITE.erl

@@ -18,9 +18,7 @@
 
 -compile(export_all).
 
--include("emqx.hrl").
-
--include("emqx_mqtt.hrl").
+-include_lib("emqttc/include/emqttc_packet.hrl").
 
 -define(APP, emqx).
 
@@ -28,115 +26,37 @@
 
 -include_lib("common_test/include/ct.hrl").
 
--define(CONTENT_TYPE, "application/json").
-
--define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
-                          {verify, verify_peer},
-                          {fail_if_no_peer_cert, true}]).
-
--define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"},
-                          {cacertfile, "certs/cacert.pem"},
-                          {certfile, "certs/client-cert.pem"}]).
-
--define(URL, "http://localhost:8080/api/v2/").
-
--define(APPL_JSON, "application/json").
-
--define(PRINT(PATH), lists:flatten(io_lib:format(PATH, [atom_to_list(node())]))).
-
--define(GET_API, ["management/nodes",
-                  ?PRINT("management/nodes/~s"),
-                  "monitoring/nodes",
-                  ?PRINT("monitoring/nodes/~s"),
-                  "monitoring/listeners",
-                  ?PRINT("monitoring/listeners/~s"),
-                  "monitoring/metrics",
-                  ?PRINT("monitoring/metrics/~s"),
-                  "monitoring/stats",
-                  ?PRINT("monitoring/stats/~s"),
-                  ?PRINT("nodes/~s/clients"),
-                  "routes"]).
-
+-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
+                                client_id = <<"mqtt_client">>,
+                                username  = <<"admin">>,
+                                password  = <<"public">>})).
 all() ->
-    [{group, protocol},
-     {group, pubsub},
-     {group, session},
-     {group, broker},
-     {group, metrics},
-     {group, stats},
-     {group, hook},
-     {group, http},
-     {group, alarms},
-     {group, cli},
+    [{group, connect},
      {group, cleanSession}].
 
 groups() ->
-    [{protocol, [sequence],
+    [{connect, [non_parallel_tests],
       [mqtt_connect,
-       mqtt_ssl_oneway,
-       mqtt_ssl_twoway]},
-     {pubsub, [sequence],
-      [subscribe_unsubscribe,
-       publish, pubsub,
-       t_local_subscribe,
-       t_shared_subscribe,
-       'pubsub#', 'pubsub+']},
-     {session, [sequence],
-      [start_session]},
-     {broker, [sequence],
-      [hook_unhook]},
-     {metrics, [sequence],
-      [inc_dec_metric]},
-     {stats, [sequence],
-      [set_get_stat]},
-     {hook, [sequence],
-      [add_delete_hook,
-       run_hooks]},
-    {http, [sequence], 
-     [request_status,
-      request_publish,
-      get_api_lists
-     % websocket_test
-     ]},
-     {alarms, [sequence], 
-     [set_alarms]
-     },
-     {cli, [sequence],
-      [ctl_register_cmd,
-       cli_status,
-       cli_broker,
-       cli_clients,
-       cli_sessions,
-       cli_routes,
-       cli_topics,
-       cli_subscriptions,
-       cli_bridges,
-       cli_plugins,
-       {listeners, [sequence],
-        [cli_listeners,
-         conflict_listeners
-         ]},
-       cli_vm]},
+       mqtt_connect_with_tcp,
+       mqtt_connect_with_ssl_oneway,
+       mqtt_connect_with_ssl_twoway,
+       mqtt_connect_with_ws]},
      {cleanSession, [sequence],
-      [cleanSession_validate,
-       cleanSession_validate1]
+      [cleanSession_validate]
      }
     ].
 
 init_per_suite(Config) ->
-    NewConfig = generate_config(),
-    lists:foreach(fun set_app_env/1, NewConfig),
-    Apps = application:ensure_all_started(?APP),
-    ct:log("Apps:~p", [Apps]),
+    emqx_ct_broker_helpers:run_setup_steps(),
+   % ct:log("Apps:~p", [Apps]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx:shutdown().
+    emqx_ct_broker_helpers:run_teardown_steps().
 
 %%--------------------------------------------------------------------
 %% Protocol Test
 %%--------------------------------------------------------------------
-
 mqtt_connect(_) ->
     %% Issue #599
     %% Empty clientId and clean_session = false
@@ -151,10 +71,21 @@ connect_broker_(Packet, RecvSize) ->
     gen_tcp:close(Sock),
     Data.
 
-mqtt_ssl_oneway(_) ->
+mqtt_connect_with_tcp(_) ->
+    %% Issue #599
+    %% Empty clientId and clean_session = false
+    {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]),
+    Packet = raw_send_serialise(?CLIENT),
+    gen_tcp:send(Sock, Packet),
+    {ok, Data} = gen_tcp:recv(Sock, 0),
+    {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
+    gen_tcp:close(Sock).
+
+mqtt_connect_with_ssl_oneway(_) ->
     emqx:stop(),
-    change_opts(ssl_oneway),
+    emqx_ct_broker_helpers:change_opts(ssl_oneway),
     emqx:start(),
+    timer:sleep(5000),
     {ok, SslOneWay} = emqttc:start_link([{host, "localhost"},
                                          {port, 8883},
                                          {logger, debug},
@@ -173,12 +104,12 @@ mqtt_ssl_oneway(_) ->
     emqttc:disconnect(SslOneWay),
     emqttc:disconnect(Pub).
 
-mqtt_ssl_twoway(_Config) ->
+mqtt_connect_with_ssl_twoway(_Config) ->
     emqx:stop(),
-    change_opts(ssl_twoway),
+    emqx_ct_broker_helpers:change_opts(ssl_twoway),
     emqx:start(),
     timer:sleep(3000),
-    ClientSSl = [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT],
+    ClientSSl = emqx_ct_broker_helpers:client_ssl(),
     {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"},
                                          {port, 8883},
                                          {client_id, <<"ssltwoway">>},
@@ -195,369 +126,16 @@ mqtt_ssl_twoway(_Config) ->
     emqttc:disconnect(SslTwoWay),
     emqttc:disconnect(Sub).
 
-%%--------------------------------------------------------------------
-%% PubSub Test
-%%--------------------------------------------------------------------
-
-subscribe_unsubscribe(_) ->
-    ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
-    ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]),
-    ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]),
-    ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
-    ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
-    ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
-
-publish(_) ->
-    Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
-    ok = emqx:subscribe(<<"test/+">>),
-    timer:sleep(10),
-    emqx:publish(Msg),
-    ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
-
-pubsub(_) ->
-    Self = self(),
-    ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]),
-    ?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])),
-    timer:sleep(10),
-    [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self),
-    [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>),
-    emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
-    spawn(fun() ->
-            emqx:subscribe(<<"a/b/c">>),
-            emqx:subscribe(<<"c/d/e">>),
-            timer:sleep(10),
-            emqx:unsubscribe(<<"a/b/c">>)
-          end),
-    timer:sleep(20),
-    emqx:unsubscribe(<<"a/b/c">>).
-
-t_local_subscribe(_) ->
-    ok = emqx:subscribe("$local/topic0"),
-    ok = emqx:subscribe("$local/topic1", <<"x">>),
-    ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]),
-    timer:sleep(10),
-    ?assertEqual([self()], emqx:subscribers("$local/topic0")),
-    ?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")),
-    ?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []},
-                  {{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}],
-                 emqx:subscriptions(<<"x">>)),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
-    ?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)),
-    ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)),
-    ?assertEqual([], emqx:subscribers("topic1")),
-    ?assertEqual([], emqx:subscriptions(<<"x">>)).
-
-t_shared_subscribe(_) ->
-    emqx:subscribe("$local/$share/group1/topic1"),
-    emqx:subscribe("$share/group2/topic2"),
-    emqx:subscribe("$queue/topic3"),
-    timer:sleep(10),
-    ?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
-    ?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []},
-                  {self(), <<"$queue/topic3">>, []},
-                  {self(), <<"$share/group2/topic2">>, []}],
-                 lists:sort(emqx:subscriptions(self()))),
-    emqx:unsubscribe("$local/$share/group1/topic1"),
-    emqx:unsubscribe("$share/group2/topic2"),
-    emqx:unsubscribe("$queue/topic3"),
-    ?assertEqual([], lists:sort(emqx:subscriptions(self()))).
-
-'pubsub#'(_) ->
-    emqx:subscribe(<<"a/#">>),
-    timer:sleep(10),
-    emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
-    emqx:unsubscribe(<<"a/#">>).
-
-'pubsub+'(_) ->
-    emqx:subscribe(<<"a/+/+">>),
-    timer:sleep(10),
-    emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
-    ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
-    emqx:unsubscribe(<<"a/+/+">>).
-
-loop_recv(Topic, Timeout) ->
-    loop_recv(Topic, Timeout, []).
-
-loop_recv(Topic, Timeout, Acc) ->
-    receive
-        {dispatch, Topic, Msg} ->
-            loop_recv(Topic, Timeout, [Msg|Acc])
-    after
-        Timeout -> {ok, Acc}
-    end.
-
-recv_loop(Msgs) ->
-    receive
-        {dispatch, _Topic, Msg} ->
-            recv_loop([Msg|Msgs])
-        after
-            100 -> lists:reverse(Msgs)
-    end.
-
-%%--------------------------------------------------------------------
-%% Session Group
-%%--------------------------------------------------------------------
-
-start_session(_) ->
-    {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>),
-    {ok, SessPid} = emqx_mock_client:start_session(ClientPid),
-    Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
-    Message1 = Message#mqtt_message{pktid = 1},
-    emqx_session:publish(SessPid, Message1),
-    emqx_session:pubrel(SessPid, 1),
-    emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),
-    Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
-    emqx_session:publish(SessPid, Message2),
-    emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
-    emqx_mock_client:stop(ClientPid).
-
-%%--------------------------------------------------------------------
-%% Broker Group
-%%--------------------------------------------------------------------
-hook_unhook(_) ->
+mqtt_connect_with_ws(_Config) ->
+    WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
+    {ok, _} = rfc6455_client:open(WS),
+    Packet = raw_send_serialise(?CLIENT),
+    ok = rfc6455_client:send_binary(WS, Packet),
+    {binary, P} = rfc6455_client:recv(WS),
+    {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P),
+    {close, _} = rfc6455_client:close(WS),
     ok.
 
-%%--------------------------------------------------------------------
-%% Metric Group
-%%--------------------------------------------------------------------
-inc_dec_metric(_) ->
-    emqx_metrics:inc(gauge, 'messages/retained', 10),
-    emqx_metrics:dec(gauge, 'messages/retained', 10).
-
-%%--------------------------------------------------------------------
-%% Stats Group
-%%--------------------------------------------------------------------
-set_get_stat(_) ->
-    emqx_stats:setstat('retained/max', 99),
-    99 = emqx_stats:getstat('retained/max').
-
-%%--------------------------------------------------------------------
-%% Hook Test
-%%--------------------------------------------------------------------
-
-add_delete_hook(_) ->
-    ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
-    ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
-    {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
-    Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0},
-                 {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}],
-    Callbacks = emqx_hooks:lookup(test_hook),
-    ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1),
-    ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]),
-    ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}),
-    {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}),
-    [] = emqx_hooks:lookup(test_hook),
-
-    ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9),
-    ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8),
-    Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8},
-                  {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}],
-    Callbacks2 = emqx_hooks:lookup(emqx_hook),
-    ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1),
-    ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}),
-    [] = emqx_hooks:lookup(emqx_hook).
-
-run_hooks(_) ->
-    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
-    ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]),
-    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
-    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
-    {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
-    {ok, []} = emqx:run_hooks(unknown_hook, [], []),
-
-    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
-    ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]),
-    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
-    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
-    stop = emqx:run_hooks(foreach_hook, [arg]).
-
-hook_fun1([]) -> ok.
-hook_fun2([]) -> {ok, []}.
-
-hook_fun3(arg1, arg2, _Acc, init) -> ok.
-hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r2 | Acc]}.
-hook_fun5(arg1, arg2, Acc, init)  -> {stop, [r3 | Acc]}.
-
-hook_fun6(arg, initArg) -> ok.
-hook_fun7(arg, initArg) -> any.
-hook_fun8(arg, initArg) -> stop.
-
-%%--------------------------------------------------------------------
-%% HTTP Request Test
-%%--------------------------------------------------------------------
-
-request_status(_) ->
-    {InternalStatus, _ProvidedStatus} = init:get_status(),
-    AppStatus =
-    case lists:keysearch(?APP, 1, application:which_applications()) of
-        false         -> not_running;
-        {value, _Val} -> running
-    end,
-    Status = iolist_to_binary(io_lib:format("Node ~s is ~s~nemqx is ~s",
-            [node(), InternalStatus, AppStatus])),
-    Url = "http://127.0.0.1:8080/status",
-    {ok, {{"HTTP/1.1", 200, "OK"}, _, Return}} =
-    httpc:request(get, {Url, []}, [], []),
-    ?assertEqual(binary_to_list(Status), Return).
-
-request_publish(_) ->
-    emqttc:start_link([{host, "localhost"},
-                       {port, 1883},
-                       {client_id, <<"random">>},
-                       {clean_sess, false}]),
-    SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
-    ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("", ""))),
-    ok = emqx:subscribe(<<"a/b/c">>, self(), [{qos, 1}]),
-    Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}",
-    ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("", ""))),
-    ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
-
-    UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
-    ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("", ""))).
-
-connect_emqx_publish_(Method, Api, Params, Auth) ->
-    Url = "http://127.0.0.1:8080/" ++ Api,
-    case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
-    {error, socket_closed_remotely} ->
-        false;
-    {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} }  ->
-        true;
-    {ok, {{"HTTP/1.1", 400, _}, _, []}} ->
-        false;
-    {ok, {{"HTTP/1.1", 404, _}, _, []}} ->
-        false
-    end.
-	
-auth_header_(User, Pass) ->
-    Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
-    {"Authorization","Basic " ++ Encoded}.
-
-get_api_lists(_Config) ->
-    lists:foreach(fun request/1, ?GET_API).
-
-websocket_test(_) ->
-    Conn = esockd_connection:new(esockd_transport, nil, []),
-    Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1},
-                                mochiweb_headers:make([{"Sec-WebSocket-Key","Xn3fdKyc3qEXPuj2A3O+ZA=="}])),
-
-    ct:log("Req:~p", [Req]).
-    %%emqx_http:handle_request(Req).
-
-set_alarms(_) ->
-    AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
-    emqx_alarm:set_alarm(AlarmTest),
-    Alarms = emqx_alarm:get_alarms(),
-    ?assertEqual(1, length(Alarms)),
-    emqx_alarm:clear_alarm(<<"1">>),
-    [] = emqx_alarm:get_alarms().
-
-%%--------------------------------------------------------------------
-%% Cli group
-%%--------------------------------------------------------------------
-
-ctl_register_cmd(_) ->
-    emqx_ctl:register_cmd(test_cmd, {?MODULE, test_cmd}),
-    erlang:yield(),
-    timer:sleep(5),
-    [{?MODULE, test_cmd}] = emqx_ctl:lookup(test_cmd),
-    emqx_ctl:run(["test_cmd", "arg1", "arg2"]),
-    emqx_ctl:unregister_cmd(test_cmd).
-
-test_cmd(["arg1", "arg2"]) ->
-    ct:print("test_cmd is called");
-
-test_cmd([]) ->
-    io:format("test command").
-
-cli_status(_) ->
-    emqx_cli:status([]).
-
-cli_broker(_) ->
-    emqx_cli:broker([]),
-    emqx_cli:broker(["stats"]),
-    emqx_cli:broker(["metrics"]),
-    emqx_cli:broker(["pubsub"]).
-
-cli_clients(_) ->
-    emqx_cli:clients(["list"]),
-    emqx_cli:clients(["show", "clientId"]),
-    emqx_cli:clients(["kick", "clientId"]).
-
-cli_sessions(_) ->
-    emqx_cli:sessions(["list"]),
-    emqx_cli:sessions(["list", "persistent"]),
-    emqx_cli:sessions(["list", "transient"]),
-    emqx_cli:sessions(["show", "clientId"]).
-
-cli_routes(_) ->
-    emqx:subscribe(<<"topic/route">>),
-    emqx_cli:routes(["list"]),
-    emqx_cli:routes(["show", "topic/route"]),
-    emqx:unsubscribe(<<"topic/route">>).
-
-cli_topics(_) ->
-    emqx:subscribe(<<"topic">>),
-    emqx_cli:topics(["list"]),
-    emqx_cli:topics(["show", "topic"]),
-    emqx:unsubscribe(<<"topic">>).
-
-cli_subscriptions(_) ->
-    emqx_cli:subscriptions(["list"]),
-    emqx_cli:subscriptions(["show", "clientId"]),
-    emqx_cli:subscriptions(["add", "clientId", "topic", "2"]),
-    emqx_cli:subscriptions(["del", "clientId", "topic"]).
-
-cli_plugins(_) ->
-    emqx_cli:plugins(["list"]),
-    emqx_cli:plugins(["load", "emqx_plugin_template"]),
-    emqx_cli:plugins(["unload", "emqx_plugin_template"]).
-
-cli_bridges(_) ->
-    emqx_cli:bridges(["list"]),
-    emqx_cli:bridges(["start", "a@127.0.0.1", "topic"]),
-    emqx_cli:bridges(["stop", "a@127.0.0.1", "topic"]).
-
-cli_listeners(_) ->
-    emqx_cli:listeners([]).
-
-conflict_listeners(_) ->
-    F =
-    fun() ->
-    process_flag(trap_exit, true),
-    emqttc:start_link([{host, "localhost"},
-                       {port, 1883},
-                       {client_id, <<"c1">>},
-                       {clean_sess, false}])
-    end,
-    spawn_link(F),
-
-    {ok, C2} = emqttc:start_link([{host, "localhost"},
-                                  {port, 1883},
-                                  {client_id, <<"c1">>},
-                                  {clean_sess, false}]),
-    timer:sleep(100),
-
-    Listeners =
-    lists:map(fun({{Protocol, ListenOn}, Pid}) ->
-        Key = atom_to_list(Protocol) ++ ":" ++ esockd:to_string(ListenOn),
-        {Key, [{acceptors, esockd:get_acceptors(Pid)},
-               {max_clients, esockd:get_max_clients(Pid)},
-               {current_clients, esockd:get_current_clients(Pid)},
-               {shutdown_count, esockd:get_shutdown_count(Pid)}]}
-              end, esockd:listeners()),
-    L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners),
-    ?assertEqual(1, proplists:get_value(current_clients, L)),
-    ?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))),
-    timer:sleep(100),
-    emqttc:disconnect(C2).
-
-cli_vm(_) ->
-    emqx_cli:vm([]),
-    emqx_cli:vm(["ports"]).
-
 cleanSession_validate(_) ->
     {ok, C1} = emqttc:start_link([{host, "localhost"},
                                          {port, 1883},
@@ -565,7 +143,6 @@ cleanSession_validate(_) ->
                                          {clean_sess, false}]),
     timer:sleep(10),
     emqttc:subscribe(C1, <<"topic">>, qos0),
-    ok = emqx_cli:sessions(["list", "persistent"]),
     emqttc:disconnect(C1),
     {ok, Pub} = emqttc:start_link([{host, "localhost"},
                                          {port, 1883},
@@ -578,169 +155,16 @@ cleanSession_validate(_) ->
                                    {client_id, <<"c1">>},
                                    {clean_sess, false}]),
     timer:sleep(100),
-    Metrics = emqx_metrics:all(),
-    ct:log("Metrics:~p~n", [Metrics]),
-    ?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
-    ?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
-    emqttc:disconnect(Pub),
-    emqttc:disconnect(C11).
-
-cleanSession_validate1(_) ->
-    {ok, C1} = emqttc:start_link([{host, "localhost"},
-                                         {port, 1883},
-                                         {client_id, <<"c1">>},
-                                         {clean_sess, true}]),
-    timer:sleep(10),
-    emqttc:subscribe(C1, <<"topic">>, qos1),
-    ok = emqx_cli:sessions(["list", "transient"]),
-    emqttc:disconnect(C1),
-    {ok, Pub} = emqttc:start_link([{host, "localhost"},
-                                         {port, 1883},
-                                         {client_id, <<"pub">>}]),
-
-    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]),
-    timer:sleep(10),
-    {ok, C11} = emqttc:start_link([{host, "localhost"},
-                                         {port, 1883},
-                                         {client_id, <<"c1">>},
-                                         {clean_sess, false}]),
-    timer:sleep(100),
-    Metrics = emqx_metrics:all(),
-    ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)),
-    ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)),
+    receive {publish, _Topic, M1} ->
+        ?assertEqual(<<"m1">>, M1)
+    after 1000 -> false
+    end,
     emqttc:disconnect(Pub),
     emqttc:disconnect(C11).
 
-connect_emqx_pubsub_(Method, Api, Params, Auth) ->
-    Url = "http://127.0.0.1:8080/" ++ Api,
-    case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
-    {error, socket_closed_remotely} ->
-        false;
-    {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} }  ->
-        true;
-    {ok, {{"HTTP/1.1", 400, _}, _, []}} ->
-        false;
-    {ok, {{"HTTP/1.1", 404, _}, _, []}} ->
-        false
-    end.
-
-request(Path) ->
-    http_get(get, Path).
-
-http_get(Method, Path) ->
-    req(Method, Path, []).
-
-http_put(Method, Path, Params) ->
-    req(Method, Path, format_for_upload(Params)).
-
-http_post(Method, Path, Params) ->
-    req(Method, Path, format_for_upload(Params)).
-
-req(Method, Path, Body) ->
-   Url = ?URL ++ Path,
-   Headers = auth_header_("", ""),
-   case httpc:request(Method, {Url, [Headers]}, [], []) of
-   {error, R} ->
-       ct:log("R:~p~n", [R]),
-       false;
-   {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} }  ->
-       true;
-   {ok, {{"HTTP/1.1", 400, _}, _, []}} ->
-       false;
-    {ok, {{"HTTP/1.1", 404, _}, _, []}} ->
-        false
-    end.
-
-format_for_upload(none) ->
-    <<"">>;
-format_for_upload(List) ->
-    iolist_to_binary(mochijson2:encode(List)).
-
-ensure_ok(ok) -> ok;
-ensure_ok({error, {already_started, _}}) -> ok.
-
-host() -> ct:print("!!!! Node: ~p~n", [node()]), [_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
-
-wait_running(Node) ->
-    wait_running(Node, 30000).
-
-wait_running(Node, Timeout) when Timeout < 0 ->
-    throw({wait_timeout, Node});
-
-wait_running(Node, Timeout) ->
-    case rpc:call(Node, emqx, is_running, [Node]) of
-        true  -> ok;
-        false -> timer:sleep(100),
-                 wait_running(Node, Timeout - 100)
-    end.
-
-slave(emqx, Node) ->
-    {ok, Slave} = slave:start(host(), Node, "-config ../../test/emqx_SUITE_data/slave.config " ++ ensure_slave()),
-    ct:log("Slave:~p~n", [Slave]),
-    rpc:call(Slave, application, ensure_all_started, [emqx]),
-    Slave;
-
-slave(node, Node) ->
-    {ok, N} = slave:start(host(), Node, ensure_slave()),
-    N.
-
-ensure_slave() ->
-    EbinDir = local_path(["ebin"]),
-    DepsDir = local_path(["deps", "*", "ebin"]),
-    RpcDir = local_path(["deps", "gen_rpc", "_build", "dev", "lib", "*", "ebin"]),
-    "-pa " ++ EbinDir ++ " -pa " ++ DepsDir ++  " -pa " ++ RpcDir.
-
-change_opts(SslType) ->
-    {ok, Listeners} = application:get_env(?APP, listeners),
-    NewListeners =
-    lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
-    case Protocol of
-    ssl ->
-            SslOpts = proplists:get_value(sslopts, Opts),
-            Keyfile = local_path(["etc/certs", "key.pem"]),
-            Certfile = local_path(["etc/certs", "cert.pem"]),
-            TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
-            TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
-            TupleList3 =
-            case SslType of
-            ssl_twoway->
-                CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
-                MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
-                lists:merge(TupleList2, MutSslList);
-            _ ->
-                lists:filter(fun ({cacertfile, _}) -> false;
-                                 ({verify, _}) -> false;
-                                 ({fail_if_no_peer_cert, _}) -> false;
-                                 (_) -> true
-                             end, TupleList2)
-            end,
-            [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc];
-        _ ->
-            [Listener | Acc]
-    end
-    end, [], Listeners),
-    application:set_env(?APP, listeners, NewListeners).
-
-generate_config() ->
-    Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
-    Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]),
-    cuttlefish_generator:map(Schema, Conf).
-
-get_base_dir(Module) ->
-    {file, Here} = code:is_loaded(Module),
-    filename:dirname(filename:dirname(Here)).
-
-get_base_dir() ->
-    get_base_dir(?MODULE).
-
-local_path(Components, Module) ->
-    filename:join([get_base_dir(Module) | Components]).
-
-local_path(Components) ->
-    local_path(Components, ?MODULE).
+raw_send_serialise(Packet) ->
+    emqttc_serialiser:serialise(Packet).
 
-set_app_env({App, Lists}) ->
-    lists:foreach(fun({Par, Var}) ->
-                  application:set_env(App, Par, Var)
-                  end, Lists).
+raw_recv_pase(P) ->
+    emqttc_parser:parse(P, emqttc_parser:new()).
 

+ 2 - 0
test/emqx_access_SUITE.erl

@@ -20,6 +20,8 @@
 
 -include("emqx.hrl").
 
+-include_lib("common_test/include/ct.hrl").
+
 -define(AC, emqx_access_control).
 
 -import(emqx_access_rule, [compile/1, match/3]).

+ 238 - 0
test/emqx_broker_SUITE.erl

@@ -0,0 +1,238 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_broker_SUITE).
+
+-compile(export_all).
+
+-define(APP, emqx).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-include_lib("common_test/include/ct.hrl").
+
+-include("emqx.hrl").
+
+all() ->
+    [
+     {group, pubsub},
+     {group, session},
+     {group, broker},
+     {group, metrics},
+     {group, stats},
+     {group, hook},
+     {group, alarms}].
+
+groups() ->
+    [
+     {pubsub, [sequence], [subscribe_unsubscribe,
+                           publish, pubsub,
+                           t_local_subscribe,
+                           t_shared_subscribe,
+                           'pubsub#', 'pubsub+']},
+     {session, [sequence], [start_session]},
+     {broker, [sequence], [hook_unhook]},
+     {metrics, [sequence], [inc_dec_metric]},
+     {stats, [sequence], [set_get_stat]},
+     {hook, [sequence], [add_delete_hook, run_hooks]},
+     {alarms, [sequence], [set_alarms]}
+    ].
+
+init_per_suite(Config) ->
+    emqx_ct_broker_helpers:run_setup_steps(),
+    Config.
+
+end_per_suite(Config) ->
+    emqx_ct_broker_helpers:run_teardown_steps().
+    
+%%--------------------------------------------------------------------
+%% PubSub Test
+%%--------------------------------------------------------------------
+
+subscribe_unsubscribe(_) ->
+    ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
+    ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]),
+    ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]),
+    ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
+    ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
+    ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
+
+publish(_) ->
+    Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
+    ok = emqx:subscribe(<<"test/+">>),
+    timer:sleep(10),
+    emqx:publish(Msg),
+    ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
+
+pubsub(_) ->
+    Self = self(),
+    ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]),
+    ?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])),
+    timer:sleep(10),
+    [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self),
+    [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>),
+    emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
+    ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
+    spawn(fun() ->
+            emqx:subscribe(<<"a/b/c">>),
+            emqx:subscribe(<<"c/d/e">>),
+            timer:sleep(10),
+            emqx:unsubscribe(<<"a/b/c">>)
+          end),
+    timer:sleep(20),
+    emqx:unsubscribe(<<"a/b/c">>).
+
+t_local_subscribe(_) ->
+    ok = emqx:subscribe("$local/topic0"),
+    ok = emqx:subscribe("$local/topic1", <<"x">>),
+    ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]),
+    timer:sleep(10),
+    ?assertEqual([self()], emqx:subscribers("$local/topic0")),
+    ?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")),
+    ?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []},
+                  {{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}],
+                 emqx:subscriptions(<<"x">>)),
+    ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
+    ?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")),
+    ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)),
+    ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)),
+    ?assertEqual([], emqx:subscribers("topic1")),
+    ?assertEqual([], emqx:subscriptions(<<"x">>)).
+
+t_shared_subscribe(_) ->
+    emqx:subscribe("$local/$share/group1/topic1"),
+    emqx:subscribe("$share/group2/topic2"),
+    emqx:subscribe("$queue/topic3"),
+    timer:sleep(10),
+    ?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
+    ?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []},
+                  {self(), <<"$queue/topic3">>, []},
+                  {self(), <<"$share/group2/topic2">>, []}],
+                 lists:sort(emqx:subscriptions(self()))),
+    emqx:unsubscribe("$local/$share/group1/topic1"),
+    emqx:unsubscribe("$share/group2/topic2"),
+    emqx:unsubscribe("$queue/topic3"),
+    ?assertEqual([], lists:sort(emqx:subscriptions(self()))).
+
+'pubsub#'(_) ->
+    emqx:subscribe(<<"a/#">>),
+    timer:sleep(10),
+    emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
+    ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
+    emqx:unsubscribe(<<"a/#">>).
+
+'pubsub+'(_) ->
+    emqx:subscribe(<<"a/+/+">>),
+    timer:sleep(10),
+    emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
+    ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
+    emqx:unsubscribe(<<"a/+/+">>).
+
+%%--------------------------------------------------------------------
+%% Session Group
+%%--------------------------------------------------------------------
+start_session(_) ->
+    {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>),
+    {ok, SessPid} = emqx_mock_client:start_session(ClientPid),
+    Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
+    Message1 = Message#mqtt_message{pktid = 1},
+    emqx_session:publish(SessPid, Message1),
+    emqx_session:pubrel(SessPid, 1),
+    emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),
+    Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
+    emqx_session:publish(SessPid, Message2),
+    emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
+    emqx_mock_client:stop(ClientPid).
+
+%%--------------------------------------------------------------------
+%% Broker Group
+%%--------------------------------------------------------------------
+hook_unhook(_) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Metric Group
+%%--------------------------------------------------------------------
+inc_dec_metric(_) ->
+    emqx_metrics:inc(gauge, 'messages/retained', 10),
+    emqx_metrics:dec(gauge, 'messages/retained', 10).
+
+%%--------------------------------------------------------------------
+%% Stats Group
+%%--------------------------------------------------------------------
+set_get_stat(_) ->
+    emqx_stats:setstat('retained/max', 99),
+    99 = emqx_stats:getstat('retained/max').
+
+%%--------------------------------------------------------------------
+%% Hook Test
+%%--------------------------------------------------------------------
+
+add_delete_hook(_) ->
+    ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
+    ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
+    {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []),
+    Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0},
+                 {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}],
+    Callbacks = emqx_hooks:lookup(test_hook),
+    ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1),
+    ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]),
+    ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}),
+    {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}),
+    [] = emqx_hooks:lookup(test_hook),
+
+    ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9),
+    ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8),
+    Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8},
+                  {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}],
+    Callbacks2 = emqx_hooks:lookup(emqx_hook),
+    ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1),
+    ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}),
+    [] = emqx_hooks:lookup(emqx_hook).
+
+run_hooks(_) ->
+    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
+    ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]),
+    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
+    ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
+    {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
+    {ok, []} = emqx:run_hooks(unknown_hook, [], []),
+
+    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
+    ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]),
+    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
+    ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
+    stop = emqx:run_hooks(foreach_hook, [arg]).
+
+hook_fun1([]) -> ok.
+hook_fun2([]) -> {ok, []}.
+
+hook_fun3(arg1, arg2, _Acc, init) -> ok.
+hook_fun4(arg1, arg2, Acc, init)  -> {ok, [r2 | Acc]}.
+hook_fun5(arg1, arg2, Acc, init)  -> {stop, [r3 | Acc]}.
+
+hook_fun6(arg, initArg) -> ok.
+hook_fun7(arg, initArg) -> any.
+hook_fun8(arg, initArg) -> stop.
+
+set_alarms(_) ->
+    AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
+    emqx_alarm:set_alarm(AlarmTest),
+    Alarms = emqx_alarm:get_alarms(),
+    ?assertEqual(1, length(Alarms)),
+    emqx_alarm:clear_alarm(<<"1">>),
+    [] = emqx_alarm:get_alarms().
+
+

+ 102 - 0
test/emqx_ct_broker_helpers.erl

@@ -0,0 +1,102 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ct_broker_helpers).
+
+-compile(export_all).
+
+-define(APP, emqx).
+
+-define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
+                          {verify, verify_peer},
+                          {fail_if_no_peer_cert, true}]).
+
+-define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"},
+                          {cacertfile, "certs/cacert.pem"},
+                          {certfile, "certs/client-cert.pem"}]).
+
+
+run_setup_steps() ->
+    NewConfig = generate_config(),
+    lists:foreach(fun set_app_env/1, NewConfig),
+    application:ensure_all_started(?APP).
+
+run_teardown_steps() ->
+    ?APP:shutdown().
+
+generate_config() ->
+    Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
+    Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]),
+    cuttlefish_generator:map(Schema, Conf).
+
+get_base_dir(Module) ->
+    {file, Here} = code:is_loaded(Module),
+    filename:dirname(filename:dirname(Here)).
+
+get_base_dir() ->
+    get_base_dir(?MODULE).
+
+local_path(Components, Module) ->
+    filename:join([get_base_dir(Module) | Components]).
+
+local_path(Components) ->
+    local_path(Components, ?MODULE).
+
+set_app_env({App, Lists}) ->
+    lists:foreach(fun({acl_file, _Var}) ->
+                        application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
+                     ({license_file, _Var}) ->
+                        application:set_env(App, license_file, local_path(["etc", "emqx.lic"]));
+                     ({plugins_loaded_file, _Var}) ->
+                        application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"]));
+                     ({Par, Var}) ->
+                        application:set_env(App, Par, Var)
+                  end, Lists).
+
+change_opts(SslType) ->
+    {ok, Listeners} = application:get_env(?APP, listeners),
+    NewListeners =
+    lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
+    case Protocol of
+    ssl ->
+            SslOpts = proplists:get_value(sslopts, Opts),
+            Keyfile = local_path(["etc/certs", "key.pem"]),
+            Certfile = local_path(["etc/certs", "cert.pem"]),
+            TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
+            TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
+            TupleList3 =
+            case SslType of
+            ssl_twoway->
+                CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
+                MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
+                lists:merge(TupleList2, MutSslList);
+            _ ->
+                lists:filter(fun ({cacertfile, _}) -> false;
+                                 ({verify, _}) -> false;
+                                 ({fail_if_no_peer_cert, _}) -> false;
+                                 (_) -> true
+                             end, TupleList2)
+            end,
+            [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc];
+        _ ->
+            [Listener | Acc]
+    end
+    end, [], Listeners),
+    application:set_env(?APP, listeners, NewListeners).
+
+client_ssl() ->
+    [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT].
+

+ 0 - 9
test/emqx_mod_SUITE.erl

@@ -23,13 +23,4 @@
 all() -> [mod_subscription_rep].
 
 mod_subscription_rep(_) -> ok.
-%%    <<"topic/clientId">> = emqttd_mod_subscription:rep(
-%%            <<"$c">>, <<"clientId">>, <<"topic/$c">>),
-%%   <<"topic/username">> = emqttd_mod_subscription:rep(
-%%           <<"$u">>, <<"username">>, <<"topic/$u">>),
-%%   <<"topic/username/clientId">> = emqttd_mod_subscription:rep(
-%%           <<"$c">>, <<"clientId">>, emqttd_mod_subscription:rep(
-%%               <<"$u">>, <<"username">>, <<"topic/$u/$c">>)).
  
-
-

+ 252 - 0
test/rfc6455_client.erl

@@ -0,0 +1,252 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ Management Console.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2012-2016 Pivotal Software, Inc.  All rights reserved.
+%%
+
+-module(rfc6455_client).
+
+-export([new/2, open/1, recv/1, send/2, send_binary/2, close/1, close/2]).
+
+-record(state, {host, port, addr, path, ppid, socket, data, phase}).
+
+%% --------------------------------------------------------------------------
+
+new(WsUrl, PPid) ->
+    crypto:start(),
+    "ws://" ++ Rest = WsUrl,
+    [Addr, Path] = split("/", Rest, 1),
+    [Host, MaybePort] = split(":", Addr, 1, empty),
+    Port = case MaybePort of
+               empty -> 80;
+               V     -> {I, ""} = string:to_integer(V), I
+           end,
+    State = #state{host = Host,
+                   port = Port,
+                   addr = Addr,
+                   path = "/" ++ Path,
+                   ppid = PPid},
+    spawn(fun () ->
+                  start_conn(State)
+          end).
+
+open(WS) ->
+    receive
+        {rfc6455, open, WS, Opts} ->
+            {ok, Opts};
+        {rfc6455, close, WS, R} ->
+            {close, R}
+    end.
+
+recv(WS) ->
+    receive
+        {rfc6455, recv, WS, Payload} ->
+            {ok, Payload};
+        {rfc6455, recv_binary, WS, Payload} ->
+            {binary, Payload};
+        {rfc6455, close, WS, R} ->
+            {close, R}
+    end.
+
+send(WS, IoData) ->
+    WS ! {send, IoData},
+    ok.
+
+send_binary(WS, IoData) ->
+    WS ! {send_binary, IoData},
+    ok.
+
+close(WS) ->
+    close(WS, {1000, ""}).
+
+close(WS, WsReason) ->
+    WS ! {close, WsReason},
+    receive
+        {rfc6455, close, WS, R} ->
+            {close, R}
+    end.
+
+
+%% --------------------------------------------------------------------------
+
+start_conn(State) ->
+    {ok, Socket} = gen_tcp:connect(State#state.host, State#state.port,
+                                   [binary,
+                                    {packet, 0}]),
+    Key = base64:encode_to_string(crypto:strong_rand_bytes(16)),
+    gen_tcp:send(Socket,
+        "GET " ++ State#state.path ++ " HTTP/1.1\r\n" ++
+        "Host: " ++ State#state.addr ++ "\r\n" ++
+        "Upgrade: websocket\r\n" ++
+        "Connection: Upgrade\r\n" ++
+        "Sec-WebSocket-Key: " ++ Key ++ "\r\n" ++
+        "Origin: null\r\n" ++
+        "Sec-WebSocket-Protocol: mqtt\r\n" ++
+        "Sec-WebSocket-Version: 13\r\n\r\n"),
+
+    loop(State#state{socket = Socket,
+                     data   = <<>>,
+                     phase = opening}).
+
+do_recv(State = #state{phase = opening, ppid = PPid, data = Data}) ->
+    case split("\r\n\r\n", binary_to_list(Data), 1, empty) of
+        [_Http, empty] -> State;
+        [Http, Data1]   ->
+            %% TODO: don't ignore http response data, verify key
+            PPid ! {rfc6455, open, self(), [{http_response, Http}]},
+            State#state{phase = open,
+                        data = Data1}
+    end;
+do_recv(State = #state{phase = Phase, data = Data, socket = Socket, ppid = PPid})
+  when Phase =:= open orelse Phase =:= closing ->
+    R = case Data of
+            <<F:1, _:3, O:4, 0:1, L:7, Payload:L/binary, Rest/binary>>
+              when L < 126 ->
+                {F, O, Payload, Rest};
+
+            <<F:1, _:3, O:4, 0:1, 126:7, L2:16, Payload:L2/binary, Rest/binary>> ->
+                {F, O, Payload, Rest};
+
+            <<F:1, _:3, O:4, 0:1, 127:7, L2:64, Payload:L2/binary, Rest/binary>> ->
+                {F, O, Payload, Rest};
+
+            <<_:1, _:3, _:4, 1:1, _/binary>> ->
+                %% According o rfc6455 5.1 the server must not mask any frames.
+                die(Socket, PPid, {1006, "Protocol error"}, normal);
+            _ ->
+                moredata
+        end,
+    case R of
+        moredata ->
+            State;
+        _ -> do_recv2(State, R)
+    end.
+
+do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) ->
+    case R of
+        {1, 1, Payload, Rest} ->
+            PPid ! {rfc6455, recv, self(), Payload},
+            State#state{data = Rest};
+        {1, 2, Payload, Rest} ->
+            PPid ! {rfc6455, recv_binary, self(), Payload},
+            State#state{data = Rest};
+        {1, 8, Payload, _Rest} ->
+            WsReason = case Payload of
+                           <<WC:16, WR/binary>> -> {WC, WR};
+                           <<>> -> {1005, "No status received"}
+                       end,
+            case Phase of
+                open -> %% echo
+                    do_close(State, WsReason),
+                    gen_tcp:close(Socket);
+                closing ->
+                    ok
+            end,
+            die(Socket, PPid, WsReason, normal);
+        {_, _, _, Rest2} ->
+            io:format("Unknown frame type~n"),
+            die(Socket, PPid, {1006, "Unknown frame type"}, normal)
+    end.
+
+encode_frame(F, O, Payload) ->
+    Mask = crypto:strong_rand_bytes(4),
+    MaskedPayload = apply_mask(Mask, iolist_to_binary(Payload)),
+
+    L = byte_size(MaskedPayload),
+    IoData = case L of
+                 _ when L < 126 ->
+                     [<<F:1, 0:3, O:4, 1:1, L:7>>, Mask, MaskedPayload];
+                 _ when L < 65536 ->
+                     [<<F:1, 0:3, O:4, 1:1, 126:7, L:16>>, Mask, MaskedPayload];
+                 _ ->
+                     [<<F:1, 0:3, O:4, 1:1, 127:7, L:64>>, Mask, MaskedPayload]
+           end,
+    iolist_to_binary(IoData).
+
+do_send(State = #state{socket = Socket}, Payload) ->
+    gen_tcp:send(Socket, encode_frame(1, 1, Payload)),
+    State.
+
+do_send_binary(State = #state{socket = Socket}, Payload) ->
+    gen_tcp:send(Socket, encode_frame(1, 2, Payload)),
+    State.
+
+do_close(State = #state{socket = Socket}, {Code, Reason}) ->
+    Payload = iolist_to_binary([<<Code:16>>, Reason]),
+    gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
+    State#state{phase = closing}.
+
+
+loop(State = #state{socket = Socket, ppid = PPid, data = Data,
+                    phase = Phase}) ->
+    receive
+        {tcp, Socket, Bin} ->
+            State1 = State#state{data = iolist_to_binary([Data, Bin])},
+            loop(do_recv(State1));
+        {send, Payload} when Phase == open ->
+            loop(do_send(State, Payload));
+        {send_binary, Payload} when Phase == open ->
+            loop(do_send_binary(State, Payload));
+        {tcp_closed, Socket} ->
+            die(Socket, PPid, {1006, "Connection closed abnormally"}, normal);
+        {close, WsReason} when Phase == open ->
+            loop(do_close(State, WsReason))
+    end.
+
+
+die(Socket, PPid, WsReason, Reason) ->
+    gen_tcp:shutdown(Socket, read_write),
+    PPid ! {rfc6455, close, self(), WsReason},
+    exit(Reason).
+
+
+%% --------------------------------------------------------------------------
+
+split(SubStr, Str, Limit) ->
+    split(SubStr, Str, Limit, "").
+
+split(SubStr, Str, Limit, Default) ->
+    Acc = split(SubStr, Str, Limit, [], Default),
+    lists:reverse(Acc).
+split(_SubStr, Str, 0, Acc, _Default) -> [Str | Acc];
+split(SubStr, Str, Limit, Acc, Default) ->
+    {L, R} = case string:str(Str, SubStr) of
+                 0 -> {Str, Default};
+                 I -> {string:substr(Str, 1, I-1),
+                       string:substr(Str, I+length(SubStr))}
+             end,
+    split(SubStr, R, Limit-1, [L | Acc], Default).
+
+
+apply_mask(Mask, Data) when is_number(Mask) ->
+    apply_mask(<<Mask:32>>, Data);
+
+apply_mask(<<0:32>>, Data) ->
+    Data;
+apply_mask(Mask, Data) ->
+    iolist_to_binary(lists:reverse(apply_mask2(Mask, Data, []))).
+
+apply_mask2(M = <<Mask:32>>, <<Data:32, Rest/binary>>, Acc) ->
+    T = Data bxor Mask,
+    apply_mask2(M, Rest, [<<T:32>> | Acc]);
+apply_mask2(<<Mask:24, _:8>>, <<Data:24>>, Acc) ->
+    T = Data bxor Mask,
+    [<<T:24>> | Acc];
+apply_mask2(<<Mask:16, _:16>>, <<Data:16>>, Acc) ->
+    T = Data bxor Mask,
+    [<<T:16>> | Acc];
+apply_mask2(<<Mask:8, _:24>>, <<Data:8>>, Acc) ->
+    T = Data bxor Mask,
+    [<<T:8>> | Acc];
+apply_mask2(_, <<>>, Acc) ->
+    Acc.