Explorar o código

fix(congestion): change the conn congestion alarm msg body

Shawn %!s(int64=5) %!d(string=hai) anos
pai
achega
0ee489a9be
Modificáronse 2 ficheiros con 39 adicións e 4 borrados
  1. 14 0
      src/emqx_channel.erl
  2. 25 4
      src/emqx_connection.erl

+ 14 - 0
src/emqx_channel.erl

@@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) ->
     Zone;
 info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
     ClientId;
+info(username, #channel{clientinfo = #{username := Username}}) ->
+    Username;
+info(socktype, #channel{conninfo = #{socktype := SockType}}) ->
+    SockType;
+info(peername, #channel{conninfo = #{peername := Peername}}) ->
+    Peername;
+info(sockname, #channel{conninfo = #{sockname := Sockname}}) ->
+    Sockname;
+info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) ->
+    ProtoName;
+info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) ->
+    ProtoVer;
+info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) ->
+    ConnectedAt;
 info(clientinfo, #channel{clientinfo = ClientInfo}) ->
     ClientInfo;
 info(session, #channel{session = Session}) ->

+ 25 - 4
src/emqx_connection.erl

@@ -104,7 +104,16 @@
 -define(ENABLED(X), (X =/= undefined)).
 
 -define(ALARM_TCP_CONGEST(Channel),
-         list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))).
+        list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s",
+            [emqx_channel:info(clientid, Channel),
+             emqx_channel:info(username, Channel)]))).
+
+-define(ALARM_CONN_INFO_KEYS, [
+    socktype, sockname, peername,
+    clientid, username, proto_name, proto_ver, connected_at
+]).
+-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
+-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
 
 -dialyzer({no_match, [info/2]}).
 -dialyzer({nowarn_function, [ init/4
@@ -616,10 +625,9 @@ maybe_warn_congestion(Socket, Transport, Channel) ->
     IsCongestAlarmSet = is_congestion_alarm_set(),
     case is_congested(Socket, Transport) of
         true when not IsCongestAlarmSet ->
-            {ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]),
-            {ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]),
             ok = set_congestion_alarm(),
-            emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts));
+            emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel),
+                tcp_congestion_alarm_details(Socket, Transport, Channel));
         false when IsCongestAlarmSet ->
             ok = clear_congestion_alarm(),
             emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
@@ -642,6 +650,19 @@ set_congestion_alarm() ->
 clear_congestion_alarm() ->
     erlang:put(conn_congested, false), ok.
 
+tcp_congestion_alarm_details(Socket, Transport, Channel) ->
+    {ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS),
+    {ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS),
+    SockInfo = maps:from_list(Stat ++ Opts),
+    ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]),
+    maps:merge(ConnInfo, SockInfo).
+
+conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
+    {IPStr, Port} = emqx_channel:info(Key, Channel),
+    {Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
+conn_info(Key, Channel) ->
+    {Key, emqx_channel:info(Key, Channel)}.
+
 %%--------------------------------------------------------------------
 %% Handle Info