소스 검색

Handle will message correctly

zhouzb 6 년 전
부모
커밋
f3a92f35f6
2개의 변경된 파일44개의 추가작업 그리고 8개의 파일을 삭제
  1. 22 7
      src/emqx_channel.erl
  2. 22 1
      src/emqx_protocol.erl

+ 22 - 7
src/emqx_channel.erl

@@ -87,7 +87,8 @@
           alive_timer  => keepalive,
           retry_timer  => retry_delivery,
           await_timer  => expire_awaiting_rel,
-          expire_timer => expire_session
+          expire_timer => expire_session,
+          will_timer   => will_message
          }).
 
 %%--------------------------------------------------------------------
@@ -342,16 +343,18 @@ handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Sessi
                                                _ -> Channel#channel{session = emqx_session:update_expiry_interval(Interval, Session)}
                                            end),
             case Interval of
-                ?UINT_MAX -> {ok, NChannel};
-                Int when Int > 0 -> {ok, ensure_timer(expire_timer, NChannel)};
+                ?UINT_MAX ->
+                    {ok, ensure_timer(will_timer, NChannel)};
+                Int when Int > 0 ->
+                    {ok, ensure_timer([will_timer, expire_timer], NChannel)};
                 _Other ->
                     Reason = case RC of
-                                 ?RC_SUCCESS -> closed;
+                                 ?RC_SUCCESS -> normal;
                                  _ ->
                                      Ver = emqx_protocol:info(proto_ver, Protocol),
                                      emqx_reason_codes:name(RC, Ver)
                              end,
-                    {stop, {shutdown, Reason}, Channel}
+                    {stop, {shutdown, Reason}, NChannel}
             end
     end;
 
@@ -680,9 +683,14 @@ timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session,
             {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
     end;
 
-timeout(_TRef, expire_session, Channel) ->
+timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) ->
     shutdown(expired, Channel);
 
+timeout(TRef, will_message, Channel = #channel{protocol = Protocol,
+                                               timers = #{will_timer := TRef}}) ->
+    publish_will_msg(emqx_protocol:info(will_msg, Protocol)),
+    {ok, clean_timer(will_timer, Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)})};
+
 timeout(_TRef, Msg, Channel) ->
     ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
     {ok, Channel}.
@@ -691,6 +699,11 @@ timeout(_TRef, Msg, Channel) ->
 %% Ensure timers
 %%--------------------------------------------------------------------
 
+ensure_timer([Name], Channel) ->
+    ensure_timer(Name, Channel);
+ensure_timer([Name | Rest], Channel) ->
+    ensure_timer(Rest, ensure_timer(Name, Channel));
+
 ensure_timer(Name, Channel = #channel{timers = Timers}) ->
     TRef = maps:get(Name, Timers, undefined),
     Time = interval(Name, Channel),
@@ -723,7 +736,9 @@ interval(retry_timer, #channel{session = Session}) ->
 interval(await_timer, #channel{session = Session}) ->
     emqx_session:info(await_rel_timeout, Session);
 interval(expire_timer, #channel{session = Session}) ->
-    timer:seconds(emqx_session:info(expiry_interval, Session)).
+    timer:seconds(emqx_session:info(expiry_interval, Session));
+interval(will_timer, #channel{protocol = Protocol}) ->
+    timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)).
 
 %%--------------------------------------------------------------------
 %% Terminate

+ 22 - 1
src/emqx_protocol.erl

@@ -59,13 +59,22 @@
 -spec(init(#mqtt_packet_connect{}) -> protocol()).
 init(#mqtt_packet_connect{proto_name  = ProtoName,
                           proto_ver   = ProtoVer,
+                          will_props  = WillProps,
                           clean_start = CleanStart,
                           keepalive   = Keepalive,
                           properties  = Properties,
                           client_id   = ClientId,
                           username    = Username
                          } = ConnPkt) ->
-    WillMsg = emqx_packet:will_msg(ConnPkt),
+    WillMsg = emqx_packet:will_msg(
+        case ProtoVer of
+            ?MQTT_PROTO_V5 ->
+                WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
+                ConnPkt#mqtt_packet_connect{
+                    will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
+            _ ->
+                ConnPkt
+        end),
     #protocol{proto_name  = ProtoName,
               proto_ver   = ProtoVer,
               clean_start = CleanStart,
@@ -110,6 +119,8 @@ info(username, #protocol{username = Username}) ->
     Username;
 info(will_msg, #protocol{will_msg = WillMsg}) ->
     WillMsg;
+info(will_delay_interval, #protocol{will_msg = WillMsg}) ->
+    emqx_message:get_header('Will-Delay-Interval', WillMsg, 0);
 info(conn_props, #protocol{conn_props = ConnProps}) ->
     ConnProps;
 info(topic_aliases, #protocol{topic_aliases = Aliases}) ->
@@ -137,3 +148,13 @@ save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = Aliases}) ->
 
 clear_will_msg(Protocol) ->
     Protocol#protocol{will_msg = undefined}.
+
+set_property(Name, Value, undefined) ->
+    #{Name => Value};
+set_property(Name, Value, Props) ->
+    Props#{Name => Value}.
+
+get_property(_Name, undefined, Default) ->
+    Default;
+get_property(Name, Props, Default) ->
+    maps:get(Name, Props, Default).