Просмотр исходного кода

Merge pull request #1101 from emqtt/emq22

Version 2.2-rc.1
huangdan 8 лет назад
Родитель
Сommit
0067090d6a

+ 1 - 0
Makefile

@@ -14,6 +14,7 @@ dep_pbkdf2       = git https://github.com/emqtt/pbkdf2 2.0.1
 dep_lager_syslog = git https://github.com/basho/lager_syslog
 dep_bcrypt       = git https://github.com/smarkets/erlang-bcrypt master
 
+ERLC_OPTS += +debug_info
 ERLC_OPTS += +'{parse_transform, lager_transform}'
 
 NO_AUTOPATCH = cuttlefish

+ 11 - 0
etc/emq.conf

@@ -457,6 +457,17 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 
 ## listener.wss.external.fail_if_no_peer_cert = true
 
+##--------------------------------------------------------------------
+## HTTP Management API Listener
+
+listener.api.mgmt = 127.0.0.1:8080
+
+listener.api.mgmt.acceptors = 4
+
+listener.api.mgmt.max_clients = 64
+
+listener.api.mgmt.access.1 = allow all
+
 ##-------------------------------------------------------------------
 ## System Monitor
 ##-------------------------------------------------------------------

+ 99 - 1
priv/emq.schema

@@ -1020,15 +1020,113 @@ end}.
                        end
                    end,
 
+    ApiListeners = fun(Type, Name) ->
+                       Prefix = string:join(["listener", Type, Name], "."),
+                       case cuttlefish:conf_get(Prefix, Conf, undefined) of
+                           undefined ->
+                               [];
+                           ListenOn ->
+                               SslOpts1 = case SslOpts(Prefix) of
+                                        [] -> [];
+                                        SslOpts0 -> [{sslopts, SslOpts0}]
+                                      end,
+                               [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
+                                                        {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}]
+                       end
+                   end,
+
+
     lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn}
                                                <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf)
                                                ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)]
                   ++
                   [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
                                                <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
-                                               ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
+                                               ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]
+                  ++
+                  [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn}
+                                               <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)])
 end}.
 
+%%--------------------------------------------------------------------
+%% MQTT REST API Listeners
+
+{mapping, "listener.api.$name", "emqttd.listeners", [
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.backlog", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.buffer", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}}
+]}.
+
+{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.certfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.verify", "emqttd.listeners", [
+  {datatype, atom}
+]}.
+
+{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}}
+]}.
+
 %%--------------------------------------------------------------------
 %% System Monitor
 %%--------------------------------------------------------------------

+ 7 - 2
src/emqttd_app.erl

@@ -165,11 +165,14 @@ start_listener({ssl, ListenOn, Opts}) ->
 
 %% Start http listener
 start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
-    mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_http, handle_request, []});
+    mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_ws, handle_request, []});
 
 %% Start https listener
 start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
-    mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_http, handle_request, []}).
+    mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []});
+
+start_listener({Proto, ListenOn, Opts}) when Proto == api ->
+    mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}).
 
 start_listener(Proto, ListenOn, Opts) ->
     Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])),
@@ -197,6 +200,8 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
     mochiweb:stop_http('mqtt:ws', ListenOn);
 stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
     mochiweb:stop_http('mqtt:wss', ListenOn);
+stop_listener({Proto, ListenOn, _Opts}) when Proto == api ->
+    mochiweb:stop_http('mqtt:api', ListenOn);
 stop_listener({Proto, ListenOn, _Opts}) ->
     esockd:close(Proto, ListenOn).
 

+ 1 - 5
src/emqttd_bridge.erl

@@ -37,7 +37,6 @@
 
 -record(state, {pool, id,
                 node, subtopic,
-                qos                = ?QOS_0,
                 topic_suffix       = <<>>,
                 topic_prefix       = <<>>,
                 mqueue             :: emqttd_mqueue:mqueue(),
@@ -45,8 +44,7 @@
                 ping_down_interval = ?PING_DOWN_INTERVAL,
                 status             = up}).
 
