Просмотр исходного кода

Merge pull request #1197 from emqtt/develop

Add http management APIs
Feng Lee 8 лет назад
Родитель
Сommit
36a7b74656

+ 1 - 0
.gitignore

@@ -31,3 +31,4 @@ data/
 _build
 .rebar3
 rebar3.crashdump
+.DS_Store

+ 4 - 2
Makefile

@@ -2,18 +2,20 @@ PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
 PROJECT_VERSION = 2.3
 
-DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt
+DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
 
 dep_goldrush     = git https://github.com/basho/goldrush 0.1.9
 dep_gproc        = git https://github.com/uwiger/gproc
 dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
 dep_lager        = git https://github.com/basho/lager master
 dep_esockd       = git https://github.com/emqtt/esockd master
-dep_ekka         = git https://github.com/emqtt/ekka emq24
+dep_ekka         = git https://github.com/emqtt/ekka develop
 dep_mochiweb     = git https://github.com/emqtt/mochiweb master
 dep_pbkdf2       = git https://github.com/emqtt/pbkdf2 2.0.1
 dep_lager_syslog = git https://github.com/basho/lager_syslog
 dep_bcrypt       = git https://github.com/smarkets/erlang-bcrypt master
+dep_clique       = git https://github.com/emqtt/clique
+dep_jsx           = git https://github.com/talentdeficit/jsx
 
 ERLC_OPTS += +debug_info
 ERLC_OPTS += +'{parse_transform, lager_transform}'

+ 3 - 0
etc/emq.conf

@@ -177,6 +177,9 @@ mqtt.max_packet_size = 64KB
 ## Check Websocket Protocol Header. Enum: on, off
 mqtt.websocket_protocol_header = on
 
+## The Keepalive timeout: Keepalive * backoff * 2
+mqtt.keepalive_backoff = 1.25
+
 ##--------------------------------------------------------------------
 ## MQTT Connection
 ##--------------------------------------------------------------------

+ 14 - 0
include/emqttd_internal.hrl

@@ -59,3 +59,17 @@
 
 -define(FULLSWEEP_OPTS, [{fullsweep_after, 10}]).
 
+-define(SUCCESS, 0).  %% Success
+-define(ERROR1, 101). %% badrpc
+-define(ERROR2, 102). %% Unknown error
+-define(ERROR3, 103). %% Username or password error
+-define(ERROR4, 104). %% Empty username or password
+-define(ERROR5, 105). %% User does not exist
+-define(ERROR6, 106). %% Admin can not be deleted
+-define(ERROR7, 107). %% Missing request parameter
+-define(ERROR8, 108). %% Request parameter type error
+-define(ERROR9, 109). %% Request parameter is not a json
+-define(ERROR10, 110). %% Plugin has been loaded
+-define(ERROR11, 111). %% Plugin has been loaded
+-define(ERROR12, 112). %% Client not online
+

+ 8 - 1
priv/emq.schema

@@ -474,9 +474,16 @@ end}.
   {datatype, bytesize}
 ]}.
 
+%% @doc Keepalive backoff
+{mapping, "mqtt.keepalive_backoff", "emqttd.protocol", [
+  {default, 1.25},
+  {datatype, float}
+]}.
+
 {translation, "emqttd.protocol", fun(Conf) ->
   [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
-   {max_packet_size,  cuttlefish:conf_get("mqtt.max_packet_size", Conf)}]
+   {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
+   {keepalive_backoff, cuttlefish:conf_get("mqtt.keepalive_backoff", Conf)}]
 end}.
 
 {mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [

+ 1 - 1
src/emqttd_access_control.erl

@@ -48,7 +48,7 @@ start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
 %% @doc Authenticate MQTT Client.
--spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {error, any()}).
+-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, any()}).
 auth(Client, Password) when is_record(Client, mqtt_client) ->
     auth(Client, Password, lookup_mods(auth)).
 auth(_Client, _Password, []) ->

+ 1 - 1
src/emqttd_app.erl

@@ -184,7 +184,7 @@ start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
     mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []});
 
 start_listener({Proto, ListenOn, Opts}) when Proto == api ->
-    mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}).
+    mochiweb:start_http('mqtt:api', ListenOn, Opts, emqttd_http:http_handler()).
 
 start_listener(Proto, ListenOn, Opts) ->
     Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])),

+ 9 - 1
src/emqttd_broker.erl

@@ -31,7 +31,7 @@
 -export([subscribe/1, notify/2]).
 
 %% Broker API
--export([version/0, uptime/0, datetime/0, sysdescr/0]).
+-export([version/0, uptime/0, datetime/0, sysdescr/0, info/0]).
 
 %% Tick API
 -export([start_tick/1, stop_tick/1]).
@@ -75,6 +75,14 @@ subscribe(EventType) ->
 notify(EventType, Event) ->
      gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}).
 
+%% @doc Get broker info
+-spec(info() -> list(tuple())).
+info() ->
+    [{version,  version()},
+     {sysdescr, sysdescr()},
+     {uptime,   uptime()},
+     {datetime, datetime()}].
+
 %% @doc Get broker version
 -spec(version() -> string()).
 version() ->

+ 2 - 1
src/emqttd_cli.erl

@@ -48,7 +48,8 @@
 
 load() ->
     Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
-    [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].
+    [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds],
+    emqttd_cli_config:register_config().
 
 is_cmd(Fun) ->
     not lists:member(Fun, [init, load, module_info]).

+ 244 - 0
src/emqttd_cli_config.erl

