Jelajahi Sumber

feat(metrics): ws client support more send&recv metrics

DDDHuang 4 tahun lalu
induk
melakukan
0826084ce9

+ 28 - 3
apps/emqx/src/emqx_channel.erl

@@ -119,7 +119,33 @@
           quota_timer  => expire_quota_limit
          }).
 
--define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
+-define(CHANNEL_METRICS,
+    [ recv_pkt
+    , recv_msg
+    , 'recv_msg.qos0'
+    , 'recv_msg.qos1'
+    , 'recv_msg.qos2'
+    , 'recv_msg.dropped'
+    , 'recv_msg.dropped.await_pubrel_timeout'
+    , send_pkt
+    , send_msg
+    , 'send_msg.qos0'
+    , 'send_msg.qos1'
+    , 'send_msg.qos2'
+    , 'send_msg.dropped'
+    , 'send_msg.dropped.expired'
+    , 'send_msg.dropped.queue_full'
+    , 'send_msg.dropped.too_large'
+    ]).
+
+-define(INFO_KEYS,
+    [ conninfo
+    , conn_state
+    , clientinfo
+    , session
+    , will_msg
+    ]).
+
 -define(LIMITER_ROUTING, message_routing).
 
 -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@@ -184,10 +210,9 @@ set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = Client
     Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
     Channel#channel{session = Session1}.
 
-%% TODO: Add more stats.
 -spec(stats(channel()) -> emqx_types:stats()).
 stats(#channel{session = Session})->
-    emqx_session:stats(Session).
+    lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
 
 -spec(caps(channel()) -> emqx_types:caps()).
 caps(#channel{clientinfo = #{zone := Zone}}) ->

+ 4 - 24
apps/emqx/src/emqx_connection.erl

@@ -131,25 +131,6 @@
     , sockstate
     ]).
 
--define(CONN_STATS,
-    [ recv_pkt
-    , recv_msg
-    , 'recv_msg.qos0'
-    , 'recv_msg.qos1'
-    , 'recv_msg.qos2'
-    , 'recv_msg.dropped'
-    , 'recv_msg.dropped.await_pubrel_timeout'
-    , send_pkt
-    , send_msg
-    , 'send_msg.qos0'
-    , 'send_msg.qos1'
-    , 'send_msg.qos2'
-    , 'send_msg.dropped'
-    , 'send_msg.dropped.expired'
-    , 'send_msg.dropped.queue_full'
-    , 'send_msg.dropped.too_large'
-    ]).
-
 -define(SOCK_STATS,
     [ recv_oct
     , recv_cnt
@@ -236,10 +217,9 @@ stats(#state{transport = Transport,
                     {ok, Ss}   -> Ss;
                     {error, _} -> []
                 end,
-    ConnStats = emqx_pd:get_counters(?CONN_STATS),
     ChanStats = emqx_channel:stats(Channel),
     ProcStats = emqx_misc:proc_stats(),
-    lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
+    lists:append([SockStats, ChanStats, ProcStats]).
 
 %% @doc Set TCP keepalive socket options to override system defaults.
 %% Idle: The number of seconds a connection needs to be idle before
@@ -1030,12 +1010,12 @@ inc_outgoing_stats({error, message_too_large}) ->
     inc_counter('send_msg.dropped.too_large', 1);
 inc_outgoing_stats(Packet = ?PACKET(Type)) ->
     inc_counter(send_pkt, 1),
-    case Type =:= ?PUBLISH of
-        true ->
+    case Type of
+        ?PUBLISH ->
             inc_counter(send_msg, 1),
             inc_counter(outgoing_pubs, 1),
             inc_qos_stats(send_msg, Packet);
-        false ->
+        _ ->
             ok
     end,
     emqx_metrics:inc_sent(Packet).

+ 39 - 12
apps/emqx/src/emqx_ws_connection.erl

@@ -112,7 +112,6 @@
 -define(ACTIVE_N, 100).
 -define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
--define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 
 -define(ENABLED(X), (X =/= undefined)).
 -define(LIMITER_BYTES_IN, bytes_in).
@@ -163,10 +162,9 @@ stats(WsPid) when is_pid(WsPid) ->
     call(WsPid, stats);
 stats(#state{channel = Channel}) ->
     SockStats = emqx_pd:get_counters(?SOCK_STATS),
-    ConnStats = emqx_pd:get_counters(?CONN_STATS),
     ChanStats = emqx_channel:stats(Channel),
     ProcStats = emqx_misc:proc_stats(),
-    lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
+    lists:append([SockStats, ChanStats, ProcStats]).
 
 %% kick|discard|takeover
 -spec(call(pid(), Req :: term()) -> Reply :: term()).
@@ -725,6 +723,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
                                      packet => emqx_packet:format(Packet)}),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped'),
+                    ok = inc_outgoing_stats({error, message_too_large}),
                     <<>>;
             Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
                     ok = inc_outgoing_stats(Packet),
@@ -762,19 +761,28 @@ inc_recv_stats(Cnt, Oct) ->
 
 inc_incoming_stats(Packet = ?PACKET(Type)) ->
     _ = emqx_pd:inc_counter(recv_pkt, 1),
-    if Type == ?PUBLISH ->
-           inc_counter(recv_msg, 1),
-           inc_counter(incoming_pubs, 1);
-       true -> ok
+    case Type of
+        ?PUBLISH ->
+            inc_counter(recv_msg, 1),
+            inc_qos_stats(recv_msg, Packet),
+            inc_counter(incoming_pubs, 1);
+        _ ->
+            ok
     end,
     emqx_metrics:inc_recv(Packet).
 
+inc_outgoing_stats({error, message_too_large}) ->
+    inc_counter('send_msg.dropped', 1),
+    inc_counter('send_msg.dropped.too_large', 1);
 inc_outgoing_stats(Packet = ?PACKET(Type)) ->
-    _ = emqx_pd:inc_counter(send_pkt, 1),
-    if Type == ?PUBLISH ->
-           inc_counter(send_msg, 1),
-           inc_counter(outgoing_pubs, 1);
-       true -> ok
+    inc_counter(send_pkt, 1),
+    case Type of
+        ?PUBLISH ->
+            inc_counter(send_msg, 1),
+            inc_counter(outgoing_pubs, 1),
+            inc_qos_stats(send_msg, Packet);
+        _ ->
+            ok
     end,
     emqx_metrics:inc_sent(Packet).
 
@@ -787,6 +795,25 @@ inc_sent_stats(Cnt, Oct) ->
 inc_counter(Name, Value) ->
     _ = emqx_pd:inc_counter(Name, Value),
     ok.
+
+inc_qos_stats(Type, Packet) ->
+    case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
+        undefined ->
+            ignore;
+        Key ->
+            inc_counter(Key, 1)
+    end.
+
+inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
+inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
+inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
+
+inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
+inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
+inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
+%% for bad qos
+inc_qos_stats_key(_, _) -> undefined.
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------