Explorar el Código

feat(prometheus): add listener shutdown counts by reason

Fixes https://emqx.atlassian.net/browse/EMQX-13170

Sample output in Prometheus format:

```
ͳ curl -s http://127.0.0.1:18083/api/v5/prometheus/stats\?mode\=node  | rg emqx_client_disconnected_reason
\# TYPE emqx_client_disconnected_reason counter
\# HELP emqx_client_disconnected_reason
emqx_client_disconnected_reason{listener_type="tcp",listener_name="default",reason="kicked"} 1
emqx_client_disconnected_reason{listener_type="tcp",listener_name="default",reason="takenover"} 1

ͳ curl -s http://127.0.0.1:18083/api/v5/prometheus/stats\?mode\=all_nodes_unaggregated  | rg emqx_client_disconnected_reason
\# TYPE emqx_client_disconnected_reason counter
\# HELP emqx_client_disconnected_reason
emqx_client_disconnected_reason{node="emqx@127.0.0.1",listener_type="tcp",listener_name="default",reason="kicked"} 1
emqx_client_disconnected_reason{node="emqx@127.0.0.1",listener_type="tcp",listener_name="default",reason="takenover"} 1
```

In JSON format (now the `client` category is always an array):

```
ͳ curl -s -H 'Accept: application/json' http://127.0.0.1:18083/api/v5/prometheus/stats\?mode\=all_nodes_aggregated | jq '{client}'
{
  "client": [
    {
      "emqx_client_auth_anonymous": 3,
      "emqx_client_authenticate": 3,
      "emqx_client_authorize": 0,
      "emqx_client_connack": 3,
      "emqx_client_connect": 3,
      "emqx_client_connected": 3,
      "emqx_client_disconnected": 3,
      "emqx_client_subscribe": 0,
      "emqx_client_unsubscribe": 0
    },
    {
      "reason": "kicked",
      "listener_type": "tcp",
      "listener_name": "default",
      "emqx_client_disconnected_reason": 1
    },
    {
      "reason": "takenover",
      "listener_type": "tcp",
      "listener_name": "default",
      "emqx_client_disconnected_reason": 1
    }
  ]
}

ͳ curl -s -H 'Accept: application/json' http://127.0.0.1:18083/api/v5/prometheus/stats\?mode\=all_nodes_unaggregated | jq '{client}'
{
  "client": [
    {
      "node": "emqx@127.0.0.1",
      "emqx_client_auth_anonymous": 3,
      "emqx_client_authenticate": 3,
      "emqx_client_authorize": 0,
      "emqx_client_connack": 3,
      "emqx_client_connect": 3,
      "emqx_client_connected": 3,
      "emqx_client_disconnected": 3,
      "emqx_client_subscribe": 0,
      "emqx_client_unsubscribe": 0
    },
    {
      "node": "emqx@127.0.0.1",
      "reason": "takenover",
      "listener_type": "tcp",
      "listener_name": "default",
      "emqx_client_disconnected_reason": 1
    },
    {
      "node": "emqx@127.0.0.1",
      "reason": "kicked",
      "listener_type": "tcp",
      "listener_name": "default",
      "emqx_client_disconnected_reason": 1
    }
  ]
}
```
Thales Macedo Garitezi hace 1 año
padre
commit
deebd8d08e

+ 1 - 1
apps/emqx_prometheus/src/emqx_prometheus.app.src

