فهرست منبع

feat: add port scan diagnostics to mria waiting for tables checks

Fixes https://emqx.atlassian.net/browse/EMQX-10944

Also updates ekka -> 0.15.15, mria -> 0.6.4

How to test
===========

1. Start 2 or more EMQX nodes and merge them in a cluster.
2. Stop them in order.
3. Start only the first node that was stopped in the previous step.
4. Wait until the log is printed.

Or, more easily:

1. Start 2 or more EMQX nodes and merge them in a cluster.
2. Stop all but one.
3. Run `mria_mnesia:diagnosis([]).` on that node.

Example output
==============

```
   Check check_open_ports should get ok but got #{msg =>
                                                     "some ports are unreachable",
                                                 results =>
                                                     #{'emqx@172.100.239.4' =>
                                                           #{open_ports =>
                                                                 #{4370 => false,
                                                                   5370 =>
                                                                       false},
                                                             ports_to_check =>
                                                                 [4370,5370],
                                                             resolved_ips =>
                                                                 [{172,100,239,
                                                                   4}],
                                                             status =>
                                                                 bad_ports},
                                                       'emqx@172.100.239.5' =>
                                                           #{open_ports =>
                                                                 #{4370 => false,
                                                                   5370 =>
                                                                       false},
                                                             ports_to_check =>
                                                                 [4370,5370],
                                                             resolved_ips =>
                                                                 [{172,100,239,
                                                                   5}],
                                                             status =>
                                                                 bad_ports}}}
```

After one node is back:

```
   Check check_open_ports should get ok but got #{msg =>
                                                     "some ports are unreachable",
                                                 results =>
                                                     #{'emqx@172.100.239.4' =>
                                                           #{ports_to_check =>
                                                                 [4370,5370],
                                                             resolved_ips =>
                                                                 [{172,100,239,
                                                                   4}],
                                                             status => ok},
                                                       'emqx@172.100.239.5' =>
                                                           #{open_ports =>
                                                                 #{4370 => false,
                                                                   5370 =>
                                                                       false},
                                                             ports_to_check =>
                                                                 [4370,5370],
                                                             resolved_ips =>
                                                                 [{172,100,239,
                                                                   5}],
                                                             status =>
                                                                 bad_ports}}}
```
Thales Macedo Garitezi 2 سال پیش
والد
کامیت
d6935b6a67
6فایلهای تغییر یافته به همراه184 افزوده شده و 3 حذف شده
  1. 1 1
      apps/emqx/rebar.config
  2. 116 0
      apps/emqx_machine/src/emqx_machine.erl
  3. 62 0
      apps/emqx_machine/test/emqx_machine_SUITE.erl
  4. 3 0
      changes/ce/feat-11637.en.md
  5. 1 1
      mix.exs
  6. 1 1
      rebar.config

+ 1 - 1
apps/emqx/rebar.config

@@ -28,7 +28,7 @@
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}},
-    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}},
+    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},

+ 116 - 0
apps/emqx_machine/src/emqx_machine.erl

@@ -26,6 +26,13 @@
     update_vips/0
 ]).
 
+-export([open_ports_check/0]).
+
+-ifdef(TEST).
+-export([create_plan/0]).
+-endif.
+
+-include_lib("kernel/include/inet.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 %% @doc EMQX boot entrypoint.
@@ -42,6 +49,7 @@ start() ->
     ok = set_backtrace_depth(),
     start_sysmon(),
     configure_shard_transports(),
+    set_mnesia_extra_diagnostic_checks(),
     ekka:start(),
     ok.
 
@@ -94,3 +102,111 @@ configure_shard_transports() ->
         end,
         maps:to_list(ShardTransports)
     ).
