Преглед изворни кода

fix: DynamoDB connector status check takes too long

The DynamoDB connector status checks takes very long when the server is
unavailable which makes the resource manager blocked for a long time.
This causes calls to update the Bridge config with the Bridge V1 API fail
due to a timeout when it calls the resource manager to remove the
channel.

A better fix would be to change the resource manager so that the status
check cannot block it for a long time. However, this is more complicated
so it needs to be done in a later commit. A new ticket has been created
for this task https://emqx.atlassian.net/browse/EMQX-12015 .

Fixes:
https://emqx.atlassian.net/browse/EMQX-11984
Kjell Winblad пре 1 година
родитељ
комит
4e9f2c8f5d

+ 2 - 2
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -179,13 +179,13 @@ on_batch_query(_InstanceId, Query, _State) ->
     {error, {unrecoverable_error, {invalid_request, Query}}}.
     {error, {unrecoverable_error, {invalid_request, Query}}}.
 
 
 health_check_timeout() ->
 health_check_timeout() ->
-    15000.
+    2500.
 
 
 on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
 on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
     Health = emqx_resource_pool:health_check_workers(
     Health = emqx_resource_pool:health_check_workers(
         Pool,
         Pool,
         {emqx_bridge_dynamo_connector_client, is_connected, [
         {emqx_bridge_dynamo_connector_client, is_connected, [
-            emqx_resource_pool:health_check_timeout()
+            health_check_timeout()
         ]},
         ]},
         health_check_timeout(),
         health_check_timeout(),
         #{return_values => true}
         #{return_values => true}

+ 2 - 0
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl

@@ -34,6 +34,8 @@ is_connected(Pid, Timeout) ->
     try
     try
         gen_server:call(Pid, is_connected, Timeout)
         gen_server:call(Pid, is_connected, Timeout)
     catch
     catch
+        _:{timeout, _} ->
+            {false, <<"timeout_while_checking_connection_dynamo_client">>};
         _:Error ->
         _:Error ->
             {false, Error}
             {false, Error}
     end.
     end.

+ 49 - 2
apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl

@@ -88,7 +88,9 @@ init_per_suite(Config) ->
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite(),
     emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_conf, erlcloud]),
+    ok = emqx_common_test_helpers:stop_apps([
+        emqx_rule_engine, emqx_bridge, emqx_resource, emqx_conf, erlcloud
+    ]),
     ok.
     ok.
 
 
 init_per_testcase(TestCase, Config) ->
 init_per_testcase(TestCase, Config) ->
@@ -134,7 +136,7 @@ common_init(ConfigT) ->
             emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
             emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
             % Ensure enterprise bridge module is loaded
             % Ensure enterprise bridge module is loaded
             ok = emqx_common_test_helpers:start_apps([
             ok = emqx_common_test_helpers:start_apps([
-                emqx_conf, emqx_resource, emqx_bridge
+                emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine
             ]),
             ]),
             _ = application:ensure_all_started(erlcloud),
             _ = application:ensure_all_started(erlcloud),
             _ = emqx_bridge_enterprise:module_info(),
             _ = emqx_bridge_enterprise:module_info(),
@@ -273,6 +275,24 @@ create_bridge_http(Params) ->
         Error -> Error
         Error -> Error
     end.
     end.
 
 
+update_bridge_http(#{<<"type">> := Type, <<"name">> := Name} = Config) ->
+    BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeID]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Config) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+get_bridge_http(#{<<"type">> := Type, <<"name">> := Name}) ->
+    BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeID]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
 send_message(Config, Payload) ->
 send_message(Config, Payload) ->
     Name = ?config(dynamo_name, Config),
     Name = ?config(dynamo_name, Config),
     BridgeType = ?config(dynamo_bridge_type, Config),
     BridgeType = ?config(dynamo_bridge_type, Config),
@@ -359,6 +379,33 @@ t_setup_via_config_and_publish(Config) ->
     ),
     ),
     ok.
     ok.
 
 
+%% https://emqx.atlassian.net/browse/EMQX-11984
+t_setup_via_http_api_and_update_wrong_config(Config) ->
+    BridgeType = ?config(dynamo_bridge_type, Config),
+    Name = ?config(dynamo_name, Config),
+    PgsqlConfig0 = ?config(dynamo_config, Config),
+    PgsqlConfig = PgsqlConfig0#{
+        <<"name">> => Name,
+        <<"type">> => BridgeType,
+        %% NOTE: using literal secret with HTTP API requests.
+        <<"aws_secret_access_key">> => <<?SECRET_ACCESS_KEY>>
+    },
+    BrokenConfig = PgsqlConfig#{<<"url">> => <<"http://non_existing_host:9999">>},
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(BrokenConfig)
+    ),
+    WrongURL2 = <<"http://non_existing_host:9998">>,
+    BrokenConfig2 = PgsqlConfig#{<<"url">> => WrongURL2},
+    ?assertMatch(
+        {ok, _},
+        update_bridge_http(BrokenConfig2)
+    ),
+    %% Check that the update worked
+    {ok, Result} = get_bridge_http(PgsqlConfig),
+    ?assertMatch(#{<<"url">> := WrongURL2}, Result),
+    emqx_bridge:remove(BridgeType, Name).
+
 t_setup_via_http_api_and_publish(Config) ->
 t_setup_via_http_api_and_publish(Config) ->
     BridgeType = ?config(dynamo_bridge_type, Config),
     BridgeType = ?config(dynamo_bridge_type, Config),
     Name = ?config(dynamo_name, Config),
     Name = ?config(dynamo_name, Config),