Jelajahi Sumber

Merge pull request #6776 from DDDHuang/client_metrics_5x

feat(metrics): client metrics with more detail
DDDHuang 4 tahun lalu
induk
melakukan
e7dd401de9

+ 56 - 4
apps/emqx/src/emqx_connection.erl

@@ -123,9 +123,37 @@
 -type cache() :: #cache{}.
 -type cache() :: #cache{}.
 
 
 -define(ACTIVE_N, 100).
 -define(ACTIVE_N, 100).
--define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
--define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
--define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
+
+-define(INFO_KEYS,  [ socktype
+                    , peername
+                    , sockname
+                    , sockstate
+                    ]).
+
+-define(CONN_STATS, [ recv_pkt
+                    , recv_msg
+                    , 'recv_msg.qos0'
+                    , 'recv_msg.qos1'
+                    , 'recv_msg.qos2'
+                    , 'recv_msg.dropped'
+                    , 'recv_msg.dropped.expired'
+                    , send_pkt
+                    , send_msg
+                    , 'send_msg.qos0'
+                    , 'send_msg.qos1'
+                    , 'send_msg.qos2'
+                    , 'send_msg.dropped'
+                    , 'send_msg.dropped.expired'
+                    , 'send_msg.dropped.queue_full'
+                    , 'send_msg.dropped.too_large'
+                    ]).
+
+-define(SOCK_STATS, [ recv_oct
+                    , recv_cnt
+                    , send_oct
+                    , send_cnt
+                    , send_pend
+                    ]).
 
 
 -define(ENABLED(X), (X =/= undefined)).
 -define(ENABLED(X), (X =/= undefined)).
 
 
@@ -758,6 +786,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
                     }),
                     }),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped'),
                     ok = emqx_metrics:inc('delivery.dropped'),
+                    ok = inc_outgoing_stats({error, message_too_large}),
                     <<>>;
                     <<>>;
             Data ->
             Data ->
                     ?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}),
                     ?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}),
@@ -985,6 +1014,7 @@ inc_incoming_stats(Packet = ?PACKET(Type)) ->
     case Type =:= ?PUBLISH of
     case Type =:= ?PUBLISH of
         true ->
         true ->
             inc_counter(recv_msg, 1),
             inc_counter(recv_msg, 1),
+            inc_qos_stats(recv_msg, Packet),
             inc_counter(incoming_pubs, 1);
             inc_counter(incoming_pubs, 1);
         false ->
         false ->
             ok
             ok
@@ -992,17 +1022,39 @@ inc_incoming_stats(Packet = ?PACKET(Type)) ->
     emqx_metrics:inc_recv(Packet).
     emqx_metrics:inc_recv(Packet).
 
 
 -compile({inline, [inc_outgoing_stats/1]}).
 -compile({inline, [inc_outgoing_stats/1]}).
+inc_outgoing_stats({error, message_too_large}) ->
+    inc_counter('send_msg.dropped', 1),
+    inc_counter('send_msg.dropped.too_large', 1);
 inc_outgoing_stats(Packet = ?PACKET(Type)) ->
 inc_outgoing_stats(Packet = ?PACKET(Type)) ->
     inc_counter(send_pkt, 1),
     inc_counter(send_pkt, 1),
     case Type =:= ?PUBLISH of
     case Type =:= ?PUBLISH of
         true ->
         true ->
             inc_counter(send_msg, 1),
             inc_counter(send_msg, 1),
-            inc_counter(outgoing_pubs, 1);
+            inc_counter(outgoing_pubs, 1),
+            inc_qos_stats(send_msg, Packet);
         false ->
         false ->
             ok
             ok
     end,
     end,
     emqx_metrics:inc_sent(Packet).
     emqx_metrics:inc_sent(Packet).
 
 
+inc_qos_stats(Type, Packet) ->
+    case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
+        undefined ->
+            ignore;
+        Key ->
+            inc_counter(Key, 1)
+    end.
+
+inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
+inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
+inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
+
+inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
+inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
+inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
+%% for bad qos
+inc_qos_stats_key(_, _) -> undefined.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Helper functions
 %% Helper functions
 
 

+ 15 - 1
apps/emqx/src/emqx_session.erl

@@ -329,7 +329,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts,
         {ok, SubOpts} ->
         {ok, SubOpts} ->
             ok = emqx_broker:unsubscribe(TopicFilter),
             ok = emqx_broker:unsubscribe(TopicFilter),
             ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
             ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
