Просмотр исходного кода

refactor(config): split config and schema

Zaiming Shi 4 лет назад
Родитель
Сommit
522d8e0a4a

+ 0 - 699
apps/emqx/etc/emqx.conf

@@ -1,702 +1,3 @@
-## master-88df1713
-
-## NOTE: The configurations in this file will be overridden by
-## `<path-to-emqx-installation>/data/emqx_overrides.conf`
-
-##==================================================================
-## Node
-##==================================================================
-node {
-  ## Node name.
-  ## See: http://erlang.org/doc/reference_manual/distributed.html
-  ##
-  ## @doc node.name
-  ## ValueType: NodeName
-  ## Default: emqx@127.0.0.1
-  name: "emqx@127.0.0.1"
-
-  ## Cookie for distributed node communication.
-  ##
-  ## @doc node.cookie
-  ## ValueType: String
-  ## Default: emqxsecretcookie
-  cookie: emqxsecretcookie
-
-  ## Data dir for the node
-  ##
-  ## @doc node.data_dir
-  ## ValueType: Folder
-  ## Default: "{{ platform_data_dir }}/"
-  data_dir: "{{ platform_data_dir }}/"
-
-  ## Dir of crash dump file.
-  ##
-  ## @doc node.crash_dump_dir
-  ## ValueType: Folder
-  ## Default: "{{ platform_log_dir }}/"
-  crash_dump_dir: "{{ platform_log_dir }}/"
-
-  ## Global GC Interval.
-  ##
-  ## @doc node.global_gc_interval
-  ## ValueType: Duration
-  ## Default: 15m
-  global_gc_interval: 15m
-
-  ## Sets the net_kernel tick time in seconds.
-  ## Notice that all communicating nodes are to have the same
-  ## TickTime value specified.
-  ##
-  ## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
-  ##
-  ## @doc node.dist_net_ticktime
-  ## ValueType: Number
-  ## Default: 2m
-  dist_net_ticktime: 2m
-
-  ## Sets the port range for the listener socket of a distributed
-  ## Erlang node.
-  ## Note that if there are firewalls between clustered nodes, this
-  ## port segment for nodes’ communication should be allowed.
-  ##
-  ## See: http://www.erlang.org/doc/man/kernel_app.html
-  ##
-  ## @doc node.dist_listen_min
-  ## ValueType: Integer
-  ## Range: [1024,65535]
-  ## Default: 6369
-  dist_listen_min: 6369
-
-  ## Sets the port range for the listener socket of a distributed
-  ## Erlang node.
-  ## Note that if there are firewalls between clustered nodes, this
-  ## port segment for nodes’ communication should be allowed.
-  ##
-  ## See: http://www.erlang.org/doc/man/kernel_app.html
-  ##
-  ## @doc node.dist_listen_max
-  ## ValueType: Integer
-  ## Range: [1024,65535]
-  ## Default: 6369
-  dist_listen_max: 6369
-
-  ## Sets the maximum depth of call stack back-traces in the exit
-  ## reason element of 'EXIT' tuples.
-  ## The flag also limits the stacktrace depth returned by
-  ## process_info item current_stacktrace.
-  ##
-  ## @doc node.backtrace_depth
-  ## ValueType: Integer
-  ## Range: [0,1024]
-  ## Default: 23
-  backtrace_depth: 23
-
-}
-
-##==================================================================
-## Cluster
-##==================================================================
-cluster {
-  ## Cluster name.
-  ##
-  ## @doc cluster.name
-  ## ValueType: String
-  ## Default: emqxcl
-  name: emqxcl
-
-  ## Enable cluster autoheal from network partition.
-  ##
-  ## @doc cluster.autoheal
-  ## ValueType: Boolean
-  ## Default: true
-  autoheal: true
-
-  ## Autoclean down node. A down node will be removed from the cluster
-  ## if this value > 0.
-  ##
-  ## @doc cluster.autoclean
-  ## ValueType: Duration
-  ## Default: 5m
-  autoclean: 5m
-
-  ## Node discovery strategy to join the cluster.
-  ##
-  ## @doc cluster.discovery_strategy
-  ## ValueType: manual | static | mcast | dns | etcd | k8s
-  ##   - manual: Manual join command
-  ##   - static: Static node list
-  ##   - mcast:  IP Multicast
-  ##   - dns:    DNS A Record
-  ##   - etcd:   etcd
-  ##   - k8s:    Kubernetes
-  ##
-  ## Default: manual
-  discovery_strategy: manual
-
-  ##----------------------------------------------------------------
-  ## Cluster using static node list
-  ##----------------------------------------------------------------
-  static {
-    ## Node list of the cluster
-    ##
-    ## @doc cluster.static.seeds
-    ## ValueType: Array<NodeName>
-    ## Default: []
-    seeds: ["emqx1@127.0.0.1", "emqx2@127.0.0.1"]
-  }
-
-  ##----------------------------------------------------------------
-  ## Cluster using IP Multicast
-  ##----------------------------------------------------------------
-  mcast {
-    ## IP Multicast Address.
-    ##
-    ## @doc cluster.mcast.addr
-    ## ValueType: IPAddress
-    ## Default: "239.192.0.1"
-    addr: "239.192.0.1"
-
-    ## Multicast Ports.
-    ##
-    ## @doc cluster.mcast.ports
-    ## ValueType: Array<Port>
-    ## Default: [4369, 4370]
-    ports: [4369, 4370]
-
-    ## Multicast Iface.
-    ##
-    ## @doc cluster.mcast.iface
-    ## ValueType: IPAddress
-    ## Default: "0.0.0.0"
-    iface: "0.0.0.0"
-
-    ## Multicast Ttl.
-    ##
-    ## @doc cluster.mcast.ttl
-    ## ValueType: Integer
-    ## Range: [0,255]
-    ## Default: 255
-    ttl: 255
-
-    ## Multicast loop.
-    ##
-    ## @doc cluster.mcast.loop
-    ## ValueType: Boolean
-    ## Default: true
-    loop: true
-  }
-
-  ##----------------------------------------------------------------
-  ## Cluster using DNS A records
-  ##----------------------------------------------------------------
-  dns {
-    ## DNS name.
-    ##
-    ## @doc cluster.dns.name
-    ## ValueType: String
-    ## Default: localhost
-    name: localhost
-
-    ## The App name is used to build 'node.name' with IP address.
-    ##
-    ## @doc cluster.dns.app
-    ## ValueType: String
-    ## Default: emqx
-    app: emqx
-  }
-
-  ##----------------------------------------------------------------
-  ## Cluster using etcd
-  ##----------------------------------------------------------------
-  etcd {
-    ## Etcd server list, seperated by ','.
-    ##
-    ## @doc cluster.etcd.server
-    ## ValueType: URL
-    ## Required: true
-    server: "http://127.0.0.1:2379"
-
-    ## The prefix helps build nodes path in etcd. Each node in the cluster
-    ## will create a path in etcd: v2/keys/<prefix>/<name>/<node.name>
-    ##
-    ## @doc cluster.etcd.prefix
-    ## ValueType: String
-    ## Default: emqxcl
-    prefix: emqxcl
-
-    ## The TTL for node's path in etcd.
-    ##
-    ## @doc cluster.etcd.node_ttl
-    ## ValueType: Duration
-    ## Default: 1m
-    node_ttl: 1m
-
-    ## Path to the file containing the user's private PEM-encoded key.
-    ##
-    ## @doc cluster.etcd.ssl.keyfile
-    ## ValueType: File
-    ## Default: "{{ platform_etc_dir }}/certs/key.pem"
-    ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem"
-
-    ## Path to a file containing the user certificate.
-    ##
-    ## @doc cluster.etcd.ssl.certfile
-    ## ValueType: File
-    ## Default: "{{ platform_etc_dir }}/certs/cert.pem"
-    ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem"
-
-    ## Path to the file containing PEM-encoded CA certificates. The CA certificates
-    ## are used during server authentication and when building the client certificate chain.
-    ##
-    ## @doc cluster.etcd.ssl.cacertfile
-    ## ValueType: File
-    ## Default: "{{ platform_etc_dir }}/certs/cacert.pem"
-    ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
-  }
-
-  ##----------------------------------------------------------------
-  ## Cluster using Kubernetes
-  ##----------------------------------------------------------------
-  k8s {
-    ## Kubernetes API server list, seperated by ','.
-    ##
-    ## @doc cluster.k8s.apiserver
-    ## ValueType: URL
-    ## Required: true
-    apiserver: "http://10.110.111.204:8080"
-
-    ## The service name helps lookup EMQ nodes in the cluster.
-    ##
-    ## @doc cluster.k8s.service_name
-    ## ValueType: String
-    ## Default: emqx
-    service_name: emqx
-
-    ## The address type is used to extract host from k8s service.
-    ##
-    ## @doc cluster.k8s.address_type
-    ## ValueType: ip | dns | hostname
-    ## Default: ip
-    address_type: ip
-
-    ## The app name helps build 'node.name'.
-    ##
-    ## @doc cluster.k8s.app_name
-    ## ValueType: String
-    ## Default: emqx
-    app_name: emqx
-
-    ## The suffix added to dns and hostname get from k8s service
-    ##
-    ## @doc cluster.k8s.suffix
-    ## ValueType: String
-    ## Default: "pod.local"
-    suffix: "pod.local"
-
-    ## Kubernetes Namespace
-    ##
-    ## @doc cluster.k8s.namespace
-    ## ValueType: String
-    ## Default: default
-    namespace: default
-  }
-
-  db_backend: mnesia
-
-  rlog: {
-      # role: core
-      # core_nodes: []
-  }
-
-}
-
-##==================================================================
-## Log
-##==================================================================
-log {
-  ## The primary log level
-  ##
-  ## - all the log messages with levels lower than this level will
-  ##   be dropped.
-  ## - all the log messages with levels higher than this level will
-  ##   go into the log handlers. The handlers then decide to log it
-  ##   out or drop it according to the level setting of the handler.
-  ##
-  ## Note: Only the messages with severity level higher than or
-  ## equal to this level will be logged.
-  ##
-  ## @doc log.primary_level
-  ## ValueType: debug | info | notice | warning | error | critical | alert | emergency
-  ## Default: warning
-  primary_level: warning
-
-  ##----------------------------------------------------------------
-  ## The console log handler send log messages to emqx console
-  ##----------------------------------------------------------------
-  ## Log to single line
-  ## @doc log.console_handler.enable
-  ## ValueType: Boolean
-  ## Default: false
-  console_handler.enable: false
-
-  ## The log level of this handler
-  ## All the log messages with levels lower than this level will
-  ## be dropped.
-  ##
-  ## @doc log.console_handler.level
-  ## ValueType: debug | info | notice | warning | error | critical | alert | emergency
-  ## Default: warning
-  console_handler.level: warning
-
-  ##----------------------------------------------------------------
-  ## The file log handlers send log messages to files
-  ##----------------------------------------------------------------
-  ## file_handlers.<name>
-  file_handlers.emqx_log: {
-    ## The log level filter of this handler
-    ## All the log messages with levels lower than this level will
-    ## be dropped.
-    ##
-    ## @doc log.file_handlers.<name>.level
-    ## ValueType: debug | info | notice | warning | error | critical | alert | emergency
-    ## Default: warning
-    level: warning
-
-    ## The log file for specified level.
-    ##
-    ## If `rotation` is disabled, this is the file of the log files.
-    ##
-    ## If `rotation` is enabled, this is the base name of the files.
-    ## Each file in a rotated log is named <base_name>.N, where N is an integer.
-    ##
-    ## Note: Log files for a specific log level will only contain all the logs
-    ##       that higher than or equal to that level
-    ##
-    ## @doc log.file_handlers.<name>.file
-    ## ValueType: File
-    ## Required: true
-    file: "{{ platform_log_dir }}/emqx.log"
-
-    ## Enables the log rotation.
-    ## With this enabled, new log files will be created when the current
-    ## log file is full, max to `rotation_count` files will be created.
-    ##
-    ## @doc log.file_handlers.<name>.rotation.enable
-    ## ValueType: Boolean
-    ## Default: true
-    rotation.enable: true
-
-    ## Maximum rotation count of log files.
-    ##
-    ## @doc log.file_handlers.<name>.rotation.count
-    ## ValueType: Integer
-    ## Range: [1, 2048]
-    ## Default: 10
-    rotation.count: 10
-
-    ## Maximum size of each log file.
-    ##
-    ## If the max_size reached and `rotation` is disabled, the handler
-    ## will stop sending log messages, if the `rotation` is enabled,
-    ## the file rotates.
-    ##
-    ## @doc log.file_handlers.<name>.max_size
-    ## ValueType: Size | infinity
-    ## Default: 10MB
-    max_size: 10MB
-  }
-
-  ## file_handlers.<name>
-  ##
-  ## You could also create multiple file handlers for different
-  ## log level for example:
-  file_handlers.emqx_error_log: {
-    level: error
-    file: "{{ platform_log_dir }}/error.log"
-  }
-
-  ## Timezone offset to display in logs
-  ##
-  ## @doc log.time_offset
-  ## ValueType: system | utc | String
-  ##  - "system" use system zone
-  ##  - "utc" for Universal Coordinated Time (UTC)
-  ##  - "+hh:mm" or "-hh:mm" for a specified offset
-  ## Default: system
-  time_offset: system
-
-  ## Limits the total number of characters printed for each log event.
-  ##
-  ## @doc log.chars_limit
-  ## ValueType: Integer | infinity
-  ## Range: [0, infinity)
-  ## Default: infinity
-  chars_limit: infinity
-
-  ## Maximum depth for Erlang term log formatting
-  ## and Erlang process message queue inspection.
-  ##
-  ## @doc log.max_depth
-  ## ValueType: Integer | infinity
-  ## Default: 80
-  max_depth: 80
-
-  ## Log formatter
-  ## @doc log.formatter
-  ## ValueType: text | json
-  ## Default: text
-  formatter: text
-
-  ## Log to single line
-  ## @doc log.single_line
-  ## ValueType: Boolean
-  ## Default: true
-  single_line: true
-
-  ## The max allowed queue length before switching to sync mode.
-  ##
-  ## Log overload protection parameter. If the message queue grows
-  ## larger than this value the handler switches from anync to sync mode.
-  ##
-  ## @doc log.sync_mode_qlen
-  ## ValueType: Integer
-  ## Range: [0, ${log.drop_mode_qlen}]
-  ## Default: 100
-  sync_mode_qlen: 100
-
-  ## The max allowed queue length before switching to drop mode.
-  ##
-  ## Log overload protection parameter. When the message queue grows
-  ## larger than this threshold, the handler switches to a mode in which
-  ## it drops all new events that senders want to log.
-  ##
-  ## @doc log.drop_mode_qlen
-  ## ValueType: Integer
-  ## Range: [${log.sync_mode_qlen}, ${log.flush_qlen}]
-  ## Default: 3000
-  drop_mode_qlen: 3000
-
-  ## The max allowed queue length before switching to flush mode.
-  ##
-  ## Log overload protection parameter. If the length of the message queue
-  ## grows larger than this threshold, a flush (delete) operation takes place.
-  ## To flush events, the handler discards the messages in the message queue
-  ## by receiving them in a loop without logging.
-  ##
-  ## @doc log.flush_qlen
-  ## ValueType: Integer
-  ## Range: [${log.drop_mode_qlen}, infinity)
-  ## Default: 8000
-  flush_qlen: 8000
-
-  ## Kill the log handler when it gets overloaded.
-  ##
-  ## Log overload protection parameter. It is possible that a handler,
-  ## even if it can successfully manage peaks of high load without crashing,
-  ## can build up a large message queue, or use a large amount of memory.
-  ## We could kill the log handler in these cases and restart it after a
-  ## few seconds.
-  ##
-  ## @doc log.overload_kill.enable
-  ## ValueType: Boolean
-  ## Default: true
-  overload_kill.enable: true
-
-  ## The max allowed queue length before killing the log hanlder.
-  ##
-  ## Log overload protection parameter. This is the maximum allowed queue
-  ## length. If the message queue grows larger than this, the handler
-  ## process is terminated.
-  ##
-  ## @doc log.overload_kill.qlen
-  ## ValueType: Integer
-  ## Range: [0, 1048576]
-  ## Default: 20000
-  overload_kill.qlen: 20000
-
-  ## The max allowed memory size before killing the log hanlder.
-  ##
-  ## Log overload protection parameter. This is the maximum memory size
-  ## that the handler process is allowed to use. If the handler grows
-  ## larger than this, the process is terminated.
-  ##
-  ## @doc log.overload_kill.mem_size
-  ## ValueType: Size
-  ## Default: 30MB
-  overload_kill.mem_size: 30MB
-
-  ## Restart the log hanlder after some seconds.
-  ##
-  ## Log overload protection parameter. If the handler is terminated,
-  ## it restarts automatically after a delay specified in seconds.
-  ##
-  ## @doc log.overload_kill.restart_after
-  ## ValueType: Duration
-  ## Default: 5s
-  overload_kill.restart_after: 5s
-
-  ## Controlling Bursts of Log Requests.
-  ##
-  ## Log overload protection parameter. Large bursts of log events - many
-  ## events received by the handler under a short period of time - can
-  ## potentially cause problems. By specifying the maximum number of events
-  ## to be handled within a certain time frame, the handler can avoid
-  ## choking the log with massive amounts of printouts.
-  ##
-  ## Note that there would be no warning if any messages were
-  ## dropped because of burst control.
-  ##
-  ## @doc log.burst_limit.enable
-  ## ValueType: Boolean
-  ## Default: false
-  burst_limit.enable: false
-
-  ## This config controls the maximum number of events to handle within
-  ## a time frame. After the limit is reached, successive events are
-  ## dropped until the end of the time frame defined by `window_time`.
-  ##
-  ## @doc log.burst_limit.max_count
-  ## ValueType: Integer
-  ## Default: 10000
-  burst_limit.max_count: 10000
-
-  ## See the previous description of burst_limit_max_count.
-  ##
-  ## @doc log.burst_limit.window_time
-  ## ValueType: duration
-  ## Default: 1s
-  burst_limit.window_time: 1s
-}
-
-##==================================================================
-## RPC
-##==================================================================
-rpc {
-  ## RPC Mode.
-  ##
-  ## @doc rpc.mode
-  ## ValueType: sync | async
-  ## Default: async
-  mode: async
-
-  ## Max batch size of async RPC requests.
-  ##
-  ## NOTE: RPC batch won't work when rpc.mode = sync
-  ## Zero value disables rpc batching.
-  ##
-  ## @doc rpc.async_batch_size
-  ## ValueType: Integer
-  ## Range: [0, 1048576]
-  ## Default: 0
-  async_batch_size: 256
-
-  ## RPC port discovery
-  ##
-  ## The strategy for discovering the RPC listening port of
-  ## other nodes.
-  ##
-  ## @doc cluster.discovery_strategy
-  ## ValueType: manual | stateless
-  ##   - manual: discover ports by `tcp_server_port`.
-  ##   - stateless: discover ports in a stateless manner.
-  ##     If node name is `emqx<N>@127.0.0.1`, where the `<N>` is
-  ##     an integer, then the listening port will be `5370 + <N>`
-  ##
-  ## Default: `stateless`.
-  port_discovery: stateless
-
-  ## TCP server port for RPC.
-  ##
-  ## Only takes effect when `rpc.port_discovery` = `manual`.
-  ##
-  ## @doc rpc.tcp_server_port
-  ## ValueType: Integer
-  ## Range: [1024-65535]
-  ## Defaults: 5369
-  tcp_server_port: 5369
-
-  ## Number of outgoing RPC connections.
-  ##
-  ## Set this to 1 to keep the message order sent from the same
-  ## client.
-  ##
-  ## @doc rpc.tcp_client_num
-  ## ValueType: Integer
-  ## Range: [1, 256]
-  ## Defaults: 1
-  tcp_client_num: 1
-
-  ## RCP Client connect timeout.
-  ##
-  ## @doc rpc.connect_timeout
-  ## ValueType: Duration
-  ## Default: 5s
-  connect_timeout: 5s
-
-  ## TCP send timeout of RPC client and server.
-  ##
-  ## @doc rpc.send_timeout
-  ## ValueType: Duration
-  ## Default: 5s
-  send_timeout: 5s
-
-  ## Authentication timeout
-  ##
-  ## @doc rpc.authentication_timeout
-  ## ValueType: Duration
-  ## Default: 5s
-  authentication_timeout: 5s
-
-  ## Default receive timeout for call() functions
-  ##
-  ## @doc rpc.call_receive_timeout
-  ## ValueType: Duration
-  ## Default: 15s
-  call_receive_timeout: 15s
-
-  ## Socket idle keepalive.
-  ##
-  ## @doc rpc.socket_keepalive_idle
-  ## ValueType: Duration
-  ## Default: 900s
-  socket_keepalive_idle: 900s
-
-  ## TCP Keepalive probes interval.
-  ##
-  ## @doc rpc.socket_keepalive_interval
-  ## ValueType: Duration
-  ## Default: 75s
-  socket_keepalive_interval: 75s
-
-  ## Probes lost to close the connection
-  ##
-  ## @doc rpc.socket_keepalive_count
-  ## ValueType: Integer
-  ## Default: 9
-  socket_keepalive_count: 9
-
-  ## Size of TCP send buffer.
-  ##
-  ## @doc rpc.socket_sndbuf
-  ## ValueType: Size
-  ## Default: 1MB
-  socket_sndbuf: 1MB
-
-  ## Size of TCP receive buffer.
-  ##
-  ## @doc rpc.socket_recbuf
-  ## ValueType: Size
-  ## Default: 1MB
-  socket_recbuf: 1MB
-
-  ## Size of user-level software socket buffer.
-  ##
-  ## @doc rpc.socket_buffer
-  ## ValueType: Size
-  ## Default: 1MB
-  socket_buffer: 1MB
-}
-
 ##==================================================================
 ## Broker
 ##==================================================================