--type(option() :: {qos, mqtt_qos()} |
-                  {topic_suffix, binary()} |
+-type(option() :: {topic_suffix, binary()} |
                   {topic_prefix, binary()} |
                   {max_queue_len, pos_integer()} |
                   {ping_down_interval, pos_integer()}).
@@ -87,8 +85,6 @@ init([Pool, Id, Node, Topic, Options]) ->
 
 parse_opts([], State) ->
     State;
-parse_opts([{qos, Qos} | Opts], State) ->
-    parse_opts(Opts, State#state{qos = Qos});
 parse_opts([{topic_suffix, Suffix} | Opts], State) ->
     parse_opts(Opts, State#state{topic_suffix= Suffix});
 parse_opts([{topic_prefix, Prefix} | Opts], State) ->

+ 1 - 4
src/emqttd_cli.erl

@@ -326,12 +326,11 @@ bridges(["list"]) ->
 
 bridges(["options"]) ->
     ?PRINT_MSG("Options:~n"),
-    ?PRINT_MSG("  qos     = 0 | 1 | 2~n"),
     ?PRINT_MSG("  prefix  = string~n"),
     ?PRINT_MSG("  suffix  = string~n"),
     ?PRINT_MSG("  queue   = integer~n"),
     ?PRINT_MSG("Example:~n"),
-    ?PRINT_MSG("  qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
+    ?PRINT_MSG("  prefix=abc/,suffix=/yxz,queue=1000~n");
 
 bridges(["start", SNode, Topic]) ->
     case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of
@@ -363,8 +362,6 @@ parse_opts(Cmd, OptStr) ->
     Tokens = string:tokens(OptStr, ","),
     [parse_opt(Cmd, list_to_atom(Opt), Val)
         || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
-parse_opt(bridge, qos, Qos) ->
-    {qos, list_to_integer(Qos)};
 parse_opt(bridge, suffix, Suffix) ->
     {topic_suffix, bin(Suffix)};
 parse_opt(bridge, prefix, Prefix) ->

+ 1 - 1
src/emqttd_cluster.erl

@@ -50,7 +50,7 @@ prepare() ->
 %% @doc Is node in cluster?
 -spec(is_clustered(node()) -> boolean()).
 is_clustered(Node) ->
-    lists:member(Node, emqttd_mnesia:running_nodes()).
+    lists:member(Node, emqttd_mnesia:cluster_nodes(all)).
 
 %% @doc Reboot after join or leave cluster.
 -spec(reboot() -> ok).

+ 0 - 34
src/emqttd_http.erl

@@ -52,25 +52,6 @@ handle_request('POST', "/mqtt/publish", Req) ->
         false -> Req:respond({401, [], <<"Unauthorized">>})
     end;
 
-%%--------------------------------------------------------------------
-%% MQTT Over WebSocket
-%%--------------------------------------------------------------------
-
-handle_request('GET', "/mqtt", Req) ->
-    lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
-    Upgrade = Req:get_header_value("Upgrade"),
-    Proto   = check_protocol_header(Req),
-    case {is_websocket(Upgrade), Proto} of
-        {true, "mqtt" ++ _Vsn} ->
-            emqttd_ws:handle_request(Req);
-        {false, _} ->
-            lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
-            Req:respond({400, [], <<"Bad Request">>});
-        {_, Proto} ->
-            lager:error("WebSocket with error Protocol: ~s", [Proto]),
-            Req:respond({400, [], <<"Bad WebSocket Protocol">>})
-    end;
-
 %%--------------------------------------------------------------------
 %% Get static files
 %%--------------------------------------------------------------------
@@ -83,18 +64,6 @@ handle_request(Method, Path, Req) ->
     lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
     Req:not_found().
 
-check_protocol_header(Req) ->
-    case emqttd:env(websocket_protocol_header, false) of
-        true  -> get_protocol_header(Req);
-        false -> "mqtt-v3.1.1"
-    end.
-
-get_protocol_header(Req) ->
-    case Req:get_header_value("EMQ-WebSocket-Protocol") of
-        undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
-        Proto     -> Proto
-    end.
-
 %%--------------------------------------------------------------------
 %% HTTP Publish
 %%--------------------------------------------------------------------
@@ -174,9 +143,6 @@ bool("1") -> true;
 bool(<<"0">>) -> false;
 bool(<<"1">>) -> true.
 
-is_websocket(Upgrade) ->
-    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
-
 docroot() ->
     {file, Here} = code:is_loaded(?MODULE),
     Dir = filename:dirname(filename:dirname(Here)),

+ 14 - 4
src/emqttd_mnesia.erl

@@ -27,7 +27,7 @@
 
 %% Cluster mnesia
 -export([join_cluster/1, cluster_status/0, leave_cluster/0,
-         remove_from_cluster/1, running_nodes/0]).
+         remove_from_cluster/1, cluster_nodes/1, running_nodes/0]).
 
 %% Schema and tables
 -export([copy_schema/1, delete_schema/0, del_schema_copy/1,
@@ -186,9 +186,11 @@ remove_from_cluster(Node) when Node =/= node() ->
     case {is_node_in_cluster(Node), is_running_db_node(Node)} of
         {true, true} ->
             ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])),
+            mnesia_lib:del(extra_db_nodes, Node),
             ensure_ok(del_schema_copy(Node)),
             ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
         {true, false} ->
+            mnesia_lib:del(extra_db_nodes, Node),
             ensure_ok(del_schema_copy(Node));
             %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
         {false, _} ->
@@ -213,10 +215,18 @@ connect(Node) ->
         Error        -> Error
     end.
 
-%% @doc Running nodes
+%% @doc Running nodes.
 -spec(running_nodes() -> list(node())).
-running_nodes() ->
-    mnesia:system_info(running_db_nodes).
+running_nodes() -> cluster_nodes(running).
+
+%% @doc Cluster nodes.
+-spec(cluster_nodes(all | running | stopped) -> [node()]).
+cluster_nodes(all) ->
+    mnesia:system_info(db_nodes);
+cluster_nodes(running) ->
+    mnesia:system_info(running_db_nodes);
+cluster_nodes(stopped) ->
+    cluster_nodes(all) -- cluster_nodes(running).
 
 %% @private
 ensure_ok(ok) -> ok;

+ 12 - 3
src/emqttd_sm.erl

@@ -33,7 +33,8 @@
 %% API Function Exports
 -export([start_link/2]).
 
--export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]).
+-export([start_session/2, lookup_session/1, register_session/3,
+         unregister_session/1, unregister_session/2]).
 
 -export([dispatch/3]).
 
