Sfoglia il codice sorgente

fix(http pipelining): support to switch http pipelining (#5279)

tigercl 4 anni fa
parent
commit
fee3462603

+ 5 - 0
apps/emqx_auth_http/etc/emqx_auth_http.conf

@@ -136,6 +136,11 @@ auth.http.connect_timeout = 5s
 ## Value: Number
 auth.http.pool_size = 32
 
+## Whether to enable HTTP Pipelining
+##
+## See: https://en.wikipedia.org/wiki/HTTP_pipelining
+auth.http.enable_pipelining = true
+
 ##------------------------------------------------------------------------------
 ## SSL options
 

+ 5 - 0
apps/emqx_auth_http/priv/emqx_auth_http.schema

@@ -109,6 +109,11 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "auth.http.enable_pipelining", "emqx_auth_http.enable_pipelining", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
 {mapping, "auth.http.ssl.cacertfile", "emqx_auth_http.cacertfile", [
   {datatype, string}
 ]}.

+ 1 - 1
apps/emqx_auth_http/src/emqx_auth_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_auth_http,
  [{description, "EMQ X Authentication/ACL with HTTP API"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_auth_http_sup]},
   {applications, [kernel,stdlib,ehttpc]},

+ 2 - 2
apps/emqx_auth_http/src/emqx_auth_http.appup.src

@@ -2,13 +2,13 @@
 
 {VSN,
   [
-    {"4.3.0", [
+    {<<"4.3.[0-1]">>, [
      {restart_application, emqx_auth_http}
     ]},
     {<<".*">>, []}
   ],
   [
-    {"4.3.0", [
+    {<<"4.3.[0-1]">>, [
      {restart_application, emqx_auth_http}
     ]},
     {<<".*">>, []}

+ 2 - 0
apps/emqx_auth_http/src/emqx_auth_http_app.erl

@@ -50,6 +50,7 @@ translate_env(EnvName) ->
     case application:get_env(?APP, EnvName) of
         undefined -> ok;
         {ok, Req} ->
+            {ok, EnablePipelining} = application:get_env(?APP, enable_pipelining),
             {ok, PoolSize} = application:get_env(?APP, pool_size),
             {ok, ConnectTimeout} = application:get_env(?APP, connect_timeout),
             URL = proplists:get_value(url, Req),
@@ -88,6 +89,7 @@ translate_env(EnvName) ->
                         end,
             PoolOpts = [{host, Host},
                         {port, Port},
+                        {enable_pipelining, EnablePipelining},
                         {pool_size, PoolSize},
                         {pool_type, random},
                         {connect_timeout, ConnectTimeout},

+ 2 - 2
apps/emqx_auth_http/src/emqx_auth_http_cli.erl

@@ -29,7 +29,7 @@
 
 request(PoolName, get, Path, Headers, Params, Timeout) ->
     NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
-    reply(ehttpc:request(ehttpc_pool:pick_worker(PoolName), get, {NewPath, Headers}, Timeout));
+    reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout));
 
 request(PoolName, post, Path, Headers, Params, Timeout) ->
     Body = case proplists:get_value("content-type", Headers) of
@@ -38,7 +38,7 @@ request(PoolName, post, Path, Headers, Params, Timeout) ->
                "application/json" -> 
                    emqx_json:encode(bin_kw(Params))
            end,
-    reply(ehttpc:request(ehttpc_pool:pick_worker(PoolName), post, {Path, Headers, Body}, Timeout)).
+    reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)).
 
 reply({ok, StatusCode, _Headers}) ->
     {ok, StatusCode, <<>>};

+ 5 - 0
apps/emqx_web_hook/etc/emqx_web_hook.conf

@@ -57,6 +57,11 @@ web.hook.body.encoding_of_payload_field = plain
 ## Value: Number
 web.hook.pool_size = 32
 
+## Whether to enable HTTP Pipelining
+##
+## See: https://en.wikipedia.org/wiki/HTTP_pipelining
+web.hook.enable_pipelining = true
+
 ##--------------------------------------------------------------------
 ## Hook Rules
 ## These configuration items represent a list of events should be forwarded

+ 5 - 0
apps/emqx_web_hook/priv/emqx_web_hook.schema

@@ -43,6 +43,11 @@
   {datatype, integer}
 ]}.
 
+{mapping, "web.hook.enable_pipelining", "emqx_web_hook.enable_pipelining", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
 {mapping, "web.hook.rule.client.connect.$name", "emqx_web_hook.rules", [
   {datatype, string}
 ]}.

+ 1 - 1
apps/emqx_web_hook/src/emqx_web_hook.app.src

@@ -1,6 +1,6 @@
 {application, emqx_web_hook,
  [{description, "EMQ X WebHook Plugin"},
-  {vsn, "4.3.2"}, % strict semver, bump manually!
+  {vsn, "4.3.3"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_web_hook_sup]},
   {applications, [kernel,stdlib,ehttpc]},

+ 2 - 2
apps/emqx_web_hook/src/emqx_web_hook.appup.src

@@ -2,14 +2,14 @@
 
 {VSN,
   [
-    {<<"4.3.[0-1]">>, [
+    {<<"4.3.[0-2]">>, [
      {restart_application, emqx_web_hook},
      {apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
     ]},
     {<<".*">>, []}
   ],
   [
-    {<<"4.3.[0-1]">>, [
+    {<<"4.3.[0-2]">>, [
      {restart_application, emqx_web_hook},
      {apply,{emqx_rule_engine,refresh_resource,[web_hook]}}
     ]},

+ 1 - 1
apps/emqx_web_hook/src/emqx_web_hook.erl

@@ -326,7 +326,7 @@ send_http_request(ClientID, Params) ->
     Headers = application:get_env(?APP, headers, []),
     Body = emqx_json:encode(Params),
     ?LOG(debug, "Send to: ~0p, params: ~s", [Path, Body]),
-    case ehttpc:request(ehttpc_pool:pick_worker(?APP, ClientID), post, {Path, Headers, Body}) of
+    case ehttpc:request({?APP, ClientID}, post, {Path, Headers, Body}) of
         {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
             ok;
         {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->

+ 13 - 6
apps/emqx_web_hook/src/emqx_web_hook_actions.erl

@@ -67,35 +67,42 @@
                    description => #{en => <<"Connection Pool">>,
                                     zh => <<"连接池大小"/utf8>>}
                 },
-    cacertfile => #{order => 5,
+    enable_pipelining => #{order => 5,
+                           type => boolean,
+                           default => true,
+                           title => #{en => <<"Enable Pipelining">>, zh => <<"Enable Pipelining"/utf8>>},
+                           description => #{en => <<"Whether to enable HTTP Pipelining">>,
+                                            zh => <<"是否开启 HTTP Pipelining"/utf8>>}
+                },
+    cacertfile => #{order => 6,
                     type => file,
                     default => <<"">>,
                     title => #{en => <<"CA Certificate File">>,
                                zh => <<"CA 证书文件"/utf8>>},
                     description => #{en => <<"CA Certificate file">>,
                                      zh => <<"CA 证书文件"/utf8>>}},
-    keyfile => #{order => 6,
+    keyfile => #{order => 7,
                  type => file,
                  default => <<"">>,
                  title =>#{en => <<"SSL Key">>,
                            zh => <<"SSL Key"/utf8>>},
                  description => #{en => <<"Your ssl keyfile">>,
                                   zh => <<"SSL 私钥"/utf8>>}},
-    certfile => #{order => 7,
+    certfile => #{order => 8,
                   type => file,
                   default => <<"">>,
                   title => #{en => <<"SSL Cert">>,
                              zh => <<"SSL Cert"/utf8>>},
                   description => #{en => <<"Your ssl certfile">>,
                                    zh => <<"SSL 证书"/utf8>>}},
