ソースを参照

Merge pull request #1146 from emqtt/emq24

Emq24
turtleDeng 8 年 前
コミット
2cdc0223d1

+ 5 - 4
Makefile

@@ -1,15 +1,16 @@
 PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
-PROJECT_VERSION = 2.2
+PROJECT_VERSION = 2.3
 
-DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt
+DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt
 
 dep_goldrush     = git https://github.com/basho/goldrush 0.1.9
 dep_gproc        = git https://github.com/uwiger/gproc
 dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
 dep_lager        = git https://github.com/basho/lager master
-dep_esockd       = git https://github.com/emqtt/esockd emq22
-dep_mochiweb     = git https://github.com/emqtt/mochiweb emq22
+dep_esockd       = git https://github.com/emqtt/esockd master
+dep_ekka         = git https://github.com/emqtt/ekka master
+dep_mochiweb     = git https://github.com/emqtt/mochiweb master
 dep_pbkdf2       = git https://github.com/emqtt/pbkdf2 2.0.1
 dep_lager_syslog = git https://github.com/basho/lager_syslog
 dep_bcrypt       = git https://github.com/smarkets/erlang-bcrypt master

+ 4 - 0
README.md

@@ -45,6 +45,8 @@ Please visit [emqtt.io](http://emqtt.io) for more service. Follow us on Twitter:
 * Passed eclipse paho interoperability tests
 * Local Subscription
 * Shared Subscription
+* Proxy Protocol V1/2
+* Lua Hook and Web Hook
 
 ## Installation
 
@@ -84,6 +86,8 @@ Plugin                                                                 | Descrip
 [emq_auth_mongo](https://github.com/emqtt/emq_auth_mongo)              | MongoDB Authentication/ACL Plugin
 [emq_auth_http](https://github.com/emqtt/emq_auth_http)                | Authentication/ACL by HTTP API
 [emq_auth_ldap](https://github.com/emqtt/emq_auth_ldap)                | LDAP Authentication Plugin
+[emq_web_hook](https://github.com/emqtt/emq-web-hook)                  | Web Hook Plugin
+[emq_lua_hook](https://github.com/emqtt/emq-lua-hook)                  | Lua Hook Plugin
 [emq_sn](https://github.com/emqtt/emq_sn)                              | MQTT-SN Protocol Plugin
 [emq_coap](https://github.com/emqtt/emq_coap)                          | CoAP Protocol Plugin
 [emq_stomp](https://github.com/emqtt/emq_stomp)                        | Stomp Protocol Plugin

+ 79 - 5
etc/emq.conf

@@ -7,11 +7,71 @@
 ## Cluster
 ##--------------------------------------------------------------------
 
-## The cluster Id
-cluster.id = emq
+## Cluster name
+cluster.name = ekka
 
-## The multicast address and port.
-cluster.multicast = 239.192.0.1:44369
+## Cluster Cookie
+cluster.cookie = ekkaclustercookie
+
+## Cluster Discovery: static | epmd | multicast | gossip | etcd | consul
+cluster.discovery = static
+
+## Cluster Autoheal: on | off
+cluster.autoheal = on
+
+## Clean down node of the cluster
+cluster.clean_down = 1h
+
+##--------------------------------------------------------------------
+## Cluster with epmd
+
+cluster.epmd.seeds = a@127.0.0.1,b@127.0.0.1
+
+##--------------------------------------------------------------------
+## Cluster with multicast
+
+## 1 second
+cluster.mcast.period = 1s
+
+cluster.mcast.addr = 239.192.0.1:4369
+
+cluster.mcast.iface = 0.0.0.0
+
+cluster.mcast.ttl = 1
+
+cluster.mcast.loop = on
+
+##--------------------------------------------------------------------
+## Cluster with Gossip
+
+cluster.gossip.seeds = 127.0.0.1:4369
+
+cluster.gossip.protocol_period = 1s
+
+cluster.gossip.suspicion_factor = 3
+
+##--------------------------------------------------------------------
+## Cluster with Etcd
+
+cluster.etcd.addr = 127.0.0.1:2367
+
+cluster.etcd.prefix = emq
+
+cluster.etcd.node_ttl = 30m
+
+##--------------------------------------------------------------------
+## Cluster by Consul
+
+cluster.consul.addr = 127.0.0.1:8500
+
+cluster.consul.acl_token = example-acl-token
+
+##--------------------------------------------------------------------
+## Discover by Kubernetes
+
+## cluster.k8s.selector = app=emq
+
+## cluster.k8s.node_basename = emq
 
 ##--------------------------------------------------------------------
 ## Node Args
@@ -21,7 +81,7 @@ cluster.multicast = 239.192.0.1:44369
 node.name = emqttd@127.0.0.1
 
 ## Cookie for distributed node
-node.cookie = emqsecretcookie
+## node.cookie = emqsecretcookie
 
 ## SMP support: enable, auto, disable
 node.smp = auto
@@ -85,6 +145,9 @@ log.syslog.level = error
 ## Console log file
 ## log.console.file = {{ platform_log_dir }}/console.log
 
+## Info log file
+## log.info.file = {{ platform_log_dir }}/info.log
+
 ## Error log file
 log.error.file = {{ platform_log_dir }}/error.log
 
@@ -400,6 +463,17 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 ### Notice: 'verify' should be configured as 'verify_peer'
 ## listener.ssl.external.peer_cert_as_username = cn
 
+## SSL Socket Options
+## listener.ssl.external.backlog = 1024
+
+## listener.ssl.external.recbuf = 4KB
+
+## listener.ssl.external.sndbuf = 4KB
+
+## listener.ssl.external.buffer = 4KB
+
+## listener.ssl.external.nodelay = true
+
 ##--------------------------------------------------------------------
 ## External MQTT/WebSocket Listener
 

+ 157 - 16
priv/emq.schema

@@ -5,26 +5,168 @@
 %% Cluster
 %%--------------------------------------------------------------------
 
-%% Cluster ID
-{mapping, "cluster.id", "emqttd.cluster", [
-  {default, "emq"},
+%% @doc Cluster name
+{mapping, "cluster.name", "ekka.cluster_name", [
+  {default, emqcl},
+  {datatype, atom}
+]}.
+
+%% @doc Secret cookie for the cluster
+{mapping, "cluster.cookie", "vm_args.-setcookie", [
+  {default, "emqclustercookie"}
+]}.
+
+%% @doc Cluster discovery
+{mapping, "cluster.discovery", "ekka.cluster_discovery", [
+  {default, manual},
+  {datatype, atom}
+]}.
+
+%% @doc Cluster autoheal
+{mapping, "cluster.autoheal", "ekka.cluster_autoheal", [
+  {datatype, flag},
+  {default, on}
+]}.
+
+
+%% @doc Clean down node from the cluster
+{mapping, "cluster.clean_down", "ekka.cluster_clean_down", [
+  {datatype, {duration, ms}},
+  {default, "1h"}
+]}.
+
+%%--------------------------------------------------------------------
+%% Cluster with epmd
+
+{mapping, "cluster.epmd.seeds", "ekka.cluster_discovery", [
   {datatype, string}
 ]}.
 
-%% Cluster Multicast Addr
-{mapping, "cluster.multicast", "emqttd.cluster", [
-  {default, "239.192.0.1:44369"},
+%%--------------------------------------------------------------------
+%% Cluster with IP Multicast
+
+{mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [
+  {datatype, ip}
+]}.
+
+{mapping, "cluster.mcast.period", "ekka.cluster_discovery", [
+  {datatype, {duration, ms}},
+  {default, "1s"}
+]}.
+
+{mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [
+  {datatype, string},
+  {default, "0.0.0.0"}
+]}.
+
+{mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [
+  {datatype, integer},
+  {default, 1}
+]}.
+
+{mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [
+  {datatype, flag},
+  {default, on}
+]}.
+
+{mapping, "cluster.mcast.sndbuf", "ekka.cluster_discovery", [
+  {datatype, bytesize},
+  {default, "16KB"}
+]}.
+
+{mapping, "cluster.mcast.recbuf", "ekka.cluster_discovery", [
+  {datatype, bytesize},
+  {default, "16KB"}
+]}.
+
+{mapping, "cluster.mcast.buffer", "ekka.cluster_discovery", [
+  {datatype, bytesize},
+  {default, "32KB"}
+]}.
+
+{mapping, "cluster.gossip.seeds", "ekka.cluster_discovery", [
+  {datatype, string}
+]}.
+
+{mapping, "cluster.gossip.protocol_period", "ekka.cluster_discovery", [
+  {datatype, {duration, ms}},
+  {default, "1s"}
+]}.
+
+{mapping, "cluster.gossip.suspicion_factor", "ekka.cluster_discovery", [
+  {datatype, integer},
+  {default, 3}
+]}.
+
+%%--------------------------------------------------------------------
+%% Cluster with Etcd
+
+{mapping, "cluster.etcd.addr", "ekka.cluster_discovery", [
+  {datatype, string}
+]}.
+
+{mapping, "cluster.etcd.prefix", "ekka.cluster_discovery", [
+  {datatype, string}
+]}.
+
+{mapping, "cluster.etcd.node_ttl", "ekka.cluster_discovery", [
+  {datatype, {duration, ms}},
+  {default, "1m"}
+]}.
+
+%%--------------------------------------------------------------------
+%% Cluster with Consul
+
+{mapping, "cluster.consul.addr", "ekka.cluster_discovery", [
+  {datatype, ip}
+]}.
+
+{mapping, "cluster.consul.acl_token", "ekka.cluster_discovery", [
   {datatype, string}
 ]}.
 
-{translation, "emqttd.cluster", fun(Conf) ->
-    Multicast = cuttlefish:conf_get("cluster.multicast", Conf),
-    [Addr, Port] = string:tokens(Multicast, ":"),
-    {ok, Ip} = inet_parse:address(Addr),
-    [{id, cuttlefish:conf_get("cluster.id", Conf)},
-     {multicast, {Ip, list_to_integer(Port)}}]
+{translation, "ekka.cluster_discovery", fun(Conf) ->
+  Strategy = cuttlefish:conf_get("cluster.discovery", Conf),
+  Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
+  IpPort = fun(S) ->
+             [Addr, Port] = string:tokens(S, ":"),
+             {ok, Ip} = inet:parse_address(Addr),
+             {Ip, Port}
+           end,
+  Options = fun(static) ->
+                 [{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)}];
+               (mcast) ->
+                 {Addr, Port} = cuttlefish:conf_get("cluster.mcast.addr", Conf),
+                 {ok, Ip} = inet:parse_address(Addr),
+                 {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)),
+                 [{addr, Ip}, {port, Port}, {iface, Iface},
+                  {period, cuttlefish:conf_get("cluster.mcast.period", Conf)},
+                  {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)},
+                  {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}];
+               (etcd) ->
+                 [{seeds, cuttlefish:conf_get("cluster.epmd.seeds", Conf)},
+                  {clean_down, cuttlefish:conf_get("cluster.epmd.clean_down", Conf, undefined)}];
+               (gossip) ->
+                 [{seeds, [IpPort(S) || S <- string:tokens(",", cuttlefish:conf_get("cluster.gossip.seeds", Conf))]},
+                  {protocol_period, cuttlefish:conf_get("cluster.gossip.protocol_period", Conf)},
+                  {suspicion_factor, cuttlefish:conf_get("cluster.gossip.suspicion_factor", Conf, 3)}];
+               (etcd) ->
+                 [{addr, cuttlefish:conf_get("cluster.etcd.addr", Conf)},
+                  {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emq")},
+                  {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}];
+               (consul) ->
+                 [{addr, cuttlefish:conf_get("cluster.consul.addr", Conf)},
+                  {acl_token, cuttlefish:conf_get("cluster.consul.acl_token", Conf)}];
+               (k8s) ->
+                 [{host, cuttlefish:conf_get("cluster.k8s.selector", Conf)},
+                  {acl_token, cuttlefish:conf_get("cluster.k8s.node_basename", Conf)}];
+               (manual) ->
+                 [ ]
+            end,
+  {Strategy, Filter(Options(Strategy))}
 end}.
 
+
 %%--------------------------------------------------------------------
 %% Erlang Node
 %%--------------------------------------------------------------------
@@ -35,9 +177,9 @@ end}.
 ]}.
 
 %% @doc Secret cookie for distributed erlang node
-{mapping, "node.cookie", "vm_args.-setcookie", [
-  {default, "emqsecretcookie"}
-]}.
+%% {mapping, "node.cookie", "vm_args.-setcookie", [
+%%  {default, "emqsecretcookie"}
+%%]}.
 
 %% @doc SMP Support
 {mapping, "node.smp", "vm_args.-smp", [
@@ -258,7 +400,6 @@ end}.
       both -> [ConsoleHandler, ConsoleFileHandler];
       _ -> []
     end,
-
     SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of
       false -> [];
       true  -> [{lager_syslog_backend,

ファイルの差分が大きいため隠しています
+ 2 - 2
rebar.config


+ 18 - 0
src/emqttd.erl

@@ -40,6 +40,9 @@
 %% Debug API
 -export([dump/0]).
 
+%% Shutdown and reboot
+-export([shutdown/0, shutdown/1, reboot/0]).
+
 -type(subscriber() :: pid() | binary()).
 
 -type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
@@ -161,6 +164,21 @@ run_hooks(Hook, Args) ->
 run_hooks(Hook, Args, Acc) ->
     emqttd_hooks:run(Hook, Args, Acc).
 
+%%--------------------------------------------------------------------
+%% Shutdown and reboot
+%%--------------------------------------------------------------------
+
+shutdown() ->
+    shutdown(normal).
+
+shutdown(Reason) ->
+    lager:error("EMQ shutdown for ~s", [Reason]),
+    emqttd_plugins:unload(),
+    lists:foreach(fun application:stop/1, [emqttd, ekka, mochiweb, esockd, gproc]).
+
+reboot() ->
+    lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, ekka, emqttd]).
+
 %%--------------------------------------------------------------------
 %% Debug
 %%--------------------------------------------------------------------

+ 11 - 2
src/emqttd_app.erl

@@ -34,18 +34,19 @@
 -define(APP, emqttd).
 
 %%--------------------------------------------------------------------
-%% Application callbacks
+%% Application Callbacks
 %%--------------------------------------------------------------------
 
 start(_Type, _Args) ->
     print_banner(),
-    emqttd_mnesia:start(),
+    ekka:start(),
     {ok, Sup} = emqttd_sup:start_link(),
     start_servers(Sup),
     emqttd_cli:load(),
     register_acl_mod(),
     emqttd_plugins:init(),
     emqttd_plugins:load(),
+    init_cluster(),
     start_listeners(),
     register(emqttd, self()),
     print_vsn(),
@@ -146,6 +147,14 @@ register_acl_mod() ->
         undefined  -> ok
     end.
 
+%%--------------------------------------------------------------------
+%% Init Cluster
+%%--------------------------------------------------------------------
+
+init_cluster() ->
+    ekka:callback(prepare, fun emqttd:shutdown/1),
+    ekka:callback(reboot,  fun emqttd:reboot/0).
+
 %%--------------------------------------------------------------------
 %% Start Listeners
 %%--------------------------------------------------------------------

+ 6 - 6
src/emqttd_broker.erl

@@ -40,7 +40,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {started_at, sys_interval, heartbeat, tick_tref, version, sysdescr}).
+-record(state, {started_at, sys_interval, heartbeat, ticker, version, sysdescr}).
 
 -define(APP, emqttd).
 
@@ -122,9 +122,9 @@ init([]) ->
     % Tick
     {ok, #state{started_at = os:timestamp(),
                 heartbeat  = start_tick(1000, heartbeat),
-                version = list_to_binary(version()),
-                sysdescr = list_to_binary(sysdescr()),
-                tick_tref  = start_tick(tick)}, hibernate}.
+                version    = list_to_binary(version()),
+                sysdescr   = list_to_binary(sysdescr()),
+                ticker     = start_tick(tick)}, hibernate}.
 
 handle_call(uptime, _From, State) ->
     {reply, uptime(State), State};
@@ -149,7 +149,7 @@ handle_info(tick, State = #state{version = Version, sysdescr = Descr}) ->
 handle_info(Info, State) ->
     ?UNEXPECTED_INFO(Info, State).
 
-terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) ->
+terminate(_Reason, #state{heartbeat = Hb, ticker = TRef}) ->
     stop_tick(Hb),
     stop_tick(TRef),
     ok.
@@ -163,7 +163,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 retain(brokers) ->
     Payload = list_to_binary(string:join([atom_to_list(N) ||
-                    N <- emqttd_mnesia:running_nodes()], ",")),
+                    N <- ekka_mnesia:running_nodes()], ",")),
     Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
     emqttd:publish(emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg))).
 

+ 9 - 9
src/emqttd_cli.erl

@@ -111,7 +111,7 @@ broker(_) ->
 %% @doc Cluster with other nodes
 
 cluster(["join", SNode]) ->
-    case emqttd_cluster:join(emqttd_node:parse_name(SNode)) of
+    case ekka:join(ekka_node:parse_name(SNode)) of
         ok ->
             ?PRINT_MSG("Join the cluster successfully.~n"),
             cluster(["status"]);
@@ -120,7 +120,7 @@ cluster(["join", SNode]) ->
     end;
 
 cluster(["leave"]) ->
-    case emqttd_cluster:leave() of
+    case ekka:leave() of
         ok ->
             ?PRINT_MSG("Leave the cluster successfully.~n"),
             cluster(["status"]);
@@ -128,8 +128,8 @@ cluster(["leave"]) ->
             ?PRINT("Failed to leave the cluster: ~p~n", [Error])
     end;
 
-cluster(["remove", SNode]) ->
-    case emqttd_cluster:remove(emqttd_node:parse_name(SNode)) of
+cluster(["force-leave", SNode]) ->
+    case ekka:force_leave(ekka_node:parse_name(SNode)) of
         ok ->
             ?PRINT_MSG("Remove the node from cluster successfully.~n"),
             cluster(["status"]);
@@ -138,13 +138,13 @@ cluster(["remove", SNode]) ->
     end;
 
 cluster(["status"]) ->
-    ?PRINT("Cluster status: ~p~n", [emqttd_cluster:status()]);
+    ?PRINT("Cluster status: ~p~n", [ekka_cluster:status()]);
 
 cluster(_) ->
-    ?USAGE([{"cluster join <Node>",  "Join the cluster"},
-            {"cluster leave",        "Leave the cluster"},
-            {"cluster remove <Node>","Remove the node from cluster"},
-            {"cluster status",       "Cluster status"}]).
+    ?USAGE([{"cluster join <Node>",       "Join the cluster"},
+            {"cluster leave",             "Leave the cluster"},
+            {"cluster force-leave <Node>","Force the node leave from cluster"},
+            {"cluster status",            "Cluster status"}]).
 
 %%--------------------------------------------------------------------
 %% @doc Users usage

+ 0 - 92
src/emqttd_cluster.erl

@@ -1,92 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_cluster).
-
--author("Feng Lee <feng@emqtt.io>").
-
--include("emqttd.hrl").
-
-%% Cluster API
--export([join/1, leave/0, status/0, remove/1]).
-
-%% RPC Call
--export([prepare/0, reboot/0]).
-
-%% @doc Join cluster
--spec(join(node()) -> ok | {error, any()}).
-join(Node) when Node =:= node() ->
-    {error, {cannot_join_with_self, Node}};
-
-join(Node) when is_atom(Node) ->
-    case {is_clustered(Node), emqttd:is_running(Node)} of
-        {false, true} ->
-            prepare(), ok = emqttd_mnesia:join_cluster(Node), reboot();
-        {false, false} ->
-            {error, {node_not_running, Node}};
-        {true, _} ->
-            {error, {already_clustered, Node}}
-    end.
-
-%% @doc Prepare to join or leave cluster.
--spec(prepare() -> ok).
-prepare() ->
-    emqttd_plugins:unload(),
-    lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]).
-
-%% @doc Is node in cluster?
--spec(is_clustered(node()) -> boolean()).
-is_clustered(Node) ->
-    lists:member(Node, emqttd_mnesia:cluster_nodes(all)).
-
-%% @doc Reboot after join or leave cluster.
--spec(reboot() -> ok).
-reboot() ->
-    lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]).
-
-%% @doc Leave from Cluster.
--spec(leave() -> ok | {error, any()}).
-leave() ->
-    case emqttd_mnesia:running_nodes() -- [node()] of
-        [_|_] ->
-            prepare(), ok = emqttd_mnesia:leave_cluster(), reboot();
-        [] ->
-            {error, node_not_in_cluster}
-    end.
-
-%% @doc Remove a node from cluster.
--spec(remove(node()) -> ok | {error, any()}).
-remove(Node) when Node =:= node() ->
-    {error, {cannot_remove_self, Node}};
-
-remove(Node) ->
-    case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of
-        ok ->
-            case emqttd_mnesia:remove_from_cluster(Node) of
-                ok    -> rpc:call(Node, ?MODULE, reboot, []);
-                Error -> Error
-            end;
-        false ->
-            {error, node_not_in_cluster};
-        {badrpc, nodedown} ->
-            emqttd_mnesia:remove_from_cluster(Node);
-        {badrpc, Reason} ->
-            {error, Reason}
-    end.
-
-%% @doc Cluster status
-status() -> emqttd_mnesia:cluster_status().
-