@@ -0,0 +1,244 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module (emqttd_cli_config).
+
+-export ([register_config_cli/0, register_config/0, run/1]).
+
+-define(APP, emqttd).
+
+register_config() ->
+    application:start(clique),
+    F = fun() -> ekka_mnesia:running_nodes() end,
+    clique:register_node_finder(F),
+    register_config_cli().
+
+run(Cmd) ->
+    clique:run(Cmd).
+
+register_config_cli() ->
+    ok = clique_config:load_schema([code:priv_dir(?APP)], ?APP),
+    register_protocol_formatter(),
+    register_client_formatter(),
+    register_session_formatter(),
+    register_queue_formatter(),
+    register_lager_formatter(),
+
+    register_auth_config(),
+    register_protocol_config(),
+    register_connection_config(),
+    register_client_config(),
+    register_session_config(),
+    register_queue_config(),
+    register_broker_config(),
+    register_lager_config().
+
+%%--------------------------------------------------------------------
+%% Auth/Acl
+%%--------------------------------------------------------------------
+
+register_auth_config() ->
+    ConfigKeys = ["mqtt.allow_anonymous",
+                  "mqtt.acl_nomatch",
+                  "mqtt.acl_file",
+                  "mqtt.cache_acl"],
+    [clique:register_config(Key , fun auth_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+auth_config_callback([_, KeyStr], Value) ->
+    application:set_env(?APP, l2a(KeyStr), Value), " successfully\n".
+    
+%%--------------------------------------------------------------------
+%% MQTT Protocol
+%%--------------------------------------------------------------------
+
+register_protocol_formatter() ->
+    ConfigKeys = ["max_clientid_len", 
+                  "max_packet_size",
+                  "websocket_protocol_header",
+                  "keepalive_backoff"],
+    [clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys].
+
+protocol_formatter_callback([_, Key], Params) ->
+    proplists:get_value(l2a(Key), Params).
+
+register_protocol_config() ->
+    ConfigKeys = ["mqtt.max_clientid_len",
+                  "mqtt.max_packet_size",
+                  "mqtt.websocket_protocol_header",
+                  "mqtt.keepalive_backoff"],
+    [clique:register_config(Key , fun protocol_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+protocol_config_callback([_AppStr, KeyStr], Value) ->
+    protocol_config_callback(protocol, l2a(KeyStr), Value).
+protocol_config_callback(App, Key, Value) ->
+    {ok, Env} = emqttd:env(App),
+    application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
+    " successfully\n".
+
+%%--------------------------------------------------------------------
+%% MQTT Connection
+%%--------------------------------------------------------------------
+
+register_connection_config() ->
+    ConfigKeys = ["mqtt.conn.force_gc_count"],
+    [clique:register_config(Key , fun connection_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+connection_config_callback([_, KeyStr0, KeyStr1], Value) ->
+    KeyStr = lists:concat([KeyStr0, "_", KeyStr1]),
+    application:set_env(?APP, l2a(KeyStr), Value),
+    " successfully\n".
+
+%%--------------------------------------------------------------------
+%% MQTT Client
+%%--------------------------------------------------------------------
+
+register_client_formatter() ->
+    ConfigKeys = ["max_publish_rate", 
+                  "idle_timeout",
+                  "enable_stats"],
+    [clique:register_formatter(["mqtt", "client", Key], fun client_formatter_callback/2) || Key <- ConfigKeys].
+
+client_formatter_callback([_, _, Key], Params) ->
+    proplists:get_value(list_to_atom(Key), Params).
+
+register_client_config() ->
+    ConfigKeys = ["mqtt.client.max_publish_rate",
+                  "mqtt.client.idle_timeout",
+                  "mqtt.client.enable_stats"],
+    [clique:register_config(Key , fun client_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+client_config_callback([_, AppStr, KeyStr], Value) ->
+    client_config_callback(l2a(AppStr), l2a(KeyStr), Value).
+client_config_callback(App, Key, Value) ->
+    {ok, Env} = emqttd:env(App),
+    application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
+    " successfully\n".
+
+%%--------------------------------------------------------------------
+%% session
+%%--------------------------------------------------------------------
+
+register_session_formatter() ->
+    ConfigKeys = ["max_subscriptions", 
+                  "upgrade_qos",
+                  "max_inflight",
+                  "retry_interval",
+                  "max_awaiting_rel",
+                  "await_rel_timeout",
+                  "enable_stats",
+                  "expiry_interval",
+                  "ignore_loop_deliver"],
+    [clique:register_formatter(["mqtt", "session", Key], fun session_formatter_callback/2) || Key <- ConfigKeys].
+
+session_formatter_callback([_, _, Key], Params) ->
+    proplists:get_value(list_to_atom(Key), Params).
+
+register_session_config() ->
+    ConfigKeys = ["mqtt.session.max_subscriptions",
+                  "mqtt.session.upgrade_qos",
+                  "mqtt.session.max_inflight",
+                  "mqtt.session.retry_interval",
+                  "mqtt.session.max_awaiting_rel",
+                  "mqtt.session.await_rel_timeout",
+                  "mqtt.session.enable_stats",
+                  "mqtt.session.expiry_interval",
+                  "mqtt.session.ignore_loop_deliver"],
+    [clique:register_config(Key , fun session_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+session_config_callback([_, AppStr, KeyStr], Value) ->
+    session_config_callback(l2a(AppStr), l2a(KeyStr), Value).
+session_config_callback(App, Key, Value) ->
+    {ok, Env} = emqttd:env(App),
+    application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
+    " successfully\n".
+
+l2a(List) -> list_to_atom(List).
+
+%%--------------------------------------------------------------------
+%% MQTT MQueue
+%%--------------------------------------------------------------------
+
+register_queue_formatter() ->
+    ConfigKeys = ["type", 
+                  "priority",
+                  "max_length",
+                  "low_watermark",
+                  "high_watermark",
+                  "store_qos0"],
+    [clique:register_formatter(["mqtt", "mqueue", Key], fun queue_formatter_callback/2) || Key <- ConfigKeys].
+
+queue_formatter_callback([_, _, Key], Params) ->
+    proplists:get_value(list_to_atom(Key), Params).
+
+register_queue_config() ->
+    ConfigKeys = ["mqtt.mqueue.type",
+                  "mqtt.mqueue.priority",
+                  "mqtt.mqueue.max_length",
+                  "mqtt.mqueue.low_watermark",
+                  "mqtt.mqueue.high_watermark",
+                  "mqtt.mqueue.store_qos0"],
+    [clique:register_config(Key , fun queue_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+queue_config_callback([_, AppStr, KeyStr], Value) ->
+    queue_config_callback(l2a(AppStr), l2a(KeyStr), Value).
+queue_config_callback(App, Key, Value) ->
+    {ok, Env} = emqttd:env(App),
+    application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})),
+    " successfully\n".
+
+%%--------------------------------------------------------------------
+%% MQTT Broker
+%%--------------------------------------------------------------------
+
+register_broker_config() ->
+    ConfigKeys = ["mqtt.broker.sys_interval"],
+    [clique:register_config(Key , fun broker_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+broker_config_callback([_, KeyStr0, KeyStr1], Value) ->
+    KeyStr = lists:concat([KeyStr0, "_", KeyStr1]),
+    application:set_env(?APP, l2a(KeyStr), Value),
+    " successfully\n".
+
+%%--------------------------------------------------------------------
+%% MQTT Lager
+%%--------------------------------------------------------------------
+
+register_lager_formatter() ->
+    ConfigKeys = ["level"],
+    [clique:register_formatter(["log", "console", Key], fun lager_formatter_callback/2) || Key <- ConfigKeys].
+
+lager_formatter_callback(_, Params) ->
+    proplists:get_value(lager_console_backend, Params).
+
+register_lager_config() ->
+    ConfigKeys = ["log.console.level"],
+    [clique:register_config(Key , fun lager_config_callback/2) || Key <- ConfigKeys],
+    ok = register_config_whitelist(ConfigKeys).
+
+lager_config_callback(_, Value) ->
+    lager:set_loglevel(lager_console_backend, Value),
+    " successfully\n".
+
+register_config_whitelist(ConfigKeys) ->
+  clique:register_config_whitelist(ConfigKeys, ?APP).
+

+ 1 - 1
src/emqttd_client.erl

@@ -66,7 +66,7 @@
                         [esockd_net:format(State#client_state.peername) | Args])).
 
 start_link(Conn, Env) ->
-    {ok, proc_lib:spawn_opt(?MODULE, init, [[Conn, Env]], [link | ?FULLSWEEP_OPTS])}.
+    {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
 
 info(CPid) ->
     gen_server2:call(CPid, info).

+ 59 - 0
src/emqttd_config.erl

@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Hot Configuration
+
+-module(emqttd_config).
+
+-export([read/1, dump/2, reload/1, get/2, get/3, set/3]).
+
+-type(env() :: {atom(), term()}).
+
+%% @doc Read the configuration of an application.
+-spec(read(atom()) -> {ok, list(env())} | {error, term()}).
+read(_App) ->
+    %% TODO
+    %% 1. Read the app.conf from etc folder
+    %% 2. Cuttlefish to read the conf
+    %% 3. Return the terms and schema
+    {error, unsupported}.
+
+%% @doc Reload configuration of an application.
+-spec(reload(atom()) -> ok | {error, term()}).
+reload(_App) ->
+    %% TODO
+    %% 1. Read the app.conf from etc folder
+    %% 2. Cuttlefish to generate config terms.
+    %% 3. set/3 to apply the config
+    ok.
+
+-spec(dump(atom(), list(env())) -> ok | {error, term()}).
+dump(_App, _Terms) ->
+    %% TODO
+    ok.
+
+-spec(set(atom(), atom(), term()) -> ok).
+set(App, Par, Val) ->
+    application:set_env(App, Par, Val).
+
+-spec(get(atom(), atom()) -> undefined | {ok, term()}).
+get(App, Par) ->
+    application:get_env(App, Par).
+
+-spec(get(atom(), atom(), atom()) -> term()).
+get(App, Par, Def) ->
+    application:get_env(App, Par, Def).
+

+ 6 - 0
src/emqttd_ctl.erl

@@ -68,6 +68,12 @@ run([]) -> usage(), ok;
 
 run(["help"]) -> usage(), ok;
 
+run(["set" | _] = CmdS) ->
+    emqttd_cli_config:run(["config" | CmdS]), ok;
+
+run(["show" | _] = CmdS) ->
+    emqttd_cli_config:run(["config" | CmdS]), ok;
+
 run([CmdS|Args]) ->
     case lookup(list_to_atom(CmdS)) of
         [{Mod, Fun}] ->

+ 180 - 103
src/emqttd_http.erl

@@ -26,15 +26,41 @@
 
 -import(proplists, [get_value/2, get_value/3]).
 
--export([handle_request/1]).
+-export([http_handler/0, handle_request/2, http_api/0]).
 
-handle_request(Req) ->
-    handle_request(Req:get(method), Req:get(path), Req).
+-include("emqttd_internal.hrl").
 
-handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' ->
+-record(state, {dispatch}).
+
+http_handler() ->
+    APIs = http_api(),
+    State = #state{dispatch = dispatcher(APIs)},
+    {?MODULE, handle_request, [State]}.
+
+http_api() ->
+    Attr = emqttd_rest_api:module_info(attributes),
+    [{Regexp, Method, Function, Args} || {http_api, [{Regexp, Method, Function, Args}]} <- Attr].
+
+%%--------------------------------------------------------------------
+%% Handle HTTP Request
+%%--------------------------------------------------------------------
+handle_request(Req, State) ->
+    Path = Req:get(path),
+    case Path of
+        "/status" ->
+            handle_request("/status", Req, Req:get(method));
+        "/" ->
+            handle_request("/", Req, Req:get(method));
+        _ ->
+            if_authorized(Req, fun() -> handle_request(Path, Req, State) end)
+    end.
+
+handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) ->
+    Dispatch(Req, Url);
+
+handle_request("/status", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' ->
     {InternalStatus, _ProvidedStatus} = init:get_status(),
-    AppStatus =
-    case lists:keysearch(emqttd, 1, application:which_applications()) of
+    AppStatus = case lists:keysearch(emqttd, 1, application:which_applications()) of
         false         -> not_running;
         {value, _Val} -> running
     end,
@@ -42,111 +68,162 @@ handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET'
                             [node(), InternalStatus, AppStatus]),
     Req:ok({"text/plain", iolist_to_binary(Status)});
 
-%%--------------------------------------------------------------------
-%% HTTP Publish API
-%%--------------------------------------------------------------------
+handle_request("/", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' ->
+    respond(Req, 200, api_list());
+
+handle_request(_, Req, #state{}) ->
+    respond(Req, 404, []).
+
+dispatcher(APIs) ->
+    fun(Req, Url) ->
+        Method = Req:get(method),
+        case filter(APIs, Url, Method) of
+            [{Regexp, _Method, Function, FilterArgs}] ->
+                case params(Req) of
+                    {error, Error1} ->
+                        respond(Req, 200, Error1);
+                    Params ->
+                        case {check_params(Params, FilterArgs),
+                              check_params_type(Params, FilterArgs)} of
+                            {true, true} ->
+                                {match, [MatchList]} = re:run(Url, Regexp, [global, {capture, all_but_first, list}]),
+                                Args = lists:append([[Method, Params], MatchList]),
+                                lager:debug("Mod:~p, Fun:~p, Args:~p", [emqttd_rest_api, Function, Args]),
+                                case catch apply(emqttd_rest_api, Function, Args) of
+                                    {ok, Data} ->
+                                        respond(Req, 200, [{code, ?SUCCESS}, {result, Data}]);
+                                    {error, Error} ->
+                                        respond(Req, 200, Error);
+                                    {'EXIT', Reason} ->
+                                        lager:error("Execute API '~s' Error: ~p", [Url, Reason]),
+                                        respond(Req, 404, [])
+                                end;
+                            {false, _} ->
+                                respond(Req, 200, [{code, ?ERROR7}, {message, <<"params error">>}]);
+                            {_, false} ->
+                                respond(Req, 200, [{code, ?ERROR8}, {message, <<"params type error">>}])
+                        end
+                end;
+            _ ->
+                lager:error("No match Url:~p", [Url]),
+                respond(Req, 404, [])
+        end
+    end.
 
-handle_request('POST', "/mqtt/publish", Req) ->
+% %%--------------------------------------------------------------------
+% %% Basic Authorization
+% %%--------------------------------------------------------------------
+if_authorized(Req, Fun) ->
     case authorized(Req) of
-        true  -> http_publish(Req);
-        false -> Req:respond({401, [], <<"Unauthorized">>})
-    end;
-
-%%--------------------------------------------------------------------
-%% Get static files
-%%--------------------------------------------------------------------
-
-handle_request('GET', "/" ++ File, Req) ->
-    lager:info("HTTP GET File: ~s", [File]),
-    mochiweb_request:serve_file(File, docroot(), Req);
-
-handle_request(Method, Path, Req) ->
-    lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
-    Req:not_found().
-
-%%--------------------------------------------------------------------
-%% HTTP Publish
-%%--------------------------------------------------------------------
-
-http_publish(Req) ->
-    Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)],
-    lager:debug("HTTP Publish: ~p", [Params]),
-    Topics   = topics(Params),
-    ClientId = get_value(<<"client">>, Params, http),
-    Qos      = int(get_value(<<"qos">>, Params, "0")),
-    Retain   = bool(get_value(<<"retain">>, Params, "0")),
-    Payload  = iolist_to_binary(get_value(<<"message">>, Params)),
-    case {validate(qos, Qos), validate(topics, Topics)} of
-        {true, true} ->
-            lists:foreach(fun(Topic) ->
-                Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
-                emqttd:publish(Msg#mqtt_message{retain  = Retain})
-            end, Topics),
-            Req:ok({"text/plain", <<"OK">>});
-       {false, _} ->
-            Req:respond({400, [], <<"Bad QoS">>});
-        {_, false} ->
-            Req:respond({400, [], <<"Bad Topics">>})
+        true  -> Fun();
+        false -> respond(Req, 401,  [])
     end.
 
-topics(Params) ->
-    Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")],
-    [iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined].
-
-validate(qos, Qos) ->
-    (Qos >= ?QOS_0) and (Qos =< ?QOS_2);
-
-validate(topics, [Topic|Left]) ->
-    case validate(topic, Topic) of
-        true  -> validate(topics, Left);
-        false -> false
-    end;
-validate(topics, []) ->
-    true;
-
-validate(topic, Topic) ->
-    emqttd_topic:validate({name, Topic}).
-
-%%--------------------------------------------------------------------
-%% basic authorization
-%%--------------------------------------------------------------------
-
 authorized(Req) ->
-    Params = mochiweb_request:parse_post(Req),
-    ClientId = get_value("client", Params, http),
     case Req:get_header_value("Authorization") of
-    undefined ->
-        false;
-    "Basic " ++ BasicAuth ->
-        {Username, Password} = user_passwd(BasicAuth),
-        {ok, Peer} = Req:get(peername),
-        case emqttd_access_control:auth(#mqtt_client{client_id = ClientId, username = Username, peername = Peer}, Password) of
-            ok ->
-                true;
-            {ok, _IsSuper} -> 
-                true;
-            {error, Reason} ->
-                lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]),
-                false
-        end
+        undefined ->
+            false;
+        "Basic " ++ BasicAuth ->
+            {Username, Password} = user_passwd(BasicAuth),
+            {ok, Peer} = Req:get(peername),
+            Params = params(Req),
+            ClientId = get_value(<<"client">>, Params, http),
+            case emqttd_access_control:auth(#mqtt_client{client_id = ClientId,
+                                                         username = Username,
+                                                         peername = Peer}, Password) of
+                ok -> true;
+                {ok, _IsSuper} -> 
+                    true;
+                {error, Reason} ->
+                    lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]),
+                    false
+            end
     end.
 
 user_passwd(BasicAuth) ->
-    list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). 
-
-int(I) when is_integer(I)-> I;
-int(B) when is_binary(B)-> binary_to_integer(B);
-int(S) -> list_to_integer(S).
-
-bool(0)   -> false;
-bool(1)   -> true;
-bool("0") -> false;
-bool("1") -> true;
-bool(<<"0">>) -> false;
-bool(<<"1">>) -> true.
+    list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
+
+respond(Req, 401, Data) ->
+    Req:respond({401, [{"WWW-Authenticate", "Basic Realm=\"emqx control center\""}], Data});
+respond(Req, 404, Data) ->
+    Req:respond({404, [{"Content-Type", "text/plain"}], Data});
+respond(Req, 200, Data) ->
+    Req:respond({200, [{"Content-Type", "application/json"}], to_json(Data)});
+respond(Req, Code, Data) ->
+    Req:respond({Code, [{"Content-Type", "text/plain"}], Data}).
+
+filter(APIs, Url, Method) ->
+    lists:filter(fun({Regexp, Method1, _Function, _Args}) ->
+        case re:run(Url, Regexp, [global, {capture, all_but_first, list}]) of
+            {match, _} -> Method =:= Method1;
+            _ -> false
+        end
+    end, APIs).
+
+params(Req) ->
+    Method = Req:get(method),
+    case Method of
+        'GET' ->
+            mochiweb_request:parse_qs(Req);
+        _ ->
+            case Req:recv_body() of
+                <<>> -> [];
+                undefined -> [];
+                Body ->
+                    case jsx:is_json(Body) of
+                        true -> jsx:decode(Body);
+                        false ->
+                            lager:error("Body:~p", [Body]),
+                            {error, [{code, ?ERROR9}, {message, <<"Body not json">>}]}
+                    end
+            end
+    end.
 
-docroot() ->
-    {file, Here} = code:is_loaded(?MODULE),
-    Dir = filename:dirname(filename:dirname(Here)),
-    filename:join([Dir, "priv", "www"]).
+check_params(_Params, Args) when Args =:= [] ->
+    true;
+check_params(Params, Args)->
+    not lists:any(fun({Item, _Type}) -> undefined =:= proplists:get_value(Item, Params) end, Args).
 
+check_params_type(_Params, Args) when Args =:= [] ->
+    true;
+check_params_type(Params, Args) ->
+    not lists:any(fun({Item, Type}) ->
+        Val = proplists:get_value(Item, Params),
+        case Type of
+            int -> not is_integer(Val);
+            binary -> not is_binary(Val);
+            bool -> not is_boolean(Val)
+        end
+    end, Args).
+
+to_json([])   -> <<"[]">>;
+to_json(Data) -> iolist_to_binary(mochijson2:encode(Data)).
+
+api_list() ->
+    [{paths, [<<"api/v2/management/nodes">>,
+              <<"api/v2/management/nodes/{node_name}">>,
+              <<"api/v2/monitoring/nodes">>,
+              <<"api/v2/monitoring/nodes/{node_name}">>,
+              <<"api/v2/monitoring/listeners">>,
+              <<"api/v2/monitoring/listeners/{node_name}">>,
+              <<"api/v2/monitoring/metrics/">>,
+              <<"api/v2/monitoring/metrics/{node_name}">>,
+              <<"api/v2/monitoring/stats">>,
+              <<"api/v2/monitoring/stats/{node_name}">>,
+              <<"api/v2/nodes/{node_name}/clients">>,
+              <<"api/v2/nodes/{node_name}/clients/{clientid}">>,
+              <<"api/v2/clients/{clientid}">>,
+              <<"api/v2/kick_client/{clientid}">>,
+              <<"api/v2/nodes/{node_name}/sessions">>,
+              <<"api/v2/nodes/{node_name}/sessions/{clientid}">>,
+              <<"api/v2/sessions/{clientid}">>,
+              <<"api/v2/nodes/{node_name}/subscriptions">>,
+              <<"api/v2/nodes/{node_name}/subscriptions/{clientid}">>,
+              <<"api/v2/subscriptions/{clientid}">>,
+              <<"api/v2/routes">>,
+              <<"api/v2/routes/{topic}">>,
+              <<"api/v2/mqtt/publish">>,
+              <<"api/v2/mqtt/subscribe">>,
+              <<"api/v2/mqtt/unsubscribe">>,
+              <<"api/v2/nodes/{node_name}/plugins">>,
+              <<"api/v2/nodes/{node_name}/plugins/{plugin_name}">>]}].

+ 383 - 0
src/emqttd_mgmt.erl

@@ -0,0 +1,383 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_mgmt).
+
+-author("Feng Lee <feng@emqtt.io>").
+
+-include("emqttd.hrl").
+
+-include("emqttd_protocol.hrl").
+
+-include("emqttd_internal.hrl").
+
+-include_lib("stdlib/include/qlc.hrl").
+
+-import(proplists, [get_value/2]).
+
+-export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0,
+         plugins/0, plugins/1, listeners/0, listener/1, nodes_info/0, node_info/1]).
+
+-export([plugin_list/1, plugin_unload/2, plugin_load/2]).
+
+-export([client_list/4, session_list/4, route_list/3, subscription_list/4, alarm_list/0]).
+
+-export([client/1, session/1, route/1, subscription/1]).
+
+-export([query_table/4, lookup_table/3]).
+
+-export([publish/1, subscribe/1, unsubscribe/1]).
+
+-export([kick_client/1]).
+
+-define(KB, 1024).
+-define(MB, (1024*1024)).
+-define(GB, (1024*1024*1024)).
+
+-define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))).
+
+brokers() ->
+    [{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()].
+
+broker(Node) when Node =:= node() ->
+    emqttd_broker:info();
+broker(Node) ->
+    rpc_call(Node, broker, [Node]).
+
+metrics() ->
+    [{Node, metrics(Node)} || Node <- ekka_mnesia:running_nodes()].
+
+metrics(Node) when Node =:= node() ->
+    emqttd_metrics:all();
+metrics(Node)  ->
+    rpc_call(Node, metrics, [Node]).
+
+stats() ->
+    [{Node, stats(Node)} || Node <- ekka_mnesia:running_nodes()].
+
+stats(Node) when Node =:= node() ->
+    emqttd_stats:getstats();
+stats(Node) ->
+    rpc_call(Node, stats, [Node]).
+
+plugins() ->
+    [{Node, plugins(Node)} || Node <- ekka_mnesia:running_nodes()].
+
+plugins(Node) when Node =:= node() ->
+    emqttd_plugins:list(Node);
+plugins(Node) ->
+    rpc_call(Node, plugins, [Node]).
+
+listeners() ->
+    [{Node, listener(Node)} || Node <- ekka_mnesia:running_nodes()].
+
+listener(Node) when Node =:= node() ->
+    lists:map(fun({{Protocol, ListenOn}, Pid}) ->
+                Info = [{acceptors,      esockd:get_acceptors(Pid)},
+                        {max_clients,    esockd:get_max_clients(Pid)},
+                        {current_clients,esockd:get_current_clients(Pid)},
+                        {shutdown_count, esockd:get_shutdown_count(Pid)}],
+                {Protocol, ListenOn, Info}
+            end, esockd:listeners());
+
+listener(Node) ->
+    rpc_call(Node, listener, [Node]).
+
+nodes_info() ->
+    Running = mnesia:system_info(running_db_nodes),
+    Stopped = mnesia:system_info(db_nodes) -- Running,
+    DownNodes = lists:map(fun stop_node/1, Stopped),
+    [node_info(Node) || Node <- Running] ++ DownNodes.
+
+node_info(Node) when Node =:= node() ->
+    CpuInfo = [{K, list_to_binary(V)} || {K, V} <- emqttd_vm:loads()],
+    Memory  = emqttd_vm:get_memory(),
+    OtpRel  = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
+    [{name, node()},
+     {otp_release, list_to_binary(OtpRel)},
+     {memory_total, kmg(get_value(allocated, Memory))},
+     {memory_used,  kmg(get_value(used, Memory))},
+     {process_available, erlang:system_info(process_limit)},
+     {process_used, erlang:system_info(process_count)},
+     {max_fds, get_value(max_fds, erlang:system_info(check_io))},
+     {clients, ets:info(mqtt_client, size)},
+     {node_status, 'Running'} | CpuInfo];
+
+node_info(Node) ->
+    rpc_call(Node, node_info, [Node]).
+
+stop_node(Node) ->
+    [{name, Node}, {node_status, 'Stopped'}].
+%%--------------------------------------------------------
+%% plugins
+%%--------------------------------------------------------
+plugin_list(Node) when Node =:= node() ->
+    emqttd_plugins:list();
+plugin_list(Node) ->
+    rpc_call(Node, plugin_list, [Node]).
+
+plugin_load(Node, PluginName) when Node =:= node() ->
+    emqttd_plugins:load(PluginName);
+plugin_load(Node, PluginName) ->
+    rpc_call(Node, plugin_load, [Node, PluginName]).
+
+plugin_unload(Node, PluginName) when Node =:= node() ->
+    emqttd_plugins:unload(PluginName);
+plugin_unload(Node, PluginName) ->
+    rpc_call(Node, plugin_unload, [Node, PluginName]).
+
+%%--------------------------------------------------------
+%% client
+%%--------------------------------------------------------
+client_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
+    client_list(Key, PageNo, PageSize);
+client_list(Node, Key, PageNo, PageSize) ->
+    rpc_call(Node, client_list, [Node, Key, PageNo, PageSize]).
+
+client(ClientId) ->
+    lists:flatten([client_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
+
+%%--------------------------------------------------------
+%% session
+%%--------------------------------------------------------
+session_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
+    session_list(Key, PageNo, PageSize);
+session_list(Node, Key, PageNo, PageSize) ->
+    rpc_call(Node, session_list, [Node, Key, PageNo, PageSize]).
+
+session(ClientId) ->
+    lists:flatten([session_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
+
+%%--------------------------------------------------------
+%% subscription
+%%--------------------------------------------------------
+subscription_list(Node, Key, PageNo, PageSize) when Node =:= node() ->
+    subscription_list(Key, PageNo, PageSize);
+subscription_list(Node, Key, PageNo, PageSize) ->
+    rpc_call(Node, subscription_list, [Node, Key, PageNo, PageSize]).
+
+subscription(Key) ->
+    lists:flatten([subscription_list(Node, Key, 1, 20) || Node <- ekka_mnesia:running_nodes()]).
+
+%%--------------------------------------------------------
+%% Routes
+%%--------------------------------------------------------
+route(Key) -> route_list(Key, 1, 20).
+
+%%--------------------------------------------------------
+%% alarm
+%%--------------------------------------------------------
+alarm_list() ->
+    emqttd_alarm:get_alarms().
+
+query_table(Qh, PageNo, PageSize, TotalNum) ->
+    Cursor = qlc:cursor(Qh),
+    case PageNo > 1 of
+        true  -> qlc:next_answers(Cursor, (PageNo - 1) * PageSize);
+        false -> ok
+    end,
+    Rows = qlc:next_answers(Cursor, PageSize),
+    qlc:delete_cursor(Cursor),
+    [{totalNum, TotalNum},
+     {totalPage, total_page(TotalNum, PageSize)},
+     {result, Rows}].
+
+total_page(TotalNum, PageSize) ->
+    case TotalNum rem PageSize of
+        0 -> TotalNum div PageSize;
+        _ -> (TotalNum div PageSize) + 1
+    end.
+
+%%TODO: refactor later...
+lookup_table(LookupFun, _PageNo, _PageSize) ->
+    Rows = LookupFun(),
+    Rows.
+
+%%--------------------------------------------------------------------
+%% mqtt 
+%%--------------------------------------------------------------------
+publish({ClientId, Topic, Payload, Qos, Retain}) ->
+    case validate(topic, Topic) of
+        true ->
+            Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
+            emqttd:publish(Msg#mqtt_message{retain  = Retain}),
+            ok;
+        false ->
+            {error, format_error(Topic, "validate topic: ${0} fail")}
+    end.
+
+subscribe({ClientId, Topic, Qos}) ->
+    case validate(topic, Topic) of
+        true ->
+            case emqttd_sm:lookup_session(ClientId) of
+                undefined ->
+                    {error, format_error(ClientId, "Clientid: ${0} not found")};
+                #mqtt_session{sess_pid = SessPid} ->  
+                    emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
+                    ok
+            end;
+        false ->
+            {error, format_error(Topic, "validate topic: ${0} fail")}
+    end.
+
+unsubscribe({ClientId, Topic})-> 
+    case validate(topic, Topic) of
+        true ->
+            case emqttd_sm:lookup_session(ClientId) of
+                undefined ->
+                    {error, format_error(ClientId, "Clientid: ${0} not found")};
+                #mqtt_session{sess_pid = SessPid} ->   
+                    emqttd_session:unsubscribe(SessPid, [{Topic, []}]),
+                    ok
+            end;
+        false ->
+            {error, format_error(Topic, "validate topic: ${0} fail")}
+    end.
+    
+% publish(Messages) ->
+%     lists:foldl(
+%         fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) -> 
+%             case validate(topic, Topic) of
+%                 true ->
+%                     Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
+%                     emqttd:publish(Msg#mqtt_message{retain  = Retain}),
+%                     {[[{topic, Topic}]| Success], Failed};
+%                 false ->
+%                     {Success, [[{topic, Topic}]| Failed]}
+%             end
+%         end, {[], []}, Messages).
+
+% subscribers(Subscribers) ->
+%     lists:foldl(
+%         fun({ClientId, Topic, Qos}, {Success, Failed}) ->
+%             case emqttd_sm:lookup_session(ClientId) of
+%                 undefined ->
+%                     {Success, [[{client_id, ClientId}]|Failed]};
+%                 #mqtt_session{sess_pid = SessPid} ->  
+%                     emqttd_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]),
+%                     {[[{client_id, ClientId}]| Success], Failed}
+%             end
+%         end,{[], []}, Subscribers).
+
+% unsubscribers(UnSubscribers)-> 
+%     lists:foldl(
+%         fun({ClientId, Topic}, {Success, Failed}) ->
+%             case emqttd_sm:lookup_session(ClientId) of
+%                 undefined ->
+%                     {Success, [[{client_id, ClientId}]|Failed]};
+%                 #mqtt_session{sess_pid = SessPid} ->   
+%                     emqttd_session:unsubscriber(SessPid, [{Topic, []}]),
+%                     {[[{client_id, ClientId}]| Success], Failed}
+%             end
+%         end, {[], []}, UnSubscribers).
+
+%%--------------------------------------------------------------------
+%% manager API
+%%--------------------------------------------------------------------
+kick_client(ClientId) ->
+    Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
+    case lists:any(fun(Item) -> Item =:= ok end, Result) of
+        true  -> {ok, [{status, success}]};
+        false -> {ok, [{status, failure}]}
+    end.
+
+kick_client(Node, ClientId) when Node =:= node() ->
+    case emqttd_cm:lookup(ClientId) of
+        undefined -> error;
+        #mqtt_client{client_pid = Pid}-> emqttd_client:kick(Pid)
+    end;
+kick_client(Node, ClientId) ->
+    rpc_call(Node, kick_client, [Node, ClientId]).
+
+%%--------------------------------------------------------------------
+%% Internel Functions.
+%%--------------------------------------------------------------------
+
+rpc_call(Node, Fun, Args) ->
+    case rpc:call(Node, ?MODULE, Fun, Args) of
+        {badrpc, Reason} -> {error, Reason};
+        Res -> Res
+    end.
+
+kmg(Byte) when Byte > ?GB ->
+    float(Byte / ?GB, "G");
+kmg(Byte) when Byte > ?MB ->
+    float(Byte / ?MB, "M");
+kmg(Byte) when Byte > ?KB ->
+    float(Byte / ?MB, "K");
+kmg(Byte) ->
+    Byte.
+float(F, S) ->
+    iolist_to_binary(io_lib:format("~.2f~s", [F, S])).
+
+validate(qos, Qos) ->
+    (Qos >= ?QOS_0) and (Qos =< ?QOS_2);
+
+validate(topic, Topic) ->
+    emqttd_topic:validate({name, Topic}).
+
+client_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) ->
+    TotalNum = ets:info(mqtt_client, size),
+    Qh = qlc:q([R || R <- ets:table(mqtt_client)]),
+    query_table(Qh, PageNo, PageSize, TotalNum);
+
+client_list(ClientId, PageNo, PageSize) ->
+    Fun = fun() -> ets:lookup(mqtt_client, ClientId) end,
+    lookup_table(Fun, PageNo, PageSize).
+
+session_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) ->
+    TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_session]]),
+    Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_session]]),
+    query_table(Qh, PageNo, PageSize, TotalNum);
+
+session_list(ClientId, PageNo, PageSize) ->
+    MP = {ClientId, '_', '_', '_'},
+    Fun = fun() -> lists:append([ets:match_object(Tab, MP) || Tab <- [mqtt_local_session]]) end,
+    lookup_table(Fun, PageNo, PageSize).
+
+subscription_list(Key, PageNo, PageSize) when ?EMPTY_KEY(Key) ->
+    TotalNum = ets:info(mqtt_subproperty, size),
+    Qh = qlc:q([E || E <- ets:table(mqtt_subproperty)]),
+    query_table(Qh, PageNo, PageSize, TotalNum);
+
+subscription_list(Key, PageNo, PageSize) ->
+    Keys = ets:lookup(mqtt_subscription, Key),
+    Fun = case length(Keys) == 0 of
+        true ->
+            MP = {{Key, '_'}, '_'},
+            fun() -> ets:match_object(mqtt_subproperty, MP) end;
+        false ->
+            fun() ->
+                lists:map(fun({S, T}) ->[R] = ets:lookup(mqtt_subproperty, {T, S}), R end, Keys)
+            end
+    end,
+    lookup_table(Fun, PageNo, PageSize).
+
+route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) ->
+    TotalNum = lists:sum([ets:info(Tab, size) || Tab <- tables()]),
+    Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- tables()]),
+    query_table(Qh, PageNo, PageSize, TotalNum);
+
+route_list(Topic, PageNo, PageSize) ->
+    Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- tables()]) end,
+    lookup_table(Fun, PageNo, PageSize).
+
+tables() ->
+    [mqtt_route, mqtt_local_route].
+
+format_error(Val, Msg) ->
+    re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]).
+

