emqx_plugin_libs_pool.erl 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_pool).
  17. -export([ start_pool/3
  18. , stop_pool/1
  19. , pool_name/1
  20. , health_check/3
  21. ]).
  22. -include_lib("emqx/include/logger.hrl").
  23. pool_name(ID) when is_binary(ID) ->
  24. list_to_atom(binary_to_list(ID)).
  25. start_pool(Name, Mod, Options) ->
  26. case ecpool:start_sup_pool(Name, Mod, Options) of
  27. {ok, _} ->
  28. ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}),
  29. ok;
  30. {error, {already_started, _Pid}} ->
  31. stop_pool(Name),
  32. start_pool(Name, Mod, Options);
  33. {error, Reason} ->
  34. ?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name,
  35. reason => Reason}),
  36. {error, {start_pool_failed, Name, Reason}}
  37. end.
  38. stop_pool(Name) ->
  39. case ecpool:stop_sup_pool(Name) of
  40. ok ->
  41. ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name});
  42. {error, not_found} ->
  43. ok;
  44. {error, Reason} ->
  45. ?SLOG(error, #{msg => "stop_ecpool_failed", pool_name => Name,
  46. reason => Reason}),
  47. error({stop_pool_failed, Name, Reason})
  48. end.
  49. health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) ->
  50. Status = [begin
  51. case ecpool_worker:client(Worker) of
  52. {ok, Conn} -> CheckFunc(Conn);
  53. _ -> false
  54. end
  55. end || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
  56. case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
  57. true -> {ok, State};
  58. false -> {error, health_check_failed, State}
  59. end.