Explorar el Código

refactor(config): change mqtt.session_expiry_interval to ms

Shawn hace 4 años
padre
commit
0704cbc986

+ 3 - 3
apps/emqx/src/emqx_channel.erl

@@ -1118,7 +1118,7 @@ interval(retry_timer, #channel{session = Session}) ->
 interval(await_timer, #channel{session = Session}) ->
     emqx_session:info(await_rel_timeout, Session);
 interval(expire_timer, #channel{conninfo = ConnInfo}) ->
-    timer:seconds(maps:get(expiry_interval, ConnInfo));
+    maps:get(expiry_interval, ConnInfo);
 interval(will_timer, #channel{will_msg = WillMsg}) ->
     timer:seconds(will_delay_interval(WillMsg)).
 
@@ -1176,7 +1176,7 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
 %% If the Session Expiry Interval is absent the value 0 is used.
 expiry_interval(_, #mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
                                             properties = ConnProps}) ->
-    emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
+    timer:seconds(emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0));
 expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
     get_mqtt_conf(Zone, session_expiry_interval);
 expiry_interval(_, #mqtt_packet_connect{clean_start = true}) ->
@@ -1615,7 +1615,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
     case maps:get(expiry_interval, ConnInfo) of
         ?UINT_MAX -> {ok, Channel};
         I when I > 0 ->
-            {ok, ensure_timer(expire_timer, timer:seconds(I), Channel)};
+            {ok, ensure_timer(expire_timer, I, Channel)};
         _ -> shutdown(Reason, Channel)
     end.
 

+ 1 - 1
apps/emqx/src/emqx_schema.erl

@@ -291,7 +291,7 @@ fields("mqtt") ->
     , {"retry_interval", t(duration(), undefined, "30s")}
     , {"max_awaiting_rel", maybe_infinity(integer(), 100)}
     , {"await_rel_timeout", t(duration(), undefined, "300s")}
-    , {"session_expiry_interval", t(duration_s(), undefined, "2h")}
+    , {"session_expiry_interval", t(duration(), undefined, "2h")}
     , {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)}
     , {"mqueue_priorities", maybe_disabled(map())}
     , {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}

+ 1 - 1
apps/emqx/test/emqx_channel_SUITE.erl

@@ -51,7 +51,7 @@ mqtt_conf() ->
     retain_available => true,
     retry_interval => 30000,
     server_keepalive => disabled,
-    session_expiry_interval => 7200,
+    session_expiry_interval => 7200000,
     shared_subscription => true,
     strict_mode => false,
     upgrade_qos => false,

+ 12 - 9
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -590,18 +590,21 @@ print({client, {ClientId, ChanPid}}) ->
     InfoKeys = [clientid, username, peername,
                 clean_start, keepalive, expiry_interval,
                 subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
-                connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of
-                                                            true  -> [disconnected_at];
-                                                            false -> []
-                                                        end,
+                connected, created_at, connected_at] ++
+                case maps:is_key(disconnected_at, Info) of
+                    true  -> [disconnected_at];
+                    false -> []
+                end,
+    Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
     emqx_ctl:print("Client(~s, username=~s, peername=~s, "
                     "clean_start=~s, keepalive=~w, session_expiry_interval=~w, "
                     "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
-                    "connected=~s, created_at=~w, connected_at=~w" ++ case maps:is_key(disconnected_at, Info) of
-                                                                          true  -> ", disconnected_at=~w)~n";
-                                                                          false -> ")~n"
-                                                                      end,
-                    [format(K, maps:get(K, Info)) || K <- InfoKeys]);
+                    "connected=~s, created_at=~w, connected_at=~w" ++
+                    case maps:is_key(disconnected_at, Info1) of
+                        true  -> ", disconnected_at=~w)~n";
+                        false -> ")~n"
+                    end,
+        [format(K, maps:get(K, Info1)) || K <- InfoKeys]);
 
 print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
     emqx_ctl:print("~s -> ~s~n", [Topic, Node]);

+ 1 - 1
apps/emqx_modules/src/emqx_mod_presence.erl

@@ -102,7 +102,7 @@ connected_presence(#{peerhost := PeerHost,
       keepalive => Keepalive,
       connack => 0, %% Deprecated?
       clean_start => CleanStart,
-      expiry_interval => ExpiryInterval,
+      expiry_interval => ExpiryInterval div 1000,
       connected_at => ConnectedAt,
       ts => erlang:system_time(millisecond)
      }.

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -182,7 +182,7 @@ eventmsg_connected(_ClientInfo = #{
           keepalive => Keepalive,
           clean_start => CleanStart,
           receive_maximum => RcvMax,
-          expiry_interval => ExpiryInterval,
+          expiry_interval => ExpiryInterval div 1000,
           is_bridge => IsBridge,
           conn_props => printable_maps(ConnProps),
           connected_at => ConnectedAt