Просмотр исходного кода

feat: add emqx_conf_schema i18n conf

Zhongwen Deng 3 лет назад
Родитель
Сommit
05b4ed58e0

Разница между файлами не показана из-за своего большого размера
+ 1494 - 1352
apps/emqx_conf/i18n/emqx_conf_schema.conf


+ 70 - 166
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -105,28 +105,27 @@ fields("cluster") ->
        sc(atom(),
           #{ mapping => "ekka.cluster_name"
            , default => emqxcl
-           , desc => "Human-friendly name of the EMQX cluster."
+           , desc => ?DESC(cluster_name)
            , 'readOnly' => true
            })}
     , {"discovery_strategy",
        sc(hoconsc:enum([manual, static, mcast, dns, etcd, k8s]),
           #{ default => manual
-           , desc => "Service discovery method for the cluster nodes."
+           , desc => ?DESC(cluster_discovery_strategy)
            , 'readOnly' => true
            })}
     , {"autoclean",
        sc(emqx_schema:duration(),
           #{ mapping => "ekka.cluster_autoclean"
            , default => "5m"
-           , desc => "Remove disconnected nodes from the cluster after this interval."
+           , desc => ?DESC(cluster_autoclean)
            , 'readOnly' => true
            })}
     , {"autoheal",
        sc(boolean(),
           #{ mapping => "ekka.cluster_autoheal"
            , default => true
-           , desc => "If <code>true</code>, the node will try to heal network partitions
- automatically."
+           , desc => ?DESC(cluster_autoheal)
            , 'readOnly' => true
           })}
     , {"proto_dist",
@@ -134,7 +133,7 @@ fields("cluster") ->
           #{ mapping => "ekka.proto_dist"
            , default => inet_tcp
            , 'readOnly' => true
-           , desc => "The Erlang distribution protocol for the cluster."
+           , desc => ?DESC(cluster_proto_dist)
           })}
     , {"static",
        sc(ref(cluster_static),
@@ -162,7 +161,7 @@ fields(cluster_static) ->
     [ {"seeds",
       sc(hoconsc:array(atom()),
          #{ default => []
-          , desc => "List EMQX node names in the static cluster. See <code>node.name</code>."
+          , desc => ?DESC(cluster_static_seeds)
           , 'readOnly' => true
          })}
     ];
@@ -171,50 +170,49 @@ fields(cluster_mcast) ->
     [ {"addr",
        sc(string(),
           #{ default => "239.192.0.1"
-           , desc => "Multicast IPv4 address."
+           , desc => ?DESC(cluster_mcast_addr)
            , 'readOnly' => true
           })}
     , {"ports",
        sc(hoconsc:array(integer()),
           #{ default => [4369, 4370]
            , 'readOnly' => true
-           , desc => "List of UDP ports used for service discovery.<br/>
-Note: probe messages are broadcast to all the specified ports."
+           , desc => ?DESC(cluster_mcast_ports)
            })}
     , {"iface",
        sc(string(),
           #{ default => "0.0.0.0"
-           , desc => "Local IP address the node discovery service needs to bind to."
+           , desc => ?DESC(cluster_mcast_iface)
            , 'readOnly' => true
           })}
     , {"ttl",
        sc(range(0, 255),
           #{ default => 255
-           , desc => "Time-to-live (TTL) for the outgoing UDP datagrams."
+           , desc => ?DESC(cluster_mcast_ttl)
            , 'readOnly' => true
           })}
     , {"loop",
        sc(boolean(),
           #{ default => true
-           , desc => "If <code>true</code>, loop UDP datagrams back to the local socket."
+           , desc => ?DESC(cluster_mcast_loop)
            , 'readOnly' => true
           })}
     , {"sndbuf",
        sc(emqx_schema:bytesize(),
           #{ default => "16KB"
-           , desc => "Size of the kernel-level buffer for outgoing datagrams."
+           , desc => ?DESC(cluster_mcast_sndbuf)
            , 'readOnly' => true
           })}
     , {"recbuf",
        sc(emqx_schema:bytesize(),
           #{ default => "16KB"
-           , desc => "Size of the kernel-level buffer for incoming datagrams."
+           , desc => ?DESC(cluster_mcast_recbuf)
            , 'readOnly' => true
           })}
     , {"buffer",
        sc(emqx_schema:bytesize(),
           #{ default =>"32KB"
-           , desc => "Size of the user-level buffer."
+           , desc => ?DESC(cluster_mcast_buffer)
            , 'readOnly' => true
           })}
     ];
