فهرست منبع

fix(mongo): return health check failure reason

Fixes https://emqx.atlassian.net/browse/EMQX-10335
Thales Macedo Garitezi 2 سال پیش
والد
کامیت
18f0510353

+ 30 - 12
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -269,28 +269,46 @@ on_query(
             {ok, Result}
     end.
 
-on_get_status(InstId, #{pool_name := PoolName}) ->
+on_get_status(InstId, State = #{pool_name := PoolName}) ->
     case health_check(PoolName) of
-        true ->
+        ok ->
             ?tp(debug, emqx_connector_mongo_health_check, #{
                 instance_id => InstId,
                 status => ok
             }),
             connected;
-        false ->
+        {error, Reason} ->
             ?tp(warning, emqx_connector_mongo_health_check, #{
                 instance_id => InstId,
+                reason => Reason,
                 status => failed
             }),
-            disconnected
+            {disconnected, State, Reason}
     end.
 
 health_check(PoolName) ->
-    emqx_resource_pool:health_check_workers(
-        PoolName,
-        fun ?MODULE:check_worker_health/1,
-        ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
-    ).
+    Results =
+        emqx_resource_pool:health_check_workers(
+            PoolName,
+            fun ?MODULE:check_worker_health/1,
+            ?HEALTH_CHECK_TIMEOUT + timer:seconds(1),
+            #{return_values => true}
+        ),
+    case Results of
+        {ok, []} ->
+            {error, worker_processes_dead};
+        {ok, Values} ->
+            case lists:partition(fun(V) -> V =:= ok end, Values) of
+                {_Ok, []} ->
+                    ok;
+                {_Ok, [{error, Reason} | _Errors]} ->
+                    {error, Reason};
+                {_Ok, [Error | _Errors]} ->
+                    {error, Error}
+            end;
+        {error, Reason} ->
+            {error, Reason}
+    end.
 
 %% ===================================================================
 
@@ -302,9 +320,9 @@ check_worker_health(Conn) ->
                 msg => "mongo_connection_get_status_error",
                 reason => Reason
             }),
-            false;
+            {error, Reason};
         _ ->
-            true
+            ok
     catch
         Class:Error ->
             ?SLOG(warning, #{
@@ -312,7 +330,7 @@ check_worker_health(Conn) ->
                 class => Class,
                 error => Error
             }),
-            false
+            {error, {Class, Error}}
     end.
 
 do_test_query(Conn) ->

+ 25 - 12
apps/emqx_resource/src/emqx_resource_pool.erl

@@ -20,7 +20,8 @@
     start/3,
     stop/1,
     health_check_workers/2,
-    health_check_workers/3
+    health_check_workers/3,
+    health_check_workers/4
 ]).
 
 -include_lib("emqx/include/logger.hrl").
@@ -66,9 +67,13 @@ stop(Name) ->
     end.
 
 health_check_workers(PoolName, CheckFunc) ->
-    health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
+    health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT, _Opts = #{}).
 
 health_check_workers(PoolName, CheckFunc, Timeout) ->
+    health_check_workers(PoolName, CheckFunc, Timeout, _Opts = #{}).
+
+health_check_workers(PoolName, CheckFunc, Timeout, Opts) ->
+    ReturnValues = maps:get(return_values, Opts, false),
     Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
     DoPerWorker =
         fun(Worker) ->
@@ -76,18 +81,26 @@ health_check_workers(PoolName, CheckFunc, Timeout) ->
                 {ok, Conn} ->
                     erlang:is_process_alive(Conn) andalso
                         ecpool_worker:exec(Worker, CheckFunc, Timeout);
-                _ ->
-                    false
+                Error ->
+                    Error
             end
         end,
-    try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
-        [_ | _] = Status ->
-            lists:all(fun(St) -> St =:= true end, Status);
-        [] ->
-            false
-    catch
-        exit:timeout ->
-            false
+    Results =
+        try
+            {ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)}
+        catch
+            exit:timeout ->
+                {error, timeout}
+        end,
+    case ReturnValues of
+        true ->
+            Results;
+        false ->
+            case Results of
+                {ok, []} -> false;
+                {ok, Rs = [_ | _]} -> lists:all(fun(St) -> St =:= true end, Rs);
+                _ -> false
+            end
     end.
 
 parse_reason({

+ 1 - 0
changes/ee/fix-11107.en.md

@@ -0,0 +1 @@
+Probing a MongoDB bridge now returns the reason for a health check failure (for example, `timeout`).

+ 41 - 1
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl

@@ -28,7 +28,8 @@ group_tests() ->
         t_setup_via_http_api_and_publish,
         t_payload_template,
         t_collection_template,
-        t_mongo_date_rule_engine_functions
+        t_mongo_date_rule_engine_functions,
+        t_get_status_server_selection_too_short
     ].
 
 groups() ->
@@ -317,6 +318,27 @@ send_message(Config, Payload) ->
     BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
     emqx_bridge:send_message(BridgeID, Payload).
 
+probe_bridge_api(Config) ->
+    probe_bridge_api(Config, _Overrides = #{}).
+
+probe_bridge_api(Config, Overrides) ->
+    Name = ?config(mongo_name, Config),
+    TypeBin = mongo_type_bin(?config(mongo_type, Config)),
+    MongoConfig0 = ?config(mongo_config, Config),
+    MongoConfig = emqx_utils_maps:deep_merge(MongoConfig0, Overrides),
+    Params = MongoConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+            Error -> Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -453,3 +475,21 @@ t_mongo_date_rule_engine_functions(Config) ->
         find_all_wait_until_non_empty(Config)
     ),
     ok.
+
+t_get_status_server_selection_too_short(Config) ->
+    Res = probe_bridge_api(
+        Config,
+        #{
+            <<"topology">> => #{<<"server_selection_timeout_ms">> => <<"1ms">>}
+        }
+    ),
+    ?assertMatch({error, {{_, 400, _}, _Headers, _Body}}, Res),
+    {error, {{_, 400, _}, _Headers, Body}} = Res,
+    ?assertMatch(
+        #{
+            <<"code">> := <<"TEST_FAILED">>,
+            <<"message">> := <<"timeout">>
+        },
+        emqx_utils_json:decode(Body)
+    ),
+    ok.