Browse Source

merge mqtt-sn

Feng Lee 9 years atrás
parent
commit
a0e2d2981a
3 changed files with 16 additions and 8 deletions
  1. 7 1
      src/emqttd_client.erl
  2. 3 6
      src/emqttd_protocol.erl
  3. 6 1
      src/emqttd_ws_client.erl

+ 7 - 1
src/emqttd_client.erl

@@ -87,7 +87,13 @@ init([OriginConn, MqttEnv]) ->
     end,
     ConnName = esockd_net:format(PeerName),
     Self = self(),
-    SendFun = fun(Data) ->
+
+    %%TODO: Send packet...
+    SendFun = fun(Packet) ->
+        Data = emqttd_serializer:serialize(Packet),
+        %%TODO: How to Log???
+        ?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}),
+        emqttd_metrics:inc('bytes/sent', size(Data)),
         try Connection:async_send(Data) of
             true -> ok
         catch

+ 3 - 6
src/emqttd_protocol.erl

@@ -236,8 +236,8 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
 
 with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
             State = #proto_state{client_id = ClientId,
-                                 username = Username,
-                                 session = Session}) ->
+                                 username  = Username,
+                                 session   = Session}) ->
     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
     case emqttd_session:publish(Session, Msg) of
         ok ->
@@ -256,10 +256,7 @@ send(Packet, State = #proto_state{sendfun = SendFun})
     when is_record(Packet, mqtt_packet) ->
     trace(send, Packet, State),
     emqttd_metrics:sent(Packet),
-    Data = emqttd_serializer:serialize(Packet),
-    ?LOG(debug, "SEND ~p", [Data], State),
-    emqttd_metrics:inc('bytes/sent', size(Data)),
-    SendFun(Data),
+    SendFun(Packet),
     {ok, State}.
 
 trace(recv, Packet, ProtoState) ->

+ 6 - 1
src/emqttd_ws_client.erl

@@ -66,7 +66,12 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) ->
     {ok, Peername} = Req:get(peername),
     Headers = mochiweb_headers:to_list(
                 mochiweb_request:get(headers, Req)),
-    SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
+    %% SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
+    SendFun = fun(Packet) ->
+                  Data = emqttd_serializer:serialize(Packet),
+                  emqttd_metrics:inc('bytes/sent', size(Data)),
+                  ReplyChannel({binary, Data})
+              end,
     ProtoState = emqttd_protocol:init(Peername, SendFun,
                                       [{ws_initial_headers, Headers} | MqttEnv]),
     {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),