@@ -223,13 +221,13 @@ fields(cluster_dns) ->
     [ {"name",
        sc(string(),
           #{ default => "localhost"
-           , desc => "The domain name of the EMQX cluster."
+           , desc => ?DESC(cluster_dns_name)
            , 'readOnly' => true
           })}
     , {"app",
        sc(string(),
           #{ default => "emqx"
-           , desc => "The symbolic name of the EMQX service."
+           , desc => ?DESC(cluster_dns_app)
            , 'readOnly' => true
            })}
     ];
@@ -237,25 +235,24 @@ fields(cluster_dns) ->
 fields(cluster_etcd) ->
     [ {"server",
        sc(emqx_schema:comma_separated_list(),
-          #{ desc => "List of endpoint URLs of the etcd cluster"
+          #{ desc => ?DESC(cluster_etcd_server)
            , 'readOnly' => true
            })}
     , {"prefix",
        sc(string(),
           #{ default => "emqxcl"
-           , desc => "Key prefix used for EMQX service discovery."
+           , desc => ?DESC(cluster_etcd_prefix)
            , 'readOnly' => true
            })}
     , {"node_ttl",
        sc(emqx_schema:duration(),
           #{ default => "1m"
            , 'readOnly' => true
-           , desc => "Expiration time of the etcd key associated with the node.
-It is refreshed automatically, as long as the node is alive."
+           , desc => ?DESC(cluster_etcd_node_ttl)
            })}
     , {"ssl",
        sc(hoconsc:ref(emqx_schema, ssl_client_opts),
-          #{ desc => "Options for the TLS connection to the etcd cluster."
+          #{ desc => ?DESC(cluster_etcd_ssl)
            , 'readOnly' => true
            })}
     ];
@@ -263,42 +260,37 @@ It is refreshed automatically, as long as the node is alive."
 fields(cluster_k8s) ->
     [ {"apiserver",
        sc(string(),
-          #{ desc => "Kubernetes API endpoint URL."
+          #{ desc => ?DESC(cluster_k8s_apiserver)
            , 'readOnly' => true
            })}
     , {"service_name",
        sc(string(),
           #{ default => "emqx"
-           , desc => "EMQX broker service name."
+           , desc => ?DESC(cluster_k8s_service_name)
            , 'readOnly' => true
            })}
     , {"address_type",
        sc(hoconsc:enum([ip, dns, hostname]),
-          #{ desc => "Address type used for connecting to the discovered nodes."
+          #{ desc => ?DESC(cluster_k8s_address_type)
            , 'readOnly' => true
            })}
     , {"app_name",
        sc(string(),
           #{ default => "emqx"
            , 'readOnly' => true
-           , desc => "This parameter should be set to the part of the <code>node.name</code>
-before the '@'.<br/>
-For example, if the <code>node.name</code> is <code>emqx@127.0.0.1</code>, then this parameter
-should be set to <code>emqx</code>."
+           , desc => ?DESC(cluster_k8s_app_name)
            })}
     , {"namespace",
        sc(string(),
           #{ default => "default"
-           , desc => "Kubernetes namespace."
+           , desc => ?DESC(cluster_k8s_namespace)
            , 'readOnly' => true
            })}
     , {"suffix",
        sc(string(),
           #{ default => "pod.local"
            , 'readOnly' => true
-           , desc => "Node name suffix.<br/>
-Note: this parameter is only relevant when <code>address_type</code> is <code>dns</code>
-or <code>hostname</code>."
+           , desc => ?DESC(cluster_k8s_suffix)
            })}
     ];
 
