Browse Source

Merge pull request #8580 from terry-xiaoyu/webhook_max_retries

fix(webhook): the 'max_retries' opt means request retry times
Xinyu Liu 3 years atrás
parent
commit
527289090d

+ 1 - 0
CHANGES-5.0.md

@@ -23,6 +23,7 @@
   **‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
   **‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
   `{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
   `{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
   which will carry the paging meta information.
   which will carry the paging meta information.
+* Fix the issue that webhook leaks TCP connections. [ehttpc#34](https://github.com/emqx/ehttpc/pull/34), [#8580](https://github.com/emqx/emqx/pull/8580)
 
 
 ## Enhancements
 ## Enhancements
 
 

+ 11 - 0
apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf

@@ -127,6 +127,17 @@ HTTP 请求的正文。</br>
                           }
                           }
                   }
                   }
 
 
+    config_max_retries {
+                   desc {
+                         en: """HTTP request max retry times if failed."""
+                         zh: """HTTP 请求失败最大重试次数"""
+                        }
+                   label: {
+                           en: "HTTP Request Max Retries"
+                           zh: "HTTP 请求重试次数"
+                          }
+                  }
+
     desc_type {
     desc_type {
                    desc {
                    desc {
                          en: """The Bridge Type"""
                          en: """The Bridge Type"""

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
 {application, emqx_bridge, [
     {description, "An OTP application"},
     {description, "An OTP application"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {registered, []},
     {mod, {emqx_bridge_app, []}},
     {mod, {emqx_bridge_app, []}},
     {applications, [
     {applications, [

+ 0 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -225,7 +225,6 @@ info_example_basic(webhook, _) ->
         request_timeout => <<"15s">>,
         request_timeout => <<"15s">>,
         connect_timeout => <<"15s">>,
         connect_timeout => <<"15s">>,
         max_retries => 3,
         max_retries => 3,
-        retry_interval => <<"10s">>,
         pool_type => <<"random">>,
         pool_type => <<"random">>,
         pool_size => 4,
         pool_size => 4,
         enable_pipelining => 100,
         enable_pipelining => 100,

+ 4 - 2
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -238,7 +238,8 @@ parse_confs(
         method := Method,
         method := Method,
         body := Body,
         body := Body,
         headers := Headers,
         headers := Headers,
-        request_timeout := ReqTimeout
+        request_timeout := ReqTimeout,
+        max_retries := Retry
     } = Conf
     } = Conf
 ) ->
 ) ->
     {BaseUrl, Path} = parse_url(Url),
     {BaseUrl, Path} = parse_url(Url),
@@ -251,7 +252,8 @@ parse_confs(
                 method => Method,
                 method => Method,
                 body => Body,
                 body => Body,
                 headers => Headers,
                 headers => Headers,
-                request_timeout => ReqTimeout
+                request_timeout => ReqTimeout,
+                max_retries => Retry
             }
             }
     };
     };
 parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
 parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when

+ 64 - 54
apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl

@@ -14,60 +14,7 @@ namespace() -> "bridge".
 roots() -> [].
 roots() -> [].
 
 
 fields("config") ->
 fields("config") ->
-    basic_config() ++
-        [
-            {url,
-                mk(
-                    binary(),
-                    #{
-                        required => true,
-                        desc => ?DESC("config_url")
-                    }
-                )},
-            {local_topic,
-                mk(
-                    binary(),
-                    #{desc => ?DESC("config_local_topic")}
-                )},
-            {method,
-                mk(
-                    method(),
-                    #{
-                        default => post,
-                        desc => ?DESC("config_method")
-                    }
-                )},
-            {headers,
-                mk(
-                    map(),
-                    #{
-                        default => #{
-                            <<"accept">> => <<"application/json">>,
-                            <<"cache-control">> => <<"no-cache">>,
-                            <<"connection">> => <<"keep-alive">>,
-                            <<"content-type">> => <<"application/json">>,
-                            <<"keep-alive">> => <<"timeout=5">>
-                        },
-                        desc => ?DESC("config_headers")
-                    }
-                )},
-            {body,
-                mk(
-                    binary(),
-                    #{
-                        default => <<"${payload}">>,
-                        desc => ?DESC("config_body")
-                    }
-                )},
-            {request_timeout,
-                mk(
-                    emqx_schema:duration_ms(),
-                    #{
-                        default => <<"15s">>,
-                        desc => ?DESC("config_request_timeout")
-                    }
-                )}
-        ];
+    basic_config() ++ request_config();
 fields("post") ->
 fields("post") ->
     [
     [
         type_field(),
         type_field(),
@@ -106,6 +53,69 @@ basic_config() ->
     ] ++
     ] ++
         proplists:delete(base_url, emqx_connector_http:fields(config)).
         proplists:delete(base_url, emqx_connector_http:fields(config)).
 
 
