Просмотр исходного кода

Merge pull request #10369 from ieQu1/fix-stats-api

fix(emqx_management): Ignore results from the nodes that are down
ieQu1 2 лет назад
Родитель
Сommit
c0d8e9c402

+ 18 - 0
apps/emqx/src/emqx.erl

@@ -30,6 +30,12 @@
     stop/0
 ]).
 
+%% Cluster API
+-export([
+    cluster_nodes/1,
+    running_nodes/0
+]).
+
 %% PubSub API
 -export([
     subscribe/1,
@@ -102,6 +108,18 @@ is_running() ->
         _ -> true
     end.
 
+%%--------------------------------------------------------------------
+%% Cluster API
+%%--------------------------------------------------------------------
+
+-spec running_nodes() -> [node()].
+running_nodes() ->
+    mria:running_nodes().
+
+-spec cluster_nodes(all | running | cores | stopped) -> [node()].
+cluster_nodes(Type) ->
+    mria:cluster_nodes(Type).
+
 %%--------------------------------------------------------------------
 %% PubSub API
 %%--------------------------------------------------------------------

+ 8 - 0
apps/emqx/test/emqx_SUITE.erl

@@ -148,6 +148,14 @@ t_run_hook(_) ->
     ?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
     ?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)).
 
+t_cluster_nodes(_) ->
+    Expected = [node()],
+    ?assertEqual(Expected, emqx:running_nodes()),
+    ?assertEqual(Expected, emqx:cluster_nodes(running)),
+    ?assertEqual(Expected, emqx:cluster_nodes(all)),
+    ?assertEqual(Expected, emqx:cluster_nodes(cores)),
+    ?assertEqual([], emqx:cluster_nodes(stopped)).
+
 %%--------------------------------------------------------------------
 %% Hook fun
 %%--------------------------------------------------------------------

+ 32 - 25
apps/emqx_management/src/emqx_mgmt.erl

@@ -112,8 +112,8 @@
 %%--------------------------------------------------------------------
 
 list_nodes() ->
-    Running = mria:cluster_nodes(running),
-    Stopped = mria:cluster_nodes(stopped),
+    Running = emqx:cluster_nodes(running),
+    Stopped = emqx:cluster_nodes(stopped),
     DownNodes = lists:map(fun stopped_node_info/1, Stopped),
     [{Node, Info} || #{node := Node} = Info <- node_info(Running)] ++ DownNodes.
 
@@ -199,7 +199,7 @@ vm_stats() ->
 %%--------------------------------------------------------------------
 
 list_brokers() ->
-    Running = mria:running_nodes(),
+    Running = emqx:running_nodes(),
     [{Node, Broker} || #{node := Node} = Broker <- broker_info(Running)].
 
 lookup_broker(Node) ->
@@ -223,7 +223,7 @@ broker_info(Nodes) ->
 %%--------------------------------------------------------------------
 
 get_metrics() ->
-    nodes_info_count([get_metrics(Node) || Node <- mria:running_nodes()]).
+    nodes_info_count([get_metrics(Node) || Node <- emqx:running_nodes()]).
 
 get_metrics(Node) ->
     unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
@@ -238,13 +238,20 @@ get_stats() ->
             'subscriptions.shared.count',
             'subscriptions.shared.max'
         ],
-    CountStats = nodes_info_count([
-        begin
-            Stats = get_stats(Node),
-            delete_keys(Stats, GlobalStatsKeys)
-        end
-     || Node <- mria:running_nodes()
-    ]),
+    CountStats = nodes_info_count(
+        lists:foldl(
+            fun(Node, Acc) ->
+                case get_stats(Node) of
+                    {error, _} ->
+                        Acc;
+                    Stats ->
+                        [delete_keys(Stats, GlobalStatsKeys) | Acc]
+                end
+            end,
+            [],
+            emqx:running_nodes()
+        )
+    ),
     GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
     maps:merge(CountStats, GlobalStats).
 
@@ -275,12 +282,12 @@ nodes_info_count(PropList) ->
 lookup_client({clientid, ClientId}, FormatFun) ->
     lists:append([
         lookup_client(Node, {clientid, ClientId}, FormatFun)
-     || Node <- mria:running_nodes()
+     || Node <- emqx:running_nodes()
     ]);
 lookup_client({username, Username}, FormatFun) ->
     lists:append([
         lookup_client(Node, {username, Username}, FormatFun)
-     || Node <- mria:running_nodes()
+     || Node <- emqx:running_nodes()
     ]).
 
 lookup_client(Node, Key, FormatFun) ->
@@ -307,7 +314,7 @@ kickout_client(ClientId) ->
         [] ->
             {error, not_found};
         _ ->
-            Results = [kickout_client(Node, ClientId) || Node <- mria:running_nodes()],
+            Results = [kickout_client(Node, ClientId) || Node <- emqx:running_nodes()],
             check_results(Results)
     end.
 
@@ -322,7 +329,7 @@ list_client_subscriptions(ClientId) ->
         [] ->
             {error, not_found};
         _ ->
-            Results = [client_subscriptions(Node, ClientId) || Node <- mria:running_nodes()],
+            Results = [client_subscriptions(Node, ClientId) || Node <- emqx:running_nodes()],
             Filter =
                 fun
                     ({error, _}) ->
@@ -340,18 +347,18 @@ client_subscriptions(Node, ClientId) ->
     {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}.
 
 clean_authz_cache(ClientId) ->
-    Results = [clean_authz_cache(Node, ClientId) || Node <- mria:running_nodes()],
+    Results = [clean_authz_cache(Node, ClientId) || Node <- emqx:running_nodes()],
     check_results(Results).
 
 clean_authz_cache(Node, ClientId) ->
     unwrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).
 
 clean_authz_cache_all() ->