@@ -2,7 +2,7 @@
 {application, emqx_prometheus, [
     {description, "Prometheus for EMQX"},
     % strict semver, bump manually!
-    {vsn, "5.2.6"},
+    {vsn, "5.2.7"},
     {modules, []},
     {registered, [emqx_prometheus_sup]},
     {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},

+ 55 - 9
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -253,7 +253,7 @@ collect(<<"json">>) ->
         packets => collect_json_data(?MG(emqx_packet_data, RawData)),
         messages => collect_json_data(?MG(emqx_message_data, RawData)),
         delivery => collect_json_data(?MG(emqx_delivery_data, RawData)),
-        client => collect_json_data(?MG(emqx_client_data, RawData)),
+        client => collect_client_json_data(?MG(emqx_client_data, RawData)),
         session => collect_json_data(?MG(emqx_session_data, RawData)),
         cluster => collect_json_data(?MG(cluster_data, RawData)),
         olp => collect_json_data(?MG(emqx_olp_data, RawData)),
@@ -285,7 +285,7 @@ fetch_from_local_node(Mode) ->
         emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta(), Mode),
         emqx_message_data => emqx_metric_data(message_metric_meta(), Mode),
         emqx_delivery_data => emqx_metric_data(delivery_metric_meta(), Mode),
-        emqx_client_data => emqx_metric_data(client_metric_meta(), Mode),
+        emqx_client_data => client_metric_data(Mode),
         emqx_session_data => emqx_metric_data(session_metric_meta(), Mode),
         emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
         emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
@@ -464,6 +464,7 @@ emqx_collect(K = emqx_client_authorize, D) -> counter_metrics(?MG(K, D));
 emqx_collect(K = emqx_client_subscribe, D) -> counter_metrics(?MG(K, D));
 emqx_collect(K = emqx_client_unsubscribe, D) -> counter_metrics(?MG(K, D));
 emqx_collect(K = emqx_client_disconnected, D) -> counter_metrics(?MG(K, D));
+emqx_collect(K = emqx_client_disconnected_reason, D) -> counter_metrics(?MG(K, D));
 %%--------------------------------------------------------------------
 %% Metrics - session
 emqx_collect(K = emqx_session_created, D) -> counter_metrics(?MG(K, D));
@@ -672,15 +673,52 @@ do_cluster_data(Labels) ->
 %%========================================
 
 emqx_metric_data(MetricNameTypeKeyL, Mode) ->
+    emqx_metric_data(MetricNameTypeKeyL, Mode, _Acc = #{}).
+
+emqx_metric_data(MetricNameTypeKeyL, Mode, Acc) ->
     Metrics = emqx_metrics:all(),
     lists:foldl(
-        fun({Name, _Type, MetricKAtom}, AccIn) ->
-            AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Metrics)}]}
+        fun
+            ({_Name, _Type, undefined}, AccIn) ->
+                AccIn;
+            ({Name, _Type, MetricKAtom}, AccIn) ->
+                AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Metrics)}]}
         end,
-        #{},
+        Acc,
         MetricNameTypeKeyL
     ).
 
