| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_machine).
- -export([
- start/0,
- graceful_shutdown/0,
- brutal_shutdown/0,
- is_ready/0,
- node_status/0,
- update_vips/0
- ]).
- -export([open_ports_check/0]).
- -export([mria_lb_custom_info/0, mria_lb_custom_info_check/1]).
- -ifdef(TEST).
- -export([create_plan/0]).
- -endif.
- -include_lib("kernel/include/inet.hrl").
- -include_lib("emqx/include/logger.hrl").
%% @doc EMQX boot entrypoint.
%%
%% Performs the boot steps strictly in this order: calls
%% `emqx_mgmt_cli:load/0', adjusts OS signal handling, applies the
%% configured backtrace depth, prepares system_monitor, configures the
%% shard transports and the extra mnesia diagnostic checks, calls
%% `emqx_otel_app:configure_otel_deps/0', registers the mria callbacks
%% and finally starts ekka. Returns `ok'.
start() ->
    emqx_mgmt_cli:load(),
    setup_os_signals(os:type()),
    ok = set_backtrace_depth(),
    ok = start_sysmon(),
    configure_shard_transports(),
    set_mnesia_extra_diagnostic_checks(),
    emqx_otel_app:configure_otel_deps(),
    %% The mria callbacks registered below help to check that a replicant
    %% is compatible with the core node. Currently they rely on an exact
    %% match of the version of the EMQX OTP application, hence both
    %% applications are loaded before the callbacks are registered.
    _ = application:load(mria),
    _ = application:load(emqx),
    MriaCallbacks = [
        {lb_custom_info, fun ?MODULE:mria_lb_custom_info/0},
        {lb_custom_info_check, fun ?MODULE:mria_lb_custom_info_check/1}
    ],
    lists:foreach(
        fun({CallbackName, Callback}) ->
            mria_config:register_callback(CallbackName, Callback)
        end,
        MriaCallbacks
    ),
    ekka:start(),
    ok.

%% Signal handling is left untouched on Windows. Everywhere else SIGHUP
%% is ignored and SIGTERM is set to `handle' (which is the default).
setup_os_signals({win32, nt}) ->
    ok;
setup_os_signals(_Nix) ->
    os:set_signal(sighup, ignore),
    os:set_signal(sigterm, handle).
%% @doc Graceful shutdown entry point: delegates to
%% `emqx_machine_terminator:graceful_wait/0' and returns its result.
graceful_shutdown() ->
    emqx_machine_terminator:graceful_wait().
%% @doc Stop the runtime system through `init:stop/0'.
%% Only used when the node failed to boot.
brutal_shutdown() ->
    init:stop().
%% Apply the `backtrace_depth' setting of the `emqx_machine' application
%% to the runtime system. The setting must be present: a missing value
%% fails the match below. Returns `ok'.
set_backtrace_depth() ->
    {ok, MaxFrames} = application:get_env(emqx_machine, backtrace_depth),
    %% system_flag/2 hands back the previous value, which is of no use here
    _Previous = erlang:system_flag(backtrace_depth, MaxFrames),
    ok.
%% @doc Return true if boot is complete.
%% The answer is whatever `emqx_machine_terminator:is_running/0' reports.
is_ready() ->
    emqx_machine_terminator:is_running().
%% Prepare the system_monitor application and start it when an event
%% sink is configured.
%%
%% The node status function and the status checks are always written to
%% the system_monitor environment. The application itself is started
%% only when `db_hostname' is set to a non-empty list. Returns `ok'.
start_sysmon() ->
    _ = application:load(system_monitor),
    lists:foreach(
        fun({Key, Value}) ->
            application:set_env(system_monitor, Key, Value)
        end,
        [
            {node_status_fun, {?MODULE, node_status}},
            {status_checks, [{?MODULE, update_vips, false, 10}]}
        ]
    ),
    start_sysmon_with_sink(application:get_env(system_monitor, db_hostname)).

