Browse Source

Merge pull request #1439 from emqtt/develop

Fix issue #1216 - Redeliver pubrel packet
Feng Lee 8 years ago
parent
commit
b041f05f08
2 changed files with 14 additions and 5 deletions
  1. 1 1
      README.md
  2. 13 4
      src/emqttd_session.erl

+ 1 - 1
README.md

@@ -77,7 +77,7 @@ Plugin                                                                 | Descrip
 -----------------------------------------------------------------------|--------------------------------------
 -----------------------------------------------------------------------|--------------------------------------
 [emq_plugin_template](https://github.com/emqtt/emq_plugin_template)    | Plugin template and demo
 [emq_plugin_template](https://github.com/emqtt/emq_plugin_template)    | Plugin template and demo
 [emq_dashboard](https://github.com/emqtt/emq_dashboard)                | Web Dashboard
 [emq_dashboard](https://github.com/emqtt/emq_dashboard)                | Web Dashboard
-[emq_retainer](https://github.com/emqtt/emq_retainer)                  | Store MQTT Retained Messages
+[emq_retainer](https://github.com/emqtt/emq-retainer)                  | Store MQTT Retained Messages
 [emq_modules](https://github.com/emqtt/emq-modules)                    | Presence, Subscription and Rewrite Modules
 [emq_modules](https://github.com/emqtt/emq-modules)                    | Presence, Subscription and Rewrite Modules
 [emq_auth_username](https://github.com/emqtt/emq_auth_username)        | Username/Password Authentication Plugin
 [emq_auth_username](https://github.com/emqtt/emq_auth_username)        | Username/Password Authentication Plugin
 [emq_auth_clientid](https://github.com/emqtt/emq_auth_clientid)        | ClientId Authentication Plugin
 [emq_auth_clientid](https://github.com/emqtt/emq_auth_clientid)        | ClientId Authentication Plugin

+ 13 - 4
src/emqttd_session.erl

@@ -453,6 +453,8 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
     {noreply,
     {noreply,
      case maps:take(PacketId, AwaitingRel) of
      case maps:take(PacketId, AwaitingRel) of
          {Msg, AwaitingRel1} ->
          {Msg, AwaitingRel1} ->
+             %% Implement Qos2 by method A [MQTT 4.33]
+             %% Dispatch to subscriber when received PUBREL
              spawn(emqttd_server, publish, [Msg]), %%:)
              spawn(emqttd_server, publish, [Msg]), %%:)
              gc(State#state{awaiting_rel = AwaitingRel1});
              gc(State#state{awaiting_rel = AwaitingRel1});
          error ->
          error ->
@@ -628,8 +630,10 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
                     redeliver(Msg, State),
                     redeliver(Msg, State),
                     Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
                     Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
                     retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
                     retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
-                {pubrel, PacketId} -> %% remove 'pubrel' directly?
-                    retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight:delete(PacketId)})
+                {pubrel, PacketId} ->
+                    redeliver({pubrel, PacketId}, State),
+                    Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}),
+                    retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1})
             end;
             end;
         true ->
         true ->
             State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)}
             State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)}
@@ -649,11 +653,13 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
 expire_awaiting_rel([], _Now, State) ->
 expire_awaiting_rel([], _Now, State) ->
     State#state{await_rel_timer = undefined};
     State#state{await_rel_timer = undefined};
 
 
-expire_awaiting_rel([{PacketId, #mqtt_message{timestamp = TS}} | Msgs],
+expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
                     Now, State = #state{awaiting_rel      = AwaitingRel,
                     Now, State = #state{awaiting_rel      = AwaitingRel,
                                         await_rel_timeout = Timeout}) ->
                                         await_rel_timeout = Timeout}) ->
     case (timer:now_diff(Now, TS) div 1000) of
     case (timer:now_diff(Now, TS) div 1000) of
         Diff when Diff >= Timeout ->
         Diff when Diff >= Timeout ->
+            ?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State),
+            emqttd_metrics:inc('messages/qos2/dropped'),
             expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
             expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
         Diff ->
         Diff ->
             State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)}
             State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)}
@@ -714,7 +720,10 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
 redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
-    deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State).
+    deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State);
+
+redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
+    Pid ! {redeliver, {?PUBREL, PacketId}}.
 
 
 deliver(Msg, #state{client_pid = Pid}) ->
 deliver(Msg, #state{client_pid = Pid}) ->
     inc_stats(deliver_msg),
     inc_stats(deliver_msg),