+ 9 - 6
src/emqttd_protocol.erl

@@ -42,8 +42,9 @@
 %% ws_initial_headers: Headers from first HTTP request for WebSocket Client.
 -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid,
                       clean_sess, proto_ver, proto_name, username, is_superuser,
-                      will_msg, keepalive, max_clientid_len, session, stats_data,
-                      mountpoint, ws_initial_headers, connected_at}).
+                      will_msg, keepalive, keepalive_backoff, max_clientid_len,
+                      session, stats_data, mountpoint, ws_initial_headers,
+                      connected_at}).
 
 -type(proto_state() :: #proto_state{}).
 
@@ -58,6 +59,7 @@
 
 %% @doc Init protocol
 init(Peername, SendFun, Opts) ->
+    Backoff = get_value(keepalive_backoff, Opts, 1.25),
     EnableStats = get_value(client_enable_stats, Opts, false),
     MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
     WsInitialHeaders = get_value(ws_initial_headers, Opts),
@@ -67,6 +69,7 @@ init(Peername, SendFun, Opts) ->
                  is_superuser       = false,
                  client_pid         = self(),
                  ws_initial_headers = WsInitialHeaders,
+                 keepalive_backoff  = Backoff,
                  stats_data         = #proto_stats{enable_stats = EnableStats}}.
 
 init(Conn, Peername, SendFun, Opts) ->
@@ -202,7 +205,7 @@ process(?CONNECT_PACKET(Var), State0) ->
                             %% Register the client
                             emqttd_cm:reg(client(State2)),
                             %% Start keepalive
-                            start_keepalive(KeepAlive),
+                            start_keepalive(KeepAlive, State2),
                             %% Emit Stats
                             self() ! emit_stats,
                             %% ACCEPT
@@ -411,10 +414,10 @@ send_willmsg(_Client, undefined) ->
 send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
     emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
 
-start_keepalive(0) -> ignore;
+start_keepalive(0, _State) -> ignore;
 
-start_keepalive(Sec) when Sec > 0 ->
-    self() ! {keepalive, start, round(Sec * 1.25)}.
+start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 ->
+    self() ! {keepalive, start, round(Sec * Backoff)}.
 
 %%--------------------------------------------------------------------
 %% Validate Packets

+ 399 - 0
src/emqttd_rest_api.erl

@@ -0,0 +1,399 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module (emqttd_rest_api).
+
+-include("emqttd.hrl").
+
+-include("emqttd_internal.hrl").
+
+-http_api({"^nodes/(.+?)/alarms/?$", 'GET', alarm_list, []}).
+
+-http_api({"^nodes/(.+?)/clients/?$", 'GET', client_list, []}).
+-http_api({"^nodes/(.+?)/clients/(.+?)/?$", 'GET',client_list, []}).
+-http_api({"^clients/(.+?)/?$", 'GET', client, []}).
+-http_api({"^kick_client/(.+?)/?$", 'PUT', kick_client, []}).
+
+-http_api({"^routes?$", 'GET', route_list, []}).
+-http_api({"^routes/(.+?)/?$", 'GET', route, []}).
+
+-http_api({"^nodes/(.+?)/sessions/?$", 'GET', session_list, []}).
+-http_api({"^nodes/(.+?)/sessions/(.+?)/?$", 'GET', session_list, []}).
+-http_api({"^sessions/(.+?)/?$", 'GET', session, []}).
+
+-http_api({"^nodes/(.+?)/subscriptions/?$", 'GET', subscription_list, []}).
+-http_api({"^nodes/(.+?)/subscriptions/(.+?)/?$", 'GET', subscription_list, []}).
+-http_api({"^subscriptions/(.+?)/?$", 'GET', subscription, []}).
+
+-http_api({"^mqtt/publish?$", 'POST', publish, [{<<"topic">>, binary}]}).
+-http_api({"^mqtt/subscribe?$", 'POST', subscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}).
+-http_api({"^mqtt/unsubscribe?$", 'POST', unsubscribe, [{<<"client_id">>, binary},{<<"topic">>, binary}]}).
+
+-http_api({"^management/nodes/?$", 'GET', brokers, []}).
+-http_api({"^management/nodes/(.+?)/?$", 'GET', broker, []}).
+-http_api({"^monitoring/nodes/?$", 'GET', nodes, []}).
+-http_api({"^monitoring/nodes/(.+?)/?$", 'GET', node, []}).
+-http_api({"^monitoring/listeners/?$", 'GET', listeners, []}).
+-http_api({"^monitoring/listeners/(.+?)/?$", 'GET', listener, []}).
+-http_api({"^monitoring/metrics/?$", 'GET', metrics, []}).
+-http_api({"^monitoring/metrics/(.+?)/?$", 'GET', metric, []}).
+-http_api({"^monitoring/stats/?$", 'GET', stats, []}).
+-http_api({"^monitoring/stats/(.+?)/?$", 'GET', stat, []}).
+
+-http_api({"^nodes/(.+?)/plugins/?$", 'GET', plugin_list, []}).
+-http_api({"^nodes/(.+?)/plugins/(.+?)/?$", 'PUT', enabled, [{<<"active">>, bool}]}).
+
+-export([alarm_list/3]).
+-export([client/3, client_list/3, client_list/4, kick_client/3]).
+-export([route/3, route_list/2]).
+-export([session/3, session_list/3, session_list/4]).
+-export([subscription/3, subscription_list/3, subscription_list/4]).
+-export([nodes/2, node/3, brokers/2, broker/3, listeners/2, listener/3, metrics/2, metric/3, stats/2, stat/3]).
+-export([publish/2, subscribe/2, unsubscribe/2]).
+-export([plugin_list/3, enabled/4]).
+
+%%--------------------------------------------------------------------------
+%% alarm
+%%--------------------------------------------------------------------------
+alarm_list('GET', _Req, _Node) ->
+    Alarms = emqttd_mgmt:alarm_list(),
+    {ok, lists:map(fun alarm_row/1, Alarms)}.
+
+alarm_row(#mqtt_alarm{id        = AlarmId,
+                      severity  = Severity,
+                      title     = Title,
+                      summary   = Summary,
+                      timestamp = Timestamp}) ->
+    [{id, AlarmId},
+     {severity, Severity},
+     {title, l2b(Title)},
+     {summary, l2b(Summary)},
+     {occurred_at, l2b(strftime(Timestamp))}].
+
+%%--------------------------------------------------------------------------
+%% client
+%%--------------------------------------------------------------------------
+client('GET', _Params, Key) ->
+    Data = emqttd_mgmt:client(l2b(Key)),
+    {ok, [{objects, [client_row(Row) || Row <- Data]}]}.
+
+client_list('GET', Params, Node) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:client_list(l2a(Node), undefined, PageNo, PageSize),
+    Rows = proplists:get_value(result, Data),
+            TotalPage = proplists:get_value(totalPage, Data),
+            TotalNum  = proplists:get_value(totalNum, Data),
+            {ok, [{current_page, PageNo}, 
+                  {page_size, PageSize},
+                  {total_num, TotalNum},
+                  {total_page, TotalPage},
+                  {objects, [client_row(Row) || Row <- Rows]}]}.
+
+client_list('GET', Params, Node, Key) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:client_list(l2a(Node), l2b(Key), PageNo, PageSize),
+    {ok, [{objects, [client_row(Row) || Row <- Data]}]}.
+
+kick_client('PUT', _Params, Key) ->
+    case emqttd_mgmt:kick_client(l2b(Key)) of
+        ok -> {ok, []};
+        error -> {error, [{code, ?ERROR12}]}
+    end.
+
+client_row(#mqtt_client{client_id = ClientId,
+                 peername = {IpAddr, Port},
+                 username = Username,
+                 clean_sess = CleanSess,
+                 proto_ver = ProtoVer,
+                 keepalive = KeepAlvie,
+                 connected_at = ConnectedAt}) ->
+    [{client_id, ClientId},
+     {username, Username},
+     {ipaddress, l2b(ntoa(IpAddr))},
+     {port, Port},
+     {clean_sess, CleanSess},
+     {proto_ver, ProtoVer},
+     {keepalive, KeepAlvie},
+     {connected_at, l2b(strftime(ConnectedAt))}].
+
+%%--------------------------------------------------------------------------
+%% route
+%%--------------------------------------------------------------------------
+route('GET', _Params, Key) ->
+    Data = emqttd_mgmt:route(l2b(Key)),
+    {ok, [{objects, [route_row(Row) || Row <- Data]}]}.
+
+route_list('GET', Params) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:route_list(undefined, PageNo, PageSize),
+    Rows = proplists:get_value(result, Data),
+    TotalPage = proplists:get_value(totalPage, Data),
+    TotalNum  = proplists:get_value(totalNum, Data),
+    {ok, [{current_page, PageNo}, 
+          {page_size, PageSize},
+          {total_num, TotalNum},
+          {total_page, TotalPage},
+          {objects, [route_row(Row) || Row <- Rows]}]}.
+
+route_row(Route) when is_record(Route, mqtt_route) ->
+    [{topic, Route#mqtt_route.topic}, {node, Route#mqtt_route.node}];
+
+route_row({Topic, Node}) ->
+    [{topic, Topic}, {node, Node}].
+
+%%--------------------------------------------------------------------------
+%% session
+%%--------------------------------------------------------------------------
+session('GET', _Params, Key) ->
+    Data = emqttd_mgmt:session(l2b(Key)),
+    {ok, [{objects, [session_row(Row) || Row <- Data]}]}.
+
+session_list('GET', Params, Node) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:session_list(l2a(Node), undefined, PageNo, PageSize),
+    Rows = proplists:get_value(result, Data),
+    TotalPage = proplists:get_value(totalPage, Data),
+    TotalNum  = proplists:get_value(totalNum, Data),
+    {ok, [{current_page, PageNo}, 
+          {page_size, PageSize},
+          {total_num, TotalNum},
+          {total_page, TotalPage},
+          {objects, [session_row(Row) || Row <- Rows]}]}.
+
+session_list('GET', Params, Node, ClientId) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:session_list(l2a(Node), l2b(ClientId), PageNo, PageSize),
+    {ok, [{objects, [session_row(Row) || Row <- Data]}]}.
+
+session_row({ClientId, _Pid, _Persistent, Session}) ->
+    InfoKeys = [clean_sess, max_inflight, inflight_queue, message_queue,
+                message_dropped, awaiting_rel, awaiting_ack, awaiting_comp, created_at],
+     [{client_id, ClientId} | [{Key, format(Key, proplists:get_value(Key, Session))} || Key <- InfoKeys]].
+
+%%--------------------------------------------------------------------------
+%% subscription
+%%--------------------------------------------------------------------------
+subscription('GET', _Params, Key) ->
+    Data = emqttd_mgmt:subscription(l2b(Key)),
+    {ok, [{objects, [subscription_row(Row) || Row <- Data]}]}.
+
+subscription_list('GET', Params, Node) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:subscription_list(l2a(Node), undefined, PageNo, PageSize),
+    Rows = proplists:get_value(result, Data),
+    TotalPage = proplists:get_value(totalPage, Data),
+    TotalNum  = proplists:get_value(totalNum, Data),
+    {ok, [{current_page, PageNo}, 
+          {page_size, PageSize},
+          {total_num, TotalNum},
+          {total_page, TotalPage},
+          {objects, [subscription_row(Row) || Row <- Rows]}]}.
+
+subscription_list('GET', Params, Node, Key) ->
+    {PageNo, PageSize} = page_params(Params),
+    Data = emqttd_mgmt:subscription_list(l2a(Node), l2b(Key), PageNo, PageSize),
+    {ok, [{objects, [subscription_row(Row) || Row <- Data]}]}.
+
+subscription_row({{Topic, ClientId}, Option}) when is_pid(ClientId) ->
+    subscription_row({{Topic, l2b(pid_to_list(ClientId))}, Option});    
+subscription_row({{Topic, ClientId}, Option}) ->
+    Qos = proplists:get_value(qos, Option),
+    [{client_id, ClientId}, {topic, Topic}, {qos, Qos}].
+
+%%--------------------------------------------------------------------------
+%% management/monitoring
+%%--------------------------------------------------------------------------
+nodes('GET', _Params) ->
+    Data = emqttd_mgmt:nodes_info(),
+    {ok, format_broker(Data)}.
+
+node('GET', _Params, Node) ->
+    Data = emqttd_mgmt:node_info(l2a(Node)),
+    {ok, format_broker(Data)}.
+
+brokers('GET', _Params) ->
+    Data = emqttd_mgmt:brokers(),
+    {ok, [format_broker(Node, Broker) || {Node, Broker} <- Data]}.
+
+broker('GET', _Params, Node) ->
+    Data = emqttd_mgmt:broker(l2a(Node)),
+    {ok, format_broker(Data)}.
+
+listeners('GET', _Params) ->
+    Data = emqttd_mgmt:listeners(),
+    {ok, [{Node, format_listeners(Listeners, [])} || {Node, Listeners} <- Data]}.
+
+listener('GET', _Params, Node) ->
+    Data = emqttd_mgmt:listener(l2a(Node)),
+    {ok, [format_listener(Listeners) || Listeners <- Data]}.
+
+metrics('GET', _Params) ->
+    Data = emqttd_mgmt:metrics(),
+    {ok, Data}.
+
+metric('GET', _Params, Node) ->
+    Data = emqttd_mgmt:metrics(l2a(Node)),
+    {ok, Data}.
+
+stats('GET', _Params) ->
+    Data = emqttd_mgmt:stats(),
+    {ok, Data}.
+
+stat('GET', _Params, Node) ->
+    Data = emqttd_mgmt:stats(l2a(Node)),
+    {ok, Data}.
+
+format_broker(Node, Broker) ->
+    OtpRel  = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
+    [{name,     Node},
+     {version,  bin(proplists:get_value(version, Broker))},
+     {sysdescr, bin(proplists:get_value(sysdescr, Broker))},
+     {uptime,   bin(proplists:get_value(uptime, Broker))},
+     {datetime, bin(proplists:get_value(datetime, Broker))},
+     {otp_release, l2b(OtpRel)},
+     {node_status, 'Running'}].
+
+format_broker(Broker) ->
+    OtpRel  = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version),
+    [{version,  bin(proplists:get_value(version, Broker))},
+     {sysdescr, bin(proplists:get_value(sysdescr, Broker))},
+     {uptime,   bin(proplists:get_value(uptime, Broker))},
+     {datetime, bin(proplists:get_value(datetime, Broker))},
+     {otp_release, l2b(OtpRel)},
+     {node_status, 'Running'}].
+
+format_listeners([], Acc) ->
+    Acc;
+format_listeners([{Protocol, ListenOn, Info}| Listeners], Acc) ->
+    format_listeners(Listeners, [format_listener({Protocol, ListenOn, Info}) | Acc]).
+
+format_listener({Protocol, ListenOn, Info}) ->
+    Listen = l2b(esockd:to_string(ListenOn)),
+    lists:append([{protocol, Protocol}, {listen, Listen}], Info).
+
+%%--------------------------------------------------------------------------
+%% mqtt
+%%--------------------------------------------------------------------------
+publish('POST', Params) ->
+    Topic = proplists:get_value(<<"topic">>, Params),
+    ClientId = proplists:get_value(<<"client_id">>, Params, http),
+    Payload = proplists:get_value(<<"payload">>, Params, <<>>),
+    Qos     = proplists:get_value(<<"qos">>, Params, 0),
+    Retain  = proplists:get_value(<<"retain">>, Params, false),
+    case emqttd_mgmt:publish({ClientId, Topic, Payload, Qos, Retain}) of
+        ok ->
+            {ok, []};
+        {error, Error} ->
+            {error, [{code, ?ERROR2}, {message, Error}]}
+    end.
+
+subscribe('POST', Params) ->
+    ClientId = proplists:get_value(<<"client_id">>, Params),
+    Topic    = proplists:get_value(<<"topic">>, Params),
+    Qos      = proplists:get_value(<<"qos">>, Params, 0),
+    case emqttd_mgmt:subscribe({ClientId, Topic, Qos}) of
+        ok ->
+            {ok, []};
+        {error, Error} ->
+            {error, [{code, ?ERROR2}, {message, Error}]}
+    end.
+
+unsubscribe('POST', Params) ->
+    ClientId = proplists:get_value(<<"client_id">>, Params),
+    Topic    = proplists:get_value(<<"topic">>, Params),
+    case emqttd_mgmt:unsubscribe({ClientId, Topic})of
+        ok ->
+            {ok, []};
+        {error, Error} ->
+            {error, [{code, ?ERROR2}, {message, Error}]}
+    end.
+
+%%--------------------------------------------------------------------------
+%% plugins
+%%--------------------------------------------------------------------------
+plugin_list('GET', _Params, Node) ->
+    Plugins = lists:map(fun plugin/1, emqttd_mgmt:plugin_list(l2a(Node))),
+    {ok, Plugins}.
+
+enabled('PUT', Params, Node, PluginName) ->
+    Active = proplists:get_value(<<"active">>, Params),
+    case Active of
+        true ->
+            return(emqttd_mgmt:plugin_load(l2a(Node), l2a(PluginName)));
+        false ->
+            return(emqttd_mgmt:plugin_unload(l2a(Node), l2a(PluginName)))
+    end.
+
+return(Result) ->
+    case Result of
+        {ok, _} ->
+            {ok, []};
+        {error, already_started} ->
+            {error, [{code, ?ERROR10}, {message, <<"already_started">>}]};
+        {error, not_started} ->
+            {error, [{code, ?ERROR11}, {message, <<"not_started">>}]};
+        Error ->
+            lager:error("error:~p", [Error]),
+            {error, [{code, ?ERROR2}, {message, <<"unknown">>}]}
+    end.
+plugin(#mqtt_plugin{name = Name, version = Ver, descr = Descr,
+                    active = Active}) ->
+    [{name, Name},
+     {version, iolist_to_binary(Ver)},
+     {description, iolist_to_binary(Descr)},
+     {active, Active}].
+
+%%--------------------------------------------------------------------------
+%% Inner function
+%%--------------------------------------------------------------------------
+format(created_at, Val) ->
+    l2b(strftime(Val));
+format(_, Val) ->
+    Val.
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+ntoa(IP) ->
+    inet_parse:ntoa(IP).
+
+%%--------------------------------------------------------------------
+%% Strftime
+%%--------------------------------------------------------------------
+strftime({MegaSecs, Secs, _MicroSecs}) ->
+    strftime(datetime(MegaSecs * 1000000 + Secs));
+
+strftime({{Y,M,D}, {H,MM,S}}) ->
+    lists:flatten(
+        io_lib:format(
+            "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
+
+datetime(Timestamp) when is_integer(Timestamp) ->
+    Universal = calendar:gregorian_seconds_to_datetime(Timestamp +
+    calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})),
+    calendar:universal_time_to_local_time(Universal).
+
+bin(S) when is_list(S)   -> l2b(S);
+bin(A) when is_atom(A)   -> bin(atom_to_list(A));
+bin(B) when is_binary(B) -> B;
+bin(undefined) -> <<>>.
+int(L) -> list_to_integer(L).
+l2a(L) -> list_to_atom(L).
+l2b(L) -> list_to_binary(L).
+
+
+page_params(Params) ->
+    PageNo = int(proplists:get_value("curr_page", Params, "1")),
+    PageSize = int(proplists:get_value("page_size", Params, "20")),
+    {PageNo, PageSize}.