@@ -307,8 +299,7 @@ fields("node") ->
        sc(string(),
           #{ default => "emqx@127.0.0.1"
            , 'readOnly' => true
-           ,  desc => "Unique name of the EMQX node. It must follow <code>%name%@FQDN</code> or
- <code>%name%@IPv4</code> format."
+           ,  desc => ?DESC(node_name)
            })}
     , {"cookie",
        sc(string(),
@@ -316,65 +307,47 @@ fields("node") ->
              default => "emqxsecretcookie",
              'readOnly' => true,
              sensitive => true,
-             desc => "Secret cookie is a random string that should be the same on all nodes in
- the given EMQX cluster, but unique per EMQX cluster. It is used to prevent EMQX nodes that
- belong to different clusters from accidentally connecting to each other."
+             desc => ?DESC(node_cookie)
            })}
     , {"data_dir",
        sc(string(),
           #{ required => true,
              'readOnly' => true,
              mapping => "emqx.data_dir",
-             desc =>
-"""
-Path to the persistent data directory.
-Possible auto-created subdirectories are:
-  - `mnesia/\<node_name>`: EMQX's built-in database directory.
-    For example, `mnesia/emqx@127.0.0.1`.
-    There should be only one such subdirectory.
-    Meaning, in case the node is to be renamed (to e.g. `emqx@10.0.1.1`),
-    the old dir should be deleted first.
-  - `configs`: Generated configs at boot time, and cluster/local override configs.
-  - `patches`: Hot-patch beam files are to be placed here.
-  - `trace`: Trace log files.
-
-**NOTE**: One data dir cannot be shared by two or more EMQX nodes.
-"""
+             desc => ?DESC(node_data_dir)
            })}
     , {"config_files",
        sc(list(string()),
           #{ mapping => "emqx.config_files"
            , default => undefined
            , 'readOnly' => true
-           , desc => "List of configuration files that are read during startup. The order is
- significant: later configuration files override the previous ones."
+           , desc => ?DESC(node_config_files)
            })}
     , {"global_gc_interval",
        sc(emqx_schema:duration(),
          #{ mapping => "emqx_machine.global_gc_interval"
           , default => "15m"
-          , desc => "Periodic garbage collection interval."
+          , desc => ?DESC(node_global_gc_interval)
           , 'readOnly' => true
           })}
     , {"crash_dump_file",
        sc(file(),
           #{ mapping => "vm_args.-env ERL_CRASH_DUMP"
-           , desc => "Location of the crash dump file"
+           , desc => ?DESC(node_crash_dump_file)
            , 'readOnly' => true
            })}
     , {"crash_dump_seconds",
        sc(emqx_schema:duration_s(),
           #{ mapping => "vm_args.-env ERL_CRASH_DUMP_SECONDS"
            , default => "30s"
-           , desc => "The number of seconds that the broker is allowed to spend writing
-a crash dump"
+           , desc => ?DESC(node_crash_dump_seconds)
            , 'readOnly' => true
            })}
     , {"crash_dump_bytes",
        sc(emqx_schema:bytesize(),
           #{ mapping => "vm_args.-env ERL_CRASH_DUMP_BYTES"
            , default => "100MB"
-           , desc => "The maximum size of a crash dump file in bytes."
+           , desc => ?DESC(node_crash_dump_bytes)
            , 'readOnly' => true
            })}
     , {"dist_net_ticktime",
@@ -382,28 +355,25 @@ a crash dump"
           #{ mapping => "vm_args.-kernel net_ticktime"
            , default => "2m"
            , 'readOnly' => true
-           , desc => "This is the approximate time an EMQX node may be unresponsive "
-                     "until it is considered down and thereby disconnected."
+           , desc => ?DESC(node_dist_net_ticktime)
            })}
     , {"backtrace_depth",
        sc(integer(),
           #{ mapping => "emqx_machine.backtrace_depth"
            , default => 23
            , 'readOnly' => true
-           , desc => "Maximum depth of the call stack printed in error messages and
- <code>process_info</code>."
+           , desc => ?DESC(node_backtrace_depth)
            })}
     , {"applications",
        sc(emqx_schema:comma_separated_atoms(),
           #{ mapping => "emqx_machine.applications"
            , default => []
            , 'readOnly' => true
-           , desc => "List of Erlang applications that shall be rebooted when the EMQX broker joins
- the cluster."
+           , desc => ?DESC(node_applications)
            })}
     , {"etc_dir",
        sc(string(),
-          #{ desc => "<code>etc</code> dir for the node"
+          #{ desc => ?DESC(node_etc_dir)
            , 'readOnly' => true
            }
          )}
@@ -420,81 +390,45 @@ fields("db") ->
           #{ mapping => "mria.db_backend"
            , default => rlog
            , 'readOnly' => true
