emqx_plugin_libs_pool.erl 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_pool).
  17. -export([
  18. start_pool/3,
  19. stop_pool/1,
  20. pool_name/1,
  21. health_check_ecpool_workers/2,
  22. health_check_ecpool_workers/3
  23. ]).
  24. -include_lib("emqx/include/logger.hrl").
  25. -define(HEALTH_CHECK_TIMEOUT, 15000).
  26. pool_name(ID) when is_binary(ID) ->
  27. list_to_atom(binary_to_list(ID)).
  28. start_pool(Name, Mod, Options) ->
  29. case ecpool:start_sup_pool(Name, Mod, Options) of
  30. {ok, _} ->
  31. ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}),
  32. ok;
  33. {error, {already_started, _Pid}} ->
  34. stop_pool(Name),
  35. start_pool(Name, Mod, Options);
  36. {error, Reason} ->
  37. NReason = parse_reason(Reason),
  38. ?SLOG(error, #{
  39. msg => "start_ecpool_error",
  40. pool_name => Name,
  41. reason => NReason
  42. }),
  43. {error, {start_pool_failed, Name, NReason}}
  44. end.
  45. stop_pool(Name) ->
  46. case ecpool:stop_sup_pool(Name) of
  47. ok ->
  48. ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name});
  49. {error, not_found} ->
  50. ok;
  51. {error, Reason} ->
  52. ?SLOG(error, #{
  53. msg => "stop_ecpool_failed",
  54. pool_name => Name,
  55. reason => Reason
  56. }),
  57. error({stop_pool_failed, Name, Reason})
  58. end.
  59. health_check_ecpool_workers(PoolName, CheckFunc) ->
  60. health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
  61. health_check_ecpool_workers(PoolName, CheckFunc, Timeout) ->
  62. Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  63. DoPerWorker =
  64. fun(Worker) ->
  65. case ecpool_worker:client(Worker) of
  66. {ok, Conn} ->
  67. erlang:is_process_alive(Conn) andalso
  68. ecpool_worker:exec(Worker, CheckFunc, Timeout);
  69. _ ->
  70. false
  71. end
  72. end,
  73. try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
  74. [_ | _] = Status ->
  75. lists:all(fun(St) -> St =:= true end, Status);
  76. [] ->
  77. false
  78. catch
  79. exit:timeout ->
  80. false
  81. end.
  82. parse_reason({
  83. {shutdown, {failed_to_start_child, _, {shutdown, {failed_to_start_child, _, Reason}}}},
  84. _
  85. }) ->
  86. Reason;
  87. parse_reason(Reason) ->
  88. Reason.