+request_config() ->
+    [
+        {url,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_url")
+                }
+            )},
+        {local_topic,
+            mk(
+                binary(),
+                #{desc => ?DESC("config_local_topic")}
+            )},
+        {method,
+            mk(
+                method(),
+                #{
+                    default => post,
+                    desc => ?DESC("config_method")
+                }
+            )},
+        {headers,
+            mk(
+                map(),
+                #{
+                    default => #{
+                        <<"accept">> => <<"application/json">>,
+                        <<"cache-control">> => <<"no-cache">>,
+                        <<"connection">> => <<"keep-alive">>,
+                        <<"content-type">> => <<"application/json">>,
+                        <<"keep-alive">> => <<"timeout=5">>
+                    },
+                    desc => ?DESC("config_headers")
+                }
+            )},
+        {body,
+            mk(
+                binary(),
+                #{
+                    default => <<"${payload}">>,
+                    desc => ?DESC("config_body")
+                }
+            )},
+        {max_retries,
+            mk(
+                non_neg_integer(),
+                #{
+                    default => 2,
+                    desc => ?DESC("config_max_retries")
+                }
+            )},
+        {request_timeout,
+            mk(
+                emqx_schema:duration_ms(),
+                #{
+                    default => <<"15s">>,
+                    desc => ?DESC("config_request_timeout")
+                }
+            )}
+    ].
+
 %%======================================================================================
 %%======================================================================================
 
 
 type_field() ->
 type_field() ->

+ 0 - 11
apps/emqx_connector/i18n/emqx_connector_http.conf

@@ -41,17 +41,6 @@ base URL 只包含host和port。</br>
             }
             }
     }
     }
 
 
-    retry_interval {
-        desc {
-          en: "Interval between retries."
-          zh: "重试之间的间隔时间。"
-        }
-        label: {
-              en: "Retry Interval"
-              zh: "重试间隔"
-            }
-    }
-
     pool_type {
     pool_type {
         desc {
         desc {
           en: "The type of the pool. Can be one of `random`, `hash`."
           en: "The type of the pool. Can be one of `random`, `hash`."

+ 23 - 27
apps/emqx_connector/src/emqx_connector_http.erl

@@ -88,22 +88,6 @@ fields(config) ->
                     desc => ?DESC("connect_timeout")
                     desc => ?DESC("connect_timeout")
                 }
                 }
             )},
             )},
