Просмотр исходного кода

fix(mongodb bridge): correctly handle driver state update after health check

Fixes https://emqx.atlassian.net/browse/EMQX-13496
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
6c0063ea06

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -251,5 +251,11 @@
     "listen": "0.0.0.0:8362",
     "upstream": "datalayers_tls:8362",
     "enabled": true
+  },
+  {
+    "name": "mongo_single_tcp",
+    "listen": "0.0.0.0:27017",
+    "upstream": "mongo:27017",
+    "enabled": true
   }
 ]

+ 10 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -486,6 +486,16 @@ get_action_api(Config) ->
     ct:pal("get action (http) result:\n  ~p", [Res]),
     Res.
 
+get_action_metrics_api(Config) ->
+    ActionName = ?config(action_name, Config),
+    ActionType = ?config(action_type, Config),
+    ActionId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
+    Path = emqx_mgmt_api_test_util:api_path(["actions", ActionId, "metrics"]),
+    ct:pal("getting action (http)"),
+    Res = request(get, Path, []),
+    ct:pal("get action (http) result:\n  ~p", [Res]),
+    simplify_result(Res).
+
 update_bridge_api(Config) ->
     update_bridge_api(Config, _Overrides = #{}).
 

+ 1 - 0
apps/emqx_bridge_mongodb/docker-ct

@@ -1,2 +1,3 @@
+toxiproxy
 mongo
 mongo_rs_sharded

+ 7 - 2
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -61,8 +61,13 @@ on_get_channel_status(InstanceId, _ChannelId, State) ->
 on_get_channels(InstanceId) ->
     emqx_bridge_v2:get_channels_for_connector(InstanceId).
 
-on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
-    emqx_mongodb:on_get_status(InstanceId, ConnectorState).
+on_get_status(InstanceId, ConnectorState = #{connector_state := DriverState0}) ->
+    case emqx_mongodb:on_get_status(InstanceId, DriverState0) of
+        {Status, DriverState, Reason} ->
+            {Status, ConnectorState#{connector_state := DriverState}, Reason};
+        Status when is_atom(Status) ->
+            Status
+    end.
 
 on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) ->
     #{

+ 148 - 13
apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl

@@ -19,9 +19,10 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(BRIDGE_TYPE, mongodb).
--define(BRIDGE_TYPE_BIN, <<"mongodb">>).
+-define(ACTION_TYPE, mongodb).
+-define(ACTION_TYPE_BIN, <<"mongodb">>).
 -define(CONNECTOR_TYPE, mongodb).
 -define(CONNECTOR_TYPE_BIN, <<"mongodb">>).
 
@@ -36,8 +37,11 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
+    MongoHost = os:getenv("MONGO_SINGLE_HOST", "toxiproxy"),
     MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
+    ProxyHost = "toxiproxy",
+    ProxyPort = 8474,
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
         true ->
             Apps = emqx_cth_suite:start(
@@ -49,14 +53,15 @@ init_per_suite(Config) ->
                     emqx_bridge_mongodb,
                     emqx_rule_engine,
                     emqx_management,
-                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                    emqx_mgmt_api_test_util:emqx_dashboard()
                 ],
                 #{work_dir => emqx_cth_suite:work_dir(Config)}
             ),
-            {ok, Api} = emqx_common_test_http:create_default_app(),
             [
+                {proxy_name, "mongo_single_tcp"},
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
                 {apps, Apps},
-                {api, Api},
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort}
                 | Config
@@ -80,6 +85,9 @@ init_per_testcase(TestCase, Config) ->
 
 common_init_per_testcase(TestCase, Config) ->
     ct:timetrap(timer:seconds(60)),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     emqx_config:delete_override_conf_files(),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
@@ -97,15 +105,16 @@ common_init_per_testcase(TestCase, Config) ->
         | Config
     ],
     ConnectorConfig = connector_config(Name, NConfig),
-    BridgeConfig = bridge_config(Name, Name),
+    BridgeConfig = action_config(Name, Name),
     ok = snabbkaffe:start_trace(),
     [
+        {bridge_kind, action},
         {connector_type, ?CONNECTOR_TYPE},
         {connector_name, Name},
         {connector_config, ConnectorConfig},
-        {bridge_type, ?BRIDGE_TYPE},
-        {bridge_name, Name},
-        {bridge_config, BridgeConfig}
+        {action_type, ?ACTION_TYPE},
+        {action_name, Name},
+        {action_config, BridgeConfig}
         | NConfig
     ].
 
@@ -164,7 +173,7 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
     ct:pal("parsed config: ~p", [Config]),
     InnerConfigMap.
 
-bridge_config(Name, ConnectorId) ->
+action_config(Name, ConnectorId) ->
     InnerConfigMap0 =
         #{
             <<"enable">> => true,