@@ -99,9 +100,17 @@ register_session(ClientId, CleanSess, Properties) ->
     ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
 
 %% @doc Unregister a session.
--spec(unregister_session(binary()) -> true).
+-spec(unregister_session(binary()) -> boolean()).
 unregister_session(ClientId) ->
-    ets:delete(mqtt_local_session, ClientId).
+    unregister_session(ClientId, self()).
+
+unregister_session(ClientId, Pid) ->
+    case ets:lookup(mqtt_local_session, ClientId) of
+        [LocalSess = {_, Pid, _, _}] ->
+            ets:delete_object(mqtt_local_session, LocalSess);
+        _ ->
+            false
+    end.
 
 dispatch(ClientId, Topic, Msg) ->
     try ets:lookup_element(mqtt_local_session, ClientId, 2) of

+ 44 - 11
src/emqttd_ws.erl

@@ -31,20 +31,53 @@
               lager:Level("WsClient(~s): " ++ Format,
                           [esockd_net:format(State#wsocket_state.peername) | Args])).
 
+
+handle_request(Req) ->
+    handle_request(Req:get(method), Req:get(path), Req).
+
 %%--------------------------------------------------------------------
-%% Handle WebSocket Request
+%% MQTT Over WebSocket
 %%--------------------------------------------------------------------
+handle_request('GET', "/mqtt", Req) ->
+    lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
+    Upgrade = Req:get_header_value("Upgrade"),
+    Proto   = check_protocol_header(Req),
+    case {is_websocket(Upgrade), Proto} of
+        {true, "mqtt" ++ _Vsn} ->
+            {ok, ProtoEnv} = emqttd:env(protocol),
+            PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
+            Parser = emqttd_parser:initial_state(PacketSize),
+            %% Upgrade WebSocket.
+            {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
+            {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
+            ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
+                                     max_packet_size = PacketSize, client_pid = ClientPid});
+        {false, _} ->
+            lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
+            Req:respond({400, [], <<"Bad Request">>});
+        {_, Proto} ->
+            lager:error("WebSocket with error Protocol: ~s", [Proto]),
+            Req:respond({400, [], <<"Bad WebSocket Protocol">>})
+    end;
 
-%% @doc Handle WebSocket Request.
-handle_request(Req) ->
-    {ok, ProtoEnv} = emqttd:env(protocol),
-    PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
-    Parser = emqttd_parser:initial_state(PacketSize),
-    %% Upgrade WebSocket.
-    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
-    {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
-    ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
-                             max_packet_size = PacketSize, client_pid = ClientPid}).
+handle_request(Method, Path, Req) ->
+    lager:error("Unexpected WS Request: ~s ~s", [Method, Path]),
+    Req:not_found().
+
+is_websocket(Upgrade) ->
+    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+
+check_protocol_header(Req) ->
+    case emqttd:env(websocket_protocol_header, false) of
+        true  -> get_protocol_header(Req);
+        false -> "mqtt-v3.1.1"
+    end.
+
+get_protocol_header(Req) ->
+    case Req:get_header_value("EMQ-WebSocket-Protocol") of
+        undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
+        Proto     -> Proto
+    end.
 
 %%--------------------------------------------------------------------
 %% Receive Loop

+ 4 - 4
test/emqttd_SUITE.erl

@@ -433,7 +433,7 @@ request_status(_) ->
     end,
     Status = iolist_to_binary(io_lib:format("Node ~s is ~s~nemqttd is ~s",
             [node(), InternalStatus, AppStatus])),
-    Url = "http://127.0.0.1:8083/status",
+    Url = "http://127.0.0.1:8080/status",
     {ok, {{"HTTP/1.1", 200, "OK"}, _, Return}} =
     httpc:request(get, {Url, []}, [], []),
     ?assertEqual(binary_to_list(Status), Return).
@@ -446,7 +446,7 @@ request_publish(_) ->
     emqttd:unsubscribe(<<"a/b/c">>).
 
 connect_emqttd_publish_(Method, Api, Params, Auth) ->
-    Url = "http://127.0.0.1:8083/" ++ Api,
+    Url = "http://127.0.0.1:8080/" ++ Api,
     case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
     {error, socket_closed_remotely} ->
         false;
@@ -647,8 +647,8 @@ conflict_listeners(_) ->
                {current_clients, esockd:get_current_clients(Pid)},
                {shutdown_count, esockd:get_shutdown_count(Pid)}]}
               end, esockd:listeners()),