+client_metric_data(Mode) ->
+    Acc = listener_shutdown_counts(Mode),
+    emqx_metric_data(client_metric_meta(), Mode, Acc).
+
+listener_shutdown_counts(Mode) ->
+    Data =
+        lists:flatmap(
+            fun(Listener) ->
+                get_listener_shutdown_counts_with_labels(Listener, Mode)
+            end,
+            emqx_listeners:list()
+        ),
+    #{emqx_client_disconnected_reason => Data}.
+
+get_listener_shutdown_counts_with_labels({Id, #{bind := Bind}}, Mode) ->
+    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
+    AddLabels = fun({Reason, Count}) ->
+        Labels = [
+            {listener_type, Type},
+            {listener_name, Name},
+            {reason, Reason}
+        ],
+        {with_node_label(Mode, Labels), Count}
+    end,
+    case emqx_listeners:shutdown_count(Id, Bind) of
+        {error, _} ->
+            [];
+        Counts ->
+            lists:map(AddLabels, Counts)
+    end.
+
 %%==========
 %% Durable Storage
 maybe_add_ds_meta() ->
@@ -799,7 +837,8 @@ client_metric_meta() ->
         {emqx_client_authorize, counter, 'client.authorize'},
         {emqx_client_subscribe, counter, 'client.subscribe'},
         {emqx_client_unsubscribe, counter, 'client.unsubscribe'},
-        {emqx_client_disconnected, counter, 'client.disconnected'}
+        {emqx_client_disconnected, counter, 'client.disconnected'},
+        {emqx_client_disconnected_reason, counter, undefined}
     ].
 
 %%==========
@@ -1118,6 +1157,13 @@ collect_stats_json_data(StatsData, StatsClData) ->
 collect_cert_json_data(Data) ->
     collect_json_data_(Data).
 
+collect_client_json_data(Data0) ->
+    ShutdownCounts = maps:with([emqx_client_disconnected_reason], Data0),
+    Data = maps:without([emqx_client_disconnected_reason], Data0),
+    JSON0 = collect_json_data(Data),
+    JSON1 = collect_json_data_(ShutdownCounts),
+    lists:flatten([JSON0 | JSON1]).
+
 collect_vm_json_data(Data) ->
     DataListPerNode = collect_json_data_(Data),
     case ?GET_PROM_DATA_MODE() of
@@ -1153,9 +1199,9 @@ collect_json_data_(Data) ->
 
 zip_json_prom_stats_metrics(Key, Points, [] = _AccIn) ->
     lists:foldl(
-        fun({Lables, Metric}, AccIn2) ->
-            LablesKVMap = maps:from_list(Lables),
-            Point = LablesKVMap#{Key => Metric},
+        fun({Labels, Metric}, AccIn2) ->
+            LabelsKVMap = maps:from_list(Labels),
+            Point = LabelsKVMap#{Key => Metric},
             [Point | AccIn2]
         end,
         [],

+ 6 - 3
apps/emqx_prometheus/src/emqx_prometheus_cluster.erl

@@ -46,14 +46,17 @@ raw_data(Module, undefined) ->
     raw_data(Module, ?PROM_DATA_MODE__NODE);
 raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED = Mode) ->
     AllNodesMetrics = aggre_cluster(Module, Mode),
+    %% TODO: fix this typo
     Cluster = Module:fetch_cluster_consistented_data(),
     maps:merge(AllNodesMetrics, Cluster);
 raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED = Mode) ->
     AllNodesMetrics = zip_cluster_data(Module, Mode),
+    %% TODO: fix this typo
     Cluster = Module:fetch_cluster_consistented_data(),
     maps:merge(AllNodesMetrics, Cluster);
 raw_data(Module, ?PROM_DATA_MODE__NODE = Mode) ->
     {_Node, LocalNodeMetrics} = Module:fetch_from_local_node(Mode),
+    %% TODO: fix this typo
     Cluster = Module:fetch_cluster_consistented_data(),
     maps:merge(LocalNodeMetrics, Cluster).
 
@@ -172,9 +175,9 @@ do_zip_cluster(NodeMetrics, AccIn0) ->
     ).
 
 point_to_map_fun(Key) ->