@@ -197,7 +206,7 @@ serde_roundtrip(InnerConfigMap0) ->
     InnerConfigMap.
 
 parse_and_check_bridge_config(InnerConfigMap, Name) ->
-    emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
+    emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, InnerConfigMap).
 
 shared_secret_path() ->
     os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
@@ -221,6 +230,39 @@ make_message() ->
         timestamp => Time
     }.
 
+create_bridge_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_bridge_api(Config, Overrides)
+    ).
+
+create_action_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_kind_api(Config, Overrides)
+    ).
+
+get_connector_api(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_connector_api(
+            ?CONNECTOR_TYPE,
+            ConnectorName
+        )
+    ).
+
+get_action_api(Config) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_action_api(
+            Config
+        )
+    ).
+
+delete_action_api(Config) ->
+    emqx_bridge_v2_testlib:delete_kind_api(
+        action,
+        ?ACTION_TYPE,
+        ?config(action_name, Config)
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -234,7 +276,7 @@ t_create_via_http(Config) ->
     ok.
 
 t_on_get_status(Config) ->
-    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    emqx_bridge_v2_testlib:t_on_get_status(Config),
     ok.
 
 t_sync_query(Config) ->
@@ -245,3 +287,96 @@ t_sync_query(Config) ->
         mongo_bridge_connector_on_query_return
     ),
     ok.
+
+%% Checks that we don't mangle the connector state if the underlying connector
+%% implementation returns a tuple with new state.
+%% See also: https://emqx.atlassian.net/browse/EMQX-13496.
+t_timeout_during_connector_health_check(Config0) ->
+    ProxyName = ?config(proxy_name, Config0),
+    ProxyHost = ?config(proxy_host, Config0),
+    ProxyPort = ?config(proxy_port, Config0),
+    ConnectorName = ?config(connector_name, Config0),
+    Overrides = #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"700ms">>}},
+    Config = emqx_bridge_v2_testlib:proplist_update(
+        Config0,
+        connector_config,
+        fun(Cfg) ->
+            emqx_utils_maps:deep_merge(Cfg, Overrides)
+        end
+    ),
+    ?check_trace(
+        begin
+            {201, _} = create_bridge_api(Config, Overrides),
+
+            %% emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+            %%   ?retry(500, 10, ?assertEqual(<<"disconnected">>, GetConnectorStatus())),
+            %%   ok
+            %% end),
+            %% Wait until it's disconnected
+            emqx_common_test_helpers:with_mock(
+                emqx_resource_pool,
+                health_check_workers,
+                fun(_Pool, _Fun, _Timeout, _Opts) -> {error, timeout} end,
+                fun() ->
+                    ?retry(
+                        1_000,
+                        10,
+                        ?assertMatch(
+                            {200, #{<<"status">> := <<"disconnected">>}},
+                            get_connector_api(Config)
+                        )
+                    ),
+                    ?retry(
+                        1_000,
+                        10,
+                        ?assertMatch(
+                            {200, #{<<"status">> := <<"disconnected">>}},
+                            get_action_api(Config)
+                        )
+                    ),
+                    ok
+                end
+            ),
+
+            %% Wait for recovery
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_connector_api(Config)
+                )
+            ),
+
+            RuleTopic = <<"t/timeout">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                ?ACTION_TYPE, RuleTopic, Config
+            ),
+
+            %% Action should be fine, including its metrics.
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_action_api(Config)
+                )
+            ),
+            emqx:publish(emqx_message:make(RuleTopic, <<"hey">>)),
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"metrics">> := #{<<"matched">> := 1}}},
+                    emqx_bridge_v2_testlib:get_action_metrics_api(Config)
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind("health_check_exception", Trace)),
+            ?assertEqual([], ?of_kind("remove_channel_failed", Trace)),
+            ok
+        end
+    ),
+    ok.

+ 3 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -954,6 +954,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
                 added_channels = NewAddedChannelsMap
             };
         {error, Reason} ->
+            ?tp("remove_channel_failed", #{resource_id => ResId, reason => Reason}),
             ?SLOG(
                 log_level(IsDryRun),
                 #{
@@ -1082,6 +1083,7 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
             {keep_state, update_state(UpdatedData), [{reply, From, ok}]};
         {error, Reason} = Error ->
             IsDryRun = emqx_resource:is_dry_run(Id),
+            ?tp("remove_channel_failed", #{resource_id => Id, reason => Reason}),
             ?SLOG(
                 log_level(IsDryRun),
                 #{
@@ -1681,6 +1683,7 @@ parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
 parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
     {Status, NewState, {error, Error}};
 parse_health_check_result({error, Error}, Data) ->
+    ?tp("health_check_exception", #{resource_id => Data#data.id, reason => Error}),
     ?SLOG(
         error,
         #{