Feng 9 лет назад
Родитель
Сommit
700ec7aaef
1 измененных файлов с 34 добавлено и 20 удалено
  1. 34 20
      src/emqttd_protocol.erl

+ 34 - 20
src/emqttd_protocol.erl

@@ -35,12 +35,14 @@
 
 
 -export([process/2]).
 -export([process/2]).
 
 
+-record(proto_stats, {recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}).
+
 %% Protocol State
 %% Protocol State
 -record(proto_state, {peername, sendfun, connected = false,
 -record(proto_state, {peername, sendfun, connected = false,
                       client_id, client_pid, clean_sess,
                       client_id, client_pid, clean_sess,
                       proto_ver, proto_name, username, is_superuser = false,
                       proto_ver, proto_name, username, is_superuser = false,
                       will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
                       will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
-                      session, ws_initial_headers, %% Headers from first HTTP request for websocket client
+                      session, stats, ws_initial_headers, %% Headers from first HTTP request for websocket client
                       connected_at}).
                       connected_at}).
 
 
 -type(proto_state() :: #proto_state{}).
 -type(proto_state() :: #proto_state{}).
@@ -56,20 +58,20 @@
 
 
 %% @doc Init protocol
 %% @doc Init protocol
 init(Peername, SendFun, Opts) ->
 init(Peername, SendFun, Opts) ->
-    lists:foreach(fun(K) -> put(K, 0) end, ?STATS_KEYS),
     MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
     MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
     WsInitialHeaders = get_value(ws_initial_headers, Opts),
     WsInitialHeaders = get_value(ws_initial_headers, Opts),
     #proto_state{peername           = Peername,
     #proto_state{peername           = Peername,
                  sendfun            = SendFun,
                  sendfun            = SendFun,
                  max_clientid_len   = MaxLen,
                  max_clientid_len   = MaxLen,
                  client_pid         = self(),
                  client_pid         = self(),
+                 stats              = #proto_stats{},
                  ws_initial_headers = WsInitialHeaders}.
                  ws_initial_headers = WsInitialHeaders}.
 
 
 info(ProtoState) ->
 info(ProtoState) ->
     ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
     ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
 
 
