Przeglądaj źródła

feat(exhook): expose tcp some options for grpc client

firest 3 lat temu
rodzic
commit
8d0e95cc6b

+ 28 - 0
apps/emqx_exhook/i18n/emqx_exhook_i18n.conf

@@ -58,5 +58,33 @@ When gRPC is not available, Exhook tries to request the gRPC service at that int
     }
   }
 
+  keepalive {
+    desc {
+      en: """Enables/disables periodic transmission on a connected socket when no other data is exchanged.
+If the other end does not respond, the connection is considered broken and an error message is sent to the controlling process."""
+      zh: """当没有其他数据交换时,是否向连接的对端套接字定期的发送探测包。如果另一端没有响应,则认为连接断开,并向控制进程发送错误消息"""
+    }
+  }
+
+  nodelay {
+    desc {
+      en: """If true, option TCP_NODELAY is turned on for the socket,
+which means that also small amounts of data are sent immediately"""
+      zh: "如果为 true,则为套接字设置 TCP_NODELAY 选项,这意味着会立即发送数据包"
+    }
+  }
 
+  recbuf {
+    desc {
+      en: "The minimum size of the receive buffer to use for the socket"
+      zh: "套接字的最小接收缓冲区大小"
+    }
+  }
+
+  sndbuf {
+    desc {
+      en: "The minimum size of the send buffer to use for the socket"
+      zh: "套接字的最小发送缓冲区大小"
+    }
+  }
 }

+ 18 - 1
apps/emqx_exhook/src/emqx_exhook_schema.erl

@@ -68,6 +68,10 @@ fields(server) ->
             })},
         {failed_action, failed_action()},
         {ssl, ?HOCON(?R_REF(ssl_conf), #{})},
+        {socket_options,
+            ?HOCON(?R_REF(socket_options), #{
+                default => #{<<"keepalive">> => true, <<"nodelay">> => true}
+            })},
         {auto_reconnect,
             ?HOCON(hoconsc:union([false, emqx_schema:duration()]), #{
                 default => "60s",
@@ -81,7 +85,20 @@ fields(server) ->
     ];
 fields(ssl_conf) ->
     Schema = emqx_schema:client_ssl_opts_schema(#{}),
-    lists:keydelete("user_lookup_fun", 1, Schema).
+    lists:keydelete("user_lookup_fun", 1, Schema);
+fields(socket_options) ->
+    [
+        {keepalive, ?HOCON(boolean(), #{default => true, desc => ?DESC(keepalive)})},
+        {nodelay, ?HOCON(boolean(), #{default => true, desc => ?DESC(nodelay)})},
+        {recbuf,
+            ?HOCON(emqx_schema:bytesize(), #{
+                desc => ?DESC(recbuf), required => false, example => <<"64KB">>
+            })},
+        {sndbuf,
+            ?HOCON(emqx_schema:bytesize(), #{
+                desc => ?DESC(sndbuf), required => false, example => <<"16KB">>
+            })}
+    ].
 
 desc(exhook) ->
     "External hook (exhook) configuration.";

+ 8 - 3
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -130,14 +130,19 @@ load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts)
     end.
 
 %% @private
-channel_opts(Opts = #{url := URL}) ->
+channel_opts(Opts = #{url := URL, socket_options := SockOptsT}) ->
     ClientOpts = maps:merge(
         #{pool_size => erlang:system_info(schedulers)},
         Opts
     ),
+    SockOpts = maps:to_list(SockOptsT),
     case uri_string:parse(URL) of
         #{scheme := <<"http">>, host := Host, port := Port} ->
-            {ok, {format_http_uri("http", Host, Port), ClientOpts}};
+            NClientOpts = ClientOpts#{
+                gun_opts =>
+                    #{transport_opts => SockOpts}
+            },
+            {ok, {format_http_uri("http", Host, Port), NClientOpts}};
         #{scheme := <<"https">>, host := Host, port := Port} ->
             SslOpts =
                 case maps:get(ssl, Opts, undefined) of
@@ -158,7 +163,7 @@ channel_opts(Opts = #{url := URL}) ->
                 gun_opts =>
                     #{
                         transport => ssl,
-                        transport_opts => SslOpts
+                        transport_opts => SockOpts ++ SslOpts
                     }
             },
             {ok, {format_http_uri("https", Host, Port), NClientOpts}};