+ 2 - 3
src/emqttd_session.erl

@@ -174,8 +174,7 @@
 %% @doc Start a Session
 -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}).
 start_link(CleanSess, {ClientId, Username}, ClientPid) ->
-    gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid],
-                           [{spawn_opt, ?FULLSWEEP_OPTS}]). %% Tune GC.
+    gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
 
 %%--------------------------------------------------------------------
 %% PubSub API
@@ -183,7 +182,7 @@ start_link(CleanSess, {ClientId, Username}, ClientPid) ->
 
 %% @doc Subscribe topics
 -spec(subscribe(pid(), [{binary(), [emqttd_topic:option()]}]) -> ok).
-subscribe(Session, TopicTable) ->%%TODO: the ack function??...
+subscribe(Session, TopicTable) -> %%TODO: the ack function??...
     gen_server2:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}).
 
 -spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqttd_topic:option()]}]) -> ok).

+ 71 - 7
test/emqttd_SUITE.erl

@@ -26,7 +26,7 @@
 
 -define(APP, emqttd).
 
--define(CONTENT_TYPE, "application/x-www-form-urlencoded").
+-define(CONTENT_TYPE, "application/json").
 
 -define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
                           {verify, verify_peer},
@@ -36,6 +36,24 @@
                           {cacertfile, "certs/cacert.pem"},
                           {certfile, "certs/client-cert.pem"}]).
 