-           , desc => """
-Select the backend for the embedded database.<br/>
-<code>rlog</code> is the default backend,
-that is suitable for very large clusters.<br/>
-<code>mnesia</code> is a backend that offers decent performance in small clusters.
-"""
+           , desc => ?DESC(db_backend)
            })}
     , {"role",
        sc(hoconsc:enum([core, replicant]),
           #{ mapping => "mria.node_role"
            , default => core
            , 'readOnly' => true
-           , desc => """
-Select a node role.<br/>
-<code>core</code> nodes provide durability of the data, and take care of writes.
-It is recommended to place core nodes in different racks or different availability zones.<br/>
-<code>replicant</code> nodes are ephemeral worker nodes. Removing them from the cluster
-doesn't affect database redundancy<br/>
-It is recommended to have more replicant nodes than core nodes.<br/>
-Note: this parameter only takes effect when the <code>backend</code> is set
-to <code>rlog</code>.
-"""
+           , desc => ?DESC(db_role)
            })}
     , {"core_nodes",
        sc(emqx_schema:comma_separated_atoms(),
           #{ mapping => "mria.core_nodes"
            , default => []
            , 'readOnly' => true
-           , desc => """
-List of core nodes that the replicant will connect to.<br/>
-Note: this parameter only takes effect when the <code>backend</code> is set
-to <code>rlog</code> and the <code>role</code> is set to <code>replicant</code>.<br/>
-This value needs to be defined for manual or static cluster discovery mechanisms.<br/>
-If an automatic cluster discovery mechanism is being used (such as <code>etcd</code>),
-there is no need to set this value.
-"""
+           , desc => ?DESC(db_core_nodes)
            })}
     , {"rpc_module",
        sc(hoconsc:enum([gen_rpc, rpc]),
           #{ mapping => "mria.rlog_rpc_module"
            , default => gen_rpc
            , 'readOnly' => true
-           , desc => """
-Protocol used for pushing transaction logs to the replicant nodes.
-"""
+           , desc => ?DESC(db_rpc_module)
            })}
     , {"tlog_push_mode",
        sc(hoconsc:enum([sync, async]),
           #{ mapping => "mria.tlog_push_mode"
            , default => async
            , 'readOnly' => true
-           , desc => """
-In sync mode the core node waits for an ack from the replicant nodes before sending the next
-transaction log entry.
-"""
+           , desc => ?DESC(db_tlog_push_mode)
            })}
     , {"default_shard_transport",
        sc(hoconsc:enum([gen_rpc, distr]),
           #{ mapping => "mria.shard_transport"
            , default => gen_rpc
-           , desc =>
-               "Defines the default transport for pushing transaction logs.<br/>"
-               "This may be overridden on a per-shard basis in <code>db.shard_transports</code>."
-               "<code>gen_rpc</code> uses the <code>gen_rpc</code> library, "
-               "<code>distr</code> uses the Erlang distribution.<br/>"
+           , desc => ?DESC(db_default_shard_transport)
            })}
     , {"shard_transports",
        sc(map(shard, hoconsc:enum([gen_rpc, distr])),
-          #{ desc =>
-               "Allows to tune the transport method used for transaction log replication, "
-               "on a per-shard basis.<br/>"
-               "<code>gen_rpc</code> uses the <code>gen_rpc</code> library, "
-               "<code>distr</code> uses the Erlang distribution.<br/>"
-               "If not specified, the default is to use the value "
-               "set in <code>db.default_shard_transport</code>."
+          #{ desc => ?DESC(db_shard_transports)
            , mapping => "emqx_machine.custom_shard_transports"
            , default => #{}
            })}
@@ -503,19 +437,17 @@ transaction log entry.
 fields("cluster_call") ->
     [ {"retry_interval",
        sc(emqx_schema:duration(),
-         #{ desc => "Time interval to retry after a failed call."
+         #{ desc => ?DESC(cluster_call_retry_interval)
           , default => "1s"
           })}
     , {"max_history",
        sc(range(1, 500),
-          #{ desc => "Retain the maximum number of completed transactions (for queries)."
+          #{ desc => ?DESC(cluster_call_max_history)
            , default => 100
            })}
     , {"cleanup_interval",
        sc(emqx_schema:duration(),
-          #{ desc =>
-"Time interval to clear completed but stale transactions.
-Ensure that the number of completed transactions is less than the <code>max_history</code>."
+          #{ desc => ?DESC(cluster_call_cleanup_interval)
            , default => "5m"
            })}
     ];
