Преглед изворни кода

Improve the MQTT over Websocket connection

Feng Lee пре 7 година
родитељ
комит
f80cd2d986

+ 10 - 8
priv/emqx.schema

@@ -1372,21 +1372,21 @@ end}.
 ]}.
 
 {translation, "emqx.zones", fun(Conf) ->
-  Mapping = fun(retain_available, Val) ->
+  Mapping = fun("retain_available", Val) ->
                     {mqtt_retain_available, Val};
-               (wildcard_subscription, Val) ->
+               ("wildcard_subscription", Val) ->
                     {mqtt_wildcard_subscription, Val};
-               (shared_subscription, Val) ->
+               ("shared_subscription", Val) ->
                     {mqtt_shared_subscription, Val};
-               (Opt, Val) -> {Opt, Val}
+               (Opt, Val) ->
+                    {list_to_atom(Opt), Val}
             end,
   maps:to_list(
     lists:foldl(
       fun({["zone", Name, Opt], Val}, Zones) ->
               maps:update_with(list_to_atom(Name),
-                               fun(Opts) ->
-                                       [Mapping(list_to_atom(Opt), Val)|Opts]
-                               end, [], Zones)
+                               fun(Opts) -> [Mapping(Opt, Val)|Opts] end,
+                               [Mapping(Opt, Val)], Zones)
       end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
 end}.
 
@@ -1507,9 +1507,11 @@ end}.
                 maps:update_with(list_to_atom(Name),
                                  fun(Opts) ->
                                          Merge(list_to_atom(Opt), Val, Opts)
-                                 end, [{subscriptions, Subscriptions(Name)}], Acc);
+                                 end, [{list_to_atom(Opt), Val},
+                                       {subscriptions, Subscriptions(Name)}], Acc);
            (_, Acc) -> Acc
         end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))))
+
 end}.
 
 %%--------------------------------------------------------------------

+ 11 - 3
src/emqx_access_control.erl

@@ -22,6 +22,8 @@
 -export([start_link/0, auth/2, check_acl/3, reload_acl/0, lookup_mods/1,
          register_mod/3, register_mod/4, unregister_mod/2, stop/0]).
 
+-export([clean_acl_cache/1, clean_acl_cache/2]).
+
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
@@ -50,9 +52,9 @@ start_link() ->
 
 register_default_mod() ->
     case emqx_config:get_env(acl_file) of
-        {ok, File} ->
-            emqx_access_control:register_mod(acl, emqx_acl_internal, [File]);
-        undefined  -> ok
+        undefined -> ok;
+        File ->
+            emqx_access_control:register_mod(acl, emqx_acl_internal, [File])
     end.
 
 %% @doc Authenticate Client.
@@ -127,6 +129,12 @@ tab_key(acl)  -> acl_modules.
 stop() ->
     gen_server:stop(?MODULE, normal, infinity).
 
+%%TODO: Support ACL cache...
+clean_acl_cache(_ClientId) ->
+    ok.
+clean_acl_cache(_ClientId, _Topic) ->
+    ok.
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------

+ 5 - 5
src/emqx_config.erl

@@ -33,15 +33,15 @@
 
 -define(APP, emqx).
 
+%% @doc Get environment
+-spec(get_env(Key :: atom()) -> term() | undefined).
+get_env(Key) ->
+    get_env(Key, undefined).
+
 -spec(get_env(Key :: atom(), Default :: term()) -> term()).
 get_env(Key, Default) ->
     application:get_env(?APP, Key, Default).
 
-%% @doc Get environment
--spec(get_env(Key :: atom()) -> {ok, any()} | undefined).
-get_env(Key) ->
-    application:get_env(?APP, Key).
-
 %% TODO:
 populate(_App) ->
     ok.

+ 33 - 55
src/emqx_connection.erl

@@ -21,9 +21,8 @@
 -include("emqx_misc.hrl").
 
 -export([start_link/3]).
-
 -export([info/1, stats/1, kick/1]).
--export([get_session/1]).
+-export([session/1]).
 -export([clean_acl_cache/1]).
 -export([get_rate_limit/1, set_rate_limit/2]).
 -export([get_pub_limit/1, set_pub_limit/2]).
@@ -44,7 +43,7 @@
           rate_limit,   %% Traffic rate limit
           limit_timer,  %% Rate limit timer
           proto_state,  %% MQTT protocol state
-          parse_state,  %% MQTT parse state
+          parser_state, %% MQTT parser state
           keepalive,    %% MQTT keepalive timer
           enable_stats, %% Enable stats
           stats_timer,  %% Stats timer
@@ -75,7 +74,7 @@ stats(CPid) ->
 kick(CPid) ->
     gen_server:call(CPid, kick).
 
-get_session(CPid) ->
+session(CPid) ->
     gen_server:call(CPid, session, infinity).
 
 clean_acl_cache(CPid) ->
@@ -100,22 +99,20 @@ set_pub_limit(CPid, Rl = {_Rate, _Burst}) ->
 init([Transport, RawSocket, Options]) ->
     case Transport:wait(RawSocket) of
         {ok, Socket} ->
