Przeglądaj źródła

feat(mqttsn): support asleep client

JianBo He 4 lat temu
rodzic
commit
98b97ad217
1 zmienionych plików z 146 dodań i 46 usunięć
  1. 146 46
      apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

+ 146 - 46
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -71,6 +71,8 @@
           register_inflight :: maybe(term()),
           %% Topics list for awaiting to register to client
           register_awaiting_queue :: list(),
+          %% Duration for asleep
+          asleep_timer_duration :: integer() | undefined,
           %% Timer
           timers :: #{atom() => disable | undefined | reference()},
           %%% Takeover
@@ -81,16 +83,17 @@
           pendings :: list()
          }).
 
--type(channel() :: #channel{}).
+-type channel() :: #channel{}.
 
--type(conn_state() :: idle | connecting | connected | asleep | disconnected).
+-type conn_state() :: idle | connecting | connected | asleep | awake
+                    | disconnected.
 
--type(reply() :: {outgoing, mqtt_sn_message()}
+-type reply() :: {outgoing, mqtt_sn_message()}
                | {outgoing, [mqtt_sn_message()]}
                | {event, conn_state()|updated}
-               | {close, Reason :: atom()}).
+               | {close, Reason :: atom()}.
 
--type(replies() :: reply() | [reply()]).
+-type replies() :: reply() | [reply()].
 
 -define(TIMER_TABLE, #{
           alive_timer    => keepalive,
@@ -471,8 +474,25 @@ handle_in(?SN_WILLMSG_MSG(Payload),
             handle_out(connack, ReasonCode, Channel)
     end;
 
+%% TODO: takeover ???
+handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, ClientId),
+          Channel = #channel{
+                       clientinfo = #{clientid := ClientId},
+                       conn_state = ConnState})
+  when ConnState == asleep;
+       ConnState == awake ->
+    %% From the asleep or awake state a client can return either to the
+    %% active state by sending a CONNECT message [6.14]
+    ?SLOG(info, #{ msg => "goto_connected_state"
+                 , previous_state => ConnState
+                 , clientid => ClientId
+                 }),
+    handle_out(connack, ?SN_RC_ACCEPTED,
+               Channel#channel{conn_state = connected});
+
+%% new connection
 handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
-          Channel) ->
+          Channel = #channel{conn_state = idle}) ->
     case emqx_misc:pipeline(
            [ fun enrich_conninfo/2
            , fun run_conn_hooks/2
@@ -589,7 +609,10 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
             case emqx_session:puback(ClientInfo, MsgId, Session) of
                 {ok, Msg, NSession} ->
                     ok = after_message_acked(ClientInfo, Msg, Channel),
-                    {ok, Channel#channel{session = NSession}};
+                    {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
+                                            Channel#channel{session = NSession}
+                                           ),
+                    {ok, Replies, NChannel};
                 {ok, Msg, Publishes, NSession} ->
                     ok = after_message_acked(ClientInfo, Msg, Channel),
                     handle_out(publish,
@@ -672,7 +695,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
           Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
     case emqx_session:pubcomp(ClientInfo, MsgId, Session) of
         {ok, NSession} ->
-            {ok, Channel#channel{session = NSession}};
+            {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
+                                    Channel#channel{session = NSession}
+                                   ),
+            {ok, Replies, NChannel};
         {ok, Publishes, NSession} ->
             handle_out(publish, Publishes,
                        Channel#channel{session = NSession});
@@ -732,32 +758,47 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
             {ok, {outgoing, UnsubAck}, NChannel}
     end;
 
-handle_in(?SN_PINGREQ_MSG(_ClientId),
-          Channel = #channel{conn_state = asleep}) ->
-    {ok, Outgoing, NChannel} = awake(Channel),
-    NOutgoings = Outgoing ++ [{outgoing, ?SN_PINGRESP_MSG()}],
-    {ok, NOutgoings, NChannel};
-
-handle_in(?SN_PINGREQ_MSG(_ClientId), Channel) ->
+handle_in(?SN_PINGREQ_MSG(ClientId), Channel)
+  when ClientId == undefined;
+       ClientId == <<>> ->
     {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
 
-handle_in(?SN_PINGRESP_MSG(), Channel) ->
+handle_in(?SN_PINGREQ_MSG(ReqClientId),
+          Channel = #channel{clientinfo = #{clientid := ClientId}})
+  when ReqClientId =/= ClientId ->
+    ?SLOG(warning, #{ msg => "awake_pingreq_clientid_not_match"
+                    , clientid => ClientId
+                    , request_clientid => ReqClientId
+                    }),
+    %% FIXME: takeover_and_awake..
     {ok, Channel};
 
-handle_in(?SN_DISCONNECT_MSG(Duration), Channel) ->
-    case Duration of
-        undefined ->
-            handle_out(disconnect, normal, Channel);
-        _ ->
-            %% A DISCONNECT message with a Duration field is sent by a client
-            %% when it wants to go to the “asleep” state. The receipt of this
-            %% message is also acknowledged by the gateway by means of a
-            %% DISCONNECT message (without a duration field) [5.4.21]
-            %%
-            %% TODO: asleep mechanism
-            AckPkt = ?SN_DISCONNECT_MSG(undefined),
-            {ok, {outgoing, AckPkt}, asleep(Duration, Channel)}
-    end;
+handle_in(?SN_PINGREQ_MSG(ClientId),
+          Channel = #channel{conn_state = ConnState})
+  when ConnState == idle; ConnState == asleep; ConnState == awake ->
+    awake(ClientId, Channel);
+
+handle_in(?SN_PINGREQ_MSG(ClientId),
+          Channel = #channel{conn_state = ConnState}) ->
+    ?SLOG(error, #{ msg => "awake_pingreq_in_bad_conn_state"
+                  , conn_state => ConnState
+                  , clientid => ClientId
+                  }),
+    handle_out(disconnect, protocol_error, Channel);
+
+handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) ->
+    handle_out(disconnect, normal, Channel);
+
+handle_in(?SN_DISCONNECT_MSG(Duration),
+          Channel = #channel{conn_state = ConnState})
+  when ConnState == connected; ConnState == asleep ->
+    %% A DISCONNECT message with a Duration field is sent by a client
+    %% when it wants to go to the “asleep” state. The receipt of this
+    %% message is also acknowledged by the gateway by means of a
+    %% DISCONNECT message (without a duration field) [5.4.21]
+    %%
+    AckPkt = ?SN_DISCONNECT_MSG(undefined),
+    {ok, [{outgoing, AckPkt}, {event, asleep}], asleep(Duration, Channel)};
 
 handle_in(?SN_WILLTOPICUPD_MSG(Flags, Topic),
           Channel = #channel{will_msg = WillMsg,
@@ -1100,7 +1141,24 @@ do_unsubscribe(TopicFilters,
 %%--------------------------------------------------------------------
 %% Awake & Asleep
 
-awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
+awake(ClientId, Channel = #channel{conn_state = idle}) ->
+    ?SLOG(warning, #{ msg => "awake_pingreq_in_idle_state"
+                    , clientid => ClientId
+                    }),
+    %% TODO: takeover and awake?
+    %% 1. Query emqx_cm_registry to get the session state?
+    %% 2. Takeover it and goto awake state
+    {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
+
+awake(ClientId, Channel = #channel{
+                             conn_state = ConnState,
+                             session = Session,
+                             clientinfo = ClientInfo = #{clientid := ClientId}})
+  when ConnState == asleep; ConnState == awake ->
+    ?SLOG(info, #{ msg => "goto_awake_state"
+                 , clientid => ClientId
+                 , previous_state => ConnState
+                 }),
     {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
     {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of
         {ok, Session2} ->
@@ -1108,24 +1166,57 @@ awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
         {ok, More, Session2} ->
             {lists:append(Publishes, More), Session2}
     end,
-    {Replies, NChannel} = outgoing_deliver_and_register(
-                            do_deliver(NPublishes,
-                                       Channel#channel{session = NSession})
-                           ),
-    {ok, Replies, NChannel}.
+    Channel1 = cancel_timer(asleep_timer, Channel),
+    {Replies0, NChannel0} = outgoing_deliver_and_register(
+                              do_deliver(
+                                NPublishes,
+                                Channel1#channel{
+                                  conn_state = awake, session = NSession}
+                               )
+                             ),
+    Replies1 = [{event, awake} | Replies0],
+
+    {Replies2, NChannel} = goto_asleep_if_buffered_msgs_sent(NChannel0),
+    {ok, Replies1 ++ Replies2, NChannel}.
+
+goto_asleep_if_buffered_msgs_sent(
+  Channel = #channel{
+               conn_state = awake,
+               session = Session,
+               asleep_timer_duration = Duration}) ->
+    case emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso
+         emqx_inflight:is_empty(emqx_session:info(inflight, Session)) of
+        true ->
+            ?SLOG(info, #{ msg => "goto_asleep_state"
+                         , reason => buffered_messages_sent
+                         , duration => Duration
+                         }),
+            Replies = [ {outgoing, ?SN_PINGRESP_MSG()}
+                      , {event, asleep}
+                      ],
+            {Replies, ensure_asleep_timer(Channel#channel{conn_state = asleep})};
+        false ->
+            {[], Channel}
+    end;
+goto_asleep_if_buffered_msgs_sent(Channel) ->
+    {[], Channel}.
 
 asleep(Duration, Channel = #channel{conn_state = asleep}) ->
     %% 6.14: The client can also modify its sleep duration
     %% by sending a DISCONNECT message with a new value of
     %% the sleep duration
-    ensure_timer(asleep_timer, Duration,
-                 cancel_timer(asleep_timer, Channel)
-                );
+    %%
+    %% XXX: Do we need to limit the maximum of Duration?
+    ?SLOG(debug, #{ msg => "update_asleep_timer"
+                  , new_duration => Duration
+                  }),
+    ensure_asleep_timer(Duration, cancel_timer(asleep_timer, Channel));
 
 asleep(Duration, Channel = #channel{conn_state = connected}) ->
-    ensure_timer(asleep_timer, Duration,
-                 Channel#channel{conn_state = asleep}
-                ).
+    ?SLOG(info, #{ msg => "goto_asleep_state"
+                 , duration => Duration
+                 }),
+    ensure_asleep_timer(Duration, Channel#channel{conn_state = asleep}).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packet
@@ -1154,10 +1245,11 @@ handle_out(connack, ReasonCode,
     shutdown(Reason, AckPacket, Channel);
 
 handle_out(publish, Publishes, Channel) ->
-    {Replies, NChannel} = outgoing_deliver_and_register(
-                            do_deliver(Publishes, Channel)
-                           ),
-    {ok, Replies, NChannel};
+    {Replies1, NChannel} = outgoing_deliver_and_register(
+                             do_deliver(Publishes, Channel)
+                            ),
+    {Replies2, NChannel2} = goto_asleep_if_buffered_msgs_sent(NChannel),
+    {ok, Replies1 ++ Replies2, NChannel2};
 
 handle_out(puback, {TopicId, MsgId, Rc}, Channel) ->
     {ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel};
@@ -1688,6 +1780,14 @@ update_will_msg(Will, Payload) ->
 %%--------------------------------------------------------------------
 %% Timer
 
+ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration})
+  when is_integer(Duration) ->
+    ensure_asleep_timer(Duration, Channel).
+
+ensure_asleep_timer(Durtion, Channel) ->
+    ensure_timer(asleep_timer, timer:seconds(Durtion),
+                 Channel#channel{asleep_timer_duration = Durtion}).
+
 cancel_timer(Name, Channel = #channel{timers = Timers}) ->
     case maps:get(Name, Timers, undefined) of
         undefined ->