-    ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:1883", Listeners))),
-    ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:1883", Listeners))),
+    ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))),
+    ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners))),
     emqttc:disconnect(C2).
 
 cli_vm(_) ->

+ 26 - 12
test/emqttd_SUITE_data/emqttd.conf

@@ -54,7 +54,7 @@ node.max_ets_tables = 256000
 node.fullsweep_after = 1000
 
 ## Crash dump
-node.crash_dump = log/crash.dump
+node.crash_dump = {{ platform_log_dir }}/crash.dump
 
 ## Distributed node ticktime
 node.dist_net_ticktime = 60
@@ -68,7 +68,7 @@ node.dist_listen_max = 6369
 ##--------------------------------------------------------------------
 
 ## Set the log dir
-log.dir = log
+log.dir = {{ platform_log_dir }}
 
 ## Console log. Enum: off, file, console, both
 log.console = console
@@ -83,15 +83,15 @@ log.syslog = on
 log.syslog.level = error
 
 ## Console log file
-## log.console.file = log/console.log
+## log.console.file = {{ platform_log_dir }}/console.log
 
 ## Error log file
-log.error.file = log/error.log
+log.error.file = {{ platform_log_dir }}/error.log
 
 ## Enable the crash log. Enum: on, off
 log.crash = on
 
-log.crash.file = log/crash.log
+log.crash.file = {{ platform_log_dir }}/crash.log
 
 ##--------------------------------------------------------------------
 ## Allow Anonymous and Default ACL
@@ -104,7 +104,7 @@ mqtt.allow_anonymous = true
 mqtt.acl_nomatch = allow
 
 ## Default ACL File
-mqtt.acl_file = etc/acl.conf
+mqtt.acl_file = {{ platform_etc_dir }}/acl.conf
 
 ## Cache ACL for PUBLISH
 mqtt.cache_acl = true
@@ -119,6 +119,9 @@ mqtt.max_clientid_len = 1024
 ## Max Packet Size Allowed, 64K by default.
 mqtt.max_packet_size = 64KB
 