-            io:format("Options: ~p~n", [Options]),
+            Zone = proplists:get_value(zone, Options),
             {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
             {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
             Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
-            Zone = proplists:get_value(zone, Options),
-            RateLimit = init_rate_limit(proplists:get_value(rate_limit, Options)),
-            PubLimit = init_rate_limit(emqx_zone:get_env(Zone, publish_limit)),
-            EnableStats = emqx_zone:get_env(Zone, enable_stats, false),
-            IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
+            PubLimit = rate_limit(emqx_zone:env(Zone, publish_limit)),
+            RateLimit = rate_limit(proplists:get_value(rate_limit, Options)),
+            EnableStats = emqx_zone:env(Zone, enable_stats, true),
+            IdleTimout = emqx_zone:env(Zone, idle_timeout, 30000),
             SendFun = send_fun(Transport, Socket, Peername),
-            ProtoState = emqx_protocol:init(#{zone     => Zone,
-                                              peername => Peername,
+            ProtoState = emqx_protocol:init(#{peername => Peername,
                                               sockname => Sockname,
                                               peercert => Peercert,
                                               sendfun  => SendFun}, Options),
-            ParseState = emqx_protocol:parser(ProtoState),
+            ParserState = emqx_protocol:parser(ProtoState),
             State = run_socket(#state{transport    = Transport,
                                       socket       = Socket,
                                       peername     = Peername,
@@ -124,7 +121,7 @@ init([Transport, RawSocket, Options]) ->
                                       rate_limit   = RateLimit,
                                       pub_limit    = PubLimit,
                                       proto_state  = ProtoState,
-                                      parse_state  = ParseState,
+                                      parser_state = ParserState,
                                       enable_stats = EnableStats,
                                       idle_timeout = IdleTimout}),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
@@ -133,9 +130,9 @@ init([Transport, RawSocket, Options]) ->
             {stop, Reason}
     end.
 
-init_rate_limit(undefined) ->
+rate_limit(undefined) ->
     undefined;
-init_rate_limit({Rate, Burst}) ->
+rate_limit({Rate, Burst}) ->
     esockd_rate_limit:new(Rate, Burst).
 
 send_fun(Transport, Socket, Peername) ->
@@ -152,8 +149,7 @@ send_fun(Transport, Socket, Peername) ->
 
 handle_call(info, From, State = #state{transport = Transport, socket = Socket, proto_state = ProtoState}) ->
     ProtoInfo = emqx_protocol:info(ProtoState),
-    ConnInfo = [{socktype, Transport:type(Socket)}
-                | ?record_to_proplist(state, State, ?INFO_KEYS)],
+    ConnInfo = [{socktype, Transport:type(Socket)} | ?record_to_proplist(state, State, ?INFO_KEYS)],
     StatsInfo = element(2, handle_call(stats, From, State)),
     {reply, lists:append([ConnInfo, StatsInfo, ProtoInfo]), State};
 
@@ -169,7 +165,7 @@ handle_call(stats, _From, State = #state{transport = Transport, socket = Sock, p
 handle_call(kick, _From, State) ->
     {stop, {shutdown, kick}, ok, State};
 
-handle_call(get_session, _From, State = #state{proto_state = ProtoState}) ->
+handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
     {reply, emqx_protocol:session(ProtoState), State};
 
 handle_call(clean_acl_cache, _From, State = #state{proto_state = ProtoState}) ->
@@ -195,28 +191,20 @@ handle_cast(Msg, State) ->
     ?LOG(error, "unexpected cast: ~p", [Msg], State),
     {noreply, State}.
 
-handle_info(Sub = {subscribe, _TopicTable}, State) ->
-    with_proto(
-      fun(ProtoState) ->
-          emqx_protocol:process(Sub, ProtoState)
-      end, State);
-
-handle_info(Unsub = {unsubscribe, _Topics}, State) ->
-    with_proto(
-      fun(ProtoState) ->
-          emqx_protocol:process(Unsub, ProtoState)
-      end, State);
-
-handle_info({deliver, PubOrAck}, State) ->
-    with_proto(
-      fun(ProtoState) ->
-          emqx_protocol:deliver(PubOrAck, ProtoState)
-      end, maybe_gc(ensure_stats_timer(State)));
+handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
+    case emqx_protocol:deliver(PubOrAck, ProtoState) of
+        {ok, ProtoState1} ->
+            {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
+        {error, Reason} ->
+            shutdown(Reason, State);
+        {error, Reason, ProtoState1} ->
+            shutdown(Reason, State#state{proto_state = ProtoState1})
+    end;
 
 handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
     Stats = element(2, handle_call(stats, undefined, State)),
     emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats),
-    {noreply, State = #state{stats_timer = undefined}, hibernate};
+    {noreply, State#state{stats_timer = undefined}, hibernate};
 
 handle_info(timeout, State) ->
     shutdown(idle_timeout, State);
@@ -306,20 +294,20 @@ handle_packet(<<>>, State) ->
     {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
 
 handle_packet(Bytes, State = #state{incoming     = Incoming,
-                                    parse_state  = ParseState,
+                                    parser_state = ParserState,
                                     proto_state  = ProtoState,
                                     idle_timeout = IdleTimeout}) ->
-    case catch emqx_frame:parse(Bytes, ParseState) of
-        {more, NewParseState} ->
-            {noreply, State#state{parse_state = NewParseState}, IdleTimeout};
+    case catch emqx_frame:parse(Bytes, ParserState) of
+        {more, NewParserState} ->
+            {noreply, State#state{parser_state = NewParserState}, IdleTimeout};
         {ok, Packet = ?PACKET(Type), Rest} ->
             emqx_metrics:received(Packet),
             case emqx_protocol:received(Packet, ProtoState) of
                 {ok, ProtoState1} ->
-                    ParseState1 = emqx_protocol:parser(ProtoState1),
-                    handle_packet(Rest, State#state{incoming    = count_packets(Type, Incoming),
-                                                    proto_state = ProtoState1,
-                                                    parse_state = ParseState1});
+                    ParserState1 = emqx_protocol:parser(ProtoState1),
+                    handle_packet(Rest, State#state{incoming     = count_packets(Type, Incoming),
+                                                    proto_state  = ProtoState1,
+                                                    parser_state = ParserState1});
                 {error, Error} ->
                     ?LOG(error, "Protocol error - ~p", [Error], State),
                     shutdown(Error, State);
@@ -368,16 +356,6 @@ run_socket(State = #state{transport = Transport, socket = Sock}) ->
     Transport:async_recv(Sock, 0, infinity),
     State#state{await_recv = true}.
 
-with_proto(Fun, State = #state{proto_state = ProtoState}) ->
-    case Fun(ProtoState) of
-        {ok, ProtoState1} ->
-            {noreply, State#state{proto_state = ProtoState1}};
-        {error, Reason} ->
-            shutdown(Reason, State);
-        {error, Reason, ProtoState1} ->
-            shutdown(Reason, State#state{proto_state = ProtoState1})
-    end.
-
 ensure_stats_timer(State = #state{enable_stats = true,
                                   stats_timer  = undefined,
                                   idle_timeout = IdleTimeout}) ->

+ 2 - 2
src/emqx_gc.erl

@@ -27,8 +27,8 @@
 -spec(conn_max_gc_count() -> integer()).
 conn_max_gc_count() ->
     case emqx_config:get_env(conn_force_gc_count) of
-        {ok, I} when I > 0 -> I + rand:uniform(I);
-        {ok, I} when I =< 0 -> undefined;
+        I when is_integer(I), I > 0 -> I + rand:uniform(I);
+        I when is_integer(I), I =< 0 -> undefined;
         undefined -> undefined
     end.
 

+ 1 - 1
src/emqx_listeners.erl

@@ -38,7 +38,7 @@ start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls ->
 
 %% Start MQTT/WS listener
 start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws ->
-    Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]),
+    Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]),
     NumAcceptors = proplists:get_value(acceptors, Options, 4),
     MaxConnections = proplists:get_value(max_connections, Options, 1024),
     TcpOptions = proplists:get_value(tcp_options, Options, []),

+ 17 - 41
src/emqx_mqueue.erl

@@ -39,32 +39,25 @@
 %%
 %% @end
 
+%% TODO: ...
 -module(emqx_mqueue).
 
-%% TODO: XYZ
 -include("emqx.hrl").
-
 -include("emqx_mqtt.hrl").
 
 -import(proplists, [get_value/3]).
 
--export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1,
-         dropped/1, stats/1]).
-
--define(LOW_WM, 0.2).
-
--define(HIGH_WM, 0.6).
+-export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1]).
+-export([dropped/1, stats/1]).
 
 -define(PQUEUE, emqx_pqueue).
 
 -type(priority() :: {iolist(), pos_integer()}).
 
--type(option() :: {type, simple | priority}
-                | {max_length, non_neg_integer()} %% Max queue length
-                | {priority, list(priority())}
-                | {low_watermark, float()}  %% Low watermark
-                | {high_watermark, float()} %% High watermark
-                | {store_qos0, boolean()}). %% Queue Qos0?
+-type(options() :: #{type       => simple | priority,
+                     max_len    => non_neg_integer(),
+                     priority   => list(priority()),
+                     store_qos0 => boolean()}).
 
 -type(stat() :: {max_len, non_neg_integer()}
               | {len, non_neg_integer()}
@@ -76,29 +69,22 @@
                  pseq = 0, priorities = [],
                  %% len of simple queue
                  len = 0, max_len = 0,
-                 low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
                  qos0 = false, dropped = 0}).
 
 -type(mqueue() :: #mqueue{}).
 
--export_type([mqueue/0, priority/0, option/0]).
+-export_type([mqueue/0, priority/0, options/0]).
 
-%% @doc New queue.
--spec(new(iolist(), list(option())) -> mqueue()).
-new(Name, Opts) ->
-    Type = get_value(type, Opts, simple),
-    MaxLen = get_value(max_length, Opts, 0),
+-spec(new(iolist(), options()) -> mqueue()).
+new(Name, #{type := Type, max_len := MaxLen, store_qos0 := StoreQos0}) ->
     init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
-                   len = 0, max_len = MaxLen,
-                   low_wm = low_wm(MaxLen, Opts),
-                   high_wm = high_wm(MaxLen, Opts),
-                   qos0 = get_value(store_qos0, Opts, false)}, Opts).
+                   len = 0, max_len = MaxLen, qos0 = StoreQos0}).
 
