| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqttd_pooler).
- -behaviour(gen_server).
- -include("emqttd_internal.hrl").
- %% Start the pool supervisor
- -export([start_link/0]).
- %% API Exports
- -export([start_link/2, submit/1, async_submit/1]).
- %% gen_server Function Exports
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
- -record(state, {pool, id}).
- %% @doc Start Pooler Supervisor.
- start_link() ->
- emqttd_pool_sup:start_link(pooler, random, {?MODULE, start_link, []}).
- %%--------------------------------------------------------------------
- %% API
- %%--------------------------------------------------------------------
- -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
- start_link(Pool, Id) ->
- gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
- %% @doc Submit work to pooler
- submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity).
- %% @doc Submit work to pooler asynchronously
- async_submit(Fun) ->
- gen_server:cast(worker(), {async_submit, Fun}).
- worker() ->
- gproc_pool:pick_worker(pooler).
- %%--------------------------------------------------------------------
- %% gen_server callbacks
- %%--------------------------------------------------------------------
- init([Pool, Id]) ->
- ?GPROC_POOL(join, Pool, Id),
- {ok, #state{pool = Pool, id = Id}}.
- handle_call({submit, Fun}, _From, State) ->
- {reply, run(Fun), State};
- handle_call(_Req, _From, State) ->
- {reply, ok, State}.
- handle_cast({async_submit, Fun}, State) ->
- try run(Fun)
- catch _:Error ->
- lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()])
- end,
- {noreply, State};
- handle_cast(_Msg, State) ->
- {noreply, State}.
- handle_info(_Info, State) ->
- {noreply, State}.
- terminate(_Reason, #state{pool = Pool, id = Id}) ->
- ?GPROC_POOL(leave, Pool, Id), ok.
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %%--------------------------------------------------------------------
- %% Internal functions
- %%--------------------------------------------------------------------
- run({M, F, A}) ->
- erlang:apply(M, F, A);
- run(Fun) when is_function(Fun) ->
- Fun().
|