Kaynağa Gözat

feat(pulsar): improve authn error check time and add connect timeout

Fixes https://emqx.atlassian.net/browse/EMQX-9910
Thales Macedo Garitezi 2 yıl önce
ebeveyn
işleme
65f973044f

+ 1 - 1
apps/emqx_bridge_pulsar/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
+    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 8 - 0
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl

@@ -57,6 +57,14 @@ fields(config) ->
                     sensitive => true,
                     desc => ?DESC("authentication")
                 }
+            )},
+        {connect_timeout,
+            mk(
+                emqx_schema:duration_ms(),
+                #{
+                    default => <<"5s">>,
+                    desc => ?DESC("connect_timeout")
+                }
             )}
     ] ++ emqx_connector_schema_lib:ssl_fields();
 fields(producer_opts) ->

+ 27 - 2
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -48,6 +48,7 @@
         memory_overload_protection := boolean()
     },
     compression := compression_mode(),
+    connect_timeout := emqx_schema:duration_ms(),
     max_batch_bytes := emqx_schema:bytesize(),
     message := message_template_raw(),
     pulsar_topic := binary(),
@@ -81,7 +82,9 @@ on_start(InstanceId, Config) ->
     Servers = format_servers(Servers0),
     ClientId = make_client_id(InstanceId, BridgeName),
     SSLOpts = emqx_tls_lib:to_client_opts(SSL),
+    ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
     ClientOpts = #{
+        connect_timeout => ConnectTimeout,
         ssl_opts => SSLOpts,
         conn_opts => conn_opts(Config)
     },
@@ -96,13 +99,19 @@ on_start(InstanceId, Config) ->
                 }
             );
         {error, Reason} ->
+            RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
             ?SLOG(error, #{
                 msg => "failed_to_start_pulsar_client",
                 instance_id => InstanceId,
                 pulsar_hosts => Servers,
-                reason => emqx_utils:redact(Reason, fun is_sensitive_key/1)
+                reason => RedactedReason
             }),
-            throw(failed_to_start_pulsar_client)
+            Message =
+                case get_error_message(RedactedReason) of
+                    {ok, Msg} -> Msg;
+                    error -> failed_to_start_pulsar_client
+                end,
+            throw(Message)
     end,
     start_producer(Config, InstanceId, ClientId, ClientOpts).
 
@@ -422,3 +431,19 @@ partition_strategy(Strategy) -> Strategy.
 
 is_sensitive_key(auth_data) -> true;
 is_sensitive_key(_) -> false.
+
+get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
+    Iter = maps:iterator(BrokerErrorMap),
+    do_get_error_message(Iter);
+get_error_message(_Error) ->
+    error.
+
+do_get_error_message(Iter) ->
+    case maps:next(Iter) of
+        {{_Broker, _Port}, #{message := Message}, _NIter} ->
+            {ok, Message};
+        {_K, _V, NIter} ->
+            do_get_error_message(NIter);
+        none ->
+            error
+    end.

+ 5 - 0
rel/i18n/emqx_bridge_pulsar.hocon

@@ -67,6 +67,11 @@ emqx_bridge_pulsar {
     label = "Enable or Disable"
   }
 
+  connect_timeout {
+    desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
+    label = "Connect Timeout"
+  }
+
   desc_name {
     desc = "Bridge name, used as a human-readable description of the bridge."
     label = "Bridge Name"

+ 5 - 0
rel/i18n/zh/emqx_bridge_pulsar.hocon

@@ -20,6 +20,11 @@ emqx_bridge_pulsar {
     label = "启用或停用"
   }
 
+  connect_timeout {
+    desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
+    label = "连接超时"
+  }
+
   servers {
     desc = "以逗号分隔的 <code>scheme://host[:port]</code> 格式的 Pulsar URL 列表,"
            "支持的 scheme 有 <code>pulsar://</code> (默认)"