-    Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria:running_nodes()],
+    Results = [{Node, clean_authz_cache_all(Node)} || Node <- emqx:running_nodes()],
     wrap_results(Results).
 
 clean_pem_cache_all() ->
-    Results = [{Node, clean_pem_cache_all(Node)} || Node <- mria:running_nodes()],
+    Results = [{Node, clean_pem_cache_all(Node)} || Node <- emqx:running_nodes()],
     wrap_results(Results).
 
 wrap_results(Results) ->
@@ -379,7 +386,7 @@ set_keepalive(_ClientId, _Interval) ->
 
 %% @private
 call_client(ClientId, Req) ->
-    Results = [call_client(Node, ClientId, Req) || Node <- mria:running_nodes()],
+    Results = [call_client(Node, ClientId, Req) || Node <- emqx:running_nodes()],
     Expected = lists:filter(
         fun
             ({error, _}) -> false;
@@ -428,7 +435,7 @@ list_subscriptions(Node) ->
 list_subscriptions_via_topic(Topic, FormatFun) ->
     lists:append([
         list_subscriptions_via_topic(Node, Topic, FormatFun)
-     || Node <- mria:running_nodes()
+     || Node <- emqx:running_nodes()
     ]).
 
 list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
@@ -442,7 +449,7 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
 %%--------------------------------------------------------------------
 
 subscribe(ClientId, TopicTables) ->
-    subscribe(mria:running_nodes(), ClientId, TopicTables).
+    subscribe(emqx:running_nodes(), ClientId, TopicTables).
 
 subscribe([Node | Nodes], ClientId, TopicTables) ->
     case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of
@@ -467,7 +474,7 @@ publish(Msg) ->
 -spec unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
     {unsubscribe, _} | {error, channel_not_found}.
 unsubscribe(ClientId, Topic) ->
-    unsubscribe(mria:running_nodes(), ClientId, Topic).
+    unsubscribe(emqx:running_nodes(), ClientId, Topic).
 
 -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
     {unsubscribe, _} | {error, channel_not_found}.
@@ -490,7 +497,7 @@ do_unsubscribe(ClientId, Topic) ->
 -spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
     {unsubscribe, _} | {error, channel_not_found}.
 unsubscribe_batch(ClientId, Topics) ->
-    unsubscribe_batch(mria:running_nodes(), ClientId, Topics).
+    unsubscribe_batch(emqx:running_nodes(), ClientId, Topics).
 
 -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
     {unsubscribe_batch, _} | {error, channel_not_found}.
@@ -515,7 +522,7 @@ do_unsubscribe_batch(ClientId, Topics) ->
 %%--------------------------------------------------------------------
 
 get_alarms(Type) ->
-    [{Node, get_alarms(Node, Type)} || Node <- mria:running_nodes()].
+    [{Node, get_alarms(Node, Type)} || Node <- emqx:running_nodes()].
 
 get_alarms(Node, Type) ->
     add_duration_field(unwrap_rpc(emqx_proto_v1:get_alarms(Node, Type))).
@@ -524,7 +531,7 @@ deactivate(Node, Name) ->
     unwrap_rpc(emqx_proto_v1:deactivate_alarm(Node, Name)).
 
 delete_all_deactivated_alarms() ->
-    [delete_all_deactivated_alarms(Node) || Node <- mria:running_nodes()].
+    [delete_all_deactivated_alarms(Node) || Node <- emqx:running_nodes()].
 
 delete_all_deactivated_alarms(Node) ->
     unwrap_rpc(emqx_proto_v1:delete_all_deactivated_alarms(Node)).

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -163,7 +163,7 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
             {error, page_limit_invalid};
         Meta ->
             {_CodCnt, NQString} = parse_qstring(QString, QSchema),
-            Nodes = mria:running_nodes(),
+            Nodes = emqx:running_nodes(),
             ResultAcc = init_query_result(),
             QueryState = init_query_state(Tab, NQString, MsFun, Meta),
             NResultAcc = do_cluster_query(

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_cluster.erl

@@ -101,7 +101,7 @@ cluster_info(get, _) ->
     ClusterName = application:get_env(ekka, cluster_name, emqxcl),
     Info = #{
         name => ClusterName,
-        nodes => mria:running_nodes(),
+        nodes => emqx:running_nodes(),
         self => node()
     },
     {200, Info}.

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -258,7 +258,7 @@ configs(get, Params, _Req) ->
     QS = maps:get(query_string, Params, #{}),
     Node = maps:get(<<"node">>, QS, node()),
     case
-        lists:member(Node, mria:running_nodes()) andalso
+        lists:member(Node, emqx:running_nodes()) andalso
             emqx_management_proto_v2:get_full_config(Node)
     of
         false ->

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -483,7 +483,7 @@ err_msg_str(Reason) ->
     io_lib:format("~p", [Reason]).
 
 list_listeners() ->
-    [list_listeners(Node) || Node <- mria:running_nodes()].
+    [list_listeners(Node) || Node <- emqx:running_nodes()].
 
 list_listeners(Node) ->
     wrap_rpc(emqx_management_proto_v2:list_listeners(Node)).

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -59,7 +59,7 @@ metrics(get, #{query_string := Qs}) ->
                 maps:from_list(
                     emqx_mgmt:get_metrics(Node) ++ [{node, Node}]
                 )
-             || Node <- mria:running_nodes()
+             || Node <- emqx:running_nodes()
             ],
             {200, Data}
     end.

+ 13 - 13
apps/emqx_management/src/emqx_mgmt_api_stats.erl

@@ -127,21 +127,21 @@ list(get, #{query_string := Qs}) ->
         true ->
             {200, emqx_mgmt:get_stats()};
         _ ->
-            Data = [
-                maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}])
-             || Node <- running_nodes()
-            ],
+            Data = lists:foldl(
+                fun(Node, Acc) ->
+                    case emqx_mgmt:get_stats(Node) of
+                        {error, _Err} ->
+                            Acc;
+                        Stats when is_list(Stats) ->
+                            Data = maps:from_list([{node, Node} | Stats]),
+                            [Data | Acc]
+                    end
+                end,
+                [],
+                emqx:running_nodes()
+            ),
             {200, Data}
     end.
 
 %%%==============================================================================================
 %% Internal