+ 5 - 323
apps/emqx/src/emqx_schema.erl

@@ -24,7 +24,6 @@
 
 -include_lib("typerefl/include/types.hrl").
 
--type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all.
 -type duration() :: integer().
 -type duration_s() :: integer().
 -type duration_ms() :: integer().
@@ -60,179 +59,18 @@
 
 -behaviour(hocon_schema).
 
--reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0,
+-reflect_type([ duration/0, duration_s/0, duration_ms/0,
                 bytesize/0, wordsize/0, percent/0, file/0,
                 comma_separated_list/0, bar_separated_list/0, ip_port/0,
                 cipher/0,
                 comma_separated_atoms/0]).
 
--export([structs/0, fields/1, translations/0, translation/1]).
+-export([structs/0, fields/1]).
 -export([t/1, t/3, t/4, ref/1]).
 -export([conf_get/2, conf_get/3, keys/2, filter/1]).
 -export([ssl/1]).
 
-%% will be used by emqx_ct_helper to find the dependent apps
--export([includes/0, extra_schema_fields/1]).
-
-structs() -> ["cluster", "node", "rpc", "log",
-              "zones", "listeners", "broker",
-              "plugins", "sysmon", "alarm"]
-             ++ ?MODULE:includes().
-
--ifndef(EMQX_EXT_SCHEMAS).
-includes() -> [].
--else.
-includes() ->
-    [FieldName || {FieldName, _SchemaMod} <- ?EMQX_EXT_SCHEMAS].
--endif.
-
-fields("cluster") ->
-    [ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
-    , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]),
-        undefined, manual)}
-    , {"autoclean", t(duration(), "ekka.cluster_autoclean", "5m")}
-    , {"autoheal", t(boolean(), "ekka.cluster_autoheal", true)}
-    , {"static", ref("static")}
-    , {"mcast", ref("mcast")}
-    , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)}
-    , {"dns", ref("dns")}
-    , {"etcd", ref("etcd")}
-    , {"k8s", ref("k8s")}
-    , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
-    , {"rlog", ref("rlog")}
-    ];
-
-fields("static") ->
-    [ {"seeds", t(hoconsc:array(string()), undefined, [])}];
-
-fields("mcast") ->
-    [ {"addr", t(string(), undefined, "239.192.0.1")}
-    , {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])}
-    , {"iface", t(string(), undefined, "0.0.0.0")}
-    , {"ttl", t(range(0, 255), undefined, 255)}
-    , {"loop", t(boolean(), undefined, true)}
-    , {"sndbuf", t(bytesize(), undefined, "16KB")}
-    , {"recbuf", t(bytesize(), undefined, "16KB")}
-    , {"buffer", t(bytesize(), undefined, "32KB")}
-    ];
-
-fields("dns") ->
-    [ {"name", t(string(), undefined, "localhost")}
-    , {"app", t(string(), undefined, "emqx")}];
-
-fields("etcd") ->
-    [ {"server", t(comma_separated_list())}
-    , {"prefix", t(string(), undefined, "emqxcl")}
-    , {"node_ttl", t(duration(), undefined, "1m")}
-    , {"ssl", ref("etcd_ssl")}
-    ];
-
-fields("etcd_ssl") ->
-    ssl(#{});
-
-fields("k8s") ->
-    [ {"apiserver", t(string())}
-    , {"service_name", t(string(), undefined, "emqx")}
-    , {"address_type", t(union([ip, dns, hostname]))}
-    , {"app_name", t(string(), undefined, "emqx")}
-    , {"namespace", t(string(), undefined, "default")}
-    , {"suffix", t(string(), undefined, "pod.local")}
-    ];
-
-fields("rlog") ->
-    [ {"role", t(union([core, replicant]), "ekka.node_role", core)}
-    , {"core_nodes", t(comma_separated_atoms(), "ekka.core_nodes", [])}
-    ];
-
-fields("node") ->
-    [ {"name", hoconsc:t(string(), #{default => "emqx@127.0.0.1",
-                                     override_env => "EMQX_NODE_NAME"
-                                    })}
-    , {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie",
-                                       default => "emqxsecretcookie",
-                                       sensitive => true,
-                                       override_env => "EMQX_NODE_COOKIE"
-                                      })}
-    , {"data_dir", t(string())}
-    , {"config_files", t(comma_separated_list())}
-    , {"global_gc_interval", t(duration(), undefined, "15m")}
-    , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
-    , {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
-    , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
-    , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)}
-    , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)}
-    ];
-
-fields("rpc") ->
-    [ {"mode", t(union(sync, async), undefined, async)}
-    , {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)}
-    , {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)}
-    , {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)}
-    , {"tcp_client_num", t(range(1, 256), undefined, 1)}
-    , {"connect_timeout", t(duration(), "gen_rpc.connect_timeout", "5s")}
-    , {"send_timeout", t(duration(), "gen_rpc.send_timeout", "5s")}
-    , {"authentication_timeout", t(duration(), "gen_rpc.authentication_timeout", "5s")}
-    , {"call_receive_timeout", t(duration(), "gen_rpc.call_receive_timeout", "15s")}
-    , {"socket_keepalive_idle", t(duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")}
-    , {"socket_keepalive_interval", t(duration_s(), "gen_rpc.socket_keepalive_interval", "75s")}
-    , {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)}
-    , {"socket_sndbuf", t(bytesize(), "gen_rpc.socket_sndbuf", "1MB")}
-    , {"socket_recbuf", t(bytesize(), "gen_rpc.socket_recbuf", "1MB")}
-    , {"socket_buffer", t(bytesize(), "gen_rpc.socket_buffer", "1MB")}
-    ];
-
-fields("log") ->
-    [ {"primary_level", t(log_level(), undefined, warning)}
-    , {"console_handler", ref("console_handler")}
-    , {"file_handlers", ref("file_handlers")}
-    , {"time_offset", t(string(), undefined, "system")}
-    , {"chars_limit", maybe_infinity(range(1, inf))}
-    , {"supervisor_reports", t(union([error, progress]), undefined, error)}
-    , {"max_depth", t(union([infinity, integer()]),
-                      "kernel.error_logger_format_depth", 80)}
-    , {"formatter", t(union([text, json]), undefined, text)}
-    , {"single_line", t(boolean(), undefined, true)}
-    , {"sync_mode_qlen", t(integer(), undefined, 100)}
-    , {"drop_mode_qlen", t(integer(), undefined, 3000)}
-    , {"flush_qlen", t(integer(), undefined, 8000)}
-    , {"overload_kill", ref("log_overload_kill")}
-    , {"burst_limit", ref("log_burst_limit")}
-    , {"error_logger", t(atom(), "kernel.error_logger", silent)}
-    ];
-
-fields("console_handler") ->
-    [ {"enable", t(boolean(), undefined, false)}
-    , {"level", t(log_level(), undefined, warning)}
-    ];
-
-fields("file_handlers") ->
-    [ {"$name", ref("log_file_handler")}
-    ];
-
-fields("log_file_handler") ->
-    [ {"level", t(log_level(), undefined, warning)}
-    , {"file", t(file(), undefined, undefined)}
-    , {"rotation", ref("log_rotation")}
-    , {"max_size", maybe_infinity(bytesize(), "10MB")}
-    ];
-
-fields("log_rotation") ->
-    [ {"enable", t(boolean(), undefined, true)}
-    , {"count", t(range(1, 2048), undefined, 10)}
-    ];
-
-fields("log_overload_kill") ->
-    [ {"enable", t(boolean(), undefined, true)}
-    , {"mem_size", t(bytesize(), undefined, "30MB")}
-    , {"qlen", t(integer(), undefined, 20000)}
-    , {"restart_after", t(union(duration(), infinity), undefined, "5s")}
-    ];
-
-fields("log_burst_limit") ->
-    [ {"enable", t(boolean(), undefined, true)}
-    , {"max_count", t(integer(), undefined, 10000)}
-    , {"window_time", t(duration(), undefined, "1s")}
-    ];
+structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm"].
 
 fields("stats") ->
     [ {"enable", t(boolean(), undefined, true)}
@@ -480,20 +318,7 @@ fields("alarm") ->
     [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])}
     , {"size_limit", t(integer(), undefined, 1000)}
     , {"validity_period", t(duration(), undefined, "24h")}