%% A non-empty `db_hostname' means there is a sink for the events:
%% select the `system_monitor_pg' callback module and start the
%% application as `temporary'.
start_sysmon_with_sink({ok, [_ | _]}) ->
    application:set_env(system_monitor, callback_mod, system_monitor_pg),
    _ = application:ensure_all_started(system_monitor, temporary),
    ok;
start_sysmon_with_sink(_NoSink) ->
    %% If there is no sink for the events, there is no reason
    %% to run system_monitor_top, ignore start
    ok.
%% @doc Node status callback registered with system_monitor.
%% Encodes the mria_rlog backend and role of this node with
%% `emqx_utils_json:encode/1'.
node_status() ->
    Backend = mria_rlog:backend(),
    Role = mria_rlog:role(),
    emqx_utils_json:encode(#{backend => Backend, role => Role}).
%% @doc Status check callback registered with system_monitor.
%% Passes the result of `mria_status:shards_up/0' to
%% `system_monitor:add_vip/1'.
update_vips() ->
    ShardsUp = mria_status:shards_up(),
    system_monitor:add_vip(ShardsUp).
%% Push the per-shard transport overrides found in the
%% `custom_shard_transports' setting of `emqx_machine' (a map, `#{}'
%% when unset) to mria_config. Shard names arrive as binaries and must
%% name atoms that already exist. Returns `ok'.
configure_shard_transports() ->
    Overrides = application:get_env(emqx_machine, custom_shard_transports, #{}),
    _ = [
        mria_config:set_shard_transport(binary_to_existing_atom(ShardBin), Transport)
     || {ShardBin, Transport} <- maps:to_list(Overrides)
    ],
    ok.
%% Register `open_ports_check/0' with mria as an extra mnesia diagnostic
%% check named `check_open_ports'. Returns `ok'.
set_mnesia_extra_diagnostic_checks() ->
    %% NOTE(review): the tuple looks like {Name, ExpectedResult, CheckFun};
    %% confirm against mria_config.
    mria_config:set_extra_mnesia_diagnostic_checks([
        {check_open_ports, ok, fun ?MODULE:open_ports_check/0}
    ]),
    ok.
%% Connect timeout, in milliseconds, applied to every single port probe.
-define(PORT_PROBE_TIMEOUT, 10_000).