-
-running_nodes() ->
-    Nodes = erlang:nodes([visible, this]),
-    RpcResults = emqx_proto_v2:are_running(Nodes),
-    [
-        Node
-     || {Node, IsRunning} <- lists:zip(Nodes, RpcResults),
-        IsRunning =:= {ok, true}
-    ].

+ 5 - 5
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -390,7 +390,7 @@ trace(get, _Params) ->
                 fun(#{start_at := A}, #{start_at := B}) -> A > B end,
                 emqx_trace:format(List0)
             ),
-            Nodes = mria:running_nodes(),
+            Nodes = emqx:running_nodes(),
             TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v2:get_trace_size(Nodes)),
             AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
             Now = erlang:system_time(second),
@@ -464,7 +464,7 @@ format_trace(Trace0) ->
     LogSize = lists:foldl(
         fun(Node, Acc) -> Acc#{Node => 0} end,
         #{},
-        mria:running_nodes()
+        emqx:running_nodes()
     ),
     Trace2 = maps:without([enable, filter], Trace1),
     Trace2#{
@@ -560,13 +560,13 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
     ).
 
 collect_trace_file(undefined, TraceLog) ->
-    Nodes = mria:running_nodes(),
+    Nodes = emqx:running_nodes(),
     wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file(Nodes, TraceLog));
 collect_trace_file(Node, TraceLog) ->
     wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file([Node], TraceLog)).
 
 collect_trace_file_detail(TraceLog) ->
-    Nodes = mria:running_nodes(),
+    Nodes = emqx:running_nodes(),
     wrap_rpc(emqx_mgmt_trace_proto_v2:trace_file_detail(Nodes, TraceLog)).
 
 wrap_rpc({GoodRes, BadNodes}) ->
@@ -696,7 +696,7 @@ parse_node(Query, Default) ->
                 {ok, Default};
             {ok, NodeBin} ->
                 Node = binary_to_existing_atom(NodeBin),
-                true = lists:member(Node, mria:running_nodes()),
+                true = lists:member(Node, emqx:running_nodes()),
                 {ok, Node}
         end
     catch

+ 5 - 5
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -36,16 +36,16 @@ end_per_suite(_) ->
     emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
 
 init_per_testcase(TestCase, Config) ->