-    verify => #{order => 8,
+    verify => #{order => 9,
                 type => boolean,
                 default => false,
                 title => #{en => <<"Verify Server Certfile">>,
                            zh => <<"校验服务器证书"/utf8>>},
                 description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>,
                                  zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书,如果需要校验,请设置成true。"/utf8>>}},
-    server_name_indication => #{order => 9,
+    server_name_indication => #{order => 10,
                                 type => string,
                                 title => #{en => <<"Server Name Indication">>,
                                            zh => <<"服务器名称指示"/utf8>>},
@@ -254,7 +261,7 @@ on_action_data_to_webserver(Selected, _Envs =
     NBody = format_msg(BodyTokens, Selected),
     NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
     Req = create_req(Method, NPath, Headers, NBody),
-    case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of
+    case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
         {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
             emqx_rule_metrics:inc_actions_success(Id);
         {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->

+ 2 - 0
apps/emqx_web_hook/src/emqx_web_hook_app.erl

@@ -45,6 +45,7 @@ translate_env() ->
            port := Port,
            scheme := Scheme} = URIMap} = emqx_http_lib:uri_parse(URL),
     Path = path(URIMap),
+    {ok, EnablePipelining} = application:get_env(?APP, enable_pipelining),
     PoolSize = application:get_env(?APP, pool_size, 32),
     MoreOpts = case Scheme of
                    http ->
@@ -77,6 +78,7 @@ translate_env() ->
                 end,
     PoolOpts = [{host, Host},
                 {port, Port},
+                {enable_pipelining, EnablePipelining},
                 {pool_size, PoolSize},
                 {pool_type, hash},
                 {connect_timeout, 5000},

+ 6 - 2
lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src

@@ -4,12 +4,16 @@
     %% load all plugins
     %% NOTE: this depends on the fact that emqx_dashboard is always
     %% the last application gets upgraded
-    [ {apply, {emqx_plugins, load, []}}
+    [ {apply, {emqx_rule_engine, load_providers, []}} 
+    , {restart_application, emqx_dashboard}
+    , {apply, {emqx_plugins, load, []}}
     ]},
    {<<".*">>, []}
  ],
  [ {<<"4.3.[0-1]">>,
-    [ {apply, {emqx_plugins, load, []}}
+    [ {apply, {emqx_rule_engine, load_providers, []}} 
+    , {restart_application, emqx_dashboard}
+    , {apply, {emqx_plugins, load, []}}
     ]},
    {<<".*">>, []}
  ]

+ 1 - 1
rebar.config

@@ -37,7 +37,7 @@
 
 {deps,
     [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
-    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.6"}}}
+    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.8"}}}
     , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}

+ 1 - 0
scripts/inject-deps.escript

@@ -54,6 +54,7 @@ base_deps() ->
   [ {emqx_dashboard, [{re, "emqx_.*"}]}
   , {emqx_management, [{re, "emqx_.*"}, {exclude, emqx_dashboard}]}
   , {{re, "emqx_.*"}, [emqx]}
+  , {emqx_web_hook, [ehttpc]}
   ].
 
 main([Profile | _]) ->