@@ -524,136 +456,118 @@ fields("rpc") ->
     [ {"mode",
        sc(hoconsc:enum([sync, async]),
           #{ default => async
-           , desc => "In <code>sync</code> mode the sending side waits for the ack from the "
-                     "receiving side."
+           , desc => ?DESC(rpc_mode)
            })}
     , {"driver",
        sc(hoconsc:enum([tcp, ssl]),
           #{ mapping => "gen_rpc.driver"
            , default => tcp
-           , desc => "Transport protocol used for inter-broker communication"
+           , desc => ?DESC(rpc_driver)
            })}
     , {"async_batch_size",
        sc(integer(),
           #{ mapping => "gen_rpc.max_batch_size"
            , default => 256
-           , desc => "The maximum number of batch messages sent in asynchronous mode. "
-                     "Note that this configuration does not work in synchronous mode."
+           , desc => ?DESC(rpc_async_batch_size)
            })}
     , {"port_discovery",
        sc(hoconsc:enum([manual, stateless]),
           #{ mapping => "gen_rpc.port_discovery"
            , default => stateless
-           , desc => "<code>manual</code>: discover ports by <code>tcp_server_port</code>.<br/>"
-                     "<code>stateless</code>: discover ports in a stateless manner, "
-                     "using the following algorithm. "
-                     "If node name is <code>emqxN@127.0.0.1</code>, where the N is an integer, "
-                     "then the listening port will be 5370 + N."
+           , desc => ?DESC(rpc_port_discovery)
            })}
     , {"tcp_server_port",
        sc(integer(),
           #{ mapping => "gen_rpc.tcp_server_port"
            , default => 5369
-           , desc => "Listening port used by RPC local service.<br/> "
-                     "Note that this config only takes effect when rpc.port_discovery "
-                     "is set to manual."
+           , desc => ?DESC(rpc_tcp_server_port)
            })}
     , {"ssl_server_port",
        sc(integer(),
           #{ mapping => "gen_rpc.ssl_server_port"
            , default => 5369
-           , desc => "Listening port used by RPC local service.<br/> "
-                     "Note that this config only takes effect when rpc.port_discovery "
-                     "is set to manual and <code>driver</code> is set to <code>ssl</code>."
+           , desc => ?DESC(rpc_ssl_server_port)
            })}
     , {"tcp_client_num",
        sc(range(1, 256),
           #{ default => 10
-           , desc => "Set the maximum number of RPC communication channels initiated by this node "
-                     "to each remote node."
+           , desc => ?DESC(rpc_tcp_client_num)
            })}
     , {"connect_timeout",
        sc(emqx_schema:duration(),
           #{ mapping => "gen_rpc.connect_timeout"
            , default => "5s"
-           , desc => "Timeout for establishing an RPC connection."
+           , desc => ?DESC(rpc_connect_timeout)
            })}
     , {"certfile",
        sc(file(),
           #{ mapping => "gen_rpc.certfile"
-           , desc => "Path to TLS certificate file used to validate identity of the cluster nodes. "
-                     "Note that this config only takes effect when <code>rpc.driver</code> "
-                     "is set to <code>ssl</code>."
+           , desc => ?DESC(rpc_certfile)
            })}
     , {"keyfile",
        sc(file(),
           #{ mapping => "gen_rpc.keyfile"
-           , desc => "Path to the private key file for the <code>rpc.certfile</code>.<br/>"
-                     "Note: contents of this file are secret, so "
-                     "it's necessary to set permissions to 600."
+           , desc => ?DESC(rpc_keyfile)
            })}
     , {"cacertfile",
        sc(file(),
           #{ mapping => "gen_rpc.cacertfile"
-           , desc => "Path to certification authority TLS certificate file used to validate "
-                     "<code>rpc.certfile</code>.<br/>"
-                     "Note: certificates of all nodes in the cluster must be signed by the same CA."
+           , desc => ?DESC(rpc_cacertfile)
            })}
     , {"send_timeout",
        sc(emqx_schema:duration(),
           #{ mapping => "gen_rpc.send_timeout"
            , default => "5s"
-           , desc => "Timeout for sending the RPC request."
+           , desc => ?DESC(rpc_send_timeout)
            })}
     , {"authentication_timeout",
        sc(emqx_schema:duration(),
           #{ mapping=> "gen_rpc.authentication_timeout"
            , default => "5s"
-           , desc => "Timeout for the remote node authentication."
+           , desc => ?DESC(rpc_authentication_timeout)
            })}
     , {"call_receive_timeout",
        sc(emqx_schema:duration(),
           #{ mapping => "gen_rpc.call_receive_timeout"
            , default => "15s"
-           , desc => "Timeout for the reply to a synchronous RPC."
+           , desc => ?DESC(rpc_call_receive_timeout)
            })}
     , {"socket_keepalive_idle",
        sc(emqx_schema:duration_s(),
           #{ mapping => "gen_rpc.socket_keepalive_idle"
            , default => "7200s"
-           , desc => "How long the connections between the brokers "
-                     "should remain open after the last message is sent."
+              , desc => ?DESC(rpc_socket_keepalive_idle)
+
            })}
     , {"socket_keepalive_interval",
        sc(emqx_schema:duration_s(),
           #{ mapping => "gen_rpc.socket_keepalive_interval"
            , default => "75s"
-           , desc => "The interval between keepalive messages."
+           , desc => ?DESC(rpc_socket_keepalive_interval)
            })}
     , {"socket_keepalive_count",
        sc(integer(),
           #{ mapping => "gen_rpc.socket_keepalive_count"
            , default => 9
-           , desc => "How many times the keepalive probe message can fail to receive a reply "
-                     "until the RPC connection is considered lost."
+           , desc =>           ?DESC(rpc_socket_keepalive_count)
            })}
     , {"socket_sndbuf",
        sc(emqx_schema:bytesize(),
           #{ mapping => "gen_rpc.socket_sndbuf"
            , default => "1MB"
-           , desc => "TCP tuning parameters. TCP sending buffer size."
+           , desc => ?DESC(rpc_socket_sndbuf)
            })}
     , {"socket_recbuf",
        sc(emqx_schema:bytesize(),
           #{ mapping => "gen_rpc.socket_recbuf"
            , default => "1MB"
-           , desc => "TCP tuning parameters. TCP receiving buffer size."
+           , desc => ?DESC(rpc_socket_recbuf)
            })}
     , {"socket_buffer",
        sc(emqx_schema:bytesize(),
           #{ mapping => "gen_rpc.socket_buffer"
            , default => "1MB"
-           , desc => "TCP tuning parameters. Socket buffer size in user mode."
+           , desc => ?DESC(rpc_socket_buffer)
            })}
     ];
 
