소스 검색

fix issue #440 - mqtt_message record: add 'sender' field

Feng 10 년 전
부모
커밋
0d21e5c911
2개의 변경된 파일27개의 추가작업 그리고 10개의 파일을 삭제
  1. 20 6
      src/emqttd_message.erl
  2. 7 4
      src/emqttd_protocol.erl

+ 20 - 6
src/emqttd_message.erl

@@ -21,12 +21,17 @@
 
 -include("emqttd_protocol.hrl").
 
--export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]).
+-export([make/3, make/4, from_packet/1, from_packet/2, from_packet/3,
+         to_packet/1]).
 
 -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
 
 -export([format/1]).
 
+-ifdef(TEST).
+-compile(export_all).
+-endif.
+
 %% @doc Make a message
 -spec make(From, Topic, Payload) -> mqtt_message() when
     From    :: atom() | binary(),
@@ -72,12 +77,16 @@ from_packet(#mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
 from_packet(#mqtt_packet_connect{will_flag  = false}) ->
     undefined;
 
-from_packet(#mqtt_packet_connect{will_retain = Retain,
+from_packet(#mqtt_packet_connect{client_id   = ClientId,
+                                 username    = Username,
+                                 will_retain = Retain,
                                  will_qos    = Qos,
                                  will_topic  = Topic,
                                  will_msg    = Msg}) ->
     #mqtt_message{msgid     = msgid(Qos),
                   topic     = Topic,
+                  from      = ClientId,
+                  sender    = Username,
                   retain    = Retain,
                   qos       = Qos,
                   dup       = false,
@@ -85,7 +94,12 @@ from_packet(#mqtt_packet_connect{will_retain = Retain,
                   timestamp = os:timestamp()}.
 
 from_packet(ClientId, Packet) ->
-    Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}.
+    Msg = from_packet(Packet),
+    Msg#mqtt_message{from = ClientId}.
+
+from_packet(Username, ClientId, Packet) ->
+    Msg = from_packet(Packet),
+    Msg#mqtt_message{from = ClientId, sender = Username}.
 
 msgid(?QOS_0) ->
     undefined;
@@ -140,10 +154,10 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
 unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
 
 %% @doc Format MQTT Message
-format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From,
+format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, sender = Sender,
                      qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
-    io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)",
-                  [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]).
+    io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Sender=~s, Topic=~s)",
+                  [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Sender, Topic]).
 
 i(true)  -> 1;
 i(false) -> 0;

+ 7 - 4
src/emqttd_protocol.erl

@@ -224,8 +224,9 @@ process(?PACKET(?DISCONNECT), State) ->
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
-        #proto_state{client_id = ClientId, session = Session}) ->
-    emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
+        #proto_state{client_id = ClientId, username = Username, session = Session}) ->
+    Msg = emqttd_message:from_packet(Username, ClientId, Packet),
+    emqttd_session:publish(Session, Msg);
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
     with_puback(?PUBACK, Packet, State);
@@ -234,8 +235,10 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
     with_puback(?PUBREC, Packet, State).
 
 with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
-            State = #proto_state{client_id = ClientId, session = Session}) ->
-    Msg = emqttd_message:from_packet(ClientId, Packet),
+            State = #proto_state{client_id = ClientId,
+                                 username = Username,
+                                 session = Session}) ->
+    Msg = emqttd_message:from_packet(Username, ClientId, Packet),
     case emqttd_session:publish(Session, Msg) of
         ok ->
             send(?PUBACK_PACKET(Type, PacketId), State);