Просмотр исходного кода

Merge pull request #7496 from zmstone/0401-5.0-add-parallel-map-lib

refactor: refine pmap with nolink_apply
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
99d50d2455

+ 90 - 34
apps/emqx/src/emqx_misc.erl

@@ -59,8 +59,15 @@
     hexstr_to_bin/1
 ]).
 
+-export([
+    nolink_apply/1,
+    nolink_apply/2
+]).
+
 -export([clamp/3]).
 
+-dialyzer({nowarn_function, [nolink_apply/2]}).
+
 -define(SHORT, 8).
 
 -define(DEFAULT_PMAP_TIMEOUT, 5000).
@@ -375,29 +382,101 @@ explain_posix(estale) -> "Stale remote file handle";
 explain_posix(exdev) -> "Cross-domain link";
 explain_posix(NotPosix) -> NotPosix.
 
--spec pmap(fun((A) -> B), list(A)) -> list(B | {error, term()}).
+%% @doc Like lists:map/2, only the callback function is evaluated
+%% concurrently.
+-spec pmap(fun((A) -> B), list(A)) -> list(B).
 pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
     pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
 
--spec pmap(fun((A) -> B), list(A), timeout()) -> list(B | {error, term()}).
+-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
 pmap(Fun, List, Timeout) when
     is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
 ->
-    Self = self(),
-    Pids = lists:map(
-        fun(El) ->
-            spawn_link(
-                fun() -> pmap_exec(Self, Fun, El, Timeout) end
-            )
-        end,
-        List
+    nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
+
+%% @doc Delegate a function to a worker process.
+%% The function may spawn_link other processes but we do not
+%% want the caller process to be linked.
+%% This is done by isolating the possible link with a not-linked
+%% middleman process.
+nolink_apply(Fun) -> nolink_apply(Fun, infinity).
+
+%% @doc Same as `nolink_apply/1', with a timeout.
+-spec nolink_apply(function(), timer:timeout()) -> term().
+nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
+    Caller = self(),
+    ResRef = make_ref(),
+    Middleman = erlang:spawn(
+        fun() ->
+            process_flag(trap_exit, true),
+            CallerMRef = erlang:monitor(process, Caller),
+            Worker = erlang:spawn_link(
+                fun() ->
+                    Res =
+                        try
+                            {normal, Fun()}
+                        catch
+                            C:E:S ->
+                                {exception, {C, E, S}}
+                        end,
+                    _ = erlang:send(Caller, {ResRef, Res}),
+                    exit(normal)
+                end
+            ),
+            receive
+                {'DOWN', CallerMRef, process, _, _} ->
+                    %% For whatever reason, if the caller is dead,
+                    %% there is no reason to continue
+                    exit(Worker, kill),
+                    exit(normal);
+                {'EXIT', Worker, normal} ->
+                    exit(normal);
+                {'EXIT', Worker, Reason} ->
+                    %% worker exited with some reason other than 'normal'
+                    _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
+                    exit(normal)
+            end
+        end
     ),
-    pmap_gather(Pids).
+    receive
+        {ResRef, {normal, Result}} ->
+            Result;
+        {ResRef, {exception, {C, E, S}}} ->
+            erlang:raise(C, E, S);
+        {ResRef, {'EXIT', Reason}} ->
+            exit(Reason)
+    after Timeout ->
+        exit(Middleman, kill),
+        exit(timeout)
+    end.
 
 %%------------------------------------------------------------------------------
 %% Internal Functions
 %%------------------------------------------------------------------------------
 
+do_parallel_map(Fun, List) ->
+    Parent = self(),
+    PidList = lists:map(
+        fun(Item) ->
+            erlang:spawn_link(
+                fun() ->
+                    Parent ! {self(), Fun(Item)}
+                end
+            )
+        end,
+        List
+    ),
+    lists:foldr(
+        fun(Pid, Acc) ->
+            receive
+                {Pid, Result} ->
+                    [Result | Acc]
+            end
+        end,
+        [],
+        PidList
+    ).
+
 int_to_hex(I, N) when is_integer(I), I >= 0 ->
     int_to_hex([], I, 1, N).
 
@@ -418,29 +497,6 @@ pad(L, 0) ->
 pad(L, Count) ->
     pad([$0 | L], Count - 1).
 
-pmap_gather([Pid | Pids]) ->
-    receive
-        {Pid, Result} -> [Result | pmap_gather(Pids)]
-    end;
-pmap_gather([]) ->
-    [].
-
-pmap_exec(CallerPid, Fun, El, Timeout) ->
-    ExecPid = self(),
-    {Pid, Ref} = spawn_monitor(fun() ->
-        Result = Fun(El),
-        ExecPid ! {result, self(), Result}
-    end),
-    ExecResult =
-        receive
-            {result, Pid, Result} -> Result;
-            {'DOWN', Ref, process, Pid, Reason} -> {error, Reason}
-        after Timeout ->
-            true = erlang:exit(Pid, kill),
-            {error, timeout}
-        end,
-    CallerPid ! {ExecPid, ExecResult}.
-
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 

+ 10 - 8
apps/emqx/test/emqx_misc_SUITE.erl

@@ -171,17 +171,18 @@ t_gen_id(_) ->
     ?assertEqual(10, length(emqx_misc:gen_id(10))),
     ?assertEqual(20, length(emqx_misc:gen_id(20))).
 
-t_pmap(_) ->
+t_pmap_normal(_) ->
     ?assertEqual(
         [5, 7, 9],
         emqx_misc:pmap(
             fun({A, B}) -> A + B end,
             [{2, 3}, {3, 4}, {4, 5}]
         )
-    ),
+    ).
 
-    ?assertEqual(
-        [5, 7, {error, timeout}],
+t_pmap_timeout(_) ->
+    ?assertExit(
+        timeout,
         emqx_misc:pmap(
             fun
                 (timeout) -> ct:sleep(1000);
@@ -190,13 +191,14 @@ t_pmap(_) ->
             [{2, 3}, {3, 4}, timeout],
             100
         )
-    ),
+    ).
 
-    ?assertMatch(
-        [5, 7, {error, _}],
+t_pmap_exception(_) ->
+    ?assertExit(
+        {foobar, _},
         emqx_misc:pmap(
             fun
-                (error) -> error(exc);
+                (error) -> error(foobar);
                 ({A, B}) -> A + B
             end,
             [{2, 3}, {3, 4}, error]

+ 2 - 2
apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 

+ 2 - 2
apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").

+ 2 - 2
apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 

+ 2 - 2
apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 

+ 2 - 2
apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").

+ 2 - 2
apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 

+ 2 - 2
apps/emqx_authn/test/emqx_authn_redis_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 

+ 2 - 2
apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl

@@ -19,8 +19,8 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_connector.hrl").
--include("emqx_authn.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_authn/include/emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 

+ 13 - 4
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -241,10 +241,19 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
 
 health_check(PoolName) ->
     Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
-    Status = emqx_misc:pmap(
-        fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
-    ),
-    length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status).
+    try
+        emqx_misc:pmap(
+            fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
+        )
+    of
+        [_ | _] = Status ->
+            lists:all(fun(St) -> St =:= true end, Status);
+        [] ->
+            false
+    catch
+        exit:timeout ->
+            false
+    end.
 
 %% ===================================================================