Przeglądaj źródła

fix(webhook): convert `request_timeout`s in root and resource_opts

Thales Macedo Garitezi 3 lat temu
rodzic
commit
f95a30ae89

+ 54 - 5
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -17,6 +17,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -import(hoconsc, [mk/2, ref/2]).
 
@@ -140,11 +141,7 @@ fields(bridges) ->
                 #{
                     desc => ?DESC("bridges_webhook"),
                     required => false,
-                    converter => fun(X, _HoconOpts) ->
-                        emqx_bridge_compatible_config:upgrade_pre_ee(
-                            X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
-                        )
-                    end
+                    converter => fun webhook_bridge_converter/2
                 }
             )},
         {mqtt,
@@ -212,3 +209,55 @@ status() ->
 
 node_name() ->
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
+
+webhook_bridge_converter(Conf0, _HoconOpts) ->
+    Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee(
+        Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+    ),
+    case Conf1 of
+        undefined ->
+            undefined;
+        _ ->
+            do_convert_webhook_config(Conf1)
+    end.
+
+do_convert_webhook_config(
+    #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
+) ->
+    %% ok: same values
+    Conf;
+do_convert_webhook_config(
+    #{
+        <<"request_timeout">> := ReqTRootRaw,
+        <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
+    } = Conf0
+) ->
+    %% different values; we set them to the same, if they are valid
+    %% durations
+    MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
+    MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
+    case {MReqTRoot, MReqTResource} of
+        {{ok, ReqTRoot}, {ok, ReqTResource}} ->
+            {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
+            ?SLOG(
+                debug,
+                #{
+                    msg => adjusting_webhook_bridge_request_time,
+                    new_value => ReqTRaw
+                }
+            ),
+            Conf1 = emqx_map_lib:deep_merge(
+                Conf0,
+                #{
+                    <<"request_timeout">> => ReqTRaw,
+                    <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
+                }
+            ),
+            Conf1;
+        _ ->
+            %% invalid values; let the type checker complain about
+            %% that.
+            Conf0
+    end;
+do_convert_webhook_config(Conf) ->
+    Conf.

+ 29 - 0
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -818,6 +818,35 @@ t_metrics(Config) ->
     ),
     ok.
 
+%% request_timeout in bridge root should match request_timeout in
+%% resource_opts.
+t_inconsistent_webhook_request_timeouts(Config) ->
+    Port = ?config(port, Config),
+    URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
+    BadBridgeParams =
+        emqx_map_lib:deep_merge(
+            ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
+            #{
+                <<"request_timeout">> => <<"1s">>,
+                <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
+            }
+        ),
+    {ok, 201, RawResponse} = request(
+        post,
+        uri(["bridges"]),
+        BadBridgeParams
+    ),
+    %% note: same value on both fields
+    ?assertMatch(
+        #{
+            <<"request_timeout">> := <<"2s">>,
+            <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
+        },
+        emqx_json:decode(RawResponse, [return_maps])
+    ),
+    ok.
+
 operation_path(node, Oper, BridgeID) ->
     uri(["nodes", node(), "bridges", BridgeID, Oper]);
 operation_path(cluster, Oper, BridgeID) ->

+ 57 - 1
apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl

@@ -28,6 +28,7 @@ empty_config_test() ->
 webhook_config_test() ->
     Conf1 = parse(webhook_v5011_hocon()),
     Conf2 = parse(full_webhook_v5011_hocon()),
+    Conf3 = parse(full_webhook_v5019_hocon()),
 
     ?assertMatch(
         #{
@@ -59,6 +60,26 @@ webhook_config_test() ->
         check(Conf2)
     ),
 
+    %% the converter should pick the greater of the two
+    %% request_timeouts and place them in the root and inside
+    %% resource_opts.
+    ?assertMatch(
+        #{
+            <<"bridges">> := #{
+                <<"webhook">> := #{
+                    <<"the_name">> :=
+                        #{
+                            <<"method">> := get,
+                            <<"request_timeout">> := 60_000,
+                            <<"resource_opts">> := #{<<"request_timeout">> := 60_000},
+                            <<"body">> := <<"${payload}">>
+                        }
+                }
+            }
+        },
+        check(Conf3)
+    ),
+
     ok.
 
 up(#{<<"bridges">> := Bridges0} = Conf0) ->
@@ -124,7 +145,7 @@ bridges{
       max_retries = 3
       method = \"get\"
       pool_size = 4
-      request_timeout = \"5s\"
+      request_timeout = \"15s\"
       ssl {enable = false, verify = \"verify_peer\"}
       url = \"http://localhost:8080\"
     }
@@ -164,6 +185,41 @@ full_webhook_v5011_hocon() ->
     "}\n"
     "".
 
+%% does not contain direction
+full_webhook_v5019_hocon() ->
+    ""
+    "\n"
+    "bridges{\n"
+    "  webhook {\n"
+    "    the_name{\n"
+    "      body = \"${payload}\"\n"
+    "      connect_timeout = \"5s\"\n"
+    "      enable_pipelining = 100\n"
+    "      headers {\"content-type\" = \"application/json\"}\n"
+    "      max_retries = 3\n"
+    "      method = \"get\"\n"
+    "      pool_size = 4\n"
+    "      pool_type = \"random\"\n"
+    "      request_timeout = \"1m\"\n"
+    "      resource_opts = {\n"
+    "        request_timeout = \"7s\"\n"
+    "      }\n"
+    "      ssl {\n"
+    "        ciphers = \"\"\n"
+    "        depth = 10\n"
+    "        enable = false\n"
+    "        reuse_sessions = true\n"
+    "        secure_renegotiate = true\n"
+    "        user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
+    "        verify = \"verify_peer\"\n"
+    "        versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
+    "      }\n"
+    "      url = \"http://localhost:8080\"\n"
+    "    }\n"
+    "  }\n"
+    "}\n"
+    "".
+
 %% erlfmt-ignore
 %% this is a generated from v5.0.11
 mqtt_v5011_hocon() ->

+ 1 - 1
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1544,7 +1544,7 @@ ensure_flush_timer(Data = #{tref := undefined}, 0) ->
     %% if the batch_time is 0, we don't need to start a timer, which
     %% can be costly at high rates.
     Ref = make_ref(),
-    self() ! {flush, {Ref, Ref}},
+    self() ! {flush, Ref},
     Data#{tref => {Ref, Ref}};
 ensure_flush_timer(Data = #{tref := undefined}, T) ->
     Ref = make_ref(),