+
+set_mnesia_extra_diagnostic_checks() ->
+    Checks = [{check_open_ports, ok, fun ?MODULE:open_ports_check/0}],
+    mria_config:set_extra_mnesia_diagnostic_checks(Checks),
+    ok.
+
+-define(PORT_PROBE_TIMEOUT, 10_000).
+open_ports_check() ->
+    Plan = create_plan(),
+    %% 2 ports to check: ekka/epmd and gen_rpc
+    Timeout = 2 * ?PORT_PROBE_TIMEOUT + 5_000,
+    try emqx_utils:pmap(fun do_check/1, Plan, Timeout) of
+        Results ->
+            verify_results(Results)
+    catch
+        Kind:Reason:Stacktrace ->
+            #{
+                msg => "error probing ports",
+                exception => Kind,
+                reason => Reason,
+                stacktrace => Stacktrace
+            }
+    end.
+
+verify_results(Results0) ->
+    Errors = [
+        R
+     || R = {_Node, #{status := Status}} <- Results0,
+        Status =/= ok
+    ],
+    case Errors of
+        [] ->
+            %% all ok
+            ok;
+        _ ->
+            Results1 = maps:from_list(Results0),
+            #{results => Results1, msg => "some ports are unreachable"}
+    end.
+
+create_plan() ->
+    %% expected core nodes according to mnesia schema
+    OtherNodes = mnesia:system_info(db_nodes) -- [node()],
+    lists:map(
+        fun(N) ->
+            IPs = node_to_ips(N),
+            {_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(N),
+            %% 0 or 1 result
+            EkkaEPMDPort = get_ekka_epmd_port(IPs),
+            {N, #{
+                resolved_ips => IPs,
+                ports_to_check => [GenRPCPort | EkkaEPMDPort]
+            }}
+        end,
+        OtherNodes
+    ).
+
+get_ekka_epmd_port([IP | _]) ->
+    %% we're currently only checking the first IP, if there are many
+    case erl_epmd:names(IP) of
+        {ok, NamePorts} ->
+            choose_emqx_epmd_port(NamePorts);
+        _ ->
+            []
+    end;
+get_ekka_epmd_port([]) ->
+    %% failed to get?
+    [].
+
+%% filter out remsh and take the first emqx port as epmd/ekka port
+choose_emqx_epmd_port([{"emqx" ++ _, Port} | _]) ->
+    [Port];
+choose_emqx_epmd_port([{_Name, _Port} | Rest]) ->
+    choose_emqx_epmd_port(Rest);
+choose_emqx_epmd_port([]) ->
+    [].
+
+do_check({Node, #{resolved_ips := []} = Plan}) ->
+    {Node, Plan#{status => failed_to_resolve_ip}};
+do_check({Node, #{resolved_ips := [IP | _]} = Plan}) ->
+    %% check other IPs too?
+    PortsToCheck = maps:get(ports_to_check, Plan),
+    PortStatus0 = lists:map(fun(P) -> is_tcp_port_open(IP, P) end, PortsToCheck),
+    case lists:all(fun(IsOpen) -> IsOpen end, PortStatus0) of
+        true ->
+            {Node, Plan#{status => ok}};
+        false ->
+            PortStatus1 = maps:from_list(lists:zip(PortsToCheck, PortStatus0)),
+            {Node, Plan#{status => bad_ports, open_ports => PortStatus1}}
+    end.
+
+node_to_ips(Node) ->
+    NodeBin0 = atom_to_binary(Node),
+    HostOrIP = re:replace(NodeBin0, <<"^.+@">>, <<"">>, [{return, list}]),
+    case inet:gethostbyname(HostOrIP, inet) of
+        {ok, #hostent{h_addr_list = AddrList}} ->
+            AddrList;
+        _ ->
+            []
+    end.
+
+is_tcp_port_open(IP, Port) ->
+    case gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT) of
+        {ok, P} ->
+            gen_tcp:close(P),
+            true;
+        _ ->
+            false
+    end.

+ 62 - 0
apps/emqx_machine/test/emqx_machine_SUITE.erl

@@ -22,6 +22,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
@@ -67,6 +68,15 @@ end_per_suite(_Config) ->
 init_per_testcase(t_custom_shard_transports, Config) ->
     OldConfig = application:get_env(emqx_machine, custom_shard_transports),
     [{old_config, OldConfig} | Config];
+init_per_testcase(t_open_ports_check = TestCase, Config) ->
+    AppSpecs = [emqx],
+    Cluster = [
+        {emqx_machine_SUITE1, #{role => core, apps => AppSpecs}},
+        {emqx_machine_SUITE2, #{role => core, apps => AppSpecs}},
+        {emqx_machine_SUITE3, #{role => replicant, apps => AppSpecs}}
+    ],
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
+    [{nodes, Nodes} | Config];
 init_per_testcase(_TestCase, Config) ->
     Config.
 
@@ -80,6 +90,10 @@ end_per_testcase(t_custom_shard_transports, Config) ->
             application:unset_env(emqx_machine, custom_shard_transports)
     end,
     ok;
+end_per_testcase(t_open_ports_check, Config) ->
+    Nodes = ?config(nodes, Config),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok;
 end_per_testcase(_TestCase, _Config) ->
     ok.
 
@@ -112,3 +126,51 @@ t_node_status(_Config) ->
         },
         jsx:decode(JSON)
     ).
+
+t_open_ports_check(Config) ->
+    [Core1, Core2, Replicant] = ?config(nodes, Config),
+
+    Plan = erpc:call(Core1, emqx_machine, create_plan, []),
+    ?assertMatch(
+        [{Core2, #{ports_to_check := [_GenRPC0, _Ekka0], resolved_ips := [_]}}],
+        Plan
+    ),
+    [{Core2, #{ports_to_check := [GenRPCPort, EkkaPort], resolved_ips := [_]}}] = Plan,
+    ?assertMatch(
+        [{Core1, #{ports_to_check := [_GenRPC1, _Ekka1], resolved_ips := [_]}}],
+        erpc:call(Core2, emqx_machine, create_plan, [])
+    ),
+    ?assertMatch(
+        [],
+        erpc:call(Replicant, emqx_machine, create_plan, [])
+    ),
+
+    ?assertEqual(ok, erpc:call(Core1, emqx_machine, open_ports_check, [])),
+    ?assertEqual(ok, erpc:call(Core2, emqx_machine, open_ports_check, [])),
+    ?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])),
+
+    ok = emqx_cth_cluster:stop_node(Core2),
+
+    ?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])),
+    ?assertMatch(
+        #{
+            msg := "some ports are unreachable",
+            results :=
+                #{
+                    Core2 :=
+                        #{
+                            open_ports := #{
+                                GenRPCPort := _,
+                                EkkaPort := _
+                            },
+                            ports_to_check := [_, _],
+                            resolved_ips := [_],
+                            status := bad_ports
+                        }
+                }
+        },
+        erpc:call(Core1, emqx_machine, open_ports_check, []),
+        #{core2 => Core2}
+    ),
+
+    ok.

+ 3 - 0
changes/ce/feat-11637.en.md

@@ -0,0 +1,3 @@
+Added an extra diagnostic to help debug issues when mnesia is waiting for tables.
+
+Updated libraries: `ekka` -> 0.15.15, `mria` -> 0.6.4.

+ 1 - 1
mix.exs

@@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.9.7", override: true},
       {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true},
-      {:ekka, github: "emqx/ekka", tag: "0.15.14", override: true},
+      {:ekka, github: "emqx/ekka", tag: "0.15.15", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.1.0", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.13", override: true},

+ 1 - 1
rebar.config

@@ -62,7 +62,7 @@
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}}
     , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}}