emqx_plugin_libs_pool.erl 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_plugin_libs_pool).
  17. -export([ start_pool/3
  18. , stop_pool/1
  19. , pool_name/1
  20. , health_check/3
  21. ]).
  22. -include_lib("emqx/include/logger.hrl").
  %% @doc Turn a binary ID into the atom used as a pool name.
  %%
  %% Each byte of `ID' is taken as one Latin-1 code point of the atom text,
  %% which is the same result as `list_to_atom(binary_to_list(ID))'.
  %% Arguments that are not binaries fail with `function_clause'.
  %%
  %% NOTE(review): every distinct ID creates a new atom, and atoms are never
  %% garbage collected. Presumably IDs originate from trusted configuration
  %% rather than arbitrary external input -- confirm against the callers.
  pool_name(ID) when is_binary(ID) ->
      binary_to_atom(ID, latin1).
  %% @doc Start a supervised ecpool pool called `Name'.
  %%
  %% `Name', `Mod' and `Options' are passed unchanged to
  %% `ecpool:start_sup_pool/3'. Depending on its answer:
  %%   - `{ok, _}': an info event is logged and the value of that `?SLOG'
  %%     call is returned.
  %%   - `{error, {already_started, _}}': the running pool is stopped through
  %%     `stop_pool/1' and the start is attempted again.
  %%   - any other `{error, Reason}': an error event carrying `Reason' is
  %%     logged and the call raises `error({start_pool_failed, Name})'.
  start_pool(Name, Mod, Options) ->
      StartResult = ecpool:start_sup_pool(Name, Mod, Options),
      case StartResult of
          {error, {already_started, _OldPid}} ->
              %% A pool with this name already exists: replace it. Must be
              %% matched before the generic `{error, _}' clause below.
              stop_pool(Name),
              start_pool(Name, Mod, Options);
          {ok, _Started} ->
              ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name});
          {error, Why} ->
              ?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name,
                             reason => Why}),
              %% The raised term carries only the pool name; the reason is
              %% available in the log event above.
              error({start_pool_failed, Name})
      end.
  %% @doc Stop the supervised ecpool pool called `Name'.
  %%
  %% Delegates to `ecpool:stop_sup_pool/1'. Depending on its answer:
  %%   - `ok': an info event is logged and the value of that `?SLOG' call is
  %%     returned.
  %%   - `{error, not_found}': treated as success, returns `ok' without logging.
  %%   - any other `{error, Reason}': an error event carrying `Reason' is
  %%     logged and the call raises `error({stop_pool_failed, Name})'.
  stop_pool(Name) ->
      StopResult = ecpool:stop_sup_pool(Name),
      case StopResult of
          {error, not_found} ->
              %% Nothing to stop. Must be matched before the generic
              %% `{error, _}' clause below.
              ok;
          ok ->
              ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name});
          {error, Why} ->
              ?SLOG(error, #{msg => "stop_ecpool_failed", pool_name => Name,
                             reason => Why}),
              error({stop_pool_failed, Name})
      end.
  %% @doc Run `CheckFunc' against the client of every worker in `PoolName'.
  %%
  %% For each `{_, Worker}' entry returned by `ecpool:workers/1' the client is
  %% fetched with `ecpool_worker:client/1'. When that yields `{ok, Conn}' the
  %% result of `CheckFunc(Conn)' is recorded; any other answer is recorded as
  %% `false'. Every worker is checked, even after an earlier one has failed.
  %%
  %% Returns `{ok, State}' only when there is at least one worker and every
  %% recorded result is exactly `true'; otherwise returns
  %% `{error, health_check_failed, State}'. `State' is passed through untouched.
  health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) ->
      CheckWorker =
          fun(Worker) ->
              case ecpool_worker:client(Worker) of
                  {ok, Conn} -> CheckFunc(Conn);
                  _ -> false
              end
          end,
      Results = [CheckWorker(Worker)
                 || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
      %% Anything that is not exactly `true' counts as a failed check.
      Failures = [Res || Res <- Results, Res =/= true],
      case {Results, Failures} of
          {[_ | _], []} ->
              {ok, State};
          _ ->
              %% Either the pool has no workers or at least one check failed.
              {error, health_check_failed, State}
      end.