+-define(URL, "http://localhost:8080/api/v2/").
+
+-define(APPL_JSON, "application/json").
+
+-define(PRINT(PATH), lists:flatten(io_lib:format(PATH, [atom_to_list(node())]))).
+
+-define(GET_API, ["management/nodes",
+                  ?PRINT("management/nodes/~s"),
+                  "monitoring/nodes",
+                  ?PRINT("monitoring/nodes/~s"),
+                  "monitoring/listeners",
+                  ?PRINT("monitoring/listeners/~s"),
+                  "monitoring/metrics",
+                  ?PRINT("monitoring/metrics/~s"),
+                  "monitoring/stats",
+                  ?PRINT("monitoring/stats/~s"),
+                  ?PRINT("nodes/~s/clients"),
+                  "routes"]).
 
 all() ->
     [{group, protocol},
@@ -49,6 +67,7 @@ all() ->
      {group, http},
      {group, alarms},
      {group, cli},
+     {group, get_api},
      {group, cleanSession}].
 
 groups() ->
@@ -103,13 +122,14 @@ groups() ->
          ]},
        cli_vm]},
     {cleanSession, [sequence],
-      [cleanSession_validate
-       ]}].
+      [cleanSession_validate]},
+    {get_api, [sequence], [get_api_lists]}].
 
 init_per_suite(Config) ->
     NewConfig = generate_config(),
     lists:foreach(fun set_app_env/1, NewConfig),
     application:ensure_all_started(?APP),
