Ver código fonte

fix(mongodb): fix mongodb connection healthcheck

Ilya Averyanov 3 anos atrás
pai
commit
0dae3f43a9

+ 47 - 1
apps/emqx/src/emqx_misc.erl

@@ -49,7 +49,9 @@
     ipv6_probe/1,
     gen_id/0,
     gen_id/1,
-    explain_posix/1
+    explain_posix/1,
+    pmap/2,
+    pmap/3
 ]).
 
 -export([
@@ -61,6 +63,8 @@
 
 -define(SHORT, 8).
 
+-define(DEFAULT_PMAP_TIMEOUT, 5000).
+
 %% @doc Parse v4 or v6 string format address to tuple.
 %% `Host' itself is returned if it's not an ip string.
 maybe_parse_ip(Host) ->
@@ -371,6 +375,25 @@ explain_posix(estale) -> "Stale remote file handle";
 explain_posix(exdev) -> "Cross-domain link";
 explain_posix(NotPosix) -> NotPosix.
 
+-spec pmap(fun((A) -> B), list(A)) -> list(B | {error, term()}).
+pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
+    pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
+
+-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B | {error, term()}).
+pmap(Fun, List, Timeout) when
+    is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
+->
+    Self = self(),
+    Pids = lists:map(
+        fun(El) ->
+            spawn_link(
+                fun() -> pmap_exec(Self, Fun, El, Timeout) end
+            )
+        end,
+        List
+    ),
+    pmap_gather(Pids).
+
 %%------------------------------------------------------------------------------
 %% Internal Functions
 %%------------------------------------------------------------------------------
@@ -395,6 +418,29 @@ pad(L, 0) ->
 pad(L, Count) ->
     pad([$0 | L], Count - 1).
 
+pmap_gather([Pid | Pids]) ->
+    receive
+        {Pid, Result} -> [Result | pmap_gather(Pids)]
+    end;
+pmap_gather([]) ->
+    [].
+
+pmap_exec(CallerPid, Fun, El, Timeout) ->
+    ExecPid = self(),
+    {Pid, Ref} = spawn_monitor(fun() ->
+        Result = Fun(El),
+        ExecPid ! {result, self(), Result}
+    end),
+    ExecResult =
+        receive
+            {result, Pid, Result} -> Result;
+            {'DOWN', Ref, process, Pid, Reason} -> {error, Reason}
+        after Timeout ->
+            true = erlang:exit(Pid, kill),
+            {error, timeout}
+        end,
+    CallerPid ! {ExecPid, ExecResult}.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 

+ 1 - 1
apps/emqx/test/emqx_bpapi_static_checks.erl

@@ -59,7 +59,7 @@
 %% List of functions in the RPC backend modules that we can ignore:
 
 % TODO: handle pmap
--define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1, rpc:pmap/3").
+-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1").
 %% List of business-layer functions that are exempt from the checks:
 -define(EXEMPTIONS,
     % Reason: legacy code. A fun and a QC query are

+ 32 - 0
apps/emqx/test/emqx_misc_SUITE.erl

@@ -170,3 +170,35 @@ t_now_to_ms(_) ->
 t_gen_id(_) ->
     ?assertEqual(10, length(emqx_misc:gen_id(10))),
     ?assertEqual(20, length(emqx_misc:gen_id(20))).
+
+t_pmap(_) ->
+    ?assertEqual(
+        [5, 7, 9],
+        emqx_misc:pmap(
+            fun({A, B}) -> A + B end,
+            [{2, 3}, {3, 4}, {4, 5}]
+        )
+    ),
+
+    ?assertEqual(
+        [5, 7, {error, timeout}],
+        emqx_misc:pmap(
+            fun
+                (timeout) -> ct:sleep(1000);
+                ({A, B}) -> A + B
+            end,
+            [{2, 3}, {3, 4}, timeout],
+            100
+        )
+    ),
+
+    ?assertMatch(
+        [5, 7, {error, _}],
+        emqx_misc:pmap(
+            fun
+                (error) -> error(exc);
+                ({A, B}) -> A + B
+            end,
+            [{2, 3}, {3, 4}, error]
+        )
+    ).

+ 4 - 2
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -241,8 +241,10 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
 
 health_check(PoolName) ->
     Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
-    Status = rpc:pmap({?MODULE, check_worker_health}, [], Workers),
-    length(Status) > 0 andalso lists:all(fun(St) -> St end, Status).
+    Status = emqx_misc:pmap(
+        fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
+    ),
+    length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status).
 
 %% ===================================================================