-stats(_ProtoState) ->
-    [{K, get(K)} || K <- ?STATS_KEYS].
+stats(#proto_state{stats = Stats}) ->
+    ?record_to_proplist(proto_stats, Stats).
 
 
 clientid(#proto_state{client_id = ClientId}) ->
 clientid(#proto_state{client_id = ClientId}) ->
     ClientId.
     ClientId.
@@ -106,8 +108,10 @@ session(#proto_state{session = Session}) ->
 
 
 %% A Client can only send the CONNECT Packet once over a Network Connection. 
 %% A Client can only send the CONNECT Packet once over a Network Connection. 
 -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
 -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
-received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
-    process(Packet, State#proto_state{connected = true});
+received(Packet = ?PACKET(?CONNECT),
+         State = #proto_state{connected = false, stats = Stats}) ->
+    trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats),
+    process(Packet, State#proto_state{connected = true, stats = Stats1});
 
 
 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
     {error, protocol_bad_connect, State};
     {error, protocol_bad_connect, State};
@@ -116,11 +120,11 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
 received(_Packet, State = #proto_state{connected = false}) ->
 received(_Packet, State = #proto_state{connected = false}) ->
     {error, protocol_not_connected, State};
     {error, protocol_not_connected, State};
 
 
-received(Packet = ?PACKET(_Type), State) ->
-    trace(recv, Packet, State),
+received(Packet = ?PACKET(Type), State = #proto_state{stats = Stats}) ->
+    trace(recv, Packet, State), Stats1 = inc_stats(recv, Type, Stats),
     case validate_packet(Packet) of
     case validate_packet(Packet) of
         ok ->
         ok ->
-            process(Packet, State);
+            process(Packet, State#proto_state{stats = Stats1});
         {error, Reason} ->
         {error, Reason} ->
             {error, Reason, State}
             {error, Reason, State}
     end.
     end.
@@ -151,7 +155,7 @@ unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId,
 %% @doc Send PUBREL
 %% @doc Send PUBREL
 pubrel(PacketId, State) -> send(?PUBREL_PACKET(PacketId), State).
 pubrel(PacketId, State) -> send(?PUBREL_PACKET(PacketId), State).
 
 
-process(Packet = ?CONNECT_PACKET(Var), State0) ->
+process(?CONNECT_PACKET(Var), State0) ->
 
 
     #mqtt_packet_connect{proto_ver  = ProtoVer,
     #mqtt_packet_connect{proto_ver  = ProtoVer,
                          proto_name = ProtoName,
                          proto_name = ProtoName,
@@ -170,8 +174,6 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
                                 will_msg   = willmsg(Var),
                                 will_msg   = willmsg(Var),
                                 connected_at = os:timestamp()},
                                 connected_at = os:timestamp()},
 
 
-    trace(recv, Packet, State1),
-
     {ReturnCode1, SessPresent, State3} =
     {ReturnCode1, SessPresent, State3} =
     case validate_connect(Var, State1) of
     case validate_connect(Var, State1) of
         ?CONNACK_ACCEPT ->
         ?CONNACK_ACCEPT ->
@@ -312,22 +314,34 @@ send(Msg, State = #proto_state{client_id = ClientId, username = Username})
     emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
     emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
     send(emqttd_message:to_packet(Msg), State);
     send(emqttd_message:to_packet(Msg), State);
 
 
-send(Packet, State = #proto_state{sendfun = SendFun})
-    when is_record(Packet, mqtt_packet) ->
+send(Packet = ?PACKET(Type),
+     State = #proto_state{sendfun = SendFun, stats = Stats}) ->
     trace(send, Packet, State),
     trace(send, Packet, State),
     emqttd_metrics:sent(Packet),
     emqttd_metrics:sent(Packet),
     SendFun(Packet),
     SendFun(Packet),
-    {ok, State}.
+    Stats1 = inc_stats(send, Type, Stats),
+    {ok, State#proto_state{stats = Stats1}}.
 
 
-trace(recv, Packet = ?PACKET(Type), ProtoState) ->
-    inc(recv_pkt), ?IF(Type =:= ?PUBLISH, inc(recv_msg), ok),
+trace(recv, Packet, ProtoState) ->
     ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
     ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
 
 
-trace(send, Packet = ?PACKET(Type), ProtoState) ->
-    inc(send_pkt), ?IF(Type =:= ?PUBLISH, inc(send_msg), ok),
+trace(send, Packet, ProtoState) ->
     ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
     ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
 
 
-inc(Key) -> put(Key, get(Key) + 1).
+inc_stats(recv, Type, Stats) ->
+    #proto_stats{recv_pkt = Pkt, recv_msg = Msg} = Stats,
+    inc_stats(Type, #proto_stats.recv_pkt, Pkt, #proto_stats.recv_msg, Msg, Stats);
+
+inc_stats(send, Type, Stats) ->
+    #proto_stats{send_pkt = Pkt, send_msg = Msg} = Stats,
+    inc_stats(Type, #proto_stats.send_pkt, Pkt, #proto_stats.send_msg, Msg, Stats).
+
+inc_stats(Type, PktPos, PktCnt, MsgPos, MsgCnt, Stats) ->
+    Stats1 = setelement(PktPos, Stats, PktCnt + 1),
+    case Type =:= ?PUBLISH of
+        true  -> setelement(MsgPos, Stats1, MsgCnt + 1);
+        false -> Stats1
+    end.
 
 
 stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH ->
 stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH ->
     {stop, {shutdown, auth_failure}, State};
     {stop, {shutdown, auth_failure}, State};