瀏覽代碼

improve suback

Feng Lee 10 年之前
父節點
當前提交
565c8abb3a
共有 4 個文件被更改,包括 25 次插入14 次删除
  1. 11 6
      src/emqttd_client.erl
  2. 1 4
      src/emqttd_protocol.erl
  3. 8 4
      src/emqttd_session.erl
  4. 5 0
      src/emqttd_ws_client.erl

+ 11 - 6
src/emqttd_client.erl

@@ -135,12 +135,11 @@ handle_cast(Msg, State) ->
 
 handle_info(timeout, State) ->
     stop({shutdown, timeout}, State);
-    
-handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
-                                                        conn_name   = ConnName}) ->
-    lager:warning("Shutdown for duplicate clientid: ~s, conn:~s",
-                  [emqttd_protocol:clientid(ProtoState), ConnName]),
-    stop({shutdown, duplicate_id}, State);
+
+%% Asynchronous SUBACK
+handle_info({suback, PacketId, GrantedQos}, State = #state{proto_state = ProtoState}) ->
+    {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState),
+    noreply(State#state{proto_state = ProtoState1});
 
 handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
     {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
@@ -150,6 +149,12 @@ handle_info({redeliver, {?PUBREL, PacketId}},  State = #state{proto_state = Prot
     {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
     noreply(State#state{proto_state = ProtoState1});
 
+handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
+                                                        conn_name   = ConnName}) ->
+    lager:warning("Shutdown for duplicate clientid: ~s, conn:~s",
+                  [emqttd_protocol:clientid(ProtoState), ConnName]),
+    stop({shutdown, duplicate_id}, State);
+
 handle_info(activate_sock, State) ->
     noreply(run_socket(State#state{conn_state = running}));
 

+ 1 - 4
src/emqttd_protocol.erl

@@ -240,10 +240,7 @@ process(?SUBSCRIBE_PACKET(PacketId, TopicTable),
             lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
             send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
         false ->
-            AckFun = fun(GrantedQos) ->
-                        send(?SUBACK_PACKET(PacketId, GrantedQos), State)
-                     end,
-            emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State}
+            emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
     end;
 
 %% protect from empty topic list

+ 8 - 4
src/emqttd_session.erl

@@ -167,8 +167,12 @@ destroy(SessPid, ClientId) ->
 subscribe(SessPid, TopicTable) ->
     subscribe(SessPid, TopicTable, fun(_) -> ok end).
 
--spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok.
-subscribe(SessPid, TopicTable, AckFun) ->
+-spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok.
+subscribe(SessPid, PacketId, TopicTable) ->
+    From   = self(),
+    AckFun = fun(GrantedQos) ->
+                 From ! {suback, PacketId, GrantedQos}
+             end,
     gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
 
 %%------------------------------------------------------------------------------
@@ -298,13 +302,13 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{
 
     case TopicTable -- Subscriptions of
         [] ->
-            catch AckFun([Qos || {_, Qos} <- TopicTable]),
+            AckFun([Qos || {_, Qos} <- TopicTable]),
             noreply(Session);
         _  ->
             %% subscribe first and don't care if the subscriptions have been existed
             {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
 
-            catch AckFun(GrantedQos),
+            AckFun(GrantedQos),
 
             emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
 

+ 5 - 0
src/emqttd_ws_client.erl

@@ -164,6 +164,11 @@ handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState})
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
+%% Asynchronous SUBACK
+handle_info({suback, PacketId, GrantedQos}, State = #client_state{proto_state = ProtoState}) ->
+    {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState),
+    noreply(State#client_state{proto_state = ProtoState1});
+
 handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) ->
     {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
     noreply(State#client_state{proto_state = ProtoState1});