nodetool 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. #!/usr/bin/env escript
  2. %% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
  3. %%! +P 65536 +Q 65536 +S 1
  4. %% ex: ft=erlang ts=4 sw=4 et
  5. %% -------------------------------------------------------------------
  6. %%
  7. %% nodetool: Helper Script for interacting with live nodes
  8. %%
  9. %% -------------------------------------------------------------------
  10. -mode(compile).
  11. -define(SHUTDOWN_TIMEOUT_MS, 120_000).
  %% Escript entry point.
  %%
  %% On anything other than Windows NT, epmd is started first when the emulator
  %% was given the `-start_epmd true` argument. The first CLI argument then
  %% selects the mode:
  %%   * "hocon"             - remaining arguments are forwarded to hocon_cli
  %%   * "check_license_key" - the given license key is validated locally
  %%   * anything else       - handled by do/1
  main(Args) ->
      ok = maybe_start_epmd(os:type()),
      case Args of
          ["check_license_key", Key0] ->
              ok = add_libs_dir(),
              check_license(#{key => cleanup_key(Key0)});
          ["hocon" | HoconArgs] ->
              ok = add_libs_dir(),
              %% forward the call to hocon_cli
              hocon_cli:main(HoconArgs);
          _ ->
              do(Args)
      end.

  %% Start epmd unless running on Windows NT; only done when the init
  %% argument `start_epmd` is exactly "true".
  maybe_start_epmd({win32, nt}) ->
      ok;
  maybe_start_epmd(_Nix) ->
      case init:get_argument(start_epmd) of
          {ok, [["true"]]} -> ok = start_epmd();
          _ -> ok
      end.
  %% The key arrives as the string (list) rendering of a binary, e.g.
  %% "<<...>>". Drop the first "<<" and the last ">>" (when present) and
  %% return the remainder as a binary.
  cleanup_key(Str0) ->
      Strip = fun(Subject, Marker, Where) ->
          iolist_to_binary(string:replace(Subject, Marker, "", Where))
      end,
      Strip(Strip(Str0, "<<", leading), ">>", trailing).
  %% Run a nodetool command against a live node.
  %%
  %% Steps, in order:
  %%   1. "mnesia_dir" is handled first; its handler is expected to halt.
  %%   2. "-name" / "-sname" start distribution for this script and record the
  %%      target node in the process dictionary; "-setcookie" sets the cookie.
  %%      Each option (and its value) is removed from the argument list.
  %%   3. The target node must accept a hidden connection and answer a ping,
  %%      otherwise the script halts with status 1.
  %%   4. The remaining arguments are interpreted as the command to run.
  do(Args) ->
      ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2),
      OptionHandlers = [
          {"-name", fun(TargetName) -> start_distribution(longnames, TargetName) end},
          {"-sname", fun(TargetName) -> start_distribution(shortnames, TargetName) end},
          {"-setcookie", fun(Cookie) -> erlang:set_cookie(node(), list_to_atom(Cookie)) end}
      ],
      Command = lists:foldl(
          fun({Option, Handler}, Remaining) -> do_with_ret(Remaining, Option, Handler) end,
          Args,
          OptionHandlers
      ),
      %% Start results are not inspected.
      [application:start(App) || App <- [crypto, public_key, ssl]],
      TargetNode = get(target_node),
      ok = ensure_target_reachable(TargetNode),
      %% Mute logger from now on.
      %% Otherwise Erlang distribution over TLS (inet_tls_dist) warning logs
      %% and supervisor reports may contaminate io:format outputs
      logger:set_primary_config(level, none),
      run_command(Command, TargetNode),
      net_kernel:stop().

  %% Start distribution under a maintenance node name derived from the target
  %% name, then remember the target node under the `target_node` key.
  start_distribution(NameMode, TargetName) ->
      ThisNode = this_node_name(NameMode, TargetName),
      {ok, _} = net_kernel:start([ThisNode, NameMode]),
      put(target_node, nodename(TargetName)).

  %% See if the node is currently running -- if it's not, bail out with
  %% exit status 1.
  ensure_target_reachable(TargetNode) ->
      case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
          {true, pong} ->
              ok;
          {false, pong} ->
              io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
              halt(1);
          {_, pang} ->
              io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),
              halt(1)
      end.

  %% Execute one command (the arguments left after option processing)
  %% against TargetNode.
  run_command(["getpid"], TargetNode) ->
      OsPid = rpc:call(TargetNode, os, getpid, []),
      io:format("~p\n", [list_to_integer(OsPid)]);
  run_command(["ping"], _TargetNode) ->
      %% If we got this far, the node already responded to a ping, so just dump
      %% a "pong"
      io:format("pong\n");
  run_command(["stop"], TargetNode) ->
      Reporter = start_shutdown_status(),
      Res = rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], ?SHUTDOWN_TIMEOUT_MS),
      true = stop_shutdown_status(Reporter),
      case Res of
          ok ->
              ok;
          {badrpc, nodedown} ->
              %% nodetool commands always run after a successful ping, so
              %% reaching this clause means the target node shut down
              %% before the RPC returned.
              ok;
          {badrpc, timeout} ->
              io:format(
                  "EMQX is still shutting down, it failed to stop gracefully "
                  "within the configured timeout of: ~ps\n",
                  [erlang:convert_time_unit(?SHUTDOWN_TIMEOUT_MS, millisecond, second)]
              ),
              halt(1)
      end;
  run_command(["rpc", Module, Function | RpcArgs], TargetNode) ->
      case rpc_with_string_args(TargetNode, Module, Function, RpcArgs, 60000) of
          ok ->
              ok;
          {error, cmd_not_found} ->
              halt(1);
          {error, Reason} ->
              io:format("RPC to ~s error: ~p\n", [TargetNode, Reason]),
              halt(1);
          {badrpc, Reason} ->
              io:format("RPC to ~s failed: ~p\n", [TargetNode, Reason]),
              halt(1);
          _ ->
              halt(1)
      end;
  run_command(["rpc_infinity", Module, Function | RpcArgs], TargetNode) ->
      case rpc_with_string_args(TargetNode, Module, Function, RpcArgs, infinity) of
          ok ->
              ok;
          {badrpc, Reason} ->
              io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
              halt(1);
          _ ->
              halt(1)
      end;
  run_command(["rpcterms", Module, Function | ArgsAsString], TargetNode) ->
      Reply = rpc:call(
          TargetNode,
          list_to_atom(Module),
          list_to_atom(Function),
          consult(lists:flatten(ArgsAsString)),
          60000
      ),
      case Reply of
          {badrpc, Reason} ->
              io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
              halt(1);
          Other ->
              io:format("~p\n", [Other])
      end;
  run_command(["eval" | ListOfArgs], TargetNode) ->
      % parse args locally in the remsh node
      Parsed = parse_eval_args(ListOfArgs),
      % and evaluate it on the remote node
      case rpc:call(TargetNode, emqx_ctl, run_command, [eval_erl, Parsed], infinity) of
          {ok, Value} ->
              io:format("~p~n", [Value]);
          {badrpc, Reason} ->
              io:format("RPC to ~p failed: ~p~n", [TargetNode, Reason]),
              halt(1)
      end;
  run_command(Other, _TargetNode) ->
      io:format("Other: ~p~n", [Other]),
      io:format(
          "Usage: nodetool getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval|cold_eval [Terms] [RPC]\n"
      ).

  %% Call Module:Function on TargetNode with a single argument: the list of
  %% raw command-line strings.
  rpc_with_string_args(TargetNode, Module, Function, RpcArgs, Timeout) ->
      rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function), [RpcArgs], Timeout).
  %% Spawn a linked process running shutdown_status_loop/0 and return its pid.
  start_shutdown_status() ->
      erlang:spawn_link(fun() -> shutdown_status_loop() end).
  %% Stop the status reporter: unlink first so that its exit does not
  %% propagate to the caller, then send it an exit signal with reason `stop`.
  %% Returns `true`.
  stop_shutdown_status(Reporter) ->
      true = erlang:unlink(Reporter),
      true = erlang:exit(Reporter, stop).
  %% Print a progress message every 10 seconds, forever. The loop never
  %% returns by itself; it runs until the process receives an exit signal.
  shutdown_status_loop() ->
      receive
      after 10_000 -> ok
      end,
      io:format("EMQX is shutting down, please wait...\n", []),
      shutdown_status_loop().
  %% Turn the command-line arguments of `eval` into a list of parsed
  %% expressions (abstract format).
  %%
  %% Crashes with a badmatch if the text cannot be scanned or parsed.
  parse_eval_args(Args) ->
      % the shell may have split the expression into several arguments and
      % stripped the spaces, so glue everything back into one string
      Source = lists:flatten(lists:join(" ", Args)),
      % as a convenience to users, supply the terminating '.' when missing
      Terminated =
          case lists:suffix(".", Source) of
              true -> Source;
              false -> Source ++ "."
          end,
      % scan and parse the resulting string
      {ok, Tokens, _EndLocation} = erl_scan:string(Terminated),
      {ok, Exprs} = erl_parse:parse_exprs(Tokens),
      Exprs.
  %% If option `Name` is present in `Args`, apply `Handler` to the values that
  %% follow it (as many as the handler's arity) and return the argument list
  %% with the option and its values removed. The handler's result is ignored.
  %% Without the option, `Args` is returned unchanged.
  do_with_ret(Args, Name, Handler) ->
      {arity, Arity} = erlang:fun_info(Handler, arity),
      case take_args(Args, Name, Arity) of
          {OptionArgs, Remaining} ->
              _ = erlang:apply(Handler, OptionArgs),
              Remaining;
          false ->
              Args
      end.
  %% If option `Name` is present in `Args`, apply `Handler` to the values that
  %% follow it (as many as the handler's arity). The handler is expected to
  %% halt the VM itself; if it returns, an error is reported on stderr and the
  %% VM is halted with the source line number as exit status.
  %% Returns `ok` when the option is absent.
  do_with_halt(Args, Name, Handler) ->
      {arity, Arity} = erlang:fun_info(Handler, arity),
      case take_args(Args, Name, Arity) of
          {OptionArgs, _Remaining} ->
              %% should halt
              erlang:apply(Handler, OptionArgs),
              io:format(standard_error, "~s handler did not halt", [Name]),
              halt(?LINE);
          false ->
              ok
      end.
  %% Look for option `OptName` in `Args` and take the `OptArity` values that
  %% follow its first occurrence.
  %%
  %% Returns `{OptArgs, Remaining}` where `OptArgs` are the option's values and
  %% `Remaining` is `Args` without the option and its values, or `false` when
  %% the option is not present. Raises `{not_enough_args_for, OptName}` when
  %% fewer than `OptArity` values follow the option.
  %%
  %% A zero-arity option used to return a bare `[]`, which none of the callers
  %% (do_with_ret/3, do_with_halt/3) can match. It now goes through the same
  %% path as every other arity and yields `{[], Remaining}`.
  take_args(Args, OptName, OptArity) ->
      take_args(Args, OptName, OptArity, _Scanned = []).

  %% no such option
  take_args([], _, _, _) ->
      false;
  take_args([Name | Rest], Name, Arity, Scanned) ->
      length(Rest) >= Arity orelse error({not_enough_args_for, Name}),
      {Result, Tail} = lists:split(Arity, Rest),
      %% Scanned holds the skipped arguments in reverse order
      {Result, lists:reverse(Scanned, Tail)};
  take_args([Other | Rest], Name, Arity, Scanned) ->
      take_args(Rest, Name, Arity, [Other | Scanned]).
  %% Launch epmd as a daemon through the OS shell. The executable path is
  %% double-quoted in the command line. Any output from the command is
  %% treated as a failure (badmatch).
  start_epmd() ->
      Command = lists:append(["\"", epmd_path(), "\" -daemon"]),
      [] = os:cmd(Command),
      ok.
  %% Locate the epmd executable: first in the directory containing this
  %% script, then via the default executable search. Halts with status 1
  %% when it cannot be found.
  epmd_path() ->
      ScriptDir = filename:dirname(escript:script_name()),
      case os:find_executable("epmd", ScriptDir) of
          false -> find_global_epmd();
          LocalEpmd -> LocalEpmd
      end.

  %% Fallback lookup of epmd outside the script directory.
  find_global_epmd() ->
      case os:find_executable("epmd") of
          false ->
              io:format("Could not find epmd.~n"),
              halt(1);
          GlobalEpmd ->
              GlobalEpmd
      end.
  %% Convert a node name string into a node atom. A name of the form
  %% "node@host" is used as is; a bare "node" gets the host part of the local
  %% node (node()) appended.
  nodename(Name) ->
      SplitAtSign = fun(Str) -> re:split(Str, "@", [{return, list}, unicode]) end,
      case SplitAtSign(Name) of
          [Node] ->
              [_, Host] = SplitAtSign(atom_to_list(node())),
              list_to_atom(Node ++ "@" ++ Host);
          [_Node, _Host] ->
              list_to_atom(Name)
      end.
  %% Build the node name this script registers under while talking to the
  %% target node: "remsh_maint_" ++ Node ++ Suffix [++ "@" ++ Host].
  %%
  %% For longnames the target name must be of the form "node@host" (badmatch
  %% otherwise). For shortnames the host part is optional; when present it is
  %% preserved and the suffix is placed before the "@". Appending the suffix
  %% to the whole name would corrupt the host, e.g. "emqx@host" would become
  %% "remsh_maint_emqx@host123".
  this_node_name(longnames, Name) ->
      [Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
      list_to_atom(lists:concat(["remsh_maint_", Node, node_name_suffix_id(), "@", Host]));
  this_node_name(shortnames, Name) ->
      case re:split(Name, "@", [{return, list}, unicode]) of
          [Node] ->
              list_to_atom(lists:concat(["remsh_maint_", Node, node_name_suffix_id()]));
          [Node, Host] ->
              list_to_atom(
                  lists:concat(["remsh_maint_", Node, node_name_suffix_id(), "@", Host])
              )
      end.

  %% Node name suffix: the first three characters of the reversed OS pid
  %% string, i.e. the last (up to) three digits of the pid in reverse order.
  node_name_suffix_id() ->
      Pid = os:getpid(),
      string:slice(string:reverse(Pid), 0, 3).
  %% Handler for the "mnesia_dir" option: create DataDir/NodeName, print the
  %% resulting path to stdout without a trailing newline, and halt with
  %% status 0.
  %% NOTE(review): the previous comment here only asked "For windows???" --
  %% whether this is Windows-specific is not confirmed by this file.
  create_mnesia_dir(DataDir, NodeName) ->
      Dir = filename:join(DataDir, NodeName),
      %% the result is not checked; the path is printed either way
      file:make_dir(Dir),
      io:format("~s", [Dir]),
      halt(0).
  %% Validate the license described by Config by reading it through
  %% emqx_license. Returns `ok` on success; on error prints the reason to
  %% stderr and halts with status 1.
  check_license(Config) ->
      ok = ensure_application_load(emqx_license),
      %% This checks formal license validity to ensure
      %% that the node can successfully start with the given license.
      %% However, a valid license may be expired. In this case, the node will
      %% start but will not be able to receive connections due to connection limits.
      %% It may receive license updates from the cluster further.
      ReadResult = emqx_license:read_license(Config),
      case ReadResult of
          {error, Error} ->
              io:format(standard_error, "Error reading license: ~p~n", [Error]),
              halt(1);
          {ok, _} ->
              ok
      end.
  %%
  %% Parse a string or binary holding zero or more dot-terminated Erlang terms
  %% into the list of those terms, in order (in the spirit of file:consult/1).
  %% Returns {error, Info} on a scan error; a term that does not parse raises
  %% a badmatch.
  %%
  consult(Bin) when is_binary(Bin) ->
      consult([], binary_to_list(Bin), []);
  consult(Str) when is_list(Str) ->
      consult([], Str, []).

  %% Cont is the scanner continuation, Input the text still to scan (or `eof`),
  %% Terms the terms parsed so far in reverse order.
  consult(Cont, Input, Terms) ->
      case erl_scan:tokens(Cont, Input, 0) of
          {more, NextCont} ->
              %% no more text to offer: tell the scanner the input has ended
              consult(NextCont, eof, Terms);
          {done, {ok, Tokens, _}, Remaining} ->
              {ok, Term} = erl_parse:parse_term(Tokens),
              consult([], Remaining, [Term | Terms]);
          {done, {eof, _Other}, _Remaining} ->
              lists:reverse(Terms);
          {done, {error, Info, _}, _Remaining} ->
              {error, Info}
      end.
  %% Add the ebin directory of every library of the current release to the
  %% code path, followed by the patches directories.
  %%
  %% The release root comes from the RUNNER_ROOT_DIR environment variable
  %% (must be a non-empty string) and the release version from REL_VSN; the
  %% library list is read from <root>/releases/RELEASES.
  add_libs_dir() ->
      [_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"),
      CurrentVsn = os:getenv("REL_VSN"),
      RelFile = filename:join([RootDir, "releases", "RELEASES"]),
      case file:consult(RelFile) of
          {error, Reason} ->
              %% the RELEASES file may have been deleted by the release handler
              error({failed_to_read_RELEASES_file, RelFile, Reason});
          {ok, [Releases]} ->
              %% the release entry is looked up by version, the third tuple element
              {release, _Name, _AppVsn, _ErtsVsn, Libs, _State} =
                  lists:keyfind(CurrentVsn, 3, Releases),
              AddLib = fun({Name, Vsn, _}) -> add_lib_dir(RootDir, Name, Vsn) end,
              lists:foreach(AddLib, Libs)
      end,
      ok = add_patches_dir(filename:join([RootDir, "data", "patches"])),
      ok = add_patches_dir("/var/lib/emqx/patches").
  %% Put PatchesDir at the front of the code path when it is an existing
  %% directory; otherwise do nothing. Always returns `ok`.
  add_patches_dir(PatchesDir) ->
      filelib:is_dir(PatchesDir) andalso (true = code:add_patha(PatchesDir)),
      ok.
  %% Put <RootDir>/lib/<Name>-<Vsn>/ebin at the front of the code path and, for
  %% applications whose name starts with "emqx_", load the application.
  %% Raises an error carrying the directory when it cannot be added.
  add_lib_dir(RootDir, Name, Vsn) ->
      AppDirName = atom_to_list(Name) ++ "-" ++ Vsn,
      LibDir = filename:join([RootDir, lib, AppDirName, ebin]),
      case code:add_patha(LibDir) of
          {error, _} ->
              error(LibDir);
          true ->
              %% load all applications into application controller, before performing
              %% the configuration check of HOCON
              %%
              %% It helps to implement the feature of dynamically searching schema.
              %% See `emqx_gateway_schema:fields(gateway)`
              is_emqx_application(Name) andalso ensure_application_load(Name),
              ok
      end.
  %% True when the application name (atom or string) starts with "emqx_".
  %% Note that the bare name "emqx" does not qualify.
  is_emqx_application(Name) ->
      case Name of
          _ when is_atom(Name) -> is_emqx_application(atom_to_list(Name));
          "emqx_" ++ _Rest -> true;
          _ -> false
      end.
  %% Load application Name into the application controller. An application
  %% that is already loaded counts as success; any other failure raises
  %% {failed_to_load_application, Name, Reason}.
  ensure_application_load(Name) ->
      LoadResult = application:load(Name),
      case LoadResult of
          {error, {already_loaded, _}} ->
              ok;
          {error, Reason} ->
              error({failed_to_load_application, Name, Reason});
          ok ->
              ok
      end.