-init_q(MQ = #mqueue{type = simple}, _Opts) ->
+init_q(MQ = #mqueue{type = simple}) ->
     MQ#mqueue{q = queue:new()};
-init_q(MQ = #mqueue{type = priority}, Opts) ->
-    Priorities = get_value(priority, Opts, []),
-    init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
+init_q(MQ = #mqueue{type = priority}) ->
+    %%Priorities = get_value(priority, Opts, []),
+    init_p([], MQ#mqueue{q = ?PQUEUE:new()}).
 
 init_p([], MQ) ->
     MQ;
@@ -110,16 +96,6 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
     <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
     {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
 
-low_wm(0, _Opts) ->
-    undefined;
-low_wm(MaxLen, Opts) ->
-    round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
-
-high_wm(0, _Opts) ->
-    undefined;
-high_wm(MaxLen, Opts) ->
-    round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
-
 -spec(name(mqueue()) -> iolist()).
 name(#mqueue{name = Name}) ->
     Name.
@@ -172,8 +148,8 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
             MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
     end;
 in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
-                                                    priorities = Priorities,
-                                                    max_len = MaxLen}) ->
+                                               priorities = Priorities,
+                                               max_len = MaxLen}) ->
     case lists:keysearch(Topic, 1, Priorities) of
         {value, {_, Pri}} ->
             case ?PQUEUE:plen(Pri, Q) >= MaxLen of

+ 1 - 1
src/emqx_packet.erl

@@ -114,7 +114,7 @@ format_variable(#mqtt_packet_connect{
 
 format_variable(#mqtt_packet_connack{ack_flags   = AckFlags,
                                      reason_code = ReasonCode}) ->
-    io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReasonCode]);
+    io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]);
 
 format_variable(#mqtt_packet_publish{topic_name = TopicName,
                                      packet_id  = PacketId}) ->

+ 24 - 27
src/emqx_plugins.erl

@@ -32,12 +32,11 @@
 -spec(init() -> ok).
 init() ->
     case emqx_config:get_env(plugins_etc_dir) of
-        {ok, PluginsEtc} ->
+        undefined  -> ok;
+        PluginsEtc ->
             CfgFiles = [filename:join(PluginsEtc, File) ||
-                          File <- filelib:wildcard("*.config", PluginsEtc)],
-            lists:foreach(fun init_config/1, CfgFiles);
-        undefined ->
-            ok
+                        File <- filelib:wildcard("*.config", PluginsEtc)],
+            lists:foreach(fun init_config/1, CfgFiles)
     end.
 
 init_config(CfgFile) ->
@@ -51,25 +50,24 @@ init_config(CfgFile) ->
 load() ->
     load_expand_plugins(),
     case emqx_config:get_env(plugins_loaded_file) of
-        {ok, File} ->
+        undefined -> %% No plugins available
+            ignore;
+        File ->
             ensure_file(File),
-            with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end);
-        undefined ->
-            %% No plugins available
-            ignore
+            with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
     end.
 
 load_expand_plugins() ->
     case emqx_config:get_env(expand_plugins_dir) of
-        {ok, Dir} ->
+        undefined -> ok;
+        Dir ->
             PluginsDir = filelib:wildcard("*", Dir),
             lists:foreach(fun(PluginDir) ->
                 case filelib:is_dir(Dir ++ PluginDir) of
                     true  -> load_expand_plugin(Dir ++ PluginDir);
                     false -> ok
                 end
-            end, PluginsDir);
-        _ -> ok
+            end, PluginsDir)
     end.
 
 load_expand_plugin(PluginDir) ->
@@ -102,7 +100,8 @@ init_expand_plugin_config(PluginDir) ->
 
 get_expand_plugin_config() ->
     case emqx_config:get_env(expand_plugins_dir) of
-        {ok, Dir} ->
+        undefined -> ok;
+        Dir ->
             PluginsDir = filelib:wildcard("*", Dir),
             lists:foldl(fun(PluginDir, Acc) ->
                 case filelib:is_dir(Dir ++ PluginDir) of
@@ -115,11 +114,9 @@ get_expand_plugin_config() ->
                     false ->
                         Acc
                 end
-            end, [], PluginsDir);
-        _ -> ok
+            end, [], PluginsDir)
     end.
 
-
 ensure_file(File) ->
     case filelib:is_file(File) of false -> write_loaded([]); true -> ok end.
 
@@ -145,10 +142,10 @@ load_plugins(Names, Persistent) ->
 -spec(unload() -> list() | {error, term()}).
 unload() ->
     case emqx_config:get_env(plugins_loaded_file) of
-        {ok, File} ->
-            with_loaded_file(File, fun stop_plugins/1);
         undefined ->
-            ignore
+            ignore;
+        File ->
+            with_loaded_file(File, fun stop_plugins/1)
     end.
 
 %% stop plugins
@@ -159,7 +156,9 @@ stop_plugins(Names) ->
 -spec(list() -> [plugin()]).
 list() ->
     case emqx_config:get_env(plugins_etc_dir) of
-        {ok, PluginsEtc} ->
+        undefined ->
+            [];
+        PluginsEtc ->
             CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(),
             Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
             StartedApps = names(started_app),
@@ -168,9 +167,7 @@ list() ->
                               true  -> Plugin#plugin{active = true};
                               false -> Plugin
                           end
-                      end, Plugins);
-        undefined ->
-            []
+                      end, Plugins)
     end.
 
 plugin(CfgFile) ->
@@ -314,14 +311,14 @@ plugin_unloaded(Name, true) ->
 
 read_loaded() ->
     case emqx_config:get_env(plugins_loaded_file) of
-        {ok, File} -> read_loaded(File);
-        undefined  -> {error, not_found}
+        undefined -> {error, not_found};
+        File      -> read_loaded(File)
     end.
 
 read_loaded(File) -> file:consult(File).
 
 write_loaded(AppNames) ->
-    {ok, File} = emqx_config:get_env(plugins_loaded_file),
+    File = emqx_config:get_env(plugins_loaded_file),
     case file:open(File, [binary, write]) of
         {ok, Fd} ->
             lists:foreach(fun(Name) ->

+ 14 - 32
src/emqx_protocol.erl

@@ -36,7 +36,7 @@
                        {shared_subscription,   true},
                        {wildcard_subscription, true}]).
 
