nodetool 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. #!/usr/bin/env escript
  2. %% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
  3. %% ex: ft=erlang ts=4 sw=4 et
  4. %% -------------------------------------------------------------------
  5. %%
  6. %% nodetool: Helper Script for interacting with live nodes
  7. %%
  8. %% -------------------------------------------------------------------
  9. -mode(compile).
  10. -define(SHUTDOWN_TIMEOUT_MS, 120_000).
  11. main(Args) ->
  12. case os:type() of
  13. {win32, nt} ->
  14. ok;
  15. _nix ->
  16. case init:get_argument(start_epmd) of
  17. {ok, [["true"]]} ->
  18. ok = start_epmd();
  19. _ ->
  20. ok
  21. end
  22. end,
  23. case Args of
  24. ["hocon" | Rest] ->
  25. ok = add_libs_dir(),
  26. %% forward the call to hocon_cli
  27. hocon_cli:main(Rest);
  28. ["check_license_key", Key0] ->
  29. ok = add_libs_dir(),
  30. Key = cleanup_key(Key0),
  31. check_license(#{key => Key});
  32. _ ->
  33. do(Args)
  34. end.
  35. %% the key is a string (list) representation of a binary, so we need
  36. %% to remove the leading and trailing angle brackets.
  37. cleanup_key(Str0) ->
  38. Str1 = iolist_to_binary(string:replace(Str0, "<<", "", leading)),
  39. iolist_to_binary(string:replace(Str1, ">>", "", trailing)).
  40. do(Args) ->
  41. ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2),
  42. ok = do_with_halt(Args, "chkconfig", fun("-config", X) -> chkconfig(X) end),
  43. ok = do_with_halt(Args, "chkconfig", fun chkconfig/1),
  44. Args1 = do_with_ret(
  45. Args,
  46. "-name",
  47. fun(TargetName) ->
  48. ThisNode = this_node_name(longnames, TargetName),
  49. {ok, _} = net_kernel:start([ThisNode, longnames]),
  50. put(target_node, nodename(TargetName))
  51. end
  52. ),
  53. Args2 = do_with_ret(
  54. Args1,
  55. "-sname",
  56. fun(TargetName) ->
  57. ThisNode = this_node_name(shortnames, TargetName),
  58. {ok, _} = net_kernel:start([ThisNode, shortnames]),
  59. put(target_node, nodename(TargetName))
  60. end
  61. ),
  62. RestArgs = do_with_ret(
  63. Args2,
  64. "-setcookie",
  65. fun(Cookie) ->
  66. erlang:set_cookie(node(), list_to_atom(Cookie))
  67. end
  68. ),
  69. [application:start(App) || App <- [crypto, public_key, ssl]],
  70. TargetNode = get(target_node),
  71. %% See if the node is currently running -- if it's not, we'll bail
  72. case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
  73. {true, pong} ->
  74. ok;
  75. {false, pong} ->
  76. io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
  77. halt(1);
  78. {_, pang} ->
  79. io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),
  80. halt(1)
  81. end,
  82. %% Mute logger from now on.
  83. %% Otherwise Erlang distribution over TLS (inet_tls_dist) warning logs
  84. %% and supervisor reports may contaminate io:format outputs
  85. logger:set_primary_config(level, none),
  86. case RestArgs of
  87. ["getpid"] ->
  88. io:format("~p\n", [list_to_integer(rpc:call(TargetNode, os, getpid, []))]);
  89. ["ping"] ->
  90. %% If we got this far, the node already responded to a ping, so just dump
  91. %% a "pong"
  92. io:format("pong\n");
  93. ["stop"] ->
  94. Pid = start_shutdown_status(),
  95. Res = rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], ?SHUTDOWN_TIMEOUT_MS),
  96. true = stop_shutdown_status(Pid),
  97. case Res of
  98. ok ->
  99. ok;
  100. {badrpc, timeout} ->
  101. io:format(
  102. "EMQX is still shutting down, it failed to stop gracefully "
  103. "within the configured timeout of: ~ps\n",
  104. [erlang:convert_time_unit(?SHUTDOWN_TIMEOUT_MS, millisecond, second)]
  105. ),
  106. halt(1);
  107. {badrpc, nodedown} ->
  108. %% nodetool commands are always executed after a ping
  109. %% which if the code gets here, it's because the target node
  110. %% has shutdown before RPC returns.
  111. ok
  112. end;
  113. ["rpc", Module, Function | RpcArgs] ->
  114. case
  115. rpc:call(
  116. TargetNode,
  117. list_to_atom(Module),
  118. list_to_atom(Function),
  119. [RpcArgs],
  120. 60000
  121. )
  122. of
  123. ok ->
  124. ok;
  125. {error, cmd_not_found} ->
  126. halt(1);
  127. {error, Reason} ->
  128. io:format("RPC to ~s error: ~p\n", [TargetNode, Reason]),
  129. halt(1);
  130. {badrpc, Reason} ->
  131. io:format("RPC to ~s failed: ~p\n", [TargetNode, Reason]),
  132. halt(1);
  133. _ ->
  134. halt(1)
  135. end;
  136. ["rpc_infinity", Module, Function | RpcArgs] ->
  137. case
  138. rpc:call(
  139. TargetNode, list_to_atom(Module), list_to_atom(Function), [RpcArgs], infinity
  140. )
  141. of
  142. ok ->
  143. ok;
  144. {badrpc, Reason} ->
  145. io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
  146. halt(1);
  147. _ ->
  148. halt(1)
  149. end;
  150. ["rpcterms", Module, Function | ArgsAsString] ->
  151. case
  152. rpc:call(
  153. TargetNode,
  154. list_to_atom(Module),
  155. list_to_atom(Function),
  156. consult(lists:flatten(ArgsAsString)),
  157. 60000
  158. )
  159. of
  160. {badrpc, Reason} ->
  161. io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
  162. halt(1);
  163. Other ->
  164. io:format("~p\n", [Other])
  165. end;
  166. ["eval" | ListOfArgs] ->
  167. % parse args locally in the remsh node
  168. Parsed = parse_eval_args(ListOfArgs),
  169. % and evaluate it on the remote node
  170. case rpc:call(TargetNode, emqx_ctl, run_command, [eval_erl, Parsed], infinity) of
  171. {ok, Value} ->
  172. io:format("~p~n", [Value]);
  173. {badrpc, Reason} ->
  174. io:format("RPC to ~p failed: ~p~n", [TargetNode, Reason]),
  175. halt(1)
  176. end;
  177. Other ->
  178. io:format("Other: ~p~n", [Other]),
  179. io:format(
  180. "Usage: nodetool chkconfig|getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval|cold_eval [Terms] [RPC]\n"
  181. )
  182. end,
  183. net_kernel:stop().
  184. start_shutdown_status() ->
  185. spawn_link(fun shutdown_status_loop/0).
  186. stop_shutdown_status(Pid) ->
  187. true = unlink(Pid),
  188. true = exit(Pid, stop).
  189. shutdown_status_loop() ->
  190. timer:sleep(10_000),
  191. io:format("EMQX is shutting down, please wait...\n", []),
  192. shutdown_status_loop().
  193. parse_eval_args(Args) ->
  194. % shells may process args into more than one, and end up stripping
  195. % spaces, so this converts all of that to a single string to parse
  196. String = binary_to_list(
  197. list_to_binary(
  198. join(Args, " ")
  199. )
  200. ),
  201. % then just as a convenience to users, if they forgot a trailing
  202. % '.' add it for them.
  203. Normalized =
  204. case lists:reverse(String) of
  205. [$. | _] -> String;
  206. R -> lists:reverse([$. | R])
  207. end,
  208. % then scan and parse the string
  209. {ok, Scanned, _} = erl_scan:string(Normalized),
  210. {ok, Parsed} = erl_parse:parse_exprs(Scanned),
  211. Parsed.
  212. do_with_ret(Args, Name, Handler) ->
  213. {arity, Arity} = erlang:fun_info(Handler, arity),
  214. case take_args(Args, Name, Arity) of
  215. false ->
  216. Args;
  217. {Args1, Rest} ->
  218. _ = erlang:apply(Handler, Args1),
  219. Rest
  220. end.
  221. do_with_halt(Args, Name, Handler) ->
  222. {arity, Arity} = erlang:fun_info(Handler, arity),
  223. case take_args(Args, Name, Arity) of
  224. false ->
  225. ok;
  226. {Args1, _Rest} ->
  227. %% should halt
  228. erlang:apply(Handler, Args1),
  229. io:format(standard_error, "~s handler did not halt", [Name]),
  230. halt(?LINE)
  231. end.
  232. %% Return option args list if found, otherwise 'false'.
  233. take_args(Args, OptName, 0) ->
  234. lists:member(OptName, Args) andalso [];
  235. take_args(Args, OptName, OptArity) ->
  236. take_args(Args, OptName, OptArity, _Scanned = []).
  237. %% no such option
  238. take_args([], _, _, _) ->
  239. false;
  240. take_args([Name | Rest], Name, Arity, Scanned) ->
  241. length(Rest) >= Arity orelse error({not_enough_args_for, Name}),
  242. {Result, Tail} = lists:split(Arity, Rest),
  243. {Result, lists:reverse(Scanned) ++ Tail};
  244. take_args([Other | Rest], Name, Arity, Scanned) ->
  245. take_args(Rest, Name, Arity, [Other | Scanned]).
  246. start_epmd() ->
  247. [] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"),
  248. ok.
  249. epmd_path() ->
  250. ErtsBinDir = filename:dirname(escript:script_name()),
  251. Name = "epmd",
  252. case os:find_executable(Name, ErtsBinDir) of
  253. false ->
  254. case os:find_executable(Name) of
  255. false ->
  256. io:format("Could not find epmd.~n"),
  257. halt(1);
  258. GlobalEpmd ->
  259. GlobalEpmd
  260. end;
  261. Epmd ->
  262. Epmd
  263. end.
  264. nodename(Name) ->
  265. case re:split(Name, "@", [{return, list}, unicode]) of
  266. [_Node, _Host] ->
  267. list_to_atom(Name);
  268. [Node] ->
  269. [_, Host] = re:split(atom_to_list(node()), "@", [{return, list}, unicode]),
  270. list_to_atom(lists:concat([Node, "@", Host]))
  271. end.
  272. this_node_name(longnames, Name) ->
  273. [Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
  274. list_to_atom(lists:concat(["remsh_maint_", Node, node_name_suffix_id(), "@", Host]));
  275. this_node_name(shortnames, Name) ->
  276. list_to_atom(lists:concat(["remsh_maint_", Name, node_name_suffix_id()])).
  277. %% use the reversed value that from pid mod 1000 as the node name suffix
  278. node_name_suffix_id() ->
  279. Pid = os:getpid(),
  280. string:slice(string:reverse(Pid), 0, 3).
  281. %% For windows???
  282. create_mnesia_dir(DataDir, NodeName) ->
  283. MnesiaDir = filename:join(DataDir, NodeName),
  284. file:make_dir(MnesiaDir),
  285. io:format("~s", [MnesiaDir]),
  286. halt(0).
  287. chkconfig(File) ->
  288. case file:consult(File) of
  289. {ok, Terms} ->
  290. case validate(Terms) of
  291. ok ->
  292. halt(0);
  293. {error, Problems} ->
  294. lists:foreach(fun print_issue/1, Problems),
  295. %% halt(1) if any problems were errors
  296. halt(
  297. case [x || {error, _} <- Problems] of
  298. [] -> 0;
  299. _ -> 1
  300. end
  301. )
  302. end;
  303. {error, {Line, Mod, Term}} ->
  304. io:format(
  305. standard_error, ["Error on line ", file:format_error({Line, Mod, Term}), "\n"], []
  306. ),
  307. halt(1);
  308. {error, Error} ->
  309. io:format(
  310. standard_error,
  311. ["Error reading config file: ", File, " ", file:format_error(Error), "\n"],
  312. []
  313. ),
  314. halt(1)
  315. end.
  316. check_license(Config) ->
  317. ok = ensure_application_load(emqx_license),
  318. %% This checks formal license validity to ensure
  319. %% that the node can successfully start with the given license.
  320. %% However, a valid license may be expired. In this case, the node will
  321. %% start but will not be able to receive connections due to connection limits.
  322. %% It may receive license updates from the cluster further.
  323. case emqx_license:read_license(Config) of
  324. {ok, _} ->
  325. ok;
  326. {error, Error} ->
  327. io:format(standard_error, "Error reading license: ~p~n", [Error]),
  328. halt(1)
  329. end.
  330. %%
  331. %% Given a string or binary, parse it into a list of terms, ala file:consult/0
  332. %%
  333. consult(Str) when is_list(Str) ->
  334. consult([], Str, []);
  335. consult(Bin) when is_binary(Bin) ->
  336. consult([], binary_to_list(Bin), []).
  337. consult(Cont, Str, Acc) ->
  338. case erl_scan:tokens(Cont, Str, 0) of
  339. {done, Result, Remaining} ->
  340. case Result of
  341. {ok, Tokens, _} ->
  342. {ok, Term} = erl_parse:parse_term(Tokens),
  343. consult([], Remaining, [Term | Acc]);
  344. {eof, _Other} ->
  345. lists:reverse(Acc);
  346. {error, Info, _} ->
  347. {error, Info}
  348. end;
  349. {more, Cont1} ->
  350. consult(Cont1, eof, Acc)
  351. end.
  352. %%
  353. %% Validation functions for checking the app.config
  354. %%
  355. validate([Terms]) ->
  356. Results = [ValidateFun(Terms) || ValidateFun <- get_validation_funs()],
  357. Failures = [Res || Res <- Results, Res /= true],
  358. case Failures of
  359. [] ->
  360. ok;
  361. _ ->
  362. {error, Failures}
  363. end.
  364. %% Some initial and basic checks for the app.config file
  365. get_validation_funs() ->
  366. [].
  367. print_issue({warning, Warning}) ->
  368. io:format(standard_error, "Warning in app.config: ~s~n", [Warning]);
  369. print_issue({error, Error}) ->
  370. io:format(standard_error, "Error in app.config: ~s~n", [Error]).
  371. %% string:join/2 copy; string:join/2 is getting obsoleted
  372. %% and replaced by lists:join/2, but lists:join/2 is too new
  373. %% for version support (only appeared in 19.0) so it cannot be
  374. %% used. Instead we just adopt join/2 locally and hope it works
  375. %% for most unicode use cases anyway.
  376. join([], Sep) when is_list(Sep) ->
  377. [];
  378. join([H | T], Sep) ->
  379. H ++ lists:append([Sep ++ X || X <- T]).
  380. add_libs_dir() ->
  381. [_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"),
  382. CurrentVsn = os:getenv("REL_VSN"),
  383. RelFile = filename:join([RootDir, "releases", "RELEASES"]),
  384. case file:consult(RelFile) of
  385. {ok, [Releases]} ->
  386. Release = lists:keyfind(CurrentVsn, 3, Releases),
  387. {release, _Name, _AppVsn, _ErtsVsn, Libs, _State} = Release,
  388. lists:foreach(
  389. fun({Name, Vsn, _}) ->
  390. add_lib_dir(RootDir, Name, Vsn)
  391. end,
  392. Libs
  393. );
  394. {error, Reason} ->
  395. %% rel file was been deleted by release handler
  396. error({failed_to_read_RELEASES_file, RelFile, Reason})
  397. end,
  398. ok = add_patches_dir(filename:join([RootDir, "data", "patches"])),
  399. ok = add_patches_dir("/var/lib/emqx/patches").
  400. add_patches_dir(PatchesDir) ->
  401. case filelib:is_dir(PatchesDir) of
  402. true ->
  403. true = code:add_patha(PatchesDir),
  404. ok;
  405. false ->
  406. ok
  407. end.
  408. add_lib_dir(RootDir, Name, Vsn) ->
  409. LibDir = filename:join([RootDir, lib, atom_to_list(Name) ++ "-" ++ Vsn, ebin]),
  410. case code:add_patha(LibDir) of
  411. true ->
  412. %% load all applications into application controller, before performing
  413. %% the configuration check of HOCON
  414. %%
  415. %% It helps to implement the feature of dynamically searching schema.
  416. %% See `emqx_gateway_schema:fields(gateway)`
  417. is_emqx_application(Name) andalso ensure_application_load(Name),
  418. ok;
  419. {error, _} ->
  420. error(LibDir)
  421. end.
  422. is_emqx_application(Name) when is_atom(Name) ->
  423. is_emqx_application(atom_to_list(Name));
  424. is_emqx_application("emqx_" ++ _Rest) ->
  425. true;
  426. is_emqx_application(_) ->
  427. false.
  428. ensure_application_load(Name) ->
  429. case application:load(Name) of
  430. ok -> ok;
  431. {error, {already_loaded, _}} -> ok;
  432. {error, Reason} -> error({failed_to_load_application, Name, Reason})
  433. end.