%% @doc Diagnostic check: probe the ports of the other nodes.
%%
%% Builds the probe plan, runs `do_check/1' over it through
%% `emqx_utils:pmap/3' and returns the verdict of `verify_results/1'.
%% If the probing itself raises, a map describing the exception is
%% returned instead.
open_ports_check() ->
    %% The plan is built outside of the `try': its failures are not caught.
    ProbePlan = create_plan(),
    %% 2 ports to check: ekka/epmd and gen_rpc
    OverallTimeout = 5_000 + 2 * ?PORT_PROBE_TIMEOUT,
    try emqx_utils:pmap(fun do_check/1, ProbePlan, OverallTimeout) of
        ProbeResults ->
            verify_results(ProbeResults)
    catch
        Class:Error:Trace ->
            #{
                msg => "error probing ports",
                exception => Class,
                reason => Error,
                stacktrace => Trace
            }
    end.
%% Condense the per-node probe results into a single verdict.
%%
%% Takes a list of `{Node, ResultMap}' pairs. Returns `ok' when no entry
%% carries a `status' other than `ok'; otherwise returns a report map
%% holding all results keyed by node.
verify_results(Results) ->
    IsHealthy = fun
        ({_Node, #{status := Status}}) -> Status =:= ok;
        %% entries without a status are not counted as errors
        (_Other) -> true
    end,
    case lists:all(IsHealthy, Results) of
        true ->
            %% all ok
            ok;
        false ->
            #{results => maps:from_list(Results), msg => "some ports are unreachable"}
    end.
%% Build the probe plan: one `{Node, PlanMap}' entry for every node of
%% the mnesia schema other than the local one.
create_plan() ->
    %% expected core nodes according to mnesia schema
    SchemaNodes = mnesia:system_info(db_nodes),
    [plan_for_node(Peer) || Peer <- lists:delete(node(), SchemaNodes)].

%% Collect what is needed to probe a single peer: its resolved IPs, its
%% gen_rpc port and, when it can be found, its ekka/epmd port.
plan_for_node(Peer) ->
    ResolvedIPs = node_to_ips(Peer),
    {_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(Peer),
    %% 0 or 1 result
    EpmdPorts = get_ekka_epmd_port(ResolvedIPs),
    PeerPlan = #{
        resolved_ips => ResolvedIPs,
        ports_to_check => [GenRPCPort | EpmdPorts]
    },
    {Peer, PeerPlan}.
%% Ask the EPMD at the peer's address which port its emqx node uses.
%% Returns `[Port]', or `[]' when there is no address to ask, the query
%% fails, or no emqx name is registered.
get_ekka_epmd_port([]) ->
    %% failed to get?
    [];
get_ekka_epmd_port([FirstIP | _Others]) ->
    %% we're currently only checking the first IP, if there are many
    epmd_port_from_names(erl_epmd:names(FirstIP)).

%% Turn the outcome of erl_epmd:names/1 into a list of 0 or 1 port.
epmd_port_from_names({ok, NamePorts}) ->
    choose_emqx_epmd_port(NamePorts);
epmd_port_from_names(_Failure) ->
    [].
%% Filter out remsh and take the first emqx port as the epmd/ekka port.
%%
%% Takes a list of `{Name, Port}' pairs and returns `[Port]' for the
%% first name that starts with "emqx", or `[]' when there is none.
choose_emqx_epmd_port(NamePorts) ->
    IsEmqxName = fun
        ({"emqx" ++ _, _Port}) -> true;
        ({_Name, _Port}) -> false
    end,
    case lists:search(IsEmqxName, NamePorts) of
        {value, {_EmqxName, Port}} ->
            [Port];
        false ->
            []
    end.
%% Carry out the plan for one node and record the outcome under `status':
%%   `failed_to_resolve_ip' - the plan holds no address for the node;
%%   `ok'                   - every planned port accepted a connection;
%%   `bad_ports'            - at least one did not; `open_ports' then maps
%%                            each probed port to `true' or `false'.
do_check({Node, #{resolved_ips := []} = Plan}) ->
    {Node, Plan#{status => failed_to_resolve_ip}};
do_check({Node, #{resolved_ips := [FirstIP | _]} = Plan}) ->
    %% check other IPs too?
    Ports = maps:get(ports_to_check, Plan),
    Probed = [{Port, is_tcp_port_open(FirstIP, Port)} || Port <- Ports],
    case lists:keymember(false, 2, Probed) of
        false ->
            {Node, Plan#{status => ok}};
        true ->
            {Node, Plan#{status => bad_ports, open_ports => maps:from_list(Probed)}}
    end.
%% Resolve the host part of a node name (everything after the `@') to a
%% list of IP addresses, using the address family of the distribution
%% protocol. Returns `[]' when the lookup does not succeed.
node_to_ips(Node) ->
    HostOrIP = re:replace(atom_to_binary(Node), <<"^.+@">>, <<"">>, [{return, list}]),
    Family = resolve_dist_address_type(),
    addresses_of(inet:gethostbyname(HostOrIP, Family)).

%% Pull the address list out of a successful lookup.
addresses_of({ok, #hostent{h_addr_list = Addresses}}) ->
    Addresses;
addresses_of(_Unresolved) ->
    [].
%% Return `true' when a TCP connection to IP:Port can be established
%% within ?PORT_PROBE_TIMEOUT, `false' on any other outcome.
is_tcp_port_open(IP, Port) ->
    close_and_report(gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT)).

%% The probe only needs to know that the connect succeeded, so the
%% socket is closed right away.
close_and_report({ok, Socket}) ->
    gen_tcp:close(Socket),
    true;
close_and_report(_NotConnected) ->
    false.
%% Derive the address family (`inet' or `inet6') from the
%% EKKA_PROTO_DIST_MOD environment variable. The variable defaults to
%% "inet_tcp"; unrecognised values map to `inet'.
resolve_dist_address_type() ->
    FamilyByProtoDist = #{
        "inet_tcp" => inet,
        "inet6_tcp" => inet6,
        "inet_tls" => inet,
        "inet6_tls" => inet6
    },
    ProtoDist = os:getenv("EKKA_PROTO_DIST_MOD", "inet_tcp"),
    maps:get(ProtoDist, FamilyByProtoDist, inet).
%% @doc mria `lb_custom_info' callback: returns the version of the
%% `emqx' application, or `undefined' when it cannot be read.
%% Note: this function is stored in the Mria's application environment
mria_lb_custom_info() ->
    get_emqx_vsn().
%% @doc mria `lb_custom_info_check' callback: returns `true' only when
%% the other node reported a version and it is exactly equal to the
%% local `emqx' version. An `undefined' version never matches.
%% Note: this function is stored in the Mria's application environment
mria_lb_custom_info_check(OtherVsn) ->
    OtherVsn =/= undefined andalso OtherVsn =:= get_emqx_vsn().
%% Version of the `emqx' application as reported by
%% application:get_key/2, or `undefined' when the key is unavailable.
get_emqx_vsn() ->
    unwrap_vsn(application:get_key(emqx, vsn)).

%% Strip the `ok' tag from a successful lookup.
unwrap_vsn({ok, Vsn}) ->
    Vsn;
unwrap_vsn(undefined) ->
    undefined.
|