Просмотр исходного кода

fix(mqtt-sn): Fix stop due to keepAlive at sleep mode

Turtle 4 лет назад
Родитель
Сommit
e8790f6e11

+ 1 - 1
apps/emqx_sn/src/emqx_sn.app.src

@@ -1,6 +1,6 @@
 {application, emqx_sn,
  [{description, "EMQ X MQTT-SN Plugin"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.3.1"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,esockd]},

+ 17 - 0
apps/emqx_sn/src/emqx_sn.appup.src

@@ -0,0 +1,17 @@
+%% -*-: erlang -*-
+{VSN,
+ [
+   {"4.3.0", [
+     {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
+     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
+   ]},
+   {<<".*">>, []}
+ ],
+ [
+   {"4.3.0", [
+     {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
+     {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
+   ]},
+   {<<".*">>, []}
+ ]
+}.

+ 10 - 5
apps/emqx_sn/src/emqx_sn_asleep_timer.erl

@@ -18,6 +18,7 @@
 
 -export([ init/0
         , ensure/2
+        , cancel/1
         ]).
 
 -record(asleep_state, {
@@ -42,8 +43,8 @@ init() ->
 -spec(ensure(undefined | integer(), asleep_state()) -> asleep_state()).
 ensure(undefined, State = #asleep_state{duration = Duration}) ->
     ensure(Duration, State);
-ensure(Duration, State = #asleep_state{tref = TRef}) ->
-    _ = cancel(TRef),
+ensure(Duration, State) ->
+    cancel(State),
     State#asleep_state{duration = Duration, tref = start(Duration)}.
 
 %%--------------------------------------------------------------------
@@ -55,6 +56,10 @@ ensure(Duration, State = #asleep_state{tref = TRef}) ->
 start(Duration) ->
     erlang:send_after(timer:seconds(Duration), self(), asleep_timeout).
 
-cancel(undefined) -> ok;
-cancel(TRef) when is_reference(TRef) ->
-    erlang:cancel_timer(TRef).
+cancel(#asleep_state{tref = Timer}) when is_reference(Timer) ->
+    case erlang:cancel_timer(Timer) of
+        false ->
+            receive {timeout, Timer, _} -> ok after 0 -> ok end;
+        _ -> ok
+    end;
+cancel(_) -> ok.

+ 10 - 8
apps/emqx_sn/src/emqx_sn_gateway.erl

@@ -439,12 +439,11 @@ asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
 %    4) emq-sn regard this CONNECT as a signal to connected state, not a bootup CONNECT. For this reason, will procedure is lost
 % this should be a bug in mqtt-sn channel.
 asleep(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)},
-       State = #state{keepalive_interval = _Interval}) ->
-    % device wakeup and goto connected state
-    % keepalive timer may timeout in asleep state and delete itself, need to restart keepalive
-    % TODO: Fixme later.
-    %% self() ! {keepalive, start, Interval},
-    {next_state, connected, send_connack(State)};
+       State = #state{channel = Channel, asleep_timer = Timer}) ->
+    NChannel = emqx_channel:ensure_keepalive(#{}, Channel),
+    emqx_sn_asleep_timer:cancel(Timer),
+    {next_state, connected, send_connack(State#state{channel = NChannel,
+                                                     asleep_timer = emqx_sn_asleep_timer:init()})};
 
 asleep(EventType, EventContent, State) ->
     handle_event(EventType, EventContent, asleep, State).
@@ -771,10 +770,13 @@ send_message(Msg = #mqtt_sn_message{type = Type},
 
 goto_asleep_state(State) ->
     goto_asleep_state(undefined, State).
-goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) ->
+goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer,
+                                         channel = Channel}) ->
     ?LOG(debug, "goto_asleep_state Duration=~p", [Duration]),
     NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer),
-    {next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}.
+    NChannel = emqx_channel:clear_keepalive(Channel),
+    {next_state, asleep, State#state{asleep_timer = NewTimer,
+                                     channel = NChannel}, hibernate}.
 
 %%--------------------------------------------------------------------
 %% Helper funcs

+ 5 - 5
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -856,7 +856,7 @@ t_will_test2(_) ->
     send_pingreq_msg(Socket, undefined),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
-    timer:sleep(10000),
+    timer:sleep(4000),
 
     receive_response(Socket), % ignore PUBACK
     receive_response(Socket), % ignore PUBCOMP
@@ -878,7 +878,7 @@ t_will_test3(_) ->
     send_pingreq_msg(Socket, undefined),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
-    timer:sleep(10000),
+    timer:sleep(4000),
 
     ?assertEqual(udp_receive_timeout, receive_response(Socket)),
 
@@ -906,7 +906,7 @@ t_will_test4(_) ->
     send_willmsgupd_msg(Socket, <<"1A2B3C">>),
     ?assertEqual(<<3, ?SN_WILLMSGRESP, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
 
-    timer:sleep(10000),
+    timer:sleep(4000),
 
     receive_response(Socket), % ignore PUBACK
 
@@ -1359,7 +1359,7 @@ t_asleep_test07_to_connected(_) ->
     timer:sleep(1500),
     % asleep timer should get timeout, without any effect
 
-    timer:sleep(9000),
+    timer:sleep(4000),
     % keepalive timer should get timeout
 
     gen_udp:close(Socket).
@@ -1517,7 +1517,7 @@ t_awake_test01_to_connected(_) ->
     timer:sleep(1500),
     % asleep timer should get timeout
 
-    timer:sleep(9000),
+    timer:sleep(4000),
     % keepalive timer should get timeout
     gen_udp:close(Socket).
 

+ 12 - 3
src/emqx.appup.src

@@ -5,7 +5,8 @@
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
-     {load_module, emqx_node_dump, brutal_purge, soft_purge, []}
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []}
    ]},
    {"4.3.0", [
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
@@ -13,6 +14,9 @@
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_trie, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []},
      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
      {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
    ]},
@@ -23,7 +27,8 @@
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
-     {load_module, emqx_node_dump, brutal_purge, soft_purge, []}
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []}
    ]},
    {"4.3.0", [
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
@@ -31,9 +36,13 @@
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_trie, brutal_purge, soft_purge, []},
+     {load_module, emqx_cm, brutal_purge, soft_purge, []},
+     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
+     {load_module, emqx_channel, brutal_purge, soft_purge, []},
      %% Just load the module. We don't need to change the 'messages.retained'
      %% and 'messages.retained' counter type.
-     {load_module, emqx_metrics, brutal_purge, soft_purge, []}
+     {load_module, emqx_metrics, brutal_purge, soft_purge, []},
+     {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
    ]},
 
    {<<".*">>, []}

+ 12 - 1
src/emqx_channel.erl

@@ -49,7 +49,10 @@
         ]).
 
 %% Export for emqx_sn
--export([do_deliver/2]).
+-export([ do_deliver/2
+        , ensure_keepalive/2
+        , clear_keepalive/1
+        ]).
 
 %% Exports for CT
 -export([set_field/3]).
@@ -1562,6 +1565,14 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
     Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
     ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
 
+clear_keepalive(Channel = #channel{timers = Timers}) ->
+    case maps:get(alive_timer, Timers, undefined) of
+        undefined ->
+            Channel;
+        TRef ->
+            emqx_misc:cancel_timer(TRef),
+            Channel#channel{timers = maps:without([alive_timer], Timers)}
+    end.
 %%--------------------------------------------------------------------
 %% Maybe Resume Session