--record(proto_state, {sockprops, capabilities, connected, client_id, client_pid,
+-record(proto_state, {zone, sockprops, capabilities, connected, client_id, client_pid,
                       clean_start, proto_ver, proto_name, username, connprops,
                       is_superuser, will_msg, keepalive, keepalive_backoff, session,
                       recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0,
@@ -56,15 +56,17 @@
 
 -export_type([proto_state/0]).
 
-init(SockProps = #{zone := Zone, peercert := Peercert}, Options) ->
-    MountPoint = emqx_zone:get_env(Zone, mountpoint),
-    Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
+init(SockProps = #{peercert := Peercert}, Options) ->
+    Zone = proplists:get_value(zone, Options),
+    MountPoint = emqx_zone:env(Zone, mountpoint),
+    Backoff = emqx_zone:env(Zone, keepalive_backoff, 0.75),
     Username = case proplists:get_value(peer_cert_as_username, Options) of
                    cn -> esockd_peercert:common_name(Peercert);
                    dn -> esockd_peercert:subject(Peercert);
                    _  -> undefined
                end,
-    #proto_state{sockprops          = SockProps,
+    #proto_state{zone               = Zone,
+                 sockprops          = SockProps,
                  capabilities       = capabilities(Zone),
                  connected          = false,
                  clean_start        = true,
@@ -82,7 +84,7 @@ init(SockProps = #{zone := Zone, peercert := Peercert}, Options) ->
                  send_msg           = 0}.
 
 capabilities(Zone) ->
-    Capabilities = emqx_zone:get_env(Zone, mqtt_capabilities, []),
+    Capabilities = emqx_zone:env(Zone, mqtt_capabilities, []),
     maps:from_list(lists:ukeymerge(1, ?CAPABILITIES, Capabilities)).
 
 parser(#proto_state{capabilities = #{max_packet_size := Size}, proto_ver = Ver}) ->
@@ -128,7 +130,9 @@ received(Packet = ?PACKET(Type), ProtoState) ->
             {error, Reason, ProtoState}
     end.
 
-process(?CONNECT_PACKET(Var), ProtoState = #proto_state{username = Username0, client_pid = ClientPid}) ->
+process(?CONNECT_PACKET(Var), ProtoState = #proto_state{zone       = Zone,
+                                                        username   = Username0,
+                                                        client_pid = ClientPid}) ->
     #mqtt_packet_connect{proto_name  = ProtoName,
                          proto_ver   = ProtoVer,
                          is_bridge   = IsBridge,
@@ -160,7 +164,8 @@ process(?CONNECT_PACKET(Var), ProtoState = #proto_state{username = Username0, cl
                     %% Generate clientId if null
                     ProtoState2 = maybe_set_clientid(ProtoState1),
                     %% Open session
-                    case emqx_sm:open_session(#{clean_start => CleanStart,
+                    case emqx_sm:open_session(#{zone        => Zone,
+                                                clean_start => CleanStart,
                                                 client_id   => clientid(ProtoState2),
                                                 username    => Username,
                                                 client_pid  => ClientPid}) of
@@ -242,23 +247,10 @@ process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), State) ->
                 {ok, TopicFilters1} ->
                     ok = emqx_session:subscribe(Session, {PacketId, Properties, mount(replvar(MountPoint, State), TopicFilters1)}),
                     {ok, State};
-                {stop, _} ->
-                    {ok, State}
+                {stop, _} -> {ok, State}
             end
     end;
 
-process({subscribe, RawTopicTable},
-        State = #proto_state{client_id = ClientId,
-                             username  = Username,
-                             session   = Session}) ->
-    TopicTable = parse_topic_filters(RawTopicTable),
-    case emqx_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
-        {ok, TopicTable1} ->
-            emqx_session:subscribe(Session, TopicTable1);
-        {stop, _} -> ok
-    end,
-    {ok, State};
-
 %% Protect from empty topic list
 process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?UNSUBACK_PACKET(PacketId), State);
@@ -276,16 +268,6 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopics),
     end,
     send(?UNSUBACK_PACKET(PacketId), State);
 
-process({unsubscribe, RawTopics}, State = #proto_state{client_id = ClientId,
-                                                       username  = Username,
-                                                       session   = Session}) ->
-    case emqx_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
-        {ok, TopicTable} ->
-            emqx_session:unsubscribe(Session, {undefined, #{}, TopicTable});
-        {stop, _} -> ok
-    end,
-    {ok, State};
-
 process(?PACKET(?PINGREQ), ProtoState) ->
     send(?PACKET(?PINGRESP), ProtoState);
 

+ 0 - 1
src/emqx_router.erl

@@ -39,7 +39,6 @@
 -type(destination() :: node() | {binary(), node()}).
 
 -record(batch, {enabled, timer, pending}).
-
 -record(state, {pool, id, batch :: #batch{}}).
 
 -define(ROUTE, emqx_route).

+ 53 - 32
src/emqx_session.erl

@@ -48,7 +48,9 @@
 -export([resume/2, discard/2]).
 -export([subscribe/2]).%%, subscribe/3]).
 -export([publish/3]).
--export([puback/2, pubrec/2, pubrel/2, pubcomp/2]).
+-export([puback/2, puback/3]).
+-export([pubrec/2, pubrec/3]).
+-export([pubrel/2, pubcomp/2]).
 -export([unsubscribe/2]).
 
 %% gen_server callbacks
@@ -139,7 +141,11 @@
         }).
 
 -define(TIMEOUT, 60000).
+
+-define(DEFAULT_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}).
+
 -define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]).
+
 -define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
                      next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
                      max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
@@ -151,16 +157,21 @@
                           "Session(~s): " ++ Format, [State#state.client_id | Args])).
 
 %% @doc Start a session
--spec(start_link(Attrs :: map()) -> {ok, pid()} | {error, term()}).
-start_link(Attrs) ->
-    gen_server:start_link(?MODULE, Attrs, [{hibernate_after, 10000}]).
+-spec(start_link(SessAttrs :: map()) -> {ok, pid()} | {error, term()}).
+start_link(SessAttrs) ->
+    gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, 30000}]).
 
 %%------------------------------------------------------------------------------
 %% PubSub API
 %%------------------------------------------------------------------------------
 
