Jelajahi Sumber

Merge pull request #342 from emqtt/dev-feng

0.12.1
Feng Lee 10 tahun lalu
induk
melakukan
07c0987b48
4 mengubah file dengan 76 tambahan dan 41 penghapusan
  1. 20 0
      CHANGELOG.md
  2. 9 9
      include/emqttd_protocol.hrl
  3. 44 29
      src/emqttd_metrics.erl
  4. 3 3
      src/emqttd_protocol.erl

+ 20 - 0
CHANGELOG.md

@@ -2,6 +2,26 @@
 emqttd ChangeLog
 ==================
 
+0.12.1-beta (2015-10-15)
+-------------------------
+
+Highlight: Release for Bugfix and Code Refactor.
+
+Feature: Retained message expiration (#182)
+
+Improve: '$SYS/#' publish will not match '#' or '+/#' (#68)
+
+Improve: Add more metrics and ignore '$SYS/#' publish (#266)
+
+Improve: emqttd_sm should be optimized for clustered nodes may be crashed (#282)
+
+Improve: Refactor emqttd_sysmon and suppress 'monitor' messages (#328)
+
+Task: benchmark for 0.12.0 release (#225)
+
+Benchmark: About 900K concurrent connections established on a 20Core, 32G CentOS server.
+
+
 0.12.0-beta (2015-10-08)
 -------------------------
 

+ 9 - 9
include/emqttd_protocol.hrl

@@ -209,20 +209,20 @@
     #mqtt_packet{header   = #mqtt_packet_header{type = ?CONNACK},
                  variable = #mqtt_packet_connack{return_code = ReturnCode}}).
 
+-define(PUBLISH_PACKET(Qos, PacketId),
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBLISH,
+                                                qos = Qos},
+                 variable = #mqtt_packet_publish{packet_id = PacketId}}).
+
 -define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload),
-    #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
-                                              qos = Qos},
+    #mqtt_packet{header   = #mqtt_packet_header{type = ?PUBLISH,
+                                                qos = Qos},
                  variable = #mqtt_packet_publish{topic_name = Topic,
                                                  packet_id  = PacketId},
-                 payload = Payload}).
-
--define(PUBLISH(Qos, PacketId),
-    #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
-                                              qos = Qos},
-                 variable = #mqtt_packet_publish{packet_id  = PacketId}}).
+                 payload  = Payload}).
 
 -define(PUBACK_PACKET(Type, PacketId),
-    #mqtt_packet{header = #mqtt_packet_header{type = Type},
+    #mqtt_packet{header   = #mqtt_packet_header{type = Type},
                  variable = #mqtt_packet_puback{packet_id = PacketId}}).
 
 -define(PUBREL_PACKET(PacketId),

+ 44 - 29
src/emqttd_metrics.erl

@@ -113,33 +113,40 @@
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
-received(Packet = ?PACKET(Type)) ->
+%%------------------------------------------------------------------------------
+%% @doc Count packets received.
+%% @end
+%%------------------------------------------------------------------------------
+-spec received(mqtt_packet()) -> ok.
+received(Packet) ->
     inc('packets/received'),
-    received(Type, Packet).
-received(?CONNECT, _Packet) ->
-    inc('packets/connect');
-received(?PUBLISH, ?PUBLISH(Qos, _PktId)) ->
+    received1(Packet).
+received1(?PUBLISH_PACKET(Qos, _PktId)) ->
     inc('packets/publish/received'),
     inc('messages/received'),
     qos_received(Qos);
-received(?PUBACK, _Packet) ->  
+received1(?PACKET(Type)) ->
+    received2(Type).
+received2(?CONNECT) ->
+    inc('packets/connect');
+received2(?PUBACK) ->
     inc('packets/puback/received');
-received(?PUBREC, _Packet) ->  
+received2(?PUBREC) ->
     inc('packets/pubrec/received');
-received(?PUBREL, _Packet) ->
+received2(?PUBREL) ->
     inc('packets/pubrel/received');
-received(?PUBCOMP, _Packet) ->
+received2(?PUBCOMP) ->
     inc('packets/pubcomp/received');
-received(?SUBSCRIBE, _Packet) ->
+received2(?SUBSCRIBE) ->
     inc('packets/subscribe');
-received(?UNSUBSCRIBE, _Packet) ->
+received2(?UNSUBSCRIBE) ->
     inc('packets/unsubscribe');
-received(?PINGREQ, _Packet) ->
+received2(?PINGREQ) ->
     inc('packets/pingreq');
-received(?DISCONNECT, _Packet) ->
+received2(?DISCONNECT) ->
     inc('packets/disconnect');
-received(_, _) -> ignore.
-
+received2(_) ->
+    ignore.
 qos_received(?QOS_0) ->
     inc('messages/qos0/received');
 qos_received(?QOS_1) ->
@@ -147,32 +154,40 @@ qos_received(?QOS_1) ->
 qos_received(?QOS_2) ->
     inc('messages/qos2/received').
 
-sent(Packet = ?PACKET(Type)) ->
+%%------------------------------------------------------------------------------
+%% @doc Count packets received. Will not count $SYS PUBLISH.
+%% @end
+%%------------------------------------------------------------------------------
+-spec sent(mqtt_packet()) -> ok.
+sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
+    ignore;
+sent(Packet) ->
     emqttd_metrics:inc('packets/sent'),
-    sent(Type, Packet).
-sent(?CONNACK, _Packet) ->
-    inc('packets/connack');
-sent(?PUBLISH, ?PUBLISH(Qos, _PktId)) ->
+    sent1(Packet).
+sent1(?PUBLISH_PACKET(Qos, _PktId)) ->
     inc('packets/publish/sent'),
     inc('messages/sent'),
     qos_sent(Qos);
-sent(?PUBACK, _Packet) ->
+sent1(?PACKET(Type)) ->
+    sent2(Type).
+sent2(?CONNACK) ->
+    inc('packets/connack');
+sent2(?PUBACK) ->
     inc('packets/puback/sent');
-sent(?PUBREC, _Packet) ->
+sent2(?PUBREC) ->
     inc('packets/pubrec/sent');
-sent(?PUBREL, _Packet) ->
+sent2(?PUBREL) ->
     inc('packets/pubrel/sent');
-sent(?PUBCOMP, _Packet) ->
+sent2(?PUBCOMP) ->
     inc('packets/pubcomp/sent');
-sent(?SUBACK, _Packet) ->
+sent2(?SUBACK) ->
     inc('packets/suback');
-sent(?UNSUBACK, _Packet) ->
+sent2(?UNSUBACK) ->
     inc('packets/unsuback');
-sent(?PINGRESP, _Packet) ->
+sent2(?PINGRESP) ->
     inc('packets/pingresp');
-sent(_Type, _Packet) ->
+sent2(_Type) ->
     ingore.
-
 qos_sent(?QOS_0) ->
     inc('messages/qos0/sent');
 qos_sent(?QOS_1) ->

+ 3 - 3
src/emqttd_protocol.erl

@@ -261,12 +261,12 @@ process(?PACKET(?DISCONNECT), State) ->
     % clean willmsg
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
-publish(Packet = ?PUBLISH(?QOS_0, _PacketId),
+publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
         #proto_state{client_id = ClientId, session = Session}) ->
     Msg = emqttd_message:from_packet(ClientId, Packet),
     emqttd_session:publish(Session, Msg);
 
-publish(Packet = ?PUBLISH(?QOS_1, PacketId),
+publish(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId),
         State = #proto_state{client_id = ClientId, session = Session}) ->
     Msg = emqttd_message:from_packet(ClientId, Packet),
     case emqttd_session:publish(Session, Msg) of
@@ -276,7 +276,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId),
             lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error])
     end;
 
-publish(Packet = ?PUBLISH(?QOS_2, PacketId),
+publish(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
         State = #proto_state{client_id = ClientId, session = Session}) ->
     Msg = emqttd_message:from_packet(ClientId, Packet),
     case emqttd_session:publish(Session, Msg) of