-        {max_retries,
-            sc(
-                non_neg_integer(),
-                #{
-                    default => 5,
-                    desc => ?DESC("max_retries")
-                }
-            )},
-        {retry_interval,
-            sc(
-                emqx_schema:duration(),
-                #{
-                    default => "1s",
-                    desc => ?DESC("retry_interval")
-                }
-            )},
         {pool_type,
         {pool_type,
             sc(
             sc(
                 pool_type(),
                 pool_type(),
@@ -147,6 +131,14 @@ fields("request") ->
         {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
         {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
         {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
         {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
         {headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
         {headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
+        {max_retries,
+            sc(
+                non_neg_integer(),
+                #{
+                    required => false,
+                    desc => ?DESC("max_retries")
+                }
+            )},
         {request_timeout,
         {request_timeout,
             sc(
             sc(
                 emqx_schema:duration_ms(),
                 emqx_schema:duration_ms(),
@@ -182,8 +174,6 @@ on_start(
             path := BasePath
             path := BasePath
         },
         },
         connect_timeout := ConnectTimeout,
         connect_timeout := ConnectTimeout,
-        max_retries := MaxRetries,
-        retry_interval := RetryInterval,
         pool_type := PoolType,
         pool_type := PoolType,
         pool_size := PoolSize
         pool_size := PoolSize
     } = Config
     } = Config
@@ -206,8 +196,6 @@ on_start(
         {host, Host},
         {host, Host},
         {port, Port},
         {port, Port},
         {connect_timeout, ConnectTimeout},
         {connect_timeout, ConnectTimeout},
-        {retry, MaxRetries},
-        {retry_timeout, RetryInterval},
         {keepalive, 30000},
         {keepalive, 30000},
         {pool_type, PoolType},
         {pool_type, PoolType},
         {pool_size, PoolSize},
         {pool_size, PoolSize},
@@ -247,17 +235,23 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
                 path := Path,
                 path := Path,
                 body := Body,
                 body := Body,
                 headers := Headers,
                 headers := Headers,
-                request_timeout := Timeout
+                request_timeout := Timeout,
+                max_retries := Retry
             } = process_request(Request, Msg),
             } = process_request(Request, Msg),
-            on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
+            on_query(
+                InstId,
+                {undefined, Method, {Path, Headers, Body}, Timeout, Retry},
+                AfterQuery,
+                State
+            )
     end;
     end;
 on_query(InstId, {Method, Request}, AfterQuery, State) ->
 on_query(InstId, {Method, Request}, AfterQuery, State) ->
-    on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
+    on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State);
 on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
 on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
-    on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
+    on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State);
 on_query(
 on_query(
     InstId,
     InstId,
-    {KeyOrNum, Method, Request, Timeout},
+    {KeyOrNum, Method, Request, Timeout, Retry},
     AfterQuery,
     AfterQuery,
     #{pool_name := PoolName, base_path := BasePath} = State
     #{pool_name := PoolName, base_path := BasePath} = State
 ) ->
 ) ->
@@ -275,7 +269,8 @@ on_query(
             end,
             end,
             Method,
             Method,
             NRequest,
             NRequest,
-            Timeout
+            Timeout,
+            Retry
         )
         )
     of
     of
         {error, Reason} ->
         {error, Reason} ->
@@ -368,7 +363,8 @@ preprocess_request(
         path => emqx_plugin_libs_rule:preproc_tmpl(Path),
         path => emqx_plugin_libs_rule:preproc_tmpl(Path),
         body => emqx_plugin_libs_rule:preproc_tmpl(Body),
         body => emqx_plugin_libs_rule:preproc_tmpl(Body),
         headers => preproc_headers(Headers),
         headers => preproc_headers(Headers),
-        request_timeout => maps:get(request_timeout, Req, 30000)
+        request_timeout => maps:get(request_timeout, Req, 30000),
+        max_retries => maps:get(max_retries, Req, 2)
     }.
     }.
 
 
 preproc_headers(Headers) when is_map(Headers) ->
 preproc_headers(Headers) when is_map(Headers) ->

+ 1 - 1
mix.exs

@@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
       {:lc, github: "emqx/lc", tag: "0.3.1"},
       {:lc, github: "emqx/lc", tag: "0.3.1"},
       {:redbug, "2.0.7"},
       {:redbug, "2.0.7"},
       {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
       {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"},
+      {:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"},
       {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
       {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},

+ 1 - 1
rebar.config

@@ -49,7 +49,7 @@
     , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
     , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
     , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
     , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
     , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
-    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}}
+    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}}
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}

+ 1 - 1
scripts/relup-test/relup.lux

@@ -15,7 +15,7 @@
     ?SH-PROMPT
     ?SH-PROMPT
 
 
     ## create a webhook data bridge with id "my_webhook"
     ## create a webhook data bridge with id "my_webhook"
-    !curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","request_timeout":"5s","max_retries":3,"type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
+    !curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
     ?connected
     ?connected
     ?SH-PROMPT
     ?SH-PROMPT