+ 1 - 1
src/emqttd_http.erl

@@ -70,7 +70,7 @@ handle_request(Method, Path, Req) ->
 
 http_publish(Req) ->
     Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)],
-    lager:info("HTTP Publish: ~p", [Params]),
+    lager:debug("HTTP Publish: ~p", [Params]),
     Topics   = topics(Params),
     ClientId = get_value(<<"client">>, Params, http),
     Qos      = int(get_value(<<"qos">>, Params, "0")),

+ 0 - 268
src/emqttd_mnesia.erl

@@ -1,268 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_mnesia).
-
--author("Feng Lee <feng@emqtt.io>").
-
--include("emqttd.hrl").
-
--include("emqttd_internal.hrl").
-
-%% Start and stop mnesia
--export([start/0, ensure_started/0, ensure_stopped/0, connect/1]).
-
-%% Cluster mnesia
--export([join_cluster/1, cluster_status/0, leave_cluster/0,
-         remove_from_cluster/1, cluster_nodes/1, running_nodes/0]).
-
-%% Schema and tables
--export([copy_schema/1, delete_schema/0, del_schema_copy/1,
-         create_table/2, copy_table/1, copy_table/2]).
-
-%%--------------------------------------------------------------------
-%% Start and init mnesia
-%%--------------------------------------------------------------------
-
-%% @doc Start mnesia database.
--spec(start() -> ok).
-start() ->
-    ensure_ok(ensure_data_dir()),
-    ensure_ok(init_schema()),
-    ok = mnesia:start(),
-    init_tables(),
-    wait_for(tables).
-
-%% @private
-ensure_data_dir() ->
-    Dir = mnesia:system_info(directory) ++ "/",
-    case filelib:ensure_dir(Dir) of
-        ok              -> ok;
-        {error, Reason} -> {error, {mnesia_dir_error, Dir, Reason}}
-    end.
-
-%% @doc ensure mnesia started.
--spec(ensure_started() -> ok | {error, any()}).
-ensure_started() ->
-    ok = mnesia:start(), wait_for(start).
-
-%% @doc ensure mnesia stopped.
--spec(ensure_stopped() -> ok | {error, any()}).
-ensure_stopped() ->
-    stopped = mnesia:stop(), wait_for(stop).
-
-%% @private
-%% @doc Init mnesia schema or tables.
-init_schema() ->
-    case mnesia:system_info(extra_db_nodes) of
-        []    -> mnesia:create_schema([node()]);
-        [_|_] -> ok
-    end.
-
-%% @private
-%% @doc Init mnesia tables.
-init_tables() ->
-    case mnesia:system_info(extra_db_nodes) of
-        []    -> create_tables();
-        [_|_] -> copy_tables()
-    end.
-
-%% @doc Create mnesia tables.
-create_tables() ->
-    emqttd_boot:apply_module_attributes(boot_mnesia).
-
-%% @doc Copy mnesia tables.
-copy_tables() ->
-    emqttd_boot:apply_module_attributes(copy_mnesia).
-
-%% @doc Create mnesia table.
--spec(create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}).
-create_table(Name, TabDef) ->
-    ensure_tab(mnesia:create_table(Name, TabDef)).
-
-%% @doc Copy mnesia table.
--spec(copy_table(Name :: atom()) -> ok).
-copy_table(Name) ->
-    copy_table(Name, ram_copies).
-
--spec(copy_table(Name:: atom(), ram_copies | disc_copies) -> ok).
-copy_table(Name, RamOrDisc) ->
-    ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)).
-
-%% @doc Copy schema.
-copy_schema(Node) ->
-    case mnesia:change_table_copy_type(schema, Node, disc_copies) of
-        {atomic, ok} ->
-            ok;
-        {aborted, {already_exists, schema, Node, disc_copies}} ->
-            ok;
-        {aborted, Error} ->
-            {error, Error}
-    end.
-
-%% @doc Force to delete schema.
-delete_schema() ->
-    mnesia:delete_schema([node()]).
-
-%% @doc Delete schema copy
-del_schema_copy(Node) ->
-    case mnesia:del_table_copy(schema, Node) of
-        {atomic, ok} -> ok;
-        {aborted, Reason} -> {error, Reason}
-    end.
-
-%%--------------------------------------------------------------------
-%% Cluster mnesia
-%%--------------------------------------------------------------------
-
-%% @doc Join the mnesia cluster
--spec(join_cluster(node()) -> ok).
-join_cluster(Node) when Node =/= node() ->
-    %% Stop mnesia and delete schema first
-    ensure_ok(ensure_stopped()),
-    ensure_ok(delete_schema()),
-    %% Start mnesia and cluster to node
-    ensure_ok(ensure_started()),
-    ensure_ok(connect(Node)),
-    ensure_ok(copy_schema(node())),
-    %% Copy tables
-    copy_tables(),
-    ensure_ok(wait_for(tables)).
-
-%% @doc Cluster status
--spec(cluster_status() -> list()).
-cluster_status() ->
-    Running = mnesia:system_info(running_db_nodes),
-    Stopped = mnesia:system_info(db_nodes) -- Running,
-    ?IF(Stopped =:= [], [{running_nodes, Running}],
-            [{running_nodes, Running}, {stopped_nodes, Stopped}]).
-
-%% @doc This node try leave the cluster
--spec(leave_cluster() -> ok | {error, any()}).
-leave_cluster() ->
-    case running_nodes() -- [node()] of
-        [] ->
-            {error, node_not_in_cluster};
-        Nodes ->
-            case lists:any(fun(Node) ->
-                            case leave_cluster(Node) of
-                                ok               -> true;
-                                {error, _Reason} -> false
-                            end
-                          end, Nodes) of
-                true  -> ok;
-                false -> {error, {failed_to_leave, Nodes}}
-            end
-    end.
-
--spec(leave_cluster(node()) -> ok | {error, any()}).
-leave_cluster(Node) when Node =/= node() ->
-    case is_running_db_node(Node) of
-        true ->
-            ensure_ok(ensure_stopped()),
-            ensure_ok(rpc:call(Node, ?MODULE, del_schema_copy, [node()])),
-            ensure_ok(delete_schema());
-            %%ensure_ok(start()); %% restart?
-        false ->
-            {error, {node_not_running, Node}}
-    end.
-
-%% @doc Remove node from mnesia cluster.
--spec(remove_from_cluster(node()) -> ok | {error, any()}).
-remove_from_cluster(Node) when Node =/= node() ->
-    case {is_node_in_cluster(Node), is_running_db_node(Node)} of
-        {true, true} ->
-            ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])),
-            mnesia_lib:del(extra_db_nodes, Node),
-            ensure_ok(del_schema_copy(Node)),
-            ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
-        {true, false} ->
-            mnesia_lib:del(extra_db_nodes, Node),
-            ensure_ok(del_schema_copy(Node));
-            %ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
-        {false, _} ->
-            {error, node_not_in_cluster}
-    end.
-
-%% @doc Is node in mnesia cluster.
-is_node_in_cluster(Node) ->
-    lists:member(Node, mnesia:system_info(db_nodes)).
-
-%% @private
-%% @doc Is running db node.
-is_running_db_node(Node) ->
-    lists:member(Node, running_nodes()).
-
-%% @doc Cluster with node.
--spec(connect(node()) -> ok | {error, any()}).
-connect(Node) ->
-    case mnesia:change_config(extra_db_nodes, [Node]) of
-        {ok, [Node]} -> ok;
-        {ok, []}     -> {error, {failed_to_connect_node, Node}};
-        Error        -> Error
-    end.
-
-%% @doc Running nodes.
--spec(running_nodes() -> list(node())).
-running_nodes() -> cluster_nodes(running).
-
-%% @doc Cluster nodes.
--spec(cluster_nodes(all | running | stopped) -> [node()]).
-cluster_nodes(all) ->
-    mnesia:system_info(db_nodes);
-cluster_nodes(running) ->
-    mnesia:system_info(running_db_nodes);
-cluster_nodes(stopped) ->
-    cluster_nodes(all) -- cluster_nodes(running).
-
-%% @private
-ensure_ok(ok) -> ok;
-ensure_ok({error, {_Node, {already_exists, _Node}}}) -> ok;
-ensure_ok({badrpc, Reason}) -> throw({error, {badrpc, Reason}});
-ensure_ok({error, Reason}) -> throw({error, Reason}).
-
-%% @private
-ensure_tab({atomic, ok})                             -> ok;
-ensure_tab({aborted, {already_exists, _Name}})       -> ok;
-ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok;
-ensure_tab({aborted, Error})                         -> Error.
-
-%% @doc Wait for mnesia to start, stop or tables ready.
--spec(wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}).
-wait_for(start) ->
-    case mnesia:system_info(is_running) of
-        yes      -> ok;
-        no       -> {error, mnesia_unexpectedly_stopped};
-        stopping -> {error, mnesia_unexpectedly_stopping};
-        starting -> timer:sleep(1000), wait_for(start)
-    end;
- 
-wait_for(stop) ->
-    case mnesia:system_info(is_running) of
-        no       -> ok;
-        yes      -> {error, mnesia_unexpectedly_running};
-        starting -> {error, mnesia_unexpectedly_starting};
-        stopping -> timer:sleep(1000), wait_for(stop)
-    end;
-
-wait_for(tables) ->
-    Tables = mnesia:system_info(local_tables),
-    case mnesia:wait_for_tables(Tables, 600000) of
-        ok                   -> ok;
-        {error, Reason}      -> {error, Reason};
-        {timeout, BadTables} -> {error, {timetout, BadTables}}
-    end.
-

