|
@@ -75,6 +75,7 @@
|
|
|
|
|
|
|
|
-define(CONNECTOR_TYPE, iotdb).
|
|
-define(CONNECTOR_TYPE, iotdb).
|
|
|
-define(IOTDB_PING_PATH, <<"ping">>).
|
|
-define(IOTDB_PING_PATH, <<"ping">>).
|
|
|
|
|
+-define(DEFAULT_THRIFT_TIMEOUT, timer:seconds(10)).
|
|
|
|
|
|
|
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
|
|
|
|
|
|
@@ -187,6 +188,22 @@ fields("config_thrift") ->
|
|
|
default => 8,
|
|
default => 8,
|
|
|
desc => ?DESC("pool_size")
|
|
desc => ?DESC("pool_size")
|
|
|
}
|
|
}
|
|
|
|
|
+ )},
|
|
|
|
|
+ {connect_timeout,
|
|
|
|
|
+ mk(
|
|
|
|
|
+ emqx_schema:timeout_duration_ms(),
|
|
|
|
|
+ #{
|
|
|
|
|
+ default => <<"10s">>,
|
|
|
|
|
+ desc => ?DESC("connect_timeout")
|
|
|
|
|
+ }
|
|
|
|
|
+ )},
|
|
|
|
|
+ {recv_timeout,
|
|
|
|
|
+ mk(
|
|
|
|
|
+ emqx_schema:timeout_duration_ms(),
|
|
|
|
|
+ #{
|
|
|
|
|
+ default => <<"10s">>,
|
|
|
|
|
+ desc => ?DESC("recv_timeout")
|
|
|
|
|
+ }
|
|
|
)}
|
|
)}
|
|
|
] ++ fields(authentication) ++ emqx_connector_schema_lib:ssl_fields() ++
|
|
] ++ fields(authentication) ++ emqx_connector_schema_lib:ssl_fields() ++
|
|
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
|
@@ -317,22 +334,29 @@ on_start(
|
|
|
|
|
|
|
|
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?THRIFT_HOST_OPTIONS),
|
|
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?THRIFT_HOST_OPTIONS),
|
|
|
|
|
|
|
|
- TransportOpts =
|
|
|
|
|
|
|
+ DriverOpts = maps:merge(
|
|
|
|
|
+ #{
|
|
|
|
|
+ connect_timeout => ?DEFAULT_THRIFT_TIMEOUT, recv_timeout => ?DEFAULT_THRIFT_TIMEOUT
|
|
|
|
|
+ },
|
|
|
|
|
+ maps:with([connect_timeout, recv_timeout], Config)
|
|
|
|
|
+ ),
|
|
|
|
|
+
|
|
|
|
|
+ DriverOpts1 =
|
|
|
case maps:get(enable, SSL) of
|
|
case maps:get(enable, SSL) of
|
|
|
true ->
|
|
true ->
|
|
|
- #{
|
|
|
|
|
|
|
+ DriverOpts#{
|
|
|
ssltransport => true,
|
|
ssltransport => true,
|
|
|
ssloptions => emqx_tls_lib:to_client_opts(SSL)
|
|
ssloptions => emqx_tls_lib:to_client_opts(SSL)
|
|
|
};
|
|
};
|
|
|
false ->
|
|
false ->
|
|
|
- #{}
|
|
|
|
|
|
|
+ DriverOpts
|
|
|
end,
|
|
end,
|
|
|
|
|
|
|
|
IoTDBOpts = IoTDBOpts0#{
|
|
IoTDBOpts = IoTDBOpts0#{
|
|
|
version => Version,
|
|
version => Version,
|
|
|
host => Host,
|
|
host => Host,
|
|
|
port => Port,
|
|
port => Port,
|
|
|
- options => TransportOpts
|
|
|
|
|
|
|
+ options => DriverOpts1
|
|
|
},
|
|
},
|
|
|
|
|
|
|
|
Options = [
|
|
Options = [
|