-    ];
-
-fields(FieldName) ->
-    ?MODULE:extra_schema_fields(FieldName).
-
--ifndef(EMQX_EXT_SCHEMAS).
-%% Function extra_schema_fields/1 only terminates with explicit exception
--dialyzer([{nowarn_function, [extra_schema_fields/1]}]).
-extra_schema_fields(FieldName) -> error({unknown_field, FieldName}).
--else.
-extra_schema_fields(FieldName) ->
-    {_, Mod} = lists:keyfind(FieldName, 1, ?EMQX_EXT_SCHEMAS),
-    Mod:fields(FieldName).
--endif.
+    ].
 
 mqtt_listener() ->
     base_listener() ++
@@ -509,117 +334,6 @@ base_listener() ->
     , {"rate_limit", ref("rate_limit")}
     ].
 
-translations() -> ["ekka", "kernel", "emqx"].
-
-translation("ekka") ->
-    [ {"cluster_discovery", fun tr_cluster__discovery/1}];
-
-translation("kernel") ->
-    [ {"logger_level", fun tr_logger_level/1}
-    , {"logger", fun tr_logger/1}];
-
-translation("emqx") ->
-    [ {"config_files", fun tr_config_files/1}
-    ].
-
-tr_config_files(Conf) ->
-    case conf_get("emqx.config_files", Conf) of
-        [_ | _] = Files ->
-            Files;
-        _ ->
-            case os:getenv("RUNNER_ETC_DIR") of
-                false ->
-                    [filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])];
-                Dir ->
-                    [filename:join([Dir, "emqx.conf"])]
-            end
-    end.
-
-tr_cluster__discovery(Conf) ->
-    Strategy = conf_get("cluster.discovery_strategy", Conf),
-    {Strategy, filter(options(Strategy, Conf))}.
-
-tr_logger_level(Conf) -> conf_get("log.primary_level", Conf).
-
-tr_logger(Conf) ->
-    CharsLimit = case conf_get("log.chars_limit", Conf) of
-                     infinity -> unlimited;
-                     V -> V
-                 end,
-    SingleLine = conf_get("log.single_line", Conf),
-    FmtName = conf_get("log.formatter", Conf),
-    Formatter = formatter(FmtName, CharsLimit, SingleLine),
-    BasicConf = #{
-        sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf),
-        drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf),
-        flush_qlen => conf_get("log.flush_qlen", Conf),
-        overload_kill_enable => conf_get("log.overload_kill.enable", Conf),
-        overload_kill_qlen => conf_get("log.overload_kill.qlen", Conf),
-        overload_kill_mem_size => conf_get("log.overload_kill.mem_size", Conf),
-        overload_kill_restart_after => conf_get("log.overload_kill.restart_after", Conf),
-        burst_limit_enable => conf_get("log.burst_limit.enable", Conf),
-        burst_limit_max_count => conf_get("log.burst_limit.max_count", Conf),
-        burst_limit_window_time => conf_get("log.burst_limit.window_time", Conf)
-    },
-    Filters = case conf_get("log.supervisor_reports", Conf) of
-                  error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
-                  progress -> []
-              end,
-    %% For the default logger that outputs to console
-    ConsoleHandler =
-        case conf_get("log.console_handler.enable", Conf) of
-            true ->
-                [{handler, console, logger_std_h, #{
-                    level => conf_get("log.console_handler.level", Conf),
-                    config => BasicConf#{type => standard_io},
-                    formatter => Formatter,
-                    filters => Filters
-                }}];
-            false -> []
-        end,
-    %% For the file logger
-    FileHandlers =
-        [{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{
-                level => conf_get("level", SubConf),
-                config => BasicConf#{
-                    type => case conf_get("rotation.enable", SubConf) of
-                                true -> wrap;
-                                _ -> halt
-                            end,
-                    file => conf_get("file", SubConf),
-                    max_no_files => conf_get("rotation.count", SubConf),
-                    max_no_bytes => conf_get("max_size", SubConf)
-                },
-                formatter => Formatter,
-                filters => Filters,
-                filesync_repeat_interval => no_repeat
-            }}
-        || {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf, #{}))],
-
-    [{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
-
-%% helpers
-formatter(json, CharsLimit, SingleLine) ->
-    {emqx_logger_jsonfmt,
-        #{chars_limit => CharsLimit,
-            single_line => SingleLine
-        }};
-formatter(text, CharsLimit, SingleLine) ->
-    {emqx_logger_textfmt,
-        #{template =>
-        [time," [",level,"] ",
-            {clientid,
-                [{peername,
-                    [clientid,"@",peername," "],
-                    [clientid, " "]}],
-                [{peername,
-                    [peername," "],
-                    []}]},
-            msg,"\n"],
-            chars_limit => CharsLimit,
-            single_line => SingleLine
-        }}.
-
 %% utils
 -spec(conf_get(string() | [string()], hocon:config()) -> term()).
 conf_get(Key, Conf) ->
@@ -740,8 +454,7 @@ t(Type, Mapping, Default, OverrideEnv, Validator) ->
                      , validator => Validator
                      }).
 