+    timer:sleep(6000),
     Config.
 
 end_per_suite(_Config) ->
@@ -137,6 +157,7 @@ mqtt_ssl_oneway(_) ->
     emqttd:stop(),
     change_opts(ssl_oneway),
     emqttd:start(),
+    timer:sleep(6000),
     {ok, SslOneWay} = emqttc:start_link([{host, "localhost"},
                                          {port, 8883},
                                          {client_id, <<"ssloneway">>}, ssl]),
@@ -158,6 +179,7 @@ mqtt_ssl_twoway(_) ->
     emqttd:stop(),
     change_opts(ssl_twoway),
     emqttd:start(),
+    timer:sleep(6000),
     ClientSSl = [{Key, local_path(["etc", File])} ||
                  {Key, File} <- ?MQTT_SSL_CLIENT],
     {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"},
@@ -427,13 +449,21 @@ request_status(_) ->
     ?assertEqual(binary_to_list(Status), Return).
 
 request_publish(_) ->
+    emqttc:start_link([{host, "localhost"},
+                       {port, 1883},
+                       {client_id, <<"random">>},
+                       {clean_sess, false}]),
+    SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
+    ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("", ""))),
     ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]),
-    Params = "qos=1&retain=0&topic=a/b/c&message=hello",
-    ?assert(connect_emqttd_publish_(post, "mqtt/publish", Params, auth_header_("", ""))),
+    Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}",
+    ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("", ""))),
     ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
