gen_server2.erl 54 KB


  1. %% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
  2. %% distribution, with the following modifications:
  3. %%
  4. %% 1) the module name is gen_server2
  5. %%
  6. %% 2) more efficient handling of selective receives in callbacks
  7. %% gen_server2 processes drain their message queue into an internal
  8. %% buffer before invoking any callback module functions. Messages are
  9. %% dequeued from the buffer for processing. Thus the effective message
  10. %% queue of a gen_server2 process is the concatenation of the internal
  11. %% buffer and the real message queue.
  12. %% As a result of the draining, any selective receive invoked inside a
  13. %% callback is less likely to have to scan a large message queue.
  14. %%
  15. %% 3) gen_server2:cast is guaranteed to be order-preserving
  16. %% The original code could reorder messages when communicating with a
  17. %% process on a remote node that was not currently connected.
  18. %%
  19. %% 4) The callback module can optionally implement prioritise_call/4,
  20. %% prioritise_cast/3 and prioritise_info/3. These functions take
  21. %% Message, From, Length and State or just Message, Length and State
  22. %% (where Length is the current number of messages waiting to be
  23. %% processed) and return a single integer representing the priority
  24. %% attached to the message, or 'drop' to ignore it (for
  25. %% prioritise_cast/3 and prioritise_info/3 only). Messages with
  26. %% higher priorities are processed before requests with lower
  27. %% priorities. The default priority is 0.
  28. %%
  29. %% 5) The callback module can optionally implement
  30. %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
  31. %% called immediately prior to and post hibernation, respectively. If
  32. %% handle_pre_hibernate returns {hibernate, NewState} then the process
  33. %% will hibernate. If the module does not implement
  34. %% handle_pre_hibernate/1 then the default action is to hibernate.
  35. %%
  36. %% 6) init can return a 4th arg, {backoff, InitialTimeout,
  37. %% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds,
  38. %% 'infinity' does not make sense here). Then, on all callbacks which
  39. %% can return a timeout (including init), timeout can be
  40. %% 'hibernate'. When this is the case, the current timeout value will
  41. %% be used (initially, the InitialTimeout supplied from init). After
  42. %% this timeout has occurred, hibernation will occur as normal. Upon
  43. %% awaking, a new current timeout value will be calculated.
  44. %%
  45. %% The purpose is that the gen_server2 takes care of adjusting the
  46. %% current timeout value such that the process will increase the
  47. %% timeout value repeatedly if it is unable to sleep for the
  48. %% DesiredHibernatePeriod. If it is able to sleep for the
  49. %% DesiredHibernatePeriod it will decrease the current timeout down to
  50. %% the MinimumTimeout, so that the process is put to sleep sooner (and
  51. %% hopefully stays asleep for longer). In short, should a process
  52. %% using this receive a burst of messages, it should not hibernate
  53. %% between those messages, but as the messages become less frequent,
  54. %% the process will not only hibernate, it will do so sooner after
  55. %% each message.
  56. %%
  57. %% When using this backoff mechanism, normal timeout values (i.e. not
  58. %% 'hibernate') can still be used, and if they are used then the
  59. %% handle_info(timeout, State) will be called as normal. In this case,
  60. %% returning 'hibernate' from handle_info(timeout, State) will not
  61. %% hibernate the process immediately, as it would if backoff wasn't
  62. %% being used. Instead it'll wait for the current timeout as described
  63. %% above.
  64. %%
  65. %% 7) The callback module can return from any of the handle_*
  66. %% functions, a {become, Module, State} triple, or a {become, Module,
  67. %% State, Timeout} quadruple. This allows the gen_server to
  68. %% dynamically change the callback module. The State is the new state
  69. %% which will be passed into any of the callback functions in the new
  70. %% module. Note there is no form also encompassing a reply, thus if
  71. %% you wish to reply in handle_call/3 and change the callback module,
  72. %% you need to use gen_server2:reply/2 to issue the reply
  73. %% manually. The init function can similarly return a 5th argument,
  74. %% Module, in order to dynamically decide the callback module on init.
  75. %%
  76. %% 8) The callback module can optionally implement
  77. %% format_message_queue/2 which is the equivalent of format_status/2
  78. %% but where the second argument is specifically the priority_queue
  79. %% which contains the prioritised message_queue.
  80. %%
  81. %% 9) The function with_state/2 can be used to debug a process with
  82. %% heavyweight state (without needing to copy the entire state out of
  83. %% process as sys:get_status/1 would). Pass through a function which
  84. %% can be invoked on the state, get back the result. The state is not
  85. %% modified.
  86. %%
  87. %% 10) an mcall/1 function has been added for performing multiple
  88. %% call/3 in parallel. Unlike multi_call, which sends the same request
  89. %% to same-named processes residing on a supplied list of nodes, it
  90. %% operates on name/request pairs, where name is anything accepted by
  91. %% call/3, i.e. a pid, global name, local name, or local name on a
  92. %% particular node.
  93. %%
  94. %% All modifications are (C) 2009-2013 GoPivotal, Inc.
  95. %% ``The contents of this file are subject to the Erlang Public License,
  96. %% Version 1.1, (the "License"); you may not use this file except in
  97. %% compliance with the License. You should have received a copy of the
  98. %% Erlang Public License along with this software. If not, it can be
  99. %% retrieved via the world wide web at http://www.erlang.org/.
  100. %%
  101. %% Software distributed under the License is distributed on an "AS IS"
  102. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  103. %% the License for the specific language governing rights and limitations
  104. %% under the License.
  105. %%
  106. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  107. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  108. %% AB. All Rights Reserved.''
  109. %%
  110. %% $Id$
  111. %%
  112. -module(gen_server2).
  113. %%% ---------------------------------------------------
  114. %%%
  115. %%% The idea behind THIS server is that the user module
  116. %%% provides (different) functions to handle different
  117. %%% kind of inputs.
  118. %%% If the Parent process terminates the Module:terminate/2
  119. %%% function is called.
  120. %%%
  121. %%% The user module should export:
  122. %%%
  123. %%% init(Args)
  124. %%% ==> {ok, State}
  125. %%% {ok, State, Timeout}
  126. %%% {ok, State, Timeout, Backoff}
  127. %%% {ok, State, Timeout, Backoff, Module}
  128. %%% ignore
  129. %%% {stop, Reason}
  130. %%%
  131. %%% handle_call(Msg, {From, Tag}, State)
  132. %%%
  133. %%% ==> {reply, Reply, State}
  134. %%% {reply, Reply, State, Timeout}
  135. %%% {noreply, State}
  136. %%% {noreply, State, Timeout}
  137. %%% {stop, Reason, Reply, State}
  138. %%% Reason = normal | shutdown | Term terminate(State) is called
  139. %%%
  140. %%% handle_cast(Msg, State)
  141. %%%
  142. %%% ==> {noreply, State}
  143. %%% {noreply, State, Timeout}
  144. %%% {stop, Reason, State}
  145. %%% Reason = normal | shutdown | Term terminate(State) is called
  146. %%%
  147. %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
  148. %%%
  149. %%% ==> {noreply, State}
  150. %%% {noreply, State, Timeout}
  151. %%% {stop, Reason, State}
  152. %%% Reason = normal | shutdown | Term, terminate(State) is called
  153. %%%
  154. %%% terminate(Reason, State) Let the user module clean up
  155. %%% Reason = normal | shutdown | {shutdown, Term} | Term
  156. %%% always called when server terminates
  157. %%%
  158. %%% ==> ok | Term
  159. %%%
  160. %%% handle_pre_hibernate(State)
  161. %%%
  162. %%% ==> {hibernate, State}
  163. %%% {stop, Reason, State}
  164. %%% Reason = normal | shutdown | Term, terminate(State) is called
  165. %%%
  166. %%% handle_post_hibernate(State)
  167. %%%
  168. %%% ==> {noreply, State}
  169. %%% {stop, Reason, State}
  170. %%% Reason = normal | shutdown | Term, terminate(State) is called
  171. %%%
  172. %%% The work flow (of the server) can be described as follows:
  173. %%%
  174. %%% User module Generic
  175. %%% ----------- -------
  176. %%% start -----> start
  177. %%% init <----- .
  178. %%%
  179. %%% loop
  180. %%% handle_call <----- .
  181. %%% -----> reply
  182. %%%
  183. %%% handle_cast <----- .
  184. %%%
  185. %%% handle_info <----- .
  186. %%%
  187. %%% terminate <----- .
  188. %%%
  189. %%% -----> reply
  190. %%%
  191. %%%
  192. %%% ---------------------------------------------------
  193. %% API
  194. -export([start/3, start/4,
  195. start_link/3, start_link/4,
  196. call/2, call/3,
  197. cast/2, reply/2,
  198. abcast/2, abcast/3,
  199. multi_call/2, multi_call/3, multi_call/4,
  200. mcall/1,
  201. with_state/2,
  202. enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
  203. %% System exports
  204. -export([system_continue/3,
  205. system_terminate/4,
  206. system_code_change/4,
  207. format_status/2]).
  208. %% Internal exports
  209. -export([init_it/6]).
  210. -import(error_logger, [format/2]).
  211. %% State record
  212. -record(gs2_state, {parent, name, state, mod, time,
  213. timeout_state, queue, debug, prioritisers}).
  214. -ifdef(use_specs).
  215. %%%=========================================================================
  216. %%% Specs. These exist only to shut up dialyzer's warnings
  217. %%%=========================================================================
  218. -type(gs2_state() :: #gs2_state{}).
  219. -spec(handle_common_termination/3 ::
  220. (any(), atom(), gs2_state()) -> no_return()).
  221. -spec(hibernate/1 :: (gs2_state()) -> no_return()).
  222. -spec(pre_hibernate/1 :: (gs2_state()) -> no_return()).
  223. -spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()).
  224. -type(millis() :: non_neg_integer()).
  225. %%%=========================================================================
  226. %%% API
  227. %%%=========================================================================
  228. -callback init(Args :: term()) ->
  229. {ok, State :: term()} |
  230. {ok, State :: term(), timeout() | hibernate} |
  231. {ok, State :: term(), timeout() | hibernate,
  232. {backoff, millis(), millis(), millis()}} |
  233. {ok, State :: term(), timeout() | hibernate,
  234. {backoff, millis(), millis(), millis()}, atom()} |
  235. ignore |
  236. {stop, Reason :: term()}.
  237. -callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
  238. State :: term()) ->
  239. {reply, Reply :: term(), NewState :: term()} |
  240. {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} |
  241. {noreply, NewState :: term()} |
  242. {noreply, NewState :: term(), timeout() | hibernate} |
  243. {stop, Reason :: term(),
  244. Reply :: term(), NewState :: term()}.
  245. -callback handle_cast(Request :: term(), State :: term()) ->
  246. {noreply, NewState :: term()} |
  247. {noreply, NewState :: term(), timeout() | hibernate} |
  248. {stop, Reason :: term(), NewState :: term()}.
  249. -callback handle_info(Info :: term(), State :: term()) ->
  250. {noreply, NewState :: term()} |
  251. {noreply, NewState :: term(), timeout() | hibernate} |
  252. {stop, Reason :: term(), NewState :: term()}.
  253. -callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
  254. State :: term()) ->
  255. ok | term().
  256. -callback code_change(OldVsn :: (term() | {down, term()}), State :: term(),
  257. Extra :: term()) ->
  258. {ok, NewState :: term()} | {error, Reason :: term()}.
  259. %% It's not possible to define "optional" -callbacks, so putting specs
  260. %% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result
  261. %% in warnings (the same applied for the behaviour_info before).
  262. -else.
  263. -export([behaviour_info/1]).
  264. behaviour_info(callbacks) ->
  265. [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
  266. {terminate,2},{code_change,3}];
  267. behaviour_info(_Other) ->
  268. undefined.
  269. -endif.
  270. %%% -----------------------------------------------------------------
  271. %%% Starts a generic server.
  272. %%% start(Mod, Args, Options)
  273. %%% start(Name, Mod, Args, Options)
  274. %%% start_link(Mod, Args, Options)
  275. %%% start_link(Name, Mod, Args, Options) where:
  276. %%% Name ::= {local, atom()} | {global, atom()}
  277. %%% Mod ::= atom(), callback module implementing the 'real' server
  278. %%% Args ::= term(), init arguments (to Mod:init/1)
  279. %%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
  280. %%% Flag ::= trace | log | {logfile, File} | statistics | debug
  281. %%% (debug == log && statistics)
  282. %%% Returns: {ok, Pid} |
  283. %%% {error, {already_started, Pid}} |
  284. %%% {error, Reason}
  285. %%% -----------------------------------------------------------------
  286. start(Mod, Args, Options) ->
  287. gen:start(?MODULE, nolink, Mod, Args, Options).
  288. start(Name, Mod, Args, Options) ->
  289. gen:start(?MODULE, nolink, Name, Mod, Args, Options).
  290. start_link(Mod, Args, Options) ->
  291. gen:start(?MODULE, link, Mod, Args, Options).
  292. start_link(Name, Mod, Args, Options) ->
  293. gen:start(?MODULE, link, Name, Mod, Args, Options).
  294. %% -----------------------------------------------------------------
  295. %% Make a call to a generic server.
  296. %% If the server is located at another node, that node will
  297. %% be monitored.
  298. %% If the client is trapping exits and is linked server termination
  299. %% is handled here (? Shall we do that here (or rely on timeouts) ?).
  300. %% -----------------------------------------------------------------
  301. call(Name, Request) ->
  302. case catch gen:call(Name, '$gen_call', Request) of
  303. {ok,Res} ->
  304. Res;
  305. {'EXIT',Reason} ->
  306. exit({Reason, {?MODULE, call, [Name, Request]}})
  307. end.
  308. call(Name, Request, Timeout) ->
  309. case catch gen:call(Name, '$gen_call', Request, Timeout) of
  310. {ok,Res} ->
  311. Res;
  312. {'EXIT',Reason} ->
  313. exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
  314. end.
  315. %% -----------------------------------------------------------------
  316. %% Make a cast to a generic server.
  317. %% -----------------------------------------------------------------
  318. cast({global,Name}, Request) ->
  319. catch global:send(Name, cast_msg(Request)),
  320. ok;
  321. cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
  322. do_cast(Dest, Request);
  323. cast(Dest, Request) when is_atom(Dest) ->
  324. do_cast(Dest, Request);
  325. cast(Dest, Request) when is_pid(Dest) ->
  326. do_cast(Dest, Request).
  327. do_cast(Dest, Request) ->
  328. do_send(Dest, cast_msg(Request)),
  329. ok.
  330. cast_msg(Request) -> {'$gen_cast',Request}.
  331. %% -----------------------------------------------------------------
  332. %% Send a reply to the client.
  333. %% -----------------------------------------------------------------
  334. reply({To, Tag}, Reply) ->
  335. catch To ! {Tag, Reply}.
  336. %% -----------------------------------------------------------------
  337. %% Asyncronous broadcast, returns nothing, it's just send'n pray
  338. %% -----------------------------------------------------------------
  339. abcast(Name, Request) when is_atom(Name) ->
  340. do_abcast([node() | nodes()], Name, cast_msg(Request)).
  341. abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
  342. do_abcast(Nodes, Name, cast_msg(Request)).
  343. do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
  344. do_send({Name,Node},Msg),
  345. do_abcast(Nodes, Name, Msg);
  346. do_abcast([], _,_) -> abcast.
  347. %%% -----------------------------------------------------------------
  348. %%% Make a call to servers at several nodes.
  349. %%% Returns: {[Replies],[BadNodes]}
  350. %%% A Timeout can be given
  351. %%%
  352. %%% A middleman process is used in case late answers arrives after
  353. %%% the timeout. If they would be allowed to glog the callers message
  354. %%% queue, it would probably become confused. Late answers will
  355. %%% now arrive to the terminated middleman and so be discarded.
  356. %%% -----------------------------------------------------------------
  357. multi_call(Name, Req)
  358. when is_atom(Name) ->
  359. do_multi_call([node() | nodes()], Name, Req, infinity).
  360. multi_call(Nodes, Name, Req)
  361. when is_list(Nodes), is_atom(Name) ->
  362. do_multi_call(Nodes, Name, Req, infinity).
  363. multi_call(Nodes, Name, Req, infinity) ->
  364. do_multi_call(Nodes, Name, Req, infinity);
  365. multi_call(Nodes, Name, Req, Timeout)
  366. when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
  367. do_multi_call(Nodes, Name, Req, Timeout).
  368. %%% -----------------------------------------------------------------
  369. %%% Make multiple calls to multiple servers, given pairs of servers
  370. %%% and messages.
  371. %%% Returns: {[{Dest, Reply}], [{Dest, Error}]}
  372. %%%
  373. %%% Dest can be pid() | RegName :: atom() |
  374. %%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()}
  375. %%%
  376. %%% A middleman process is used to avoid clogging up the callers
  377. %%% message queue.
  378. %%% -----------------------------------------------------------------
  379. mcall(CallSpecs) ->
  380. Tag = make_ref(),
  381. {_, MRef} = spawn_monitor(
  382. fun() ->
  383. Refs = lists:foldl(
  384. fun ({Dest, _Request}=S, Dict) ->
  385. dict:store(do_mcall(S), Dest, Dict)
  386. end, dict:new(), CallSpecs),
  387. collect_replies(Tag, Refs, [], [])
  388. end),
  389. receive
  390. {'DOWN', MRef, _, _, {Tag, Result}} -> Result;
  391. {'DOWN', MRef, _, _, Reason} -> exit(Reason)
  392. end.
  393. do_mcall({{global,Name}=Dest, Request}) ->
  394. %% whereis_name is simply an ets lookup, and is precisely what
  395. %% global:send/2 does, yet we need a Ref to put in the call to the
  396. %% server, so invoking whereis_name makes a lot more sense here.
  397. case global:whereis_name(Name) of
  398. Pid when is_pid(Pid) ->
  399. MRef = erlang:monitor(process, Pid),
  400. catch msend(Pid, MRef, Request),
  401. MRef;
  402. undefined ->
  403. Ref = make_ref(),
  404. self() ! {'DOWN', Ref, process, Dest, noproc},
  405. Ref
  406. end;
  407. do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) ->
  408. {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6
  409. catch msend(Dest, MRef, Request),
  410. MRef;
  411. do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) ->
  412. MRef = erlang:monitor(process, Dest),
  413. catch msend(Dest, MRef, Request),
  414. MRef.
  415. msend(Dest, MRef, Request) ->
  416. erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]).
  417. collect_replies(Tag, Refs, Replies, Errors) ->
  418. case dict:size(Refs) of
  419. 0 -> exit({Tag, {Replies, Errors}});
  420. _ -> receive
  421. {MRef, Reply} ->
  422. {Refs1, Replies1} = handle_call_result(MRef, Reply,
  423. Refs, Replies),
  424. collect_replies(Tag, Refs1, Replies1, Errors);
  425. {'DOWN', MRef, _, _, Reason} ->
  426. Reason1 = case Reason of
  427. noconnection -> nodedown;
  428. _ -> Reason
  429. end,
  430. {Refs1, Errors1} = handle_call_result(MRef, Reason1,
  431. Refs, Errors),
  432. collect_replies(Tag, Refs1, Replies, Errors1)
  433. end
  434. end.
  435. handle_call_result(MRef, Result, Refs, AccList) ->
  436. %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2}
  437. %% here, so we must cope with MRefs that we've already seen and erased
  438. case dict:find(MRef, Refs) of
  439. {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]};
  440. _ -> {Refs, AccList}
  441. end.
  442. %% -----------------------------------------------------------------
  443. %% Apply a function to a generic server's state.
  444. %% -----------------------------------------------------------------
  445. with_state(Name, Fun) ->
  446. case catch gen:call(Name, '$with_state', Fun, infinity) of
  447. {ok,Res} ->
  448. Res;
  449. {'EXIT',Reason} ->
  450. exit({Reason, {?MODULE, with_state, [Name, Fun]}})
  451. end.
  452. %%-----------------------------------------------------------------
  453. %% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
  454. %%
  455. %% Description: Makes an existing process into a gen_server.
  456. %% The calling process will enter the gen_server receive
  457. %% loop and become a gen_server process.
  458. %% The process *must* have been started using one of the
  459. %% start functions in proc_lib, see proc_lib(3).
  460. %% The user is responsible for any initialization of the
  461. %% process, including registering a name for it.
  462. %%-----------------------------------------------------------------
  463. enter_loop(Mod, Options, State) ->
  464. enter_loop(Mod, Options, State, self(), infinity, undefined).
  465. enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
  466. enter_loop(Mod, Options, State, self(), infinity, Backoff);
  467. enter_loop(Mod, Options, State, ServerName = {_, _}) ->
  468. enter_loop(Mod, Options, State, ServerName, infinity, undefined);
  469. enter_loop(Mod, Options, State, Timeout) ->
  470. enter_loop(Mod, Options, State, self(), Timeout, undefined).
  471. enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
  472. enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
  473. enter_loop(Mod, Options, State, ServerName, Timeout) ->
  474. enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
  475. enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
  476. Name = get_proc_name(ServerName),
  477. Parent = get_parent(),
  478. Debug = debug_options(Name, Options),
  479. Queue = priority_queue:new(),
  480. Backoff1 = extend_backoff(Backoff),
  481. loop(find_prioritisers(
  482. #gs2_state { parent = Parent, name = Name, state = State,
  483. mod = Mod, time = Timeout, timeout_state = Backoff1,
  484. queue = Queue, debug = Debug })).
  485. %%%========================================================================
  486. %%% Gen-callback functions
  487. %%%========================================================================
  488. %%% ---------------------------------------------------
  489. %%% Initiate the new process.
  490. %%% Register the name using the Rfunc function
  491. %%% Calls the Mod:init/Args function.
  492. %%% Finally an acknowledge is sent to Parent and the main
  493. %%% loop is entered.
  494. %%% ---------------------------------------------------
  495. init_it(Starter, self, Name, Mod, Args, Options) ->
  496. init_it(Starter, self(), Name, Mod, Args, Options);
  497. init_it(Starter, Parent, Name0, Mod, Args, Options) ->
  498. Name = name(Name0),
  499. Debug = debug_options(Name, Options),
  500. Queue = priority_queue:new(),
  501. GS2State = find_prioritisers(
  502. #gs2_state { parent = Parent,
  503. name = Name,
  504. mod = Mod,
  505. queue = Queue,
  506. debug = Debug }),
  507. case catch Mod:init(Args) of
  508. {ok, State} ->
  509. proc_lib:init_ack(Starter, {ok, self()}),
  510. loop(GS2State #gs2_state { state = State,
  511. time = infinity,
  512. timeout_state = undefined });
  513. {ok, State, Timeout} ->
  514. proc_lib:init_ack(Starter, {ok, self()}),
  515. loop(GS2State #gs2_state { state = State,
  516. time = Timeout,
  517. timeout_state = undefined });
  518. {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
  519. Backoff1 = extend_backoff(Backoff),
  520. proc_lib:init_ack(Starter, {ok, self()}),
  521. loop(GS2State #gs2_state { state = State,
  522. time = Timeout,
  523. timeout_state = Backoff1 });
  524. {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} ->
  525. Backoff1 = extend_backoff(Backoff),
  526. proc_lib:init_ack(Starter, {ok, self()}),
  527. loop(find_prioritisers(
  528. GS2State #gs2_state { mod = Mod1,
  529. state = State,
  530. time = Timeout,
  531. timeout_state = Backoff1 }));
  532. {stop, Reason} ->
  533. %% For consistency, we must make sure that the
  534. %% registered name (if any) is unregistered before
  535. %% the parent process is notified about the failure.
  536. %% (Otherwise, the parent process could get
  537. %% an 'already_started' error if it immediately
  538. %% tried starting the process again.)
  539. unregister_name(Name0),
  540. proc_lib:init_ack(Starter, {error, Reason}),
  541. exit(Reason);
  542. ignore ->
  543. unregister_name(Name0),
  544. proc_lib:init_ack(Starter, ignore),
  545. exit(normal);
  546. {'EXIT', Reason} ->
  547. unregister_name(Name0),
  548. proc_lib:init_ack(Starter, {error, Reason}),
  549. exit(Reason);
  550. Else ->
  551. Error = {bad_return_value, Else},
  552. proc_lib:init_ack(Starter, {error, Error}),
  553. exit(Error)
  554. end.
  555. name({local,Name}) -> Name;
  556. name({global,Name}) -> Name;
  557. %% name(Pid) when is_pid(Pid) -> Pid;
  558. %% when R12 goes away, drop the line beneath and uncomment the line above
  559. name(Name) -> Name.
  560. unregister_name({local,Name}) ->
  561. _ = (catch unregister(Name));
  562. unregister_name({global,Name}) ->
  563. _ = global:unregister_name(Name);
  564. unregister_name(Pid) when is_pid(Pid) ->
  565. Pid;
  566. %% Under R12 let's just ignore it, as we have a single term as Name.
  567. %% On R13 it will never get here, as we get tuple with 'local/global' atom.
  568. unregister_name(_Name) -> ok.
  569. extend_backoff(undefined) ->
  570. undefined;
  571. extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
  572. {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
  573. %%%========================================================================
  574. %%% Internal functions
  575. %%%========================================================================
  576. %%% ---------------------------------------------------
  577. %%% The MAIN loop.
  578. %%% ---------------------------------------------------
  579. loop(GS2State = #gs2_state { time = hibernate,
  580. timeout_state = undefined,
  581. queue = Queue }) ->
  582. case priority_queue:is_empty(Queue) of
  583. true -> pre_hibernate(GS2State);
  584. false -> process_next_msg(GS2State)
  585. end;
  586. loop(GS2State) ->
  587. process_next_msg(drain(GS2State)).
  588. drain(GS2State) ->
  589. receive
  590. Input -> drain(in(Input, GS2State))
  591. after 0 -> GS2State
  592. end.
  593. process_next_msg(GS2State = #gs2_state { time = Time,
  594. timeout_state = TimeoutState,
  595. queue = Queue }) ->
  596. case priority_queue:out(Queue) of
  597. {{value, Msg}, Queue1} ->
  598. process_msg(Msg, GS2State #gs2_state { queue = Queue1 });
  599. {empty, Queue1} ->
  600. {Time1, HibOnTimeout}
  601. = case {Time, TimeoutState} of
  602. {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
  603. {Current, true};
  604. {hibernate, _} ->
  605. %% wake_hib/7 will set Time to hibernate. If
  606. %% we were woken and didn't receive a msg
  607. %% then we will get here and need a sensible
  608. %% value for Time1, otherwise we crash.
  609. %% R13B1 always waits infinitely when waking
  610. %% from hibernation, so that's what we do
  611. %% here too.
  612. {infinity, false};
  613. _ -> {Time, false}
  614. end,
  615. receive
  616. Input ->
  617. %% Time could be 'hibernate' here, so *don't* call loop
  618. process_next_msg(
  619. drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
  620. after Time1 ->
  621. case HibOnTimeout of
  622. true ->
  623. pre_hibernate(
  624. GS2State #gs2_state { queue = Queue1 });
  625. false ->
  626. process_msg(timeout,
  627. GS2State #gs2_state { queue = Queue1 })
  628. end
  629. end
  630. end.
  631. wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
  632. TimeoutState1 = case TS of
  633. undefined ->
  634. undefined;
  635. {SleptAt, TimeoutState} ->
  636. adjust_timeout_state(SleptAt, now(), TimeoutState)
  637. end,
  638. post_hibernate(
  639. drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
  640. hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
  641. TS = case TimeoutState of
  642. undefined -> undefined;
  643. {backoff, _, _, _, _} -> {now(), TimeoutState}
  644. end,
  645. proc_lib:hibernate(?MODULE, wake_hib,
  646. [GS2State #gs2_state { timeout_state = TS }]).
  647. pre_hibernate(GS2State = #gs2_state { state = State,
  648. mod = Mod }) ->
  649. case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
  650. true ->
  651. case catch Mod:handle_pre_hibernate(State) of
  652. {hibernate, NState} ->
  653. hibernate(GS2State #gs2_state { state = NState } );
  654. Reply ->
  655. handle_common_termination(Reply, pre_hibernate, GS2State)
  656. end;
  657. false ->
  658. hibernate(GS2State)
  659. end.
  660. post_hibernate(GS2State = #gs2_state { state = State,
  661. mod = Mod }) ->
  662. case erlang:function_exported(Mod, handle_post_hibernate, 1) of
  663. true ->
  664. case catch Mod:handle_post_hibernate(State) of
  665. {noreply, NState} ->
  666. process_next_msg(GS2State #gs2_state { state = NState,
  667. time = infinity });
  668. {noreply, NState, Time} ->
  669. process_next_msg(GS2State #gs2_state { state = NState,
  670. time = Time });
  671. Reply ->
  672. handle_common_termination(Reply, post_hibernate, GS2State)
  673. end;
  674. false ->
  675. %% use hibernate here, not infinity. This matches
  676. %% R13B. The key is that we should be able to get through
  677. %% to process_msg calling sys:handle_system_msg with Time
  678. %% still set to hibernate, iff that msg is the very msg
  679. %% that woke us up (or the first msg we receive after
  680. %% waking up).
  681. process_next_msg(GS2State #gs2_state { time = hibernate })
  682. end.
  683. adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
  684. DesiredHibPeriod, RandomState}) ->
  685. NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
  686. CurrentMicros = CurrentTO * 1000,
  687. MinimumMicros = MinimumTO * 1000,
  688. DesiredHibMicros = DesiredHibPeriod * 1000,
  689. GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
  690. Base =
  691. %% If enough time has passed between the last two messages then we
  692. %% should consider sleeping sooner. Otherwise stay awake longer.
  693. case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
  694. true -> lists:max([MinimumTO, CurrentTO div 2]);
  695. false -> CurrentTO
  696. end,
  697. {Extra, RandomState1} = random:uniform_s(Base, RandomState),
  698. CurrentTO1 = Base + Extra,
  699. {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
  700. in({'$gen_cast', Msg} = Input,
  701. GS2State = #gs2_state { prioritisers = {_, F, _} }) ->
  702. in(Input, F(Msg, GS2State), GS2State);
  703. in({'$gen_call', From, Msg} = Input,
  704. GS2State = #gs2_state { prioritisers = {F, _, _} }) ->
  705. in(Input, F(Msg, From, GS2State), GS2State);
  706. in({'$with_state', _From, _Fun} = Input, GS2State) ->
  707. in(Input, 0, GS2State);
  708. in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
  709. in(Input, infinity, GS2State);
  710. in({system, _From, _Req} = Input, GS2State) ->
  711. in(Input, infinity, GS2State);
  712. in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
  713. in(Input, F(Input, GS2State), GS2State).
  714. in(_Input, drop, GS2State) ->
  715. GS2State;
  716. in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
  717. GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
  718. process_msg({system, From, Req},
  719. GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
  720. %% gen_server puts Hib on the end as the 7th arg, but that version
  721. %% of the fun seems not to be documented so leaving out for now.
  722. sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
  723. process_msg({'$with_state', From, Fun},
  724. GS2State = #gs2_state{state = State}) ->
  725. reply(From, catch Fun(State)),
  726. loop(GS2State);
  727. process_msg({'EXIT', Parent, Reason} = Msg,
  728. GS2State = #gs2_state { parent = Parent }) ->
  729. terminate(Reason, Msg, GS2State);
  730. process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
  731. handle_msg(Msg, GS2State);
  732. process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) ->
  733. Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}),
  734. handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }).
  735. %%% ---------------------------------------------------
  736. %%% Send/recive functions
  737. %%% ---------------------------------------------------
  738. do_send(Dest, Msg) ->
  739. catch erlang:send(Dest, Msg).
  740. do_multi_call(Nodes, Name, Req, infinity) ->
  741. Tag = make_ref(),
  742. Monitors = send_nodes(Nodes, Name, Tag, Req),
  743. rec_nodes(Tag, Monitors, Name, undefined);
  744. do_multi_call(Nodes, Name, Req, Timeout) ->
  745. Tag = make_ref(),
  746. Caller = self(),
  747. Receiver =
  748. spawn(
  749. fun () ->
  750. %% Middleman process. Should be unsensitive to regular
  751. %% exit signals. The sychronization is needed in case
  752. %% the receiver would exit before the caller started
  753. %% the monitor.
  754. process_flag(trap_exit, true),
  755. Mref = erlang:monitor(process, Caller),
  756. receive
  757. {Caller,Tag} ->
  758. Monitors = send_nodes(Nodes, Name, Tag, Req),
  759. TimerId = erlang:start_timer(Timeout, self(), ok),
  760. Result = rec_nodes(Tag, Monitors, Name, TimerId),
  761. exit({self(),Tag,Result});
  762. {'DOWN',Mref,_,_,_} ->
  763. %% Caller died before sending us the go-ahead.
  764. %% Give up silently.
  765. exit(normal)
  766. end
  767. end),
  768. Mref = erlang:monitor(process, Receiver),
  769. Receiver ! {self(),Tag},
  770. receive
  771. {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
  772. Result;
  773. {'DOWN',Mref,_,_,Reason} ->
  774. %% The middleman code failed. Or someone did
  775. %% exit(_, kill) on the middleman process => Reason==killed
  776. exit(Reason)
  777. end.
  778. send_nodes(Nodes, Name, Tag, Req) ->
  779. send_nodes(Nodes, Name, Tag, Req, []).
  780. send_nodes([Node|Tail], Name, Tag, Req, Monitors)
  781. when is_atom(Node) ->
  782. Monitor = start_monitor(Node, Name),
  783. %% Handle non-existing names in rec_nodes.
  784. catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
  785. send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
  786. send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
  787. %% Skip non-atom Node
  788. send_nodes(Tail, Name, Tag, Req, Monitors);
  789. send_nodes([], _Name, _Tag, _Req, Monitors) ->
  790. Monitors.
  791. %% Against old nodes:
  792. %% If no reply has been delivered within 2 secs. (per node) check that
  793. %% the server really exists and wait for ever for the answer.
  794. %%
  795. %% Against contemporary nodes:
  796. %% Wait for reply, server 'DOWN', or timeout from TimerId.
  797. rec_nodes(Tag, Nodes, Name, TimerId) ->
  798. rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
  799. rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
  800. receive
  801. {'DOWN', R, _, _, _} ->
  802. rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
  803. {{Tag, N}, Reply} -> %% Tag is bound !!!
  804. unmonitor(R),
  805. rec_nodes(Tag, Tail, Name, Badnodes,
  806. [{N,Reply}|Replies], Time, TimerId);
  807. {timeout, TimerId, _} ->
  808. unmonitor(R),
  809. %% Collect all replies that already have arrived
  810. rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
  811. end;
  812. rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
  813. %% R6 node
  814. receive
  815. {nodedown, N} ->
  816. monitor_node(N, false),
  817. rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
  818. {{Tag, N}, Reply} -> %% Tag is bound !!!
  819. receive {nodedown, N} -> ok after 0 -> ok end,
  820. monitor_node(N, false),
  821. rec_nodes(Tag, Tail, Name, Badnodes,
  822. [{N,Reply}|Replies], 2000, TimerId);
  823. {timeout, TimerId, _} ->
  824. receive {nodedown, N} -> ok after 0 -> ok end,
  825. monitor_node(N, false),
  826. %% Collect all replies that already have arrived
  827. rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
  828. after Time ->
  829. case rpc:call(N, erlang, whereis, [Name]) of
  830. Pid when is_pid(Pid) -> % It exists try again.
  831. rec_nodes(Tag, [N|Tail], Name, Badnodes,
  832. Replies, infinity, TimerId);
  833. _ -> % badnode
  834. receive {nodedown, N} -> ok after 0 -> ok end,
  835. monitor_node(N, false),
  836. rec_nodes(Tag, Tail, Name, [N|Badnodes],
  837. Replies, 2000, TimerId)
  838. end
  839. end;
  840. rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
  841. case catch erlang:cancel_timer(TimerId) of
  842. false -> % It has already sent it's message
  843. receive
  844. {timeout, TimerId, _} -> ok
  845. after 0 ->
  846. ok
  847. end;
  848. _ -> % Timer was cancelled, or TimerId was 'undefined'
  849. ok
  850. end,
  851. {Replies, Badnodes}.
  852. %% Collect all replies that already have arrived
  853. rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
  854. receive
  855. {'DOWN', R, _, _, _} ->
  856. rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
  857. {{Tag, N}, Reply} -> %% Tag is bound !!!
  858. unmonitor(R),
  859. rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
  860. after 0 ->
  861. unmonitor(R),
  862. rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
  863. end;
  864. rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
  865. %% R6 node
  866. receive
  867. {nodedown, N} ->
  868. monitor_node(N, false),
  869. rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
  870. {{Tag, N}, Reply} -> %% Tag is bound !!!
  871. receive {nodedown, N} -> ok after 0 -> ok end,
  872. monitor_node(N, false),
  873. rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
  874. after 0 ->
  875. receive {nodedown, N} -> ok after 0 -> ok end,
  876. monitor_node(N, false),
  877. rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
  878. end;
  879. rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
  880. {Replies, Badnodes}.
  881. %%% ---------------------------------------------------
  882. %%% Monitor functions
  883. %%% ---------------------------------------------------
  884. start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
  885. if node() =:= nonode@nohost, Node =/= nonode@nohost ->
  886. Ref = make_ref(),
  887. self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
  888. {Node, Ref};
  889. true ->
  890. case catch erlang:monitor(process, {Name, Node}) of
  891. {'EXIT', _} ->
  892. %% Remote node is R6
  893. monitor_node(Node, true),
  894. Node;
  895. Ref when is_reference(Ref) ->
  896. {Node, Ref}
  897. end
  898. end.
  899. %% Cancels a monitor started with Ref=erlang:monitor(_, _).
  900. unmonitor(Ref) when is_reference(Ref) ->
  901. erlang:demonitor(Ref),
  902. receive
  903. {'DOWN', Ref, _, _, _} ->
  904. true
  905. after 0 ->
  906. true
  907. end.
  908. %%% ---------------------------------------------------
  909. %%% Message handling functions
  910. %%% ---------------------------------------------------
  911. dispatch({'$gen_cast', Msg}, Mod, State) ->
  912. Mod:handle_cast(Msg, State);
  913. dispatch(Info, Mod, State) ->
  914. Mod:handle_info(Info, State).
  915. common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
  916. reply(From, Reply),
  917. [];
  918. common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) ->
  919. reply(From, Reply),
  920. sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
  921. common_noreply(_Name, _NState, [] = _Debug) ->
  922. [];
  923. common_noreply(Name, NState, Debug) ->
  924. sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}).
  925. common_become(_Name, _Mod, _NState, [] = _Debug) ->
  926. [];
  927. common_become(Name, Mod, NState, Debug) ->
  928. sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}).
  929. handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
  930. state = State,
  931. name = Name,
  932. debug = Debug }) ->
  933. case catch Mod:handle_call(Msg, From, State) of
  934. {reply, Reply, NState} ->
  935. Debug1 = common_reply(Name, From, Reply, NState, Debug),
  936. loop(GS2State #gs2_state { state = NState,
  937. time = infinity,
  938. debug = Debug1 });
  939. {reply, Reply, NState, Time1} ->
  940. Debug1 = common_reply(Name, From, Reply, NState, Debug),
  941. loop(GS2State #gs2_state { state = NState,
  942. time = Time1,
  943. debug = Debug1});
  944. {stop, Reason, Reply, NState} ->
  945. {'EXIT', R} =
  946. (catch terminate(Reason, Msg,
  947. GS2State #gs2_state { state = NState })),
  948. common_reply(Name, From, Reply, NState, Debug),
  949. exit(R);
  950. Other ->
  951. handle_common_reply(Other, Msg, GS2State)
  952. end;
  953. handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
  954. Reply = (catch dispatch(Msg, Mod, State)),
  955. handle_common_reply(Reply, Msg, GS2State).
  956. handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
  957. debug = Debug}) ->
  958. case Reply of
  959. {noreply, NState} ->
  960. Debug1 = common_noreply(Name, NState, Debug),
  961. loop(GS2State #gs2_state {state = NState,
  962. time = infinity,
  963. debug = Debug1});
  964. {noreply, NState, Time1} ->
  965. Debug1 = common_noreply(Name, NState, Debug),
  966. loop(GS2State #gs2_state {state = NState,
  967. time = Time1,
  968. debug = Debug1});
  969. {become, Mod, NState} ->
  970. Debug1 = common_become(Name, Mod, NState, Debug),
  971. loop(find_prioritisers(
  972. GS2State #gs2_state { mod = Mod,
  973. state = NState,
  974. time = infinity,
  975. debug = Debug1 }));
  976. {become, Mod, NState, Time1} ->
  977. Debug1 = common_become(Name, Mod, NState, Debug),
  978. loop(find_prioritisers(
  979. GS2State #gs2_state { mod = Mod,
  980. state = NState,
  981. time = Time1,
  982. debug = Debug1 }));
  983. _ ->
  984. handle_common_termination(Reply, Msg, GS2State)
  985. end.
  986. handle_common_termination(Reply, Msg, GS2State) ->
  987. case Reply of
  988. {stop, Reason, NState} ->
  989. terminate(Reason, Msg, GS2State #gs2_state { state = NState });
  990. {'EXIT', What} ->
  991. terminate(What, Msg, GS2State);
  992. _ ->
  993. terminate({bad_return_value, Reply}, Msg, GS2State)
  994. end.
  995. %%-----------------------------------------------------------------
  996. %% Callback functions for system messages handling.
  997. %%-----------------------------------------------------------------
  998. system_continue(Parent, Debug, GS2State) ->
  999. loop(GS2State #gs2_state { parent = Parent, debug = Debug }).
  1000. system_terminate(Reason, _Parent, Debug, GS2State) ->
  1001. terminate(Reason, [], GS2State #gs2_state { debug = Debug }).
  1002. system_code_change(GS2State = #gs2_state { mod = Mod,
  1003. state = State },
  1004. _Module, OldVsn, Extra) ->
  1005. case catch Mod:code_change(OldVsn, State, Extra) of
  1006. {ok, NewState} ->
  1007. NewGS2State = find_prioritisers(
  1008. GS2State #gs2_state { state = NewState }),
  1009. {ok, [NewGS2State]};
  1010. Else ->
  1011. Else
  1012. end.
  1013. %%-----------------------------------------------------------------
  1014. %% Format debug messages. Print them as the call-back module sees
  1015. %% them, not as the real erlang messages. Use trace for that.
  1016. %%-----------------------------------------------------------------
  1017. print_event(Dev, {in, Msg}, Name) ->
  1018. case Msg of
  1019. {'$gen_call', {From, _Tag}, Call} ->
  1020. io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
  1021. [Name, Call, From]);
  1022. {'$gen_cast', Cast} ->
  1023. io:format(Dev, "*DBG* ~p got cast ~p~n",
  1024. [Name, Cast]);
  1025. _ ->
  1026. io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
  1027. end;
  1028. print_event(Dev, {out, Msg, To, State}, Name) ->
  1029. io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
  1030. [Name, Msg, To, State]);
  1031. print_event(Dev, {noreply, State}, Name) ->
  1032. io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
  1033. print_event(Dev, Event, Name) ->
  1034. io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
  1035. %%% ---------------------------------------------------
  1036. %%% Terminate the server.
  1037. %%% ---------------------------------------------------
  1038. terminate(Reason, Msg, #gs2_state { name = Name,
  1039. mod = Mod,
  1040. state = State,
  1041. debug = Debug }) ->
  1042. case catch Mod:terminate(Reason, State) of
  1043. {'EXIT', R} ->
  1044. error_info(R, Reason, Name, Msg, State, Debug),
  1045. exit(R);
  1046. _ ->
  1047. case Reason of
  1048. normal ->
  1049. exit(normal);
  1050. shutdown ->
  1051. exit(shutdown);
  1052. {shutdown,_}=Shutdown ->
  1053. exit(Shutdown);
  1054. _ ->
  1055. error_info(Reason, undefined, Name, Msg, State, Debug),
  1056. exit(Reason)
  1057. end
  1058. end.
  1059. error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
  1060. %% OTP-5811 Don't send an error report if it's the system process
  1061. %% application_controller which is terminating - let init take care
  1062. %% of it instead
  1063. ok;
  1064. error_info(Reason, RootCause, Name, Msg, State, Debug) ->
  1065. Reason1 = error_reason(Reason),
  1066. Fmt =
  1067. "** Generic server ~p terminating~n"
  1068. "** Last message in was ~p~n"
  1069. "** When Server state == ~p~n"
  1070. "** Reason for termination == ~n** ~p~n",
  1071. case RootCause of
  1072. undefined -> format(Fmt, [Name, Msg, State, Reason1]);
  1073. _ -> format(Fmt ++ "** In 'terminate' callback "
  1074. "with reason ==~n** ~p~n",
  1075. [Name, Msg, State, Reason1,
  1076. error_reason(RootCause)])
  1077. end,
  1078. sys:print_log(Debug),
  1079. ok.
  1080. error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
  1081. case code:is_loaded(M) of
  1082. false -> {'module could not be loaded',[{M,F,A}|MFAs]};
  1083. _ -> case erlang:function_exported(M, F, length(A)) of
  1084. true -> Reason;
  1085. false -> {'function not exported',[{M,F,A}|MFAs]}
  1086. end
  1087. end;
  1088. error_reason(Reason) ->
  1089. Reason.
  1090. %%% ---------------------------------------------------
  1091. %%% Misc. functions.
  1092. %%% ---------------------------------------------------
  1093. opt(Op, [{Op, Value}|_]) ->
  1094. {ok, Value};
  1095. opt(Op, [_|Options]) ->
  1096. opt(Op, Options);
  1097. opt(_, []) ->
  1098. false.
  1099. debug_options(Name, Opts) ->
  1100. case opt(debug, Opts) of
  1101. {ok, Options} -> dbg_options(Name, Options);
  1102. _ -> dbg_options(Name, [])
  1103. end.
  1104. dbg_options(Name, []) ->
  1105. Opts =
  1106. case init:get_argument(generic_debug) of
  1107. error ->
  1108. [];
  1109. _ ->
  1110. [log, statistics]
  1111. end,
  1112. dbg_opts(Name, Opts);
  1113. dbg_options(Name, Opts) ->
  1114. dbg_opts(Name, Opts).
  1115. dbg_opts(Name, Opts) ->
  1116. case catch sys:debug_options(Opts) of
  1117. {'EXIT',_} ->
  1118. format("~p: ignoring erroneous debug options - ~p~n",
  1119. [Name, Opts]),
  1120. [];
  1121. Dbg ->
  1122. Dbg
  1123. end.
  1124. get_proc_name(Pid) when is_pid(Pid) ->
  1125. Pid;
  1126. get_proc_name({local, Name}) ->
  1127. case process_info(self(), registered_name) of
  1128. {registered_name, Name} ->
  1129. Name;
  1130. {registered_name, _Name} ->
  1131. exit(process_not_registered);
  1132. [] ->
  1133. exit(process_not_registered)
  1134. end;
  1135. get_proc_name({global, Name}) ->
  1136. case whereis_name(Name) of
  1137. undefined ->
  1138. exit(process_not_registered_globally);
  1139. Pid when Pid =:= self() ->
  1140. Name;
  1141. _Pid ->
  1142. exit(process_not_registered_globally)
  1143. end.
  1144. get_parent() ->
  1145. case get('$ancestors') of
  1146. [Parent | _] when is_pid(Parent)->
  1147. Parent;
  1148. [Parent | _] when is_atom(Parent)->
  1149. name_to_pid(Parent);
  1150. _ ->
  1151. exit(process_was_not_started_by_proc_lib)
  1152. end.
  1153. name_to_pid(Name) ->
  1154. case whereis(Name) of
  1155. undefined ->
  1156. case whereis_name(Name) of
  1157. undefined ->
  1158. exit(could_not_find_registerd_name);
  1159. Pid ->
  1160. Pid
  1161. end;
  1162. Pid ->
  1163. Pid
  1164. end.
  1165. whereis_name(Name) ->
  1166. case ets:lookup(global_names, Name) of
  1167. [{_Name, Pid, _Method, _RPid, _Ref}] ->
  1168. if node(Pid) == node() ->
  1169. case is_process_alive(Pid) of
  1170. true -> Pid;
  1171. false -> undefined
  1172. end;
  1173. true ->
  1174. Pid
  1175. end;
  1176. [] -> undefined
  1177. end.
  1178. find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
  1179. PCall = function_exported_or_default(Mod, 'prioritise_call', 4,
  1180. fun (_Msg, _From, _State) -> 0 end),
  1181. PCast = function_exported_or_default(Mod, 'prioritise_cast', 3,
  1182. fun (_Msg, _State) -> 0 end),
  1183. PInfo = function_exported_or_default(Mod, 'prioritise_info', 3,
  1184. fun (_Msg, _State) -> 0 end),
  1185. GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }.
  1186. function_exported_or_default(Mod, Fun, Arity, Default) ->
  1187. case erlang:function_exported(Mod, Fun, Arity) of
  1188. true -> case Arity of
  1189. 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
  1190. state = State }) ->
  1191. Length = priority_queue:len(Queue),
  1192. case catch Mod:Fun(Msg, Length, State) of
  1193. drop ->
  1194. drop;
  1195. Res when is_integer(Res) ->
  1196. Res;
  1197. Err ->
  1198. handle_common_termination(Err, Msg, GS2State)
  1199. end
  1200. end;
  1201. 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
  1202. state = State }) ->
  1203. Length = priority_queue:len(Queue),
  1204. case catch Mod:Fun(Msg, From, Length, State) of
  1205. Res when is_integer(Res) ->
  1206. Res;
  1207. Err ->
  1208. handle_common_termination(Err, Msg, GS2State)
  1209. end
  1210. end
  1211. end;
  1212. false -> Default
  1213. end.
  1214. %%-----------------------------------------------------------------
  1215. %% Status information
  1216. %%-----------------------------------------------------------------
  1217. format_status(Opt, StatusData) ->
  1218. [PDict, SysState, Parent, Debug,
  1219. #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] =
  1220. StatusData,
  1221. NameTag = if is_pid(Name) ->
  1222. pid_to_list(Name);
  1223. is_atom(Name) ->
  1224. Name
  1225. end,
  1226. Header = lists:concat(["Status for generic server ", NameTag]),
  1227. Log = sys:get_debug(log, Debug, []),
  1228. Specfic = callback(Mod, format_status, [Opt, [PDict, State]],
  1229. fun () -> [{data, [{"State", State}]}] end),
  1230. Messages = callback(Mod, format_message_queue, [Opt, Queue],
  1231. fun () -> priority_queue:to_list(Queue) end),
  1232. [{header, Header},
  1233. {data, [{"Status", SysState},
  1234. {"Parent", Parent},
  1235. {"Logged events", Log},
  1236. {"Queued messages", Messages}]} |
  1237. Specfic].
  1238. callback(Mod, FunName, Args, DefaultThunk) ->
  1239. case erlang:function_exported(Mod, FunName, length(Args)) of
  1240. true -> case catch apply(Mod, FunName, Args) of
  1241. {'EXIT', _} -> DefaultThunk();
  1242. Success -> Success
  1243. end;
  1244. false -> DefaultThunk()
  1245. end.