@@ -1094,15 +1008,5 @@ emqx_schema_high_prio_roots() ->
     Roots = emqx_schema:roots(high),
     Authz = {"authorization",
              sc(hoconsc:ref(?MODULE, "authorization"),
-             #{ desc => """
-Authorization a.k.a. ACL.<br>
-In EMQX, MQTT client access control is extremely flexible.<br>
-An out-of-the-box set of authorization data sources are supported.
-For example,<br>
-'file' source is to support concise and yet generic ACL rules in a file;<br>
-'built_in_database' source can be used to store per-client customizable rule sets,
-natively in the EMQX node;<br>
-'http' source to make EMQX call an external HTTP API to make the decision;<br>
-'PostgreSQL' etc. to look up clients or rules from external databases;<br>
-""" })},
+             #{ desc => ?DESC(authorization)})},
     lists:keyreplace("authorization", 1, Roots, Authz).

+ 15 - 9
apps/emqx_dashboard/src/emqx_dashboard_monitor.erl

@@ -313,7 +313,7 @@ next_interval() ->
 sample(Time) ->
     Fun =
         fun(Key, Res) ->
-            maps:put(Key, value(Key), Res)
+            maps:put(Key, getstats(Key), Res)
         end,
     Data = lists:foldl(Fun, #{}, ?SAMPLER_LIST),
     #emqx_monit{time = Time, data = Data}.
@@ -362,11 +362,17 @@ count_map(M1, M2) ->
         end,
     lists:foldl(Fun, #{}, ?SAMPLER_LIST).
 
-value(connections) -> emqx_stats:getstat('connections.count');
-value(topics) -> emqx_stats:getstat('topics.count');
-value(subscriptions) -> emqx_stats:getstat('subscriptions.count');
-value(received) -> emqx_metrics:val('messages.received');
-value(received_bytes) -> emqx_metrics:val('bytes.received');
-value(sent) -> emqx_metrics:val('messages.sent');
-value(sent_bytes) -> emqx_metrics:val('bytes.sent');
-value(dropped) -> emqx_metrics:val('messages.dropped').
+getstats(Key) ->
+    %% Stats ets maybe not exist when ekka join.
+    try stats(Key)
+    catch _: _ -> 0
+    end.
+
+stats(connections) -> emqx_stats:getstat('connections.count');
+stats(topics) -> emqx_stats:getstat('topics.count');
+stats(subscriptions) -> emqx_stats:getstat('subscriptions.count');
+stats(received) -> emqx_metrics:val('messages.received');
+stats(received_bytes) -> emqx_metrics:val('bytes.received');
+stats(sent) -> emqx_metrics:val('messages.sent');
+stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
+stats(dropped) -> emqx_metrics:val('messages.dropped').