-    meck:expect(mria, running_nodes, 0, [node()]),
+    meck:expect(emqx, running_nodes, 0, [node()]),
     emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config).
 
 end_per_testcase(TestCase, Config) ->
-    meck:unload(mria),
+    meck:unload(emqx),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config).
 
 t_list_nodes(init, Config) ->
     meck:expect(
-        mria,
+        emqx,
         cluster_nodes,
         fun
             (running) -> [node()];
@@ -125,7 +125,7 @@ t_lookup_client(_Config) ->
         emqx_mgmt:lookup_client({username, <<"user1">>}, ?FORMATFUN)
     ),
     ?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)),
-    meck:expect(mria, running_nodes, 0, [node(), 'fake@nonode']),
+    meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']),
     ?assertMatch(
         [_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN)
     ).
@@ -188,7 +188,7 @@ t_clean_cache(_Config) ->
         {error, _},
         emqx_mgmt:clean_pem_cache_all()
     ),
-    meck:expect(mria, running_nodes, 0, [node(), 'fake@nonode']),
+    meck:expect(emqx, running_nodes, 0, [node(), 'fake@nonode']),
     ?assertMatch(
         {error, [{'fake@nonode', {error, _}}]},
         emqx_mgmt:clean_authz_cache_all()

+ 3 - 3
apps/emqx_management/test/emqx_mgmt_api_SUITE.erl

@@ -179,14 +179,14 @@ t_bad_rpc(_) ->
     ClientLs1 = [start_emqtt_client(node(), I, 1883) || I <- lists:seq(1, 10)],
     Path = emqx_mgmt_api_test_util:api_path(["clients?limit=2&page=2"]),
     try
-        meck:expect(mria, running_nodes, 0, ['fake@nohost']),
+        meck:expect(emqx, running_nodes, 0, ['fake@nohost']),
         {error, {_, 500, _}} = emqx_mgmt_api_test_util:request_api(get, Path),
         %% good cop, bad cop
-        meck:expect(mria, running_nodes, 0, [node(), 'fake@nohost']),
+        meck:expect(emqx, running_nodes, 0, [node(), 'fake@nohost']),
         {error, {_, 500, _}} = emqx_mgmt_api_test_util:request_api(get, Path)
     after
         _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),
-        meck:unload(mria),
+        meck:unload(emqx),
         emqx_mgmt_api_test_util:end_suite()
     end.
 

+ 2 - 2
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -246,7 +246,7 @@ t_dashboard(_Config) ->
 
 t_configs_node({'init', Config}) ->
     Node = node(),
-    meck:expect(mria, running_nodes, fun() -> [Node, bad_node, other_node] end),
+    meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end),
     meck:expect(
         emqx_management_proto_v2,
         get_full_config,
@@ -258,7 +258,7 @@ t_configs_node({'init', Config}) ->
     ),
     Config;
 t_configs_node({'end', _}) ->
-    meck:unload([mria, emqx_management_proto_v2]);
+    meck:unload([emqx, emqx_management_proto_v2]);
 t_configs_node(_) ->
     Node = atom_to_list(node()),
 

+ 2 - 2
apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl

@@ -168,8 +168,8 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) ->
         L3 = get_tcp_listeners(Node2),
 
         Comment = #{
-            node1 => rpc:call(Node1, mria, running_nodes, []),
-            node2 => rpc:call(Node2, mria, running_nodes, [])
+            node1 => rpc:call(Node1, emqx, running_nodes, []),
+            node2 => rpc:call(Node2, emqx, running_nodes, [])
         },
 
         ?assert(length(L1) > length(L2), Comment),

+ 2 - 0
apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl

@@ -24,10 +24,12 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']),
     emqx_mgmt_api_test_util:init_suite(),
     Config.
 
 end_per_suite(_) ->
+    meck:unload(emqx),
     emqx_mgmt_api_test_util:end_suite().
 
 t_stats_api(_) ->

+ 2 - 2
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -599,8 +599,8 @@ emqx_cluster() ->
     ].
 
 emqx_cluster_data() ->
-    Running = mria:cluster_nodes(running),
-    Stopped = mria:cluster_nodes(stopped),
+    Running = emqx:cluster_nodes(running),
+    Stopped = emqx:cluster_nodes(stopped),
     [
         {nodes_running, length(Running)},
         {nodes_stopped, length(Stopped)}

+ 6 - 0
changes/ce/fix-10369.en.md

@@ -0,0 +1,6 @@
+Fix error in `/api/v5/monitor_current` API endpoint that happens when some EMQX nodes are down.
+
+Prior to this fix, sometimes the request returned HTTP code 500 and the following message:
+```
+{"code":"INTERNAL_ERROR","message":"error, badarg, [{erlang,'++',[{error,nodedown},[{node,'emqx@10.42.0.150'}]], ...
+```