Просмотр исходного кода

feat(exhook): expose process pool_size for grpc client

JianBo He 4 лет назад
Родитель
Сommit
143c685452

+ 6 - 0
apps/emqx_exhook/etc/emqx_exhook.conf

@@ -25,6 +25,12 @@ exhook {
     ## Value: false | Duration
     auto_reconnect = 60s
 
+    ## The process pool size for gRPC client
+    ##
+    ## Default: Equals cpu cores
+    ## Value: Integer
+    #pool_size = 16
+
     servers = [
     #    { name: "default"
     #      url: "http://127.0.0.1:9000"

+ 11 - 0
apps/emqx_exhook/src/emqx_exhook_mngr.erl

@@ -36,6 +36,8 @@
         , server/1
         , put_request_failed_action/1
         , get_request_failed_action/0
+        , put_pool_size/1
+        , get_pool_size/0
         ]).
 
 %% gen_server callbacks
@@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
     put_request_failed_action(
       maps:get(request_failed_action, ReqOpts0, deny)
      ),
+    put_pool_size(
+      maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers))
+     ),
 
     %% Load the hook servers
     ReqOpts = maps:without([request_failed_action], ReqOpts0),
@@ -291,6 +296,12 @@ put_request_failed_action(Val) ->
 get_request_failed_action() ->
     persistent_term:get({?APP, request_failed_action}).
 
+put_pool_size(Val) ->
+    persistent_term:put({?APP, pool_size}, Val).
+
+get_pool_size() ->
+    persistent_term:get({?APP, pool_size}).
+
 save(Name, ServerState) ->
     Saved = persistent_term:get(?APP, []),
     persistent_term:put(?APP, lists:reverse([Name | Saved])),

+ 8 - 3
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -108,9 +108,10 @@ load(Name, Opts0, ReqOpts) ->
 
 %% @private
 channel_opts(Opts = #{url := URL}) ->
+    ClientOpts = #{pool_size => emqx_exhook_mngr:get_pool_size()},
     case uri_string:parse(URL) of
         #{scheme := "http", host := Host, port := Port} ->
-            {format_http_uri("http", Host, Port), #{}};
+            {format_http_uri("http", Host, Port), ClientOpts};
         #{scheme := "https", host := Host, port := Port} ->
             SslOpts =
                 case maps:get(ssl, Opts, undefined) of
@@ -122,8 +123,12 @@ channel_opts(Opts = #{url := URL}) ->
                            {keyfile, maps:get(keyfile, MapOpts, undefined)}
                           ])
                 end,
-            {format_http_uri("https", Host, Port),
-             #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}};
+            NClientOpts = ClientOpts#{
+                            gun_opts =>
+                              #{transport => ssl,
+                                transport_opts => SslOpts}
+                           },
+            {format_http_uri("https", Host, Port), NClientOpts};
         _ ->
             error(bad_server_url)
     end.

+ 3 - 2
apps/emqx_exhook/src/emqx_exhook_sup.erl

@@ -54,7 +54,8 @@ auto_reconnect() ->
 
 request_options() ->
     #{timeout => env(request_timeout, 5000),
-      request_failed_action => env(request_failed_action, deny)
+      request_failed_action => env(request_failed_action, deny),
+      pool_size => env(pool_size, erlang:system_info(schedulers))
      }.
 
 env(Key, Def) ->
@@ -67,7 +68,7 @@ env(Key, Def) ->
 -spec start_grpc_client_channel(
         binary(),
         uri_string:uri_string(),
-        grpc_client:options()) -> {ok, pid()} | {error, term()}.
+        grpc_client_sup:options()) -> {ok, pid()} | {error, term()}.
 start_grpc_client_channel(Name, SvrAddr, Options) ->
     grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).