Просмотр исходного кода

fix(rocketmq): expose the driver parameter `sync_timeout` into the RocketMQ bridge configuration

firest 2 лет назад
Родитель
Сommit
7704995279

+ 11 - 4
lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl

@@ -44,6 +44,11 @@ fields(config) ->
                 binary(),
                 #{default => <<"TopicTest">>, desc => ?DESC(topic)}
             )},
+        {sync_timeout,
+            mk(
+                emqx_schema:duration(),
+                #{default => <<"3s">>, desc => ?DESC(sync_timeout)}
+            )},
         {refresh_interval,
             mk(
                 emqx_schema:duration(),
@@ -76,7 +81,7 @@ add_default_fn(OrigFn, Default) ->
     end.
 
 servers() ->
-    Meta = #{desc => ?DESC("server")},
+    Meta = #{desc => ?DESC("servers")},
     emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
 
 relational_fields() ->
@@ -97,7 +102,7 @@ is_buffer_supported() -> false.
 
 on_start(
     InstanceId,
-    #{servers := BinServers, topic := Topic} = Config1
+    #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1
 ) ->
     ?SLOG(info, #{
         msg => "starting_rocketmq_connector",
@@ -116,8 +121,9 @@ on_start(
     ProducersMapPID = create_producers_map(ClientId),
     State = #{
         client_id => ClientId,
+        topic => Topic,
         topic_tokens => TopicTks,
-        config => Config,
+        sync_timeout => SyncTimeout,
         templates => Templates,
         producers_map_pid => ProducersMapPID,
         producers_opts => ProducerOpts
@@ -173,9 +179,10 @@ do_query(
     #{
         templates := Templates,
         client_id := ClientId,
+        topic := RawTopic,
         topic_tokens := TopicTks,
         producers_opts := ProducerOpts,
-        config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}}
+        sync_timeout := RequestTimeout
     } = State
 ) ->
     ?TRACE(

+ 12 - 1
rel/i18n/emqx_ee_connector_rocketmq.hocon

@@ -1,6 +1,6 @@
 emqx_ee_connector_rocketmq {
 
-    server {
+    servers {
         desc {
           en: """The IPv4 or IPv6 address or the hostname to connect to.<br/>
 A host entry has the following form: `Host[:Port]`.<br/>
@@ -26,6 +26,17 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified."""
             }
     }
 
+    sync_timeout {
+        desc {
+          en: """Timeout of RocketMQ driver synchronous call."""
+          zh: """RocketMQ 驱动同步调用的超时时间。"""
+        }
+        label: {
+              en: "Sync Timeout"
+              zh: "同步调用超时时间"
+            }
+    }
+
     refresh_interval {
         desc {
           en: """RocketMQ Topic Route Refresh Interval."""