emqttd_pooler.erl 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_pooler).
  17. -behaviour(gen_server).
  18. -include("emqttd_internal.hrl").
  19. %% Start the pool supervisor
  20. -export([start_link/0]).
  21. %% API Exports
  22. -export([start_link/2, submit/1, async_submit/1]).
  23. %% gen_server Function Exports
  24. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  25. terminate/2, code_change/3]).
  26. -record(state, {pool, id}).
  27. %% @doc Start Pooler Supervisor.
  28. start_link() ->
  29. emqttd_pool_sup:start_link(pooler, random, {?MODULE, start_link, []}).
  30. %%--------------------------------------------------------------------
  31. %% API
  32. %%--------------------------------------------------------------------
  33. -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
  34. start_link(Pool, Id) ->
  35. gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
  36. %% @doc Submit work to pooler
  37. submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity).
  38. %% @doc Submit work to pooler asynchronously
  39. async_submit(Fun) ->
  40. gen_server:cast(worker(), {async_submit, Fun}).
  41. worker() ->
  42. gproc_pool:pick_worker(pooler).
  43. %%--------------------------------------------------------------------
  44. %% gen_server callbacks
  45. %%--------------------------------------------------------------------
  46. init([Pool, Id]) ->
  47. ?GPROC_POOL(join, Pool, Id),
  48. {ok, #state{pool = Pool, id = Id}}.
  49. handle_call({submit, Fun}, _From, State) ->
  50. {reply, run(Fun), State};
  51. handle_call(_Req, _From, State) ->
  52. {reply, ok, State}.
  53. handle_cast({async_submit, Fun}, State) ->
  54. try run(Fun)
  55. catch _:Error ->
  56. lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()])
  57. end,
  58. {noreply, State};
  59. handle_cast(_Msg, State) ->
  60. {noreply, State}.
  61. handle_info(_Info, State) ->
  62. {noreply, State}.
  63. terminate(_Reason, #state{pool = Pool, id = Id}) ->
  64. ?GPROC_POOL(leave, Pool, Id), ok.
  65. code_change(_OldVsn, State, _Extra) ->
  66. {ok, State}.
  67. %%--------------------------------------------------------------------
  68. %% Internal functions
  69. %%--------------------------------------------------------------------
  70. run({M, F, A}) ->
  71. erlang:apply(M, F, A);
  72. run(Fun) when is_function(Fun) ->
  73. Fun().