emqx_plugin_libs_pool.erl 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_pool).
  17. -export([
  18. start_pool/3,
  19. stop_pool/1,
  20. pool_name/1,
  21. health_check_ecpool_workers/2,
  22. health_check_ecpool_workers/3
  23. ]).
  24. -include_lib("emqx/include/logger.hrl").
  25. -define(HEALTH_CHECK_TIMEOUT, 15000).
  26. pool_name(ID) when is_binary(ID) ->
  27. list_to_atom(binary_to_list(ID)).
  28. start_pool(Name, Mod, Options) ->
  29. case ecpool:start_sup_pool(Name, Mod, Options) of
  30. {ok, _} ->
  31. ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}),
  32. ok;
  33. {error, {already_started, _Pid}} ->
  34. stop_pool(Name),
  35. start_pool(Name, Mod, Options);
  36. {error, Reason} ->
  37. NReason = parse_reason(Reason),
  38. ?SLOG(error, #{
  39. msg => "start_ecpool_error",
  40. pool_name => Name,
  41. reason => NReason
  42. }),
  43. {error, {start_pool_failed, Name, NReason}}
  44. end.
  45. stop_pool(Name) ->
  46. case ecpool:stop_sup_pool(Name) of
  47. ok ->
  48. ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name});
  49. {error, not_found} ->
  50. ok;
  51. {error, Reason} ->
  52. ?SLOG(error, #{
  53. msg => "stop_ecpool_failed",
  54. pool_name => Name,
  55. reason => Reason
  56. }),
  57. error({stop_pool_failed, Name, Reason})
  58. end.
  59. health_check_ecpool_workers(PoolName, CheckFunc) ->
  60. health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
  61. health_check_ecpool_workers(PoolName, CheckFunc, Timeout) when is_function(CheckFunc) ->
  62. Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  63. DoPerWorker =
  64. fun(Worker) ->
  65. case ecpool_worker:client(Worker) of
  66. {ok, Conn} ->
  67. erlang:is_process_alive(Conn) andalso CheckFunc(Conn);
  68. _ ->
  69. false
  70. end
  71. end,
  72. try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
  73. [_ | _] = Status ->
  74. lists:all(fun(St) -> St =:= true end, Status);
  75. [] ->
  76. false
  77. catch
  78. exit:timeout ->
  79. false
  80. end.
  81. parse_reason({
  82. {shutdown, {failed_to_start_child, _, {shutdown, {failed_to_start_child, _, Reason}}}},
  83. _
  84. }) ->
  85. Reason;
  86. parse_reason(Reason) ->
  87. Reason.