+-spec(subscribe(pid(), list({topic(), map()}) |
+                {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
+%% internal call
+subscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
+    %%TODO: Parse the topic filters?
+    subscribe(SPid, {undefined, #{}, TopicFilters});
 %% for mqtt 5.0
--spec(subscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
 subscribe(SPid, SubReq = {PacketId, Props, TopicFilters}) ->
     gen_server:cast(SPid, {subscribe, self(), SubReq}).
 
@@ -200,6 +211,9 @@ pubcomp(SPid, PacketId) ->
     gen_server:cast(SPid, {pubcomp, PacketId}).
 
 -spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
+unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
+    %%TODO: Parse the topic filters?
+    unsubscribe(SPid, {undefined, #{}, TopicFilters});
 unsubscribe(SPid, UnsubReq = {PacketId, Properties, TopicFilters}) ->
     gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
 
@@ -252,40 +266,43 @@ close(SPid) ->
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
 
-init(#{clean_start := CleanStart, client_id := ClientId, username := Username, client_pid  := ClientPid}) ->
+init(#{zone        := Zone,
+       client_id   := ClientId,
+       client_pid  := ClientPid,
+       clean_start := CleanStart,
+       username    := Username}) ->
     process_flag(trap_exit, true),
     true = link(ClientPid),
     init_stats([deliver_msg, enqueue_msg]),
-    {ok, Env} = emqx_config:get_env(session),
-    {ok, QEnv} = emqx_config:get_env(mqueue),
-    MaxInflight = proplists:get_value(max_inflight, Env, 0),
-    EnableStats = proplists:get_value(enable_stats, Env, false),
-    IgnoreLoopDeliver = proplists:get_value(ignore_loop_deliver, Env, false),
-    MQueue = emqx_mqueue:new(ClientId, QEnv),
+    MaxInflight = emqx_zone:env(Zone, max_inflight),
     State = #state{clean_start       = CleanStart,
                    binding           = binding(ClientPid),
                    client_id         = ClientId,
                    client_pid        = ClientPid,
                    username          = Username,
                    subscriptions     = #{},
-                   max_subscriptions = proplists:get_value(max_subscriptions, Env, 0),
-                   upgrade_qos       = proplists:get_value(upgrade_qos, Env, false),
+                   max_subscriptions = emqx_zone:env(Zone, max_subscriptions, 0),
+                   upgrade_qos       = emqx_zone:env(Zone, upgrade_qos, false),
                    max_inflight      = MaxInflight,
                    inflight          = emqx_inflight:new(MaxInflight),
-                   mqueue            = MQueue,
-                   retry_interval    = proplists:get_value(retry_interval, Env),
+                   mqueue            = init_mqueue(Zone, ClientId),
+                   retry_interval    = emqx_zone:env(Zone, retry_interval, 0),
                    awaiting_rel      = #{},
-                   await_rel_timeout = proplists:get_value(await_rel_timeout, Env),
-                   max_awaiting_rel  = proplists:get_value(max_awaiting_rel, Env),
-                   expiry_interval   = proplists:get_value(expiry_interval, Env),
-                   enable_stats      = EnableStats,
-                   ignore_loop_deliver = IgnoreLoopDeliver,
+                   await_rel_timeout = emqx_zone:env(Zone, await_rel_timeout),
+                   max_awaiting_rel  = emqx_zone:env(Zone, max_awaiting_rel),
+                   expiry_interval   = emqx_zone:env(Zone, session_expiry_interval),
+                   enable_stats      = emqx_zone:env(Zone, enable_stats, true),
+                   ignore_loop_deliver = emqx_zone:env(Zone, ignore_loop_deliver, true),
                    created_at        = os:timestamp()},
     emqx_sm:register_session(ClientId, info(State)),
-    emqx_hooks:run('session.created', [ClientId, Username]),
-    io:format("Session started: ~p~n", [self()]),
+    emqx_hooks:run('session.created', [ClientId]),
     {ok, emit_stats(State), hibernate}.
 
+init_mqueue(Zone, ClientId) ->
+    emqx_mqueue:new(ClientId, #{type => simple,
+                                max_len => emqx_zone:env(Zone, max_mqueue_len),
+                                store_qos0 => emqx_zone:env(Zone, mqueue_store_qos0)}).
+
 init_stats(Keys) ->
     lists:foreach(fun(K) -> put(K, 0) end, Keys).
 
@@ -331,7 +348,7 @@ handle_call(Req, _From, State) ->
     {reply, ignored, State}.
 
 handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}},
-            State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
+            State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
     ?LOG(info, "Subscribe ~p", [TopicFilters], State),
     {ReasonCodes, Subscriptions1} =
     lists:foldl(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
@@ -342,12 +359,12 @@ handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}},
                              SubMap;
                          {ok, OldOpts} ->
                              emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
-                             emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, SubOpts}),
+                             emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
                              ?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State),
                              maps:put(Topic, SubOpts, SubMap);
                          error ->
                              emqx_broker:subscribe(Topic, ClientId, SubOpts),
-                             emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, SubOpts}),
+                             emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
                              maps:put(Topic, SubOpts, SubMap)
                      end}
                 end, {[], Subscriptions}, TopicFilters),