+ 0 - 44
src/emqttd_node.erl

@@ -1,44 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqttd_node).
-
--author("Feng Lee <feng@emqtt.io>").
-
--import(lists, [concat/1]).
-
--export([is_aliving/1, parse_name/1]).
-
-%% @doc Is node aliving
--spec(is_aliving(node()) -> boolean()).
-is_aliving(Node) ->
-    case net_adm:ping(Node) of
-        pong -> true;
-        pang -> false
-    end.
-
-%% @doc Parse node name
--spec(parse_name(string()) -> atom()).
-parse_name(Name) when is_list(Name) ->
-    case string:tokens(Name, "@") of
-        [_Node, _Host] -> list_to_atom(Name);
-        _              -> with_host(Name)
-    end.
-
-with_host(Name) ->
-    [_, Host] = string:tokens(atom_to_list(node()), "@"),
-    list_to_atom(concat([Name, "@", Host])).
-

+ 5 - 5
src/emqttd_plugins.erl

@@ -156,8 +156,8 @@ load_app(App) ->
 start_app(App, SuccFun) ->
     case application:ensure_all_started(App) of
         {ok, Started} ->
-            lager:info("started Apps: ~p", [Started]),
-            lager:info("load plugin ~p successfully", [App]),
+            lager:info("Started Apps: ~p", [Started]),
+            lager:info("Load plugin ~p successfully", [App]),
             SuccFun(App),
             {ok, Started};
         {error, {ErrApp, Reason}} ->
