|
|
@@ -75,7 +75,9 @@
|
|
|
|
|
|
-define(CONNECTOR_TYPE, iotdb).
|
|
|
-define(IOTDB_PING_PATH, <<"ping">>).
|
|
|
--define(DEFAULT_THRIFT_TIMEOUT, timer:seconds(10)).
|
|
|
+
|
|
|
+%% timer:seconds(10)).
|
|
|
+-define(DEFAULT_THRIFT_TIMEOUT, 10000).
|
|
|
|
|
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
|
|
|
|
|
@@ -336,9 +338,10 @@ on_start(
|
|
|
|
|
|
DriverOpts = maps:merge(
|
|
|
#{
|
|
|
- connect_timeout => ?DEFAULT_THRIFT_TIMEOUT, recv_timeout => ?DEFAULT_THRIFT_TIMEOUT
|
|
|
+ connect_timeout => ?DEFAULT_THRIFT_TIMEOUT,
|
|
|
+ recv_timeout => ?DEFAULT_THRIFT_TIMEOUT
|
|
|
},
|
|
|
- maps:with([connect_timeout, recv_timeout], Config)
|
|
|
+ normalize_thrift_timeout(maps:with([connect_timeout, recv_timeout], Config))
|
|
|
),
|
|
|
|
|
|
DriverOpts1 =
|
|
|
@@ -1103,3 +1106,25 @@ do_on_query(InstanceId, ChannelId, Data, #{driver := restapi} = State) ->
|
|
|
emqx_bridge_http_connector:on_query(InstanceId, {ChannelId, Data}, State);
|
|
|
do_on_query(InstanceId, _ChannelId, Data, #{driver := thrift} = _State) ->
|
|
|
ecpool:pick_and_do(InstanceId, {iotdb, insert_tablet, [Data]}, no_handover).
|
|
|
+
|
|
|
+%% 1. The default timeout in Thrift is `infinity`, but it may cause stuck
|
|
|
+%% 2. The schema of `timeout` accepts a zero value, but the Thrift driver not
|
|
|
+%% 3. If the timeout is too small, the driver may not work properly
|
|
|
+normalize_thrift_timeout(Timeouts) ->
|
|
|
+ maps:map(
|
|
|
+ fun
|
|
|
+ (_K, V) when V >= ?DEFAULT_THRIFT_TIMEOUT ->
|
|
|
+ V;
|
|
|
+ (K, V) ->
|
|
|
+ ?SLOG(warning, #{
|
|
|
+ msg => "iotdb_thrift_timeout_normalized",
|
|
|
+ reason => "The timeout is too small for the Thrift driver to work",
|
|
|
+ timeout => K,
|
|
|
+ from => V,
|
|
|
+ to => ?DEFAULT_THRIFT_TIMEOUT,
|
|
|
+ unit => millisecond
|
|
|
+ }),
|
|
|
+ ?DEFAULT_THRIFT_TIMEOUT
|
|
|
+ end,
|
|
|
+ Timeouts
|
|
|
+ ).
|