|
|
@@ -17,6 +17,7 @@
|
|
|
-module(emqx_connector_mqtt_schema).
|
|
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
|
+-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
|
|
-behaviour(hocon_schema).
|
|
|
|
|
|
@@ -44,24 +45,12 @@ fields("connector") ->
|
|
|
[ {mode,
|
|
|
sc(hoconsc:enum([cluster_shareload]),
|
|
|
#{ default => cluster_shareload
|
|
|
- , desc => "
|
|
|
-The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'<br/>
|
|
|
-
|
|
|
-- cluster_singleton: create a unique MQTT connection within the emqx cluster.<br/>
|
|
|
-In 'cluster_singleton' node, all messages toward the remote broker go through the same
|
|
|
-MQTT connection.<br/>
|
|
|
-- cluster_shareload: create an MQTT connection on each node in the emqx cluster.<br/>
|
|
|
-In 'cluster_shareload' mode, the incoming load from the remote broker is shared by
|
|
|
-using shared subscription.<br/>
|
|
|
-Note that the 'clientid' is suffixed by the node name, this is to avoid
|
|
|
-clientid conflicts between different nodes. And we can only use shared subscription
|
|
|
-topic filters for 'remote_topic' of ingress connections.
|
|
|
-"
|
|
|
+ , desc => ?DESC("mode")
|
|
|
})}
|
|
|
, {server,
|
|
|
sc(emqx_schema:ip_port(),
|
|
|
#{ default => "127.0.0.1:1883"
|
|
|
- , desc => "The host and port of the remote MQTT broker"
|
|
|
+ , desc => ?DESC("server")
|
|
|
})}
|
|
|
, {reconnect_interval, mk_duration(
|
|
|
"Reconnect interval. Delay for the MQTT bridge to retry establishing the connection "
|
|
|
@@ -70,22 +59,22 @@ topic filters for 'remote_topic' of ingress connections.
|
|
|
, {proto_ver,
|
|
|
sc(hoconsc:enum([v3, v4, v5]),
|
|
|
#{ default => v4
|
|
|
- , desc => "The MQTT protocol version"
|
|
|
+ , desc => ?DESC("proto_ver")
|
|
|
})}
|
|
|
, {username,
|
|
|
sc(binary(),
|
|
|
#{ default => "emqx"
|
|
|
- , desc => "The username of the MQTT protocol"
|
|
|
+ , desc => ?DESC("username")
|
|
|
})}
|
|
|
, {password,
|
|
|
sc(binary(),
|
|
|
#{ default => "emqx"
|
|
|
- , desc => "The password of the MQTT protocol"
|
|
|
+ , desc => ?DESC("password")
|
|
|
})}
|
|
|
, {clean_start,
|
|
|
sc(boolean(),
|
|
|
#{ default => true
|
|
|
- , desc => "The clean-start or the clean-session of the MQTT protocol"
|
|
|
+ , desc => ?DESC("clean_start")
|
|
|
})}
|
|
|
, {keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})}
|
|
|
, {retry_interval, mk_duration(
|
|
|
@@ -95,7 +84,7 @@ topic filters for 'remote_topic' of ingress connections.
|
|
|
, {max_inflight,
|
|
|
sc(non_neg_integer(),
|
|
|
#{ default => 32
|
|
|
- , desc => "Max inflight (sent, but un-acked) messages of the MQTT protocol"
|
|
|
+ , desc => ?DESC("max_inflight")
|
|
|
})}
|
|
|
, {replayq,
|
|
|
sc(ref("replayq"), #{})}
|
|
|
@@ -107,33 +96,26 @@ fields("ingress") ->
|
|
|
sc(binary(),
|
|
|
#{ required => true
|
|
|
, validator => fun ?MODULE:non_empty_string/1
|
|
|
- , desc => "Receive messages from which topic of the remote broker"
|
|
|
+ , desc => ?DESC("ingress_remote_topic")
|
|
|
})}
|
|
|
, {remote_qos,
|
|
|
sc(qos(),
|
|
|
#{ default => 1
|
|
|
- , desc => "The QoS level to be used when subscribing to the remote broker"
|
|
|
+ , desc => ?DESC("ingress_remote_qos")
|
|
|
})}
|
|
|
, {local_topic,
|
|
|
sc(binary(),
|
|
|
#{ validator => fun ?MODULE:non_empty_string/1
|
|
|
- , desc => "
|
|
|
-Send messages to which topic of the local broker.<br/>
|
|
|
-Template with variables is allowed.
|
|
|
-"
|
|
|
+ , desc => ?DESC("ingress_local_topic")
|
|
|
})}
|
|
|
, {local_qos,
|
|
|
sc(qos(),
|
|
|
#{ default => <<"${qos}">>
|
|
|
- , desc => "
|
|
|
-The QoS of the MQTT message to be sent.<br/>
|
|
|
-Template with variables is allowed."
|
|
|
+ , desc => ?DESC("ingress_local_qos")
|
|
|
})}
|
|
|
, {hookpoint,
|
|
|
sc(binary(),
|
|
|
- #{ desc => "
|
|
|
-The hook point will be triggered when there's any message received from the remote broker.
|
|
|
-"
|
|
|
+ #{ desc => ?DESC("ingress_hookpoint")
|
|
|
})}
|
|
|
] ++ common_inout_confs();
|
|
|
|
|
|
@@ -141,52 +123,36 @@ fields("egress") ->
|
|
|
%% the message maybe sent from rules, in this case 'local_topic' is not necessary
|
|
|
[ {local_topic,
|
|
|
sc(binary(),
|
|
|
- #{ desc => "The local topic to be forwarded to the remote broker"
|
|
|
+ #{ desc => ?DESC("egress_local_topic")
|
|
|
, validator => fun ?MODULE:non_empty_string/1
|
|
|
})}
|
|
|
, {remote_topic,
|
|
|
sc(binary(),
|
|
|
#{ default => <<"${topic}">>
|
|
|
, validator => fun ?MODULE:non_empty_string/1
|
|
|
- , desc => "
|
|
|
-Forward to which topic of the remote broker.<br/>
|
|
|
-Template with variables is allowed.
|
|
|
-"
|
|
|
+ , desc => ?DESC("egress_remote_topic")
|
|
|
})}
|
|
|
, {remote_qos,
|
|
|
sc(qos(),
|
|
|
#{ default => <<"${qos}">>
|
|
|
- , desc => "
|
|
|
-The QoS of the MQTT message to be sent.<br/>
|
|
|
-Template with variables is allowed."
|
|
|
+ , desc => ?DESC("egress_remote_qos")
|
|
|
})}
|
|
|
] ++ common_inout_confs();
|
|
|
|
|
|
fields("replayq") ->
|
|
|
[ {dir,
|
|
|
sc(hoconsc:union([boolean(), string()]),
|
|
|
- #{ desc => "
|
|
|
-The dir where the replayq file saved.<br/>
|
|
|
-Set to 'false' disables the replayq feature.
|
|
|
-"
|
|
|
+ #{ desc => ?DESC("dir")
|
|
|
})}
|
|
|
, {seg_bytes,
|
|
|
sc(emqx_schema:bytesize(),
|
|
|
#{ default => "100MB"
|
|
|
- , desc => "
|
|
|
-The size in bytes of a single segment.<br/>
|
|
|
-A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment
|
|
|
-(file) will be opened to write.
|
|
|
-"
|
|
|
+ , desc => ?DESC("seg_bytes")
|
|
|
})}
|
|
|
, {offload,
|
|
|
sc(boolean(),
|
|
|
#{ default => false
|
|
|
- , desc => "
|
|
|
-In offload mode, the disk queue is only used to offload queue tail segments.<br/>
|
|
|
-The messages are cached in the memory first, then it writes to the replayq files after the size of
|
|
|
-the memory cache reaches 'seg_bytes'.
|
|
|
-"
|
|
|
+ , desc => ?DESC("offload")
|
|
|
})}
|
|
|
].
|
|
|
|
|
|
@@ -235,16 +201,12 @@ common_inout_confs() ->
|
|
|
[ {retain,
|
|
|
sc(hoconsc:union([boolean(), binary()]),
|
|
|
#{ default => <<"${retain}">>
|
|
|
- , desc => "
|
|
|
-The 'retain' flag of the MQTT message to be sent.<br/>
|
|
|
-Template with variables is allowed."
|
|
|
+ , desc => ?DESC("retain")
|
|
|
})}
|
|
|
, {payload,
|
|
|
sc(binary(),
|
|
|
#{ default => <<"${payload}">>
|
|
|
- , desc => "
|
|
|
-The payload of the MQTT message to be sent.<br/>
|
|
|
-Template with variables is allowed."
|
|
|
+ , desc => ?DESC("payload")
|
|
|
})}
|
|
|
].
|
|
|
|