-    emqttd:unsubscribe(<<"a/b/c">>).
 
-connect_emqttd_publish_(Method, Api, Params, Auth) ->
+    UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}",
+    ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("", ""))).
+
+connect_emqttd_pubsub_(Method, Api, Params, Auth) ->
     Url = "http://127.0.0.1:8080/" ++ Api,
     case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
     {error, socket_closed_remotely} ->
@@ -593,6 +623,9 @@ cleanSession_validate(_) ->
     emqttc:disconnect(Pub),
     emqttc:disconnect(C11).
 
+get_api_lists(_Config) ->
+    lists:foreach(fun request/1, ?GET_API).
+
 change_opts(SslType) ->
     {ok, Listeners} = application:get_env(?APP, listeners),
     NewListeners =
@@ -646,3 +679,34 @@ set_app_env({App, Lists}) ->
     lists:foreach(fun({Par, Var}) ->
                   application:set_env(App, Par, Var)
                   end, Lists).
+
+request(Path) ->
+    http_get(get, Path).
+
+http_get(Method, Path) ->
+    req(Method, Path, []).
+
+http_put(Method, Path, Params) ->
+    req(Method, Path, format_for_upload(Params)).
+
+http_post(Method, Path, Params) ->
+    req(Method, Path, format_for_upload(Params)).
+
+req(Method, Path, Body) ->
+   Url = ?URL ++ Path,
+   Headers = auth_header_("", ""),
+   case httpc:request(Method, {Url, [Headers]}, [], []) of
+   {error, socket_closed_remotely} ->
+       false;
+   {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} }  ->
+       true;
+   {ok, {{"HTTP/1.1", 400, _}, _, []}} ->
+       false;
+    {ok, {{"HTTP/1.1", 404, _}, _, []}} ->
+        false
+    end.
+
+format_for_upload(none) ->
+    <<"">>;
+format_for_upload(List) ->
+    iolist_to_binary(mochijson2:encode(List)).