@@ -196,11 +196,11 @@ unload_plugin(App, Persistent) ->
 stop_app(App) ->
     case application:stop(App) of
         ok ->
-            lager:info("stop plugin ~p successfully~n", [App]), ok;
+            lager:info("Stop plugin ~p successfully~n", [App]), ok;
         {error, {not_started, App}} ->
-            lager:error("plugin ~p is not started~n", [App]), ok;
+            lager:error("Plugin ~p is not started~n", [App]), ok;
         {error, Reason} ->
-            lager:error("stop plugin ~p error: ~p", [App]), {error, Reason}
+            lager:error("Stop plugin ~p error: ~p", [App]), {error, Reason}
     end.
 
 %%--------------------------------------------------------------------

+ 3 - 3
src/emqttd_protocol.erl

@@ -344,10 +344,10 @@ send(Packet = ?PACKET(Type),
     {ok, State#proto_state{stats_data = Stats1}}.
 
 trace(recv, Packet, ProtoState) ->
-    ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
+    ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
 
 trace(send, Packet, ProtoState) ->
-    ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
+    ?LOG(debug, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
 
 inc_stats(_Direct, _Type, Stats = #proto_stats{enable_stats = false}) ->
     Stats;
@@ -382,7 +382,7 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) ->
     ignore;
 
 shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
-    ?LOG(info, "Shutdown for ~p", [Error], State),
+    ?LOG(debug, "Shutdown for ~p", [Error], State),
     Client = client(State),
     send_willmsg(Client, WillMsg),
     emqttd_hooks:run('client.disconnected', [Error], Client),

+ 9 - 23
src/emqttd_router.erl

@@ -53,19 +53,19 @@
 %%--------------------------------------------------------------------
 
 mnesia(boot) ->
-    ok = emqttd_mnesia:create_table(mqtt_topic, [
+    ok = ekka_mnesia:create_table(mqtt_topic, [
                 {ram_copies, [node()]},
                 {record_name, mqtt_topic},
                 {attributes, record_info(fields, mqtt_topic)}]),
-    ok = emqttd_mnesia:create_table(mqtt_route, [
+    ok = ekka_mnesia:create_table(mqtt_route, [
                 {type, bag},
                 {ram_copies, [node()]},
                 {record_name, mqtt_route},
                 {attributes, record_info(fields, mqtt_route)}]);
 
 mnesia(copy) ->
-    ok = emqttd_mnesia:copy_table(mqtt_topic),
-    ok = emqttd_mnesia:copy_table(mqtt_route, ram_copies).
+    ok = ekka_mnesia:copy_table(mqtt_topic),
+    ok = ekka_mnesia:copy_table(mqtt_route, ram_copies).
 
 %%--------------------------------------------------------------------
 %% Start the Router
@@ -216,7 +216,7 @@ stop() -> gen_server:call(?ROUTER, stop).
 %%--------------------------------------------------------------------
 
 init([]) ->
-    mnesia:subscribe(system),
+    ekka:monitor(membership),
     ets:new(mqtt_local_route, [set, named_table, protected]),
     {ok, TRef}  = timer:send_interval(timer:seconds(1), stats),
     {ok, #state{stats_timer = TRef}}.
@@ -239,27 +239,13 @@ handle_cast({del_local_route, Topic}, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({mnesia_system_event, {mnesia_up, Node}}, State) ->
-    lager:error("Mnesia up: ~p~n", [Node]),
-    {noreply, State};
-
-handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
-    lager:error("Mnesia down: ~p~n", [Node]),
+handle_info({membership, {mnesia, down, Node}}, State) ->
     clean_routes_(Node),
     update_stats_(),
     {noreply, State, hibernate};
 
-handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) ->
-    %% 1. Backup and restart
-    %% 2. Set master nodes
-    lager:critical("Mnesia inconsistent_database event: ~p, ~p~n", [Context, Node]),
-    {noreply, State};
-
-handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) ->
-    lager:critical("Mnesia overload: ~p~n", [Details]),
-    {noreply, State};
-
-handle_info({mnesia_system_event, _Event}, State) ->
+handle_info({membership, _Event}, State) ->
+    %% ignore
     {noreply, State};
 
 handle_info(stats, State) ->
@@ -271,7 +257,7 @@ handle_info(_Info, State) ->
 
 terminate(_Reason, #state{stats_timer = TRef}) ->
     timer:cancel(TRef),
-    mnesia:unsubscribe(system).
+    ekka:unmonitor(membership).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

+ 5 - 5
src/emqttd_server.erl

@@ -102,11 +102,11 @@ trace(publish, From, _Msg) when is_atom(From) ->
     %% Dont' trace '$SYS' publish
     ignore;
 trace(publish, {ClientId, Username}, #mqtt_message{topic = Topic, payload = Payload}) ->
-    lager:info([{client, ClientId}, {topic, Topic}],
-               "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
-trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) when is_binary(From); is_list(From) ->
-    lager:info([{client, From}, {topic, Topic}],
-               "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
+    lager:debug([{client, ClientId}, {topic, Topic}],
+                "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
+trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
+    lager:debug([{client, From}, {topic, Topic}],
+                "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
 
 %% @doc Unsubscribe
 -spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).

+ 5 - 5
src/emqttd_session.erl

@@ -379,7 +379,7 @@ handle_cast({subscribe, _From, TopicTable, AckFun},
             State = #state{client_id     = ClientId,
                            username      = Username,
                            subscriptions = Subscriptions}) ->
-    ?LOG(info, "Subscribe ~p", [TopicTable], State),
+    ?LOG(debug, "Subscribe ~p", [TopicTable], State),
     {GrantedQos, Subscriptions1} =
     lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) ->
                 NewQos = proplists:get_value(qos, Opts),
@@ -407,7 +407,7 @@ handle_cast({unsubscribe, _From, TopicTable},
             State = #state{client_id     = ClientId,
                            username      = Username,
                            subscriptions = Subscriptions}) ->
-    ?LOG(info, "Unsubscribe ~p", [TopicTable], State),
+    ?LOG(debug, "Unsubscribe ~p", [TopicTable], State),
     Subscriptions1 =
     lists:foldl(fun({Topic, Opts}, SubMap) ->
                 case maps:find(Topic, SubMap) of
@@ -482,7 +482,7 @@ handle_cast({resume, ClientId, ClientPid},
                            await_rel_timer = AwaitTimer,
                            expiry_timer    = ExpireTimer}) ->
 
-    ?LOG(info, "Resumed by ~p", [ClientPid], State),
+    ?LOG(debug, "Resumed by ~p", [ClientPid], State),
 
     %% Cancel Timers
     lists:foreach(fun emqttd_misc:cancel_timer/1,
@@ -552,7 +552,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
     hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
 
 handle_info({timeout, _Timer, expired}, State) ->
-    ?LOG(info, "Expired, shutdown now.", [], State),
+    ?LOG(debug, "Expired, shutdown now.", [], State),
     shutdown(expired, State);
 
 handle_info({'EXIT', ClientPid, _Reason},
@@ -563,7 +563,7 @@ handle_info({'EXIT', ClientPid, Reason},
             State = #state{clean_sess      = false,
                            client_pid      = ClientPid,
                            expiry_interval = Interval}) ->
-    ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
+    ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
     ExpireTimer = start_timer(Interval, expired),
     State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
     hibernate(emit_stats(State1));

+ 2 - 2
src/emqttd_sm.erl

@@ -62,14 +62,14 @@
 
 mnesia(boot) ->
     %% Global Session Table
-    ok = emqttd_mnesia:create_table(mqtt_session, [
+    ok = ekka_mnesia:create_table(mqtt_session, [
                 {type, set},
                 {ram_copies, [node()]},
                 {record_name, mqtt_session},
                 {attributes, record_info(fields, mqtt_session)}]);
 
 mnesia(copy) ->
-    ok = emqttd_mnesia:copy_table(mqtt_session).
+    ok = ekka_mnesia:copy_table(mqtt_session).
 
 %%--------------------------------------------------------------------
 %% API

+ 7 - 8
src/emqttd_sm_helper.erl

@@ -34,7 +34,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {stats_fun, tick_tref}).
+-record(state, {stats_fun, ticker}).
 
 %% @doc Start a session helper
 -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}).
@@ -42,9 +42,9 @@ start_link(StatsFun) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
 
 init([StatsFun]) ->
-    mnesia:subscribe(system),
+    ekka:monitor(membership),
     {ok, TRef} = timer:send_interval(timer:seconds(1), tick),
-    {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}.
+    {ok, #state{stats_fun = StatsFun, ticker = TRef}}.
 
 handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
@@ -52,8 +52,7 @@ handle_call(Req, _From, State) ->
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
-handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
-    lager:error("!!!Mnesia node down: ~s", [Node]),
+handle_info({membership, {mnesia, down, Node}}, State) ->
     Fun = fun() ->
             ClientIds =
             mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'},
@@ -63,7 +62,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
     mnesia:async_dirty(Fun),
     {noreply, State};
 
-handle_info({mnesia_system_event, {mnesia_up, _Node}}, State) ->
+handle_info({membership, _Event}, State) ->
     {noreply, State};
 
 handle_info(tick, State) ->
@@ -72,9 +71,9 @@ handle_info(tick, State) ->
 handle_info(Info, State) ->
     ?UNEXPECTED_INFO(Info, State).
 
-terminate(_Reason, _State = #state{tick_tref = TRef}) ->
+terminate(_Reason, _State = #state{ticker = TRef}) ->
     timer:cancel(TRef),
-    mnesia:unsubscribe(system).
+    ekka:unmonitor(membership).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

+ 4 - 4
src/emqttd_trie.erl

@@ -41,21 +41,21 @@
 -spec(mnesia(boot | copy) -> ok).
 mnesia(boot) ->
     %% Trie Table
-    ok = emqttd_mnesia:create_table(mqtt_trie, [
+    ok = ekka_mnesia:create_table(mqtt_trie, [
                 {ram_copies, [node()]},
                 {record_name, trie},
                 {attributes, record_info(fields, trie)}]),
     %% Trie Node Table
-    ok = emqttd_mnesia:create_table(mqtt_trie_node, [
+    ok = ekka_mnesia:create_table(mqtt_trie_node, [
                 {ram_copies, [node()]},
                 {record_name, trie_node},
                 {attributes, record_info(fields, trie_node)}]);
 
 mnesia(copy) ->
     %% Copy Trie Table
-    ok = emqttd_mnesia:copy_table(mqtt_trie),
+    ok = ekka_mnesia:copy_table(mqtt_trie),
     %% Copy Trie Node Table
-    ok = emqttd_mnesia:copy_table(mqtt_trie_node).
+    ok = ekka_mnesia:copy_table(mqtt_trie_node).
 
 %%--------------------------------------------------------------------
 %% Trie API

+ 1 - 1
src/emqttd_ws.erl

@@ -39,7 +39,7 @@ handle_request(Req) ->
 %% MQTT Over WebSocket
 %%--------------------------------------------------------------------
 handle_request('GET', "/mqtt", Req) ->
-    lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
+    lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
     Upgrade = Req:get_header_value("Upgrade"),
     Proto   = check_protocol_header(Req),
     case {is_websocket(Upgrade), Proto} of

+ 0 - 112
test/emqttd_SUITE.erl

@@ -45,7 +45,6 @@ all() ->
      {group, stats},
      {group, hook},
      {group, http},
-     {group, cluster},
      {group, alarms},
      {group, cli},
      {group, cleanSession}].
@@ -81,14 +80,6 @@ groups() ->
       request_publish
      % websocket_test
      ]},
-    {cluster, [sequence],
-     [cluster_test,
-      cluster_join,
-      cluster_leave,
-      cluster_remove,
-      cluster_remove2,
-      cluster_node_down
-     ]},
      {alarms, [sequence], 
      [set_alarms]
      },
@@ -469,79 +460,6 @@ websocket_test(_) ->
 
     ct:log("Req:~p", [Req]),
     emqttd_http:handle_request(Req).
-%%--------------------------------------------------------------------
-%% cluster group
-%%--------------------------------------------------------------------
-cluster_test(_Config) ->
-    Z = slave(emqttd, cluster_test_z),
-    wait_running(Z),
-    true = emqttd:is_running(Z),
-    Node = node(),
-    ok = rpc:call(Z, emqttd_cluster, join, [Node]),
-    [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)),
-    ct:log("Z:~p, Node:~p", [Z, Node]),
-    ok = rpc:call(Z, emqttd_cluster, leave, []),
-    [Node] = lists:sort(mnesia:system_info(running_db_nodes)),
-    ok = slave:stop(Z).
-
-cluster_join(_) ->
-    Z = slave(emqttd, cluster_join_z),
-    N = slave(node, cluster_join_n),
-    wait_running(Z),
-    true = emqttd:is_running(Z),
-    Node = node(),
-    {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node),
-    {error, {node_not_running, N}} = emqttd_cluster:join(N),
-    ok = emqttd_cluster:join(Z),
-    slave:stop(Z),
-    slave:stop(N).
- 
-cluster_leave(_) ->
-    Z = slave(emqttd, cluster_leave_z),
-    wait_running(Z),
-    {error, node_not_in_cluster} = emqttd_cluster:leave(),
-    ok = emqttd_cluster:join(Z),
-    Node = node(),
-    [Z, Node] = emqttd_mnesia:running_nodes(),
-    ok = emqttd_cluster:leave(),
-    [Node] = emqttd_mnesia:running_nodes(),
-    slave:stop(Z).
-
-cluster_remove(_) ->
-    Z = slave(emqttd, cluster_remove_z),
-    wait_running(Z),
-    Node = node(),
-    {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node),
-    ok = emqttd_cluster:join(Z),
-    [Z, Node] = emqttd_mnesia:running_nodes(),
-    ok = emqttd_cluster:remove(Z),
-    [Node] = emqttd_mnesia:running_nodes(),
-    slave:stop(Z).
-
-cluster_remove2(_) ->
-    Z = slave(emqttd, cluster_remove2_z),
-    wait_running(Z),
-    ok = emqttd_cluster:join(Z),
-    Node = node(),
-    [Z, Node] = emqttd_mnesia:running_nodes(),
-    ok = emqttd_cluster:remove(Z),
-    ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []),
-    [Node] = emqttd_mnesia:running_nodes(),
-    slave:stop(Z).
-
-cluster_node_down(_) ->
-    Z = slave(emqttd, cluster_node_down),
-    timer:sleep(1000),
-    wait_running(Z),
-    ok = emqttd_cluster:join(Z),
-    ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]),
-    ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]),
-    Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
-    ct:log("Routes: ~p~n", [Routes]),
-    [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes],
-    slave:stop(Z),
-    timer:sleep(1000),
-    [] = lists:sort(emqttd_router:match(<<"a/b/c">>)).
 
 set_alarms(_) ->
     AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
@@ -551,8 +469,6 @@ set_alarms(_) ->
     emqttd_alarm:clear_alarm(<<"1">>),
     [] = emqttd_alarm:get_alarms().
 
-
-
 %%--------------------------------------------------------------------
 %% Cli group
 %%--------------------------------------------------------------------
@@ -705,34 +621,6 @@ cleanSession_validate1(_) ->
     emqttc:disconnect(Pub),
     emqttc:disconnect(C11).
 
-
-ensure_ok(ok) -> ok;
-ensure_ok({error, {already_started, _}}) -> ok.
-
-host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
-
-wait_running(Node) ->
-    wait_running(Node, 30000).
-
-wait_running(Node, Timeout) when Timeout < 0 ->
-    throw({wait_timeout, Node});
-
-wait_running(Node, Timeout) ->
-    case rpc:call(Node, emqttd, is_running, [Node]) of
-        true  -> ok;
-        false -> timer:sleep(100),
-                 wait_running(Node, Timeout - 100)
-    end.
-
-slave(emqttd, Node) ->
-    {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
-    rpc:call(Emq, application, ensure_all_started, [emqttd]),
-    Emq;
-
-slave(node, Node) ->
-    {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
-    N.
-
 emqttd_config(DataDir) ->
     Schema = cuttlefish_schema:files([filename:join([DataDir, "emqttd.schema"])]),
     Conf = conf_parse:file(filename:join([DataDir, "emqttd.conf"])),