-            ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
+            ok = emqx_hooks:run('session.unsubscribed',
+                     [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
             {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
             {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
         error ->
         error ->
             {error, ?RC_NO_SUBSCRIPTION_EXISTED}
             {error, ?RC_NO_SUBSCRIPTION_EXISTED}
@@ -541,11 +542,14 @@ log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
     case (QoS == ?QOS_0) andalso (not StoreQos0) of
     case (QoS == ?QOS_0) andalso (not StoreQos0) of
         true  ->
         true  ->
             ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
             ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
+            ok = inc_pd('send_msg.dropped'),
             ?SLOG(warning, #{msg => "dropped_qos0_msg",
             ?SLOG(warning, #{msg => "dropped_qos0_msg",
                              queue => QueueInfo,
                              queue => QueueInfo,
                              payload => Payload}, #{topic => Topic});
                              payload => Payload}, #{topic => Topic});
         false ->
         false ->
             ok = emqx_metrics:inc('delivery.dropped.queue_full'),
             ok = emqx_metrics:inc('delivery.dropped.queue_full'),
+            ok = inc_pd('send_msg.dropped'),
+            ok = inc_pd('send_msg.dropped.queue_full'),
             ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full",
             ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full",
                              queue => QueueInfo,
                              queue => QueueInfo,
                              payload => Payload}, #{topic => Topic})
                              payload => Payload}, #{topic => Topic})
@@ -723,13 +727,23 @@ run_hook(Name, Args) ->
 inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
 inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
 
 
 inc_expired_cnt(delivery, N) ->
 inc_expired_cnt(delivery, N) ->
+    ok = inc_pd('send_msg.dropped', N),
+    ok = inc_pd('send_msg.dropped.expired', N),
     ok = emqx_metrics:inc('delivery.dropped', N),
     ok = emqx_metrics:inc('delivery.dropped', N),
     emqx_metrics:inc('delivery.dropped.expired', N);
     emqx_metrics:inc('delivery.dropped.expired', N);
 
 
 inc_expired_cnt(message, N) ->
 inc_expired_cnt(message, N) ->
+    ok = inc_pd('recv_msg.dropped', N),
+    ok = inc_pd('recv_msg.dropped.expired', N),
     ok = emqx_metrics:inc('messages.dropped', N),
     ok = emqx_metrics:inc('messages.dropped', N),
     emqx_metrics:inc('messages.dropped.expired', N).
     emqx_metrics:inc('messages.dropped.expired', N).
 
 
+inc_pd(Key) ->
+    inc_pd(Key, 1).
+inc_pd(Key, Inc) ->
+    _ = emqx_pd:inc_counter(Key, Inc),
+    ok.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Next Packet Id
 %% Next Packet Id
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 24 - 0
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -146,6 +146,16 @@ properties(client) ->
      <<"Number of TCP packets received">>},
      <<"Number of TCP packets received">>},
     {recv_msg,          integer,
     {recv_msg,          integer,
      <<"Number of PUBLISH packets received">>},
      <<"Number of PUBLISH packets received">>},
+    {'recv_msg.qos0',   integer,
+     <<"Number of PUBLISH QoS0 packets received">>},
+    {'recv_msg.qos1',   integer,
+     <<"Number of PUBLISH QoS1 packets received">>},
+    {'recv_msg.qos2',   integer,
+     <<"Number of PUBLISH QoS2 packets received">>},
+    {'recv_msg.dropped', integer,
+     <<"Number of dropped PUBLISH packets">>},
+    {'recv_msg.dropped.expired', integer,
+     <<"Number of dropped PUBLISH packets due to expired">>},
     {recv_oct,          integer,
     {recv_oct,          integer,
      <<"Number of bytes received by EMQ X Broker (the same below)">>},
      <<"Number of bytes received by EMQ X Broker (the same below)">>},
     {recv_pkt,          integer,
     {recv_pkt,          integer,
@@ -156,6 +166,20 @@ properties(client) ->
      <<"Number of TCP packets sent">>},
      <<"Number of TCP packets sent">>},
     {send_msg,          integer,
     {send_msg,          integer,
      <<"Number of PUBLISH packets sent">>},
      <<"Number of PUBLISH packets sent">>},
+    {'send_msg.qos0',   integer,
+     <<"Number of PUBLISH QoS0 packets sent">>},
+    {'send_msg.qos1',   integer,
+     <<"Number of PUBLISH QoS1 packets sent">>},
+    {'send_msg.qos2',   integer,
+     <<"Number of PUBLISH QoS2 packets sent">>},
+    {'send_msg.dropped', integer,
+     <<"Number of dropped PUBLISH packets">>},
+    {'send_msg.dropped.expired', integer,
+     <<"Number of dropped PUBLISH packets due to expired">>},
+    {'send_msg.dropped.queue_full', integer,
+     <<"Number of dropped PUBLISH packets due to queue full">>},
+    {'send_msg.dropped.too_large', integer,
+     <<"Number of dropped PUBLISH packets due to packet length too large">>},
     {send_oct,          integer,
     {send_oct,          integer,
      <<"Number of bytes sent">>},
      <<"Number of bytes sent">>},
     {send_pkt,          integer,
     {send_pkt,          integer,