@@ -355,14 +372,14 @@ handle_cast({subscribe, From, {PacketId, _Properties, TopicFilters}},
     {noreply, emit_stats(State#state{subscriptions = Subscriptions1})};
 
 handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
-            State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
+            State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
     ?LOG(info, "Unsubscribe ~p", [TopicFilters], State),
     {ReasonCodes, Subscriptions1} =
     lists:foldl(fun(Topic, {RcAcc, SubMap}) ->
                 case maps:find(Topic, SubMap) of
                     {ok, SubOpts} ->
                         emqx_broker:unsubscribe(Topic, ClientId),
-                        emqx_hooks:run('session.unsubscribed', [ClientId, Username], {Topic, SubOpts}),
+                        emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]),
                         {[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)};
                     error ->
                         {[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap}
@@ -473,13 +490,18 @@ handle_cast(Msg, State) ->
     emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-%% Ignore Messages delivered by self
+handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
+    {noreply, lists:foldl(fun(Msg, NewState) ->
+                              element(2, handle_info({dispatch, Topic, Msg}, NewState))
+                          end, State, Msgs)};
+
+%% Ignore messages delivered by self
 handle_info({dispatch, _Topic, #message{from = ClientId}},
              State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
     {noreply, State};
 
 %% Dispatch Message
-handle_info({dispatch, Topic, Msg}, State) ->
+handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
     {noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
 
 %% Do nothing if the client has been disconnected.
@@ -510,11 +532,10 @@ handle_info({'EXIT', ClientPid, Reason},
     {noreply, emit_stats(State1), hibernate};
 
 handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
-    %%ignore
+    %% ignore
     {noreply, State, hibernate};
 
 handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
-
     ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
          [ClientPid, Pid, Reason], State),
     {noreply, State, hibernate};

+ 0 - 3
src/emqx_sup.erl

@@ -71,8 +71,6 @@ init([]) ->
     SessionSup = supervisor_spec(emqx_session_sup),
     %% Connection Manager
     CMSup = supervisor_spec(emqx_cm_sup),
-    %% WebSocket Connection Sup
-    WSConnSup = supervisor_spec(emqx_ws_connection_sup),
     %% Sys Sup
     SysSup = supervisor_spec(emqx_sys_sup),
     {ok, {{one_for_all, 0, 1},
@@ -84,7 +82,6 @@ init([]) ->
            SMSup,
            SessionSup,
            CMSup,
-           WSConnSup,
            SysSup]}}.
 
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqx_time.erl

@@ -20,7 +20,7 @@ seed() ->
     rand:seed(exsplus, erlang:timestamp()).
 
 now_ms() ->
-    now_ms(os:timestamp()).
+    os:system_time(milli_seconds).
 
 now_ms({MegaSecs, Secs, MicroSecs}) ->
     (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).

+ 0 - 103
src/emqx_ws.erl

@@ -1,103 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ws).
-
--include("emqx_mqtt.hrl").
-
--import(proplists, [get_value/3]).
-
-%% WebSocket Loop State
--record(wsocket_state, {req, peername, client_pid, max_packet_size, parser}).
-
--define(WSLOG(Level, Format, Args, State),
-              lager:Level("WsClient(~s): " ++ Format,
-                          [esockd_net:format(State#wsocket_state.peername) | Args])).
-
--export([init/2]).
--export([websocket_init/1]).
--export([websocket_handle/2]).
--export([websocket_info/2]).
-
-init(Req0, _State) ->
-    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req0) of
-        undefined ->
-            {cowboy_websocket, Req0, #wsocket_state{}};
-        Subprotocols ->
-            case lists:member(<<"mqtt">>, Subprotocols) of
-                true ->
-                    Peername = cowboy_req:peer(Req0),
-                    Req = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req0),
-                    {cowboy_websocket, Req, #wsocket_state{req = Req, peername = Peername}, #{idle_timeout => 86400000}};
-                false ->
-                    Req = cowboy_req:reply(400, Req0),
-                    {ok, Req, #wsocket_state{}}
-            end
-    end.
-
-websocket_init(State = #wsocket_state{req = Req}) ->
-    case emqx_ws_connection_sup:start_connection(self(), Req) of
-        {ok, ClientPid} ->
-            {ok, ProtoEnv} = emqx_config:get_env(protocol),
-            PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
-            Parser = emqx_frame:initial_state(#{max_packet_size => PacketSize}),
-            NewState = State#wsocket_state{parser = Parser,
-                                           max_packet_size = PacketSize,
-                                           client_pid = ClientPid},
-            {ok, NewState};
-        Error ->
-            ?WSLOG(error, "Start client fail: ~p", [Error], State),
-            {stop, State}
-    end.
-
-websocket_handle({binary, <<>>}, State) ->
-    {ok, State};
-websocket_handle({binary, [<<>>]}, State) ->
-    {ok, State};
-
-websocket_handle({binary, Data}, State = #wsocket_state{client_pid = ClientPid, parser = Parser}) ->
-    ?WSLOG(debug, "RECV ~p", [Data], State),
-    BinSize = iolist_size(Data),
-    emqx_metrics:inc('bytes/received', BinSize),
-    case catch emqx_frame:parse(iolist_to_binary(Data), Parser) of
-        {more, NewParser} ->
-            {ok, State#wsocket_state{parser = NewParser}};
-        {ok, Packet, Rest} ->
-            gen_server:cast(ClientPid, {received, Packet, BinSize}),
-            websocket_handle({binary, Rest}, reset_parser(State));
-        {error, Error} ->
-            ?WSLOG(error, "Frame error: ~p", [Error], State),
-            {stop, State};
-        {'EXIT', Reason} ->
-            ?WSLOG(error, "Frame error: ~p", [Reason], State),
-            ?WSLOG(error, "Error data: ~p", [Data], State),
-            {stop, State}
-    end.
-
-websocket_info({binary, Data}, State) ->
-    {reply, {binary, Data}, State};
-
-websocket_info({'EXIT', Pid, Reason}, State = #wsocket_state{client_pid = Pid}) ->
-    ?WSLOG(debug, "EXIT: ~p", [Reason], State),
-    {stop, State};
-
-websocket_info(_Info, State) ->
-    {ok, State}.
-
-reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) ->
-    State#wsocket_state{parser = emqx_frame:initial_state(#{max_packet_size => PacketSize})}.
-
-

+ 205 - 228
src/emqx_ws_connection.erl

@@ -14,190 +14,211 @@
 
 -module(emqx_ws_connection).
 
--behaviour(gen_server).
-
 -include("emqx.hrl").
-
 -include("emqx_mqtt.hrl").
+-include("emqx_misc.hrl").
 
--import(proplists, [get_value/2, get_value/3]).
-
-%% API Exports
--export([start_link/3]).
-
-%% Management and Monitor API
--export([info/1, stats/1, kick/1, clean_acl_cache/2]).
-
-%% SUB/UNSUB Asynchronously
--export([subscribe/2, unsubscribe/2]).
-
-%% Get the session proc?
+-export([info/1]).
+-export([stats/1]).
+-export([kick/1]).
 -export([session/1]).
 
-%% gen_server Function Exports
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
-%% WebSocket Client State
--record(wsclient_state, {ws_pid, peername, proto_state, keepalive,
-                         enable_stats, force_gc_count}).
-
-%% recv_oct
-%% Number of bytes received by the socket.
-
-%% recv_cnt
-%% Number of packets received by the socket.
-
--define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
+%% websocket callbacks
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+-export([terminate/3]).
+
+-record(state, {
+          request,
+          options,
+          peername,
+          sockname,
+          proto_state,
+          parser_state,
+          keepalive,
+          enable_stats,
+          stats_timer,
+          idle_timeout,
+          shutdown_reason
+         }).
+
+-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
+
+-define(INFO_KEYS, [peername, sockname]).
 
 -define(WSLOG(Level, Format, Args, State),
-              emqx_logger:Level("WsClient(~s): " ++ Format,
-                                [esockd_net:format(State#wsclient_state.peername) | Args])).
-
-%% @doc Start WebSocket Client.
-start_link(Env, WsPid, Req) ->
-    gen_server:start_link(?MODULE, [Env, WsPid, Req],
-                          [[{hibernate_after, 10000}]]).
-
-info(CPid) ->
-    gen_server:call(CPid, info).
-
-stats(CPid) ->
-    gen_server:call(CPid, stats).
-
-kick(CPid) ->
-    gen_server:call(CPid, kick).
-
-subscribe(CPid, TopicTable) ->
-    CPid ! {subscribe, TopicTable}.
-
-unsubscribe(CPid, Topics) ->
-    CPid ! {unsubscribe, Topics}.
-
-session(CPid) ->
-    gen_server:call(CPid, session).
-
-clean_acl_cache(CPid, Topic) ->
-    gen_server:call(CPid, {clean_acl_cache, Topic}).
+        lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+info(WSPid) ->
+    call(WSPid, info).
+
+stats(WSPid) ->
+    call(WSPid, stats).
+
+kick(WSPid) ->
+    call(WSPid, kick).
+
+session(WSPid) ->
+    call(WSPid, session).
+
+call(WSPid, Req) ->
+    Mref = erlang:monitor(process, WSPid),
+    WSPid ! {call, {self(), Mref}, Req},
+    receive
+        {Mref, Reply} ->
+            erlang:demonitor(Mref, [flush]),
+            Reply;
+        {'DOWN', Mref, _, _, Reason} ->
+            exit(Reason)
+    after 5000 ->
+        erlang:demonitor(Mref, [flush]),
+        exit(timeout)
+    end.
 
-%%--------------------------------------------------------------------
-%% gen_server Callbacks
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% WebSocket callbacks
+%%------------------------------------------------------------------------------
+
+init(Req, Opts) ->
+    io:format("Opts: ~p~n", [Opts]),
+    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
+        undefined ->
+            {cowboy_websocket, Req, #state{}};
+        Subprotocols ->
+            case lists:member(<<"mqtt">>, Subprotocols) of
+                true ->
+                    Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req),
+                    {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}};
+                false ->
+                    {ok, cowboy_req:reply(400, Req), #state{}}
+            end
+    end.
 
-init([Options, WsPid, Req]) ->
-    init_stas(),
-    process_flag(trap_exit, true),
-    true = link(WsPid),
+websocket_init(#state{request = Req, options = Options}) ->
     Peername = cowboy_req:peer(Req),
-    Headers  = cowboy_req:headers(Req),
     Sockname = cowboy_req:sock(Req),
     Peercert = cowboy_req:cert(Req),
-    Zone     = proplists:get_value(zone, Options),
-    ProtoState = emqx_protocol:init(#{zone     => Zone,
-                                      peername => Peername,
+    ProtoState = emqx_protocol:init(#{peername => Peername,
                                       sockname => Sockname,
                                       peercert => Peercert,
-                                      sendfun  => send_fun(WsPid)},
-                                    [{ws_initial_headers, Headers} | Options]),
-    IdleTimeout = get_value(client_idle_timeout, Options, 30000),
-    EnableStats = get_value(client_enable_stats, Options, false),
-    ForceGcCount = emqx_gc:conn_max_gc_count(),
-    {ok, #wsclient_state{ws_pid         = WsPid,
-                         peername       = Peername,
-                         proto_state    = ProtoState,
-                         enable_stats   = EnableStats,
-                         force_gc_count = ForceGcCount}, IdleTimeout}.
-
-handle_call(info, From, State = #wsclient_state{peername    = Peername,
-                                                proto_state = ProtoState}) ->
-    Info = [{websocket, true}, {peername, Peername} | emqx_protocol:info(ProtoState)],
-    {reply, Stats, _, _} = handle_call(stats, From, State),
-    reply(lists:append(Info, Stats), State);
-
-handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) ->
-    reply(lists:append([emqx_misc:proc_stats(),
-                        wsock_stats(),
-                        emqx_protocol:stats(ProtoState)]), State);
-
-handle_call(kick, _From, State) ->
-    {stop, {shutdown, kick}, ok, State};
-
-handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
-    reply(emqx_protocol:session(ProtoState), State);
-
-handle_call({clean_acl_cache, Topic}, _From, State) ->
-    erase({acl, publish, Topic}),
-    reply(ok, State);
-
-handle_call(Req, _From, State) ->
-    ?WSLOG(error, "Unexpected request: ~p", [Req], State),
-    reply({error, unexpected_request}, State).
-
-handle_cast({received, Packet, BinSize}, State = #wsclient_state{proto_state = ProtoState}) ->
-    put(recv_oct, get(recv_oct) + BinSize),
-    put(recv_cnt, get(recv_cnt) + 1),
-    emqx_metrics:received(Packet),
-    case emqx_protocol:received(Packet, ProtoState) of
-        {ok, ProtoState1} ->
-            {noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate};
-        {error, Error} ->
-            ?WSLOG(error, "Protocol error - ~p", [Error], State),
-            shutdown(Error, State);
-        {error, Error, ProtoState1} ->
-            shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
-        {stop, Reason, ProtoState1} ->
-            stop(Reason, State#wsclient_state{proto_state = ProtoState1})
-    end;
-
-handle_cast(Msg, State) ->
-    ?WSLOG(error, "unexpected msg: ~p", [Msg], State),
-    {noreply, State}.
-
-handle_info(SubReq ={subscribe, _TopicTable}, State) ->
-    with_proto(
-      fun(ProtoState) ->
-          emqx_protocol:process(SubReq, ProtoState)
-      end, State);
-
-handle_info(UnsubReq = {unsubscribe, _Topics}, State) ->
-    with_proto(
-      fun(ProtoState) ->
-          emqx_protocol:process(UnsubReq, ProtoState)
-      end, State);
-
-handle_info({deliver, PubOrAck}, State) ->
-    with_proto(
-      fun(ProtoState) ->
-          emqx_protocol:deliver(PubOrAck, ProtoState)
-      end, gc(State));
+                                      sendfun  => send_fun(self())}, Options),
+    ParserState = emqx_protocol:parser(ProtoState),
+    Zone = proplists:get_value(zone, Options),
+    EnableStats = emqx_zone:env(Zone, enable_stats, true),
+    IdleTimout = emqx_zone:env(Zone, idle_timeout, 30000),
+    lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
+    {ok, #state{peername     = Peername,
+                sockname     = Sockname,
+                parser_state = ParserState,
+                proto_state  = ProtoState,
+                enable_stats = EnableStats,
+                idle_timeout = IdleTimout}}.
 
-handle_info(emit_stats, State) ->
-    {noreply, emit_stats(State), hibernate};
+send_fun(WsPid) ->
+    fun(Data) ->
+        BinSize = iolist_size(Data),
+        emqx_metrics:inc('bytes/sent', BinSize),
+        put(send_oct, get(send_oct) + BinSize),
+        put(send_cnt, get(send_cnt) + 1),
+        WsPid ! {binary, iolist_to_binary(Data)}
+    end.
 
-handle_info(timeout, State) ->
-    shutdown(idle_timeout, State);
+stat_fun() ->
+    fun() -> {ok, get(recv_oct)} end.
+
+websocket_handle({binary, <<>>}, State) ->
+    {ok, State};
+websocket_handle({binary, [<<>>]}, State) ->
+    {ok, State};
+websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
+                                                proto_state  = ProtoState}) ->
+    BinSize = iolist_size(Data),
+    put(recv_oct, get(recv_oct) + BinSize),
+    ?WSLOG(debug, "RECV ~p", [Data], State),
+    emqx_metrics:inc('bytes/received', BinSize),
+    case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
+        {more, NewParserState} ->
+            {ok, State#state{parser_state = NewParserState}};
+        {ok, Packet, Rest} ->
+            emqx_metrics:received(Packet),
+            put(recv_cnt, get(recv_cnt) + 1),
+            case emqx_protocol:received(Packet, ProtoState) of
+                {ok, ProtoState1} ->
+                    websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
+                {error, Error} ->
+                    ?WSLOG(error, "Protocol error - ~p", [Error], State),
+                    {stop, State};
+                {error, Error, ProtoState1} ->
+                    shutdown(Error, State#state{proto_state = ProtoState1});
+                {stop, Reason, ProtoState1} ->
+                    shutdown(Reason, State#state{proto_state = ProtoState1})
+            end;
+        {error, Error} ->
+            ?WSLOG(error, "Frame error: ~p", [Error], State),
+            {stop, State};
+        {'EXIT', Reason} ->
+            ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State),
+            {stop, State}
+    end.
 
-handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
-    ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
-    shutdown(conflict, State);
+websocket_info({call, From, info}, State = #state{peername    = Peername,
+                                                  sockname    = Sockname,
+                                                  proto_state = ProtoState}) ->
+    ProtoInfo = emqx_protocol:info(ProtoState),
+    ConnInfo = [{socktype, websocket}, {conn_state, running},
+                {peername, Peername}, {sockname, Sockname}],
+    gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])),
+    {ok, State};
+
+websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) ->
+    Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]),
+    gen_server:reply(From, Stats),
+    {ok, State};
+
+websocket_info({call, From, kick}, State) ->
+    gen_server:reply(From, ok),
+    shutdown(kick, State);
+
+websocket_info({call, From, session}, State = #state{proto_state = ProtoState}) ->
+    gen_server:reply(From, emqx_protocol:session(ProtoState)),
+    {ok, State};
+
+websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
+    case emqx_protocol:deliver(PubOrAck, ProtoState) of
+        {ok, ProtoState1} ->
+            {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})};
+        {error, Reason} ->
+            shutdown(Reason, State);
+        {error, Reason, ProtoState1} ->
+            shutdown(Reason, State#state{proto_state = ProtoState1})
+    end;
 
-handle_info({shutdown, Reason}, State) ->
-    shutdown(Reason, State);
+websocket_info(emit_stats, State = #state{proto_state = ProtoState}) ->
+    Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(),
+                          emqx_protocol:stats(ProtoState)]),
+    emqx_cm:set_client_stats(emqx_protocol:clientid(ProtoState), Stats),
+    {ok, State#state{stats_timer = undefined}, hibernate};
 
-handle_info({keepalive, start, Interval}, State) ->
+websocket_info({keepalive, start, Interval}, State) ->
     ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
     case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
         {ok, KeepAlive} ->
-            {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
+            {ok, State#state{keepalive = KeepAlive}};
         {error, Error} ->
             ?WSLOG(warning, "Keepalive error - ~p", [Error], State),
             shutdown(Error, State)
     end;
 
-handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive}) ->
+websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
     case emqx_keepalive:check(KeepAlive) of
         {ok, KeepAlive1} ->
-            {noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate};
+            {ok, State#state{keepalive = KeepAlive1}};
         {error, timeout} ->
             ?WSLOG(debug, "Keepalive Timeout!", [], State),
             shutdown(keepalive_timeout, State);
@@ -206,90 +227,46 @@ handle_info({keepalive, check}, State = #wsclient_state{keepalive = KeepAlive})
             shutdown(keepalive_error, State)
     end;
 
-handle_info({'EXIT', WsPid, normal}, State = #wsclient_state{ws_pid = WsPid}) ->
-    stop(normal, State);
+websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
+    ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
+    shutdown(conflict, State);
+
+websocket_info({binary, Data}, State) ->
+    {reply, {binary, Data}, State};
 
-handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
-    ?WSLOG(error, "shutdown: ~p",[Reason], State),
+websocket_info({shutdown, Reason}, State) ->
     shutdown(Reason, State);
 
-%% The session process exited unexpectedly.
-handle_info({'EXIT', Pid, Reason}, State = #wsclient_state{proto_state = ProtoState}) ->
-    case emqx_protocol:session(ProtoState) of
-        Pid -> stop(Reason, State);
-        _   -> ?WSLOG(error, "Unexpected EXIT: ~p, Reason: ~p", [Pid, Reason], State),
-               {noreply, State, hibernate}
-    end;
-
-handle_info(Info, State) ->
-    ?WSLOG(error, "Unexpected Info: ~p", [Info], State),
-    {noreply, State, hibernate}.
+websocket_info(Info, State) ->
+    ?WSLOG(error, "unexpected info: ~p", [Info], State),
+    {ok, State}.
 
-terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
-    emqx_keepalive:cancel(KeepAlive),
+terminate(SockError, _Req, #state{keepalive       = Keepalive,
+                                  proto_state     = ProtoState,
+                                  shutdown_reason = Reason}) ->
+    emqx_keepalive:cancel(Keepalive),
+    io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]),
     case Reason of
-        {shutdown, Error} ->
-            emqx_protocol:shutdown(Error, ProtoState);
+        undefined ->
+            ok;
+            %%emqx_protocol:shutdown(SockError, ProtoState);
         _ ->
-            emqx_protocol:shutdown(Reason, ProtoState)
-    end.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-send_fun(WsPid) ->
-    fun(Data) ->
-        BinSize = iolist_size(Data),
-        emqx_metrics:inc('bytes/sent', BinSize),
-        put(send_oct, get(send_oct) + BinSize),
-        put(send_cnt, get(send_cnt) + 1),
-        WsPid ! {binary, iolist_to_binary(Data)}
-    end.
-
-stat_fun() ->
-    fun() ->
-        {ok, get(recv_oct)}
+            ok%%emqx_protocol:shutdown(Reason, ProtoState)
     end.
 
-emit_stats(State = #wsclient_state{proto_state = ProtoState}) ->
-    emit_stats(emqx_protocol:clientid(ProtoState), State).
+reset_parser(State = #state{proto_state = ProtoState}) ->
+    State#state{parser_state = emqx_protocol:parser(ProtoState)}.
 
-emit_stats(_ClientId, State = #wsclient_state{enable_stats = false}) ->
-    State;
-emit_stats(undefined, State) ->
-    State;
-emit_stats(ClientId, State) ->
-    {reply, Stats, _, _} = handle_call(stats, undefined, State),
-    emqx_cm:set_client_stats(ClientId, Stats),
+ensure_stats_timer(State = #state{enable_stats = true,
+                                  stats_timer  = undefined,
+                                  idle_timeout = Timeout}) ->
+    State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)};
+ensure_stats_timer(State) ->
     State.
 
-wsock_stats() ->
-    [{Key, get(Key)}|| Key <- ?SOCK_STATS].
-
-with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
-    {ok, ProtoState1} = Fun(ProtoState),
-    {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}.
-
-reply(Reply, State) ->
-    {reply, Reply, State, hibernate}.
-
 shutdown(Reason, State) ->
-    stop({shutdown, Reason}, State).
-
-stop(Reason, State) ->
-    {stop, Reason, State}.
+    {stop, State#state{shutdown_reason = Reason}}.
 
-gc(State) ->
-    Cb = fun() -> emit_stats(State) end,
-    emqx_gc:maybe_force_gc(#wsclient_state.force_gc_count, State, Cb).
-
-init_stas() ->
-    put(recv_oct, 0),
-    put(recv_cnt, 0),
-    put(send_oct, 0),
-    put(send_cnt, 0).
+wsock_stats() ->
+    [{Key, get(Key)} || Key <- ?SOCK_STATS].
 

+ 0 - 44
src/emqx_ws_connection_sup.erl

@@ -1,44 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_ws_connection_sup).
-
--behavior(supervisor).
-
--export([start_link/0, start_connection/2]).
-
--export([init/1]).
-
--spec(start_link() -> {ok, pid()}).
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-%% @doc Start a MQTT/WebSocket Connection.
--spec(start_connection(pid(), cowboy_req:req()) -> {ok, pid()}).
-start_connection(WsPid, Req) ->
-    supervisor:start_child(?MODULE, [WsPid, Req]).
-
-%%--------------------------------------------------------------------
-%% Supervisor callbacks
-%%--------------------------------------------------------------------
-
-init([]) ->
-    %%TODO: Cannot upgrade the environments, Use zone?
-    Env = lists:append(emqx_config:get_env(client, []), emqx_config:get_env(protocol, [])),
-    {ok, {{simple_one_for_one, 0, 1},
-           [{ws_connection, {emqx_ws_connection, start_link, [Env]},
-             temporary, 5000, worker, [emqx_ws_connection]}]}}.
-

+ 17 - 9
src/emqx_zone.erl

@@ -17,24 +17,32 @@
 -behaviour(gen_server).
 
 -export([start_link/0]).
--export([get_env/2, get_env/3]).
+
+-export([env/2, env/3]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
 -record(state, {timer}).
+
 -define(TAB, ?MODULE).
--define(SERVER, ?MODULE).
 
 start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-get_env(Zone, Par) ->
-    get_env(Zone, Par, undefined).
+env(undefined, Par) ->
+    emqx_config:get_env(Par);
+env(Zone, Par) ->
+    env(Zone, Par, undefined).
 
-get_env(Zone, Par, Def) ->
-    try ets:lookup_element(?TAB, {Zone, Par}, 2) catch error:badarg -> Def end.
+env(undefined, Par, Default) ->
+    emqx_config:get_env(Par, Default);
+env(Zone, Par, Default) ->
+    try ets:lookup_element(?TAB, {Zone, Par}, 2)
+    catch error:badarg ->
+        emqx_config:get_env(Par, Default)
+    end.
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
@@ -54,8 +62,8 @@ handle_cast(Msg, State) ->
 
 handle_info(reload, State) ->
     lists:foreach(
-      fun({Zone, Options}) ->
-          [ets:insert(?TAB, {{Zone, Par}, Val}) || {Par, Val} <- Options]
+      fun({Zone, Opts}) ->
+          [ets:insert(?TAB, {{Zone, Par}, Val}) || {Par, Val} <- Opts]
       end, emqx_config:get_env(zones, [])),
     {noreply, ensure_reload_timer(State), hibernate};