-ref(Field) ->
-    fun (type) -> Field; (_) -> undefined end.
+ref(Field) -> hoconsc:t(hoconsc:ref(?MODULE, Field)).
 
 maybe_disabled(T) ->
     maybe_sth(disabled, T, disabled).
@@ -817,37 +530,6 @@ to_erl_cipher_suite(Str) ->
         Cipher -> Cipher
     end.
 
-options(static, Conf) ->
-    [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}];
-options(mcast, Conf) ->
-    {ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
-    {ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
-    Ports = conf_get("cluster.mcast.ports", Conf),
-    [{addr, Addr}, {ports, Ports}, {iface, Iface},
-     {ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
-     {loop, conf_get("cluster.mcast.loop", Conf, true)}];
-options(dns, Conf) ->
-    [{name, conf_get("cluster.dns.name", Conf)},
-     {app, conf_get("cluster.dns.app", Conf)}];
-options(etcd, Conf) ->
-    Namespace = "cluster.etcd.ssl",
-    SslOpts = fun(C) ->
-        Options = keys(Namespace, C),
-        lists:map(fun(Key) -> {to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end,
-    [{server, conf_get("cluster.etcd.server", Conf)},
-     {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")},
-     {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)},
-     {ssl_options, filter(SslOpts(Conf))}];
-options(k8s, Conf) ->
-    [{apiserver, conf_get("cluster.k8s.apiserver", Conf)},
-     {service_name, conf_get("cluster.k8s.service_name", Conf)},
-     {address_type, conf_get("cluster.k8s.address_type", Conf, ip)},
-     {app_name, conf_get("cluster.k8s.app_name", Conf)},
-     {namespace, conf_get("cluster.k8s.namespace", Conf)},
-     {suffix, conf_get("cluster.k8s.suffix", Conf, "")}];
-options(manual, _Conf) ->
-    [].
-
 to_atom(Atom) when is_atom(Atom) ->
     Atom;
 to_atom(Str) when is_list(Str) ->

+ 696 - 0
apps/emqx_machine/etc/emqx_machine.conf

@@ -0,0 +1,696 @@
+## NOTE: The configurations in this file will be overridden by
+## `<path-to-emqx-installation>/data/emqx_overrides.conf`
+
+##==================================================================
+## Node
+##==================================================================
+node {
+  ## Node name.
+  ## See: http://erlang.org/doc/reference_manual/distributed.html
+  ##
+  ## @doc node.name
+  ## ValueType: NodeName
+  ## Default: emqx@127.0.0.1
+  name: "emqx@127.0.0.1"
+
+  ## Cookie for distributed node communication.
+  ##
+  ## @doc node.cookie
+  ## ValueType: String
+  ## Default: emqxsecretcookie
+  cookie: emqxsecretcookie
+
+  ## Data dir for the node
+  ##
+  ## @doc node.data_dir
+  ## ValueType: Folder
+  ## Default: "{{ platform_data_dir }}/"
+  data_dir: "{{ platform_data_dir }}/"
+
+  ## Dir of crash dump file.
+  ##
+  ## @doc node.crash_dump_dir
+  ## ValueType: Folder
+  ## Default: "{{ platform_log_dir }}/"
+  crash_dump_dir: "{{ platform_log_dir }}/"
+
+  ## Global GC Interval.
+  ##
+  ## @doc node.global_gc_interval
+  ## ValueType: Duration
+  ## Default: 15m
+  global_gc_interval: 15m
+
+  ## Sets the net_kernel tick time in seconds.
+  ## Notice that all communicating nodes are to have the same
+  ## TickTime value specified.
+  ##
+  ## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
+  ##
+  ## @doc node.dist_net_ticktime
+  ## ValueType: Number
+  ## Default: 2m
+  dist_net_ticktime: 2m
+
+  ## Sets the port range for the listener socket of a distributed
+  ## Erlang node.
+  ## Note that if there are firewalls between clustered nodes, this
+  ## port segment for nodes’ communication should be allowed.
+  ##
+  ## See: http://www.erlang.org/doc/man/kernel_app.html
+  ##
+  ## @doc node.dist_listen_min
+  ## ValueType: Integer
+  ## Range: [1024,65535]
+  ## Default: 6369
+  dist_listen_min: 6369
+
+  ## Sets the port range for the listener socket of a distributed
+  ## Erlang node.
+  ## Note that if there are firewalls between clustered nodes, this
+  ## port segment for nodes’ communication should be allowed.
+  ##
+  ## See: http://www.erlang.org/doc/man/kernel_app.html
+  ##
+  ## @doc node.dist_listen_max
+  ## ValueType: Integer
+  ## Range: [1024,65535]
+  ## Default: 6369
+  dist_listen_max: 6369
+
+  ## Sets the maximum depth of call stack back-traces in the exit
+  ## reason element of 'EXIT' tuples.
+  ## The flag also limits the stacktrace depth returned by
+  ## process_info item current_stacktrace.
+  ##
+  ## @doc node.backtrace_depth
+  ## ValueType: Integer
+  ## Range: [0,1024]
+  ## Default: 23
+  backtrace_depth: 23
+
+}
+
+##==================================================================
+## Cluster
+##==================================================================
+cluster {
+  ## Cluster name.
+  ##
+  ## @doc cluster.name
+  ## ValueType: String
+  ## Default: emqxcl
+  name: emqxcl
+
+  ## Enable cluster autoheal from network partition.
+  ##
+  ## @doc cluster.autoheal
+  ## ValueType: Boolean
+  ## Default: true
+  autoheal: true
+
+  ## Autoclean down node. A down node will be removed from the cluster
+  ## if this value > 0.
+  ##
+  ## @doc cluster.autoclean
+  ## ValueType: Duration
+  ## Default: 5m
+  autoclean: 5m
+
+  ## Node discovery strategy to join the cluster.
+  ##
+  ## @doc cluster.discovery_strategy
+  ## ValueType: manual | static | mcast | dns | etcd | k8s
+  ##   - manual: Manual join command
+  ##   - static: Static node list
+  ##   - mcast:  IP Multicast
+  ##   - dns:    DNS A Record
+  ##   - etcd:   etcd
+  ##   - k8s:    Kubernetes
+  ##
+  ## Default: manual
+  discovery_strategy: manual
+
+  ##----------------------------------------------------------------
+  ## Cluster using static node list
+  ##----------------------------------------------------------------
+  static {
+    ## Node list of the cluster
+    ##
+    ## @doc cluster.static.seeds
+    ## ValueType: Array<NodeName>
+    ## Default: []
+    seeds: ["emqx1@127.0.0.1", "emqx2@127.0.0.1"]
+  }
+
+  ##----------------------------------------------------------------
+  ## Cluster using IP Multicast
+  ##----------------------------------------------------------------
+  mcast {
+    ## IP Multicast Address.
+    ##
+    ## @doc cluster.mcast.addr
+    ## ValueType: IPAddress
+    ## Default: "239.192.0.1"
+    addr: "239.192.0.1"
+
+    ## Multicast Ports.
+    ##
+    ## @doc cluster.mcast.ports
+    ## ValueType: Array<Port>
+    ## Default: [4369, 4370]
+    ports: [4369, 4370]
+
+    ## Multicast Iface.
+    ##
+    ## @doc cluster.mcast.iface
+    ## ValueType: IPAddress
+    ## Default: "0.0.0.0"
+    iface: "0.0.0.0"
+
+    ## Multicast Ttl.
+    ##
+    ## @doc cluster.mcast.ttl
+    ## ValueType: Integer
+    ## Range: [0,255]
+    ## Default: 255
+    ttl: 255
+
+    ## Multicast loop.
+    ##
+    ## @doc cluster.mcast.loop
+    ## ValueType: Boolean
+    ## Default: true
+    loop: true
+  }
+
+  ##----------------------------------------------------------------
+  ## Cluster using DNS A records
+  ##----------------------------------------------------------------
+  dns {
+    ## DNS name.
+    ##
+    ## @doc cluster.dns.name
+    ## ValueType: String
+    ## Default: localhost
+    name: localhost
+
+    ## The App name is used to build 'node.name' with IP address.
+    ##
+    ## @doc cluster.dns.app
+    ## ValueType: String
+    ## Default: emqx
+    app: emqx
+  }
+
+  ##----------------------------------------------------------------
+  ## Cluster using etcd
+  ##----------------------------------------------------------------
+  etcd {
+    ## Etcd server list, seperated by ','.
+    ##
+    ## @doc cluster.etcd.server
+    ## ValueType: URL
+    ## Required: true
+    server: "http://127.0.0.1:2379"
+
+    ## The prefix helps build nodes path in etcd. Each node in the cluster
+    ## will create a path in etcd: v2/keys/<prefix>/<name>/<node.name>
+    ##
+    ## @doc cluster.etcd.prefix
+    ## ValueType: String
+    ## Default: emqxcl
+    prefix: emqxcl
+
+    ## The TTL for node's path in etcd.
+    ##
+    ## @doc cluster.etcd.node_ttl
+    ## ValueType: Duration
+    ## Default: 1m
+    node_ttl: 1m
+
+    ## Path to the file containing the user's private PEM-encoded key.
+    ##
+    ## @doc cluster.etcd.ssl.keyfile
+    ## ValueType: File
+    ## Default: "{{ platform_etc_dir }}/certs/key.pem"
+    ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem"
+
+    ## Path to a file containing the user certificate.
+    ##
+    ## @doc cluster.etcd.ssl.certfile
+    ## ValueType: File
+    ## Default: "{{ platform_etc_dir }}/certs/cert.pem"
+    ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem"
+
+    ## Path to the file containing PEM-encoded CA certificates. The CA certificates
+    ## are used during server authentication and when building the client certificate chain.
+    ##
+    ## @doc cluster.etcd.ssl.cacertfile
+    ## ValueType: File
+    ## Default: "{{ platform_etc_dir }}/certs/cacert.pem"
+    ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
+  }
+
+  ##----------------------------------------------------------------
+  ## Cluster using Kubernetes
+  ##----------------------------------------------------------------
+  k8s {
+    ## Kubernetes API server list, seperated by ','.
+    ##
+    ## @doc cluster.k8s.apiserver
+    ## ValueType: URL
+    ## Required: true
+    apiserver: "http://10.110.111.204:8080"
+
+    ## The service name helps lookup EMQ nodes in the cluster.
+    ##
+    ## @doc cluster.k8s.service_name
+    ## ValueType: String
+    ## Default: emqx
+    service_name: emqx
+
+    ## The address type is used to extract host from k8s service.
+    ##
+    ## @doc cluster.k8s.address_type
+    ## ValueType: ip | dns | hostname
+    ## Default: ip
+    address_type: ip
+
+    ## The app name helps build 'node.name'.
+    ##
+    ## @doc cluster.k8s.app_name
+    ## ValueType: String
+    ## Default: emqx
+    app_name: emqx
+
+    ## The suffix added to dns and hostname get from k8s service
+    ##
+    ## @doc cluster.k8s.suffix
+    ## ValueType: String
+    ## Default: "pod.local"
+    suffix: "pod.local"
+
+    ## Kubernetes Namespace
+    ##
+    ## @doc cluster.k8s.namespace
+    ## ValueType: String
+    ## Default: default
+    namespace: default
+  }
+
+  db_backend: mnesia
+
+  rlog: {
+      # role: core
+      # core_nodes: []
+  }
+
+}
+
+##==================================================================
+## Log
+##==================================================================
+log {
+  ## The primary log level
+  ##
+  ## - all the log messages with levels lower than this level will
+  ##   be dropped.
+  ## - all the log messages with levels higher than this level will
+  ##   go into the log handlers. The handlers then decide to log it
+  ##   out or drop it according to the level setting of the handler.
+  ##
+  ## Note: Only the messages with severity level higher than or
+  ## equal to this level will be logged.
+  ##
+  ## @doc log.primary_level
+  ## ValueType: debug | info | notice | warning | error | critical | alert | emergency
+  ## Default: warning
+  primary_level: warning
+
+  ##----------------------------------------------------------------
+  ## The console log handler send log messages to emqx console
+  ##----------------------------------------------------------------
+  ## Log to single line
+  ## @doc log.console_handler.enable
+  ## ValueType: Boolean
+  ## Default: false
+  console_handler.enable: false
+
+  ## The log level of this handler
+  ## All the log messages with levels lower than this level will
+  ## be dropped.
+  ##
+  ## @doc log.console_handler.level
+  ## ValueType: debug | info | notice | warning | error | critical | alert | emergency
+  ## Default: warning
+  console_handler.level: warning
+
+  ##----------------------------------------------------------------
+  ## The file log handlers send log messages to files
+  ##----------------------------------------------------------------
+  ## file_handlers.<name>
+  file_handlers.emqx_log: {
+    ## The log level filter of this handler
+    ## All the log messages with levels lower than this level will
+    ## be dropped.
+    ##
+    ## @doc log.file_handlers.<name>.level
+    ## ValueType: debug | info | notice | warning | error | critical | alert | emergency
+    ## Default: warning
+    level: warning
+
+    ## The log file for specified level.
+    ##
+    ## If `rotation` is disabled, this is the file of the log files.
+    ##
+    ## If `rotation` is enabled, this is the base name of the files.
+    ## Each file in a rotated log is named <base_name>.N, where N is an integer.
+    ##
+    ## Note: Log files for a specific log level will only contain all the logs
+    ##       that higher than or equal to that level
+    ##
+    ## @doc log.file_handlers.<name>.file
+    ## ValueType: File
+    ## Required: true
+    file: "{{ platform_log_dir }}/emqx.log"
+
+    ## Enables the log rotation.
+    ## With this enabled, new log files will be created when the current
+    ## log file is full, max to `rotation_count` files will be created.
+    ##
+    ## @doc log.file_handlers.<name>.rotation.enable
+    ## ValueType: Boolean
+    ## Default: true
+    rotation.enable: true
+
+    ## Maximum rotation count of log files.
+    ##
+    ## @doc log.file_handlers.<name>.rotation.count
+    ## ValueType: Integer
+    ## Range: [1, 2048]
+    ## Default: 10
+    rotation.count: 10
+
+    ## Maximum size of each log file.
+    ##
+    ## If the max_size reached and `rotation` is disabled, the handler
+    ## will stop sending log messages, if the `rotation` is enabled,
+    ## the file rotates.
+    ##
+    ## @doc log.file_handlers.<name>.max_size
+    ## ValueType: Size | infinity
+    ## Default: 10MB
+    max_size: 10MB
+  }
+
+  ## file_handlers.<name>
+  ##
+  ## You could also create multiple file handlers for different
+  ## log level for example:
+  file_handlers.emqx_error_log: {
+    level: error
+    file: "{{ platform_log_dir }}/error.log"
+  }
+
+  ## Timezone offset to display in logs
+  ##
+  ## @doc log.time_offset
+  ## ValueType: system | utc | String
+  ##  - "system" use system zone
+  ##  - "utc" for Universal Coordinated Time (UTC)
+  ##  - "+hh:mm" or "-hh:mm" for a specified offset
+  ## Default: system
+  time_offset: system
+
+  ## Limits the total number of characters printed for each log event.
+  ##
+  ## @doc log.chars_limit
+  ## ValueType: Integer | infinity
+  ## Range: [0, infinity)
+  ## Default: infinity
+  chars_limit: infinity
+
+  ## Maximum depth for Erlang term log formatting
+  ## and Erlang process message queue inspection.
+  ##
+  ## @doc log.max_depth
+  ## ValueType: Integer | infinity
+  ## Default: 80
+  max_depth: 80
+
+  ## Log formatter
+  ## @doc log.formatter
+  ## ValueType: text | json
+  ## Default: text
+  formatter: text
+
+  ## Log to single line
+  ## @doc log.single_line
+  ## ValueType: Boolean
+  ## Default: true
+  single_line: true
+
+  ## The max allowed queue length before switching to sync mode.
+  ##
+  ## Log overload protection parameter. If the message queue grows
+  ## larger than this value the handler switches from anync to sync mode.
+  ##
+  ## @doc log.sync_mode_qlen
+  ## ValueType: Integer
+  ## Range: [0, ${log.drop_mode_qlen}]
+  ## Default: 100
+  sync_mode_qlen: 100
+
+  ## The max allowed queue length before switching to drop mode.
+  ##
+  ## Log overload protection parameter. When the message queue grows
+  ## larger than this threshold, the handler switches to a mode in which
+  ## it drops all new events that senders want to log.
+  ##
+  ## @doc log.drop_mode_qlen
+  ## ValueType: Integer
+  ## Range: [${log.sync_mode_qlen}, ${log.flush_qlen}]
+  ## Default: 3000
+  drop_mode_qlen: 3000
+
+  ## The max allowed queue length before switching to flush mode.
+  ##
+  ## Log overload protection parameter. If the length of the message queue
+  ## grows larger than this threshold, a flush (delete) operation takes place.
+  ## To flush events, the handler discards the messages in the message queue
+  ## by receiving them in a loop without logging.
+  ##
+  ## @doc log.flush_qlen
+  ## ValueType: Integer
+  ## Range: [${log.drop_mode_qlen}, infinity)
+  ## Default: 8000
+  flush_qlen: 8000
+
+  ## Kill the log handler when it gets overloaded.
+  ##
+  ## Log overload protection parameter. It is possible that a handler,
+  ## even if it can successfully manage peaks of high load without crashing,
+  ## can build up a large message queue, or use a large amount of memory.
+  ## We could kill the log handler in these cases and restart it after a
+  ## few seconds.
+  ##
+  ## @doc log.overload_kill.enable
+  ## ValueType: Boolean
+  ## Default: true
+  overload_kill.enable: true
+
+  ## The max allowed queue length before killing the log hanlder.
+  ##
+  ## Log overload protection parameter. This is the maximum allowed queue
+  ## length. If the message queue grows larger than this, the handler
+  ## process is terminated.
+  ##
+  ## @doc log.overload_kill.qlen
+  ## ValueType: Integer
+  ## Range: [0, 1048576]
+  ## Default: 20000
+  overload_kill.qlen: 20000
+
+  ## The max allowed memory size before killing the log hanlder.
+  ##
+  ## Log overload protection parameter. This is the maximum memory size
+  ## that the handler process is allowed to use. If the handler grows
+  ## larger than this, the process is terminated.
+  ##
+  ## @doc log.overload_kill.mem_size
+  ## ValueType: Size
+  ## Default: 30MB
+  overload_kill.mem_size: 30MB
+
+  ## Restart the log hanlder after some seconds.
+  ##
+  ## Log overload protection parameter. If the handler is terminated,
+  ## it restarts automatically after a delay specified in seconds.
+  ##
+  ## @doc log.overload_kill.restart_after
+  ## ValueType: Duration
+  ## Default: 5s
+  overload_kill.restart_after: 5s
+
+  ## Controlling Bursts of Log Requests.
+  ##
+  ## Log overload protection parameter. Large bursts of log events - many
+  ## events received by the handler under a short period of time - can
+  ## potentially cause problems. By specifying the maximum number of events
+  ## to be handled within a certain time frame, the handler can avoid
+  ## choking the log with massive amounts of printouts.
+  ##
+  ## Note that there would be no warning if any messages were
+  ## dropped because of burst control.
+  ##
+  ## @doc log.burst_limit.enable
+  ## ValueType: Boolean
+  ## Default: false
+  burst_limit.enable: false
+
+  ## This config controls the maximum number of events to handle within
+  ## a time frame. After the limit is reached, successive events are
+  ## dropped until the end of the time frame defined by `window_time`.
+  ##
+  ## @doc log.burst_limit.max_count
+  ## ValueType: Integer
+  ## Default: 10000
+  burst_limit.max_count: 10000
+
+  ## See the previous description of burst_limit_max_count.
+  ##
+  ## @doc log.burst_limit.window_time
+  ## ValueType: duration
+  ## Default: 1s
+  burst_limit.window_time: 1s
+}
+
+##==================================================================
+## RPC
+##==================================================================
+rpc {
+  ## RPC Mode.
+  ##
+  ## @doc rpc.mode
+  ## ValueType: sync | async
+  ## Default: async
+  mode: async
+
+  ## Max batch size of async RPC requests.
+  ##
+  ## NOTE: RPC batch won't work when rpc.mode = sync
+  ## Zero value disables rpc batching.
+  ##
+  ## @doc rpc.async_batch_size
+  ## ValueType: Integer
+  ## Range: [0, 1048576]
+  ## Default: 0
+  async_batch_size: 256
+
+  ## RPC port discovery
+  ##
+  ## The strategy for discovering the RPC listening port of
+  ## other nodes.
+  ##
+  ## @doc cluster.discovery_strategy
+  ## ValueType: manual | stateless
+  ##   - manual: discover ports by `tcp_server_port`.
+  ##   - stateless: discover ports in a stateless manner.
+  ##     If node name is `emqx<N>@127.0.0.1`, where the `<N>` is
+  ##     an integer, then the listening port will be `5370 + <N>`
+  ##
+  ## Default: `stateless`.
+  port_discovery: stateless
+
+  ## TCP server port for RPC.
+  ##
+  ## Only takes effect when `rpc.port_discovery` = `manual`.
+  ##
+  ## @doc rpc.tcp_server_port
+  ## ValueType: Integer
+  ## Range: [1024-65535]
+  ## Defaults: 5369
+  tcp_server_port: 5369
+
+  ## Number of outgoing RPC connections.
+  ##
+  ## Set this to 1 to keep the message order sent from the same
+  ## client.
+  ##
+  ## @doc rpc.tcp_client_num
+  ## ValueType: Integer
+  ## Range: [1, 256]
+  ## Defaults: 1
+  tcp_client_num: 1
+
+  ## RCP Client connect timeout.
+  ##
+  ## @doc rpc.connect_timeout
+  ## ValueType: Duration
+  ## Default: 5s
+  connect_timeout: 5s
+
+  ## TCP send timeout of RPC client and server.
+  ##
+  ## @doc rpc.send_timeout
+  ## ValueType: Duration
+  ## Default: 5s
+  send_timeout: 5s
+
+  ## Authentication timeout
+  ##
+  ## @doc rpc.authentication_timeout
+  ## ValueType: Duration
+  ## Default: 5s
+  authentication_timeout: 5s
+
+  ## Default receive timeout for call() functions
+  ##
+  ## @doc rpc.call_receive_timeout
+  ## ValueType: Duration
+  ## Default: 15s
+  call_receive_timeout: 15s
+
+  ## Socket idle keepalive.
+  ##
+  ## @doc rpc.socket_keepalive_idle
+  ## ValueType: Duration
+  ## Default: 900s
+  socket_keepalive_idle: 900s
+
+  ## TCP Keepalive probes interval.
+  ##
+  ## @doc rpc.socket_keepalive_interval
+  ## ValueType: Duration
+  ## Default: 75s
+  socket_keepalive_interval: 75s
+
+  ## Probes lost to close the connection
+  ##
+  ## @doc rpc.socket_keepalive_count
+  ## ValueType: Integer
+  ## Default: 9
+  socket_keepalive_count: 9
+
+  ## Size of TCP send buffer.
+  ##
+  ## @doc rpc.socket_sndbuf
+  ## ValueType: Size
+  ## Default: 1MB
+  socket_sndbuf: 1MB
+
+  ## Size of TCP receive buffer.
+  ##
+  ## @doc rpc.socket_recbuf
+  ## ValueType: Size
+  ## Default: 1MB
+  socket_recbuf: 1MB
+
+  ## Size of user-level software socket buffer.
+  ##
+  ## @doc rpc.socket_buffer
+  ## ValueType: Size
+  ## Default: 1MB
+  socket_buffer: 1MB
+}

+ 11 - 0
apps/emqx_machine/src/emqx_machine_app.erl

@@ -30,6 +30,8 @@ start(_Type, _Args) ->
     ok = print_otp_version_warning(),
     _ = load_modules(),
 
+    ok = load_config_files(),
+
     {ok, _} = application:ensure_all_started(emqx),
 
     _ = emqx_plugins:load(),
@@ -75,3 +77,12 @@ load_modules() ->
 start_modules() ->
     ok.
 -endif.
+
+load_config_files() ->
+    %% the app env 'config_files' for 'emqx` app should be set
+    %% in app.time.config by boot script before starting Erlang VM
+    ConfFiles = application:get_env(emqx, config_files, []),
+    %% emqx_machine_schema is a superset of emqx_schema
+    ok = emqx_config:init_load(emqx_machine_schema, ConfFiles),
+    %% to avoid config being loaded again when emqx app starts.
+    ok = emqx_app:set_init_config_load_done().

+ 421 - 0
apps/emqx_machine/src/emqx_machine_schema.erl

@@ -0,0 +1,421 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_machine_schema).
+
+-dialyzer(no_return).
+-dialyzer(no_match).
+-dialyzer(no_contracts).
+-dialyzer(no_unused).
+-dialyzer(no_fail_call).
+
+-include_lib("typerefl/include/types.hrl").
+
+-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all.
+-type file() :: string().
+-type cipher() :: map().
+
+-behaviour(hocon_schema).
+
+-reflect_type([ log_level/0,
+                file/0,
+                cipher/0]).
+
+-export([structs/0, fields/1, translations/0, translation/1]).
+-export([t/1, t/3, t/4, ref/1]).
+-export([conf_get/2, conf_get/3, keys/2, filter/1]).
+
+%% Static apps which merge their configs into the merged emqx.conf
+%% The list can not be made a dynamic read at run-time as it is used
+%% by nodetool to generate app.<time>.config before EMQ X is started
+-define(MERGED_CONFIGS,
+        [ emqx_schema
+        , emqx_data_bridge_schema
+        , emqx_retainer_schema
+        , emqx_statsd_schema
+        , emqx_authn_schema
+        , emqx_authz_schema
+        , emqx_bridge_mqtt_schema
+        , emqx_modules_schema
+        , emqx_management_schema
+        , emqx_dashboard_schema
+        , emqx_gateway_schema
+        , emqx_prometheus_schema
+        ]).
+
+%% TODO: add a test case to ensure the list elements are unique
+structs() ->
+    ["cluster", "node", "rpc", "log"]
+    ++ lists:flatmap(fun(Mod) -> Mod:structs() end, ?MERGED_CONFIGS).
+
+fields("cluster") ->
+    [ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
+    , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]),
+        undefined, manual)}
+    , {"autoclean", t(emqx_schema:duration(), "ekka.cluster_autoclean", "5m")}
+    , {"autoheal", t(boolean(), "ekka.cluster_autoheal", true)}
+    , {"static", ref("static")}
+    , {"mcast", ref("mcast")}
+    , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)}
+    , {"dns", ref("dns")}
+    , {"etcd", ref("etcd")}
+    , {"k8s", ref("k8s")}
+    , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
+    , {"rlog", ref("rlog")}
+    ];
+
+fields("static") ->
+    [ {"seeds", t(hoconsc:array(string()), undefined, [])}];
+
+fields("mcast") ->
+    [ {"addr", t(string(), undefined, "239.192.0.1")}
+    , {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])}
+    , {"iface", t(string(), undefined, "0.0.0.0")}
+    , {"ttl", t(range(0, 255), undefined, 255)}
+    , {"loop", t(boolean(), undefined, true)}
+    , {"sndbuf", t(emqx_schema:bytesize(), undefined, "16KB")}
+    , {"recbuf", t(emqx_schema:bytesize(), undefined, "16KB")}
+    , {"buffer", t(emqx_schema:bytesize(), undefined, "32KB")}
+    ];
+
+fields("dns") ->
+    [ {"name", t(string(), undefined, "localhost")}
+    , {"app", t(string(), undefined, "emqx")}];
+
+fields("etcd") ->
+    [ {"server", t(emqx_schema:comma_separated_list())}
+    , {"prefix", t(string(), undefined, "emqxcl")}
+    , {"node_ttl", t(emqx_schema:duration(), undefined, "1m")}
+    , {"ssl", ref("etcd_ssl")}
+    ];
+
+fields("etcd_ssl") ->
+    emqx_schema:ssl(#{});
+
+fields("k8s") ->
+    [ {"apiserver", t(string())}
+    , {"service_name", t(string(), undefined, "emqx")}
+    , {"address_type", t(union([ip, dns, hostname]))}
+    , {"app_name", t(string(), undefined, "emqx")}
+    , {"namespace", t(string(), undefined, "default")}
+    , {"suffix", t(string(), undefined, "pod.local")}
+    ];
+
+fields("rlog") ->
+    [ {"role", t(union([core, replicant]), "ekka.node_role", core)}
+    , {"core_nodes", t(emqx_schema:comma_separated_atoms(), "ekka.core_nodes", [])}
+    ];
+
+fields("node") ->
+    [ {"name", hoconsc:t(string(), #{default => "emqx@127.0.0.1",
+                                     override_env => "EMQX_NODE_NAME"
+                                    })}
+    , {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie",
+                                       default => "emqxsecretcookie",
+                                       sensitive => true,
+                                       override_env => "EMQX_NODE_COOKIE"
+                                      })}
+    , {"data_dir", t(string(), undefined, undefined)}
+    , {"config_files", t(list(string()), "emqx.config_files",
+        [ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
+        ])}
+    , {"global_gc_interval", t(emqx_schema:duration(), undefined, "15m")}
+    , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
+    , {"dist_net_ticktime", t(emqx_schema:duration(), "vm_args.-kernel net_ticktime", "2m")}
+    , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
+    , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)}
+    , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)}
+    ];
+
+fields("rpc") ->
+    [ {"mode", t(union(sync, async), undefined, async)}
+    , {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)}
+    , {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)}
+    , {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)}
+    , {"tcp_client_num", t(range(1, 256), undefined, 1)}
+    , {"connect_timeout", t(emqx_schema:duration(), "gen_rpc.connect_timeout", "5s")}
+    , {"send_timeout", t(emqx_schema:duration(), "gen_rpc.send_timeout", "5s")}
+    , {"authentication_timeout", t(emqx_schema:duration(), "gen_rpc.authentication_timeout", "5s")}
+    , {"call_receive_timeout", t(emqx_schema:duration(), "gen_rpc.call_receive_timeout", "15s")}
+    , {"socket_keepalive_idle", t(emqx_schema:duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")}
+    , {"socket_keepalive_interval", t(emqx_schema:duration_s(), "gen_rpc.socket_keepalive_interval", "75s")}
+    , {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)}
+    , {"socket_sndbuf", t(emqx_schema:bytesize(), "gen_rpc.socket_sndbuf", "1MB")}
+    , {"socket_recbuf", t(emqx_schema:bytesize(), "gen_rpc.socket_recbuf", "1MB")}
+    , {"socket_buffer", t(emqx_schema:bytesize(), "gen_rpc.socket_buffer", "1MB")}
+    ];
+
+fields("log") ->
+    [ {"primary_level", t(log_level(), undefined, warning)}
+    , {"console_handler", ref("console_handler")}
+    , {"file_handlers", ref("file_handlers")}
+    , {"time_offset", t(string(), undefined, "system")}
+    , {"chars_limit", maybe_infinity(range(1, inf))}
+    , {"supervisor_reports", t(union([error, progress]), undefined, error)}
+    , {"max_depth", t(union([infinity, integer()]),
+                      "kernel.error_logger_format_depth", 80)}
+    , {"formatter", t(union([text, json]), undefined, text)}
+    , {"single_line", t(boolean(), undefined, true)}
+    , {"sync_mode_qlen", t(integer(), undefined, 100)}
+    , {"drop_mode_qlen", t(integer(), undefined, 3000)}
+    , {"flush_qlen", t(integer(), undefined, 8000)}
+    , {"overload_kill", ref("log_overload_kill")}
+    , {"burst_limit", ref("log_burst_limit")}
+    , {"error_logger", t(atom(), "kernel.error_logger", silent)}
+    ];
+
+fields("console_handler") ->
+    [ {"enable", t(boolean(), undefined, false)}
+    , {"level", t(log_level(), undefined, warning)}
+    ];
+
+fields("file_handlers") ->
+    [ {"$name", ref("log_file_handler")}
+    ];
+
+fields("log_file_handler") ->
+    [ {"level", t(log_level(), undefined, warning)}
+    , {"file", t(file(), undefined, undefined)}
+    , {"rotation", ref("log_rotation")}
+    , {"max_size", maybe_infinity(emqx_schema:bytesize(), "10MB")}
+    ];
+
+fields("log_rotation") ->
+    [ {"enable", t(boolean(), undefined, true)}
+    , {"count", t(range(1, 2048), undefined, 10)}
+    ];
+
+fields("log_overload_kill") ->
+    [ {"enable", t(boolean(), undefined, true)}
+    , {"mem_size", t(emqx_schema:bytesize(), undefined, "30MB")}
+    , {"qlen", t(integer(), undefined, 20000)}
+    , {"restart_after", t(union(emqx_schema:duration(), infinity), undefined, "5s")}
+    ];
+
+fields("log_burst_limit") ->
+    [ {"enable", t(boolean(), undefined, true)}
+    , {"max_count", t(integer(), undefined, 10000)}
+    , {"window_time", t(emqx_schema:duration(), undefined, "1s")}
+    ];
+fields(Name) ->
+    find_field(Name, ?MERGED_CONFIGS).
+
+find_field(Name, []) ->
+    error({unknown_config_struct_field, Name});
+find_field(Name, [SchemaModule | Rest]) ->
+    case lists:member(Name, SchemaModule:structs()) of
+        true -> SchemaModule:fields(Name);
+        false -> find_field(Name, Rest)
+    end.
+
+translations() -> ["ekka", "kernel", "emqx"].
+
+translation("ekka") ->
+    [ {"cluster_discovery", fun tr_cluster__discovery/1}];
+translation("kernel") ->
+    [ {"logger_level", fun tr_logger_level/1}
+    , {"logger", fun tr_logger/1}];
+translation("emqx") ->
+    [ {"config_files", fun tr_config_files/1}
+    ].
+
+tr_config_files(Conf) ->
+    case conf_get("emqx.config_files", Conf) of
+        [_ | _] = Files ->
+            Files;
+        _ ->
+            case os:getenv("RUNNER_ETC_DIR") of
+                false ->
+                    [filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])];
+                Dir ->
+                    [filename:join([Dir, "emqx.conf"])]
+            end
+    end.
+
+tr_cluster__discovery(Conf) ->
+    Strategy = conf_get("cluster.discovery_strategy", Conf),
+    {Strategy, filter(options(Strategy, Conf))}.
+
+tr_logger_level(Conf) -> conf_get("log.primary_level", Conf).
+
+tr_logger(Conf) ->
+    CharsLimit = case conf_get("log.chars_limit", Conf) of
+                     infinity -> unlimited;
+                     V -> V
+                 end,
+    SingleLine = conf_get("log.single_line", Conf),
+    FmtName = conf_get("log.formatter", Conf),
+    Formatter = formatter(FmtName, CharsLimit, SingleLine),
+    BasicConf = #{
+        sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf),
+        drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf),
+        flush_qlen => conf_get("log.flush_qlen", Conf),
+        overload_kill_enable => conf_get("log.overload_kill.enable", Conf),
+        overload_kill_qlen => conf_get("log.overload_kill.qlen", Conf),
+        overload_kill_mem_size => conf_get("log.overload_kill.mem_size", Conf),
+        overload_kill_restart_after => conf_get("log.overload_kill.restart_after", Conf),
+        burst_limit_enable => conf_get("log.burst_limit.enable", Conf),
+        burst_limit_max_count => conf_get("log.burst_limit.max_count", Conf),
+        burst_limit_window_time => conf_get("log.burst_limit.window_time", Conf)
+    },
+    Filters = case conf_get("log.supervisor_reports", Conf) of
+                  error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
+                  progress -> []
+              end,
+    %% For the default logger that outputs to console
+    ConsoleHandler =
+        case conf_get("log.console_handler.enable", Conf) of
+            true ->
+                [{handler, console, logger_std_h, #{
+                    level => conf_get("log.console_handler.level", Conf),
+                    config => BasicConf#{type => standard_io},
+                    formatter => Formatter,
+                    filters => Filters
+                }}];
+            false -> []
+        end,
+    %% For the file logger
+    FileHandlers =
+        [{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{
+                level => conf_get("level", SubConf),
+                config => BasicConf#{
+                    type => case conf_get("rotation.enable", SubConf) of
+                                true -> wrap;
+                                _ -> halt
+                            end,
+                    file => conf_get("file", SubConf),
+                    max_no_files => conf_get("rotation.count", SubConf),
+                    max_no_bytes => conf_get("max_size", SubConf)
+                },
+                formatter => Formatter,
+                filters => Filters,
+                filesync_repeat_interval => no_repeat
+            }}
+        || {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf, #{}))],
+
+    [{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
+
+%% helpers
+formatter(json, CharsLimit, SingleLine) ->
+    {emqx_logger_jsonfmt,
+        #{chars_limit => CharsLimit,
+            single_line => SingleLine
+        }};
+formatter(text, CharsLimit, SingleLine) ->
+    {emqx_logger_textfmt,
+        #{template =>
+        [time," [",level,"] ",
+            {clientid,
+                [{peername,
+                    [clientid,"@",peername," "],
+                    [clientid, " "]}],
+                [{peername,
+                    [peername," "],
+                    []}]},
+            msg,"\n"],
+            chars_limit => CharsLimit,
+            single_line => SingleLine
+        }}.
+
+%% utils
+-spec(conf_get(string() | [string()], hocon:config()) -> term()).
+conf_get(Key, Conf) ->
+    V = hocon_schema:get_value(Key, Conf),
+    case is_binary(V) of
+        true ->
+            binary_to_list(V);
+        false ->
+            V
+    end.
+
+conf_get(Key, Conf, Default) ->
+    V = hocon_schema:get_value(Key, Conf, Default),
+    case is_binary(V) of
+        true ->
+            binary_to_list(V);
+        false ->
+            V
+    end.
+
+filter(Opts) ->
+    [{K, V} || {K, V} <- Opts, V =/= undefined].
+
+%% @private return a list of keys in a parent field
+-spec(keys(string(), hocon:config()) -> [string()]).
+keys(Parent, Conf) ->
+    [binary_to_list(B) || B <- maps:keys(conf_get(Parent, Conf, #{}))].
+
+%% types
+
+t(Type) -> hoconsc:t(Type).
+
+t(Type, Mapping, Default) ->
+    hoconsc:t(Type, #{mapping => Mapping, default => Default}).
+
+t(Type, Mapping, Default, OverrideEnv) ->
+    hoconsc:t(Type, #{ mapping => Mapping
+                     , default => Default
+                     , override_env => OverrideEnv
+                     }).
+
+ref(Field) -> hoconsc:t(hoconsc:ref(Field)).
+
+maybe_infinity(T) ->
+    maybe_sth(infinity, T, infinity).
+
+maybe_infinity(T, Default) ->
+    maybe_sth(infinity, T, Default).
+
+maybe_sth(What, Type, Default) ->
+    t(union([What, Type]), undefined, Default).
+
+options(static, Conf) ->
+    [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}];
+options(mcast, Conf) ->
+    {ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
+    {ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
+    Ports = conf_get("cluster.mcast.ports", Conf),
+    [{addr, Addr}, {ports, Ports}, {iface, Iface},
+     {ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
+     {loop, conf_get("cluster.mcast.loop", Conf, true)}];
+options(dns, Conf) ->
+    [{name, conf_get("cluster.dns.name", Conf)},
+     {app, conf_get("cluster.dns.app", Conf)}];
+options(etcd, Conf) ->
+    Namespace = "cluster.etcd.ssl",
+    SslOpts = fun(C) ->
+        Options = keys(Namespace, C),
+        lists:map(fun(Key) -> {to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end,
+    [{server, conf_get("cluster.etcd.server", Conf)},
+     {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")},
+     {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)},
+     {ssl_options, filter(SslOpts(Conf))}];
+options(k8s, Conf) ->
+    [{apiserver, conf_get("cluster.k8s.apiserver", Conf)},
+     {service_name, conf_get("cluster.k8s.service_name", Conf)},
+     {address_type, conf_get("cluster.k8s.address_type", Conf, ip)},
+     {app_name, conf_get("cluster.k8s.app_name", Conf)},
+     {namespace, conf_get("cluster.k8s.namespace", Conf)},
+     {suffix, conf_get("cluster.k8s.suffix", Conf, "")}];
+options(manual, _Conf) ->
+    [].
+
+to_atom(Atom) when is_atom(Atom) ->
+    Atom;
+to_atom(Str) when is_list(Str) ->
+    list_to_atom(Str);
+to_atom(Bin) when is_binary(Bin) ->
+    binary_to_atom(Bin, utf8).

+ 5 - 4
bin/emqx

@@ -16,6 +16,7 @@ ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
 RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME"
 CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}"
 REL_DIR="$RUNNER_ROOT_DIR/releases/$REL_VSN"
+SCHEMA_MOD=emqx_machine_schema
 
 WHOAMI=$(whoami)
 
@@ -240,7 +241,7 @@ generate_config() {
     ## ths command populates two files: app.<time>.config and vm.<time>.args
     ## disable SC2086 to allow EMQX_LICENSE_CONF_OPTION to split
     # shellcheck disable=SC2086
-    call_hocon -v -t "$NOW_TIME" -s emqx_schema -c "$RUNNER_ETC_DIR"/emqx.conf $EMQX_LICENSE_CONF_OPTION -d "$RUNNER_DATA_DIR"/configs generate
+    call_hocon -v -t "$NOW_TIME" -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf $EMQX_LICENSE_CONF_OPTION -d "$RUNNER_DATA_DIR"/configs generate
 
     ## filenames are per-hocon convention
     local CONF_FILE="$CONFIGS_DIR/app.$NOW_TIME.config"
@@ -331,7 +332,7 @@ if [ -z "$NAME" ]; then
         NAME="$(grep -E '^-s?name' "$LATEST_VM_ARGS" | awk '{print $2}')"
     else
         # for boot commands, inspect emqx.conf for node name
-        NAME="$(call_hocon -s emqx_schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.name | tr -d \")"
+        NAME="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get node.name | tr -d \")"
     fi
 fi
 
@@ -353,7 +354,7 @@ PIPE_DIR="${PIPE_DIR:-/$RUNNER_DATA_DIR/${WHOAMI}_erl_pipes/$NAME/}"
 COOKIE="${EMQX_NODE_COOKIE:-}"
 if [ -z "$COOKIE" ]; then
     if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
-        COOKIE="$(call_hocon -s emqx_schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie | tr -d \")"
+        COOKIE="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie | tr -d \")"
     else
         # shellcheck disable=SC2012,SC2086
         LATEST_VM_ARGS="$(ls -t $CONFIGS_DIR/vm.*.args | head -1)"
@@ -370,7 +371,7 @@ if [ -z "$COOKIE" ]; then
 fi
 
 # Support for IPv6 Dist. See: https://github.com/emqtt/emqttd/issues/1460
-PROTO_DIST="$(call_hocon -s emqx_schema -c "$RUNNER_ETC_DIR"/emqx.conf get cluster.proto_dist | tr -d \")"
+PROTO_DIST="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get cluster.proto_dist | tr -d \")"
 if [ -z "$PROTO_DIST" ]; then
     PROTO_DIST_ARG=""
 else

+ 0 - 22
extension_schemas.config

@@ -1,22 +0,0 @@
-%% -*-: erlang -*-
-%% This file lists all the hocon schemas other than the `emqx_schema`.
-%% Each element is a two-element tuple where the first element is the field name,
-%% and the second element is the schema module name.
-%%
-[ {"emqx_data_bridge", emqx_data_bridge_schema}
-, {"emqx_retainer", emqx_retainer_schema}
-, {"authentication", emqx_authn_schema}
-, {"authorization", emqx_authz_schema}
-, {"emqx_bridge_mqtt", emqx_bridge_mqtt_schema}
-, {"emqx_management", emqx_management_schema}
-, {"emqx_dashboard", emqx_dashboard_schema}
-, {"gateway", emqx_gateway_schema}
-, {"prometheus", emqx_prometheus_schema}
-, {"statsd", emqx_statsd_schema}
-, {"delayed", emqx_modules_schema}
-, {"recon", emqx_modules_schema}
-, {"telemetry", emqx_modules_schema}
-, {"event_message", emqx_modules_schema}
-, {"rewrite", emqx_modules_schema}
-, {"topic_metrics", emqx_modules_schema}
-].

+ 1 - 6
rebar.config.erl

@@ -79,10 +79,6 @@ is_cover_enabled() ->
 is_enterprise() ->
     filelib:is_regular("EMQX_ENTERPRISE").
 
-emqx_ext_schemas() ->
-    {ok, Schemas} = file:script("extension_schemas.config"),
-    Schemas.
-
 is_quicer_supported() ->
     not (false =/= os:getenv("BUILD_WITHOUT_QUIC") orelse
          is_win32() orelse is_centos_6()
@@ -144,7 +140,6 @@ common_compile_opts() ->
     ] ++
     [{d, 'EMQX_DEP_APPS', AppNames -- [emqx]}] ++
     [{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ++
-    [{d, 'EMQX_EXT_SCHEMAS', emqx_ext_schemas()}] ++
     [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1" ].
 
 prod_compile_opts() ->
@@ -383,7 +378,7 @@ emqx_etc_overlay(edge) ->
     ].
 
 emqx_etc_overlay_common() ->
-    [ {"{{base_dir}}/lib/emqx/etc/emqx.conf.all", "etc/emqx.conf"}
+    [ {"{{base_dir}}/lib/emqx_machine/etc/emqx.conf.all", "etc/emqx.conf"}
     , {"{{base_dir}}/lib/emqx/etc/ssl_dist.conf", "etc/ssl_dist.conf"}
     ].
 

+ 3 - 3
scripts/merge-config.escript

@@ -11,9 +11,9 @@
 -mode(compile).
 
 main(_) ->
-    BaseConf = "apps/emqx/etc/emqx.conf",
+    BaseConf = "apps/emqx_machine/etc/emqx_machine.conf",
     {ok, Bin} = file:read_file(BaseConf),
-    Apps = filelib:wildcard("emqx_*", "apps/"),
+    Apps = filelib:wildcard("*", "apps/"),
     Conf = lists:foldl(fun(App, Acc) ->
         Filename = filename:join([apps, App, "etc", App]) ++ ".conf",
         case filelib:is_regular(Filename) of
@@ -23,4 +23,4 @@ main(_) ->
             false -> Acc
         end
     end, Bin, Apps),
-    ok = file:write_file("apps/emqx/etc/emqx.conf.all", Conf).
+    ok = file:write_file("apps/emqx_machine/etc/emqx.conf.all", Conf).