-    fun({Lables, Metric}, AccIn2) ->
-        LablesKVMap = maps:from_list(Lables),
-        [maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
+    fun({Labels, Metric}, AccIn2) ->
+        LabelsKVMap = maps:from_list(Labels),
+        [maps:merge(LabelsKVMap, #{Key => Metric}) | AccIn2]
     end.
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 193 - 5
apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl

@@ -21,6 +21,8 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx_prometheus/include/emqx_prometheus.hrl").
 
 %%--------------------------------------------------------------------
 %% Setups
@@ -32,11 +34,16 @@ all() ->
     ].
 
 groups() ->
+    LegacyTCs = legacy_config_test_cases(),
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {new_config, [sequence], [t_stats_auth_api, t_stats_no_auth_api, t_prometheus_api]},
-        {legacy_config, [sequence], [t_stats_no_auth_api, t_legacy_prometheus_api]}
+        {new_config, [sequence], AllTCs -- LegacyTCs},
+        {legacy_config, [sequence], LegacyTCs}
     ].
 
+legacy_config_test_cases() ->
+    [t_stats_no_auth_api, t_legacy_prometheus_api].
+
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
         lists:flatten([
@@ -282,18 +289,183 @@ t_stats_no_auth_api(_) ->
             ok
     end,
     emqx_dashboard_listener:regenerate_minirest_dispatch(),
-    Headers = accept_josn_header(),
+    Headers = accept_json_header(),
     request_stats(Headers, []).
 
 t_stats_auth_api(_) ->
     {ok, _} = emqx:update_config([prometheus, enable_basic_auth], true),
     emqx_dashboard_listener:regenerate_minirest_dispatch(),
     Auth = emqx_mgmt_api_test_util:auth_header_(),
-    Headers = [Auth | accept_josn_header()],
+    Headers = [Auth | accept_json_header()],
     request_stats(Headers, Auth),
     ok.
 
-accept_josn_header() ->
+%% Simple smoke test for verifying reason code labels in `emqx_client_disconnected_reason'
+%% counter metric.
+t_listener_shutdown_count(_Config) ->
+    ClientId1 = <<"shutdown_count_test">>,
+    {ok, C1} = emqtt:start_link(#{clientid => ClientId1}),
+    {ok, _} = emqtt:connect(C1),
+    %% Takeover
+    unlink(C1),
+    {ok, C2} = emqtt:start_link(#{clientid => ClientId1}),
+    {ok, _} = emqtt:connect(C2),
+    %% Kick
+    unlink(C2),
+    ok = emqx_cm:kick_session(ClientId1),
+    %% Normal disconnect
+    ClientId2 = <<"shutdown_count_test2">>,
+    {ok, C3} = emqtt:start_link(#{clientid => ClientId2}),
+    {ok, _} = emqtt:connect(C3),
+    ok = emqtt:stop(C3),
+    %% Disconnect with reason code
+    {ok, C4} = emqtt:start_link(#{clientid => ClientId2}),
+    {ok, _} = emqtt:connect(C4),
+    ok = emqtt:disconnect(C4, ?RC_IMPLEMENTATION_SPECIFIC_ERROR),
+    OnlyDisconnectStats = fun(Stats0) ->
+        Stats = lists:filter(
+            fun
+                (#{<<"emqx_client_disconnected_reason">> := _}) ->
+                    true;
+                (_) ->
+                    false
+            end,
+            Stats0
+        ),
+        lists:sort(Stats)
+    end,
+    #{<<"client">> := JSONClientStatsNode} = get_stats(json, ?PROM_DATA_MODE__NODE),
+    ?assertEqual(
+        [
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"discarded">>
+            },
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"kicked">>
+            },
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"tcp_closed">>
+            }
+        ],
+        OnlyDisconnectStats(JSONClientStatsNode)
+    ),
+    #{<<"client">> := JSONClientStatsAgg} = get_stats(json, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED),
+    ?assertEqual(
+        [
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"discarded">>
+            },
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"kicked">>
+            },
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"tcp_closed">>
+            }
+        ],
+        OnlyDisconnectStats(JSONClientStatsAgg)
+    ),
+    #{<<"client">> := JSONClientStatsUnagg} = get_stats(
+        json, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED
+    ),
+    NodeBin = atom_to_binary(node()),
+    ?assertEqual(
+        [
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"node">> => NodeBin,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"discarded">>
+            },
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"node">> => NodeBin,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"kicked">>
+            },
+            #{
+                <<"emqx_client_disconnected_reason">> => 1,
+                <<"node">> => NodeBin,
+                <<"listener_type">> => <<"tcp">>,
+                <<"listener_name">> => <<"default">>,
+                <<"reason">> => <<"tcp_closed">>
+            }
+        ],
+        OnlyDisconnectStats(JSONClientStatsUnagg)
+    ),
+    AssertExpectedLines = fun(ExpectedLines, Output) ->
+        lists:foreach(
+            fun(ExpectedLine) ->
+                ?assertEqual(
+                    match,
+                    re:run(Output, ExpectedLine, [global, {capture, none}]),
+                    #{
+                        expected => ExpectedLine,
+                        output => string:split(Output, <<"\n">>, all)
+                    }
+                )
+            end,
+            ExpectedLines
+        )
+    end,
+    PromClientStatsNode = get_stats(prometheus, ?PROM_DATA_MODE__NODE),
+    ExpectedLines1 = [
+        iolist_to_binary(
+            io_lib:format(
+                "emqx_client_disconnected_reason{"
+                "listener_type=\"tcp\",listener_name=\"default\","
+                "reason=\"~s\"} ~b",
+                [Reason, N]
+            )
+        )
+     || {Reason, N} <- [
+            {"discarded", 1},
+            {"kicked", 1},
+            {"tcp_closed", 1}
+        ]
+    ],
+    AssertExpectedLines(ExpectedLines1, PromClientStatsNode),
+    PromClientStatsAgg = get_stats(prometheus, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED),
+    AssertExpectedLines(ExpectedLines1, PromClientStatsAgg),
+    PromClientStatsUnagg = get_stats(prometheus, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED),
+    ExpectedLines2 = [
+        iolist_to_binary(
+            io_lib:format(
+                "emqx_client_disconnected_reason{"
+                "node=\"~s\",listener_type=\"tcp\",listener_name=\"default\","
+                "reason=\"~s\"} ~b",
+                [NodeBin, Reason, N]
+            )
+        )
+     || {Reason, N} <- [
+            {"discarded", 1},
+            {"kicked", 1},
+            {"tcp_closed", 1}
+        ]
+    ],
+    AssertExpectedLines(ExpectedLines2, PromClientStatsUnagg),
+    ok.
+
+accept_json_header() ->
     [{"accept", "application/json"}].
 
 request_stats(Headers, Auth) ->
@@ -321,3 +493,19 @@ do_env_collectors([Collector | Rest], Acc) when is_atom(Collector) ->
 
 all_collectors() ->
     emqx_prometheus_config:all_collectors().
+
+get_stats(Format, Mode) ->
+    Headers =
+        case Format of
+            json -> accept_json_header();
+            prometheus -> []
+        end,
+    QueryString = uri_string:compose_query([{"mode", atom_to_binary(Mode)}]),
+    Path = emqx_mgmt_api_test_util:api_path(["prometheus", "stats"]),
+    {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, QueryString, Headers),
+    case Format of
+        json ->
+            emqx_utils_json:decode(Response, [return_maps]);
+        prometheus ->
+            Response
+    end.

+ 20 - 3
apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl

@@ -441,10 +441,27 @@ assert_json_data__olp(M, Mode) when
 assert_json_data__olp(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when is_list(Ms) ->
     ok.
 
-assert_json_data__client(M, Mode) when
-    (Mode =:= ?PROM_DATA_MODE__NODE orelse
-        Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
+assert_json_data__client(Ms, Mode) when
+    (Mode =:= ?PROM_DATA_MODE__NODE orelse Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) andalso
+        is_list(Ms)
 ->
+    ?assertMatch(
+        [
+            #{
+                emqx_client_connect := _,
+                emqx_client_connack := _,
+                emqx_client_connected := _,
+                emqx_client_authenticate := _,
+                emqx_client_auth_anonymous := _,
+                emqx_client_authorize := _,
+                emqx_client_subscribe := _,
+                emqx_client_unsubscribe := _,
+                emqx_client_disconnected := _
+            }
+        ],
+        Ms
+    );
+assert_json_data__client(#{} = M, ?PROM_DATA_MODE__NODE) ->
     ?assertMatch(
         #{
             emqx_client_connect := _,

+ 1 - 0
changes/ce/breaking-14360.en.md

@@ -0,0 +1 @@
+When requesting Prometheus metrics in JSON format, the `client` top-level key is now always an array of JSON objects instead of a single JSON object.

+ 8 - 0
changes/ce/feat-14360.en.md

@@ -0,0 +1,8 @@
+Added per-listener shutdown counts to Prometheus metrics, exposed as the `emqx_client_disconnected_reason` counter and labeled by listener type, listener name, and shutdown reason.
+
+Example output:
+
+```
+emqx_client_disconnected_reason{node="emqx@127.0.0.1",listener_type="tcp",listener_name="default",reason="takenover"} 1
+emqx_client_disconnected_reason{node="emqx@127.0.0.1",listener_type="tcp",listener_name="default",reason="kicked"} 1
+```