Просмотр исходного кода

fix(mqtt-sn): sleep mode not working #4434 (#4435)

Shawn 4 лет назад
Родитель
Сommit
7bf6ee0f4b

+ 95 - 69
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -83,11 +83,11 @@
                 keepalive_interval   :: maybe(integer()),
                 connpkt              :: term(),
                 asleep_timer         :: tuple(),
-                asleep_msg_queue     :: list(),
                 enable_stats         :: boolean(),
                 stats_timer          :: maybe(reference()),
                 idle_timeout         :: integer(),
-                enable_qos3 = false  :: boolean()
+                enable_qos3 = false  :: boolean(),
+                has_pending_pingresp = false :: boolean()
                }).
 
 -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
@@ -104,6 +104,15 @@
 
 -define(NO_PEERCERT, undefined).
 
+-define(CONN_INFO(Sockname, Peername),
+    #{socktype => udp,
+      sockname => Sockname,
+      peername => Peername,
+      protocol => 'mqtt-sn',
+      peercert => ?NO_PEERCERT,
+      conn_mod => ?MODULE
+    }).
+
 %%--------------------------------------------------------------------
 %% Exported APIs
 %%--------------------------------------------------------------------
@@ -134,13 +143,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
     EnableStats = proplists:get_value(enable_stats, Options, false),
     case inet:sockname(Sock) of
         {ok, Sockname} ->
-            Channel = emqx_channel:init(#{socktype => udp,
-                                          sockname => Sockname,
-                                          peername => Peername,
-                                          protocol => 'mqtt-sn',
-                                          peercert => ?NO_PEERCERT,
-                                          conn_mod => ?MODULE
-                                         }, ?DEFAULT_CHAN_OPTIONS),
+            Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS),
             State = #state{gwid             = GwId,
                            username         = Username,
                            password         = Password,
@@ -152,7 +155,6 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
                            channel          = Channel,
                            registry         = Registry,
                            asleep_timer     = emqx_sn_asleep_timer:init(),
-                           asleep_msg_queue = [],
                            enable_stats     = EnableStats,
                            enable_qos3      = EnableQos3,
                            idle_timeout     = IdleTimeout
@@ -175,9 +177,6 @@ idle(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, Sta
     #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
     do_connect(ClientId, CleanStart, Will, Duration, State);
 
-idle(cast, {incoming, Packet = ?CONNECT_PACKET(_ConnPkt)}, State) ->
-    handle_incoming(Packet, State);
-
 idle(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) ->
     % ignore
     {keep_state, State, State#state.idle_timeout};
@@ -188,7 +187,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) ->
 
 idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) ->
     ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!", [], State),
-    {keep_state_and_data, State#state.idle_timeout};
+    {keep_state, State#state.idle_timeout};
 
 idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
                                                      topic_id_type = TopicIdType
@@ -206,7 +205,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
             ok
     end,
     ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State),
-    {keep_state_and_data, State#state.idle_timeout};
+    {keep_state, State#state.idle_timeout};
 
 idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
     handle_ping(PingReq, State);
@@ -400,15 +399,23 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(undefined)}, State) ->
     % ClientId in PINGREQ is mandatory
     {keep_state, State};
 
-asleep(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(ClientIdPing)},
-       State = #state{clientid = ClientId}) ->
+asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)},
+       State = #state{clientid = ClientId, channel = Channel}) ->
     case ClientIdPing of
         ClientId ->
-            _ = handle_ping(PingReq, State),
-            self() ! do_awake_jobs,
-            % it is better to go awake state, since the jobs in awake may take long time
-            % and asleep timer get timeout, it will cause disaster
-            {next_state, awake, State};
+            inc_ping_counter(),
+            case emqx_session:dequeue(emqx_channel:get_session(Channel)) of
+                {ok, Session0} ->
+                    send_message(?SN_PINGRESP_MSG(), State),
+                    {keep_state, State#state{
+                        channel = emqx_channel:set_session(Session0, Channel)}};
+                {ok, Delivers, Session0} ->
+                    Events = [emqx_message:to_packet(PckId, Msg) || {PckId, Msg} <- Delivers]
+                             ++ [try_goto_asleep],
+                    {next_state, awake, State#state{
+                        channel = emqx_channel:set_session(Session0, Channel),
+                        has_pending_pingresp = true}, outgoing_events(Events)}
+            end;
         _Other   ->
             {next_state, asleep, State}
     end;
@@ -453,6 +460,20 @@ awake(cast, {outgoing, Packet}, State) ->
     ok = handle_outgoing(Packet, State),
     {keep_state, State};
 
+awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
+    do_puback(TopicId, MsgId, ReturnCode, awake, State);
+
+awake(cast, try_goto_asleep, State=#state{channel = Channel,
+        has_pending_pingresp = PingPending}) ->
+    case emqx_mqueue:is_empty(emqx_session:info(mqueue, emqx_channel:get_session(Channel))) of
+        true when PingPending =:= true ->
+            send_message(?SN_PINGRESP_MSG(), State),
+            goto_asleep_state(State#state{has_pending_pingresp = false});
+        true when PingPending =:= false ->
+            goto_asleep_state(State);
+        false -> keep_state_and_data
+    end;
+
 awake(EventType, EventContent, State) ->
     handle_event(EventType, EventContent, awake, State).
 
@@ -489,11 +510,12 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
             shutdown(frame_error, State)
     end;
 
-handle_event(info, Deliver = {deliver, _Topic, Msg}, asleep,
-             State = #state{asleep_msg_queue = AsleepMsgQ}) ->
+handle_event(info, {deliver, _Topic, Msg}, asleep,
+             State = #state{channel = Channel}) ->
     % section 6.14, Support of sleeping clients
     ?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg], State),
-    {keep_state, State#state{asleep_msg_queue = [Deliver|AsleepMsgQ]}};
+    Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
+    {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
 
 handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName,
              State = #state{channel = Channel}) ->
@@ -518,18 +540,6 @@ handle_event(info, {timeout, TRef, keepalive}, _StateName, State) ->
 handle_event(info, {timeout, TRef, TMsg}, _StateName, State) ->
     handle_timeout(TRef, TMsg, State);
 
-handle_event(info, do_awake_jobs, StateName, State=#state{clientid = ClientId}) ->
-    ?LOG(debug, "Do awake jobs, statename : ~p", [StateName], State),
-    case process_awake_jobs(ClientId, State) of
-        {keep_state, NewState} ->
-            case StateName of
-                awake  -> goto_asleep_state(NewState);
-                _Other -> {keep_state, NewState}
-                          %% device send a CONNECT immediately before this do_awake_jobs is handled
-            end;
-        Stop -> Stop
-    end;
-
 handle_event(info, asleep_timeout, asleep, State) ->
     ?LOG(debug, "asleep timer timeout, shutdown now", [], State),
     stop(asleep_timeout, State);
@@ -593,33 +603,31 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
 handle_info(Info, State = #state{channel = Channel}) ->
    handle_return(emqx_channel:handle_info(Info, Channel), State).
 
-handle_ping(_PingReq, State) ->
-    inc_counter(recv_oct, 2),
-    inc_counter(recv_msg, 1),
-    ok = send_message(?SN_PINGRESP_MSG(), State),
-    {keep_state, State}.
-
 handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
     handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State).
 
-handle_return({ok, NChannel}, State) ->
-    {keep_state, State#state{channel = NChannel}};
-handle_return({ok, Replies, NChannel}, State) ->
-    {keep_state, State#state{channel = NChannel}, next_events(Replies)};
+handle_return(Return, State) ->
+    handle_return(Return, State, []).
 
-handle_return({shutdown, Reason, NChannel}, State) ->
+handle_return({ok, NChannel}, State, AddEvents) ->
+    handle_return({ok, AddEvents, NChannel}, State, []);
+handle_return({ok, Replies, NChannel}, State, AddEvents) ->
+    {keep_state, State#state{channel = NChannel}, outgoing_events(append(Replies, AddEvents))};
+handle_return({shutdown, Reason, NChannel}, State, _AddEvents) ->
     stop({shutdown, Reason}, State#state{channel = NChannel});
-handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
+handle_return({shutdown, Reason, OutPacket, NChannel}, State, _AddEvents) ->
     NState = State#state{channel = NChannel},
     ok = handle_outgoing(OutPacket, NState),
     stop({shutdown, Reason}, NState).
 
-next_events(Packet) when is_record(Packet, mqtt_packet) ->
+outgoing_events(Actions) ->
+    lists:map(fun outgoing_event/1, Actions).
+
+outgoing_event(Packet) when is_record(Packet, mqtt_packet);
+                            is_record(Packet, mqtt_sn_message)->
     next_event({outgoing, Packet});
-next_events(Action) when is_tuple(Action) ->
-    next_event(Action);
-next_events(Actions) when is_list(Actions) ->
-    lists:map(fun next_event/1, Actions).
+outgoing_event(Action) ->
+    next_event(Action).
 
 close_socket(State = #state{sockstate = closed}) -> State;
 close_socket(State = #state{socket = _Socket}) ->
@@ -673,6 +681,13 @@ call(Pid, Req) ->
 %%--------------------------------------------------------------------
 %% Internal Functions
 %%--------------------------------------------------------------------
+handle_ping(_PingReq, State) ->
+    ok = send_message(?SN_PINGRESP_MSG(), State),
+    inc_ping_counter(),
+    {keep_state, State}.
+
+inc_ping_counter() ->
+    inc_counter(recv_msg, 1).
 
 mqtt2sn(?CONNACK_PACKET(0, _SessPresent),  _State) ->
     ?SN_CONNACK_MSG(0);
@@ -786,11 +801,17 @@ mqttsn_to_mqtt(?SN_PUBCOMP, MsgId) ->
     ?PUBCOMP_PACKET(MsgId).
 
 do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
+    %% 6.6 Client’s Publish Procedure
+    %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
+    %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
+    %% before it could start a new level 1 or 2 transaction.
+    OnlyOneInflight = #{'Receive-Maximum' => 1},
     ConnPkt = #mqtt_packet_connect{clientid    = ClientId,
                                    clean_start = CleanStart,
                                    username    = State#state.username,
                                    password    = State#state.password,
-                                   keepalive   = Duration
+                                   keepalive   = Duration,
+                                   properties  = OnlyOneInflight
                                   },
     put(clientid, ClientId),
     case WillFlag of
@@ -939,11 +960,11 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
     _ = emqx_broker:publish(emqx_packet:to_message(Publish, ClientId)),
     ok.
 
-do_puback(TopicId, MsgId, ReturnCode, _StateName,
+do_puback(TopicId, MsgId, ReturnCode, StateName,
           State=#state{clientid = ClientId, registry = Registry}) ->
     case ReturnCode of
         ?SN_RC_ACCEPTED ->
-            handle_incoming(?PUBACK_PACKET(MsgId), State);
+            handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
         ?SN_RC_INVALID_TOPIC_ID ->
             case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
                 undefined -> ok;
@@ -990,15 +1011,6 @@ update_will_msg(undefined, Msg) ->
 update_will_msg(Will = #will_msg{}, Msg) ->
     Will#will_msg{payload = Msg}.
 
-process_awake_jobs(_ClientId, State = #state{asleep_msg_queue = []}) ->
-    {keep_state, State};
-process_awake_jobs(_ClientId, State = #state{channel = Channel,
-                                             asleep_msg_queue = AsleepMsgQ}) ->
-    Delivers = lists:reverse(AsleepMsgQ),
-    NState = State#state{asleep_msg_queue = []},
-    Result = emqx_channel:handle_deliver(Delivers, Channel),
-    handle_return(Result, NState).
-
 enqueue_msgid(suback, MsgId, TopicId) ->
     put({suback, MsgId}, TopicId);
 enqueue_msgid(puback, MsgId, TopicId) ->
@@ -1022,12 +1034,21 @@ get_topic_id(Type, MsgId) ->
         TopicId -> TopicId
     end.
 
-handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
+handle_incoming(Packet, State) ->
+    handle_incoming(Packet, unknown, State).
+
+handle_incoming(?PUBACK_PACKET(_) = Packet, awake, State) ->
+    Result = channel_handle_in(Packet, State),
+    handle_return(Result, State, [try_goto_asleep]);
+handle_incoming(Packet, _StName, State) ->
+    Result = channel_handle_in(Packet, State),
+    handle_return(Result, State).
+
+channel_handle_in(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
     _ = inc_incoming_stats(Type),
     ok = emqx_metrics:inc_recv(Packet),
     ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], State),
-    Result = emqx_channel:handle_in(Packet, Channel),
-    handle_return(Result, State).
+    emqx_channel:handle_in(Packet, Channel).
 
 handle_outgoing(Packets, State) when is_list(Packets) ->
     lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets);
@@ -1081,3 +1102,8 @@ next_event(Content) ->
 inc_counter(Key, Inc) ->
     _ = emqx_pd:inc_counter(Key, Inc),
     ok.
+
+append(Replies, AddEvents) when is_list(Replies) ->
+    Replies ++ AddEvents;
+append(Replies, AddEvents) ->
+    [Replies] ++ AddEvents.

+ 11 - 17
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -402,8 +402,6 @@ t_publish_negqos_case09(_) ->
             What = receive_response(Socket),
             ?assertEqual(Eexp, What)
     end,
-    %% dbg:start(), dbg:tracer(), dbg:p(all, c),
-    %% dbg:tpl(emqx_sn_gateway, send_message, x),
 
     send_disconnect_msg(Socket, undefined),
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@@ -1049,14 +1047,15 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
 
     % goto awake state, receive downlink messages, and go back to asleep
     send_pingreq_msg(Socket, ClientId),
-    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
-
-    %% {unexpected_udp_data, _} = receive_response(Socket),
 
+    %% the broker should sent dl msgs to the awake client before sending the pingresp
     UdpData = receive_response(Socket),
     MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData),
     send_puback_msg(Socket, TopicId1, MsgId_udp),
 
+    %% check the pingresp is received at last
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
+
     gen_udp:close(Socket).
 
 t_asleep_test04_to_awake_qos1_dl_msg(_) ->
@@ -1106,8 +1105,6 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
     % goto awake state, receive downlink messages, and go back to asleep
     send_pingreq_msg(Socket, <<"test">>),
 
-    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
-
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     %% get REGISTER first, since this topic has never been registered
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1119,6 +1116,8 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
     MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData),
     send_puback_msg(Socket, TopicIdNew, MsgId_udp),
 
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
+
     gen_udp:close(Socket).
 
 t_asleep_test05_to_awake_qos1_dl_msg(_) ->
@@ -1173,7 +1172,6 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
 
     % goto awake state, receive downlink messages, and go back to asleep
     send_pingreq_msg(Socket, <<"test">>),
-    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     UdpData_reg = receive_response(Socket),
     {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg),
@@ -1197,7 +1195,7 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
                                                TopicIdNew, Payload4}, UdpData4),
             send_puback_msg(Socket, TopicIdNew, MsgId4)
     end,
-    timer:sleep(50),
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
     gen_udp:close(Socket).
 
 t_asleep_test06_to_awake_qos2_dl_msg(_) ->
@@ -1249,14 +1247,12 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
 
     % goto awake state, receive downlink messages, and go back to asleep
     send_pingreq_msg(Socket, <<"test">>),
-    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     UdpData = wrap_receive_response(Socket),
     MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
     send_pubrec_msg(Socket, MsgId_udp),
 
-    timer:sleep(300),
-
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
     gen_udp:close(Socket).
 
 t_asleep_test07_to_connected(_) ->
@@ -1391,8 +1387,6 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
     % goto awake state, receive downlink messages, and go back to asleep
     send_pingreq_msg(Socket, <<"test">>),
 
-    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
-
     UdpData_reg = receive_response(Socket),
     {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg),
     send_regack_msg(Socket, TopicIdNew, MsgId_reg),
@@ -1424,7 +1418,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
                                        TopicIdNew, Payload4}, UdpData4),
             send_puback_msg(Socket, TopicIdNew, MsgId4)
     end,
-    timer:sleep(100),
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     %% send PINGREQ again to enter awake state
     send_pingreq_msg(Socket, <<"test">>),
@@ -1787,7 +1781,7 @@ wrap_receive_response(Socket) ->
             Other
     end.
 receive_response(Socket) ->
-    receive_response(Socket, 5000).
+    receive_response(Socket, 2000).
 receive_response(Socket, Timeout) ->
     receive
         {udp, Socket, _, _, Bin} ->
@@ -1832,8 +1826,8 @@ get_udp_broadcast_address() ->
     "255.255.255.255".
 
 check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload}, UdpData) ->
-    ct:pal("UdpData: ~p, Payload: ~p", [UdpData, Payload]),
     <<HeaderUdp:5/binary, MsgId:16, PayloadIn/binary>> = UdpData,
+    ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]),
     Size9 = byte_size(Payload) + 7,
     Eexp = <<Size9:8, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, TopicType:2, TopicId:16>>,
     ?assertEqual(Eexp, HeaderUdp),     % mqtt-sn header should be same

+ 8 - 0
src/emqx_channel.erl

@@ -32,6 +32,8 @@
 -export([ info/1
         , info/2
         , set_conn_state/2
+        , get_session/1
+        , set_session/2
         , stats/1
         , caps/1
         ]).
@@ -167,6 +169,12 @@ info(timers, #channel{timers = Timers}) -> Timers.
 set_conn_state(ConnState, Channel) ->
     Channel#channel{conn_state = ConnState}.
 
+get_session(#channel{session = Session}) ->
+    Session.
+
+set_session(Session, Channel) ->
+    Channel#channel{session = Session}.
+
 %% TODO: Add more stats.
 -spec(stats(channel()) -> emqx_types:stats()).
 stats(#channel{session = Session})->

+ 1 - 0
src/emqx_session.erl

@@ -75,6 +75,7 @@
 
 -export([ deliver/2
         , enqueue/2
+        , dequeue/1
         , retry/1
         , terminate/3
         ]).