+## Check Websocket Protocol Header. Enum: on, off
+mqtt.websocket_protocol_header = on
+
 ##--------------------------------------------------------------------
 ## MQTT Connection
 ##--------------------------------------------------------------------
@@ -188,7 +191,7 @@ mqtt.mqueue.type = simple
 
 ## Max queue length. Enqueued messages when persistent client disconnected,
 ## or inflight window is full. 0 means no limit.
-mqtt.mqueue.max_length = 0
+mqtt.mqueue.max_length = 1000
 
 ## Low-water mark of queued messages
 mqtt.mqueue.low_watermark = 20%
@@ -229,10 +232,10 @@ mqtt.bridge.ping_down_interval = 1
 ##-------------------------------------------------------------------
 
 ## Dir of plugins' config
-mqtt.plugins.etc_dir =etc/plugins/
+mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/
 
 ## File to store loaded plugin names.
-mqtt.plugins.loaded_file = data/loaded_plugins
+mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins
 
 ##--------------------------------------------------------------------
 ## MQTT Listeners
@@ -354,9 +357,9 @@ listener.ssl.external.keyfile = certs/key.pem
 
 listener.ssl.external.certfile = certs/cert.pem
 
-## listener.ssl.external.cacertfile = certs/cacert.pem
+## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
 
-## listener.ssl.external.dhfile = certs/dh-params.pem
+## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem
 
 ## listener.ssl.external.verify = verify_peer
 
@@ -437,12 +440,23 @@ listener.wss.external.keyfile = certs/key.pem
 
 listener.wss.external.certfile = certs/cert.pem
 
-## listener.wss.external.cacertfile = certs/cacert.pem
+## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
 
 ## listener.wss.external.verify = verify_peer
 
 ## listener.wss.external.fail_if_no_peer_cert = true
 
+##--------------------------------------------------------------------
+## HTTP Management API Listener
+
+listener.api.mgmt = 127.0.0.1:8080
+
+listener.api.mgmt.acceptors = 4
+
+listener.api.mgmt.max_clients = 64
+
+listener.api.mgmt.access.1 = allow all
+
 ##-------------------------------------------------------------------
 ## System Monitor
 ##-------------------------------------------------------------------

+ 104 - 1
test/emqttd_SUITE_data/emqttd.schema

@@ -346,6 +346,11 @@ end}.
    {max_packet_size,  cuttlefish:conf_get("mqtt.max_packet_size", Conf)}]
 end}.
 
+{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [
+  {default, on},
+  {datatype, flag}
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT Connection
 %%--------------------------------------------------------------------
@@ -1016,15 +1021,113 @@ end}.
                        end
                    end,
 
+    ApiListeners = fun(Type, Name) ->
+                       Prefix = string:join(["listener", Type, Name], "."),
+                       case cuttlefish:conf_get(Prefix, Conf, undefined) of
+                           undefined ->
+                               [];
+                           ListenOn ->
+                               SslOpts1 = case SslOpts(Prefix) of
+                                        [] -> [];
+                                        SslOpts0 -> [{sslopts, SslOpts0}]
+                                      end,
+                               [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
+                                                        {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}]
+                       end
+                   end,
+
+
     lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn}
                                                <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf)
                                                ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)]
                   ++
                   [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
                                                <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
-                                               ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
+                                               ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]
+                  ++
+                  [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn}
+                                               <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)])
 end}.
 
+%%--------------------------------------------------------------------
+%% MQTT REST API Listeners
+
+{mapping, "listener.api.$name", "emqttd.listeners", [
+  {datatype, [integer, ip]}
+]}.
+
+{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [
+  {default, 8},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.backlog", "emqttd.listeners", [
+  {default, 1024},
+  {datatype, integer}
+]}.
+
+{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.buffer", "emqttd.listeners", [
+  {datatype, bytesize},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [
+  {datatype, flag},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}},
+  hidden
+]}.
+
+{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [
+  {datatype, {duration, ms}}
+]}.
+
+{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.certfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.api.$name.verify", "emqttd.listeners", [
+  {datatype, atom}
+]}.
+
+{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [
+  {datatype, {enum, [true, false]}}
+]}.
+
 %%--------------------------------------------------------------------
 %% System Monitor
 %%--------------------------------------------------------------------