Browse Source

fix: get grpc_timeout from hstreamdb connector config failed

zhongwencool 1 year ago
parent
commit
aa7def425d

+ 9 - 0
apps/emqx_bridge_hstreamdb/include/emqx_bridge_hstreamdb.hrl

@@ -3,3 +3,12 @@
 %%--------------------------------------------------------------------
 
 -define(HSTREAMDB_DEFAULT_PORT, 6570).
+
+-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
+-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000).
+-define(DEFAULT_MAX_BATCHES, 500).
+-define(DEFAULT_BATCH_INTERVAL, 500).
+-define(DEFAULT_BATCH_INTERVAL_RAW, <<"500ms">>).
+-define(DEFAULT_AGG_POOL_SIZE, 8).
+-define(DEFAULT_WRITER_POOL_SIZE, 8).
+-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>).

+ 11 - 8
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl

@@ -5,6 +5,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include("emqx_bridge_hstreamdb.hrl").
 
 -import(hoconsc, [mk/2, enum/1]).
 
@@ -23,8 +24,6 @@
 
 -define(CONNECTOR_TYPE, hstreamdb).
 -define(ACTION_TYPE, ?CONNECTOR_TYPE).
--define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
--define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>).
 
 %% -------------------------------------------------------------------------------------------------
 %% api
@@ -113,8 +112,8 @@ action_values() ->
             <<"partition_key">> => <<"hej">>,
             <<"record_template">> => <<"${payload}">>,
             <<"stream">> => <<"mqtt_message">>,
-            <<"aggregation_pool_size">> => 8,
-            <<"writer_pool_size">> => 8
+            <<"aggregation_pool_size">> => ?DEFAULT_AGG_POOL_SIZE,
+            <<"writer_pool_size">> => ?DEFAULT_WRITER_POOL_SIZE
         }
     }.
 
@@ -174,13 +173,17 @@ fields(action_parameters) ->
         {record_template,
             mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})},
         {aggregation_pool_size,
-            mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})},
-        {max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})},
-        {writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})},
+            mk(integer(), #{
+                default => ?DEFAULT_AGG_POOL_SIZE, desc => ?DESC("aggregation_pool_size")
+            })},
+        {max_batches,
+            mk(integer(), #{default => ?DEFAULT_MAX_BATCHES, desc => ?DESC("max_batches")})},
+        {writer_pool_size,
+            mk(integer(), #{default => ?DEFAULT_WRITER_POOL_SIZE, desc => ?DESC("writer_pool_size")})},
         {batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})},
         {batch_interval,
             mk(emqx_schema:timeout_duration_ms(), #{
-                default => <<"500ms">>, desc => ?DESC("batch_interval")
+                default => ?DEFAULT_BATCH_INTERVAL_RAW, desc => ?DESC("batch_interval")
             })}
     ];
 fields(connector_fields) ->

+ 3 - 12
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl

@@ -8,6 +8,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include("emqx_bridge_hstreamdb.hrl").
 
 -import(hoconsc, [mk/2]).
 
@@ -41,14 +42,6 @@
 %% Allocatable resources
 -define(hstreamdb_client, hstreamdb_client).
 
--define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)).
--define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
--define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000).
--define(DEFAULT_MAX_BATCHES, 500).
--define(DEFAULT_BATCH_INTERVAL, 500).
--define(DEFAULT_AGG_POOL_SIZE, 8).
--define(DEFAULT_WRITER_POOL_SIZE, 8).
-
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
 callback_mode() -> always_sync.
@@ -243,11 +236,9 @@ do_on_start(InstId, Config) ->
             {error, {connect_failed, Error}}
     end.
 
-client_options(Config = #{url := ServerURL, ssl := SSL}) ->
-    GRPCTimeout = maps:get(<<"grpc_timeout">>, Config, ?DEFAULT_GRPC_TIMEOUT),
-    EnableSSL = maps:get(enable, SSL),
+client_options(#{url := ServerURL, ssl := SSL, grpc_timeout := GRPCTimeout}) ->
     RpcOpts =
-        case EnableSSL of
+        case maps:get(enable, SSL) of
             false ->
                 #{pool_size => 1};
             true ->