emqx_machine.erl 7.6 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_machine).
  17. -export([
  18. start/0,
  19. graceful_shutdown/0,
  20. brutal_shutdown/0,
  21. is_ready/0,
  22. node_status/0,
  23. update_vips/0
  24. ]).
  25. -export([open_ports_check/0]).
  26. -export([mria_lb_custom_info/0, mria_lb_custom_info_check/1]).
  27. -ifdef(TEST).
  28. -export([create_plan/0]).
  29. -endif.
  30. -include_lib("kernel/include/inet.hrl").
  31. -include_lib("emqx/include/logger.hrl").
  %% @doc EMQX boot entrypoint.
  %%
  %% Runs the boot steps strictly in sequence: `emqx_mgmt_cli:load/0', OS
  %% signal setup, backtrace depth, system_monitor, shard transports, the
  %% extra mnesia diagnostic checks and `emqx_otel_app:configure_otel_deps/0';
  %% then registers the mria callbacks and starts ekka. Returns `ok'.
  start() ->
      emqx_mgmt_cli:load(),
      configure_os_signals(os:type()),
      ok = set_backtrace_depth(),
      ok = start_sysmon(),
      configure_shard_transports(),
      set_mnesia_extra_diagnostic_checks(),
      emqx_otel_app:configure_otel_deps(),
      register_mria_callbacks(),
      ekka:start(),
      ok.

  %% Signal dispositions are left untouched on Windows NT; on every other
  %% OS type SIGHUP is ignored and SIGTERM is set to `handle'.
  configure_os_signals({win32, nt}) ->
      ok;
  configure_os_signals(_Nix) ->
      os:set_signal(sighup, ignore),
      %% `handle' is already the default for sigterm
      os:set_signal(sigterm, handle).

  %% Register the mria callbacks that help to check compatibility of a
  %% replicant with the core node. Currently they rely on an exact match
  %% of the version of the EMQX OTP application, which is why both
  %% applications are loaded first (load failures are ignored).
  register_mria_callbacks() ->
      _ = application:load(mria),
      _ = application:load(emqx),
      mria_config:register_callback(lb_custom_info, fun ?MODULE:mria_lb_custom_info/0),
      mria_config:register_callback(lb_custom_info_check, fun ?MODULE:mria_lb_custom_info_check/1).
  %% @doc Graceful shutdown entrypoint: delegates to
  %% `emqx_machine_terminator:graceful_wait/0' and returns its result.
  graceful_shutdown() ->
      emqx_machine_terminator:graceful_wait().
  %% @doc Stop the runtime via `init:stop/0'.
  %% Only used when the node failed to boot.
  brutal_shutdown() ->
      init:stop().
  %% Apply the `backtrace_depth' setting of the `emqx_machine' application
  %% as the emulator's backtrace depth system flag.
  %% Crashes with `badmatch' when the setting is absent. Returns `ok'.
  set_backtrace_depth() ->
      {ok, BacktraceDepth} = application:get_env(emqx_machine, backtrace_depth),
      _PreviousDepth = erlang:system_flag(backtrace_depth, BacktraceDepth),
      ok.
  %% @doc Return true if boot is complete.
  %% The answer is whatever `emqx_machine_terminator:is_running/0' reports.
  is_ready() ->
      emqx_machine_terminator:is_running().
  %% Configure the `system_monitor' application and start it when it has
  %% somewhere to send its events.
  %%
  %% The node status callback and the status checks are always written to
  %% the application environment; the application itself is only started
  %% when `db_hostname' is set to a non-empty list. Returns `ok'.
  start_sysmon() ->
      _ = application:load(system_monitor),
      SysmonSettings = [
          {node_status_fun, {?MODULE, node_status}},
          {status_checks, [{?MODULE, update_vips, false, 10}]}
      ],
      lists:foreach(
          fun({Key, Value}) -> application:set_env(system_monitor, Key, Value) end,
          SysmonSettings
      ),
      start_sysmon_if_sink_configured(application:get_env(system_monitor, db_hostname)).

  %% A non-empty `db_hostname' means there is a sink for the events.
  start_sysmon_if_sink_configured({ok, [_ | _]}) ->
      application:set_env(system_monitor, callback_mod, system_monitor_pg),
      _ = application:ensure_all_started(system_monitor, temporary),
      ok;
  start_sysmon_if_sink_configured(_NoSink) ->
      %% Without a sink for the events there is no reason to run
      %% system_monitor_top, so the start is skipped.
      ok.
  %% @doc Node status callback registered with system_monitor.
  %% Encodes the mria rlog backend and role of this node with
  %% `emqx_utils_json:encode/1'.
  node_status() ->
      Backend = mria_rlog:backend(),
      Role = mria_rlog:role(),
      emqx_utils_json:encode(#{backend => Backend, role => Role}).
  %% @doc Status check registered with system_monitor: passes the result of
  %% `mria_status:shards_up/0' to `system_monitor:add_vip/1'.
  update_vips() ->
      ShardsUp = mria_status:shards_up(),
      system_monitor:add_vip(ShardsUp).
  %% Push every entry of the `custom_shard_transports' setting of the
  %% `emqx_machine' application (a map, `#{}' when unset) into the mria
  %% config. Returns `ok'.
  configure_shard_transports() ->
      CustomTransports = application:get_env(emqx_machine, custom_shard_transports, #{}),
      lists:foreach(fun apply_shard_transport/1, maps:to_list(CustomTransports)).

  %% The shard name arrives as a binary and must name an already existing
  %% atom; otherwise the conversion raises `badarg'.
  apply_shard_transport({ShardBin, Transport}) ->
      mria_config:set_shard_transport(binary_to_existing_atom(ShardBin), Transport).
  %% Register `open_ports_check/0' with mria as an extra mnesia diagnostic
  %% check named `check_open_ports', with `ok' as the middle tuple element
  %% (presumably the expected result -- confirm against mria). Returns `ok'.
  set_mnesia_extra_diagnostic_checks() ->
      OpenPortsCheck = {check_open_ports, ok, fun ?MODULE:open_ports_check/0},
      mria_config:set_extra_mnesia_diagnostic_checks([OpenPortsCheck]),
      ok.
  -define(PORT_PROBE_TIMEOUT, 10_000).

  %% @doc Probe the ports of the other nodes listed in the mnesia schema.
  %%
  %% Returns `ok' when every probe succeeds, otherwise a map describing
  %% what went wrong. Exceptions raised while running the probes are turned
  %% into such a map; exceptions raised while building the plan or while
  %% verifying the results are not caught.
  open_ports_check() ->
      Plan = create_plan(),
      %% 2 ports to check per node (ekka/epmd and gen_rpc), plus some slack
      PmapTimeout = 2 * ?PORT_PROBE_TIMEOUT + 5_000,
      try emqx_utils:pmap(fun do_check/1, Plan, PmapTimeout) of
          ProbeResults ->
              verify_results(ProbeResults)
      catch
          Class:Error:Stack ->
              port_probe_error(Class, Error, Stack)
      end.

  %% Build the report returned when the probing itself raised.
  port_probe_error(Class, Error, Stack) ->
      #{
          msg => "error probing ports",
          exception => Class,
          reason => Error,
          stacktrace => Stack
      }.
  %% Summarise the per-node probe results.
  %%
  %% Returns `ok' when no `{Node, #{status := Status}}' entry has a status
  %% other than `ok'; otherwise returns a map holding all results (keyed by
  %% node) together with an explanatory message. Entries that do not have
  %% that shape are not counted as failures.
  verify_results(Results0) ->
      case lists:any(fun is_failed_result/1, Results0) of
          false ->
              %% all ok
              ok;
          true ->
              #{results => maps:from_list(Results0), msg => "some ports are unreachable"}
      end.

  %% True only for a well-formed result whose status is not `ok'.
  is_failed_result({_Node, #{status := Status}}) ->
      Status =/= ok;
  is_failed_result(_Other) ->
      false.
  %% Build the list of `{Node, PlanMap}' probe instructions, one per node
  %% found in the mnesia schema (`db_nodes') other than the local node.
  create_plan() ->
      %% expected core nodes according to mnesia schema
      PeerNodes = mnesia:system_info(db_nodes) -- [node()],
      [plan_entry(Peer) || Peer <- PeerNodes].

  %% Gather the resolved IPs and the ports to probe for a single node.
  plan_entry(Node) ->
      ResolvedIPs = node_to_ips(Node),
      {_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(Node),
      %% a list holding 0 or 1 port
      EkkaEPMDPorts = get_ekka_epmd_port(ResolvedIPs),
      PlanMap = #{
          resolved_ips => ResolvedIPs,
          ports_to_check => [GenRPCPort | EkkaEPMDPorts]
      },
      {Node, PlanMap}.
  %% Ask the EPMD at the first resolved IP for its registered names and pick
  %% the emqx port out of them. Returns a list with 0 or 1 port.
  get_ekka_epmd_port([]) ->
      %% nothing was resolved, so there is nowhere to ask
      [];
  get_ekka_epmd_port([FirstIP | _OtherIPs]) ->
      %% only the first IP is queried for now, even if there are many
      case erl_epmd:names(FirstIP) of
          {ok, RegisteredNames} ->
              choose_emqx_epmd_port(RegisteredNames);
          _QueryFailed ->
              []
      end.
  %% Filter out remsh and take the first emqx port as the epmd/ekka port:
  %% returns `[Port]' for the first entry whose name starts with "emqx",
  %% or `[]' when there is none.
  choose_emqx_epmd_port(NamePorts) ->
      IsEmqxName = fun
          ({"emqx" ++ _, _Port}) -> true;
          ({_Name, _Port}) -> false
      end,
      case lists:search(IsEmqxName, NamePorts) of
          {value, {_EmqxName, Port}} ->
              [Port];
          false ->
              []
      end.
  %% Execute one plan entry and return it extended with a `status' key:
  %% `failed_to_resolve_ip' when no IP was resolved, `ok' when every port
  %% accepted a connection, or `bad_ports' together with an `open_ports'
  %% map (port => boolean) otherwise.
  do_check({Node, #{resolved_ips := []} = Plan}) ->
      {Node, Plan#{status => failed_to_resolve_ip}};
  do_check({Node, #{resolved_ips := [FirstIP | _]} = Plan}) ->
      %% check other IPs too?
      Ports = maps:get(ports_to_check, Plan),
      Probes = [{Port, is_tcp_port_open(FirstIP, Port)} || Port <- Ports],
      case lists:keymember(false, 2, Probes) of
          false ->
              {Node, Plan#{status => ok}};
          true ->
              {Node, Plan#{status => bad_ports, open_ports => maps:from_list(Probes)}}
      end.
  %% Resolve the host part of a node name (everything after the last `@')
  %% to a list of IP addresses, using the address family derived from the
  %% distribution protocol. Returns `[]' when the lookup fails.
  node_to_ips(Node) ->
      Host = re:replace(atom_to_binary(Node), <<"^.+@">>, <<"">>, [{return, list}]),
      Family = resolve_dist_address_type(),
      case inet:gethostbyname(Host, Family) of
          {ok, #hostent{h_addr_list = Addresses}} ->
              Addresses;
          _LookupFailed ->
              []
      end.
  %% Try to open a TCP connection to IP:Port, waiting at most
  %% ?PORT_PROBE_TIMEOUT milliseconds. Returns `true' when the connection
  %% succeeds (it is closed again right away) and `false' otherwise.
  is_tcp_port_open(IP, Port) ->
      close_if_connected(gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT)).

  %% Translate the connect result into a boolean, releasing the socket.
  close_if_connected({ok, Socket}) ->
      gen_tcp:close(Socket),
      true;
  close_if_connected(_ConnectFailed) ->
      false.
  %% Derive the address family (`inet' or `inet6') from the
  %% EKKA_PROTO_DIST_MOD environment variable, which defaults to
  %% "inet_tcp" when unset.
  -spec resolve_dist_address_type() -> inet | inet6.
  resolve_dist_address_type() ->
      dist_mod_to_address_type(os:getenv("EKKA_PROTO_DIST_MOD", "inet_tcp")).

  %% Only the two inet6 distribution modules select IPv6; "inet_tcp",
  %% "inet_tls" and any unrecognised value fall back to IPv4.
  dist_mod_to_address_type("inet6_tcp") ->
      inet6;
  dist_mod_to_address_type("inet6_tls") ->
      inet6;
  dist_mod_to_address_type(_InetOrUnknown) ->
      inet.
  %% @doc Custom info callback registered with mria as `lb_custom_info':
  %% returns the version of the `emqx' application, or `undefined' when it
  %% cannot be read.
  %% Note: this function is stored in the Mria's application environment
  mria_lb_custom_info() ->
      get_emqx_vsn().
  %% @doc Check callback registered with mria as `lb_custom_info_check':
  %% true only when the other side reported a version and it is exactly
  %% equal to the local `emqx' application version.
  %% Note: this function is stored in the Mria's application environment
  mria_lb_custom_info_check(OtherVsn) ->
      OtherVsn =/= undefined andalso get_emqx_vsn() =:= OtherVsn.
  %% Read the `vsn' key of the `emqx' application; `undefined' when
  %% `application:get_key/2' has nothing to report.
  get_emqx_vsn() ->
      unwrap_vsn(application:get_key(emqx, vsn)).

  %% Strip the `{ok, _}' wrapper and pass `undefined' through unchanged.
  unwrap_vsn({ok, Vsn}) ->
      Vsn;
  unwrap_vsn(undefined) ->
      undefined.